# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
MCCS pointing calculation prototype.
This is originally from the AAVS code.
"""
from __future__ import annotations # allow forward references in type hints
import logging
import warnings
from datetime import datetime
from typing import Optional
import numpy as np
from astropy.constants import c # pylint: disable=no-name-in-module
from astropy.coordinates import AltAz, Angle, EarthLocation, SkyCoord, get_sun
from astropy.time import TimeDelta
from astropy.time.core import Time
from astropy.utils.exceptions import AstropyWarning
warnings.simplefilter("ignore", category=AstropyWarning)
__author__ = "Alessio Magro"
antennas_per_tile = 16
# Astropy needs to cache https://datacenter.iers.org/data/9/finals2000A.all
# This takes time and should be pre-cached.
# This is being done by the OCI image, but leaving this here just in case.
print("Premptively caching EIRS data...")
Time.now().ut1 # pylint: disable=expression-not-assigned
print("Done.")
# pylint: disable=too-many-instance-attributes
[docs]class Pointing:
"""Helper class for generating beamforming coefficients."""
[docs] def __init__(self: Pointing, station_info: StationInformation) -> None:
"""
Pointing class, generates delay and delay rates to be downloaded to TPMs.
:param station_info: Basic information for station location and
antenna displacements
"""
# Store arguments
self.station = station_info
# Get station location
self._longitude = self.station.longitude
self._latitude = self.station.latitude
self._height = self.station.ellipsoidalheight
# Initial az/el pointing
self._az = 0.0
self._el = 0.0
self._antennas = self.station.antennas
self._nof_antennas = self._antennas.nof_elements
# Get reference antenna location
self._reference_antenna_loc = EarthLocation.from_geodetic(
self._longitude,
self._latitude,
height=self._height,
ellipsoid="WGS84",
)
# Placeholder for delays and flag for below horizon
self._below_horizon = False
self._delays: np.ndarray = None # type: ignore[assignment]
self._delay_rates: np.ndarray = None # type: ignore[assignment]
[docs] def delays(self: Pointing) -> np.ndarray:
"""
Return delay values.
:return: delay values
"""
return self._delays
[docs] def delay_rates(self: Pointing) -> np.ndarray:
"""
Return delay rates values.
:return: delay rates values
"""
return self._delay_rates
# -------------------------------- POINTING FUNCTIONS --------------------------
[docs] def point_to_sun(self: Pointing, pointing_time: Optional[Time] = None) -> None:
"""
Generate delays to point towards the sun for the given time.
:param pointing_time: Time at which delays should be generated
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="tai")
# Get sun position in RA, DEC and convert to Alz, Az in telescope
# reference frame
sun_position = get_sun(pointing_time)
alt, az = self._ra_dec_to_alt_az(
Angle(sun_position.ra),
Angle(sun_position.dec),
pointing_time,
)
# Compute delays
self.point_array_static(alt, az)
[docs] def point_array_static(
self: Pointing,
altitude: float | Angle,
azimuth: float | Angle,
) -> None:
"""
Calculate the delay given the altitude and azimuth coordinates of a sky object.
:param altitude: altitude coordinates of a sky object as astropy angle
:param azimuth: azimuth coordinates of a sky object as astropy angles
"""
# Type conversions if required
altitude_angle = self.convert_to_astropy_angle(altitude)
azimuth_angle = self.convert_to_astropy_angle(azimuth)
self._az = azimuth_angle.deg
self._el = altitude_angle.deg
# Set above horizon flag
if altitude_angle < 0.0:
self._below_horizon = True
else:
self._below_horizon = False
# Compute the delays
self._delays = self._delays_from_altitude_azimuth( # type: ignore[assignment]
altitude_angle, azimuth_angle
)
self._delay_rates = self._delays * 0
# pylint: disable=too-many-arguments
[docs] def point_array_equatorial(
self: Pointing,
right_ascension: Angle,
declination: Angle,
ra_rate: Optional[Angle] = Angle(0.0, unit="deg"),
dec_rate: Optional[Angle] = Angle(0.0, unit="deg"),
reference_time: Optional[Time] = None,
delta_time: float = 10.0,
pointing_time: Optional[Time] = None,
) -> None:
"""
Calculate the delay and delay_rate for a station pointing.
Calculate the delay and delay_rate (change in delay over delta_time) for a
station pointing described by equatorial coordinates
(the input right_ascension and declination).
:param right_ascension: Right ascension of source in degrees
:param declination: Declination of source in degrees
:param ra_rate: Rate of right ascension in deg/s, used for moving objects
:param dec_rate: Rate of declination in deg/s, used for moving objects
:param reference_time: Time at which the source is at the input right_acension
and declination, only required if using ra_rate and dec_rate
:param delta_time: Time in seconds from pointing_time for calculating the
delay rate
:param pointing_time: Time at the start of the observation
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="utc")
# If reference time is specified, compute elapsed time since it
if reference_time is not None:
rate_time = (pointing_time - reference_time).to_value(format="sec")
else:
rate_time = 0.0
# Calculate required delay
alt_1, az_1 = self._ra_dec_to_alt_az(
right_ascension + ra_rate * rate_time,
declination + dec_rate * rate_time,
pointing_time,
)
alt_2, az_2 = self._ra_dec_to_alt_az(
right_ascension + ra_rate * (rate_time + delta_time),
declination + dec_rate * (rate_time + delta_time),
pointing_time + TimeDelta(delta_time, format="sec"),
)
# If required source is not above horizon, generate zeros
if alt_1 < 0.0:
self._delays = np.zeros(self._nof_antennas)
self._delay_rates = np.zeros(self._nof_antennas)
self._below_horizon = True
return
# Generate delays from calculated altitude and azimuth
self.point_array_static(altitude=alt_1, azimuth=az_1)
# Calculate required delay rate
if delta_time == 0.0:
self._delay_rates = self._delays * 0.0
else:
self._delay_rates = self._get_delay_rates(
alt_1,
az_1,
alt_2,
az_2,
delta_time,
)
# Set above horizon flag
self._below_horizon = False
def _get_delay_rates(
self: Pointing,
alt_1: Angle,
az_1: Angle,
alt_2: Angle,
az_2: Angle,
delta_time: float,
) -> np.ndarray:
delays_1 = self._delays_from_altitude_azimuth(alt_1, az_1)
delays_2 = self._delays_from_altitude_azimuth(alt_2, az_2)
rates = np.zeros(256)
for i, _ in enumerate(delays_1):
rates[i] = (delays_2[i] - delays_1[i]) / delta_time
return rates
[docs] def get_pointing_coefficients(
self: Pointing, start_channel: int, nof_channels: int
) -> Optional[list[list[np.complex128]]]:
"""
Get complex pointing coefficients from generated delays.
:param start_channel: Start channel index
:param nof_channels: Number of channels starting with start_channel
:return: Return the pointing coefficients as a tuple of numpy
complex values
"""
if self._delays is None:
logging.error("No pointing delays generated")
return None
# If below horizon flat is set, return 0s
if self._below_horizon:
return np.zeros(
(self._nof_antennas, nof_channels), dtype=np.complex128
) # type: ignore[return-value]
# Compute frequency range
channel_bandwidth = 400e6 / 512.0
frequencies = np.array(
[
start_channel * channel_bandwidth + i * channel_bandwidth
for i in range(nof_channels)
]
)
# Generate coefficients
coefficients = np.zeros((self._nof_antennas, nof_channels), dtype=np.cfloat)
for i in range(nof_channels):
delays = 2.0 * np.pi * frequencies[i] * self._delays
coefficients[:, i] = np.cos(delays) + 1j * np.sin(delays)
# All done, return coefficients
return coefficients # type: ignore[return-value]
def _delays_from_altitude_azimuth(
self: Pointing, altitude: Angle, azimuth: Angle
) -> np.ndarray:
"""
Calculate the delay using a target altitude Azimuth.
Calculated delay is negative towards source, is the geometric
delay due to the light travel time to be corrected, NOT
the delay inserted in the beamformer.
:param altitude: The altitude of the target astropy angle
:param azimuth: The azimuth of the target astropy angle
:return: The delay in seconds for each antenna
"""
# Calculate transformation
scale = np.array(
[
np.cos(altitude) * np.sin(azimuth),
np.cos(altitude) * np.cos(azimuth),
np.sin(altitude),
]
)
# Apply to antenna displacements
assert self._antennas.xyz is not None # for the type checker
path_length = np.dot(scale, self._antennas.xyz.T)
# Return frequency-independent geometric delays
return np.multiply(-1.0 / c.value, path_length)
def _ra_dec_to_alt_az(
self: Pointing,
right_ascension: float | Angle,
declination: float | Angle,
obstime: Time,
) -> list[Angle]:
"""
Calculate the altitude and azimuth coordinates of a sky object.
From right ascension and declination and time.
:param right_ascension: Right ascension of source -
astropy Angle / string convertable to Angle
:param declination: Declination of source - astropy Angle / string
convertable to Angle
:param obstime: Time of observation (as astropy Time")
:return: List containing altitude and azimuth of source as astropy angle
"""
# Initialise SkyCoord object using the default frame (ICRS) and convert to
# horizontal coordinates (altitude/azimuth) from the antenna's perspective.
sky_coordinates = SkyCoord(ra=right_ascension, dec=declination, unit="deg")
altaz = sky_coordinates.transform_to(
AltAz(obstime=obstime, location=self._reference_antenna_loc)
)
alt = Angle(altaz.alt)
az = Angle(altaz.az)
return [alt, az]
[docs] @staticmethod
def convert_to_astropy_angle(angle: str | float | Angle) -> Angle:
"""
Convert a number or string to an Astropy angle.
:param angle: angle
:return: converted angle
"""
if not isinstance(angle, Angle):
return Angle(angle, "deg")
return angle
[docs] def is_above_horizon(
self: Pointing,
right_ascension: Angle,
declination: Angle,
pointing_time: Time,
) -> bool:
"""
Check if the target is above the horizon, given time for the reference antenna.
:param right_ascension: The right ascension of the target as a astropy angle
:param declination: The declination of the target as an astropy angle.
:param pointing_time: The observation time as an astropy Time.
:return: True if the target coordinates are above the horizon at
the specified time, false otherwise.
"""
alt, _ = self._ra_dec_to_alt_az(
right_ascension,
declination,
pointing_time,
)
return alt > 0.0