Source code for ska_pst_lmc.util.remote_task

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
This module handles tasks that need to be called on device proxies.

Since commands on device proxies may be slow, long running commands, this
module provies classes that can submit and track remote tasks.

The :py:class:`RemoteTask` class deals with calling the task on a remote
device. It subscribes to events of the `longRunningCommandStatus` and
`longRunningCommandProgress` attributes and will proxy the values
of `progress` and `status` back to a task callback method.

The :py:class:`AggregateRemoteTask` class is used when the task has to be
performed on multiple remote devices. It aggregates the progress and the
status. When submitted it will also make sure that the task is run
to completion by checking if all task end, successfully or not.
"""

from __future__ import annotations

import logging
import time
from multiprocessing import Lock
from typing import Any, Callable, Dict, List, Optional, Tuple

from ska_tango_base.commands import ResultCode
from ska_tango_base.executor import TaskStatus

from ska_pst_lmc.device_proxy import PstDeviceProxy
from ska_pst_lmc.util.background_task import BackgroundTaskProcessor

TaskResponse = Tuple[TaskStatus, str]
RemoteTaskResponse = Tuple[ResultCode, str]
RemoteAction = Callable[[PstDeviceProxy], RemoteTaskResponse]

__all__ = [
    "RemoteTask",
    "AggregateRemoteTask",
]


[docs]class RemoteTask: """ Class to handle calling long run tasks on remote devices. This class takes instances of a device proxy and function/action to call when this task is called. It also takes a task_callback to be used when the process updates. This task will subscribe to events on the device proxy of the `longRunningCommandStatus` and `longRunningCommandProgress` attributes. When the task is deleted it will unsubscribe from the events too. This task has to be called (i.e. `task()`) for the action to be called. This allows for the task to be submitted as a background process/thread. """ command_id: Optional[str] = None status: TaskStatus = TaskStatus.STAGING progress: Optional[int] = None def __init__( self: RemoteTask, device: PstDeviceProxy, action: RemoteAction, task_callback: Callable, ): """ Initialise the task. :param device: the device proxy that the task is for. :param action: the action to perfom when called. :param task_callback: the callback to use when the task needs to update it's progress or status. """ # should only be able to subscribe to the long running stuff once. self.device = device self.action = action self.task_callback = task_callback self._status_subscription = self.device.subscribe_change_event( "longRunningCommandStatus", self._status_callback, ) self._progress_subscription = self.device.subscribe_change_event( "longRunningCommandProgress", self._progress_callback, ) def __del__(self: RemoteTask) -> None: """ Cleam up remote task. This makes sure it unsubscribes from events on the attributes of the remote device. """ self._status_subscription.unsubscribe() self._progress_subscription.unsubscribe() def __call__(self: RemoteTask, *arg: Any, **kwargs: Any) -> TaskResponse: """ Execute the task. This executes the action on the remote device. """ (result_code, details) = self.action(self.device) # OK = 0 => COMPLETED # STARTED = 1 => QUEUED # QUEUED = 2 => QUEUED # FAILED = 3 => FAILED # UNKNOWN = 4 => FAILED # REJECTED = 5 => REJECTED # NOT_ALLOWED = 6 => FAILED # ABORTED = 7 => ABORTED if result_code == ResultCode.OK: # this only happens if it's not a slow command self.status = TaskStatus.COMPLETED self.progress = 100 elif result_code == ResultCode.QUEUED: self.status = TaskStatus.QUEUED self.command_id = details self.progress = 0 elif result_code == ResultCode.REJECTED: self.status = TaskStatus.REJECTED return self.status, details def _progress_callback(self: RemoteTask, progress_values: List[str]) -> None: """ Handle change to progress value of long running command. When the `longRunningCommandProgress` attribute value changes for the remote command, this method is called. It tries to find the value for the current process id, if found it will call the task_callback if the value has changed from its current value. """ value_iter = iter(progress_values) progress_values_map: Dict[str, int] = {k: int(v) for (k, v) in zip(value_iter, value_iter)} if self.command_id in progress_values_map: progress = progress_values_map[self.command_id] if progress != self.progress: self.progress = progress self.task_callback(progress=progress) def _status_callback(self: RemoteTask, status_values: List[str]) -> None: """ Handle change to status value of long running command. When the `longRunningCommandStatus` attribute value changes for the remote command, this method is called. It tries to find the value for the current process id, if found it will call the task_callback if the value has changed from its current value. """ value_iter = iter(status_values) status_values_map: Dict[str, TaskStatus] = { k: TaskStatus[v] for (k, v) in zip(value_iter, value_iter) } if self.command_id in status_values_map: status = status_values_map[self.command_id] if status != self.status: self.status = status self.task_callback(status=status)
[docs]class AggregateRemoteTask: """ Class for aggregation of multiple remote tasks. This class is used to submit multiple remotes tasks that can occur in parallel. Tasks are added to this aggregate by: .. code-block:: python task.add_remote_task( device, action=lambda d: d.Action(), ) Only after all the tasks have been added should this task be submitted. This task will also work out the progress and overall status of the task. In the case of progress it an average of the progress of all the subtasks but there is no weighting of those progress values. """ tasks: List[RemoteTask] status: TaskStatus def __init__( self: AggregateRemoteTask, task_callback: Callable, background: bool = True, logger: Optional[logging.Logger] = None, ) -> None: """ Initialise aggregate task. :param task_callback: the callback to use when the task needs to update it's progress or status. :param background: whether to submit the task in the background or not. The default value is True. :param logger: the logger to use for logging from this task. """ self.tasks = list() self.status = TaskStatus.STAGING self.task_callback = task_callback self._lock = Lock() self._running = False self._completed = False self._background = background self._logger = logger or logging.getLogger(__name__) if background: self._background_task_processor = BackgroundTaskProcessor(default_logger=self._logger) def __call__(self: AggregateRemoteTask) -> TaskResponse: """Execute task.""" if self._background: self._background_task_processor.submit_task(action_fn=self._execute_task) else: self._execute_task() return TaskStatus.QUEUED, "Queued"
[docs] def add_remote_task( self: AggregateRemoteTask, device: PstDeviceProxy, action: RemoteAction ) -> RemoteTask: """ Add a remote task to be executed when this task executes. This can only be called while the task hasn't been already started. :param device: the device proxy that the new remote task relates to. :param action: the action to perfom on the remote device. """ if self._running: raise ValueError("Can't add remote task once called.") task = RemoteTask(device=device, action=action, task_callback=self._remote_task_updated) self.tasks.append(task) return task
def _execute_task(self: AggregateRemoteTask) -> None: """ Execute the task. This will submit all the remote tasks to run in parallel, i.e. that we don't need the first task to complete before even calling the 2nd. However, calling this method will make sure that this task runs to completion. """ with self._lock: # want to lock so we don't have updates before all the commands are submitted self._running = True self.status = TaskStatus.IN_PROGRESS self.task_callback(status=TaskStatus.IN_PROGRESS, progress=0) for t in self.tasks: (status_code, details) = t() if status_code == TaskStatus.REJECTED: self._logger.warning(f"Remote task for {t.device.fqdn} was rejected. Msg: {details}") self.task_callback(status=TaskStatus.REJECTED) return self._run_to_completion() def _run_to_completion(self: AggregateRemoteTask, timeout: float = 120.0, sleep: float = 0.1) -> None: """ Run current task to completion. This method will poll to see the current state is in a completed state. If the state is in a completed state it will exit, else this will sleep for the `sleep` argument value before checking and will timeout after the `timeout` value. :param timeout: the amount of time to wait before exiting if not complete. Default is 120 seconds. :param sleep: the amount to sleep between each check. Default is 0.1 second. """ elapsed = 0.0 while elapsed < timeout: with self._lock: if self.status not in [TaskStatus.STAGING, TaskStatus.QUEUED, TaskStatus.IN_PROGRESS]: self._completed = True return time.sleep(sleep) elapsed += sleep def _remote_task_updated(self: AggregateRemoteTask, **kwargs: Any) -> None: """ Handle remote task has been updated. This is called when any of the tasks call it's task_progress but it will check all the progress and status values of all tasks to determine the current progress and status, these values are then passed to the `task_callback` that was passed in during the construction of this task. """ # happy path with self._lock: assert len(self.tasks) > 0 progress = 0 status = TaskStatus.COMPLETED for t in self.tasks: progress += t.progress or 0 if status == TaskStatus.COMPLETED and t.status != TaskStatus.COMPLETED: status = t.status elif t.status == TaskStatus.IN_PROGRESS: status = t.status if status == TaskStatus.COMPLETED: progress = 100 else: progress = int(progress / len(self.tasks)) self.status = status self.task_callback( progress=progress, status=status, )