"""Base repository class and interfaces."""
from typing import Generic, TypeVar
from psycopg import Connection
from ska_db_oda.postgres.postgres_persistence import PostgresPersistence
from ska_db_oda.repository.domain.query import QueryParams
from ska_db_oda.repository.domain.relationship import EntityRelationship
T = TypeVar("T")
U = TypeVar("U")
[docs]
class AbstractRepository(Generic[T, U]):
"""
Define the interface for users to add and retrieve entities from the ODA.
It is expected to be extended as SBDefinitionRepository, SBInstanceRepository, etc.
"""
postgres: PostgresPersistence
conn: Connection
def __init__(self, conn: Connection):
self.conn = conn
def __contains__(self, entity_id: U):
return entity_id in self.postgres
def __len__(self):
return len(self.postgres)
[docs]
def add(self, entity: T, user: str = None) -> T:
"""
Store the entity in the ODA.
:raises ODAError: if an error occurs while persisting the entity
:return: the entity as it exists in the ODA, eg with updated metadata
"""
return self.postgres.upsert(entity, user)
[docs]
def get(
self,
entity_id: U,
) -> T:
"""
Retrieve the latest version of the entity with the given id from the ODA.
:param entity_id: entity_id of the entity.
:raises ODANotFound: if the entity_id is not found in the database
:raises ODAError: if an error occurs while retrieving the entity
"""
return self.postgres.read(entity_id)
[docs]
def get_relationship(self, entity_id: U) -> EntityRelationship:
"""
Retrieve related entity IDs for the given entity from the ODA.
Returns flat lists of related entity IDs grouped by entity type.
:param entity_id: SKUID of the entity to look up
:raises ODANotFound: If the entity_id is not found in the repository
:raises ODAError: If any database errors happen during retrieval
:return: Dict with entity_id, entity_type, and related_entities
"""
return self.postgres.read_relationship(entity_id)
[docs]
def query(self, qry_param: QueryParams) -> list[T]:
"""
Query the latest version of the entity based on QueryParams class from the ODA.
Returns the corresponding entities. Returns an empty list if no entities in the
repository match the parameters.
:raises ValueError: if the qry_params are not supported
:raises ODAError: if an error occurs while querying the entity
"""
return self.postgres.query(qry_param)
class AbstractVersionedRepository(AbstractRepository[T, U]):
"""
Extended Repository that supports version history retrieval.
In addition to the standard CRUD operations, this repository allows
retrieving specific historical versions of entities.
"""
def get_version(self, entity_id: U, version: int) -> T:
"""
Retrieve a specific version of the entity with the given id from the ODA.
First checks if the requested version is the current version in the main table.
If not, queries the history table for the specific version.
:param entity_id: entity_id of the entity
:param version: the specific version number to retrieve
:raises ODANotFound: if the entity_id or version is not found in the database
:raises ODAError: if an error occurs while retrieving the entity
:raises AttributeError: if the repository's mapping does not support version history
"""
return self.postgres.read_version(entity_id, version)