"""This module contains classes and methods for use in the SKA
Sensitivity Calculator.
It implements the preparation of both the integration and sensitivity inputs and then doing the calculations using them.
"""
import astropy.units as u
import numpy as np
from astropy.coordinates.angles.core import Angle
from astropy.coordinates.sky_coordinate import SkyCoord
from astropy.units.quantity import Quantity
import ska_ost_senscalc.mid_utilities as mid_utils
from ska_ost_senscalc.mid.models import IntegrationInput, SensitivityInput
from ska_ost_senscalc.mid.sefd import SEFD_antenna, SEFD_array
from ska_ost_senscalc.subarray import MIDArrayConfiguration, SubarrayStorage
from ska_ost_senscalc.utilities import (
Atmosphere,
DishType,
Telescope,
TelParams,
Utilities,
)
DEFAULT_ALPHA = 2.75
DEFAULT_EL = 45 * u.deg
DEFAULT_PWV = 10
subarray_storage = SubarrayStorage(Telescope.MID)
def _eta_system(
frequency: Quantity,
rx_band: str,
eta_system: float,
eta_pointing: float,
eta_coherence: float,
eta_digitisation: float,
eta_correlation: float,
eta_bandpass: float,
) -> float:
if eta_system is None:
# Co-dependencies of eta_system
if eta_pointing is None:
eta_pointing = mid_utils.eta_point(frequency, DishType.SKA1)
if eta_coherence is None:
eta_coherence = mid_utils.eta_coherence(frequency)
if eta_digitisation is None:
eta_digitisation = mid_utils.eta_digitisation(rx_band)
if eta_correlation is None:
eta_correlation = mid_utils.eta_correlation()
if eta_bandpass is None:
eta_bandpass = mid_utils.eta_bandpass()
eta_system = mid_utils.eta_system(
eta_pointing,
eta_coherence,
eta_digitisation,
eta_correlation,
eta_bandpass,
)
return eta_system
def _n_ska_meer(
subarray_configuration: MIDArrayConfiguration, n_ska: int, n_meer: int
) -> tuple[int, int]:
# Validation has checked that either n_ska and n_meet are both set, or array_configuration is set
# TODO does the Calculator even need to know the array_configuration? We could set nmeer/nska in the validation layer instead
if subarray_configuration:
array_configuration = subarray_storage.load_by_label(
subarray_configuration.value
)
n_ska = array_configuration.n_ska
n_meer = array_configuration.n_meer
else:
n_ska = n_ska
n_meer = n_meer
return n_ska, n_meer
def _t_gal_ska_meer(
target: SkyCoord,
frequency: Quantity,
t_gal_ska: Quantity,
t_gal_meer: Quantity,
alpha: float,
) -> tuple[Quantity, Quantity]:
# alpha and Tgal
t_gal_ska_parsed = Utilities.to_astropy(t_gal_ska, u.K)
if t_gal_ska_parsed is not None:
t_gal_ska = t_gal_ska_parsed
else:
t_gal_ska = mid_utils.Tgal(target, frequency, DishType.SKA1, alpha)
t_gal_meer_parsed = Utilities.to_astropy(t_gal_meer, u.K)
if t_gal_meer_parsed is not None:
t_gal_meer = t_gal_meer_parsed
else:
t_gal_meer = mid_utils.Tgal(target, frequency, DishType.MeerKAT, alpha)
return t_gal_ska, t_gal_meer
def _t_sky_ska_meer(
frequency: Quantity,
el: Angle,
pwv: int,
t_sky_ska: Quantity,
t_sky_meer: Quantity,
t_gal_ska: Quantity,
t_gal_meer: Quantity,
) -> tuple[Quantity, Quantity]:
# Tsky
t_sky_ska_parsed = Utilities.to_astropy(t_sky_ska, u.K)
if t_sky_ska_parsed is not None:
t_sky_ska = t_sky_ska_parsed
else:
t_sky_ska = mid_utils.Tsky(
t_gal_ska,
frequency,
el,
pwv,
)
t_sky_meer_parsed = Utilities.to_astropy(t_sky_meer, u.K)
if t_sky_meer_parsed is not None:
t_sky_meer = t_sky_meer_parsed
else: # Default value
t_sky_meer = mid_utils.Tsky(
t_gal_meer,
frequency,
el,
pwv,
)
return t_sky_ska, t_sky_meer
def _t_spl_ska_meer(
t_spl_ska: Quantity, t_spl_meer: Quantity
) -> tuple[Quantity, Quantity]:
# Tspl
t_spl_ska_parsed = Utilities.to_astropy(t_spl_ska, u.K)
if t_spl_ska_parsed is not None:
t_spl_ska = t_spl_ska_parsed
else:
t_spl_ska = mid_utils.Tspl(DishType.SKA1)
t_spl_meer_parsed = Utilities.to_astropy(t_spl_meer, u.K)
if t_spl_meer_parsed is not None:
t_spl_meer = t_spl_meer_parsed
else:
t_spl_meer = mid_utils.Tspl(DishType.MeerKAT)
return t_spl_ska, t_spl_meer
def _t_rx_ska_meer(
frequency: Quantity, rx_band: str, t_rx_ska: Quantity, t_rx_meer: Quantity
) -> tuple[Quantity, Quantity]:
# Treceiver
t_rx_ska_parsed = Utilities.to_astropy(t_rx_ska, u.K)
if t_rx_ska_parsed is not None:
t_rx_ska = t_rx_ska_parsed
else:
t_rx_ska = mid_utils.Trcv(frequency, rx_band, DishType.SKA1)
t_rx_meer_parsed = Utilities.to_astropy(t_rx_meer, u.K)
if t_rx_meer_parsed is not None:
t_rx_meer = t_rx_meer_parsed
else:
t_rx_meer = mid_utils.Trcv(frequency, rx_band, DishType.MeerKAT)
return t_rx_ska, t_rx_meer
def _t_sys_ska_meer(
frequency: Quantity,
t_sky_ska: Quantity,
t_sky_meer: Quantity,
t_spl_ska: Quantity,
t_spl_meer: Quantity,
t_rx_ska: Quantity,
t_rx_meer: Quantity,
t_sys_ska: Quantity,
t_sys_meer: Quantity,
) -> tuple[Quantity, Quantity]:
# Tsys
t_sys_ska_parsed = Utilities.to_astropy(t_sys_ska, u.K)
if t_sys_ska_parsed is not None:
t_sys_ska = t_sys_ska_parsed
else:
t_sys_ska = mid_utils.Tsys_dish(
t_rx_ska,
t_spl_ska,
t_sky_ska,
frequency,
)
t_sys_meer_parsed = Utilities.to_astropy(t_sys_meer, u.K)
if t_sys_meer_parsed is not None:
t_sys_meer = t_sys_meer_parsed
else: # Default value
t_sys_meer = mid_utils.Tsys_dish(
t_rx_meer,
t_spl_meer,
t_sky_meer,
frequency,
)
return t_sys_ska, t_sys_meer
def _prepare_input(
*, # Makes the function keyword-only to avoid mistakes,
rx_band=None,
freq_centre_hz=None,
target=None,
bandwidth_hz=None,
subarray_configuration=None,
pwv=DEFAULT_PWV,
el=DEFAULT_EL,
eta_system=None,
eta_pointing=None,
eta_coherence=None,
eta_digitisation=None,
eta_correlation=None,
eta_bandpass=None,
n_ska=None,
eta_ska=None,
t_sys_ska=None,
t_spl_ska=None,
t_rx_ska=None,
n_meer=None,
eta_meer=None,
t_sys_meer=None,
t_spl_meer=None,
t_rx_meer=None,
t_sky_ska=None,
t_sky_meer=None,
t_gal_ska=None,
t_gal_meer=None,
alpha=DEFAULT_ALPHA,
**kwargs, # To make sure it works with unexpected parameters
) -> tuple[Quantity, float, Quantity, Quantity, Quantity]:
frequency = Utilities.to_astropy(freq_centre_hz, u.Hz)
# check target is visible at some time
location = TelParams.mid_core_location()
if target.icrs.dec > 90 * u.deg + location.lat:
# target is never above the horizon
raise RuntimeError("Target always below horizon")
bandwidth = Utilities.to_astropy(bandwidth_hz, u.Hz)
# Check elevation compatible with target
el = Utilities.to_astropy(el, u.deg)
new_el = 90.0 * u.deg - np.abs(location.to_geodetic().lat - target.icrs.dec)
if new_el < el:
el = new_el
else:
el = el
tau = Atmosphere.tau_atm(pwv, frequency, el)
eta_system = _eta_system(
frequency,
rx_band,
eta_system,
eta_pointing,
eta_coherence,
eta_digitisation,
eta_correlation,
eta_bandpass,
)
# eta dish
if eta_ska is None:
eta_ska = mid_utils.eta_dish(frequency, DishType.MeerKAT)
if eta_meer is None:
eta_meer = mid_utils.eta_dish(frequency, DishType.MeerKAT)
n_ska, n_meer = _n_ska_meer(subarray_configuration, n_ska, n_meer)
t_gal_ska, t_gal_meer = _t_gal_ska_meer(
target, frequency, t_gal_ska, t_gal_meer, alpha
)
t_sky_ska, t_sky_meer = _t_sky_ska_meer(
frequency, el, pwv, t_sky_ska, t_sky_meer, t_gal_ska, t_gal_meer
)
t_spl_ska, t_spl_meer = _t_spl_ska_meer(t_spl_ska, t_spl_meer)
t_rx_ska, t_rx_meer = _t_rx_ska_meer(frequency, rx_band, t_rx_ska, t_rx_meer)
t_sys_ska, t_sys_meer = _t_sys_ska_meer(
frequency,
t_sky_ska,
t_sky_meer,
t_spl_ska,
t_spl_meer,
t_rx_ska,
t_rx_meer,
t_sys_ska,
t_sys_meer,
)
# Compute SEFD
sefd_ska = SEFD_antenna(
t_sys_ska,
eta_ska * TelParams.dish_area(DishType.SKA1),
)
sefd_meer = SEFD_antenna(
t_sys_meer,
eta_meer * TelParams.dish_area(DishType.MeerKAT),
)
sefd_array = SEFD_array(n_ska, n_meer, sefd_ska, sefd_meer)
return sefd_array, eta_system, bandwidth, tau, el
[docs]
def calculate_sensitivity(params: SensitivityInput) -> Quantity:
"""Calculate sensitivity in Janskys for a specified integration time.
:param params: parameters prepared before using prepare prepare_sensitivity_input function
:type params: SensitivityInput
:return: the sensitivity of the telescope
:rtype: astropy.units.Quantity
"""
# test that integration_time is a time > 0
if params.integration_time.to_value(u.s) < 0.0:
raise RuntimeError("negative integration time")
sensitivity = params.sefd_array / (
params.eta_system * np.sqrt(2 * params.bandwidth * params.integration_time)
)
# Now want to calculate Tsys in space so that we can compare with
# astronomical fluxes. This means correcting Tsys for the attenuation
# due to the atmosphere in the direction of the target.
return sensitivity * np.exp(params.tau)
[docs]
def calculate_integration_time(params: IntegrationInput) -> Quantity:
"""Calculate the integration time (in seconds) required to reach the specified sensitivity.
:param params: parameters prepared before using prepare prepare_integration_input function
:type params: IntegrationInput
:return: the integration time required
:rtype: astropy.units.Quantity
"""
# test that sensitivity converts to Jy and is > 0
if params.sensitivity.to_value(u.Jy) < 0.0:
raise RuntimeError("negative sensitivity")
integration_time = (
np.exp(params.tau)
* params.sefd_array
/ (params.sensitivity * params.eta_system)
) ** 2 / (2 * params.bandwidth)
integration_time[params.elevation <= 0.0 * u.deg] = -1.0 * u.s
return integration_time.to(u.s)