# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class file handling of long running commands (LRC) in a test environment."""
from __future__ import annotations
import dataclasses
import logging
import threading
from typing import Any, List, cast
from ska_control_model import ObsState, ResultCode, TaskStatus
from ska_tango_base.base.base_component_manager import JSONData
from ska_tango_base.faults import CommandError, ResultCodeError
from ska_tango_base.long_running_commands_api import LrcSubscriptions
from tango import DevError, DeviceProxy
@dataclasses.dataclass
class LrcResult:
    """A data class used to represent the overall result of a long running command (LRC)."""

    command: str
    """The name of the command that the result class relates to."""

    command_id: str | None
    """The ID of the remote TANGO command, or ``None`` if no ID is available."""

    result_code: ResultCode
    """The result code of the long running command."""

    result: Any | None = None
    """The result object of the long running command."""

    exception: Exception | None = None
    """The exception, if raised, that was captured during the long running command."""

    # default_factory gives every instance its own list rather than a shared one
    task_status_events: List[TaskStatus] = dataclasses.field(default_factory=list)
    """A list of `TaskStatus` that occurred during the long running command."""
class LongRunningCommand:
    """
    A class that wraps calling the :py:func:`invoke_lrc` function on a device proxy.

    Creating an instance of this class does not execute the command. This class is
    a :py:class:`Callable` class and calling it will execute the long running command
    and block until the command reaches a completed state, the timeout expires, or an
    exception is raised while invoking or waiting for the command. In every one of
    these cases a :py:class:`LrcResult` is returned describing the outcome.

    .. code-block:: python

        lrc = LongRunningCommand(command="ConfigureScan")
        result = lrc(proxy=proxy, command_args=scan_configuration, timeout=60.0)
        if result.result_code != ResultCode.OK:
            ...
    """

    def __init__(self: LongRunningCommand, command: str, logger: logging.Logger | None = None) -> None:
        """
        Create instance of a long running command.

        :param command: the command name to execute
        :type command: str
        :param logger: the logger to use if exceptions occur, defaults to None
            in which case the module level logger is used
        :type logger: logging.Logger | None, optional
        """
        self.command = command
        self._logger = logger or logging.getLogger(__name__)
        # NOTE(review): not read anywhere in this class; kept in case external
        # code relies on the attribute existing.
        self._obs_states: List[ObsState] = []

    def __call__(
        self: LongRunningCommand,
        *,
        proxy: DeviceProxy,
        command_args: tuple[Any] | None = None,
        logger: logging.Logger | None = None,
        timeout: float | None = None,
    ) -> LrcResult:
        """
        Execute the long running command on the device proxy with given arguments.

        Exceptions raised by :py:func:`invoke_lrc`, or while waiting for the command
        to complete (including the :py:class:`TimeoutError` raised when ``timeout``
        expires), are not propagated. They are logged and stored in the ``exception``
        field of the returned result, and the result code is derived from the
        exception type.

        :param proxy: the device proxy to run the command on.
        :type proxy: DeviceProxy
        :param command_args: the arguments to pass through when calling the remote command,
            defaults to None
        :type command_args: tuple[Any] | None, optional
        :param logger: the logger to use during the long running command execution,
            defaults to None
        :type logger: logging.Logger | None, optional
        :param timeout: the timeout, in seconds, to use when expecting remote command
            to have completed by, defaults to None which waits indefinitely
        :type timeout: float | None, optional
        :return: a view of the overall result of the command.
        :rtype: LrcResult
        """
        # the local import is used to allow mocking of invoke_lrc
        from ska_tango_base.long_running_commands_api import invoke_lrc

        # hand the tracker the caller's logger (falling back to this instance's
        # logger) so that its debug trace goes where the caller expects it
        tracker = _LrcTracker(command=self.command, logger=logger or self._logger)
        try:
            # set the subscription to allow for it to be dropped when method goes out of scope
            tracker._subscription = invoke_lrc(
                lrc_callback=tracker,
                proxy=proxy,
                command=self.command,
                command_args=command_args,
                logger=logger,
            )
            tracker.wait(timeout=timeout)
        except Exception as e:
            self._logger.exception(f"Invoking of {self.command} raised an exception", exc_info=True)
            # the setter also marks the tracker as done, making ``result`` readable
            tracker.exception = e

        return tracker.result
class _LrcTracker:
    """
    An internal class used to track the long running commands (LRC).

    Instances of this are used as the callback to the SKA TANGO base
    function :py:func:`invoke_lrc`.

    This class will map exceptions to correct result codes and also
    keep track of the changes in the remote command's task status events
    changes.

    This class is also used to hold on to the LRC subscription which
    is needed to ensure TANGO events are subscribed to but also are
    un-subscribed from when no longer needed (i.e. when this tracker is
    dropped/deleted).
    """

    _subscription: LrcSubscriptions | None

    def __init__(self: _LrcTracker, command: str, logger: logging.Logger | None = None) -> None:
        """
        Create an instance of the long running command tracker.

        :param command: the command name, used in the result
        :type command: str
        :param logger: the logger to use for debugging, defaults to None
            in which case the module level logger is used
        :type logger: logging.Logger | None, optional
        """
        self.command = command
        self._result_code = ResultCode.UNKNOWN
        self._task_status_events: List[TaskStatus] = []
        self._result: Any | None = None
        # set once the command reached a completed state or an exception was recorded
        self._done = threading.Event()
        self._subscription = None
        self._exception: Exception | None = None
        self._logger = logger or logging.getLogger(__name__)

    @property
    def command_id(self: _LrcTracker) -> str | None:
        """
        Get the command id for the remote command.

        Until a subscription is made there is no command id and
        this method will return `None`.

        :return: the command id for the remote command.
        :rtype: str | None
        """
        if self._subscription is None:
            return None

        return self._subscription.command_id

    @property
    def exception(self: _LrcTracker) -> Exception | None:
        """
        Get the exception of the long running command, if one has been raised.

        :return: the exception of the long running command, if one has been raised.
        :rtype: Exception | None
        """
        return self._exception

    @exception.setter
    def exception(self: _LrcTracker, exception: Exception) -> None:
        """
        Set the exception from the long running command.

        This setter will check the type of exception and infer the
        result code. This happens if the exception is a
        :py:class:`CommandError` or :py:class:`ResultCodeError`. For
        all other exceptions the result code is set to `ResultCode.FAILED`,
        even for timeout events.

        Setting the exception also marks the command as done.

        :param exception: the exception raised during processing of long
            running command.
        :type exception: Exception
        """
        self._exception = exception
        if isinstance(exception, CommandError):
            self._result_code = ResultCode.REJECTED
        elif isinstance(exception, ResultCodeError):
            self._result_code = self._result_code_from_error(exception)
        else:
            self._result_code = ResultCode.FAILED

        self._done.set()

    def _result_code_from_error(self: _LrcTracker, error: ResultCodeError) -> ResultCode:
        """
        Extract the result code from the message of a :py:class:`ResultCodeError`.

        The last space separated token of the first exception argument is
        interpreted as the integer value of a :py:class:`ResultCode`. If the
        message cannot be interpreted that way then `ResultCode.FAILED` is
        returned rather than raising, so that recording the exception can
        never prevent the tracker from being marked as done.

        :param error: the error whose message ends with the result code value.
        :type error: ResultCodeError
        :return: the parsed result code, or `ResultCode.FAILED` as a fallback.
        :rtype: ResultCode
        """
        try:
            [*_, raw_code] = cast(str, error.args[0]).split(" ")
            return ResultCode(int(raw_code))
        except (AttributeError, IndexError, ValueError):
            self._logger.warning(
                f"Unable to determine result code from {error!r}, using {ResultCode.FAILED.name}"
            )
            return ResultCode.FAILED

    @property
    def result(self: _LrcTracker) -> LrcResult:
        """
        Get the result of the long running command.

        It is only valid to call this once the command has completed.

        :raises AssertionError: if the command has not yet been marked as done.
        :return: the result of the long running command.
        :rtype: LrcResult
        """
        # raised explicitly, rather than via ``assert``, so the check is not
        # stripped when Python runs with optimisations enabled
        if not self._done.is_set():
            raise AssertionError(
                f"expected {self.command}({self.command_id}) have completed before getting "
                f"result. Current state is {self._result_code.name}"
            )

        return LrcResult(
            command=self.command,
            command_id=self.command_id,
            result_code=self._result_code,
            result=self._result,
            exception=self.exception,
            task_status_events=self._task_status_events,
        )

    def wait(self: _LrcTracker, timeout: float | None = None) -> None:
        """
        Wait until the command has completed.

        If the `timeout` is `None` then this will block until the command
        completes.

        :param timeout: the timeout, in seconds, to use when expecting remote command
            to have completed by, defaults to None
        :type timeout: float | None, optional
        :raises TimeoutError: raised if `timeout` is not `None` but the command
            doesn't complete before the timeout occurs.
        """
        # Event.wait only returns False when a timeout was given and it expired,
        # so ``timeout`` is always a number when the message is formatted
        if not self._done.wait(timeout=timeout):
            raise TimeoutError(f"{self.command} failed to return within {timeout:0.3f} seconds.")

    def __call__(
        self: _LrcTracker,
        status: TaskStatus | None = None,
        progress: int | None = None,
        result: JSONData = None,
        error: tuple[DevError] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Handle callback from the SKA TANGO base long running command framework.

        :param status: a task status value if changed, defaults to None
        :type status: TaskStatus | None, optional
        :param progress: the progress of the long running command, defaults to None.
            This is only logged.
        :type progress: int | None, optional
        :param result: the result of the command, defaults to None
        :type result: JSONData, optional
        :param error: any TANGO device errors that have been raised, defaults to None
        :type error: tuple[DevError] | None, optional
        :param kwargs: any additional keyword arguments, which are ignored.
        """
        self._logger.debug(f"_LrcTracker called with: {status=}, {progress=}, {result=}, {error=}")

        # an empty error sequence carries nothing to record, so it is skipped
        # rather than failing on the index lookup
        if error is not None and len(error) > 0:
            # NOTE(review): the setter marks the tracker as done straight away, so a
            # waiter may read the result before any `result`/`status` passed in this
            # same call is recorded - confirm errors are delivered on their own.
            self.exception = error[0]

        if result is not None:
            if isinstance(result, (tuple, list)) and len(result) > 0 and isinstance(result[0], int):
                try:
                    self._result_code = ResultCode(result[0])
                except ValueError:
                    # the first value wasn't a valid result code
                    pass
            self._result = result

        # handled last so the result is recorded before the tracker is marked as done
        if status is not None:
            self._handle_status(status)

    def _handle_status(self: _LrcTracker, status: TaskStatus) -> None:
        """
        Record a task status event and mark the command done on a completed state.

        If no result code has been determined yet, i.e. it is still
        `ResultCode.UNKNOWN`, then a completed task status is used to infer it.

        :param status: the task status reported for the remote command.
        :type status: TaskStatus
        """
        # completed task statuses and the result code each one implies
        completed_result_codes = {
            TaskStatus.FAILED: ResultCode.FAILED,
            TaskStatus.ABORTED: ResultCode.ABORTED,
            TaskStatus.REJECTED: ResultCode.REJECTED,
            TaskStatus.COMPLETED: ResultCode.OK,
        }

        self._task_status_events.append(status)
        if status not in completed_result_codes:
            return

        # an explicitly reported result code takes precedence over the inferred one
        if self._result_code == ResultCode.UNKNOWN:
            self._result_code = completed_result_codes[status]

        self._logger.debug(f"_LrcTracker {status=} is a completed state")
        self._done.set()