"""
Pointing offset calibration pipeline - standalone version.
"""
import glob
import logging
import os
from pathlib import Path
from typing import Optional
from numpy.typing import NDArray
from pydantic import Field, field_validator
from ska_sdp_datamodels.calibration import PointingTable
from ska_sdp_datamodels.visibility import Visibility
from ska_sdp_wflow_pointing_offset import export_pointing_offset_data
from ska_sdp_wflow_pointing_offset.export_data import (
export_pointingtable_to_hdf5,
)
from ska_sdp_wflow_pointing_offset.pipeline_base import PipelineBase
LOG = logging.getLogger("ska-sdp-pointing-offset")
[docs]
class StandAlonePipeline(PipelineBase):
"""
The stand-alone version of the pointing calibration pipeline, i.e. to be
run outside of the SDP environment.
"""
msdir: str = Field(
description="Base directory containing the input measurement sets",
)
results_dir: Optional[str] = Field(
default=None,
description="Directory where the results will be stored.",
)
[docs]
@field_validator("msdir")
@classmethod
def msdir_must_exist_and_be_absolute(cls, v: str) -> str:
"""Self-explanatory."""
path = Path(v)
if not path.is_dir():
raise ValueError(f"msdir={v!s} must be an existing directory")
return str(path.resolve())
@property
def ms_files(self) -> list[str]:
pattern = os.path.join(self.msdir, "*.ms")
ms_files = sorted(glob.glob(pattern))
# pylint: disable=duplicate-code
# NOTE: this should be deleted, and the edge case handled by the
# processing code
if not ms_files:
raise FileNotFoundError(
f"Given directory ({self.msdir}) doesn't contain "
"the right (or any) MeasurementSets"
)
return ms_files
@property
def common_prefix(self) -> str:
# NOTE: The output naming convention could be a lot better, and that
# would also simplify the logic below.
# prefix is not the whole path just the file prefix
prefix = os.path.commonprefix(self.ms_files)
prefix = os.path.split(prefix)[-1].rstrip("-")
if not prefix:
prefix = "test_"
if not prefix.endswith("_"):
prefix += "_"
if self.results_dir:
return os.path.join(self.results_dir, prefix)
return os.path.join(self.msdir, prefix)
[docs]
def export_data(
self,
output_offsets: dict[str, NDArray],
pointing_table: PointingTable,
visibility: Visibility,
) -> None:
"""
Perform a set of data export operations:
1. write data into .txt file
2. write data into .hdf5 file
"""
# pylint: disable=duplicate-code
LOG.info("Writing fitted parameters and computed offsets to file...")
results_file = self.common_prefix + "pointing_offsets.txt"
export_pointing_offset_data(results_file, output_offsets, visibility)
LOG.info(
"Fitted parameters and computed offsets written to %s",
results_file,
)
results_file_hdf = results_file.replace(".txt", ".hdf5")
LOG.info("Write PointingTable into HDF5 file %s", results_file_hdf)
export_pointingtable_to_hdf5(pointing_table, results_file_hdf)