Source code for ska_db_oda.client.oda_helper

"""
This helper module is intended to be imported when sending commands to the telescope,
for example during a Jupyter notebook session or SB driven observing from the OET.

It provides functionality for specific use cases of interactions with the ODA.
As use cases expand, we may look to generalise this. However, for now we are happy to
keep it as lightweight as possible. It is imported into the ska-oso-scripting package
so users can import it from that namespace.
"""

import logging
import os
from datetime import datetime, timezone
from functools import wraps
from http import HTTPStatus
from os import getenv
from typing import Callable, Optional

import requests
from ska_oso_pdm import OSOExecutionBlock, SBDefinition
from ska_oso_pdm._shared import PythonArguments, TelescopeType
from ska_oso_pdm.execution_block import ErrorWrapper, RequestResponse, ResponseWrapper

from ska_db_oda.domain import CODEC

LOGGER = logging.getLogger(__name__)


[docs] def save(sbd: SBDefinition) -> SBDefinition: """ Persists the entity in the ODA. If the identifier field (e.g. sbd_id for an SBDefinition) is not present in the object, a new version entity will be created with an identifier fetched from SKUID. If the identifier field is present then the version of that identifier in the ODA will be updated, if it exists. NOTE: this function assumes the ODA_URL environment variable is set to a running instance of the ODA :param sbd: an SBDefinition to be persisted :returns: the entity as it has been persisted in the ODA, with identifiers and metadata :raises KeyError: if ODA_URL is not set or if the identifier is given on the object, but it is not found in the ODA :raises ConnectionError: captures all errors and other responses from the ODA, including serialisation issues """ oda_url = getenv("ODA_URL") if not oda_url: raise KeyError( "ODA_URL environment variable is not set. Please set to a running instance" " of the ODA, eg" " ODA_URL=https://k8s.stfc.skao.int/staging-ska-db-oda/oda/api/v1/" ) # rstrip means it doesn't matter whether the client sets the trailing slash on the env variable if sbd.sbd_id: request_fn = requests.put request_uri = f"{oda_url.rstrip('/')}/sbds/{sbd.sbd_id}" else: request_fn = requests.post request_uri = f"{oda_url.rstrip('/')}/sbds" # Send the request and handle any unexpected errors with it try: LOGGER.debug("Sending request to %s", request_uri) response = request_fn( request_uri, data=CODEC.dumps(sbd), headers={"Content-type": "application/json"}, ) except requests.exceptions.RequestException as err: msg = f"Error '{repr(err)}' while sending post request to {request_uri}" LOGGER.exception(msg) raise ConnectionError(msg) from err if response.status_code == HTTPStatus.NOT_FOUND: raise KeyError( f"The requested identifier {sbd.sbd_id} could not be found. If you are" " trying to save a new SBDefintion, then the sbd_id field should be None" " and the ODA will generate the identifier" ) # Handle a request than returns a response with a non-success code if response.status_code != HTTPStatus.OK: try: detail = response.json()["detail"] except (requests.exceptions.JSONDecodeError, ValueError): # This is the case where the error response is one that is not explicitly raised by our # ODA server, ie something unexpected has gone wrong detail = response.text msg = ( f"Response status {response.status_code} with body '{detail}' while sending" f" post request to {request_uri}" ) LOGGER.error(msg) raise ConnectionError(msg) try: return CODEC.loads(SBDefinition, response.text) except (requests.exceptions.JSONDecodeError, KeyError) as err: # This scenario (ie a 200 response but not the usual ODA response body) should only occur with a # deployed instance of the ODA where requests are redirected to the SSO login page msg = ( "Request to create EB in the ODA returned a success status but the eb_id" f" was not present in the body. Instead received: {response.text}" ) LOGGER.error(msg) raise ConnectionError(msg) from err
[docs] def create_eb(telescope: TelescopeType, sbi_ref: Optional[str] = None) -> str: """ Calls the ODA POST /ebs API to create an 'empty' ExecutionBlock in the ODA, ready to be updated with request/responses from telescope operation. This function will also set the EB_ID environment variable to the value of the eb_id returned from the create request, so it can be used by subsequent calls to capture_request_response during the same session. Important: the ODA_URL environment variables must be set to a running instance of the ODA, eg https://k8s.stfc.skao.int/staging-ska-db-oda/oda/api/v1/ :param telescope: telescope from which this EB originates :param sbi_ref: the identifier of the SBInstance that the created ExecutionBlock should be linked to :return: The eb_id generated from SKUID that is persisted in the ODA :raises KeyError: if the ODA_URL variable is not set :raises ConnectionError: if the ODA requests raises an error or returns a status code other than 200 """ oda_url = getenv("ODA_URL") if not oda_url: raise KeyError( "ODA_URL environment variable is not set. Please set to a running instance" " of the ODA, eg" " ODA_URL=https://k8s.stfc.skao.int/staging-ska-db-oda/oda/api/v1/" ) entity = OSOExecutionBlock(sbi_ref=sbi_ref, telescope=telescope) request_uri = f"{oda_url.rstrip('/')}/ebs" try: response = requests.post( request_uri, data=CODEC.dumps(entity), headers={"Content-type": "application/json"}, ) except requests.exceptions.RequestException as err: msg = f"Error '{repr(err)}' while sending post request to {request_uri}" LOGGER.exception(msg) raise ConnectionError(msg) from err if response.status_code != HTTPStatus.OK: try: detail = response.json()["detail"] except (requests.exceptions.JSONDecodeError, KeyError): # This is the case where the error response is one that is not explicitly raised by our # ODA server, ie something unexpected has gone wrong detail = response.text msg = ( f"Response status {response.status_code} with body '{detail}' while sending" f" post request to {request_uri}" ) LOGGER.error(msg) raise ConnectionError(msg) try: eb_id = response.json()["eb_id"] except (requests.exceptions.JSONDecodeError, KeyError) as err: # This scenario (ie a 200 response but not the usual ODA response body) should only occur with a # deployed instance of the ODA where requests are redirected to the SSO login page msg = ( "Request to create EB in the ODA returned a success status but the eb_id" f" was not present in the body. Instead received: {response.text}" ) LOGGER.error(msg) raise ConnectionError(msg) from err os.environ["EB_ID"] = eb_id return eb_id
[docs] def capture_request_response(fn: Callable) -> Callable: """ A decorator function which will record requests and responses sent to the telescope in an Execution Block within the ODA. It will send individual request_response objects to the ODA PATCH /ebs/<eb_id>/request_response API over HTTP, containing the decorated function name, the arguments and the return value, as well as timestamps. Important: the function assumes two environment variables are set: * ODA_URL: the location of a running instance of the ODA, eg https://k8s.stfc.skao.int/staging-ska-db-oda/oda/api/v1/ * EB_ID: the identifier of the ExecutionBlock to update. The create_eb function from this module should have already been called during execution, which will set this variable. The decorator is designed such that it does not block execution of commands if there is a problem with the ODA connection, or the environment variables are not set. Instead, a warning message is logged and execution allowed to continue. Also, any errors raised by the decorated function will not be changed, they will just be recorded in the ODA and reraised. The standard OSO Scripting functions are decorated using this function, so will automatically record request/responses. To record other function calls in an Execution Block, either use this decorator in your source code: .. highlight:: python .. code-block:: python from ska_db_oda.client.ebclient import capture_request_response @capture_request_response def my_function_to_record(args): ... or use at runtime when calling the function: .. highlight:: python .. code-block:: python from ska_db_oda.client.ebclient import capture_request_response capture_request_response(my_function_to_record)(args) """ def send_request_response(oda_url: str, eb_id: str, request_body: RequestResponse): request_response_uri = f"{oda_url.rstrip('/')}/ebs/{eb_id}/request_response" LOGGER.debug( "Sending request to %s with body %s", request_response_uri, request_body ) try: response = requests.patch( request_response_uri, data=CODEC.dumps(request_body), headers={"Content-type": "application/json"}, ) except requests.exceptions.RequestException as err: LOGGER.error( "The request to %s raised an error: %s", request_response_uri, repr(err), ) else: if response.status_code != HTTPStatus.OK: LOGGER.error( "The request to %s returned an error code %s, with message: %s", request_response_uri, int(response.status_code), repr(response.content), ) @wraps(fn) def wrapper(*args, **kwargs): LOGGER.debug( "Capturing the request function %s in an ExecutionBlock", fn.__name__ ) ODA_URL = getenv("ODA_URL") if ODA_URL is None: LOGGER.error( "The ODA_URL not set, meaning Execution Block updates will not be sent" " to the ODA. Please set this variable to the URL for an instance of" " the ODA, eg ODA_URL=http://k8s.stfc.skao.int/staging-ska-db-oda/" ) return fn(*args, **kwargs) EB_ID = getenv("EB_ID") if EB_ID is None: LOGGER.error( "The EB_ID not set, meaning Execution Block updates will not be sent to" " the ODA. Please ensure the create function from the EB ODA client has" " been called." ) return fn(*args, **kwargs) request_sent_at = datetime.now(tz=timezone.utc) try: result = fn(*args, **kwargs) response_received_at = datetime.now(tz=timezone.utc) except Exception as err: request_body = RequestResponse( request=f"{fn.__module__}.{fn.__name__}", request_args=PythonArguments(args=args, kwargs=kwargs), status="ERROR", error=ErrorWrapper(detail=repr(err)), request_sent_at=request_sent_at, ) send_request_response(ODA_URL, EB_ID, request_body) raise err else: request_body = RequestResponse( request=f"{fn.__module__}.{fn.__name__}", request_args=PythonArguments(args=args, kwargs=kwargs), status="OK", response=ResponseWrapper(result=repr(result)), request_sent_at=request_sent_at, response_received_at=response_received_at, ) send_request_response(ODA_URL, EB_ID, request_body) return result return wrapper