Source code for ska_sdp_spectral_line_imaging.data_procs.model

import logging
from typing import Tuple

import dask.array as da
import numpy as np
import xarray as xr
from dask.delayed import Delayed, delayed
from ska_sdp_piper.piper.utils import delayed_log

logger = logging.getLogger()


[docs] def report_peak_visibility(visibility, unit): abs_visibility = np.abs(visibility) max_freq_axis = abs_visibility.max( dim=["time", "baseline_id", "polarization"] ) peak_channel = max_freq_axis.argmax() peak_frequency = max_freq_axis.idxmax() max_visibility = abs_visibility.max() return delayed_log( logger.info, "Peak visibility Channel: {peak_channel}." " Frequency: {peak_frequency} {unit}." " Peak Visibility: {max_visibility}", peak_channel=peak_channel.data, peak_frequency=peak_frequency.data, max_visibility=max_visibility.data, unit=unit, )
[docs] def apply_power_law_scaling( image: xr.DataArray, frequency_range: np.ndarray, reference_frequency: float = None, spectral_index: float = 0.75, freq_chunks: Tuple[int] = None, ) -> xr.DataArray: r""" Apply power law scaling on a continuum image and return a scaled cube. The "image" must be a xarray dataarray. If "frequency" dimension of size 1 is present in image, then that value is used as reference frequency for scaling. Else, user can pass any reference frequency as "reference_frequency" argument, which takes preference. If "data" attribute of image is a dask array, then this will return a dataarray which wraps a new dask array containing operations for power law scaling. This means, the values in returned dataarray are not computed eagerly. The formula for power law scaling is given as: .. math:: S2 = S1 * ( \nu2 / \nu1 ) ^ {-\alpha} Parameters ---------- image Image data array. The "data" attribute of dataarray can either be a numpy array or a dask array. The shape can either be 2D (x, y) or 3D (x, y, polarisation) frequency_range Frequency range in hertz over which to scale the data. reference_frequency Refernce frequency in hertz. If not passed, function expects that a frequency coordinate is present in the image. If passed, this takes priority over the frequency cordinates of image. spectral_index Spectral index (alpha) used in power law scaling. freq_chunks Chunks corresponding to frequency channel By default, frequency dimension will not be chunked Returns ------- A 3D or 4D scaled spectral cube """ _ref_freq = None if "frequency" in image.dims: if image.frequency.size != 1: logger.warning( "Can not apply power law scaling on a spectral cube." "Ignoring passed argument and continuing pipeline" ) return image _ref_freq = image.frequency.data # Need to remove frequency dimension # for broadcasting to work later image = image.squeeze(dim="frequency", drop=True) if reference_frequency: _ref_freq = reference_frequency if _ref_freq is None: raise Exception( "reference_frequency is not passed, and " "'frequency' dimension is not present in the input image. " "Can not proceed with power law scaling." ) # Casting channel_multipliers to have same dtype as image # This will reduce the overall size of the cube (as images # are typically float32), but this will reduce the compute # precision of scaled_cube. # A better approach without losing compute precision: # xarray map_blocks on "high precision + low dimensional" arrays, with # immediate downcast inside the delayed task channel_multipliers = np.power( (frequency_range / _ref_freq), -spectral_index ).astype(image.dtype) channel_multipliers_xdr = xr.DataArray( channel_multipliers, dims=["frequency"], coords={"frequency": frequency_range}, ).chunk({"frequency": freq_chunks}) scaled_cube = image * channel_multipliers_xdr return scaled_cube
[docs] def fit_polynomial_on_visibility(data: xr.DataArray) -> Delayed: """ Perform polynomial fit across frequency axis. Parameters ---------- data A DataArray with dimensions ["time", "baseline_id", "polarization", "frequencies"] in any sequence. Returns ------- A dask delayed call to numpy polynomial polyfit function On compute, it should return a numpy array containing the result of polynomial.polyfit """ mean_vis = data.mean( dim=["time", "baseline_id", "polarization"], skipna=True ) weights = np.isfinite(mean_vis) mean_vis_finite = xr.where(weights, mean_vis, 0.0) xaxis = da.arange(mean_vis_finite.size) return delayed(np.polynomial.polynomial.polyfit)( xaxis, mean_vis_finite, w=weights, deg=1 )