# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides the base Process API Component Manager for PST.LMC."""
from __future__ import annotations
__all__ = [
"PstProcessApiSubcomponentManager",
]
import dataclasses
import threading
import time
from functools import cache
from typing import Any, Callable, Generic, TypeVar
from overrides import EnforceOverrides, final, override
from ska_control_model import HealthState, LoggingLevel, ObsState, SimulationMode
from ska_pst.lmc.health_check import HealthCheckState
from ska_pst.lmc.util import background_task
from .grpc_process_api import GrpcApiStrategy, PstProcessApiGrpc
from .monitor_data_handler import MonitorDataHandler, MonitorDataStore
from .process_api import PstProcessApi
from .pst_simulator import PstSimulator
from .simulator_process_api import PstProcessApiSimulator
from .subcomponent_manager import PstSubcomponentManager
# Type variable for the first type parameter of ``MonitorDataHandler`` as used
# in this module (presumably the per-subband monitoring data - TODO confirm).
SubbandMonitorDataType = TypeVar("SubbandMonitorDataType")
# Type variable for the aggregated monitoring data returned by the
# ``monitor_data`` property of the subcomponent manager.
MonitorDataType = TypeVar("MonitorDataType")
# Type variable for the monitoring data store; must be a ``MonitorDataStore``.
DataStore = TypeVar("DataStore", bound=MonitorDataStore)
# Thresholds on the number of consecutive health check state mismatches. The
# counter is reset to zero whenever the remote state matches the expected
# state, and the thresholds select the log level used to report a mismatch.
HEALTH_CHECK_MISMATCH_DEBUG_LEVEL: int = 1
"""The number of health check mismatches to do debug level logging on."""
HEALTH_CHECK_MISMATCH_INFO_LEVEL: int = 2
"""The number of health check mismatches to do info level logging on."""
HEALTH_CHECK_MISMATCH_WARNING_LEVEL: int = 3
"""The number of health check mismatches to do warning level logging on."""
[docs]class PstProcessApiSubcomponentManager(
PstSubcomponentManager, EnforceOverrides, Generic[SubbandMonitorDataType, MonitorDataType, DataStore]
):
"""A base subcomponent manager for the PST.LMC that uses a process API.

This extends the :py:class:`PstSubcomponentManager`.
"""
_simulation_mode: SimulationMode
def __init__(
    self: PstProcessApiSubcomponentManager,
    *,
    process_api_endpoint: str,
    data_store: DataStore,
    monitor_data_updated_callback: Callable[[dict], None],
    simulator: PstSimulator,
    grpc_strategy: GrpcApiStrategy,
    **kwargs: Any,
) -> None:
    """Initialise an API backed subcomponent manager.

    The manager always starts with a simulator backed API; the gRPC API is
    only created when :py:meth:`_update_api` is called.

    :param process_api_endpoint: the endpoint used when a gRPC based API
        instance needs to be created.
    :type process_api_endpoint: str
    :param data_store: the data store for scan monitoring data.
    :type data_store: DataStore
    :param monitor_data_updated_callback: the callback to use when there
        is a change to the monitoring data during a scan.
    :type monitor_data_updated_callback: Callable[[dict], None]
    :param simulator: the simulator handed to the simulated API.
    :type simulator: PstSimulator
    :param grpc_strategy: the strategy handed to the gRPC API.
    :type grpc_strategy: GrpcApiStrategy
    :param kwargs: remaining keyword arguments, forwarded unchanged to the
        parent class initialiser.
    """
    # The lock is created before the parent initialiser runs (presumably
    # because the parent may already use the ``obs_state`` setter, which
    # needs it - TODO confirm against PstSubcomponentManager).
    self._health_state_lock: threading.RLock = threading.RLock()
    super().__init__(**kwargs)

    # plain configuration values
    self.process_api_endpoint = process_api_endpoint
    self.monitor_data_updated_callback = monitor_data_updated_callback
    self._monitor_data_store = data_store
    self._grpc_strategy = grpc_strategy

    # health check / reset bookkeeping
    self._reset_condvar: threading.Condition | None = None
    self._health_check_state_mismatch_count = 0

    # the simulator must be stored before the simulated API is built from it
    self._simulator = simulator
    self._api = self._simulator_api()

    handler: MonitorDataHandler[SubbandMonitorDataType, MonitorDataType] = MonitorDataHandler(
        data_store=data_store,
        monitor_data_callback=self.__handle_monitor_data_update,
    )
    self._monitor_data_handler = handler
@final
@override
def _simulation_mode_changed(self: PstProcessApiSubcomponentManager) -> None:
    """React to a change of the simulation mode.

    API backed subcomponent managers respond by swapping the API in use,
    which is done by :py:meth:`_update_api`.
    """
    # all of the work (disconnect, swap API, reconnect) lives in _update_api
    self._update_api()
def get_env(self: PstProcessApiSubcomponentManager) -> dict:
    """Get the environment properties for the service.

    The value is fetched from the API once and then remembered on this
    instance. The remembered value is tied to the API object it was fetched
    from, so after the API has been swapped (see :py:meth:`_update_api`) the
    environment is fetched again from the new API.

    A per-instance cache is used rather than ``functools.cache`` because the
    latter keys on ``self``: it keeps every manager alive for the lifetime of
    the process and keeps returning the environment of a replaced API.

    :return: the environment properties reported by the current API.
    :rtype: dict
    """
    api = self._api
    # Go through ``__dict__`` so that a missing cache entry is not routed to
    # ``__getattr__``, which delegates unknown names to the monitoring data.
    cached = self.__dict__.get("_env_cache")
    if cached is not None and cached[0] is api:
        return cached[1]

    env = api.get_env()
    self.__dict__["_env_cache"] = (api, env)
    return env
@final
def __handle_monitor_data_update(
    self: PstProcessApiSubcomponentManager, monitor_data: MonitorDataType
) -> None:
    """Forward updated monitoring data to the registered callback.

    The monitoring data is converted to a plain ``dict`` with
    :py:func:`dataclasses.asdict` and that dict is passed to
    :py:attr:`monitor_data_updated_callback`.

    :param monitor_data: the monitoring data that has just been updated.
    :type monitor_data: MonitorDataType
    """
    self.monitor_data_updated_callback(dataclasses.asdict(monitor_data))  # type: ignore
@property
def monitor_data(self: PstProcessApiSubcomponentManager) -> MonitorDataType:
    """Get the current monitoring data held by the monitor data handler.

    :return: the current monitoring data.
    :rtype: MonitorDataType
    """
    handler = self._monitor_data_handler
    return handler.monitor_data
@property
def health_check_state_mismatch_count(self: PstProcessApiSubcomponentManager) -> int:
    """Get how many times the health check state was not the expected state.

    The value is read while holding the health state lock.

    :return: the current mismatch count.
    :rtype: int
    """
    with self._health_state_lock:
        count = self._health_check_state_mismatch_count
    return count


@health_check_state_mismatch_count.setter
def health_check_state_mismatch_count(
    self: PstProcessApiSubcomponentManager, health_check_state_mismatch_count: int
) -> None:
    """Set the health check mismatch count while holding the health state lock.

    :param health_check_state_mismatch_count: the new mismatch count.
    :type health_check_state_mismatch_count: int
    """
    with self._health_state_lock:
        self._health_check_state_mismatch_count = health_check_state_mismatch_count
@property
def obs_state(self: PstProcessApiSubcomponentManager) -> ObsState:
    """Get the observing state currently recorded for the sub-component.

    :return: the current observing state of sub-component.
    :rtype: ObsState
    """
    current = self._obs_state
    return current


@obs_state.setter
def obs_state(self: PstProcessApiSubcomponentManager, obs_state: ObsState) -> None:
    """Record a new observing state and fire a state event.

    Both the assignment and the call to ``_fire_state_event`` happen while
    the health state lock is held.

    :param obs_state: the current observing state of the sub-component.
    :type obs_state: ObsState
    """
    lock = self._health_state_lock
    with lock:
        self._obs_state = obs_state
        self._fire_state_event()
# ---------------
# Commands
# ---------------
@override
def validate_configure_scan(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
    """Validate a configure scan request against this subcomponent.

    The request is the whole ConfigureScan request for a PST BEAM
    subcomponent, so both the beam and the scan configuration are to be
    checked. This base implementation does no validation itself; it always
    raises and concrete subcomponent managers have to provide the check.

    :param configuration: the configuration to validate.
    :type configuration: dict
    :raises NotImplementedError: always, as this class is abstract.
    """
    message = "PstProcessApiSubcomponentManager is abstract."
    raise NotImplementedError(message)
@override
@final
def configure_beam(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
    """Configure the beam specific configuration of the subcomponent.

    This method is final. It logs the request, delegates the actual work to
    :py:meth:`_configure_beam` and then moves the observing state to
    ``IDLE``. Subclasses put their subcomponent specific processing in
    ``_configure_beam``.

    :param configuration: configuration for beam
    :type configuration: dict
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'configure_beam' on '{target}'")
    # subcomponent specific processing
    self._configure_beam(configuration=configuration)
    # only reached when the subclass hook did not raise
    self.obs_state = ObsState.IDLE
def _configure_beam(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
    """Perform the subcomponent specific part of configuring a beam.

    This is the hook called by :py:meth:`configure_beam`. The base
    implementation always raises; each subcomponent manager supplies its own
    handling of the beam configuration.

    :param configuration: configuration for beam
    :type configuration: dict
    :raises NotImplementedError: always, as this class is abstract.
    """
    message = "PstProcessApiSubcomponentManager is abstract."
    raise NotImplementedError(message)
@override
def deconfigure_beam(self: PstProcessApiSubcomponentManager) -> None:
    """Deconfigure the subcomponent's beam configuration.

    The request is logged, passed on to the API's ``deconfigure_beam`` and
    the observing state is then set to ``EMPTY``.
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'deconfigure_beam' on '{target}'")
    self._api.deconfigure_beam()
    self.obs_state = ObsState.EMPTY
@override
@final
def configure_scan(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
    """Configure the subcomponent for a scan and start monitoring.

    After the API has accepted the scan configuration the observing state is
    set to ``READY`` and monitoring is started on the API, with subband data
    routed to the monitor data handler.

    :param configuration: the scan configuration parameters
    :type configuration: dict
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'configure_scan' on '{target}'")
    self._api.configure_scan(configuration=configuration)
    self.obs_state = ObsState.READY

    # monitoring is only started once the state has been updated
    subband_data_callback = self._monitor_data_handler.handle_subband_data
    polling_rate = self.monitoring_polling_rate_ms
    self._api.monitor(monitor_data_callback=subband_data_callback, polling_rate=polling_rate)
@override
@final
def deconfigure_scan(self: PstProcessApiSubcomponentManager) -> None:
    """Deconfigure this subcomponent for the current scan configuration.

    The API is asked to deconfigure the scan, the observing state is set to
    ``IDLE`` and finally monitoring is stopped and its data reset.
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'deconfigure_scan' on '{target}'")
    self._api.deconfigure_scan()
    self.obs_state = ObsState.IDLE
    self.reset_monitoring()
@override
@final
def scan(self: PstProcessApiSubcomponentManager, scan_id: int, **kwargs: Any) -> None:
    """Start scanning.

    The keyword arguments make up the rest of the scan request and are
    passed through to the API untouched, which keeps this method forward
    compatible with new scan parameters. Any error is logged and re-raised.

    :param scan_id: the scan ID
    :type scan_id: int
    :param kwargs: additional scan request parameters for the API.
    """
    try:
        target = self.subcomponent_name.upper()
        self.logger.info(f"Performing 'scan' on '{target}'. SCAN_ID={scan_id}")
        scan_request = {"scan_id": scan_id, **kwargs}
        self._api.start_scan(**scan_request)
        self.obs_state = ObsState.SCANNING
    except Exception:
        # log with the traceback, then let the caller deal with the failure
        self.logger.exception(f"Error in starting scan for {self.subcomponent_id}", exc_info=True)
        raise
@override
@final
def end_scan(self: PstProcessApiSubcomponentManager) -> None:
    """Stop scanning.

    The API is asked to stop the scan and the observing state goes back to
    ``READY``.
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'end_scan' on '{target}'")
    self._api.stop_scan()
    self.obs_state = ObsState.READY
@override
@final
def abort(self: PstProcessApiSubcomponentManager) -> None:
    """Abort current process.

    The only long lived process for API based subcomponents is that of
    SCANNING. However, if another system fails this can be used to put
    all the subcomponents into an ABORTED state.

    Monitoring is reset, the API is asked to abort and the observing state
    is set to ``ABORTED``. If any of these steps raises, the exception is
    logged and the observing state is set to ``FAULT`` instead; the
    exception is not propagated to the caller.
    """
    self.logger.info(f"Performing 'abort' on '{self.subcomponent_name.upper()}'")
    try:
        self.reset_monitoring()
        self._api.abort()
        self.obs_state = ObsState.ABORTED
    except Exception:
        # Don't let the exception bubble up any further.
        # If an exception has occurred during aborting then it's okay
        # for the service to be in FAULT state as it's a resettable state.
        # The exception is logged so the reason for the FAULT is not lost.
        self.logger.exception(
            f"Error while performing 'abort' on '{self.subcomponent_name.upper()}'. "
            "Setting state to FAULT."
        )
        self.obs_state = ObsState.FAULT
@override
@final
def reset_monitoring(self: PstProcessApiSubcomponentManager) -> None:
    """Stop monitoring on the API and reset the held monitoring data."""
    # stop the source of updates first, then clear what has been collected
    self._api.stop_monitoring()
    handler = self._monitor_data_handler
    handler.reset_monitor_data()
@override
@final
def obsreset(self: PstProcessApiSubcomponentManager) -> None:
    """Reset service.

    Used to bring a service in ABORTED or FAULT state back to an EMPTY
    state. The API's ``reset`` is called and the observing state is then set
    to ``EMPTY``.
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'obsreset' on '{target}'")
    self._api.reset()
    self.obs_state = ObsState.EMPTY
@override
@final
def reset(self: PstProcessApiSubcomponentManager) -> None:
    """Restart service.

    This is used to restart a service regardless of state. The API is asked
    to restart and this method then waits up to one second to be notified,
    via a condition variable, by the health check exception handling that
    the connection to the service has dropped. On success (or timeout) the
    observing state is set to ``EMPTY``.

    The condition variable is always cleared again, even when the restart
    request raises, so that later health check exceptions are not mistaken
    for being part of a reset that is no longer in progress.
    """
    self.logger.info(f"Performing 'reset' on '{self.subcomponent_name.upper()}'")

    # The TANGO will ensure that there is only one Reset method
    # happening at a time. We need this reset condvar to know from
    # the health check background process that the service has exited
    # which will allow that for the reset process to complete.
    reset_condvar = threading.Condition()
    self._reset_condvar = reset_condvar

    try:
        # need to lock the reset cond before performing restart on API
        # as we use the health check stopping as an indicator
        with reset_condvar:
            self._api.restart()
            if reset_condvar.wait(1.0):
                self.logger.debug(f"Reset condvar for {self.subcomponent_name} been reset.")
            else:
                self.logger.warning(
                    f"Waiting for reset of {self.subcomponent_name} service to happen timed out."
                )
    finally:
        # never leave a stale condvar behind, not even if restart() raised
        self._reset_condvar = None

    self.obs_state = ObsState.EMPTY
@override
@final
def go_to_fault(self: PstProcessApiSubcomponentManager, fault_msg: str) -> None:
    """Set the subcomponent into a FAULT state.

    Monitoring is reset, the API is told to go to fault and the fault
    message and ``FAULT`` observing state are then recorded locally.

    :param fault_msg: description of the fault
    :type fault_msg: str
    """
    target = self.subcomponent_name.upper()
    self.logger.info(f"Performing 'go_to_fault' on '{target}'")
    self.reset_monitoring()
    # put the backing service into FAULT before recording it locally
    self._api.go_to_fault()
    self._set_fault_state(fault_msg)
@final
def _set_fault_state(self: PstProcessApiSubcomponentManager, fault_msg: str) -> None:
    """Record the fault message and move the observing state to ``FAULT``.

    :param fault_msg: description of the fault
    :type fault_msg: str
    """
    # the message is stored first so it is in place when the state event fires
    self._fault_msg = fault_msg
    self.obs_state = ObsState.FAULT
@override
@final
def set_logging_level(self: PstProcessApiSubcomponentManager, log_level: LoggingLevel) -> None:
    """Set the log level of the subcomponent by passing it to the API.

    :param log_level: The required logging level for the subcomponent
    :type log_level: LoggingLevel
    """
    api = self._api
    api.set_log_level(log_level=log_level)
@override
def connect(self: PstProcessApiSubcomponentManager) -> None:
    """Establish connection to API subcomponent.

    Once the API has connected the health state is set to ``OK`` and the
    background health check is started.
    """
    api = self._api
    api.connect()
    self.health_state = HealthState.OK
    self.start_health_check()
@background_task
def _connect_background(self: PstProcessApiSubcomponentManager) -> None:
    """Connect to the API subcomponent via the ``background_task`` decorator.

    The body simply calls :py:meth:`connect`.
    """
    self.connect()
@override
def disconnect(self: PstProcessApiSubcomponentManager) -> None:
    """Disconnect from API subcomponent.

    The health check is stopped without waiting for its task, the API is
    disconnected and the health state is set to ``UNKNOWN``.
    """
    # stop health checking first so it does not report the disconnect
    self._api.stop_health_check(wait_for_task=False)
    self._api.disconnect()
    self.health_state = HealthState.UNKNOWN
@final
def _update_api(self: PstProcessApiSubcomponentManager) -> None:
    """Swap the API used by the subcomponent manager.

    Called when the simulation mode changes. The current API is
    disconnected, a simulator API (simulation mode ``TRUE``) or a gRPC API
    (any other mode) is created, and the manager connects again.
    """
    self.disconnect()
    use_simulator = self._simulation_mode == SimulationMode.TRUE
    api_factory = self._simulator_api if use_simulator else self._grpc_api
    self._api = api_factory()
    self.connect()
@final
def _simulator_api(self: PstProcessApiSubcomponentManager) -> PstProcessApi:
    """Create a new simulator backed API instance.

    :return: a simulated API built from this manager's simulator and logger.
    :rtype: PstProcessApi
    """
    self.logger.debug(f"{self.subcomponent_name} component manager setting up simulated API")
    api_args = {"simulator": self._simulator, "logger": self.logger}
    return PstProcessApiSimulator(**api_args)
@final
def _grpc_api(self: PstProcessApiSubcomponentManager) -> PstProcessApi:
    """Create a new gRPC backed API instance.

    :return: a gRPC API built from this manager's subcomponent id, process
        API endpoint, logger and gRPC strategy.
    :rtype: PstProcessApi
    """
    self.logger.debug(f"{self.subcomponent_name} component manager setting up gRPC API")
    api_args = {
        "client_id": self.subcomponent_id,
        "grpc_endpoint": self.process_api_endpoint,
        "logger": self.logger,
        "strategy": self._grpc_strategy,
    }
    return PstProcessApiGrpc(**api_args)
@override
def start_health_check(self: PstProcessApiSubcomponentManager) -> None:
    """Start performing health check on subcomponent in the background.

    The mismatch counter is reset and the health checking itself is
    delegated to the process API, with this manager registered as the
    handler of the health check states.
    """
    self.logger.info(f"Starting health check for {self.subcomponent_name}")
    # a new health check run starts with a clean mismatch history
    self.health_check_state_mismatch_count = 0
    interval = self.health_check_interval
    self._api.perform_health_check(health_check_handler=self, health_check_interval=interval)
    self.logger.debug(f"Health check for {self.subcomponent_name} has started")
@override
def stop_health_check(self: PstProcessApiSubcomponentManager) -> None:
    """Stop background health check on subcomponent.

    The API is asked to stop the health check without waiting for the
    background task.
    """
    self.logger.info(f"Stopping health check for {self.subcomponent_name}")
    api = self._api
    api.stop_health_check(wait_for_task=False)
    self.logger.debug(f"Health check for {self.subcomponent_name} has stopped")
@property
def remote_obs_state(self: PstProcessApiSubcomponentManager) -> ObsState:
    """Get the ObsState of the remote system as reported by the API.

    :return: the state returned by the API's ``get_state``.
    :rtype: ObsState
    """
    api = self._api
    return api.get_state()
def handle_health_check_state(
    self: PstProcessApiSubcomponentManager, state: HealthCheckState, **kwargs: Any
) -> None:
    """Handle a health check state from the subcomponent.

    This is an implementation of the :py:class:`ska_pst.lmc.health_check.HealthCheckHandler`
    protocol. The whole method runs while holding the health state lock and
    handles exactly one of the following cases, checked in this order:

    * the state carries an exception;
    * the reported state equals the expected state, which resets the
      mismatch counter;
    * the remote service has unexpectedly gone to ``FAULT``;
    * any other state, which is counted as a mismatch.

    :param state: the current health check state of subcomponent.
    :type state: HealthCheckState
    :param kwargs: accepted but not used by this implementation.
    """
    with self._health_state_lock:
        self.logger.debug(
            f"Received a health check state for {self.subcomponent_name}. Details = {state}"
        )

        if state.exception is not None:
            self._handle_health_check_exception(state)
            return

        if self.obs_state == state.obs_state:
            # Only log the recovery when the earlier mismatches were
            # reported at info level or above.
            if self.health_check_state_mismatch_count >= HEALTH_CHECK_MISMATCH_INFO_LEVEL:
                self.logger.info(
                    f"Health check for {state.service_name} returned to expected "
                    f"state {state.obs_state.name}"
                )
            self.health_check_state_mismatch_count = 0
            return

        if state.obs_state is ObsState.FAULT:
            # the service has gone into a FAULT state that was not expected
            self._handle_health_check_fault(state)  # type: ignore
            return

        # anything else is a state mismatch
        assert state.obs_state is not None, f"expected obs_state of {state} to not be None"
        self._handle_health_check_unexpected_state(state)
def _handle_health_check_exception(
    self: PstProcessApiSubcomponentManager, state: HealthCheckState
) -> None:
    """Handle a health check state that carries an exception.

    If no reset is in progress the exception is unexpected: the health state
    goes to ``FAILED`` and, after a one second delay, the health check is
    started again. If a reset is in progress the exception is taken as the
    sign that the remote service has closed the connection: the health state
    goes to ``UNKNOWN`` and the thread waiting in :py:meth:`reset` is
    notified.

    :param state: the health check state holding the exception.
    :type state: HealthCheckState
    """
    # Read the condition variable exactly once. ``reset`` sets the attribute
    # back to None from another thread (e.g. after its wait timed out), so
    # checking the attribute and then reading it again for the ``with``
    # statement could end up entering ``None``.
    reset_condvar = self._reset_condvar

    # if there is not condition var then this exception has happened
    # unexpectedly so the health state should go to FAILED.
    #
    # if the condvar exists then there is a reset in progress and the
    # code should notify that condvar that a streaming exception has
    # happen to allow for the completion of the reset process.
    if reset_condvar is None:
        # Should this be FAILED or UNKNOWN?
        self.health_state = HealthState.FAILED
        self.logger.error(
            f"Health check for {state.service_name} has raised an "
            f"exception: {state.exception}. Restarting health check with 1s delay"
        )
        # prevents tight health check failure/restart busy loop
        time.sleep(1)
        self.start_health_check()
    else:
        self.health_state = HealthState.UNKNOWN
        self.logger.info(
            f"Health check received an exception during Reset. Assuming connection of remote "
            f"service {state.service_name} has closed due to a restart"
        )
        with reset_condvar:
            reset_condvar.notify_all()
def _handle_health_check_fault(self: PstProcessApiSubcomponentManager, state: HealthCheckState) -> None:
    """Handle the remote service unexpectedly reporting a ``FAULT`` state.

    The mismatch counter is incremented, a warning is logged, the fault
    message and ``FAULT`` observing state are recorded and the health state
    is set to ``FAILED``.

    :param state: the health check state reporting the fault.
    :type state: HealthCheckState
    """
    # The caller (handle_health_check_state) already holds the health state
    # lock, so the counter is updated directly rather than via the property.
    self._health_check_state_mismatch_count = self._health_check_state_mismatch_count + 1

    service = state.service_name
    expected = self.obs_state.name
    fault_message = state.fault_message
    self.logger.warning(
        f"{service} has gone into a FAULT stage. It is expected to have been in "
        f"{expected}. Fault message = '{fault_message}'"
    )
    self._set_fault_state(fault_message)  # type: ignore
    self.health_state = HealthState.FAILED
def _handle_health_check_unexpected_state(
    self: PstProcessApiSubcomponentManager, state: HealthCheckState
) -> None:
    """Count and report a mismatch between the expected and reported state.

    The mismatch counter is incremented by one. The new value selects the
    log level (warning, info or debug) by comparing it against the
    ``HEALTH_CHECK_MISMATCH_*_LEVEL`` thresholds, highest threshold first.

    :param state: the health check state with the unexpected ``obs_state``.
    :type state: HealthCheckState
    """
    obs_state: ObsState = state.obs_state  # type: ignore

    def describe(threshold: int, wording: str) -> str:
        # built lazily so attributes are only read when a message is logged
        return (
            f"{self.subcomponent_name} health check state mismatch "
            f"{threshold} {wording} {self.obs_state.name} got {obs_state.name}"
        )

    mismatch_count = self.health_check_state_mismatch_count + 1

    if mismatch_count >= HEALTH_CHECK_MISMATCH_WARNING_LEVEL:
        self.logger.warning(
            describe(
                HEALTH_CHECK_MISMATCH_WARNING_LEVEL,
                "or more times in a row. currently expecting",
            ),
            exc_info=False,
        )
    elif mismatch_count >= HEALTH_CHECK_MISMATCH_INFO_LEVEL:
        self.logger.info(describe(HEALTH_CHECK_MISMATCH_INFO_LEVEL, "or more times - expected"))
    elif mismatch_count >= HEALTH_CHECK_MISMATCH_DEBUG_LEVEL:
        # Being out by one is only logged at debug level to avoid noise from
        # race conditions between LMC and the remote service.
        self.logger.debug(describe(HEALTH_CHECK_MISMATCH_DEBUG_LEVEL, "or more times - expected"))

    self.health_check_state_mismatch_count = mismatch_count
def __getattr__(self: PstProcessApiSubcomponentManager, name: str) -> Any:
    """Get attribute of component manager.

    This is a Python dunder method that is used to get attributes/properties
    that have not been found by the ``__getattribute__`` method.

    This allows delegating to properties of the ``monitor_data`` property.

    Delegation is refused, with a plain :py:class:`AttributeError`, when:

    * the name is private or a dunder name (starts with an underscore);
    * the name is ``monitor_data`` itself, which only reaches this method
      when the property has already failed;
    * the monitor data handler has not been set on the instance yet.

    Without these guards a lookup made before ``_monitor_data_handler``
    exists would go ``__getattr__`` -> ``monitor_data`` -> ``__getattr__``
    and recurse until a :py:class:`RecursionError` is raised.

    :param name: the name of the attribute
    :type name: str
    :return: the attribute value
    :rtype: Any
    :raises AttributeError: if attribute cannot be found on monitoring data
        or delegation is refused for one of the reasons above.
    """
    # ``__dict__`` is always found by normal lookup, so this cannot recurse.
    handler_ready = "_monitor_data_handler" in self.__dict__
    if name.startswith("_") or name == "monitor_data" or not handler_ready:
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")
    return getattr(self.monitor_data, name)