Source code for ska_pst.lmc.util.streaming_task

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for class that is used in background processing that receives streamed results."""

from __future__ import annotations

__all__ = ["StreamingTask"]

import logging
import threading
from typing import Any, Callable, Generator, Generic, TypeVar

T = TypeVar("T")


class StreamingTask(Generic[T]):
    """
    A class used to handle the background task of streamed items of data.

    This class is used by background monitoring and health check results
    for gRPC services. This has common code refactored out to handle the
    starting/stopping of the task, streaming of the results and handling
    of the results.
    """

    def __init__(
        self: StreamingTask,
        task_name: str,
        item_generator: Callable[[threading.Event], Generator[T, None, None]],
        item_handler: Callable[[T], None],
        exception_handler: Callable[[Exception], None] | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Create instance of a streaming task.

        This task takes an instance of a callable/partial function that takes
        a ``threading.Event`` and returns a Python ``Generator`` which can be
        looped over. This generator will yield an item whenever an item is
        ready and the task will then pass the item to the item handler.

        An optional exception handler can be provided. The exception is
        always logged as a warning; if an exception handler is provided it is
        then called with the exception. No matter what, when an exception has
        been raised the background task will stop processing.

        :param task_name: the name of the task to use in logging
        :type task_name: str
        :param item_generator: a callable / partial function that takes a
            ``threading.Event`` and returns a generator of items that need to
            be handled.
        :type item_generator: Callable[[threading.Event], Generator[T, None, None]]
        :param item_handler: the callback to use when an item is available.
            This should not throw an exception.
        :type item_handler: Callable[[T], None]
        :param exception_handler: a callback that should be used if an
            exception has been raised. Defaults to just logging the exception.
        :type exception_handler: Callable[[Exception], None] | None, optional
        :param logger: an optional logger to use for logging warnings,
            defaults to None in which case a module level logger is used.
        :type logger: logging.Logger | None, optional
        """
        self.task_name = task_name
        self._item_generator = item_generator
        self._item_handler = item_handler
        self._exception_handler = exception_handler
        self._logger = logger or logging.getLogger(__name__)

        # ``_running`` is written under ``_condvar`` by ``start()`` so that
        # ``stop()`` can wait on the condition for the task to finish.
        self._running = False
        self._condvar = threading.Condition()
        self._abort_event: threading.Event | None = None

    def start(self: StreamingTask, abort_event: threading.Event | None = None, **kwargs: Any) -> None:
        """
        Start the streaming of results from the generator.

        Any previously running task is stopped first. The items are consumed
        in the calling thread, so this method only returns once the generator
        is exhausted, the task has been stopped or an exception was raised.

        Exceptions raised by the generator or the item handler are logged and
        passed to the exception handler (if one was provided); they are not
        re-raised by this method. An exception raised by the exception handler
        itself is not caught.

        :param abort_event: a threading primitive to use to signal stopping of
            the task, defaults to None in which case a new event is created.
        :type abort_event: threading.Event | None, optional
        :param kwargs: additional keyword arguments; accepted but not used by
            this method.
        :type kwargs: Any
        """
        self.stop()
        try:
            self._abort_event = abort_event or threading.Event()

            # this locks condition var
            with self._condvar:
                self._running = True
                self._condvar.notify_all()

            self._logger.debug(f"{self.task_name} has started")
            item_iter = self._item_generator(self._abort_event)
            while self._running:
                # Only the fetch is guarded: a StopIteration coming out of the
                # item handler is an error, not the end of the stream, and must
                # reach the exception handling below.
                try:
                    item = next(item_iter)
                except StopIteration:
                    break
                self._item_handler(item)

            self._logger.debug(f"{self.task_name} has completed successfully")
        except Exception as e:
            # the condvar has been released
            self._logger.warning(f"Error while handing {self.task_name}.", exc_info=True)
            if self._exception_handler is not None:
                self._exception_handler(e)
        finally:
            self._logger.debug(f"{self.task_name} has completed, cleaning up")
            # wake up any ``stop()`` call waiting for the task to finish
            with self._condvar:
                self._running = False
                self._condvar.notify_all()

    def stop(self: StreamingTask, wait_for_generator: bool = True) -> None:
        """
        Stop processing of background items.

        This will signal the generator, via the abort event used in the
        ``start()``, to stop streaming of items. By default this will wait
        (for at most 0.1 seconds) for the item generator to complete but in
        some cases we want to ignore the generator, like when we want to put
        the LMC device into an OFFLINE state. This is okay to do as this task
        is already running in the background.

        Calling this method when the task is not running does nothing.

        :param wait_for_generator: indicator of whether to wait for the item
            generator to complete when this method is requested, defaults to
            True
        :type wait_for_generator: bool, optional
        """
        if not self._running:
            return

        # take a snapshot so the check and the call see the same object
        abort_event = self._abort_event
        if abort_event is not None:
            abort_event.set()

        # wait on the condvar so we know that the process has actually stopped.
        # The predicate avoids a lost wake-up: if the task has already finished
        # (and notified) before we get here there is nothing to wait for.
        if wait_for_generator:
            with self._condvar:
                self._condvar.wait_for(lambda: not self._running, timeout=0.1)

        self._abort_event = None
        self._running = False

    def is_running(self: StreamingTask) -> bool:
        """Get whether the task is running or not."""
        return self._running