Source code for yaml2archiving.v1.yamlparser

"""This module contain class for yml parser based on new format
"""
import logging
from pathlib import Path
from typing import Dict

from ska_ser_logging import configure_logging

from yaml2archiving.utils.config_data_classes import (
    AttributeConfig,
    AttributeConfigs,
    ConfigManagers,
    ConfigurationItem,
    DBConfig,
    DBConfigs,
    EventSubscribers,
)

from .config import load_yaml_file

configure_logging()


[docs] class InvalidKeyReference(Exception): """Raise Invalid key parser error""" pass
[docs] class YamlParser: """This parser parse yaml file and generate the configuration data for attributes specified in yaml file """ def __init__(self, config_path): self.config_path = config_path self.db_configs = DBConfigs() self.config_manager = ConfigManagers() self.event_subscriber = EventSubscribers() self.attribute_configs = AttributeConfigs() self.default_th = "" self.default_cm = "" self.default_es = "" self.logger = logging.getLogger("yaml_parser") self._load_config() def _get_full_trl(self, db_info: DBConfig, device_name: str) -> str: """Get Full Device trl based on db and device name Args: db_info(DBConfig): Database Information device_name(str): Name of the device Returns: str: trl of device """ if device_name.startswith("tango://"): return device_name return f"tango://{db_info.host}:{db_info.port}/{device_name}"
[docs] def get_attribute_configs(self): """Return Attribute configs""" return self.attribute_configs
def _load_config(self): """Load the yaml file""" yaml_file = Path(self.config_path) self.config_dict = load_yaml_file(yaml_file) def _set_defaults(self): """Set defaults values""" for default in self.config_dict.get("defaults", []): th_value = default.get("th", None) if th_value: self.default_th = th_value cm_value = default.get("cm", None) if cm_value: self.default_cm = cm_value es_value = default.get("es") if es_value: self.default_es = es_value
[docs] def set_db_config(self): """Set Db configuration""" for host_info in self.config_dict.get("tango_hosts", []): for key, value in host_info.items(): self.db_configs.data[key] = DBConfig( host=value.get("host"), port=value.get("port", "10000") )
[docs] def set_config_managers(self): """Set Config Managers""" for config_manager in self.config_dict.get( "configuration_managers", [] ): for config_key, value in config_manager.items(): trl = value.get("trl") self.validate_keys(value, ["th"]) th = value.get("th", self.default_th) db_info = self.db_configs.data[th] config_full_trl = self._get_full_trl(db_info, trl) self.config_manager.data[config_key] = config_full_trl
[docs] def set_event_subscribers(self): """Set Event Subscriber""" for event_subscriber in self.config_dict.get("event_subscribers", []): for event_sbr_key, value in event_subscriber.items(): self.validate_keys(value, ["th"]) trl = value.get("trl") th = value.get("th", self.default_th) db_info = self.db_configs.data[th] event_sb_full_trl = self._get_full_trl(db_info, trl) self.event_subscriber.data[event_sbr_key] = event_sb_full_trl
[docs] def get_attribute_short_trl(self, trl: str) -> str: """ Args: trl(str): device trl Returns: str: short form of trl """ if "tango://" in trl: trl = "/".join(trl.split("/")[3:]) return trl
[docs] def create_attribute_config_list(self) -> Dict[str, AttributeConfig]: """Create attribute config list""" for attribute_dict in self.config_dict.get("attributes", []): self.validate_keys(attribute_dict) self.logger.debug(f"attribute {attribute_dict.get('trl')}") device_trl = "/".join(attribute_dict.get("trl").split("/")[:-1]) es_key = attribute_dict.get("es", self.default_es) cm_key = attribute_dict.get("cm", self.default_cm) th_key = attribute_dict.get("th", self.default_th) db_info = self.db_configs.data[th_key] device_trl = self._get_full_trl(db_info, device_trl) attribute_info = ConfigurationItem( attributes=attribute_dict["configuration"] ) attribute_key = self.get_attribute_short_trl(attribute_dict["trl"]) attr_config = AttributeConfig( db=f"{db_info.host}:{db_info.port}", manager=self.config_manager.data[cm_key], archiver=self.event_subscriber.data[es_key], device=device_trl, configuration=[attribute_info], ) if self.attribute_configs.configs.get(attribute_key): self.attribute_configs.configs.get(attribute_key).append( attr_config ) else: self.attribute_configs.configs[attribute_key] = [attr_config]
[docs] def validate_keys(self, config, keys=["es", "cm", "th"]): """ Args: config(dict): archive config dict keys(list): keys to validate """ try: if "es" in keys: es_key = config.get("es", self.default_es) self.event_subscriber.data[es_key] if "cm" in keys: cm_key = config.get("cm", self.default_cm) self.config_manager.data[cm_key] if "th" in keys: th_key = config.get("th", self.default_th) self.db_configs.data[th_key] except KeyError as ke: self.logger.error( "Invalid key reference %s." "Please check refer tango_hosts,event_subscribers" " and configuration_managers section for key reference", ke, ) raise InvalidKeyReference(ke) from ke
[docs] def parse(self): """ parse file and generate data """ self.set_db_config() self._set_defaults() self.set_event_subscribers() self.set_config_managers() self.create_attribute_config_list()