#!/usr/bin/python
"""Test devices from Tango database."""
import logging
import os
import time
from typing import Any
import tango
[docs]
class TestTangoDevice:
"""Test a Tango device."""
[docs]
def __init__(self, logger: logging.Logger, device_name: str): # noqa: C901
"""
Get going.
:param logger: logging handle
:param device_name: Tango device name
"""
self.logger: logging.Logger = logger
self.adminMode: int | None = None
self.attribs: list = []
self.cmds: list = []
self.dev: tango.DeviceProxy | None
err_msg: str
self.logger.info("Test device proxy %s", device_name)
try:
self.dev = tango.DeviceProxy(device_name)
except tango.ConnectionFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {device_name} connection failed : {err_msg}")
self.logger.debug(terr)
self.dev = None
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {device_name} device failed : {err_msg}")
self.logger.debug(terr)
self.dev = None
if self.dev is not None:
try:
self.dev_name = self.dev.name()
except tango.DevFailed:
self.dev_name = device_name + " (N/A)"
try:
self.attribs = self.dev.get_attribute_list()
except tango.DevFailed:
self.attribs = []
try:
self.props = self.dev.get_property_list("*")
except tango.DevFailed:
self.props = []
try:
self.cmds = self.dev.get_command_list()
except tango.DevFailed:
self.cmds = []
self.dev_status: str | None = None
self.dev_state: int | None = None
self.simMode: int | None = None
[docs]
def get_admin_mode(self) -> int | None:
"""
Read attribute for admin mode.
:return: attribute value
"""
if self.dev is None:
return None
if "adminMode" not in self.attribs:
print(f"[ WARN ] {self.dev_name} does not have an adminMode attribute")
self.adminMode = None
self.adminModeStr = "N/A"
return None
try:
self.adminMode = self.dev.adminMode
self.adminModeStr = str(self.dev.adminMode).replace("adminMode.", "")
print(f"[ OK ] admin mode {self.adminMode}")
except AttributeError as terr:
print("[FAILED] could not read admin mode")
self.logger.debug(terr)
self.adminMode = None
self.adminModeStr = "N/A"
return self.adminMode
[docs]
def get_simulation_mode(self) -> int | None:
"""
Read attribute for simulation mode.
:return: attribute value
"""
if self.dev is None:
return None
if "simulationMode" not in self.attribs:
print(f"[ WARN ] {self.dev_name} does not have a simulationMode attribute")
try:
self.simMode = self.dev.simulationMode
print(f"[ OK ] simulation mode {self.simMode}")
except AttributeError as terr:
print("[FAILED] could not read simulation mode")
self.logger.debug(terr)
self.simMode = None
return self.simMode
[docs]
def set_simulation_mode(self, dev_sim: int | None) -> int | None:
"""
Set attribute for simulation mode.
:param dev_sim: attribute value
:return: error condition
"""
if self.dev is None:
return None
if "simulationMode" not in self.attribs:
print(f"[ WARN ] {self.dev_name} does not have a simulationMode attribute")
try:
self.dev.simulationMode = dev_sim
self.simMode = self.dev.simulationMode
print(f"[ OK ] simulation mode set to {self.simMode}")
except AttributeError as terr:
print(f"[FAILED] could not set simulation mode to {dev_sim}")
self.logger.debug(terr)
self.simMode = None
return 1
if dev_sim != self.simMode:
print(f"[FAILED] simulation mode should be {dev_sim} but is {self.simMode}")
return 0
[docs]
def test_ping(self) -> int:
"""
Check that device is online.
:return: error condition
"""
try:
self.dev.ping() # type: ignore[union-attr]
print(f"[ OK ] Device {self.dev_name} is online")
except tango.DevFailed as terr:
print(f"[FAILED] Device {self.dev_name} is not online")
self.logger.debug(terr.args[-1].desc)
return 1
return 0
[docs]
def test_device_attributes(self, show: bool = False) -> None:
"""
Display number and names of attributes.
:param show: flag to print names
"""
print(f"[ OK ] {self.dev_name} has {len(self.attribs)} attributes")
if show:
for attrib in sorted(self.attribs):
print(f"\t{attrib}")
[docs]
def test_device_properties(self, show: bool = False) -> None:
"""
Display number and names of properties.
:param show: flag to print names
"""
print(f"[ OK ] {self.dev_name} has {len(self.props)} properties")
if show:
for prop in sorted(self.props):
print(f"\t{prop}")
[docs]
def read_device_attributes(self) -> None:
"""Read all attributes of this device."""
attrib_value: Any
err_msg: str
self.logger.debug("Read attribute %s values", self.dev_name)
if self.dev is None:
return
print(f"[ OK ] {self.dev_name} read {len(self.attribs)} attributes")
for attrib in sorted(self.attribs):
time.sleep(2)
try:
attrib_value = self.dev.read_attribute(attrib).value
print(f"[ OK ] {self.dev_name} attribute {attrib} : {attrib_value}")
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} attribute {attrib} could not be read : {err_msg}")
self.logger.debug(terr)
[docs]
def test_device_commands(self, show: bool = False) -> None:
"""
Display number and names of commands.
:param show: flag to print names
"""
cmd: str
print(f"[ OK ] {self.dev_name} has {len(self.cmds)} commands")
if show:
for cmd in sorted(self.cmds):
print(f"\t{cmd}")
[docs]
def admin_mode_off(self) -> None:
"""Turn admin mode off."""
err_msg: str
if self.dev is None:
return
if self.adminMode is None:
return
if self.adminMode == 1:
self.logger.info("Turn device admin mode off")
try:
self.dev.adminMode = 0
self.adminMode = self.dev.adminMode
self.adminModeStr = str(self.dev.adminMode).replace("adminMode.", "")
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} admin mode could not be turned off : {err_msg}")
self.logger.debug(terr)
return
self.adminMode = self.dev.adminMode
self.adminModeStr = str(self.dev.adminMode).replace("adminMode.", "")
print(f"[ OK ] {self.dev_name} admin mode set to off, now ({self.adminModeStr})")
[docs]
def device_status(self) -> int | None:
"""
Print device status.
:return: device state
"""
if self.dev is None:
return None
if "Status" not in self.cmds:
print(f"[FAILED] {self.dev.dev_name} does not have Status command")
if "State" not in self.cmds:
print(f"[FAILED] {self.dev.dev_name} does not have State command")
return None
self.dev_status = self.dev.Status()
self.dev_state = self.dev.State()
print(f"[ OK ] {self.dev_name} state : {self.dev_state} ({self.dev_state:d})")
print(f"[ OK ] {self.dev_name} status : {self.dev_status}")
return self.dev_state
[docs]
def device_on(self) -> int:
"""
Turn this device on.
:return: error condition
"""
dev_on: Any
err_msg: str
self.logger.debug("Turn device %s on", self.dev_name)
if self.dev is None:
return 1
if "On" not in self.cmds:
print(f"[FAILED] {self.dev_name} does not have On command")
return 1
cmd_cfg: tango.CommandInfo = self.dev.get_command_config("On")
if cmd_cfg.in_type_desc == "Uninitialised":
try:
dev_on = self.dev.On()
print(f"[ OK ] {self.dev_name} turned on, now {dev_on}")
return 1
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} could not be turned on : {err_msg}")
self.logger.debug(terr)
else:
try:
dev_on = self.dev.On([])
print(f"[ OK ] {self.dev_name} turned on, now {dev_on}")
return 1
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(
f"[FAILED] {self.dev_name} could not be turned on (device failed) : {err_msg}"
)
self.logger.debug(terr)
return 1
except TypeError as terr:
print(
f"[FAILED] {self.dev_name} could not be turned on"
f" (parameter type should be {cmd_cfg.in_type_desc})"
)
self.logger.debug(terr)
return 1
return 0
[docs]
def device_off(self) -> int:
"""
Turn this device off.
:return: error condition
"""
dev_off: Any
err_msg: str
self.logger.debug("Turn device %s off", self.dev_name)
if self.dev is None:
return 1
if "Off" not in self.cmds:
print(f"[FAILED] {self.dev_name} does not have Off command")
return 1
cmd_cfg: tango.CommandInfo = self.dev.get_command_config("Off")
if cmd_cfg.in_type_desc == "Uninitialised":
try:
dev_off = self.dev.Off()
print(f"[ OK ] {self.dev_name} turned off, now {dev_off}")
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} could not be turned off : {err_msg}")
self.logger.debug(terr)
return 1
else:
try:
dev_off = self.dev.Off([])
print(f"[ OK ] {self.dev_name} turned off, now {dev_off}")
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} could not be turned off : {err_msg}")
self.logger.debug(terr)
return 1
return 0
[docs]
def device_standby(self) -> int:
"""
Set this device to standby mode.
:return: error condition
"""
dev_standby: Any
err_msg: str
cmd_cfg: tango.CommandInfo
self.logger.debug("Set device %s on standby", self.dev_name)
if self.dev is None:
return 1
if "Standby" not in self.cmds:
print(f"[FAILED] {self.dev.dev_name} does not have Standby command")
return 1
cmd_cfg = self.dev.get_command_config("Standby")
if cmd_cfg.in_type_desc == "Uninitialised":
try:
dev_standby = self.dev.Standby()
print(f"[ OK ] {self.dev_name} switched to standby, now {dev_standby}")
return 0
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} could not be switched to standby : {err_msg}")
self.logger.debug(terr)
else:
try:
dev_standby = self.dev.Standby([])
print(f"[ OK ] {self.dev_name} switched to standby, now {dev_standby}")
return 0
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} could not be switched to standby : {err_msg}")
self.logger.debug(terr)
return 1
[docs]
def admin_mode_on(self) -> None:
"""Turn admin mode on."""
err_msg: str
self.logger.info("Turn device admin mode on")
if self.dev is None:
return
try:
self.dev.adminMode = 1
self.adminMode = self.dev.adminMode
self.adminModeStr = str(self.dev.adminMode).replace("adminMode.", "")
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} admin mode could not be turned on : {err_msg}")
self.logger.debug(terr)
return
print(f"[ OK ] {self.dev_name} admin mode turned on, now ({self.adminModeStr})")
[docs]
def set_admin_mode(self, admin_mode: int) -> int:
"""
Change admin mode.
:param admin_mode: new value
:return: error condition
"""
err_msg: str
self.logger.info("Set device admin mode to %d", admin_mode)
if self.dev is None:
return 1
try:
self.dev.adminMode = admin_mode
self.adminMode = self.dev.adminMode
self.adminModeStr = str(self.dev.adminMode).replace("adminMode.", "")
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[FAILED] {self.dev_name} admin mode could not be changed : {err_msg}")
self.logger.debug(terr)
return 1
if self.adminMode != admin_mode:
print(
f"[FAILED] {self.dev_name} admin mode is {self.adminMode}"
f" but should be {admin_mode}"
)
return 1
print(f"[ OK ] {self.dev_name} admin mode set to ({self.adminModeStr})")
return 0
[docs]
def test_admin_mode(self, dev_admin: int | None) -> int:
"""
Test admin mode.
:param dev_admin: new value
:return: error condition
"""
self.test_ping()
self.get_simulation_mode()
# Read admin mode
self.get_admin_mode()
if self.adminMode is not None:
if dev_admin is not None:
self.set_admin_mode(dev_admin)
return 0
[docs]
def test_off(self, dev_sim: int | None) -> int:
"""
Test that device can be turned off.
:param dev_sim: flag for hardware simulation.
:return: error condition
"""
self.test_ping()
self.get_simulation_mode()
if dev_sim is not None:
self.set_simulation_mode(dev_sim)
self.test_device_attributes()
self.test_device_commands()
# Read admin mode
self.get_admin_mode()
# Read state
self.device_status()
# Turn device off
self.device_off()
# Turn on admin mode
self.admin_mode_on()
# Read state
self.device_status()
return 0
[docs]
def test_on(self, dev_sim: int | None) -> int:
"""
Test that device can be turned on.
:param dev_sim: flag for hardware simulation.
:return: error condition
"""
init_state: int | None
self.test_ping()
self.get_simulation_mode()
if dev_sim is not None:
self.set_simulation_mode(dev_sim)
self.test_device_attributes()
self.test_device_commands()
# Read admin mode, turn off
self.get_admin_mode()
self.admin_mode_off()
# Turn device on
init_state = self.device_status()
# pylint: disable-next=c-extension-no-member
if init_state == tango._tango.DevState.ON:
print("[ WARN ] device is already on")
else:
self.device_on()
self.device_status()
return 0
[docs]
def test_standby(self, dev_sim: int | None) -> int:
"""
Test that device can be placed into standby mode.
:param dev_sim: flag for hardware simulation.
:return: error condition
"""
self.test_ping()
self.get_simulation_mode()
if dev_sim is not None:
self.set_simulation_mode(dev_sim)
self.get_admin_mode()
self.device_standby()
self.device_status()
return 0
[docs]
def test_status(self) -> int:
"""
Test that device status can be read.
:return: error condition
"""
self.test_ping()
self.get_simulation_mode()
self.get_admin_mode()
self.device_status()
return 0
[docs]
def test_simulation_mode(self, dev_sim: int | None) -> int:
"""
Test that device hardware simulation can be set.
:param dev_sim: flag for hardware simulation.
:return: error condition
"""
self.test_ping()
self.get_simulation_mode()
self.set_simulation_mode(dev_sim)
self.get_simulation_mode()
return 0
[docs]
def test_all(self, show_attrib: bool) -> int:
"""
Test everything that device can do.
:param show_attrib: flag for attributes display.
:return: error condition
"""
init_admin_mode: int | None
init_state: int | None
self.test_ping()
self.get_simulation_mode()
self.test_device_attributes()
self.test_device_commands()
# Read admin mode, turn on ond off
init_admin_mode = self.get_admin_mode()
if self.adminMode is not None:
self.admin_mode_on()
self.admin_mode_off()
# Read state
init_state = self.device_status()
# Turn device on
# pylint: disable-next=c-extension-no-member
if init_state == tango._tango.DevState.ON:
print("[ WARN ] device is already on")
else:
self.device_on()
self.device_status()
# Read attribute values
if show_attrib:
self.read_device_attributes()
else:
print("[ WARN ] skip reading attributes")
# Turn device off
self.device_off()
self.device_status()
# pylint: disable-next=c-extension-no-member
if self.dev_state == tango._tango.DevState.ON:
print("[FAILED] device is still on")
# Turn device back on, if necessary
# pylint: disable-next=c-extension-no-member
if init_state == tango._tango.DevState.ON:
print("[ WARN ] turn device back on")
self.device_on()
self.device_status()
# Turn device admin mode back on, if necessary
if init_admin_mode == 1:
print("[ WARN ] turn admin mode back to on")
self.admin_mode_on()
return 0
[docs]
def test_subscribe(self, attrib: str) -> int:
"""
Test subscribed to event.
:param attrib: attribute name
:return: error condition
"""
err_msg: str
evnt_id: int
events: Any
print(f"[ OK ] subscribe to events for {self.dev_name} {attrib}")
if self.dev is None:
return 1
evnt_id = self.dev.subscribe_event(
attrib, tango.EventType.CHANGE_EVENT, tango.utils.EventCallback()
)
print(f"[ OK ] subscribed to event ID {evnt_id}")
time.sleep(15)
try:
events = self.dev.get_events(evnt_id)
print(f"[ OK ] got events {events}")
except tango.EventSystemFailed as terr:
err_msg = terr.args[0].desc.strip()
print(f"[ WARN ] got no events for {self.dev_name} {attrib} : {err_msg}")
self.logger.debug(terr)
try:
self.dev.devc.unsubscribe_event(evnt_id)
print(f"[ OK ] unsubscribed from event ID {evnt_id}")
except AttributeError as oerr:
print(f"[ WARN ] could not unsubscribe from event ID {evnt_id}")
self.logger.debug(oerr)
return 0
[docs]
def run_test( # noqa: C901
self,
dev_admin: int | None,
dev_off: bool,
dev_on: bool,
dev_ping: bool,
dev_sim: int | None,
dev_standby: bool,
dev_status: dict,
show_attrib: bool,
show_command: bool,
show_prop: bool,
tgo_attrib: str | None,
tgo_name: str | None,
) -> int:
"""
Run tests on Tango devices.
:param dev_admin: check admin mode
:param dev_off: turn device on
:param dev_on: turn device off
:param dev_ping: ping device
:param dev_sim: simulation flag
:param dev_standby: place device in standby
:param dev_status: set device status
:param show_attrib: test device attributes
:param show_command: test device commands
:param show_prop: test device properties
:param tgo_attrib: name of attribute
:param tgo_name: device name
:return: error condition
"""
if tgo_name is None:
self.logger.error("Device name not specified")
return 1
# General tests
if dev_ping:
self.test_ping()
if dev_admin is not None:
self.test_admin_mode(dev_admin)
if dev_status:
self.test_status()
if show_attrib:
self.test_device_attributes(True)
if show_command:
self.test_device_commands(True)
if show_prop:
self.test_device_properties(True)
if tgo_attrib is not None:
self.test_subscribe(tgo_attrib)
if dev_sim is not None:
self.test_simulation_mode(dev_sim)
# Mutually exclusive tests
if dev_off:
self.test_off(dev_sim)
elif dev_on:
self.test_on(dev_sim)
elif dev_standby:
self.test_standby(dev_sim)
else:
pass
return 0
class TestTangoDevices:
"""Compile a list of available Tango devices."""
def __init__(self, logger: logging.Logger, evrythng: bool, cfg_data: Any):
"""
Read Tango device names.
:param logger: logging handle
:param evrythng: include the kitchen sink
:param cfg_data: configuration data
:raises Exception: connect to Tango database failed
"""
database: tango.Database
device_list: tango.DbDatum
device: str
self.logger = logger
self.tango_devices: list = []
# Connect to database
try:
database = tango.Database()
except Exception:
tango_host = os.getenv("TANGO_HOST")
self.logger.error("[FAILED] Could not connect to Tango database %s", tango_host)
raise Exception("Could not connect to Tango database %s", tango_host)
# Read devices
device_list = database.get_device_exported("*")
print(f"[ OK ] {len(device_list)} devices available")
for device in sorted(device_list.value_string):
# Check device name against mask
if not evrythng:
chk_fail: bool = False
for dev_chk in cfg_data["ignore_device"]:
chk_len = len(dev_chk)
if device[0:chk_len] == dev_chk:
chk_fail = True
break
if chk_fail:
self.logger.debug("'%s' matches '%s'", device, cfg_data["ignore_device"])
continue
self.logger.debug("Add device %s", device)
self.tango_devices.append(device)
self.logger.info("Found %d devices", len(self.tango_devices))