"""Fake deployment."""
# pylint: disable=too-many-arguments
import logging
import threading
from time import sleep
from typing import Any, Callable, Optional, Tuple

from ska_sdp_config import Config

from .ee_base_deploy import EEDeploy
LOG = logging.getLogger("ska_sdp_scripting")
[docs]
class FakeDeploy(EEDeploy):
    """
    Deploy a fake execution engine.

    The function is called with the arguments in a separate thread so the
    constructor can return immediately.

    This should not be created directly, use the :func:`Phase.ee_deploy_test`
    method instead.

    :param pb_id: processing block ID
    :type pb_id: str
    :param config: SDP configuration client
    :type config: ska_sdp_config.Config
    :param deploy_name: deployment name
    :type deploy_name: str
    :param func: function to execute (optional)
    :type func: function
    :param f_args: positional arguments passed to ``func`` (optional)
    :type f_args: tuple
    """

    def __init__(
        self,
        pb_id: str,
        config: Config,
        deploy_name: str,
        func: Optional[Callable] = None,
        f_args: Optional[Tuple[Any, ...]] = None,
    ):
        super().__init__(pb_id, config)
        # Run the fake deployment in the background so that construction
        # does not block on ``func``. The thread is a daemon, so it will not
        # keep the interpreter alive on exit.
        thread = threading.Thread(
            target=self._deploy,
            args=(deploy_name, func, f_args),
            daemon=True,
        )
        thread.start()

    def _deploy(
        self,
        deploy_name: str,
        func: Optional[Callable] = None,
        f_args: Optional[Tuple[Any, ...]] = None,
    ) -> None:
        """
        Execute the function.

        This is called by the execution thread. The deployment status is set
        to ``RUNNING`` before the function is called and to ``FINISHED``
        after it returns. If the function raises, the exception propagates
        out of the thread and the status is left at ``RUNNING``.

        :param deploy_name: deployment name, used to build the deployment ID
        :param func: function to process; if None nothing is executed
        :param f_args: function arguments; None means "call without
            arguments"
        """
        LOG.info("Deploying %s script...", deploy_name)
        # The deployment ID combines the processing block ID with the
        # deployment name so that several deployments belonging to the same
        # processing block get distinct IDs.
        self._deploy_id = f"proc-{self._pb_id}-{deploy_name}"
        self.update_deploy_status("RUNNING")

        if func is None:
            LOG.info("No processing function is defined. Finished.")
        else:
            call_args = () if f_args is None else tuple(f_args)
            # The "%fs" message only makes sense for a single numeric
            # argument (a duration in seconds); anything else would make the
            # logging call fail to format, so use a generic message instead.
            if len(call_args) == 1 and isinstance(call_args[0], (int, float)):
                LOG.info("Starting processing for %fs", call_args[0])
            else:
                LOG.info("Starting processing with arguments %s", call_args)
            func(*call_args)
            LOG.info("Finished processing")

        self.update_deploy_status("FINISHED")

    def set_deployments_ready(self, time_to_ready: int = 0) -> None:
        """
        Set deployments_ready to True

        Blocks the calling thread for ``time_to_ready`` seconds, then marks
        the processing block state as ready unless it is already marked so.

        :param time_to_ready: set deployments_ready to true after
            this amount of time has passed (seconds)
        """
        sleep(time_to_ready)

        for txn in self._config.txn():
            # NOTE(review): assumes the processing block state already
            # exists and is a mapping - confirm against the config client.
            pb_state = txn.get_processing_block_state(self._pb_id)
            # Only write when the flag is missing or explicitly False, to
            # avoid a needless update of the configuration database.
            if pb_state.get("deployments_ready", False) is False:
                pb_state["deployments_ready"] = True
                txn.update_processing_block_state(self._pb_id, pb_state)