"""This file contains base class to monitor CPU metrics"""
import os
import logging
import time
import copy
import multiprocessing
import psutil
from perfmon.schemas.metrics import cpu_data_schema
from perfmon.core.metrics.cpumetrics.usage import get_cpu_time
from perfmon.core.metrics.cpumetrics.usage import get_cpu_percent
from perfmon.core.metrics.cpumetrics.network import network_io_counters
from perfmon.core.metrics.cpumetrics.network import ib_io_counters
from perfmon.core.metrics.cpumetrics.memory import memory_usage
from perfmon.core.metrics.cpumetrics.energy import rapl_energy_readings
from perfmon.core.metrics.cpumetrics.misc import misc_metrics
from perfmon.core.metrics.common import dump_metrics_async
from perfmon.core.metrics.common import get_child_procs
from perfmon.common.utils.json_wrappers import dump_json
from perfmon.common.utils.devices import ibstat_ports
from perfmon.common.utils.devices import get_rapl_devices
from perfmon.common.utils.process import proc_if_running
from perfmon.common.perf import get_mem_bw_event
_log = logging.getLogger(__name__)
# pylint: disable=E0401,W0201,C0301
class MonitorCpuUsage(multiprocessing.Process):
    """Engine to monitor CPU related metrics.

    Runs as a separate process: samples CPU, network, memory, energy and
    miscellaneous metrics for the monitored PIDs at a fixed frequency and
    periodically dumps the accumulated data to a JSON file.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize setup.

        Args:
            config (dict): monitoring configuration. Keys read here and in
                ``run``: 'check_point', 'temp_path', 'job_id', 'host_name',
                'sampling_freq', 'pid', 'ipc_file'.
        """
        # must call this before anything else
        multiprocessing.Process.__init__(self)
        # Shallow copy so later mutations do not leak back to the caller
        self.config = config.copy()
        # Every `check_point` seconds the data is dumped into file
        self.check_point_time = self.config['check_point']
        # NOTE(review): raises KeyError when USER is unset (e.g. some
        # cron/daemon environments) — confirm that is acceptable
        self.user = os.environ['USER']
        self._extra = {}
        # A non zero interval is needed to get process cpu percent
        self.cpu_aggregation_interval = 0.5
        # Infiniband ports (populated by check_availability_ib_rapl_membw)
        self.ib_ports = {}
        # Child processes PIDs
        self.all_child_pids = []
        # Name of the output file: <job_id>_<host_name>.json
        self.outfile = os.path.join(
            self.config['temp_path']['cpu_metrics'],
            '_'.join([str(self.config['job_id']), self.config['host_name']]) + '.json',
        )

    def initialise_cpu_metrics_params(self):
        """(Re-)initialise the CPU metrics dict from the base schema."""
        # Deepcopy is required: the schema's nested lists would otherwise be
        # shared across re-initialisations and keep growing after each dump
        self.cpu_data = {
            'host_name': self.config['host_name'],
            'sampling_frequency': self.config['sampling_freq'],
            **copy.deepcopy(cpu_data_schema),
        }
        # Add additionally found IB, RAPL, BW metrics to dict as keys to store values
        self.add_metrics_cpu_parameters()

    def add_ib_counters_to_dict(self):
        """Add Infiniband traffic counter series to the metrics dict."""
        self.cpu_data = {
            **self.cpu_data,
            'ib_io_counters': {
                'port_xmit_data': [],
                'port_xmit_packets': [],
                'port_rcv_data': [],
                'port_rcv_packets': [],
            },
        }

    def add_mem_bw_to_dict(self):
        """Add memory bandwidth series to the metrics dict."""
        self.cpu_data = {
            **self.cpu_data,
            'memory_bw': [],
        }

    def add_rapl_domains_to_dict(self):
        """Add one empty series per detected RAPL domain to the metrics dict."""
        self.cpu_data = {
            **self.cpu_data,
            'rapl_powercap': {dom_name: [] for dom_name in self.rapl_devices},
        }

    def add_metrics_cpu_parameters(self):
        """Add optional metric keys (IB, RAPL, memory BW) when available."""
        # IB
        if self.ib_ports:
            self.add_ib_counters_to_dict()
        # RAPL
        if self.rapl_devices:
            self.add_rapl_domains_to_dict()
        # Memory BW
        if self.mem_bw_event:
            self.add_mem_bw_to_dict()

    def check_availability_ib_rapl_membw(self):
        """Probe for Infiniband, RAPL powercap and memory BW perf support."""
        # Check if infiniband is present
        self.ib_ports = ibstat_ports()
        if self.ib_ports:
            _log.info('Infiniband support detected. Collecting IB port metrics.')
        # Check if RAPL powercap metrics are accessible
        self.rapl_devices = get_rapl_devices()
        if self.rapl_devices:
            _log.info('RAPL powercap interface is accessible. Collecting energy metrics.')
        # Check if we can get LLC cache miss to estimate memory BW
        self.mem_bw_event = get_mem_bw_event()
        if self.mem_bw_event:
            _log.info('Perf event found for monitoring memory bandwidth.')

    def get_misc_metrics(self):
        """Collect IO counters, file descriptor and thread counts."""
        self.cpu_data = misc_metrics(self.all_procs, self.cpu_data)

    def get_energy_metrics(self):
        """Collect energy metrics from the RAPL powercap interface."""
        self.cpu_data = rapl_energy_readings(self.rapl_devices, self.cpu_data)

    def get_memory_usage(self):
        """Collect memory usage (and memory BW when a perf event exists)."""
        self.cpu_data = memory_usage(self.mem_bw_event, self.all_procs, self.cpu_data)

    def get_network_traffic(self):
        """Collect network traffic from TCP and Infiniband (if supported)."""
        # System wide net IO counters
        self.cpu_data = network_io_counters(self.cpu_data)
        # Infiniband IO counters
        if self.ib_ports:
            self.cpu_data = ib_io_counters(self.ib_ports, self.cpu_data)

    def get_cpu_usage(self):
        """Collect CPU time and CPU percent statistics."""
        # Summing parent and children cpu_times() directly was found to give
        # incorrect totals, hence the dedicated helper over all procs
        self.cpu_data['cpu_time'].append(get_cpu_time(self.all_procs))
        # Get CPU percent for the process and its children
        self.cpu_data['cpu_percent'].append(
            get_cpu_percent(self.cpu_aggregation_interval, self.all_procs)
        )
        # System wide CPU percent
        self.cpu_data['cpu_percent_sys'].append(psutil.cpu_percent())

    def get_metrics_data(self):
        """Extract one sample of every metric group."""
        # CPU usage statistics
        self.get_cpu_usage()
        # Network traffic counters
        self.get_network_traffic()
        # Memory usage statistics
        self.get_memory_usage()
        # Energy usage statistics
        self.get_energy_metrics()
        # Misc statistics
        self.get_misc_metrics()

    def add_timestamp(self):
        """Append the current timestamp (whole seconds) to the data."""
        # int() is enough: we are not doing fine grained monitoring here
        self.cpu_data['time_stamps'].append(int(time.time()))

    def dump_metrics(self):
        """Dump metrics to JSON file and re-initiate the cpu_data dict."""
        # Deepcopy so the async writer sees a frozen snapshot while the
        # dict is re-initialised below
        dump_metrics_async(copy.deepcopy(self.cpu_data), self.outfile)
        # Re-initialise CPU metric parameters
        self.initialise_cpu_metrics_params()

    def _job_in_progress(self):
        """Return True while the IPC file reports the job as INPROGRESS."""
        # Context manager closes the descriptor on every poll; the previous
        # bare open() in the loop condition leaked one fd per iteration
        with open(self.config['ipc_file'], 'r') as ipc_file:
            return ipc_file.read().rstrip() == 'INPROGRESS'

    def run(self):
        """Extract the CPU related metrics for the configured PIDs."""
        _log.info('Collection of CPU metrics has started')
        sampling_freq = self.config['sampling_freq']
        _log.debug('Current sampling frequency is %d', sampling_freq)
        # Number of steps between check-point dumps. max(1, ...) guards
        # against a zero step (check_point_time < sampling_freq) which would
        # crash the modulo check below with ZeroDivisionError
        check_point_step = max(1, int(self.check_point_time / sampling_freq))
        # Step count
        i_step = 0
        # Get process information
        self.procs = [psutil.Process(p) for p in self.config['pid']]
        # Check if infiniband, RAPL and memory BW is available to monitor
        self.check_availability_ib_rapl_membw()
        # Initialise CPU metric parameters
        self.initialise_cpu_metrics_params()
        while proc_if_running(self.procs) and self._job_in_progress():
            try:
                # Start measuring time taken for getting metrics
                start_monitor = time.time()
                # Add current timestamp to the list of timestamps
                self.add_timestamp()
                # Add children to procs list
                self.all_procs = get_child_procs(self.user, self.procs)
                # Get metrics data
                self.get_metrics_data()
                # Dump metrics if check point is reached (also fires on the
                # very first iteration since 0 % step == 0)
                if i_step % check_point_step == 0:
                    self.dump_metrics()
                # Subtract collection time from the sleep to keep the
                # effective sampling frequency fairly constant; clamp at 0
                # so a slow collection never raises ValueError
                collection_time = time.time() - start_monitor
                time.sleep(max(0.0, sampling_freq - collection_time))
                i_step += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # A monitored pid terminated (or became inaccessible):
                # dump what we have to disk and stop
                dump_json(self.cpu_data, self.outfile)
                return
        # Dump metrics to the disk
        dump_json(self.cpu_data, self.outfile)