"""
The messages module provides simple Python representations of the structured
request and response for the TMC CentralNode.AssignResources command.
"""
import math
from datetime import datetime
from typing import Optional
from pydantic import Field
from ska_tmc_cdm.messages.base import CdmObject
from ska_tmc_cdm.messages.skydirection import SkyDirection
__all__ = [
"SDPWorkflow",
"SDPConfiguration",
"ProcessingBlockConfiguration",
"PbDependency",
"ScanType",
"Channel",
"BeamConfiguration",
"ChannelConfiguration",
"PolarisationConfiguration",
"PhaseDir",
"FieldConfiguration",
"ScriptConfiguration",
"ExecutionBlockConfiguration",
"ScriptConfiguration",
"EBScanTypeBeam",
"EBScanType",
]
SDP_SCHEMA = "https://schema.skao.int/ska-sdp-assignres/1.0"
[docs]
class SDPWorkflow(CdmObject):
"""
Class to hold SDPWorkflows for ProcessingBlock
Create a new SDPWorkflow object.
:param name: The name of the processing script
:param kind: The kind of processing script
:param version: Version of the processing script
"""
name: str
kind: str
version: str
[docs]
class Channel(CdmObject):
"""
Class to hold Channels for ScanType
Create a new Channel object.
:param count: Number of channels
:param start: First channel ID
:param stride: Distance between subsequent channel IDs
:param freq_min: Lower bound of first channel
:param freq_max: Upper bound of last channel
:param link_map: Channel map that specifies which network link is going to get used to send channels to SDP.
Intended to allow SDP to optimize network and receive node configuration.
:param spectral_window_id: spectral_window_id
"""
# TODO: double-check what's optional here:
count: int
start: int
stride: Optional[int] = None
freq_min: float
freq_max: float
link_map: Optional[list[list]] = None
spectral_window_id: Optional[str] = None
[docs]
class ScanType(CdmObject):
"""
Class to hold ScanType configuration
:param scan_type_id: (any scan type)
:param reference_frame: Specification of the reference frame or system for a set of pointing coordinates
:param ra: Right Ascension in degrees
:param dec: Declination in degrees
:param channels: Expected channel configuration.
"""
scan_type_id: str
reference_frame: str
ra: str
dec: str
channels: list[Channel]
[docs]
class PbDependency(CdmObject):
"""
Class to hold Dependencies for ProcessingBlock
:param pb_id: Unique identifier for this processing block
:param kind: The kind of processing script (realtime or batch)
"""
pb_id: str
kind: list[str]
[docs]
class ScriptConfiguration(CdmObject):
"""
Class to hold ScriptConfiguration
:param name: The name of the processing script
:param kind: The kind of processing script
:param version: Version of the processing script
"""
kind: Optional[str] = None
name: Optional[str] = None
version: Optional[str] = None
[docs]
class ProcessingBlockConfiguration(CdmObject):
"""
Class to hold ProcessingBlock configuration
:param pb_id: Processing block ID
:param workflow: Specification of the workflow to be executed along with configuration parameters for the workflow.
:param parameters: Processing script parameters
:param dependencies: Dependencies on other processing blocks
:param sbi_ids: list of scheduling block ids
:param script: Processing script description (dictionary for now)
"""
pb_id: Optional[str] = None
workflow: Optional[SDPWorkflow] = None
# FIXME: should probably be `Field(default_factory=dict)` not `None`
parameters: Optional[dict] = None
# FIXME: should probably be `Field(default_factory=list)` not `None`
dependencies: Optional[list[PbDependency]] = None
# FIXME: should probably be `Field(default_factory=list)` not `None`
sbi_ids: Optional[list[str]] = None
script: Optional[ScriptConfiguration] = None
[docs]
class BeamConfiguration(CdmObject):
"""
Class to hold Dependencies for Beam Configuration
:param beam_id: Name to identify the beam within the SDP configuration.
:param function: Identifies the type and origin of the generated beam data.
:param search_beam_id: search_beam_id
:param timing_beam_id: timing_beam_id
:param vlbi_beam_id: vlbi_beam_id
:param visibility_beam_id: visibility_beam_id
"""
beam_id: Optional[str] = None
function: Optional[str] = None
search_beam_id: Optional[int] = None
timing_beam_id: Optional[int] = None
vlbi_beam_id: Optional[int] = None
visibility_beam_id: Optional[int] = None
[docs]
class ChannelConfiguration(CdmObject):
"""
Class to hold Dependencies for Channel Configuration
:param channels_id: channels_id
:param spectral_windows: spectral_windows
"""
channels_id: Optional[str] = None
spectral_windows: list[Channel] = Field(default_factory=list)
[docs]
class PolarisationConfiguration(CdmObject):
"""
Class to hold Dependencies for Polarisation Configuration
:param polarisations_id: Polarisation definitions id
:param corr_type: corr_type
"""
polarisations_id: Optional[str] = None
corr_type: list[str] = Field(default_factory=list)
[docs]
class PhaseDir(CdmObject):
"""
Class to hold PhaseDir configuration
:param ra: Right Ascension in degrees (see ADR-49)
:param dec: Declination in degrees (see ADR-49)
:param reference_time: reference_time,
:param reference_frame: Specification of the reference frame or system for a set of pointing coordinates (see ADR-49)
"""
ra: list[float] = Field(default_factory=list)
dec: list[float] = Field(default_factory=list)
reference_time: Optional[datetime] = None
reference_frame: Optional[str] = None
@staticmethod
def _floatlist_eq(list_a: list[float], list_b: list[float]) -> bool:
tol = 1e-15
return all(
(math.isclose(x, y, abs_tol=tol) for x, y in zip(list_a, list_b))
)
def __eq__(self, other):
if not isinstance(other, PhaseDir):
return False
return (
self.reference_frame == other.reference_frame
and self.reference_time == other.reference_time
and self._floatlist_eq(self.ra, other.ra)
and self._floatlist_eq(self.dec, other.dec)
)
[docs]
class FieldConfiguration(CdmObject):
"""
Class to hold Field configuration
:param field_id: field_id
:param pointing_fqdn: pointing_fqdn
:param phase_dir: Phase direction
"""
field_id: Optional[str] = None
pointing_fqdn: Optional[str] = None
phase_dir: Optional[SkyDirection | PhaseDir] = None
[docs]
class EBScanTypeBeam(CdmObject):
"""
Class to hold EBScanTypeBeam Configuration
:param field_id: field_id
:param channels_id: channels_id
:param polarisations_id: polarisations_id
"""
field_id: Optional[str] = None
channels_id: Optional[str] = None
polarisations_id: Optional[str] = None
[docs]
class EBScanType(CdmObject):
"""
Class to hold EBScanType configuration
:param scan_type_id: scan_type_id
:param beams: Beam parameters for the purpose of the Science Data Processor.
:param derive_from: derive_from
"""
scan_type_id: Optional[str] = None
beams: dict[str, EBScanTypeBeam] = Field(default_factory=dict)
derive_from: Optional[str] = None
[docs]
class ExecutionBlockConfiguration(CdmObject):
"""
Class to hold ExecutionBlock configuration
:param eb_id: Execution block ID to associate with processing
:param max_length: Hint about the maximum observation length to support by the SDP.
:param context: Free-form information from OET, see ADR-54
:param beams: Beam parameters for the purpose of the Science Data Processor.
:param channels: Spectral windows per channel configuration.
:param polarisations: Polarisation definition.
:param fields: fields / Targets
:param scan_types: Scan types. Associates scans with per-beam fields & channel configurations
"""
eb_id: Optional[str] = None
max_length: Optional[float] = None
context: dict = Field(default_factory=dict, exclude=False)
# FIXME: should these all be `Field(default_factory=list)` instead of `None`?
beams: Optional[list[BeamConfiguration]] = None
channels: Optional[list[ChannelConfiguration]] = None
polarisations: Optional[list[PolarisationConfiguration]] = None
fields: Optional[list[FieldConfiguration]] = None
scan_types: Optional[list[EBScanType]] = None
[docs]
class SDPConfiguration(CdmObject):
"""
Class to hold SDP Configuration
:param interface: url string to determine JsonSchema version
:param transaction_id: string ID for tracking requests
:param processing_blocks: A Processing Block is an atomic unit of data processing for the purpose of SDP’s internal scheduler
:param execution_block: execution_block
:param resources: resources
"""
interface: Optional[str] = SDP_SCHEMA
transaction_id: Optional[str] = None
execution_block: Optional[ExecutionBlockConfiguration] = None
resources: Optional[dict] = None
processing_blocks: Optional[list[ProcessingBlockConfiguration]] = None