Source code for ska_db_oda.infrastructure.memory.repository

"""
This module contains implementation of the AbstractRepository class, using memory.
Useful for development and tests as it does not require a heavyweight database implementation.
"""
import logging
from typing import Dict, List, Optional, TypeVar

from ska_oso_pdm import Metadata

from ska_db_oda.domain import (
    OSOEntity,
    get_identifier,
    get_identifier_or_fetch_from_skuid,
)
from ska_db_oda.domain.query import QueryParams
from ska_db_oda.domain.repository import RepositoryBridge
from ska_db_oda.infrastructure.filesystem.repository import QueryFilterFactory

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)

# T: the entity type stored by a bridge; bound to OSOEntity.
T = TypeVar("T", bound=OSOEntity)
# U: the type of the identifier used to key stored entities (unconstrained).
U = TypeVar("U")


class MemoryBridge(RepositoryBridge[T, U]):
    """
    Implementation of the Repository bridge which persists entities in memory.

    Entities will be stored in a nested dict:
    `{<entity_id>: {<version>: <entity>}}`

    Two dicts of that shape are held:

    * ``_entity_dict`` - supplied by the caller at construction time.
    * ``_transactions`` - filled by :meth:`create` and :meth:`update`. Reads
      prefer this dict over ``_entity_dict``. This class never copies its
      contents into ``_entity_dict``; presumably a caller (e.g. a unit of
      work) does that on commit - TODO confirm.
    """

    def __init__(self, entity_dict: Dict[U, Dict[int, T]]) -> None:
        """
        :param entity_dict: the backing store, ``{entity_id: {version: entity}}``.
            The dict is used as given, not copied.
        """
        self._entity_dict: Dict[U, Dict[int, T]] = entity_dict
        self._transactions: Dict[U, Dict[int, T]] = {}

    def __len__(self) -> int:
        """Return the number of stored entity versions in ``_entity_dict``.

        Entries that only exist in ``_transactions`` are not counted.
        """
        return sum(len(versions) for versions in self._entity_dict.values())

    def __contains__(self, entity_id: U) -> bool:
        """Return True if the ID is known to either the store or the pending transactions."""
        return entity_id in self._entity_dict or entity_id in self._transactions

    def create(self, entity: T) -> T:
        """Implementation of the RepositoryBridge method.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.create` docstring for details

        The entity is recorded in the pending transactions under the version
        taken from its metadata after ``update_metadata`` has been applied.

        :param entity: the entity to add
        :return: the entity as returned by ``update_metadata``
        """
        entity_id = get_identifier_or_fetch_from_skuid(entity)
        entity = self.update_metadata(entity)
        LOGGER.debug("Adding entity with ID %s to the memory transactions", entity_id)
        self._transactions.setdefault(entity_id, {})[entity.metadata.version] = entity
        return entity

    def read(self, entity_id: U) -> T:
        """Implementation of the RepositoryBridge method.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.read` docstring for details

        Pending transactions take precedence over the backing store. In either
        case the entity with the highest version number is returned.

        :param entity_id: identifier of the entity to fetch
        :return: the highest-versioned entity held for that ID
        :raises KeyError: if the ID is in neither dict
        """
        LOGGER.debug("Getting entity with ID %s from memory", entity_id)
        if entity_id in self._transactions:
            versions = self._transactions[entity_id]
        else:
            versions = self._entity_dict[entity_id]
        return versions[max(versions)]

    def update(self, entity: T) -> T:
        """Implementation of the RepositoryBridge method.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.update` docstring for details

        :param entity: the entity to update
        :return: the entity as returned by ``_set_new_metadata``
        """
        entity = self._set_new_metadata(entity)
        entity_id = get_identifier_or_fetch_from_skuid(entity)
        LOGGER.debug("Updating entity with ID %s in the memory transactions", entity_id)
        # NOTE(review): the version key is hard-coded to 1 here, unlike create()
        # which uses entity.metadata.version. Presumably _set_new_metadata always
        # yields version 1 - confirm against its implementation.
        self._transactions.setdefault(entity_id, {})[1] = entity
        return entity

    def query(self, qry_params: QueryParams) -> List[T]:
        """Return the latest version of every entity that satisfies the query.

        This implementation follows the same strategy as the Filesystem query
        implementation, using the QueryFilterFactory and then applying the
        filters to all the available entity ids.

        :param qry_params: the query to evaluate
        :return: entities (as returned by :meth:`read`) for which every filter
            function is truthy; each entity ID contributes at most one result
        """
        filter_fns = QueryFilterFactory.filter_functions_for_query(qry_params)
        # An ID can live in both dicts (stored entity with a pending change).
        # dict.fromkeys drops the repeat while keeping first-seen order, so such
        # an entity is read - and returned - only once.
        entity_ids = dict.fromkeys([*self._entity_dict, *self._transactions])
        return [
            entity
            for entity in map(self.read, entity_ids)
            if all(fn(entity) for fn in filter_fns)
        ]

    def _get_latest_metadata(self, entity: T) -> Optional[Metadata]:
        """Implementation of the abstract MetaDataMixin method for a memory backend.

        See :func:`~ska_db_oda.domain.metadatamixin.MetadataMixin._get_latest_metadata` docstring for details

        :param entity: entity whose identifier is looked up
        :return: metadata of the entity returned by :meth:`read`, or None when
            the lookup raises KeyError
        """
        try:
            return self.read(get_identifier(entity)).metadata
        except KeyError:
            return None