# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class file for providing a testing wrapper class around device proxies."""
from __future__ import annotations
import logging
import time
from functools import partial
from typing import Any, Generator, List, Tuple, TypeAlias
import backoff
import tango
from readerwriterlock import rwlock
from ska_control_model import ObsState
from ska_pst.lmc import DeviceProxyFactory
from ska_pst.lmc.util import LongRunningCommand
from ska_tango_base.commands import ResultCode
from .tango import _TangoAttributeEventSubscription
# Type alias for a pair of lists: the first holds ``ResultCode`` values and the
# second holds strings.
# NOTE(review): presumably the raw return shape of a Tango command (result codes
# plus their messages); it is not referenced elsewhere in the visible part of
# this file, so confirm against callers.
TangoCommandResult: TypeAlias = Tuple[List[ResultCode], List[str]]
class _MonitorAttributesFilter(logging.Filter):
    """Logging filter that appends monitored attribute values to a record's message.

    A record created with ``extra={"monitor_attr": {...}}`` gets one
    ``name: value`` line appended to ``record.msg`` for every entry of that
    mapping. Records without a ``monitor_attr`` attribute (or with an empty /
    ``None`` one) are passed through unchanged.
    """

    def filter(self: _MonitorAttributesFilter, record: logging.LogRecord) -> bool:
        """Append any ``monitor_attr`` entries to the message, then apply the base filter.

        :param record: the log record being processed; it is modified in place.
        :return: the result of :py:meth:`logging.Filter.filter` for the record.
        """
        monitor_attr = getattr(record, "monitor_attr", None)
        if monitor_attr:
            suffix = "".join(f"\n{name}: {value}" for name, value in monitor_attr.items())
            # ``msg`` may be any object (logging only stringifies it lazily in
            # ``LogRecord.getMessage``), so coerce it before concatenating.
            record.msg = str(record.msg) + suffix
        return super().filter(record)  # type: ignore
# Module-level filter instance; ``PstTestDeviceProxy.__init__`` attaches this
# same object to whichever logger it is given.
MONITOR_ATTRIBUTES_FILTER: _MonitorAttributesFilter = _MonitorAttributesFilter()
class PstTestDeviceProxy:
    """A class for use when testing a PST Tango Device.

    This class is designed as a lightweight proxy that wraps a
    :py:class:`tango.DeviceProxy`. This allows for automated testing
    or using in a notebook to block on the command calls rather than
    assuming the calls have finished.

    All the commands exposed on this class do not check the current
    state of the remote device. The remote device is in control of
    checking state and rejecting methods that are or are not allowed.

    Attribute access is forwarded to the remote device: ``proxy.name``
    reads the remote attribute ``name`` and ``proxy.name = value`` writes
    it and waits for the value to read back. For that reason all local
    state is stored by calling ``super().__setattr__`` directly.
    """

    def __init__(
        self: PstTestDeviceProxy,
        fqdn: str,
        logger: logging.Logger | None = None,
        command_timeout: float = 30.0,
    ) -> None:
        """Create instance of device proxy.

        :param fqdn: the fully qualified domain name (FQDN) of the Tango device.
        :param logger: the optional logger to use when instances of this class
            need to log output. Defaults to this module's logger.
        :param command_timeout: the timeout handed to each long running command.
            It is also used, interpreted as seconds, as the default bound when
            waiting for a written attribute value to read back.
        """
        logger = logger or logging.getLogger(__name__)
        logger.addFilter(MONITOR_ATTRIBUTES_FILTER)

        # ``__setattr__`` is overridden to write to the remote device, so local
        # state has to bypass it via the base class implementation.
        super().__setattr__("logger", logger)

        device_proxy = DeviceProxyFactory.get_device(fqdn=fqdn, logger=logger)
        device_proxy.wait_for_initialised()

        super().__setattr__("_device", device_proxy.device)
        super().__setattr__("fqdn", fqdn)
        super().__setattr__("_curr_attr_values", {})
        super().__setattr__("_rw_lock", rwlock.RWLockWrite())
        super().__setattr__("command_timeout", command_timeout)

        # attribute subscriptions: each event stores the latest value in
        # ``_curr_attr_values`` through ``_store_event``.
        subscriptions = {
            _TangoAttributeEventSubscription(
                device=device_proxy,
                attribute=attribute,
                evt_handler=partial(self._store_event, attribute),
                logger=logger,
            )
            for attribute in [
                "dataReceiveRate",
                "dataReceived",
                "dataDropRate",
                "dataDropped",
                "dataRecordRate",
                "dataRecorded",
                "availableDiskSpace",
                "availableRecordingTime",
                "ringBufferUtilisation",
            ]
        }
        # keep a reference to the subscription objects for the life of the proxy
        super().__setattr__("_subscriptions", subscriptions)

    def _store_event(self: PstTestDeviceProxy, attribute: str, value: Any) -> None:
        """Record the latest value of a monitored attribute.

        :param attribute: the name of the attribute the value belongs to.
        :param value: the value to store.
        """
        with self._rw_lock.gen_wlock():
            self._curr_attr_values[attribute] = value

    def _yield_attr_values(self: PstTestDeviceProxy, attr: str) -> Generator[Any, None, None]:
        """Yield the value of a remote attribute forever, pausing 1 second between reads.

        :param attr: the name of the remote attribute to read.
        """
        while True:
            yield self._device.read_attribute(attr).value
            time.sleep(1)

    def _wait_for_attribute_value(
        self: PstTestDeviceProxy, attr: str, desired_value: Any, timeout: float | None = None
    ) -> None:
        """Block until a remote attribute reads back as a desired value.

        :param attr: the name of the remote attribute to poll.
        :param desired_value: the value the attribute is expected to reach.
        :param timeout: maximum time to wait, in seconds. When ``None`` the
            proxy's ``command_timeout`` is used.
        :raises TimeoutError: if the attribute has not reached the desired value
            within the timeout, rather than polling forever.
        """
        wait_secs = self.command_timeout if timeout is None else timeout
        deadline = time.monotonic() + wait_secs
        for value in self._yield_attr_values(attr):
            if value == desired_value:
                return
            # The check runs after every read, so at least one read always happens.
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Timed out after {wait_secs}s waiting for attribute {attr} to become "
                    f"{desired_value!r}; last value read was {value!r}"
                )

    def _execute_command(
        self: PstTestDeviceProxy, command: str, command_args: tuple[Any, ...] | None = None
    ) -> None:
        """Execute a long running command on the remote device and wait for its result.

        A failed result is logged as a warning; it is not raised.

        :param command: the name of the command to execute.
        :param command_args: the positional arguments for the command, if any.
        """
        lrc = LongRunningCommand(command=command)
        result = lrc(proxy=self._device, command_args=command_args, timeout=self.command_timeout)
        if result.result_code == ResultCode.FAILED:
            self.logger.warning(f"Command failed. The result message = {result.result}")

    def On(self: PstTestDeviceProxy) -> None:
        """Call On command on remote device."""
        self._execute_command(command="On")

    def Off(self: PstTestDeviceProxy) -> None:
        """Call Off command on remote device."""
        self._execute_command(command="Off")

    def Scan(self: PstTestDeviceProxy, scan_id: str) -> None:
        """Call Scan on remote device.

        This will put the remote device in to a SCANNING state.

        :param scan_id: the scan identifier passed as the command argument.
        """
        self._execute_command(command="Scan", command_args=(scan_id,))

    def EndScan(self: PstTestDeviceProxy) -> None:
        """Call EndScan on remote device."""
        self._execute_command(command="EndScan")

    def GoToIdle(self: PstTestDeviceProxy) -> None:
        """Call GoToIdle on remote device."""
        self._execute_command(command="GoToIdle")

    def GoToFault(self: PstTestDeviceProxy, fault_msg: str) -> None:
        """Call GoToFault on remote device.

        :param fault_msg: the fault message passed as the command argument.
        """
        self._execute_command(command="GoToFault", command_args=(fault_msg,))

    def Abort(self: PstTestDeviceProxy) -> None:
        """Call Abort on remote device."""
        self._execute_command(command="Abort")

    def ObsReset(self: PstTestDeviceProxy) -> None:
        """Call ObsReset on remote device."""
        self._execute_command(command="ObsReset")

    def Reset(self: PstTestDeviceProxy) -> None:
        """Call Reset on remote device."""
        self._execute_command(command="Reset")

    def state(self: PstTestDeviceProxy) -> tango.DevState:
        """Get the current Tango device state of the remote device.

        This just proxies through to the wrapped device's ``state()`` method.
        """
        return self._device.state()

    def __setattr__(self: PstTestDeviceProxy, name: str, value: Any) -> None:
        """Set an attribute value on the remote device.

        The value is written and then polled until it reads back as ``value``.

        :param name: the name of the remote attribute.
        :param value: the value to write.
        :raises TimeoutError: if the value does not read back within
            ``command_timeout`` seconds.
        """
        self._device.write_attribute(name, value)
        self._wait_for_attribute_value(attr=name, desired_value=value)

    def __getattr__(self: PstTestDeviceProxy, name: str) -> Any:
        """Get an attribute value from the remote device.

        Python only calls this when normal attribute lookup fails, so locally
        stored state (``logger``, ``fqdn``, ...) never reaches the device.

        :param name: the name of the remote attribute.
        :raises AttributeError: for dunder names, or when the wrapped device has
            not been set on this instance yet.
        """
        # Dunder lookups are Python protocol probes (copy, pickle, ...), never
        # Tango attributes; answer them locally.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        try:
            # Go through ``__dict__`` so a missing ``_device`` cannot re-enter
            # ``__getattr__`` and recurse without bound.
            device = self.__dict__["_device"]
        except KeyError:
            raise AttributeError(name) from None
        return device.read_attribute(name).value

    def get_property(self: PstTestDeviceProxy, propname: str) -> Any:
        """Get the value of a device property.

        This just proxies through to the `tango.DeviceProxy.get_property`

        :param propname: the name of the device property.
        """
        return self._device.get_property(propname)

    def display_monitoring(self: PstTestDeviceProxy) -> None:
        """Display current values of some monitored attributes on remote device."""
        # Snapshot under the read lock and log outside it, so the lock is not
        # held during logging I/O and the record does not share the live dict.
        with self._rw_lock.gen_rlock():
            snapshot = dict(self._curr_attr_values)
        self.logger.info("Current attribute values:", extra={"monitor_attr": snapshot})

    def monitor(self: PstTestDeviceProxy) -> None:
        """Start background monitoring of values of remote device.

        This method will start a background daemon thread to log out the
        current monitored values while the device's ``obsState`` is SCANNING.
        This is done at a rate given by the monitoring polling rate on the
        remote device, which is read once when this method is called.
        """
        # A thread is used rather than a separate process: the monitored values
        # are updated in this process's memory, and a local closure cannot be
        # pickled for the "spawn"/"forkserver" start methods.
        import threading

        monitoring_polling_rate_ms = self.monitoringPollingRate
        poll_period_secs = monitoring_polling_rate_ms / 1000.0

        def _monitor() -> None:
            self.logger.info(f"Starting to monitor {self.fqdn}")
            self.logger.info(f"Monitoring polling rate: {monitoring_polling_rate_ms}ms")
            while self.obsState == ObsState.SCANNING:
                try:
                    self.display_monitoring()
                except Exception:
                    self.logger.exception("Exception occurred while monitoring.", exc_info=True)
                # Always sleep, even after a failure, so errors cannot spin the loop.
                time.sleep(poll_period_secs)

            self.logger.info(f"Monitoring is exiting as state is: {ObsState(self.obsState)}")

        threading.Thread(target=_monitor, daemon=True).start()

    @backoff.on_exception(backoff.expo, AssertionError, factor=0.1, max_time=10.0)
    def assert_obs_state(self: PstTestDeviceProxy, obs_state: ObsState) -> None:
        """
        Assert that the obsState of the device reaches a given value within the timeout.

        This uses a backoff decorator to allow retesting the value, with
        exponential backoff, for up to 10 seconds.

        :param obs_state: the expected obsState of the device.
        :raises AssertionError: if the obsState still differs once retries are exhausted.
        """
        curr_obsState = ObsState(self.obsState)
        assert (
            curr_obsState == obs_state
        ), f"current obsState = {curr_obsState.name}, expected {obs_state.name}"

    @backoff.on_exception(backoff.expo, AssertionError, factor=0.1, max_time=10.0)
    def assert_comp_state(
        self: PstTestDeviceProxy, recv: ObsState, smrb: ObsState, stat: ObsState, dsp: ObsState
    ) -> None:
        """
        Assert the obsState of the sub-components reach the given values within the timeout.

        This uses a backoff decorator to allow retesting the values, with
        exponential backoff, for up to 10 seconds.

        :param recv: the expected value of the ``recvObsState`` attribute.
        :param smrb: the expected value of the ``smrbObsState`` attribute.
        :param stat: the expected value of the ``statObsState`` attribute.
        :param dsp: the expected value of the ``dspObsState`` attribute.
        :raises AssertionError: if any value still differs once retries are exhausted.
        """
        curr_recvObsState = ObsState(self.recvObsState)
        curr_smrbObsState = ObsState(self.smrbObsState)
        curr_statObsState = ObsState(self.statObsState)
        curr_dspObsState = ObsState(self.dspObsState)

        assert (
            curr_recvObsState == recv
        ), f"current obsRecvState = {curr_recvObsState.name}, expected {recv.name}"
        assert (
            curr_smrbObsState == smrb
        ), f"current obsSmrbState = {curr_smrbObsState.name}, expected {smrb.name}"
        assert (
            curr_statObsState == stat
        ), f"current obsStatState = {curr_statObsState.name}, expected {stat.name}"
        assert curr_dspObsState == dsp, f"current obsDspState = {curr_dspObsState.name}, expected {dsp.name}"