# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
MCCS pointing calculation prototype.
This is originally from the AAVS code.
"""
from __future__ import annotations # allow forward references in type hints
import logging
import warnings
from datetime import datetime
from typing import Any, Optional
import katpoint
import numpy as np
from astropy.constants import c # pylint: disable=no-name-in-module
from astropy.coordinates import Angle, EarthLocation, get_sun
from astropy.time import TimeDelta
from astropy.time.core import Time
from astropy.utils.exceptions import AstropyWarning
warnings.simplefilter("ignore", category=AstropyWarning)
__author__ = "Alessio Magro"
antennas_per_tile = 16
# Astropy needs to cache https://datacenter.iers.org/data/9/finals2000A.all
# This takes time and should be pre-cached.
# This is being done by the OCI image, but leaving this here just in case.
print("Premptively caching EIRS data...")
Time.now().ut1 # pylint: disable=expression-not-assigned
print("Done.")
# pylint: disable=too-many-instance-attributes
[docs]
class Pointing:
"""Helper class for generating beamforming coefficients."""
[docs]
def __init__(self: Pointing, station_info: StationInformation) -> None:
"""
Pointing class, generates delay and delay rates to be downloaded to TPMs.
:param station_info: Basic information for station location and
antenna displacements
"""
# Store arguments
self.station = station_info
# Get station location
self._longitude = self.station.longitude
self._latitude = self.station.latitude
self._height = self.station.ellipsoidalheight
# Initial az/el pointing
self._az = 0.0
self._el = 0.0
self._antennas = self.station.antennas
self._nof_antennas = self._antennas.nof_elements
# Get reference antenna location
self._reference_antenna_loc = EarthLocation.from_geodetic(
self._longitude,
self._latitude,
height=self._height,
ellipsoid="WGS84",
)
# Placeholder for delays
self._delays: np.ndarray = None # type: ignore[assignment]
self._delay_rates: np.ndarray = None # type: ignore[assignment]
[docs]
def delays(self: Pointing) -> np.ndarray:
"""
Return delay values.
:return: delay values
"""
return self._delays
[docs]
def delay_rates(self: Pointing) -> np.ndarray:
"""
Return delay rates values.
:return: delay rates values
"""
return self._delay_rates
# -------------------------------- POINTING FUNCTIONS --------------------------
[docs]
def point_to_sun(self: Pointing, pointing_time: Optional[Time] = None) -> None:
"""
Generate delays to point towards the sun for the given time.
:param pointing_time: Time at which delays should be generated
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="tai")
# Get sun position in RA, DEC and convert to Alz, Az in telescope
# reference frame
sun_position = get_sun(pointing_time)
alt, az = self._ra_dec_to_alt_az(
Angle(sun_position.ra),
Angle(sun_position.dec),
pointing_time,
)
# Compute delays
self.point_array_static(alt, az)
[docs]
def point_to_special(
self: Pointing, name: str, pointing_time: Optional[Time] = None
) -> None:
"""
Generate delays to point towards special named bodies for the given time.
:param name: The special target to be pointed at
:param pointing_time: Time at which delays should be generated
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="tai")
# Get coords from target name and convert to ra, dec in telescope
target = katpoint.Target(f"{name}, special")
ant = katpoint.Antenna(self._reference_antenna_loc)
radec = target.radec(timestamp=pointing_time, antenna=ant)
ra = Angle(radec.ra)
dec = Angle(radec.dec)
self.point_array_equatorial(right_ascension=ra, declination=dec)
[docs]
def point_galactic(
self: Pointing,
longitude: float | Angle,
latitude: float | Angle,
delta_time: float = 10.0,
pointing_time: Optional[Any] = None,
) -> None:
"""
Generate delays to point towards galactic coordinates.
:param longitude: Longitude coordinates for target
:param latitude: latitude coordinates for target
:param delta_time: time in seconds from pointing time for calculating the
delay rate
:param pointing_time: time at the start of the observation
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="tai")
# Get coords from galactic coords and convert to ra, dec in telescope
# reference frame
target = katpoint.Target(f"gal, {longitude}, {latitude}")
ant = katpoint.Antenna(self._reference_antenna_loc)
radec = target.radec(timestamp=pointing_time, antenna=ant)
ra = Angle(radec.ra)
dec = Angle(radec.dec)
self.point_array_equatorial(
right_ascension=ra,
declination=dec,
pointing_time=pointing_time,
delta_time=delta_time,
)
[docs]
def point_tle(
self: Pointing,
line1: str,
line2: str,
pointing_time: Optional[Any] = None,
delta_time: float = 10.0,
) -> None:
"""
Generate delays to point towards tle coordinates.
:param line1: Line one of an orbiting body
:param line2: Line two of an orbiting body
:param pointing_time: time at the start of the observation
:param delta_time: time in seconds from pointing time for calculating the
delay rate
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="tai")
target = katpoint.Target(f"tle, {line1}, {line2}")
ant = katpoint.Antenna(self._reference_antenna_loc)
azel = target.azel(timestamp=pointing_time, antenna=ant)
az1 = Angle(azel.az)
alt1 = Angle(azel.alt)
azel2 = target.azel(
timestamp=pointing_time + TimeDelta(delta_time, format="sec"),
antenna=ant,
)
alt2 = Angle(azel2.alt)
az2 = Angle(azel2.az)
self.point_array_static(altitude=alt1, azimuth=az1)
# Calculate required delay rate
if delta_time == 0.0:
self._delay_rates = self._delays * 0.0
else:
self._delay_rates = self._get_delay_rates(
alt1,
az1,
alt2,
az2,
delta_time,
)
[docs]
def point_array_static(
self: Pointing,
altitude: float | Angle,
azimuth: float | Angle,
) -> None:
"""
Calculate the delay given the altitude and azimuth coordinates of a sky object.
:param altitude: altitude coordinates of a sky object as astropy angle
:param azimuth: azimuth coordinates of a sky object as astropy angles
"""
# Type conversions if required
altitude_angle = self.convert_to_astropy_angle(altitude)
azimuth_angle = self.convert_to_astropy_angle(azimuth)
self._az = azimuth_angle.deg
self._el = altitude_angle.deg
# Compute the delays
self._delays = self._delays_from_altitude_azimuth( # type: ignore[assignment]
altitude_angle, azimuth_angle
)
self._delay_rates = self._delays * 0
# pylint: disable=too-many-arguments
[docs]
def point_array_alt_az(
self: Pointing,
altitude: Angle,
azimuth: Angle,
altitude_rate: Angle = Angle(0.0, unit="deg"),
azimuth_rate: Angle = Angle(0.0, unit="deg"),
reference_time: Optional[Time] = None,
delta_time: float = 10.0,
pointing_time: Optional[Time] = None,
) -> None:
"""
Calculate the delay given the altitude and azimuth coordinates of a sky object.
:param altitude: altitude coordinates of a sky object as astropy angle
:param azimuth: azimuth coordinates of a sky object as astropy angles
:param altitude_rate: rate of change of altitude in deg/s, used for moving
objects
:param azimuth_rate: rate of change of azimuth in deg/s, used for moving
objects
:param reference_time: Time at which the source is at the input altitude and
azimuth, only required if using altitude_rate and azimuth_rate
:param delta_time: time in seconds from pointing time for calculating the
delay rate
:param pointing_time: time at the start of the observation
"""
# Type conversions if required
altitude = self.convert_to_astropy_angle(altitude)
azimuth = self.convert_to_astropy_angle(azimuth)
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="utc")
# If reference time is specified, compute elapsed time since it
if reference_time is not None:
rate_time = (pointing_time - reference_time).to_value(format="sec")
else:
rate_time = 0.0
# Calculate required delay
alt_1, az_1 = (
altitude + altitude_rate * rate_time,
azimuth + azimuth_rate * rate_time,
)
alt_2, az_2 = (
altitude + altitude_rate * (rate_time + delta_time),
azimuth + azimuth_rate * (rate_time + delta_time),
)
# Generate delays from calculated altitude and azimuth
self.point_array_static(altitude=alt_1, azimuth=az_1)
# Calculate required delay rate
if delta_time == 0.0:
self._delay_rates = self._delays * 0.0
else:
self._delay_rates = self._get_delay_rates(
alt_1,
az_1,
alt_2,
az_2,
delta_time,
)
# pylint: disable=too-many-arguments
[docs]
def point_array_equatorial(
self: Pointing,
right_ascension: Angle,
declination: Angle,
ra_rate: Angle = Angle(0.0, unit="deg"),
dec_rate: Angle = Angle(0.0, unit="deg"),
reference_time: Optional[Time] = None,
delta_time: float = 10.0,
pointing_time: Optional[Time] = None,
) -> None:
"""
Calculate the delay and delay_rate for a station pointing.
Calculate the delay and delay_rate (change in delay over delta_time) for a
station pointing described by equatorial coordinates
(the input right_ascension and declination).
:param right_ascension: Right ascension of source in degrees
:param declination: Declination of source in degrees
:param ra_rate: Rate of right ascension in deg/s, used for moving objects
:param dec_rate: Rate of declination in deg/s, used for moving objects
:param reference_time: Time at which the source is at the input right_acension
and declination, only required if using ra_rate and dec_rate
:param delta_time: Time in seconds from pointing_time for calculating the
delay rate
:param pointing_time: Time at the start of the observation
"""
# If no time is specified, get current time
if pointing_time is None:
pointing_time = Time(datetime.utcnow(), scale="utc")
# If reference time is specified, compute elapsed time since it
if reference_time is not None:
rate_time = (pointing_time - reference_time).to_value(format="sec")
else:
rate_time = 0.0
# Calculate required delay
alt_1, az_1 = self._ra_dec_to_alt_az(
right_ascension + ra_rate * rate_time,
declination + dec_rate * rate_time,
pointing_time,
)
alt_2, az_2 = self._ra_dec_to_alt_az(
right_ascension + ra_rate * (rate_time + delta_time),
declination + dec_rate * (rate_time + delta_time),
pointing_time + TimeDelta(delta_time, format="sec"),
)
# Generate delays from calculated altitude and azimuth
self.point_array_static(altitude=alt_1, azimuth=az_1)
# Calculate required delay rate
if delta_time == 0.0:
self._delay_rates = self._delays * 0.0
else:
self._delay_rates = self._get_delay_rates(
alt_1,
az_1,
alt_2,
az_2,
delta_time,
)
def _get_delay_rates(
self: Pointing,
alt_1: Angle,
az_1: Angle,
alt_2: Angle,
az_2: Angle,
delta_time: float,
) -> np.ndarray:
delays_1 = self._delays_from_altitude_azimuth(alt_1, az_1)
delays_2 = self._delays_from_altitude_azimuth(alt_2, az_2)
rates = np.zeros(256)
for i, _ in enumerate(delays_1):
rates[i] = (delays_2[i] - delays_1[i]) / delta_time
return rates
[docs]
def get_pointing_coefficients(
self: Pointing, start_channel: int, nof_channels: int
) -> Optional[list[list[np.complex128]]]:
"""
Get complex pointing coefficients from generated delays.
:param start_channel: Start channel index
:param nof_channels: Number of channels starting with start_channel
:return: Return the pointing coefficients as a tuple of numpy
complex values
"""
if self._delays is None:
logging.error("No pointing delays generated")
return None
# Compute frequency range
channel_bandwidth = 400e6 / 512.0
frequencies = np.array(
[
start_channel * channel_bandwidth + i * channel_bandwidth
for i in range(nof_channels)
]
)
# Generate coefficients
coefficients = np.zeros((self._nof_antennas, nof_channels), dtype=np.cfloat)
for i in range(nof_channels):
delays = 2.0 * np.pi * frequencies[i] * self._delays
coefficients[:, i] = np.cos(delays) + 1j * np.sin(delays)
# All done, return coefficients
return coefficients # type: ignore[return-value]
def _delays_from_altitude_azimuth(
self: Pointing, altitude: Angle, azimuth: Angle
) -> np.ndarray:
"""
Calculate the delay using a target altitude Azimuth.
Calculated delay is negative towards source, is the geometric
delay due to the light travel time to be corrected, NOT
the delay inserted in the beamformer.
:param altitude: The altitude of the target astropy angle
:param azimuth: The azimuth of the target astropy angle
:return: The delay in seconds for each antenna
"""
# Calculate transformation
scale = np.array(
[
np.cos(altitude) * np.sin(azimuth),
np.cos(altitude) * np.cos(azimuth),
np.sin(altitude),
]
)
# Apply to antenna displacements
assert self._antennas.xyz is not None # for the type checker
path_length = np.dot(scale, self._antennas.xyz.T)
# Return frequency-independent geometric delays
return np.multiply(-1.0 / c.value, path_length)
def _ra_dec_to_alt_az(
self: Pointing,
right_ascension: float | Angle,
declination: float | Angle,
obstime: Time,
) -> list[Angle]:
"""
Calculate the altitude and azimuth coordinates of a sky object.
From right ascension and declination and time.
:param right_ascension: Right ascension of source -
astropy Angle / string convertable to Angle
:param declination: Declination of source - astropy Angle / string
convertable to Angle
:param obstime: Time of observation (as astropy Time")
:return: List containing altitude and azimuth of source as astropy angle
"""
# Initialise a katpoint Target with the default frame (ICRS) and convert to
# horizontal coordinates (altitude/azimuth) from the antenna's perspective.
target = katpoint.Target.from_radec(ra=right_ascension, dec=declination)
ant = katpoint.Antenna(self._reference_antenna_loc)
altaz = target.azel(timestamp=obstime, antenna=ant)
alt = Angle(altaz.alt)
az = Angle(altaz.az)
return [alt, az]
[docs]
@staticmethod
def convert_to_astropy_angle(angle: str | float | Angle) -> Angle:
"""
Convert a number or string to an Astropy angle.
:param angle: angle
:return: converted angle
"""
if not isinstance(angle, Angle):
return Angle(angle, "deg")
return angle
[docs]
def is_above_horizon(
self: Pointing,
right_ascension: Angle,
declination: Angle,
pointing_time: Time,
) -> bool:
"""
Check if the target is above the horizon, given time for the reference antenna.
:param right_ascension: The right ascension of the target as a astropy angle
:param declination: The declination of the target as an astropy angle.
:param pointing_time: The observation time as an astropy Time.
:return: True if the target coordinates are above the horizon at
the specified time, false otherwise.
"""
alt, _ = self._ra_dec_to_alt_az(
right_ascension,
declination,
pointing_time,
)
return alt > 0.0