Python API
Processors
- class realtime.calibration.processors.rcal_processor.RCalProcessor(beam_id: str, calibrator_to_use: ~realtime.calibration.calibrators.base_calibrator.BaseCalibrator = <realtime.calibration.calibrators.rcal_calibrator.RCalCalibrator object>, output_h5: str | None = None, kafka_server: str | None = None, kafka_topic: str | None = None, qa_kafka_topic: str | None = None, qa_metrics_kafka_topic: str | None = None, ignore_config_db: bool = False, everybeam_ms: str | None = None, sky_model: str | None = None, fov: float | None = 5, flux_limit: float | None = 0.5, uv_min: float | None = 150, autoflag: bool = True, rfi_masks: ~numpy.ndarray[~typing.Any, ~numpy.dtype[~numpy._typing._array_like._ScalarType_co]] | None = None, preproc_ave_time: int | None = None, preproc_ave_frequency: int | None = None, flip_uvw: bool = False, scale_gains: bool = False, scale_factor: float | None = None, low_gain_threshold: float | None = 0.1, threshold_headroom: bool = False, gain_headroom: float | None = 0.1, station_data_key_path: str = 'stations')
A processor that can present Visibility data to the RCal library. Child of BaseProcessor.
Gain scaling
Before sending calibration solutions to the data queues for application in CBF, they need to be scaled to have real and imaginary values in the range [-1, 1). This is enabled using option scale_gains. If possible, a single or slowly varying scaling factor will be used to do this, to simplify its correction down stream in the formed beams. Most of the variation in time is expected to be included in the beam models, and so the following approach is used:
Before multiplication with the beam models, calculate the median bandpass gain amplitude. This may need to be more sophisticated if there is significant variation of the amplitudes with frequency.
Set a low gain amplitude threshold as a fraction of the median amplitude level (using option low_gain_threshold).
Before finalising the low gain amplitude threshold, option threshold_headroom can be used to reduce it to a less variable number. Until there is a better option, the highest factor of 2 below the median amplitude is used.
After setting the low gain amplitude threshold, option gain_headroom is used to reduce it further to account for station beam variations. Gains may drop by an order of magnitude at low elevations. Gains will also drop for tied-array beams that are far from station beam centre.
The inverse of this final threshold value is used to scale the Jones matrices. After matrix inversion, any matrix with an element outside the range (-1, 1) is flagged.
Attributes
- beam_id: str
The visibility beam to process. Visibility datasets for other beams will be ignored.
- calibrator_to_use: BaseCalibrator
Calibration support class to use. Default = RCalCalibrator()
- kafka_server: Optional[str]
Bootstrap server for RCal Kafka producer. Default = None (i.e. no Kafka output)
- kafka_topic: Optional[str]
Kafka topic for RCal Kafka producer. Default = None (i.e. no Kafka output)
- sky_model: Optional[str]
Define the sky model. Options are:
sky_model=<my_file.csv>: A CSV file containing a sky component list that is readable by the ska_sdp_datamodels.global_sky_model.LocalSkyModel class, as generated by the GSM service.
sky_model=None: A spectrally-flat point source will be placed at the phase centre.
Default = None
- fov: Optional[float]
Field of view (in degrees) used in the sky model cone search. Default = 5.0,
- flux_limit: Optional[float]
Minimum flux density (in Jy) used in the sky model cone search. Default = 0.5,
- uv_min: Optional[float]
Minimum baseline length (in wavelengths) to use in calibration. Default = 150.0,
- autoflag: bool
Whether or not to run RFI peak finding. Default is True. See
pre_process()for details.- rfi_masks: Optional[ndarray]
Flagging limits for known bad frequencies. Default is None. See
pre_process()for details.- preproc_ave_time: Optional[int]
Pre-average visibility dataset in time by this number of samples. See
pre_process()for defaults.- preproc_ave_frequency: Optional[int]
Pre-average visibility dataset in frequency by this number of channels. See
pre_process()for defaults.- output_h5: Optional[str]
Optional output h5parm file for the bandpass Jones matrices. If this option is set, additional files will be written for the combined matrices (J_bandpass @ J_beam) with “_combined” added before the file-type suffix, and for the final inverted and scaled matrices, with “_final” added before the file-type suffix. In all cases, the scan and beam IDs will also be added before the file-type suffix. Default = None
- flip_uvw: bool
Change the sign of the uvw coordinates. This is a temporary switch to deal with an inconsistency in receiver options. Default = False
- scale_gains: bool
Whether or not to scale output Jones matrices to the range [-1, 1). A single scale factor is used for all solution matrices. Default = False
- scale_factor: Optional[float]
User-defined scale factor. This will override all other scaling settings. It should be used with care, as inappropriate scaling can result in over flagging or insufficient bit depth in CBF beamforming. Default = None
- low_gain_threshold: Optional[float]
The fraction of the median bandpass gain amplitude below which to flag matrices. This is also used as a starting point for the scale factor, since its inverse will be the approximate maximum value in the inverse matrices. See also threshold_headroom. Default = 0.1.
- threshold_headroom: bool
If using low_gain_threshold, decrease the median gain amplitude to the next lowest power of 2 before setting the threshold. This will move the threshold level lower, but should also limit variation from one RCAL cycle to the next.
- gain_headroom: Optional[float]
How much gain variation to build into the scaling, expressed as a fraction of maximum gain. Default = 0.1.
- station_data_key_path: str
For SKA Low, path in the Telescope Model structure under which documents describing individual stations can be found. Telescope Model keys to each station are assumed to be of the form
<station_data_key_path>/<station-geolabel>.yaml(e.g.stations/s8-1.yaml).
- param BaseProcessor:
processor base class
- __init__(beam_id: str, calibrator_to_use: ~realtime.calibration.calibrators.base_calibrator.BaseCalibrator = <realtime.calibration.calibrators.rcal_calibrator.RCalCalibrator object>, output_h5: str | None = None, kafka_server: str | None = None, kafka_topic: str | None = None, qa_kafka_topic: str | None = None, qa_metrics_kafka_topic: str | None = None, ignore_config_db: bool = False, everybeam_ms: str | None = None, sky_model: str | None = None, fov: float | None = 5, flux_limit: float | None = 0.5, uv_min: float | None = 150, autoflag: bool = True, rfi_masks: ~numpy.ndarray[~typing.Any, ~numpy.dtype[~numpy._typing._array_like._ScalarType_co]] | None = None, preproc_ave_time: int | None = None, preproc_ave_frequency: int | None = None, flip_uvw: bool = False, scale_gains: bool = False, scale_factor: float | None = None, low_gain_threshold: float | None = 0.1, threshold_headroom: bool = False, gain_headroom: float | None = 0.1, station_data_key_path: str = 'stations')
Create a new BaseProcessor.
- async close()
Flush and close any remaining resources.
DataProducts are gracefully finalized transitioned to
COMPLETEDstatus where possible.If
cancel()was invoked, pending work is cancelled and pending DataProducts transition toCANCELLED.
- static create(argv: Iterable[str]) RCalProcessor
Create an instance of this class from the given command line parameters. This allows user-provided classes to have their own command line parsing logic, and receive arbitrary user-provided parameters.
- Parameters:
argv – A list of command line parameters.
- Returns:
A new instance of this class.
- async end_scan(scan_id: int) Visibility | Iterable[Visibility] | None
Called when a scan has ended. The default implementation ignores this event, but subclasses might want to react to this.
- Parameters:
scan_id – the ID of the scan that has ended.
- Returns:
Visibility to pass to the next processor, or None to skip subsequent processors.
- async process(_dataset: Visibility) bool
Process the given visibility dataset.
- Parameters:
dataset – the visibility to process
- Returns:
Visibility or iterable collection of Visibility objects to pass to the next processor, or None to skip subsequent processors.
- async start_scan(scan_id: int) None
Called when a new scan has started. The default implementation ignores this event, but subclasses might want to react to this.
- Parameters:
scan_id – the ID of the scan that has started.
Processor helper functions
- realtime.calibration.processors.utilities.create_h5parm(gaintable: GainTable, filename: str, squeeze: bool = False) File
Set up a H5Parm HDF5 file to receive one or more gaintable datasets.
The time dimension of tables in the new file is left empty. Function update_h5parm can be used to resize the tables and add gains as new time samples are processed.
The file is flushed before returning.
- Parameters:
gaintable – GainTable
filename – Name of H5Parm file
squeeze – If True, remove axes of length one from dataset
- Output:
the new h5py file object
- realtime.calibration.processors.utilities.pre_process(vis: Dataset, timestep: int | None = None, freqstep: int | None = None, uv_min: float | None = 100, autoflag: bool | None = True, rfi_masks: ndarray[Any, dtype[float]] | None = None) Dataset
Pre-process a visibility dataset.
Pre-processing options:
Average in time
Average in frequency
Flag baselines below a minimum uv length cutoff
Flag based on outlier detection
Flag frequencies ranges
- Parameters:
vis – Visibility data to be processed
timestep – Pre-average visibility dataset in time by this number of samples. For default values, see
set_pre_process_averaging().freqstep – Pre-average visibility dataset in frequency by this number of channels. For default values, see
set_pre_process_averaging().uv_min – Minimum baseline length (in wavelengths) to use in calibration. Default = 100
autoflag – Whether or not to run an adaptive flagger. Currently the ska-sdp-func-python preprocessing rfi_flagger is used with nominal settings.
rfi_masks – Flagging limits for known bad frequencies. Should be a 2D array of [start,end] frequencies in Hz, for example: rfi_masks = np.array([[137.4e6, 137.6e6], [230e6, 280e6]])
- Returns:
Pre-processed the Visibility dataset
- realtime.calibration.processors.utilities.set_beamformer_frequencies(gain_table: GainTable, array=None)
Generate a list of CBF beamformer frequencies
The main version of this function is in ska_sdp_func_python.calibration, however it is likely to undergo multiple changes while RCAL is developed. Use a local copy during this development and update func-python at the end.
- SKA-Low beamformer:
Need Jones matrices for 384, 304 or 152 channels, per antenna, per beam
- Station channels/beams are centred on integer multiples of 781.25 kHz
781.25 kHz = 400 MHz / 512 channels
Station channels run from 50 MHz (64*df) to 350 MHz (448*df)
CBF beamformer calibration is done at the station channel resolution
Will assume that no padding is needed if input band < beamformer band
- MID beamformer
Need Jones matrices for 4096 channels, per antenna, per beam
Timing beam bandwidth : 200 MHz (channel width : 48.8281250 kHz?)
Search beam bandwidth : 300 MHz (channel width : 73.2421875 kHz?)
Set first beamformer channel centre frequency to first input channel
- Parameters:
gain_table – GainTable
array – optional argument to explicitly set the array. Should be “LOW” or “MID”. By default the gaintable configuration name will be used to set the array automatically.
- Returns:
np array of shape [nfreq,]
- realtime.calibration.processors.utilities.update_h5parm(gaintable: GainTable, file: File)
Update a H5Parm file with the contents of a gaintable.
Extend a H5Parm file along the time dimension by the shape of gaintable and insert the gains. The file is flushed before returning.
- Parameters:
gaintable – GainTable
filename – Name of H5Parm file
squeeze – If True, remove axes of length one from dataset
- realtime.calibration.processors.utilities.update_metadata(vis: Dataset, flip_uvw: bool | None = False) Dataset
Update visibility metadata.
Various bits of metadata need updating. So of these updates are temporary and will be eventually be retired, while others are more permanent. Updates include:
Option to reverse the sign of the uvw coordinates
Change internal uvw sample order (and sign) if baselines have upper-triangle order
Change time samples from end-of-integration to centre-of-integration
- Parameters:
vis – Visibility data to be processed
flip_uvw – Temporary switch to deal with an inconsistency in receiver options. If true, the sign of the uvw data is inverted. Default is False.
- Returns:
Pre-processed the Visibility dataset
Calibrators
- class realtime.calibration.calibrators.base_calibrator.BaseCalibrator(config=NotImplemented)
A base class to inherit from when implementing a calibrator
- __init__(config=NotImplemented)
_summary_
- __weakref__
list of weak references to the object (if defined)
- apply_flags_to_solutions() None
Set Jones matrices with zero weight to zero.
CBF expects that flagged calibration solutions are set to zero. Make sure that is the case.
- calculate_chisq(vis: Visibility)
Calculate the chisq for the visibility model and Jones matrices.
Note that the visibility weights may not be set to the inverse variance of the visibility noise, in which case the expected value will not be 1, but rather a function of the noise variance.
- Parameters:
vis – Visibility data to be calibrated
- Returns:
Chisq estimate
- abstract calibrate(vis: Visibility, beams=None)
Update calibration solutions in gain_table.
- delete_skymodel()
Return the sky model to the unset state.
- flag_low_gain_solutions(low_gain_threshold: float) None
Flag low-gain stations.
Flag any Jones matrix with either either X or Y gain less than low_gain_threshold multiplied by the median gain.
This might be better done in RCalCalibrator.calibrate, where the flagging can help to improve calibration.
- Parameters:
low_gain_threshold – Proportion of median gain below which to flag.
- get_calibration() GainTable
Return the calibration gain_table.
If it has been resampled, the resampled version is returned.
- Returns:
GainTable
- initialise_gaintable(vis, keep_gains=True)
Initialise the gaintable.
Initialise the gaintable before calibration. This is done separately so that it can be skipped if cumulative solving is required.
- Parameters:
vis – Visibility data to be calibrated
keep_gains – If true and the gain table is already initialised with matching metadata, the existing gains will be retained. The metadata that must match are the antenna and frequency axes. Default is True.
- resample(array=None)
Resample the calibration solutions.
The gain_table is re-channelised to have spectral sampling that is appropriate for CBF beamforming. The results are written into a new GainTable dataset.
- Parameters:
array – Array type for re-channelisation (“MID” or “LOW”). Default = None
- reset_skymodel()
Whether or not to reset (or initialise) the sky model.
- class realtime.calibration.calibrators.rcal_calibrator.RCalCalibrator(config=NotImplemented)
Realtime calibration support class.
Child of BaseCalibrator.
- calibrate(vis, beams: GenericBeams | None = None)
Update calibration solutions in gain_table.
Generate bandpass calibration solutions using ska_sdp_func_python function solve_gaintable.
- Parameters:
vis – Visibility data to calibrate
beams – Optional GenericBeams object. If supplied, the central beam response will be divided out during initialisation
- update_table(gain_table: GainTable)
Update the main bandpass table with a new one.
Any channels that are entirely flagged will be flagged in the main table and the gain values will be unchanged.
Any channels that contain non-finite gains (NaNs or Infs) will be flagged in the main table and the gain values will be unchanged.
- Parameters:
gain_table – new GainTable
Sky and visibility models
Functions for generating beam models
- class realtime.calibration.sky_model.beams.GenericBeams(vis: Dataset, array: str | None = None, direction: SkyCoord | None = None, ms_path: str | None = None)
A generic class for beam handling.
Generic interface to Everybeam beam models.
Currently for Mid, all beam values are set to 2x2 identity matrices.
Parameters
- vis: xr.Dataset
Dataset containing required metadata.
- array: str
array type (e.g. “low”). If unset or None, will check Vis model for an obvious match.
- direction: SkyCoord
Beam direction. By default the vis phase centre will be used.
- ms_path: str
Location of measurement set for everybeam (e.g. OSKAR_MOCK.ms).
- __init__(vis: Dataset, array: str | None = None, direction: SkyCoord | None = None, ms_path: str | None = None)
- __weakref__
list of weak references to the object (if defined)
- array_response(direction: SkyCoord, frequency: ndarray[Any, dtype[float]], time: Time | None = None) ndarray[Any, dtype[complex]]
Return the response of each antenna or station in a given direction
- Parameters:
direction – Direction of desired response
frequency – 1D array of frequencies
time – obstime
- Returns:
array of beam matrices [nant, nfreq, 2, 2]
- init_models(vis: Dataset, array: str | None = None, ms_path: str | None = None)
Initialise the beam models.
- Parameters:
vis – Visibility data to be calibrated
array – array type. If unset or None, will check Vis model for an obvious match.
- update_beam(frequency: ndarray[Any, dtype[float]], time: Time)
Update the ITRF coordinates of the beam and normalisation factors.
- Parameters:
frequency – 1D array of frequencies
time – obstime
- update_beam_direction(direction: SkyCoord)
Return the response of each antenna or station in a given direction.
- Parameters:
direction – Pointing direction of the beams
Functions for generating the local sky model.
- realtime.calibration.sky_model.lsm.convert_model_to_skycomponents(model: list[Component], freq: ndarray[Any, dtype[float]]) list[SkyComponent]
Convert the LocalSkyModel to a list of SkyComponents.
All sources are unpolarised and specified in the linear polarisation frame using XX = YY = Stokes I/2.
Function
deconvolve_gaussian()is used to deconvolve the MWA synthesised beam from catalogue shape parameters of each component. Components with non-zero widths after this process are stored with shape = “GAUSSIAN”. Otherwise shape = “POINT”.- Parameters:
model – Component list
freq – Frequency list in Hz
freq0 – Reference Frequency for flux scaling in Hz. Default is 200e6. Note: freq0 should really be part of the sky model
- Returns:
SkyComponent list
- realtime.calibration.sky_model.lsm.deconvolve_gaussian(comp: Component) tuple[float]
Deconvolve MWA synthesised beam from Gaussian shape parameters.
This follows the approach of the analysisutilities function deconvolveGaussian in the askap-analysis repository, written by Matthew Whiting. This is based on the approach described in Wild (1970), AuJPh 23, 113.
- Parameters:
comp –
Componentdata for a source- Returns:
Tuple of deconvolved parameters (same units as data in comp)
- realtime.calibration.sky_model.lsm.generate_lsm_from_csv(csvfile: str, phasecentre: SkyCoord, fov: float = 5.0, flux_limit: float = 0.0) list[Component]
Generate a local sky model using a LocalSkyModel csv file.
Form a list of Component objects.
All components are Gaussians, although many will typically default to point sources. See data class
Componentfor more information.- Parameters:
csvfile – CSV filename.
phasecentre – astropy SkyCoord for the centre of the sky model.
fov – Field of view in degrees. Default is 5.
flux_limit – minimum flux density in Jy. Default is 0.
- Returns:
Component list
- realtime.calibration.sky_model.lsm.generate_skycomponents(vis: Visibility, sky_model: str | None = None, fov: float = 5.0, flux_limit: float = 0.0) List[SkyComponent]
Initialise the local sky model.
Component based local sky model for the current dataset. For a large number of components or for significant diffuse emission, the full SkyModel class could be used instead.
- Parameters:
vis – Visibility dataset
sky_model –
Sky model source. Options are:
sky_model=<my_file.csv>: A CSV file containing a sky component list readable by the ska_sdp_datamodels.global_sky_model.LocalSkyModel class, as generated by the GSM service.
sky_model=None: A spectrally-flat point source will be placed at the phase centre.
Default = None
fov – Field of view (in degrees) used in the sky model cone search. Default = 5.0
flux_limit – Minimum flux density (in Jy) used in the sky model cone search. Default = 0.0
- Returns:
SkyComponent List
Functions for generating/predicting model visibilities
- realtime.calibration.sky_model.predict.predict_model_vis(vis: Visibility, skycomponents: List[SkyComponent], beams: GenericBeams | None = None, decorrelate: bool = True) Visibility
Predict model visibilities.
- Parameters:
vis – Visibility dataset to be modelled
skycomponents – SkyComponent List containing the local sky model
beams – Optional GenericBeams object. Defaults to no beams.
decorrelate – Whether to taper by bandwidth and time-average smearing factors
- Returns:
Visibility model