Source code for rascil.processing_components.visibility.base

"""
Base functions to create and export Visibility
from UVFits files.
"""

import logging
import os
import re

import numpy
import pandas
from astropy import units as u
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.io import fits
from astropy.time import Time
from ska_sdp_datamodels.configuration.config_model import Configuration
from ska_sdp_datamodels.physical_constants import C_M_S
from ska_sdp_datamodels.science_data_model.polarisation_model import PolarisationFrame
from ska_sdp_datamodels.visibility.vis_model import Visibility
from ska_sdp_datamodels.visibility.vis_utils import generate_baselines

log = logging.getLogger("rascil-logger")


[docs] def create_visibility_from_uvfits(fitsname, channum=None, antnum=None): """Minimal UVFIT to Visibility converter The UVFITS format is much more general than the RASCIL Visibility so we cut many corners. Creates a list of Visibility's, split by field and spectral window :param fitsname: File name of UVFITS :param channum: range of channels e.g. range(17,32), default is None meaning all :param antnum: the number of antenna :return: """ def find_time_slots(times): """Find the time slots :param times: :return: """ intervals = times[1:] - times[0:-1] integration_time = numpy.median(intervals[intervals > 0.0]) last_time = times[0] time_slots = [] for t in times: if t > last_time + integration_time: last_time = t time_slots.append(last_time) time_slots = numpy.array(time_slots) return time_slots def param_dict(hdul): """Return the dictionary of the random parameters" The keys of the dictionary are the parameter names uppercased for consistency. The values are the column numbers. If multiple parameters have the same name (e.g., DATE) their columns are entered as a list. """ pre = re.compile(r"PTYPE(?P<i>\d+)") res = {} for k, v in hdul.header.items(): m = pre.match(k) if m: vu = v.upper() if vu in res: res[vu] = [res[vu], int(m.group("i"))] else: res[vu] = int(m.group("i")) return res with fits.open(fitsname) as hdul: # Read Spectral Window nspw = hdul[0].header["NAXIS5"] # Read Channel and Frequency Interval freq_ref = hdul[0].header["CRVAL4"] delt_freq = hdul[0].header["CDELT4"] # Real the number of channels in one spectral window channels = hdul[0].header["NAXIS4"] freq = numpy.zeros([nspw, channels]) # Read Frequency or IF freqhdulname = "AIPS FQ" sdhu = hdul.index_of(freqhdulname) if_freq = hdul[sdhu].data["IF FREQ"].ravel() for i in range(nspw): temp = numpy.array( [if_freq[i] + freq_ref + delt_freq * ff for ff in range(channels)] ) freq[i, :] = temp[:] freq_delt = numpy.ones(channels) * delt_freq if channum is None: channum = range(channels) # Read time. We are trying to find a discrete set of times to use in # Visibility. bvtimes = Time(hdul[0].data["DATE"], hdul[0].data["_DATE"], format="jd") bv_times = find_time_slots(bvtimes.jd) ntimes = len(bv_times) # Get Antenna antennahdulname = "AIPS AN" adhu = hdul.index_of(antennahdulname) try: antenna_name = hdul[adhu].data["ANNAME"] antenna_name = antenna_name.encode("ascii", "ignore") except ValueError: antenna_name = None antenna_xyz = hdul[adhu].data["STABXYZ"] antenna_mount = hdul[adhu].data["MNTSTA"] antenna_offset = hdul[adhu].data["STAXOF"] try: antenna_diameter = hdul[adhu].data["DIAMETER"] except (ValueError, KeyError): antenna_diameter = None # To reading some UVFITS with wrong numbers of antenna if antnum is not None and antenna_name is not None: antenna_name = antenna_name[:antnum] antenna_xyz = antenna_xyz[:antnum] antenna_mount = antenna_mount[:antnum] antenna_offset = antenna_offset[:antnum] if antenna_diameter is not None: antenna_diameter = antenna_diameter[:antnum] nants = len(antenna_xyz) baselines = pandas.MultiIndex.from_tuples( generate_baselines(nants), names=("antenna1", "antenna2") ) nbaselines = len(baselines) # Put offset into same shape as for MS antenna_offset = numpy.c_[ antenna_offset, numpy.zeros(nants), numpy.zeros(nants) ] # Get polarisation info npol = hdul[0].header["NAXIS3"] corr_type = numpy.arange(hdul[0].header["NAXIS3"]) - ( hdul[0].header["CRPIX3"] - 1 ) corr_type *= hdul[0].header["CDELT3"] corr_type += hdul[0].header["CRVAL3"] # xx yy xy yx # These correspond to the CASA Stokes enumerations if numpy.array_equal(corr_type, [1, 2, 3, 4]): polarisation_frame = PolarisationFrame("stokesIQUV") elif numpy.array_equal(corr_type, [1, 4]): polarisation_frame = PolarisationFrame("stokesIV") elif numpy.array_equal(corr_type, [1, 2]): polarisation_frame = PolarisationFrame("stokesIQ") elif numpy.array_equal(corr_type, [-1, -2, -3, -4]): polarisation_frame = PolarisationFrame("circular") elif numpy.array_equal(corr_type, [-1, -4]): polarisation_frame = PolarisationFrame("circularnp") elif numpy.array_equal(corr_type, [-5, -6, -7, -8]): polarisation_frame = PolarisationFrame("linear") elif numpy.array_equal(corr_type, [-5, -8]): polarisation_frame = PolarisationFrame("linearnp") else: raise KeyError(f"Polarisation not understood: {corr_type}") configuration = Configuration.constructor( name="", location=None, names=antenna_name, xyz=antenna_xyz, mount=antenna_mount, frame=None, receptor_frame=polarisation_frame, diameter=antenna_diameter, offset=antenna_offset, stations=antenna_name, ) # Get RA and DEC phase_center_ra_degrees = float(hdul[0].header["CRVAL6"]) phase_center_dec_degrees = float(hdul[0].header["CRVAL7"]) # Get phasecentres phasecentre = SkyCoord( ra=phase_center_ra_degrees * u.deg, dec=phase_center_dec_degrees * u.deg, frame="icrs", equinox="J2000", ) # Get UVW d = param_dict(hdul[0]) if "UU" in d: uu = hdul[0].data["UU"] vv = hdul[0].data["VV"] ww = hdul[0].data["WW"] else: uu = hdul[0].data["UU---SIN"] vv = hdul[0].data["VV---SIN"] ww = hdul[0].data["WW---SIN"] _vis = hdul[0].data["DATA"] row = 0 nchan = len(channum) vis_list = [] for spw_index in range(nspw): bv_vis = numpy.zeros([ntimes, nbaselines, nchan, npol]).astype("complex") bv_flags = numpy.zeros([ntimes, nbaselines, nchan, npol]).astype("int") bv_weight = numpy.zeros([ntimes, nbaselines, nchan, npol]) bv_uvw = numpy.zeros([ntimes, nbaselines, 3]) for time_index, _ in enumerate(bv_times): for antenna1 in range(nants - 1): for antenna2 in range(antenna1, nants): ibaseline = baselines.get_loc((antenna1, antenna2)) for channel_no, channel_index in enumerate(channum): for pol_index in range(npol): bv_vis[ time_index, ibaseline, channel_no, pol_index, ] = complex( _vis[ row, :, :, spw_index, channel_index, pol_index, 0, ], _vis[ row, :, :, spw_index, channel_index, pol_index, 1, ], ) bv_weight[ time_index, ibaseline, channel_no, pol_index, ] = _vis[ row, :, :, spw_index, channel_index, pol_index, 2, ] bv_uvw[time_index, ibaseline, 0] = uu[row] * C_M_S bv_uvw[time_index, ibaseline, 1] = vv[row] * C_M_S bv_uvw[time_index, ibaseline, 2] = ww[row] * C_M_S row += 1 # Convert negative weights to flags bv_flags[bv_weight < 0.0] = 1 bv_weight[bv_weight < 0.0] = 0.0 vis_list.append( Visibility.constructor( uvw=bv_uvw, time=bv_times, baselines=baselines, frequency=freq[spw_index][channum], channel_bandwidth=freq_delt[channum], vis=bv_vis, flags=bv_flags, weight=bv_weight, configuration=configuration, phasecentre=phasecentre, polarisation_frame=polarisation_frame, ) ) return vis_list