Source code for low_comm_tools.stations
from __future__ import annotations
from importlib import resources
from typing import Literal
import astropy.units as u
import numpy as np
import polars as pl
from astropy.constants import c as speed_of_light
from astropy.table import Table
from low_comm_tools import data
from low_comm_tools.constants import FIBRE_REFRACTIVE_INDEX
from low_comm_tools.exceptions import StationError, UnitError
[docs]
AAType = Literal["AA0.5", "AA1", "AA2", "AA*"]
[docs]
def load_rotation(unit: Literal["deg", "rad"] = "rad") -> dict[str, float]:
with resources.as_file(resources.files(data)) as data_path:
station_data_path = data_path / "station_data.csv"
station_data = Table.read(station_data_path)["station_name", "rot_deg"]
match unit:
case "deg":
return dict(
zip(
station_data["station_name"].astype(str).tolist(),
station_data["rot_deg"].astype(float).tolist(),
strict=False,
)
)
case "rad":
return dict(
zip(
station_data["station_name"].astype(str).tolist(),
np.deg2rad(station_data["rot_deg"]).astype(float).tolist(),
strict=False,
)
)
case _:
msg = f"Unknown unit '{unit}'. Must be 'deg' or 'rad'" # type: ignore[unreachable]
raise UnitError(msg)
[docs]
def _parse_station_name(station_name: str) -> str:
return station_name.split(".", maxsplit=1)[0]
[docs]
def get_station_rotation(
station_name: str, unit: Literal["deg", "rad"] = "rad"
) -> float:
with resources.as_file(resources.files(data)) as data_path:
station_data_path = data_path / "station_data.csv"
station_df = pl.scan_csv(station_data_path)
rot_df = (
station_df.filter(pl.col("station_name") == _parse_station_name(station_name))
.select("rot_deg")
.collect()
)
if len(rot_df) == 0:
msg = f"Station '{station_name}' is not in layout data file."
raise StationError(msg)
rot_deg = float(rot_df.item())
if unit == "deg":
return rot_deg
if unit == "rad":
return float(np.deg2rad(rot_deg))
msg = f"Unknown unit '{unit}'. Must be 'deg' or 'rad'" # type: ignore[unreachable]
raise UnitError(msg)
[docs]
def get_station_cable_delay(station_name: str, unit: Literal["m", "s"] = "m") -> float:
with resources.as_file(resources.files(data)) as data_path:
station_data_path = data_path / "fibre_otdr_lengths.csv"
fibre_df = pl.scan_csv(station_data_path)
delay_df = (
fibre_df.filter(pl.col("station_name") == _parse_station_name(station_name))
.select("fibre_length_m")
.collect()
)
if len(delay_df) == 0:
msg = f"Station '{station_name}' is not in fibre data file."
raise StationError(msg)
delay_m = float(delay_df.item())
if unit == "m":
return delay_m
if unit == "s":
return float(
delay_m / (speed_of_light.to(u.m / u.s).value / FIBRE_REFRACTIVE_INDEX)
)
msg = f"Unknown unit '{unit}'. Must be 'm' or 's'" # type: ignore[unreachable]
raise UnitError(msg)