Source code for ska_oso_oet.utils.ui

"""
The ska_oso_oet.utils.ui module contains common helper code for the UI layers.
"""

import os
import time
from queue import Empty, Queue

from fastapi import HTTPException
from pubsub import pub
from pydantic import BaseModel, Field, field_validator

from ska_oso_oet.procedure import domain

# time allowed for FastAPI <-> ProcWorker communication before timeout
OET_REQUEST_TIMEOUT = int(os.getenv("OET_REQUEST_TIMEOUT", "30"))

KUBE_NAMESPACE = os.getenv("KUBE_NAMESPACE", "ska-oso-oet")
OET_MAJOR_VERSION = "10"
SUBARRAY_ID = os.getenv("SUBARRAY_ID")
# The base path includes the namespace which is known at runtime
# to avoid clashes in deployments, for example in CICD
API_PATH = f"/{KUBE_NAMESPACE}/oet/{SUBARRAY_ID}/api/v{OET_MAJOR_VERSION}"


[docs] class ProcedureInput(BaseModel): """ Note: This is different to the lower level ProcedureInput that allows things like ProcedureInput(1, 'a', subarray_id=1) Instead this type would be like ProcedureInput(args=[1,'a'], kwargs={"subarray_id":1}) """ args: list = Field(default_factory=list) kwargs: dict = Field(default_factory=dict) @field_validator("kwargs", mode="before") @classmethod def subarray_in_kwargs(cls, kwargs): if "subarray_id" in kwargs.keys(): raise ValueError("'subarray_id' should not be passed as a script arg.") if "context" in kwargs.keys(): raise ValueError( "'context' is a reserved key and should not be passed as a script arg." ) return kwargs
[docs] class ScriptArgs(BaseModel): init: ProcedureInput | None = None main: ProcedureInput | None = None
def call_and_respond(request_topic, response_topic, *args, **kwargs): q = Queue(1) my_request_id = time.time_ns() # msg_src MUST be part of method signature for pypubsub to function def callback(msg_src, request_id, result): # pylint: disable=unused-argument if my_request_id == request_id: q.put(result) pub.subscribe(callback, response_topic) msg_src = "FastAPIWorker" # With the callback now setup, publish an event to mark the user request event pub.sendMessage( request_topic, msg_src=msg_src, request_id=my_request_id, *args, **kwargs ) try: result = q.get(timeout=OET_REQUEST_TIMEOUT) if isinstance(result, Exception): if isinstance(result, OSError): detail = f"{result.strerror}: {result.filename}" else: detail = str(result) raise HTTPException(500, detail=detail) return result except Empty as err: detail = f"Timeout waiting for msg #{my_request_id} on topic {response_topic}" raise HTTPException(504, detail=detail) from err
[docs] def convert_request_to_procedure_input( api_input: ProcedureInput, ) -> domain.ProcedureInput: """ :param api_input: Request with the args and kwargs, eg {'args': [1, 2], 'kwargs': {'subarray_id': 42}} :return: The ProcedureInput, eg <ProcedureInput(1, 2, subarray_id=42)> """ return domain.ProcedureInput(*api_input.args, **api_input.kwargs)