Source code for ska_db_oda.unit_of_work.postgresunitofwork

import logging
import os

from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

from ska_db_oda.domain.repository import (
    ExecutionBlockRepository,
    ProjectRepository,
    ProposalRepository,
    SBDefinitionRepository,
    SBInstanceRepository,
)
from ska_db_oda.infrastructure.postgres.mapping import (
    ExecutionBlockMapping,
    ProjectMapping,
    ProposalMapping,
    SBDefinitionMapping,
    SBInstanceMapping,
)
from ska_db_oda.infrastructure.postgres.repository import PostgresBridge
from ska_db_oda.unit_of_work.abstractunitofwork import AbstractUnitOfWork

LOGGER = logging.getLogger(__name__)


def create_connection_pool() -> ConnectionPool:
    conninfo = (
        f"host={os.environ.get('POSTGRES_HOST')} "
        f"port={os.environ.get('POSTGRES_PORT', '5432')} "
        f"dbname={os.environ.get('POSTGRES_DB_NAME', 'postgres')} "
        f"user={os.environ.get('ADMIN_POSTGRES_USER', 'postgres')} "
        f"password={os.environ.get('ADMIN_POSTGRES_PASSWORD')}"
    )

    connect_kwargs = {"row_factory": dict_row}
    # conninfo = "host=localhost port=5432 dbname=postgres user=postgres password=oda"
    return ConnectionPool(conninfo, kwargs=connect_kwargs)


[docs] class PostgresUnitOfWork(AbstractUnitOfWork): """ A PostgreSQL implementation of the UoW which persists data in an instance of PostgreSQL specified in the initialisation config """ def __init__( self, connection_pool: ConnectionPool = None, ): """ In production applications, is it recommended to have a single psycopg ConnectionPool. It is therefore expected that the same object session will be injected into all the UoW instances. """ if connection_pool is None: connection_pool = create_connection_pool() self._connection_pool = connection_pool def __enter__(self): conn = self._connection_pool.getconn() self._conn = conn # pylint: disable=attribute-defined-outside-init self.sbds = SBDefinitionRepository(PostgresBridge(SBDefinitionMapping(), conn)) self.sbis = SBInstanceRepository(PostgresBridge(SBInstanceMapping(), conn)) self.ebs = ExecutionBlockRepository( PostgresBridge(ExecutionBlockMapping(), conn) ) self.prjs = ProjectRepository(PostgresBridge(ProjectMapping(), conn)) self.prsls = ProposalRepository(PostgresBridge(ProposalMapping(), conn)) return super().__enter__()
[docs] def commit(self) -> None: """Implementation of the AbstractUnitOfWork method. See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.commit` docstring for details """ self._conn.commit()
[docs] def rollback(self) -> None: """Implementation of the AbstractUnitOfWork method. See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.rollback` docstring for details """ self._conn.rollback() self._connection_pool.putconn(self._conn)