Source code for lifespan

# File: lifespan.py
from __future__ import annotations

import asyncio
import importlib
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app):
    """Run startup before, and cleanup after, the body of the ``async with``.

    Startup, in order:
      1. ``pubsub.init_pubsub()``
      2. ``widgets_registry.ensure_ready()``
      3. one background task per entry of
         ``endpoint_config.get_stream_funcs()``, each running
         ``pubsub.run_producer(<key without "stream_" prefix>, fn)``.

    Cleanup (also executed when step 3 fails part-way through): cancel and
    await every producer task that was created, then call
    ``widgets_registry.shutdown()``.

    Args:
        app: Not used by this function.

    Raises:
        Exception: Anything raised by ``pubsub.init_pubsub()`` or
            ``widgets_registry.ensure_ready()`` is logged and re-raised.
            Errors raised while creating producer tasks propagate after
            cleanup has run. Errors from ``widgets_registry.shutdown()``
            are logged and suppressed.
    """
    # Resolve modules at runtime so tests can monkeypatch them safely
    pubsub = importlib.import_module("pubsub")
    endpoint_config = importlib.import_module("endpoint_config")
    widgets_registry = importlib.import_module("widgets.widgets_registry")

    # 1) initialise pubsub backend
    try:
        await pubsub.init_pubsub()
    except Exception:
        logger.exception("Startup failure: pubsub.init_pubsub()")
        raise

    # 2) init widgets registry (indexes + optional seeding from env)
    try:
        await widgets_registry.ensure_ready()
    except Exception:
        logger.exception("Startup failure: widgets_registry.ensure_ready()")
        raise

    producer_tasks = []
    try:
        # 3) start any background local-data streams.
        # This happens inside the try so that a failure on a later entry
        # still cancels the tasks already created instead of leaking them.
        for raw, fn in (endpoint_config.get_stream_funcs() or {}).items():
            producer_tasks.append(
                asyncio.create_task(
                    pubsub.run_producer(raw.removeprefix("stream_"), fn)
                )
            )
        yield
    finally:
        for task in producer_tasks:
            task.cancel()
        # return_exceptions=True: collect each task's CancelledError (or any
        # other failure) instead of letting the first one abort cleanup.
        await asyncio.gather(*producer_tasks, return_exceptions=True)

        # 4) shut down the widget registry (original note: closes its shared
        # HTTP session). Best-effort: a failure here must not mask the real
        # exit reason, but it is logged rather than silently dropped.
        try:
            await widgets_registry.shutdown()
        except Exception:
            logger.exception("Shutdown failure: widgets_registry.shutdown()")