Source code for ska_db_oda.persistence.unitofwork.postgresunitofwork

import logging
import os

from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

from ska_db_oda.persistence.domain.repository import (
    EntityRelationshipRepository,
    ExecutionBlockRepository,
    ExecutionBlockStatusHistoryRepository,
    PanelDecisionRepository,
    PanelRepository,
    PanelReviewRepository,
    ProjectRepository,
    ProjectStatusHistoryRepository,
    ProposalAccessRepository,
    ProposalRepository,
    SBDefinitionRepository,
    SBDefinitionStatusHistoryRepository,
    SBInstanceRepository,
    SBInstanceStatusHistoryRepository,
)
from ska_db_oda.persistence.infrastructure.postgres.mapping import (
    EntityRelationshipMapping,
    ExecutionBlockMapping,
    OSOExecutionBlockStatusHistoryMapping,
    PanelDecisionMapping,
    PanelMapping,
    PanelReviewMapping,
    ProjectMapping,
    ProjectStatusHistoryMapping,
    ProposalAccessMapping,
    ProposalMapping,
    SBDefinitionMapping,
    SBDefinitionStatusHistoryMapping,
    SBInstanceMapping,
    SBInstanceStatusHistoryMapping,
)
from ska_db_oda.persistence.infrastructure.postgres.repository import PostgresBridge
from ska_db_oda.persistence.unitofwork.abstractunitofwork import AbstractUnitOfWork

LOGGER = logging.getLogger(__name__)


def create_connection_pool() -> ConnectionPool:
    conninfo = (
        f"host={os.environ.get('PGHOST')} "
        f"port={os.environ.get('PGPORT', '5432')} "
        f"dbname={os.environ.get('PGDATABASE', 'oda')} "
        f"user={os.environ.get('PGUSER', 'oda_admin')} "
        f"password={os.environ.get('PGPASSWORD')}"
    )

    connect_kwargs = {"row_factory": dict_row}

    return ConnectionPool(conninfo, kwargs=connect_kwargs)


[docs] class PostgresUnitOfWork(AbstractUnitOfWork): """ A PostgreSQL implementation of the UoW which persists data in an instance of PostgreSQL specified in the initialisation config """ def __init__( self, connection_pool: ConnectionPool = None, ): """ In production applications, is it recommended to have a single psycopg ConnectionPool. It is therefore expected that the same object session will be injected into all the UoW instances. """ if connection_pool is None: connection_pool = create_connection_pool() self._connection_pool = connection_pool # pylint: disable=attribute-defined-outside-init def __enter__(self): conn = self._connection_pool.getconn() self._conn = conn self.sbds = SBDefinitionRepository(PostgresBridge(SBDefinitionMapping(), conn)) self.sbis = SBInstanceRepository(PostgresBridge(SBInstanceMapping(), conn)) self.sbds_status_history = SBDefinitionStatusHistoryRepository( PostgresBridge(SBDefinitionStatusHistoryMapping(), conn) ) self.sbis_status_history = SBInstanceStatusHistoryRepository( PostgresBridge(SBInstanceStatusHistoryMapping(), conn) ) self.ebs_status_history = ExecutionBlockStatusHistoryRepository( PostgresBridge(OSOExecutionBlockStatusHistoryMapping(), conn) ) self.ebs = ExecutionBlockRepository( PostgresBridge(ExecutionBlockMapping(), conn) ) self.prjs = ProjectRepository(PostgresBridge(ProjectMapping(), conn)) self.prjs_status_history = ProjectStatusHistoryRepository( PostgresBridge(ProjectStatusHistoryMapping(), conn) ) self.prsls = ProposalRepository(PostgresBridge(ProposalMapping(), conn)) self.pnlds = PanelDecisionRepository( PostgresBridge(PanelDecisionMapping(), conn) ) self.rvws = PanelReviewRepository(PostgresBridge(PanelReviewMapping(), conn)) self.panels = PanelRepository(PostgresBridge(PanelMapping(), conn)) self.prslacc = ProposalAccessRepository( PostgresBridge(ProposalAccessMapping(), conn) ) self.entity_relationship = EntityRelationshipRepository( PostgresBridge(EntityRelationshipMapping(), conn) ) return super().__enter__()
[docs] def commit(self) -> None: """Implementation of the AbstractUnitOfWork method. See :func:`~ska_db_oda.persistence.unitofwork.abstractunitofwork.AbstractUnitOfWork.commit` docstring for details """ self._conn.commit()
[docs] def rollback(self) -> None: """Implementation of the AbstractUnitOfWork method. See :func:`~ska_db_oda.persistence.unitofwork.abstractunitofwork.AbstractUnitOfWork.rollback` docstring for details """ self._conn.rollback() self._connection_pool.putconn(self._conn)