import numpy
from ska_sdp_datamodels.image.image_model import Image
from ska_sdp_datamodels.science_data_model.polarisation_model import PolarisationFrame
from ska_sdp_datamodels.visibility.vis_model import Visibility
__all__ = ["get_rowmap", "get_polarisation_map", "get_frequency_map"]
[docs]
def get_frequency_map(vis, im: Image = None):
"""Map channels from visibilities to image"""
# Find the unique frequencies in the visibility
ufrequency = numpy.unique(vis.frequency)
vnchan = len(ufrequency)
if im is None:
spectral_mode = "channel"
if vis.frequency_map is None:
vfrequencymap = get_rowmap(vis.frequency, ufrequency)
vis.frequencymap = vfrequencymap
else:
vfrequencymap = vis.frequency_map
assert (
min(vfrequencymap) >= 0
), "Invalid frequency map: visibility channel < 0: %s" % str(vfrequencymap)
elif im["pixels"].data.shape[0] == 1 and vnchan >= 1:
assert im.image_acc.is_canonical()
spectral_mode = "mfs"
vfrequencymap = numpy.zeros_like(vis.frequency, dtype="int")
else:
assert im.image_acc.is_canonical()
# We can map these to image channels
v2im_map = (
im.image_acc.wcs.sub(["spectral"])
.wcs_world2pix(ufrequency, 0)[0]
.astype("int")
)
spectral_mode = "channel"
nrows = len(vis.frequency)
row2vis = numpy.array(get_rowmap(vis.frequency, ufrequency))
vfrequencymap = [v2im_map[row2vis[row]] for row in range(nrows)]
assert (
min(vfrequencymap) >= 0
), "Invalid frequency map: image channel < 0 %s" % str(vfrequencymap)
assert (
max(vfrequencymap) < im["pixels"].data.shape[0]
), "Invalid frequency map: image channel > number image channels %s" % str(
vfrequencymap
)
return spectral_mode, vfrequencymap
[docs]
def get_polarisation_map(vis: Visibility, im: Image = None):
"""Get the mapping of visibility polarisations to image polarisations"""
assert isinstance(im, Image), im
assert im.image_acc.is_canonical()
if vis.visibility_acc.polarisation_frame == im.image_acc.polarisation_frame:
if vis.visibility_acc.polarisation_frame == PolarisationFrame("stokesI"):
return "stokesI->stokesI", lambda pol: 0
elif vis.visibility_acc.polarisation_frame == PolarisationFrame("stokesIQUV"):
return "stokesIQUV->stokesIQUV", lambda pol: pol
return "unknown", lambda pol: pol
[docs]
def get_rowmap(col, ucol=None):
"""Map to unique cols
:param col: Data column
:param ucol: Unique values in col
"""
pdict = {}
def phash(f):
return numpy.round(f).astype("int")
if ucol is None:
ucol = numpy.unique(col)
for i, f in enumerate(ucol):
pdict[phash(f)] = i
# vmap = []
# vmap = [pdict[phash(p)] for p in col]
# for p in col:
# vmap.append(pdict[phash(p)])
n_ucol = numpy.round(col).astype(("int"))
vmap = numpy.vectorize(pdict.__getitem__)(n_ucol)
return vmap.tolist()