# -*- coding: utf-8 -*-
#
# This file is part of the SKA Control System project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
This module specifies the operational state model for the SKA control system.
It consists of:
* an underlying state machine: :py:class:`.OpStateMachine`
* an :py:class:`.OpStateModel` that maps state machine state to device
"op state". This "op state" is currently represented as a
:py:class:`tango.DevState` enum value.
"""
import logging
from typing import Any, Callable, Optional
from tango import DevState
from transitions.extensions import LockedMachine as Machine
from ska_control_model.faults import StateModelError
from ska_control_model.utils import for_testing_only
__all__ = ["OpStateModel"]
class _OpStateMachine(Machine):
"""
State machine representing operational state.
The operational state is either the state of the system under
control, or a state that indicates why the control system is not
currently monitoring the system under control.
The post-init states supported are:
* **DISABLE**: the control system has been told not to monitor the
system under control
* **UNKNOWN**: the control system is monitoring (or at least trying
to monitor) the system under control, but is unable to determine
the system under control's state. Typically, this occurs when
communication between the control system and the system under
control is temporarily disrupted.
* **OFF**: the control system is monitoring the system under
control, which is powered off.
* **STANDBY**: the control system is monitoring the system under
control. which is in low-power standby mode.
* **ON**: the control system is monitoring the system under control,
which is turned on.
* **FAULT_STANDBY**: the control system is monitoring the system
under control, which is in low-power standby mode and has
experienced a fault.
* **FAULT_ON**: the control system is monitoring the system under
control, which is turned on and has experienced a fault.
There are also corresponding initialising states: **INIT_DISABLE**,
**INIT_UNKNOWN**, **INIT_OFF**, **INIT_STANDBY**, **INIT_ON**,
**INIT_FAULT_STANDBY** and **INIT_FAULT_ON**. These states allow for
the underlying system under control to change its state during
initialisation of the control system, and for the control system to
transition to the correct state at the end of initialisation.
Finally, there is an **_UNINITIALISED** starting state, representing
a control system that hasn't started initialising yet.
The actions supported are:
* **init_invoked**: the control system has started initialising
* **init_completed**: the control system has finished initialising
* **component_disconnected**: the control system his disconnected
from the sysem under control (for example because admin mode was
set to OFFLINE). Note, this action indicates a deliberate,
control-system-initiated, disconnect; a lost connection would be
indicated by a "component_fault" or "component_unknown" action,
depending on circumstances.
* **component_unknown**: the control system is unable to determine
the state of the system under control.
* **component_off**: the system under control has been switched off
* **component_standby**: the system under control has switched to
low-power standby mode
* **component_on**: the system under control has been switched on
* **component_fault**: the system under control has experienced a
fault
* **component_no_fault**: the system under control has stopped
experiencing a fault
A diagram of the state machine is shown below. Essentially, the
machine has three "super-states", representing the control system
before, during and after initialisation. Transition between these
"super-states" is triggered by the "init_invoked" and
"init_completed" actions. In the last two "super-states", the
control system monitors the system under control and updates its
state accordingly.
.. uml:: op_state_machine.uml
:caption: Diagram of the op state machine
"""
def __init__(
self,
callback: Callable[[str], None] | None = None,
**extra_kwargs: Any,
) -> None:
"""
Initialise the state model.
:param callback: A callback to be called when a transition
implies a change to op state
:param extra_kwargs: Additional keywords arguments to pass to
superclass initialiser (useful for graphing)
"""
self._callback = callback
states = [
"_UNINITIALISED",
"INIT_DISABLE",
"INIT_UNKNOWN",
"INIT_OFF",
"INIT_STANDBY",
"INIT_ON",
"INIT_FAULT_STANDBY",
"INIT_FAULT_ON",
"DISABLE",
"UNKNOWN",
"OFF",
"STANDBY",
"ON",
"FAULT_STANDBY",
"FAULT_ON",
]
transitions = [
# Initial transition on the device starting initialisation
{
"source": "_UNINITIALISED",
"trigger": "init_invoked",
"dest": "INIT_DISABLE",
},
# Changes in the state of the monitored component
# while the device is initialising
{
"source": [
"INIT_DISABLE",
"INIT_UNKNOWN",
"INIT_OFF",
"INIT_STANDBY",
"INIT_ON",
"INIT_FAULT_STANDBY",
"INIT_FAULT_ON",
],
"trigger": "component_disconnected",
"dest": "INIT_DISABLE",
},
{
"source": [
"INIT_DISABLE",
"INIT_UNKNOWN",
"INIT_OFF",
"INIT_STANDBY",
"INIT_ON",
"INIT_FAULT_STANDBY",
"INIT_FAULT_ON",
],
"trigger": "component_unknown",
"dest": "INIT_UNKNOWN",
},
{
"source": [
"INIT_DISABLE",
"INIT_UNKNOWN",
"INIT_OFF",
"INIT_STANDBY",
"INIT_ON",
"INIT_FAULT_STANDBY",
"INIT_FAULT_ON",
],
"trigger": "component_off",
"dest": "INIT_OFF",
},
{
"source": [
"INIT_DISABLE",
"INIT_UNKNOWN",
"INIT_OFF",
"INIT_STANDBY",
"INIT_ON",
],
"trigger": "component_standby",
"dest": "INIT_STANDBY",
},
{
"source": [
"INIT_FAULT_STANDBY",
"INIT_FAULT_ON",
],
"trigger": "component_standby",
"dest": "INIT_FAULT_STANDBY",
},
{
"source": [
"INIT_DISABLE",
"INIT_UNKNOWN",
"INIT_OFF",
"INIT_STANDBY",
"INIT_ON",
],
"trigger": "component_on",
"dest": "INIT_ON",
},
{
"source": [
"INIT_FAULT_STANDBY",
"INIT_FAULT_ON",
],
"trigger": "component_on",
"dest": "INIT_FAULT_ON",
},
{
"source": ["INIT_STANDBY", "INIT_FAULT_STANDBY"],
"trigger": "component_fault",
"dest": "INIT_FAULT_STANDBY",
},
{
"source": ["INIT_ON", "INIT_FAULT_ON"],
"trigger": "component_fault",
"dest": "INIT_FAULT_ON",
},
{
"source": ["INIT_STANDBY", "INIT_FAULT_STANDBY"],
"trigger": "component_no_fault",
"dest": "INIT_STANDBY",
},
{
"source": ["INIT_ON", "INIT_FAULT_ON"],
"trigger": "component_no_fault",
"dest": "INIT_ON",
},
# Completion of initialisation
{
"source": "INIT_DISABLE",
"trigger": "init_completed",
"dest": "DISABLE",
},
{
"source": "INIT_UNKNOWN",
"trigger": "init_completed",
"dest": "UNKNOWN",
},
{
"source": "INIT_OFF",
"trigger": "init_completed",
"dest": "OFF",
},
{
"source": "INIT_STANDBY",
"trigger": "init_completed",
"dest": "STANDBY",
},
{
"source": "INIT_ON",
"trigger": "init_completed",
"dest": "ON",
},
{
"source": "INIT_FAULT_STANDBY",
"trigger": "init_completed",
"dest": "FAULT_STANDBY",
},
{
"source": "INIT_FAULT_ON",
"trigger": "init_completed",
"dest": "FAULT_ON",
},
# Changes in state of the monitored component post-initialisation
{
"source": [
"DISABLE",
"UNKNOWN",
"OFF",
"STANDBY",
"ON",
"FAULT_STANDBY",
"FAULT_ON",
],
"trigger": "component_disconnected",
"dest": "DISABLE",
},
{
"source": [
"DISABLE",
"UNKNOWN",
"OFF",
"STANDBY",
"ON",
"FAULT_STANDBY",
"FAULT_ON",
],
"trigger": "component_unknown",
"dest": "UNKNOWN",
},
{
"source": [
"DISABLE",
"UNKNOWN",
"OFF",
"STANDBY",
"ON",
"FAULT",
"FAULT_STANDBY",
"FAULT_ON",
],
"trigger": "component_off",
"dest": "OFF",
},
{
"source": ["DISABLE", "UNKNOWN", "OFF", "STANDBY", "ON"],
"trigger": "component_standby",
"dest": "STANDBY",
},
{
"source": ["FAULT_STANDBY", "FAULT_ON"],
"trigger": "component_standby",
"dest": "FAULT_STANDBY",
},
{
"source": ["DISABLE", "UNKNOWN", "OFF", "STANDBY", "ON"],
"trigger": "component_on",
"dest": "ON",
},
{
"source": ["FAULT_STANDBY", "FAULT_ON"],
"trigger": "component_on",
"dest": "FAULT_ON",
},
{
"source": ["STANDBY", "FAULT_STANDBY"],
"trigger": "component_fault",
"dest": "FAULT_STANDBY",
},
{
"source": ["ON", "FAULT_ON"],
"trigger": "component_fault",
"dest": "FAULT_ON",
},
{
"source": ["STANDBY", "FAULT_STANDBY"],
"trigger": "component_no_fault",
"dest": "STANDBY",
},
{
"source": ["ON", "FAULT_ON"],
"trigger": "component_no_fault",
"dest": "ON",
},
]
super().__init__(
states=states,
initial="_UNINITIALISED",
transitions=transitions,
after_state_change=self._state_changed,
**extra_kwargs,
)
self._state_changed()
def _state_changed(self) -> None:
"""
State machine callback that is called every time the op_state changes.
Responsible for ensuring that callbacks are called.
"""
if self._callback is not None:
self._callback(self.state)
[docs]
class OpStateModel:
"""
This class implements the state model for operational state ("opState").
The model supports the following states, represented as values of
the :py:class:`tango.DevState` enum.
* **INIT**: the control system is initialising.
* **DISABLE**: the control system has been told not to monitor the
system under control.
* **UNKNOWN**: the control system is monitoring (or at least trying
to monitor) the system under control, but is unable to determine
its state.
* **OFF**: the control system is monitoring the system under
control, which is powered off.
* **STANDBY**: the control system is monitoring the system under
control, which is in low-power standby mode.
* **ON**: the control system is monitoring the system under control,
which is turned on.
* **FAULT**: the control system is monitoring the system under
control, which has failed or is in an inconsistent state.
The actions supported are:
* **init_invoked**: the control system has started initialising.
* **init_completed**: the control system has finished initialising.
* **component_disconnected**: the control system his disconnected
from the system under control (for example because admin mode was
set to OFFLINE). Note, this action indicates a deliberate,
control-system-initiated, disconnect; a lost connection would be
indicated by a "component_unknown" action.
* **component_unknown**: the control system is unable to determine
the state of the system under control.
* **component_off**: the system under control has been switched off
* **component_standby**: the system under control has switched to
low-power standby mode
* **component_on**: the system under control has been switched on.
* **component_fault**: the system under control has experienced a
fault.
* **component_no_fault**: the system under control has stopped
experiencing a fault.
A diagram of the operational state model, as implemented, is shown
below.
.. uml:: op_state_model.uml
:caption: Diagram of the operational state model
The following hierarchical diagram is more explanatory; however note
that the implementation does *not* use a hierarchical state machine.
.. uml:: op_state_model_hierarchical.uml
:caption: Diagram of the operational state model
"""
def __init__(
self,
logger: logging.Logger,
callback: Callable[[Optional[DevState]], None] | None = None,
state_machine_factory: Callable[..., Machine] | None = None,
) -> None:
"""
Initialise the operational state model.
:param logger: the logger to be used by this state model.
:param callback: A callback to be called when the state machine
for op_state reports a change of state
:param state_machine_factory: a callable that returns a
state machine for this model to use
"""
self.logger = logger
self._op_state = None
self._callback = callback
factory = state_machine_factory or _OpStateMachine
self._op_state_machine = factory(callback=self._op_state_changed)
@property
def op_state(self) -> DevState:
"""
Return the op state.
:returns: the op state of this state model
"""
return self._op_state
_op_state_mapping = {
"_UNINITIALISED": None,
"INIT_DISABLE": DevState.INIT,
"INIT_UNKNOWN": DevState.INIT,
"INIT_OFF": DevState.INIT,
"INIT_STANDBY": DevState.INIT,
"INIT_ON": DevState.INIT,
"INIT_FAULT_STANDBY": DevState.INIT,
"INIT_FAULT_ON": DevState.INIT,
"DISABLE": DevState.DISABLE,
"UNKNOWN": DevState.UNKNOWN,
"OFF": DevState.OFF,
"STANDBY": DevState.STANDBY,
"ON": DevState.ON,
"FAULT_STANDBY": DevState.FAULT,
"FAULT_ON": DevState.FAULT,
}
def _op_state_changed(self, machine_state: str) -> None:
"""
Handle change of operational state.
This is a helper method that updates op_state, ensuring that the
callback is called if one exists.
:param machine_state: the new state of the operation state
machine
"""
op_state = self._op_state_mapping[machine_state]
if self._op_state != op_state:
self._op_state = op_state
if self._callback is not None:
self._callback(op_state)
def is_action_allowed(self, action: str, raise_if_disallowed: bool = False) -> bool:
"""
Return whether a given action is allowed in the current state.
:param action: an action, as given in the transitions table
:param raise_if_disallowed: whether to raise an exception if the
action is disallowed, or merely return False (optional,
defaults to False)
:raises StateModelError: if the action is unknown to the state
machine
:return: whether the action is allowed in the current state
"""
if action in self._op_state_machine.get_triggers(self._op_state_machine.state):
return True
if raise_if_disallowed:
raise StateModelError(
f"Action {action} is not allowed in op_state {self.op_state}."
)
return False
def perform_action(self, action: str) -> None:
"""
Perform an action on the state model.
:param action: an action, as given in the transitions table
"""
_ = self.is_action_allowed(action, raise_if_disallowed=True)
self._op_state_machine.trigger(action)
@for_testing_only
def _straight_to_state(self, op_state_name: str) -> None:
"""
Take this op state model straight to the specified underlying op state.
This method exists to simplify testing; for example, if testing
that a command may be run in a given op state, you can push
this state model straight to that op state, rather than having
to drive it to that state through a sequence of actions. It is
not intended that this method would be called outside of test
setups. A warning will be raised if it is.
The state must be provided as the string name of a state in the
underlying :py:class:`.OpStateMachine`. This machine has more
specific states, allowing for more flexibility in test setup.
Specifically, there are "DISABLE", "UNKNOWN", "OFF", "STANDBY",
"ON" and "FAULT" states, representing states of an initialised
device. But instead of a single "INIT" state, there are
"INIT_DISABLE", "INIT_UNKNOWN", "INIT_OFF", "INIT_STANDBY",
"INIT_ON" and "INIT_FAULT" states, representing an initialised
device with the monitored component in a given state.
For example, to test that a device transitions out of INIT into
STANDBY when the component that it monitors is in low-power
standby mode:
.. code-block:: py
model = OpStateModel(logger)
model._straight_to_state("INIT_STANDBY")
assert model.op_state == DevState.INIT
model.perform_action("init_completed")
assert model.op_state == DevState.STANDBY
:param op_state_name: the name of a target op state, as used by
the underlying :py:class:`.OpStateMachine`.
"""
getattr(self._op_state_machine, f"to_{op_state_name}")()