Source code for perfmon.core.metrics.cpu

"""This file contains base class to monitor CPU metrics"""

import os
import logging
import time
import copy
import multiprocessing
import psutil

from perfmon.schemas.metrics import cpu_data_schema
from perfmon.core.metrics.cpumetrics.usage import get_cpu_time
from perfmon.core.metrics.cpumetrics.usage import get_cpu_percent
from perfmon.core.metrics.cpumetrics.network import network_io_counters
from perfmon.core.metrics.cpumetrics.network import ib_io_counters
from perfmon.core.metrics.cpumetrics.memory import memory_usage
from perfmon.core.metrics.cpumetrics.energy import rapl_energy_readings
from perfmon.core.metrics.cpumetrics.misc import misc_metrics
from perfmon.core.metrics.common import dump_metrics_async
from perfmon.core.metrics.common import get_child_procs
from perfmon.common.utils.json_wrappers import dump_json
from perfmon.common.utils.devices import ibstat_ports
from perfmon.common.utils.devices import get_rapl_devices
from perfmon.common.utils.process import proc_if_running
from perfmon.common.perf import get_mem_bw_event

_log = logging.getLogger(__name__)

# pylint: disable=E0401,W0201,C0301


class MonitorCpuUsage(multiprocessing.Process):
    """Engine to monitor CPU related metrics"""

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize setup"""
        # must call this before anything else
        multiprocessing.Process.__init__(self)
        self.config = config.copy()
        # Every `check_point` seconds (900 s) the data is dumped into a file
        self.check_point_time = self.config['check_point']
        self.user = os.environ['USER']
        self._extra = {}
        # A non-zero interval is needed to get the process CPU percent
        self.cpu_aggregation_interval = 0.5
        # Infiniband ports
        self.ib_ports = {}
        # Child process PIDs
        self.all_child_pids = []
        # Name of the output file
        self.outfile = os.path.join(
            self.config['temp_path']['cpu_metrics'],
            '_'.join([str(self.config['job_id']), self.config['host_name']]) + '.json',
        )

    def initialise_cpu_metrics_params(self):
        """This method initialises the CPU metric related parameters"""
        # Schema for the cpu data
        self.cpu_data = {
            'host_name': self.config['host_name'],
            'sampling_frequency': self.config['sampling_freq'],
            **copy.deepcopy(cpu_data_schema),  # Need deepcopy for re-init
        }
        # Add additionally found IB, RAPL and memory BW metrics to the dict as keys to store values
        self.add_metrics_cpu_parameters()

    def add_ib_counters_to_dict(self):
        """Add IB counters to base dict"""
        self.cpu_data = {
            **self.cpu_data,
            'ib_io_counters': {
                'port_xmit_data': [],
                'port_xmit_packets': [],
                'port_rcv_data': [],
                'port_rcv_packets': [],
            },
        }

    def add_mem_bw_to_dict(self):
        """Add memory bandwidth to base dict"""
        self.cpu_data = {
            **self.cpu_data,
            'memory_bw': [],
        }

    def add_rapl_domains_to_dict(self):
        """Add RAPL domain names to base dict"""
        rapl_domains = {dom_name: [] for dom_name in self.rapl_devices}
        self.cpu_data = {
            **self.cpu_data,
            'rapl_powercap': rapl_domains,
        }

    def add_metrics_cpu_parameters(self):
        """This method adds metrics key/value pairs to the cpu parameter dict"""
        # IB
        if self.ib_ports:
            self.add_ib_counters_to_dict()
        # RAPL
        if self.rapl_devices:
            self.add_rapl_domains_to_dict()
        # Memory BW
        if self.mem_bw_event:
            self.add_mem_bw_to_dict()

    def check_availability_ib_rapl_membw(self):
        """This method checks whether Infiniband, RAPL and memory BW metrics are available"""
        # Check if Infiniband is present
        self.ib_ports = ibstat_ports()
        if self.ib_ports:
            _log.info('Infiniband support detected. Collecting IB port metrics.')
        # Check if RAPL powercap metrics are accessible
        self.rapl_devices = get_rapl_devices()
        if self.rapl_devices:
            _log.info('RAPL powercap interface is accessible. Collecting energy metrics.')
        # Check if we can get LLC cache misses to estimate memory BW
        self.mem_bw_event = get_mem_bw_event()
        if self.mem_bw_event:
            _log.info('Perf event found for monitoring memory bandwidth.')

    def get_misc_metrics(self):
        """This method gets IO, file descriptor and thread counts"""
        # Get cumulative IO counters for the process and its children
        self.cpu_data = misc_metrics(self.all_procs, self.cpu_data)

    def get_energy_metrics(self):
        """This method gets energy metrics from the RAPL powercap interface"""
        self.cpu_data = rapl_energy_readings(self.rapl_devices, self.cpu_data)

    def get_memory_usage(self):
        """This method gets memory usage"""
        self.cpu_data = memory_usage(self.mem_bw_event, self.all_procs, self.cpu_data)

    def get_network_traffic(self):
        """Get network traffic from TCP and Infiniband (if supported)"""
        # System wide net IO counters
        self.cpu_data = network_io_counters(self.cpu_data)
        # Infiniband IO counters
        if self.ib_ports:
            self.cpu_data = ib_io_counters(self.ib_ports, self.cpu_data)

    def get_cpu_usage(self):
        """This method gets all CPU usage statistics"""
        # CPU times of parent and children: summing proc.cpu_times().user and
        # proc.cpu_times().children_user does not give correct CPU times, so the
        # get_cpu_time helper is used instead
        self.cpu_data['cpu_time'].append(get_cpu_time(self.all_procs))
        # Get CPU percent for the process and its children
        self.cpu_data['cpu_percent'].append(
            get_cpu_percent(self.cpu_aggregation_interval, self.all_procs)
        )
        # System wide CPU percent
        self.cpu_data['cpu_percent_sys'].append(psutil.cpu_percent())

    def get_metrics_data(self):
        """Extract metrics data"""
        # CPU usage statistics
        self.get_cpu_usage()
        # Network traffic counters
        self.get_network_traffic()
        # Memory usage statistics
        self.get_memory_usage()
        # Energy usage statistics
        self.get_energy_metrics()
        # Misc statistics
        self.get_misc_metrics()

    def add_timestamp(self):
        """This method adds a timestamp to the data"""
        # Get the time stamp and convert it to int; we are not looking at fine grained monitoring here
        self.cpu_data['time_stamps'].append(int(time.time()))

    def dump_metrics(self):
        """Dump metrics to the JSON file and re-initialise the cpu_data dict"""
        dump_metrics_async(copy.deepcopy(self.cpu_data), self.outfile)
        # Re-initialise CPU metric parameters
        self.initialise_cpu_metrics_params()

    def run(self):
        """This method extracts the CPU related metrics for the given PIDs"""
        _log.info('Collection of CPU metrics has started')
        sampling_freq = self.config['sampling_freq']
        _log.debug('Current sampling frequency is %d', sampling_freq)
        # Number of sampling steps between check points, i.e. between writes of the data to file
        check_point_step = int(self.check_point_time / sampling_freq)
        # Step count
        i_step = 0
        # Get process information
        self.procs = [psutil.Process(p) for p in self.config['pid']]
        # Check if Infiniband, RAPL and memory BW are available to monitor
        self.check_availability_ib_rapl_membw()
        # Initialise CPU metric parameters
        self.initialise_cpu_metrics_params()
        # Keep sampling while the monitored processes are running and the IPC file
        # still reports 'INPROGRESS'
        while (
            proc_if_running(self.procs)
            and open(self.config['ipc_file'], 'r').read().rstrip() == 'INPROGRESS'
        ):
            try:
                # Start measuring the time taken to collect the metrics
                start_monitor = time.time()
                # Add current timestamp to the list of timestamps
                self.add_timestamp()
                # Add children to the procs list
                self.all_procs = get_child_procs(self.user, self.procs)
                # Get metrics data
                self.get_metrics_data()
                # Dump metrics if a check point is reached
                if i_step % check_point_step == 0:
                    self.dump_metrics()
                # Total time elapsed collecting the metrics
                collection_time = time.time() - start_monitor
                # Sleep before collecting the next round. The time taken to collect the
                # metrics is subtracted from the sampling interval to keep the sampling
                # frequency fairly constant
                try:
                    time.sleep(sampling_freq - collection_time)
                except ValueError:
                    # when sampling_freq - collection_time < 0
                    pass
                i_step += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # If the given PIDs terminate, dump the collected metrics to the disk
                dump_json(self.cpu_data, self.outfile)
                return
        # Dump metrics to the disk
        dump_json(self.cpu_data, self.outfile)
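
A minimal usage sketch (not part of the module): the config keys below are inferred from
how MonitorCpuUsage reads self.config above; the concrete values, paths and the code that
normally assembles this dict are hypothetical, since in perfmon the configuration is built
elsewhere.

    config = {
        'job_id': 12345,                        # hypothetical job id
        'host_name': 'node001',                 # hypothetical host name
        'sampling_freq': 5,                     # seconds between samples
        'check_point': 900,                     # seconds between check-point dumps
        'pid': [os.getpid()],                   # PIDs of the processes to monitor
        'ipc_file': '/tmp/perfmon_state',       # file holding 'INPROGRESS' while the job runs
        'temp_path': {'cpu_metrics': '/tmp'},   # directory for the JSON output file
    }

    monitor = MonitorCpuUsage(config)
    monitor.start()   # executes MonitorCpuUsage.run() in a separate process
    # ... run the monitored workload ...
    monitor.join()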