Source code for tools.v1.archiving2yaml

"""
Simple helper script to generate a configuration file from an existing
archive setup.
"""
import logging
import os
import re
from argparse import ArgumentParser
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from functools import lru_cache
from typing import Optional

import tango
import yaml
from ska_ser_logging import configure_logging

# Configure logging through the ska_ser_logging helper before the module
# logger is obtained (presumably applies the SKA standard log format --
# TODO confirm against ska_ser_logging).
configure_logging()
logger = logging.getLogger("archiving2yaml")

# Placeholder written into an attribute's configuration fields when the
# attribute's configuration cannot be read from Tango.
INFO_NOT_AVAILABLE = "Info not available"


class IndentDumper(yaml.SafeDumper):
    """YAML dumper that never emits "indentless" block sequences.

    The ``indentless`` argument is overridden with ``False`` so that
    sequences nested inside mappings are indented under their parent key.
    """

    def increase_indent(self, flow=False, indentless=False):
        # The caller's ``indentless`` value is deliberately ignored.
        return super().increase_indent(flow=flow, indentless=False)


@lru_cache(maxsize=None)
def get_device_proxy(device):
    """Return a ``tango.DeviceProxy`` for ``device``.

    Results are memoised without a size bound, so each distinct device name
    is only turned into a proxy once per process.
    """
    proxy = tango.DeviceProxy(device)
    return proxy


@lru_cache(maxsize=None)
def get_attribute_proxy(attribute_trl):
    """Return a ``tango.AttributeProxy`` for the attribute ``attribute_trl``.

    Results are memoised without a size bound, so each distinct attribute
    TRL is only turned into a proxy once per process.
    """
    proxy = tango.AttributeProxy(attribute_trl)
    return proxy


@dataclass
class TangoHost:
    """A tango database host entry of the generated configuration."""

    # Host name of the tango database.
    host: str
    # Port of the tango database.
    # NOTE(review): callers in this file pass the port as a string obtained
    # from splitting "<host>:<port>", despite the ``int`` annotation.
    port: int = 10000


@dataclass
class ConfigurationManager:
    """A configuration manager entry of the generated configuration."""

    # TRL of the configuration manager device; the default is a reminder
    # for the user to fill in a real value.
    trl: str = "Please update cm"
    # Identifier of the tango host entry the manager belongs to; the default
    # is a reminder for the user to fill in a real value.
    th: str = "Please update cm"


@dataclass
class EventSubscriber:
    """An event subscriber (archiver) entry of the generated configuration."""

    # TRL of the event subscriber device.
    trl: str
    # Identifier of the tango host entry the subscriber belongs to, if any.
    th: Optional[str] = None


@dataclass
class AttributeConfiguration:
    """Dataclass to maintain attribute configuration details."""

    code_push_event: bool | None = None
    polling_period: int | None = None
    archive_abs_change: float | None = None
    archive_rel_change: float | None = None
    archive_period: int | None = None


@dataclass
class Attribute:
    """An archived attribute entry of the generated configuration."""

    # TRL of the archived attribute.
    trl: str
    # Identifier of the event subscriber archiving the attribute.
    es: str
    # Identifier of the configuration manager; the default is a reminder for
    # the user to fill in a real value.
    cm: str = "Please update cm."
    # Identifier of the tango host entry, if any.
    th: str | None = None
    # Archiving settings of the attribute, if any.
    configuration: AttributeConfiguration | None = None


@dataclass
class Archiving2yamlConfiguration:
    """Top-level container mirroring the layout of the generated YAML file.

    Every section starts out as its own empty list.
    """

    # One single-entry mapping per tango host: {host id: TangoHost}.
    tango_hosts: list[dict[str, TangoHost] | None] = field(
        default_factory=list,
    )
    # One single-entry mapping per manager: {manager id: ConfigurationManager}.
    configuration_managers: list[
        dict[str, ConfigurationManager] | None
    ] = field(
        default_factory=list,
    )
    # One single-entry mapping per archiver: {subscriber id: EventSubscriber}.
    event_subscribers: list[dict[str, EventSubscriber] | None] = field(
        default_factory=list,
    )
    # Flat list of the archived attributes.
    attributes: list[Attribute | None] = field(
        default_factory=list,
    )


class Archiving2yaml:
    """Fetch details from the archiver and format the data for YAML output.

    The instance accumulates tango hosts, event subscribers (archivers),
    configuration managers and archived attributes in
    ``archiving_configuration``.
    """

    def __init__(self):
        """Initialise the instance variables used for maintaining data.

        :raises ValueError: if the ``TANGO_HOST`` environment variable is
            unset or is not of the form ``<host>:<port>``.
        """
        tango_host = os.getenv("TANGO_HOST")
        if not tango_host:
            # Fail with an explicit message instead of an AttributeError on
            # ``None.split``.
            raise ValueError(
                "TANGO_HOST environment variable must be set to <host>:<port>"
            )
        self.archiving_configuration = Archiving2yamlConfiguration()
        self.default_host, self.default_port = tango_host.split(":")
        # Full TRLs of every archiver discovered by ``set_archivers``.
        self.archiver_trls: list[str] = []
        # Archiver full TRL -> full TRLs of the managers that list it.
        self.cm_es_mapping: dict[str, list] = defaultdict(list)

    def add_archiver_trls(self, host: str, port: str, archiver_name: str):
        """Record the full TRL of an archiver.

        :param host: tango host of the event subscriber
        :type host: str
        :param port: tango port of the event subscriber
        :type port: str
        :param archiver_name: name of the event subscriber; used as is when
            it is already a full TRL
        :type archiver_name: str
        """
        if self.is_full_trl(archiver_name):
            full_trl = archiver_name
        else:
            full_trl = self.get_full_trl(host, port, archiver_name)
        self.archiver_trls.append(full_trl)

    def get_short_trl(self, dev_name: str) -> str:
        """Return the short TRL (last three components) of a full TRL.

        :param dev_name: device name, short or full
        :type dev_name: str
        :return: the short TRL; ``dev_name`` unchanged if it is not a full
            TRL.
        :rtype: str
        """
        if self.is_full_trl(dev_name):
            # A TRL is always "/"-separated, so join explicitly rather than
            # with the platform-dependent ``os.path.join``.
            return "/".join(dev_name.split("/")[-3:])
        return dev_name

    def get_full_trl(self, host: str, port: str, dev_name: str) -> str:
        """Build a full TRL from host, port and short device name.

        :param host: tango host
        :type host: str
        :param port: tango port
        :type port: str
        :param dev_name: short device name
        :type dev_name: str
        :return: ``tango://<host>:<port>/<dev_name>``
        :rtype: str
        """
        return f"tango://{host}:{port}/{dev_name}"

    def set_archivers(self, archiver_names: list[str]):
        """Set the event subscriber details for the given archivers.

        Each name is looked up among the exported devices of its tango
        database, and every device found is stored as ``es<N>``.

        :param archiver_names: archiver names to look up, for example
            ``["mid-eda/es/01"]`` or ``["mid-eda/es/01", "mid-eda/es/02"]``.
            A wildcard such as ``["mid-eda/es/*"]`` is passed on to the
            database query. Short names use the default tango host.
        :type archiver_names: list[str]
        """
        event_subscriber_id = 1
        for archiver in archiver_names:
            if self.is_full_trl(archiver):
                host, port = self.get_host_port_from_trl(archiver)
            else:
                host, port = self.default_host, self.default_port
            db = tango.Database(host, port)
            host_id = self.get_host_id(host, port)
            name_filter = self.get_short_trl(archiver)
            device_names = db.get_device_exported(name_filter).value_string
            for device_name in device_names:
                self.add_archiver_trls(host, port, device_name)
                self.archiving_configuration.event_subscribers.append(
                    {
                        f"es{event_subscriber_id}": EventSubscriber(
                            trl=device_name, th=host_id
                        )
                    }
                )
                event_subscriber_id += 1

    def is_full_trl(self, device_name: str) -> bool:
        """Check whether a TRL is in full format.

        :param device_name: device name
        :type device_name: str
        :return: True if ``device_name`` contains ``tango://``, else False.
        :rtype: bool
        """
        return "tango://" in device_name

    def get_host_port_from_trl(self, device_name: str) -> tuple[str, str]:
        """Extract the tango host and port from a full TRL.

        :param device_name: full TRL starting with
            ``tango://<host>:<port>/``
        :type device_name: str
        :return: tango host and port of the device
        :rtype: tuple[str, str]
        :raises ValueError: if ``device_name`` does not start with
            ``tango://`` or its authority is not exactly ``<host>:<port>``.
        """
        match = re.match(r"tango://([^/]+)", device_name)
        if match is None:
            raise ValueError(f"Not a full tango TRL: {device_name!r}")
        host, port = match.group(1).split(":")
        return host, port

    def get_host_port_from_id(self, host_id: str) -> tuple[str, str]:
        """Look up the host and port stored under a tango host id.

        :param host_id: host id, for example ``th1``
        :type host_id: str
        :return: host and port of the entry, or ``("", "")`` when the id is
            unknown.
        :rtype: tuple[str, str]
        """
        for host_details in self.archiving_configuration.tango_hosts:
            host_detail = host_details.get(host_id)
            if host_detail is not None:
                return host_detail.host, host_detail.port
        return "", ""

    def get_host_id(self, host: str, port: str) -> str:
        """Look up the tango host id for a host and port.

        :param host: tango host name
        :type host: str
        :param port: tango database port
        :type port: str
        :return: the host id, or ``""`` when no entry matches.
        :rtype: str
        """
        for host_details in self.archiving_configuration.tango_hosts:
            for host_id, host_detail in host_details.items():
                # Compare ports as text so that an int port and its string
                # form are treated as the same port.
                if host_detail.host == host and str(host_detail.port) == str(
                    port
                ):
                    return host_id
        return ""

    def set_tango_hosts(self, archiver_names: list[str]):
        """Set the tango host entries derived from the archiver TRLs.

        Every distinct ``<host>:<port>`` found in a full TRL becomes one
        ``th<N>`` entry. The default tango host is added last, but only if
        at least one short name was given and it is not already listed.

        Please use KUBE DNS ``<host>.<namespace>.svc.<cluster-domain>`` for
        hosts outside the current namespace of deployment.

        :param archiver_names: archiver TRLs, for example
            ``["mid-eda/es/01"]`` (uses the default tango host) or
            ``["tango://tango-databaseds:10000/mid-eda/es/01"]``. A full TRL
            must carry both host and port.
        :type archiver_names: list[str]
        """
        tango_host_id = 1
        default_host_required = False
        seen_hosts: set[str] = set()
        for archiver in archiver_names:
            if not self.is_full_trl(archiver):
                default_host_required = True
                continue
            host, port = self.get_host_port_from_trl(archiver)
            host_port = f"{host}:{port}"
            if host_port in seen_hosts:
                continue
            seen_hosts.add(host_port)
            self.archiving_configuration.tango_hosts.append(
                {f"th{tango_host_id}": TangoHost(host, port)}
            )
            tango_host_id += 1

        default_host_port = f"{self.default_host}:{self.default_port}"
        if default_host_required and default_host_port not in seen_hosts:
            self.archiving_configuration.tango_hosts.append(
                {
                    f"th{tango_host_id}": TangoHost(
                        self.default_host, self.default_port
                    )
                }
            )

    def set_all_archiver_attribute_configuration(self):
        """Set the attribute configuration information for all archivers."""
        for es_details in self.archiving_configuration.event_subscribers:
            for es_id, es_detail in es_details.items():
                host, port = self.get_host_port_from_id(es_detail.th)
                trl = self.get_full_trl(host, port, es_detail.trl)
                self.set_attribute_configuration(archiver=trl, es_id=es_id)

    def set_attribute_configuration(self, archiver: str, es_id: str):
        """Record every attribute archived by one event subscriber.

        Reads the subscriber's ``AttributeList`` and appends one
        ``Attribute`` per entry to ``archiving_configuration.attributes``.

        :param archiver: event subscriber TRL
        :type archiver: str
        :param es_id: id of the event subscriber, stored on each attribute
        :type es_id: str
        """
        archiver_proxy = get_device_proxy(archiver)
        attributes_list = archiver_proxy.read_attribute("AttributeList").value
        if attributes_list is None:
            # NOTE(review): an empty AttributeList is presumably reported as
            # None by PyTango; treat it as "nothing archived".
            return
        # The manager depends only on the archiver, so resolve it once.
        manager_id = self.get_managerid_of_archiver(archiver)
        for attribute in attributes_list:
            # Only the text before the first ";" is used as the TRL.
            attribute_trl = attribute.split(";")[0]
            attribute_detail = Attribute(
                trl=attribute_trl,
                es=es_id,
                configuration=self._read_attribute_configuration(
                    attribute_trl
                ),
            )
            if manager_id:
                attribute_detail.cm = manager_id
            self.archiving_configuration.attributes.append(attribute_detail)

    def _read_attribute_configuration(
        self, attribute_trl: str
    ) -> "AttributeConfiguration":
        """Read the archiving settings of one attribute from Tango.

        Falls back to ``INFO_NOT_AVAILABLE`` placeholders (with
        ``code_push_event`` False) when Tango raises ``DevFailed``.
        """
        try:
            # All three calls talk to Tango, so they share one handler.
            attribute_proxy = get_attribute_proxy(attribute_trl)
            att_conf = attribute_proxy.get_config()
            poll_period = attribute_proxy.get_poll_period()
        except tango.DevFailed as error:
            logger.error(
                "Could not get attribute config information%s (%s);",
                attribute_trl,
                error.args[0].desc,
            )
            return AttributeConfiguration(
                code_push_event=False,
                polling_period=INFO_NOT_AVAILABLE,
                archive_abs_change=INFO_NOT_AVAILABLE,
                archive_rel_change=INFO_NOT_AVAILABLE,
                archive_period=INFO_NOT_AVAILABLE,
            )

        arch_event = att_conf.events.arch_event
        abs_change = arch_event.archive_abs_change
        rel_change = arch_event.archive_rel_change
        period = arch_event.archive_period

        attr_config = AttributeConfiguration()
        if abs_change != "Not specified":
            attr_config.archive_abs_change = float(abs_change)
        if rel_change != "Not specified":
            attr_config.archive_rel_change = float(rel_change)
        if period != "Not specified":
            attr_config.archive_period = int(period)
        if poll_period:
            attr_config.polling_period = poll_period

        nothing_specified = (
            rel_change == period == abs_change == "Not specified"
        )
        # With no polling, events are flagged as pushed from device code.
        if (nothing_specified and not poll_period) or poll_period == 0:
            attr_config.code_push_event = True
        return attr_config

    def check_archivers_in_manager(self, configuration_manager: str) -> bool:
        """Check whether a manager handles any of the known archivers.

        Each known archiver found in the manager's ``ArchiverList`` is
        recorded in ``cm_es_mapping``.

        :param configuration_manager: configuration manager full TRL
        :type configuration_manager: str
        :return: True if at least one known archiver is in the manager.
        :rtype: bool
        """
        if not self.archiver_trls:
            return False
        cm_proxy = get_device_proxy(configuration_manager)
        # Read the list once instead of once per archiver.
        managed_archivers = cm_proxy.read_attribute("ArchiverList").value
        if managed_archivers is None:
            return False
        archiver_cm_link_present = False
        for archiver in self.archiver_trls:
            if archiver in managed_archivers:
                self.cm_es_mapping[archiver].append(configuration_manager)
                archiver_cm_link_present = True
        return archiver_cm_link_present

    def get_manager_id(self, manager: str) -> str:
        """Look up the configuration manager id for a manager TRL.

        :param manager: full TRL of the configuration manager
        :type manager: str
        :return: the configuration manager id, or ``""`` when not found.
        :rtype: str
        """
        # NOTE(review): a manager whose tango host is not in ``tango_hosts``
        # resolves to an empty host/port here and therefore never matches.
        for cm_details in self.archiving_configuration.configuration_managers:
            for cm_id, cm_detail in cm_details.items():
                host, port = self.get_host_port_from_id(cm_detail.th)
                if self.get_full_trl(host, port, cm_detail.trl) == manager:
                    return cm_id
        return ""

    def get_managerid_of_archiver(self, archiver: str) -> str:
        """Look up the id of the manager handling an event subscriber.

        :param archiver: event subscriber TRL, short or full
        :type archiver: str
        :return: id of the first manager recorded for the archiver, or
            ``""`` when none is known.
        :rtype: str
        """
        if not self.is_full_trl(archiver):
            # Expand with the default host AND port so the result matches
            # the keys of ``cm_es_mapping``.
            archiver = self.get_full_trl(
                self.default_host, self.default_port, archiver
            )
        manager_list = self.cm_es_mapping.get(archiver)
        if not manager_list:
            return ""
        return self.get_manager_id(manager_list[0])

    def set_configuration_managers(self, configuration_managers: list[str]):
        """Set the configuration manager details.

        Each name is looked up among the exported devices of its tango
        database. A device is stored as ``cm<N>`` only if it lists at least
        one of the known archivers.

        :param configuration_managers: list of configuration manager TRLs
        :type configuration_managers: list[str]
        """
        configuration_manager_id = 1
        for manager in configuration_managers:
            if self.is_full_trl(manager):
                host, port = self.get_host_port_from_trl(manager)
            else:
                host, port = self.default_host, self.default_port
            db = tango.Database(host, port)
            host_id = self.get_host_id(host, port)
            name_filter = self.get_short_trl(manager)
            device_names = db.get_device_exported(name_filter).value_string
            for device_name in device_names:
                cm_trl = self.get_full_trl(host, port, device_name)
                if not self.check_archivers_in_manager(cm_trl):
                    continue
                self.archiving_configuration.configuration_managers.append(
                    {
                        f"cm{configuration_manager_id}": ConfigurationManager(
                            trl=device_name, th=host_id
                        )
                    }
                )
                configuration_manager_id += 1

    def remove_unnecessary_values(self, configurations: dict):
        """Strip unset values from the attribute entries, in place.

        Drops a falsy ``th`` key and every ``None`` setting inside
        ``configuration`` for each entry of ``configurations["attributes"]``.

        :param configurations: configuration data as plain dictionaries
        :type configurations: dict
        """
        cleaned_attributes: list = []
        for attribute_config in configurations["attributes"]:
            cleaned: dict = {}
            for key, value in attribute_config.items():
                if key == "th" and not value:
                    continue
                if key == "configuration":
                    # ``or {}`` tolerates an attribute with no configuration.
                    value = {
                        name: setting
                        for name, setting in (value or {}).items()
                        if setting is not None
                    }
                cleaned[key] = value
            cleaned_attributes.append(cleaned)
        configurations["attributes"] = cleaned_attributes
def _split_trl_arguments(values) -> list[str]:
    """Flatten CLI values, splitting entries that hold several TRLs."""
    # A value may itself be a quoted, whitespace-separated list of TRLs.
    return [trl for value in values or [] for trl in value.split()]


def main():
    """Print the YAML configuration of an existing archive setup.

    Parses the archiver TRLs (``-a``, required) and the optional
    configuration manager TRLs (``-m``), collects the details from Tango and
    prints the resulting configuration as YAML on standard output. TRLs may
    be given as separate arguments, as one quoted whitespace-separated
    string, or as a mix of both.
    """
    parser = ArgumentParser()
    parser.add_argument(
        "-a",
        "--archivers",
        nargs="+",
        help="List of event subscribers trls",
        required=True,
    )
    parser.add_argument(
        "-m",
        "--managers",
        nargs="+",
        help="List of configuration manager trls",
    )
    args = parser.parse_args()

    # Keep every value given on the command line, not only the first one.
    archivers = _split_trl_arguments(args.archivers)
    managers = _split_trl_arguments(args.managers)

    # Built after argument parsing so that ``--help`` and usage errors do
    # not depend on the TANGO_HOST environment variable.
    archiving2yaml = Archiving2yaml()
    archiving2yaml.set_tango_hosts(archivers)
    archiving2yaml.set_archivers(archivers)
    archiving2yaml.set_configuration_managers(managers)
    archiving2yaml.set_all_archiver_attribute_configuration()

    configuration = asdict(archiving2yaml.archiving_configuration)
    archiving2yaml.remove_unnecessary_values(configuration)
    print(
        yaml.dump(
            configuration,
            sort_keys=False,
            default_flow_style=False,
            indent=2,
            allow_unicode=True,
            Dumper=IndentDumper,
        )
    )


if __name__ == "__main__":
    main()