"""Read and display Tango stuff."""
import json
import logging
import os
from typing import Any
import numpy
import tango
try:
from ska_tangoctl.k8s_info.get_k8s_info import KubernetesInfo
except ModuleNotFoundError:
KubernetesInfo = None # type: ignore[assignment,misc]
from ska_tangoctl.tango_control.disp_action import DispAction
from ska_tangoctl.tango_control.progress_bar import progress_bar
from ska_tangoctl.tango_control.tango_json import TangoJsonReader
from ska_tangoctl.tla_jargon.tla_jargon import find_jargon
DEFAULT_TIMEOUT_MILLIS: int = 500
[docs]
class TangoctlDevice:
"""Compile a dictionary for a Tango device."""
[docs]
def __init__( # noqa: C901
self,
logger: logging.Logger,
disp_action: DispAction,
outf: Any,
timeout_millis: int | None,
dev_status: dict,
device: str,
list_items: dict,
block_items: dict,
tgo_attrib: str | None,
tgo_cmd: str | None,
tgo_prop: str | None,
k8s_ctx: str | None,
domain_name: str | None,
indent: int = 0,
):
"""
Iniltialise the thing.
:param logger: logging handle
:param disp_action: display settings
:param outf: output file pointer
:param timeout_millis: Tango device timeout in milliseconds
:param dev_status: flag to read status
:param device: device name
:param list_items: attributes, commands or properties in list output
:param block_items: attributes, commands or properties not to be shown in list output
:param tgo_attrib: attribute filter
:param tgo_cmd: command filter
:param tgo_prop: property filter
:param k8s_ctx: Kubernetes context
:param domain_name: Kubernetes domain name
:param indent: indentation for JSON and YAML
:raises Exception: could not open device
"""
self.commands: dict = {}
self.attributes: dict = {}
self.properties: dict = {}
self.procs: dict = {}
self.pod_name: str | None = None
self.pod_desc: dict = {}
self.attribs_found: list = []
self.props_found: list = []
self.cmds_found: list = []
self.indent: int = indent
self.info: tango.DeviceInfo
self.quiet_mode: bool = True
self.outf: Any = outf
self.attribs: list
self.cmds: list
self.props: list
self.list_items: dict
self.timeout_millis: int | None
self.logger: logging.Logger = logger
self.disp_action: DispAction = disp_action
if timeout_millis is None:
self.timeout_millis = DEFAULT_TIMEOUT_MILLIS
else:
self.timeout_millis = timeout_millis
self.dev: tango.DeviceProxy
self.dev_name: str = "?"
self.version: str = "?"
self.status: str = "?"
self.adminMode: int | None = None
self.adminModeStr: str = "---"
self.dev_class: str
self.dev_state: Any = None
self.dev_errors: list = []
self.dev_values: dict = {}
self.db_host: str = "?"
self.db_port: int = 0
self.tango_lib: int = 0
self.domain_name: str | None = domain_name
self.k8s_ctx: str | None = k8s_ctx
err_msg: str
# Set up Tango device
tango_host = os.getenv("TANGO_HOST")
self.list_items = list_items
self.logger.debug(
"Open device %s (%s) status %s (list items %s)",
device,
tango_host,
dev_status,
list_items,
)
try:
self.dev = tango.DeviceProxy(device)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Tango device failed: %s", err_msg)
self.dev_errors.append(f"Tango device failed: {err_msg}")
self.dev = None
except RuntimeError as rerr:
self.logger.warning("Tango runtime error: %s", rerr)
self.dev_errors.append(f"Tango runtime error: {rerr}")
self.dev = None
if self.dev is None:
device = device.lower()
self.logger.debug("Retry device %s", device)
try:
self.dev = tango.DeviceProxy(device)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.dev_name = f"{device} (N/A)"
self.logger.warning(
"Could not open device %s (%s) : %s", device, tango_host, err_msg
)
raise Exception(f"Could not open device {device} ({tango_host}) : {err_msg}")
self.dev.set_timeout_millis(self.timeout_millis)
# Read device name and database info
self.dev_name = self.read_info(device, "name", True)
if self.dev_name is None:
self.dev_name = f"{device} (N/A)"
self.db_host = self.read_info(device, "get_db_host", True)
if self.db_host is None:
self.db_host = "N/A"
self.db_port = self.read_info(device, "get_db_port_num")
if self.db_port is None:
self.db_port = 0
self.tango_lib = self.read_info(device, "get_tango_lib_version", True)
if self.tango_lib is None:
self.tango_lib = 0
# Read information on the device and device class name
try:
self.info = self.dev.info()
self.dev_class = self.info.dev_class
except Exception as eerr:
self.logger.warning("Could not read device class: %s", str(eerr))
self.dev_class = "N/A"
# Read green mode and access control type for this DeviceProxy
self.green_mode: Any = str(self.dev.get_green_mode())
self.dev_access: str = str(self.dev.get_access_control())
# Read status
if dev_status:
self.attribs = dev_status["attributes"]
self.cmds = dev_status["commands"]
self.props = dev_status["properties"]
self.logger.debug(
"Get status for %s: attributes %s commands %s properties %s",
self.dev_name,
self.attribs,
self.cmds,
self.props,
)
else:
self.attribs = []
self.cmds = []
self.props = []
# Read the names of all attributes implemented for this device
if self.disp_action.show_attrib:
self.logger.debug("Get attribute list for %s", self.dev_name)
try:
self.attribs = sorted(self.dev.get_attribute_list())
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Could not read attributes for %s", device)
self.dev_errors.append(f"Could not read attributes : {err_msg}")
self.attribs = []
self.logger.debug("Got %d attributes for %s", len(self.attribs), self.dev_name)
# Read the names of all commands implemented for this device
if self.disp_action.show_cmd:
try:
self.cmds = sorted(self.dev.get_command_list(), reverse=self.disp_action.reverse)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Could not read commands for %s", device)
self.dev_errors.append(f"Could not read commands : {err_msg}")
self.cmds = []
self.logger.debug("Got %d commands for %s", len(self.cmds), self.dev_name)
# Get the list of property names for the device
if self.disp_action.show_prop:
try:
self.props = sorted(
self.dev.get_property_list("*"), reverse=self.disp_action.reverse
)
except tango.NonDbDevice:
self.logger.info("Not reading properties in nodb mode")
self.props = []
self.logger.debug("Got %d properties for %s", len(self.props), self.dev_name)
self.logger.debug(
"Open device %s (attributes %s, commands %s, properties %s)",
device,
tgo_attrib,
tgo_cmd,
tgo_prop,
)
self.list_items = list_items
self.block_items = block_items
# Set quiet mode, i.e. do not display progress bars
if self.logger.getEffectiveLevel() in (logging.DEBUG, logging.INFO):
self.quiet_mode = True
if tgo_attrib:
tgo_attrib = tgo_attrib.lower()
# Check commands
for cmd in self.cmds:
if self.disp_action.xact_match and tgo_cmd:
if tgo_cmd == cmd.lower():
self.logger.debug("Add matched command %s", cmd)
self.commands[cmd] = {}
elif tgo_cmd:
if tgo_cmd in cmd.lower():
self.logger.debug("Add command %s", cmd)
self.commands[cmd] = {}
elif tgo_attrib or tgo_prop:
pass
else:
self.logger.debug("Add command %s", cmd)
self.commands[cmd] = {}
# Check attributes
for attrib in self.attribs:
if self.disp_action.xact_match and tgo_attrib:
if tgo_attrib == attrib.lower():
self.logger.debug("Add matched attribute %s", attrib)
self.attributes[attrib] = {}
elif tgo_attrib:
if tgo_attrib in attrib.lower():
self.logger.debug("Add attribute %s", attrib)
self.attributes[attrib] = {}
elif tgo_cmd or tgo_prop:
pass
else:
self.logger.debug("Add attribute %s", attrib)
self.attributes[attrib] = {}
# Check properties
for prop in self.props:
if self.disp_action.xact_match and tgo_prop:
if tgo_prop == prop.lower():
self.logger.debug("Add matched property %s", prop)
self.properties[prop] = {}
elif tgo_prop:
if tgo_prop in prop.lower():
self.logger.debug("Add property %s", prop)
self.properties[prop] = {}
elif tgo_attrib or tgo_cmd:
pass
else:
self.logger.debug("Add property %s", prop)
self.properties[prop] = {}
# Check information
try:
self.info = self.dev.info()
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Could not read info from %s : %s", device, err_msg)
self.dev_errors.append(f"Could not read info: {err_msg}")
self.info = None
# Check version
try:
self.version = self.dev.versionId
except AttributeError as oerr:
self.logger.debug("Could not read device %s version ID : %s", self.dev_name, str(oerr))
self.version = "N/A"
except tango.CommunicationFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.debug("Could not read device %s version ID : %s", self.dev_name, err_msg)
self.version = "N/A"
self.logger.info(
"Add device %s with %d attributes, %d commands and %d properties, class %s",
device,
len(self.attributes),
len(self.commands),
len(self.properties),
self.dev_class,
)
# Other stuff
# self. = self.dev.get_()
self.fqdn = self.read_info(device, "get_fqdn")
self.idl_version = self.read_info(device, "get_idl_version")
# TODO figure this out
# self. = self.dev.get_locker()
self.logging_level = self.read_info(device, "get_logging_level")
self.logging_target = self.read_info(device, "get_logging_target")
self.pipe_config = self.read_info(device, "get_pipe_config")
self.source = self.read_info(device, "get_source")
self.timeout_millis = self.read_info(device, "get_timeout_millis")
self.transparency_reconnection = self.read_info(device, "get_transparency_reconnection")
# TODO deal with very messy string
# try:
# self.componentstates = self.dev.getcomponentstates()
# except AttributeError as ae:
# self.logger.info("Could not read component states attribute: %s", str(ae))
# self.componentstates = {}
# except Exception as gcse:
# self.logger.info("Could not read component states: %s", str(gcse))
# self.componentstates = {}
# self.logger.debug("Componentstates: %s", self.componentstates)
# Check name for acronyms
if self.disp_action.show_jargon:
self.jargon = find_jargon(self.dev_name)
else:
self.jargon = ""
[docs]
def read_info(self, device: str, info: str, log_it: bool = False) -> Any: # noqa: C901
"""
Read device name and database info.
:param device: device name
:param info: what to read
:param log_it: log warning when read fails
:returns: String with information read from Tango
"""
info_data: str | None
try:
if info == "name":
info_data = self.dev.name()
elif info == "get_db_host":
info_data = self.dev.get_db_host()
elif info == "get_db_port_num":
info_data = self.dev.get_db_port_num()
elif info == "get_tango_lib_version":
info_data = self.dev.get_tango_lib_version()
elif info == "get_fqdn":
info_data = self.dev.get_fqdn()
elif info == "get_idl_version":
info_data = self.dev.get_idl_version()
elif info == "get_logging_level":
info_data = self.dev.get_logging_level()
elif info == "get_logging_target":
info_data = self.dev.get_logging_target()
elif info == "get_pipe_config":
info_data = self.dev.get_pipe_config()
elif info == "get_source":
info_data = self.dev.get_source()
elif info == "get_timeout_millis":
info_data = self.dev.get_timeout_millis()
elif info == "get_transparency_reconnection":
info_data = self.dev.get_transparency_reconnection()
# elif info == "":
# info_data = self.dev.()
else:
self.logger.warning("Can't read device %s %s", device, info)
info_data = None
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
if log_it:
self.logger.warning("Failed to read device %s %s : %s", device, info, err_msg)
self.dev_errors.append(f"Could not read device {device} {info} : {err_msg}")
else:
self.logger.debug("Failed to read device %s %s : %s", device, info, err_msg)
info_data = None
except tango.ConnectionFailed as terr:
err_msg = terr.args[0].desc.strip()
if log_it:
self.logger.warning("Connection failed reading device %s %s", device, info)
self.dev_errors.append(
f"Connection failed reading device {device} {info} : {err_msg}"
)
else:
self.logger.debug("Connection failed reading device %s %s", device, info)
info_data = None
return info_data
[docs]
def __del__(self) -> None:
"""Destructor."""
self.logger.debug("Shut down TangoctlDevice for %s", self.dev_name)
[docs]
def read_config(self) -> None: # noqa: C901
"""
Read additional data as configured in JSON file.
State, adminMode and versionId are specific to devices
"""
attribute: Any
command: Any
dev_val: Any
err_msg: str
self.logger.debug("Reading configuration of device %s", self.dev_name)
# Names of attributes to be read
attribs: list
if self.attribs:
attribs = self.attribs
else:
attribs = list(self.list_items["attributes"].keys())
self.logger.debug("Reading attributes : %s", attribs)
# Names of commands to be read
cmds: list
if self.cmds:
cmds = self.cmds
else:
cmds = list(self.list_items["commands"].keys())
self.logger.debug("Reading commands : %s", cmds)
# Names of properties to be read
props: list
if self.props:
props = self.props
else:
props = list(self.list_items["properties"].keys())
self.logger.debug("Reading properties : %s", props)
# Read configured attribute values
if "attributes" in self.list_items:
self.logger.debug("Reading attributes : %s", self.list_items["attributes"])
for attribute in self.list_items["attributes"]:
if type(attribute) is list:
attribute = attribute[0]
if attribute not in attribs:
self.logger.info("Attribute %s not in %s", attribute, attribs)
try:
self.dev_values[attribute] = "-"
except TypeError:
self.logger.error("Could not update attribute %s", attribute)
continue
# Read a single attribute
try:
dev_attrib = self.dev.read_attribute(attribute)
dev_val_type = dev_attrib.type
if dev_val_type == tango._tango.CmdArgType.DevEnum:
dev_attr_cfg = self.dev.get_attribute_config(attribute)
dev_val = dev_attr_cfg.enum_labels[dev_attrib.value]
else:
dev_val = dev_attrib.value
self.logger.debug(
"Read device %s attribute %s value : %s", self.dev_name, attribute, dev_val
)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.debug(
"Device %s failed for attribute %s : %s",
self.dev_name,
attribute,
err_msg,
)
dev_val = ""
except tango.CommunicationFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning(
"Communication failed for device %s attribute %s : %s",
self.dev_name,
attribute,
err_msg,
)
self.dev_errors.append(
f"Communication failed for device {self.dev_name} attribute {attribute} :"
f" {err_msg}"
)
dev_val = "N/A"
except AttributeError as oerr:
self.logger.warning(
"Attribute error for device %s attribute %s : %s",
self.dev_name,
attribute,
str(oerr),
)
self.dev_errors.append(
f"Attribute error for device {self.dev_name} attribute {attribute} :"
f" {str(oerr)}"
)
dev_val = "N/A"
except TypeError as yerr:
self.logger.warning(
"Type error for device %s attribute %s : %s",
self.dev_name,
attribute,
str(yerr),
)
self.dev_errors.append(
f"Type error for device {self.dev_name} attribute {attribute} :"
f" {str(yerr)}"
)
dev_val = "N/A"
self.logger.debug("Read attribute %s value: %s", attribute, dev_val)
self.dev_values[attribute] = dev_val
# Read configured command values
if "commands" in self.list_items:
self.logger.debug("Reading commands : %s", self.list_items["commands"])
for command in self.list_items["commands"]:
if type(command) is list:
command = command[0]
if command not in cmds:
self.logger.info("Command %s not in %s", command, cmds)
self.dev_values[command] = "-"
continue
# Execute a command on a device
try:
dev_val = str(self.dev.command_inout(command))
self.logger.debug(
"Read device %s command %s value : %s", self.dev_name, command, dev_val
)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning(
"Read device %s command %s failed : %s", self.dev_name, command, err_msg
)
self.dev_errors.append(
f"Read device {self.dev_name} command {command} failed : {err_msg}"
)
dev_val = "N/A"
except tango.CommunicationFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning(
"Could not read device %s command %s : %s", self.dev_name, command, err_msg
)
self.dev_errors.append(
f"Could not read device {self.dev_name} command {command} : {err_msg}"
)
dev_val = "N/A"
except AttributeError as oerr:
self.logger.warning(
"Could not device %s command %s : %s", self.dev_name, command, str(oerr)
)
self.dev_errors.append(
f"Could not read device {self.dev_name} command {command} : {str(oerr)}"
)
dev_val = "N/A"
except TypeError as yerr:
self.logger.warning(
"Type error for device %s command %s : %s",
self.dev_name,
command,
str(yerr),
)
self.dev_errors.append(
f"Type error for device {self.dev_name} command {command} :{str(yerr)}"
)
dev_val = "N/A"
self.logger.debug("Read command %s: %s", command, dev_val)
self.dev_values[command] = dev_val
# Read configured command values
if "properties" in self.list_items:
self.logger.debug("Reading properties : %s", self.list_items["properties"])
for tproperty in self.list_items["properties"]:
if tproperty not in props:
self.logger.debug("Property %s not in %s", tproperty, props)
self.dev_values[tproperty] = "-"
continue
# Get a list of properties for a device
try:
dev_val = self.dev.get_property(tproperty)[tproperty]
# pylint: disable-next=c-extension-no-member
if type(dev_val) is tango._tango.StdStringVector:
dev_val = ",".join(dev_val)
except tango.NonDbDevice:
self.logger.info("Not reading properties in nodb mode")
dev_val = "-"
self.logger.debug("Read property %s: %s", property, dev_val)
self.dev_values[tproperty] = dev_val
self.logger.debug("Read configuration of device %s", self.dev_name)
[docs]
def read_attribute_config(self, attrib: str) -> tuple:
"""
Read configuration of attribute.
:param attrib: attribute name
:returns: tuple with configuration, poll period and error message
"""
attrib_cfg: Any
poll_period: int | None
err_msg: str | None = None
try:
attrib_cfg = self.dev.get_attribute_config(attrib)
poll_period = self.dev.get_attribute_poll_period(attrib)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning(
"Could not not read attribute %s config for %s : %s",
attrib,
self.dev_name,
err_msg,
)
attrib_cfg = None
poll_period = None
return attrib_cfg, poll_period, err_msg
[docs]
def read_config_all(self) -> None:
"""Read attribute and command configuration."""
attrib: str
cmd: str
attrib_cfg: Any
poll_period: int | None
err_msg: str | None
self.logger.debug("Reading all configurations from device %s", self.dev_name)
# Read attribute configuration
for attrib in self.attributes:
self.logger.debug("Read attribute config from %s", attrib)
# Read the attribute configuration for a single attribute
attrib_cfg, poll_period, err_msg = self.read_attribute_config(attrib)
if err_msg is None:
self.attributes[attrib]["config"] = attrib_cfg
self.attributes[attrib]["poll_period"] = poll_period
else:
# Retry once
attrib_cfg, poll_period, err_msg = self.read_attribute_config(attrib)
if err_msg is None:
self.attributes[attrib]["config"] = attrib_cfg
self.attributes[attrib]["poll_period"] = poll_period
else:
self.attributes[attrib]["error"] = err_msg
self.attributes[attrib]["config"] = None
self.attributes[attrib]["poll_period"] = None
self.logger.debug("Device %s attributes: %s", self.dev_name, self.attributes)
# Read command configuration
for cmd in self.commands:
self.logger.debug("Read command config from %s", cmd)
# Read the configuration for a single command
try:
self.commands[cmd]["config"] = self.dev.get_command_config(cmd)
self.commands[cmd]["poll_period"] = self.dev.get_command_poll_period(cmd)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning(
"Could not not read config for command %s on %s : %s",
cmd,
self.dev_name,
err_msg,
)
self.commands[cmd]["error"] = err_msg
self.commands[cmd]["config"] = None
self.commands[cmd]["poll_period"] = None
self.logger.debug("Device %s commands: %s", self.dev_name, self.commands)
self.logger.debug("Read all configurations from device %s", self.dev_name)
[docs]
def check_for_attribute(self, tgo_attrib: str | None) -> list:
"""
Filter by attribute name.
:param tgo_attrib: attribute name
:return: list of device names matched
"""
chk_attrib: str
self.logger.debug(
"Check %d attributes for %s : %s", len(self.attributes), tgo_attrib, self.attributes
)
self.attribs_found = []
if not tgo_attrib:
return self.attribs_found
chk_attrib = tgo_attrib.lower()
for attrib in self.attributes:
if chk_attrib in attrib.lower():
self.attribs_found.append(attrib)
return self.attribs_found
[docs]
def check_for_command(self, tgo_cmd: str | None) -> list:
"""
Filter by command name.
:param tgo_cmd: command name
:return: list of device names matched
"""
self.logger.debug(
"Check %d commands for %s : %s", len(self.commands), tgo_cmd, self.commands
)
self.cmds_found = []
if not tgo_cmd:
return self.cmds_found
chk_cmd: str = tgo_cmd.lower()
for cmd in self.commands:
if chk_cmd in cmd.lower():
self.cmds_found.append(cmd)
return self.cmds_found
[docs]
def check_for_property(self, tgo_prop: str | None) -> list:
"""
Filter by command name.
:param tgo_prop: property name
:return: list of device names matched
"""
chk_prop: str
self.logger.debug(
"Check %d props for %s : %s", len(self.commands), tgo_prop, self.commands
)
self.props_found = []
if not tgo_prop:
return self.props_found
chk_prop = tgo_prop.lower()
for prop in self.properties:
if chk_prop in prop.lower():
self.props_found.append(prop)
return self.props_found
[docs]
def make_json_small(self) -> dict: # noqa: C901
"""
Convert internal values to JSON.
:returns: dictionary
"""
def read_json_attribute(attr_name: str) -> dict:
"""
Add attribute to dictionary.
:param attr_name: attribute name
:returns: dictionary
"""
self.logger.debug("Read JSON attribute %s", attr_name)
# Check for unknown attribute
attrib_dict: dict = {}
attrib_dict["name"] = attr_name
if attr_name not in self.attributes:
self.logger.debug("Unknown attribute %s not shown", attr_name)
return attrib_dict
attrib_dict["data"] = {}
# Check for attribute error
if "error" in self.attributes[attr_name]:
if self.attributes[attr_name]["error"]:
attrib_dict["error"] = str(self.attributes[attr_name]["error"])
# Check that data value has been read
if "data" not in self.attributes[attr_name]:
pass
elif "value" in self.attributes[attr_name]["data"]:
data_val: Any = self.attributes[attr_name]["data"]["value"]
self.logger.debug(
"Attribute %s data type %s: %s", attr_name, type(data_val), data_val
)
# Check data type
if type(data_val) is dict:
attrib_dict["data"]["value"] = {}
for key in data_val:
attrib_dict["data"]["value"][key] = data_val[key]
elif type(data_val) is numpy.ndarray:
attrib_dict["data"]["value"] = data_val.tolist()
elif type(data_val) is list:
attrib_dict["data"]["value"] = data_val
elif type(data_val) is tuple:
attrib_dict["data"]["value"] = list(data_val)
elif type(data_val) is str:
if not data_val:
attrib_dict["data"]["value"] = ""
elif data_val[0] == "{" and data_val[-1] == "}":
attrib_dict["data"]["value"] = json.loads(data_val)
else:
attrib_dict["data"]["value"] = data_val
else:
attrib_dict["data"]["value"] = str(data_val)
else:
pass
return attrib_dict
self.logger.info("Building small JSON data for device %s", self.dev_name)
devdict: dict = {}
devdict["name"] = self.dev_name
devdict["errors"] = self.dev_errors
# Attributes
devdict["attributes"] = []
if self.attribs_found:
for attrib in self.attribs_found:
self.logger.debug("Read JSON attribute %s", attrib)
devdict["attributes"].append(read_json_attribute(attrib))
else:
for attrib in progress_bar(
self.attribs,
not self.quiet_mode,
prefix=f"Read {len(self.attribs)} JSON attributes :",
suffix="complete",
decimals=0,
length=100,
):
devdict["attributes"].append(read_json_attribute(attrib))
self.logger.debug("Built small JSON : %s", devdict)
return devdict
[docs]
def make_json_medium(self) -> dict: # noqa: C901
"""
Convert internal values to medium size JSON.
:return: dictionary
"""
def read_json_attribute(attr_name: str) -> dict:
"""
Add attribute to dictionary.
:param attr_name: attribute name
:returns: dictionary
"""
self.logger.debug("Read JSON attribute %s", attr_name)
# Check for unknown attribute
attrib_dict: dict = {}
attrib_dict["name"] = attr_name
attrib_dict["config"] = {}
if attr_name not in self.attributes:
self.logger.debug("Unknown attribute %s not shown", attr_name)
return attrib_dict
attrib_dict["data"] = {}
# Check for attribute error
if "error" in self.attributes[attr_name]:
attrib_dict["error"] = str(self.attributes[attr_name]["error"])
# Other stuff
attrib_dict["poll_period"] = self.attributes[attr_name]["poll_period"]
# Check that data value has been read
if "data" not in self.attributes[attr_name]:
pass
elif "value" in self.attributes[attr_name]["data"]:
data_val: Any = self.attributes[attr_name]["data"]["value"]
self.logger.debug(
"Attribute %s data type %s: %s", attr_name, type(data_val), data_val
)
# Check data type
attrib_dict["data"]["type"] = str(self.attributes[attr_name]["data"]["type"])
attrib_dict["data"]["pytype"] = str(self.attributes[attr_name]["data"]["pytype"])
if type(data_val) is dict:
attrib_dict["data"]["value"] = {}
for key in data_val:
attrib_dict["data"]["value"][key] = data_val[key]
elif type(data_val) is numpy.ndarray:
attrib_dict["data"]["value"] = data_val.tolist()
elif type(data_val) is list:
attrib_dict["data"]["value"] = data_val
elif type(data_val) is tuple:
attrib_dict["data"]["value"] = list(data_val)
elif type(data_val) is str:
if not data_val:
attrib_dict["data"]["value"] = ""
elif data_val[0] == "{" and data_val[-1] == "}":
attrib_dict["data"]["value"] = json.loads(data_val)
else:
attrib_dict["data"]["value"] = data_val
elif attrib_dict["data"]["type"] == "DevEnum":
if "enum_labels" in attrib_dict["config"]:
attrib_dict["data"]["value"] = attrib_dict["config"]["enum_labels"][
data_val
]
else:
attrib_dict["data"]["value"] = str(data_val)
else:
attrib_dict["data"]["value"] = str(data_val)
else:
pass
return attrib_dict
def read_json_command(cmd_name: str) -> dict:
"""
Add commands to dictionary.
:param cmd_name: command name
:returns: dictionary
"""
cmd_dict: dict = {}
cmd_dict["name"] = cmd_name
cmd_dict["poll_period"] = self.commands[cmd_name]["poll_period"]
# Check for error message
if "error" in self.commands[cmd_name]:
cmd_dict["error"] = self.commands[cmd_name]["error"]
# Check command configuration
cmd_dict["config"] = {}
if self.commands[cmd_name]["config"] is not None:
cmd_cfg = self.commands[cmd_name]["config"]
# Input type
cmd_dict["config"]["in_type"] = repr(cmd_cfg.in_type)
# Output type
cmd_dict["config"]["out_type"] = repr(cmd_cfg.out_type)
cmd_dict["config"]["cmd_tag"] = cmd_cfg.cmd_tag
cmd_dict["config"]["disp_level"] = str(cmd_cfg.disp_level)
if "value" in self.commands[cmd_name]:
cmd_dict["value"] = self.commands[cmd_name]["value"]
return cmd_dict
def read_json_property(prop_name: str) -> dict:
"""
Add properties to dictionary.
:param prop_name: property name
:returns: dictionary
"""
# Check that value has been read
prop_dict: dict = {}
prop_dict["name"] = prop_name
if "value" in self.properties[prop_name]:
prop_val: Any = self.properties[prop_name]["value"]
# pylint: disable-next=c-extension-no-member
if type(prop_val) is tango._tango.StdStringVector:
prop_dict["value"] = [] # delimiter.join(prop_val)
for propv in prop_val:
prop_dict["value"].append(propv)
else:
prop_dict["value"] = prop_val
return prop_dict
# Read attribute and command configuration
self.logger.info("Building medium JSON data for device %s", self.dev_name)
self.read_config_all()
devdict: dict = {}
devdict["name"] = self.dev_name
devdict["errors"] = self.dev_errors
devdict["db_host"] = self.db_host
devdict["db_port"] = self.db_port
devdict["tango_lib"] = self.tango_lib
devdict["green_mode"] = self.green_mode
devdict["version"] = self.version
devdict["device_access"] = self.dev_access
devdict["fqdn"] = self.fqdn
devdict["idl_version"] = self.idl_version
devdict["logging_level"] = self.logging_level
if self.logging_target is not None:
devdict["logging_target"] = list(self.logging_target)
else:
devdict["logging_target"] = []
if self.pipe_config is not None:
devdict["pipe_config"] = list(self.pipe_config)
else:
devdict["pipe_config"] = []
devdict["source"] = self.source
devdict["timeout_millis"] = self.timeout_millis
devdict["transparency_reconnection"] = self.transparency_reconnection
# TODO not ready for the big time yet
# devdict["componentstates"] = self.componentstates
if self.jargon:
devdict["acronyms"] = self.jargon
# Information
devdict["info"] = {}
if self.info is not None:
devdict["info"]["dev_class"] = self.info.dev_class
devdict["info"]["dev_type"] = self.info.dev_type
devdict["info"]["server_host"] = self.info.server_host
else:
devdict["info"] = {}
# Attributes
devdict["attributes"] = []
if self.attribs_found:
for attrib in self.attribs_found:
self.logger.debug("Read JSON attribute %s", attrib)
devdict["attributes"].append(read_json_attribute(attrib))
else:
for attrib in progress_bar(
self.attribs,
not self.quiet_mode,
prefix=f"Read {len(self.attribs)} JSON attributes :",
suffix="complete",
decimals=0,
length=100,
):
devdict["attributes"].append(read_json_attribute(attrib))
# Commands
devdict["commands"] = []
if self.commands:
for cmd in self.commands:
self.logger.debug("Read JSON command %s", cmd)
devdict["commands"].append(read_json_command(cmd))
# Properties
devdict["properties"] = []
if self.properties:
for prop in self.properties:
self.logger.debug("Read JSON property %s", prop)
devdict["properties"].append(read_json_property(prop))
# Processes
if self.procs:
devdict["processes"] = {"output": self.procs["output"]}
else:
devdict["processes"] = {}
self.logger.debug("Built medium JSON : %s", devdict)
return devdict
[docs]
def make_json_large(self) -> dict: # noqa: C901
"""
Convert internal values to JSON.
:return: dictionary
"""
def read_json_attribute(attr_name: str) -> dict:
"""
Add attribute to dictionary.
:param attr_name: attribute name
:returns: dictionary
"""
self.logger.debug("Read JSON attribute %s", attr_name)
# Check for unknown attribute
attrib_dict: dict = {}
attrib_dict["name"] = attr_name
attrib_dict["config"] = {}
if attr_name not in self.attributes:
self.logger.debug("Unknown attribute %s not shown", attr_name)
return attrib_dict
attrib_dict["data"] = {}
# Check for attribute error
if "error" in self.attributes[attr_name]:
attrib_dict["error"] = str(self.attributes[attr_name]["error"])
# Check attribute configuration
if self.attributes[attr_name]["config"] is not None:
attr_cfg = self.attributes[attr_name]["config"]
# Description
try:
attrib_dict["config"]["description"] = attr_cfg.description
except UnicodeDecodeError:
attrib_dict["config"]["description"] = "N/A"
# Alarms
dev_items = attr_cfg.alarms
attrib_dict["config"]["alarms"] = {
"delta_t": dev_items.delta_t,
"delta_val": dev_items.delta_val,
"extensions": list(dev_items.extensions),
"max_alarm": dev_items.max_alarm,
"max_warning": dev_items.max_warning,
"min_alarm": dev_items.min_alarm,
"min_warning": dev_items.min_warning,
}
# Events
dev_items = attr_cfg.events
attrib_dict["config"]["events"] = {
"arch_event": {
"archive_abs_change": dev_items.arch_event.archive_abs_change,
"archive_period": dev_items.arch_event.archive_period,
"archive_rel_change": dev_items.arch_event.archive_rel_change,
"extensions": list(dev_items.arch_event.extensions),
},
"ch_event": {
"abs_change": dev_items.ch_event.abs_change,
"extensions": list(dev_items.ch_event.extensions),
"rel_change": dev_items.ch_event.rel_change,
},
"per_event": {
"extensions": list(dev_items.per_event.extensions),
"period": dev_items.per_event.period,
},
}
attrib_dict["config"]["sys_extensions"] = list(attr_cfg.sys_extensions)
# Root name
attrib_dict["config"]["root_attr_name"] = attr_cfg.root_attr_name
# Format
attrib_dict["config"]["format"] = attr_cfg.format
# Data format
attrib_dict["config"]["data_format"] = str(attr_cfg.data_format)
# Display level
attrib_dict["config"]["disp_level"] = str(attr_cfg.disp_level)
# Data type
dtype = attr_cfg.data_type
# pylint: disable-next=c-extension-no-member
if dtype == tango._tango.CmdArgType.DevEnum:
attrib_dict["config"]["enum_labels"] = list(attr_cfg.enum_labels)
tydict = tango.CmdArgType.names
attrib_dict["config"]["data_type"] = list(tydict.keys())[
list(tydict.values()).index(attr_cfg.data_type)
]
# Display unit
attrib_dict["config"]["display_unit"] = attr_cfg.display_unit
# Standard unit
attrib_dict["config"]["standard_unit"] = attr_cfg.standard_unit
# Writable
attrib_dict["config"]["writable"] = str(attr_cfg.writable)
attrib_dict["config"]["max_dim_x"] = attr_cfg.max_dim_x
attrib_dict["config"]["max_dim_y"] = attr_cfg.max_dim_y
attrib_dict["config"]["max_alarm"] = attr_cfg.max_alarm
attrib_dict["config"]["max_value"] = attr_cfg.max_value
attrib_dict["config"]["memorized"] = str(attr_cfg.memorized)
attrib_dict["config"]["min_alarm"] = attr_cfg.min_alarm
attrib_dict["config"]["min_value"] = attr_cfg.min_value
# Writable attribute name
attrib_dict["config"]["writable_attr_name"] = attr_cfg.writable_attr_name
# Other stuff
attrib_dict["poll_period"] = self.attributes[attr_name]["poll_period"]
# Check that data value has been read
if "data" not in self.attributes[attr_name]:
pass
elif "value" in self.attributes[attr_name]["data"]:
data_val: Any = self.attributes[attr_name]["data"]["value"]
self.logger.debug(
"Attribute %s data type %s: %s", attr_name, type(data_val), data_val
)
# Check data type
attrib_dict["data"]["type"] = str(self.attributes[attr_name]["data"]["type"])
attrib_dict["data"]["pytype"] = str(self.attributes[attr_name]["data"]["pytype"])
if type(data_val) is dict:
attrib_dict["data"]["value"] = {}
for key in data_val:
attrib_dict["data"]["value"][key] = data_val[key]
elif type(data_val) is numpy.ndarray:
attrib_dict["data"]["value"] = data_val.tolist()
elif type(data_val) is list:
attrib_dict["data"]["value"] = data_val
elif type(data_val) is tuple:
attrib_dict["data"]["value"] = list(data_val)
elif type(data_val) is str:
if not data_val:
attrib_dict["data"]["value"] = ""
elif data_val[0] == "{" and data_val[-1] == "}":
attrib_dict["data"]["value"] = json.loads(data_val)
else:
attrib_dict["data"]["value"] = data_val
elif attrib_dict["data"]["type"] == "DevEnum":
if "enum_labels" in attrib_dict["config"]:
attrib_dict["data"]["value"] = attrib_dict["config"]["enum_labels"][
data_val
]
else:
attrib_dict["data"]["value"] = str(data_val)
else:
attrib_dict["data"]["value"] = str(data_val)
# Data format, e.g. "SCALAR"
attrib_dict["data"]["data_format"] = str(
self.attributes[attr_name]["data"]["data_format"]
)
else:
pass
return attrib_dict
def read_json_command(cmd_name: str) -> dict:
"""
Add commands to dictionary.
:param cmd_name: command name
:returns: dictionary
"""
cmd_dict: dict = {}
cmd_dict["name"] = cmd_name
cmd_dict["poll_period"] = self.commands[cmd_name]["poll_period"]
# Check for error message
if "error" in self.commands[cmd_name]:
cmd_dict["error"] = self.commands[cmd_name]["error"]
# Check command configuration
cmd_dict["config"] = {}
if self.commands[cmd_name]["config"] is not None:
cmd_cfg = self.commands[cmd_name]["config"]
# Input type
cmd_dict["config"]["in_type"] = repr(cmd_cfg.in_type)
# Input type description
cmd_dict["config"]["in_type_desc"] = cmd_cfg.in_type_desc
# Output type
cmd_dict["config"]["out_type"] = repr(cmd_cfg.out_type)
# Output type description
cmd_dict["config"]["out_type_desc"] = cmd_cfg.out_type_desc
cmd_dict["config"]["cmd_tag"] = cmd_cfg.cmd_tag
cmd_dict["config"]["disp_level"] = str(cmd_cfg.disp_level)
if "value" in self.commands[cmd_name]:
cmd_dict["value"] = self.commands[cmd_name]["value"]
return cmd_dict
def read_json_property(prop_name: str) -> dict:
"""
Add properties to dictionary.
:param prop_name: property name
:returns: dictionary
"""
# Check that value has been read
prop_dict: dict = {}
prop_dict["name"] = prop_name
if "value" in self.properties[prop_name]:
prop_val: Any = self.properties[prop_name]["value"]
# pylint: disable-next=c-extension-no-member
if type(prop_val) is tango._tango.StdStringVector:
prop_dict["value"] = [] # delimiter.join(prop_val)
for propv in prop_val:
prop_dict["value"].append(propv)
else:
prop_dict["value"] = prop_val
return prop_dict
# Read attribute and command configuration
self.logger.info("Building medium JSON data for device %s", self.dev_name)
self.read_config_all()
devdict: dict = {}
devdict["name"] = self.dev_name
devdict["errors"] = self.dev_errors
devdict["db_host"] = self.db_host
devdict["db_port"] = self.db_port
devdict["tango_lib"] = self.tango_lib
devdict["green_mode"] = self.green_mode
devdict["version"] = self.version
devdict["device_access"] = self.dev_access
devdict["fqdn"] = self.fqdn
devdict["idl_version"] = self.idl_version
devdict["logging_level"] = self.logging_level
if self.logging_target is not None:
devdict["logging_target"] = list(self.logging_target)
else:
devdict["logging_target"] = []
if self.pipe_config is not None:
devdict["pipe_config"] = list(self.pipe_config)
else:
devdict["pipe_config"] = []
devdict["source"] = self.source
devdict["timeout_millis"] = self.timeout_millis
devdict["transparency_reconnection"] = self.transparency_reconnection
# TODO not ready for the big time yet
# devdict["componentstates"] = self.componentstates
if self.jargon:
devdict["acronyms"] = self.jargon
# Information
devdict["info"] = {}
if self.info is not None:
devdict["info"]["dev_class"] = self.info.dev_class
devdict["info"]["dev_type"] = self.info.dev_type
devdict["info"]["doc_url"] = self.info.doc_url
devdict["info"]["server_host"] = self.info.server_host
devdict["info"]["server_id"] = self.info.server_id
devdict["info"]["server_version"] = self.info.server_version
else:
devdict["info"] = {}
# Read alias where applicable
try:
devdict["aliases"] = self.dev.get_device_alias_list()
except AttributeError as oerr:
self.logger.debug("Could not read device %s alias : %s", self.dev_name, str(oerr))
devdict["aliases"] = "N/A"
# Attributes
devdict["attributes"] = []
if self.attribs_found:
for attrib in self.attribs_found:
self.logger.debug("Read JSON attribute %s", attrib)
devdict["attributes"].append(read_json_attribute(attrib))
else:
self.logger.info("Reading %d JSON attributes", len(self.attribs))
for attrib in progress_bar(
self.attribs,
not self.quiet_mode,
prefix=f"Read {len(self.attribs)} JSON attributes :",
suffix="complete",
decimals=0,
length=100,
):
devdict["attributes"].append(read_json_attribute(attrib))
# Commands
devdict["commands"] = []
if self.commands:
for cmd in self.commands:
self.logger.debug("Read JSON command %s", cmd)
devdict["commands"].append(read_json_command(cmd))
# Properties
devdict["properties"] = []
if self.properties:
for prop in self.properties:
self.logger.debug("Read JSON property %s", prop)
devdict["properties"].append(read_json_property(prop))
# Processes
devdict["processes"] = self.procs
self.logger.debug("Built large JSON : %s", devdict)
return devdict
[docs]
def write_attribute_value(self, attrib: str, value: str) -> int:
"""
Set value of attribute.
:param attrib: attribute name
:param value: attribute value
:return: error condition
"""
# Check that attribute is known
if attrib not in self.attributes:
self.logger.error("Attribute %s not found in %s", attrib, self.attributes.keys())
return 1
# Check type
devtype: Any = self.attributes[attrib]["data"]["type"]
wval: Any
if devtype == "DevEnum":
wval = int(value)
else:
wval = value
self.logger.debug("Set attribute %s (%s) to %s (%s)", attrib, devtype, wval, type(wval))
# Write a single attribute
self.dev.write_attribute(attrib, wval)
return 0
[docs]
def read_attribute_value(self) -> None:
"""Read device attributes."""
self.logger.debug("Reading %d attributes for %s", len(self.attributes), self.dev_name)
for attrib in self.attributes:
# Read a single attribute
self.attributes[attrib]["data"] = {}
try:
attrib_data = self.dev.read_attribute(attrib)
except tango.DevFailed as terr:
err_msg = str(terr.args[-1].desc)
self.logger.debug("Failed on attribute %s : %s", attrib, err_msg)
self.attributes[attrib]["error"] = err_msg
self.attributes[attrib]["data"]["type"] = "N/A"
self.attributes[attrib]["data"]["data_format"] = "N/A"
continue
if attrib in self.block_items["attributes"]:
self.logger.warning("Not reading attribute %s value", attrib)
self.attributes[attrib]["data"]["value"] = "N/A"
else:
self.attributes[attrib]["data"]["value"] = attrib_data.value
self.attributes[attrib]["data"]["type"] = str(attrib_data.type)
self.attributes[attrib]["data"]["pytype"] = type(attrib_data.value).__name__
self.attributes[attrib]["data"]["data_format"] = str(attrib_data.data_format)
self.logger.debug(
"Read attribute %s data : %s", attrib, self.attributes[attrib]["data"]
)
self.logger.debug("Read %d attributes for %s", len(self.attributes), self.dev_name)
[docs]
def read_command_value(self, run_commands: list, run_commands_name: list) -> None:
"""
Read device commands.
:param run_commands: commands safe to run without parameters
:param run_commands_name: commands safe to run with device name as parameter
"""
self.logger.debug("Reading %d commands for %s", len(self.commands), self.dev_name)
for cmd in self.commands:
if cmd in run_commands:
# Execute a command on a device
try:
self.commands[cmd]["value"] = self.dev.command_inout(cmd)
except tango.ConnectionFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Could not run command %s : %s", cmd, err_msg)
self.commands[cmd]["value"] = "N/A"
self.commands[cmd]["error"] = err_msg
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Could not run command %s : %s", cmd, err_msg)
self.commands[cmd]["value"] = "N/A"
self.commands[cmd]["error"] = err_msg
self.logger.debug(
"Read command %s (%s) : %s",
cmd,
type(self.commands[cmd]["value"]),
self.commands[cmd]["value"],
)
elif cmd in run_commands_name:
# Run command in/out with device name as parameter
try:
self.commands[cmd]["value"] = self.dev.command_inout(cmd, self.dev_name)
self.logger.debug(
"Read command %s (%s) with arg %s : %s",
cmd,
type(self.commands[cmd]["value"]),
self.dev_name,
self.commands[cmd]["value"],
)
except tango.DevFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning(
"Could not run command %s with device name : %s", cmd, err_msg
)
self.commands[cmd]["value"] = "N/A"
self.commands[cmd]["error"] = err_msg
else:
# Nothing to see here
pass
self.logger.debug("Read %d commands for %s", len(self.commands), self.dev_name)
return
[docs]
def read_property_value(self) -> None:
"""Read device properties."""
self.logger.debug("Reading %d properties for %s", len(self.properties), self.dev_name)
for prop in self.properties:
# get_property returns this:
# {'CspMasterFQDN': ['mid-csp/control/0']}
if prop in self.block_items["properties"]:
self.logger.warning("Not reading property %s value", prop)
self.properties[prop]["value"] = ["N/R"]
continue
# Get a list of properties for a device
try:
self.properties[prop]["value"] = self.dev.get_property(prop)[prop]
except tango.CommunicationFailed as terr:
err_msg = terr.args[0].desc.strip()
self.logger.warning("Could not get property %s value: %s", prop, err_msg)
self.properties[prop]["value"] = ["N/A"]
self.logger.debug("Read property %s : %s", prop, self.properties[prop]["value"])
self.logger.debug("Read %d properties for %s", len(self.properties), self.dev_name)
return
[docs]
def print_list(self, eol: str = "\n") -> None:
"""
Print data.
:param eol: printed at the end
"""
self.read_config()
self.logger.debug("Print list: %s", self.list_items)
self.logger.debug("Use values: %s", self.dev_values)
print(f"{self.dev_name:64} ", end="", file=self.outf)
for attribute in self.list_items["attributes"]:
field_value = self.dev_values[attribute]
field_width = self.list_items["attributes"][attribute]
self.logger.debug(f"Print attribute {attribute} : {field_value} {field_width=}")
print(f"{field_value:{field_width}} ", end="", file=self.outf)
for command in self.list_items["commands"]:
field_value = self.dev_values[command]
field_width = self.list_items["commands"][command]
self.logger.debug(f"Print command {command} : {field_value} ({field_width=})")
print(f"{field_value:{field_width}} ", end="", file=self.outf)
for tproperty in self.list_items["properties"]:
field_value = self.dev_values[tproperty]
field_width = self.list_items["properties"][tproperty]
self.logger.debug(f"Print property {tproperty} : {field_value} ({field_width=})")
print(f"{field_value:{field_width}} ", end="", file=self.outf)
print(f"{self.dev_class:32}", end=eol, file=self.outf)
[docs]
def print_list_attribute(self, lwid: int, show_val: bool = True) -> None:
"""
Print list of devices with attribute.
:param lwid: line width
:param show_val: print value
"""
n: int
self.print_list("")
n = 0
for attrib in self.attributes.keys():
if n:
print(f"{' ':{lwid}}", end="", file=self.outf)
if show_val:
try:
attrib_val = self.attributes[attrib]["data"]["value"]
except KeyError:
attrib_val = "N/A"
print(f" {attrib:40} {attrib_val}", file=self.outf)
else:
print(f" {attrib}", file=self.outf)
n += 1
self.logger.debug("Listed %d attributes", len(self.attributes))
[docs]
def print_list_command(self, lwid: int, show_val: bool = True) -> None:
"""
Print list of devices with command.
:param lwid: line width
:param show_val: print value
"""
n: int
self.print_list("")
n = 0
for cmd in self.commands.keys():
if n:
print(f"{' ':{lwid}}", end="", file=self.outf)
if show_val:
if "value" in self.commands[cmd]:
cmdv = self.commands[cmd]["value"]
print(f" {cmd:40} {cmdv}", file=self.outf)
else:
print(f" {cmd}", file=self.outf)
else:
print(f" {cmd}", file=self.outf)
n += 1
self.logger.debug("Listed %d commands", len(self.commands))
[docs]
def print_list_property(self, lwid: int, show_val: bool = True) -> None:
"""
Print list of devices with property.
:param lwid: line width
:param show_val: print value
"""
n: int
self.print_list("")
n = 0
for prop in self.properties.keys():
if n:
print(f"{' ':{lwid}}", end="", file=self.outf)
if show_val:
propv = ",".join(self.properties[prop]["value"])
print(f" {prop:40} {propv}", file=self.outf)
else:
print(f" {prop}", file=self.outf)
n += 1
self.logger.debug("Listed %d properties", len(self.properties))
[docs]
def print_html_large(self, html_body: bool) -> None:
"""
Print full HTML report.
:param html_body: Flag to print HTML header and footer
"""
self.logger.debug("Printing as large HTML")
devsdict = {f"{self.dev_name}": self.make_json_large()}
json_reader: TangoJsonReader = TangoJsonReader(
self.logger, self.indent, self.quiet_mode, None, devsdict, self.outf
)
json_reader.print_html_large(html_body)
[docs]
def print_html_small(self, html_body: bool) -> None:
"""
Print shortened HTML report.
:param html_body: Flag to print HTML header and footer
"""
self.logger.debug("Printing as small HTML")
devsdict = {f"{self.dev_name}": self.make_json_small()}
json_reader: TangoJsonReader = TangoJsonReader(
self.logger, self.indent, self.quiet_mode, None, devsdict, self.outf
)
json_reader.print_html_small(html_body)
[docs]
def device_run_cmd(self, ns_name: str, pod_name: str, pod_cmd: str) -> dict:
"""
Run a command in specified pod.
:param ns_name: namespace
:param pod_name: pod name
:param pod_cmd: command to run
:returns: dictionary with output information
"""
pod: dict = {}
if KubernetesInfo is None:
return pod
k8s: KubernetesInfo = KubernetesInfo(self.logger, None)
pod["name"] = pod_name
pod["command"] = pod_cmd
self.logger.info("Running command in device pod %s : '%s'", pod_name, pod_cmd)
pod_exec: list = pod_cmd.split(" ")
resps: str = k8s.exec_pod_command(ns_name, pod_name, pod_exec)
pod["output"] = []
if not resps:
pod["output"].append("N/A")
elif "\n" in resps:
resp: str
for resp in resps.split("\n"):
if not resp:
pass
elif resp[-6:] == "ps -ef":
pass
elif resp[0:3] == "UID":
pass
elif resp[0:3] == "PID":
pass
else:
pod["output"].append(resp)
else:
pod["output"].append(resps)
self.logger.debug(
"Ran command '%s' with output: %s bytes, %d lines",
pod_cmd,
len(resps),
len(pod["output"]),
)
self.logger.debug("Command %s output: %s", pod_cmd, pod)
return pod
[docs]
def read_procs(self, ns_name: str | None) -> int:
"""
Read processes running on host.
:param ns_name: namespace
:returns: error condition
"""
self.logger.debug("Reading processes")
if self.info is None:
self.procs = {}
return 1
if ns_name is None:
self.logger.warning("Namespace for processes not set")
self.procs = {}
return 1
pod_name = self.info.server_host
procs_cmd: str = "ps -ef"
self.procs = self.device_run_cmd(ns_name, pod_name, procs_cmd)
if not self.procs:
self.logger.warning("Could not read %d processes")
return 1
self.logger.debug("Read %d processes", len(self.procs))
return 0
[docs]
def read_pod(self, ns_name: str | None) -> int:
"""
Read info about pod running this device.
:param ns_name: namespace
:returns: error condition
"""
if KubernetesInfo is None:
self.logger.warning("Kubernetes not supported")
return 1
if ns_name is None:
self.logger.warning("Namespace for pod not set")
self.pod_desc = {}
return 1
self.pod_name = self.info.server_host
self.logger.debug("Reading description of pod : %s", self.pod_name)
if self.pod_name is None:
self.logger.warning("Could not read server host for device %s", self.dev_name)
self.dev_errors.append(f"Could not read server host for device {self.dev_name}")
self.pod_desc = {}
return 1
k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
pod_desc: Any = k8s.get_pod_desc(ns_name, self.pod_name)
if pod_desc is None:
self.pod_desc = {}
return 1
self.pod_desc = pod_desc.to_dict()
self.logger.info("Read description of pod : %s", self.pod_name)
self.logger.debug(
"Pod description :\n%s", json.dumps(self.pod_desc, indent=4, default=str)
)
return 0