Source code for ska_sdp_resource_model.simulate.monte_carlo

"""Module hosting MonteCarloSimulation class."""

import copy
import functools as ftl
import logging
import multiprocessing as mp
from collections import defaultdict

import dask.bag as db
import more_itertools as mit
import numpy as np
import pandas as pd
from scipy import stats

from ska_sdp_resource_model.simulate.config_models import PipelinesConfigModel
from ska_sdp_resource_model.simulate.constants import SECONDS_IN_DAY
from ska_sdp_resource_model.simulate.process_inputs import update_observations
from ska_sdp_resource_model.simulate.resource_usage import (
    ResourceUsageSimulator,
)

logger = logging.getLogger("simulation")


DEFAULT_NUM_WORKERS = 4 * mp.cpu_count()


[docs] class MonteCarloSimulation: """Sets up and executes a Monte Carlo simulation run of the ResourceUsageSimulator. This class handles parameterising the pipeline configurations and running multiple simulations, possibly concurrently, with different random seeds. Attributes: observations (list): List of observation dictionaries. Each dictionary contains configurations for scheduling block types and pipeline steps. pipelines_config (dict): Configuration for each pipeline, including 'node_hours_mean', 'node_hours_uncertainty', 'pct_parallelism_min', and 'pct_parallelism_max'. hardware_config (dict): Hardware configuration for the simulation. shuffle (bool): Whether to shuffle the observations list before each simulation. samplers (dict): Samplers for each pipeline's 'node_hours' (truncated normal distribution) and 'pct_parallelism' (uniform distribution). Methods: get_samplers(): Initialise distributions for sampling pipeline parameters. run(): Execute the Monte Carlo simulation. run_simulation(): Run a single iteration of the simulation. parameterise_pipelines(): Sample pipeline parameters for a single iteration. sample_parameter(): Sample a specific parameter for all pipelines. """ def __init__( self, observations_list, pipelines_config, hardware_config, shuffle=False, ): """Initialise a Monte Carlo simulation of the SDP Resource Model. Args: observations_list (list): A list of dictionaries containing the observations to be simulated. This must contain the configurations for each scheduling block type and each pipeline step. pipelines_config (dict): A dictionary containing the configuration of each pipeline. Must contain the keys pct_parallelism_min and pct_parallelism_max. hardware_config (dict): A dictionary containing the hardware configuration to use in the simulation. shuffle (bool): Whether to shuffle the observations list before running the simulation. Default is False. """ self.observations = observations_list self.pipelines_config = PipelinesConfigModel( pipelines_config ).model_dump() self.hardware_config = hardware_config self.shuffle = bool(shuffle) self.samplers = self.get_samplers()
[docs] def get_samplers(self): """Initialise the distributions for sampling pipeline node hours and percentage parallelism. Returns: samplers (dict): Dictionary containing samplers for each pipline. Each pipeline has samplers for "node_hours" and "pct_parallelism". Samplers are `scipy.stats._distn_infrastructure.rv_continuous_frozen` objects. """ # pipelines_config samplers = defaultdict(dict) for pipeline, config in self.pipelines_config.items(): # Node hours - truncated normal samplers[pipeline]["node_hours"] = ( self.get_node_hours_distribution(config) ) # Percentage parallelism - uniform samplers[pipeline]["pct_parallelism"] = ( self.get_pct_parallelism_distribution(config) ) return dict(samplers)
[docs] def get_pct_parallelism_distribution(self, config): """Initialise the distribution for sampling pipeline percentage parallelism. Args: config (dict): Pipeline configuration dictionary. Returns: dist (scipy.stats._distn_infrastructure.rv_continuous_frozen): Uniform distribution object for sampling percentage parallelism. """ loc, end = config["pct_parallelism_min"], config["pct_parallelism_max"] return stats.uniform(loc, end - loc)
[docs] def get_node_hours_distribution(self, config): """Initialise the distribution for sampling pipeline node hours. Args: config (dict): Pipeline configuration dictionary. Returns: dist (scipy.stats._distn_infrastructure.rv_continuous_frozen): Truncated normal distribution object for sampling node hours. """ loc = config["node_hours_mean"] scale = loc * config["node_hours_uncertainty"] a_trunc_std = (0 - loc) / scale return stats.truncnorm(a_trunc_std, np.inf, loc=loc, scale=scale)
def __call__( self, n_iter=100, num_workers=1, seed=None, client=None, ): """Run a Monte Carlo simulation of the SDP Resource Model. Args: n_iter (int): The number of Monte Carlo trials to run. Default is 100. num_workers (int): Number of worker processes to use for running parallel simulations. If `None` or `-1`, a suitable default number of workers will be chosen. For a small number of iterations (`n_iter < 100`), the simulations will be run sequentially on a single worker. For larger simulations, the default value is set to four times the cpu count of the system at import time. seed (int): The random seed to use for the Monte Carlo simulation for reproducible results. Default is None. client (dask.Client): Optional Dask client to utilise for parallelism. If provided, the Monte Carlo iterations will be run using this scheduler. If None, any previously initialized schedulers will automatically be used by dask. If None, and no exisiting dask scheduler is running, the Monte Carlo iterations will be run sequentially on a single process. Returns: run_times_days (list): A list of the total simulation times for each Monte Carlo trial in days. resource_usages (list): A list of the resource usage dataframes for each Monte Carlo trial. event_logs (list): A list of the event logs dataframes for each Monte Carlo trial. num_successful_runs (int): The number of successful runs in the Monte Carlo simulation. """ logger.info("---------Starting Monte Carlo simulation--------") logger.debug("Original pipeline config %r...", self.pipelines_config) # setup if client: num_workers = sum(client.ncores().values()) else: num_workers = resolve_num_workers(num_workers, n_iter) if num_workers == 1: runner = self._run_sequential compute_config = {} else: runner = self._run_dask compute_config = {"scheduler": client} # log setup logger.debug( "%d iterations will be run using %d worker%s with %s " "implementation.", n_iter, num_workers, "s" * (num_workers > 1), runner.__name__.rsplit("_", 1)[1], ) # execute event_log_df, resource_usage_df, num_successful_runs = runner( n_iter, num_workers=num_workers, seed=seed, **compute_config ) run_times_days = ( resource_usage_df.groupby("iteration")["time_s"].max() / SECONDS_IN_DAY ).tolist() logger.info( "-------------------------Finished Monte Carlo simulation!-------" "--------------------\n" "Mean time: %.2f days\n" "Min time: %.2f days\n" "Max time: %.2f days\n" "95%% CI: %.2f - %.2f days\n" "%d successful runs out of %d iterations\n", np.mean(run_times_days), np.min(run_times_days), np.max(run_times_days), *np.percentile(run_times_days, [2.5, 97.5]), num_successful_runs, n_iter, ) return ( run_times_days, resource_usage_df, event_log_df, num_successful_runs, ) # alias run = __call__ def _run_sequential( self, n_iter, seed, sequence=None, **_ignored ): # pylint: disable=unused-argument # Seed random number generator rng = np.random.default_rng(seed) if sequence is None: sequence = range(1, n_iter + 1) # loop simulations event_logs, resourse_use = [], [] num_successes = 0 for i in sequence: logger.debug( "Running Monte Carlo iteration %d/%d...\nSampling" "pct_parallelism for each pipeline...", i, n_iter, ) output = self.run_simulation(rng) output["resource_usage"]["iteration"] = i output["event_log"]["iteration"] = i if output["num_successes"] == 1: num_successes += 1 event_logs.append(output["event_log"]) resourse_use.append(output["resource_usage"]) # aggregte results if num_successes: event_logs = pd.concat(event_logs) resourse_use = pd.concat(resourse_use) return event_logs, resourse_use, num_successes def _run_dask(self, n_iter, num_workers, seed, **compute_config): # Define worker function worker = ftl.partial(self._run_sequential, n_iter) # split work across `num_workers` batch_size = int(np.ceil(n_iter / num_workers)) workloads = db.from_sequence( mit.batched(range(1, n_iter + 1), batch_size) ) # Spawn child SeedSequences to pass to the worker processes. seed_sequence = np.random.SeedSequence(seed) child_seeds = db.from_sequence( seed_sequence.spawn(workloads.npartitions) ) # create task graph graph = db.map(worker, child_seeds, workloads) # execute event_log_dfs, resource_usage_dfs, num_successes = zip( *graph.compute(**compute_config) ) # aggregate results return ( pd.concat(event_log_dfs), pd.concat(resource_usage_dfs), np.sum(num_successes), )
[docs] def run_simulation(self, rng): """Run a single Monte Carlo iteration of the SDP Resource Model. This function samples the parameter space for pct_parallelism and node_hours of each pipeline, updates the observations, runs the simulation and returns the output. Args: rng (np.random.Generator): A random number generator instance. Returns: (dict): A dictionary containing the simulation results. """ pipelines_config_parameterised = self.parameterise_pipelines(rng) observations_parameterised = update_observations( self.observations, pipelines_config_parameterised ) if self.shuffle: shuffle_observations(observations_parameterised, rng) simulator = ResourceUsageSimulator() return simulator.run_simulation( observations_parameterised, self.hardware_config )
[docs] def parameterise_pipelines(self, rng=None): """Parameterise the pipelines with sampled values for pct_parallelism and node_hours. This function samples the parameter space for pct_parallelism and node_hours of each pipeline using the distributions defined in the `samplers` attribute. Args: rng (np.random.Generator): A random number generator instance. Returns: (dict): A dictionary containing the parameterised pipeline configurations. """ pipelines_config_sampled = self.sample_parameter( "pct_parallelism", rng=rng ) return self.sample_parameter( "node_hours", pipelines_config_sampled, rng )
[docs] def sample_parameter(self, parameter_name, input_config=None, rng=None): """Generate a randomly sampled value for parameter of each pipeline. Uses the distribution defined in `samplers` attribute to draw parameter values for `pct_parallelism` or `node_hours` pipeline parameters. Args: parameter_name (str): Name of the parameter to sample. One of "pct_parallelism" or "node_hours". input_config (dict): A dictionary containing the configuration of each pipeline. This will be updated with the parameter value drawn from the pipeline parameter samplers. rng (np.random.Generator): A random number generator instance. Returns: pipelines_config_sampled (dict): A dictionary containing the configuration for each pipeline with a randomly sampled value for `parameter_name`. """ pipelines_config_sampled = input_config or copy.deepcopy( self.pipelines_config ) for pipeline, samplers in self.samplers.items(): sampler = samplers[parameter_name] pipelines_config_sampled[pipeline][parameter_name] = sampler.rvs( random_state=rng ) return pipelines_config_sampled
[docs] def resolve_num_workers(num_workers, n_iter): """Resolve the number of workers to use for parallel processing. If num_workers is None or -1, use the default number of workers, which is four workers per cpu. Otherwise, convert num_workers to an integer. Also ensure that the number of workers does not exceed the number of Monte Carlo iterations. Args: num_workers (int, optional): The number of workers to use. n_iter (int): The number of iterations. Returns: int: The resolved number of workers. Raises: ValueError: If num_workers is not a positive integer, or a default substitute value -1 or None. """ # Benchmarking indicates that parallelisation is only advantageous from # around 500 iterations upward with current implementation. Default is # therefore to use the serial execution for small number of iterations. if num_workers in {-1, None}: num_workers = DEFAULT_NUM_WORKERS num_workers = int(num_workers) if num_workers <= 0: raise ValueError( "Number of worker processes `num_workers` should be a positive " f"integer value, (or -1 for the default {DEFAULT_NUM_WORKERS})" ) # more workers than iterations does not make sense return min(n_iter, num_workers)
[docs] def shuffle_observations(observations_list, rng): """Shuffle the observations list. Args: observations_list (list): A list of dictionaries containing the observations to be simulated. rng (np.random.Generator): A random number generator instance. Returns: None """ rng.shuffle(observations_list)