Source code for ska_db_oda.unitofwork.postgresunitofwork

# pylint: disable=no-member
import logging
import os

import psycopg
from psycopg.rows import dict_row

from ska_db_oda.repository import (
    EntityRelationshipRepository,
    ExecutionBlockRepository,
    PanelDecisionRepository,
    PanelRepository,
    PanelReviewRepository,
    ProjectRepository,
    ProposalAccessRepository,
    ProposalRepository,
    SBDefinitionRepository,
    SBInstanceRepository,
)
from ska_db_oda.repository.status import StatusRepository
from ska_db_oda.unitofwork.abstractunitofwork import AbstractUnitOfWork
from ska_db_oda.unitofwork.application_name import get_application_name

LOGGER = logging.getLogger(__name__)


def _quote_conninfo_value(value: str) -> str:
    """Return *value* in a form that is safe inside a libpq conninfo string.

    Simple, non-empty tokens are returned unchanged. Empty values and values
    containing whitespace, single quotes or backslashes are wrapped in single
    quotes, with embedded single quotes and backslashes escaped by a
    backslash, as the libpq keyword/value syntax requires.
    """
    needs_quoting = not value or any(
        char.isspace() or char in "'\\" for char in value
    )
    if not needs_quoting:
        return value
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def get_conninfo() -> str:
    """Build the PostgreSQL connection string from environment variables.

    Reads ``PGHOST``, ``PGPORT`` (default ``5432``), ``PGDATABASE`` (default
    ``oda``), ``PGUSER`` (default ``oda_admin``) and ``PGPASSWORD``.

    Keys whose environment variable is unset and which have no default
    (``host`` and ``password``) are left out of the string instead of being
    rendered as the literal text ``None``. Values are quoted when needed so
    that an empty value or one containing spaces cannot corrupt the
    neighbouring ``key=value`` pairs.

    :return: a space separated ``key=value`` conninfo string.
    """
    environ = os.environ
    settings = (
        ("host", environ.get("PGHOST")),
        ("port", environ.get("PGPORT", "5432")),
        ("dbname", environ.get("PGDATABASE", "oda")),
        ("user", environ.get("PGUSER", "oda_admin")),
        ("password", environ.get("PGPASSWORD")),
    )
    parts = [
        f"{key}={_quote_conninfo_value(value)}"
        for key, value in settings
        if value is not None
    ]

    # Only include application_name if it was determined. It SHOULD always be
    # defined, unless somehow the hostname fallback fails.
    if app_name := get_application_name():
        parts.append(
            f"application_name={_quote_conninfo_value(str(app_name))}"
        )

    return " ".join(parts)


class PostgresUnitOfWork(AbstractUnitOfWork):
    """A PostgreSQL implementation of the UoW.

    Persists data in the PostgreSQL instance described by the connection
    string that :func:`get_conninfo` builds. A fresh connection is opened
    each time the context manager is entered and closed again when it exits.
    """

    def __init__(self) -> None:
        """Create a new PostgresUnitOfWork.

        No database connection is opened here. A connection will be created
        when entering the context manager and closed when exiting.
        """
        # The psycopg connection for the unit of work; None until __enter__.
        self._conn = None
        self.status = None

    def __enter__(self):
        """Open a connection and bind every repository to it.

        :return: whatever the base class ``__enter__`` returns.
        """
        # Create a new connection for this unit of work
        conn = psycopg.connect(get_conninfo(), row_factory=dict_row)
        self._conn = conn

        # All repositories share the single connection, and therefore the
        # single transaction, of this unit of work.
        self.sbds = SBDefinitionRepository(conn)
        self.sbis = SBInstanceRepository(conn)
        self.ebs = ExecutionBlockRepository(conn)
        self.prjs = ProjectRepository(conn)
        self.prsls = ProposalRepository(conn)
        self.pnlds = PanelDecisionRepository(conn)
        self.rvws = PanelReviewRepository(conn)
        self.panels = PanelRepository(conn)
        self.prslacc = ProposalAccessRepository(conn)
        self.entity_relationship = EntityRelationshipRepository(conn)
        self.status = StatusRepository(conn)
        return super().__enter__()

    def __exit__(self, *args):
        """Run the base class exit handling, then close the connection.

        The connection opened in ``__enter__`` is closed in a ``finally``
        clause so that it is released even if the base class handling raises.

        :param args: the exception triple supplied by the ``with`` statement.
        :return: whatever the base class ``__exit__`` returns.
        """
        # NOTE(review): assumes the base class __exit__ does not itself close
        # the connection - confirm against AbstractUnitOfWork.
        try:
            return super().__exit__(*args)
        finally:
            if self._conn is not None:
                self._conn.close()

    def commit(self) -> None:
        """Commit the current transaction on the open connection.

        Implements the AbstractUnitOfWork method. See
        :func:`~ska_db_oda.unitofwork.abstractunitofwork.AbstractUnitOfWork.commit`
        docstring for details.
        """
        self._conn.commit()

    def rollback(self) -> None:
        """Roll back the current transaction on the open connection.

        Implements the AbstractUnitOfWork method. See
        :func:`~ska_db_oda.unitofwork.abstractunitofwork.AbstractUnitOfWork.rollback`
        docstring for details.
        """
        self._conn.rollback()