# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module to Tango specific test utilities."""
from __future__ import annotations
__all__ = [
"TangoDeviceCommandChecker",
"TangoChangeEventHelper",
"TangoCommandResult",
]
import logging
from typing import Any, Callable, Dict, List, Tuple
import tango
from readerwriterlock import rwlock
from ska_control_model import ObsState, ResultCode, TaskStatus
from ska_pst.lmc.util import LongRunningCommand, LrcResult
from ska_tango_base.base.base_component_manager import JSONData
from ska_tango_testing.mock.tango import MockTangoEventCallbackGroup
TangoCommandResult = Tuple[List[ResultCode], List[str]]
[docs]class TangoDeviceCommandChecker:
"""A convenience class used to help check a Tango Device command.
This class can be used to check that a command executed on a
DeviceProxy fires the correct change events for task status,
the completion state, and any changes through the ObsState.
"""
def __init__(
self: TangoDeviceCommandChecker,
tango_change_event_helper: TangoChangeEventHelper,
change_event_callbacks: MockTangoEventCallbackGroup,
logger: logging.Logger | None = None,
) -> None:
"""Initialise command checker."""
self._device = device = tango_change_event_helper.device_under_test
self._logger = logger or logging.getLogger(__name__)
self._lrc_tracker = _LongRunningCommandTracker(
device=device,
logger=logger,
)
def _subscribe(property: str) -> None:
value = getattr(device, property)
tango_change_event_helper.subscribe(property)
try:
# ignore the first event. This should be able to clear out the events
change_event_callbacks[property].assert_change_event(value)
except Exception:
self._logger.warning(
f"Asserting {device}.{property} to be {value} failed.",
exc_info=True,
)
_subscribe("obsState")
self.change_event_callbacks = change_event_callbacks
self._tango_change_event_helper = tango_change_event_helper
self._command_states: Dict[str, str] = {}
self.prev_command_result: LrcResult | None = None
[docs] def assert_command( # noqa: C901 - override checking of complexity for this test
self: TangoDeviceCommandChecker,
command: str,
command_args: tuple[Any] | None = None,
expected_command_result: JSONData = [ResultCode.OK.value, "Completed successfully"],
expected_command_status_events: List[TaskStatus] = [
TaskStatus.STAGING,
TaskStatus.QUEUED,
TaskStatus.IN_PROGRESS,
TaskStatus.COMPLETED,
],
expected_obs_state_events: List[ObsState] = [],
timeout: float = 10.0,
**kwargs: Any,
) -> None:
"""
Assert that the command has the correct result and events.
This method has sensible defaults of the expected result code,
the overall result, and the status events that the command
goes through, and by default asserts that the ObsState model
doesn't change.
:param command: the name of the command to call on the remote TANGO device
:type command: str
:param command_args: the optional arguments to pass through to TANGO device,
defaults to None
:type command_args: tuple[Any] | None, optional
:param expected_command_result: the expected command result when the command completes,
defaults to [ResultCode.OK.value, "Completed successfully"].
:type expected_command_result: JSONData, optional
:param expected_command_status_events: a list of expected
status events of the command, these should be in the
order the events happen. Default expected events are:
[ TaskStatus.STAGING, TaskStatus.QUEUED, TaskStatus.IN_PROGRESS, TaskStatus.COMPLETED, ]
:type expected_command_status_events: List[TaskStatus], optional
:param expected_obs_state_events: the expected events of the ObsState
model. The default is an empty list, meaning no events expected.
:type expected_obs_state_events: List[ObsState], optional
:param timeout: expected length of time for the results of the command
to take, defaults to 10.0.
:type timeout: float, optional
"""
current_obs_state = self._device.obsState
lrc = LongRunningCommand(command=command)
self.prev_command_result = command_result = lrc(
proxy=self._device, command_args=command_args, timeout=timeout
)
if self.prev_command_result.exception is not None:
raise self.prev_command_result.exception
if len(expected_command_status_events) > 0:
assert command_result.task_status_events == expected_command_status_events
self._lrc_tracker.assert_command_status_events(
command_result=command_result,
expected_task_status_events=expected_command_status_events,
)
if expected_command_result is not None:
assert (
expected_command_result == command_result.result
), f"expected {expected_command_result} but got {command_result.result}"
if expected_obs_state_events and [current_obs_state] != expected_obs_state_events:
for expected_obs_state in expected_obs_state_events:
self._logger.debug(f"Checking next obsState event is {expected_obs_state.name}")
self.change_event_callbacks["obsState"].assert_change_event(
attribute_value=expected_obs_state.value,
)
else:
self._logger.debug("Checking obsState does not change.")
self.change_event_callbacks["obsState"].assert_not_called()
[docs]class TangoChangeEventHelper:
"""Internal testing class used for handling change events."""
def __init__(
self: TangoChangeEventHelper,
device_under_test: tango.DeviceProxy,
change_event_callbacks: MockTangoEventCallbackGroup,
logger: logging.Logger | None = None,
) -> None:
"""Initialise change event helper."""
self.device_under_test = device_under_test
self.change_event_callbacks = change_event_callbacks
self.subscriptions: Dict[str, int] = {}
self.logger = logger or logging.getLogger(__name__)
def __del__(self: TangoChangeEventHelper) -> None:
"""Free resources held."""
self.release()
[docs] def subscribe(self: TangoChangeEventHelper, attribute_name: str) -> None:
"""Subscribe to change events of an attribute.
This returns a MockChangeEventCallback that can
then be used to verify changes.
"""
def _handle_evt(*args: Any, **kwargs: Any) -> None:
self.logger.debug(f"Event received with: args={args}, kwargs={kwargs}")
self.change_event_callbacks[attribute_name](*args, **kwargs)
subscription_id = self.device_under_test.subscribe_event(
attribute_name,
tango.EventType.CHANGE_EVENT,
_handle_evt,
)
self.logger.debug(f"Subscribed to events of '{attribute_name}'. subscription_id = {subscription_id}")
self.subscriptions[attribute_name] = subscription_id
[docs] def release(self: TangoChangeEventHelper) -> None:
"""Release any subscriptions that are held."""
for name, subscription_id in self.subscriptions.items():
self.logger.debug(f"Unsubscribing to '{name}' with subscription_id = {subscription_id}")
self.device_under_test.unsubscribe_event(subscription_id)
self.subscriptions.clear()
[docs] def assert_change_event(
self: TangoChangeEventHelper,
attribute_name: str,
attribute_value: Any,
**kwargs: Any,
) -> dict:
"""
Assert that the callback received a change event with the given value.
This is a helper method that delegates to the
``MockTangoEventCallbackGroup.assert_change_event``
:param attribute_name: name of the attribute to assert a change event against.
:type attribute_name: str
:param attribute_value: new value of the attribute for which the
change event has been sent
:type attribute_value: Any
:return: details of the change event
:raises AssertionError: if the asserted call has not occurred
within the timeout period
"""
return self.change_event_callbacks.assert_change_event(attribute_name, attribute_value, **kwargs)
class _TangoAttributeEventSubscription:
def __init__(
self: _TangoAttributeEventSubscription,
device: tango.DeviceProxy,
attribute: str,
evt_handler: Callable[[Any], None] = lambda x: None,
logger: logging.Logger | None = None,
) -> None:
"""Create instance of event subscription."""
self._device = device
self._attribute = attribute
self._evt_handler = evt_handler
self._logger = logger or logging.getLogger(__name__)
self._subscription_id = self._device.subscribe_event(
attribute,
tango.EventType.CHANGE_EVENT,
self.handle_event,
)
def __del__(self: _TangoAttributeEventSubscription) -> None:
"""Cleanup subscription."""
try:
self._device.unsubscribe_event(self._subscription_id)
except Exception:
self._logger.warning(
f"Exception occurred when trying to unsubscribe from change events for {self._attribute}",
exc_info=True,
)
def handle_event(self: _TangoAttributeEventSubscription, event: tango.EventData) -> Any:
"""Handle event data for attribute event."""
try:
self._logger.debug(f"Received event for {self._attribute}, event = {event}")
if event.err:
self._logger.warning(f"Received failed change event: error stack is {event.errors}.")
return
elif event.attr_value is None:
warning_message = (
"Received change event with empty value. Falling back to manual "
f"attribute read. Event.err is {event.err}. Event.errors is\n"
f"{event.errors}."
)
self._logger.warning(warning_message)
value = self._device.read_attribute(self._attribute)
else:
value = event.attr_value
if isinstance(value, tango.DeviceAttribute):
value = value.value
if value is None:
return
self._logger.debug(
f"Received event callback for {self._device}.{self._attribute} with value: {value}"
)
self._evt_handler(value)
except Exception:
self._logger.exception("Error in handling of event", exc_info=True)
class _LongRunningCommandTracker:
"""A convenience class used to help check a Tango Device command.
This class can be used to check that a command executed on a
:py:class:`DeviceProxy` fires the correct change events
for task status, the completion state, and any changes through
the :py:class:`ObsState`.
"""
def __init__(
self: _LongRunningCommandTracker, device: tango.DeviceProxy, logger: logging.Logger | None = None
) -> None:
"""
Initialise command checker.
:param device: the TANGO device proxy to perform command on.
:type device: tango.DeviceProxy
:param logger: the logger to use if warnings are raised, defaults to None
:type logger: logging.Logger | None, optional
"""
self._device = device
self._lock = rwlock.RWLockWrite()
self._logger = logger or logging.getLogger(__name__)
def assert_command_status_events(
self: _LongRunningCommandTracker,
command_result: LrcResult,
expected_task_status_events: List[TaskStatus] = [
TaskStatus.QUEUED,
TaskStatus.IN_PROGRESS,
TaskStatus.COMPLETED,
],
) -> None:
"""
Assert that the command has the correct status events.
:param command_result: the result of the long running command
:type command_result: LrcResult
:param expected_task_status_events: a list of expected
status events of the command, these should be in the
order the events happen. Default expected events are:
[TaskStatus.QUEUED, TaskStatus.IN_PROGRESS, TaskStatus.COMPLETED]
:type expected_task_status_events: List[TaskStatus], optional
"""
task_status_events = command_result.task_status_events
assert len(task_status_events) > 0, "Expected at least 1 task status event."
if (
task_status_events[-1] == TaskStatus.FAILED
and TaskStatus.FAILED not in expected_task_status_events
):
self._logger.warning(
f"Command {command_result.command_id} failed but expected to end up "
f"in {task_status_events[-1].name} state."
)
assert expected_task_status_events == task_status_events, (
f"Expected command status events to be {expected_task_status_events} "
f"but received {task_status_events}"
)