"""Module for running the resource usage and Monte Carlo simulation."""
import argparse
import time
from pathlib import Path
from ska_sdp_resource_model.simulate.logger import setup_logger
from ska_sdp_resource_model.simulate.monte_carlo import MonteCarloSimulation
from ska_sdp_resource_model.simulate.process_inputs import (
HARDWARE_CONFIG_PATH,
OBSERVING_SCHEDULE_PATH,
PIPELINES_CONFIG_PATH,
SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
load_hardware_configs,
process_inputs,
)
from ska_sdp_resource_model.simulate.resource_usage import (
ResourceUsageSimulator,
)
def _build_parser():
    """Assemble the command line parser for the resource usage simulation.

    Every option is described by one row of ``option_table`` and registered
    in table order, so the rows read like the ``--help`` listing.

    Returns:
        argparse.ArgumentParser: Parser exposing all simulation options.
    """
    # The longer help texts are named up front to keep the table readable.
    schedule_generation_help = (
        "Generate a new observing schedule with the specified number of "
        "hours of observations. Samples from the scheduling block types "
        "config file and generate datetimes and IDs. Replaces the observing "
        "schedule file specified by --observing_schedule_path."
    )
    block_types_help = (
        "Path to JSON file containing scheduling block types "
        "configuration."
    )
    workers_help = (
        "Number of worker processes to use for running parallel "
        "simulations. If `None` or `-1`, a suitable default number of workers "
        "will be chosen. For a small number of iterations (`n_iter < 500`), "
        "the simulations will be run sequentially on a single worker. For "
        "larger simulations, the default value is set to four times the cpu "
        "count of the system at import time."
    )

    option_table = (
        (
            "--observing_schedule_path",
            {
                "type": Path,
                "default": OBSERVING_SCHEDULE_PATH,
                "help": "Path to CSV file containing the observing schedule.",
            },
        ),
        (
            "--generate_observing_schedule_hrs",
            {"type": int, "default": 0, "help": schedule_generation_help},
        ),
        (
            "--hardware_path",
            {
                "type": Path,
                "default": HARDWARE_CONFIG_PATH,
                "help": "Path to JSON file containing hardware configuration.",
            },
        ),
        (
            "--hardware",
            {
                "type": str,
                "default": "control",
                "help": "Name of the hardware configuration to use.",
            },
        ),
        (
            "--scheduling_block_types_path",
            {
                "type": Path,
                "default": SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
                "help": block_types_help,
            },
        ),
        (
            "--pipelines_path",
            {
                "type": Path,
                "default": PIPELINES_CONFIG_PATH,
                "help": "Path to JSON file containing pipelines configuration.",
            },
        ),
        (
            "--num_monte_carlo_iterations",
            {
                "type": int,
                "default": 0,
                "help": "Number of Monte Carlo iterations to run.",
            },
        ),
        (
            "--num_workers",
            {"type": int, "default": 1, "help": workers_help},
        ),
        (
            "--shuffle_observations",
            {
                "action": "store_true",
                "help": "Shuffle the observations list before running the "
                "simulation.",
            },
        ),
        (
            "--output_path",
            {
                "type": Path,
                "default": "output",
                "help": "Path to output directory.",
            },
        ),
        (
            "--verbose",
            {"action": "store_true", "help": "Print logging to console."},
        ),
        (
            "--debug",
            {
                "action": "store_true",
                "help": "Set logging level to DEBUG for detailed logging.",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Run the resource usage simulation."
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli


# Module-level parser instance used by main().
parser = _build_parser()
[docs]
def main():
    """Run the resource usage simulation from the command line.

    Parses the command line arguments, sets up logging, runs the simulation
    and writes the event log and the resource usage log as CSV files into
    ``--output_path``. File names start with a ``YYYYmmddHHMMSS`` timestamp
    taken after the simulation has finished.

    A single simulation yields one ``<timestamp>-event-log.csv`` and one
    ``<timestamp>-resource-log.csv``. When a log is a sequence of frames
    (documented for Monte Carlo runs), one file per entry is written with
    the zero-based position appended, e.g. ``<timestamp>-event-log-0.csv``.

    Returns:
        None: The results are written to disk rather than returned.
    """
    args = parser.parse_args()
    logger = setup_logger(
        output_dir=args.output_path, verbose=args.verbose, debug=args.debug
    )
    logger.info("SDP Resource Model started with args: %s", vars(args))

    output = run_simulation(
        hardware_config=args.hardware,
        observing_schedule_path=args.observing_schedule_path,
        scheduling_block_types_config_path=args.scheduling_block_types_path,
        hardware_config_path=args.hardware_path,
        pipelines_config_path=args.pipelines_path,
        num_monte_carlo_iterations=args.num_monte_carlo_iterations,
        num_workers=args.num_workers,
        shuffle=args.shuffle_observations,
        generate_observing_schedule_hrs=args.generate_observing_schedule_hrs,
    )

    # Output the event log and resource usage log to CSV files
    args.output_path.mkdir(parents=True, exist_ok=True)
    file_prefix = time.strftime("%Y%m%d%H%M%S")
    _write_log_csv(
        output["event_log"], args.output_path, f"{file_prefix}-event-log"
    )
    _write_log_csv(
        output["resource_usage"],
        args.output_path,
        f"{file_prefix}-resource-log",
    )


def _write_log_csv(log, directory, stem):
    """Write one log, or each log of a sequence, to CSV without the index.

    Args:
        log: Object with a ``to_csv`` method, or an iterable of such objects.
        directory (Path): Existing directory that receives the file(s).
        stem (str): File name without the ``.csv`` suffix.
    """
    if hasattr(log, "to_csv"):
        log.to_csv(directory / f"{stem}.csv", index=False)
        return
    # NOTE(review): monte_carlo_simulation documents its logs as lists of
    # dataframes, which have no ``to_csv`` themselves - confirm against
    # MonteCarloSimulation.run.
    for position, frame in enumerate(log):
        frame.to_csv(directory / f"{stem}-{position}.csv", index=False)
[docs]
def run_simulation(
    hardware_config,
    observing_schedule_path=OBSERVING_SCHEDULE_PATH,
    scheduling_block_types_config_path=SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
    hardware_config_path=HARDWARE_CONFIG_PATH,
    pipelines_config_path=PIPELINES_CONFIG_PATH,
    num_monte_carlo_iterations=0,
    num_workers=1,
    shuffle=False,
    generate_observing_schedule_hrs=0,
):
    """Run a resource usage simulation for one hardware configuration.

    With ``num_monte_carlo_iterations == 0`` a single simulation is run with
    ``ResourceUsageSimulator``; any positive value runs a Monte Carlo
    simulation with that many iterations instead.

    Args:
        hardware_config (str):
            The key identifying the hardware configuration to use.
        observing_schedule_path (Path, optional):
            Path to the CSV file containing the observing schedule. Defaults
            to ``OBSERVING_SCHEDULE_PATH``.
        scheduling_block_types_config_path (Path, optional): Path to the JSON
            file defining scheduling block types. Defaults to
            ``SCHEDULING_BLOCK_TYPES_CONFIG_PATH``.
        hardware_config_path (Path, optional): Path to the JSON file
            containing hardware configurations. Defaults to
            ``HARDWARE_CONFIG_PATH``.
        pipelines_config_path (Path, optional): Path to the JSON file defining
            pipeline configurations. Defaults to ``PIPELINES_CONFIG_PATH``.
        num_monte_carlo_iterations (int): Number of Monte Carlo iterations to
            run. Converted with ``int()``. Defaults to 0.
        num_workers (int): Number of worker processes to use for running
            parallel simulations. Only used for Monte Carlo runs.
        shuffle (bool): Whether to shuffle the observations list before
            running the simulation. Only forwarded to Monte Carlo runs.
            Defaults to False.
        generate_observing_schedule_hrs (int): Number of hours of observations
            to generate. Replaces observations from file if > 0. Defaults to 0.

    Returns:
        dict: A dictionary containing the simulation results. For a single
        run this is the return value of
        ``ResourceUsageSimulator.run_simulation``; for a Monte Carlo run it
        has the keys ``resource_usage``, ``event_log`` and ``num_successes``.

    Raises:
        ValueError: If `hardware_config` is not found in the hardware
            configuration data, or if `num_monte_carlo_iterations` is
            negative.
    """
    logger = setup_logger()

    # Reject bad arguments before process_inputs runs: with
    # generate_observing_schedule_hrs > 0 it is documented to replace the
    # observing schedule, which should not happen for a call that fails.
    num_monte_carlo_iterations = int(num_monte_carlo_iterations)
    if num_monte_carlo_iterations < 0:
        # Zero is accepted and selects the single, non Monte Carlo run.
        message = (
            "Number of Monte Carlo iterations should be a positive integer"
            " value."
        )
        logger.error(message)
        raise ValueError(message)

    hardware_config_data = load_hardware_configs(
        hardware_config_path=hardware_config_path
    )
    if hardware_config not in hardware_config_data:
        message = (
            f"Invalid hardware config ({hardware_config}). "
            f"Valid configs: {', '.join(hardware_config_data)}."
        )
        logger.error(message)
        raise ValueError(message)
    selected_hardware = hardware_config_data[hardware_config]

    # Resolve inputs and validate
    observations_list, pipelines_config_from_file = process_inputs(
        observing_schedule_path,
        scheduling_block_types_config_path,
        pipelines_config_path,
        generate_observing_schedule_hrs,
    )

    if num_monte_carlo_iterations == 0:
        simulator = ResourceUsageSimulator()
        return simulator.run_simulation(observations_list, selected_hardware)

    logger.info("Running a Monte Carlo simulation...")
    # The first element (per-trial run times) is not part of the result.
    _, resource_usage, event_logs, num_successes = monte_carlo_simulation(
        observations_list,
        pipelines_config_from_file,
        selected_hardware,
        n_iter=num_monte_carlo_iterations,
        shuffle=shuffle,
        num_workers=num_workers,
    )
    return {
        "resource_usage": resource_usage,
        "event_log": event_logs,
        "num_successes": num_successes,
    }
[docs]
def monte_carlo_simulation(
    observations_list,
    pipelines_config,
    hardware_config,
    n_iter=100,
    num_workers=1,
    shuffle=False,
    seed=None,
    client=None,
):
    """Run a Monte Carlo simulation of the SDP Resource Model.

    Thin wrapper that builds a ``MonteCarloSimulation`` from the inputs and
    returns the result of its ``run`` method unchanged.

    Args:
        observations_list (list):
            Dictionaries describing the observations to simulate. Each must
            carry the configuration of its scheduling block type and of
            every pipeline step.
        pipelines_config (dict):
            Configuration of each pipeline. Must contain the keys
            node_hours_mean, node_hours_uncertainty, pct_parallelism_min and
            pct_parallelism_max.
        hardware_config (dict):
            Hardware configuration to use in the simulation.
        n_iter (int):
            Number of Monte Carlo trials to run. Default is 100.
        num_workers (int):
            Number of worker processes for parallel simulations. With `None`
            or `-1` a suitable default is chosen: small runs are executed
            sequentially on a single worker, larger ones use four times the
            cpu count of the system at import time.
            NOTE(review): the small-run threshold is stated as
            `n_iter < 100` here but as `n_iter < 500` in the CLI help -
            confirm against MonteCarloSimulation.
        shuffle (bool):
            Whether to shuffle the observations list before running the
            simulation. Default is False.
        seed (int):
            Random seed for reproducible results. Default is None.
        client (dask.Client):
            Optional Dask client to utilise for parallelism. If provided,
            the iterations run on this scheduler. If None, a previously
            initialized scheduler is used automatically by dask; if there is
            no existing scheduler either, the iterations run sequentially in
            a single process.

    Returns:
        run_times_days (list):
            Total simulation time of each Monte Carlo trial, in days.
        resource_usages (list):
            Resource usage dataframe of each Monte Carlo trial.
        event_logs (list):
            Event log dataframe of each Monte Carlo trial.
        num_successful_runs (int):
            Number of successful runs in the Monte Carlo simulation.
    """
    return MonteCarloSimulation(
        observations_list, pipelines_config, hardware_config, shuffle
    ).run(n_iter, num_workers, seed, client)