# Source code for ska_tangoctl.tango_kontrol.tango_kontrol

"""Read all information about Tango devices in a Kubernetes cluster."""

import json
import logging
import os
import sys
from typing import Any

import tango
import yaml

try:
    from ska_tangoctl.k8s_info.get_k8s_info import KubernetesInfo
except ModuleNotFoundError:
    KubernetesInfo = None  # type: ignore[assignment,misc]
from ska_tangoctl.tango_control.disp_action import DispAction
from ska_tangoctl.tango_control.read_tango_devices import NumpyEncoder, TangoctlDevices
from ska_tangoctl.tango_control.tango_control import TangoControl
from ska_tangoctl.tango_kontrol.tango_kontrol_help import TangoKontrolHelpMixin
from ska_tangoctl.tango_kontrol.tango_kontrol_setup import TangoKontrolSetupMixin
from ska_tangoctl.tango_kontrol.tangoktl_config import TANGOKTL_CONFIG, read_tangoktl_config


class TangoKontrol(  # type:ignore[misc]
    TangoControl, TangoKontrolHelpMixin, TangoKontrolSetupMixin
):
    """
    Read Tango devices running in a Kubernetes cluster.

    Adds Kubernetes lookups (contexts, namespaces, pods, services, pod logs) on top of
    the Tango device reading provided by the base class. Methods that need the optional
    Kubernetes package check that ``KubernetesInfo`` was imported and return an error
    condition (or an empty result) when it was not.
    """

    def __init__(
        self,
        logger: logging.Logger,
        k8s_ctx: str | None,
        k8s_cluster: str | None,
        domain_name: str | None,
    ):
        """
        Initialize this thing.

        :param logger: logging handle
        :param k8s_ctx: Kubernetes context
        :param k8s_cluster: Kubernetes cluster
        :param domain_name: Kubernetes domain name
        """
        super().__init__(logger)
        self.cfg_data: Any = TANGOKTL_CONFIG
        self.pod_cmd: str = ""
        self.show_svc: bool = False
        self.use_fqdn: bool = True
        self.k8s_ns: str | None = None
        self.k8s_ctx: str | None = k8s_ctx
        self.k8s_cluster: str | None = k8s_cluster
        self.k8s_pod: str | None = None
        self.domain_name: str | None = domain_name
        self.logger.info("Initialize with context %s", self.k8s_ctx)

    def __repr__(self) -> str:
        """
        Do the string thing.

        The configuration dump is only appended when the logger runs at DEBUG level.

        :returns: string representation
        """
        rval = f"\tDisplay format {repr(self.disp_action)}"
        rval += "\n\tShow"
        rval += f"{' attributes' if self.disp_action.show_attrib else ''}"
        rval += f"{' commands' if self.disp_action.show_cmd else ''}"
        rval += f"{' properties' if self.disp_action.show_prop else ''}"
        rval += f"{' pods' if self.disp_action.show_pod else ''}"
        rval += f"{' processes' if self.disp_action.show_proc else ''}"
        if self.tgo_name:
            rval += f"\n\tDevices: {self.tgo_name}"
        if self.tgo_attrib:
            rval += f"\n\tAttributes: {self.tgo_attrib}"
        if self.tgo_cmd:
            # Show the command filter itself (this used to print the attribute filter)
            rval += f"\n\tCommands: {self.tgo_cmd}"
        if self.tgo_prop:
            rval += f"\n\tProperties: {self.tgo_prop}"
        rval += f"\n\tContext: {self.k8s_ctx}"
        rval += f"\n\tNamespace: {self.k8s_ns}"
        rval += f"\n\tDomain: {self.domain_name}"
        rval += f"\n\tDetail: {self.disp_action.size}"
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            rval += f"\n\tConfiguration:\n{json.dumps(self.cfg_data, indent=4)}"
        return rval

    def reset(self) -> None:
        """Reset it to defaults."""
        self.logger.debug("Reset")
        super().reset()
        self.cfg_data = TANGOKTL_CONFIG
        self.pod_cmd = ""
        self.disp_action.show_ctx = False
        self.disp_action.show_ns = False
        self.disp_action.show_svc = False
        self.use_fqdn = True
        self.k8s_ns = None

    def read_config(self) -> None:
        """Read configuration from the file named by ``self.cfg_name``."""
        self.cfg_data = read_tangoktl_config(self.logger, self.cfg_name)

    def get_pods_dict(self, ns_name: str | None) -> dict:
        """
        Read pods in Kubernetes namespace.

        :param ns_name: namespace name
        :return: dictionary with pods, empty when the Kubernetes package is missing
        """
        self.logger.debug("Get Kubernetes pods")
        pods_dict: dict = {}
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return pods_dict
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        pods_dict = k8s.get_pods(ns_name, None)
        self.logger.info("Got %d pods", len(pods_dict))
        # A placeholder is required, otherwise logging fails to format the record
        self.logger.debug("Pods: %s", json.dumps(pods_dict, indent=4, default=str))
        return pods_dict

    def list_pod_names(self, ns_name: str | None) -> int:  # noqa: C901
        """
        Display names of pods in Kubernetes namespace.

        :param ns_name: namespace name
        :returns: error condition
        """
        self.logger.debug("List Kubernetes pod names")
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        if ns_name is None:
            self.logger.error("K8S namespace not specified")
            return 1
        pods_dict: dict = self.get_pods_dict(ns_name)
        if not pods_dict:
            self.logger.error("Could not read pods")
            return 1
        print(f"Pods in namespace {ns_name} : {len(pods_dict)}")
        pod_name: str
        for pod_name in pods_dict:
            print(f"\t{pod_name}")
        self.logger.info("Listed %d Kubernetes pod names", len(pods_dict))
        return 0

    def print_pod_procs(self) -> int:
        """
        Print processes running in pod.

        Runs ``ps -ef`` in the pod named by ``self.k8s_pod`` in namespace ``self.k8s_ns``.

        :returns: error condition
        """
        self.logger.debug("Print Kubernetes pod processes")
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        if self.k8s_ns is None:
            self.logger.error("Namespace not set")
            return 1
        if self.k8s_pod is None:
            self.logger.error("Pod name not set")
            return 1
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        pod = self.pod_run_cmd(k8s, self.k8s_ns, self.k8s_pod, "ps -ef")
        for line in pod["output"]:
            print(f"{line}")
        self.logger.info("Printed %d Kubernetes pod processes", len(pod["output"]))
        return 0

    @staticmethod
    def _is_ps_noise(resp: str) -> bool:
        """
        Check if a line of command output should be dropped.

        :param resp: one line of output
        :returns: true for empty lines, the echoed 'ps -ef' command and ps header lines
        """
        return not resp or resp.endswith("ps -ef") or resp.startswith(("UID", "PID"))

    def _print_cmd_output(self, resps: str) -> None:
        """
        Print the output of a command that was run in a pod.

        :param resps: raw command output, possibly spanning multiple lines
        """
        if not resps:
            return
        if "\n" not in resps:
            print(f"\t\t- {resps}")
            return
        resp: str
        for resp in resps.split("\n"):
            self.logger.debug("Got '%s'", resp)
            if self._is_ps_noise(resp):
                continue
            # TODO to show nginx or not to show nginx
            if resp.startswith(("tango", "root ", "mysql", "100")):
                # Process owned by a known user: print the owner (first field) and
                # everything from the eighth field onwards
                respl = resp.split()
                print(f"\t\t* {respl[0]:8} {' '.join(respl[7:])}")
            else:
                print(f"\t\t {resp}")

    def pod_run_cmd(self, k8s: KubernetesInfo, ns_name: str, pod_name: str, pod_cmd: str) -> dict:
        """
        Run a command in specified pod.

        :param k8s: K8S info handle
        :param ns_name: namespace
        :param pod_name: pod name
        :param pod_cmd: command to run, split on single spaces before execution
        :returns: dictionary with keys 'name', 'command' and 'output' (list of lines)
        """
        pod: dict = {"name": pod_name, "command": pod_cmd}
        self.logger.info("Run command in pod %s: %s", pod_name, pod_cmd)
        pod_exec: list = pod_cmd.split(" ")
        resps: str = k8s.exec_pod_command(ns_name, pod_name, pod_exec)
        if not resps:
            pod["output"] = ["N/A"]
        elif "\n" in resps:
            pod["output"] = [resp for resp in resps.split("\n") if not self._is_ps_noise(resp)]
        else:
            pod["output"] = [resps]
        return pod

    def print_pod(  # noqa: C901
        self, ns_name: str | None, pod_name: str | None, pod_cmd: str
    ) -> int:
        """
        Display output of a command run in one pod.

        The command is only run when both namespace and pod name are set.

        :param ns_name: namespace name
        :param pod_name: pod name
        :param pod_cmd: command to run
        :returns: error condition
        """
        self.logger.info("Print output of command '%s' in pod %s", pod_cmd, pod_name)
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        pod_exec: list = pod_cmd.split(" ")
        print(f"Pod in namespace {ns_name} : '{pod_cmd}'")
        print(f"\t{pod_name}")
        if ns_name is not None and pod_name is not None:
            resps: str = k8s.exec_pod_command(ns_name, pod_name, pod_exec)
            self._print_cmd_output(resps)
        return 0

    def print_pods(self, ns_name: str | None, pod_cmd: str) -> int:  # noqa: C901
        """
        Display pods in Kubernetes namespace.

        The command is run in every pod unless quiet mode is set, in which case only
        the pod names are printed.

        :param ns_name: namespace name
        :param pod_cmd: command to run
        :returns: error condition
        """
        self.logger.debug("Print Kubernetes pods: %s", pod_cmd)
        pod_exec: list = pod_cmd.split(" ")
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        if ns_name is None:
            self.logger.error("K8S namespace not specified")
            return 1
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        pods_dict: dict = self.get_pods_dict(ns_name)
        print(f"{len(pods_dict)} pods in namespace {ns_name} : '{pod_cmd}'")
        pod_name: str
        for pod_name in pods_dict:
            print(f"\t{pod_name}")
            if not self.disp_action.quiet_mode:
                resps: str = k8s.exec_pod_command(ns_name, pod_name, pod_exec)
                self._print_cmd_output(resps)
        self.logger.debug("Printed %d Kubernetes pods: %s", len(pods_dict), pod_cmd)
        return 0

    def get_pods_json(self, ns_name: str | None, pod_cmd: str) -> list:  # noqa: C901
        """
        Run a command in every pod in Kubernetes namespace.

        :param ns_name: namespace name
        :param pod_cmd: command to run on pod
        :returns: list with one dictionary of pod information for each pod
        """
        self.logger.debug("Get Kubernetes pods as JSON: %s", pod_cmd)
        pods: list = []
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return pods
        if ns_name is None:
            self.logger.error("K8S namespace not specified")
            return pods
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        self.logger.debug("Read pods running in namespace %s", ns_name)
        pods_dict: dict = k8s.get_pods(ns_name, None)
        self.logger.info("Found %d pods running in namespace %s", len(pods_dict), ns_name)
        pod_name: str
        for pod_name in pods_dict:
            pods.append(self.pod_run_cmd(k8s, ns_name, pod_name, pod_cmd))
        self.logger.info("Got %d Kubernetes pods as JSON: %s", len(pods), pod_cmd)
        return pods

    def show_pod(self, pod_cmd: str) -> int:
        """
        Display output of a command run in the pod named by ``self.k8s_pod``.

        Only the text output format is supported.

        :param pod_cmd: command to run
        :returns: error condition
        """
        rc: int
        self.logger.info("Show pod %s : %s", self.k8s_pod, pod_cmd)
        self.set_output()
        if self.disp_action.check(DispAction.TANGOCTL_TXT):
            rc = self.print_pod(self.k8s_ns, self.k8s_pod, pod_cmd)
        else:
            self.logger.warning("Output format %s not supported", self.disp_action)
            rc = 1
        self.unset_output()
        return rc

    def show_pods(self, pod_cmd: str) -> int:
        """
        Display pods in Kubernetes namespace as JSON, YAML or text.

        :param pod_cmd: command to run
        :returns: error condition
        """
        self.logger.debug("Show Kubernetes pods as JSON")
        pods: list
        self.set_output()
        if self.disp_action.check(DispAction.TANGOCTL_JSON):
            if not self.disp_action.indent:
                self.disp_action.indent = 4
            pods = self.get_pods_json(self.k8s_ns, pod_cmd)
            print(json.dumps(pods, indent=self.disp_action.indent), file=self.outf)
            self.logger.info("Showed %d Kubernetes pods as JSON", len(pods))
        elif self.disp_action.check(DispAction.TANGOCTL_YAML):
            if not self.disp_action.indent:
                self.disp_action.indent = 2
            pods = self.get_pods_json(self.k8s_ns, pod_cmd)
            print(yaml.dump(pods, indent=self.disp_action.indent), file=self.outf)
            self.logger.info("Showed %d Kubernetes pods as YAML", len(pods))
        elif self.disp_action.check(DispAction.TANGOCTL_TXT):
            self.print_pods(self.k8s_ns, pod_cmd)
            self.logger.info("Showed Kubernetes pods")
        else:
            self.logger.warning("Output format %s not supported", self.disp_action)
            self.unset_output()
            return 1
        self.unset_output()
        return 0

    def print_k8s_info(self) -> None:
        """Print Kubernetes context, cluster, namespace and domain, where set."""
        if self.k8s_ctx:
            print(f"Active context : {self.k8s_ctx}", file=self.outf)
        if self.k8s_cluster:
            print(f"Active cluster : {self.k8s_cluster}", file=self.outf)
        if self.k8s_ns:
            print(f"K8S namespace : {self.k8s_ns}", file=self.outf)
        if self.domain_name:
            print(f"Domain : {self.domain_name}", file=self.outf)

    def run_info(self) -> int:  # noqa: C901
        """
        Read information on Tango devices.

        Lists device classes, or reads the devices and displays them in the format
        selected by the display action.

        :return: error condition
        """
        rc: int
        devices: TangoctlDevices

        self.logger.info(
            "Run info display %s : device %s attribute %s command %s property %s for K8S...",
            self.disp_action.show(),
            self.tgo_name,
            self.tgo_attrib,
            self.tgo_cmd,
            self.tgo_prop,
        )
        self.set_output()

        # Get device classes
        if self.disp_action.check(DispAction.TANGOCTL_CLASS):
            rc = self.list_classes()
            self.unset_output()
            return rc

        # Check if there is something to do
        if (
            self.tgo_name is None
            and self.tgo_attrib is None
            and self.tgo_cmd is None
            and self.tgo_prop is None
            and self.disp_action.check(0)
            and not (
                self.disp_action.show_attrib
                or self.disp_action.show_cmd
                or self.disp_action.show_prop
                or self.dev_status
            )
            and self.disp_action.check(
                [DispAction.TANGOCTL_JSON, DispAction.TANGOCTL_TXT, DispAction.TANGOCTL_YAML]
            )
        ):
            self.logger.error(
                "No filters specified, use '-l' flag to list all devices"
                " or '-e' for a full display of every device in the namespace",
            )
            self.unset_output()
            return 1

        # Get a dictionary of devices
        try:
            devices = TangoctlDevices(
                self.logger,
                self.tango_host,
                self.outf,
                self.timeout_millis,
                self.dev_status,
                self.cfg_data,
                self.tgo_name,
                self.uniq_cls,
                self.disp_action,
                self.k8s_ctx,
                self.k8s_cluster,
                self.k8s_ns,
                self.domain_name,
                self.tgo_attrib,
                self.tgo_cmd,
                self.tgo_prop,
                self.tgo_class,
                dev_count=self.dev_count,
            )
        except tango.ConnectionFailed:
            self.logger.error("Tango connection for K8S info failed")
            self.unset_output()
            return 1

        self.logger.debug("Read devices running for K8S (action %s)", str(self.disp_action))

        # Display in specified format
        rc = 0
        if self.disp_action.show_class:
            self.logger.debug("Reading device classes")
            devices.read_devices()
            if self.disp_action.check(DispAction.TANGOCTL_JSON):
                if not self.disp_action.indent:
                    self.disp_action.indent = 4
                klasses = devices.get_classes()
                klasses["namespace"] = self.k8s_ns
                klasses["active_context"] = self.k8s_ctx
                klasses["active_cluster"] = self.k8s_cluster
                print(
                    json.dumps(klasses, indent=self.disp_action.indent, cls=NumpyEncoder),
                    file=self.outf,
                )
            elif self.disp_action.check(DispAction.TANGOCTL_YAML):
                if not self.disp_action.indent:
                    self.disp_action.indent = 2
                klasses = devices.get_classes()
                klasses["namespace"] = self.k8s_ns
                klasses["active_context"] = self.k8s_ctx
                klasses["active_cluster"] = self.k8s_cluster
                print(
                    yaml.safe_dump(klasses, default_flow_style=False, sort_keys=False),
                    file=self.outf,
                )
            else:
                devices.print_classes()
        elif self.disp_action.check(DispAction.TANGOCTL_LIST):
            self.logger.debug("Listing devices")
            # TODO this is messy
            self.print_k8s_info()
            devices.read_devices()
            devices.read_device_values()
            if (
                self.disp_action.show_attrib
                or self.disp_action.show_cmd
                or self.disp_action.show_prop
            ):
                if self.disp_action.show_attrib:
                    devices.print_txt_list_attributes(True)
                if self.disp_action.show_cmd:
                    devices.print_txt_list_commands(True)
                if self.disp_action.show_prop:
                    devices.print_txt_list_properties(True)
            else:
                devices.print_txt_list()
        elif self.disp_action.check(DispAction.TANGOCTL_TXT):
            self.logger.debug("Listing devices as txt")
            self.print_k8s_info()
            devices.read_devices()
            devices.read_device_values()
            devices.print_txt()
        elif self.disp_action.check(DispAction.TANGOCTL_HTML):
            self.logger.debug("Listing devices as HTML")
            devices.read_devices()
            devices.read_device_values()
            devices.print_html()
        elif self.disp_action.check(DispAction.TANGOCTL_JSON):
            self.logger.debug("Listing devices as JSON")
            devices.read_devices()
            devices.read_device_values()
            devices.print_json()
        elif self.disp_action.check(DispAction.TANGOCTL_MD):
            self.logger.debug("Listing devices as markdown")
            devices.read_devices()
            devices.read_device_values()
            devices.print_markdown()
        elif self.disp_action.check(DispAction.TANGOCTL_YAML):
            self.logger.debug("Listing devices as YAML")
            devices.read_devices()
            devices.read_device_values()
            devices.print_yaml()
        elif self.disp_action.check(DispAction.TANGOCTL_NAMES):
            self.logger.debug("Listing device names")
            self.print_k8s_info()
            devices.print_names_list()
        elif self.disp_action.check(DispAction.TANGOCTL_TABL):
            self.logger.debug("Listing devices in table")
            devices.read_devices()
            devices.read_device_values()
            devices.print_json_table()
        else:
            # Nothing was displayed, so report the failure to the caller as well
            self.logger.error("Display action %s not supported", self.disp_action)
            rc = 1

        self.unset_output()
        return rc

    def get_contexts_dict(self) -> dict:
        """
        Read contexts/clusters in Kubernetes.

        :return: dictionary with contexts, empty when the Kubernetes package is missing
        """
        ctx_dict: dict = {}
        self.logger.debug("Read Kubernetes contexts")
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return ctx_dict
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        ctx_dict = k8s.get_contexts_dict()
        self.logger.info(
            "Read Kubernetes contexts: %s", json.dumps(ctx_dict, indent=4, default=str)
        )
        return ctx_dict

    def get_contexts_list(self) -> tuple:
        """
        Read contexts/clusters in Kubernetes.

        :return: tuple with active host, active context, active cluster and list of
            contexts; (None, None, None, []) when the Kubernetes package is missing
        """
        active_host: str
        active_ctx: str
        active_cluster: str
        k8s_list: list
        self.logger.debug("List Kubernetes contexts")
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            # Same arity as the success path, so that callers can always unpack four values
            return None, None, None, []
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        active_host, active_ctx, active_cluster, k8s_list = k8s.get_contexts_list()
        self.logger.debug("Listed Kubernetes contexts : %s", k8s_list)
        return active_host, active_ctx, active_cluster, k8s_list

    def get_namespaces_list(self) -> tuple:
        """
        Read namespaces in Kubernetes cluster.

        When ``self.k8s_ns`` is set, only namespaces equal to it are returned.

        :return: tuple with context name, cluster name and list with namespaces
        """
        self.logger.debug("List Kubernetes namespaces")
        ns_list: list = []
        k8s_list: list
        _ctx_name: str | None
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return None, None, ns_list
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        _ctx_name, _cluster_name, k8s_list = k8s.get_namespaces_list(self.k8s_ns)
        if self.k8s_ns is None:
            return k8s.context, k8s.cluster, k8s_list
        ns_list = [k8s_name for k8s_name in k8s_list if k8s_name == self.k8s_ns]
        self.logger.info("Listed %d namespaces: %s", len(ns_list), ",".join(ns_list))
        return k8s.context, k8s.cluster, ns_list

    def get_namespaces_dict(self) -> dict:
        """
        Read namespaces in Kubernetes cluster.

        :return: dictionary with namespaces, empty when the Kubernetes package is missing
        """
        self.logger.debug("Get Kubernetes namespaces")
        ns_dict: dict = {}
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return ns_dict
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        ns_dict = k8s.get_namespaces_dict()
        self.logger.info("Got %d namespaces", len(ns_dict))
        # A placeholder is required, otherwise logging fails to format the record
        self.logger.debug("Namespaces: %s", json.dumps(ns_dict, indent=4, default=str))
        return ns_dict

    def show_contexts(self) -> int:
        """
        Display contexts in Kubernetes as JSON, YAML or (by default) text.

        :returns: error condition
        """
        active_host: str
        active_ctx: str
        ctx_list: list
        self.logger.debug("Display contexts in Kubernetes")
        # Check before the output is set up, so that it is not left set on early return
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        self.set_output()
        if self.disp_action.check(DispAction.TANGOCTL_JSON):
            if not self.disp_action.indent:
                self.disp_action.indent = 4
            ctx_dict = self.get_contexts_dict()
            print(json.dumps(ctx_dict, indent=self.disp_action.indent), file=self.outf)
        elif self.disp_action.check(DispAction.TANGOCTL_YAML):
            if not self.disp_action.indent:
                self.disp_action.indent = 2
            ctx_dict = self.get_contexts_dict()
            print(yaml.dump(ctx_dict, indent=self.disp_action.indent), file=self.outf)
        else:
            active_host, active_ctx, active_cluster, ctx_list = self.get_contexts_list()
            print(f"Active host : {active_host}", file=self.outf)
            print("Contexts :", file=self.outf)
            for ctx in ctx_list:
                print(f"\t{ctx}", file=self.outf)
            print(f"Active context : {active_ctx}", file=self.outf)
            print(f"Active cluster : {active_cluster}", file=self.outf)
            print(f"Domain name : {self.domain_name}", file=self.outf)
        self.unset_output()
        return 0

    def show_namespaces(self) -> int:
        """
        Display namespaces in Kubernetes cluster as JSON, YAML or (by default) text.

        :returns: error condition
        """
        self.logger.debug("Show Kubernetes namespaces")
        ns_dict: dict
        ctx_name: str | None
        ns_list: list
        ns_name: str
        # Check before the output is set up, so that it is not left set on early return
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        self.set_output()
        if self.disp_action.check(DispAction.TANGOCTL_JSON):
            if not self.disp_action.indent:
                self.disp_action.indent = 4
            ns_dict = self.get_namespaces_dict()
            print(json.dumps(ns_dict, indent=self.disp_action.indent), file=self.outf)
            self.logger.info("Showed %d Kubernetes namespaces as JSON", len(ns_dict))
        elif self.disp_action.check(DispAction.TANGOCTL_YAML):
            if not self.disp_action.indent:
                self.disp_action.indent = 2
            ns_dict = self.get_namespaces_dict()
            print(yaml.dump(ns_dict, indent=self.disp_action.indent), file=self.outf)
            self.logger.info("Showed %d Kubernetes namespaces as YAML", len(ns_dict))
        else:
            ctx_name, cluster_name, ns_list = self.get_namespaces_list()
            print(f"Context : {ctx_name}", file=self.outf)
            print(f"Cluster : {cluster_name}", file=self.outf)
            print(f"Namespaces : {len(ns_list)}", file=self.outf)
            for ns_name in sorted(ns_list, reverse=self.disp_action.reverse):
                print(f"\t{ns_name}", file=self.outf)
            self.logger.info("Showed %d Kubernetes namespaces", len(ns_list))
        self.unset_output()
        return 0

    def show_services(self) -> int:
        """
        Display services in Kubernetes namespace as JSON, YAML or text.

        :returns: error condition
        """
        rc: int = 0
        self.logger.debug("Show Kubernetes services (%s)", self.disp_action)
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        self.set_output()
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        if self.disp_action.check(DispAction.TANGOCTL_JSON):
            if not self.disp_action.indent:
                self.disp_action.indent = 4
            service_dict = k8s.get_services_dict(self.k8s_ns)
            # NOTE(review): the '***' prefix means the output is not parseable as JSON;
            # it looks like leftover debug output - confirm before removing
            print(
                f"***\n{json.dumps(service_dict, indent=self.disp_action.indent)}", file=self.outf
            )
            self.logger.debug("Showed %d Kubernetes services as JSON", len(service_dict))
        elif self.disp_action.check(DispAction.TANGOCTL_YAML):
            if not self.disp_action.indent:
                self.disp_action.indent = 2
            service_dict = k8s.get_services_dict(self.k8s_ns)
            print(yaml.dump(service_dict, indent=self.disp_action.indent), file=self.outf)
            self.logger.debug("Showed %d Kubernetes services as YAML", len(service_dict))
        elif self.disp_action.check(DispAction.TANGOCTL_TXT):
            service_list = k8s.get_services_data(self.k8s_ns)
            self.logger.debug("Kubernetes services:\n%s", service_list)
            if not service_list.items:
                self.logger.error("No services found in namespace %s", self.k8s_ns)
                rc = 1
            else:
                for service in service_list.items:
                    print(f"Service Name: {service.metadata.name}", file=self.outf)
                    print(f" Type: {service.spec.type}", file=self.outf)
                    print(f" IP: {service.spec.cluster_ip}", file=self.outf)
                    if service.spec.ports:
                        for port in service.spec.ports:
                            print(
                                f" Port: {port.port}, Target Port: {port.target_port},"
                                f" Protocol: {port.protocol}",
                                file=self.outf,
                            )
                    print("-" * 20, file=self.outf)
                self.logger.debug("Showed %d Kubernetes services", len(service_list.items))
        else:
            self.logger.warning("Could not show Kubernetes services as %s", self.disp_action)
        # Always undo set_output(), also when no services were found
        self.unset_output()
        return rc

    def show_pod_log(self) -> int:
        """
        Read and print the log of the pod named by ``self.k8s_pod``.

        :returns: error condition
        """
        self.logger.debug("Read pod logs")
        if KubernetesInfo is None:
            self.logger.warning("Kubernetes package is not installed")
            return 1
        if self.k8s_pod is None:
            self.logger.error("Pod name not set")
            return 1
        self.set_output()
        k8s: KubernetesInfo = KubernetesInfo(self.logger, self.k8s_ctx)
        pod_log = k8s.get_pod_log(self.k8s_ns, self.k8s_pod)
        print(f"{pod_log}", file=self.outf)
        self.unset_output()
        self.logger.info("Read logs for pod %s", self.k8s_pod)
        return 0

    def read_tango_host(self, ntango: int, ntangos: int) -> int:  # noqa: C901
        """
        Read info from Tango host.

        The reading is done in a forked child process and the parent waits for it.

        :param ntango: index number (currently unused)
        :param ntangos: index count (currently unused)
        :return: error condition: exit code of the child process, negative signal number
            when the child was killed by a signal, zero when its status could not be read
        """
        self.logger.debug("Read Tango host")
        rc: int = 0

        # Fork just in case, so that ctrl-C will work (most of the time)
        pid: int = os.fork()
        if pid == 0:
            # Do the actual reading
            self.logger.debug("Processing namespace %s", self.k8s_ns)
            rc = self.run_info()
            self.logger.info("Processed namespace %s", self.k8s_ns)
            sys.exit(rc)

        # Wait for the reading process
        self.logger.info("Processing %s (PID %d)", self.k8s_ns, pid)
        try:
            _, status = os.waitpid(pid, 0)
        except OSError as os_err:
            # Best effort: the child status is not available, carry on
            self.logger.warning("Could not wait for PID %d: %s", pid, os_err)
        else:
            # Hand the result of the child to the caller instead of always returning zero
            rc = os.waitstatus_to_exitcode(status)
        self.logger.info("Processed %s (PID %d)", self.k8s_ns, pid)

        return rc