# Source code for ska_oso_scripting.pdm_transforms.sdp

"""
The pdm_transforms.sdp module contains code to create SDP Configuration
Data Model (CDM) entities from a Project Data Model (PDM) Scheduling Block
Definition.
"""

import functools
import json
import logging
import os
from enum import Enum
from operator import itemgetter
from typing import Any, Callable, Iterable, Optional, TypeVar, Union

import astropy.units as u
from astropy.units import Quantity
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from ska_oso_pdm import SBDefinition, TelescopeType
from ska_oso_pdm.sb_definition import (
    Beam,
    CSPConfiguration,
    PointingKind,
    ScanDefinition,
    Target,
)
from ska_oso_pdm.sb_definition.csp.lowcbf import Correlation
from ska_oso_pdm.sb_definition.csp.midcbf import CorrelationSPWConfiguration
from ska_oso_pdm.sb_definition.sdp.sdp_configuration import SDPConfiguration, SDPScript
from ska_ser_skuid import EntityType, mint_skuid
from ska_telmodel_client import TMData
from ska_tmc_cdm.messages.central_node.sdp import BeamConfiguration
from ska_tmc_cdm.messages.central_node.sdp import Channel as cdm_Channel
from ska_tmc_cdm.messages.central_node.sdp import (
    ChannelConfiguration as cdm_ChannelConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import EBScanType, EBScanTypeBeam
from ska_tmc_cdm.messages.central_node.sdp import (
    ExecutionBlockConfiguration as cdm_EBConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    FieldConfiguration as cdm_FieldConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import PbDependency
from ska_tmc_cdm.messages.central_node.sdp import (
    PolarisationConfiguration as cdm_PolarisationConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    ProcessingBlockConfiguration as cdm_ProcessingBlockConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    ScriptConfiguration as cdm_ScriptConfiguration,
)
from ska_tmc_cdm.messages.central_node.sdp import (
    SDPConfiguration as cdm_centralnode_SDPConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.sdp import (
    SDPConfiguration as cdm_subarraynode_SDPConfiguration,
)
from tango import DevFailed, DeviceProxy

from ska_oso_scripting import get_eb_id_from_env
from ska_oso_scripting.api import is_ska_mid_environment
from ska_oso_scripting.api.functional import messages
from ska_oso_scripting.core.tango import TangoDeviceProxyFactory, TangoRegistry
from ska_oso_scripting.pdm_transforms.common import (
    convert_target,
    get_aperture_ids,
    get_dish_ids,
    max_number_of_pst_beams,
)
from ska_oso_scripting.pdm_transforms.csp import LOW_CHANNEL_WIDTH, MID_CHANNEL_WIDTH

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Log record layout passed to logging.basicConfig below.
FORMAT = "%(asctime)-15s %(message)s"

# NOTE: this configures logging at import time, i.e. importing this module
# has a side effect on the process-wide logging setup.
logging.basicConfig(level=logging.INFO, format=FORMAT)


# Not every function in this module should be called externally
__all__ = [
    "create_sdpconfiguration_centralnode",
    "create_sdpconfiguration_subarraynode",
]

# Id shared by the single polarisation configuration and every scan type beam.
POLARISATIONS_ID = "all"
# Prepended to the scan definition id when the target uses a five-point
# pointing pattern.
FIVE_POINT_SCANTYPE_PREFIX = "pointing-"
# Multiplier applied to the channel count of PULSAR_TIMING spectral windows
# (see create_spectral_window).
PST_SPWS_FACTOR = 1.5

# Interface URIs used for the SubArrayNode SDP configuration; the Mid or Low
# value is chosen by create_sdpconfiguration_subarraynode.
SDP_CONFIGURE_INTERFACE_MID = "https://schema.skao.int/ska-sdp-configure/1.0"
SDP_CONFIGURE_INTERFACE_LOW = "https://schema.skao.int/ska-sdp-configure/0.4"

# TMData source URI for the SDP script catalogue. Can be overridden with the
# SDP_SCRIPT_TMDATA environment variable; read once, at import time.
SDP_TMDATA_SOURCE = os.getenv(
    "SDP_SCRIPT_TMDATA",
    "car://gitlab.com/ska-telescope/sdp/ska-sdp-script?master#tmdata",
)

# Generic type of the objects ranked by _select_best_version.
VersionedObj = TypeVar("VersionedObj")
# One scan type: a tuple of (scan definition, visibility beam index) pairs.
ScanTypeGroup = tuple[tuple[ScanDefinition, int], ...]
# Callable that derives the scan type groups from an SBDefinition.
ScanTypeGroupBuilder = Callable[[SBDefinition], Iterable[ScanTypeGroup]]
# Callable that derives the scan type id from a group and the SB targets.
ScanTypeIdBuilder = Callable[[ScanTypeGroup, list[Target]], str]


class BeamFunction(Enum):
    """
    Enumeration of possible functions for an SDP Beam.

    Within this module only VISIBILITIES and PULSAR_TIMING are accepted by
    create_spectral_window; the other members make it raise TypeError.
    """

    # Handled by create_spectral_window (spectral window ids "vis_spw_<n>").
    VISIBILITIES = "visibilities"
    PULSAR_SEARCH = "pulsar search"
    # Handled by create_spectral_window (spectral window id "spw_pulsar").
    PULSAR_TIMING = "pulsar timing"
    VLBI = "vlbi"
    TRANSIENT_BUFFER = "transient buffer"


def vis_beam_id(index: int) -> str:
    """Return the SDP beam id of the visibility beam numbered ``index``."""
    return "vis{}".format(index)


def pst_beam_id(index: int) -> str:
    """Return the SDP beam id of the PST beam numbered ``index``."""
    return "pst{}".format(index)


def vis_channels(index: int) -> str:
    """Return the id of the visibility channel configuration numbered ``index``."""
    return "vis_channels_{}".format(index)


def pulsar_channels(index: int) -> str:
    """Return the id of the pulsar channel configuration numbered ``index``."""
    return "pulsar_channels_{}".format(index)


# Default vis-receive parameters for SKA Mid; used as the starting point in
# get_default_vis_receive_parameters.
mid_processing_block_parameters_vis_receive_single_scan_dict = {"channels_per_port": 20}

# Extra vis-receive parameters that ProcessingBlockFactory.create_pb adds to the
# vis-receive processing block when a pointing-offset script is created.
# NOTE(review): the "{dish_id}" placeholders are not substituted anywhere in
# this module - presumably they are resolved downstream; confirm.
mid_processing_block_parameters_vis_receive_5pt_scan_dict = {
    "telstate": {
        "target_fqdn": "tango://mid-dish/dish-manager/{dish_id}/programTrackTable",
        "source_offset_fqdn": "tango://mid-tmc/leaf-node-dish/{dish_id}/sourceOffset",
        "direction_fqdn": "tango://mid-dish/ds-manager/{dish_id}/achievedPointing",
    }
}

# Default parameters of the pointing-offset processing block; user supplied
# parameters are merged on top of these in ProcessingBlockFactory.create_pb.
mid_processing_block_parameters_pointing_offset_dict = {"num_scans": 5}


def convert_target_to_fieldconfiguration(
    target: Target,
    pointing_fqdn: Optional[str] = None,
) -> cdm_FieldConfiguration:
    """
    Build the CDM FieldConfiguration equivalent of a PDM Target.

    The field id is the target id and the phase direction is whatever
    convert_target returns when called with apply_pattern_offsets=True.

    :param target: the PDM Target to convert
    :param pointing_fqdn: Tango FQDN of device outputting ephemeris coords
    :raises NotImplementedError: if converting an unsupported coordinate system
        or pointing pattern.
    :return: the CDM FieldConfiguration for the target
    """
    summary = (
        f"Setting target attribute -> target_id:{target.target_id} ,"
        f" reference_coordinate:{target.reference_coordinate} "
    )
    LOG.info(summary)

    return cdm_FieldConfiguration(
        phase_dir=convert_target(target, apply_pattern_offsets=True),
        pointing_fqdn=pointing_fqdn,
        field_id=target.target_id,
    )


def convert_pst_to_fieldconfiguration(
    pst_beam: Beam,
) -> cdm_FieldConfiguration:
    """
    Build the CDM FieldConfiguration equivalent of a PDM PST beam.

    The field id is the beam name and the phase direction is the result of
    passing the beam to convert_target.

    :param pst_beam: the PDM PST beam to convert
    :return: the CDM FieldConfiguration for the PST beam
    """
    return cdm_FieldConfiguration(
        phase_dir=convert_target(pst_beam),
        field_id=pst_beam.beam_name,
    )


def create_scan_types(sbd: SBDefinition) -> list[EBScanType]:
    """
    Create the EBScanTypes for an SBDefinition.

    SKA Mid SBs are grouped with _mid_scan_type_groups; every other telescope
    value falls through to the Low grouping, _low_scan_type_groups.

    :param sbd: the SBDefinition to derive scan types from
    :return: one EBScanType per scan type group
    """
    group_builder = (
        _mid_scan_type_groups
        if sbd.telescope == TelescopeType.SKA_MID
        else _low_scan_type_groups
    )
    return _create_scan_types_by_configuration(
        sbd=sbd,
        scan_type_group_builder=group_builder,
        scan_type_id_builder=_scan_type_id,
    )


def _create_scan_types_by_configuration(
    sbd: SBDefinition,
    *,
    scan_type_group_builder: ScanTypeGroupBuilder,
    scan_type_id_builder: ScanTypeIdBuilder,
) -> list[EBScanType]:
    """
    Create one EBScanType for each scan type group of the SBDefinition.

    :param sbd: the SBDefinition supplying targets and CSP configurations
    :param scan_type_group_builder: callable yielding the scan type groups
    :param scan_type_id_builder: callable naming a group's scan type
    :return: the scan types, in the order the groups were produced
    """
    # Position in this list decides which channel configuration a scan uses.
    config_ids = [config.config_id for config in sbd.csp_configurations]

    def build_scan_type(group: ScanTypeGroup) -> EBScanType:
        group_beams: dict[str, EBScanTypeBeam] = {}
        for scan_definition, vis_beam_index in group:
            _populate_scan_type_beams(
                beams=group_beams,
                scan_definition=scan_definition,
                vis_beam_index=vis_beam_index,
                targets=sbd.targets,
                csp_configuration_ids=config_ids,
            )
        return EBScanType(
            scan_type_id=scan_type_id_builder(group, sbd.targets),
            beams=group_beams,
        )

    return [build_scan_type(group) for group in scan_type_group_builder(sbd)]


def _mid_scan_type_groups(sbd: SBDefinition) -> Iterable[ScanTypeGroup]:
    """
    Yield one single-entry scan type group per scan of the dish allocation.

    Every scan definition is paired with visibility beam index 1.
    """
    mid_vis_beam_index = 1
    scans = sbd.dish_allocations.scan_sequence
    return (((scan, mid_vis_beam_index),) for scan in scans)


def _low_scan_type_groups(sbd: SBDefinition) -> Iterable[ScanTypeGroup]:
    """
    Yield one scan type group per scan slot across all subarray beams.

    The scan sequences of the subarray beams are zipped together, so the n-th
    group holds the n-th scan of every beam, each paired with the 1-based
    index of its subarray beam. zip stops at the shortest scan sequence.
    """
    scan_sequences = [
        beam.scan_sequence for beam in sbd.mccs_allocation.subarray_beams
    ]
    return (
        tuple(
            (scan, beam_number) for beam_number, scan in enumerate(slot, start=1)
        )
        for slot in zip(*scan_sequences)
    )


def _scan_type_id(scan_type_group: ScanTypeGroup, targets: list[Target]) -> str:
    """
    Name a scan type after the first scan definition of its group.

    The id is the scan definition id, prefixed with FIVE_POINT_SCANTYPE_PREFIX
    when the scan's target uses a five-point pointing pattern.

    :param scan_type_group: the (scan definition, beam index) pairs of the group
    :param targets: targets searched for the first scan's target_ref
    :return: the scan type id
    """
    first_scan, _ = scan_type_group[0]
    matched_target = next(
        candidate
        for candidate in targets
        if candidate.target_id == first_scan.target_ref
    )
    is_five_point = matched_target.pointing_pattern.active == PointingKind.FIVE_POINT
    scan_id = first_scan.scan_definition_id
    return FIVE_POINT_SCANTYPE_PREFIX + scan_id if is_five_point else scan_id


def _populate_scan_type_beams(
    beams: dict[str, EBScanTypeBeam],
    scan_definition: ScanDefinition,
    *,
    vis_beam_index: int,
    targets: list[Target],
    csp_configuration_ids: list[str],
) -> None:
    """
    Add the beams required by one scan definition to ``beams``, in place.

    A visibility beam entry is always written. When the scan's target has PST
    tied-array beams, one PST entry per beam is appended, numbered after the
    "pst" entries already present in ``beams``.

    :param beams: scan type beam mapping to mutate
    :param scan_definition: scan definition whose target and CSP config are used
    :param vis_beam_index: index used to build the visibility beam id
    :param targets: targets searched for the scan's target_ref
    :param csp_configuration_ids: CSP configuration ids, in SB order
    """
    matched_target = next(
        candidate
        for candidate in targets
        if candidate.target_id == scan_definition.target_ref
    )
    # Channel ids are numbered by the position of the CSP configuration in the
    # SB, 1-based on the SDP side whereas list positions are 0-based.
    channel_number = (
        csp_configuration_ids.index(scan_definition.csp_configuration_ref) + 1
    )

    beams[vis_beam_id(vis_beam_index)] = EBScanTypeBeam(
        field_id=scan_definition.target_ref,
        channels_id=vis_channels(channel_number),
        polarisations_id=POLARISATIONS_ID,
    )

    tied_array_beams = matched_target.tied_array_beams
    if not (tied_array_beams and tied_array_beams.pst_beams):
        return

    # Continue numbering after PST beams added for earlier scans of the group.
    pst_number = len([key for key in beams if key.startswith("pst")])
    for pst_beam in tied_array_beams.pst_beams:
        pst_number += 1
        beams[pst_beam_id(pst_number)] = EBScanTypeBeam(
            field_id=pst_beam.beam_name,
            channels_id=pulsar_channels(channel_number),
            polarisations_id=POLARISATIONS_ID,
        )


def create_spectral_window(
    sw_id: int,
    spectral_window: Union[Correlation, CorrelationSPWConfiguration],
    spectral_window_type: BeamFunction,
    start: int = 0,
) -> cdm_Channel:
    """
    Create a SpectralWindow.

    :param sw_id: the id version of the spectral window type
    :param spectral_window: the spectral window produced by the correlator,
        either a PDM CorrelationSPWConfiguration (imported from the midcbf
        module) or a PDM Correlation (imported from the lowcbf module)
    :param spectral_window_type: the type of spectral window being created
    :param start: the start channel of the spectral window being created
    :raises TypeError: if spectral_window is neither supported PDM class, or
        if spectral_window_type is not VISIBILITIES or PULSAR_TIMING
    :return: the CDM Channel describing the spectral window
    """
    if isinstance(spectral_window, CorrelationSPWConfiguration):
        channel_width_hz = (
            MID_CHANNEL_WIDTH.to(u.Hz).value / 2**spectral_window.zoom_factor
        )
        channel_count = spectral_window.number_of_channels
        downshift_hz = 0.0
    elif isinstance(spectral_window, Correlation):
        channel_width_hz = LOW_CHANNEL_WIDTH.to(u.Hz).value
        channel_count = spectral_window.number_of_channels * 144
        # Both band edges are shifted down by half a fine channel width.
        # PST divides each coarse channel into 216 fine channels; the
        # correlator uses 144*24.
        if spectral_window_type == BeamFunction.PULSAR_TIMING:
            fine_channel_width = LOW_CHANNEL_WIDTH / 216
        else:
            fine_channel_width = LOW_CHANNEL_WIDTH / (144 * 24)
        downshift_hz = fine_channel_width.to(u.Hz).value / 2.0
    else:
        raise TypeError(
            "expected PDM Correlation or CorrelationSPWConfiguration, got "
            f"{type(spectral_window)}"
        )

    half_bandwidth_hz = (channel_width_hz * spectral_window.number_of_channels) / 2
    freq_min = spectral_window.centre_frequency - half_bandwidth_hz - downshift_hz
    freq_max = spectral_window.centre_frequency + half_bandwidth_hz - downshift_hz

    if spectral_window_type == BeamFunction.VISIBILITIES:
        spectral_window_id = f"vis_spw_{sw_id}"
    elif spectral_window_type == BeamFunction.PULSAR_TIMING:
        spectral_window_id = "spw_pulsar"
        # NOTE(review): PST_SPWS_FACTOR is a float, so the count handed to
        # cdm_Channel is a float here - confirm the CDM model accepts that.
        channel_count = channel_count * PST_SPWS_FACTOR
    else:
        raise TypeError(
            f"unsupported BeamFunction type {type(spectral_window_type)}"
        )

    return cdm_Channel(
        spectral_window_id=spectral_window_id,
        count=channel_count,
        start=start,
        freq_min=freq_min,
        freq_max=freq_max,
        stride=1,
    )


def create_spws(
    csp_config: CSPConfiguration, spw_type: BeamFunction = BeamFunction.VISIBILITIES
) -> list[cdm_Channel]:
    """
    Create the spectral window definitions for one correlator configuration.

    When the configuration has a midcbf section, the correlation spectral
    windows of its first subband are used; otherwise those of the lowcbf
    section. Each window starts at the channel where the previous one ended.

    :param csp_config: the PDM CSPConfiguration to use to create the CDM Channel
    :param spw_type: the function of the Beam spectral window to create
    :return: one CDM Channel per correlation spectral window
    """
    correlator_windows = (
        csp_config.midcbf.subbands[0].correlation_spws
        if csp_config.midcbf
        else csp_config.lowcbf.correlation_spws
    )

    channels = []
    next_start = 0
    for window_number, correlator_window in enumerate(correlator_windows, start=1):
        channel = create_spectral_window(
            sw_id=window_number,
            spectral_window=correlator_window,
            spectral_window_type=spw_type,
            start=next_start,
        )
        channels.append(channel)
        # The following window begins right after this one's last channel.
        next_start = next_start + channel.count

    return channels


def _get_first_value_for_device_property(proxy: DeviceProxy, name: str) -> Any:
    """
    Return the first value of a device property read through a DeviceProxy.

    The property is fetched with ``proxy.get_property(name)``; the returned
    mapping is indexed by ``name`` and the first element of that entry is
    returned.

    :param proxy: the DeviceProxy of the device to query
    :param name: the name of the device property to read
    :return: the first value stored for the property
    """
    return proxy.get_property(name)[name][0]


def _select_best_version(
    sdp_target_version: str,
    objs: Iterable[VersionedObj],
    *,
    version_key: Callable[[VersionedObj], str] = itemgetter("version"),
    compat_key: Callable[[VersionedObj], str],
    version_specifier: str = "",
) -> VersionedObj:
    """
    Pick the newest member of `objs` compatible with `sdp_target_version`.

    The version of each obj is determined by `version_key`, which when passed an obj,
    should return a Python-packaging-compatible version number like "0.1.5".

    Compatibility is determined by `compat_key`, which when passed a member of `objs`,
    should return a Python-packaging-compatible version specifier like ">=0.1,<0.2".

    `version_specifier` is an optional Python-packaging-compatible version specifier
    that further restricts which obj versions are considered. It can combine
    constraints (e.g., "==1.*,!=1.5.0" to pin to major version 1 while excluding
    1.5.0). Pre-release versions are excluded unless the specifier itself references
    a pre-release (e.g., "==1.0a1").

    :param sdp_target_version: the SDP version each candidate must support
    :param objs: the candidate objects
    :param version_key: returns the version string of a candidate
    :param compat_key: returns the SDP version specifier of a candidate
    :param version_specifier: extra restriction on the candidate versions
    :raises ValueError: if no member of `objs` satisfies both constraints
    :return: the matching candidate with the highest version; when several
        share that version, the one that appears first in `objs`
    """
    pin = SpecifierSet(version_specifier)
    candidates = [
        obj
        for obj in objs
        if sdp_target_version in SpecifierSet(compat_key(obj))
        and version_key(obj) in pin
    ]
    if not candidates:
        # Fail with an explicit message instead of relying on an unpacking
        # error; the exception type stays ValueError for existing callers.
        raise ValueError(
            f"no candidate is compatible with SDP version {sdp_target_version!r}"
            f" and version specifier {version_specifier!r}"
        )
    # Only the newest candidate is needed, so a full sort is unnecessary.
    return max(candidates, key=lambda obj: Version(version_key(obj)))


def select_sdp_script_version_for_deployed_controller(
    script_name: SDPScript, version_specifier: str = ""
) -> str:
    """
    Find the latest version of the given script supported by the SDP version specified by the
    SDP Controller Tango device.

    The SDP version is read from the "version" entry of the JSON held in the
    controller's sdpVersion attribute.

    :param script_name: name of the SDP script
    :param version_specifier: optional PEP 440 specifier restricting which versions
        are eligible (e.g., "==1.*" to pin to major version 1).
    :raises RuntimeError: if a proxy to the SDP Controller cannot be created
    :return: the selected script version
    """
    sdp_controller_trl = TangoRegistry.get_sdp_controller_node()

    try:
        proxy = TangoDeviceProxyFactory()(sdp_controller_trl)
    except DevFailed as err:
        # Chain explicitly so the underlying Tango failure stays attached.
        raise RuntimeError(
            "SDP Controller node is not available. Unable to determine the SDP version being used"
        ) from err

    # NOTE(review): the attribute read below sits outside the try block, so a
    # DevFailed raised while reading sdpVersion propagates unchanged - confirm
    # that this is intended.
    sdp_version = json.loads(proxy.sdpVersion)["version"]

    return select_sdp_script_version_for_sdp_version(
        sdp_version, script_name, version_specifier
    )


def select_sdp_script_version_for_sdp_version(
    sdp_version: str, script_name: SDPScript, version_specifier: str = ""
) -> str:
    """
    Find the latest version of the given script supported by the supplied `sdp_version`

    :param sdp_version: the version of SDP to query TMData for to get all supported scripts and script versions
    :param script_name: name of the SDP script
    :param version_specifier: optional PEP 440 specifier restricting which versions
        are eligible (e.g., "==1.*" to pin to major version 1).
    :raises ValueError: if no version of the script satisfies the constraints
    :return: the selected script version
    """
    tmdata = TMData(
        update=True,
        source_uris=[SDP_TMDATA_SOURCE],
    )
    all_scripts = tmdata["ska-sdp/scripts/scripts.yaml"].get_dict()["scripts"]
    # Only entries for the requested script are candidates.
    scripts = (script for script in all_scripts if script["name"] == script_name.value)
    try:
        # The newest script version whose "sdp_version" specifier admits
        # sdp_version, and which satisfies version_specifier, is selected.
        return _select_best_version(
            sdp_version,
            scripts,
            compat_key=itemgetter("sdp_version"),
            version_specifier=version_specifier,
        )["version"]
    except ValueError as err:
        # Keep the original failure as the cause so it is not lost.
        raise ValueError(
            f"SDP script {script_name.value} not found in SDP Script TMData"
        ) from err


def get_default_vis_receive_parameters(
    subarray_id: int, telescope: TelescopeType
) -> dict[str, Any]:
    match telescope:
        case TelescopeType.SKA_MID:
            parameters = mid_processing_block_parameters_vis_receive_single_scan_dict
        case TelescopeType.SKA_LOW:
            parameters = {}
        case _:
            raise ValueError(f"Telescope {telescope.value} not supported")

    csp_leaf_node_trl = TangoRegistry.get_csp_subarray_leaf_node(subarray_id)
    try:
        proxy = TangoDeviceProxyFactory()(csp_leaf_node_trl)
        telmodel_key = _get_first_value_for_device_property(proxy, "TelmodelPath")
        telmodel_source = _get_first_value_for_device_property(proxy, "TelmodelSource")
        extra_helm_values = {
            "receiver": {
                "options": {
                    "telescope_model": {
                        "telmodel_key": telmodel_key,
                        "telmodel_source_uris": telmodel_source,
                    }
                }
            }
        }

    except DevFailed:
        msg = "CSP leaf node is not available. Unable to inform SDP which telmodel source CSP is using."
        messages.publish_event_message(msg=msg)
        LOG.warning(msg)
        extra_helm_values = {}

    if extra_helm_values:
        parameters["extra_helm_values"] = extra_helm_values

    return parameters


def _deep_merge_param_dicts(
    base_params: dict[str, Any], user_params: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
    """
    Recursively merge ``user_params`` on top of ``base_params``.

    Where both sides hold a dict under the same key the two are merged
    recursively; in every other case the user value replaces the base value.
    Neither input is modified, and no dict inside the result is shared with
    the inputs, so changing the result cannot alter ``base_params`` (often a
    module-level default) or ``user_params``. Non-dict values such as lists
    are not copied.

    :param base_params: the default parameters
    :param user_params: overrides to apply; None is treated as no overrides
    :return: a new dict holding the merged parameters
    """
    # Start from a copy of the base in which nested dicts are copied as well.
    merged = {
        key: _deep_merge_param_dicts(value) if isinstance(value, dict) else value
        for key, value in base_params.items()
    }
    for user_key, user_value in (user_params or {}).items():
        if isinstance(user_value, dict):
            base_value = merged.get(user_key)
            # Merging into {} when the base has no dict here yields a copy of
            # the user dict rather than a reference to it.
            merged[user_key] = _deep_merge_param_dicts(
                base_value if isinstance(base_value, dict) else {}, user_value
            )
        else:
            merged[user_key] = user_value
    return merged


# Maps each supported SDP script to a partially applied CDM ScriptConfiguration
# constructor with the script name and kind already filled in; the caller only
# supplies the version (see ProcessingBlockFactory.create_pb).
SCRIPT_CONFIGS = {
    SDPScript.VIS_RECEIVE: functools.partial(
        cdm_ScriptConfiguration, name=SDPScript.VIS_RECEIVE.value, kind="realtime"
    ),
    SDPScript.POINTING_OFFSET: functools.partial(
        cdm_ScriptConfiguration, name=SDPScript.POINTING_OFFSET.value, kind="realtime"
    ),
    SDPScript.CONTINUUM_IMAGING: functools.partial(
        cdm_ScriptConfiguration, name=SDPScript.CONTINUUM_IMAGING.value, kind="batch"
    ),
}


class ProcessingBlockFactory:
    """
    Collects CDM processing block configurations, at most one per SDP script.

    Blocks are added with create_pb and read back with get_processing_blocks.
    Scripts other than vis-receive refer to the vis-receive block, so that
    block has to be created first.
    """

    def __init__(self, telescope: TelescopeType, subarray_id: int):
        """
        :param telescope: telescope the processing blocks are for
        :param subarray_id: subarray used when looking up vis-receive defaults
        """
        self._pbs = {}
        self._pb_template = functools.partial(
            cdm_ProcessingBlockConfiguration,
            sbi_ids=[],
        )
        self.subarray_id = subarray_id
        self.telescope = telescope

    def _require_vis_receive(self, error_message: str):
        """Return the vis-receive block, raising ValueError if none exists yet."""
        if SDPScript.VIS_RECEIVE not in self._pbs:
            raise ValueError(error_message)
        return self._pbs[SDPScript.VIS_RECEIVE]

    @staticmethod
    def _dependency_on(processing_block) -> list:
        """Return a one-element dependency list pointing at ``processing_block``."""
        return [
            PbDependency(
                pb_id=processing_block.pb_id,
                kind=[processing_block.script.name],
            )
        ]

    def create_pb(
        self,
        script_name: SDPScript,
        version: str = "latest",
        extra_parameters: dict[str, Any] = None,
    ):
        """
        Create the processing block for ``script_name`` and store it.

        A block previously stored for the same script is replaced.

        :param script_name: the SDP script the block runs
        :param version: script version; "latest" selects the newest version
            supported by the deployed SDP controller
        :param extra_parameters: user parameters; merged over the defaults for
            vis-receive and pointing-offset, used as given for continuum-imaging
        :raises ValueError: if the script is not supported, or if it needs a
            vis-receive block and none has been created yet
        """
        if version == "latest":
            version = select_sdp_script_version_for_deployed_controller(script_name)
        pb_id = mint_skuid(EntityType.PB, form="long")
        dependencies = None

        if script_name == SDPScript.VIS_RECEIVE:
            defaults = get_default_vis_receive_parameters(
                self.subarray_id, self.telescope
            )
            parameters = _deep_merge_param_dicts(defaults, extra_parameters)
        elif script_name == SDPScript.POINTING_OFFSET:
            parameters = _deep_merge_param_dicts(
                mid_processing_block_parameters_pointing_offset_dict,
                extra_parameters,
            )
            vis_receive = self._require_vis_receive(
                "No vis-receive script defined in processing blocks yet, "
                "cannot add pointing-offset script."
            )
            # The existing vis-receive block gains the five-point parameters
            # and a dependency on the block being created here.
            vis_receive.parameters.update(
                mid_processing_block_parameters_vis_receive_5pt_scan_dict
            )
            vis_receive.dependencies = [
                PbDependency(pb_id=pb_id, kind=[script_name.value])
            ]
            dependencies = self._dependency_on(vis_receive)
        elif script_name == SDPScript.CONTINUUM_IMAGING:
            parameters = extra_parameters
            vis_receive = self._require_vis_receive(
                "No vis-receive script defined in processing blocks yet, "
                "cannot add continuum-imaging script."
            )
            dependencies = self._dependency_on(vis_receive)
        else:
            raise ValueError(f"SDP script type {script_name} not supported")

        self._pbs[script_name] = self._pb_template(
            pb_id=pb_id,
            script=SCRIPT_CONFIGS[script_name](version=version),
            parameters=parameters,
            dependencies=dependencies,
        )

    def get_processing_blocks(self):
        """Return the stored processing blocks in the order they were first added."""
        return list(self._pbs.values())


def create_processing_blocks_from_sdp_params(
    subarray_id: int, telescope: TelescopeType, sdp_configs: list[SDPConfiguration]
):
    """
    Create the processing blocks described by PDM SDP configurations.

    :param subarray_id: the subarray the processing blocks are for
    :param telescope: the telescope the processing blocks are for
    :param sdp_configs: the PDM SDP configurations; left unmodified
    :return: the processing blocks, vis-receive first
    """
    pb_factory = ProcessingBlockFactory(telescope, subarray_id)
    # We want the vis-receive to be at the front so that it can be referenced by
    # other scripts. sorted() is stable and returns a new list, so the caller's
    # list (part of the SBDefinition) keeps its original order.
    ordered_configs = sorted(
        sdp_configs,
        key=lambda sdp_item: sdp_item.sdp_script != SDPScript.VIS_RECEIVE,
    )
    for sdp_config in ordered_configs:
        pb_factory.create_pb(
            sdp_config.sdp_script,
            sdp_config.script_version,
            sdp_config.script_parameters,
        )

    return pb_factory.get_processing_blocks()


def create_default_processing_blocks(
    subarray_id: int, telescope: TelescopeType, targets: list[Target]
):
    """
    Create the processing blocks used when the SB defines no SDP configuration.

    A vis-receive block is always created; a pointing-offset block is added
    when at least one target uses a five-point pointing pattern.

    :param subarray_id: the subarray the processing blocks are for
    :param telescope: the telescope the processing blocks are for
    :param targets: the targets of the SB
    :return: the created processing blocks
    """
    factory = ProcessingBlockFactory(telescope, subarray_id)
    factory.create_pb(SDPScript.VIS_RECEIVE)

    for target in targets:
        if target.pointing_pattern.active == PointingKind.FIVE_POINT:
            # One pointing-offset block is enough however many targets need it.
            factory.create_pb(SDPScript.POINTING_OFFSET)
            break

    return factory.get_processing_blocks()


def create_channels(sbd: SBDefinition) -> list[cdm_ChannelConfiguration]:
    """
    Create the channel configurations for an SBDefinition.

    Every CSP configuration yields one visibility channel configuration. For
    SKA Low, a CSP configuration with lowcbf.do_pst set additionally yields a
    pulsar channel configuration placed right after it. Ids are numbered by
    the 1-based position of the CSP configuration.

    :param sbd: the SBDefinition to create channels for
    :return: the channel configurations
    """
    channels = []
    for config_number, csp_config in enumerate(sbd.csp_configurations, start=1):
        visibility_channels = cdm_ChannelConfiguration(
            channels_id=vis_channels(config_number),
            spectral_windows=create_spws(csp_config),
        )
        channels.append(visibility_channels)

        wants_pst = (
            sbd.telescope == TelescopeType.SKA_LOW and csp_config.lowcbf.do_pst
        )
        if not wants_pst:
            continue

        pst_windows = create_spws(
            csp_config=csp_config,
            spw_type=BeamFunction.PULSAR_TIMING,
        )
        channels.append(
            cdm_ChannelConfiguration(
                channels_id=pulsar_channels(config_number),
                spectral_windows=pst_windows,
            )
        )

    return channels


def create_fields(sbd: SBDefinition) -> list[cdm_FieldConfiguration]:
    """
    Create the field configurations for an SBDefinition.

    The result holds one field per target, followed by one field per PST beam
    of every target that has PST tied-array beams.

    :param sbd: the SBDefinition to create fields for
    :return: target fields first, then PST beam fields
    """
    fields = []
    for target in sbd.targets:
        fields.append(convert_target_to_fieldconfiguration(target))

    # PST fields go after all target fields, so a second pass is needed.
    for target in sbd.targets:
        if not (target.tied_array_beams and target.tied_array_beams.pst_beams):
            continue
        for pst_beam in target.tied_array_beams.pst_beams:
            fields.append(convert_pst_to_fieldconfiguration(pst_beam))

    return fields


# Changed in light of BTN-2946 - scripting is now responsible for
# creating the correct number of PST beams based on the maximum
# number of concurrent PST beams needed across all scan slots rather than
# consolidating this from the target info.
def create_beams(sbd: SBDefinition) -> list[BeamConfiguration]:
    match sbd.telescope:
        case TelescopeType.SKA_LOW:
            beams = [
                BeamConfiguration(
                    beam_id=vis_beam_id(subarray_beam_index),
                    function="visibilities",
                    visibility_beam_id=subarray_beam_index,
                )
                for subarray_beam_index, subarray_beam in enumerate(
                    sbd.mccs_allocation.subarray_beams, start=1
                )
            ]
        case TelescopeType.SKA_MID:
            beams = [
                BeamConfiguration(
                    beam_id=vis_beam_id(1),
                    function="visibilities",
                    visibility_beam_id=1,
                )
            ]

    pst_beams = [
        BeamConfiguration(
            beam_id=pst_beam_id(i),
            function="pulsar timing",
            timing_beam_id=i,
        )
        for i in range(1, max_number_of_pst_beams(sbd) + 1)
    ]

    return beams + pst_beams


def create_central_node_sdp_config(
    subarray_id: int, execution_block: cdm_EBConfiguration, pdm_config: SBDefinition
) -> cdm_centralnode_SDPConfiguration:
    """
    Assemble the CentralNode SDP configuration around an execution block.

    Processing blocks come from the SB's SDP configurations when it has any,
    otherwise from the defaults. Receptors are the aperture ids when the SB
    has an MCCS allocation, otherwise the dish ids.

    :param subarray_id: the subarray id to target
    :param execution_block: the execution block to embed
    :param pdm_config: the SBDefinition supplying SDP configs and resources
    :return: the CDM CentralNode SDPConfiguration
    """
    if pdm_config.sdp_configurations:
        created_blocks = create_processing_blocks_from_sdp_params(
            subarray_id=subarray_id,
            telescope=pdm_config.telescope,
            sdp_configs=pdm_config.sdp_configurations,
        )
    else:
        created_blocks = create_default_processing_blocks(
            subarray_id=subarray_id,
            telescope=pdm_config.telescope,
            targets=pdm_config.targets,
        )
    processing_blocks = list(created_blocks)

    receptors = (
        get_aperture_ids(mccs_allocation=pdm_config.mccs_allocation)
        if pdm_config.mccs_allocation
        else get_dish_ids(dish_allocation=pdm_config.dish_allocations)
    )

    return cdm_centralnode_SDPConfiguration(
        execution_block=execution_block,
        resources={"receptors": receptors},
        processing_blocks=processing_blocks,
    )


def create_sdpconfiguration_centralnode(
    subarray_id: int,
    sbd: SBDefinition,
) -> cdm_centralnode_SDPConfiguration:
    """
    Create a SDPConfiguration.

    The execution block id comes from get_eb_id_from_env, and the execution
    block's max_length is the total scan duration of the SB in seconds.

    :param subarray_id: the subarray id to target
    :param sbd: the SBDefinition to use in creation
    :raises TypeError: if sbd is not an SBDefinition
    :return: the CDM CentralNode SDPConfiguration
    """
    if not isinstance(sbd, SBDefinition):
        raise TypeError(f"Expected PDM SBDefinition, got {type(sbd)}")

    eb_id = get_eb_id_from_env()

    channel_config = create_channels(sbd=sbd)
    fields = create_fields(sbd=sbd)
    beams = create_beams(sbd=sbd)
    scan_types = create_scan_types(sbd=sbd)

    execution_block = cdm_EBConfiguration(
        eb_id=eb_id,
        max_length=_total_scan_duration(sbd).to(u.s).value,
        context={},
        beams=beams,
        scan_types=scan_types,
        channels=channel_config,
        polarisations=[
            cdm_PolarisationConfiguration(
                polarisations_id=POLARISATIONS_ID, corr_type=["XX", "XY", "YX", "YY"]
            )
        ],
        fields=fields,
    )

    return create_central_node_sdp_config(
        subarray_id=subarray_id, execution_block=execution_block, pdm_config=sbd
    )


def create_sdpconfiguration_subarraynode(
    scan_definition: ScanDefinition, target: Target
) -> cdm_subarraynode_SDPConfiguration:
    """
    Convert a PDM Scan Definition to an SDP Configuration aspect of a
    TMC SubArrayNode.Configure call if no PDM SDPConfiguration is present

    The scan type is the scan definition id, prefixed with
    FIVE_POINT_SCANTYPE_PREFIX when the target uses a five-point pointing
    pattern. The interface URI depends on is_ska_mid_environment().

    :param scan_definition: the PDM ScanDefinition being configured
    :param target: the target observed by the scan
    :raises TypeError: if scan_definition is not a ScanDefinition
    :return: the CDM SubArrayNode SDPConfiguration
    """
    if not isinstance(scan_definition, ScanDefinition):
        raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")

    if target.pointing_pattern.active == PointingKind.FIVE_POINT:
        scan_type = FIVE_POINT_SCANTYPE_PREFIX + scan_definition.scan_definition_id
    else:
        scan_type = scan_definition.scan_definition_id

    if is_ska_mid_environment():
        interface = SDP_CONFIGURE_INTERFACE_MID
    else:
        interface = SDP_CONFIGURE_INTERFACE_LOW

    return cdm_subarraynode_SDPConfiguration(interface=interface, scan_type=scan_type)


def _total_scan_duration(sbd: SBDefinition) -> Quantity:
    """
    Return the summed duration of the scans in the SB's scan sequence.

    For SKA Mid the dish allocation's scan sequence is used; otherwise the
    scan sequence of the first subarray beam of the MCCS allocation.

    :param sbd: the SBDefinition whose scans are summed
    :return: the total scan duration; zero seconds for an empty sequence
    """
    if sbd.telescope == TelescopeType.SKA_MID:
        scans = sbd.dish_allocations.scan_sequence
    else:
        # Only the first subarray beam's scan sequence is considered.
        scans = sbd.mccs_allocation.subarray_beams[0].scan_sequence

    durations = [scan.scan_duration for scan in scans]
    if not durations:
        # sum() of nothing is the plain int 0, which has no .to(); return a
        # Quantity so callers can always convert units.
        return 0 * u.s
    return sum(durations)