Source code for ska_pst.testutils.stats.stats_monitor

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module to monitor statistics files."""

from __future__ import annotations

import logging
import pathlib
import threading
from dataclasses import InitVar, dataclass, field
from typing import List

from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers.polling import PollingObserver as Observer

# Polling every 1 second
OBSERVER_POLLING_INTERVAL = 1.0


[docs]@dataclass(kw_only=True, frozen=True) class StatFileCreatedEvent: """Data class capturing a file creation event. :ivar file_path: the full path to the file that was created. :vartype file_path: pathlib.Path :ivar create_datetime: the time, in seconds from epoch, when the file was created. :vartype create_datetime: float """ file_path: pathlib.Path create_datetime: float
[docs]@dataclass(kw_only=True) class StatFileEventDifference: """A data class used to calculate differences in file creation events. :ivar first_file_path: the path to the file that was created first :vartype first_file_path: pathlib.Path :ivar second_file_path: the path to the file that was created second :vartype second_file_path: pathlib.Path :ivar creation_time_difference: the difference in creation time of the files :vartype creation_time_difference: float """ first_file_event: InitVar[StatFileCreatedEvent] second_file_event: InitVar[StatFileCreatedEvent] first_file_path: pathlib.Path = field(init=False) second_file_path: pathlib.Path = field(init=False) creation_time_difference: float = field(init=False) def __post_init__( self: StatFileEventDifference, first_file_event: StatFileCreatedEvent, second_file_event: StatFileCreatedEvent, ) -> None: """Set the computed properties for this instance.""" self.first_file_path = first_file_event.file_path self.second_file_path = second_file_event.file_path self.creation_time_difference = second_file_event.create_datetime - first_file_event.create_datetime
[docs]class ScanStatFileWatcher(PatternMatchingEventHandler): """Class to watch for when STAT file files are created. Instances of this class watches a scan directory for real time monitoring STAT HDF5 files to be created and stores the events for later. """ def __init__( self: ScanStatFileWatcher, scan_path: pathlib.Path, logger: logging.Logger | None = None, ) -> None: """Create instance of class. :param scan_path: the local scan path for files :type scan_path: pathlib.Path :param logger: the logger to use when performing logging, defaults to None :type logger: logging.Logger | None, optional """ self._logger = logger or logging.getLogger(__name__) self._scan_path = scan_path self._events: List[StatFileCreatedEvent] = [] self._observer = Observer(timeout=OBSERVER_POLLING_INTERVAL) self._lock = threading.Lock() super().__init__(patterns=["*/input-stats/*.h5"], ignore_directories=True, case_sensitive=True) def __del__(self: ScanStatFileWatcher) -> None: """Teardown the watcher safely when instance is destroyed.""" self._stop()
[docs] def watch(self: ScanStatFileWatcher) -> None: """Start watching for STAT files.""" self._logger.debug("ScanStatFileWatcher starting to watch for file create events") with self._lock: assert not self._observer.is_alive(), "Watcher is already watching." self._observer.schedule(event_handler=self, path=self._scan_path, recursive=True) self._observer.start()
[docs] def stop(self: ScanStatFileWatcher) -> None: """Stop watching for STAT files.""" self._logger.debug("ScanStatFileWatcher stopping watching for file create events") self._stop()
def _stop(self: ScanStatFileWatcher) -> None: """Stop watching STAT files without logging.""" with self._lock: if self._observer.is_alive(): self._observer.stop() self._observer.join()
[docs] def on_created(self: ScanStatFileWatcher, event: FileSystemEvent) -> None: """Handle an on created system event. The event comes from `watchdog` and this method converts the event to a :py:class:`StatFileCreatedEvent` instance and saves the event that can then later be retrieved from :py:attr:`events`. """ file_path = pathlib.Path(event.src_path) file_event = StatFileCreatedEvent( file_path=file_path, create_datetime=file_path.stat().st_ctime, ) self._logger.debug(f"Received on_created event {file_event}") with self._lock: self._events.append(file_event)
@property def events(self: ScanStatFileWatcher) -> List[StatFileCreatedEvent]: """Get the list of file created events.""" with self._lock: return [*self._events]
[docs] def event_time_diffs(self: ScanStatFileWatcher) -> List[StatFileEventDifference]: """Get a list of differences between file creation events.""" events = self.events if len(events) <= 1: return [] return [ StatFileEventDifference(first_file_event=f, second_file_event=s) for (f, s) in zip(events[:-1], events[1:]) ]