"""This file contains base class to monitor perf stat metrics"""
import os
import re
import logging
import time
import copy
import multiprocessing
import psutil
from perfmon.core.metrics.common import dump_metrics_async
from perfmon.core.metrics.common import get_child_procs
from perfmon.common.utils.execute_cmd import execute_cmd_pipe
from perfmon.common.utils.json_wrappers import dump_json
from perfmon.common.utils.process import proc_if_running
from perfmon.common.perf import get_working_perf_events
from perfmon.exceptions import PerfEventListNotFoundError
_log = logging.getLogger(__name__)
# pylint: disable=E0401,W0201,C0301
class MonitorPerfCounters(multiprocessing.Process):
"""Engine to extract performance metrics"""
# pylint: disable=too-many-instance-attributes
def __init__(self, config):
"""Initialize setup"""
# must call this before anything else
multiprocessing.Process.__init__(self)
self.config = config.copy()
# The collected data is dumped to file every check_point seconds (e.g. every 900 sec)
self.check_point_time = self.config['check_point']
self.user = os.environ['USER']
self._extra = {}
# Initialise perf_data dict
self.perf_data = {}
# Name of the output files
self.outfile = os.path.join(
self.config['temp_path']['perf_metrics'],
'_'.join([str(self.config['job_id']), self.config['host_name']]) + '.json',
)
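# For illustration (hypothetical values): with job_id=123456 and host_name='node01', the
# metrics would be written to a file named 123456_node01.json inside the configured
# temp_path['perf_metrics'] directory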
def initialise_perf_metrics_data_dict(self):
"""This method initialises the perf-metrics data dictionary"""
# Schema for the perf data
self.perf_data = {
'host_name': self.config['host_name'],
'sampling_frequency': self.config['sampling_freq'],
'time_stamps': [],
'hardware_events': {key: [] for key in self.perf_events['hardware_events'].keys()},
'software_events': {key: [] for key in self.perf_events['software_events'].keys()},
'L2': {key: [] for key in self.perf_events['L2'].keys()},
'L2cache': {key: [] for key in self.perf_events['L2cache'].keys()},
'L3': {key: [] for key in self.perf_events['L3'].keys()},
'L3cache': {key: [] for key in self.perf_events['L3cache'].keys()},
'FLOPS_SP': {key: [] for key in self.perf_events['FLOPS_SP'].keys()},
'FLOPS_DP': {key: [] for key in self.perf_events['FLOPS_DP'].keys()},
}
# Add derived metrics to the schema of the perf data
for perf_group, derived_metric in self.derived_perf_metrics.items():
for name in derived_metric.keys():
self.perf_data[perf_group][name] = []
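# A rough sketch of the resulting schema (all counter and metric names below are
# hypothetical; the real keys come from get_working_perf_events()):
#
#   self.perf_data = {
#       'host_name': 'node01',
#       'sampling_frequency': 5,
#       'time_stamps': [],
#       'hardware_events': {'instructions': [], 'cycles': [], 'derv_ipc': []},
#       'software_events': {'page-faults': []},
#       ...
#   }
#
# where 'derv_ipc' would be a derived metric appended by the loop above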
def dump_avail_perf_events(self):
"""Dump the available perf event list for later use"""
# Save the schema of available perf events for later use
if self.config['master_node'] == self.config['node_name']:
_log.debug('Saving the available perf event list to file')
dump_json(self.perf_data, self.config['perf_event_file'])
def set_up_perf_events(self):
"""This method checks for available perf events, tests them and initialises the data dict"""
# Get available perf events for the processor
self.perf_events, self.derived_perf_metrics = get_working_perf_events()
# Check if perf_events is not empty
if not self.perf_events:
raise PerfEventListNotFoundError("No perf events found")
# Initialise the metric data dict
self.initialise_perf_metrics_data_dict()
# Dump the available perf event list
self.dump_avail_perf_events()
def get_list_of_pids(self):
"""This method gets the list of PIDs to monitor by adding child PIDs to the parents"""
self.pids = [p.pid for p in get_child_procs(self.user, self.procs)]
def add_timestamp(self):
"""This method adds a timestamp to the data"""
# Get the time stamp and convert it to int; we are not doing fine-grained monitoring here.
# The time stamp for the previous period is added at the beginning of the new period, so
# the aggregation interval is subtracted from the current time stamp
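# For example (hypothetical values), if the current epoch time is 1700000000 and the
# aggregation interval is 5 sec, the appended time stamp is 1699999995, i.e. roughly the
# start of the period that has just been parsed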
self.perf_data['time_stamps'].append(int(time.time()) - self.aggregation_interval)
def make_perf_command(self):
"""This method makes the perf command to run"""
# Command to get perf stats
perf_cmd = 'exec perf stat -I {} -e {} -p {}'.format(
self.aggregation_interval * 1000,
','.join(
list(self.perf_events['hardware_events'].values())
+ list(self.perf_events['software_events'].values())
+ list(self.perf_events['L2'].values())
+ list(self.perf_events['L2cache'].values())
+ list(self.perf_events['L3'].values())
+ list(self.perf_events['L3cache'].values())
+ list(self.perf_events['FLOPS_SP'].values())
+ list(self.perf_events['FLOPS_DP'].values())
),
','.join([str(pid) for pid in self.pids]),
)
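# As a rough illustration (event names and PIDs below are hypothetical), the resulting
# command looks something like:
#   exec perf stat -I 5000 -e instructions,cycles,...,page-faults -p 1234,5678
# where -I takes the aggregation interval in milliseconds and -p the comma-separated
# list of PIDs to monitor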
return perf_cmd
@staticmethod
def match_perf_line(pattern, cmd_out):
"""This method builds the perf output pattern and gets the matching groups"""
# Perf output structure
out_struct = (
r'(?P<Time>[0-9.]*\s*)'
r'(?P<Value>[0-9,><a-zA-Z\s]*\s*)'
r'(?P<Field>{}.*)'.format(pattern)
)
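# For reference, an interval-mode perf stat output line typically looks roughly like
# (the exact layout can vary between perf versions):
#     5.001043123        12,345,678      instructions
# so <Time> captures the elapsed time, <Value> the formatted count and <Field> the
# counter name; surrounding whitespace and thousands separators are stripped later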
# Search pattern in output
result = re.search(out_struct, cmd_out)
try:
# return matching time and values
return result.group('Time'), result.group('Value')
except AttributeError:
return '-1', 'not_available'
def parse_perf_cmd_out(self, cmd_out):
"""This method parses the perf command output and populates the perf data dict with counter values"""
for perf_group, counters in self.perf_events.items():
for counter in counters.keys():
# Only attempt a pattern match if the perf counter name appears in the output line;
# this avoids unnecessary matching attempts
if counter in cmd_out:
time_string, value_string = self.match_perf_line(counter, cmd_out)
try:
count = int(value_string.rstrip().replace(',', ''))
# Convert the elapsed-time string to an integer (seconds)
time_float = int(float(time_string.rstrip().replace(',', '')))
except ValueError:
count = 0
time_float = -1
self.perf_data[perf_group][counter].append(count)
# Each perf output line typically contains a single counter to parse; once it has
# been found, exit the loop
return time_float
return -1
def compute_derived_metrics(self):
"""This method computes all the derived metrics from parsed perf counters"""
for perf_group, derived_metrics in self.derived_perf_metrics.items():
for d_metric_name, expr in derived_metrics.items():
for metric_name, values in self.perf_data[perf_group].items():
if not metric_name.startswith('derv'):
try:
expr = re.sub(metric_name, str(values[-1]), expr)
except IndexError:
expr = '0'
try:
count_value = eval(expr) / self.aggregation_interval
except (NameError, ValueError, ZeroDivisionError):
count_value = 0
self.perf_data[perf_group][d_metric_name].append(count_value)
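# Worked illustration with hypothetical names: if the group defines a derived metric
# 'derv_flops_sp' with the expression 'scalar_single + 4 * packed_single', and the latest
# parsed values are scalar_single=1000 and packed_single=500, the substitution turns the
# expression into '1000 + 4 * 500'; eval() gives 3000, which divided by a 5 sec
# aggregation interval yields a rate of 600 that is appended to
# self.perf_data[perf_group]['derv_flops_sp']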
def setup_perf_monitor(self):
"""Setup steps for monitoring perf metrics"""
# Process objects of the main job processes that are being monitored
self.procs = [psutil.Process(p) for p in self.config['pid']]
# Set and initialise the perf events and data dict
self.set_up_perf_events()
# Get all child processes of parent pid
self.get_list_of_pids()
# Generate perf stat command for given pids
perf_cmd = self.make_perf_command()
# Execute the command by using Popen
self.monitor_proc = execute_cmd_pipe(perf_cmd)
def post_parsing_steps(self):
"""Steps to be made after parsing all metrics"""
# Add current timestamp to the list of timestamps
self.add_timestamp()
# Compute derived metrics
self.compute_derived_metrics()
def dump_metrics(self):
"""Dump metrics to JSON file and re-initiate perf_metrics dict"""
dump_metrics_async(copy.deepcopy(self.perf_data), self.outfile)
# Re-initialise perf metric data dict
self.initialise_perf_metrics_data_dict()
def run(self):
"""This method extracts perf metrics for a given pid"""
_log.info('Collection of perf metrics has started')
# For the moment perf metrics are always collected with an aggregation period of 5 sec
# during every collection cycle, which is tied to the sampling frequency.
# This ensures that we are not using up too many CPU resources. It is not configurable at
# the moment.
self.aggregation_interval = self.config['sampling_freq']
_log.debug('Perf metrics are reported every %s seconds', self.aggregation_interval)
# Number of steps before writing the data to file
check_point_step = int(self.check_point_time / self.aggregation_interval)
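# For example, with a check-point time of 900 sec and a 5 sec aggregation interval,
# check_point_step is 180, i.e. the data is written to file every 180 parsed intervals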
# Step count
i_step = 0
# Normally the perf command emits metrics at the interval provided at run time.
# We capture that output here and parse it to extract the different perf counters
try:
# Setup perf monitoring
self.setup_perf_monitor()
# The output is parsed line by line, so a given time stamp produces multiple lines that
# are read one by one. However, the timestamp should be added to the data dict only once,
# and the derived metrics should likewise be computed only once.
# The variable check_time tracks when all counters for a given time stamp have been
# parsed, after which the timestamp is added to the data dict
check_time = -1
for line in iter(self.monitor_proc.stdout.readline, b''):
# Check if main job process that we are monitoring is still running
if (
proc_if_running(self.procs)
and open(self.config['ipc_file'], 'r').read().rstrip() == 'INPROGRESS'
):
# Parse the perf stat output
elapsed_time = self.parse_perf_cmd_out(line)
# If check_time is negative, set check_time to elapsed_time
if check_time < 0 and elapsed_time > 0:
check_time = elapsed_time
# Once all lines of the perf output for a given interval have been parsed, add the
# timestamp and compute the derived metrics from the parsed counters
if check_time < elapsed_time:
# Perform post parsing steps
self.post_parsing_steps()
# The new check time is the elapsed time
check_time = elapsed_time
# Dump metrics if check point is reached
if i_step % check_point_step == 0:
self.dump_metrics()
# Increment i_step
i_step += 1
else:
# If the main process has terminated, kill the monitor process as well and drain its
# I/O streams
self.monitor_proc.kill()
self.monitor_proc.communicate()
# Perform post parsing steps
self.post_parsing_steps()
# And finally dump the data to disk
dump_json(self.perf_data, self.outfile)
return
except (
FileNotFoundError,
psutil.NoSuchProcess,
psutil.AccessDenied,
PerfEventListNotFoundError,
):
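# If monitoring is interrupted (e.g. the job process disappeared or no perf events were
# found), dump whatever has been collected so far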
dump_json(self.perf_data, self.outfile)