from typing import Literal
from pydantic import AliasChoices, Field
from ska_tmc_cdm.messages.base import CdmObject
[docs]
class PSSBeamsConfiguration(CdmObject):
    """Configuration of a single PSS beam.

    :param beam_id: Identifier of the beam.
    :param reference_frame: Frame in which the pointing coordinates are
        expressed; must be either ``"ICRS"`` or ``"HORIZON"``.
    :param ra: Right Ascension of the sub-array beam target, in degrees.
    :param dec: Declination of the sub-array beam target, in degrees.
    :param centre_frequency: Centre frequency of the search beam.
    """

    beam_id: int
    reference_frame: Literal["ICRS", "HORIZON"]
    ra: float
    dec: float
    centre_frequency: float
[docs]
class SinkID(CdmObject):
    """Wrapper object carrying a single sink identifier.

    :param sink_id: Identifier of the sink.
    """

    sink_id: str
[docs]
class SPSEventsConfiguration(CdmObject):
    """Configuration of SPS events.

    :param active: Whether SPS events are enabled.
    :param sink: Sink references for PSS; defaults to an empty list when
        omitted.
    """

    active: bool
    sink: list[SinkID] = Field(default_factory=list)
[docs]
class ChannelsConfiguration(CdmObject):
    """Configuration of the Pulsar Search channels.

    :param sps_events: SPS events configuration for Pulsar Search.
    """

    sps_events: SPSEventsConfiguration
[docs]
class SPCCLFiles(CdmObject):
    """Settings describing SPCCL files.

    :param extension: File extension.
    :param dir: Directory.
    :param sink_id: Identifier of the sink.
    """

    extension: str
    dir: str
    sink_id: str
[docs]
class CandidateWindow(CdmObject):
    """Time window surrounding a candidate.

    :param ms_before: Milliseconds to include before the candidate start.
    :param ms_after: Milliseconds to include after the candidate end time.
    """

    ms_before: float
    ms_after: float
[docs]
class SPCCLSigProcFiles(CdmObject):
    """Settings describing SPCCL signal-processing files.

    :param spectra_per_file: Number of spectra per file.
    :param dir: Directory.
    :param extension: File extension.
    :param candidate_window: Candidate window configuration.
    :param sink_id: Identifier of the sink.
    """

    spectra_per_file: int
    dir: str
    extension: str
    candidate_window: CandidateWindow
    sink_id: str
[docs]
class SinkConfiguration(CdmObject):
    """Sink configuration for Pulsar Search.

    :param spccl_files: SPCCL files settings.
    :param spccl_sigproc_files: SPCCL signal-processing files settings.
    """

    spccl_files: SPCCLFiles
    spccl_sigproc_files: SPCCLSigProcFiles
[docs]
class Sinks(CdmObject):
    """Pulsar Search sinks: channels plus sink configurations.

    :param channels: Pulsar Search channels configuration.
    :param sink_configs: Pulsar Search sink configurations.
    """

    channels: ChannelsConfiguration
    sink_configs: SinkConfiguration
[docs]
class SigProc(CdmObject):
    """Sigproc source settings for Pulsar Search.

    :param file: The sigproc file(s) to read as input data.
    :param chunk_samples: Number of time samples in each chunk.
    :param default_nbits: Default number of bits to use. Serialised under
        the hyphenated key ``default-nbits``; on input either
        ``default-nbits`` or ``default_nbits`` is accepted.
    :param active: Whether Sigproc is enabled.
    """

    file: str
    chunk_samples: int
    # The hyphenated alias is listed first, so it is the preferred input key.
    default_nbits: int = Field(
        validation_alias=AliasChoices("default-nbits", "default_nbits"),
        serialization_alias="default-nbits",
    )
    active: bool
[docs]
class UDPConfig(CdmObject):
    """UDP source settings for Pulsar Search.

    :param number_of_threads: Number of threads used to run the LOW ingest
        services.
    :param spectra_per_chunk: Number of time slices in each chunk.
    :param number_of_channels: Total number of frequency channels.
    :param max_buffers: Maximum number of UDP packet buffers to use.
    :param active: Whether the UDP source is enabled.
    """

    number_of_threads: int
    spectra_per_chunk: int
    number_of_channels: int
    max_buffers: int
    active: bool
[docs]
class CheetahSource(CdmObject):
    """Data sources available to the cheetah pipeline.

    :param sigproc: Sigproc source settings.
    :param udp_low: UDP source settings.
    :param udp_low_lite: UDP Low source settings (same shape as
        ``udp_low``).
    """

    sigproc: SigProc
    udp_low: UDPConfig
    udp_low_lite: UDPConfig
[docs]
class CheetahBeamConfiguration(CdmObject):
    """Configuration of one cheetah beam.

    :param active: Whether the beam is enabled.
    :param sinks: Sink configuration for the beam.
    :param source: Source configuration for the beam.
    :param beam_id: Identifier of the beam.
    """

    active: bool
    sinks: Sinks
    source: CheetahSource
    beam_id: int
[docs]
class CheetahBeamsConfiguration(CdmObject):
    """Container holding a cheetah beam configuration under ``beam``.

    :param beam: The beam configuration.
    """

    beam: CheetahBeamConfiguration
[docs]
class PSBC(CdmObject):
    """PSS Beamforming Collector (PSBC) settings.

    :param dump_time: Interval, in seconds, at which beamformed data is
        dumped.
    """

    dump_time: int
[docs]
class Labyrinth(CdmObject):
    """Settings for the Labyrinth acceleration algorithm.

    :param active: Whether the Labyrinth algorithm is enabled.
    :param threshold: Threshold used for candidate detection.
    """

    active: bool
    threshold: float
[docs]
class FDAS(CdmObject):
    """FDAS settings.

    :param pool_id: Name of the processing pool to use.
    :param priority: Scheduling priority for FDAS tasks.
    :param active: Whether FDAS processing is enabled.
    :param labyrinth: Nested Labyrinth settings.
    """

    pool_id: str
    priority: int
    active: bool
    labyrinth: Labyrinth
[docs]
class StrongSift(CdmObject):
    """Settings for the strong sift stage.

    :param active: Whether the strong_sift stage is enabled.
    :param num_candidate_harmonics: Number of harmonics considered per
        candidate.
    :param match_factor: Scaling factor for the thresholding match window.
    :param dm_match_range: Allowed dispersion-measure range for match
        filtering.
    """

    active: bool
    num_candidate_harmonics: int
    match_factor: float
    dm_match_range: int
[docs]
class SiftConfig(CdmObject):
    """SIFT stage configuration.

    :param pool_id: Pool to which sift processing is assigned.
    :param priority: Execution priority of the sift stage.
    :param strong_sift: Settings for the strong SIFT algorithm.
    """

    pool_id: str
    priority: int
    strong_sift: StrongSift
[docs]
class AccelerationConfig(CdmObject):
    """Acceleration configuration.

    :param fdas: FDAS settings.
    """

    fdas: FDAS
[docs]
class CheetahConfiguration(CdmObject):
    """Configuration of one cheetah pipeline instance.

    :param cheetah_id: Pipeline ID of the cheetah instance.
    :param beams: Cheetah beams; defaults to an empty list and may hold at
        most 3 entries.
    :param psbc: PSS Beamforming Collector (PSBC) settings.
    :param acceleration: Acceleration configuration.
    :param sift: SIFT configuration.
    """

    cheetah_id: int
    beams: list[CheetahBeamsConfiguration] = Field(
        default_factory=list,
        max_length=3,
    )
    psbc: PSBC
    acceleration: AccelerationConfig
    sift: SiftConfig
[docs]
class Activation(CdmObject):
    """Simple on/off switch for a piece of configuration.

    :param active: Whether the associated configuration is enabled.
    """

    active: bool
[docs]
class GPUBruteForce(CdmObject):
    """GPU brute-force configuration.

    :param active: Whether this configuration is enabled.
    :param copy_dmtrials_to_host: Boolean flag; presumably controls whether
        DM trials are copied to the host (TODO confirm exact semantics).
    """

    active: bool
    copy_dmtrials_to_host: bool
[docs]
class DeDispersionConfig(CdmObject):
    """A dedispersion DM range.

    :param start: Start DM, in cm^-3 pc.
    :param end: End DM, in cm^-3 pc (inclusive).
    :param step: DM step size, in cm^-3 pc.
    """

    start: float
    end: float
    step: float
[docs]
class DDTRConfiguration(CdmObject):
    """DDTR configuration.

    :param cpu: Activation switch for the CPU configuration.
    :param fpga: Activation switch for FPGA.
    :param gpu_bruteforce: GPU brute-force algorithm configuration.
    :param klotski: Activation switch for the Klotski algorithm.
    :param klotski_bruteforce: Activation switch for the Klotski
        brute-force algorithm.
    :param dedispersion: PSS dedispersion entries; defaults to an empty
        list when omitted.
    :param dedispersion_samples: Maximum number of samples to process in
        each call to the dedisperser.
    """

    cpu: Activation
    fpga: Activation
    gpu_bruteforce: GPUBruteForce
    klotski: Activation
    klotski_bruteforce: Activation
    dedispersion: list[DeDispersionConfig] = Field(default_factory=list)
    dedispersion_samples: int
[docs]
class SPSCPUConfig(CdmObject):
    """SPS CPU configuration.

    :param active: Whether this configuration is enabled.
    :param samples_per_iteration: Samples per iteration.
    :param number_of_widths: Number of widths.
    """

    active: bool
    samples_per_iteration: int
    number_of_widths: int
[docs]
class SPSKlotskiConfig(CdmObject):
    """SPS Klotski configuration.

    :param active: Whether the klotski SPS mode is enabled.
    :param pulse_widths: Pulse widths, supplied as a string.
    """

    active: bool
    pulse_widths: str
[docs]
class SPSConfiguration(CdmObject):
    """SPS configuration.

    :param cpu: SPS CPU configuration.
    :param threshold: Threshold value.
    :param klotski: Klotski configuration.
    :param klotski_bruteforce: Klotski brute-force configuration (same
        shape as ``klotski``).
    """

    cpu: SPSCPUConfig
    threshold: float
    klotski: SPSKlotskiConfig
    klotski_bruteforce: SPSKlotskiConfig
[docs]
class PSSConfiguration(CdmObject):
    """Top-level container for PSS configuration items.

    :param beam: Beam configurations; defaults to an empty list when
        omitted.
    :param config_id: Configuration ID.
    :param cheetah: Cheetah pipeline configurations; defaults to an empty
        list when omitted.
    :param ddtr: DDTR configuration.
    :param sps: SPS configuration.
    """

    beam: list[PSSBeamsConfiguration] = Field(default_factory=list)
    config_id: int
    cheetah: list[CheetahConfiguration] = Field(default_factory=list)
    ddtr: DDTRConfiguration
    sps: SPSConfiguration