# pylint: disable=too-many-instance-attributes
"""
Functions for manipulation of data. Currently, contains functions for:
1. Applying static RFI mask
2. Aligning pointing and visibility timestamps via interpolation
3. Performing time-frequency-polarisation averaging of the visibility
amplitudes or time only averaging or time and polarisation averaging
of the gain amplitudes
4. Extracting visibility or gain amplitudes for all discrete offset
pointing scans
5. Computing the weighted-average of the valid fitted pointing offsets
6. Creating a PointingTable containing the pointing offsets
"""
import logging
import numpy
from scipy.interpolate import interp1d
from ska_sdp_datamodels.calibration import PointingTable
from ska_sdp_datamodels.science_data_model import ReceptorFrame
from ska_sdp_datamodels.visibility import import_staticmasktable_from_hdf5
from ska_sdp_wflow_pointing_offset.calibration.gt_solve import (
compute_gains,
extract_parallel_hand_gains,
)
from ska_sdp_wflow_pointing_offset.utils import (
antenna_and_offsets_reorder,
create_dict_for_output_offsets,
get_crosscorrelations,
observing_band,
select_channels,
split_array_into_chunks,
weighted_avg_and_std,
)
LOG = logging.getLogger("ska-sdp-pointing-offset")
def _check_matching_freqs(indices):
"""
Checks if there is a match between observing frequencies and
static RFI mask frequencies. This check is performed only when
the observing frequencies are fewer than those of the static RFI mask
:param indices: The start and end indices of the matching
frequencies in the static RFI mask
"""
if len(indices) == 0 or 1 < indices.size > 2:
raise ValueError(
"Start and end frequencies of observation do not "
"fall exactly within those of the static RFI mask!"
)
def _check_obs_mask_freqs(obs_freq, mask_freq):
"""
Checks if the observing frequencies are consistent with
the frequency coverage of the static RFI mask
:param obs_freq: Observing frequencies
:param mask_freq: Static RFI mask frequencies
"""
try:
if obs_freq.shape[1] < mask_freq.shape[1]:
LOG.warning(
"Selecting and applying mask matching observing number "
"of frequency channels"
)
elif obs_freq.shape[1] > mask_freq.shape[1]:
raise IndexError(
"Observation has more frequency channels than static RFI mask!"
)
except AttributeError as exc:
if (
numpy.concatenate(obs_freq).shape[0]
< numpy.concatenate(mask_freq).shape[0]
):
LOG.warning(
"Selecting and applying mask matching observing number "
"of frequency channels"
)
elif (
numpy.concatenate(obs_freq).shape[0]
> numpy.concatenate(mask_freq).shape[0]
):
raise IndexError(
"Observation has more frequency channels than static RFI mask!"
) from exc
def _select_mask_and_split(
mask_freq, rfi_mask, start_freq, end_freq, num_chunks
):
"""
Selects static RFI mask in some frequency range and
split into chunks, including its corresponding frequencies
:param mask_freq: Static RFI mask frequencies
:param rfi_mask: Static RFI mask
:param start_freq: Start frequency in MHz
:param end_freq: End frequency in MHz
:param num_chunks: Number of chunks
:return: RFI mask optionally in the selected frequency range
and its corresponding frequencies split into chunks
"""
if start_freq is not None and end_freq is not None:
mask_freq, mask_channel = select_channels(
mask_freq, numpy.arange(len(mask_freq)), start_freq, end_freq
)
rfi_mask = rfi_mask[mask_channel]
# Split mask and corresponding frequencies into chunks
# and discard top and bottom chunks
rfi_mask = split_array_into_chunks(rfi_mask, num_chunks)
mask_freq = split_array_into_chunks(mask_freq, num_chunks)
return rfi_mask, mask_freq
def _remove_bad_channels(obs_freqs, obs_channels, mask_freq, rfi_mask):
"""
Applies the static RFI mask to the observing frequency chunks and
automatically discards empty chunks
:param obs_freqs: Observing frequencies in MHz in all frequency chunks
:param obs_channels: Observing channels in all frequency chunks
:param mask_freq: Static RFI mask frequencies in MHz in all
frequency chunks
:param rfi_mask: Static RFI mask in all frequency chunks
:return: Observing frequencies and corresponding channels
split into chunks
"""
frequency_per_chunk = []
channel_per_chunk = []
LOG.info("Applying static RFI mask...")
for obs_nu, obs_chan, mask_nu, mask in zip(
obs_freqs, obs_channels, mask_freq, rfi_mask
):
if len(obs_nu) < len(mask_nu):
# The observation has fewer frequencies than the RFI mask
# so check if the observing freqs fall within the frequencies
# of the static RFI mask
indices = numpy.array(
[
i
for i, m_nu in enumerate(mask_nu)
if m_nu in (obs_nu[0], obs_nu[-1])
]
)
_check_matching_freqs(indices)
start_index, end_index = numpy.sort(indices)
mask = mask[start_index : end_index + 1]
# Exclude empty chunks and store non-empty frequency and channel chunks
if obs_nu[mask == 0].size == 0 and obs_chan[mask == 0].size == 0:
continue
frequency_per_chunk.append(obs_nu[mask == 0])
channel_per_chunk.append(obs_chan[mask == 0])
return frequency_per_chunk, channel_per_chunk
[docs]
def apply_rfi_mask(
freqs, channels, start_freq=None, end_freq=None, num_chunks=16
):
"""
Apply RFI mask in HDF5 format pulled from the SKA Telescope Model Data
Repository
:param freqs: 1D array of frequencies in MHz [nchan] if num_chunks=1 and
2D array of frequencies in MHz if num_chunks>1
:param channels: 1D array of channel numbers if num_chunks=1 and
2D array of channel numbers if num_chunks>1
:param num_chunks: Number of frequency chunks
:return: Filtered frequency and channel numbers array
"""
static_mask_table = import_staticmasktable_from_hdf5("rfi_mask.h5")
rfi_mask = static_mask_table.mask.data
mask_freq = static_mask_table.frequency.data / 1.0e6 # Hz -> MHz
rfi_mask, mask_freq = _select_mask_and_split(
mask_freq,
rfi_mask,
start_freq,
end_freq,
num_chunks,
)
# Check if the observing frequencies are fewer or more than those
# in the static RFI mask
_check_obs_mask_freqs(freqs, mask_freq)
# Apply the RFI mask
frequency_per_chunk, channel_per_chunk = _remove_bad_channels(
freqs, channels, mask_freq, rfi_mask
)
# Ensure all the frequency channels are not flagged
if len(frequency_per_chunk) == len(channel_per_chunk) == 0:
raise ValueError("All frequency channels are flagged!")
return frequency_per_chunk, channel_per_chunk
[docs]
def interp_timestamps(origin_data, origin_times, new_times):
"""
Interpolate the on-sky offsets at the visibility timestamps.
:param origin_data: on-sky offsets in degrees with dimensions
(nants, ntimes_origin, 2)
:param origin_times: Original times in MJD [ntimes_origin]
:param new_times: New times (from Visibility) [ntimes_new]
:return: Interpolated on-sky offsets in degrees with dimensions
(nants, ntimes_new, 2)
"""
if origin_data.ndim != 3 or origin_data.shape[2] != 2:
LOG.warning(
"Input offset data has the wrong shape, no interpolation done."
)
return origin_data
ntimes_new = new_times.shape[0]
sort_index = numpy.argsort(origin_times)
origin_times = origin_times[sort_index]
origin_data = origin_data[:, sort_index]
direction_xel = origin_data[:, :, 0]
direction_el = origin_data[:, :, 1]
output = numpy.zeros((origin_data.shape[0], ntimes_new, 2))
for i in range(origin_data.shape[0]):
xel_ant = direction_xel[i]
el_ant = direction_el[i]
interp_xel = interp1d(
x=origin_times, y=xel_ant, kind="zero", fill_value="extrapolate"
)
interp_el = interp1d(
x=origin_times, y=el_ant, kind="zero", fill_value="extrapolate"
)
new_xel_ant = interp_xel(new_times)
new_el_ant = interp_el(new_times)
output[i, :, 0] = new_xel_ant
output[i, :, 1] = new_el_ant
return output
def _avg_amp(data, weights, fit_to_sep_pol=False, fit_to_vis=False):
"""
Computes the weighted-average of the visibility amplitudes in
time, frequency, and polarisation. For the gain amplitudes, the
weighted average is computed in time only (when fitting to the
parallel-hands polarisations) or in time and polarisation (when
fitting to the Stokes I amplitudes)
:param data: Visibility amplitudes with dimensions (ntimes, nants,
nchans, npols) or gain amplitudes with dimensions (ntimes, nants,
npols)
:param weights: Visibility or gain calibration weights. Have the
same dimensions as data
:param fit_to_sep_pol: Fit primary beams to the parallel-hands gain
amplitudes
:param fit_to_vis: Fit primary beams to the cross-correlation visibility
amplitudes instead of the antenna gain amplitudes
:return: Weighted-average visibility or gain amplitudes and weighted
standard deviation. These amplitudes and their standard deviations have
dimension (nants, ) if fitting to the Stokes I visibility and gain
amplitudes and dimensions (nants, npols) when fitting to the
parallel-hands polarisations gain amplitudes.
"""
if fit_to_vis:
# Average in time, frequency, and polarisation
axis = (0, 2, 3)
elif fit_to_sep_pol:
# Average in time only
axis = 0
else:
# Average in time and polarisation
axis = (0, 2)
return weighted_avg_and_std(
data, axis=axis, weights=weights, std_error=False
)
def _create_arrays_for_fitting(
vis_list_size,
ants_size,
num_chunks,
fit_to_sep_pol=False,
):
"""
Creates empty arrays to be populated for the beam fitting
routine
:param vis_list_size: Size of Visibility objects [nscans]
:param ants_size: Size of katpoint Antenna objects [nants]
:param num_chunks: Number of usable frequency chunks (integer)
:param fit_to_sep_pol: Fit primary beam to the parallel-hands gain
or visibility amplitudes instead of the Stokes I amplitudes
:return: Empty array of the Stokes I or parallel-hands polarisation
visibility or gain amplitudes, their standard deviations, and
frequencies in all chunks (should be same for all scans)
"""
frequency_per_chunk = numpy.zeros(num_chunks)
if fit_to_sep_pol:
y_per_scan = numpy.zeros((2, ants_size, num_chunks, vis_list_size))
stddev_per_scan = numpy.zeros(
(2, ants_size, num_chunks, vis_list_size)
)
else:
y_per_scan = numpy.zeros((ants_size, num_chunks, vis_list_size))
stddev_per_scan = numpy.zeros((ants_size, num_chunks, vis_list_size))
return y_per_scan, stddev_per_scan, frequency_per_chunk
# pylint: disable=too-many-locals,too-many-statements
[docs]
def fitted_offsets_weighted_average(ants, fitted_beams, fit_to_vis=False):
"""
Compute the weighted average of the fitted pointing offsets.
:param ants: A list of katpoint Antenna objects [nants].
:param fitted_beams: A dictionary of the fitted beams obtained from
:func:`beam_fitting.py.SolveForOffsets.fit_to_gains` or
:func:`beam_fitting.py.SolveForOffsets.fit_to_vis`.
:param fit_to_vis: Fit primary beam to cross-correlation visibility
amplitudes instead of the antenna gain amplitudes.
:return: The weighted average of the valid fitted cross-elevation
and elevation offsets and their standard deviations in
radians, the theoretical and fitted beamwidths (and
uncertainty) in radians, and the fitted height and its
uncertainty for each dish.
"""
fitted_parameters = create_dict_for_output_offsets(len(ants))
if fit_to_vis:
for antenna_name in fitted_beams.keys():
beams_freq = fitted_beams.get(antenna_name, [])
fitted_parameters["antenna_names"] = antenna_name
beams_freq = [
b for b in beams_freq if b is not None and b.is_valid
]
if not beams_freq:
LOG.warning(
"%s had no valid primary beam fitted", antenna_name
)
continue
# Get the fitted centres, width, height and their uncertainties
offsets_freq = numpy.array([b.centre for b in beams_freq])
offsets_freq_stderr = numpy.array(
[b.std_centre for b in beams_freq]
)
width_freq = numpy.array([b.width for b in beams_freq])
width_freq_stderr = numpy.array([b.std_width for b in beams_freq])
height_freq = numpy.array([b.height for b in beams_freq])
height_freq_stderr = numpy.array(
[b.std_height for b in beams_freq]
)
expected_width_freq = numpy.array(
[b.expected_width for b in beams_freq]
)
# Do a weighted-average of the fitted parameters over
# frequency chunks. The expected width is just the
# average since it has no uncertainty
if len(offsets_freq) > 1:
pointing_offset, pointing_offset_stderr = weighted_avg_and_std(
data=offsets_freq,
axis=0,
weights=1.0 / offsets_freq_stderr**2,
std_error=True,
)
width, width_stderr = weighted_avg_and_std(
data=width_freq,
axis=0,
weights=1.0 / width_freq_stderr**2,
std_error=True,
)
height, height_stderr = weighted_avg_and_std(
data=height_freq,
axis=0,
weights=1.0 / height_freq_stderr**2,
std_error=True,
)
else:
pointing_offset = numpy.squeeze(offsets_freq)
pointing_offset_stderr = numpy.squeeze(offsets_freq_stderr)
width = numpy.squeeze(width_freq)
width_stderr = numpy.squeeze(width_freq_stderr)
height = height_freq
height_stderr = height_freq_stderr
expected_width = expected_width_freq.mean(axis=0)
# Store the fitted parameters
fitted_parameters["xel_offset"] = numpy.radians(pointing_offset[0])
fitted_parameters["xel_offset_std"] = numpy.radians(
pointing_offset_stderr[0]
)
fitted_parameters["el_offset"] = numpy.radians(pointing_offset[1])
fitted_parameters["el_offset_std"] = numpy.radians(
pointing_offset_stderr[1]
)
fitted_parameters["expected_width"] = numpy.radians(expected_width)
fitted_parameters["fitted_width"] = numpy.radians(width)
fitted_parameters["fitted_width_std"] = numpy.radians(width_stderr)
fitted_parameters["fitted_height"] = height
fitted_parameters["fitted_height_std"] = height_stderr
else:
for i, antenna in enumerate(ants):
fitted_parameters["antenna_names"][i] = antenna.name
beams_freq = fitted_beams.get(antenna.name, [])
beams_freq = [
b for b in beams_freq if b is not None and b.is_valid
]
if not beams_freq:
LOG.warning(
"%s had no valid primary beam fitted", antenna.name
)
continue
# Get the fitted centres, width, height and their uncertainties
offsets_freq = numpy.array([b.centre for b in beams_freq])
offsets_freq_stderr = numpy.array(
[b.std_centre for b in beams_freq]
)
width_freq = numpy.array([b.width for b in beams_freq])
width_freq_stderr = numpy.array([b.std_width for b in beams_freq])
height_freq = numpy.array([b.height for b in beams_freq])
height_freq_stderr = numpy.array(
[b.std_height for b in beams_freq]
)
expected_width_freq = numpy.array(
[b.expected_width for b in beams_freq]
)
# Do a weighted-average of the fitted parameters over
# frequency chunks. The expected width is just the
# average since it has no uncertainty
if len(offsets_freq) > 1:
pointing_offset, pointing_offset_stderr = weighted_avg_and_std(
data=offsets_freq,
axis=0,
weights=1.0 / offsets_freq_stderr**2,
std_error=True,
)
width, width_stderr = weighted_avg_and_std(
data=width_freq,
axis=0,
weights=1.0 / width_freq_stderr**2,
std_error=True,
)
height, height_stderr = weighted_avg_and_std(
data=height_freq,
axis=0,
weights=1.0 / height_freq_stderr**2,
std_error=True,
)
else:
pointing_offset = numpy.squeeze(offsets_freq)
pointing_offset_stderr = numpy.squeeze(offsets_freq_stderr)
width = numpy.squeeze(width_freq)
width_stderr = numpy.squeeze(width_freq_stderr)
height = height_freq
height_stderr = height_freq_stderr
expected_width = expected_width_freq.mean(axis=0)
# Store the fitted parameters
fitted_parameters["xel_offset"][i] = numpy.radians(
pointing_offset[0]
)
fitted_parameters["xel_offset_std"][i] = numpy.radians(
pointing_offset_stderr[0]
)
fitted_parameters["el_offset"][i] = numpy.radians(
pointing_offset[1]
)
fitted_parameters["el_offset_std"][i] = numpy.radians(
pointing_offset_stderr[1]
)
fitted_parameters["expected_width"][i] = numpy.radians(
expected_width
)
fitted_parameters["fitted_width"][i] = numpy.radians(width)
fitted_parameters["fitted_width_std"][i] = numpy.radians(
width_stderr
)
fitted_parameters["fitted_height"][i] = height
fitted_parameters["fitted_height_std"][i] = height_stderr
return fitted_parameters
# pylint: disable=too-many-arguments,too-many-positional-arguments
[docs]
def create_offset_pointing_table(
output_offsets,
frequency_per_chunk,
reference_mjd,
reference_commanded_azel,
vis_list,
x_per_scan,
track_duration,
):
"""
Writes the computed pointing offsets and other fitted parameters
into a PointingTable.
The pointing offsets in cross-elevation and elevation in radians
are stored as the `pointing` variable.
The expected width in the H and V directions in radians are stored
as the `expected_width` variable.
The fitted width in the H and V directions in radians are stored
as the `fitted_width` variable.
The fitted height in arbitrary units is stored as the `fitted_height`
variable.
The uncertainties on the fitted width and height are stored as the
`fitted_width_std` and `fitted_height_std` variables respectively.
The uncertainties on the pointing offsets in the `pointing` variable
are converted to weights as the inverse square of the uncertainties
and stored as the `weight` variable.
:param output_offsets: A list of dictionary of the weighted-average
pointing offsets output by :func:`array_data_func.weighted_average`
when fitting to the parallel-hands amplitudes and a dictioanry of
these pointing offsets when fitting to the Stokes I amplitudes.
:param frequency_per_chunk: 1D array of the averaged-frequency in each
chunk in Hz [num_chunks]
:param reference_mjd: Reference timestamp (s) in MJD
:param reference_commanded_azel: Commanded AzEl at the `reference_mjd`
in radians with dimension (nants, 2) when fitting to the gain
amplitudes. When fitting to the visibility amplitudes, the dimension
is (ntimes, nants, 2) where ntimes is the number of the `reference_mjd`
which is expected to be 2
:param vis_list: List of Visibility
:param x_per_scan: The commanded pointings relative to the target in
the reference pointing observation in xEl-El coordinates for one
antenna with dimensions (nscans, 2)
:param track_duration: A list of how long each scan was tracked for in
seconds
:param num_chunks: Number of frequency chunks (integer)
:return: PointingTable of the computed pointing offsets
and other fitted parameters for each antenna
"""
reference_mjd = numpy.array(reference_mjd).ravel()
discrete_offset = numpy.zeros((x_per_scan.shape[0], 2))
for i in range(x_per_scan.shape[0]):
discrete_offset[i] = numpy.stack((x_per_scan[i, 0], x_per_scan[i, 1]))
discrete_offset = discrete_offset.round(2)
band_type = observing_band(frequency_per_chunk)
scan_mode = f"{x_per_scan.shape[0]}-point"
# We expect equal track duration for each pointing scan
# so we will only write the value of just one scan
# to the PointingTable
track_duration = track_duration[0]
if reference_commanded_azel is not None:
if isinstance(reference_commanded_azel, list):
# Two-dish mode scenario so will have dimensions
# (nobs/ntimes, nants, 2), where nobs=2 because
# there are two sets of observations and in each
# observation, the reference time is calculated.
# Each set of reference commanded azel has shape
# (nants, 2) and will be reshaped to (1, nants, 1, 1, 2)
reference_commanded_azel = numpy.radians(
numpy.stack(
(reference_commanded_azel[0], reference_commanded_azel[1])
)
)
reference_commanded_azel = reference_commanded_azel[
:, :, numpy.newaxis, numpy.newaxis
]
else:
# Three-dish mode scenario
# The reference commanded azel has shape (nants, ntimes, 2)
# but will be reshaped to (ntimes, nants, 1, 1, 2)
reference_commanded_azel = numpy.radians(
reference_commanded_azel[
numpy.newaxis, :, numpy.newaxis, numpy.newaxis, :
]
)
# In the two-dish mode, ensure the fitted offsets are matched
# to the right antennas
if (
output_offsets["antenna_names"]
!= vis_list[0][0].attrs["configuration"].names.data
).all():
LOG.info(
"Order of Antenna names and antenna names in Configuration "
"do not match. Fixing it and their corresponding "
"fitted parameters before writing to PointingTable"
)
output_offsets = antenna_and_offsets_reorder(output_offsets)
# Extract the cross-elevation and elevation offsets
pointing_offsets = numpy.dstack(
(
output_offsets["xel_offset"],
output_offsets["el_offset"],
)
)
pointing_offsets = pointing_offsets[
:, :, numpy.newaxis, numpy.newaxis, :
].repeat(len(reference_mjd), axis=0)
# Extract the uncertainties on the cross-elevation and
# elevation offsets
pointing_offsets_std = numpy.dstack(
(
output_offsets["xel_offset_std"],
output_offsets["el_offset_std"],
)
)
pointing_offsets_std = pointing_offsets_std[
:, :, numpy.newaxis, numpy.newaxis, :
].repeat(len(reference_mjd), axis=0)
weights = 1.0 / pointing_offsets_std**2
# Extract the expected width
expected_width = output_offsets["expected_width"]
expected_width = expected_width[
numpy.newaxis, :, numpy.newaxis, numpy.newaxis, :
].repeat(len(reference_mjd), axis=0)
# Extract the fitted width
fitted_width = output_offsets["fitted_width"]
fitted_width = fitted_width[
numpy.newaxis, :, numpy.newaxis, numpy.newaxis, :
].repeat(len(reference_mjd), axis=0)
# Extract the uncertainties on the fitted width
fitted_width_std = output_offsets["fitted_width_std"]
fitted_width_std = fitted_width_std[
numpy.newaxis, :, numpy.newaxis, numpy.newaxis, :
].repeat(len(reference_mjd), axis=0)
# Extract the fitted height
fitted_height = output_offsets["fitted_height"]
fitted_height = fitted_height[
numpy.newaxis, :, numpy.newaxis, numpy.newaxis
].repeat(len(reference_mjd), axis=0)
# Extract the uncertainties on the fitted height
fitted_height_std = output_offsets["fitted_height_std"]
fitted_height_std = fitted_height_std[
numpy.newaxis, :, numpy.newaxis, numpy.newaxis
].repeat(len(reference_mjd), axis=0)
# The Cross-El and El offsets are in radians so
# we write them directly into the PointingTable
return PointingTable.constructor(
pointing=pointing_offsets,
expected_width=expected_width,
fitted_width=fitted_width,
fitted_width_std=fitted_width_std,
fitted_height=fitted_height,
fitted_height_std=fitted_height_std,
time=reference_mjd,
weight=weights,
frequency=numpy.ravel(frequency_per_chunk.mean()),
receptor_frame=ReceptorFrame("stokesI"),
pointing_frame="xel-el",
band_type=band_type,
scan_mode=scan_mode,
track_duration=track_duration,
discrete_offset=discrete_offset,
commanded_pointing=reference_commanded_azel,
pointingcentre=vis_list[0][0].phasecentre,
configuration=vis_list[0][0].configuration,
)