# Source code for ska_pst.lmc.component.obs_state_model

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
This module provides PST overrides of the ObsStateModel.

There is known tech debt in PST with regard to the fact that it
was initially developed using a sub-array model and not an
obs device. While the BEAM.MGMT uses a `CspSubElementObsStateModel`
and `CspSubElementObsStateMachine`, this does not work for the
sub-devices like SMRB.MGMT, RECV.MGMT, and DSP.MGMT, which still
have a resourcing step and a configuring step. To simplify the
resetting as part of AT3-425, the default ObsStateMachine needs
to be overridden to allow for the "unresourced" (sic - SKA term)
state.
"""

from __future__ import annotations

import logging
from typing import Any, Callable, List

from ska_csp_lmc_base.obs import CspSubElementObsStateModel
from transitions.extensions import LockedMachine as Machine

VALID_STATES: List[str] = [
    "IDLE",
    "CONFIGURING_IDLE",
    "CONFIGURING_READY",
    "READY",
    "SCANNING",
    "ABORTING",
    "ABORTED",
    "RESETTING",
    "FAULT",
]
"""
List of valid states for the observing state machine.

This list is different to the ``ObsState`` values due to
it representing internal states of the state machine (for
example, configuring is split into ``CONFIGURING_IDLE`` and
``CONFIGURING_READY``).

These values are mapped to ``ObsState`` values by the
base :py:class:`ska_control_model.ObsStateModel` class.
"""

VALID_TRANSITIONS: List[dict] = [
    # Transitions using the "*" source, i.e. not tied to one named state.
    {
        "source": "*",
        "trigger": "component_obsfault",
        "dest": "FAULT",
    },
    {
        "source": "*",
        "trigger": "reset_invoked",
        "dest": "RESETTING",
    },
    # Configuring, starting from either IDLE or READY.
    {
        "source": "IDLE",
        "trigger": "configure_invoked",
        "dest": "CONFIGURING_IDLE",
    },
    {
        "source": "CONFIGURING_IDLE",
        "trigger": "configure_completed",
        "dest": "IDLE",
    },
    {
        "source": "READY",
        "trigger": "configure_invoked",
        "dest": "CONFIGURING_READY",
    },
    {
        "source": "CONFIGURING_IDLE",
        "trigger": "component_configured",
        "dest": "CONFIGURING_READY",
    },
    {
        "source": "CONFIGURING_READY",
        "trigger": "configure_completed",
        "dest": "READY",
    },
    # Leaving READY: end the configuration or start a scan.
    {
        "source": "READY",
        "trigger": "end_invoked",
        "dest": "READY",
    },
    {
        "source": "READY",
        "trigger": "component_unconfigured",
        "dest": "IDLE",
    },
    {
        "source": "READY",
        "trigger": "scan_invoked",
        "dest": "READY",
    },
    {
        "source": "READY",
        "trigger": "component_scanning",
        "dest": "SCANNING",
    },
    # Ending a scan.
    {
        "source": "SCANNING",
        "trigger": "end_scan_invoked",
        "dest": "SCANNING",
    },
    {
        "source": "SCANNING",
        "trigger": "component_not_scanning",
        "dest": "READY",
    },
    # Abort can be invoked from any of the listed states.
    {
        "source": [
            "IDLE",
            "CONFIGURING_IDLE",
            "CONFIGURING_READY",
            "READY",
            "SCANNING",
            "RESETTING",
        ],
        "trigger": "abort_invoked",
        "dest": "ABORTING",
    },
    # Aborting implies trying to stop the monitored component
    # while it is doing something. Thus the monitored component
    # may send some events while in aborting state.
    {
        "source": "ABORTING",
        "trigger": "component_unconfigured",
        "dest": "ABORTING",
    },
    {
        "source": "ABORTING",
        "trigger": "component_configured",
        "dest": "ABORTING",
    },
    {
        "source": "ABORTING",
        "trigger": "component_not_scanning",
        "dest": "ABORTING",
    },
    {
        "source": "ABORTING",
        "trigger": "component_scanning",
        "dest": "ABORTING",
    },
    {
        "source": "ABORTING",
        "trigger": "abort_completed",
        "dest": "ABORTED",
    },
    # Resetting back to IDLE. Component events received while
    # resetting keep the machine in RESETTING.
    {
        "source": ["ABORTED", "FAULT"],
        "trigger": "obsreset_invoked",
        "dest": "RESETTING",
    },
    {
        "source": "RESETTING",
        "trigger": "component_unconfigured",
        "dest": "RESETTING",
    },
    {
        "source": "RESETTING",
        "trigger": "component_not_scanning",
        "dest": "RESETTING",
    },
    {
        "source": "RESETTING",
        "trigger": "obsreset_completed",
        "dest": "IDLE",
    },
    {
        "source": "RESETTING",
        "trigger": "reset_completed",
        "dest": "IDLE",
    },
]
"""
A list of valid state transitions used by PST.

Each entry is a dictionary with a ``source`` (a state name, a list of
state names, or ``"*"``), the ``trigger`` name that fires the
transition, and the ``dest`` state.
"""


class PstObsStateMachine(Machine):
    """
    State machine for observation state.

    This class is to be used in place of the
    :py:class:`ska_control_model._ObsStateMachine` state machine
    to allow for the specifics of the state machine needed for the
    PST Process sub-devices (i.e. SMRB.MGMT, DSP.MGMT, RECV.MGMT, etc)

    This is based off the
    `SKA ObsStateModel <https://developer.skao.int/projects/ska-control-model/en/latest/obs_state.html#ska_control_model.ObsStateModel>`_

    The full list of supported states are:

    * **IDLE**: PST has not been configured for a scan
    * **CONFIGURING_IDLE**: PST is in the process of configuring a scan;
      it is currently performing a configure scan operation
    * **CONFIGURING_READY**: PST is in the process of configuring a scan;
      and is currently finishing the configure scan operation
    * **READY**: PST is configured for a scan
    * **SCANNING**: PST is scanning
    * **ABORTING**: PST is aborting
    * **ABORTED**: PST has aborted
    * **RESETTING**: PST is resetting to IDLE
    * **FAULT**: PST has faulted and needs to be reset

    A diagram of the state machine is shown below. Reflexive transitions
    and transitions to FAULT obs state are omitted to simplify the
    diagram.

    .. uml:: obs_state_machine.puml
       :caption: Diagram of PST subdevice state machine
    """

    def __init__(
        self: PstObsStateMachine,
        callback: Callable | None = None,
        initial_state: str = "IDLE",
        **extra_kwargs: Any,
    ) -> None:
        """
        Initialise the model.

        :param callback: A callback to be called when the state changes.
            It is called with the machine's current state; if ``None``
            no notification is made.
        :type callback: callable
        :param initial_state: the state the machine starts in,
            defaults to "IDLE"
        :type initial_state: str
        :param extra_kwargs: Additional keywords arguments to pass to
            super class init method (useful for graphing)
        """
        # Stored before the super class is initialised so that
        # ``_state_changed`` can find it whenever it is first invoked.
        self._callback = callback

        super().__init__(
            states=VALID_STATES,
            initial=initial_state,
            transitions=VALID_TRANSITIONS,
            after_state_change=self._state_changed,
            **extra_kwargs,
        )

        # Report the initial state straight away rather than waiting
        # for the first transition.
        self._state_changed()

    def _state_changed(self: PstObsStateMachine) -> None:
        """
        State machine callback that is called every time the obs_state changes.

        Responsible for ensuring that callbacks are called.
        """
        if self._callback is not None:
            self._callback(self.state)


class PstObsStateModel(CspSubElementObsStateModel):
    """
    Implements the observation state model for a PST Device.

    The implemented states are:

    * **IDLE**: the device is unconfigured.

    * **CONFIGURING**: transitional state to report device configuration
      is in progress.

    * **READY**: the device is configured and is ready to perform
      observations

    * **SCANNING**: the device is performing the observation.

    * **ABORTING**: the device is processing an abort.

    * **ABORTED**: the device has completed the abort request.

    * **RESETTING**: the device is resetting back to IDLE

    * **FAULT**: the device component has experienced an error from
      which it can be recovered only via manual intervention invoking a
      reset command that force the device to the base state (IDLE).
    """

    def __init__(
        self: PstObsStateModel,
        logger: logging.Logger,
        initial_state: str = "IDLE",
        callback: Callable | None = None,
    ) -> None:
        """
        Initialise the model.

        :param logger: the logger to be used by this state model.
        :type logger: logging.Logger
        :param initial_state: the initial state of the model,
            defaults to "IDLE"
        :type initial_state: str, optional
        :param callback: A callback to be called when a transition
            causes a change to device obs_state
        :type callback: Callable | None, optional
        """
        super().__init__(
            logger,
            callback=callback,
        )

        # Use the PST-specific state machine for this model.
        # NOTE(review): assumes the base class keeps its machine in
        # ``_obs_state_machine`` and provides ``_obs_state_changed`` -
        # confirm against ska_csp_lmc_base / ska_control_model.
        self._obs_state_machine = PstObsStateMachine(
            initial_state=initial_state,
            callback=self._obs_state_changed,
        )