Source code for ska_sdp_instrumental_calibration.xarray_processors.delay

import dask
import dask.array as da
import numpy as np
import xarray as xr


[docs] def calculate_delay(gaintable: xr.Dataset, oversample) -> xr.Dataset: """ Applies the delay to the given gaintable Parameters ---------- gaintable: xr.Dataset Gaintable oversample: int Oversample rate required for the delay Returns ------- xr.DataSet delay """ nstations = len(gaintable.antenna) Xgain = gaintable.gain[0, :, :, 0, 0] Ygain = gaintable.gain[0, :, :, 1, 1] Xdelay_coarse = coarse_delay(Xgain, oversample) Xdelay, Xoffset = update_delay( gaintable, da.zeros(nstations, dtype=float), Xdelay_coarse, pol=0, ) Ydelay_coarse = coarse_delay(Ygain, oversample) Ydelay, Yoffset = update_delay( gaintable, da.zeros(nstations, dtype=float), Ydelay_coarse, pol=1, ) return xr.Dataset( data_vars=dict( delay=( ["time", "antenna", "pol"], np.stack([Xdelay, Ydelay], axis=1).reshape(1, nstations, 2), ), offset=( ["time", "antenna", "pol"], np.stack([Xoffset, Yoffset], axis=1).reshape(1, nstations, 2), ), ), coords=dict( antenna=gaintable.antenna, pol=["XX", "YY"], time=gaintable.time ), attrs=dict(configuration=gaintable.configuration), )
[docs] def apply_delay(gaintable: xr.Dataset, delay: xr.Dataset) -> xr.Dataset: """ Applies the delay to the given gaintable Parameters ---------- gaintable: xr.Dataset Gaintable oversample: int Oversample rate required for the delay Returns ------- xr.Dataset Gaintable with updated gains """ new_gain_data = gaintable.gain.data.copy() Xgain = gaintable.gain[0, :, :, 0, 0] Ygain = gaintable.gain[0, :, :, 1, 1] Xdelay = delay.delay.data[0, :, 0] Xoffset = delay.offset.data[0, :, 0] Ydelay = delay.delay.data[0, :, 1] Yoffset = delay.offset.data[0, :, 1] new_gain_data[0, :, :, 0, 0] = calculate_gain_rot( Xgain, Xdelay, Xoffset, gaintable.frequency.data.reshape(1, -1) ) new_gain_data[0, :, :, 1, 1] = calculate_gain_rot( Ygain, Ydelay, Yoffset, gaintable.frequency.data.reshape(1, -1) ) new_gain = gaintable.gain.copy() new_gain.data = new_gain_data return gaintable.assign( { "gain": new_gain, } ).chunk(gaintable.chunks)
[docs] def update_delay(gaintable, _offset, delay, pol): """ Updates the delay to the gains Parameters ---------- gaintable: xr.Dataset Gaintable _offset: np.array Calculated offset per station delay: np.array Calculated delays per station pol: int Polarisations of the gains Returns ------- np.array Updated delay and offset. """ freq = gaintable.frequency.data.reshape(1, -1) gains = gaintable.gain.data[0, :, :, pol, pol] wgt = gaintable.weight.data[0, :, :, pol, pol] gains_rot = calculate_gain_rot(gains, delay, _offset, freq) cycles = da.from_delayed( calculate_cycles(gains_rot), gains.shape, gains_rot.real.dtype ) denom = np.sum(wgt, axis=1) * np.sum(wgt * freq * freq, axis=1) - np.sum( wgt * freq, axis=1 ) * np.sum(wgt * freq, axis=1) offset = ( _offset + ( np.sum(wgt * freq * freq, axis=1) * np.sum(wgt * cycles, axis=1) - np.sum(wgt * freq, axis=1) * np.sum(wgt * freq * cycles, axis=1) ) / denom ) updated_delay = ( np.sum(wgt, axis=1) * np.sum(wgt * cycles * freq, axis=1) - np.sum(wgt * cycles, axis=1) * np.sum(wgt * freq, axis=1) ) / denom return delay + updated_delay, offset
@dask.delayed def calculate_cycles(gains_rot): return np.unwrap(np.angle(gains_rot)) / (2 * np.pi)
[docs] def coarse_delay(gains, oversample): """ Calculates the coarse delay Parameters ---------- frequency: xarray Frequency of the gains gains: xarray Gains from previous calibration step oversample: int Oversample rate Returns ------- np.array Array of coarse delays for all stations """ nstations = len(gains.antenna) nchan = len(gains.frequency) frequency = gains.frequency N = oversample * nchan padded_gains = da.zeros((nstations, N), "complex") gain_start_index = N // 2 - nchan // 2 gain_stop_index = gain_start_index + nchan padded_gains[:, gain_start_index:gain_stop_index] = gains delay_spec = da.fft.fftshift(da.fft.fft(padded_gains, axis=1), axes=(1,)) delay = ((1 / oversample) / (frequency[-1] - frequency[0]).data) * ( -N // 2 + da.arange(N) ) return delay[da.abs(delay_spec).argmax(axis=1)]
[docs] def calculate_gain_rot(gain, delay, offset, freq): """ Calculates gain rotation Parameters ---------- gain: xarray Gains delay: np.array Delays offset: np.array Offset freq: xarray Frequency Returns ------- np.array Array of calculated gain rotation """ return gain * np.exp(-2j * np.pi * (offset + (delay.T * freq.T))).T