"""
Helper module for common functionality used in observing scripts.
These functions connect directly to a database using the UnitOfWork
so require `PG_` environment variables to be set.
"""
import logging
import os
from collections.abc import Callable
from datetime import datetime, timezone
from functools import wraps
from os import getenv
from ska_oso_pdm import OSOExecutionBlock, SBDefinition
from ska_oso_pdm._shared import PythonArguments, TelescopeType
from ska_oso_pdm.execution_block import ErrorWrapper, RequestResponse, ResponseWrapper
from ska_db_oda.unitofwork import PostgresUnitOfWork
LOGGER = logging.getLogger(__name__)
[docs]
def save(sbd: SBDefinition) -> SBDefinition:
"""
Persist an SBDefinition entity in the ODA database.
If the identifier field (sbd_id) is not present in the object,
a new SBDefinition will be created. If the identifier field is present,
the version of that identifier in the database will be updated.
:param sbd: an SBDefinition to be persisted
:returns: the entity as it has been persisted in the database, with identifiers and metadata
:raises ConnectionError: if database persistence fails
"""
try:
with PostgresUnitOfWork() as uow:
persisted_sbd = uow.sbds.add(sbd)
uow.commit()
except Exception as err:
msg = f"Error '{repr(err)}' while saving SBDefinition in the ODA database"
LOGGER.exception(msg)
raise ConnectionError(msg) from err
return persisted_sbd
[docs]
def create_eb(telescope: TelescopeType, sbi_ref: str | None = None) -> str:
"""
Create an "empty" ExecutionBlock in the ODA database.
The created identifier is written to the `EB_ID` environment variable so subsequent
`capture_request_response` calls can associate records with the same ExecutionBlock.
:param telescope: telescope from which this EB originates
:param sbi_ref: SBInstance identifier to associate with the created ExecutionBlock
:return: persisted ExecutionBlock identifier
:raises ConnectionError: if database persistence fails or no identifier is returned
"""
entity = OSOExecutionBlock(sbi_ref=sbi_ref, telescope=telescope)
try:
with PostgresUnitOfWork() as uow:
persisted_eb = uow.ebs.add(entity)
uow.commit()
except Exception as err:
msg = f"Error '{repr(err)}' while creating ExecutionBlock in the ODA database"
LOGGER.exception(msg)
raise ConnectionError(msg) from err
os.environ["EB_ID"] = persisted_eb.eb_id
return persisted_eb.eb_id
[docs]
def capture_request_response(fn: Callable) -> Callable:
"""
Append the request_response from the decorated function to the current ExecutionBlock.
Uses the `EB_ID` environment variable to look up the target ExecutionBlock.
Any persistence failures are logged and do not block execution of the decorated function.
"""
def save_request_response(eb_id: str, request_response: RequestResponse):
try:
with PostgresUnitOfWork() as uow:
eb = uow.ebs.get(eb_id)
if eb.request_responses is None:
eb.request_responses = [request_response]
else:
eb.request_responses.append(request_response)
uow.ebs.add(eb)
uow.commit()
except Exception as err:
LOGGER.error(
"The update to ExecutionBlock %s request_response raised an error: %s",
eb_id,
repr(err),
)
@wraps(fn)
def wrapper(*args, **kwargs):
LOGGER.debug("Capturing the request function %s in an ExecutionBlock", fn.__name__)
eb_id = getenv("EB_ID")
if eb_id is None:
LOGGER.error(
"The EB_ID not set, meaning Execution Block updates will not be sent to"
" the ODA. Please ensure the create function from the EB ODA client has"
" been called."
)
return fn(*args, **kwargs)
request_sent_at = datetime.now(tz=timezone.utc)
try:
result = fn(*args, **kwargs)
response_received_at = datetime.now(tz=timezone.utc)
except Exception as err:
request_response = RequestResponse(
request=f"{fn.__module__}.{fn.__name__}",
request_args=PythonArguments(args=args, kwargs=kwargs),
status="ERROR",
error=ErrorWrapper(detail=repr(err)),
request_sent_at=request_sent_at,
)
save_request_response(eb_id, request_response)
raise
else:
request_response = RequestResponse(
request=f"{fn.__module__}.{fn.__name__}",
request_args=PythonArguments(args=args, kwargs=kwargs),
status="OK",
response=ResponseWrapper(result=repr(result)),
request_sent_at=request_sent_at,
response_received_at=response_received_at,
)
save_request_response(eb_id, request_response)
return result
return wrapper