# Source code for config_store

# config_store.py
from __future__ import annotations

import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any

from pymongo import ASCENDING, DESCENDING, MongoClient
from pymongo.errors import DuplicateKeyError

# Connection settings; each one can be overridden through the environment.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongo:27017")
MONGO_DB = os.getenv("MONGO_DB", "octopus")
# Collection holding one document per saved config version.
COLLECTION = os.getenv("CONFIG_COLLECTION", "endpoint_configs")
# Collection holding the pointer documents ("active" and "last_good").
STATE_COLLECTION = os.getenv(
    "CONFIG_STATE_COLLECTION", "endpoint_config_state"
)

_log = logging.getLogger(__name__)

# Module-level client shared by every helper in this file. It is created at
# import time; MONGO_TIMEOUT_MS (default "5000") is passed to the driver as
# serverSelectionTimeoutMS.
_client = MongoClient(
    MONGO_URI,
    serverSelectionTimeoutMS=int(os.getenv("MONGO_TIMEOUT_MS", "5000")),
)
_db = _client[MONGO_DB]
_coll = _db[COLLECTION]
_state = _db[STATE_COLLECTION]

# Ensure a unique index on "version". save_new_version() relies on the
# resulting DuplicateKeyError to reject a version that already exists.
try:
    # Runs at import time, so any failure here must not prevent the module
    # from loading (the client/collection may be a stub or unavailable).
    _coll.create_index([("version", ASCENDING)], unique=True)
except Exception:
    # In tests or constrained environments the collection may be a fake
    # without this method. NOTE(review): the failure is dropped silently, so
    # a missing index goes unnoticed here.
    pass


def _parse_ver(v: str) -> tuple[int, int, int]:
    parts = (v or "").split(".")
    if len(parts) != 3:
        raise ValueError(f"Invalid version '{v}' (expected MAJOR.MINOR.PATCH)")
    try:
        return int(parts[0]), int(parts[1]), int(parts[2])
    except Exception as exc:
        raise ValueError(
            f"Invalid version '{v}' (non-integer component)"
        ) from exc


def _bump_patch(v: str) -> str:
    """Return *v* with its PATCH component incremented by one.

    *v* is parsed with ``_parse_ver``; its ValueError propagates when *v* is
    not a valid MAJOR.MINOR.PATCH string.
    """
    major, minor, patch = _parse_ver(v)
    bumped = (major, minor, patch + 1)
    return ".".join(str(component) for component in bumped)


def _active_version() -> str | None:
    try:
        doc = _state.find_one({"_id": "active"})
        return doc.get("version") if doc else None
    except Exception:
        return None


# ——— Recovery helpers ———
def get_active_version() -> str | None:
    """Return the currently active version stored in state (or None).

    Public wrapper around ``_active_version``.
    """
    current = _active_version()
    return current
def get_last_good_version() -> str | None:
    """Return the last-known-good version (if any).

    Reads the ``last_good`` state document. Returns None when the document
    does not exist or when the lookup raises.
    """
    try:
        pointer = _state.find_one({"_id": "last_good"})
        if not pointer:
            return None
        return pointer.get("version")
    except Exception:
        # Best-effort read: a failing lookup is reported as "none recorded".
        return None
def set_last_good_version(version: str) -> None:
    """Persist last-known-good version (best-effort).

    Upserts the ``last_good`` state document with *version* and the current
    UTC time as ``updated_at``. Any error raised by the write is swallowed.
    """
    selector = {"_id": "last_good"}
    try:
        change = {
            "$set": {"version": version, "updated_at": datetime.utcnow()}
        }
        _state.update_one(selector, change, upsert=True)
    except Exception:
        # Deliberately ignored: recording last_good must never fail a caller.
        return None
def _created_at_iso(created: Any) -> str | None:
    """Render a stored ``created_at`` value as a string for the listing.

    Datetimes become ISO-8601 with a trailing ``Z``. A timezone-aware
    datetime is converted to naive UTC first, so the result never carries
    both a numeric offset and ``Z``. Other truthy values are passed through
    ``str``; falsy values yield None.
    """
    if isinstance(created, datetime):
        offset = created.utcoffset()
        if offset is not None:
            # Aware value: shift to UTC wall time and drop tzinfo.
            created = (created - offset).replace(tzinfo=None)
        return created.isoformat() + "Z"
    return str(created) if created else None


def list_versions() -> dict[str, Any]:
    """Return the active version and every saved version, newest first.

    The result has the shape ``{"active": <str | None>, "versions": [...]}``
    where each entry carries ``version``, ``created_at`` (string or None) and
    ``message``. If reading the collection fails, the entries collected so
    far (possibly none) are returned instead of raising.
    """
    active = _active_version()
    versions: list[dict[str, Any]] = []
    # Include message in the projection so the UI can show tooltips/labels.
    projection = {"_id": 0, "version": 1, "created_at": 1, "message": 1}
    try:
        for d in _coll.find({}, projection).sort("created_at", DESCENDING):
            versions.append(
                {
                    "version": d["version"],
                    "created_at": _created_at_iso(d.get("created_at")),
                    "message": d.get("message"),
                }
            )
    except Exception:
        # Collection may be unavailable under tests; return what we have.
        pass
    return {"active": active, "versions": versions}
def fetch_active_config() -> dict[str, Any] | None:
    """Return the ``config`` payload of the currently active version.

    Returns None when no active version is recorded, when no document exists
    for it, or when the collection lookup raises.
    """
    active = _active_version()
    if not active:
        return None
    try:
        record = _coll.find_one({"version": active})
        if not record:
            return None
        return record.get("config")
    except Exception:
        return None
def fetch_config_by_version(version: str | None) -> dict[str, Any] | None:
    """Return the ``config`` payload stored for *version*.

    A falsy *version* falls back to ``fetch_active_config``. Returns None
    when no matching document exists or when the collection lookup raises.
    """
    if version:
        try:
            record = _coll.find_one({"version": version})
            if not record:
                return None
            return record.get("config")
        except Exception:
            return None
    return fetch_active_config()
def _scrub_mongo_uri(uri: str) -> str: """Hide credentials while keeping host/port visible in logs.""" if "@" not in uri: return uri return re.sub(r"://[^@]+@", "://***@", uri)
def health_check() -> None:
    """
    Lightweight connectivity check.

    Sends a ``ping`` command through the shared client. On failure the error
    is logged (with credentials scrubbed from MONGO_URI) and re-raised, so
    callers can differentiate config-not-found from Mongo-unreachable cases.
    """
    try:
        _client.admin.command("ping")
    except Exception as exc:
        safe_uri = _scrub_mongo_uri(MONGO_URI)
        _log.error(
            "Mongo health check failed (MONGO_URI=%s): %s", safe_uri, exc
        )
        raise
def set_active_version(version: str) -> None:
    """Point the ``active`` state document at *version*.

    Raises:
        ValueError: if no document with that version exists in the
            collection.
    """
    existing = _coll.find_one({"version": version})
    if not existing:
        raise ValueError(f"Version '{version}' does not exist")

    selector = {"_id": "active"}
    change = {"$set": {"version": version}}
    _state.update_one(selector, change, upsert=True)
def _latest_version() -> str | None:
    """Return the version of the most recently created document, or None.

    Documents are ordered by ``created_at`` descending and only the first one
    is read. Returns None when the collection is empty or the query raises.
    """
    try:
        cursor = (
            _coll.find({}, {"version": 1, "_id": 0})
            .sort("created_at", DESCENDING)
            .limit(1)
        )
        # Cursor.next() takes no default argument, so calling .next(None)
        # raised TypeError and this function always returned None. The
        # builtin next() with a default handles an empty result.
        doc = next(iter(cursor), None)
        return doc["version"] if doc else None
    except Exception:
        return None
def save_new_version(
    config: dict[str, Any],
    *,
    version: str | None,
    activate: bool,
    message: str | None = None,
) -> str:
    """
    Insert a new version document and return its version string.

    When *version* is None, bump the patch from the active version (or the
    latest, or 1.0.0 if none exist). An explicit *version* is validated as
    MAJOR.MINOR.PATCH before anything is written. With *activate* set, the
    new version is made active after the insert.

    Raises:
        ValueError: if an explicit *version* is malformed, or if a document
            with the resulting version already exists (the original
            DuplicateKeyError is attached as the cause).
    """
    now = datetime.utcnow()
    if version is None:
        base = _active_version() or _latest_version() or "1.0.0"
        try:
            # If base is bootstrap 1.0.0 and missing, still bump from it.
            version = _bump_patch(base)
        except Exception:
            # Stored base is not parseable: fall back to 1.0.1.
            version = "1.0.1"
    else:
        # Validate the caller-provided version; raises ValueError if invalid.
        _parse_ver(version)

    doc = {
        "version": version,
        "config": config,
        "created_at": now,
        "message": message,
    }
    try:
        _coll.insert_one(doc)
    except DuplicateKeyError as exc:
        # Chain the driver error so the root cause stays in the traceback.
        raise ValueError(f"Version '{version}' already exists") from exc

    if activate:
        set_active_version(version)
    return version
def ensure_bootstrap_from_file(cfg_path: Path) -> None:
    """
    Load endpoints.json and store it as version 1.0.0 (active) when the
    collection is empty.

    Does nothing when the collection already has documents, when the document
    count cannot be obtained, or when *cfg_path* does not exist. Failures
    while reading or storing the file are logged and swallowed.
    """
    try:
        if _coll.estimated_document_count() > 0:
            return
    except Exception:
        # Skip when the collection is a fake (e.g., under tests).
        return

    if not cfg_path.exists():
        return

    try:
        # Pass bytes so json detects the encoding (UTF-8/16/32, BOM-aware)
        # instead of decoding with the platform's locale encoding.
        raw = json.loads(cfg_path.read_bytes())
        # Avoid double insert if another process bootstrapped concurrently.
        if not _coll.find_one({"version": "1.0.0"}):
            _coll.insert_one(
                {
                    "version": "1.0.0",
                    "config": raw,
                    "created_at": datetime.utcnow(),
                    "message": "Bootstrap from endpoints.json",
                }
            )
        set_active_version("1.0.0")
        # Also record as last_good on first boot.
        set_last_good_version("1.0.0")
    except Exception:
        # Still best-effort (the loader raises later if nothing exists), but
        # leave a trace so a broken bootstrap file can be diagnosed.
        _log.warning("Bootstrap from %s failed", cfg_path, exc_info=True)
# ——— NEW: deletion ———
def delete_version(version: str) -> None:
    """
    Delete a saved config version and keep the last_good pointer consistent.

    The active version cannot be deleted. If last_good pointed at the deleted
    version, it is moved to the active version, else to the latest remaining
    one; when neither exists the last_good document is removed.

    Raises:
        ValueError: if *version* is empty, is the active version, or does not
            exist.
    """
    if not version:
        raise ValueError("Version must be provided")
    if _active_version() == version:
        raise ValueError("Cannot delete the active version")

    outcome = _coll.delete_one({"version": version})
    if outcome.deleted_count == 0:
        raise ValueError(f"Version '{version}' not found")

    # Nothing more to do unless last_good referenced the deleted version.
    if get_last_good_version() != version:
        return

    fallback = _active_version() or _latest_version()
    if not fallback:
        _state.delete_one({"_id": "last_good"})
        return
    set_last_good_version(fallback)