from typing import Literal, Union
import dask.array as da
import numpy as np
from ska_sdp_datamodels.calibration import GainTable
from ska_sdp_datamodels.science_data_model import ReceptorFrame
from ska_sdp_datamodels.visibility import Visibility
from .solution_interval import SolutionIntervals
[docs]
def create_gaintable_from_visibility(
vis: Visibility,
timeslice: Union[float, Literal["auto", "full"], None] = None,
jones_type: Literal["T", "G", "B"] = "T",
lower_precision: bool = True,
skip_default_chunk: bool = False,
) -> GainTable:
"""
Create a unity- or identity-initialised GainTable consistent with the
given Visibility.
GainTable either represents:
- a collection of complex-valued scalar gains, if Visibility carries only
Stokes I data.
- a collection of 2x2 complex-valued Jones matrices otherwise.
In the first case, gains are initialised to unity. In the second case,
Jones matrices are initialised to the identity matrix.
Parameters
----------
vis
Visibility to create gaintable from
timeslice
Defines the time scale over which each gain solution is
valid. This is used to define the time axis of the GainTable. This
parameter is interpreted as follows depending on its type:
- ``float``: this is a custom time interval in seconds.
Input timestamps are grouped by intervals of this duration,
and said groups are separately averaged to produce
the output time axis.
- "full": create a single solution across the entire visibility time
- "auto" or ``None``: match the time resolution of the input
visibility, i.e. copy the time axis of the input Visibility
jones_type
Type of Jones term, one of {"T", "G", "B"}.
The frequency axis of the output GainTable depends on the value
provided:
- "B": the output frequency axis is the same as that of the input
Visibility.
- "T" or "G": solution is assumed to be frequency-independent,
and the frequency axis of the output contains a single value: the
average frequency of the input Visibility's channels.
lower_precision
Used to set up the float bit sizes while initialising the gaintable.
If true, uses np.complex64 and np.float32 instead of higher precision
np.complex128 and np.float64. Useful for memory optimization.
skip_default_chunk
If set to true, skips Dask/Xarray chunking of data in alignment to the
input visibility. Useful in cases of chunk alignment issues.
Default: False
Returns
-------
Gaintable suitable for storing calibration solutions of given
visibility
"""
# Backward compatibility. Should be removed as "auto" is vary vague
if timeslice == "auto":
timeslice = None
soln_intervals = SolutionIntervals(vis.time.data, timeslice)
ntimes = soln_intervals.size
nants = vis.visibility_acc.nants
# Set the frequency sampling
if jones_type == "B":
gain_frequency = vis.frequency.data
nfrequency = len(gain_frequency)
elif jones_type in ("G", "T"):
gain_frequency = np.mean(vis.frequency.data, keepdims=True)
nfrequency = 1
else:
raise ValueError(f"Unknown Jones type {jones_type}")
# There is only one receptor frame in Visibility
# Use it for both receptor1 and receptor2
receptor_frame = ReceptorFrame(vis.visibility_acc.polarisation_frame.type)
nrec = receptor_frame.nrec
gain_shape = [ntimes, nants, nfrequency, nrec, nrec]
# Create data variables with provided precision and backend
if lower_precision:
complex_dtype, float_dtype = np.complex64, np.float32
else:
complex_dtype, float_dtype = np.complex128, np.float64
gain = da.broadcast_to(da.eye(nrec, dtype=complex_dtype), gain_shape)
gain_weight = da.ones(gain_shape, dtype=float_dtype)
gain_residual = da.zeros(
[ntimes, nfrequency, nrec, nrec], dtype=float_dtype
)
gain_table = GainTable.constructor(
gain=gain,
time=soln_intervals.solution_time,
interval=soln_intervals.intervals,
weight=gain_weight,
residual=gain_residual,
frequency=gain_frequency,
receptor_frame=receptor_frame,
phasecentre=vis.phasecentre,
configuration=vis.configuration,
jones_type=jones_type,
)
# Attach solution interval slices as attribute
gain_table.attrs["soln_interval_slices"] = soln_intervals.indices
# Chunk data variables
if skip_default_chunk:
return gain_table
gain_table = gain_table.chunk(time=1)
if gain_table.frequency.size == vis.frequency.size:
gain_table = gain_table.chunk(frequency=vis.chunksizes["frequency"])
return gain_table
[docs]
def reset_gaintable(gaintable: GainTable) -> GainTable:
"""
Returns a new dask-backed gaintable with all data variables resetted
to their initial sensible values.
Parameters
----------
gaintable
Gaintable object to be reset
Returns
-------
Gaintable with data variables resetted to their intial sensible values.
"""
gain_shape = gaintable.gain.shape
nrec = gain_shape[-1]
gain = da.broadcast_to(
da.eye(nrec, dtype=gaintable.gain.dtype), gain_shape
)
weight = da.ones(gaintable.weight.shape, dtype=gaintable.weight.dtype)
residual = da.zeros(
gaintable.residual.shape, dtype=gaintable.residual.dtype
)
# Deepcopy and change data variables
# Simpler and less prone to errors than "assign"
new_gaintable = gaintable.copy(deep=True)
new_gaintable.gain.data = gain
new_gaintable.weight.data = weight
new_gaintable.residual.data = residual
return new_gaintable
[docs]
def divide_bandpass_by_ref_ant_preserve_phase(
gaintable: GainTable, ref_ant: int
) -> GainTable:
"""
Original code referred from https://github.com/flint-crew/flint.git
Divide the bandpass complex gains (solved for initially by something
like calibrate) by a nominated reference antenna. In the case of
``calibrate`` there is no implicit reference antenna. This is valid for
cases where the xy-phase is set to 0 (true via the ASKAP on-dish
calibrator).
This particular function is most appropriate for the `calibrate` style
solutions, which solve for the Jones in one step. In HMS notation this
are normally split into two separate 2x2 matrices, one for the gains
with zero off-diagonal elements and a leakage matrix with ones on
the diagonal.
This is the preferred function to use whena attempting to set a
phase reference antenna to precomputed Jones bandpass solutions.
The input complex gains should be in the form:
>> (ant, channel, pol)
Internally reference phasores are constructed for the G_x and G_y
terms of the reference antenna. They are then applied:
>> G_x = G_x / G_xref
>> G_xyp = G_xy / G_yref
>> G_yxp = G_yx / G_xref
>> G_y = G_y / G_yref
which is applied to all antennas in ``complex_gains``.
Args:
complex_gains (np.ndarray): The complex gains that will be normalised
ref_ant (int): The desired reference antenna to use
Returns:
GainTable: The normalised bandpass solutions
"""
gains = np.copy(gaintable.gain.data)
g_x = gains[..., 0, 0]
g_y = gains[..., 1, 1]
g_xy = gains[..., 0, 1]
g_yx = gains[..., 1, 0]
ref_g_x = gains[:, ref_ant : ref_ant + 1, :, 0, 0] # noqa: E203
ref_g_x = ref_g_x / np.abs(ref_g_x)
ref_g_y = gains[:, ref_ant : ref_ant + 1, :, 1, 1] # noqa: E203
ref_g_y = ref_g_y / np.abs(ref_g_y)
g_x_prime = g_x / ref_g_x
g_y_prime = g_y / ref_g_y
g_xy_prime = g_xy / ref_g_y
g_yx_prime = g_yx / ref_g_x
bp_p = np.zeros_like(gains) * np.nan + 1j * np.zeros_like(gains) * np.nan
bp_p[..., 0, 0] = g_x_prime
bp_p[..., 1, 1] = g_y_prime
bp_p[..., 0, 1] = g_xy_prime
bp_p[..., 1, 0] = g_yx_prime
new_gain = gaintable.gain.copy()
new_gain.data = bp_p
return gaintable.assign(
{
"gain": new_gain,
}
).chunk(gaintable.chunks)