Source code for perfmon.common.plots.__init__

""""This package contains functions to plot gathered metrics"""

import os
import datetime
import logging
import warnings
import matplotlib.pyplot
import pandas

from perfmon.schemas.plots import base_metric_labels
from perfmon.schemas.plots import ib_metric_labels
from perfmon.schemas.plots import mem_bw_metric_label
from perfmon.schemas.plots import engy_package_metric_label
from perfmon.schemas.plots import engy_dram_metric_label
from perfmon.schemas.plots import engy_core_metric_label
from perfmon.schemas.plots import engy_uncore_metric_label

logging.getLogger('matplotlib').setLevel(logging.WARNING)

_log = logging.getLogger(__name__)

# Generate colors for plots
COLORS = matplotlib.pyplot.rcParams['axes.prop_cycle'].by_key()['color']

# Centimeters to inches conversion factor
CM2INCH = 1 / 2.54

# DPI resolution for the plots
DPI = 200

# matplotlib line styles
LINESTYLES = ['-', '--', '-.', ':']

# matplotlib global parameters
PARAMS = {
    'lines.markersize': 3,
    'legend.fontsize': 5,
    'axes.labelsize': 7,
    'axes.titlesize': 7,
    'xtick.labelsize': 6,
    'ytick.labelsize': 6,
}
matplotlib.pyplot.rcParams.update(PARAMS)

# Ignore user and runtime warnings from matplotlib and pandas
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)

# pylint: disable=E0401,W0201,C0301,W0621


class GenPlots(object):
    """This class contains all plotting methods (only for CPU metrics)"""

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config, df_dict):
        """Initialize setup"""
        # Config dict
        self.config = config.copy()
        # Dataframes dict
        self.df_dict = df_dict
        # Metrics to plot and their parameters
        self.metric_labels = base_metric_labels
    def check_non_default_metrics(self, df):
        """Check if IB, mem. bandwidth and RAPL metrics are available in collected metrics"""
        # IB metrics
        if 'IB recv data [bytes]' in df.columns:
            self.metric_labels = {
                **self.metric_labels,
                **ib_metric_labels,
            }
        # Memory bandwidth metrics
        if 'Memory (read) bandwidth [MiB/s]' in df.columns:
            self.metric_labels = {
                **self.metric_labels,
                **mem_bw_metric_label,
            }
        # RAPL metrics - package
        if 'RAPL package 0 [uJ]' in df.columns:
            self.metric_labels = {
                **self.metric_labels,
                **engy_package_metric_label,
            }
        # RAPL metrics - DRAM
        if 'RAPL dram 0 [uJ]' in df.columns:
            self.metric_labels = {
                **self.metric_labels,
                **engy_dram_metric_label,
            }
        # RAPL metrics - Core
        if 'RAPL core 0 [uJ]' in df.columns:
            self.metric_labels = {
                **self.metric_labels,
                **engy_core_metric_label,
            }
        # RAPL metrics - Uncore
        if 'RAPL uncore 0 [uJ]' in df.columns:
            self.metric_labels = {
                **self.metric_labels,
                **engy_uncore_metric_label,
            }
    @staticmethod
    def convert_ts_datetime(df):
        """Convert timestamps in df to datetime format"""
        df['Datetime'] = df['Timestamps'].apply(
            lambda x: datetime.datetime.fromtimestamp(x).strftime('%m/%d/%Y\n %H:%M:%S')
        )
        return df
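    # Illustrative note (assumed example values, not part of the original source):
    # a Unix timestamp such as 1609459200.0 is rendered in the local timezone,
    # e.g. '01/01/2021\n 00:00:00', and stored in the 'Datetime' column that the
    # plotting methods below use as the x axis.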
    def apply_plot_settings(self, plot_type, metric_att, mean_max, ax):
        """This method applies the common settings to the plots"""
        # Set x axis name
        ax.set_xlabel('Time')
        # Set y axis name
        ax.set_ylabel(' '.join([metric_att['name'], f"[{metric_att['units']}]"]))
        # Plot log scale on y axis for bytes and packets
        if metric_att['log_scale']:
            ax.set_yscale('log', base=10)
        # Add mean and max information to the title of the plot
        ax.set_title(
            f"Average {metric_att['name']}/Node: {mean_max[0]:.3f} "
            f"{metric_att['units']}, Max {metric_att['name']}/Node: {mean_max[1]:.3f} "
            f"{metric_att['units']}",
            weight='bold',
        )
        # Place legend at upper right corner
        matplotlib.pyplot.legend(loc='upper right')
        # Add major and minor grid lines
        matplotlib.pyplot.grid(visible=True, which='both', axis='both')
        # Choose tight layout
        matplotlib.pyplot.tight_layout()
        # Save figure to png if plot_dir is found
        if 'plot_dir' in self.config.keys():
            fig_path = os.path.join(
                self.config['plot_dir'],
                metric_att['name'].replace(' ', '_') + plot_type + '.png'
            )
            matplotlib.pyplot.savefig(fig_path, dpi=DPI)
        matplotlib.pyplot.close()
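    # Illustrative note (assumed example values): with plot_dir='/tmp/plots' and a
    # hypothetical metric named 'IB recv data', the per-host figure would be written
    # to '/tmp/plots/IB_recv_data_per_node.png' at DPI=200; combined plots carry the
    # aggregation type prefixed to the metric name before this method is called.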
    @staticmethod
    def get_global_mean_max(mean_max_all):
        """Get global mean max of metric from host data"""
        return [
            sum(i[0] for i in mean_max_all) / len(mean_max_all),
            max(i[1] for i in mean_max_all),
        ]
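    # Illustrative note (assumed input, not part of the original source): for
    # per-host [mean, max] pairs [[2.0, 5.0], [4.0, 9.0]] this returns [3.0, 9.0],
    # i.e. the mean of the per-host means and the max of the per-host maxima.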
    @staticmethod
    def replace_neg_values(df):
        """Replace negative values in df with preceding positive values"""
        return df.mask(df.lt(0)).ffill().fillna(0).convert_dtypes()
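    # Illustrative note (assumed input): a series such as [5, -1, 7] becomes
    # [5, 5, 7]: negative readings (e.g. counter artefacts) are masked and
    # forward-filled with the preceding value, and any leading NaNs become 0.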
    def plotting_engine(self, host_name, metric, metric_att, ax, data):
        """Main engine to create plots"""

        def convert_data_and_plot(t_metric, n_metric, df):
            # Unit conversion of metric e.g. bytes to MiB
            df[t_metric] = df[t_metric].div(metric_att['unit_conversion'])
            # Convert to rate
            if metric_att['convert_to_rate']:
                df[n_metric] = df[t_metric].diff() / sampling_intervals
            else:
                n_metric = t_metric
            # Replace negative metric values with preceding positive values
            df[n_metric] = self.replace_neg_values(df[n_metric])
            # Plot metric on given axis
            df.plot(ax=ax, x='Datetime', y=n_metric, style=['o-'], label=host_name)
            return n_metric, [df[n_metric].mean(), df[n_metric].max()], data

        # Get sampling intervals for each host
        sampling_intervals = data['Timestamps'].diff()
        # Create a new metric
        new_metric = ' '.join([metric_att['name'], metric_att['units']])
        # For energy metrics multiple packages are involved. We check for up to
        # 8 packages for each device
        if metric_att['cat'] in ['Energy']:
            mean_max_all = []
            # We aggregate all package readings here
            total_metric = metric_att['name']
            for d in range(8):
                # Create a new sub metric for a given package and device
                test_metric = f'{metric} {d} [uJ]'
                # Check if package is there in data
                if test_metric in data.columns:
                    # Make necessary conversions and plot on axis handler
                    new_metric, mean_max, data = convert_data_and_plot(
                        test_metric, new_metric, data
                    )
                    if total_metric in data.columns:
                        data[total_metric] += data[new_metric]
                    else:
                        data[total_metric] = data[new_metric]
                    # Append mean_max of package to global mean_max_all
                    mean_max_all.append(mean_max)
            # We rename the column to the original metric name and drop the
            # intermediate column
            data[new_metric] = data[total_metric]
            data = data.drop(columns=[total_metric])
            # Get global mean and max values of all packages
            mean_max = self.get_global_mean_max(mean_max_all)
        else:
            new_metric, mean_max, data = convert_data_and_plot(metric, new_metric, data)
        return new_metric, mean_max, data
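    # Illustrative note (assumed column layout): for an energy metric such as
    # 'RAPL package', the columns 'RAPL package 0 [uJ]' ... 'RAPL package 7 [uJ]'
    # (whichever of the up-to-8 packages are present) are each converted and
    # plotted, then summed into a single aggregated column whose global mean and
    # max are reported in the plot title.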
    def combined_plotting_engine(self, metric, metric_att, comb_ts_df, comb_metric_df):
        """Plotting engine for combined metrics"""
        # Concat time stamps from all hosts
        df_ts_concat = pandas.concat(comb_ts_df, axis=1, ignore_index=True)
        # Concat df from all hosts
        df_concat = pandas.concat(comb_metric_df, axis=1, ignore_index=True)
        # Depending on the type of aggregation, either sum or take the mean of
        # all hosts' data
        if metric_att['comb'] == 'Total':
            final_df = pandas.concat([df_ts_concat.mean(axis=1), df_concat.sum(axis=1)], axis=1)
        elif metric_att['comb'] == 'Average':
            final_df = pandas.concat([df_ts_concat.mean(axis=1), df_concat.mean(axis=1)], axis=1)
        # Rename columns of df
        final_df.columns = ['Timestamps', metric]
        # Add datetime string to df
        final_df = self.convert_ts_datetime(final_df)
        # Replace negative metric values with preceding positive values
        final_df[metric] = self.replace_neg_values(final_df[metric])
        # Add aggregation type as prefix to metric name
        metric_att['name'] = ' '.join([metric_att['comb'], metric_att['name']])
        # Initialise figure and axis
        _, ax = matplotlib.pyplot.subplots(figsize=(18 * CM2INCH, metric_att['size'] * CM2INCH))
        # Plot data
        final_df.plot(ax=ax, x='Datetime', y=metric, style=['o-'])
        # Get mean and max of the entire data
        mean_max = [final_df[metric].mean(), final_df[metric].max()]
        # Apply plot settings for the all-nodes plot
        self.apply_plot_settings('_all_nodes', metric_att, mean_max, ax)
    def make_plots(self, df):
        """This method plots both per host and combined metrics"""
        for metric, metric_att in self.metric_labels.items():
            # Init matplotlib axis
            _, ax = matplotlib.pyplot.subplots(figsize=(18 * CM2INCH, metric_att['size'] * CM2INCH))
            # Initialise empty list for timestamp df
            comb_ts_df = []
            # Initialise empty list for concatenated df
            comb_metric_df = []
            # Mean max of all nodes
            mean_max_all = []
            for host_name, data in df.groupby('Host'):
                new_metric, mean_max, data = self.plotting_engine(
                    host_name, metric, metric_att, ax, data
                )
                comb_metric_df.append(data[new_metric].reset_index(drop=True))
                comb_ts_df.append(data['Timestamps'].reset_index(drop=True))
                mean_max_all.append(mean_max)
            # Get global mean max of metric
            mean_max = self.get_global_mean_max(mean_max_all)
            # Apply plot settings for per host plots
            self.apply_plot_settings('_per_node', metric_att, mean_max, ax)
            # Plot combined metric
            self.combined_plotting_engine(new_metric, metric_att, comb_ts_df, comb_metric_df)
    def plot_metric_data(self, df):
        """Make plots for the CPU metric data"""
        # Check the availability of IB, memory bandwidth and RAPL metrics
        # and add to metric label dict
        self.check_non_default_metrics(df)
        # Convert timestamps to datetime format
        df = self.convert_ts_datetime(df)
        # Plot metrics
        self.make_plots(df)
    def go(self):
        """Entry point for plotting"""
        _log.info('Making plots...')
        # Load CPU metrics data
        df = self.df_dict['cpu_metrics']
        if not df.empty:
            self.plot_metric_data(df)
            _log.info('Plots generated')
        else:
            _log.warning('No data found. Skipping plots generation')
if __name__ == '__main__':
    import pathlib

    # Get project root directory
    project_root = pathlib.Path(__file__).parent.parent.parent.parent
    # Write plots to the current working directory
    config = {
        'plot_dir': os.getcwd(),
    }
    df_dict = {
        'cpu_metrics': pandas.read_hdf('metrics.h5', 'cpu_metrics_3072'),
        'perf_metrics': pandas.read_hdf('metrics.h5', 'perf_metrics_3072'),
    }
    make_plots = GenPlots(config=config, df_dict=df_dict)
    make_plots.go()