Source code for ska_pst.lmc.beam.beam_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the BEAM PST component manager."""

from __future__ import annotations

import dataclasses
import json
import logging
import pathlib
import queue
import threading
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, cast

from overrides import override
from ska_control_model import (
    CommunicationStatus,
    HealthState,
    LoggingLevel,
    ObsMode,
    ObsState,
    PowerState,
    PstProcessingMode,
    ResultCode,
)
from ska_pst.common.constants import GIGABITS_PER_BYTE, MEGA_HERTZ, SCAN_CONFIG_FILE_NAME
from ska_pst.lmc.beam.beam_device_interface import PstBeamDeviceInterface
from ska_pst.lmc.component import PstComponentManager, PstSubcomponentManager, SubcomponentEventMessage
from ska_pst.lmc.dsp import PstDspComponentManager
from ska_pst.lmc.dsp.dsp_util import generate_dsp_scan_request
from ska_pst.lmc.job import LambdaTask, NoopTask, ParallelTask, SequentialTask, Task, TaskExecutor
from ska_pst.lmc.receive import PstReceiveComponentManager
from ska_pst.lmc.smrb import PstSmrbComponentManager
from ska_pst.lmc.stat import PstStatComponentManager
from ska_pst.lmc.util import BackgroundTaskProcessor, background_task
from ska_pst.lmc.util.callback import Callback, callback_safely
from ska_tango_base.executor import TaskStatus

from ska_pst.common import TelescopeConfig, convert_csp_config_to_pst_config, get_telescope_config, round_

TaskResponse = Tuple[TaskStatus, str]
RemoteTaskResponse = Tuple[List[TaskStatus], List[str]]

__all__ = [
    "PstBeamComponentManager",
]


def _assert_obs_state_task(
    subcomponents: List[PstSubcomponentManager],
    allowed_states: ObsState | Sequence[ObsState],
    cmd: str,
) -> Task:
    if isinstance(allowed_states, ObsState):
        allowed_states = [allowed_states]

    allowed_state_names = [a.name for a in allowed_states]

    def _assert_obs_state(sc: PstSubcomponentManager) -> None:
        obs_state = sc.remote_obs_state
        assert obs_state in allowed_states, (
            f"{sc.subcomponent_name} {obs_state.name} not in allowed states of "
            f"{allowed_state_names} for command '{cmd}'"
        )

    return ParallelTask(
        subtasks=[
            LambdaTask(action=lambda: _assert_obs_state(sc), name=f"assert_obs_state_{sc.subcomponent_name}")
            for sc in subcomponents
        ]
    )


def _subcomponent_task(
    subcomponents: List[PstSubcomponentManager],
    action_name: str,
    action: Callable[[PstSubcomponentManager], None],
) -> Task:
    if len(subcomponents) == 0:
        return NoopTask()
    elif len(subcomponents) == 1:
        sc = subcomponents[0]
        return LambdaTask(action=lambda: action(sc), name=f"{action_name}_{sc.subcomponent_name}")

    def _action(sc: PstSubcomponentManager) -> Callable[..., None]:
        def _inner() -> None:
            return action(sc)

        return _inner

    return ParallelTask(
        subtasks=[
            LambdaTask(action=_action(sc), name=f"{action_name}_{sc.subcomponent_name}")
            for sc in subcomponents
        ]
    )


class _RemoteJob:
    def __init__(
        self: _RemoteJob,
        job: Task,
        task_executor: TaskExecutor,
        completion_callback: Callback,
        exception_handler: Callable[[Exception], None] | None,
        logger: logging.Logger,
    ):
        self._job = job
        self._completion_callback = completion_callback
        self._logger = logger
        self._task_executor = task_executor
        self._exception_handler = exception_handler

    def __call__(
        self: _RemoteJob,
        *args: Any,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[threading.Event] = None,
        **kwargs: Any,
    ) -> None:
        def _completion_callback(*arg: Any, **kwargs: Any) -> None:
            self._completion_callback(task_callback)  # type: ignore

        callback_safely(task_callback, status=TaskStatus.IN_PROGRESS)

        try:
            self._task_executor.submit_job(job=self._job, callback=_completion_callback)
        except Exception as e:
            self._logger.warning("Error in submitting long running commands to remote devices", exc_info=True)
            if self._exception_handler:
                self._exception_handler(e)

            if task_callback:
                task_callback(
                    status=TaskStatus.FAILED,
                    result=(ResultCode.FAILED, f"Unhandled exception during execution: {str(e)}"),
                    exception=e,
                )


[docs]class PstBeamComponentManager(PstComponentManager[PstBeamDeviceInterface]): """ Component manager for the BEAM component in PST.LMC. Since the BEAM component is a logical device, this component manager is used to orchestrate the process devices, such as BEAM, RECV. Commands that are executed on this component manager are sent to instances of :py:class:`PstDeviceProxy` for each device that the BEAM device manages. This component manager only takes the fully-qualified device name (FQDN) for the remote devices, but uses the :py:class:`DeviceProxyFactory` to retrieve instances of the device proxies that commands should be sent to. """ _dsp_subcomponent: PstDspComponentManager _recv_subcomponent: PstReceiveComponentManager _smrb_subcomponent: PstSmrbComponentManager _stat_subcomponent: PstStatComponentManager _event_queue: queue.Queue[SubcomponentEventMessage] def __init__( self: PstBeamComponentManager, *, device_interface: PstBeamDeviceInterface, logger: logging.Logger | None = None, pst_task_executor: TaskExecutor | None = None, background_task_processor: BackgroundTaskProcessor | None = None, event_queue: queue.Queue[SubcomponentEventMessage] | None = None, **kwargs: Any, ) -> None: """ Initialise the BEAM component manager. :param device_interface: an interface to the TANGO device that manages this component. This is used so in testing a normal unit test doesn't need to use the TANGO testing infrastructure. :type device_interface: DeviceInterface :param logger: a logger for this object to use, defaults to None :type logger: logging.Logger | None, optional :param pst_task_executor: an instance of the task executor to perform background tasks on. If this is not provided a default executor will be used. :type pst_task_executor: TaskExecutor | None, optional :param background_task_processor: a background task processor used by the component manager for processing subcomponent events that are on the ``event_queue``, defaults to None. If this is not provided a default background task processor will be created. :type background_task_processor: BackgroundTaskProcessor | None, optional :param event_queue: an instance of a queue that has :py:class:`SubcomponentEventMessage` messages sent from subcomponents. If this is not provided a default queue will be created. This parameter is used in unit testing to simulate sending of events from the subcomponent managers. :type event_queue: queue.Queue[SubcomponentEventMessage] | None, optional """ logger = logger or logging.getLogger(__name__) self._event_queue = event_queue or queue.Queue() self._background_task_processor = background_task_processor or BackgroundTaskProcessor( default_logger=logger ) super().__init__( device_interface=device_interface, power=PowerState.UNKNOWN, fault=None, logger=logger, obsfault=None, scanning=False, configured=False, **kwargs, ) self._dsp_subcomponent = PstDspComponentManager( device_name=self.device_name, process_api_endpoint=device_interface.dsp_process_api_endpoint, monitor_data_updated_callback=self._handle_monitoring_data_update, beam_id=device_interface.beam_id, logger=logger, event_queue=self._event_queue, ) self._recv_subcomponent = PstReceiveComponentManager( device_name=self.device_name, process_api_endpoint=device_interface.recv_process_api_endpoint, subband_resources_callback=self._update_channel_block_configuration, monitor_data_updated_callback=self._handle_monitoring_data_update, beam_id=device_interface.beam_id, logger=logger, event_queue=self._event_queue, ) self._smrb_subcomponent = PstSmrbComponentManager( device_name=self.device_name, process_api_endpoint=device_interface.smrb_process_api_endpoint, monitor_data_updated_callback=self._handle_monitoring_data_update, beam_id=device_interface.beam_id, logger=logger, event_queue=self._event_queue, ) self._stat_subcomponent = PstStatComponentManager( device_name=self.device_name, process_api_endpoint=device_interface.stat_process_api_endpoint, monitor_data_updated_callback=self._handle_monitoring_data_update, beam_id=device_interface.beam_id, logger=logger, event_queue=self._event_queue, ) self._subcomponents: List[PstSubcomponentManager] = [ self._smrb_subcomponent, self._recv_subcomponent, self._dsp_subcomponent, self._stat_subcomponent, ] self._pst_task_executor = pst_task_executor or TaskExecutor(logger=logger) self._curr_scan_config: dict | None = None self._pst_task_executor.start() self._monitoring_polling_rate_ms = device_interface.monitoring_polling_rate_ms self._health_check_interval = device_interface.health_check_interval self._expected_data_record_rate = 0.0 self._processing_mode: PstProcessingMode = PstProcessingMode.IDLE self._reset_monitoring_properties() # the key in this dictionary is the subcomponent name self._previous_event_msgs: Dict[str, SubcomponentEventMessage] = {} def __del__(self: PstBeamComponentManager) -> None: """Handle shutdown of component manager.""" self._process_events = False self._pst_task_executor.stop() def _reset_monitoring_properties(self: PstBeamComponentManager) -> None: """ Initialise all the monitored properties. This method will set all the properties to there default values. This calls the `reset_monitoring` method on the subcomponent, which in turn will fire a event that the data has been updated. """ for subcomponent in self._subcomponents: subcomponent.reset_monitoring() self.expected_data_record_rate = 0.0 self.channel_block_configuration: dict = {} @background_task def _handle_subcomponent_event_messages(self: PstBeamComponentManager) -> None: """Handle the background processing of state events from the subcomponents.""" # other threads will set this which will allow for exiting. self._process_events = True try: while self._process_events: try: # wait for at most 10ms. Allows for interruption event. current_event_msg = self._event_queue.get(timeout=0.01) attrs = dataclasses.asdict(current_event_msg) del attrs["subcomponent_name"] prev_event_msg = self._previous_event_msgs.get(current_event_msg.subcomponent_name) if prev_event_msg is not None: # remove values that haven't been updated attrs = {k: v for k, v in attrs.items() if getattr(prev_event_msg, k) != v} for k, v in attrs.items(): self._attribute_value_updated_callback( f"{current_event_msg.subcomponent_name}_{k}", v ) # ensure that we update overall health state if "health_state" in attrs: self.handle_health_state_change() self._previous_event_msgs[current_event_msg.subcomponent_name] = current_event_msg except queue.Empty: pass except Exception: self.logger.exception("Error in background processing of events.", exc_info=True) raise @property def telescope_config(self: PstBeamComponentManager) -> TelescopeConfig: """ Get the current telescope configuration. This property returns a ``TelescopeConfig`` based on which telescope the PST BEAM is configured for. This is a utility property to avoid having to duplicate calling ``get_telescope_config`` within this class. :return: the current telescope configuration. :rtype: TelescopeConfig """ return get_telescope_config(self._device_interface.facility.telescope) @property def processing_mode(self: PstBeamComponentManager) -> PstProcessingMode: """Get the current processing mode. If the PST BEAM has been configured for a scan then this will return the processing mode from the configuration JSON. If the BEAM is in an IDLE state then this will return IDLE. :return: the current processing mode if there has been a configuration, otherwise IDLE. :rtype: PstProcessingMode """ return self._processing_mode @processing_mode.setter def processing_mode(self: PstBeamComponentManager, processing_mode: PstProcessingMode) -> None: """ Set the current PST Processing Mode. :param processing_mode: the processing mode that PST is currently set to. :type processing_mode: PstProcessingMode """ if processing_mode != self._processing_mode: self._processing_mode = processing_mode self._device_interface.handle_attribute_value_update("pst_processing_mode", processing_mode.name) @property def is_idle(self: PstBeamComponentManager) -> bool: """ Get whether PST is in an idle state or not. This is based off the PstProcessingMode. If PST has been configured for a scan then the processing mode is not IDLE but if PST hasn't been configured then the processing mode is IDLE. :return: whether PST is in an idle state or not. :rtype: bool """ return self.processing_mode == PstProcessingMode.IDLE @property def channel_block_configuration(self: PstBeamComponentManager) -> dict: """Get current channel block configuration.""" return self._channel_block_configuration @channel_block_configuration.setter def channel_block_configuration(self: PstBeamComponentManager, config: dict) -> None: """Set channel black configuration.""" self._channel_block_configuration = config self._attribute_value_updated_callback("channel_block_configuration", json.dumps(config)) def _update_device_attribute(self: PstBeamComponentManager, attribute_name: str, value: Any) -> None: """Update the TANGO device attribute.""" if isinstance(value, dict): value = json.dumps(value) self._attribute_value_updated_callback(attribute_name, value) def _handle_monitoring_data_update(self: PstBeamComponentManager, monitoring_data: dict) -> None: """Update the TANGO device properties from monitoring data.""" for k, v in monitoring_data.items(): self._update_device_attribute(k, v) def _update_channel_block_configuration(self: PstBeamComponentManager, subband_resources: dict) -> None: """ Update the channel block configuration. This calculates the new channel block configuration and is only called after a successful `ConfigureScan` request. It uses the SMRB util to work determine the subband configuration and then maps that to what is need by the client of the BEAM.MGMT. .. code-block:: python { "num_channel_blocks": 2, "channel_blocks": [ { "destination_host": "10.10.0.1", "destination_port": 20000, "destination_mac": "01:23:45:ab:cd:ef", "start_pst_channel": 0, "num_pst_channels": 12, "start_pst_frequency": 49609375.0, }, { "destination_host": "10.10.0.1", "destination_port": 20001, "destination_mac": "01:23:45:ab:cd:ef", "start_pst_channel": 12, "num_pst_channels": 10, "start_pst_frequency": 49652778.0, }, ] } """ if subband_resources: self.channel_block_configuration = { "num_channel_blocks": subband_resources["common"]["nsubband"], "channel_blocks": [ { "destination_host": subband["data_host"], "destination_port": subband["data_port"], "destination_mac": subband["data_mac"], "start_pst_channel": subband["start_channel"], # start_centre_freq_mhz was calculated to 6 decimal places so it can cast to int "start_pst_frequency": round_(subband["start_centre_freq_mhz"] * MEGA_HERTZ), "num_pst_channels": subband["end_channel"] - subband["start_channel"], } for subband in subband_resources["subbands"].values() ], } else: self.channel_block_configuration = {} self._update_device_attribute("channel_block_configuration", self.channel_block_configuration) @property def expected_data_record_rate(self: PstBeamComponentManager) -> float: """Get the expected data rate for DSP output for current scan configuration.""" return self._expected_data_record_rate @expected_data_record_rate.setter def expected_data_record_rate(self: PstBeamComponentManager, expected_data_record_rate: float) -> None: """Set the expected data rate for DSP output for current scan configuration.""" self._expected_data_record_rate = expected_data_record_rate self._device_interface.handle_attribute_value_update( "expected_data_record_rate", expected_data_record_rate ) @override def _simulation_mode_changed(self: PstBeamComponentManager) -> None: """ Set simulation mode state. :param simulation_mode: the new simulation mode value. :type simulation_mode: :py:class:`SimulationMode` """ # ensure we set the subordinate devices into to the same simulation mode. self._smrb_subcomponent.simulation_mode = self.simulation_mode self._recv_subcomponent.simulation_mode = self.simulation_mode self._dsp_subcomponent.simulation_mode = self.simulation_mode self._stat_subcomponent.simulation_mode = self.simulation_mode @override def _handle_communication_state_change( self: PstBeamComponentManager, communication_state: CommunicationStatus ) -> None: self.logger.info(f"communication state changes to {communication_state}") if communication_state == CommunicationStatus.NOT_ESTABLISHED: for sc in self._subcomponents: sc.connect() self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED) self._update_communication_state(CommunicationStatus.ESTABLISHED) self._update_component_state(fault=None, power=PowerState.OFF) self._handle_subcomponent_event_messages() # force an update. This should result in health state of OK self.handle_health_state_change() elif communication_state == CommunicationStatus.DISABLED: for sc in self._subcomponents: sc.disconnect() self._update_component_state(fault=None, power=PowerState.UNKNOWN) self._update_communication_state(CommunicationStatus.DISABLED) self._process_events = False # force an update. This should result in health state of UNKNOWN self.handle_health_state_change() def _submit_remote_job( self: PstBeamComponentManager, job: Task, task_callback: Callback, completion_callback: Callback, ) -> TaskResponse: remote_job = _RemoteJob( job, task_executor=self._pst_task_executor, completion_callback=completion_callback, exception_handler=self._handle_remote_job_exception, logger=self.logger, ) return self.submit_task( remote_job, task_callback=task_callback, ) def _handle_remote_job_exception(self: PstBeamComponentManager, _exception: Exception) -> None: """ Handle the remote job exception. This will ensure that the obstate gets put into a FAULT state. """ self._update_component_state(obsfault=True) self._device_interface.update_health_state(health_state=HealthState.FAILED)
[docs] def validate_configure_scan(self: PstBeamComponentManager, configuration: dict) -> None: """Validate the configure scan request.""" assert "eb_id" in configuration["common"], "Expected 'eb_id' to be set in common section of request." pst_configuration = convert_csp_config_to_pst_config( telescope_config=self.telescope_config, csp_configure_scan_request=configuration, ) task = _subcomponent_task( subcomponents=self._subcomponents, action_name="validate_configure_scan", action=lambda sc: sc.validate_configure_scan(configuration=pst_configuration), ) self._pst_task_executor.submit_job(job=task)
def _set_scan_configuration( self: PstBeamComponentManager, configuration: dict, config_id: str, pst_processing_mode: PstProcessingMode, **kwargs: Any, ) -> None: """Set properties based off scan configuration.""" self.config_id = config_id self._curr_scan_config = configuration self.processing_mode = pst_processing_mode @override def configure_scan( self: PstBeamComponentManager, task_callback: Callback = None, **kwargs: Any ) -> TaskResponse: """ Configure scan for the component. The kwargs of this method is scan configuration. The super class method has the same signature. :param task_callback: callback to be called when the status of the command changes :param kwargs: the scan configuration to use. """ self.logger.info("'ConfigureScan' command requested") configuration = kwargs pst_configuration = convert_csp_config_to_pst_config( telescope_config=self.telescope_config, csp_configure_scan_request=configuration, ) def _completion_callback(task_callback: Callable) -> None: self._set_scan_configuration(configuration=configuration, **pst_configuration) dsp_scan_request = generate_dsp_scan_request(**pst_configuration) self.expected_data_record_rate = dsp_scan_request["bytes_per_second"] * GIGABITS_PER_BYTE self._device_interface.update_obs_mode(ObsMode.PULSAR_TIMING) self._update_component_state(configured=True) task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info( f"'ConfigureScan' command completed successfully. ObsState={self.obs_state.name}" ) def _configure_beam(sc: PstSubcomponentManager) -> None: sc.configure_beam(configuration=pst_configuration) def _configure_scan(sc: PstSubcomponentManager) -> None: sc.configure_scan(configuration=pst_configuration) deconfigure_tasks: List[Task] = [] if not self.is_idle: deconfigure_tasks = self._deconfigure_tasks(cmd="configure_scan") return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'ConfigureScan' command running")), _assert_obs_state_task( subcomponents=self._subcomponents, allowed_states=[ObsState.EMPTY, ObsState.READY], cmd="configure_scan", ), *deconfigure_tasks, _subcomponent_task( subcomponents=[self._smrb_subcomponent], action_name="configure_beam", action=_configure_beam, ), _subcomponent_task( subcomponents=[ self._dsp_subcomponent, self._recv_subcomponent, self._stat_subcomponent, ], action_name="configure_beam", action=_configure_beam, ), _subcomponent_task( subcomponents=[ self._smrb_subcomponent, self._recv_subcomponent, self._stat_subcomponent, ], action_name="configure_scan", action=_configure_scan, ), _subcomponent_task( subcomponents=[self._dsp_subcomponent], action_name="configure_scan", action=_configure_scan, ), ] ), task_callback=task_callback, completion_callback=_completion_callback, ) def _reset_state(self: PstBeamComponentManager, reset_health: bool = True) -> None: """ Reset state of the BEAM. This method is used by the ``deconfigure_scan``, ``obsreset`` and ``reset`` methods to force the BEAM device back into the correct state. This method performs the following actions: * sets the current scan id to zero to signal that there is no current scan * sets the current scan configuration as empty * sets the configuration id as an empty string * sets the PST processing mode to being ``PstProcessingMode.IDLE`` * attempts to reset the component state nicely to not scanning and not configured * resets all the monitoring data, this is because the system is not long in a monitoring state * resets the health state if the ``reset_health`` parameter was set to True * sets the observation mode to being ``ObsMode.IDLE`` :param reset_health: whether to reset the health state of the device, defaults to True :type reset_health: bool, optional """ self.scan_id = 0 self._curr_scan_config = None self.config_id = "" self.processing_mode = PstProcessingMode.IDLE try: self._update_component_state(obsfault=False) except Exception: # ignore error pass try: self._update_component_state(scanning=False) except Exception: # ignore error pass try: self._update_component_state(configured=False) except Exception: # ignore error pass self._reset_monitoring_properties() if reset_health: self._device_interface.update_health_state(health_state=HealthState.OK) self._device_interface.update_obs_mode(ObsMode.IDLE) @override def deconfigure_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse: """ Deconfigure scan for this component. :param task_callback: callback to be called when the status of the command changes """ # note that the Tango command is GoToIdle self.logger.info("'GoToIdle' command requested") def _completion_callback(task_callback: Callable) -> None: self._reset_state(reset_health=False) task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'GoToIdle' command completed successfully. ObsState={self.obs_state.name}") return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'GoToIdle' command running")), *self._deconfigure_tasks(cmd="deconfigure_scan"), ] ), task_callback=task_callback, completion_callback=_completion_callback, ) def _deconfigure_tasks(self: PstBeamComponentManager, cmd: str) -> List[Task]: return [ # need to deconfigure scan of all processes, this can be done in parallel. _assert_obs_state_task( subcomponents=self._subcomponents, allowed_states=ObsState.READY, cmd=cmd, ), _subcomponent_task( subcomponents=self._subcomponents, action_name="deconfigure_scan", action=lambda sc: sc.deconfigure_scan(), ), # need to release the ring buffer clients before deconfiguring SMRB _subcomponent_task( subcomponents=[ self._dsp_subcomponent, self._recv_subcomponent, self._stat_subcomponent, ], action_name="deconfigure_beam", action=lambda sc: sc.deconfigure_beam(), ), _subcomponent_task( subcomponents=[self._smrb_subcomponent], action_name="deconfigure_beam", action=lambda sc: sc.deconfigure_beam(), ), ] @override def scan( self: PstBeamComponentManager, task_callback: Callback = None, **kwargs: Any, ) -> TaskResponse: """Start scanning. The kwargs of this method is scan request, including the scan_id. The super class method has the same signature. By using the kwargs allow for forward compatibility of accepting other parameters for the starting of the scan. :param task_callback: callback for background processing to update device status. :type task_callback: Callback :param kwargs: scan request as a dict :type kwargs: dict """ self.logger.info("'Scan' command requested") scan_id = int(kwargs["scan_id"]) def _completion_callback(task_callback: Callable) -> None: self._update_component_state(scanning=True) self.scan_id = scan_id task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'Scan' command completed successfully. ObsState={self.obs_state.name}") def _scan(sc: PstSubcomponentManager) -> None: sc.scan(**kwargs) return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'Scan' command running")), _assert_obs_state_task( subcomponents=self._subcomponents, allowed_states=ObsState.READY, cmd="scan", ), LambdaTask( action=lambda: self._write_scan_config_to_output_dir(scan_id), name="write_scan_config_to_output_dir", ), _subcomponent_task( subcomponents=[self._recv_subcomponent, self._smrb_subcomponent], action_name="scan", action=_scan, ), _subcomponent_task( subcomponents=[self._dsp_subcomponent], action_name="scan", action=_scan, ), _subcomponent_task( subcomponents=[self._stat_subcomponent], action_name="scan", action=_scan, ), ] ), task_callback=task_callback, completion_callback=_completion_callback, ) def _write_scan_config_to_output_dir(self: PstBeamComponentManager, scan_id: int) -> None: """Write the scan configuration out as JSON.""" self.logger.debug(f"Writing scan configuration for scan {scan_id}") # dump current scan configuration as JSON params = { "eb_id": self._curr_scan_config["common"]["eb_id"], # type: ignore "subsystem_id": self._device_interface.subsystem_id, "scan_id": str(scan_id), } output_dir_str = self._device_interface.scan_output_dir_pattern for k, v in params.items(): output_dir_str = output_dir_str.replace(f"<{k}>", v) try: output_dir = pathlib.Path(output_dir_str) output_dir.mkdir(parents=True, exist_ok=True) scan_configuration_path = output_dir / SCAN_CONFIG_FILE_NAME self.logger.info(f"Writing scan configuration for scan {scan_id} to {scan_configuration_path}") with open(scan_configuration_path, "w") as f: json.dump(self._curr_scan_config, f) except Exception: self.logger.exception("Error in writing output file.", exc_info=True) raise @override def end_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse: """Stop scanning. :param task_callback: callback to be called when the status of the command changes """ self.logger.info("'EndScan' command requested") def _completion_callback(task_callback: Callable) -> None: self._update_component_state(scanning=False) self.scan_id = 0 task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'EndScan' command completed successfully. ObsState={self.obs_state.name}") # need to stop_scan on RECV before DSP and STAT, then SMRB return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'EndScan' command running")), _assert_obs_state_task( subcomponents=self._subcomponents, allowed_states=ObsState.SCANNING, cmd="end_scan", ), _subcomponent_task( subcomponents=[self._recv_subcomponent], action_name="end_scan", action=lambda sc: sc.end_scan(), ), _subcomponent_task( subcomponents=[self._dsp_subcomponent, self._stat_subcomponent], action_name="end_scan", action=lambda sc: sc.end_scan(), ), _subcomponent_task( subcomponents=[self._smrb_subcomponent], action_name="end_scan", action=lambda sc: sc.end_scan(), ), ] ), task_callback=task_callback, completion_callback=_completion_callback, ) def _abort_task(self: PstBeamComponentManager, subcomponents: List[PstSubcomponentManager]) -> Task: subcomponents_to_abort = [ sc for sc in subcomponents if sc.remote_obs_state not in [ObsState.ABORTED, ObsState.FAULT, ObsState.EMPTY] ] return _subcomponent_task( subcomponents=subcomponents_to_abort, action_name="abort", action=lambda sc: sc.abort() ) @override def abort(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse: """Tell the component to abort whatever it was doing. :param task_callback: callback to be called when the status of the command changes """ self.logger.info("'Abort' command requested") def _completion_callback(task_callback: Callable) -> None: self._update_component_state(scanning=False) self.abort_commands() task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'Abort' command completed successfully. ObsState={self.obs_state.name}") self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'Abort' command running")), self._abort_task([self._recv_subcomponent]), self._abort_task([self._dsp_subcomponent, self._stat_subcomponent]), self._abort_task([self._smrb_subcomponent]), ] ), task_callback=task_callback, completion_callback=_completion_callback, ) return TaskStatus.IN_PROGRESS, "Aborting" def _obsreset_task(self: PstBeamComponentManager, subcomponents: List[PstSubcomponentManager]) -> Task: # need to use the remote state to ensure that the server's precondition check doesn't fail. def _perform_obsreset(sc: PstSubcomponentManager) -> None: if sc.remote_obs_state in [ObsState.ABORTED, ObsState.FAULT]: sc.obsreset() return _subcomponent_task( subcomponents=subcomponents, action_name="obsreset", action=_perform_obsreset ) @override def obsreset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse: """Reset the component and put it into an IDLE state. The subcomponents have all the resources released and as such are moved back to the EMPTY state. :param task_callback: callback to be called when the status of the command changes """ self.logger.info("'ObsReset' command requested") def _completion_callback(task_callback: Callable) -> None: self._reset_state() task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'ObsReset' command completed successfully. ObsState={self.obs_state.name}") abort_recv_subtask: Task = self._abort_task([self._recv_subcomponent]) abort_readers_subtask: Task = self._abort_task([self._dsp_subcomponent, self._stat_subcomponent]) abort_smrb_subtask: Task = self._abort_task([self._smrb_subcomponent]) obsreset_subtasks = self._obsreset_task( [ self._stat_subcomponent, self._dsp_subcomponent, self._recv_subcomponent, ] ) obsreset_smrb_subtask = self._obsreset_task([self._smrb_subcomponent]) return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'ObsReset' command running")), abort_recv_subtask, abort_readers_subtask, abort_smrb_subtask, obsreset_subtasks, obsreset_smrb_subtask, ], ), task_callback=task_callback, completion_callback=_completion_callback, ) @override def reset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse: """Reset the component and put it into an IDLE state. The subcomponents have all the resources released and as such are moved back to the IDLE state. :param task_callback: callback to be called when the status of the command changes """ self.logger.info("'Reset' command requested") # call reset on all subcomponents SMRB has to be done last but the others can be done in parallel subcomponents_to_reset: List[PstSubcomponentManager] = [ sc for sc in cast( List[PstSubcomponentManager], [ self._stat_subcomponent, self._dsp_subcomponent, self._recv_subcomponent, ], ) ] def _completion_callback(task_callback: Callable) -> None: self._reset_state() task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'Reset' command completed successfully. ObsState={self.obs_state.name}") reset_subtasks = _subcomponent_task( subcomponents=subcomponents_to_reset, action_name="reset", action=lambda sc: sc.reset() ) reset_smrb_subtask = _subcomponent_task( subcomponents=[self._smrb_subcomponent], action_name="reset", action=lambda sc: sc.reset() ) connect_subtask = _subcomponent_task( subcomponents=self._subcomponents, action_name="connect", action=lambda sc: sc.connect() ) return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'Reset' command running")), reset_subtasks, reset_smrb_subtask, connect_subtask, ], ), task_callback=task_callback, completion_callback=_completion_callback, ) @override def go_to_fault( self: PstBeamComponentManager, fault_msg: str, task_callback: Callback = None ) -> TaskResponse: """Put all the sub-devices into a FAULT state. :param task_callback: callback to be called when the status of the command changes """ self.logger.info("'GoToFault' command requested") def _completion_callback(task_callback: Callable) -> None: self._update_component_state(obsfault=True) self._device_interface.handle_fault(fault_msg=fault_msg) task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully")) self.logger.info(f"'GoToFault' command completed successfully. ObsState={self.obs_state.name}") return self._submit_remote_job( job=SequentialTask( subtasks=[ LambdaTask(action=lambda: self.logger.debug("'GoToFault' command running")), _subcomponent_task( subcomponents=[self._recv_subcomponent], action_name="go_to_fault", action=lambda sc: sc.go_to_fault(fault_msg=fault_msg), ), _subcomponent_task( subcomponents=[self._dsp_subcomponent, self._stat_subcomponent], action_name="go_to_fault", action=lambda sc: sc.go_to_fault(fault_msg=fault_msg), ), _subcomponent_task( subcomponents=[self._smrb_subcomponent], action_name="go_to_fault", action=lambda sc: sc.go_to_fault(fault_msg=fault_msg), ), ] ), task_callback=task_callback, completion_callback=_completion_callback, ) @override def set_logging_level(self: PstBeamComponentManager, log_level: LoggingLevel) -> None: """ Set LoggingLevel of all the sub-devices. :param log_level: The required TANGO LoggingLevel :returns: None. """ for subcomponent in self._subcomponents: subcomponent.set_logging_level(log_level=log_level) @property def monitoring_polling_rate_ms(self: PstBeamComponentManager) -> int: """Get the current monitoring polling rate, in milliseconds.""" return self._monitoring_polling_rate_ms @monitoring_polling_rate_ms.setter def monitoring_polling_rate_ms(self: PstBeamComponentManager, monitoring_polling_rate_ms: int) -> None: """Set the monitoring polling rate on the subordinate devices.""" for subcomponent in self._subcomponents: subcomponent.monitoring_polling_rate_ms = monitoring_polling_rate_ms self._monitoring_polling_rate_ms = monitoring_polling_rate_ms @property def health_check_interval(self: PstBeamComponentManager) -> int: """ Get the current health check interval, in milliseconds. :return: the current health check interval, in milliseconds. :rtype: int """ return self._health_check_interval @health_check_interval.setter def health_check_interval(self: PstBeamComponentManager, health_check_interval: int) -> None: """ Set the health check interval, in milliseconds. Setting this parameter will restart the health check process of the background applications. :param health_check_interval: the updated health check interval, in milliseconds. :type health_check_interval: int """ if self._health_check_interval != health_check_interval: self._health_check_interval = health_check_interval for scm in self._subcomponents: scm.health_check_interval = health_check_interval scm.restart_health_check() @property def recv_health_state(self: PstBeamComponentManager) -> HealthState: """ Get the current health state for the RECV subcomponent. :return: the current health state for the RECV subcomponent. :rtype: HealthState """ return self._recv_subcomponent.health_state @property def recv_obs_state(self: PstBeamComponentManager) -> ObsState: """ Get the current observing state for the RECV subcomponent. :return: the current observing state for the RECV subcomponent. :rtype: ObsState """ return self._recv_subcomponent.obs_state @property def smrb_health_state(self: PstBeamComponentManager) -> HealthState: """ Get the current health state for the SMRB subcomponent. :return: the current health state for the SMRB subcomponent. :rtype: HealthState """ return self._smrb_subcomponent.health_state @property def smrb_obs_state(self: PstBeamComponentManager) -> ObsState: """ Get the current observing state for the SMRB subcomponent. :return: the current observing state for the SMRB subcomponent. :rtype: ObsState """ return self._smrb_subcomponent.obs_state @property def stat_health_state(self: PstBeamComponentManager) -> HealthState: """ Get the current health state for the STAT subcomponent. :return: the current health state for the STAT subcomponent. :rtype: HealthState """ return self._stat_subcomponent.health_state @property def stat_obs_state(self: PstBeamComponentManager) -> ObsState: """ Get the current observing state for the STAT subcomponent. :return: the current observing state for the STAT subcomponent. :rtype: ObsState """ return self._stat_subcomponent.obs_state @property def dsp_health_state(self: PstBeamComponentManager) -> HealthState: """ Get the current health state for the DSP subcomponent. :return: the current health state for the DSP subcomponent. :rtype: HealthState """ return self._dsp_subcomponent.health_state @property def dsp_obs_state(self: PstBeamComponentManager) -> ObsState: """ Get the current observing state for the DSP subcomponent. :return: the current observing state for the DSP subcomponent. :rtype: ObsState """ return self._dsp_subcomponent.obs_state @property def _subcomponent_health_states(self: PstBeamComponentManager) -> Dict[str, HealthState]: return { "RECV": self.recv_health_state, "SMRB": self.smrb_health_state, "STAT": self.stat_health_state, "DSP": self.dsp_health_state, }
[docs] def handle_health_state_change(self: PstBeamComponentManager) -> None: """ Handle a change in one of the subcomponent's health state. This method will aggregate the overall health state of the PST BEAM based on the states of the subcomponents based on the following rules: * if all subcomponents have a state of HealthState.OK, then the beam's state is HealthState.OK * if one subcomponent has a state of HealthState.DEGRADED, then the beam's state is HealthState.DEGRADED * if multiple subcomponents have a state of HealthState.DEGRADED, then the beam's state is set to HealthState.FAILED * if any subcomponent has a state of HealthState.FAILED, then the beam's state is set to HealthState.FAILED * finally, if any subcomponent has a state of HealthState.UNKNOWN then overall state is still HealthState.UNKNOWN """ out_state = HealthState.OK for scm, scm_health_state in self._subcomponent_health_states.items(): if scm_health_state == HealthState.DEGRADED: self.logger.debug(f"{scm} health state is {scm_health_state}") if out_state == HealthState.DEGRADED: self.logger.warning( f"multiple subcomponents are in degraded state. Putting " f"{self.beam_id} in FAILED state", exc_info=False, ) out_state = HealthState.FAILED break else: out_state = HealthState.DEGRADED if scm_health_state == HealthState.FAILED: self.logger.warning( f"{scm} health state is in FAILED state. Putting {self.beam_id} into FAILED state", exc_info=False, ) out_state = HealthState.FAILED break if scm_health_state == HealthState.UNKNOWN: out_state = HealthState.UNKNOWN self._device_interface.update_health_state(health_state=out_state)
def __getattr__(self: PstBeamComponentManager, name: str) -> Any: """ Get attribute of component manager. This is a Python dunder method that is used to get attributes/properties that have not been found by the ``__getattribute__`` method. This allows delegating getting the attribute from sub-component managers without needing to have code specific to delegate getting the property. :param name: the name of the attribute :type name: str :return: the attribute value :rtype: Any :raises: AttributeError if attribute cannot be found on sub-component """ for sc in self._subcomponents: try: return getattr(sc, name) except AttributeError: pass self.logger.error(f"Could not find attribute with {name=}") raise AttributeError(name=name)