Source code for ska_oso_oet.ui

"""
The ska_oso_oet.ui package contains code that present the OET interface to the
outside world. In practical terms, this means the OET application's REST
interface
"""

import json
import multiprocessing
import time
from threading import Event
from typing import Generator

from fastapi import APIRouter, FastAPI, Request
from fastapi.responses import StreamingResponse
from pubsub import pub
from pydantic import BaseModel
from ska_aaa_authhelpers import Role, watchdog

from ska_oso_oet.activity.ui import activities_router
from ska_oso_oet.auth import (
    OPERATOR_ROLE_FOR_TELESCOPE,
    OST_CLIENT_ID,
    Permissions,
    Scopes,
)
from ska_oso_oet.mptools import MPQueue
from ska_oso_oet.operator.ui import operator_actions_router
from ska_oso_oet.procedure.ui import procedures_router
from ska_oso_oet.utils.ui import API_PATH

sse_router = APIRouter(tags=["Server Sent Events"])

# EventSource.js library used in some front-end clients has a bug where they
# timeout after 45 seconds. To prevent this, we send a heartbeat.
HEARTBEAT_INTERVAL = 20.0


def serialize(obj):
    if isinstance(obj, BaseModel):
        return obj.model_dump()
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


[docs] class Message(BaseModel): """ Data that is published as a server-sent event. """ data: str | dict type: str | None = None # pylint: disable=redefined-builtin id: float | int | str | None = None # pylint: disable=redefined-builtin retry: int | None = None def __str__(self): """ Serialize this object to a string, according to the `server-sent events specification <https://www.w3.org/TR/eventsource/>`_. """ if isinstance(self.data, dict): data = json.dumps(self.data, default=serialize) else: data = self.data lines = ["data:{value}".format(value=line) for line in data.splitlines()] if self.type: lines.insert(0, "event:{value}".format(value=self.type)) if self.id: lines.append("id:{value}".format(value=self.id)) if self.retry: lines.append("retry:{value}".format(value=self.retry)) return "\n".join(lines) + "\n\n"
[docs] def messages(shutdown_event: Event) -> Generator[Message, None, None]: """ A generator of Message objects created from received pubsub events """ mp_context = multiprocessing.get_context() q = MPQueue(ctx=mp_context) def add_to_q(topic: pub.Topic = pub.AUTO_TOPIC, **kwargs): kwargs["topic"] = topic.name if "request_id" in kwargs: request_id = kwargs["request_id"] del kwargs["request_id"] msg = Message(data=kwargs, id=request_id) else: msg = Message(data=kwargs) q.put(msg) pub.subscribe(add_to_q, pub.ALL_TOPICS) last_message_time = time.time() while not shutdown_event.is_set(): msg = q.safe_get(timeout=0.1) current_time = time.time() if msg is not None: last_message_time = current_time yield msg elif current_time - last_message_time >= HEARTBEAT_INTERVAL: last_message_time = current_time yield Message(data={"type": "heartbeat"})
@sse_router.get( "/stream", description=( "Opens an SSE stream of messages that are published to the OET topics. All new" " messages will be streamed until the connection is closed. Messages will not" " appear in the SwaggerUI - open the request url in a separate browser tab" " instead." ), response_description=( "A stream of messages with the text/event-stream MIME type - see" " https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface" ), dependencies=[ Permissions( roles={Role.SW_ENGINEER, OPERATOR_ROLE_FOR_TELESCOPE, Role.APP2APP}, scopes={Scopes.ACTIVITY_READ}, app_ids={OST_CLIENT_ID}, ) ], ) async def stream(request: Request): shutdown_event = request.app.state.sse_shutdown_event def generator(): # must immediately yield to return 200 OK response to client, # otherwise response is only sent on first event yield "\n" for message in messages(shutdown_event): yield str(message) return StreamingResponse(generator(), media_type="text/event-stream") def create_fastapi_app(): app = FastAPI( title="Observation Execution Tool API", openapi_url=f"{API_PATH}/openapi.json", docs_url=f"{API_PATH}/ui", lifespan=watchdog(), ) app.include_router(activities_router, prefix=API_PATH) app.include_router(procedures_router, prefix=API_PATH) app.include_router(operator_actions_router, prefix=API_PATH) app.include_router(sse_router, prefix=API_PATH) return app