Source code for yaml2archiving.v1.action
import logging
import sys
import time
from copy import copy
from typing import Dict, Tuple
import tango # type: ignore
from . import ARCHIVING_PARAMS, AttributeConfig, get_attribute_configs
if sys.version_info >= (3, 8):
from typing import TypedDict
else:
from typing_extensions import TypedDict
logger = logging.getLogger(__name__)
[docs]
def get_safe_params(
attr: str, old: AttributeConfig, new: AttributeConfig
) -> AttributeConfig:
"""Do some checks on the given params and return a 'safe' version"""
old_polling = old.get("polling_period")
new_polling = new.get("polling_period")
config = copy(new)
# Check that polling is valid
if old_polling and new_polling and new_polling > old_polling:
# We are careful not to slow down polling automatically, since it could
# be that it needs to be set to what it is for external reasons. We leave
# it to the user to manually tweak polling in that case.
logger.warning(
"Polling for %r would be slowed down from %d to %d;"
" not changing.",
attr,
old_polling,
new_polling,
)
config["polling_period"] = old_polling
if old_polling and not new_polling:
logger.warning("Not turning off polling for %r.", attr)
config["polling_period"] = old_polling
return config
Actions = TypedDict(
"Actions",
{
"added": Dict[str, Tuple[AttributeConfig, AttributeConfig]],
"removed": Dict[str, AttributeConfig],
"changed": Dict[str, Tuple[AttributeConfig, AttributeConfig]],
},
total=True,
)
[docs]
def get_actions(
prev_attrs: Dict[str, AttributeConfig],
next_attrs: Dict[str, AttributeConfig],
use_old_format: bool = False,
) -> Actions:
"""
Takes "previous" and "next" attribute configurations,
then produces a list of actions needed to go from the first situation
to the second. I.e. additions, removals and changes.
"""
if use_old_format:
removed = {
attr: prev_attrs[attr]
for attr in set(prev_attrs) - set(next_attrs)
}
else:
removed = {}
added = {}
new_attrs = set(next_attrs) - set(prev_attrs)
logger.debug("New attr %s", new_attrs)
logger.debug("Prev attr %s", prev_attrs)
logger.debug("Removed %s", removed)
db = tango.Database()
new_attr_configs = get_attribute_configs(db, list(new_attrs))
for attr, current in new_attr_configs.items():
added[attr] = (
current,
get_safe_params(attr, current, next_attrs[attr]),
)
maybe_changed = {
attr: (
prev_attrs[attr],
get_safe_params(attr, prev_attrs[attr], next_attrs[attr]),
)
for attr, params in next_attrs.items()
if attr not in added
}
# Remove empty changes
changed = {
attr: (a, b) for attr, (a, b) in maybe_changed.items() if a != b
}
return {
"added": added,
"removed": removed,
"changed": changed,
}
[docs]
def show_actions(actions: Actions, update: bool = False):
"""Return some actions formatted in a human readable way."""
output = []
added = 0
changed = 0
removed = 0
skipped_remove = 0 # Only may happen when update is True!
for attr, (current, desired) in actions.get("added", {}).items():
output.append(f"ADD {attr}")
added += 1
for param in ARCHIVING_PARAMS:
if param in current or param in desired:
if current.get(param) is None and desired.get(param) is None:
continue
output.append(
f"\t{param}: {current.get(param)} -> {desired.get(param)}"
)
for attr, (current, desired) in actions.get("changed", {}).items():
output.append(f"CHANGE {attr}")
changed += 1
for param in ARCHIVING_PARAMS:
if param in current or param in desired:
if current.get(param) == desired.get(param):
continue
output.append(
f"\t{param}: {current.get(param)} -> {desired.get(param)}"
)
for attr, _ in actions.get("removed", {}).items():
if update:
output.append(f"SKIP REMOVE {attr}\n\tCause: --update flag used")
skipped_remove += 1
else:
output.append(f"REMOVE {attr}")
removed += 1
output.append("Totals:")
output.append(f"- Added: {added}")
output.append(f"- Changed: {changed}")
if update:
output.append(f"- Removed: {removed} ({skipped_remove} skipped)")
else:
output.append(f"- Removed: {removed}")
return output
[docs]
def get_archiving_settings(manager: tango.DeviceProxy):
"""
Return the current "set attribute" values for the manager as a string.
These are the parameters filled in before running "AddAttribute".
Can be useful for debugging issues.
"""
attr_list = [
"SetArchiver",
"SetAttributeName",
"SetCodePushedEvent",
"SetPeriodEvent",
"SetPollingPeriod",
"SetAbsoluteEvent",
"SetRelativeEvent",
]
print_buff = []
print_buff.append("Dumping all settings for the current attribute")
for attr in attr_list:
print_buff.append(
f"\t\t\t\t\t * {attr} read: {manager.read_attribute(attr).value},"
f" write {manager.read_attribute(attr).w_value}"
)
return "\n".join(print_buff)
[docs]
def perform_actions(
manager: str,
archiver: str,
actions,
update=False,
delay=0,
get_device_proxy=tango.DeviceProxy,
get_attribute_proxy=tango.AttributeProxy,
use_old_format=False,
):
"""Take a bunch of "actions" and apply them to the control system."""
try:
manager_proxy = get_device_proxy(manager)
manager_proxy.ping()
except tango.DevFailed as e:
logger.fatal(
"Can't connect to the archiving configuration manager %r: %s",
manager,
e.args[-1].desc,
)
raise RuntimeError("Could not contact manager") from e
failed: dict[str, list[tuple[str, str]]] = {
"add": [],
"remove": [],
"change": [],
}
# Added
for attr, (current, desired) in actions.get("added", {}).items():
logging.info("Adding %r", attr)
# Relax the process a bit, to prevent overwhelming the system
time.sleep(delay)
# Adding an attribute to archiving is done with the manager device.
# The settings are written to various attributes on that device,
# and then a command is run to apply.
parameters_to_unset = set()
try:
# Archiver
if (
not use_old_format
and archiver not in manager_proxy.ArchiverList
):
manager_proxy.ArchiverAdd(archiver)
manager_proxy.SetArchiver = archiver
logging.info("SetArchiver: %r for %s", archiver, attr)
# Attribute Name
# TODO this fails if the device is not running?
manager_proxy.SetAttributeName = attr
# Relative event filter
value = desired.get("archive_rel_change", 0)
manager_proxy.SetRelativeEvent = value
logging.debug("SetRelativeEvent: %r for %s", value, attr)
if value == 0:
parameters_to_unset.add("archive_rel_change")
# Absolute event filter
value = desired.get("archive_abs_change", 0)
manager_proxy.SetAbsoluteEvent = value
logging.debug("SetAbsoluteEvent: %r for %s", value, attr)
if value == 0:
parameters_to_unset.add("archive_abs_change")
# Periodic event filter
value = desired.get("archive_period", 0)
manager_proxy.SetPeriodEvent = value
logging.debug("SetPeriodEvent: %r for %s", value, attr)
if value == 0:
parameters_to_unset.add("archive_period")
# Polling period
polling_period = int(desired.get("polling_period", 0))
current_polling_period = current.get("polling_period", None)
logging.debug(
"Current polling period for %r: %r",
attr,
current_polling_period,
)
if polling_period != 0:
manager_proxy.SetPollingPeriod = polling_period
logging.debug(
"SetPollingPeriod: %r for %s", polling_period, attr
)
# CodePushedEvent
# If we dont have any polling set, then we assume that code pushes.
code_push_event = polling_period == 0
manager_proxy.SetCodePushedEvent = code_push_event
logging.debug(
"SetCodePushedEvent: %r for %s", code_push_event, attr
)
# Strategy
value = desired.get(
"archive_strategy", "ALWAYS"
) # TODO needed to default here?
manager_proxy.SetStrategy = value
logging.debug("SetStrategy: %r for %s", value, attr)
# Finally, add attribute
manager_proxy.AttributeAdd()
logging.debug("Attribute: %s added", attr)
# Note: setting the value to 0 means that the manager
# won't touch the parameter
# on the device. So if we want to unset it,
# we'll have to do it manually.
if parameters_to_unset:
logging.debug(
"Unsetting parameters %r on %r", parameters_to_unset, attr
)
attr_proxy = get_attribute_proxy(attr)
attr_config = attr_proxy.get_config()
for param in parameters_to_unset:
setattr(
attr_config.events.arch_event, param, "Not specified"
)
attr_proxy.set_config(attr_config)
except tango.DevFailed as e:
logging.error(
"Error when configuring attribute %r: %s."
+ " Try -v flag to debug the issue",
attr,
e.args[0].desc,
)
# dump current Archiver settings (for debugging)
logging.debug(get_archiving_settings(manager_proxy))
failed["add"].append((attr, e.args[0].desc))
# Removed
for attr, _ in actions.get("removed", {}).items():
# manager_proxy.AttributeStop(attr) # TODO Not needed?
logging.info("Removing %r", attr)
if update:
logging.info(
"Not removing attribute %s because the"
" --update flag is active.",
attr,
)
continue
try:
# This may fail e.g. if the attribute doesn't exist.
manager_proxy.AttributeRemove(attr)
except tango.DevFailed as e:
logging.error(
"Failed to remove attribute %s: %s ", attr, e.args[0].desc
)
failed["remove"].append((attr, e.args[0].desc))
else:
logging.debug(f"Attribute {attr} removed")
# Changed
for attr, (current, desired) in actions.get("changed", {}).items():
# Changing archiving settings must be done on the attribute itself
logging.info("Changing %r", attr)
try:
attr_proxy = get_attribute_proxy(attr)
attr_config = attr_proxy.get_config()
except tango.DevFailed as e:
logging.error(
"Could not get config for attribute %r: %s",
attr,
e.args[0].desc,
)
failed["change"].append((attr, e.args[0].desc))
continue
attr_config_changed = False
for param in ARCHIVING_PARAMS:
if current.get(param) == desired.get(param):
continue
if param in current or param in desired:
old_value = current.get(param)
new_value = desired.get(param) or 0
logger.debug(
"Attribute %s %s %r -> %r",
attr,
param,
old_value,
new_value,
)
if param == "polling_period":
try:
attr_proxy.poll(new_value)
except tango.DevFailed:
logger.exception(
"Failed to set polling on %r to %r",
attr,
new_value,
)
elif param == "archive_abs_change":
attr_config.events.arch_event.archive_abs_change = str(
new_value
)
attr_config_changed = True
elif param == "archive_rel_change":
attr_config.events.arch_event.archive_rel_change = str(
new_value
)
attr_config_changed = True
elif param == "archive_period":
attr_config.events.arch_event.archive_period = str(
new_value
)
attr_config_changed = True
else:
logger.warning(
"Not changing parameter %r for attribute %r.",
param,
attr,
)
if attr_config_changed:
try:
logger.debug("Writing config to %r", attr)
attr_proxy.set_config(attr_config)
except tango.DevFailed as e:
logging.exception(
"Failed to write config %r to attribute %r: %s",
attr_config,
attr,
e.args[-1].desc,
)
failed["change"].append((attr, e.args[0].desc))
return failed