Source code for ska_low_mccs.station.point_station

#  -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
MCCS pointing calculation prototype.

This is originally from the AAVS code.
"""

from __future__ import annotations  # allow forward references in type hints

import logging
import warnings
from datetime import datetime
from typing import Any, Optional

import katpoint
import numpy as np
from astropy.constants import c  # pylint: disable=no-name-in-module
from astropy.coordinates import Angle, EarthLocation, get_sun
from astropy.time import TimeDelta
from astropy.time.core import Time
from astropy.utils.exceptions import AstropyWarning

warnings.simplefilter("ignore", category=AstropyWarning)

__author__ = "Alessio Magro"

antennas_per_tile = 16


# Astropy needs to cache https://datacenter.iers.org/data/9/finals2000A.all
# This takes time and should be pre-cached.
# This is being done by the OCI image, but leaving this here just in case.
print("Premptively caching EIRS data...")
Time.now().ut1  # pylint: disable=expression-not-assigned
print("Done.")


[docs] class AntennaInformation: """Class for holding a station's antenna information."""
[docs] def __init__(self: AntennaInformation) -> None: """ Initialize AntennaInformation object. By default it will have 256 elements but no displacements, &c. """ self.nof_elements = 256 self.xyz = np.zeros((256, 3))
[docs] def load_displacements_file(self: AntennaInformation, txtfile: str) -> None: """ Load antenna displacements from a text file. The file is formatted as per AAVS_loc_italia_190429.txt This 4 float-formatted columns separated by spaces. The column order is - Element x y TPM The header line is skipped, and the TPM column is ignored. x and y units are metres :param txtfile: displacements file """ aavs2 = np.loadtxt(txtfile, skiprows=1) element_ids = aavs2[:, 0].astype(int) self.nof_elements = aavs2.shape[0] self.xyz = np.append( aavs2[np.argsort(element_ids - 1), 1:3], np.zeros((self.nof_elements, 1)), axis=1, )
[docs] def load_displacements_arrays( self: AntennaInformation, xyz_positions: np.ndarray, element_ids: np.ndarray, ) -> None: """ Load antenna displacements from numpy arrays. :param xyz_positions: array of the x, y, z positions of the antennas :param element_ids: array of the element ids """ self.nof_elements = xyz_positions.shape[0] self.xyz = xyz_positions[np.argsort(element_ids.astype(int) - 1)]
[docs] def load_displacements_individual( self: AntennaInformation, x_pos: float, y_pos: float, z_pos: float, element_id: int, ) -> None: """ Load antenna displacements individually. :param x_pos: x offset from station :param y_pos: y offset from station :param z_pos: z offset from station :param element_id: Id of the element """ self.xyz[element_id - 1] = [x_pos, y_pos, z_pos]
[docs] class StationInformation: """Class for holding information about a station."""
[docs] def __init__(self: StationInformation) -> None: """ Initialize a new instance. The instance is initialise with no location data and with a default AntennaInformation object (which will have no element displacement data). """ self.latitude: Optional[float] = None self.longitude: Optional[float] = None self.ellipsoidalheight: Optional[float] = None self.antennas = AntennaInformation()
[docs] def load_displacements_file(self: StationInformation, antennafile: str) -> None: """ Proxy to the method in the associated AntennaInformation object. :param antennafile: displacements file """ self.antennas.load_displacements_file(antennafile)
[docs] def load_displacements_arrays( self: StationInformation, xyz_positions: np.ndarray, element_ids: np.ndarray, ) -> None: """ Proxy to the method in the associated AntennaInformation object. :param xyz_positions: array of the x, y, z positions of the antennas :param element_ids: array of the element ids """ self.antennas.load_displacements_arrays( xyz_positions, element_ids, )
[docs] def load_displacements_individual( self: StationInformation, x_pos: float, y_pos: float, z_pos: float, element_id: int, ) -> None: """ Load antenna displacements individually. :param x_pos: x offset from station :param y_pos: y offset from station :param z_pos: z offset from station :param element_id: Id of the element """ self.antennas.load_displacements_individual( x_pos, y_pos, z_pos, element_id, )
[docs] def set_location( self: StationInformation, latitude: float, longitude: float, ellipsoidalheight: float, ) -> None: """ Set the location data for this station. :param latitude: the latitude of the station (WGS84) :param longitude: the longitude of the station (WGS84) :param ellipsoidalheight: the ellipsoidal height of the station :raises ValueError: values out of range """ if not -90.0 <= latitude <= 90.0: raise ValueError( f"latitude out of range, latitude: {latitude}, range -90, 90" ) if not -180.0 <= longitude <= 180.0: raise ValueError( "longitude out of range, longitude: {longitude}, range -180, 180" ) # Probably this range could be narrowed if not -107.0 <= ellipsoidalheight <= 8870.5: raise ValueError( """ellipsoidalheight out of range, ellipsoidalheight: {ellipsoidalheight}, range -107, 8870.5""" ) self.latitude = latitude self.longitude = longitude self.ellipsoidalheight = ellipsoidalheight
# pylint: disable=too-many-instance-attributes
[docs] class Pointing: """Helper class for generating beamforming coefficients."""
[docs] def __init__(self: Pointing, station_info: StationInformation) -> None: """ Pointing class, generates delay and delay rates to be downloaded to TPMs. :param station_info: Basic information for station location and antenna displacements """ # Store arguments self.station = station_info # Get station location self._longitude = self.station.longitude self._latitude = self.station.latitude self._height = self.station.ellipsoidalheight # Initial az/el pointing self._az = 0.0 self._el = 0.0 self._antennas = self.station.antennas self._nof_antennas = self._antennas.nof_elements # Get reference antenna location self._reference_antenna_loc = EarthLocation.from_geodetic( self._longitude, self._latitude, height=self._height, ellipsoid="WGS84", ) # Placeholder for delays self._delays: np.ndarray = None # type: ignore[assignment] self._delay_rates: np.ndarray = None # type: ignore[assignment]
[docs] def delays(self: Pointing) -> np.ndarray: """ Return delay values. :return: delay values """ return self._delays
[docs] def delay_rates(self: Pointing) -> np.ndarray: """ Return delay rates values. :return: delay rates values """ return self._delay_rates
# -------------------------------- POINTING FUNCTIONS --------------------------
[docs] def point_to_sun(self: Pointing, pointing_time: Optional[Time] = None) -> None: """ Generate delays to point towards the sun for the given time. :param pointing_time: Time at which delays should be generated """ # If no time is specified, get current time if pointing_time is None: pointing_time = Time(datetime.utcnow(), scale="tai") # Get sun position in RA, DEC and convert to Alz, Az in telescope # reference frame sun_position = get_sun(pointing_time) alt, az = self._ra_dec_to_alt_az( Angle(sun_position.ra), Angle(sun_position.dec), pointing_time, ) # Compute delays self.point_array_static(alt, az)
[docs] def point_to_special( self: Pointing, name: str, pointing_time: Optional[Time] = None ) -> None: """ Generate delays to point towards special named bodies for the given time. :param name: The special target to be pointed at :param pointing_time: Time at which delays should be generated """ # If no time is specified, get current time if pointing_time is None: pointing_time = Time(datetime.utcnow(), scale="tai") # Get coords from target name and convert to ra, dec in telescope target = katpoint.Target(f"{name}, special") ant = katpoint.Antenna(self._reference_antenna_loc) radec = target.radec(timestamp=pointing_time, antenna=ant) ra = Angle(radec.ra) dec = Angle(radec.dec) self.point_array_equatorial(right_ascension=ra, declination=dec)
[docs] def point_galactic( self: Pointing, longitude: float | Angle, latitude: float | Angle, delta_time: float = 10.0, pointing_time: Optional[Any] = None, ) -> None: """ Generate delays to point towards galactic coordinates. :param longitude: Longitude coordinates for target :param latitude: latitude coordinates for target :param delta_time: time in seconds from pointing time for calculating the delay rate :param pointing_time: time at the start of the observation """ # If no time is specified, get current time if pointing_time is None: pointing_time = Time(datetime.utcnow(), scale="tai") # Get coords from galactic coords and convert to ra, dec in telescope # reference frame target = katpoint.Target(f"gal, {longitude}, {latitude}") ant = katpoint.Antenna(self._reference_antenna_loc) radec = target.radec(timestamp=pointing_time, antenna=ant) ra = Angle(radec.ra) dec = Angle(radec.dec) self.point_array_equatorial( right_ascension=ra, declination=dec, pointing_time=pointing_time, delta_time=delta_time, )
[docs] def point_tle( self: Pointing, line1: str, line2: str, pointing_time: Optional[Any] = None, delta_time: float = 10.0, ) -> None: """ Generate delays to point towards tle coordinates. :param line1: Line one of an orbiting body :param line2: Line two of an orbiting body :param pointing_time: time at the start of the observation :param delta_time: time in seconds from pointing time for calculating the delay rate """ # If no time is specified, get current time if pointing_time is None: pointing_time = Time(datetime.utcnow(), scale="tai") target = katpoint.Target(f"tle, {line1}, {line2}") ant = katpoint.Antenna(self._reference_antenna_loc) azel = target.azel(timestamp=pointing_time, antenna=ant) az1 = Angle(azel.az) alt1 = Angle(azel.alt) azel2 = target.azel( timestamp=pointing_time + TimeDelta(delta_time, format="sec"), antenna=ant, ) alt2 = Angle(azel2.alt) az2 = Angle(azel2.az) self.point_array_static(altitude=alt1, azimuth=az1) # Calculate required delay rate if delta_time == 0.0: self._delay_rates = self._delays * 0.0 else: self._delay_rates = self._get_delay_rates( alt1, az1, alt2, az2, delta_time, )
[docs] def point_array_static( self: Pointing, altitude: float | Angle, azimuth: float | Angle, ) -> None: """ Calculate the delay given the altitude and azimuth coordinates of a sky object. :param altitude: altitude coordinates of a sky object as astropy angle :param azimuth: azimuth coordinates of a sky object as astropy angles """ # Type conversions if required altitude_angle = self.convert_to_astropy_angle(altitude) azimuth_angle = self.convert_to_astropy_angle(azimuth) self._az = azimuth_angle.deg self._el = altitude_angle.deg # Compute the delays self._delays = self._delays_from_altitude_azimuth( # type: ignore[assignment] altitude_angle, azimuth_angle ) self._delay_rates = self._delays * 0
# pylint: disable=too-many-arguments
[docs] def point_array_alt_az( self: Pointing, altitude: Angle, azimuth: Angle, altitude_rate: Angle = Angle(0.0, unit="deg"), azimuth_rate: Angle = Angle(0.0, unit="deg"), reference_time: Optional[Time] = None, delta_time: float = 10.0, pointing_time: Optional[Time] = None, ) -> None: """ Calculate the delay given the altitude and azimuth coordinates of a sky object. :param altitude: altitude coordinates of a sky object as astropy angle :param azimuth: azimuth coordinates of a sky object as astropy angles :param altitude_rate: rate of change of altitude in deg/s, used for moving objects :param azimuth_rate: rate of change of azimuth in deg/s, used for moving objects :param reference_time: Time at which the source is at the input altitude and azimuth, only required if using altitude_rate and azimuth_rate :param delta_time: time in seconds from pointing time for calculating the delay rate :param pointing_time: time at the start of the observation """ # Type conversions if required altitude = self.convert_to_astropy_angle(altitude) azimuth = self.convert_to_astropy_angle(azimuth) # If no time is specified, get current time if pointing_time is None: pointing_time = Time(datetime.utcnow(), scale="utc") # If reference time is specified, compute elapsed time since it if reference_time is not None: rate_time = (pointing_time - reference_time).to_value(format="sec") else: rate_time = 0.0 # Calculate required delay alt_1, az_1 = ( altitude + altitude_rate * rate_time, azimuth + azimuth_rate * rate_time, ) alt_2, az_2 = ( altitude + altitude_rate * (rate_time + delta_time), azimuth + azimuth_rate * (rate_time + delta_time), ) # Generate delays from calculated altitude and azimuth self.point_array_static(altitude=alt_1, azimuth=az_1) # Calculate required delay rate if delta_time == 0.0: self._delay_rates = self._delays * 0.0 else: self._delay_rates = self._get_delay_rates( alt_1, az_1, alt_2, az_2, delta_time, )
# pylint: disable=too-many-arguments
[docs] def point_array_equatorial( self: Pointing, right_ascension: Angle, declination: Angle, ra_rate: Angle = Angle(0.0, unit="deg"), dec_rate: Angle = Angle(0.0, unit="deg"), reference_time: Optional[Time] = None, delta_time: float = 10.0, pointing_time: Optional[Time] = None, ) -> None: """ Calculate the delay and delay_rate for a station pointing. Calculate the delay and delay_rate (change in delay over delta_time) for a station pointing described by equatorial coordinates (the input right_ascension and declination). :param right_ascension: Right ascension of source in degrees :param declination: Declination of source in degrees :param ra_rate: Rate of right ascension in deg/s, used for moving objects :param dec_rate: Rate of declination in deg/s, used for moving objects :param reference_time: Time at which the source is at the input right_acension and declination, only required if using ra_rate and dec_rate :param delta_time: Time in seconds from pointing_time for calculating the delay rate :param pointing_time: Time at the start of the observation """ # If no time is specified, get current time if pointing_time is None: pointing_time = Time(datetime.utcnow(), scale="utc") # If reference time is specified, compute elapsed time since it if reference_time is not None: rate_time = (pointing_time - reference_time).to_value(format="sec") else: rate_time = 0.0 # Calculate required delay alt_1, az_1 = self._ra_dec_to_alt_az( right_ascension + ra_rate * rate_time, declination + dec_rate * rate_time, pointing_time, ) alt_2, az_2 = self._ra_dec_to_alt_az( right_ascension + ra_rate * (rate_time + delta_time), declination + dec_rate * (rate_time + delta_time), pointing_time + TimeDelta(delta_time, format="sec"), ) # Generate delays from calculated altitude and azimuth self.point_array_static(altitude=alt_1, azimuth=az_1) # Calculate required delay rate if delta_time == 0.0: self._delay_rates = self._delays * 0.0 else: self._delay_rates = self._get_delay_rates( alt_1, az_1, alt_2, az_2, delta_time, )
def _get_delay_rates( self: Pointing, alt_1: Angle, az_1: Angle, alt_2: Angle, az_2: Angle, delta_time: float, ) -> np.ndarray: delays_1 = self._delays_from_altitude_azimuth(alt_1, az_1) delays_2 = self._delays_from_altitude_azimuth(alt_2, az_2) rates = np.zeros(256) for i, _ in enumerate(delays_1): rates[i] = (delays_2[i] - delays_1[i]) / delta_time return rates
[docs] def get_pointing_coefficients( self: Pointing, start_channel: int, nof_channels: int ) -> Optional[list[list[np.complex128]]]: """ Get complex pointing coefficients from generated delays. :param start_channel: Start channel index :param nof_channels: Number of channels starting with start_channel :return: Return the pointing coefficients as a tuple of numpy complex values """ if self._delays is None: logging.error("No pointing delays generated") return None # Compute frequency range channel_bandwidth = 400e6 / 512.0 frequencies = np.array( [ start_channel * channel_bandwidth + i * channel_bandwidth for i in range(nof_channels) ] ) # Generate coefficients coefficients = np.zeros((self._nof_antennas, nof_channels), dtype=np.cfloat) for i in range(nof_channels): delays = 2.0 * np.pi * frequencies[i] * self._delays coefficients[:, i] = np.cos(delays) + 1j * np.sin(delays) # All done, return coefficients return coefficients # type: ignore[return-value]
def _delays_from_altitude_azimuth( self: Pointing, altitude: Angle, azimuth: Angle ) -> np.ndarray: """ Calculate the delay using a target altitude Azimuth. Calculated delay is negative towards source, is the geometric delay due to the light travel time to be corrected, NOT the delay inserted in the beamformer. :param altitude: The altitude of the target astropy angle :param azimuth: The azimuth of the target astropy angle :return: The delay in seconds for each antenna """ # Calculate transformation scale = np.array( [ np.cos(altitude) * np.sin(azimuth), np.cos(altitude) * np.cos(azimuth), np.sin(altitude), ] ) # Apply to antenna displacements assert self._antennas.xyz is not None # for the type checker path_length = np.dot(scale, self._antennas.xyz.T) # Return frequency-independent geometric delays return np.multiply(-1.0 / c.value, path_length) def _ra_dec_to_alt_az( self: Pointing, right_ascension: float | Angle, declination: float | Angle, obstime: Time, ) -> list[Angle]: """ Calculate the altitude and azimuth coordinates of a sky object. From right ascension and declination and time. :param right_ascension: Right ascension of source - astropy Angle / string convertable to Angle :param declination: Declination of source - astropy Angle / string convertable to Angle :param obstime: Time of observation (as astropy Time") :return: List containing altitude and azimuth of source as astropy angle """ # Initialise a katpoint Target with the default frame (ICRS) and convert to # horizontal coordinates (altitude/azimuth) from the antenna's perspective. target = katpoint.Target.from_radec(ra=right_ascension, dec=declination) ant = katpoint.Antenna(self._reference_antenna_loc) altaz = target.azel(timestamp=obstime, antenna=ant) alt = Angle(altaz.alt) az = Angle(altaz.az) return [alt, az]
[docs] @staticmethod def convert_to_astropy_angle(angle: str | float | Angle) -> Angle: """ Convert a number or string to an Astropy angle. :param angle: angle :return: converted angle """ if not isinstance(angle, Angle): return Angle(angle, "deg") return angle
[docs] def is_above_horizon( self: Pointing, right_ascension: Angle, declination: Angle, pointing_time: Time, ) -> bool: """ Check if the target is above the horizon, given time for the reference antenna. :param right_ascension: The right ascension of the target as a astropy angle :param declination: The declination of the target as an astropy angle. :param pointing_time: The observation time as an astropy Time. :return: True if the target coordinates are above the horizon at the specified time, false otherwise. """ alt, _ = self._ra_dec_to_alt_az( right_ascension, declination, pointing_time, ) return alt > 0.0