# config_store.py
from __future__ import annotations
import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any
from pymongo import ASCENDING, DESCENDING, MongoClient
from pymongo.errors import DuplicateKeyError
MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongo:27017")
MONGO_DB = os.getenv("MONGO_DB", "octopus")
COLLECTION = os.getenv("CONFIG_COLLECTION", "endpoint_configs")
STATE_COLLECTION = os.getenv(
"CONFIG_STATE_COLLECTION", "endpoint_config_state"
)
_log = logging.getLogger(__name__)
_client = MongoClient(
MONGO_URI,
serverSelectionTimeoutMS=int(os.getenv("MONGO_TIMEOUT_MS", "5000")),
)
_db = _client[MONGO_DB]
_coll = _db[COLLECTION]
_state = _db[STATE_COLLECTION]
# ensure index
try:
# Avoid crashing on import when the client/collection is a stub or
# unavailable.
_coll.create_index([("version", ASCENDING)], unique=True)
except Exception:
# In tests or constrained environments the collection may be a fake
# without this method.
pass
def _parse_ver(v: str) -> tuple[int, int, int]:
parts = (v or "").split(".")
if len(parts) != 3:
raise ValueError(f"Invalid version '{v}' (expected MAJOR.MINOR.PATCH)")
try:
return int(parts[0]), int(parts[1]), int(parts[2])
except Exception as exc:
raise ValueError(
f"Invalid version '{v}' (non-integer component)"
) from exc
def _bump_patch(v: str) -> str:
a, b, c = _parse_ver(v)
return f"{a}.{b}.{c+1}"
def _active_version() -> str | None:
try:
doc = _state.find_one({"_id": "active"})
return doc.get("version") if doc else None
except Exception:
return None
# ——— Recovery helpers ———
[docs]
def get_active_version() -> str | None:
"""Return the currently active version stored in state (or None)."""
return _active_version()
[docs]
def get_last_good_version() -> str | None:
"""Return the last-known-good version (if any)."""
try:
doc = _state.find_one({"_id": "last_good"})
return doc.get("version") if doc else None
except Exception:
return None
[docs]
def set_last_good_version(version: str) -> None:
"""Persist last-known-good version (best-effort)."""
try:
_state.update_one(
{"_id": "last_good"},
{"$set": {"version": version, "updated_at": datetime.utcnow()}},
upsert=True,
)
except Exception:
pass
[docs]
def list_versions() -> dict[str, Any]:
active = _active_version()
versions: list[dict[str, Any]] = []
try:
# Include message in the projection so the UI can show tooltips/labels.
for d in _coll.find(
{}, {"_id": 0, "version": 1, "created_at": 1, "message": 1}
).sort("created_at", DESCENDING):
created = d.get("created_at")
if isinstance(created, datetime):
created_iso = created.isoformat() + "Z"
else:
created_iso = str(created) if created else None
versions.append(
{
"version": d["version"],
"created_at": created_iso,
"message": d.get("message"),
}
)
except Exception:
# Collection may be unavailable under tests; return what we have.
pass
return {"active": active, "versions": versions}
[docs]
def fetch_active_config() -> dict[str, Any] | None:
ver = _active_version()
if not ver:
return None
try:
doc = _coll.find_one({"version": ver})
return doc.get("config") if doc else None
except Exception:
return None
[docs]
def fetch_config_by_version(version: str | None) -> dict[str, Any] | None:
if not version:
return fetch_active_config()
try:
doc = _coll.find_one({"version": version})
return doc.get("config") if doc else None
except Exception:
return None
def _scrub_mongo_uri(uri: str) -> str:
"""Hide credentials while keeping host/port visible in logs."""
if "@" not in uri:
return uri
return re.sub(r"://[^@]+@", "://***@", uri)
[docs]
def health_check() -> None:
"""
Lightweight connectivity check. Raises on failure so callers can
differentiate config-not-found from Mongo-unreachable cases.
"""
try:
_client.admin.command("ping")
except Exception as exc:
_log.error(
"Mongo health check failed (MONGO_URI=%s): %s",
_scrub_mongo_uri(MONGO_URI),
exc,
)
raise
[docs]
def set_active_version(version: str) -> None:
if not _coll.find_one({"version": version}):
raise ValueError(f"Version '{version}' does not exist")
_state.update_one(
{"_id": "active"}, {"$set": {"version": version}}, upsert=True
)
def _latest_version() -> str | None:
try:
doc = (
_coll.find({}, {"version": 1, "_id": 0})
.sort("created_at", DESCENDING)
.limit(1)
.next(None)
)
return doc["version"] if doc else None
except Exception:
return None
[docs]
def save_new_version(
config: dict[str, Any],
*,
version: str | None,
activate: bool,
message: str | None = None,
) -> str:
"""
Insert a new version document. When *version* is None, bump the patch from
the active version (or the latest, or 1.0.0 if none exist).
"""
now = datetime.utcnow()
if version is None:
base = _active_version() or _latest_version() or "1.0.0"
try:
# If base is bootstrap 1.0.0 and missing, still bump from it.
version = _bump_patch(base)
except Exception:
# if base invalid, fallback to 1.0.1
version = "1.0.1"
else:
# validate provided version
_parse_ver(version)
doc = {
"version": version,
"config": config,
"created_at": now,
"message": message,
}
try:
_coll.insert_one(doc)
except DuplicateKeyError:
raise ValueError(f"Version '{version}' already exists")
if activate:
set_active_version(version)
return version
[docs]
def ensure_bootstrap_from_file(cfg_path: Path) -> None:
"""
Load endpoints.json and store it as version 1.0.0 (active) when the
collection is empty.
"""
try:
if _coll.estimated_document_count() > 0:
return
except Exception:
# Skip when the collection is a fake (e.g., under tests).
return
if not cfg_path.exists():
return
try:
raw = json.loads(cfg_path.read_text())
# Avoid double insert if race
if not _coll.find_one({"version": "1.0.0"}):
_coll.insert_one(
{
"version": "1.0.0",
"config": raw,
"created_at": datetime.utcnow(),
"message": "Bootstrap from endpoints.json",
}
)
set_active_version("1.0.0")
# Also record as last_good on first boot
set_last_good_version("1.0.0")
except Exception:
# swallow bootstrap failures; loader will raise later if nothing exists
pass
# ——— NEW: deletion ———
[docs]
def delete_version(version: str) -> None:
"""
Delete a saved config version (refuse when it is the active version) and
adjust the last_good pointer when needed.
"""
if not version:
raise ValueError("Version must be provided")
if version == _active_version():
raise ValueError("Cannot delete the active version")
res = _coll.delete_one({"version": version})
if res.deleted_count == 0:
raise ValueError(f"Version '{version}' not found")
# If last_good pointed to this, move it to active or latest remaining
lg = get_last_good_version()
if lg == version:
replacement = _active_version() or _latest_version()
if replacement:
set_last_good_version(replacement)
else:
_state.delete_one({"_id": "last_good"})