"""
Util functions for:
1. Constructing katpoint antenna information
2. Constructing Measurement Sets names based on ScanIDs
3. Determining observing frequency band
4. Storing errors and traceback in the config db when running the
pipeline in SDP
5. Extracts the values from the dictionary of the fitted parameters
and outputs them as a numpy array in degrees.
6. Extracting the index of antenna under test in a two-dish mode scenario
7. Merging the output offsets from a two-dish mode scenario
8. Computing the weighted average of the output offsets for the
parallel hands gain or visibility amplitudes
9. Computing a weighted-average of the fitted parameters and their
standard deviations from beams fitted to the Stokes I amplitudes
or parallel-hands gain amplitudes
10. Converting the dictionary of fitted parameters to a numpy array
and to a structured numpy array
11. Generating some parameters required for plotting the Gaussian fits
in the two-dish mode scenario
12. Splitting array into chunks and discards the first and last chunks if
the number of chunks is greater than 1
13. Extracting the cross-correlation Visibility object of the
parallel-hands polarisations
14. Selecting frequency ranges for input data and splitting into chunks
"""
import glob
import logging
import traceback
from copy import deepcopy
import numpy
from katpoint import Antenna
from ska_sdp_datamodels.science_data_model import PolarisationFrame
from ska_sdp_datamodels.visibility import Visibility
from ska_sdp_func_python.util.coordinate_support import ecef_to_lla
LOG = logging.getLogger("ska-sdp-pointing-offset")
DATA_STRUCTURE = numpy.dtype(
[
("antenna_name", "U8"),
("last_scan_index", "f8"),
("xel_offset", "f8"),
("xel_offset_std", "f8"),
("el_offset", "f8"),
("el_offset_std", "f8"),
("expected_width_h", "f8"),
("expected_width_v", "f8"),
("fitted_width_h", "f8"),
("fitted_width_h_std", "f8"),
("fitted_width_v", "f8"),
("fitted_width_v_std", "f8"),
("fitted_height", "f8"),
("fitted_height_std", "f8"),
]
)
[docs]
def construct_antennas(xyz, diameter, station):
"""
Construct list of katpoint Antenna objects
based on telescope configuration information.
:param xyz: xyz coordinates of antenna positions in [nants, 3]
:param diameter: Diameter of dishes in [nants]
:param station: List of the antenna names [nants]
:return: a set of katpoint Antenna objects
"""
latitude, longitude, altitude = ecef_to_lla(
x=xyz[:, 0], y=xyz[:, 1], z=xyz[:, 2]
)
ants = []
for ant_name, diam, lat, long, alt in zip(
station, diameter, latitude, longitude, altitude
):
# Antenna information
# The beamwidth is HPBW of an antenna: k * lambda/D
# We use an estimate of k=1.22 here but not used in
# calculating the beamwidth as k is passed from
# the command line. The "beamwidth" as used in ant is
# actually referring to the beamwidth factor, k.
ant = Antenna(
name=ant_name,
latitude=lat,
longitude=long,
altitude=alt,
diameter=diam,
delay_model=None,
pointing_model=None,
beamwidth=1.22,
)
ants.append(ant)
return ants
[docs]
def construct_ms_file_name(data_dir, scan_ids=None):
"""
Construct filenames based on scanID.
Assumes one MS per scan ID.
:param data_dir: Given directory name containing Measurement Sets
:param scan_ids: A list of scan IDs
:return: A list of Measurement Sets filenames
"""
file_extensions = "*.ms"
scan_ids_track = deepcopy(scan_ids)
files_in_dir = glob.glob(f"{data_dir}/{file_extensions}")
ms_files = []
# Find files that contain the specific string
for file in files_in_dir:
# Define what is to be searched in
file_text = file.lower()
for scan_id in scan_ids_track:
# Assume current file name likes *scan-0.ms.
file_content = f"scan-{scan_id}.ms"
if file_content in file_text:
ms_files.append(file)
scan_ids_track.remove(scan_id)
break
if scan_ids_track:
raise (
FileNotFoundError(
f"Missing data: The following scan IDs do not have associated "
f"MeasurementSets in the directory: {scan_ids_track}"
)
)
return ms_files
[docs]
def observing_band(frequency_per_chunk):
"""
Compares the frequency per chunk to the dedicated frequency bands
for the SKA and returns which observing band was used.
The observing frequency bands were obtained from
https://www.skao.int/en/science-users/118/ska-telescope-specifications#__otpm4
:param frequency_per_chunk: 1D array of the averaged-frequency in each
chunk in Hz [num_chunks]
:return: The frequency band within which an observation falls
"""
bands = {
"SKA1-Low": numpy.linspace(50.0e6, 350.0e6, 300),
"Band 1": numpy.linspace(350.0e6, 1050.0e6, 700),
"Band 2": numpy.linspace(950.0e6, 1760.0e6, 810),
"Band 3": numpy.linspace(1650.0e6, 3050.0e6, 1400),
"Band 4": numpy.linspace(2800.0e6, 5180.0e6, 2380),
"Band 5a": numpy.linspace(4600.0e6, 8500.0e6, 4000),
"Band 5b": numpy.linspace(8300.0e6, 15300.0e6, 5000),
}
try:
# num_chunks > 1
lower_band = frequency_per_chunk[0]
upper_band = frequency_per_chunk[-1]
except IndexError:
# num_chunks = 1
lower_band = upper_band = frequency_per_chunk
band_type = [
key
for key, val in bands.items()
if (lower_band >= val[0] and upper_band <= val[-1])
]
if len(band_type) == 0:
band_type = None
else:
band_type = band_type[0]
return band_type
[docs]
def handle_exception(err, traceback_frames, scan_ids):
"""
When an exception occurs store the error and traceback
in the config db
:param err: Exception thrown
:param traceback_frames: Exception traceback object
:param scan_ids: list of processed scan IDs
:return: dict of error information
"""
# collect the error type
error_type = type(err).__name__
error_message = str(err)
full_error = f"{error_type}: {error_message}"
# collect and log the error and traceback
trace = traceback.format_exc()
LOG.error("Pipeline unsuccessful with error: %s", full_error)
LOG.error("Traceback: %s", trace)
stack_summary = traceback.extract_tb(traceback_frames)
# Access the last line
last_line = stack_summary[-1]
exception_data = {
"scan_ids": scan_ids,
"error": full_error,
"traceback": {
"file": last_line.filename,
"line": last_line.lineno,
"function": last_line.name,
},
"status": "failed",
}
return exception_data
[docs]
def antenna_and_offsets_reorder(offsets):
"""
Re-orders the antenna names and their corresponding fitted parameters
in the two-dish scenario to match those in the output HDF5 file
:param offsets: A dictionary of the weighted-average pointing
offsets output by :func:`array_data_func.weighted_average`
:return: A dictionary of the input offsets with the right antenna names
and fitted offsets ordering
"""
offsets["antenna_names"] = numpy.flip(offsets["antenna_names"])
offsets["xel_offset"] = numpy.flip(offsets["xel_offset"])
offsets["xel_offset_std"] = numpy.flip(offsets["xel_offset_std"])
offsets["el_offset"] = numpy.flip(offsets["el_offset"])
offsets["el_offset_std"] = numpy.flip(offsets["el_offset_std"])
offsets["expected_width"] = numpy.flipud(offsets["expected_width"])
offsets["fitted_width"] = numpy.flipud(offsets["fitted_width"])
offsets["fitted_width_std"] = numpy.flipud(offsets["fitted_width_std"])
offsets["fitted_height"] = numpy.flip(offsets["fitted_height"])
offsets["fitted_height_std"] = numpy.flip(offsets["fitted_height_std"])
return offsets
[docs]
def convert_dict_to_numpy_array(offsets):
"""
Extracts the values from the dictionary of the fitted parameters
and outputs them as a numpy array.
:param offsets: A dictionary of the weighted-average pointing
offsets output by :func:`array_data_func.weighted_average`
:return: A numpy array of the input offsets
"""
offsets = numpy.column_stack(
(
offsets["antenna_names"],
offsets["xel_offset"],
offsets["xel_offset_std"],
offsets["el_offset"],
offsets["el_offset_std"],
offsets["expected_width"][:, 0],
offsets["expected_width"][:, 1],
offsets["fitted_width"][:, 0],
offsets["fitted_width_std"][:, 0],
offsets["fitted_width"][:, 1],
offsets["fitted_width_std"][:, 1],
offsets["fitted_height"],
offsets["fitted_height_std"],
)
)
return offsets
[docs]
def create_empty_array(antennas):
"""
Create an empty array with 13 columns and as many rows as
there are receptors to send to kafka if there is an
exception while processing scans
:param antennas: list of antenna names
:return: empty array to send to kafka
"""
# Give antenna empty strings
# And the rest NaN values
ants = numpy.array(antennas).reshape(len(antennas), 1)
values = numpy.full([len(antennas), 12], numpy.nan)
data = numpy.column_stack((ants, values))
return data
[docs]
def convert_offsets_to_degrees_for_kafka(offsets):
"""
Extracts the values from the dictionary of the fitted parameters
and outputs them as a numpy array in degrees.
:param offsets: A dictionary of the weighted-average pointing
offsets output by :func:`array_data_func.weighted_average`
:return: A converted numpy array where all the input offsets are
now in degrees except the fitted heights and its uncertainty are
in arbitrary units
"""
offsets = numpy.column_stack(
(
offsets["antenna_names"],
numpy.degrees(offsets["xel_offset"]),
numpy.degrees(offsets["xel_offset_std"]),
numpy.degrees(offsets["el_offset"]),
numpy.degrees(offsets["el_offset_std"]),
numpy.degrees(offsets["expected_width"][:, 0]),
numpy.degrees(offsets["expected_width"][:, 1]),
numpy.degrees(offsets["fitted_width"][:, 0]),
numpy.degrees(offsets["fitted_width_std"][:, 0]),
numpy.degrees(offsets["fitted_width"][:, 1]),
numpy.degrees(offsets["fitted_width_std"][:, 1]),
offsets["fitted_height"],
offsets["fitted_height_std"],
)
)
return offsets
[docs]
def get_off_source_antenna_index(on_sky_offsets):
"""
Extracts the index of the antenna under test i.e the antenna
performing the five-point scan or similar from the on-sky
offsets in a two-dish mode reference pointing observation
:param on_sky_offsets: The commanded pointings relative
to the target in the reference pointing observation in
xEl-El coordinates with dimensions (nscans, nants, 2)
:return: A list of the index of the antenna performing the
five-point or similar scan
"""
moving_antenna_index = []
for index in range(on_sky_offsets.shape[1]):
xel_min = abs(on_sky_offsets[:, index, 0].min())
el_min = abs(on_sky_offsets[:, index, 1].min())
if xel_min > 0.1 * xel_min and el_min > 0.1 * el_min:
moving_antenna_index.append(index)
if len(moving_antenna_index) != 1:
raise ValueError(
"Only one antenna is allowed to move to discrete offset pointings!"
)
return moving_antenna_index
[docs]
def create_dict_for_output_offsets(nants):
"""
Creates a dictionary of the fitted parameters and populates
them with NaNs
:param nants: The number of antennas
:return: A dictionary of the fitted parameters populated with
NaNs
"""
fitted_offset_params = numpy.full(nants, numpy.nan)
fitted_width_params = numpy.full((nants, 2), numpy.nan)
fitted_parameters = {
"antenna_names": numpy.full(nants, numpy.nan, dtype=list),
"xel_offset": fitted_offset_params.copy(),
"xel_offset_std": fitted_offset_params.copy(),
"el_offset": fitted_offset_params.copy(),
"el_offset_std": fitted_offset_params.copy(),
"expected_width": fitted_width_params.copy(),
"fitted_width": fitted_width_params.copy(),
"fitted_width_std": fitted_width_params.copy(),
"fitted_height": fitted_offset_params.copy(),
"fitted_height_std": fitted_offset_params.copy(),
}
return fitted_parameters
[docs]
def merge_output_offsets(output_offsets_list):
"""
Merges the output offsets from a two-dish scenario observation
into a single dictionary of the output offsets
:param output_offsets_list: A list of output offsets
:return: A dictionary of the merged output offsets
"""
def _populate_dict(key, single_offset, merged_offset):
"""
Put into merged offset for each key
"""
try:
merged_offset[key][i] = single_offset[key]
# If offset is already all NaNs
# Then don't need to do anything
except ValueError:
pass
merged_output_offsets = create_dict_for_output_offsets(
nants=len(output_offsets_list)
)
for i, output_offset in enumerate(output_offsets_list):
for key in output_offset:
_populate_dict(key, output_offset, merged_output_offsets)
return merged_output_offsets
[docs]
def weighted_avg_and_std(data, axis, weights, std_error=False):
"""
Compute weighted-average and standard deviation (or error).
:param data: Data to be averaged (masked entries are ignored).
:param weights: Array of weights with same shape as data along
the specified axes.
:param axis: Axis or axes along which to average data.
:param std_error: If set then return standard error instead of
standard deviation (default=False)
:return: weighted average and weighted standard deviation (or error).
"""
# Mask any NaNs in the weights
weights = numpy.ma.fix_invalid(weights)
average, sumwt = numpy.ma.average(
data, axis=axis, weights=weights, returned=True
)
mean_subtracted = data - numpy.expand_dims(average, axis=axis)
sumsqwt = (weights**2).sum(axis=axis)
unbiased_variance = (mean_subtracted**2 * weights).sum(axis=axis) / (
sumwt - (sumsqwt / sumwt)
)
if not std_error:
std = numpy.sqrt(unbiased_variance)
else:
std = numpy.sqrt(unbiased_variance * sumsqwt) / sumwt
return average, std
[docs]
def average_output_offsets(output_offsets_list):
"""
Computes a weighted average of the output offsets for the
parallel hands gain or visibility amplitudes output by
:func:`array_data_func.fitted_offsets_weighted_average`.
:param output_offsets_list: A list of output offsets.
:return: A dictionary of the weighted-average output offsets.
"""
output_offsets_xx = output_offsets_list[0]
output_offsets_yy = output_offsets_list[1]
antenna_names = output_offsets_xx["antenna_names"]
xel_offset, xel_offset_std = weighted_avg_and_std(
data=numpy.array(
(output_offsets_xx["xel_offset"], output_offsets_yy["xel_offset"])
),
axis=0,
weights=1.0
/ numpy.array(
(
output_offsets_xx["xel_offset_std"],
output_offsets_yy["xel_offset_std"],
)
)
** 2,
std_error=True,
)
el_offset, el_offset_std = weighted_avg_and_std(
data=numpy.array(
(output_offsets_xx["el_offset"], output_offsets_yy["el_offset"])
),
axis=0,
weights=1.0
/ numpy.array(
(
output_offsets_xx["el_offset_std"],
output_offsets_yy["el_offset_std"],
)
)
** 2,
std_error=True,
)
expected_width = numpy.ma.average(
numpy.array(
(
output_offsets_xx["expected_width"],
output_offsets_yy["expected_width"],
)
),
axis=0,
)
width, width_std = weighted_avg_and_std(
data=numpy.array(
(
output_offsets_xx["fitted_width"],
output_offsets_yy["fitted_width"],
)
),
axis=0,
weights=1.0
/ numpy.array(
(
output_offsets_xx["fitted_width_std"],
output_offsets_yy["fitted_width_std"],
)
)
** 2,
std_error=True,
)
height, height_std = weighted_avg_and_std(
data=numpy.array(
(
output_offsets_xx["fitted_height"],
output_offsets_yy["fitted_height"],
)
),
axis=0,
weights=1.0
/ numpy.array(
(
output_offsets_xx["fitted_height_std"],
output_offsets_yy["fitted_height_std"],
)
)
** 2,
std_error=True,
)
fitted_parameters = create_dict_for_output_offsets(
nants=len(antenna_names)
)
# Populate the dictionary
fitted_parameters["antenna_names"] = antenna_names
fitted_parameters["xel_offset"] = xel_offset
fitted_parameters["xel_offset_std"] = xel_offset_std
fitted_parameters["el_offset"] = el_offset
fitted_parameters["el_offset_std"] = el_offset_std
fitted_parameters["expected_width"] = expected_width
fitted_parameters["fitted_width"] = width
fitted_parameters["fitted_width_std"] = width_std
fitted_parameters["fitted_height"] = height
fitted_parameters["fitted_height_std"] = height_std
return fitted_parameters
[docs]
def convert_to_structured(poffset_data, last_scan_index):
"""
Convert pointing offset data from a numpy structure of 13 elements
(antenna_name plus 12 floats representing fitting values) to a numpy
structured array. The names of the fields are taken to match those
in array_data_func.weighted_average()
Include the last scan index as the second element in this structured data
:param poffset_data: Offsets as numpy column_data
:param last_scan_index: Index of last scan
:return: Offsets and last_scan_index as structured numpy data
"""
nants = poffset_data.shape[0]
structured_data = numpy.zeros(0, dtype=DATA_STRUCTURE)
for ant in range(nants):
ant_data = numpy.insert(poffset_data[ant], 1, last_scan_index)
structured_data = numpy.append(
structured_data,
numpy.array(tuple(ant_data), dtype=DATA_STRUCTURE),
)
return structured_data
# pylint: disable=too-many-locals
[docs]
def get_parameters_for_gaussian_fits_plot(
x_per_scan_list, y_per_scan_list, fitted_beams_list
):
"""
Generate some parameters required for plotting the Gaussian fits
in the two-dish mode scenario i.e, when the beams are fitted to
the visibility amplitudes by merging the parameters from the
two sets of observations
:param x_per_scan_list: List of on-sky offsets from the two sets
of observations
:param y_per_scan_list: List of visibility amplitudes from the two
sets of observations
:param fitted_beams_list: The list of fitted beams from the two sets
of observations
:return: The on-sky offsets with dimension (nscans, nants, 2), the
vis amplitudes with dimensions (nants, nscans) when num_chunks=1
or (nants, num_chunks, nscans) when num_chunks > 1. Note that nants
is always 2 because only two antennas are used in the observation
when the beams are fitted to the visibility amplitudes
"""
# Convert the list of on-sky offsets to dimensions (nscans, nants, 2)
# where nants=2
ant1_x_per_scan = x_per_scan_list[0]
ant1_idx = get_off_source_antenna_index(ant1_x_per_scan)[0]
ant1_x_per_scan = ant1_x_per_scan[:, ant1_idx]
ant2_x_per_scan = x_per_scan_list[1]
ant2_idx = get_off_source_antenna_index(ant2_x_per_scan)[0]
ant2_x_per_scan = ant2_x_per_scan[:, ant2_idx]
# We need axis 1 as the nants
x_per_scan = numpy.swapaxes(
numpy.stack((ant1_x_per_scan, ant2_x_per_scan)), 0, 1
)
# Convert the list of vis amplitudes to dimensions (nants, nscans)
# when num_chunks=1 and (nants, num_chunks, nscans) when num_chunks > 1
# Note that num_chunks cannot be 2 because the first and last chunks
# of the band are discarded
ant1_y_per_scan = y_per_scan_list[0]
ant2_y_per_scan = y_per_scan_list[1]
ant1_y_per_scan = ant1_y_per_scan[ant1_idx]
ant2_y_per_scan = ant2_y_per_scan[ant2_idx]
y_per_scan = numpy.stack((ant1_y_per_scan, ant2_y_per_scan))
for (key1, value1), (key2, value2) in zip(
fitted_beams_list[0].items(), fitted_beams_list[1].items()
):
merged_fitted_beams = {key1: value1, key2: value2}
return x_per_scan, y_per_scan, merged_fitted_beams
[docs]
def get_crosscorrelations(vis):
"""
Extracts the cross-correlation visibilities and their weights
from visibilities containing both auto and cross-correlations
:param vis: Visibility object containing both auto and cross-
correlations
:return: Cross-correlation Visibility object for the parallel-hands
polarisations
"""
vis_list = []
for visibility in vis:
get_crosscorr = visibility.antenna1 != visibility.antenna2
crosscorr_vis = visibility.vis.data[:, get_crosscorr]
crosscorr_weights = visibility.weight.data[:, get_crosscorr]
baselines = visibility.baselines[get_crosscorr]
flags = visibility.flags[:, get_crosscorr]
uvw = visibility.uvw[:, get_crosscorr]
if crosscorr_vis.shape[3] == 2:
# Polarisations are (XX, YY) or (RR, LL)
LOG.info(
"Found two polarisations. Assumed to be the parallel-hands..."
)
elif crosscorr_vis.shape[3] == 4:
# Polarisations are (XX, XY, YX, YY) or (RR, RL, LR, LL)
LOG.info(
"Found four polarisations. Extracting the "
"parallel hand polarisations..."
)
crosscorr_vis = numpy.stack(
(crosscorr_vis[:, :, :, 0], crosscorr_vis[:, :, :, 3])
)
crosscorr_vis = numpy.moveaxis(crosscorr_vis, 0, 3)
crosscorr_weights = numpy.stack(
(crosscorr_weights[:, :, :, 0], crosscorr_weights[:, :, :, 3])
)
crosscorr_weights = numpy.moveaxis(crosscorr_weights, 0, 3)
else:
raise ValueError("Polarisation type not supported!")
vis_list.append(
Visibility.constructor(
frequency=visibility.frequency,
channel_bandwidth=visibility.channel_bandwidth,
phasecentre=visibility.phasecentre,
configuration=visibility.configuration,
uvw=uvw,
time=visibility.time.data,
vis=crosscorr_vis,
weight=crosscorr_weights,
integration_time=visibility.integration_time.data,
flags=flags,
baselines=baselines,
polarisation_frame=PolarisationFrame("linearnp"),
source=visibility.source,
meta=visibility.meta,
)
)
return vis_list
[docs]
def split_array_into_chunks(array, num_chunks):
"""
Split an array into chunks of num_chunks elements. When
num_chunks > 1, the first and last chunks are discarded
:param array: Array to be split (equally or unequally) into chunks
:param num_chunks: Number of chunks
:return: Array with chunks of size num_chunks
"""
# Split (equally or unequally) into chunks
if len(array) % num_chunks != 0:
LOG.warning(
"Array cannot be equally divided so they would "
"be unequally divided..."
)
array = numpy.array_split(array, num_chunks)
else:
array = array.reshape(num_chunks, -1)
if num_chunks > 1:
# Discard the first and last chunks of the array
array = array[1 : num_chunks - 1]
return array
[docs]
def select_channels(freqs, channels, start_freq, end_freq):
"""
Select the desired frequencies and corresponding channels
of interest, by inputting the start and end frequency. The
function will select the channels between these two
frequencies.
:param freqs: 1D frequency array in MHz [nchan]
:param channels: 1D frequency channel numbers
:param start_freq: Start frequency in MHz (float)
:param end_freq: End frequency in MHz (float)
:return: selected array of (frequencies in MHz, channel numbers)
"""
select_mask = (freqs > float(start_freq)) & (freqs < float(end_freq))
freqs = freqs[select_mask]
channels = channels[select_mask]
return freqs, channels
[docs]
def select_channels_and_split(
freqs, channels, start_freq, end_freq, num_chunks
):
"""
Selects frequencies and corresponding channels and split them into chunks
:param freqs: Frequency array in MHz
:param channels: Channel array with same length as freqs
:param start_freq: Start frequency in MHz
:param end_freq: End frequency in MHz
:return: Optionally selected frequencies and channels split into chunks
"""
# Optionally select frequency channels
if start_freq is not None and end_freq is not None:
if float(start_freq) > float(end_freq) or float(start_freq) == float(
end_freq
):
raise ValueError("start_freq cannot be >= end_freq!")
freqs, channels = select_channels(
freqs, channels, start_freq, end_freq
)
# Split frequencies and corresponding channels into chunks
freqs = split_array_into_chunks(freqs, num_chunks)
channels = split_array_into_chunks(channels, num_chunks)
return freqs, channels