Source code for low_comm_tools.ms_utils

from __future__ import annotations

from pathlib import Path
from typing import Any, NamedTuple, TypeAlias, cast

import astropy.units as u
import numpy as np
import numpy.typing as npt
from astropy.coordinates import AltAz, EarthLocation, SkyCoord
from astropy.time import Time
from casacore.tables import table, taql

from low_comm_tools.log_config import logger

[docs] IntArray: TypeAlias = npt.NDArray[np.int64]
[docs] FloatArray: TypeAlias = npt.NDArray[np.floating[Any]]
[docs] ComplexArray: TypeAlias = npt.NDArray[np.complexfloating[Any, Any]]
[docs] def _as_path(p: str | Path) -> Path: return p if isinstance(p, Path) else Path(p)
[docs] def get_weight_from_ms( ms_path: Path | str, spectrum: bool = True, ) -> FloatArray: ms_path = _as_path(ms_path) col_name = "WEIGHT_SPECTRUM" if spectrum else "WEIGHT" with table((ms_path).as_posix(), ack=False) as tab: return cast(FloatArray, tab.getcol(col_name))
[docs] def get_interval_from_ms(ms_path: Path | str) -> u.Quantity[u.s]: ms_path = _as_path(ms_path) with table(ms_path.as_posix(), ack=False) as tab: return tab.getcol("INTERVAL") * u.s
[docs] def get_scan_numbers_from_ms(ms_path: Path | str) -> IntArray: ms_path = _as_path(ms_path) with table(ms_path.as_posix(), ack=False) as tab: return np.unique(tab.getcol("SCAN_NUMBER"))
[docs] def get_baseline_length( ms_path: Path | str, ant_1: int, ant_2: int, ) -> u.Quantity[u.m]: """Get the length of a baseline. Args: ms_path (Path | str): Path to MS ant_1 (int): Antenna 1 index ant_2 (int): Antenna 2 index Returns: u.Quantity[u.m]: Baseline length in metres """ ms_path = _as_path(ms_path) with table((ms_path / "ANTENNA").as_posix(), ack=False) as ant_tab: ant_pos = ant_tab.getcol("POSITION") return np.linalg.norm(ant_pos[ant_2] - ant_pos[ant_1]) * u.m
[docs] def get_telescope_name_from_ms(ms_path: Path | str) -> str: ms_path = _as_path(ms_path) with table((ms_path / "OBSERVATION").as_posix(), ack=False) as tab: return str(tab.getcol("TELESCOPE_NAME")[0])
[docs] def rename_telescope( ms_path: Path, telescope_name: str = "SKA-LOW", ) -> Path: """Rename TELESCOPE column Args: ms_path (Path): Path to MS telescope_name (str, optional): New telescope name. Defaults to "SKA-LOW". Returns: Path: Updated MS path """ logger.info(f"Setting TELESCOPE_NAME to '{telescope_name}' in {ms_path}") with table((ms_path).as_posix(), readonly=False, ack=True) as tab: _ = tab # Keep linters happy taql(f"UPDATE $tab::OBSERVATION SET TELESCOPE_NAME='{telescope_name}'") return ms_path
[docs] def update_ms_with_subtable( ms_path: Path, subtable_path: Path, dry_run: bool = False, ) -> Path: """Add subtable to metadata Args: ms_path (Path): Path to MS subtable_path (Path): Path to subtable dry_run (bool, optional): Don't apply update. Defaults to False. Returns: Path: Updated MS """ if dry_run: logger.info(f"Would make {subtable_path.name} a subtable of {ms_path}") return ms_path with ( table(ms_path.as_posix(), readonly=False, ack=False) as tab, table(subtable_path.as_posix(), ack=False) as sub_tab, ): tab.putkeyword(subtable_path.name, sub_tab, makesubrecord=True) return ms_path
[docs] def get_coord_from_ms( ms_path: str | Path, field_index: int = 0, ) -> SkyCoord: ms_path = _as_path(ms_path) with table((ms_path / "FIELD").as_posix(), ack=False) as tab: field_row = tab.getcell("PHASE_DIR", field_index).flatten() return SkyCoord( ra=field_row[0], dec=field_row[1], unit=u.rad, )
[docs] def get_time_from_table(tab: table, use_centroid: bool = True) -> Time: """Get time from OPEN casacore tyable Args: tab (table): OPEN table Returns: Time: Times """ time_column = "TIME_CENTROID" if use_centroid else "TIME" times_mjds = np.unique(tab.getcol(time_column)[:].flatten()) * u.s return Time(times_mjds, format="mjd", scale="utc")
[docs] def get_time_from_ms( ms_path: str | Path, use_centroid: bool = True, ) -> Time: ms_path = _as_path(ms_path) with table((ms_path).as_posix(), ack=False) as tab: return get_time_from_table(tab, use_centroid=use_centroid)
[docs] def get_freq_from_ms( ms_path: str | Path, ) -> u.Quantity[u.Hz]: ms_path = _as_path(ms_path) with table((ms_path / "SPECTRAL_WINDOW").as_posix(), ack=False) as tab: return tab.getcol("CHAN_FREQ").flatten() * u.Hz
[docs] def get_location_from_ms( ms_path: str | Path, ) -> EarthLocation: ms_path = _as_path(ms_path) with table((ms_path / "ANTENNA").as_posix(), ack=False) as tab: location_array = tab.getcol("POSITION") * u.m return EarthLocation.from_geocentric( x=location_array[:, 0], y=location_array[:, 1], z=location_array[:, 2], )
[docs] def get_altaz_from_ms( ms_path: str | Path, field_index: int = 0, ) -> SkyCoord: ms_path = _as_path(ms_path) coord = get_coord_from_ms(ms_path, field_index=field_index) time = get_time_from_ms(ms_path) location = get_location_from_ms(ms_path) return coord.transform_to(AltAz(obstime=time, location=location))
[docs] def _fix_field_name(field_name: str) -> str: field_name = "_".join(field_name.split()) if field_name.startswith("field_"): field_name = field_name.replace("field_", "") if field_name.endswith("_0"): field_name = field_name.removesuffix("_0") return field_name
[docs] def get_field_name_from_ms( ms_path: str | Path, field_index: int = 0, ) -> str: ms_path = _as_path(ms_path) with table((ms_path / "FIELD").as_posix(), ack=False) as tab: return _fix_field_name(str(tab.getcol("NAME")[field_index]))
[docs] def get_columns_from_ms( ms_path: str | Path, ) -> list[str]: ms_path = _as_path(ms_path) with table(ms_path.as_posix(), ack=False) as tab: return list(tab.colnames())
[docs] def get_antenna_names_from_ms( ms_path: str | Path, ) -> list[str]: ms_path = _as_path(ms_path) with table((ms_path / "ANTENNA").as_posix(), ack=False) as tab: return list(tab.getcol("NAME"))
[docs] def get_antenna_index_from_name( ms_path: str | Path, name: str, ) -> int: return get_antenna_names_from_ms(ms_path).index(name)
[docs] def get_antenna_name_from_index( ms_path: str | Path, idx: int, ) -> str: return get_antenna_names_from_ms(ms_path)[idx]
[docs] class Antennas(NamedTuple):
[docs] ant_1s: npt.NDArray[np.integer[Any]]
[docs] ant_2s: npt.NDArray[np.integer[Any]]
[docs] def get_antennas_from_ms( ms_path: str | Path, ) -> Antennas: ms_path = _as_path(ms_path) with table((ms_path).as_posix(), ack=False) as tab: ant_1s = np.unique(tab.getcol("ANTENNA1")) ant_2s = np.unique(tab.getcol("ANTENNA2")) return Antennas(ant_1s=ant_1s, ant_2s=ant_2s)