"""Dask execution engine deployment module."""
# pylint: disable=too-many-arguments
# pylint: disable=broad-except
import logging
import sys
import threading
from typing import Any, Callable, Tuple
import distributed
import ska_sdp_config
from ska_sdp_config import Config
from .ee_base_deploy import EEDeploy
# Module logger; uses the fixed logger name "ska_sdp_scripting" rather than
# this module's own __name__.
LOG = logging.getLogger("ska_sdp_scripting")


class DaskDeploy(EEDeploy):
    """
    Deploy a Dask execution engine.

    The function when called with the arguments should return a Dask graph.
    The graph is then executed by calling the compute method:

    .. code-block:: python

        result = func(*f_args)
        result.compute()

    This happens in a separate (daemon) thread so the constructor can return
    immediately.

    This should not be created directly, use the :func:`Phase.ee_deploy_dask`
    method instead.

    :param pb_id: processing block ID
    :type pb_id: str
    :param config: configuration DB client
    :type config: ska_sdp_config.Config
    :param deploy_name: deployment name
    :type deploy_name: str
    :param n_workers: number of Dask workers
    :type n_workers: int
    :param func: function to execute
    :type func: function
    :param f_args: function arguments
    :type f_args: tuple
    """

    def __init__(
        self,
        pb_id: str,
        config: Config,
        deploy_name: str,
        n_workers: int,
        func: Callable,
        f_args: Tuple[Any, ...],
    ):
        super().__init__(pb_id, config)
        # Run the deployment in the background so construction does not
        # block; the thread is a daemon, so it does not keep the process
        # alive on its own.
        thread = threading.Thread(
            target=self._deploy,
            args=(
                deploy_name,
                n_workers,
                func,
                f_args,
            ),
            daemon=True,
        )
        thread.start()

    def _deploy(
        self,
        deploy_name: str,
        n_workers: int,
        func: Callable,
        f_args: Tuple[Any, ...],
    ):
        """
        Make the deployment and execute the function.

        This is called from the thread started in the constructor. It
        creates the "dask" helm deployment in the configuration DB, connects
        a Dask client to the scheduler, calls ``func(*f_args)`` and computes
        the result, then marks the deployment as FINISHED.

        :param deploy_name: deployment name
        :param n_workers: number of dask workers
        :param func: function to process
        :param f_args: function arguments
        """
        LOG.info("Deploying Dask...")
        self._deploy_id = f"proc-{self._pb_id}-{deploy_name}"
        LOG.info(self._deploy_id)

        # Set Deployment to RUNNING status in the config_db
        self.update_deploy_status("RUNNING")

        # Hack for mismatch between formats of dask/distributed package
        # version: get the image from the config db through the processing
        # block's script kind, name and version.
        image = None
        for txn in self._config.txn():
            pb = txn.get_processing_block(self._pb_id)
            image = txn.get_script(
                pb.script["kind"], pb.script["name"], pb.script["version"]
            )

        # Helm values: worker replica count, plus whatever the script entry
        # provides (when one was found).
        values = {"worker": {"replicas": n_workers}}
        if image is not None:
            values.update(image)

        deploy = ska_sdp_config.Deployment(
            self._deploy_id,
            "helm",
            {"chart": "dask", "values": values},
        )
        for txn in self._config.txn():
            txn.create_deployment(deploy)

        LOG.info("Waiting for Dask...")
        scheduler_address = f"{self._deploy_id}-scheduler:8786"
        client = None
        for _ in range(200):
            try:
                client = distributed.Client(scheduler_address)
            except Exception as ex:
                # The scheduler may not be up yet; log and try again.
                LOG.error(ex)
            else:
                # Connected: stop retrying so that only a single client is
                # created instead of one per remaining iteration.
                break

        if client is None:
            LOG.error("Could not connect to Dask!")
            # NOTE: this runs in a non-main thread, where SystemExit ends
            # only this thread rather than the whole process.
            sys.exit(1)
        LOG.info("Connected to Dask")

        # Computing result
        result = func(*f_args)
        compute_result = result.compute()
        LOG.info("Computed Result %s", compute_result)

        # Update Deployment Status
        self.update_deploy_status("FINISHED")