Source code for monitormetrics.prepostmonitoring.setupmonitoring

"""This file does pre-processing steps of the metric data collection"""

import os
import sys
import logging
import socket
import time
import pathlib
import yaml
import psutil


from monitormetrics.utils.utils import execute_cmd, FileLock
from monitormetrics.utils.exceptions import BatchSchedulerNotFound
from monitormetrics._version import __version__

_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})


class SetUpMonitoring():
    """Set up all the required steps to start monitoring"""
    # pylint: disable=too-many-instance-attributes

    def __init__(self, config):
        """Initialize setup"""
        self.config = config.copy()

    def get_job_id(self):
        """Find the workload manager and job ID. Fall back to scanning user PIDs if none is found"""
        # Check if the job ID can be found from SLURM/PBS/OAR/LOCAL environment variables
        if "SLURM_JOB_ID" in os.environ.keys():
            self.config['job_id'] = int(os.environ["SLURM_JOB_ID"])
            self.config['scheduler'] = "SLURM"
            _log.info("SLURM job with ID %d detected", self.config['job_id'])
        elif "PBS_JOBID" in os.environ.keys():
            self.config['job_id'] = os.environ["PBS_JOBID"]
            self.config['scheduler'] = "PBS"
            _log.info("PBS job with ID %s detected", self.config['job_id'])
        elif "OAR_JOB_ID" in os.environ.keys():
            self.config['job_id'] = int(os.environ["OAR_JOB_ID"])
            self.config['scheduler'] = "OAR"
            _log.info("OAR job with ID %d detected", self.config['job_id'])
        elif "LOCAL_JOB_ID" in os.environ.keys():
            self.config['job_id'] = int(os.environ["LOCAL_JOB_ID"])
            self.config['scheduler'] = "LOCAL"
            _log.info("LOCAL job with ID %d detected", self.config['job_id'])
        else:
            _log.warning("Workload manager not recognised. Job PIDs will be "
                         "found by iterating over all user PIDs")

    def prepare_dirs(self):
        """Make directories to save the results"""
        # Create a folder to write the metric data
        self.config['save_dir'] = os.path.join(
            self.config['save_dir'],
            "-".join(["job", "metrics", str(self.config['job_id'])])
        )
        os.makedirs(self.config['save_dir'], exist_ok=True)
        # Create a sub directory to save data
        self.config['data_dir'] = os.path.join(self.config['save_dir'], "metrics")
        os.makedirs(self.config['data_dir'], exist_ok=True)
        if self.config['gen_report']:
            # Create a sub directory to save plots
            self.config['plot_dir'] = os.path.join(self.config['save_dir'], "plots")
            os.makedirs(self.config['plot_dir'], exist_ok=True)

    def prepare_file_paths(self):
        """Prepare file paths for different result files"""
        # Create a file path to save the report
        self.config['report_path'] = os.path.join(
            self.config['save_dir'],
            "job-report-{}.pdf".format(self.config['job_id'])
        )
        # Create a file name for each node by appending job ID and hostname
        self.config['temp_path'] = {}
        self.config['xlsx_file'] = {}
        save_dir_parent = (pathlib.Path(self.config['save_dir']).parent.absolute())
        for metric in self.config['metrics']:
            temp_path = os.path.join(self.config['save_dir'], 'raw_data', metric)
            os.makedirs(temp_path, exist_ok=True)
            self.config['temp_path'][metric] = temp_path
            if self.config['export_xlsx']:
                # File path for the excel file
                if self.config['prefix'] is not None:
                    file_name = "{}_{}.xlsx".format(self.config['prefix'], metric)
                else:
                    file_name = "{}.xlsx".format(metric)
                self.config['xlsx_file'][metric] = os.path.join(save_dir_parent, file_name)
        # Perf event file path
        self.config['perf_event_file'] = os.path.join(self.config['save_dir'], ".perf_event_list")
        if self.config['export_db']:
            if self.config['prefix'] is not None:
                file_name = "{}.db".format(self.config['prefix'])
            else:
                file_name = "metrics.db"
            self.config['db_file'] = os.path.join(save_dir_parent, file_name)

    def create_node_list(self):
        """Create a list of node names"""
        # Get node names
        self.config['node_name'] = self.config['host_name']
        if self.config['scheduler'] == "SLURM":
            # Command to execute
            cmd_str = ("sinfo -N --nodes={} --format=\"%N\" | awk '{{if ("
                       "NR>1)print}}'".format(os.environ['SLURM_JOB_NODELIST']))
        elif self.config['scheduler'] == "PBS":
            # Command to execute
            # The PBS nodefile is only available on the head node, so the other nodes in the
            # reservation cannot access it. The workaround is to copy the nodefile to the CWD,
            # define a new env variable and export it using mpirun.
            # On the head node:
            #   cp $PBS_NODEFILE nodefile
            #   mpirun -x PBS_NODEFILE_LOCAL=$PWD/nodefile <snippet>
            cmd_str = "uniq $PBS_NODEFILE_LOCAL"
        elif self.config['scheduler'] == "OAR":
            # Command to execute
            cmd_str = "uniq $OAR_NODEFILE"
        elif self.config['scheduler'] == "LOCAL":
            # Command to execute
            cmd_str = "hostname"
        else:
            _log.error("Workload manager not identified.")
            raise BatchSchedulerNotFound("Batch scheduler not found")
        # Execute the command and parse the node name list
        self.config['node_name_list'] = execute_cmd(cmd_str).splitlines()
        _log.debug("Found current node name is %s and node list is %s" % (
            self.config['node_name'], self.config['node_name_list']))
        # Sometimes the hostname and the node list use different names, mostly because the
        # local network name is appended to the hostname. We try to reconcile them here
        if self.config['node_name'] not in self.config['node_name_list']:
            # Case where node_name is my-node-0 and the name in node_name_list is my-node-0.local
            check_0 = [s for s in self.config['node_name_list'] if self.config['node_name'] in s]
            if check_0:
                self.config['node_name'] = check_0[0]
            # Case where node_name is my-node-0.local and the name in node_name_list is my-node-0
            check_1 = [s for s in self.config['node_name_list'] if s in self.config['node_name']]
            if check_1:
                self.config['node_name'] = check_1[0]
        # Get the master node, i.e., the first node in the reservation
        self.config['master_node'] = self.config['node_name_list'][0]
        _log.debug("Master is %s and current is %s" % (
            self.config['master_node'], self.config['node_name'])
        )

    def find_launcher_type(self):
        """Find which launcher is used to launch MPI jobs"""
        if self.config['launcher'] is None:
            # PID of the current process
            current_pid = os.getpid()
            # Get the parent of the current process
            proc = psutil.Process(current_pid)
            ppid = proc.ppid()
            # Get the name of the parent process
            pp_name = psutil.Process(ppid).name()
            _log.debug("Current process object is %s and its parent process is %s"
                       % (proc, psutil.Process(ppid)))
            if pp_name in ['mpirun', 'orted']:
                self.config['launcher'] = "mpirun"
            elif pp_name in ['mpiexec', 'hydra_pmi_proxy']:
                self.config['launcher'] = "mpiexec"
            elif pp_name in ['slurmstepd', 'psid']:
                # On JUWELS, psid launches tasks
                self.config['launcher'] = "srun"
        else:
            _log.debug("Launcher type %s specified on CLI" % self.config['launcher'])

    def create_ipc_file(self):
        """Create an inter-process communication (IPC) file. It will be used to signal the
        toolkit that the main job has ended"""
        # Name of the IPC file
        self.config['ipc_file'] = "-".join([".ipc", str(self.config['job_id'])])
        # The IPC file is created only by the master node
        if self.config['master_node'] == self.config['node_name']:
            # We create a hidden file with the text INPROGRESS to signal that the job is running.
            # Once the job is finished there will be a line `echo "FINISHED" > .ipc`
            with open(self.config['ipc_file'], 'w') as ipc_file:
                ipc_file.write("INPROGRESS")

    def create_lock_file(self):
        """Creates a lock file"""
        _log.info("Creating a lock file")
        # Name of the lock file. Prepending with "." to make it hidden
        self.config['lock_file'] = os.path.join(
            self.config['save_dir'], ".{}".format(self.config['node_name'])
        )
        # If a lock file is already there from previous runs, purge it
        lock_file_name = ".".join([self.config['lock_file'], 'lock'])
        if os.path.isfile(lock_file_name):
            _log.info("Purging old lock file...")
            os.remove(lock_file_name)
        self.fl = FileLock(self.config['lock_file'], timeout=10)

    def acquire_lock(self):
        """Acquires lock by locking the created file"""
        _log.info("Locking the file")
        self.fl.acquire()

    def start(self):
        """Preprocessing steps of data collection"""
        # Find the job ID
        self.get_job_id()
        # Make directories to save results
        self.prepare_dirs()
        # Make file paths for the different result files
        self.prepare_file_paths()
        # Create a list of node names
        self.create_node_list()
        # Create an inter-process communication (IPC) file
        self.create_ipc_file()
        # Find the launcher type: mpirun, mpiexec or srun
        self.find_launcher_type()
        # Create a lock file. The lock is acquired in the pre-processing call
        self.create_lock_file()
        # Acquire the lock
        self.acquire_lock()
        return self.config

    def release_lock(self):
        """Release lock file"""
        try:
            _log.info("Releasing the lock file")
            self.fl.release()
        except FileNotFoundError:
            _log.warning("Lock file already released")

    def shutdown(self):
        """Finish monitoring"""
        # Release the lock file. End of monitoring.
        self.release_lock()
        # Check if computations on all nodes have finished. The way we do it is to check
        # if the lock file exists for each host. If it exists, computations on other hosts
        # are still going on.
        # Sometimes the name of the node in SLURM is not exactly the same as the hostname
        if self.config['master_node'] == self.config['node_name']:
            _log.info("Checking if all nodes finished the collection")
            for node in self.config['node_name_list']:
                lock_file = os.path.join(self.config['save_dir'], ".{}".format(node))
                fl = FileLock(lock_file)
                while fl.lock_exists():
                    time.sleep(1)
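
For context, the lifecycle of this class is: start() performs all pre-processing (job detection, directory and file-path setup, node list, IPC and lock files) and returns the enriched config, while shutdown() releases this node's lock and, on the master node, waits until every node's lock file has disappeared. Below is a minimal, hypothetical driver sketch; the config keys, the metric name and the driver itself are assumptions inferred from this listing, not the toolkit's documented configuration or command-line entry point.

# Hypothetical driver: keys and values below are assumptions inferred from this listing
import os
import socket

from monitormetrics.prepostmonitoring.setupmonitoring import SetUpMonitoring

# Run outside a batch scheduler by exporting a LOCAL job ID (see get_job_id above)
os.environ["LOCAL_JOB_ID"] = "12345"

config = {
    'save_dir': os.path.abspath("./monitoring-results"),
    'host_name': socket.gethostname(),
    'metrics': ['cpu'],          # assumed metric name, used here only for the directory layout
    'gen_report': False,
    'export_xlsx': False,
    'export_db': False,
    'prefix': None,
    'launcher': None,
}

setup = SetUpMonitoring(config)
config = setup.start()           # pre-processing: creates dirs, IPC file and acquires the node lock

# ... metric collection for the main job would run here ...

# Signal the end of the job through the IPC file (the Python equivalent of the
# `echo "FINISHED" > .ipc` mentioned in create_ipc_file), then clean up
with open(config['ipc_file'], 'w') as ipc_file:
    ipc_file.write("FINISHED")
setup.shutdown()                 # releases this node's lock and waits for the other nodes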