Source code for ska_mid_cbf_tdc_mcs.component.component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

# Copyright (c) 2019 National Research Council of Canada

from __future__ import annotations

from functools import partial
from queue import Empty, Full, Queue
from threading import Event, Lock
from typing import Any, Callable, Optional, cast

import tango
from pydantic.v1 import utils
from ska_control_model import (
    AdminMode,
    CommunicationStatus,
    HealthState,
    PowerState,
    ResultCode,
    SimulationMode,
    TaskStatus,
)
from ska_tango_base.executor.executor_component_manager import (
    TaskExecutorComponentManager,
)
from ska_tango_testing import context
from tango.utils import PyTangoThreadPoolExecutor

__all__ = ["CbfComponentManager"]

DEFAULT_LRC_TIMEOUT = 15
DEFAULT_STATE_TIMEOUT = 5


class CbfComponentManager(TaskExecutorComponentManager):
    """
    A base component manager for SKA Mid.CBF MCS

    This class exists to modify the interface of the
    :py:class:`ska_tango_base.executor.executor_component_manager.TaskExecutorComponentManager`.
    The ``TaskExecutorComponentManager`` accepts ``max_queue_size`` keyword
    argument to determine limits on worker queue length, for the management
    of SubmittedSlowCommand (LRC) threads.

    Additionally, this provides optional arguments for attribute change event
    and HealthState updates, for a device to pass in its callbacks for push
    change events.

    Finally, the ``TaskExecutorComponentManager`` inherits from
    BaseComponentManager, which accepts the keyword arguments
    communication_state_callback and component_state_callback, each with an
    analogous callback method in the SKABaseDevice (namely
    _communication_state_changed and _component_state_changed) used to drive
    the operational state (opState) model from the component manager.
    """

    def __init__(
        self: CbfComponentManager,
        *args: Any,
        lrc_timeout: int = DEFAULT_LRC_TIMEOUT,
        state_change_timeout: int = DEFAULT_STATE_TIMEOUT,
        attr_change_callback: Callable[[str, Any], None] | None = None,
        attr_archive_callback: Callable[[str, Any], None] | None = None,
        health_state_callback: Callable[[HealthState], None] | None = None,
        admin_mode_callback: Callable[[str], None] | None = None,
        simulation_mode: SimulationMode = SimulationMode.TRUE,
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new CbfComponentManager instance.

        max_queue_size of the parent is set to match the MAX_QUEUED_COMMANDS
        of the base device class, as this constant is also used to limit the
        dimensions of the longRunningCommandsInQueue,
        longRunningCommandIDsInQueue, longRunningCommandStatus and
        longRunningCommandProgress attributes used to track LRCs, a current
        limitation of the SKABaseDevice class.

        NOTE(review): this constructor does not pass max_queue_size itself;
        it only reaches the parent if the caller supplies it in kwargs.
        Confirm the paragraph above is still accurate.

        :param lrc_timeout: timeout (in seconds) per LRC when waiting for
            blocking results; defaults to DEFAULT_LRC_TIMEOUT (15 seconds)
        :param state_change_timeout: timeout (in seconds) when waiting for
            Devices state attribute change; defaults to
            DEFAULT_STATE_TIMEOUT (5 seconds)
        :param attr_change_callback: callback to be called when an attribute
            change event needs to be pushed from the component manager
        :param attr_archive_callback: callback to be called when an attribute
            archive event needs to be pushed from the component manager
        :param health_state_callback: callback to be called when the
            HealthState of the component changes
        :param admin_mode_callback: callback called with the admin mode
            transition name ("to_engineering", "to_online", "to_offline" or
            "to_notfitted") from the start/stop communicating threads
        :param simulation_mode: simulation mode identifies if the real
            component or a simulator should be monitored and controlled;
            defaults to SimulationMode.TRUE
        """
        # supply operating state machine trigger keywords
        super().__init__(
            *args,
            fault=None,
            power=None,
            **kwargs,
        )

        self.device_attr_change_callback = attr_change_callback
        self.device_attr_archive_callback = attr_archive_callback
        self.device_admin_mode_callback = admin_mode_callback
        self._device_health_state_callback = health_state_callback
        self._health_state_lock = Lock()
        self._health_state = HealthState.UNKNOWN

        # Initialize received LRC results queue and set for blocking command
        # IDs; see docstring under wait_for_blocking_results
        self._results_queue = None  # instantiated in attr_event_subscribe
        self.blocking_command_ids = set()

        # dict to store latest sub-device state attribute values
        self._op_states = {}
        self._op_states_queue = None  # instantiated in attr_event_subscribe

        self._attr_event_lock = Lock()
        self.event_ids = {}

        self._lrc_timeout = lrc_timeout
        self._state_change_timeout = state_change_timeout

        # NOTE: currently all devices are using constructor default
        # simulation_mode == SimulationMode.TRUE
        self.simulation_mode = simulation_mode

    # -------------
    # Communication
    # -------------

    def _push_admin_mode_transition(
        self: CbfComponentManager, action: str
    ) -> None:
        """
        Forward an admin mode transition to the device callback, if provided.

        The callback is optional in the constructor, so it is guarded here in
        the same way as the health state callback.

        :param action: admin mode transition name, e.g. "to_online"
        """
        if self.device_admin_mode_callback is None:
            self.logger.warning(
                f"No admin mode callback provided; skipping {action}"
            )
            return
        self.device_admin_mode_callback(action)

    def _start_communicating(
        self: CbfComponentManager, *args, admin_mode: AdminMode, **kwargs
    ) -> None:
        """
        Thread for start_communicating operation.

        Sets the communication state to ESTABLISHED, then notifies the device
        of the admin mode transition for ENGINEERING or ONLINE.

        :param admin_mode: the admin mode being transitioned to
        """
        self.logger.info("Entering CbfComponentManager._start_communicating")
        self._update_communication_state(
            communication_state=CommunicationStatus.ESTABLISHED
        )
        if admin_mode == AdminMode.ENGINEERING:
            self._push_admin_mode_transition("to_engineering")
        elif admin_mode == AdminMode.ONLINE:
            self._push_admin_mode_transition("to_online")

    def start_communicating(
        self: CbfComponentManager,
        current_admin_mode: AdminMode,
        admin_mode: AdminMode,
    ) -> None:
        """
        Establish communication with the component, then start monitoring.

        :param current_admin_mode: the admin mode prior to this request
        :param admin_mode: the requested admin mode
        :raises ValueError: if the admin mode is unchanged and communication
            is already established
        """
        # skip this check if the admin mode is edited.
        if current_admin_mode == admin_mode:
            if self._communication_state == CommunicationStatus.ESTABLISHED:
                raise ValueError("Communications already online, returning...")

        self.logger.info(
            f"Submitting start_communicating task {admin_mode.name}"
        )
        task_status, message = self.submit_task(
            self._start_communicating,
            kwargs={"admin_mode": admin_mode},
        )
        if task_status == TaskStatus.REJECTED:
            self.logger.error(
                f"start_communicating thread rejected; {message}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )

    def _stop_communicating(
        self: CbfComponentManager, *args, admin_mode: AdminMode, **kwargs
    ) -> None:
        """
        Thread for stop_communicating operation.

        For OFFLINE, sets power to UNKNOWN and communication state to
        DISABLED before notifying the device; for NOT_FITTED only the device
        is notified.

        :param admin_mode: the admin mode being transitioned to
        """
        self.logger.info(
            f"Entering CbfComponentManager._stop_communicating {admin_mode.name}"
        )
        if admin_mode == AdminMode.OFFLINE:
            self._update_component_state(power=PowerState.UNKNOWN)
            self._update_communication_state(
                communication_state=CommunicationStatus.DISABLED
            )
            self._push_admin_mode_transition("to_offline")
        elif admin_mode == AdminMode.NOT_FITTED:
            self._push_admin_mode_transition("to_notfitted")

    def stop_communicating(
        self: CbfComponentManager,
        current_admin_mode: AdminMode,
        admin_mode: AdminMode,
    ) -> None:
        """
        Stop communication with the component

        :param current_admin_mode: the admin mode prior to this request
        :param admin_mode: the requested admin mode
        :raises ValueError: if the admin mode is unchanged, is NOT_FITTED,
            and communication is still established
        """
        # skip this check if the admin mode is edited.
        if current_admin_mode == admin_mode:
            if admin_mode == AdminMode.OFFLINE:
                if self._communication_state == CommunicationStatus.DISABLED:
                    self.logger.warning("Communications already offline.")
            if admin_mode == AdminMode.NOT_FITTED:
                if (
                    self._communication_state
                    == CommunicationStatus.ESTABLISHED
                ):
                    raise ValueError(
                        "Communications are online; must be set offline first to set component not fitted"
                    )

        self.logger.info(
            f"Submitting stop_communicating task: {admin_mode.name}"
        )
        task_status, message = self.submit_task(
            self._stop_communicating,
            kwargs={"admin_mode": admin_mode},
        )
        if task_status == TaskStatus.REJECTED:
            self.logger.error(
                f"stop_communicating {admin_mode.name} thread rejected; {message}"
            )
            self._update_communication_state(
                communication_state=CommunicationStatus.NOT_ESTABLISHED
            )

    def task_abort_event_is_set(
        self: CbfComponentManager,
        command_name: str,
        task_callback: Callable,
        task_abort_event: Event,
    ) -> bool:
        """
        Helper method for checking task abort event during command thread.

        When the event is set, task_callback is invoked with an ABORTED
        status and result before returning.

        :param command_name: name of command for result message
        :param task_callback: command tracker update_command_info callback
        :param task_abort_event: task executor abort event

        :return: True if abort event is set, otherwise False
        """
        if task_abort_event.is_set():
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    f"{command_name} command aborted by task executor abort event.",
                ),
            )
            return True
        return False

    # -------------
    # Group Methods
    # -------------

    def create_group_proxies(
        self: CbfComponentManager, group_proxies: dict
    ) -> bool:
        """
        Create group proxies (list of DeviceProxy) from the list of FQDNs
        passed in. Store as class attributes.

        :param group_proxies: dict mapping the attribute name to set on this
            component manager to the iterable of device FQDNs for that group

        :return: True if the group proxies are successfully created, False
            otherwise.
        """
        for group, fqdn in group_proxies.items():
            try:
                setattr(
                    self,
                    group,
                    [
                        context.DeviceProxy(device_name=device)
                        for device in fqdn
                    ],
                )
            except tango.DevFailed as df:
                self.logger.error(f"Failure in connection to {fqdn}: {df}")
                return False
        return True

    def _issue_command_thread(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        argin: Any,
        command_name: str,
    ) -> Any:
        """
        Helper function to issue command to a DeviceProxy

        :param proxy: proxy target for command
        :param argin: optional command argument; omitted from the call when
            None
        :param command_name: command to be issued

        :return: command result (if any), or a (ResultCode.FAILED, message)
            tuple if the command raised tango.DevFailed
        """
        try:
            return (
                proxy.command_inout(command_name, argin)
                if argin is not None
                else proxy.command_inout(command_name)
            )
        except tango.DevFailed as df:
            return (
                ResultCode.FAILED,
                f"Error issuing {command_name} command; {df}",
            )

    def issue_group_command(
        self: CbfComponentManager,
        command_name: str,
        proxies: list[tango.DeviceProxy],
        max_workers: int,
        argin: Any = None,
    ) -> list[Any]:
        """
        Helper function to perform tango.Group-like threaded command issuance.
        Returns list of command results in the same order as the input proxies
        list. If any command causes a tango.DevFailed exception, the result
        code for that device's return value will be ResultCode.FAILED.

        Important note: all proxies provided must be of the same device type.

        For fast commands, the return value will be a list of ResultCode and
        message string tuples.
        For Long Running Commands, the return value will be a list of
        ResultCode and unique command ID tuples.

        :param command_name: name of command to be issued
        :param proxies: list of device proxies in group; determines ordering
            of return values
        :param max_workers: maximum number of PyTangoThreadPoolExecutor
            workers
        :param argin: optional command argument, defaults to None

        :return: list of proxy command returns
        """
        issue = partial(
            self._issue_command_thread,
            argin=argin,
            command_name=command_name,
        )
        with PyTangoThreadPoolExecutor(max_workers=max_workers) as executor:
            # map() yields results in the order of the input proxies
            return list(executor.map(issue, proxies))

    def _read_attribute_thread(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        attr_name: str,
    ) -> Any:
        """
        Helper function to read attribute from a DeviceProxy

        :param proxy: proxy target for read_attribute
        :param attr_name: name of attribute to be read

        :return: the result of proxy.read_attribute, or None if the read
            raised tango.DevFailed
        """
        # Indicate that this thread performs Tango operations
        with tango.EnsureOmniThread():
            try:
                return proxy.read_attribute(attr_name)
            except tango.DevFailed as df:
                self.logger.error(f"Error reading attribute {attr_name}; {df}")
                return None

    def _read_group_attribute(
        self: CbfComponentManager,
        attr_name: str,
        proxies: list[tango.DeviceProxy],
        max_workers: int,
    ) -> list[Any]:
        """
        Helper function to perform tango.Group-like threaded read_attribute().
        Returns list of read results in the same order as the input proxies
        list. If any read causes a tango.DevFailed exception, that device's
        entry will be None.

        Important note: all proxies provided must be of the same device type.

        :param attr_name: name of attribute to be read
        :param proxies: list of device proxies in group; determines ordering
            of return values
        :param max_workers: maximum number of PyTangoThreadPoolExecutor
            workers

        :return: list of per-proxy read results
        """
        read = partial(self._read_attribute_thread, attr_name=attr_name)
        with PyTangoThreadPoolExecutor(max_workers=max_workers) as executor:
            return list(executor.map(read, proxies))

    def _write_attribute_thread(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        attr_name: str,
        value: Any,
    ) -> bool:
        """
        Helper function to write attribute to a DeviceProxy

        :param proxy: proxy target for write_attribute
        :param attr_name: name of attribute to be written
        :param value: attribute value to be written

        :return: True if the write succeeded, False if it raised
            tango.DevFailed
        """
        # Indicate that this thread performs Tango operations
        with tango.EnsureOmniThread():
            try:
                proxy.write_attribute(attr_name, value)
                return True
            except tango.DevFailed as df:
                self.logger.error(
                    f"Error writing {value} to attribute {attr_name}; {df}"
                )
                return False

    def _write_group_attribute(
        self: CbfComponentManager,
        attr_name: str,
        value: Any,
        proxies: list[tango.DeviceProxy],
        max_workers: int,
    ) -> bool:
        """
        Helper function to perform tango.Group-like threaded
        write_attribute(). Returns a bool depending on each device's
        write_attribute success; True if all writes were successful, False
        otherwise.

        Important note: all proxies provided must be of the same device type.

        :param attr_name: name of attribute to be written
        :param value: attribute value to be written
        :param proxies: list of device proxies in group
        :param max_workers: maximum number of PyTangoThreadPoolExecutor
            workers

        :return: True if every write succeeded, otherwise False
        """
        write = partial(
            self._write_attribute_thread,
            attr_name=attr_name,
            value=value,
        )
        with PyTangoThreadPoolExecutor(max_workers=max_workers) as executor:
            # Materialise all results first so every write is attempted
            results = list(executor.map(write, proxies))
        return all(results)

    # ----------------
    # Callback Methods
    # ----------------

    def _push_health_state_update(
        self: CbfComponentManager, health_state: HealthState
    ) -> None:
        """
        Push a health state update to the device.

        Does nothing if no health state callback was provided.

        :param health_state: the new health state of the component manager.
        """
        if self._device_health_state_callback is not None:
            self._device_health_state_callback(health_state)

    def update_device_health_state(
        self: CbfComponentManager,
        health_state: HealthState,
    ) -> None:
        """
        Handle a health state change.

        This is a helper method for use by subclasses. The stored value is
        only updated, and the device callback only invoked, when the value
        actually changes.

        :param health_state: the new health state of the component manager.
        """
        with self._health_state_lock:
            if self._health_state != health_state:
                self.logger.info(f"Updating healthState to {health_state}")
                self._health_state = health_state
                self._push_health_state_update(self._health_state)

    def _enqueue_event(
        self: CbfComponentManager,
        event_queue: Optional[Queue],
        event_data: Optional[tango.EventData],
    ) -> None:
        """
        Queue event data without ever blocking the calling event thread.

        If the queue is full the oldest entry is dropped to make room; if the
        queue is still full afterwards the new event is dropped instead.

        :param event_queue: target queue, or None when the queues have been
            released by unsubscribe_all_events
        :param event_data: Tango attribute change event data
        """
        if event_queue is None:
            # Event arrived with no live queue (e.g. after final unsubscribe)
            self.logger.debug("Dropping event; no event queue available.")
            return

        try:
            event_queue.put_nowait(event_data)
            return
        except Full:
            pass

        # Start dropping data if queue is full
        try:
            event_queue.get_nowait()
            event_queue.task_done()
        except Empty:
            # A consumer drained the queue in the meantime; nothing to drop
            pass

        try:
            event_queue.put_nowait(event_data)
        except Full:
            self.logger.warning("Dropping event; event queue is full.")

    def results_callback(
        self: CbfComponentManager, event_data: Optional[tango.EventData]
    ) -> None:
        """
        Callback for LRC command result events.
        All subdevices that may block our LRC thread with their own LRC
        execution have their `longRunningCommandResult` attribute subscribed
        to with this method as the change event callback.

        :param event_data: Tango attribute change event data
        """
        # NOTE (CIP-3559): event callbacks must be as short as possible
        # Here we simply queue event data and handle possible exceptions
        self._enqueue_event(self._results_queue, event_data)

    def op_state_callback(
        self: CbfComponentManager, event_data: Optional[tango.EventData]
    ) -> None:
        """
        Callback for state attribute events.

        :param event_data: Tango attribute change event data
        """
        # NOTE (CIP-3559): event callbacks must be as short as possible
        # Here we simply queue event data and handle possible exceptions
        self._enqueue_event(self._op_states_queue, event_data)

    # -------------------------
    # Wait for Blocking Results
    # -------------------------

    def wait_for_blocking_results(
        self: CbfComponentManager,
        task_abort_event: Optional[Event] = None,
        partial_success: bool = False,
        lrc_timeout: Optional[int] = None,
    ) -> TaskStatus:
        """
        Wait for the number of anticipated results to be pushed by subordinate
        devices.

        When issuing an LRC (or multiple) on subordinate devices from an LRC
        thread, command result events will be stored in self._results_queue;
        use this method to wait for all blocking command ID
        `longRunningCommandResult` events.

        All subdevices that may block our LRC thread with their own LRC
        execution have the `results_callback` method above provided as the
        change event callback for their `longRunningCommandResult` attribute
        subscription, which will store command IDs and results as change
        events are received.

        :param task_abort_event: Check for abort, defaults to None
        :param partial_success: set to True if we only need at least 1 of the
            blocking LRCs to be successful; defaults to False, in which case
            all LRCs must succeed
        :param lrc_timeout: timeout in seconds, applied to each wait on the
            results queue. Use device default value if not provided.

        :return: TaskStatus.COMPLETED if status reached, TaskStatus.FAILED if
            timed out, TaskStatus.ABORTED if aborted
        """
        successes = []
        self.logger.debug(
            f"wait_for_blocking_results invoked; blocking command IDs: {self.blocking_command_ids}; results queue size: {self._results_queue.qsize()}"
        )

        if lrc_timeout is None:
            lrc_timeout = self._lrc_timeout

        # Loop exits when no blocking command IDs remain, or upon timeout when
        # trying to get new results from the queue
        while self.blocking_command_ids:
            if task_abort_event and task_abort_event.is_set():
                self.logger.warning(
                    "Task aborted while waiting for blocking results."
                )
                return TaskStatus.ABORTED

            try:
                event_data = self._results_queue.get(
                    block=True, timeout=lrc_timeout
                )
                self._results_queue.task_done()
            except Empty:
                self.logger.error(
                    f"Event data queue is still empty after {lrc_timeout} seconds, blocking command IDs remaining: {self.blocking_command_ids}"
                )
                successes.append(False)
                break

            # Check event error and quality factor first
            if event_data.err:
                self.logger.error(f"Event error: {event_data.errors}")
                continue
            if event_data.attr_value.quality is tango.AttrQuality.ATTR_INVALID:
                self.logger.error(
                    f"Event quality factor invalid: {event_data}"
                )
                continue

            # Value cannot be None if quality factor is not ATTR_INVALID
            # Now we can handle the actual event data
            value = tuple(event_data.attr_value.value)
            if "" in value:
                self.logger.debug(
                    f"Skipping empty event; event data: {event_data}"
                )
                continue

            # longRunningCommandResult attribute format:
            # ("command_id", "[result_code, message]")
            try:
                command_id = value[0]
                result = value[1]
                result_code = int(result.split(",")[0].split("[")[1])
            except (IndexError, ValueError) as err:
                # ValueError: the leading field was not an integer code
                self.logger.error(
                    f"{type(err).__name__} in parsing EventData; {err}"
                )
                continue

            dev_name = event_data.device.dev_name()
            self.logger.debug(
                f"{dev_name} valid LRC event received: {command_id}, {result}"
            )

            # Remove any successfully parsed results from received events
            if command_id in self.blocking_command_ids:
                self.blocking_command_ids.remove(command_id)
                if result_code != ResultCode.OK:
                    self.logger.error(
                        f"{dev_name} blocking command failure; {command_id}: {result}"
                    )
                    successes.append(False)
                    continue

                self.logger.debug(
                    f"{dev_name} blocking command completed OK: {command_id}, {result}"
                )
                successes.append(True)

        # NOTE: all([]) is True, so no blocking IDs at all counts as success
        if all(successes):
            self.logger.debug("All blocking commands succeeded.")
            return TaskStatus.COMPLETED
        if any(successes):
            if partial_success:
                self.logger.debug(
                    "Partial success enabled, some blocking commands succeeded."
                )
                return TaskStatus.COMPLETED
            self.logger.error(
                "Partial success disabled, some blocking commands failed."
            )
            return TaskStatus.FAILED
        self.logger.error("All blocking commands failed.")
        return TaskStatus.FAILED

    # ----------------
    # Op State Changes
    # ----------------

    def wait_for_blocking_states(
        self: CbfComponentManager,
        target_state: tango.DevState,
        task_abort_event: Optional[Event] = None,
    ) -> bool:
        """
        Wait for the number of anticipated state changes to be pushed by
        subordinate devices.

        State events are consumed from the op state queue until no new event
        arrives within the state change timeout; the latest stored states are
        then compared against the target.

        :param target_state: target state to wait for in all monitored op
            states
        :param task_abort_event: Check for abort, defaults to None

        :return: True if all state changes were successfully recorded,
            otherwise False
        """
        blocking_devices = set(self._op_states.keys())
        self.logger.debug(
            f"wait_for_blocking_states invoked; devices monitored: {blocking_devices}; target state: {target_state}"
        )

        # Loop exits upon timeout when no further new states available in queue
        while True:
            if task_abort_event and task_abort_event.is_set():
                self.logger.warning(
                    "wait_for_blocking_states aborted while waiting for blocking state changes."
                )
                return False

            try:
                event_data = self._op_states_queue.get(
                    block=True, timeout=self._state_change_timeout
                )
                self._op_states_queue.task_done()
            except Empty:
                break

            # Check event error and quality factor first
            if event_data.err:
                self.logger.error(f"Event error: {event_data.errors}")
                continue
            if event_data.attr_value.quality is tango.AttrQuality.ATTR_INVALID:
                self.logger.error(
                    f"Event quality factor invalid: {event_data}"
                )
                continue

            value = event_data.attr_value.value
            dev_name = event_data.device.dev_name()
            self.logger.info(
                f"Received new value for {dev_name}/state: {value}"
            )
            self._op_states[dev_name] = value

        # Only devices monitored at invocation time are considered blocking
        for dev_name, state in self._op_states.items():
            if state == target_state and dev_name in blocking_devices:
                blocking_devices.remove(dev_name)

        if len(blocking_devices) > 0:
            self.logger.error(
                f"The following devices failed to transition to {target_state}: {blocking_devices}"
            )
            return False

        self.logger.info(
            f"All monitored devices successfully transitioned to {target_state}."
        )
        return True

    # -----------------------
    # Subscription Management
    # -----------------------

    def attr_event_subscribe(
        self: CbfComponentManager,
        proxy: tango.DeviceProxy,
        attr_name: str,
        callback: Callable,
        stateless: bool = True,
    ) -> None:
        """
        Subscribe to a given proxy's attribute.

        Repeated subscriptions to the same device attribute are skipped.

        :param proxy: DeviceProxy
        :param attr_name: name of attribute for change event subscription
        :param callback: change event callback
        :param stateless: If False, an exception will be thrown if the event
            subscription encounters a problem; if True, the event
            subscription will always succeed, even if the corresponding
            device server is not running
        """
        # Initialize queues if subscribing to first event
        # NOTE: Queue max sizes hardcoded to prevent memory leak
        # Should hopefully be plenty
        if len(self.event_ids) == 0:
            self._op_states_queue = Queue(maxsize=256)
            self._results_queue = Queue(maxsize=1024)

        dev_name = proxy.dev_name()
        if (
            dev_name in self.event_ids
            and attr_name in self.event_ids[dev_name]
        ):
            self.logger.debug(
                f"Skipping repeated {attr_name} event subscription: {dev_name}"
            )
            return

        self.logger.debug(f"Subscribing to {dev_name}/{attr_name}")

        # Set latest stored sub-device state values to None prior to
        # subscription
        if attr_name == "state":
            self._op_states[dev_name] = None

        event_id = proxy.subscribe_event(
            attr_name=attr_name,
            event_type=tango.EventType.CHANGE_EVENT,
            cb_or_queuesize=callback,
            stateless=stateless,
        )

        self.event_ids = utils.deep_update(
            self.event_ids, {dev_name: {attr_name: event_id}}
        )
        self.logger.debug(f"Event IDs: {self.event_ids}")

    def unsubscribe_all_events(
        self: CbfComponentManager, proxy: tango.DeviceProxy
    ) -> None:
        """
        Unsubscribe from all of a proxy's attribute events recorded by
        attr_event_subscribe.

        When no subscriptions remain for any device, the event queues are
        released.

        :param proxy: DeviceProxy
        """
        dev_name = proxy.dev_name()
        self.logger.debug(f"Event IDs: {self.event_ids}")
        dev_events = self.event_ids.pop(dev_name, None)
        if dev_events is None:
            self.logger.debug(
                f"No longRunningCommandResult event subscription for {dev_name}"
            )
            return

        for attr_name, event_id in dev_events.items():
            self.logger.debug(
                f"Unsubscribing from {dev_name}/{attr_name} event ID {event_id}"
            )
            proxy.unsubscribe_event(event_id)
            if attr_name == "state":
                self._op_states.pop(dev_name, None)

        # Clear queues if no longer subscribed
        if len(self.event_ids) == 0:
            self._op_states_queue = None
            self._results_queue = None

    @property
    def is_communicating(self: CbfComponentManager) -> bool:
        """
        Return whether communication with the component is established.

        SKA Mid.CBF MCS uses the more expressive :py:attr:`communication_status`
        for this, but this is still needed as a base classes hook.

        :return: True if communication with the component is established, else
            False.
        """
        if self.communication_state == CommunicationStatus.ESTABLISHED:
            return True

        self.logger.warning(
            f"is_communicating() check failed; current communication_state: {self.communication_state}"
        )
        return False

    @property
    def power_state(self: CbfComponentManager) -> Optional[PowerState]:
        """
        Return the power state of this component manager.

        :return: the power state of this component manager, if known.
        """
        return self._component_state["power"]

    @property
    def faulty(self: CbfComponentManager) -> Optional[bool]:
        """
        Return whether this component manager is currently experiencing a
        fault.

        :return: True if this component manager is currently experiencing a
            fault, else False.
        """
        return cast(bool, self._component_state["fault"])