rsexecute

rsexecute workflows can be used in two modes

  • delayed using Dask.delayed

  • serially executed immediately on definition,

Distribution is acheived by working on lists of data models, such as lists of BlockVisibilities.

For example:

from rascil.workflows import continuum_imaging_list_rsexecute_workflow, rsexecute
rsexecute.set_client(use_dask=True, threads_per_worker=1,
    memory_limit=32 * 1024 * 1024 * 1024, n_workers=8,
    local_dir=dask_dir, verbose=True)
continuum_imaging_list = continuum_imaging_list_rsexecute_workflow(vis_list,
    model_imagelist=model_list,
    context='wstack', vis_slices=51,
    scales=[0, 3, 10], algorithm='mmclean',
    nmoment=3, niter=1000,
    fractional_threshold=0.1, threshold=0.1,
    nmajor=5, gain=0.25,
    psf_support=64)

deconvolved_list, residual_list, restored_list = rsexecute.compute(continuum_imaging_list,
    sync=True)

The call to continuum_imaging_list_rsexecute_workflow does not execute immediately just generates a Dask.delayed object that can be computed subsequently. The higher level functions such as continuum_imaging_list_rsexecute_workflow are built from lower level functions such as invert_list_rsexecute_workflow.

In this example, changing use_dask to False will cause the definitions to be executed immediately.

The rsexecute framework relies upon a singleton object called rsexecute. This is documented below as the class _rsexecutebase.

rascil.workflows.rsexecute.calibration Package

Workflows for calibration

Functions

calibrate_list_rsexecute_workflow(vis_list, ...)

Create a set of components for (optionally global) calibration of a list of visibilities

rascil.workflows.rsexecute.image Package

Workflows for operating on images

Functions

image_gather_channels_rsexecute(image_list)

Gather a set of images in frequency, using a tree reduction or directly

image_rsexecute_map_workflow(im, imfunction)

Apply a function across an image: scattering to subimages, applying the function, and then gathering

sum_images_rsexecute(image_list[, split])

Sum a set of images, using a tree reduction

rascil.workflows.rsexecute.imaging Package

Functions

deconvolve_list_channel_rsexecute_workflow(...)

Create a graph for deconvolution by channels, adding to the model

deconvolve_list_rsexecute_workflow(...[, ...])

Create a graph for deconvolution, adding to the model

invert_list_rsexecute_workflow(vis_list, ...)

Sum results from invert, iterating over the scattered image and vis_list

predict_list_rsexecute_workflow(vis_list, ...)

Predict, iterating over both the scattered vis_list and image

residual_list_rsexecute_workflow(vis, ...[, ...])

Create a graph to calculate (list or graph) of residual images

restore_centre_rsexecute_workflow(...[, ...])

Create a graph to calculate the restored image

restore_list_rsexecute_workflow(...[, ...])

Create a graph to calculate the restored image

subtract_list_rsexecute_workflow(vis_list, ...)

Initialise vis to zero

sum_invert_results_rsexecute(image_list)

Sum a set of invert results with appropriate weighting

sum_predict_results_rsexecute(bvis_list[, split])

Sum a set of predict results

taper_list_rsexecute_workflow(vis_list, ...)

Taper to desired size

threshold_list_rsexecute(imagelist[, prefix])

Find actual threshold for list of results

weight_list_rsexecute_workflow(vis_list, ...)

Weight the visibility data

zero_list_rsexecute_workflow(vis_list[, copy])

Creates a new vis_list and initialises all to zero

rascil.workflows.rsexecute.pipelines Package

Functions

continuum_imaging_skymodel_list_rsexecute_workflow(...)

Create graph for the continuum imaging pipeline.

ical_skymodel_list_rsexecute_workflow(...[, ...])

Create graph for ICAL pipeline using SkyModel

spectral_line_imaging_skymodel_list_rsexecute_workflow(...)

Create graph for spectral line imaging pipeline

rascil.workflows.rsexecute.simulation Package

Functions

corrupt_list_rsexecute_workflow(vis_list[, ...])

Create a graph to apply gain errors to a vis_list

create_atmospheric_errors_gaintable_rsexecute_workflow(...)

Create gaintable for atmospheric errors

create_heterogeneous_gaintable_rsexecute_workflow(...)

Create gaintable for polarisation effects

create_pointing_errors_gaintable_rsexecute_workflow(...)

Create gaintable for pointing errors

create_polarisation_gaintable_rsexecute_workflow(...)

Create gaintable for polarisation effects

create_standard_low_simulation_rsexecute_workflow(...)

Create the standard LOW simulation

create_standard_mid_simulation_rsexecute_workflow(...)

Create the standard MID simulation

create_surface_errors_gaintable_rsexecute_workflow(...)

Create gaintable for surface errors :param band: B1, B2 or Ku :param sub_bvis_list: List of vis (or graph) :param sub_components: List of components (or graph) :param vp_directory: Location of voltage patterns :param elevation_sampling: Sampling in elevation (degrees) :return: (list of error-free gaintables, list of error gaintables) or graph

create_voltage_pattern_gaintable_rsexecute_workflow(...)

Create gaintable for nominal voltage pattern

simulate_list_rsexecute_workflow([config, ...])

A component to simulate an observation

rascil.workflows.rsexecute.skymodel Package

Functions

deconvolve_skymodel_list_rsexecute_workflow(...)

Deconvolve using a skymodel

invert_skymodel_list_rsexecute_workflow(...)

Calibrate and invert from a skymodel, iterating over the skymodel

predict_skymodel_list_rsexecute_workflow(...)

Predict from a list of skymodels

restore_centre_skymodel_list_rsexecute_workflow(...)

Create a graph to calculate the restored skymodel at the centre channel

restore_skymodel_list_rsexecute_workflow(...)

Create a graph to calculate the restored image

rascil.workflows.rsexecute.execution_support Package

Functions

get_dask_client([timeout, n_workers, ...])

Get a Dask.distributed Client to be used in rsexecute

Classes

The rsexecute framework relies upon a singleton object called rsexecute. This is documented below as the class _rsexecutebase. Note that by design it is not possible to create more than one _rsexecutebase object.

class _rsexecutebase(use_dask=True, use_dlg=False, verbose=False, optimize=True)[source]

Initialise rsexecute framework

A singleton of this class is created and is available globally as rsexecute. Hence it is not necessary to declare an instance of _rsexecutebase.

For example:

from rascil.workflows import continuum_imaging_list_rsexecute_workflow, rsexecute
rsexecute.set_client(use_dask=True,
    memory_limit=32 * 1024 * 1024 * 1024, n_workers=8,
    local_dir=dask_dir, verbose=True)
continuum_imaging_list = continuum_imaging_list_rsexecute_workflow(vis_list,
    model_imagelist=model_list,
    context='wstack', vis_slices=51,
    scales=[0, 3, 10], algorithm='mmclean',
    nmoment=3, niter=1000,
    fractional_threshold=0.1, threshold=0.1,
    nmajor=5, gain=0.25,
    psf_support=64)

deconvolved_list, residual_list, restored_list = rsexecute.compute(continuum_imaging_list,
    sync=True)
Parameters:
  • use_dask – Use dask (True)

  • use_dlg – Use daluige (False)

  • verbose – Be verbose in printing messages

  • optimize – Optimize if using dask (True)

execute(func, *args, **kwargs)[source]

Wrap for immediate or deferred execution

Passes through if dask is not being used

Parameters:
  • args

  • kwargs

Returns:

delayed func or func

type()[source]

Get the name of the execution system

Returns:

set_client(client=None, use_dask=True, use_dlg=False, verbose=False, optim=True, **kwargs)[source]

Set the Dask/DALiuGE client to be used

If you want to customise the Client or use an externally defined Scheduler use get_dask_client and pass it in.

Parameters:
  • use_dask – Use Dask?

  • client – If None and use_dask is True, a client will be created otherwise the client is None

  • use_dlg – Use Daliuge to execute graphs?

  • verbose – Be verbose in output

  • optim – Use dask.optimize via rsexecute.optimize function.

Returns:

compute(value, sync=False)[source]

Get the actual value

If not using dask then this returns the value directly since it already is computed If using dask and sync=True then this waits and resturns the actual wait. If using dask and sync=False then this returns a future, on which you will need to call .result()

Parameters:
  • value

  • sync – Return synchronously? (False)

Returns:

persist(graph, **kwargs)[source]

Persist graph data on workers

The graphs are placed on the workers but not computed

No-op if not using_dask

Parameters:

graph

Returns:

scatter(graph, **kwargs)[source]

Scatter graph data to workers

The data are placed on the workers

No-op if not using_dask :param graph: :return:

gather(graph)[source]

Gather graph from workers

The data are gathered from the workers

No-op if not using_dask

Parameters:

graph

Returns:

run(func, *args, **kwargs)[source]

Run a function on the client

Parameters:

func

Returns:

optimize(*args, **kwargs)[source]

Run Dask optimisation of graphs

Only does something when using dask

Parameters:
  • args – for Dask.optimize

  • kwargs – for Dask.optimize

Returns:

close()[source]

Close the client

init_statistics()[source]

Initialise the profile and task stream info

rsexecute can save the Dask profile and Task Stream information for later saving

Returns:

save_statistics(name='dask')[source]

Save the statistics to html files

rsexecute can save the Dask profile and Task Stream information for later saving. This saves the current statistics to html files.

Parameters:

name – prefix to name e.g. dask

memusage(memusage_file='memusage.csv')[source]

Install the dask-memusage plugin

https://github.com/itamarst/dask-memusage/blob/master/dask_memusage.py

Note that there can only be one dask thread per process.

This only works for the process scheduler. For the distributed scheduler, preload the plugin. For example:

dask-scheduler –port=8786 –preload dask_memusage –memusage-csv ./memusage.csv

Parameters:

memusage_file – Name of mem-usage file produced by dask-memusage plugin

Returns:

property client

Client being used

Returns:

client

property using_dask

Is dask being used?

Returns:

property using_dlg

Is daluige being used?

Returns:

property optimizing

Is Dask optimisation being performed?

Returns: