"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
import copy
import dataclasses
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from math import isnan, nan
from typing import Optional, TypedDict, Union
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.units import Quantity
from ska_ost_senscalc.common.model import (
CalculatorInputPSS,
ContinuumIntegrationTimeTransformedResults,
ContinuumRequest,
ContinuumSensitivitiesTransformationsInput,
ContinuumSensitivityTransformedResults,
ContinuumWeightingRequestParams,
IntegrationTimeTransformationsWithoutWeightingInput,
PssRequestMid,
PSSSensitivityResponse,
PulsarMode,
SensitivitiesTransformationsWithoutWeightingInput,
SensitivityTransformationsSubbandsInput,
SubbandsBeamSizeTransformedResults,
SubbandsContinuumIntegrationTimeTransformedResults,
SubbandsContinuumSensitivityTransformedResults,
SubbandsItemsTransformedResults,
Weighting,
WeightingSpectralMode,
ZoomRequestPrepared,
)
from ska_ost_senscalc.common.pss import convert_continuum_to_bf_sensitivity
from ska_ost_senscalc.common.service import (
ContinuumWeightingResponse,
SingleZoomWeightingResponse,
calculate_continuum_integration_time_results_without_weighting,
calculate_continuum_results_without_weighting,
calculate_continuum_sensitivities_subbands_results,
calculate_continuum_sensitivity,
get_confusion_noise,
get_continuum_weighting_response,
get_subbands,
get_surface_brightness_sensitivity,
get_synthesized_beam_size,
get_total_sensitivity,
get_weighted_sensitivity,
get_zoom_weighting_response,
is_beam_non_gaussian,
sensitivity_limit_reached,
sensitivity_on_unit,
thermal_sensitivity,
)
from ska_ost_senscalc.common.spectropolarimetry import (
SpectropolarimetryInput,
SpectropolarimetryResults,
get_spectropolarimetry_results,
)
from ska_ost_senscalc.mid.calculator import (
calculate_integration_time,
calculate_sensitivity,
prepare_integration_input,
prepare_sensitivity_input,
)
from ska_ost_senscalc.mid.models import (
ThermalData,
ZoomIntegrationTimeResult,
ZoomSensitivityResult,
)
from ska_ost_senscalc.mid.validation import (
MID_CONTINUUM_CHANNEL_WIDTH_KHZ,
validate_and_set_defaults_for_continuum,
)
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope
subarray_storage = SubarrayStorage(Telescope.MID)
# TODO: Move these models to models.py
[docs]
class SingleSubbandResponse(TypedDict):
subband_freq_centre: Quantity
sensitivity: Optional[Quantity]
integration_time: Optional[Quantity]
[docs]
class ContinuumSensitivityResponse(TypedDict):
"""
Typed dictionary constrained to match the OpenAPI schema for the
response body of a single continuum sensitivity calculation.
"""
continuum_sensitivity: Optional[Quantity]
continuum_integration_time: Optional[Quantity]
spectral_sensitivity: Optional[Quantity]
spectral_integration_time: Optional[Quantity]
continuum_subband_sensitivities: Optional[list[SingleSubbandResponse]]
continuum_subband_integration_times: Optional[list[SingleSubbandResponse]]
spectropolarimetry_results: SpectropolarimetryResults
[docs]
@dataclass
class SingleZoomBaseResponse:
"""
Base data class constrained to match the OpenAPI schema for the
response body of a single zoom sensitivity or integration time calculation.
"""
freq_centre: Quantity
spectropolarimetry_results: SpectropolarimetryResults
warnings: list[str] = field(default_factory=list)
[docs]
@dataclass
class SingleZoomSensitivityResponse(SingleZoomBaseResponse):
"""
Dataclass constrained to match the OpenAPI schema for the
response body of a single zoom sensitivity calculation.
"""
spectral_sensitivity: Quantity = None
[docs]
@dataclass
class SingleZoomIntegrationTimeResponse(SingleZoomBaseResponse):
"""
Dataclass constrained to match the OpenAPI schema for the
response body of a single zoom integration time calculation.
"""
spectral_integration_time: Quantity = None
[docs]
class ContinuumCalculateResponse(ContinuumSensitivityResponse):
continuum_weighting: ContinuumWeightingResponse
spectral_weighting: ContinuumWeightingResponse
transformed_result: ContinuumSensitivityTransformedResults | ContinuumSensitivityTransformedResults
[docs]
def calculate_pss_sensitivity(params: PssRequestMid) -> Quantity:
"""
Transform the continuum sensitivity to PSS sensitivity for a given set of
PSS-specific input parameters.
"""
# ToDo: Find a better way to estimate the total number of dishes in a subarray
subarray = next(
(
sub_array
for sub_array in get_subarray_response()
if sub_array["label"] == params.subarray_configuration
),
None,
)
if subarray is None:
raise ValueError(
f"Subarray '{params.subarray_configuration}' not found in response."
)
num_stations = subarray["n_ska"] + subarray["n_meer"]
# # Finally, convert the continuum sensitivity to folded-pulse sensitivity
# # Create a CalculatorInputPSS object.
# # Note that, for PSS single-pulse mode, the integration time
# # should be set to PSS sampling time
calculator_input_pss = CalculatorInputPSS(
freq_centre_mhz=params.freq_centre_hz * u.Hz.to(u.MHz),
bandwidth_mhz=params.bandwidth_hz * u.Hz.to(u.MHz),
chan_width=params.spectral_resolution_hz,
num_stations=num_stations,
integration_time_h=params.integration_time_s * u.s.to(u.hour),
dm=params.dm,
pulse_period=params.pulse_period,
intrinsic_pulse_width=params.intrinsic_pulse_width,
)
continuum_kwargs = copy.deepcopy(params)
continuum_kwargs.weighting_mode = (
Weighting.NATURAL
) # Change weighting scheme to natural
continuum_kwargs = asdict(continuum_kwargs)
# Next, calculate the continuum sensitivity using these input parameters assuming
# natural weighting scheme.
# Remove pulsar_mode, "spectral_resolution_hz", "dm", "pulse_period", and
# "intrinsic_pulse_width" from continuum_kwargs to make ContinuumRequest happy
for key in [
"pulsar_mode",
"dm",
"pulse_period",
"intrinsic_pulse_width",
"spectral_resolution_hz",
]:
continuum_kwargs.pop(key, None)
continuum_params = ContinuumRequest(**continuum_kwargs)
validated_params = validate_and_set_defaults_for_continuum(continuum_params)
continuum_result = convert_continuum_input_and_calculate(validated_params)
pulsar_mode = PulsarMode(params.pulsar_mode)
# convert_continuum_to_bf_sensitivity() expects the continuum sensitivity
# to be in Jy/beam. So, divide by u.beam
pss_sensitivity = convert_continuum_to_bf_sensitivity(
continuum_result["calculate"]["continuum_sensitivity"] / u.beam,
calculator_input_pss,
pulsar_mode,
)
if pulsar_mode == PulsarMode.FOLDED_PULSE:
return PSSSensitivityResponse(folded_pulse_sensitivity=pss_sensitivity)
elif pulsar_mode == PulsarMode.SINGLE_PULSE:
return PSSSensitivityResponse(single_pulse_sensitivity=pss_sensitivity)
else:
raise ValueError(f"Unsupported pulsar mode: {pulsar_mode}.")
# TODO: consider renaming the function to make it something like continuum_result
def _continuum_weightings(params: ContinuumRequest) -> tuple:
msg = "Number of subbands does not match their frequencies centres"
if len(params.subband_freq_centres) != 0:
# TODO: AK -> this looks like it is never needed and in fact a duplication of subband_freq_centres_hz.
# TODO: We should remove in next ticket/MR.
pass
no_subbands = params.n_subbands == 1 and len(params.subband_freq_centres_hz) == 0
some_subbands = (
params.n_subbands > 1
and len(params.subband_freq_centres_hz) == params.n_subbands
)
assert no_subbands or some_subbands, msg
# prepare weighting as we need them for both integration/sensitivity
# TODO: add checking for custom for not sending weighting request
# we only get weighting results if not a custom subarray
continuum_weighting = None
spectral_weighting = None
if params.subarray_configuration is not None:
weighting_request_params = ContinuumWeightingRequestParams(
spectral_mode=WeightingSpectralMode.CONTINUUM,
telescope=params.telescope,
weighting_mode=params.weighting_mode,
subarray_configuration=params.subarray_configuration,
dec=params.pointing_centre.dec,
freq_centre=params.freq_centre,
taper=params.taper,
robustness=params.robustness,
subband_freq_centres=params.subband_freq_centres_hz,
)
continuum_weighting = get_continuum_weighting_response(weighting_request_params)
# Spectural/Line Weighting
spectral_weighting_request_params = deepcopy(weighting_request_params)
spectral_weighting_request_params.spectral_mode = WeightingSpectralMode.LINE
spectral_weighting = get_continuum_weighting_response(
spectral_weighting_request_params
)
return continuum_weighting, spectral_weighting
def _calculate_continuum_integration_time(
params: ContinuumRequest,
spectral_params: ContinuumRequest,
continuum_weighting: ContinuumWeightingResponse,
spectral_weighting: ContinuumWeightingResponse,
result: dict,
) -> tuple:
# prepare thermal sensitivity
continuum_sensitivity = sensitivity_on_unit(
params.supplied_sensitivity,
continuum_weighting.sbs_conv_factor,
params.sensitivity_unit,
)
continuum_confusion_noise = Quantity(
_adjust_confusion_noise(params, continuum_weighting.confusion_noise.value),
u.Jy / u.beam,
)
continuum_thermal_sensitivity = thermal_sensitivity(
continuum_sensitivity,
continuum_weighting.weighting_factor,
continuum_confusion_noise.value,
)
error_message = _check_sensitivity_limit(
continuum_sensitivity, continuum_confusion_noise
)
if error_message:
# and not is_beam_non_gaussian(params.weighting_mode, params.robustness, params.taper):
result["warnings"] = [error_message]
return result, continuum_thermal_sensitivity, error_message
# prepare thermal sensitivity for spectral
spectral_sensitivity = sensitivity_on_unit(
params.supplied_sensitivity,
spectral_weighting.sbs_conv_factor,
params.sensitivity_unit,
)
spectral_thermal_sensitivity = thermal_sensitivity(
spectral_sensitivity,
spectral_weighting.weighting_factor,
continuum_confusion_noise.value,
)
integration_input = prepare_integration_input(
continuum_thermal_sensitivity.to(u.Jy).value, **asdict(params)
)
integration_time = calculate_integration_time(integration_input).to(u.s)
result["continuum_integration_time"] = integration_time[0]
# Calculate spectral integration time using resolution for bandwidth
integration_input = prepare_integration_input(
spectral_thermal_sensitivity.to(u.Jy).value,
**asdict(spectral_params),
)
spectral_integration_time = calculate_integration_time(integration_input).to(u.s)
result["spectral_integration_time"] = spectral_integration_time[0]
return result, continuum_thermal_sensitivity, error_message
def _calculate_continuum_integration_time_custom(
params: ContinuumRequest, spectral_params: ContinuumRequest, result: dict
) -> dict:
# For custom subarrays, we don't calculate a thermal sensitivity, we do the calculation with the supplied sensitivity
custom_continuum_integration_time = _calculate_custom_integration_time(
params.supplied_sensitivity, params
)
result["continuum_integration_time"] = custom_continuum_integration_time[0]
custom_spectral_integration_time = _calculate_custom_integration_time(
params.supplied_sensitivity, spectral_params
)
result["spectral_integration_time"] = custom_spectral_integration_time[0]
return result
def _calculate_continuum_subbands(
params: ContinuumRequest,
continuum_weighting: ContinuumWeightingResponse,
warnings: list[str],
result: dict,
) -> dict:
# Subbands - if subbands is 1 then we just return the main sensitivity calculation as the subband is the whole bandwidth
if params.n_subbands > 1:
subband_results = []
subband_freq_centres_hz, subband_bandwidth = get_subbands(
params.n_subbands, params.freq_centre_hz, params.bandwidth_hz
)
if integration_time_s := params.integration_time_s:
for subband_freq_centre_hz in subband_freq_centres_hz:
# Create the calculator for each subband result
subband_params = deepcopy(params)
subband_params.freq_centre_hz = subband_freq_centre_hz
subband_params.bandwidth_hz = subband_bandwidth
sensitivity_input = prepare_sensitivity_input(
integration_time_s, **asdict(subband_params)
)
sensitivity = calculate_sensitivity(sensitivity_input).to(u.Jy)
subband_results.append(
{
"subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"),
"sensitivity": sensitivity[0],
}
)
result["continuum_subband_sensitivities"] = subband_results
else:
if params.subarray_configuration is not None:
subband_result = _calculate_supplied_sensitivity_subbands(
params,
continuum_weighting,
warnings,
subband_freq_centres_hz,
subband_bandwidth,
)
result["continuum_subband_integration_times"] = subband_result
else:
custom_subband_result = _calculate_supplied_sensitivity_subbands_custom(
params, subband_bandwidth
)
result["continuum_subband_integration_times"] = custom_subband_result
return result
def _calculate_supplied_sensitivity_subbands(
params: ContinuumRequest,
continuum_weighting: ContinuumWeightingResponse,
warnings: list[str],
subband_freq_centres_hz: list[float],
subband_bandwidth: float,
) -> list[dict]:
"""
Calculates the subbands for calculate results supplied sensitivity.
:param params: the request parameters.
:type params: data class ContinuumRequest
:param continuum_weighting: the continuum weighting results.
:type continuum_weighting: ContinuumWeightingResponse
:param warnings: the calculate warnings.
:type warnings: list of strings
:param subband_freq_centres_hz: the subband frequency centres.
:type subband_freq_centres_hz: list of floats.
:param subband_bandwidth: the subband bandwidth.
:type subband_bandwidth: float
:return: the subbands results.
:rtype: a list of dictionaries
"""
# populate thermal array since we need to check first confusion noise
subband_thermal_sensitivities = []
subbands = []
if params.subband_supplied_sensitivities:
for sensitivity in params.subband_supplied_sensitivities:
subband_thermal_sensitivities.append(Quantity(sensitivity, u.Jy))
else:
for weighting in continuum_weighting.subbands:
continuum_confusion_noise = Quantity(
_adjust_confusion_noise(params, weighting.confusion_noise.value),
u.Jy / u.beam,
)
subband_sensitivities = sensitivity_on_unit(
params.supplied_sensitivity,
weighting.sbs_conv_factor,
params.sensitivity_unit,
)
subband_thermal_sensitivities.append(
thermal_sensitivity(
subband_sensitivities,
weighting.weighting_factor,
continuum_confusion_noise.value,
)
)
first_subband_confusion_noise = continuum_weighting.subbands[
0
].confusion_noise.value
if first_subband_confusion_noise >= subband_thermal_sensitivities[0].value:
warnings.append(
_sens_limit_error(
first_subband_confusion_noise, subband_thermal_sensitivities[0]
)
)
for subband_freq_centre_hz, subband_supplied_sensitivity in zip(
subband_freq_centres_hz, subband_thermal_sensitivities
):
# Create the calculator for each subband result
subband_params = deepcopy(params)
subband_params.freq_centre_hz = subband_freq_centre_hz
subband_params.bandwidth_hz = subband_bandwidth
integration_input = prepare_integration_input(
subband_supplied_sensitivity, **asdict(subband_params)
)
integration_time = calculate_integration_time(integration_input).to(u.s)
subbands.append(
{
"subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"),
"integration_time": integration_time[0],
}
)
return subbands
def _calculate_supplied_sensitivity_subbands_custom(
params: ContinuumRequest, subband_bandwidth: float
) -> list[dict]:
"""
Calculates the subbands for calculate results supplied sensitivity in the case of a custom subarray.
When a subarray is provided, the subbands are calculated using thermal sensitivities, which is not possible
for custom subarrays. Instead, the subbands are calculated using the supplied sensitivities.
:param params: the request parameters.
:type params: data class ContinuumRequest
:param subband_bandwidth: the subband bandwidth.
:type subband_bandwidth: float
:return: the subbands results.
:rtype: a list of dictionaries
"""
subbands = []
subband_supplied_sensitivities_jy = [
Quantity(sensitivity, params.subband_supplied_sensitivities_unit)
.to(u.Jy / u.beam)
.value
for sensitivity in params.subband_supplied_sensitivities
]
for subband_freq_centre_hz, subband_sensitivity_jy in zip(
params.subband_freq_centres_hz,
subband_supplied_sensitivities_jy,
):
# Create the calculator for each subband result
subband_params = deepcopy(params)
subband_params.freq_centre_hz = subband_freq_centre_hz
subband_params.bandwidth_hz = subband_bandwidth
integration_input = prepare_integration_input(
subband_sensitivity_jy, **asdict(subband_params)
)
integration_time = calculate_integration_time(integration_input).to(u.s)
subbands.append(
{
"subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"),
"integration_time": integration_time[0],
}
)
return subbands
def _continuum_transformed_result(
params: ContinuumRequest,
continuum_weighting: ContinuumWeightingResponse,
spectral_weighting: ContinuumWeightingResponse,
continuum_thermal_sensitivity: Quantity,
continuum_sensitivity: Quantity,
spectral_sensitivity: Quantity,
calculation: dict,
warnings: list[str],
non_gaussiam_beam: bool,
error_message: Union[bool, str] = False,
) -> dict:
if params.subarray_configuration is not None:
if params.integration_time_s:
if non_gaussiam_beam:
transformed_result = _continuum_non_gaussian_sensitivity(
calculation=calculation,
continuum_weighting=continuum_weighting,
spectral_weighting=spectral_weighting,
warnings=warnings,
)
else:
transformed_result = _continuum_gaussian_sensitivity(
calculation=calculation,
continuum_thermal_sensitivity=continuum_thermal_sensitivity,
continuum_weighting=continuum_weighting,
spectral_weighting=spectral_weighting,
warnings=warnings,
params=params,
)
else:
if non_gaussiam_beam:
transformed_result = _continuum_integration_time_non_gaussian(
calculation=calculation,
warnings=warnings,
)
else:
transformed_result = _continuum_integration_time(
continuum_thermal_sensitivity=continuum_thermal_sensitivity,
calculation=calculation,
continuum_weighting=continuum_weighting,
spectral_weighting=spectral_weighting,
error_message=error_message,
warnings=warnings,
)
else:
if params.integration_time_s:
transformed_result = _continuum_transformed_result_custom_sensitivity(
continuum_thermal_sensitivity=continuum_thermal_sensitivity,
continuum_sensitivity=continuum_sensitivity,
spectral_sensitivity=spectral_sensitivity,
calculation=calculation,
)
elif params.supplied_sensitivity:
transformed_result = _continuum_transformed_result_custom_integration_time(
calculation
)
if type(transformed_result) != dict:
transformed_result = asdict(transformed_result)
return transformed_result
def _continuum_transformed_result_custom_sensitivity(
continuum_thermal_sensitivity: Quantity,
continuum_sensitivity: Quantity,
spectral_sensitivity: Quantity,
calculation: dict,
) -> dict:
continuum_sensitivities_without_weighting_input = (
SensitivitiesTransformationsWithoutWeightingInput(
continuum_sensitivity=continuum_sensitivity,
spectral_sensitivity=spectral_sensitivity,
)
)
transformed_result = calculate_continuum_results_without_weighting(
continuum_sensitivities_without_weighting_input,
warnings=[],
)
transformed_result = asdict(transformed_result)
subbands = {}
if calculate_subbands := calculation.get("continuum_subband_sensitivities"):
subbands_data = SensitivityTransformationsSubbandsInput(
calculate_subbands=calculate_subbands,
continuum_thermal_sensitivity=continuum_thermal_sensitivity,
)
subbands = calculate_continuum_sensitivities_subbands_results(
subbands_data, non_gaussian=False, is_custom=True, warnings=[]
)
subbands = asdict(subbands)
transformed_result["subbands"] = subbands
return transformed_result
def _continuum_transformed_result_custom_integration_time(
calculation: dict,
) -> dict:
continuum_integration_time_without_weighting_input = (
IntegrationTimeTransformationsWithoutWeightingInput(
spectral_integration_time=calculation["spectral_integration_time"],
continuum_integration_time=calculation["continuum_integration_time"],
)
)
transformed_result = asdict(
calculate_continuum_integration_time_results_without_weighting(
continuum_integration_time_without_weighting_input,
warnings=[],
)
)
subbands = {}
# get custom subbands weighted results
if calculation.get("continuum_subband_integration_times"):
weighted_subbands_input = SensitivityTransformationsSubbandsInput(
calculate_subbands=calculation["continuum_subband_integration_times"],
)
subbands = asdict(
_calculate_continuum_integration_time_subbands_results_custom(
weighted_subbands_input, warnings=[]
)
)
transformed_result["subbands"] = subbands
return transformed_result
def _continuum_integration_time_non_gaussian(
calculation: dict,
warnings: list[str],
) -> dict:
continuum_integration_time = calculation["continuum_integration_time"]
spectral_integration_time = calculation["spectral_integration_time"]
subbands = {}
if calculation.get("continuum_subband_integration_times"):
max_time = calculation["continuum_subband_integration_times"][0][
"integration_time"
]
min_time = calculation["continuum_subband_integration_times"][-1][
"integration_time"
]
subbands = asdict(
SubbandsContinuumIntegrationTimeTransformedResults(
integration_time_per_subband=SubbandsItemsTransformedResults(
min_value=min_time, max_value=max_time
)
)
)
result = ContinuumIntegrationTimeTransformedResults(
continuum_integration_time=continuum_integration_time,
spectral_integration_time=spectral_integration_time,
warnings=warnings,
)
result = asdict(result)
result["subbands"] = subbands
return result
def _continuum_non_gaussian_sensitivity(
calculation: dict,
continuum_weighting: ContinuumWeightingResponse,
spectral_weighting: ContinuumWeightingResponse,
warnings: list[str],
) -> dict:
continuum_senstivity = (
calculation["continuum_sensitivity"] * continuum_weighting.weighting_factor
)
spectral_sensitivity = (
calculation["spectral_sensitivity"] * spectral_weighting.weighting_factor
)
subbands = {}
if calculation.get("continuum_subband_sensitivities"):
max_sensitivity = calculation["continuum_subband_sensitivities"][0][
"sensitivity"
]
min_sensitivity = calculation["continuum_subband_sensitivities"][-1][
"sensitivity"
]
subbands = asdict(
SubbandsContinuumSensitivityTransformedResults(
weighted_sensitivity_per_subband=SubbandsItemsTransformedResults(
min_value=u.Quantity(min_sensitivity.value, u.Jy / u.beam),
max_value=u.Quantity(max_sensitivity.value, u.Jy / u.beam),
)
)
)
result = ContinuumSensitivityTransformedResults(
weighted_continuum_sensitivity=u.Quantity(
continuum_senstivity.value, u.Jy / u.beam
),
weighted_spectral_sensitivity=u.Quantity(
spectral_sensitivity.value, u.Jy / u.beam
),
warnings=warnings,
)
result = asdict(result)
result["subbands"] = subbands
return result
def _continuum_gaussian_sensitivity(
calculation: dict,
continuum_thermal_sensitivity,
continuum_weighting: ContinuumWeightingResponse,
spectral_weighting: ContinuumWeightingResponse,
warnings: list[str],
params: ContinuumRequest,
) -> dict:
# integration time -> sensitivity
result = calculate_continuum_sensitivity(
ContinuumSensitivitiesTransformationsInput(
continuum_sensitivity=calculation["continuum_sensitivity"],
spectral_sensitivity=calculation["spectral_sensitivity"],
continuum_weighting_factor=continuum_weighting.weighting_factor,
spectral_weighting_factor=spectral_weighting.weighting_factor,
continuum_sbs_conv_factor=continuum_weighting.sbs_conv_factor,
spectral_sbs_conv_factor=spectral_weighting.sbs_conv_factor,
continuum_conf_noise=continuum_weighting.confusion_noise.value,
spectral_conf_noise=spectral_weighting.confusion_noise.value,
continuum_beam_min=continuum_weighting.beam_size.beam_min_scaled,
continuum_beam_maj=continuum_weighting.beam_size.beam_maj_scaled,
spectral_beam_min=spectral_weighting.beam_size.beam_min_scaled,
spectral_beam_maj=spectral_weighting.beam_size.beam_maj_scaled,
),
warnings,
params.weighting_mode,
params.robustness,
)
subbands = {}
if calculate_subbands := calculation.get("continuum_subband_sensitivities"):
weighting_subbands = continuum_weighting.subbands
subbands_data = SensitivityTransformationsSubbandsInput(
calculate_subbands=calculate_subbands,
weighting_subbands=weighting_subbands,
continuum_thermal_sensitivity=continuum_thermal_sensitivity,
)
subbands = calculate_continuum_sensitivities_subbands_results(
subbands_data,
non_gaussian=is_beam_non_gaussian(
params.weighting_mode,
robustness=params.robustness,
),
is_custom=False,
warnings=[],
)
subbands = asdict(subbands)
result = asdict(result)
result["subbands"] = subbands
return result
def _continuum_integration_time_subbands(
continuum_thermal_sensitivity,
calculation,
continuum_weighting: ContinuumWeightingResponse,
confusion_noises,
spectral_confusion_noise,
) -> dict:
subbands = {}
if calculation.get("continuum_subband_integration_times"):
assert len(continuum_weighting.subbands) > 0
confusion_noises.append(continuum_weighting.subbands[0].confusion_noise.value)
confusion_noises.append(continuum_weighting.subbands[-1].confusion_noise.value)
max_confusion_noise = max(confusion_noises)
warnings = []
if sensitivity_limit_reached(
max_confusion_noise, continuum_thermal_sensitivity
):
msg = _sens_limit_error(spectral_confusion_noise)
warnings.append(msg)
weighted_subbands_input = SensitivityTransformationsSubbandsInput(
calculate_subbands=calculation["continuum_subband_integration_times"],
weighting_subbands=continuum_weighting.subbands,
continuum_thermal_sensitivity=continuum_thermal_sensitivity,
)
subbands = asdict(
_calculate_continuum_integration_time_subbands_results(
weighted_subbands_input, warnings
)
)
return subbands
def _calculate_custom_integration_time(
supplied_sensitivity: float, params: ContinuumRequest
) -> Quantity:
supplied_sensitivity_jy = Quantity(
supplied_sensitivity, params.sensitivity_unit
).to(u.Jy / u.beam)
integration_input = prepare_integration_input(
supplied_sensitivity_jy.value, **asdict(params)
)
return calculate_integration_time(integration_input)
def _continuum_integration_time(
continuum_thermal_sensitivity: Quantity,
calculation: dict,
continuum_weighting: ContinuumWeightingResponse,
spectral_weighting: ContinuumWeightingResponse,
error_message: Union[bool, str],
warnings: list[str] = None,
) -> dict:
continuum_confusion_noise = get_confusion_noise(
confusion_noise=continuum_weighting.confusion_noise.value,
maj_beam_size=continuum_weighting.beam_size.beam_maj_scaled,
min_beam_size=continuum_weighting.beam_size.beam_min_scaled,
)
continuum_synthesized_beam_size = get_synthesized_beam_size(
continuum_weighting.beam_size.beam_min_scaled,
continuum_weighting.beam_size.beam_maj_scaled,
)
continuum_integration_time = calculation.get("continuum_integration_time")
spectral_confusion_noise = get_confusion_noise(
confusion_noise=spectral_weighting.confusion_noise.value,
maj_beam_size=spectral_weighting.beam_size.beam_maj_scaled,
min_beam_size=spectral_weighting.beam_size.beam_min_scaled,
)
spectral_synthesized_beam_size = get_synthesized_beam_size(
spectral_weighting.beam_size.beam_min_scaled,
spectral_weighting.beam_size.beam_maj_scaled,
)
spectral_integration_time = calculation.get("spectral_integration_time")
transformed_result = asdict(
ContinuumIntegrationTimeTransformedResults(
continuum_confusion_noise=continuum_confusion_noise,
continuum_synthesized_beam_size=continuum_synthesized_beam_size,
continuum_integration_time=continuum_integration_time,
spectral_confusion_noise=spectral_confusion_noise,
spectral_synthesized_beam_size=spectral_synthesized_beam_size,
spectral_integration_time=spectral_integration_time,
)
)
# For sensitivity -> integration time we need to compare the entered (total) sensitivity with the up to
# two confusion noise values that might be shown (full bandwidth, two confusion-noise values and spectral)
confusion_noises = [
continuum_weighting.confusion_noise.value,
spectral_weighting.confusion_noise.value,
]
max_confusion_noise = max(confusion_noises)
if sensitivity_limit_reached(max_confusion_noise, continuum_thermal_sensitivity):
msg = _sens_limit_warning(spectral_confusion_noise)
warnings.append(msg)
if error_message:
transformed_result["subbands"] = {}
return transformed_result
subbands = _continuum_integration_time_subbands(
continuum_thermal_sensitivity,
calculation=calculation,
continuum_weighting=continuum_weighting,
confusion_noises=confusion_noises,
spectral_confusion_noise=spectral_confusion_noise,
)
transformed_result["subbands"] = subbands
return transformed_result
def _calculate_continuum_integration_time_subbands_results(
subbands_input: SensitivityTransformationsSubbandsInput,
warnings: list[str],
) -> SubbandsContinuumIntegrationTimeTransformedResults:
"""
Uses an input containing values from Calculator and Weighting subbands to generate weighted subbands results to be displayed in the UI.
:param subbands_input: the subband values from the calculator and weighting.
:type subbands_input: data class SensitivityTransformationsSubbandsInput
:param warnings: the list of warnings to be returned.
:type warnings: List of string
:return: a data class object representing the subband weighted results.
:rtype: SubbandsContinuumIntegrationTimeTransformedResults
"""
# TODO: add tests for this function
# TODO: check if this function is used anywhere
max_time = subbands_input.calculate_subbands[0]["integration_time"]
min_time = subbands_input.calculate_subbands[-1]["integration_time"]
max_synthesized_beam = get_synthesized_beam_size(
maj_beam_size=subbands_input.weighting_subbands[0].beam_size.beam_maj_scaled,
min_beam_size=subbands_input.weighting_subbands[0].beam_size.beam_min_scaled,
)
min_synthesized_beam = get_synthesized_beam_size(
maj_beam_size=subbands_input.weighting_subbands[-1].beam_size.beam_maj_scaled,
min_beam_size=subbands_input.weighting_subbands[-1].beam_size.beam_min_scaled,
)
confusion_noise_per_subband = SubbandsItemsTransformedResults(
max_value=subbands_input.weighting_subbands[0].confusion_noise.value * u.Jy,
min_value=subbands_input.weighting_subbands[-1].confusion_noise.value * u.Jy,
)
synthesized_beam_size_per_subband = SubbandsBeamSizeTransformedResults(
max_value=max_synthesized_beam, min_value=min_synthesized_beam
)
integration_time_per_subband = SubbandsItemsTransformedResults(
max_value=max_time, min_value=min_time
)
# handle warnings
compare_max_confusion_noise_jy = [
subbands_input.weighting_subbands[0].confusion_noise.value,
subbands_input.weighting_subbands[-1].confusion_noise.value,
]
max_confusion_noise_jy = max(compare_max_confusion_noise_jy)
if sensitivity_limit_reached(
max_confusion_noise_jy, subbands_input.continuum_thermal_sensitivity
):
warning = _sens_limit_error(max_confusion_noise_jy)
warnings.append(warning)
return SubbandsContinuumIntegrationTimeTransformedResults(
confusion_noise_per_subband=confusion_noise_per_subband,
synthesized_beam_size_per_subband=synthesized_beam_size_per_subband,
integration_time_per_subband=integration_time_per_subband,
warnings=warnings,
)
def _calculate_continuum_integration_time_subbands_results_custom(
subbands_input: SensitivityTransformationsSubbandsInput,
warnings: list[str],
) -> SubbandsContinuumIntegrationTimeTransformedResults:
"""
Uses an input containing values from Calculator and Weighting subbands to generate weighted subbands results to be displayed in the UI.
:param subbands_input: the subband values from the calculator and no weighting.
:type subbands_input: data class SensitivityTransformationsSubbandsInput
:param warnings: the list of warnings to be returned.
:type warnings: List of string
:return: a data class object representing the subband weighted results.
:rtype: SubbandsContinuumIntegrationTimeTransformedResults
"""
max_time = subbands_input.calculate_subbands[0]["integration_time"]
min_time = subbands_input.calculate_subbands[-1]["integration_time"]
integration_time_per_subband = SubbandsItemsTransformedResults(
max_value=max_time, min_value=min_time
)
# There are no warnings for this mode (custom) as there is no confusion noise
return SubbandsContinuumIntegrationTimeTransformedResults(
integration_time_per_subband=integration_time_per_subband,
warnings=warnings,
)
[docs]
def get_zoom_calculate_response(
params: ZoomRequestPrepared,
) -> dict:
"""
Extract the params from the request, convert them into the relevant calculator inputs,
perform the calculations and collect the results into the response body.
"""
# Parse the target
target = SkyCoord(
params.pointing_centre,
frame="icrs",
unit=(u.hourangle, u.deg),
)
params.target = target
calculations = []
warnings = []
if integration_time_s := params.integration_time_s:
for zoom_freq_centre_hz, zoom_spectral_resolution_hz, total_bandwidth_hz in zip(
params.freq_centres_hz,
params.spectral_resolutions_hz,
params.total_bandwidths_hz,
):
zoom_params = dataclasses.replace(params)
zoom_params.freq_centre_hz = zoom_freq_centre_hz
effective_resolution_hz = (
zoom_spectral_resolution_hz * params.spectral_averaging_factor
)
zoom_params.bandwidth_hz = effective_resolution_hz
sensitivity_input = prepare_sensitivity_input(
integration_time_s, **asdict(zoom_params)
)
sensitivity = calculate_sensitivity(sensitivity_input).to(u.Jy)[0]
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=Quantity(total_bandwidth_hz, "Hz"),
frequency=Quantity(zoom_freq_centre_hz, "Hz"),
effective_channel_width=Quantity(effective_resolution_hz, "Hz"),
)
spectropolarimetry_results = get_spectropolarimetry_results(
spectropolarimetry_input
)
single = SingleZoomSensitivityResponse(
freq_centre=zoom_freq_centre_hz * u.Hz,
spectral_sensitivity=sensitivity,
spectropolarimetry_results=spectropolarimetry_results,
)
calculations.append(asdict(single))
# We need them early for sensitivities calculations
weightings = get_zoom_weighting_response(params)
thermals = []
if params.supplied_sensitivities:
assert (
len(params.freq_centres_hz)
== len(params.spectral_resolutions_hz)
== len(params.supplied_sensitivities)
== len(params.total_bandwidths_hz)
)
for i in range(len(params.freq_centres_hz)):
zoom_freq_centre_hz = params.freq_centres_hz[i]
zoom_spectral_resolution_hz = params.spectral_resolutions_hz[i]
zoom_supplied_sensitivity = params.supplied_sensitivities[i]
total_bandwidth_hz = params.total_bandwidths_hz[i]
try:
weighting = weightings[i]
sensitivity = sensitivity_on_unit(
zoom_supplied_sensitivity,
weighting.sbs_conv_factor,
params.sensitivity_unit,
)
confusion_noise = _adjust_confusion_noise(
params, weighting.confusion_noise.value
)
_thermal_sensitivity = thermal_sensitivity(
sensitivity, weighting.weighting_factor, confusion_noise
)
thermal = ThermalData(
sensitivity,
confusion_noise,
_thermal_sensitivity,
)
thermals.append(thermal)
error_message = _check_sensitivity_limit(
sensitivity, Quantity(confusion_noise, u.Jy / u.beam)
)
if error_message:
calculations.append({"warnings": [error_message]})
continue
except IndexError:
_thermal_sensitivity = zoom_supplied_sensitivity
zoom_params = dataclasses.replace(params)
zoom_params.freq_centre_hz = zoom_freq_centre_hz
effective_resolution_hz = (
zoom_spectral_resolution_hz * params.spectral_averaging_factor
)
zoom_params.bandwidth_hz = effective_resolution_hz
integration_input = prepare_integration_input(
_thermal_sensitivity, **asdict(zoom_params)
)
integration_time = calculate_integration_time(integration_input).to(u.s)[0]
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=Quantity(total_bandwidth_hz, "Hz"),
frequency=Quantity(zoom_freq_centre_hz, "Hz"),
effective_channel_width=Quantity(effective_resolution_hz, "Hz"),
)
spectropolarimetry_results = get_spectropolarimetry_results(
spectropolarimetry_input
)
single = SingleZoomIntegrationTimeResponse(
freq_centre=zoom_freq_centre_hz * u.Hz,
spectral_integration_time=integration_time,
spectropolarimetry_results=spectropolarimetry_results,
warnings=warnings,
)
calculations.append(asdict(single))
transformed_result = _zoom_transformed_result(
params, calculations, weightings, thermals
)
res = {
"calculate": calculations,
"weighting": {"spectral_weighting": [asdict(w) for w in weightings]},
"transformed_result": transformed_result,
}
# TODO: Once agreed, we need only returning required messages, rather than nulls. And for custom array/non Gaussian only.
cleaned_results = _replace_na_fileds(res)
return cleaned_results
def _zoom_transformed_result(
params: ZoomRequestPrepared,
calculations: list[SingleZoomSensitivityResponse],
weightings: list[SingleZoomWeightingResponse],
thermals: list[ThermalData],
) -> ZoomSensitivityResult | ZoomIntegrationTimeResult:
if params.integration_time_s:
if params.subarray_configuration is None:
return _zoom_custom_sensitivity(calculations)
else:
if is_beam_non_gaussian(
params.weighting_mode, params.robustness, params.taper
):
return _zoom_sensitivity_non_gaussian(calculations, weightings)
else:
return _zoom_sensitivity(calculations, weightings)
else:
if params.subarray_configuration is None:
return _zoom_custom_integration(calculations)
else:
warnings = []
calculations = deepcopy(calculations)
weightings = deepcopy(weightings)
if is_beam_non_gaussian(
params.weighting_mode, params.robustness, params.taper
):
res = _zoom_integration_time_non_gaussian(calculations)
else:
res = _zoom_integration_time(calculations, weightings, thermals)
res.extend(warnings)
return res
def _zoom_custom_integration(
calculations: list[SingleZoomSensitivityResponse],
) -> list[ZoomIntegrationTimeResult]:
res = []
for calculation in calculations:
zoom_result = ZoomIntegrationTimeResult(
spectral_integration_time=calculation["spectral_integration_time"],
)
res.append(asdict(zoom_result))
return res
def _zoom_custom_sensitivity(
calculations: list[SingleZoomSensitivityResponse],
) -> list[ZoomSensitivityResult]:
res = []
for calculation in calculations:
zoom_result = ZoomSensitivityResult(
weighted_spectral_sensitivity=u.Quantity(
calculation["spectral_sensitivity"].to(u.Jy).value, u.Jy / u.beam
),
)
res.append(asdict(zoom_result))
return res
def _zoom_sensitivity(
calculations: list[SingleZoomSensitivityResponse],
weightings: list[SingleZoomWeightingResponse],
) -> list[ZoomSensitivityResult]:
res = []
for calculation, weighting in zip(calculations, weightings):
try:
weighted_spectral_sensitivity = get_weighted_sensitivity(
calculation["spectral_sensitivity"],
weighting.weighting_factor,
)
except KeyError:
weighted_spectral_sensitivity = nan
min_beam_size = weighting.beam_size.beam_min_scaled
maj_beam_size = weighting.beam_size.beam_maj_scaled
spectral_confusion_noise = get_confusion_noise(
weighting.confusion_noise.value,
min_beam_size,
maj_beam_size,
)
total_spectral_sensitivity = get_total_sensitivity(
spectral_confusion_noise, weighted_spectral_sensitivity
)
spectral_synthesized_beam_size = get_synthesized_beam_size(
min_beam_size,
maj_beam_size,
)
spectral_surface_brightness_sensitivity = get_surface_brightness_sensitivity(
weighting.sbs_conv_factor,
total_spectral_sensitivity,
)
warnings = []
if sensitivity_limit_reached(
confusion_noise=spectral_confusion_noise,
total_sensitivity=total_spectral_sensitivity,
):
msg = _sens_limit_warning(spectral_confusion_noise)
warnings.append(msg)
zoom_result = ZoomSensitivityResult(
weighted_spectral_sensitivity=weighted_spectral_sensitivity,
warnings=warnings,
spectral_confusion_noise=spectral_confusion_noise,
total_spectral_sensitivity=total_spectral_sensitivity,
spectral_synthesized_beam_size=spectral_synthesized_beam_size,
spectral_surface_brightness_sensitivity=spectral_surface_brightness_sensitivity,
)
res.append(asdict(zoom_result))
return res
def _zoom_sensitivity_non_gaussian(
calculations: list[SingleZoomSensitivityResponse],
weightings: list[SingleZoomWeightingResponse],
) -> list[ZoomSensitivityResult]:
res = []
for calculation, weighting in zip(calculations, weightings):
sensitivity = calculation.get("spectral_sensitivity", nan)
zoom_result = ZoomSensitivityResult(
weighted_spectral_sensitivity=u.Quantity(
(sensitivity.to(u.Jy).value * weighting.weighting_factor), u.Jy / u.beam
),
)
res.append(asdict(zoom_result))
return res
def _zoom_integration_time(
calculations: list[SingleZoomSensitivityResponse],
weightings: list[SingleZoomWeightingResponse],
thermals: list[ThermalData],
) -> list[ZoomIntegrationTimeResult]:
res = []
for calculation, weighting, thermal in zip(calculations, weightings, thermals):
spectral_integration_time = calculation.get("spectral_integration_time")
if spectral_integration_time is None:
res.append({})
else:
min_beam_size = weighting.beam_size.beam_min_scaled
maj_beam_size = weighting.beam_size.beam_maj_scaled
spectral_confusion_noise = get_confusion_noise(
weighting.confusion_noise.value,
min_beam_size,
maj_beam_size,
)
spectral_synthesized_beam_size = get_synthesized_beam_size(
min_beam_size,
maj_beam_size,
)
warnings = []
# TODO: use the new error function here or assess to remove if not needed?
# Maybe it is needed for non-Gaussian beam where the error is just appended?
if sensitivity_limit_reached(spectral_confusion_noise, thermal.sensitivity):
msg = _sens_limit_warning(Quantity(thermal.confusion_noise, u.Jy))
warnings.append(msg)
zoom_result = ZoomIntegrationTimeResult(
spectral_integration_time=spectral_integration_time,
spectral_confusion_noise=spectral_confusion_noise,
spectral_synthesized_beam_size=spectral_synthesized_beam_size,
warnings=warnings,
)
res.append(asdict(zoom_result))
return res
def _confusion_noise_included(params: ZoomRequestPrepared) -> bool:
if params.subarray_configuration is None or is_beam_non_gaussian(
params.weighting_mode, params.robustness, params.taper
):
return False
else:
return True
def _adjust_confusion_noise(
params: ZoomRequestPrepared, confusion_noise: float
) -> float:
if _confusion_noise_included(params):
return confusion_noise
else:
return 0
def _sens_limit_warning(confusion_noise: Quantity) -> str:
return f"Warning: You are approaching the confusion limit ({confusion_noise}) given the synthesized beam-size and frequency."
def _sens_limit_error(confusion_noise: Quantity, sensitivity: Quantity) -> str:
return f"Error: the entered sensitivity ({sensitivity}) is less than or equal to the confusion noise ({confusion_noise})"
def _check_sensitivity_limit(
sensitivity: Quantity, confusion_noise: Quantity
) -> Union[bool, str]:
if sensitivity.value <= confusion_noise.value:
return f"Error: the entered sensitivity ({sensitivity}) is less than or equal to the confusion noise ({confusion_noise})"
return False
def _zoom_integration_time_non_gaussian(
calculations: list[SingleZoomSensitivityResponse],
) -> list[ZoomIntegrationTimeResult]:
res = []
for calculation in calculations:
zoom_result = ZoomIntegrationTimeResult(
spectral_integration_time=calculation["spectral_integration_time"]
)
res.append(asdict(zoom_result))
return res
def _replace_na_fileds(d: dict | list[dict]) -> dict | list:
if type(d) == list:
res = [_replace_na_fileds(elem) for elem in d]
return res
elif type(d) == dict:
res = {}
for k, v in d.items():
cleaned = _replace_na_fileds(v)
if isinstance(cleaned, float) and isnan(cleaned):
res[k] = None
else:
res[k] = cleaned
return res
else:
return d
[docs]
def get_subarray_response():
"""
return the appropriate subarray objects
"""
return [
{
"name": subarray.name,
"label": subarray.label,
"n_ska": subarray.n_ska,
"n_meer": subarray.n_meer,
}
for subarray in subarray_storage.list()
]