How to manage your operational state with BaseComponentManager
As described in Operational state management, when inheriting from the
BaseComponentManager, your component manager is expected to drive the
operational state of your device while monitoring your component. It does so by
calling _update_communication_state() with a communication_state, and
_update_component_state() with fault and power.
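The operational state has to be managed differently for components which
support different power levels and for components which have only a single
power level.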
In both cases, the start_communicating() method should start attempting
to monitor its component, for example, by launching a thread. This should
result in a call to
_update_communication_state()
with CommunicationStatus.NOT_ESTABLISHED while attempts to
monitor the component have not yet succeeded. Once the component manager has
started monitoring, it must call
_update_communication_state()
with CommunicationStatus.ESTABLISHED.
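The expected order of communication state updates can be sketched with a stand-in class. The RecordingManager below is hypothetical and does not inherit from the real BaseComponentManager, and the local CommunicationStatus enum only mirrors the three member names used on this page. It simply records each update so that the sequence can be inspected.

```python
from enum import Enum
from threading import Event, Thread


class CommunicationStatus(Enum):
    """Local stand-in mirroring the member names used on this page."""

    DISABLED = 1
    NOT_ESTABLISHED = 2
    ESTABLISHED = 3


class RecordingManager:
    """Hypothetical stand-in that records communication state updates."""

    def __init__(self) -> None:
        self.history = []
        self._thread = None
        self._connected = Event()
        self._shutdown_event = Event()

    def _update_communication_state(self, status: CommunicationStatus) -> None:
        # The real method does more; this stand-in only records the call.
        self.history.append(status)

    def _monitor(self) -> None:
        # Pretend the connection attempt succeeded immediately.
        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        self._connected.set()
        self._shutdown_event.wait()

    def start_communicating(self) -> None:
        if self._thread is not None:
            return  # Already attempting to communicate
        # Report NOT_ESTABLISHED before the thread can report ESTABLISHED.
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
        self._shutdown_event.clear()
        self._thread = Thread(target=self._monitor)
        self._thread.start()

    def stop_communicating(self) -> None:
        if self._thread is None:
            return
        self._shutdown_event.set()
        self._thread.join()
        self._thread = None
        self._update_communication_state(CommunicationStatus.DISABLED)


def run_sequence() -> list:
    """Start, wait for the simulated connection, stop, and return the history."""
    manager = RecordingManager()
    manager.start_communicating()
    manager._connected.wait(timeout=5)
    manager.stop_communicating()
    return [status.name for status in manager.history]
```

Calling run_sequence() returns ['NOT_ESTABLISHED', 'ESTABLISHED', 'DISABLED'], which is the order of updates described above for a component whose connection succeeds.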
The difference between the two types of components is how the power is
passed to the
_update_component_state()
method.
Components which support different power levels
Components which support multiple power levels must read the current power level
from their component. This should then be passed on with a call to
_update_component_state().
The operational state of the device will not be updated until the power state has been supplied.
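    from threading import Event, Thread
    from typing import Any

    # Import paths assumed; adjust them to your installed packages.
    from ska_control_model import CommunicationStatus
    from ska_tango_base.base import BaseComponentManager

    # make_connection_to_component and POLLING_PERIOD are placeholders
    # that your own code must provide.


    class MyPoweredComponentManager(BaseComponentManager):
        def __init__(self, *args: Any, **kwargs: Any) -> None:
            super().__init__(*args, **kwargs)
            self._thread = None
            self._shutdown_event = Event()

        def _monitor(self) -> None:
            """Monitor our component."""
            connection = make_connection_to_component()
            self._update_communication_state(CommunicationStatus.ESTABLISHED)
            while not self._shutdown_event.is_set():
                power = connection.read_power_state()
                fault = connection.read_fault()
                ...
                # Pass any further component state as extra keyword arguments.
                self._update_component_state(power=power, fault=fault)
                self._shutdown_event.wait(timeout=POLLING_PERIOD)

        def start_communicating(self) -> None:
            """Establish communication with the component, then start monitoring."""
            if self._thread is not None:
                return  # We are already attempting to communicate
            # Report NOT_ESTABLISHED before starting the thread, so that it
            # cannot overwrite the ESTABLISHED reported by _monitor().
            self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
            self._shutdown_event.clear()
            self._thread = Thread(target=self._monitor)
            self._thread.start()

        def stop_communicating(self) -> None:
            """Break off communication with the component."""
            if self._thread is not None:
                self._shutdown_event.set()
                self._thread.join()
                self._thread = None  # Allow start_communicating() to run again
            self._update_communication_state(CommunicationStatus.DISABLED)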
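The example above pauses between polls with Event.wait(timeout=...) rather than time.sleep(), so that stop_communicating() does not have to wait out a full polling period before the thread can be joined. A minimal standalone sketch of that pattern, using only the standard library, is shown here. The poll_until_stopped and measure_stop_time helpers and the 5 second period are illustrative choices, not part of ska-tango-base.

```python
import time
from threading import Event, Thread

POLLING_PERIOD = 5.0  # seconds; deliberately long to make the effect visible


def poll_until_stopped(shutdown_event: Event, polls: list) -> None:
    """Record a timestamp per poll until the shutdown event is set."""
    while not shutdown_event.is_set():
        polls.append(time.monotonic())
        # wait() returns as soon as the event is set, without
        # sleeping for the rest of the polling period.
        shutdown_event.wait(timeout=POLLING_PERIOD)


def measure_stop_time() -> float:
    """Return how long it takes to stop the polling thread, in seconds."""
    shutdown_event = Event()
    polls = []
    thread = Thread(target=poll_until_stopped, args=(shutdown_event, polls))
    thread.start()
    time.sleep(0.1)  # give the thread time to reach its wait
    started = time.monotonic()
    shutdown_event.set()
    thread.join()
    return time.monotonic() - started
```

With time.sleep(POLLING_PERIOD) in the loop, joining the thread could block for up to the full period. With Event.wait, measure_stop_time() should return a value well below POLLING_PERIOD.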
Components which have only a single power level
Components which do not support multiple power levels should always be in
operational state DevState.ON if communication has
been established and there is no fault. In this case, the power should
be set to PowerState.ON as soon as
communication has been established.
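    from threading import Event, Thread
    from typing import Any

    # Import paths assumed; adjust them to your installed packages.
    from ska_control_model import CommunicationStatus, PowerState
    from ska_tango_base.base import BaseComponentManager

    # make_connection_to_component and POLLING_PERIOD are placeholders
    # that your own code must provide.


    class MyAlwaysOnComponentManager(BaseComponentManager):
        def __init__(self, *args: Any, **kwargs: Any) -> None:
            super().__init__(*args, **kwargs)
            self._thread = None
            self._shutdown_event = Event()

        def _monitor(self) -> None:
            """Monitor our component."""
            connection = make_connection_to_component()
            self._update_communication_state(CommunicationStatus.ESTABLISHED)
            # There is only one power level, so report it once up front.
            self._update_component_state(power=PowerState.ON)
            while not self._shutdown_event.is_set():
                fault = connection.read_fault()
                ...
                # Pass any further component state as extra keyword arguments.
                self._update_component_state(fault=fault)
                self._shutdown_event.wait(timeout=POLLING_PERIOD)

        def start_communicating(self) -> None:
            """Establish communication with the component, then start monitoring."""
            if self._thread is not None:
                return  # We are already attempting to communicate
            # Report NOT_ESTABLISHED before starting the thread, so that it
            # cannot overwrite the ESTABLISHED reported by _monitor().
            self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
            self._shutdown_event.clear()
            self._thread = Thread(target=self._monitor)
            self._thread.start()

        def stop_communicating(self) -> None:
            """Break off communication with the component."""
            if self._thread is not None:
                self._shutdown_event.set()
                self._thread.join()
                self._thread = None  # Allow start_communicating() to run again
            self._update_communication_state(CommunicationStatus.DISABLED)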
Note
These examples poll their components in a separate thread and are intended to
be explanatory only. Consider using the PollingComponentManager instead.