# Source code for perfmon.core.metrics.gpu

"""This file contains base class to monitor GPU metrics"""

import os
import logging
import time
import copy
import multiprocessing
import psutil

from perfmon.schemas.metrics import nv_gpu_data_schema
from perfmon.core.metrics.gpumetrics.nvidia.memory import memory_usage
from perfmon.core.metrics.gpumetrics.nvidia.utilization import get_gpu_mem_util_rates
from perfmon.core.metrics.gpumetrics.nvidia.utilization import get_encoder_decoder_util_rates
from perfmon.core.metrics.gpumetrics.nvidia.errors import ecc_error_counts
from perfmon.core.metrics.gpumetrics.nvidia.clock import clock_info
from perfmon.core.metrics.gpumetrics.nvidia.power import power_usage
from perfmon.core.metrics.gpumetrics.nvidia.power import power_violation_report
from perfmon.core.metrics.gpumetrics.nvidia.misc import misc_metrics
from perfmon.core.metrics.common import dump_metrics_async
from perfmon.common.utils.json_wrappers import dump_json
from perfmon.common.utils.process import proc_if_running

_log = logging.getLogger(__name__)

# pylint: disable=E0401,W0201,C0301


class MonitorNvidiaGpuMetrics(multiprocessing.Process):
    """Engine to monitor gpu related metrics.

    The monitor runs as a separate process. While the watched processes are
    alive and the IPC file reads ``INPROGRESS``, it periodically samples the
    GPU metrics, appends them to one data dict per GPU device and dumps these
    dicts to one JSON file per device.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize setup.

        Args:
            config: Mapping holding the monitor configuration. The keys read
                by this class are ``check_point``, ``num_nvidia_gpus``,
                ``temp_path`` (with a ``nv_gpu_metrics`` entry), ``job_id``,
                ``host_name``, ``sampling_freq``, ``pid`` and ``ipc_file``.
                A shallow copy is stored on the instance.

        Raises:
            KeyError: If a required configuration key or the ``USER``
                environment variable is missing.
        """
        # Must be called before anything else
        multiprocessing.Process.__init__(self)

        self.config = config.copy()

        # Time between two dumps of the data to file (original note: the data
        # is dumped every 900 sec)
        self.check_point_time = self.config['check_point']
        self.user = os.environ['USER']
        self._extra = {}

        # GPU devices iterator
        self.gpu_dev_iterator = range(self.config['num_nvidia_gpus'])

        # Name of the output files, one per GPU device:
        # <temp_path>/<job_id>_gpu-<n>.<host_name>.json
        self.outfiles = [
            os.path.join(
                self.config['temp_path']['nv_gpu_metrics'],
                '_'.join([str(self.config['job_id']), self.get_new_host_name(i)]) + '.json',
            )
            for i in self.gpu_dev_iterator
        ]

    def get_new_host_name(self, gpu_dev_num):
        """Append GPU number to host name.

        Args:
            gpu_dev_num: Index of the GPU device.

        Returns:
            String of the form ``gpu-<gpu_dev_num>.<host_name>``.
        """
        return '.'.join([f'gpu-{gpu_dev_num}', self.config['host_name']])

    def initialise_gpu_metrics_params(self):
        """This method initialises the GPU metric related parameters.

        ``self.gpu_data`` becomes a list with one fresh data dict per GPU
        device, built from ``nv_gpu_data_schema``.
        """
        self.gpu_data = [
            {
                'host_name': self.get_new_host_name(i),
                'sampling_frequency': self.config['sampling_freq'],
                'time_stamps': [],
                # Need deepcopy so that re-initialisation never shares
                # containers between devices or with the previous data
                **copy.deepcopy(nv_gpu_data_schema),
            }
            for i in self.gpu_dev_iterator
        ]

    def get_misc_metrics(self):
        """This method gets different misc metrics"""
        # Get all misc metrics like temperature, fan speed etc
        self.gpu_data = misc_metrics(self.gpu_data)

    def get_power_metrics(self):
        """This method gets power metrics"""
        # Get power usage
        self.gpu_data = power_usage(self.gpu_data)
        # Get throttling period due to constraints
        self.gpu_data = power_violation_report(self.gpu_data)

    def get_utilization_rates(self):
        """This method gets all utilization statistics"""
        # Append gpu and memory utilisation to data dict
        self.gpu_data = get_gpu_mem_util_rates(self.gpu_data)
        # Append encoder and decoder utilisation to data dict
        self.gpu_data = get_encoder_decoder_util_rates(self.gpu_data)

    def get_memory_usage(self):
        """This method gets memory usage"""
        # Append memory metrics to data dict
        self.gpu_data = memory_usage(self.gpu_data)

    def get_ecc_metrics(self):
        """This method gets ECC error counts"""
        # Get ECC error counts
        self.gpu_data = ecc_error_counts(self.gpu_data)

    def get_clock_info(self):
        """This method gets different clock info metrics"""
        # Get clock info
        self.gpu_data = clock_info(self.gpu_data)

    def get_metrics_data(self):
        """Extract metrics data by running every collector once."""
        # Clock info
        self.get_clock_info()
        # ECC counts
        self.get_ecc_metrics()
        # Memory usage statistics
        self.get_memory_usage()
        # Utilization statistics
        self.get_utilization_rates()
        # Energy usage statistics
        self.get_power_metrics()
        # Misc statistics
        self.get_misc_metrics()

    def add_timestamp(self):
        """This method adds timestamp to the data of every GPU device"""
        # Get time stamp and convert it to int. We are not looking at fine
        # grained monitoring here
        time_stamp = int(time.time())
        for i in self.gpu_dev_iterator:
            self.gpu_data[i]['time_stamps'].append(time_stamp)

    def dump_metrics(self):
        """Dump metrics to JSON file and re-initiate gpu_metrics dict"""
        for i in self.gpu_dev_iterator:
            # Hand over a deep copy, as the data is re-initialised right after
            dump_metrics_async(copy.deepcopy(self.gpu_data[i]), self.outfiles[i])
        # Re-initialise GPU metric parameters
        self.initialise_gpu_metrics_params()

    def _check_point_step(self, sampling_freq):
        """Return the number of sampling steps between two check points.

        The value is clamped to at least 1: when the check point time is
        shorter than the sampling frequency the integer ratio is 0, and
        using it as a modulus would raise ZeroDivisionError.
        """
        return max(1, int(self.check_point_time / sampling_freq))

    def _job_in_progress(self):
        """Return True when the IPC file content is ``INPROGRESS``.

        The file is read inside a ``with`` block so that the handle is closed
        on every poll.
        """
        with open(self.config['ipc_file'], 'r') as ipc_file:
            return ipc_file.read().rstrip() == 'INPROGRESS'

    def run(self):
        """This method extracts the gpu related metrics for a given pid.

        Sampling goes on until the watched processes have finished or the IPC
        file no longer reads ``INPROGRESS``. The data collected so far is
        written to the output files before returning, also when a watched
        process disappears or becomes inaccessible during sampling.
        """
        _log.info('Collection of GPU metrics has started')

        sampling_freq = self.config['sampling_freq']
        _log.debug('Current sampling frequency is %d', sampling_freq)

        # Number of steps before writing the data to file
        check_point_step = self._check_point_step(sampling_freq)

        # Step count
        i_step = 0

        # Get process information
        self.procs = [psutil.Process(p) for p in self.config['pid']]

        # Initialise GPU metric parameters
        self.initialise_gpu_metrics_params()

        while proc_if_running(self.procs) and self._job_in_progress():
            try:
                # Start measuring time taken for getting metrics
                start_monitor = time.time()

                # Add current timestamp to the list of timestamps
                self.add_timestamp()

                # Get metrics data
                self.get_metrics_data()

                # Dump metrics if check point is reached
                if i_step % check_point_step == 0:
                    self.dump_metrics()

                # Get total time elapsed to get metrics
                collection_time = time.time() - start_monitor

                # Sleep for given sampling frequency before collecting for
                # next round. The time taken to collect metrics is removed
                # from the sampling frequency to keep frequency fairly
                # constant; no sleep at all when collection took longer.
                time.sleep(max(0.0, sampling_freq - collection_time))

                i_step += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Watched process is gone: stop sampling and do the final dump
                break

        # Dump metrics to the disk
        for i in self.gpu_dev_iterator:
            dump_json(self.gpu_data[i], self.outfiles[i])