# Source code for ska_pst.lmc.component.api_subcomponent_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides the base Process API Component Manager for PST.LMC."""

from __future__ import annotations

__all__ = [
    "PstProcessApiSubcomponentManager",
]

import dataclasses
import threading
import time
from functools import cache
from typing import Any, Callable, Generic, TypeVar

from overrides import EnforceOverrides, final, override
from ska_control_model import HealthState, LoggingLevel, ObsState, SimulationMode
from ska_pst.lmc.health_check import HealthCheckState
from ska_pst.lmc.util import background_task

from .grpc_process_api import GrpcApiStrategy, PstProcessApiGrpc
from .monitor_data_handler import MonitorDataHandler, MonitorDataStore
from .process_api import PstProcessApi
from .pst_simulator import PstSimulator
from .simulator_process_api import PstProcessApiSimulator
from .subcomponent_manager import PstSubcomponentManager

# Type variables used as the ``Generic[...]`` parameters of
# ``PstProcessApiSubcomponentManager`` defined in this module.
SubbandMonitorDataType = TypeVar("SubbandMonitorDataType")
MonitorDataType = TypeVar("MonitorDataType")
# ``DataStore`` is bound to ``MonitorDataStore``, so only that class or a
# subclass of it may be substituted.
DataStore = TypeVar("DataStore", bound=MonitorDataStore)

# The three thresholds below are compared (with ``>=``) against the running
# count of health check state mismatches to pick the severity of the log
# message; the count is set back to 0 when the reported state matches the
# expected state again.
HEALTH_CHECK_MISMATCH_DEBUG_LEVEL: int = 1
"""The number of health check mismatches to do debug level logging on."""

HEALTH_CHECK_MISMATCH_INFO_LEVEL: int = 2
"""The number of health check mismatches to do info level logging on."""

HEALTH_CHECK_MISMATCH_WARNING_LEVEL: int = 3
"""The number of health check mismatches to do warning level logging on."""


class PstProcessApiSubcomponentManager(
    PstSubcomponentManager, EnforceOverrides, Generic[SubbandMonitorDataType, MonitorDataType, DataStore]
):
    """A base subcomponent Manager for the PST.LMC that uses an API.

    This extends from the :py:class:`PstSubcomponentManager` and delegates
    the commands to a :py:class:`PstProcessApi` instance. The API instance
    is either a simulator or a gRPC based client, which is selected based
    on the current simulation mode.
    """

    _simulation_mode: SimulationMode

    def __init__(
        self: PstProcessApiSubcomponentManager,
        *,
        process_api_endpoint: str,
        data_store: DataStore,
        monitor_data_updated_callback: Callable[[dict], None],
        simulator: PstSimulator,
        grpc_strategy: GrpcApiStrategy,
        **kwargs: Any,
    ) -> None:
        """
        Initialise instance of the subcomponent manager that uses an API.

        The manager always starts with a simulator backed API, the API is only
        replaced when :py:meth:`_update_api` is called.

        :param process_api_endpoint: the process API endpoint to use if needing to create
            an instance of gRPC based API instance.
        :type process_api_endpoint: str
        :param data_store: the data store for scan monitoring data.
        :type data_store: DataStore
        :param monitor_data_updated_callback: the callback to use when there is a change
            to the monitoring data during a scan.
        :type monitor_data_updated_callback: Callable[[dict], None]
        :param simulator: the simulator that is passed to the simulated API.
        :type simulator: PstSimulator
        :param grpc_strategy: the strategy that is passed to the gRPC based API.
        :type grpc_strategy: GrpcApiStrategy
        :param kwargs: additional keyword arguments that are passed to the parent class.
        :type kwargs: Any
        """
        # These are created before calling the parent initialiser so that they
        # exist for any property or method used while the parent is initialising.
        self._health_state_lock: threading.RLock = threading.RLock()
        self._env_cache: tuple[PstProcessApi, dict] | None = None
        super().__init__(**kwargs)
        self._monitor_data_store = data_store
        self._simulator = simulator
        self._grpc_strategy = grpc_strategy
        self._api = self._simulator_api()
        self._monitor_data_handler: MonitorDataHandler[SubbandMonitorDataType, MonitorDataType] = (
            MonitorDataHandler(
                data_store=data_store,
                monitor_data_callback=self.__handle_monitor_data_update,
            )
        )
        self.monitor_data_updated_callback = monitor_data_updated_callback
        self.process_api_endpoint = process_api_endpoint
        self._reset_condvar: threading.Condition | None = None
        self._health_check_state_mismatch_count = 0

    @final
    @override
    def _simulation_mode_changed(self: PstProcessApiSubcomponentManager) -> None:
        """
        Handle simulation mode has changed.

        The default action for API subcomponent managers is to call the
        :py:meth:`_update_api` method.
        """
        self._update_api()

    def get_env(self: PstProcessApiSubcomponentManager) -> dict:
        """
        Get the environment properties for the service.

        The value returned by the API is cached per instance and per API
        object. If the API has been replaced since the last call, the
        environment is requested again from the new API rather than returning
        the value of the previous API.

        :return: the environment properties as returned by the current API.
        :rtype: dict
        """
        api = self._api
        cached = self._env_cache
        if cached is None or cached[0] is not api:
            cached = (api, api.get_env())
            self._env_cache = cached
        return cached[1]

    @final
    def __handle_monitor_data_update(
        self: PstProcessApiSubcomponentManager, monitor_data: MonitorDataType
    ) -> None:
        """Handle monitoring data.

        This method is used convert the monitoring data into a dict and then
        call the :py:attr:`monitor_data_updated_callback` callback.

        :param monitor_data: the monitoring data that has just been updated.
        :type monitor_data: MonitorDataType
        """
        data_dict = dataclasses.asdict(monitor_data)  # type: ignore
        self.monitor_data_updated_callback(data_dict)

    @property
    def monitor_data(self: PstProcessApiSubcomponentManager) -> MonitorDataType:
        """Get the current monitoring data."""
        return self._monitor_data_handler.monitor_data

    @property
    def health_check_state_mismatch_count(self: PstProcessApiSubcomponentManager) -> int:
        """Get the number of times that the health check state was not the expected state."""
        with self._health_state_lock:
            return self._health_check_state_mismatch_count

    @health_check_state_mismatch_count.setter
    def health_check_state_mismatch_count(
        self: PstProcessApiSubcomponentManager, health_check_state_mismatch_count: int
    ) -> None:
        with self._health_state_lock:
            self._health_check_state_mismatch_count = health_check_state_mismatch_count

    @property
    def obs_state(self: PstProcessApiSubcomponentManager) -> ObsState:
        """
        Get the current observing state of sub-component.

        :return: the current observing state of sub-component.
        :rtype: ObsState
        """
        return self._obs_state

    @obs_state.setter
    def obs_state(self: PstProcessApiSubcomponentManager, obs_state: ObsState) -> None:
        """
        Set the current observing state of sub-component.

        :param obs_state: the current observing state of the sub-component.
        :type obs_state: ObsState
        """
        with self._health_state_lock:
            self._obs_state = obs_state
            self._fire_state_event()

    # ---------------
    # Commands
    # ---------------

    @override
    def validate_configure_scan(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """Validate configure scan request with the specific configuration of the subcomponent.

        Note this is for the whole ConfigureScan request for a PST BEAM subcomponent, which
        includes checking both the beam and scan configuration is correct. This is due to the
        fact that a client of BEAM.MGMT only exposes a ConfigureScan request as it's an Obs device.

        :param configuration: the configuration to validate.
        :type configuration: dict
        :raises NotImplementedError: always, subclasses are expected to override this method.
        """
        raise NotImplementedError("PstProcessApiSubcomponentManager is abstract.")

    @override
    @final
    def configure_beam(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """
        Configure the beam specific configuration of the subcomponent.

        This method calls the abstract method :py:meth:`_configure_beam` to ensure
        state is in the right mode for the subcomponent but allowing for implementation
        specific details for each subcomponent.

        NOTE: this method is marked as final but each individual subcomponent manager
        may need to do specific processing. Subclasses of this class should implement
        the ``_configure_beam`` method.

        :param configuration: configuration for beam
        :type configuration: dict
        """
        self.logger.info(f"Performing 'configure_beam' on '{self.subcomponent_name.upper()}'")
        self._configure_beam(configuration=configuration)
        self.obs_state = ObsState.IDLE

    def _configure_beam(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """Configure the beam specific configuration of the subcomponent.

        This method is abstract to allow for each subcomponent to do specific handling of
        beam configuration.

        :param configuration: configuration for beam
        :type configuration: dict
        :raises NotImplementedError: always, subclasses are expected to override this method.
        """
        raise NotImplementedError("PstProcessApiSubcomponentManager is abstract.")

    @override
    def deconfigure_beam(self: PstProcessApiSubcomponentManager) -> None:
        """
        Deconfigure the subcomponent's beam configuration.

        This will release all the resources associated with the subcomponent, including the SMRBs.
        """
        self.logger.info(f"Performing 'deconfigure_beam' on '{self.subcomponent_name.upper()}'")
        self._api.deconfigure_beam()
        self.obs_state = ObsState.EMPTY

    @override
    @final
    def configure_scan(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """
        Configure the subcomponent for a scan.

        After the API has been configured the observing state is set to READY and
        monitoring is started on the API.

        :param configuration: the scan configuration parameters
        :type configuration: dict
        """
        self.logger.info(f"Performing 'configure_scan' on '{self.subcomponent_name.upper()}'")
        self._api.configure_scan(configuration=configuration)
        self.obs_state = ObsState.READY
        self._api.monitor(
            monitor_data_callback=self._monitor_data_handler.handle_subband_data,
            polling_rate=self.monitoring_polling_rate_ms,
        )

    @override
    @final
    def deconfigure_scan(self: PstProcessApiSubcomponentManager) -> None:
        """Deconfigure this subcomponent for current scan configuration."""
        self.logger.info(f"Performing 'deconfigure_scan' on '{self.subcomponent_name.upper()}'")
        self._api.deconfigure_scan()
        self.obs_state = ObsState.IDLE
        self.reset_monitoring()

    @override
    @final
    def scan(self: PstProcessApiSubcomponentManager, scan_id: int, **kwargs: Any) -> None:
        """
        Start scanning.

        The kwargs of this method is scan request. By using the kwargs allow for forward
        compatibility of accepting other parameters for the starting of the scan.

        :param scan_id: the scan ID
        :type scan_id: int
        :raises Exception: any exception raised while starting the scan is logged and re-raised.
        """
        try:
            self.logger.info(f"Performing 'scan' on '{self.subcomponent_name.upper()}'. SCAN_ID={scan_id}")
            self._api.start_scan(scan_id=scan_id, **kwargs)
            self.obs_state = ObsState.SCANNING
        except Exception:
            self.logger.exception(f"Error in starting scan for {self.subcomponent_id}", exc_info=True)
            raise

    @override
    @final
    def end_scan(self: PstProcessApiSubcomponentManager) -> None:
        """Stop scanning."""
        self.logger.info(f"Performing 'end_scan' on '{self.subcomponent_name.upper()}'")
        self._api.stop_scan()
        self.obs_state = ObsState.READY

    @override
    @final
    def abort(self: PstProcessApiSubcomponentManager) -> None:
        """
        Abort current process.

        The only long lived process for API based subcomponents is that of SCANNING. However,
        if another system fails this can be used to put all the subcomponents into an ABORTED state.

        If an exception is raised while aborting, it is logged and the observing
        state is set to FAULT rather than the exception being raised to the caller.
        """
        self.logger.info(f"Performing 'abort' on '{self.subcomponent_name.upper()}'")
        try:
            self.reset_monitoring()
            self._api.abort()
            self.obs_state = ObsState.ABORTED
        except Exception:
            # Don't let the exception bubble up any further.
            # If an exception has occurred during aborting then it's okay
            # for the service to be in FAULT state as it's a resettable state.
            # The exception is logged so that the cause of the FAULT is not lost.
            self.logger.exception(
                f"Error in aborting '{self.subcomponent_name.upper()}', going to FAULT state"
            )
            self.obs_state = ObsState.FAULT

    @override
    @final
    def reset_monitoring(self: PstProcessApiSubcomponentManager) -> None:
        """Stop monitoring and reset monitoring data."""
        self._api.stop_monitoring()
        self._monitor_data_handler.reset_monitor_data()

    @override
    @final
    def obsreset(self: PstProcessApiSubcomponentManager) -> None:
        """
        Reset service.

        This is used to reset a service in ABORTED or FAULT states back to an EMPTY state. This will
        deconfigure a scan and beam.
        """
        self.logger.info(f"Performing 'obsreset' on '{self.subcomponent_name.upper()}'")
        self._api.reset()
        self.obs_state = ObsState.EMPTY

    @override
    @final
    def reset(self: PstProcessApiSubcomponentManager) -> None:
        """
        Restart service.

        This is used to restart a service regardless of state.

        The method waits up to 1 second for the health check handling to notify
        that the remote service has closed its connection. The observing state is
        set to EMPTY whether or not that notification was received in time. If the
        restart call raises an exception, the exception is propagated and the
        observing state is left unchanged.
        """
        self.logger.info(f"Performing 'reset' on '{self.subcomponent_name.upper()}'")
        # The TANGO will ensure that there is only one Reset method
        # happening at a time. We need this reset condvar to know from
        # the health check background process that the service has exited
        # which will allow that for the reset process to complete.
        reset_condvar = threading.Condition()
        self._reset_condvar = reset_condvar
        try:
            # need to lock the reset cond before performing restart on API
            # as we use the health check stopping as an indicator
            with reset_condvar:
                self._api.restart()
                if reset_condvar.wait(1.0):
                    self.logger.debug(f"Reset condvar for {self.subcomponent_name} been reset.")
                else:
                    self.logger.warning(
                        f"Waiting for reset of {self.subcomponent_name} service to happen timed out."
                    )
        finally:
            # Always clear the condvar, even if the restart failed. Otherwise a later
            # health check exception would be treated as if a reset was in progress.
            self._reset_condvar = None

        self.obs_state = ObsState.EMPTY

    @override
    @final
    def go_to_fault(self: PstProcessApiSubcomponentManager, fault_msg: str) -> None:
        """
        Set the subcomponent into a FAULT state.

        For BEAM this will put the subcomponents into a FAULT state. For API backed subcomponent
        managers it is expected that the service backing that API should be put into a FAULT state.

        :param fault_msg: description of the fault
        :type fault_msg: str
        """
        self.logger.info(f"Performing 'go_to_fault' on '{self.subcomponent_name.upper()}'")
        self.reset_monitoring()
        self._api.go_to_fault()
        self._set_fault_state(fault_msg)

    @final
    def _set_fault_state(self: PstProcessApiSubcomponentManager, fault_msg: str) -> None:
        """
        Record the fault message and set the observing state to FAULT.

        :param fault_msg: description of the fault
        :type fault_msg: str
        """
        self._fault_msg = fault_msg
        self.obs_state = ObsState.FAULT

    @override
    @final
    def set_logging_level(self: PstProcessApiSubcomponentManager, log_level: LoggingLevel) -> None:
        """
        Set log level of subcomponent.

        :param log_level: The required logging level for the subcomponent
        :type log_level: LoggingLevel
        """
        self._api.set_log_level(log_level=log_level)

    @override
    def connect(self: PstProcessApiSubcomponentManager) -> None:
        """Establish connection to API subcomponent.

        Once connected the health state is set to OK and the health check is started.
        """
        self._api.connect()
        self.health_state = HealthState.OK
        self.start_health_check()

    @background_task
    def _connect_background(self: PstProcessApiSubcomponentManager) -> None:
        """Attempt to connect to API subcomponent in the background."""
        self.connect()

    @override
    def disconnect(self: PstProcessApiSubcomponentManager) -> None:
        """Disconnect from API subcomponent.

        The health check is stopped before disconnecting and the health state is set to UNKNOWN.
        """
        self._api.stop_health_check(wait_for_task=False)
        self._api.disconnect()
        self.health_state = HealthState.UNKNOWN

    @final
    def _update_api(self: PstProcessApiSubcomponentManager) -> None:
        """
        Update API used by subcomponent manager.

        This is called when there is a change in the simulation mode. The current
        API is disconnected, a new API is created based on the simulation mode and
        then the new API is connected.
        """
        self.disconnect()
        if self._simulation_mode == SimulationMode.TRUE:
            self._api = self._simulator_api()
        else:
            self._api = self._grpc_api()
        # drop the environment of the previous API so it is not held on to
        self._env_cache = None
        self.connect()

    @final
    def _simulator_api(self: PstProcessApiSubcomponentManager) -> PstProcessApi:
        """Get instance of the simulator API."""
        self.logger.debug(f"{self.subcomponent_name} component manager setting up simulated API")
        return PstProcessApiSimulator(
            simulator=self._simulator,
            logger=self.logger,
        )

    @final
    def _grpc_api(self: PstProcessApiSubcomponentManager) -> PstProcessApi:
        """Get instance of a gRPC API."""
        self.logger.debug(f"{self.subcomponent_name} component manager setting up gRPC API")
        return PstProcessApiGrpc(
            client_id=self.subcomponent_id,
            grpc_endpoint=self.process_api_endpoint,
            logger=self.logger,
            strategy=self._grpc_strategy,
        )

    @override
    def start_health_check(self: PstProcessApiSubcomponentManager) -> None:
        """
        Start performing health check on subcomponent in the background.

        The background health check processing is delegated to the process API.
        The health check state mismatch count is reset to 0 before starting.
        """
        self.logger.info(f"Starting health check for {self.subcomponent_name}")
        self.health_check_state_mismatch_count = 0
        self._api.perform_health_check(
            health_check_handler=self,
            health_check_interval=self.health_check_interval,
        )
        self.logger.debug(f"Health check for {self.subcomponent_name} has started")

    @override
    def stop_health_check(self: PstProcessApiSubcomponentManager) -> None:
        """Stop background health check on subcomponent."""
        self.logger.info(f"Stopping health check for {self.subcomponent_name}")
        self._api.stop_health_check(wait_for_task=False)
        self.logger.debug(f"Health check for {self.subcomponent_name} has stopped")

    @property
    def remote_obs_state(self: PstProcessApiSubcomponentManager) -> ObsState:
        """Get the ObsState of the remote system."""
        return self._api.get_state()

    def handle_health_check_state(
        self: PstProcessApiSubcomponentManager, state: HealthCheckState, **kwargs: Any
    ) -> None:
        """
        Handle a health check state from the subcomponent.

        This is an implementation of the :py:class:`ska_pst.lmc.health_check.HealthCheckHandler`
        protocol.

        The state is handled, while holding the health state lock, as one of the following:

        * the state has an exception, see :py:meth:`_handle_health_check_exception`.
        * the observing state matches the expected state, the mismatch count is reset.
        * the reported observing state is FAULT, see :py:meth:`_handle_health_check_fault`.
        * any other state, see :py:meth:`_handle_health_check_unexpected_state`.

        :param state: the current health check state of subcomponent.
        :type state: HealthCheckState
        """
        with self._health_state_lock:
            self.logger.debug(
                f"Received a health check state for {self.subcomponent_name}. Details = {state}"
            )
            if state.exception is not None:
                self._handle_health_check_exception(state)
            elif self.obs_state == state.obs_state:
                # Only log the recovery if the mismatch was previously logged at info or above
                if self.health_check_state_mismatch_count >= HEALTH_CHECK_MISMATCH_INFO_LEVEL:
                    self.logger.info(
                        f"Health check for {state.service_name} returned to expected "
                        f"state {state.obs_state.name}"
                    )
                self.health_check_state_mismatch_count = 0
            elif state.obs_state is ObsState.FAULT:
                # this tracks that the service has gone into a FAULT state but
                # it was unexpected.
                self._handle_health_check_fault(state)  # type: ignore
            else:
                # update a health check state mismatch, debug log
                assert state.obs_state is not None, f"expected obs_state of {state} to not be None"
                self._handle_health_check_unexpected_state(state)

    def _handle_health_check_exception(
        self: PstProcessApiSubcomponentManager, state: HealthCheckState
    ) -> None:
        """
        Handle a health check state that has an exception.

        :param state: the health check state with the exception set.
        :type state: HealthCheckState
        """
        # if there is not condition var then this exception has happened
        # unexpectedly so the health state should go to FAILED.
        #
        # if the condvar exists then there is a reset in progress and the
        # code should notify that condvar that a streaming exception has
        # happen to allow for the completion of the reset process.
        #
        # The attribute is read once into a local, as the reset method sets the
        # attribute back to None from another thread and reading it a second
        # time could give None.
        reset_condvar = self._reset_condvar
        if reset_condvar is None:
            # Should this be FAILED or UNKNOWN?
            self.health_state = HealthState.FAILED
            self.logger.error(
                f"Health check for {state.service_name} has raised an "
                f"exception: {state.exception}. Restarting health check with 1s delay"
            )
            # prevents tight health check failure/restart busy loop
            # NOTE(review): when called from handle_health_check_state this sleeps
            # while the health state lock is held - confirm that this is acceptable.
            time.sleep(1)
            self.start_health_check()
        else:
            self.health_state = HealthState.UNKNOWN
            self.logger.info(
                f"Health check received an exception during Reset. Assuming connection of remote "
                f"service {state.service_name} has closed due to a restart"
            )
            with reset_condvar:
                reset_condvar.notify_all()

    def _handle_health_check_fault(self: PstProcessApiSubcomponentManager, state: HealthCheckState) -> None:
        """
        Handle the remote service unexpectedly reporting a FAULT state.

        This increments the mismatch count, sets the observing state to FAULT with
        the fault message of the state, and sets the health state to FAILED.

        :param state: the health check state that has an observing state of FAULT.
        :type state: HealthCheckState
        """
        # already have the health state lock so we can do this here
        self._health_check_state_mismatch_count += 1
        self.logger.warning(
            f"{state.service_name} has gone into a FAULT stage. It is expected to have been in "
            f"{self.obs_state.name}. Fault message = '{state.fault_message}'"
        )
        self._set_fault_state(state.fault_message)  # type: ignore
        self.health_state = HealthState.FAILED

    def _handle_health_check_unexpected_state(
        self: PstProcessApiSubcomponentManager, state: HealthCheckState
    ) -> None:
        """
        Handle the remote service reporting a state that is not the expected state.

        The mismatch count is incremented and a message is logged, where the log level
        depends on the value of the incremented mismatch count.

        :param state: the health check state with the unexpected observing state.
        :type state: HealthCheckState
        """
        obs_state: ObsState = state.obs_state  # type: ignore
        mismatch_count = self.health_check_state_mismatch_count
        mismatch_count += 1

        if mismatch_count >= HEALTH_CHECK_MISMATCH_WARNING_LEVEL:
            self.logger.warning(
                f"{self.subcomponent_name} health check state mismatch "
                f"{HEALTH_CHECK_MISMATCH_WARNING_LEVEL} or more times "
                f"in a row. currently expecting {self.obs_state.name} got {obs_state.name}",
                exc_info=False,
            )
        elif mismatch_count >= HEALTH_CHECK_MISMATCH_INFO_LEVEL:
            self.logger.info(
                f"{self.subcomponent_name} health check state mismatch "
                f"{HEALTH_CHECK_MISMATCH_INFO_LEVEL} or more times - "
                f"expected {self.obs_state.name} got {obs_state.name}"
            )
        elif mismatch_count >= HEALTH_CHECK_MISMATCH_DEBUG_LEVEL:
            # Ignore being out by 1 to avoid issues with race conditions between LMC and
            # remote service.
            self.logger.debug(
                f"{self.subcomponent_name} health check state mismatch "
                f"{HEALTH_CHECK_MISMATCH_DEBUG_LEVEL} or more times - "
                f"expected {self.obs_state.name} got {obs_state.name}"
            )

        self.health_check_state_mismatch_count = mismatch_count

    def __getattr__(self: PstProcessApiSubcomponentManager, name: str) -> Any:
        """
        Get attribute of component manager.

        This is a Python dunder method that is used to get attributes/properties
        that have not been found by the ``__getattribute__`` method. This allows
        delegating to properties of the ``monitor_data`` property

        :param name: the name of the attribute
        :type name: str
        :return: the attribute value
        :rtype: Any
        :raises: AttributeError if attribute cannot be found on monitoring data,
            or if the monitoring data handler has not yet been set on the instance.
        """
        # The monitor_data property needs the _monitor_data_handler attribute. If that
        # has not been set yet then using the property would call this method again and
        # recurse without end, so fail fast with an AttributeError.
        if "_monitor_data_handler" not in self.__dict__:
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
        return getattr(self.monitor_data, name)