"""
Module holding functions useful to the MidCalculator
"""
import logging
import os
from enum import Enum
from math import sqrt
import astropy.units as u
import numpy as np
from astropy.constants import c, k_B
from astropy.table import Table
from astropy.units import Quantity
from scipy.interpolate import interp1d
from ska_ost_senscalc.subarray import (
LOWArrayConfiguration,
MIDArrayConfiguration,
SubarrayStorage,
)
from ska_ost_senscalc.utilities import (
MEERKAT_DISH_POINTING_ERROR,
SKA_DISH_POINTING_ERROR,
STATIC_DATA_PATH,
Atmosphere,
Celestial,
DishType,
Telescope,
TelParams,
Utilities,
)
logger = logging.getLogger("senscalc")
# pylint: disable=invalid-name
celestial = Celestial()
[docs]def read_table(file_name, columns=None):
"""
function to read in the lookup tables
"""
path_to_table = STATIC_DATA_PATH / "lookups" / file_name
return Table.read(path_to_table, include_names=columns)
mid_weighting_table = read_table("beam_weighting_mid.ecsv")
mid_reference_frequency = mid_weighting_table.meta["reference_frequency"]
low_weighting_table = read_table("beam_weighting_low.ecsv")
low_reference_frequency = low_weighting_table.meta["reference_frequency"]
weighting_tables = {
Telescope.MID: mid_weighting_table,
Telescope.LOW: low_weighting_table,
}
ref_frequencies = {
Telescope.MID: mid_reference_frequency,
Telescope.LOW: low_reference_frequency,
}
confusion_noise_table = read_table("confusion_noise.ecsv")
# original look up table in linear space, converting the table to log space for
# the calculation
for k in confusion_noise_table.keys():
confusion_noise_table[k] = np.log10(confusion_noise_table[k])
# look up table has the subarrays stored according to their filename stem. creating a new column that holds the
# subarray label data
mid_sub_storage = SubarrayStorage(Telescope.MID)
file_mapping = mid_sub_storage.filename_label_mapping()
mid_weighting_table.add_column(
[
file_mapping[os.path.splitext(file_stem)[0]]
for file_stem in mid_weighting_table["subarray"]
],
name="subarray_label",
)
low_sub_storage = SubarrayStorage(Telescope.LOW)
file_mapping = low_sub_storage.filename_label_mapping()
low_weighting_table.add_column(
[
file_mapping[os.path.splitext(file_stem)[0]]
for file_stem in low_weighting_table["subarray"]
],
name="subarray_label",
)
[docs]class Weighting(Enum):
"""
Enumeration for different weighting
"""
NATURAL = "natural"
ROBUST = "robust"
UNIFORM = "uniform"
[docs]class CalculatorMode(Enum):
"""
Enumeration for calculator modes
"""
LINE = "line"
CONTINUUM = "continuum"
PULSAR = "pulsar"
[docs]class Limit(Enum):
"""
Enumeration for different types of limit
"""
UPPER = "upper limit"
LOWER = "lower limit"
VALUE = "value"
[docs]class BeamSize(object):
"""Class for returning a BeamSize object.
:param beam_maj: the beam major axis in degrees
:type beam_maj: astropy.units.Quantity or number
:param beam_min: the beam minor axis in degrees
:type beam_min: astropy.units.Quantity or number
:param beam_pa: the beam position angle in degrees
:type beam_pa: astropy.units.Quantity or number
:return: BeamSize object
:rtype: object
"""
def __init__(self, beam_maj, beam_min, beam_pa):
"""
BeamSize class constructor
"""
self.beam_maj = Utilities.to_astropy(beam_maj, u.deg)
self.beam_min = Utilities.to_astropy(beam_min, u.deg)
self.beam_pa = Utilities.to_astropy(beam_pa, u.deg)
def __repr__(self):
return f"<BeamSize({self.beam_maj}, {self.beam_min}, {self.beam_pa})>"
[docs]def eta_bandpass():
"""Efficiency factor for due to the departure of the bandpass from an ideal, rectangular
shape. For now this is a placeholder.
:return: the efficiency, eta
:rtype: float
"""
eta = 1.0
logger.debug(f"eta_bandpass() -> {eta}")
return eta
[docs]def eta_coherence(obs_freq):
"""Efficiency factor for the sensitivity degradation due to the
incoherence on a baseline.
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:return: the efficiency, eta
:rtype: float
"""
eta = np.exp(np.log(0.98) * obs_freq.to_value(u.GHz) ** 2 / 15.4**2)
logger.debug(f"eta_coherence({obs_freq}) -> {eta}")
return eta
[docs]def eta_correlation():
"""Efficiency factor due to imperfection in the correlation algorithm, e.g. truncation error.
:return: the efficiency, eta
:rtype: float
"""
eta = 0.98
logger.debug(f"eta_correlation() -> {eta}")
return eta
[docs]def eta_digitisation(obs_band):
"""Efficiency factor due to losses from quantisation during signal digitisation.
This process is independent of the telescope and environment, but only depends on
the 'effective number of bits' (ENOB) of the system, which depends in turn on
digitiser quality and clock jitter, and on band flatness.
:param obs_band: the observing band
:type obs_band: str
:return: the efficiency, eta
:rtype: float
"""
if obs_band == "Band 1":
eta = 0.999
elif obs_band == "Band 2":
eta = 0.999
elif obs_band == "Band 3":
eta = 0.998
elif obs_band == "Band 4":
eta = 0.98
elif obs_band == "Band 5a":
eta = 0.955
elif obs_band == "Band 5b":
eta = 0.955
else:
raise RuntimeError("bad obs_band: %s" % obs_band)
logger.debug(f"eta_digitisation() -> {eta}")
return eta
[docs]def eta_dish(obs_freq, dish_type):
"""Efficiency factor due to losses for specified dish type.
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:param dish_type: the type of dish
:type dish_type: DishType
:return: the efficiency, eta
:rtype: float
"""
eta = TelParams.calculate_dish_efficiency(obs_freq, dish_type)
logger.debug(f"eta_dish({obs_freq}, {dish_type}) -> {eta}")
return eta
[docs]def eta_point(obs_freq, dish_type):
"""Efficiency factor at the observing frequency due to the dish pointing error.
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:param dish_type: the type of dish
:type dish_type: DishType
:return: the efficiency, eta
:rtype: float
"""
if dish_type is DishType.SKA1:
pointing_error = SKA_DISH_POINTING_ERROR
elif dish_type is DishType.MeerKAT:
pointing_error = MEERKAT_DISH_POINTING_ERROR
else:
raise RuntimeError("bad dish_type: %s" % dish_type)
eta = 1.0 / (
1.0
+ (
8
* np.log(2)
* (pointing_error / TelParams.dish_fwhm(obs_freq, dish_type)) ** 2
)
)
eta = eta.to_value(u.dimensionless_unscaled)
logger.debug(f"eta_point({obs_freq}, {dish_type}) -> {eta}")
return eta
# Not currently used.
[docs]def eta_rfi():
"""Efficiency factor due to Radio Frequency Interference (RFI)
:return: the efficiency, eta
:rtype: float
"""
eta = 1.00
logger.debug(f"eta_rfi() -> {eta}")
return eta
[docs]def eta_system(
eta_point, eta_coherence, eta_digitisation, eta_correlation, eta_bandpass
):
"""System efficiency for SKA interferometer
:param eta_point: efficiency loss due to pointing errors
:type eta_point: float
:param eta_coherence: efficiency due to loss of coherence
:type eta_coherence: float
:param eta_digitisation: efficiency loss due to errors in the digitisation process
:type eta_digitisation: float
:param eta_correlation: efficiency loss due to errors in the correlation process
:type eta_correlation: float
:param eta_bandpass: efficiency loss due to the bandpass not being rectangular
:type eta_bandpass: float
:return: the system efficiency
:rtype: float
"""
eta_system = (
eta_point * eta_coherence * eta_digitisation * eta_correlation * eta_bandpass
)
logger.debug(
f"eta_system({eta_point}, {eta_coherence}, {eta_digitisation},"
f" {eta_correlation}, {eta_bandpass}) -> {eta_system}"
)
return eta_system
[docs]def Tgal(target, obs_freq, dish_type, alpha):
"""Brightness temperature of Galactic background in target direction at observing frequency.
:param target: target direction
:type target: astropy.SkyCoord
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:param dish_type: the type of dish
:type dish_type: DishType
:param alpha: spectral index of emission
:type alpha: float
:return: the brightness temperature of the Galactic background
:rtype: astropy.units.Quantity
"""
result = celestial.calculate_Tgal(target, obs_freq, dish_type, alpha)
logger.debug(f"Tgal({target}, {obs_freq}, {dish_type}, {alpha}) -> {result}")
return result
[docs]def Trcv(obs_freq, obs_band, dish_type):
"""Receiver temperature for specified freq, band and dish.
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:param obs_band: the observing band
:type obs_band: str
:param dish_type: the type of dish
:type dish_type: DishType
:return: the receiver temperature
:rtype: astropy.units.Quantity
"""
result = TelParams.calculate_Trcv(obs_freq, obs_band, dish_type)
logger.debug(f"Trcv({obs_freq}, {obs_band}, {dish_type}) -> {result}")
return result
[docs]def Tsky(Tgal, obs_freq, elevation, weather):
"""Brightness temperature of sky in target direction.
:param Tgal: brightness temperature of Galactic background
:type Tgal: astropy.units.Quantity
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:param elevation: the observing elevation
:type elevation: astropy.units.Quantity
:param weather: the atmosphere PWV
:type weather: float
:return: the brightness temperature of the sky
:rtype: astropy.units.Quantity
"""
# Tatm, Tcmb cannot be overriden by the user
# Tatm at zenith
Tatm_zen = Atmosphere.calculate_Tatm(weather, obs_freq)
# Tcmb and Tgal are attenuated by the atmosphere,
# Tatm also varies with zenith angle
tau_zen = Atmosphere.get_tauz_atm(weather, obs_freq)
zenith = 90.0 * u.deg - elevation
tau = tau_zen / np.cos(zenith)
# approximating Tatm ~ Tphys * (1 - exp(-tau))
Tphys = Tatm_zen / (1 - np.exp(-tau_zen))
result = Tphys * (1 - np.exp(-tau)) + (Tgal + celestial.Tcmb) * np.exp(-tau)
logger.debug(f"Tsky({Tgal}, {obs_freq}, {elevation}, {weather}) -> {result}")
return result
[docs]def Tspl(dish_type):
"""Spillover temperature for specified dish type.
:param dish_type: the type of dish
:type dish_type: DishType
:return: the spillover temperature
:rtype: astropy.units.Quantity
"""
result = TelParams.calculate_Tspl(dish_type)
logger.debug(f"Tspl() -> {result}")
return result
[docs]def Tsys_dish(Trcv, Tspl, Tsky, obs_freq):
"""System temperature.
:param Trcv: the receiver temperature
:type Trcv: astropy.units.Quantity
:param Tspl: the spillover temperature
:type Tspl: astropy.units.Quantity
:param Tsky: the sky temperature
:type Tsky: astropy.units.Quantity
:param obs_freq: the observing frequency
:type obs_freq: astropy.units.Quantity
:return: the dish system temperature
:rtype: astropy.units.Quantity
"""
# Add the temperatures to get the system temperature on the ground
Tsys_ground = Trcv + Tspl + Tsky
# Apply high-frequency correction to Rayleigh-Jeans temperature
result = Utilities.Tx(obs_freq, Tsys_ground)
result = result.to(u.K)
logger.debug(f"Tsys_dish({Trcv}, {Tspl}, {Tsky}, {obs_freq}) -> {result}")
return result
[docs]class Beam:
"""Beam class
:param frequency: the observing frequency (Hz)
:type frequency: astropy.units.Quantity or number
:param zoom_frequencies: the frequencies of the line zooms (Hz)
:type zoom_frequencies: iterable astropy.units.Quantity or list
:param array_configuration: the subarray configuration
:type array_configuration: ArrayConfiguration
:param weighting: the type of uv-weighting used (robust or uniform)
:type weighting: Weighting
:param robustness: the Briggs robustness parameter
:type robustness: float
:param dec: the target declination (deg)
:type dec: astropy.units.Quantity or number
:param taper: the Gaussian taper to apply (arcsec)
:type taper: astropy.units.Quantity or number
:param calculator_mode: the calculator mode being used
:type calculator_mode: CalculatorMode
:return: Beam object
:rtype: object
"""
def __init__(
self,
*,
frequency=None,
zoom_frequencies=None,
array_configuration,
weighting,
robustness=None,
dec,
taper=0.0,
calculator_mode,
telescope=Telescope.MID,
):
self.weighting_table = weighting_tables[telescope]
self.reference_frequency = ref_frequencies[telescope]
if frequency is not None:
self.frequency = Utilities.to_astropy(frequency, u.Hz)
if zoom_frequencies is not None:
self.zoom_frequencies = Utilities.to_astropy(zoom_frequencies, u.Hz)
self.dec = Utilities.to_astropy(dec, u.deg)
self.weighting = weighting
if weighting != Weighting.ROBUST and robustness:
logger.warning(
f"Robust parameter defined ({robustness}) but will be"
f" ignored for given weighting ({weighting.name})"
)
self.robustness = robustness
self.array_configuration = array_configuration
if array_configuration is None:
raise RuntimeError("Parameter 'array_configuration' is required")
elif (
array_configuration not in MIDArrayConfiguration
and array_configuration not in LOWArrayConfiguration
):
raise RuntimeError(
f"Invalid array configuration: {array_configuration} for telescope {telescope}"
)
else:
self.array_configuration = array_configuration
if isinstance(taper, u.quantity.Quantity):
taper_arcsec = taper.to(u.arcsec)
else:
taper_arcsec = taper * u.arcsec
self.taper = taper_arcsec
self.calculator_mode = calculator_mode
self.telescope = telescope
[docs] def beam_size(self):
"""
method to return list of beam sizes.
"""
weighted = self.__filter_table()
self._validate_declination(weighted)
interp_bmaj = interp1d(
weighted["declination"],
weighted["cleanbeam_bmaj"],
fill_value="extrapolate",
)
interp_bmin = interp1d(
weighted["declination"],
weighted["cleanbeam_bmin"],
fill_value="extrapolate",
)
interp_bpa = interp1d(
weighted["declination"],
weighted["cleanbeam_bpa"],
fill_value="extrapolate",
)
beam_maj = interp_bmaj(self.dec) * u.deg
beam_min = interp_bmin(self.dec) * u.deg
beam_pa = interp_bpa(self.dec)
obj = []
if self.calculator_mode is CalculatorMode.LINE:
frequencies = self.zoom_frequencies
else:
frequencies = [self.frequency]
for freq in frequencies:
obj.append(
BeamSize(
beam_maj=beam_maj * self.reference_frequency / freq.to(u.GHz),
beam_min=beam_min * self.reference_frequency / freq.to(u.GHz),
beam_pa=beam_pa,
)
)
return obj
[docs] def weighting_factor(self):
"""
method to calculate the beam weighting factor of the observation
"""
# As of PI#15 we are no longer interpolating over frequency, elevation or integration time;
# only declination. This has considerably simplified this method. If interpolation over more
# than one parameter is required see commit 3640f432
if self.weighting is Weighting.NATURAL and self.taper == 0.0 * u.arcsec:
return 1.0
else:
weighted = self.__filter_table()
self._validate_declination(weighted)
natural = self.__filter_table(
use_weighting=Weighting.NATURAL, use_taper=0.0 * u.arcsec
)
interp_weighted = interp1d(
weighted["declination"],
weighted["pss_casa"],
fill_value="extrapolate",
)
interp_natural = interp1d(
natural["declination"],
natural["pss_casa"],
fill_value="extrapolate",
)
return interp_weighted(self.dec) / interp_natural(self.dec)
def _validate_declination(self, weighted):
# As of PI19, the weighting tables do not have values for every
# allowed declination. It was decided to extrapolate the table
# values within reasonable limits: to 1 degree of the maximum
# calculated declination.
extrapolation_tolerance = 1.0
dec_min = weighted["declination"].min()
dec_max = weighted["declination"].max() + extrapolation_tolerance
# Raise RuntimeError if declination value is outside the allowed
# extrapolation limits
if self.dec.value < dec_min or self.dec.value > dec_max:
raise ValueError(
"Cannot extrapolate beyond declination limits of "
f"{dec_min} <= dec < {dec_max}"
)
[docs] def surface_brightness_conversion_factor(self) -> Quantity:
"""
method to calculate the factor the sensitivity returned by calculator must be multiplied by
to obtain the surface brightness sensitivity in units of K/Jy.
"""
if self.calculator_mode is CalculatorMode.LINE:
frequency = self.zoom_frequencies
else:
frequency = self.frequency
wavelength = c / frequency.to(u.s**-1)
factor = (
wavelength**2 / (2.0 * k_B * self.__area()) * (1 * u.sr)
) # fudge factor to remove the /sr
return factor.to(u.K / u.Jy)
[docs] def confusion_noise(self):
"""
method to calculate the confusion noise of beam
"""
confusion_noise = []
lim = []
if self.calculator_mode == CalculatorMode.LINE:
frequency = self.zoom_frequencies.value
else:
frequency = [self.frequency.value]
log_frequency = np.log10(frequency)
beam_geomean = []
for beam in self.beam_size():
beam_geomean.append(sqrt(beam.beam_min.value * beam.beam_maj.value))
log_beam_geomean = np.log10(beam_geomean)
sigma_min = confusion_noise_table["sigma"].min(axis=0)
sigma_max = confusion_noise_table["sigma"].max(axis=0)
for n, log_f in enumerate(log_frequency):
sigma_m = interp1d(
confusion_noise_table["beam"],
confusion_noise_table["sigma"],
bounds_error=False,
axis=0,
fill_value=(sigma_min, sigma_max),
)(log_beam_geomean[n])
sigma = interp1d(
confusion_noise_table["frequency"][0],
sigma_m,
fill_value="extrapolate",
)(log_f)
confusion_noise.append(10**sigma)
if log_beam_geomean[n] < confusion_noise_table["beam"].min():
lim.append(Limit.UPPER.value)
elif log_beam_geomean[n] > confusion_noise_table["beam"].max():
lim.append(Limit.LOWER.value)
else:
lim.append(Limit.VALUE.value)
return confusion_noise * u.Jy, lim
def __filter_table(self, use_weighting=None, use_taper=None):
"""
private method to filter the lookup table
"""
if use_weighting is None:
use_weighting = self.weighting
if use_taper is None:
use_taper = self.taper
elif isinstance(use_taper, u.quantity.Quantity):
use_taper = use_taper.to(u.arcsec)
else:
use_taper = use_taper * u.arcsec
weighted = self.weighting_table[
(
(
self.weighting_table["subarray_label"]
== self.array_configuration.value
)
& (self.weighting_table["weighting"] == use_weighting.value)
& (self.weighting_table["spec_mode"] == self.calculator_mode.value)
& (self.weighting_table["taper_arcsec"] == use_taper.value)
)
]
# Only take robustness into account if weighting is `robust`
if use_weighting == Weighting.ROBUST:
weighted = weighted[(weighted["robustness"] == self.robustness)]
return weighted
def __area(self):
"""
private method to calculate the beam area
"""
beam = self.beam_size()
area = []
for b in beam:
theta = np.sqrt(b.beam_maj * b.beam_min).to(u.rad)
area_sr = (np.pi * theta**2 / (4 * np.log(2))).to(u.sr).value
area.append(area_sr)
return area * u.sr