Source code for ska_sdp_wflow_pointing_offset.utils

"""
Util functions for:

1. Constructing katpoint antenna information
2. Constructing Measurement Sets names based on ScanIDs
3. Determining observing frequency band

4. Storing errors and traceback in the config db when running the
   pipeline in SDP

5. Extracts the values from the dictionary of the fitted parameters
   and outputs them as a numpy array in degrees.

6. Extracting the index of antenna under test in a two-dish mode scenario
7. Merging the output offsets from a two-dish mode scenario

8. Computing the weighted average of the output offsets for the
   parallel hands gain or visibility amplitudes

9. Computing a weighted-average of the fitted parameters and their
   standard deviations from beams fitted to the Stokes I amplitudes
   or parallel-hands gain amplitudes

10. Converting the dictionary of fitted parameters to a numpy array
    and to a structured numpy array

11. Generating some parameters required for plotting the Gaussian fits
    in the two-dish mode scenario

12. Splitting array into chunks and discards the first and last chunks if
    the number of chunks is greater than 1

13. Extracting the cross-correlation Visibility object of the
    parallel-hands polarisations

14. Selecting frequency ranges for input data and splitting into chunks
"""

import glob
import logging
import traceback
from copy import deepcopy

import numpy
from katpoint import Antenna
from ska_sdp_datamodels.science_data_model import PolarisationFrame
from ska_sdp_datamodels.visibility import Visibility
from ska_sdp_func_python.util.coordinate_support import ecef_to_lla

LOG = logging.getLogger("ska-sdp-pointing-offset")

DATA_STRUCTURE = numpy.dtype(
    [
        ("antenna_name", "U8"),
        ("last_scan_index", "f8"),
        ("xel_offset", "f8"),
        ("xel_offset_std", "f8"),
        ("el_offset", "f8"),
        ("el_offset_std", "f8"),
        ("expected_width_h", "f8"),
        ("expected_width_v", "f8"),
        ("fitted_width_h", "f8"),
        ("fitted_width_h_std", "f8"),
        ("fitted_width_v", "f8"),
        ("fitted_width_v_std", "f8"),
        ("fitted_height", "f8"),
        ("fitted_height_std", "f8"),
    ]
)


[docs] def construct_antennas(xyz, diameter, station): """ Construct list of katpoint Antenna objects based on telescope configuration information. :param xyz: xyz coordinates of antenna positions in [nants, 3] :param diameter: Diameter of dishes in [nants] :param station: List of the antenna names [nants] :return: a set of katpoint Antenna objects """ latitude, longitude, altitude = ecef_to_lla( x=xyz[:, 0], y=xyz[:, 1], z=xyz[:, 2] ) ants = [] for ant_name, diam, lat, long, alt in zip( station, diameter, latitude, longitude, altitude ): # Antenna information # The beamwidth is HPBW of an antenna: k * lambda/D # We use an estimate of k=1.22 here but not used in # calculating the beamwidth as k is passed from # the command line. The "beamwidth" as used in ant is # actually referring to the beamwidth factor, k. ant = Antenna( name=ant_name, latitude=lat, longitude=long, altitude=alt, diameter=diam, delay_model=None, pointing_model=None, beamwidth=1.22, ) ants.append(ant) return ants
[docs] def construct_ms_file_name(data_dir, scan_ids=None): """ Construct filenames based on scanID. Assumes one MS per scan ID. :param data_dir: Given directory name containing Measurement Sets :param scan_ids: A list of scan IDs :return: A list of Measurement Sets filenames """ file_extensions = "*.ms" scan_ids_track = deepcopy(scan_ids) files_in_dir = glob.glob(f"{data_dir}/{file_extensions}") ms_files = [] # Find files that contain the specific string for file in files_in_dir: # Define what is to be searched in file_text = file.lower() for scan_id in scan_ids_track: # Assume current file name likes *scan-0.ms. file_content = f"scan-{scan_id}.ms" if file_content in file_text: ms_files.append(file) scan_ids_track.remove(scan_id) break if scan_ids_track: raise ( FileNotFoundError( f"Missing data: The following scan IDs do not have associated " f"MeasurementSets in the directory: {scan_ids_track}" ) ) return ms_files
[docs] def observing_band(frequency_per_chunk): """ Compares the frequency per chunk to the dedicated frequency bands for the SKA and returns which observing band was used. The observing frequency bands were obtained from https://www.skao.int/en/science-users/118/ska-telescope-specifications#__otpm4 :param frequency_per_chunk: 1D array of the averaged-frequency in each chunk in Hz [num_chunks] :return: The frequency band within which an observation falls """ bands = { "SKA1-Low": numpy.linspace(50.0e6, 350.0e6, 300), "Band 1": numpy.linspace(350.0e6, 1050.0e6, 700), "Band 2": numpy.linspace(950.0e6, 1760.0e6, 810), "Band 3": numpy.linspace(1650.0e6, 3050.0e6, 1400), "Band 4": numpy.linspace(2800.0e6, 5180.0e6, 2380), "Band 5a": numpy.linspace(4600.0e6, 8500.0e6, 4000), "Band 5b": numpy.linspace(8300.0e6, 15300.0e6, 5000), } try: # num_chunks > 1 lower_band = frequency_per_chunk[0] upper_band = frequency_per_chunk[-1] except IndexError: # num_chunks = 1 lower_band = upper_band = frequency_per_chunk band_type = [ key for key, val in bands.items() if (lower_band >= val[0] and upper_band <= val[-1]) ] if len(band_type) == 0: band_type = None else: band_type = band_type[0] return band_type
[docs] def handle_exception(err, traceback_frames, scan_ids): """ When an exception occurs store the error and traceback in the config db :param err: Exception thrown :param traceback_frames: Exception traceback object :param scan_ids: list of processed scan IDs :return: dict of error information """ # collect the error type error_type = type(err).__name__ error_message = str(err) full_error = f"{error_type}: {error_message}" # collect and log the error and traceback trace = traceback.format_exc() LOG.error("Pipeline unsuccessful with error: %s", full_error) LOG.error("Traceback: %s", trace) stack_summary = traceback.extract_tb(traceback_frames) # Access the last line last_line = stack_summary[-1] exception_data = { "scan_ids": scan_ids, "error": full_error, "traceback": { "file": last_line.filename, "line": last_line.lineno, "function": last_line.name, }, "status": "failed", } return exception_data
[docs] def antenna_and_offsets_reorder(offsets): """ Re-orders the antenna names and their corresponding fitted parameters in the two-dish scenario to match those in the output HDF5 file :param offsets: A dictionary of the weighted-average pointing offsets output by :func:`array_data_func.weighted_average` :return: A dictionary of the input offsets with the right antenna names and fitted offsets ordering """ offsets["antenna_names"] = numpy.flip(offsets["antenna_names"]) offsets["xel_offset"] = numpy.flip(offsets["xel_offset"]) offsets["xel_offset_std"] = numpy.flip(offsets["xel_offset_std"]) offsets["el_offset"] = numpy.flip(offsets["el_offset"]) offsets["el_offset_std"] = numpy.flip(offsets["el_offset_std"]) offsets["expected_width"] = numpy.flipud(offsets["expected_width"]) offsets["fitted_width"] = numpy.flipud(offsets["fitted_width"]) offsets["fitted_width_std"] = numpy.flipud(offsets["fitted_width_std"]) offsets["fitted_height"] = numpy.flip(offsets["fitted_height"]) offsets["fitted_height_std"] = numpy.flip(offsets["fitted_height_std"]) return offsets
[docs] def convert_dict_to_numpy_array(offsets): """ Extracts the values from the dictionary of the fitted parameters and outputs them as a numpy array. :param offsets: A dictionary of the weighted-average pointing offsets output by :func:`array_data_func.weighted_average` :return: A numpy array of the input offsets """ offsets = numpy.column_stack( ( offsets["antenna_names"], offsets["xel_offset"], offsets["xel_offset_std"], offsets["el_offset"], offsets["el_offset_std"], offsets["expected_width"][:, 0], offsets["expected_width"][:, 1], offsets["fitted_width"][:, 0], offsets["fitted_width_std"][:, 0], offsets["fitted_width"][:, 1], offsets["fitted_width_std"][:, 1], offsets["fitted_height"], offsets["fitted_height_std"], ) ) return offsets
[docs] def create_empty_array(antennas): """ Create an empty array with 13 columns and as many rows as there are receptors to send to kafka if there is an exception while processing scans :param antennas: list of antenna names :return: empty array to send to kafka """ # Give antenna empty strings # And the rest NaN values ants = numpy.array(antennas).reshape(len(antennas), 1) values = numpy.full([len(antennas), 12], numpy.nan) data = numpy.column_stack((ants, values)) return data
[docs] def convert_offsets_to_degrees_for_kafka(offsets): """ Extracts the values from the dictionary of the fitted parameters and outputs them as a numpy array in degrees. :param offsets: A dictionary of the weighted-average pointing offsets output by :func:`array_data_func.weighted_average` :return: A converted numpy array where all the input offsets are now in degrees except the fitted heights and its uncertainty are in arbitrary units """ offsets = numpy.column_stack( ( offsets["antenna_names"], numpy.degrees(offsets["xel_offset"]), numpy.degrees(offsets["xel_offset_std"]), numpy.degrees(offsets["el_offset"]), numpy.degrees(offsets["el_offset_std"]), numpy.degrees(offsets["expected_width"][:, 0]), numpy.degrees(offsets["expected_width"][:, 1]), numpy.degrees(offsets["fitted_width"][:, 0]), numpy.degrees(offsets["fitted_width_std"][:, 0]), numpy.degrees(offsets["fitted_width"][:, 1]), numpy.degrees(offsets["fitted_width_std"][:, 1]), offsets["fitted_height"], offsets["fitted_height_std"], ) ) return offsets
[docs] def get_off_source_antenna_index(on_sky_offsets): """ Extracts the index of the antenna under test i.e the antenna performing the five-point scan or similar from the on-sky offsets in a two-dish mode reference pointing observation :param on_sky_offsets: The commanded pointings relative to the target in the reference pointing observation in xEl-El coordinates with dimensions (nscans, nants, 2) :return: A list of the index of the antenna performing the five-point or similar scan """ moving_antenna_index = [] for index in range(on_sky_offsets.shape[1]): xel_min = abs(on_sky_offsets[:, index, 0].min()) el_min = abs(on_sky_offsets[:, index, 1].min()) if xel_min > 0.1 * xel_min and el_min > 0.1 * el_min: moving_antenna_index.append(index) if len(moving_antenna_index) != 1: raise ValueError( "Only one antenna is allowed to move to discrete offset pointings!" ) return moving_antenna_index
[docs] def create_dict_for_output_offsets(nants): """ Creates a dictionary of the fitted parameters and populates them with NaNs :param nants: The number of antennas :return: A dictionary of the fitted parameters populated with NaNs """ fitted_offset_params = numpy.full(nants, numpy.nan) fitted_width_params = numpy.full((nants, 2), numpy.nan) fitted_parameters = { "antenna_names": numpy.full(nants, numpy.nan, dtype=list), "xel_offset": fitted_offset_params.copy(), "xel_offset_std": fitted_offset_params.copy(), "el_offset": fitted_offset_params.copy(), "el_offset_std": fitted_offset_params.copy(), "expected_width": fitted_width_params.copy(), "fitted_width": fitted_width_params.copy(), "fitted_width_std": fitted_width_params.copy(), "fitted_height": fitted_offset_params.copy(), "fitted_height_std": fitted_offset_params.copy(), } return fitted_parameters
[docs] def merge_output_offsets(output_offsets_list): """ Merges the output offsets from a two-dish scenario observation into a single dictionary of the output offsets :param output_offsets_list: A list of output offsets :return: A dictionary of the merged output offsets """ def _populate_dict(key, single_offset, merged_offset): """ Put into merged offset for each key """ try: merged_offset[key][i] = single_offset[key] # If offset is already all NaNs # Then don't need to do anything except ValueError: pass merged_output_offsets = create_dict_for_output_offsets( nants=len(output_offsets_list) ) for i, output_offset in enumerate(output_offsets_list): for key in output_offset: _populate_dict(key, output_offset, merged_output_offsets) return merged_output_offsets
[docs] def weighted_avg_and_std(data, axis, weights, std_error=False): """ Compute weighted-average and standard deviation (or error). :param data: Data to be averaged (masked entries are ignored). :param weights: Array of weights with same shape as data along the specified axes. :param axis: Axis or axes along which to average data. :param std_error: If set then return standard error instead of standard deviation (default=False) :return: weighted average and weighted standard deviation (or error). """ # Mask any NaNs in the weights weights = numpy.ma.fix_invalid(weights) average, sumwt = numpy.ma.average( data, axis=axis, weights=weights, returned=True ) mean_subtracted = data - numpy.expand_dims(average, axis=axis) sumsqwt = (weights**2).sum(axis=axis) unbiased_variance = (mean_subtracted**2 * weights).sum(axis=axis) / ( sumwt - (sumsqwt / sumwt) ) if not std_error: std = numpy.sqrt(unbiased_variance) else: std = numpy.sqrt(unbiased_variance * sumsqwt) / sumwt return average, std
[docs] def average_output_offsets(output_offsets_list): """ Computes a weighted average of the output offsets for the parallel hands gain or visibility amplitudes output by :func:`array_data_func.fitted_offsets_weighted_average`. :param output_offsets_list: A list of output offsets. :return: A dictionary of the weighted-average output offsets. """ output_offsets_xx = output_offsets_list[0] output_offsets_yy = output_offsets_list[1] antenna_names = output_offsets_xx["antenna_names"] xel_offset, xel_offset_std = weighted_avg_and_std( data=numpy.array( (output_offsets_xx["xel_offset"], output_offsets_yy["xel_offset"]) ), axis=0, weights=1.0 / numpy.array( ( output_offsets_xx["xel_offset_std"], output_offsets_yy["xel_offset_std"], ) ) ** 2, std_error=True, ) el_offset, el_offset_std = weighted_avg_and_std( data=numpy.array( (output_offsets_xx["el_offset"], output_offsets_yy["el_offset"]) ), axis=0, weights=1.0 / numpy.array( ( output_offsets_xx["el_offset_std"], output_offsets_yy["el_offset_std"], ) ) ** 2, std_error=True, ) expected_width = numpy.ma.average( numpy.array( ( output_offsets_xx["expected_width"], output_offsets_yy["expected_width"], ) ), axis=0, ) width, width_std = weighted_avg_and_std( data=numpy.array( ( output_offsets_xx["fitted_width"], output_offsets_yy["fitted_width"], ) ), axis=0, weights=1.0 / numpy.array( ( output_offsets_xx["fitted_width_std"], output_offsets_yy["fitted_width_std"], ) ) ** 2, std_error=True, ) height, height_std = weighted_avg_and_std( data=numpy.array( ( output_offsets_xx["fitted_height"], output_offsets_yy["fitted_height"], ) ), axis=0, weights=1.0 / numpy.array( ( output_offsets_xx["fitted_height_std"], output_offsets_yy["fitted_height_std"], ) ) ** 2, std_error=True, ) fitted_parameters = create_dict_for_output_offsets( nants=len(antenna_names) ) # Populate the dictionary fitted_parameters["antenna_names"] = antenna_names fitted_parameters["xel_offset"] = xel_offset fitted_parameters["xel_offset_std"] = xel_offset_std fitted_parameters["el_offset"] = el_offset fitted_parameters["el_offset_std"] = el_offset_std fitted_parameters["expected_width"] = expected_width fitted_parameters["fitted_width"] = width fitted_parameters["fitted_width_std"] = width_std fitted_parameters["fitted_height"] = height fitted_parameters["fitted_height_std"] = height_std return fitted_parameters
[docs] def convert_to_structured(poffset_data, last_scan_index): """ Convert pointing offset data from a numpy structure of 13 elements (antenna_name plus 12 floats representing fitting values) to a numpy structured array. The names of the fields are taken to match those in array_data_func.weighted_average() Include the last scan index as the second element in this structured data :param poffset_data: Offsets as numpy column_data :param last_scan_index: Index of last scan :return: Offsets and last_scan_index as structured numpy data """ nants = poffset_data.shape[0] structured_data = numpy.zeros(0, dtype=DATA_STRUCTURE) for ant in range(nants): ant_data = numpy.insert(poffset_data[ant], 1, last_scan_index) structured_data = numpy.append( structured_data, numpy.array(tuple(ant_data), dtype=DATA_STRUCTURE), ) return structured_data
# pylint: disable=too-many-locals
[docs] def get_parameters_for_gaussian_fits_plot( x_per_scan_list, y_per_scan_list, fitted_beams_list ): """ Generate some parameters required for plotting the Gaussian fits in the two-dish mode scenario i.e, when the beams are fitted to the visibility amplitudes by merging the parameters from the two sets of observations :param x_per_scan_list: List of on-sky offsets from the two sets of observations :param y_per_scan_list: List of visibility amplitudes from the two sets of observations :param fitted_beams_list: The list of fitted beams from the two sets of observations :return: The on-sky offsets with dimension (nscans, nants, 2), the vis amplitudes with dimensions (nants, nscans) when num_chunks=1 or (nants, num_chunks, nscans) when num_chunks > 1. Note that nants is always 2 because only two antennas are used in the observation when the beams are fitted to the visibility amplitudes """ # Convert the list of on-sky offsets to dimensions (nscans, nants, 2) # where nants=2 ant1_x_per_scan = x_per_scan_list[0] ant1_idx = get_off_source_antenna_index(ant1_x_per_scan)[0] ant1_x_per_scan = ant1_x_per_scan[:, ant1_idx] ant2_x_per_scan = x_per_scan_list[1] ant2_idx = get_off_source_antenna_index(ant2_x_per_scan)[0] ant2_x_per_scan = ant2_x_per_scan[:, ant2_idx] # We need axis 1 as the nants x_per_scan = numpy.swapaxes( numpy.stack((ant1_x_per_scan, ant2_x_per_scan)), 0, 1 ) # Convert the list of vis amplitudes to dimensions (nants, nscans) # when num_chunks=1 and (nants, num_chunks, nscans) when num_chunks > 1 # Note that num_chunks cannot be 2 because the first and last chunks # of the band are discarded ant1_y_per_scan = y_per_scan_list[0] ant2_y_per_scan = y_per_scan_list[1] ant1_y_per_scan = ant1_y_per_scan[ant1_idx] ant2_y_per_scan = ant2_y_per_scan[ant2_idx] y_per_scan = numpy.stack((ant1_y_per_scan, ant2_y_per_scan)) for (key1, value1), (key2, value2) in zip( fitted_beams_list[0].items(), fitted_beams_list[1].items() ): merged_fitted_beams = {key1: value1, key2: value2} return x_per_scan, y_per_scan, merged_fitted_beams
[docs] def get_crosscorrelations(vis): """ Extracts the cross-correlation visibilities and their weights from visibilities containing both auto and cross-correlations :param vis: Visibility object containing both auto and cross- correlations :return: Cross-correlation Visibility object for the parallel-hands polarisations """ vis_list = [] for visibility in vis: get_crosscorr = visibility.antenna1 != visibility.antenna2 crosscorr_vis = visibility.vis.data[:, get_crosscorr] crosscorr_weights = visibility.weight.data[:, get_crosscorr] baselines = visibility.baselines[get_crosscorr] flags = visibility.flags[:, get_crosscorr] uvw = visibility.uvw[:, get_crosscorr] if crosscorr_vis.shape[3] == 2: # Polarisations are (XX, YY) or (RR, LL) LOG.info( "Found two polarisations. Assumed to be the parallel-hands..." ) elif crosscorr_vis.shape[3] == 4: # Polarisations are (XX, XY, YX, YY) or (RR, RL, LR, LL) LOG.info( "Found four polarisations. Extracting the " "parallel hand polarisations..." ) crosscorr_vis = numpy.stack( (crosscorr_vis[:, :, :, 0], crosscorr_vis[:, :, :, 3]) ) crosscorr_vis = numpy.moveaxis(crosscorr_vis, 0, 3) crosscorr_weights = numpy.stack( (crosscorr_weights[:, :, :, 0], crosscorr_weights[:, :, :, 3]) ) crosscorr_weights = numpy.moveaxis(crosscorr_weights, 0, 3) else: raise ValueError("Polarisation type not supported!") vis_list.append( Visibility.constructor( frequency=visibility.frequency, channel_bandwidth=visibility.channel_bandwidth, phasecentre=visibility.phasecentre, configuration=visibility.configuration, uvw=uvw, time=visibility.time.data, vis=crosscorr_vis, weight=crosscorr_weights, integration_time=visibility.integration_time.data, flags=flags, baselines=baselines, polarisation_frame=PolarisationFrame("linearnp"), source=visibility.source, meta=visibility.meta, ) ) return vis_list
[docs] def split_array_into_chunks(array, num_chunks): """ Split an array into chunks of num_chunks elements. When num_chunks > 1, the first and last chunks are discarded :param array: Array to be split (equally or unequally) into chunks :param num_chunks: Number of chunks :return: Array with chunks of size num_chunks """ # Split (equally or unequally) into chunks if len(array) % num_chunks != 0: LOG.warning( "Array cannot be equally divided so they would " "be unequally divided..." ) array = numpy.array_split(array, num_chunks) else: array = array.reshape(num_chunks, -1) if num_chunks > 1: # Discard the first and last chunks of the array array = array[1 : num_chunks - 1] return array
[docs] def select_channels(freqs, channels, start_freq, end_freq): """ Select the desired frequencies and corresponding channels of interest, by inputting the start and end frequency. The function will select the channels between these two frequencies. :param freqs: 1D frequency array in MHz [nchan] :param channels: 1D frequency channel numbers :param start_freq: Start frequency in MHz (float) :param end_freq: End frequency in MHz (float) :return: selected array of (frequencies in MHz, channel numbers) """ select_mask = (freqs > float(start_freq)) & (freqs < float(end_freq)) freqs = freqs[select_mask] channels = channels[select_mask] return freqs, channels
[docs] def select_channels_and_split( freqs, channels, start_freq, end_freq, num_chunks ): """ Selects frequencies and corresponding channels and split them into chunks :param freqs: Frequency array in MHz :param channels: Channel array with same length as freqs :param start_freq: Start frequency in MHz :param end_freq: End frequency in MHz :return: Optionally selected frequencies and channels split into chunks """ # Optionally select frequency channels if start_freq is not None and end_freq is not None: if float(start_freq) > float(end_freq) or float(start_freq) == float( end_freq ): raise ValueError("start_freq cannot be >= end_freq!") freqs, channels = select_channels( freqs, channels, start_freq, end_freq ) # Split frequencies and corresponding channels into chunks freqs = split_array_into_chunks(freqs, num_chunks) channels = split_array_into_chunks(channels, num_chunks) return freqs, channels