Benchmon Data Export & Backends Internals
Overview
Benchmon provides two distinct back-ends for collecting system metrics, allowing users to trade off between ease of use/portability and raw performance/low overhead. Detailed below are the architectures of these back-ends and how data is exported to InfluxDB.
1. Back-end Architectures
A. Standard Backend (Python/Bash) - Default
This is the default mode of operation when running benchmon-run. It uses a hybrid approach:
CSV Recording: Legacy Bash scripts (disk_mon.sh, cpu_mon.sh, etc.) run as subprocesses to collect metrics from /proc and system tools, writing directly to CSV files.
InfluxDB Streaming: A dedicated pure Python component (HighPerformanceCollector) runs within the main process. It uses a multi-threaded producer-consumer architecture to read system metrics (directly from /proc where possible) and stream them to InfluxDB asynchronously.
Pros:
Python-based, no compilation required for the collector.
Robust error handling and logging.
Concurrent non-blocking writes to InfluxDB.
Cons:
Higher CPU overhead compared to compiled languages.
Limited sampling frequency (typically up to 10-20 Hz before overhead becomes noticeable).
Status of the previous "HP Script" implementation:
The use of intermediate Bash scripts (*_hp.sh) that piped data to a Python processor via named pipes (described in earlier tutorials) has been deprecated and removed. It has been replaced by the pure Python HighPerformanceCollector, which offers better stability and maintainability without the complexity of managing external pipes.
B. High-Performance Backend (C++)
For scenarios requiring high-frequency sampling (>100 Hz) or minimal system intrusion, Benchmon provides a C++ backend named rt-monitor.
Activation:
Use the --binary (or -b) flag with benchmon-run.
benchmon-run --system --binary --sys-freq 100
Architecture:
Binaries: Powered by the rt-monitor executable (built via CMake).
Core: Uses native C++ system calls and highly optimized parsing of /proc files.
Output: Writes compact binary files (.bin) to disk, minimizing I/O overhead.
InfluxDB: The C++ backend features an internal AsyncInfluxDBWriter capable of streaming metrics directly to InfluxDB with negligible latency.
Pros:
Extremely low CPU overhead and a small memory footprint.
Supports sampling frequencies of 100 Hz or more.
2. InfluxDB Integration Architecture (Python Collector)
When using the default Python backend with benchmon-run --grafana, the HighPerformanceCollector (internal class) is instantiated.
graph TD
subgraph "Benchmon Process"
A[Producer Threads] -->|Queue| B[Internal Queues]
B -->|Dequeue| C[Consumer Threads]
C -->|Batch Write| D[InfluxDB Client]
end
D -->|HTTP/Line Protocol| E[(InfluxDB v3)]
E --> F[Grafana]
Key Components
Producers: Dedicated threads for each metric type (CPU, Mem, Net, Disk). They perform blocking I/O (reading /proc) independently of the main loop.
Queues: Thread-safe queues buffer data points, absorbing bursts in system activity or network latency.
Consumers: Worker threads that pull data from the queues, format it into InfluxDB Line Protocol, and dispatch it via InfluxDBClient3 (using the official high-performance Flight/Arrow client where applicable).
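The producer-consumer flow described above can be sketched with the standard library alone. This is a minimal illustration, not the HighPerformanceCollector source: the helper names (`producer`, `consumer`, `to_line_protocol`), the fake CPU reader, and the list standing in for the InfluxDBClient3 write call are all hypothetical, and line-protocol escaping is omitted.

```python
import queue
import threading
import time
from typing import Callable, Dict, List, Tuple

# Hypothetical sample shape: (measurement, tags, fields, timestamp in ns).
Sample = Tuple[str, Dict[str, str], Dict[str, float], int]


def to_line_protocol(sample: Sample) -> str:
    """Format one sample as a line-protocol string (special characters are not escaped)."""
    measurement, tags, fields, ts_ns = sample
    tag_str = "".join(f",{k}={v}" for k, v in sorted(tags.items()))  # tags sorted by key
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement}{tag_str} {field_str} {ts_ns}"


def producer(read_metric: Callable[[], Sample], q: "queue.Queue[Sample]",
             stop: threading.Event, period_s: float) -> None:
    """Sample one metric type periodically; any blocking /proc read happens in this thread."""
    while not stop.is_set():
        q.put(read_metric())
        stop.wait(period_s)  # sleeps, but wakes up immediately when stop is set


def consumer(q: "queue.Queue[Sample]", write_batch: Callable[[List[str]], None],
             stop: threading.Event, batch_size: int) -> None:
    """Drain the queue, flushing full batches immediately and partial ones when idle."""
    batch: List[str] = []
    while not (stop.is_set() and q.empty()):
        try:
            batch.append(to_line_protocol(q.get(timeout=0.05)))
        except queue.Empty:
            pass  # idle: fall through and flush whatever is buffered
        else:
            if len(batch) < batch_size:
                continue  # keep filling the current batch
        if batch:
            write_batch(batch)
            batch = []
    if batch:  # final flush on shutdown
        write_batch(batch)


if __name__ == "__main__":
    written: List[str] = []  # stands in for the InfluxDB client
    q: "queue.Queue[Sample]" = queue.Queue()
    stop_producers, stop_consumer = threading.Event(), threading.Event()

    def read_fake_cpu() -> Sample:  # hypothetical stand-in for parsing /proc/stat
        return ("cpu", {"host": "server1", "core": "cpu0"}, {"user": 1234}, time.time_ns())

    prod = threading.Thread(target=producer, args=(read_fake_cpu, q, stop_producers, 0.01))
    cons = threading.Thread(target=consumer, args=(q, written.extend, stop_consumer, 5))
    prod.start()
    cons.start()
    time.sleep(0.1)
    stop_producers.set()
    prod.join()
    stop_consumer.set()  # the consumer drains what is left, then exits
    cons.join()
    print(len(written), written[0])
```

Stopping the producers before the consumer means nothing can be enqueued after the consumer's final flush.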
3. How to Use
Collecting Data
benchmon-start-grafana starts the local InfluxDB + Grafana stack used by Benchmon. The default value of --save-dir is ./benchmon_traces_.
# 1. Start the stack (InfluxDB + Grafana) with a specified directory
benchmon-start-grafana --save-dir /tmp/demo --influxdb-query-file-limit 2000
# or use default data storage directory (./benchmon_traces_)
benchmon-start-grafana --influxdb-query-file-limit 2000
# 2. Run Benchmon (Stream to InfluxDB)
benchmon-run --system --grafana --save-dir /tmp/demo/run1
--influxdb-query-file-limit is the number of parquet files that a single InfluxDB query may scan. Increase it when you plan to visualize all data in a large database without providing --start-time and --end-time. A value of 1000 can still be too small for large single-run databases; 2000 is a practical next step before moving to a narrower time window. The value is applied when benchmon-start-grafana launches InfluxDB, so you must restart the stack after changing it. This flag affects query/read limits only; it does not change import or write batching.
Visualizing Data
Once benchmon-run starts, data appears immediately in InfluxDB. Open one of the pre-provisioned Grafana dashboards (deployed by benchmon-start-grafana) to view real-time metrics.
System Overview: CPU, Memory, Network I/O.
Detailed Views: Per-core frequency, disk IOPS.
If a measurement table is missing from the database, benchmon skips the corresponding plot instead of failing the entire visualization run.
For a step-by-step guide on setting up the visualization stack, see the InfluxDB + Grafana Integration Tutorial.
4. References
Python Collector: benchmon/run/hp_collector.py
C++ Backend: benchmon/rt-monitor/
Line Protocol: InfluxDB Documentation
    local iowait=$7
    local irq=$8
    local softirq=$9
    local steal=${10}
    local guest=${11}
    local guestnice=${12}
    # Send to named pipe
    echo "CPU|$timestamp|$cpu_core $user $nice $system $idle $iowait $irq $softirq $steal $guest $guestnice" > "$influxdb_pipe"
}
Convert to InfluxDB Format
```python
def _process_cpu_data(self, timestamp: float, data: str):
    """Process CPU data"""
    parts = data.strip().split()
    cpu_core = parts[0]
    stats = {
        'user': int(parts[1]),
        'nice': int(parts[2]),
        'system': int(parts[3]),
        'idle': int(parts[4]),
        'iowait': int(parts[5]),
        'irq': int(parts[6]),
        'softirq': int(parts[7]),
        'steal': int(parts[8]) if len(parts) > 8 else 0,
    }
    # The hook generates the Line Protocol, e.g.:
    # cpu,host=hostname,core=cpu0 user=123,nice=456,system=789 timestamp
    self.hook.on_cpu_data(timestamp, cpu_core, stats)
```
Generated InfluxDB Data
cpu,host=server1,core=cpu0 user=1234,nice=0,system=567,idle=8901,iowait=23,irq=0,softirq=45 1640995200
cpu,host=server1,core=cpu1 user=1123,nice=0,system=623,idle=8654,iowait=34,irq=1,softirq=56 1640995200
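As a standalone illustration of the conversion step, the sketch below turns one `CPU|timestamp|data` record into the line format shown above. It assumes the field order of `_process_cpu_data`, defaults missing counters to 0 as that method does for `steal`, and truncates the timestamp to whole seconds (so a write would need `precision=s`). The function name `cpu_record_to_line` is hypothetical.

```python
CPU_FIELDS = ("user", "nice", "system", "idle", "iowait", "irq", "softirq", "steal")


def cpu_record_to_line(record: str, host: str) -> str:
    """Convert 'CPU|<epoch seconds>|<core> <counters...>' into a cpu line-protocol row."""
    data_type, timestamp_str, data = record.strip().split("|", 2)
    if data_type != "CPU":
        raise ValueError(f"not a CPU record: {data_type!r}")
    core, *counters = data.split()
    # Older kernels expose fewer columns; missing counters default to 0.
    stats = {name: int(counters[i]) if i < len(counters) else 0
             for i, name in enumerate(CPU_FIELDS)}
    fields = ",".join(f"{name}={value}" for name, value in stats.items())
    return f"cpu,host={host},core={core} {fields} {int(float(timestamp_str))}"


print(cpu_record_to_line("CPU|1640995200.25|cpu0 1234 0 567 8901 23 0 45", "server1"))
# cpu,host=server1,core=cpu0 user=1234,nice=0,system=567,idle=8901,iowait=23,irq=0,softirq=45,steal=0 1640995200
```

Counters beyond `steal` (guest, guestnice) are ignored here, matching the fields listed for `cpu` in the measurement summary.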
2. Memory Monitoring (mem_mon_hp.sh)
Data Source
# Read /proc/meminfo file
grep -E "^(MemTotal|MemFree|MemAvailable|Buffers|Cached):" /proc/meminfo
Data Format
# HP processor format
MEMORY|timestamp|value1,value2,value3,...
Implementation Example
send_to_influxdb() {
    local timestamp=$1
    local meminfo_values=""
    local memory_fields=("MemTotal" "MemFree" "MemAvailable" "Buffers" "Cached")
    for field in "${memory_fields[@]}"; do
        local value=$(grep "^${field}:" /proc/meminfo | awk '{print $2}')
        if [[ -n "$meminfo_values" ]]; then
            meminfo_values="${meminfo_values},${value}"
        else
            meminfo_values="$value"
        fi
    done
    echo "MEMORY|$timestamp|$meminfo_values" > "$influxdb_pipe"
}
Generated InfluxDB Data
memory,host=server1 total=16777216,free=8388608,available=12345678,buffers=1234567,cached=2345678 1640995200
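Rather than running one `grep` per field, the whole file can be read once and parsed in a single pass. A minimal sketch, assuming the field-name mapping implied by the example row above (`MemTotal` to `total`, and so on); values stay in kB as `/proc/meminfo` reports them, and `meminfo_to_line` is a hypothetical name.

```python
# Maps /proc/meminfo keys to the field names used in the memory measurement.
MEM_FIELDS = {
    "MemTotal": "total",
    "MemFree": "free",
    "MemAvailable": "available",
    "Buffers": "buffers",
    "Cached": "cached",
}


def meminfo_to_line(meminfo_text: str, host: str, timestamp: int) -> str:
    """Parse /proc/meminfo text once and emit one memory line-protocol row (values in kB)."""
    values = {}
    for row in meminfo_text.splitlines():
        key, _, rest = row.partition(":")
        if key in MEM_FIELDS:  # exact key match, so "SwapCached" is not taken for "Cached"
            values[MEM_FIELDS[key]] = int(rest.split()[0])
    fields = ",".join(f"{name}={values[name]}" for name in MEM_FIELDS.values() if name in values)
    return f"memory,host={host} {fields} {timestamp}"
```

On a Linux host it could be called as `meminfo_to_line(Path("/proc/meminfo").read_text(), socket.gethostname(), int(time.time()))`.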
3. Network Monitoring (net_mon_hp.sh)
Data Source
# Read /proc/net/dev file
cat /proc/net/dev
Data Format
# HP processor format
NETWORK|timestamp|interface rx_bytes rx_packets ... tx_bytes tx_packets ...
Implementation Example
send_to_influxdb() {
    local timestamp=$1
    local interface=$2
    local rx_bytes=$3
    local rx_packets=$4
    # Positional parameters above 9 need braces: $11 would expand to "${1}1"
    local tx_bytes=${11}
    local tx_packets=${12}
    # ... other parameters
    echo "NETWORK|$timestamp|$interface $rx_bytes $rx_packets $rx_errs $rx_drop $rx_fifo $rx_frame $rx_compressed $rx_multicast $tx_bytes $tx_packets $tx_errs $tx_drop $tx_fifo $tx_colls $tx_carrier $tx_compressed" > "$influxdb_pipe"
}
Generated InfluxDB Data
network,host=server1,interface=eth0 rx_bytes=1048576,rx_packets=1024,tx_bytes=2097152,tx_packets=2048 1640995200
network,host=server1,interface=lo rx_bytes=12345,rx_packets=123,tx_bytes=12345,tx_packets=123 1640995200
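A minimal Python sketch of the same parsing, reading the counters by column position. It assumes the standard 16-column `/proc/net/dev` layout (8 receive counters, then 8 transmit counters) and emits the fields listed for `network` in the measurement summary table. The function name is hypothetical.

```python
from typing import List


def net_dev_to_lines(net_dev_text: str, host: str, timestamp: int) -> List[str]:
    """Turn /proc/net/dev text into one network line-protocol row per interface."""
    lines = []
    for row in net_dev_text.splitlines()[2:]:  # the first two lines are column headers
        if ":" not in row:
            continue
        iface, counters = row.split(":", 1)  # large counters may follow the colon directly
        v = [int(x) for x in counters.split()]
        # Columns 0-7 are receive counters, columns 8-15 transmit counters.
        fields = (f"rx_bytes={v[0]},rx_packets={v[1]},rx_errors={v[2]},"
                  f"tx_bytes={v[8]},tx_packets={v[9]},tx_errors={v[10]}")
        lines.append(f"network,host={host},interface={iface.strip()} {fields} {timestamp}")
    return lines
```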
4. Disk Monitoring (disk_mon_hp.sh)
Data Source
# Read /proc/diskstats file
cat /proc/diskstats
Data Format
# HP processor format (following /proc/diskstats format)
DISK|timestamp|major minor device rd_cd rd_md sect_rd time_rd wr_cd wr_md sect_wr time_wr io_ip time_io time_wei_io
Implementation Example
send_to_influxdb() {
    local timestamp=$1
    local major=$2
    local minor=$3
    local device=$4
    # ... other disk statistics parameters
    echo "DISK|$timestamp|$major $minor $device $rd_cd $rd_md $sect_rd $time_rd $wr_cd $wr_md $sect_wr $time_wr $io_ip $time_io $time_wei_io" > "$influxdb_pipe"
}
Generated InfluxDB Data
disk,host=server1,device=sda reads_completed=1234,reads_merged=56,sectors_read=123456,time_reading=789,writes_completed=567,writes_merged=23,sectors_written=67890,time_writing=345 1640995200
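The same approach applies to `/proc/diskstats`. The sketch below maps the first eight counters after the device name to the field names used in the example row above and ignores the remaining columns (in-flight I/O, I/O time, and the discard/flush columns that newer kernels append). The optional `devices` filter and the function name are hypothetical.

```python
from typing import Iterable, List, Optional

DISK_FIELDS = ("reads_completed", "reads_merged", "sectors_read", "time_reading",
               "writes_completed", "writes_merged", "sectors_written", "time_writing")


def diskstats_to_lines(diskstats_text: str, host: str, timestamp: int,
                       devices: Optional[Iterable[str]] = None) -> List[str]:
    """Emit one disk line-protocol row per device found in /proc/diskstats text."""
    wanted = set(devices) if devices is not None else None
    lines = []
    for row in diskstats_text.splitlines():
        parts = row.split()  # major minor device counter1 counter2 ...
        if len(parts) < 3 + len(DISK_FIELDS):
            continue  # skip blank or truncated rows
        device = parts[2]
        if wanted is not None and device not in wanted:
            continue
        fields = ",".join(f"{name}={int(value)}" for name, value in zip(DISK_FIELDS, parts[3:]))
        lines.append(f"disk,host={host},device={device} {fields} {timestamp}")
    return lines
```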
5. CPU Frequency Monitoring (cpufreq_mon_hp.sh)
Data Source
# Read CPU frequency files
cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_cur_freq
Data Format
# HP processor format
CPUFREQ|timestamp|cpu_core frequency
Implementation Example
send_to_influxdb() {
    local timestamp=$1
    local cpu_core=$2
    local frequency=$3
    echo "CPUFREQ|$timestamp|$cpu_core $frequency" > "$influxdb_pipe"
}
Generated InfluxDB Data
cpufreq,host=server1,core=cpu0 frequency=2400000 1640995200
cpufreq,host=server1,core=cpu1 frequency=2350000 1640995200
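A Python equivalent of the `cat` glob above might enumerate the per-core files and sort cores numerically, so that `cpu10` follows `cpu9`. This is a sketch: the `root` parameter exists only so the function can be pointed at a test directory, and values are kept in kHz, as the kernel reports them in `scaling_cur_freq`.

```python
import re
from pathlib import Path
from typing import List


def cpufreq_lines(host: str, timestamp: int, root: str = "/sys/devices/system/cpu") -> List[str]:
    """Read scaling_cur_freq (kHz) for every core and emit cpufreq line-protocol rows."""
    # "cpu[0-9]*" skips sibling directories such as cpufreq/ and cpuidle/.
    paths = Path(root).glob("cpu[0-9]*/cpufreq/scaling_cur_freq")

    def core_number(path: Path) -> int:
        return int(re.sub(r"\D", "", path.parent.parent.name))

    lines = []
    for path in sorted(paths, key=core_number):
        core = path.parent.parent.name  # e.g. "cpu0"
        khz = int(path.read_text().strip())
        lines.append(f"cpufreq,host={host},core={core} frequency={khz} {timestamp}")
    return lines
```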
6. InfiniBand Monitoring (ib_mon_hp.sh)
Data Source
# Read InfiniBand counters
cat /sys/class/infiniband/*/ports/1/counters/*
Data Format
# Direct Line Protocol format
infiniband_stats,host=hostname,interface=interface,port=port,metric=metric_key value=metric_value timestamp_ns
Implementation Example
send_to_influxdb() {
    local timestamp=$1
    local interface=$2
    local port=$3
    local metric_key=$4
    local metric_value=$5
    local hostname=$(hostname)
    # Convert the timestamp (seconds, possibly fractional) to nanoseconds
    local timestamp_ns
    if [[ $timestamp == *.* ]]; then
        timestamp_ns=$(echo "$timestamp * 1000000000" | bc | cut -d. -f1)
    else
        timestamp_ns="${timestamp}000000000"
    fi
    echo "infiniband_stats,host=$hostname,interface=$interface,port=$port,metric=$metric_key value=$metric_value $timestamp_ns" > "$influxdb_pipe"
}
Generated InfluxDB Data
infiniband_stats,host=server1,interface=mlx5_0,port=1,metric=port_rcv_data value=1234567890 1640995200000000000
infiniband_stats,host=server1,interface=mlx5_0,port=1,metric=port_xmit_data value=9876543210 1640995200000000000
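The `bc` step exists because converting fractional seconds to nanoseconds through binary floating point loses precision. A Python equivalent can apply `decimal.Decimal` to the original string. This is a sketch with hypothetical function names; like `cut -d. -f1`, it truncates toward zero.

```python
from decimal import Decimal


def seconds_to_ns(timestamp: str) -> int:
    """Convert a seconds string such as '1640995200.123456789' to integer nanoseconds."""
    # Decimal keeps every digit of the string; int() truncates toward zero.
    return int(Decimal(timestamp) * 1_000_000_000)


def infiniband_line(host: str, interface: str, port: int, metric: str,
                    value: int, timestamp: str) -> str:
    """Build one infiniband_stats line-protocol row with a nanosecond timestamp."""
    return (f"infiniband_stats,host={host},interface={interface},port={port},"
            f"metric={metric} value={value} {seconds_to_ns(timestamp)}")


# A float cannot hold this value exactly, so the float route gives a different result:
# int(float("1640995200.123456789") * 1e9) != 1640995200123456789
```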
HP Processor (hp_processor.py) Detailed Analysis
Core Functions
The HP processor is the core component of Benchmon's high-performance data processing and is responsible for:
Named Pipe Reading: Receive data from the *_hp.sh scripts
Data Parsing: Parse the different types of monitoring data
Format Conversion: Convert to InfluxDB Line Protocol format
Batch Sending: Send to the database via InfluxDBSender
Data Processing Flow
def _process_data_line(self, line: str):
    """Process data line"""
    try:
        # Parse data format: TYPE|timestamp|data
        parts = line.strip().split('|', 2)
        if len(parts) != 3:
            return
        data_type, timestamp_str, data = parts
        timestamp = float(timestamp_str)
        # Dispatch processing based on data type
        if data_type == 'CPU':
            self._process_cpu_data(timestamp, data)
        elif data_type == 'CPUFREQ':
            self._process_cpufreq_data(timestamp, data)
        elif data_type == 'MEMORY':
            self._process_memory_data(timestamp, data)
        elif data_type == 'NETWORK':
            self._process_network_data(timestamp, data)
        elif data_type == 'DISK':
            self._process_disk_data(timestamp, data)
        self.data_points_processed += 1
    except Exception as e:
        self.logger.debug(f"Error processing data line '{line}': {e}")
Performance Features
Non-blocking I/O: Uses select for non-blocking pipe reading
Batch Processing: Aggregates data before batch sending
Error Recovery: Errors in a single data point don't affect overall processing
Performance Statistics: Real-time processing rate statistics
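The non-blocking read mentioned above can be sketched with `select.select` and `os.read`. The sketch buffers partial records until their newline arrives, so a `TYPE|timestamp|data` line split across two reads is never handed to the parser in pieces. The `PipeLineReader` class is hypothetical (not taken from hp_processor.py), `os.pipe()` stands in for the named pipe, and `select` on pipes requires a POSIX system.

```python
import os
import select
from typing import List


class PipeLineReader:
    """Collect newline-terminated records from a pipe without blocking indefinitely."""

    def __init__(self, fd: int, chunk_size: int = 65536):
        self.fd = fd
        self.chunk_size = chunk_size
        self.eof = False
        self._buffer = b""

    def poll(self, timeout_s: float) -> List[str]:
        """Wait at most timeout_s for data and return every complete line received."""
        ready, _, _ = select.select([self.fd], [], [], timeout_s)
        if ready:
            chunk = os.read(self.fd, self.chunk_size)
            if chunk:
                self._buffer += chunk
            else:
                self.eof = True  # every writer has closed its end
        lines = self._buffer.split(b"\n")
        self._buffer = lines.pop()  # keep the trailing partial record (b"" if none)
        return [line.decode("utf-8", errors="replace") for line in lines]


if __name__ == "__main__":
    r, w = os.pipe()
    reader = PipeLineReader(r)
    os.write(w, b"CPU|1640995200|cpu0 1 2 3\nMEMO")
    print(reader.poll(0.5))  # ['CPU|1640995200|cpu0 1 2 3']
    os.write(w, b"RY|1640995200|16777216\n")
    print(reader.poll(0.5))  # ['MEMORY|1640995200|16777216']
    os.close(w)
    os.close(r)
```

Returning after a bounded timeout lets the caller's loop also check shutdown flags and flush batches while the pipe is quiet.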
InfluxDB Sender (influxdb_sender.py) Detailed Analysis
Core Functions
InfluxDBSender is responsible for efficiently sending processed data to InfluxDB:
import logging
import queue  # used by _sender_loop
import time   # used by _sender_loop
from typing import Any, Dict

import requests


class InfluxDBSender:
    """Send metrics data to InfluxDB in real-time"""

    def __init__(self, logger: logging.Logger, config: Dict[str, Any]):
        self.logger = logger
        self.config = config
        # Configuration parameters
        self.influxdb_url = config.get('url', 'http://localhost:8086')
        self.organization = config.get('organization', 'benchmon')
        self.bucket = config.get('bucket', 'metrics')
        self.token = config.get('token', 'admin123')
        # Performance settings
        self.batch_size = config.get('batch_size', 50)
        self.send_interval = config.get('send_interval', 2.0)
        # HTTP session reuse
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Token {self.token}',
            'Content-Type': 'text/plain; charset=utf-8'
        })
Batch Sending Mechanism
def _sender_loop(self):
    """Main sending loop"""
    batch = []
    last_send_time = time.time()
    while self.is_running:
        try:
            # Get data point
            metric = self.data_queue.get(timeout=0.1)
            batch.append(metric)
            # Check sending conditions
            current_time = time.time()
            should_send = (
                len(batch) >= self.batch_size or
                (current_time - last_send_time) >= self.send_interval
            )
            if should_send and batch:
                self._send_batch(batch)
                batch.clear()
                last_send_time = current_time
        except queue.Empty:
            # Timeout sending
            if batch and (time.time() - last_send_time) >= self.send_interval:
                self._send_batch(batch)
                batch.clear()
                last_send_time = time.time()
    # Flush points still buffered when the sender is stopped
    if batch:
        self._send_batch(batch)
Sending Optimizations
| Optimization Strategy | Description | Configuration Parameter |
|---|---|---|
| Batch Size | Aggregate multiple data points before sending | batch_size=50 |
| Send Interval | Maximum sending interval time | send_interval=2.0 |
| Connection Reuse | HTTP session reuse | Automatic |
| Retry Mechanism | Exponential backoff retry after failure | Automatic |
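The table lists an exponential-backoff retry, but the excerpts above do not show it. One common way to implement it is sketched below. The delay parameters, the jitter, and the assumption that retryable failures surface as `OSError` are illustrative choices, not Benchmon's actual settings.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(operation: Callable[[], T],
                       max_attempts: int = 5,
                       base_delay_s: float = 0.5,
                       max_delay_s: float = 30.0,
                       retry_on: Tuple[Type[BaseException], ...] = (OSError,),
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation(), waiting base_delay_s * 2**attempt (capped, with jitter) between failures."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay_s, base_delay_s * 2 ** attempt)
            # Random jitter keeps many senders from retrying in lockstep.
            sleep(delay * random.uniform(0.5, 1.0))
    raise AssertionError("unreachable")
```

The `sleep` parameter is injectable, so the delay schedule can be tested without actually waiting.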
Data Query and Retrieval
Basic Queries
1. Query CPU Usage
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["_field"] == "usage_percent")
|> filter(fn: (r) => r["core"] == "cpu0")
2. Query Memory Usage
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "memory")
|> filter(fn: (r) => r["_field"] == "usage_percent")
3. Query Network I/O
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "network")
|> filter(fn: (r) => r["interface"] == "eth0")
|> filter(fn: (r) => r["_field"] =~ /^(rx_bytes|tx_bytes)$/)
Advanced Queries
1. Aggregation Query (per-minute average)
from(bucket: "metrics")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["_field"] == "usage_percent")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
2. Multi-host Comparison
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "memory")
|> filter(fn: (r) => r["_field"] == "usage_percent")
|> group(columns: ["host"])
3. Disk IOPS Calculation
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "disk")
|> filter(fn: (r) => r["_field"] =~ /^(reads_completed|writes_completed)$/)
|> derivative(unit: 1s, nonNegative: true)
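The derivative step amounts to dividing counter deltas by the elapsed time. The worked sketch below handles one pair of samples and sums reads and writes into a single IOPS figure (the Flux query above keeps them as two separate series). The function name is hypothetical, and treating a decreasing counter as a reset and dropping that interval is a choice made for this sketch.

```python
from typing import Optional, Tuple


def iops(prev: Tuple[int, int], curr: Tuple[int, int], elapsed_s: float) -> Optional[float]:
    """IOPS from two (reads_completed, writes_completed) samples taken elapsed_s apart."""
    if elapsed_s <= 0:
        raise ValueError("elapsed_s must be positive")
    delta_reads = curr[0] - prev[0]
    delta_writes = curr[1] - prev[1]
    if delta_reads < 0 or delta_writes < 0:
        return None  # counter went backwards (e.g. reboot): no valid rate for this interval
    return (delta_reads + delta_writes) / elapsed_s


# 200 reads + 100 writes completed over 2 s
print(iops((1000, 500), (1200, 600), 2.0))  # 150.0
```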
REST API Queries
Using curl for queries
# Query CPU data from the last 5 minutes
curl -H "Authorization: Token admin123" \
-H "Content-Type: application/vnd.flux" \
-H "Accept: application/csv" \
"http://localhost:8086/api/v2/query?org=benchmon" \
-d 'from(bucket:"metrics")|>range(start:-5m)|>filter(fn:(r)=>r["_measurement"]=="cpu")|>limit(n:10)'
Using Python for queries
import requests


def query_influxdb(query: str) -> str:
    """Run a Flux query via the InfluxDB v2 HTTP API and return the CSV response."""
    headers = {
        'Authorization': 'Token admin123',
        'Accept': 'application/csv',
    }
    response = requests.post(
        'http://localhost:8086/api/v2/query',
        params={'org': 'benchmon'},
        headers=headers,
        json={'query': query, 'type': 'flux'},  # sent as application/json
    )
    response.raise_for_status()
    return response.text


# Query example
cpu_query = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["_field"] == "usage_percent")
'''
result = query_influxdb(cpu_query)
print(result)
Performance Optimization Recommendations
1. Collection Frequency Optimization
# High-frequency collection (suitable for real-time monitoring)
benchmon-run --system --grafana --interval 0.5 --grafana-send-interval 1.0 --grafana-batch-size 100
# Low-frequency collection (suitable for long-term trends)
benchmon-run --system --grafana --interval 10.0 --grafana-send-interval 30.0 --grafana-batch-size 50
2. Batch Size Tuning
| Scenario | Batch Size | Send Interval | Use Case |
|---|---|---|---|
| Real-time Monitoring | 10-20 | 1.0s | Low latency required |
| Standard Monitoring | 50-100 | 2.0s | Balance performance and latency |
| Batch Processing | 200-500 | 5.0s | Maximum throughput |
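Given the flush rule in `_sender_loop` (send when the batch reaches `batch_size` or when `send_interval` has elapsed), estimating the request rate and worst-case buffering delay for a steady stream of points is simple arithmetic. The sketch below ignores the loop's 0.1 s queue timeout and network time. Its function name and the example input rates are hypothetical.

```python
def batching_profile(points_per_s: float, batch_size: int, send_interval_s: float) -> dict:
    """Estimate how often a size-or-interval batcher flushes for a steady input rate."""
    if points_per_s <= 0:
        raise ValueError("points_per_s must be positive")
    fill_time_s = batch_size / points_per_s             # time to accumulate a full batch
    flush_period_s = min(fill_time_s, send_interval_s)  # whichever trigger fires first
    return {
        "flush_period_s": flush_period_s,  # roughly the longest a point waits in the batch
        "requests_per_s": 1.0 / flush_period_s,
        "points_per_request": points_per_s * flush_period_s,
    }


# 200 points/s, batch_size=50, send_interval=2.0: batches fill in 0.25 s, so ~4 requests/s
print(batching_profile(200, 50, 2.0))
# 10 points/s, batch_size=500, send_interval=5.0: the interval fires first, ~50 points/request
print(batching_profile(10, 500, 5.0))
```

Large batches only improve throughput when the input rate is high enough to fill them before `send_interval` expires.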
3. Network Optimization
# InfluxDB sender configuration
config = {
'batch_size': 100, # Reduce network requests
'send_interval': 2.0, # Balance latency and throughput
'connection_timeout': 30, # Connection timeout
'read_timeout': 60 # Read timeout
}
Troubleshooting
1. Data Not Reaching InfluxDB
Check Named Pipe:
# Check if pipe is created
ls -la /tmp/benchmon_data_pipe
# Check pipe data flow (caution: reading the FIFO consumes records the HP processor would otherwise receive)
tail -f /tmp/benchmon_data_pipe
Check HP Processor:
# View processor logs
grep "hp_processor" /var/log/benchmon.log
# Check processing statistics
grep "data points" /var/log/benchmon.log
2. InfluxDB Connection Issues
Test Connection:
# Test InfluxDB health status
curl http://localhost:8086/health
# Test authentication
curl -H "Authorization: Token admin123" \
http://localhost:8086/api/v2/buckets
Check Sender Status:
# View sender logs
grep "InfluxDBSender" /var/log/benchmon.log
# Check error messages
grep "ERROR.*influx" /var/log/benchmon.log
3. Performance Issue Diagnosis
Monitor Resource Usage:
# View benchmon process resource usage
ps aux | grep benchmon
# View pipe buffer
lsof | grep benchmon_data_pipe
# View network connections
netstat -an | grep 8086
Tuning Recommendations:
Increase batch size to reduce network overhead
Increase send interval to reduce CPU usage
Use SSD storage to improve I/O performance
Optimize network configuration to reduce latency
Monitoring Metrics Summary
Supported Measurement Types
| Measurement Type | Tags | Fields | Update Frequency |
|---|---|---|---|
| cpu | host, core | user, nice, system, idle, iowait, irq, softirq, steal | 1Hz |
| cpufreq | host, core | frequency | 1Hz |
| memory | host | total, free, available, buffers, cached, usage_percent | 1Hz |
| network | host, interface | rx_bytes, rx_packets, tx_bytes, tx_packets, rx_errors, tx_errors | 1Hz |
| disk | host, device | reads_completed, writes_completed, sectors_read, sectors_written | 1Hz |
| infiniband_stats | host, interface, port, metric | value | 1Hz |
Data Retention Policy
Recommended InfluxDB data retention policies:
// Retention periods are configured per bucket; this task downsamples the
// high-precision data (kept for 7 days in "metrics") into hourly means
option task = {name: "retention-7d", every: 1d}
from(bucket: "metrics")
|> range(start: -task.every)
|> aggregateWindow(every: 1h, fn: mean)
|> to(bucket: "metrics-hourly")
// Hourly data ("metrics-hourly") retained for 30 days
// Daily data retained for 1 year
Summary
Through this tutorial, you should now have mastered:
InfluxDB Fundamentals: Understanding core concepts of time-series databases
Data Collection Mechanisms: Understanding various system metric collection methods
High-Performance Processing: Mastering optimization techniques like named pipes and batch sending
Data Querying: Learning to use Flux language for data retrieval
Performance Optimization: Understanding various tuning strategies and best practices
Benchmon's InfluxDB integration provides a high-performance, scalable monitoring solution that can meet various needs from real-time monitoring to large-scale data analysis.