Source code for ska_sdp_instrumental_calibration.data_managers.beams

import logging
from dataclasses import dataclass

import everybeam as eb
import numpy as np
from astropy.coordinates import ITRS, AltAz, SkyCoord
from astropy.coordinates.earth import EarthLocation
from astropy.time import Time

logger = logging.getLogger(__name__)


[docs] def convert_time_to_solution_time(time: float | np.ndarray) -> Time: """ Convert float time to Astropy time. Parameters ---------- time: float Input time in float Return ------ Time object containing an array of time values """ # This Time->Time conversion is what was originally done in INST # Although Time can be treated as a "Value Object", there are subtle # differences in the order in which floating point operations are performed # thus the results may slightly vary if you try to simplify this # By removeing the additional datetime64 to time conversion return Time(Time(time / 86400.0, format="mjd", scale="utc").datetime64)
# from everybeam.readthedocs.io/en/latest/tree/demos/lofar-array-factor.html
[docs] def radec_to_xyz(direction: SkyCoord, time: Time): """ Convert RA and Dec ICRS coordinates to ITRS cartesian coordinates. Parameters ---------- direction: SkyCoord astropy pointing direction time: astropy.time.Timne Observation time Return ------ NumPy array containing the ITRS X, Y and Z coordinates """ direction_itrs = direction.transform_to(ITRS(obstime=time)) return np.asarray(direction_itrs.cartesian.xyz.transpose())
[docs] class PointingBelowHorizon(Exception): """ Pointing below exception raised when a sky component with its RA-DEC at a given time is below horizon """ pass
[docs] class BeamsLow: """A beam class specific to handling low beams.""" def __init__( self, nstations: int, array_location: EarthLocation, direction: SkyCoord, frequency: np.ndarray, ms_path: str, soln_time: float, element_response_model: str, ): self.nstations = nstations self.array_location = array_location self.beam_direction = direction self.frequency = frequency self.beam_ms = ms_path self.element_response_model = element_response_model self.delay_dir_itrf = None self.solution_time = convert_time_to_solution_time(soln_time) self.solution_time_mjd_seconds = self.solution_time.mjd * 86400 # Check beam pointing direction for all solution times self.validate_direction_above_horizon(self.beam_direction) # Coordinates of beam centre self.delay_dir_itrf = radec_to_xyz( self.beam_direction, self.solution_time ) self.telescope = eb.load_telescope( # pylint: disable=I1101 self.beam_ms, element_response_model=element_response_model, ) self.scale = np.ones( (self.frequency.size,), dtype=self.frequency.dtype, ) if type(self.telescope) is eb.OSKAR: # pylint: disable=I1101 """ Set normalisation scaling to the Frobenius norm of the zenith response divided by sqrt(2). Should be the same for all stations so pick one. Should use the station location rather than central array location, e.g. using the following code, but some functions (e.g. ska-sdp-datamodels function create_named_configuration -- at least for some configurations) set xyz coordinates to ENU rather than the geocentric coordinates. So use the array location and a central station for now. Note that OSKAR datasets have correct geocentric coordinates, but also have the array location set to the first station xyz, so using array_location with stn=0 works. xyz = vis.configuration.xyz.data[stn, :] self.antenna_locations.append( EarthLocation.from_geocentric( xyz[0], xyz[1], xyz[2], unit="m", ) ) """ logger.info("Setting beam normalisation for OSKAR data") stn = 0 dir_itrf_zen = radec_to_xyz( SkyCoord( alt=90, az=0, unit="deg", frame="altaz", obstime=self.solution_time, location=self.array_location, ), self.solution_time, ) for chan, freq in enumerate(self.frequency): J = self.telescope.station_response( self.solution_time_mjd_seconds, stn, freq, dir_itrf_zen, dir_itrf_zen, ) self.scale[chan] = np.sqrt(2) / np.linalg.norm(J)
[docs] def validate_direction_above_horizon(self, direction: SkyCoord) -> AltAz: """ Calculate Altitude-Azimuth for a direction, and ensure that the direction is valid for the given solution interval of the beam Parameters ---------- direction: SkyCoord direction of the source Raises ------ PointingBelowHorizon exception if direction is below horizon Returns ------- Altaz[list] if direction is valid """ altaz = direction.transform_to( AltAz(obstime=self.solution_time, location=self.array_location) ) if altaz.alt.degree < 0: raise PointingBelowHorizon( "Pointing below horizon for some of the solution times" )
[docs] def array_response( self, direction: SkyCoord, ) -> np.ndarray: """Return the response of each antenna or station in a given direction Parameters ---------- direction: SkyCoord Direction of desired response Returns ------- np.complex128 array of beam matrices [nant, nfreq, 2, 2] """ # Get the component direction in ITRF dir_itrf = radec_to_xyz(direction, self.solution_time) beams = np.empty( ( self.nstations, self.frequency.size, 2, 2, ), dtype=np.complex128, ) # NOTE: Check if station names can be used instead of # station indices for stn in range(self.nstations): for chan, freq in enumerate(self.frequency): beams[stn, chan, :, :] = ( self.telescope.station_response( self.solution_time_mjd_seconds, stn, freq, dir_itrf, self.delay_dir_itrf, ) * self.scale[chan] ) return beams
[docs] @dataclass class BeamsFactory: """ Dataclass to denote a beam. """ nstations: int array_location: EarthLocation direction: SkyCoord ms_path: str element_response_model: str
[docs] def get_beams_low(self, frequency, soln_time) -> BeamsLow: """ Initializes and returns a BeamsLow object. Parameters ---------- frequency: np.ndarray Array of frequencies soln_time: float Solution time for the given solution interval Returns ------- A BeamsLow Object for the given frequency range and solution time. """ return BeamsLow( **self.__dict__, frequency=frequency, soln_time=soln_time )