"""This file initiates the script to extract real time cpu metrics"""
import os
import logging
import time
import copy
import multiprocessing
import threading
import psutil
from monitormetrics.utils.utils import execute_cmd, get_parser, ibstat_ports, \
get_rapl_devices, get_mem_bw_event, dump_json, proc_if_running
from monitormetrics._version import __version__
_log = logging.LoggerAdapter(logging.getLogger(__name__),
{'version': __version__})
# Root directory of the Infiniband sysfs API
# NOTE(review): name reads like a typo of IB_API_DIR — confirm before renaming
IB_API_AIR = "/sys/class/infiniband"
# Cache line size in bytes; used to convert LLC miss counts to bytes moved
CACHE_LINE = 64.0
# Sampling window (seconds) for the perf memory-bandwidth measurement
MEM_BW_MEASURE_TIME = 2
class MonitorCPUMetrics(multiprocessing.Process):
    """Engine to monitor cpu related metrics"""
    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize the monitoring process from the job config dict."""
        # must call this before anything else
        multiprocessing.Process.__init__(self)
        self.config = config.copy()
        # Interval (seconds) at which collected data is dumped to file
        self.check_point_time = self.config['check_point']
        self.user = os.environ['USER']
        self._extra = {}
        # psutil needs a non-zero interval to compute a process CPU percent
        self.cpu_aggregation_interval = 0.5
        # Infiniband ports discovered at runtime (device name -> port)
        self.ib_ports = {}
        # PIDs of child processes already seen
        self.all_child_pids = []
        # Output JSON file: <temp_path>/<job_id>_<host_name>.json
        file_name = "{}_{}.json".format(self.config['job_id'],
                                        self.config['host_name'])
        self.outfile = os.path.join(self.config['temp_path']['cpu_metrics'],
                                    file_name)
[docs] def initialise_cpu_metrics_params(self):
"""This method initialises the CPU metric related parameters"""
# Schema for the cpu data
self.cpu_data = {
'host_name': self.config['host_name'],
'sampling_frequency': self.config['sampling_freq'],
# 'parent_proc_md': [],
# 'child_proc_md': [],
'time_stamps': [],
'num_threads': [],
'cpu_time': [],
'cpu_percent': [],
'cpu_percent_sys': [],
'memory_percent': [],
'memory_info': {
'rss': [],
'vms': [],
'shared': []
},
'memory_full_info': {
'uss': [],
'swap': []
},
'io_counters': {
'read_count': [],
'write_count': [],
'read_bytes': [],
'write_bytes': []
},
'net_io_counters': {
'bytes_sent': [],
'bytes_recv': [],
'packets_sent': [],
'packets_recv': []
},
'num_fds': []
}
# Add additionally found IB, RAPL, BW metrics to dict as keys to store values
self.add_metrics_cpu_parameters()
[docs] def add_ib_counters_to_dict(self):
"""Add IB counters to base dict"""
self.cpu_data = {
**self.cpu_data,
'ib_io_counters': {
'port_xmit_data': [],
'port_xmit_packets': [],
'port_rcv_data': [],
'port_rcv_packets': []
},
}
[docs] def add_mem_bw_to_dict(self):
"""Add memory bandwidth to base dict"""
self.cpu_data = {
**self.cpu_data,
'memory_bw': [],
}
[docs] def add_rapl_domains_to_dict(self):
"""Add RAPL domain names to base dict"""
rapl_domains = {}
for dom_name, _ in self.rapl_devices.items():
rapl_domains[dom_name] = []
self.cpu_data = {
**self.cpu_data,
'rapl_powercap': rapl_domains,
}
[docs] def add_metrics_cpu_parameters(self):
"""This method adds metrics key/value pair in cpu parameter dict"""
# IB
if self.ib_ports:
self.add_ib_counters_to_dict()
# RAPL
if self.rapl_devices:
self.add_rapl_domains_to_dict()
# Memory BW
if self.mem_bw_event:
self.add_mem_bw_to_dict()
[docs] def check_availability_ib_rapl_membw(self):
"""This method checks if infiniband and RAPL metrics are available"""
# Check if infiniband is present
self.ib_ports = ibstat_ports()
if self.ib_ports:
_log.info("Infiniband support detected. Collecting IB port metrics.")
# Check if RAPL powercap metrics are accessible
self.rapl_devices = get_rapl_devices()
if self.rapl_devices:
_log.info("RAPL powercap interface is accessible. Collecting energy metrics.")
# Check if we can get LLC cache miss to estimate memory BW
self.mem_bw_event = get_mem_bw_event()
if self.mem_bw_event:
_log.info("Perf event found for monitoring memory bandwidth.")
[docs] def check_metric_data(self):
"""This method checks if all the metric data is consistent with number of timestamps"""
# Get number of timestamps added into the data struct
ntime_stamps = len(self.cpu_data['time_stamps'])
# Loop over all the metrics
for _, data in self.cpu_data.items():
if isinstance(data, dict):
for _, values in data.items():
# We are adding the last monitored value as missing metric. Should not be an
# issue as it is end of the execution.
if values and len(values) < ntime_stamps:
values += [values[-1] for _ in range(ntime_stamps - len(values))]
elif isinstance(data, list):
if data and len(data) < ntime_stamps:
data += [data[-1] for _ in range(ntime_stamps - len(data))]
# def get_process_meta_data(self, child_proc, process_kind):
# """This method gets the meta data of a process"""
#
# if process_kind == "parent":
# proc = self.proc
# elif process_kind == "child":
# proc = child_proc
#
# # A dict with all the meta data information of the process
# proc_md = {
# 'pid': proc.pid,
# 'name': proc.name(),
# 'exe': proc.exe(),
# 'environ': proc.environ(),
# 'command': proc.cmdline(),
# 'create_time': proc.create_time(),
# 'nice': proc.nice(),
# 'ionice': {
# 'class': proc.ionice()[0],
# 'value': proc.ionice()[1]
# }
# }
#
# # Include env info only for parent. For children processes, it will be same and we avoid
# # duplication
# if process_kind == "parent":
# self.cpu_data['parent_proc_md'] = proc_md
# elif process_kind == "child":
# proc_md.pop('environ', None)
# self.cpu_data['child_proc_md'].append(proc_md)
#
# return proc_md
# def add_child_process_meta_data(self):
# """This method adds meta data of child processes if not already present"""
#
# # Add child process' meta data info
# for proc in self.proc:
# for child_proc in proc.children(recursive=True):
# if child_proc.pid not in self.all_child_pids and child_proc.username() == self.user:
# self.all_child_pids.append(child_proc.pid)
# self.get_process_meta_data(child_proc, "child")
[docs] def get_cpu_time_from_parent_and_childs(self):
"""This method gets cumulative CPU time from parent and its childs"""
# CPU Time of the parent process
cpu_time_pp = 0
for proc in self.procs:
cpu_time_pp += proc.cpu_times().user
for child_proc in proc.children(recursive=True):
if child_proc.username() == self.user:
# Add CPU times of childs to parent
cpu_time_pp += child_proc.cpu_times().user
# Finally add CPU times to data dict
self.cpu_data['cpu_time'].append(cpu_time_pp)
[docs] def get_cpu_percent(self):
"""This method gives CPU percent of parent and its childs"""
# Get CPU percent for the process and its childs
proc_cpu = 0
for proc in self.procs:
proc_cpu += proc.cpu_percent(interval=self.cpu_aggregation_interval)
for child_proc in proc.children(recursive=True):
if child_proc.username() == self.user:
proc_cpu += child_proc.cpu_percent(interval=self.cpu_aggregation_interval)
self.cpu_data['cpu_percent'].append(proc_cpu)
[docs] def get_sys_wide_net_io_counters(self):
"""This method gets the system wide network IO counters"""
# Get system wide network IO counters.
for key, value in self.cpu_data['net_io_counters'].items():
value.append(getattr(psutil.net_io_counters(pernic=False, nowrap=True), key))
[docs] def get_ib_io_counters(self):
"""This method gets the Infiniband port counters"""
# Get Infiniband IO counters. This includes RoCE as well on p3-alaska
for key, value in self.cpu_data['ib_io_counters'].items():
cum_value = 0
for name, port in self.ib_ports.items():
counter_path = os.path.join(IB_API_AIR, name, "ports", port, "counters", key)
if os.path.isfile(counter_path):
cmd_str = " ".join(["cat", counter_path])
cmd_out = execute_cmd(cmd_str)
try:
# Data port counters indicate octets divided by 4 rather than just octets.
counter_value = int(cmd_out)
except ValueError:
counter_value = 0
# Multiply by four to calculate bytes
cum_value += counter_value * 4
value.append(cum_value)
[docs] def get_cumulative_metric_value(self, metric_type):
"""This method gets cumulative metric account for all childs for a given metric type"""
# Check if metric_type is dict or list
if isinstance(self.cpu_data[metric_type], dict):
metric_value = 0
for key, value in self.cpu_data[metric_type].items():
for proc in self.procs:
obj_attr = getattr(proc, metric_type)()
metric_value += getattr(obj_attr, key)
for child_proc in proc.children(recursive=True):
if child_proc.username() == self.user:
metric_value += getattr(getattr(child_proc, metric_type)(), key)
value.append(metric_value)
elif isinstance(self.cpu_data[metric_type], list):
metric_value = 0
for proc in self.procs:
metric_value += getattr(proc, metric_type)()
for child_proc in proc.children(recursive=True):
if child_proc.username() == self.user:
metric_value += getattr(child_proc, metric_type)()
self.cpu_data[metric_type].append(metric_value)
[docs] def get_memory_bandwidth(self):
"""This method returns memory bandwidth based on perf LLC load misses event"""
# Get all child processes
pids = self.config['pid'] + [child_proc.pid for p in self.procs
for child_proc in p.children(recursive=True)
if child_proc.username() == self.user]
# Command string to probe for LLC load misses
cmd_str = "perf stat -e {} -p {} sleep {}".format(self.mem_bw_event,
",".join([str(pid) for pid in pids]),
MEM_BW_MEASURE_TIME)
# Execute command
cmd_out = execute_cmd(cmd_str)
parse_perf_out = get_parser(cmd_out, reg="perf")
# Parse the value of LLC Local miss
llc_local_miss_string = parse_perf_out("LLC_MISS_LOCAL")
# Parse the value of LLC remote miss
llc_remote_miss_string = parse_perf_out("LLC_MISS_REMOTE")
# Convert load misses to memory bandwidth
try:
llc_local_miss_data = int(llc_local_miss_string.rstrip().replace(",", ""))
llc_remote_miss_data = int(llc_remote_miss_string.rstrip().replace(",", ""))
mem_bw = (llc_local_miss_data + llc_remote_miss_data) * CACHE_LINE \
/ MEM_BW_MEASURE_TIME
except ValueError:
mem_bw = 0
self.cpu_data['memory_bw'].append(mem_bw)
[docs] def get_misc_metrics(self):
"""This method gets IO, file descriptors and thread count"""
# Get cumulative IO counters for the process and its childs
self.get_cumulative_metric_value('io_counters')
# Get cumulative number of threads for the process and its childs
self.get_cumulative_metric_value('num_threads')
# Get cumulative number of file descriptors for the process and its childs
self.get_cumulative_metric_value('num_fds')
[docs] def get_energy_metrics(self):
"""This method gets energy metrics from RAPL powercap interface"""
# Get package/domain names and paths to find energy
for dom_name, engy_path in self.rapl_devices.items():
cmd_str = " ".join(["cat", engy_path])
cmd_out = execute_cmd(cmd_str)
try:
# Units are micro Joules. Convert them to Joules in post processing
engy_value = int(cmd_out)
except ValueError:
engy_value = 0
if dom_name not in self.cpu_data['rapl_powercap'].keys():
self.cpu_data['rapl_powercap'][dom_name] = []
self.cpu_data['rapl_powercap'][dom_name].append(engy_value)
[docs] def get_memory_usage(self):
"""This method gets memory usage"""
# Get cumulative memory info for the process and its childs
self.get_cumulative_metric_value('memory_info')
# Get cumulative full memory info (uss and shared) for the process and its childs
self.get_cumulative_metric_value('memory_full_info')
# Get cumulative memory percent for the process and its childs
self.get_cumulative_metric_value('memory_percent')
# Get bandwidth info if perf event is found
if self.mem_bw_event:
self.get_memory_bandwidth()
[docs] def get_network_traffic(self):
"""Get network traffic from TCP and Infiniband (if supported)"""
# System wide net IO counters
self.get_sys_wide_net_io_counters()
# Infiniband Io counters
if self.ib_ports:
self.get_ib_io_counters()
[docs] def get_cpu_usage(self):
"""This method gets all CPU usage statistics"""
# Add child process' meta data info
# self.add_child_process_meta_data()
# Add CPU times of parent and children (proc.cpu_times().user + proc.cpu_times(
# ).children_user) is not giving correct CPU times
self.get_cpu_time_from_parent_and_childs()
# Get CPU percent for the process and its children
self.get_cpu_percent()
# System wide CPU percent
self.cpu_data['cpu_percent_sys'].append(psutil.cpu_percent())
[docs] def get_metrics_data(self):
"""Extract metrics data"""
# CPU usage statistics
self.get_cpu_usage()
# Network traffic counters
self.get_network_traffic()
# Memory usage statistics
self.get_memory_usage()
# Energy usage statistics
self.get_energy_metrics()
# Misc statistics
self.get_misc_metrics()
[docs] def add_timestamp(self):
"""This method adds timestamp to the data"""
# Get time stamp and convert it to int. We are not looking at fine grained monitoring here
self.cpu_data['time_stamps'].append(int(time.time()))
[docs] def dump_metrics(self):
"""Dump metrics to JSON file and re-initiate cpu_metrics dict"""
# Append data to existing JSON file or create a new file if it doesnt exist
# Start a new thread async to dump data to the file
dump_metrics_thread = threading.Thread(target=dump_json, args=(copy.deepcopy(self.cpu_data),
self.outfile))
dump_metrics_thread.start()
# Re-initialise CPU metric parameters
self.initialise_cpu_metrics_params()
[docs] def run(self):
"""This method extracts the cpu related metrics for a given pid"""
_log.info("Collection of CPU metrics has started")
sampling_freq = self.config['sampling_freq']
_log.debug("Current sampling frequency is %d", sampling_freq)
# Number of steps before writing the data to file
# To be implemented in the future
check_point_step = int(self.check_point_time / sampling_freq)
# Step count
i_step = 0
# Get process information
self.procs = [psutil.Process(p) for p in self.config['pid']]
# Add parent process meta data
# self.get_process_meta_data("", "parent")
# Check if infiniband, RAPL and memory BW is available to monitor
self.check_availability_ib_rapl_membw()
# Initialise CPU metric parameters
self.initialise_cpu_metrics_params()
while proc_if_running(self.procs) and \
open(self.config['ipc_file'], "r").read().rstrip() == "INPROGRESS":
try:
# Start measuring time taken for getting metrics
start_monitor = time.time()
# Add current timestamp to the list of timestamps
self.add_timestamp()
# Get metrics data
self.get_metrics_data()
# Dump metrics if check point is reached
if i_step % check_point_step == 0:
self.dump_metrics()
# Get total time elapsed to get metrics
collection_time = time.time() - start_monitor
# Sleep for given sampling frequency before collecting for next round
# Here we remove the time taken to collect metrics from sampling frequency to
# keep frequency fairly constant
try:
time.sleep(sampling_freq - collection_time)
except ValueError: # when sampling_freq - collection_time < 0
pass
i_step += 1
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Check if all metrics have data for added timestamps
self.check_metric_data()
# If the given pid terminates dump metrics to the disk
# write_json(self.cpu_data, self.outfile)
dump_json(self.cpu_data, self.outfile)
return
# Dump metrics to the disk
dump_json(self.cpu_data, self.outfile)