Source code for ska_control_model.op_state

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Control System project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
This module specifies the operational state model for the SKA control system.

It consists of:

* an underlying state machine: :py:class:`.OpStateMachine`
* an :py:class:`.OpStateModel` that maps state machine state to device
  "op state". This "op state" is currently represented as a
  :py:class:`tango.DevState` enum value.
"""
import logging
from typing import Any, Callable, Optional

from tango import DevState
from transitions.extensions import LockedMachine as Machine

from ska_control_model.faults import StateModelError
from ska_control_model.utils import for_testing_only

__all__ = ["OpStateModel"]


class _OpStateMachine(Machine):
    """
    State machine representing operational state.

    The operational state is either the state of the system under
    control, or a state that indicates why the control system is not
    currently monitoring the system under control.

    The post-init states supported are:

    * **DISABLE**: the control system has been told not to monitor the
      system under control
    * **UNKNOWN**: the control system is monitoring (or at least trying
      to monitor) the system under control, but is unable to determine
      the system under control's state. Typically, this occurs when
      communication between the control system and the system under
      control is temporarily disrupted.
    * **OFF**: the control system is monitoring the system under
      control, which is powered off.
    * **STANDBY**: the control system is monitoring the system under
      control. which is in low-power standby mode.
    * **ON**: the control system is monitoring the system under control,
      which is turned on.
    * **FAULT_STANDBY**: the control system is monitoring the system
      under control, which is in low-power standby mode and has
      experienced a fault.
    * **FAULT_ON**: the control system is monitoring the system under
      control, which is turned on and has experienced a fault.

    There are also corresponding initialising states: **INIT_DISABLE**,
    **INIT_UNKNOWN**, **INIT_OFF**, **INIT_STANDBY**, **INIT_ON**,
    **INIT_FAULT_STANDBY** and **INIT_FAULT_ON**. These states allow for
    the underlying system under control to change its state during
    initialisation of the control system, and for the control system to
    transition to the correct state at the end of initialisation.

    Finally, there is an **_UNINITIALISED** starting state, representing
    a control system that hasn't started initialising yet.

    The actions supported are:

    * **init_invoked**: the control system has started initialising
    * **init_completed**: the control system has finished initialising
    * **component_disconnected**: the control system his disconnected
      from the sysem under control (for example because admin mode was
      set to OFFLINE). Note, this action indicates a deliberate,
      control-system-initiated, disconnect; a lost connection would be
      indicated by a "component_fault" or "component_unknown" action,
      depending on circumstances.
    * **component_unknown**: the control system is unable to determine
      the state of the system under control.
    * **component_off**: the system under control has been switched off
    * **component_standby**: the system under control has switched to
      low-power standby mode
    * **component_on**: the system under control has been switched on
    * **component_fault**: the system under control has experienced a
      fault
    * **component_no_fault**: the system under control has stopped
      experiencing a fault

    A diagram of the state machine is shown below. Essentially, the
    machine has three "super-states", representing the control system
    before, during and after initialisation. Transition between these
    "super-states" is triggered by the "init_invoked" and
    "init_completed" actions. In the last two "super-states", the
    control system monitors the system under control and updates its
    state accordingly.

    .. uml:: op_state_machine.uml
       :caption: Diagram of the op state machine

    """

    def __init__(
        self,
        callback: Callable[[str], None] | None = None,
        **extra_kwargs: Any,
    ) -> None:
        """
        Initialise the state model.

        :param callback: A callback to be called when a transition
            implies a change to op state
        :param extra_kwargs: Additional keywords arguments to pass to
            superclass initialiser (useful for graphing)
        """
        self._callback = callback

        states = [
            "_UNINITIALISED",
            "INIT_DISABLE",
            "INIT_UNKNOWN",
            "INIT_OFF",
            "INIT_STANDBY",
            "INIT_ON",
            "INIT_FAULT_STANDBY",
            "INIT_FAULT_ON",
            "DISABLE",
            "UNKNOWN",
            "OFF",
            "STANDBY",
            "ON",
            "FAULT_STANDBY",
            "FAULT_ON",
        ]

        transitions = [
            # Initial transition on the device starting initialisation
            {
                "source": "_UNINITIALISED",
                "trigger": "init_invoked",
                "dest": "INIT_DISABLE",
            },
            # Changes in the state of the monitored component
            # while the device is initialising
            {
                "source": [
                    "INIT_DISABLE",
                    "INIT_UNKNOWN",
                    "INIT_OFF",
                    "INIT_STANDBY",
                    "INIT_ON",
                    "INIT_FAULT_STANDBY",
                    "INIT_FAULT_ON",
                ],
                "trigger": "component_disconnected",
                "dest": "INIT_DISABLE",
            },
            {
                "source": [
                    "INIT_DISABLE",
                    "INIT_UNKNOWN",
                    "INIT_OFF",
                    "INIT_STANDBY",
                    "INIT_ON",
                    "INIT_FAULT_STANDBY",
                    "INIT_FAULT_ON",
                ],
                "trigger": "component_unknown",
                "dest": "INIT_UNKNOWN",
            },
            {
                "source": [
                    "INIT_DISABLE",
                    "INIT_UNKNOWN",
                    "INIT_OFF",
                    "INIT_STANDBY",
                    "INIT_ON",
                    "INIT_FAULT_STANDBY",
                    "INIT_FAULT_ON",
                ],
                "trigger": "component_off",
                "dest": "INIT_OFF",
            },
            {
                "source": [
                    "INIT_DISABLE",
                    "INIT_UNKNOWN",
                    "INIT_OFF",
                    "INIT_STANDBY",
                    "INIT_ON",
                ],
                "trigger": "component_standby",
                "dest": "INIT_STANDBY",
            },
            {
                "source": [
                    "INIT_FAULT_STANDBY",
                    "INIT_FAULT_ON",
                ],
                "trigger": "component_standby",
                "dest": "INIT_FAULT_STANDBY",
            },
            {
                "source": [
                    "INIT_DISABLE",
                    "INIT_UNKNOWN",
                    "INIT_OFF",
                    "INIT_STANDBY",
                    "INIT_ON",
                ],
                "trigger": "component_on",
                "dest": "INIT_ON",
            },
            {
                "source": [
                    "INIT_FAULT_STANDBY",
                    "INIT_FAULT_ON",
                ],
                "trigger": "component_on",
                "dest": "INIT_FAULT_ON",
            },
            {
                "source": ["INIT_STANDBY", "INIT_FAULT_STANDBY"],
                "trigger": "component_fault",
                "dest": "INIT_FAULT_STANDBY",
            },
            {
                "source": ["INIT_ON", "INIT_FAULT_ON"],
                "trigger": "component_fault",
                "dest": "INIT_FAULT_ON",
            },
            {
                "source": ["INIT_STANDBY", "INIT_FAULT_STANDBY"],
                "trigger": "component_no_fault",
                "dest": "INIT_STANDBY",
            },
            {
                "source": ["INIT_ON", "INIT_FAULT_ON"],
                "trigger": "component_no_fault",
                "dest": "INIT_ON",
            },
            # Completion of initialisation
            {
                "source": "INIT_DISABLE",
                "trigger": "init_completed",
                "dest": "DISABLE",
            },
            {
                "source": "INIT_UNKNOWN",
                "trigger": "init_completed",
                "dest": "UNKNOWN",
            },
            {
                "source": "INIT_OFF",
                "trigger": "init_completed",
                "dest": "OFF",
            },
            {
                "source": "INIT_STANDBY",
                "trigger": "init_completed",
                "dest": "STANDBY",
            },
            {
                "source": "INIT_ON",
                "trigger": "init_completed",
                "dest": "ON",
            },
            {
                "source": "INIT_FAULT_STANDBY",
                "trigger": "init_completed",
                "dest": "FAULT_STANDBY",
            },
            {
                "source": "INIT_FAULT_ON",
                "trigger": "init_completed",
                "dest": "FAULT_ON",
            },
            # Changes in state of the monitored component post-initialisation
            {
                "source": [
                    "DISABLE",
                    "UNKNOWN",
                    "OFF",
                    "STANDBY",
                    "ON",
                    "FAULT_STANDBY",
                    "FAULT_ON",
                ],
                "trigger": "component_disconnected",
                "dest": "DISABLE",
            },
            {
                "source": [
                    "DISABLE",
                    "UNKNOWN",
                    "OFF",
                    "STANDBY",
                    "ON",
                    "FAULT_STANDBY",
                    "FAULT_ON",
                ],
                "trigger": "component_unknown",
                "dest": "UNKNOWN",
            },
            {
                "source": [
                    "DISABLE",
                    "UNKNOWN",
                    "OFF",
                    "STANDBY",
                    "ON",
                    "FAULT",
                    "FAULT_STANDBY",
                    "FAULT_ON",
                ],
                "trigger": "component_off",
                "dest": "OFF",
            },
            {
                "source": ["DISABLE", "UNKNOWN", "OFF", "STANDBY", "ON"],
                "trigger": "component_standby",
                "dest": "STANDBY",
            },
            {
                "source": ["FAULT_STANDBY", "FAULT_ON"],
                "trigger": "component_standby",
                "dest": "FAULT_STANDBY",
            },
            {
                "source": ["DISABLE", "UNKNOWN", "OFF", "STANDBY", "ON"],
                "trigger": "component_on",
                "dest": "ON",
            },
            {
                "source": ["FAULT_STANDBY", "FAULT_ON"],
                "trigger": "component_on",
                "dest": "FAULT_ON",
            },
            {
                "source": ["STANDBY", "FAULT_STANDBY"],
                "trigger": "component_fault",
                "dest": "FAULT_STANDBY",
            },
            {
                "source": ["ON", "FAULT_ON"],
                "trigger": "component_fault",
                "dest": "FAULT_ON",
            },
            {
                "source": ["STANDBY", "FAULT_STANDBY"],
                "trigger": "component_no_fault",
                "dest": "STANDBY",
            },
            {
                "source": ["ON", "FAULT_ON"],
                "trigger": "component_no_fault",
                "dest": "ON",
            },
        ]

        super().__init__(
            states=states,
            initial="_UNINITIALISED",
            transitions=transitions,
            after_state_change=self._state_changed,
            **extra_kwargs,
        )
        self._state_changed()

    def _state_changed(self) -> None:
        """
        State machine callback that is called every time the op_state changes.

        Responsible for ensuring that callbacks are called.
        """
        if self._callback is not None:
            self._callback(self.state)


[docs] class OpStateModel: """ This class implements the state model for operational state ("opState"). The model supports the following states, represented as values of the :py:class:`tango.DevState` enum. * **INIT**: the control system is initialising. * **DISABLE**: the control system has been told not to monitor the system under control. * **UNKNOWN**: the control system is monitoring (or at least trying to monitor) the system under control, but is unable to determine its state. * **OFF**: the control system is monitoring the system under control, which is powered off. * **STANDBY**: the control system is monitoring the system under control, which is in low-power standby mode. * **ON**: the control system is monitoring the system under control, which is turned on. * **FAULT**: the control system is monitoring the system under control, which has failed or is in an inconsistent state. The actions supported are: * **init_invoked**: the control system has started initialising. * **init_completed**: the control system has finished initialising. * **component_disconnected**: the control system his disconnected from the system under control (for example because admin mode was set to OFFLINE). Note, this action indicates a deliberate, control-system-initiated, disconnect; a lost connection would be indicated by a "component_unknown" action. * **component_unknown**: the control system is unable to determine the state of the system under control. * **component_off**: the system under control has been switched off * **component_standby**: the system under control has switched to low-power standby mode * **component_on**: the system under control has been switched on. * **component_fault**: the system under control has experienced a fault. * **component_no_fault**: the system under control has stopped experiencing a fault. A diagram of the operational state model, as implemented, is shown below. .. uml:: op_state_model.uml :caption: Diagram of the operational state model The following hierarchical diagram is more explanatory; however note that the implementation does *not* use a hierarchical state machine. .. uml:: op_state_model_hierarchical.uml :caption: Diagram of the operational state model """ def __init__( self, logger: logging.Logger, callback: Callable[[Optional[DevState]], None] | None = None, state_machine_factory: Callable[..., Machine] | None = None, ) -> None: """ Initialise the operational state model. :param logger: the logger to be used by this state model. :param callback: A callback to be called when the state machine for op_state reports a change of state :param state_machine_factory: a callable that returns a state machine for this model to use """ self.logger = logger self._op_state = None self._callback = callback factory = state_machine_factory or _OpStateMachine self._op_state_machine = factory(callback=self._op_state_changed) @property def op_state(self) -> DevState: """ Return the op state. :returns: the op state of this state model """ return self._op_state _op_state_mapping = { "_UNINITIALISED": None, "INIT_DISABLE": DevState.INIT, "INIT_UNKNOWN": DevState.INIT, "INIT_OFF": DevState.INIT, "INIT_STANDBY": DevState.INIT, "INIT_ON": DevState.INIT, "INIT_FAULT_STANDBY": DevState.INIT, "INIT_FAULT_ON": DevState.INIT, "DISABLE": DevState.DISABLE, "UNKNOWN": DevState.UNKNOWN, "OFF": DevState.OFF, "STANDBY": DevState.STANDBY, "ON": DevState.ON, "FAULT_STANDBY": DevState.FAULT, "FAULT_ON": DevState.FAULT, } def _op_state_changed(self, machine_state: str) -> None: """ Handle change of operational state. This is a helper method that updates op_state, ensuring that the callback is called if one exists. :param machine_state: the new state of the operation state machine """ op_state = self._op_state_mapping[machine_state] if self._op_state != op_state: self._op_state = op_state if self._callback is not None: self._callback(op_state) def is_action_allowed(self, action: str, raise_if_disallowed: bool = False) -> bool: """ Return whether a given action is allowed in the current state. :param action: an action, as given in the transitions table :param raise_if_disallowed: whether to raise an exception if the action is disallowed, or merely return False (optional, defaults to False) :raises StateModelError: if the action is unknown to the state machine :return: whether the action is allowed in the current state """ if action in self._op_state_machine.get_triggers(self._op_state_machine.state): return True if raise_if_disallowed: raise StateModelError( f"Action {action} is not allowed in op_state {self.op_state}." ) return False def perform_action(self, action: str) -> None: """ Perform an action on the state model. :param action: an action, as given in the transitions table """ _ = self.is_action_allowed(action, raise_if_disallowed=True) self._op_state_machine.trigger(action) @for_testing_only def _straight_to_state(self, op_state_name: str) -> None: """ Take this op state model straight to the specified underlying op state. This method exists to simplify testing; for example, if testing that a command may be run in a given op state, you can push this state model straight to that op state, rather than having to drive it to that state through a sequence of actions. It is not intended that this method would be called outside of test setups. A warning will be raised if it is. The state must be provided as the string name of a state in the underlying :py:class:`.OpStateMachine`. This machine has more specific states, allowing for more flexibility in test setup. Specifically, there are "DISABLE", "UNKNOWN", "OFF", "STANDBY", "ON" and "FAULT" states, representing states of an initialised device. But instead of a single "INIT" state, there are "INIT_DISABLE", "INIT_UNKNOWN", "INIT_OFF", "INIT_STANDBY", "INIT_ON" and "INIT_FAULT" states, representing an initialised device with the monitored component in a given state. For example, to test that a device transitions out of INIT into STANDBY when the component that it monitors is in low-power standby mode: .. code-block:: py model = OpStateModel(logger) model._straight_to_state("INIT_STANDBY") assert model.op_state == DevState.INIT model.perform_action("init_completed") assert model.op_state == DevState.STANDBY :param op_state_name: the name of a target op state, as used by the underlying :py:class:`.OpStateMachine`. """ getattr(self._op_state_machine, f"to_{op_state_name}")()