"""This model enforces the legal transitions when a command is triggered. It assesses the current
state of the device to decide whether the requested state is a neighbouring node in the mode
transition graph, and allows or rejects the command accordingly.
"""
import typing
from typing import Any, Callable
import networkx as nx
from ska_mid_dish_manager.models.dish_enums import DishMode
from ska_mid_dish_manager.utils.action_helpers import report_task_progress
# Commands that request a band configuration. They are attached as the ``commands``
# attribute of every graph edge leading into the CONFIG node. The empty suffix yields
# the generic "ConfigureBand" command, which comes last in the tuple.
CONFIG_COMMANDS = tuple(
    f"ConfigureBand{band_suffix}" for band_suffix in ("1", "2", "3", "4", "5a", "5b", "")
)
# Names of the nodes added to the dish mode transition graph, in insertion order.
DISH_MODE_NODES = tuple(
    "STARTUP SHUTDOWN STANDBY_LP STANDBY_FP MAINTENANCE STOW CONFIG OPERATE UNKNOWN".split()
)
[docs]class DishModeModel:
"""Representation of the mode transition diagram, depicting commanded transitions."""
[docs] def __init__(self) -> None:
self.dishmode_graph = self._build_model()
@classmethod
def _build_model(cls) -> Any:
dishmode_graph = nx.DiGraph()
for node in DISH_MODE_NODES:
dishmode_graph.add_node(node)
# From Standby_LP to other modes
dishmode_graph.add_edge("STANDBY_LP", "STANDBY_FP", commands=["SetStandbyFPMode"])
dishmode_graph.add_edge("STANDBY_LP", "CONFIG", commands=CONFIG_COMMANDS)
# From Standby_FP to other modes
dishmode_graph.add_edge("STANDBY_FP", "STANDBY_LP", commands=["SetStandbyLPMode"])
dishmode_graph.add_edge("STANDBY_FP", "CONFIG", commands=CONFIG_COMMANDS)
# From Operate to other modes
dishmode_graph.add_edge("OPERATE", "STANDBY_FP", commands=["SetStandbyFPMode"])
dishmode_graph.add_edge("OPERATE", "STANDBY_LP", commands=["SetStandbyLPMode"])
dishmode_graph.add_edge("OPERATE", "CONFIG", commands=CONFIG_COMMANDS)
# From Stow to other modes
dishmode_graph.add_edge("STOW", "STANDBY_FP", commands=["SetStandbyFPMode"])
dishmode_graph.add_edge("STOW", "STANDBY_LP", commands=["SetStandbyLPMode"])
dishmode_graph.add_edge("STOW", "CONFIG", commands=CONFIG_COMMANDS)
dishmode_graph.add_edge("STOW", "MAINTENANCE", commands=["SetMaintenanceMode"])
# From any mode to Stow
for node in DISH_MODE_NODES:
if node == "STOW":
continue
dishmode_graph.add_edge(node, "STOW", commands=["SetStowMode"])
# From any mode to Shutdown
# TODO: The shutdown command is not currently defined. Add it here
# once implemented
for node in DISH_MODE_NODES:
if node == "SHUTDOWN":
continue
dishmode_graph.add_edge(node, "SHUTDOWN")
return dishmode_graph
[docs] @typing.no_type_check
def is_command_allowed(
self,
cmd_name: str,
dish_mode: str | None = None,
component_manager: Any | None = None,
progress_callback: Callable | None = None,
) -> bool:
"""Determine if requested tango command is allowed based on current dish mode.
This method is used by the executor to evaluate the command pre-condition after it's
taken off the queue. To ensure the evaluation is always performed using an updated
component state (and not the old state used when the command is queued), the component
manager should be passed for the enqueue operation. In testing scenarios for example,
the function can be evoked directly with the dishmode parameter.
NOTE: Though the function signature has only one required argument, it still needs either
the dish_mode or component_manager passed to it to perform the evaluation.
:param cmd_name: the requested command
:param dish_mode: the current dishMode reported by the component state
:param component_manager: the component manager containing the component state
:param progress_callback: progress_callback function to report progress
:raises TypeError: when no dish_mode or component_manager is provided to function call
:return: boolean indicating the function execution is allowed
"""
try:
current_dish_mode = (
dish_mode or DishMode(component_manager.component_state["dishmode"]).name
)
except AttributeError as exc:
raise TypeError(
"is_command_allowed() requires either the dish_mode or"
" the component_manager to be specified"
) from exc
# For a call to SetMaintenanceMode, the command is allowed regardless of the current
# dish mode. SetMaintenanceMode should not be allowed in Maintenance dish mode
if cmd_name == "SetMaintenanceMode" and not current_dish_mode == "MAINTENANCE":
return True
allowed_commands = []
for from_node, to_node in self.dishmode_graph.edges(current_dish_mode):
commands = self.dishmode_graph.get_edge_data(from_node, to_node).get("commands", None)
if commands:
allowed_commands.extend(commands)
if cmd_name in allowed_commands:
return True
# report the reason for the command rejection to logs and lrc attribute
msg = (
f"{cmd_name} not allowed in {current_dish_mode} dishMode."
f" Commands allowed from {current_dish_mode} are: {allowed_commands}."
)
if component_manager:
logger = component_manager.logger
logger.debug(msg)
report_task_progress(msg, progress_callback)
return False