# Source code for ska_sdp_scripting.phase

"""Phase class module for SDP scripting."""

# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments

import logging
from copy import deepcopy
from typing import Any, Callable, List, Optional, Tuple

from ska_sdp_config import Config, ConfigVanished
from ska_sdp_config.config import Transaction

from .dask_deploy import DaskDeploy
from .ee_base_deploy import EEDeploy
from .fake_deploy import FakeDeploy
from .helm_deploy import HelmDeploy
from .queue_connector import (
    configure_queue_connector,
    tear_down_queue_connector,
    wait_for_queue_connector_state,
)
from .utils import (
    DeploymentStatus,
    ExecutionBlockStatus,
    ProcessingBlockStatus,
)

# Logger used by this module; it is registered under the package-level name
# "ska_sdp_scripting" rather than under this module's own name.
LOG = logging.getLogger("ska_sdp_scripting")
# Timeout, in seconds, passed to wait_for_queue_connector_state() when
# Phase.__enter__ waits for the queue connector to reach the "ON" state.
TIMEOUT_SEC = 60.0


class ScriptPhaseException(Exception):
    """
    Error signalling that a script phase cannot carry on.

    :meth:`Phase.check_state` raises it when the processing block has been
    deleted, is finished or cancelled, or is no longer owned by the script,
    and (for real-time scripts) when the execution block has been deleted,
    is finished or is cancelled.
    """


class Phase:
    """
    Script phase.

    This should not be created directly, use the
    :func:`ProcessingBlock.create_phase()` method instead.

    The phase is a context manager: entering it waits for resources to be
    available and marks the processing block as RUNNING, leaving it tears
    down the queue connector, removes finished deployments (real-time
    scripts only) and updates the processing block status.

    :param name: name of the phase
    :type name: str
    :param list_requests: list of requests
    :type list_requests: list
    :param config: SDP configuration client
    :type config: ska_sdp_config.Config
    :param pb_id: processing block ID
    :type pb_id: str
    :param eb_id: execution block ID
    :type eb_id: str
    :param script_kind: script kind
    :type script_kind: str
    :param qc_config: Queue Connector config string
    :type qc_config: str
    """

    def __init__(
        self,
        name: str,
        list_requests: List,
        config: Config,
        pb_id: str,
        eb_id: str,
        script_kind: str,
        qc_config: str,
    ):
        self._name = name
        self._requests = list_requests
        self._config = config
        self._pb_id = pb_id
        self._eb_id = eb_id
        self._script_kind = script_kind
        # IDs of the Helm deployments created through this phase
        self._deploy_id_list = []
        # Most recent deployment object created by ee_deploy_test/_helm
        self._deploy = None
        self._status = None
        self._deployment_status = None
        self._qc_config = qc_config
        # Set by __enter__ once the queue connector has been configured
        self._qc_config_path = None

    def __enter__(self) -> "Phase":
        """
        Wait for resources to be available.

        While waiting, it checks if the PB is cancelled or finished, and for
        real-time scripts it checks if the EB is cancelled or finished.

        :return: the phase itself, so that ``with phase as p`` binds it
        :raises ScriptPhaseException: if :meth:`check_state` finds that the
            phase cannot continue
        """
        # Set state to indicate script is waiting for resources
        for attempt, txn in enumerate(self._config.txn()):
            self.check_state(txn)
            LOG.info("Setting status to WAITING (attempt %s)", attempt + 1)
            state = txn.get_processing_block_state(self._pb_id)
            state["status"] = ProcessingBlockStatus.WAITING.value
            txn.update_processing_block_state(self._pb_id, state)

            # Make sure queue connector is on
            if self._qc_config:
                LOG.info("Setting queue connector configuration")
                self._qc_config_path = configure_queue_connector(
                    txn, self._qc_config, self._pb_id, self._eb_id
                )

        LOG.info("Waiting for resources to be available")
        resources_available = False
        for watcher in self._config.watcher():
            for attempt, txn in enumerate(watcher.txn()):
                self.check_state(txn)
                state = txn.get_processing_block_state(self._pb_id)
                resources_available = state.get("resources_available")
                if resources_available:
                    LOG.info(
                        "Resources are available (attempt %s)", attempt + 1
                    )
                    self._update_pb_state_upon_start(txn, state, attempt + 1)

            # Finish watcher loop once we have resources available
            if resources_available:
                break

        if self._qc_config:
            LOG.info("Checking queue connector state")
            wait_for_queue_connector_state(
                self._config, self._qc_config_path, "ON", timeout=TIMEOUT_SEC
            )

        return self

    def _update_pb_state_upon_start(
        self, txn: Transaction, state: dict, attempt: int
    ) -> None:
        """
        Mark the processing block as RUNNING.

        :param txn: SDP configuration transaction
        :param state: current processing block state; modified in place
        :param attempt: transaction attempt number, used for logging only
        """
        # Add deployments key to processing block state
        state.setdefault("deployments", {})

        # Set state to indicate processing has started
        LOG.info("Setting status to RUNNING (attempt %s)", attempt)
        state["status"] = ProcessingBlockStatus.RUNNING.value
        txn.update_processing_block_state(self._pb_id, state)

    def check_state(
        self, txn: Transaction, check_realtime: bool = True
    ) -> None:
        """
        Check the state of the processing block.

        Check if the PB is finished or cancelled, and for real-time scripts
        check if the EB is finished or cancelled.

        :param txn: SDP configuration transaction
        :type txn: ska_sdp_config.Transaction
        :param check_realtime: Whether to check execution block state if the
            processing block is realtime (i.e. cancel processing script for
            FINISHED/CANCELLED)
        :type check_realtime: bool
        :raises ScriptPhaseException: if the PB was deleted, is finished or
            cancelled, or is no longer owned by this script; or, when the EB
            is checked, if the EB was deleted, is finished or cancelled
        """
        LOG.info("Checking processing block state")
        pb_state = txn.get_processing_block_state(self._pb_id)
        if pb_state is None:
            raise ScriptPhaseException("Processing block was deleted")

        pb_status = pb_state.get("status")
        if pb_status in (
            ProcessingBlockStatus.FINISHED.value,
            ProcessingBlockStatus.CANCELLED.value,
        ):
            # Report the status only, mirroring the execution block message
            raise ScriptPhaseException(f"Processing block is {pb_status}")

        if not txn.is_processing_block_owner(self._pb_id):
            raise ScriptPhaseException(
                "Lost ownership of the processing block"
            )

        if self._script_kind == "realtime" and check_realtime:
            LOG.info("Checking execution block state")
            eb = txn.get_execution_block(self._eb_id)
            if eb is None:
                raise ScriptPhaseException("Execution block was deleted")

            eb_status = eb.get("status")
            if eb_status in (
                ExecutionBlockStatus.FINISHED.value,
                ExecutionBlockStatus.CANCELLED.value,
            ):
                raise ScriptPhaseException(f"Execution block is {eb_status}")

    def ee_deploy_test(
        self,
        deploy_name: str,
        func: Optional[Callable] = None,
        f_args: Optional[List[Any]] = None,
    ) -> EEDeploy:
        """
        Deploy a fake execution engine.

        This is used for testing and example purposes.

        :param deploy_name: deployment name
        :type deploy_name: str
        :param func: function to execute
        :type func: function, optional
        :param f_args: function arguments
        :type f_args: list, optional
        :return: fake execution engine deployment
        :rtype: :class:`FakeDeploy`
        """
        self._deploy = FakeDeploy(
            self._pb_id,
            self._config,
            deploy_name,
            func=func,
            f_args=f_args,
        )
        return self._deploy

    def ee_deploy_helm(
        self, deploy_name: str, values: Optional[dict] = None
    ) -> EEDeploy:
        """
        Deploy a Helm execution engine.

        This can be used to deploy any Helm chart. The deployment ID is
        recorded so that the phase can monitor and remove the deployment.

        :param deploy_name: name of Helm chart
        :type deploy_name: str
        :param values: values to pass to Helm chart
        :type values: dict, optional
        :return: Helm execution engine deployment
        :rtype: :class:`HelmDeploy`
        """
        self._deploy = HelmDeploy(
            self._pb_id, self._config, deploy_name, values
        )
        self._deploy_id_list.append(self._deploy.get_id())
        return self._deploy

    def ee_deploy_dask(
        self,
        name: str,
        n_workers: int,
        func: Callable,
        f_args: Tuple[Any, ...],
    ) -> EEDeploy:
        """
        Deploy a Dask execution engine.

        Unlike the other ``ee_deploy_*`` methods, the deployment is only
        returned; it is not recorded on the phase.

        :param name: deployment name
        :type name: str
        :param n_workers: number of Dask workers
        :type n_workers: int
        :param func: function to execute
        :type func: function
        :param f_args: function arguments
        :type f_args: tuple
        :return: Dask execution engine deployment
        :rtype: :class:`DaskDeploy`
        """
        return DaskDeploy(
            self._pb_id, self._config, name, n_workers, func, f_args
        )

    def ee_remove(self) -> None:
        """
        Remove execution engines deployments.

        Only deployments that the processing block state reports as
        FINISHED and that still exist in the configuration are removed.
        Deployments without a recorded status are left alone.
        """
        for txn in self._config.txn():
            # Tolerate a missing PB state or a missing "deployments" key
            state = txn.get_processing_block_state(self._pb_id) or {}
            deployments = state.get("deployments") or {}
            for deploy_id in self._deploy_id_list:
                if (
                    deployments.get(deploy_id)
                    != DeploymentStatus.FINISHED.value
                ):
                    continue
                if deploy_id in txn.list_deployments():
                    self._deploy.remove(deploy_id)

    def is_eb_finished(self, txn: Transaction) -> bool:
        """
        Check if the EB is finished or cancelled.

        When it is, the status is stored on the phase and, if the phase has
        recorded deployments, forwarded to the current deployment object.

        :param txn: config db transaction
        :type txn: ska_sdp_config.Transaction
        :return: True if the EB is finished or cancelled, False otherwise
        :rtype: bool
        :raises ScriptPhaseException: if the execution block was deleted
        """
        eb = txn.get_execution_block(self._eb_id)
        if eb is None:
            # Same handling as check_state, instead of an AttributeError
            raise ScriptPhaseException("Execution block was deleted")

        status = eb.get("status")
        if status not in (
            ExecutionBlockStatus.FINISHED.value,
            ExecutionBlockStatus.CANCELLED.value,
        ):
            return False

        self._status = status
        if status == ExecutionBlockStatus.CANCELLED.value:
            LOG.error("EB is %s", status)
        if self._deploy_id_list:
            self._deploy.update_deploy_status(status)
        return True

    def update_pb_state(
        self,
        status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET,
    ) -> None:
        """
        Update processing block state.

        If the status is UNSET, it is marked as finished. If the processing
        block state no longer exists, nothing is written.

        :param status: status
        :type status: ProcessingBlockStatus, optional
        """
        self._status = status.value

        for txn in self._config.txn():
            # Set state to indicate processing has ended
            state = txn.get_processing_block_state(self._pb_id)
            if state is None:
                LOG.warning(
                    "Processing block %s has no state, not updating status",
                    self._pb_id,
                )
                continue

            # Compare by value: identity of two equal strings is not
            # guaranteed
            if self._status == ProcessingBlockStatus.UNSET.value:
                state["status"] = ProcessingBlockStatus.FINISHED.value
            else:
                LOG.info("Setting PB status to %s", self._status)
                state["status"] = self._status
            txn.update_processing_block_state(self._pb_id, state)

    def monitor_deployments(
        self, txn: Transaction, iteration: int = 0
    ) -> None:
        """
        Monitor deployments, update the deployment status in the processing
        block state based on the deployments' pods' state.

        Also update the deployments_ready pb state key. At the moment
        deployments_ready = True only if all deployments are RUNNING.
        Else, it is False. If the phase has no recorded deployments,
        deployments_ready is set to True.

        The processing block state is only written back if it changed.

        :param txn: Transaction object (config.txn)
        :param iteration: number of txn iteration
        """
        LOG.info("Checking deployment status (attempt %s)", iteration)
        pb_state = txn.get_processing_block_state(self._pb_id)
        pb_state_original = deepcopy(pb_state)

        if not self._deploy_id_list:
            pb_state["deployments_ready"] = True
        else:
            # collect the states of deployments for conditional testing
            deployments = pb_state.get("deployments")
            if deployments is None:
                # Key not created yet; start from an empty mapping
                deployments = {}

            dpl_states = []
            for deploy_id in self._deploy_id_list:
                dpl_state = self._determine_deploy_state(txn, deploy_id)
                LOG.info(
                    "State of deployment %s is %s (attempt %s)",
                    deploy_id,
                    dpl_state,
                    iteration,
                )
                dpl_states.append(dpl_state)

                if dpl_state is not None:
                    deployments[deploy_id] = dpl_state
                    pb_state["deployments"] = deployments

            pb_state["deployments_ready"] = bool(dpl_states) and all(
                dpl_ste == DeploymentStatus.RUNNING.value
                for dpl_ste in dpl_states
            )

        if pb_state != pb_state_original:
            LOG.info("Updating processing block state (attempt %s)", iteration)
            txn.update_processing_block_state(self._pb_id, pb_state)

    @staticmethod
    def _determine_deploy_state(
        txn: Transaction, deploy_id: str
    ) -> Optional[str]:
        """
        Work out a deployment's state from the status of its pods.

        :param txn: SDP configuration transaction
        :param deploy_id: deployment ID
        :return: None if the deployment has no state; PENDING if no pods are
            listed yet or fewer than ``num_pod`` are listed; otherwise the
            first of PENDING, FAILED, CANCELLED, RUNNING found among the pod
            statuses, or FINISHED if none of them is present
        """
        current_dpl_state = txn.get_deployment_state(deploy_id)
        if current_dpl_state is None:
            return None

        num_pod = current_dpl_state.get("num_pod", 0)
        dpl_state_pods = current_dpl_state.get("pods", None)

        # Not all pods have been added to the "pods" key yet, so we are
        # still waiting for them to appear
        if dpl_state_pods is None or len(dpl_state_pods) < num_pod:
            return DeploymentStatus.PENDING.value

        pod_status = set(dpl_state_pods.values())
        # Order matters: the first status found wins
        for candidate in (
            DeploymentStatus.PENDING,
            DeploymentStatus.FAILED,
            DeploymentStatus.CANCELLED,
            DeploymentStatus.RUNNING,
        ):
            if candidate.value in pod_status:
                return candidate.value
        return DeploymentStatus.FINISHED.value

    def wait_loop(
        self, func: Callable[[Transaction], bool], time_to_ready: int = 0
    ) -> None:
        """
        Wait loop to check the status of the processing block.

        It also updates the processing block state with deployment statuses
        for realtime scripts.

        :param func: function to check condition for exiting the watcher loop
        :param time_to_ready: set deployments_ready to true after this amount
            of time has passed (seconds). Only for deployments deployed with
            phase.ee_deploy_test
        :raises ScriptPhaseException: if :meth:`check_state` finds that the
            phase cannot continue
        """
        condition_met = False
        for watcher in self._config.watcher():
            for attempt, txn in enumerate(watcher.txn()):
                # The EB state is left to ``func`` to inspect
                self.check_state(txn, check_realtime=False)

                if self._script_kind == "realtime":
                    self.monitor_deployments(txn, attempt)

                # NOTE(review): applied for any script kind; confirm this is
                # not meant to be limited to real-time scripts
                if isinstance(self._deploy, FakeDeploy):
                    self._deploy.set_deployments_ready(
                        time_to_ready=time_to_ready
                    )

                condition_met = func(txn)

            if condition_met:
                break

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> None:
        """
        For real-time scripts, this removes the finished deployments.
        For both kinds of script, it updates the processing block state.

        Exceptions raised inside the ``with`` block are not suppressed.

        :param exc_type: type of the exception raised in the block, if any
        :param exc_val: exception raised in the block, if any
        :param exc_tb: traceback of that exception, if any
        """
        # Tear down queue connector first
        if self._qc_config:
            try:
                tear_down_queue_connector(self._config, self._qc_config_path)
            except ConfigVanished:
                # The config does not exist, so there is nothing to remove
                LOG.debug("Queue connector configuration no longer exists")

        if self._script_kind == "realtime":
            # Clean up deployment.
            LOG.info("Clean up deployments")
            if self._deploy_id_list:
                self.ee_remove()

        self.update_pb_state()
        LOG.info("Deployments All Done")