Source code for perfmon.core.metrics.perfcounters

"""This file contains base class to monitor perf stat metrics"""

import os
import re
import logging
import time
import copy
import multiprocessing
import psutil

from perfmon.core.metrics.common import dump_metrics_async
from perfmon.core.metrics.common import get_child_procs
from perfmon.common.utils.execute_cmd import execute_cmd_pipe
from perfmon.common.utils.json_wrappers import dump_json
from perfmon.common.utils.process import proc_if_running
from perfmon.common.perf import get_working_perf_events

from perfmon.exceptions import PerfEventListNotFoundError

_log = logging.getLogger(__name__)

# pylint: disable=E0401,W0201,C0301


class MonitorPerfCounters(multiprocessing.Process):
    """Engine to extract performance metrics"""

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize setup"""
        # must call this before anything else
        multiprocessing.Process.__init__(self)
        self.config = config.copy()
        # The data is dumped into a file every check_point seconds (typically 900 s)
        self.check_point_time = self.config['check_point']
        self.user = os.environ['USER']
        self._extra = {}
        # Initialise perf_data dict
        self.perf_data = {}
        # Name of the output file
        self.outfile = os.path.join(
            self.config['temp_path']['perf_metrics'],
            '_'.join([str(self.config['job_id']), self.config['host_name']]) + '.json',
        )
    def initialise_perf_metrics_data_dict(self):
        """This method initialises the perf metric related parameters"""
        # Schema for the perf data
        self.perf_data = {
            'host_name': self.config['host_name'],
            'sampling_frequency': self.config['sampling_freq'],
            'time_stamps': [],
            'hardware_events': {key: [] for key in self.perf_events['hardware_events'].keys()},
            'software_events': {key: [] for key in self.perf_events['software_events'].keys()},
            'L2': {key: [] for key in self.perf_events['L2'].keys()},
            'L2cache': {key: [] for key in self.perf_events['L2cache'].keys()},
            'L3': {key: [] for key in self.perf_events['L3'].keys()},
            'L3cache': {key: [] for key in self.perf_events['L3cache'].keys()},
            'FLOPS_SP': {key: [] for key in self.perf_events['FLOPS_SP'].keys()},
            'FLOPS_DP': {key: [] for key in self.perf_events['FLOPS_DP'].keys()},
        }
        # Add derived metrics to the schema of the perf data
        for perf_group, derived_metric in self.derived_perf_metrics.items():
            for name in derived_metric.keys():
                self.perf_data[perf_group][name] = []
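    # Illustrative sketch of the resulting schema (the event names below are hypothetical
    # placeholders, not taken from this module): after initialisation every counter maps to an
    # empty list that is appended to once per aggregation interval, e.g.
    #
    #   {
    #       'host_name': 'node001',
    #       'sampling_frequency': 5,
    #       'time_stamps': [],
    #       'hardware_events': {'instructions': [], 'cycles': []},
    #       'software_events': {'page-faults': []},
    #       ...
    #   }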
    def dump_avail_perf_events(self):
        """Dump the available perf event list for later use"""
        # Save the schema of available perf events for later use
        if self.config['master_node'] == self.config['node_name']:
            _log.debug('Saving the available perf event list to file')
            dump_json(self.perf_data, self.config['perf_event_file'])
    def set_up_perf_events(self):
        """Check the available perf events, test them and initialise the data dict"""
        # Get available perf events for the processor
        self.perf_events, self.derived_perf_metrics = get_working_perf_events()
        # Check that perf_events is not empty
        if not self.perf_events:
            raise PerfEventListNotFoundError("No perf events found")
        # Initialise the metric data dict
        self.initialise_perf_metrics_data_dict()
        # Dump the available perf event list
        self.dump_avail_perf_events()
    def get_list_of_pids(self):
        """Get the list of pids to monitor by adding child pids to the parent pids"""
        self.pids = [p.pid for p in get_child_procs(self.user, self.procs)]
    def add_timestamp(self):
        """Add a timestamp to the data"""
        # Get the time stamp and convert it to int; we are not doing fine-grained monitoring here.
        # The time stamp for the previous period is added at the beginning of the new period, so
        # the aggregation interval is subtracted from the current time stamp
        self.perf_data['time_stamps'].append(int(time.time()) - self.aggregation_interval)
    def make_perf_command(self):
        """Build the perf command to run"""
        # Command to get perf stats
        perf_cmd = 'exec perf stat -I {} -e {} -p {}'.format(
            self.aggregation_interval * 1000,
            ','.join(
                list(self.perf_events['hardware_events'].values())
                + list(self.perf_events['software_events'].values())
                + list(self.perf_events['L2'].values())
                + list(self.perf_events['L2cache'].values())
                + list(self.perf_events['L3'].values())
                + list(self.perf_events['L3cache'].values())
                + list(self.perf_events['FLOPS_SP'].values())
                + list(self.perf_events['FLOPS_DP'].values())
            ),
            ','.join([str(pid) for pid in self.pids]),
        )
        return perf_cmd
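    # Illustrative sketch (the event names and pids are hypothetical, shown only to make the
    # command shape concrete): with a 5 s aggregation interval the generated command would look
    # roughly like
    #
    #   exec perf stat -I 5000 -e instructions,cycles -p 1234,1235
    #
    # i.e. perf reports the selected events for the given pids every 5000 ms.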
    @staticmethod
    def match_perf_line(pattern, cmd_out):
        """Build the perf output pattern and return the matching groups"""
        # Perf output structure: elapsed time, counter value, event name
        out_struct = (
            r'(?P<Time>[0-9.]*\s*)'
            r'(?P<Value>[0-9,><a-zA-Z\s]*\s*)'
            r'(?P<Field>{}.*)'
        ).format(pattern)
        # Search pattern in output
        result = re.search(out_struct, cmd_out)
        try:
            # Return the matching time and value strings
            return result.group('Time'), result.group('Value')
        except AttributeError:
            return '-1', 'not_available'
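    # Illustrative sketch (the sample line below is an assumption about the typical shape of
    # "perf stat -I" output, not taken from this module): for a line such as
    #
    #       1.000315717          1,234,567      instructions
    #
    # match_perf_line('instructions', line) would return the raw Time and Value groups, roughly
    # '1.000315717' and '1,234,567' plus trailing whitespace, which parse_perf_cmd_out then
    # strips and converts to integers.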
    def parse_perf_cmd_out(self, cmd_out):
        """Parse the perf command output and populate the perf data dict with counter values"""
        for perf_group, counters in self.perf_events.items():
            for counter in counters.keys():
                # Only try to match if the perf counter appears in the perf output; this avoids
                # checking for unnecessary matches
                if counter in cmd_out:
                    time_string, value_string = self.match_perf_line(counter, cmd_out)
                    try:
                        # Convert the matched strings to int
                        count = int(value_string.rstrip().replace(',', ''))
                        time_float = int(float(time_string.rstrip().replace(',', '')))
                    except ValueError:
                        count = 0
                        time_float = -1
                    self.perf_data[perf_group][counter].append(count)
                    # Typically one output line contains a single perf counter to parse. Once it
                    # has been found, exit the loop
                    return time_float
        return -1
    def compute_derived_metrics(self):
        """Compute all the derived metrics from the parsed perf counters"""
        for perf_group, derived_metrics in self.derived_perf_metrics.items():
            for d_metric_name, expr in derived_metrics.items():
                # Substitute the latest parsed value of every raw counter into the expression
                for metric_name, values in self.perf_data[perf_group].items():
                    if not metric_name.startswith('derv'):
                        try:
                            expr = re.sub(metric_name, str(values[-1]), expr)
                        except IndexError:
                            expr = '0'
                # Evaluate the expression and normalise it by the aggregation interval
                try:
                    count_value = eval(expr) / self.aggregation_interval
                except (NameError, ValueError, ZeroDivisionError):
                    count_value = 0
                self.perf_data[perf_group][d_metric_name].append(count_value)
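    # Illustrative sketch (the expression and counter names are hypothetical): if a derived
    # metric 'derv_ipc' is defined by the expression 'instructions / cycles', the loop above
    # replaces each counter name with its latest parsed value, e.g.
    #
    #   'instructions / cycles'  ->  '5000000 / 4000000'
    #
    # after which eval() computes the result and it is normalised by the aggregation interval.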
    def setup_perf_monitor(self):
        """Setup steps for monitoring perf metrics"""
        # Process objects of the main job that is being monitored
        self.procs = [psutil.Process(p) for p in self.config['pid']]
        # Set up and initialise the perf events and the data dict
        self.set_up_perf_events()
        # Get all child processes of the parent pids
        self.get_list_of_pids()
        # Generate the perf stat command for the given pids
        perf_cmd = self.make_perf_command()
        # Execute the command using Popen
        self.monitor_proc = execute_cmd_pipe(perf_cmd)
    def post_parsing_steps(self):
        """Steps to be performed after parsing all metrics"""
        # Add the current timestamp to the list of timestamps
        self.add_timestamp()
        # Compute derived metrics
        self.compute_derived_metrics()
    def dump_metrics(self):
        """Dump metrics to a JSON file and re-initialise the perf_metrics dict"""
        dump_metrics_async(copy.deepcopy(self.perf_data), self.outfile)
        # Re-initialise the perf metric data dict
        self.initialise_perf_metrics_data_dict()
    def run(self):
        """Extract perf metrics for the given pids"""
        _log.info('Collection of perf metrics has started')

        # For the moment perf metrics are always collected with an aggregation period of 5 s
        # during every collection cycle, which depends on the sampling frequency. This ensures
        # that we do not use up too many CPU resources. It is not configurable at the moment.
        self.aggregation_interval = self.config['sampling_freq']
        _log.debug('Perf metrics are reported every %s s', self.aggregation_interval)

        # Number of steps before writing the data to file
        check_point_step = int(self.check_point_time / self.aggregation_interval)
        # Step count
        i_step = 0

        # Normally the perf command emits metrics at the frequency provided at run time. We
        # capture that output here and parse it to extract the different perf counters
        try:
            # Set up perf monitoring
            self.setup_perf_monitor()

            # The output is parsed line by line, so for a given time stamp there are several
            # lines, one per counter, which are read one by one. The timestamp must be added to
            # the data dict only once per period, and the derived metrics must be computed only
            # once as well. The variable check_time tracks when all counters for a given time
            # stamp have been parsed, and only then is the timestamp added to the data dict
            check_time = -1
            for line in iter(self.monitor_proc.stdout.readline, b''):
                # Check that the main job process being monitored is still running
                if (
                    proc_if_running(self.procs)
                    and open(self.config['ipc_file'], 'r').read().rstrip() == 'INPROGRESS'
                ):
                    # Parse the perf stat output
                    elapsed_time = self.parse_perf_cmd_out(line)

                    # If check_time is negative, set check_time to elapsed_time
                    if check_time < 0 and elapsed_time > 0:
                        check_time = elapsed_time

                    # Once all lines of the current interval have been parsed, add the timestamp
                    # and compute the derived metrics from the parsed counters
                    if check_time < elapsed_time:
                        # Perform post-parsing steps
                        self.post_parsing_steps()
                        # The new check time is the elapsed time
                        check_time = elapsed_time

                        # Dump metrics if a check point is reached
                        if i_step % check_point_step == 0:
                            self.dump_metrics()

                        # Increment the step count
                        i_step += 1
                else:
                    # If the main process has terminated, kill the monitor process as well and
                    # drain its IO streams
                    self.monitor_proc.kill()
                    self.monitor_proc.communicate()
                    # Perform post-parsing steps
                    self.post_parsing_steps()
                    # And finally dump the data to disk
                    dump_json(self.perf_data, self.outfile)
                    return
        except (
            FileNotFoundError,
            psutil.NoSuchProcess,
            psutil.AccessDenied,
            PerfEventListNotFoundError,
        ):
            dump_json(self.perf_data, self.outfile)
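

# Illustrative usage sketch (not part of the original module): MonitorPerfCounters is a
# multiprocessing.Process, so a caller would typically build a config dict with the keys
# accessed above (the concrete values here are hypothetical) and start it as a child process:
#
#   config = {
#       'check_point': 900,
#       'sampling_freq': 5,
#       'job_id': 42,
#       'host_name': 'node001',
#       'node_name': 'node001',
#       'master_node': 'node001',
#       'pid': [12345],
#       'temp_path': {'perf_metrics': '/tmp/perfmon'},
#       'perf_event_file': '/tmp/perfmon/perf_events.json',
#       'ipc_file': '/tmp/perfmon/ipc_state',
#   }
#   monitor = MonitorPerfCounters(config)
#   monitor.start()
#   # ... run the job being monitored ...
#   monitor.join()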