Source code for ska_sdp_wflow_pointing_offset.array_data_func

# pylint: disable=too-many-instance-attributes

"""
Functions for manipulation of data. Currently, contains functions for:

1. Applying static RFI mask
2. Aligning pointing and visibility timestamps via interpolation

3. Performing time-frequency-polarisation averaging of the visibility
   amplitudes or time only averaging or time and polarisation averaging
   of the gain amplitudes

4. Extracting visibility or gain amplitudes for all discrete offset
   pointing scans

5. Computing the weighted-average of the valid fitted pointing offsets
6. Creating a PointingTable containing the pointing offsets
"""

import logging

import numpy
from scipy.interpolate import interp1d
from ska_sdp_datamodels.calibration import PointingTable
from ska_sdp_datamodels.science_data_model import ReceptorFrame
from ska_sdp_datamodels.visibility import import_staticmasktable_from_hdf5

from ska_sdp_wflow_pointing_offset.calibration.gt_solve import (
    compute_gains,
    extract_parallel_hand_gains,
)
from ska_sdp_wflow_pointing_offset.utils import (
    antenna_and_offsets_reorder,
    create_dict_for_output_offsets,
    get_crosscorrelations,
    observing_band,
    select_channels,
    split_array_into_chunks,
    weighted_avg_and_std,
)

LOG = logging.getLogger("ska-sdp-pointing-offset")


def _check_matching_freqs(indices):
    """
    Checks if there is a match between observing frequencies and
    static RFI mask frequencies. This check is performed only when
    the observing frequencies are fewer than those of the static RFI mask

    :param indices: The start and end indices of the matching
        frequencies in the static RFI mask
    """
    if len(indices) == 0 or 1 < indices.size > 2:
        raise ValueError(
            "Start and end frequencies of observation do not "
            "fall exactly within those of the static RFI mask!"
        )


def _check_obs_mask_freqs(obs_freq, mask_freq):
    """
    Checks if the observing frequencies are consistent with
    the frequency coverage of the static RFI mask

    :param obs_freq: Observing frequencies
    :param mask_freq: Static RFI mask frequencies
    """
    try:
        if obs_freq.shape[1] < mask_freq.shape[1]:
            LOG.warning(
                "Selecting and applying mask matching observing number "
                "of frequency channels"
            )
        elif obs_freq.shape[1] > mask_freq.shape[1]:
            raise IndexError(
                "Observation has more frequency channels than static RFI mask!"
            )
    except AttributeError as exc:
        if (
            numpy.concatenate(obs_freq).shape[0]
            < numpy.concatenate(mask_freq).shape[0]
        ):
            LOG.warning(
                "Selecting and applying mask matching observing number "
                "of frequency channels"
            )
        elif (
            numpy.concatenate(obs_freq).shape[0]
            > numpy.concatenate(mask_freq).shape[0]
        ):
            raise IndexError(
                "Observation has more frequency channels than static RFI mask!"
            ) from exc


def _select_mask_and_split(
    mask_freq, rfi_mask, start_freq, end_freq, num_chunks
):
    """
    Selects static RFI mask in some frequency range and
    split into chunks, including its corresponding frequencies

    :param mask_freq: Static RFI mask frequencies
    :param rfi_mask: Static RFI mask
    :param start_freq: Start frequency in MHz
    :param end_freq: End frequency in MHz
    :param num_chunks: Number of chunks

    :return: RFI mask optionally in the selected frequency range
        and its corresponding frequencies split into chunks
    """
    if start_freq is not None and end_freq is not None:
        mask_freq, mask_channel = select_channels(
            mask_freq, numpy.arange(len(mask_freq)), start_freq, end_freq
        )
        rfi_mask = rfi_mask[mask_channel]

    # Split mask and corresponding frequencies into chunks
    # and discard top and bottom chunks
    rfi_mask = split_array_into_chunks(rfi_mask, num_chunks)
    mask_freq = split_array_into_chunks(mask_freq, num_chunks)

    return rfi_mask, mask_freq


def _remove_bad_channels(obs_freqs, obs_channels, mask_freq, rfi_mask):
    """
    Applies the static RFI mask to the observing frequency chunks and
    automatically discards empty chunks

    :param obs_freqs: Observing frequencies in MHz in all frequency chunks
    :param obs_channels: Observing channels in all frequency chunks
    :param mask_freq: Static RFI mask frequencies in MHz in all
        frequency chunks
    :param rfi_mask: Static RFI mask in all frequency chunks

    :return: Observing frequencies and corresponding channels
        split into chunks
    """
    frequency_per_chunk = []
    channel_per_chunk = []
    LOG.info("Applying static RFI mask...")
    for obs_nu, obs_chan, mask_nu, mask in zip(
        obs_freqs, obs_channels, mask_freq, rfi_mask
    ):
        if len(obs_nu) < len(mask_nu):
            # The observation has fewer frequencies than the RFI mask
            # so check if the observing freqs fall within the frequencies
            # of the static RFI mask
            indices = numpy.array(
                [
                    i
                    for i, m_nu in enumerate(mask_nu)
                    if m_nu in (obs_nu[0], obs_nu[-1])
                ]
            )
            _check_matching_freqs(indices)
            start_index, end_index = numpy.sort(indices)
            mask = mask[start_index : end_index + 1]

        # Exclude empty chunks and store non-empty frequency and channel chunks
        if obs_nu[mask == 0].size == 0 and obs_chan[mask == 0].size == 0:
            continue
        frequency_per_chunk.append(obs_nu[mask == 0])
        channel_per_chunk.append(obs_chan[mask == 0])

    return frequency_per_chunk, channel_per_chunk


[docs] def apply_rfi_mask( freqs, channels, start_freq=None, end_freq=None, num_chunks=16 ): """ Apply RFI mask in HDF5 format pulled from the SKA Telescope Model Data Repository :param freqs: 1D array of frequencies in MHz [nchan] if num_chunks=1 and 2D array of frequencies in MHz if num_chunks>1 :param channels: 1D array of channel numbers if num_chunks=1 and 2D array of channel numbers if num_chunks>1 :param num_chunks: Number of frequency chunks :return: Filtered frequency and channel numbers array """ static_mask_table = import_staticmasktable_from_hdf5("rfi_mask.h5") rfi_mask = static_mask_table.mask.data mask_freq = static_mask_table.frequency.data / 1.0e6 # Hz -> MHz rfi_mask, mask_freq = _select_mask_and_split( mask_freq, rfi_mask, start_freq, end_freq, num_chunks, ) # Check if the observing frequencies are fewer or more than those # in the static RFI mask _check_obs_mask_freqs(freqs, mask_freq) # Apply the RFI mask frequency_per_chunk, channel_per_chunk = _remove_bad_channels( freqs, channels, mask_freq, rfi_mask ) # Ensure all the frequency channels are not flagged if len(frequency_per_chunk) == len(channel_per_chunk) == 0: raise ValueError("All frequency channels are flagged!") return frequency_per_chunk, channel_per_chunk
[docs] def interp_timestamps(origin_data, origin_times, new_times): """ Interpolate the on-sky offsets at the visibility timestamps. :param origin_data: on-sky offsets in degrees with dimensions (nants, ntimes_origin, 2) :param origin_times: Original times in MJD [ntimes_origin] :param new_times: New times (from Visibility) [ntimes_new] :return: Interpolated on-sky offsets in degrees with dimensions (nants, ntimes_new, 2) """ if origin_data.ndim != 3 or origin_data.shape[2] != 2: LOG.warning( "Input offset data has the wrong shape, no interpolation done." ) return origin_data ntimes_new = new_times.shape[0] sort_index = numpy.argsort(origin_times) origin_times = origin_times[sort_index] origin_data = origin_data[:, sort_index] direction_xel = origin_data[:, :, 0] direction_el = origin_data[:, :, 1] output = numpy.zeros((origin_data.shape[0], ntimes_new, 2)) for i in range(origin_data.shape[0]): xel_ant = direction_xel[i] el_ant = direction_el[i] interp_xel = interp1d( x=origin_times, y=xel_ant, kind="zero", fill_value="extrapolate" ) interp_el = interp1d( x=origin_times, y=el_ant, kind="zero", fill_value="extrapolate" ) new_xel_ant = interp_xel(new_times) new_el_ant = interp_el(new_times) output[i, :, 0] = new_xel_ant output[i, :, 1] = new_el_ant return output
def _avg_amp(data, weights, fit_to_sep_pol=False, fit_to_vis=False): """ Computes the weighted-average of the visibility amplitudes in time, frequency, and polarisation. For the gain amplitudes, the weighted average is computed in time only (when fitting to the parallel-hands polarisations) or in time and polarisation (when fitting to the Stokes I amplitudes) :param data: Visibility amplitudes with dimensions (ntimes, nants, nchans, npols) or gain amplitudes with dimensions (ntimes, nants, npols) :param weights: Visibility or gain calibration weights. Have the same dimensions as data :param fit_to_sep_pol: Fit primary beams to the parallel-hands gain amplitudes :param fit_to_vis: Fit primary beams to the cross-correlation visibility amplitudes instead of the antenna gain amplitudes :return: Weighted-average visibility or gain amplitudes and weighted standard deviation. These amplitudes and their standard deviations have dimension (nants, ) if fitting to the Stokes I visibility and gain amplitudes and dimensions (nants, npols) when fitting to the parallel-hands polarisations gain amplitudes. """ if fit_to_vis: # Average in time, frequency, and polarisation axis = (0, 2, 3) elif fit_to_sep_pol: # Average in time only axis = 0 else: # Average in time and polarisation axis = (0, 2) return weighted_avg_and_std( data, axis=axis, weights=weights, std_error=False ) def _create_arrays_for_fitting( vis_list_size, ants_size, num_chunks, fit_to_sep_pol=False, ): """ Creates empty arrays to be populated for the beam fitting routine :param vis_list_size: Size of Visibility objects [nscans] :param ants_size: Size of katpoint Antenna objects [nants] :param num_chunks: Number of usable frequency chunks (integer) :param fit_to_sep_pol: Fit primary beam to the parallel-hands gain or visibility amplitudes instead of the Stokes I amplitudes :return: Empty array of the Stokes I or parallel-hands polarisation visibility or gain amplitudes, their standard deviations, and frequencies in all chunks (should be same for all scans) """ frequency_per_chunk = numpy.zeros(num_chunks) if fit_to_sep_pol: y_per_scan = numpy.zeros((2, ants_size, num_chunks, vis_list_size)) stddev_per_scan = numpy.zeros( (2, ants_size, num_chunks, vis_list_size) ) else: y_per_scan = numpy.zeros((ants_size, num_chunks, vis_list_size)) stddev_per_scan = numpy.zeros((ants_size, num_chunks, vis_list_size)) return y_per_scan, stddev_per_scan, frequency_per_chunk
[docs] class ExtractPerScan: """ Extracts the visibility or gain amplitude and the antenna pointings for each scan required by the beam fitting routine. :param vis_list: List of Visibility object :param on_sky_offsets_list: A list of on-sky offsets in cross-elevation and elevation :param ants: List of katpoint Antenna objects [nants] :param fit_to_sep_pol: Fit primary beams to the parallel-hands gain or visibility amplitudes """ def __init__( self, vis_list, on_sky_offsets_list, ants, fit_to_sep_pol=False, ): self.vis_list = vis_list self.on_sky_offsets_list = on_sky_offsets_list self.ants = ants self.fit_to_sep_pol = fit_to_sep_pol self.x_per_scan = numpy.zeros( (len(self.on_sky_offsets_list), len(self.ants), 2) ) ( self.y_per_scan, self.stddev_per_scan, self.frequency_per_chunk, ) = _create_arrays_for_fitting( len(vis_list), len(ants), len(vis_list[0]), self.fit_to_sep_pol ) for scan, on_sky_offset in enumerate(self.on_sky_offsets_list): # Average antenna pointings (nants, ntimes, 2) in time self.x_per_scan[scan] = on_sky_offset.mean(axis=1)
[docs] def from_vis(self): """ Extracts the cross-correlation amplitudes for all discrete pointing scans required by the beam fitting routine :return: on-sky offsets for all discrete pointing scans, the weighted-average visibility amplitudes and their standard deviations for all discrete pointing scans, and the frequency at the higher end of the band in Hz """ if len(self.ants) > 2: raise ValueError( "Too many antennas present to use the fit_to_vis " "option. Do not set this option to fit the voltage " "beams to the gain amplitudes!" ) for scan, vis in enumerate(self.vis_list): # Get cross-correlation visibility amplitudes crosscorr_vis = get_crosscorrelations(vis) for i, visibility in enumerate(crosscorr_vis): weighted_vis_amp, stddev = _avg_amp( numpy.abs(visibility.vis.data), visibility.weight.data, fit_to_sep_pol=False, fit_to_vis=True, ) if scan == 0: # We want to use the frequency at the higher end of the # frequency range for better pointing accuracy self.frequency_per_chunk[i] = visibility.frequency.data[-1] self.y_per_scan[:, i, scan] = weighted_vis_amp self.stddev_per_scan[:, i, scan] = stddev return ( self.x_per_scan, self.y_per_scan, self.stddev_per_scan, self.frequency_per_chunk, )
[docs] def from_gains(self, use_modelvis=False): """ Extracts the gain amplitude for all discrete pointing scans required by the beam fitting routine :param use_modelvis: Use a model visibility :return: on-sky offsets for all discrete pointing scans, the weighted-average gain amplitudes and their standard deviation for all discrete pointing scans, and 1D array of the averaged-frequency in each chunk in Hz """ if len(self.ants) < 3: raise ValueError( "Inadequate number of antennas present for " "gain calibration. Use the fit_to_vis option " "instead for the power beams to be fitted to " "the visibility amplitudes!" ) # Solve for the un-normalised G terms for each scan for scan, vis in enumerate(self.vis_list): LOG.info( "Solving for the antenna complex gains for Scan %d", scan + 1, ) gt_list = compute_gains(vis) for i, gt in enumerate(gt_list): # Gains have shape (time, ants, freqs, receptor1, # receptor2). Extract the parallel-hands polarisation # gains and compute the weighted average of the gain # amplitudes gt_amp, gt_weights = extract_parallel_hand_gains(gt) weighted_gt_amp, stddev = _avg_amp( gt_amp, gt_weights, self.fit_to_sep_pol, fit_to_vis=False, ) if scan == 0: self.frequency_per_chunk[i] = numpy.squeeze( gt.frequency.data ) if use_modelvis: # Applies only to the simulated data where the # noise on the gains is not yet realistic. # Therefore in order to obtain good fits, the # stddev per scan must be set to a dummy value. stddev[:] = 1.0 if self.fit_to_sep_pol: self.y_per_scan[:, :, i, scan] = weighted_gt_amp.T self.stddev_per_scan[:, :, i, scan] = stddev.T else: self.y_per_scan[:, i, scan] = weighted_gt_amp self.stddev_per_scan[:, i, scan] = stddev return ( self.x_per_scan, self.y_per_scan, self.stddev_per_scan, self.frequency_per_chunk, )
# pylint: disable=too-many-locals,too-many-statements
[docs] def fitted_offsets_weighted_average(ants, fitted_beams, fit_to_vis=False): """ Compute the weighted average of the fitted pointing offsets. :param ants: A list of katpoint Antenna objects [nants]. :param fitted_beams: A dictionary of the fitted beams obtained from :func:`beam_fitting.py.SolveForOffsets.fit_to_gains` or :func:`beam_fitting.py.SolveForOffsets.fit_to_vis`. :param fit_to_vis: Fit primary beam to cross-correlation visibility amplitudes instead of the antenna gain amplitudes. :return: The weighted average of the valid fitted cross-elevation and elevation offsets and their standard deviations in radians, the theoretical and fitted beamwidths (and uncertainty) in radians, and the fitted height and its uncertainty for each dish. """ fitted_parameters = create_dict_for_output_offsets(len(ants)) if fit_to_vis: for antenna_name in fitted_beams.keys(): beams_freq = fitted_beams.get(antenna_name, []) fitted_parameters["antenna_names"] = antenna_name beams_freq = [ b for b in beams_freq if b is not None and b.is_valid ] if not beams_freq: LOG.warning( "%s had no valid primary beam fitted", antenna_name ) continue # Get the fitted centres, width, height and their uncertainties offsets_freq = numpy.array([b.centre for b in beams_freq]) offsets_freq_stderr = numpy.array( [b.std_centre for b in beams_freq] ) width_freq = numpy.array([b.width for b in beams_freq]) width_freq_stderr = numpy.array([b.std_width for b in beams_freq]) height_freq = numpy.array([b.height for b in beams_freq]) height_freq_stderr = numpy.array( [b.std_height for b in beams_freq] ) expected_width_freq = numpy.array( [b.expected_width for b in beams_freq] ) # Do a weighted-average of the fitted parameters over # frequency chunks. The expected width is just the # average since it has no uncertainty if len(offsets_freq) > 1: pointing_offset, pointing_offset_stderr = weighted_avg_and_std( data=offsets_freq, axis=0, weights=1.0 / offsets_freq_stderr**2, std_error=True, ) width, width_stderr = weighted_avg_and_std( data=width_freq, axis=0, weights=1.0 / width_freq_stderr**2, std_error=True, ) height, height_stderr = weighted_avg_and_std( data=height_freq, axis=0, weights=1.0 / height_freq_stderr**2, std_error=True, ) else: pointing_offset = numpy.squeeze(offsets_freq) pointing_offset_stderr = numpy.squeeze(offsets_freq_stderr) width = numpy.squeeze(width_freq) width_stderr = numpy.squeeze(width_freq_stderr) height = height_freq height_stderr = height_freq_stderr expected_width = expected_width_freq.mean(axis=0) # Store the fitted parameters fitted_parameters["xel_offset"] = numpy.radians(pointing_offset[0]) fitted_parameters["xel_offset_std"] = numpy.radians( pointing_offset_stderr[0] ) fitted_parameters["el_offset"] = numpy.radians(pointing_offset[1]) fitted_parameters["el_offset_std"] = numpy.radians( pointing_offset_stderr[1] ) fitted_parameters["expected_width"] = numpy.radians(expected_width) fitted_parameters["fitted_width"] = numpy.radians(width) fitted_parameters["fitted_width_std"] = numpy.radians(width_stderr) fitted_parameters["fitted_height"] = height fitted_parameters["fitted_height_std"] = height_stderr else: for i, antenna in enumerate(ants): fitted_parameters["antenna_names"][i] = antenna.name beams_freq = fitted_beams.get(antenna.name, []) beams_freq = [ b for b in beams_freq if b is not None and b.is_valid ] if not beams_freq: LOG.warning( "%s had no valid primary beam fitted", antenna.name ) continue # Get the fitted centres, width, height and their uncertainties offsets_freq = numpy.array([b.centre for b in beams_freq]) offsets_freq_stderr = numpy.array( [b.std_centre for b in beams_freq] ) width_freq = numpy.array([b.width for b in beams_freq]) width_freq_stderr = numpy.array([b.std_width for b in beams_freq]) height_freq = numpy.array([b.height for b in beams_freq]) height_freq_stderr = numpy.array( [b.std_height for b in beams_freq] ) expected_width_freq = numpy.array( [b.expected_width for b in beams_freq] ) # Do a weighted-average of the fitted parameters over # frequency chunks. The expected width is just the # average since it has no uncertainty if len(offsets_freq) > 1: pointing_offset, pointing_offset_stderr = weighted_avg_and_std( data=offsets_freq, axis=0, weights=1.0 / offsets_freq_stderr**2, std_error=True, ) width, width_stderr = weighted_avg_and_std( data=width_freq, axis=0, weights=1.0 / width_freq_stderr**2, std_error=True, ) height, height_stderr = weighted_avg_and_std( data=height_freq, axis=0, weights=1.0 / height_freq_stderr**2, std_error=True, ) else: pointing_offset = numpy.squeeze(offsets_freq) pointing_offset_stderr = numpy.squeeze(offsets_freq_stderr) width = numpy.squeeze(width_freq) width_stderr = numpy.squeeze(width_freq_stderr) height = height_freq height_stderr = height_freq_stderr expected_width = expected_width_freq.mean(axis=0) # Store the fitted parameters fitted_parameters["xel_offset"][i] = numpy.radians( pointing_offset[0] ) fitted_parameters["xel_offset_std"][i] = numpy.radians( pointing_offset_stderr[0] ) fitted_parameters["el_offset"][i] = numpy.radians( pointing_offset[1] ) fitted_parameters["el_offset_std"][i] = numpy.radians( pointing_offset_stderr[1] ) fitted_parameters["expected_width"][i] = numpy.radians( expected_width ) fitted_parameters["fitted_width"][i] = numpy.radians(width) fitted_parameters["fitted_width_std"][i] = numpy.radians( width_stderr ) fitted_parameters["fitted_height"][i] = height fitted_parameters["fitted_height_std"][i] = height_stderr return fitted_parameters
# pylint: disable=too-many-arguments,too-many-positional-arguments
[docs] def create_offset_pointing_table( output_offsets, frequency_per_chunk, reference_mjd, reference_commanded_azel, vis_list, x_per_scan, track_duration, ): """ Writes the computed pointing offsets and other fitted parameters into a PointingTable. The pointing offsets in cross-elevation and elevation in radians are stored as the `pointing` variable. The expected width in the H and V directions in radians are stored as the `expected_width` variable. The fitted width in the H and V directions in radians are stored as the `fitted_width` variable. The fitted height in arbitrary units is stored as the `fitted_height` variable. The uncertainties on the fitted width and height are stored as the `fitted_width_std` and `fitted_height_std` variables respectively. The uncertainties on the pointing offsets in the `pointing` variable are converted to weights as the inverse square of the uncertainties and stored as the `weight` variable. :param output_offsets: A list of dictionary of the weighted-average pointing offsets output by :func:`array_data_func.weighted_average` when fitting to the parallel-hands amplitudes and a dictioanry of these pointing offsets when fitting to the Stokes I amplitudes. :param frequency_per_chunk: 1D array of the averaged-frequency in each chunk in Hz [num_chunks] :param reference_mjd: Reference timestamp (s) in MJD :param reference_commanded_azel: Commanded AzEl at the `reference_mjd` in radians with dimension (nants, 2) when fitting to the gain amplitudes. When fitting to the visibility amplitudes, the dimension is (ntimes, nants, 2) where ntimes is the number of the `reference_mjd` which is expected to be 2 :param vis_list: List of Visibility :param x_per_scan: The commanded pointings relative to the target in the reference pointing observation in xEl-El coordinates for one antenna with dimensions (nscans, 2) :param track_duration: A list of how long each scan was tracked for in seconds :param num_chunks: Number of frequency chunks (integer) :return: PointingTable of the computed pointing offsets and other fitted parameters for each antenna """ reference_mjd = numpy.array(reference_mjd).ravel() discrete_offset = numpy.zeros((x_per_scan.shape[0], 2)) for i in range(x_per_scan.shape[0]): discrete_offset[i] = numpy.stack((x_per_scan[i, 0], x_per_scan[i, 1])) discrete_offset = discrete_offset.round(2) band_type = observing_band(frequency_per_chunk) scan_mode = f"{x_per_scan.shape[0]}-point" # We expect equal track duration for each pointing scan # so we will only write the value of just one scan # to the PointingTable track_duration = track_duration[0] if reference_commanded_azel is not None: if isinstance(reference_commanded_azel, list): # Two-dish mode scenario so will have dimensions # (nobs/ntimes, nants, 2), where nobs=2 because # there are two sets of observations and in each # observation, the reference time is calculated. # Each set of reference commanded azel has shape # (nants, 2) and will be reshaped to (1, nants, 1, 1, 2) reference_commanded_azel = numpy.radians( numpy.stack( (reference_commanded_azel[0], reference_commanded_azel[1]) ) ) reference_commanded_azel = reference_commanded_azel[ :, :, numpy.newaxis, numpy.newaxis ] else: # Three-dish mode scenario # The reference commanded azel has shape (nants, ntimes, 2) # but will be reshaped to (ntimes, nants, 1, 1, 2) reference_commanded_azel = numpy.radians( reference_commanded_azel[ numpy.newaxis, :, numpy.newaxis, numpy.newaxis, : ] ) # In the two-dish mode, ensure the fitted offsets are matched # to the right antennas if ( output_offsets["antenna_names"] != vis_list[0][0].attrs["configuration"].names.data ).all(): LOG.info( "Order of Antenna names and antenna names in Configuration " "do not match. Fixing it and their corresponding " "fitted parameters before writing to PointingTable" ) output_offsets = antenna_and_offsets_reorder(output_offsets) # Extract the cross-elevation and elevation offsets pointing_offsets = numpy.dstack( ( output_offsets["xel_offset"], output_offsets["el_offset"], ) ) pointing_offsets = pointing_offsets[ :, :, numpy.newaxis, numpy.newaxis, : ].repeat(len(reference_mjd), axis=0) # Extract the uncertainties on the cross-elevation and # elevation offsets pointing_offsets_std = numpy.dstack( ( output_offsets["xel_offset_std"], output_offsets["el_offset_std"], ) ) pointing_offsets_std = pointing_offsets_std[ :, :, numpy.newaxis, numpy.newaxis, : ].repeat(len(reference_mjd), axis=0) weights = 1.0 / pointing_offsets_std**2 # Extract the expected width expected_width = output_offsets["expected_width"] expected_width = expected_width[ numpy.newaxis, :, numpy.newaxis, numpy.newaxis, : ].repeat(len(reference_mjd), axis=0) # Extract the fitted width fitted_width = output_offsets["fitted_width"] fitted_width = fitted_width[ numpy.newaxis, :, numpy.newaxis, numpy.newaxis, : ].repeat(len(reference_mjd), axis=0) # Extract the uncertainties on the fitted width fitted_width_std = output_offsets["fitted_width_std"] fitted_width_std = fitted_width_std[ numpy.newaxis, :, numpy.newaxis, numpy.newaxis, : ].repeat(len(reference_mjd), axis=0) # Extract the fitted height fitted_height = output_offsets["fitted_height"] fitted_height = fitted_height[ numpy.newaxis, :, numpy.newaxis, numpy.newaxis ].repeat(len(reference_mjd), axis=0) # Extract the uncertainties on the fitted height fitted_height_std = output_offsets["fitted_height_std"] fitted_height_std = fitted_height_std[ numpy.newaxis, :, numpy.newaxis, numpy.newaxis ].repeat(len(reference_mjd), axis=0) # The Cross-El and El offsets are in radians so # we write them directly into the PointingTable return PointingTable.constructor( pointing=pointing_offsets, expected_width=expected_width, fitted_width=fitted_width, fitted_width_std=fitted_width_std, fitted_height=fitted_height, fitted_height_std=fitted_height_std, time=reference_mjd, weight=weights, frequency=numpy.ravel(frequency_per_chunk.mean()), receptor_frame=ReceptorFrame("stokesI"), pointing_frame="xel-el", band_type=band_type, scan_mode=scan_mode, track_duration=track_duration, discrete_offset=discrete_offset, commanded_pointing=reference_commanded_azel, pointingcentre=vis_list[0][0].phasecentre, configuration=vis_list[0][0].configuration, )