# Source code for ska_pst.lmc.component.api_subcomponent_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides the base Process API Component Manager for PST.LMC."""

from __future__ import annotations

# Names exported by ``from ... import *``: only the base API subcomponent manager.
__all__ = [
    "PstProcessApiSubcomponentManager",
]

import dataclasses
import threading
from functools import cache
from typing import Any, Callable, Generic, TypeVar, cast

from overrides import EnforceOverrides, final, override
from ska_control_model import HealthState, LoggingLevel, ObsState, SimulationMode
from ska_pst.lmc.component.monitor_data_handler import MonitorDataHandler, MonitorDataStore
from ska_pst.lmc.component.process_api import PstProcessApi
from ska_pst.lmc.health_check import HealthCheckState
from ska_pst.lmc.util import background_task

from .subcomponent_manager import PstSubcomponentManager

# Type of the monitoring data reported per subband. It is the first type
# parameter of ``MonitorDataHandler``, whose ``handle_subband_data`` method is
# handed to the process API as the subband monitoring callback.
SubbandMonitorDataType = TypeVar("SubbandMonitorDataType")
# Type of the monitoring data exposed through the ``monitor_data`` property
# (presumably aggregated over subbands by ``MonitorDataHandler`` - confirm).
# Values are converted with ``dataclasses.asdict``, so a dataclass is expected.
MonitorDataType = TypeVar("MonitorDataType")
# Concrete process API client type (a simulated or a gRPC based API).
Api = TypeVar("Api", bound=PstProcessApi)
# Concrete monitoring data store type passed to ``MonitorDataHandler``.
DataStore = TypeVar("DataStore", bound=MonitorDataStore)


class PstProcessApiSubcomponentManager(
    PstSubcomponentManager,
    EnforceOverrides,
    Generic[SubbandMonitorDataType, MonitorDataType, Api, DataStore],
):
    """A base subcomponent Manager for the PST.LMC that uses an API.

    This extends from the :py:class:`PstSubcomponentManager`
    """

    _simulation_mode: SimulationMode

    def __init__(
        self: PstProcessApiSubcomponentManager,
        *,
        process_api_endpoint: str,
        api: Api,
        data_store: DataStore,
        monitor_data_updated_callback: Callable[[dict], None],
        monitor_data_prefix: str = "",
        **kwargs: Any,
    ) -> None:
        """
        Initialise instance of the subcomponent manager that uses an API.

        :param process_api_endpoint: the process API endpoint to use if needing to create
            an instance of gRPC based API instance.
        :type process_api_endpoint: str
        :param api: the API client to use, this could be a simulated API or a gRPC based API.
        :type api: Api
        :param data_store: the data store for scan monitoring data.
        :type data_store: DataStore
        :param monitor_data_updated_callback: the callback to use when there is a change to the
            monitoring data during a scan.
        :type monitor_data_updated_callback: Callable[[dict], None]
        :param monitor_data_prefix: the prefix to prepend to data model fields names when calling
            the ``monitor_data_updated_callback``, defaults to ""
        :type monitor_data_prefix: str, optional
        """
        # Created before the base initialiser runs so that they already exist if
        # the base touches ``obs_state`` (whose setter takes this lock) during
        # construction. NOTE(review): the base class is not visible here.
        self._health_state_lock: threading.RLock = threading.RLock()
        # Per-instance cache for get_env(); cleared whenever the API is replaced.
        self._env_cache: dict | None = None
        super().__init__(**kwargs)
        self._monitor_data_store = data_store
        self._api: PstProcessApi = api
        self._monitor_data_handler: MonitorDataHandler[SubbandMonitorDataType, MonitorDataType] = (
            MonitorDataHandler(
                data_store=data_store,
                monitor_data_callback=self.__handle_monitor_data_update,
            )
        )
        self.monitor_data_updated_callback = monitor_data_updated_callback
        self.process_api_endpoint = process_api_endpoint
        # Only non-None while reset() is waiting for the service to go away.
        self._reset_condvar: threading.Condition | None = None
        self._health_check_state_mismatch_count = 0
        # Normalise the prefix so that it always ends with a single separator.
        if monitor_data_prefix and not monitor_data_prefix.endswith("_"):
            self._monitor_data_prefix = f"{monitor_data_prefix}_"
        else:
            self._monitor_data_prefix = monitor_data_prefix

    @final
    @override
    def _simulation_mode_changed(self: PstProcessApiSubcomponentManager) -> None:
        """
        Handle simulation mode has changed.

        The default action for API subcomponent managers is to call the :py:meth:`_update_api`
        method.
        """
        self._update_api()

    def get_env(self: PstProcessApiSubcomponentManager) -> dict:
        """
        Get the environment properties for the service.

        The environment is requested from the API on first use and then cached on
        this instance. The cache is discarded by :py:meth:`_update_api`, so after
        a change of simulation mode the environment of the new API is returned.

        :return: the environment properties reported by the current API.
        :rtype: dict
        """
        if self._env_cache is None:
            self._env_cache = self._api.get_env()
        return self._env_cache

    @final
    def __handle_monitor_data_update(
        self: PstProcessApiSubcomponentManager, monitor_data: MonitorDataType
    ) -> None:
        """Handle monitoring data.

        This method is used convert the monitoring data into a dict and then call the
        :py:attr:`monitor_data_updated_callback` callback. Each key is the
        dataclass field name prefixed with the configured monitor data prefix.

        :param monitor_data: the monitoring data that has just been updated.
        :type monitor_data: MonitorDataType
        """
        data_dict = {
            f"{self._monitor_data_prefix}{k}": v
            for k, v in dataclasses.asdict(monitor_data).items()  # type: ignore
        }
        self.monitor_data_updated_callback(data_dict)

    @property
    def monitor_data(self: PstProcessApiSubcomponentManager) -> MonitorDataType:
        """Get the current monitoring data."""
        return self._monitor_data_handler.monitor_data

    @property
    def health_check_state_mismatch_count(self: PstProcessApiSubcomponentManager) -> int:
        """Get the number of times that the health check state was not the expected state."""
        with self._health_state_lock:
            return self._health_check_state_mismatch_count

    @health_check_state_mismatch_count.setter
    def health_check_state_mismatch_count(
        self: PstProcessApiSubcomponentManager, health_check_state_mismatch_count: int
    ) -> None:
        """Set the health check state mismatch count while holding the health state lock."""
        with self._health_state_lock:
            self._health_check_state_mismatch_count = health_check_state_mismatch_count

    @property
    def obs_state(self: PstProcessApiSubcomponentManager) -> ObsState:
        """
        Get the current observing state of sub-component.

        :return: the current observing state of sub-component.
        :rtype: ObsState
        """
        return self._obs_state

    @obs_state.setter
    def obs_state(self: PstProcessApiSubcomponentManager, obs_state: ObsState) -> None:
        """
        Set the current observing state of sub-component.

        The state is updated under the health state lock so that it is consistent
        with health check processing, and a state event is fired.

        :param obs_state: the current observing state of the sub-component.
        :type obs_state: ObsState
        """
        with self._health_state_lock:
            self._obs_state = obs_state
            self._fire_state_event()

    # ---------------
    # Commands
    # ---------------

    @override
    def validate_configure_scan(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """Validate configure scan request with the specific configuration of the subcomponent.

        Note this is for the whole ConfigureScan request for a PST BEAM subcomponent, which
        includes checking both the beam and scan configuration is correct. This is due to
        the fact that a client of BEAM.MGMT only exposes a ConfigureScan request as it's an
        Obs device.

        :param configuration: the configuration to validate.
        :type configuration: dict
        :raises NotImplementedError: always; subclasses must implement this.
        """
        raise NotImplementedError("PstProcessApiSubcomponentManager is abstract.")

    @override
    @final
    def configure_beam(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """
        Configure the beam specific configuration of the subcomponent.

        This method calls the abstract method :py:meth:`_configure_beam` to ensure state is
        in the right mode for the subcomponent but allowing for implementation specific
        details for each subcomponent.

        NOTE: this method is marked as final but each individual subcomponent manager
        may need to do specific processing. Subclasses of this class should implement
        the ``_configure_beam`` method.

        :param configuration: configuration for beam
        :type configuration: dict
        """
        self._configure_beam(configuration=configuration)
        self.obs_state = ObsState.IDLE

    def _configure_beam(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """Configure the beam specific configuration of the subcomponent.

        This method is abstract to allow for each subcomponent to do specific handling
        of beam configuration.

        :param configuration: configuration for beam
        :type configuration: dict
        :raises NotImplementedError: always; subclasses must implement this.
        """
        raise NotImplementedError("PstProcessApiSubcomponentManager is abstract.")

    @override
    def deconfigure_beam(self: PstProcessApiSubcomponentManager) -> None:
        """
        Deconfigure the subcomponent's beam configuration.

        This will release all the resources associated with the subcomponent, including
        the SMRBs.
        """
        self._api.deconfigure_beam()
        self.obs_state = ObsState.EMPTY

    @override
    @final
    def configure_scan(self: PstProcessApiSubcomponentManager, configuration: dict) -> None:
        """
        Configure the subcomponent for a scan.

        :param configuration: the scan configuration parameters
        :type configuration: dict
        """
        self._api.configure_scan(configuration=configuration)
        self.obs_state = ObsState.READY

    @override
    @final
    def deconfigure_scan(self: PstProcessApiSubcomponentManager) -> None:
        """Deconfigure this subcomponent for current scan configuration."""
        self._api.deconfigure_scan()
        self.obs_state = ObsState.IDLE

    @override
    @final
    def scan(self: PstProcessApiSubcomponentManager, scan_id: int, **kwargs: Any) -> None:
        """
        Start scanning.

        The kwargs of this method is scan request. By using the kwargs allow for forward
        compatibility of accepting other parameters for the starting of the scan.

        Once the scan has started, monitoring is started with subband data routed to
        the monitor data handler.

        :param scan_id: the scan ID
        :type scan_id: int
        """
        try:
            self._api.start_scan(scan_id=scan_id, **kwargs)
            self.obs_state = ObsState.SCANNING
            self._api.monitor(
                subband_monitor_data_callback=self._monitor_data_handler.handle_subband_data,
                polling_rate=self.monitoring_polling_rate_ms,
            )
        except Exception:
            # logger.exception already records the active exception's traceback.
            self.logger.exception(f"Error in starting scan for {self.subcomponent_id}")
            raise

    @override
    @final
    def end_scan(self: PstProcessApiSubcomponentManager) -> None:
        """Stop scanning."""
        self.reset_monitoring()
        self._api.stop_scan()
        self.obs_state = ObsState.READY

    @override
    @final
    def abort(self: PstProcessApiSubcomponentManager) -> None:
        """
        Abort current process.

        The only long lived process for API based subcomponents is that of SCANNING.
        However, if another system fails this can be used to put all the subcomponents
        into an ABORTED state.
        """
        self.reset_monitoring()
        self._api.abort()
        self.obs_state = ObsState.ABORTED

    @override
    @final
    def reset_monitoring(self: PstProcessApiSubcomponentManager) -> None:
        """Stop monitoring and reset monitoring data."""
        self._api.stop_monitoring()
        self._monitor_data_handler.reset_monitor_data()

    @override
    @final
    def obsreset(self: PstProcessApiSubcomponentManager) -> None:
        """
        Reset service.

        This is used to reset a service in ABORTED or FAULT states back to an EMPTY state.
        This will deconfigure a scan and beam.
        """
        self._api.reset()
        self.obs_state = ObsState.EMPTY

    @override
    @final
    def reset(self: PstProcessApiSubcomponentManager) -> None:
        """
        Restart service.

        This is used to restart a service regardless of state. It waits up to one
        second for the health check to report that the service has gone away
        before moving to EMPTY.
        """
        # TANGO ensures that only one Reset runs at a time. The condvar lets the
        # health check background process tell us that the service has exited,
        # which allows the reset process to complete.
        reset_condvar = threading.Condition()
        self._reset_condvar = reset_condvar

        # Hold the condvar lock while restarting: the health check stopping is
        # used as the completion signal, and it cannot notify before we wait.
        with reset_condvar:
            try:
                self._api.restart()
                if reset_condvar.wait(1.0):
                    self.logger.debug(f"Reset condvar for {self.subcomponent_name} been reset.")
                else:
                    self.logger.warning(
                        f"Waiting for reset of {self.subcomponent_name} service to happen timed out."
                    )
            finally:
                # Always clear the condvar, even if restart() raised. Otherwise every
                # later health check exception would be treated as an in-progress
                # reset and the health check would never be restarted.
                self._reset_condvar = None

        self.obs_state = ObsState.EMPTY

    @override
    @final
    def go_to_fault(self: PstProcessApiSubcomponentManager, fault_msg: str) -> None:
        """
        Set the subcomponent into a FAULT state.

        For BEAM this will put the subcomponents into a FAULT state. For API backed
        subcomponent managers it is expected that the service backing that API should
        be put into a FAULT state.

        :param fault_msg: description of the fault
        :type fault_msg: str
        """
        self.reset_monitoring()
        self._api.go_to_fault()
        self._set_fault_state(fault_msg)

    @final
    def _set_fault_state(self: PstProcessApiSubcomponentManager, fault_msg: str) -> None:
        """Record the fault message and move the observing state to FAULT."""
        self._fault_msg = fault_msg
        self.obs_state = ObsState.FAULT

    @override
    @final
    def set_logging_level(self: PstProcessApiSubcomponentManager, log_level: LoggingLevel) -> None:
        """
        Set log level of subcomponent.

        :param log_level: The required logging level for the subcomponent
        :type log_level: LoggingLevel
        """
        self._api.set_log_level(log_level=log_level)

    @override
    def connect(self: PstProcessApiSubcomponentManager) -> None:
        """Establish connection to API subcomponent and start the health check."""
        self._api.connect()
        self.health_state = HealthState.OK
        self.start_health_check()

    @background_task
    def _connect_background(self: PstProcessApiSubcomponentManager) -> None:
        """Attempt to connect to API subcomponent in the background."""
        self.connect()

    @override
    def disconnect(self: PstProcessApiSubcomponentManager) -> None:
        """Stop the health check and disconnect from API subcomponent."""
        self.stop_health_check()
        self._api.disconnect()
        self.health_state = HealthState.UNKNOWN

    @final
    def _update_api(self: PstProcessApiSubcomponentManager) -> None:
        """
        Update API used by subcomponent manager.

        This is called when there is a change in the simulation mode. The current
        API is disconnected, replaced by a simulator or gRPC API and reconnected.
        """
        self.disconnect()
        if self._simulation_mode == SimulationMode.TRUE:
            self._api = cast(Api, self._simulator_api())
        else:
            self._api = cast(Api, self._grpc_api())
        # The environment reported by the previous API no longer applies.
        self._env_cache = None
        self.connect()

    def _simulator_api(self: PstProcessApiSubcomponentManager) -> Api:
        """Get an instance of the simulation API.

        :raises NotImplementedError: always; subclasses must implement this.
        """
        raise NotImplementedError("PstProcessApiSubcomponentManager is abstract.")

    def _grpc_api(self: PstProcessApiSubcomponentManager) -> Api:
        """Get an instance of the gRPC API.

        :raises NotImplementedError: always; subclasses must implement this.
        """
        raise NotImplementedError("PstProcessApiSubcomponentManager is abstract.")

    @override
    def start_health_check(self: PstProcessApiSubcomponentManager) -> None:
        """
        Start performing health check on subcomponent in the background.

        The background health check processing is delegated to the process API,
        with this manager acting as the health check handler.
        """
        self.logger.info(f"Starting health check for {self.subcomponent_name}")
        self.health_check_state_mismatch_count = 0
        self._api.perform_health_check(
            health_check_handler=self,
            health_check_interval=self.health_check_interval,
        )
        self.logger.debug(f"Health check for {self.subcomponent_name} has started")

    @override
    def stop_health_check(self: PstProcessApiSubcomponentManager) -> None:
        """Stop background health check on subcomponent."""
        self.logger.info(f"Stopping health check for {self.subcomponent_name}")
        self._api.stop_health_check()
        self.logger.debug(f"Health check for {self.subcomponent_name} has stopped")

    def handle_health_check_state(
        self: PstProcessApiSubcomponentManager, state: HealthCheckState, **kwargs: Any
    ) -> None:
        """
        Handle a health check state from the subcomponent.

        This is an implementation of the :py:class:`ska_pst.lmc.health_check.HealthCheckHandler`
        protocol.

        :param state: the current health check state of subcomponent.
        :type state: HealthCheckState
        """
        with self._health_state_lock:
            self.logger.debug(
                f"Received a health check state for {self.subcomponent_name}. Details = {state}"
            )
            if state.exception is not None:
                self._handle_health_check_exception(state)
            elif self.obs_state == state.obs_state:
                # State is as expected (trace logging would go here if Python had
                # it); clear any previously accumulated mismatch count.
                if self.health_check_state_mismatch_count > 0:
                    self.logger.info(
                        f"Health check for {state.service_name} returned to expected "
                        f"state {state.obs_state.name}"
                    )
                self.health_check_state_mismatch_count = 0
            elif state.obs_state is ObsState.FAULT:
                # The service has gone into a FAULT state unexpectedly.
                self._handle_health_check_fault(state)  # type: ignore
            else:
                # Any other state is recorded as a mismatch and logged.
                assert state.obs_state is not None, f"expected obs_state of {state} to not be None"
                self._handle_health_check_unexpected_state(state)

    def _handle_health_check_exception(
        self: PstProcessApiSubcomponentManager, state: HealthCheckState
    ) -> None:
        """
        Handle a health check that reported an exception.

        If no reset is in progress the exception is unexpected: the health state
        goes to FAILED and the health check is restarted. If a reset is in progress
        the exception is taken to mean the remote service closed due to the restart,
        so the waiting reset is notified. Called with the health state lock held.

        :param state: the health check state carrying the exception.
        :type state: HealthCheckState
        """
        # Read the condvar exactly once: reset() clears the attribute when it
        # finishes (including on timeout), so reading it again could give None
        # between the check and the ``with`` statement.
        reset_condvar = self._reset_condvar
        if reset_condvar is None:
            # Should this be FAILED or UNKNOWN?
            self.health_state = HealthState.FAILED
            self.logger.error(
                f"Health check for {state.service_name} has raised an "
                f"exception: {state.exception}. Restarting health check"
            )
            self.start_health_check()
        else:
            self.health_state = HealthState.UNKNOWN
            self.logger.info(
                f"Health check received an exception during Reset. Assuming connection of remote "
                f"service {state.service_name} has closed due to a restart"
            )
            with reset_condvar:
                reset_condvar.notify_all()

    def _handle_health_check_fault(self: PstProcessApiSubcomponentManager, state: HealthCheckState) -> None:
        """
        Handle the service unexpectedly reporting a FAULT state.

        Counts a mismatch, moves this subcomponent to FAULT with the reported fault
        message and sets the health state to FAILED. Called with the health state
        lock held.

        :param state: the health check state reporting FAULT.
        :type state: HealthCheckState
        """
        # Caller already holds the health state lock, so the field is updated directly.
        self._health_check_state_mismatch_count += 1
        self.logger.warning(
            f"{state.service_name} has gone into a FAULT stage. It is expected to have been in "
            f"{self.obs_state.name}. Fault message = '{state.fault_message}'"
        )
        self._set_fault_state(state.fault_message)  # type: ignore
        self.health_state = HealthState.FAILED

    def _handle_health_check_unexpected_state(
        self: PstProcessApiSubcomponentManager, state: HealthCheckState
    ) -> None:
        """
        Record a health check state that differs from the expected observing state.

        The mismatch count is incremented and logged with escalating severity:
        info on the first mismatch, warning on the second, error thereafter.
        Called with the health state lock held.

        :param state: the health check state with an unexpected, non-None obs_state.
        :type state: HealthCheckState
        """
        obs_state: ObsState = state.obs_state  # type: ignore
        self._health_check_state_mismatch_count += 1
        if self._health_check_state_mismatch_count == 1:
            self.logger.info(
                f"{self.subcomponent_name} initial health check state mismatch - "
                f"expected {self.obs_state.name} got {obs_state.name}"
            )
        elif self._health_check_state_mismatch_count == 2:
            self.logger.warning(
                f"{self.subcomponent_name} health check state mismatch twice - currently "
                f"expecting {self.obs_state.name} got {obs_state.name}"
            )
        else:
            self.logger.error(
                f"{self.subcomponent_name} health check state mismatch more than 2 times "
                f"in a row. currently expecting {self.obs_state.name} got {obs_state.name}"
            )