Source code for ska_sdp_exec_swiftly.dask_wrapper

"""
The dask wrapper.

We wrap the Dask delayed implementation in a dask
wrapper where if use_dask is set to True, the dask wrapper
function will call the dask.delayed option for the computation.
"""

import functools
import logging

import dask
from distributed import Client

log = logging.getLogger("fourier-logger")


[docs] def dask_wrapper(func): """ The Dask wrapper function If arg use_dask = True and arg nout is provided, then function is run with dask.delayed. Else, it is executed in serial as normal. """ @functools.wraps(func) # preserves information about the original function def wrapper(*args, **kwargs): try: use_dask = kwargs["use_dask"] nout = kwargs["nout"] except KeyError: use_dask = False nout = None if use_dask: result = dask.delayed(func, nout=nout)(*args, **kwargs) else: result = func(*args, **kwargs) return result return wrapper
[docs] def set_up_dask(scheduler_address=None): """ Set up the Dask Client :param scheduler_address: IP_address:PORT of scheduler if None, a local cluster is created with machine resources :return: Dask client """ client = Client(scheduler_address) log.info(client.dashboard_link) return client
[docs] def tear_down_dask(client): """ Close the Dask Client """ client.shutdown()