"""Repository classes for the project schema (projects, sb_definitions)."""
import json
from psycopg import Connection
from ska_oso_pdm import Project, SBDefinition
from ska_oso_pdm.project import ObservingBlock
from ska_oso_pdm.sb_definition.sb_definition import SBDefinitionID
from ska_ser_skuid import short_skuid
from ska_db_oda.postgres.mapping import (
ObservingBlockMapping,
ProjectMapping,
SBDefinitionMapping,
)
from ska_db_oda.postgres.postgres_persistence import PostgresPersistence
from ska_db_oda.repository.base import AbstractRepository, AbstractVersionedRepository
from ska_db_oda.repository.domain import QueryParams, skuid_to_int
from ska_db_oda.repository.domain.errors import ODAIntegrityError
class ProjectRepository(AbstractRepository[Project, str]):
    """Abstraction over persistent storage of Projects.

    The add() method cascades to save any obs_blocks on the Project.
    The get() method resolves children by joining with the observing_blocks
    and sb_definitions tables to return the full Project with obs_blocks
    containing their sbd_ids.
    """

    def __init__(self, conn: Connection):
        """
        Create a repository bound to the given database connection.

        :param conn: The psycopg connection used for all queries
        """
        super().__init__(conn)
        self.postgres = PostgresPersistence(postgres_mapping=ProjectMapping(), connection=conn)
        self._sbd_mapping = SBDefinitionMapping()

    def add(self, entity: Project, user: str | None = None) -> Project:
        """
        Store the Project and any obs_blocks in the ODA.

        First upserts the Project, then upserts each ObservingBlock from
        entity.obs_blocks with the project's prj_id as the foreign key.

        :param entity: The Project to store
        :param user: The user performing the operation
        :raises ODAIntegrityError: if an ObservingBlock already stored for this
            Project is missing from entity.obs_blocks
        :raises ODAError: if an error occurs while persisting
        :return: The Project with resolved obs_blocks from the database
        """
        existing_obs = self._get_ob_ids_for_project(entity.prj_id)
        # obs_blocks is treated as possibly empty/None further down, so be
        # equally tolerant when computing the incoming set of IDs.
        incoming_ob_ids = {ob.obs_block_id for ob in (entity.obs_blocks or ())}

        # Using this interface, we can add new empty observing blocks
        # easily enough, but we don't want anyone to accidentally delete
        # an OB + the SBDs it contains.
        if not existing_obs <= incoming_ob_ids:
            raise ODAIntegrityError(
                "Cannot remove Observing Blocks by omitting them from 'obs_blocks': you"
                " must delete them explicitly."
            )

        # Upsert the Project first (OBs have FK to Project)
        self.postgres.upsert(entity, user)

        # Upsert each ObservingBlock with the project's ID as prj_ref
        if entity.obs_blocks:
            ob_mapping = ObservingBlockMapping(prj_ref=entity.prj_id)
            ob_persistence = PostgresPersistence(postgres_mapping=ob_mapping, connection=self.conn)
            for ob in entity.obs_blocks:
                ob_persistence.upsert(ob, user)

        # Return the fully resolved Project from the database
        return self.get(entity.prj_id)

    def add_observing_blocks(
        self, prj_id: str, observing_blocks: list[ObservingBlock], user: str | None = None
    ) -> Project:
        """
        Store ObservingBlocks in the ODA with a reference to the prj_id.

        NOTE: The relationship between SBD -> OB is stored in the sb_definitions table. This
        method will ignore any sbd_ids passed in the observing_blocks rather than update
        relationships. To add an SBDefinition to an ObservingBlock, use the
        SBDefinitionRepository.add method, including the ob_ref in the SBDefinition.

        :param prj_id: The ID of the Project the ObservingBlocks belong to
        :param observing_blocks: The ObservingBlocks that should be stored in the Project
        :param user: The user performing the operation
        :raises ODAError: if an error occurs while persisting
        :return: The full Project with resolved obs_blocks from the database
        """
        ob_mapping = ObservingBlockMapping(prj_ref=prj_id)
        ob_persistence = PostgresPersistence(postgres_mapping=ob_mapping, connection=self.conn)
        for observing_block in observing_blocks:
            ob_persistence.upsert(observing_block, user)

        # Honour the declared return type: re-read the Project so the caller
        # receives the obs_blocks (and their sbd_ids) as stored.
        return self.get(prj_id)

    def delete_observing_block(self, prj_id: str, ob_id: str) -> None:
        """Delete an ObservingBlock and all its child SBDefinitions.

        This operation deletes all SBDefinitions that belong to the
        ObservingBlock before deleting the ObservingBlock itself. It is refused
        if any of those SBDefinitions have SBInstances referencing them.

        :param prj_id: The Project ID that owns the ObservingBlock
        :param ob_id: The ObservingBlock ID to delete
        :raises ODAIntegrityError: If ob_id is not among the ObservingBlocks
            stored for prj_id (this includes an ob_id that does not exist at
            all), or if any SBDefinitions have SBInstances referencing them
        """
        # Verify the OB belongs to this project
        existing_obs = self._get_ob_ids_for_project(prj_id)
        if ob_id not in existing_obs:
            raise ODAIntegrityError(f"ObservingBlock {ob_id} does not belong to Project {prj_id}")

        # Get all SBDefinitions under this OB
        sbd_ids = self._get_sbd_ids_for_ob(ob_id)

        # Check if any SBDefinitions have SBInstances
        sbi_count = self._count_sbinstances_for_sbds(sbd_ids)
        if sbi_count > 0:
            raise ODAIntegrityError(
                f"Cannot delete ObservingBlock {ob_id}: {sbi_count} SBInstance(s) "
                "already reference its SBDefinitions."
            )

        # Delete all SBDefinitions first. Skipped for an empty OB so that the
        # persistence layer is never asked to delete zero identifiers.
        if sbd_ids:
            sbd_persistence = PostgresPersistence(
                postgres_mapping=self._sbd_mapping, connection=self.conn
            )
            sbd_persistence.delete(*sbd_ids)

        # Delete the ObservingBlock
        ob_mapping = ObservingBlockMapping(prj_ref=prj_id)
        ob_persistence = PostgresPersistence(postgres_mapping=ob_mapping, connection=self.conn)
        ob_persistence.delete(ob_id)

    def get(self, entity_id: str) -> Project:
        """
        Retrieve the Project with resolved obs_blocks and sbd_ids.

        Fetches the project and resolves all child observing_blocks, populating
        each observing_block with its list of sbd_ids from the sb_definitions table.

        :param entity_id: The project ID (prj_id)
        :raises ODANotFound: if the project is not found
        :return: Project with fully resolved obs_blocks containing sbd_ids
        """
        from psycopg.types.json import set_json_loads

        # Fetch the base project
        project = self.postgres.read(entity_id)

        # Fetch observing blocks for this project with their sbd_ids in a single query
        query = """
            SELECT
                ob.id AS id,
                ob.data AS data,
                ob.version AS version,
                ob.created_by AS created_by,
                ob.created_on AS created_on,
                ob.last_modified_by AS last_modified_by,
                ob.last_modified_on AS last_modified_on,
                COALESCE(
                    json_agg(tool.skuid('sbd', sbd.id)) FILTER (WHERE sbd.id IS NOT NULL),
                    '[]'::json
                ) AS sbd_ids
            FROM project.observing_blocks ob
            LEFT JOIN project.sb_definitions sbd ON sbd.ob_fk = ob.id
            WHERE ob.prj_fk = tool.skuid(%s)
            GROUP BY ob.id, ob.data, ob.version, ob.created_by, ob.created_on,
                ob.last_modified_by, ob.last_modified_on
        """

        ob_mapping = ObservingBlockMapping()

        def query_json_loads(json_str: bytes):
            # The cursor returns two kinds of JSON: the aggregated sbd_ids
            # array (plain JSON list) and the ObservingBlock document, which
            # is handed to the mapping's loader. Only the first byte is
            # needed to tell them apart, so the payload is not decoded here.
            if json_str.startswith(b"["):
                return json.loads(json_str)
            return ob_mapping.jsonb_load(json_str)

        with self.conn.cursor() as cur:
            # Register the loader on this cursor only
            set_json_loads(query_json_loads, cur)
            cur.execute(query, (entity_id,))
            rows = cur.fetchall()

        # Build the obs_blocks list
        obs_blocks = []
        for row in rows:
            ob = ob_mapping.result_to_entity(row)
            ob.sbd_ids = row["sbd_ids"] or []
            obs_blocks.append(ob)

        # Set the resolved obs_blocks on the project
        project.obs_blocks = obs_blocks
        return project

    def query(self, qry_param: QueryParams) -> list[Project]:
        """Override the parent method to ensure the Observing Blocks are also resolved.

        :param qry_param: The query parameters passed through to the parent query
        :return: The matching Projects, each re-read via get() so that its
            obs_blocks are resolved
        """
        projects_without_obs = super().query(qry_param)
        # This inefficiently reuses the get method to resolve the OBs rather than
        # doing it in one query as the query interface is too difficult to
        # customise. BTN-3035 should address this
        return [self.get(project.prj_id) for project in projects_without_obs]

    def _get_ob_ids_for_project(self, prj_id: str) -> frozenset[str]:
        """Return the IDs of the ObservingBlocks currently stored for prj_id."""
        query = """
            SELECT ob.id from project.observing_blocks ob
            WHERE ob.prj_fk = tool.skuid(%s)
        """
        with self.conn.cursor() as cur:
            cur.execute(query, (prj_id,))
            return frozenset(short_skuid("ob", r["id"]) for r in cur)

    def _get_sbd_ids_for_ob(self, ob_id: str) -> frozenset[str]:
        """Return the IDs of the SBDefinitions stored under the ObservingBlock ob_id."""
        # NOTE(review): this filters sb_definitions on parent_fk, whereas get()
        # joins the same table on ob_fk - confirm against the schema which
        # column holds the ObservingBlock reference.
        query = "SELECT id FROM project.sb_definitions WHERE parent_fk = tool.skuid(%s)"
        with self.conn.cursor() as cur:
            cur.execute(query, (ob_id,))
            return frozenset(short_skuid("sbd", r["id"]) for r in cur)

    def _count_sbinstances_for_sbds(self, sbd_ids: frozenset[str]) -> int:
        """Count the SBInstances whose parent_fk matches any of the given SBDefinition IDs."""
        if not sbd_ids:
            return 0
        query = "SELECT COUNT(*) FROM execution.sb_instances WHERE parent_fk = ANY(%s)"
        with self.conn.cursor() as cur:
            cur.execute(query, ([skuid_to_int(i) for i in sbd_ids],))
            return cur.fetchone()["count"]
class SBDefinitionRepository(AbstractVersionedRepository[SBDefinition, SBDefinitionID]):
    """Abstraction over persistent storage of SBDefinitions.

    Extends AbstractVersionedRepository to support retrieving specific
    historical versions of SBDefinitions via the get_version() method.
    """

    def __init__(self, conn: Connection):
        """
        Create a repository bound to the given database connection.

        :param conn: The psycopg connection used for all queries
        """
        super().__init__(conn)
        self.postgres = PostgresPersistence(postgres_mapping=SBDefinitionMapping(), connection=conn)

    def delete(self, sbd_id: str) -> None:
        """Delete an SBDefinition by its identifier.

        This operation is refused if any SBInstances reference this SBDefinition.

        :param sbd_id: The SBDefinition ID to delete
        :raises ODANotFound: If the sbd_id is not found in the database
        :raises ODAIntegrityError: If SBInstances reference this SBDefinition
        """
        # Check for dependants before touching the row, so the caller gets a
        # descriptive error naming how many SBInstances are in the way.
        sbi_count = self._count_sbinstances_for_sbd(sbd_id)
        if sbi_count > 0:
            raise ODAIntegrityError(
                f"Cannot delete SBDefinition {sbd_id}: {sbi_count} SBInstance(s) "
                "reference it. Delete the SBInstances first."
            )
        self.postgres.delete(sbd_id)

    def _count_sbinstances_for_sbd(self, sbd_id: str) -> int:
        """Count the number of SBInstances that reference this SBDefinition."""
        query = "SELECT COUNT(*) FROM execution.sb_instances WHERE parent_fk = tool.skuid(%s)"
        with self.conn.cursor() as cur:
            cur.execute(query, (sbd_id,))
            return cur.fetchone()["count"]