Source code for ska_ser_skallop.connectors.remoting.tangobridge.control

"""Enable concurrent monitoring and control as either asyncio routines or concurrent threads."""
import asyncio
import atexit
import logging
from concurrent.futures import CancelledError, Future
from concurrent.futures.thread import ThreadPoolExecutor
from threading import Event, Thread
from typing import Any, Coroutine, List, NamedTuple, TypeVar, Union

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Generic type variable; in this module it parameterises the result type of
# the coroutines handed to the Controller (``Coroutine[Any, Any, T]``).
T = TypeVar("T")
"""
.. Note to devs: sphinx-autodoc can't document typevars properly right now,
   see https://github.com/agronholm/sphinx-autodoc-typehints/issues/39

A generic type for elements of an iterable.
"""


class MonitoredTask(NamedTuple):
    """Record describing a concurrent task that is being monitored.

    Only ``future`` is mandatory; ``result`` starts out as the placeholder
    string ``"pending"`` and ``exception`` as ``None``.
    """

    # the concurrent future that is being tracked
    future: Future
    # outcome of the task; the literal "pending" is the default placeholder
    result: Any = "pending"
    # exception associated with the task, if any
    exception: Union[None, Exception] = None
class ControllerState(NamedTuple):
    """Represent a Controller state as running and not running events.

    The two events are meant to be mutually exclusive: a controller is
    either running or not running.
    """

    # NOTE(review): neither assignment carries a type annotation, so
    # NamedTuple treats them as ordinary class attributes (one Event object
    # each, shared by every instance) and the tuple itself has no fields.
    # Confirm whether real tuple fields were intended.
    running = Event()
    not_running = Event()
class Outcome(NamedTuple):
    """Bundle the result of a future with whatever error ended it.

    All three fields are required.
    """

    # value returned by the future, or None when it did not return normally
    result: Any
    # the CancelledError raised when the future was cancelled, else None
    cancelled: Union[None, CancelledError]
    # any other exception raised while obtaining the result, else None
    exception: Union[None, Exception]
class NamedFuture(NamedTuple):
    """Pair a concurrent Future with a name.

    The methods below delegate to the wrapped future.
    """

    future: Future[Any]
    name: str

    def poll_finished_with_exception(
        self, ignore_cancelled=True
    ) -> Union[None, BaseException]:
        """Report the exception of a finished future without blocking.

        :param ignore_cancelled: when True (the default) a cancelled future
            is reported as ``None``; when False the ``CancelledError`` itself
            is returned
        :type ignore_cancelled: bool
        :returns: ``None`` if the future has not finished yet or finished
            without raising, otherwise the exception object.
        """
        if not self.future.done():
            return None
        try:
            # the future is already done, so a zero timeout cannot time out
            return self.future.exception(0)
        except CancelledError as error:
            return None if ignore_cancelled else error

    def done(self) -> bool:
        """Tell whether the future was cancelled or finished executing.

        :returns: True if the future was cancelled or finished executing
        """
        return self.future.done()

    def cancel(self) -> bool:
        """Try to cancel the future.

        A future cannot be cancelled if it is running or has already
        completed.

        :returns: True if the future was successfully cancelled, False
            otherwise
        """
        return self.future.cancel()

    def cancelled(self):
        """Tell whether the future was cancelled.

        :returns: True if the future was cancelled
        """
        return self.future.cancelled()

    def block_until_complete(self, timeout: float) -> "Outcome":
        """Wait for the future to complete and capture how it ended.

        No exception escapes from this call: cancellation and any other
        ``Exception`` raised while waiting (including a timeout) are returned
        inside the Outcome.

        :param timeout: maximum time, in seconds, to wait for the result
        :type timeout: float
        :return: the outcome as a combination of the result and any exception
            raised while obtaining it
        """
        try:
            value = self.future.result(timeout)
        except CancelledError as cancelled_error:
            return Outcome(None, cancelled_error, None)
        except Exception as exception:
            return Outcome(None, None, exception)
        return Outcome(value, None, None)
# Either kind of task tracked by the controller: an asyncio task or a named
# concurrent future.
AbstractTask = Union[asyncio.Task, NamedFuture]
def cancel_future(future: Future, timeout=1):
    """Cancel a future that is still pending, or wait for it if it is running.

    Nothing happens when the future is already done or when cancelling
    succeeds. Otherwise the call blocks until the future completes or the
    timeout expires; any error raised by ``future.result`` propagates.

    :param future: The future that needs to be cancelled
    :type future: Future
    :param timeout: The maximum amount of time to wait for the future to
        finish, defaults to 1
    :type timeout: int, optional
    """
    finished_or_cancelled = future.done() or future.cancel()
    if not finished_or_cancelled:
        # cancel() was refused, so the future is running and must be waited on
        future.result(timeout)
class BaseControllerTasks:
    """Base object containing the common methods to handle a list of Tasks/Futures.

    Every tracked task must offer ``done()``, ``cancel()`` and
    ``cancelled()``. Tasks start in ``submitted_tasks`` and are moved to
    ``done_tasks`` or ``cancelled_tasks``; exceptions are collected in
    ``failed_tasks``.
    """

    submitted_tasks: "List[AbstractTask]"
    failed_tasks: List[BaseException]
    done_tasks: "List[AbstractTask]"
    cancelled_tasks: "List[AbstractTask]"

    def __init__(self) -> None:
        """Create empty task lists owned by this instance only."""
        # lists are created per instance: class-level lists would be shared
        # by every instance (and so by every controller)
        self.submitted_tasks = []
        self.failed_tasks = []
        self.done_tasks = []
        self.cancelled_tasks = []

    def update(self):
        """Poll the state of concurrent tasks and set to done those that are finished."""
        self.move_to_done()

    def move_to_done(self):
        """Move every completed task from the submitted list to the done list."""
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the element that follows each removed task.
        for task in list(self.submitted_tasks):
            if not task.done():
                continue
            try:
                # remove in place so tasks appended meanwhile are not lost
                self.submitted_tasks.remove(task)
            except ValueError:
                # already taken out of the list by someone else
                continue
            self.done_tasks.append(task)

    def _cancel_next(self):
        """Drain the submitted list, sorting tasks into done or cancelled.

        NOTE: a task that is neither done nor cancellable stays at the head
        of the list, so this loop keeps re-checking it until it finishes.
        """
        while self.submitted_tasks:
            task = self.submitted_tasks[0]
            if task.done():
                self.submitted_tasks.pop(0)
                self.done_tasks.append(task)
            elif task.cancel() or task.cancelled():
                self.submitted_tasks.pop(0)
                self.cancelled_tasks.append(task)

    def raise_any_exceptions(self):
        """Raise any exceptions generated from failed tasks.

        :raises Exception: The exception with the list of task exceptions
        """
        if self.failed_tasks:
            raise Exception(self.failed_tasks)
class Futures(BaseControllerTasks):
    """A subclass of BaseControllerTasks as a set of Future tasks."""

    submitted_tasks: List[NamedFuture]
    failed_tasks: List[BaseException]
    done_tasks: List[NamedFuture]
    cancelled_tasks: List[NamedFuture]

    def __init__(self) -> None:
        """Create empty future lists owned by this instance only."""
        super().__init__()
        # per-instance lists: class-level lists would be shared between all
        # Futures objects
        self.submitted_tasks = []
        self.failed_tasks = []
        self.done_tasks = []
        self.cancelled_tasks = []

    def _record_failure(self, failure: BaseException) -> None:
        """Store an exception unless that very object is already recorded."""
        if not any(failure is known for known in self.failed_tasks):
            self.failed_tasks.append(failure)

    def check_for_exceptions(self):
        """Record the exception of every done future that ended with one.

        This runs on every update over all done futures, so an exception
        that is already recorded is not added again.
        """
        for done_task in self.done_tasks:
            failure = done_task.poll_finished_with_exception()
            if failure is not None:
                self._record_failure(failure)

    def update(self):
        """Move any done futures to the completed ones and excepted futures to exceptions."""
        self.move_to_done()
        self.check_for_exceptions()

    def add(self, future: Future, name=""):
        """Add a new Future to be monitored.

        :param future: The concurrent future to monitor
        :type future: Future
        :param name: A name to give to the future for logging purposes,
            defaults to ""
        :type name: str
        """
        self.submitted_tasks.append(NamedFuture(future, name))

    def _wait_for_running_futures_to_complete(self, timeout: float):
        """Block on each submitted future and record how it ended.

        :param timeout: maximum time to wait for each individual future
        :type timeout: float
        """
        for future in self.submitted_tasks:
            result = future.block_until_complete(timeout)
            if result.cancelled:
                self.cancelled_tasks.append(future)
            elif result.exception:
                self._record_failure(result.exception)

    def cancel_pending_and_join(self):
        """Cancel any pending Futures and join the remaining ones until they have finished."""
        while self.submitted_tasks:
            self._cancel_next()
            self._wait_for_running_futures_to_complete(1)
        if self.cancelled_tasks:
            logger.debug(
                "the following futures have been cancelled: %s",
                self.cancelled_tasks,
            )
class Tasks(BaseControllerTasks):
    """A subclass of BaseControllerTasks as a set of asyncio tasks."""

    submitted_tasks: List[asyncio.Task]
    failed_tasks: List[BaseException]
    done_tasks: List[asyncio.Task]
    cancelled_tasks: List[asyncio.Task]

    def __init__(self) -> None:
        """Create empty task lists owned by this instance only."""
        super().__init__()
        # per-instance lists: class-level lists would be shared between all
        # Tasks objects
        self.submitted_tasks = []
        self.failed_tasks = []
        self.done_tasks = []
        self.cancelled_tasks = []

    def update(self):
        """Move any done tasks to the completed ones."""
        self.move_to_done()

    def add(self, task: asyncio.Task):
        """Add a new task to be monitored.

        :param task: the asyncio Task to be monitored
        :type task: asyncio.Task
        """
        self.submitted_tasks.append(task)

    async def cancel_pending(self):
        """Cancel any pending asyncio tasks.

        This will cause the task to raise a cancelled Error and finish.
        """
        self.update()
        while self.submitted_tasks:
            self._cancel_next()
            # yield to the event loop between passes
            await asyncio.sleep(0.001)
        if self.cancelled_tasks:
            logger.debug(
                "the following async tasks have been cancelled: %s",
                self.cancelled_tasks,
            )
[docs]class Controller: """Class to manage running asyncio tasks on a separate thread. The controller creates a separate deamon thread named "asyncio". The deamon thread contains an asyncio loop that is used to manage asyncio tasks. Thus the controller allows for a separate thread to run asyncio operations within a pytest environment. These asyncio operations can then be used by the Tango bridge for calls to external services. The controller allows for two separate ways of achieving concurrency: 1. From within a deamon like asynchronous routine that is running on the controller thread 2. From within the main thread as a dispatched Future resulting from running the async routine on that thread. The most typical use case is thus to first create a main asyncio routine that works like a deamon loop waiting for all asyncio tasks to complete in a gather command. .. code-block:: python controller = Controller() async def main(): do_a = controller.create_async_task(doA()) do_b = controller.create_async_task(doB()) await asyncio.gather(do_a, do_b) async def doA(): await asyncio.sleep(1) async def doB(): await asyncio.sleep(2) The main routine is then executed as a concurrent thread loaded onto the controller: .. code-block:: python main_thread = controller.dispatch_concurrent_routine(main()) main_thread.result() # wait until doA and doB have completed # gracefully tear down all tasks and futures that may still be running controller.stop() Note the controller has the ability to cancel all pending tasks (and Futures) when it gets terminated via the stop command (which is registered on the `atexit` method). This will result in a cancelled exception on an asyncio routine. 
""" task_polling_period = 0.2 def __init__(self) -> None: """Initialise the object.""" self._loop = asyncio.new_event_loop() self.clear_controller_deamon = asyncio.Event() self.thread_manager = ThreadPoolExecutor( max_workers=1, thread_name_prefix="asyncio" ) self.running = Event() self.controller_deamon = self._start_controlling_deamon() self.futures = Futures() self.async_tasks = Tasks() self._exception = None self.monitor_task = self._start_task_monitoring() atexit.register(self.stop) def _start_controlling_deamon(self) -> Thread: thread = Thread(target=self._async_deamon, daemon=True, name="asyncio") thread.start() return thread def _async_deamon(self): asyncio.set_event_loop(self._loop) self.running.set() self._loop.run_forever() def _start_task_monitoring(self) -> "Future[None]": return asyncio.run_coroutine_threadsafe(self._poll_tasks(), self._loop) async def _poll_tasks(self): while self.running.is_set(): await asyncio.sleep(self.task_polling_period) self.futures.update() self.async_tasks.update() async def _cancel_pending_async_tasks_routine(self): await self.async_tasks.cancel_pending() def _cancel_pending_async_tasks(self): asyncio.run_coroutine_threadsafe( self._cancel_pending_async_tasks_routine(), self._loop ).result()
[docs] def get_loop(self) -> asyncio.AbstractEventLoop: """Return the event loop being used to generate async tasks on the thread. :return: the event loop. """ return self._loop
[docs] def create_async_task( self, routine: Coroutine[Any, Any, T], name=None ) -> asyncio.Task[T]: """Run an asynchronous routine concurrently within a given loop. Note this must be called from within a currently executing routine within that loop. :param routine: the routine to run :type routine: Coroutine[Any, Any, T] :param name: The name to be given to the task, defaults to None which will result in the name being the same as the coroutine function name. :type name: str :return: the coroutine wrapped as a future allowing asynchronous awaiting """ if not name: name = routine.__name__ async_task = self._loop.create_task(routine, name=name) setattr(async_task, "trace", routine.cr_code) self.async_tasks.add(async_task) return async_task
[docs] def dispatch_concurrent_routine( self, routine: Coroutine[Any, Any, T], name="" ) -> "Future[T]": """ Dispatch a separate thread to run an asynchronous task on its event loop. :param routine: an asynchronous routine that will be run on an event loop in a separate thread. Note any running or pending tasks in the thread will be cancelled during tear down of the thread. :type routine: Coroutine[Any, Any, T] :param name: The name to be given to the task, defaults to "" which will result in the name being the same as the coroutine function name. :type name: str :return: a Future representing the concurrent execution of the task, that can be waited upon at some later time to get the result. """ if not name: name = routine.__name__ future = asyncio.run_coroutine_threadsafe(routine, self._loop) setattr(future, "trace", routine.cr_code) self.futures.add(future, name) return future
[docs] def run_async_task( self, routine: Coroutine[Any, Any, T], name="", timeout=100 ) -> T: """ Run an async task on a separate controller thread. Block until the task has finished or raised an exception. :param routine: an asynchronous routine that will be run on an event loop in a separate thread. Note any running or pending tasks in the thread will be cancelled during tear down of the thread. :type routine: Coroutine[Any, Any, T] :param timeout: A maximum amount of time to wait for the result of the task, default is 100s :param name: The name to be given to the task, defaults to "" which will result in the name being the same as the coroutine function name. :type name: str :raises TimeoutError: if the task did not return within given timeout period :return: the result of the asynchronous task """ if not name: name = routine.__name__ future_result = self.dispatch_concurrent_routine(routine, name) try: result = future_result.result(timeout=timeout) except TimeoutError as exception: raise TimeoutError( f"Timed out after {timeout}" f"whilst trying to execute co_routine: \n{routine.cr_code}" ) from exception return result
[docs] def stop(self): """Perform a gracefull teardown of pending tasks.""" if self.running.is_set(): self.running.clear() self._cancel_pending_async_tasks() self.futures.cancel_pending_and_join() self.futures.raise_any_exceptions()