"""Base repository class and interfaces."""
from typing import Generic, Optional, TypeVar

from psycopg import Connection

from ska_db_oda.postgres.postgres_persistence import PostgresPersistence
from ska_db_oda.repository.domain.query import QueryParams
# Type variables parameterising the generic repository classes below:
# T is the entity type a repository stores and returns,
# U is the type of the identifier used to look an entity up.
T = TypeVar("T")
U = TypeVar("U")
class AbstractRepository(Generic[T, U]):
    """
    Define the interface for users to add and retrieve entities from the ODA.

    It is expected to be extended as SBDefinitionRepository, SBInstanceRepository, etc.

    ``T`` is the type of the entity handled by the repository and ``U`` is the
    type of the identifier used to look entities up. Every operation is
    delegated to the ``postgres`` persistence object.
    """

    # Persistence backend that all methods delegate to. It is only declared
    # here and is not assigned by ``__init__`` - presumably each concrete
    # repository sets it; confirm against the subclasses.
    postgres: PostgresPersistence
    # Database connection handed in by the caller at construction time.
    conn: Connection

    def __init__(self, conn: Connection) -> None:
        """
        Create a repository bound to the given database connection.

        :param conn: the connection to store on the repository.
        """
        self.conn = conn

    def __contains__(self, entity_id: U) -> bool:
        """
        Support ``entity_id in repository`` by delegating to the persistence object.

        :param entity_id: identifier of the entity to look for.
        """
        return entity_id in self.postgres

    def __len__(self) -> int:
        """Support ``len(repository)`` by delegating to the persistence object."""
        return len(self.postgres)

    def add(self, entity: T, user: Optional[str] = None) -> T:
        """
        Store the entity in the ODA.

        :param entity: the entity to persist.
        :param user: optional value passed through unchanged to the persistence
            layer's ``upsert`` (presumably the user making the change - confirm).
        :raises ODAError: if an error occurs while persisting the entity
        :return: the entity as it exists in the ODA, eg with updated metadata
        """
        return self.postgres.upsert(entity, user)

    def get(
        self,
        entity_id: U,
    ) -> T:
        """
        Retrieve the latest version of the entity with the given id from the ODA.

        :param entity_id: entity_id of the entity.
        :raises ODANotFound: if the entity_id is not found in the database
        :raises ODAError: if an error occurs while retrieving the entity
        :return: the entity read from the persistence layer.
        """
        return self.postgres.read(entity_id)

    def get_relationship(self, entity_id: U, parent_entity: U, associated_entity: U) -> T:
        """
        Retrieve the entity data associated with the given parent entity ID from the ODA.

        :param entity_id: entity id to search for.
        :param parent_entity: name of the primary entity to join.
        :param associated_entity: name of the secondary entity to join.
        :raises ODANotFound: if the entity_id is not found in the repository
        :raises ODAError: if an error occurs while retrieving the entity
        :return: the result of the persistence layer's ``read_relationship``.
        """
        return self.postgres.read_relationship(entity_id, parent_entity, associated_entity)

    def query(self, qry_param: QueryParams) -> list[T]:
        """
        Query the latest version of the entity based on QueryParams class from the ODA.

        Returns the corresponding entities. Returns an empty list if no entities in the
        repository match the parameters.

        :param qry_param: the query parameters to match entities against.
        :raises ValueError: if the qry_params are not supported
        :raises ODAError: if an error occurs while querying the entity
        :return: the entities matching the query.
        """
        return self.postgres.query(qry_param)
class AbstractVersionedRepository(AbstractRepository[T, U]):
    """
    Repository variant that can also return historical versions of an entity.

    All operations of AbstractRepository are inherited unchanged; this class
    adds the ability to fetch one specific version of an entity.
    """

    def get_version(self, entity_id: U, version: int) -> T:
        """
        Fetch one particular version of the entity identified by ``entity_id``.

        The lookup is delegated to the persistence layer, which first checks
        whether the requested version is the current one in the main table and
        otherwise queries the history table for it.

        :param entity_id: entity_id of the entity
        :param version: the specific version number to retrieve
        :raises ODANotFound: if the entity_id or version is not found in the database
        :raises ODAError: if an error occurs while retrieving the entity
        :raises AttributeError: if the repository's mapping does not support version history
        :return: the entity at the requested version.
        """
        requested_entity = self.postgres.read_version(entity_id, version)
        return requested_entity