Source code for ska_ost_senscalc.low.service

"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""

from dataclasses import asdict
from typing import List, Optional

from astropy import units as u
from astropy.units import Quantity

from ska_ost_senscalc.common.model import (
    CalculatorInputPSS,
    ContinuumCalculatorAndWeightingInput,
    ContinuumSensitivitiesTransformationsInput,
    ContinuumSensitivityResponse,
    ContinuumSensitivityResults,
    ContinuumWeightingResponse,
    PSSSensitivityResponse,
    PulsarMode,
    PulsarSamplingTime,
    SensitivitiesTransformationsWithoutWeightingInput,
    SensitivityTransformationsSubbandsInput,
    SingleZoomSensitivityResponse,
    SingleZoomWeightingResponse,
    SubbandResponse,
    WeightingContinuumSpectralResults,
    WeightingSpectralMode,
    ZoomCalculatorAndWeightingInput,
    ZoomSensitivitiesTransformationsInput,
    ZoomSensitivityResponse,
    ZoomSensitivityTransformedResults,
)
from ska_ost_senscalc.common.pss import convert_continuum_to_bf_sensitivity
from ska_ost_senscalc.common.service import (
    calculate_continuum_sensitivities_subbands_results,
    calculate_continuum_sensitivity,
    calculate_zoom_results,
    calculate_zoom_results_without_weighting,
    get_continuum_weighting_response,
    get_subbands,
    get_weighted_sensitivity,
    get_zoom_weighting_response,
    is_beam_non_gaussian,
)
from ska_ost_senscalc.common.spectropolarimetry import (
    SpectropolarimetryInput,
    get_spectropolarimetry_results,
)
from ska_ost_senscalc.low.bright_source_lookup import BrightSourceCatalog
from ska_ost_senscalc.low.calculator import calculate_sensitivity
from ska_ost_senscalc.low.model import CalculatorInput
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope

subarray_storage = SubarrayStorage(Telescope.LOW)

FLUX_DENSITY_THRESHOLD_JY = 10.0
LOW_CONTINUUM_CHANNEL_WIDTH_KHZ = 24 * 781.25 / (4096 * 27 / 32)


[docs] def convert_continuum_input_and_calculate( calc_weight_input: ContinuumCalculatorAndWeightingInput, ) -> ContinuumSensitivityResponse: """ :param user_input: A ContinuumCalculatorAndWeightingInput object containing the parameters sent by the user having been validated and converted for the calculator and weighting calculations. :return: a SensitivityResponse with the calculated sensitivity and its units along with weighting results and the transformed results containing the results combining calculate and weighting results in a format easily usable by the UI.""" continuum_calculator_input = CalculatorInput( freq_centre_mhz=calc_weight_input.freq_centre.to(u.MHz).value, bandwidth_mhz=calc_weight_input.bandwidth_mhz, num_stations=calc_weight_input.num_stations, pointing_centre=calc_weight_input.pointing_centre, integration_time_h=calc_weight_input.integration_time_h, elevation_limit=calc_weight_input.elevation_limit, ) effective_resolution_mhz = ( LOW_CONTINUUM_CHANNEL_WIDTH_KHZ / 1e3 ) * calc_weight_input.spectral_averaging_factor spectral_calculator_input = CalculatorInput( freq_centre_mhz=calc_weight_input.freq_centre.to(u.MHz).value, bandwidth_mhz=effective_resolution_mhz, num_stations=calc_weight_input.num_stations, pointing_centre=calc_weight_input.pointing_centre, integration_time_h=calc_weight_input.integration_time_h, elevation_limit=calc_weight_input.elevation_limit, ) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(calc_weight_input.bandwidth_mhz, "MHz"), frequency=calc_weight_input.freq_centre.to(u.MHz), effective_channel_width=Quantity(effective_resolution_mhz, "MHz"), ) warnings = [] continuum_result = _get_calculation_value(continuum_calculator_input, warnings) spectral_result = _get_calculation_value(spectral_calculator_input, warnings) # Warnings will be identical, so remove duplicate: warnings = list(set(warnings)) subband_sensitivities = _get_subband_sensitivities( calc_weight_input, calc_weight_input.num_stations ) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) # we only get weighting results if not a custom subarray weighted_continuum = None weighted_spectral = None if calc_weight_input.subarray_configuration is not None: # get continuum weighting results weighted_continuum: ContinuumWeightingResponse = ( get_continuum_weighting_response(calc_weight_input) ) # switch to spectral mode calc_weight_input.spectral_mode = WeightingSpectralMode.LINE # get spectral weighting results weighted_spectral: ContinuumWeightingResponse = ( get_continuum_weighting_response(calc_weight_input) ) transformed_warnings = [] continuum_sensitivities_transformations_input = ( ContinuumSensitivitiesTransformationsInput( continuum_sensitivity=continuum_result, spectral_sensitivity=spectral_result, continuum_weighting_factor=weighted_continuum.weighting_factor if weighted_continuum else None, spectral_weighting_factor=weighted_spectral.weighting_factor if weighted_spectral else None, continuum_sbs_conv_factor=weighted_continuum.sbs_conv_factor if weighted_continuum else None, spectral_sbs_conv_factor=weighted_spectral.sbs_conv_factor if weighted_spectral else None, continuum_conf_noise=weighted_continuum.confusion_noise.value if weighted_continuum else None, spectral_conf_noise=weighted_spectral.confusion_noise.value if weighted_spectral else None, continuum_beam_min=weighted_continuum.beam_size.beam_min_scaled if weighted_continuum else None, continuum_beam_maj=weighted_continuum.beam_size.beam_maj_scaled if weighted_continuum else None, spectral_beam_min=weighted_spectral.beam_size.beam_min_scaled if weighted_spectral else None, spectral_beam_maj=weighted_spectral.beam_size.beam_maj_scaled if weighted_spectral else None, ) ) transformed_result = asdict( calculate_continuum_sensitivity( continuum_sensitivities_transformations_input, transformed_warnings, calc_weight_input.weighting_mode, calc_weight_input.robustness, is_custom=calc_weight_input.subarray_configuration is None, ) ) subbands_transformed_result = {} if subband_sensitivities: # get subbands transformed result weighted_subbands_input = SensitivityTransformationsSubbandsInput( calculate_subbands=subband_sensitivities, weighting_subbands=weighted_continuum.subbands if weighted_continuum and weighted_continuum.subbands else None, ) subbands_transformed_result = asdict( ( calculate_continuum_sensitivities_subbands_results( weighted_subbands_input, non_gaussian=is_beam_non_gaussian( calc_weight_input.weighting_mode, calc_weight_input.robustness, ), is_custom=calc_weight_input.subarray_configuration is None, warnings=[], ) ) ) transformed_result["subbands"] = subbands_transformed_result weighting = asdict( WeightingContinuumSpectralResults( continuum_weighting=weighted_continuum, spectral_weighting=weighted_spectral, ) ) response = ContinuumSensitivityResponse( calculate=ContinuumSensitivityResults( continuum_sensitivity=continuum_result, continuum_subband_sensitivities=subband_sensitivities, spectral_sensitivity=spectral_result, warnings=warnings, spectropolarimetry_results=spectropolarimetry_results, ), weighting=weighting, transformed_result=transformed_result, ) return response
[docs] def convert_zoom_input_and_calculate( params: ZoomCalculatorAndWeightingInput, ) -> list[ZoomSensitivityResponse]: """ :param user_input: A kwarg dict of the HTTP parameters sent by the user :return: a dict containing the calculated sensitivity and its units """ calculations = [] for freq_centre, spectral_resolution_hz, total_bandwidth_khz in zip( params.freq_centres, params.spectral_resolutions_hz, params.total_bandwidths_khz, ): effective_resolution_hz = ( spectral_resolution_hz * params.spectral_averaging_factor ) calculator_input = CalculatorInput( freq_centre_mhz=freq_centre.to(u.MHz).value, bandwidth_mhz=effective_resolution_hz * 1e-6, # Convert to MHz num_stations=params.num_stations, pointing_centre=params.pointing_centre, integration_time_h=params.integration_time_h, elevation_limit=params.elevation_limit, ) spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(total_bandwidth_khz, "kHz"), frequency=freq_centre.to(u.MHz), effective_channel_width=Quantity(effective_resolution_hz, "Hz"), ) warnings = [] sensitivity = _get_calculation_value(calculator_input, warnings) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) calculations.append( SingleZoomSensitivityResponse( freq_centre=freq_centre.to(u.MHz), spectral_sensitivity=sensitivity, warnings=warnings, spectropolarimetry_results=spectropolarimetry_results, ) ) transformed_warnings = [] if params.subarray_configuration is None: weightings = None transformed_result = _zoom_custom(calculations, transformed_warnings) else: weightings = get_zoom_weighting_response(params) if is_beam_non_gaussian(params.weighting_mode, params.robustness): transformed_result = _zoom_non_gaussian( calculations, weightings, transformed_warnings ) else: transformed_result = _zoom_gaussian( calculations, weightings, transformed_warnings ) response = ZoomSensitivityResponse( calculate=calculations, weighting=weightings, transformed_result=transformed_result, ) return asdict(response)
def _zoom_non_gaussian( calculations: list[SingleZoomSensitivityResponse], weightings: list[SingleZoomWeightingResponse], transformed_warnings: list[dict], ) -> dict: transformed_result = [] for calculation, weighting in zip(calculations, weightings): calculation: SingleZoomSensitivityResponse weighting: SingleZoomWeightingResponse weighted_spectral_sensitivity = get_weighted_sensitivity( calculation.spectral_sensitivity, weighting.weighting_factor, ) result = ZoomSensitivityTransformedResults( weighted_spectral_sensitivity=weighted_spectral_sensitivity, warnings=transformed_warnings, ) transformed_result.append(result) return transformed_result def _zoom_gaussian( calculations: list[SingleZoomSensitivityResponse], weightings: list[SingleZoomWeightingResponse], transformed_warnings: list[dict], ) -> dict: transformed_result = [] for calculation, weighting in zip(calculations, weightings): result = calculate_zoom_results( ZoomSensitivitiesTransformationsInput( confusion_noise=weighting.confusion_noise.value, maj_beam_size=weighting.beam_size.beam_maj_scaled, min_beam_size=weighting.beam_size.beam_min_scaled, sbs_conv_factor=weighting.sbs_conv_factor, spectral_sensitivity=calculation.spectral_sensitivity, weighting_factor=weighting.weighting_factor, ), transformed_warnings, ) transformed_result.append(result) return transformed_result def _zoom_custom( calculations: list[SingleZoomSensitivityResponse], transformed_warnings: list[dict] ) -> list[ZoomSensitivityTransformedResults]: # get non weighting (custom) weighted results transformed_result = [] for calculation in calculations: result = calculate_zoom_results_without_weighting( SensitivitiesTransformationsWithoutWeightingInput( spectral_sensitivity=calculation.spectral_sensitivity, ), transformed_warnings, # TODO: Does not seem to be a good idea to include the same list to every result? And it is just a one flag. ) transformed_result.append(result) return transformed_result
[docs] def convert_pss_input_and_calculate(user_input: dict) -> PSSSensitivityResponse: """ :param user_input: A kwarg dict of the HTTP parameters sent by the user :return: a dict containing the calculated sensitivity and its units """ num_stations = _num_stations_from_input(user_input) pulsar_mode = PulsarMode(user_input["pulsar_mode"]) # First, create a CalculatorInputPSS object. # Note that, for PSS single-pulse mode, the integration time # should be set to PSS sampling time calculator_input_pss = CalculatorInputPSS( freq_centre_mhz=user_input["freq_centre_mhz"], bandwidth_mhz=user_input["bandwidth_mhz"], chan_width=user_input["spectral_resolution_hz"], num_stations=num_stations, pointing_centre=user_input["pointing_centre"], integration_time_h=( user_input["integration_time_h"] if user_input["pulsar_mode"] == "folded_pulse" else PulsarSamplingTime.LOW_PSS.value.to(u.hour).value ), elevation_limit=user_input["elevation_limit"], dm=user_input["dm"], pulse_period=user_input["pulse_period"], intrinsic_pulse_width=user_input["intrinsic_pulse_width"], ) # Next, estimate the continuum sensitivity corresponding to the # integration time specified above continuum_input = CalculatorInput( freq_centre_mhz=calculator_input_pss.freq_centre_mhz, bandwidth_mhz=calculator_input_pss.bandwidth_mhz, num_stations=calculator_input_pss.num_stations, pointing_centre=calculator_input_pss.pointing_centre, integration_time_h=calculator_input_pss.integration_time_h, elevation_limit=calculator_input_pss.elevation_limit, ) # continuum_sensitivity, warning = _get_calculation_value(continuum_input) warnings = [] continuum_sensitivity = _get_calculation_value(continuum_input) # Finally, convert the continuum sensitivity to folded-pulse sensitivity pss_sensitivity = convert_continuum_to_bf_sensitivity( continuum_sensitivity, calculator_input_pss, pulsar_mode, ) if pulsar_mode == PulsarMode.FOLDED_PULSE: return PSSSensitivityResponse( folded_pulse_sensitivity=pss_sensitivity, warnings=warnings ) elif pulsar_mode == PulsarMode.SINGLE_PULSE: return PSSSensitivityResponse( single_pulse_sensitivity=pss_sensitivity, warnings=warnings ) else: raise ValueError(f"Unsupported pulsar mode: {pulsar_mode}")
[docs] def get_subarray_response(): """ return the appropriate subarray objects """ return [ { "name": subarray.name, "label": subarray.label, "n_stations": subarray.n_stations, } for subarray in subarray_storage.list() ]
def _get_calculation_value( calculator_input: CalculatorInput, warnings: Optional[list] = None ) -> Quantity: result = calculate_sensitivity(calculator_input) sensitivity = Quantity(result.sensitivity, result.units) if warnings is not None: _check_for_warning(calculator_input, warnings) return sensitivity def _check_for_warning(calculator_input: CalculatorInput, warnings: list[str]) -> None: mwa_cat = BrightSourceCatalog(threshold_jy=FLUX_DENSITY_THRESHOLD_JY) if mwa_cat.check_for_bright_sources( calculator_input.pointing_centre, calculator_input.freq_centre_mhz ): warnings.append( ( "The specified pointing contains at least one source brighter " + f"than {FLUX_DENSITY_THRESHOLD_JY} Jy. Your observation may be " + "dynamic range limited." ) ) def _num_stations_from_input(user_input: dict) -> int: """ If the user has given a subarray_configuration, extract the num_stations from that. Otherwise, use the value given by the user. Validation has checked that one and only on of these fields is present in the input. :param user_input: a dict of the parameters given by the user :return: the num_stations to use in the calculation """ if "subarray_configuration" in user_input: subarray = subarray_storage.load_by_name(user_input["subarray_configuration"]) return subarray.n_stations return user_input["num_stations"] def _get_subband_sensitivities( user_input: ContinuumCalculatorAndWeightingInput, num_stations: int ) -> List[SubbandResponse]: if user_input.n_subbands == 1 and ( user_input.subband_freq_centres is None or len(user_input.subband_freq_centres) == 0 ): return [] subband_freq_centres_hz, subband_bandwidth = get_subbands( user_input.n_subbands, user_input.freq_centre.to(u.MHz).value, user_input.bandwidth_mhz, ) return [ SubbandResponse( subband_freq_centre=Quantity(subband_frequency, "MHz"), sensitivity=_get_calculation_value( CalculatorInput( freq_centre_mhz=subband_frequency, bandwidth_mhz=subband_bandwidth, num_stations=num_stations, pointing_centre=user_input.pointing_centre, integration_time_h=user_input.integration_time_h, elevation_limit=user_input.elevation_limit, ), ), ) for subband_frequency in subband_freq_centres_hz ]