Source code for ska_sdp_wflow_pointing_offset.pointing_pipeline_standalone

"""
Pointing offset calibration pipeline - standalone version.
"""

import glob
import logging
import os
from pathlib import Path
from typing import Optional

from numpy.typing import NDArray
from pydantic import Field, field_validator
from ska_sdp_datamodels.calibration import PointingTable
from ska_sdp_datamodels.visibility import Visibility

from ska_sdp_wflow_pointing_offset import export_pointing_offset_data
from ska_sdp_wflow_pointing_offset.export_data import (
    export_pointingtable_to_hdf5,
)
from ska_sdp_wflow_pointing_offset.pipeline_base import PipelineBase

# Module-level logger used by the export steps below. It is registered under
# a fixed name ("ska-sdp-pointing-offset") rather than this module's __name__.
LOG = logging.getLogger("ska-sdp-pointing-offset")


class StandAlonePipeline(PipelineBase):
    """
    The stand-alone version of the pointing calibration pipeline,
    i.e. to be run outside of the SDP environment.

    Input MeasurementSets are discovered as ``*.ms`` entries directly inside
    ``msdir``; results are written next to them unless ``results_dir`` is
    given.
    """

    msdir: str = Field(
        description="Base directory containing the input measurement sets",
    )
    results_dir: Optional[str] = Field(
        default=None,
        description="Directory where the results will be stored.",
    )

    @field_validator("msdir")
    @classmethod
    def msdir_must_exist_and_be_absolute(cls, v: str) -> str:
        """
        Validate ``msdir`` and normalise it to an absolute path.

        :param v: the directory path supplied for ``msdir``
        :return: the resolved (absolute) form of ``v``
        :raises ValueError: if ``v`` is not an existing directory
        """
        path = Path(v)
        if not path.is_dir():
            raise ValueError(f"msdir={v!s} must be an existing directory")
        return str(path.resolve())

    @property
    def ms_files(self) -> list[str]:
        """
        Sorted list of paths matching ``<msdir>/*.ms``.

        :raises FileNotFoundError: if nothing in ``msdir`` matches
        """
        pattern = os.path.join(self.msdir, "*.ms")
        ms_files = sorted(glob.glob(pattern))

        # pylint: disable=duplicate-code
        # NOTE: this should be deleted, and the edge case handled by the
        # processing code
        if not ms_files:
            raise FileNotFoundError(
                f"Given directory ({self.msdir}) doesn't contain "
                "the right (or any) MeasurementSets"
            )
        return ms_files

    @property
    def common_prefix(self) -> str:
        """
        Path prefix to which output file names are appended.

        The file-name part is the character-wise common prefix of the
        MeasurementSet paths, reduced to its last path component, with
        trailing ``-`` removed and a trailing ``_`` ensured; ``"test_"`` is
        used when nothing is left. The directory part is ``results_dir``
        when set, otherwise ``msdir``.

        :raises FileNotFoundError: propagated from :attr:`ms_files`
        """
        # NOTE: The output naming convention could be a lot better, and that
        # would also simplify the logic below.

        # prefix is not the whole path just the file prefix
        prefix = os.path.commonprefix(self.ms_files)
        prefix = os.path.split(prefix)[-1].rstrip("-")

        if not prefix:
            prefix = "test_"

        if not prefix.endswith("_"):
            prefix += "_"

        if self.results_dir:
            return os.path.join(self.results_dir, prefix)
        return os.path.join(self.msdir, prefix)

    def export_data(
        self,
        output_offsets: dict[str, NDArray],
        pointing_table: PointingTable,
        visibility: Visibility,
    ) -> None:
        """
        Perform a set of data export operations:
            1. write data into .txt file
            2. write data into .hdf5 file

        Both files share the prefix given by :attr:`common_prefix` and are
        named ``<prefix>pointing_offsets.txt`` and
        ``<prefix>pointing_offsets.hdf5``.

        :param output_offsets: passed to ``export_pointing_offset_data``
        :param pointing_table: passed to ``export_pointingtable_to_hdf5``
        :param visibility: passed to ``export_pointing_offset_data``
        """
        # pylint: disable=duplicate-code
        # Compute the prefix once and derive both file names from it.
        # Deriving the HDF5 name via str.replace(".txt", ".hdf5") on the
        # full path would also rewrite any ".txt" occurring in the
        # directory or in the MeasurementSet-derived prefix.
        prefix = self.common_prefix

        LOG.info("Writing fitted parameters and computed offsets to file...")
        results_file = prefix + "pointing_offsets.txt"
        export_pointing_offset_data(results_file, output_offsets, visibility)
        LOG.info(
            "Fitted parameters and computed offsets written to %s",
            results_file,
        )

        results_file_hdf = prefix + "pointing_offsets.hdf5"
        LOG.info("Write PointingTable into HDF5 file %s", results_file_hdf)
        export_pointingtable_to_hdf5(pointing_table, results_file_hdf)