# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This is used to wrap an iterator with a timeout."""
from __future__ import annotations
import logging
import threading
from queue import Empty, Queue
from threading import Event, Thread
from typing import Iterable, Iterator, Optional, TypeVar, Union
T = TypeVar("T")
NO_TIMEOUT = 0.0
[docs]class TimeoutIterator(Iterable[T]):
"""
An iterator that can timeout.
The implementation of this uses a background thread to get the items and put them on a queue, while the
next functionality of this will block
"""
def __init__(
self: TimeoutIterator,
iterator: Iterator[T],
abort_event: Optional[Event] = None,
timeout: float = NO_TIMEOUT,
expected_period: float = 0.0,
background_thread_wait: bool = True,
) -> None:
"""Initialise iterator."""
self._iterator = iterator
self._timeout = timeout
self._queue: Queue[Union[T, BaseException]] = Queue()
self._done = False
self._abort_event = abort_event or threading.Event()
self._expected_period = expected_period
self._logger = logging.getLogger(__name__)
self._first = True
self._thread: Thread | None = None
self._background_thread_wait = background_thread_wait
def __del__(self: TimeoutIterator) -> None:
"""
Tear down iterator.
This makes sure that the background thread is notified to abort.
"""
self._abort_event.set()
def __iter__(self: TimeoutIterator) -> Iterator[T]:
"""Return self as an iterator."""
return self
def __next__(self: TimeoutIterator) -> T:
"""Return the next item."""
if self._thread is None:
self._thread = Thread(target=self.__background_iterate, daemon=True)
self._thread.start()
if self._done or self._abort_event.is_set():
raise StopIteration
# cannot guarantee that the iterator has produced anything yet
# wait for the expected period/interval on first call. This
# will ensure we either use the greater of timeout or 2 times
# the expected period.
#
# Previously this performed as thread sleep rather than using a
# timeout, which for health check lead up to 2 seconds before
# actual iteration of the first value.
timeout = max(self._timeout, 2.0 * self._expected_period) if self._first else self._timeout
self._first = False
try:
if self._timeout == NO_TIMEOUT:
data = self._queue.get()
else:
data = self._queue.get(timeout=timeout)
self._queue.task_done()
except Empty:
self._logger.debug(
f"Received Empty. Queue size is {self._queue.qsize()}. Queue object is {self._queue}"
)
self._done = True
if self._abort_event.is_set():
raise StopIteration
self._timedout = True
raise TimeoutError
if isinstance(data, BaseException):
if isinstance(data, StopIteration):
self._done = True
raise data
return data
def __background_iterate(self: TimeoutIterator) -> None:
"""Iterate over items in the background."""
try:
while True:
# This will block until the first message is received but
# means we don't have to wait for the polling interval for
# the first message if the producer starts producing
# immediately
item = next(self._iterator)
self._queue.put_nowait(item)
if self._background_thread_wait and self._expected_period > 0.0:
# this allows us not having to wait a full
# rate to get told to stop as the iterator
# will still block, but when the event is set
# it will notify this thread and it will stop
# blocking.
if self._abort_event.wait(timeout=self._expected_period):
raise StopIteration
else:
if self._abort_event.is_set():
raise StopIteration
except BaseException as e:
if not isinstance(e, StopIteration):
self._logger.warning(f"Exception raised in background processing: {e}", exc_info=False)
self._logger.debug("Exception details:", exc_info=True)
self._queue.put_nowait(e)