# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides the Sub Component Manager for PST.LMC."""
from __future__ import annotations
# Public API of this module: only the sub-component manager base class is exported.
__all__ = ["PstSubcomponentManager"]
import logging
import queue
from typing import Any
from overrides import EnforceOverrides
from ska_control_model import HealthState, LoggingLevel, ObsState, SimulationMode
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from .subcomponent_event import SubcomponentEventMessage
class PstSubcomponentManager(EnforceOverrides):
    """
    An abstract base sub component manager.

    This class is used as a base class for all the PST subcomponents that are managed
    by the PST BEAM.MGMT TANGO device. For specific components such as RECV.CORE,
    SMRB.CORE, DSP.CORE, etc, the sub-component managers should extend from
    the :py:class:`PstProcessApiSubcomponentManager` which uses a process API to
    manage the core application.

    In the case of the DSP sub-components an umbrella DSP subcomponent manager
    should be used to orchestrate the requests going to a specific sub-component
    depending on the scan configuration and the processing mode.

    State reporting works through the ``event_queue`` passed to the constructor:
    a :py:class:`SubcomponentEventMessage` carrying the current health and observing
    state is put on the queue at construction time and on every assignment to
    :py:attr:`obs_state` or :py:attr:`health_state`.

    Most command methods raise :py:class:`NotImplementedError` in this base class and
    have to be provided by a concrete subclass.
    """

    _simulation_mode: SimulationMode

    def __init__(
        self: PstSubcomponentManager,
        *,
        beam_id: int,
        device_name: str,
        subcomponent_name: str,
        event_queue: queue.Queue[SubcomponentEventMessage],
        simulation_mode: SimulationMode = SimulationMode.TRUE,
        monitoring_polling_rate_ms: int = DEFAULT_MONITORING_INTERVAL_MS,
        health_check_interval: int = DEFAULT_HEALTH_CHECK_INTERVAL_MS,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise the base instance of the subcomponent manager.

        :param beam_id: the ID for the BEAM instance.
        :type beam_id: int
        :param device_name: the FQDN of the current device. This
            is used within the gRPC process to identify who is
            doing the calling.
        :type device_name: str
        :param subcomponent_name: the name for the subcomponent, this
            is used to build up the client ID passed to gRPC API instances.
        :type subcomponent_name: str
        :param event_queue: the queue to send state events back to.
        :type event_queue: queue.Queue[SubcomponentEventMessage]
        :param simulation_mode: the initial simulation mode of the subcomponent,
            defaults to SimulationMode.TRUE
        :type simulation_mode: SimulationMode, optional
        :param monitoring_polling_rate_ms: the monitoring polling interval, in milliseconds,
            defaults to ``DEFAULT_MONITORING_INTERVAL_MS``
        :type monitoring_polling_rate_ms: int, optional
        :param health_check_interval: the polling interval, in milliseconds, used to perform
            a health check of the subcomponent, defaults to ``DEFAULT_HEALTH_CHECK_INTERVAL_MS``
        :type health_check_interval: int, optional
        :param logger: the logger to use for logging. When None, a logger named after
            this module is used.
        :type logger: logging.Logger | None, optional
        :param kwargs: additional keyword arguments. They are accepted but not used
            by this base class.
        :type kwargs: Any
        """
        self._beam_id = beam_id
        self._device_name = device_name
        self.subcomponent_name = subcomponent_name
        self.subcomponent_id = f"{self._device_name}/{subcomponent_name}"
        self._simulation_mode = simulation_mode
        self._monitoring_polling_rate_ms = monitoring_polling_rate_ms
        self._obs_state = ObsState.EMPTY
        self._health_state = HealthState.UNKNOWN
        self._fault_msg: str | None = None
        self._event_queue = event_queue
        self._health_check_interval = health_check_interval
        self.logger = logger or logging.getLogger(__name__)

        # NOTE: kwargs are not forwarded to the parent initialiser.
        super().__init__()

        # ensure that the initial state is sent to the queue.
        self._fire_state_event()

    @property
    def beam_id(self: PstSubcomponentManager) -> int:
        """Get beam ID for the current subcomponent."""
        return self._beam_id

    @property
    def simulation_mode(self: PstSubcomponentManager) -> SimulationMode:
        """
        Get value of simulation mode state.

        :returns: current simulation mode state.
        """
        return self._simulation_mode

    @simulation_mode.setter
    def simulation_mode(self: PstSubcomponentManager, simulation_mode: SimulationMode) -> None:
        """
        Set simulation mode state.

        The :py:meth:`_simulation_mode_changed` hook is only called when the new
        value differs from the current one.

        :param simulation_mode: the new simulation mode value.
        :type simulation_mode: :py:class:`SimulationMode`
        """
        if self._simulation_mode != simulation_mode:
            self._simulation_mode = simulation_mode
            self._simulation_mode_changed()

    def _simulation_mode_changed(self: PstSubcomponentManager) -> None:
        """
        Handle an update to the simulation mode.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    @property
    def monitoring_polling_rate_ms(self: PstSubcomponentManager) -> int:
        """Get the current monitoring polling rate, in milliseconds."""
        return self._monitoring_polling_rate_ms

    @monitoring_polling_rate_ms.setter
    def monitoring_polling_rate_ms(self: PstSubcomponentManager, monitoring_polling_rate_ms: int) -> None:
        """
        Set the monitoring polling rate on the subordinate devices.

        Unlike the simulation mode setter, the update hook is called on every
        assignment, whether or not the value changed.

        :param monitoring_polling_rate_ms: the new polling rate, in milliseconds.
        :type monitoring_polling_rate_ms: int
        """
        self._monitoring_polling_rate_ms = monitoring_polling_rate_ms
        self._monitoring_polling_rate_ms_updated()

    def _monitoring_polling_rate_ms_updated(self: PstSubcomponentManager) -> None:
        """Handle that the monitoring polling rate was updated.

        The default of this does nothing but it allows for the DSP sub-component manager to pass
        the value to the different sub components.
        """

    def _fire_state_event(self: PstSubcomponentManager) -> None:
        """Fire a state event message back to the BEAM component manager.

        The message carries the subcomponent name together with the current
        health state and observing state.
        """
        self._event_queue.put(
            SubcomponentEventMessage(
                subcomponent_name=self.subcomponent_name,
                health_state=self.health_state,
                obs_state=self.obs_state,
            )
        )

    @property
    def obs_state(self: PstSubcomponentManager) -> ObsState:
        """
        Get the current observing state of sub-component.

        :return: the current observing state of sub-component.
        :rtype: ObsState
        """
        return self._obs_state

    @obs_state.setter
    def obs_state(self: PstSubcomponentManager, obs_state: ObsState) -> None:
        """
        Set the current observing state of sub-component.

        A state event is fired on every assignment.

        :param obs_state: the current observing state of the sub-component.
        :type obs_state: ObsState
        """
        self._obs_state = obs_state
        self._fire_state_event()

    @property
    def health_state(self: PstSubcomponentManager) -> HealthState:
        """
        Get the current health state of the sub-component.

        :return: the current health state of the sub-component.
        :rtype: HealthState
        """
        return self._health_state

    @health_state.setter
    def health_state(self: PstSubcomponentManager, health_state: HealthState) -> None:
        """
        Set the current health state of the sub-component.

        A state event is fired on every assignment.

        :param health_state: the current health state of the sub-component.
        :type health_state: HealthState
        """
        self._health_state = health_state
        self._fire_state_event()

    @property
    def health_check_interval(self: PstSubcomponentManager) -> int:
        """
        Get the current health check interval, in milliseconds.

        :return: the current health check interval, in milliseconds.
        :rtype: int
        """
        return self._health_check_interval

    @health_check_interval.setter
    def health_check_interval(self: PstSubcomponentManager, health_check_interval: int) -> None:
        """
        Set the health check interval, in milliseconds.

        :param health_check_interval: the updated health check interval, in milliseconds.
        :type health_check_interval: int
        """
        self._health_check_interval = health_check_interval

    @property
    def fault_msg(self: PstSubcomponentManager) -> str | None:
        """Get the current fault message.

        This base class initialises the message to None and never changes it.
        """
        return self._fault_msg

    #################
    # Commands
    #################
    def scan(self: PstSubcomponentManager, scan_id: int, **kwargs: Any) -> None:
        """
        Start scanning.

        The kwargs of this method is scan request. By using the kwargs allow for
        forward compatibility of accepting other parameters for the starting of the scan.

        :param scan_id: the scan ID
        :type scan_id: int
        :param kwargs: scan request as a dict
        :type kwargs: dict
        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def end_scan(self: PstSubcomponentManager) -> None:
        """Stop scanning.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def abort(self: PstSubcomponentManager) -> None:
        """
        Abort current process.

        The only long lived process for API based devices is that of SCANNING. However, if another system
        fails this can be used to put all the subsystems into an ABORTED state.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def obsreset(self: PstSubcomponentManager) -> None:
        """
        Reset service.

        This is used to reset a service in ABORTED or FAULT states back to an EMPTY state. This will
        deconfigure a scan and beam.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def reset(self: PstSubcomponentManager) -> None:
        """
        Restart service.

        This is used to restart a service regardless of state.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def go_to_fault(self: PstSubcomponentManager, fault_msg: str) -> None:
        """
        Set the component into a FAULT state.

        For BEAM this will make the sub-devices be put into a FAULT state. For API backed component managers
        it is expected that the service backing that API should be put into a FAULT state.

        :param fault_msg: the message describing the fault.
        :type fault_msg: str
        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def set_logging_level(self: PstSubcomponentManager, log_level: LoggingLevel) -> None:
        """
        Set the LoggingLevel of the service.

        :param log_level: The required TANGO LoggingLevel
        :type log_level: LoggingLevel
        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def reset_monitoring(self: PstSubcomponentManager) -> None:
        """Stop monitoring and reset monitoring data.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def connect(self: PstSubcomponentManager) -> None:
        """Establish connection to API component.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def disconnect(self: PstSubcomponentManager) -> None:
        """Disconnect from the API component.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def start_health_check(self: PstSubcomponentManager) -> None:
        """Start performing health check on subcomponent.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def stop_health_check(self: PstSubcomponentManager) -> None:
        """Stop background health check on subcomponent.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")

    def restart_health_check(self: PstSubcomponentManager) -> None:
        """Restart the health check background process for the current subcomponent.

        This calls :py:meth:`stop_health_check` followed by :py:meth:`start_health_check`.
        """
        self.stop_health_check()
        self.start_health_check()

    @property
    def remote_obs_state(self: PstSubcomponentManager) -> ObsState:
        """Get the ObsState of the remote system.

        :raises NotImplementedError: always, as this base class is abstract.
        """
        raise NotImplementedError("PstSubcomponentManager is abstract.")