"""Module hosting MonteCarloSimulation class."""
import copy
import functools as ftl
import logging
import multiprocessing as mp
from collections import defaultdict
import dask.bag as db
import more_itertools as mit
import numpy as np
import pandas as pd
from scipy import stats
from ska_sdp_resource_model.simulate.config_models import PipelinesConfigModel
from ska_sdp_resource_model.simulate.constants import SECONDS_IN_DAY
from ska_sdp_resource_model.simulate.process_inputs import update_observations
from ska_sdp_resource_model.simulate.resource_usage import (
ResourceUsageSimulator,
)
logger = logging.getLogger("simulation")
DEFAULT_NUM_WORKERS = 4 * mp.cpu_count()
[docs]
class MonteCarloSimulation:
"""Sets up and executes a Monte Carlo simulation run of the
ResourceUsageSimulator.
This class handles parameterising the pipeline configurations and running
multiple simulations, possibly concurrently, with different random seeds.
Attributes:
observations (list): List of observation dictionaries. Each dictionary
contains configurations for scheduling block types and pipeline
steps.
pipelines_config (dict): Configuration for each pipeline, including
'node_hours_mean', 'node_hours_uncertainty', 'pct_parallelism_min',
and 'pct_parallelism_max'.
hardware_config (dict): Hardware configuration for the simulation.
shuffle (bool): Whether to shuffle the observations list before
each simulation.
samplers (dict): Samplers for each pipeline's 'node_hours' (truncated
normal distribution) and 'pct_parallelism' (uniform distribution).
Methods:
get_samplers(): Initialise distributions for sampling pipeline
parameters.
run(): Execute the Monte Carlo simulation.
run_simulation(): Run a single iteration of the simulation.
parameterise_pipelines(): Sample pipeline parameters for a single
iteration.
sample_parameter(): Sample a specific parameter for all pipelines.
"""
def __init__(
self,
observations_list,
pipelines_config,
hardware_config,
shuffle=False,
):
"""Initialise a Monte Carlo simulation of the SDP Resource Model.
Args:
observations_list (list):
A list of dictionaries containing the observations to be
simulated. This must contain the configurations for each
scheduling block type and each pipeline step.
pipelines_config (dict):
A dictionary containing the configuration of each pipeline.
Must contain the keys pct_parallelism_min and
pct_parallelism_max.
hardware_config (dict):
A dictionary containing the hardware configuration to use in
the simulation.
shuffle (bool):
Whether to shuffle the observations list before running the
simulation. Default is False.
"""
self.observations = observations_list
self.pipelines_config = PipelinesConfigModel(
pipelines_config
).model_dump()
self.hardware_config = hardware_config
self.shuffle = bool(shuffle)
self.samplers = self.get_samplers()
[docs]
def get_samplers(self):
"""Initialise the distributions for sampling pipeline node hours and
percentage parallelism.
Returns:
samplers (dict):
Dictionary containing samplers for each pipline. Each pipeline
has samplers for "node_hours" and "pct_parallelism". Samplers
are `scipy.stats._distn_infrastructure.rv_continuous_frozen`
objects.
"""
# pipelines_config
samplers = defaultdict(dict)
for pipeline, config in self.pipelines_config.items():
# Node hours - truncated normal
samplers[pipeline]["node_hours"] = (
self.get_node_hours_distribution(config)
)
# Percentage parallelism - uniform
samplers[pipeline]["pct_parallelism"] = (
self.get_pct_parallelism_distribution(config)
)
return dict(samplers)
[docs]
def get_pct_parallelism_distribution(self, config):
"""Initialise the distribution for sampling pipeline percentage
parallelism.
Args:
config (dict): Pipeline configuration dictionary.
Returns:
dist (scipy.stats._distn_infrastructure.rv_continuous_frozen):
Uniform distribution object for sampling percentage
parallelism.
"""
loc, end = config["pct_parallelism_min"], config["pct_parallelism_max"]
return stats.uniform(loc, end - loc)
[docs]
def get_node_hours_distribution(self, config):
"""Initialise the distribution for sampling pipeline node hours.
Args:
config (dict): Pipeline configuration dictionary.
Returns:
dist (scipy.stats._distn_infrastructure.rv_continuous_frozen):
Truncated normal distribution object for sampling node hours.
"""
loc = config["node_hours_mean"]
scale = loc * config["node_hours_uncertainty"]
a_trunc_std = (0 - loc) / scale
return stats.truncnorm(a_trunc_std, np.inf, loc=loc, scale=scale)
def __call__(
self,
n_iter=100,
num_workers=1,
seed=None,
client=None,
):
"""Run a Monte Carlo simulation of the SDP Resource Model.
Args:
n_iter (int):
The number of Monte Carlo trials to run. Default is 100.
num_workers (int):
Number of worker processes to use for running parallel
simulations. If `None` or `-1`, a suitable default number of
workers will be chosen. For a small number of iterations
(`n_iter < 100`), the simulations will be run sequentially on
a single worker. For larger simulations, the default value is
set to four times the cpu count of the system at import time.
seed (int):
The random seed to use for the Monte Carlo simulation for
reproducible results. Default is None.
client (dask.Client):
Optional Dask client to utilise for parallelism. If provided,
the Monte Carlo iterations will be run using this scheduler.
If None, any previously initialized schedulers will
automatically be used by dask. If None, and no exisiting dask
scheduler is running, the Monte Carlo iterations will be run
sequentially on a single process.
Returns:
run_times_days (list):
A list of the total simulation times for each Monte Carlo
trial in days.
resource_usages (list):
A list of the resource usage dataframes for each Monte Carlo
trial.
event_logs (list):
A list of the event logs dataframes for each Monte Carlo trial.
num_successful_runs (int):
The number of successful runs in the Monte Carlo simulation.
"""
logger.info("---------Starting Monte Carlo simulation--------")
logger.debug("Original pipeline config %r...", self.pipelines_config)
# setup
if client:
num_workers = sum(client.ncores().values())
else:
num_workers = resolve_num_workers(num_workers, n_iter)
if num_workers == 1:
runner = self._run_sequential
compute_config = {}
else:
runner = self._run_dask
compute_config = {"scheduler": client}
# log setup
logger.debug(
"%d iterations will be run using %d worker%s with %s "
"implementation.",
n_iter,
num_workers,
"s" * (num_workers > 1),
runner.__name__.rsplit("_", 1)[1],
)
# execute
event_log_df, resource_usage_df, num_successful_runs = runner(
n_iter, num_workers=num_workers, seed=seed, **compute_config
)
run_times_days = (
resource_usage_df.groupby("iteration")["time_s"].max()
/ SECONDS_IN_DAY
).tolist()
logger.info(
"-------------------------Finished Monte Carlo simulation!-------"
"--------------------\n"
"Mean time: %.2f days\n"
"Min time: %.2f days\n"
"Max time: %.2f days\n"
"95%% CI: %.2f - %.2f days\n"
"%d successful runs out of %d iterations\n",
np.mean(run_times_days),
np.min(run_times_days),
np.max(run_times_days),
*np.percentile(run_times_days, [2.5, 97.5]),
num_successful_runs,
n_iter,
)
return (
run_times_days,
resource_usage_df,
event_log_df,
num_successful_runs,
)
# alias
run = __call__
def _run_sequential(
self, n_iter, seed, sequence=None, **_ignored
): # pylint: disable=unused-argument
# Seed random number generator
rng = np.random.default_rng(seed)
if sequence is None:
sequence = range(1, n_iter + 1)
# loop simulations
event_logs, resourse_use = [], []
num_successes = 0
for i in sequence:
logger.debug(
"Running Monte Carlo iteration %d/%d...\nSampling"
"pct_parallelism for each pipeline...",
i,
n_iter,
)
output = self.run_simulation(rng)
output["resource_usage"]["iteration"] = i
output["event_log"]["iteration"] = i
if output["num_successes"] == 1:
num_successes += 1
event_logs.append(output["event_log"])
resourse_use.append(output["resource_usage"])
# aggregte results
if num_successes:
event_logs = pd.concat(event_logs)
resourse_use = pd.concat(resourse_use)
return event_logs, resourse_use, num_successes
def _run_dask(self, n_iter, num_workers, seed, **compute_config):
# Define worker function
worker = ftl.partial(self._run_sequential, n_iter)
# split work across `num_workers`
batch_size = int(np.ceil(n_iter / num_workers))
workloads = db.from_sequence(
mit.batched(range(1, n_iter + 1), batch_size)
)
# Spawn child SeedSequences to pass to the worker processes.
seed_sequence = np.random.SeedSequence(seed)
child_seeds = db.from_sequence(
seed_sequence.spawn(workloads.npartitions)
)
# create task graph
graph = db.map(worker, child_seeds, workloads)
# execute
event_log_dfs, resource_usage_dfs, num_successes = zip(
*graph.compute(**compute_config)
)
# aggregate results
return (
pd.concat(event_log_dfs),
pd.concat(resource_usage_dfs),
np.sum(num_successes),
)
[docs]
def run_simulation(self, rng):
"""Run a single Monte Carlo iteration of the SDP Resource Model.
This function samples the parameter space for pct_parallelism and
node_hours of each pipeline, updates the observations, runs the
simulation and returns the output.
Args:
rng (np.random.Generator): A random number generator instance.
Returns:
(dict): A dictionary containing the simulation results.
"""
pipelines_config_parameterised = self.parameterise_pipelines(rng)
observations_parameterised = update_observations(
self.observations, pipelines_config_parameterised
)
if self.shuffle:
shuffle_observations(observations_parameterised, rng)
simulator = ResourceUsageSimulator()
return simulator.run_simulation(
observations_parameterised, self.hardware_config
)
[docs]
def parameterise_pipelines(self, rng=None):
"""Parameterise the pipelines with sampled values for pct_parallelism
and node_hours.
This function samples the parameter space for pct_parallelism and
node_hours of each pipeline using the distributions defined in the
`samplers` attribute.
Args:
rng (np.random.Generator): A random number generator instance.
Returns:
(dict): A dictionary containing the parameterised pipeline
configurations.
"""
pipelines_config_sampled = self.sample_parameter(
"pct_parallelism", rng=rng
)
return self.sample_parameter(
"node_hours", pipelines_config_sampled, rng
)
[docs]
def sample_parameter(self, parameter_name, input_config=None, rng=None):
"""Generate a randomly sampled value for parameter of each pipeline.
Uses the distribution defined in `samplers` attribute to draw
parameter values for `pct_parallelism` or `node_hours` pipeline
parameters.
Args:
parameter_name (str):
Name of the parameter to sample. One of "pct_parallelism"
or "node_hours".
input_config (dict):
A dictionary containing the configuration of each pipeline.
This will be updated with the parameter value drawn from the
pipeline parameter samplers.
rng (np.random.Generator):
A random number generator instance.
Returns:
pipelines_config_sampled (dict):
A dictionary containing the configuration for each pipeline
with a randomly sampled value for `parameter_name`.
"""
pipelines_config_sampled = input_config or copy.deepcopy(
self.pipelines_config
)
for pipeline, samplers in self.samplers.items():
sampler = samplers[parameter_name]
pipelines_config_sampled[pipeline][parameter_name] = sampler.rvs(
random_state=rng
)
return pipelines_config_sampled
[docs]
def resolve_num_workers(num_workers, n_iter):
"""Resolve the number of workers to use for parallel processing.
If num_workers is None or -1, use the default number of workers, which is
four workers per cpu. Otherwise, convert num_workers to an integer. Also
ensure that the number of workers does not exceed the number of Monte Carlo
iterations.
Args:
num_workers (int, optional): The number of workers to use.
n_iter (int): The number of iterations.
Returns:
int: The resolved number of workers.
Raises:
ValueError: If num_workers is not a positive integer, or a default
substitute value -1 or None.
"""
# Benchmarking indicates that parallelisation is only advantageous from
# around 500 iterations upward with current implementation. Default is
# therefore to use the serial execution for small number of iterations.
if num_workers in {-1, None}:
num_workers = DEFAULT_NUM_WORKERS
num_workers = int(num_workers)
if num_workers <= 0:
raise ValueError(
"Number of worker processes `num_workers` should be a positive "
f"integer value, (or -1 for the default {DEFAULT_NUM_WORKERS})"
)
# more workers than iterations does not make sense
return min(n_iter, num_workers)
[docs]
def shuffle_observations(observations_list, rng):
"""Shuffle the observations list.
Args:
observations_list (list):
A list of dictionaries containing the observations to be simulated.
rng (np.random.Generator):
A random number generator instance.
Returns:
None
"""
rng.shuffle(observations_list)