rsexecute
rsexecute workflows can be used in two modes:
- delayed, using Dask.delayed
- serial, executed immediately on definition
Distribution is achieved by working on lists of data models, such as lists of BlockVisibilities.
For example:
from rascil.workflows import continuum_imaging_list_rsexecute_workflow, rsexecute

# vis_list, model_list and dask_dir are assumed to be defined elsewhere
rsexecute.set_client(use_dask=True, threads_per_worker=1,
                     memory_limit=32 * 1024 * 1024 * 1024, n_workers=8,
                     local_dir=dask_dir, verbose=True)
continuum_imaging_list = continuum_imaging_list_rsexecute_workflow(vis_list,
                                                                   model_imagelist=model_list,
                                                                   context='wstack', vis_slices=51,
                                                                   scales=[0, 3, 10], algorithm='mmclean',
                                                                   nmoment=3, niter=1000,
                                                                   fractional_threshold=0.1, threshold=0.1,
                                                                   nmajor=5, gain=0.25,
                                                                   psf_support=64)
deconvolved_list, residual_list, restored_list = rsexecute.compute(continuum_imaging_list,
                                                                   sync=True)
The call to continuum_imaging_list_rsexecute_workflow does not execute immediately; it only generates a Dask.delayed object that can be computed later. Higher-level functions such as continuum_imaging_list_rsexecute_workflow are built from lower-level functions such as invert_list_rsexecute_workflow.
In this example, changing use_dask to False will cause the definitions to be executed immediately.
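The deferred mode can be pictured with a toy model that uses only the Python standard library. This is a sketch of the general idea, not RASCIL or Dask code: the names `Deferred`, `defer`, `compute_graph`, `scale`, and `total` are hypothetical and exist only for this illustration.

```python
class Deferred:
    """A recorded function call that has not been executed yet (hypothetical)."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args


def defer(func):
    """Return a wrapper that records a call instead of running it."""
    def wrapper(*args):
        return Deferred(func, *args)
    return wrapper


def compute_graph(node):
    """Evaluate a graph recursively; lists are treated as lists of nodes."""
    if isinstance(node, Deferred):
        return node.func(*[compute_graph(arg) for arg in node.args])
    if isinstance(node, list):
        return [compute_graph(item) for item in node]
    return node


calls = []  # records when real work happens


def scale(x, factor):
    calls.append(("scale", x))
    return x * factor


def total(values):
    calls.append(("total", tuple(values)))
    return sum(values)


# Low-level step applied to every element of a list of "data models".
scaled_list = [defer(scale)(x, 10) for x in [1, 2, 3]]
# Higher-level step built from the lower-level results.
graph = defer(total)(scaled_list)

assert calls == []               # defining the graph ran nothing
result = compute_graph(graph)    # work happens only here
```

Because each element of the list becomes its own node, a real scheduler is free to run those nodes on different workers before the final step combines them.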
The rsexecute framework relies upon a singleton object called rsexecute. This is documented below as the class _rsexecutebase.
rascil.workflows.rsexecute.calibration Package
Workflows for calibration
Functions
Create a set of components for (optionally global) calibration of a list of visibilities
rascil.workflows.rsexecute.image Package
Workflows for operating on images
Functions
Gather a set of images in frequency, using a tree reduction or directly
Apply a function across an image: scattering to subimages, applying the function, and then gathering
Sum a set of images, using a tree reduction
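A tree reduction combines items pairwise, level by level, so the additions within one level are independent of each other. The following is a minimal standard-library sketch of that pattern, not the RASCIL implementation: `tree_sum` and `add_images` are hypothetical names, and the "images" are plain nested lists.

```python
def tree_sum(items, add):
    """Reduce a non-empty list pairwise, level by level."""
    if not items:
        raise ValueError("need at least one item")
    level = list(items)
    while len(level) > 1:
        next_level = []
        # Combine neighbours (0,1), (2,3), ...; these pairs are independent.
        for i in range(0, len(level) - 1, 2):
            next_level.append(add(level[i], level[i + 1]))
        # An odd item out is carried up to the next level unchanged.
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        level = next_level
    return level[0]


def add_images(a, b):
    """Add two images stored as lists of rows."""
    return [[x + y for x, y in zip(row_a, row_b)] for row_a, row_b in zip(a, b)]


images = [[[i, i], [i, i]] for i in range(1, 6)]  # five 2x2 "images"
summed = tree_sum(images, add_images)
```

For n inputs the tree has roughly log2(n) levels, whereas a left-to-right sum forms a chain of n - 1 dependent steps.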
rascil.workflows.rsexecute.imaging Package
Functions
Create a graph for deconvolution by channels, adding to the model
Create a graph for deconvolution, adding to the model
Sum results from invert, iterating over the scattered image and vis_list
Predict, iterating over both the scattered vis_list and image
Create a graph to calculate (list or graph) of residual images
Create a graph to calculate the restored image
Create a graph to calculate the restored image
Initialise vis to zero
Sum a set of invert results with appropriate weighting
Sum a set of predict results
Taper to desired size
Find actual threshold for list of results
Weight the visibility data
Creates a new vis_list and initialises all to zero
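As an illustration of summing invert results "with appropriate weighting", the sketch below assumes that each result is an (image, sum-of-weights) pair and that the combined image is the weight-normalised average. Both points are assumptions made for this example; `sum_weighted` is a hypothetical name and images are flat lists of pixel values.

```python
def sum_weighted(results):
    """Combine (image, sumwt) pairs into a weight-normalised image.

    Each image is a flat list of pixel values; sumwt is a scalar weight.
    """
    if not results:
        raise ValueError("need at least one result")
    npix = len(results[0][0])
    accum = [0.0] * npix
    total_wt = 0.0
    for image, sumwt in results:
        for i, pixel in enumerate(image):
            accum[i] += pixel * sumwt   # weight each image by its sumwt
        total_wt += sumwt
    if total_wt > 0.0:
        accum = [value / total_wt for value in accum]
    return accum, total_wt


results = [([1.0, 2.0], 1.0), ([3.0, 4.0], 3.0)]
image, sumwt = sum_weighted(results)
```

Returning the accumulated weight alongside the image lets the output be fed into a further weighted sum.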
rascil.workflows.rsexecute.pipelines Package
Functions
Create graph for the continuum imaging pipeline.
Create graph for ICAL pipeline using SkyModel
Create graph for spectral line imaging pipeline
rascil.workflows.rsexecute.simulation Package
Functions
Create a graph to apply gain errors to a vis_list
Create gaintable for atmospheric errors
Create gaintable for polarisation effects
Create gaintable for pointing errors
Create gaintable for polarisation effects
Create the standard LOW simulation
Create the standard MID simulation
Create gaintable for surface errors. Parameters: band (B1, B2 or Ku), sub_bvis_list (list of vis, or graph), sub_components (list of components, or graph), vp_directory (location of voltage patterns), elevation_sampling (sampling in elevation, in degrees). Returns (list of error-free gaintables, list of error gaintables), or a graph.
Create gaintable for nominal voltage pattern
A component to simulate an observation
rascil.workflows.rsexecute.skymodel Package
Functions
Deconvolve using a skymodel
Calibrate and invert from a skymodel, iterating over the skymodel
Predict from a list of skymodels
Create a graph to calculate the restored skymodel at the centre channel
Create a graph to calculate the restored image
rascil.workflows.rsexecute.execution_support Package
Functions
Get a Dask.distributed Client to be used in rsexecute
Classes
By design, it is not possible to create more than one _rsexecutebase object.
- class _rsexecutebase(use_dask=True, use_dlg=False, verbose=False, optimize=True)[source]
Initialise rsexecute framework
A singleton of this class is created and is available globally as rsexecute. Hence it is not necessary to declare an instance of _rsexecutebase.
For example:
from rascil.workflows import continuum_imaging_list_rsexecute_workflow, rsexecute

rsexecute.set_client(use_dask=True, memory_limit=32 * 1024 * 1024 * 1024,
                     n_workers=8, local_dir=dask_dir, verbose=True)
continuum_imaging_list = continuum_imaging_list_rsexecute_workflow(vis_list,
                                                                   model_imagelist=model_list,
                                                                   context='wstack', vis_slices=51,
                                                                   scales=[0, 3, 10], algorithm='mmclean',
                                                                   nmoment=3, niter=1000,
                                                                   fractional_threshold=0.1, threshold=0.1,
                                                                   nmajor=5, gain=0.25,
                                                                   psf_support=64)
deconvolved_list, residual_list, restored_list = rsexecute.compute(continuum_imaging_list,
                                                                   sync=True)
- Parameters:
use_dask – Use dask (True)
use_dlg – Use DALiuGE (False)
verbose – Be verbose in printing messages
optimize – Optimize if using dask (True)
- execute(func, *args, **kwargs)[source]
Wrap for immediate or deferred execution
Passes through if dask is not being used
- Parameters:
args
kwargs
- Returns:
delayed func or func
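The "delayed func or func" return value can be illustrated with a small stand-in that either passes the function through unchanged or wraps it so that calls are recorded for later. This is a hedged sketch using only `functools`; `MiniExecute` and its `deferred` flag are hypothetical and are not part of RASCIL.

```python
import functools


class MiniExecute:
    """Hypothetical stand-in showing pass-through versus deferred wrapping."""

    def __init__(self, deferred=True):
        self.deferred = deferred

    def execute(self, func):
        if not self.deferred:
            return func  # pass through: calling it runs func at once

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Record the call as a zero-argument callable to run later.
            return functools.partial(func, *args, **kwargs)

        return wrapper


def add(a, b):
    return a + b


immediate = MiniExecute(deferred=False).execute(add)(1, 2)  # already a value
pending = MiniExecute(deferred=True).execute(add)(1, 2)     # not yet run
```

Because both modes share the same calling convention, workflow code can be written once and switched between immediate and deferred execution by a single flag.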
- set_client(client=None, use_dask=True, use_dlg=False, verbose=False, optim=True, **kwargs)[source]
Set the Dask/DALiuGE client to be used
To customise the Client or use an externally defined Scheduler, create the client with get_dask_client and pass it in.
- Parameters:
use_dask – Use Dask?
client – If None and use_dask is True, a client will be created; otherwise the client is None
use_dlg – Use DALiuGE to execute graphs?
verbose – Be verbose in output
optim – Use dask.optimize via rsexecute.optimize function.
- Returns:
- compute(value, sync=False)[source]
Get the actual value
If not using dask, this returns the value directly since it is already computed. If using dask and sync=True, this waits and returns the actual value. If using dask and sync=False, this returns a future, on which you will need to call .result()
- Parameters:
value
sync – Return synchronously? (False)
- Returns:
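The difference between sync=True and sync=False can be sketched with the standard library's `concurrent.futures`, used here purely as a stand-in. Dask futures are a different class, though they also provide a `.result()` method; `compute_like` and `square` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=2)


def compute_like(func, *args, sync=False):
    """Run func(*args); block for the value if sync, else return a future."""
    future = _pool.submit(func, *args)
    return future.result() if sync else future


def square(x):
    return x * x


value = compute_like(square, 7, sync=True)     # plain value, call blocks
future = compute_like(square, 8, sync=False)   # returns without waiting
later_value = future.result()                  # blocks until finished
_pool.shutdown()
```

The asynchronous form is useful when other work can proceed while the graph runs; the synchronous form is simpler when the result is needed straight away.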
- persist(graph, **kwargs)[source]
Persist graph data on workers
The graphs are placed on the workers but not computed
No-op if not using_dask
- Parameters:
graph
- Returns:
- scatter(graph, **kwargs)[source]
Scatter graph data to workers
The data are placed on the workers
No-op if not using_dask
- Parameters:
graph
- Returns:
- gather(graph)[source]
Gather graph from workers
The data are gathered from the workers
No-op if not using_dask
- Parameters:
graph
- Returns:
- optimize(*args, **kwargs)[source]
Run Dask optimisation of graphs
Only does something when using dask
- Parameters:
args – for Dask.optimize
kwargs – for Dask.optimize
- Returns:
- init_statistics()[source]
Initialise the profile and task stream info
rsexecute can record the Dask profile and Task Stream information so that it can be saved later
- Returns:
- save_statistics(name='dask')[source]
Save the statistics to html files
rsexecute can record the Dask profile and Task Stream information. This saves the current statistics to html files.
- Parameters:
name – prefix to name e.g. dask
- memusage(memusage_file='memusage.csv')[source]
Install the dask-memusage plugin
https://github.com/itamarst/dask-memusage/blob/master/dask_memusage.py
Note that there can only be one dask thread per process.
This only works for the process scheduler. For the distributed scheduler, preload the plugin. For example:
dask-scheduler --port=8786 --preload dask_memusage --memusage-csv ./memusage.csv
- Parameters:
memusage_file – Name of mem-usage file produced by dask-memusage plugin
- Returns:
- property client
Client being used
- Returns:
client
- property using_dask
Is dask being used?
- Returns:
- property using_dlg
Is DALiuGE being used?
- Returns:
- property optimizing
Is Dask optimisation being performed?
- Returns: