"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import copy
import logging
import math
from datetime import timedelta
from functools import partial
from itertools import chain
from typing import List, Optional, Union
import ska_oso_pdm.sb_definition as pdm_sbd
import ska_oso_pdm.sb_definition.csp as pdm_csp
import ska_tmc_cdm.messages.subarray_node.configure.csp as cdm
from ska_oso_pdm.sb_definition.dish.dish_configuration import ReceiverBand
from ska_tmc_cdm.messages.central_node.csp import PSTConfiguration
from ska_tmc_cdm.messages.subarray_node.configure.mccs import SubarrayBeamLogicalBands
from ska_tmc_cdm.messages.subarray_node.configure.pst import (
PSTBeamConfiguration,
PSTScanConfiguration,
PSTScanCoordinates,
)
from ska_oso_scripting.functions.pdm_transforms.common import convert_target
from ska_oso_scripting.functions.pdm_transforms.dish import convert_frequency_band
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Log record layout: timestamp (left-aligned, padded to 15 characters) followed by the message.
FORMAT = "%(asctime)-15s %(message)s"
# NOTE: logging.basicConfig is called at import time, i.e. importing this module
# is what triggers the INFO-level logging configuration below.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally
__all__ = [
    "convert_cspconfiguration",
    "_calculate_logical_bands",
]
# Interface URLs written into the CDM CSPConfiguration by convert_cspconfiguration:
# the 2.0 URL when the PDM configuration has a subarray section, the LOW 3.2 URL otherwise.
CSP_SCHEMA_2_0_URL = "https://schema.skao.int/ska-csp-configure/2.0"
CSP_SCHEMA_LOW_3_2_URL = "https://schema.skao.int/ska-low-csp-configure/3.2"
# Channels that are used to observe PST ('station' or 'SPS' channels) have a width of 0.78125 MHz.
STATION_CHANNEL_WIDTH = 0.78125
def convert_csp_id(pdm_val: Optional[str]) -> Optional[str]:
    """
    Translate a PDM CSP ID into its CDM counterpart.

    Both representations are plain strings at present, so the conversion is
    just a str() cast; None is passed through untouched.

    :param pdm_val: PDM CSP ID to convert, or None
    :return: the ID as a str, or None when no ID was supplied
    """
    return None if pdm_val is None else str(pdm_val)
def convert_cspconfiguration(
    pdm_config: pdm_csp.CSPConfiguration,
    pdm_mccs_allocation: pdm_sbd.MCCSAllocation,
    target: Union[pdm_sbd.Target, None] = None,
    receiver_band: Union[ReceiverBand, cdm.core.ReceiverBand, None] = None,
) -> cdm.CSPConfiguration:
    """
    Converts the relevant parts of the SBDefinition into a CDM CSPConfiguration.

    A configuration whose ``subarray`` section is populated is treated as a MID
    configuration: it requires a receiver band and is tagged with the
    ska-csp-configure 2.0 interface URL. Otherwise the LOW 3.2 interface URL
    is used.

    :param pdm_config: The PDM CSPConfiguration to convert
    :param pdm_mccs_allocation: The PDM MCCSAllocation that is also required for the conversion
    :param target: The PDM target that is used for tied array observations (support for LOW only)
    :param receiver_band: PDM receiver band to set for this configuration
    :return: the equivalent CDM configuration
    :raises TypeError: if pdm_config is not a PDM CSPConfiguration
    :raises ValueError: if a subarray section is present but no receiver band was given
    """
    if not isinstance(pdm_config, pdm_csp.CSPConfiguration):
        raise TypeError(f"Expected PDM CSPConfiguration, got {type(pdm_config)}")

    # The presence of a subarray section is the MID/LOW discriminator used here.
    is_mid = pdm_config.subarray is not None
    if is_mid and receiver_band is None:
        raise ValueError(
            "A valid ReceiverBand must be specified for a MID CSP Configuration"
        )
    interface_url = CSP_SCHEMA_2_0_URL if is_mid else CSP_SCHEMA_LOW_3_2_URL

    # Each sub-converter returns None for a section that is absent in the PDM.
    common_configuration = convert_commonconfiguration(
        pdm_config.common, pdm_config.config_id, receiver_band
    )
    subarray_config = convert_subarrayconfiguration(pdm_config.subarray)
    cbf_configuration = convert_cbfconfiguration(pdm_config.cbf)
    lowcbf_configuration = convert_lowcbfconfiguration(
        pdm_config.lowcbf, pdm_mccs_allocation, target
    )
    pss_config = convert_pss_config(pdm_config.pss)
    # convert_pst_config reads the lowcbf section, so it receives the whole
    # CSP configuration rather than a PST sub-section.
    pst_config = convert_pst_config(pdm_config, target)

    return cdm.CSPConfiguration(
        interface=interface_url,
        subarray=subarray_config,
        common=common_configuration,
        cbf_config=cbf_configuration,
        lowcbf=lowcbf_configuration,
        pst_config=pst_config,
        pss_config=pss_config,
    )
# Pulsar Search
def convert_pss_config(
    pdm_val: Optional[pdm_csp.PSSConfiguration],
) -> Optional[cdm.PSSConfiguration]:
    """
    Convert a PDM PSS configuration to its CDM equivalent.

    No PSS conversion exists yet: the only input that is accepted is None,
    which maps to None. Anything else is rejected.

    :param pdm_val: PDM PSS configuration to convert
    :return: CDM PSS configuration (always None at present)
    :raises NotImplementedError: if a PSS configuration is supplied
    """
    if pdm_val is None:
        return None
    raise NotImplementedError("Cannot convert PSS configurations")
# Factory for CDM PSTScanConfiguration objects with most keyword arguments
# pre-bound. convert_pst_config supplies the remaining per-beam values
# (source, coordinates, total_bandwidth, centre_frequency) at call time.
# NOTE(review): the bound values (fixed activation time, observer "jdoe",
# "project1", fixed receptor list, ...) look like placeholders rather than
# values derived from the SBDefinition - confirm before relying on them.
PSTScanConfigurationFactory = partial(
    PSTScanConfiguration,
    activation_time="2024-02-15T23:07:45Z",
    bits_per_sample=32,
    num_of_polarizations=2,
    udp_nsamp=32,
    wt_nsamp=32,
    udp_nchan=24,
    num_frequency_channels=20736,
    observation_mode="VOLTAGE_RECORDER",
    observer_id="jdoe",
    project_id="project1",
    pointing_id="pointing1",
    itrf=[5109360.133, 2006852.586, -3238948.127],
    receiver_id="LOW",
    feed_polarization="LIN",
    feed_handedness=1,
    feed_angle=0.0,
    feed_tracking_mode="FA",
    feed_position_angle=0.0,
    oversampling_ratio=[4, 3],
    max_scan_length=10000.0,
    subint_duration=30.0,
    # One weight per receptor: both lists have six entries.
    receptors=["S8-1", "S8-6", "S9-2", "S9-5", "S10-3", "S10-6"],
    receptor_weights=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
    # Two stages are declared and two stage descriptions follow.
    num_channelization_stages=2,
    channelization_stages=[
        {
            "num_filter_taps": 1,
            "filter_coefficients": [1.0],
            "num_frequency_channels": 1024,
            "oversampling_ratio": [32, 27],
        },
        {
            "num_filter_taps": 1,
            "filter_coefficients": [1.0],
            "num_frequency_channels": 256,
            "oversampling_ratio": [4, 3],
        },
    ],
)
# Pulsar Timing
def convert_pst_config(
    pdm_val: pdm_csp.CSPConfiguration,
    pdm_targets: Optional[pdm_sbd.Target],
) -> Optional[cdm.PSTConfiguration]:
    """
    Build the CDM PST configuration for a tied array observation.

    One PSTBeamConfiguration is created per PST beam defined on the target.
    Source name and coordinates come from the beam, bandwidth and centre
    frequency from the single correlation spectral window of the lowcbf
    section, and every other scan parameter from PSTScanConfigurationFactory.

    :param pdm_val: PDM CSP configuration; only its ``lowcbf`` section is read
    :param pdm_targets: PDM target carrying the tied array (PST) beams, or None
    :return: CDM PST configuration, or None when there is no lowcbf section,
        no target, or no PST beams on the target
    :raises ValueError: if more than one spectral window is defined, or if PST
        beams are defined but no spectral window is
    """
    if not pdm_val.lowcbf:
        return None
    spws = pdm_val.lowcbf.correlation_spws

    # NOTE(review): this check fires before we know whether any PST beams
    # exist, so multi-window configurations are rejected even without tied
    # array beams. Kept as-is to preserve behaviour - confirm it is intended.
    if len(spws) > 1:
        raise ValueError(
            "Multiple spectral windows cannot be defined for tied array observations"
        )

    # The target is optional for callers (convert_cspconfiguration defaults it
    # to None) and its tied array beams may be unset: neither case is a PST
    # observation, so there is nothing to configure.
    if pdm_targets is None or not pdm_targets.tied_array_beams:
        return None
    pst_beams = pdm_targets.tied_array_beams.pst_beams
    if not pst_beams:
        return None

    if not spws:
        raise ValueError(
            "A spectral window must be defined for tied array observations"
        )
    spw = spws[0]
    centre_frequency = spw.centre_frequency
    # NOTE(review): STATION_CHANNEL_WIDTH is documented in MHz, while
    # _calculate_logical_bands divides centre_frequency by 781.25e3 (suggesting
    # Hz) - confirm the units expected for total_bandwidth.
    total_bandwidth = spw.number_of_channels * STATION_CHANNEL_WIDTH

    pst_scan_configs = [
        PSTBeamConfiguration(
            beam_id=pst_beam.beam_id,
            scan=PSTScanConfigurationFactory(
                source=pst_beam.beam_coordinate.target_id,
                coordinates=PSTScanCoordinates(
                    ra=pst_beam.beam_coordinate.ra_str,
                    dec=pst_beam.beam_coordinate.dec_str,
                ),
                total_bandwidth=total_bandwidth,
                centre_frequency=centre_frequency,
            ),
        )
        for pst_beam in pst_beams
    ]
    return cdm.PSTConfiguration(beams=pst_scan_configs)
def convert_pstconfiguration_centralnode(
    pdm_targets: List[pdm_sbd.Target],
) -> Union[PSTConfiguration, None]:
    """
    Collect the PST beam IDs used by the given targets into a CDM
    PSTConfiguration for assigning resources.

    Targets without tied array beams are skipped and duplicate beam IDs are
    collapsed.

    :param pdm_targets: PDM targets to inspect
    :return: PSTConfiguration listing the distinct PST beam IDs, or None when
        there are no targets or none of them defines a PST beam
    """
    if not pdm_targets:
        return None

    unique_beam_ids = {
        pst_beam.beam_id
        for pdm_target in pdm_targets
        if pdm_target.tied_array_beams
        for pst_beam in pdm_target.tied_array_beams.pst_beams
        if pst_beam
    }
    if unique_beam_ids:
        return PSTConfiguration(
            pst_beam_ids=list(unique_beam_ids),
        )
    return None
def convert_subarrayconfiguration(
    pdm_val: Optional[pdm_csp.SubarrayConfiguration],
) -> Optional[cdm.SubarrayConfiguration]:
    """
    Map a PDM SubarrayConfiguration onto the CDM SubarrayConfiguration.

    Only the subarray name is carried across.

    :param pdm_val: PDM entity to convert, or None
    :return: equivalent CDM SubarrayConfiguration, or None if no input was given
    :raises TypeError: if pdm_val is neither None nor a PDM SubarrayConfiguration
    """
    if isinstance(pdm_val, pdm_csp.SubarrayConfiguration):
        return cdm.SubarrayConfiguration(subarray_name=pdm_val.subarray_name)
    if pdm_val is None:
        return None
    raise TypeError(f"Expected PDM SubarrayConfiguration, got {type(pdm_val)}")
# Correlator Beamformer
def convert_cbfconfiguration(
    pdm_val: Optional[pdm_csp.CBFConfiguration],
) -> Optional[cdm.CBFConfiguration]:
    """
    Map a PDM CBFConfiguration onto the CDM CBFConfiguration.

    Each FSP entry is converted with convert_fspconfiguration. VLBI settings
    are not supported, so the CDM vlbi_config is always None.

    :param pdm_val: PDM entity to convert, or None
    :return: equivalent CDM CBFConfiguration, or None if no input was given
    :raises TypeError: if pdm_val is neither None nor a PDM CBFConfiguration
    :raises NotImplementedError: if the PDM configuration carries VLBI settings
    """
    if pdm_val is None:
        return None

    wrong_type = not isinstance(pdm_val, pdm_csp.CBFConfiguration)
    if wrong_type:
        raise TypeError(f"Expected PDM CBFConfiguration, got {type(pdm_val)}")

    has_vlbi = pdm_val.vlbi is not None
    if has_vlbi:
        raise NotImplementedError("Cannot convert CBF configurations for VLBI")

    converted_fsps = []
    for pdm_fsp in pdm_val.fsps:
        converted_fsps.append(convert_fspconfiguration(pdm_fsp))
    return cdm.CBFConfiguration(fsp_configs=converted_fsps, vlbi_config=None)
def convert_lowcbfconfiguration(
    pdm_lowcbf: Optional[pdm_csp.LowCBFConfiguration],
    pdm_mccs: pdm_sbd.MCCSAllocation,
    pdm_target: Optional[pdm_sbd.Target] = None,
) -> Optional[cdm.LowCBFConfiguration]:
    """
    Assemble the CDM LowCBFConfiguration from the SBDefinition parts.

    The result bundles the visibility, station and timing-beam configurations,
    each produced by its own private converter.

    :param pdm_lowcbf: PDM LowCBFConfiguration to convert
    :param pdm_mccs: PDM MCCSAllocation to be used to convert the lowCBF configuration
    :param pdm_target: PDM Target to be used to convert the lowCBF configuration
    :return: equivalent CDM LowCBFConfiguration, or None if pdm_lowcbf is None
    :raises TypeError: if pdm_lowcbf is neither None nor a PDM LowCBFConfiguration
    """
    if pdm_lowcbf is None:
        return None
    if not isinstance(pdm_lowcbf, pdm_csp.LowCBFConfiguration):
        raise TypeError(f"Expected PDM LowCBFConfiguration, got {type(pdm_lowcbf)}")

    # The scan definition in the PDM carries no subarray beam information; it
    # lives only in the MCCS allocation. With a single subarray beam (AA0.5 /
    # AA1) the scan is implicitly tied to the first - and only - beam.
    first_beam = pdm_mccs.subarray_beams[0]
    beam_id = first_beam.subarray_beam_id

    vis_config = _convert_vis_configuration(pdm_lowcbf, pdm_mccs, beam_id)
    station_config = _convert_station_configuration(pdm_lowcbf, pdm_mccs, beam_id)
    timing_beams_config = _convert_timing_beams_configuration(
        pdm_target=pdm_target, pdm_mccs=pdm_mccs, subarray_beam_id=beam_id
    )
    return cdm.LowCBFConfiguration(
        vis=vis_config,
        stations=station_config,
        timing_beams=timing_beams_config,
    )
# Frequency Slice Processor
def convert_fspconfiguration(
    pdm_val: pdm_csp.FSPConfiguration,
) -> cdm.FSPConfiguration:
    """
    Map a PDM FSPConfiguration onto the CDM FSPConfiguration.

    Scalar fields are passed through unchanged; the function mode is looked up
    in the CDM enum by the PDM enum's value, and the two map fields are
    deep-copied so the CDM object does not share them with the PDM object.

    :param pdm_val: PDM FSPConfiguration to convert
    :return: equivalent CDM FSPConfiguration
    :raises TypeError: if pdm_val is not a PDM FSPConfiguration
    """
    if not isinstance(pdm_val, pdm_csp.FSPConfiguration):
        raise TypeError(f"Expected PDM FSPConfiguration, got {type(pdm_val)}")

    cdm_function_mode = cdm.FSPFunctionMode[pdm_val.function_mode.value]
    averaging_map = copy.deepcopy(pdm_val.channel_averaging_map)
    link_map = copy.deepcopy(pdm_val.output_link_map)

    return cdm.FSPConfiguration(
        fsp_id=pdm_val.fsp_id,
        function_mode=cdm_function_mode,
        frequency_slice_id=pdm_val.frequency_slice_id,
        integration_factor=pdm_val.integration_factor,
        zoom_factor=pdm_val.zoom_factor,
        channel_averaging_map=averaging_map,
        output_link_map=link_map,
        channel_offset=pdm_val.channel_offset,
        zoom_window_tuning=pdm_val.zoom_window_tuning,
    )
def convert_commonconfiguration(
    pdm_val: Optional[pdm_csp.CommonConfiguration],
    csp_id: pdm_csp.CSPConfigurationID,
    receiver_band: Optional[Union[ReceiverBand, cdm.core.ReceiverBand]],
) -> Optional[cdm.CommonConfiguration]:
    """
    Map a PDM CommonConfiguration onto the CDM CommonConfiguration.

    Without a receiver band only the configuration ID is set. With one, the
    frequency band, subarray ID and band 5 tuning are set as well. The receiver
    band may be given in either its PDM or CDM form.

    :param pdm_val: PDM entity to convert, or None
    :param csp_id: CSP configuration ID
    :param receiver_band: receiver band to set for this configuration
    :return: equivalent CDM CommonConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is neither None nor a PDM CommonConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm_csp.CommonConfiguration):
        raise TypeError(f"Expected PDM CommonConfiguration, got {type(pdm_val)}")

    if receiver_band is None:
        return cdm.CommonConfiguration(
            config_id=convert_csp_id(csp_id),
        )

    # A CDM receiver band is used as-is; a PDM one has to be translated first.
    already_cdm = isinstance(receiver_band, cdm.core.ReceiverBand)
    frequency_band = (
        receiver_band if already_cdm else convert_frequency_band(receiver_band)
    )

    # PDM always carries band_5_tuning whereas CDM treats it as optional, so an
    # empty PDM value is dropped to keep the resulting JSON compact.
    if len(pdm_val.band_5_tuning) > 0:
        band_5_tuning = pdm_val.band_5_tuning
    else:
        band_5_tuning = None

    return cdm.CommonConfiguration(
        config_id=convert_csp_id(csp_id),
        frequency_band=frequency_band,
        subarray_id=pdm_val.subarray_id,
        band_5_tuning=band_5_tuning,
    )
def _calculate_n_fsp(number_of_channels: int, n_apertures: int, mode: str) -> int:
"""
Calculate the number of fsp IDs
:param number_of_channels: number of channels
:param n_apertures: number of apertures
:param mode: firmware mode
:return: calculated n_fsp for vis or pst
"""
if mode == "vis":
factor = 1.0
elif mode == "pst":
factor = 3.0
else:
raise NotImplementedError(
f"fsp mode {mode} is not supported. Only 'vis' and 'pst' are supported at this time"
)
return int(math.ceil((number_of_channels * n_apertures) / (1024 / factor)))
def _convert_vis_configuration(
    pdm_lowcbf: Optional[pdm_csp.LowCBFConfiguration],
    pdm_mccs: Optional[pdm_sbd.MCCSAllocation],
    subarray_beam_id: int,
) -> Optional[cdm.VisConfiguration]:
    """
    Converts the relevant SBDefinition parts to a CDM VisConfiguration

    :param pdm_lowcbf: PDM LowCBFConfiguration holding the correlation spectral windows
    :param pdm_mccs: PDM MCCSAllocation used to derive the FSP IDs; when it is
        missing, the logical FSP IDs of the non-zoom spectral windows are used
    :param subarray_beam_id: station beam ID assigned to every visibility beam entry
    :return: CDM VisConfiguration, or None if pdm_lowcbf is None
    :raises TypeError: if pdm_lowcbf is neither None nor a PDM LowCBFConfiguration
    """
    if pdm_lowcbf is None:
        return None
    if not isinstance(pdm_lowcbf, pdm_csp.LowCBFConfiguration):
        # The check is against LowCBFConfiguration, so that is the type the
        # message must name.
        raise TypeError(f"Expected PDM LowCBFConfiguration, got {type(pdm_lowcbf)}")

    if pdm_mccs and isinstance(pdm_mccs, pdm_sbd.MCCSAllocation):
        # NOTE(review): each entry is the value returned by _calculate_n_fsp
        # (an FSP count) for one subarray beam, used here as an FSP ID -
        # confirm this is what the consumer of fsp_ids expects.
        fsp_ids = [
            _calculate_n_fsp(
                subarray_beam.number_of_channels, len(subarray_beam.apertures), "vis"
            )
            for subarray_beam in pdm_mccs.subarray_beams
        ]
    else:
        fsp_ids = [
            logical_fsp_id
            for correlation_spw in pdm_lowcbf.correlation_spws
            if correlation_spw.zoom_factor == 0
            for logical_fsp_id in correlation_spw.logical_fsp_ids
        ]

    # For lowcbf.vis firmware can be hardcoded as vis
    fsp = cdm.VisFspConfiguration(firmware="vis", fsp_ids=fsp_ids)

    # Dividing by a one-millisecond timedelta turns the integration time into
    # a plain number of milliseconds.
    one_ms = timedelta(milliseconds=1.0)
    stn_beams = [
        cdm.VisStnBeamConfiguration(
            stn_beam_id=subarray_beam_id,
            integration_ms=correlation_spw.integration_time_ms / one_ms,
        )
        for correlation_spw in pdm_lowcbf.correlation_spws
    ]
    return cdm.VisConfiguration(fsp=fsp, stn_beams=stn_beams)
def _convert_station_configuration(
    pdm_lowcbf: Optional[pdm_csp.LowCBFConfiguration],
    pdm_mccs: pdm_sbd.MCCSAllocation,
    subarray_beam_id: int,
) -> Optional[cdm.StationConfiguration]:
    """
    Build the CDM StationConfiguration from the SBDefinition parts.

    Stations are the [station_id, substation_id] pairs of the first subarray
    beam's apertures. The single station beam lists every channel covered by
    the logical bands of the LowCBF configuration.

    :param pdm_lowcbf: PDM LowCBFConfiguration used to derive the logical bands
    :param pdm_mccs: PDM MCCSAllocation providing the apertures
    :param subarray_beam_id: ID given to the station beam
    :return: CDM StationConfiguration
    """
    # Currently only one subarray_beam is supported
    first_beam = pdm_mccs.subarray_beams[0]
    stns = []
    for aperture in first_beam.apertures:
        stns.append([aperture.station_id, aperture.substation_id])

    # Expand each logical band into its consecutive channel numbers.
    freq_ids = []
    for band in _calculate_logical_bands(pdm_lowcbf):
        first_channel = band.start_channel
        freq_ids.extend(range(first_channel, band.start_channel + band.number_of_channels))

    station_beam = cdm.StnBeamConfiguration(beam_id=subarray_beam_id, freq_ids=freq_ids)
    return cdm.StationConfiguration(stns=stns, stn_beams=[station_beam])
def _convert_timing_beams_configuration(
    pdm_target: Optional[pdm_sbd.Target] = None,
    pdm_mccs: Optional[pdm_sbd.MCCSAllocation] = None,
    subarray_beam_id: int = 1,
) -> Optional[cdm.TimingBeamsConfiguration]:
    """
    Converts the relevant parts of the SBDefinition to a CDM TimingBeamsConfiguration

    :param pdm_target: PDM Target carrying the tied array (PST) beams
    :param pdm_mccs: PDM MCCSAllocation used to derive the FSP IDs; when it is
        missing, fsp_ids falls back to [1]
    :param subarray_beam_id: station beam ID assigned to every PST beam entry
    :return: CDM TimingBeamsConfiguration, or None when there is no target,
        no tied array beams on it, or no PST beams among them
    """
    # Guard tied_array_beams itself as well as its pst_beams, matching the
    # check done in convert_pstconfiguration_centralnode.
    if not pdm_target or not pdm_target.tied_array_beams:
        return None
    pst_beams = pdm_target.tied_array_beams.pst_beams
    if not pst_beams:
        return None

    if pdm_mccs and isinstance(pdm_mccs, pdm_sbd.MCCSAllocation):
        fsp_ids = [
            _calculate_n_fsp(
                subarray_beam.number_of_channels, len(subarray_beam.apertures), "pst"
            )
            for subarray_beam in pdm_mccs.subarray_beams
        ]
    else:
        fsp_ids = [1]
    fsp = cdm.VisFspConfiguration(firmware="pst", fsp_ids=fsp_ids)

    beams_configurations = [
        cdm.BeamsConfiguration(
            pst_beam_id=beam.beam_id,
            stn_beam_id=subarray_beam_id,
            stn_weights=beam.stn_weights,
            field=convert_target(beam),
        )
        for beam in pst_beams
    ]
    return cdm.TimingBeamsConfiguration(fsp=fsp, beams=beams_configurations)
def _calculate_logical_bands(
    pdm_lowcbf: Optional[pdm_csp.LowCBFConfiguration],
) -> list[SubarrayBeamLogicalBands]:
    """
    Derive one logical band per non-zoom correlation spectral window.

    The start channel is computed from the window's centre frequency divided
    by 781.25e3, shifted down by half the number of channels.

    :param pdm_lowcbf: PDM LowCBFConfiguration holding the correlation spectral windows
    :return: logical bands, in spectral window order
    """
    logical_bands = []
    for spw in pdm_lowcbf.correlation_spws:
        if spw.zoom_factor != 0:
            continue
        # The result is expected to be a whole number already (to be validated
        # when the SBDefinition is created); int() only fixes the Python type.
        first_channel = int(
            spw.centre_frequency / 781.25e3 - spw.number_of_channels / 2 + 0.5
        )
        logical_bands.append(
            SubarrayBeamLogicalBands(
                start_channel=first_channel,
                number_of_channels=spw.number_of_channels,
            )
        )
    return logical_bands