"""This file does pre-processing steps of the metric data collection"""
import os
import sys
import logging
import socket
import time
import pathlib
import yaml
import psutil
from monitormetrics.utils.utils import execute_cmd, FileLock
from monitormetrics.utils.exceptions import BatchSchedulerNotFound
from monitormetrics._version import __version__
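# Tag every log record with the toolkit version; a logging Formatter can then
# reference it as %(version)s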
_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})
class SetUpMonitoring:
    """Set up all the required steps to start monitoring.
# pylint: disable=too-many-instance-attributes
def __init__(self, config):
"""Initialize setup"""
self.config = config.copy()
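
        # Keys read from the supplied config in this class: save_dir,
        # gen_report, metrics, export_xlsx, prefix, export_db, host_name
        # and launcher. Keys such as job_id, scheduler, data_dir and
        # node_name_list are filled in by the methods below.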
    def get_job_id(self):
        """Find the workload manager and the job ID"""
        # Check if the job ID can be found in the SLURM/PBS/OAR/LOCAL
        # environment. If no workload manager is recognised, fall back to
        # scanning the user's processes (see the warning below)
if "SLURM_JOB_ID" in os.environ.keys():
self.config['job_id'] = int(os.environ["SLURM_JOB_ID"])
self.config['scheduler'] = "SLURM"
_log.info("SLURM job with ID %d detected", self.config['job_id'])
elif "PBS_JOBID" in os.environ.keys():
self.config['job_id'] = os.environ["PBS_JOBID"]
self.config['scheduler'] = "PBS"
_log.info("PBS job with ID %s detected", self.config['job_id'])
elif "OAR_JOB_ID" in os.environ.keys():
self.config['job_id'] = int(os.environ["OAR_JOB_ID"])
self.config['scheduler'] = "OAR"
_log.info("OAR job with ID %d detected", self.config['job_id'])
elif "LOCAL_JOB_ID" in os.environ.keys():
self.config['job_id'] = int(os.environ["LOCAL_JOB_ID"])
self.config['scheduler'] = "LOCAL"
_log.info("LOCAL job with ID %d detected", self.config['job_id'])
        else:
            _log.warning("Workload manager not recognised. Job PIDs will be "
                         "found by iterating over all user PIDs")
            # Record the absence of a scheduler so that later checks raise a
            # clean BatchSchedulerNotFound instead of a KeyError
            self.config['scheduler'] = None
    def prepare_dirs(self):
"""Make directories to save the results"""
# Create a folder to write the metric data
self.config['save_dir'] = os.path.join(
self.config['save_dir'], "-".join(["job", "metrics",
str(self.config['job_id'])])
)
os.makedirs(self.config['save_dir'], exist_ok=True)
        # Create a subdirectory to save data
self.config['data_dir'] = os.path.join(self.config['save_dir'],
"metrics")
os.makedirs(self.config['data_dir'], exist_ok=True)
if self.config['gen_report']:
            # Create a subdirectory to save plots
self.config['plot_dir'] = os.path.join(self.config['save_dir'],
"plots")
os.makedirs(self.config['plot_dir'], exist_ok=True)
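
        # Resulting layout (job ID 1234 purely as an illustration):
        #   <original save_dir>/job-metrics-1234/
        #   ├── metrics/    # data_dir, the collected metric data
        #   └── plots/      # plot_dir, only when gen_report is set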
    def prepare_file_paths(self):
        """Prepare file paths for different result files"""
        # Create a file path to save the report
self.config['report_path'] = os.path.join(
self.config['save_dir'],
"job-report-{}.pdf".format(self.config['job_id'])
)
        # Create per-metric raw data directories and, optionally, per-metric
        # xlsx file paths
        self.config['temp_path'] = {}
        self.config['xlsx_file'] = {}
save_dir_parent = (pathlib.Path(
self.config['save_dir']
).parent.absolute())
for metric in self.config['metrics']:
temp_path = os.path.join(self.config['save_dir'], 'raw_data',
metric)
os.makedirs(temp_path, exist_ok=True)
self.config['temp_path'][metric] = temp_path
if self.config['export_xlsx']:
                # File path for the Excel file
if self.config['prefix'] is not None:
file_name = "{}_{}.xlsx".format(self.config['prefix'],
metric)
else:
file_name = "{}.xlsx".format(metric)
self.config['xlsx_file'][metric] = os.path.join(save_dir_parent,
file_name)
# Perf event file path
self.config['perf_event_file'] = os.path.join(self.config['save_dir'],
".perf_event_list")
if self.config['export_db']:
if self.config['prefix'] is not None:
file_name = "{}.db".format(self.config['prefix'])
else:
file_name = "metrics.db"
self.config['db_file'] = os.path.join(save_dir_parent, file_name)
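
        # Resulting paths, with prefix "run1" and job ID 1234 purely as an
        # illustration (save_dir already points at .../job-metrics-1234):
        #   <save_dir>/job-report-1234.pdf      # report_path
        #   <save_dir>/raw_data/<metric>/       # temp_path, one per metric
        #   <save_dir>/.perf_event_list         # perf_event_file
        #   <save_dir>/../run1_<metric>.xlsx    # only when export_xlsx is set
        #   <save_dir>/../run1.db               # only when export_db is set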
    def create_node_list(self):
"""Create a list of node names"""
# Get node names
self.config['node_name'] = self.config['host_name']
if self.config['scheduler'] == "SLURM":
# Command to execute
cmd_str = ("sinfo -N --nodes={} --format=\"%N\" | awk '{{if ("
"NR>1)print}}'".format(os.environ['SLURM_JOB_NODELIST']))
elif self.config['scheduler'] == "PBS":
            # Command to execute
            # The PBS nodefile is only available on the head node, so other
            # nodes in the reservation cannot access it. The workaround is to
            # copy the nodefile to the CWD, define a new environment variable
            # and export it using mpirun.
            # On the head node:
            #   cp $PBS_NODEFILE nodefile
            #   mpirun -x PBS_NODEFILE_LOCAL=$PWD/nodefile <snippet>
cmd_str = "uniq $PBS_NODEFILE_LOCAL"
elif self.config['scheduler'] == "OAR":
# Command to execute
cmd_str = "uniq $OAR_NODEFILE"
elif self.config['scheduler'] == "LOCAL":
# Command to execute
cmd_str = "hostname"
else:
_log.error("Workload manager not identified.")
raise BatchSchedulerNotFound("Batch scheduler not found")
# Execute command and parse node name list
self.config['node_name_list'] = execute_cmd(cmd_str).splitlines()
_log.debug("Found current node name is %s and node list is %s" % (
self.config['node_name'], self.config['node_name_list']))
        # Sometimes the hostname and the nodelist entries differ, mostly due
        # to a local network suffix being added to the hostname. We try to
        # reconcile them here
        if self.config['node_name'] not in self.config['node_name_list']:
            # This case is when node_name is my-node-0 and the entry in
            # node_name_list is my-node-0.local
            check_0 = [s for s in self.config['node_name_list']
                       if self.config['node_name'] in s]
            if check_0:
                self.config['node_name'] = check_0[0]
            else:
                # This case is when node_name is my-node-0.local and the entry
                # in node_name_list is my-node-0
                check_1 = [s for s in self.config['node_name_list']
                           if s in self.config['node_name']]
                if check_1:
                    self.config['node_name'] = check_1[0]
        # Get the master node, i.e. the first node in the reservation
self.config['master_node'] = self.config['node_name_list'][0]
_log.debug("Master is %s and current is %s" % (
self.config['master_node'], self.config['node_name'])
)
    def find_launcher_type(self):
"""Find which launcher is used to launch MPI jobs"""
if self.config['launcher'] is None:
# PID of current process
current_pid = os.getpid()
# Get parent of the current process
proc = psutil.Process(current_pid)
ppid = proc.ppid()
# Get name of the parent process
pp_name = psutil.Process(ppid).name()
_log.debug("Current process object is %s and its parent process is %s"
% (proc, psutil.Process(ppid)))
if pp_name in ['mpirun', 'orted']:
self.config['launcher'] = "mpirun"
elif pp_name in ['mpiexec', 'hydra_pmi_proxy']:
self.config['launcher'] = "mpiexec"
elif pp_name in ['slurmstepd', 'psid']: # On JUWELS, psid launches tasks
self.config['launcher'] = "srun"
else:
_log.debug("Launcher type %s specified on CLI" % self.config['launcher'])
    def create_ipc_file(self):
        """Create the inter-process communication (IPC) file used to signal
        the toolkit that the main job has ended"""
# Name of the IPC file
self.config['ipc_file'] = "-".join([".ipc", str(self.config['job_id'])])
        # Only the master node creates the IPC file
        if self.config['master_node'] == self.config['node_name']:
            # We create a hidden file containing the text INPROGRESS to signal
            # that the job is running. Once the job finishes, it is overwritten
            # with FINISHED, e.g. `echo "FINISHED" > .ipc-<job_id>`
with open(self.config['ipc_file'], 'w') as ipc_file:
ipc_file.write("INPROGRESS")
    def create_lock_file(self):
"""Creates a lock file"""
_log.info("Creating a lock file")
# Name of the lock file. Prepending with "." to make it hidden
self.config['lock_file'] = os.path.join(
self.config['save_dir'], ".{}".format(self.config['node_name'])
)
        # If a lock file is already there from a previous run, purge it
lock_file_name = ".".join([self.config['lock_file'], 'lock'])
if os.path.isfile(lock_file_name):
_log.info("Purging old lock file...")
os.remove(lock_file_name)
self.fl = FileLock(self.config['lock_file'], timeout=10)
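        # Note that FileLock appends ".lock" to the given path (as the purge
        # above assumes), so the on-disk file is <save_dir>/.<node_name>.lock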
    def acquire_lock(self):
"""Acquires lock by locking the created file"""
_log.info("Locking the file")
self.fl.acquire()
    def start(self):
"""Preprocessing steps of data collection"""
# Find Job id
self.get_job_id()
# Make directories to save results
self.prepare_dirs()
# Make file paths for different results
self.prepare_file_paths()
# Create a list of node names
self.create_node_list()
        # Create an inter-process communication (IPC) file
self.create_ipc_file()
# Find launcher type mpirun or srun
self.find_launcher_type()
        # Create a lock file; the lock itself is acquired in the next step
self.create_lock_file()
# Acquire lock
self.acquire_lock()
return self.config
    def release_lock(self):
"""Release lock file"""
try:
_log.info("Releasing the lock file")
self.fl.release()
except FileNotFoundError:
_log.warning("Lock file already released")
    def shutdown(self):
"""Finish monitoring"""
# Release lock file. End of monitoring.
self.release_lock()
        # Check if computations on all nodes have finished. The way we do it
        # is to check if the lock file exists for each host. If it exists,
        # computations on that host are still going on.
        # Sometimes the node name in SLURM is not exactly the same as the
        # hostname
if self.config['master_node'] == self.config['node_name']:
_log.info("Checking if all nodes finished the collection")
for node in self.config['node_name_list']:
lock_file = os.path.join(self.config['save_dir'],
".{}".format(node))
fl = FileLock(lock_file)
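                # Poll once per second until that host releases (removes) its
                # lock file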
while fl.lock_exists():
time.sleep(1)