import logging
import os
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
from ska_db_oda.persistence.domain.repository import (
EntityRelationshipRepository,
ExecutionBlockRepository,
ExecutionBlockStatusHistoryRepository,
PanelDecisionRepository,
PanelRepository,
PanelReviewRepository,
ProjectRepository,
ProjectStatusHistoryRepository,
ProposalAccessRepository,
ProposalRepository,
SBDefinitionRepository,
SBDefinitionStatusHistoryRepository,
SBInstanceRepository,
SBInstanceStatusHistoryRepository,
)
from ska_db_oda.persistence.infrastructure.postgres.mapping import (
EntityRelationshipMapping,
ExecutionBlockMapping,
OSOExecutionBlockStatusHistoryMapping,
PanelDecisionMapping,
PanelMapping,
PanelReviewMapping,
ProjectMapping,
ProjectStatusHistoryMapping,
ProposalAccessMapping,
ProposalMapping,
SBDefinitionMapping,
SBDefinitionStatusHistoryMapping,
SBInstanceMapping,
SBInstanceStatusHistoryMapping,
)
from ska_db_oda.persistence.infrastructure.postgres.repository import PostgresBridge
from ska_db_oda.persistence.unitofwork.abstractunitofwork import AbstractUnitOfWork
LOGGER = logging.getLogger(__name__)
def create_connection_pool() -> ConnectionPool:
conninfo = (
f"host={os.environ.get('PGHOST')} "
f"port={os.environ.get('PGPORT', '5432')} "
f"dbname={os.environ.get('PGDATABASE', 'oda')} "
f"user={os.environ.get('PGUSER', 'oda_admin')} "
f"password={os.environ.get('PGPASSWORD')}"
)
connect_kwargs = {"row_factory": dict_row}
return ConnectionPool(conninfo, kwargs=connect_kwargs)
[docs]
class PostgresUnitOfWork(AbstractUnitOfWork):
"""
A PostgreSQL implementation of the UoW which persists data in an instance of
PostgreSQL specified in the initialisation config
"""
def __init__(
self,
connection_pool: ConnectionPool = None,
):
"""
In production applications, is it recommended to have a single psycopg ConnectionPool.
It is therefore expected that the same object session will be injected into all the UoW instances.
"""
if connection_pool is None:
connection_pool = create_connection_pool()
self._connection_pool = connection_pool
# pylint: disable=attribute-defined-outside-init
def __enter__(self):
conn = self._connection_pool.getconn()
self._conn = conn
self.sbds = SBDefinitionRepository(PostgresBridge(SBDefinitionMapping(), conn))
self.sbis = SBInstanceRepository(PostgresBridge(SBInstanceMapping(), conn))
self.sbds_status_history = SBDefinitionStatusHistoryRepository(
PostgresBridge(SBDefinitionStatusHistoryMapping(), conn)
)
self.sbis_status_history = SBInstanceStatusHistoryRepository(
PostgresBridge(SBInstanceStatusHistoryMapping(), conn)
)
self.ebs_status_history = ExecutionBlockStatusHistoryRepository(
PostgresBridge(OSOExecutionBlockStatusHistoryMapping(), conn)
)
self.ebs = ExecutionBlockRepository(
PostgresBridge(ExecutionBlockMapping(), conn)
)
self.prjs = ProjectRepository(PostgresBridge(ProjectMapping(), conn))
self.prjs_status_history = ProjectStatusHistoryRepository(
PostgresBridge(ProjectStatusHistoryMapping(), conn)
)
self.prsls = ProposalRepository(PostgresBridge(ProposalMapping(), conn))
self.pnlds = PanelDecisionRepository(
PostgresBridge(PanelDecisionMapping(), conn)
)
self.rvws = PanelReviewRepository(PostgresBridge(PanelReviewMapping(), conn))
self.panels = PanelRepository(PostgresBridge(PanelMapping(), conn))
self.prslacc = ProposalAccessRepository(
PostgresBridge(ProposalAccessMapping(), conn)
)
self.entity_relationship = EntityRelationshipRepository(
PostgresBridge(EntityRelationshipMapping(), conn)
)
return super().__enter__()
[docs]
def commit(self) -> None:
"""Implementation of the AbstractUnitOfWork method.
See :func:`~ska_db_oda.persistence.unitofwork.abstractunitofwork.AbstractUnitOfWork.commit` docstring for details
"""
self._conn.commit()
[docs]
def rollback(self) -> None:
"""Implementation of the AbstractUnitOfWork method.
See :func:`~ska_db_oda.persistence.unitofwork.abstractunitofwork.AbstractUnitOfWork.rollback` docstring for details
"""
self._conn.rollback()
self._connection_pool.putconn(self._conn)