Source code for ska_sdp_wflow_pointing_offset.pipeline_base

"""
Base class for processing pipelines.
"""

from abc import ABC, abstractmethod
from typing import Optional

from numpy.typing import NDArray
from pydantic import BaseModel, Field, field_validator, model_validator
from ska_sdp_datamodels.calibration import PointingTable
from ska_sdp_datamodels.visibility import Visibility


[docs] class PipelineBase(BaseModel, ABC): """ Abstract base class for processing pipelines. Implements validated configuration parameters common to all pipelines. """ apply_mask: bool = Field( default=False, description="Whether to apply an RFI mask.", ) beamwidth_factor: tuple[float, float] = Field( default=(0.976, 1.098), description=( "Beamwidth factors for horizontal and vertical co-polarisations; " "constants k in theoretical beamwidth = k * lambda / D." ), ) start_freq: Optional[float] = Field( default=None, description="Start frequency in MHz (can be None).", gt=0, ) end_freq: Optional[float] = Field( default=None, description="End frequency in MHz (can be None).", gt=0, ) fit_to_sep_pol: bool = Field( default=False, description=( "Fit primary beams to parallel-hands gain amplitudes " "instead of Stokes I." ), ) fit_to_vis: bool = Field( default=False, description=( "Fit primary beams to cross-correlation visibility amplitudes " "instead of antenna gains." ), ) num_chunks: int = Field( default=16, gt=0, description="Number of frequency chunks for calibration.", ) rfi_file: Optional[str] = Field( default=None, description="Path to an RFI flagging file (can be None).", ) thresh_width: float = Field( default=1.15, gt=0, description="Maximum allowed ratio of fitted to expected beamwidth.", ) use_modelvis: bool = Field( default=False, description=( "Whether to use model visibilities for gain solving " "(requires ska-sdp-func)." ), ) use_source_offset_column: bool = Field( default=False, description=( "Whether to read on-sky offsets from the SOURCE_OFFSET column." ), )
[docs] @model_validator(mode="after") def validate_frequency_range(self): """Self-explanatory.""" if ( self.start_freq is not None and self.end_freq is not None and self.start_freq >= self.end_freq ): raise ValueError("start_freq must be less than end_freq") return self
[docs] @model_validator(mode="after") def check_fit_to_vis_and_fit_to_sep_pol_mutual_exclusion(self): """Self-explanatory.""" if self.fit_to_vis and self.fit_to_sep_pol: raise NotImplementedError( "Fitting to parallel-hands of the " "visibility amplitudes not yet supported!" ) return self
[docs] @field_validator("num_chunks") @classmethod def num_chunks_must_not_be_two(cls, v: int) -> int: """Self-explanatory.""" if v == 2: raise ValueError( "num_chunks cannot be 2, because the top and bottom band " "solutions/freqs/weights are discarded." ) return v
@property @abstractmethod def common_prefix(self) -> str: """Common filepath prefix for saving data.""" @property @abstractmethod def ms_files(self) -> list[str]: """Measurement set filepaths used as input."""
[docs] @abstractmethod def export_data( self, output_offsets: dict[str, NDArray], pointing_table: PointingTable, visibility: Visibility, ) -> None: """Export pipeline results, either to disk or to Kafka."""