"""This file initiates the script to extract real time perf metrics"""
import os
import logging
import re
import time
import copy
import multiprocessing
import threading
import psutil
from monitormetrics.utils.utils import execute_cmd_pipe, get_perf_events, \
check_perf_events, dump_json, proc_if_running
from monitormetrics.utils.exceptions import PerfEventListNotFoundError
from monitormetrics._version import __version__
_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})
class MonitorPerfEventsMetrics(multiprocessing.Process):
    """Engine to extract performance metrics"""

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize the perf monitoring process.

        Args:
            config (dict): runtime configuration; must provide 'check_point',
                'temp_path', 'job_id' and 'host_name' entries.
        """
        # The Process base class must be initialised before anything else
        super().__init__()
        self.config = dict(config)
        # Interval (seconds) after which the collected data is dumped to file
        self.check_point_time = self.config['check_point']
        self.user = os.environ['USER']
        self._extra = {}
        # Parsed perf counter values, keyed by perf event group
        self.perf_data = {}
        # Output JSON file: <job_id>_<host_name>.json inside the temp path
        self.outfile = os.path.join(
            self.config['temp_path']['perf_metrics'],
            "_".join([str(self.config['job_id']), self.config['host_name']]) + ".json")
[docs] def initialise_perf_metrics_data_dict(self):
"""This method initialises the perf metric related parameters"""
# Schema for the perf data
self.perf_data = {
'host_name': self.config['host_name'],
'sampling_frequency': self.config['sampling_freq'],
'time_stamps': [],
'hardware_events': {key: [] for key in self.perf_events['hardware_events'].keys()},
'software_events': {key: [] for key in self.perf_events['software_events'].keys()},
'L2': {key: [] for key in self.perf_events['L2'].keys()},
'L2cache': {key: [] for key in self.perf_events['L2cache'].keys()},
'L3': {key: [] for key in self.perf_events['L3'].keys()},
'L3cache': {key: [] for key in self.perf_events['L3cache'].keys()},
'FLOPS_SP': {key: [] for key in self.perf_events['FLOPS_SP'].keys()},
'FLOPS_DP': {key: [] for key in self.perf_events['FLOPS_DP'].keys()}
}
# Add derived metrics to the schema of the perf data
for perf_group, derived_metric in self.derived_perf_metrics.items():
for name in derived_metric.keys():
self.perf_data[perf_group][name] = []
[docs] def dump_avail_perf_events(self):
"""Dump the available perf event list for later use"""
# Save the schema of available perf events for later use
if self.config['master_node'] == self.config['node_name']:
_log.debug("Saving the available perf event list to file")
dump_json(self.perf_data, self.config['perf_event_file'])
[docs] def set_up_perf_events(self):
"""This method checks for available perf events, tests them and initialise the data dict"""
# Get available perf events for the processor
found_perf_events, self.derived_perf_metrics = get_perf_events()
# Check if all perf groups are working
self.perf_events = check_perf_events(found_perf_events)
# Initialise the metric data dict
self.initialise_perf_metrics_data_dict()
# Dump the available perf event list
self.dump_avail_perf_events()
[docs] def get_list_of_pids(self):
"""This method gets the list of pids to monitor by adding children pids to parents"""
self.pids = self.config['pid'] + [child_proc.pid for p in self.procs
for child_proc in p.children(recursive=True)
if child_proc.username() == self.user]
[docs] def add_timestamp(self):
"""This method adds timestamp to the data"""
# Get time stamp and convert it to int. We are not looking at fine grained monitoring here
# We add time stamp for previous period at the beginning of new period. So we subtract
# aggregation interval from current time stamp
self.perf_data['time_stamps'].append(int(time.time()) - self.aggregation_interval)
[docs] def make_perf_command(self):
"""This method make the perf command to run"""
# Command to get perf stats
perf_cmd = "exec perf stat -I {} -e {} -p {}". \
format(self.aggregation_interval * 1000,
",".join(list(self.perf_events['hardware_events'].values()) +
list(self.perf_events['software_events'].values()) +
list(self.perf_events['L2'].values()) +
list(self.perf_events['L2cache'].values()) +
list(self.perf_events['L3'].values()) +
list(self.perf_events['L3cache'].values()) +
list(self.perf_events['FLOPS_SP'].values()) +
list(self.perf_events['FLOPS_DP'].values())),
",".join([str(pid) for pid in self.pids]))
return perf_cmd
[docs] @staticmethod
def match_perf_line(pattern, cmd_out):
"""This method builds perf output pattern and get matching groups"""
# Perf output structure
out_struct = (r"(?P<Time>[0-9.]*\s*)(?P<Value>[0-9,><a-zA-Z\s]*\s*)("
r"?P<Field>{}.*)".format(pattern))
# Search pattern in output
result = re.search(out_struct, cmd_out)
try:
# return matching time and values
return result.group('Time'), result.group('Value')
except AttributeError:
return "-1", "not_available"
[docs] def parse_perf_cmd_out(self, cmd_out):
"""This method parses perf command output and populate perf data dict with counter values"""
for perf_group, counters in self.perf_events.items():
for counter in counters.keys():
# If perf counter is in the perf output. This way we reduce check for unnecessary
# matches
if counter in cmd_out:
time_string, value_string = self.match_perf_line(counter, cmd_out)
try:
count = int(value_string.rstrip().replace(",", ""))
# Convert string to int
time_float = int(float(time_string.rstrip().replace(",", "")))
except ValueError:
count = 0
time_float = -1
self.perf_data[perf_group][counter].append(count)
# Typically for one line output we have one perf counter to parse. If we
# found it, exit the loop
return time_float
return -1
[docs] def compute_derived_metrics(self):
"""This method computes all the derived metrics from parsed perf counters"""
for perf_group, derived_metrics in self.derived_perf_metrics.items():
for d_metric_name, expr in derived_metrics.items():
for metric_name, values in self.perf_data[perf_group].items():
if not metric_name.startswith('derv'):
try:
expr = re.sub(metric_name, str(values[-1]), expr)
except IndexError:
expr = "0"
try:
count_value = eval(expr) / self.aggregation_interval
except (NameError, ValueError, ZeroDivisionError):
count_value = 0
self.perf_data[perf_group][d_metric_name].append(count_value)
[docs] def setup_perf_monitor(self):
"""Setup steps for monitoring perf metrics"""
# Process object of main job that is being monitored
self.procs = [psutil.Process(p) for p in self.config['pid']]
# Set and initialise the perf events and data dict
self.set_up_perf_events()
# Get all child processes of parent pid
self.get_list_of_pids()
# Generate perf stat command for given pids
perf_cmd = self.make_perf_command()
# Execute the command by using Popen
self.monitor_proc = execute_cmd_pipe(perf_cmd)
[docs] def post_parsing_steps(self):
"""Steps to be made after parsing all metrics"""
# Add current timestamp to the list of timestamps
self.add_timestamp()
# Compute derived metrics
self.compute_derived_metrics()
[docs] def dump_metrics(self):
"""Dump metrics to JSON file and re-initiate perf_metrics dict"""
# Append data to existing JSON file or create a new file if it doesnt exist
# Start a new thread async to dump data to the file
dump_metrics_thread = threading.Thread(target=dump_json,
args=(copy.deepcopy(self.perf_data), self.outfile))
dump_metrics_thread.start()
# Re-initialise perf metric data dict
self.initialise_perf_metrics_data_dict()
[docs] def run(self):
"""This method extracts perf metrics for a given pid"""
_log.info("Collection of perf metrics has started")
# For the moment perf metrics are always collected for an aggregation period of 5 sec
# during every collection cycle that is dependent on sampling frequency.
# It ensures that we are not using up too much CPU resources. It is not configurable at
# the moment.
self.aggregation_interval = self.config['sampling_freq']
_log.debug("Perf metrics are reported for every %s", self.aggregation_interval)
# Number of steps before writing the data to file
check_point_step = int(self.check_point_time / self.aggregation_interval)
# Step count
i_step = 0
# Normally the perf command will spit out metrics with a frequency provided at the run time
# We will capture the output here and parse it to get different perf counters
try:
# Setup perf monitoring
self.setup_perf_monitor()
# We will parse output line-by-line. It means for a given time stamp we will have
# multiple lines and we read these lines one by one. But we should add the timestamp
# to the data dict only once and compute derived metrics only once as well.
# The var check_time is to check we finish parse all counters for a given time stamp
# and then add timestamp to data dict
check_time = -1
for line in iter(self.monitor_proc.stdout.readline, b''):
# Check if main job process that we are monitoring is still running
if proc_if_running(self.procs) and \
open(self.config['ipc_file'], "r").read().rstrip() == "INPROGRESS":
# Parse the perf stat output
elapsed_time = self.parse_perf_cmd_out(line)
# If check_time is negative, set check_time to elapsed_time
if check_time < 0 and elapsed_time > 0:
check_time = elapsed_time
# Once we finish parsing all lines of the perf we want to add timestamp and
# compute other derived metrics from parsed metrics
if check_time < elapsed_time:
# Perform post parsing steps
self.post_parsing_steps()
# The new check time is the elapsed time
check_time = elapsed_time
# Dump metrics if check point is reached
if i_step % check_point_step == 0:
self.dump_metrics()
# Increment i_step
i_step += 1
else:
# If main process terminates kill the monitor process as well and communicate
# to io streams
self.monitor_proc.kill()
self.monitor_proc.communicate()
# Perform post parsing steps
self.post_parsing_steps()
# And finally dump the data to disk
# write_json(self.perf_data, self.outfile)
dump_json(self.perf_data, self.outfile)
return
except (FileNotFoundError, psutil.NoSuchProcess, psutil.AccessDenied, PerfEventListNotFoundError):
# write_json(self.perf_data, self.outfile)
dump_json(self.perf_data, self.outfile)