Source code for ska_db_oda.repository.status

"""
Status Repository Module.

Provides status management using Pydantic models and repository patterns.
Handles status tracking, history queries, and activity dashboard data retrieval.
"""

import logging

from psycopg import Connection
from ska_ser_skuid import short_skuid

from ska_db_oda.postgres.mapping import (
    INTERNAL_STATUS_TO_PUBLIC_LABEL,
    PREFIX_TO_TABLE,
    PUBLIC_LABEL_TO_INTERNAL_STATUS,
    InternalStatus,
    Status,
    StatusMapping,
)
from ska_db_oda.postgres.postgres_persistence import PostgresPersistence
from ska_db_oda.postgres.ptt_queries import (
    get_execution_blocks_query,
    get_projects_query_with_params,
    get_sb_definitions_query,
    get_sb_instances_query,
)
from ska_db_oda.postgres.sqlqueries import (
    select_current_status_with_filter_query,
    select_status_history_query,
    set_status_for_all_sbds_in_project,
    update_status_sql,
)
from ska_db_oda.repository.domain.errors import ODANotFound, QueryParameterError
from ska_db_oda.repository.domain.query import QueryParams, StatusHistoryQuery

LOGGER = logging.getLogger(__name__)


[docs] class StatusRepository(PostgresPersistence): """Status repository for managing entity status tracking and history.""" def __init__(self, connection: Connection): """Initialize StatusRepository with database connection. :param connection: PostgreSQL database connection """ self._status_mapping = StatusMapping() super().__init__(self._status_mapping, connection)
[docs] def mark_project_ready(self, project_id: str, updated_by: str) -> None: """ Flag all of the SBDs belonging to this project as APPROVED. We mark the SBDs rather than the project directly so that if any new SBDs are created later, they won't inherit an APPROVED status from their grandparent project. The status will bubble up and make the derived status of project APPROVED. :param: project_id: SKUID for the project. :param: updated_by: User ID of someone performing the update. :raises ODAError: If something goes wrong. """ query, params = set_status_for_all_sbds_in_project( project_id, InternalStatus.APPROVED, updated_by ) self._connection.execute(query, params)
[docs] def mark_project_draft(self, project_id: str, updated_by: str) -> None: """ Flag all of the SBDs belonging to this project as NEW (Draft). This marks SBDs directly because it's the inverse/undo of mark_project_ready() :param: project_id: SKUID for the project. :param: updated_by: User ID of someone performing the update. :raises ODAError: If something goes wrong. """ query, params = set_status_for_all_sbds_in_project( project_id, InternalStatus.NEW, updated_by ) self._connection.execute(query, params)
[docs] def update_status(self, entity_id: str, status: str, updated_by: str) -> Status: """Update status for an entity using raw SQL query with entity type mapping. :param entity_id: SKUID of the entity to update :param status: New status value (public label) :param updated_by: Username of the user updating the status :return: Updated status entity :raises ODAError: If status update fails :raises KeyError: If status is not a valid public label """ query, params = update_status_sql( entity_id, PUBLIC_LABEL_TO_INTERNAL_STATUS[status], updated_by ) self._connection.execute(query, params) return Status(entity_id=entity_id, status=status, updated_by=updated_by)
[docs] def get_status_history(self, qry_params: QueryParams) -> list[Status]: """Query status history using repository pattern. :param qry_params: QueryParams object (must be StatusHistoryQuery) :return: List of status entities matching the query :raises QueryParameterError: If qry_params is not a StatusHistoryQuery """ match qry_params: case StatusHistoryQuery(): return self._query_status_history(qry_params) case _: raise QueryParameterError(qry_params=qry_params)
[docs] def get_current_status(self, entity_id: str) -> Status: """Retrieve current status for an entity. :param entity_id: SKUID of the entity :return: Current status of the entity :raises ODANotFound: If entity has no current status """ query, params = select_current_status_with_filter_query( self._status_mapping.table_details, entity_id ) result = self._execute_and_return_row(query, params) if result is None: raise ODANotFound(identifier=entity_id) return self._status_mapping.result_to_entity(result)
[docs] def get_activity_dashboard_data(self, qry_params: QueryParams) -> tuple: """Retrieve hierarchical project data for activity dashboard. This method queries projects with filters, retrieves related SB definitions, instances, and execution blocks, maps internal status values to public labels, and returns raw query data (transformation happens in REST layer). :param qry_params: QueryParams object containing filter criteria (purpose, user, dates, etc.) :return: Tuple of (projects, sbds, sbis, ebs) lists with mapped status labels. Returns empty lists if no projects match the filters. """ cursor = self._connection.cursor() # Step 1: Get filtered projects projects_query, params = get_projects_query_with_params(qry_params) cursor.execute(projects_query, params) projects = cursor.fetchall() if not projects: return [], [], [], [] # Step 2: Get SB definitions for projects project_ids = [p["project_id"] for p in projects] cursor.execute(get_sb_definitions_query(project_ids), (project_ids,)) sbds = cursor.fetchall() # Step 3: Get SB instances for SB definitions sbd_ids = [s["sbd_id"] for s in sbds] if sbds else [] sbis = [] if sbd_ids: cursor.execute(get_sb_instances_query(sbd_ids), (sbd_ids,)) sbis = cursor.fetchall() # Step 4: Get execution blocks for SB instances sbi_ids = [s["sbi_id"] for s in sbis] if sbis else [] ebs = [] if sbi_ids: cursor.execute(get_execution_blocks_query(sbi_ids), (sbi_ids,)) ebs = cursor.fetchall() # Step 5: Convert bigint IDs to SKUID strings for caller for p in projects: p["project_id"] = short_skuid("prj", p["project_id"]) for s in sbds: s["sbd_id"] = short_skuid("sbd", s["sbd_id"]) for s in sbis: s["sbi_id"] = short_skuid("sbi", s["sbi_id"]) for e in ebs: e["eb_id"] = short_skuid("eb", e["eb_id"]) # Step 6: Map internal status to public labels self._map_status_to_labels(projects, "prj") self._map_status_to_labels(sbds, "sbd") self._map_status_to_labels(sbis, "sbi") self._map_status_to_labels(ebs, "eb") return projects, sbds, sbis, ebs
def _query_status_history(self, qry_params: StatusHistoryQuery) -> list[Status]: """Handle status history queries (private method). :param qry_params: Status history query parameters :return: List of status history records, empty list if no results """ query, params = select_status_history_query( self._status_mapping.table_details, qry_params.entity_id, qry_params.updated_by, qry_params.status, qry_params.start_date, qry_params.end_date, ) results = self._execute_and_return_rows(query, params) if results is None: return [] return [self._status_mapping.result_to_entity(row) for row in results] def _map_status_to_labels(self, records: list, entity_prefix: str) -> None: """Map internal numeric status to public labels. :param records: List of database records with status field :param entity_prefix: Entity type prefix (prj, sbd, sbi, eb) """ table_name = PREFIX_TO_TABLE[entity_prefix] entity_mapping = INTERNAL_STATUS_TO_PUBLIC_LABEL[table_name] for record in records: record["status"] = entity_mapping[record["status"]]