"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
import dataclasses
from inspect import signature
from typing import TypedDict
import astropy.units as u
import numpy
from astropy.coordinates import Angle, SkyCoord
from astropy.units import Quantity
from ska_ost_senscalc.mid.calculator import Calculator
from ska_ost_senscalc.mid_utilities import Beam, CalculatorMode, Limit, Weighting
from ska_ost_senscalc.subarray import MIDArrayConfiguration, SubarrayStorage
from ska_ost_senscalc.utilities import Telescope
subarray_storage = SubarrayStorage(Telescope.MID)
# TypedDicts to constrain response dicts to match the OpenAPI spec -----------
[docs]class ConfusionNoiseResponse(TypedDict):
"""
ConfusionNoiseResponse is a typed dictionary constrained to match the
schema of a confusion noise JSON object, as contained in the parent JSON
result of a weighting endpoint query.
"""
value: list[float | int]
limit_type: list[str]
[docs]class BeamSizeResponse(TypedDict):
"""
BeamSizeResponse is a typed dictionary constrained to match the schema of
the JSON object outlining the synthesized beam size, as contained in the
parent JSON result of a weighting endpoint query.
"""
beam_maj_scaled: float
beam_min_scaled: float
beam_pa: float
[docs]class SingleWeightingResponse(TypedDict):
"""
SingleWeightingResponse is a typed dictionary constrained to match the
schema of a single weighting calculation, as performed for the main
continuum or zoom weighting calculation and for each chunk frequency.
Child SingleWeightingResponse JSON object for each of these calculations
are contained in the parent JSON result
"""
weighting_factor: float | None
sbs_conv_factor: float
confusion_noise: ConfusionNoiseResponse
beam_size: list[BeamSizeResponse]
[docs]class WeightingResponse(SingleWeightingResponse):
"""
WeightingResponse is a typed dictionary constrained to match the schema of
the main result of a weighting endpoint query.
"""
chunks: list[SingleWeightingResponse]
# end TypedDict definitions --------------------------------------------------
[docs]def get_chunks(n_chunks, obs_freq, bandwidth):
"""
Function to get the centres (and common width) of the N chunks of
bandwidth
:param n_chunks: Number of chunks
:type n_chunks: int
:param obs_freq: Frequency
:type obs_freq: float
:param bandwidth: Bandwidth
:type bandwidth: float
:return: A list of frequency centres for the chunks and the chunk width
:rtype: Tuple(List, float)
"""
left = obs_freq - (0.5 * bandwidth)
chunk_width = bandwidth / n_chunks
return [left + ((i + 0.5) * chunk_width) for i in range(n_chunks)], chunk_width
# Get the keywords of the Calculator constructor
KWARGS_CALCULATOR = [
p.name
for p in signature(Calculator).parameters.values()
if p.kind == p.KEYWORD_ONLY
]
[docs]def get_calculate_response(params):
"""
Using the parameters of the query return the appropriate values for the
calculation.
"""
# Parse the target
target = SkyCoord(
params["ra_str"],
params["dec_str"],
frame="icrs",
unit=(u.hourangle, u.deg),
)
params["target"] = target
if params.get("array_configuration"):
params["array_configuration"] = MIDArrayConfiguration(
params["array_configuration"]
)
# Keep only the params in the list of constructor inputs
constructor_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR}
# Main results
result = {}
calculator = Calculator(**constructor_params)
line_calculator = None
if params.get("resolution"):
line_params = constructor_params.copy()
line_params["bandwidth"] = params["resolution"]
line_calculator = Calculator(**line_params)
if params.get("integration_time"):
sensitivity = (
calculator.calculate_sensitivity(params["integration_time"]).to(u.Jy).value
)
result.update(
{
"result": {
"state": calculator.state(),
"sensitivity": sensitivity,
}
}
)
if line_calculator is not None:
# Calculate line sensitivity using resolution for bandwidth
line_sensitivity = (
line_calculator.calculate_sensitivity(params["integration_time"])
.to(u.Jy)
.value
)
result["result"]["line_sensitivity"] = line_sensitivity
if params.get("sensitivity"):
integration_time = (
calculator.calculate_integration_time(params["sensitivity"]).to(u.s).value
)
result.update(
{
"result": {
"state": calculator.state(),
"integration_time": integration_time,
}
}
)
if line_calculator is not None:
# Calculate line integration time using resolution for bandwidth
line_integration_time = (
line_calculator.calculate_integration_time(params["sensitivity"])
.to(u.s)
.value
)
result["result"]["line_integration_time"] = line_integration_time
# Chunks
if params.get("n_chunks"):
chunk_results = []
chunk_frequencies, chunk_bandwidth = get_chunks(
params["n_chunks"], params["frequency"], params["bandwidth"]
)
if integration_time := params.get("integration_time"):
for chunk_frequency in chunk_frequencies:
chunk_params = constructor_params.copy()
chunk_params["frequency"] = chunk_frequency
chunk_params["bandwidth"] = chunk_bandwidth
chunk_calculator = Calculator(**chunk_params)
sensitivity = (
chunk_calculator.calculate_sensitivity(integration_time)
.to(u.Jy)
.value
)
chunk_results.append(
{
"state": chunk_calculator.state(),
"sensitivity": sensitivity,
}
)
if params.get("sensitivity"):
for chunk_frequency, chunk_sensitivity in zip(
chunk_frequencies, params.get("chunk_sensitivities")
):
chunk_params = constructor_params.copy()
chunk_params["frequency"] = chunk_frequency
chunk_params["bandwidth"] = chunk_bandwidth
chunk_calculator = Calculator(**chunk_params)
integration_time = (
chunk_calculator.calculate_integration_time(chunk_sensitivity)
.to(u.s)
.value
)
chunk_results.append(
{
"state": chunk_calculator.state(),
"integration_time": integration_time,
}
)
result.update({"chunks": chunk_results})
# Zooms
if params.get("zoom_frequencies"):
zoom_results = []
if params.get("integration_time"):
for zoom_frequency, zoom_resolution in zip(
params["zoom_frequencies"], params["zoom_resolutions"]
):
zoom_params = constructor_params.copy()
zoom_params["frequency"] = zoom_frequency
zoom_params["bandwidth"] = zoom_resolution
zoom_calculator = Calculator(**zoom_params)
sensitivity = (
zoom_calculator.calculate_sensitivity(params["integration_time"])
.to(u.Jy)
.value
)
zoom_results.append(
{
"state": zoom_calculator.state(),
"sensitivity": sensitivity,
}
)
if params.get("zoom_sensitivities"):
for zoom_frequency, zoom_resolution, zoom_sensitivity in zip(
params["zoom_frequencies"],
params["zoom_resolutions"],
params["zoom_sensitivities"],
):
zoom_params = constructor_params.copy()
zoom_params["frequency"] = zoom_frequency
zoom_params["bandwidth"] = zoom_resolution
zoom_calculator = Calculator(**zoom_params)
integration_time = (
zoom_calculator.calculate_integration_time(zoom_sensitivity)
.to(u.s)
.value
)
zoom_results.append(
{
"state": zoom_calculator.state(),
"integration_time": integration_time,
}
)
result.update({"zooms": zoom_results})
return result
[docs]@dataclasses.dataclass
class BeamArguments:
"""
BeamArguments is a dataclass holding arguments for a synthesised beam
calculation.
This class is essentially a proxy for a typed Beam constructor signature.
It was introduced as a temporary measure to make the code easier to read
and reason about. It should be removed when Beam is refactored to:
1. have a properly typed constructor signature
2. make mandatory arguments mandatory in the constructor signature, not
optional
3. remove the runtime checking for mandatory arguments
4. use the proper Exception types (ValueError, etc.), not RuntimeError.
Refactoring would make this class redundant. However, this is too large a
refactoring to attempt ahead of the imminent first release so we use this
intermediate adapter.
"""
frequency: float | int | None
zoom_frequencies: list[float | int] | None
dec: Angle
weighting: Weighting
robustness: int | None
calculator_mode: CalculatorMode
array_configuration: MIDArrayConfiguration
taper: float | Quantity | None
telescope: Telescope
[docs] @staticmethod
def from_params(params) -> "BeamArguments":
"""
Construct a WeightingParameters from a dictionary of parameters.
"""
return BeamArguments(
frequency=params.get("frequency", None),
zoom_frequencies=params.get("zoom_frequencies", None),
dec=Angle(params["dec_str"] + " degrees"),
weighting=Weighting(params["weighting"]),
robustness=params.get("robustness", None),
array_configuration=MIDArrayConfiguration(params["array_configuration"]),
calculator_mode=CalculatorMode(params["calculator_mode"]),
taper=params.get("taper", 0.0),
telescope=Telescope.MID,
)
[docs]def get_weighting_response(params: dict) -> WeightingResponse:
"""
Using the parameters of the query return the appropriate values for
weighting correction factor, synthesized-beam-sensitivity factor and beam
shape.
Input arguments and external state are not modified by this function.
Raises:
- None
"""
# calculate the beam weighting for the standard 'main' frequency or zoom
# frequencies
beam_arguments = BeamArguments.from_params(params)
main_result = get_single_weighting_response(beam_arguments)
# BTN-1957: Add chunk support
# If the chunk_frequencies parameter is specified, the response should
# include the weighting response for each chunk frequency.
result = dict(**main_result, chunks=get_chunks_weighting_response(params))
return result
[docs]def get_chunks_weighting_response(
params: dict,
) -> list[SingleWeightingResponse]:
"""
Calculate the beam weighting response for every chunk frequency in the
given parameters dict.
Input arguments and external state are not modified by this function.
Raises:
- None
"""
# create a base BeamArguments object that will be modified to match the
# chunk frequency in scope before any calculation is performed
chunk_params = BeamArguments.from_params(params)
chunk_results = []
chunk_frequencies = params.get("chunk_frequencies_hz", [])
for chunk_frequency in chunk_frequencies:
chunk_params.frequency = chunk_frequency
chunk_params.zoom_frequencies = None
# BTN-1957 spec states that continuum mode should always be assumed in
# the lookup table regardless of chunk bandwidth
chunk_params.calculator_mode = CalculatorMode.CONTINUUM
chunk_result = get_single_weighting_response(chunk_params)
chunk_results.append(chunk_result)
return chunk_results
[docs]def get_single_weighting_response(
beam_args: BeamArguments,
) -> SingleWeightingResponse:
"""
Calculate the beam weighting from the given beam parameters.
Input arguments and external state are not modified by this function.
Raises:
- None
"""
beam = Beam(
frequency=beam_args.frequency,
zoom_frequencies=beam_args.zoom_frequencies,
dec=beam_args.dec.degree,
weighting=beam_args.weighting,
robustness=beam_args.robustness,
array_configuration=beam_args.array_configuration,
calculator_mode=beam_args.calculator_mode,
taper=beam_args.taper,
)
# Process the confusion noise, transforming anything labelled as an upper
# limit to have a value of 0 Jy and a label of 'value'
noise_values, noise_limit_types = beam.confusion_noise()
np_limit_types = numpy.array(noise_limit_types)
noise_values[np_limit_types == Limit.UPPER.value] = 0
np_limit_types[np_limit_types == Limit.UPPER.value] = Limit.VALUE.value
confusion_noise_response: ConfusionNoiseResponse = {
# prefer returning Python primitives over returning numpy ndarrays
"value": list(noise_values.value),
"limit_type": list(np_limit_types),
}
beam_size_response = [
{
"beam_maj_scaled": beam_size.beam_maj.value,
"beam_min_scaled": beam_size.beam_min.value,
"beam_pa": beam_size.beam_pa.value,
}
for beam_size in beam.beam_size()
]
return {
"weighting_factor": beam.weighting_factor(),
"sbs_conv_factor": beam.surface_brightness_conversion_factor().value,
"confusion_noise": confusion_noise_response,
"beam_size": beam_size_response,
}
[docs]def get_subarray_response():
"""
return the appropriate subarray objects
"""
return [
{
"name": subarray.name,
"label": subarray.label,
"n_ska": subarray.n_ska,
"n_meer": subarray.n_meer,
}
for subarray in subarray_storage.list()
]