Source code for ska_db_oda.infrastructure.rest.repository

"""
This module contains the bridge implementation for the Repository class, which
connects to an ska-db-oda deployment over the network.
"""
# pylint: disable=abstract-method
import json
import logging
from http import HTTPStatus
from typing import List, Optional, TypeVar
from urllib.parse import urlencode

import requests
from ska_oso_pdm import Metadata

from ska_db_oda.domain import OSOEntity, get_identifier_or_fetch_from_skuid
from ska_db_oda.domain.query import DateQuery, QueryParams, UserQuery
from ska_db_oda.domain.repository import RepositoryBridge
from ska_db_oda.infrastructure.rest.mapping import RESTMapping

LOGGER = logging.getLogger(__name__)

T = TypeVar("T", bound=OSOEntity)
U = TypeVar("U")


[docs] class RESTBridge(RepositoryBridge[T, U]): """ Implementation of the Repository bridge which connects to the ODA API. """ def __init__(self, rest_mapping: RESTMapping, base_rest_uri: str): self._rest_mapping = rest_mapping self._base_rest_uri = base_rest_uri self._resource_rest_uri = ( f"{self._base_rest_uri}/{self._rest_mapping.resource_name}" ) self.transactions: dict[U, T] = {}
[docs] def create(self, entity: T) -> T: """ Adds the entity to a pending transaction, ready to be committed as part of the UoW Note unlike other implementations, this method does not update the metadata, as this is done inside the ODA service after the API is called """ entity_id = get_identifier_or_fetch_from_skuid(entity) LOGGER.debug( "Adding entity with ID %s to the REST repository transactions", entity_id ) self.transactions[entity_id] = entity return entity
[docs] def read(self, entity_id: U) -> T: """ Gets the latest version of the entity with the given entity_id. As this method will always be accessed in the context of a UnitOfWork, the pending transactions also need to be checked. (Similar to with a database implementation where an entity that was added to a transaction but not committed would still be accessible inside the transaction.) """ LOGGER.debug("Getting entity with ID %s from the REST repository", entity_id) if entity_id in self.transactions: return self.transactions[entity_id] entity_uri = self._uri_from_entity_id(entity_id) try: LOGGER.debug("Sending GET request to %s", entity_uri) response = requests.get(entity_uri) except requests.RequestException as err: msg = f"Error retrieving {entity_id} from {entity_uri}" LOGGER.exception(msg) raise OSError(msg) from err if response.status_code == HTTPStatus.OK: return self._rest_mapping.deserialise(response.content) if response.status_code == HTTPStatus.NOT_FOUND: try: raise KeyError(response.json()["detail"]) except requests.exceptions.JSONDecodeError as err: raise OSError( f"URL {entity_uri} cannot be found, likely the host or namespace is" " wrong" ) from err raise OSError(response.content)
[docs] def update(self, entity: T) -> T: """ Adds the entity to a pending transaction, ready to be committed as part of the UoW. As the update method edits in the existing entity, if an entity with the same ID is passed twice to this method, it will overwrite the previous version in the pending transactions. """ entity_id = get_identifier_or_fetch_from_skuid(entity) LOGGER.debug( "Updating entity with ID %s in the REST repository transactions", entity_id ) self.transactions[entity_id] = entity return entity
[docs] def query(self, qry_params: QueryParams) -> List[T]: """ Queries the ODA API with the parameters See :func:`~ska_db_oda.domain.repository.AbstractRepository.query` docstring for details """ param_string = self.get_param_string(qry_params) query_uri = self._resource_rest_uri + param_string try: response = requests.get(query_uri) except requests.RequestException as err: msg = f"Error retrieving query from {query_uri}: {err.args}" LOGGER.exception(msg) raise OSError(msg) from err if response.status_code == HTTPStatus.OK: # The REST API will return a json list of objects - the TypeAdapter will deserialise this list return [ self._rest_mapping.deserialise(json.dumps(entity_obj)) for entity_obj in response.json() ] # ta = TypeAdapter(List[OSOExecutionBlock]) # return ta.validate_python(response.json()) raise OSError(response.content)
def get_param_string(self, qry_params: QueryParams): if isinstance(qry_params, UserQuery): if qry_params.entity_id: return f"?entity_id={qry_params.entity_id}&match_type={qry_params.match_type.value}" return f"?user={qry_params.user}&match_type={qry_params.match_type.value}" if isinstance(qry_params, DateQuery): match qry_params.query_type: case DateQuery.QueryType.MODIFIED_BETWEEN: field = "last_modified" case DateQuery.QueryType.CREATED_BETWEEN: field = "created" case _: raise ValueError(f"Unsupported query type {qry_params.query_type}") if qry_params.start: param_string = f"?{field}_after={qry_params.start.isoformat()}" if qry_params.end: param_string += f"&{field}_before={qry_params.end.isoformat()}" return param_string if qry_params.end: return f"?{field}_before={qry_params.end.isoformat()}" raise ValueError( f"Unsupported query parameters {qry_params.__class__.__name__}" ) def _get_latest_metadata(self, entity: OSOEntity) -> Optional[Metadata]: """ This method is deliberately not implemented, as it is used to update metadata, and the RESTRepository makes requests to the API which uses another repository implementation which will do the update. """ raise NotImplementedError def _uri_from_entity_id(self, entity_id: U) -> str: """ Returns the full REST URI for the entity_id, eg http://1.1.1.1/ska-db-oda/oda/api/v1/sbds/sbd-mvp01-20220713-00011 """ return f"{self._resource_rest_uri}/{entity_id}" def __contains__(self, entity_id: U): """ Checks if the entity_id is in the Repository - useful for tests. """ try: self.read(entity_id) return True except KeyError: return False