# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class file for helping with attribute monitoring."""
from __future__ import annotations
__all__ = [
"AttributesMonitor",
]
import dataclasses
import functools
import logging
from dataclasses import field
from datetime import datetime
from typing import Any, Callable, Dict, List
import backoff
import tango
from readerwriterlock import rwlock
from ska_pst.lmc.device_proxy import ChangeEvent, ChangeEventSubscription, PstDeviceProxy
@dataclasses.dataclass(kw_only=True)
class AttributeHistoryEvent:
value: Any
update_time: datetime = field(init=False)
def __post_init__(self: AttributeHistoryEvent) -> None:
self.update_time = datetime.now()
class _AttributeHistory:
"""Class representing the history of an attribute."""
def __init__(self: _AttributeHistory, attribute_name: str, initial_value: Any) -> None:
"""Create instance of an attribute history.
:param attribute_name: the name of the attribute to track the history of.
:param initial_value: the initial value of the attribute.
"""
self.attribute_name = attribute_name
self._lock = rwlock.RWLockWrite()
self._history: List[AttributeHistoryEvent] = [AttributeHistoryEvent(value=initial_value)]
@property
def current_value(self: _AttributeHistory) -> Any:
with self._lock.gen_rlock():
return self._history[-1].value
def _update_value(self: _AttributeHistory, value: Any) -> None:
"""Update the current value of the attribute."""
with self._lock.gen_wlock():
if self._history[-1].value != value:
self._history.append(AttributeHistoryEvent(value=value))
@property
def history(self: _AttributeHistory) -> List[Any]:
"""Get history of the attribute."""
with self._lock.gen_rlock():
return [v.value for v in self._history]
@property
def history_events(self: _AttributeHistory) -> List[AttributeHistoryEvent]:
"""Get history of the attribute including time of update."""
with self._lock.gen_rlock():
# do a shallow copy. Don't return actual
# list as that could update
return [*self._history]
def wait_for_update(self: _AttributeHistory, timeout: float = 5.0) -> None:
"""Wait for the attribute to update."""
# get current value - this property has
# a read lock.
current_value = self.current_value
def _raise_timeout_error(*args: Any, **kwargs: Any) -> None:
raise TimeoutError()
@backoff.on_predicate(
backoff.expo,
on_giveup=_raise_timeout_error,
factor=0.1,
max_time=timeout,
)
def _check_updated() -> bool:
# don't use a lock here as not needed.
return current_value != self._history[-1].value
_check_updated()
[docs]class AttributesMonitor:
"""Class used to monitor the attributes of a Tango device.
This class can be used to track multiple attributes of a Tango class
and then be used to assert values or wait for when an attribute is
updated.
Creating the instance of this class does nothing. The `setup` method
must be called afterwards to ensure that attributes are monitored.
"""
def __init__(
self: AttributesMonitor,
device_proxy: PstDeviceProxy,
attribute_names: List[str],
logger: logging.Logger | None = None,
) -> None:
"""
Create an instance of a attribute monitor.
:param device_proxy: the device proxy to monitor attribute values for.
:type device_proxy: PstDeviceProxy
:param attribute_names: the name of all the attributes to monitor.
:type attribute_names: List[str]
:param logger: a logger instance to use when an attribute's quality is not ATTR_VALID,
defaults to None
:type logger: logging.Logger | None, optional
"""
self.logger = logger or logging.getLogger(__name__)
self.device_proxy = device_proxy
self.attribute_names = attribute_names
self.attribute_histories: Dict[str, _AttributeHistory] = {}
self.attribute_subscriptions: Dict[str, ChangeEventSubscription] = {}
self.previous_attribute_values: dict = {}
def __del__(self: AttributesMonitor) -> None:
"""Ensure cleanup on delete."""
self.teardown()
[docs] def setup(self: AttributesMonitor) -> None:
"""Set up monitoring for attributes."""
for attr in self.attribute_names:
initial_value = getattr(self.device_proxy, attr)
self.previous_attribute_values[attr] = initial_value
self.attribute_histories[attr] = _AttributeHistory(
attribute_name=attr, initial_value=initial_value
)
self.attribute_subscriptions[attr] = self.device_proxy.subscribe_change_event(
attr, functools.partial(self._handle_attribute_event, attr)
)
[docs] def teardown(self: AttributesMonitor) -> None:
"""Teardown the monitor.
This will unsubscribe from Tango events of the attributes.
"""
for s in self.attribute_subscriptions.values():
s.unsubscribe()
self.attribute_subscriptions.clear()
self.attribute_histories.clear()
def _handle_attribute_event(
self: AttributesMonitor,
attribute: str,
event: ChangeEvent,
*args: Any,
**kwargs: Any,
) -> None:
"""
Handle an event that updates the attribute on the Tango device.
Note we will get the initial value being sent to us via Tango.
:param attribute: the name of the attribute
:type attribute: str
:param event: the event details, which includes the value and attribute quality.
:type event: ChangeEvent
"""
attr_history = self.attribute_histories[attribute]
attr_history._update_value(event.value)
if event.quality != tango.AttrQuality.ATTR_VALID:
self.logger.warning(
f"{self.device_proxy}.{attribute} is no longer valid. {event=}", exc_info=False
)
@property
def current_attribute_values(self: AttributesMonitor) -> dict:
"""Get current attribute values for device."""
self.capture_current_values()
return self.previous_attribute_values
[docs] def capture_current_values(self: AttributesMonitor) -> None:
"""Capture the current values to allow for asserting of updates later."""
self.previous_attribute_values = {k: a.current_value for k, a in self.attribute_histories.items()}
[docs] def assert_attribute(
self: AttributesMonitor, attribute: str, value_assertion: Callable[..., bool]
) -> None:
"""Assert and attribute has a given value.
This is a helper method to get the attribute's history and then passes it
to the value_assertion callable.
:param attribute: the name of the attribute to assert against.
:param value_assertion: a callable to assert against the latest value of the attribute.
"""
value = self.attribute_histories[attribute].current_value
assert value_assertion(
value
), f"Attribute '{attribute}' did not meet value assertion. Current value = {value}"
[docs] def assert_attribute_values_changed(self: AttributesMonitor) -> None:
"""Assert that attribute values have changed since last check."""
prev_values = self.previous_attribute_values
curr_values = self.current_attribute_values
assert prev_values != curr_values
[docs] def assert_attribute_values_not_changed(self: AttributesMonitor) -> None:
"""Assert that attribute values have not changed since last check."""
prev_values = self.previous_attribute_values
curr_values = self.current_attribute_values
assert prev_values == curr_values
[docs] def wait_for_attribute_update(self: AttributesMonitor, attribute_name: str, timeout: float) -> None:
"""Wait for attribute to be updated.
Waits until there has been an update for the specific attribute or a timeout has occurred.
:param attribute_name: the attribute to wait for an update of.
:param timeout: how long to wait for an update before raising an exception.
"""
self.attribute_histories[attribute_name].wait_for_update(timeout=timeout)
[docs] def get_attribute_history_events(
self: AttributesMonitor, attribute_name: str
) -> List[AttributeHistoryEvent]:
"""Get list of history events for an attribute."""
return self.attribute_histories[attribute_name].history_events