"""
Simple helper script to generate a configuration file from an existing
archive setup.
"""
import logging
import os
import re
from argparse import ArgumentParser
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from functools import lru_cache
from typing import Optional
import tango
import yaml
from ska_ser_logging import configure_logging
configure_logging()
logger = logging.getLogger("archiving2yaml")
INFO_NOT_AVAILABLE = "Info not available"
class IndentDumper(yaml.SafeDumper):
"""Class to help with indent, incase of nested dictionary and sequence."""
def increase_indent(self, flow=False, indentless=False):
return super().increase_indent(flow, False)
@lru_cache(None)
def get_device_proxy(device):
"""Method provides DeviceProxy for the given device"""
return tango.DeviceProxy(device)
@lru_cache(None)
def get_attribute_proxy(attribute_trl):
"""Method provides AttributeProxy for the given device"""
return tango.AttributeProxy(attribute_trl)
@dataclass
class TangoHost:
"""Dataclass to maintain tango host details."""
host: str
port: int = 10000
@dataclass
class ConfigurationManager:
"""Dataclass to maintain configuration manager details."""
trl: str = "Please update cm"
th: str = "Please update cm"
@dataclass
class EventSubscriber:
"""Dataclass to maintain event subscriber details."""
trl: str
th: Optional[str] = None
@dataclass
class AttributeConfiguration:
"""Dataclass to maintain attribute configuration details."""
code_push_event: bool | None = None
polling_period: int | None = None
archive_abs_change: float | None = None
archive_rel_change: float | None = None
archive_period: int | None = None
@dataclass
class Attribute:
"""Dataclass to maintain attribute details."""
trl: str
es: str
cm: str = "Please update cm."
th: str | None = None
configuration: AttributeConfiguration | None = None
@dataclass
class Archiving2yamlConfiguration:
"""Stores configuration details to generate yaml file format."""
tango_hosts: list[dict[str, TangoHost] | None] = field(
default_factory=list
)
configuration_managers: list[
dict[str, ConfigurationManager] | None
] = field(default_factory=list)
event_subscribers: list[dict[str, EventSubscriber] | None] = field(
default_factory=list
)
attributes: list[Attribute | None] = field(default_factory=list)
[docs]
class Archiving2yaml:
"""Class provides functionality to fetch details from
the archiver and format zthe data for yaml file generation.
"""
def __init__(self):
"""Initialises the instance variables utilised for maintaining data."""
self.archiving_configuration = Archiving2yamlConfiguration()
self.default_host, self.default_port = os.getenv("TANGO_HOST").split(
":"
)
self.archiver_trls: list[str | None] = []
self.cm_es_mapping: dict[str, list] = defaultdict(list)
[docs]
def add_archiver_trls(self, host: str, port: str, archiver_name: str):
"""Adds the archiver full trls into a list.
:param host: tango host of event subscriber
:type host: str
:param port: port of event subscriber
:type port: str
:param archiver_name: name of event subscriber
:type archiver_name: str
"""
if not self.is_full_trl(archiver_name):
self.archiver_trls.append(
self.get_full_trl(host, port, archiver_name)
)
else:
self.archiver_trls.append(archiver_name)
[docs]
def get_short_trl(self, dev_name: str) -> str:
"""Method to provide the short trl from full trl.
:param dev_name: device name
:type dev_name: str
:return: Returns the short trl of device.
:rtype: str
"""
if self.is_full_trl(dev_name):
domain, family, member = dev_name.split("/")[-3:]
return os.path.join(domain, family, member)
return dev_name
[docs]
def get_full_trl(self, host: str, port: str, dev_name: str) -> str:
"""Method to provide the short trl from full trl.
:param dev_name: device name
:type dev_name: str
:param dev_name: tango host
:type dev_name: str
:param dev_name: tango port
:type dev_name: str
:return: Returns the short trl of device.
:rtype: str
"""
return f"tango://{host}:{port}/{dev_name}"
[docs]
def set_archivers(self, archiver_names: list[str]):
"""
Sets Event subscriber details based on list provided.
archiver_names (list): The name of archivers for which data needs
to be checked.
For example: ["mid-eda/es/01"] or ["mid-eda/es/01","mid-eda/es/02"].
Also accepts ["mid-eda/es/\\*"], this would get all the instance
present in current host.
"""
event_subscriber_id = 1
for archiver in archiver_names:
host, port = self.default_host, self.default_port
if self.is_full_trl(archiver):
host, port = self.get_host_port_from_trl(archiver)
db = tango.Database(host, port)
host_id = self.get_host_id(host, port)
archiver_name = self.get_short_trl(archiver)
device_names: list = db.get_device_exported(
archiver_name
).value_string
for device_name in device_names:
self.add_archiver_trls(host, port, device_name)
archiver_detail = {
f"es{event_subscriber_id}": EventSubscriber(
trl=device_name, th=host_id
)
}
event_subscriber_id += 1
self.archiving_configuration.event_subscribers.append(
archiver_detail
)
[docs]
def is_full_trl(self, device_name: str) -> bool:
"""Method checks if the trl is full or short.
:param device_name: Device name.
:type device_name: str
:return: Returns True if trl is in full format else False.
:rtype: bool
"""
return "tango://" in device_name
[docs]
def get_host_port_from_trl(self, device_name: str) -> tuple[str, str]:
"""Provides tango host and port details from the trl
:param device_name: device name.
:type device_name: str
:return: Returns tango host and port of the device.
:rtype: tuple[str,str]
"""
match = re.match(r"tango://([^/]+)", device_name)
host, port = match.group(1).split(":")
return host, port
[docs]
def get_host_port_from_id(self, host_id: str) -> tuple[str, str]:
"""Provides host and port details from host_id
:param host_id: host id
:type host_id: str
:return: Returns host and port details for the host id.
:rtype: tuple[str]
"""
host, port = "", ""
for host_details in self.archiving_configuration.tango_hosts:
for hostid, host_detail in host_details.items():
if hostid == host_id:
host = host_detail.host
port = host_detail.port
if host and port:
break
return host, port
[docs]
def get_host_id(self, host: str, port: str) -> str:
"""Provides the tango host id based on host and port details.
:param host: Tango host name.
:type host: str
:param port: Tango database port name.
:type port: str
"""
hostid: str = ""
for host_details in self.archiving_configuration.tango_hosts:
for host_id, host_detail in host_details.items():
if host_detail.host == host and host_detail.port == port:
hostid = host_id
if hostid:
break
return hostid
[docs]
def set_tango_hosts(self, archiver_names: list[str]):
"""Sets tango host into required format from archiver trl.
:param archiver_names: list of tango hosts.
For example:
["mid-eda/es/01"], here it uses current tango host.
["tango://tango-databaseds:10000/mid-eda/es/01",
"tango://tango-databaseds2:10000/mid-eda/es/\\*"]
Here,incase of "tango-databaseds2" the port will be set to
default that is 10000.
Please use KUBE DNS <host>.<namespace>.svc.<cluster-domain>
for hosts outside the current namespace of deployment.
:type archiver_names: list[str]
"""
tango_host_id = 1
default_host_required: bool = False
hosts = []
for archiver in archiver_names:
tango_host_details: dict[str, TangoHost] | dict[None] = {}
if self.is_full_trl(archiver):
host, port = self.get_host_port_from_trl(archiver)
host_port = f"{host}:{port}"
if host_port in hosts:
continue
hosts.append(host_port)
tango_host_details[f"th{tango_host_id}"] = TangoHost(
host, port
)
tango_host_id += 1
self.archiving_configuration.tango_hosts.append(
tango_host_details
)
else:
default_host_required = True
if (
default_host_required
and f"{self.default_host}:{self.default_port}" not in hosts
):
tango_host_details = {} # reset
tango_host_details[f"th{tango_host_id}"] = TangoHost(
self.default_host, self.default_port
)
tango_host_id += 1
self.archiving_configuration.tango_hosts.append(tango_host_details)
[docs]
def set_all_archiver_attribute_configuration(self):
"""Method sets the attribute configuration
information for all the archiver."""
for es_details in self.archiving_configuration.event_subscribers:
for es_id, es_detail in es_details.items():
host, port = self.get_host_port_from_id(es_detail.th)
trl = self.get_full_trl(host, port, es_detail.trl)
self.set_attribute_configuration(archiver=trl, es_id=es_id)
[docs]
def set_attribute_configuration(self, archiver: str, es_id: str):
"""Method sets attribute configuration
:param archiver: event subscriber name.
:type archiver: str
"""
archiver_proxy = get_device_proxy(archiver)
attributes_list = archiver_proxy.read_attribute("AttributeList").value
for attribute in attributes_list:
is_set_default: bool = False
try:
attribute_trl = attribute.split(";")[0]
attribute_proxy = get_attribute_proxy(attribute_trl)
except tango.DevFailed as error:
logger.error(
"Could not get attribute config information%s (%s);",
attribute_trl,
error.args[0].desc,
)
is_set_default = True
attr_config = AttributeConfiguration()
attr_config.archive_abs_change = INFO_NOT_AVAILABLE
attr_config.archive_period = INFO_NOT_AVAILABLE
attr_config.archive_rel_change = INFO_NOT_AVAILABLE
attr_config.polling_period = INFO_NOT_AVAILABLE
attr_config.code_push_event = False
if not is_set_default:
att_conf = attribute_proxy.get_config()
poll_period = attribute_proxy.get_poll_period()
abs_change = att_conf.events.arch_event.archive_abs_change
rel_change = att_conf.events.arch_event.archive_rel_change
period = att_conf.events.arch_event.archive_period
attr_config = AttributeConfiguration()
if abs_change != "Not specified":
attr_config.archive_abs_change = float(abs_change)
if rel_change != "Not specified":
attr_config.archive_rel_change = float(rel_change)
if period != "Not specified":
attr_config.archive_period = int(period)
if poll_period:
attr_config.polling_period = poll_period
if (
rel_change == period == abs_change == "Not specified"
and not poll_period
):
attr_config.code_push_event = True
elif poll_period == 0:
attr_config.code_push_event = True
managerid = self.get_managerid_of_archiver(archiver)
attribute_detail = Attribute(
trl=attribute_trl,
es=es_id,
configuration=attr_config,
)
if managerid:
attribute_detail.cm = managerid
self.archiving_configuration.attributes.append(attribute_detail)
[docs]
def check_archivers_in_manager(self, configuration_manager: str) -> bool:
"""Method checks if manager contains any of the event subscriber
present in the file.
:param configuration_manager: configuration manager
:type configuration_manager: str
:return: Returns True if any of the archiver is present manager.
:rtype: bool
"""
archiver_cm_link_present: bool = False
for archiver in self.archiver_trls:
cm_proxy = get_device_proxy(configuration_manager)
if archiver in cm_proxy.read_attribute("ArchiverList").value:
self.cm_es_mapping[archiver].append(configuration_manager)
archiver_cm_link_present = True
return archiver_cm_link_present
[docs]
def get_manager_id(self, manager: str) -> str:
"""Method provides the configuration manager id \
based on the manager trl.
:param manager: _description_
:type manager: str
:return: Returns configuration manager id
:rtype: str
"""
cmid: str = ""
for cm_details in self.archiving_configuration.configuration_managers:
for cm_id, cm_detail in cm_details.items():
host, port = self.get_host_port_from_id(cm_detail.th)
trl = self.get_full_trl(host, port, cm_detail.trl)
if trl == manager:
cmid = cm_id
if cmid:
break
return cmid
[docs]
def get_managerid_of_archiver(self, archiver: str) -> str:
"""Method provides the manager id based on event subsriber trl.
:param archiver: event subscriber trl.
:type archiver: str
:return: Returns the event subscriber id
:rtype: str
"""
manager_id: str = ""
if not self.is_full_trl(archiver):
archiver = self.get_full_trl(
self.default_host, self.default_host, archiver
)
for es, manager_list in self.cm_es_mapping.items():
if es == archiver:
manager_id = self.get_manager_id(manager_list[0])
break
return manager_id
[docs]
def set_configuration_managers(self, configuration_managers: list[str]):
"""Method sets the configuration manager details.
:param configuration_managers: List of configuration manager trl
:type configuration_managers: list
"""
configuration_manager_id = 1
for manager in configuration_managers:
host, port = self.default_host, self.default_port
if self.is_full_trl(manager):
host, port = self.get_host_port_from_trl(manager)
manager_name = self.get_short_trl(manager)
db = tango.Database(host, port)
host_id = self.get_host_id(host, port)
device_names: list = db.get_device_exported(
manager_name
).value_string
for device_name in device_names:
cm_trl = self.get_full_trl(host, port, device_name)
if self.check_archivers_in_manager(cm_trl):
manager_detail = {
f"cm{configuration_manager_id}": ConfigurationManager(
trl=device_name, th=host_id
)
}
configuration_manager_id += 1
self.archiving_configuration.configuration_managers.append(
manager_detail
)
[docs]
def remove_unnecessary_values(self, configurations: dict):
"""Method to remove unnecessary values from the configuraiton data.
:param configurations: Attribute configuration data.
:type configurations: dict
"""
attributes: list = []
for attribute_config in configurations["attributes"]:
attribute: dict = {}
for key, config in attribute_config.items():
if key == "th" and not config:
continue
if key == "configuration":
filtered_config = {
k: v for k, v in config.items() if v is not None
}
attribute.update({key: filtered_config})
continue
attribute.update({key: config})
attributes.append(attribute)
configurations["attributes"] = attributes
def main():
parser = ArgumentParser()
archiving2yaml = Archiving2yaml()
parser.add_argument(
"-a",
"--archivers",
nargs="+",
help="List of event subscribers trls",
required=True,
)
parser.add_argument(
"-m",
"--managers",
nargs="+",
help="List of configuration manager trls",
)
args = parser.parse_args()
archivers = [archiver.strip() for archiver in args.archivers]
managers = []
if args.managers:
managers = [manager.strip() for manager in args.managers]
managers = managers[0].split(" ")
archivers = archivers[0].split(" ")
archiving2yaml.set_tango_hosts(archivers)
archiving2yaml.set_archivers(archivers)
archiving2yaml.set_configuration_managers(managers)
archiving2yaml.set_all_archiver_attribute_configuration()
configuration = asdict(archiving2yaml.archiving_configuration)
archiving2yaml.remove_unnecessary_values(configuration)
print(
yaml.dump(
configuration,
sort_keys=False,
default_flow_style=False,
indent=2,
allow_unicode=True,
Dumper=IndentDumper,
)
)
if __name__ == "__main__":
main()