"""Phase class module for SDP scripting."""
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments
import logging
from copy import deepcopy
from typing import Any, Callable, List, Optional, Tuple
from ska_sdp_config import Config, ConfigVanished
from ska_sdp_config.config import Transaction
from .dask_deploy import DaskDeploy
from .ee_base_deploy import EEDeploy
from .fake_deploy import FakeDeploy
from .helm_deploy import HelmDeploy
from .queue_connector import (
configure_queue_connector,
tear_down_queue_connector,
wait_for_queue_connector_state,
)
from .utils import (
DeploymentStatus,
ExecutionBlockStatus,
ProcessingBlockStatus,
)
# Package-level logger, named "ska_sdp_scripting".
LOG: logging.Logger = logging.getLogger("ska_sdp_scripting")

# Timeout in seconds passed to wait_for_queue_connector_state when
# waiting for the queue connector to report the requested state.
TIMEOUT_SEC: float = 60.0
class ScriptPhaseException(Exception):
    """
    Error signalling that a script phase cannot continue.

    Raised, for example, when the processing block or execution block has
    been deleted, finished or cancelled, or when ownership of the
    processing block has been lost.
    """
class Phase:
    """
    Script phase.

    This should not be created directly, use the
    :func:`ProcessingBlock.create_phase()` method instead.

    :param name: name of the phase
    :type name: str
    :param list_requests: list of requests
    :type list_requests: list
    :param config: SDP configuration client
    :type config: ska_sdp_config.Config
    :param pb_id: processing block ID
    :type pb_id: str
    :param eb_id: execution block ID
    :type eb_id: str
    :param script_kind: script kind
    :type script_kind: str
    :param qc_config: Queue Connector config string
    :type qc_config: str
    """

    def __init__(
        self,
        name: str,
        list_requests: List,
        config: Config,
        pb_id: str,
        eb_id: str,
        script_kind: str,
        qc_config: str,
    ):
        self._name = name
        self._requests = list_requests
        self._config = config
        self._pb_id = pb_id
        self._eb_id = eb_id
        self._script_kind = script_kind
        # IDs of deployments created by ee_deploy_helm; these are the ones
        # monitored by monitor_deployments and removed by ee_remove.
        self._deploy_id_list: List[str] = []
        # Most recently created execution engine deployment, if any.
        self._deploy: Optional[EEDeploy] = None
        self._status: Optional[str] = None
        self._deployment_status = None
        self._qc_config = qc_config
        # Set in __enter__ by configure_queue_connector, used in __exit__.
        self._qc_config_path = None

    def __enter__(self) -> "Phase":
        """
        Wait for resources to be available.

        The PB status is first set to WAITING (and the queue connector is
        configured, if a configuration was given). The PB state is then
        watched until ``resources_available`` is set, at which point the
        PB status is set to RUNNING. While waiting, it checks if the PB is
        cancelled or finished, and for real-time scripts it checks if the
        EB is cancelled or finished.

        :return: this phase, so it can be used as ``with ... as phase:``
        :raises ScriptPhaseException: if the PB (or, for real-time
            scripts, the EB) is deleted, finished or cancelled, or if
            ownership of the PB is lost
        """
        # Set state to indicate script is waiting for resources
        for attempt, txn in enumerate(self._config.txn()):
            self.check_state(txn)
            LOG.info("Setting status to WAITING (attempt %s)", attempt + 1)
            state = txn.get_processing_block_state(self._pb_id)
            state["status"] = ProcessingBlockStatus.WAITING.value
            txn.update_processing_block_state(self._pb_id, state)

            # Make sure queue connector is on; uses the same transaction
            # as the status update above.
            if self._qc_config:
                LOG.info("Setting queue connector configuration")
                self._qc_config_path = configure_queue_connector(
                    txn, self._qc_config, self._pb_id, self._eb_id
                )

        LOG.info("Waiting for resources to be available")
        resources_available = False
        for watcher in self._config.watcher():
            for attempt, txn in enumerate(watcher.txn()):
                self.check_state(txn)
                state = txn.get_processing_block_state(self._pb_id)
                resources_available = state.get("resources_available")
                if resources_available:
                    LOG.info(
                        "Resources are available (attempt %s)", attempt + 1
                    )
                    self._update_pb_state_upon_start(
                        txn, state, attempt + 1
                    )
            # Finish watcher loop once we have resources available
            if resources_available:
                break

        if self._qc_config:
            LOG.info("Checking queue connector state")
            wait_for_queue_connector_state(
                self._config, self._qc_config_path, "ON", timeout=TIMEOUT_SEC
            )

        return self

    def _update_pb_state_upon_start(
        self, txn: Transaction, state: dict, attempt: int
    ) -> None:
        """
        Mark the processing block as RUNNING.

        Also makes sure the state has a ``deployments`` key, which
        :meth:`monitor_deployments` fills in later.

        :param txn: SDP configuration transaction
        :param state: current processing block state (modified in place)
        :param attempt: attempt number, used for logging only
        """
        state.setdefault("deployments", {})
        # Set state to indicate processing has started
        LOG.info("Setting status to RUNNING (attempt %s)", attempt)
        state["status"] = ProcessingBlockStatus.RUNNING.value
        txn.update_processing_block_state(self._pb_id, state)

    def check_state(
        self, txn: Transaction, check_realtime: bool = True
    ) -> None:
        """
        Check the state of the processing block.

        Check if the PB is finished or cancelled, and for real-time scripts
        check if the EB is finished or cancelled.

        :param txn: SDP configuration transaction
        :type txn: ska_sdp_config.Transaction
        :param check_realtime: Whether to check execution block state
            if the processing block is realtime (i.e. cancel
            processing script for FINISHED/CANCELLED)
        :type check_realtime: bool
        :raises ScriptPhaseException: if the PB is deleted, finished or
            cancelled, if ownership of the PB was lost, or (when the EB is
            checked) if the EB is deleted, finished or cancelled
        """
        LOG.info("Checking processing block state")
        pb_state = txn.get_processing_block_state(self._pb_id)
        if pb_state is None:
            raise ScriptPhaseException("Processing block was deleted")

        pb_status = pb_state.get("status")
        if pb_status in (
            ProcessingBlockStatus.FINISHED.value,
            ProcessingBlockStatus.CANCELLED.value,
        ):
            # Report the status, not the whole state dictionary.
            raise ScriptPhaseException(f"Processing block is {pb_status}")

        if not txn.is_processing_block_owner(self._pb_id):
            raise ScriptPhaseException(
                "Lost ownership of the processing block"
            )

        if self._script_kind == "realtime" and check_realtime:
            LOG.info("Checking execution block state")
            eb_status = self._get_execution_block(txn).get("status")
            if eb_status in (
                ExecutionBlockStatus.FINISHED.value,
                ExecutionBlockStatus.CANCELLED.value,
            ):
                raise ScriptPhaseException(f"Execution block is {eb_status}")

    def _get_execution_block(self, txn: Transaction) -> Any:
        """
        Return the execution block of this phase.

        :param txn: SDP configuration transaction
        :raises ScriptPhaseException: if the execution block was deleted
        """
        eb = txn.get_execution_block(self._eb_id)
        if eb is None:
            raise ScriptPhaseException("Execution block was deleted")
        return eb

    def ee_deploy_test(
        self,
        deploy_name: str,
        func: Optional[Callable] = None,
        f_args: Optional[List[Any]] = None,
    ) -> EEDeploy:
        """
        Deploy a fake execution engine.

        This is used for testing and example purposes. The deployment is
        not added to the deployment ID list, so it is not monitored by
        :meth:`monitor_deployments` nor removed by :meth:`ee_remove`.

        :param deploy_name: deployment name
        :type deploy_name: str
        :param func: function to execute
        :type func: function, optional
        :param f_args: function arguments
        :type f_args: list, optional
        :return: fake execution engine deployment
        :rtype: :class:`FakeDeploy`
        """
        self._deploy = FakeDeploy(
            self._pb_id,
            self._config,
            deploy_name,
            func=func,
            f_args=f_args,
        )
        return self._deploy

    def ee_deploy_helm(
        self, deploy_name: str, values: Optional[dict] = None
    ) -> EEDeploy:
        """
        Deploy a Helm execution engine.

        This can be used to deploy any Helm chart. The deployment ID is
        recorded so the deployment is monitored and cleaned up by this
        phase.

        :param deploy_name: name of Helm chart
        :type deploy_name: str
        :param values: values to pass to Helm chart
        :type values: dict, optional
        :return: Helm execution engine deployment
        :rtype: :class:`HelmDeploy`
        """
        self._deploy = HelmDeploy(
            self._pb_id, self._config, deploy_name, values
        )
        self._deploy_id_list.append(self._deploy.get_id())
        return self._deploy

    def ee_deploy_dask(
        self,
        name: str,
        n_workers: int,
        func: Callable,
        f_args: Tuple[Any, ...],
    ) -> EEDeploy:
        """
        Deploy a Dask execution engine.

        :param name: deployment name
        :type name: str
        :param n_workers: number of Dask workers
        :type n_workers: int
        :param func: function to execute
        :type func: function
        :param f_args: function arguments
        :type f_args: tuple
        :return: Dask execution engine deployment
        :rtype: :class:`DaskDeploy`
        """
        # NOTE(review): unlike ee_deploy_helm, the Dask deployment is neither
        # stored in self._deploy nor added to self._deploy_id_list, so
        # monitor_deployments() and ee_remove() do not track it. Confirm
        # this is intended.
        return DaskDeploy(
            self._pb_id, self._config, name, n_workers, func, f_args
        )

    def ee_remove(self) -> None:
        """
        Remove execution engine deployments.

        A deployment recorded in the deployment ID list is removed only if
        the processing block state records it as FINISHED and it is still
        listed in the configuration database. Deployments without a
        recorded status (or a missing PB state) are skipped.
        """
        for txn in self._config.txn():
            state = txn.get_processing_block_state(self._pb_id) or {}
            deployments = state.get("deployments") or {}
            # The listing does not depend on the deployment being checked,
            # so fetch it once per transaction.
            existing = txn.list_deployments()
            for deploy_id in self._deploy_id_list:
                finished = (
                    deployments.get(deploy_id)
                    == DeploymentStatus.FINISHED.value
                )
                if finished and deploy_id in existing:
                    self._deploy.remove(deploy_id)

    def is_eb_finished(self, txn: Transaction) -> bool:
        """
        Check if the EB is finished or cancelled.

        If it is, the EB status is recorded on this phase and, when Helm
        deployments exist, passed to the current deployment's
        ``update_deploy_status``.

        :param txn: config db transaction
        :type txn: ska_sdp_config.Transaction
        :return: True if the EB is FINISHED or CANCELLED
        :rtype: bool
        :raises ScriptPhaseException: if the execution block was deleted
        """
        status = self._get_execution_block(txn).get("status")
        if status not in (
            ExecutionBlockStatus.FINISHED.value,
            ExecutionBlockStatus.CANCELLED.value,
        ):
            return False

        self._status = status
        if status == ExecutionBlockStatus.CANCELLED.value:
            LOG.error("EB is %s", status)
        if self._deploy_id_list:
            self._deploy.update_deploy_status(status)
        return True

    def update_pb_state(
        self,
        status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET,
    ) -> None:
        """
        Update processing block state.

        If the status is UNSET, the processing block is marked as FINISHED.
        If the processing block state no longer exists, nothing is written
        and a warning is logged.

        :param status: status to set
        :type status: ProcessingBlockStatus, optional
        """
        self._status = status.value
        # Compare values with ==: identity (``is``) of equal strings is not
        # guaranteed.
        if self._status == ProcessingBlockStatus.UNSET.value:
            new_status = ProcessingBlockStatus.FINISHED.value
        else:
            LOG.info("Setting PB status to %s", self._status)
            new_status = self._status

        for txn in self._config.txn():
            # Set state to indicate processing has ended
            state = txn.get_processing_block_state(self._pb_id)
            if state is None:
                LOG.warning(
                    "Processing block %s state not found; cannot set "
                    "status to %s",
                    self._pb_id,
                    new_status,
                )
            else:
                state["status"] = new_status
                txn.update_processing_block_state(self._pb_id, state)

    def monitor_deployments(
        self, txn: Transaction, iteration: int = 0
    ) -> None:
        """
        Monitor deployments, update the deployment status
        in the processing block state based on the deployments'
        pods' state.

        Also update the deployments_ready pb state key.
        At the moment deployments_ready = True only if all
        deployments are RUNNING (or there are no tracked deployments).
        Else, it is False. The PB state is only written if it changed.

        :param txn: Transaction object (config.txn)
        :param iteration: number of txn iteration
        """
        LOG.info("Checking deployment status (attempt %s)", iteration)
        pb_state = txn.get_processing_block_state(self._pb_id)
        pb_state_original = deepcopy(pb_state)

        if not self._deploy_id_list:
            pb_state["deployments_ready"] = True
        else:
            # __enter__ normally creates this key; create it here as well
            # so a missing key does not crash the update below.
            deployments = pb_state.setdefault("deployments", {})
            dpl_states = []
            for deploy_id in self._deploy_id_list:
                dpl_state = self._determine_deploy_state(txn, deploy_id)
                LOG.info(
                    "State of deployment %s is %s (attempt %s)",
                    deploy_id,
                    dpl_state,
                    iteration,
                )
                dpl_states.append(dpl_state)
                if dpl_state is not None:
                    deployments[deploy_id] = dpl_state

            pb_state["deployments_ready"] = bool(dpl_states) and all(
                dpl_ste == DeploymentStatus.RUNNING.value
                for dpl_ste in dpl_states
            )

        if pb_state != pb_state_original:
            LOG.info("Updating processing block state (attempt %s)", iteration)
            txn.update_processing_block_state(self._pb_id, pb_state)

    @staticmethod
    def _determine_deploy_state(
        txn: Transaction, deploy_id: str
    ) -> Optional[str]:
        """
        Determine a deployment's status from the status of its pods.

        The deployment is PENDING until its state lists ``num_pod`` pods.
        After that, the first of PENDING, FAILED, CANCELLED, RUNNING found
        among the pod statuses is returned; otherwise it is FINISHED.

        :param txn: SDP configuration transaction
        :param deploy_id: deployment ID
        :return: deployment status, or None if the deployment has no state
        """
        current_dpl_state = txn.get_deployment_state(deploy_id)
        if current_dpl_state is None:
            return None

        num_pod = current_dpl_state.get("num_pod", 0)
        dpl_state_pods = current_dpl_state.get("pods")
        # If not all pods have been added to the "pods" key yet, wait for
        # all of them to appear.
        if dpl_state_pods is None or len(dpl_state_pods) < num_pod:
            return DeploymentStatus.PENDING.value

        pod_status = set(dpl_state_pods.values())
        # Checked in priority order: the first status present wins.
        for candidate in (
            DeploymentStatus.PENDING,
            DeploymentStatus.FAILED,
            DeploymentStatus.CANCELLED,
            DeploymentStatus.RUNNING,
        ):
            if candidate.value in pod_status:
                return candidate.value
        return DeploymentStatus.FINISHED.value

    def wait_loop(
        self, func: Callable[[Transaction], bool], time_to_ready: int = 0
    ) -> None:
        """
        Wait loop to check the status of the processing block.

        It also updates the processing block state with deployment
        statuses for realtime scripts.

        :param func: function to check condition
            for exiting the watcher loop
        :param time_to_ready: set deployments_ready to true after
            this amount of time has passed (seconds). Only for
            deployments deployed with phase.ee_deploy_test
        :raises ScriptPhaseException: if the PB is deleted, finished or
            cancelled, or if ownership of the PB is lost
        """
        condition_met = False
        for watcher in self._config.watcher():
            for attempt, txn in enumerate(watcher.txn()):
                # The EB is not checked here (check_realtime=False);
                # presumably func handles it, e.g. via is_eb_finished.
                self.check_state(txn, check_realtime=False)
                if self._script_kind == "realtime":
                    self.monitor_deployments(txn, attempt)
                    if isinstance(self._deploy, FakeDeploy):
                        self._deploy.set_deployments_ready(
                            time_to_ready=time_to_ready
                        )
                condition_met = func(txn)
            if condition_met:
                break

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> None:
        """
        Clean up the phase and update the processing block state.

        The queue connector (if configured) is torn down first. For
        real-time scripts, finished deployments are then removed. For both
        kinds of script, the processing block state is updated via
        :meth:`update_pb_state` (UNSET status, i.e. FINISHED). Exceptions
        raised inside the ``with`` block are not suppressed.
        """
        # Tear down queue connector first
        if self._qc_config:
            try:
                tear_down_queue_connector(self._config, self._qc_config_path)
            except ConfigVanished:
                # The queue connector config no longer exists, so there is
                # nothing left to tear down.
                LOG.debug("Queue connector configuration already removed")

        if self._script_kind == "realtime":
            # Clean up deployment.
            LOG.info("Clean up deployments")
            if self._deploy_id_list:
                self.ee_remove()

        # NOTE(review): the PB is marked FINISHED even when the phase exits
        # because of an exception; confirm this is intended.
        self.update_pb_state()
        LOG.info("Deployments All Done")