"""Module to optimise the hardware configuration of a simulation."""
import argparse
import logging
from functools import partial
from math import floor, inf
from pathlib import Path
import optuna
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from ska_sdp_resource_model.simulate import monte_carlo_simulation
from ska_sdp_resource_model.simulate.config_models import HardwareConfig
from ska_sdp_resource_model.simulate.constants import SECONDS_IN_DAY
from ska_sdp_resource_model.simulate.logger import setup_logger
from ska_sdp_resource_model.simulate.process_inputs import (
OBSERVING_SCHEDULE_PATH,
PIPELINES_CONFIG_PATH,
SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
process_inputs,
)
from ska_sdp_resource_model.simulate.resource_usage import (
ResourceUsageSimulator,
)
from ska_sdp_resource_model.simulate.validate_inputs import (
configuration_ok,
inputs_ok,
)
EUR_PER_TB_PERFORMANCE = 1000  # 1000 euros per TB
EUR_PER_PB_CAPACITY = 100000  # 100k euros per PB
EUR_PER_NODE = 10000  # 10k euros per node

# Command-line interface parsed by main().
parser = argparse.ArgumentParser(
    description="Optimise the hardware configuration"
)
parser.add_argument(
    "--budget", type=float, default=8e6, help="Hardware budget in EUR."
)
parser.add_argument(
    "--n_trials",
    type=int,
    default=10,
    help="Number of trials to run for the optimisation.",
)
parser.add_argument(
    "--observing_schedule_path",
    type=Path,
    default=OBSERVING_SCHEDULE_PATH,
    help="Path to the observing schedule file.",
)
parser.add_argument(
    "--pipelines_config_path",
    type=Path,
    default=PIPELINES_CONFIG_PATH,
    help="Path to the pipelines configuration file.",
)
parser.add_argument(
    "--scheduling_block_types_config_path",
    type=Path,
    default=SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
    help="Path to the scheduling block types file.",
)
parser.add_argument(
    "--storage",
    type=str,
    default="sqlite:///hardware-optimisation.db",
    help="Name of the database for the optuna study. "
    "Results will be appended if this already exists, "
    "otherwise a new DB will be created.",
)
parser.add_argument(
    "--study_name",
    type=str,
    default=None,
    # Adjacent literals are concatenated, so each piece must carry its own
    # trailing space.
    help="Name of the optuna study. If not provided, "
    "the study will be named 'budget-{budget}', where "
    "{budget} is taken from the --budget parameter.",
)
parser.add_argument(
    "--num_monte_carlo_iterations",
    type=int,
    default=0,
    help="Number of Monte Carlo iterations to run. If greater than 0, the "
    "simulation will be run multiple times with different values for "
    "pct_parallelism and node_hours for each pipeline and the average "
    "time taken will be used to optimise hardware.",
)
parser.add_argument(  # pylint: disable=duplicate-code
    "--shuffle_observations",
    action="store_true",
    help="Shuffle the observations list before running the simulation.",
)
parser.add_argument(
    "--debug",
    action="store_true",
    help="Set logging level to DEBUG for detailed logging.",
)
parser.add_argument(
    "--num_pipeline_groups",
    type=int,
    default=6,
    help="Number of groups for clustering pipelines. Must be greater than "
    "0. Pipelines will be clustered into groups and the number of nodes "
    "will be sampled for each group. If num_pipeline_groups is greater "
    "than the number of pipelines, the number of groups will be set to "
    "the number of pipelines.",
)
[docs]
def main():
    """Optimise the hardware configuration from the command line.

    Parses the module-level ``parser`` arguments, runs optuna (via
    ``optimise_hardware``) on the hardware configuration to minimise the
    total time taken within the requested budget, and logs the best trial.

    Returns:
        argparse.Namespace: The parsed command-line arguments.
    """
    args = parser.parse_args()
    logger = setup_logger(name="simulation", verbose=True, debug=args.debug)
    logger.info(
        f"\n{' OPTIMISING ':-^80}\n"
        "Optimising hardware configuration with budget %.1f EUR, %d trials\n"
        "Observing schedule file: %s\n"
        "Pipelines configuration file: %s\n"
        "Scheduling block types configuration file: %s\n"
        "%d Monte Carlo simulations will be run in each trial with shuffle "
        "set to %s. Pipelines will be clustered into %d groups for sampling "
        "num_nodes.",
        args.budget,
        args.n_trials,
        args.observing_schedule_path,
        args.pipelines_config_path,
        args.scheduling_block_types_config_path,
        args.num_monte_carlo_iterations,
        args.shuffle_observations,
        args.num_pipeline_groups,
    )
    study = optimise_hardware(
        budget=args.budget,
        n_trials=args.n_trials,
        verbose=True,
        observing_schedule_path=args.observing_schedule_path,
        pipelines_config_path=args.pipelines_config_path,
        scheduling_block_types_config_path=(
            args.scheduling_block_types_config_path
        ),
        storage=args.storage,
        # Previously parsed but never forwarded, so --study_name was ignored.
        study_name=args.study_name,
        num_monte_carlo_iterations=args.num_monte_carlo_iterations,
        shuffle=args.shuffle_observations,
        n_groups=args.num_pipeline_groups,
    )
    best_trial = study.best_trial
    # objective() can return before recording "budget_split" /
    # "hardware_config" (infeasible trials), so the best trial may lack them.
    logger.info(
        f"{' DONE! ':-^80}\n"
        "Best time was %.1f days with parameters:\n %s.\n"
        "Best budget split was:\n %s\n"
        "Best hardware config was:\n %s\n"
        "Run 'optuna-dashboard %s' to view the results of all trials\n"
        f"{'':-^80}\n",
        best_trial.value,
        best_trial.params,
        best_trial.user_attrs.get("budget_split"),
        best_trial.user_attrs.get("hardware_config"),
        args.storage,
    )
    return args
[docs]
def optimise_hardware(
    budget=8e6,
    n_trials=10,
    verbose=False,
    observing_schedule_path=OBSERVING_SCHEDULE_PATH,
    pipelines_config_path=PIPELINES_CONFIG_PATH,
    scheduling_block_types_config_path=SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
    storage="sqlite:///hardware-optimisation.db",
    study_name=None,
    num_monte_carlo_iterations=0,
    shuffle=False,
    n_groups=6,
    client=None,
):
    """Optimise the hardware configuration.

    Creates an optuna study, or loads it if it already exists in
    ``storage``. The study minimises the value returned by ``objective``
    (total simulated time in days) and runs ``n_trials`` trials. Without a
    Dask ``client`` they run in-process; with one, each trial is submitted
    as its own task.

    If num_monte_carlo_iterations is greater than 0, the simulation will be
    run multiple times per trial with different sampled pipeline parameters,
    and the average time taken will be used to optimise hardware.

    Args:
        budget (float): The budget for the hardware in EUR.
        n_trials (int): The number of trials to run.
        verbose (bool): Whether to print verbose output.
        observing_schedule_path (Path): Path to the observing schedule file.
        pipelines_config_path (Path): Path to the pipelines configuration file.
        scheduling_block_types_config_path (Path): Path to the scheduling block
            types file.
        storage (str): The name of the database for the optuna
            study. Defaults to 'sqlite:///hardware-optimisation.db'.
        study_name (str): The name of the optuna
            study. Defaults to 'budget-{budget}'.
        num_monte_carlo_iterations (int): Number of Monte Carlo iterations to
            run. Defaults to 0.
        shuffle (bool): Whether to shuffle the observations list before
            running the simulation. Defaults to False.
        n_groups (int): The number of groups for clustering pipelines.
            Defaults to 6.
        client (dask.Client): Optional, Dask client to utilise for parallelism.

    Returns:
        optuna.study.Study: The study, including every trial recorded in
        ``storage`` under ``study_name`` (earlier runs included).
    """
    logger = setup_logger(name="optimisation", verbose=verbose)
    if not study_name:
        study_name = f"budget-{budget}"
    logger.info(
        "Results will be saved inside RDB '%s' under study '%s'",
        storage,
        study_name,
    )
    # Load pipeline config / Observing schedule
    observations_list, pipelines_config = process_inputs(
        observing_schedule_path,
        scheduling_block_types_config_path,
        pipelines_config_path,
    )
    # Initialize study; load_if_exists lets repeated runs append trials.
    study = optuna.create_study(
        study_name=study_name,
        direction="minimize",
        storage=storage,
        load_if_exists=True,
    )
    # If a dask client is provided, trigger distributing the Monte Carlo
    # simulation by passing `num_workers`. We can't pass the client object
    # through to the monte_carlo_runner, since the client object is not
    # picklable. This splits the work across `num_workers`, but the client
    # will use however many workers it was initialised with to execute.
    num_workers = sum(client.ncores().values()) if client else 1
    parametrised_objective = partial(
        objective,
        observations_list=observations_list,
        budget=budget,
        pipelines_config=pipelines_config,
        num_monte_carlo_iterations=num_monte_carlo_iterations,
        num_workers=num_workers,
        shuffle=shuffle,
        n_groups=n_groups,
    )
    # Run optimization
    if client is None:
        study.optimize(
            parametrised_objective,
            n_trials=n_trials,
        )
    else:
        # Adapted from https://docs.dask.org/en/stable/ml.html#optuna
        # One single-trial optimize() call per task; the tasks coordinate
        # through the shared `storage`.
        futures = [
            client.submit(
                study.optimize, parametrised_objective, n_trials=1, pure=False
            )
            for _ in range(n_trials)
        ]
        client.compute(futures, sync=True)
    # Log results. objective() returns inf for infeasible trials, so an inf
    # best value means no feasible configuration was found.
    best_trial = study.best_trial
    if best_trial.value < inf:
        logger.info(
            "Best trial took %.1f days with hardware configuration: %s",
            best_trial.value,
            best_trial.params,
        )
    else:
        logger.warning(
            "No feasible hardware configuration found for budget %.1f EUR",
            budget,
        )
    return study
[docs]
def update_scheduling_blocks(observations_list, pipelines_config):
    """Copy each pipeline's ``num_nodes`` into every observation's steps.

    For every observation, each entry of its ``pipeline_steps`` mapping has
    its ``num_nodes`` overwritten, in place, with the value stored for the
    same pipeline in ``pipelines_config``.

    Args:
        observations_list (list): Observations, each a dict holding a
            ``pipeline_steps`` mapping of pipeline name -> step config.
        pipelines_config (dict): Per-pipeline configuration; every pipeline
            referenced by an observation must have a ``num_nodes`` entry.

    Returns:
        None

    Raises:
        KeyError: If an observation references a pipeline missing from
            ``pipelines_config``.
    """
    for obs in observations_list:
        steps = obs["pipeline_steps"]
        for name in steps:
            steps[name]["num_nodes"] = pipelines_config[name]["num_nodes"]
[docs]
def objective(  # pylint: disable=too-many-return-statements
    trial,
    budget,
    observations_list,
    pipelines_config,
    num_monte_carlo_iterations=0,
    num_workers=1,
    shuffle=False,
    n_groups=6,
):
    """Objective function for optimisation.

    Samples a budget split and per-group node counts for ``trial``, derives
    and validates the hardware configuration, then simulates the observing
    schedule on it. Infeasible or failed trials return ``inf`` rather than
    raising, so the minimising study simply ranks them last.

    ``pipelines_config`` and the ``pipeline_steps`` of every observation in
    ``observations_list`` are updated in place with the sampled
    ``num_nodes`` values.

    Args:
        trial: The optuna trial object.
        budget (float): The budget for the hardware in EUR.
        observations_list (list): List of observations to simulate.
        pipelines_config (dict): Pipelines configuration data.
        num_monte_carlo_iterations (int): Number of Monte Carlo iterations to
            run. If 0, a single ResourceUsageSimulator run is used instead.
            Defaults to 0.
        num_workers (int): Number of workers passed to the Monte Carlo
            simulation to split iterations across. Defaults to 1.
        shuffle (bool): Whether to shuffle the observations list before running
            the simulation. Defaults to False.
        n_groups (int): The number of groups for clustering pipelines. Defaults
            to 6.

    Returns:
        float: Total simulation time in days (averaged over Monte Carlo
        iterations when num_monte_carlo_iterations > 0), or ``inf`` if the
        trial is infeasible or the simulation fails.
    """
    logger = logging.getLogger("optimisation")
    # Enough to buy one TB of performance storage, one PB of capacity
    # storage and one compute node.
    minimum_budget = (
        EUR_PER_TB_PERFORMANCE + EUR_PER_PB_CAPACITY + EUR_PER_NODE
    )
    assert minimum_budget > 0
    if budget < minimum_budget:
        logger.warning(
            "Budget %s is less than minimum budget %s. Skipping trial.",
            budget,
            minimum_budget,
        )
        return inf
    budget_split = get_budget_split(trial, budget=budget)
    # get_budget_split signals an infeasible split with inf instead of a dict.
    if budget_split == inf:
        return inf
    trial.set_user_attr("budget_split", budget_split)
    # Use a distinct name so the generator does not read like it rebinds
    # the `budget` parameter.
    if any(amount <= 0 for amount in budget_split.values()):
        logger.warning("No budget for one or more components. Skipping trial.")
        return inf
    hardware_config = configure_hardware(budget_split)
    if hardware_config["compute_nodes"] == 0:
        logger.warning("No compute nodes. Skipping trial.")
        return inf
    trial.set_user_attr("hardware_config", hardware_config)
    # Update the "num_nodes" parameter in the pipelines_config dictionary.
    parameterise_pipelines_num_nodes(
        trial,
        hardware_config["compute_nodes"],
        pipelines_config=pipelines_config,
        n_groups=n_groups,
    )
    # Save the updated pipeline configuration on the trial
    trial.set_user_attr("pipelines_config", pipelines_config)
    # We also need to update the "num_nodes" parameter value in the
    # "pipeline_steps" dictionary of each observation in observations_list
    update_scheduling_blocks(observations_list, pipelines_config)
    if not (
        configuration_ok(observations_list, hardware_config)
        and inputs_ok(observations_list, hardware_config)
    ):
        return inf
    # Round-trip through HardwareConfig (model_dump suggests a pydantic
    # model — presumably validating/normalising fields; TODO confirm).
    hardware_config = HardwareConfig(**hardware_config).model_dump()
    hardware_cost = get_hardware_cost(hardware_config)
    if hardware_cost["total"] > budget:
        logger.warning(
            "Hardware cost %d exceeds budget %d",
            hardware_cost["total"],
            budget,
        )
        return inf
    if num_monte_carlo_iterations > 0:
        run_times_days, _, _, num_successful_runs = monte_carlo_simulation(
            observations_list,
            pipelines_config,
            hardware_config,
            n_iter=num_monte_carlo_iterations,
            shuffle=shuffle,
            num_workers=num_workers,
        )
        trial.set_user_attr("run_times", run_times_days)
        if num_successful_runs == 0:
            logger.warning("No successful runs in Monte Carlo simulation.")
            return inf
        # NOTE(review): this divides by the requested iteration count, not
        # num_successful_runs. If monte_carlo_simulation only returns times
        # for successful runs, partially-failed trials will look faster
        # than they are — confirm against monte_carlo_simulation.
        return sum(run_times_days) / num_monte_carlo_iterations
    simulator = ResourceUsageSimulator()
    try:
        output = simulator.run_simulation(observations_list, hardware_config)
    except ValueError as error:
        logger.warning("Simulation failed to complete... %s", error)
        return inf
    return (
        output["resource_usage"]["time_s"].max() / SECONDS_IN_DAY
    )  # Total time in days
[docs]
def get_budget_split(trial, budget):
    """Sample how the total budget is divided between hardware components.

    The compute share is drawn first, in whole-node steps of EUR_PER_NODE,
    between one node and the largest whole number of nodes affordable with
    90% of the budget (never less than one node). Performance storage then
    receives 10-90% of what remains. Capacity storage receives 10-90% of
    what is left after that. Whatever is still unallocated goes to
    data-product storage.

    The optuna parameter names end in ``_fraction``, but the sampled values
    are absolute amounts in EUR.

    Args:
        trial: The optuna trial object.
        budget (int): Total budget in EUR.

    Returns:
        budget_allocation (dict):
            EUR allocated to "compute", "performance", "capacity" and
            "data_product", or ``inf`` if the compute share is below the
            price of a single node.
    """
    log = logging.getLogger("optimisation")
    lower_share = 0.1  # At least 10% of budget for each component
    upper_share = 0.9  # Up to 90% of budget for each component

    # Largest whole number of nodes affordable with the upper share.
    node_cap = (upper_share * budget) // EUR_PER_NODE
    compute_ceiling = max(EUR_PER_NODE, node_cap * EUR_PER_NODE)

    compute = trial.suggest_float(
        "compute_fraction",
        EUR_PER_NODE,
        compute_ceiling,
        step=EUR_PER_NODE,
    )
    if compute < EUR_PER_NODE:
        log.warning(
            "Compute budget %s is less than minimum budget %s. "
            "Skipping trial.",
            compute,
            EUR_PER_NODE,
        )
        return inf

    left = budget - compute
    performance = trial.suggest_float(
        "performance_fraction",
        lower_share * left,
        upper_share * left,
        step=None,
    )
    left = left - performance
    capacity = trial.suggest_float(
        "capacity_fraction",
        lower_share * left,
        upper_share * left,
        step=None,
    )
    left = left - capacity

    return {
        "compute": compute,
        "performance": performance,
        "capacity": capacity,
        "data_product": left,
    }
[docs]
def get_hardware_cost(hardware_config):
    """Compute the total cost of a hardware configuration in EUR.

    Args:
        hardware_config (dict):
            Hardware configuration. Must include the following keys:
            capacity_storage_pb, data_product_storage_pb,
            performance_storage_tb and compute_nodes.

    Returns:
        cost (dict):
            Cost in EUR of each component ("capacity_storage",
            "data_product_storage", "performance_storage", "compute") plus
            their sum under "total".

    Raises:
        KeyError: If any of the required keys is missing.
    """
    cost = {
        "capacity_storage": hardware_config["capacity_storage_pb"]
        * EUR_PER_PB_CAPACITY,
        # Data-product storage is priced per PB at the capacity rate.
        "data_product_storage": hardware_config["data_product_storage_pb"]
        * EUR_PER_PB_CAPACITY,
        "performance_storage": hardware_config["performance_storage_tb"]
        * EUR_PER_TB_PERFORMANCE,
        "compute": hardware_config["compute_nodes"] * EUR_PER_NODE,
    }
    # Sum the component costs before "total" is inserted.
    cost["total"] = sum(cost.values())
    return cost
[docs]
def parameterise_pipelines_num_nodes(
    trial, max_compute_nodes, pipelines_config, n_groups=6
):
    """Sample the number of compute nodes for each pipeline.

    Pipelines are clustered with ``group_pipelines``. Optuna then samples one
    integer node count per group, named ``num_nodes_group_<group>``, on a
    log scale in ``[1, max_compute_nodes]``. Every pipeline in a group has
    its "num_nodes" entry replaced with that group's count.

    Args:
        trial:
            The optuna trial object.
        max_compute_nodes (int):
            The maximum number of compute nodes.
        pipelines_config (dict):
            The pipelines configuration, keyed by pipeline. Updated in place.
        n_groups (int):
            The number of groups for clustering pipelines. Defaults to 6.

    Returns:
        pipelines_config (dict):
            The same (mutated) pipelines configuration object.
    """
    pipelines_groups = group_pipelines(pipelines_config, n_groups=n_groups)
    # Sorted so parameters are always suggested in the same order.
    num_nodes_per_pipeline_group = {
        group: trial.suggest_int(
            f"num_nodes_group_{group}", 1, max_compute_nodes, log=True
        )
        for group in sorted(set(pipelines_groups.values()))
    }
    for pipeline, config in pipelines_config.items():
        config["num_nodes"] = num_nodes_per_pipeline_group[
            pipelines_groups[pipeline]
        ]
    return pipelines_config
[docs]
def group_pipelines(pipelines_config, n_groups, seed=42):
    """Group pipelines into at most n_groups clusters.

    Uses K-means clustering on the standardised `node_hours_mean`,
    `pct_parallelism_min` and `pct_parallelism_max` of each pipeline. If
    n_groups exceeds the number of pipelines, one group per pipeline is
    requested instead.

    Args:
        pipelines_config (dict): The pipelines configuration, mapping each
            pipeline to a dict that includes the three features above.
        n_groups (int): The maximum number of groups to create. Must be
            greater than 0.
        seed (int): Random state passed to K-means so the grouping is
            reproducible. Defaults to 42.

    Returns:
        grouped_pipelines (dict):
            Dictionary mapping each pipeline to its group number. Empty if
            pipelines_config is empty.

    Raises:
        ValueError: If n_groups is less than 1.
    """
    if n_groups < 1:
        raise ValueError(f"n_groups must be greater than 0, got {n_groups}.")
    if not pipelines_config:
        return {}
    n_groups = min(len(pipelines_config), n_groups)
    selected_features = [
        "node_hours_mean",
        "pct_parallelism_min",
        "pct_parallelism_max",
    ]
    # One row per pipeline. Cast explicitly to float: transposing a mapping
    # of mixed-type config dicts may yield object-dtype columns.
    features = (
        pd.DataFrame(pipelines_config)
        .transpose()[selected_features]
        .astype(float)
    )
    scaled = StandardScaler().fit_transform(features)
    labels = KMeans(n_clusters=n_groups, random_state=seed).fit_predict(
        scaled
    )
    # Build a fresh Series instead of adding a column to `features`, which
    # is a selection of another frame (pandas SettingWithCopyWarning).
    return pd.Series(labels, index=features.index).to_dict()