"""
This module contains implementation of the AbstractRepository class, using memory.
Useful for development and tests as it does not require a heavyweight database implementation.
"""
import logging
from typing import Dict, List, Optional, TypeVar
from ska_oso_pdm import Metadata
from ska_db_oda.domain import (
OSOEntity,
get_identifier,
get_identifier_or_fetch_from_skuid,
)
from ska_db_oda.domain.query import QueryParams
from ska_db_oda.domain.repository import RepositoryBridge
from ska_db_oda.infrastructure.filesystem.repository import QueryFilterFactory
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Type of the entities held by a bridge; constrained to OSOEntity (or a subtype).
T = TypeVar("T", bound=OSOEntity)
# Type of the identifier used as the outer key of the in-memory entity store.
U = TypeVar("U")
class MemoryBridge(RepositoryBridge[T, U]):
    """
    Implementation of the Repository bridge which persists entities in memory.

    Entities will be stored in a nested dict:
    `{<entity_id>: {<version>: <entity>}}`

    Two such dicts are held: ``_entity_dict`` (supplied by the caller) and
    ``_transactions``, which collects the entities written through
    :meth:`create` and :meth:`update`. Nothing in this class moves entries from
    ``_transactions`` into ``_entity_dict``; that is presumably done by the
    owner of the bridge (e.g. a unit of work) - confirm against the caller.
    """

    def __init__(self, entity_dict: Dict[U, Dict[int, T]]):
        """
        :param entity_dict: the store to read from, keyed by entity ID and then
            by version. It is kept by reference, not copied.
        """
        self._entity_dict: Dict[U, Dict[int, T]] = entity_dict
        # Pending writes, in the same {entity_id: {version: entity}} layout.
        self._transactions: Dict[U, Dict[int, T]] = {}

    def __len__(self) -> int:
        """Return the total number of stored entity versions.

        Only ``_entity_dict`` is counted; pending transactions are not included.
        """
        return sum(len(versions) for versions in self._entity_dict.values())

    def __contains__(self, entity_id: U) -> bool:
        """Return True if the ID is known, either stored or pending."""
        return entity_id in self._entity_dict or entity_id in self._transactions

    def create(self, entity: T) -> T:
        """Implementation of the RepositoryBridge method.

        The entity is added to the pending transactions under the version found
        in its metadata after ``update_metadata`` has been applied.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.create`
        docstring for details
        """
        # The identifier is resolved before the metadata is refreshed, as the
        # entity returned by update_metadata replaces the local reference.
        entity_id = get_identifier_or_fetch_from_skuid(entity)
        entity = self.update_metadata(entity)
        LOGGER.debug("Adding entity with ID %s to the memory transactions", entity_id)
        self._transactions.setdefault(entity_id, {})[entity.metadata.version] = entity
        return entity

    def read(self, entity_id: U) -> T:
        """Implementation of the RepositoryBridge method.

        Pending transactions take precedence over the stored entities. In
        either case the entity with the highest version number is returned.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.read`
        docstring for details

        :raises KeyError: if the ID is neither pending nor stored
        """
        LOGGER.debug("Getting entity with ID %s from memory", entity_id)
        versions = self._transactions.get(entity_id)
        if versions is None:
            versions = self._entity_dict[entity_id]
        return versions[max(versions)]

    def update(self, entity: T) -> T:
        """Implementation of the RepositoryBridge method.

        Unlike :meth:`create`, the entity is always written to the pending
        transactions under version key ``1``, replacing whatever is pending
        there, rather than being filed under its metadata version.

        See :func:`~ska_db_oda.domain.repository.RepositoryBridge.update`
        docstring for details
        """
        # NOTE(review): _set_new_metadata is inherited and not visible here;
        # presumably it resets the metadata so that it agrees with the fixed
        # version key used below - confirm against the base class.
        entity = self._set_new_metadata(entity)
        entity_id = get_identifier_or_fetch_from_skuid(entity)
        LOGGER.debug("Updating entity with ID %s in the memory transactions", entity_id)
        self._transactions.setdefault(entity_id, {})[1] = entity
        return entity

    def query(self, qry_params: QueryParams) -> List[T]:
        """Return the latest version of every entity matching the query.

        Each known entity ID (stored or pending) is read once via :meth:`read`
        and kept if all the filter functions built for ``qry_params`` accept it.

        :param qry_params: the query to build the filter functions from
        :return: the matching entities, stored IDs first and then IDs that are
            only pending, each in dict insertion order
        """
        # This implementation follows the same strategy as the Filesystem query implementation,
        # using the QueryFilterFactory and then applying the filters to all the available entity ids.
        entity_ids = list(self._entity_dict)
        # An ID can be both stored and pending; list it once only, otherwise
        # the same entity would appear twice in the result.
        entity_ids.extend(
            entity_id
            for entity_id in self._transactions
            if entity_id not in self._entity_dict
        )
        filter_fns = QueryFilterFactory.filter_functions_for_query(qry_params)

        final_result = []
        for entity_id in entity_ids:
            entity = self.read(entity_id)
            if all(fn(entity) for fn in filter_fns):
                final_result.append(entity)
        return final_result

    def _get_latest_metadata(self, entity: T) -> Optional[Metadata]:
        """Implementation of the abstract MetaDataMixin method for a memory backend.

        See :func:`~ska_db_oda.domain.metadatamixin.MetadataMixin._get_latest_metadata`
        docstring for details

        :return: the metadata of the entity currently returned by :meth:`read`
            for this entity's ID, or None if the ID is unknown
        """
        try:
            return self.read(get_identifier(entity)).metadata
        except KeyError:
            # read() raises KeyError for an ID that is neither pending nor stored.
            return None