"""
Functions that define and manipulate kernels
"""
__all__ = [
"create_pswf_convolutionfunction",
"create_box_convolutionfunction",
"create_awterm_convolutionfunction",
"create_vpterm_convolutionfunction",
]
import logging
import numpy
from ska_sdp_datamodels.gridded_visibility import create_convolutionfunction_from_image
from ska_sdp_datamodels.image.image_create import Image
from ska_sdp_func_python.fourier_transforms import coordinates, grdsf
from ska_sdp_func_python.image.operations import (
fft_image_to_griddata_with_wcs,
pad_image,
)
from ska_sdp_func_python.imaging.primary_beams import convert_azelvp_to_radec
from rascil.processing_components.image.operations import create_w_term_like
log = logging.getLogger("rascil-logger")
[docs]
def create_box_convolutionfunction(
im, oversampling=1, support=1, polarisation_frame=None
):
"""Fill a box car function into a ConvolutionFunction
Also returns the griddata correction function as an image
:param im: Image template
:param oversampling: Oversampling of the convolution function in uv space
:return: griddata correction Image, griddata kernel as ConvolutionFunction
"""
cf = create_convolutionfunction_from_image(
im, oversampling=1, support=4, polarisation_frame=polarisation_frame
)
cf["pixels"].data[...] = 0.0 + 0.0j
cf["pixels"].data[..., 2, 2] = 1.0 + 0.0j
# Now calculate the griddata correction function as an image with the same
# coordinates as the image which is necessary so that the correction function
# can be applied directly to the image
nx = im["pixels"].data.shape[-1]
nu = numpy.abs(coordinates(nx))
gcf1d = numpy.sinc(nu)
gcf = numpy.outer(gcf1d, gcf1d)
gcf = 1.0 / gcf
gcf_data = numpy.zeros_like(im["pixels"].data)
gcf_data[...] = gcf[numpy.newaxis, numpy.newaxis, ...]
gcf_image = Image.constructor(
data=gcf_data,
polarisation_frame=im.image_acc.polarisation_frame,
wcs=im.image_acc.wcs,
)
assert cf["pixels"].data.dtype == "complex", cf["pixels"].data.dtype
assert gcf_image["pixels"].data.dtype == "float", gcf_image["pixels"].data.dtype
return gcf_image, cf
[docs]
def create_pswf_convolutionfunction(
im, oversampling=127, support=8, polarisation_frame=None
):
"""Fill an Anti-Aliasing filter into a ConvolutionFunction
Fill the Prolate Spheroidal Wave Function into a GriData with the specified
oversampling. Only the inner non-zero part is retained
Also returns the griddata correction function as an image
:param im: Image template
:param oversampling: Oversampling of the convolution function in uv space
:return: griddata correction Image, griddata kernel as ConvolutionFunction
"""
if oversampling % 2 == 0:
oversampling += 1
log.info(
"Setting oversampling to next greatest odd number {}".format(oversampling)
)
width = support - 2
# Calculate the convolution kernel. We oversample in u,v space by the
# factor oversampling
cf = create_convolutionfunction_from_image(
im,
oversampling=oversampling,
support=support,
polarisation_frame=polarisation_frame,
)
kernel = numpy.zeros([oversampling, support])
for grid in range(1, support - 1):
for subsample in range(oversampling):
nu = (grid - support // 2) - (subsample - oversampling // 2) / oversampling
kernel[subsample, grid] = grdsf([nu / (width // 2)])[1]
kernel /= numpy.sum(numpy.real(kernel[oversampling // 2, :]))
nchan, npol, _, _ = im["pixels"].data.shape
cf["pixels"].data = numpy.zeros(
[nchan, npol, 1, oversampling, oversampling, support, support]
).astype("complex")
for y in range(oversampling):
for x in range(oversampling):
cf["pixels"].data[:, :, 0, y, x, :, :] = numpy.outer(
kernel[y, :], kernel[x, :]
)[numpy.newaxis, numpy.newaxis, ...]
for y in range(oversampling):
for x in range(oversampling):
norm = numpy.sum(
numpy.real(cf["pixels"].data[:, :, 0, y, x, :, :]), axis=(-2, -1)
)[..., numpy.newaxis, numpy.newaxis]
cf["pixels"].data[:, :, 0, y, x, :, :] /= norm
# Now calculate the griddata correction function as an image with the same
# coordinates as the image which is necessary so that the correction function
# can be applied directly to the image
nx = im["pixels"].data.shape[-1]
nu = numpy.abs(2.0 * coordinates(nx))
gcf1d = grdsf(nu)[0]
gcf = numpy.outer(gcf1d, gcf1d)
gcf[gcf > 0.0] = gcf.max() / gcf[gcf > 0.0]
gcf_data = numpy.zeros_like(im["pixels"].data)
gcf_data[...] = gcf[numpy.newaxis, numpy.newaxis, ...]
gcf_image = Image.constructor(
data=gcf_data,
polarisation_frame=im.image_acc.polarisation_frame,
wcs=im.image_acc.wcs,
)
assert cf["pixels"].data.dtype == "complex", cf["pixels"].data.dtype
assert gcf_image["pixels"].data.dtype == im["pixels"].data.dtype, gcf[
"pixels"
].data.dtype
return gcf_image, cf
[docs]
def create_awterm_convolutionfunction(
im,
make_pb=None,
nw=1,
wstep=1e15,
oversampling=9,
support=8,
use_aaf=True,
maxsupport=512,
pa=None,
normalise=True,
polarisation_frame=None,
):
"""Fill AW projection kernel into a GridData.
:param im: Image template
:param make_pb: Function to make the primary beam model image (hint: use a partial)
:param nw: Number of w planes
:param wstep: Step in w (wavelengths)
:param oversampling: Oversampling of the convolution function in uv space
:return: griddata correction Image, griddata kernel as GridData
"""
if oversampling % 2 == 0:
oversampling += 1
log.info(
"Setting oversampling to next greatest odd number {}".format(oversampling)
)
d2r = numpy.pi / 180.0
# We only need the griddata correction function for the PSWF so we make
# it for the shape of the image
nchan, npol, _, onx = im["pixels"].data.shape
# Calculate the template convolution kernel.
cf = create_convolutionfunction_from_image(
im,
nw=nw,
wstep=wstep,
oversampling=oversampling,
support=support,
polarisation_frame=polarisation_frame,
)
if nw > 1 and numpy.abs(wstep) > 0.0:
w_list = cf.convolutionfunction_acc.cf_wcs.sub([5]).wcs_pix2world(range(nw), 0)[
0
]
else:
w_list = [0.0]
assert oversampling > 0
nx = max(maxsupport, 2 * oversampling * support)
ny = max(maxsupport, 2 * oversampling * support)
qnx = nx // oversampling
qny = ny // oversampling
cf["pixels"].data[...] = 0.0
ccell = onx * numpy.abs(d2r * im.image_acc.wcs.wcs.cdelt[0]) / qnx
subim_wcs = im.image_acc.wcs.deepcopy()
subim_wcs.wcs.cdelt[0] = -ccell / d2r
subim_wcs.wcs.cdelt[1] = +ccell / d2r
subim_wcs.wcs.crpix[0] = qnx // 2 + 1.0
subim_wcs.wcs.crpix[1] = qny // 2 + 1.0
subim = Image.constructor(
data=numpy.zeros([nchan, npol, qny, qnx]),
polarisation_frame=polarisation_frame,
wcs=subim_wcs,
)
if use_aaf:
this_pswf_gcf, _ = create_pswf_convolutionfunction(
subim, oversampling=1, support=6, polarisation_frame=polarisation_frame
)
norm = 1.0 / this_pswf_gcf["pixels"].data
else:
norm = 1.0
if make_pb is not None:
pb = make_pb(subim)
if pa is not None:
rpb = convert_azelvp_to_radec(pb, subim, pa)
else:
rpb = convert_azelvp_to_radec(pb, subim, 0.0)
norm *= rpb["pixels"].data
# We might need to work with a larger image
padded_shape = [nchan, npol, ny, nx]
thisplane = subim.copy(deep=True)
thisplane["pixels"].data = numpy.zeros(
thisplane["pixels"].data.shape, dtype="complex"
)
scf = cf["pixels"].data[...]
for z, w in enumerate(w_list):
thisplane["pixels"].data[...] = 0.0 + 0.0j
thisplane = create_w_term_like(thisplane, w, dopol=True)
thisplane["pixels"].data *= norm
paddedplane = pad_image(thisplane, padded_shape)
grid = fft_image_to_griddata_with_wcs(paddedplane)
gd = grid["pixels"].data
ycen, xcen = ny // 2, nx // 2
for y in range(oversampling):
ybeg = y + ycen + (support * oversampling) // 2 - oversampling // 2
yend = y + ycen - (support * oversampling) // 2 - oversampling // 2
assert ybeg >= 0, "Convolutionfunction does not fit into image"
assert yend < ny, "Convolutionfunction does not fit into image"
for x in range(oversampling):
xbeg = x + xcen + (support * oversampling) // 2 - oversampling // 2
xend = x + xcen - (support * oversampling) // 2 - oversampling // 2
assert xbeg >= 0, "Convolutionfunction does not fit into image"
assert xend < nx, "Convolutionfunction does not fit into image"
scf[..., z, y, x, :, :] = gd[
..., ybeg:yend:-oversampling, xbeg:xend:-oversampling
]
cf["pixels"].data[...] = scf
for ipol in range(1, npol):
cf["pixels"].data[:, ipol, ...] = cf["pixels"].data[:, 0, ...]
if normalise:
norm = numpy.zeros([nchan, npol, oversampling, oversampling])
for y in range(oversampling):
for x in range(oversampling):
norm[..., y, x] = numpy.sum(
numpy.real(cf["pixels"].data[:, :, 0, y, x, :, :]), axis=(-2, -1)
)
for z, _ in enumerate(w_list):
for y in range(oversampling):
for x in range(oversampling):
cf["pixels"].data[:, :, z, y, x] /= norm[..., y, x][
..., numpy.newaxis, numpy.newaxis
]
cf["pixels"].data = numpy.conjugate(cf["pixels"].data)
if use_aaf:
pswf_gcf, _ = create_pswf_convolutionfunction(
im, oversampling=1, support=6, polarisation_frame=polarisation_frame
)
else:
pswf_gcf = Image.constructor(
data=numpy.zeros_like(im["pixels"].data),
polarisation_frame=im.image_acc.polarisation_frame,
wcs=im.image_acc.wcs,
clean_beam=im.attrs["clean_beam"],
)
pswf_gcf["pixels"].data[...] = 1.0
assert cf["pixels"].data.dtype == "complex", cf["pixels"].data.dtype
assert pswf_gcf["pixels"].data.dtype == im["pixels"].data.dtype, pswf_gcf[
"pixels"
].data.dtype
return pswf_gcf, cf
[docs]
def create_vpterm_convolutionfunction(
im,
make_vp=None,
oversampling=8,
support=6,
use_aaf=False,
maxsupport=512,
pa=None,
normalise=True,
polarisation_frame=None,
):
"""Fill voltage pattern kernel projection kernel into a GridData.
The makes the convolution function for gridding polarised data with a voltage
pattern.
:param im: Image template
:param make_vp: Function to make the voltage pattern model image
(hint: use a partial)
:param oversampling: Oversampling of the convolution function in uv space
:return: griddata correction Image, griddata kernel as GridData
"""
if oversampling % 2 == 0:
log.info(
"Setting oversampling to next greatest odd number {}".format(oversampling)
)
oversampling += 1
d2r = numpy.pi / 180.0
# We only need the griddata correction function for the PSWF so we make
# it for the shape of the image
nchan, npol, _, onx = im["pixels"].data.shape
# Calculate the template convolution kernel.
cf = create_convolutionfunction_from_image(
im,
oversampling=oversampling,
support=support,
polarisation_frame=polarisation_frame,
)
cf_shape = list(cf["pixels"].data.shape)
cf["pixels"].data = numpy.zeros(cf_shape).astype("complex")
assert oversampling > 0
nx = max(maxsupport, 2 * oversampling * support)
ny = max(maxsupport, 2 * oversampling * support)
qnx = nx // oversampling
qny = ny // oversampling
cf["pixels"].data[...] = 0.0
ccell = onx * numpy.abs(d2r * im.image_acc.wcs.wcs.cdelt[0]) / qnx
wcs = im.image_acc.wcs
wcs.wcs.cdelt[0] = -ccell / d2r
wcs.wcs.cdelt[1] = +ccell / d2r
wcs.wcs.crpix[0] = qnx // 2 + 1.0
wcs.wcs.crpix[1] = qny // 2 + 1.0
subim = Image.constructor(
data=numpy.zeros([nchan, npol, qny, qnx]),
polarisation_frame=im.image_acc.polarisation_frame,
wcs=wcs,
)
vp = make_vp(subim)
assert (
numpy.max(numpy.abs(vp["pixels"])) > 0.0
), "Construction of voltage pattern failed: empty image {}".format(vp)
if pa is not None:
rvp = convert_azelvp_to_radec(vp, subim, pa)
else:
rvp = convert_azelvp_to_radec(vp, subim, 0.0)
assert (
numpy.max(numpy.abs(rvp["pixels"])) > 0.0
), "Projection from azel to radec failed: empty image {}".format(rvp)
if use_aaf:
this_pswf_gcf, _ = create_pswf_convolutionfunction(
subim, oversampling=1, support=6, polarisation_frame=polarisation_frame
)
rvp["pixels"].data /= this_pswf_gcf["pixels"].data
# We might need to work with a larger image
padded_shape = [nchan, npol, ny, nx]
paddedplane = pad_image(rvp, padded_shape)
paddedplane = fft_image_to_griddata_with_wcs(paddedplane)
ycen, xcen = ny // 2, nx // 2
for y in range(oversampling):
ybeg = y + ycen + (support * oversampling) // 2 - oversampling // 2
yend = y + ycen - (support * oversampling) // 2 - oversampling // 2
# vv = range(ybeg, yend, -oversampling)
for x in range(oversampling):
xbeg = x + xcen + (support * oversampling) // 2 - oversampling // 2
xend = x + xcen - (support * oversampling) // 2 - oversampling // 2
# uu = range(xbeg, xend, -oversampling)
cf["pixels"].data[..., 0, y, x, :, :] = paddedplane["pixels"].data[
..., ybeg:yend:-oversampling, xbeg:xend:-oversampling
]
if normalise:
cf["pixels"].data /= numpy.sum(
numpy.real(
cf["pixels"].data[0, 0, 0, oversampling // 2, oversampling // 2, :, :]
)
)
cf["pixels"].data = numpy.conjugate(cf["pixels"].data)
if use_aaf:
pswf_gcf, _ = create_pswf_convolutionfunction(
im, oversampling=1, support=6, polarisation_frame=polarisation_frame
)
else:
pswf_gcf = Image.constructor(
data=numpy.zeros_like(im["pixels"].data),
polarisation_frame=im.image_acc.polarisation_frame,
wcs=im.image_acc.wcs,
clean_beam=im.attrs["clean_beam"],
)
pswf_gcf["pixels"].data[...] = 1.0
assert cf["pixels"].data.dtype == "complex", cf["pixels"].data.dtype
assert pswf_gcf["pixels"].data.dtype == "float", pswf_gcf["pixels"].data.dtype
return pswf_gcf, cf