"""
The ska_oso_oet.utils.ui module contains common helper code for the UI layers.
"""
import os
import time
from queue import Empty, Queue
from fastapi import HTTPException
from pubsub import pub
from pydantic import BaseModel, Field, field_validator
from ska_oso_oet.procedure import domain
# time allowed for FastAPI <-> ProcWorker communication before timeout
OET_REQUEST_TIMEOUT = int(os.getenv("OET_REQUEST_TIMEOUT", "30"))
KUBE_NAMESPACE = os.getenv("KUBE_NAMESPACE", "ska-oso-oet")
OET_MAJOR_VERSION = "10"
SUBARRAY_ID = os.getenv("SUBARRAY_ID")
# The base path includes the namespace which is known at runtime
# to avoid clashes in deployments, for example in CICD
API_PATH = f"/{KUBE_NAMESPACE}/oet/{SUBARRAY_ID}/api/v{OET_MAJOR_VERSION}"
[docs]
class ScriptArgs(BaseModel):
init: ProcedureInput | None = None
main: ProcedureInput | None = None
def call_and_respond(request_topic, response_topic, *args, **kwargs):
q = Queue(1)
my_request_id = time.time_ns()
# msg_src MUST be part of method signature for pypubsub to function
def callback(msg_src, request_id, result): # pylint: disable=unused-argument
if my_request_id == request_id:
q.put(result)
pub.subscribe(callback, response_topic)
msg_src = "FastAPIWorker"
# With the callback now setup, publish an event to mark the user request event
pub.sendMessage(
request_topic, msg_src=msg_src, request_id=my_request_id, *args, **kwargs
)
try:
result = q.get(timeout=OET_REQUEST_TIMEOUT)
if isinstance(result, Exception):
if isinstance(result, OSError):
detail = f"{result.strerror}: {result.filename}"
else:
detail = str(result)
raise HTTPException(500, detail=detail)
return result
except Empty as err:
detail = f"Timeout waiting for msg #{my_request_id} on topic {response_topic}"
raise HTTPException(504, detail=detail) from err