Source code for ska_ost_senscalc.mid.service

"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""

import copy
import dataclasses
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from math import isnan, nan
from typing import Optional, TypedDict, Union

import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.units import Quantity

from ska_ost_senscalc.common.model import (
    CalculatorInputPSS,
    ContinuumIntegrationTimeTransformedResults,
    ContinuumRequest,
    ContinuumSensitivitiesTransformationsInput,
    ContinuumSensitivityTransformedResults,
    ContinuumWeightingRequestParams,
    IntegrationTimeTransformationsWithoutWeightingInput,
    PssRequestMid,
    PSSSensitivityResponse,
    PulsarMode,
    SensitivitiesTransformationsWithoutWeightingInput,
    SensitivityTransformationsSubbandsInput,
    SubbandsBeamSizeTransformedResults,
    SubbandsContinuumIntegrationTimeTransformedResults,
    SubbandsContinuumSensitivityTransformedResults,
    SubbandsItemsTransformedResults,
    Weighting,
    WeightingSpectralMode,
    ZoomRequestPrepared,
)
from ska_ost_senscalc.common.pss import convert_continuum_to_bf_sensitivity
from ska_ost_senscalc.common.service import (
    ContinuumWeightingResponse,
    SingleZoomWeightingResponse,
    calculate_continuum_integration_time_results_without_weighting,
    calculate_continuum_results_without_weighting,
    calculate_continuum_sensitivities_subbands_results,
    calculate_continuum_sensitivity,
    get_confusion_noise,
    get_continuum_weighting_response,
    get_subbands,
    get_surface_brightness_sensitivity,
    get_synthesized_beam_size,
    get_total_sensitivity,
    get_weighted_sensitivity,
    get_zoom_weighting_response,
    is_beam_non_gaussian,
    sensitivity_limit_reached,
    sensitivity_on_unit,
    thermal_sensitivity,
)
from ska_ost_senscalc.common.spectropolarimetry import (
    SpectropolarimetryInput,
    SpectropolarimetryResults,
    get_spectropolarimetry_results,
)
from ska_ost_senscalc.mid.calculator import (
    calculate_integration_time,
    calculate_sensitivity,
    prepare_integration_input,
    prepare_sensitivity_input,
)
from ska_ost_senscalc.mid.models import (
    ThermalData,
    ZoomIntegrationTimeResult,
    ZoomSensitivityResult,
)
from ska_ost_senscalc.mid.validation import (
    MID_CONTINUUM_CHANNEL_WIDTH_KHZ,
    validate_and_set_defaults_for_continuum,
)
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope

subarray_storage = SubarrayStorage(Telescope.MID)


# TODO: Move these  models to models.py
[docs] class SingleSubbandResponse(TypedDict): subband_freq_centre: Quantity sensitivity: Optional[Quantity] integration_time: Optional[Quantity]
[docs] class ContinuumSensitivityResponse(TypedDict): """ Typed dictionary constrained to match the OpenAPI schema for the response body of a single continuum sensitivity calculation. """ continuum_sensitivity: Optional[Quantity] continuum_integration_time: Optional[Quantity] spectral_sensitivity: Optional[Quantity] spectral_integration_time: Optional[Quantity] continuum_subband_sensitivities: Optional[list[SingleSubbandResponse]] continuum_subband_integration_times: Optional[list[SingleSubbandResponse]] spectropolarimetry_results: SpectropolarimetryResults
[docs] @dataclass class SingleZoomBaseResponse: """ Base data class constrained to match the OpenAPI schema for the response body of a single zoom sensitivity or integration time calculation. """ freq_centre: Quantity spectropolarimetry_results: SpectropolarimetryResults warnings: list[str] = field(default_factory=list)
[docs] @dataclass class SingleZoomSensitivityResponse(SingleZoomBaseResponse): """ Dataclass constrained to match the OpenAPI schema for the response body of a single zoom sensitivity calculation. """ spectral_sensitivity: Quantity = None
[docs] @dataclass class SingleZoomIntegrationTimeResponse(SingleZoomBaseResponse): """ Dataclass constrained to match the OpenAPI schema for the response body of a single zoom integration time calculation. """ spectral_integration_time: Quantity = None
[docs] class ContinuumCalculateResponse(ContinuumSensitivityResponse): continuum_weighting: ContinuumWeightingResponse spectral_weighting: ContinuumWeightingResponse transformed_result: ContinuumSensitivityTransformedResults | ContinuumSensitivityTransformedResults
[docs] def calculate_pss_sensitivity(params: PssRequestMid) -> Quantity: """ Transform the continuum sensitivity to PSS sensitivity for a given set of PSS-specific input parameters. """ # ToDo: Find a better way to estimate the total number of dishes in a subarray subarray = next( ( sub_array for sub_array in get_subarray_response() if sub_array["label"] == params.subarray_configuration ), None, ) if subarray is None: raise ValueError( f"Subarray '{params.subarray_configuration}' not found in response." ) num_stations = subarray["n_ska"] + subarray["n_meer"] # # Finally, convert the continuum sensitivity to folded-pulse sensitivity # # Create a CalculatorInputPSS object. # # Note that, for PSS single-pulse mode, the integration time # # should be set to PSS sampling time calculator_input_pss = CalculatorInputPSS( freq_centre_mhz=params.freq_centre_hz * u.Hz.to(u.MHz), bandwidth_mhz=params.bandwidth_hz * u.Hz.to(u.MHz), chan_width=params.spectral_resolution_hz, num_stations=num_stations, integration_time_h=params.integration_time_s * u.s.to(u.hour), dm=params.dm, pulse_period=params.pulse_period, intrinsic_pulse_width=params.intrinsic_pulse_width, ) continuum_kwargs = copy.deepcopy(params) continuum_kwargs.weighting_mode = ( Weighting.NATURAL ) # Change weighting scheme to natural continuum_kwargs = asdict(continuum_kwargs) # Next, calculate the continuum sensitivity using these input parameters assuming # natural weighting scheme. # Remove pulsar_mode, "spectral_resolution_hz", "dm", "pulse_period", and # "intrinsic_pulse_width" from continuum_kwargs to make ContinuumRequest happy for key in [ "pulsar_mode", "dm", "pulse_period", "intrinsic_pulse_width", "spectral_resolution_hz", ]: continuum_kwargs.pop(key, None) continuum_params = ContinuumRequest(**continuum_kwargs) validated_params = validate_and_set_defaults_for_continuum(continuum_params) continuum_result = convert_continuum_input_and_calculate(validated_params) pulsar_mode = PulsarMode(params.pulsar_mode) # convert_continuum_to_bf_sensitivity() expects the continuum sensitivity # to be in Jy/beam. So, divide by u.beam pss_sensitivity = convert_continuum_to_bf_sensitivity( continuum_result["calculate"]["continuum_sensitivity"] / u.beam, calculator_input_pss, pulsar_mode, ) if pulsar_mode == PulsarMode.FOLDED_PULSE: return PSSSensitivityResponse(folded_pulse_sensitivity=pss_sensitivity) elif pulsar_mode == PulsarMode.SINGLE_PULSE: return PSSSensitivityResponse(single_pulse_sensitivity=pss_sensitivity) else: raise ValueError(f"Unsupported pulsar mode: {pulsar_mode}.")
# TODO: consider renaming the function to make it something like continuum_result
[docs] def convert_continuum_input_and_calculate(params: ContinuumRequest) -> dict: """Perform the calculations, weightings and weighted results. Then collect them into the response body.""" warnings = [] params.target = params.pointing_centre calculate_result = {} # Spectral results, which are returned in the continuum response body spectral_params = deepcopy(params) # For the spectral calculation, the bandwidth used in the calculation should be the effective resolution, # which is the intrinsic channel width multiplied by the spectral_averaging_factor effective_resolution_hz = ( MID_CONTINUUM_CHANNEL_WIDTH_KHZ * 1e3 * spectral_params.spectral_averaging_factor ) spectral_params.bandwidth_hz = effective_resolution_hz continuum_sensitivity = None spectral_sensitivity = None error_message = False non_gaussiam_beam = is_beam_non_gaussian( params.weighting_mode, params.robustness, params.taper ) continuum_weighting, spectral_weighting = _continuum_weightings(params) if integration_time_s := params.integration_time_s: sensitivity_input = prepare_sensitivity_input( integration_time_s, **asdict(params) ) continuum_sensitivity = calculate_sensitivity(sensitivity_input).to(u.Jy)[0] calculate_result["continuum_sensitivity"] = continuum_sensitivity # Calculate spectral sensitivity using effective resolution for bandwidth sensitivity_input = prepare_sensitivity_input( integration_time_s, **asdict(spectral_params) ) spectral_sensitivity = calculate_sensitivity(sensitivity_input).to(u.Jy)[0] calculate_result["spectral_sensitivity"] = spectral_sensitivity continuum_thermal_sensitivity = None # TODO: consider to remove if supplied_sensitivity := params.supplied_sensitivity: # TODO: consider calculating the sensitivity early on here and pass as a variable if params.subarray_configuration is not None: ( calculate_result, continuum_thermal_sensitivity, error_message, ) = _calculate_continuum_integration_time( params, spectral_params, continuum_weighting, spectral_weighting, calculate_result, ) else: calculate_result = _calculate_continuum_integration_time_custom( params, spectral_params, calculate_result ) calculate_result = _calculate_continuum_subbands( params, continuum_weighting, warnings, calculate_result ) calculate_result["spectropolarimetry_results"] = get_spectropolarimetry_results( SpectropolarimetryInput( bandwidth=u.Quantity(params.bandwidth_hz, "Hz"), frequency=u.Quantity(params.freq_centre_hz, "Hz"), effective_channel_width=u.Quantity(effective_resolution_hz, "Hz"), ) ) transformed_result = _continuum_transformed_result( params, continuum_weighting=continuum_weighting, spectral_weighting=spectral_weighting, continuum_thermal_sensitivity=continuum_thermal_sensitivity, continuum_sensitivity=continuum_sensitivity, spectral_sensitivity=spectral_sensitivity, calculation=calculate_result, warnings=warnings, non_gaussiam_beam=non_gaussiam_beam, error_message=error_message, ) # if the eror_message is True then only the weighting result is returned if error_message: transformed_result = None calculate_result.pop("continuum_subband_integration_times", None) calculate_result.pop("spectropolarimetry_results", None) if continuum_weighting: continuum_weighting = asdict(continuum_weighting) if spectral_weighting: spectral_weighting = asdict(spectral_weighting) combined_result = { "calculate": calculate_result, "weighting": { "continuum_weighting": continuum_weighting, "spectral_weighting": spectral_weighting, }, "transformed_result": transformed_result, } return combined_result
def _continuum_weightings(params: ContinuumRequest) -> tuple: msg = "Number of subbands does not match their frequencies centres" if len(params.subband_freq_centres) != 0: # TODO: AK -> this looks like it is never needed and in fact a duplication of subband_freq_centres_hz. # TODO: We should remove in next ticket/MR. pass no_subbands = params.n_subbands == 1 and len(params.subband_freq_centres_hz) == 0 some_subbands = ( params.n_subbands > 1 and len(params.subband_freq_centres_hz) == params.n_subbands ) assert no_subbands or some_subbands, msg # prepare weighting as we need them for both integration/sensitivity # TODO: add checking for custom for not sending weighting request # we only get weighting results if not a custom subarray continuum_weighting = None spectral_weighting = None if params.subarray_configuration is not None: weighting_request_params = ContinuumWeightingRequestParams( spectral_mode=WeightingSpectralMode.CONTINUUM, telescope=params.telescope, weighting_mode=params.weighting_mode, subarray_configuration=params.subarray_configuration, dec=params.pointing_centre.dec, freq_centre=params.freq_centre, taper=params.taper, robustness=params.robustness, subband_freq_centres=params.subband_freq_centres_hz, ) continuum_weighting = get_continuum_weighting_response(weighting_request_params) # Spectural/Line Weighting spectral_weighting_request_params = deepcopy(weighting_request_params) spectral_weighting_request_params.spectral_mode = WeightingSpectralMode.LINE spectral_weighting = get_continuum_weighting_response( spectral_weighting_request_params ) return continuum_weighting, spectral_weighting def _calculate_continuum_integration_time( params: ContinuumRequest, spectral_params: ContinuumRequest, continuum_weighting: ContinuumWeightingResponse, spectral_weighting: ContinuumWeightingResponse, result: dict, ) -> tuple: # prepare thermal sensitivity continuum_sensitivity = sensitivity_on_unit( params.supplied_sensitivity, continuum_weighting.sbs_conv_factor, params.sensitivity_unit, ) continuum_confusion_noise = Quantity( _adjust_confusion_noise(params, continuum_weighting.confusion_noise.value), u.Jy / u.beam, ) continuum_thermal_sensitivity = thermal_sensitivity( continuum_sensitivity, continuum_weighting.weighting_factor, continuum_confusion_noise.value, ) error_message = _check_sensitivity_limit( continuum_sensitivity, continuum_confusion_noise ) if error_message: # and not is_beam_non_gaussian(params.weighting_mode, params.robustness, params.taper): result["warnings"] = [error_message] return result, continuum_thermal_sensitivity, error_message # prepare thermal sensitivity for spectral spectral_sensitivity = sensitivity_on_unit( params.supplied_sensitivity, spectral_weighting.sbs_conv_factor, params.sensitivity_unit, ) spectral_thermal_sensitivity = thermal_sensitivity( spectral_sensitivity, spectral_weighting.weighting_factor, continuum_confusion_noise.value, ) integration_input = prepare_integration_input( continuum_thermal_sensitivity.to(u.Jy).value, **asdict(params) ) integration_time = calculate_integration_time(integration_input).to(u.s) result["continuum_integration_time"] = integration_time[0] # Calculate spectral integration time using resolution for bandwidth integration_input = prepare_integration_input( spectral_thermal_sensitivity.to(u.Jy).value, **asdict(spectral_params), ) spectral_integration_time = calculate_integration_time(integration_input).to(u.s) result["spectral_integration_time"] = spectral_integration_time[0] return result, continuum_thermal_sensitivity, error_message def _calculate_continuum_integration_time_custom( params: ContinuumRequest, spectral_params: ContinuumRequest, result: dict ) -> dict: # For custom subarrays, we don't calculate a thermal sensitivity, we do the calculation with the supplied sensitivity custom_continuum_integration_time = _calculate_custom_integration_time( params.supplied_sensitivity, params ) result["continuum_integration_time"] = custom_continuum_integration_time[0] custom_spectral_integration_time = _calculate_custom_integration_time( params.supplied_sensitivity, spectral_params ) result["spectral_integration_time"] = custom_spectral_integration_time[0] return result def _calculate_continuum_subbands( params: ContinuumRequest, continuum_weighting: ContinuumWeightingResponse, warnings: list[str], result: dict, ) -> dict: # Subbands - if subbands is 1 then we just return the main sensitivity calculation as the subband is the whole bandwidth if params.n_subbands > 1: subband_results = [] subband_freq_centres_hz, subband_bandwidth = get_subbands( params.n_subbands, params.freq_centre_hz, params.bandwidth_hz ) if integration_time_s := params.integration_time_s: for subband_freq_centre_hz in subband_freq_centres_hz: # Create the calculator for each subband result subband_params = deepcopy(params) subband_params.freq_centre_hz = subband_freq_centre_hz subband_params.bandwidth_hz = subband_bandwidth sensitivity_input = prepare_sensitivity_input( integration_time_s, **asdict(subband_params) ) sensitivity = calculate_sensitivity(sensitivity_input).to(u.Jy) subband_results.append( { "subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"), "sensitivity": sensitivity[0], } ) result["continuum_subband_sensitivities"] = subband_results else: if params.subarray_configuration is not None: subband_result = _calculate_supplied_sensitivity_subbands( params, continuum_weighting, warnings, subband_freq_centres_hz, subband_bandwidth, ) result["continuum_subband_integration_times"] = subband_result else: custom_subband_result = _calculate_supplied_sensitivity_subbands_custom( params, subband_bandwidth ) result["continuum_subband_integration_times"] = custom_subband_result return result def _calculate_supplied_sensitivity_subbands( params: ContinuumRequest, continuum_weighting: ContinuumWeightingResponse, warnings: list[str], subband_freq_centres_hz: list[float], subband_bandwidth: float, ) -> list[dict]: """ Calculates the subbands for calculate results supplied sensitivity. :param params: the request parameters. :type params: data class ContinuumRequest :param continuum_weighting: the continuum weighting results. :type continuum_weighting: ContinuumWeightingResponse :param warnings: the calculate warnings. :type warnings: list of strings :param subband_freq_centres_hz: the subband frequency centres. :type subband_freq_centres_hz: list of floats. :param subband_bandwidth: the subband bandwidth. :type subband_bandwidth: float :return: the subbands results. :rtype: a list of dictionaries """ # populate thermal array since we need to check first confusion noise subband_thermal_sensitivities = [] subbands = [] if params.subband_supplied_sensitivities: for sensitivity in params.subband_supplied_sensitivities: subband_thermal_sensitivities.append(Quantity(sensitivity, u.Jy)) else: for weighting in continuum_weighting.subbands: continuum_confusion_noise = Quantity( _adjust_confusion_noise(params, weighting.confusion_noise.value), u.Jy / u.beam, ) subband_sensitivities = sensitivity_on_unit( params.supplied_sensitivity, weighting.sbs_conv_factor, params.sensitivity_unit, ) subband_thermal_sensitivities.append( thermal_sensitivity( subband_sensitivities, weighting.weighting_factor, continuum_confusion_noise.value, ) ) first_subband_confusion_noise = continuum_weighting.subbands[ 0 ].confusion_noise.value if first_subband_confusion_noise >= subband_thermal_sensitivities[0].value: warnings.append( _sens_limit_error( first_subband_confusion_noise, subband_thermal_sensitivities[0] ) ) for subband_freq_centre_hz, subband_supplied_sensitivity in zip( subband_freq_centres_hz, subband_thermal_sensitivities ): # Create the calculator for each subband result subband_params = deepcopy(params) subband_params.freq_centre_hz = subband_freq_centre_hz subband_params.bandwidth_hz = subband_bandwidth integration_input = prepare_integration_input( subband_supplied_sensitivity, **asdict(subband_params) ) integration_time = calculate_integration_time(integration_input).to(u.s) subbands.append( { "subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"), "integration_time": integration_time[0], } ) return subbands def _calculate_supplied_sensitivity_subbands_custom( params: ContinuumRequest, subband_bandwidth: float ) -> list[dict]: """ Calculates the subbands for calculate results supplied sensitivity in the case of a custom subarray. When a subarray is provided, the subbands are calculated using thermal sensitivities, which is not possible for custom subarrays. Instead, the subbands are calculated using the supplied sensitivities. :param params: the request parameters. :type params: data class ContinuumRequest :param subband_bandwidth: the subband bandwidth. :type subband_bandwidth: float :return: the subbands results. :rtype: a list of dictionaries """ subbands = [] subband_supplied_sensitivities_jy = [ Quantity(sensitivity, params.subband_supplied_sensitivities_unit) .to(u.Jy / u.beam) .value for sensitivity in params.subband_supplied_sensitivities ] for subband_freq_centre_hz, subband_sensitivity_jy in zip( params.subband_freq_centres_hz, subband_supplied_sensitivities_jy, ): # Create the calculator for each subband result subband_params = deepcopy(params) subband_params.freq_centre_hz = subband_freq_centre_hz subband_params.bandwidth_hz = subband_bandwidth integration_input = prepare_integration_input( subband_sensitivity_jy, **asdict(subband_params) ) integration_time = calculate_integration_time(integration_input).to(u.s) subbands.append( { "subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"), "integration_time": integration_time[0], } ) return subbands def _continuum_transformed_result( params: ContinuumRequest, continuum_weighting: ContinuumWeightingResponse, spectral_weighting: ContinuumWeightingResponse, continuum_thermal_sensitivity: Quantity, continuum_sensitivity: Quantity, spectral_sensitivity: Quantity, calculation: dict, warnings: list[str], non_gaussiam_beam: bool, error_message: Union[bool, str] = False, ) -> dict: if params.subarray_configuration is not None: if params.integration_time_s: if non_gaussiam_beam: transformed_result = _continuum_non_gaussian_sensitivity( calculation=calculation, continuum_weighting=continuum_weighting, spectral_weighting=spectral_weighting, warnings=warnings, ) else: transformed_result = _continuum_gaussian_sensitivity( calculation=calculation, continuum_thermal_sensitivity=continuum_thermal_sensitivity, continuum_weighting=continuum_weighting, spectral_weighting=spectral_weighting, warnings=warnings, params=params, ) else: if non_gaussiam_beam: transformed_result = _continuum_integration_time_non_gaussian( calculation=calculation, warnings=warnings, ) else: transformed_result = _continuum_integration_time( continuum_thermal_sensitivity=continuum_thermal_sensitivity, calculation=calculation, continuum_weighting=continuum_weighting, spectral_weighting=spectral_weighting, error_message=error_message, warnings=warnings, ) else: if params.integration_time_s: transformed_result = _continuum_transformed_result_custom_sensitivity( continuum_thermal_sensitivity=continuum_thermal_sensitivity, continuum_sensitivity=continuum_sensitivity, spectral_sensitivity=spectral_sensitivity, calculation=calculation, ) elif params.supplied_sensitivity: transformed_result = _continuum_transformed_result_custom_integration_time( calculation ) if type(transformed_result) != dict: transformed_result = asdict(transformed_result) return transformed_result def _continuum_transformed_result_custom_sensitivity( continuum_thermal_sensitivity: Quantity, continuum_sensitivity: Quantity, spectral_sensitivity: Quantity, calculation: dict, ) -> dict: continuum_sensitivities_without_weighting_input = ( SensitivitiesTransformationsWithoutWeightingInput( continuum_sensitivity=continuum_sensitivity, spectral_sensitivity=spectral_sensitivity, ) ) transformed_result = calculate_continuum_results_without_weighting( continuum_sensitivities_without_weighting_input, warnings=[], ) transformed_result = asdict(transformed_result) subbands = {} if calculate_subbands := calculation.get("continuum_subband_sensitivities"): subbands_data = SensitivityTransformationsSubbandsInput( calculate_subbands=calculate_subbands, continuum_thermal_sensitivity=continuum_thermal_sensitivity, ) subbands = calculate_continuum_sensitivities_subbands_results( subbands_data, non_gaussian=False, is_custom=True, warnings=[] ) subbands = asdict(subbands) transformed_result["subbands"] = subbands return transformed_result def _continuum_transformed_result_custom_integration_time( calculation: dict, ) -> dict: continuum_integration_time_without_weighting_input = ( IntegrationTimeTransformationsWithoutWeightingInput( spectral_integration_time=calculation["spectral_integration_time"], continuum_integration_time=calculation["continuum_integration_time"], ) ) transformed_result = asdict( calculate_continuum_integration_time_results_without_weighting( continuum_integration_time_without_weighting_input, warnings=[], ) ) subbands = {} # get custom subbands weighted results if calculation.get("continuum_subband_integration_times"): weighted_subbands_input = SensitivityTransformationsSubbandsInput( calculate_subbands=calculation["continuum_subband_integration_times"], ) subbands = asdict( _calculate_continuum_integration_time_subbands_results_custom( weighted_subbands_input, warnings=[] ) ) transformed_result["subbands"] = subbands return transformed_result def _continuum_integration_time_non_gaussian( calculation: dict, warnings: list[str], ) -> dict: continuum_integration_time = calculation["continuum_integration_time"] spectral_integration_time = calculation["spectral_integration_time"] subbands = {} if calculation.get("continuum_subband_integration_times"): max_time = calculation["continuum_subband_integration_times"][0][ "integration_time" ] min_time = calculation["continuum_subband_integration_times"][-1][ "integration_time" ] subbands = asdict( SubbandsContinuumIntegrationTimeTransformedResults( integration_time_per_subband=SubbandsItemsTransformedResults( min_value=min_time, max_value=max_time ) ) ) result = ContinuumIntegrationTimeTransformedResults( continuum_integration_time=continuum_integration_time, spectral_integration_time=spectral_integration_time, warnings=warnings, ) result = asdict(result) result["subbands"] = subbands return result def _continuum_non_gaussian_sensitivity( calculation: dict, continuum_weighting: ContinuumWeightingResponse, spectral_weighting: ContinuumWeightingResponse, warnings: list[str], ) -> dict: continuum_senstivity = ( calculation["continuum_sensitivity"] * continuum_weighting.weighting_factor ) spectral_sensitivity = ( calculation["spectral_sensitivity"] * spectral_weighting.weighting_factor ) subbands = {} if calculation.get("continuum_subband_sensitivities"): max_sensitivity = calculation["continuum_subband_sensitivities"][0][ "sensitivity" ] min_sensitivity = calculation["continuum_subband_sensitivities"][-1][ "sensitivity" ] subbands = asdict( SubbandsContinuumSensitivityTransformedResults( weighted_sensitivity_per_subband=SubbandsItemsTransformedResults( min_value=u.Quantity(min_sensitivity.value, u.Jy / u.beam), max_value=u.Quantity(max_sensitivity.value, u.Jy / u.beam), ) ) ) result = ContinuumSensitivityTransformedResults( weighted_continuum_sensitivity=u.Quantity( continuum_senstivity.value, u.Jy / u.beam ), weighted_spectral_sensitivity=u.Quantity( spectral_sensitivity.value, u.Jy / u.beam ), warnings=warnings, ) result = asdict(result) result["subbands"] = subbands return result def _continuum_gaussian_sensitivity( calculation: dict, continuum_thermal_sensitivity, continuum_weighting: ContinuumWeightingResponse, spectral_weighting: ContinuumWeightingResponse, warnings: list[str], params: ContinuumRequest, ) -> dict: # integration time -> sensitivity result = calculate_continuum_sensitivity( ContinuumSensitivitiesTransformationsInput( continuum_sensitivity=calculation["continuum_sensitivity"], spectral_sensitivity=calculation["spectral_sensitivity"], continuum_weighting_factor=continuum_weighting.weighting_factor, spectral_weighting_factor=spectral_weighting.weighting_factor, continuum_sbs_conv_factor=continuum_weighting.sbs_conv_factor, spectral_sbs_conv_factor=spectral_weighting.sbs_conv_factor, continuum_conf_noise=continuum_weighting.confusion_noise.value, spectral_conf_noise=spectral_weighting.confusion_noise.value, continuum_beam_min=continuum_weighting.beam_size.beam_min_scaled, continuum_beam_maj=continuum_weighting.beam_size.beam_maj_scaled, spectral_beam_min=spectral_weighting.beam_size.beam_min_scaled, spectral_beam_maj=spectral_weighting.beam_size.beam_maj_scaled, ), warnings, params.weighting_mode, params.robustness, ) subbands = {} if calculate_subbands := calculation.get("continuum_subband_sensitivities"): weighting_subbands = continuum_weighting.subbands subbands_data = SensitivityTransformationsSubbandsInput( calculate_subbands=calculate_subbands, weighting_subbands=weighting_subbands, continuum_thermal_sensitivity=continuum_thermal_sensitivity, ) subbands = calculate_continuum_sensitivities_subbands_results( subbands_data, non_gaussian=is_beam_non_gaussian( params.weighting_mode, robustness=params.robustness, ), is_custom=False, warnings=[], ) subbands = asdict(subbands) result = asdict(result) result["subbands"] = subbands return result def _continuum_integration_time_subbands( continuum_thermal_sensitivity, calculation, continuum_weighting: ContinuumWeightingResponse, confusion_noises, spectral_confusion_noise, ) -> dict: subbands = {} if calculation.get("continuum_subband_integration_times"): assert len(continuum_weighting.subbands) > 0 confusion_noises.append(continuum_weighting.subbands[0].confusion_noise.value) confusion_noises.append(continuum_weighting.subbands[-1].confusion_noise.value) max_confusion_noise = max(confusion_noises) warnings = [] if sensitivity_limit_reached( max_confusion_noise, continuum_thermal_sensitivity ): msg = _sens_limit_error(spectral_confusion_noise) warnings.append(msg) weighted_subbands_input = SensitivityTransformationsSubbandsInput( calculate_subbands=calculation["continuum_subband_integration_times"], weighting_subbands=continuum_weighting.subbands, continuum_thermal_sensitivity=continuum_thermal_sensitivity, ) subbands = asdict( _calculate_continuum_integration_time_subbands_results( weighted_subbands_input, warnings ) ) return subbands def _calculate_custom_integration_time( supplied_sensitivity: float, params: ContinuumRequest ) -> Quantity: supplied_sensitivity_jy = Quantity( supplied_sensitivity, params.sensitivity_unit ).to(u.Jy / u.beam) integration_input = prepare_integration_input( supplied_sensitivity_jy.value, **asdict(params) ) return calculate_integration_time(integration_input) def _continuum_integration_time( continuum_thermal_sensitivity: Quantity, calculation: dict, continuum_weighting: ContinuumWeightingResponse, spectral_weighting: ContinuumWeightingResponse, error_message: Union[bool, str], warnings: list[str] = None, ) -> dict: continuum_confusion_noise = get_confusion_noise( confusion_noise=continuum_weighting.confusion_noise.value, maj_beam_size=continuum_weighting.beam_size.beam_maj_scaled, min_beam_size=continuum_weighting.beam_size.beam_min_scaled, ) continuum_synthesized_beam_size = get_synthesized_beam_size( continuum_weighting.beam_size.beam_min_scaled, continuum_weighting.beam_size.beam_maj_scaled, ) continuum_integration_time = calculation.get("continuum_integration_time") spectral_confusion_noise = get_confusion_noise( confusion_noise=spectral_weighting.confusion_noise.value, maj_beam_size=spectral_weighting.beam_size.beam_maj_scaled, min_beam_size=spectral_weighting.beam_size.beam_min_scaled, ) spectral_synthesized_beam_size = get_synthesized_beam_size( spectral_weighting.beam_size.beam_min_scaled, spectral_weighting.beam_size.beam_maj_scaled, ) spectral_integration_time = calculation.get("spectral_integration_time") transformed_result = asdict( ContinuumIntegrationTimeTransformedResults( continuum_confusion_noise=continuum_confusion_noise, continuum_synthesized_beam_size=continuum_synthesized_beam_size, continuum_integration_time=continuum_integration_time, spectral_confusion_noise=spectral_confusion_noise, spectral_synthesized_beam_size=spectral_synthesized_beam_size, spectral_integration_time=spectral_integration_time, ) ) # For sensitivity -> integration time we need to compare the entered (total) sensitivity with the up to # two confusion noise values that might be shown (full bandwidth, two confusion-noise values and spectral) confusion_noises = [ continuum_weighting.confusion_noise.value, spectral_weighting.confusion_noise.value, ] max_confusion_noise = max(confusion_noises) if sensitivity_limit_reached(max_confusion_noise, continuum_thermal_sensitivity): msg = _sens_limit_warning(spectral_confusion_noise) warnings.append(msg) if error_message: transformed_result["subbands"] = {} return transformed_result subbands = _continuum_integration_time_subbands( continuum_thermal_sensitivity, calculation=calculation, continuum_weighting=continuum_weighting, confusion_noises=confusion_noises, spectral_confusion_noise=spectral_confusion_noise, ) transformed_result["subbands"] = subbands return transformed_result def _calculate_continuum_integration_time_subbands_results( subbands_input: SensitivityTransformationsSubbandsInput, warnings: list[str], ) -> SubbandsContinuumIntegrationTimeTransformedResults: """ Uses an input containing values from Calculator and Weighting subbands to generate weighted subbands results to be displayed in the UI. :param subbands_input: the subband values from the calculator and weighting. :type subbands_input: data class SensitivityTransformationsSubbandsInput :param warnings: the list of warnings to be returned. :type warnings: List of string :return: a data class object representing the subband weighted results. :rtype: SubbandsContinuumIntegrationTimeTransformedResults """ # TODO: add tests for this function # TODO: check if this function is used anywhere max_time = subbands_input.calculate_subbands[0]["integration_time"] min_time = subbands_input.calculate_subbands[-1]["integration_time"] max_synthesized_beam = get_synthesized_beam_size( maj_beam_size=subbands_input.weighting_subbands[0].beam_size.beam_maj_scaled, min_beam_size=subbands_input.weighting_subbands[0].beam_size.beam_min_scaled, ) min_synthesized_beam = get_synthesized_beam_size( maj_beam_size=subbands_input.weighting_subbands[-1].beam_size.beam_maj_scaled, min_beam_size=subbands_input.weighting_subbands[-1].beam_size.beam_min_scaled, ) confusion_noise_per_subband = SubbandsItemsTransformedResults( max_value=subbands_input.weighting_subbands[0].confusion_noise.value * u.Jy, min_value=subbands_input.weighting_subbands[-1].confusion_noise.value * u.Jy, ) synthesized_beam_size_per_subband = SubbandsBeamSizeTransformedResults( max_value=max_synthesized_beam, min_value=min_synthesized_beam ) integration_time_per_subband = SubbandsItemsTransformedResults( max_value=max_time, min_value=min_time ) # handle warnings compare_max_confusion_noise_jy = [ subbands_input.weighting_subbands[0].confusion_noise.value, subbands_input.weighting_subbands[-1].confusion_noise.value, ] max_confusion_noise_jy = max(compare_max_confusion_noise_jy) if sensitivity_limit_reached( max_confusion_noise_jy, subbands_input.continuum_thermal_sensitivity ): warning = _sens_limit_error(max_confusion_noise_jy) warnings.append(warning) return SubbandsContinuumIntegrationTimeTransformedResults( confusion_noise_per_subband=confusion_noise_per_subband, synthesized_beam_size_per_subband=synthesized_beam_size_per_subband, integration_time_per_subband=integration_time_per_subband, warnings=warnings, ) def _calculate_continuum_integration_time_subbands_results_custom( subbands_input: SensitivityTransformationsSubbandsInput, warnings: list[str], ) -> SubbandsContinuumIntegrationTimeTransformedResults: """ Uses an input containing values from Calculator and Weighting subbands to generate weighted subbands results to be displayed in the UI. :param subbands_input: the subband values from the calculator and no weighting. :type subbands_input: data class SensitivityTransformationsSubbandsInput :param warnings: the list of warnings to be returned. :type warnings: List of string :return: a data class object representing the subband weighted results. :rtype: SubbandsContinuumIntegrationTimeTransformedResults """ max_time = subbands_input.calculate_subbands[0]["integration_time"] min_time = subbands_input.calculate_subbands[-1]["integration_time"] integration_time_per_subband = SubbandsItemsTransformedResults( max_value=max_time, min_value=min_time ) # There are no warnings for this mode (custom) as there is no confusion noise return SubbandsContinuumIntegrationTimeTransformedResults( integration_time_per_subband=integration_time_per_subband, warnings=warnings, )
[docs] def get_zoom_calculate_response( params: ZoomRequestPrepared, ) -> dict: """ Extract the params from the request, convert them into the relevant calculator inputs, perform the calculations and collect the results into the response body. """ # Parse the target target = SkyCoord( params.pointing_centre, frame="icrs", unit=(u.hourangle, u.deg), ) params.target = target calculations = [] warnings = [] if integration_time_s := params.integration_time_s: for zoom_freq_centre_hz, zoom_spectral_resolution_hz, total_bandwidth_hz in zip( params.freq_centres_hz, params.spectral_resolutions_hz, params.total_bandwidths_hz, ): zoom_params = dataclasses.replace(params) zoom_params.freq_centre_hz = zoom_freq_centre_hz effective_resolution_hz = ( zoom_spectral_resolution_hz * params.spectral_averaging_factor ) zoom_params.bandwidth_hz = effective_resolution_hz sensitivity_input = prepare_sensitivity_input( integration_time_s, **asdict(zoom_params) ) sensitivity = calculate_sensitivity(sensitivity_input).to(u.Jy)[0] spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(total_bandwidth_hz, "Hz"), frequency=Quantity(zoom_freq_centre_hz, "Hz"), effective_channel_width=Quantity(effective_resolution_hz, "Hz"), ) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) single = SingleZoomSensitivityResponse( freq_centre=zoom_freq_centre_hz * u.Hz, spectral_sensitivity=sensitivity, spectropolarimetry_results=spectropolarimetry_results, ) calculations.append(asdict(single)) # We need them early for sensitivities calculations weightings = get_zoom_weighting_response(params) thermals = [] if params.supplied_sensitivities: assert ( len(params.freq_centres_hz) == len(params.spectral_resolutions_hz) == len(params.supplied_sensitivities) == len(params.total_bandwidths_hz) ) for i in range(len(params.freq_centres_hz)): zoom_freq_centre_hz = params.freq_centres_hz[i] zoom_spectral_resolution_hz = params.spectral_resolutions_hz[i] zoom_supplied_sensitivity = params.supplied_sensitivities[i] total_bandwidth_hz = params.total_bandwidths_hz[i] try: weighting = weightings[i] sensitivity = sensitivity_on_unit( zoom_supplied_sensitivity, weighting.sbs_conv_factor, params.sensitivity_unit, ) confusion_noise = _adjust_confusion_noise( params, weighting.confusion_noise.value ) _thermal_sensitivity = thermal_sensitivity( sensitivity, weighting.weighting_factor, confusion_noise ) thermal = ThermalData( sensitivity, confusion_noise, _thermal_sensitivity, ) thermals.append(thermal) error_message = _check_sensitivity_limit( sensitivity, Quantity(confusion_noise, u.Jy / u.beam) ) if error_message: calculations.append({"warnings": [error_message]}) continue except IndexError: _thermal_sensitivity = zoom_supplied_sensitivity zoom_params = dataclasses.replace(params) zoom_params.freq_centre_hz = zoom_freq_centre_hz effective_resolution_hz = ( zoom_spectral_resolution_hz * params.spectral_averaging_factor ) zoom_params.bandwidth_hz = effective_resolution_hz integration_input = prepare_integration_input( _thermal_sensitivity, **asdict(zoom_params) ) integration_time = calculate_integration_time(integration_input).to(u.s)[0] spectropolarimetry_input = SpectropolarimetryInput( bandwidth=Quantity(total_bandwidth_hz, "Hz"), frequency=Quantity(zoom_freq_centre_hz, "Hz"), effective_channel_width=Quantity(effective_resolution_hz, "Hz"), ) spectropolarimetry_results = get_spectropolarimetry_results( spectropolarimetry_input ) single = SingleZoomIntegrationTimeResponse( freq_centre=zoom_freq_centre_hz * u.Hz, spectral_integration_time=integration_time, spectropolarimetry_results=spectropolarimetry_results, warnings=warnings, ) calculations.append(asdict(single)) transformed_result = _zoom_transformed_result( params, calculations, weightings, thermals ) res = { "calculate": calculations, "weighting": {"spectral_weighting": [asdict(w) for w in weightings]}, "transformed_result": transformed_result, } # TODO: Once agreed, we need only returning required messages, rather than nulls. And for custom array/non Gaussian only. cleaned_results = _replace_na_fileds(res) return cleaned_results
def _zoom_transformed_result( params: ZoomRequestPrepared, calculations: list[SingleZoomSensitivityResponse], weightings: list[SingleZoomWeightingResponse], thermals: list[ThermalData], ) -> ZoomSensitivityResult | ZoomIntegrationTimeResult: if params.integration_time_s: if params.subarray_configuration is None: return _zoom_custom_sensitivity(calculations) else: if is_beam_non_gaussian( params.weighting_mode, params.robustness, params.taper ): return _zoom_sensitivity_non_gaussian(calculations, weightings) else: return _zoom_sensitivity(calculations, weightings) else: if params.subarray_configuration is None: return _zoom_custom_integration(calculations) else: warnings = [] calculations = deepcopy(calculations) weightings = deepcopy(weightings) if is_beam_non_gaussian( params.weighting_mode, params.robustness, params.taper ): res = _zoom_integration_time_non_gaussian(calculations) else: res = _zoom_integration_time(calculations, weightings, thermals) res.extend(warnings) return res def _zoom_custom_integration( calculations: list[SingleZoomSensitivityResponse], ) -> list[ZoomIntegrationTimeResult]: res = [] for calculation in calculations: zoom_result = ZoomIntegrationTimeResult( spectral_integration_time=calculation["spectral_integration_time"], ) res.append(asdict(zoom_result)) return res def _zoom_custom_sensitivity( calculations: list[SingleZoomSensitivityResponse], ) -> list[ZoomSensitivityResult]: res = [] for calculation in calculations: zoom_result = ZoomSensitivityResult( weighted_spectral_sensitivity=u.Quantity( calculation["spectral_sensitivity"].to(u.Jy).value, u.Jy / u.beam ), ) res.append(asdict(zoom_result)) return res def _zoom_sensitivity( calculations: list[SingleZoomSensitivityResponse], weightings: list[SingleZoomWeightingResponse], ) -> list[ZoomSensitivityResult]: res = [] for calculation, weighting in zip(calculations, weightings): try: weighted_spectral_sensitivity = get_weighted_sensitivity( calculation["spectral_sensitivity"], weighting.weighting_factor, ) except KeyError: weighted_spectral_sensitivity = nan min_beam_size = weighting.beam_size.beam_min_scaled maj_beam_size = weighting.beam_size.beam_maj_scaled spectral_confusion_noise = get_confusion_noise( weighting.confusion_noise.value, min_beam_size, maj_beam_size, ) total_spectral_sensitivity = get_total_sensitivity( spectral_confusion_noise, weighted_spectral_sensitivity ) spectral_synthesized_beam_size = get_synthesized_beam_size( min_beam_size, maj_beam_size, ) spectral_surface_brightness_sensitivity = get_surface_brightness_sensitivity( weighting.sbs_conv_factor, total_spectral_sensitivity, ) warnings = [] if sensitivity_limit_reached( confusion_noise=spectral_confusion_noise, total_sensitivity=total_spectral_sensitivity, ): msg = _sens_limit_warning(spectral_confusion_noise) warnings.append(msg) zoom_result = ZoomSensitivityResult( weighted_spectral_sensitivity=weighted_spectral_sensitivity, warnings=warnings, spectral_confusion_noise=spectral_confusion_noise, total_spectral_sensitivity=total_spectral_sensitivity, spectral_synthesized_beam_size=spectral_synthesized_beam_size, spectral_surface_brightness_sensitivity=spectral_surface_brightness_sensitivity, ) res.append(asdict(zoom_result)) return res def _zoom_sensitivity_non_gaussian( calculations: list[SingleZoomSensitivityResponse], weightings: list[SingleZoomWeightingResponse], ) -> list[ZoomSensitivityResult]: res = [] for calculation, weighting in zip(calculations, weightings): sensitivity = calculation.get("spectral_sensitivity", nan) zoom_result = ZoomSensitivityResult( weighted_spectral_sensitivity=u.Quantity( (sensitivity.to(u.Jy).value * weighting.weighting_factor), u.Jy / u.beam ), ) res.append(asdict(zoom_result)) return res def _zoom_integration_time( calculations: list[SingleZoomSensitivityResponse], weightings: list[SingleZoomWeightingResponse], thermals: list[ThermalData], ) -> list[ZoomIntegrationTimeResult]: res = [] for calculation, weighting, thermal in zip(calculations, weightings, thermals): spectral_integration_time = calculation.get("spectral_integration_time") if spectral_integration_time is None: res.append({}) else: min_beam_size = weighting.beam_size.beam_min_scaled maj_beam_size = weighting.beam_size.beam_maj_scaled spectral_confusion_noise = get_confusion_noise( weighting.confusion_noise.value, min_beam_size, maj_beam_size, ) spectral_synthesized_beam_size = get_synthesized_beam_size( min_beam_size, maj_beam_size, ) warnings = [] # TODO: use the new error function here or assess to remove if not needed? # Maybe it is needed for non-Gaussian beam where the error is just appended? if sensitivity_limit_reached(spectral_confusion_noise, thermal.sensitivity): msg = _sens_limit_warning(Quantity(thermal.confusion_noise, u.Jy)) warnings.append(msg) zoom_result = ZoomIntegrationTimeResult( spectral_integration_time=spectral_integration_time, spectral_confusion_noise=spectral_confusion_noise, spectral_synthesized_beam_size=spectral_synthesized_beam_size, warnings=warnings, ) res.append(asdict(zoom_result)) return res def _confusion_noise_included(params: ZoomRequestPrepared) -> bool: if params.subarray_configuration is None or is_beam_non_gaussian( params.weighting_mode, params.robustness, params.taper ): return False else: return True def _adjust_confusion_noise( params: ZoomRequestPrepared, confusion_noise: float ) -> float: if _confusion_noise_included(params): return confusion_noise else: return 0 def _sens_limit_warning(confusion_noise: Quantity) -> str: return f"Warning: You are approaching the confusion limit ({confusion_noise}) given the synthesized beam-size and frequency." def _sens_limit_error(confusion_noise: Quantity, sensitivity: Quantity) -> str: return f"Error: the entered sensitivity ({sensitivity}) is less than or equal to the confusion noise ({confusion_noise})" def _check_sensitivity_limit( sensitivity: Quantity, confusion_noise: Quantity ) -> Union[bool, str]: if sensitivity.value <= confusion_noise.value: return f"Error: the entered sensitivity ({sensitivity}) is less than or equal to the confusion noise ({confusion_noise})" return False def _zoom_integration_time_non_gaussian( calculations: list[SingleZoomSensitivityResponse], ) -> list[ZoomIntegrationTimeResult]: res = [] for calculation in calculations: zoom_result = ZoomIntegrationTimeResult( spectral_integration_time=calculation["spectral_integration_time"] ) res.append(asdict(zoom_result)) return res def _replace_na_fileds(d: dict | list[dict]) -> dict | list: if type(d) == list: res = [_replace_na_fileds(elem) for elem in d] return res elif type(d) == dict: res = {} for k, v in d.items(): cleaned = _replace_na_fileds(v) if isinstance(cleaned, float) and isnan(cleaned): res[k] = None else: res[k] = cleaned return res else: return d
[docs] def get_subarray_response(): """ return the appropriate subarray objects """ return [ { "name": subarray.name, "label": subarray.label, "n_ska": subarray.n_ska, "n_meer": subarray.n_meer, } for subarray in subarray_storage.list() ]