Example Usage

An example script is also available in jones_solvers/examples/scripts/run_jones_solvers.py. This simulates a simple observation and calls function solve_jones() to find antenna-based Jones matrices.

The package uses the RASCIL classes for visibilities, array metadata generation and polarisation support.

import os
import sys
import time
import copy

import numpy as np

from numpy import sin as sin
from numpy import cos as cos

import matplotlib.pyplot as plt

from astropy.coordinates import SkyCoord
from astropy.time import Time
import astropy.units as u
import astropy.constants as consts

from rascil.data_models import PolarisationFrame
from rascil.processing_components import create_named_configuration
from rascil.processing_components import create_blockvisibility
from rascil.processing_components.util.geometry import calculate_azel
from rascil.processing_components.util.coordinate_support import lmn_to_skycoord
from rascil.processing_components.calibration.operations import create_gaintable_from_blockvisibility

from jones_solvers.processing_components import solve_jones

import logging

log = logging.getLogger()
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler(sys.stdout))

mpl_logger = logging.getLogger("matplotlib")
mpl_logger.setLevel(logging.WARNING)

np.set_printoptions(linewidth=120)

# ------------------------- #

log.info("Init and predicting blockvisibility")

lowcore = create_named_configuration('LOWBD2-CORE')

# how can we extract these from lowcore?
lon = 116.76444824 * np.pi / 180.
lat = -26.82472208 * np.pi / 180.

vntimes = 1
integration_time = 1.0
times = (np.pi / 43200.0) * np.arange(0,vntimes*integration_time, integration_time)

vnchan = 1
channel_bandwidth = 1.0e6
frequency = np.arange(100.0e6, 100.0e6+vnchan*channel_bandwidth, channel_bandwidth)
channel_bandwidth = np.array(vnchan*[channel_bandwidth])

phasecentre = SkyCoord(ra=+15.0 * u.deg, dec=-45.0 * u.deg, frame='icrs', equinox='J2000')

# create empty blockvis with intrumental polarisation (XX, XY, YX, YY)
xVis = create_blockvisibility(lowcore, times, frequency, channel_bandwidth=channel_bandwidth, phasecentre=phasecentre,
                              integration_time=integration_time, polarisation_frame=PolarisationFrame("linear"),
                              weight=1.0)

assert xVis['vis'].shape[0]   == vntimes, "Shape inconsistent with specified number of times"
assert xVis['vis'].shape[2]   == vnchan, "Shape inconsistent with specified number of channels"
assert xVis['vis'].shape[3]   == 4, "Shape inconsistent with specified number of polarisations"
assert xVis['vis'].shape[0:3] == xVis["uvw_lambda"].data.shape[0:3], "vis & uvw_lambda have inconsistent shapes"
assert all(xVis['polarisation'].data == ['XX', 'XY', 'YX', 'YY']), "Polarisations inconsistent with expectations"

nvis = xVis["baselines"].shape[0]

To help with flexibility in the early stages of development, the package does not yet use RASCIL predict functionality. It also currently just uses a simple short-dipole beam model with a Gaussian taper. These will be replaced with standard sky and beam models in the future.

# Generate a sky model from Nsrc point-source components, randomly distributed around the phase centre

Nsrc = 10

dist_source_max = 2.5 * np.pi/180.0

# sky model: randomise sources across the field
theta = 2.*np.pi * np.random.rand(Nsrc)
phi = dist_source_max * np.sqrt( np.random.rand(Nsrc) )
l = sin(theta) * sin(phi)
m = cos(theta) * sin(phi)
n = np.sqrt(1-l*l-m*m) - 1
jy = 10 * np.random.rand(Nsrc)
for src in range(0,Nsrc):

    # analytic response of short dipoles aligned NS & EW to sky xy polarisations
    # with an approx Gaussian taper for a 35m station
    srcdir = lmn_to_skycoord(np.array([l[src],m[src],n[src]]), phasecentre)
    ra  = srcdir.ra.value * np.pi / 180.
    dec = srcdir.dec.value * np.pi / 180.
    sep = srcdir.separation(phasecentre).value * np.pi / 180.
    diam = 35.;

    # need to set ha,dec, but need to be in time,freq loop
    for t in range(0,len(xVis['datetime'])):

        utc_time = xVis['datetime'].data[t]
        #azel = calculate_azel(location, utc_time, srcdir);
        lst = Time(utc_time, location=(lon * u.rad, lat * u.rad)).sidereal_time('mean').value * np.pi / 12.
        ha = lst - ra

        J00 =  cos(lat)*cos(dec) + sin(lat)*sin(dec)*cos(ha)
        J01 = -sin(lat)*sin(ha)
        J10 =  sin(dec)*sin(ha)
        J11 =  cos(ha)
        J = np.array([[J00,J01],[J10,J11]], "complex")
        # components are unpolarised, so can form power product now
        JJ = J @ J.conj().T

        for f in range(0,len(xVis['frequency'])):

            wl = consts.c.value / xVis['frequency'].data[f];
            sigma = wl/diam / 2.355;
            gain = np.exp( -sep*sep/(2*sigma*sigma) );

            srcbeam = JJ * gain
            log.debug(src,sep*180./np.pi,l[src]*180./np.pi,m[src]*180./np.pi,n[src],np.array(srcbeam).flatten())

            # vis (time, baselines, frequency, polarisation) complex128

            uvw = xVis['uvw_lambda'].data[t,:,f,:]
            phaser = 0.5*jy[src] * np.exp( 2j*np.pi * (uvw[:,0]*l[src] + uvw[:,1]*m[src] + uvw[:,2]*n[src]) )

            assert all(xVis['polarisation'].data == ['XX', 'XY', 'YX', 'YY']), xVis['polarisation'].data

            xVis['vis'].data[t,:,f,0] += phaser * srcbeam[0,0]
            xVis['vis'].data[t,:,f,1] += phaser * srcbeam[0,1]
            xVis['vis'].data[t,:,f,2] += phaser * srcbeam[1,0]
            xVis['vis'].data[t,:,f,3] += phaser * srcbeam[1,1]

Apply calibration factors to the visibilities and add noise

stations = lowcore["stations"]
nstations = stations.shape[0]

stn1 = xVis["antenna1"].data
stn2 = xVis["antenna2"].data

# set up station-based Jones matrices (gains and leakages)
#  - will change to gaintable data model
Jsigma = 0.1

# generate a gaintable with a single timeslice (is in sec, so should be > 43200 for a 12 hr observation)
#  - could alternatively just use the first time step in the call
#  - using Jones type "G" because type "P" is unknown
gt_true = create_gaintable_from_blockvisibility(xVis, timeslice=1e6, jones_type="G")
gt_fit  = create_gaintable_from_blockvisibility(xVis, timeslice=1e6, jones_type="G")

# set up references to the data
Jt = gt_true["gain"].data
Jm = gt_fit["gain"].data

for stn in range(0,nstations):

    # generate the starting model station gain error matrices
    Jm[0,stn,0,:,:] = np.eye(2, dtype=complex)

    # generate the true station gain error matrices. Set to model matrices plus some Gaussian distortions
    Jt[0,stn,0,:,:] = Jm[0,stn,0,:,:] + Jsigma * ( np.random.randn(2,2) + 1j*np.random.randn(2,2) )

# Make copies of the visibilties and multiply in the new Jones matrices
#  - assuming that unknown calibration errors are constant over time and frequency samples (i.e. is a snapshot)

# Make copies of the vis and apply calibration factors and noise
modelVis     = xVis.copy(deep=True)
noiselessVis = xVis.copy(deep=True)

for t in range(0,len(xVis['datetime'])):
    for f in range(0,len(xVis['frequency'])):

        # set up references to the data
        modelTmp     = modelVis['vis'].data[t,:,f,:]
        noiselessTmp = noiselessVis['vis'].data[t,:,f,:]

        for k in range(0,nvis):

            vis_in = np.reshape(modelTmp[k,:],(2,2))
            vis_out = Jm[0,stn1[k],0] @ vis_in @ Jm[0,stn2[k],0].conj().T
            modelTmp[k,:] = np.reshape(np.array(vis_out),(4))

            vis_in = np.reshape(noiselessTmp[k,:],(2,2))
            vis_out = Jt[0,stn1[k],0] @ vis_in @ Jt[0,stn2[k],0].conj().T
            noiselessTmp[k,:] = np.reshape(np.array(vis_out),(4))

# Add noise to a visibility
#  - will change to RASCIL addnoise_visibility, but for now just add gaussian noise so there is more control over SNR

observedVis = noiselessVis.copy(deep=True)

sigma = 0.01
shape = observedVis['vis'].shape
assert len(shape) == 4, "require 4 dimensions for blockvisibilty"

observedVis['vis'].data += sigma * ( np.random.randn(shape[0],shape[1],shape[2],shape[3]) +
                                     np.random.randn(shape[0],shape[1],shape[2],shape[3]) * 1j )

if sigma > 0: observedVis['weight'].data *= np.ones(shape) / (sigma * sigma)

And then the jones_solvers package is used to solve for the Jones matrices:

gt1 = gt_fit.copy(deep=True)
modelVis1 = modelVis.copy(deep=True)
chisq1 = solve_jones(observedVis, modelVis1, gt1, testvis=noiselessVis, algorithm=1)
gt2 = gt_fit.copy(deep=True)
modelVis2 = modelVis.copy(deep=True)
chisq2 = solve_jones(observedVis, modelVis2, gt2, testvis=noiselessVis, accum_opt=2)
ax = plt.subplot(111)
ax.set_yscale('log')
plt.plot( chisq1, label="RTS" )
plt.plot( chisq2, label="Yanda" )
plt.xlabel("iteration")
plt.ylabel("chisq error")
plt.legend(loc=1, fontsize=11, frameon=False)
plt.grid()
plt.show()