"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
import copy
from inspect import signature
from typing import TypedDict
import astropy.units as u
import numpy
from astropy.coordinates import Latitude, SkyCoord
from ska_ost_senscalc.common.beam import calculate_weighting
from ska_ost_senscalc.common.model import Limit, Weighting, WeightingInput
from ska_ost_senscalc.mid.calculator import Calculator
from ska_ost_senscalc.mid.model import MidSpectralMode
from ska_ost_senscalc.subarray import MIDArrayConfiguration, SubarrayStorage
from ska_ost_senscalc.utilities import Telescope
subarray_storage = SubarrayStorage(Telescope.MID)
# TypedDicts to constrain response dicts to match the OpenAPI spec -----------
[docs]class ConfusionNoiseResponse(TypedDict):
"""
ConfusionNoiseResponse is a typed dictionary constrained to match the
schema of a confusion noise JSON object, as contained in the parent JSON
result of a weighting endpoint query.
"""
value: list[float | int]
limit_type: list[str]
[docs]class BeamSizeResponse(TypedDict):
"""
BeamSizeResponse is a typed dictionary constrained to match the schema of
the JSON object outlining the synthesized beam size, as contained in the
parent JSON result of a weighting endpoint query.
"""
beam_maj_scaled: float
beam_min_scaled: float
beam_pa: float
[docs]class SingleWeightingResponse(TypedDict):
"""
SingleWeightingResponse is a typed dictionary constrained to match the
schema of a single weighting calculation, as performed for the main
continuum or zoom weighting calculation and for each subband frequency.
Child SingleWeightingResponse JSON object for each of these calculations
are contained in the parent JSON result
"""
weighting_factor: float | None
sbs_conv_factor: list[float]
confusion_noise: ConfusionNoiseResponse
beam_size: list[BeamSizeResponse]
[docs]class WeightingResponse(SingleWeightingResponse):
"""
WeightingResponse is a typed dictionary constrained to match the schema of
the main result of a weighting endpoint query.
"""
subbands: list[SingleWeightingResponse]
# end TypedDict definitions --------------------------------------------------
[docs]def get_subbands(n_subbands, obs_freq, bandwidth):
"""
Function to get the centres (and common width) of the N subbands of
bandwidth
:param n_subbands: Number of subbands
:type n_subbands: int
:param obs_freq: Frequency
:type obs_freq: float
:param bandwidth: Bandwidth
:type bandwidth: float
:return: A list of frequency centres for the subbands and the subband width
:rtype: Tuple(List, float)
"""
left = obs_freq - (0.5 * bandwidth)
subband_width = bandwidth / n_subbands
return [
left + ((i + 0.5) * subband_width) for i in range(n_subbands)
], subband_width
# Get the keywords of the Calculator constructor
KWARGS_CALCULATOR = [
p.name
for p in signature(Calculator).parameters.values()
if p.kind == p.KEYWORD_ONLY
]
[docs]def get_calculate_response(params):
"""
Using the parameters of the query return the appropriate values for the
calculation.
"""
# Parse the target
target = SkyCoord(
params["ra_str"],
params["dec_str"],
frame="icrs",
unit=(u.hourangle, u.deg),
)
params["target"] = target
if params.get("array_configuration"):
params["array_configuration"] = MIDArrayConfiguration(
params["array_configuration"]
)
# Keep only the params in the list of constructor inputs
constructor_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR}
# Main results
result = {}
calculator = Calculator(**constructor_params)
line_calculator = None
if params.get("resolution"):
line_params = constructor_params.copy()
line_params["bandwidth"] = params["resolution"]
line_calculator = Calculator(**line_params)
if params.get("integration_time"):
sensitivity = (
calculator.calculate_sensitivity(params["integration_time"]).to(u.Jy).value
)
result.update(
{
"result": {
"state": calculator.state(),
"sensitivity": sensitivity,
}
}
)
if line_calculator is not None:
# Calculate line sensitivity using resolution for bandwidth
line_sensitivity = (
line_calculator.calculate_sensitivity(params["integration_time"])
.to(u.Jy)
.value
)
result["result"]["line_sensitivity"] = line_sensitivity
if params.get("sensitivity"):
integration_time = (
calculator.calculate_integration_time(params["sensitivity"]).to(u.s).value
)
result.update(
{
"result": {
"state": calculator.state(),
"integration_time": integration_time,
}
}
)
if line_calculator is not None:
# Calculate line integration time using resolution for bandwidth
line_integration_time = (
line_calculator.calculate_integration_time(params["sensitivity"])
.to(u.s)
.value
)
result["result"]["line_integration_time"] = line_integration_time
# Subbands - if subbands is 1 then we just return the main sensitivity calculation as the subband is the whole bandwidth
if params.get("n_subbands", 1) != 1:
subband_results = []
subband_frequencies, subband_bandwidth = get_subbands(
params["n_subbands"], params["frequency"], params["bandwidth"]
)
if integration_time := params.get("integration_time"):
for subband_frequency in subband_frequencies:
subband_params = constructor_params.copy()
subband_params["frequency"] = subband_frequency
subband_params["bandwidth"] = subband_bandwidth
subband_calculator = Calculator(**subband_params)
sensitivity = (
subband_calculator.calculate_sensitivity(integration_time)
.to(u.Jy)
.value
)
subband_results.append(
{
"state": subband_calculator.state(),
"sensitivity": sensitivity,
}
)
if params.get("sensitivity"):
for subband_frequency, subband_sensitivity in zip(
subband_frequencies, params.get("subband_sensitivities")
):
subband_params = constructor_params.copy()
subband_params["frequency"] = subband_frequency
subband_params["bandwidth"] = subband_bandwidth
subband_calculator = Calculator(**subband_params)
integration_time = (
subband_calculator.calculate_integration_time(subband_sensitivity)
.to(u.s)
.value
)
subband_results.append(
{
"state": subband_calculator.state(),
"integration_time": integration_time,
}
)
result.update({"subbands": subband_results})
# Zooms
if params.get("zoom_frequencies"):
zoom_results = []
if params.get("integration_time"):
for zoom_frequency, zoom_resolution in zip(
params["zoom_frequencies"], params["zoom_resolutions"]
):
zoom_params = constructor_params.copy()
zoom_params["frequency"] = zoom_frequency
zoom_params["bandwidth"] = zoom_resolution
zoom_calculator = Calculator(**zoom_params)
sensitivity = (
zoom_calculator.calculate_sensitivity(params["integration_time"])
.to(u.Jy)
.value
)
zoom_results.append(
{
"state": zoom_calculator.state(),
"sensitivity": sensitivity,
}
)
if params.get("zoom_sensitivities"):
for zoom_frequency, zoom_resolution, zoom_sensitivity in zip(
params["zoom_frequencies"],
params["zoom_resolutions"],
params["zoom_sensitivities"],
):
zoom_params = constructor_params.copy()
zoom_params["frequency"] = zoom_frequency
zoom_params["bandwidth"] = zoom_resolution
zoom_calculator = Calculator(**zoom_params)
integration_time = (
zoom_calculator.calculate_integration_time(zoom_sensitivity)
.to(u.s)
.value
)
zoom_results.append(
{
"state": zoom_calculator.state(),
"integration_time": integration_time,
}
)
result.update({"zooms": zoom_results})
return result
[docs]def get_weighting_response(user_input: dict) -> WeightingResponse:
"""
Using the parameters of the query return the appropriate values for
weighting correction factor, synthesized-beam-sensitivity factor and beam
shape.
Raises:
- None
"""
if user_input["calculator_mode"] == MidSpectralMode.LINE.value:
# use zoom_frequencies instead if in Zoom mode
frequency = user_input.get("zoom_frequencies") * u.Hz
else:
frequency = [user_input.get("frequency") * u.Hz]
params = WeightingInput(
freq_centre=frequency,
dec=Latitude(user_input["dec_str"] + " degrees"),
weighting_mode=Weighting(user_input["weighting"]),
robustness=user_input.get("robustness", None),
subarray_configuration=MIDArrayConfiguration(user_input["array_configuration"]),
calc_mode=MidSpectralMode(user_input["calculator_mode"]),
taper=user_input.get("taper", 0.0) * u.arcsec,
telescope=Telescope.MID,
)
result = dict(**get_single_weighting_response(params))
# BTN-1957: Add subband support
# If the subband_frequencies_hz parameter is specified, the response should
# include the weighting response for each subband frequency.
subbands = []
if user_input.get("subband_frequencies_hz"):
for subband_frequency in user_input.get("subband_frequencies_hz"):
subband_params = copy.deepcopy(params)
subband_params.freq_centre = [subband_frequency * u.Hz]
# BTN-1957 spec states that continuum mode should always be assumed in
# the lookup table regardless of subband bandwidth
subband_params.calc_mode = MidSpectralMode.CONTINUUM
subbands.append(get_single_weighting_response(subband_params))
return dict(**result, subbands=subbands)
def get_single_weighting_response(params: WeightingInput) -> SingleWeightingResponse:
weighting_result = calculate_weighting(params)
# If confusion noise is labelled as an upper limit set it to
# have a value of 0 Jy and a label of 'value'
noise_values = weighting_result.confusion_noise.value
noise_limit_types = weighting_result.confusion_noise.limit
np_values = numpy.array([value.value for value in noise_values])
np_limit_types = numpy.array([limit.value for limit in noise_limit_types])
np_values[np_limit_types == Limit.UPPER.value] = 0
np_limit_types[np_limit_types == Limit.UPPER.value] = Limit.VALUE.value
confusion_noise_response: ConfusionNoiseResponse = {
# prefer returning Python primitives over returning numpy ndarrays
"value": list(np_values),
"limit_type": list(np_limit_types),
}
beam_size_response = [
{
"beam_maj_scaled": beam_size.beam_maj.value,
"beam_min_scaled": beam_size.beam_min.value,
"beam_pa": beam_size.beam_pa.value,
}
for beam_size in weighting_result.beam_size
]
return {
"weighting_factor": weighting_result.weighting_factor,
"sbs_conv_factor": [
sbsbf.value
for sbsbf in weighting_result.surface_brightness_conversion_factor
],
"confusion_noise": confusion_noise_response,
"beam_size": beam_size_response,
}
[docs]def get_subarray_response():
"""
return the appropriate subarray objects
"""
return [
{
"name": subarray.name,
"label": subarray.label,
"n_ska": subarray.n_ska,
"n_meer": subarray.n_meer,
}
for subarray in subarray_storage.list()
]