Source code for yaml2archiving.v1.archiver_configurator

"""This is archiver configurator tool
"""
import argparse
import logging
import sys

from ska_ser_logging import configure_logging
from tango import DeviceProxy

from .action import get_actions, perform_actions, show_actions
from .archiver import get_current_attributes
from .config import ARCHIVING_DEFAULT_PARAMS, get_tango_database
from .script import configure
from .yamlparser import YamlParser

configure_logging(level=logging.DEBUG)
logger = logging.getLogger("archiver_configurator")
try:
    from ._version import version  # type: ignore
except ImportError:
    version = "unknown"


[docs] class ArchiverConfigurator: """ This class contain all methods to start configuring attributes based on yaml file provided """ def __init__(self, config_file, is_update, is_show, is_remove, delay=0.3): """Initialize required data""" self.config_file = config_file self.is_update = is_update self.is_remove = is_remove self.is_show = is_show self.delay = delay self.logger = logging.getLogger("archiver_configurator") self._actions: dict = {} @property def actions(self) -> dict: """Returns the actions""" return self._actions @actions.setter def actions(self, value: dict) -> None: """Set the actions""" self._actions = value
[docs] def start(self): """Start archiving 1. Parse the config file and generate attributes config data 2. Perform archiver configuration for each attribute """ parser = YamlParser(self.config_file) parser.parse() attribute_configs = parser.get_attribute_configs() self._configure(attribute_configs.configs)
def _generate_full_attribute_trl( self, device: DeviceProxy, attribute_trl: str ): """Generate full attribute trl based on device db host and port Args: device(DeviceProxy): Device Proxy object attribute_trl(str): Attribute trl string """ device = DeviceProxy(device) db_host = device.get_db_host() db_port = device.get_db_port() full_attribute_trl = f"tango://{db_host}:{db_port}/{device.dev_name()}/{attribute_trl.lower().split('/')[-1]}" return full_attribute_trl def _update_actions(self, actions: list) -> None: """Update Actions Args: actions[list]: Actions list to update """ # Update self.actions if "changed" in self.actions: self.actions["changed"].update(actions.get("changed", {})) else: self.actions["changed"] = actions.get("changed", {}) if "added" in self.actions: self.actions["added"].update(actions.get("added", {})) else: self.actions["added"] = actions.get("added", {}) if "removed" in self.actions: self.actions["removed"].update(actions.get("removed", {})) else: self.actions["removed"] = actions.get("removed", {}) def _configure(self, attribute_config_dict): """Perform attribute archiving Args: attribute_config_dict(dict): Attribute config info """ self.actions["failed"] = [] for attribute_trl, config_list in attribute_config_dict.items(): for config in config_list: _ = get_tango_database(config.db) # Get the current archiving configuration. This consists of # all archiving related settings for each attribute currently # archived by the archiver. current = get_current_attributes( config.archiver, delay=self.delay ) self.logger.debug("Current attributes %s", current) # Build the configuration described by the config file + reality desried_attributes = {} # if not self.is_update: attribute_trl = f"tango://{config.db}/{attribute_trl}" # if self.is_update: # full_attribute_trl = full_attribute_trl.replace(":10000", ".ska-tango-archiver.svc.cluster.local:10000") full_attribute_trl = self._generate_full_attribute_trl( config.device, attribute_trl ) desried_attributes[full_attribute_trl] = { **ARCHIVING_DEFAULT_PARAMS, **config.configuration[0].attributes, } # Calculate what is needed to bring the current config to the desired state if self.is_remove: actions = {"removed": desried_attributes} else: actions = get_actions(current, desried_attributes) if any(actions.values()): output = show_actions(actions, self.is_update) self.logger.info("Config %s", config.archiver) self.logger.info("--- Required actions ---") self.logger.info("\n".join(output)) failed = perform_actions( config.manager, config.archiver, actions, update=self.is_update, ) failed_output = [] for key, attrs in failed.items(): if attrs: failed_output.append(key.upper()) for attr, error in attrs: line_error = error.replace("\n", " ") failed_output.append(f"\t{attr}: {line_error}") if failed_output: self.logger.info( "!!! Actions applied, but the" " following actions failed !!!" ) failed_output_str = "\n".join(failed_output) self.logger.info(failed_output_str) self.actions["failed"].append(failed_output_str) else: self.logger.info( "*** Successfully applied the actions" " to the archiving system! ***" ) self._update_actions(actions)
def main(): parser = argparse.ArgumentParser() parser.add_argument( "yamlfile", type=str, help="A YAML file containing archiver configuration.", ) parser.add_argument( "--write", "-w", default=False, help="Apply the changes to the archiver", action="store_true", ) parser.add_argument( "--update", "-u", default=False, help="Don't remove any attributes, only add or change", action="store_true", ) parser.add_argument( "-v", "--verbose", help="Show debug messages", action="store_true", ) parser.add_argument( "-s", "--show", help="Print out the complete configuration", action="store_true", ) parser.add_argument( "--remove", "-r", default=False, help="Remove the provided attributes", action="store_true", ) parser.add_argument( "--delay", type=float, default=0.3, help="Delay in between archiving operations", ) parser.add_argument( "-of", "--oldformat", help="Old Format Yaml", action="store_true", ) parser.add_argument("-V", "--version", action="version", version=version) args = parser.parse_args(sys.argv[1:]) if args.oldformat: try: configure( args.yamlfile, update=args.update, write=args.write, show=args.show, delay=args.delay, use_old_format=True, logger=logger, ) except RuntimeError as e: logger.fatal("Error: %s", e) sys.exit(1) else: archiver = ArchiverConfigurator( args.yamlfile, args.update, args.show, args.remove, args.delay ) archiver.start() if __name__ == "__main__": main()