Source code for yaml2archiving.v1.action

import logging
import sys
import time
from copy import copy
from typing import Dict, Tuple

import tango  # type: ignore

from . import ARCHIVING_PARAMS, AttributeConfig, get_attribute_configs

if sys.version_info >= (3, 8):
    from typing import TypedDict
else:
    from typing_extensions import TypedDict


logger = logging.getLogger(__name__)


def get_safe_params(
    attr: str, old: AttributeConfig, new: AttributeConfig
) -> AttributeConfig:
    """
    Return a copy of ``new`` whose polling period is safe to apply.

    Polling is never slowed down and never switched off automatically: if
    ``old`` has a polling period and ``new`` asks for a longer one, or for
    none at all, the returned config keeps the period from ``old`` and a
    warning naming ``attr`` is logged. ``new`` itself is not modified.
    """
    safe = copy(new)

    current_period = old.get("polling_period")
    if not current_period:
        # No polling configured at the moment, so there is nothing to protect.
        return safe

    requested_period = new.get("polling_period")
    if not requested_period:
        logger.warning("Not turning off polling for %r.", attr)
        safe["polling_period"] = current_period
    elif requested_period > current_period:
        # We are careful not to slow down polling automatically, since the
        # current period may have been chosen for external reasons. Tweaking
        # it in that direction is left to the user.
        logger.warning(
            "Polling for %r would be slowed down from %d to %d;"
            " not changing.",
            attr,
            current_period,
            requested_period,
        )
        safe["polling_period"] = current_period

    return safe
class Actions(TypedDict, total=True):
    """Differences between two archiving configurations, by attribute name."""

    # Attributes to start archiving: (current config, config to apply).
    added: Dict[str, Tuple[AttributeConfig, AttributeConfig]]
    # Attributes to stop archiving, with their previous config.
    removed: Dict[str, AttributeConfig]
    # Attributes whose settings differ: (previous config, config to apply).
    changed: Dict[str, Tuple[AttributeConfig, AttributeConfig]]
def get_actions(
    prev_attrs: Dict[str, AttributeConfig],
    next_attrs: Dict[str, AttributeConfig],
    use_old_format: bool = False,
) -> Actions:
    """
    Work out the actions needed to go from one configuration to another.

    :param prev_attrs: the "previous" configuration, keyed by attribute name.
    :param next_attrs: the "next" (wanted) configuration, keyed by attribute
        name.
    :param use_old_format: when true, attributes present in ``prev_attrs``
        but missing from ``next_attrs`` are reported as removals; otherwise
        no removals are reported.
    :return: a dict with the keys ``"added"``, ``"removed"`` and
        ``"changed"``. Added and changed entries are ``(current, wanted)``
        pairs where ``wanted`` has been passed through ``get_safe_params``.
    """
    removed: Dict[str, AttributeConfig] = {}
    if use_old_format:
        removed = {
            attr: prev_attrs[attr]
            for attr in set(prev_attrs) - set(next_attrs)
        }

    new_attrs = set(next_attrs) - set(prev_attrs)
    logger.debug("New attr %s", new_attrs)
    logger.debug("Prev attr %s", prev_attrs)
    logger.debug("Removed %s", removed)

    # For new attributes the "current" side of the comparison is whatever
    # get_attribute_configs reports for them.
    db = tango.Database()
    new_attr_configs = get_attribute_configs(db, list(new_attrs))
    added = {
        attr: (current, get_safe_params(attr, current, next_attrs[attr]))
        for attr, current in new_attr_configs.items()
    }

    # If the lookup above returned no entry for a requested attribute there is
    # nothing to compare it against; report it instead of failing with a bare
    # KeyError further down.
    for attr in sorted(new_attrs - set(added)):
        logger.warning(
            "No current configuration found for new attribute %r; skipping.",
            attr,
        )

    changed = {}
    for attr, wanted in next_attrs.items():
        if attr in added or attr not in prev_attrs:
            continue
        previous = prev_attrs[attr]
        safe = get_safe_params(attr, previous, wanted)
        # Drop empty changes
        if previous != safe:
            changed[attr] = (previous, safe)

    return {
        "added": added,
        "removed": removed,
        "changed": changed,
    }
def show_actions(actions: Actions, update: bool = False):
    """
    Describe the given actions as a list of human readable text lines.

    Additions come first, then changes, then removals, followed by a
    "Totals:" summary. When ``update`` is true, removals are listed as
    skipped and counted separately in the summary.
    """

    def describe(param, before, after):
        return f"\t{param}: {before} -> {after}"

    to_add = actions.get("added", {})
    to_change = actions.get("changed", {})
    to_remove = actions.get("removed", {})
    report = []

    for attr, (current, desired) in to_add.items():
        report.append(f"ADD {attr}")
        for param in ARCHIVING_PARAMS:
            if param not in current and param not in desired:
                continue
            before, after = current.get(param), desired.get(param)
            # Parameters that are unset on both sides are not worth showing.
            if before is not None or after is not None:
                report.append(describe(param, before, after))

    for attr, (current, desired) in to_change.items():
        report.append(f"CHANGE {attr}")
        for param in ARCHIVING_PARAMS:
            if param not in current and param not in desired:
                continue
            before, after = current.get(param), desired.get(param)
            if before == after:
                continue
            report.append(describe(param, before, after))

    if update:
        # Removals can only be skipped when update is set.
        report.extend(
            f"SKIP REMOVE {attr}\n\tCause: --update flag used"
            for attr in to_remove
        )
        removed_summary = f"- Removed: 0 ({len(to_remove)} skipped)"
    else:
        report.extend(f"REMOVE {attr}" for attr in to_remove)
        removed_summary = f"- Removed: {len(to_remove)}"

    report.append("Totals:")
    report.append(f"- Added: {len(to_add)}")
    report.append(f"- Changed: {len(to_change)}")
    report.append(removed_summary)
    return report
def get_archiving_settings(manager: tango.DeviceProxy) -> str:
    """
    Return the current "set attribute" values for the manager as a string.

    These are the parameters filled in before running "AddAttribute".
    Can be useful for debugging issues.

    :param manager: proxy to the archiving configuration manager device.
    :return: a header line followed by one line per manager attribute,
        showing its read and write values, joined with newlines.
    """
    attr_list = [
        "SetArchiver",
        "SetAttributeName",
        "SetCodePushedEvent",
        "SetPeriodEvent",
        "SetPollingPeriod",
        "SetAbsoluteEvent",
        "SetRelativeEvent",
    ]
    print_buff = ["Dumping all settings for the current attribute"]
    for attr in attr_list:
        # Read once per attribute: halves the device round trips and makes
        # sure the read and write values shown come from the same reading.
        reading = manager.read_attribute(attr)
        print_buff.append(
            f"\t\t\t\t\t * {attr} read: {reading.value},"
            f" write {reading.w_value}"
        )
    return "\n".join(print_buff)
# Archive event parameters and the manager attribute each one is written to
# when adding an attribute, in the order they are applied.
_EVENT_FILTERS = (
    ("archive_rel_change", "SetRelativeEvent"),
    ("archive_abs_change", "SetAbsoluteEvent"),
    ("archive_period", "SetPeriodEvent"),
)


def _value_or_zero(config, param):
    """Return ``config[param]``, treating a missing or ``None`` entry as 0."""
    value = config.get(param)
    return 0 if value is None else value


def _log_manager_settings(manager_proxy):
    """Dump the manager's current settings at debug level, best effort."""
    if not logger.isEnabledFor(logging.DEBUG):
        # Building the dump reads several attributes from the manager;
        # don't pay for that unless somebody will see the result.
        return
    try:
        logger.debug(get_archiving_settings(manager_proxy))
    except tango.DevFailed as e:
        logger.debug("Could not dump manager settings: %s", e.args[0].desc)


def _add_attribute(
    manager_proxy,
    archiver,
    attr,
    current,
    desired,
    get_attribute_proxy,
    use_old_format,
):
    """Add one attribute to archiving via the manager; may raise DevFailed."""
    # Adding an attribute to archiving is done with the manager device.
    # The settings are written to various attributes on that device,
    # and then a command is run to apply.

    # Archiver
    if not use_old_format and archiver not in manager_proxy.ArchiverList:
        manager_proxy.ArchiverAdd(archiver)
    manager_proxy.SetArchiver = archiver
    logger.info("SetArchiver: %r for %s", archiver, attr)

    # Attribute Name
    # TODO this fails if the device is not running?
    manager_proxy.SetAttributeName = attr

    # Relative, absolute and periodic event filters
    parameters_to_unset = set()
    for param, manager_attr in _EVENT_FILTERS:
        value = _value_or_zero(desired, param)
        setattr(manager_proxy, manager_attr, value)
        logger.debug("%s: %r for %s", manager_attr, value, attr)
        if value == 0:
            parameters_to_unset.add(param)

    # Polling period
    polling_period = int(_value_or_zero(desired, "polling_period"))
    logger.debug(
        "Current polling period for %r: %r",
        attr,
        current.get("polling_period", None),
    )
    if polling_period != 0:
        manager_proxy.SetPollingPeriod = polling_period
        logger.debug("SetPollingPeriod: %r for %s", polling_period, attr)

    # CodePushedEvent
    # If we dont have any polling set, then we assume that code pushes.
    code_push_event = polling_period == 0
    manager_proxy.SetCodePushedEvent = code_push_event
    logger.debug("SetCodePushedEvent: %r for %s", code_push_event, attr)

    # Strategy
    strategy = desired.get("archive_strategy")
    if strategy is None:
        strategy = "ALWAYS"  # TODO needed to default here?
    manager_proxy.SetStrategy = strategy
    logger.debug("SetStrategy: %r for %s", strategy, attr)

    # Finally, add attribute
    manager_proxy.AttributeAdd()
    logger.debug("Attribute: %s added", attr)

    # Note: setting the value to 0 means that the manager won't touch the
    # parameter on the device. So if we want to unset it, we'll have to do
    # it manually.
    if parameters_to_unset:
        logger.debug(
            "Unsetting parameters %r on %r", parameters_to_unset, attr
        )
        attr_proxy = get_attribute_proxy(attr)
        attr_config = attr_proxy.get_config()
        for param in parameters_to_unset:
            setattr(attr_config.events.arch_event, param, "Not specified")
        attr_proxy.set_config(attr_config)


def _change_attribute(attr, current, desired, get_attribute_proxy):
    """Apply changed settings to one attribute; return a list of failures."""
    # Changing archiving settings must be done on the attribute itself
    try:
        attr_proxy = get_attribute_proxy(attr)
        attr_config = attr_proxy.get_config()
    except tango.DevFailed as e:
        logger.error(
            "Could not get config for attribute %r: %s",
            attr,
            e.args[0].desc,
        )
        return [(attr, e.args[0].desc)]

    failures = []
    event_params = tuple(param for param, _ in _EVENT_FILTERS)
    attr_config_changed = False
    for param in ARCHIVING_PARAMS:
        old_value = current.get(param)
        if old_value == desired.get(param):
            continue
        new_value = desired.get(param) or 0
        logger.debug(
            "Attribute %s %s %r -> %r", attr, param, old_value, new_value
        )
        if param == "polling_period":
            try:
                attr_proxy.poll(new_value)
            except tango.DevFailed as e:
                logger.exception(
                    "Failed to set polling on %r to %r", attr, new_value
                )
                # Keep going with the other parameters, but make sure the
                # caller gets to know about the failure.
                failures.append((attr, e.args[0].desc))
        elif param in event_params:
            setattr(attr_config.events.arch_event, param, str(new_value))
            attr_config_changed = True
        else:
            logger.warning(
                "Not changing parameter %r for attribute %r.", param, attr
            )

    if attr_config_changed:
        try:
            logger.debug("Writing config to %r", attr)
            attr_proxy.set_config(attr_config)
        except tango.DevFailed as e:
            logger.exception(
                "Failed to write config %r to attribute %r: %s",
                attr_config,
                attr,
                e.args[-1].desc,
            )
            failures.append((attr, e.args[0].desc))
    return failures


def perform_actions(
    manager: str,
    archiver: str,
    actions,
    update=False,
    delay=0,
    get_device_proxy=tango.DeviceProxy,
    get_attribute_proxy=tango.AttributeProxy,
    use_old_format=False,
):
    """
    Take a bunch of "actions" and apply them to the control system.

    :param manager: name of the archiving configuration manager device.
    :param archiver: name of the archiver to assign added attributes to.
    :param actions: mapping with optional "added", "removed" and "changed"
        entries, as produced by ``get_actions``.
    :param update: when true, removals are logged but not performed.
    :param delay: seconds to sleep before each addition.
    :param get_device_proxy: factory used to create the manager proxy.
    :param get_attribute_proxy: factory used to create attribute proxies.
    :param use_old_format: when true, the archiver is not registered with
        the manager before use.
    :return: dict with the keys "add", "remove" and "change", each holding a
        list of ``(attribute, error description)`` tuples for the failures.
    :raises RuntimeError: if the manager device can't be contacted.
    """
    try:
        manager_proxy = get_device_proxy(manager)
        manager_proxy.ping()
    except tango.DevFailed as e:
        logger.critical(
            "Can't connect to the archiving configuration manager %r: %s",
            manager,
            e.args[-1].desc,
        )
        raise RuntimeError("Could not contact manager") from e

    failed: dict[str, list[tuple[str, str]]] = {
        "add": [],
        "remove": [],
        "change": [],
    }

    # Added
    for attr, (current, desired) in actions.get("added", {}).items():
        logger.info("Adding %r", attr)
        # Relax the process a bit, to prevent overwhelming the system
        time.sleep(delay)
        try:
            _add_attribute(
                manager_proxy,
                archiver,
                attr,
                current,
                desired,
                get_attribute_proxy,
                use_old_format,
            )
        except tango.DevFailed as e:
            reason = e.args[0].desc
            logger.error(
                "Error when configuring attribute %r: %s."
                " Try -v flag to debug the issue",
                attr,
                reason,
            )
            # Record the failure first; the debug dump below talks to the
            # manager again and must not be able to lose it.
            failed["add"].append((attr, reason))
            _log_manager_settings(manager_proxy)

    # Removed
    for attr in actions.get("removed", {}):
        # manager_proxy.AttributeStop(attr)  # TODO Not needed?
        if update:
            logger.info(
                "Not removing attribute %s because the"
                " --update flag is active.",
                attr,
            )
            continue
        logger.info("Removing %r", attr)
        try:
            # This may fail e.g. if the attribute doesn't exist.
            manager_proxy.AttributeRemove(attr)
        except tango.DevFailed as e:
            logger.error(
                "Failed to remove attribute %s: %s ", attr, e.args[0].desc
            )
            failed["remove"].append((attr, e.args[0].desc))
        else:
            logger.debug("Attribute %s removed", attr)

    # Changed
    for attr, (current, desired) in actions.get("changed", {}).items():
        logger.info("Changing %r", attr)
        failed["change"].extend(
            _change_attribute(attr, current, desired, get_attribute_proxy)
        )

    return failed