# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for class that is used in background processing that receives streamed results."""
from __future__ import annotations
__all__ = ["StreamingTask"]
import logging
import threading
from typing import Any, Callable, Generator, Generic, TypeVar
T = TypeVar("T")


class StreamingTask(Generic[T]):
    """
    A class used to handle the background task of streamed items of data.

    This class is used by background monitoring and health check results
    for gRPC services. This has common code refactored out to handle
    the starting/stopping of the task, streaming of the results and
    handling of the results.

    The running state is guarded by a ``threading.Condition``: ``start()``
    notifies waiters when the task begins and when it finishes, and ``stop()``
    waits on that condition (with a timeout) for the task to finish.
    """

    def __init__(
        self: StreamingTask,
        task_name: str,
        item_generator: Callable[[threading.Event], Generator[T, None, None]],
        item_handler: Callable[[T], None],
        exception_handler: Callable[[Exception], None] | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Create instance of a streaming task.

        This task takes an instance of a callable/partial function that takes
        a ``threading.Event`` and returns a Python ``Generator`` which
        can be looped over. This generator will yield an item whenever
        an item is ready and the task will then pass the item to the
        item handler.

        An optional exception handler can be provided. If not provided the
        exception is only logged. No matter what, when an exception has been
        raised the background task will stop processing.

        :param task_name: the name of the task to use in logging
        :type task_name: str
        :param item_generator: a callable / partial function that takes a ``threading.Event``
            and returns a generator of items that need to be handled.
        :type item_generator: Callable[[threading.Event], Generator[T, None, None]]
        :param item_handler: the callback to use when an item is available. This should
            not throw an exception.
        :type item_handler: Callable[[T], None]
        :param exception_handler: a callback that should be used if an exception has been
            raised. Defaults to just logging the exception.
        :type exception_handler: Callable[[Exception], None] | None, optional
        :param logger: an optional logger to use for logging warnings, defaults to None
            in which case the module level logger is used.
        :type logger: logging.Logger | None, optional
        """
        self.task_name = task_name
        self._item_generator = item_generator
        self._item_handler = item_handler
        self._exception_handler = exception_handler
        self._logger = logger or logging.getLogger(__name__)

        # _running is written under _condvar by start(); stop() waits on
        # _condvar to learn that the streaming loop has finished.
        self._running = False
        self._condvar = threading.Condition()
        # only set while a call to start() is in progress
        self._abort_event: threading.Event | None = None

    def start(
        self: StreamingTask,
        abort_event: threading.Event | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Start the streaming of results from the generator.

        Any task that is currently marked as running is stopped first. This
        method then loops over the generator in the calling thread and only
        returns once the generator is exhausted, the task has been stopped, or
        an exception has been raised.

        Exceptions raised by the generator or the item handler are logged as
        a warning and passed to the exception handler, if one was provided;
        they are not re-raised. An exception raised by the exception handler
        itself does propagate to the caller.

        :param abort_event: a threading primitive to use to signal stopping of the task, defaults to None
            in which case a new ``threading.Event`` is created.
        :type abort_event: threading.Event | None, optional
        :param kwargs: additional keyword arguments are accepted but ignored.
        :type kwargs: Any
        """
        self.stop()

        try:
            self._abort_event = abort_event or threading.Event()

            # this locks condition var
            with self._condvar:
                self._running = True
                self._condvar.notify_all()

            self._logger.debug(f"{self.task_name} has started")
            item_iter = self._item_generator(self._abort_event)

            # _running is re-checked between items so that stop() can end the
            # loop even if the generator keeps yielding.
            while self._running:
                try:
                    self._item_handler(next(item_iter))
                except StopIteration:
                    break

            self._logger.debug(f"{self.task_name} has completed successfully")
        except Exception as e:
            # the condvar has been released: the ``with`` block above has
            # already exited, so the handler is not called with the lock held
            self._logger.warning(f"Error while handing {self.task_name}.", exc_info=True)
            if self._exception_handler is not None:
                self._exception_handler(e)
        finally:
            self._logger.debug(f"{self.task_name} has completed, cleaning up")
            # always mark the task as finished and wake anything blocked in stop()
            with self._condvar:
                self._running = False
                self._condvar.notify_all()

    def stop(self: StreamingTask, wait_for_generator: bool = True) -> None:
        """
        Stop processing of background items.

        This will signal the generator, via the abort event used in the ``start()``,
        to stop streaming of items. If the task is not running this is a no-op.

        By default this will wait, for at most 0.1 seconds, for the item generator
        to complete but in some cases we want to ignore the generator, like when
        we want to put the LMC device into an OFFLINE state. This is okay to do
        as this task is already running in the background.

        Whether or not the generator completed within the timeout, the task is
        marked as not running and the abort event reference is cleared.

        :param wait_for_generator: indicator of whether to wait for the item
            generator to complete when this method is requested, defaults to True
        :type wait_for_generator: bool, optional
        """
        if not self._running:
            return

        if self._abort_event is not None:
            self._abort_event.set()

        # wait on the condvar so we know that the process has actually stopped
        if wait_for_generator:
            with self._condvar:
                # A predicate is used rather than a bare wait() so that:
                #  * if the task finished (and notified) before we got here we
                #    return immediately rather than sleeping for the full timeout
                #  * a notification that is not the task finishing does not end
                #    the wait early
                self._condvar.wait_for(lambda: not self._running, timeout=0.1)

        self._abort_event = None
        self._running = False

    def is_running(self: StreamingTask) -> bool:
        """Get whether the task is running or not."""
        return self._running