""" Functions for dish surface modeling
"""
__all__ = [
"simulate_gaintable_from_zernikes",
"simulate_gaintable_from_voltage_pattern",
]
import logging
import numpy
from astropy.time import Time
from scipy.interpolate import RectBivariateSpline
from ska_sdp_datamodels.calibration import create_gaintable_from_visibility
from ska_sdp_func_python.util import hadec_to_azel
from ska_sdp_func_python.util.geometry import calculate_azel
from ska_sdp_func_python.visibility.visibility_geometry import (
calculate_visibility_hourangles,
)
log = logging.getLogger("rascil-logger")
[docs]
def simulate_gaintable_from_voltage_pattern(
vis,
sc,
vp,
vis_slices=None,
order=3,
elevation_limit=15.0 * numpy.pi / 180.0,
jones_type="B",
**kwargs,
):
"""Create gaintables from a list of components and voltage patterns
:param elevation_limit:
:param vis_slices:
:param vis:
:param sc: Sky components for which pierce points are needed
:param vp: Voltage pattern in AZELGEO frame, can also be a list of
voltage patterns, indexed alphabetically
:param order: order of spline (default is 3)
:param jones_type: Type of calibration matrix T or G or B
:return:
"""
gaintables = [
create_gaintable_from_visibility(
vis, timeslice=kwargs.get("timeslice", None), jones_type=jones_type
)
for i in sc
]
nant = gaintables[0].gaintable_acc.nants
gnchan = gaintables[0].gaintable_acc.nchan
frequency = gaintables[0].frequency
# if not isinstance(vp, collections.abc.Iterable):
if not isinstance(vp, list):
vp = [vp]
nchan, npol, ny, nx = vp[0]["pixels"].data.shape
vnpol = vis.visibility_acc.npol
vp_types = numpy.unique(vis.configuration.vp_type.data)
nvp = len(vp_types)
# Each antennas can have one of a (small) number of different voltage patterns thus
# allowing for e.g. SKA and MeerKAT antennas for MID
vp_for_ant = numpy.zeros([nant], dtype=int)
for ivp in range(nvp):
for ant in range(nant):
if vis.configuration.vp_type.data[ant] == vp_types[ivp]:
vp_for_ant[ant] = ivp
# We construct interpolators for each voltage pattern type and for each
# polarisation, and for real, imaginary parts
if len(vp) == 1:
vp_types = [0]
else:
assert len(vp) == len(vp_types)
# These are substantial overheads so it is best to call these function as few times
# as possible.
# Note that we set up splines only in x, y so there is no interpolation in
# frequency. In the lookup below, we just use the nearest frequency channel.
real_spline = [
[
[
RectBivariateSpline(
range(ny),
range(nx),
vp[ivp]["pixels"].data[chan, pol, ...].real,
kx=order,
ky=order,
)
for ivp, _ in enumerate(vp_types)
]
for chan in range(nchan)
]
for pol in range(npol)
]
imag_spline = [
[
[
RectBivariateSpline(
range(ny),
range(nx),
vp[ivp]["pixels"].data[chan, pol, ...].imag,
kx=order,
ky=order,
)
for ivp, _ in enumerate(vp_types)
]
for chan in range(nchan)
]
for pol in range(npol)
]
vp_wcs = [v.image_acc.wcs for v in vp]
assert vp_wcs[0].wcs.ctype[0] == "AZELGEO long", vp_wcs[0].wcs.ctype[0]
assert vp_wcs[0].wcs.ctype[1] == "AZELGEO lati", vp_wcs[0].wcs.ctype[1]
assert vis.configuration.mount[0] == "azel", (
"Mount %s not supported yet" % vis.configuration.mount[0]
)
number_bad = 0
number_good = 0
# For each hourangle, we need to calculate the location of a component
# in AZELGEO. With that we can then look up the relevant gain from the
# voltage pattern
gt0 = gaintables[0]
for row, time in enumerate(gt0.time):
time_slice = {
"time": slice(time - gt0.interval[row] / 2, time + gt0.interval[row] / 2)
}
v = vis.sel(time_slice)
utc_time = Time([numpy.average(v.time) / 86400.0], format="mjd", scale="utc")
azimuth_centre, elevation_centre = calculate_azel(
v.configuration.location, utc_time, vis.phasecentre
)
azimuth_centre = azimuth_centre[0].to("deg").value
elevation_centre = elevation_centre[0].to("deg").value
if elevation_centre >= elevation_limit:
for icomp, comp in enumerate(sc):
antvp = numpy.zeros([nvp, gnchan, vnpol], dtype="complex")
antgain = numpy.zeros([nant, gnchan, vnpol], dtype="complex")
antwt = numpy.zeros([nant, gnchan, vnpol])
# Calculate the azel of this component
azimuth_comp, elevation_comp = calculate_azel(
v.configuration.location, utc_time, comp.direction
)
cosel = numpy.cos(elevation_comp[0]).value
azimuth_comp = azimuth_comp[0].to("deg").value
elevation_comp = elevation_comp[0].to("deg").value
if azimuth_comp - azimuth_centre > 180.0:
azimuth_centre += 360.0
elif azimuth_comp - azimuth_centre < -180.0:
azimuth_centre -= 360.0
for ivp, _ in enumerate(vp_types):
for gchan in range(gnchan):
try:
worldloc = [
[
(azimuth_comp - azimuth_centre) * cosel,
elevation_comp - elevation_centre,
vp_wcs[ivp].wcs.crval[2],
frequency.data[gchan],
]
]
pixloc = vp_wcs[ivp].wcs_world2pix(worldloc, 0)[0]
assert pixloc[0] > 2, f"Error in x pixel : {pixloc}"
assert pixloc[0] < nx - 3, f"Error in x pixel : {pixloc}"
assert pixloc[1] > 2, f"Error in y pixel : {pixloc}"
assert pixloc[1] < ny - 3, f"Error in y pixel : {pixloc}"
assert (
pixloc[3] > -1
), f"Error in Frequency pixel : {pixloc}"
assert (
pixloc[3] < nchan
), f"Error in Frequency pixel : {pixloc}"
chan = int(round(pixloc[3]))
if vnpol > 1:
gain = numpy.zeros([npol], dtype="complex")
for pol in range(vnpol):
gain[pol] = real_spline[pol][chan][ivp].ev(
pixloc[1], pixloc[0]
) + 1j * imag_spline[pol][chan][ivp].ev(
pixloc[1], pixloc[0]
)
ag = gain.reshape([2, 2])
ag = numpy.linalg.inv(ag)
antvp[ivp, gchan, :] = ag.reshape([4])
number_good += 1
else:
gain = 0.5 * (
real_spline[0][chan][ivp].ev(pixloc[1], pixloc[0])
+ 1j
* imag_spline[0][chan][ivp].ev(pixloc[1], pixloc[0])
+ real_spline[3][chan][ivp].ev(pixloc[1], pixloc[0])
+ 1j
* imag_spline[3][chan][ivp].ev(pixloc[1], pixloc[0])
)
if numpy.abs(gain) > 0.0:
antvp[ivp, gchan, 0] = 1.0 / gain
number_good += 1
else:
antvp[ivp, gchan, 0] = 0.0
number_bad += 1
for ant in range(nant):
antgain[ant, ...] = antvp[vp_for_ant[ant], ...]
antwt[...] = 1.0
except (IndexError, ValueError, AssertionError) as err:
print(err)
number_bad += 1
antgain[...] = 0.0
antwt[...] = 0.0
if vnpol > 1:
gaintables[icomp].gain.data[row, :, :, :] = antgain.reshape(
[nant, gnchan, 2, 2]
)
gaintables[icomp].weight.data[row, :, :, :] = antwt.reshape(
[nant, gnchan, 2, 2]
)
else:
gaintables[icomp].gain.data[row, :, :, :] = antgain.reshape(
[nant, gnchan, 1, 1]
)
gaintables[icomp].weight.data[row, :, :, :] = antwt.reshape(
[nant, gnchan, 1, 1]
)
gaintables[icomp].attrs["phasecentre"] = comp.direction
else:
for icomp, comp in enumerate(sc):
gaintables[icomp].gain.data[...] = 1.0 + 0.0j
gaintables[icomp].weight.data[row, :, :, :] = 0.0
gaintables[icomp].attrs["phasecentre"] = comp.direction
number_bad += nant
assert number_good > 0, (
"simulate_gaintable_from_voltage_pattern: No points inside the voltage "
"pattern image"
)
if number_bad > 0:
log.warning(
"simulate_gaintable_from_voltage_pattern: %d points are inside the "
"voltage pattern image" % (number_good)
)
log.warning(
"simulate_gaintable_from_voltage_pattern: %d points are outside the "
"voltage pattern image" % (number_bad)
)
return gaintables
[docs]
def simulate_gaintable_from_zernikes(
vis,
sc,
vp_list,
vp_coeffs,
vis_slices=None,
order=3,
elevation_limit=15.0 * numpy.pi / 180.0,
jones_type="B",
**kwargs,
):
"""Create gaintables for a set of zernikes
:param vis:
:param sc: Sky components for which pierce points are needed
:param vp: List of Voltage patterns in AZELGEO frame
:param vp_coeffs: Fractional contribution [nants, nvp]
:param order: order of spline (default is 3)
:param jones_type: Type of calibration matrix T or G or B
:return:
"""
ntimes = vis.vis.shape[0]
vp_coeffs = numpy.array(vp_coeffs)
gaintables = [
create_gaintable_from_visibility(
vis, timeslice=kwargs.get("timeslice", None), jones_type=jones_type
)
for i in sc
]
nant = gaintables[0].gaintable_acc.nants
assert vis.configuration.mount[0] == "azel", (
"Mount %s not supported yet" % vis.configuration.mount[0]
)
# The time in the Visibility is UTC in seconds
number_bad = 0
number_good = 0
# Cache the splines, one per voltage pattern
real_splines = list()
imag_splines = list()
for ivp, vp in enumerate(vp_list):
assert (
vp.image_acc.wcs.wcs.ctype[0] == "AZELGEO long"
), vp.image_acc.wcs.wcs.ctype[0]
assert (
vp.image_acc.wcs.wcs.ctype[1] == "AZELGEO lati"
), vp.image_acc.wcs.wcs.ctype[1]
nchan, npol, ny, nx = vp["pixels"].data.shape
real_splines.append(
RectBivariateSpline(
range(ny),
range(nx),
vp["pixels"].data[0, 0, ...].real,
kx=order,
ky=order,
)
)
imag_splines.append(
RectBivariateSpline(
range(ny),
range(nx),
vp["pixels"].data[0, 0, ...].imag,
kx=order,
ky=order,
)
)
latitude = vis.configuration.location.lat.rad
r2d = 180.0 / numpy.pi
s2r = numpy.pi / 43200.0
# For each hourangle, we need to calculate the location of a component
# in AZELGEO. With that we can then look up the relevant gain from the
# voltage pattern
for icomp, comp in enumerate(sc):
gt = gaintables[icomp]
for row, time in enumerate(gt.time):
time_slice = {
"time": slice(time - gt.interval[row] / 2, time + gt.interval[row] / 2)
}
vis_sel = vis.sel(time_slice)
ha = numpy.average(calculate_visibility_hourangles(vis_sel).to("rad").value)
# Calculate the az el for this hourangle and the phasecentre declination
utc_time = Time(
[numpy.average(vis_sel.time) / 86400.0], format="mjd", scale="utc"
)
azimuth_centre, elevation_centre = calculate_azel(
vis_sel.configuration.location, utc_time, vis.phasecentre
)
azimuth_centre = azimuth_centre[0].to("deg").value
elevation_centre = elevation_centre[0].to("deg").value
if elevation_centre >= elevation_limit:
antgain = numpy.zeros([nant], dtype="complex")
# Calculate the location of the component in AZELGEO, then add the
# pointing offset for each antenna
hacomp = comp.direction.ra.rad - vis.phasecentre.ra.rad + ha
deccomp = comp.direction.dec.rad
azimuth_comp, elevation_comp = hadec_to_azel(hacomp, deccomp, latitude)
for ant in range(nant):
for ivp, vp in enumerate(vp_list):
nchan, npol, ny, nx = vp["pixels"].data.shape
wcs_azel = vp.image_acc.wcs.deepcopy()
# We use WCS sensible coordinate handling by labelling the
# axes misleadingly
wcs_azel.wcs.crval[0] = azimuth_centre
wcs_azel.wcs.crval[1] = elevation_centre
wcs_azel.wcs.ctype[0] = "RA---SIN"
wcs_azel.wcs.ctype[1] = "DEC--SIN"
worldloc = [
azimuth_comp * r2d,
elevation_comp * r2d,
vp.image_acc.wcs.wcs.crval[2],
vp.image_acc.wcs.wcs.crval[3],
]
try:
pixloc = wcs_azel.sub(2).wcs_world2pix([worldloc[:2]], 1)[0]
assert pixloc[0] > 2
assert pixloc[0] < nx - 3
assert pixloc[1] > 2
assert pixloc[1] < ny - 3
gain = real_splines[ivp].ev(
pixloc[1], pixloc[0]
) + 1j * imag_splines[ivp](pixloc[1], pixloc[0])
antgain[ant] += vp_coeffs[ant, ivp] * gain
number_good += 1
except (ValueError, AssertionError):
number_bad += 1
antgain[ant] = 1.0
antgain[ant] = 1.0 / antgain[ant]
gaintables[icomp].gain.data[row, :, :, :] = antgain[
:, numpy.newaxis, numpy.newaxis, numpy.newaxis
]
gaintables[icomp].attrs["phasecentre"] = comp.direction
else:
gaintables[icomp].gain.data[...] = 1.0 + 0.0j
gaintables[icomp].attrs["phasecentre"] = comp.direction
number_bad += nant
if number_bad > 0:
log.warning(
"simulate_gaintable_from_zernikes: %d points are inside the "
"voltage pattern image" % (number_good)
)
log.warning(
"simulate_gaintable_from_zernikes: %d points are outside the "
"voltage pattern image" % (number_bad)
)
return gaintables