Source code for ska_tangoctl.tango_control.test_tango_script

"""Dance the Tango from a script."""

import json
import logging
import os
from typing import Any, TextIO

import tango


[docs] class TangoScript: """The classy Tango."""
[docs] def __init__( self, logger: logging.Logger, input_file: int | str | bytes | os.PathLike[str] | os.PathLike[bytes], device_name: str | None, dry_run: bool, ): """ Read actions from file. :raises Exception: error condition :param logger: logging handle :param input_file: input file :param device_name: device name :param dry_run: flag for dry run """ tango_host: str | None self.dev: tango.DeviceProxy err_msg: str self.logger = logger self.dev_name: str # Read configuration file self.logger.warning("Read file %s", input_file) cfg_file: TextIO = open(input_file) self.cfg_data: Any = json.load(cfg_file) cfg_file.close() # Get Tango database host tango_host = os.getenv("TANGO_HOST") if device_name is None: self.logger.error("Tango device name not set") raise Exception("Tango device name not set") try: self.logger.info("Connect device %s", device_name) self.dev = tango.DeviceProxy(device_name) # Return the device name from the device itself self.dev_name = self.dev.name() # Return the names of all attributes implemented for this device self.attributes: list = sorted(self.dev.get_attribute_list()) # Return the names of all commands implemented for this device self.commands: list = sorted(self.dev.get_command_list()) # Get the list of property names for the device self.properties: list = sorted(self.dev.get_property_list("*")) except tango.ConnectionFailed as terr: err_msg = terr.args[0].desc.strip() print(f"[FAILED] {device_name} connection to {tango_host} failed : {err_msg}") self.logger.debug(terr) self.dev = None except tango.DevFailed as terr: err_msg = terr.args[0].desc.strip() print(f"[FAILED] {device_name} device failed : {err_msg}") self.logger.debug(terr) self.dev_name = device_name + " (N/A)" self.logger.info("Read device name %s", self.dev_name)
[docs] def run(self) -> int: """ Run the thing. :return: error condition """ test_cfg: Any thing: Any attr_read: Any attr_write: Any self.logger.info("Process : %s", self.cfg_data) for test in self.cfg_data: test_cfg = self.cfg_data[test] if type(test_cfg) is list: for thing in test_cfg: if type(thing) is dict: if "attribute" in thing: attr_thing = thing["attribute"] if "read" in thing: attr_read = thing["read"] else: attr_read = None if "write" in thing: attr_write = thing["write"] else: attr_write = None self.read_write_attribute(attr_thing, attr_read, attr_write) elif "command" in thing: cmd_thing = thing["command"] if "args" in thing: cmd_args = thing["args"] else: cmd_args = None self.run_command(cmd_thing, cmd_args) else: self.logger.info(f"Device {self.dev_name} {thing} ({type(thing)})") else: self.logger.info(f"{test} : {test_cfg}") return 0
[docs] def read_write_attribute( # noqa: C901 self, attr_thing: str | None, attr_read: int | float | str | None, attr_write: int | float | str | None, ) -> int: """ Read or write Tango attribute. :param attr_thing: attribute name :param attr_read: read value :param attr_write: write value :return: error condition """ env_name: str attrib_data: tango.DeviceAttribute attr_type: str write_val: Any if attr_thing not in self.attributes: self.logger.error("Device does not have a '%s' attribute", attr_thing) return 1 if attr_read is not None: if type(attr_read) is str: if "${" in attr_read and "}" in attr_read: env_name = attr_read.split("{")[1].split("}")[0] attr_read = os.getenv(env_name) self.logger.info("Read environment variable %s : %s", env_name, attr_read) self.logger.debug("Read attribute %s : should be '%s'", attr_thing, str(attr_read)) attrib_data = self.dev.read_attribute(attr_thing) print("Attribute %s : %s" % (attrib_data.name, attrib_data.value)) if attr_write is not None: # Read a single attribute attrib_data = self.dev.read_attribute(attr_thing) attr_type = str(attrib_data.type) if type(attr_write) is str: if "${" in attr_write and "}" in attr_write: env_name = attr_write.split("{")[1].split("}")[0] attr_write = os.getenv(env_name) self.logger.info("Read environment variable %s : %s", env_name, attr_write) self.logger.info("Write attrbute %s value %s", attr_thing, str(attr_write)) if attr_type == "DevEnum": write_val = int(attr_write) # type: ignore[arg-type] else: write_val = str(attr_write) # Write a single attribute try: self.dev.write_attribute(attr_thing, write_val) except tango.DevFailed as terr: err_msg = terr.args[0].desc.strip() self.logger.error("Write failed : %s", err_msg) return 1 # Read the same attribute attrib_data = self.dev.read_attribute(attr_thing) print("Attrbute %s set to %s" % (attrib_data.name, attrib_data.value)) return 0
[docs] def run_command(self, cmd_thing: str, cmd_args: Any) -> int: """ Run Tango command. :param cmd_thing: command name :param cmd_args: command arguments :return: error condition """ cmd_val: Any if cmd_thing not in self.commands: self.logger.error("Device does not have a '%s' command", cmd_thing) return 1 if cmd_args is None: self.logger.info("Run command %s", cmd_thing) cmd_val = self.dev.command_inout(cmd_thing) print("Command %s : %s" % (cmd_thing, cmd_val)) else: self.logger.info("Run command %s arguments %s", cmd_thing, cmd_args) cmd_val = self.dev.command_inout(cmd_thing, cmd_args) print("Command %s (%s) : %s" % (cmd_thing, cmd_args, cmd_val)) return 0