""" Functions for simulating pointing errors
"""
__all__ = [
"simulate_gaintable_from_pointingtable",
"simulate_pointingtable_from_timeseries",
"simulate_pointingtable",
]
import logging
import numpy
from astropy.time import Time
from scipy.interpolate import RectBivariateSpline
from ska_sdp_datamodels.calibration import create_gaintable_from_visibility
from ska_sdp_datamodels.calibration.calibration_model import PointingTable
from ska_sdp_func_python.util.geometry import calculate_azel
from rascil.processing_components.parameters import rascil_data_path
log = logging.getLogger("rascil-logger")
[docs]
def simulate_gaintable_from_pointingtable(
vis,
sc,
pt,
vp,
vis_slices=None,
scale=1.0,
order=3,
elevation_limit=15.0 * numpy.pi / 180.0,
jones_type="G",
**kwargs,
):
"""Create gaintables from a pointing table
Note that the column "nominal" is not used
:param vis:
:param sc: Sky components for which pierce points are needed
:param pt: Pointing table
:param vp: Voltage pattern in AZELGEO frame
:param scale: Multiply the screen by this factor
:param order: order of spline (default is 3)
:param jones_type: Type of calibration matrix T or G or B
:return:
"""
nant = vis.visibility_acc.nants
gaintables = [
create_gaintable_from_visibility(
vis, timeslice=kwargs.get("timeslice", None), jones_type=jones_type
)
for i in sc
]
gnchan = gaintables[0].gaintable_acc.nchan
frequency = gaintables[0].frequency
nchan, npol, ny, nx = vp["pixels"].data.shape
real_spline = [
[
RectBivariateSpline(
range(ny),
range(nx),
vp["pixels"].data[chan, pol, ...].real,
kx=order,
ky=order,
)
for chan in range(nchan)
]
for pol in range(npol)
]
imag_spline = [
[
RectBivariateSpline(
range(ny),
range(nx),
vp["pixels"].data[chan, pol, ...].imag,
kx=order,
ky=order,
)
for chan in range(nchan)
]
for pol in range(npol)
]
assert (
npol == vis.visibility_acc.npol
), "Voltage pattern and visibility have incompatible polarisations"
assert vp.image_acc.wcs.wcs.ctype[0] == "AZELGEO long", vp.image_acc.wcs.wcs.ctype[
0
]
assert vp.image_acc.wcs.wcs.ctype[1] == "AZELGEO lati", vp.image_acc.wcs.wcs.ctype[
1
]
assert vis.configuration.mount[0] == "azel", (
"Mount %s not supported yet" % vis.configuration.mount[0]
)
# The time in the Visibility is UTC in seconds
number_bad = 0
number_good = 0
number_singular = 0
r2d = 180.0 / numpy.pi
# For each hourangle, we need to calculate the location of a component
# in AZELGEO. With that we can then look up the relevant gain from the
# voltage pattern
location = vis.attrs["configuration"].attrs["location"]
for row, time in enumerate(pt["time"]):
time_slice = {
"time": slice(time - pt["interval"][row] / 2, time + pt.interval[row] / 2)
}
pt_sel = pt.sel(time_slice)
if len(pt_sel["time"]) > 0:
pointing_ha = pt_sel["pointing"].data[0, ...]
utc_time = Time(
[numpy.average(pt_sel["time"]) / 86400.0], format="mjd", scale="utc"
)
azimuth_centre, elevation_centre = calculate_azel(
location, utc_time, vis.phasecentre
)
azimuth_centre = azimuth_centre[0].to("rad").value
elevation_centre = elevation_centre[0].to("rad").value
# Calculate the az el for this hourangle and the phasecentre declination
for icomp, comp in enumerate(sc):
gt_sel = gaintables[icomp].sel(time_slice)
nrec = gt_sel.gaintable_acc.nrec
if elevation_centre >= elevation_limit:
antgain = numpy.zeros([nant, gnchan, npol], dtype="complex")
# Calculate the azel of this component
azimuth_comp, elevation_comp = calculate_azel(
location, utc_time, comp.direction
)
azimuth_comp = azimuth_comp[0].to("rad").value
elevation_comp = elevation_comp[0].to("rad").value
# We now add the pointing to the calculated az, el of the component
for ant in range(nant):
wcs_azel = vp.image_acc.wcs.deepcopy()
az_comp = azimuth_centre + pointing_ha[
ant, 0, 0, 0
] / numpy.cos(elevation_centre)
el_comp = elevation_centre + pointing_ha[ant, 0, 0, 1]
if az_comp - azimuth_comp > numpy.pi:
azimuth_comp += 2.0 * numpy.pi
if az_comp - azimuth_comp < -numpy.pi:
azimuth_comp -= 2.0 * numpy.pi
# We use WCS sensible coordinate handling by labelling the
# axes misleadingly
wcs_azel.wcs.crval[0] = az_comp * r2d
wcs_azel.wcs.crval[1] = el_comp * r2d
wcs_azel.wcs.ctype[0] = "RA---SIN"
wcs_azel.wcs.ctype[1] = "DEC--SIN"
try:
# Unwrap azimuths
if azimuth_comp - az_comp > numpy.pi:
azimuth_comp -= 2.0 * numpy.pi
elif azimuth_comp - az_comp <= -numpy.pi:
azimuth_comp += 2.0 * numpy.pi
if azimuth_comp > numpy.pi:
azimuth_comp -= 2.0 * numpy.pi
elif azimuth_comp <= -numpy.pi:
azimuth_comp += 2.0 * numpy.pi
for gchan in range(gnchan):
gain = numpy.zeros([npol], dtype="complex")
# World coordinates of this component
worldloc = [
azimuth_comp * r2d,
elevation_comp * r2d,
vp.image_acc.wcs.wcs.crval[2],
frequency[gchan].data,
]
# Pixel coordinates of this component. Note that the
# polarisation coordinate is not used here
pixloc = wcs_azel.wcs_world2pix([worldloc], 0)[0]
assert pixloc[0] > 2
assert pixloc[0] < nx - 3
assert pixloc[1] > 2
assert pixloc[1] < ny - 3
chan = int(round(pixloc[3]))
if nchan == 1:
chan = 0
for pol in range(npol):
gain[pol] = real_spline[pol][chan].ev(
pixloc[1], pixloc[0]
) + 1j * imag_spline[pol][chan].ev(
pixloc[1], pixloc[0]
)
if nrec == 2:
ag = gain.reshape([2, 2])
antgain[ant, gchan, :] = ag.reshape([4])
elif nrec == 1:
antgain[ant, gchan, 0] = gain
else:
raise ValueError(
"Illegal number of receptors: {}".format(nrec)
)
number_good += 1
except (ValueError, AssertionError, IndexError):
number_bad += 1
antgain[ant, :, :] = 0.0
except numpy.linalg.LinAlgError:
number_singular += 1
antgain[ant, :, :] = 0.0
gt_sel["gain"].data[:, :, :, :] = antgain[:, :, :].reshape(
[nant, gnchan, nrec, nrec]
)
gt_sel.attrs["phasecentre"] = comp.direction
else:
gt_sel["gain"].data[...] = 1.0 + 0.0j
gt_sel.attrs["phasecentre"] = comp.direction
number_bad += nant
else:
log.warning("Zero length pointing interval")
if number_bad > 0:
log.debug(
"simulate_gaintable_from_pointingtable: %d points inside "
"the voltage pattern image" % (number_good)
)
log.debug(
"simulate_gaintable_from_pointingtable: %d points outside "
"the voltage pattern image will be ignored" % (number_bad)
)
if number_singular > 0:
log.debug(
"simulate_gaintable_from_pointingtable: %d points have singular gain"
% (number_singular)
)
return gaintables
[docs]
def simulate_pointingtable(
pt: PointingTable,
pointing_error,
static_pointing_error=None,
global_pointing_error=None,
seed=None,
**kwargs,
) -> PointingTable:
"""Simulate a gain table
:type pt: PointingTable
:param pointing_error: std of normal distribution (radians)
:param static_pointing_error: std of normal distribution (radians)
:param global_pointing_error: 2-vector of global pointing error (rad)
:param kwargs:
:return: PointingTable
"""
from numpy.random import default_rng
if seed is None:
rng = default_rng(1805550721)
else:
rng = default_rng(seed)
if static_pointing_error is None:
static_pointing_error = [0.0, 0.0]
r2s = 180.0 * 3600.0 / numpy.pi
pt["pointing"].data = numpy.zeros(pt["pointing"].data.shape)
ntimes, nant, nchan, nrec, _ = pt["pointing"].data.shape
if pointing_error > 0.0:
log.debug(
"simulate_pointingtable: Simulating dynamic pointing error = "
"%g (rad) %g (arcsec)" % (pointing_error, r2s * pointing_error)
)
pt["pointing"].data += rng.normal(
0.0, pointing_error, pt["pointing"].data.shape
)
if (abs(static_pointing_error[0]) > 0.0) or (abs(static_pointing_error[1]) > 0.0):
log.debug(
"simulate_pointingtable: Simulating static pointing error = "
"(%g, %g) (rad) (%g, %g)(arcsec)"
% (
static_pointing_error[0],
static_pointing_error[1],
r2s * static_pointing_error[0],
r2s * static_pointing_error[1],
)
)
static_pe = numpy.zeros(pt["pointing"].data.shape[1:])
static_pe[..., 0] = rng.normal(
0.0, static_pointing_error[0], static_pe[..., 0].shape
)[numpy.newaxis, ...]
static_pe[..., 1] = rng.normal(
0.0, static_pointing_error[1], static_pe[..., 1].shape
)[numpy.newaxis, ...]
pt["pointing"].data += static_pe
if global_pointing_error is not None:
log.debug(
"simulate_pointingtable: Simulating global pointing error = "
"[%g, %g] (rad) [%g,s %g] (arcsec)"
% (
global_pointing_error[0],
global_pointing_error[1],
r2s * global_pointing_error[0],
r2s * global_pointing_error[1],
)
)
pt["pointing"].data[..., :] += global_pointing_error
return pt
[docs]
def simulate_pointingtable_from_timeseries(
pt,
type="wind",
time_series_type="precision",
pointing_directory=None,
reference_pointing=False,
seed=None,
):
"""Create a pointing table with time series created from PSD.
:param pt: Pointing table to be filled
:param type: Type of pointing: 'tracking' or 'wind'
:param time_series_type: Type of wind condition precision|standard|degraded
:param pointing_directory: Name of pointing file directory
:param reference_pointing: Use reference pointing?
:return:
"""
from numpy.random import default_rng
if seed is None:
rng = default_rng(1805550721)
else:
rng = default_rng(seed)
if pointing_directory is None:
pointing_directory = rascil_data_path("models/%s" % time_series_type)
else:
pointing_directory = pointing_directory + "/%s" % (time_series_type)
pt["pointing"].data = numpy.zeros(pt["pointing"].data.shape)
ntimes, nant, nchan, nrec, _ = pt["pointing"].data.shape
# Use az and el at the beginning of this pointingtable
axis_values = pt.nominal[0, 0, 0, 0, 0]
el = pt.nominal[0, 0, 0, 0, 1]
el_deg = el * 180.0 / numpy.pi
az_deg = axis_values * 180.0 / numpy.pi
if el_deg < 30.0:
el_deg = 15.0
elif el_deg < (90.0 + 45.0) / 2.0:
el_deg = 45.0
else:
el_deg = 90.0
if abs(az_deg) < 45.0 / 2.0:
az_deg = 0.0
elif abs(az_deg) < (45.0 + 90.0) / 2.0:
az_deg = 45.0
elif abs(az_deg) < (90.0 + 135.0) / 2.0:
az_deg = 90.0
elif abs(az_deg) < (135.0 + 180.0) / 2.0:
az_deg = 135.0
else:
az_deg = 180.0
pointing_file = "%s/El%dAz%d.dat" % (pointing_directory, int(el_deg), int(az_deg))
log.info(
"simulate_pointingtable_from_timeseries: Reading wind PSD from %s"
% pointing_file
)
try:
psd = numpy.loadtxt(pointing_file)
except OSError:
raise ValueError("Pointing file %s not found." % pointing_file)
# define some arrays
freq = psd[:, 0]
axesdict = {"az": psd[:, 1], "el": psd[:, 2], "pxel": psd[:, 3], "pel": psd[:, 4]}
if type == "tracking":
axes = ["az", "el"]
elif type == "wind":
axes = ["pxel", "pel"]
else:
raise ValueError("Pointing type %s not known." % type)
freq_interval = 0.0001
for axis in axes:
axis_values = axesdict[axis]
if (axis == "az") or (axis == "el"):
# determine index of maximum PSD value; add 50 for better fit
axis_values_max_index = (
numpy.argwhere(axis_values == numpy.max(axis_values))[0][0] + 50
)
axis_values_max_index = min(axis_values_max_index, len(axis_values))
# max_freq = 2.0 / pt.interval[0]
max_freq = 0.4
freq_max_index = numpy.argwhere(freq > max_freq)[0][0]
else:
break_freq = 0.01 # not max; just a break
axis_values_max_index = numpy.argwhere(freq > break_freq)[0][0]
# max_freq = 2.0 / pt.interval[0]
max_freq = 0.1
freq_max_index = numpy.argwhere(freq > max_freq)[0][0]
# construct regularly-spaced frequencies
regular_freq = numpy.arange(freq[0], freq[freq_max_index], freq_interval)
regular_axis_values_max_index = numpy.argwhere(
numpy.abs(regular_freq - freq[axis_values_max_index])
== numpy.min(numpy.abs(regular_freq - freq[axis_values_max_index]))
)[0][0]
# print ('Frequency break: ', freq[az_max_index])
# print ('Max frequency: ', max_freq)
#
# print ('New frequency break: ', regular_freq[regular_az_max_index])
# print ('New max frequency: ', regular_freq[-1])
if axis_values_max_index >= freq_max_index:
raise ValueError(
"Frequency break is higher than highest frequency; "
"select a lower break."
)
# use original frequency break and max frequency to fit function
# fit polynomial to psd up to max value
import warnings
from numpy import RankWarning
warnings.simplefilter("ignore", RankWarning)
p_axis_values1 = numpy.polyfit(
freq[:axis_values_max_index],
numpy.log(axis_values[:axis_values_max_index]),
5,
)
f_axis_values1 = numpy.poly1d(p_axis_values1)
# fit polynomial to psd beyond max value
p_axis_values2 = numpy.polyfit(
freq[axis_values_max_index:freq_max_index],
numpy.log(axis_values[axis_values_max_index:freq_max_index]),
5,
)
f_axis_values2 = numpy.poly1d(p_axis_values2)
# use new frequency break and max frequency to apply function (ensures
# equal spacing of frequency intervals)
# resampled to construct regularly-spaced frequencies
regular_axis_values1 = numpy.exp(
f_axis_values1(regular_freq[:regular_axis_values_max_index])
)
regular_axis_values2 = numpy.exp(
f_axis_values2(regular_freq[regular_axis_values_max_index:])
)
# join
regular_axis_values = numpy.append(regular_axis_values1, regular_axis_values2)
m0 = len(regular_axis_values)
# check rms of resampled PSD
# df = regular_freq[1:]-regular_freq[:-1]
# psd2rms_pxel = numpy.sqrt(numpy.sum(regular_az[:-1]*df))
# print ('Calculate rms of resampled PSD: ', psd2rms_pxel)
original_regular_freq = regular_freq
original_regular_axis_values = regular_axis_values
# get amplitudes from psd values
if (regular_axis_values < 0).any():
raise ValueError(
"Resampling returns negative power values; change fit range."
)
amp_axis_values = numpy.sqrt(regular_axis_values * 2 * freq_interval)
# need to scale PSD by 2* frequency interval before square rooting, then
# by number of modes in resampled PSD
# Now we generate some random phases
for ant in range(nant):
regular_freq = original_regular_freq
regular_axis_values = original_regular_axis_values
phi_axis_values = rng.random(size=len(regular_axis_values)) * 2 * numpy.pi
# create complex array
z_axis_values = amp_axis_values * numpy.exp(1j * phi_axis_values) # polar
# make symmetrical frequencies
mirror_z_axis_values = numpy.copy(z_axis_values)
# make complex conjugates
mirror_z_axis_values.imag -= 2 * z_axis_values.imag
# make negative frequencies
mirror_regular_freq = -regular_freq
# join
z_axis_values = numpy.append(z_axis_values, mirror_z_axis_values[::-1])
regular_freq = numpy.append(regular_freq, mirror_regular_freq[::-1])
# add a 0 Fourier term
zav = z_axis_values
z_axis_values = numpy.zeros([len(zav) + 1]).astype("complex")
z_axis_values[1:] = zav
# perform inverse fft
ts = numpy.fft.ifft(z_axis_values)
# set up and check scalings
Dt = pt.interval[0]
ts = numpy.real(ts)
# the result is scaled by number of points in the signal,
# so multiply - real part - by this
ts *= m0
# The output of the iFFT will be a random time series on the finite
# (bounded, limited) time interval t = 0 to tmax = (N-1) X Dt, #
# where Dt = 1 / (2 X Fmax)
# scale to time interval
# Convert from arcsec to radians
ts *= numpy.pi / (180.0 * 3600.0)
# We take reference pointing to mean that the pointing errors are
# zero at the beginning of the set of integrations
if reference_pointing:
ts[:] -= ts[0]
# pt.data['time'] = times[:ntimes]
if axis == "az":
pt["pointing"].data[:, ant, :, :, 0] = ts[
:ntimes, numpy.newaxis, numpy.newaxis, ...
]
elif axis == "el":
pt["pointing"].data[:, ant, :, :, 1] = ts[
:ntimes, numpy.newaxis, numpy.newaxis, ...
]
elif axis == "pxel":
pt["pointing"].data[:, ant, :, :, 0] = ts[
:ntimes, numpy.newaxis, numpy.newaxis, ...
]
elif axis == "pel":
pt["pointing"].data[:, ant, :, :, 1] = ts[
:ntimes, numpy.newaxis, numpy.newaxis, ...
]
else:
raise ValueError("Unknown axis %s" % axis)
return pt