# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the BEAM PST component manager."""
from __future__ import annotations
import dataclasses
import json
import logging
import pathlib
import queue
import threading
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, cast
from overrides import override
from ska_control_model import (
CommunicationStatus,
HealthState,
LoggingLevel,
ObsMode,
ObsState,
PowerState,
PstProcessingMode,
ResultCode,
)
from ska_pst.common.constants import GIGABITS_PER_BYTE, MEGA_HERTZ, SCAN_CONFIG_FILE_NAME
from ska_pst.lmc.beam.beam_device_interface import PstBeamDeviceInterface
from ska_pst.lmc.component import PstComponentManager, PstSubcomponentManager, SubcomponentEventMessage
from ska_pst.lmc.dsp import PstDspComponentManager
from ska_pst.lmc.dsp.dsp_util import generate_dsp_scan_request
from ska_pst.lmc.job import LambdaTask, NoopTask, ParallelTask, SequentialTask, Task, TaskExecutor
from ska_pst.lmc.receive import PstReceiveComponentManager
from ska_pst.lmc.smrb import PstSmrbComponentManager
from ska_pst.lmc.stat import PstStatComponentManager
from ska_pst.lmc.util import BackgroundTaskProcessor, background_task
from ska_pst.lmc.util.callback import Callback, callback_safely
from ska_tango_base.executor import TaskStatus
from ska_pst.common import TelescopeConfig, convert_csp_config_to_pst_config, get_telescope_config, round_
# Value returned by the command methods in this module: the task status and a message.
TaskResponse = Tuple[TaskStatus, str]
# NOTE(review): not referenced in the visible part of this module; presumably the collected
# statuses and messages of tasks run on several remote devices - confirm before relying on it.
RemoteTaskResponse = Tuple[List[TaskStatus], List[str]]
# Only the component manager class is exported from this module.
__all__ = [
"PstBeamComponentManager",
]
def _assert_obs_state_task(
    subcomponents: List[PstSubcomponentManager],
    allowed_states: ObsState | Sequence[ObsState],
    cmd: str,
) -> Task:
    """
    Create a task that checks the remote observing state of each subcomponent.

    One ``LambdaTask`` is created per subcomponent and all of them are wrapped in a
    ``ParallelTask``. When a subtask runs it reads ``remote_obs_state`` of *its own*
    subcomponent and fails if that state is not one of the allowed states.

    :param subcomponents: the subcomponents whose remote observing state is checked.
    :type subcomponents: List[PstSubcomponentManager]
    :param allowed_states: a single state, or a sequence of states, that are acceptable.
    :type allowed_states: ObsState | Sequence[ObsState]
    :param cmd: the name of the command being checked, only used in the error message.
    :type cmd: str
    :return: a task that performs the check for every subcomponent.
    :rtype: Task
    :raises AssertionError: raised by a subtask when it is executed (not when this function
        is called) if the subcomponent is not in an allowed state.
    """
    if isinstance(allowed_states, ObsState):
        allowed_states = [allowed_states]
    allowed_state_names = [a.name for a in allowed_states]

    def _assert_obs_state(sc: PstSubcomponentManager) -> None:
        obs_state = sc.remote_obs_state
        # raise explicitly rather than using ``assert`` so that the check is not
        # stripped when Python runs with optimisations enabled (``-O``).
        if obs_state not in allowed_states:
            raise AssertionError(
                f"{sc.subcomponent_name} {obs_state.name} not in allowed states of "
                f"{allowed_state_names} for command '{cmd}'"
            )

    def _check_for(sc: PstSubcomponentManager) -> Callable[[], None]:
        # bind ``sc`` now; a bare lambda in the comprehension below would be late-bound
        # and every subtask would end up checking only the last subcomponent.
        def _check() -> None:
            _assert_obs_state(sc)

        return _check

    return ParallelTask(
        subtasks=[
            LambdaTask(action=_check_for(sc), name=f"assert_obs_state_{sc.subcomponent_name}")
            for sc in subcomponents
        ]
    )
def _subcomponent_task(
    subcomponents: List[PstSubcomponentManager],
    action_name: str,
    action: Callable[[PstSubcomponentManager], None],
) -> Task:
    """
    Create a task that applies an action to each of the given subcomponents.

    No subcomponents gives a ``NoopTask``, exactly one gives a single ``LambdaTask`` and
    more than one gives a ``ParallelTask`` holding one ``LambdaTask`` per subcomponent.
    Each ``LambdaTask`` is named ``<action_name>_<subcomponent_name>``.

    :param subcomponents: the subcomponents to apply the action to.
    :type subcomponents: List[PstSubcomponentManager]
    :param action_name: prefix used when naming the created tasks.
    :type action_name: str
    :param action: the callable invoked with a subcomponent when its task runs.
    :type action: Callable[[PstSubcomponentManager], None]
    :return: the task wrapping the action for all the subcomponents.
    :rtype: Task
    """

    def _task_for(sc: PstSubcomponentManager) -> LambdaTask:
        # the subcomponent is bound per call so each task acts on its own subcomponent
        def _run() -> None:
            return action(sc)

        return LambdaTask(action=_run, name=f"{action_name}_{sc.subcomponent_name}")

    tasks = [_task_for(sc) for sc in subcomponents]
    if not tasks:
        return NoopTask()
    if len(tasks) == 1:
        return tasks[0]
    return ParallelTask(subtasks=tasks)
class _RemoteJob:
    """
    Callable that submits a job to a task executor and reports progress via a task callback.

    When called, the task callback is notified that the task is ``IN_PROGRESS`` and the job
    is submitted to the task executor. The executor is given a callback that in turn calls
    the ``completion_callback`` with the task callback. If submitting the job raises, the
    exception handler (if any) is called and the task callback is notified of the failure.
    """

    def __init__(
        self: _RemoteJob,
        job: Task,
        task_executor: TaskExecutor,
        completion_callback: Callback,
        exception_handler: Callable[[Exception], None] | None,
        logger: logging.Logger,
    ):
        """
        Initialise the remote job.

        :param job: the job to submit to the task executor.
        :type job: Task
        :param task_executor: the executor that the job is submitted to.
        :type task_executor: TaskExecutor
        :param completion_callback: called with the task callback once the executor
            calls back for the submitted job.
        :type completion_callback: Callback
        :param exception_handler: optional handler called with the exception raised
            while submitting the job.
        :type exception_handler: Callable[[Exception], None] | None
        :param logger: the logger used to report submission errors.
        :type logger: logging.Logger
        """
        self._job = job
        self._completion_callback = completion_callback
        self._logger = logger
        self._task_executor = task_executor
        self._exception_handler = exception_handler

    def __call__(
        self: _RemoteJob,
        *args: Any,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[threading.Event] = None,
        **kwargs: Any,
    ) -> None:
        """
        Submit the job to the task executor.

        :param args: ignored positional arguments.
        :param task_callback: optional callback used to report the status of the task.
        :type task_callback: Optional[Callable]
        :param task_abort_event: accepted for signature compatibility, not used here.
        :type task_abort_event: Optional[threading.Event]
        :param kwargs: ignored keyword arguments.
        """

        def _ignore_task_update(*_args: Any, **_kwargs: Any) -> None:
            """Stand in for the task callback when the caller did not supply one."""

        # The completion callbacks in this module call the task callback unconditionally,
        # so never hand them ``None``: that would raise a TypeError when the job completes.
        completion_task_callback = task_callback if task_callback is not None else _ignore_task_update

        def _completion_callback(*_args: Any, **_kwargs: Any) -> None:
            self._completion_callback(completion_task_callback)  # type: ignore

        callback_safely(task_callback, status=TaskStatus.IN_PROGRESS)

        try:
            self._task_executor.submit_job(job=self._job, callback=_completion_callback)
        except Exception as e:
            self._logger.warning("Error in submitting long running commands to remote devices", exc_info=True)
            if self._exception_handler:
                self._exception_handler(e)

            if task_callback:
                task_callback(
                    status=TaskStatus.FAILED,
                    result=(ResultCode.FAILED, f"Unhandled exception during execution: {str(e)}"),
                    exception=e,
                )
[docs]class PstBeamComponentManager(PstComponentManager[PstBeamDeviceInterface]):
"""
Component manager for the BEAM component in PST.LMC.
Since the BEAM component is a logical device, this component
manager is used to orchestrate the process subcomponents: SMRB,
RECV, DSP and STAT.
Commands that are executed on this component manager are
sent to instances of :py:class:`PstDeviceProxy` for each
device that the BEAM device manages.
This component manager only takes the fully-qualified device
name (FQDN) for the remote devices, but uses the
:py:class:`DeviceProxyFactory` to retrieve instances of the
device proxies that commands should be sent to.
"""
_dsp_subcomponent: PstDspComponentManager
_recv_subcomponent: PstReceiveComponentManager
_smrb_subcomponent: PstSmrbComponentManager
_stat_subcomponent: PstStatComponentManager
_event_queue: queue.Queue[SubcomponentEventMessage]
def __init__(
    self: PstBeamComponentManager,
    *,
    device_interface: PstBeamDeviceInterface,
    logger: logging.Logger | None = None,
    pst_task_executor: TaskExecutor | None = None,
    background_task_processor: BackgroundTaskProcessor | None = None,
    event_queue: queue.Queue[SubcomponentEventMessage] | None = None,
    **kwargs: Any,
) -> None:
    """
    Initialise the BEAM component manager.

    This creates the DSP, RECV, SMRB and STAT subcomponent managers, all sharing the same
    event queue, and starts the task executor.

    :param device_interface: an interface to the TANGO device that manages this component.
        Using an interface means a normal unit test doesn't need the TANGO testing
        infrastructure.
    :type device_interface: PstBeamDeviceInterface
    :param logger: a logger for this object to use, defaults to None in which case a
        module level logger is used.
    :type logger: logging.Logger | None, optional
    :param pst_task_executor: the task executor that jobs are submitted to. If this is
        not provided a default executor is created.
    :type pst_task_executor: TaskExecutor | None, optional
    :param background_task_processor: a background task processor, defaults to None. If
        this is not provided a default background task processor is created.
    :type background_task_processor: BackgroundTaskProcessor | None, optional
    :param event_queue: a queue of :py:class:`SubcomponentEventMessage` messages sent from
        the subcomponents. If this is not provided a default queue is created. Supplying a
        queue allows unit tests to simulate events sent by the subcomponent managers.
    :type event_queue: queue.Queue[SubcomponentEventMessage] | None, optional
    :param kwargs: extra keyword arguments passed through to the base class.
    """
    logger = logger or logging.getLogger(__name__)
    self._event_queue = event_queue or queue.Queue()
    self._background_task_processor = background_task_processor or BackgroundTaskProcessor(
        default_logger=logger
    )
    super().__init__(
        device_interface=device_interface,
        power=PowerState.UNKNOWN,
        fault=None,
        logger=logger,
        obsfault=None,
        scanning=False,
        configured=False,
        **kwargs,
    )

    def _subcomponent_kwargs(process_api_endpoint: Any) -> Dict[str, Any]:
        # keyword arguments that every subcomponent manager is constructed with
        return {
            "device_name": self.device_name,
            "process_api_endpoint": process_api_endpoint,
            "monitor_data_updated_callback": self._handle_monitoring_data_update,
            "beam_id": device_interface.beam_id,
            "logger": logger,
            "event_queue": self._event_queue,
        }

    self._dsp_subcomponent = PstDspComponentManager(
        **_subcomponent_kwargs(device_interface.dsp_process_api_endpoint)
    )
    # RECV is the only subcomponent that also reports the subband resources
    self._recv_subcomponent = PstReceiveComponentManager(
        subband_resources_callback=self._update_channel_block_configuration,
        **_subcomponent_kwargs(device_interface.recv_process_api_endpoint),
    )
    self._smrb_subcomponent = PstSmrbComponentManager(
        **_subcomponent_kwargs(device_interface.smrb_process_api_endpoint)
    )
    self._stat_subcomponent = PstStatComponentManager(
        **_subcomponent_kwargs(device_interface.stat_process_api_endpoint)
    )
    self._subcomponents: List[PstSubcomponentManager] = [
        self._smrb_subcomponent,
        self._recv_subcomponent,
        self._dsp_subcomponent,
        self._stat_subcomponent,
    ]

    self._pst_task_executor = pst_task_executor or TaskExecutor(logger=logger)
    self._curr_scan_config: dict | None = None
    self._pst_task_executor.start()

    self._monitoring_polling_rate_ms = device_interface.monitoring_polling_rate_ms
    self._health_check_interval = device_interface.health_check_interval
    self._expected_data_record_rate = 0.0
    self._processing_mode: PstProcessingMode = PstProcessingMode.IDLE
    self._reset_monitoring_properties()

    # last event message seen from each subcomponent, keyed by the subcomponent name
    self._previous_event_msgs: Dict[str, SubcomponentEventMessage] = {}
def __del__(self: PstBeamComponentManager) -> None:
    """
    Handle shutdown of component manager.

    This signals the event processing loop to stop and stops the task executor.
    """
    self._process_events = False
    # ``__del__`` also runs for an instance whose ``__init__`` raised before the task
    # executor was assigned, so don't assume that the attribute exists.
    task_executor = getattr(self, "_pst_task_executor", None)
    if task_executor is not None:
        task_executor.stop()
def _reset_monitoring_properties(self: PstBeamComponentManager) -> None:
    """
    Reset all the monitored properties to their default values.

    This calls ``reset_monitoring`` on every subcomponent, then sets the expected data
    record rate to ``0.0`` and the channel block configuration to an empty dictionary.
    Both of those are set through their property setters.
    """
    for sc in self._subcomponents:
        sc.reset_monitoring()

    self.expected_data_record_rate = 0.0
    self.channel_block_configuration = {}
@background_task
def _handle_subcomponent_event_messages(self: PstBeamComponentManager) -> None:
    """
    Process the event messages that the subcomponents put on the event queue.

    For each message the values that differ from the previous message of the same
    subcomponent are published as ``<subcomponent_name>_<field>`` attribute updates. The
    overall health state is re-evaluated when the ``health_state`` value has changed.
    The loop runs until ``_process_events`` is set to False.
    """
    # other threads clear this flag to make the loop exit.
    self._process_events = True
    try:
        while self._process_events:
            try:
                # wait for at most 10ms so that a cleared flag is noticed quickly.
                event_msg = self._event_queue.get(timeout=0.01)

                changed = dataclasses.asdict(event_msg)
                del changed["subcomponent_name"]
                name = event_msg.subcomponent_name

                previous = self._previous_event_msgs.get(name)
                if previous is not None:
                    # only keep the values that are different to the previous message
                    changed = {
                        field: value
                        for field, value in changed.items()
                        if getattr(previous, field) != value
                    }

                for field, value in changed.items():
                    self._attribute_value_updated_callback(f"{name}_{field}", value)

                # ensure that we update overall health state
                if "health_state" in changed:
                    self.handle_health_state_change()

                self._previous_event_msgs[name] = event_msg
            except queue.Empty:
                continue
    except Exception:
        self.logger.exception("Error in background processing of events.", exc_info=True)
        raise
@property
def telescope_config(self: PstBeamComponentManager) -> TelescopeConfig:
    """
    Get the telescope configuration for this PST BEAM.

    The configuration is looked up with ``get_telescope_config`` using the telescope of
    the device interface's facility. Having this property avoids repeating that call
    throughout the class.

    :return: the current telescope configuration.
    :rtype: TelescopeConfig
    """
    telescope = self._device_interface.facility.telescope
    return get_telescope_config(telescope)
@property
def processing_mode(self: PstBeamComponentManager) -> PstProcessingMode:
    """
    Get the current PST processing mode.

    This is ``PstProcessingMode.IDLE`` until a scan configuration has been applied and
    again after the state of the BEAM has been reset.

    :return: the current processing mode.
    :rtype: PstProcessingMode
    """
    return self._processing_mode
@processing_mode.setter
def processing_mode(self: PstBeamComponentManager, processing_mode: PstProcessingMode) -> None:
    """
    Set the current PST processing mode.

    The device interface is only notified, via the ``pst_processing_mode`` attribute,
    when the value actually changes.

    :param processing_mode: the processing mode that PST is currently set to.
    :type processing_mode: PstProcessingMode
    """
    if processing_mode == self._processing_mode:
        # nothing has changed so there is nothing to publish
        return

    self._processing_mode = processing_mode
    self._device_interface.handle_attribute_value_update("pst_processing_mode", processing_mode.name)
@property
def is_idle(self: PstBeamComponentManager) -> bool:
    """
    Get whether PST is in an idle state or not.

    PST is idle when the current processing mode is ``PstProcessingMode.IDLE``.

    :return: True if the processing mode is IDLE, otherwise False.
    :rtype: bool
    """
    current_mode = self.processing_mode
    return current_mode == PstProcessingMode.IDLE
@property
def channel_block_configuration(self: PstBeamComponentManager) -> dict:
    """
    Get the current channel block configuration.

    :return: the channel block configuration as a dictionary.
    :rtype: dict
    """
    return self._channel_block_configuration
@channel_block_configuration.setter
def channel_block_configuration(self: PstBeamComponentManager, config: dict) -> None:
    """
    Set the channel block configuration.

    The new value is stored and then published, as a JSON string, through the attribute
    value updated callback.

    :param config: the new channel block configuration.
    :type config: dict
    """
    self._channel_block_configuration = config
    config_json = json.dumps(config)
    self._attribute_value_updated_callback("channel_block_configuration", config_json)
def _update_device_attribute(self: PstBeamComponentManager, attribute_name: str, value: Any) -> None:
    """
    Update a TANGO device attribute.

    Dictionary values are converted to a JSON string before being published, all other
    values are published unchanged.

    :param attribute_name: the name of the attribute to update.
    :type attribute_name: str
    :param value: the new value of the attribute.
    :type value: Any
    """
    published_value = json.dumps(value) if isinstance(value, dict) else value
    self._attribute_value_updated_callback(attribute_name, published_value)
def _handle_monitoring_data_update(self: PstBeamComponentManager, monitoring_data: dict) -> None:
    """
    Update the TANGO device attributes from monitoring data.

    :param monitoring_data: a mapping of attribute name to the latest monitored value.
    :type monitoring_data: dict
    """
    for attribute_name, monitored_value in monitoring_data.items():
        self._update_device_attribute(attribute_name, monitored_value)
def _update_channel_block_configuration(self: PstBeamComponentManager, subband_resources: dict) -> None:
    """
    Update the channel block configuration.

    This maps the subband resources to the channel block configuration needed by the
    client of the BEAM.MGMT. One channel block is created for each subband. Empty subband
    resources result in an empty channel block configuration.

    .. code-block:: python

        {
            "num_channel_blocks": 2,
            "channel_blocks": [
                {
                    "destination_host": "10.10.0.1",
                    "destination_port": 20000,
                    "destination_mac": "01:23:45:ab:cd:ef",
                    "start_pst_channel": 0,
                    "num_pst_channels": 12,
                    "start_pst_frequency": 49609375.0,
                },
                {
                    "destination_host": "10.10.0.1",
                    "destination_port": 20001,
                    "destination_mac": "01:23:45:ab:cd:ef",
                    "start_pst_channel": 12,
                    "num_pst_channels": 10,
                    "start_pst_frequency": 49652778.0,
                },
            ]
        }

    :param subband_resources: the subband resources; when not empty it is read through its
        ``common.nsubband`` value and its ``subbands`` mapping.
    :type subband_resources: dict
    """

    def _to_channel_block(subband: dict) -> dict:
        # the key order is kept stable as the configuration gets published as JSON
        return {
            "destination_host": subband["data_host"],
            "destination_port": subband["data_port"],
            "destination_mac": subband["data_mac"],
            "start_pst_channel": subband["start_channel"],
            # start_centre_freq_mhz was calculated to 6 decimal places so it can cast to int
            "start_pst_frequency": round_(subband["start_centre_freq_mhz"] * MEGA_HERTZ),
            "num_pst_channels": subband["end_channel"] - subband["start_channel"],
        }

    if not subband_resources:
        self.channel_block_configuration = {}
    else:
        num_channel_blocks = subband_resources["common"]["nsubband"]
        channel_blocks = [_to_channel_block(sb) for sb in subband_resources["subbands"].values()]
        self.channel_block_configuration = {
            "num_channel_blocks": num_channel_blocks,
            "channel_blocks": channel_blocks,
        }

    self._update_device_attribute("channel_block_configuration", self.channel_block_configuration)
@property
def expected_data_record_rate(self: PstBeamComponentManager) -> float:
    """
    Get the expected data rate for DSP output for the current scan configuration.

    :return: the expected data record rate.
    :rtype: float
    """
    return self._expected_data_record_rate
@expected_data_record_rate.setter
def expected_data_record_rate(self: PstBeamComponentManager, expected_data_record_rate: float) -> None:
    """
    Set the expected data rate for DSP output for the current scan configuration.

    The value is stored and the device interface is notified of the new value of the
    ``expected_data_record_rate`` attribute.

    :param expected_data_record_rate: the new expected data record rate.
    :type expected_data_record_rate: float
    """
    self._expected_data_record_rate = expected_data_record_rate
    notify = self._device_interface.handle_attribute_value_update
    notify("expected_data_record_rate", expected_data_record_rate)
@override
def _simulation_mode_changed(self: PstBeamComponentManager) -> None:
    """
    Handle a change of the simulation mode.

    The current simulation mode of this component manager is applied to the SMRB, RECV,
    DSP and STAT subcomponents, in that order.
    """
    # ensure the subcomponents are put into the same simulation mode as the BEAM.
    for sc in (
        self._smrb_subcomponent,
        self._recv_subcomponent,
        self._dsp_subcomponent,
        self._stat_subcomponent,
    ):
        sc.simulation_mode = self.simulation_mode
@override
def _handle_communication_state_change(
    self: PstBeamComponentManager, communication_state: CommunicationStatus
) -> None:
    """
    Handle a change in the communication state.

    For ``NOT_ESTABLISHED`` the subcomponents are connected, the communication state is
    moved through to ``ESTABLISHED``, the power state is set to ``OFF`` and the
    processing of subcomponent events is started. For ``DISABLED`` the subcomponents are
    disconnected, the power state is set to ``UNKNOWN`` and event processing is told to
    stop. Any other state is only logged.

    :param communication_state: the communication state being changed to.
    :type communication_state: CommunicationStatus
    """

    def _start_communicating() -> None:
        for sc in self._subcomponents:
            sc.connect()

        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        self._update_component_state(fault=None, power=PowerState.OFF)
        self._handle_subcomponent_event_messages()
        # force an update. This should result in health state of OK
        self.handle_health_state_change()

    def _stop_communicating() -> None:
        for sc in self._subcomponents:
            sc.disconnect()

        self._update_component_state(fault=None, power=PowerState.UNKNOWN)
        self._update_communication_state(CommunicationStatus.DISABLED)
        self._process_events = False
        # force an update. This should result in health state of UNKNOWN
        self.handle_health_state_change()

    self.logger.info(f"communication state changes to {communication_state}")
    if communication_state == CommunicationStatus.NOT_ESTABLISHED:
        _start_communicating()
    elif communication_state == CommunicationStatus.DISABLED:
        _stop_communicating()
def _submit_remote_job(
    self: PstBeamComponentManager,
    job: Task,
    task_callback: Callback,
    completion_callback: Callback,
) -> TaskResponse:
    """
    Submit a job to be run by the PST task executor.

    The job is wrapped in a ``_RemoteJob``, which is then submitted via ``submit_task``.

    :param job: the job to run.
    :type job: Task
    :param task_callback: callback to be called when the status of the command changes.
    :type task_callback: Callback
    :param completion_callback: callback that the remote job calls, with the task
        callback, once the job has completed.
    :type completion_callback: Callback
    :return: the response of ``submit_task``.
    :rtype: TaskResponse
    """
    return self.submit_task(
        _RemoteJob(
            job,
            task_executor=self._pst_task_executor,
            completion_callback=completion_callback,
            exception_handler=self._handle_remote_job_exception,
            logger=self.logger,
        ),
        task_callback=task_callback,
    )
def _handle_remote_job_exception(self: PstBeamComponentManager, _exception: Exception) -> None:
    """
    Handle an exception raised while submitting a remote job.

    The component state is updated with ``obsfault=True`` and the health state of the
    device is set to ``HealthState.FAILED``.

    :param _exception: the exception that was raised; it is not inspected.
    :type _exception: Exception
    """
    # flag the observing fault first, then report the failed health state
    self._update_component_state(obsfault=True)
    self._device_interface.update_health_state(health_state=HealthState.FAILED)
def _set_scan_configuration(
    self: PstBeamComponentManager,
    configuration: dict,
    config_id: str,
    pst_processing_mode: PstProcessingMode,
    **kwargs: Any,
) -> None:
    """
    Set properties based off the scan configuration.

    :param configuration: the scan configuration to keep as the current configuration.
    :type configuration: dict
    :param config_id: the configuration id of the scan configuration.
    :type config_id: str
    :param pst_processing_mode: the processing mode of the scan configuration.
    :type pst_processing_mode: PstProcessingMode
    :param kwargs: ignored; this allows a whole PST configuration to be passed by keyword.
    """
    self.config_id = config_id
    self._curr_scan_config = configuration
    # done through the property so that a change of mode is published
    self.processing_mode = pst_processing_mode
@override
def configure_scan(
    self: PstBeamComponentManager, task_callback: Callback = None, **kwargs: Any
) -> TaskResponse:
    """
    Configure scan for the component.

    The kwargs of this method are the scan configuration; the super class method has the
    same signature. The configuration is converted to a PST configuration before the job
    is submitted. If the BEAM is not idle the job deconfigures the subcomponents before
    applying the new configuration.

    :param task_callback: callback to be called when the status of the command changes
    :param kwargs: the scan configuration to use.
    :return: the response of submitting the job.
    :rtype: TaskResponse
    """
    self.logger.info("'ConfigureScan' command requested")

    csp_configuration = kwargs
    pst_configuration = convert_csp_config_to_pst_config(
        telescope_config=self.telescope_config,
        csp_configure_scan_request=csp_configuration,
    )

    def _completion_callback(task_callback: Callable) -> None:
        self._set_scan_configuration(configuration=csp_configuration, **pst_configuration)
        dsp_scan_request = generate_dsp_scan_request(**pst_configuration)
        self.expected_data_record_rate = dsp_scan_request["bytes_per_second"] * GIGABITS_PER_BYTE
        self._device_interface.update_obs_mode(ObsMode.PULSAR_TIMING)
        self._update_component_state(configured=True)
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(
            f"'ConfigureScan' command completed successfully. ObsState={self.obs_state.name}"
        )

    def _configure_beam(sc: PstSubcomponentManager) -> None:
        sc.configure_beam(configuration=pst_configuration)

    def _configure_scan(sc: PstSubcomponentManager) -> None:
        sc.configure_scan(configuration=pst_configuration)

    # an existing configuration has to be removed before the new one is applied
    deconfigure_tasks: List[Task] = (
        [] if self.is_idle else self._deconfigure_tasks(cmd="configure_scan")
    )

    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'ConfigureScan' command running")),
        _assert_obs_state_task(
            subcomponents=self._subcomponents,
            allowed_states=[ObsState.EMPTY, ObsState.READY],
            cmd="configure_scan",
        ),
    ]
    subtasks += deconfigure_tasks
    subtasks += [
        # configure beam on SMRB first and then on the other subcomponents
        _subcomponent_task(
            subcomponents=[self._smrb_subcomponent],
            action_name="configure_beam",
            action=_configure_beam,
        ),
        _subcomponent_task(
            subcomponents=[
                self._dsp_subcomponent,
                self._recv_subcomponent,
                self._stat_subcomponent,
            ],
            action_name="configure_beam",
            action=_configure_beam,
        ),
        # configure scan on DSP only after the other subcomponents
        _subcomponent_task(
            subcomponents=[
                self._smrb_subcomponent,
                self._recv_subcomponent,
                self._stat_subcomponent,
            ],
            action_name="configure_scan",
            action=_configure_scan,
        ),
        _subcomponent_task(
            subcomponents=[self._dsp_subcomponent],
            action_name="configure_scan",
            action=_configure_scan,
        ),
    ]

    return self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )
def _reset_state(self: PstBeamComponentManager, reset_health: bool = True) -> None:
    """
    Reset state of the BEAM.

    This method is used by the ``deconfigure_scan``, ``obsreset``
    and ``reset`` methods to force the BEAM device back into the correct state.

    This method performs the following actions:

        * sets the current scan id to zero to signal that there is no current scan
        * sets the current scan configuration as empty
        * sets the configuration id as an empty string
        * sets the PST processing mode to being ``PstProcessingMode.IDLE``
        * attempts to reset the component state nicely to no observing fault, not scanning
          and not configured. A failure of any of these updates is logged and otherwise
          ignored.
        * resets all the monitoring data, this is because the system is no longer in a
          monitoring state
        * resets the health state if the ``reset_health`` parameter was set to True
        * sets the observation mode to being ``ObsMode.IDLE``

    :param reset_health: whether to reset the health state of the device, defaults to True
    :type reset_health: bool, optional
    """
    self.scan_id = 0
    self._curr_scan_config = None
    self.config_id = ""
    self.processing_mode = PstProcessingMode.IDLE

    # Each update is attempted on its own so that a failure of one of them doesn't stop
    # the others. This is best effort, but the failure is logged rather than silently
    # dropped so that an unexpected problem can still be diagnosed.
    for state_update in ({"obsfault": False}, {"scanning": False}, {"configured": False}):
        try:
            self._update_component_state(**state_update)
        except Exception:
            self.logger.debug(
                "Ignoring error while resetting component state with %s", state_update, exc_info=True
            )

    self._reset_monitoring_properties()
    if reset_health:
        self._device_interface.update_health_state(health_state=HealthState.OK)
    self._device_interface.update_obs_mode(ObsMode.IDLE)
@override
def deconfigure_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
    """
    Deconfigure scan for this component.

    The job deconfigures the subcomponents and, once completed, the state of the BEAM is
    reset without resetting the health state.

    :param task_callback: callback to be called when the status of the command changes
    :return: the response of submitting the job.
    :rtype: TaskResponse
    """
    # note that the Tango command is GoToIdle
    self.logger.info("'GoToIdle' command requested")

    def _completion_callback(task_callback: Callable) -> None:
        self._reset_state(reset_health=False)
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'GoToIdle' command completed successfully. ObsState={self.obs_state.name}")

    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'GoToIdle' command running")),
    ]
    subtasks.extend(self._deconfigure_tasks(cmd="deconfigure_scan"))

    return self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )
def _deconfigure_tasks(self: PstBeamComponentManager, cmd: str) -> List[Task]:
    """
    Create the tasks needed to deconfigure all the subcomponents.

    The tasks, in order, are: assert all subcomponents are ``READY``, deconfigure the scan
    of all subcomponents, deconfigure the beam of DSP, RECV and STAT and lastly
    deconfigure the beam of SMRB.

    :param cmd: the name of the command requesting the tasks, used in the state assertion.
    :type cmd: str
    :return: the list of tasks.
    :rtype: List[Task]
    """

    def _deconfigure_scan(sc: PstSubcomponentManager) -> None:
        sc.deconfigure_scan()

    def _deconfigure_beam(sc: PstSubcomponentManager) -> None:
        sc.deconfigure_beam()

    assert_ready = _assert_obs_state_task(
        subcomponents=self._subcomponents,
        allowed_states=ObsState.READY,
        cmd=cmd,
    )
    # need to deconfigure scan of all processes, this can be done in parallel.
    deconfigure_scan_all = _subcomponent_task(
        subcomponents=self._subcomponents,
        action_name="deconfigure_scan",
        action=_deconfigure_scan,
    )
    # need to release the ring buffer clients before deconfiguring SMRB
    deconfigure_beam_clients = _subcomponent_task(
        subcomponents=[
            self._dsp_subcomponent,
            self._recv_subcomponent,
            self._stat_subcomponent,
        ],
        action_name="deconfigure_beam",
        action=_deconfigure_beam,
    )
    deconfigure_beam_smrb = _subcomponent_task(
        subcomponents=[self._smrb_subcomponent],
        action_name="deconfigure_beam",
        action=_deconfigure_beam,
    )
    return [assert_ready, deconfigure_scan_all, deconfigure_beam_clients, deconfigure_beam_smrb]
@override
def scan(
    self: PstBeamComponentManager,
    task_callback: Callback = None,
    **kwargs: Any,
) -> TaskResponse:
    """
    Start scanning.

    The kwargs of this method are the scan request, which must include the ``scan_id``.
    The super class method has the same signature. Using kwargs allows for forward
    compatibility of accepting other parameters for the starting of the scan.

    The job asserts that all subcomponents are ``READY``, writes the scan configuration
    to the output directory and then starts the scan on RECV and SMRB, then DSP and then
    STAT.

    :param task_callback: callback for background processing to update device status.
    :type task_callback: Callback
    :param kwargs: scan request as a dict
    :type kwargs: dict
    :return: the response of submitting the job.
    :rtype: TaskResponse
    """
    self.logger.info("'Scan' command requested")
    scan_id = int(kwargs["scan_id"])

    def _completion_callback(task_callback: Callable) -> None:
        self._update_component_state(scanning=True)
        self.scan_id = scan_id
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'Scan' command completed successfully. ObsState={self.obs_state.name}")

    def _scan(sc: PstSubcomponentManager) -> None:
        sc.scan(**kwargs)

    def _scan_task(subcomponents: List[PstSubcomponentManager]) -> Task:
        return _subcomponent_task(subcomponents=subcomponents, action_name="scan", action=_scan)

    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'Scan' command running")),
        _assert_obs_state_task(
            subcomponents=self._subcomponents,
            allowed_states=ObsState.READY,
            cmd="scan",
        ),
        LambdaTask(
            action=lambda: self._write_scan_config_to_output_dir(scan_id),
            name="write_scan_config_to_output_dir",
        ),
        _scan_task([self._recv_subcomponent, self._smrb_subcomponent]),
        _scan_task([self._dsp_subcomponent]),
        _scan_task([self._stat_subcomponent]),
    ]

    return self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )
def _write_scan_config_to_output_dir(self: PstBeamComponentManager, scan_id: int) -> None:
    """
    Write the current scan configuration out as JSON.

    The output directory comes from the device interface's ``scan_output_dir_pattern``
    with the ``<eb_id>``, ``<subsystem_id>`` and ``<scan_id>`` placeholders replaced. The
    directory is created if needed. An error while creating the directory or writing the
    file is logged and then re-raised.

    :param scan_id: the id of the scan that the configuration is written for.
    :type scan_id: int
    """
    self.logger.debug(f"Writing scan configuration for scan {scan_id}")

    # the values of the placeholders used in the output directory pattern
    placeholders = {
        "eb_id": self._curr_scan_config["common"]["eb_id"],  # type: ignore
        "subsystem_id": self._device_interface.subsystem_id,
        "scan_id": str(scan_id),
    }

    output_dir_str = self._device_interface.scan_output_dir_pattern
    for placeholder, replacement in placeholders.items():
        output_dir_str = output_dir_str.replace(f"<{placeholder}>", replacement)

    try:
        output_dir = pathlib.Path(output_dir_str)
        output_dir.mkdir(parents=True, exist_ok=True)

        scan_configuration_path = output_dir / SCAN_CONFIG_FILE_NAME
        self.logger.info(f"Writing scan configuration for scan {scan_id} to {scan_configuration_path}")
        with open(scan_configuration_path, "w") as config_file:
            json.dump(self._curr_scan_config, config_file)
    except Exception:
        self.logger.exception("Error in writing output file.", exc_info=True)
        raise
@override
def end_scan(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
    """
    Stop scanning.

    The job asserts that all subcomponents are ``SCANNING`` and then ends the scan on
    RECV, then DSP and STAT, and lastly SMRB.

    :param task_callback: callback to be called when the status of the command changes
    :return: the response of submitting the job.
    :rtype: TaskResponse
    """
    self.logger.info("'EndScan' command requested")

    def _completion_callback(task_callback: Callable) -> None:
        self._update_component_state(scanning=False)
        self.scan_id = 0
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'EndScan' command completed successfully. ObsState={self.obs_state.name}")

    def _end_scan(sc: PstSubcomponentManager) -> None:
        sc.end_scan()

    def _end_scan_task(subcomponents: List[PstSubcomponentManager]) -> Task:
        return _subcomponent_task(subcomponents=subcomponents, action_name="end_scan", action=_end_scan)

    # need to stop_scan on RECV before DSP and STAT, then SMRB
    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'EndScan' command running")),
        _assert_obs_state_task(
            subcomponents=self._subcomponents,
            allowed_states=ObsState.SCANNING,
            cmd="end_scan",
        ),
        _end_scan_task([self._recv_subcomponent]),
        _end_scan_task([self._dsp_subcomponent, self._stat_subcomponent]),
        _end_scan_task([self._smrb_subcomponent]),
    ]

    return self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )
def _abort_task(self: PstBeamComponentManager, subcomponents: List[PstSubcomponentManager]) -> Task:
    """
    Create a task to abort the given subcomponents.

    Subcomponents whose remote observing state is already ``ABORTED``, ``FAULT`` or
    ``EMPTY`` when this method is called are left out of the task.

    :param subcomponents: the subcomponents to consider aborting.
    :type subcomponents: List[PstSubcomponentManager]
    :return: a task that aborts the subcomponents that need it.
    :rtype: Task
    """
    states_to_skip = (ObsState.ABORTED, ObsState.FAULT, ObsState.EMPTY)

    def _abort(sc: PstSubcomponentManager) -> None:
        sc.abort()

    subcomponents_to_abort: List[PstSubcomponentManager] = []
    for sc in subcomponents:
        if sc.remote_obs_state not in states_to_skip:
            subcomponents_to_abort.append(sc)

    return _subcomponent_task(subcomponents=subcomponents_to_abort, action_name="abort", action=_abort)
@override
def abort(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
    """
    Tell the component to abort whatever it was doing.

    The job aborts RECV, then DSP and STAT, and lastly SMRB. The response of submitting
    the job is not used; this method always returns ``IN_PROGRESS``.

    :param task_callback: callback to be called when the status of the command changes
    :return: a tuple of ``TaskStatus.IN_PROGRESS`` and the message ``"Aborting"``.
    :rtype: TaskResponse
    """
    self.logger.info("'Abort' command requested")

    def _completion_callback(task_callback: Callable) -> None:
        self._update_component_state(scanning=False)
        self.abort_commands()
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'Abort' command completed successfully. ObsState={self.obs_state.name}")

    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'Abort' command running")),
        self._abort_task([self._recv_subcomponent]),
        self._abort_task([self._dsp_subcomponent, self._stat_subcomponent]),
        self._abort_task([self._smrb_subcomponent]),
    ]
    self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )

    return TaskStatus.IN_PROGRESS, "Aborting"
def _obsreset_task(self: PstBeamComponentManager, subcomponents: List[PstSubcomponentManager]) -> Task:
    """
    Create a task to perform an ObsReset on the given subcomponents.

    When the task runs, a subcomponent is only reset if its remote observing state is
    ``ABORTED`` or ``FAULT`` at that time.

    :param subcomponents: the subcomponents to reset.
    :type subcomponents: List[PstSubcomponentManager]
    :return: a task that resets the subcomponents.
    :rtype: Task
    """
    resettable_states = (ObsState.ABORTED, ObsState.FAULT)

    def _perform_obsreset(sc: PstSubcomponentManager) -> None:
        # need to use the remote state to ensure that the server's precondition check doesn't fail.
        if sc.remote_obs_state not in resettable_states:
            return
        sc.obsreset()

    return _subcomponent_task(
        subcomponents=subcomponents, action_name="obsreset", action=_perform_obsreset
    )
@override
def obsreset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
    """
    Reset the component and put it into an IDLE state.

    The job first aborts the subcomponents that need it (RECV, then DSP and STAT, then
    SMRB) and then performs an ObsReset on STAT, DSP and RECV followed by SMRB. Once the
    job completes the state of the BEAM, including the health state, is reset.

    :param task_callback: callback to be called when the status of the command changes
    :return: the response of submitting the job.
    :rtype: TaskResponse
    """
    self.logger.info("'ObsReset' command requested")

    def _completion_callback(task_callback: Callable) -> None:
        self._reset_state()
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'ObsReset' command completed successfully. ObsState={self.obs_state.name}")

    # the abort tasks are created first, in the same order as they are run
    abort_subtasks: List[Task] = [
        self._abort_task([self._recv_subcomponent]),
        self._abort_task([self._dsp_subcomponent, self._stat_subcomponent]),
        self._abort_task([self._smrb_subcomponent]),
    ]
    # SMRB is reset after the other subcomponents
    obsreset_subtasks: List[Task] = [
        self._obsreset_task(
            [
                self._stat_subcomponent,
                self._dsp_subcomponent,
                self._recv_subcomponent,
            ]
        ),
        self._obsreset_task([self._smrb_subcomponent]),
    ]

    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'ObsReset' command running")),
        *abort_subtasks,
        *obsreset_subtasks,
    ]

    return self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )
@override
def reset(self: PstBeamComponentManager, task_callback: Callback = None) -> TaskResponse:
    """
    Reset the component and put it into an IDLE state.

    The job resets STAT, DSP and RECV, then SMRB, and then connects all the
    subcomponents. Once the job completes the state of the BEAM, including the health
    state, is reset.

    :param task_callback: callback to be called when the status of the command changes
    :return: the response of submitting the job.
    :rtype: TaskResponse
    """
    self.logger.info("'Reset' command requested")

    def _completion_callback(task_callback: Callable) -> None:
        self._reset_state()
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'Reset' command completed successfully. ObsState={self.obs_state.name}")

    def _reset(sc: PstSubcomponentManager) -> None:
        sc.reset()

    def _connect(sc: PstSubcomponentManager) -> None:
        sc.connect()

    # call reset on all subcomponents SMRB has to be done last but the others can be done in parallel
    subcomponents_to_reset: List[PstSubcomponentManager] = [
        self._stat_subcomponent,
        self._dsp_subcomponent,
        self._recv_subcomponent,
    ]

    subtasks: List[Task] = [
        LambdaTask(action=lambda: self.logger.debug("'Reset' command running")),
        _subcomponent_task(subcomponents=subcomponents_to_reset, action_name="reset", action=_reset),
        _subcomponent_task(subcomponents=[self._smrb_subcomponent], action_name="reset", action=_reset),
        _subcomponent_task(subcomponents=self._subcomponents, action_name="connect", action=_connect),
    ]

    return self._submit_remote_job(
        job=SequentialTask(subtasks=subtasks),
        task_callback=task_callback,
        completion_callback=_completion_callback,
    )
@override
def go_to_fault(
    self: PstBeamComponentManager, fault_msg: str, task_callback: Callback = None
) -> TaskResponse:
    """Put all the sub-devices into a FAULT state.

    The submitted job calls ``go_to_fault`` on RECV first, then on DSP and STAT
    (a single task covering both), and finally on SMRB. When the job completes the
    component state is updated with ``obsfault=True`` and the device interface is told
    to handle the fault.

    :param fault_msg: the fault message passed to each subcomponent and to the device interface
    :param task_callback: callback to be called when the status of the command changes
    :return: the response from submitting the remote job
    """
    self.logger.info("'GoToFault' command requested")

    def _on_job_complete(task_callback: Callable) -> None:
        self._update_component_state(obsfault=True)
        self._device_interface.handle_fault(fault_msg=fault_msg)
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Completed successfully"))
        self.logger.info(f"'GoToFault' command completed successfully. ObsState={self.obs_state.name}")

    def _fault_subcomponent(sc: PstSubcomponentManager) -> Any:
        return sc.go_to_fault(fault_msg=fault_msg)

    # Same ordering as the other teardown commands: RECV, then DSP/STAT, then SMRB.
    fault_groups = (
        [self._recv_subcomponent],
        [self._dsp_subcomponent, self._stat_subcomponent],
        [self._smrb_subcomponent],
    )

    steps: List[Task] = [LambdaTask(action=lambda: self.logger.debug("'GoToFault' command running"))]
    for group in fault_groups:
        steps.append(
            _subcomponent_task(
                subcomponents=group,
                action_name="go_to_fault",
                action=_fault_subcomponent,
            )
        )

    return self._submit_remote_job(
        job=SequentialTask(subtasks=steps),
        task_callback=task_callback,
        completion_callback=_on_job_complete,
    )
@override
def set_logging_level(self: PstBeamComponentManager, log_level: LoggingLevel) -> None:
    """
    Forward the requested logging level to every subcomponent.

    :param log_level: The required TANGO LoggingLevel
    :returns: None.
    """
    for scm in self._subcomponents:
        scm.set_logging_level(log_level=log_level)
@property
def monitoring_polling_rate_ms(self: PstBeamComponentManager) -> int:
    """Return the monitoring polling rate currently stored on this manager, in milliseconds."""
    current_rate_ms = self._monitoring_polling_rate_ms
    return current_rate_ms


@monitoring_polling_rate_ms.setter
def monitoring_polling_rate_ms(self: PstBeamComponentManager, monitoring_polling_rate_ms: int) -> None:
    """Apply the monitoring polling rate to every subcomponent and then store it locally."""
    # Subcomponents are updated before the local value is stored.
    for scm in self._subcomponents:
        scm.monitoring_polling_rate_ms = monitoring_polling_rate_ms

    self._monitoring_polling_rate_ms = monitoring_polling_rate_ms
@property
def health_check_interval(self: PstBeamComponentManager) -> int:
    """
    Return the health check interval stored on this manager.

    :return: the current health check interval, in milliseconds.
    :rtype: int
    """
    current_interval = self._health_check_interval
    return current_interval


@health_check_interval.setter
def health_check_interval(self: PstBeamComponentManager, health_check_interval: int) -> None:
    """
    Update the health check interval, in milliseconds.

    When the value differs from the stored one, it is stored, pushed to every
    subcomponent, and each subcomponent's health check is restarted. Setting the
    value that is already stored does nothing.

    :param health_check_interval: the updated health check interval, in milliseconds.
    :type health_check_interval: int
    """
    if self._health_check_interval == health_check_interval:
        # unchanged value: leave the running health checks alone
        return

    self._health_check_interval = health_check_interval
    for subcomponent in self._subcomponents:
        subcomponent.health_check_interval = health_check_interval
        subcomponent.restart_health_check()
@property
def recv_health_state(self: PstBeamComponentManager) -> HealthState:
    """
    Return the health state reported by the RECV subcomponent.

    :return: the current health state for the RECV subcomponent.
    :rtype: HealthState
    """
    recv = self._recv_subcomponent
    return recv.health_state
@property
def recv_obs_state(self: PstBeamComponentManager) -> ObsState:
    """
    Return the observing state reported by the RECV subcomponent.

    :return: the current observing state for the RECV subcomponent.
    :rtype: ObsState
    """
    recv = self._recv_subcomponent
    return recv.obs_state
@property
def smrb_health_state(self: PstBeamComponentManager) -> HealthState:
    """
    Return the health state reported by the SMRB subcomponent.

    :return: the current health state for the SMRB subcomponent.
    :rtype: HealthState
    """
    smrb = self._smrb_subcomponent
    return smrb.health_state
@property
def smrb_obs_state(self: PstBeamComponentManager) -> ObsState:
    """
    Return the observing state reported by the SMRB subcomponent.

    :return: the current observing state for the SMRB subcomponent.
    :rtype: ObsState
    """
    smrb = self._smrb_subcomponent
    return smrb.obs_state
@property
def stat_health_state(self: PstBeamComponentManager) -> HealthState:
    """
    Return the health state reported by the STAT subcomponent.

    :return: the current health state for the STAT subcomponent.
    :rtype: HealthState
    """
    stat = self._stat_subcomponent
    return stat.health_state
@property
def stat_obs_state(self: PstBeamComponentManager) -> ObsState:
    """
    Return the observing state reported by the STAT subcomponent.

    :return: the current observing state for the STAT subcomponent.
    :rtype: ObsState
    """
    stat = self._stat_subcomponent
    return stat.obs_state
@property
def dsp_health_state(self: PstBeamComponentManager) -> HealthState:
    """
    Return the health state reported by the DSP subcomponent.

    :return: the current health state for the DSP subcomponent.
    :rtype: HealthState
    """
    dsp = self._dsp_subcomponent
    return dsp.health_state
@property
def dsp_obs_state(self: PstBeamComponentManager) -> ObsState:
    """
    Return the observing state reported by the DSP subcomponent.

    :return: the current observing state for the DSP subcomponent.
    :rtype: ObsState
    """
    dsp = self._dsp_subcomponent
    return dsp.obs_state
@property
def _subcomponent_health_states(self: PstBeamComponentManager) -> Dict[str, HealthState]:
    """Return the health state of each subcomponent, keyed by RECV, SMRB, STAT and DSP."""
    # Read in the same order as the keys of the returned mapping.
    recv_state = self.recv_health_state
    smrb_state = self.smrb_health_state
    stat_state = self.stat_health_state
    dsp_state = self.dsp_health_state

    return {
        "RECV": recv_state,
        "SMRB": smrb_state,
        "STAT": stat_state,
        "DSP": dsp_state,
    }
def handle_health_state_change(self: PstBeamComponentManager) -> None:
    """
    Handle a change in one of the subcomponent's health state.

    This method aggregates the overall health state of the PST BEAM from the health
    states of the subcomponents and reports it via the device interface. The rules
    are applied in the following order of precedence, independent of the order in
    which the subcomponents are inspected:

    * if any subcomponent has a state of HealthState.FAILED, then the beam's state is
      HealthState.FAILED
    * if multiple subcomponents have a state of HealthState.DEGRADED, then the beam's
      state is HealthState.FAILED
    * if any subcomponent has a state of HealthState.UNKNOWN, then the beam's state is
      HealthState.UNKNOWN
    * if exactly one subcomponent has a state of HealthState.DEGRADED, then the beam's
      state is HealthState.DEGRADED
    * otherwise the beam's state is HealthState.OK
    """
    failed: List[str] = []
    degraded: List[str] = []
    unknown_seen = False

    # Classify every subcomponent first so the outcome cannot depend on iteration order.
    for scm, scm_health_state in self._subcomponent_health_states.items():
        if scm_health_state == HealthState.FAILED:
            failed.append(scm)
        elif scm_health_state == HealthState.DEGRADED:
            self.logger.debug(f"{scm} health state is {scm_health_state}")
            degraded.append(scm)
        elif scm_health_state == HealthState.UNKNOWN:
            unknown_seen = True

    if failed:
        # report the first failed subcomponent
        self.logger.warning(
            f"{failed[0]} health state is in FAILED state. Putting {self.beam_id} into FAILED state",
            exc_info=False,
        )
        out_state = HealthState.FAILED
    elif len(degraded) > 1:
        self.logger.warning(
            f"multiple subcomponents are in degraded state. Putting "
            f"{self.beam_id} in FAILED state",
            exc_info=False,
        )
        out_state = HealthState.FAILED
    elif unknown_seen:
        # NOTE(review): UNKNOWN is ranked above a single DEGRADED, following the documented
        # "any subcomponent UNKNOWN" rule - confirm this is the intended precedence.
        out_state = HealthState.UNKNOWN
    elif degraded:
        out_state = HealthState.DEGRADED
    else:
        out_state = HealthState.OK

    self._device_interface.update_health_state(health_state=out_state)
def __getattr__(self: PstBeamComponentManager, name: str) -> Any:
    """
    Get attribute of component manager.

    This is a Python dunder method that is used to get attributes/properties
    that have not been found by the ``__getattribute__`` method.

    This allows delegating getting the attribute from sub-component managers without
    needing to have code specific to delegate getting the property. The subcomponents
    are tried in order and the first one that provides the attribute wins.

    :param name: the name of the attribute
    :type name: str
    :return: the attribute value
    :rtype: Any
    :raises: AttributeError if attribute cannot be found on sub-component
    """
    # This method itself reads ``self._subcomponents`` and ``self.logger``. If one of those
    # is the attribute that could not be found (e.g. before it has been assigned, or while
    # the object is being copied/unpickled), looking it up again would re-enter this method
    # until a RecursionError is raised, so fail immediately instead.
    if name in ("_subcomponents", "logger"):
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}", name=name)

    for sc in self._subcomponents:
        try:
            return getattr(sc, name)
        except AttributeError:
            # this subcomponent does not provide the attribute, try the next one
            continue

    self.logger.error(f"Could not find attribute with {name=}")
    raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}", name=name)