Source code for ska_db_oda.unit_of_work.restunitofwork

"""
This module contains the implementation of the AbstractUnitOfWork Class which connects to a remote ODA API.
"""
# We mimic the database commit/rollback by accessing the transactions inside the repository
# pylint: disable=protected-access
import json
import logging
from http import HTTPStatus
from os import getenv
from typing import Optional

import requests

from ska_db_oda.domain import CODEC
from ska_db_oda.domain.repository import (
    ExecutionBlockRepository,
    ProjectRepository,
    ProposalRepository,
    SBDefinitionRepository,
    SBInstanceRepository,
)
from ska_db_oda.infrastructure.rest.mapping import (
    ExecutionBlockMapping,
    ProjectMapping,
    ProposalMapping,
    SBDefinitionMapping,
    SBInstanceMapping,
)
from ska_db_oda.infrastructure.rest.repository import RESTBridge
from ska_db_oda.unit_of_work.abstractunitofwork import AbstractUnitOfWork

LOGGER = logging.getLogger(__name__)


class RESTUnitOfWork(AbstractUnitOfWork):
    """
    Implementation of the AbstractUnitOfWork which connects with the ska-db-oda API over the network

    Entities added to the repositories are held as pending transactions on each
    repository's ``RESTBridge`` and are only sent to the remote API, in a single
    PUT request, when :func:`commit` is called.
    """

    def __init__(self, rest_uri: Optional[str] = None):
        """
        :param rest_uri: URI of the remote ODA API. When omitted, the value of the
            ``ODA_URL`` environment variable is used instead.
        :raises KeyError: if ``rest_uri`` is omitted and ``ODA_URL`` is unset or empty.
        """
        if rest_uri is None:
            rest_uri = getenv("ODA_URL")
            if not rest_uri:
                raise KeyError("ODA_URL environment variable is not set.")
        self._rest_uri = rest_uri

    def __enter__(self):
        """
        Create a fresh set of REST-backed repositories, then defer to the parent
        ``__enter__`` and return its result.
        """
        self.sbds = SBDefinitionRepository(
            RESTBridge(SBDefinitionMapping(), self._rest_uri)
        )
        self.sbis = SBInstanceRepository(
            RESTBridge(SBInstanceMapping(), self._rest_uri)
        )
        self.ebs = ExecutionBlockRepository(
            RESTBridge(ExecutionBlockMapping(), self._rest_uri)
        )
        self.prjs = ProjectRepository(RESTBridge(ProjectMapping(), self._rest_uri))
        self.prsls = ProposalRepository(RESTBridge(ProposalMapping(), self._rest_uri))
        return super().__enter__()

    def commit(self) -> None:
        """Implementation of the AbstractUnitOfWork method.

        See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.commit`
        docstring for details

        All pending entities from every repository are sent in one PUT request to
        the configured URI. Pending transactions are cleared only after the server
        answers with ``200 OK``; on any failure they are left in place.

        :raises OSError: if the request itself fails (any ``requests.RequestException``).
        :raises ValueError: if the server responds with ``422 Unprocessable Entity``
            or ``403 Forbidden``. The message is the ``detail`` field of the JSON
            response when present, otherwise the raw response body.
        :raises RuntimeError: if the server responds with any other non-``200`` status.
        """
        payload = {
            "sbds": self._pending_entities(self.sbds._bridge),
            "sbis": self._pending_entities(self.sbis._bridge),
            "ebs": self._pending_entities(self.ebs._bridge),
            "prjs": self._pending_entities(self.prjs._bridge),
            "prsls": self._pending_entities(self.prsls._bridge),
        }

        try:
            LOGGER.debug("Sending PUT request to %s", self._rest_uri)
            response = requests.put(
                self._rest_uri,
                data=json.dumps(payload),
                headers={"Content-type": "application/json"},
            )
        except requests.RequestException as err:
            msg = f"Error PUTting entities to {self._rest_uri}: {err.args}"
            LOGGER.exception(msg)
            raise OSError(msg) from err

        if response.status_code in (
            HTTPStatus.UNPROCESSABLE_ENTITY,
            HTTPStatus.FORBIDDEN,
        ):
            raise ValueError(self._error_detail(response))

        if response.status_code != HTTPStatus.OK:
            raise RuntimeError(response.content)

        self._clear_transactions()

    def rollback(self) -> None:
        """Implementation of the AbstractUnitOfWork method.

        See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.rollback`
        docstring for details

        Nothing has been sent to the server before a commit, so rolling back only
        discards the locally pending transactions.
        """
        self._clear_transactions()

    @staticmethod
    def _pending_entities(repo_bridge: RESTBridge) -> list:
        """
        Return the bridge's pending entities as JSON-compatible Python objects.

        Each entity is serialised with ``CODEC`` and parsed back with ``json.loads``
        so that it can be embedded in the combined request payload.
        """
        return [
            json.loads(CODEC.dumps(entity))
            for entity in repo_bridge.transactions.values()
        ]

    @staticmethod
    def _error_detail(response):
        """
        Extract a human-readable error description from an error response.

        Returns the ``detail`` field of the JSON body when it can be read. If the
        body is not valid JSON, is not a JSON object, or has no ``detail`` key,
        the raw response content is returned instead, so that the caller can still
        raise its intended exception type with the server's actual answer.
        """
        try:
            return json.loads(response.content)["detail"]
        except (ValueError, KeyError, TypeError):
            # ValueError covers JSON and unicode decoding failures; KeyError a
            # missing "detail"; TypeError a JSON body that is not an object.
            return response.content

    def _clear_transactions(self) -> None:
        """
        Clears the pending transactions from the Repositories
        """
        for repository in (self.sbds, self.sbis, self.ebs, self.prjs, self.prsls):
            repository._bridge.transactions.clear()