Source code for ska_ost_senscalc.mid.service

"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
import dataclasses
from inspect import signature
from typing import TypedDict

import astropy.units as u
import numpy
from astropy.coordinates import Angle, SkyCoord
from astropy.units import Quantity

from ska_ost_senscalc.mid.calculator import Calculator
from ska_ost_senscalc.mid_utilities import Beam, CalculatorMode, Limit, Weighting
from ska_ost_senscalc.subarray import MIDArrayConfiguration, SubarrayStorage
from ska_ost_senscalc.utilities import Telescope

subarray_storage = SubarrayStorage(Telescope.MID)

# TypedDicts to constrain response dicts to match the OpenAPI spec -----------


[docs]class ConfusionNoiseResponse(TypedDict): """ ConfusionNoiseResponse is a typed dictionary constrained to match the schema of a confusion noise JSON object, as contained in the parent JSON result of a weighting endpoint query. """ value: list[float | int] limit_type: list[str]
[docs]class BeamSizeResponse(TypedDict): """ BeamSizeResponse is a typed dictionary constrained to match the schema of the JSON object outlining the synthesized beam size, as contained in the parent JSON result of a weighting endpoint query. """ beam_maj_scaled: float beam_min_scaled: float beam_pa: float
[docs]class SingleWeightingResponse(TypedDict): """ SingleWeightingResponse is a typed dictionary constrained to match the schema of a single weighting calculation, as performed for the main continuum or zoom weighting calculation and for each chunk frequency. Child SingleWeightingResponse JSON object for each of these calculations are contained in the parent JSON result """ weighting_factor: float | None sbs_conv_factor: float confusion_noise: ConfusionNoiseResponse beam_size: list[BeamSizeResponse]
[docs]class WeightingResponse(SingleWeightingResponse): """ WeightingResponse is a typed dictionary constrained to match the schema of the main result of a weighting endpoint query. """ chunks: list[SingleWeightingResponse]
# end TypedDict definitions --------------------------------------------------
[docs]def get_chunks(n_chunks, obs_freq, bandwidth): """ Function to get the centres (and common width) of the N chunks of bandwidth :param n_chunks: Number of chunks :type n_chunks: int :param obs_freq: Frequency :type obs_freq: float :param bandwidth: Bandwidth :type bandwidth: float :return: A list of frequency centres for the chunks and the chunk width :rtype: Tuple(List, float) """ left = obs_freq - (0.5 * bandwidth) chunk_width = bandwidth / n_chunks return [left + ((i + 0.5) * chunk_width) for i in range(n_chunks)], chunk_width
# Get the keywords of the Calculator constructor KWARGS_CALCULATOR = [ p.name for p in signature(Calculator).parameters.values() if p.kind == p.KEYWORD_ONLY ]
[docs]def get_calculate_response(params): """ Using the parameters of the query return the appropriate values for the calculation. """ # Parse the target target = SkyCoord( params["ra_str"], params["dec_str"], frame="icrs", unit=(u.hourangle, u.deg), ) params["target"] = target if params.get("array_configuration"): params["array_configuration"] = MIDArrayConfiguration( params["array_configuration"] ) # Keep only the params in the list of constructor inputs constructor_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR} # Main results result = {} calculator = Calculator(**constructor_params) line_calculator = None if params.get("resolution"): line_params = constructor_params.copy() line_params["bandwidth"] = params["resolution"] line_calculator = Calculator(**line_params) if params.get("integration_time"): sensitivity = ( calculator.calculate_sensitivity(params["integration_time"]).to(u.Jy).value ) result.update( { "result": { "state": calculator.state(), "sensitivity": sensitivity, } } ) if line_calculator is not None: # Calculate line sensitivity using resolution for bandwidth line_sensitivity = ( line_calculator.calculate_sensitivity(params["integration_time"]) .to(u.Jy) .value ) result["result"]["line_sensitivity"] = line_sensitivity if params.get("sensitivity"): integration_time = ( calculator.calculate_integration_time(params["sensitivity"]).to(u.s).value ) result.update( { "result": { "state": calculator.state(), "integration_time": integration_time, } } ) if line_calculator is not None: # Calculate line integration time using resolution for bandwidth line_integration_time = ( line_calculator.calculate_integration_time(params["sensitivity"]) .to(u.s) .value ) result["result"]["line_integration_time"] = line_integration_time # Chunks if params.get("n_chunks"): chunk_results = [] chunk_frequencies, chunk_bandwidth = get_chunks( params["n_chunks"], params["frequency"], params["bandwidth"] ) if integration_time := params.get("integration_time"): for chunk_frequency in chunk_frequencies: chunk_params = constructor_params.copy() chunk_params["frequency"] = chunk_frequency chunk_params["bandwidth"] = chunk_bandwidth chunk_calculator = Calculator(**chunk_params) sensitivity = ( chunk_calculator.calculate_sensitivity(integration_time) .to(u.Jy) .value ) chunk_results.append( { "state": chunk_calculator.state(), "sensitivity": sensitivity, } ) if params.get("sensitivity"): for chunk_frequency, chunk_sensitivity in zip( chunk_frequencies, params.get("chunk_sensitivities") ): chunk_params = constructor_params.copy() chunk_params["frequency"] = chunk_frequency chunk_params["bandwidth"] = chunk_bandwidth chunk_calculator = Calculator(**chunk_params) integration_time = ( chunk_calculator.calculate_integration_time(chunk_sensitivity) .to(u.s) .value ) chunk_results.append( { "state": chunk_calculator.state(), "integration_time": integration_time, } ) result.update({"chunks": chunk_results}) # Zooms if params.get("zoom_frequencies"): zoom_results = [] if params.get("integration_time"): for zoom_frequency, zoom_resolution in zip( params["zoom_frequencies"], params["zoom_resolutions"] ): zoom_params = constructor_params.copy() zoom_params["frequency"] = zoom_frequency zoom_params["bandwidth"] = zoom_resolution zoom_calculator = Calculator(**zoom_params) sensitivity = ( zoom_calculator.calculate_sensitivity(params["integration_time"]) .to(u.Jy) .value ) zoom_results.append( { "state": zoom_calculator.state(), "sensitivity": sensitivity, } ) if params.get("zoom_sensitivities"): for zoom_frequency, zoom_resolution, zoom_sensitivity in zip( params["zoom_frequencies"], params["zoom_resolutions"], params["zoom_sensitivities"], ): zoom_params = constructor_params.copy() zoom_params["frequency"] = zoom_frequency zoom_params["bandwidth"] = zoom_resolution zoom_calculator = Calculator(**zoom_params) integration_time = ( zoom_calculator.calculate_integration_time(zoom_sensitivity) .to(u.s) .value ) zoom_results.append( { "state": zoom_calculator.state(), "integration_time": integration_time, } ) result.update({"zooms": zoom_results}) return result
[docs]@dataclasses.dataclass class BeamArguments: """ BeamArguments is a dataclass holding arguments for a synthesised beam calculation. This class is essentially a proxy for a typed Beam constructor signature. It was introduced as a temporary measure to make the code easier to read and reason about. It should be removed when Beam is refactored to: 1. have a properly typed constructor signature 2. make mandatory arguments mandatory in the constructor signature, not optional 3. remove the runtime checking for mandatory arguments 4. use the proper Exception types (ValueError, etc.), not RuntimeError. Refactoring would make this class redundant. However, this is too large a refactoring to attempt ahead of the imminent first release so we use this intermediate adapter. """ frequency: float | int | None zoom_frequencies: list[float | int] | None dec: Angle weighting: Weighting robustness: int | None calculator_mode: CalculatorMode array_configuration: MIDArrayConfiguration taper: float | Quantity | None telescope: Telescope
[docs] @staticmethod def from_params(params) -> "BeamArguments": """ Construct a WeightingParameters from a dictionary of parameters. """ return BeamArguments( frequency=params.get("frequency", None), zoom_frequencies=params.get("zoom_frequencies", None), dec=Angle(params["dec_str"] + " degrees"), weighting=Weighting(params["weighting"]), robustness=params.get("robustness", None), array_configuration=MIDArrayConfiguration(params["array_configuration"]), calculator_mode=CalculatorMode(params["calculator_mode"]), taper=params.get("taper", 0.0), telescope=Telescope.MID, )
[docs]def get_weighting_response(params: dict) -> WeightingResponse: """ Using the parameters of the query return the appropriate values for weighting correction factor, synthesized-beam-sensitivity factor and beam shape. Input arguments and external state are not modified by this function. Raises: - None """ # calculate the beam weighting for the standard 'main' frequency or zoom # frequencies beam_arguments = BeamArguments.from_params(params) main_result = get_single_weighting_response(beam_arguments) # BTN-1957: Add chunk support # If the chunk_frequencies parameter is specified, the response should # include the weighting response for each chunk frequency. result = dict(**main_result, chunks=get_chunks_weighting_response(params)) return result
[docs]def get_chunks_weighting_response( params: dict, ) -> list[SingleWeightingResponse]: """ Calculate the beam weighting response for every chunk frequency in the given parameters dict. Input arguments and external state are not modified by this function. Raises: - None """ # create a base BeamArguments object that will be modified to match the # chunk frequency in scope before any calculation is performed chunk_params = BeamArguments.from_params(params) chunk_results = [] chunk_frequencies = params.get("chunk_frequencies_hz", []) for chunk_frequency in chunk_frequencies: chunk_params.frequency = chunk_frequency chunk_params.zoom_frequencies = None # BTN-1957 spec states that continuum mode should always be assumed in # the lookup table regardless of chunk bandwidth chunk_params.calculator_mode = CalculatorMode.CONTINUUM chunk_result = get_single_weighting_response(chunk_params) chunk_results.append(chunk_result) return chunk_results
[docs]def get_single_weighting_response( beam_args: BeamArguments, ) -> SingleWeightingResponse: """ Calculate the beam weighting from the given beam parameters. Input arguments and external state are not modified by this function. Raises: - None """ beam = Beam( frequency=beam_args.frequency, zoom_frequencies=beam_args.zoom_frequencies, dec=beam_args.dec.degree, weighting=beam_args.weighting, robustness=beam_args.robustness, array_configuration=beam_args.array_configuration, calculator_mode=beam_args.calculator_mode, taper=beam_args.taper, ) # Process the confusion noise, transforming anything labelled as an upper # limit to have a value of 0 Jy and a label of 'value' noise_values, noise_limit_types = beam.confusion_noise() np_limit_types = numpy.array(noise_limit_types) noise_values[np_limit_types == Limit.UPPER.value] = 0 np_limit_types[np_limit_types == Limit.UPPER.value] = Limit.VALUE.value confusion_noise_response: ConfusionNoiseResponse = { # prefer returning Python primitives over returning numpy ndarrays "value": list(noise_values.value), "limit_type": list(np_limit_types), } beam_size_response = [ { "beam_maj_scaled": beam_size.beam_maj.value, "beam_min_scaled": beam_size.beam_min.value, "beam_pa": beam_size.beam_pa.value, } for beam_size in beam.beam_size() ] return { "weighting_factor": beam.weighting_factor(), "sbs_conv_factor": beam.surface_brightness_conversion_factor().value, "confusion_noise": confusion_noise_response, "beam_size": beam_size_response, }
[docs]def get_subarray_response(): """ return the appropriate subarray objects """ return [ { "name": subarray.name, "label": subarray.label, "n_ska": subarray.n_ska, "n_meer": subarray.n_meer, } for subarray in subarray_storage.list() ]