Source code for rascil.workflows.rsexecute.execution_support.rsexecute

""" Wrap dask such that with the same code Dask.delayed can be replaced by immediate calculation

"""

__all__ = ["rsexecute", "get_dask_client", "_rsexecutebase"]

import logging
import os
import time

import dask_memusage
from dask import config, delayed, optimize
from dask.distributed import Client, LocalCluster, wait
from tabulate import tabulate

log = logging.getLogger("rascil-logger")

# Support daliuge's delayed function; fail only if daliuge is unavailable but actually used
try:
    from dlg import delayed as dlg_delayed
    from dlg.dask_emulation import compute as dlg_compute
except ImportError:

    def dlg_delayed(*args, **kwargs):
        raise Exception("daliuge is not available")

    def dlg_compute(*args, **kwargs):
        pass


def get_dask_client(
    timeout=30,
    n_workers=None,
    threads_per_worker=None,
    processes=True,
    create_cluster=False,
    memory_limit=None,
    local_dir=".",
    with_file=False,
    scheduler_file="./scheduler.json",
    dashboard_address=":8787",
):
    """Get a Dask.distributed Client to be used in rsexecute

    The default operation of rsexecute.set_client is to create a set of workers
    on one node. Hence if you want to use a cluster it is necessary to use
    get_dask_client.

    The environment variable RASCIL_DASK_SCHEDULER is interpreted as pointing
    to the Dask distributed scheduler, and a client using that scheduler is
    returned. Otherwise a client for a LocalCluster is created.

    The environment variable RASCIL_DASK_SCHEDULER_FILE is interpreted as
    pointing to the Dask scheduler file, and a client using that scheduler is
    returned. If RASCIL_DASK_SCHEDULER_FILE is set, the with_file option is
    set to True and scheduler_file is overridden with RASCIL_DASK_SCHEDULER_FILE.

    :param timeout: Time out for creation (30s)
    :param n_workers: Number of workers (cores available)
    :param threads_per_worker: 1
    :param processes: Use processes instead of threads (True)
    :param create_cluster: Create a LocalCluster (False)
    :param memory_limit: Memory limit per worker (bytes e.g. 8e9) (None)
    :param local_dir: Local directory for Dask workspace ('.')
    :param with_file: Use a scheduler file (False)
    :param scheduler_file: Scheduler file for Dask ('./scheduler.json')
    :param dashboard_address: Port used for diagnostics (':8787')
    :return: Dask client
    """
    scheduler = os.getenv("RASCIL_DASK_SCHEDULER", None)
    if os.getenv("RASCIL_DASK_SCHEDULER_FILE", None) is not None:
        scheduler_file = os.getenv("RASCIL_DASK_SCHEDULER_FILE")
        with_file = True

    if scheduler is not None:
        print("Creating Dask Client using externally defined scheduler")
        c = Client(scheduler, timeout=timeout)
    elif with_file:
        print(
            "Creating Dask Client using externally defined scheduler in file %s"
            % scheduler_file
        )
        c = Client(scheduler_file=scheduler_file, timeout=timeout)
    elif create_cluster:
        print("Creating Dask LocalCluster - xarray features may not work correctly")
        if n_workers is not None:
            if memory_limit is not None:
                cluster = LocalCluster(
                    n_workers=n_workers,
                    threads_per_worker=threads_per_worker,
                    processes=processes,
                    memory_limit=memory_limit,
                    dashboard_address=dashboard_address,
                )
            else:
                cluster = LocalCluster(
                    n_workers=n_workers,
                    threads_per_worker=threads_per_worker,
                    processes=processes,
                    dashboard_address=dashboard_address,
                )
        else:
            if memory_limit is not None:
                cluster = LocalCluster(
                    threads_per_worker=threads_per_worker,
                    processes=processes,
                    memory_limit=memory_limit,
                    dashboard_address=dashboard_address,
                )
            else:
                cluster = LocalCluster(
                    threads_per_worker=threads_per_worker,
                    processes=processes,
                    dashboard_address=dashboard_address,
                )
        print("Creating LocalCluster and Dask Client")
        c = Client(cluster)
    else:
        print("Creating Dask.distributed Client")
        c = Client(
            threads_per_worker=threads_per_worker,
            processes=processes,
            memory_limit=memory_limit,
            local_directory=local_dir,
        )

    addr = c.scheduler_info()["address"]
    services = c.scheduler_info()["services"]
    if "bokeh" in services.keys():
        bokeh_addr = "http:%s:%s" % (addr.split(":")[1], services["bokeh"])
        print("Diagnostic pages available on port %s" % bokeh_addr)
    if "dashboard" in services.keys():
        db_addr = "http:%s:%s" % (addr.split(":")[1], services["dashboard"])
        print("Diagnostic pages available on port %s" % db_addr)
    return c
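
# Illustrative usage sketch (not part of the module): connect to an existing
# scheduler through the environment variable the function reads, then hand
# the client to rsexecute. The scheduler address is a made-up example.
#
#     import os
#     os.environ["RASCIL_DASK_SCHEDULER"] = "tcp://127.0.0.1:8786"
#     client = get_dask_client(timeout=60)
#     rsexecute.set_client(client=client, use_dask=True)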

class _rsexecutebase:
    """Initialise rsexecute framework

    A singleton of this class is created and is available globally as
    rsexecute. Hence it is not necessary to declare an instance of
    _rsexecutebase. For example::

        from rascil.workflows import continuum_imaging_list_rsexecute_workflow, rsexecute

        rsexecute.set_client(use_dask=True, memory_limit=32 * 1024 * 1024 * 1024,
                             n_workers=8, local_dir=dask_dir, verbose=True)
        continuum_imaging_list = continuum_imaging_list_rsexecute_workflow(
            vis_list, model_imagelist=model_list, context='wstack',
            vis_slices=51, scales=[0, 3, 10], algorithm='mmclean', nmoment=3,
            niter=1000, fractional_threshold=0.1, threshold=0.1, nmajor=5,
            gain=0.25, psf_support=64)
        deconvolved_list, residual_list, restored_list = rsexecute.compute(
            continuum_imaging_list, sync=True)

    :param use_dask: Use dask (True)
    :param use_dlg: Use daliuge (False)
    :param verbose: Be verbose in printing messages (False)
    :param optimize: Optimize if using dask (True)
    """

    _instance = None

    def __init__(self, use_dask=True, use_dlg=False, verbose=False, optimize=True):
        """Initialise rsexecute framework

        A singleton of this class is created and is available globally as rsexecute

        :param use_dask: Use dask (True)
        :param use_dlg: Use daliuge (False)
        :param verbose: Be verbose in printing messages (False)
        :param optimize: Optimize if using dask (True)
        """
        if bool(use_dask) and bool(use_dlg):
            raise ValueError("use_dask and use_dlg cannot be specified together")
        self._set_state(use_dask, use_dlg, None, verbose, optimize)

        # Initialize astropy to avoid threading problems
        if use_dask:
            import astropy.units as u
            import erfa  # direct dependency of astropy

            assert erfa.s2c(0 * u.rad, 0 * u.rad)[0] == 1.0

            import astropy.constants

            assert astropy.constants.c.unit == "m/s"

            from astropy.coordinates import SkyCoord

            assert isinstance(
                SkyCoord(0.0 * u.rad, 0.0 * u.rad, frame="icrs").to_string(), str
            )
            assert (
                SkyCoord(0.0 * u.rad, 0.0 * u.rad, frame="icrs").skyoffset_frame().name
                == "skyoffseticrs"
            )

    def _set_state(self, use_dask, use_dlg, client, verbose, optimize):
        self._using_dask = use_dask
        self._using_dlg = use_dlg
        self._client = client
        self._verbose = verbose
        self._optimize = optimize

    def execute(self, func, *args, **kwargs):
        """Wrap for immediate or deferred execution

        Passes through if dask is not being used

        :param args:
        :param kwargs:
        :return: delayed func or func
        """
        if self._using_dask:
            return delayed(func, *args, **kwargs)
        elif self._using_dlg:
            return dlg_delayed(func, *args, **kwargs)
        else:
            return func
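
    # Illustrative sketch (hypothetical square function): execute wraps a
    # function so that calling the wrapper builds a graph node rather than
    # running immediately; compute then evaluates the whole graph.
    #
    #     def square(x):
    #         return x * x
    #
    #     graph = [rsexecute.execute(square)(x) for x in range(10)]
    #     results = rsexecute.compute(graph, sync=True)  # [0, 1, 4, ..., 81]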

    def type(self):
        """Get the name of the execution system

        :return:
        """
        if self._using_dask:
            return "dask"
        elif self._using_dlg:
            return "daliuge"
        else:
            return "function"

    def set_client(
        self,
        client=None,
        use_dask=True,
        use_dlg=False,
        verbose=False,
        optim=True,
        **kwargs
    ):
        """Set the Dask/DALiuGE client to be used

        If you want to customise the Client or use an externally defined
        Scheduler use get_dask_client and pass it in.

        :param use_dask: Use Dask?
        :param client: If None and use_dask is True, a client will be created,
            otherwise the client is None
        :param use_dlg: Use DALiuGE to execute graphs?
        :param verbose: Be verbose in output
        :param optim: Use dask.optimize via the rsexecute.optimize function
        :return:
        """
        # We need this so that xarray knows which scheduler to use
        if use_dask:
            config.set(scheduler="distributed")

        if bool(use_dask) and bool(use_dlg):
            raise ValueError("use_dask and use_dlg cannot be specified together")

        if isinstance(self._client, Client):
            if self._verbose:
                print("Removing existing client")
            self.client.close()

        if use_dask:
            client = client or get_dask_client(**kwargs)
            self._set_state(True, False, client, verbose, optim)
            self._client.profile()
            self._client.get_task_stream()
            self.start_time = time.time()
        elif use_dlg:
            self._set_state(False, True, client, verbose, optim)
        else:
            self._set_state(False, False, None, verbose, optim)

        if self._verbose:
            print("rsexecute.set_client: defined Dask distributed client")

    def compute(self, value, sync=False):
        """Get the actual value

        If not using dask then this returns the value directly since it is
        already computed. If using dask and sync=True then this waits and
        returns the actual result. If using dask and sync=False then this
        returns a future, on which you will need to call .result()

        :param value:
        :param sync: Return synchronously? (False)
        :return:
        """
        if self._using_dask:
            start = time.time()
            if self.client is None:
                return value.compute()
            else:
                import dask

                try:
                    scheduler = dask.config.get("scheduler")
                    assert (
                        scheduler == "distributed" or scheduler == "dask.distributed"
                    ), scheduler
                except Exception:
                    pass
                future = self.client.compute(value, sync=sync)
                wait(future)
                if self._verbose:
                    duration = time.time() - start
                    log.debug(
                        "rsexecute.compute: Execution using Dask took %.3f seconds"
                        % duration
                    )
                    print(
                        "rsexecute.compute: Execution using Dask took %.3f seconds"
                        % duration
                    )
                return future
        elif self._using_dlg:
            kwargs = {"client": self._client} if self._client else {}
            return dlg_compute(value, **kwargs)
        else:
            return value
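
    # Illustrative sketch (hypothetical add function, assuming a distributed
    # client has been set via set_client): with sync=False a future comes back
    # and .result() fetches the value; with sync=True the value is returned
    # directly.
    #
    #     def add(a, b):
    #         return a + b
    #
    #     node = rsexecute.execute(add)(1, 2)
    #     future = rsexecute.compute(node)                # sync=False
    #     assert future.result() == 3
    #     assert rsexecute.compute(node, sync=True) == 3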

    def persist(self, graph, **kwargs):
        """Persist graph data on workers

        The graphs are placed on the workers but not computed

        No-op if not using_dask

        :param graph:
        :return:
        """
        if self.using_dask and self.client is not None:
            return self.client.persist(graph, **kwargs)
        else:
            return graph

    def scatter(self, graph, **kwargs):
        """Scatter graph data to workers

        The data are placed on the workers

        No-op if not using_dask

        :param graph:
        :return:
        """
        if self.using_dask and self.client is not None:
            return self.client.scatter(graph, **kwargs)
        else:
            return graph

    def gather(self, graph):
        """Gather graph from workers

        The data are gathered from the workers

        No-op if not using_dask

        :param graph:
        :return:
        """
        if self.using_dask and self.client is not None:
            return self.client.gather(graph)
        else:
            return graph
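
    # Illustrative sketch of the scatter/gather round trip: a large input is
    # placed on the workers once, referenced from the graph, and the result
    # gathered back. The numpy array is a made-up example.
    #
    #     import numpy
    #     data = numpy.arange(1000)
    #     scattered = rsexecute.scatter(data)             # future on a worker
    #     node = rsexecute.execute(numpy.sum)(scattered)
    #     future = rsexecute.compute(node)
    #     print(rsexecute.gather(future))                 # 499500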

    def run(self, func, *args, **kwargs):
        """Run a function on the client

        :param func:
        :return:
        """
        if self.using_dask:
            if self.client is not None:
                return self.client.run(func, *args, **kwargs)
            else:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)

    def optimize(self, *args, **kwargs):
        """Run Dask optimisation of graphs

        Only does something when using dask

        :param args: for dask.optimize
        :param kwargs: for dask.optimize
        :return:
        """
        if self.using_dask and self._optimize:
            return optimize(*args, **kwargs)[0]
        else:
            return args[0]
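
    # Illustrative sketch: optimize is applied to a graph before compute so
    # that dask can e.g. fuse tasks; it is a pass-through when optimisation
    # is disabled or dask is not in use.
    #
    #     graph = [rsexecute.execute(sum)([x, x]) for x in range(4)]
    #     graph = rsexecute.optimize(graph)
    #     results = rsexecute.compute(graph, sync=True)   # [0, 2, 4, 6]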

    def close(self):
        """Close the client"""
        if self._using_dask and isinstance(self._client, Client):
            if self._verbose:
                print("rsexecute.close: closed down Dask Client")
            if self._client.cluster is not None:
                self._client.cluster.close()
            self._client.close()
            self._client = None

    def init_statistics(self):
        """Initialise the profile and task stream info

        rsexecute can save the Dask profile and Task Stream information
        for later saving

        :return:
        """
        self.start_time = time.time()
        if self._using_dask:
            self._client.profile()
            self._client.get_task_stream()

    def save_statistics(self, name="dask"):
        """Save the statistics to html files

        rsexecute records the Dask profile and Task Stream information;
        this saves the current statistics to html files.

        :param name: prefix to name e.g. dask
        """
        if self._using_dask:
            task_stream, graph = self.client.get_task_stream(
                plot="save", filename="%s_task_stream.html" % name
            )
            self.client.profile(plot="save", filename="%s_profile.html" % name)

            def print_ts(ts):
                log.info("Processor time used in each function")
                summary = {}
                number = {}
                dask_info = dict()
                for t in ts:
                    name = t["key"].split("-")[0]
                    elapsed = t["startstops"][0]["stop"] - t["startstops"][0]["start"]
                    if name not in summary.keys():
                        summary[name] = elapsed
                        number[name] = 1
                    else:
                        summary[name] += elapsed
                        number[name] += 1
                total = 0.0
                for key in summary.keys():
                    total += summary[key]
                table = []
                headers = ["Function", "Time (s)", "Percent", "Number calls"]
                for key in summary.keys():
                    percent = 100.0 * summary[key] / total
                    table.append(
                        [
                            key,
                            "{0:.3f}".format(summary[key]),
                            "{0:.3f}".format(percent),
                            number[key],
                        ]
                    )
                    dask_info[key] = {
                        "time": summary[key],
                        "fraction": percent,
                        "number_calls": number[key],
                    }
                log.info("\n" + tabulate(table, headers=headers))
                duration = time.time() - self.start_time
                speedup = total / duration
                log.info(
                    "Total processor time {0:.3f} (s), total wallclock time {1:.3f} (s), speedup {2:.2f}".format(
                        total, duration, speedup
                    )
                )
                dask_info["summary"] = {
                    "total": total,
                    "duration": duration,
                    "speedup": speedup,
                }
                return dask_info

            try:
                return print_ts(task_stream)
            except (ValueError, KeyError):
                log.warning("Dask task stream is unintelligible")
                return dict()
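
    # Illustrative sketch: bracket a run with init_statistics/save_statistics
    # to write <name>_profile.html and <name>_task_stream.html for the job.
    #
    #     rsexecute.init_statistics()
    #     results = rsexecute.compute(graph, sync=True)
    #     dask_info = rsexecute.save_statistics(name="dask")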

    def memusage(self, memusage_file="memusage.csv"):
        """Install the dask-memusage plugin

        https://github.com/itamarst/dask-memusage/blob/master/dask_memusage.py

        Note that there can only be one dask thread per process.

        This only works for the process scheduler. For the distributed
        scheduler, preload the plugin. For example::

            dask-scheduler --port=8786 --preload dask_memusage --memusage-csv ./memusage.csv

        :param memusage_file: Name of mem-usage file produced by dask-memusage plugin
        :return:
        """
        if (
            self._client.cluster is not None
            and self._client.cluster.scheduler is not None
        ):
            dask_memusage.install(self._client.cluster.scheduler, memusage_file)

    @property
    def client(self):
        """Client being used

        :return: client
        """
        return self._client

    @property
    def using_dask(self):
        """Is dask being used?

        :return:
        """
        return self._using_dask

    @property
    def using_dlg(self):
        """Is daliuge being used?

        :return:
        """
        return self._using_dlg

    @property
    def optimizing(self):
        """Is Dask optimisation being performed?

        :return:
        """
        return self._optimize


def rsexecutebase(*args, **kwargs):
    if _rsexecutebase._instance is None:
        _rsexecutebase._instance = _rsexecutebase(*args, **kwargs)
    return _rsexecutebase._instance


# Any new rsexecute created by import of this file points to the only _rsexecutebase
rsexecute = rsexecutebase(use_dask=True)
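
# Illustrative end-to-end sketch using the module-level singleton; the worker
# count and the squaring workload are made-up examples.
#
#     from rascil.workflows import rsexecute
#
#     rsexecute.set_client(use_dask=True, n_workers=4, threads_per_worker=1)
#     graph = [rsexecute.execute(pow)(x, 2) for x in range(8)]
#     results = rsexecute.compute(graph, sync=True)
#     rsexecute.close()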