# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""This module implements the DaqServer part of the MccsDaqReceiver device."""
from __future__ import annotations
import datetime
import functools
import json
import logging
import os
import queue
import re
import threading
from time import sleep
from typing import Any, Callable, Iterator, Optional, TypeVar, cast
import h5py
import numpy as np
from pydaq.daq_receiver_interface import DaqModes, DaqReceiver
from ska_control_model import ResultCode, TaskStatus
from ska_low_mccs_daq_interface.server import run_server_forever
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
__all__ = ["DaqHandler", "main"]

# Generic type variable used to preserve the signature of functions wrapped
# by the `check_initialisation` decorator.
Wrapped = TypeVar("Wrapped", bound=Callable[..., Any])

# pylint: disable = too-many-lines
# Global parameters
# NOTE(review): `bandwidth` appears unused in this module — TODO confirm
# before removing.
bandwidth = 400.0
# Maps station name -> list of HDF5 file paths queued for bandpass plotting.
# Shared between IntegratedDataHandler (producer, watchdog thread) and
# DaqHandler.generate_bandpass_plots (consumer thread).
files_to_plot: dict[str, list[str]] = {}
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that understands numpy scalar and array types."""

    # pylint: disable=arguments-renamed
    def default(self: NumpyEncoder, obj: Any) -> Any:
        """
        Serialise numpy values; defer anything else to the base class.

        :param obj: the object json could not serialise natively.
        :return: a JSON-serialisable equivalent of `obj`.
        """
        # Dispatch on numpy abstract scalar types / ndarray.
        for numpy_type, converter in (
            (np.integer, int),
            (np.floating, float),
            (np.ndarray, lambda array: array.tolist()),
        ):
            if isinstance(obj, numpy_type):
                return converter(obj)
        # Unknown type: let the base class raise the usual TypeError.
        return json.JSONEncoder.default(self, obj)
def convert_daq_modes(consumers_to_start: str) -> list[DaqModes]:
    """
    Convert a string representation of DaqModes into a list of DaqModes.

    Breaks a comma separated list into tokens. Each token is either an enum
    name such as "DaqModes.RAW_DATA" (only the part after the final dot is
    significant) or an integer value of the enum.

    :param consumers_to_start: A string containing a comma separated
        list of DaqModes.

    :return: a converted list of DaqModes or an empty list
        if no consumers supplied.
    """
    if consumers_to_start == "":
        return []

    modes: list[DaqModes] = []
    for token in consumers_to_start.split(","):
        enum_name = token.strip().split(".")[-1]
        try:
            # Token was the string representation of a DaqMode name.
            modes.append(DaqModes[enum_name])
        except KeyError:
            # Not a known name: treat the token as an integer enum value.
            modes.append(DaqModes(int(token)))
    return modes
def check_initialisation(func: Wrapped) -> Wrapped:
    """
    Return a function that checks component initialisation before calling.

    This function is intended to be used as a decorator:

    .. code-block:: python

        @check_initialisation
        def scan(self):
            ...

    :param func: the wrapped function

    :return: the wrapped function
    """

    @functools.wraps(func)
    def _wrapper(
        self: DaqHandler,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Check for component initialisation before calling the function.

        This is a wrapper function that implements the functionality of
        the decorator.

        :param self: This instance of an DaqHandler.
        :param args: positional arguments to the wrapped function
        :param kwargs: keyword arguments to the wrapped function

        :raises ValueError: if component initialisation has
            not been completed.

        :return: whatever the wrapped function returns
        """
        # Guard clause: refuse to run against an uninitialised receiver.
        if self.initialised:
            return func(self, *args, **kwargs)
        raise ValueError(
            f"Cannot execute '{type(self).__name__}.{func.__name__}'. "
            "DaqReceiver has not been initialised. "
            "Set adminMode to ONLINE to re-initialise."
        )

    return cast(Wrapped, _wrapper)
# pylint: disable = too-many-instance-attributes
class DaqHandler:
    """An implementation of a DaqHandler device."""

    # Timestamp format used when labelling generated plots.
    TIME_FORMAT_STRING = "%d/%m/%y %H:%M:%S"

    def __init__(self: DaqHandler):
        """Initialise this device."""
        # The wrapped pydaq receiver; created in `initialise`, None until then.
        self.daq_instance: DaqReceiver | None = None
        self._receiver_started: bool = False
        self._initialised: bool = False
        # Flags coordinating the bandpass-monitoring loop/threads.
        self._stop_bandpass: bool = False
        self._monitoring_bandpass: bool = False
        self.logger = logging.getLogger("daq-server")
        # Queue used by `start` to stream (data_mode, file_name, metadata)
        # tuples to the connected client; None when no client is streaming.
        self.client_queue: queue.SimpleQueue | None = None
        # Maps pydaq file-writer data-mode names onto DaqModes values.
        self._data_mode_mapping: dict[str, DaqModes] = {
            "burst_raw": DaqModes.RAW_DATA,
            "cont_channel": DaqModes.CONTINUOUS_CHANNEL_DATA,
            "integrated_channel": DaqModes.INTEGRATED_CHANNEL_DATA,
            "burst_channel": DaqModes.CHANNEL_DATA,
            "burst_beam": DaqModes.BEAM_DATA,
            "integrated_beam": DaqModes.INTEGRATED_BEAM_DATA,
            "correlator": DaqModes.CORRELATOR_DATA,
            "station": DaqModes.STATION_BEAM_DATA,
            "antenna_buffer": DaqModes.ANTENNA_BUFFER,
        }
        # TODO: Check this typehint. Floats might be ints, not sure.
        # Maps station name -> (antenna base numbers, x coords, y coords);
        # currently unpopulated (see commented-out generate_rms_plots).
        self._antenna_locations: dict[
            str, tuple[list[int], list[float], list[float]]
        ] = {}
        self._plots_to_send: bool = False
        # Queues of JSON-encoded plot data awaiting transmission to the
        # client of start_bandpass_monitor.
        self._x_bandpass_plots: queue.Queue = queue.Queue()
        self._y_bandpass_plots: queue.Queue = queue.Queue()
        self._rms_plots: queue.Queue = queue.Queue()
        self._station_name: str = "a_station_name"  # TODO: Get Station TRL/ID
# Callback called for every data mode.
def _file_dump_callback( # noqa: C901
self: DaqHandler,
data_mode: str,
file_name: str,
additional_info: Optional[str] = None,
) -> None:
"""
Call a callback for specific data mode.
Callbacks for all or specific data modes should be called here.
:param data_mode: The DAQ data type written
:param file_name: The filename written
:param additional_info: Any additional information/metadata.
"""
# Callbacks to call for all data modes.
daq_mode = self._data_mode_mapping[data_mode]
if daq_mode not in {DaqModes.STATION_BEAM_DATA, DaqModes.CORRELATOR_DATA}:
metadata = self.daq_instance._persisters[daq_mode].get_metadata(
tile_id=additional_info
)
else:
metadata = self.daq_instance._persisters[daq_mode].get_metadata()
if additional_info is not None and metadata is not None:
metadata["additional_info"] = additional_info
# Call additional callbacks per data mode if needed.
if data_mode == "read_raw_data":
pass
if data_mode == "read_beam_data":
pass
if data_mode == "integrated_beam":
pass
if data_mode == "station_beam":
pass
if data_mode == "read_channel_data":
pass
if data_mode == "continuous_channel":
pass
if data_mode == "integrated_channel":
self._integrated_channel_callback(
data_mode=data_mode,
file_name=file_name,
metadata=metadata,
)
if data_mode == "correlator":
pass
def _integrated_channel_callback(
self: DaqHandler,
data_mode: str,
file_name: str,
metadata: Optional[str] = None,
) -> None:
"""
Call callbacks for only the integrated channel DaqMode.
:param data_mode: The DAQ data type written
:param file_name: The filename written
:param metadata: Any additional information.
"""
if self.client_queue:
self.client_queue.put(
(data_mode, file_name, json.dumps(metadata, cls=NumpyEncoder))
)
[docs] def initialise(
self: DaqHandler, config: dict[str, Any]
) -> tuple[ResultCode, str]: # noqa: E501
"""
Initialise a new DaqReceiver instance.
:param config: the configuration to apply
:return: a resultcode, message tuple
"""
if self._initialised is False:
self.logger.info("Initialising daq.")
self.daq_instance = DaqReceiver()
try:
if config:
self.daq_instance.populate_configuration(config)
self.daq_instance.initialise_daq()
self._receiver_started = True
self._initialised = True
# pylint: disable=broad-except
except Exception as e:
self.logger.error(
"Caught exception in `DaqHandler.initialise`: %s", e
) # noqa: E501
return ResultCode.FAILED, f"Caught exception: {e}"
self.logger.info("Daq initialised.")
return ResultCode.OK, "Daq successfully initialised"
# else
self.logger.info("Daq already initialised")
return ResultCode.REJECTED, "Daq already initialised"
@property
def initialised(self) -> bool:
"""
Return whether the DAQ is initialised.
:return: whether the DAQ is initialised.
"""
return self._initialised
    @check_initialisation
    def start(
        self: DaqHandler,
        modes_to_start: str,
    ) -> Iterator[str | tuple[str, str, str]]:
        """
        Start data acquisition with the current configuration.

        A infinite streaming loop will be started until told to stop.
        This will notify the client of state changes and metadata
        of files written to disk, e.g. `data_type`.`file_name`.

        :param modes_to_start: string listing the modes to start.

        :yield: a status update.

        :raises ValueError: if an invalid DaqMode is supplied
        """
        try:
            # Convert string representation to DaqModes
            converted_modes_to_start: list[DaqModes] = convert_daq_modes(
                modes_to_start
            )
        except ValueError as e:
            self.logger.error("Value Error! Invalid DaqMode supplied! %s", e)
            raise
        if not self._receiver_started:
            self.daq_instance.initialise_daq()
            self._receiver_started = True
        try:
            self.client_queue = queue.SimpleQueue()
            # One file-dump callback per mode; each notification it queues
            # is re-yielded to the client below.
            callbacks = [self._file_dump_callback] * len(converted_modes_to_start)
            self.daq_instance.start_daq(converted_modes_to_start, callbacks)
            self.logger.info("Daq listening......")
            yield "LISTENING"
            # Stream queued notifications until `stop` puts the None sentinel.
            yield from iter(self.client_queue.get, None)
            yield "STOPPED"
        finally:
            # prevent queue from building up indefinitely
            self.client_queue = None
[docs] @check_initialisation
def stop(self: DaqHandler) -> tuple[ResultCode, str]:
"""
Stop data acquisition.
:return: a resultcode, message tuple
"""
self.logger.info("Stopping daq.....")
self.daq_instance.stop_daq()
self._receiver_started = False
if self.client_queue:
self.client_queue.put(None)
return ResultCode.OK, "Daq stopped"
[docs] @check_initialisation
def get_configuration(
self: DaqHandler,
) -> dict[str, Any]:
"""
Retrieve the current DAQ configuration.
:return: a configuration dictionary.
"""
return self.daq_instance.get_configuration()
[docs] @check_initialisation
def get_status(self: DaqHandler) -> dict[str, Any]:
"""
Provide status information for this MccsDaqReceiver.
This method returns status as a json string with entries for:
- Running Consumers: [DaqMode.name: str, DaqMode.value: int]
- Receiver Interface: "Interface Name": str
- Receiver Ports: [Port_List]: list[int]
- Receiver IP: "IP_Address": str
- Bandpass Monitor: "Monitoring Status": bool
:return: A json string containing the status of this DaqReceiver.
"""
# 2. Get consumer list, filter by `running`
full_consumer_list = self.daq_instance._running_consumers.items()
running_consumer_list = [
[consumer.name, consumer.value]
for consumer, running in full_consumer_list
if running
]
# 3. Get Receiver Interface, Ports and IP (and later `Uptime`)
receiver_interface = self.daq_instance._config["receiver_interface"]
receiver_ports = self.daq_instance._config["receiver_ports"]
receiver_ip = self.daq_instance._config["receiver_ip"]
# 4. Compose into some format and return.
return {
"Running Consumers": running_consumer_list,
"Receiver Interface": receiver_interface,
"Receiver Ports": receiver_ports,
"Receiver IP": [
(
receiver_ip.decode()
if isinstance(receiver_ip, bytes)
else receiver_ip
) # noqa: E501
],
"Bandpass Monitor": self._monitoring_bandpass,
}
# TODO: Refactor this method to farm out some steps.
# pylint: disable = too-many-locals, too-many-statements
# pylint: disable = too-many-branches
[docs] @check_initialisation
def start_bandpass_monitor( # noqa: C901
self: DaqHandler,
argin: str,
) -> Iterator[tuple[TaskStatus, str, str | None, str | None, str | None]]:
"""
Begin monitoring antenna bandpasses.
:param argin: A dict of arguments to pass to `start_bandpass_monitor` command.
* plot_directory: Plotting directory.
Mandatory.
* monitor_rms: Flag to enable or disable RMS monitoring.
Optional. Default False.
[DEPRECATED - To be removed.]
* auto_handle_daq: Flag to indicate whether the DaqReceiver should
be automatically reconfigured, started and stopped during this
process if necessary.
Optional. Default False.
[DEPRECATED - To be removed.]
* cadence: Number of seconds over which to average data.
Optional. Default 0 (returns snapshots).
:yields: Taskstatus, Message, bandpass/rms plot(s).
:returns: TaskStatus, Message, None, None, None
"""
if self._monitoring_bandpass:
yield (
TaskStatus.REJECTED,
"Bandpass monitor is already active.",
None,
None,
None,
)
return
self._stop_bandpass = False
params: dict[str, Any] = json.loads(argin)
try:
plot_directory: str = params["plot_directory"]
except KeyError:
self.logger.error("Param `argin` must have key for `plot_directory`")
yield (
TaskStatus.REJECTED,
"Param `argin` must have key for `plot_directory`",
None,
None,
None,
)
return
# monitor_rms: bool = cast(bool, params.get("monitor_rms", False))
cadence = cast(int, params.get("cadence", 0))
auto_handle_daq = params.get("auto_handle_daq", False)
# Convert to bool if we have a string.
if not isinstance(auto_handle_daq, bool):
# pylint: disable = simplifiable-if-statement
if auto_handle_daq == "True":
auto_handle_daq = True
else:
auto_handle_daq = False
# Check DAQ is in the correct state for monitoring bandpasses.
# If not, throw an error if we chose not to auto_handle_daq
# otherwise configure appropriately.
current_config = self.get_configuration()
if current_config["append_integrated"]:
if not auto_handle_daq:
self.logger.error(
"Current DAQ config is invalid. "
"The `append_integrated` option must be set to false "
"for bandpass monitoring."
)
yield (
TaskStatus.REJECTED,
"Current DAQ config is invalid. "
"The `append_integrated` option must be set to false "
"for bandpass monitoring.",
None,
None,
None,
)
return
self.configure({"append_integrated": False})
# Check correct consumer is running.
running_consumers = self.get_status().get("Running Consumers", "")
if ["INTEGRATED_CHANNEL_DATA", 5] not in running_consumers:
if not auto_handle_daq:
self.logger.error(
"INTEGRATED_CHANNEL_DATA consumer must be running "
"before bandpasses can be monitored."
"Running consumers: %s",
running_consumers,
)
yield (
TaskStatus.REJECTED,
"INTEGRATED_CHANNEL_DATA consumer must be running "
"before bandpasses can be monitored.",
None,
None,
None,
)
return
# Auto start DAQ.
# TODO: Need to be able to start consumers incrementally for this.
# result = self.start(modes_to_start="INTEGRATED_CHANNEL_DATA")
# while "INTEGRATED_CHANNEL_DATA" not in running_consumers:
# tmp+=1
# sleep(2)
# running_consumers = self.get_status().get("Running Consumers")
# if tmp > 5:
# return
# Create plotting directory structure
if not self.create_plotting_directory(plot_directory, self._station_name):
self.logger.error(
"Unable to create plotting directory at %s", plot_directory
)
yield (
TaskStatus.FAILED,
f"Unable to create plotting directory at: {plot_directory}",
None,
None,
None,
)
return
data_directory = self.daq_instance._config["directory"]
self.logger.info("Using data dir %s", data_directory)
# # Start rms thread
# if monitor_rms:
# self.logger.debug("Starting RMS plotting thread.")
# rms = Process(
# target=self.generate_rms_plots,
# name=f"rms-plotter({self})",
# args=(station_name, os.path.join(plot_directory, station_name)),
# )
# rms.start()
# Start directory monitor
observer = Observer()
data_handler = IntegratedDataHandler(self._station_name)
observer.schedule(data_handler, data_directory)
observer.start()
# Start plotting thread
self.logger.debug("Starting bandpass plotting thread.")
bandpass_plotting_thread = threading.Thread(
target=self.generate_bandpass_plots,
args=(
os.path.join(plot_directory, self._station_name),
self._station_name,
cadence,
),
)
bandpass_plotting_thread.start()
# Wait for stop, monitoring disk space in the meantime
max_dir_size = 1000 * 1024 * 1024
self.logger.info("Bandpass monitor active, entering wait loop.")
self.logger.info(
"Params: plot_directory: %s, auto_handle_daq: %s, cadence: %i",
plot_directory,
auto_handle_daq,
cadence,
)
self._monitoring_bandpass = True
yield (TaskStatus.IN_PROGRESS, "Bandpass monitor active", None, None, None)
while not self._stop_bandpass:
try:
dir_size = sum(
os.path.getsize(f)
for f in os.listdir(data_directory)
if os.path.isfile(f)
)
except FileNotFoundError as e:
self.logger.warning("Could not find file: %s", e)
if dir_size > max_dir_size:
self.logger.error(
"Consuming too much disk space! Stopping bandpass monitor! %i/%i",
dir_size,
max_dir_size,
)
self._stop_bandpass = True
break
try:
x_bandpass_plot = self._x_bandpass_plots.get(block=False)
except queue.Empty:
x_bandpass_plot = None
except Exception as e: # pylint: disable = broad-exception-caught
self.logger.error(
"Unexpected exception retrieving x_bandpass_plot: %s", e
)
try:
y_bandpass_plot = self._y_bandpass_plots.get(block=False)
except queue.Empty:
y_bandpass_plot = None
except Exception as e: # pylint: disable = broad-exception-caught
self.logger.error(
"Unexpected exception retrieving y_bandpass_plot: %s", e
)
try:
rms_plot = self._rms_plots.get(block=False)
except queue.Empty:
rms_plot = None
except Exception as e: # pylint: disable = broad-exception-caught
self.logger.error("Unexpected exception retrieving rms_plot: %s", e)
if all(
plot is None for plot in [x_bandpass_plot, y_bandpass_plot, rms_plot]
):
# If we don't have any plots, don't uselessly spam [None]s.
pass
else:
self.logger.debug("Transmitting bandpass data.")
yield (
TaskStatus.IN_PROGRESS,
"plot sent",
x_bandpass_plot,
y_bandpass_plot,
rms_plot,
)
self.logger.debug("Bandpass data transmitted.")
sleep(1) # Plots will never be sent more often than once per second.
# Stop and clean up
self.logger.info("Waiting for threads and processes to terminate.")
# TODO: Need to be able to stop consumers incrementally for this.
# if auto_handle_daq:
# self.stop()
observer.stop()
observer.join()
bandpass_plotting_thread.join()
# if monitor_rms:
# rms.join()
self._monitoring_bandpass = False
self.logger.info("Bandpass monitoring complete.")
yield (TaskStatus.COMPLETED, "Bandpass monitoring complete.", None, None, None)
[docs] @check_initialisation
def stop_bandpass_monitor(self: DaqHandler) -> tuple[ResultCode, str]:
"""
Stop monitoring antenna bandpasses.
:return: a resultcode, message tuple
"""
if not self._monitoring_bandpass:
self.logger.info("Cannot stop bandpass monitor before it has started.")
return (ResultCode.REJECTED, "Bandpass monitor not yet started.")
if self._stop_bandpass:
self.logger.info("Bandpass monitor already stopping.")
return (ResultCode.REJECTED, "Bandpass monitor already stopping.")
self._stop_bandpass = True
self.logger.info("Bandpass monitor stopping.")
return (ResultCode.OK, "Bandpass monitor stopping.")
    def generate_rms_plots(  # noqa: C901
        self: DaqHandler, station_name: str, plotting_directory: str
    ) -> None:
        """
        Generate RMS plots.

        Currently a no-op: the implementation below is retained as commented
        scaffolding until antenna locations and tile proxies are available.

        :param station_name: Station name.
        :param plotting_directory: Directory to store plots in.
        """
        # Note: This method is commented out until we can access antenna locations
        # and tile proxies in order to retrieve adc power and properly label graphs.
        # Get station name (from somewhere...)
        # station_name = aavs_station.configuration["station"]["name"]
        # _connect_station()
        # Extract antenna locations
        # antenna_base, antenna_x, antenna_y = self._antenna_locations[station_name]
        # Generate dummy RMS data
        # colors = np.random.random(len(antenna_x)) * 30
        # Generate figure and canvas
        # fig = Figure(figsize=(18, 8))
        # canvas = FigureCanvas(fig)
        # # Generate plot for X
        # ax = fig.subplots(nrows=1, ncols=2, sharex="all", sharey="all")
        # fig.suptitle(f"{station_name} Antenna RMS", fontsize=14)
        # x_scatter = ax[0].scatter(
        #     antenna_x,
        #     antenna_y,
        #     s=50,
        #     marker="o",
        #     c=colors,
        #     cmap="jet",
        #     vmin=0,
        #     vmax=38,
        #     edgecolors="k",
        #     linewidths=0.8,
        # )
        # for i, _ in enumerate(antenna_x):
        #     ax[0].text(
        #         antenna_x[i] + 0.3,
        #         antenna_y[i] + 0.3,
        #         antenna_base[i],
        #         fontsize=7,
        #     )
        # ax[0].set_title(f"{station_name} Antenna RMS Map - X pol")
        # ax[0].set_xlabel("X")
        # ax[0].set_ylabel("Y")
        # # Generate plot for Y
        # y_scatter = ax[1].scatter(
        #     antenna_x,
        #     antenna_y,
        #     s=50,
        #     marker="o",
        #     c=colors,
        #     cmap="jet",
        #     vmin=0,
        #     vmax=38,
        #     edgecolors="k",
        #     linewidths=0.8,
        # )
        # for i, _ in enumerate(antenna_x):
        #     ax[1].text(
        #         antenna_x[i] + 0.3,
        #         antenna_y[i] + 0.3,
        #         antenna_base[i],
        #         fontsize=7,
        #     )
        # ax[1].set_title(f"{station_name} Antenna RMS Map - Y Pol")
        # ax[1].set_xlabel("X")
        # ax[1].set_ylabel("Y")
        # # Add colorbar
        # fig.subplots_adjust(
        #     bottom=0.1, top=0.9, left=0.1, right=0.88, wspace=0.05, hspace=0.17
        # )
        # cb_ax = fig.add_axes([0.9, 0.1, 0.02, 0.8])
        # fig.colorbar(y_scatter, label="RMS", cax=cb_ax)
        # # Continue until asked to stop
        # while not self._stop_bandpass:
        #     # Check station status
        #     # _connect_station()
        #     # Grab RMS values
        #     antenna_rms_x = []
        #     antenna_rms_y = []
        #     for tile in aavs_station.tiles:
        #         rms = tile.get_adc_rms()
        #         antenna_rms_x.extend(rms[0::2])
        #         antenna_rms_y.extend(rms[1::2])
        #     # Update colors
        #     x_scatter.set_array(np.array(antenna_rms_x))
        #     y_scatter.set_array(np.array(antenna_rms_y))
        #     # Save plot
        #     fig.suptitle(
        #         f"{station_name} Antenna RMS "
        #         f"({datetime.datetime.utcnow().strftime(self.TIME_FORMAT_STRING)})",
        #         fontsize=14,
        #     )
        #     saved_filepath = os.path.join(plotting_directory, "antenna_rms.svg")
        #     canvas.print_figure(
        #         saved_filepath,
        #         pad_inches=0,
        #         dpi=200,
        #         figsize=(18, 8),
        #     )
        #     self._rms_plots.put(saved_filepath)
        #     # Done, sleep for a bit
        #     sleep(1)
# pylint: disable = too-many-locals
[docs] def generate_bandpass_plots( # noqa: C901
self: DaqHandler,
plotting_directory: str,
station_name: str,
cadence: int,
) -> None:
"""
Generate antenna bandpass plots.
:param station_name: The name of the station.
:param plotting_directory: Directory to store plots in.
:param cadence: Time in seconds over which to average bandpass data.
"""
global files_to_plot # pylint: disable=global-variable-not-assigned
config = self.get_configuration()
nof_channels = config["nof_channels"]
nof_antennas_per_tile = config["nof_antennas"]
nof_pols = config["nof_polarisations"]
nof_tiles = config["nof_tiles"]
x_pol_data: np.ndarray | None = None
x_pol_data_count: int = 0
y_pol_data: np.ndarray | None = None
y_pol_data_count: int = 0
# The shape is reversed as DAQ reads the data this way around.
full_station_data: np.ndarray = np.zeros(shape=(512, 256, 2), dtype=int)
files_received_per_tile: list[int] = [0] * nof_tiles
interval_start = None
_filename_expression = re.compile(
r"channel_integ_(?P<tile>\d+)_(?P<timestamp>\d+_\d+)_0.hdf5"
)
# Loop until asked to stop
self.logger.info("Entering bandpass plotting loop.")
while not self._stop_bandpass:
# Wait for files to be queued. Check every second.
if len(files_to_plot[station_name]) == 0:
sleep(1)
continue
# Get the first item in the list
filepath = files_to_plot[station_name].pop(0)
self.logger.debug("Processing %s", filepath)
# Extract Tile number
filename = os.path.basename(os.path.abspath(filepath))
parts = _filename_expression.match(filename)
if parts is not None:
tile_number = int(parts.groupdict()["tile"])
if tile_number is not None:
try:
files_received_per_tile[tile_number] += 1
except IndexError as e:
self.logger.error(
f"Caught exception: {e}. "
f"Tile {tile_number} out of bounds! "
f"Max tile number: {len(files_received_per_tile)}"
)
# Open newly create HDF5 file
with h5py.File(filepath, "r") as f:
# Data is in channels/antennas/pols order
try:
data: np.ndarray = f["chan_"]["data"][:]
# pylint: disable=broad-exception-caught
except Exception as e:
self.logger.error("Exception: %s", e)
continue
data = data.reshape((nof_channels, nof_antennas_per_tile, nof_pols))
# # Convert to power in dB
# np.seterr(divide="ignore")
# data = 10 * np.log10(data)
# data[np.isneginf(data)] = 0
# np.seterr(divide="warn")
# Append Tile data to full station set.
# full_station_data is made of blocks of data per TPM in TPM order.
# Each block of TPM data is in port order.
start_index = nof_antennas_per_tile * tile_number
full_station_data[
:, start_index : start_index + nof_antennas_per_tile, :
] = data
present = datetime.datetime.now()
if interval_start is None:
interval_start = present
# TODO: This block is currently useless. Get averaging back in.
# Loop over polarisations (separate plots)
for pol in range(nof_pols):
# Assign first data point or maintain sum of all data.
# Divide by _pol_data_count to calculate the moving average on-demand.
if pol == 1:
if x_pol_data is None:
x_pol_data_count = 1
x_pol_data = full_station_data[:, :, pol]
else:
x_pol_data_count += 1
x_pol_data = x_pol_data + full_station_data[:, :, pol]
elif pol == 0:
if y_pol_data is None:
y_pol_data_count = 1
y_pol_data = full_station_data[:, :, pol]
else:
y_pol_data_count += 1
y_pol_data = y_pol_data + full_station_data[:, :, pol]
# Delete read file.
os.unlink(filepath)
# Every `cadence` seconds, plot graph and add the averages
# to the queue to be sent to the Tango device,
# Assert that we've received the same number (1+) of files per tile.
if all(files_received_per_tile) and (
len(set(files_received_per_tile)) == 1
):
if (present - interval_start).total_seconds() > cadence:
self.logger.debug("Queueing data for transmission")
assert isinstance(full_station_data, np.ndarray)
x_data = full_station_data[:, :, 1].transpose()
# Averaged x data (commented out for now)
# x_data = x_pol_data.transpose() / x_pol_data_count
self._x_bandpass_plots.put(json.dumps(x_data.tolist()))
y_data = full_station_data[:, :, 0].transpose()
# Averaged y data (commented out for now)
# y_data = y_pol_data.transpose() / y_pol_data_count
self._y_bandpass_plots.put(json.dumps(y_data.tolist()))
self.logger.debug("Data queued for transmission.")
# Reset vars
x_pol_data = None
x_pol_data_count = 0
y_pol_data = None
y_pol_data_count = 0
interval_start = None
files_received_per_tile = [0] * nof_tiles
self.logger.info("Exiting bandpass plotting loop.")
# pylint: disable=broad-except
[docs] def create_plotting_directory(
self: DaqHandler, parent: str, station_name: str
) -> bool:
"""
Create plotting directory structure for this station.
:param parent: Parent plotting directory
:param station_name: Station name
:return: True if this method succeeded else False
"""
# Check if plot directory exists and if not create it
dir_name = os.path.join(parent, station_name)
if not os.path.isdir(dir_name):
try:
os.makedirs(dir_name, exist_ok=True)
except PermissionError as e:
self.logger.error(e)
self.logger.error(
"Could not create plotting directory %s. "
"Check that the path is valid and permission",
parent,
)
return False
except NotADirectoryError as e:
self.logger.error(e)
self.logger.error(
"Specified plotting directory (%s) is a file. Please check", parent
)
return False
except FileExistsError as e:
self.logger.error(e)
self.logger.error("Specified plotting directory (%s) is a file")
return False
except Exception as e:
self.logger.error(e)
self.logger.error(
"Unknown exception when creating plotting directory (%s)"
)
return False
return True
class IntegratedDataHandler(FileSystemEventHandler):
    """Detect files created in the data directory and generate plots."""

    def __init__(self: IntegratedDataHandler, station_name: str):
        """
        Initialise a new instance.

        :param station_name: Station name
        """
        self._station_name = station_name
        # Register (or reset) this station's queue of files to plot.
        files_to_plot[station_name] = []
        self.logger = logging.getLogger("daq-server")

    def on_any_event(self: IntegratedDataHandler, event: FileSystemEvent) -> None:
        """
        Check every event for newly created files to process.

        :param event: Event to check.
        """
        global files_to_plot  # pylint: disable=global-variable-not-assigned
        # We are only interested in newly created files
        if event.event_type != "created":
            return
        src_path = event.src_path
        # Ignore lock files and other temporary files
        if "channel_integ" not in src_path or "lock" in src_path:
            return
        # Brief pause so the writer can finish with the file before it is
        # queued for processing.
        sleep(0.1)
        self.logger.info("Detected %s", src_path)
        files_to_plot[self._station_name].append(src_path)
def main() -> None:
    """
    Entrypoint for the module.

    Create and start a server.
    """
    # The serving port is configurable via the environment.
    port = int(os.getenv("DAQ_GRPC_PORT", default="50051"))
    run_server_forever(DaqHandler(), port)


if __name__ == "__main__":
    main()