import copy
import logging
import os
from typing import Union
from ska_oso_pdm.sb_definition import SBDefinition
from ska_ser_skuid.client import SkuidClient
from ska_ser_skuid.services import SKAEntityTypeService
LOGGER = logging.getLogger(__name__)
def create_sbi(sbd: SBDefinition) -> SBDefinition:
    """
    Create a Scheduling Block Instance from a Scheduling Block Definition.

    Currently, an SBI is a snapshot of an SBD but with EB and PB IDs replaced.
    """
    # TODO inject this dependency
    uid_client = SkuidClient(os.environ["SKUID_URL"])

    # Create a new ID for the SBI
    sbi_map = {
        sbd.sbd_id: uid_client.fetch_skuid(
            SKAEntityTypeService.DefaultEntityTypes.SchedulingBlockInstance
        )
    }
    LOGGER.info(f"New SBI ID mapping: {sbd.sbd_id} -> {sbi_map[sbd.sbd_id]}")

    # Create a mapping of old processing block IDs to new IDs
    if sbd.sdp_configuration is not None:
        sdp_config = sbd.sdp_configuration
        pb_map = {
            pb.pb_id: uid_client.fetch_skuid(
                SKAEntityTypeService.DefaultEntityTypes.ProcessingBlock
            )
            for pb in sdp_config.processing_blocks
        }
        for old_id, new_id in pb_map.items():
            LOGGER.info(f"New PB ID mapping: {old_id} -> {new_id}")

    sbi = copy.deepcopy(sbd)

    # Update the top-level ID. This is the SBI ID.
    sbi.sbd_id = sbi_map[sbd.sbd_id]

    if sbd.sdp_configuration is not None:
        sdp_config = sbi.sdp_configuration

        # Get a new EB ID for the SDP Execution Block.
        #
        # Currently, SBIs are effectively straight copies of SBDs. Eventually,
        # SBIs will become real entities with an entity ID, stored in the ODA.
        # These entities will be lightweight, *referencing* a specific SBD
        # version rather than including all the SBD content in the SBI.
        # Once this is implemented, copying the EB ID into the SDP config
        # would be done in the PDM transform rather than when creating an SBI.
        sdp_config.execution_block.eb_id = uid_client.fetch_skuid(
            SKAEntityTypeService.DefaultEntityTypes.ExecutionBlock
        )

        # Trawl through the SDP Processing Blocks, replacing old IDs with new
        for pb in sdp_config.processing_blocks:
            pb.pb_id = pb_map[pb.pb_id]

            # Expect the mapping of PBs to SBDs and/or SBIs to become more
            # complicated. For a commensal SB built from a host SB and a
            # guest SB (e.g., an interferometric observing SB plus a pulsar
            # search SB), certain processing blocks might only reference
            # their originating SBs. That's moot at the moment as we only
            # operate on one SB.
            if sbi.sbd_id not in pb.sbi_refs:
                pb.sbi_refs.append(sbi.sbd_id)

            if pb.dependencies:
                for dependency in pb.dependencies:
                    dependency.pb_ref = pb_map[dependency.pb_ref]

    return sbi
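# Illustrative usage sketch (not part of the library): create_sbi() returns a
# deep copy of the SBD carrying freshly minted identifiers. The file name is
# hypothetical and SKUID_URL must point at a reachable SKUID service.
#
#   sbd = load_sbd("mid_sbd.json")
#   sbi = create_sbi(sbd)
#   assert sbi.sbd_id != sbd.sbd_id   # the SBI ID replaces the SBD ID
#   # EB and PB IDs in the SDP configuration are likewise replaced; all other
#   # SBD content is copied across unchanged.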
def load_sbd(path: Union[str, os.PathLike]) -> SBDefinition:
    """
    Load an SBDefinition from a JSON file on disk.

    :param path: path to SBD.
    :return: SBDefinition object
    """
    if not os.path.isfile(path):
        msg = f"SB file not found: {path}"
        LOGGER.error(msg)
        raise IOError(msg)

    with open(path, "r", encoding="utf-8") as infile:
        return SBDefinition.model_validate_json(infile.read())
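# Illustrative sketch: load_sbd() raises IOError for a missing file, so a
# caller can handle that case before attempting SBI creation. The path below
# is hypothetical.
#
#   try:
#       sbd = load_sbd("missing_sbd.json")
#   except IOError:
#       LOGGER.error("SBD could not be loaded")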
def save_sbi(sbi: SBDefinition, path: str) -> str:
    """
    Save an SBI to disk.

    Saves an SBI (really, an SBD but with fixed IDs) to the specified path.

    :param sbi: SBI to serialise
    :param path: output file to write
    :return: the path the SBI was written to
    """
    with open(path, "w", encoding="utf-8") as outfile:
        outfile.write(sbi.model_dump_json())
    return path
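if __name__ == "__main__":
    # Illustrative sketch, not part of the library: an end-to-end run that
    # loads an SBD, mints an SBI with fresh IDs, and writes it back out.
    # The file paths are hypothetical and SKUID_URL must be set in the
    # environment for create_sbi() to reach the SKUID service.
    logging.basicConfig(level=logging.INFO)
    example_sbd = load_sbd("sbd.json")       # hypothetical input path
    example_sbi = create_sbi(example_sbd)
    save_sbi(example_sbi, "sbi.json")        # hypothetical output path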