Source code for ska_ost_senscalc.mid.service

"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
import copy
from inspect import signature

import astropy.units as u
from astropy.coordinates import Latitude, SkyCoord

from ska_ost_senscalc.common.model import Weighting, WeightingInput
from ska_ost_senscalc.common.service import (
    WeightingResponse,
    get_single_weighting_response,
    get_subbands,
)
from ska_ost_senscalc.common.spectropolarimetry import (
    SpectropolarimetryInput,
    get_spectropolarimetry_results,
)
from ska_ost_senscalc.mid.calculator import Calculator
from ska_ost_senscalc.mid.model import MidSpectralMode
from ska_ost_senscalc.mid.validation import MID_CONTINUUM_CHANNEL_WIDTH_KHZ
from ska_ost_senscalc.subarray import MIDArrayConfiguration, SubarrayStorage
from ska_ost_senscalc.utilities import Telescope

subarray_storage = SubarrayStorage(Telescope.MID)


# Get the keywords of the Calculator constructor
KWARGS_CALCULATOR = [
    p.name
    for p in signature(Calculator).parameters.values()
    if p.kind == p.KEYWORD_ONLY
]


[docs] def get_calculate_response(params): """ Using the parameters of the query return the appropriate values for the calculation. """ # Parse the target target = SkyCoord( params["ra_str"], params["dec_str"], frame="icrs", unit=(u.hourangle, u.deg), ) params["target"] = target # Keep only the params in the list of constructor inputs constructor_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR} # Main results result = {} calculator = Calculator(**constructor_params) line_calculator = None if params.get("resolution"): line_params = constructor_params.copy() line_params["bandwidth"] = params["resolution"] line_calculator = Calculator(**line_params) if params.get("integration_time"): sensitivity = ( calculator.calculate_sensitivity(params["integration_time"]).to(u.Jy).value ) result.update( { "result": { "state": calculator.state(), "sensitivity": sensitivity, } } ) if line_calculator is not None: # Calculate line sensitivity using resolution for bandwidth line_sensitivity = ( line_calculator.calculate_sensitivity(params["integration_time"]) .to(u.Jy) .value ) result["result"]["line_sensitivity"] = line_sensitivity if params.get("sensitivity"): integration_time = ( calculator.calculate_integration_time(params["sensitivity"]).to(u.s).value ) result.update( { "result": { "state": calculator.state(), "integration_time": integration_time, } } ) if line_calculator is not None: # Calculate line integration time using resolution for bandwidth line_integration_time = ( line_calculator.calculate_integration_time(params["sensitivity"]) .to(u.s) .value ) result["result"]["line_integration_time"] = line_integration_time # Subbands - if subbands is 1 then we just return the main sensitivity calculation as the subband is the whole bandwidth if params.get("n_subbands", 1) != 1: subband_results = [] subband_frequencies, subband_bandwidth = get_subbands( params["n_subbands"], params["frequency"], params["bandwidth"] ) if integration_time := params.get("integration_time"): for subband_frequency in subband_frequencies: subband_params = constructor_params.copy() subband_params["frequency"] = subband_frequency subband_params["bandwidth"] = subband_bandwidth subband_calculator = Calculator(**subband_params) sensitivity = ( subband_calculator.calculate_sensitivity(integration_time) .to(u.Jy) .value ) subband_results.append( { "subband_frequency": u.Quantity(subband_frequency, "Hz"), "state": subband_calculator.state(), "sensitivity": sensitivity, } ) if params.get("sensitivity"): for subband_frequency, subband_sensitivity in zip( subband_frequencies, params.get("subband_sensitivities") ): subband_params = constructor_params.copy() subband_params["frequency"] = subband_frequency subband_params["bandwidth"] = subband_bandwidth subband_calculator = Calculator(**subband_params) integration_time = ( subband_calculator.calculate_integration_time(subband_sensitivity) .to(u.s) .value ) subband_results.append( { "subband_frequency": u.Quantity(subband_frequency, "Hz"), "state": subband_calculator.state(), "integration_time": integration_time, } ) result.update({"subbands": subband_results}) # Zooms if params.get("zoom_frequencies"): zoom_results = [] if params.get("integration_time"): for zoom_frequency, zoom_resolution in zip( params["zoom_frequencies"], params["zoom_resolutions"] ): zoom_params = constructor_params.copy() zoom_params["frequency"] = zoom_frequency zoom_params["bandwidth"] = zoom_resolution zoom_calculator = Calculator(**zoom_params) sensitivity = ( zoom_calculator.calculate_sensitivity(params["integration_time"]) .to(u.Jy) .value ) zoom_results.append( { "state": zoom_calculator.state(), "sensitivity": sensitivity, } ) if params.get("zoom_sensitivities"): for zoom_frequency, zoom_resolution, zoom_sensitivity in zip( params["zoom_frequencies"], params["zoom_resolutions"], params["zoom_sensitivities"], ): zoom_params = constructor_params.copy() zoom_params["frequency"] = zoom_frequency zoom_params["bandwidth"] = zoom_resolution zoom_calculator = Calculator(**zoom_params) integration_time = ( zoom_calculator.calculate_integration_time(zoom_sensitivity) .to(u.s) .value ) zoom_results.append( { "state": zoom_calculator.state(), "integration_time": integration_time, } ) result.update({"zooms": zoom_results}) # BTN-2282 will remove this need for this as zoom and continuum will be in different functions effective_channel_width_hz = ( params["zoom_resolutions"][0] if params.get("zoom_resolutions") else params.get("resolution", MID_CONTINUUM_CHANNEL_WIDTH_KHZ * 1e3) ) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=u.Quantity(params["bandwidth"], "Hz"), frequency=u.Quantity(params["frequency"], "Hz"), effective_channel_width=u.Quantity(effective_channel_width_hz, "Hz"), ) result["result"]["spectropolarimetry_results"] = get_spectropolarimetry_results( spectropolarimetry_input ) return result
[docs] def get_weighting_response(user_input: dict) -> WeightingResponse: """ Using the parameters of the query return the appropriate values for weighting correction factor, synthesized-beam-sensitivity factor and beam shape. Raises: - None """ if user_input["calculator_mode"] == MidSpectralMode.LINE.value: # use zoom_frequencies instead if in Zoom mode frequency = user_input.get("zoom_frequencies") * u.Hz else: frequency = [user_input.get("frequency") * u.Hz] params = WeightingInput( freq_centre=frequency, dec=Latitude(user_input["dec_str"] + " degrees"), weighting_mode=Weighting(user_input["weighting"]), robustness=user_input.get("robustness", None), subarray_configuration=MIDArrayConfiguration(user_input["array_configuration"]), calc_mode=MidSpectralMode(user_input["calculator_mode"]), taper=user_input.get("taper", 0.0) * u.arcsec, telescope=Telescope.MID, ) result = dict(**get_single_weighting_response(params)) # BTN-1957: Add subband support # If the subband_frequencies_hz parameter is specified, the response should # include the weighting response for each subband frequency. subbands = [] if user_input.get("subband_frequencies_hz"): for subband_frequency in user_input.get("subband_frequencies_hz"): subband_params = copy.deepcopy(params) subband_params.freq_centre = [subband_frequency * u.Hz] # BTN-1957 spec states that continuum mode should always be assumed in # the lookup table regardless of subband bandwidth subband_params.calc_mode = MidSpectralMode.CONTINUUM subbands.append( { "subbband_frequency": u.Quantity(subband_frequency, "Hz"), "weighting": get_single_weighting_response(subband_params), } ) return dict(**result, subbands=subbands)
[docs] def get_subarray_response(): """ return the appropriate subarray objects """ return [ { "name": subarray.name, "label": subarray.label, "n_ska": subarray.n_ska, "n_meer": subarray.n_meer, } for subarray in subarray_storage.list() ]