How to manage the operational state
This guide covers how to manage the operational state (a subset of the DevState enumeration) by driving the SKA OpStateModel, either directly in your device class inheriting from one of the base interfaces, or with a component manager using callbacks.
When a device is in AdminMode.OFFLINE, the SKA Control Model requires that the device does not communicate with its component (system under control) and reports its state as DevState.DISABLE. Whereas, when a device is in AdminMode.ONLINE, it should be monitoring and controlling its component.
The operational state has to be managed differently for:
Components which support different power states
When a device has established communication with its component, it should determine its current power state and update the operational state accordingly.
With the BaseInterface
Device classes inheriting from the BaseInterface are responsible for overriding the change_control_level() method which accepts a ControlLevel parameter used to determine whether it should start or stop monitoring its component. The operational state of the Tango device must be updated to match the state of the component under its control. We can drive the OpStateModel directly by calling the various component_<x>() methods which will “perform an action” on the state machine.
In the following example implementation of change_control_level(), when instructed to stop monitoring (ControlLevel.NO_CONTACT), we stop communicating with the component and perform the component_disconnected() action, transitioning the device to DevState.DISABLE. When we are instructed to monitor the component (ControlLevel.FULL_CONTROL), we first establish communication, check for any faults, determine the power state, and then transition the operational state via the appropriate component_<x>() action.
import ska_control_model as scm
import ska_tango_base as stb
class MyPoweredDevice(stb.base.BaseInterface):
...
def change_control_level(self, control_level: stb.base.ControlLevel) -> None:
"""Change how the device is interacting with the system under control."""
if control_level == stb.base.ControlLevel.NO_CONTACT:
self._component.disconnect()
self.component_disconnected()
elif control_level == stb.base.ControlLevel.FULL_CONTROL:
self._component.connect()
if self._component.has_fault():
self.component_fault()
else:
power_state: scm.PowerState = self._component.get_power_state()
match power_state:
case PowerState.UNKNOWN:
self.component_unknown()
case PowerState.NO_SUPPLY | PowerState.OFF:
self.component_off()
case PowerState.STANDBY:
self.component_standby()
case PowerState.ON:
self.component_on()
else:
raise ValueError(f"Unknown control_level {control_level}")
With a component manager
As described in the component managers concept page, when inheriting from the BaseComponentManager, your component manager is expected to drive the operational state of your device by calling _update_communication_state() and _update_component_state() with a communication_state, fault and power while monitoring your component.
The start_communicating() method should start attempting to monitor its component, for example, by launching a thread. This should result in a call to _update_communication_state() with CommunicationStatus.NOT_ESTABLISHED while attempts to monitor the component have not yet succeeded. Once the component manager has started monitoring, it must call _update_communication_state() with CommunicationStatus.ESTABLISHED.
import ska_tango_base as stb
class MyPoweredComponentManager(stb.base.BaseComponentManager):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._thread = None
self._shutdown_event = Event()
def _monitor(self) -> None:
"""Monitor our component."""
self._component.connect()
self._update_communication_state(CommunicationStatus.ESTABLISHED)
while not self._shutdown_event.is_set():
power = self._component.get_power_state()
fault = self._component.has_fault()
...
self._update_component_state(power=power, fault=fault, ...)
self._shutdown_event.wait(timeout=POLLING_PERIOD)
def start_communicating(self) -> None:
"""Establish communication with the component, then start monitoring."""
if self._thread is not None:
return # We are already attempting to communicate
self._thread = Thread(target=self._monitor)
self._shutdown_event.clear()
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
self._thread.start()
def stop_communicating(self) -> None:
"""Break off communication with the component."""
self._shutdown_event.set()
self._thread.join()
self._component.disconnect()
self._update_communication_state(CommunicationStatus.DISABLED)
Components which have only a single power state
Components which do not support multiple power states should always be in operational state DevState.ON if communication has been established and there is no fault.
With the BaseInterface
The operational state is transitioned to DevState.ON via the component_on() action as soon as communication has been established.
import ska_tango_base as stb
class MyAlwaysOnDevice(stb.base.BaseInterface):
...
def change_control_level(self, control_level: stb.base.ControlLevel) -> None:
"""Change how the device is interacting with the system under control."""
if control_level == stb.base.ControlLevel.NO_CONTACT:
self._component.disconnect()
self.component_disconnected()
elif control_level == stb.base.ControlLevel.FULL_CONTROL:
self._component.connect()
if self._component.has_fault():
self.component_fault()
else:
self.component_on()
else:
raise ValueError(f"Unknown control_level {control_level}")
With a component manager
The power should be set to PowerState.ON as soon as communication has been established.
import ska_tango_base as stb
class MyAlwaysOnComponentManager(stb.base.BaseComponentManager):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._thread = None
self._shutdown_event = Event()
def _monitor(self) -> None:
"""Monitor our component."""
self._component.connect()
self._update_communication_state(CommunicationStatus.ESTABLISHED)
self._update_component_state(power=PowerState.ON)
while not self._shutdown_event.is_set():
fault = self._component.has_fault()
...
self._update_component_state(fault=fault, ...)
self._shutdown_event.wait(timeout=POLLING_PERIOD)
def start_communicating(self) -> None:
"""Establish communication with the component, then start monitoring."""
if self._thread is not None:
return # We are already attempting to communicate
self._thread = Thread(target=self._monitor)
self._shutdown_event.clear()
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
self._thread.start()
def stop_communicating(self) -> None:
"""Break off communication with the component."""
self._shutdown_event.set()
self._thread.join()
self._component.disconnect()
self._update_communication_state(CommunicationStatus.DISABLED)
Note
These examples are polling their components in a separate thread. Consider using the PollingComponentManager. These examples are intended to be explanatory only.