Source code for ska_ost_senscalc.low.calculator

import logging
from datetime import date, datetime, timedelta
from typing import List, Tuple

import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.time import Time
from ephem import AlwaysUpError, FixedBody, NeverUpError, Observer

from ska_ost_senscalc.low.model import CalculatorInput, CalculatorResult
from ska_ost_senscalc.low.sefd_lookup import SEFDTable

LOGGER = logging.getLogger("senscalc")

# Longitude and Latitude are from the SKA1 LOW Configuration
#    Coordinates document (SKA-TEL-SKO-0000422 revision 04).
(LOW_LONGITUDE, LOW_LATITUDE, LOW_ELEVATION) = (
    "116.7644482",
    "-26.82472208",
    377.8,
)

# Create a module level table rather than instantiating for each calculation
sefd_table = SEFDTable()


[docs] def calculate_sensitivity( calculator_input: CalculatorInput, ) -> CalculatorResult: """ For a given set of input parameters, estimate the Stokes I sensitivity for SKA LOW. We have a look up table which contains sensitivity for a coarse grid of frequency, LST, and (Az, El). For each LST time bin, this function 1. Fetches the SEFD value for the corresponding (Az, El) and coarse frequency grid, 2. Fits a smooth function along the frequency axis to interpolate SEFD on a fine frequency grid 3. computes the Stokes I sensitivity in each time and frequency bin (fine). The computed Stokes I sensitivity are averaged to estimate the final Stokes I value. :param calculator_input: an instance of CalculatorInput with validated user parameters :return: a Calculator result containing the sensitivity and units """ LOGGER.debug("Calculating LOW sensitivity for input: %s", calculator_input) ska_low = _get_skalow_object(calculator_input.elevation_limit) skalow_lon = np.degrees(float(ska_low.lon)) skalow_lat = np.degrees(float(ska_low.lat)) # Find the start and end time of the observation start_utc, end_utc, n_scans = _find_observation_time_utc( calculator_input.pointing_centre, calculator_input.integration_time_h, ska_low ) # Make appropriate time and frequency grids. freq_fine_mhz, bandwidth_mhz = _get_frequency_grids( calculator_input.freq_centre_mhz, calculator_input.bandwidth_mhz ) time_grid_utc = np.arange(start_utc, end_utc, step=timedelta(hours=0.5)) # Note that the last cell in the time grid can be shorter than 0.5h. # We need to keep this in mind while estimating sensitivity in that cell. target = _get_target(calculator_input.pointing_centre) # Next, estimate the sensitivity in each time grid point n_time_elem = len(time_grid_utc) def calculate_sensitivity_for_timeslot(timeslot_id: int) -> float: LOGGER.debug( f"Processing time slot {timeslot_id}/{n_time_elem} -" f" {time_grid_utc[timeslot_id]}" ) # Find the LST corresponding to this UTC start_lst = _utc_to_lst(time_grid_utc[timeslot_id], skalow_lon, skalow_lat) end_lst = (start_lst + 0.5) % 24 # Determine the end time of this time cell in LST # The time resolution in the look-up table is 30 minutes # Note that the last time cell might be shorter than 0.5h # and must be treated as appropriate if timeslot_id == n_time_elem - 1: delta_t_sec = ( end_utc - time_grid_utc[timeslot_id].astype(datetime) ).total_seconds() else: delta_t_sec = 0.5 * 3600.0 # Convert the pointing RA, DEC to the horizontal coordinates (Az, El) # at this LST ska_low.date = time_grid_utc[timeslot_id].astype(datetime) target.compute(ska_low) az = np.degrees(float(target.az)) el = np.degrees(float(target.alt)) # Now that we have all the relevant boundary conditions for this time cell, query the DB # and get the SEFD values corresponding the the values in the fine frequency grid sefd_i_fine = sefd_table.lookup_stokes_i_sefd( az, el, start_lst, end_lst, freq_fine_mhz ) # Calculate sensitivity from SEFD return _sefd_to_sensitivity( sefd_i_fine, calculator_input.num_stations, delta_t_sec, bandwidth_mhz, ) LOGGER.debug("Calculating sensitivities for %s timeslots", n_time_elem) sensitivity_intervals = [ calculate_sensitivity_for_timeslot(timeslot_id) for timeslot_id in range(n_time_elem) ] return _stack_sensitivities(sensitivity_intervals, n_time_elem, n_scans)
def _get_skalow_object(elevation_limit) -> Observer: """ TODO: Elevation corresponds to MWA. Insert SKA LOW array centre elevation once known. :return: an ephem.Observer() object with details containing the SKA LOW array coordinates. """ ska_low = Observer() ska_low.lon = LOW_LONGITUDE ska_low.lat = LOW_LATITUDE ska_low.elevation = LOW_ELEVATION # There are problems calculating the sensitivity for low-elevation sources - # this introduces a low-elevation limit of 15 degrees ska_low.horizon = str(elevation_limit) return ska_low def _find_observation_time_utc( pointing: str, integration_time_h: float, observatory: Observer ) -> Tuple[datetime, datetime, float]: """ For a given pointing, find a suitable time range when the source is visible, splitting into multiple scans if necessary. TODO: Allow a user-defined elevation limit. Accept pointing in SkyCoord() format. :return: (start_utc, end_utc, n_scans) where n_scans is the number of scans, each with length end_utc-start_utc in case the source is over the horizon for only a short period of time. Note that n_scans is a float. """ target = _get_target(pointing) # Fix the date rather than using today to ensure consistency of results observatory.date = date.fromisoformat("2023-01-01") target.compute(observatory) try: next_rise = observatory.next_rising(target) next_transit = observatory.next_transit(target) next_set = observatory.next_setting(target) LOGGER.debug( "Source rise time: %s, Source transit time: %s, Source set time: %s", next_rise, next_transit, next_set, ) except NeverUpError as err: raise ValueError( "Specified pointing centre is always below the horizon from the" " SKA LOW site" ) from err except AlwaysUpError: # Source is always up start_utc = target.transit_time.datetime() - timedelta( hours=integration_time_h / 2 ) end_utc = target.transit_time.datetime() + timedelta( hours=integration_time_h / 2 ) n_scans = 1 LOGGER.debug( "Source always up. Start UTC: %s, End UTC: %s, Num of scans: %s", start_utc, end_utc, n_scans, ) else: visible_time_sec = (next_set.datetime() - next_rise.datetime()).total_seconds() if visible_time_sec < 0: previous_rise = observatory.previous_rising(target) visible_time_sec = ( next_set.datetime() - previous_rise.datetime() ).total_seconds() if integration_time_h < visible_time_sec / 3600.0: # The user-specified duration can fit within a single scan start_utc = next_transit.datetime() - timedelta( hours=integration_time_h / 2 ) end_utc = next_transit.datetime() + timedelta(hours=integration_time_h / 2) n_scans = 1 LOGGER.debug( "Duration fits within visible time of the source. Start" " UTC: %s, End UTC: %s, Num of scans: %s", start_utc, end_utc, n_scans, ) else: # The observation duration is longer than the source visibility time # Split integration_time_h into multiple scans with length less than visible_time_sec start_utc = next_transit.datetime() - timedelta( seconds=visible_time_sec / 2 ) end_utc = next_transit.datetime() + timedelta(seconds=visible_time_sec / 2) n_scans = integration_time_h * 3600.0 / visible_time_sec LOGGER.debug( "Duration longer than visible time of the source. Start" " UTC: %s, End UTC: %s, Num of scans: %s", start_utc, end_utc, n_scans, ) return start_utc, end_utc, n_scans def _utc_to_lst( time_utc: datetime, obs_lon: np.ndarray[float], obs_lat: np.ndarray[float] ) -> float: """ Converts UTC to Local Sidereal Time at the given location :param time_utc: the UTC time to convert :param obs_lat: the latitude of the geolocation :param obs_lon: the longitude of the geolocation :return: the corresponding LST time """ time_utc = Time(time_utc, location=(obs_lon, obs_lat)) return time_utc.sidereal_time("apparent").value def _get_target(pointing_centre: str) -> FixedBody: """ Create an ephem.FixedBody() object out of the specified pointing centre :param pointing_centre: eg 10:00:00 -30:00:00 :return: a corresponding ephem.FixedBody() """ point_coord = SkyCoord(pointing_centre, unit=(u.hourangle, u.deg)) target = FixedBody() target._epoch = "2000." target._ra = point_coord.ra.radian target._dec = point_coord.dec.radian return target def _get_frequency_grids( freq_centre: int, bandwidth: int ) -> Tuple[np.ndarray, np.ndarray]: """ Create an incrementing frequency grid using the freq_centre and bandwidth :param freq_centre: the user defined input for the observation :param bandwidth: the user defined input for the observation :return: a 2 element tuple containing the frequency grid and a grid of ones of the same dimensions """ freq_fine_mhz = np.arange( freq_centre - bandwidth / 2, freq_centre + bandwidth / 2, step=1 ) bandwidth_mhz = np.ones_like(freq_fine_mhz) # Note that the last frequency cell can be narrower than 1 MHz # Adjust the last frequency cell value bandwidth_mhz[-1] = (freq_centre + bandwidth / 2) - freq_fine_mhz[-1] return freq_fine_mhz, bandwidth_mhz def _sefd_to_sensitivity( sefd: np.ndarray, num_stations: int, delta_t_sec: float, bandwidth_mhz: np.ndarray, ) -> float: """ Compute Sensitivity from SEFD for a given number of antennas, observing time, and bandwidth. If SEFD is an array with multiple elements, return the stacked sensitivity. :param sefd: value of SEFD from the look up table :param num_stations: the user defined input for the observation :param delta_t_sec: the size of the time cell in seconds :param bandwidth_mhz: the array of 1s representing the bandwidth :return: the Stokes I sensitivity for this cell """ sensitivity = sefd / np.sqrt( num_stations * (num_stations - 1) * delta_t_sec * bandwidth_mhz * 1e6 ) stacked_sensitivity = np.sqrt(np.sum(sensitivity**2)) / len(sefd) return stacked_sensitivity def _stack_sensitivities( sensitivity_intervals: List[float], n_time_elem, n_scans ) -> CalculatorResult: """ Scale the per-scan sensitivity to get the final sensitivity :param sensitivity_intervals: :param n_time_elem: The number of elements in the time grid :param n_scans: The number of scans the observation is split into :return: The result, in uJy/beam """ sensitivity_intervals = np.asarray(sensitivity_intervals) sensitivity_per_scan = np.sqrt(np.sum(sensitivity_intervals**2)) / n_time_elem final_sensitivity = sensitivity_per_scan / np.sqrt(n_scans) return CalculatorResult(final_sensitivity * 1e6, "uJy/beam")