Source code for monitormetrics.utils.utils

"""This module contains utility functions for gathering CPU metrics"""

import os
import logging
import datetime
import time
import sys
import re
import struct
import subprocess
import json
import errno
import psutil
import fpdf

from monitormetrics.utils.processorspecific import cpu_micro_architecture_name, \
    llc_cache_miss_perf_event, perf_event_list
from monitormetrics.utils.exceptions import CommandExecutionFailed, JobPIDNotFoundError, \
    ProcessorVendorNotFoundError, KeyNotFoundError
from monitormetrics.utils.cpuid import CPUID
from monitormetrics._version import __version__

_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})

# Infiniband metrics API directory
IB_API_DIR = "/sys/class/infiniband"

# RAPL powercap API directory
RAPL_API_DIR = '/sys/class/powercap/intel-rapl'

# Maximum number of RAPL domains
# cores, gpu, pkg, ram, psys
NUM_RAPL_DOMAINS = 5


def execute_cmd(cmd_str, handle_exception=True):
    """Accept a command string and return its output.

    Args:
        cmd_str (str): Command string to be executed
        handle_exception (bool): Handle exceptions internally. If set to False,
                                 raises an exception to the caller function

    Returns:
        str: Output of the command. If command execution fails, returns 'not_available'

    Raises:
        CommandExecutionFailed: An error occurred in execution of the command,
                                iff handle_exception is set to False
    """
    # _log.debug("Executing command: %s", cmd_str)
    try:
        # Execute command
        cmd_out = subprocess.run(cmd_str, shell=True, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, check=True)
        # Get stdout and stderr. We are piping stderr to stdout as well
        cmd_out = cmd_out.stdout.decode('utf-8').rstrip()
    except subprocess.CalledProcessError as err:
        # If handle_exception is True, return "not_available"
        if handle_exception:
            cmd_out = "not_available"
        else:
            # If handle_exception is False, raise an exception
            _log.warning("Execution of command %s failed", cmd_str)
            raise CommandExecutionFailed(
                "Execution of command \"{}\" failed".format(cmd_str)) from err
    return cmd_out
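
# Illustrative usage sketch (not part of the original module): with
# handle_exception left at its default, a failing command degrades to the
# sentinel string "not_available" instead of raising.
def _example_execute_cmd():
    kernel = execute_cmd("uname -r")                # e.g. "5.15.0-86-generic"
    missing = execute_cmd("no-such-binary --help")  # returns "not_available"
    return kernel, missing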
def execute_cmd_pipe(cmd_str):
    """Accept a command string, execute it with its output piped and return
    the process object.

    Args:
        cmd_str (str): Command string to be executed

    Returns:
        object: Process object
    """
    proc = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, bufsize=-1,
                            universal_newlines=True)
    return proc
def get_parser(cmd_output, reg="lscpu"):
    """Regex parser.

    Args:
        cmd_output (str): Output of the executed command
        reg (str): Regex pattern to be used

    Returns:
        function: Function handle to parse the output
    """
    def parser(pattern):
        """Parser function."""
        # Different regex for parsing different outputs
        if reg == "perf":
            exp = r'(?P<Value>[0-9,]*\s*)(?P<Field>{}.*)'.format(pattern)
        elif reg == "perf-intvl":
            exp = r'(?P<Time>[0-9.]*\s*)(?P<Value>[0-9,><a-zA-Z\s]*\s*)(?P<Field>{}.*)'.format(
                pattern)
        else:
            exp = r'(?P<Field>{}:\s*\s)(?P<Value>.*)'.format(pattern)
        # Search pattern in output
        result = re.search(exp, cmd_output)
        try:
            # Get value of the group if found
            return result.group('Value')
        except AttributeError:
            # _log.debug("Parsing = %s | Field = %s | Value = %s", reg,
            #            pattern, "None")
            # If not found, return "not_available"
            return "not_available"
    return parser
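
# Illustrative usage sketch (not part of the original module): the default
# "lscpu" regex expects "Field:   value" lines, so the returned parser can
# pull single fields out of lscpu output (this mirrors how get_rapl_devices()
# below uses it).
def _example_parse_lscpu():
    cmd_out = execute_cmd("lscpu")
    parse_lscpu = get_parser(cmd_out)
    # Escape the literal parentheses in the field name
    return parse_lscpu(r"Socket\(s\)")  # e.g. "2", or "not_available"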
def find_procs_by_name(name):
    """Return a list of processes matching 'name'.

    Args:
        name (str): Name of the process to find

    Returns:
        list: List of psutil process objects
    """
    ls = []
    for p in psutil.process_iter(['name']):
        if p.info['name'] == name:
            ls.append(p)
    return ls
def proc_if_running(procs):
    """Check if any of the processes are still running and return False only
    when all of them have terminated.

    Args:
        procs (list): List of psutil process objects

    Returns:
        bool: Running status of the processes
    """
    # Get status of all processes
    all_proc_stats = [p.is_running() for p in procs]
    return any(all_proc_stats)
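
# Illustrative usage sketch (not part of the original module): find all
# "python" processes and poll until every one of them has exited. The process
# name and poll interval are arbitrary choices for the example.
def _example_wait_for_procs():
    procs = find_procs_by_name("python")
    while proc_if_running(procs):
        time.sleep(1)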
def ibstat_ports():
    """This function returns Infiniband ports if present.

    Returns:
        dict: A dict with IB port names and numbers
    """
    # A dict with port name as key and port number as value
    ports = {}
    # Execute ibstat command to get IB ports
    ibstat = execute_cmd("ibstat")
    if ibstat != 'not_available':
        # If command fails, execute_cmd returns not_available
        ibstat = ibstat.splitlines()
        for index, line in enumerate(ibstat):
            line = line.strip()
            # Get port name
            match = re.match(r"CA '(.*)'", line)
            if match:
                name = match.group(1)
                # Get port number
                number = re.match(r"Port ([0-9]+)\:", ibstat[index + 7].strip()).group(1)
                state = ibstat[index + 8].split(':')[1].strip()
                # Check the state of the port
                port_state = re.match(r"Active", state)
                if port_state:
                    ports[name] = number
    # If IB drivers are not installed (for example Omnipath), we can directly
    # check the IB API directory
    if ibstat == 'not_available' and os.path.exists(IB_API_DIR):
        for name in os.listdir(IB_API_DIR):
            number = os.listdir(os.path.join(IB_API_DIR, name, 'ports'))[0]
            ports[name] = str(number)
    _log.debug("Infiniband ports detected are %s", ",".join(ports.keys()))
    return ports
def get_rapl_devices():
    """This function gets all the package, core, uncore and dram devices
    available within the RAPL powercap interface.

    Returns:
        dict: A dict with package names and paths
    """
    # Get output from lscpu command
    cmd_out = execute_cmd("lscpu")
    # Get parser function
    parse_lscpu = get_parser(cmd_out)
    # Get number of sockets present in the processor
    num_sockets = int(parse_lscpu(r"Socket\(s\)"))
    # Dict with package and domain names and paths
    rapl_domains = {}
    # Check available devices for each socket
    for i_soc in range(num_sockets):
        package_path = os.path.join(RAPL_API_DIR, "intel-rapl:{}".format(i_soc))
        if os.path.exists(package_path):
            with open(os.path.join(package_path, "name"), "r") as pkg_name:
                package_name = pkg_name.readline().rstrip('\n')
            package_num = re.search(r"package-([0-9]+)", package_name).group(1)
            package_energy_path = os.path.join(package_path, "energy_uj")
            rapl_domains[package_name] = package_energy_path
            for i_dom in range(NUM_RAPL_DOMAINS):
                domain_path = os.path.join(package_path,
                                           "intel-rapl:{}:{}".format(i_soc, i_dom))
                if os.path.exists(domain_path):
                    with open(os.path.join(domain_path, "name"), "r") as dom_name:
                        domain_name = "-".join([dom_name.readline().rstrip('\n'),
                                                package_num])
                    domain_energy_path = os.path.join(domain_path, "energy_uj")
                    rapl_domains[domain_name] = domain_energy_path
    _log.debug("RAPL domains detected are %s", ",".join(rapl_domains.keys()))
    return rapl_domains
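
# Illustrative usage sketch (not part of the original module): each path
# returned by get_rapl_devices() points at an "energy_uj" counter holding
# cumulative energy in microjoules.
def _example_read_rapl_energy():
    readings = {}
    for name, path in get_rapl_devices().items():
        with open(path, "r") as energy_file:
            readings[name] = int(energy_file.readline())
    return readings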
def merge_dicts(exst_dict, new_dict):
    """Merge two dicts. exst_dict is updated with data from new_dict.

    Args:
        exst_dict (dict): Existing data in the dict
        new_dict (dict): New data to be added to the dict

    Returns:
        dict: Updated exst_dict with contents from new_dict
    """
    for key, value in exst_dict.items():
        if isinstance(value, list):
            value.extend(new_dict[key])
        elif isinstance(value, dict):
            merge_dicts(exst_dict[key], new_dict[key])
    return exst_dict
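
# Illustrative usage sketch (not part of the original module): lists at the
# same key are concatenated, nested dicts are merged recursively.
def _example_merge_dicts():
    existing = {'cpu': {'usage': [10, 20]}}
    new = {'cpu': {'usage': [30]}}
    merged = merge_dicts(existing, new)
    assert merged == {'cpu': {'usage': [10, 20, 30]}}
    return merged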
def load_json(filename):
    """This function loads a json file and returns a dict.

    Args:
        filename (str): Name of the file to load

    Returns:
        dict: File contents as dict
    """
    with open(filename, "r") as json_file:
        content = json.load(json_file)
    return content
def write_json(content, filename):
    """This function writes json content to a file.

    Args:
        content (dict): Dict to write into JSON format
        filename (str): Name of the file to write
    """
    with open(filename, "w") as json_file:
        json.dump(content, json_file, indent=4, sort_keys=True)
def dump_json(content, filename):
    """This function appends data to an existing json file. It creates a new
    file if no existing file is found.

    Args:
        content (dict): Dict to write into JSON format
        filename (str): Name of the file to write
    """
    if os.path.isfile(filename):
        existing_data = load_json(filename)
        updated_dict = merge_dicts(existing_data, content)
    else:
        updated_dict = content
    write_json(updated_dict, filename)
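
# Illustrative usage sketch (not part of the original module): successive
# dumps into the same file merge via merge_dicts(), so list-valued metrics
# accumulate across calls. The file name is a hypothetical placeholder.
def _example_dump_json():
    dump_json({'timestamps': [1], 'load': [0.5]}, 'metrics.json')
    dump_json({'timestamps': [2], 'load': [0.7]}, 'metrics.json')
    # metrics.json now holds {'load': [0.5, 0.7], 'timestamps': [1, 2]}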
def get_value(input_dict, target):
    """Find the values for a given target key in a nested dict.

    Args:
        input_dict (dict): Dict to search for the key
        target (Any): Key to search for

    Returns:
        list: List of values found in input_dict
    """
    val = filter(None, [[b] if a == target else get_value(b, target)
                        if isinstance(b, dict) else None
                        for a, b in input_dict.items()])
    return [i for b in val for i in b]
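
# Illustrative usage sketch (not part of the original module): the search
# descends into nested dicts and collects every value stored under the
# target key.
def _example_get_value():
    data = {'node1': {'cpu': 8}, 'node2': {'cpu': 16}, 'total': 24}
    assert get_value(data, 'cpu') == [8, 16]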
def replace_negative(input_list):
    """This function replaces the negative values in a list with the mean of
    their neighbours. If a value happens to be at an extremum, it is replaced
    with the preceding or succeeding element.

    Args:
        input_list (list): A list with positive and/or negative elements

    Returns:
        list: A list with just positive elements
    """
    len_list = len(input_list)
    out_list = []
    for ielem, elem in enumerate(input_list):
        if elem < 0 and (0 < ielem < len_list - 1):
            out_list.append(0.5 * (input_list[ielem - 1] + input_list[ielem + 1]))
        elif elem < 0 and ielem == 0:
            out_list.append(input_list[ielem + 1])
        elif elem < 0 and ielem == len_list - 1:
            out_list.append(input_list[ielem - 1])
        else:
            out_list.append(elem)
    return out_list
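
# Illustrative usage sketch (not part of the original module): interior
# negatives become the mean of their neighbours, edge negatives copy the
# adjacent element.
def _example_replace_negative():
    assert replace_negative([-1, 4, -2, 8, -3]) == [4, 4, 6.0, 8, 8]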
def get_cpu_model_names_for_non_x86():
    """This function tries to extract the vendor, model and CPU architecture
    for non-x86 machines like IBM POWER and ARM.

    Returns:
        str: Name of the vendor
        str: Model name/number of the processor
        str: Micro architecture of the processor
    """
    # We try to parse lscpu command output
    cmd_out = execute_cmd('lscpu')
    if cmd_out != 'not_available':
        parse_lscpu = get_parser(cmd_out)
        arch = parse_lscpu("Architecture")
        if arch == "ppc64le":
            vendor_name = "IBM"
            cpu_family = parse_lscpu("Model")
            cpu_model = parse_lscpu("Model name")
            return vendor_name, cpu_family, cpu_model
    _log.error("Error while looking for processor info")
    raise ProcessorVendorNotFoundError
def get_cpu_vendor(cpu):
    """This function gets the vendor name from the CPUID instruction.

    Args:
        cpu (object): CPUID object

    Returns:
        str: Name of the vendor
    """
    # EAX, EBX, ECX, EDX for level 0 CPUID instruction
    _, b, c, d = cpu(0)
    # The 12-byte vendor string is stored in EBX, EDX, ECX order
    return struct.pack("III", b, d, c).decode("utf-8")
def get_cpu_model_family(cpu):
    """This function gives the CPU model and family IDs from the CPUID
    instruction.

    Args:
        cpu (object): CPUID object

    Returns:
        list: Family and model IDs
    """
    # EAX for level 1 CPUID instruction
    raw = cpu(1)[0]
    # Model id starts at bit 4. More details -> https://en.wikipedia.org/wiki/CPUID
    model = (raw & 0xf0) >> 4
    # Family id
    family = (raw & 0xf00) >> 8
    if family in [6, 15]:
        # Add the extended model bits (19:16), shifted into the high nibble
        model += (raw & 0xf0000) >> 12
    if family == 15:
        # Add the extended family bits (27:20)
        family += (raw & 0xff00000) >> 20
    return [family, model]
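
# Worked example (illustrative, not part of the original module): on an
# Intel Broadwell-EP core, CPUID leaf 1 returns EAX = 0x406F1, which the
# decoding above turns into family 6, model 79.
def _example_decode_cpuid_eax():
    raw = 0x406F1
    model = (raw & 0xf0) >> 4            # 0xF  -> 15
    family = (raw & 0xf00) >> 8          # 0x6  -> 6
    if family in [6, 15]:
        model += (raw & 0xf0000) >> 12   # 15 + 64 -> 79
    if family == 15:
        family += (raw & 0xff00000) >> 20
    assert [family, model] == [6, 79]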
def get_cpu_vendor_model_family():
    """This function gets the name of the CPU vendor, family and model parsed
    from the CPUID instruction.

    Returns:
        list: Name of the vendor, CPU family and model ID
    """
    # Get output from CPUID instruction
    try:
        cpu = CPUID()
        # Get vendor name
        vendor_name = get_cpu_vendor(cpu)
        # Get CPU family and model
        cpu_family, cpu_model = get_cpu_model_family(cpu)
    except SystemError:
        # CPUID is not available on non-x86 machines; fall back to lscpu
        vendor_name, cpu_family, cpu_model = get_cpu_model_names_for_non_x86()
    return vendor_name, cpu_family, cpu_model
def get_mem_bw_event():
    """This function returns the perf event to get memory bandwidth.

    Returns:
        str: A string to get memory bandwidth for the perf stat command
    """
    # First get processor vendor and model details
    vendor_name, family_id, model_id = get_cpu_vendor_model_family()
    try:
        # Get micro architecture name
        micro_architecture = cpu_micro_architecture_name(vendor_name, model_id, family_id)
        # Get perf event
        perf_event = llc_cache_miss_perf_event(vendor_name, micro_architecture)
    except (ProcessorVendorNotFoundError, KeyNotFoundError):
        perf_event = ""
        _log.error("Vendor and/or micro architecture not found")
    return perf_event
def get_perf_events():
    """This function checks the micro architecture type and returns the
    available perf events.

    Returns:
        dict: Perf events with event names
        dict: Derived perf metrics from event counters
    """
    # First get processor vendor and model details
    vendor_name, family_id, model_id = get_cpu_vendor_model_family()
    # vendor_name = "GenuineIntel"
    # family_id = 6
    # model_id = 79
    try:
        # Get micro architecture name
        micro_architecture = cpu_micro_architecture_name(vendor_name, model_id, family_id)
        # Get perf events
        perf_events, derived_perf_metrics = perf_event_list(micro_architecture)
    except (ProcessorVendorNotFoundError, KeyNotFoundError):
        # Return empty values so the caller can detect the failure
        perf_events, derived_perf_metrics = "", ""
        _log.error("Vendor and/or micro architecture not found")
    return perf_events, derived_perf_metrics
def check_perf_events(perf_events):
    """This function checks if all perf groups actually work. We will only
    probe the working counters during monitoring.

    Args:
        perf_events (dict): A dict of found perf events

    Returns:
        dict: A dict of working perf events
    """
    for group, event_list in perf_events.items():
        if group not in ["hardware_events", "software_events"]:
            final_event_list = event_list.copy()
            cmd_str = "perf stat -e {} sleep 0.1".format(",".join(list(event_list.values())))
            cmd_out = execute_cmd(cmd_str)
            parse_perf_out = get_parser(cmd_out, reg="perf")
            for event in event_list:
                try:
                    _ = int(parse_perf_out(event).rstrip().replace(",", ""))
                except ValueError:
                    # Counter did not report an integer value; drop the event
                    final_event_list.pop(event)
            perf_events[group] = final_event_list
    return perf_events
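
# Illustrative usage sketch (not part of the original module; the group name
# is an assumption): each non-builtin group is probed with a short run of
#   perf stat -e <event1>,<event2> sleep 0.1
# and only events whose counters parse as integers survive.
def _example_check_perf_events():
    candidate_events = {'example_group': {'cycles': 'cycles',
                                          'instructions': 'instructions'}}
    return check_perf_events(candidate_events)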
class FileLock(object):
    """A file locking mechanism that has context-manager support so you can
    use it in a ``with`` statement. This should be relatively cross-platform
    compatible as it doesn't rely on ``msvcrt`` or ``fcntl`` for the locking.
    """
    class FileLockException(Exception):
        """Exception raised by the file lock object"""
        pass
    def __init__(self, protected_file_path, timeout=None, delay=1, lock_file_contents=None):
        """Prepare the file locker. Specify the file to lock and optionally
        the maximum timeout and the delay between each attempt to lock.
        """
        self.is_locked = False
        self.lockfile = protected_file_path + ".lock"
        self.timeout = timeout
        self.delay = delay
        self._lock_file_contents = lock_file_contents
        if self._lock_file_contents is None:
            self._lock_file_contents = "Owning process args:\n"
            for arg in sys.argv:
                self._lock_file_contents += arg + "\n"
    def locked(self):
        """Returns True iff the file is owned by THIS FileLock instance.
        (Even if this returns False, the file could be owned by another
        FileLock instance, possibly in a different thread or process.)
        """
        return self.is_locked
    def available(self):
        """Returns True iff the file is currently available to be locked."""
        return not os.path.exists(self.lockfile)
    def lock_exists(self):
        """Returns True iff the external lockfile exists."""
        return os.path.exists(self.lockfile)
    def acquire(self, blocking=True):
        """Acquire the lock, if possible. If the lock is in use and `blocking`
        is False, return False. Otherwise, check again every `self.delay`
        seconds until it either gets the lock or exceeds `timeout` number of
        seconds, in which case it raises an exception.
        """
        start_time = time.time()
        while True:
            try:
                # Attempt to create the lockfile. These flags cause os.open
                # to raise an OSError if the file already exists.
                fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                with os.fdopen(fd, "a") as f:
                    # Print some info about the current process as debug info
                    # for anyone who bothers to look.
                    f.write(self._lock_file_contents)
                break
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
                if self.timeout is not None and (time.time() - start_time) >= self.timeout:
                    raise FileLock.FileLockException("Timeout occurred.")
                if not blocking:
                    return False
                time.sleep(self.delay)
        self.is_locked = True
        return True
    def release(self):
        """Get rid of the lock by deleting the lockfile. When working in a
        `with` statement, this gets automatically called at the end.
        """
        self.is_locked = False
        os.unlink(self.lockfile)
    def __enter__(self):
        """Activated when used in the with statement. Should automatically
        acquire a lock to be used in the with block.
        """
        self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """Activated at the end of the with statement. It automatically
        releases the lock.
        """
        self.release()

    def __del__(self):
        """Make sure this ``FileLock`` instance doesn't leave a .lock file
        lying around.
        """
        if self.is_locked:
            self.release()
    def purge(self):
        """For debug purposes only. Removes the lock file from the hard disk."""
        if os.path.exists(self.lockfile):
            self.release()
            return True
        return False
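
# Illustrative usage sketch (not part of the original module): the context
# manager creates "<path>.lock" next to the protected file and removes it on
# exit; the file name, timeout and delay are arbitrary choices for the example.
def _example_file_lock():
    with FileLock("metrics.json", timeout=10, delay=0.5):
        dump_json({'timestamps': [3]}, "metrics.json")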
class PDF(fpdf.FPDF):
    """Custom PDF class that inherits from FPDF"""

    def __init__(self, config):
        super().__init__()
        # A4 size
        self.width = 210
        self.height = 297
        self.start_y = 18
        self.eff_height = self.height - self.start_y
        self.config = config.copy()
    def header(self):
        """This method defines the header of the pdf"""
        self.set_font('Times', 'B', 10)
        self.cell(0, 5, "Job ID: {:<30} User: {:<30} # Nodes: "
                  "{:<30} Date: {:<30}".
                  format(self.config['job_id'], self.config['user'],
                         self.config['num_nodes'],
                         datetime.datetime.now().strftime("%m/%d/%Y %H:%M:%S")),
                  1, 0, 'C')
        self.ln(20)
    def footer(self):
        """This method defines the footer of the pdf"""
        # Page numbers in the footer
        self.set_y(-15)
        self.set_font('Times', 'I', 8)
        self.set_text_color(128)
        self.cell(0, 10, 'Page ' + str(self.page_no()), 0, 0, 'C')
    def page_body(self, images):
        """This method defines the body of the pdf"""
        # Determine how many plots there are per page and set positions
        # and margins accordingly
        for im, image in enumerate(images):
            if im == 0:
                self.image(image, 15, self.start_y, self.width - 30)
            else:
                self.image(image, 15,
                           self.start_y + im * (self.eff_height / len(images)) - 7,
                           self.width - 30)
    def print_page(self, images):
        """This method adds an empty page and populates it with images/text"""
        # Generates the report
        self.add_page()
        self.page_body(images)
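
# Illustrative usage sketch (not part of the original module): the config
# keys shown are the ones header() reads; the image file name is a
# hypothetical placeholder for a plot produced elsewhere.
def _example_build_report():
    config = {'job_id': '123456', 'user': 'jdoe', 'num_nodes': 4}
    pdf = PDF(config)
    pdf.print_page(['cpu_metrics.png'])  # hypothetical plot file
    pdf.output('metrics_report.pdf')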
if __name__ == '__main__':
    ports = ibstat_ports()
    print(ports)
    domains = get_rapl_devices()
    print(domains)
    a, b, c = get_cpu_vendor_model_family()
    print(a, b, c)
    a, b = get_perf_events()
    print(a, b)