# Source code for monitormetrics.cpumetrics.cpumetrics

"""This file initiates the script to extract real time cpu metrics"""

import os
import logging
import time
import copy
import multiprocessing
import threading
import psutil


from monitormetrics.utils.utils import execute_cmd, get_parser, ibstat_ports, \
    get_rapl_devices, get_mem_bw_event, dump_json, proc_if_running
from monitormetrics._version import __version__

_log = logging.LoggerAdapter(logging.getLogger(__name__),
                             {'version': __version__})

# Infiniband metrics API directory
IB_API_AIR = "/sys/class/infiniband"

# Cache line - 64 bytes
CACHE_LINE = 64.0

# Measuring time for memory bw event
MEM_BW_MEASURE_TIME = 2


class MonitorCPUMetrics(multiprocessing.Process):
    """Engine to monitor cpu related metrics"""

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize setup

        Args:
            config (dict): monitor configuration; must provide 'check_point',
                'temp_path' (with a 'cpu_metrics' directory), 'job_id' and
                'host_name'.
        """
        # must call this before anything else
        multiprocessing.Process.__init__(self)

        # Private copy so later mutations of the caller's dict do not leak in
        self.config = config.copy()
        # Interval (seconds) between data dumps to file
        self.check_point_time = self.config['check_point']
        self.user = os.environ['USER']
        self._extra = {}
        # A non zero interval is needed to get process cpu percent
        self.cpu_aggregation_interval = 0.5
        # Infiniband ports discovered at run time
        self.ib_ports = {}
        # PIDs of all child processes seen so far
        self.all_child_pids = []
        # Output file: <temp_path>/<job_id>_<host_name>.json
        out_name = "{}_{}.json".format(self.config['job_id'], self.config['host_name'])
        self.outfile = os.path.join(self.config['temp_path']['cpu_metrics'], out_name)
[docs] def initialise_cpu_metrics_params(self): """This method initialises the CPU metric related parameters""" # Schema for the cpu data self.cpu_data = { 'host_name': self.config['host_name'], 'sampling_frequency': self.config['sampling_freq'], # 'parent_proc_md': [], # 'child_proc_md': [], 'time_stamps': [], 'num_threads': [], 'cpu_time': [], 'cpu_percent': [], 'cpu_percent_sys': [], 'memory_percent': [], 'memory_info': { 'rss': [], 'vms': [], 'shared': [] }, 'memory_full_info': { 'uss': [], 'swap': [] }, 'io_counters': { 'read_count': [], 'write_count': [], 'read_bytes': [], 'write_bytes': [] }, 'net_io_counters': { 'bytes_sent': [], 'bytes_recv': [], 'packets_sent': [], 'packets_recv': [] }, 'num_fds': [] } # Add additionally found IB, RAPL, BW metrics to dict as keys to store values self.add_metrics_cpu_parameters()
[docs] def add_ib_counters_to_dict(self): """Add IB counters to base dict""" self.cpu_data = { **self.cpu_data, 'ib_io_counters': { 'port_xmit_data': [], 'port_xmit_packets': [], 'port_rcv_data': [], 'port_rcv_packets': [] }, }
[docs] def add_mem_bw_to_dict(self): """Add memory bandwidth to base dict""" self.cpu_data = { **self.cpu_data, 'memory_bw': [], }
[docs] def add_rapl_domains_to_dict(self): """Add RAPL domain names to base dict""" rapl_domains = {} for dom_name, _ in self.rapl_devices.items(): rapl_domains[dom_name] = [] self.cpu_data = { **self.cpu_data, 'rapl_powercap': rapl_domains, }
[docs] def add_metrics_cpu_parameters(self): """This method adds metrics key/value pair in cpu parameter dict""" # IB if self.ib_ports: self.add_ib_counters_to_dict() # RAPL if self.rapl_devices: self.add_rapl_domains_to_dict() # Memory BW if self.mem_bw_event: self.add_mem_bw_to_dict()
[docs] def check_availability_ib_rapl_membw(self): """This method checks if infiniband and RAPL metrics are available""" # Check if infiniband is present self.ib_ports = ibstat_ports() if self.ib_ports: _log.info("Infiniband support detected. Collecting IB port metrics.") # Check if RAPL powercap metrics are accessible self.rapl_devices = get_rapl_devices() if self.rapl_devices: _log.info("RAPL powercap interface is accessible. Collecting energy metrics.") # Check if we can get LLC cache miss to estimate memory BW self.mem_bw_event = get_mem_bw_event() if self.mem_bw_event: _log.info("Perf event found for monitoring memory bandwidth.")
[docs] def check_metric_data(self): """This method checks if all the metric data is consistent with number of timestamps""" # Get number of timestamps added into the data struct ntime_stamps = len(self.cpu_data['time_stamps']) # Loop over all the metrics for _, data in self.cpu_data.items(): if isinstance(data, dict): for _, values in data.items(): # We are adding the last monitored value as missing metric. Should not be an # issue as it is end of the execution. if values and len(values) < ntime_stamps: values += [values[-1] for _ in range(ntime_stamps - len(values))] elif isinstance(data, list): if data and len(data) < ntime_stamps: data += [data[-1] for _ in range(ntime_stamps - len(data))]
# def get_process_meta_data(self, child_proc, process_kind): # """This method gets the meta data of a process""" # # if process_kind == "parent": # proc = self.proc # elif process_kind == "child": # proc = child_proc # # # A dict with all the meta data information of the process # proc_md = { # 'pid': proc.pid, # 'name': proc.name(), # 'exe': proc.exe(), # 'environ': proc.environ(), # 'command': proc.cmdline(), # 'create_time': proc.create_time(), # 'nice': proc.nice(), # 'ionice': { # 'class': proc.ionice()[0], # 'value': proc.ionice()[1] # } # } # # # Include env info only for parent. For children processes, it will be same and we avoid # # duplication # if process_kind == "parent": # self.cpu_data['parent_proc_md'] = proc_md # elif process_kind == "child": # proc_md.pop('environ', None) # self.cpu_data['child_proc_md'].append(proc_md) # # return proc_md # def add_child_process_meta_data(self): # """This method adds meta data of child processes if not already present""" # # # Add child process' meta data info # for proc in self.proc: # for child_proc in proc.children(recursive=True): # if child_proc.pid not in self.all_child_pids and child_proc.username() == self.user: # self.all_child_pids.append(child_proc.pid) # self.get_process_meta_data(child_proc, "child")
[docs] def get_cpu_time_from_parent_and_childs(self): """This method gets cumulative CPU time from parent and its childs""" # CPU Time of the parent process cpu_time_pp = 0 for proc in self.procs: cpu_time_pp += proc.cpu_times().user for child_proc in proc.children(recursive=True): if child_proc.username() == self.user: # Add CPU times of childs to parent cpu_time_pp += child_proc.cpu_times().user # Finally add CPU times to data dict self.cpu_data['cpu_time'].append(cpu_time_pp)
[docs] def get_cpu_percent(self): """This method gives CPU percent of parent and its childs""" # Get CPU percent for the process and its childs proc_cpu = 0 for proc in self.procs: proc_cpu += proc.cpu_percent(interval=self.cpu_aggregation_interval) for child_proc in proc.children(recursive=True): if child_proc.username() == self.user: proc_cpu += child_proc.cpu_percent(interval=self.cpu_aggregation_interval) self.cpu_data['cpu_percent'].append(proc_cpu)
[docs] def get_sys_wide_net_io_counters(self): """This method gets the system wide network IO counters""" # Get system wide network IO counters. for key, value in self.cpu_data['net_io_counters'].items(): value.append(getattr(psutil.net_io_counters(pernic=False, nowrap=True), key))
[docs] def get_ib_io_counters(self): """This method gets the Infiniband port counters""" # Get Infiniband IO counters. This includes RoCE as well on p3-alaska for key, value in self.cpu_data['ib_io_counters'].items(): cum_value = 0 for name, port in self.ib_ports.items(): counter_path = os.path.join(IB_API_AIR, name, "ports", port, "counters", key) if os.path.isfile(counter_path): cmd_str = " ".join(["cat", counter_path]) cmd_out = execute_cmd(cmd_str) try: # Data port counters indicate octets divided by 4 rather than just octets. counter_value = int(cmd_out) except ValueError: counter_value = 0 # Multiply by four to calculate bytes cum_value += counter_value * 4 value.append(cum_value)
[docs] def get_cumulative_metric_value(self, metric_type): """This method gets cumulative metric account for all childs for a given metric type""" # Check if metric_type is dict or list if isinstance(self.cpu_data[metric_type], dict): metric_value = 0 for key, value in self.cpu_data[metric_type].items(): for proc in self.procs: obj_attr = getattr(proc, metric_type)() metric_value += getattr(obj_attr, key) for child_proc in proc.children(recursive=True): if child_proc.username() == self.user: metric_value += getattr(getattr(child_proc, metric_type)(), key) value.append(metric_value) elif isinstance(self.cpu_data[metric_type], list): metric_value = 0 for proc in self.procs: metric_value += getattr(proc, metric_type)() for child_proc in proc.children(recursive=True): if child_proc.username() == self.user: metric_value += getattr(child_proc, metric_type)() self.cpu_data[metric_type].append(metric_value)
[docs] def get_memory_bandwidth(self): """This method returns memory bandwidth based on perf LLC load misses event""" # Get all child processes pids = self.config['pid'] + [child_proc.pid for p in self.procs for child_proc in p.children(recursive=True) if child_proc.username() == self.user] # Command string to probe for LLC load misses cmd_str = "perf stat -e {} -p {} sleep {}".format(self.mem_bw_event, ",".join([str(pid) for pid in pids]), MEM_BW_MEASURE_TIME) # Execute command cmd_out = execute_cmd(cmd_str) parse_perf_out = get_parser(cmd_out, reg="perf") # Parse the value of LLC Local miss llc_local_miss_string = parse_perf_out("LLC_MISS_LOCAL") # Parse the value of LLC remote miss llc_remote_miss_string = parse_perf_out("LLC_MISS_REMOTE") # Convert load misses to memory bandwidth try: llc_local_miss_data = int(llc_local_miss_string.rstrip().replace(",", "")) llc_remote_miss_data = int(llc_remote_miss_string.rstrip().replace(",", "")) mem_bw = (llc_local_miss_data + llc_remote_miss_data) * CACHE_LINE \ / MEM_BW_MEASURE_TIME except ValueError: mem_bw = 0 self.cpu_data['memory_bw'].append(mem_bw)
[docs] def get_misc_metrics(self): """This method gets IO, file descriptors and thread count""" # Get cumulative IO counters for the process and its childs self.get_cumulative_metric_value('io_counters') # Get cumulative number of threads for the process and its childs self.get_cumulative_metric_value('num_threads') # Get cumulative number of file descriptors for the process and its childs self.get_cumulative_metric_value('num_fds')
[docs] def get_energy_metrics(self): """This method gets energy metrics from RAPL powercap interface""" # Get package/domain names and paths to find energy for dom_name, engy_path in self.rapl_devices.items(): cmd_str = " ".join(["cat", engy_path]) cmd_out = execute_cmd(cmd_str) try: # Units are micro Joules. Convert them to Joules in post processing engy_value = int(cmd_out) except ValueError: engy_value = 0 if dom_name not in self.cpu_data['rapl_powercap'].keys(): self.cpu_data['rapl_powercap'][dom_name] = [] self.cpu_data['rapl_powercap'][dom_name].append(engy_value)
[docs] def get_memory_usage(self): """This method gets memory usage""" # Get cumulative memory info for the process and its childs self.get_cumulative_metric_value('memory_info') # Get cumulative full memory info (uss and shared) for the process and its childs self.get_cumulative_metric_value('memory_full_info') # Get cumulative memory percent for the process and its childs self.get_cumulative_metric_value('memory_percent') # Get bandwidth info if perf event is found if self.mem_bw_event: self.get_memory_bandwidth()
[docs] def get_network_traffic(self): """Get network traffic from TCP and Infiniband (if supported)""" # System wide net IO counters self.get_sys_wide_net_io_counters() # Infiniband Io counters if self.ib_ports: self.get_ib_io_counters()
[docs] def get_cpu_usage(self): """This method gets all CPU usage statistics""" # Add child process' meta data info # self.add_child_process_meta_data() # Add CPU times of parent and children (proc.cpu_times().user + proc.cpu_times( # ).children_user) is not giving correct CPU times self.get_cpu_time_from_parent_and_childs() # Get CPU percent for the process and its children self.get_cpu_percent() # System wide CPU percent self.cpu_data['cpu_percent_sys'].append(psutil.cpu_percent())
[docs] def get_metrics_data(self): """Extract metrics data""" # CPU usage statistics self.get_cpu_usage() # Network traffic counters self.get_network_traffic() # Memory usage statistics self.get_memory_usage() # Energy usage statistics self.get_energy_metrics() # Misc statistics self.get_misc_metrics()
[docs] def add_timestamp(self): """This method adds timestamp to the data""" # Get time stamp and convert it to int. We are not looking at fine grained monitoring here self.cpu_data['time_stamps'].append(int(time.time()))
[docs] def dump_metrics(self): """Dump metrics to JSON file and re-initiate cpu_metrics dict""" # Append data to existing JSON file or create a new file if it doesnt exist # Start a new thread async to dump data to the file dump_metrics_thread = threading.Thread(target=dump_json, args=(copy.deepcopy(self.cpu_data), self.outfile)) dump_metrics_thread.start() # Re-initialise CPU metric parameters self.initialise_cpu_metrics_params()
[docs] def run(self): """This method extracts the cpu related metrics for a given pid""" _log.info("Collection of CPU metrics has started") sampling_freq = self.config['sampling_freq'] _log.debug("Current sampling frequency is %d", sampling_freq) # Number of steps before writing the data to file # To be implemented in the future check_point_step = int(self.check_point_time / sampling_freq) # Step count i_step = 0 # Get process information self.procs = [psutil.Process(p) for p in self.config['pid']] # Add parent process meta data # self.get_process_meta_data("", "parent") # Check if infiniband, RAPL and memory BW is available to monitor self.check_availability_ib_rapl_membw() # Initialise CPU metric parameters self.initialise_cpu_metrics_params() while proc_if_running(self.procs) and \ open(self.config['ipc_file'], "r").read().rstrip() == "INPROGRESS": try: # Start measuring time taken for getting metrics start_monitor = time.time() # Add current timestamp to the list of timestamps self.add_timestamp() # Get metrics data self.get_metrics_data() # Dump metrics if check point is reached if i_step % check_point_step == 0: self.dump_metrics() # Get total time elapsed to get metrics collection_time = time.time() - start_monitor # Sleep for given sampling frequency before collecting for next round # Here we remove the time taken to collect metrics from sampling frequency to # keep frequency fairly constant try: time.sleep(sampling_freq - collection_time) except ValueError: # when sampling_freq - collection_time < 0 pass i_step += 1 except (psutil.NoSuchProcess, psutil.AccessDenied): # Check if all metrics have data for added timestamps self.check_metric_data() # If the given pid terminates dump metrics to the disk # write_json(self.cpu_data, self.outfile) dump_json(self.cpu_data, self.outfile) return # Dump metrics to the disk dump_json(self.cpu_data, self.outfile)