"""
The ska_oso_oet.ui package contains code that present the OET interface to the
outside world. In practical terms, this means the OET application's REST
interface
"""
import json
import multiprocessing
import time
from threading import Event
from typing import Generator
from fastapi import APIRouter, FastAPI, Request
from fastapi.responses import StreamingResponse
from pubsub import pub
from pydantic import BaseModel
from ska_aaa_authhelpers import Role, watchdog
from ska_oso_oet.activity.ui import activities_router
from ska_oso_oet.auth import (
OPERATOR_ROLE_FOR_TELESCOPE,
OST_CLIENT_ID,
Permissions,
Scopes,
)
from ska_oso_oet.mptools import MPQueue
from ska_oso_oet.operator.ui import operator_actions_router
from ska_oso_oet.procedure.ui import procedures_router
from ska_oso_oet.utils.ui import API_PATH
# Router for the server-sent events (SSE) stream endpoint.
sse_router = APIRouter(tags=["Server Sent Events"])
# The EventSource.js library used in some front-end clients has a bug that
# causes connections to time out after 45 seconds. To prevent this, a heartbeat
# message is sent whenever no other message has been streamed for
# HEARTBEAT_INTERVAL seconds.
HEARTBEAT_INTERVAL = 20.0
def serialize(obj):
    """
    Fallback encoder passed to ``json.dumps`` via its ``default`` argument.

    :param obj: object that the JSON encoder could not serialize natively
    :return: the result of ``obj.model_dump()`` when obj is a pydantic
        BaseModel
    :raises TypeError: if obj is not a pydantic BaseModel
    """
    # Guard clause: anything that is not a pydantic model is rejected with the
    # same kind of error the json module itself raises for unknown types.
    if not isinstance(obj, BaseModel):
        raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
    return obj.model_dump()
class Message(BaseModel):
    """
    Data that is published as a server-sent event.

    When converted to a string, each field maps onto one field of the
    event-stream wire format: ``data`` becomes one ``data:`` line per line of
    payload, ``type`` becomes the ``event:`` line, ``id`` the ``id:`` line and
    ``retry`` the ``retry:`` line. Optional fields left as None are omitted.
    """

    # Event payload; a dict is JSON-encoded before being written out.
    data: str | dict
    # Optional event name, written as the "event:" field.
    type: str | None = None  # pylint: disable=redefined-builtin
    # Optional event identifier, written as the "id:" field.
    id: float | int | str | None = None  # pylint: disable=redefined-builtin
    # Optional value written as the "retry:" field.
    retry: int | None = None

    def __str__(self):
        """
        Serialize this object to a string, according to the `server-sent events
        specification <https://www.w3.org/TR/eventsource/>`_.

        :return: the event as text, terminated by the blank line that marks
            the end of an event
        """
        if isinstance(self.data, dict):
            # pydantic models nested in the payload are handled by serialize()
            data = json.dumps(self.data, default=serialize)
        else:
            data = self.data

        # A multi-line payload must be sent as one "data:" field per line.
        lines = [f"data:{line}" for line in data.splitlines()]
        if self.type:
            lines.insert(0, f"event:{self.type}")
        # Test against None rather than truthiness so that legitimate falsy
        # values such as id=0 are not silently dropped. An empty-string id is
        # still omitted, as it always has been.
        if self.id is not None and self.id != "":
            lines.append(f"id:{self.id}")
        # Likewise, retry=0 is a real value and must be emitted.
        if self.retry is not None:
            lines.append(f"retry:{self.retry}")
        return "\n".join(lines) + "\n\n"
def messages(shutdown_event: Event) -> Generator[Message, None, None]:
    """
    A generator of Message objects created from received pubsub events.

    A listener is subscribed to every pubsub topic and pushes each received
    event onto a private queue. The generator polls that queue and yields the
    queued messages; when nothing has been yielded for HEARTBEAT_INTERVAL
    seconds a heartbeat message is yielded instead.

    :param shutdown_event: event that, once set, ends the generator
    :return: generator of Message objects
    """
    mp_context = multiprocessing.get_context()
    q = MPQueue(ctx=mp_context)

    def add_to_q(topic: pub.Topic = pub.AUTO_TOPIC, **kwargs):
        # Record the originating topic in the payload.
        kwargs["topic"] = topic.name
        # A request_id, when present, is moved out of the payload and used as
        # the event id; otherwise the id stays None.
        request_id = kwargs.pop("request_id", None)
        q.put(Message(data=kwargs, id=request_id))

    pub.subscribe(add_to_q, pub.ALL_TOPICS)
    try:
        # Use the monotonic clock for interval measurement so that system
        # clock adjustments can neither delay nor spuriously trigger a
        # heartbeat.
        last_message_time = time.monotonic()
        while not shutdown_event.is_set():
            # Short timeout so the shutdown event is re-checked regularly.
            msg = q.safe_get(timeout=0.1)
            current_time = time.monotonic()
            if msg is not None:
                last_message_time = current_time
                yield msg
            elif current_time - last_message_time >= HEARTBEAT_INTERVAL:
                last_message_time = current_time
                yield Message(data={"type": "heartbeat"})
    finally:
        # Detach the listener as soon as the generator finishes or is closed,
        # rather than relying on the listener being garbage collected.
        pub.unsubscribe(add_to_q, pub.ALL_TOPICS)
@sse_router.get(
    "/stream",
    description=(
        "Opens an SSE stream of messages that are published to the OET topics. All new"
        " messages will be streamed until the connection is closed. Messages will not"
        " appear in the SwaggerUI - open the request url in a separate browser tab"
        " instead."
    ),
    response_description=(
        "A stream of messages with the text/event-stream MIME type - see"
        " https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface"
    ),
    dependencies=[
        Permissions(
            roles={Role.SW_ENGINEER, OPERATOR_ROLE_FOR_TELESCOPE, Role.APP2APP},
            scopes={Scopes.ACTIVITY_READ},
            app_ids={OST_CLIENT_ID},
        )
    ],
)
async def stream(request: Request):
    """
    Return a streaming response that relays OET messages as server-sent events.

    The stream runs until the shutdown event stored on the application state
    (``sse_shutdown_event``) is set.
    """

    def event_source(stop_signal):
        # An initial chunk has to be emitted straight away so that the client
        # receives the 200 OK response immediately; without it the response
        # would only be sent once the first event arrives.
        yield "\n"
        yield from map(str, messages(stop_signal))

    return StreamingResponse(
        event_source(request.app.state.sse_shutdown_event),
        media_type="text/event-stream",
    )
def create_fastapi_app():
    """
    Build the FastAPI application that serves the OET REST API.

    The OpenAPI document and the interactive docs UI are published under
    API_PATH, and the activities, procedures, operator actions and SSE routers
    are all mounted with API_PATH as their prefix.

    :return: the configured FastAPI application
    """
    oet_app = FastAPI(
        title="Observation Execution Tool API",
        openapi_url=f"{API_PATH}/openapi.json",
        docs_url=f"{API_PATH}/ui",
        lifespan=watchdog(),
    )
    # Registration order matches the order in which the routers are listed.
    routers = (
        activities_router,
        procedures_router,
        operator_actions_router,
        sse_router,
    )
    for router in routers:
        oet_app.include_router(router, prefix=API_PATH)
    return oet_app