"""Abstracts all the logic for executing a command on a device."""
import enum
import logging
import time
from typing import Any, Callable, Optional
from ska_control_model import ResultCode, TaskStatus
from ska_mid_dish_manager.component_managers.tango_device_cm import TangoDeviceComponentManager
from ska_mid_dish_manager.models.dish_enums import FannedOutCommandStatus
from ska_mid_dish_manager.utils.action_helpers import (
check_component_state_matches_awaited,
report_awaited_attributes,
report_task_progress,
)
class FannedOutCommand:
    """Defines a single command to be fanned out as part of an Action.

    ``execute`` runs the wrapped callable once. ``report_progress`` can then be called to
    refresh the status against the awaited component state and to emit progress messages
    through the optional progress callback.
    """

    def __init__(
        self,
        logger: logging.Logger,
        device: str,
        command_name: str,
        command: Callable,
        component_state: dict,
        command_argument: Any = None,
        awaited_component_state: Optional[dict] = None,
        timeout_s: float = 0,
        progress_callback: Optional[Callable] = None,
    ):
        """Initialise the fanned out command in the PENDING state.

        :param logger: Logger instance
        :type logger: Logger
        :param device: The name for the device this command is executed on
        :type device: str
        :param command_name: The name for the command
        :type command_name: str
        :param command: Callable to run as part of `execute`. It is called without arguments
            and is expected to return a response of length 2.
        :type command: Callable
        :param component_state: The component state containing the attributes to wait for updates
            on. The dict is stored by reference, not copied.
        :type component_state: dict
        :param command_argument: Argument for the requested command
        :type command_argument: Any
        :param awaited_component_state: The component state containing the attributes and values to
            wait for. ``None`` (the default) is treated as an empty dict.
        :type awaited_component_state: Optional[dict]
        :param timeout_s: Timeout (in seconds) for the command execution. A value <= 0 will disable
            the timeout.
        :type timeout_s: float
        :param progress_callback: Optional callback to report progress updates.
        :type progress_callback: Callable
        """
        self.logger = logger
        self.device = device
        self.command_name = command_name
        self.command = command
        self.command_argument = command_argument
        self.timeout_s = timeout_s
        self.start_time: float = 0.0
        self.cmd_message = None
        self._status = FannedOutCommandStatus.PENDING
        self._task_finish_reported = False
        self._progress_callback = progress_callback
        self.cmd_response = ""
        self.component_state = component_state
        # Create the empty dict here rather than as a `{}` default in the signature: a default
        # dict is created once and would be shared by every instance relying on it.
        self.awaited_component_state = (
            {} if awaited_component_state is None else awaited_component_state
        )
        # Tracks, per awaited attribute, whether its change has already been reported.
        self.awaited_update_reports = {attr: False for attr in self.awaited_component_state}

    def execute(self, task_callback: Callable) -> None:
        """Execute the fanned out command.

        Marks the command as RUNNING, records the start time and calls ``command`` without
        arguments. The two items of the response are stored in ``cmd_response`` and
        ``cmd_message``. A ``RuntimeError`` raised by the callable is logged, marks the command
        as FAILED and stores the first exception argument in ``cmd_response``.

        :param task_callback: Not used by this implementation.
        :type task_callback: Callable
        :raises AssertionError: if the response of the callable is not of length 2.
        """
        self.logger.debug(f"Executing {self.command_name} with arg {self.command_argument}")
        self._status = FannedOutCommandStatus.RUNNING
        self.start_time = time.time()
        try:
            res = self.command()
            # Contract check on the callable; it is not a RuntimeError so it propagates to the
            # caller instead of marking the command as failed.
            assert len(res) == 2, (
                f"FannedOutCommand 'command' Callable expects a response of len 2, but got '{res}'"
            )
            self.cmd_response, self.cmd_message = res
        except RuntimeError as e:
            self.logger.error(f"FannedOutCommand '{self.command_name}' failed to execute: {e}")
            self._status = FannedOutCommandStatus.FAILED
            # A RuntimeError raised without arguments has an empty `args` tuple; indexing it
            # would raise IndexError from inside this handler.
            self.cmd_response = f"{e.args[0]}" if e.args else f"{e}"

    @property
    def status(self) -> FannedOutCommandStatus:
        """Get the status of the fanned out command."""
        return self._status

    @property
    def failed(self) -> bool:
        """Check if the fanned out command has failed or timed out."""
        return self.status in (FannedOutCommandStatus.TIMED_OUT, FannedOutCommandStatus.FAILED)

    @property
    def successful(self) -> bool:
        """Check if the fanned out command has completed or was ignored."""
        return self.status in (FannedOutCommandStatus.COMPLETED, FannedOutCommandStatus.IGNORED)

    @property
    def finished(self) -> bool:
        """Check if the fanned out command has finished, successfully or not."""
        return self.failed or self.successful

    def _update_status(self, task_callback: Callable) -> None:
        """Refresh the status of a running command and report a failure or timeout.

        Only a RUNNING command changes status here. A failure message is sent to the progress
        callback on every call made while the status is FAILED or TIMED_OUT.

        :param task_callback: Not used by this implementation.
        :type task_callback: Callable
        """
        if self._status == FannedOutCommandStatus.RUNNING:
            # timeout
            if self.timeout_s > 0 and time.time() - self.start_time > self.timeout_s:
                self._status = FannedOutCommandStatus.TIMED_OUT

            # completed: evaluated after the timeout, so a matching component state takes
            # precedence over a timeout detected in the same call
            if check_component_state_matches_awaited(
                self.component_state, self.awaited_component_state
            ):
                self._status = FannedOutCommandStatus.COMPLETED

        if self._status in (FannedOutCommandStatus.FAILED, FannedOutCommandStatus.TIMED_OUT):
            report_task_progress(
                f"{self.device} device {self._status.name.lower().replace('_', ' ')}"
                f" executing {self.command_name} command",
                self._progress_callback,
            )

    def report_progress(self, task_callback: Callable) -> None:
        """Report the progress of fanned out command.

        Updates the status, reports each awaited attribute once when it reaches its awaited
        value, and reports the final status once when the command has finished.

        :param task_callback: Passed on to the status update; not used otherwise.
        :type task_callback: Callable
        """
        self._update_status(task_callback)

        # Work on a shallow copy so every attribute is checked against the same snapshot.
        current_comp_state = dict(self.component_state)

        # Awaited component state updates
        for attr_name, reported_update in self.awaited_update_reports.items():
            if reported_update or attr_name not in self.awaited_component_state:
                continue
            if attr_name not in current_comp_state:
                continue

            expected_value = self.awaited_component_state[attr_name]
            current_value = current_comp_state[attr_name]
            if current_value == expected_value:
                # Show the member name rather than the integer value of an IntEnum.
                if isinstance(current_value, enum.IntEnum):
                    current_value = current_value.name
                report_task_progress(
                    f"{self.device} {attr_name} changed to {current_value}",
                    self._progress_callback,
                )
                # Only the value of an existing key changes, so this is safe while iterating.
                self.awaited_update_reports[attr_name] = True

        if self.finished and not self._task_finish_reported:
            status_name = self.status.name.lower().replace("_", " ")
            report_task_progress(
                f"{self.device}.{self.command_name} {status_name}", self._progress_callback
            )
            self._task_finish_reported = True
class FannedOutSlowCommand(FannedOutCommand):
    """Fanned out command that is executed through the component manager of a device.

    The command is sent with ``device_component_manager.execute_command``. When the device is
    flagged as ignored, nothing is sent and the command is marked as IGNORED.
    """

    def __init__(
        self,
        logger: logging.Logger,
        device: str,
        command_name: str,
        device_component_manager: TangoDeviceComponentManager,
        command_argument: Any = None,
        awaited_component_state: Optional[dict] = None,
        timeout_s: float = 0,
        progress_callback: Optional[Callable] = None,
        is_device_ignored: bool = False,
    ):
        """Initialise the command for the given device component manager.

        :param logger: Logger instance
        :type logger: Logger
        :param device: The name for the device this command is executed on
        :type device: str
        :param command_name: The name for the command to be executed
        :type command_name: str
        :param device_component_manager: The component manager of the subservient device
        :type device_component_manager: TangoDeviceComponentManager
        :param command_argument: Argument for the requested command
        :type command_argument: Any
        :param awaited_component_state: The component state containing the attributes and values to
            wait for. ``None`` (the default) is treated as an empty dict.
        :type awaited_component_state: Optional[dict]
        :param timeout_s: Timeout (in seconds) for the command execution
        :type timeout_s: float
        :param progress_callback: Optional callback to report progress updates.
        :type progress_callback: Callable
        :param is_device_ignored: Toggle to ignore fanning out of command if the device is ignored.
        :type is_device_ignored: bool
        """
        self.device_component_manager = device_component_manager
        self.is_device_ignored = is_device_ignored
        # Create the empty dict per instance: a `{}` default in the signature is created once
        # and would be shared by every command constructed without this argument.
        if awaited_component_state is None:
            awaited_component_state = {}
        super().__init__(
            logger=logger,
            device=device,
            command_name=f"{command_name}",
            command=self._execute_tango_command,
            # use device_component_manager._component_state to pass the dict by reference
            # device_component_manager.component_state will use the tango base property which will
            # do a deep copy
            component_state=self.device_component_manager._component_state,
            command_argument=command_argument,
            awaited_component_state=awaited_component_state,
            timeout_s=timeout_s,
            progress_callback=progress_callback,
        )

    def _execute_tango_command(self) -> tuple:
        """Fan out the respective command to the subservient devices.

        :return: ``(None, None)`` when the device is ignored, otherwise the task status and
            message returned by the component manager.
        :rtype: tuple
        :raises RuntimeError: if the component manager reports ``TaskStatus.FAILED``; the
            message returned with it is used as the exception argument.
        """
        if self.is_device_ignored:
            self.logger.debug(
                f"{self.device} device is disabled. {self.command_name} call ignored"
            )
            self._status = FannedOutCommandStatus.IGNORED
            return None, None

        task_status, msg = self.device_component_manager.execute_command(
            self.command_name, self.command_argument
        )

        # The awaited attributes are reported before the task status is inspected, so they are
        # reported for a failed command as well.
        if self.awaited_component_state is not None:
            awaited_attributes = list(self.awaited_component_state.keys())
            awaited_values = list(self.awaited_component_state.values())
            report_awaited_attributes(
                self._progress_callback, awaited_attributes, awaited_values, self.device
            )

        if task_status == TaskStatus.FAILED:
            raise RuntimeError(msg)
        return task_status, msg
class DishManagerCMMethod(FannedOutCommand):
    """Class that executes the method, args and kwargs passed to it.

    This class specifically handles the case where the method responds with a result or raises
    an exception.
    """

    def __init__(
        self,
        logger: logging.Logger,
        method: Callable,
        component_state: dict,
        command_args=(),
        command_kwargs: Optional[dict] = None,
        awaited_component_state: Optional[dict] = None,
        timeout_s: float = 0,
    ):
        """Initialise the command for a method reported under the "DishManager" device name.

        :param logger: Logger instance
        :type logger: Logger
        :param method: The method to call; ``str(method)`` is used as the command name.
        :type method: Callable
        :param component_state: The component state containing the attributes to wait for updates
            on.
        :type component_state: dict
        :param command_args: Positional arguments passed to the method.
        :param command_kwargs: Keyword arguments passed to the method. ``None`` (the default) is
            treated as an empty dict.
        :type command_kwargs: Optional[dict]
        :param awaited_component_state: The component state containing the attributes and values to
            wait for. ``None`` (the default) is treated as an empty dict.
        :type awaited_component_state: Optional[dict]
        :param timeout_s: Timeout (in seconds) for the command execution
        :type timeout_s: float
        """
        # Create the empty dicts per instance: `{}` defaults in the signature are created once
        # and would be shared by every command constructed without these arguments.
        self.command_args = command_args
        self.command_kwargs = {} if command_kwargs is None else command_kwargs
        super().__init__(
            logger=logger,
            device="DishManager",
            command_name=str(method),
            command=method,
            component_state=component_state,
            command_argument=None,
            awaited_component_state=(
                {} if awaited_component_state is None else awaited_component_state
            ),
            timeout_s=timeout_s,
        )

    def execute(self, task_callback) -> None:
        """Execute the command.

        Calls the method with the stored args and kwargs and stores its result in
        ``cmd_response``. The command is marked COMPLETED as soon as the method returns; any
        exception is logged, marks the command as FAILED and is stored as text in
        ``cmd_response``.

        :param task_callback: Not used by this implementation.
        """
        self.logger.debug(
            (
                f"Executing {self.command_name} with args {self.command_args} "
                f"and kwargs. {self.command_kwargs}"
            )
        )
        self._status = FannedOutCommandStatus.RUNNING
        self.start_time = time.time()
        try:
            res = self.command(*self.command_args, **self.command_kwargs)
            self.logger.debug(f"Result: {res}")
            self.cmd_response = res
            self._status = FannedOutCommandStatus.COMPLETED
        except Exception as e:
            self.logger.exception(f"FannedOutCommand '{self.command_name}' failed to execute: {e}")
            self._status = FannedOutCommandStatus.FAILED
            self.cmd_response = f"{e}"
class DishManagerCMMethodCallBack(FannedOutCommand):
    """Class that executes the method, args and kwargs passed to it.

    This class specifically handles the case where the task_callback is used to
    track method result.
    """

    def __init__(
        self,
        logger: logging.Logger,
        method: Callable,
        component_state: dict,
        command_args=(),
        command_kwargs: Optional[dict] = None,
        awaited_component_state: Optional[dict] = None,
        timeout_s: float = 0,
    ):
        """Initialise the command for a method reported under the "DishManager" device name.

        :param logger: Logger instance
        :type logger: Logger
        :param method: The method to call; it receives this command's task callback as its first
            positional argument. ``str(method)`` is used as the command name.
        :type method: Callable
        :param component_state: The component state containing the attributes to wait for updates
            on.
        :type component_state: dict
        :param command_args: Positional arguments passed to the method after the task callback.
        :param command_kwargs: Keyword arguments passed to the method. ``None`` (the default) is
            treated as an empty dict.
        :type command_kwargs: Optional[dict]
        :param awaited_component_state: The component state containing the attributes and values to
            wait for. ``None`` (the default) is treated as an empty dict.
        :type awaited_component_state: Optional[dict]
        :param timeout_s: Timeout (in seconds) for the command execution
        :type timeout_s: float
        """
        # Create the empty dicts per instance: `{}` defaults in the signature are created once
        # and would be shared by every command constructed without these arguments.
        self.command_args = command_args
        self.command_kwargs = {} if command_kwargs is None else command_kwargs
        super().__init__(
            logger=logger,
            device="DishManager",
            command_name=str(method),
            command=method,
            component_state=component_state,
            command_argument=None,
            awaited_component_state=(
                {} if awaited_component_state is None else awaited_component_state
            ),
            timeout_s=timeout_s,
        )

    def _task_callback(self, *args, **kwargs):
        """Update the status from the callback.

        Only the ``status`` keyword argument is used; calls without it leave the status
        unchanged.
        """
        status = kwargs.get("status")
        # Compare against None instead of relying on truthiness: TaskStatus.STAGING has the
        # integer value 0 in ska-control-model and would otherwise be skipped.
        if status is None:
            return

        if status == TaskStatus.COMPLETED:
            self._status = FannedOutCommandStatus.COMPLETED
        elif status in (
            TaskStatus.FAILED,
            TaskStatus.ABORTED,
            TaskStatus.NOT_FOUND,
            # A rejected task will never report completion, so it is treated as failed.
            TaskStatus.REJECTED,
        ):
            self._status = FannedOutCommandStatus.FAILED
        elif status in (TaskStatus.QUEUED, TaskStatus.STAGING, TaskStatus.IN_PROGRESS):
            self._status = FannedOutCommandStatus.RUNNING

    def execute(self, task_callback) -> None:
        """Execute the command.

        Calls the method with this command's task callback followed by the stored args and
        kwargs, and stores its return value in ``cmd_response``. The status is left as RUNNING
        until the method reports a status through the callback; any exception raised by the call
        is logged, marks the command as FAILED and is stored as text in ``cmd_response``.

        :param task_callback: Not used by this implementation.
        """
        self._status = FannedOutCommandStatus.RUNNING
        self.start_time = time.time()
        # Build the call arguments locally rather than inserting the callback into
        # self.command_args, so that executing the command again does not prepend it twice.
        call_args = [self._task_callback, *self.command_args]
        try:
            self.logger.debug(
                (
                    f"Executing {self.command_name} with args {call_args} "
                    f"and kwargs. {self.command_kwargs}"
                )
            )
            res = self.command(*call_args, **self.command_kwargs)
            self.logger.debug(f"Result: {res}")
            self.cmd_response = res
        except Exception as e:
            self.logger.exception(f"FannedOutCommand '{self.command_name}' failed to execute: {e}")
            self._status = FannedOutCommandStatus.FAILED
            self.cmd_response = f"{e}"
class DishManagerCMMethodResultCode(FannedOutCommand):
    """Class that executes the method, args and kwargs passed to it.

    This class specifically handles the case where method responds with a ResultCode immediately.
    """

    def __init__(
        self,
        logger: logging.Logger,
        method: Callable,
        component_state: dict,
        command_args=(),
        command_kwargs: Optional[dict] = None,
        awaited_component_state: Optional[dict] = None,
        timeout_s: float = 0,
    ):
        """Initialise the command for a method reported under the "DishManager" device name.

        :param logger: Logger instance
        :type logger: Logger
        :param method: The method to call; it is expected to return a ``(result_code, message)``
            pair. ``str(method)`` is used as the command name.
        :type method: Callable
        :param component_state: The component state containing the attributes to wait for updates
            on.
        :type component_state: dict
        :param command_args: Positional arguments passed to the method.
        :param command_kwargs: Keyword arguments passed to the method. ``None`` (the default) is
            treated as an empty dict.
        :type command_kwargs: Optional[dict]
        :param awaited_component_state: The component state containing the attributes and values to
            wait for. ``None`` (the default) is treated as an empty dict.
        :type awaited_component_state: Optional[dict]
        :param timeout_s: Timeout (in seconds) for the command execution
        :type timeout_s: float
        """
        # Create the empty dicts per instance: `{}` defaults in the signature are created once
        # and would be shared by every command constructed without these arguments.
        self.command_args = command_args
        self.command_kwargs = {} if command_kwargs is None else command_kwargs
        super().__init__(
            logger=logger,
            device="DishManager",
            command_name=str(method),
            command=method,
            component_state=component_state,
            command_argument=None,
            awaited_component_state=(
                {} if awaited_component_state is None else awaited_component_state
            ),
            timeout_s=timeout_s,
        )

    def execute(self, task_callback) -> None:
        """Execute the command.

        Calls the method with the stored args and kwargs and stores the returned result code in
        ``cmd_response``. The command is COMPLETED when the result code is ``ResultCode.OK`` and
        FAILED for any other result code. Any exception is logged, marks the command as FAILED
        and is stored as text in ``cmd_response``.

        :param task_callback: Not used by this implementation.
        """
        self._status = FannedOutCommandStatus.RUNNING
        self.start_time = time.time()
        try:
            self.logger.debug(
                (
                    f"Executing {self.command_name} with args {self.command_args} "
                    f"and kwargs. {self.command_kwargs}"
                )
            )
            result_code, message = self.command(*self.command_args, **self.command_kwargs)
            self.logger.debug(f"Result: {result_code}, Message: {message}")
            self.cmd_response = result_code

            # For DishManagerCMMethodResultCode, we expect an immediate response.
            # Any response that gets queued/aborted/etc is considered failed.
            # In those cases use another Action.
            if result_code == ResultCode.OK:
                self._status = FannedOutCommandStatus.COMPLETED
            else:
                self._status = FannedOutCommandStatus.FAILED
        except Exception as e:
            self.logger.exception(f"FannedOutCommand '{self.command_name}' failed to execute: {e}")
            self._status = FannedOutCommandStatus.FAILED
            self.cmd_response = f"{e}"