Source code for modules.utils

"""Utility functions for SDP benchmark tests"""

import os
import datetime
import re
import random
import glob
import math
import subprocess
import shlex
from pathlib import Path

import numpy as np
import yaml
import pandas as pd

import reframe.utility.osext as osext  # pylint: disable=import-error
import reframe.core.runtime as runtime  # pylint: disable=import-error

from reframe_config import site_configuration

# pylint: disable=C0301

class GenerateHplConfig:
    """Class to generate HPL.dat file for LINPACK benchmark"""

    def __init__(self, num_nodes, num_procs, nb, mem, alpha=0.8):
        """
        The class takes following arguments

        :param num_nodes: Number of nodes
        :type num_nodes: int
        :param num_procs: Number of MPI processes
        :type num_procs: int
        :param nb: NB value for HPL based on architecture
        :type nb: int
        :param mem: Total available memory on the node (anything accepted by ``int()``)
        :type mem: str
        :param alpha: Fraction of memory to be used
        :type alpha: float
        """
        self.num_nodes = num_nodes
        self.num_procs = num_procs
        self.nb = nb
        # If the systems have huge memory, the generated problem size will be huge as well.
        # To keep the problem size within limits we cap the system memory to 200 GB
        self.mem = min(int(mem), 2e11)
        self.alpha = alpha

    def estimate_n(self):
        """
        Estimate N based on memory and alpha

        :returns: Problem size N, rounded down to a multiple of NB
        :rtype: int
        """
        base_n = self.alpha * math.sqrt(self.mem * self.num_nodes / 8)
        # N must be multiple of Nb
        return math.floor(base_n / self.nb) * self.nb

    def estimate_pq(self):
        """
        Get best combination of P and Q based on nprocs

        P is the largest divisor of ``num_procs`` that does not exceed its
        square root, Q is the matching co-factor, so ``P <= Q`` and
        ``P * Q == num_procs``.

        :returns: Tuple ``(P, Q)``
        :rtype: tuple
        """
        # Walk down from floor(sqrt(num_procs)); the first divisor found is the
        # largest one, and 1 always divides, so the loop returns for any
        # positive process count.
        for p in range(int(math.sqrt(self.num_procs)), 0, -1):
            if self.num_procs % p == 0:
                # Integer division keeps Q exact (no float round trip)
                return p, self.num_procs // p
        # Only reached when num_procs is 0 (empty range above)
        return 1, self.num_procs

    def get_config(self):
        """
        Build the contents of the HPL.dat file

        Nothing is written to disk here; the caller decides where the
        returned text goes.

        :returns: Contents of HPL.dat
        :rtype: str
        """
        n = self.estimate_n()
        p, q = self.estimate_pq()

        lines = [
            'HPLinpack benchmark input file',
            'Innovative Computing Laboratory, University of Tennessee',
            'HPL.out output file name (if any) ',
            '6 device out (6=stdout,7=stderr,file)',
            '1 # of problems sizes (N)',
            f'{n} Ns',
            '1 # of NBs',
            f'{self.nb} NBs',
            '0 PMAP process mapping (0=Row-,1=Column-major)',
            '1 # of process grids (P x Q)',
            f'{p} Ps',
            f'{q} Qs',
            '16.0 threshold',
            '1 # of panel fact',
            '2 PFACTs (0=left, 1=Crout, 2=Right)',
            '1 # of recursive stopping criterium',
            '4 NBMINs (>= 1)',
            '1 # of panels in recursion',
            '2 NDIVs',
            '1 # of recursive panel fact.',
            '1 RFACTs (0=left, 1=Crout, 2=Right)',
            '1 # of broadcast',
            '1 BCASTs (0=1rg,1=1rM,2=2rg,3=2rM,4=Lng,5=LnM)',
            '1 # of lookahead depth',
            '1 DEPTHs (>=0)',
            '2 SWAP (0=bin-exch,1=long,2=mix)',
            '64 swapping threshold',
            '0 L1 in (0=transposed,1=no-transposed) form',
            '0 U in (0=transposed,1=no-transposed) form',
            '1 Equilibration (0=no,1=yes)',
            '8 memory alignment in double (> 0)',
            '##### This line (no. 32) is ignored (it serves as a separator). ######',
            '0 Number of additional problem sizes for PTRANS',
            '1200 10000 30000 values of N',
            '0 number of additional blocking sizes for PTRANS',
            '40 9 8 13 13 20 16 32 64 values of NB',
        ]
        # Every line, including the last one, is newline-terminated
        return '\n'.join(lines) + '\n'
def sdp_benchmark_tests_root():
    """
    Locate the root directory of SKA SDP Benchmark tests

    The root is taken to be the directory two levels above this file.

    :returns: Path of the root directory
    :rtype: pathlib.Path
    """
    module_dir = Path(__file__).parent
    return module_dir.parent
def parse_path_metadata(path):
    """
    Split a results path into its ReFrame components

    The last (up to) five path components are labelled, in order, as
    ``sysname``, ``partition``, ``environ``, ``testname`` and ``filename``.
    When the path has fewer than five components the labels are assigned
    from the left and the remaining ones are simply absent.

    :param path: ReFrame stage/output path
    :type path: str
    :returns: ReFrame system/partition info plus the original ``path``
    :rtype: dict
    """
    labels = ('sysname', 'partition', 'environ', 'testname', 'filename')
    tail = path.split(os.path.sep)[-5:]
    info = {label: component for label, component in zip(labels, tail)}
    info['path'] = path
    return info
def find_perf_logs(root, benchmark):
    """
    Get perflog file names for given test

    Perflogs are expected in directories four levels below ``root``
    (``<root>/*/*/*/<testname>/``), with the log file named
    ``<testname>.log``. The returned paths are not checked for existence.

    :param root: Root where perflogs exist
    :type root: str
    :param benchmark: Regular expression matched (with ``re.match``, i.e.
        anchored at the start) against the test directory name, or None to
        select every test
    :type benchmark: str or None
    :returns: List of perflog file names
    :rtype: list
    """
    perflogs = []
    for test_dir in glob.glob(os.path.join(root, '*/*/*/*/')):
        # The leaf directory of the matched path is the test name
        test = os.path.basename(os.path.normpath(test_dir))
        # None means "no filter"; re.match() would raise TypeError on it
        if benchmark is None or re.match(benchmark, test):
            perflogs.append(os.path.join(test_dir, test + '.log'))
    return perflogs
def read_perflog(path):
    """
    Return a pandas dataframe from a ReFrame performance log.

    NB: This currently depends on having a non-default
    handlers_perflog.filelog.format in reframe's configuration: every
    non-blank line must hold the ``|``-separated fields
    ``completion_time|reframe|info|jobid|perf_data|perf_unit|perf_ref|tags``.

    The returned dataframe will have columns for:

    - all keys returned by `parse_path_metadata()`
    - all fields in a performance log record, noting that:

      - 'completion_time' is converted to a `datetime.datetime`
      - 'jobid' keeps only the part after the last '='
      - 'tags' is split on commas into a list of strs

    - 'perf_var' and 'perf_value', derived from 'perf_data' field
      ('perf_value' is a float when it parses as one, else the raw string)
    - <key> for any tags of the format "<key>=<value>", with values
      converted to int or float if possible

    :param path: Path to log file
    :type path: str
    :returns: Dataframe of perflogs
    :rtype: pandas.DataFrame
    :raises Exception: Any error hit while parsing is re-raised with the
        same type, its message extended with the offending ``path``
    """
    # NB:
    # b/c perflog prefix is '%(check_system)s/%(check_partition)s/%(check_environ)s/%(check_name)s'
    # we know that this is unique for this combo - as it was for results
    log_fields = ('completion_time', 'reframe', 'info', 'jobid', 'perf_data',
                  'perf_unit', 'perf_ref', 'tags')

    records = []
    meta = parse_path_metadata(path)

    with open(path) as f:
        try:
            for line in f:
                line = line.strip()
                if not line:
                    # Blank lines carry no record
                    continue

                # turn the line into a dict so we can access it:
                record = meta.copy()
                # allows the log fields to override metadata
                record.update(zip(log_fields, line.split('|')))

                # process values:
                perf_var, perf_value = record['perf_data'].split('=')
                record['perf_var'] = perf_var
                try:
                    record['perf_value'] = float(perf_value)
                except ValueError:
                    record['perf_value'] = perf_value

                record['completion_time'] = datetime.datetime.fromisoformat(
                    record['completion_time']
                )

                # original: "jobid=2378"
                record['jobid'] = record['jobid'].split('=')[-1]

                tags = record['tags'].split(',')
                for tag in tags:
                    if '=' not in tag:
                        continue
                    # Split on the first '=' only so values may contain '='
                    key, _, value = tag.partition('=')
                    for conv in (int, float):
                        try:
                            value = conv(value)
                        except ValueError:
                            continue
                        break
                    record[key] = value
                record['tags'] = tags

                records.append(record)
        except Exception as e:
            # Append the file name to the message without assuming the
            # exception carries a leading string argument; otherwise this
            # handler could raise on its own and hide the real error.
            detail = str(e.args[0]) if e.args else ''
            e.args = (f'{detail}: during processing {path}',) + e.args[1:]
            raise

    return pd.DataFrame.from_records(records)
def load_perf_logs(root='.', test=None, extras=None, last=False, aggregate_multi_runs=np.median):
    """
    Convenience wrapper around read_perflog().

    :param root: Path to root of tree containing perf logs
    :type root: str
    :param test: Pattern handed to find_perf_logs() to restrict which
        test directories are loaded
    :type test: str
    :param extras: Additional dataframe headers to add
    :type extras: list
    :param last: True to only return the most-recent record for each
        system/partition/environment/testname/perf_var combination
        (extended by any ``extras`` that exist as columns).
    :type last: bool
    :param aggregate_multi_runs: How to aggregate the perf-values of multiple runs.
        If None, no aggregation is applied. Defaults to np.median
    :type aggregate_multi_runs: Callable
    :returns: Single pandas.dataframe concatenated from all loaded logs,
        or None if no logs exist
    :rtype: pandas.DataFrame
    """
    # Only logs that actually exist on disk are parsed
    frames = [
        read_perflog(log_path)
        for log_path in find_perf_logs(root, test)
        if os.path.isfile(log_path)
    ]
    if not frames:
        return None

    perf_records = pd.concat(frames).reset_index(drop=True)

    # Multi-run aggregation: rows are grouped by system, partition,
    # environment and perf_var plus every tag key of the first record
    # (which includes the multi-run "unique_key"), then collapsed.
    if aggregate_multi_runs is not None and 'unique_key' in perf_records.columns:
        tag_keys = [tag.split('=')[0] for tag in perf_records.iloc[0].tags]
        columns_to_group_by = ['sysname', 'partition', 'environ', 'perf_var', *tag_keys]

        # perf_value gets the requested aggregate; every other non-grouping
        # column keeps its first value, as they should be equal within a group.
        # perf_data, perf_ref and tags may therefore disagree with the
        # aggregated perf_value - keep this in mind.
        how = {'perf_value': aggregate_multi_runs}
        for column in perf_records.columns.values:
            if column != 'perf_value' and column not in columns_to_group_by:
                how[column] = 'first'

        perf_records = perf_records.groupby(columns_to_group_by, as_index=False).agg(how)

    if last:
        base_columns = ['sysname', 'partition', 'environ', 'testname', 'perf_var']
        for var in extras or []:
            if var in perf_records and var not in base_columns:
                base_columns.append(var)
        perf_records = perf_records.sort_index().groupby(base_columns).tail(1)

    return perf_records
def tabulate_last_perf(test, root='../../perflogs', extras=None, **kwargs):
    """
    Retrieve last perf_log entry for each system/partition/environment.

    :param test: Pattern handed to load_perf_logs() to restrict which
        tests are loaded
    :type test: str
    :param root: Path to root of tree containing perf logs of interest -
        default assumes this is called from an `apps/<application>/` directory
    :type root: str
    :param extras: Additional dataframe headers to add
    :type extras: list
    :param kwargs: ``column=value`` filters; each one whose column exists
        in the loaded data keeps only the rows equal to ``value``
    :returns: A wide dataframe indexed by ``perf_var`` whose columns are the
        ``case`` (``sysname:partition:environ``) combined with any
        ``extras``, holding ``perf_value``; None when no logs were found
    :rtype: pandas.DataFrame
    """

    def _append_extras(names):
        # Extend `names` in place with every extra not already listed
        for extra in extras or []:
            if extra not in names:
                names.append(extra)
        return names

    df = load_perf_logs(root=root, test=test, extras=extras, last=True)
    if df is None:
        # no data
        return None

    # filter to rows for correct perf_var:
    for column, wanted in kwargs.items():
        if column in df:
            df = df.loc[df[column] == wanted]

    # keep only the LAST record in each system/partition/environment/xvar
    group_keys = _append_extras(['sysname', 'partition', 'environ', 'perf_var'])
    df = df.sort_index().groupby(group_keys).tail(1)

    # Add "case" column from combined system/partition:
    df['case'] = df[['sysname', 'partition', 'environ']].agg(':'.join, axis=1)

    # reshape to wide table:
    pivot_columns = _append_extras(['case'])
    return df.pivot(index='perf_var', columns=pivot_columns, values='perf_value')
[docs]def tabulate_partitions(root): """ Tabulate the list of partitions defined with ReFrame config file and high level overview of each partition. We tabulate only partitions that are found in the perflog directory :param root: Perflog root directory :type root: str :returns: A dataframe with all partition details :rtype: pandas.DataFrame """ found_parts = [] for dir_path, _, files in os.walk(root): for file in files: if file.endswith('.log'): part_name = os.path.split( os.path.split(os.path.relpath(dir_path, root))[0] )[0].replace('/', ':') found_parts.append(part_name) # Remove duplications found_parts = list(set(found_parts)) all_partitions = {} for system in site_configuration['systems']: # Skip local iMac if system['name'] == 'catalina': continue for partition in system['partitions']: name = ':'.join([system['name'], partition['name']]) if name not in found_parts: continue try: if name not in all_partitions.keys(): all_partitions[name] = {} all_partitions[name]['descr'] = partition['descr'] all_partitions[name]['arch'] = partition['processor']['arch'] all_partitions[name]['S:C:T'] = ','.join( [str(partition['processor']['num_sockets']), str(partition['processor']['num_cpus_per_socket'] // partition['processor'][ 'num_sockets']), str(partition['processor']['num_cpus_per_core'])] ) cache_sizes = [] for cache in partition['processor']['topology']['caches']: cache_level = int(cache['type'].replace('L', '')) cache_sizes.insert(cache_level - 1, str(cache['size'] // 1024)) all_partitions[name]['caches'] = ','.join(cache_sizes) except KeyError: all_partitions[name] = { 'descr': 'N/A', 'arch': 'N/A', 'S:C:T': 'N/A', 'caches': 'N/A', } return pd.DataFrame(all_partitions)
def filter_systems_by_name(patterns):
    """
    Filter the partitions of the current ReFrame system by name. A
    partition is chosen only if every pattern is found (``re.search``)
    in its full name.

    :param patterns: List of patterns to be searched
    :type patterns: list
    :returns: Full names of the partitions that match all patterns
    :rtype: list
    """
    selected = []
    for part in runtime.runtime().system.partitions:
        if all(re.search(pattern, part.fullname) for pattern in patterns):
            selected.append(part.fullname)
    return selected
def filter_systems_by_env(envs):
    """
    Filter the partitions of the current ReFrame system by the
    environments defined for them.

    A partition's full name is added once for every entry of ``envs`` that
    is among its environments, so it can appear more than once.

    :param envs: List of environments to be searched
    :type envs: list
    :returns: Full names of the partitions that match the envs
    :rtype: list
    """
    selected = []
    for part in runtime.runtime().system.partitions:
        for env in envs:
            part_env_names = [part_env.name for part_env in part.environs]
            if env in part_env_names:
                selected.append(part.fullname)
    return selected
def git_describe(dir):
    """
    Describe the state of the git repo that contains ``dir``, as reported
    by `git describe --dirty --always`.

    :param dir: Root path of git repo (used as the working directory)
    :type dir: str
    :returns: Git describe output with surrounding whitespace removed
    :rtype: str
    :raises subprocess.CalledProcessError: If git exits with a non-zero status
    :raises ValueError: If git wrote anything to stderr
    """
    # check=True already raises on a non-zero exit status
    result = subprocess.run(
        ['git', 'describe', '--dirty', '--always'],
        cwd=dir,
        capture_output=True,
        text=True,
        check=True,
    )
    if result.stderr:
        raise ValueError(result.stderr)
    return result.stdout.strip()
def generate_random_number(n):
    """
    Draw a random integer that has exactly n digits, i.e. one between
    ``10**(n-1)`` and ``10**n - 1`` inclusive.

    :param n: Length of the desired random number
    :type n: int
    :returns: Generated random number
    :rtype: int
    """
    return random.randint(10 ** (n - 1), 10 ** n - 1)
def get_scheduler_env_list(scheduler_name):
    """
    Look up the environment variables (or shell snippets) that hold the
    job details for a given scheduler.

    :param scheduler_name: Name of the workload scheduler
    :type scheduler_name: str
    :returns: Environment variables dict
    :rtype: dict
    :raises NotImplementedError: If the scheduler is not one of the known names
    """
    # (accepted names, job-detail mapping) pairs. The table is rebuilt on
    # every call so each caller gets its own dict.
    known_schedulers = (
        (('slurm', 'squeue'), {
            'job_id': 'SLURM_JOB_ID',
            'node_list': 'SLURM_JOB_NODELIST',
            'head_node': '(scontrol show hostnames $SLURM_JOB_NODELIST | head -n1)',
            'num_nodes': 'SLURM_JOB_NUM_NODES',
            'num_tasks': 'SLURM_NTASKS',
        }),
        (('pbs', 'torque'), {
            'job_id': 'PBS_JOBID',
            'node_list': 'PBS_NODEFILE',
            'head_node': '(uniq $PBS_NODEFILE | head -n1)',
            'num_nodes': 'PBS_NUM_NODES',
            'num_tasks': 'PBS_NP',
        }),
        (('oar',), {
            'job_id': 'OAR_JOB_ID',
            'node_list': 'OAR_NODEFILE',
            'head_node': '(uniq $OAR_NODEFILE | head -n1)',
            'num_nodes': '(uniq $OAR_NODEFILE | wc -l)',
            'num_tasks': 'NA',
        }),
        # We create a fake job ID
        (('local',), {
            'job_id': 'LOCAL_JOB_ID',
        }),
    )

    for names, env_dict in known_schedulers:
        if scheduler_name in names:
            return env_dict

    raise NotImplementedError("Scheduler not supported")
def emit_conda_init_cmds():
    """
    Emit the shell command that initialises conda.

    The command first exports an empty PYTHONPATH, so that no
    pre-installed dependencies from external or external/perfmon are
    used, and then sources conda's ``conda.sh`` profile script.

    TODO: test if this works even with perfmon

    :returns: Single shell command line
    :rtype: str
    """
    clear_pythonpath = 'export PYTHONPATH='
    source_conda = 'source $(conda info --base)/etc/profile.d/conda.sh'
    return ' && '.join((clear_pythonpath, source_conda))
def emit_conda_activate_cmds(env_name):
    """
    Emit the commands to initialise conda and activate an existing env.

    :param env_name: Name of the conda env to activate
    :type env_name: str
    :returns: List with the conda init command followed by the activate command
    :rtype: list
    """
    init_cmd = emit_conda_init_cmds()
    activate_cmd = f'conda activate {env_name}'
    return [init_cmd, activate_cmd]
def emit_conda_env_cmds(env_name, py_ver='3.8'):
    """
    Emit all the commands to create/activate a conda environment. This
    function assumes conda is installed in the system.

    Unless the environment variable ``CREATE_CONDA_ENV`` is set to
    something other than ``YES``, conda is initialised, an existing env of
    the same name is removed and a fresh one is created. Looking up the
    existing envs runs ``conda env list`` immediately, at call time.

    :param env_name: Name of the conda env to create/activate
    :type env_name: str
    :param py_ver: Version of python to be used in conda environment (Default: 3.8)
    :type py_ver: str
    :returns: List of commands to create/activate conda env
    :rtype: list
    """
    commands = []

    if os.getenv('CREATE_CONDA_ENV', 'YES') == 'YES':
        commands.append(emit_conda_init_cmds())

        # Ask conda for the envs it knows about
        listing = osext.run_command(
            cmd='conda env list', shell=True, check=True
        ).stdout.strip().splitlines()

        # First column of every non-comment line is the env name
        existing_envs = [row.split()[0] for row in listing if '#' not in row]

        # A stale env of the same name is removed before re-creating it
        if env_name in existing_envs:
            commands.append(f'conda env remove -n {env_name}')

        commands.append(f'conda create -n {env_name} python={py_ver} -y')

    # Activate the env
    commands.append(f'conda activate {env_name}')

    return commands
def merge_spack_configs(input_file, output_file):
    """
    This function merges all spack config files by replacing `include`
    keyword with the contents of the respective yaml files

    Included paths are resolved relative to the directory of
    ``input_file``. Keys from later includes override earlier ones and
    keys already present in the ``spack`` section. A config without an
    ``include`` entry is written out unchanged.

    :param input_file: Path to input spack.yaml file
    :type input_file: str
    :param output_file: Path to output merged spack.yaml file
    :type output_file: str
    :returns: None
    :raises KeyError: If the input file has no top-level ``spack`` section
    """
    # NOTE(review): yaml.load with FullLoader is kept from the original;
    # consider yaml.safe_load if these files can come from untrusted sources.
    with open(input_file) as f:
        cfg = yaml.load(f, Loader=yaml.FullLoader)

    # Parent directory of input file
    parent_dir = Path(input_file).parent
    spack_cfg = cfg['spack']

    # Update dict with include files ("include:" may be absent or empty)
    for inc in spack_cfg.get('include') or []:
        with open(os.path.join(parent_dir, inc)) as f:
            # An empty included file loads as None; treat it as no keys
            spack_cfg.update(yaml.load(f, Loader=yaml.FullLoader) or {})

    # Drop the include key if there is one; a plain `del` would raise
    # KeyError for configs that never had it
    spack_cfg.pop('include', None)

    # Write the merged config to the output file
    with open(output_file, 'w') as f:
        yaml.dump(cfg, f, default_flow_style=False, sort_keys=False)
if __name__ == '__main__':
    # Ad-hoc check: load the FFT CPU test perflogs and show which columns
    # the resulting dataframe has.

    # import matplotlib.pyplot as plt
    # from matplotlib.ticker import ScalarFormatter
    # import pandas as pd

    PERFLOG_DIR = '../perflogs'

    # df = tabulate_partitions(PERFLOG_DIR)
    # print(df.transpose())

    df = load_perf_logs(root=PERFLOG_DIR, extras=['num_threads'], test='FftCpuTest*')

    if df is None:
        # load_perf_logs() returns None when no log file was found
        print(f'No perflogs found under {PERFLOG_DIR}')
    else:
        with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            print(df.columns)