"""Functions used to simulate RFI. Developed as part of SP-122/SIM.
The scenario is:
* There is a TV station at a remote location (e.g. Perth), emitting a
broadband signal (7MHz) of known power (50kW).
* The emission from the TV station arrives at LOW stations with phase
delay and attenuation. Neither of these are well known but they are probably static.
* The RFI enters LOW stations in a side-lobe of the main beam. Calculations by Fred
Dulwich indicate that this provides attenuation of about 55 - 60dB for a source close
to the horizon.
* The RFI enters each LOW station with fixed delay and zero fringe rate (assuming no
e.g. ionospheric ducting)
* In tracking a source on the sky, the signal from one station is delayed and
fringe-rotated to stop the fringes for one direction on the sky.
* The fringe rotation stops the fringe from a source at the phase tracking centre but
phase rotates the RFI, which now becomes time-variable.
* The correlation data are time- and frequency-averaged over a timescale appropriate
for the station field of view. This averaging de-correlates the RFI signal.
* We want to study the effects of this RFI on statistics of the images: on source
and at the pole.
"""
__all__ = [
"calculate_averaged_correlation",
"simulate_rfi_block_prop",
"calculate_station_correlation_rfi",
]
import copy
import logging
import astropy.units as u
import numpy
from astropy.coordinates import SkyCoord
from astropy.time import Time
from ska_sdp_datamodels.calibration import create_pointingtable_from_visibility
from ska_sdp_datamodels.science_data_model.polarisation_model import PolarisationFrame
from ska_sdp_datamodels.sky_model.sky_model import SkyComponent
from ska_sdp_func_python.calibration import apply_gaintable
from ska_sdp_func_python.util import average_chunks2, azel_to_hadec, skycoord_to_lmn
from ska_sdp_func_python.util.coordinate_support import (
simulate_point_antenna,
xyz_to_uvw,
)
from ska_sdp_func_python.util.geometry import calculate_azel
from ska_sdp_func_python.visibility.visibility_geometry import (
calculate_visibility_hourangles,
)
from rascil import phyconst
from rascil.processing_components.imaging.primary_beams import create_mid_allsky
from rascil.processing_components.simulation.pointing import (
simulate_gaintable_from_pointingtable,
)
log = logging.getLogger("rascil-logger")
[docs]
def calculate_station_correlation_rfi(rfi_at_station, baselines):
"""Form the correlation from the rfi at the station
:param rfi_at_station: [btimes, nchan, nants, nants]
:param baselines: Visibility baselines object
:return: correlation(ntimes, nbaselines, nchan] in Jy
"""
ntimes, nants, nchan = rfi_at_station.shape
correlationt = numpy.zeros([ntimes, nchan, nants, nants], dtype="complex")
correlationb = numpy.zeros([ntimes, nchan, len(baselines)], dtype="complex")
rfit = numpy.transpose(rfi_at_station, axes=[0, 2, 1])
rfitc = numpy.conjugate(rfit)
for itime in range(ntimes):
for chan in range(nchan):
correlationt[itime, chan, ...] = numpy.outer(
rfit[itime, chan, :], rfitc[itime, chan, :]
)
# reshape station axes to baseline for xarray
for ibaseline, (a1, a2) in enumerate(baselines.data):
correlationb[itime, chan, ibaseline, ...] = correlationt[
itime, chan, a1, a2
]
correlation = numpy.transpose(correlationb, axes=[0, 2, 1])
return correlation[..., numpy.newaxis] * 1e26
[docs]
def calculate_averaged_correlation(correlation, time_width, channel_width):
"""Average the correlation in time and frequency
:param correlation: Correlation(ntimes, nant, nants, nchan]
:param channel_width: Number of channels to average
:param time_width: Number of integrations to average
:return:
"""
wts = numpy.ones(correlation.shape, dtype="float")
return average_chunks2(correlation, wts, (time_width, channel_width))[0]
def match_frequencies(rfi_signal, rfi_frequencies, bvis_freq_channels, bvis_bandwidth):
"""
The RFI signal data needs to provide information to all of the frequency
channels of the visibility (bvis), which is updated with the RFI signal.
This function compares the channels for the RFI data and the channels of the bvis,
and adds 0s where there is no RFI signal in a bvis channel, and adds the signal
where there is.
:param rfi_signal: RFI signal, numpy array [ntimes x nantennas x nchannels]
:param rfi_frequencies: numpy array of the frequency channels of the RFI signal
(length == nchannels)
:param bvis_freq_channels: numpy array of the frequency channels in the visibility
:param bvis_bandwidth: numpy array of bandwidths of each freq. channel in the
block vis.
"""
index = []
for i, chan in enumerate(bvis_freq_channels):
ind_left = numpy.where(rfi_frequencies >= chan - bvis_bandwidth[i] / 2.0)
ind_right = numpy.where(rfi_frequencies <= chan + bvis_bandwidth[i] / 2.0)
index.append(numpy.intersect1d(ind_left, ind_right))
# append one line of zeros to the channel column of rfi_signal for each channel
# in frequency_channels that is empty in rfi_frequencies
to_append = numpy.zeros((rfi_signal.shape[0], rfi_signal.shape[1], 1))
new_array = to_append.copy()
for ind in index:
if ind.size == 0: # empty array --> need to add zeros at this index
new_array = numpy.append(new_array, to_append, axis=2)
elif ind.size > 1:
new_array = numpy.append(
new_array,
numpy.median(rfi_signal[:, :, ind], axis=2, keepdims=True),
axis=2,
)
else:
new_array = numpy.append(new_array, rfi_signal[:, :, ind], axis=2)
# there is an extra row at the beginning of the frequency column,
# which we need to cut off to get the right size
return new_array[:, :, 1:]
def apply_beam_gain_for_low(
isotropic_emitter_power,
beam_gain_for_low,
rfi_source,
):
"""
Apply the beam gain to the apparent emitter power, to determine
the received RFI signal at SKA LOW station(s).
:param isotropic_emitter_power: apparent emitter power level as received by an
isotropic antenna
[ntimes x nantennas x nchannels]
:param beam_gain_for_low: beam gain data / information
if None, beam_gain = 1.0 is used
if provided, it is either a single value,
or a numpy array with dimensions [nsources x nstations x nchannels]
:param rfi_source: integer indicating where in the beam array the correct data
for the source can be found
"""
if beam_gain_for_low is None:
log.warning(
"Beam gain wasn't provided for Low calculations. Not applying beam gain."
)
beam_gain_for_low = 1.0
if isinstance(beam_gain_for_low, numpy.ndarray):
# get the right values for the current source
beam_gain = beam_gain_for_low[rfi_source, ...]
else:
beam_gain = beam_gain_for_low
# apply the beam gain to the input apparent emitter power
rfi_at_station = isotropic_emitter_power.copy() * numpy.sqrt(
beam_gain
) # shape of rfi_at_station = [ntimes, nants, n_rfi_channels]
return rfi_at_station
def apply_beam_gain_for_mid(
bvis,
sub_vis,
voltage_pattern,
azimuth,
elevation,
phase_centre,
):
"""
Apply the beam gain to the apparent emitter power, to determine
the received RFI signal at SKA MID antenna(s).
:param bvis: original visibility
:param sub_vis: a copy and subset of the visibility, already containing the
RFI signal
:param voltage_pattern: voltage pattern object for SKA Mid
:param azimuth: azimuth of the RFI source at a given time, in degrees
:param elevation: elevation of the RFI source at a given time, in degrees
:param phase_centre: astropy.coordinates.sky_coordinate.SkyCoord object
"""
flux = numpy.zeros((bvis.vis.shape[2], bvis.vis.shape[3])) # [nchan, npol]
pointing_table = create_pointingtable_from_visibility(sub_vis)
# there is always one time index in pt --> the first index is always 0
# Axes are [time, ant, frequency, receptor, angle]. All receptors and
# frequencies have the same pointing.
utc_time = Time(
[numpy.average(pointing_table["time"]) / 86400.0], format="mjd", scale="utc"
)
az_centre, el_centre = calculate_azel(
sub_vis.configuration.location, utc_time, sub_vis.phasecentre
)
az_centre = az_centre.to("rad").value
el_centre = el_centre.to("rad").value
# The nominal pointing is the phasecentre in az, el. This is the same value
# calculated by simulate_gaintable_from_pointingtable
pointing_table["nominal"][0, :, :, :, 0] = az_centre[
:, numpy.newaxis, numpy.newaxis
]
pointing_table["nominal"][0, :, :, :, 1] = el_centre[
:, numpy.newaxis, numpy.newaxis
]
# The pointing is the interferer in az el as seen from each antenna
pointing_table["pointing"][0, :, :, :, 0] = numpy.deg2rad(
azimuth[:, numpy.newaxis, numpy.newaxis]
)
pointing_table["pointing"][0, :, :, :, 1] = numpy.deg2rad(
elevation[:, numpy.newaxis, numpy.newaxis]
)
# Pointing tables define pointing to be relative to the nominal
pointing_table["pointing"] -= pointing_table["nominal"]
pointing_table["pointing"][0, :, :, :, 0] *= numpy.cos(
pointing_table["nominal"][0, :, :, :, 1]
)
# Update the location of the emitter
emitter_comp = SkyComponent(
direction=phase_centre,
flux=flux,
frequency=bvis.frequency.data,
polarisation_frame=PolarisationFrame(bvis._polarisation_frame),
)
# Now calculate and apply the gains
gt = simulate_gaintable_from_pointingtable(
vis=sub_vis,
pt=pointing_table,
sc=[emitter_comp],
vp=voltage_pattern,
elevation_limit=0.0,
jones_type="B",
)
# apply the beam gain --> it updates subvis in place
apply_gaintable(sub_vis, gt[0], inverse=False, use_flags=False)
def _get_uvw_per_station(xyz, ha, dec):
"""arrays"""
uvw = [xyz_to_uvw(xyz[i], ha[i], dec[i]) for i in range(len(ha))]
return numpy.array(uvw)
[docs]
def simulate_rfi_block_prop(
bvis,
apparent_emitter_power,
apparent_emitter_coordinates,
rfi_sources,
rfi_frequencies,
low_beam_gain=None,
apply_primary_beam=True,
):
"""Simulate RFI in a BlockVisility
:param bvis: input Visibility, to be updated with RFI
:param apparent_emitter_power: RFI emitter power as received by an
isotropic SKA antenna
[nrfi_sources x ntimes x nantennas x nchannels]
:param apparent_emitter_coordinates: azimuth, elevation, distance
information of RFI emitters
[nrfi_sources x ntimes x nantennas x 3]
:param rfi_sources: RFI source names or IDs
:param rfi_frequencies: frequency channels where there is RFI information
length = nchannels
:param low_beam_gain: beam gain data / information for Low.
If provided, it is either a single value,
or a numpy array with dimensions
[nrfi_sources x nstations x nchannels];
for Mid, use None
:param apply_primary_beam: Apply the primary beam, not used for Low
:return: Visibility
"""
mid_or_low = bvis.configuration.name
if "MID" not in mid_or_low and "LOW" not in mid_or_low:
raise ValueError(
"Telescope configuration is neither for SKA Mid nor for SKA Low."
"Please specify correct configuration."
)
# temporary copy to calculate contribution for each RFI source
bvis_data_copy = copy.copy(bvis["vis"].data)
ntimes, nbaselines, nchan, npol = bvis.vis.shape
if apply_primary_beam and "MID" in mid_or_low:
vp = create_mid_allsky(bvis.frequency)
for i, source in enumerate(
rfi_sources
): # this will tell us the index of the source in the data
if "LOW" in mid_or_low:
# Apply beam gain for Low RFI data
rfi_at_station = apply_beam_gain_for_low(
apparent_emitter_power[i], low_beam_gain, i
)
rfi_at_station_all_chans = match_frequencies(
rfi_at_station,
rfi_frequencies,
bvis.frequency.values,
bvis.channel_bandwidth.values,
)
else:
rfi_at_station_all_chans = match_frequencies(
apparent_emitter_power[i],
rfi_frequencies,
bvis.frequency.values,
bvis.channel_bandwidth.values,
)
# Calculate the RFI correlation using the fringe rotation and the
# RFI at the station
# [ntimes, nants, nants, nchan, npol]
bvis_data_copy[...] = calculate_station_correlation_rfi(
rfi_at_station_all_chans, baselines=bvis.baselines
)
k = bvis.frequency.data / phyconst.c_m_s
# We know where the emitter is.
site = bvis.configuration.location
latitude = site.lat.rad
# apparent_emitter_coordinates [nsource x ntimes x nantennas x [az,el,dist]
# azimuth is index [:, :, :, 0]; elevation is index [:, :, :, 1]
az = apparent_emitter_coordinates[i, :, :, 0]
el = apparent_emitter_coordinates[i, :, :, 1]
ha_emitter, dec_emitter = azel_to_hadec(
numpy.deg2rad(az), numpy.deg2rad(el), latitude
)
# Now step through the time stamps, calculating the effective
# sky position for the emitter, and performing phase rotation
# appropriately
hourangles = calculate_visibility_hourangles(bvis)
final_bvis = bvis.copy(deep=True, zero=True)
for iha, (time, subvis) in enumerate(final_bvis.groupby("time", squeeze=False)):
ha_phase_ctr = hourangles[iha]
# calculate station-based arrays
ant_uvw = _get_uvw_per_station(
bvis.configuration.xyz.data,
ha_emitter[iha, :] + ha_phase_ctr.rad,
dec_emitter[iha, :],
)
ant_ra = -ha_emitter[iha, :] + ha_phase_ctr.rad
ant_dec = dec_emitter[iha, :]
emitter_sky = SkyCoord(ant_ra * u.rad, ant_dec * u.rad)
l, m, n = skycoord_to_lmn(emitter_sky, bvis.phasecentre)
phasor = numpy.ones([nbaselines, nchan, npol], dtype="complex")
for j, (station1, station2) in enumerate(subvis.baselines.values):
for chan in range(nchan):
phasor[j, chan, :] = (
simulate_point_antenna(
k[chan] * ant_uvw[station1], l[station1], m[station1]
)
* numpy.conjugate(
simulate_point_antenna(
k[chan] * ant_uvw[station2],
l[station2],
m[station2],
)
)
)[..., numpy.newaxis]
# Now fill this into the Visibility
subvis["vis"].data[0, ...] = bvis_data_copy[iha, ...] * phasor
if apply_primary_beam and "MID" in mid_or_low:
# Apply beam gain for Mid
apply_beam_gain_for_mid(
bvis,
subvis,
vp,
az[iha],
el[iha],
bvis.phasecentre,
)
# Now accumulate over all sources
bvis["vis"].data += final_bvis["vis"].data
return bvis