Source code for perfmon.common.df.__init__

""""This package contains modules related to creating dataframe"""

import os
import sys
import logging
import pandas

from perfmon.schemas.df import metric_header_names
from perfmon.schemas.df import perfcounters_header_names
from perfmon.schemas.df import ib_metric_header_names
from perfmon.schemas.df import nv_gpu_metric_header_names
from perfmon.common.utils.json_wrappers import load_json
from perfmon.common.utils.utilities import get_value
from perfmon.exceptions import MetricGroupNotImplementedError

_log = logging.getLogger(__name__)

# pylint: disable=E0401,W0201,C0301,W0621


class CreateDataFrame(object):
    """This class contains all methods to create a dataframe from JSON data"""

    def __init__(self, metric, config):
        """Initialise setup"""
        self.config = config.copy()
        self.metric = metric
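    # NOTE: based on the accesses in this module, `config` is assumed to
    # carry at least 'data_dir' (the directory holding the <metric>.json
    # files) and, for the 'perf_metrics' group, 'perf_event_file' (the path
    # to the perf event definitions JSON).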
    def initialise_header_names(self):
        """This method initialises the names of headers for each metric"""
        # Header names of each monitored metric
        if self.metric == 'cpu_metrics':
            self.metric_header_names = metric_header_names
        elif self.metric == 'nv_gpu_metrics':
            self.metric_header_names = nv_gpu_metric_header_names
        # Add perf event headers. We cannot have homogeneous headers as perf
        # events vary across architectures and vendors, so we use the event
        # name itself as the header name
        elif self.metric == 'perf_metrics':
            try:
                perf_events = load_json(self.config['perf_event_file'])
                self.metric_header_names = {
                    **perfcounters_header_names,
                    'L2': {key: key for key in perf_events['L2'].keys()},
                    'L2cache': {key: key for key in perf_events['L2cache'].keys()},
                    'L3': {key: key for key in perf_events['L3'].keys()},
                    'L3cache': {key: key for key in perf_events['L3cache'].keys()},
                    'FLOPS_SP': {key: key for key in perf_events['FLOPS_SP'].keys()},
                    'FLOPS_DP': {key: key for key in perf_events['FLOPS_DP'].keys()},
                }
            except FileNotFoundError:
                # Tolerate a missing perf event file
                pass
        else:
            raise MetricGroupNotImplementedError(
                'Metric group %s not implemented' % self.metric
            )
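    # For illustration, the perf event file is assumed (from the keys read
    # above) to map each event group to a dict of event names; the event
    # names and descriptors shown here are hypothetical:
    #
    #   {
    #     "L2": {"L2_RQSTS_REFERENCES": "..."},
    #     "L2cache": {"...": "..."},
    #     "L3": {"...": "..."},
    #     "L3cache": {"...": "..."},
    #     "FLOPS_SP": {"...": "..."},
    #     "FLOPS_DP": {"...": "..."}
    #   }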
    def check_non_default_metrics(self, content):
        """Check for non-default metrics"""
        # Memory bandwidth metrics
        if get_value(content, 'memory_bw'):
            self.metric_header_names = {
                **self.metric_header_names,
                'memory_bw': 'Memory (read) bandwidth [MiB/s]',
            }
        # IB metrics
        if get_value(content, 'port_xmit_data'):
            self.metric_header_names = {
                **self.metric_header_names,
                **ib_metric_header_names,
            }
        # RAPL metrics - package, DRAM, core, uncore.
        # We check whether metrics are available for up to 8 sockets
        rapl_metric_headers = {}
        for domain in ['package', 'dram', 'core', 'uncore']:
            for socket in range(8):
                metric_name = '-'.join([domain, str(socket)])
                if get_value(content, metric_name):
                    rapl_metric_headers[metric_name] = f'RAPL {domain} {socket} [uJ]'
        if rapl_metric_headers:
            self.metric_header_names = {
                **self.metric_header_names,
                'rapl_powercap': rapl_metric_headers,
            }
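    # A sketch of the JSON metric data this class consumes, inferred from the
    # fields accessed in this module (host and metric names are made up):
    #
    #   {
    #     "host_names": ["node0", "node1"],
    #     "node0": {
    #       "time_stamps": [...],
    #       "memory_bw": [...],       # optional, non-default
    #       "port_xmit_data": [...],  # optional IB counter
    #       "package-0": [...]        # optional RAPL <domain>-<socket> series
    #     },
    #     "node1": {...}
    #   }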
    def create_dataframe(self, content):
        """This method creates and returns dataframe from the metric data"""

        def truncate_metric_data(data):
            """Pad or truncate metric data to match the host time stamps"""
            if len(data) < host_entries:
                # Pad short series with NaNs (without mutating the input list)
                return data + [float('nan')] * (host_entries - len(data))
            if len(data) > host_entries:
                return data[:host_entries]
            return data

        def append_data_to_list(header, data):
            if header not in header_names:
                header_names.append(header)
            for i, j in enumerate(range(start_index, start_index + host_entries)):
                if header == 'Host':
                    df_data[j].append(host)
                else:
                    df_data[j].append(data[i])

        # Add non-default metrics to metric header names if available
        self.check_non_default_metrics(content)

        # List of header names
        header_names = []

        # Number of entries for each metric. This is the sum of all host
        # time stamps
        num_entries = sum(
            len(content[host]['time_stamps']) for host in content['host_names']
        )

        # Initialise data frame data. We add one row per host and time stamp;
        # all host metrics are appended serially
        df_data = [[] for _ in range(num_entries)]

        # Build a list of lists to create the dataframe. The Host column will
        # be used to group the metrics from each host
        start_index = 0
        for host in content['host_names']:
            host_entries = len(content[host]['time_stamps'])
            for metric, header in self.metric_header_names.items():
                try:
                    host_metric = content[host][metric]
                    if isinstance(header, str):
                        data = truncate_metric_data(host_metric)
                        append_data_to_list(header, data)
                    elif isinstance(header, dict):
                        for sub_metric, sub_header in header.items():
                            data = truncate_metric_data(host_metric[sub_metric])
                            append_data_to_list(sub_header, data)
                except KeyError:
                    # The Host column must always be filled so rows stay
                    # aligned; other missing metrics are simply skipped
                    if header == 'Host':
                        append_data_to_list(header, [])
            start_index += host_entries

        # Create data frame
        return pandas.DataFrame(df_data, columns=header_names)
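    # Resulting layout, for illustration only (the non-Host column names come
    # from the header schemas and the values here are made up): one row per
    # host and time stamp, with all hosts stacked serially:
    #
    #   Host   ...metric columns...
    #   node0  ...
    #   node0  ...
    #   node1  ...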
    def go(self):
        """Entry point to the class"""
        _log.info('Creating dataframe from %s data...', self.metric)
        # Set up the names of the metric headers
        self.initialise_header_names()
        # Load metrics data
        content = load_json(
            os.path.join(self.config['data_dir'], self.metric + '.json')
        )
        # If the monitor jobs on all nodes failed to capture metrics, skip
        # creating the dataframe and exporting it
        if content['host_names']:
            return self.create_dataframe(content)
        _log.warning(
            'No %s data found. Skipping creating dataframe, '
            'and exporting to excel and databases', self.metric
        )
        return pandas.DataFrame()
if __name__ == '__main__':
    # Simple manual test harness: read perf metrics from the current
    # directory and build the dataframe
    config = {
        'data_dir': os.getcwd(),
        'perf_event_file': os.path.join(os.getcwd(), '.perf_event_list.json'),
    }
    create_df = CreateDataFrame('perf_metrics', config)
    df = create_df.go()
    # df.to_csv('perf_metrics.csv', index=False)
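# A hypothetical downstream use of the returned frame, shown commented out so
# importing this module stays side-effect free: the Host column groups each
# node's time series, so per-host aggregation is a one-liner.
#
#   df = CreateDataFrame('cpu_metrics', config).go()
#   per_host_means = df.groupby('Host').mean(numeric_only=True)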