# -*- coding: utf-8 -*-
#
# This file is part of the SKA Control System project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""This module defines an enumerated type for observing state."""
import enum
import logging
from typing import Any, Callable, Final
from transitions.extensions import LockedMachine as Machine
from ska_control_model.faults import StateModelError
from ska_control_model.utils import for_testing_only
class ObsState(enum.IntEnum):
    """
    Python enumerated type for observing state.

    Members are plain integers numbered contiguously from 0 (EMPTY) to
    10 (RESTARTING). Each member is documented by the string literal
    that follows it.
    """

    EMPTY = 0
    """The sub-array has no resources allocated and is unconfigured."""

    RESOURCING = 1
    """
    Resources are being allocated to, or deallocated from, the subarray.

    In normal science operations these will be the resources required
    for the upcoming SBI execution.

    This may be a complete de/allocation, or it may be incremental. In
    both cases it is a transient state; when the resourcing operation
    completes, the subarray will automatically transition to EMPTY or
    IDLE, according to whether the subarray ended up having resources or
    not.

    For some subsystems this may be a very brief state if resourcing is
    a quick activity.
    """

    IDLE = 2
    """The subarray has resources allocated but is unconfigured."""

    CONFIGURING = 3
    """
    The subarray is being configured for an observation.

    This is a transient state; the subarray will automatically
    transition to READY when configuring completes normally.
    """

    READY = 4
    """
    The subarray is fully prepared to scan, but is not scanning.

    It may be tracked, but it is not moving in the observed coordinate
    system, nor is it taking data.
    """

    SCANNING = 5
    """
    The subarray is scanning.

    It is taking data and, if needed, all components are synchronously
    moving in the observed coordinate system.

    Any changes to the sub-systems are happening automatically (this
    allows for a scan to cover the case where the phase centre is moved
    in a pre-defined pattern).
    """

    ABORTING = 6
    """The subarray has been interrupted and is aborting what it was doing."""

    ABORTED = 7
    """The subarray is in an aborted state."""

    RESETTING = 8
    """The subarray device is resetting to a base (EMPTY or IDLE) state."""

    FAULT = 9
    """The subarray has detected an error in its observing state."""

    RESTARTING = 10
    """
    The subarray device is restarting.

    After restarting, the subarray will return to EMPTY state, with no
    allocated resources and no configuration defined.
    """
class _ObsStateMachine(Machine):
    """
    Fine-grained state machine that underlies the observation state.

    The machine is essentially the one agreed in ADR-8, modified so
    that abort is also allowed from RESOURCING.

    In addition, some ADR-8 states are split into sub-states, because
    the machine has to respond both to commands and to changes reported
    by the monitored component.

    For example, ADR-8 describes configuration as IDLE -> CONFIGURING
    -> READY. In this machine the same sequence is:

    1. The Configure() command triggers the "configure_invoked" action,
       moving the machine from IDLE to CONFIGURING_IDLE.

    2. While Configure() drives its component, the component triggers
       the "component_configured" action, moving the machine from
       CONFIGURING_IDLE to CONFIGURING_READY.

    3. When configuration finishes, the "configure_completed" action
       moves the machine from CONFIGURING_READY to READY.

    Hence the ADR-8 state CONFIGURING is represented here by the two
    sub-states CONFIGURING_IDLE and CONFIGURING_READY.

    The full list of supported states is:

    * **EMPTY**: the subarray is unresourced
    * **RESOURCING_EMPTY**: the subarray is unresourced, but performing
      a resourcing operation
    * **RESOURCING_IDLE**: the subarray is resourced, and currently
      performing a resourcing operation
    * **IDLE**: the subarray is resourced but unconfigured
    * **CONFIGURING_IDLE**: the subarray is resourced but unconfigured;
      it is currently performing a configuring operation
    * **CONFIGURING_READY**: the subarray is resourced and configured;
      it is currently performing a configuring operation
    * **READY**: the subarray is resourced and configured
    * **SCANNING**: the subarray is scanning
    * **ABORTING**: the subarray is aborting
    * **ABORTING_EMPTY**: the subarray is aborting (while unresourced)
    * **ABORTED**: the subarray has aborted
    * **ABORTED_EMPTY**: the subarray has aborted (while unresourced)
    * **RESETTING**: the subarray is resetting from an ABORTED or FAULT
      state back to IDLE
    * **RESETTING_EMPTY**: the subarray is unresourced, and is resetting
      from an ABORTED_EMPTY or FAULT_EMPTY state back to EMPTY
    * **RESTARTING**: the subarray is restarting from an ABORTED or
      FAULT (or ABORTED_EMPTY or FAULT_EMPTY) state back to EMPTY; the
      transition table also accepts "restart_invoked" from EMPTY
    * **FAULT**: the subarray has encountered an observation fault.
    * **FAULT_EMPTY**: the subarray has encountered an observation fault
      (whilst unresourced).

    The supported actions are either command-oriented or
    component-monitoring actions.

    The command-oriented actions are:

    * **assign_invoked** and **assign_completed**: bookending the
      AssignResources() command, and hence the RESOURCING transitional
      state
    * **release_invoked** and **release_completed**: bookending the
      ReleaseResources() and ReleaseAllResources() commands, hence the
      RESOURCING transitional state
    * **configure_invoked** and **configure_completed**: bookending the
      Configure() command, and hence the CONFIGURING state
    * **abort_invoked** and **abort_completed**: bookending the Abort()
      command, and hence the ABORTING state
    * **obsreset_invoked** and **obsreset_completed**: bookending the
      ObsReset() command, and hence the RESETTING state
    * **restart_invoked** and **restart_completed**: bookending the
      Restart() command, and hence the RESTARTING state
    * **end_invoked**, **scan_invoked**, **end_scan_invoked**: these
      result in reflexive transitions, and are purely there to indicate
      states in which the End(), Scan() and EndScan() commands are
      permitted to be run

    The component-oriented actions are:

    * **component_obsfault**: the monitored component has experienced an
      observation fault
    * **component_unresourced**: the monitored component has become
      unresourced
    * **component_resourced**: the monitored component has become
      resourced
    * **component_unconfigured**: the monitored component has become
      unconfigured
    * **component_configured**: the monitored component has become
      configured
    * **component_scanning**: the monitored component has started
      scanning
    * **component_not_scanning**: the monitored component has stopped
      scanning

    A diagram of the state machine is shown below. Reflexive transitions
    and transitions to FAULT obs state are omitted to simplify the
    diagram.

    .. uml:: obs_state_machine.uml
       :caption: Diagram of the subarray obs state machine
    """

    def __init__(
        self,
        callback: Callable[[str], None] | None = None,
        **extra_kwargs: Any,
    ) -> None:
        """
        Build the machine and report its initial state.

        :param callback: optional callable that receives the name of the
            machine state after every state change, and once with the
            initial state at the end of construction
        :param extra_kwargs: additional keyword arguments forwarded
            unchanged to the super class initialiser (useful for
            graphing)
        """
        # Stored before the base initialiser runs, as in the original
        # ordering, so the hook below can always find it.
        self._callback = callback

        state_names = [
            "EMPTY",
            "RESOURCING_EMPTY",
            "RESOURCING_IDLE",
            "IDLE",
            "CONFIGURING_IDLE",
            "CONFIGURING_READY",
            "READY",
            "SCANNING",
            "ABORTING",
            "ABORTING_EMPTY",
            "ABORTED",
            "ABORTED_EMPTY",
            "RESETTING",
            "RESETTING_EMPTY",
            "RESTARTING",
            "FAULT",
            "FAULT_EMPTY",
        ]

        # One row per transition: (source state(s), trigger, destination).
        # Row order is kept exactly as the machine was originally defined.
        rows = [
            # --- observation faults --------------------------------------
            (
                [
                    "EMPTY",
                    "RESOURCING_EMPTY",
                    "ABORTING_EMPTY",
                    "ABORTED_EMPTY",
                    "RESETTING_EMPTY",
                    "FAULT_EMPTY",
                ],
                "component_obsfault",
                "FAULT_EMPTY",
            ),
            (
                [
                    "RESOURCING_IDLE",
                    "IDLE",
                    "CONFIGURING_IDLE",
                    "CONFIGURING_READY",
                    "READY",
                    "SCANNING",
                    "ABORTING",
                    "ABORTED",
                    "RESETTING",
                    "RESTARTING",
                    "FAULT",
                ],
                "component_obsfault",
                "FAULT",
            ),
            # --- resourcing ----------------------------------------------
            ("EMPTY", "assign_invoked", "RESOURCING_EMPTY"),
            ("EMPTY", "release_invoked", "RESOURCING_EMPTY"),
            ("IDLE", "assign_invoked", "RESOURCING_IDLE"),
            ("IDLE", "release_invoked", "RESOURCING_IDLE"),
            ("RESOURCING_EMPTY", "component_resourced", "RESOURCING_IDLE"),
            ("RESOURCING_IDLE", "component_unresourced", "RESOURCING_EMPTY"),
            ("RESOURCING_EMPTY", "assign_completed", "EMPTY"),
            ("RESOURCING_EMPTY", "release_completed", "EMPTY"),
            ("RESOURCING_IDLE", "assign_completed", "IDLE"),
            ("RESOURCING_IDLE", "release_completed", "IDLE"),
            # --- configuring ---------------------------------------------
            ("IDLE", "configure_invoked", "CONFIGURING_IDLE"),
            ("CONFIGURING_IDLE", "configure_completed", "IDLE"),
            ("READY", "configure_invoked", "CONFIGURING_READY"),
            ("CONFIGURING_IDLE", "component_configured", "CONFIGURING_READY"),
            ("CONFIGURING_READY", "configure_completed", "READY"),
            # --- ready / scanning ----------------------------------------
            ("READY", "end_invoked", "READY"),
            ("READY", "component_unconfigured", "IDLE"),
            ("READY", "scan_invoked", "READY"),
            ("READY", "component_scanning", "SCANNING"),
            ("SCANNING", "end_scan_invoked", "SCANNING"),
            ("SCANNING", "component_not_scanning", "READY"),
            # --- aborting ------------------------------------------------
            (
                ["RESOURCING_EMPTY", "RESETTING_EMPTY"],
                "abort_invoked",
                "ABORTING_EMPTY",
            ),
            (
                [
                    "RESOURCING_IDLE",
                    "IDLE",
                    "CONFIGURING_IDLE",
                    "CONFIGURING_READY",
                    "READY",
                    "SCANNING",
                    "RESETTING",
                ],
                "abort_invoked",
                "ABORTING",
            ),
            # Aborting implies trying to stop the monitored component
            # while it is doing something. Thus the monitored component
            # may send some events while in aborting state.
            ("ABORTING_EMPTY", "component_resourced", "ABORTING"),
            ("ABORTING", "component_unresourced", "ABORTING_EMPTY"),
            ("ABORTING", "component_unconfigured", "ABORTING"),
            ("ABORTING", "component_configured", "ABORTING"),
            ("ABORTING", "component_not_scanning", "ABORTING"),
            ("ABORTING", "component_scanning", "ABORTING"),
            ("ABORTING_EMPTY", "abort_completed", "ABORTED_EMPTY"),
            ("ABORTING", "abort_completed", "ABORTED"),
            # --- obs reset -----------------------------------------------
            (
                ["ABORTED_EMPTY", "FAULT_EMPTY"],
                "obsreset_invoked",
                "RESETTING_EMPTY",
            ),
            (["ABORTED", "FAULT"], "obsreset_invoked", "RESETTING"),
            ("RESETTING", "component_unconfigured", "RESETTING"),
            ("RESETTING", "obsreset_completed", "IDLE"),
            ("RESETTING_EMPTY", "obsreset_completed", "EMPTY"),
            # --- restart -------------------------------------------------
            (
                ["ABORTED", "ABORTED_EMPTY", "FAULT", "FAULT_EMPTY", "EMPTY"],
                "restart_invoked",
                "RESTARTING",
            ),
            ("RESTARTING", "component_unconfigured", "RESTARTING"),
            ("RESTARTING", "component_unresourced", "RESTARTING"),
            ("RESTARTING", "restart_completed", "EMPTY"),
        ]

        # Expand the compact rows into the dict form passed to the base
        # class, preserving row order.
        transition_table = [
            {"source": origin, "trigger": action, "dest": target}
            for origin, action, target in rows
        ]

        super().__init__(
            states=state_names,
            initial="EMPTY",
            transitions=transition_table,
            after_state_change=self._state_changed,
            **extra_kwargs,
        )

        # Invoke the hook once by hand so the callback also receives the
        # initial state.
        self._state_changed()

    def _state_changed(self) -> None:
        """
        Forward the current machine state to the registered callback.

        Registered as the machine's ``after_state_change`` hook, and
        also called once at the end of construction. Does nothing when
        no callback was supplied.
        """
        notify = self._callback
        if notify is None:
            return
        notify(self.state)
class ObsStateModel:
    """
    Implements the observation state model for subarray.

    The model supports all of the states of the
    :py:class:`~ska_control_model.obs_state.ObsState` enum:

    * **EMPTY**: the subarray is unresourced
    * **RESOURCING**: the subarray is performing a resourcing operation
    * **IDLE**: the subarray is resourced but unconfigured
    * **CONFIGURING**: the subarray is performing a configuring
      operation
    * **READY**: the subarray is resourced and configured
    * **SCANNING**: the subarray is scanning
    * **ABORTING**: the subarray is aborting
    * **ABORTED**: the subarray has aborted
    * **RESETTING**: the subarray is resetting from an ABORTED or FAULT
      state back to IDLE
    * **RESTARTING**: the subarray is restarting from an ABORTED or
      FAULT state back to EMPTY
    * **FAULT**: the subarray has encountered an observation fault.

    A diagram of the subarray observation state model is shown below.
    This model is non-deterministic as diagrammed, but the underlying
    state machine has extra states and transitions that render it
    deterministic. This class simply maps those extra states onto
    valid ObsState values.

    .. uml:: obs_state_model.uml
       :caption: Diagram of the subarray observation state model
    """

    # Maps each fine-grained machine state name to the coarser ObsState
    # value that is published; several machine states share one ObsState.
    _OBS_STATE_MAPPING: Final[dict[str, ObsState]] = {
        "EMPTY": ObsState.EMPTY,
        "RESOURCING_EMPTY": ObsState.RESOURCING,
        "RESOURCING_IDLE": ObsState.RESOURCING,
        "IDLE": ObsState.IDLE,
        "CONFIGURING_IDLE": ObsState.CONFIGURING,
        "CONFIGURING_READY": ObsState.CONFIGURING,
        "READY": ObsState.READY,
        "SCANNING": ObsState.SCANNING,
        "ABORTING": ObsState.ABORTING,
        "ABORTING_EMPTY": ObsState.ABORTING,
        "ABORTED": ObsState.ABORTED,
        "ABORTED_EMPTY": ObsState.ABORTED,
        "RESETTING": ObsState.RESETTING,
        "RESETTING_EMPTY": ObsState.RESETTING,
        "RESTARTING": ObsState.RESTARTING,
        "FAULT": ObsState.FAULT,
        "FAULT_EMPTY": ObsState.FAULT,
    }

    def __init__(
        self,
        logger: logging.Logger,
        callback: Callable[[ObsState], None] | None = None,
        state_machine_factory: Callable[..., Machine] = _ObsStateMachine,
    ) -> None:
        """
        Initialise the model.

        :param logger: the logger to be used by this state model.
        :param callback: A callback to be called when a transition
            causes a change to device obs_state
        :param state_machine_factory: a callable that returns a
            state machine for this model to use; it is called with a
            single ``callback`` keyword argument
        """
        self.logger = logger

        # Both attributes must exist before the machine is built: the
        # default machine invokes its callback during construction, and
        # that lands in _obs_state_changed, which reads them.
        self._obs_state: ObsState | None = None
        self._callback = callback

        self._obs_state_machine = state_machine_factory(
            callback=self._obs_state_changed
        )

    @property
    def obs_state(self) -> ObsState | None:
        """
        Return the obs_state.

        :returns: obs_state of this state model, or None if the state
            machine has not yet reported a state
        """
        return self._obs_state

    def _obs_state_changed(self, machine_state: str) -> None:
        """
        Handle change in observation state.

        This is a helper method that updates obs_state, ensuring that
        the callback is called if one exists. Machine state changes that
        map onto the current obs_state (for example RESOURCING_EMPTY to
        RESOURCING_IDLE) are not propagated.

        :param machine_state: the new state of the observation state
            machine

        :raises KeyError: if the machine state has no ObsState mapping
        """
        obs_state = self._OBS_STATE_MAPPING[machine_state]
        if self._obs_state != obs_state:
            self._obs_state = obs_state
            if self._callback is not None:
                self._callback(obs_state)

    def is_action_allowed(self, action: str, raise_if_disallowed: bool = False) -> bool:
        """
        Return whether a given action is allowed in the current state.

        :param action: an action, as given in the transitions table
        :param raise_if_disallowed: whether to raise an exception if the
            action is disallowed, or merely return False (optional,
            defaults to False)

        :raises StateModelError: if the action is not allowed in the
            current state and ``raise_if_disallowed`` is true

        :return: whether the action is allowed in the current state
        """
        # NOTE(review): get_triggers presumably also lists the machine's
        # auto-generated "to_<STATE>" triggers, so such names would be
        # reported as allowed here -- confirm whether that is intended.
        if action in self._obs_state_machine.get_triggers(
            self._obs_state_machine.state
        ):
            return True

        if raise_if_disallowed:
            raise StateModelError(
                f"Action {action} is not allowed in obs state "
                f"{'None' if self.obs_state is None else self.obs_state.name}."
            )
        return False

    def perform_action(self, action: str) -> None:
        """
        Perform an action on the state model.

        :param action: an action, as given in the transitions table

        :raises StateModelError: if the action is not allowed in the
            current state
        """
        # Called only for its raising side effect; the boolean result is
        # always True when it returns.
        self.is_action_allowed(action, raise_if_disallowed=True)
        self._obs_state_machine.trigger(action)

    @for_testing_only
    def _straight_to_state(self, obs_state_name: ObsState | str) -> None:
        """
        Take this model straight to the specified state.

        This method exists to simplify testing; for example, if testing
        that a command may be run in a given ObsState, one can push this
        state model straight to that ObsState, rather than having to
        drive it to that state through a sequence of actions. It is not
        intended that this method would be called outside of test
        setups. A warning will be raised if it is.

        For example, to test that a device transitions from SCANNING to
        ABORTING when the Abort() command is called:

        .. code-block:: py

            model = ObsStateModel(logger)
            model._straight_to_state("SCANNING")
            assert model.obs_state == ObsState.SCANNING
            model.perform_action("abort_invoked")
            assert model.obs_state == ObsState.ABORTING

        :param obs_state_name: the target state, given either as the
            name of a state of the underlying state machine (for
            example ``"SCANNING"`` or ``"RESOURCING_IDLE"``) or as an
            :py:class:`ObsState` member, in which case the member's name
            is used. Names that are not machine states (such as
            ``RESOURCING`` or ``CONFIGURING`` with the default machine)
            cannot be resolved.
        """
        if isinstance(obs_state_name, ObsState):
            # An IntEnum member formats as its integer value, which
            # would give a non-existent trigger such as "to_5"; use the
            # member's name instead.
            state_name = obs_state_name.name
        else:
            state_name = obs_state_name
        getattr(self._obs_state_machine, f"to_{state_name}")()