Source code for ska_db_oda.unitofwork.postgresunitofwork
# pylint: disable=no-member
import logging
import os
import psycopg
from psycopg.rows import dict_row
from ska_db_oda.repository import (
EntityRelationshipRepository,
ExecutionBlockRepository,
PanelDecisionRepository,
PanelRepository,
PanelReviewRepository,
ProjectRepository,
ProposalAccessRepository,
ProposalRepository,
SBDefinitionRepository,
SBInstanceRepository,
)
from ska_db_oda.repository.status import StatusRepository
from ska_db_oda.unitofwork.abstractunitofwork import AbstractUnitOfWork
from ska_db_oda.unitofwork.application_name import get_application_name
LOGGER = logging.getLogger(__name__)
def get_conninfo() -> str:
"""Build the PostgreSQL connection string from environment variables."""
conninfo = (
f"host={os.environ.get('PGHOST')} "
f"port={os.environ.get('PGPORT', '5432')} "
f"dbname={os.environ.get('PGDATABASE', 'oda')} "
f"user={os.environ.get('PGUSER', 'oda_admin')} "
f"password={os.environ.get('PGPASSWORD')}"
)
# Only include application_name if it was determined. It SHOULD always be
# defined, unless somehow the hostname fallback fails.
if app_name := get_application_name():
conninfo += f" application_name={app_name}"
return conninfo
[docs]
class PostgresUnitOfWork(AbstractUnitOfWork):
"""A PostgreSQL implementation of the UoW.
Persists data in an instance of PostgreSQL specified in the
initialisation config.
"""
def __init__(self):
"""Create a new PostgresUnitOfWork.
A database connection will be created when entering the context
manager and closed when exiting.
"""
self._conn = None
self.status = None
def __enter__(self):
# Create a new connection for this unit of work
conn = psycopg.connect(get_conninfo(), row_factory=dict_row)
self._conn = conn
self.sbds = SBDefinitionRepository(conn)
self.sbis = SBInstanceRepository(conn)
self.ebs = ExecutionBlockRepository(conn)
self.prjs = ProjectRepository(conn)
self.prsls = ProposalRepository(conn)
self.pnlds = PanelDecisionRepository(conn)
self.rvws = PanelReviewRepository(conn)
self.panels = PanelRepository(conn)
self.prslacc = ProposalAccessRepository(conn)
self.entity_relationship = EntityRelationshipRepository(conn)
self.status = StatusRepository(conn)
return super().__enter__()
[docs]
def commit(self) -> None:
"""Implement the AbstractUnitOfWork method.
See :func:`~ska_db_oda.unitofwork.abstractunitofwork.
AbstractUnitOfWork.commit` docstring for details
"""
self._conn.commit()
[docs]
def rollback(self) -> None:
"""Implement the AbstractUnitOfWork method.
See :func:`~ska_db_oda.unitofwork.abstractunitofwork.
AbstractUnitOfWork.rollback` docstring for details
"""
self._conn.rollback()