Source code for ska_tangoctl.tango_control.read_tango_devices

"""Read and display Tango stuff."""

import datetime
import json
import logging
import os
import re
import sys
import time
from typing import Any

import numpy as np
import pandas as pd
import tango
import yaml

from ska_tangoctl.tango_control.disp_action import DispAction
from ska_tangoctl.tango_control.progress_bar import progress_bar
from ska_tangoctl.tango_control.read_tango_device import TangoctlDevice
from ska_tangoctl.tango_control.tango_json import TangoJsonReader

try:
    from ska_tangoctl.k8s_info.get_k8s_info import KubernetesInfo, run_catior
except ModuleNotFoundError:
    KubernetesInfo = None  # type: ignore[assignment,misc]

FILE_MODE: str = "w"


class NumpyEncoder(json.JSONEncoder):
    """Make a numpy object more JSON-friendly."""

    def default(self, np_obj: Any) -> Any:
        """
        Set values to default.

        :param np_obj: object to be read
        :returns: JSON-friendly thing
        """
        if isinstance(np_obj, np.ndarray):
            return np_obj.tolist()
        return super().default(np_obj)


[docs] class TangoctlDevices: """Compile a dictionary of available Tango devices.""" logger: logging.Logger
[docs] def __init__( # noqa: C901s self, logger: logging.Logger, tango_host: str | None, outf: Any, timeout_millis: int | None, dev_status: dict, cfg_data: dict, tgo_name: str | None, uniq_cls: bool, disp_action: DispAction, k8s_ctx: str | None, k8s_cluster: str | None, k8s_ns: str | None, domain_name: str | None, tgo_attrib: str | None = None, tgo_cmd: str | None = None, tgo_prop: str | None = None, tgo_class: str | None = None, dev_count: int = 0, ): """ Get a dictionary of devices. :param logger: logging handle :param tango_host: Tango database host :param outf: output file pointer :param timeout_millis: Tango device timeout in milliseconds :param dev_status: dictionary with status stuff :param cfg_data: configuration data in JSON format :param tgo_name: filter device name :param uniq_cls: only read one device per class :param disp_action: output format :param k8s_ctx: K8S context :param k8s_cluster: K8S cluster :param k8s_ns: K8S namespace :param domain_name: K8S domain name :param tgo_attrib: filter attribute name :param tgo_cmd: filter command name :param tgo_prop: filter property name :param tgo_class: filter class name :param dev_count: number of Tango device to read (for testing) :raises Exception: when database connect fails """ self.logger = logger self.disp_action = disp_action self.timeout_millis: int | None = timeout_millis self.prog_bar: bool if self.logger.getEffectiveLevel() in (logging.DEBUG, logging.INFO): self.prog_bar = False else: self.prog_bar = not self.disp_action.quiet_mode self.dev_status: dict = dev_status self.cfg_data = cfg_data self.k8s_ctx: str | None = k8s_ctx self.k8s_cluster: str | None = k8s_cluster self.k8s_ns: str | None = k8s_ns self.domain_name: str | None = domain_name self.tgo_name: str | None = tgo_name self.tgo_attrib: str | None = tgo_attrib self.tgo_cmd: str | None = tgo_cmd self.tgo_prop: str | None = tgo_prop self.tgo_class: str | None = tgo_class if self.tgo_class is not None: self.tgo_class = self.tgo_class.lower() self.uniq_cls: bool = uniq_cls self.outf: Any = outf if dev_count: self.dev_count = dev_count else: self.dev_count = sys.maxsize self.logger.debug("Devices configuration : %s", self.cfg_data) self.delimiter = self.cfg_data["delimiter"] self.run_commands = self.cfg_data["run_commands"] self.logger.debug("Run commands %s", self.run_commands) self.run_commands_name = self.cfg_data["run_commands_name"] self.list_items = self.cfg_data["list_items"] self.block_items = self.cfg_data["block_items"] self.logger.debug("Run commands with name %s", self.run_commands_name) self.start_now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.start_perf = time.perf_counter() self.attribs_found: list = [] self.tgo_space: str = "" self.dev_classes: list = [] self.good_pods: dict = {} self.bad_pods: dict = {} self.devices: dict = {} self.device_list: list # Set Tango database host self.tango_host: str | None if tango_host is not None: self.tango_host = tango_host else: self.tango_host = os.getenv("TANGO_HOST") if self.tango_host is None: self.logger.error("No host specified and TANGO_HOST not set") raise Exception("Could not set Tango database host") os.environ["TANGO_HOST"] = self.tango_host # Get high level Tango object which contains the link to the static database database: tango.Database try: database = tango.Database() except Exception as oerr: self.logger.warning("Could not connect to Tango database %s", self.tango_host) raise oerr self.logger.debug("Connect to Tango database %s", self.tango_host) # Read devices self.device_names: list = [] exported_devices = sorted( database.get_device_exported("*").value_string, reverse=self.disp_action.reverse ) self.logger.debug("Found %d exported devices", len(exported_devices)) device_name: str n_devs = 0 for device_name in exported_devices: if not self.disp_action.evrythng: chk_fail: bool = False for dev_chk in self.cfg_data["ignore_device"]: chk_len: int = len(dev_chk) if device_name[0:chk_len] == dev_chk: chk_fail = True break if chk_fail: self.logger.info( "Skip device : '%s' matches '%s'", device_name, self.cfg_data["ignore_device"], ) continue if self.tgo_name: ichk: str = device_name.lower() if self.tgo_name not in ichk: self.logger.info("Ignore device : %s (not '%s')", self.tgo_name, ichk) continue n_devs += 1 if n_devs > self.dev_count: self.logger.warning("Stop reading devices after %d", n_devs) break self.logger.debug("Add device : %s", device_name) self.device_names.append(device_name) self.logger.debug("Found %d device names", len(self.device_names))
[docs] def read_devices_nodb(self) -> None: """Read a single device without database connection.""" trl = f"tango://{self.tango_host}/{self.tgo_name}#dbase=no" new_dev = TangoctlDevice( self.logger, self.disp_action, self.outf, self.timeout_millis, self.dev_status, trl, self.list_items, self.block_items, self.tgo_attrib, self.tgo_cmd, self.tgo_prop, self.k8s_ctx, self.domain_name, indent=self.disp_action.indent, ) self.devices[self.tgo_name] = new_dev
[docs] def read_device_hosts(self) -> list: # noqa: C901 """ Compile a list of hosts. :returns: list of dictionaries with host information """ hosts: list = [] db = tango.Database() for host_name in self.bad_pods: self.logger.info( "Reading IP address of host %s running devices %s", host_name, ",".join(self.bad_pods[host_name]), ) host: dict = {} host["name"] = host_name host["devices"] = [] host["addresses"] = [] for device_name in self.bad_pods[host_name]: device_info: tango.DbDevFullInfo = db.get_device_info(device_name) host["devices"].append( { "name": device_name, "class_name": device_info.class_name, "ds_full_name": device_info.ds_full_name, "exported": device_info.exported, "ior": device_info.ior, "pid": device_info.pid, "started_date": device_info.started_date, "stopped_date": device_info.stopped_date, "version": device_info.version, } ) if not host["addresses"]: ip_addrs: list = [] k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx) pod_name = "ska-tango-base-itango-console" exec_command = ["catior", device_info.ior] if self.k8s_ns is not None: ior = k8s.exec_pod_command(self.k8s_ns, pod_name, exec_command) if not ior: host["catior"] = ior.split("\n") for line in ior.split("\n"): self.logger.debug("%s", line) if "IIOP" in line: ip_addr = line.split(" ")[3] ip_addrs.append(ip_addr) host["addresses"] = ip_addrs if not ip_addrs: self.logger.warning("Could not read IP addresses from pod %s", pod_name) res, ior, err = run_catior(device_info.ior) if res: self.logger.warning("Could not run catior : %s", err) else: self.logger.info("Run IOR output : %s", ior) host["catior"] = ior.split("\n") for line in ior.split("\n"): self.logger.debug("%s", line) if "IIOP" in line: ip_addr = line.split(" ")[3] ip_addrs.append(ip_addr) host["addresses"] = ip_addrs hosts.append(host) return hosts
[docs] def read_devices(self) -> None: # noqa: C901 """Read all devices.""" if self.tgo_name: self.tgo_name = self.tgo_name.lower() self.logger.info( "Reading devices with name %s attribute %s command %s property %s...", self.tgo_name, self.tgo_attrib, self.tgo_cmd, self.tgo_prop, ) self.list_items = self.cfg_data["list_items"] self.logger.info("List items : %s", self.list_items) self.block_items = self.cfg_data["block_items"] if self.logger.getEffectiveLevel() in (logging.DEBUG, logging.INFO): self.disp_action.quiet_mode = True ndevs = len(self.device_names) self.logger.info("Reading %d devices (unique %s)", ndevs, self.uniq_cls) dev_class: str n: int = 0 for device_name in progress_bar( self.device_names, not self.disp_action.quiet_mode, prefix=f"Read {len(self.device_names)} exported devices :", suffix="complete", decimals=0, length=100, ): n += 1 try: new_dev = TangoctlDevice( self.logger, self.disp_action, self.outf, self.timeout_millis, self.dev_status, device_name, self.list_items, self.block_items, self.tgo_attrib, self.tgo_cmd, self.tgo_prop, self.k8s_ctx, self.domain_name, indent=self.disp_action.indent, ) if self.tgo_attrib: attribs_found: list = new_dev.check_for_attribute(self.tgo_attrib) if attribs_found: self.logger.debug( "Device %s matched attributes %s", device_name, attribs_found ) self.devices[device_name] = new_dev else: self.logger.debug( "Skip device %s (attribute %s not found)", device_name, self.tgo_attrib ) elif self.tgo_cmd: cmds_found: list = new_dev.check_for_command(self.tgo_cmd) if cmds_found: self.logger.debug("Device %s matched commands %s", device_name, cmds_found) self.devices[device_name] = new_dev else: self.logger.debug( "Skip device %s (command %s not found)", device_name, self.tgo_cmd ) elif self.tgo_prop: props_found: list = new_dev.check_for_property(self.tgo_prop) if props_found: self.logger.debug( "Device %s matched properties %s", device_name, props_found ) self.devices[device_name] = new_dev else: self.logger.debug( "Skip device %s (property %s not found)", device_name, self.tgo_prop ) elif self.tgo_class: dev_class = new_dev.dev_class.lower() if self.tgo_class == dev_class: self.logger.debug( "Device %s matched class %s", device_name, self.tgo_class ) self.devices[device_name] = new_dev else: self.logger.debug("Skip device %s with class %s", device_name, dev_class) elif self.uniq_cls: dev_class = new_dev.dev_class if dev_class == "---": self.logger.debug( "Skip device %s with unknown class %s", device_name, dev_class ) elif dev_class not in self.dev_classes: self.dev_classes.append(dev_class) self.devices[device_name] = new_dev else: self.logger.debug( "Skip device %s with known class %s", device_name, dev_class ) else: self.logger.debug("Add device %s", device_name) self.devices[device_name] = new_dev except Exception as e: self.logger.warning("Could not instantiate device %s: %s", device_name, e) self.devices[device_name] = None self.logger.info("Read %d devices", len(self.devices))
[docs] def get_classes(self) -> dict: """ Print list of device names. :return: dictionary with class and device names """ self.logger.info("Getting classes of %d devices...", len(self.devices)) klasses: dict = {} for device in self.devices: klass = self.devices[device].dev_class dev_name = self.devices[device].dev_name if klass not in klasses: klasses[klass] = [] klasses[klass].append(dev_name) rdict: dict = {"classes": [], "tango_host": self.tango_host} for klass in klasses: rdict["classes"].append({"name": klass, "devices": klasses[klass]}) self.logger.info("Got classes of %d devices...", len(self.devices)) self.logger.debug("Classes: %s", rdict) return rdict
[docs] def read_attribute_values(self) -> None: """Read device attribute values.""" self.logger.info("Reading attributes of %d devices", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} attributes :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: self.logger.info("Reading attribute values of device %s", device) self.devices[device].read_attribute_value()
[docs] def read_command_values(self) -> None: """Read device commands.""" self.logger.info("Reading commands of %d devices", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} device commands :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: self.logger.info("Reading commands of device %s", device) self.devices[device].read_command_value(self.run_commands, self.run_commands_name)
[docs] def read_property_values(self) -> None: """Read device properties.""" self.logger.info("Reading properties of %d devices", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} property values :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: self.logger.info("Reading properties of device %s", device) self.devices[device].read_property_value()
[docs] def read_procs(self) -> None: """Read device processes.""" rc: int pod_name: str | None self.logger.info("Reading processes for %d devices", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} processes :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is None: self.logger.warning("Device %s is empty", device) continue self.logger.info("Reading processes of device %s", device) dev: TangoctlDevice = self.devices[device] if dev.info is not None: pod_name = dev.info.server_host if pod_name in self.bad_pods: self.logger.info("Skip bad pod %s", pod_name) continue else: pod_name = None rc = dev.read_procs(self.k8s_ns) if rc: self.bad_pods[pod_name] = [] self.bad_pods[pod_name].append(device)
[docs] def read_pods(self) -> None: """Read device pods.""" rc: int pod_name: str | None self.logger.info("Reading pods for %d devices", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} processes :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: dev: TangoctlDevice = self.devices[device] if dev.info is not None: pod_name = dev.info.server_host if pod_name in self.bad_pods: self.logger.info("Skip bad pod %s", pod_name) continue else: pod_name = None if pod_name is not None: self.logger.info("Reading pod %s", pod_name) rc = dev.read_pod(self.k8s_ns) if rc: if pod_name not in self.good_pods: self.good_pods[pod_name] = { "api_version": "N/A", "kind": "N/A", "metadata": {"name": pod_name, "namespace": self.k8s_ns}, } self.bad_pods[pod_name] = [] self.bad_pods[pod_name].append(device) if pod_name not in self.good_pods: self.good_pods[pod_name] = dev.pod_desc
[docs] def read_logs(self) -> None: """Read device logs.""" rc: int pod_name: str | None self.logger.info("Reading logs for %d devices", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} processes :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: dev: TangoctlDevice = self.devices[device] pod_name = dev.info.server_host if pod_name in self.bad_pods: self.logger.info("Skip bad pod %s", pod_name) continue dev.read_procs(self.k8s_ns) rc = dev.read_pod(self.k8s_ns) if rc: self.bad_pods[pod_name] = device continue if pod_name not in self.good_pods: self.good_pods[pod_name] = [] self.good_pods[pod_name].append(dev.pod_desc)
[docs] def read_device_values(self) -> None: """Read device values.""" self.logger.debug("Reading device values: %s", self.disp_action.show()) if not ( self.dev_status or self.disp_action.show_attrib or self.disp_action.show_cmd or self.disp_action.show_prop ): self.logger.info("Reading basic information...") # TODO do something here if self.dev_status and not self.disp_action.show_attrib: self.logger.info("Reading status of devices...") # TODO do something here if self.disp_action.show_attrib: self.logger.info("Reading attribute values...") self.read_attribute_values() if self.disp_action.show_cmd: self.logger.info("Reading command values from devices...") self.read_command_values() if self.disp_action.show_prop: self.logger.info("Read property values from devices...") self.read_property_values() if self.disp_action.show_proc: self.logger.info("Read processes running on host...") self.read_procs() if self.disp_action.show_pod: self.logger.info("Read pods...") self.read_pods() else: self.logger.debug("Skip pods...") if self.disp_action.show_log: self.logger.info("Read K8S logs...") self.read_logs()
# self.logger.info("Read values for %d devices", len(self.devices))
[docs] def read_configs(self) -> None: """Read additional data.""" self.logger.debug("Reading %d device configs", len(self.devices)) for device in progress_bar( self.devices, not self.disp_action.quiet_mode, prefix=f"Read {len(self.devices)} device configs :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: dev: TangoctlDevice = self.devices[device] dev.read_config() self.logger.info("Read %d device configs", len(self.devices))
[docs] def read_configs_all(self) -> None: """Read additional data.""" self.logger.debug("Reading all %d device configs", len(self.devices)) for device in progress_bar( self.devices, not self.disp_action.quiet_mode, prefix=f"Read {len(self.devices)} device configs :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: self.devices[device].read_config_all() self.logger.info("Read all %d device configs", len(self.devices))
[docs] def make_devices_json_small(self) -> dict: """ Read device data. :return: dictionary """ devs_list: list = [] self.logger.info("Reading %d devices in JSON small format", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} JSON records :", suffix="complete", decimals=0, length=100, ): self.logger.debug("Reading device %s", device) if self.devices[device] is not None: dev = self.devices[device] devs_list.append(dev.make_json_small()) self.logger.debug("Read %d devices in JSON small format: %s", len(self.devices), devs_list) return {"devices": devs_list}
[docs] def make_devices_json_medium(self) -> dict: """ Read device data. :return: dictionary """ devs_list: list = [] self.logger.info("Reading %d devices in JSON medium format", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} JSON records :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: dev = self.devices[device] devs_list.append(dev.make_json_medium()) self.logger.debug( "Read %d devices in JSON medium format: %s", len(self.devices), devs_list ) return {"devices": devs_list}
[docs] def make_devices_json_large(self) -> dict: """ Read device data. :return: dictionary """ devs_list: list = [] self.logger.info("Reading %d devices in JSON large format", len(self.devices)) for device in progress_bar( self.devices, self.prog_bar, prefix=f"Read {len(self.devices)} JSON records :", suffix="complete", decimals=0, length=100, ): if self.devices[device] is not None: dev = self.devices[device] devs_list.append(dev.make_json_large()) self.logger.debug("Read %d devices in JSON large format: %s", len(self.devices), devs_list) return {"devices": devs_list}
[docs] def print_names_list(self) -> None: """Print list of device names.""" self.logger.info("Listing %d device names...", len(self.device_names)) print(f"Devices : {len(self.device_names)}", file=self.outf) for device_name in self.device_names: print(f"\t{device_name}", file=self.outf)
[docs] def print_classes(self) -> None: """Print list of device names.""" self.logger.info("Printing classes of %d devices...", len(self.devices)) klasses: dict = {} for device in self.devices: klass = self.devices[device].dev_class dev_name = self.devices[device].dev_name if klass not in klasses: klasses[klass] = [] klasses[klass].append(dev_name) print(f"Classes : {len(klasses)}", file=self.outf) for klass in klasses: print(f"\t{klass} : ", file=self.outf) for dev_name in klasses[klass]: print(f"\t\t{dev_name}", file=self.outf)
[docs] def print_txt_list(self, heading: str | None = None) -> None: """ Print list of devices. :param heading: print at the top """ self.logger.info("Listing %d basic devices in text format...", len(self.devices)) if heading is not None: print(f"{heading}", file=self.outf) print(f"Tango host : {os.getenv('TANGO_HOST')}", file=self.outf) self.print_txt_heading() for device in self.devices: if self.devices[device] is not None: self.devices[device].print_list() else: print(f"{device} (N/A)", file=self.outf) print(file=self.outf)
[docs] def print_txt_small(self) -> None: """Print devices as text.""" self.logger.info("Print devices as text (short)...") devsdict = self.make_devices_json_small() json_reader: TangoJsonReader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) json_reader.print_txt_small()
[docs] def print_txt_medium(self) -> None: """Print devices as text.""" self.logger.info("Printing devices as text (all)...") devsdict = self.make_devices_json_medium() json_reader: TangoJsonReader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) if not self.disp_action.quiet_mode: print("\n\n", file=self.outf) json_reader.print_txt_medium()
[docs] def print_txt_large(self) -> None: """Print devices as text.""" self.logger.info("Printing devices as text (all)...") devsdict = self.make_devices_json_large() json_reader: TangoJsonReader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) if not self.disp_action.quiet_mode: print("\n\n", file=self.outf) json_reader.print_txt_large()
[docs] def print_txt(self, heading: str | None = None) -> None: """ Print in text format. :param heading: to be printed on the top """ devsdict: dict json_reader: TangoJsonReader if self.disp_action.check(DispAction.TANGOCTL_LIST): self.logger.info("Printing devices as text (list)...") self.print_txt_list(heading) print(file=self.outf) elif self.disp_action.size == "S": self.logger.info("Printing devices as text (small)...") devsdict = self.make_devices_json_small() json_reader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) json_reader.print_txt_small() elif self.disp_action.size == "L": self.logger.info("Printing devices as text (large)...") devsdict = self.make_devices_json_large() json_reader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) json_reader.print_txt_large() else: self.logger.info("Printing devices (display action %s)...", self.disp_action) devsdict = self.make_devices_json_medium() json_reader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) if not self.disp_action.quiet_mode: print("\n\n", file=self.outf) json_reader.print_txt_medium()
[docs] def print_json_small(self) -> None: # noqa: C901 """Print in shortened JSON format.""" self.logger.info("Printing devices as small JSON...") ydevsdict: dict = { "tango_host": self.tango_host, "timeout_millis": self.timeout_millis, "start_time": self.start_now, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_ccluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_small()) pods_list: list = [] for good_pod in self.good_pods: pod = self.good_pods[good_pod] containers_lst: list = [] if "spec" in pod: containers = pod["spec"]["containers"] else: self.logger.warning("No spec in pod : %s", pod) containers = {} for container in containers: container_dict = { "name": container["name"], "command": container["command"], "args": container["args"], "resources": container["resources"], } containers_lst.append(container_dict) try: pod_dict: dict = { "api_version": pod["api_version"], "kind": pod["kind"], "metadata": { "name": pod["metadata"]["name"], "namespace": pod["metadata"]["namespace"], }, "spec": { "containers": containers_lst, "hostname": pod["spec"]["hostname"], }, "status": { "host_ip": pod["status"]["host_ip"], "pod_ip": pod["status"]["pod_ip"], "phase": pod["status"]["phase"], "start_time": pod["status"]["start_time"], }, } except KeyError: pod_dict = { "api_version": pod["api_version"], "kind": pod["kind"], "metadata": { "name": pod["metadata"]["name"], "namespace": pod["metadata"]["namespace"], }, } pods_list.append(pod_dict) ydevsdict.update({"pods": pods_list}) if not self.disp_action.indent: self.disp_action.indent = 4 print( json.dumps(ydevsdict, indent=self.disp_action.indent, cls=NumpyEncoder, default=str), file=self.outf, )
[docs] def print_json_medium(self) -> None: # noqa: C901 """Print in JSON medium format.""" self.logger.info("Printing devices as medium JSON...") ydevsdict: dict = { "tango_host": self.tango_host, "timeout_millis": self.timeout_millis, "start_time": self.start_now, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_cluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_medium()) pods_list: list = [] for good_pod in self.good_pods: pod = self.good_pods[good_pod] containers_lst: list = [] if "spec" in pod: containers = pod["spec"]["containers"] else: self.logger.warning("No spec in pod : %s", pod) containers = {} for container in containers: volume_mounts: list = [] for volume_mount in container["volume_mounts"]: volume_mounts.append( { "name": volume_mount["name"], "mount_path": volume_mount["mount_path"], "sub_path": volume_mount["sub_path"], "read_only": volume_mount["read_only"], } ) container_dict = { "name": container["name"], "command": container["command"], "args": container["args"], "env": container["env"], "ports": container["ports"], "resources": container["resources"], "volume_mounts": volume_mounts, } containers_lst.append(container_dict) pod_dict: dict try: pod_dict = { "api_version": pod["api_version"], "kind": pod["kind"], "metadata": { "creation_timestamp": pod["metadata"]["creation_timestamp"], "labels": pod["metadata"]["labels"], "name": pod["metadata"]["name"], "namespace": pod["metadata"]["namespace"], }, "spec": { "containers": containers_lst, "hostname": pod["spec"]["hostname"], "restart_policy": pod["spec"]["restart_policy"], }, "status": { "host_ip": pod["status"]["host_ip"], "pod_ip": pod["status"]["pod_ip"], "phase": pod["status"]["phase"], "start_time": pod["status"]["start_time"], }, } except KeyError: pod_dict = { "api_version": pod["api_version"], "kind": pod["kind"], "metadata": { "name": pod["metadata"]["name"], "namespace": pod["metadata"]["namespace"], }, } pods_list.append(pod_dict) hosts_list: list = [] ydevsdict.update({"pods": pods_list}) hosts: list = self.read_device_hosts() for host in hosts: host_dict: dict = {"name": host["name"], "devices": [], "addresses": host["addresses"]} for device in host["devices"]: host_dict["devices"].append( { "name": device["name"], "class_name": device["class_name"], "ds_full_name": device["ds_full_name"], "pid": device["pid"], } ) hosts_list.append(host_dict) ydevsdict.update({"hosts": hosts_list}) if not self.disp_action.indent: self.disp_action.indent = 4 print( json.dumps(ydevsdict, indent=self.disp_action.indent, cls=NumpyEncoder, default=str), file=self.outf, )
[docs] def print_json_large(self) -> None: """Print in JSON format.""" self.logger.info("Printing devices as large JSON...") ydevsdict: dict = { "tango_host": self.tango_host, "timeout_millis": self.timeout_millis, "start_time": self.start_now, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_cluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_large()) pods_list: list = [] for good_pod in self.good_pods: pods_list.append(self.good_pods[good_pod]) ydevsdict.update({"pods": pods_list}) hosts: list = self.read_device_hosts() ydevsdict.update({"hosts": hosts}) if not self.disp_action.indent: self.disp_action.indent = 4 print( json.dumps(ydevsdict, indent=self.disp_action.indent, cls=NumpyEncoder, default=str), file=self.outf, )
[docs] def print_json(self) -> None: """Print in JSON format.""" if self.disp_action.size == "L": self.print_json_large() elif self.disp_action.size == "S": self.print_json_small() else: self.print_json_medium()
[docs] def print_json_table(self) -> None: """Print in JSON format.""" # TODO this is not much use and needs more work self.logger.debug("Printing devices as JSON table...") ydevsdict: dict = { "tango_host": self.tango_host, "timeout_millis": self.timeout_millis, "start_time": self.start_now, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_cluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_large()) ydevsdict.update({"pods": self.good_pods}) # df = pd.json_normalize(ydevsdict["devices"]) # df.set_index(["name"], inplace=True) df = pd.DataFrame.from_dict(ydevsdict["devices"]) print(df.head(10), file=self.outf) self.logger.info("Printed devices as JSON table...")
[docs] def print_markdown(self) -> None: """Print in JSON format.""" self.logger.debug("Printing devices as markdown...") devsdict: dict = self.make_devices_json_large() json_reader: TangoJsonReader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) json_reader.print_markdown_large() self.logger.info("Printed %d devices as markdown...", len(devsdict))
[docs] def print_html(self) -> None: """Print in HTML format.""" self.logger.debug("Printing devices as HTML...") devsdict: dict = self.make_devices_json_large() json_reader: TangoJsonReader = TangoJsonReader( self.logger, self.disp_action.indent, not self.prog_bar, self.tgo_space, devsdict, self.outf, ) if self.disp_action.size == "L": json_reader.print_html_large(True) elif self.disp_action.size == "M": json_reader.print_html_large(True) else: json_reader.print_html_small(True) self.logger.info("Printed %d devices as HTML...", len(devsdict))
[docs] def print_yaml_small(self) -> None: """Print in YAML small format.""" self.logger.debug("Printing devices as small YAML...") ydevsdict: dict = { "tango_host": self.tango_host, "start_time": self.start_now, "timeout_millis": self.timeout_millis, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_cluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_small()) ydevsdict.update({"pods": self.good_pods}) # Serialize a Python object into a YAML stream print(yaml.dump(ydevsdict, indent=self.disp_action.indent), file=self.outf) self.logger.info("Printed devices as small YAML...")
[docs] def print_yaml_medium(self) -> None: """Print in YAML medium format.""" self.logger.debug("Printing devices as medium YAML...") ydevsdict: dict = { "tango_host": self.tango_host, "start_time": self.start_now, "timeout_millis": self.timeout_millis, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_cluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_medium()) ydevsdict.update({"pods": self.good_pods}) # Serialize a Python object into a YAML stream print(yaml.dump(ydevsdict, indent=self.disp_action.indent), file=self.outf) self.logger.info("Printed devices as medium YAML...")
[docs] def print_yaml_large(self) -> None: """Print in YAML large format.""" self.logger.debug("Printing devices as large YAML...") ydevsdict: dict = { "tango_host": self.tango_host, "start_time": self.start_now, "timeout_millis": self.timeout_millis, "end_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "elapsed_time": float(f"{(time.perf_counter() - self.start_perf):.3e}"), } if self.k8s_ctx is not None: ydevsdict.update({"active_context": self.k8s_ctx}) if self.k8s_cluster is not None: ydevsdict.update({"active_cluster": self.k8s_cluster}) if self.k8s_ns is not None: ydevsdict.update({"namespace": self.k8s_ns}) if self.domain_name is not None: ydevsdict.update({"domain_name": self.domain_name}) ydevsdict.update(self.make_devices_json_large()) ydevsdict.update({"pods": self.good_pods}) # Serialize a Python object into a YAML stream print(yaml.dump(ydevsdict, indent=self.disp_action.indent), file=self.outf) self.logger.info("Printed devices as large YAML...")
[docs] def print_yaml(self) -> None: """Print in YAML format.""" if self.disp_action.size == "S": self.print_yaml_small() elif self.disp_action.size == "M": self.print_yaml_medium() else: self.print_yaml_large()
[docs] def print_txt_list_attributes(self, show_val: bool = True) -> None: """ Print list of devices as plain text. :param show_val: print value """ device: str lwid: int self.logger.debug("Listing %d device attributes...", len(self.devices)) lwid = self.print_txt_heading("") print(f" {'ATTRIBUTE':32}", file=self.outf) for device in self.devices: if self.devices[device] is not None: if self.devices[device].attributes: self.devices[device].read_config() self.devices[device].print_list_attribute(lwid, show_val) self.logger.info("Listed %d device attributes...", len(self.devices))
[docs] def print_txt_heading(self, eol: str = "\n") -> int: """ Print heading for list of devices. :param eol: printed at the end :return: width of characters printed """ line_width: int self.logger.debug("Listing attributes %s", self.list_items["attributes"].keys()) print(f"\n{'DEVICE NAME':64} ", end="", file=self.outf) line_width = 65 for attribute in self.list_items["attributes"]: field_name = attribute.upper() field_width = self.list_items["attributes"][attribute] line_width += int(re.sub(r"\D", "", field_width)) + 1 print(f"{field_name:{field_width}} ", end="", file=self.outf) self.logger.debug("Listing commands %s", self.list_items["commands"].keys()) for command in self.list_items["commands"]: field_name = command.upper() field_width = self.list_items["commands"][command] line_width += int(re.sub(r"\D", "", field_width)) + 1 print(f"{field_name:{field_width}} ", end="", file=self.outf) self.logger.debug("Listing properties %s", self.list_items["properties"].keys()) for tproperty in self.list_items["properties"]: field_name = tproperty.upper() field_width = self.list_items["properties"][tproperty] line_width += int(re.sub(r"\D", "", field_width)) + 1 print(f"{field_name:{field_width}} ", end="", file=self.outf) print(f"{'CLASS':32}", end=eol, file=self.outf) line_width += 32 self.logger.info("Listed headings") return line_width
[docs] def print_txt_list_commands(self, show_val: bool = True) -> None: """ Print list of device commands. :param show_val: print value """ device: str lwid: int self.logger.debug("Listing %d device commands...", len(self.devices)) lwid = self.print_txt_heading("") print(f" {'COMMAND':32}", file=self.outf) for device in self.devices: if self.devices[device] is not None: if self.devices[device].commands: self.devices[device].read_config() self.devices[device].print_list_command(lwid, show_val) self.logger.info("Listed %d device commands...", len(self.devices))
[docs] def print_txt_list_properties(self, show_val: bool = True) -> None: """ Print list of device properties. :param show_val: print value """ device: str self.logger.debug("Listing %d device properties...", len(self.devices)) lwid = self.print_txt_heading("") print(f" {'PROPERTY':32}", file=self.outf) for device in self.devices: if self.devices[device] is not None: if self.devices[device].properties: self.devices[device].read_config() self.devices[device].print_list_property(lwid, show_val) self.logger.info("Listed %d device properties...", len(self.devices))
[docs] def read_attribute_names(self) -> dict: """ Read device data. :return: dictionary of devices """ the_attribs: dict = {} self.logger.debug("Reading attribute names of %d devices", len(self.devices)) for device in progress_bar( self.devices, not self.disp_action.quiet_mode, prefix=f"Read {len(self.devices)} attributes :", suffix="complete", decimals=0, length=100, ): dev_attribs = self.devices[device].attribs for attr in dev_attribs: if attr not in the_attribs: the_attribs[attr] = [] the_attribs[attr].append(device) self.logger.info("Read attribute names of %d devices: %s", len(the_attribs), the_attribs) return the_attribs
[docs] def read_command_names(self) -> dict: """ Read device data. :return: dictionary of devices """ the_commands: dict = {} self.logger.debug("Reading command names of %d devices", len(self.devices)) for device in progress_bar( self.devices, not self.disp_action.quiet_mode, prefix=f"Read {len(self.devices)} attributes :", suffix="complete", decimals=0, length=100, ): try: dev_commands = self.devices[device].cmds for cmd in dev_commands: if cmd not in the_commands: the_commands[cmd] = [] the_commands[cmd].append(device) except AttributeError: self.logger.warning("Could not read device %s", device) self.logger.info("Read command names of %d devices: %s", len(the_commands), the_commands) return the_commands
[docs] def read_property_names(self) -> dict: """ Read device data. :return: dictionary of devices """ the_properties: dict = {} self.logger.info("Reading property names of %d devices", len(self.devices)) for device in progress_bar( self.devices, not self.disp_action.quiet_mode, prefix=f"Read {len(self.devices)} attributes :", suffix="complete", decimals=0, length=100, ): dev_properties = self.devices[device].props for prop in dev_properties: if prop not in the_properties: the_properties[prop] = [] the_properties[prop].append(prop) self.logger.debug( "Read property names of %d devices: %s", len(the_properties), the_properties ) return the_properties
[docs] def __del__(self) -> None: """Desctructor.""" try: self.logger.debug("Shut down TangoctlDevices...") except AttributeError: pass