State Machines and Automated Navigation
Conceptual definition
While Actions, Assertions and Synchronisation address the problem of representing individual interactions with the SUT in a structured way, integration tests often face a higher-level challenge: orchestrating sequences of actions across a complex, multi-state system.
In many systems under test — especially in the SKA context — the state space
is both large and formally defined. A Tango subarray device, for instance,
may follow a well-specified observation lifecycle (EMPTY → IDLE →
READY → SCANNING → …) with multiple alternative paths, recovery
transitions, and abort sequences. Handling each of these transitions
manually in test code quickly leads to deeply nested conditionals, duplicated
sequences of commands, and logic that is hard to read, maintain, or reuse.
The State Machine building block addresses this problem by allowing you to model the SUT’s state space explicitly and delegate navigation to an automated algorithm. Rather than writing:
# Imperative, fragile, hard to maintain
if current_state == ObsState.SCANNING:
    end_scan_action.execute(timeout=60)
    end_action.execute(timeout=60)
elif current_state == ObsState.READY:
    end_action.execute(timeout=60)
elif current_state == ObsState.ABORTED:
    restart_action.execute(timeout=60)
# ... and so on
you declare the valid transitions once and then simply ask:
# Declarative, robust, readable
state_machine.reach(ObsState.IDLE, timeout=60)
The state machine figures out the rest.
The core idea: state space as a directed graph
The mental model behind this building block is straightforward: the SUT’s state space is a directed graph where:
Nodes are the possible states of the system (represented as Enum members, e.g., ObsState.EMPTY, ObsState.READY).
Edges are the transitions between states, each associated with a SUTAction that physically performs the state change on the SUT.
Edge weights (costs) allow you to express preferences: for example, an Abort + Restart recovery path may carry a higher cost than a direct forward transition, so the algorithm will avoid it unless strictly necessary.
When you invoke reach(),
the state machine reads the current state from the SUT via a
user-supplied callable, applies Dijkstra’s shortest-path algorithm to
compute the lowest-cost sequence of transitions to the target state,
and then executes each transition in order, validating the actual system
state after every step.
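The planning step can be sketched with standard-library Python alone. This is not the ITH implementation: the EDGES tuple layout, the plan function and the toy Obs enum are hypothetical names chosen for illustration, and the costs are example values. The sketch only shows how Dijkstra's algorithm picks the lowest-cost command sequence on a small weighted state graph.

```python
import heapq
from enum import Enum


class Obs(Enum):
    """Toy stand-in for ObsState (hypothetical, reduced state set)."""
    EMPTY = 0
    IDLE = 1
    READY = 2
    SCANNING = 3
    ABORTED = 4


# Each edge: (source, target, command label, cost). Illustrative values only.
EDGES = [
    (Obs.EMPTY, Obs.IDLE, "AssignResources", 1),
    (Obs.IDLE, Obs.READY, "Configure", 1),
    (Obs.READY, Obs.SCANNING, "Scan", 1),
    (Obs.SCANNING, Obs.READY, "EndScan", 1),
    (Obs.READY, Obs.IDLE, "End", 1),
    (Obs.IDLE, Obs.EMPTY, "ReleaseAllResources", 1),
    (Obs.SCANNING, Obs.ABORTED, "Abort", 10),
    (Obs.READY, Obs.ABORTED, "Abort", 10),
    (Obs.IDLE, Obs.ABORTED, "Abort", 10),
    (Obs.ABORTED, Obs.EMPTY, "Restart", 10),
]


def plan(start, goal):
    """Return (total_cost, [command labels]) for the cheapest path, or None."""
    # Heap entries: (cost so far, tie-breaker, state, commands so far).
    # The unique tie-breaker keeps heapq from ever comparing Enum members.
    heap = [(0, 0, start, [])]
    settled = set()
    counter = 1
    while heap:
        cost, _, state, commands = heapq.heappop(heap)
        if state == goal:
            return cost, commands
        if state in settled:
            continue
        settled.add(state)
        for source, target, label, weight in EDGES:
            if source == state and target not in settled:
                entry = (cost + weight, counter, target, commands + [label])
                heapq.heappush(heap, entry)
                counter += 1
    return None  # the goal cannot be reached from the start state


if __name__ == "__main__":
    print(plan(Obs.SCANNING, Obs.IDLE))
    print(plan(Obs.ABORTED, Obs.READY))
```

In this toy graph the abort/restart edges are never chosen while a cheaper forward or teardown route exists, which is the behaviour the edge costs are meant to produce.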
This design eliminates entire categories of test-code complexity:
No more manual bookkeeping of “which state am I in?”.
No more long chains of if/elif to decide which commands to send.
No more duplicated teardown sequences scattered across test fixtures.
Recovery and cleanup logic can be encoded as transitions with appropriate costs, and invoked transparently whenever needed.
Design and implementation
In ITH as a Platform, the state machine mechanism is provided by two
main classes in ska_integration_test_harness.core.state_machine.
StateMachineTransition
represents a single directed edge in the state graph. It holds:
A target state: the state the system is expected to be in after the transition completes.
An action (SUTAction): the concrete operation that drives the SUT from a source state to the target state.
A set of accepted source states: the states from which this transition is valid. If not specified, the transition is considered valid from any state.
A cost (default 1.0): a non-negative weight used by the pathfinding algorithm. Higher costs discourage a transition from being selected; this is how one encodes a preference for the normal path over a costly recovery route such as Abort.
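To make these four fields concrete, here is a minimal data-structure sketch. It is a hypothetical stand-in, not the actual StateMachineTransition class: the name ToyTransition, the field names, the is_valid_from helper and the use of a plain callable as the action are all assumptions made for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class Obs(Enum):
    """Toy stand-in for ObsState (hypothetical)."""
    EMPTY = 0
    IDLE = 1
    READY = 2


@dataclass(frozen=True)
class ToyTransition:
    """One directed edge of the state graph (illustrative only)."""
    target: Obs
    action: Callable[[], None]
    # None is read as "valid from any state".
    accepted_sources: Optional[frozenset] = None
    cost: float = 1.0

    def __post_init__(self):
        # Dijkstra's algorithm is only correct for non-negative weights.
        if self.cost < 0:
            raise ValueError("cost must be non-negative")

    def is_valid_from(self, state: Obs) -> bool:
        """Tell whether this edge may be taken from the given state."""
        return self.accepted_sources is None or state in self.accepted_sources


if __name__ == "__main__":
    assign = ToyTransition(Obs.IDLE, lambda: None, frozenset({Obs.EMPTY}))
    print(assign.is_valid_from(Obs.EMPTY), assign.is_valid_from(Obs.READY))
```

The non-negativity check reflects a general property of Dijkstra's algorithm rather than anything specific to this library: with negative weights the shortest-path result is no longer guaranteed to be correct.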
StateMachine
is the main entry point. You instantiate it with a state getter callable
(a zero-argument function that returns the current state of the SUT) and
then register transitions using
define_transition().
Once the graph is fully defined, the public interface offers:
reach(): navigate to a target state, executing the computed path.
is_reachable(): check whether a target state is reachable from the current (or a given) state, without executing anything.
path_to(): compute and return the planned sequence of transitions without executing them, useful for inspection or logging.
current_state(): read the current state from the SUT via the registered getter.
Validation after each step. After every transition is executed,
the state machine reads the actual system state and compares it against the
expected target. If they differ, a
StateMachineNavigationError
is raised immediately. This prevents cascading failures and makes it
easy to diagnose unexpected system behaviour during test execution.
Error handling. Two specific exception types signal different failure modes:
StateMachineNoPathError: raised when the graph contains no path from the current state to the requested target (either because no transitions were defined for that route, or because the current state is not a valid source for any of them).
StateMachineNavigationError: raised when an error occurs during navigation (e.g., the system ended up in an unexpected state after a transition, or the underlying action raised an exception).
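The step-by-step validation and the two failure modes can be illustrated with a small self-contained sketch. The names ToyNoPathError, ToyNavigationError, run_path and FakeDevice are hypothetical stand-ins, not the ITH classes; the sketch only mirrors the behaviour described above (fail at planning time when there is no route, fail at execution time when an action raises or the observed state differs from the expected one).

```python
from enum import Enum


class Obs(Enum):
    """Toy stand-in for ObsState (hypothetical)."""
    IDLE = 1
    READY = 2
    FAULT = 3


class ToyNoPathError(Exception):
    """Planning failed: there is no route to the target."""


class ToyNavigationError(Exception):
    """Execution failed: an action raised or the state check failed."""


def run_path(path, get_state):
    """Execute (expected_state, action) pairs, checking the state after each."""
    if path is None:
        raise ToyNoPathError("no path to the requested target")
    for expected, action in path:
        try:
            action()
        except Exception as exc:
            raise ToyNavigationError(
                f"action towards {expected.name} failed"
            ) from exc
        actual = get_state()
        if actual != expected:
            # Stop at the first mismatch so later steps cannot pile up errors.
            raise ToyNavigationError(
                f"expected {expected.name}, but the system is in {actual.name}"
            )


class FakeDevice:
    """In-memory stand-in for the SUT."""

    def __init__(self):
        self.state = Obs.IDLE

    def configure(self):
        self.state = Obs.READY

    def broken_configure(self):
        self.state = Obs.FAULT


if __name__ == "__main__":
    device = FakeDevice()
    try:
        run_path([(Obs.READY, device.broken_configure)], lambda: device.state)
    except ToyNavigationError as error:
        print(error)
```

Keeping the two exception types separate lets a caller tell "the graph is incomplete" apart from "the system misbehaved", which usually call for different fixes.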
Relationship with Actions
The state machine building block is designed to compose with the Actions
building block, not to replace it. Each transition in the graph delegates
its execution to a
SUTAction, which
handles the low-level details of pre-conditions, the command procedure,
post-conditions, and synchronisation. The state machine operates one layer
above: it decides which action to run and in what order, based on the
declared graph and the computed path.
This layering keeps each concern separate:
Actions describe how to carry out a single state change — the low-level, on-the-wire interaction logic.
State machines describe the topology of the state space and how to navigate it autonomously — the orchestration logic.
A typical usage pattern is therefore to define the individual transitions
using TangoLRCAction
(or any custom
SUTAction subclass,
not necessarily tied to Tango), and wire them together in a
StateMachine
instance that your test fixtures or setup/teardown helpers can use.
Usage Example: Subarray Observation State Navigation
This example demonstrates how to model the observation lifecycle of a Tango subarray device as a state machine and use it to navigate between states in test setup and teardown steps, without any hardcoded command sequences.
Scenario. A single tango.DeviceProxy for the subarray exposes an
obsState attribute (an ObsState enum) and accepts the standard SKA
Long Running Commands (AssignResources, Configure, Scan, etc.).
We want test fixtures to be able to bring the subarray to any target
observation state from any starting state, without the fixture knowing or
caring what the current state is. For simplicity, command inputs
(JSON strings for AssignResources, Configure, and Scan) are
treated as fixed values in this example.
Approach. We write a minimal Telescope wrapper that owns the device
proxy and provides the following methods:
_build_obs_state_command: constructs a TangoLRCAction for a given command, attaching LRC completion and an obsState post-condition as required.
_build_subarray_state_machine: assembles the full StateMachine, wiring each command to the appropriate transition. Transition costs encode the preference for the normal forward path over abort/restart recovery routes.
reach_obs_state: the public entry point, which builds the state machine and then drives through an optional list of intermediate states before arriving at the target.
from enum import Enum
from typing import SupportsFloat

import tango
from ska_control_model import ObsState
from ska_tango_testing.integration.assertions import ChainedAssertionsTimeout

from ska_integration_test_harness.core.actions.tracer_action import TracerAction
from ska_integration_test_harness.core.assertions.dev_state_changes import (
    AssertDevicesStateChanges,
)
from ska_integration_test_harness.core.state_machine import StateMachine
from ska_integration_test_harness.extensions.actions import TangoLRCAction


class SubarrayCommand(Enum):
    AssignResources = "AssignResources"
    Configure = "Configure"
    Scan = "Scan"
    EndScan = "EndScan"
    End = "End"
    ReleaseAllResources = "ReleaseAllResources"
    Abort = "Abort"
    Restart = "Restart"


# Fixed inputs (in a real project these would be loaded from files
# or injected via fixtures):
COMMANDS_INPUTS = {
    SubarrayCommand.AssignResources: '{"resources": [...]}',
    SubarrayCommand.Configure: '{"config": {...}}',
    SubarrayCommand.Scan: '{"scan_id": 1}',
}
class Telescope:
    """Minimal wrapper around a single subarray device proxy."""

    def __init__(self, subarray: tango.DeviceProxy) -> None:
        self.subarray = subarray

    # -----------------------------------------------------------------
    # Low-level action builder

    def _build_obs_state_command(
        self,
        command: SubarrayCommand,
        input: str | None = None,
        wait_for_obsstate: ObsState | None = None,
        wait_lrc_completion: bool = True,
    ) -> TangoLRCAction:
        """Build a TangoLRCAction for the given subarray command."""
        action = TangoLRCAction(
            target_device=self.subarray,
            command_name=command.value,
            command_param=input,
        )
        action.add_lrc_errors_to_early_stop()
        if wait_lrc_completion:
            action.add_lrc_completion_to_postconditions()
        if wait_for_obsstate is not None:
            action.add_postconditions(
                AssertDevicesStateChanges(
                    self.subarray, "obsState", wait_for_obsstate
                )
            )
        return action

    def _build_wait_obs_state_action(
        self, expected_state: ObsState
    ) -> TracerAction:
        """Build an action that does nothing but wait for an obsState."""

        class _Wait(TracerAction):
            def execute_procedure(self):
                pass

        return _Wait().add_postconditions(
            AssertDevicesStateChanges(
                self.subarray, "obsState", expected_state
            )
        )
    # -----------------------------------------------------------------
    # State machine builder

    def _build_subarray_state_machine(
        self,
        commands_inputs: dict[SubarrayCommand, str],
    ) -> StateMachine:
        """Declare the full subarray state graph and return the machine."""
        sm = StateMachine(
            state_getter=lambda: ObsState(self.subarray.obsState)
        )

        # ---- Normal forward path ------------------------------------
        sm.define_transition(
            target=ObsState.IDLE,
            accepted_sources=[ObsState.EMPTY],
            action=self._build_obs_state_command(
                SubarrayCommand.AssignResources,
                input=commands_inputs[SubarrayCommand.AssignResources],
                wait_lrc_completion=True,
                wait_for_obsstate=ObsState.IDLE,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.READY,
            accepted_sources=[ObsState.IDLE],
            action=self._build_obs_state_command(
                SubarrayCommand.Configure,
                input=commands_inputs[SubarrayCommand.Configure],
                wait_lrc_completion=True,
                wait_for_obsstate=ObsState.READY,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.SCANNING,
            accepted_sources=[ObsState.READY],
            action=self._build_obs_state_command(
                SubarrayCommand.Scan,
                input=commands_inputs[SubarrayCommand.Scan],
                wait_for_obsstate=ObsState.SCANNING,
                wait_lrc_completion=False,  # fire-and-forget
            ),
            cost=1,
        )

        # ---- Transient states (useful to test intermediate steps) ---
        # No LRC wait: we only care that the transition starts,
        # not that it completes.
        sm.define_transition(
            target=ObsState.RESOURCING,
            accepted_sources=[ObsState.EMPTY],
            action=self._build_obs_state_command(
                SubarrayCommand.AssignResources,
                input=commands_inputs[SubarrayCommand.AssignResources],
                wait_for_obsstate=ObsState.RESOURCING,
                wait_lrc_completion=False,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.CONFIGURING,
            accepted_sources=[ObsState.IDLE],
            action=self._build_obs_state_command(
                SubarrayCommand.Configure,
                input=commands_inputs[SubarrayCommand.Configure],
                wait_for_obsstate=ObsState.CONFIGURING,
                wait_lrc_completion=False,
            ),
            cost=1,
        )

        # ---- Teardown path ------------------------------------------
        sm.define_transition(
            target=ObsState.READY,
            accepted_sources=[ObsState.SCANNING],
            action=self._build_obs_state_command(
                SubarrayCommand.EndScan,
                wait_for_obsstate=ObsState.READY,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.IDLE,
            accepted_sources=[ObsState.READY],
            action=self._build_obs_state_command(
                SubarrayCommand.End,
                wait_for_obsstate=ObsState.IDLE,
            ),
            cost=1,
        )
        sm.define_transition(
            target=ObsState.EMPTY,
            accepted_sources=[ObsState.IDLE],
            action=self._build_obs_state_command(
                SubarrayCommand.ReleaseAllResources,
                wait_for_obsstate=ObsState.EMPTY,
            ),
            cost=1,
        )

        # ---- Abort / restart recovery --------------------------------
        # Higher cost ensures the algorithm prefers the normal path
        # and only falls back to abort/restart when no other route exists.
        supports_abort = [
            ObsState.RESOURCING,
            ObsState.IDLE,
            ObsState.CONFIGURING,
            ObsState.READY,
            ObsState.SCANNING,
        ]
        supports_restart = [ObsState.ABORTED, ObsState.FAULT]
        sm.define_transition(
            target=ObsState.ABORTED,
            accepted_sources=supports_abort,
            action=self._build_obs_state_command(
                SubarrayCommand.Abort,
                wait_for_obsstate=ObsState.ABORTED,
            ),
            cost=10,
        )
        sm.define_transition(
            target=ObsState.EMPTY,
            accepted_sources=supports_restart,
            action=self._build_obs_state_command(
                SubarrayCommand.Restart,
                wait_for_obsstate=ObsState.EMPTY,
            ),
            cost=10,
        )

        # Transient abort/restart states (very high cost: last resort)
        sm.define_transition(
            target=ObsState.ABORTING,
            accepted_sources=supports_abort,
            action=self._build_obs_state_command(
                SubarrayCommand.Abort,
                wait_for_obsstate=ObsState.ABORTING,
                wait_lrc_completion=False,
            ),
            cost=50,
        )
        sm.define_transition(
            target=ObsState.RESTARTING,
            accepted_sources=supports_restart,
            action=self._build_obs_state_command(
                SubarrayCommand.Restart,
                wait_for_obsstate=ObsState.RESTARTING,
                wait_lrc_completion=False,
            ),
            cost=50,
        )

        # Passive wait transitions out of ABORTING / RESTARTING
        # (no command to send; just wait for the device to settle)
        sm.define_transition(
            target=ObsState.ABORTED,
            accepted_sources=[ObsState.ABORTING],
            action=self._build_wait_obs_state_action(ObsState.ABORTED),
            cost=50,
        )
        sm.define_transition(
            target=ObsState.EMPTY,
            accepted_sources=[ObsState.RESTARTING],
            action=self._build_wait_obs_state_action(ObsState.EMPTY),
            cost=50,
        )
        return sm
    # -----------------------------------------------------------------
    # Public entry point

    def reach_obs_state(
        self,
        target_state: ObsState,
        commands_inputs: dict[SubarrayCommand, str],
        pass_through_states: list[ObsState] | None = None,
        timeout: SupportsFloat = 100,
    ) -> None:
        """Navigate the subarray to ``target_state``.

        Optionally force a passage through one or more intermediate
        states first (e.g., ``pass_through_states=[ObsState.ABORTED]``
        to force an abort/restart cycle before reaching the target).
        """
        sm = self._build_subarray_state_machine(commands_inputs)
        steps = list(pass_through_states or []) + [target_state]
        # One shared timeout object, so all steps draw on the same budget.
        remaining = ChainedAssertionsTimeout(timeout)
        remaining.start()
        for step in steps:
            sm.reach(step, timeout=remaining)
A few observations worth highlighting:
The state machine is rebuilt on every call to reach_obs_state. This is intentional: it keeps the code stateless and avoids stale action objects being reused across test steps.
The pass_through_states parameter is the mechanism for forcing a specific path. For example, passing [ObsState.ABORTED] before a target of ObsState.EMPTY guarantees the subarray goes through an abort/restart cycle, which is sometimes needed in teardown.
The _build_wait_obs_state_action pattern shows how to express a transition that sends no command but simply waits for the device to leave a transient state on its own, a natural fit for ABORTING and RESTARTING.
Transition costs encode knowledge about the system: cost=1 for normal steps, cost=10 for recoverable but disruptive operations, cost=50 for truly last-resort ones. Dijkstra’s algorithm will always find the cheapest path, so the normal flow is preferred automatically.
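As a worked example of how these costs play out, the following sketch adds up the three candidate routes from SCANNING to EMPTY, using the costs declared in the example above. It is plain arithmetic over hand-listed routes (the route names and dictionary layout are made up for illustration); it does not call the state machine.

```python
# Costs copied from the transitions declared in the example above.
routes_scanning_to_empty = {
    "normal teardown": [
        ("EndScan", 1),
        ("End", 1),
        ("ReleaseAllResources", 1),
    ],
    "abort + restart": [
        ("Abort", 10),
        ("Restart", 10),
    ],
    "via transient ABORTING": [
        ("Abort (stop at ABORTING)", 50),
        ("wait for ABORTED", 50),
        ("Restart", 10),
    ],
}

# Total cost of each route is the sum of its edge costs.
totals = {
    name: sum(cost for _, cost in steps)
    for name, steps in routes_scanning_to_empty.items()
}
# A shortest-path search would select the route with the smallest total.
cheapest = min(totals, key=totals.get)

for name, total in totals.items():
    print(f"{name}: {total}")
print("selected:", cheapest)
```

The normal teardown wins despite being the longest route in number of steps, because route selection is driven by total cost rather than by step count. Forcing the abort/restart cycle therefore requires pass_through_states, as described above.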
The payoff: a single reusable BDD step.
With the Telescope wrapper in place, the entire state-navigation logic
is encapsulated and reusable. A pytest-BDD fixture (or any other test setup
step) reduces to a single call:
from pytest_bdd import given, parsers

# LARGE_TIMEOUT and VERY_LARGE_TIMEOUT are assumed to be constants
# defined elsewhere in the test suite.


@given(parsers.parse(
    "subarray {subarray_id} is in obsState {expected_state}"
))
def subarray_is_in_obsstate(
    telescope: Telescope,
    subarray_id: str,
    expected_state: str,
) -> None:
    """Bring the subarray to the required obsState, from any starting point."""
    # Look up the enum member by name, e.g. "READY" -> ObsState.READY.
    target = ObsState[expected_state]
    try:
        telescope.reach_obs_state(
            target_state=target,
            commands_inputs=COMMANDS_INPUTS,
            timeout=LARGE_TIMEOUT,
        )
    except Exception:
        # Harder retry: force an abort/restart cycle first,
        # then navigate to the target with a more generous timeout.
        telescope.reach_obs_state(
            target_state=target,
            commands_inputs=COMMANDS_INPUTS,
            pass_through_states=[ObsState.ABORTED],
            timeout=VERY_LARGE_TIMEOUT,
        )
This step works regardless of the current observation state. The state machine computes and executes the shortest path automatically. If something goes wrong, the retry forces a clean abort/restart before trying again — all without any conditional logic in the test step itself.