Example Usage
An example script is also available in jones_solvers/examples/scripts/run_jones_solvers.py
. This simulates a simple
observation and calls function solve_jones()
to find antenna-based Jones matrices.
The package uses the RASCIL classes for visibilities, array metadata generation and polarisation support.
import os
import sys
import time
import copy
import numpy as np
from numpy import sin as sin
from numpy import cos as cos
import matplotlib.pyplot as plt
from astropy.coordinates import SkyCoord
from astropy.time import Time
import astropy.units as u
import astropy.constants as consts
from rascil.data_models import PolarisationFrame
from rascil.processing_components import create_named_configuration
from rascil.processing_components import create_blockvisibility
from rascil.processing_components.util.geometry import calculate_azel
from rascil.processing_components.util.coordinate_support import lmn_to_skycoord
from rascil.processing_components.calibration.operations import create_gaintable_from_blockvisibility
from jones_solvers.processing_components import solve_jones
import logging
log = logging.getLogger()
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler(sys.stdout))
mpl_logger = logging.getLogger("matplotlib")
mpl_logger.setLevel(logging.WARNING)
np.set_printoptions(linewidth=120)
# ------------------------- #
log.info("Init and predicting blockvisibility")
lowcore = create_named_configuration('LOWBD2-CORE')
# how can we extract these from lowcore?
lon = 116.76444824 * np.pi / 180.
lat = -26.82472208 * np.pi / 180.
vntimes = 1
integration_time = 1.0
times = (np.pi / 43200.0) * np.arange(0,vntimes*integration_time, integration_time)
vnchan = 1
channel_bandwidth = 1.0e6
frequency = np.arange(100.0e6, 100.0e6+vnchan*channel_bandwidth, channel_bandwidth)
channel_bandwidth = np.array(vnchan*[channel_bandwidth])
phasecentre = SkyCoord(ra=+15.0 * u.deg, dec=-45.0 * u.deg, frame='icrs', equinox='J2000')
# create empty blockvis with intrumental polarisation (XX, XY, YX, YY)
xVis = create_blockvisibility(lowcore, times, frequency, channel_bandwidth=channel_bandwidth, phasecentre=phasecentre,
integration_time=integration_time, polarisation_frame=PolarisationFrame("linear"),
weight=1.0)
assert xVis['vis'].shape[0] == vntimes, "Shape inconsistent with specified number of times"
assert xVis['vis'].shape[2] == vnchan, "Shape inconsistent with specified number of channels"
assert xVis['vis'].shape[3] == 4, "Shape inconsistent with specified number of polarisations"
assert xVis['vis'].shape[0:3] == xVis["uvw_lambda"].data.shape[0:3], "vis & uvw_lambda have inconsistent shapes"
assert all(xVis['polarisation'].data == ['XX', 'XY', 'YX', 'YY']), "Polarisations inconsistent with expectations"
nvis = xVis["baselines"].shape[0]
To help with flexibility in the early stages of development, the package does not yet use RASCIL predict functionality. It also currently just uses a simple short-dipole beam model with a Gaussian taper. These will be replaced with standard sky and beam models in the future.
# Generate a sky model from Nsrc point-source components, randomly distributed around the phase centre
Nsrc = 10
dist_source_max = 2.5 * np.pi/180.0
# sky model: randomise sources across the field
theta = 2.*np.pi * np.random.rand(Nsrc)
phi = dist_source_max * np.sqrt( np.random.rand(Nsrc) )
l = sin(theta) * sin(phi)
m = cos(theta) * sin(phi)
n = np.sqrt(1-l*l-m*m) - 1
jy = 10 * np.random.rand(Nsrc)
for src in range(0,Nsrc):
# analytic response of short dipoles aligned NS & EW to sky xy polarisations
# with an approx Gaussian taper for a 35m station
srcdir = lmn_to_skycoord(np.array([l[src],m[src],n[src]]), phasecentre)
ra = srcdir.ra.value * np.pi / 180.
dec = srcdir.dec.value * np.pi / 180.
sep = srcdir.separation(phasecentre).value * np.pi / 180.
diam = 35.;
# need to set ha,dec, but need to be in time,freq loop
for t in range(0,len(xVis['datetime'])):
utc_time = xVis['datetime'].data[t]
#azel = calculate_azel(location, utc_time, srcdir);
lst = Time(utc_time, location=(lon * u.rad, lat * u.rad)).sidereal_time('mean').value * np.pi / 12.
ha = lst - ra
J00 = cos(lat)*cos(dec) + sin(lat)*sin(dec)*cos(ha)
J01 = -sin(lat)*sin(ha)
J10 = sin(dec)*sin(ha)
J11 = cos(ha)
J = np.array([[J00,J01],[J10,J11]], "complex")
# components are unpolarised, so can form power product now
JJ = J @ J.conj().T
for f in range(0,len(xVis['frequency'])):
wl = consts.c.value / xVis['frequency'].data[f];
sigma = wl/diam / 2.355;
gain = np.exp( -sep*sep/(2*sigma*sigma) );
srcbeam = JJ * gain
log.debug(src,sep*180./np.pi,l[src]*180./np.pi,m[src]*180./np.pi,n[src],np.array(srcbeam).flatten())
# vis (time, baselines, frequency, polarisation) complex128
uvw = xVis['uvw_lambda'].data[t,:,f,:]
phaser = 0.5*jy[src] * np.exp( 2j*np.pi * (uvw[:,0]*l[src] + uvw[:,1]*m[src] + uvw[:,2]*n[src]) )
assert all(xVis['polarisation'].data == ['XX', 'XY', 'YX', 'YY']), xVis['polarisation'].data
xVis['vis'].data[t,:,f,0] += phaser * srcbeam[0,0]
xVis['vis'].data[t,:,f,1] += phaser * srcbeam[0,1]
xVis['vis'].data[t,:,f,2] += phaser * srcbeam[1,0]
xVis['vis'].data[t,:,f,3] += phaser * srcbeam[1,1]
Apply calibration factors to the visibilities and add noise
stations = lowcore["stations"]
nstations = stations.shape[0]
stn1 = xVis["antenna1"].data
stn2 = xVis["antenna2"].data
# set up station-based Jones matrices (gains and leakages)
# - will change to gaintable data model
Jsigma = 0.1
# generate a gaintable with a single timeslice (is in sec, so should be > 43200 for a 12 hr observation)
# - could alternatively just use the first time step in the call
# - using Jones type "G" because type "P" is unknown
gt_true = create_gaintable_from_blockvisibility(xVis, timeslice=1e6, jones_type="G")
gt_fit = create_gaintable_from_blockvisibility(xVis, timeslice=1e6, jones_type="G")
# set up references to the data
Jt = gt_true["gain"].data
Jm = gt_fit["gain"].data
for stn in range(0,nstations):
# generate the starting model station gain error matrices
Jm[0,stn,0,:,:] = np.eye(2, dtype=complex)
# generate the true station gain error matrices. Set to model matrices plus some Gaussian distortions
Jt[0,stn,0,:,:] = Jm[0,stn,0,:,:] + Jsigma * ( np.random.randn(2,2) + 1j*np.random.randn(2,2) )
# Make copies of the visibilties and multiply in the new Jones matrices
# - assuming that unknown calibration errors are constant over time and frequency samples (i.e. is a snapshot)
# Make copies of the vis and apply calibration factors and noise
modelVis = xVis.copy(deep=True)
noiselessVis = xVis.copy(deep=True)
for t in range(0,len(xVis['datetime'])):
for f in range(0,len(xVis['frequency'])):
# set up references to the data
modelTmp = modelVis['vis'].data[t,:,f,:]
noiselessTmp = noiselessVis['vis'].data[t,:,f,:]
for k in range(0,nvis):
vis_in = np.reshape(modelTmp[k,:],(2,2))
vis_out = Jm[0,stn1[k],0] @ vis_in @ Jm[0,stn2[k],0].conj().T
modelTmp[k,:] = np.reshape(np.array(vis_out),(4))
vis_in = np.reshape(noiselessTmp[k,:],(2,2))
vis_out = Jt[0,stn1[k],0] @ vis_in @ Jt[0,stn2[k],0].conj().T
noiselessTmp[k,:] = np.reshape(np.array(vis_out),(4))
# Add noise to a visibility
# - will change to RASCIL addnoise_visibility, but for now just add gaussian noise so there is more control over SNR
observedVis = noiselessVis.copy(deep=True)
sigma = 0.01
shape = observedVis['vis'].shape
assert len(shape) == 4, "require 4 dimensions for blockvisibilty"
observedVis['vis'].data += sigma * ( np.random.randn(shape[0],shape[1],shape[2],shape[3]) +
np.random.randn(shape[0],shape[1],shape[2],shape[3]) * 1j )
if sigma > 0: observedVis['weight'].data *= np.ones(shape) / (sigma * sigma)
And then the jones_solvers package is used to solve for the Jones matrices:
gt1 = gt_fit.copy(deep=True)
modelVis1 = modelVis.copy(deep=True)
chisq1 = solve_jones(observedVis, modelVis1, gt1, testvis=noiselessVis, algorithm=1)
gt2 = gt_fit.copy(deep=True)
modelVis2 = modelVis.copy(deep=True)
chisq2 = solve_jones(observedVis, modelVis2, gt2, testvis=noiselessVis, accum_opt=2)
ax = plt.subplot(111)
ax.set_yscale('log')
plt.plot( chisq1, label="RTS" )
plt.plot( chisq2, label="Yanda" )
plt.xlabel("iteration")
plt.ylabel("chisq error")
plt.legend(loc=1, fontsize=11, frameon=False)
plt.grid()
plt.show()