Source code for ska_control_model.obs_state

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Control System project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""This module defines an enumerated type for observing state."""

import enum
import logging
from typing import Any, Callable, Final

from transitions.extensions import LockedMachine as Machine

from ska_control_model.faults import StateModelError
from ska_control_model.utils import for_testing_only


[docs] class ObsState(enum.IntEnum): """Python enumerated type for observing state.""" EMPTY = 0 """The sub-array has no resources allocated and is unconfigured.""" RESOURCING = 1 """ Resources are being allocated to, or deallocated from, the subarray. In normal science operations these will be the resources required for the upcoming SBI execution. This may be a complete de/allocation, or it may be incremental. In both cases it is a transient state; when the resourcing operation completes, the subarray will automatically transition to EMPTY or IDLE, according to whether the subarray ended up having resources or not. For some subsystems this may be a very brief state if resourcing is a quick activity. """ IDLE = 2 """The subarray has resources allocated but is unconfigured.""" CONFIGURING = 3 """ The subarray is being configured for an observation. This is a transient state; the subarray will automatically transition to READY when configuring completes normally. """ READY = 4 """ The subarray is fully prepared to scan, but is not scanning. It may be tracked, but it is not moving in the observed coordinate system, nor is it taking data. """ SCANNING = 5 """ The subarray is scanning. It is taking data and, if needed, all components are synchronously moving in the observed coordinate system. Any changes to the sub-systems are happening automatically (this allows for a scan to cover the case where the phase centre is moved in a pre-defined pattern). """ ABORTING = 6 """The subarray has been interrupted and is aborting what it was doing.""" ABORTED = 7 """The subarray is in an aborted state.""" RESETTING = 8 """The subarray device is resetting to a base (EMPTY or IDLE) state.""" FAULT = 9 """The subarray has detected an error in its observing state.""" RESTARTING = 10 """ The subarray device is restarting. After restarting, the subarray will return to EMPTY state, with no allocated resources and no configuration defined. """
class _ObsStateMachine(Machine): """ State machine for observation state. The machine implemented is essentially as agreed in ADR-8, but modified to allow abort from RESOURCING. Also, some states are broken down into sub-states to account for the interactions between commands and monitoring of the underlying component. For example, ADR-8 says that a configuring subarray moves from IDLE to CONFIGURING to READY. But in a device model where the state machine is responsive to both commands and changes to the monitored component, the sequence is better represented as follows: 1. The Configure() command triggers the "configure_invoked" action on the state machine, resulting in a transition from IDLE to CONFIGURING_IDLE 2. The Configure() command invokes methods on its component in order to effect configuration. At some point in this process, the component triggers the "component_configured" action on the state machine, resulting in a transition from CONFIGURING_IDLE to CONFIGURING_READY. 3. At completion of configuration, the action "configure_completed" is triggered on the state machine, resulting in a transition from CONFIGURING_READY to READY. Thus, this machine contains substates CONFIGURING_IDLE and CONFIGURING_READY, rather than the ADR-8 state CONFIGURING The full list of supported states are: * **EMPTY**: the subarray is unresourced * **RESOURCING_EMPTY**: the subarray is unresourced, but performing a resourcing operation * **RESOURCING_IDLE**: the subarray is resourced, and currently performing a resourcing operation * **IDLE**: the subarray is resourced but unconfigured * **CONFIGURING_IDLE**: the subarray is resourced but unconfigured; it is currently performing a configuring operation * **CONFIGURING_READY**: the subarray is resourced and configured; it is currently performing a configuring operation * **READY**: the subarray is resourced and configured * **SCANNING**: the subarray is scanning * **ABORTING**: the subarray is aborting * **ABORTING_EMPTY**: the subarray is aborting (while unresourced) * **ABORTED**: the subarray has aborted * **ABORTED_EMPTY**: the subarray has aborted (while unresourced) * **RESETTING**: the subarray is resetting from an ABORTED or FAULT state back to IDLE * **RESETTING_EMPTY**: the subarray is unresourced, and is resetting from an ABORTED_EMPTY or FAULT_EMPTY state back to EMPTY * **RESTARTING**: the subarray is restarting from an ABORTED or FAULT (or ABORTED_EMPTY or FAULT_EMPTY) state back to EMPTY * **FAULT**: the subarray has encountered an observation fault. * **FAULT_EMPTY**: the subarray has encountered an observation fault (whilst unresourced). The actions supported divide into command-oriented actions and component monitoring actions. The command-oriented actions are: * **assign_invoked** and **assign_completed**: bookending the AssignResources() command, and hence the RESOURCING transitional state * **release_invoked** and **release_completed**: bookending the ReleaseResources() and ReleaseAllResources() commands, hence the RESOURCING transitional state * **configure_invoked** and **configure_completed**: bookending the Configure() command, and hence the CONFIGURING state * **abort_invoked** and **abort_completed**: bookending the Abort() command, and hence the ABORTING state * **obsreset_invoked** and **obsreset_completed**: bookending the ObsReset() command, and hence the OBSRESETTING state * **restart_invoked** and **restart_completed**: bookending the Restart() command, and hence the RESTARTING state * **end_invoked**, **scan_invoked**, **end_scan_invoked**: these result in reflexive transitions, and are purely there to indicate states in which the End(), Scan() and EndScan() commands are permitted to be run The component-oriented actions are: * **component_obsfault**: the monitored component has experienced an observation fault * **component_unresourced**: the monitored component has become unresourced * **component_resourced**: the monitored component has become resourced * **component_unconfigured**: the monitored component has become unconfigured * **component_configured**: the monitored component has become configured * **component_scanning**: the monitored component has started scanning * **component_not_scanning**: the monitored component has stopped scanning A diagram of the state machine is shown below. Reflexive transitions and transitions to FAULT obs state are omitted to simplify the diagram. .. uml:: obs_state_machine.uml :caption: Diagram of the subarray obs state machine """ def __init__( self, callback: Callable[[str], None] | None = None, **extra_kwargs: Any, ) -> None: """ Initialise the model. :param callback: A callback to be called when the state changes :param extra_kwargs: Additional keywords arguments to pass to super class initialiser (useful for graphing) """ self._callback = callback states = [ "EMPTY", "RESOURCING_EMPTY", "RESOURCING_IDLE", "IDLE", "CONFIGURING_IDLE", "CONFIGURING_READY", "READY", "SCANNING", "ABORTING", "ABORTING_EMPTY", "ABORTED", "ABORTED_EMPTY", "RESETTING", "RESETTING_EMPTY", "RESTARTING", "FAULT", "FAULT_EMPTY", ] transitions = [ { "source": [ "EMPTY", "RESOURCING_EMPTY", "ABORTING_EMPTY", "ABORTED_EMPTY", "RESETTING_EMPTY", "FAULT_EMPTY", ], "trigger": "component_obsfault", "dest": "FAULT_EMPTY", }, { "source": [ "RESOURCING_IDLE", "IDLE", "CONFIGURING_IDLE", "CONFIGURING_READY", "READY", "SCANNING", "ABORTING", "ABORTED", "RESETTING", "RESTARTING", "FAULT", ], "trigger": "component_obsfault", "dest": "FAULT", }, { "source": "EMPTY", "trigger": "assign_invoked", "dest": "RESOURCING_EMPTY", }, { "source": "EMPTY", "trigger": "release_invoked", "dest": "RESOURCING_EMPTY", }, { "source": "IDLE", "trigger": "assign_invoked", "dest": "RESOURCING_IDLE", }, { "source": "IDLE", "trigger": "release_invoked", "dest": "RESOURCING_IDLE", }, { "source": "RESOURCING_EMPTY", "trigger": "component_resourced", "dest": "RESOURCING_IDLE", }, { "source": "RESOURCING_IDLE", "trigger": "component_unresourced", "dest": "RESOURCING_EMPTY", }, { "source": "RESOURCING_EMPTY", "trigger": "assign_completed", "dest": "EMPTY", }, { "source": "RESOURCING_EMPTY", "trigger": "release_completed", "dest": "EMPTY", }, { "source": "RESOURCING_IDLE", "trigger": "assign_completed", "dest": "IDLE", }, { "source": "RESOURCING_IDLE", "trigger": "release_completed", "dest": "IDLE", }, { "source": "IDLE", "trigger": "configure_invoked", "dest": "CONFIGURING_IDLE", }, { "source": "CONFIGURING_IDLE", "trigger": "configure_completed", "dest": "IDLE", }, { "source": "READY", "trigger": "configure_invoked", "dest": "CONFIGURING_READY", }, { "source": "CONFIGURING_IDLE", "trigger": "component_configured", "dest": "CONFIGURING_READY", }, { "source": "CONFIGURING_READY", "trigger": "configure_completed", "dest": "READY", }, { "source": "READY", "trigger": "end_invoked", "dest": "READY", }, { "source": "READY", "trigger": "component_unconfigured", "dest": "IDLE", }, { "source": "READY", "trigger": "scan_invoked", "dest": "READY", }, { "source": "READY", "trigger": "component_scanning", "dest": "SCANNING", }, { "source": "SCANNING", "trigger": "end_scan_invoked", "dest": "SCANNING", }, { "source": "SCANNING", "trigger": "component_not_scanning", "dest": "READY", }, { "source": ["RESOURCING_EMPTY", "RESETTING_EMPTY"], "trigger": "abort_invoked", "dest": "ABORTING_EMPTY", }, { "source": [ "RESOURCING_IDLE", "IDLE", "CONFIGURING_IDLE", "CONFIGURING_READY", "READY", "SCANNING", "RESETTING", ], "trigger": "abort_invoked", "dest": "ABORTING", }, # Aborting implies trying to stop the monitored component # while it is doing something. Thus the monitored component # may send some events while in aborting state. { "source": "ABORTING_EMPTY", "trigger": "component_resourced", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_unresourced", "dest": "ABORTING_EMPTY", }, { "source": "ABORTING", "trigger": "component_unconfigured", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_configured", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_not_scanning", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_scanning", "dest": "ABORTING", }, { "source": "ABORTING_EMPTY", "trigger": "abort_completed", "dest": "ABORTED_EMPTY", }, { "source": "ABORTING", "trigger": "abort_completed", "dest": "ABORTED", }, { "source": ["ABORTED_EMPTY", "FAULT_EMPTY"], "trigger": "obsreset_invoked", "dest": "RESETTING_EMPTY", }, { "source": ["ABORTED", "FAULT"], "trigger": "obsreset_invoked", "dest": "RESETTING", }, { "source": "RESETTING", "trigger": "component_unconfigured", "dest": "RESETTING", }, { "source": "RESETTING", "trigger": "obsreset_completed", "dest": "IDLE", }, { "source": "RESETTING_EMPTY", "trigger": "obsreset_completed", "dest": "EMPTY", }, { "source": ["ABORTED", "ABORTED_EMPTY", "FAULT", "FAULT_EMPTY", "EMPTY"], "trigger": "restart_invoked", "dest": "RESTARTING", }, { "source": "RESTARTING", "trigger": "component_unconfigured", "dest": "RESTARTING", }, { "source": "RESTARTING", "trigger": "component_unresourced", "dest": "RESTARTING", }, { "source": "RESTARTING", "trigger": "restart_completed", "dest": "EMPTY", }, ] super().__init__( states=states, initial="EMPTY", transitions=transitions, after_state_change=self._state_changed, **extra_kwargs, ) self._state_changed() def _state_changed(self) -> None: """ State machine callback that is called every time the obs_state changes. Responsible for ensuring that callbacks are called. """ if self._callback is not None: self._callback(self.state) class ObsStateModel: """ Implements the observation state model for subarray. The model supports all of the states of the :py:class:`~ska_control_model.obs_state.ObsState` enum: * **EMPTY**: the subarray is unresourced * **RESOURCING**: the subarray is performing a resourcing operation * **IDLE**: the subarray is resourced but unconfigured * **CONFIGURING**: the subarray is performing a configuring operation * **READY**: the subarray is resourced and configured * **SCANNING**: the subarray is scanning * **ABORTING**: the subarray is aborting * **ABORTED**: the subarray has aborted * **RESETTING**: the subarray is resetting from an ABORTED or FAULT state back to IDLE * **RESTARTING**: the subarray is restarting from an ABORTED or FAULT state back to EMPTY * **FAULT**: the subarray has encountered a observation fault. A diagram of the subarray observation state model is shown below. This model is non-deterministic as diagrammed, but the underlying state machines has extra states and transitions that render it deterministic. This class simply maps those extra classes onto valid ObsState values. .. uml:: obs_state_model.uml :caption: Diagram of the subarray observation state model """ _OBS_STATE_MAPPING: Final = { "EMPTY": ObsState.EMPTY, "RESOURCING_EMPTY": ObsState.RESOURCING, "RESOURCING_IDLE": ObsState.RESOURCING, "IDLE": ObsState.IDLE, "CONFIGURING_IDLE": ObsState.CONFIGURING, "CONFIGURING_READY": ObsState.CONFIGURING, "READY": ObsState.READY, "SCANNING": ObsState.SCANNING, "ABORTING": ObsState.ABORTING, "ABORTING_EMPTY": ObsState.ABORTING, "ABORTED": ObsState.ABORTED, "ABORTED_EMPTY": ObsState.ABORTED, "RESETTING": ObsState.RESETTING, "RESETTING_EMPTY": ObsState.RESETTING, "RESTARTING": ObsState.RESTARTING, "FAULT": ObsState.FAULT, "FAULT_EMPTY": ObsState.FAULT, } def __init__( self, logger: logging.Logger, callback: Callable[[ObsState], None] | None = None, state_machine_factory: Callable[..., Machine] = _ObsStateMachine, ) -> None: """ Initialise the model. :param logger: the logger to be used by this state model. :param callback: A callback to be called when a transition causes a change to device obs_state :param state_machine_factory: a callable that returns a state machine for this model to use """ self.logger = logger self._obs_state: ObsState | None = None self._callback = callback self._obs_state_machine = state_machine_factory( callback=self._obs_state_changed ) @property def obs_state(self) -> ObsState | None: """ Return the obs_state. :returns: obs_state of this state model """ return self._obs_state def _obs_state_changed(self, machine_state: str) -> None: """ Handle change in observation state. This is a helper method that updates obs_state, ensuring that the callback is called if one exists. :param machine_state: the new state of the observation state machine """ obs_state = self._OBS_STATE_MAPPING[machine_state] if self._obs_state != obs_state: self._obs_state = obs_state if self._callback is not None: self._callback(obs_state) def is_action_allowed(self, action: str, raise_if_disallowed: bool = False) -> bool: """ Return whether a given action is allowed in the current state. :param action: an action, as given in the transitions table :param raise_if_disallowed: whether to raise an exception if the action is disallowed, or merely return False (optional, defaults to False) :raises StateModelError: if the action is unknown to the state machine :return: whether the action is allowed in the current state """ if action in self._obs_state_machine.get_triggers( self._obs_state_machine.state ): return True if raise_if_disallowed: raise StateModelError( f"Action {action} is not allowed in obs state " f"{'None' if self.obs_state is None else self.obs_state.name}." ) return False def perform_action(self, action: str) -> None: """ Perform an action on the state model. :param action: an action, as given in the transitions table """ _ = self.is_action_allowed(action, raise_if_disallowed=True) self._obs_state_machine.trigger(action) @for_testing_only def _straight_to_state(self, obs_state_name: ObsState) -> None: """ Take this model straight to the specified state. This method exists to simplify testing; for example, if testing that a command may be run in a given ObsState, one can push this state model straight to that ObsState, rather than having to drive it to that state through a sequence of actions. It is not intended that this method would be called outside of test setups. A warning will be raised if it is. For example, to test that a device transitions from SCANNING to ABORTING when the Abort() command is called: .. code-block:: py model = ObservationStateModel(logger) model._straight_to_state("SCANNING") assert model.obs_state == ObsState.SCANNING model.perform_action("abort_invoked") assert model.obs_state == ObsState.ABORTING :param obs_state_name: the target obs_state """ getattr(self._obs_state_machine, f"to_{obs_state_name}")()