Source code for ska_ost_senscalc.low.service

"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""

from typing import List, Optional, Tuple, TypedDict

from astropy.units import Quantity

from ska_ost_senscalc.common.model import CalculatorInputPSS
from ska_ost_senscalc.common.pss import convert_continuum_to_bf_sensitivity
from ska_ost_senscalc.common.service import get_subbands
from ska_ost_senscalc.common.spectropolarimetry import (
    SpectropolarimetryInput,
    SpectropolarimetryResults,
    get_spectropolarimetry_results,
)
from ska_ost_senscalc.low.bright_source_lookup import BrightSourceCatalog
from ska_ost_senscalc.low.calculator import calculate_sensitivity
from ska_ost_senscalc.low.model import CalculatorInput
from ska_ost_senscalc.low.validation import LOW_CONTINUUM_CHANNEL_WIDTH_KHZ
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope

subarray_storage = SubarrayStorage(Telescope.LOW)

FLUX_DENSITY_THRESHOLD_JY = 10.0


[docs] class SubbandResponse(TypedDict): subband_freq_centre: Quantity sensitivity: Quantity
[docs] class ContinuumSensitivityResponse(TypedDict): """ Typed dictionary constrained to match the OpenAPI schema for the response body of a single continuum sensitivity calculation. """ continuum_sensitivity: Quantity continuum_subband_sensitivities: Optional[List[SubbandResponse]] spectral_sensitivity: Quantity spectropolarimetry_results: SpectropolarimetryResults warning: Optional[str]
[docs] class SingleZoomSensitivityResponse(TypedDict): """ Typed dictionary constrained to match the OpenAPI schema for the response body of a single zoom sensitivity calculation. """ freq_centre: Quantity spectral_sensitivity: Quantity spectropolarimetry_results: SpectropolarimetryResults warning: Optional[str]
[docs] class PSSSensitivityResponse(TypedDict): """ Typed dictionary constrained to match the OpenAPI schema for the response body of a single PSS sensitivity calculation. """ # For pulsar search: folded_pulse_sensitivity: Quantity warning: Optional[str]
[docs] def convert_continuum_input_and_calculate( user_input: dict, ) -> ContinuumSensitivityResponse: """ :param user_input: A kwarg dict of the HTTP parameters sent by the user :return: a SensitivityResponse containing the calculated sensitivity and its units """ num_stations = _num_stations_from_input(user_input) continuum_calculator_input = CalculatorInput( freq_centre_mhz=user_input["freq_centre_mhz"], bandwidth_mhz=user_input["bandwidth_mhz"], num_stations=num_stations, pointing_centre=user_input["pointing_centre"], integration_time_h=user_input["integration_time_h"], elevation_limit=user_input["elevation_limit"], ) effective_resolution_mhz = (LOW_CONTINUUM_CHANNEL_WIDTH_KHZ / 1e3) * user_input.get( "spectral_averaging_factor", 1 ) spectral_calculator_input = CalculatorInput( freq_centre_mhz=user_input["freq_centre_mhz"], bandwidth_mhz=effective_resolution_mhz, num_stations=num_stations, pointing_centre=user_input["pointing_centre"], integration_time_h=user_input["integration_time_h"], elevation_limit=user_input["elevation_limit"], ) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(user_input["bandwidth_mhz"], "MHz"), frequency=Quantity(user_input["freq_centre_mhz"], "MHz"), effective_channel_width=Quantity(effective_resolution_mhz, "MHz"), ) continuum_result, warning = _get_calculation_value(continuum_calculator_input) # Warnings will be identical, so we don't need this one: spectral_result, _ = _get_calculation_value(spectral_calculator_input) subband_sensitivities = _get_subband_sensitivities(user_input, num_stations) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) return ContinuumSensitivityResponse( continuum_sensitivity=continuum_result, continuum_subband_sensitivities=subband_sensitivities, spectral_sensitivity=spectral_result, warning=warning, spectropolarimetry_results=spectropolarimetry_results, )
[docs] def convert_zoom_input_and_calculate( user_input: dict, ) -> [SingleZoomSensitivityResponse]: """ :param user_input: A kwarg dict of the HTTP parameters sent by the user :return: a dict containing the calculated sensitivity and its units """ num_stations = _num_stations_from_input(user_input) result = [] for freq_centre_mhz, spectral_resolution_hz, total_bandwidth_khz in zip( user_input["freq_centres_mhz"], user_input["spectral_resolutions_hz"], user_input["total_bandwidths_khz"], ): effective_resolution_hz = spectral_resolution_hz * user_input.get( "spectral_averaging_factor", 1 ) calculator_input = CalculatorInput( freq_centre_mhz=freq_centre_mhz, bandwidth_mhz=effective_resolution_hz * 1e-6, # Convert to MHz num_stations=num_stations, pointing_centre=user_input["pointing_centre"], integration_time_h=user_input["integration_time_h"], elevation_limit=user_input["elevation_limit"], ) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(total_bandwidth_khz, "kHz"), frequency=Quantity(freq_centre_mhz, "MHz"), effective_channel_width=Quantity(effective_resolution_hz, "Hz"), ) sensitivity, warning = _get_calculation_value(calculator_input) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) result.append( SingleZoomSensitivityResponse( freq_centre=Quantity(freq_centre_mhz, "MHz"), spectral_sensitivity=sensitivity, warning=warning, spectropolarimetry_results=spectropolarimetry_results, ) ) return result
[docs] def convert_pss_input_and_calculate(user_input: dict) -> PSSSensitivityResponse: """ :param user_input: A kwarg dict of the HTTP parameters sent by the user :return: a dict containing the calculated sensitivity and its units """ num_stations = _num_stations_from_input(user_input) calculator_input_pss = CalculatorInputPSS( freq_centre_mhz=user_input["freq_centre_mhz"], bandwidth_mhz=user_input["bandwidth_mhz"], chan_width=user_input["spectral_resolution_hz"], num_stations=num_stations, pointing_centre=user_input["pointing_centre"], integration_time_h=user_input["integration_time_h"], elevation_limit=user_input["elevation_limit"], dm=user_input["dm"], pulse_period=user_input["pulse_period"], intrinsic_pulse_width=user_input["intrinsic_pulse_width"], ) # First, estimate the corresponding continuum sensitivity continuum_input = CalculatorInput( freq_centre_mhz=calculator_input_pss.freq_centre_mhz, bandwidth_mhz=calculator_input_pss.chan_width * 1e-6, # Convert to MHz num_stations=calculator_input_pss.num_stations, pointing_centre=calculator_input_pss.pointing_centre, integration_time_h=calculator_input_pss.integration_time_h, elevation_limit=calculator_input_pss.elevation_limit, ) continuum_sensitivity, warning = _get_calculation_value(continuum_input) # Convert the continuum sensitivity to folded-pulse sensitivity folded_pulse_sensitivity = convert_continuum_to_bf_sensitivity( continuum_sensitivity, calculator_input_pss, ) return PSSSensitivityResponse( folded_pulse_sensitivity=folded_pulse_sensitivity, warning=warning )
[docs] def get_subarray_response(): """ return the appropriate subarray objects """ return [ { "name": subarray.name, "label": subarray.label, "n_stations": subarray.n_stations, } for subarray in subarray_storage.list() ]
def _get_calculation_value( calculator_input: CalculatorInput, ) -> Tuple[Quantity, Optional[str]]: result = calculate_sensitivity(calculator_input) sensitivity = Quantity(result.sensitivity, result.units) warning = _check_for_warning(calculator_input) return sensitivity, warning def _check_for_warning(calculator_input: CalculatorInput) -> Optional[str]: mwa_cat = BrightSourceCatalog(threshold_jy=FLUX_DENSITY_THRESHOLD_JY) if mwa_cat.check_for_bright_sources( calculator_input.pointing_centre, calculator_input.freq_centre_mhz ): return ( "The specified pointing contains at least one source brighter " + f"than {FLUX_DENSITY_THRESHOLD_JY} Jy. Your observation may be " + "dynamic range limited." ) return None def _num_stations_from_input(user_input: dict) -> int: """ If the user has given a subarray_configuration, extract the num_stations from that. Otherwise, use the value given by the user. Validation has checked that one and only on of these fields is present in the input. :param user_input: a dict of the parameters given by the user :return: the num_stations to use in the calculation """ if "subarray_configuration" in user_input: subarray = subarray_storage.load_by_name(user_input["subarray_configuration"]) return subarray.n_stations return user_input["num_stations"] def _get_subband_sensitivities( user_input: dict, num_stations: int ) -> List[SubbandResponse]: if user_input.get("n_subbands", 1) == 1: return [] subband_freq_centres_hz, subband_bandwidth = get_subbands( user_input["n_subbands"], user_input["freq_centre_mhz"], user_input["bandwidth_mhz"], ) return [ SubbandResponse( subband_freq_centre=Quantity(subband_frequency, "MHz"), sensitivity=_get_calculation_value( CalculatorInput( freq_centre_mhz=subband_frequency, bandwidth_mhz=subband_bandwidth, num_stations=num_stations, pointing_centre=user_input["pointing_centre"], integration_time_h=user_input["integration_time_h"], elevation_limit=user_input["elevation_limit"], ) )[0], ) for subband_frequency in subband_freq_centres_hz ]