# Source code for lifespan
# File: lifespan.py
from __future__ import annotations
import asyncio
import importlib
import logging
from contextlib import asynccontextmanager
logger = logging.getLogger(__name__)
# [docs]
@asynccontextmanager
async def lifespan(app):
    """Run startup work before the ``yield`` and teardown work after it.

    Startup, in order:
      1. ``pubsub.init_pubsub()``
      2. ``widgets_registry.ensure_ready()``
      3. one ``pubsub.run_producer(name, fn)`` task per entry returned by
         ``endpoint_config.get_stream_funcs()``, where ``name`` is the entry's
         key with a leading ``"stream_"`` removed.

    Teardown cancels and awaits every producer task, then calls
    ``widgets_registry.shutdown()``.

    Args:
        app: The application object handed in by the caller. It is not used
            by this function.

    Raises:
        Exception: Whatever ``pubsub.init_pubsub()`` or
            ``widgets_registry.ensure_ready()`` raises; the failure is logged
            and re-raised. Teardown failures are logged and not re-raised.
    """
    # Resolve modules at runtime so tests can monkeypatch them safely
    pubsub = importlib.import_module("pubsub")
    endpoint_config = importlib.import_module("endpoint_config")
    widgets_registry = importlib.import_module("widgets.widgets_registry")

    # 1) initialise pubsub backend
    try:
        await pubsub.init_pubsub()
    except Exception:
        logger.exception("Startup failure: pubsub.init_pubsub()")
        raise

    # 2) init widgets registry (indexes + optional seeding from env)
    try:
        await widgets_registry.ensure_ready()
    except Exception:
        logger.exception("Startup failure: widgets_registry.ensure_ready()")
        raise

    # 3) start any background local-data streams
    # `or {}` tolerates get_stream_funcs() returning None / an empty value.
    producer_tasks = [
        asyncio.create_task(
            pubsub.run_producer(raw.removeprefix("stream_"), fn)
        )
        for raw, fn in (endpoint_config.get_stream_funcs() or {}).items()
    ]

    try:
        yield
    finally:
        for t in producer_tasks:
            t.cancel()
        # return_exceptions=True keeps one failing producer from aborting the
        # wait for the others; the outcomes are inspected below.
        results = await asyncio.gather(*producer_tasks, return_exceptions=True)
        for result in results:
            # CancelledError derives from BaseException, so the expected
            # cancellations are skipped and only real crashes are reported.
            if isinstance(result, Exception):
                logger.error(
                    "Producer task failed: %r", result, exc_info=result
                )

        # 4) close shared HTTP session for widget registry
        try:
            await widgets_registry.shutdown()
        except Exception:
            # Best-effort: never block shutdown, but leave a trace.
            logger.exception("Shutdown failure: widgets_registry.shutdown()")