Source code for ska_pst_lmc.component.obs_state_model

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
This module provides the PST overrides of the ObsStateModel.

There is known tech debt in PST in regards to the fact that it
was initially developed using a sub-array model and not an
obs device. While the BEAM.MGMT uses a `CspSubElementObsStateModel`
and `CspSubElementObsStateMachine` this does not work for the
sub-devices like SMRB.MGMT, RECV.MGMT, and DSP.MGMT which still
have a resourcing step and a configuring step. To simplify the
resetting as part of AT3-425 the default ObsStateMachine needs
to be overriden to allow for the "unresourced" (sic - SKA term)
state.
"""

from __future__ import annotations

from typing import Any, Callable, Optional

from transitions.extensions import LockedMachine as Machine


[docs]class PstObsStateMachine(Machine): """ State machine for observation state. This class to be used in place of the :py:class:`ska_control_model._ObsStateMachine` state machine to allow for the specifics of the state machine needed for the PST Process sub-devices (i.e. SMRB.MGMT, DSP.MGMT, RECV.MGMT, etc) This is based off the `SKA ObsStateModel <https://developer.skao.int/projects/ska-control-model/en/latest/obs_state.html#ska_control_model.ObsStateModel>`_ The full list of supported states are: * **EMPTY**: the PST subdevice has no configuration * **RESOURCING_EMPTY**: the PST subdevice is unresourced, but performing a configure beam operation * **RESOURCING_IDLE**: the PST subdevice is beam configured, and is currently finishing the configure beam operation * **IDLE**: the PST subdevice is beam configured but not scan configured * **CONFIGURING_IDLE**: the PST subdevice is beam configured but not scan configured; it is currently performing a configure scan operation * **CONFIGURING_READY**: the PST subdevice is scan configured; and is currently finishing the configure scan operation * **READY**: the PST subdevice is scan configured and waiting for a start scan request * **SCANNING**: the PST subdevice is scanning * **ABORTING**: the PST subdevice is aborting * **ABORTED**: the PST subdevice has aborted * **RESETTING**: the PST subdevice is resetting from an ABORTED or FAULT state back to IDLE * **FAULT**: the PST subdevice has encountered an observation fault. A diagram of the state machine is shown below. Reflexive transitions and transitions to FAULT obs state are omitted to simplify the diagram. .. uml:: obs_state_machine.puml :caption: Diagram of the PST subdevice state machine """ def __init__( self: PstObsStateMachine, callback: Optional[Callable] = None, **extra_kwargs: Any, ) -> None: """ Initialise the model. :param callback: A callback to be called when the state changes :type callback: callable :param extra_kwargs: Additional keywords arguments to pass to super class initialiser (useful for graphing) """ self._callback = callback states = [ "EMPTY", "RESOURCING_EMPTY", "RESOURCING_IDLE", "IDLE", "CONFIGURING_IDLE", "CONFIGURING_READY", "READY", "SCANNING", "ABORTING", "ABORTED", "RESETTING", "FAULT", ] transitions = [ { "source": "*", "trigger": "component_obsfault", "dest": "FAULT", }, { "source": "EMPTY", "trigger": "assign_invoked", "dest": "RESOURCING_EMPTY", }, { "source": "EMPTY", "trigger": "release_invoked", "dest": "RESOURCING_EMPTY", }, { "source": "IDLE", "trigger": "assign_invoked", "dest": "RESOURCING_IDLE", }, { "source": "IDLE", "trigger": "release_invoked", "dest": "RESOURCING_IDLE", }, { "source": "RESOURCING_EMPTY", "trigger": "component_resourced", "dest": "RESOURCING_IDLE", }, { "source": "RESOURCING_IDLE", "trigger": "component_unresourced", "dest": "RESOURCING_EMPTY", }, { "source": "RESOURCING_EMPTY", "trigger": "assign_completed", "dest": "EMPTY", }, { "source": "RESOURCING_EMPTY", "trigger": "release_completed", "dest": "EMPTY", }, { "source": "RESOURCING_IDLE", "trigger": "assign_completed", "dest": "IDLE", }, { "source": "RESOURCING_IDLE", "trigger": "release_completed", "dest": "IDLE", }, { "source": "IDLE", "trigger": "configure_invoked", "dest": "CONFIGURING_IDLE", }, { "source": "CONFIGURING_IDLE", "trigger": "configure_completed", "dest": "IDLE", }, { "source": "READY", "trigger": "configure_invoked", "dest": "CONFIGURING_READY", }, { "source": "CONFIGURING_IDLE", "trigger": "component_configured", "dest": "CONFIGURING_READY", }, { "source": "CONFIGURING_READY", "trigger": "configure_completed", "dest": "READY", }, { "source": "READY", "trigger": "end_invoked", "dest": "READY", }, { "source": "READY", "trigger": "component_unconfigured", "dest": "IDLE", }, { "source": "READY", "trigger": "scan_invoked", "dest": "READY", }, { "source": "READY", "trigger": "component_scanning", "dest": "SCANNING", }, { "source": "SCANNING", "trigger": "end_scan_invoked", "dest": "SCANNING", }, { "source": "SCANNING", "trigger": "component_not_scanning", "dest": "READY", }, { "source": [ "IDLE", "CONFIGURING_IDLE", "CONFIGURING_READY", "READY", "SCANNING", "RESETTING", ], "trigger": "abort_invoked", "dest": "ABORTING", }, # Aborting implies trying to stop the monitored component # while it is doing something. Thus the monitored component # may send some events while in aborting state. { "source": "ABORTING", "trigger": "component_unconfigured", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_configured", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_not_scanning", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "component_scanning", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "abort_completed", "dest": "ABORTED", }, { "source": ["ABORTED", "FAULT"], "trigger": "obsreset_invoked", "dest": "RESETTING", }, { "source": "RESETTING", "trigger": "component_unconfigured", "dest": "RESETTING", }, { "source": "RESETTING", "trigger": "component_unresourced", "dest": "RESETTING", }, { "source": "RESETTING", "trigger": "obsreset_completed", "dest": "EMPTY", }, ] super().__init__( states=states, initial="EMPTY", transitions=transitions, after_state_change=self._state_changed, **extra_kwargs, ) self._state_changed() def _state_changed(self: PstObsStateMachine) -> None: """ State machine callback that is called every time the obs_state changes. Responsible for ensuring that callbacks are called. """ if self._callback is not None: self._callback(self.state)