Source code for ska_pst.lmc.component.simulator_process_api

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the base simulator implementation for the ``PstProcessApi``."""

from __future__ import annotations

from ska_pst.lmc.health_check.health_check import HealthCheckHandler

__all__ = ["PstProcessApiSimulator"]

import logging
import threading
import time
from typing import Any, Dict, Generator, Optional

from overrides import override
from ska_control_model import LoggingLevel, ObsState, PstProcessingMode
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.lmc.component import PstSimulator
from ska_pst.lmc.util.background_task import BackgroundTaskProcessor, background_task
from ska_pst.lmc.util.timeout_iterator import TimeoutIterator
from ska_pst.lmc.validation import ValidationError

from .monitor_data_handler import MonitorDataCallback
from .process_api import PstProcessApi


[docs]class PstProcessApiSimulator(PstProcessApi): """Abstract class for the Simulated API of the PST.LMC processes like RECV, SMRB, etc.""" def __init__( self: PstProcessApiSimulator, simulator: PstSimulator, logger: logging.Logger | None = None, **kwargs: Any, ) -> None: """ Initialise the simulator API. :param simulator: the simulator that the API would delegate requests to. :type simulator: Simulator :param logger: the logger to use logging messages, defaults to None :type logger: logging.Logger | None, optional """ self._simulator = simulator self._monitor_abort_event = threading.Event() self._scanning = False self.fail_validate_configure_beam = False self.fail_validate_configure_scan = False super().__init__(logger=logger, **kwargs) self._background_task_processor = BackgroundTaskProcessor(default_logger=self._logger) self._obs_state = ObsState.EMPTY def _should_be_monitoring(self: PstProcessApiSimulator) -> bool: if self._monitor_abort_event.is_set(): return False return self._scanning @override def connect(self: PstProcessApiSimulator) -> None: """Connect to the external process.""" @override def disconnect(self: PstProcessApiSimulator) -> None: """Disconnect from the external process.""" self.stop_monitoring() @override def validate_configure_beam( self: PstProcessApiSimulator, configuration: dict, pst_processing_mode: PstProcessingMode ) -> None: """ Validate configure beam request. :param configuration: Dictionary of resources to allocate. :type configuration: dict :raises ValidationError: if there is an issue validating the request. The error message contains the details. """ if self.fail_validate_configure_beam: raise ValidationError("Simulated validation error for configure beam.") if "source" in configuration and configuration["source"] == "invalid source": raise ValidationError("Simulated validation error due to invalid source") @override def validate_configure_scan(self: PstProcessApiSimulator, configuration: dict) -> None: """Validate configure scan request.""" if self.fail_validate_configure_scan: raise ValidationError("Simulated validation error for configure scan.") if "source" in configuration and configuration["source"] == "invalid source": raise ValidationError("Simulated validation error due to invalid source") def _simulated_monitor_data_generator( self: PstProcessApiSimulator, polling_rate: int ) -> Generator[Dict[int, Any], None, None]: """ Create a generator of simulated monitoring data. :param polling_rate: the interval, in milliseconds, at which the monitoring should generated data. :type polling_rate: int :yield: a dictionary of data for each simulated subband :rtype: Generator[Dict[int, Any], None, None] """ while self._should_be_monitoring(): self._logger.debug("Background generator is creating data") yield self._simulator.get_subband_data() self._logger.debug(f"Sleeping {polling_rate}ms") time.sleep(polling_rate / 1000.0) @override def configure_beam( self: PstProcessApiSimulator, configuration: dict, pst_processing_mode: PstProcessingMode ) -> None: """ Configure the beam. :param configuration: dictionary of parameters to be configured and their requested values """ self._logger.debug(f"Configuring beam with configuration={configuration}") self._obs_state = ObsState.IDLE @override def deconfigure_beam(self: PstProcessApiSimulator) -> None: """ Deconfigure the beam. The default implementation of this does nothing. """ self._obs_state = ObsState.EMPTY @override def configure_scan(self: PstProcessApiSimulator, configuration: dict) -> None: """ Configure a scan. :param configuration: the configuration for the scan. """ self._simulator.configure_scan(configuration=configuration) self._obs_state = ObsState.READY @override def deconfigure_scan(self: PstProcessApiSimulator) -> None: """Deconfigure a scan.""" self._simulator.deconfigure_scan() self._obs_state = ObsState.IDLE @override def start_scan(self: PstProcessApiSimulator, scan_id: int, **kwargs: Any) -> None: """ Start a scan. :param scan_id: the ID for the scan. :type scan_id: int :param kwargs: additional arguments, needed to allow for future proofing of scan request coming from TM / CSP. :type kwargs: dict """ self._simulator.start_scan(scan_id=scan_id, **kwargs) self._scanning = True self._obs_state = ObsState.SCANNING @override def stop_scan(self: PstProcessApiSimulator) -> None: """Stop a scan.""" self.stop_monitoring() self._simulator.stop_scan() self._scanning = False self._obs_state = ObsState.READY @override def abort(self: PstProcessApiSimulator) -> None: """Abort a scan.""" self.stop_monitoring() self._simulator.abort() self._scanning = False self._obs_state = ObsState.ABORTED @override def reset(self: PstProcessApiSimulator) -> None: """Reset service when in ABORTED / FAULT state.""" self.stop_monitoring() self._scanning = False self._obs_state = ObsState.EMPTY @override def restart(self: PstProcessApiSimulator) -> None: """Restart service in any state.""" self.stop_monitoring() self._scanning = False self._obs_state = ObsState.EMPTY @override def go_to_fault(self: PstProcessApiSimulator) -> None: """ Set simulator into a FAULT state. If simulator is scanning then stop scanning. """ if self._scanning: self._scanning = False self.stop_monitoring() self._obs_state = ObsState.FAULT @background_task @override def monitor( self: PstProcessApiSimulator, monitor_data_callback: MonitorDataCallback, polling_rate: int = DEFAULT_MONITORING_INTERVAL_MS, monitor_abort_event: Optional[threading.Event] = None, ) -> None: """ Monitor data of remote service. :param monitor_data_callback: callback to use when there is an update of the sub-band monitor data. :param polling_rate: the rate, in milliseconds, at which the monitoring should poll. The default value is 5000ms (i.e. 5 seconds). :param monitor_abort_event: a :py:class:`threading.Event` that can be used to signal to stop monitoring. If not set then the background task will create one. """ self._logger.debug(f"Starting to monitor at {polling_rate}ms") try: if monitor_abort_event is not None: self._monitor_abort_event = monitor_abort_event # make sure we reset the monitoring event self._monitor_abort_event.clear() while not self._monitor_abort_event.is_set(): try: for data in TimeoutIterator( self._simulated_monitor_data_generator(polling_rate=polling_rate), abort_event=self._monitor_abort_event, timeout=2 * polling_rate / 1000.0, expected_period=polling_rate / 1000.0, ): for subband_id, subband_data in data.items(): monitor_data_callback(subband_id=subband_id, subband_data=subband_data) except TimeoutError: if self._monitor_abort_event.is_set(): # this could be a race condition for the abort event, so ignore timeout continue self._logger.warning("received timeout during monitoring before abort event set.") except Exception: self._logger.error("error while monitoring.", exc_info=True) @override def stop_monitoring(self: PstProcessApiSimulator, **kwargs: Any) -> None: """Stop the monitoring background thread by setting event.""" self._monitor_abort_event.set() @override def set_log_level(self: PstProcessApiSimulator, log_level: LoggingLevel) -> None: """ Set simulator LoggingLevel of the PST.LMC processes like RECV, SMRB, etc. :param log_level: The required TANGO LoggingLevel :returns: None. """ self.logging_level = log_level @override def get_env(self: PstProcessApiSimulator) -> dict: """Get the environment properties for the service.""" return self._simulator.get_env() @override def perform_health_check( self: PstProcessApiSimulator, health_check_handler: HealthCheckHandler, health_check_interval: int = DEFAULT_HEALTH_CHECK_INTERVAL_MS, health_check_abort_event: threading.Event | None = None, ) -> None: """ Perform health check of a process in the background. The simulator process doesn't perform health check. This may change in the future to simulate errors. :param health_check_handler: an object that implements the ``HealthCheckHandler`` protocol. Any health check state object that is returned from the service is delegated to this handler to be handled. :type health_check_handler: HealthCheckHandler :param health_check_interval: the interval, in milliseconds, at which health check should be perform, defaults to 1000 (i.e. 1 second). :type health_check_interval: int, optional :param health_check_abort_event: a threading primitive to be used to stop the health check by an external mechanism, defaults to None :type health_check_abort_event: Optional[threading.Event], optional """ @override def stop_health_check(self: PstProcessApiSimulator, **kwargs: Any) -> None: """ Stop performing health check of service. As the simulator API doesn't perform health check this method does nothing. """ @override def get_state(self: PstProcessApiSimulator) -> ObsState: """Get the ObsState of the remote system.""" return self._obs_state