"""This file contains base class to monitor GPU metrics"""
import os
import logging
import time
import copy
import multiprocessing
import psutil
from perfmon.schemas.metrics import nv_gpu_data_schema
from perfmon.core.metrics.gpumetrics.nvidia.memory import memory_usage
from perfmon.core.metrics.gpumetrics.nvidia.utilization import get_gpu_mem_util_rates
from perfmon.core.metrics.gpumetrics.nvidia.utilization import get_encoder_decoder_util_rates
from perfmon.core.metrics.gpumetrics.nvidia.errors import ecc_error_counts
from perfmon.core.metrics.gpumetrics.nvidia.clock import clock_info
from perfmon.core.metrics.gpumetrics.nvidia.power import power_usage
from perfmon.core.metrics.gpumetrics.nvidia.power import power_violation_report
from perfmon.core.metrics.gpumetrics.nvidia.misc import misc_metrics
from perfmon.core.metrics.common import dump_metrics_async
from perfmon.common.utils.json_wrappers import dump_json
from perfmon.common.utils.process import proc_if_running
_log = logging.getLogger(__name__)
# pylint: disable=E0401,W0201,C0301
class MonitorNvidiaGpuMetrics(multiprocessing.Process):
    """Process that periodically samples NVIDIA GPU metrics.

    One data record and one output file are kept per GPU device
    (``config['num_nvidia_gpus']`` devices in total). Sampling goes on
    while ``proc_if_running`` is truthy for the monitored processes and
    the IPC file named by ``config['ipc_file']`` reads ``INPROGRESS``.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Store a copy of the configuration and derive output file names.

        Args:
            config: Mapping with the monitoring settings. The constructor
                reads ``check_point``, ``num_nvidia_gpus``, ``job_id``,
                ``host_name`` and ``temp_path['nv_gpu_metrics']``;
                :meth:`run` additionally reads ``sampling_freq``, ``pid``
                and ``ipc_file``.
        """
        # The Process machinery must be set up before anything else
        super().__init__()

        # Shallow copy: nested values are still shared with the caller
        self.config = config.copy()

        # Interval between two dumps to file; it is divided by the
        # sampling frequency (seconds) in run(), so it is in seconds too
        self.check_point_time = self.config['check_point']

        # Do not crash when USER is not exported (e.g. minimal containers)
        self.user = os.environ.get('USER', '')

        self._extra = {}

        # GPU devices iterator (a range can be iterated repeatedly)
        self.gpu_dev_iterator = range(self.config['num_nvidia_gpus'])

        # One output file per GPU: <job_id>_gpu-<n>.<host_name>.json
        out_dir = self.config['temp_path']['nv_gpu_metrics']
        job_id = str(self.config['job_id'])
        self.outfiles = [
            os.path.join(out_dir, f'{job_id}_{self.get_new_host_name(i)}.json')
            for i in self.gpu_dev_iterator
        ]

    def get_new_host_name(self, gpu_dev_num):
        """Return the host name prefixed with ``gpu-<gpu_dev_num>.``."""
        host_name = self.config['host_name']
        return f'gpu-{gpu_dev_num}.{host_name}'

    def initialise_gpu_metrics_params(self):
        """(Re-)create the empty per-GPU data records in ``self.gpu_data``."""
        # Each record gets its own deep copy of the schema so that the
        # records (and successive re-initialisations) share no state
        self.gpu_data = [
            {
                'host_name': self.get_new_host_name(i),
                'sampling_frequency': self.config['sampling_freq'],
                'time_stamps': [],
                **copy.deepcopy(nv_gpu_data_schema),
            }
            for i in self.gpu_dev_iterator
        ]

    def get_misc_metrics(self):
        """Update ``self.gpu_data`` with the result of ``misc_metrics``."""
        # Misc metrics like temperature, fan speed etc
        self.gpu_data = misc_metrics(self.gpu_data)

    def get_power_metrics(self):
        """Update ``self.gpu_data`` with power usage and violation data."""
        # Power usage
        self.gpu_data = power_usage(self.gpu_data)
        # Throttling period due to constraints
        self.gpu_data = power_violation_report(self.gpu_data)

    def get_utilization_rates(self):
        """Update ``self.gpu_data`` with all utilization statistics."""
        # GPU and memory utilisation
        self.gpu_data = get_gpu_mem_util_rates(self.gpu_data)
        # Encoder and decoder utilisation
        self.gpu_data = get_encoder_decoder_util_rates(self.gpu_data)

    def get_memory_usage(self):
        """Update ``self.gpu_data`` with memory usage metrics."""
        self.gpu_data = memory_usage(self.gpu_data)

    def get_ecc_metrics(self):
        """Update ``self.gpu_data`` with ECC error counts."""
        self.gpu_data = ecc_error_counts(self.gpu_data)

    def get_clock_info(self):
        """Update ``self.gpu_data`` with clock info metrics."""
        self.gpu_data = clock_info(self.gpu_data)

    def get_metrics_data(self):
        """Collect one sample of every metric family, in a fixed order."""
        self.get_clock_info()
        self.get_ecc_metrics()
        self.get_memory_usage()
        self.get_utilization_rates()
        self.get_power_metrics()
        self.get_misc_metrics()

    def add_timestamp(self):
        """Append the current time to every GPU record."""
        # Whole seconds are enough: this is not fine grained monitoring
        time_stamp = int(time.time())
        for i in self.gpu_dev_iterator:
            self.gpu_data[i]['time_stamps'].append(time_stamp)

    def dump_metrics(self):
        """Dump the collected metrics and start over with empty records."""
        for i in self.gpu_dev_iterator:
            # A deep copy of each record is handed to the dump helper
            dump_metrics_async(copy.deepcopy(self.gpu_data[i]), self.outfiles[i])
        # Re-initialise GPU metric parameters
        self.initialise_gpu_metrics_params()

    def _check_point_steps(self, sampling_freq):
        """Return the number of sampling steps between two check points.

        The result is never smaller than 1, so it is always safe to use
        as a modulus even when the check point interval is shorter than
        the sampling period.
        """
        return max(1, int(self.check_point_time / sampling_freq))

    def _ipc_in_progress(self):
        """Return True when the IPC file content is ``INPROGRESS``."""
        # The context manager closes the handle on every poll
        with open(self.config['ipc_file'], 'r') as ipc_file:
            return ipc_file.read().rstrip() == 'INPROGRESS'

    def _flush_to_disk(self):
        """Write the data currently held in memory to the output files."""
        for i in self.gpu_dev_iterator:
            dump_json(self.gpu_data[i], self.outfiles[i])

    def run(self):
        """Sample GPU metrics until monitoring has to stop.

        The loop ends when ``proc_if_running`` is falsy for the monitored
        processes, when the IPC file no longer reads ``INPROGRESS``, or
        when psutil raises ``NoSuchProcess``/``AccessDenied`` while
        sampling. In every case the data still held in memory is written
        to disk before returning.
        """
        _log.info('Collection of GPU metrics has started')

        sampling_freq = self.config['sampling_freq']
        _log.debug('Current sampling frequency is %d', sampling_freq)

        # Number of steps before writing the data to file
        check_point_step = self._check_point_steps(sampling_freq)

        # Step count
        i_step = 0

        # Get process information
        self.procs = [psutil.Process(p) for p in self.config['pid']]

        # Initialise GPU metric parameters
        self.initialise_gpu_metrics_params()

        while proc_if_running(self.procs) and self._ipc_in_progress():
            try:
                # Start measuring time taken for getting metrics
                start_monitor = time.time()

                self.add_timestamp()
                self.get_metrics_data()

                # Dump metrics if check point is reached
                if i_step % check_point_step == 0:
                    self.dump_metrics()

                # Subtract the time spent collecting from the sampling
                # period to keep the frequency fairly constant; never
                # sleep for a negative duration
                collection_time = time.time() - start_monitor
                time.sleep(max(0.0, sampling_freq - collection_time))

                i_step += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Stop sampling; remaining data is flushed below
                break

        # Dump metrics to the disk
        self._flush_to_disk()