Source code for ska_sdp_scripting.fake_deploy

"""Fake deployment."""

# pylint: disable=too-many-arguments

import logging
import threading
from time import sleep
from typing import Any, Callable, Tuple

from ska_sdp_config import Config

from .ee_base_deploy import EEDeploy

# Package-wide logger: it is looked up under the fixed name
# "ska_sdp_scripting" rather than under this module's own name.
LOG = logging.getLogger("ska_sdp_scripting")


class FakeDeploy(EEDeploy):
    """
    Deploy a fake execution engine.

    The function is called with the arguments in a separate thread so the
    constructor can return immediately.

    This should not be created directly, use the
    :func:`Phase.ee_deploy_test` method instead.

    :param pb_id: processing block ID
    :type pb_id: str
    :param config: SDP configuration client
    :type config: ska_sdp_config.Config
    :param deploy_name: deployment name
    :type deploy_name: str
    :param func: function to execute
    :type func: function
    :param f_args: function arguments
    :type f_args: tuple
    """

    def __init__(
        self,
        pb_id: str,
        config: Config,
        deploy_name: str,
        func: Callable = None,
        f_args: Tuple[Any, ...] = None,
    ):
        super().__init__(pb_id, config)
        # Run the fake deployment in the background so that constructing
        # the object does not block on ``func``. The thread is a daemon so
        # it never keeps the interpreter alive on its own.
        thread = threading.Thread(
            target=self._deploy,
            args=(deploy_name, func, f_args),
            daemon=True,
        )
        thread.start()

    def _deploy(
        self,
        deploy_name: str,
        func: Callable = None,
        f_args: Tuple[Any, ...] = None,
    ) -> None:
        """
        Execute the function.

        This is called by the execution thread. The deployment status is
        set to ``RUNNING`` before the function is executed and to
        ``FINISHED`` afterwards. The function is only executed when both
        ``func`` and ``f_args`` are provided.

        :param deploy_name: deployment name
        :param func: function to process
        :param f_args: function arguments
        """
        LOG.info("Deploying %s script...", deploy_name)
        # The ID combines the processing block ID and the deployment name
        # so that several fake deployments belonging to the same
        # processing block do not share one ID.
        self._deploy_id = f"proc-{self._pb_id}-{deploy_name}"
        self.update_deploy_status("RUNNING")

        if f_args is not None and func is not None:
            # Log the whole argument tuple with a single %s placeholder:
            # this formats correctly for any number and type of arguments.
            LOG.info("Starting processing with arguments %s", f_args)
            func(*f_args)
            LOG.info("Finished processing")
        else:
            LOG.info("No processing function is defined. Finished.")

        self.update_deploy_status("FINISHED")

    def set_deployments_ready(self, time_to_ready: int = 0) -> None:
        """
        Set deployments_ready to True

        The calling thread sleeps for ``time_to_ready`` seconds first. The
        processing block state is then only written back if
        ``deployments_ready`` is missing from it or is ``False``.

        :param time_to_ready: set deployments_ready to true after this
            amount of time has passed (seconds)
        """
        sleep(time_to_ready)

        for txn in self._config.txn():
            pb_state = txn.get_processing_block_state(self._pb_id)
            if (
                "deployments_ready" not in pb_state
                or pb_state["deployments_ready"] is False
            ):
                pb_state["deployments_ready"] = True
                txn.update_processing_block_state(self._pb_id, pb_state)