# preferences_store.py
from __future__ import annotations
import copy
import os
from datetime import datetime, timezone
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongo:27017")
MONGO_DB = os.getenv("MONGO_DB", "octopus")
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "preferences")
WORKSPACE_CONTENT_PATCH_KEYS = {
"layout",
"visibleMap",
"widget_configs",
"collapsedMap",
"collapsedHeights",
"panels",
"panelOrder",
"panelLayout",
}
WORKSPACE_PATCH_KEYS = WORKSPACE_CONTENT_PATCH_KEYS | {
"workspaces",
}
MUTABLE_SHARED_WORKSPACE_KEYS = WORKSPACE_CONTENT_PATCH_KEYS | {
"workspaceTarget",
}
CLIENT_ONLY_CONFIG_KEYS = {
"_initialized",
"_lastUserId",
"error",
"shared_workspace_library",
"sharedWorkspaceLibrary",
}
MERGED_TOP_LEVEL_KEYS = {"widget_refresh_rates", "widget_propagation"}
REPLACED_TOP_LEVEL_KEYS = {
"theme",
"spacing",
"spacingHorizontal",
"spacingVertical",
"edit",
"currentWorkspace",
"timeRange",
"header_clocks",
"remote_widgets",
"variables",
"workspace_versions",
}
MAX_WORKSPACE_VERSION_HISTORY = 1
_client = AsyncIOMotorClient(MONGO_URI)
_collection = _client[MONGO_DB][MONGO_COLLECTION]
def _deep_copy(value: Any) -> Any:
return copy.deepcopy(value)
def _deep_merge(base: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
out = _deep_copy(base)
for key, value in patch.items():
current = out.get(key)
if isinstance(current, dict) and isinstance(value, dict):
out[key] = _deep_merge(current, value)
else:
out[key] = _deep_copy(value)
return out
def _deep_equal(left: Any, right: Any) -> bool:
return left == right
def _normalize_string_list(value: Any) -> list[str]:
if not isinstance(value, list):
return []
out: list[str] = []
seen: set[str] = set()
for item in value:
text = str(item or "").strip()
if not text:
continue
key = text.lower()
if key in seen:
continue
seen.add(key)
out.append(text)
return out
def _normalize_access(value: Any) -> str:
normalized = str(value or "").strip().lower()
if normalized in {"edit", "editor", "write", "readwrite", "read-write"}:
return "edit"
return "view"
def _extract_grants(cfg: dict[str, Any] | None) -> list[dict[str, Any]]:
if not cfg or not isinstance(cfg, dict):
return []
grants = cfg.get("shared_workspace_grants")
if not isinstance(grants, list):
grants = cfg.get("sharedWorkspaceGrants")
if not isinstance(grants, list):
return []
return [g for g in grants if isinstance(g, dict)]
def _has_grants_key(cfg: dict[str, Any] | None) -> bool:
if not cfg or not isinstance(cfg, dict):
return False
return "shared_workspace_grants" in cfg or "sharedWorkspaceGrants" in cfg
def _normalize_grant(
raw: dict[str, Any], default_owner: str = ""
) -> dict[str, Any] | None:
if not isinstance(raw, dict):
return None
name = str(raw.get("name") or "").strip()
if not name:
return None
owner = str(raw.get("owner") or default_owner or "").strip()
roles = _normalize_string_list(raw.get("roles"))
legacy_role = str(raw.get("role") or "").strip()
if legacy_role and legacy_role.lower() not in {
role.lower() for role in roles
}:
roles.append(legacy_role)
grant = {
"id": str(raw.get("id") or f"{owner or 'shared'}::{name}").strip(),
"name": name,
"owner": owner,
"roles": roles,
"groups": _normalize_string_list(raw.get("groups")),
"users": _normalize_string_list(raw.get("users")),
"access": _normalize_access(raw.get("access")),
}
workspace = raw.get("workspace")
if isinstance(workspace, dict):
grant["workspace"] = _deep_copy(workspace)
variables = raw.get("variables")
if isinstance(variables, dict):
grant["variables"] = {
str(key).strip().upper(): "" if value is None else str(value)
for key, value in variables.items()
if str(key).strip()
}
return grant
def _normalize_grants(
cfg: dict[str, Any] | None, default_owner: str = ""
) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
seen: set[str] = set()
for raw in _extract_grants(cfg):
grant = _normalize_grant(raw, default_owner)
if not grant:
continue
grant_id = str(grant.get("id") or "").strip()
if grant_id and grant_id in seen:
continue
if grant_id:
seen.add(grant_id)
out.append(grant)
return out
def _coerce_current_workspace(
config: dict[str, Any], requested: Any | None = None
) -> None:
has_workspaces_key = "workspaces" in config
workspaces = _normalize_workspace_map(config.get("workspaces"))
if has_workspaces_key or workspaces:
config["workspaces"] = workspaces
workspace_names = list(workspaces.keys())
requested_name = _normalize_workspace_name(requested)
if requested_name and requested_name in workspaces:
config["currentWorkspace"] = requested_name
return
current = _normalize_workspace_name(config.get("currentWorkspace"))
if current and current in workspaces:
config["currentWorkspace"] = current
return
if workspace_names:
config["currentWorkspace"] = workspace_names[0]
else:
config.pop("currentWorkspace", None)
def _sanitize_config(
cfg: dict[str, Any] | None, *, coerce_workspace: bool = True
) -> dict[str, Any]:
if not cfg or not isinstance(cfg, dict):
return {}
cleaned = {
key: _deep_copy(value)
for key, value in cfg.items()
if key not in CLIENT_ONLY_CONFIG_KEYS
and key not in {"sharedWorkspaceGrants", "workspaceTarget"}
}
if "workspaces" in cleaned:
cleaned["workspaces"] = _normalize_workspace_map(
cleaned.get("workspaces")
)
raw_header_clocks = cleaned.pop("headerClocks", None)
if isinstance(cleaned.get("header_clocks"), dict):
cleaned["header_clocks"] = _deep_copy(cleaned["header_clocks"])
elif isinstance(raw_header_clocks, dict):
cleaned["header_clocks"] = _deep_copy(raw_header_clocks)
if coerce_workspace:
_coerce_current_workspace(cleaned)
if _has_grants_key(cfg):
cleaned["shared_workspace_grants"] = _normalize_grants(cfg)
return cleaned
def _current_timestamp() -> str:
return datetime.now(timezone.utc).isoformat()
def _trim_history(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
if len(entries) <= MAX_WORKSPACE_VERSION_HISTORY:
return entries
return entries[-MAX_WORKSPACE_VERSION_HISTORY:]
def _next_workspace_version(history: list[dict[str, Any]]) -> int:
if not history:
return 1
return max(int(item.get("version") or 0) for item in history) + 1
def _latest_workspace_version(
config: dict[str, Any], workspace_name: str | None
) -> int | None:
if not workspace_name:
return None
raw_versions = config.get("workspace_versions")
if not isinstance(raw_versions, dict):
return None
history = raw_versions.get(workspace_name)
if not isinstance(history, list):
return None
latest = 0
for item in history:
if not isinstance(item, dict):
continue
latest = max(latest, int(item.get("version") or 0))
return latest or None
def _normalize_workspace_name(value: Any) -> str:
return str(value or "").strip()
def _normalize_workspace_map(
raw_workspaces: Any,
) -> dict[str, dict[str, Any]]:
if not isinstance(raw_workspaces, dict):
return {}
normalized: dict[str, dict[str, Any]] = {}
for raw_name, workspace in raw_workspaces.items():
name = _normalize_workspace_name(raw_name)
if not name or not isinstance(workspace, dict):
continue
normalized[name] = _deep_copy(workspace)
return normalized
def _workspace_names(config: dict[str, Any]) -> list[str]:
return list(_normalize_workspace_map(config.get("workspaces")).keys())
def _workspace_mutation_result(
user_id: str, config: dict[str, Any]
) -> dict[str, Any]:
names = _workspace_names(config)
return {
"userId": user_id,
"currentWorkspace": config.get("currentWorkspace"),
"workspaceNames": names,
"workspaceCount": len(names),
}
def _target_workspace_name(
config: dict[str, Any], patch: dict[str, Any]
) -> str | None:
target = patch.get("workspaceTarget")
if isinstance(target, str) and target.strip():
return target.strip()
current = patch.get("currentWorkspace")
if isinstance(current, str) and current.strip():
return current.strip()
existing = config.get("currentWorkspace")
if isinstance(existing, str) and existing.strip():
return existing.strip()
raw_versions = patch.get("workspace_versions")
if isinstance(raw_versions, dict):
candidates = [
_normalize_workspace_name(name)
for name in raw_versions.keys()
if _normalize_workspace_name(name)
]
deduped = list(dict.fromkeys(candidates))
if len(deduped) == 1:
return deduped[0]
return None
def _apply_workspace_patch(
config: dict[str, Any], patch: dict[str, Any]
) -> None:
workspaces = _normalize_workspace_map(config.get("workspaces"))
shared_targeted_patch = bool(
isinstance(patch.get("workspaceTarget"), str)
and patch.get("workspaceTarget").strip()
)
if (
"workspaces" in patch
and isinstance(patch.get("workspaces"), dict)
and not shared_targeted_patch
):
# Full workspace-map replacement. Null/invalid entries are tombstones.
workspaces = _normalize_workspace_map(patch["workspaces"])
if any(key in patch for key in WORKSPACE_CONTENT_PATCH_KEYS):
target = _target_workspace_name(config, patch)
if target and not isinstance(workspaces.get(target), dict):
if len(workspaces) == 0:
workspaces[target] = {}
else:
target = None
if target and isinstance(workspaces.get(target), dict):
workspace = _deep_copy(workspaces[target])
for key in WORKSPACE_CONTENT_PATCH_KEYS:
if key not in patch:
continue
value = patch[key]
if key == "widget_configs" and isinstance(value, dict):
current = workspace.get("widget_configs")
if not isinstance(current, dict):
current = {}
workspace["widget_configs"] = _deep_merge(current, value)
elif key in {
"collapsedMap",
"collapsedHeights",
} and isinstance(value, dict):
current = workspace.get(key)
if not isinstance(current, dict):
current = {}
workspace[key] = _deep_merge(current, value)
else:
workspace[key] = _deep_copy(value)
workspaces[target] = workspace
config["workspaces"] = workspaces
def _append_workspace_versions(
before: dict[str, Any], after: dict[str, Any], patch: dict[str, Any]
) -> None:
if "workspace_versions" in patch:
return
if not any(key in patch for key in WORKSPACE_PATCH_KEYS):
return
before_workspaces = _normalize_workspace_map(before.get("workspaces"))
after_workspaces = _normalize_workspace_map(after.get("workspaces"))
raw_versions = before.get("workspace_versions")
history_by_workspace = (
_deep_copy(raw_versions) if isinstance(raw_versions, dict) else {}
)
saved_at = _current_timestamp()
for workspace_name, next_workspace in after_workspaces.items():
previous_workspace = before_workspaces.get(workspace_name)
if _deep_equal(previous_workspace, next_workspace):
continue
history = history_by_workspace.get(workspace_name)
if not isinstance(history, list):
history = []
latest_workspace = (
history[-1].get("workspace")
if history and isinstance(history[-1], dict)
else None
)
if _deep_equal(latest_workspace, next_workspace):
continue
history.append(
{
"version": _next_workspace_version(history),
"savedAt": saved_at,
"workspace": _deep_copy(next_workspace),
}
)
history_by_workspace[workspace_name] = _trim_history(history)
# Drop history for workspaces that no longer exist after full-map updates.
for workspace_name in list(history_by_workspace.keys()):
if workspace_name not in after_workspaces:
history_by_workspace.pop(workspace_name, None)
after["workspace_versions"] = history_by_workspace
def _principal_matches(
grant: dict[str, Any],
username: str,
role: str | None,
groups: list[str] | None,
) -> bool:
grant_roles = _normalize_string_list(grant.get("roles"))
grant_groups = _normalize_string_list(grant.get("groups"))
grant_users = _normalize_string_list(grant.get("users"))
if not grant_roles and not grant_groups and not grant_users:
return True
lower_role = str(role or "").strip().lower()
lower_groups = {group.lower() for group in groups or []}
lower_username = str(username or "").strip().lower()
if any(r.lower() in {"*", lower_role} for r in grant_roles):
return True
if any(group.lower() in lower_groups for group in grant_groups):
return True
if any(user.lower() == lower_username for user in grant_users):
return True
return False
[docs]
async def can_edit_shared_workspace(
owner_id: str,
username: str,
workspace_name: str,
role: str | None = None,
groups: list[str] | None = None,
) -> bool:
doc = await _collection.find_one({"user_id": owner_id})
owner_cfg = _sanitize_config(doc.get("config", {}) if doc else {})
for grant in _normalize_grants(owner_cfg, owner_id):
if grant.get("name") != workspace_name:
continue
if grant.get("access") != "edit":
continue
if _principal_matches(grant, username, role, groups):
return True
return False
[docs]
async def fetch_preferences(user_id: str) -> dict[str, Any]:
doc = await _collection.find_one({"user_id": user_id})
config: dict[str, Any] = _sanitize_config(
doc.get("config", {}) if doc else {}
)
# Some unit tests monkeypatch a FakeColl without .find; just return stored
# config.
if not hasattr(_collection, "find"):
return config
shared: list[dict[str, Any]] = []
seen_ids: set[str] = set()
cursor = _collection.find(
{
"$or": [
{
"config.shared_workspace_grants": {
"$exists": True,
"$ne": [],
}
},
{"config.sharedWorkspaceGrants": {"$exists": True, "$ne": []}},
]
}
)
async for other in cursor:
owner_id = other.get("user_id")
if owner_id == user_id:
continue
owner_cfg = _sanitize_config(other.get("config", {}) or {})
workspaces = owner_cfg.get("workspaces")
if not isinstance(workspaces, dict):
workspaces = {}
owner_variables = owner_cfg.get("variables")
if not isinstance(owner_variables, dict):
owner_variables = {}
for grant in _normalize_grants(owner_cfg, str(owner_id or "")):
workspace_name = grant.get("name")
workspace = (
workspaces.get(workspace_name)
if isinstance(workspace_name, str)
else None
)
if not isinstance(workspace, dict):
continue
grant_id = str(grant.get("id") or "").strip()
if grant_id and grant_id in seen_ids:
continue
if grant_id:
seen_ids.add(grant_id)
merged = {
key: value
for key, value in grant.items()
if key not in {"workspace", "variables"}
}
merged["owner"] = str(owner_id or grant.get("owner") or "")
merged["workspace"] = _deep_copy(workspace)
workspace_version = _latest_workspace_version(
owner_cfg, workspace_name
)
if workspace_version is not None:
merged["workspace_version"] = workspace_version
if owner_variables:
merged["variables"] = _deep_copy(owner_variables)
elif isinstance(grant.get("variables"), dict):
merged["variables"] = _deep_copy(grant["variables"])
shared.append(merged)
config["shared_workspace_library"] = shared
return config
[docs]
async def fetch_workspace_names(user_id: str) -> list[str]:
doc = await _collection.find_one({"user_id": user_id})
config: dict[str, Any] = _sanitize_config(
doc.get("config", {}) if doc else {}
)
return _workspace_names(config)
[docs]
async def fetch_workspace_snapshot(
user_id: str, workspace_name: str
) -> dict[str, Any] | None:
target = _normalize_workspace_name(workspace_name)
if not target:
return None
doc = await _collection.find_one({"user_id": user_id})
config: dict[str, Any] = _sanitize_config(
doc.get("config", {}) if doc else {}
)
workspaces = config.get("workspaces")
if not isinstance(workspaces, dict):
return None
workspace = workspaces.get(target)
if not isinstance(workspace, dict):
return None
return {
"name": target,
"version": _latest_workspace_version(config, target),
"workspace": _deep_copy(workspace),
}
[docs]
async def fetch_preferences_summary(user_id: str) -> dict[str, Any]:
doc = await _collection.find_one({"user_id": user_id})
config: dict[str, Any] = _sanitize_config(
doc.get("config", {}) if doc else {}
)
return _workspace_mutation_result(user_id, config)
[docs]
async def set_current_workspace(
user_id: str, workspace_name: str
) -> dict[str, Any]:
target = _normalize_workspace_name(workspace_name)
if not target:
raise ValueError("Workspace name cannot be empty")
existing_doc = await _collection.find_one({"user_id": user_id})
current = _sanitize_config(
existing_doc.get("config", {}) if existing_doc else {}
)
workspaces = current.get("workspaces")
if not isinstance(workspaces, dict) or target not in workspaces:
raise ValueError(f"Workspace '{target}' does not exist")
merged = _deep_copy(current)
merged["currentWorkspace"] = target
await _collection.update_one(
{"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
)
return _workspace_mutation_result(user_id, merged)
[docs]
async def save_workspace(
user_id: str,
workspace_name: str,
workspace: dict[str, Any],
make_current: bool = False,
) -> dict[str, Any]:
target = _normalize_workspace_name(workspace_name)
if not target:
raise ValueError("Workspace name cannot be empty")
if not isinstance(workspace, dict):
raise ValueError("Workspace payload must be an object")
existing_doc = await _collection.find_one({"user_id": user_id})
current = _sanitize_config(
existing_doc.get("config", {}) if existing_doc else {}
)
current_workspaces = current.get("workspaces")
if not isinstance(current_workspaces, dict):
current_workspaces = {}
merged = _deep_copy(current)
merged_workspaces = _deep_copy(current_workspaces)
merged_workspaces[target] = _deep_copy(workspace)
merged["workspaces"] = merged_workspaces
if make_current or not _normalize_workspace_name(
merged.get("currentWorkspace")
):
merged["currentWorkspace"] = target
raw_versions = merged.get("workspace_versions")
versions = (
_deep_copy(raw_versions) if isinstance(raw_versions, dict) else {}
)
previous_workspace = current_workspaces.get(target)
next_workspace = merged_workspaces.get(target)
if not _deep_equal(previous_workspace, next_workspace):
history = versions.get(target)
if not isinstance(history, list):
history = []
latest_workspace = (
history[-1].get("workspace")
if history and isinstance(history[-1], dict)
else None
)
if not _deep_equal(latest_workspace, next_workspace):
history.append(
{
"version": _next_workspace_version(history),
"savedAt": _current_timestamp(),
"workspace": _deep_copy(next_workspace),
}
)
versions[target] = _trim_history(history)
merged["workspace_versions"] = versions
await _collection.update_one(
{"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
)
return {
"name": target,
"version": _latest_workspace_version(merged, target),
"workspace": _deep_copy(next_workspace),
}
[docs]
async def delete_workspace(
user_id: str, workspace_name: str, next_workspace: str | None = None
) -> dict[str, Any]:
target = _normalize_workspace_name(workspace_name)
if not target:
raise ValueError("Workspace name cannot be empty")
existing_doc = await _collection.find_one({"user_id": user_id})
current = _sanitize_config(
existing_doc.get("config", {}) if existing_doc else {}
)
workspaces = current.get("workspaces")
if not isinstance(workspaces, dict) or target not in workspaces:
raise ValueError(f"Workspace '{target}' does not exist")
if len(workspaces) <= 1:
raise ValueError("Cannot delete the last workspace")
merged = _deep_copy(current)
merged_workspaces = _deep_copy(workspaces)
merged_workspaces.pop(target, None)
merged["workspaces"] = merged_workspaces
preferred_next = _normalize_workspace_name(next_workspace)
if preferred_next and preferred_next in merged_workspaces:
resolved_current = preferred_next
else:
current_workspace = _normalize_workspace_name(
merged.get("currentWorkspace")
)
if (
current_workspace == target
or current_workspace not in merged_workspaces
):
resolved_current = next(iter(merged_workspaces.keys()), "")
else:
resolved_current = current_workspace
merged["currentWorkspace"] = resolved_current
raw_versions = merged.get("workspace_versions")
if isinstance(raw_versions, dict) and target in raw_versions:
versions = _deep_copy(raw_versions)
versions.pop(target, None)
merged["workspace_versions"] = versions
raw_grants = merged.get("shared_workspace_grants")
if isinstance(raw_grants, list):
merged["shared_workspace_grants"] = [
grant
for grant in _normalize_grants(merged, user_id)
if grant.get("name") != target
]
await _collection.update_one(
{"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
)
return _workspace_mutation_result(user_id, merged)
[docs]
async def rename_workspace(
user_id: str, old_name: str, new_name: str
) -> dict[str, Any]:
old_normalized = _normalize_workspace_name(old_name)
new_normalized = _normalize_workspace_name(new_name)
if not old_normalized or not new_normalized:
raise ValueError("Workspace names cannot be empty")
if old_normalized == new_normalized:
existing_doc = await _collection.find_one({"user_id": user_id})
current = _sanitize_config(
existing_doc.get("config", {}) if existing_doc else {}
)
names = await fetch_workspace_names(user_id)
return {
"userId": user_id,
"oldName": old_normalized,
"newName": new_normalized,
"currentWorkspace": current.get("currentWorkspace"),
"workspaceNames": names,
}
existing_doc = await _collection.find_one({"user_id": user_id})
current = _sanitize_config(
existing_doc.get("config", {}) if existing_doc else {}
)
workspaces = current.get("workspaces")
if not isinstance(workspaces, dict):
workspaces = {}
if old_normalized not in workspaces:
raise ValueError(f"Workspace '{old_normalized}' does not exist")
if new_normalized in workspaces:
raise ValueError(f"Workspace '{new_normalized}' already exists")
merged = _deep_copy(current)
merged_workspaces = _deep_copy(workspaces)
renamed_workspace = merged_workspaces.pop(old_normalized)
merged_workspaces[new_normalized] = renamed_workspace
merged["workspaces"] = merged_workspaces
current_workspace = _normalize_workspace_name(
merged.get("currentWorkspace")
)
if current_workspace == old_normalized:
merged["currentWorkspace"] = new_normalized
raw_versions = merged.get("workspace_versions")
if isinstance(raw_versions, dict) and old_normalized in raw_versions:
versions = _deep_copy(raw_versions)
versions[new_normalized] = versions.pop(old_normalized)
merged["workspace_versions"] = versions
raw_grants = merged.get("shared_workspace_grants")
if isinstance(raw_grants, list):
normalized_grants = _normalize_grants(merged, user_id)
renamed_grants: list[dict[str, Any]] = []
for grant in normalized_grants:
if grant.get("name") == old_normalized:
owner = (
_normalize_workspace_name(grant.get("owner")) or user_id
)
grant["name"] = new_normalized
legacy_id = f"{owner}::{old_normalized}"
current_id = _normalize_workspace_name(grant.get("id"))
if not current_id or current_id == legacy_id:
grant["id"] = f"{owner}::{new_normalized}"
renamed_grants.append(grant)
merged["shared_workspace_grants"] = renamed_grants
await _collection.update_one(
{"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
)
return {
"userId": user_id,
"oldName": old_normalized,
"newName": new_normalized,
"currentWorkspace": merged.get("currentWorkspace"),
"workspaceNames": _workspace_names(merged),
}
[docs]
async def update_preferences(
user_id: str, config: dict[str, Any]
) -> dict[str, Any]:
incoming = config if isinstance(config, dict) else {}
patch = _sanitize_config(incoming, coerce_workspace=False)
workspace_target = incoming.get("workspaceTarget")
if isinstance(workspace_target, str) and workspace_target.strip():
patch["workspaceTarget"] = workspace_target.strip()
existing_doc = await _collection.find_one({"user_id": user_id})
current = _sanitize_config(
existing_doc.get("config", {}) if existing_doc else {}
)
merged = _deep_copy(current)
for key in REPLACED_TOP_LEVEL_KEYS:
if key in patch:
merged[key] = _deep_copy(patch[key])
for key in MERGED_TOP_LEVEL_KEYS:
if key in patch:
value = patch.get(key)
if isinstance(value, dict):
existing = merged.get(key)
if not isinstance(existing, dict):
existing = {}
merged[key] = _deep_merge(existing, value)
else:
merged[key] = _deep_copy(value)
if _has_grants_key(incoming):
merged["shared_workspace_grants"] = _normalize_grants(
incoming, user_id
)
if "workspaces" in patch or any(
key in patch for key in WORKSPACE_PATCH_KEYS - {"workspaces"}
):
_apply_workspace_patch(merged, patch)
requested_current_workspace = (
patch.get("currentWorkspace") if "currentWorkspace" in patch else None
)
_coerce_current_workspace(merged, requested_current_workspace)
_append_workspace_versions(current, merged, patch)
await _collection.update_one(
{"user_id": user_id}, {"$set": {"config": merged}}, upsert=True
)
return merged