# Source code for ska_pst.testutils.tango.attributes_monitor

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module class file for helping with attribute monitoring."""

from __future__ import annotations

__all__ = [
    "AttributesMonitor",
]

import dataclasses
import functools
import logging
from dataclasses import field
from datetime import datetime
from typing import Any, Callable

import backoff
import tango
from readerwriterlock import rwlock
from ska_pst.lmc.device_proxy import ChangeEvent, ChangeEventSubscription, PstDeviceProxy


@dataclasses.dataclass(kw_only=True)
class AttributeHistoryEvent:
    value: Any
    update_time: datetime = field(init=False)

    def __post_init__(self: AttributeHistoryEvent) -> None:
        self.update_time = datetime.now()


class _AttributeHistory:
    """Class representing the history of an attribute.

    The history is kept as a list of :py:class:`AttributeHistoryEvent`
    instances, oldest first. Every access to that list goes through a
    read/write lock held by this instance.
    """

    def __init__(self: _AttributeHistory, attribute_name: str, initial_value: Any) -> None:
        """Create instance of an attribute history.

        :param attribute_name: the name of the attribute to track the history of.
        :param initial_value: the initial value of the attribute.
        """
        self.attribute_name = attribute_name
        self._lock = rwlock.RWLockWrite()
        # the history is never empty: it always starts with the initial value.
        self._history: list[AttributeHistoryEvent] = [AttributeHistoryEvent(value=initial_value)]

    @property
    def current_value(self: _AttributeHistory) -> Any:
        """Get the most recently recorded value of the attribute."""
        with self._lock.gen_rlock():
            return self._history[-1].value

    def _update_value(self: _AttributeHistory, value: Any) -> None:
        """Update the current value of the attribute.

        A new history event is only recorded when ``value`` differs from the
        most recently recorded value.

        :param value: the latest value of the attribute.
        """
        with self._lock.gen_wlock():
            if self._history[-1].value != value:
                self._history.append(AttributeHistoryEvent(value=value))

    @property
    def history(self: _AttributeHistory) -> list[Any]:
        """Get history of the attribute."""
        with self._lock.gen_rlock():
            return [v.value for v in self._history]

    @property
    def history_events(self: _AttributeHistory) -> list[AttributeHistoryEvent]:
        """Get history of the attribute including time of update."""
        with self._lock.gen_rlock():
            # return a shallow copy so that later updates to the internal
            # list are not visible through the list handed to the caller.
            return list(self._history)

    def wait_for_update(self: _AttributeHistory, timeout: float = 5.0) -> None:
        """Wait for the attribute to update.

        Repeatedly checks, with an exponential backoff between checks, whether
        the latest value differs from the value seen when this method was called.

        :param timeout: the maximum time, in seconds, to wait for a change.
        :raises TimeoutError: if the value has not changed before the timeout.
        """
        # value to compare against - the property takes a read lock.
        initial_value = self.current_value

        def _raise_timeout_error(*args: Any, **kwargs: Any) -> None:
            raise TimeoutError(
                f"Attribute '{self.attribute_name}' was not updated within {timeout} seconds."
            )

        # backoff.on_predicate retries while the wrapped function returns a
        # falsey value, so polling stops as soon as a change is seen.
        @backoff.on_predicate(
            backoff.expo,
            on_giveup=_raise_timeout_error,
            factor=0.1,
            max_time=timeout,
        )
        def _check_updated() -> bool:
            # Read via the locked property. reset() clears the list and then
            # appends to it, so an unlocked ``self._history[-1]`` could hit an
            # empty list and raise IndexError if it ran between those steps.
            return initial_value != self.current_value

        _check_updated()

    def reset(self: _AttributeHistory, value: Any) -> None:
        """
        Reset the history.

        This will clear the history events and add a new event with
        the provided value.

        :param value: the value to reset the history to.
        :type value: Any
        """
        with self._lock.gen_wlock():
            self._history.clear()
            self._history.append(AttributeHistoryEvent(value=value))


class AttributesMonitor:
    """Class used to monitor the attributes of a Tango device.

    This class can be used to track multiple attributes of a Tango class
    and then be used to assert values or wait for when an attribute is
    updated.

    Creating the instance of this class does nothing. The `setup` method
    must be called afterwards to ensure that attributes are monitored.
    """

    def __init__(
        self: AttributesMonitor,
        device_proxy: PstDeviceProxy,
        attribute_names: list[str],
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Create an instance of an attribute monitor.

        :param device_proxy: the device proxy to monitor attribute values for.
        :type device_proxy: PstDeviceProxy
        :param attribute_names: the name of all the attributes to monitor.
        :type attribute_names: list[str]
        :param logger: a logger instance to use when an attribute's quality is
            not ATTR_VALID, defaults to None
        :type logger: logging.Logger | None, optional
        """
        self.logger = logger or logging.getLogger(__name__)
        self.device_proxy = device_proxy
        self.attribute_names = attribute_names
        self.attribute_histories: dict[str, _AttributeHistory] = {}
        self.attribute_subscriptions: dict[str, ChangeEventSubscription] = {}
        self.previous_attribute_values: dict = {}

    def __del__(self: AttributesMonitor) -> None:
        """Ensure cleanup on delete."""
        self.teardown()

    def setup(self: AttributesMonitor) -> None:
        """Set up monitoring for attributes.

        For each monitored attribute this reads the current value from the
        device proxy, starts a new history with it and subscribes to change
        events. Calling this again replaces the histories and subscriptions.
        """
        for attr in self.attribute_names:
            # If setup() has already been called, release the earlier
            # subscription first; overwriting it would leave it subscribed
            # with no reference left to unsubscribe it.
            existing_subscription = self.attribute_subscriptions.pop(attr, None)
            if existing_subscription is not None:
                existing_subscription.unsubscribe()

            initial_value = getattr(self.device_proxy, attr)
            self.previous_attribute_values[attr] = initial_value
            # the history must exist before subscribing, as the event handler
            # looks it up by attribute name.
            self.attribute_histories[attr] = _AttributeHistory(
                attribute_name=attr, initial_value=initial_value
            )
            self.attribute_subscriptions[attr] = self.device_proxy.subscribe_change_event(
                attr, functools.partial(self._handle_attribute_event, attr)
            )

    def teardown(self: AttributesMonitor) -> None:
        """Teardown the monitor.

        This will unsubscribe from Tango events of the attributes and discard
        the recorded attribute histories.
        """
        for s in self.attribute_subscriptions.values():
            s.unsubscribe()
        self.attribute_subscriptions.clear()
        self.attribute_histories.clear()

    def _handle_attribute_event(
        self: AttributesMonitor,
        attribute: str,
        event: ChangeEvent,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Handle an event that updates the attribute on the Tango device.

        Note we will get the initial value being sent to us via Tango.

        :param attribute: the name of the attribute
        :type attribute: str
        :param event: the event details, which includes the value and attribute quality.
        :type event: ChangeEvent
        """
        attr_history = self.attribute_histories.get(attribute)
        if attr_history is None:
            # teardown() clears the histories, so an event delivered after
            # that has nowhere to be recorded; drop it instead of raising
            # KeyError from inside the event callback.
            return

        attr_history._update_value(event.value)
        if event.quality != tango.AttrQuality.ATTR_VALID:
            self.logger.warning(
                f"{self.device_proxy}.{attribute} is no longer valid. {event=}", exc_info=False
            )

    @property
    def current_attribute_values(self: AttributesMonitor) -> dict:
        """Get current attribute values for device.

        Reading this property also captures the current values as the
        reference point for later change assertions.
        """
        self.capture_current_values()
        return self.previous_attribute_values

    def capture_current_values(self: AttributesMonitor) -> None:
        """Capture the current values to allow for asserting of updates later."""
        self.previous_attribute_values = {k: a.current_value for k, a in self.attribute_histories.items()}

    def assert_attribute(
        self: AttributesMonitor, attribute: str, value_assertion: Callable[..., bool]
    ) -> None:
        """Assert an attribute has a given value.

        This is a helper method to get the attribute's current value and then
        pass it to the value_assertion callable.

        :param attribute: the name of the attribute to assert against.
        :param value_assertion: a callable to assert against the latest value of the attribute.
        :raises AssertionError: if ``value_assertion`` returns a falsey value.
        """
        value = self.attribute_histories[attribute].current_value
        assert value_assertion(
            value
        ), f"Attribute '{attribute}' did not meet value assertion. Current value = {value}"

    def assert_attribute_values_changed(self: AttributesMonitor) -> None:
        """Assert that attribute values have changed since last check.

        The current values become the reference point for the next check.
        """
        # take the old dict before the property rebinds it to a fresh one.
        prev_values = self.previous_attribute_values
        curr_values = self.current_attribute_values
        assert prev_values != curr_values

    def assert_attribute_values_not_changed(self: AttributesMonitor) -> None:
        """Assert that attribute values have not changed since last check.

        The current values become the reference point for the next check.
        """
        # take the old dict before the property rebinds it to a fresh one.
        prev_values = self.previous_attribute_values
        curr_values = self.current_attribute_values
        assert prev_values == curr_values

    def wait_for_attribute_update(self: AttributesMonitor, attribute_name: str, timeout: float) -> None:
        """Wait for attribute to be updated.

        Waits until there has been an update for the specific attribute or a
        timeout has occurred.

        :param attribute_name: the attribute to wait for an update of.
        :param timeout: how long to wait for an update before raising an exception.
        """
        self.attribute_histories[attribute_name].wait_for_update(timeout=timeout)

    def get_attribute_history_events(
        self: AttributesMonitor, attribute_name: str
    ) -> list[AttributeHistoryEvent]:
        """
        Get list of history events for an attribute.

        :param attribute_name: the attribute to get the history events for.
        :type attribute_name: str
        :return: the history events recorded for the attribute.
        :rtype: list[AttributeHistoryEvent]
        """
        return self.attribute_histories[attribute_name].history_events

    def reset_attribute_history(self: AttributesMonitor, attribute_name: str, value: Any) -> None:
        """
        Reset the attribute history of a given attribute.

        :param attribute_name: attribute name to reset
        :type attribute_name: str
        :param value: the value to reset attribute to
        :type value: Any
        """
        self.attribute_histories[attribute_name].reset(value)