How to manage your operational state with BaseComponentManager

As described in Operational state management, a component manager that inherits from BaseComponentManager is expected to drive the operational state of its device. It does so while monitoring its component, by calling _update_communication_state() with a communication_state and _update_component_state() with fault and power.

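The operational state has to be managed differently for components which support different power levels and for components which have only a single power level.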

In both cases, the start_communicating() method should start attempting to monitor its component, for example, by launching a thread. This should result in a call to _update_communication_state() with CommunicationStatus.NOT_ESTABLISHED while attempts to monitor the component have not yet succeeded. Once the component manager has started monitoring, it must call _update_communication_state() with CommunicationStatus.ESTABLISHED.

The difference between the two types of components is how the power is passed to the _update_component_state() method.
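To make the role of these three inputs concrete, the following is a deliberately simplified sketch of how a device might combine communication_state, power and fault into an operational state. It is not the library's implementation: the enums are local stand-ins (OpState stands in for the relevant DevState values), derive_op_state() is a hypothetical helper, and the real state machine described in Operational state management may handle more cases.

```python
import enum
from typing import Optional


class CommunicationStatus(enum.Enum):
    """Local stand-in for the real CommunicationStatus enum."""

    DISABLED = enum.auto()
    NOT_ESTABLISHED = enum.auto()
    ESTABLISHED = enum.auto()


class PowerState(enum.Enum):
    """Local stand-in covering only the power states used in this sketch."""

    UNKNOWN = enum.auto()
    OFF = enum.auto()
    STANDBY = enum.auto()
    ON = enum.auto()


class OpState(enum.Enum):
    """Local stand-in for the DevState values a device can end up in."""

    DISABLE = enum.auto()
    UNKNOWN = enum.auto()
    OFF = enum.auto()
    STANDBY = enum.auto()
    ON = enum.auto()
    FAULT = enum.auto()


_POWER_TO_OP_STATE = {
    PowerState.UNKNOWN: OpState.UNKNOWN,
    PowerState.OFF: OpState.OFF,
    PowerState.STANDBY: OpState.STANDBY,
    PowerState.ON: OpState.ON,
}


def derive_op_state(
    current: OpState,
    communication_state: CommunicationStatus,
    power: Optional[PowerState] = None,
    fault: Optional[bool] = None,
) -> OpState:
    """Return the next operational state under this simplified model."""
    if communication_state is CommunicationStatus.DISABLED:
        return OpState.DISABLE
    if communication_state is CommunicationStatus.NOT_ESTABLISHED:
        return OpState.UNKNOWN
    # Communication is established from here on.
    if fault:
        return OpState.FAULT
    if power is None:
        # No power state has been supplied yet, so nothing changes.
        return current
    return _POWER_TO_OP_STATE[power]
```

In this model, establishing communication alone leaves the state where it was; only a later update carrying a power value moves the device to OFF, STANDBY or ON. A component with a single power level gets the same effect by supplying PowerState.ON once, immediately after communication is established.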

Components which support different power levels

Components which support multiple power levels must read the current power level from their component and pass it on with a call to _update_component_state().

The operational state of the device will not be updated until the power state has been supplied.

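from threading import Event, Thread
from typing import Any

from ska_control_model import CommunicationStatus
from ska_tango_base.base import BaseComponentManager

# make_connection_to_component() and POLLING_PERIOD are placeholders for
# your own connection code and polling interval.

class MyPoweredComponentManager(BaseComponentManager):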
  def __init__(self, *args: Any, **kwargs: Any) -> None:
    super().__init__(*args, **kwargs)
    self._thread = None
    self._shutdown_event = Event()

  def _monitor(self) -> None:
    """Monitor our component."""
    connection = make_connection_to_component()

    self._update_communication_state(CommunicationStatus.ESTABLISHED)

    while not self._shutdown_event.is_set():
      power = connection.read_power_state()
      fault = connection.read_fault()
      ...
      self._update_component_state(power=power, fault=fault, ...)
      self._shutdown_event.wait(timeout=POLLING_PERIOD)

  def start_communicating(self) -> None:
    """Establish communication with the component, then start monitoring."""
    if self._thread is not None:
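      return  # We are already attempting to communicate

    self._shutdown_event.clear()
    # Report NOT_ESTABLISHED before starting the thread, so that this update
    # cannot arrive after the ESTABLISHED update made by _monitor().
    self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
    self._thread = Thread(target=self._monitor)
    self._thread.start()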

  def stop_communicating(self) -> None:
    """Break off communication with the component."""
    self._shutdown_event.set()
    if self._thread is not None:
      self._thread.join()
      self._thread = None  # Allow start_communicating() to be called again
    self._update_communication_state(CommunicationStatus.DISABLED)

Components which have only a single power level

Components which do not support multiple power levels should always be in operational state DevState.ON if communication has been established and there is no fault. In this case, the power should be set to PowerState.ON as soon as communication has been established.

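from threading import Event, Thread
from typing import Any

from ska_control_model import CommunicationStatus, PowerState
from ska_tango_base.base import BaseComponentManager

# make_connection_to_component() and POLLING_PERIOD are placeholders for
# your own connection code and polling interval.

class MyAlwaysOnComponentManager(BaseComponentManager):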
  def __init__(self, *args: Any, **kwargs: Any) -> None:
    super().__init__(*args, **kwargs)
    self._thread = None
    self._shutdown_event = Event()

  def _monitor(self) -> None:
    """Monitor our component."""
    connection = make_connection_to_component()

    self._update_communication_state(CommunicationStatus.ESTABLISHED)
    self._update_component_state(power=PowerState.ON)

    while not self._shutdown_event.is_set():
      fault = connection.read_fault()
      ...
      self._update_component_state(fault=fault, ...)
      self._shutdown_event.wait(timeout=POLLING_PERIOD)

  def start_communicating(self) -> None:
    """Establish communication with the component, then start monitoring."""
    if self._thread is not None:
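      return  # We are already attempting to communicate

    self._shutdown_event.clear()
    # Report NOT_ESTABLISHED before starting the thread, so that this update
    # cannot arrive after the ESTABLISHED update made by _monitor().
    self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
    self._thread = Thread(target=self._monitor)
    self._thread.start()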

  def stop_communicating(self) -> None:
    """Break off communication with the component."""
    self._shutdown_event.set()
    if self._thread is not None:
      self._thread.join()
      self._thread = None  # Allow start_communicating() to be called again
    self._update_communication_state(CommunicationStatus.DISABLED)

Note

These examples poll their components in a separate thread and are intended to be explanatory only. For real polling, consider using the PollingComponentManager instead.
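In the same explanatory spirit, the sequence of updates such a component manager produces can be checked without Tango. The sketch below is an assumption-laden stand-in, not the library's API: RecordingComponentManager is a hypothetical replacement for BaseComponentManager that only records the updates it receives, the enums are local stand-ins, and the fault read is faked. It reports NOT_ESTABLISHED before starting the thread so that the recorded order is deterministic.

```python
import enum
import threading
from typing import Any, List, Optional


class CommunicationStatus(enum.Enum):
    """Local stand-in for the real CommunicationStatus enum."""

    DISABLED = enum.auto()
    NOT_ESTABLISHED = enum.auto()
    ESTABLISHED = enum.auto()


class PowerState(enum.Enum):
    """Local stand-in with only the member this sketch needs."""

    ON = enum.auto()


class RecordingComponentManager:
    """Hypothetical stand-in for BaseComponentManager that records updates."""

    def __init__(self) -> None:
        self.calls: List[Any] = []
        self._calls_lock = threading.Lock()

    def _update_communication_state(self, communication_state: Any) -> None:
        with self._calls_lock:
            self.calls.append(communication_state)

    def _update_component_state(self, **state: Any) -> None:
        with self._calls_lock:
            self.calls.append(state)


class AlwaysOnDemo(RecordingComponentManager):
    """Single-power-level component manager with a faked component."""

    def __init__(self) -> None:
        super().__init__()
        self._thread: Optional[threading.Thread] = None
        self._shutdown_event = threading.Event()
        self.first_poll_done = threading.Event()

    def _monitor(self) -> None:
        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        self._update_component_state(power=PowerState.ON)
        while not self._shutdown_event.is_set():
            self._update_component_state(fault=False)  # faked fault read
            self.first_poll_done.set()
            self._shutdown_event.wait(timeout=0.01)

    def start_communicating(self) -> None:
        if self._thread is not None:
            return  # Already attempting to communicate
        self._shutdown_event.clear()
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
        self._thread = threading.Thread(target=self._monitor, daemon=True)
        self._thread.start()

    def stop_communicating(self) -> None:
        self._shutdown_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        self._update_communication_state(CommunicationStatus.DISABLED)


def run_demo() -> List[Any]:
    """Start, wait for one poll, stop, and return the recorded updates."""
    manager = AlwaysOnDemo()
    manager.start_communicating()
    manager.first_poll_done.wait(timeout=1.0)
    manager.stop_communicating()
    return manager.calls
```

The list returned by run_demo() begins with NOT_ESTABLISHED, ESTABLISHED and the power update, followed by one or more fault updates, and ends with DISABLED. The same recording approach could be adapted for unit tests of a real component manager by inspecting the callbacks it is given.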