Source code for ska_ost_senscalc.mid.service

"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""

from inspect import signature
from typing import Optional, TypedDict

import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.units import Quantity

from ska_ost_senscalc.common.service import get_subbands
from ska_ost_senscalc.common.spectropolarimetry import (
    SpectropolarimetryInput,
    SpectropolarimetryResults,
    get_spectropolarimetry_results,
)
from ska_ost_senscalc.mid.calculator import Calculator
from ska_ost_senscalc.mid.validation import MID_CONTINUUM_CHANNEL_WIDTH_KHZ
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope

subarray_storage = SubarrayStorage(Telescope.MID)


[docs] class SingleSubbandResponse(TypedDict): subband_freq_centre: Quantity sensitivity: Optional[Quantity] integration_time: Optional[Quantity]
[docs] class ContinuumSensitivityResponse(TypedDict): """ Typed dictionary constrained to match the OpenAPI schema for the response body of a single continuum sensitivity calculation. """ continuum_sensitivity: Optional[Quantity] continuum_integration_time: Optional[Quantity] spectral_sensitivity: Optional[Quantity] spectral_integration_time: Optional[Quantity] continuum_subband_sensitivities: Optional[list[SingleSubbandResponse]] continuum_subband_integration_times: Optional[list[SingleSubbandResponse]] spectropolarimetry_results: SpectropolarimetryResults
[docs] class SingleLineSensitivityResponse(TypedDict): """ Typed dictionary constrained to match the OpenAPI schema for the response body of a single line sensitivity calculation. """ freq_centre: Quantity spectral_sensitivity: Optional[Quantity] spectral_integration_time: Optional[Quantity] spectropolarimetry_results: SpectropolarimetryResults
# Get the keywords of the Calculator constructor KWARGS_CALCULATOR = [ p.name for p in signature(Calculator).parameters.values() if p.kind == p.KEYWORD_ONLY ]
[docs] def get_continuum_calculate_response(params: dict) -> ContinuumSensitivityResponse: """ Extract the params from the request, convert them into the relevant calculator inputs, perform the calculations and collect the results into the response body. """ # Parse the target target = SkyCoord( params["pointing_centre"], frame="icrs", unit=(u.hourangle, u.deg), ) params["target"] = target # Keep only the params in the list of constructor inputs constructor_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR} # Main results result = {} # Create the calculator for the main continuum result continuum_calculator = Calculator(**constructor_params) # Create the calculator for the spectral results, which are returned in the continuum response body line_params = constructor_params.copy() effective_resolution_hz = ( MID_CONTINUUM_CHANNEL_WIDTH_KHZ * 1e3 * line_params.get("spectral_averaging_factor", 1) ) line_params["bandwidth_hz"] = effective_resolution_hz line_calculator = Calculator(**line_params) if params.get("integration_time_s"): sensitivity = continuum_calculator.calculate_sensitivity( params["integration_time_s"] ).to(u.Jy) result.update({"continuum_sensitivity": sensitivity}) # Calculate line sensitivity using effective resolution for bandwidth line_sensitivity = line_calculator.calculate_sensitivity( params["integration_time_s"] ).to(u.Jy) result.update({"spectral_sensitivity": line_sensitivity}) if params.get("sensitivity_jy"): integration_time = continuum_calculator.calculate_integration_time( params["sensitivity_jy"] ).to(u.s) result.update({"continuum_integration_time": integration_time}) # Calculate line integration time using resolution for bandwidth line_integration_time = line_calculator.calculate_integration_time( params["sensitivity_jy"] ).to(u.s) result.update({"spectral_integration_time": line_integration_time}) # Subbands - if subbands is 1 then we just return the main sensitivity calculation as the subband is the whole bandwidth if params.get("n_subbands", 1) != 1: subband_results = [] subband_freq_centres_hz, subband_bandwidth = get_subbands( params["n_subbands"], params["freq_centre_hz"], params["bandwidth_hz"] ) if integration_time_s := params.get("integration_time_s"): for subband_freq_centre_hz in subband_freq_centres_hz: # Create the calculator for each subband result subband_params = constructor_params.copy() subband_params["freq_centre_hz"] = subband_freq_centre_hz subband_params["bandwidth_hz"] = subband_bandwidth subband_calculator = Calculator(**subband_params) sensitivity = subband_calculator.calculate_sensitivity( integration_time_s ).to(u.Jy) subband_results.append( { "subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"), "sensitivity": sensitivity, } ) result.update({"continuum_subband_sensitivities": subband_results}) if params.get("sensitivity_jy"): for subband_freq_centre_hz, subband_sensitivity_jy in zip( subband_freq_centres_hz, params.get("subband_sensitivities_jy") ): # Create the calculator for each subband result subband_params = constructor_params.copy() subband_params["freq_centre_hz"] = subband_freq_centre_hz subband_params["bandwidth_hz"] = subband_bandwidth subband_calculator = Calculator(**subband_params) integration_time = subband_calculator.calculate_integration_time( subband_sensitivity_jy ).to(u.s) subband_results.append( { "subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"), "integration_time": integration_time, } ) result.update({"continuum_subband_integration_times": subband_results}) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=u.Quantity(params["bandwidth_hz"], "Hz"), frequency=u.Quantity(params["freq_centre_hz"], "Hz"), effective_channel_width=u.Quantity(effective_resolution_hz, "Hz"), ) result.update( { "spectropolarimetry_results": get_spectropolarimetry_results( spectropolarimetry_input ) } ) return ContinuumSensitivityResponse(**result)
[docs] def get_line_calculate_response( params, ) -> list[SingleLineSensitivityResponse]: """ Extract the params from the request, convert them into the relevant calculator inputs, perform the calculations and collect the results into the response body. """ # Parse the target target = SkyCoord( params["pointing_centre"], frame="icrs", unit=(u.hourangle, u.deg), ) params["target"] = target # Keep only the params in the list of constructor inputs constructor_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR} zoom_results = [] if params.get("integration_time_s"): for zoom_freq_centre_hz, zoom_spectral_resolution_hz, total_bandwidth_hz in zip( params["freq_centres_hz"], params["spectral_resolutions_hz"], params["total_bandwidths_hz"], ): zoom_params = constructor_params.copy() zoom_params["freq_centre_hz"] = zoom_freq_centre_hz zoom_params["bandwidth_hz"] = zoom_spectral_resolution_hz zoom_calculator = Calculator(**zoom_params) sensitivity = zoom_calculator.calculate_sensitivity( params["integration_time_s"] ).to(u.Jy) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(total_bandwidth_hz, "Hz"), frequency=Quantity(zoom_freq_centre_hz, "Hz"), effective_channel_width=Quantity(zoom_spectral_resolution_hz, "Hz"), ) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) zoom_results.append( SingleLineSensitivityResponse( freq_centre=zoom_freq_centre_hz * u.Hz, spectral_sensitivity=sensitivity, spectropolarimetry_results=spectropolarimetry_results, ) ) if params.get("sensitivities_jy"): for ( zoom_freq_centre_hz, zoom_spectral_resolution_hz, zoom_sensitivity_jy, total_bandwidth_hz, ) in zip( params["freq_centres_hz"], params["spectral_resolutions_hz"], params["sensitivities_jy"], params["total_bandwidths_hz"], ): zoom_params = constructor_params.copy() zoom_params["freq_centre_hz"] = zoom_freq_centre_hz zoom_params["bandwidth_hz"] = zoom_spectral_resolution_hz zoom_calculator = Calculator(**zoom_params) integration_time = zoom_calculator.calculate_integration_time( zoom_sensitivity_jy ).to(u.s) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(total_bandwidth_hz, "Hz"), frequency=Quantity(zoom_freq_centre_hz, "Hz"), effective_channel_width=Quantity(zoom_spectral_resolution_hz, "Hz"), ) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) zoom_results.append( SingleLineSensitivityResponse( freq_centre=zoom_freq_centre_hz * u.Hz, spectral_integration_time=integration_time, spectropolarimetry_results=spectropolarimetry_results, ) ) return zoom_results
[docs] def get_subarray_response(): """ return the appropriate subarray objects """ return [ { "name": subarray.name, "label": subarray.label, "n_ska": subarray.n_ska, "n_meer": subarray.n_meer, } for subarray in subarray_storage.list() ]