"""Functions that apply short-term workarounds by editing the inputs (SBDefinitions) and outputs (TMC commands) of OSO observations."""
import logging
from collections.abc import Iterable, Mapping
from typing import Any
from ska_oso_pdm import SBDefinition
from ska_tmc_cdm.messages.central_node.assign_resources import AssignResourcesRequest
from ska_tmc_cdm.messages.subarray_node.configure import ConfigureRequest
from ska_oso_scripting.engineering.low.utils.aiv_utils import get_telescope_name
# Names exported by ``from <this module> import *``.
# NOTE(review): update_configure_requests is not defined in the portion of the
# file visible here - presumably it is defined further down; verify.
__all__ = [
"update_assign_resources_request",
"update_configure_requests",
"update_scheduling_block_definition",
]
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
[docs]
def update_assign_resources_request(
    assign_resources_request: AssignResourcesRequest,
) -> AssignResourcesRequest:
    """
    Apply Low AIV overrides to an AssignResourcesRequest for TMC CentralNode.

    For every SDP processing block in the request, the ``mswriter`` entry of
    the block's parameters is created if absent and its ``telescope_name``
    key is set to the value returned by ``get_telescope_name()``. The request
    is modified in place and the same object is returned.

    A request without an SDP configuration, or whose SDP configuration has no
    processing blocks, is returned unchanged. A processing block whose
    parameters are unset (``None``) is given a new parameters dict so the
    override can still be applied.

    The purpose of this function is to hold short-lived workarounds or
    changes that should eventually be fixed in
    ska-oso-scripting.pdm_transforms (or possibly further upstream).

    :param assign_resources_request: the request to modify in place
    :return: the same request instance, with the overrides applied
    """
    sdp_config = assign_resources_request.sdp_config
    # Defensive guard: treat the SDP configuration and its processing blocks
    # as possibly unset, rather than failing with AttributeError/TypeError.
    if sdp_config is None or sdp_config.processing_blocks is None:
        LOG.debug("No SDP processing blocks in request; no overrides applied")
        return assign_resources_request

    LOG.info("Setting telescope_name in mswriter parameters")
    for pb in sdp_config.processing_blocks:
        # A block may carry no parameters at all; create the container so
        # the mswriter override below has somewhere to live.
        if pb.parameters is None:
            pb.parameters = {}
        # setdefault keeps any mswriter settings that are already present.
        mswriter_params = pb.parameters.setdefault("mswriter", {})
        mswriter_params["telescope_name"] = get_telescope_name()
    return assign_resources_request
[docs]
def update_scheduling_block_definition(sbd: SBDefinition) -> SBDefinition:
    """
    Hook for applying Low AIV overrides to an SBDefinition.

    This is the place for short-lived workarounds or changes that should
    eventually be fixed in the ODT. No overrides are applied at present, so
    the SBDefinition passed in is handed back as-is.

    :param sbd: the SBDefinition to apply overrides to
    :return: the same SBDefinition instance that was passed in
    """
    # Intentionally a pass-through: there are currently no overrides to apply.
    return sbd