""""This package contains modules related to creating dataframe"""
import os
import sys
import logging
import pandas
from perfmon.schemas.df import metric_header_names
from perfmon.schemas.df import perfcounters_header_names
from perfmon.schemas.df import ib_metric_header_names
from perfmon.schemas.df import nv_gpu_metric_header_names
from perfmon.common.utils.json_wrappers import load_json
from perfmon.common.utils.utilities import get_value
from perfmon.exceptions import MetricGroupNotImplementedError
_log = logging.getLogger(__name__)
# pylint: disable=E0401,W0201,C0301,W0621
class CreateDataFrame(object):
    """This class contains all methods to create a dataframe from JSON data"""

    def __init__(self, metric, config):
        """Initialise setup

        Args:
            metric (str): Name of the metric group to load (e.g. 'perf_metrics')
            config (dict): Run configuration. Must provide 'data_dir', the
                directory containing '<metric>.json'
        """
        # Copy so that later mutations of self.config never leak back to the
        # caller's dict
        self.config = config.copy()
        self.metric = metric

    def check_non_default_metrics(self, content, max_sockets=8):
        """Check for non default metrics and extend self.metric_header_names

        Args:
            content (dict): Metric data loaded from the JSON file
            max_sockets (int): Upper bound on the number of sockets probed
                for RAPL metrics (default 8, matching previous behaviour)
        """
        # Memory bandwidth metrics
        if get_value(content, 'memory_bw'):
            self.metric_header_names = {
                **self.metric_header_names,
                'memory_bw': 'Memory (read) bandwidth [MiB/s]',
            }
        # IB metrics
        if get_value(content, 'port_xmit_data'):
            self.metric_header_names = {
                **self.metric_header_names,
                **ib_metric_header_names,
            }
        # RAPL metrics - package, DRAM, core, uncore
        # we check if metrics available for upto max_sockets sockets
        rapl_metric_headers = {}
        for domain in ['package', 'dram', 'core', 'uncore']:
            for socket in range(max_sockets):
                metric_name = '-'.join([domain, str(socket)])
                if get_value(content, metric_name):
                    rapl_metric_headers[metric_name] = f'RAPL {domain} {socket} [uJ]'
        if rapl_metric_headers:
            self.metric_header_names = {
                **self.metric_header_names,
                'rapl_powercap': rapl_metric_headers,
            }

    def create_dataframe(self, content):
        """This method creates and returns dataframe from the metric data

        Args:
            content (dict): Metric data keyed by host name, plus a
                'host_names' list; each host entry carries 'time_stamps'
                and one list per metric

        Returns:
            pandas.DataFrame: One row per (host, time stamp), one column per
            metric header
        """
        def truncate_metric_data(data):
            """Return a copy of data padded with NaNs or truncated so it has
            exactly host_entries elements.

            Works on a copy: the previous in-place '+=' mutated the lists
            inside 'content'.
            """
            if len(data) < host_entries:
                return list(data) + [float('nan')] * (host_entries - len(data))
            # Also covers the equal-length case (slice is a no-op copy)
            return list(data[:host_entries])

        def append_data_to_list(header, data):
            """Register the column header (once) and append this host's
            values to the matching rows of df_data."""
            if header not in header_names:
                header_names.append(header)
            for i, j in enumerate(range(start_index, start_index + host_entries)):
                if header == 'Host':
                    # The Host column repeats the host name on every row so it
                    # can be used to group metrics per host
                    df_data[j].append(host)
                else:
                    df_data[j].append(data[i])

        # Add non default metrics to metric header names if available
        self.check_non_default_metrics(content)
        # List of header names (dataframe columns)
        header_names = []
        # Number of entries for each metric. This is sum of all host time stamps
        num_entries = sum(
            len(content[host]['time_stamps']) for host in content['host_names']
        )
        # Initialise data frame data
        # We add one line for a given host and time stamp. All host metrics
        # are added serially
        df_data = [[] for _ in range(num_entries)]
        # Make list of lists to create a dataframe. Column host will be used
        # to group the metrics from each host
        start_index = 0
        for host in content['host_names']:
            host_entries = len(content[host]['time_stamps'])
            for metric, header in self.metric_header_names.items():
                try:
                    host_metric = content[host][metric]
                    if isinstance(header, str):
                        # Simple metric: one column
                        data = truncate_metric_data(host_metric)
                        append_data_to_list(header, data)
                    elif isinstance(header, dict):
                        # Nested metric group (e.g. RAPL): one column per
                        # sub metric
                        for sub_metric, sub_header in header.items():
                            data = truncate_metric_data(host_metric[sub_metric])
                            append_data_to_list(sub_header, data)
                except KeyError:
                    # Deliberate best effort: a host missing a metric simply
                    # contributes no data for that column. The Host column is
                    # still emitted so the rows stay aligned per host.
                    if header == 'Host':
                        append_data_to_list(header, [])
            start_index += host_entries
        # Create data frame
        return pandas.DataFrame(df_data, columns=header_names)

    def go(self):
        """Entry point to the class

        Returns:
            pandas.DataFrame: Metric dataframe, or an empty frame when no
            host reported any data
        """
        # Lazy %-args: the message is only formatted if the record is emitted
        _log.info('Creating dataframe from %s data...', self.metric)
        # Setup names of the metric headers
        self.initialise_header_names()
        # Load metrics data
        content = load_json(
            os.path.join(self.config['data_dir'], self.metric + '.json')
        )
        # If all monitor jobs on all nodes failed to monitor metrics we skip exporting
        if content['host_names']:
            return self.create_dataframe(content)
        _log.warning(
            'No %s data found. Skipping creating dataframe, '
            'and exporting to excel and databases', self.metric
        )
        return pandas.DataFrame()
if __name__ == '__main__':
    # Minimal manual run: read '<metric>.json' from the current working
    # directory and build the dataframe
    config = {}
    config['data_dir'] = os.getcwd()
    config['perf_event_file'] = os.path.join(
        os.getcwd(), '.perf_event_list.json'
    )
    create_df = CreateDataFrame('perf_metrics', config)
    df = create_df.go()