# Source code for ska_db_oda.unit_of_work.memoryunitofwork

"""
This module contains an in-memory implementation of the AbstractUnitOfWork class.
"""
# We mimic the database commit/rollback by accessing the transactions inside the repository
# pylint: disable=protected-access
import logging
from typing import Dict, Optional

from ska_oso_pdm import OSOExecutionBlock as ExecutionBlock
from ska_oso_pdm import Project, Proposal, SBDefinition, SBInstance

from ska_db_oda.domain.repository import (
    ExecutionBlockRepository,
    ProjectRepository,
    ProposalRepository,
    SBDefinitionRepository,
    SBInstanceRepository,
)
from ska_db_oda.infrastructure.memory.repository import MemoryBridge
from ska_db_oda.unit_of_work.abstractunitofwork import AbstractUnitOfWork

LOGGER = logging.getLogger(__name__)


class MemorySession:
    """
    Holder for the in-memory stores that back a MemoryUnitOfWork.

    One dictionary is kept per entity type. Per the annotations, each store maps a
    string key to an inner dict keyed by int; presumably these are the entity ID and
    the entity version respectively - confirm against MemoryBridge.
    """

    def __init__(self) -> None:
        # Every store starts out empty and is its own distinct dict object.

        # Scheduling Block Definitions
        self.sbd_dict: Dict[str, Dict[int, SBDefinition]] = {}

        # Scheduling Block Instances
        self.sbi_dict: Dict[str, Dict[int, SBInstance]] = {}

        # Execution Blocks
        self.eb_dict: Dict[str, Dict[int, ExecutionBlock]] = {}

        # Projects
        self.prj_dict: Dict[str, Dict[int, Project]] = {}

        # Proposals
        self.prsl_dict: Dict[str, Dict[int, Proposal]] = {}


class MemoryUnitOfWork(AbstractUnitOfWork):
    """
    A lightweight non-persistent implementation of the AbstractUnitOfWork that can
    store and retrieve SBDefinition, SBInstance, ExecutionBlock, Project and Proposal
    objects.

    Commits or rolls back a series of database transactions as an atomic operation.
    Changes between commits are tracked via the ``_transactions`` variable of each
    repository's MemoryBridge.
    """

    def __init__(self, session: Optional[MemorySession] = None):
        """
        The underlying collection is injected in so the same object can be used across
        the application. For example, if a new UoW is created for each HTTP request
        then the underlying memory should be shared between them.

        :param session: the shared in-memory stores; a new, empty MemorySession is
            created when omitted or None
        """
        if session is None:
            session = MemorySession()
        self.session = session

    def __enter__(self):
        """
        Creates a repository for each entity type, each wrapping a new MemoryBridge
        over the corresponding session dict, then delegates to the parent
        ``__enter__`` and returns its result.
        """
        self.sbds = SBDefinitionRepository(MemoryBridge(self.session.sbd_dict))
        self.sbis = SBInstanceRepository(MemoryBridge(self.session.sbi_dict))
        self.ebs = ExecutionBlockRepository(MemoryBridge(self.session.eb_dict))
        self.prjs = ProjectRepository(MemoryBridge(self.session.prj_dict))
        self.prsls = ProposalRepository(MemoryBridge(self.session.prsl_dict))
        return super().__enter__()

    def commit(self) -> None:
        """Implementation of the AbstractUnitOfWork method.

        See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.commit`
        docstring for details
        """
        for repo_bridge in self._repo_bridges():
            self._commit_transactions_for_repo(repo_bridge)

    def rollback(self) -> None:
        """Implementation of the AbstractUnitOfWork method.

        See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.rollback`
        docstring for details
        """
        for repo_bridge in self._repo_bridges():
            self._rollback_transactions_for_repo(repo_bridge)

    def _repo_bridges(self):
        """
        Returns the bridge of every repository, in the fixed order
        sbds, sbis, ebs, prjs, prsls.

        The repositories are only assigned in ``__enter__``, so this raises
        AttributeError when called outside of a ``with`` block.
        """
        return (
            self.sbds._bridge,
            self.sbis._bridge,
            self.ebs._bridge,
            self.prjs._bridge,
            self.prsls._bridge,
        )

    def _commit_transactions_for_repo(self, repo_bridge: MemoryBridge) -> None:
        """
        Mimics the commit of a database transaction by writing the entities that are
        'pending' in the memory Repository into its entity store, then clearing the
        pending transactions.

        :param repo_bridge: the bridge whose pending transactions should be committed
        """
        for entity_id, version_dict in repo_bridge._transactions.items():
            LOGGER.debug("Committing entity %s to memory", entity_id)
            if entity_id not in repo_bridge._entity_dict:
                # First time this entity is stored: adopt the pending versions as is
                repo_bridge._entity_dict[entity_id] = version_dict
            else:
                # Entity already stored: add the pending versions, overwriting any
                # stored version with the same number
                repo_bridge._entity_dict[entity_id].update(version_dict)
        repo_bridge._transactions.clear()

    def _rollback_transactions_for_repo(self, repo_bridge: MemoryBridge) -> None:
        """
        Mimics the rollback of a database transaction by discarding the entities that
        are 'pending' in the memory Repository, without writing them to the entity
        store.

        :param repo_bridge: the bridge whose pending transactions should be discarded
        """
        repo_bridge._transactions.clear()