"""
Base functions to create and export Visibility
from UVFits files.
"""
import logging
import os
import re
import numpy
import pandas
from astropy import units as u
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.io import fits
from astropy.time import Time
from ska_sdp_datamodels.configuration.config_model import Configuration
from ska_sdp_datamodels.physical_constants import C_M_S
from ska_sdp_datamodels.science_data_model.polarisation_model import PolarisationFrame
from ska_sdp_datamodels.visibility.vis_model import Visibility
from ska_sdp_datamodels.visibility.vis_utils import generate_baselines
log = logging.getLogger("rascil-logger")
[docs]
def create_visibility_from_uvfits(fitsname, channum=None, antnum=None):
"""Minimal UVFIT to Visibility converter
The UVFITS format is much more general than the RASCIL Visibility
so we cut many corners.
Creates a list of Visibility's, split by field and spectral window
:param fitsname: File name of UVFITS
:param channum: range of channels e.g. range(17,32), default is
None meaning all
:param antnum: the number of antenna
:return:
"""
def find_time_slots(times):
"""Find the time slots
:param times:
:return:
"""
intervals = times[1:] - times[0:-1]
integration_time = numpy.median(intervals[intervals > 0.0])
last_time = times[0]
time_slots = []
for t in times:
if t > last_time + integration_time:
last_time = t
time_slots.append(last_time)
time_slots = numpy.array(time_slots)
return time_slots
def param_dict(hdul):
"""Return the dictionary of the random parameters"
The keys of the dictionary are the parameter names
uppercased for consistency. The values are the column numbers.
If multiple parameters have the same name (e.g., DATE) their
columns are entered as a list.
"""
pre = re.compile(r"PTYPE(?P<i>\d+)")
res = {}
for k, v in hdul.header.items():
m = pre.match(k)
if m:
vu = v.upper()
if vu in res:
res[vu] = [res[vu], int(m.group("i"))]
else:
res[vu] = int(m.group("i"))
return res
with fits.open(fitsname) as hdul:
# Read Spectral Window
nspw = hdul[0].header["NAXIS5"]
# Read Channel and Frequency Interval
freq_ref = hdul[0].header["CRVAL4"]
delt_freq = hdul[0].header["CDELT4"]
# Real the number of channels in one spectral window
channels = hdul[0].header["NAXIS4"]
freq = numpy.zeros([nspw, channels])
# Read Frequency or IF
freqhdulname = "AIPS FQ"
sdhu = hdul.index_of(freqhdulname)
if_freq = hdul[sdhu].data["IF FREQ"].ravel()
for i in range(nspw):
temp = numpy.array(
[if_freq[i] + freq_ref + delt_freq * ff for ff in range(channels)]
)
freq[i, :] = temp[:]
freq_delt = numpy.ones(channels) * delt_freq
if channum is None:
channum = range(channels)
# Read time. We are trying to find a discrete set of times to use in
# Visibility.
bvtimes = Time(hdul[0].data["DATE"], hdul[0].data["_DATE"], format="jd")
bv_times = find_time_slots(bvtimes.jd)
ntimes = len(bv_times)
# Get Antenna
antennahdulname = "AIPS AN"
adhu = hdul.index_of(antennahdulname)
try:
antenna_name = hdul[adhu].data["ANNAME"]
antenna_name = antenna_name.encode("ascii", "ignore")
except ValueError:
antenna_name = None
antenna_xyz = hdul[adhu].data["STABXYZ"]
antenna_mount = hdul[adhu].data["MNTSTA"]
antenna_offset = hdul[adhu].data["STAXOF"]
try:
antenna_diameter = hdul[adhu].data["DIAMETER"]
except (ValueError, KeyError):
antenna_diameter = None
# To reading some UVFITS with wrong numbers of antenna
if antnum is not None and antenna_name is not None:
antenna_name = antenna_name[:antnum]
antenna_xyz = antenna_xyz[:antnum]
antenna_mount = antenna_mount[:antnum]
antenna_offset = antenna_offset[:antnum]
if antenna_diameter is not None:
antenna_diameter = antenna_diameter[:antnum]
nants = len(antenna_xyz)
baselines = pandas.MultiIndex.from_tuples(
generate_baselines(nants), names=("antenna1", "antenna2")
)
nbaselines = len(baselines)
# Put offset into same shape as for MS
antenna_offset = numpy.c_[
antenna_offset, numpy.zeros(nants), numpy.zeros(nants)
]
# Get polarisation info
npol = hdul[0].header["NAXIS3"]
corr_type = numpy.arange(hdul[0].header["NAXIS3"]) - (
hdul[0].header["CRPIX3"] - 1
)
corr_type *= hdul[0].header["CDELT3"]
corr_type += hdul[0].header["CRVAL3"]
# xx yy xy yx
# These correspond to the CASA Stokes enumerations
if numpy.array_equal(corr_type, [1, 2, 3, 4]):
polarisation_frame = PolarisationFrame("stokesIQUV")
elif numpy.array_equal(corr_type, [1, 4]):
polarisation_frame = PolarisationFrame("stokesIV")
elif numpy.array_equal(corr_type, [1, 2]):
polarisation_frame = PolarisationFrame("stokesIQ")
elif numpy.array_equal(corr_type, [-1, -2, -3, -4]):
polarisation_frame = PolarisationFrame("circular")
elif numpy.array_equal(corr_type, [-1, -4]):
polarisation_frame = PolarisationFrame("circularnp")
elif numpy.array_equal(corr_type, [-5, -6, -7, -8]):
polarisation_frame = PolarisationFrame("linear")
elif numpy.array_equal(corr_type, [-5, -8]):
polarisation_frame = PolarisationFrame("linearnp")
else:
raise KeyError(f"Polarisation not understood: {corr_type}")
configuration = Configuration.constructor(
name="",
location=None,
names=antenna_name,
xyz=antenna_xyz,
mount=antenna_mount,
frame=None,
receptor_frame=polarisation_frame,
diameter=antenna_diameter,
offset=antenna_offset,
stations=antenna_name,
)
# Get RA and DEC
phase_center_ra_degrees = float(hdul[0].header["CRVAL6"])
phase_center_dec_degrees = float(hdul[0].header["CRVAL7"])
# Get phasecentres
phasecentre = SkyCoord(
ra=phase_center_ra_degrees * u.deg,
dec=phase_center_dec_degrees * u.deg,
frame="icrs",
equinox="J2000",
)
# Get UVW
d = param_dict(hdul[0])
if "UU" in d:
uu = hdul[0].data["UU"]
vv = hdul[0].data["VV"]
ww = hdul[0].data["WW"]
else:
uu = hdul[0].data["UU---SIN"]
vv = hdul[0].data["VV---SIN"]
ww = hdul[0].data["WW---SIN"]
_vis = hdul[0].data["DATA"]
row = 0
nchan = len(channum)
vis_list = []
for spw_index in range(nspw):
bv_vis = numpy.zeros([ntimes, nbaselines, nchan, npol]).astype("complex")
bv_flags = numpy.zeros([ntimes, nbaselines, nchan, npol]).astype("int")
bv_weight = numpy.zeros([ntimes, nbaselines, nchan, npol])
bv_uvw = numpy.zeros([ntimes, nbaselines, 3])
for time_index, _ in enumerate(bv_times):
for antenna1 in range(nants - 1):
for antenna2 in range(antenna1, nants):
ibaseline = baselines.get_loc((antenna1, antenna2))
for channel_no, channel_index in enumerate(channum):
for pol_index in range(npol):
bv_vis[
time_index,
ibaseline,
channel_no,
pol_index,
] = complex(
_vis[
row,
:,
:,
spw_index,
channel_index,
pol_index,
0,
],
_vis[
row,
:,
:,
spw_index,
channel_index,
pol_index,
1,
],
)
bv_weight[
time_index,
ibaseline,
channel_no,
pol_index,
] = _vis[
row,
:,
:,
spw_index,
channel_index,
pol_index,
2,
]
bv_uvw[time_index, ibaseline, 0] = uu[row] * C_M_S
bv_uvw[time_index, ibaseline, 1] = vv[row] * C_M_S
bv_uvw[time_index, ibaseline, 2] = ww[row] * C_M_S
row += 1
# Convert negative weights to flags
bv_flags[bv_weight < 0.0] = 1
bv_weight[bv_weight < 0.0] = 0.0
vis_list.append(
Visibility.constructor(
uvw=bv_uvw,
time=bv_times,
baselines=baselines,
frequency=freq[spw_index][channum],
channel_bandwidth=freq_delt[channum],
vis=bv_vis,
flags=bv_flags,
weight=bv_weight,
configuration=configuration,
phasecentre=phasecentre,
polarisation_frame=polarisation_frame,
)
)
return vis_list