"""This module contain class for yml parser based on new format
"""
import logging
from pathlib import Path
from typing import Dict
from ska_ser_logging import configure_logging
from yaml2archiving.utils.config_data_classes import (
AttributeConfig,
AttributeConfigs,
ConfigManagers,
ConfigurationItem,
DBConfig,
DBConfigs,
EventSubscribers,
)
from .config import load_yaml_file
configure_logging()
[docs]
class InvalidKeyReference(Exception):
"""Raise Invalid key parser error"""
pass
[docs]
class YamlParser:
"""This parser parse yaml file and generate the configuration data
for attributes specified in yaml file
"""
def __init__(self, config_path):
self.config_path = config_path
self.db_configs = DBConfigs()
self.config_manager = ConfigManagers()
self.event_subscriber = EventSubscribers()
self.attribute_configs = AttributeConfigs()
self.default_th = ""
self.default_cm = ""
self.default_es = ""
self.logger = logging.getLogger("yaml_parser")
self._load_config()
def _get_full_trl(self, db_info: DBConfig, device_name: str) -> str:
"""Get Full Device trl based on db and device name
Args:
db_info(DBConfig): Database Information
device_name(str): Name of the device
Returns:
str: trl of device
"""
if device_name.startswith("tango://"):
return device_name
return f"tango://{db_info.host}:{db_info.port}/{device_name}"
[docs]
def get_attribute_configs(self):
"""Return Attribute configs"""
return self.attribute_configs
def _load_config(self):
"""Load the yaml file"""
yaml_file = Path(self.config_path)
self.config_dict = load_yaml_file(yaml_file)
def _set_defaults(self):
"""Set defaults values"""
for default in self.config_dict.get("defaults", []):
th_value = default.get("th", None)
if th_value:
self.default_th = th_value
cm_value = default.get("cm", None)
if cm_value:
self.default_cm = cm_value
es_value = default.get("es")
if es_value:
self.default_es = es_value
[docs]
def set_db_config(self):
"""Set Db configuration"""
for host_info in self.config_dict.get("tango_hosts", []):
for key, value in host_info.items():
self.db_configs.data[key] = DBConfig(
host=value.get("host"), port=value.get("port", "10000")
)
[docs]
def set_config_managers(self):
"""Set Config Managers"""
for config_manager in self.config_dict.get(
"configuration_managers", []
):
for config_key, value in config_manager.items():
trl = value.get("trl")
self.validate_keys(value, ["th"])
th = value.get("th", self.default_th)
db_info = self.db_configs.data[th]
config_full_trl = self._get_full_trl(db_info, trl)
self.config_manager.data[config_key] = config_full_trl
[docs]
def set_event_subscribers(self):
"""Set Event Subscriber"""
for event_subscriber in self.config_dict.get("event_subscribers", []):
for event_sbr_key, value in event_subscriber.items():
self.validate_keys(value, ["th"])
trl = value.get("trl")
th = value.get("th", self.default_th)
db_info = self.db_configs.data[th]
event_sb_full_trl = self._get_full_trl(db_info, trl)
self.event_subscriber.data[event_sbr_key] = event_sb_full_trl
[docs]
def get_attribute_short_trl(self, trl: str) -> str:
"""
Args:
trl(str): device trl
Returns:
str: short form of trl
"""
if "tango://" in trl:
trl = "/".join(trl.split("/")[3:])
return trl
[docs]
def create_attribute_config_list(self) -> Dict[str, AttributeConfig]:
"""Create attribute config list"""
for attribute_dict in self.config_dict.get("attributes", []):
self.validate_keys(attribute_dict)
self.logger.debug(f"attribute {attribute_dict.get('trl')}")
device_trl = "/".join(attribute_dict.get("trl").split("/")[:-1])
es_key = attribute_dict.get("es", self.default_es)
cm_key = attribute_dict.get("cm", self.default_cm)
th_key = attribute_dict.get("th", self.default_th)
db_info = self.db_configs.data[th_key]
device_trl = self._get_full_trl(db_info, device_trl)
attribute_info = ConfigurationItem(
attributes=attribute_dict["configuration"]
)
attribute_key = self.get_attribute_short_trl(attribute_dict["trl"])
attr_config = AttributeConfig(
db=f"{db_info.host}:{db_info.port}",
manager=self.config_manager.data[cm_key],
archiver=self.event_subscriber.data[es_key],
device=device_trl,
configuration=[attribute_info],
)
if self.attribute_configs.configs.get(attribute_key):
self.attribute_configs.configs.get(attribute_key).append(
attr_config
)
else:
self.attribute_configs.configs[attribute_key] = [attr_config]
[docs]
def validate_keys(self, config, keys=["es", "cm", "th"]):
"""
Args:
config(dict): archive config dict
keys(list): keys to validate
"""
try:
if "es" in keys:
es_key = config.get("es", self.default_es)
self.event_subscriber.data[es_key]
if "cm" in keys:
cm_key = config.get("cm", self.default_cm)
self.config_manager.data[cm_key]
if "th" in keys:
th_key = config.get("th", self.default_th)
self.db_configs.data[th_key]
except KeyError as ke:
self.logger.error(
"Invalid key reference %s."
"Please check refer tango_hosts,event_subscribers"
" and configuration_managers section for key reference",
ke,
)
raise InvalidKeyReference(ke) from ke
[docs]
def parse(self):
"""
parse file and generate data
"""
self.set_db_config()
self._set_defaults()
self.set_event_subscribers()
self.set_config_managers()
self.create_attribute_config_list()