from __future__ import annotations
from pathlib import Path
from typing import Any, NamedTuple, TypeAlias, cast
import astropy.units as u
import numpy as np
import numpy.typing as npt
from astropy.coordinates import AltAz, EarthLocation, SkyCoord
from astropy.time import Time
from casacore.tables import table, taql
from low_comm_tools.log_config import logger
[docs]
IntArray: TypeAlias = npt.NDArray[np.int64]
[docs]
FloatArray: TypeAlias = npt.NDArray[np.floating[Any]]
[docs]
ComplexArray: TypeAlias = npt.NDArray[np.complexfloating[Any, Any]]
[docs]
def _as_path(p: str | Path) -> Path:
return p if isinstance(p, Path) else Path(p)
[docs]
def get_weight_from_ms(
ms_path: Path | str,
spectrum: bool = True,
) -> FloatArray:
ms_path = _as_path(ms_path)
col_name = "WEIGHT_SPECTRUM" if spectrum else "WEIGHT"
with table((ms_path).as_posix(), ack=False) as tab:
return cast(FloatArray, tab.getcol(col_name))
[docs]
def get_interval_from_ms(ms_path: Path | str) -> u.Quantity[u.s]:
ms_path = _as_path(ms_path)
with table(ms_path.as_posix(), ack=False) as tab:
return tab.getcol("INTERVAL") * u.s
[docs]
def get_scan_numbers_from_ms(ms_path: Path | str) -> IntArray:
ms_path = _as_path(ms_path)
with table(ms_path.as_posix(), ack=False) as tab:
return np.unique(tab.getcol("SCAN_NUMBER"))
[docs]
def get_baseline_length(
ms_path: Path | str,
ant_1: int,
ant_2: int,
) -> u.Quantity[u.m]:
"""Get the length of a baseline.
Args:
ms_path (Path | str): Path to MS
ant_1 (int): Antenna 1 index
ant_2 (int): Antenna 2 index
Returns:
u.Quantity[u.m]: Baseline length in metres
"""
ms_path = _as_path(ms_path)
with table((ms_path / "ANTENNA").as_posix(), ack=False) as ant_tab:
ant_pos = ant_tab.getcol("POSITION")
return np.linalg.norm(ant_pos[ant_2] - ant_pos[ant_1]) * u.m
[docs]
def get_telescope_name_from_ms(ms_path: Path | str) -> str:
ms_path = _as_path(ms_path)
with table((ms_path / "OBSERVATION").as_posix(), ack=False) as tab:
return str(tab.getcol("TELESCOPE_NAME")[0])
[docs]
def rename_telescope(
ms_path: Path,
telescope_name: str = "SKA-LOW",
) -> Path:
"""Rename TELESCOPE column
Args:
ms_path (Path): Path to MS
telescope_name (str, optional): New telescope name. Defaults to "SKA-LOW".
Returns:
Path: Updated MS path
"""
logger.info(f"Setting TELESCOPE_NAME to '{telescope_name}' in {ms_path}")
with table((ms_path).as_posix(), readonly=False, ack=True) as tab:
_ = tab # Keep linters happy
taql(f"UPDATE $tab::OBSERVATION SET TELESCOPE_NAME='{telescope_name}'")
return ms_path
[docs]
def update_ms_with_subtable(
ms_path: Path,
subtable_path: Path,
dry_run: bool = False,
) -> Path:
"""Add subtable to metadata
Args:
ms_path (Path): Path to MS
subtable_path (Path): Path to subtable
dry_run (bool, optional): Don't apply update. Defaults to False.
Returns:
Path: Updated MS
"""
if dry_run:
logger.info(f"Would make {subtable_path.name} a subtable of {ms_path}")
return ms_path
with (
table(ms_path.as_posix(), readonly=False, ack=False) as tab,
table(subtable_path.as_posix(), ack=False) as sub_tab,
):
tab.putkeyword(subtable_path.name, sub_tab, makesubrecord=True)
return ms_path
[docs]
def get_coord_from_ms(
ms_path: str | Path,
field_index: int = 0,
) -> SkyCoord:
ms_path = _as_path(ms_path)
with table((ms_path / "FIELD").as_posix(), ack=False) as tab:
field_row = tab.getcell("PHASE_DIR", field_index).flatten()
return SkyCoord(
ra=field_row[0],
dec=field_row[1],
unit=u.rad,
)
[docs]
def get_time_from_table(tab: table, use_centroid: bool = True) -> Time:
"""Get time from OPEN casacore tyable
Args:
tab (table): OPEN table
Returns:
Time: Times
"""
time_column = "TIME_CENTROID" if use_centroid else "TIME"
times_mjds = np.unique(tab.getcol(time_column)[:].flatten()) * u.s
return Time(times_mjds, format="mjd", scale="utc")
[docs]
def get_time_from_ms(
ms_path: str | Path,
use_centroid: bool = True,
) -> Time:
ms_path = _as_path(ms_path)
with table((ms_path).as_posix(), ack=False) as tab:
return get_time_from_table(tab, use_centroid=use_centroid)
[docs]
def get_freq_from_ms(
ms_path: str | Path,
) -> u.Quantity[u.Hz]:
ms_path = _as_path(ms_path)
with table((ms_path / "SPECTRAL_WINDOW").as_posix(), ack=False) as tab:
return tab.getcol("CHAN_FREQ").flatten() * u.Hz
[docs]
def get_location_from_ms(
ms_path: str | Path,
) -> EarthLocation:
ms_path = _as_path(ms_path)
with table((ms_path / "ANTENNA").as_posix(), ack=False) as tab:
location_array = tab.getcol("POSITION") * u.m
return EarthLocation.from_geocentric(
x=location_array[:, 0],
y=location_array[:, 1],
z=location_array[:, 2],
)
[docs]
def get_altaz_from_ms(
ms_path: str | Path,
field_index: int = 0,
) -> SkyCoord:
ms_path = _as_path(ms_path)
coord = get_coord_from_ms(ms_path, field_index=field_index)
time = get_time_from_ms(ms_path)
location = get_location_from_ms(ms_path)
return coord.transform_to(AltAz(obstime=time, location=location))
[docs]
def _fix_field_name(field_name: str) -> str:
field_name = "_".join(field_name.split())
if field_name.startswith("field_"):
field_name = field_name.replace("field_", "")
if field_name.endswith("_0"):
field_name = field_name.removesuffix("_0")
return field_name
[docs]
def get_field_name_from_ms(
ms_path: str | Path,
field_index: int = 0,
) -> str:
ms_path = _as_path(ms_path)
with table((ms_path / "FIELD").as_posix(), ack=False) as tab:
return _fix_field_name(str(tab.getcol("NAME")[field_index]))
[docs]
def get_columns_from_ms(
ms_path: str | Path,
) -> list[str]:
ms_path = _as_path(ms_path)
with table(ms_path.as_posix(), ack=False) as tab:
return list(tab.colnames())
[docs]
def get_antenna_names_from_ms(
ms_path: str | Path,
) -> list[str]:
ms_path = _as_path(ms_path)
with table((ms_path / "ANTENNA").as_posix(), ack=False) as tab:
return list(tab.getcol("NAME"))
[docs]
def get_antenna_index_from_name(
ms_path: str | Path,
name: str,
) -> int:
return get_antenna_names_from_ms(ms_path).index(name)
[docs]
def get_antenna_name_from_index(
ms_path: str | Path,
idx: int,
) -> str:
return get_antenna_names_from_ms(ms_path)[idx]
[docs]
class Antennas(NamedTuple):
[docs]
ant_1s: npt.NDArray[np.integer[Any]]
[docs]
ant_2s: npt.NDArray[np.integer[Any]]
[docs]
def get_antennas_from_ms(
ms_path: str | Path,
) -> Antennas:
ms_path = _as_path(ms_path)
with table((ms_path).as_posix(), ack=False) as tab:
ant_1s = np.unique(tab.getcol("ANTENNA1"))
ant_2s = np.unique(tab.getcol("ANTENNA2"))
return Antennas(ant_1s=ant_1s, ant_2s=ant_2s)