Source code for ska_ost_senscalc.low.service
"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
from dataclasses import asdict
from typing import List, Optional
from astropy import units as u
from astropy.units import Quantity
from ska_ost_senscalc.common.model import (
CalculatorInputPSS,
ContinuumCalculatorAndWeightingInput,
ContinuumSensitivitiesTransformationsInput,
ContinuumSensitivityResponse,
ContinuumSensitivityResults,
ContinuumWeightingResponse,
PSSSensitivityResponse,
PulsarMode,
PulsarSamplingTime,
SensitivitiesTransformationsWithoutWeightingInput,
SensitivityTransformationsSubbandsInput,
SingleZoomSensitivityResponse,
SingleZoomWeightingResponse,
SubbandResponse,
WeightingContinuumSpectralResults,
WeightingSpectralMode,
ZoomCalculatorAndWeightingInput,
ZoomSensitivitiesTransformationsInput,
ZoomSensitivityResponse,
ZoomSensitivityTransformedResults,
)
from ska_ost_senscalc.common.pss import convert_continuum_to_bf_sensitivity
from ska_ost_senscalc.common.service import (
calculate_continuum_sensitivities_subbands_results,
calculate_continuum_sensitivity,
calculate_zoom_results,
calculate_zoom_results_without_weighting,
get_continuum_weighting_response,
get_subbands,
get_weighted_sensitivity,
get_zoom_weighting_response,
is_beam_non_gaussian,
)
from ska_ost_senscalc.common.spectropolarimetry import (
SpectropolarimetryInput,
get_spectropolarimetry_results,
)
from ska_ost_senscalc.low.bright_source_lookup import BrightSourceCatalog
from ska_ost_senscalc.low.calculator import calculate_sensitivity
from ska_ost_senscalc.low.model import CalculatorInput
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope
subarray_storage = SubarrayStorage(Telescope.LOW)
FLUX_DENSITY_THRESHOLD_JY = 10.0
LOW_CONTINUUM_CHANNEL_WIDTH_KHZ = 24 * 781.25 / (4096 * 27 / 32)
[docs]
def convert_continuum_input_and_calculate(
calc_weight_input: ContinuumCalculatorAndWeightingInput,
) -> ContinuumSensitivityResponse:
"""
:param user_input: A ContinuumCalculatorAndWeightingInput object containing the parameters sent by the user
having been validated and converted for the calculator and weighting calculations.
:return: a SensitivityResponse with the calculated sensitivity and its units along
with weighting results and the transformed results containing the results combining calculate
and weighting results in a format easily usable by the UI."""
continuum_calculator_input = CalculatorInput(
freq_centre_mhz=calc_weight_input.freq_centre.to(u.MHz).value,
bandwidth_mhz=calc_weight_input.bandwidth_mhz,
num_stations=calc_weight_input.num_stations,
pointing_centre=calc_weight_input.pointing_centre,
integration_time_h=calc_weight_input.integration_time_h,
elevation_limit=calc_weight_input.elevation_limit,
)
effective_resolution_mhz = (
LOW_CONTINUUM_CHANNEL_WIDTH_KHZ / 1e3
) * calc_weight_input.spectral_averaging_factor
spectral_calculator_input = CalculatorInput(
freq_centre_mhz=calc_weight_input.freq_centre.to(u.MHz).value,
bandwidth_mhz=effective_resolution_mhz,
num_stations=calc_weight_input.num_stations,
pointing_centre=calc_weight_input.pointing_centre,
integration_time_h=calc_weight_input.integration_time_h,
elevation_limit=calc_weight_input.elevation_limit,
)
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=Quantity(calc_weight_input.bandwidth_mhz, "MHz"),
frequency=calc_weight_input.freq_centre.to(u.MHz),
effective_channel_width=Quantity(effective_resolution_mhz, "MHz"),
)
warnings = []
continuum_result = _get_calculation_value(continuum_calculator_input, warnings)
spectral_result = _get_calculation_value(spectral_calculator_input, warnings)
# Warnings will be identical, so remove duplicate:
warnings = list(set(warnings))
subband_sensitivities = _get_subband_sensitivities(
calc_weight_input, calc_weight_input.num_stations
)
spectropolarimetry_results = get_spectropolarimetry_results(
spectropolarimetry_input
)
# we only get weighting results if not a custom subarray
weighted_continuum = None
weighted_spectral = None
if calc_weight_input.subarray_configuration is not None:
# get continuum weighting results
weighted_continuum: ContinuumWeightingResponse = (
get_continuum_weighting_response(calc_weight_input)
)
# switch to spectral mode
calc_weight_input.spectral_mode = WeightingSpectralMode.LINE
# get spectral weighting results
weighted_spectral: ContinuumWeightingResponse = (
get_continuum_weighting_response(calc_weight_input)
)
transformed_warnings = []
continuum_sensitivities_transformations_input = (
ContinuumSensitivitiesTransformationsInput(
continuum_sensitivity=continuum_result,
spectral_sensitivity=spectral_result,
continuum_weighting_factor=weighted_continuum.weighting_factor
if weighted_continuum
else None,
spectral_weighting_factor=weighted_spectral.weighting_factor
if weighted_spectral
else None,
continuum_sbs_conv_factor=weighted_continuum.sbs_conv_factor
if weighted_continuum
else None,
spectral_sbs_conv_factor=weighted_spectral.sbs_conv_factor
if weighted_spectral
else None,
continuum_conf_noise=weighted_continuum.confusion_noise.value
if weighted_continuum
else None,
spectral_conf_noise=weighted_spectral.confusion_noise.value
if weighted_spectral
else None,
continuum_beam_min=weighted_continuum.beam_size.beam_min_scaled
if weighted_continuum
else None,
continuum_beam_maj=weighted_continuum.beam_size.beam_maj_scaled
if weighted_continuum
else None,
spectral_beam_min=weighted_spectral.beam_size.beam_min_scaled
if weighted_spectral
else None,
spectral_beam_maj=weighted_spectral.beam_size.beam_maj_scaled
if weighted_spectral
else None,
)
)
transformed_result = asdict(
calculate_continuum_sensitivity(
continuum_sensitivities_transformations_input,
transformed_warnings,
calc_weight_input.weighting_mode,
calc_weight_input.robustness,
is_custom=calc_weight_input.subarray_configuration is None,
)
)
subbands_transformed_result = {}
if subband_sensitivities:
# get subbands transformed result
weighted_subbands_input = SensitivityTransformationsSubbandsInput(
calculate_subbands=subband_sensitivities,
weighting_subbands=weighted_continuum.subbands
if weighted_continuum and weighted_continuum.subbands
else None,
)
subbands_transformed_result = asdict(
(
calculate_continuum_sensitivities_subbands_results(
weighted_subbands_input,
non_gaussian=is_beam_non_gaussian(
calc_weight_input.weighting_mode,
calc_weight_input.robustness,
),
is_custom=calc_weight_input.subarray_configuration is None,
warnings=[],
)
)
)
transformed_result["subbands"] = subbands_transformed_result
weighting = asdict(
WeightingContinuumSpectralResults(
continuum_weighting=weighted_continuum,
spectral_weighting=weighted_spectral,
)
)
response = ContinuumSensitivityResponse(
calculate=ContinuumSensitivityResults(
continuum_sensitivity=continuum_result,
continuum_subband_sensitivities=subband_sensitivities,
spectral_sensitivity=spectral_result,
warnings=warnings,
spectropolarimetry_results=spectropolarimetry_results,
),
weighting=weighting,
transformed_result=transformed_result,
)
return response
[docs]
def convert_zoom_input_and_calculate(
params: ZoomCalculatorAndWeightingInput,
) -> list[ZoomSensitivityResponse]:
"""
:param user_input: A kwarg dict of the HTTP parameters sent by the user
:return: a dict containing the calculated sensitivity and its units
"""
calculations = []
for freq_centre, spectral_resolution_hz, total_bandwidth_khz in zip(
params.freq_centres,
params.spectral_resolutions_hz,
params.total_bandwidths_khz,
):
effective_resolution_hz = (
spectral_resolution_hz * params.spectral_averaging_factor
)
calculator_input = CalculatorInput(
freq_centre_mhz=freq_centre.to(u.MHz).value,
bandwidth_mhz=effective_resolution_hz * 1e-6, # Convert to MHz
num_stations=params.num_stations,
pointing_centre=params.pointing_centre,
integration_time_h=params.integration_time_h,
elevation_limit=params.elevation_limit,
)
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=Quantity(total_bandwidth_khz, "kHz"),
frequency=freq_centre.to(u.MHz),
effective_channel_width=Quantity(effective_resolution_hz, "Hz"),
)
warnings = []
sensitivity = _get_calculation_value(calculator_input, warnings)
spectropolarimetry_results = get_spectropolarimetry_results(
spectropolarimetry_input
)
calculations.append(
SingleZoomSensitivityResponse(
freq_centre=freq_centre.to(u.MHz),
spectral_sensitivity=sensitivity,
warnings=warnings,
spectropolarimetry_results=spectropolarimetry_results,
)
)
transformed_warnings = []
if params.subarray_configuration is None:
weightings = None
transformed_result = _zoom_custom(calculations, transformed_warnings)
else:
weightings = get_zoom_weighting_response(params)
if is_beam_non_gaussian(params.weighting_mode, params.robustness):
transformed_result = _zoom_non_gaussian(
calculations, weightings, transformed_warnings
)
else:
transformed_result = _zoom_gaussian(
calculations, weightings, transformed_warnings
)
response = ZoomSensitivityResponse(
calculate=calculations,
weighting=weightings,
transformed_result=transformed_result,
)
return asdict(response)
def _zoom_non_gaussian(
calculations: list[SingleZoomSensitivityResponse],
weightings: list[SingleZoomWeightingResponse],
transformed_warnings: list[dict],
) -> dict:
transformed_result = []
for calculation, weighting in zip(calculations, weightings):
calculation: SingleZoomSensitivityResponse
weighting: SingleZoomWeightingResponse
weighted_spectral_sensitivity = get_weighted_sensitivity(
calculation.spectral_sensitivity,
weighting.weighting_factor,
)
result = ZoomSensitivityTransformedResults(
weighted_spectral_sensitivity=weighted_spectral_sensitivity,
warnings=transformed_warnings,
)
transformed_result.append(result)
return transformed_result
def _zoom_gaussian(
calculations: list[SingleZoomSensitivityResponse],
weightings: list[SingleZoomWeightingResponse],
transformed_warnings: list[dict],
) -> dict:
transformed_result = []
for calculation, weighting in zip(calculations, weightings):
result = calculate_zoom_results(
ZoomSensitivitiesTransformationsInput(
confusion_noise=weighting.confusion_noise.value,
maj_beam_size=weighting.beam_size.beam_maj_scaled,
min_beam_size=weighting.beam_size.beam_min_scaled,
sbs_conv_factor=weighting.sbs_conv_factor,
spectral_sensitivity=calculation.spectral_sensitivity,
weighting_factor=weighting.weighting_factor,
),
transformed_warnings,
)
transformed_result.append(result)
return transformed_result
def _zoom_custom(
calculations: list[SingleZoomSensitivityResponse], transformed_warnings: list[dict]
) -> list[ZoomSensitivityTransformedResults]:
# get non weighting (custom) weighted results
transformed_result = []
for calculation in calculations:
result = calculate_zoom_results_without_weighting(
SensitivitiesTransformationsWithoutWeightingInput(
spectral_sensitivity=calculation.spectral_sensitivity,
),
transformed_warnings, # TODO: Does not seem to be a good idea to include the same list to every result? And it is just a one flag.
)
transformed_result.append(result)
return transformed_result
[docs]
def convert_pss_input_and_calculate(user_input: dict) -> PSSSensitivityResponse:
"""
:param user_input: A kwarg dict of the HTTP parameters sent by the user
:return: a dict containing the calculated sensitivity and its units
"""
num_stations = _num_stations_from_input(user_input)
pulsar_mode = PulsarMode(user_input["pulsar_mode"])
# First, create a CalculatorInputPSS object.
# Note that, for PSS single-pulse mode, the integration time
# should be set to PSS sampling time
calculator_input_pss = CalculatorInputPSS(
freq_centre_mhz=user_input["freq_centre_mhz"],
bandwidth_mhz=user_input["bandwidth_mhz"],
chan_width=user_input["spectral_resolution_hz"],
num_stations=num_stations,
pointing_centre=user_input["pointing_centre"],
integration_time_h=(
user_input["integration_time_h"]
if user_input["pulsar_mode"] == "folded_pulse"
else PulsarSamplingTime.LOW_PSS.value.to(u.hour).value
),
elevation_limit=user_input["elevation_limit"],
dm=user_input["dm"],
pulse_period=user_input["pulse_period"],
intrinsic_pulse_width=user_input["intrinsic_pulse_width"],
)
# Next, estimate the continuum sensitivity corresponding to the
# integration time specified above
continuum_input = CalculatorInput(
freq_centre_mhz=calculator_input_pss.freq_centre_mhz,
bandwidth_mhz=calculator_input_pss.bandwidth_mhz,
num_stations=calculator_input_pss.num_stations,
pointing_centre=calculator_input_pss.pointing_centre,
integration_time_h=calculator_input_pss.integration_time_h,
elevation_limit=calculator_input_pss.elevation_limit,
)
# continuum_sensitivity, warning = _get_calculation_value(continuum_input)
warnings = []
continuum_sensitivity = _get_calculation_value(continuum_input)
# Finally, convert the continuum sensitivity to folded-pulse sensitivity
pss_sensitivity = convert_continuum_to_bf_sensitivity(
continuum_sensitivity,
calculator_input_pss,
pulsar_mode,
)
if pulsar_mode == PulsarMode.FOLDED_PULSE:
return PSSSensitivityResponse(
folded_pulse_sensitivity=pss_sensitivity, warnings=warnings
)
elif pulsar_mode == PulsarMode.SINGLE_PULSE:
return PSSSensitivityResponse(
single_pulse_sensitivity=pss_sensitivity, warnings=warnings
)
else:
raise ValueError(f"Unsupported pulsar mode: {pulsar_mode}")
[docs]
def get_subarray_response():
"""
return the appropriate subarray objects
"""
return [
{
"name": subarray.name,
"label": subarray.label,
"n_stations": subarray.n_stations,
}
for subarray in subarray_storage.list()
]
def _get_calculation_value(
calculator_input: CalculatorInput, warnings: Optional[list] = None
) -> Quantity:
result = calculate_sensitivity(calculator_input)
sensitivity = Quantity(result.sensitivity, result.units)
if warnings is not None:
_check_for_warning(calculator_input, warnings)
return sensitivity
def _check_for_warning(calculator_input: CalculatorInput, warnings: list[str]) -> None:
mwa_cat = BrightSourceCatalog(threshold_jy=FLUX_DENSITY_THRESHOLD_JY)
if mwa_cat.check_for_bright_sources(
calculator_input.pointing_centre, calculator_input.freq_centre_mhz
):
warnings.append(
(
"The specified pointing contains at least one source brighter "
+ f"than {FLUX_DENSITY_THRESHOLD_JY} Jy. Your observation may be "
+ "dynamic range limited."
)
)
def _num_stations_from_input(user_input: dict) -> int:
"""
If the user has given a subarray_configuration, extract the num_stations from that.
Otherwise, use the value given by the user.
Validation has checked that one and only on of these fields is present in the input.
:param user_input: a dict of the parameters given by the user
:return: the num_stations to use in the calculation
"""
if "subarray_configuration" in user_input:
subarray = subarray_storage.load_by_name(user_input["subarray_configuration"])
return subarray.n_stations
return user_input["num_stations"]
def _get_subband_sensitivities(
user_input: ContinuumCalculatorAndWeightingInput, num_stations: int
) -> List[SubbandResponse]:
if user_input.n_subbands == 1 and (
user_input.subband_freq_centres is None
or len(user_input.subband_freq_centres) == 0
):
return []
subband_freq_centres_hz, subband_bandwidth = get_subbands(
user_input.n_subbands,
user_input.freq_centre.to(u.MHz).value,
user_input.bandwidth_mhz,
)
return [
SubbandResponse(
subband_freq_centre=Quantity(subband_frequency, "MHz"),
sensitivity=_get_calculation_value(
CalculatorInput(
freq_centre_mhz=subband_frequency,
bandwidth_mhz=subband_bandwidth,
num_stations=num_stations,
pointing_centre=user_input.pointing_centre,
integration_time_h=user_input.integration_time_h,
elevation_limit=user_input.elevation_limit,
),
),
)
for subband_frequency in subband_freq_centres_hz
]