Source code for ska_ost_senscalc.low.calculator

import logging
from typing import List, Tuple

import astropy.units as u
import numpy as np
from astroplan import (
    AltitudeConstraint,
    FixedTarget,
    Observer,
    is_always_observable,
    is_observable,
    time_grid_from_range,
)
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.time import Time
from astropy.utils.data import clear_download_cache

from ska_ost_senscalc.low.model import CalculatorInput, CalculatorResult
from ska_ost_senscalc.low.sefd_lookup import SEFDTable

LOGGER = logging.getLogger("senscalc")


# Ensure 'ska-low' is available
def ensure_site_available(site_name="ska-low"):
    obs_sites = EarthLocation.get_site_names(refresh_cache=True)
    if site_name not in obs_sites:
        LOGGER.warning(
            f"'{site_name}' not found in site list. Clearing cache and retrying..."
        )
        clear_download_cache()
        obs_sites = EarthLocation.get_site_names(refresh_cache=True)
        if site_name not in obs_sites:
            LOGGER.debug(f"Available sites after refresh: {obs_sites}")
            msg = f"'{site_name}' still not found after cache refresh."
            raise ValueError(msg)

    return site_name


LOW_SITE_NAME = ensure_site_available("ska-low")

# Fix the date rather than using today to ensure consistency of results
TIME_0 = Time(
    "2023-01-01",
    location=EarthLocation.of_site(LOW_SITE_NAME, refresh_cache=True),
)

LOW = Observer.at_site(LOW_SITE_NAME)

# Create a module level table rather than instantiating for each calculation
sefd_table = SEFDTable()


[docs] def calculate_sensitivity( calculator_input: CalculatorInput, ) -> CalculatorResult: """ For a given set of input parameters, estimate the Stokes I sensitivity for SKA LOW. We have a lookup table which contains sensitivity for a coarse grid of frequency, LST, and (Az, El). For each LST time bin, this function 1. Fetches the SEFD value for the corresponding (Az, El) and coarse frequency grid, 2. Fits a smooth function along the frequency axis to interpolate SEFD on a fine frequency grid 3. computes the Stokes I sensitivity in each time and frequency bin (fine). The computed Stokes I sensitivity are averaged to estimate the final Stokes I value. :param calculator_input: an instance of CalculatorInput with validated user parameters :return: a Calculator result containing the sensitivity and units """ LOGGER.debug("Calculating LOW sensitivity for input: %s", calculator_input) # Find the start and end time of the observation start_utc, end_utc, n_scans = _find_observation_time_utc( calculator_input.pointing_centre, calculator_input.integration_time_h, calculator_input.elevation_limit, ) # Make appropriate time and frequency grids. freq_fine_mhz, bandwidth_mhz = _get_frequency_grids( calculator_input.freq_centre_mhz, calculator_input.bandwidth_mhz ) time_grid_utc = time_grid_from_range((start_utc, end_utc)) # Note that the last cell in the time grid can be shorter than 0.5h. # We need to keep this in mind while estimating sensitivity in that cell. target = _get_target(calculator_input.pointing_centre) # Next, estimate the sensitivity in each time grid point n_time_elem = len(time_grid_utc) def calculate_sensitivity_for_timeslot(timeslot_id: int) -> float: LOGGER.debug( f"Processing time slot {timeslot_id}/{n_time_elem} -" f" {time_grid_utc[timeslot_id]}" ) # Find the LST corresponding to this UTC start_lst = LOW.local_sidereal_time(time_grid_utc[timeslot_id]).value end_lst = (start_lst + 0.5) % 24 # Determine the end time of this time cell in LST # The time resolution in the look-up table is 30 minutes # Note that the last time cell might be shorter than 0.5h # and must be treated as appropriate if timeslot_id == n_time_elem - 1: delta_t_sec = (end_utc - Time(time_grid_utc[timeslot_id])).to("s") else: delta_t_sec = (0.5 * u.hr).to("s") # Convert the pointing RA, DEC to the horizontal coordinates (Az, El) # at this LST target_altaz = LOW.altaz(time_grid_utc[timeslot_id], target) # Now that we have all the relevant boundary conditions for this time cell, query the DB # and get the SEFD values corresponding the values in the fine frequency grid sefd_i_fine = sefd_table.lookup_stokes_i_sefd( target_altaz.az.value, target_altaz.alt.value, start_lst, end_lst, freq_fine_mhz, ) # Calculate sensitivity from SEFD return _sefd_to_sensitivity( sefd_i_fine, calculator_input.num_stations, delta_t_sec.value, bandwidth_mhz, ) LOGGER.debug("Calculating sensitivities for %s timeslots", n_time_elem) sensitivity_intervals = [ calculate_sensitivity_for_timeslot(timeslot_id) for timeslot_id in range(n_time_elem) ] return _stack_sensitivities(sensitivity_intervals, n_scans)
def _find_observation_time_utc( pointing: str | SkyCoord, duration_hour: float | u.Quantity, elevation_limit: float | u.Quantity, ) -> Tuple[Time, Time, float]: """ For a given pointing, find a suitable time range when the source is visible, splitting into multiple scans if necessary. :return: (start_utc, end_utc, n_scans) where n_scans is the number of scans, each with length end_utc-start_utc in case the source is over the horizon for only a short period of time. Note that n_scans is a float. """ target = _get_target(pointing) if type(duration_hour) is not u.Quantity: duration_hour = duration_hour * u.hr if type(elevation_limit) is not u.Quantity: elevation_limit = elevation_limit * u.deg # identifying target visibility observable = is_observable( AltitudeConstraint(min=elevation_limit), Observer.at_site("ska-low"), target, time_grid_from_range((TIME_0 - 0.5 * u.day, TIME_0 + 0.5 * u.day)), )[0] if not observable: raise ValueError( "Specified pointing centre is always below the horizon from the" " SKA LOW site" ) next_transit = LOW.target_meridian_transit_time(TIME_0, target, which="next") observable_in_single_scan = is_always_observable( AltitudeConstraint(min=elevation_limit), Observer.at_site("ska-low"), target, time_grid_from_range( ( next_transit - 0.5 * duration_hour, next_transit + 0.5 * duration_hour, ) ), )[0] if observable_in_single_scan: start_utc = next_transit - 0.5 * duration_hour end_utc = next_transit + 0.5 * duration_hour n_scans = 1 LOGGER.debug( "Duration fits within visible time of the source. Start UTC:" " %s, End UTC: %s, Num of scans: %s", start_utc, end_utc, n_scans, ) else: next_rise = LOW.target_rise_time( TIME_0, target, which="next", horizon=elevation_limit ) next_set = LOW.target_set_time( TIME_0, target, which="next", horizon=elevation_limit ) if next_rise < next_set: visible_time = next_set - next_rise else: previous_rise = LOW.target_rise_time(TIME_0, target, which="previous") visible_time = next_set - previous_rise start_utc = next_transit - 0.5 * visible_time end_utc = next_transit + 0.5 * visible_time n_scans = (duration_hour / visible_time.to(u.hr)).value LOGGER.debug( "Duration longer than visible time of the source. Start UTC:" " %s, End UTC: %s, Num of scans: %s", start_utc, end_utc, n_scans, ) return start_utc, end_utc, n_scans def _get_target(pointing_centre: str | SkyCoord) -> FixedTarget: """ create an astroplan.FixedTarget() object from a specified pointing centre ToDo: extend this to accept a target name that can be queried? :param pointing_centre: eg 10:00:00 -30:00:00 or astropy.coordinates.sky_coordinate.SkyCoord object :return: a corresponding astroplan.FixedTarget object """ match pointing_centre: case str(): point_coord = SkyCoord(pointing_centre, unit=(u.hourangle, u.deg)) case SkyCoord(): try: getattr(pointing_centre, "ra") point_coord = pointing_centre except AttributeError as e: return e target = FixedTarget(name="target", coord=point_coord) return target def _get_frequency_grids( freq_centre: int, bandwidth: int ) -> Tuple[np.ndarray, np.ndarray]: """ Create an incrementing frequency grid using the freq_centre and bandwidth :param freq_centre: the user defined input for the observation :param bandwidth: the user defined input for the observation :return: a 2 element tuple containing the frequency grid and a grid of ones of the same dimensions """ freq_fine_mhz = np.arange( freq_centre - bandwidth / 2, freq_centre + bandwidth / 2, step=1 ) bandwidth_mhz = np.ones_like(freq_fine_mhz) # Note that the last frequency cell can be narrower than 1 MHz # Adjust the last frequency cell value bandwidth_mhz[-1] = (freq_centre + bandwidth / 2) - freq_fine_mhz[-1] return freq_fine_mhz, bandwidth_mhz def _sefd_to_sensitivity( sefd: np.ndarray, num_stations: int, delta_t_sec: float, bandwidth_mhz: np.ndarray, ) -> float: """ Compute Sensitivity from SEFD for a given number of antennas, observing time, and bandwidth. If SEFD is an array with multiple elements, return the stacked sensitivity. :param sefd: value of SEFD from the look up table :param num_stations: the user defined input for the observation :param delta_t_sec: the size of the time cell in seconds :param bandwidth_mhz: the array of 1s representing the bandwidth :return: the Stokes I sensitivity for this cell """ sensitivity = sefd / np.sqrt( num_stations * (num_stations - 1) * delta_t_sec * bandwidth_mhz * 1e6 ) stacked_sensitivity = np.sqrt(np.sum(sensitivity**2)) / len(sefd) return stacked_sensitivity def _stack_sensitivities( sensitivity_intervals: List[float], n_scans ) -> CalculatorResult: """ Scale the per-scan sensitivity to get the final sensitivity :param sensitivity_intervals: :param n_scans: The number of scans the observation is split into :return: The result, in uJy/beam """ sensitivity_intervals = np.asarray(sensitivity_intervals)[ np.isfinite(sensitivity_intervals) ] weight = 1 / sensitivity_intervals**2 sensitivity_per_scan = np.sqrt( np.sum((sensitivity_intervals * weight) ** 2) ) / np.sum(weight) final_sensitivity = sensitivity_per_scan / np.sqrt(n_scans) return CalculatorResult(final_sensitivity * 1e6, "uJy/beam")