# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module a class that abstracts DSP handling over the various sub-components."""
from __future__ import annotations
import logging
from dataclasses import fields
from typing import Any, Callable, List, Set, cast
from overrides import EnforceOverrides, override
from ska_control_model import HealthState, LoggingLevel, ObsState, PstProcessingMode
from ska_pst.lmc.component import PstProcessApiSubcomponentManager, PstSubcomponentManager
from .dsp_disk_component_manager import PstDspDiskComponentManager
from .dsp_disk_model import DspDiskMonitorData
from .dsp_ft_component_manager import PstDspFlowThroughComponentManager
from .dsp_pipeline_model import DspPipelineMonitorData
[docs]class PstDspComponentManager(PstSubcomponentManager, EnforceOverrides):
"""
A class to handle the orchestration of requests to various DSP sub-components.
This class ensures that given a configure beam request that all future requests
will go to the appropriate DSP sub-component until such time that a deconfigure
beam request is performed.
"""
def __init__(
self: PstDspComponentManager,
*,
device_name: str,
dsp_disk_process_api_endpoint: str,
dsp_flow_through_process_api_endpoint: str,
dsp_disk_component_manager: PstDspDiskComponentManager | None = None,
dsp_flow_through_component_manager: PstDspFlowThroughComponentManager | None = None,
logger: logging.Logger | None = None,
**kwargs: Any,
) -> None:
"""Initialise the DSP component manager.
:param device_name: the FQDN of the current device. This
is used within the gRPC process to identify who is
doing the calling.
:type device_name: str
:param dsp_disk_process_api_endpoint: the gRPC API endpoint to use
when communicating with DSP.DISK.
:type dsp_disk_process_api_endpoint: str
:param dsp_flow_through_process_api_endpoint: the gRPC API endpoint to use
when communicating with DSP.FT.
:type dsp_flow_through_process_api_endpoint: str
:param logger: a logger for this object to use, default None.
:type logger: logging.Logger | None, optional
"""
logger = logger or logging.getLogger(__name__)
logger.debug(f"Setting up DSP component manager with device_name='{device_name}'")
super().__init__(device_name=device_name, subcomponent_name="dsp", **kwargs)
self._dsp_disk_component_manager = dsp_disk_component_manager or PstDspDiskComponentManager(
device_name=device_name,
logger=logger,
process_api_endpoint=dsp_disk_process_api_endpoint,
**kwargs,
)
self._dsp_flow_through_component_manager = (
dsp_flow_through_component_manager
or PstDspFlowThroughComponentManager(
device_name=device_name,
logger=logger,
process_api_endpoint=dsp_flow_through_process_api_endpoint,
**kwargs,
)
)
self._processing_mode: PstProcessingMode = PstProcessingMode.IDLE
self._dsp_pipeline_field_names: Set[str] = {f.name for f in fields(DspPipelineMonitorData)}
@property
def obs_state(self: PstDspComponentManager) -> ObsState:
"""
Get the current observing state of sub-component.
:return: the current observing state of sub-component.
:rtype: ObsState
"""
return self._obs_state
@obs_state.setter
def obs_state(self: PstDspComponentManager, obs_state: ObsState) -> None:
"""
Set the current observing state of sub-component.
:param obs_state: the current observing state of the sub-component.
:type obs_state: ObsState
"""
self._obs_state = obs_state
@override
def _simulation_mode_changed(self: PstDspComponentManager) -> None:
"""
Handle an update to the simulation mode.
This requires setting the simulation mode on the all the DSP subcomponent managers.
"""
self._dsp_disk_component_manager.simulation_mode = self.simulation_mode
self._dsp_flow_through_component_manager.simulation_mode = self.simulation_mode
@property
def current_dsp_subcomponent(self: PstDspComponentManager) -> PstProcessApiSubcomponentManager:
"""Get the current DSP sub-component base on processing mode.
:raises AssertionError: if current processing mode is not set or not supported.
:return: the currently active DSP subcomponent
:rtype: PstProcessApiSubcomponentManager
"""
assert (
self._processing_mode != PstProcessingMode.IDLE
), "expected that there is a current processing mode set"
if self._processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
return self._dsp_disk_component_manager
elif self._processing_mode == PstProcessingMode.FLOW_THROUGH:
return self._dsp_flow_through_component_manager
raise AssertionError(f"currently no support for {self._processing_mode.name} processing mode")
@override
def validate_configure_scan(self: PstDspComponentManager, configuration: dict) -> None:
"""Validate configure scan request with the specific configuration of the component.
This checks what the operational mode in the configuration is routes the request to the correct.
:param configuration: the configuration to validate.
:type configuration: dict
"""
pst_processing_mode: PstProcessingMode = configuration["pst_processing_mode"]
if pst_processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
self._dsp_disk_component_manager.validate_configure_scan(configuration=configuration)
elif pst_processing_mode == PstProcessingMode.FLOW_THROUGH:
self._dsp_flow_through_component_manager.validate_configure_scan(configuration=configuration)
else:
raise NotImplementedError(f"currently no support for {pst_processing_mode.name} processing mode")
@override
def configure_beam(self: PstDspComponentManager, configuration: dict) -> None:
"""
Configure the beam specific configuration of the component.
:param configuration: configuration for beam
:type configuration: dict
"""
try:
self._processing_mode = configuration["pst_processing_mode"]
self.current_dsp_subcomponent.configure_beam(configuration=configuration)
self.obs_state = ObsState.IDLE
except Exception:
# ensure we reset the current processing mode
self._processing_mode = PstProcessingMode.IDLE
raise
@override
def deconfigure_beam(self: PstDspComponentManager) -> None:
"""
Deconfigure the component's beam configuration.
This will release all the resources associated with the component, including the SMRBs.
"""
self.current_dsp_subcomponent.deconfigure_beam()
self._processing_mode = PstProcessingMode.IDLE
self.obs_state = ObsState.EMPTY
@override
def configure_scan(self: PstDspComponentManager, configuration: dict) -> None:
"""
Configure the component for a scan.
:param configuration: the configuration to be configured
:type configuration: dict
"""
self.current_dsp_subcomponent.configure_scan(configuration=configuration)
self.obs_state = ObsState.READY
@override
def deconfigure_scan(self: PstDspComponentManager) -> None:
"""
Deconfigure this component for current scan configuration.
:param task_callback: callback for background processing to update device status.
:type task_callback: Callback
"""
self.current_dsp_subcomponent.deconfigure_scan()
self.obs_state = ObsState.IDLE
@override
def scan(self: PstDspComponentManager, scan_id: int, **kwargs: Any) -> None:
"""
Start scanning.
The kwargs of this method is scan request. By using the kwargs allow for
forward compatibility of accepting other parameters for the starting of the scan.
:param scan_id: the scan ID
:type scan_id: int
:param kwargs: scan request as a dict
:type kwargs: dict
"""
self.current_dsp_subcomponent.scan(scan_id=scan_id, **kwargs)
self.obs_state = ObsState.SCANNING
@override
def end_scan(self: PstDspComponentManager) -> None:
"""Stop scanning."""
self.current_dsp_subcomponent.end_scan()
self.obs_state = ObsState.READY
@override
def abort(self: PstDspComponentManager) -> None:
"""
Abort current process.
The only long lived process for API based devices is that of SCANNING. However, if another system
fails this can be used to put all the subsystems into an ABORTED state.
The CSP.LMC can call abort before the system is in a configured state, if there
is not processing mode set then apply abort to all sub-components.
"""
if self._processing_mode == PstProcessingMode.IDLE:
self._with_all_subcomponents(lambda cm: cm.abort())
else:
self.current_dsp_subcomponent.abort()
self.obs_state = ObsState.ABORTED
@override
def obsreset(self: PstDspComponentManager) -> None:
"""
Reset service.
This is used to reset a service in ABORTED or FAULT states back to an EMPTY state. This will
deconfigure a scan and beam.
"""
# it's possible that the go_to_fault has put all components into a FAULT state
# so we need to do an obsreset on all.
if self.obs_state == ObsState.FAULT or self._processing_mode == PstProcessingMode.IDLE:
self._with_all_subcomponents(lambda cm: cm.obsreset())
else:
self.current_dsp_subcomponent.obsreset()
self._processing_mode = PstProcessingMode.IDLE
self.obs_state = ObsState.EMPTY
@override
def reset(self: PstDspComponentManager) -> None:
"""
Reset service.
This is used to reset all dsp services regardless of state. This
will set the state to EMPTY
"""
self._with_all_subcomponents(lambda cm: cm.reset())
self._processing_mode = PstProcessingMode.IDLE
self.obs_state = ObsState.EMPTY
@override
def go_to_fault(self: PstDspComponentManager, fault_msg: str) -> None:
"""
Set the component into a FAULT state.
For BEAM this will make the sub-devices be put into a FAULT state. For API backed component managers
it is expected that the service backing that API should be put into a FAULT state.
"""
self._with_all_subcomponents(lambda cm: cm.go_to_fault(fault_msg=fault_msg))
self.obs_state = ObsState.FAULT
def _with_all_subcomponents(
self: PstDspComponentManager, action: Callable[[PstProcessApiSubcomponentManager], None]
) -> None:
"""Perform an action on all the sub-components."""
for cm in cast(
List[PstProcessApiSubcomponentManager],
[self._dsp_disk_component_manager, self._dsp_flow_through_component_manager],
):
action(cm)
@override
def set_logging_level(self: PstDspComponentManager, log_level: LoggingLevel) -> None:
"""
Set the LoggingLevel of the service.
:param log_level: The required TANGO LoggingLevel
:returns: None.
"""
self._with_all_subcomponents(lambda cm: cm.set_logging_level(log_level=log_level))
@override
def reset_monitoring(self: PstDspComponentManager) -> None:
"""Stop monitoring and reset monitoring data."""
self._with_all_subcomponents(lambda cm: cm.reset_monitoring())
@override
def connect(self: PstDspComponentManager) -> None:
"""Establish connection to API components."""
self._with_all_subcomponents(lambda cm: cm.connect())
@override
def disconnect(self: PstDspComponentManager) -> None:
"""Establish disconnect from API components."""
self._with_all_subcomponents(lambda cm: cm.disconnect())
@override
def start_health_check(self: PstDspComponentManager) -> None:
"""Start background health check processing on DSP subcomponents."""
self._with_all_subcomponents(lambda cm: cm.start_health_check())
@override
def stop_health_check(self: PstDspComponentManager) -> None:
"""Stop background health check on DSP subcomponents."""
self._with_all_subcomponents(lambda cm: cm.stop_health_check())
@property
def dsp_disk_monitor_data(self: PstDspComponentManager) -> DspDiskMonitorData:
"""Get monitoring data from DSP.DISK."""
return self._dsp_disk_component_manager.monitor_data
@property
def dsp_flow_through_monitor_data(self: PstDspComponentManager) -> DspPipelineMonitorData:
"""Get monitoring data from DSP.FT."""
return self._dsp_flow_through_component_manager.monitor_data
@property
def dsp_disk_health_state(self: PstDspComponentManager) -> HealthState:
"""
Get the current health state for the DSP.DISK (voltage recorder) subcomponent.
:return: the current health state for the DSP.DISK subcomponent.
:rtype: HealthState
"""
return self._dsp_disk_component_manager.health_state
@property
def dsp_disk_obs_state(self: PstDspComponentManager) -> ObsState:
"""
Get the current observing state for the DSP.DISK (voltage recorder) subcomponent.
:return: the current observing state for the DSP.DISK subcomponent.
:rtype: HealthState
"""
return self._dsp_disk_component_manager.obs_state
@property
def dsp_flow_through_health_state(self: PstDspComponentManager) -> HealthState:
"""
Get the current health state for the DSP.FT (flow through) subcomponent.
:return: the current health state for the DSP.FT subcomponent.
:rtype: HealthState
"""
return self._dsp_flow_through_component_manager.health_state
@property
def dsp_flow_through_obs_state(self: PstDspComponentManager) -> ObsState:
"""
Get the current observing state for the DSP.FT (flow through) subcomponent.
:return: the current observing state for the DSP.FT subcomponent.
:rtype: HealthState
"""
return self._dsp_flow_through_component_manager.obs_state
def __getattr__(self: PstDspComponentManager, name: str) -> Any:
"""
Get attribute of component manager.
This is a Python dunder method that is used to get attributes/properties
that have not been found by the ``__getattribute__`` method.
This allows delegating getting the attribute from sub-component managers without
needing to have code specific to delegate getting the property.
:param name: the name of the attribute
:type name: str
:return: the attribute value
:rtype: Any
:raises: AttributeError if attribute cannot be found on sub-component
"""
if name.startswith("dsp_disk_"):
name = name[9:]
return getattr(self._dsp_disk_component_manager, name)
if name.startswith("dsp_flow_through_"):
name = name[17:]
return getattr(self._dsp_flow_through_component_manager, name)
if name in self._dsp_pipeline_field_names:
if self._processing_mode == PstProcessingMode.IDLE:
# all the default values are zero
return 0
else:
return getattr(self.current_dsp_subcomponent, name)
raise AttributeError(name=name)