"""
Base class for processing pipelines.
"""
from abc import ABC, abstractmethod
from typing import Optional
from numpy.typing import NDArray
from pydantic import BaseModel, Field, field_validator, model_validator
from ska_sdp_datamodels.calibration import PointingTable
from ska_sdp_datamodels.visibility import Visibility
class PipelineBase(BaseModel, ABC):
    """
    Abstract base class for processing pipelines.

    Implements validated configuration parameters common to all pipelines.
    Concrete subclasses must provide :attr:`common_prefix`,
    :attr:`ms_files` and :meth:`export_data`.

    Validation rules enforced on construction:

    - ``start_freq``, ``end_freq``, ``num_chunks`` and ``thresh_width``
      must be strictly positive when given.
    - ``start_freq`` must be strictly less than ``end_freq`` when both are
      set.
    - ``fit_to_vis`` and ``fit_to_sep_pol`` cannot both be enabled.
    - ``num_chunks`` cannot be 2.
    """

    apply_mask: bool = Field(
        default=False,
        description="Whether to apply an RFI mask.",
    )
    beamwidth_factor: tuple[float, float] = Field(
        default=(0.976, 1.098),
        description=(
            "Beamwidth factors for horizontal and vertical co-polarisations; "
            "constants k in theoretical beamwidth = k * lambda / D."
        ),
    )
    start_freq: Optional[float] = Field(
        default=None,
        description="Start frequency in MHz (can be None).",
        gt=0,
    )
    end_freq: Optional[float] = Field(
        default=None,
        description="End frequency in MHz (can be None).",
        gt=0,
    )
    fit_to_sep_pol: bool = Field(
        default=False,
        description=(
            "Fit primary beams to parallel-hands gain amplitudes "
            "instead of Stokes I."
        ),
    )
    fit_to_vis: bool = Field(
        default=False,
        description=(
            "Fit primary beams to cross-correlation visibility amplitudes "
            "instead of antenna gains."
        ),
    )
    num_chunks: int = Field(
        default=16,
        gt=0,
        description="Number of frequency chunks for calibration.",
    )
    rfi_file: Optional[str] = Field(
        default=None,
        description="Path to an RFI flagging file (can be None).",
    )
    thresh_width: float = Field(
        default=1.15,
        gt=0,
        description="Maximum allowed ratio of fitted to expected beamwidth.",
    )
    use_modelvis: bool = Field(
        default=False,
        description=(
            "Whether to use model visibilities for gain solving "
            "(requires ska-sdp-func)."
        ),
    )
    use_source_offset_column: bool = Field(
        default=False,
        description=(
            "Whether to read on-sky offsets from the SOURCE_OFFSET column."
        ),
    )

    @model_validator(mode="after")
    def validate_frequency_range(self) -> "PipelineBase":
        """
        Check that the frequency range is non-empty and correctly ordered.

        The check only applies when both ``start_freq`` and ``end_freq``
        are set; an open-ended range (either bound ``None``) is accepted.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If ``start_freq`` is greater than or equal to
                ``end_freq``.
        """
        if (
            self.start_freq is not None
            and self.end_freq is not None
            and self.start_freq >= self.end_freq
        ):
            raise ValueError("start_freq must be less than end_freq")
        return self

    @model_validator(mode="after")
    def check_fit_to_vis_and_fit_to_sep_pol_mutual_exclusion(
        self,
    ) -> "PipelineBase":
        """
        Reject enabling ``fit_to_vis`` and ``fit_to_sep_pol`` together.

        Returns:
            The validated model instance.

        Raises:
            NotImplementedError: If both flags are ``True``. Unlike
                ``ValueError``, this exception type is not wrapped in a
                pydantic ``ValidationError``; it propagates unchanged to
                the caller.
        """
        if self.fit_to_vis and self.fit_to_sep_pol:
            raise NotImplementedError(
                "Fitting to parallel-hands of the "
                "visibility amplitudes not yet supported!"
            )
        return self

    @field_validator("num_chunks")
    @classmethod
    def num_chunks_must_not_be_two(cls, v: int) -> int:
        """
        Reject ``num_chunks == 2``.

        Args:
            v: Candidate value for ``num_chunks``.

        Returns:
            ``v`` unchanged when it is acceptable.

        Raises:
            ValueError: If ``v`` is 2, because the top and bottom band
                solutions/freqs/weights are discarded, which would leave
                nothing to work with.
        """
        if v == 2:
            raise ValueError(
                "num_chunks cannot be 2, because the top and bottom band "
                "solutions/freqs/weights are discarded."
            )
        return v

    @property
    @abstractmethod
    def common_prefix(self) -> str:
        """Common filepath prefix for saving data."""

    @property
    @abstractmethod
    def ms_files(self) -> list[str]:
        """Measurement set filepaths used as input."""

    @abstractmethod
    def export_data(
        self,
        output_offsets: dict[str, NDArray],
        pointing_table: PointingTable,
        visibility: Visibility,
    ) -> None:
        """
        Export pipeline results, either to disk or to Kafka.

        Args:
            output_offsets: Mapping from string keys to arrays of offsets.
                NOTE(review): key semantics and array layout are defined
                by the concrete pipeline and are not visible here.
            pointing_table: Pointing table to export.
            visibility: Visibility data associated with the results.
        """