Source code for monitormetrics.getmetadata

"""This file contains the class to extract software and hardware metadata"""

import os
import logging
import time
import sys
import socket
import shutil
import platform

from monitormetrics.utils.utils import execute_cmd, get_parser, write_json
from monitormetrics._version import __version__

# Add env module to python modules
# if 'MODULESHOME' in os.environ.keys():
#     try:
#         sys.path.index(os.environ['MODULESHOME'] + '/init')
#     except ValueError:
#         sys.path.insert(0, os.environ['MODULESHOME'] + '/init')
#     from env_modules_python import module
#     # Redirect the output to stdout and NOT stderr. This avoids printing on terminal
#     os.environ['LMOD_REDIRECT'] = "yes"

# Module-level logger. The adapter supplies the package version under the
# 'version' key as contextual information for every record logged through it.
_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})


class GetSWHWMetadata():
    """Engine to extract software and hardware metadata of the current host.

    The metadata is accumulated in ``self.data`` and written to ``self.outfile``
    in JSON format by :meth:`collect`.

    Attributes:
        config: Shallow copy of the configuration mapping given to ``__init__``.
        data: Dict holding the collected metadata (``host_name``, ``sw_md``, ``hw_md``).
        outfile: Path of the JSON file,
            ``<config['temp_path']['meta_data']>/<job_id>_<host_name>.json``.
    """

    def __init__(self, config):
        """Initialize setup.

        Args:
            config: Mapping that provides ``config['temp_path']['meta_data']``
                (output directory), ``config['job_id']`` and ``config['host_name']``.

        Raises:
            KeyError: If one of the required configuration keys is missing.
        """
        self.config = config.copy()
        # Start and end timestamps of the last collect() call
        self._extra = {}
        self.data = {}
        file_name = "_".join([str(self.config['job_id']), self.config['host_name']]) + ".json"
        self.outfile = os.path.join(self.config['temp_path']['meta_data'], file_name)

    # @staticmethod
    # def parse_lmod_list(out_str):
    #     """Parse the string of module list output to list of modules names"""
    #
    #     module_list = []
    #     for line in out_str.splitlines():
    #         result = re.search(r"^(.*)\) (.*)", line)
    #         try:
    #             module_list.append(result.group(2).rstrip())
    #         except (AttributeError, IndexError):
    #             pass
    #
    #     return module_list

    @staticmethod
    def _convert(value, typ):
        """Return ``typ(value)``, or ``value`` unchanged if the conversion fails.

        Both ``ValueError`` (e.g. a non-numeric string) and ``TypeError`` (e.g.
        ``None`` for a field that is absent) are treated as "cannot convert".
        """
        try:
            return typ(value)
        except (TypeError, ValueError):
            return value

    @staticmethod
    def _is_smt_active(raw):
        """Interpret the content of ``/sys/devices/system/cpu/smt/active``.

        The kernel reports ``1`` when SMT is active and ``0`` otherwise. Calling
        ``bool()`` on the raw text would be True for any non-empty string,
        including ``"0"``, so the value is compared explicitly.

        Args:
            raw: Raw command output (expected to be a string; converted with
                ``str()`` before comparison).

        Returns:
            True only if the trimmed output is exactly ``"1"``.
        """
        return str(raw).strip() == "1"

    @staticmethod
    def _timestamp():
        """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    def collect_smd(self):
        """Collect Software MetaData (SW_MD).

        Stores the Python version, the platform string and the versions of
        singularity and docker under ``self.data['sw_md']``. A tool that is not
        found on ``PATH`` is recorded as ``"Not found"``.
        """
        _log.info("Collecting software specific data")

        # Command strings to get version information of different softwares
        cmd_str = {
            'singularity': "singularity version",
            'docker': "docker version --format '{{.Server.Version}}'",
        }

        sw_md = {
            'python_version': sys.version.split()[0],
            'platform': platform.platform(),
        }

        # Get version of different softwares. If not installed, return "Not found"
        for key, val in cmd_str.items():
            sw_md[key] = execute_cmd(val) if shutil.which(key) is not None else "Not found"

        self.data['sw_md'] = sw_md

        # # Get env module list
        # # Python module command prints module list. We need to store the stdout into a variable
        # # and parse the output to get list of loaded modules
        # save_stdout = sys.stdout
        # result = io.StringIO()
        # sys.stdout = result
        # # List loaded modules
        # module('list')
        # sys.stdout = save_stdout
        # # Parse output and add the module list to meta data dict
        # self.data['sw_md']['module_list'] = self.parse_lmod_list(result.getvalue())

    @staticmethod
    def collect_cpu_info():
        """Collect all relevant CPU information.

        Returns:
            Dict with the fields parsed from the output of ``lscpu`` plus power
            policy, power driver, microcode and SMT state read from ``/sys`` and
            ``/proc``. Numeric fields are converted to ``int``/``float`` when
            possible and otherwise kept as returned by the parser.
        """
        _log.info("Collecting CPU information.")

        # Get output from lscpu command
        cmd_out = execute_cmd("lscpu")
        # _log.debug("Output of lscpu command is %s", cmd_out)

        # Get parser function
        parse_lscpu = get_parser(cmd_out)
        conv = GetSWHWMetadata._convert

        # Different meta data information returned by lscpu command
        cpu = {
            'Architecture': parse_lscpu("Architecture"),
            'CPU_Model': parse_lscpu("Model name"),
            'CPU_Family': parse_lscpu("CPU family"),
            'CPU_num': conv(parse_lscpu(r"CPU\(s\)"), int),
            'Online_CPUs_list': parse_lscpu(r"On-line CPU\(s\) list"),
            'Threads_per_core': conv(parse_lscpu(r"Thread\(s\) per core"), int),
            'Cores_per_socket': conv(parse_lscpu(r"Core\(s\) per socket"), int),
            'Sockets': conv(parse_lscpu(r"Socket\(s\)"), int),
            'Vendor_ID': parse_lscpu("Vendor ID"),
            'Stepping': conv(parse_lscpu("Stepping"), int),
            'CPU_MHz': conv(parse_lscpu("CPU MHz"), float),
            'CPU_Max_Speed_MHz': conv(parse_lscpu("CPU max MHz"), float),
            'CPU_Min_Speed_MHz': conv(parse_lscpu("CPU min MHz"), float),
            'BogoMIPS': conv(parse_lscpu("BogoMIPS"), float),
            'L1d_cache': parse_lscpu("L1d cache"),
            'L1i_cache': parse_lscpu("L1i cache"),
            'L2_cache': parse_lscpu("L2 cache"),
            'L3_cache': parse_lscpu("L3 cache"),
            'NUMA_nodes': conv(parse_lscpu(r"NUMA node\(s\)"), int),
        }

        # Populate NUMA nodes. The node count may be unparsable (ValueError) or
        # not a string/number at all (TypeError) when lscpu does not report it.
        try:
            for i in range(int(cpu['NUMA_nodes'])):
                cpu['NUMA_node{}_CPUs'.format(i)] = parse_lscpu(r"NUMA node{} CPU\(s\)".format(i))
        except (TypeError, ValueError):
            _log.warning('Failed to parse or NUMA nodes not existent.')

        # Update with additional data
        cpu.update({
            'Power_Policy': execute_cmd("cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor "
                                        "| sort | uniq"),
            'Power_Driver': execute_cmd("cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_driver "
                                        "| sort | uniq"),
            'Microcode': execute_cmd("grep microcode /proc/cpuinfo | uniq | awk 'NR==1{print $3}'"),
            # NOTE(review): assumes execute_cmd returns the command output as text - confirm
            'SMT_Enabled?': GetSWHWMetadata._is_smt_active(
                execute_cmd("cat /sys/devices/system/cpu/smt/active")),
        })

        return cpu

    @staticmethod
    def collect_memory_info():
        """Collect system memory.

        Returns:
            Dict with ``Mem_Total``, ``Mem_Available`` and ``Mem_Swap`` as integers,
            taken from columns of the second and third line of the ``free`` output.

        Raises:
            ValueError: If the output of a command cannot be converted to ``int``.
        """
        _log.info("Collecting system memory.")

        # Get memory information from free command
        mem = {
            'Mem_Total': int(execute_cmd("free | awk 'NR==2{print $2}'")),
            'Mem_Available': int(execute_cmd("free | awk 'NR==2{print $7}'")),
            'Mem_Swap': int(execute_cmd("free | awk 'NR==3{print $2}'")),
        }

        return mem

    def collect_hmd(self):
        """Collect Hardware MetaData (HW_MD).

        Stores CPU and memory information under ``self.data['hw_md']``.
        """
        _log.info("Collecting hardware specific information.")

        # caller functions to collect CPU and memory information
        self.data["hw_md"] = {
            "cpu": self.collect_cpu_info(),
            "memory": self.collect_memory_info(),
        }

    def collect(self):
        """Collect all metadata and write it to ``self.outfile`` in JSON format.

        The UTC start and end time of the collection are kept in ``self._extra``.
        """
        _log.info("Collecting the full metadata information.")

        self._extra['start_time'] = self._timestamp()

        # Get name of the host
        self.data['host_name'] = socket.gethostname()

        # Collect software metadata
        self.collect_smd()

        # Collect hardware metadata
        self.collect_hmd()

        # Write the data to the disk in JSON format
        write_json(self.data, self.outfile)

        self._extra['end_time'] = self._timestamp()