"""
This module contains the bridge implementation for Repository class which connects to an
ska-db-oda deployment over the network.
"""
# pylint: disable=abstract-method
import json
import logging
from http import HTTPStatus
from typing import List, Optional, TypeVar
import requests
from ska_oso_pdm import Metadata
from ska_db_oda.domain import OSOEntity, get_identifier_or_fetch_from_skuid
from ska_db_oda.domain.query import DateQuery, QueryParams, UserQuery
from ska_db_oda.domain.repository import RepositoryBridge
from ska_db_oda.infrastructure.rest.mapping import RESTMapping
LOGGER = logging.getLogger(__name__)
T = TypeVar("T", bound=OSOEntity)
U = TypeVar("U")
[docs]
class RESTBridge(RepositoryBridge[T, U]):
"""
Implementation of the Repository bridge which connects to the ODA API.
"""
def __init__(self, rest_mapping: RESTMapping, base_rest_uri: str):
self._rest_mapping = rest_mapping
self._base_rest_uri = base_rest_uri
self._resource_rest_uri = (
f"{self._base_rest_uri}/{self._rest_mapping.resource_name}"
)
self.transactions: dict[U, T] = {}
[docs]
def create(self, entity: T) -> T:
"""
Adds the entity to a pending transaction, ready to be committed as part of the UoW
Note unlike other implementations, this method does not update the metadata, as this is done
inside the ODA service after the API is called
"""
entity_id = get_identifier_or_fetch_from_skuid(entity)
LOGGER.debug(
"Adding entity with ID %s to the REST repository transactions", entity_id
)
self.transactions[entity_id] = entity
return entity
[docs]
def read(self, entity_id: U) -> T:
"""
Gets the latest version of the entity with the given entity_id.
As this method will always be accessed in the context of a UnitOfWork, the pending transactions
also need to be checked.
(Similar to with a database implementation where an entity that was added to a transaction but
not committed would still be accessible inside the transaction.)
"""
LOGGER.debug("Getting entity with ID %s from the REST repository", entity_id)
if entity_id in self.transactions:
return self.transactions[entity_id]
entity_uri = self._uri_from_entity_id(entity_id)
try:
LOGGER.debug("Sending GET request to %s", entity_uri)
response = requests.get(entity_uri)
except requests.RequestException as err:
msg = f"Error retrieving {entity_id} from {entity_uri}"
LOGGER.exception(msg)
raise OSError(msg) from err
if response.status_code == HTTPStatus.OK:
return self._rest_mapping.deserialise(response.content)
if response.status_code == HTTPStatus.NOT_FOUND:
try:
raise KeyError(response.json()["detail"])
except requests.exceptions.JSONDecodeError as err:
raise OSError(
f"URL {entity_uri} cannot be found, likely the host or namespace is"
" wrong"
) from err
raise OSError(response.content)
[docs]
def update(self, entity: T) -> T:
"""
Adds the entity to a pending transaction, ready to be committed as part of the UoW.
As the update method edits in the existing entity, if an entity with the same ID is passed twice to this method,
it will overwrite the previous version in the pending transactions.
"""
entity_id = get_identifier_or_fetch_from_skuid(entity)
LOGGER.debug(
"Updating entity with ID %s in the REST repository transactions", entity_id
)
self.transactions[entity_id] = entity
return entity
[docs]
def query(self, qry_params: QueryParams) -> List[T]:
"""
Queries the ODA API with the parameters
See :func:`~ska_db_oda.domain.repository.AbstractRepository.query` docstring for details
"""
param_string = self.get_param_string(qry_params)
query_uri = self._resource_rest_uri + param_string
try:
response = requests.get(query_uri)
except requests.RequestException as err:
msg = f"Error retrieving query from {query_uri}: {err.args}"
LOGGER.exception(msg)
raise OSError(msg) from err
if response.status_code == HTTPStatus.OK:
# The REST API will return a json list of objects - the TypeAdapter will deserialise this list
return [
self._rest_mapping.deserialise(json.dumps(entity_obj))
for entity_obj in response.json()
]
# ta = TypeAdapter(List[OSOExecutionBlock])
# return ta.validate_python(response.json())
raise OSError(response.content)
def get_param_string(self, qry_params: QueryParams):
if isinstance(qry_params, UserQuery):
if qry_params.entity_id:
return f"?entity_id={qry_params.entity_id}&match_type={qry_params.match_type.value}"
return f"?user={qry_params.user}&match_type={qry_params.match_type.value}"
if isinstance(qry_params, DateQuery):
match qry_params.query_type:
case DateQuery.QueryType.MODIFIED_BETWEEN:
field = "last_modified"
case DateQuery.QueryType.CREATED_BETWEEN:
field = "created"
case _:
raise ValueError(f"Unsupported query type {qry_params.query_type}")
if qry_params.start:
param_string = f"?{field}_after={qry_params.start.isoformat()}"
if qry_params.end:
param_string += f"&{field}_before={qry_params.end.isoformat()}"
return param_string
if qry_params.end:
return f"?{field}_before={qry_params.end.isoformat()}"
raise ValueError(
f"Unsupported query parameters {qry_params.__class__.__name__}"
)
def _get_latest_metadata(self, entity: OSOEntity) -> Optional[Metadata]:
"""
This method is deliberately not implemented, as it is used to update metadata,
and the RESTRepository makes requests to the API which uses another
repository implementation which will do the update.
"""
raise NotImplementedError
def _uri_from_entity_id(self, entity_id: U) -> str:
"""
Returns the full REST URI for the entity_id,
eg http://1.1.1.1/ska-db-oda/oda/api/v1/sbds/sbd-mvp01-20220713-00011
"""
return f"{self._resource_rest_uri}/{entity_id}"
def __contains__(self, entity_id: U):
"""
Checks if the entity_id is in the Repository - useful for tests.
"""
try:
self.read(entity_id)
return True
except KeyError:
return False