Source code for ska_mid_dish_manager.models.fanned_out_command

"""Abstracts all the logic for executing a command on a device."""

import enum
import json
import logging
import time
from typing import Any, Callable, Optional

from ska_control_model import ResultCode, TaskStatus

from ska_mid_dish_manager.component_managers.tango_device_cm import TangoDeviceComponentManager
from ska_mid_dish_manager.models.dish_enums import FannedOutCommandStatus
from ska_mid_dish_manager.utils.action_helpers import (
    check_component_state_matches_awaited,
    report_awaited_attributes,
    report_task_progress,
)


[docs]class FannedOutCommand: """Defines a single command to be fanned out as part of a Action."""
[docs] def __init__( self, logger: logging.Logger, device: str, command_name: str, command: Callable, component_state: dict, command_argument: Any = None, awaited_component_state: dict = {}, timeout_s: float = 0, progress_callback: Optional[Callable] = None, ): """:param logger: Logger instance :type logger: Logger :param device: The name for the device this command is executed on :type device: str :param command_name: The name for the command :type command_name: str :param command: Command to run as part of `execute` :type command: str :param component_state: The component state containing the attributes to wait for updates on. :type component_state: Optional[dict] :param command_argument: Argument for the requested command :type command_argument: Any :param awaited_component_state: The component state containing the attributes and values to wait for. :type awaited_component_state: dict :param timeout_s: Timeout (in seconds) for the command execution. A value <= 0 will disable the timeout. :type timeout_s: float :param progress_callback: Optional callback to report progress updates. :type progress_callback: Callable """ self.logger = logger self.device = device self.command_name = command_name self.command = command self.command_argument = command_argument self.timeout_s = timeout_s self.start_time: float = 0.0 self.executed_cmd_message = None self._status = FannedOutCommandStatus.PENDING self._task_finish_reported = False self._progress_callback = progress_callback self.executed_cmd_response = "" self.component_state = component_state self.awaited_component_state = awaited_component_state self.awaited_update_reports = {attr: False for attr in awaited_component_state.keys()}
[docs] def execute(self, task_callback: Callable) -> None: """Execute the fanned out command.""" self.logger.debug(f"Executing {self.command_name} with arg {self.command_argument}") self._status = FannedOutCommandStatus.IN_PROGRESS self.start_time = time.time() try: res = self.command() assert len(res) == 2, ( f"FannedOutCommand 'command' Callable expects a response of len 2, but got '{res}'" ) self.executed_cmd_response, self.executed_cmd_message = res if self.awaited_component_state is not None: awaited_attributes = list(self.awaited_component_state.keys()) awaited_values = list(self.awaited_component_state.values()) report_awaited_attributes( self._progress_callback, awaited_attributes, awaited_values, self.device ) except RuntimeError as e: self.logger.error(f"FannedOutCommand '{self.command_name}' failed to execute: {e}") self._status = FannedOutCommandStatus.FAILED self.executed_cmd_response = f"{e.args[0]}"
@property def status(self) -> FannedOutCommandStatus: """Get the status of the fanned out command.""" return self._status @property def failed(self) -> bool: """Check if the fanned out command has failed.""" return self.status in ( FannedOutCommandStatus.TIMED_OUT, FannedOutCommandStatus.FAILED, FannedOutCommandStatus.ABORTED, FannedOutCommandStatus.REJECTED, ) @property def successful(self) -> bool: """Check if the fanned out command has failed.""" return self.status in (FannedOutCommandStatus.COMPLETED, FannedOutCommandStatus.IGNORED) @property def finished(self) -> bool: """Check if the fanned out command has finished.""" return self.failed or self.successful def _update_status(self, task_callback: Callable) -> None: """Update the status of the command based on component state and timeout checks.""" if self._status == FannedOutCommandStatus.IN_PROGRESS: # completed if check_component_state_matches_awaited( self.component_state, self.awaited_component_state ): self._status = FannedOutCommandStatus.COMPLETED # timeout if self.timeout_s > 0 and time.time() - self.start_time > self.timeout_s: self._status = FannedOutCommandStatus.TIMED_OUT
[docs] def report_progress(self, task_callback: Callable) -> None: """Report the progress of fanned out command.""" current_comp_state = dict(self.component_state) # Report awaited component state updates for attr_name, reported_update in self.awaited_update_reports.items(): if not reported_update and attr_name in self.awaited_component_state: expected_value = self.awaited_component_state[attr_name] if attr_name in current_comp_state: current_value = current_comp_state[attr_name] if current_value == expected_value: if isinstance(current_value, enum.IntEnum): current_value = current_value.name report_task_progress( f"{self.device} {attr_name} changed to {current_value}", self._progress_callback, ) self.awaited_update_reports[attr_name] = True # Update the commands status self._update_status(task_callback) # Report if the command has finished if self.finished and not self._task_finish_reported: status_name = self.status.name.lower().replace("_", " ") cmd_response = ( self.executed_cmd_response if self._status in [ FannedOutCommandStatus.FAILED, FannedOutCommandStatus.REJECTED, FannedOutCommandStatus.ABORTED, ] else "" ) if cmd_response: report_task_progress( f"{self.device}.{self.command_name} {status_name}: {cmd_response}", self._progress_callback, ) else: report_task_progress( f"{self.device}.{self.command_name} {status_name}", self._progress_callback, ) self._task_finish_reported = True
[docs]class FannedOutTangoCommand(FannedOutCommand):
[docs] def __init__( self, logger: logging.Logger, device: str, command_name: str, device_component_manager: TangoDeviceComponentManager, command_argument: Any = None, awaited_component_state: dict = {}, timeout_s: float = 0, progress_callback: Optional[Callable] = None, is_device_ignored: bool = False, ): """:param logger: Logger instance :type logger: Logger :param device: The name for the device this command is executed on :type device: str :param command_name: The name for the command to be executed :type command_name: str :param device_component_manager: The component manager of the subservient device :type device_component_manager: TangoDeviceComponentManager :param timeout_s: Timeout (in seconds) for the command execution :type timeout_s: float :param command_argument: Argument for the requested command :type command_argument: Any :param awaited_component_state: The component state containing the attributes and values to wait for. :type awaited_component_state: dict :param progress_callback: Optional callback to report progress updates. :type progress_callback: Callable :param is_device_ignored: Toggle to ignore fanning out of command if the device is ignored. :type is_device_ignored: bool """ self.device_component_manager = device_component_manager self.is_device_ignored = is_device_ignored super().__init__( logger=logger, device=device, command_name=f"{command_name}", command=self._execute_tango_command, # use device_component_manager._component_state to pass the dict by reference # device_component_manager.component_state will use the tango base property which will # do a deep copy component_state=self.device_component_manager._component_state, command_argument=command_argument, awaited_component_state=awaited_component_state, timeout_s=timeout_s, progress_callback=progress_callback, )
def _execute_tango_command(self) -> tuple: """Fan out the respective command to the subservient devices.""" if self.is_device_ignored: self.logger.debug( f"{self.device} device is disabled. {self.command_name} call ignored" ) self._status = FannedOutCommandStatus.IGNORED return None, None task_status, msg = self.device_component_manager.execute_command( self.command_name, self.command_argument ) if task_status in [TaskStatus.FAILED, TaskStatus.REJECTED, TaskStatus.ABORTED]: raise RuntimeError(msg) return task_status, msg
[docs]class FannedOutTangoLongRunningCommand(FannedOutTangoCommand):
[docs] def __init__( self, logger: logging.Logger, device: str, command_name: str, device_component_manager: TangoDeviceComponentManager, command_argument: Any = None, awaited_component_state: dict = {}, timeout_s: float = 0, progress_callback: Optional[Callable] = None, is_device_ignored: bool = False, ): """:param logger: Logger instance :type logger: Logger :param device: The name for the device this command is executed on :type device: str :param command_name: The name for the command to be executed :type command_name: str :param device_component_manager: The component manager of the subservient device :type device_component_manager: TangoDeviceComponentManager :param timeout_s: Timeout (in seconds) for the command execution :type timeout_s: float :param command_argument: Argument for the requested command :type command_argument: Any :param awaited_component_state: The component state containing the attributes and values to wait for. :type awaited_component_state: dict :param progress_callback: Optional callback to report progress updates. :type progress_callback: Callable :param is_device_ignored: Toggle to ignore fanning out of command if the device is ignored. :type is_device_ignored: bool """ self.is_lrc_finished = False super().__init__( logger=logger, device=device, command_name=f"{command_name}", device_component_manager=device_component_manager, command_argument=command_argument, awaited_component_state=awaited_component_state, timeout_s=timeout_s, progress_callback=progress_callback, is_device_ignored=is_device_ignored, )
def _execute_tango_command(self) -> tuple: """Fan out the respective command to the device and handle task status response.""" task_status, msg = super()._execute_tango_command() # If the command completed immediately then it won't appear in the LRC attributes. Mark # the lrc as complete, the component state check will be used to complete the command. if task_status in [TaskStatus.COMPLETED, TaskStatus.REJECTED, TaskStatus.ABORTED]: self.is_lrc_finished = True return task_status, msg def _is_command_in_lrc_queued(self) -> bool: """Check if the long running command is in the lrcQueue attribute.""" lrc_queue = self.device_component_manager.read_attribute_value("lrcqueue", log_read=False) if not isinstance(lrc_queue, tuple): self.logger.error( "lrcQueue value is not a tuple, got %s: %s", type(lrc_queue), lrc_queue ) return False for queued_cmd in lrc_queue: try: queued_cmd_dict = json.loads(queued_cmd) except json.JSONDecodeError: self.logger.exception("Invalid json value for lrcQueue") continue if queued_cmd_dict.get("uid") == self.executed_cmd_message: return True return False def _is_command_in_lrc_executing(self) -> bool: """Check if the long running command is in the lrcExecuting attribute.""" lrc_executing = self.device_component_manager.read_attribute_value( "lrcexecuting", log_read=False ) if not isinstance(lrc_executing, tuple): self.logger.error( "lrcExecuting value is not a tuple, got %s: %s", type(lrc_executing), lrc_executing ) return False for executing_cmd in lrc_executing: try: executing_cmd_dict = json.loads(executing_cmd) except json.JSONDecodeError: self.logger.exception("Invalid json value for lrcExecuting") continue if executing_cmd_dict.get("uid") == self.executed_cmd_message: return True return False def _get_command_lrc_finished_dict(self) -> Optional[dict]: """Get the lrcFinished dict for the long running command.""" lrc_finished = self.device_component_manager.read_attribute_value( "lrcfinished", log_read=False ) if not isinstance(lrc_finished, tuple): self.logger.error( "lrcFinished value is not a tuple, got %s: %s", type(lrc_finished), lrc_finished ) return None for finished_cmd in lrc_finished: try: finished_cmd_dict = json.loads(finished_cmd) except json.JSONDecodeError: self.logger.exception("Invalid json value for lrcFinished") return None if finished_cmd_dict.get("uid") == self.executed_cmd_message: return finished_cmd_dict return None def _update_status(self, task_callback: Callable) -> None: """Update the status of the fanned out command based on the LRC status and component state. Requires both the LRC to have completed and the component states to match to complete. """ if self._status in [FannedOutCommandStatus.QUEUED, FannedOutCommandStatus.IN_PROGRESS]: # If the LRC has not yet been reported in lrcFinished if not self.is_lrc_finished: lrc_finished_dict = self._get_command_lrc_finished_dict() if lrc_finished_dict: lrc_result = lrc_finished_dict["result"] lrc_status = lrc_finished_dict["status"] self.executed_cmd_response = lrc_result if lrc_status == TaskStatus.COMPLETED.name: # Don't mark it as completed yet, still need to check component state self.is_lrc_finished = True elif lrc_status == TaskStatus.ABORTED.name: self._status = FannedOutCommandStatus.ABORTED return elif lrc_status == TaskStatus.REJECTED.name: self._status = FannedOutCommandStatus.REJECTED return elif lrc_status == TaskStatus.FAILED.name: self._status = FannedOutCommandStatus.FAILED return elif self._is_command_in_lrc_executing(): self._status = FannedOutCommandStatus.IN_PROGRESS elif self._is_command_in_lrc_queued(): self._status = FannedOutCommandStatus.QUEUED # Final component state check if self.is_lrc_finished: component_ready = check_component_state_matches_awaited( self.component_state, self.awaited_component_state, ) if component_ready: self._status = FannedOutCommandStatus.COMPLETED return # timeout if self.timeout_s > 0 and time.time() - self.start_time > self.timeout_s: self._status = FannedOutCommandStatus.TIMED_OUT
[docs]class DishManagerCMMethod(FannedOutCommand): """Class that executes the method, args and kwargs passed to it. This class specifically handles the case where the method responds with a result or raises an exception. """
[docs] def __init__( self, logger, method, component_state, command_args=(), command_kwargs={}, awaited_component_state={}, timeout_s=0, ): self.command_args = command_args self.command_kwargs = command_kwargs super().__init__( logger, "DishManager", str(method), method, component_state, None, awaited_component_state, timeout_s, )
[docs] def execute(self, task_callback) -> None: """Execute the command.""" self.logger.debug( ( f"Executing {self.command_name} with args {self.command_args} " f"and kwargs. {self.command_kwargs}" ) ) self._status = FannedOutCommandStatus.IN_PROGRESS self.start_time = time.time() try: res = self.command(*self.command_args, **self.command_kwargs) self.logger.debug(f"Result: {res}") self.executed_cmd_response = res self._status = FannedOutCommandStatus.COMPLETED except Exception as e: self.logger.exception(f"FannedOutCommand '{self.command_name}' failed to execute: {e}") self._status = FannedOutCommandStatus.FAILED self.executed_cmd_response = f"{e}"
[docs]class DishManagerCMMethodCallBack(FannedOutCommand): """Class that executes the method, args and kwargs passed to it. This class specifically handles the case where the task_callback is used to track method result. """
[docs] def __init__( self, logger, method, component_state, command_args=(), command_kwargs={}, awaited_component_state={}, timeout_s=0, ): self.command_args = command_args self.command_kwargs = command_kwargs super().__init__( logger, "DishManager", str(method), method, component_state, None, awaited_component_state, timeout_s, )
def _task_callback(self, *args, **kwargs): """Update the status from the callback.""" status = kwargs.get("status", None) if status: if status == TaskStatus.COMPLETED: self._status = FannedOutCommandStatus.COMPLETED if status in (TaskStatus.FAILED, TaskStatus.ABORTED, TaskStatus.NOT_FOUND): self._status = FannedOutCommandStatus.FAILED if status in (TaskStatus.QUEUED, TaskStatus.STAGING, TaskStatus.IN_PROGRESS): self._status = FannedOutCommandStatus.IN_PROGRESS
[docs] def execute(self, task_callback) -> None: """Execute the command.""" self._status = FannedOutCommandStatus.IN_PROGRESS self.start_time = time.time() self.command_args = list(self.command_args) self.command_args.insert(0, self._task_callback) try: self.logger.debug( ( f"Executing {self.command_name} with args {self.command_args} " f"and kwargs. {self.command_kwargs}" ) ) res = self.command(*self.command_args, **self.command_kwargs) self.logger.debug(f"Result: {res}") self.executed_cmd_response = res except Exception as e: self.logger.exception(f"FannedOutCommand '{self.command_name}' failed to execute: {e}") self._status = FannedOutCommandStatus.FAILED self.executed_cmd_response = f"{e}"
[docs]class DishManagerCMMethodResultCode(FannedOutCommand): """Class that executes the method, args and kwargs passed to it. This class specifically handles the case where method responds with a ResultCode immediately. """
[docs] def __init__( self, logger, method, component_state, command_args=(), command_kwargs={}, awaited_component_state={}, timeout_s=0, ): self.command_args = command_args self.command_kwargs = command_kwargs super().__init__( logger, "DishManager", str(method), method, component_state, None, awaited_component_state, timeout_s, )
[docs] def execute(self, task_callback) -> None: """Execute the command.""" self._status = FannedOutCommandStatus.IN_PROGRESS self.start_time = time.time() try: self.logger.debug( ( f"Executing {self.command_name} with args {self.command_args} " f"and kwargs. {self.command_kwargs}" ) ) result_code, message = self.command(*self.command_args, **self.command_kwargs) self.logger.debug(f"Result: {result_code}, Message: {message}") self.executed_cmd_response = result_code # For DishManagerCMMethodResultCode, we expect an immediate response. # Any response that gets queued/aborted/etc is considered failed. # In those cases use another Action. if result_code == ResultCode.OK: self._status = FannedOutCommandStatus.COMPLETED else: self._status = FannedOutCommandStatus.FAILED except Exception as e: self.logger.exception(f"FannedOutCommand '{self.command_name}' failed to execute: {e}") self._status = FannedOutCommandStatus.FAILED self.executed_cmd_response = f"{e}"