Source code for ska_oso_oet.ui

The ska_oso_oet.ui package contains code that presents the OET interface to the
outside world. In practical terms, this means the OET application's REST
interface.
import json
import multiprocessing
import os
from typing import Any, Dict, Generator, Optional, Union

import flask
import jsonpickle
import prance
from connexion import App
from flask import Blueprint, current_app, stream_with_context
from flask_swagger_ui import get_swaggerui_blueprint
from pubsub import pub

from ska_oso_oet.mptools import MPQueue

# Namespace the application is deployed in, read from the KUBE_NAMESPACE
# environment variable and defaulting to "ska-oso-oet". It is interpolated
# into the externally visible URLs built in this module (the OpenAPI server
# URL and the Swagger UI / specification URLs).
KUBE_NAMESPACE = os.getenv("KUBE_NAMESPACE", "ska-oso-oet")

class Message:
    """
    Data that is published as a server-sent event.
    """

    def __init__(
        self,
        data: Union[str, dict],
        type: Optional[str] = None,  # pylint: disable=redefined-builtin
        id: Optional[  # pylint: disable=redefined-builtin
            Union[float, int, str]
        ] = None,
        retry: Optional[int] = None,
    ):
        """
        Create a server-sent event.

        :param data: The event data.
        :param type: An optional event type.
        :param id: An optional event ID.
        :param retry: An optional integer, to specify the reconnect time for
            disconnected clients of this stream.
        """
        self.data = data
        self.type = type
        self.id = id
        self.retry = retry

    def __str__(self):
        """
        Serialize this object to a string, according to the `server-sent events
        specification <https://html.spec.whatwg.org/multipage/server-sent-events.html>`_.

        A dict payload is serialised with jsonpickle; any other payload is
        used as-is. Every line of the payload becomes its own ``data:`` field.
        The ``event:`` field, when present, is emitted first; ``id:`` and
        ``retry:`` follow the data. Falsy ``type``, ``id`` and ``retry``
        values (including ``0`` and ``""``) are omitted.

        :return: the event in wire format, terminated by a blank line.
        """
        if isinstance(self.data, dict):
            data = jsonpickle.dumps(self.data)
        else:
            data = self.data

        # Multi-line payloads must be split into one data field per line.
        lines = [f"data:{line}" for line in data.splitlines()]
        if self.type:
            lines.insert(0, f"event:{self.type}")
        if self.id:
            lines.append(f"id:{self.id}")
        if self.retry:
            lines.append(f"retry:{self.retry}")

        # The trailing blank line marks the end of the event.
        return "\n".join(lines) + "\n\n"

    def __eq__(self, other):
        """
        Two messages are equal when they are of the same class and all four
        fields (data, type, id, retry) compare equal.
        """
        return (
            isinstance(other, self.__class__)
            and self.data == other.data
            and self.type == other.type
            and self.id == other.id
            and self.retry == other.retry
        )
class ServerSentEventsBlueprint(Blueprint):
    """
    A :class:`flask.Blueprint` subclass that knows how to subscribe to pypubsub
    topics and stream pubsub events as server-sent events.
    """

    def __init__(self, *args, mp_context=None, **kwargs):
        """
        Create the blueprint.

        :param args: positional arguments passed through to
            :class:`flask.Blueprint`.
        :param mp_context: multiprocessing context used when creating the
            event queue; defaults to ``multiprocessing.get_context()``.
        :param kwargs: keyword arguments passed through to
            :class:`flask.Blueprint`.
        """
        super().__init__(*args, **kwargs)
        if mp_context is None:
            mp_context = multiprocessing.get_context()
        self._mp_context = mp_context

    def messages(self) -> Generator[Message, None, None]:
        """
        A generator of Message objects created from received pubsub events.

        A listener is subscribed to all pubsub topics. Each received event is
        wrapped in a Message and queued; queued messages are yielded until the
        ``shutdown_event`` held in the current app's config is set.
        """
        q = MPQueue(ctx=self._mp_context)

        def add_to_q(topic: pub.Topic = pub.AUTO_TOPIC, **kwargs):
            # The topic name travels with the payload so that clients can
            # tell events apart.
            kwargs["topic"] = topic.name

            # A request_id, when present, is moved out of the payload and
            # becomes the ID of the server-sent event.
            other = {}
            if "request_id" in kwargs:
                other["id"] = kwargs.pop("request_id")

            msg = Message(kwargs, **other)
            q.put(msg)

        pub.subscribe(add_to_q, pub.ALL_TOPICS)

        shutdown_event = current_app.config["shutdown_event"]
        while not shutdown_event.is_set():
            # Poll with a short timeout so the shutdown flag is rechecked
            # regularly. NOTE(review): safe_get presumably returns None when
            # nothing arrives within the timeout - confirm against MPQueue.
            msg = q.safe_get(timeout=0.1)
            if msg is not None:
                yield msg

    def stream(self) -> flask.Response:
        """
        View function that streams pubsub events to the client.

        :return: a response with mimetype ``text/event-stream`` whose body is
            the serialised form of every Message produced by
            :meth:`messages`.
        """

        @stream_with_context
        def generator():
            # must immediately yield to return 200 OK response to client,
            # otherwise response is only sent on first event
            yield "\n"
            for message in self.messages():
                yield str(message)

        return current_app.response_class(generator(), mimetype="text/event-stream")
def get_openapi_spec() -> Dict[str, Any]:
    """
    Parse and return the OpenAPI specification.

    The specification is read from ``openapi/oet-openapi-v1.yaml`` next to
    this module and parsed with :class:`prance.ResolvingParser`. The URL of
    the first entry in ``servers`` is then replaced with one built from
    KUBE_NAMESPACE.

    :return: the parsed specification as a dictionary.
    """
    cwd, _ = os.path.split(__file__)
    path = os.path.join(cwd, "./openapi/oet-openapi-v1.yaml")

    # lazy=True defers parsing until parse() is called explicitly.
    parser = prance.ResolvingParser(path, lazy=True, strict=True)
    parser.parse()
    spec = parser.specification

    # The OpenAPI specs define the server url with the default namespace,
    # ska-oso-oet. We want to overwrite this to the actual namespace at
    # runtime, for example in the CICD deployments.
    spec["servers"][0]["url"] = f"/{KUBE_NAMESPACE}/ska-oso-oet/api/v1.0"
    return spec
def create_app(open_api_spec=None):
    """
    Return the Flask app, built using Connexion.

    The app serves the OpenAPI-described API under ``/api/v1.0``, the
    server-sent events stream under ``/api/v1.0/stream`` and a Swagger UI
    under a KUBE_NAMESPACE-prefixed path. HTTP errors 400, 404, 500 and 504
    are rendered as JSON.

    :param open_api_spec: the OpenAPI specification to serve; when None, the
        result of ``get_openapi_spec()`` is used.
    :return: the Flask application wrapped by the Connexion app.
    """
    if open_api_spec is None:
        open_api_spec = get_openapi_spec()

    connexion = App(__name__, specification_dir="openapi/")
    connexion.add_api(
        open_api_spec,
        base_path="/api/v1.0",
        arguments={"title": "OpenAPI OET"},
        pythonic_params=True,
    )

    # TODO: Due to the limitation of Swagger Open API, we kept the same
    # earlier blueprint approach for Stream API and couldn't include it in
    # the open API spec, we can plan this work when full SSE support is
    # available in OPEN API 3.0 or any latest version.
    sse = ServerSentEventsBlueprint("sse", __name__)
    sse.add_url_rule(rule="", endpoint="stream", view_func=sse.stream)
    connexion.app.register_blueprint(sse, url_prefix="/api/v1.0/stream")

    # Use the Flask blueprint rather than letting the Connexion extra
    # dependency handle the SwaggerUI. This is because our k8s ingress rules
    # require the SwaggerUI to make requests to a different base path than
    # the spec is registered with above.
    swagger_ui_url = f"/{KUBE_NAMESPACE}/ska-oso-oet/ui"
    specification_url = f"/{KUBE_NAMESPACE}/ska-oso-oet/openapi/oet-openapi-v1.yaml"
    swagger_urls_config = [{"url": specification_url, "name": "OET API"}]

    # Swagger UI expects the resolved spec files to be available at the fixed
    # location.
    # NOTE(review): the route registration for this view was not present in
    # the reviewed source. It is assumed to be served at specification_url,
    # the address the Swagger UI is configured with below - confirm.
    @connexion.app.route(specification_url)
    def api_spec():
        return json.dumps(open_api_spec)

    swaggerui_blueprint = get_swaggerui_blueprint(
        swagger_ui_url,
        specification_url,
        config={
            "app_name": "Observation Execution Tool API",
            "urls": swagger_urls_config,
            "showCommonExtensions": True,
        },
    )
    connexion.app.register_blueprint(swaggerui_blueprint, url_prefix=swagger_ui_url)

    # NOTE(review): the decorators below were reconstructed from the status
    # codes listed in the docstring - confirm against the upstream module.
    @connexion.app.errorhandler(400)
    @connexion.app.errorhandler(404)
    @connexion.app.errorhandler(500)
    @connexion.app.errorhandler(504)
    def server_error_response(cause):
        """
        Custom error handler for Procedure API.
        This is overloaded for 400, 404, 500 and 504 and could conceivably be
        extended for other errors by adding the appropriate errorhandler
        decorator.

        :param cause: root exception for failure (e.g., KeyError)
        :return: HTTP Response
        """
        response = cause.get_response()
        if isinstance(cause.description, dict):
            # A dict description already carries its own type and message.
            response_data = {
                "error": f"{cause.code} {cause.name}",
                "type": cause.description["type"],
                "Message": cause.description["Message"],
            }
        else:
            response_data = {
                "error": f"{cause.code} {cause.name}",
                "type": cause.name,
                "Message": cause.description,
            }
        response.content_type = "application/json"
        response.data = json.dumps(response_data)
        return response

    return connexion.app