import logging
from typing import List, Tuple
import astropy.units as u
import numpy as np
from astroplan import (
AltitudeConstraint,
FixedTarget,
Observer,
is_always_observable,
is_observable,
time_grid_from_range,
)
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.time import Time
from astropy.utils.data import clear_download_cache
from ska_ost_senscalc.low.model import CalculatorInput, CalculatorResult
from ska_ost_senscalc.low.sefd_lookup import SEFDTable
LOGGER = logging.getLogger("senscalc")
# Ensure 'ska-low' is available
def ensure_site_available(site_name="ska-low"):
obs_sites = EarthLocation.get_site_names(refresh_cache=True)
if site_name not in obs_sites:
LOGGER.warning(
f"'{site_name}' not found in site list. Clearing cache and retrying..."
)
clear_download_cache()
obs_sites = EarthLocation.get_site_names(refresh_cache=True)
if site_name not in obs_sites:
LOGGER.debug(f"Available sites after refresh: {obs_sites}")
msg = f"'{site_name}' still not found after cache refresh."
raise ValueError(msg)
return site_name
LOW_SITE_NAME = ensure_site_available("ska-low")
# Fix the date rather than using today to ensure consistency of results
TIME_0 = Time(
"2023-01-01",
location=EarthLocation.of_site(LOW_SITE_NAME, refresh_cache=True),
)
LOW = Observer.at_site(LOW_SITE_NAME)
# Create a module level table rather than instantiating for each calculation
sefd_table = SEFDTable()
[docs]
def calculate_sensitivity(
calculator_input: CalculatorInput,
) -> CalculatorResult:
"""
For a given set of input parameters, estimate the Stokes I sensitivity for SKA LOW.
We have a lookup table which contains sensitivity for a coarse grid of
frequency, LST, and (Az, El). For each LST time bin, this function
1. Fetches the SEFD value for the corresponding (Az, El) and coarse frequency grid,
2. Fits a smooth function along the frequency axis to interpolate
SEFD on a fine frequency grid
3. computes the Stokes I sensitivity in each time and frequency bin (fine).
The computed Stokes I sensitivity are averaged to estimate the final Stokes I value.
:param calculator_input: an instance of CalculatorInput with validated user parameters
:return: a Calculator result containing the sensitivity and units
"""
LOGGER.debug("Calculating LOW sensitivity for input: %s", calculator_input)
# Find the start and end time of the observation
start_utc, end_utc, n_scans = _find_observation_time_utc(
calculator_input.pointing_centre,
calculator_input.integration_time_h,
calculator_input.elevation_limit,
)
# Make appropriate time and frequency grids.
freq_fine_mhz, bandwidth_mhz = _get_frequency_grids(
calculator_input.freq_centre_mhz, calculator_input.bandwidth_mhz
)
time_grid_utc = time_grid_from_range((start_utc, end_utc))
# Note that the last cell in the time grid can be shorter than 0.5h.
# We need to keep this in mind while estimating sensitivity in that cell.
target = _get_target(calculator_input.pointing_centre)
# Next, estimate the sensitivity in each time grid point
n_time_elem = len(time_grid_utc)
def calculate_sensitivity_for_timeslot(timeslot_id: int) -> float:
LOGGER.debug(
f"Processing time slot {timeslot_id}/{n_time_elem} -"
f" {time_grid_utc[timeslot_id]}"
)
# Find the LST corresponding to this UTC
start_lst = LOW.local_sidereal_time(time_grid_utc[timeslot_id]).value
end_lst = (start_lst + 0.5) % 24
# Determine the end time of this time cell in LST
# The time resolution in the look-up table is 30 minutes
# Note that the last time cell might be shorter than 0.5h
# and must be treated as appropriate
if timeslot_id == n_time_elem - 1:
delta_t_sec = (end_utc - Time(time_grid_utc[timeslot_id])).to("s")
else:
delta_t_sec = (0.5 * u.hr).to("s")
# Convert the pointing RA, DEC to the horizontal coordinates (Az, El)
# at this LST
target_altaz = LOW.altaz(time_grid_utc[timeslot_id], target)
# Now that we have all the relevant boundary conditions for this time cell, query the DB
# and get the SEFD values corresponding the values in the fine frequency grid
sefd_i_fine = sefd_table.lookup_stokes_i_sefd(
target_altaz.az.value,
target_altaz.alt.value,
start_lst,
end_lst,
freq_fine_mhz,
)
# Calculate sensitivity from SEFD
return _sefd_to_sensitivity(
sefd_i_fine,
calculator_input.num_stations,
delta_t_sec.value,
bandwidth_mhz,
)
LOGGER.debug("Calculating sensitivities for %s timeslots", n_time_elem)
sensitivity_intervals = [
calculate_sensitivity_for_timeslot(timeslot_id)
for timeslot_id in range(n_time_elem)
]
return _stack_sensitivities(sensitivity_intervals, n_scans)
def _find_observation_time_utc(
pointing: str | SkyCoord,
duration_hour: float | u.Quantity,
elevation_limit: float | u.Quantity,
) -> Tuple[Time, Time, float]:
"""
For a given pointing, find a suitable time range when the source is visible, splitting into
multiple scans if necessary.
:return: (start_utc, end_utc, n_scans) where n_scans is the number of scans, each with length end_utc-start_utc
in case the source is over the horizon for only a short period of time.
Note that n_scans is a float.
"""
target = _get_target(pointing)
if type(duration_hour) is not u.Quantity:
duration_hour = duration_hour * u.hr
if type(elevation_limit) is not u.Quantity:
elevation_limit = elevation_limit * u.deg
# identifying target visibility
observable = is_observable(
AltitudeConstraint(min=elevation_limit),
Observer.at_site("ska-low"),
target,
time_grid_from_range((TIME_0 - 0.5 * u.day, TIME_0 + 0.5 * u.day)),
)[0]
if not observable:
raise ValueError(
"Specified pointing centre is always below the horizon from the"
" SKA LOW site"
)
next_transit = LOW.target_meridian_transit_time(TIME_0, target, which="next")
observable_in_single_scan = is_always_observable(
AltitudeConstraint(min=elevation_limit),
Observer.at_site("ska-low"),
target,
time_grid_from_range(
(
next_transit - 0.5 * duration_hour,
next_transit + 0.5 * duration_hour,
)
),
)[0]
if observable_in_single_scan:
start_utc = next_transit - 0.5 * duration_hour
end_utc = next_transit + 0.5 * duration_hour
n_scans = 1
LOGGER.debug(
"Duration fits within visible time of the source. Start UTC:"
" %s, End UTC: %s, Num of scans: %s",
start_utc,
end_utc,
n_scans,
)
else:
next_rise = LOW.target_rise_time(
TIME_0, target, which="next", horizon=elevation_limit
)
next_set = LOW.target_set_time(
TIME_0, target, which="next", horizon=elevation_limit
)
if next_rise < next_set:
visible_time = next_set - next_rise
else:
previous_rise = LOW.target_rise_time(TIME_0, target, which="previous")
visible_time = next_set - previous_rise
start_utc = next_transit - 0.5 * visible_time
end_utc = next_transit + 0.5 * visible_time
n_scans = (duration_hour / visible_time.to(u.hr)).value
LOGGER.debug(
"Duration longer than visible time of the source. Start UTC:"
" %s, End UTC: %s, Num of scans: %s",
start_utc,
end_utc,
n_scans,
)
return start_utc, end_utc, n_scans
def _get_target(pointing_centre: str | SkyCoord) -> FixedTarget:
"""
create an astroplan.FixedTarget() object from a specified pointing centre
ToDo: extend this to accept a target name that can be queried?
:param pointing_centre: eg 10:00:00 -30:00:00 or astropy.coordinates.sky_coordinate.SkyCoord object
:return: a corresponding astroplan.FixedTarget object
"""
match pointing_centre:
case str():
point_coord = SkyCoord(pointing_centre, unit=(u.hourangle, u.deg))
case SkyCoord():
try:
getattr(pointing_centre, "ra")
point_coord = pointing_centre
except AttributeError as e:
return e
target = FixedTarget(name="target", coord=point_coord)
return target
def _get_frequency_grids(
freq_centre: int, bandwidth: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Create an incrementing frequency grid using the freq_centre and bandwidth
:param freq_centre: the user defined input for the observation
:param bandwidth: the user defined input for the observation
:return: a 2 element tuple containing the frequency grid and a grid of ones of the same dimensions
"""
freq_fine_mhz = np.arange(
freq_centre - bandwidth / 2, freq_centre + bandwidth / 2, step=1
)
bandwidth_mhz = np.ones_like(freq_fine_mhz)
# Note that the last frequency cell can be narrower than 1 MHz
# Adjust the last frequency cell value
bandwidth_mhz[-1] = (freq_centre + bandwidth / 2) - freq_fine_mhz[-1]
return freq_fine_mhz, bandwidth_mhz
def _sefd_to_sensitivity(
sefd: np.ndarray,
num_stations: int,
delta_t_sec: float,
bandwidth_mhz: np.ndarray,
) -> float:
"""
Compute Sensitivity from SEFD for a given number of antennas, observing time, and bandwidth.
If SEFD is an array with multiple elements, return the stacked sensitivity.
:param sefd: value of SEFD from the look up table
:param num_stations: the user defined input for the observation
:param delta_t_sec: the size of the time cell in seconds
:param bandwidth_mhz: the array of 1s representing the bandwidth
:return: the Stokes I sensitivity for this cell
"""
sensitivity = sefd / np.sqrt(
num_stations * (num_stations - 1) * delta_t_sec * bandwidth_mhz * 1e6
)
stacked_sensitivity = np.sqrt(np.sum(sensitivity**2)) / len(sefd)
return stacked_sensitivity
def _stack_sensitivities(
sensitivity_intervals: List[float], n_scans
) -> CalculatorResult:
"""
Scale the per-scan sensitivity to get the final sensitivity
:param sensitivity_intervals:
:param n_scans: The number of scans the observation is split into
:return: The result, in uJy/beam
"""
sensitivity_intervals = np.asarray(sensitivity_intervals)[
np.isfinite(sensitivity_intervals)
]
weight = 1 / sensitivity_intervals**2
sensitivity_per_scan = np.sqrt(
np.sum((sensitivity_intervals * weight) ** 2)
) / np.sum(weight)
final_sensitivity = sensitivity_per_scan / np.sqrt(n_scans)
return CalculatorResult(final_sensitivity * 1e6, "uJy/beam")