# Source code for ska_tango_base.poller.polling_component_manager

#
# (c) 2022 CSIRO.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.

"""This module provides a polling component manager."""

from __future__ import annotations

from collections.abc import Callable
from logging import Logger
from typing import Any

from ska_control_model import CommunicationStatus, PowerState

from ..base import BaseComponentManager
from ..base.base_component_manager import CommunicationStatusCallbackType
from .poller import Poller, PollModel, PollRequestT, PollResponseT


class PollingComponentManager(
    BaseComponentManager, PollModel[PollRequestT, PollResponseT]
):
    """
    Abstract base class for a component manager that polls its component.

    Polling itself is delegated to a :py:class:`Poller`, which calls back
    into this object through the hook methods defined here
    (``polling_started``, ``polling_stopped``, ``poll_succeeded``,
    ``poll_failed``).

    Concrete subclasses must implement :py:meth:`get_request` and
    :py:meth:`poll`; the versions defined here raise
    :py:exc:`NotImplementedError`.
    """

    def __init__(
        self: PollingComponentManager[PollRequestT, PollResponseT],
        logger: Logger,
        communication_state_callback: CommunicationStatusCallbackType,
        component_state_callback: Callable[..., None],
        poll_rate: float = 0.1,
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new polling component manager instance.

        :param logger: a logger for this component manager to use for
            logging
        :param communication_state_callback: callback to be called when
            the status of communications between the component manager
            and its component changes.
        :param component_state_callback: callback to be called when the
            state of the component changes.
        :param poll_rate: how often to poll, in seconds
        :param kwargs: initial values for additional attributes.
        """
        self._poller = Poller(self, poll_rate, logger)

        super().__init__(
            logger,
            communication_state_callback,
            component_state_callback,
            power=PowerState.UNKNOWN,
            fault=None,
            **kwargs,
        )

    def cleanup(
        self: PollingComponentManager[PollRequestT, PollResponseT],
    ) -> None:
        """Cleanup the polling thread."""
        self._poller.kill_polling_thread()
        super().cleanup()

    def start_communicating(
        self: PollingComponentManager[PollRequestT, PollResponseT],
    ) -> None:
        """Start polling the component."""
        if self.communication_state == CommunicationStatus.DISABLED:
            self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
        # and we remain in NOT_ESTABLISHED until the polling loop is
        # actually talking to the component. It will tell us this
        # is the case by calling our `poll_succeeded` method.

        self._poller.start_polling()

    def polling_started(
        self: PollingComponentManager[PollRequestT, PollResponseT],
    ) -> None:
        """
        Respond to polling having started.

        This is a hook called by the poller when it starts polling.
        """
        # There's no need to do anything here. We wait to receive some
        # polled values before we declare communication to be established.

    def stop_communicating(
        self: PollingComponentManager[PollRequestT, PollResponseT],
    ) -> None:
        """Stop polling the component."""
        if self.communication_state == CommunicationStatus.DISABLED:
            return

        # communication remains ESTABLISHED until the polling loop actually
        # stops polling. It will tell us that it has stopped by calling the
        # `polling_stopped` method.
        self._poller.stop_polling()

    def polling_stopped(
        self: PollingComponentManager[PollRequestT, PollResponseT],
    ) -> None:
        """
        Respond to polling having stopped.

        This is a hook called by the poller when it stops polling.
        It resets the component's power state to UNKNOWN and its fault
        state to None, then marks communication as DISABLED.
        """
        self._update_component_state(power=PowerState.UNKNOWN, fault=None)
        self._update_communication_state(CommunicationStatus.DISABLED)

    def poll_failed(
        self: PollingComponentManager[PollRequestT, PollResponseT],
        exception: Exception,
    ) -> None:
        """
        Respond to an exception being raised by a poll attempt.

        This is a hook called by the poller when an exception occurs.
        Communication is marked as NOT_ESTABLISHED and the exception is
        logged at error level.

        :param exception: the exception that was raised by a recent poll
            attempt.
        """
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
        self.logger.error(f"Poll failed: {exception!r}")

    def poll_succeeded(
        self: PollingComponentManager[PollRequestT, PollResponseT],
        poll_response: PollResponseT,
    ) -> None:
        """
        Handle a successful poll, including any values received.

        This is a hook called by the poller at the end of each
        successful poll. This base implementation does not inspect
        `poll_response`; it only updates the communication state.

        :param poll_response: response to the poll, including any values
            received.
        """
        # Reiterate that communication is established, just in case it
        # had dropped out.
        self._update_communication_state(CommunicationStatus.ESTABLISHED)

    def get_request(
        self: PollingComponentManager[PollRequestT, PollResponseT],
    ) -> PollRequestT:
        """
        Return the reads and writes to be executed in the next poll.

        :raises NotImplementedError: because this class is abstract.

        :return: reads and writes to be executed in the next poll.
        """
        raise NotImplementedError(
            f"'get_request' method must be implemented by '{self.__class__.__name__}'. "
            "The parent 'PollingComponentManager' is an abstract base class."
        )

    def poll(
        self: PollingComponentManager[PollRequestT, PollResponseT],
        poll_request: PollRequestT,
    ) -> PollResponseT:
        """
        Poll the hardware.

        Connect to the hardware, write any values that are to be
        written, and then read all values.

        :param poll_request: specification of the reads and writes to be
            performed in this poll.

        :raises NotImplementedError: because this class is abstract.

        :return: responses to queries in this poll
        """
        raise NotImplementedError(
            f"'poll' method must be implemented by '{self.__class__.__name__}'. "
            "The parent 'PollingComponentManager' is an abstract base class."
        )