# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# Copyright (c) 2019 National Research Council of Canada
from __future__ import annotations
from functools import partial
from queue import Empty, Full, Queue
from threading import Event, Lock
from typing import Callable, Optional, cast
import tango
from pydantic.v1 import utils
from ska_control_model import (
AdminMode,
CommunicationStatus,
HealthState,
PowerState,
ResultCode,
SimulationMode,
TaskStatus,
)
from ska_tango_base.executor.executor_component_manager import (
TaskExecutorComponentManager,
)
from ska_tango_testing import context
from tango.utils import PyTangoThreadPoolExecutor
# Names exported by ``from <module> import *``.
__all__ = ["CbfComponentManager"]

# Default per-LRC timeout, in seconds, used when waiting for blocking results.
DEFAULT_LRC_TIMEOUT = 15
# Default timeout, in seconds, used when waiting for sub-device state changes.
DEFAULT_STATE_TIMEOUT = 5
class CbfComponentManager(TaskExecutorComponentManager):
    """
    A base component manager for SKA Mid.CBF MCS

    This class exists to modify the interface of the
    :py:class:`ska_tango_base.executor.executor_component_manager.TaskExecutorComponentManager`.
    The ``TaskExecutorComponentManager`` accepts ``max_queue_size`` keyword argument
    to determine limits on worker queue length, for the management of
    SubmittedSlowCommand (LRC) threads.

    Additionally, this provides optional arguments for attribute change event and
    HealthState updates, for a device to pass in its callbacks for push change events.

    Finally, the ``TaskExecutorComponentManager`` inherits from BaseComponentManager,
    which accepts the keyword arguments communication_state_callback and
    component_state_callback, each with an analogous callback method in the
    SKABaseDevice (namely _communication_state_changed and _component_state_changed)
    used to drive the operational state (opState) model from the component manager.
    """

    def __init__(
        self: CbfComponentManager,
        *args: any,
        lrc_timeout: int = DEFAULT_LRC_TIMEOUT,
        state_change_timeout: int = DEFAULT_STATE_TIMEOUT,
        attr_change_callback: Callable[[str, any], None] | None = None,
        attr_archive_callback: Callable[[str, any], None] | None = None,
        health_state_callback: Callable[[HealthState], None] | None = None,
        admin_mode_callback: Callable[[str], None] | None = None,
        simulation_mode: SimulationMode = SimulationMode.TRUE,
        **kwargs: any,
    ) -> None:
        """
        Initialise a new CbfComponentManager instance.

        Any remaining positional and keyword arguments are forwarded to
        ``TaskExecutorComponentManager``. The ``max_queue_size`` keyword, when
        supplied, is expected to match the MAX_QUEUED_COMMANDS of the base
        device class. That constant is also used to limit the dimensions of
        the longRunningCommandsInQueue, longRunningCommandIDsInQueue,
        longRunningCommandStatus and longRunningCommandProgress attributes
        used to track LRCs, which is a current limitation of the
        SKABaseDevice class.

        :param lrc_timeout: timeout (in seconds) per LRC when waiting for blocking
            results; defaults to DEFAULT_LRC_TIMEOUT (15 seconds)
        :param state_change_timeout: timeout (in seconds) when waiting for
            sub-devices' State attribute changes; defaults to
            DEFAULT_STATE_TIMEOUT (5 seconds)
        :param attr_change_callback: callback to be called when
            an attribute change event needs to be pushed from the component manager
        :param attr_archive_callback: callback to be called when
            an attribute archive event needs to be pushed from the component manager
        :param health_state_callback: callback to be called when the
            HealthState of the component changes
        :param admin_mode_callback: optional callback called with an admin mode
            transition name ("to_engineering", "to_online", "to_offline" or
            "to_notfitted") by the start/stop communicating threads
        :param simulation_mode: simulation mode identifies if the real component
            or a simulator should be monitored and controlled; defaults to
            SimulationMode.TRUE
        """
        # supply operating state machine trigger keywords
        super().__init__(
            *args,
            fault=None,
            power=None,
            **kwargs,
        )

        self.device_attr_change_callback = attr_change_callback
        self.device_attr_archive_callback = attr_archive_callback
        self.device_admin_mode_callback = admin_mode_callback

        self._device_health_state_callback = health_state_callback
        self._health_state_lock = Lock()
        self._health_state = HealthState.UNKNOWN

        # Initialize received LRC results queue and set for blocking command IDs
        # See docstring under wait_for_blocking_results for an example scenario
        self._results_queue = None  # instantiated in attr_event_subscribe
        self.blocking_command_ids = set()

        # dict to store latest sub-device state attribute values
        self._op_states = {}
        self._op_states_queue = None  # instantiated in attr_event_subscribe

        # NOTE(review): not used anywhere in this class; presumably reserved
        # for subclasses -- verify before relying on or removing it.
        self._attr_event_lock = Lock()
        # {device FQDN: {attribute name: event ID}}
        self.event_ids = {}

        self._lrc_timeout = lrc_timeout
        self._state_change_timeout = state_change_timeout

        # NOTE: currently all devices are using constructor default
        # simulation_mode == SimulationMode.TRUE
        self.simulation_mode = simulation_mode

    # -------------
    # Communication
    # -------------

    def _start_communicating(
        self: CbfComponentManager, *args, admin_mode: AdminMode, **kwargs
    ) -> None:
        """
        Thread for start_communicating operation.

        Sets the communication state to ESTABLISHED. It then tells the device
        about an ENGINEERING or ONLINE admin mode transition through the admin
        mode callback, if one was provided.

        :param admin_mode: the admin mode being transitioned to
        """
        self.logger.info("Entering CbfComponentManager._start_communicating")
        self._update_communication_state(
            communication_state=CommunicationStatus.ESTABLISHED
        )

        if admin_mode == AdminMode.ENGINEERING:
            transition = "to_engineering"
        elif admin_mode == AdminMode.ONLINE:
            transition = "to_online"
        else:
            return

        # admin_mode_callback is optional (defaults to None in __init__)
        if self.device_admin_mode_callback is not None:
            self.device_admin_mode_callback(transition)

    def start_communicating(
        self: CbfComponentManager,
        current_admin_mode: AdminMode,
        admin_mode: AdminMode,
    ) -> None:
        """
        Establish communication with the component, then start monitoring.

        :param current_admin_mode: the device's admin mode before this request
        :param admin_mode: the requested admin mode
        :raises ValueError: if the admin mode is unchanged and communication is
            already established
        """
        # skip this check if the admin mode is edited.
        if current_admin_mode == admin_mode:
            if self._communication_state == CommunicationStatus.ESTABLISHED:
                raise ValueError("Communications already online, returning...")

        self.logger.info(
            f"Submitting start_communicating task {admin_mode.name}"
        )
        task_status, message = self.submit_task(
            self._start_communicating,
            kwargs={"admin_mode": admin_mode},
        )
        if task_status == TaskStatus.REJECTED:
            self.logger.error(
                f"start_communicating thread rejected; {message}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )

    def _stop_communicating(
        self: CbfComponentManager, *args, admin_mode: AdminMode, **kwargs
    ) -> None:
        """
        Thread for stop_communicating operation.

        For OFFLINE, sets the power state to UNKNOWN and the communication
        state to DISABLED. For OFFLINE and NOT_FITTED, it then tells the device
        about the transition through the admin mode callback, if one was
        provided.

        :param admin_mode: the admin mode being transitioned to
        """
        self.logger.info(
            f"Entering CbfComponentManager._stop_communicating {admin_mode.name}"
        )

        if admin_mode == AdminMode.OFFLINE:
            self._update_component_state(power=PowerState.UNKNOWN)
            self._update_communication_state(
                communication_state=CommunicationStatus.DISABLED
            )
            transition = "to_offline"
        elif admin_mode == AdminMode.NOT_FITTED:
            transition = "to_notfitted"
        else:
            return

        # admin_mode_callback is optional (defaults to None in __init__)
        if self.device_admin_mode_callback is not None:
            self.device_admin_mode_callback(transition)

    def stop_communicating(
        self: CbfComponentManager,
        current_admin_mode: AdminMode,
        admin_mode: AdminMode,
    ) -> None:
        """
        Stop communication with the component

        :param current_admin_mode: the device's admin mode before this request
        :param admin_mode: the requested admin mode
        :raises ValueError: if the admin mode is unchanged, NOT_FITTED is
            requested and communication is still established
        """
        # skip this check if the admin mode is edited.
        if current_admin_mode == admin_mode:
            if admin_mode == AdminMode.OFFLINE:
                if self._communication_state == CommunicationStatus.DISABLED:
                    self.logger.warning("Communications already offline.")
            elif admin_mode == AdminMode.NOT_FITTED:
                if (
                    self._communication_state
                    == CommunicationStatus.ESTABLISHED
                ):
                    raise ValueError(
                        "Communications are online; must be set offline first to set component not fitted"
                    )

        self.logger.info(
            f"Submitting stop_communicating task: {admin_mode.name}"
        )
        task_status, message = self.submit_task(
            self._stop_communicating,
            kwargs={"admin_mode": admin_mode},
        )
        if task_status == TaskStatus.REJECTED:
            self.logger.error(
                f"stop_communicating {admin_mode.name} thread rejected; {message}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )

    def task_abort_event_is_set(
        self: CbfComponentManager,
        command_name: str,
        task_callback: Callable,
        task_abort_event: Event,
    ) -> bool:
        """
        Helper method for checking task abort event during command thread.

        :param command_name: name of command for result message
        :param task_callback: command tracker update_command_info callback
        :param task_abort_event: task executor abort event

        :return: True if abort event is set, otherwise False
        """
        if task_abort_event.is_set():
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    f"{command_name} command aborted by task executor abort event.",
                ),
            )
            return True
        return False

    # -------------
    # Group Methods
    # -------------

    def create_group_proxies(
        self: CbfComponentManager, group_proxies: dict
    ) -> bool:
        """
        Create group proxies (list of DeviceProxy) from the lists of FQDNs passed in.
        Store each list as an instance attribute.

        :param group_proxies: mapping of instance attribute name to an iterable
            of device FQDNs; each attribute is set to a list of DeviceProxy
            objects, one per FQDN
        :return: True if the group proxies are successfully created, False otherwise.
            On failure, groups processed before the failing one remain set.
        """
        for group, fqdn in group_proxies.items():
            try:
                setattr(
                    self,
                    group,
                    [
                        context.DeviceProxy(device_name=device)
                        for device in fqdn
                    ],
                )
            except tango.DevFailed as df:
                self.logger.error(f"Failure in connection to {fqdn}: {df}")
                return False
        return True

    def _issue_command_thread(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        argin: any,
        command_name: str,
    ) -> any:
        """
        Helper function to issue command to a DeviceProxy

        :param proxy: proxy target for command
        :param argin: optional command argument; omitted from the call if None
        :param command_name: command to be issued
        :return: command result (if any), or a (ResultCode.FAILED, message)
            tuple if the command raised tango.DevFailed
        """
        try:
            if argin is None:
                return proxy.command_inout(command_name)
            return proxy.command_inout(command_name, argin)
        except tango.DevFailed as df:
            return (
                ResultCode.FAILED,
                f"Error issuing {command_name} command; {df}",
            )

    def issue_group_command(
        self: CbfComponentManager,
        command_name: str,
        proxies: list[tango.DeviceProxy],
        max_workers: int,
        argin: any = None,
    ) -> list[any]:
        """
        Helper function to perform tango.Group-like threaded command issuance.
        Returns list of command results in the same order as the input proxies list.
        If any command causes a tango.DevFailed exception, the result code for
        that device's return value will be ResultCode.FAILED.

        Important note: all proxies provided must be of the same device type.

        For fast commands, the return value will be a list of ResultCode and
        message string tuples.
        For Long Running Commands, the return value will be a list of ResultCode
        and unique command ID tuples.

        :param command_name: name of command to be issued
        :param proxies: list of device proxies in group; determines ordering of
            return values
        :param max_workers: maximum number of PyTangoThreadPoolExecutor workers
        :param argin: optional command argument, defaults to None
        :return: list of proxy command returns
        """
        with PyTangoThreadPoolExecutor(max_workers=max_workers) as executor:
            # Executor.map yields results in input order
            return list(
                executor.map(
                    partial(
                        self._issue_command_thread,
                        argin=argin,
                        command_name=command_name,
                    ),
                    proxies,
                )
            )

    def _read_attribute_thread(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        attr_name: str,
    ) -> any:
        """
        Helper function to read attribute from a DeviceProxy

        :param proxy: proxy target for read_attribute
        :param attr_name: name of attribute to be read
        :return: result of proxy.read_attribute, or None if the read raised
            tango.DevFailed (the error is logged)
        """
        # Indicate that this thread performs Tango operations
        with tango.EnsureOmniThread():
            try:
                return proxy.read_attribute(attr_name)
            except tango.DevFailed as df:
                self.logger.error(f"Error reading attribute {attr_name}; {df}")
                return None

    def _read_group_attribute(
        self: CbfComponentManager,
        attr_name: str,
        proxies: list[tango.DeviceProxy],
        max_workers: int,
    ) -> list[any]:
        """
        Helper function to perform tango.Group-like threaded read_attribute().
        Returns list of attribute values in the same order as the input proxies list.
        If any read causes a tango.DevFailed exception, that device's entry in
        the returned list will be None.

        Important note: all proxies provided must be of the same device type.

        :param attr_name: name of attribute to be read
        :param proxies: list of device proxies in group; determines ordering of
            return values
        :param max_workers: maximum number of PyTangoThreadPoolExecutor workers
        :return: list of proxy attribute values
        """
        with PyTangoThreadPoolExecutor(max_workers=max_workers) as executor:
            return list(
                executor.map(
                    partial(self._read_attribute_thread, attr_name=attr_name),
                    proxies,
                )
            )

    def _write_attribute_thread(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        attr_name: str,
        value: any,
    ) -> bool:
        """
        Helper function to write an attribute on a DeviceProxy

        :param proxy: proxy target for write_attribute
        :param attr_name: name of attribute to be written
        :param value: attribute value to be written
        :return: True if the write succeeded, False if it raised
            tango.DevFailed (the error is logged)
        """
        # Indicate that this thread performs Tango operations
        with tango.EnsureOmniThread():
            try:
                proxy.write_attribute(attr_name, value)
                return True
            except tango.DevFailed as df:
                self.logger.error(
                    f"Error writing {value} to attribute {attr_name}; {df}"
                )
                return False

    def _write_group_attribute(
        self: CbfComponentManager,
        attr_name: str,
        value: any,
        proxies: list[tango.DeviceProxy],
        max_workers: int,
    ) -> bool:
        """
        Helper function to perform tango.Group-like threaded write_attribute().
        Returns a bool depending on each device's write_attribute success;
        True if all writes were successful, False otherwise.

        Important note: all proxies provided must be of the same device type.

        :param attr_name: name of attribute to be written
        :param value: attribute value to be written
        :param proxies: list of device proxies to write to
        :param max_workers: maximum number of PyTangoThreadPoolExecutor workers
        :return: True if every write succeeded, otherwise False
        """
        with PyTangoThreadPoolExecutor(max_workers=max_workers) as executor:
            # Materialize all results first so every write is attempted
            # (all() on the lazy iterator would otherwise short-circuit).
            results = list(
                executor.map(
                    partial(
                        self._write_attribute_thread,
                        attr_name=attr_name,
                        value=value,
                    ),
                    proxies,
                )
            )
        return all(results)

    # ----------------
    # Callback Methods
    # ----------------

    def _push_health_state_update(
        self: CbfComponentManager, health_state: HealthState
    ) -> None:
        """
        Push a health state update to the device.

        :param health_state: the new health state of the component manager.
        """
        if self._device_health_state_callback is not None:
            self._device_health_state_callback(health_state)

    def update_device_health_state(
        self: CbfComponentManager,
        health_state: HealthState,
    ) -> None:
        """
        Handle a health state change.
        This is a helper method for use by subclasses.

        The device callback is only invoked when the value actually changes.

        :param health_state: the new health state of the component manager.
        """
        with self._health_state_lock:
            if self._health_state != health_state:
                self.logger.info(f"Updating healthState to {health_state}")
                self._health_state = health_state
                self._push_health_state_update(self._health_state)

    @staticmethod
    def _enqueue_event(
        event_queue: Optional[Queue], event_data: Optional[tango.EventData]
    ) -> None:
        """
        Put event data on a bounded queue without blocking, discarding the
        oldest queued item(s) to make room when the queue is full.

        Event callbacks must not block (see CIP-3559), so put_nowait is used
        rather than a blocking put.

        :param event_queue: destination queue; if None (no active event
            subscriptions), the event is dropped
        :param event_data: Tango event data to enqueue
        """
        if event_queue is None:
            return
        while True:
            try:
                event_queue.put_nowait(event_data)
                return
            except Full:
                # Start dropping data if queue is full
                try:
                    event_queue.get_nowait()
                    event_queue.task_done()
                except Empty:
                    # A consumer drained the queue concurrently; retry the put
                    pass

    def results_callback(
        self: CbfComponentManager, event_data: Optional[tango.EventData]
    ) -> None:
        """
        Callback for LRC command result events.

        All subdevices that may block our LRC thread with their own LRC execution
        have their `longRunningCommandResult` attribute subscribed to with this method
        as the change event callback.

        :param event_data: Tango attribute change event data
        """
        # NOTE (CIP-3559): event callbacks must be as short as possible
        # Here we simply queue event data and handle possible exceptions
        self._enqueue_event(self._results_queue, event_data)

    def op_state_callback(
        self: CbfComponentManager, event_data: Optional[tango.EventData]
    ) -> None:
        """
        Callback for state attribute events.

        :param event_data: Tango attribute change event data
        """
        # NOTE (CIP-3559): event callbacks must be as short as possible
        # Here we simply queue event data and handle possible exceptions
        self._enqueue_event(self._op_states_queue, event_data)

    # -------------------------
    # Wait for Blocking Results
    # -------------------------

    def wait_for_blocking_results(
        self: CbfComponentManager,
        task_abort_event: Optional[Event] = None,
        partial_success: bool = False,
        lrc_timeout: Optional[int] = None,
    ) -> TaskStatus:
        """
        Wait for the number of anticipated results to be pushed by subordinate devices.

        When issuing an LRC (or multiple) on subordinate devices from an LRC thread,
        command result events will be stored in self._results_queue; use this
        method to wait for all blocking command ID `longRunningCommandResult` events.

        All subdevices that may block our LRC thread with their own LRC execution
        have the `results_callback` method above provided as the change event callback
        for their `longRunningCommandResult` attribute subscription, which will store
        command IDs and results as change events are received.

        Results for command IDs not in self.blocking_command_ids are consumed
        from the queue and discarded. On timeout or abort, unmatched IDs are
        left in self.blocking_command_ids.

        :param task_abort_event: Check for abort, defaults to None
        :param partial_success: set to True if we only need at least 1 of the blocking
            LRCs to be successful; defaults to False, in which case all LRCs must succeed
        :param lrc_timeout: timeout in seconds, applied to each wait for the next
            queued event. Use device default value if not provided.

        :return: TaskStatus.COMPLETED if status reached, TaskStatus.FAILED if timed out
            or a blocking command failed, TaskStatus.ABORTED if aborted
        """
        successes = []
        self.logger.debug(
            f"wait_for_blocking_results invoked; blocking command IDs: {self.blocking_command_ids}; results queue size: {self._results_queue.qsize()}"
        )

        if lrc_timeout is None:
            lrc_timeout = self._lrc_timeout

        # Loop exits when no blocking command IDs remain, or upon timeout when
        # trying to get new results from the queue
        while self.blocking_command_ids:
            if task_abort_event and task_abort_event.is_set():
                self.logger.warning(
                    "Task aborted while waiting for blocking results."
                )
                return TaskStatus.ABORTED

            try:
                event_data = self._results_queue.get(
                    block=True, timeout=lrc_timeout
                )
                self._results_queue.task_done()
            except Empty:
                self.logger.error(
                    f"Event data queue is still empty after {lrc_timeout} seconds, blocking command IDs remaining: {self.blocking_command_ids}"
                )
                successes.append(False)
                break

            # Check event error and quality factor first
            if event_data.err:
                self.logger.error(f"Event error: {event_data.errors}")
                continue

            # Compare enum values by equality, not identity
            if event_data.attr_value.quality == tango.AttrQuality.ATTR_INVALID:
                self.logger.error(
                    f"Event quality factor invalid: {event_data}"
                )
                continue

            # Value cannot be None if quality factor is not ATTR_INVALID
            # Now we can handle the actual event data
            value = tuple(event_data.attr_value.value)
            if "" in value:
                self.logger.debug(
                    f"Skipping empty event; event data: {event_data}"
                )
                continue

            # longRunningCommandResult attribute format: ("command_id", "[result_code, message]")
            try:
                command_id = value[0]
                result = value[1]
                result_code = int(result.split(",")[0].split("[")[1])
            except (IndexError, ValueError) as err:
                # ValueError: result code field was not an integer
                self.logger.error(
                    f"{type(err).__name__} in parsing EventData; {err}"
                )
                continue

            dev_name = event_data.device.dev_name()
            self.logger.debug(
                f"{dev_name} valid LRC event received: {command_id}, {result}"
            )

            # Remove any successfully parsed results from received events
            if command_id in self.blocking_command_ids:
                self.blocking_command_ids.remove(command_id)
                if result_code != ResultCode.OK:
                    self.logger.error(
                        f"{dev_name} blocking command failure; {command_id}: {result}"
                    )
                    successes.append(False)
                    continue
                self.logger.debug(
                    f"{dev_name} blocking command completed OK: {command_id}, {result}"
                )
                successes.append(True)

        # Note: all() of an empty list is True, so no blocking IDs => COMPLETED
        if all(successes):
            self.logger.debug("All blocking commands succeeded.")
            return TaskStatus.COMPLETED

        if any(successes):
            if partial_success:
                self.logger.debug(
                    "Partial success enabled, some blocking commands succeeded."
                )
                return TaskStatus.COMPLETED
            self.logger.error(
                "Partial success disabled, some blocking commands failed."
            )
            return TaskStatus.FAILED

        self.logger.error("All blocking commands failed.")
        return TaskStatus.FAILED

    # ----------------
    # Op State Changes
    # ----------------

    def wait_for_blocking_states(
        self: CbfComponentManager,
        target_state: tango.DevState,
        task_abort_event: Optional[Event] = None,
    ) -> bool:
        """
        Wait for the number of anticipated state changes to be pushed by subordinate devices.

        State events are consumed until none arrives within the state change
        timeout. The latest recorded state of every device monitored at call
        time is then compared against the target.

        :param target_state: target state to wait for in all monitored op states
        :param task_abort_event: Check for abort, defaults to None

        :return: True if all state changes were successfully recorded, otherwise False
        """
        blocking_devices = set(self._op_states.keys())
        self.logger.debug(
            f"wait_for_blocking_states invoked; devices monitored: {blocking_devices}; target state: {target_state}"
        )

        # Loop exits upon timeout when no further new states available in queue
        while True:
            if task_abort_event and task_abort_event.is_set():
                self.logger.warning(
                    "wait_for_blocking_states aborted while waiting for blocking state changes."
                )
                return False

            try:
                event_data = self._op_states_queue.get(
                    block=True, timeout=self._state_change_timeout
                )
                self._op_states_queue.task_done()
            except Empty:
                break

            # Check event error and quality factor first
            if event_data.err:
                self.logger.error(f"Event error: {event_data.errors}")
                continue

            # Compare enum values by equality, not identity
            if event_data.attr_value.quality == tango.AttrQuality.ATTR_INVALID:
                self.logger.error(
                    f"Event quality factor invalid: {event_data}"
                )
                continue

            value = event_data.attr_value.value
            dev_name = event_data.device.dev_name()
            self.logger.info(
                f"Received new value for {dev_name}/state: {value}"
            )
            self._op_states[dev_name] = value

        # Devices still blocking are those whose latest state is not the target
        blocking_devices = {
            dev_name
            for dev_name in blocking_devices
            if self._op_states.get(dev_name) != target_state
        }

        if blocking_devices:
            self.logger.error(
                f"The following devices failed to transition to {target_state}: {blocking_devices}"
            )
            return False

        self.logger.info(
            f"All monitored devices successfully transitioned to {target_state}."
        )
        return True

    # -----------------------
    # Subscription Management
    # -----------------------

    def attr_event_subscribe(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        attr_name: str,
        callback: Callable,
        stateless: bool = True,
    ) -> None:
        """
        Subscribe to a given proxy's attribute.

        Repeated subscriptions to the same device/attribute pair are skipped.

        :param proxy: DeviceProxy
        :param attr_name: name of attribute for change event subscription
        :param callback: change event callback
        :param stateless: If False, an exception will be thrown if the event subscription
            encounters a problem; if True, the event subscription will always succeed,
            even if the corresponding device server is not running
        :raises tango.DevFailed: if the subscription fails (only possible when
            ``stateless`` is False)
        """
        # Initialize queues if subscribing to first event
        # NOTE: Queue max sizes hardcoded to prevent memory leak
        # Should hopefully be plenty
        if len(self.event_ids) == 0:
            self._op_states_queue = Queue(maxsize=256)
            self._results_queue = Queue(maxsize=1024)

        dev_name = proxy.dev_name()
        if attr_name in self.event_ids.get(dev_name, {}):
            self.logger.debug(
                f"Skipping repeated {attr_name} event subscription: {dev_name}"
            )
            return

        self.logger.debug(f"Subscribing to {dev_name}/{attr_name}")

        # Set latest stored sub-device state values to None prior to subscription,
        # so an initial event delivered during subscribe_event is not overwritten
        if attr_name == "state":
            self._op_states[dev_name] = None

        try:
            event_id = proxy.subscribe_event(
                attr_name=attr_name,
                event_type=tango.EventType.CHANGE_EVENT,
                cb_or_queuesize=callback,
                stateless=stateless,
            )
        except tango.DevFailed:
            # Don't leave a placeholder for a device we never subscribed to;
            # it would never reach a target state in wait_for_blocking_states.
            if attr_name == "state":
                self._op_states.pop(dev_name, None)
            raise

        self.event_ids = utils.deep_update(
            self.event_ids, {dev_name: {attr_name: event_id}}
        )
        self.logger.debug(f"Event IDs: {self.event_ids}")

    def unsubscribe_all_events(
        self: CbfComponentManager, proxy: tango.DeviceProxy
    ) -> None:
        """
        Unsubscribe from all of a proxy's event subscriptions.

        Also drops the device's recorded op state (if it had a "state"
        subscription) and clears the event queues once no subscriptions remain.

        :param proxy: DeviceProxy
        """
        dev_name = proxy.dev_name()
        self.logger.debug(f"Event IDs: {self.event_ids}")

        dev_events = self.event_ids.pop(dev_name, None)
        if dev_events is None:
            self.logger.debug(f"No event subscriptions for {dev_name}")
            return

        for attr_name, event_id in dev_events.items():
            self.logger.debug(
                f"Unsubscribing from {dev_name}/{attr_name} event ID {event_id}"
            )
            proxy.unsubscribe_event(event_id)
            if attr_name == "state":
                self._op_states.pop(dev_name, None)

        # Clear queues if no longer subscribed
        if len(self.event_ids) == 0:
            self._op_states_queue = None
            self._results_queue = None

    @property
    def is_communicating(self: CbfComponentManager) -> bool:
        """
        Return whether communication with the component is established.

        SKA Mid.CBF MCS uses the more expressive :py:attr:`communication_status`
        for this, but this is still needed as a base classes hook.

        :return: True if communication with the component is established, else False.
        """
        if self.communication_state == CommunicationStatus.ESTABLISHED:
            return True
        self.logger.warning(
            f"is_communicating() check failed; current communication_state: {self.communication_state}"
        )
        return False

    @property
    def power_state(self: CbfComponentManager) -> Optional[PowerState]:
        """
        Return the power state of this component manager.

        :return: the power state of this component manager, if known.
        """
        return self._component_state["power"]

    @property
    def faulty(self: CbfComponentManager) -> Optional[bool]:
        """
        Return whether this component manager is currently experiencing a fault.

        :return: True if this component manager is currently experiencing a fault, else False.
        """
        return cast(bool, self._component_state["fault"])