"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import copy
import logging
import math
import os
from datetime import timedelta
from functools import partial
from itertools import accumulate, chain
from typing import Final, Iterable, List, Optional, Union
import ska_oso_pdm.sb_definition as pdm_sbd
import ska_tmc_cdm.messages.subarray_node.configure.csp as cdm
from astropy import units as u
from ska_oso_pdm import TelescopeType
from ska_oso_pdm.sb_definition import (
EquatorialCoordinatesPST,
GalacticCoordinates,
ICRSCoordinates,
)
from ska_oso_pdm.sb_definition.csp.midcbf import CorrelationSPWConfiguration
from ska_tmc_cdm.messages.central_node.csp import PSTConfiguration
from ska_tmc_cdm.messages.subarray_node.configure.mccs import SubarrayBeamLogicalBands
from ska_tmc_cdm.messages.subarray_node.configure.pst import (
PSTBeamConfiguration,
PSTScanConfiguration,
PSTScanCoordinates,
)
from ska_oso_scripting import WORKAROUNDS
from ska_oso_scripting.functions.pdm_transforms.common import (
convert_target,
create_resources,
)
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"
# NOTE(review): basicConfig is invoked at import time, so merely importing
# this module can configure root logging (INFO level, FORMAT above) -
# confirm this side effect is intended for a library module.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally
__all__ = [
    "convert_cspconfiguration",
    "_calculate_logical_bands",
    "LOW_CHANNEL_WIDTH",
    "MID_CHANNEL_WIDTH",
]
# ska-telmodel schemas compatible with this data model. These URLs are used
# as the 'interface' value of the CDM CSPConfiguration built by
# convert_cspconfiguration.
CSP_LOW_SCHEMA_URL = "https://schema.skao.int/ska-low-csp-configure/3.2"
CSP_MID_SCHEMA_URL = "https://schema.skao.int/ska-csp-configurescan/4.1"
# Channels that are used to observe PST ('station' or 'SPS' channels) have a width of 0.78125 MHz.
LOW_CHANNEL_WIDTH = u.Quantity(781.25, u.kHz)
# Base (zoom factor 0) correlator channel width used for SKA-Mid processing
# regions: 13.44 kHz. Zoom modes divide this by 2**zoom_factor.
MID_CHANNEL_WIDTH = u.Quantity(13440.0, u.Hz)
# TODO: The constants below are prime candidates for inclusion in OSD.
# Number of FSPs assumed to exist; convert_midcbfconfiguration treats FSP IDs
# 1..NUM_FSPS (1-based) as available to the subarray.
NUM_FSPS = 26
class ExternalConstants:
    """
    CSP constants taken directly from CSP source code. We copy them here as we
    don't want to introduce CSP control system code as a dependency but still
    need access to their values.

    Within this module the constants are used by _calculate_fsps_required to
    work out how many frequency slices a spectral window spans.
    """

    # Common sample rate for all receptor data streams, achieved after
    # Resampling & Delay Tracking (RDT) [Hz]; applies for all function
    # modes except VLBI
    COMMON_SAMPLE_RATE: Final[int] = 220200960
    # Ratio by which the sample rate exceeds the usable frequency slice
    # bandwidth (FS_BW below is COMMON_SAMPLE_RATE divided by this factor).
    VCC_OVERSAMPLING_FACTOR: Final[float] = 10 / 9
    # Frequency Slice Bandwidth [Hz]
    # Computed with float division and then truncated by int(); nominally
    # 198 180 864 Hz. Keep the expression as-is so it matches the CSP code.
    FS_BW: Final[int] = int(COMMON_SAMPLE_RATE / VCC_OVERSAMPLING_FACTOR)
    # Half of the frequency slice bandwidth [Hz], using integer division.
    HALF_FS_BW: Final[int] = FS_BW // 2
def convert_csp_id(pdm_val: Optional[str]) -> Optional[str]:
    """
    Translate a PDM CSP identifier into its CDM counterpart.

    Today both sides use plain strings, so the conversion is a simple
    ``str()`` coercion; a missing identifier stays missing.

    :param pdm_val: PDM CSP ID to convert, or None
    :return: CDM CSP ID, or None if no ID was given
    """
    return None if pdm_val is None else str(pdm_val)
[docs]
def convert_cspconfiguration(
telescope: TelescopeType,
pdm_config: pdm_sbd.csp.CSPConfiguration,
pdm_mccs_allocation: Union[pdm_sbd.MCCSAllocation, None] = None,
target: Optional[pdm_sbd.Target] = None,
) -> cdm.CSPConfiguration:
"""
Converts the relevant parts of the SBDefinition into a CDM CSPConfiguration.
:param telescope: telescope type, either SKA_MID or SKA_LOW
:param pdm_config: The PDM CSPConfiguration to convert
:param pdm_mccs_allocation: The PDM MCCSAllocation that is also required for the conversion
:param target: The PDM target that is used for tied array observations (support for LOW only)
:return: the equivalent CDM configuration
"""
if not isinstance(pdm_config, pdm_sbd.csp.CSPConfiguration):
raise TypeError(f"Expected PDM CSPConfiguration, got {type(pdm_config)}")
if pdm_config.lowcbf is not None and pdm_mccs_allocation is None:
raise ValueError(
"A valid PDM MCCSAllocation must be specified for a LOW CSP Configuration"
)
if pdm_config.midcbf is not None and pdm_mccs_allocation is not None:
raise ValueError("Cannot specify an MCCSAllocation for a MID CSP Configuration")
match telescope:
# TMC-Mid behaviour when LOW-CBF config is specified is undefined, as
# is TMC-Low behaviour when MID-CBF config is given. So, be sure to
# only provide the CBF configuration that matches the telescope.
case TelescopeType.SKA_MID:
interface_url = CSP_MID_SCHEMA_URL
midcbf_configuration = convert_midcbfconfiguration(pdm_config.midcbf)
lowcbf_configuration = None
case TelescopeType.SKA_LOW:
interface_url = CSP_LOW_SCHEMA_URL
midcbf_configuration = None
lowcbf_configuration = convert_lowcbfconfiguration(
pdm_config.lowcbf, pdm_mccs_allocation, target
)
case _:
raise ValueError(f"Telescope type not recognised: {telescope}")
common_configuration = convert_commonconfiguration(pdm_config)
pss_config = create_pss_config(pdm_config.pss)
pst_config = create_pst_config(pdm_config, target, pdm_mccs_allocation)
return cdm.CSPConfiguration(
interface=interface_url,
common=common_configuration,
midcbf=midcbf_configuration,
lowcbf=lowcbf_configuration,
pst_config=pst_config,
pss_config=pss_config,
)
# Pulsar Search
def create_pss_config(
    pdm_val: Optional[pdm_sbd.csp.PSSConfiguration],
) -> Optional[cdm.PSSConfiguration]:
    """
    Translate a PDM pulsar search (PSS) configuration to the CDM form.

    No PSS configuration has been defined yet, so the only input that can be
    converted is "nothing": any actual configuration is rejected.

    :param pdm_val: PDM PSS configuration to convert, or None
    :return: always None (the CDM equivalent of an absent configuration)
    :raises NotImplementedError: if a PSS configuration is supplied
    """
    if pdm_val is None:
        return None
    raise NotImplementedError("Cannot convert PSS configurations")
# Partially-applied PSTScanConfiguration constructor with every fixed field
# pre-populated. create_pst_config calls it and supplies the per-scan values:
# source, coordinates, receptors, receptor_weights, total_bandwidth and
# centre_frequency.
# NOTE(review): several values below (activation_time, observer_id,
# project_id, pointing_id, itrf) look like hard-coded placeholders rather
# than values derived from the SBDefinition - confirm before relying on them.
PSTScanConfigurationFactory = partial(
    PSTScanConfiguration,
    activation_time="2024-02-15T23:07:45Z",
    bits_per_sample=32,
    num_of_polarizations=2,
    udp_nsamp=32,
    wt_nsamp=32,
    udp_nchan=24,
    num_frequency_channels=20736,
    observation_mode="VOLTAGE_RECORDER",
    observer_id="jdoe",
    project_id="project1",
    pointing_id="pointing1",
    itrf=[5109360.133, 2006852.586, -3238948.127],
    receiver_id="LOW",
    feed_polarization="LIN",
    feed_handedness=1,
    feed_angle=0.0,
    feed_tracking_mode="FA",
    feed_position_angle=0.0,
    oversampling_ratio=[4, 3],
    max_scan_length=10000.0,
    subint_duration=30.0,
    # Two stages are listed below, matching num_channelization_stages.
    num_channelization_stages=2,
    channelization_stages=[
        {
            "num_filter_taps": 1,
            "filter_coefficients": [1.0],
            "num_frequency_channels": 1024,
            "oversampling_ratio": [32, 27],
        },
        {
            "num_filter_taps": 1,
            "filter_coefficients": [1.0],
            "num_frequency_channels": 256,
            "oversampling_ratio": [4, 3],
        },
    ],
)
def convert_midcbfconfiguration(
    pdm_val: pdm_sbd.csp.MidCBFConfiguration,
) -> Optional[cdm.MidCBFConfiguration]:
    """
    Build the CDM MidCBFConfiguration equivalent to a PDM MidCBFConfiguration.

    Every correlation spectral window of every subband becomes one CDM
    processing region. FSPs are handed out to the regions in turn from a
    pool of FSP IDs 1..NUM_FSPS, and SDP start channel IDs are assigned so
    that each region starts where the previous one ended.

    :param pdm_val: PDM entity to convert, or None
    :return: equivalent CDM MidCBFConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM MidCBFConfiguration
    """
    if pdm_val is None:
        return None
    if not isinstance(pdm_val, pdm_sbd.csp.MidCBFConfiguration):
        raise TypeError(f"Expected PDM MidCBFConfiguration, got {type(pdm_val)}")

    # Pair each correlation spw with the wideband frequency shift of the
    # subband it belongs to; the shift applies to all spws in that subband.
    spw_shift_pairs = []
    for subband in pdm_val.subbands:
        for spw in subband.correlation_spws:
            spw_shift_pairs.append((spw, subband.frequency_slice_offset.to_value(u.Hz)))

    # Exclusive running total of channel counts: region k starts at the sum
    # of the channel counts of regions 0..k-1.
    channel_counts = [spw.number_of_channels for spw, _ in spw_shift_pairs]
    sdp_start_channel_ids = [0, *accumulate(channel_counts)][: len(channel_counts)]

    # We should not configure arbitrary FSPs, but only those allocated to this
    # subarray. This information of available/allocated FSPs should come from
    # TMC CentralNode or the OSO Scheduler, probably passed in as a script
    # argument. For AA0.5 and single subarrays, it is sufficient to pretend we
    # have access to all FSPs.
    #
    # FSP IDs are 1-based
    unassigned_fsps = frozenset(range(1, NUM_FSPS + 1))

    processing_regions = []
    for (spw, shift_hz), start_channel in zip(spw_shift_pairs, sdp_start_channel_ids):
        region = convert_correlation_spw_to_processing_region(
            spw, start_channel, unassigned_fsps, shift_hz
        )
        processing_regions.append(region)
        # FSPs claimed by this region are no longer on offer to later ones
        unassigned_fsps = unassigned_fsps.difference(region.fsp_ids)

    subband_offsets_hz = [
        subband.frequency_slice_offset.to_value(u.Hz) for subband in pdm_val.subbands
    ]

    if WORKAROUNDS.exclude_frequency_band_offsets:
        # Workaround active: leave the band offset fields out entirely.
        return cdm.MidCBFConfiguration(
            frequency_band=pdm_val.frequency_band,
            correlation=cdm.CorrelationConfiguration(
                processing_regions=processing_regions
            ),
        )

    correlation = cdm.CorrelationConfiguration(processing_regions=processing_regions)
    # assume frequency_band_offset_stream1 always present
    stream1_offset = int(subband_offsets_hz[0])
    if len(subband_offsets_hz) > 1:
        stream2_offset = int(subband_offsets_hz[1])
    else:
        stream2_offset = None
    return cdm.MidCBFConfiguration(
        frequency_band=pdm_val.frequency_band,
        correlation=correlation,
        frequency_band_offset_stream1=stream1_offset,
        frequency_band_offset_stream2=stream2_offset,
    )
def convert_correlation_spw_to_processing_region(
    pdm_val: CorrelationSPWConfiguration,
    sdp_start_channel_id: int,
    available_fsps: list[int],
    wideband_offset: float,
) -> cdm.ProcessingRegionConfiguration:
    """
    Build the CDM ProcessingRegionConfiguration for one PDM correlation
    spectral window.

    NotImplementedError will be raised if zoom mode is requested, due to
    limitations in the algorithm that calculates the number of FSPs required.

    :param pdm_val: PDM entity to convert
    :param sdp_start_channel_id: SDP start channel ID
    :param available_fsps: collection of FSP IDs available for configuration
    :param wideband_offset: wideband frequency shift in Hz
    :return: equivalent CDM ProcessingRegionConfiguration
    :raises TypeError: if pdm_val is not a PDM CorrelationSPWConfiguration
    :raises ValueError: if fewer FSPs are available than the window requires
    """
    if not isinstance(pdm_val, CorrelationSPWConfiguration):
        raise TypeError(
            f"Expected PDM CorrelationSPWConfiguration, got {type(pdm_val)}"
        )

    channel_width = int(MID_CHANNEL_WIDTH.to(u.Hz).value) / 2**pdm_val.zoom_factor

    # The region's start frequency is the *centre* frequency of its first
    # fine channel, i.e. half of (N - 1) channel widths below the spw centre.
    first_channel_centre = (
        pdm_val.centre_frequency - channel_width * (pdm_val.number_of_channels - 1) / 2
    )
    # The CSP schema requires an *integer* start frequency but does not say
    # how to get there (floor, round, ...). We use Python's round(), which
    # picks the nearest integer and sends exact .5 ties to the even integer.
    start_freq = round(first_channel_centre)

    # The SBD has an attribute for logical FSP IDs. However, at the time of
    # writing it is preferred that we ignore those values and calculate the
    # number required again in scripting via a call to
    # _calculate_fsps_required
    #
    # See https://jira.skatelescope.org/browse/BTN-2551?focusedId=371497&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-371497
    #
    fsps_needed = _calculate_fsps_required(
        centre_freq=pdm_val.centre_frequency,
        num_channels=pdm_val.number_of_channels,
        zoom_factor=pdm_val.zoom_factor,
        wideband_offset=wideband_offset,
    )

    # claim that many FSPs from the pool of FSPs still on offer
    claimed_fsp_ids = _consume_fsps(fsps_needed, available_fsps)

    return cdm.ProcessingRegionConfiguration(
        fsp_ids=claimed_fsp_ids,
        receptors=pdm_val.receptors,
        start_freq=start_freq,
        channel_width=channel_width,
        channel_count=pdm_val.number_of_channels,
        integration_factor=pdm_val.time_integration_factor,
        sdp_start_channel_id=sdp_start_channel_id,
    )
def _consume_fsps(num_fsps: int, available_fsp_ids: Iterable[int]) -> list[int]:
"""
Take/consume a number of FSP IDs from a list of available FSP IDs.
This function will consume FSPs in numerical order, so if the list of
available FSP IDs is [5,7,1,9] and 2 FSPs are requested, FSPs 1 and 5
will be selected.
The input argument is not mutated.
@param num_fsps: number of FSP IDs to consume
@param available_fsp_ids: iterable of FSP IDs available for configuration
@raises ValueError: if insufficient FSP IDs are available
"""
if len(available_fsp_ids) < num_fsps:
raise ValueError(
f"Not enough FSPs available: {num_fsps} requested, {len(available_fsp_ids)} available"
)
consumed = sorted(available_fsp_ids)[:num_fsps]
return consumed
def _calculate_fsps_required(
centre_freq: float, num_channels: int, zoom_factor: int, wideband_offset: float
) -> int:
"""
Calculate the number of SKA-Mid FSPs required to observe a non-zoom mode
correlation spectral window.
@param centre_freq: correlator spw centre frequency in Hz
@param num_channels: number of channels in correlated spectral window
@param zoom_factor: zoom factor
@param wideband_offset: frequency slicer start frequency in Hz
@raises NotImplementedError if zoom_factor != 0
"""
if zoom_factor != 0:
raise NotImplementedError(
"Algorithm for calculating number of required FSPs only works for non-zoom mode"
)
channel_bw = int(MID_CHANNEL_WIDTH.to(u.Hz).value) / 2**zoom_factor
half_spw_bw = channel_bw * num_channels / 2
spw_min = centre_freq - half_spw_bw
spw_max = centre_freq + half_spw_bw
# The following is taken directly from CSP code, which uses this algorithm
# to calculate the number of coarse channels required and compares that it
# equals the number of FSPs requested. So, if we likewise calculate how
# many coarse channels are expected, we know how many FSPs are required.
#
# See ska_mid_cbf_mcs.commons.global_enum.get_coarse_channels()
coarse_channel_low = math.floor(
(spw_min - wideband_offset + ExternalConstants.HALF_FS_BW)
/ ExternalConstants.FS_BW
)
coarse_channel_high = math.floor(
(spw_max - wideband_offset + ExternalConstants.HALF_FS_BW)
/ ExternalConstants.FS_BW
)
return coarse_channel_high - coarse_channel_low + 1
# Pulsar Timing
def create_pst_config(
pdm_val: pdm_sbd.csp.CSPConfiguration,
pdm_targets: pdm_sbd.Target,
pdm_mccs: Optional[pdm_sbd.MCCSAllocation] = None,
) -> Optional[cdm.PSTConfiguration]:
"""
creates a PST configuration from the CSP Configuration and Target if PST is requested,
otherwise returns None
:param pdm_val: PDM CSP configuration
:param pdm_targets: PDM Targets that include tied array PST targets
:return: CDM PST configuration
"""
if not pdm_val.lowcbf:
return None
elif pdm_val.lowcbf.do_pst and not pdm_targets.tied_array_beams.pst_beams:
raise ValueError("Expected PDM Target with PST TiedArrayBeam information")
spws = pdm_val.lowcbf.correlation_spws
if pdm_targets.tied_array_beams:
pst_beams = pdm_targets.tied_array_beams.pst_beams
else:
return None
if len(spws) > 1 and len(pst_beams) > 0:
raise ValueError(
"Multiple spectral windows cannot be defined for tied array observations"
)
centre_frequency = spws[0].centre_frequency
total_bandwidth = spws[0].number_of_channels * LOW_CHANNEL_WIDTH.to(u.Hz).value
# this next bit is not pretty - and assumes that all PST Beams with be of the same CoordinateKind
if not pdm_targets.tied_array_beams.pst_beams:
return None
else:
receptors = create_resources(allocation=pdm_mccs)
receptor_weights = [1.0] * len(receptors) if receptors else []
match pdm_targets.tied_array_beams.pst_beams[0].beam_coordinate:
case ICRSCoordinates():
pst_scan_configs = [
PSTBeamConfiguration(
beam_id=pst_beam.beam_id,
scan=PSTScanConfigurationFactory(
source=pst_beam.beam_name,
coordinates=PSTScanCoordinates(
ra=pst_beam.beam_coordinate.ra_str,
dec=pst_beam.beam_coordinate.dec_str,
),
receptors=receptors,
receptor_weights=receptor_weights,
total_bandwidth=total_bandwidth,
centre_frequency=centre_frequency,
),
)
for pst_beam in pdm_targets.tied_array_beams.pst_beams
]
case EquatorialCoordinatesPST():
pst_scan_configs = [
PSTBeamConfiguration(
beam_id=pst_beam.beam_id,
scan=PSTScanConfigurationFactory(
source=pst_beam.beam_coordinate.target_id,
coordinates=PSTScanCoordinates(
ra=pst_beam.beam_coordinate.ra_str,
dec=pst_beam.beam_coordinate.dec_str,
),
receptors=receptors,
receptor_weights=receptor_weights,
total_bandwidth=total_bandwidth,
centre_frequency=centre_frequency,
),
)
for pst_beam in pdm_targets.tied_array_beams.pst_beams
]
case GalacticCoordinates():
pst_scan_configs = []
for pst_beam in pdm_targets.tied_array_beams.pst_beams:
ra_str = pst_beam.beam_coordinate.to_sky_coord().icrs.ra.to_string(
u.hour, pad=True, sep=":"
)
dec_str = (
pst_beam.beam_coordinate.to_sky_coord().icrs.dec.to_string(
pad=True, sep=":"
)
)
pst_scan_configs.append(
PSTBeamConfiguration(
beam_id=pst_beam.beam_id,
scan=PSTScanConfigurationFactory(
source=pst_beam.beam_name,
coordinates=PSTScanCoordinates(
ra=ra_str,
dec=dec_str,
),
receptors=receptors,
receptor_weights=receptor_weights,
total_bandwidth=total_bandwidth,
centre_frequency=centre_frequency,
),
)
)
case _:
raise ValueError(
f"{pdm_targets.tied_array_beams.pst_beams[0].beam_coordinate} is not supported at this time"
)
# Return None if no beams were created
return cdm.PSTConfiguration(beams=pst_scan_configs) if pst_scan_configs else None
def convert_pstconfiguration_centralnode(
    pdm_targets: List[pdm_sbd.Target],
) -> Optional[PSTConfiguration]:
    """
    Converts the relevant parts of the SBDefinition to a CDM PSTConfiguration
    for assigning resources, if applicable.

    The IDs of all PST beams across all targets are collected, with
    duplicates removed, and returned in ascending order so that the output
    is deterministic.

    :param pdm_targets: PDM Targets to scan for tied array PST beams
    :return: CDM PSTConfiguration listing the PST beam IDs, or None if no
        target defines a PST beam
    """
    if not pdm_targets:
        return None

    pst_beam_ids = set()
    for target in pdm_targets:
        tied_array_beams = target.tied_array_beams
        if not tied_array_beams:
            continue
        # tolerate a missing beam list as well as missing entries within it
        pst_beam_ids.update(
            beam.beam_id for beam in (tied_array_beams.pst_beams or []) if beam
        )

    if not pst_beam_ids:
        return None

    return PSTConfiguration(
        pst_beam_ids=sorted(pst_beam_ids),
    )
# Correlator Beamformer
def convert_cbfconfiguration(
    pdm_val: Optional[pdm_sbd.csp.CBFConfiguration],
) -> Optional[cdm.CBFConfiguration]:
    """
    Build the CDM CBFConfiguration equivalent to a PDM CBFConfiguration.

    Each PDM FSP configuration is converted individually; VLBI is not
    supported and the CDM vlbi_config is always None.

    :param pdm_val: PDM entity to convert, or None
    :return: equivalent CDM CBFConfiguration, or None if pdm_val is None
    :raises TypeError: if pdm_val is not a PDM CBFConfiguration
    :raises NotImplementedError: if the PDM configuration requests VLBI
    """
    if pdm_val is None:
        return None

    if not isinstance(pdm_val, pdm_sbd.csp.CBFConfiguration):
        raise TypeError(f"Expected PDM CBFConfiguration, got {type(pdm_val)}")

    if pdm_val.vlbi is not None:
        raise NotImplementedError("Cannot convert CBF configurations for VLBI")

    converted_fsps = []
    for pdm_fsp in pdm_val.fsps:
        converted_fsps.append(convert_fspconfiguration(pdm_fsp))

    return cdm.CBFConfiguration(fsp_configs=converted_fsps, vlbi_config=None)
def convert_lowcbfconfiguration(
    pdm_lowcbf: pdm_sbd.csp.LowCBFConfiguration,
    pdm_mccs: pdm_sbd.MCCSAllocation,
    pdm_target: Optional[pdm_sbd.Target] = None,
) -> cdm.LowCBFConfiguration:
    """
    Assemble a CDM LowCBFConfiguration from the relevant SBDefinition parts.

    The result combines the visibility, station and timing-beam sections,
    each produced by its own private converter.

    :param pdm_lowcbf: PDM LowCBFConfiguration to convert
    :param pdm_mccs: PDM MCCSAllocation to be used to convert the lowCBF configuration
    :param pdm_target: PDM Target to be used to convert the lowCBF configuration
    :return: equivalent CDM LowCBFConfiguration
    :raises TypeError: if pdm_lowcbf is not a PDM LowCBFConfiguration
    """
    if not isinstance(pdm_lowcbf, pdm_sbd.csp.LowCBFConfiguration):
        raise TypeError(f"Expected PDM LowCBFConfiguration, got {type(pdm_lowcbf)}")

    # The PDM model doesn't currently have the subarray_beam info in the scan definition.
    # It is only stored in the mccs_allocation.
    # For AA0.5/AA1 there will only be one subarray beam, and the scan definition
    # implicitly is linked to that.
    first_beam = pdm_mccs.subarray_beams[0]
    beam_id = first_beam.subarray_beam_id

    vis_section = _convert_vis_configuration(pdm_lowcbf, pdm_mccs, beam_id)
    stations_section = _convert_station_configuration(pdm_lowcbf, pdm_mccs, beam_id)
    timing_beams_section = _convert_timing_beams_configuration(
        pdm_target=pdm_target,
        pdm_mccs=pdm_mccs,
        pdm_lowcbf=pdm_lowcbf,
        subarray_beam_id=beam_id,
    )

    return cdm.LowCBFConfiguration(
        vis=vis_section,
        stations=stations_section,
        timing_beams=timing_beams_section,
    )
# Frequency Slice Processor
def convert_fspconfiguration(
    pdm_val: pdm_sbd.csp.FSPConfiguration,
) -> cdm.FSPConfiguration:
    """
    Build the CDM FSPConfiguration equivalent to a PDM FSPConfiguration.

    Scalar fields are carried over as-is, the function mode is mapped to the
    CDM enum member of the same name, and the two map fields are deep-copied
    so the CDM object shares no mutable state with the PDM object.

    :param pdm_val: PDM FSPConfiguration to convert
    :return: equivalent CDM FSPConfiguration
    :raises TypeError: if pdm_val is not a PDM FSPConfiguration
    """
    if not isinstance(pdm_val, pdm_sbd.csp.FSPConfiguration):
        raise TypeError(f"Expected PDM FSPConfiguration, got {type(pdm_val)}")

    # look the CDM enum member up by the PDM enum's value
    cdm_function_mode = cdm.FSPFunctionMode[pdm_val.function_mode.value]
    averaging_map_copy = copy.deepcopy(pdm_val.channel_averaging_map)
    link_map_copy = copy.deepcopy(pdm_val.output_link_map)

    return cdm.FSPConfiguration(
        fsp_id=pdm_val.fsp_id,
        function_mode=cdm_function_mode,
        frequency_slice_id=pdm_val.frequency_slice_id,
        integration_factor=pdm_val.integration_factor,
        zoom_factor=pdm_val.zoom_factor,
        channel_averaging_map=averaging_map_copy,
        output_link_map=link_map_copy,
        channel_offset=pdm_val.channel_offset,
        zoom_window_tuning=pdm_val.zoom_window_tuning,
    )
def convert_commonconfiguration(
    pdm_csp: Optional[pdm_sbd.csp.CSPConfiguration],
) -> Optional[cdm.CommonConfiguration]:
    """
    Derive the CDM CommonConfiguration from a PDM CSPConfiguration.

    The execution block ID is read from the EB_ID environment variable. For
    MID configurations the frequency band and (if present) band 5 tunings
    are included; for LOW configurations only the IDs are set.

    :param pdm_csp: PDM CSPConfiguration entity to create CommonConfiguration
    :return: equivalent CDM CommonConfiguration
    :raises RuntimeError: if the EB_ID environment variable is not set
    :raises TypeError: if pdm_csp is not a PDM CSPConfiguration
    :raises ValueError: if pdm_csp has neither a MID nor a LOW CBF configuration
    """
    eb_id = os.getenv("EB_ID")
    if eb_id is None:
        raise RuntimeError("environment variable EB_ID is not set")
    if not isinstance(pdm_csp, pdm_sbd.csp.CSPConfiguration):
        raise TypeError(f"Expected PDM CSPConfiguration, got {type(pdm_csp)}")
    if not (pdm_csp.midcbf or pdm_csp.lowcbf):
        raise ValueError("not a valid CSPConfiguration")

    if not pdm_csp.midcbf:
        # LOW: only the identifiers are needed
        return cdm.CommonConfiguration(
            config_id=convert_csp_id(pdm_csp.config_id), eb_id=eb_id
        )

    midcbf_subbands = pdm_csp.midcbf.subbands
    # It is expected that if the first subband has band_5_tuning, the others will too
    if midcbf_subbands and midcbf_subbands[0].band_5_tuning is not None:
        band_5_tuning = [sb.band_5_tuning for sb in pdm_csp.midcbf.subbands]
    else:
        band_5_tuning = None

    return cdm.CommonConfiguration(
        config_id=convert_csp_id(pdm_csp.config_id),
        eb_id=eb_id,
        frequency_band=cdm.core.ReceiverBand(pdm_csp.midcbf.frequency_band.value),
        band_5_tuning=band_5_tuning,
    )
def _calculate_n_fsp(number_of_channels: int, n_apertures: int, mode: str) -> int:
"""
Calculate the number of fsp IDs
:param number_of_channels: number of channels
:param n_apertures: number of apertures
:param mode: firmware mode
:return: calculated n_fsp for vis or pst
"""
if mode == "vis":
factor = 1.0
elif mode == "pst":
factor = 3.0
else:
raise NotImplementedError(
f"fsp mode {mode} is not supported. Only 'vis' and 'pst' are supported at this time"
)
return int(math.ceil((number_of_channels * n_apertures) / (1024 / factor)))
def _convert_vis_configuration(
pdm_lowcbf: Optional[pdm_sbd.csp.LowCBFConfiguration],
pdm_mccs: Optional[pdm_sbd.MCCSAllocation],
subarray_beam_id: int,
) -> Optional[cdm.VisConfiguration]:
"""
Converts the relevant SBDefinition parts to a CDM VisConfiguration
"""
if pdm_lowcbf is None:
return None
if not isinstance(pdm_lowcbf, pdm_sbd.csp.LowCBFConfiguration):
raise TypeError(f"Expected PDM VisConfiguration, got {type(pdm_lowcbf)}")
if pdm_mccs and isinstance(pdm_mccs, pdm_sbd.MCCSAllocation):
number_of_channels = _number_of_channels_for_lowcbf(pdm_lowcbf)
fsp_ids = [
_calculate_n_fsp(number_of_channels, len(subarray_beams.apertures), "vis")
for subarray_beams in pdm_mccs.subarray_beams
]
else:
fsp_ids = [
logical_fsp_id
for correlation_spw in pdm_lowcbf.correlation_spws
if correlation_spw.zoom_factor == 0
for logical_fsp_id in correlation_spw.logical_fsp_ids
]
# For lowcbf.vis firmware can be hardcoded as vis
fsp = cdm.VisFspConfiguration(firmware="vis", fsp_ids=fsp_ids)
stn_beams = [
cdm.VisStnBeamConfiguration(
stn_beam_id=subarray_beam_id,
integration_ms=pdm_lowcbf.correlation_spws[0].integration_time_ms
/ timedelta(milliseconds=1.0),
)
]
return cdm.VisConfiguration(fsp=fsp, stn_beams=stn_beams)
def _convert_station_configuration(
    pdm_lowcbf: Optional[pdm_sbd.csp.LowCBFConfiguration],
    pdm_mccs: pdm_sbd.MCCSAllocation,
    subarray_beam_id: int,
) -> Optional[cdm.StationConfiguration]:
    """
    Build the CDM StationConfiguration from the relevant SBDefinition parts.

    Stations are the [station_id, substation_id] pairs of the first subarray
    beam's apertures. The frequency IDs are every channel index covered by
    the logical bands of the LOW CBF configuration, in band order.

    :param pdm_lowcbf: PDM LowCBFConfiguration providing the spectral windows
    :param pdm_mccs: PDM MCCSAllocation providing the apertures
    :param subarray_beam_id: ID used as the beam ID of the station beam
    :return: CDM StationConfiguration
    """
    # Currently only one subarray_beam is supported
    only_beam = pdm_mccs.subarray_beams[0]
    stns = []
    for aperture in only_beam.apertures:
        stns.append([aperture.station_id, aperture.substation_id])

    # Expand each logical band into its run of consecutive channel indices
    freq_ids = []
    for band in _calculate_logical_bands(pdm_lowcbf):
        first_channel = band.start_channel
        freq_ids.extend(range(first_channel, first_channel + band.number_of_channels))

    station_beam = cdm.StnBeamConfiguration(beam_id=subarray_beam_id, freq_ids=freq_ids)
    return cdm.StationConfiguration(stns=stns, stn_beams=[station_beam])
def _convert_timing_beams_configuration(
pdm_target: Optional[pdm_sbd.Target] = None,
pdm_mccs: Optional[pdm_sbd.MCCSAllocation] = None,
pdm_lowcbf: Optional[pdm_sbd.csp.LowCBFConfiguration] = None,
subarray_beam_id: int = 1,
) -> Optional[cdm.TimingBeamsConfiguration]:
"""
Converts the relevant parts of the SBDefinition to a CDM TimingBeamsConfiguration
"""
if not pdm_target or not pdm_target.tied_array_beams.pst_beams:
return None
number_of_channels = _number_of_channels_for_lowcbf(pdm_lowcbf)
if pdm_mccs and isinstance(pdm_mccs, pdm_sbd.MCCSAllocation):
fsp_ids = [
_calculate_n_fsp(number_of_channels, len(subarray_beams.apertures), "pst")
for subarray_beams in pdm_mccs.subarray_beams
]
else:
fsp_ids = [1]
fsp = cdm.VisFspConfiguration(firmware="pst", fsp_ids=fsp_ids)
beams_configurations = []
for beam in pdm_target.tied_array_beams.pst_beams:
field = convert_target(beam)
beams_config = cdm.BeamsConfiguration(
pst_beam_id=beam.beam_id,
stn_beam_id=subarray_beam_id,
stn_weights=beam.stn_weights,
field=field,
)
beams_configurations.append(beams_config)
return cdm.TimingBeamsConfiguration(fsp=fsp, beams=beams_configurations)
def _calculate_logical_bands(
pdm_lowcbf: Optional[pdm_sbd.csp.LowCBFConfiguration],
) -> list[SubarrayBeamLogicalBands]:
return [
SubarrayBeamLogicalBands(
# This should always come out as an integer and this should be validated when creating the SBDefinition.
# We just cast here to keep Python happy.
start_channel=int(
correlation_spw.centre_frequency / 781.25e3
- correlation_spw.number_of_channels / 2
+ 0.5
),
number_of_channels=correlation_spw.number_of_channels,
)
for correlation_spw in pdm_lowcbf.correlation_spws
if correlation_spw.zoom_factor == 0
]
def _number_of_channels_for_lowcbf(pdm_lowcbf: pdm_sbd.csp.LowCBFConfiguration) -> int:
# The number of channels sent should be the total channels for all the SPWs in the CSP config
return sum(
[
correlation_spw.number_of_channels
for correlation_spw in pdm_lowcbf.correlation_spws
]
)