"""This module contains utility functions for gathering CPU metrics"""
import os
import logging
import datetime
import time
import sys
import re
import struct
import subprocess
import json
import errno
import psutil
import fpdf
from monitormetrics.utils.processorspecific import cpu_micro_architecture_name, \
llc_cache_miss_perf_event, perf_event_list
from monitormetrics.utils.exceptions import CommandExecutionFailed, JobPIDNotFoundError, \
ProcessorVendorNotFoundError, KeyNotFoundError
from monitormetrics.utils.cpuid import CPUID
from monitormetrics._version import __version__
_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})
# Infiniband metrics API directory
IB_API_DIR = "/sys/class/infiniband"
# RAPL powercap API directory
RAPL_API_DIR = '/sys/class/powercap/intel-rapl'
# Maximum number of RAPL domains
# cores, gpu, pkg, ram, psys
NUM_RAPL_DOMAINS = 5
def execute_cmd(cmd_str, handle_exception=True):
    """Execute a shell command string and return its output.

    Args:
        cmd_str (str): Command string to be executed
        handle_exception (bool): Handle exception internally. If set to False, raises an
                                 exception to the caller function
    Returns:
        str: Output of the command. If command execution fails and handle_exception is
             True, returns 'not_available'
    Raises:
        CommandExecutionFailed: An error occurred in execution of command iff
                                handle_exception is set to False
    """
    try:
        # stderr is piped into stdout so callers receive a single combined stream
        cmd_out = subprocess.run(cmd_str, shell=True, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, check=True)
        cmd_out = cmd_out.stdout.decode('utf-8').rstrip()
    except subprocess.CalledProcessError as err:
        if handle_exception:
            # Best-effort mode: signal failure through a sentinel value
            cmd_out = "not_available"
        else:
            _log.warning("Execution of command %s failed", cmd_str)
            # Fixed error message: previously read 'Execution of command "X" command'
            raise CommandExecutionFailed(
                "Execution of command \"{}\" failed".format(cmd_str)) from err
    return cmd_out
def execute_cmd_pipe(cmd_str):
    """Launch a shell command asynchronously and return the process handle.

    Args:
        cmd_str (str): Command string to be executed
    Returns:
        object: Process object
    """
    # stderr is merged into stdout; output is line-buffered text
    return subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, bufsize=-1,
                            universal_newlines=True)
def get_parser(cmd_output, reg="lscpu"):
    """Build a regex-based field extractor over a captured command output.

    Args:
        cmd_output (str): Output of the executed command
        reg (str): Kind of output being parsed ("lscpu", "perf" or "perf-intvl")
    Returns:
        Function handle to parse the output
    """
    def parser(pattern):
        """Look up *pattern* in the captured output and return its value."""
        # Each command family lays out its fields differently, so pick the
        # matching capture-group layout
        if reg == "perf":
            exp = r'(?P<Value>[0-9,]*\s*)(?P<Field>{}.*)'.format(pattern)
        elif reg == "perf-intvl":
            exp = r'(?P<Time>[0-9.]*\s*)(?P<Value>[0-9,><a-zA-Z\s]*\s*)(?P<Field>{}.*)'.format(
                pattern)
        else:
            exp = r'(?P<Field>{}:\s*\s)(?P<Value>.*)'.format(pattern)
        match = re.search(exp, cmd_output)
        # A missing field yields the sentinel value instead of raising
        return match.group('Value') if match else "not_available"
    return parser
def find_procs_by_name(name):
    """Return all running processes whose name matches *name*.

    Args:
        name (str): name of the process to find
    Returns:
        list: List of psutil objects
    """
    # process_iter(['name']) pre-fetches only the 'name' attribute per process
    return [proc for proc in psutil.process_iter(['name'])
            if proc.info['name'] == name]
def proc_if_running(procs):
    """Report whether at least one of the given processes is still alive.

    Args:
        procs (list): List of psutil process objects
    Returns:
        bool: True while any process is running, False once all terminated
    """
    return any(proc.is_running() for proc in procs)
def ibstat_ports():
    """This function returns Infiniband ports if present
    Returns:
        dict: A dict with IB port names and numbers
    """
    # A dict with port name as key and port number as value
    ports = {}
    # Execute ibstat command to get IB ports
    ibstat = execute_cmd("ibstat")
    if ibstat != 'not_available':
        # If command fails, execute_cmd returns not_available
        ibstat = ibstat.splitlines()
        for index, line in enumerate(ibstat):
            line = line.strip()
            # Get port name from the "CA '<name>'" header line
            match = re.match(r"CA '(.*)'", line)
            if match:
                name = match.group(1)
                # Get port number
                # NOTE(review): relies on the fixed ibstat layout where the
                # "Port N:" line sits exactly 7 lines below the CA header and
                # the "State: ..." line directly after it -- confirm against
                # the ibstat version in use
                number = re.match(r"Port ([0-9]+)\:", ibstat[index + 7].strip()).group(1)
                state = ibstat[index + 8].split(':')[1].strip()
                # Check the state of the port; only Active ports are reported
                port_state = re.match(r"Active", state)
                if port_state:
                    ports[name] = number
    # If IB drivers are not installed (for example Omnipath), we can directly check the
    # IB API directory
    if ibstat == 'not_available' and os.path.exists(IB_API_DIR):
        for name in os.listdir(IB_API_DIR):
            # Take the first (typically only) port directory for the device
            number = os.listdir(os.path.join(IB_API_DIR, name, 'ports'))[0]
            ports[name] = str(number)
    _log.debug("Infiniband ports detected are %s", ",".join(ports.keys()))
    return ports
def get_rapl_devices():
    """Discover all RAPL powercap packages and their sub-domains.

    Walks the intel-rapl sysfs tree and collects the energy_uj counter path
    for every package and for every domain (core, uncore, dram, ...) found
    underneath it.

    Returns:
        dict: A dict with package names and paths
    """
    # Socket count from lscpu bounds the number of package directories
    lscpu_out = execute_cmd("lscpu")
    parse = get_parser(lscpu_out)
    n_sockets = int(parse(r"Socket\(s\)"))
    rapl_domains = {}
    for socket in range(n_sockets):
        pkg_dir = os.path.join(RAPL_API_DIR, "intel-rapl:{}".format(socket))
        if not os.path.exists(pkg_dir):
            continue
        with open(os.path.join(pkg_dir, "name"), "r") as handle:
            pkg_name = handle.readline().rstrip('\n')
        # Package directories are named "package-<n>"; keep <n> to tag domains
        pkg_num = re.search(r"package-([0-9]+)", pkg_name).group(1)
        rapl_domains[pkg_name] = os.path.join(pkg_dir, "energy_uj")
        for domain in range(NUM_RAPL_DOMAINS):
            dom_dir = os.path.join(pkg_dir, "intel-rapl:{}:{}".format(socket, domain))
            if not os.path.exists(dom_dir):
                continue
            with open(os.path.join(dom_dir, "name"), "r") as handle:
                # e.g. "dram-0": domain name suffixed with its package number
                dom_label = "-".join([handle.readline().rstrip('\n'), pkg_num])
            rapl_domains[dom_label] = os.path.join(dom_dir, "energy_uj")
    _log.debug("RAPL domains detected are %s", ",".join(rapl_domains.keys()))
    return rapl_domains
def merge_dicts(exst_dict, new_dict):
    """Recursively merge new_dict into exst_dict in place.

    List values are extended, nested dicts are merged recursively, and keys
    present only in new_dict are added. Scalar values already present in
    exst_dict are left untouched (original behavior).

    Args:
        exst_dict(dict): Existing data in the dict
        new_dict(dict): New data to be added to the dict
    Returns:
        dict: updated exst_dict with contents from new_dict
    """
    for key, value in exst_dict.items():
        if key not in new_dict:
            # Nothing new for this key; previously this raised KeyError
            continue
        if isinstance(value, list):
            value.extend(new_dict[key])
        elif isinstance(value, dict):
            merge_dicts(exst_dict[key], new_dict[key])
    # Keys only present in new_dict were previously dropped silently,
    # contradicting the documented contract
    for key, value in new_dict.items():
        if key not in exst_dict:
            exst_dict[key] = value
    return exst_dict
def load_json(filename):
    """Read a JSON file and return its deserialized contents.

    Args:
        filename (str): Name of the file to load
    Returns:
        dict: File contents as dict
    """
    with open(filename, "r") as handle:
        return json.load(handle)
def write_json(content, filename):
    """Serialize *content* to *filename* as pretty-printed, key-sorted JSON.

    Args:
        content (dict): Dict to write into JSON format
        filename (str): Name of the file to write
    """
    with open(filename, "w") as handle:
        json.dump(content, handle, indent=4, sort_keys=True)
def dump_json(content, filename):
    """Append *content* to an existing JSON file, creating it if absent.

    Args:
        content (dict): Dict to write into JSON format
        filename (str): Name of the target file
    """
    if not os.path.isfile(filename):
        # First write: nothing to merge with
        write_json(content, filename)
        return
    merged = merge_dicts(load_json(filename), content)
    write_json(merged, filename)
def get_value(input_dict, target):
    """Collect every value stored under *target* anywhere in a nested dict.

    Args:
        input_dict (dict): Dict to search for key
        target (Any): Key to search
    Returns:
        list: List of values found in input_dict
    """
    matches = []
    for key, val in input_dict.items():
        if key == target:
            matches.append(val)
        elif isinstance(val, dict):
            # Only recurse when the key itself did not match
            matches.extend(get_value(val, target))
    return matches
def replace_negative(input_list):
    """Replace negative values in a list with the mean of their neighbours.

    Negative values at the extrema are replaced with the single adjacent
    element. A lone negative element (single-element list) is kept as-is;
    previously that case raised IndexError.

    Args:
        input_list (list): A list with positive and/or negative elements
    Returns:
        list: A list with the negative elements replaced
    """
    n = len(input_list)
    out_list = []
    for idx, elem in enumerate(input_list):
        if elem >= 0 or n == 1:
            # Non-negative values (and lone elements with no neighbours)
            # pass through unchanged
            out_list.append(elem)
        elif idx == 0:
            # Left boundary: use the succeeding element
            out_list.append(input_list[1])
        elif idx == n - 1:
            # Right boundary: use the preceding element
            out_list.append(input_list[n - 2])
        else:
            out_list.append(0.5 * (input_list[idx - 1] + input_list[idx + 1]))
    return out_list
def get_cpu_model_names_for_non_x86():
    """Extract vendor, model and architecture for non-x86 machines (IBM POWER, ARM).

    Returns:
        str: Name of the vendor
             model name/number of the processor
             micro architecture of the processor
    Raises:
        ProcessorVendorNotFoundError: If the processor cannot be identified
    """
    # CPUID is unavailable here, so fall back to parsing lscpu
    cmd_out = execute_cmd('lscpu')
    if cmd_out != 'not_available':
        parse_lscpu = get_parser(cmd_out)
        arch = parse_lscpu("Architecture")
        if arch == "ppc64le":
            vendor_name = "IBM"
            cpu_family = parse_lscpu("Model")
            cpu_model = parse_lscpu("Model name")
            return vendor_name, cpu_family, cpu_model
    # Previously an unrecognized architecture fell through and implicitly
    # returned None, which made callers fail on tuple unpacking; raise the
    # documented exception in every unidentified case instead
    _log.error("Error while looking for processor infos")
    raise ProcessorVendorNotFoundError
def get_cpu_vendor(cpu):
    """Decode the vendor string from the level-0 CPUID leaf.

    Args:
        cpu (object): CPUID object
    Returns:
        str: Name of the vendor
    """
    # The 12-byte vendor string is spread across EBX, EDX, ECX in that order
    _, ebx, ecx, edx = cpu(0)
    return struct.pack("III", ebx, edx, ecx).decode("utf-8")
def get_cpu_model_family(cpu):
    """Extract CPU family and model IDs from the level-1 CPUID leaf.

    Args:
        cpu (object): CPUID object
    Returns:
        list: Family and model IDs
    """
    # EAX of CPUID leaf 1: base model in bits 4-7, base family in bits 8-11
    # (layout per https://en.wikipedia.org/wiki/CPUID)
    eax = cpu(1)[0]
    model = (eax >> 4) & 0xf
    family = (eax >> 8) & 0xf
    if family in (6, 15):
        # Extended model (bits 16-19) becomes the high nibble of the model
        model += (eax >> 12) & 0xf0
    if family == 15:
        # Extended family (bits 20-27) is added to the base family
        family += (eax >> 20) & 0xff
    return [family, model]
def get_cpu_vendor_model_family():
    """Determine the CPU vendor name, family ID and model ID.

    Uses the CPUID instruction on x86; on architectures where CPUID cannot
    be executed, falls back to parsing lscpu output.

    Returns:
        list: Name of the vendor, CPU family and model ID
    """
    try:
        cpuid = CPUID()
        vendor_name = get_cpu_vendor(cpuid)
        cpu_family, cpu_model = get_cpu_model_family(cpuid)
    except SystemError:
        # CPUID raises SystemError on non-x86 hardware
        vendor_name, cpu_family, cpu_model = get_cpu_model_names_for_non_x86()
    return vendor_name, cpu_family, cpu_model
def get_mem_bw_event():
    """Return the perf event string used to estimate memory bandwidth.

    The event is the LLC cache miss counter for the detected
    micro-architecture; an empty string is returned when the processor
    cannot be identified.

    Returns:
        str: A string to get memory bandwidth for perf stat command
    """
    # Identify the processor first; the event name is vendor/arch specific
    vendor, family, model = get_cpu_vendor_model_family()
    try:
        micro_arch = cpu_micro_architecture_name(vendor, model, family)
        return llc_cache_miss_perf_event(vendor, micro_arch)
    except (ProcessorVendorNotFoundError, KeyNotFoundError):
        _log.error("Vendor and/or micro architecture not found")
        return ""
def get_perf_events():
    """
    This function checks the micro architecture type and returns available perf events.

    Returns:
        dict: Perf events with event name
        dict: Derived perf metrics from event counters
    """
    # First get processor vendor and model details
    vendor_name, family_id, model_id = get_cpu_vendor_model_family()
    try:
        # Get micro architecture name
        micro_architecture = cpu_micro_architecture_name(vendor_name, model_id,
                                                         family_id)
        # Get perf events
        perf_events, derived_perf_metrics = perf_event_list(micro_architecture)
    except (ProcessorVendorNotFoundError, KeyNotFoundError):
        # Both values must be initialized here: previously only perf_events
        # was set, so the return statement raised NameError on this path
        perf_events = ""
        derived_perf_metrics = ""
        _log.error("Vendor and/or micro architecture not found")
    return perf_events, derived_perf_metrics
def check_perf_events(perf_events):
    """Probe each perf event group and keep only the working counters.

    Every non-hardware/software group is exercised with a short `perf stat`
    run; events whose counter value cannot be parsed as an integer are
    removed so monitoring only reads functional counters.

    Args:
        perf_events (dict): A dict of found perf events
    Returns:
        dict: A dict of working perf events
    """
    for group, events in perf_events.items():
        # Hardware/software events are assumed available and skipped
        if group in ("hardware_events", "software_events"):
            continue
        probe_out = execute_cmd(
            "perf stat -e {} sleep 0.1".format(",".join(list(events.values()))))
        parse = get_parser(probe_out, reg="perf")
        working = {}
        for name, code in events.items():
            try:
                int(parse(name).rstrip().replace(",", ""))
            except ValueError:
                # Counter did not report a number: drop it
                continue
            working[name] = code
        perf_events[group] = working
    return perf_events
class FileLock(object):
    """A file-based lock with context-manager support for ``with`` blocks.

    Locking works by atomically creating ``<path>.lock`` via
    ``os.open(O_CREAT | O_EXCL)``, so it is cross-platform and does not
    depend on ``msvcrt`` or ``fcntl``.
    """

    class FileLockException(Exception):
        """Raised when the lock cannot be acquired within the timeout."""
        pass

    def __init__(self, protected_file_path, timeout=None, delay=1,
                 lock_file_contents=None):
        """Prepare the locker for *protected_file_path*.

        Args:
            protected_file_path (str): File to protect; ``.lock`` is appended
            timeout (float): Max seconds a blocking acquire may wait
            delay (float): Seconds between acquisition attempts
            lock_file_contents (str): Debug text written into the lock file;
                defaults to the owning process's argv
        """
        self.is_locked = False
        self.lockfile = protected_file_path + ".lock"
        self.timeout = timeout
        self.delay = delay
        if lock_file_contents is None:
            # Record the owner's command line so a stale lock can be traced
            lock_file_contents = "Owning process args:\n" + "".join(
                arg + "\n" for arg in sys.argv)
        self._lock_file_contents = lock_file_contents

    def locked(self):
        """Return True iff the lock is held by THIS FileLock instance.

        Even when this returns False, another instance (possibly in a
        different thread or process) may own the lock file.
        """
        return self.is_locked

    def available(self):
        """Return True iff the lock file does not currently exist."""
        return not os.path.exists(self.lockfile)

    def lock_exists(self):
        """Return True iff the external lock file exists."""
        return os.path.exists(self.lockfile)

    def acquire(self, blocking=True):
        """Try to acquire the lock.

        Polls every ``self.delay`` seconds until the lock is obtained or
        ``self.timeout`` elapses, in which case FileLockException is raised.
        With ``blocking=False`` returns False immediately when the lock is
        already taken.
        """
        began = time.time()
        while True:
            try:
                # O_CREAT|O_EXCL makes creation atomic: it fails with EEXIST
                # when another owner already holds the lock file
                descriptor = os.open(self.lockfile,
                                     os.O_CREAT | os.O_EXCL | os.O_RDWR)
                with os.fdopen(descriptor, "a") as handle:
                    # Leave debug info about the owning process in the file
                    handle.write(self._lock_file_contents)
            except OSError as err:
                if err.errno != errno.EEXIST:
                    raise
                if (self.timeout is not None
                        and (time.time() - began) >= self.timeout):
                    raise FileLock.FileLockException("Timeout occurred.")
                if not blocking:
                    return False
                time.sleep(self.delay)
            else:
                self.is_locked = True
                return True

    def release(self):
        """Delete the lock file and mark the lock as free.

        Called automatically at the end of a ``with`` block.
        """
        self.is_locked = False
        os.unlink(self.lockfile)

    def __enter__(self):
        """Acquire the lock when entering a ``with`` statement."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Release the lock when leaving a ``with`` statement."""
        self.release()

    def __del__(self):
        """Make sure this instance leaves no stale ``.lock`` file behind."""
        if self.is_locked:
            self.release()

    def purge(self):
        """Debug helper: force-remove the lock file from disk if present."""
        if os.path.exists(self.lockfile):
            self.release()
            return True
        return False
class PDF(fpdf.FPDF):
    """Custom PDF report class built on top of fpdf.FPDF."""

    def __init__(self, config):
        super().__init__()
        # A4 page dimensions in millimetres
        self.width = 210
        self.height = 297
        # Vertical offset at which the page body starts
        self.start_y = 18
        self.eff_height = self.height - self.start_y
        self.config = config.copy()

    def page_body(self, images):
        """Place the given images on the current page, stacked evenly."""
        total = len(images)
        for position, image in enumerate(images):
            if position == 0:
                # First image sits directly below the top margin
                self.image(image, 15, self.start_y, self.width - 30)
            else:
                # Subsequent images are spaced by an equal share of the
                # effective page height, nudged up by a fixed 7 mm
                offset = self.start_y + position * (self.eff_height / total) - 7
                self.image(image, 15, offset, self.width - 30)

    def print_page(self, images):
        """Add a fresh page and populate it with *images*."""
        self.add_page()
        self.page_body(images)
if __name__ == '__main__':
    # Ad-hoc smoke test: exercise the discovery helpers and print the results
    print(ibstat_ports())
    print(get_rapl_devices())
    vendor, family, model = get_cpu_vendor_model_family()
    print(vendor, family, model)
    events, derived = get_perf_events()
    print(events, derived)