import logging
import os
from typing import Optional

from psycopg.conninfo import make_conninfo
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

from ska_db_oda.domain.repository import (
    ExecutionBlockRepository,
    ProjectRepository,
    ProposalRepository,
    SBDefinitionRepository,
    SBInstanceRepository,
)
from ska_db_oda.infrastructure.postgres.mapping import (
    ExecutionBlockMapping,
    ProjectMapping,
    ProposalMapping,
    SBDefinitionMapping,
    SBInstanceMapping,
)
from ska_db_oda.infrastructure.postgres.repository import PostgresBridge
from ska_db_oda.unit_of_work.abstractunitofwork import AbstractUnitOfWork
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
def create_connection_pool() -> ConnectionPool:
    """
    Build a psycopg ``ConnectionPool`` configured from environment variables.

    The following variables are read:

    - ``POSTGRES_HOST`` (no default)
    - ``POSTGRES_PORT`` (default ``5432``)
    - ``POSTGRES_DB_NAME`` (default ``postgres``)
    - ``ADMIN_POSTGRES_USER`` (default ``postgres``)
    - ``ADMIN_POSTGRES_PASSWORD`` (no default)

    The connection string is assembled with ``make_conninfo`` so that values
    containing spaces, quotes or backslashes (typically the password) are
    escaped correctly. Variables that are unset and have no default are left
    out of the connection string instead of being sent as the text ``None``.

    :return: a pool whose connections use the ``dict_row`` row factory
    """
    conninfo = make_conninfo(
        host=os.environ.get("POSTGRES_HOST"),
        port=os.environ.get("POSTGRES_PORT", "5432"),
        dbname=os.environ.get("POSTGRES_DB_NAME", "postgres"),
        user=os.environ.get("ADMIN_POSTGRES_USER", "postgres"),
        password=os.environ.get("ADMIN_POSTGRES_PASSWORD"),
    )
    # Every connection handed out by the pool returns rows as dictionaries.
    return ConnectionPool(conninfo, kwargs={"row_factory": dict_row})
class PostgresUnitOfWork(AbstractUnitOfWork):
    """
    A PostgreSQL implementation of the UoW which persists data through
    connections borrowed from a psycopg ``ConnectionPool``.
    """

    def __init__(
        self,
        connection_pool: Optional[ConnectionPool] = None,
    ):
        """
        In production applications, it is recommended to have a single psycopg ConnectionPool.
        It is therefore expected that the same pool object will be injected into all the UoW instances.

        :param connection_pool: the pool to borrow connections from. If it is
            omitted, a new pool is created with ``create_connection_pool``.
        """
        if connection_pool is None:
            connection_pool = create_connection_pool()
        self._connection_pool = connection_pool

    def __enter__(self):
        """
        Borrow a connection from the pool and bind a repository for each
        entity type (``sbds``, ``sbis``, ``ebs``, ``prjs``, ``prsls``) to it.

        :return: whatever ``AbstractUnitOfWork.__enter__`` returns
        """
        conn = self._connection_pool.getconn()
        self._conn = conn  # pylint: disable=attribute-defined-outside-init
        # All repositories share the one connection, so their work is part of
        # the same database transaction.
        self.sbds = SBDefinitionRepository(PostgresBridge(SBDefinitionMapping(), conn))
        self.sbis = SBInstanceRepository(PostgresBridge(SBInstanceMapping(), conn))
        self.ebs = ExecutionBlockRepository(
            PostgresBridge(ExecutionBlockMapping(), conn)
        )
        self.prjs = ProjectRepository(PostgresBridge(ProjectMapping(), conn))
        self.prsls = ProposalRepository(PostgresBridge(ProposalMapping(), conn))
        return super().__enter__()

    def commit(self) -> None:
        """Implementation of the AbstractUnitOfWork method.

        See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.commit` docstring for details
        """
        self._conn.commit()

    def rollback(self) -> None:
        """Implementation of the AbstractUnitOfWork method.

        See :func:`~ska_db_oda.unit_of_work.abstractunitofwork.AbstractUnitOfWork.rollback` docstring for details

        Besides rolling back, this hands the connection back to the pool.
        NOTE(review): presumably this is invoked by ``AbstractUnitOfWork.__exit__``
        so that it runs once per ``with`` block - confirm against the base class.
        """
        try:
            self._conn.rollback()
        finally:
            # Return the connection even when the rollback itself fails,
            # otherwise the pool permanently loses one of its connections.
            self._connection_pool.putconn(self._conn)