# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the BEAM PST component manager."""
from __future__ import annotations
import dataclasses
import json
import logging
import pathlib
import queue
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
from overrides import override
from ska_control_model import (
CommunicationStatus,
HealthState,
LoggingLevel,
ObsMode,
ObsState,
PowerState,
PstProcessingMode,
)
from ska_pst.common.constants import GIGABITS_PER_BYTE, MEGA_HERTZ
from ska_pst.lmc.beam.beam_device_interface import PstBeamDeviceInterface
from ska_pst.lmc.component import PstComponentManager, PstSubcomponentManager, SubcomponentEventMessage
from ska_pst.lmc.dsp import PstDspComponentManager
from ska_pst.lmc.dsp.dsp_util import generate_dsp_scan_request
from ska_pst.lmc.job import LambdaTask, NoopTask, ParallelTask, SequentialTask, Task, TaskExecutor
from ska_pst.lmc.receive import PstReceiveComponentManager
from ska_pst.lmc.smrb import PstSmrbComponentManager
from ska_pst.lmc.stat import PstStatComponentManager
from ska_pst.lmc.util import BackgroundTaskProcessor, background_task
from ska_pst.lmc.util.callback import Callback, callback_safely
from ska_tango_base.executor import TaskStatus
from ska_pst.common import TelescopeConfig, convert_csp_config_to_pst_config, get_telescope_config, round_
TaskResponse = Tuple[TaskStatus, str]
RemoteTaskResponse = Tuple[List[TaskStatus], List[str]]
__all__ = [
"PstBeamComponentManager",
]
def _subcomponent_task(
subcomponents: List[PstSubcomponentManager],
action_name: str,
action: Callable[[PstSubcomponentManager], None],
) -> Task:
if len(subcomponents) == 0:
return NoopTask()
elif len(subcomponents) == 1:
sc = subcomponents[0]
return LambdaTask(action=lambda: action(sc), name=f"{action_name}_{sc.subcomponent_name}")
def _action(sc: PstSubcomponentManager) -> Callable[..., None]:
def _inner() -> None:
return action(sc)
return _inner
return ParallelTask(
subtasks=[
LambdaTask(action=_action(sc), name=f"{action_name}_{sc.subcomponent_name}")
for sc in subcomponents
]
)
class _RemoteJob:
def __init__(
self: _RemoteJob,
job: Task,
task_executor: TaskExecutor,
completion_callback: Callback,
logger: logging.Logger,
):
self._job = job
self._completion_callback = completion_callback
self._logger = logger
self._task_executor = task_executor
def __call__(
self: _RemoteJob,
*args: Any,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
**kwargs: Any,
) -> None:
def _completion_callback(*arg: Any, **kwargs: Any) -> None:
self._completion_callback(task_callback) # type: ignore
callback_safely(task_callback, status=TaskStatus.IN_PROGRESS)
try:
self._task_executor.submit_job(job=self._job, callback=_completion_callback)
except Exception as e:
self._logger.warning("Error in submitting long running commands to remote devices", exc_info=True)
if task_callback:
task_callback(status=TaskStatus.FAILED, result=str(e), exception=e)
[docs]class PstBeamComponentManager(PstComponentManager[PstBeamDeviceInterface]):
"""
Component manager for the BEAM component in PST.LMC.
Since the BEAM component is a logical device, this component
manager is used to orchestrate the process devices, such as
BEAM, RECV.
Commands that are executed on this component manager are
sent to instances of :py:class:`PstDeviceProxy` for each
device that the BEAM device manages.
This component manager only takes the fully-qualified device
name (FQDN) for the remote devices, but uses the
:py:class:`DeviceProxyFactory` to retrieve instances of the
device proxies that commands should be sent to.
"""
_dsp_subcomponent: PstDspComponentManager
_recv_subcomponent: PstReceiveComponentManager
_smrb_subcomponent: PstSmrbComponentManager
_stat_subcomponent: PstStatComponentManager
_event_queue: queue.Queue[SubcomponentEventMessage]
def __init__(
self: PstBeamComponentManager,
*,
device_interface: PstBeamDeviceInterface,
logger: logging.Logger | None = None,
pst_task_executor: TaskExecutor | None = None,
background_task_processor: BackgroundTaskProcessor | None = None,
event_queue: queue.Queue[SubcomponentEventMessage] | None = None,
**kwargs: Any,
) -> None:
"""
Initialise the BEAM component manager.
:param device_interface: an interface to the TANGO device that manages this component.
This is used so in testing a normal unit test doesn't need to use the TANGO testing
infrastructure.
:type device_interface: DeviceInterface
:param logger: a logger for this object to use, defaults to None
:type logger: logging.Logger | None, optional
:param pst_task_executor: an instance of the task executor to perform background
tasks on. If this is not provided a default executor will be used.
:type pst_task_executor: TaskExecutor | None, optional
:param background_task_processor: a background task processor used by the component
manager for processing subcomponent events that are on the ``event_queue``, defaults
to None. If this is not provided a default background task processor will
be created.
:type background_task_processor: BackgroundTaskProcessor | None, optional
:param event_queue: an instance of a queue that has :py:class:`SubcomponentEventMessage`
messages sent from subcomponents. If this is not provided a default queue will
be created. This parameter is used in unit testing to simulate sending of events from
the subcomponent managers.
:type event_queue: queue.Queue[SubcomponentEventMessage] | None, optional
"""
logger = logger or logging.getLogger(__name__)
self._event_queue = event_queue or queue.Queue()
self._background_task_processor = background_task_processor or BackgroundTaskProcessor(
default_logger=logger
)
super().__init__(
device_interface=device_interface,
power=PowerState.UNKNOWN,
fault=None,
logger=logger,
**kwargs,
)
self._dsp_subcomponent = PstDspComponentManager(
device_name=self.device_name,
dsp_disk_process_api_endpoint=device_interface.dsp_disk_process_api_endpoint,
dsp_flow_through_process_api_endpoint=device_interface.dsp_flow_through_process_api_endpoint,
monitor_data_updated_callback=self._handle_monitoring_data_update,
beam_id=device_interface.beam_id,
logger=logger,
event_queue=self._event_queue,
)
self._recv_subcomponent = PstReceiveComponentManager(
device_name=self.device_name,
process_api_endpoint=device_interface.recv_process_api_endpoint,
subband_resources_callback=self._update_channel_block_configuration,
monitor_data_updated_callback=self._handle_monitoring_data_update,
beam_id=device_interface.beam_id,
logger=logger,
event_queue=self._event_queue,
)
self._smrb_subcomponent = PstSmrbComponentManager(
device_name=self.device_name,
process_api_endpoint=device_interface.smrb_process_api_endpoint,
monitor_data_updated_callback=self._handle_monitoring_data_update,
beam_id=device_interface.beam_id,
logger=logger,
event_queue=self._event_queue,
)
self._stat_subcomponent = PstStatComponentManager(
device_name=self.device_name,
process_api_endpoint=device_interface.stat_process_api_endpoint,
monitor_data_updated_callback=self._handle_monitoring_data_update,
beam_id=device_interface.beam_id,
logger=logger,
event_queue=self._event_queue,
)
self._subcomponents: List[PstSubcomponentManager] = [
self._smrb_subcomponent,
self._recv_subcomponent,
self._dsp_subcomponent,
self._stat_subcomponent,
]
self._pst_task_executor = pst_task_executor or TaskExecutor(logger=logger)
self._curr_scan_config: dict | None = None
self._pst_task_executor.start()
self._monitoring_polling_rate_ms = device_interface.monitoring_polling_rate_ms
self._health_check_interval = device_interface.health_check_interval
self._expected_data_record_rate = 0.0
self._processing_mode: PstProcessingMode = PstProcessingMode.IDLE
self._reset_monitoring_properties()
# the key in this dictionary is the subcomponent name
self._previous_event_msgs: Dict[str, SubcomponentEventMessage] = {}
def __del__(self: PstBeamComponentManager) -> None:
"""Handle shutdown of component manager."""
self._process_events = False
self._pst_task_executor.stop()
def _reset_monitoring_properties(self: PstBeamComponentManager) -> None:
"""
Initialise all the monitored properties.
This method will set all the properties to there default values. This
calls the `reset_monitoring` method on the subcomponent, which in
turn will fire a event that the data has been updated.
"""
for subcomponent in self._subcomponents:
subcomponent.reset_monitoring()
self.expected_data_record_rate = 0.0
self.channel_block_configuration: dict = {}
@background_task
def _handle_subcomponent_event_messages(self: PstBeamComponentManager) -> None:
"""Handle the background processing of state events from the subcomponents."""
# other threads will set this which will allow for exiting.
self._process_events = True
try:
while self._process_events:
try:
# wait for at most 10ms. Allows for interruption event.
current_event_msg = self._event_queue.get(timeout=0.01)
attrs = dataclasses.asdict(current_event_msg)
del attrs["subcomponent_name"]
prev_event_msg = self._previous_event_msgs.get(current_event_msg.subcomponent_name)
if prev_event_msg is not None:
# remove values that haven't been updated
attrs = {k: v for k, v in attrs.items() if getattr(prev_event_msg, k) != v}
for k, v in attrs.items():
self._attribute_value_updated_callback(
f"{current_event_msg.subcomponent_name}_{k}", v
)
# ensure that we update overall health state
if "health_state" in attrs:
self.handle_health_state_change()
self._previous_event_msgs[current_event_msg.subcomponent_name] = current_event_msg
except queue.Empty:
pass
except Exception:
self.logger.exception("Error in background processing of events.", exc_info=True)
raise
@property
def telescope_config(self: PstBeamComponentManager) -> TelescopeConfig:
"""
Get the current telescope configuration.
This property returns a ``TelescopeConfig`` based on which telescope the
PST BEAM is configured for.
This is a utility property to avoid having to duplicate calling
``get_telescope_config`` within this class.
:return: the current telescope configuration.
:rtype: TelescopeConfig
"""
return get_telescope_config(self._device_interface.facility.telescope)
@property
def processing_mode(self: PstBeamComponentManager) -> PstProcessingMode:
"""Get the current processing mode.
If the PST BEAM has been configured for a scan then this will return
the processing mode from the configuration JSON. If the BEAM is in an
IDLE state then this will return IDLE.
:return: the current processing mode if there has been a configuration, otherwise IDLE.
:rtype: PstProcessingMode
"""
return self._processing_mode
@processing_mode.setter
def processing_mode(self: PstBeamComponentManager, processing_mode: PstProcessingMode) -> None:
"""
Set the current PST Processing Mode.
:param processing_mode: the processing mode that PST is currently set to.
:type processing_mode: PstProcessingMode
"""
if processing_mode != self._processing_mode:
self._processing_mode = processing_mode
self._device_interface.handle_attribute_value_update("pst_processing_mode", processing_mode.name)
@property
def is_idle(self: PstBeamComponentManager) -> bool:
"""
Get whether PST is in an idle state or not.
This is based off the PstProcessingMode. If PST has been configured for a scan then the
processing mode is not IDLE but if PST hasn't been configured then the processing mode is IDLE.
:return: whether PST is in an idle state or not.
:rtype: bool
"""
return self.processing_mode == PstProcessingMode.IDLE
@property
def channel_block_configuration(self: PstBeamComponentManager) -> dict:
"""Get current channel block configuration."""
return self._channel_block_configuration
@channel_block_configuration.setter
def channel_block_configuration(self: PstBeamComponentManager, config: dict) -> None:
"""Set channel black configuration."""
self._channel_block_configuration = config
self._attribute_value_updated_callback("channel_block_configuration", json.dumps(config))
def _update_device_attribute(self: PstBeamComponentManager, attribute_name: str, value: Any) -> None:
"""Update the TANGO device attribute."""
if isinstance(value, dict):
value = json.dumps(value)
self._attribute_value_updated_callback(attribute_name, value)
def _handle_monitoring_data_update(self: PstBeamComponentManager, monitoring_data: dict) -> None:
"""Update the TANGO device properties from monitoring data."""
for k, v in monitoring_data.items():
self._update_device_attribute(k, v)
def _update_channel_block_configuration(self: PstBeamComponentManager, subband_resources: dict) -> None:
"""
Update the channel block configuration.
This calculates the new channel block configuration and is only called
after a successful `ConfigureScan` request. It uses the SMRB util to work
determine the subband configuration and then maps that to what is need
by the client of the BEAM.MGMT.
.. code-block:: python
{
"num_channel_blocks": 2,
"channel_blocks": [
{
"destination_host": "10.10.0.1",
"destination_port": 20000,
"destination_mac": "01:23:45:ab:cd:ef",
"start_pst_channel": 0,
"num_pst_channels": 12,
"start_pst_frequency": 49609375.0,
},
{
"destination_host": "10.10.0.1",
"destination_port": 20001,
"destination_mac": "01:23:45:ab:cd:ef",
"start_pst_channel": 12,
"num_pst_channels": 10,
"start_pst_frequency": 49652778.0,
},
]
}
"""
if subband_resources:
self.channel_block_configuration = {
"num_channel_blocks": subband_resources["common"]["nsubband"],
"channel_blocks": [
{
"destination_host": subband["data_host"],
"destination_port": subband["data_port"],
"destination_mac": subband["data_mac"],
"start_pst_channel": subband["start_channel"],
# start_centre_freq_mhz was calculated to 6 decimal places so it can cast to int
"start_pst_frequency": round_(subband["start_centre_freq_mhz"] * MEGA_HERTZ),
"num_pst_channels": subband["end_channel"] - subband["start_channel"],
}
for subband in subband_resources["subbands"].values()
],
}
else:
self.channel_block_configuration = {}
self._update_device_attribute("channel_block_configuration", self.channel_block_configuration)
@property
def data_receive_rate(self: PstBeamComponentManager) -> float:
"""Get current received data rate in Gb/s."""
return self._recv_subcomponent.data_receive_rate
@property
def data_received(self: PstBeamComponentManager) -> int:
"""Get current received data in bytes."""
return self._recv_subcomponent.data_received
@property
def data_drop_rate(self: PstBeamComponentManager) -> float:
"""Get current dropped data rate in bytes per second."""
return self._recv_subcomponent.data_drop_rate
@property
def data_dropped(self: PstBeamComponentManager) -> int:
"""Get current dropped data in bytes."""
return self._recv_subcomponent.data_dropped
@property
def misordered_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of packets received out of order in the current scan."""
return self._recv_subcomponent.misordered_packets
@property
def misordered_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of packets received out of order in packets/sec."""
return self._recv_subcomponent.misordered_packet_rate
@property
def malformed_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of malformed packets in the current scan."""
return self._recv_subcomponent.malformed_packets
@property
def malformed_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of malformed packets in packets/sec."""
return self._recv_subcomponent.malformed_packet_rate
@property
def misdirected_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of misdirected packets in the current scan."""
return self._recv_subcomponent.misdirected_packets
@property
def misdirected_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of misdirected packets in packets/sec."""
return self._recv_subcomponent.misdirected_packet_rate
@property
def checksum_failure_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of packets with checksum failures for the current scan."""
return self._recv_subcomponent.checksum_failure_packets
@property
def checksum_failure_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of packets with checksum failures in packets/sec."""
return self._recv_subcomponent.checksum_failure_packet_rate
@property
def timestamp_sync_error_packets(self: PstBeamComponentManager) -> int:
"""Get the total number of packets with timestamp sync errors for the current scan."""
return self._recv_subcomponent.timestamp_sync_error_packets
@property
def timestamp_sync_error_packet_rate(self: PstBeamComponentManager) -> float:
"""Get the current rate of packets with timestamp sync errors in packets/sec."""
return self._recv_subcomponent.timestamp_sync_error_packet_rate
@property
def seq_number_sync_error_packets(self: PstBeamComponentManager) -> int:
"""
Get the total number of packets with seq.
number sync error for the current scan.
"""
return self._recv_subcomponent.seq_number_sync_error_packets
@property
def seq_number_sync_error_packet_rate(self: PstBeamComponentManager) -> float:
"""
Get the current rate of packets with seq.
number sync error in packets/sec.
"""
return self._recv_subcomponent.seq_number_sync_error_packet_rate
@property
def no_valid_polarisation_correction_packets(self: PstBeamComponentManager) -> int:
"""
Get the number of packets received where no valid Jones polarisation corrections have been applied.
:return: the number of packets received where no valid Jones polarisation corrections have been applied.
:rtype: int
""" # noqa: E501
return self._recv_subcomponent.no_valid_polarisation_correction_packets
@property
def no_valid_polarisation_correction_packet_rate(self: PstBeamComponentManager) -> float:
"""
Get rate of packets where no valid Jones polarisation corrections have been applied in packets/sec.
:return: rate of packets where no valid Jones polarisation corrections have been applied in packets/sec.
:rtype: float
""" # noqa: E501
return self._recv_subcomponent.no_valid_polarisation_correction_packet_rate
@property
def no_valid_station_beam_packets(self: PstBeamComponentManager) -> int:
"""
Get the number of packets received where no valid station beam delay polynomials have been applied.
:return: the number of packets received where no valid station beam delay polynomials have been applied.
:rtype: int
""" # noqa: E501
return self._recv_subcomponent.no_valid_station_beam_packets
@property
def no_valid_station_beam_packet_rate(self: PstBeamComponentManager) -> float:
"""
Get current rate of packets where no valid station beam delay polynomials have been applied in packets/sec.
:return: current rate of packets where no valid station beam delay polynomials have been applied in packets/sec.
:rtype: float
""" # noqa: E501
return self._recv_subcomponent.no_valid_station_beam_packet_rate
@property
def no_valid_pst_beam_packets(self: PstBeamComponentManager) -> int:
"""
Get the number of packets received where no valid PST beam delay polynomials have been applied.
:return: the number of packets received where no valid PST beam delay polynomials have been applied.
:rtype: int
""" # noqa: E501
return self._recv_subcomponent.no_valid_pst_beam_packets
@property
def no_valid_pst_beam_packet_rate(self: PstBeamComponentManager) -> float:
"""
Get current rate of packets where no valid PST beam delay polynomials have been applied in packets/sec.
:return: current rate of packets where no valid PST beam delay polynomials have been applied in packets/sec.
:rtype: float
""" # noqa: E501
return self._recv_subcomponent.no_valid_pst_beam_packet_rate
@property
def data_record_rate(self: PstBeamComponentManager) -> float:
"""Get current data write rate in bytes per second."""
return self._dsp_subcomponent.dsp_disk_monitor_data.data_record_rate
@property
def data_recorded(self: PstBeamComponentManager) -> int:
"""Get current amount of bytes written to file."""
return self._dsp_subcomponent.dsp_disk_monitor_data.data_recorded
@property
def disk_capacity(self: PstBeamComponentManager) -> int:
"""Get size, in bytes, for the disk used for recording scan data."""
return self._dsp_subcomponent.dsp_disk_monitor_data.disk_capacity
@property
def disk_used_bytes(self: PstBeamComponentManager) -> int:
"""Get the current amount, in bytes, of disk used used."""
return self._dsp_subcomponent.dsp_disk_monitor_data.disk_used_bytes
@property
def disk_used_percentage(self: PstBeamComponentManager) -> float:
"""Get the percentage of used disk space for recording of scan data."""
return self._dsp_subcomponent.dsp_disk_monitor_data.disk_used_percentage
@property
def available_disk_space(self: PstBeamComponentManager) -> int:
"""Get available bytes for disk to be written to during scan."""
return self._dsp_subcomponent.dsp_disk_monitor_data.available_disk_space
@property
def available_recording_time(self: PstBeamComponentManager) -> float:
"""Get the available recording time, for the disk being written to during the scan, in seconds."""
return self._dsp_subcomponent.dsp_disk_monitor_data.available_recording_time
@property
def expected_data_record_rate(self: PstBeamComponentManager) -> float:
"""Get the expected data rate for DSP output for current scan configuration."""
return self._expected_data_record_rate
@expected_data_record_rate.setter
def expected_data_record_rate(self: PstBeamComponentManager, expected_data_record_rate: float) -> None:
"""Set the expected data rate for DSP output for current scan configuration."""
self._expected_data_record_rate = expected_data_record_rate
self._device_interface.handle_attribute_value_update(
"expected_data_record_rate", expected_data_record_rate
)
@property
def ring_buffer_utilisation(self: PstBeamComponentManager) -> float:
"""Get current utilisation of ring buffer for current scan configuration."""
return self._smrb_subcomponent.ring_buffer_utilisation
@property
def real_pol_a_mean_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the mean of the real data for pol A, averaged over all channels."""
return self._stat_subcomponent.real_pol_a_mean_freq_avg
@property
def real_pol_a_variance_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the variance of the real data for pol A, averaged over all channels."""
return self._stat_subcomponent.real_pol_a_variance_freq_avg
@property
def real_pol_a_num_clipped_samples(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the real data for pol A."""
return self._stat_subcomponent.real_pol_a_num_clipped_samples
@property
def imag_pol_a_mean_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the mean of the imaginary data for pol A, averaged over all channels."""
return self._stat_subcomponent.imag_pol_a_mean_freq_avg
@property
def imag_pol_a_variance_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the variance of the imaginary data for pol A, averaged over all channels."""
return self._stat_subcomponent.imag_pol_a_variance_freq_avg
@property
def imag_pol_a_num_clipped_samples(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the imaginary data for pol A."""
return self._stat_subcomponent.imag_pol_a_num_clipped_samples
@property
def real_pol_a_mean_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the mean of the real data for pol A, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.real_pol_a_mean_freq_avg_rfi_excised
@property
def real_pol_a_variance_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the variance of the real data for pol A, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.real_pol_a_variance_freq_avg_rfi_excised
@property
def real_pol_a_num_clipped_samples_rfi_excised(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the real data for pol A in channels not flagged for RFI."""
return self._stat_subcomponent.real_pol_a_num_clipped_samples_rfi_excised
@property
def imag_pol_a_mean_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the mean of the imaginary data for pol A, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.imag_pol_a_mean_freq_avg_rfi_excised
@property
def imag_pol_a_variance_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the variance of the imaginary data for pol A, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.imag_pol_a_variance_freq_avg_rfi_excised
@property
def imag_pol_a_num_clipped_samples_rfi_excised(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the imaginary data for pol A in channels not flagged for RFI."""
return self._stat_subcomponent.imag_pol_a_num_clipped_samples_rfi_excised
@property
def real_pol_b_mean_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the mean of the real data for pol B, averaged over all channels."""
return self._stat_subcomponent.real_pol_b_mean_freq_avg
@property
def real_pol_b_variance_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the variance of the real data for pol B, averaged over all channels."""
return self._stat_subcomponent.real_pol_b_variance_freq_avg
@property
def real_pol_b_num_clipped_samples(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the real data for pol B."""
return self._stat_subcomponent.real_pol_b_num_clipped_samples
@property
def imag_pol_b_mean_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the mean of the imaginary data for pol B, averaged over all channels."""
return self._stat_subcomponent.imag_pol_b_mean_freq_avg
@property
def imag_pol_b_variance_freq_avg(self: PstBeamComponentManager) -> float:
"""Get the variance of the imaginary data for pol B, averaged over all channels."""
return self._stat_subcomponent.imag_pol_b_variance_freq_avg
@property
def imag_pol_b_num_clipped_samples(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the imaginary data for pol B."""
return self._stat_subcomponent.imag_pol_b_num_clipped_samples
@property
def real_pol_b_mean_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the mean of the real data for pol B, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.real_pol_b_mean_freq_avg_rfi_excised
@property
def real_pol_b_variance_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the variance of the real data for pol B, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.real_pol_b_variance_freq_avg_rfi_excised
@property
def real_pol_b_num_clipped_samples_rfi_excised(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the real data for pol B in channels not flagged for RFI."""
return self._stat_subcomponent.real_pol_b_num_clipped_samples_rfi_excised
@property
def imag_pol_b_mean_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the mean of the imaginary data for pol B, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.imag_pol_b_mean_freq_avg_rfi_excised
@property
def imag_pol_b_variance_freq_avg_rfi_excised(self: PstBeamComponentManager) -> float:
"""Get the variance of the imaginary data for pol B, averaged over channels not flagged for RFI."""
return self._stat_subcomponent.imag_pol_b_variance_freq_avg_rfi_excised
@property
def imag_pol_b_num_clipped_samples_rfi_excised(self: PstBeamComponentManager) -> int:
"""Get the num of clipped samples of the imaginary data for pol B in channels not flagged for RFI."""
return self._stat_subcomponent.imag_pol_b_num_clipped_samples_rfi_excised
@override
def _simulation_mode_changed(self: PstBeamComponentManager) -> None:
"""
Set simulation mode state.
:param simulation_mode: the new simulation mode value.
:type simulation_mode: :py:class:`SimulationMode`
"""
# ensure we set the subordinate devices into to the same simulation mode.
self._smrb_subcomponent.simulation_mode = self.simulation_mode
self._recv_subcomponent.simulation_mode = self.simulation_mode
self._dsp_subcomponent.simulation_mode = self.simulation_mode
self._stat_subcomponent.simulation_mode = self.simulation_mode
@override
def _handle_communication_state_change(
self: PstBeamComponentManager, communication_state: CommunicationStatus
) -> None:
self.logger.info(f"communication state changes to {communication_state}")
if communication_state == CommunicationStatus.NOT_ESTABLISHED:
for sc in self._subcomponents:
sc.connect()
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
self._update_communication_state(CommunicationStatus.ESTABLISHED)
self._push_component_state_update(fault=None, power=PowerState.OFF)
self._handle_subcomponent_event_messages()
# force an update. This should result in health state of OK
self.handle_health_state_change()
elif communication_state == CommunicationStatus.DISABLED:
for sc in self._subcomponents:
sc.disconnect()
self._push_component_state_update(fault=None, power=PowerState.UNKNOWN)
self._update_communication_state(CommunicationStatus.DISABLED)
self._process_events = False
# force an update. This should result in health state of UNKNOWN
self.handle_health_state_change()
def _submit_remote_job(
self: PstBeamComponentManager,
job: Task,
task_callback: Callback,
completion_callback: Callback,
) -> TaskResponse:
remote_job = _RemoteJob(
job,
task_executor=self._pst_task_executor,
completion_callback=completion_callback,
logger=self.logger,
)
return self.submit_task(
remote_job,
task_callback=task_callback,
)
def _set_scan_configuration(
self: PstBeamComponentManager,
configuration: dict,
config_id: str,
pst_processing_mode: PstProcessingMode,
**kwargs: Any,
) -> None:
"""Set properties based off scan configuration."""
self.config_id = config_id
self._curr_scan_config = configuration
self.processing_mode = pst_processing_mode
@override
def configure_scan(
self: PstBeamComponentManager, task_callback: Callback = None, **kwargs: Any
) -> TaskResponse:
"""
Configure scan for the component.
The kwargs of this method is scan configuration. The super class method has
the same signature.
:param task_callback: callback to be called when the status of the command changes
:param kwargs: the scan configuration to use.
"""
configuration = kwargs
pst_configuration = convert_csp_config_to_pst_config(
telescope_config=self.telescope_config,
csp_configure_scan_request=configuration,
)
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'ConfigureScan' commands have completed.")
if self.is_idle:
self._push_component_state_update(configured=True)
self._set_scan_configuration(configuration=configuration, **pst_configuration)
dsp_scan_request = generate_dsp_scan_request(**pst_configuration)
self.expected_data_record_rate = dsp_scan_request["bytes_per_second"] * GIGABITS_PER_BYTE
self._device_interface.update_obs_mode(ObsMode.PULSAR_TIMING)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
def _configure_beam(sc: PstSubcomponentManager) -> None:
sc.configure_beam(configuration=pst_configuration)
def _configure_scan(sc: PstSubcomponentManager) -> None:
sc.configure_scan(configuration=pst_configuration)
deconfigure_tasks: List[Task] = []
if not self.is_idle:
deconfigure_tasks = self._deconfigure_tasks()
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
*deconfigure_tasks,
_subcomponent_task(
subcomponents=[self._smrb_subcomponent],
action_name="configure_beam",
action=_configure_beam,
),
_subcomponent_task(
subcomponents=[
self._dsp_subcomponent,
self._recv_subcomponent,
self._stat_subcomponent,
],
action_name="configure_beam",
action=_configure_beam,
),
_subcomponent_task(
subcomponents=[
self._smrb_subcomponent,
self._recv_subcomponent,
self._stat_subcomponent,
],
action_name="configure_scan",
action=_configure_scan,
),
_subcomponent_task(
subcomponents=[self._dsp_subcomponent],
action_name="configure_scan",
action=_configure_scan,
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _reset_state(self: PstBeamComponentManager, reset_health: bool = True) -> None:
"""
Reset state of the BEAM.
This method is used by the ``deconfigure_scan``, ``obsreset``
and ``reset`` methods to force the BEAM device back into the correct state.
This method performs the following actions:
* sets the current scan id to zero to signal that there is no current scan
* sets the current scan configuration as empty
* sets the configuration id as an empty string
* sets the PST processing mode to being ``PstProcessingMode.IDLE``
* attempts to reset the component state nicely to not scanning and not configured
* resets all the monitoring data, this is because the system is not long in a monitoring state
* resets the health state if the ``reset_health`` parameter was set to True
* sets the observation mode to being ``ObsMode.IDLE``
:param reset_health: whether to reset the health state of the device, defaults to True
:type reset_health: bool, optional
"""
self.scan_id = 0
self._curr_scan_config = None
self.config_id = ""
self.processing_mode = PstProcessingMode.IDLE
try:
self._push_component_state_update(scanning=False)
except Exception:
# ignore error
pass
try:
self._push_component_state_update(configured=False)
except Exception:
# ignore error
pass
self._reset_monitoring_properties()
if reset_health:
self._device_interface.update_health_state(health_state=HealthState.OK)
self._device_interface.update_obs_mode(ObsMode.IDLE)
@override
def deconfigure_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""
Deconfigure scan for this component.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'DeconfigureScan' commands have completed.")
self._reset_state(reset_health=False)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
return self._submit_remote_job(
job=SequentialTask(subtasks=self._deconfigure_tasks()),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _deconfigure_tasks(self: PstBeamComponentManager) -> List[Task]:
return [
# need to deconfigure scan of all processes, this can be done in parallel.
_subcomponent_task(
subcomponents=self._subcomponents,
action_name="deconfigure_scan",
action=lambda sc: sc.deconfigure_scan(),
),
# need to release the ring buffer clients before deconfiguring SMRB
_subcomponent_task(
subcomponents=[
self._dsp_subcomponent,
self._recv_subcomponent,
self._stat_subcomponent,
],
action_name="deconfigure_beam",
action=lambda sc: sc.deconfigure_beam(),
),
_subcomponent_task(
subcomponents=[self._smrb_subcomponent],
action_name="deconfigure_beam",
action=lambda sc: sc.deconfigure_beam(),
),
]
@override
def scan(
self: PstBeamComponentManager,
task_callback: Callback = None,
**kwargs: Any,
) -> TaskResponse:
"""Start scanning.
The kwargs of this method is scan request, including the scan_id. The super class
method has the same signature. By using the kwargs allow for forward compatibility
of accepting other parameters for the starting of the scan.
:param task_callback: callback for background processing to update device status.
:type task_callback: Callback
:param kwargs: scan request as a dict
:type kwargs: dict
"""
scan_id = int(kwargs["scan_id"])
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Scan' commands have completed.")
self._push_component_state_update(scanning=True)
self.scan_id = scan_id
task_callback(status=TaskStatus.COMPLETED, result="Completed")
def _scan(sc: PstSubcomponentManager) -> None:
sc.scan(**kwargs)
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
LambdaTask(
action=lambda: self._write_scan_config_to_output_dir(scan_id),
name="write_scan_config_to_output_dir",
),
_subcomponent_task(
subcomponents=[self._recv_subcomponent, self._smrb_subcomponent],
action_name="scan",
action=_scan,
),
_subcomponent_task(
subcomponents=[self._dsp_subcomponent],
action_name="scan",
action=_scan,
),
_subcomponent_task(
subcomponents=[self._stat_subcomponent],
action_name="scan",
action=_scan,
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _write_scan_config_to_output_dir(self: PstBeamComponentManager, scan_id: int) -> None:
"""Write the scan configuration out as JSON."""
self.logger.debug(f"Writing scan configuration for scan {scan_id}")
# dump current scan configuration as JSON
params = {
"eb_id": self._curr_scan_config["common"]["eb_id"], # type: ignore
"subsystem_id": self._device_interface.subsystem_id,
"scan_id": str(scan_id),
}
output_dir_str = self._device_interface.scan_output_dir_pattern
for k, v in params.items():
output_dir_str = output_dir_str.replace(f"<{k}>", v)
try:
output_dir = pathlib.Path(output_dir_str)
output_dir.mkdir(parents=True, exist_ok=True)
scan_configuration_path = output_dir / "scan_configuration.json"
self.logger.info(f"Writing scan configuration for scan {scan_id} to {scan_configuration_path}")
with open(scan_configuration_path, "w") as f:
json.dump(self._curr_scan_config, f)
except Exception:
self.logger.exception("Error in writing output file.", exc_info=True)
raise
@override
def end_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Stop scanning.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'EndScan' commands have completed.")
self._push_component_state_update(scanning=False)
self.scan_id = 0
task_callback(status=TaskStatus.COMPLETED, result="Completed")
# need to stop_scan on RECV before DSP and STAT, then SMRB
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
_subcomponent_task(
subcomponents=[self._recv_subcomponent],
action_name="end_scan",
action=lambda sc: sc.end_scan(),
),
_subcomponent_task(
subcomponents=[self._dsp_subcomponent, self._stat_subcomponent],
action_name="end_scan",
action=lambda sc: sc.end_scan(),
),
_subcomponent_task(
subcomponents=[self._smrb_subcomponent],
action_name="end_scan",
action=lambda sc: sc.end_scan(),
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
def _abort_task(self: PstBeamComponentManager, subcomponents: List[PstSubcomponentManager]) -> Task:
subcomponents_to_abort = [
sc
for sc in subcomponents
if sc.obs_state not in [ObsState.ABORTED, ObsState.FAULT, ObsState.EMPTY]
]
return _subcomponent_task(
subcomponents=subcomponents_to_abort, action_name="abort", action=lambda sc: sc.abort()
)
@override
def abort(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Tell the component to abort whatever it was doing.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Abort' commands have completed.")
self._push_component_state_update(scanning=False)
self.abort_commands()
task_callback(status=TaskStatus.COMPLETED, result="Completed")
self._submit_remote_job(
job=SequentialTask(
subtasks=[
LambdaTask(
action=lambda: callback_safely(task_callback, status=TaskStatus.IN_PROGRESS),
name="abort_in_progress",
),
self._abort_task([self._recv_subcomponent]),
self._abort_task([self._dsp_subcomponent, self._stat_subcomponent]),
self._abort_task([self._smrb_subcomponent]),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
return TaskStatus.IN_PROGRESS, "Aborting"
@override
def obsreset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Reset the component and put it into an IDLE state.
The subcomponents have all the resources released and as such are moved back to
the EMPTY state.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'ObsReset' commands have completed.")
self._reset_state()
task_callback(status=TaskStatus.COMPLETED, result="Completed")
abort_recv_subtask: Task = self._abort_task([self._recv_subcomponent])
abort_readers_subtask: Task = self._abort_task([self._dsp_subcomponent, self._stat_subcomponent])
abort_smrb_subtask: Task = self._abort_task([self._smrb_subcomponent])
# call reset on all subcomponents SMRB has to be done last but the others can be done in parallel
subcomponents_to_reset: List[PstSubcomponentManager] = [
sc
for sc in cast(
List[PstSubcomponentManager],
[
self._stat_subcomponent,
self._dsp_subcomponent,
self._recv_subcomponent,
],
)
]
obsreset_subtasks = _subcomponent_task(
subcomponents=subcomponents_to_reset, action_name="obsreset", action=lambda sc: sc.obsreset()
)
obsreset_smrb_subtask = _subcomponent_task(
subcomponents=[self._smrb_subcomponent], action_name="obsreset", action=lambda sc: sc.obsreset()
)
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
abort_recv_subtask,
abort_readers_subtask,
abort_smrb_subtask,
obsreset_subtasks,
obsreset_smrb_subtask,
],
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
@override
def reset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
"""Reset the component and put it into an IDLE state.
The subcomponents have all the resources released and as such are moved back to
the IDLE state.
:param task_callback: callback to be called when the status of the command changes
"""
# call reset on all subcomponents SMRB has to be done last but the others can be done in parallel
subcomponents_to_reset: List[PstSubcomponentManager] = [
sc
for sc in cast(
List[PstSubcomponentManager],
[
self._stat_subcomponent,
self._dsp_subcomponent,
self._recv_subcomponent,
],
)
]
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'Reset' commands have completed.")
self._reset_state()
task_callback(status=TaskStatus.COMPLETED, result="Completed")
reset_subtasks = _subcomponent_task(
subcomponents=subcomponents_to_reset, action_name="reset", action=lambda sc: sc.reset()
)
reset_smrb_subtask = _subcomponent_task(
subcomponents=[self._smrb_subcomponent], action_name="reset", action=lambda sc: sc.reset()
)
connect_subtask = _subcomponent_task(
subcomponents=self._subcomponents, action_name="connect", action=lambda sc: sc.connect()
)
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
reset_subtasks,
reset_smrb_subtask,
connect_subtask,
],
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
@override
def go_to_fault(
self: PstBeamComponentManager, fault_msg: str, task_callback: Callback = None
) -> TaskResponse:
"""Put all the sub-devices into a FAULT state.
:param task_callback: callback to be called when the status of the command changes
"""
def _completion_callback(task_callback: Callable) -> None:
self.logger.debug("All the 'GoToFault' commands have completed.")
self._push_component_state_update(obsfault=True)
task_callback(status=TaskStatus.COMPLETED, result="Completed")
self._device_interface.handle_fault(fault_msg=fault_msg)
return self._submit_remote_job(
job=SequentialTask(
subtasks=[
_subcomponent_task(
subcomponents=[self._recv_subcomponent],
action_name="go_to_fault",
action=lambda sc: sc.go_to_fault(fault_msg=fault_msg),
),
_subcomponent_task(
subcomponents=[self._dsp_subcomponent, self._stat_subcomponent],
action_name="go_to_fault",
action=lambda sc: sc.go_to_fault(fault_msg=fault_msg),
),
_subcomponent_task(
subcomponents=[self._smrb_subcomponent],
action_name="go_to_fault",
action=lambda sc: sc.go_to_fault(fault_msg=fault_msg),
),
]
),
task_callback=task_callback,
completion_callback=_completion_callback,
)
@override
def set_logging_level(self: PstBeamComponentManager, log_level: LoggingLevel) -> None:
"""
Set LoggingLevel of all the sub-devices.
:param log_level: The required TANGO LoggingLevel
:returns: None.
"""
for subcomponent in self._subcomponents:
subcomponent.set_logging_level(log_level=log_level)
@property
def monitoring_polling_rate_ms(self: PstBeamComponentManager) -> int:
"""Get the current monitoring polling rate, in milliseconds."""
return self._monitoring_polling_rate_ms
@monitoring_polling_rate_ms.setter
def monitoring_polling_rate_ms(self: PstBeamComponentManager, monitoring_polling_rate_ms: int) -> None:
"""Set the monitoring polling rate on the subordinate devices."""
for subcomponent in self._subcomponents:
subcomponent.monitoring_polling_rate_ms = monitoring_polling_rate_ms
self._monitoring_polling_rate_ms = monitoring_polling_rate_ms
@property
def health_check_interval(self: PstBeamComponentManager) -> int:
"""
Get the current health check interval, in milliseconds.
:return: the current health check interval, in milliseconds.
:rtype: int
"""
return self._health_check_interval
@health_check_interval.setter
def health_check_interval(self: PstBeamComponentManager, health_check_interval: int) -> None:
"""
Set the health check interval, in milliseconds.
Setting this parameter will restart the health check process of the background applications.
:param health_check_interval: the updated health check interval, in milliseconds.
:type health_check_interval: int
"""
if self._health_check_interval != health_check_interval:
self._health_check_interval = health_check_interval
for scm in self._subcomponents:
scm.health_check_interval = health_check_interval
scm.restart_health_check()
@property
def recv_health_state(self: PstBeamComponentManager) -> HealthState:
"""
Get the current health state for the RECV subcomponent.
:return: the current health state for the RECV subcomponent.
:rtype: HealthState
"""
return self._recv_subcomponent.health_state
@property
def recv_obs_state(self: PstBeamComponentManager) -> ObsState:
"""
Get the current observing state for the RECV subcomponent.
:return: the current observing state for the RECV subcomponent.
:rtype: ObsState
"""
return self._recv_subcomponent.obs_state
@property
def smrb_health_state(self: PstBeamComponentManager) -> HealthState:
"""
Get the current health state for the SMRB subcomponent.
:return: the current health state for the SMRB subcomponent.
:rtype: HealthState
"""
return self._smrb_subcomponent.health_state
@property
def smrb_obs_state(self: PstBeamComponentManager) -> ObsState:
"""
Get the current observing state for the SMRB subcomponent.
:return: the current observing state for the SMRB subcomponent.
:rtype: ObsState
"""
return self._smrb_subcomponent.obs_state
@property
def stat_health_state(self: PstBeamComponentManager) -> HealthState:
"""
Get the current health state for the STAT subcomponent.
:return: the current health state for the STAT subcomponent.
:rtype: HealthState
"""
return self._stat_subcomponent.health_state
@property
def stat_obs_state(self: PstBeamComponentManager) -> ObsState:
"""
Get the current observing state for the STAT subcomponent.
:return: the current observing state for the STAT subcomponent.
:rtype: ObsState
"""
return self._stat_subcomponent.obs_state
@property
def dsp_disk_health_state(self: PstBeamComponentManager) -> HealthState:
"""
Get the current health state for the DSP.DISK (voltage recorder) subcomponent.
:return: the current health state for the DSP.DISK subcomponent.
:rtype: HealthState
"""
return self._dsp_subcomponent.dsp_disk_health_state
@property
def dsp_disk_obs_state(self: PstBeamComponentManager) -> ObsState:
"""
Get the current observing state for the DSP.DISK (voltage recorder) subcomponent.
:return: the current observing state for the DSP.DISK subcomponent.
:rtype: ObsState
"""
return self._dsp_subcomponent.dsp_disk_obs_state
@property
def dsp_flow_through_health_state(self: PstBeamComponentManager) -> HealthState:
"""
Get the current health state for the DSP.FT (flow through) subcomponent.
:return: the current health state for the DSP.FT subcomponent.
:rtype: HealthState
"""
return self._dsp_subcomponent.dsp_flow_through_health_state
@property
def dsp_flow_through_obs_state(self: PstBeamComponentManager) -> ObsState:
"""
Get the current observing state for the DSP.FT (flow through) subcomponent.
:return: the current observing state for the DSP.FT subcomponent.
:rtype: ObsState
"""
return self._dsp_subcomponent.dsp_flow_through_obs_state
@property
def _subcomponent_health_states(self: PstBeamComponentManager) -> Dict[str, HealthState]:
return {
"RECV": self.recv_health_state,
"SMRB": self.smrb_health_state,
"STAT": self.stat_health_state,
"DSP.DISK": self.dsp_disk_health_state,
"DSP.FT": self.dsp_flow_through_health_state,
}
[docs] def handle_health_state_change(self: PstBeamComponentManager) -> None:
"""
Handle a change in one of the subcomponent's health state.
This method will aggregate the overall health state of the PST BEAM based
on the states of the subcomponents based on the following rules:
* if all subcomponents have a state of HealthState.OK, then the beam's state is HealthState.OK
* if one subcomponent has a state of HealthState.DEGRADED, then the beam's state
is HealthState.DEGRADED
* if multiple subcomponents have a state of HealthState.DEGRADED, then the beam's state is
set to HealthState.FAILED
* if any subcomponent has a state of HealthState.FAILED, then the beam's state is set
to HealthState.FAILED
* finally, if any subcomponent has a state of HealthState.UNKNOWN then overall state is
still HealthState.UNKNOWN
"""
out_state = HealthState.OK
for scm, scm_health_state in self._subcomponent_health_states.items():
if scm_health_state == HealthState.DEGRADED:
self.logger.debug(f"{scm} health state is {scm_health_state}")
if out_state == HealthState.DEGRADED:
self.logger.warning(
f"multiple subcomponents are in degraded state. Putting "
f"{self.beam_id} in FAILED state",
exc_info=False,
)
out_state = HealthState.FAILED
break
else:
out_state = HealthState.DEGRADED
if scm_health_state == HealthState.FAILED:
self.logger.warning(
f"{scm} health state is in FAILED state. Putting {self.beam_id} into FAILED state",
exc_info=False,
)
out_state = HealthState.FAILED
break
if scm_health_state == HealthState.UNKNOWN:
out_state = HealthState.UNKNOWN
self._device_interface.update_health_state(health_state=out_state)
def __getattr__(self: PstBeamComponentManager, name: str) -> Any:
"""
Get attribute of component manager.
This is a Python dunder method that is used to get attributes/properties
that have not been found by the ``__getattribute__`` method.
This allows delegating getting the attribute from sub-component managers without
needing to have code specific to delegate getting the property.
:param name: the name of the attribute
:type name: str
:return: the attribute value
:rtype: Any
:raises: AttributeError if attribute cannot be found on sub-component
"""
for sc in self._subcomponents:
try:
return getattr(sc, name)
except AttributeError:
pass
raise AttributeError(name=name)