import logging
from dataclasses import dataclass
from typing import List, Optional, Sequence
import numpy as np
from astropy import units
from astropy.coordinates import Angle
from astropy.time import Time
from realtime.receive.core import baseline_utils, uvw_utils
from realtime.receive.core.antenna import Antenna
from realtime.receive.core.scan import PhaseDirection
logger = logging.getLogger(__name__)
@dataclass(frozen=False)
class State:
"""Contains metadata about a UVW calculation"""
time: Time
phase_direction: PhaseDirection
swap_baselines: bool = False
position_frame: str = "ITRF"
direction_frame: str = "ICRS"
time_tolerance: units.Quantity = 0.0 * units.s
direction_tolerance: units.Quantity = 0.0 * units.rad
def __eq__(self, other):
"""
Over-ridden equality for the state to introduce the concept of tolerance.
How long is the UVW cache valid for. Could be a real performance improvement
if there are a lot of baselines and/or UVW per channel.
"""
time_diff = abs(self.time - other.time)
ra_diff = self.phase_direction.ra - other.phase_direction.ra
dec_diff = self.phase_direction.dec - other.phase_direction.dec
if abs(time_diff) > self.time_tolerance:
return False
elif not ra_diff.is_within_bounds(self.direction_tolerance):
return False
elif not dec_diff.is_within_bounds(self.direction_tolerance):
return False
elif self.direction_frame != other.direction_frame:
return False
elif self.position_frame != other.position_frame:
return False
elif self.swap_baselines != other.swap_baselines:
return False
else:
return True
@dataclass()
class UVWCacheEntry:
"""A UVW calculation that matches the given state"""
state: State
cache: np.ndarray
[docs]
class UVWEngine:
"""
This is a container class that holds a cache of UVW objects.
On instantiation, it will take the minimal amount of information
required to form the UVWObject
THis is the Antennas, the Phase direction and the Time.
It will then create an empty UVWObject of the correct dimension.
After construction the interface is essentially just an update
to the time - or the direction.
The other states can be changed. But there is not an external interface
to that.
I reasoned that if you want to change a reference frame or the antennas then
you should probably create a new Engine. steve-ord
"""
[docs]
def num_stations(self):
"""
Return the number of stations
"""
return len(self._antennas)
[docs]
def num_baselines(self):
"""
Return the number of baselines
"""
return len(self._baselines)
[docs]
def baseline_indices(self):
"""
Return the baseline indices
"""
return self._baselines
def __init__(
self,
antennas: Sequence[Antenna],
baselines: Optional[Sequence[tuple]] = None,
swap_baselines=False,
include_autos=True,
lower_triangular=False,
position_frame="ITRF",
direction_frame="J2000",
):
"""
The default constructor for the UVWEngine container class:
:param antennas: Tuple (ordered List) of realtime.receive.core.base_tm.Antenna objects.
:param swap_baselines: Bool - should we change the antenna order in the baseline
:param position_frame: The physical frame of reference for the antenna positions
:param direction_frame: The frame for the direction
"""
self._library: List[UVWCacheEntry] = []
self._antennas = antennas
self._baselines = baselines
# These are set an instantiation but can be overridden
self._swap_baselines = swap_baselines
self._position_frame = position_frame
self._direction_frame = direction_frame
if self._direction_frame != "J2000":
if self._direction_frame != "j2000":
logger.warning(
"Position frame is not J2000 - "
"but all baselines are being generated in J2000 frame"
)
self._direction_frame = "J2000"
if self._baselines is None:
self._baselines = baseline_utils.baseline_indices(
num_stations=self.num_stations(),
autocorr=include_autos,
lower_triangular=lower_triangular,
)
baseline_utils.validate_antenna_and_baseline_counts(
self.num_stations(), self.num_baselines()
)
[docs]
def get_uvw(
self,
time: Time,
phase_direction: PhaseDirection,
direction_frame=None,
position_frame=None,
swap_baselines=None,
) -> Optional[np.ndarray]:
"""
This method given a time it updates the individual antenna UVW,
THen updates the baseline UVW. It first checks the current state machine
against the proposed state. It may not have to update the UVW at all.
:param time: The time as an astropy.Time object
:param phase_direction: The direction as a realtime.receiver.core.PhaseDirection object
:param position_frame: the frame of the antenna positions
:param swap_baselines: are the baselines defined as ant1-ant2 or ant2-ant1
:param direction_frame: What frame of reference is the look direction in. Note
We do not compare equivelance across frames. So if either the direction frame
or the direction itself change it is considered a state change - even if the resulting
UVW would be the same.
"""
# Default to current state
if time is None:
return None
if phase_direction is None:
return None
if swap_baselines is None:
swap_baselines = self._swap_baselines
if direction_frame is None:
direction_frame = self._direction_frame
if position_frame is None:
position_frame = self._position_frame
requested_state = State(
time=time,
phase_direction=phase_direction,
direction_frame=direction_frame,
position_frame=position_frame,
swap_baselines=swap_baselines,
)
_entry = self._get_matching_entry_from_library(requested_state)
if _entry is not None:
return _entry.cache
else:
# create a new entry and update the current cache
antenna_uvws = self._update_antenna_uvws(requested_state)
baseline_uvws = self._update_baseline_uvws(antenna_uvws, requested_state)
_entry = UVWCacheEntry(state=requested_state, cache=baseline_uvws)
if self.num_entries() > 2:
self._library.pop()
self._library.insert(0, _entry)
return _entry.cache
def _update_antenna_uvws(self, requested_state: State) -> List[np.ndarray]:
"""
Internal class to manage the updating of the UVW of the antenna. This is essentially the
baseline between the antenna location and the centre of the earth.
"""
uvw_list: List[np.ndarray] = []
for index in range(self.num_stations()):
antenna = self._antennas[index]
pos = np.array(antenna.pos)
uvw = self.get_antenna_uvw(
pos,
requested_state.time,
requested_state.phase_direction.ra,
requested_state.phase_direction.dec,
swap_baselines=False, # always set this to false as the swap is done on the baseline
position_frame=requested_state.position_frame,
)
uvw_list.append(uvw)
return uvw_list
def _update_baseline_uvws(self, uvws: list, requested_state: State) -> np.ndarray:
"""
Simply subtracts one antenna UVW from the other. The swap_baselines state
determines whether the baseline is ant1-ant2 or ant2-ant1
"""
swap = 1.0
if requested_state.swap_baselines:
swap = -1.0
baselines_a1 = np.array(tuple(index[0] for index in self.baseline_indices()), dtype=int)
baselines_a2 = np.array(tuple(index[1] for index in self.baseline_indices()), dtype=int)
uvws = np.array(uvws)
uvw_1 = uvws[baselines_a1]
uvw_2 = uvws[baselines_a2]
baseline_uvws = (uvw_1 - uvw_2) * swap
return baseline_uvws
def _get_matching_entry_from_library(self, test_state: State) -> Optional[UVWCacheEntry]:
for entry in self._library:
if entry.state == test_state:
# pull the state and cache out of the library
return entry
return None
[docs]
@staticmethod
def get_antenna_uvw(
antenna_location: np.ndarray,
epoch: Time,
ra: Angle,
dec: Angle,
position_frame="itrf",
epoch_frame="J2000",
swap_baselines=False,
) -> np.ndarray:
"""
Return the geocentric uvw vector for the the given position at the given epoch.
Note: This actually just calls get_uvw_J2000 with the geocentre as the other antenna and this antenna as the reference
position.
Note: This is the per antenna calculation that you probably want
:param list antenna_location: position of the antenna (vector geocentric XYZ in m)
:param astropy.Time epoch: Time for which to calculate the UVW
:param astropy.Angle ra: Right Ascension (J2000)
:param astropy.Angle dec: Declination (J2000)
:param str position_frame: The frame of the input positions
(generally WGS84 or ITRF) we are using casacore for
this and these are the two frames supported
:param str epoch_frame: Whether you want a catalog (J2000) frome or epoch of date (default: 'J2000')
:param Bool swap_baselines: (xyposB - xyposA) is assumed - if the reverse is required set this to True
:returns: The geocentric UVW baseline
:rtype: numpy.ndarray [u,v,w]
"""
xyposB = np.array([0, 0, 0])
if epoch_frame in ("J2000", "j2000"):
uvw = uvw_utils.get_uvw_J2000(
antenna_location,
xyposB,
antenna_location,
epoch,
ra,
dec,
position_frame,
swap_baselines,
)
else:
uvw = uvw_utils.get_uvw(antenna_location, xyposB, epoch, ra, dec, swap_baselines)
return uvw
[docs]
def num_entries(self):
"""Number of UVW calculations that are stored"""
return len(self._library)