State Machines and Automated Navigation

Conceptual definition

While Actions, Assertions and Synchronisation address the problem of representing individual interactions with the SUT in a structured way, integration tests often face a higher-level challenge: orchestrating sequences of actions across a complex, multi-state system.

In many systems under test, especially in the SKA context, the state space is both large and formally defined. A Tango subarray device, for instance, may follow a well-specified observation lifecycle (EMPTY → IDLE → READY → SCANNING → …) with multiple alternative paths, recovery transitions, and abort sequences. Handling each of these transitions manually in test code quickly leads to deeply nested conditionals, duplicated sequences of commands, and logic that is hard to read, maintain, or reuse.

The State Machine building block addresses this problem by allowing you to model the SUT’s state space explicitly and delegate navigation to an automated algorithm. Rather than writing:

# Imperative, fragile, hard to maintain
if current_state == ObsState.SCANNING:
    end_scan_action.execute(timeout=60)
    end_action.execute(timeout=60)
elif current_state == ObsState.READY:
    end_action.execute(timeout=60)
elif current_state == ObsState.ABORTED:
    restart_action.execute(timeout=60)
# ... and so on

you declare the valid transitions once and then simply ask:

# Declarative, robust, readable
state_machine.reach(ObsState.IDLE, timeout=60)

The state machine figures out the rest.

The core idea: state space as a directed graph

The mental model behind this building block is straightforward: the SUT’s state space is a directed graph where:

  • Nodes are the possible states of the system (represented as Enum members, e.g., ObsState.EMPTY, ObsState.READY).

  • Edges are the transitions between states, each associated with a SUTAction that physically performs the state change on the SUT.

  • Edge weights (costs) allow you to express preferences: for example, an Abort + Restart recovery path may carry a higher cost than a direct forward transition, so the algorithm selects it only when no cheaper route to the target exists.

When you invoke reach(), the state machine reads the current state from the SUT via a user-supplied callable, applies Dijkstra’s shortest-path algorithm to compute the lowest-cost sequence of transitions to the target state, and then executes each transition in order, validating the actual system state after every step.
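
The path computation described above can be sketched in a few lines of plain Python. This is a minimal illustration of Dijkstra's algorithm on a small weighted state graph, not the ITH implementation: the State enum, the EDGES table and the cheapest_path function are all hypothetical names introduced for this example.

```python
import heapq
from enum import Enum


class State(Enum):
    """Hypothetical, reduced state set used only for this sketch."""

    EMPTY = "EMPTY"
    IDLE = "IDLE"
    READY = "READY"
    ABORTED = "ABORTED"
    FAULT = "FAULT"


# Hypothetical edge table: (source, target, command label, cost).
EDGES = [
    (State.EMPTY, State.IDLE, "AssignResources", 1.0),
    (State.IDLE, State.READY, "Configure", 1.0),
    (State.READY, State.IDLE, "End", 1.0),
    (State.IDLE, State.EMPTY, "ReleaseAllResources", 1.0),
    (State.IDLE, State.ABORTED, "Abort", 10.0),
    (State.READY, State.ABORTED, "Abort", 10.0),
    (State.ABORTED, State.EMPTY, "Restart", 10.0),
    (State.FAULT, State.EMPTY, "Restart", 10.0),
]


def cheapest_path(start, goal, edges):
    """Return (total_cost, [command labels]) or None if goal is unreachable."""
    counter = 0  # tie-breaker so the heap never has to compare Enum members
    heap = [(0.0, counter, start, [])]
    settled = set()
    while heap:
        cost, _, state, labels = heapq.heappop(heap)
        if state == goal:
            return cost, labels
        if state in settled:
            continue
        settled.add(state)
        for source, target, label, edge_cost in edges:
            if source == state and target not in settled:
                counter += 1
                heapq.heappush(
                    heap,
                    (cost + edge_cost, counter, target, labels + [label]),
                )
    return None
```

With this table, going from READY to EMPTY yields the two-step teardown (End, then ReleaseAllResources) at cost 2 rather than the Abort + Restart route at cost 20, and asking for FAULT from EMPTY returns None because no edge leads into FAULT.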

This design eliminates entire categories of test-code complexity:

  • No more manual bookkeeping of “which state am I in?”.

  • No more long chains of if/elif to decide which commands to send.

  • No more duplicated teardown sequences scattered across test fixtures.

  • Recovery and cleanup logic can be encoded as transitions with appropriate costs, and invoked transparently whenever needed.

Design and implementation

In ITH as a Platform, the state machine mechanism is provided by two main classes in ska_integration_test_harness.core.state_machine.

StateMachineTransition represents a single directed edge in the state graph. It holds:

  • A target state — the state the system is expected to be in after the transition completes.

  • An action (SUTAction) — the concrete operation that drives the SUT from a source state to the target state.

  • A set of accepted source states — the states from which this transition is valid. If not specified, the transition is considered valid from any state.

  • A cost (default 1.0) — a non-negative weight used by the pathfinding algorithm. Higher costs discourage a transition from being selected; this is how one encodes a preference for the normal path over a costly recovery route such as Abort.
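
To make the four fields concrete, here is a rough sketch of what such an edge could look like as a plain data class. TransitionSketch, DemoState and is_valid_from are hypothetical names for illustration only; the real StateMachineTransition may be structured differently, and a plain callable stands in for a SUTAction.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, FrozenSet, Optional


class DemoState(Enum):
    """Hypothetical states used only for this sketch."""

    EMPTY = 0
    IDLE = 1
    READY = 2


@dataclass(frozen=True)
class TransitionSketch:
    """Illustrative stand-in for a directed edge in the state graph."""

    target: DemoState
    action: Callable[[], None]  # stand-in for a SUTAction
    accepted_sources: Optional[FrozenSet[DemoState]] = None  # None = any state
    cost: float = 1.0

    def __post_init__(self) -> None:
        # Shortest-path search with Dijkstra assumes non-negative weights.
        if self.cost < 0:
            raise ValueError("cost must be non-negative")

    def is_valid_from(self, state: DemoState) -> bool:
        """Tell whether this edge may be taken from the given state."""
        return self.accepted_sources is None or state in self.accepted_sources


# An edge that is only valid from EMPTY, with the default cost.
assign = TransitionSketch(
    target=DemoState.IDLE,
    action=lambda: None,
    accepted_sources=frozenset({DemoState.EMPTY}),
)
# An edge with no declared sources: valid from any state, but expensive.
reset = TransitionSketch(target=DemoState.EMPTY, action=lambda: None, cost=10.0)
```

The non-negativity check mirrors the requirement stated in the list above: Dijkstra's algorithm is only guaranteed to return the cheapest path when no edge has a negative weight.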

StateMachine is the main entry point. You instantiate it with a state getter callable (a zero-argument function that returns the current state of the SUT) and then register transitions using define_transition(). Once the graph is fully defined, the public interface offers:

  • reach() — navigate to a target state, executing the computed path.

  • is_reachable() — check whether a target state is reachable from the current (or a given) state, without executing anything.

  • path_to() — compute and return the planned sequence of transitions without executing them, useful for inspection or logging.

  • current_state() — read the current state from the SUT via the registered getter.

Validation after each step. After every transition is executed, the state machine reads the actual system state and compares it against the expected target. If they differ, a StateMachineNavigationError is raised immediately. This prevents cascading failures and makes it easy to diagnose unexpected system behaviour during test execution.

Error handling. Two specific exception types signal different failure modes:

  • StateMachineNoPathError — raised when the graph contains no path from the current state to the requested target (either because no transitions were defined for that route, or because the current state is not a valid source for any of them).

  • StateMachineNavigationError — raised when an error occurs during navigation (e.g., the system ended up in an unexpected state after a transition, or the underlying action raised an exception).
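
The validate-after-every-step behaviour can be illustrated with a small, self-contained sketch. It is an assumption-laden model rather than the library code: NavigationErrorSketch stands in for StateMachineNavigationError, and execute_plan, FakeSut and DemoState are hypothetical names.

```python
from enum import Enum


class DemoState(Enum):
    """Hypothetical states used only for this sketch."""

    EMPTY = 0
    IDLE = 1
    READY = 2


class NavigationErrorSketch(RuntimeError):
    """Stand-in for StateMachineNavigationError."""


def execute_plan(plan, read_state):
    """Run each (action, expected_state) step, checking the state each time."""
    for action, expected in plan:
        try:
            action()
        except Exception as exc:
            # A failing action is reported as a navigation failure.
            raise NavigationErrorSketch(
                f"action towards {expected.name} failed: {exc}"
            ) from exc
        actual = read_state()
        if actual != expected:
            # Stop immediately instead of running later steps blindly.
            raise NavigationErrorSketch(
                f"expected {expected.name}, found {actual.name}"
            )


class FakeSut:
    """Toy system under test with one working and one broken command."""

    def __init__(self) -> None:
        self.state = DemoState.EMPTY

    def assign(self) -> None:
        self.state = DemoState.IDLE

    def broken_configure(self) -> None:
        pass  # simulates a command that silently fails to change state
```

Running a plan that ends with broken_configure stops at that step with a message naming the expected and the actual state, which is the kind of early, localised failure the validation step is meant to provide.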

Relationship with Actions

The state machine building block is designed to compose with the Actions building block, not to replace it. Each transition in the graph delegates its execution to a SUTAction, which handles the low-level details of pre-conditions, the command procedure, post-conditions, and synchronisation. The state machine operates one layer above: it decides which action to run and in what order, based on the declared graph and the computed path.

This layering keeps each concern separate:

  • Actions describe how to carry out a single state change — the low-level, on-the-wire interaction logic.

  • State machines describe the topology of the state space and how to navigate it autonomously — the orchestration logic.

A typical usage pattern is therefore to define the individual transitions using TangoLRCAction (or any custom SUTAction subclass, not necessarily tied to Tango), and wire them together in a StateMachine instance that your test fixtures or setup/teardown helpers can use.

Usage Example: Subarray Observation State Navigation

This example demonstrates how to model the observation lifecycle of a Tango subarray device as a state machine and use it to navigate between states in test setup and teardown steps, without any hardcoded command sequences.

Scenario. A single tango.DeviceProxy for the subarray exposes an obsState attribute (an ObsState enum) and accepts the standard SKA Long Running Commands (AssignResources, Configure, Scan, etc.). We want test fixtures to be able to bring the subarray to any target observation state from any starting state, without the fixture knowing or caring what the current state is. For simplicity, command inputs (JSON strings for AssignResources, Configure, and Scan) are treated as fixed values in this example.

Approach. We write a minimal Telescope wrapper that owns the device proxy and provides three main methods:

  • _build_obs_state_command — constructs a TangoLRCAction for a given command, attaching LRC completion and an obsState post-condition as required.

  • _build_subarray_state_machine — assembles the full StateMachine, wiring each command to the appropriate transition. Transition costs encode the preference for the normal forward path over abort/restart recovery routes.

  • reach_obs_state — the public entry point: builds the state machine, then drives through an optional list of intermediate states before arriving at the target.

from enum import Enum
from typing import SupportsFloat

import tango
from ska_control_model import ObsState
from ska_tango_testing.integration.assertions import ChainedAssertionsTimeout

from ska_integration_test_harness.core.actions.tracer_action import TracerAction
from ska_integration_test_harness.core.assertions.dev_state_changes import (
    AssertDevicesStateChanges,
)
from ska_integration_test_harness.core.state_machine import StateMachine
from ska_integration_test_harness.extensions.actions import TangoLRCAction


class SubarrayCommand(Enum):
    AssignResources = "AssignResources"
    Configure = "Configure"
    Scan = "Scan"
    EndScan = "EndScan"
    End = "End"
    ReleaseAllResources = "ReleaseAllResources"
    Abort = "Abort"
    Restart = "Restart"


# Fixed inputs (in a real project these would be loaded from files
# or injected via fixtures):
COMMANDS_INPUTS = {
    SubarrayCommand.AssignResources: '{"resources": [...]}',
    SubarrayCommand.Configure: '{"config": {...}}',
    SubarrayCommand.Scan: '{"scan_id": 1}',
}


class Telescope:
    """Minimal wrapper around a single subarray device proxy."""

    def __init__(self, subarray: tango.DeviceProxy) -> None:
        self.subarray = subarray

    # -----------------------------------------------------------------
    # Low-level action builder

    def _build_obs_state_command(
        self,
        command: SubarrayCommand,
        input: str | None = None,
        wait_for_obsstate: ObsState | None = None,
        wait_lrc_completion: bool = True,
    ) -> TangoLRCAction:
        """Build a TangoLRCAction for the given subarray command."""
        action = TangoLRCAction(
            target_device=self.subarray,
            command_name=command.value,
            command_param=input,
        )
        action.add_lrc_errors_to_early_stop()
        if wait_lrc_completion:
            action.add_lrc_completion_to_postconditions()
        if wait_for_obsstate is not None:
            action.add_postconditions(
                AssertDevicesStateChanges(
                    self.subarray, "obsState", wait_for_obsstate
                )
            )
        return action

    def _build_wait_obs_state_action(
        self, expected_state: ObsState
    ) -> TracerAction:
        """Build an action that does nothing but wait for an obsState."""

        class _Wait(TracerAction):
            def execute_procedure(self):
                pass

        return _Wait().add_postconditions(
            AssertDevicesStateChanges(
                self.subarray, "obsState", expected_state
            )
        )

    # -----------------------------------------------------------------
    # State machine builder

    def _build_subarray_state_machine(
        self,
        commands_inputs: dict[SubarrayCommand, str],
    ) -> StateMachine:
        """Declare the full subarray state graph and return the machine."""
        sm = StateMachine(
            state_getter=lambda: ObsState(self.subarray.obsState)
        )

        # ---- Normal forward path ------------------------------------
        sm.define_transition(
            target=ObsState.IDLE,
            accepted_sources=[ObsState.EMPTY],
            action=self._build_obs_state_command(
                SubarrayCommand.AssignResources,
                input=commands_inputs[SubarrayCommand.AssignResources],
                wait_lrc_completion=True,
                wait_for_obsstate=ObsState.IDLE,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.READY,
            accepted_sources=[ObsState.IDLE],
            action=self._build_obs_state_command(
                SubarrayCommand.Configure,
                input=commands_inputs[SubarrayCommand.Configure],
                wait_lrc_completion=True,
                wait_for_obsstate=ObsState.READY,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.SCANNING,
            accepted_sources=[ObsState.READY],
            action=self._build_obs_state_command(
                SubarrayCommand.Scan,
                input=commands_inputs[SubarrayCommand.Scan],
                wait_for_obsstate=ObsState.SCANNING,
                wait_lrc_completion=False,  # fire-and-forget
            ),
            cost=1,
        )

        # ---- Transient states (useful to test intermediate steps) ---
        # No LRC wait: we only care that the transition starts,
        # not that it completes.
        sm.define_transition(
            target=ObsState.RESOURCING,
            accepted_sources=[ObsState.EMPTY],
            action=self._build_obs_state_command(
                SubarrayCommand.AssignResources,
                input=commands_inputs[SubarrayCommand.AssignResources],
                wait_for_obsstate=ObsState.RESOURCING,
                wait_lrc_completion=False,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.CONFIGURING,
            accepted_sources=[ObsState.IDLE],
            action=self._build_obs_state_command(
                SubarrayCommand.Configure,
                input=commands_inputs[SubarrayCommand.Configure],
                wait_for_obsstate=ObsState.CONFIGURING,
                wait_lrc_completion=False,
            ),
            cost=1,
        )

        # ---- Teardown path ------------------------------------------
        sm.define_transition(
            target=ObsState.READY,
            accepted_sources=[ObsState.SCANNING],
            action=self._build_obs_state_command(
                SubarrayCommand.EndScan,
                wait_for_obsstate=ObsState.READY,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.IDLE,
            accepted_sources=[ObsState.READY],
            action=self._build_obs_state_command(
                SubarrayCommand.End,
                wait_for_obsstate=ObsState.IDLE,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.EMPTY,
            accepted_sources=[ObsState.IDLE],
            action=self._build_obs_state_command(
                SubarrayCommand.ReleaseAllResources,
                wait_for_obsstate=ObsState.EMPTY,
            ),
            cost=1,
        )

        # ---- Abort / restart recovery --------------------------------
        # Higher cost ensures the algorithm prefers the normal path
        # and only falls back to abort/restart when no other route exists.
        supports_abort = [
            ObsState.RESOURCING,
            ObsState.IDLE,
            ObsState.CONFIGURING,
            ObsState.READY,
            ObsState.SCANNING,
        ]
        supports_restart = [ObsState.ABORTED, ObsState.FAULT]

        sm.define_transition(
            target=ObsState.ABORTED,
            accepted_sources=supports_abort,
            action=self._build_obs_state_command(
                SubarrayCommand.Abort,
                wait_for_obsstate=ObsState.ABORTED,
            ),
            cost=10,
        )
        sm.define_transition(
            target=ObsState.EMPTY,
            accepted_sources=supports_restart,
            action=self._build_obs_state_command(
                SubarrayCommand.Restart,
                wait_for_obsstate=ObsState.EMPTY,
            ),
            cost=10,
        )

        # Transient abort/restart states (very high cost — last resort)
        sm.define_transition(
            target=ObsState.ABORTING,
            accepted_sources=supports_abort,
            action=self._build_obs_state_command(
                SubarrayCommand.Abort,
                wait_for_obsstate=ObsState.ABORTING,
                wait_lrc_completion=False,
            ),
            cost=50,
        )
        sm.define_transition(
            target=ObsState.RESTARTING,
            accepted_sources=supports_restart,
            action=self._build_obs_state_command(
                SubarrayCommand.Restart,
                wait_for_obsstate=ObsState.RESTARTING,
                wait_lrc_completion=False,
            ),
            cost=50,
        )

        # Passive wait transitions out of ABORTING / RESTARTING
        # (no command to send; just wait for the device to settle)
        sm.define_transition(
            target=ObsState.ABORTED,
            accepted_sources=[ObsState.ABORTING],
            action=self._build_wait_obs_state_action(ObsState.ABORTED),
            cost=50,
        )
        sm.define_transition(
            target=ObsState.EMPTY,
            accepted_sources=[ObsState.RESTARTING],
            action=self._build_wait_obs_state_action(ObsState.EMPTY),
            cost=50,
        )

        return sm

    # -----------------------------------------------------------------
    # Public entry point

    def reach_obs_state(
        self,
        target_state: ObsState,
        commands_inputs: dict[SubarrayCommand, str],
        pass_through_states: list[ObsState] | None = None,
        timeout: SupportsFloat = 100,
    ) -> None:
        """Navigate the subarray to ``target_state``.

        Optionally force a passage through one or more intermediate
        states first (e.g., ``pass_through_states=[ObsState.ABORTED]``
        to force an abort/restart cycle before reaching the target).
        """
        sm = self._build_subarray_state_machine(commands_inputs)
        steps = list(pass_through_states or []) + [target_state]
        remaining = ChainedAssertionsTimeout(timeout)
        remaining.start()
        for step in steps:
            sm.reach(step, timeout=remaining)

A few observations worth highlighting:

  • The state machine is rebuilt on every call to reach_obs_state. This is intentional: it keeps the code stateless and avoids stale action objects being reused across test steps.

  • The pass_through_states parameter is the mechanism for forcing a specific path. For example, passing [ObsState.ABORTED] before a target of ObsState.EMPTY guarantees the subarray goes through an abort/restart cycle, which is sometimes needed in teardown.

  • The _build_wait_obs_state_action pattern shows how to express a transition that sends no command but simply waits for the device to leave a transient state on its own — a natural fit for ABORTING and RESTARTING.

  • Transition costs encode knowledge about the system: cost=1 for normal steps, cost=10 for recoverable but disruptive operations, cost=50 for truly last-resort ones. Dijkstra’s algorithm will always find the cheapest path, so the normal flow is preferred automatically.
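
As a worked check of the cost scheme, the snippet below sums the costs of three candidate routes from SCANNING to EMPTY, using the values declared in the example above. State names are plain strings here, and COSTS and route_cost are hypothetical helpers written only for this arithmetic.

```python
# Edge costs copied from the transitions declared in the example above.
COSTS = {
    ("SCANNING", "READY"): 1,      # EndScan
    ("READY", "IDLE"): 1,          # End
    ("IDLE", "EMPTY"): 1,          # ReleaseAllResources
    ("SCANNING", "ABORTED"): 10,   # Abort, waiting for ABORTED
    ("ABORTED", "EMPTY"): 10,      # Restart, waiting for EMPTY
    ("SCANNING", "ABORTING"): 50,  # Abort, stopping at the transient state
    ("ABORTING", "ABORTED"): 50,   # passive wait
}


def route_cost(route):
    """Sum the edge costs along a list of consecutive states."""
    return sum(COSTS[(src, dst)] for src, dst in zip(route, route[1:]))


NORMAL = ["SCANNING", "READY", "IDLE", "EMPTY"]
RECOVERY = ["SCANNING", "ABORTED", "EMPTY"]
TRANSIENT = ["SCANNING", "ABORTING", "ABORTED", "EMPTY"]

print(route_cost(NORMAL))     # 1 + 1 + 1 = 3
print(route_cost(RECOVERY))   # 10 + 10 = 20
print(route_cost(TRANSIENT))  # 50 + 50 + 10 = 110
```

The normal teardown wins even though it takes more steps than the recovery route, because the cost of a path is the sum of its edge weights and not the number of transitions.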

The payoff: a single reusable BDD step.

With the Telescope wrapper in place, the entire state-navigation logic is encapsulated and reusable. A pytest-bdd step definition (or any other test setup step) reduces to a single call, plus one fallback retry:

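from pytest_bdd import given, parsers

# Telescope, COMMANDS_INPUTS and ObsState come from the listing above.
# LARGE_TIMEOUT and VERY_LARGE_TIMEOUT are assumed to be project-level
# constants (in seconds), and "telescope" is assumed to be a pytest fixture.

@given(parsers.parse(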
    "subarray {subarray_id} is in obsState {expected_state}"
))
def subarray_is_in_obsstate(
    telescope: Telescope,
    subarray_id: str,
    expected_state: str,
) -> None:
    """Bring the subarray to the required obsState, from any starting point."""
    target = ObsState[expected_state]

    try:
        telescope.reach_obs_state(
            target_state=target,
            commands_inputs=COMMANDS_INPUTS,
            timeout=LARGE_TIMEOUT,
        )
    except Exception:
        # Harder retry: force an abort/restart cycle first,
        # then navigate to the target with a more generous timeout.
        telescope.reach_obs_state(
            target_state=target,
            commands_inputs=COMMANDS_INPUTS,
            pass_through_states=[ObsState.ABORTED],
            timeout=VERY_LARGE_TIMEOUT,
        )

This step works regardless of the current observation state. The state machine computes and executes the shortest path automatically. If something goes wrong, the retry forces a clean abort/restart before trying again — all without any conditional logic in the test step itself.

API Reference