Source code for ska_tmc_cdm.schemas.central_node.sdp

"""
The schemas.central_node module defines Marshmallow schemas that map TMC
Central Node message classes to/from a JSON representation.
"""


from marshmallow import Schema, fields, post_dump, post_load

from ska_tmc_cdm.messages.central_node.sdp import (
    BeamConfiguration,
    Channel,
    ChannelConfiguration,
    EBScanType,
    EBScanTypeBeam,
    ExecutionBlockConfiguration,
    FieldConfiguration,
    PbDependency,
    PhaseDir,
    PolarisationConfiguration,
    ProcessingBlockConfiguration,
    ScanType,
    ScriptConfiguration,
    SDPConfiguration,
    SDPWorkflow,
)
from ska_tmc_cdm.schemas import CODEC

__all__ = [
    "ScanTypeSchema",
    "SDPWorkflowSchema",
    "PbDependencySchema",
    "ChannelSchema",
    "ProcessingBlockSchema",
    "SDPConfigurationSchema",
    "ExecutionBlockConfigurationSchema",
    "BeamConfigurationSchema",
    "ChannelConfigurationSchema",
    "PolarisationConfigurationSchema",
    "FieldConfigurationSchema",
    "PhaseDirSchema",
    "ScriptConfigurationSchema",
    "EBScanTypeBeamSchema",
    "EBScanTypeSchema",
]


[docs] class ChannelSchema(Schema): """ Marshmallow schema for the SubBand class. """ count = fields.Integer(data_key="count", required=True) start = fields.Integer(data_key="start", required=True) stride = fields.Integer() freq_min = fields.Float(data_key="freq_min", required=True) freq_max = fields.Float(data_key="freq_max", required=True) link_map = fields.List(fields.List(fields.Integer())) spectral_window_id = fields.String() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_channel(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a Channel object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SubBand object populated from data """ count = data["count"] start = data["start"] stride = data.get("stride") freq_min = data["freq_min"] freq_max = data["freq_max"] link_map = data.get("link_map") spectral_window_id = data.get("spectral_window_id") return Channel( count, start, stride, freq_min, freq_max, link_map, spectral_window_id, )
[docs] class ScanTypeSchema(Schema): """ Marshmallow schema for the ScanType class. """ scan_type_id = fields.String(data_key="scan_type_id", required=True) reference_frame = fields.String(data_key="reference_frame", required=True) ra = fields.String(data_key="ra", required=True) dec = fields.String(data_key="dec", required=True) channels = fields.Nested(ChannelSchema, data_key="channels", many=True) @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_scan_type(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a ScanType object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: ScanType object populated from data """ scan_type_id = data["scan_type_id"] reference_frame = data["reference_frame"] ra = data["ra"] dec = data["dec"] channels = data["channels"] return ScanType(scan_type_id, reference_frame, ra, dec, channels)
[docs] class SDPWorkflowSchema(Schema): """ Represents the type of workflow being configured on the SDP """ name = fields.String(data_key="name", required=True) kind = fields.String(data_key="kind", required=True) version = fields.String(data_key="version", required=True) @post_load def create_sdp_wf(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a SDP Workflow object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDP Workflow object populated from data """ name = data["name"] kind = data["kind"] version = data["version"] return SDPWorkflow(name, kind, version)
[docs] class PbDependencySchema(Schema): """ Marshmallow schema for the PbDepedency class. """ pb_id = fields.String(data_key="pb_id") kind = fields.List(fields.String()) @post_load def create_pb_dependency(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a PbDependency object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: PbDependency object populated from data """ pb_id = data["pb_id"] kind = data["kind"] return PbDependency(pb_id, kind)
[docs] class ScriptConfigurationSchema(Schema): """ Marshmallow schema for the ScriptConfiguration class. """ kind = fields.String() name = fields.String() version = fields.String() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_executionblock_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a ScriptConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return ScriptConfiguration(**data)
[docs] class ProcessingBlockSchema(Schema): """ Marshmallow schema for the ProcessingBlock class. """ pb_id = fields.String(data_key="pb_id", required=True) workflow = fields.Nested(SDPWorkflowSchema) parameters = fields.Dict() dependencies = fields.List(fields.Nested(PbDependencySchema)) sbi_ids = fields.List(fields.String()) script = fields.Nested(ScriptConfigurationSchema) @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_processing_block_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a ProcessingBlock object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: PB object populated from data """ return ProcessingBlockConfiguration(**data)
[docs] class BeamConfigurationSchema(Schema): """ Marsmallow class for the BeamConfiguration class """ beam_id = fields.String() function = fields.String() search_beam_id = fields.Integer() timing_beam_id = fields.Integer() vlbi_beam_id = fields.Integer() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_beam_config(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a BeamConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return BeamConfiguration(**data)
[docs] class ChannelConfigurationSchema(Schema): """ Marsmallow class for the ChannelConfiguration class """ channels_id = fields.String() spectral_windows = fields.Nested(ChannelSchema, many=True) @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_channel_config(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a ChannelConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return ChannelConfiguration(**data)
[docs] class PolarisationConfigurationSchema(Schema): """ Marsmallow class for the PolarisationConfiguration class """ polarisations_id = fields.String() corr_type = fields.List(fields.String()) @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_polarisation_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a PolarisationConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return PolarisationConfiguration(**data)
[docs] class PhaseDirSchema(Schema): """ Marsmallow class for the PhaseDir class """ ra = fields.List(fields.Float()) dec = fields.List(fields.Number()) reference_time = fields.String() reference_frame = fields.String() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_phase_dir_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a PhaseDir object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return PhaseDir(**data)
[docs] class FieldConfigurationSchema(Schema): """ Marsmallow class for the FieldConfiguration class """ field_id = fields.String() phase_dir = fields.Nested(PhaseDirSchema) pointing_fqdn = fields.String() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_polarisation_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a FieldConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return FieldConfiguration(**data)
[docs] class EBScanTypeBeamSchema(Schema): """ Marsmallow class for the EBScanTypeBeam class """ field_id = fields.String() channels_id = fields.String() polarisations_id = fields.String() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_ebscantypebeams_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a EBScanTypeBeam object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return EBScanTypeBeam(**data)
[docs] class EBScanTypeSchema(Schema): """ Marsmallow class for the EBScanTypeBeam class """ scan_type_id = fields.String() beams = fields.Dict( keys=fields.String(), values=fields.Nested(EBScanTypeBeamSchema) ) derive_from = fields.String() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_ebscantype_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a EBScanType object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return EBScanType(**data)
[docs] class ExecutionBlockConfigurationSchema(Schema): """ Marsmallow class for the ExecutionBlockConfiguration class """ eb_id = fields.String(data_key="eb_id") max_length = fields.Float(data_key="max_length") context = fields.Dict() beams = fields.List(fields.Nested(BeamConfigurationSchema)) channels = fields.List(fields.Nested(ChannelConfigurationSchema)) polarisations = fields.List(fields.Nested(PolarisationConfigurationSchema)) scan_types = fields.List(fields.Nested(EBScanTypeSchema)) fields = fields.List(fields.Nested(FieldConfigurationSchema)) @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_executionblock_config( self, data, **_ ): # pylint: disable=no-self-use """ Convert parsed JSON back into a ExecutionBlockConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return ExecutionBlockConfiguration(**data)
[docs] @CODEC.register_mapping(SDPConfiguration) class SDPConfigurationSchema(Schema): """ Marsmallow class for the SDPConfiguration class """ interface = fields.String() execution_block = fields.Nested(ExecutionBlockConfigurationSchema) eb_id = fields.String(data_key="eb_id") max_length = fields.Float(data_key="max_length") scan_types = fields.Nested(ScanTypeSchema, many=True) processing_blocks = fields.Nested(ProcessingBlockSchema, many=True) resources = fields.Dict() @post_dump def filter_nulls(self, data, **_): # pylint: disable=no-self-use """ Filter out null values from JSON. :param data: Marshmallow-provided dict containing parsed object values :param _: kwargs passed by Marshmallow :return: dict suitable for PB configuration """ return {k: v for k, v in data.items() if v is not None} @post_load def create_sdp_config(self, data, **_): # pylint: disable=no-self-use """ Convert parsed JSON back into a SDPConfiguration object. :param data: Marshmallow-provided dict containing parsed JSON values :param _: kwargs passed by Marshmallow :return: SDPConfiguration object populated from data """ return SDPConfiguration(**data)