Source code for ska_sdp_wflow_pointing_offset.beam_fitting

"""
Fits primary beams modelled by a 2D Gaussian to the visibility
or gain amplitudes and computes the cross-elevation and elevation
offsets for each dish. These routines follow those used by the
South African Radio Astronomy Observatory (SARAO) team for the
MeerKAT array.
"""

import logging

import numpy
from katpoint import lightspeed
from scikits.fitting import GaussianFit, ScatterFit

from ska_sdp_wflow_pointing_offset.utils import get_off_source_antenna_index

LOG = logging.getLogger("ska-sdp-pointing-offset")


def _fwhm_to_sigma(fwhm):
    """
    Standard deviation of Gaussian function with specified
    FWHM beamwidth.

    :param fwhm: Full-width half-maximum (FWHM) beamwidth

    :return: The standard deviation of a Gaussian beam pattern
        with a specified full-width half-maximum (FWHM) beamwidth.
        This beamwidth is the width between the two points left and
        right of the peak where the Gaussian function attains half
        its maximum value
    """
    # Gaussian function reaches half its peak value at sqrt(2 log 2)*sigma
    return fwhm / 2.0 / numpy.sqrt(2.0 * numpy.log(2.0))


def _sigma_to_fwhm(sigma):
    """
    FWHM beamwidth of Gaussian function with specified standard
    deviation.

    :param sigma: The standard deviation of a Gaussian beam pattern
    with a specified full-width half-maximum (FWHM) beamwidth

    :return: The full-width half-maximum (FWHM) beamwidth of a Gaussian
        beam pattern with a specified standard deviation. This beamwidth
        is the width between the two points left and right of the peak
        where the Gaussian function attains half its maximum value
    """
    # Gaussian function reaches half its peak value at sqrt(2 log 2)*sigma
    return 2.0 * numpy.sqrt(2.0 * numpy.log(2.0)) * sigma


# pylint: disable=too-many-instance-attributes,abstract-method
[docs] class BeamPatternFit(ScatterFit): """ Fit analytic beam pattern to total power data defined on 2-D plane. This fits a two-dimensional Gaussian curve (with diagonal covariance matrix) to total power data as a function of 2-D coordinates. The Gaussian bump represents an antenna beam pattern convolved with a point source. :param centre: Initial guess of 2-element beam centre, in target coordinate units :param width: Initial guess of single beam width for both dimensions, or 2-element beam width vector, expressed as FWHM in units of target coordinates :param height: Initial guess of beam pattern amplitude or height Attributes ---------- expected_width: real array, shape (2,), or float Initial guess of beamwidth, saved as expected width for checks is_valid : bool True if beam parameters are within reasonable ranges after fit std_centre: array of float, shape (2,) Standard error of beam centre, only set after :func:`fit` std_width: array of float, shape (2,), or float Standard error of beamwidth(s), only set after :func:`fit` std_height: float Standard error of beam height, only set after :func:`fit` """ def __init__(self, centre, width, height): super().__init__() if not numpy.isscalar(width): width = numpy.atleast_1d(width) self._interp = GaussianFit(centre, _fwhm_to_sigma(width), height) self.centre = self._interp.mean self.width = _sigma_to_fwhm(self._interp.std) self.height = self._interp.height self.expected_width = width self.is_valid = False self.std_centre = self.std_width = self.std_height = None
[docs] def fit(self, x, y, std_y=1.0, thresh_width=1.15): """ Fit a beam pattern to data. The centre, width and height of the fitted beam pattern (and their standard errors) can be obtained from the corresponding member variables after this is run. :param x: Sequence of (2, N) target coordinates (as column vectors) :param y: Sequence of (N, ) corresponding total power values to fit :param std_y: Optional measurement error or uncertainty of (N, ) `y` values, expressed as standard deviation in units of `y`. :param thresh_width: The maximum ratio of the fitted to expected beamwidth :return: The fitted beam parameters (centre, width, height and their uncertainties) """ self._interp.fit(x, y, std_y) self.centre = self._interp.mean self.width = _sigma_to_fwhm(self._interp.std) self.height = self._interp.height self.std_centre = self._interp.std_mean self.std_width = _sigma_to_fwhm(self._interp.std_std) self.std_height = self._interp.std_height self.is_valid = not any(numpy.isnan(self.centre)) and self.height > 0.0 # Validation of the fitted beam using SNR and the size of the # fitted width compared to the expected. The fitted beam can # only be equal to the expected or greater than the expected # by less than thresh_width fit_snr = self._interp.std / self._interp.std_std norm_width = self.width / self.expected_width self.is_valid &= ( all(norm_width > 0.9) and all(norm_width < thresh_width) and all(fit_snr) > 0.0 )
[docs] class SolveForOffsets: """ Fit the beam pattern to the visibility or gain amplitudes and outputs the fitted parameters and their uncertainties. :param x_per_scan: The commanded pointings relative to the target in the reference pointing observation in xEl-El coordinates with dimensions (nscans, nants, 2) :param y_per_scan: Visibility or gain amplitudes of all antennas for all scans with dimensions (nants, num_chunks, nscans), where num_chunks is the number of usable frequency chunks :param stddev_per_scan: The weighted standard deviations from the weighted-average of the visibility or gain amplitudes. Has same shape as y_per_scan :param frequency_per_chunk: 1D array of the usable frequencies in each chunk in Hz. When fitting the beams to the visibility amplitudes across the entire band, this frequency is that at the higher end of the band for better pointing accuracy. If num_chunks > 1, then this frequency is the frequency at the higher end of each chunk :param beamwidth_factor: The beamwidth factor for the two orthogonal directions. Two values are expected as one value for the horizontal co-polarisation and the other value for the vertical co-polarisation. These values often range between 1.03 and 1.22 depending on the illumination pattern of the dish :param ants: A list of katpoint Antenna objects [nants] :param thresh_width: The tolerance on the fitted beamwidth i.e, the ratio of the fitted to expected beamwidths """ # pylint: disable-next=too-many-arguments,too-many-positional-arguments def __init__( self, x_per_scan, y_per_scan, stddev_per_scan, frequency_per_chunk, beamwidth_factor, ants, thresh_width, ): self.x_per_scan = x_per_scan self.y_per_scan = y_per_scan self.stddev_per_scan = stddev_per_scan self.frequency_per_chunk = frequency_per_chunk self.beamwidth_factor = beamwidth_factor self.ants = ants self.thresh_width = thresh_width # Collect the fitted beams self.beams = {} # Minimum of 5 points required for the fitting to work # so data from 5 scans at least are required if self.x_per_scan.shape[0] < 5: raise ValueError( "Number of required data points below the minimum of 5!" ) if len(self.frequency_per_chunk) != self.y_per_scan.shape[1]: raise ValueError( "The number of usable frequency chunks does " "not match the frequency chunks whose " "amplitudes have been extracted!" )
[docs] def fit_to_vis(self): """ Fit the primary beams to the visibility amplitude of the dish under test i.e, the moving dish in each observation group and returns the fitted parameters and their uncertainties. :return: A dictionary of the fitted beams (parameters and their uncertainties) for the moving dish i.e, the dish performing the five-point or similar scan """ # Get the index of the moving dish so the beams can be # fitted to its visibility amplitudes. The index is a list # with a single element off_source_antenna_index = get_off_source_antenna_index( self.x_per_scan )[0] for chunk, freq in enumerate(self.frequency_per_chunk): wavelength = lightspeed / freq # Calculate the theoretical or expected power beamwidth expected_width = numpy.degrees( numpy.array(self.beamwidth_factor) * wavelength / self.ants[off_source_antenna_index].diameter ) LOG.info( "Fitting power beams to sub-band %d visibility " "amplitudes...", chunk + 1, ) fitted_beam = BeamPatternFit( centre=(0.0, 0.0), width=expected_width, height=1.0 ) fitted_beam.fit( x=self.x_per_scan[:, off_source_antenna_index].T, y=self.y_per_scan[off_source_antenna_index, chunk], std_y=self.stddev_per_scan[off_source_antenna_index, chunk], thresh_width=self.thresh_width, ) # Collect the fitted beams beams_freq = self.beams.get( self.ants[off_source_antenna_index].name, [None] * len(self.frequency_per_chunk), ) beams_freq[chunk] = fitted_beam self.beams[self.ants[off_source_antenna_index].name] = beams_freq return self.beams
[docs] def fit_to_gains(self): """ Fit the primary beams to the gain amplitudes of each dish and returns the fitted parameters and their uncertainties. :return: A dictionary of the fitted beams (parameters and their uncertainties) """ for i, antenna in enumerate(self.ants): # Compute the expected or theoretical voltage beamwidth: Convert # power beamwidth (for single dish) to gain/voltage beamwidth # (interferometer) for chunk, freq in enumerate(self.frequency_per_chunk): wavelength = lightspeed / freq expected_width = numpy.sqrt(2) * numpy.degrees( numpy.array(self.beamwidth_factor) * wavelength / antenna.diameter ) LOG.info( "Fitting voltage beams to sub-band %d " "gain amplitudes...", chunk + 1, ) fitted_beam = BeamPatternFit( centre=(0.0, 0.0), width=expected_width, height=1.0 ) fitted_beam.fit( x=self.x_per_scan[:, i].T, y=self.y_per_scan[i, chunk], std_y=self.stddev_per_scan[i, chunk], thresh_width=self.thresh_width, ) # Collect the fitted beams beams_freq = self.beams.get( antenna.name, [None] * len(self.frequency_per_chunk) ) beams_freq[chunk] = fitted_beam self.beams[antenna.name] = beams_freq return self.beams