Source code for ska_mid_dish_manager.models.dish_mode_model

"""This model enforces the legal transitions when a command is triggered. It assesses the current
state of the device to decide if the requested state is a nearby node to allow or reject a command.
"""

import typing
from typing import Any, Callable

import networkx as nx

from ska_mid_dish_manager.models.dish_enums import DishMode
from ska_mid_dish_manager.utils.action_helpers import report_task_progress

CONFIG_COMMANDS = (
    "ConfigureBand1",
    "ConfigureBand2",
    "ConfigureBand3",
    "ConfigureBand4",
    "ConfigureBand5a",
    "ConfigureBand5b",
    "ConfigureBand",
)

DISH_MODE_NODES = (
    "STARTUP",
    "SHUTDOWN",
    "STANDBY_LP",
    "STANDBY_FP",
    "MAINTENANCE",
    "STOW",
    "CONFIG",
    "OPERATE",
    "UNKNOWN",
)


[docs]class DishModeModel: """Representation of the mode transition diagram, depicting commanded transitions."""
[docs] def __init__(self) -> None: self.dishmode_graph = self._build_model()
@classmethod def _build_model(cls) -> Any: dishmode_graph = nx.DiGraph() for node in DISH_MODE_NODES: dishmode_graph.add_node(node) # From Standby_LP to other modes dishmode_graph.add_edge("STANDBY_LP", "STANDBY_FP", commands=["SetStandbyFPMode"]) dishmode_graph.add_edge("STANDBY_LP", "CONFIG", commands=CONFIG_COMMANDS) # From Standby_FP to other modes dishmode_graph.add_edge("STANDBY_FP", "STANDBY_LP", commands=["SetStandbyLPMode"]) dishmode_graph.add_edge("STANDBY_FP", "CONFIG", commands=CONFIG_COMMANDS) # From Operate to other modes dishmode_graph.add_edge("OPERATE", "STANDBY_FP", commands=["SetStandbyFPMode"]) dishmode_graph.add_edge("OPERATE", "STANDBY_LP", commands=["SetStandbyLPMode"]) dishmode_graph.add_edge("OPERATE", "CONFIG", commands=CONFIG_COMMANDS) # From Stow to other modes dishmode_graph.add_edge("STOW", "STANDBY_FP", commands=["SetStandbyFPMode"]) dishmode_graph.add_edge("STOW", "STANDBY_LP", commands=["SetStandbyLPMode"]) dishmode_graph.add_edge("STOW", "CONFIG", commands=CONFIG_COMMANDS) dishmode_graph.add_edge("STOW", "MAINTENANCE", commands=["SetMaintenanceMode"]) # From any mode to Stow for node in DISH_MODE_NODES: if node == "STOW": continue dishmode_graph.add_edge(node, "STOW", commands=["SetStowMode"]) # From any mode to Shutdown # TODO: The shutdown command is not currently defined. Add it here # once implemented for node in DISH_MODE_NODES: if node == "SHUTDOWN": continue dishmode_graph.add_edge(node, "SHUTDOWN") return dishmode_graph
[docs] @typing.no_type_check def is_command_allowed( self, cmd_name: str, dish_mode: str | None = None, component_manager: Any | None = None, progress_callback: Callable | None = None, ) -> bool: """Determine if requested tango command is allowed based on current dish mode. This method is used by the executor to evaluate the command pre-condition after it's taken off the queue. To ensure the evaluation is always performed using an updated component state (and not the old state used when the command is queued), the component manager should be passed for the enqueue operation. In testing scenarios for example, the function can be evoked directly with the dishmode parameter. NOTE: Though the function signature has only one required argument, it still needs either the dish_mode or component_manager passed to it to perform the evaluation. :param cmd_name: the requested command :param dish_mode: the current dishMode reported by the component state :param component_manager: the component manager containing the component state :param progress_callback: progress_callback function to report progress :raises TypeError: when no dish_mode or component_manager is provided to function call :return: boolean indicating the function execution is allowed """ try: current_dish_mode = ( dish_mode or DishMode(component_manager.component_state["dishmode"]).name ) except AttributeError as exc: raise TypeError( "is_command_allowed() requires either the dish_mode or" " the component_manager to be specified" ) from exc # For a call to SetMaintenanceMode, the command is allowed regardless of the current # dish mode. SetMaintenanceMode should not be allowed in Maintenance dish mode if cmd_name == "SetMaintenanceMode" and not current_dish_mode == "MAINTENANCE": return True allowed_commands = [] for from_node, to_node in self.dishmode_graph.edges(current_dish_mode): commands = self.dishmode_graph.get_edge_data(from_node, to_node).get("commands", None) if commands: allowed_commands.extend(commands) if cmd_name in allowed_commands: return True # report the reason for the command rejection to logs and lrc attribute msg = ( f"{cmd_name} not allowed in {current_dish_mode} dishMode." f" Commands allowed from {current_dish_mode} are: {allowed_commands}." ) if component_manager: logger = component_manager.logger logger.debug(msg) report_task_progress(msg, progress_callback) return False