Source code for preferences_store

# preferences_store.py
from __future__ import annotations

import copy
import os
from datetime import datetime, timezone
from typing import Any

from motor.motor_asyncio import AsyncIOMotorClient

# Connection settings; each value can be overridden through the environment.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongo:27017")
MONGO_DB = os.getenv("MONGO_DB", "octopus")
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "preferences")

# Patch keys that describe the content of a single workspace. When they
# appear in an update, _apply_workspace_patch writes them into the targeted
# workspace instead of the top level of the config.
WORKSPACE_CONTENT_PATCH_KEYS = {
    "layout",
    "visibleMap",
    "widget_configs",
    "collapsedMap",
    "collapsedHeights",
    "panels",
    "panelOrder",
    "panelLayout",
}
# Any of these keys in a patch triggers workspace patching and the
# version-history bookkeeping.
WORKSPACE_PATCH_KEYS = WORKSPACE_CONTENT_PATCH_KEYS | {
    "workspaces",
}
# NOTE(review): not referenced by the code visible in this module —
# presumably consumed by callers that handle shared-workspace edits; confirm.
MUTABLE_SHARED_WORKSPACE_KEYS = WORKSPACE_CONTENT_PATCH_KEYS | {
    "workspaceTarget",
}
# Keys that _sanitize_config always strips from a config.
CLIENT_ONLY_CONFIG_KEYS = {
    "_initialized",
    "_lastUserId",
    "error",
    "shared_workspace_library",
    "sharedWorkspaceLibrary",
}
# Top-level keys that update_preferences deep-merges into the stored value
# (when the incoming value is a dict).
MERGED_TOP_LEVEL_KEYS = {"widget_refresh_rates", "widget_propagation"}
# Top-level keys that update_preferences overwrites wholesale.
REPLACED_TOP_LEVEL_KEYS = {
    "theme",
    "spacing",
    "spacingHorizontal",
    "spacingVertical",
    "edit",
    "currentWorkspace",
    "timeRange",
    "header_clocks",
    "remote_widgets",
    "variables",
    "workspace_versions",
}
# Maximum number of history entries _trim_history keeps per workspace.
MAX_WORKSPACE_VERSION_HISTORY = 1

# Created at import time; every async function below goes through
# _collection.
_client = AsyncIOMotorClient(MONGO_URI)
_collection = _client[MONGO_DB][MONGO_COLLECTION]


def _deep_copy(value: Any) -> Any:
    return copy.deepcopy(value)


def _deep_merge(base: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict with ``patch`` recursively merged over ``base``.

    Where both sides hold a dict under the same key the two are merged
    recursively; in every other case the patch value (deep-copied) replaces
    the base value. Neither input is mutated.
    """
    merged = _deep_copy(base)
    for name in patch:
        incoming = patch[name]
        existing = merged.get(name)
        if not (isinstance(existing, dict) and isinstance(incoming, dict)):
            merged[name] = _deep_copy(incoming)
            continue
        merged[name] = _deep_merge(existing, incoming)
    return merged


def _deep_equal(left: Any, right: Any) -> bool:
    return left == right


def _normalize_string_list(value: Any) -> list[str]:
    if not isinstance(value, list):
        return []
    out: list[str] = []
    seen: set[str] = set()
    for item in value:
        text = str(item or "").strip()
        if not text:
            continue
        key = text.lower()
        if key in seen:
            continue
        seen.add(key)
        out.append(text)
    return out


def _normalize_access(value: Any) -> str:
    normalized = str(value or "").strip().lower()
    if normalized in {"edit", "editor", "write", "readwrite", "read-write"}:
        return "edit"
    return "view"


def _extract_grants(cfg: dict[str, Any] | None) -> list[dict[str, Any]]:
    if not cfg or not isinstance(cfg, dict):
        return []
    grants = cfg.get("shared_workspace_grants")
    if not isinstance(grants, list):
        grants = cfg.get("sharedWorkspaceGrants")
    if not isinstance(grants, list):
        return []
    return [g for g in grants if isinstance(g, dict)]


def _has_grants_key(cfg: dict[str, Any] | None) -> bool:
    if not cfg or not isinstance(cfg, dict):
        return False
    return "shared_workspace_grants" in cfg or "sharedWorkspaceGrants" in cfg


def _normalize_grant(
    raw: dict[str, Any], default_owner: str = ""
) -> dict[str, Any] | None:
    """Convert one raw grant into its canonical form.

    Returns ``None`` when ``raw`` is not a dict or has no usable name. The
    owner falls back to ``default_owner``. A legacy single ``role`` field is
    appended to ``roles`` unless already present (case-insensitively). When
    no id is given, one is derived as ``"<owner or 'shared'>::<name>"``.
    ``workspace`` and ``variables`` are included only when they are dicts;
    variable names are upper-cased and ``None`` values become ``""``.
    """
    if not isinstance(raw, dict):
        return None
    name = str(raw.get("name") or "").strip()
    if not name:
        return None

    owner = str(raw.get("owner") or default_owner or "").strip()
    fallback_id = f"{owner or 'shared'}::{name}"

    roles = _normalize_string_list(raw.get("roles"))
    legacy_role = str(raw.get("role") or "").strip()
    if legacy_role:
        known_roles = {existing.lower() for existing in roles}
        if legacy_role.lower() not in known_roles:
            roles.append(legacy_role)

    grant: dict[str, Any] = {
        "id": str(raw.get("id") or fallback_id).strip(),
        "name": name,
        "owner": owner,
        "roles": roles,
        "groups": _normalize_string_list(raw.get("groups")),
        "users": _normalize_string_list(raw.get("users")),
        "access": _normalize_access(raw.get("access")),
    }

    workspace = raw.get("workspace")
    if isinstance(workspace, dict):
        grant["workspace"] = _deep_copy(workspace)

    variables = raw.get("variables")
    if isinstance(variables, dict):
        cleaned: dict[str, str] = {}
        for key, value in variables.items():
            label = str(key).strip()
            if not label:
                continue
            cleaned[label.upper()] = "" if value is None else str(value)
        grant["variables"] = cleaned
    return grant


def _normalize_grants(
    cfg: dict[str, Any] | None, default_owner: str = ""
) -> list[dict[str, Any]]:
    """Return every valid grant in ``cfg`` in canonical form.

    Grants that fail normalization are skipped. When several grants share
    the same non-empty id, only the first one is kept.
    """
    accepted: list[dict[str, Any]] = []
    used_ids: set[str] = set()
    for entry in _extract_grants(cfg):
        grant = _normalize_grant(entry, default_owner)
        if not grant:
            continue
        ident = str(grant.get("id") or "").strip()
        if ident:
            if ident in used_ids:
                continue
            used_ids.add(ident)
        accepted.append(grant)
    return accepted


def _coerce_current_workspace(
    config: dict[str, Any], requested: Any | None = None
) -> None:
    """Make ``config["currentWorkspace"]`` name an existing workspace.

    Mutates ``config`` in place. If a ``workspaces`` key is present it is
    replaced by its normalized form. Preference order for the current
    workspace: ``requested``, then the value already stored, then the first
    workspace in the map. With no workspaces at all the key is removed.
    """
    workspaces = _normalize_workspace_map(config.get("workspaces"))
    # Without the key the normalized map is always empty, so key presence
    # alone decides whether to write it back.
    if "workspaces" in config:
        config["workspaces"] = workspaces

    for candidate in (requested, config.get("currentWorkspace")):
        name = _normalize_workspace_name(candidate)
        if name and name in workspaces:
            config["currentWorkspace"] = name
            return

    fallback = next(iter(workspaces), None)
    if fallback is None:
        config.pop("currentWorkspace", None)
    else:
        config["currentWorkspace"] = fallback


def _sanitize_config(
    cfg: dict[str, Any] | None, *, coerce_workspace: bool = True
) -> dict[str, Any]:
    """Return a cleaned deep copy of a stored or incoming config.

    Client-only keys, ``sharedWorkspaceGrants`` and ``workspaceTarget`` are
    dropped. ``workspaces`` (when present) is normalized. ``headerClocks``
    is folded into ``header_clocks``, with the snake_case value winning if
    it is a dict. When ``coerce_workspace`` is true ``currentWorkspace`` is
    forced to a valid name. If ``cfg`` carries a grants key in either
    spelling, the normalized grants are stored under
    ``shared_workspace_grants``. A missing, empty or non-dict ``cfg`` gives
    ``{}``.
    """
    if not cfg or not isinstance(cfg, dict):
        return {}

    dropped = CLIENT_ONLY_CONFIG_KEYS | {
        "sharedWorkspaceGrants",
        "workspaceTarget",
    }
    cleaned: dict[str, Any] = {}
    for key, value in cfg.items():
        if key in dropped:
            continue
        cleaned[key] = _deep_copy(value)

    if "workspaces" in cleaned:
        cleaned["workspaces"] = _normalize_workspace_map(
            cleaned.get("workspaces")
        )

    # The camelCase spelling is always removed; it only supplies the value
    # when the snake_case one is not a dict.
    camel_clocks = cleaned.pop("headerClocks", None)
    snake_clocks = cleaned.get("header_clocks")
    clocks = snake_clocks if isinstance(snake_clocks, dict) else camel_clocks
    if isinstance(clocks, dict):
        cleaned["header_clocks"] = _deep_copy(clocks)

    if coerce_workspace:
        _coerce_current_workspace(cleaned)
    if _has_grants_key(cfg):
        cleaned["shared_workspace_grants"] = _normalize_grants(cfg)
    return cleaned


def _current_timestamp() -> str:
    return datetime.now(timezone.utc).isoformat()


def _trim_history(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    if len(entries) <= MAX_WORKSPACE_VERSION_HISTORY:
        return entries
    return entries[-MAX_WORKSPACE_VERSION_HISTORY:]


def _next_workspace_version(history: list[dict[str, Any]]) -> int:
    if not history:
        return 1
    return max(int(item.get("version") or 0) for item in history) + 1


def _latest_workspace_version(
    config: dict[str, Any], workspace_name: str | None
) -> int | None:
    if not workspace_name:
        return None
    raw_versions = config.get("workspace_versions")
    if not isinstance(raw_versions, dict):
        return None
    history = raw_versions.get(workspace_name)
    if not isinstance(history, list):
        return None
    latest = 0
    for item in history:
        if not isinstance(item, dict):
            continue
        latest = max(latest, int(item.get("version") or 0))
    return latest or None


def _normalize_workspace_name(value: Any) -> str:
    return str(value or "").strip()


def _normalize_workspace_map(
    raw_workspaces: Any,
) -> dict[str, dict[str, Any]]:
    """Return a clean ``name -> workspace`` mapping.

    Non-dict input gives ``{}``. Entries whose normalized name is empty or
    whose value is not a dict are dropped; kept workspaces are deep-copied.
    """
    if not isinstance(raw_workspaces, dict):
        return {}
    result: dict[str, dict[str, Any]] = {}
    for raw_name, payload in raw_workspaces.items():
        name = _normalize_workspace_name(raw_name)
        if name and isinstance(payload, dict):
            result[name] = _deep_copy(payload)
    return result


def _workspace_names(config: dict[str, Any]) -> list[str]:
    """List the names of the valid workspaces in ``config``, in map order."""
    workspaces = _normalize_workspace_map(config.get("workspaces"))
    return list(workspaces)


def _workspace_mutation_result(
    user_id: str, config: dict[str, Any]
) -> dict[str, Any]:
    """Build the summary payload returned after a workspace mutation.

    The payload holds the user id, the current workspace name, the list of
    workspace names and their count.
    """
    workspace_names = _workspace_names(config)
    summary: dict[str, Any] = {
        "userId": user_id,
        "currentWorkspace": config.get("currentWorkspace"),
    }
    summary["workspaceNames"] = workspace_names
    summary["workspaceCount"] = len(workspace_names)
    return summary


def _target_workspace_name(
    config: dict[str, Any], patch: dict[str, Any]
) -> str | None:
    target = patch.get("workspaceTarget")
    if isinstance(target, str) and target.strip():
        return target.strip()
    current = patch.get("currentWorkspace")
    if isinstance(current, str) and current.strip():
        return current.strip()
    existing = config.get("currentWorkspace")
    if isinstance(existing, str) and existing.strip():
        return existing.strip()
    raw_versions = patch.get("workspace_versions")
    if isinstance(raw_versions, dict):
        candidates = [
            _normalize_workspace_name(name)
            for name in raw_versions.keys()
            if _normalize_workspace_name(name)
        ]
        deduped = list(dict.fromkeys(candidates))
        if len(deduped) == 1:
            return deduped[0]
    return None


def _apply_workspace_patch(
    config: dict[str, Any], patch: dict[str, Any]
) -> None:
    """Apply the workspace-related keys of ``patch`` to ``config`` in place.

    Two things can happen, in this order:

    1. If ``patch["workspaces"]`` is a dict and the patch has no non-blank
       ``workspaceTarget``, the whole workspace map is replaced by the
       normalized patch map.
    2. If the patch carries any workspace content key, those keys are
       written into the target workspace chosen by
       ``_target_workspace_name``. ``widget_configs``, ``collapsedMap`` and
       ``collapsedHeights`` are deep-merged when the patch value is a dict;
       every other key is replaced with a deep copy.

    ``config["workspaces"]`` is always rewritten with the normalized result,
    even when nothing matched.
    """
    workspaces = _normalize_workspace_map(config.get("workspaces"))

    # A non-blank string "workspaceTarget" marks the patch as aimed at one
    # specific workspace; such a patch never replaces the whole map.
    shared_targeted_patch = bool(
        isinstance(patch.get("workspaceTarget"), str)
        and patch.get("workspaceTarget").strip()
    )

    if (
        "workspaces" in patch
        and isinstance(patch.get("workspaces"), dict)
        and not shared_targeted_patch
    ):
        # Full workspace-map replacement. Null/invalid entries are tombstones.
        workspaces = _normalize_workspace_map(patch["workspaces"])

    if any(key in patch for key in WORKSPACE_CONTENT_PATCH_KEYS):
        target = _target_workspace_name(config, patch)
        if target and not isinstance(workspaces.get(target), dict):
            # Unknown target: create it only when there are no workspaces at
            # all; otherwise the content keys are dropped rather than
            # creating a new workspace next to the existing ones.
            if len(workspaces) == 0:
                workspaces[target] = {}
            else:
                target = None
        if target and isinstance(workspaces.get(target), dict):
            # Work on a copy and swap it in once every key is applied.
            workspace = _deep_copy(workspaces[target])
            for key in WORKSPACE_CONTENT_PATCH_KEYS:
                if key not in patch:
                    continue
                value = patch[key]
                if key == "widget_configs" and isinstance(value, dict):
                    # Merge so widgets absent from the patch keep their
                    # stored configuration.
                    current = workspace.get("widget_configs")
                    if not isinstance(current, dict):
                        current = {}
                    workspace["widget_configs"] = _deep_merge(current, value)
                elif key in {
                    "collapsedMap",
                    "collapsedHeights",
                } and isinstance(value, dict):
                    # Same merge semantics for the collapse-state maps.
                    current = workspace.get(key)
                    if not isinstance(current, dict):
                        current = {}
                    workspace[key] = _deep_merge(current, value)
                else:
                    # Everything else (including non-dict values for the
                    # merged keys above) replaces the stored value.
                    workspace[key] = _deep_copy(value)
            workspaces[target] = workspace

    config["workspaces"] = workspaces


def _append_workspace_versions(
    before: dict[str, Any], after: dict[str, Any], patch: dict[str, Any]
) -> None:
    """Record a history entry for every workspace changed by ``patch``.

    Compares the workspaces of ``before`` and ``after`` and writes the
    updated history to ``after["workspace_versions"]`` in place. Nothing is
    done when the patch supplies ``workspace_versions`` itself or contains
    no workspace patch key.
    """
    # The caller set the history explicitly; do not overwrite it.
    if "workspace_versions" in patch:
        return
    if not any(key in patch for key in WORKSPACE_PATCH_KEYS):
        return

    before_workspaces = _normalize_workspace_map(before.get("workspaces"))
    after_workspaces = _normalize_workspace_map(after.get("workspaces"))

    # Start from the history stored before the update.
    raw_versions = before.get("workspace_versions")
    history_by_workspace = (
        _deep_copy(raw_versions) if isinstance(raw_versions, dict) else {}
    )
    # One timestamp shared by every entry created in this call.
    saved_at = _current_timestamp()

    for workspace_name, next_workspace in after_workspaces.items():
        previous_workspace = before_workspaces.get(workspace_name)
        if _deep_equal(previous_workspace, next_workspace):
            continue

        history = history_by_workspace.get(workspace_name)
        if not isinstance(history, list):
            history = []
        latest_workspace = (
            history[-1].get("workspace")
            if history and isinstance(history[-1], dict)
            else None
        )
        # Skip when the newest snapshot already equals the new content.
        if _deep_equal(latest_workspace, next_workspace):
            continue

        history.append(
            {
                "version": _next_workspace_version(history),
                "savedAt": saved_at,
                "workspace": _deep_copy(next_workspace),
            }
        )
        history_by_workspace[workspace_name] = _trim_history(history)

    # Drop history for workspaces that no longer exist after full-map updates.
    for workspace_name in list(history_by_workspace.keys()):
        if workspace_name not in after_workspaces:
            history_by_workspace.pop(workspace_name, None)

    after["workspace_versions"] = history_by_workspace


def _principal_matches(
    grant: dict[str, Any],
    username: str,
    role: str | None,
    groups: list[str] | None,
) -> bool:
    """Tell whether a principal is covered by ``grant``.

    A grant with no roles, groups or users matches everyone. Otherwise the
    principal matches when the grant lists the ``"*"`` role or the
    principal's role, any of the principal's groups, or the username. All
    comparisons are case-insensitive.
    """
    role_rules = _normalize_string_list(grant.get("roles"))
    group_rules = _normalize_string_list(grant.get("groups"))
    user_rules = _normalize_string_list(grant.get("users"))

    if not (role_rules or group_rules or user_rules):
        return True

    role_key = str(role or "").strip().lower()
    member_of = {name.lower() for name in groups or []}
    user_key = str(username or "").strip().lower()

    return (
        any(rule.lower() in {"*", role_key} for rule in role_rules)
        or any(rule.lower() in member_of for rule in group_rules)
        or any(rule.lower() == user_key for rule in user_rules)
    )


async def can_edit_shared_workspace(
    owner_id: str,
    username: str,
    workspace_name: str,
    role: str | None = None,
    groups: list[str] | None = None,
) -> bool:
    """Tell whether a principal may edit a workspace shared by ``owner_id``.

    Args:
        owner_id: User id of the workspace owner whose grants are consulted.
        username: Name of the principal asking for edit access.
        workspace_name: Name of the shared workspace; surrounding
            whitespace is ignored, as in the other workspace functions.
        role: Role of the principal, if any.
        groups: Groups the principal belongs to, if any.

    Returns:
        ``True`` when the owner has a grant for that workspace with
        ``"edit"`` access that matches the principal, else ``False``.
    """
    # Grant names are stored trimmed, so compare against the trimmed name.
    # A blank name can never match a grant, so skip the database read.
    target = _normalize_workspace_name(workspace_name)
    if not target:
        return False

    doc = await _collection.find_one({"user_id": owner_id})
    owner_cfg = _sanitize_config(doc.get("config", {}) if doc else {})
    for grant in _normalize_grants(owner_cfg, owner_id):
        if grant.get("name") != target:
            continue
        if grant.get("access") != "edit":
            continue
        if _principal_matches(grant, username, role, groups):
            return True
    return False
async def fetch_preferences(user_id: str) -> dict[str, Any]:
    """Return the user's sanitized config plus the shared-workspace library.

    The library is stored under ``shared_workspace_library``. It holds one
    entry per grant found in other users' documents whose workspace still
    exists: the grant fields, the owner id, a copy of the workspace, its
    latest version (when recorded) and variables (the owner's top-level
    variables if non-empty, else the grant's own).
    """
    doc = await _collection.find_one({"user_id": user_id})
    config: dict[str, Any] = _sanitize_config(
        doc.get("config", {}) if doc else {}
    )

    # Some unit tests monkeypatch a FakeColl without .find; just return stored
    # config.
    if not hasattr(_collection, "find"):
        return config

    library: list[dict[str, Any]] = []
    seen_ids: set[str] = set()
    has_grants = {"$exists": True, "$ne": []}
    cursor = _collection.find(
        {
            "$or": [
                {"config.shared_workspace_grants": has_grants},
                {"config.sharedWorkspaceGrants": has_grants},
            ]
        }
    )
    # NOTE(review): grants are not filtered by role/group/user here —
    # presumably the caller or client restricts visibility; confirm.
    async for other in cursor:
        owner_id = other.get("user_id")
        if owner_id == user_id:
            continue
        owner_cfg = _sanitize_config(other.get("config", {}) or {})

        workspaces = owner_cfg.get("workspaces")
        if not isinstance(workspaces, dict):
            workspaces = {}
        owner_variables = owner_cfg.get("variables")
        if not isinstance(owner_variables, dict):
            owner_variables = {}

        for grant in _normalize_grants(owner_cfg, str(owner_id or "")):
            workspace_name = grant.get("name")
            workspace = None
            if isinstance(workspace_name, str):
                workspace = workspaces.get(workspace_name)
            # Skip grants whose workspace no longer exists.
            if not isinstance(workspace, dict):
                continue

            grant_id = str(grant.get("id") or "").strip()
            if grant_id:
                if grant_id in seen_ids:
                    continue
                seen_ids.add(grant_id)

            entry = {
                key: value
                for key, value in grant.items()
                if key not in {"workspace", "variables"}
            }
            entry["owner"] = str(owner_id or grant.get("owner") or "")
            entry["workspace"] = _deep_copy(workspace)

            workspace_version = _latest_workspace_version(
                owner_cfg, workspace_name
            )
            if workspace_version is not None:
                entry["workspace_version"] = workspace_version

            if owner_variables:
                entry["variables"] = _deep_copy(owner_variables)
            elif isinstance(grant.get("variables"), dict):
                entry["variables"] = _deep_copy(grant["variables"])
            library.append(entry)

    config["shared_workspace_library"] = library
    return config
async def fetch_workspace_names(user_id: str) -> list[str]:
    """Return the names of the user's workspaces (empty if none stored)."""
    doc = await _collection.find_one({"user_id": user_id})
    stored = doc.get("config", {}) if doc else {}
    config: dict[str, Any] = _sanitize_config(stored)
    return _workspace_names(config)
async def fetch_workspace_snapshot(
    user_id: str, workspace_name: str
) -> dict[str, Any] | None:
    """Return one workspace of the user with its latest version number.

    The result has the keys ``name``, ``version`` (``None`` when no history
    is recorded) and ``workspace`` (a deep copy). Returns ``None`` when the
    name is blank or the workspace does not exist.
    """
    target = _normalize_workspace_name(workspace_name)
    if not target:
        return None

    doc = await _collection.find_one({"user_id": user_id})
    stored = doc.get("config", {}) if doc else {}
    config: dict[str, Any] = _sanitize_config(stored)

    workspaces = config.get("workspaces")
    workspace = workspaces.get(target) if isinstance(workspaces, dict) else None
    if not isinstance(workspace, dict):
        return None

    return {
        "name": target,
        "version": _latest_workspace_version(config, target),
        "workspace": _deep_copy(workspace),
    }
async def fetch_preferences_summary(user_id: str) -> dict[str, Any]:
    """Return the user's id, current workspace, workspace names and count."""
    doc = await _collection.find_one({"user_id": user_id})
    stored = doc.get("config", {}) if doc else {}
    config: dict[str, Any] = _sanitize_config(stored)
    return _workspace_mutation_result(user_id, config)
async def set_current_workspace(
    user_id: str, workspace_name: str
) -> dict[str, Any]:
    """Make ``workspace_name`` the user's current workspace and persist it.

    Returns:
        The workspace summary after the change.

    Raises:
        ValueError: If the name is blank or no such workspace exists.
    """
    target = _normalize_workspace_name(workspace_name)
    if not target:
        raise ValueError("Workspace name cannot be empty")

    stored = await _collection.find_one({"user_id": user_id})
    current = _sanitize_config(stored.get("config", {}) if stored else {})

    known = current.get("workspaces")
    if not (isinstance(known, dict) and target in known):
        raise ValueError(f"Workspace '{target}' does not exist")

    updated = _deep_copy(current)
    updated["currentWorkspace"] = target
    await _collection.update_one(
        {"user_id": user_id}, {"$set": {"config": updated}}, upsert=True
    )
    return _workspace_mutation_result(user_id, updated)
async def save_workspace(
    user_id: str,
    workspace_name: str,
    workspace: dict[str, Any],
    make_current: bool = False,
) -> dict[str, Any]:
    """Create or overwrite one workspace and persist the config.

    The workspace becomes current when ``make_current`` is true or when no
    current workspace is set. A history entry is appended when the content
    differs both from the stored workspace and from the newest snapshot.

    Returns:
        A dict with ``name``, ``version`` (latest recorded version or
        ``None``) and ``workspace`` (a deep copy of what was saved).

    Raises:
        ValueError: If the name is blank or ``workspace`` is not a dict.
    """
    target = _normalize_workspace_name(workspace_name)
    if not target:
        raise ValueError("Workspace name cannot be empty")
    if not isinstance(workspace, dict):
        raise ValueError("Workspace payload must be an object")

    stored = await _collection.find_one({"user_id": user_id})
    current = _sanitize_config(stored.get("config", {}) if stored else {})
    current_workspaces = current.get("workspaces")
    if not isinstance(current_workspaces, dict):
        current_workspaces = {}

    merged = _deep_copy(current)
    merged_workspaces = _deep_copy(current_workspaces)
    merged_workspaces[target] = _deep_copy(workspace)
    merged["workspaces"] = merged_workspaces

    has_current = bool(
        _normalize_workspace_name(merged.get("currentWorkspace"))
    )
    if make_current or not has_current:
        merged["currentWorkspace"] = target

    raw_versions = merged.get("workspace_versions")
    versions = (
        _deep_copy(raw_versions) if isinstance(raw_versions, dict) else {}
    )
    previous_workspace = current_workspaces.get(target)
    next_workspace = merged_workspaces.get(target)
    if not _deep_equal(previous_workspace, next_workspace):
        history = versions.get(target)
        if not isinstance(history, list):
            history = []
        newest = history[-1] if history else None
        latest_workspace = (
            newest.get("workspace") if isinstance(newest, dict) else None
        )
        # Avoid a duplicate snapshot when the newest one already matches.
        if not _deep_equal(latest_workspace, next_workspace):
            history.append(
                {
                    "version": _next_workspace_version(history),
                    "savedAt": _current_timestamp(),
                    "workspace": _deep_copy(next_workspace),
                }
            )
            versions[target] = _trim_history(history)
    merged["workspace_versions"] = versions

    await _collection.update_one(
        {"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
    )
    return {
        "name": target,
        "version": _latest_workspace_version(merged, target),
        "workspace": _deep_copy(next_workspace),
    }
async def delete_workspace(
    user_id: str, workspace_name: str, next_workspace: str | None = None
) -> dict[str, Any]:
    """Delete one workspace with its history and grants, then persist.

    The new current workspace is ``next_workspace`` if it names a remaining
    workspace, else the existing current workspace if it remains, else the
    first remaining workspace.

    Returns:
        The workspace summary after the deletion.

    Raises:
        ValueError: If the name is blank, the workspace does not exist, or
            it is the last remaining workspace.
    """
    target = _normalize_workspace_name(workspace_name)
    if not target:
        raise ValueError("Workspace name cannot be empty")

    stored = await _collection.find_one({"user_id": user_id})
    current = _sanitize_config(stored.get("config", {}) if stored else {})
    workspaces = current.get("workspaces")
    if not (isinstance(workspaces, dict) and target in workspaces):
        raise ValueError(f"Workspace '{target}' does not exist")
    if len(workspaces) <= 1:
        raise ValueError("Cannot delete the last workspace")

    merged = _deep_copy(current)
    remaining = _deep_copy(workspaces)
    remaining.pop(target, None)
    merged["workspaces"] = remaining

    preferred_next = _normalize_workspace_name(next_workspace)
    if preferred_next and preferred_next in remaining:
        resolved_current = preferred_next
    else:
        current_workspace = _normalize_workspace_name(
            merged.get("currentWorkspace")
        )
        # The deleted workspace is no longer in `remaining`, so one
        # membership test covers both "was the target" and "is unknown".
        if current_workspace in remaining:
            resolved_current = current_workspace
        else:
            resolved_current = next(iter(remaining), "")
    merged["currentWorkspace"] = resolved_current

    raw_versions = merged.get("workspace_versions")
    if isinstance(raw_versions, dict) and target in raw_versions:
        versions = _deep_copy(raw_versions)
        versions.pop(target, None)
        merged["workspace_versions"] = versions

    raw_grants = merged.get("shared_workspace_grants")
    if isinstance(raw_grants, list):
        # Grants pointing at the deleted workspace go with it.
        merged["shared_workspace_grants"] = [
            grant
            for grant in _normalize_grants(merged, user_id)
            if grant.get("name") != target
        ]

    await _collection.update_one(
        {"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
    )
    return _workspace_mutation_result(user_id, merged)
async def rename_workspace(
    user_id: str, old_name: str, new_name: str
) -> dict[str, Any]:
    """Rename a workspace and everything keyed by its name, then persist.

    The current-workspace pointer, the version history and the grants that
    refer to the old name are updated. A grant id is regenerated only when
    it is empty or still has the derived ``"<owner>::<old name>"`` form.
    When both names are equal after trimming, nothing is written and the
    current state is returned.

    Returns:
        A dict with ``userId``, ``oldName``, ``newName``,
        ``currentWorkspace`` and ``workspaceNames``.

    Raises:
        ValueError: If either name is blank, the old workspace does not
            exist, or the new name is already taken.
    """
    old_normalized = _normalize_workspace_name(old_name)
    new_normalized = _normalize_workspace_name(new_name)
    if not old_normalized or not new_normalized:
        raise ValueError("Workspace names cannot be empty")

    # One read serves both the no-op path and the rename path.
    existing_doc = await _collection.find_one({"user_id": user_id})
    current = _sanitize_config(
        existing_doc.get("config", {}) if existing_doc else {}
    )

    if old_normalized == new_normalized:
        return {
            "userId": user_id,
            "oldName": old_normalized,
            "newName": new_normalized,
            "currentWorkspace": current.get("currentWorkspace"),
            "workspaceNames": _workspace_names(current),
        }

    workspaces = current.get("workspaces")
    if not isinstance(workspaces, dict):
        workspaces = {}
    if old_normalized not in workspaces:
        raise ValueError(f"Workspace '{old_normalized}' does not exist")
    if new_normalized in workspaces:
        raise ValueError(f"Workspace '{new_normalized}' already exists")

    merged = _deep_copy(current)
    merged_workspaces = _deep_copy(workspaces)
    renamed_workspace = merged_workspaces.pop(old_normalized)
    merged_workspaces[new_normalized] = renamed_workspace
    merged["workspaces"] = merged_workspaces

    current_workspace = _normalize_workspace_name(
        merged.get("currentWorkspace")
    )
    if current_workspace == old_normalized:
        merged["currentWorkspace"] = new_normalized

    raw_versions = merged.get("workspace_versions")
    if isinstance(raw_versions, dict) and old_normalized in raw_versions:
        versions = _deep_copy(raw_versions)
        versions[new_normalized] = versions.pop(old_normalized)
        merged["workspace_versions"] = versions

    raw_grants = merged.get("shared_workspace_grants")
    if isinstance(raw_grants, list):
        renamed_grants: list[dict[str, Any]] = []
        for grant in _normalize_grants(merged, user_id):
            if grant.get("name") == old_normalized:
                owner = (
                    _normalize_workspace_name(grant.get("owner")) or user_id
                )
                grant["name"] = new_normalized
                # Custom ids are left alone; only the derived form is
                # regenerated for the new name.
                legacy_id = f"{owner}::{old_normalized}"
                current_id = _normalize_workspace_name(grant.get("id"))
                if not current_id or current_id == legacy_id:
                    grant["id"] = f"{owner}::{new_normalized}"
            renamed_grants.append(grant)
        merged["shared_workspace_grants"] = renamed_grants

    await _collection.update_one(
        {"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
    )
    return {
        "userId": user_id,
        "oldName": old_normalized,
        "newName": new_normalized,
        "currentWorkspace": merged.get("currentWorkspace"),
        "workspaceNames": _workspace_names(merged),
    }
async def update_preferences(
    user_id: str, config: dict[str, Any]
) -> dict[str, Any]:
    """Apply a partial config update for ``user_id`` and persist the result.

    Only known keys are applied: replaced top-level keys are overwritten,
    merged top-level keys are deep-merged when the incoming value is a
    dict, grants are re-normalized with ``user_id`` as default owner, and
    workspace keys go through ``_apply_workspace_patch``. Afterwards the
    current workspace is coerced to a valid name and the version history
    is extended for changed workspaces.

    A ``workspaces`` value that is not a dict (for example ``null``) is
    ignored rather than treated as a request to replace the map.

    Args:
        user_id: Owner of the stored config.
        config: Incoming patch; a non-dict value is treated as empty.

    Returns:
        The full merged config that was written.
    """
    incoming = config if isinstance(config, dict) else {}
    patch = _sanitize_config(incoming, coerce_workspace=False)

    # _sanitize_config coerces a non-dict "workspaces" value to {}, which
    # _apply_workspace_patch would read as "replace the whole map with
    # nothing". Drop it so malformed input cannot wipe every workspace.
    if "workspaces" in patch and not isinstance(
        incoming.get("workspaces"), dict
    ):
        patch.pop("workspaces")

    # workspaceTarget is stripped by _sanitize_config; restore it so the
    # workspace patch can honour it.
    workspace_target = incoming.get("workspaceTarget")
    if isinstance(workspace_target, str) and workspace_target.strip():
        patch["workspaceTarget"] = workspace_target.strip()

    existing_doc = await _collection.find_one({"user_id": user_id})
    current = _sanitize_config(
        existing_doc.get("config", {}) if existing_doc else {}
    )
    merged = _deep_copy(current)

    for key in REPLACED_TOP_LEVEL_KEYS:
        if key in patch:
            merged[key] = _deep_copy(patch[key])

    for key in MERGED_TOP_LEVEL_KEYS:
        if key not in patch:
            continue
        value = patch.get(key)
        if isinstance(value, dict):
            existing = merged.get(key)
            if not isinstance(existing, dict):
                existing = {}
            merged[key] = _deep_merge(existing, value)
        else:
            merged[key] = _deep_copy(value)

    if _has_grants_key(incoming):
        merged["shared_workspace_grants"] = _normalize_grants(
            incoming, user_id
        )

    if any(key in patch for key in WORKSPACE_PATCH_KEYS):
        _apply_workspace_patch(merged, patch)

    requested_current_workspace = (
        patch.get("currentWorkspace") if "currentWorkspace" in patch else None
    )
    _coerce_current_workspace(merged, requested_current_workspace)
    _append_workspace_versions(current, merged, patch)

    await _collection.update_one(
        {"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
    )
    return merged