Source code for ska_pst_lmc.component.monitor_data_handler

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides the base classes for handling monitoring data handling."""

from __future__ import annotations

import threading
from typing import Callable, Dict, Generic, TypeVar

S = TypeVar("S")
T = TypeVar("T")

SUBBAND_1: int = 1
SUBBAND_2: int = 2
SUBBAND_3: int = 3
SUBBAND_4: int = 4


[docs]class MonitorDataStore(Generic[S, T]): """ Generic monitoring data store that handles subband data. This is an abstract class that other classes should extend to allow following for a contract about how to handle subband data. """ _subband_data: Dict[int, S] def __init__(self: MonitorDataStore) -> None: """Initialise data store.""" self._subband_data = {}
[docs] def update_subband(self: MonitorDataStore, subband_id: int, subband_data: S) -> None: """ Update the stored subband data for a given subband id. This just updates an internal dictionary. Subclasses should only override this method if there is a specific reason to handle a subband, as the `monitor_data` method should be the main place to handle aggregation. :param subband_id: the subband that is being updated. :param subband_data: the data for the current subband. """ self._subband_data[subband_id] = subband_data
[docs] def reset(self: MonitorDataStore) -> None: """Reset the monitoring data state.""" self._subband_data.clear()
@property def monitor_data(self: MonitorDataStore) -> T: """ Return the current calculated monitoring data. Implementations of this should aggregate the monitoring data to be what is the current snapshot of data. :returns: current monitoring data. """ raise NotImplementedError("MonitorDataStore is abstract")
[docs]class MonitorDataHandler(Generic[S, T]): """ Generic monitor data handler for subband. This handler needs to be constructed with a :py:class:`MonitorDataStore` that takes subband monitoring data (type `S`) and can aggregate that data to return a combined monitoring data object (type `T`). Since monitoring happens in the background and asynchronously the updates to the data store is guarded by a threading lock. There should be no need to override any of these methods. """ def __init__( self: MonitorDataHandler, data_store: MonitorDataStore[S, T], monitor_data_callback: Callable[[T], None], ): """ Initialise data handler. :param data_store: the monitoring data store to handle subband monitoring data. :param monitor_data_callback: the callback to call once subband monitoring data has been merged with other subband data. """ self._monitor_lock = threading.Lock() self._data_store = data_store self._monitor_data_callback = monitor_data_callback # get initial data self._monitor_data = data_store.monitor_data
[docs] def handle_subband_data(self: MonitorDataHandler, subband_id: int, subband_data: S) -> None: """ Handle subband monitoring data. This will call the data store's :py:meth:`MonitorDataStore.update_subband` within a write lock. After updating the data store it will call the monitoring callback with the latest monitoring data from the data store. """ with self._monitor_lock: self._data_store.update_subband(subband_id=subband_id, subband_data=subband_data) self.update_monitor_data(notify=True)
@property def monitor_data(self: MonitorDataHandler) -> T: """Get current monitor data.""" return self._monitor_data
[docs] def reset_monitor_data(self: MonitorDataHandler) -> None: """Reset the monitor data store.""" with self._monitor_lock: self._data_store.reset() self.update_monitor_data(notify=True)
[docs] def update_monitor_data(self: MonitorDataHandler, notify: bool) -> None: """ Update monitoring data. This method is used internally as well as by DSP to recalculate monitoring data. To force callback `notify` must be set to `True`. :param notify: whether to notify callback that there has been an update. :type notify: bool """ self._monitor_data = self._data_store.monitor_data if notify: self._monitor_data_callback(self._monitor_data)