# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the base simulator implementation for the ``PstProcessApi``."""
from __future__ import annotations
from ska_pst.lmc.health_check.health_check import HealthCheckHandler
__all__ = ["PstProcessApiSimulator"]
import logging
import threading
import time
from typing import Any, Dict, Generator, Optional
from overrides import override
from ska_control_model import LoggingLevel, ObsState, PstProcessingMode
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.lmc.component import PstSimulator
from ska_pst.lmc.util.background_task import BackgroundTaskProcessor, background_task
from ska_pst.lmc.util.timeout_iterator import TimeoutIterator
from ska_pst.lmc.validation import ValidationError
from .monitor_data_handler import MonitorDataCallback
from .process_api import PstProcessApi
class PstProcessApiSimulator(PstProcessApi):
    """
    Abstract class for the Simulated API of the PST.LMC processes like RECV, SMRB, etc.

    Scan related requests are delegated to a :py:class:`PstSimulator` while the
    observation state is tracked locally and returned by :py:meth:`get_state`.

    The public attributes ``fail_validate_configure_beam`` and
    ``fail_validate_configure_scan`` can be set to ``True`` to force the
    corresponding validation method to raise a ``ValidationError``.
    """

    def __init__(
        self: PstProcessApiSimulator,
        simulator: PstSimulator,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise the simulator API.

        :param simulator: the simulator that the API would delegate requests to.
        :type simulator: PstSimulator
        :param logger: the logger to use logging messages, defaults to None
        :type logger: logging.Logger | None, optional
        :param kwargs: additional keyword arguments passed through to the
            base class initialiser.
        :type kwargs: dict
        """
        # NOTE(review): these attributes are assigned before the base class
        # initialiser runs; the ordering is kept as is in case the base
        # initialiser relies on them - confirm.
        self._simulator = simulator
        self._monitor_abort_event = threading.Event()
        self._scanning = False
        self.fail_validate_configure_beam = False
        self.fail_validate_configure_scan = False

        super().__init__(logger=logger, **kwargs)

        # self._logger is not assigned in this class; presumably it is set up
        # by the base class initialiser above.
        self._background_task_processor = BackgroundTaskProcessor(default_logger=self._logger)
        self._obs_state = ObsState.EMPTY

    def _should_be_monitoring(self: PstProcessApiSimulator) -> bool:
        """
        Check if simulated monitoring data should still be generated.

        :return: ``False`` if the monitor abort event is set, otherwise
            whether the simulator API is currently scanning.
        :rtype: bool
        """
        if self._monitor_abort_event.is_set():
            return False

        return self._scanning

    @staticmethod
    def _raise_on_invalid_source(configuration: dict) -> None:
        """
        Raise a validation error if the configuration uses the simulated invalid source.

        :param configuration: the request configuration to check.
        :type configuration: dict
        :raises ValidationError: if ``configuration["source"]`` is ``"invalid source"``.
        """
        if configuration.get("source") == "invalid source":
            raise ValidationError("Simulated validation error due to invalid source")

    @override
    def connect(self: PstProcessApiSimulator) -> None:
        """
        Connect to the external process.

        The simulator has nothing to connect to, so this method does nothing.
        """

    @override
    def disconnect(self: PstProcessApiSimulator) -> None:
        """
        Disconnect from the external process.

        For the simulator this only signals any monitoring to stop.
        """
        self.stop_monitoring()

    @override
    def validate_configure_beam(
        self: PstProcessApiSimulator, configuration: dict, pst_processing_mode: PstProcessingMode
    ) -> None:
        """
        Validate configure beam request.

        :param configuration: Dictionary of resources to allocate.
        :type configuration: dict
        :param pst_processing_mode: the processing mode of the request. This
            is not used by the simulator.
        :type pst_processing_mode: PstProcessingMode
        :raises ValidationError: if there is an issue validating the request.
            The error message contains the details.
        """
        if self.fail_validate_configure_beam:
            raise ValidationError("Simulated validation error for configure beam.")

        self._raise_on_invalid_source(configuration)

    @override
    def validate_configure_scan(self: PstProcessApiSimulator, configuration: dict) -> None:
        """
        Validate configure scan request.

        :param configuration: the configuration for the scan.
        :type configuration: dict
        :raises ValidationError: if there is an issue validating the request.
            The error message contains the details.
        """
        if self.fail_validate_configure_scan:
            raise ValidationError("Simulated validation error for configure scan.")

        self._raise_on_invalid_source(configuration)

    def _simulated_monitor_data_generator(
        self: PstProcessApiSimulator, polling_rate: int
    ) -> Generator[Dict[int, Any], None, None]:
        """
        Create a generator of simulated monitoring data.

        The generator keeps yielding data until the API stops scanning or the
        monitor abort event is set.

        :param polling_rate: the interval, in milliseconds, at which the monitoring should generate data.
        :type polling_rate: int
        :yield: a dictionary of data for each simulated subband
        :rtype: Generator[Dict[int, Any], None, None]
        """
        while self._should_be_monitoring():
            self._logger.debug("Background generator is creating data")
            yield self._simulator.get_subband_data()
            self._logger.debug(f"Sleeping {polling_rate}ms")
            # NOTE(review): this sleep is not interrupted by the abort event,
            # the abort is only noticed once the full interval has elapsed.
            time.sleep(polling_rate / 1000.0)

    @override
    def configure_beam(
        self: PstProcessApiSimulator, configuration: dict, pst_processing_mode: PstProcessingMode
    ) -> None:
        """
        Configure the beam.

        The simulator only logs the request and moves to ``ObsState.IDLE``.

        :param configuration: dictionary of parameters to be configured and their requested values
        :type configuration: dict
        :param pst_processing_mode: the processing mode of the request. This
            is not used by the simulator.
        :type pst_processing_mode: PstProcessingMode
        """
        self._logger.debug(f"Configuring beam with configuration={configuration}")
        self._obs_state = ObsState.IDLE

    @override
    def deconfigure_beam(self: PstProcessApiSimulator) -> None:
        """
        Deconfigure the beam.

        The simulator only moves back to ``ObsState.EMPTY``.
        """
        self._obs_state = ObsState.EMPTY

    @override
    def configure_scan(self: PstProcessApiSimulator, configuration: dict) -> None:
        """
        Configure a scan.

        The request is delegated to the simulator and the state moves to
        ``ObsState.READY``.

        :param configuration: the configuration for the scan.
        :type configuration: dict
        """
        self._simulator.configure_scan(configuration=configuration)
        self._obs_state = ObsState.READY

    @override
    def deconfigure_scan(self: PstProcessApiSimulator) -> None:
        """
        Deconfigure a scan.

        The request is delegated to the simulator and the state moves to
        ``ObsState.IDLE``.
        """
        self._simulator.deconfigure_scan()
        self._obs_state = ObsState.IDLE

    @override
    def start_scan(self: PstProcessApiSimulator, scan_id: int, **kwargs: Any) -> None:
        """
        Start a scan.

        :param scan_id: the ID for the scan.
        :type scan_id: int
        :param kwargs: additional arguments, needed to allow for future proofing
            of scan request coming from TM / CSP.
        :type kwargs: dict
        """
        self._simulator.start_scan(scan_id=scan_id, **kwargs)
        self._scanning = True
        self._obs_state = ObsState.SCANNING

    @override
    def stop_scan(self: PstProcessApiSimulator) -> None:
        """
        Stop a scan.

        Monitoring is signalled to stop before the simulator is asked to stop
        scanning. The state moves to ``ObsState.READY``.
        """
        self.stop_monitoring()
        self._simulator.stop_scan()
        self._scanning = False
        self._obs_state = ObsState.READY

    @override
    def abort(self: PstProcessApiSimulator) -> None:
        """
        Abort a scan.

        Monitoring is signalled to stop before the simulator is asked to
        abort. The state moves to ``ObsState.ABORTED``.
        """
        self.stop_monitoring()
        self._simulator.abort()
        self._scanning = False
        self._obs_state = ObsState.ABORTED

    @override
    def reset(self: PstProcessApiSimulator) -> None:
        """
        Reset service when in ABORTED / FAULT state.

        Monitoring is signalled to stop and the state moves to
        ``ObsState.EMPTY``. The current state is not checked by this method.
        """
        self.stop_monitoring()
        self._scanning = False
        self._obs_state = ObsState.EMPTY

    @override
    def restart(self: PstProcessApiSimulator) -> None:
        """
        Restart service in any state.

        Monitoring is signalled to stop and the state moves to
        ``ObsState.EMPTY``.
        """
        self.stop_monitoring()
        self._scanning = False
        self._obs_state = ObsState.EMPTY

    @override
    def go_to_fault(self: PstProcessApiSimulator) -> None:
        """
        Set simulator into a FAULT state.

        If simulator is scanning then stop scanning.
        """
        if self._scanning:
            self._scanning = False
            self.stop_monitoring()

        self._obs_state = ObsState.FAULT

    @background_task
    @override
    def monitor(
        self: PstProcessApiSimulator,
        monitor_data_callback: MonitorDataCallback,
        polling_rate: int = DEFAULT_MONITORING_INTERVAL_MS,
        monitor_abort_event: Optional[threading.Event] = None,
    ) -> None:
        """
        Monitor data of remote service.

        Simulated sub-band data is generated at the polling rate and each
        sub-band's data is passed to the callback. Any exception raised,
        other than a ``TimeoutError`` from the iteration, is logged and ends
        the monitoring.

        :param monitor_data_callback: callback to use when there is an
            update of the sub-band monitor data.
        :param polling_rate: the rate, in milliseconds, at which the monitoring
            should poll. Defaults to ``DEFAULT_MONITORING_INTERVAL_MS``.
        :param monitor_abort_event: a :py:class:`threading.Event` that can be
            used to signal to stop monitoring. If not provided then the
            event already held by this API instance is reused.
        """
        self._logger.debug(f"Starting to monitor at {polling_rate}ms")
        try:
            if monitor_abort_event is not None:
                self._monitor_abort_event = monitor_abort_event

            # make sure we reset the monitoring event
            self._monitor_abort_event.clear()

            # NOTE(review): if the generator yields nothing (e.g. the API is
            # not scanning) this loop re-enters without any delay - confirm
            # that monitoring is only started once a scan has started.
            while not self._monitor_abort_event.is_set():
                try:
                    # polling_rate is in milliseconds, the iterator arguments
                    # are converted to seconds; the timeout is twice the
                    # expected period.
                    for data in TimeoutIterator(
                        self._simulated_monitor_data_generator(polling_rate=polling_rate),
                        abort_event=self._monitor_abort_event,
                        timeout=2 * polling_rate / 1000.0,
                        expected_period=polling_rate / 1000.0,
                    ):
                        for subband_id, subband_data in data.items():
                            monitor_data_callback(subband_id=subband_id, subband_data=subband_data)
                except TimeoutError:
                    if self._monitor_abort_event.is_set():
                        # this could be a race condition for the abort event, so ignore timeout
                        continue
                    self._logger.warning("received timeout during monitoring before abort event set.")
        except Exception:
            self._logger.error("error while monitoring.", exc_info=True)

    @override
    def stop_monitoring(self: PstProcessApiSimulator, **kwargs: Any) -> None:
        """
        Stop the monitoring background thread by setting event.

        :param kwargs: additional keyword arguments. These are ignored by the
            simulator.
        :type kwargs: dict
        """
        self._monitor_abort_event.set()

    @override
    def set_log_level(self: PstProcessApiSimulator, log_level: LoggingLevel) -> None:
        """
        Set simulator LoggingLevel of the PST.LMC processes like RECV, SMRB, etc.

        The simulator only records the value on ``self.logging_level``.

        :param log_level: The required TANGO LoggingLevel
        :returns: None.
        """
        self.logging_level = log_level

    @override
    def get_env(self: PstProcessApiSimulator) -> dict:
        """
        Get the environment properties for the service.

        :return: the environment properties as returned by the simulator.
        :rtype: dict
        """
        return self._simulator.get_env()

    @override
    def perform_health_check(
        self: PstProcessApiSimulator,
        health_check_handler: HealthCheckHandler,
        health_check_interval: int = DEFAULT_HEALTH_CHECK_INTERVAL_MS,
        health_check_abort_event: threading.Event | None = None,
    ) -> None:
        """
        Perform health check of a process in the background.

        The simulator process doesn't perform health check. This may change
        in the future to simulate errors.

        :param health_check_handler: an object that implements the
            ``HealthCheckHandler`` protocol. Any health check state
            object that is returned from the service is delegated to
            this handler to be handled.
        :type health_check_handler: HealthCheckHandler
        :param health_check_interval: the interval, in milliseconds, at which
            health check should be performed, defaults to
            ``DEFAULT_HEALTH_CHECK_INTERVAL_MS``.
        :type health_check_interval: int, optional
        :param health_check_abort_event: a threading primitive to be used
            to stop the health check by an external mechanism, defaults to None
        :type health_check_abort_event: Optional[threading.Event], optional
        """

    @override
    def stop_health_check(self: PstProcessApiSimulator, **kwargs: Any) -> None:
        """
        Stop performing health check of service.

        As the simulator API doesn't perform health check this method does nothing.

        :param kwargs: additional keyword arguments. These are ignored by the
            simulator.
        :type kwargs: dict
        """

    @override
    def get_state(self: PstProcessApiSimulator) -> ObsState:
        """
        Get the ObsState of the remote system.

        :return: the observation state tracked locally by the simulator API.
        :rtype: ObsState
        """
        return self._obs_state