import logging
from typing import Tuple
import dask.array as da
import numpy as np
import xarray as xr
from dask.delayed import Delayed, delayed
from ska_sdp_piper.piper.utils import delayed_log
logger = logging.getLogger()
[docs]
def report_peak_visibility(visibility, unit):
abs_visibility = np.abs(visibility)
max_freq_axis = abs_visibility.max(
dim=["time", "baseline_id", "polarization"]
)
peak_channel = max_freq_axis.argmax()
peak_frequency = max_freq_axis.idxmax()
max_visibility = abs_visibility.max()
return delayed_log(
logger.info,
"Peak visibility Channel: {peak_channel}."
" Frequency: {peak_frequency} {unit}."
" Peak Visibility: {max_visibility}",
peak_channel=peak_channel.data,
peak_frequency=peak_frequency.data,
max_visibility=max_visibility.data,
unit=unit,
)
[docs]
def apply_power_law_scaling(
image: xr.DataArray,
frequency_range: np.ndarray,
reference_frequency: float = None,
spectral_index: float = 0.75,
freq_chunks: Tuple[int] = None,
) -> xr.DataArray:
r"""
Apply power law scaling on a continuum image and return a scaled cube.
The "image" must be a xarray dataarray.
If "frequency" dimension of size 1 is present in image,
then that value is used as reference frequency for scaling.
Else, user can pass any reference frequency
as "reference_frequency" argument, which takes preference.
If "data" attribute of image is a dask array, then this will return
a dataarray which wraps a new dask array containing operations
for power law scaling. This means, the values in returned dataarray
are not computed eagerly.
The formula for power law scaling is given as:
.. math::
S2 = S1 * ( \nu2 / \nu1 ) ^ {-\alpha}
Parameters
----------
image
Image data array. The "data" attribute of dataarray can either
be a numpy array or a dask array.
The shape can either be 2D (x, y) or 3D (x, y, polarisation)
frequency_range
Frequency range in hertz over which to scale the data.
reference_frequency
Refernce frequency in hertz. If not passed, function expects that
a frequency coordinate is present in the image. If passed, this
takes priority over the frequency cordinates of image.
spectral_index
Spectral index (alpha) used in power law scaling.
freq_chunks
Chunks corresponding to frequency channel
By default, frequency dimension will not be chunked
Returns
-------
A 3D or 4D scaled spectral cube
"""
_ref_freq = None
if "frequency" in image.dims:
if image.frequency.size != 1:
logger.warning(
"Can not apply power law scaling on a spectral cube."
"Ignoring passed argument and continuing pipeline"
)
return image
_ref_freq = image.frequency.data
# Need to remove frequency dimension
# for broadcasting to work later
image = image.squeeze(dim="frequency", drop=True)
if reference_frequency:
_ref_freq = reference_frequency
if _ref_freq is None:
raise Exception(
"reference_frequency is not passed, and "
"'frequency' dimension is not present in the input image. "
"Can not proceed with power law scaling."
)
# Casting channel_multipliers to have same dtype as image
# This will reduce the overall size of the cube (as images
# are typically float32), but this will reduce the compute
# precision of scaled_cube.
# A better approach without losing compute precision:
# xarray map_blocks on "high precision + low dimensional" arrays, with
# immediate downcast inside the delayed task
channel_multipliers = np.power(
(frequency_range / _ref_freq), -spectral_index
).astype(image.dtype)
channel_multipliers_xdr = xr.DataArray(
channel_multipliers,
dims=["frequency"],
coords={"frequency": frequency_range},
).chunk({"frequency": freq_chunks})
scaled_cube = image * channel_multipliers_xdr
return scaled_cube
[docs]
def fit_polynomial_on_visibility(data: xr.DataArray) -> Delayed:
"""
Perform polynomial fit across frequency axis.
Parameters
----------
data
A DataArray with dimensions ["time", "baseline_id", "polarization",
"frequencies"] in any sequence.
Returns
-------
A dask delayed call to numpy polynomial polyfit function
On compute, it should return a numpy array containing the
result of polynomial.polyfit
"""
mean_vis = data.mean(
dim=["time", "baseline_id", "polarization"], skipna=True
)
weights = np.isfinite(mean_vis)
mean_vis_finite = xr.where(weights, mean_vis, 0.0)
xaxis = da.arange(mean_vis_finite.size)
return delayed(np.polynomial.polynomial.polyfit)(
xaxis, mean_vis_finite, w=weights, deg=1
)