Source code for ska_db_oda.script_helper

"""
Helper module for common functionality used in observing scripts.

These functions connect directly to the database through a UnitOfWork, so
the `PG_`-prefixed environment variables must be set.
"""

import logging
import os
from collections.abc import Callable
from datetime import datetime, timezone
from functools import wraps
from os import getenv

from ska_oso_pdm import OSOExecutionBlock, SBDefinition
from ska_oso_pdm._shared import PythonArguments, TelescopeType
from ska_oso_pdm.execution_block import ErrorWrapper, RequestResponse, ResponseWrapper

from ska_db_oda.unitofwork import PostgresUnitOfWork

LOGGER = logging.getLogger(__name__)


def save(sbd: SBDefinition) -> SBDefinition:
    """
    Store an SBDefinition in the ODA database and return the stored entity.

    The entity is handed to the SBDefinition repository of a
    PostgresUnitOfWork and the unit of work is committed. Whether this
    results in a new SBDefinition or a new version of an existing one is
    decided by the repository from the identifier field (sbd_id): without an
    identifier a new SBDefinition is created, with one the stored entity for
    that identifier is updated.

    :param sbd: the SBDefinition to persist
    :returns: the entity as returned by the repository, carrying its
        identifiers and metadata
    :raises ConnectionError: if anything goes wrong while persisting; the
        original exception is attached as the cause
    """
    try:
        with PostgresUnitOfWork() as unit_of_work:
            stored_sbd = unit_of_work.sbds.add(sbd)
            unit_of_work.commit()
    except Exception as err:
        # Every failure is reported to callers as a ConnectionError, with the
        # traceback logged here and the original exception chained.
        msg = f"Error '{repr(err)}' while saving SBDefinition in the ODA database"
        LOGGER.exception(msg)
        raise ConnectionError(msg) from err
    else:
        return stored_sbd
def create_eb(telescope: TelescopeType, sbi_ref: str | None = None) -> str:
    """
    Create an "empty" ExecutionBlock in the ODA database.

    The created identifier is written to the `EB_ID` environment variable so
    subsequent `capture_request_response` calls can associate records with the
    same ExecutionBlock.

    :param telescope: telescope from which this EB originates
    :param sbi_ref: SBInstance identifier to associate with the created
        ExecutionBlock
    :return: persisted ExecutionBlock identifier
    :raises ConnectionError: if database persistence fails or no identifier is
        returned
    """
    entity = OSOExecutionBlock(sbi_ref=sbi_ref, telescope=telescope)

    try:
        with PostgresUnitOfWork() as uow:
            persisted_eb = uow.ebs.add(entity)
            uow.commit()
    except Exception as err:
        msg = f"Error '{repr(err)}' while creating ExecutionBlock in the ODA database"
        LOGGER.exception(msg)
        raise ConnectionError(msg) from err

    eb_id = persisted_eb.eb_id
    if not eb_id:
        # Without this guard a missing identifier would surface as a TypeError
        # from the os.environ assignment below rather than the documented
        # ConnectionError.
        msg = (
            "No identifier was returned while creating ExecutionBlock in the ODA"
            " database"
        )
        LOGGER.error(msg)
        raise ConnectionError(msg)

    # Publish the identifier for capture_request_response, which reads EB_ID.
    os.environ["EB_ID"] = eb_id
    return eb_id
def capture_request_response(fn: Callable) -> Callable:
    """
    Append the request_response from the decorated function to the current
    ExecutionBlock.

    Uses the `EB_ID` environment variable to look up the target
    ExecutionBlock. Capturing is best effort: any failure while building or
    persisting the record is logged and neither blocks execution of the
    decorated function nor replaces an exception raised by it.

    :param fn: the function whose calls should be recorded
    :returns: a wrapper that calls ``fn`` with the same arguments, returns its
        result and re-raises its exceptions unchanged
    """

    def save_request_response(eb_id: str, build_request_response: Callable):
        """
        Build a RequestResponse and append it to ExecutionBlock ``eb_id``.

        The record is built inside the guarded region so that a failure to
        construct it is handled exactly like a failure to persist it: it is
        logged and swallowed.
        """
        try:
            request_response = build_request_response()
            with PostgresUnitOfWork() as uow:
                eb = uow.ebs.get(eb_id)
                if eb.request_responses is None:
                    eb.request_responses = [request_response]
                else:
                    eb.request_responses.append(request_response)
                uow.ebs.add(eb)
                uow.commit()
        except Exception as err:
            LOGGER.error(
                "The update to ExecutionBlock %s request_response raised an error: %s",
                eb_id,
                repr(err),
            )

    @wraps(fn)
    def wrapper(*args, **kwargs):
        LOGGER.debug(
            "Capturing the request function %s in an ExecutionBlock", fn.__name__
        )
        eb_id = getenv("EB_ID")
        if eb_id is None:
            LOGGER.error(
                "The EB_ID not set, meaning Execution Block updates will not be sent to"
                " the ODA. Please ensure the create function from the EB ODA client has"
                " been called."
            )
            return fn(*args, **kwargs)

        request_sent_at = datetime.now(tz=timezone.utc)

        def common_fields() -> dict:
            """Fields shared by the OK and ERROR records of this call."""
            return {
                "request": f"{fn.__module__}.{fn.__name__}",
                "request_args": PythonArguments(args=args, kwargs=kwargs),
                "request_sent_at": request_sent_at,
            }

        try:
            result = fn(*args, **kwargs)
            response_received_at = datetime.now(tz=timezone.utc)
        except Exception as err:
            # The builder runs synchronously inside save_request_response, so
            # ``err`` is still bound when the lambda is evaluated.
            save_request_response(
                eb_id,
                lambda: RequestResponse(
                    status="ERROR",
                    error=ErrorWrapper(detail=repr(err)),
                    **common_fields(),
                ),
            )
            # Re-raise the decorated function's own exception, untouched.
            raise
        else:
            save_request_response(
                eb_id,
                lambda: RequestResponse(
                    status="OK",
                    response=ResponseWrapper(result=repr(result)),
                    response_received_at=response_received_at,
                    **common_fields(),
                ),
            )
            return result

    return wrapper