Source code for realtime.receive.core.uvw_engine

import logging
from dataclasses import dataclass
from typing import List, Optional, Sequence

import numpy as np
from astropy import units
from astropy.coordinates import Angle
from astropy.time import Time

from realtime.receive.core import baseline_utils, uvw_utils
from realtime.receive.core.antenna import Antenna
from realtime.receive.core.scan import PhaseDirection

logger = logging.getLogger(__name__)


@dataclass(frozen=False)
class State:
    """Contains metadata about a UVW calculation"""

    time: Time
    phase_direction: PhaseDirection
    swap_baselines: bool = False
    position_frame: str = "ITRF"
    direction_frame: str = "ICRS"
    time_tolerance: units.Quantity = 0.0 * units.s
    direction_tolerance: units.Quantity = 0.0 * units.rad

    def __eq__(self, other):
        """
        Over-ridden equality for the state to introduce the concept of tolerance.
        How long is the UVW cache valid for. Could be a real performance improvement
        if there are a lot of baselines and/or UVW per channel.
        """
        time_diff = abs(self.time - other.time)
        ra_diff = self.phase_direction.ra - other.phase_direction.ra
        dec_diff = self.phase_direction.dec - other.phase_direction.dec

        if abs(time_diff) > self.time_tolerance:
            return False

        elif not ra_diff.is_within_bounds(self.direction_tolerance):
            return False

        elif not dec_diff.is_within_bounds(self.direction_tolerance):
            return False

        elif self.direction_frame != other.direction_frame:
            return False

        elif self.position_frame != other.position_frame:
            return False

        elif self.swap_baselines != other.swap_baselines:
            return False

        else:
            return True


@dataclass()
class UVWCacheEntry:
    """A UVW calculation that matches the given state"""

    state: State
    cache: np.ndarray


[docs] class UVWEngine: """ This is a container class that holds a cache of UVW objects. On instantiation, it will take the minimal amount of information required to form the UVWObject THis is the Antennas, the Phase direction and the Time. It will then create an empty UVWObject of the correct dimension. After construction the interface is essentially just an update to the time - or the direction. The other states can be changed. But there is not an external interface to that. I reasoned that if you want to change a reference frame or the antennas then you should probably create a new Engine. steve-ord """
[docs] def num_stations(self): """ Return the number of stations """ return len(self._antennas)
[docs] def num_baselines(self): """ Return the number of baselines """ return len(self._baselines)
[docs] def baseline_indices(self): """ Return the baseline indices """ return self._baselines
def __init__( self, antennas: Sequence[Antenna], baselines: Optional[Sequence[tuple]] = None, swap_baselines=False, include_autos=True, lower_triangular=False, position_frame="ITRF", direction_frame="J2000", ): """ The default constructor for the UVWEngine container class: :param antennas: Tuple (ordered List) of realtime.receive.core.base_tm.Antenna objects. :param swap_baselines: Bool - should we change the antenna order in the baseline :param position_frame: The physical frame of reference for the antenna positions :param direction_frame: The frame for the direction """ self._library: List[UVWCacheEntry] = [] self._antennas = antennas self._baselines = baselines # These are set an instantiation but can be overridden self._swap_baselines = swap_baselines self._position_frame = position_frame self._direction_frame = direction_frame if self._direction_frame != "J2000": if self._direction_frame != "j2000": logger.warning( "Position frame is not J2000 - " "but all baselines are being generated in J2000 frame" ) self._direction_frame = "J2000" if self._baselines is None: self._baselines = baseline_utils.baseline_indices( num_stations=self.num_stations(), autocorr=include_autos, lower_triangular=lower_triangular, ) baseline_utils.validate_antenna_and_baseline_counts( self.num_stations(), self.num_baselines() )
[docs] def get_uvw( self, time: Time, phase_direction: PhaseDirection, direction_frame=None, position_frame=None, swap_baselines=None, ) -> Optional[np.ndarray]: """ This method given a time it updates the individual antenna UVW, THen updates the baseline UVW. It first checks the current state machine against the proposed state. It may not have to update the UVW at all. :param time: The time as an astropy.Time object :param phase_direction: The direction as a realtime.receiver.core.PhaseDirection object :param position_frame: the frame of the antenna positions :param swap_baselines: are the baselines defined as ant1-ant2 or ant2-ant1 :param direction_frame: What frame of reference is the look direction in. Note We do not compare equivelance across frames. So if either the direction frame or the direction itself change it is considered a state change - even if the resulting UVW would be the same. """ # Default to current state if time is None: return None if phase_direction is None: return None if swap_baselines is None: swap_baselines = self._swap_baselines if direction_frame is None: direction_frame = self._direction_frame if position_frame is None: position_frame = self._position_frame requested_state = State( time=time, phase_direction=phase_direction, direction_frame=direction_frame, position_frame=position_frame, swap_baselines=swap_baselines, ) _entry = self._get_matching_entry_from_library(requested_state) if _entry is not None: return _entry.cache else: # create a new entry and update the current cache antenna_uvws = self._update_antenna_uvws(requested_state) baseline_uvws = self._update_baseline_uvws(antenna_uvws, requested_state) _entry = UVWCacheEntry(state=requested_state, cache=baseline_uvws) if self.num_entries() > 2: self._library.pop() self._library.insert(0, _entry) return _entry.cache
def _update_antenna_uvws(self, requested_state: State) -> List[np.ndarray]: """ Internal class to manage the updating of the UVW of the antenna. This is essentially the baseline between the antenna location and the centre of the earth. """ uvw_list: List[np.ndarray] = [] for index in range(self.num_stations()): antenna = self._antennas[index] pos = np.array(antenna.pos) uvw = self.get_antenna_uvw( pos, requested_state.time, requested_state.phase_direction.ra, requested_state.phase_direction.dec, swap_baselines=False, # always set this to false as the swap is done on the baseline position_frame=requested_state.position_frame, ) uvw_list.append(uvw) return uvw_list def _update_baseline_uvws(self, uvws: list, requested_state: State) -> np.ndarray: """ Simply subtracts one antenna UVW from the other. The swap_baselines state determines whether the baseline is ant1-ant2 or ant2-ant1 """ swap = 1.0 if requested_state.swap_baselines: swap = -1.0 baselines_a1 = np.array(tuple(index[0] for index in self.baseline_indices()), dtype=int) baselines_a2 = np.array(tuple(index[1] for index in self.baseline_indices()), dtype=int) uvws = np.array(uvws) uvw_1 = uvws[baselines_a1] uvw_2 = uvws[baselines_a2] baseline_uvws = (uvw_1 - uvw_2) * swap return baseline_uvws def _get_matching_entry_from_library(self, test_state: State) -> Optional[UVWCacheEntry]: for entry in self._library: if entry.state == test_state: # pull the state and cache out of the library return entry return None
[docs] @staticmethod def get_antenna_uvw( antenna_location: np.ndarray, epoch: Time, ra: Angle, dec: Angle, position_frame="itrf", epoch_frame="J2000", swap_baselines=False, ) -> np.ndarray: """ Return the geocentric uvw vector for the the given position at the given epoch. Note: This actually just calls get_uvw_J2000 with the geocentre as the other antenna and this antenna as the reference position. Note: This is the per antenna calculation that you probably want :param list antenna_location: position of the antenna (vector geocentric XYZ in m) :param astropy.Time epoch: Time for which to calculate the UVW :param astropy.Angle ra: Right Ascension (J2000) :param astropy.Angle dec: Declination (J2000) :param str position_frame: The frame of the input positions (generally WGS84 or ITRF) we are using casacore for this and these are the two frames supported :param str epoch_frame: Whether you want a catalog (J2000) frome or epoch of date (default: 'J2000') :param Bool swap_baselines: (xyposB - xyposA) is assumed - if the reverse is required set this to True :returns: The geocentric UVW baseline :rtype: numpy.ndarray [u,v,w] """ xyposB = np.array([0, 0, 0]) if epoch_frame in ("J2000", "j2000"): uvw = uvw_utils.get_uvw_J2000( antenna_location, xyposB, antenna_location, epoch, ra, dec, position_frame, swap_baselines, ) else: uvw = uvw_utils.get_uvw(antenna_location, xyposB, epoch, ra, dec, swap_baselines) return uvw
[docs] def num_entries(self): """Number of UVW calculations that are stored""" return len(self._library)