"""This is archiver configurator tool
"""
import argparse
import logging
import sys
from ska_ser_logging import configure_logging
from tango import DeviceProxy
from .action import get_actions, perform_actions, show_actions
from .archiver import get_current_attributes
from .config import ARCHIVING_DEFAULT_PARAMS, get_tango_database
from .script import configure
from .yamlparser import YamlParser
# Logging is configured once, at import time, at DEBUG level; the same
# "archiver_configurator" logger is used by main() and by the class below.
configure_logging(level=logging.DEBUG)
logger = logging.getLogger("archiver_configurator")
# The ``_version`` module may be absent (presumably it is generated at build
# time -- TODO confirm); fall back to a placeholder so ``--version`` works.
try:
    from ._version import version # type: ignore
except ImportError:
    version = "unknown"
class ArchiverConfigurator:
    """Configure attribute archiving from a YAML configuration file.

    The YAML file is parsed into per-attribute configurations.  For each
    configuration the current archiver state is fetched, the desired state
    is built, and the resulting actions are applied.  Every applied action
    set is merged into :attr:`actions` under the ``"changed"``, ``"added"``
    and ``"removed"`` keys; formatted failure reports are collected under
    ``"failed"``.
    """

    # Keys of an action set that are accumulated in ``self.actions``.
    _ACTION_KEYS = ("changed", "added", "removed")

    def __init__(self, config_file, is_update, is_show, is_remove, delay=0.3):
        """Store the run options.

        Args:
            config_file: YAML file handed to ``YamlParser``.
            is_update: Forwarded to ``show_actions`` and ``perform_actions``.
            is_show: Stored on the instance; not read by this class.
            is_remove: When true, the configured attributes are removed
                instead of being diffed against the current state.
            delay: Forwarded to ``get_current_attributes`` as ``delay``.
        """
        self.config_file = config_file
        self.is_update = is_update
        self.is_remove = is_remove
        self.is_show = is_show
        self.delay = delay
        self.logger = logging.getLogger("archiver_configurator")
        self._actions: dict = {}

    @property
    def actions(self) -> dict:
        """Return the accumulated actions."""
        return self._actions

    @actions.setter
    def actions(self, value: dict) -> None:
        """Replace the accumulated actions."""
        self._actions = value

    def start(self):
        """Start archiving.

        1. Parse the config file and generate the attribute config data.
        2. Perform the archiver configuration for each attribute.
        """
        parser = YamlParser(self.config_file)
        parser.parse()
        attribute_configs = parser.get_attribute_configs()
        self._configure(attribute_configs.configs)

    def _generate_full_attribute_trl(
        self, device: "str | DeviceProxy", attribute_trl: str
    ) -> str:
        """Build the full attribute TRL from the device's database host/port.

        Args:
            device: Passed straight to the ``DeviceProxy`` constructor
                (presumably a device name -- TODO confirm what
                ``config.device`` holds).
            attribute_trl: Attribute TRL; only its last ``/``-separated
                component, lower-cased, is used.

        Returns:
            ``tango://<db_host>:<db_port>/<device name>/<attribute>``.
        """
        # Use a separate name instead of rebinding the ``device`` parameter.
        proxy = DeviceProxy(device)
        db_host = proxy.get_db_host()
        db_port = proxy.get_db_port()
        attribute_name = attribute_trl.lower().split("/")[-1]
        return (
            f"tango://{db_host}:{db_port}/{proxy.dev_name()}/{attribute_name}"
        )

    def _update_actions(self, actions: dict) -> None:
        """Merge one action set into :attr:`actions`.

        Args:
            actions: Mapping that may contain ``"changed"``, ``"added"``
                and ``"removed"`` sub-mappings; missing keys count as empty.
        """
        for key in self._ACTION_KEYS:
            # Merge into a dict owned by this instance.  Storing the incoming
            # dict itself would make later merges mutate the caller's data.
            self.actions.setdefault(key, {}).update(actions.get(key) or {})

    @staticmethod
    def _format_failures(failed: dict) -> list:
        """Turn a ``perform_actions`` failure mapping into report lines.

        Args:
            failed: Mapping of action name to an iterable of
                ``(attribute, error)`` pairs.

        Returns:
            For every action with failures: the upper-cased action name
            followed by one tab-indented ``attribute: error`` line per
            failure, with newlines in the error replaced by spaces.
        """
        lines = []
        for key, attrs in failed.items():
            if not attrs:
                continue
            lines.append(key.upper())
            for attr, error in attrs:
                # str() so that exception objects are accepted as well.
                line_error = str(error).replace("\n", " ")
                lines.append(f"\t{attr}: {line_error}")
        return lines

    def _configure(self, attribute_config_dict):
        """Perform attribute archiving.

        Args:
            attribute_config_dict(dict): Maps an attribute TRL to the list
                of configurations to apply for it.
        """
        self.actions["failed"] = []
        for attribute_trl, config_list in attribute_config_dict.items():
            for config in config_list:
                self._apply_config(attribute_trl, config)

    def _apply_config(self, attribute_trl, config):
        """Compute and apply the archiving actions for a single config."""
        # The return value is not used here; the call is kept for whatever
        # set-up or validation it performs (TODO confirm).
        get_tango_database(config.db)

        # Get the current archiving configuration. This consists of all
        # archiving related settings for each attribute currently archived
        # by the archiver.
        current = get_current_attributes(config.archiver, delay=self.delay)
        self.logger.debug("Current attributes %s", current)

        # Build the configuration described by the config file + reality.
        # A fresh local name is used so the loop key in ``_configure`` is
        # never rebound (the prefix used to pile up across configs).
        prefixed_trl = f"tango://{config.db}/{attribute_trl}"
        full_attribute_trl = self._generate_full_attribute_trl(
            config.device, prefixed_trl
        )
        desired_attributes = {
            full_attribute_trl: {
                **ARCHIVING_DEFAULT_PARAMS,
                **config.configuration[0].attributes,
            }
        }

        # Calculate what is needed to bring the current config to the
        # desired state.
        if self.is_remove:
            actions = {"removed": desired_attributes}
        else:
            actions = get_actions(current, desired_attributes)

        if any(actions.values()):
            output = show_actions(actions, self.is_update)
            self.logger.info("Config %s", config.archiver)
            self.logger.info("--- Required actions ---")
            self.logger.info("\n".join(output))
            failed = perform_actions(
                config.manager,
                config.archiver,
                actions,
                update=self.is_update,
            )
            failed_output = self._format_failures(failed)
            if failed_output:
                self.logger.info(
                    "!!! Actions applied, but the"
                    " following actions failed !!!"
                )
                failed_output_str = "\n".join(failed_output)
                self.logger.info(failed_output_str)
                self.actions["failed"].append(failed_output_str)
            else:
                self.logger.info(
                    "*** Successfully applied the actions"
                    " to the archiving system! ***"
                )

        # Merging an empty action set adds nothing, but guarantees that the
        # "changed"/"added"/"removed" keys exist once a config was handled.
        self._update_actions(actions)
def main(argv=None):
    """Command-line entry point of the archiver configurator.

    With ``--oldformat`` the YAML file is handed to ``configure``; a
    ``RuntimeError`` raised there is logged and the process exits with
    status 1.  Otherwise an ``ArchiverConfigurator`` is created and started.

    Args:
        argv: Command-line arguments to parse.  Defaults to
            ``sys.argv[1:]`` so existing callers of ``main()`` are
            unaffected.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "yamlfile",
        type=str,
        help="A YAML file containing archiver configuration.",
    )
    # NOTE: --write is only forwarded on the --oldformat path below.
    parser.add_argument(
        "--write",
        "-w",
        default=False,
        help="Apply the changes to the archiver",
        action="store_true",
    )
    parser.add_argument(
        "--update",
        "-u",
        default=False,
        help="Don't remove any attributes, only add or change",
        action="store_true",
    )
    # NOTE: --verbose is parsed but not read anywhere in this function.
    parser.add_argument(
        "-v",
        "--verbose",
        help="Show debug messages",
        action="store_true",
    )
    parser.add_argument(
        "-s",
        "--show",
        help="Print out the complete configuration",
        action="store_true",
    )
    # NOTE: --remove is only used on the new-format path below.
    parser.add_argument(
        "--remove",
        "-r",
        default=False,
        help="Remove the provided attributes",
        action="store_true",
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=0.3,
        help="Delay in between archiving operations",
    )
    parser.add_argument(
        "-of",
        "--oldformat",
        help="Old Format Yaml",
        action="store_true",
    )
    parser.add_argument("-V", "--version", action="version", version=version)
    args = parser.parse_args(sys.argv[1:] if argv is None else argv)

    if args.oldformat:
        try:
            configure(
                args.yamlfile,
                update=args.update,
                write=args.write,
                show=args.show,
                delay=args.delay,
                use_old_format=True,
                logger=logger,
            )
        except RuntimeError as e:
            # ``critical`` is the documented name of the ``fatal`` alias.
            logger.critical("Error: %s", e)
            sys.exit(1)
    else:
        archiver = ArchiverConfigurator(
            args.yamlfile, args.update, args.show, args.remove, args.delay
        )
        archiver.start()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()