"""Class for simulating a pipeline step in a scheduling block instance."""
from math import ceil
from ska_sdp_resource_model.simulate.constants import SECONDS_IN_1HR
from ska_sdp_resource_model.simulate.logger import log_event, setup_logger
[docs]
class Pipeline:
    """Class for simulating a pipeline step in a scheduling block instance.

    Methods:
        __init__(name, config):
            Initialise the Pipeline object with the given name and
            configuration.
        run(env, sdp, batch_name):
            Simulate the run of a single pipeline step in a scheduling block
            instance.
        allocate_compute_nodes(env, sdp):
            Allocate compute nodes for the pipeline.
        allocate_capacity_storage_data_products(env, sdp):
            Allocate capacity storage for data products.
        retain_data_products(env, sdp):
            Simulate retention of data products: hold them in capacity
            storage for ``data_retention_hrs`` hours, then release the
            storage.
        get_runtime():
            Calculate (and store in ``self.runtime``) the runtime based on
            node hours, percentage of parallelism, and number of nodes.
        get_event_log():
            Return the event log for the pipeline.
    """

    def __init__(self, name, config):
        """Initialise the Pipeline object with the given name and
        configuration.

        Args:
            name (str): The name of the pipeline.
            config (dict): Configuration of the pipeline. Must provide the
                keys ``description``, ``node_hours``, ``pct_parallelism``,
                ``data_product_storage_gb``, ``data_retention_hrs`` and
                ``num_nodes``.
        """
        self.name = name
        self.description = config["description"]
        self.node_hours = config["node_hours"]
        self.pct_parallelism = config["pct_parallelism"]
        # Rounded to a whole number of GB before being requested from storage.
        self.data_product_storage_gb = round(config["data_product_storage_gb"])
        self.data_retention_hrs = config["data_retention_hrs"]
        # Set when run() is called.
        self.batch_name = None
        self.num_nodes = config["num_nodes"]
        # Computed by get_runtime(), in simulation time units (seconds).
        self.runtime = None
        self.event_log = []
        self.logger = setup_logger()

    def run(self, env, sdp, batch_name):
        """Simulate resource usage of a single pipeline step in a scheduling
        block instance.

        The step first acquires data product storage, then compute nodes,
        then holds the nodes for the computed runtime and releases them.
        Data product storage is NOT released here; that happens in
        ``retain_data_products``.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.
            batch_name (str): The name of the scheduling block instance.

        Yields:
            simpy.events.Event:
                Events for allocating resources and running the pipeline.
        """
        pipeline_start = env.now
        self.batch_name = batch_name
        self.logger.debug(
            "%d: %s - %s: Starting pipeline...",
            env.now,
            self.batch_name,
            self.name,
        )

        # Allocate storage for data products
        yield env.process(
            self.allocate_capacity_storage_data_products(env, sdp)
        )

        # Allocate compute nodes
        yield env.process(self.allocate_compute_nodes(env, sdp))

        # Run pipeline; the runtime depends on the (possibly capped) number
        # of nodes, so it is computed only after allocation.
        pipeline_execution_start = env.now
        self.get_runtime()
        yield env.timeout(self.runtime)
        yield sdp.compute_nodes.put(self.num_nodes)  # Release compute nodes
        pipeline_execution_end = env.now

        self.logger.debug(
            "%d: %s - %s: %d compute nodes released.",
            env.now,
            self.batch_name,
            self.name,
            self.num_nodes,
        )
        self.logger.debug(
            "%d: %s - %s: Pipeline completed!",
            env.now,
            self.batch_name,
            self.name,
        )
        log_event(
            self.event_log,
            self.batch_name,
            self.name + "_execution",
            pipeline_execution_start,
            pipeline_execution_end,
        )
        log_event(
            self.event_log,
            self.batch_name,
            self.name + "_total",
            pipeline_start,
            pipeline_execution_end,
        )

    def allocate_compute_nodes(self, env, sdp):
        """Allocate compute nodes for the pipeline.

        Requests ``self.num_nodes`` compute nodes (from the pipeline
        configuration). If that exceeds the total capacity of
        ``sdp.compute_nodes``, a warning is logged and ``self.num_nodes`` is
        permanently capped at that capacity. The time spent waiting for the
        nodes is recorded as a ``compute_wait`` event.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                The event for requesting and receiving compute nodes.
        """
        pipeline_compute_wait_start = env.now
        if self.num_nodes > sdp.compute_nodes.capacity:
            # A request above total capacity could never be satisfied.
            self.logger.warning(
                "Insufficient compute nodes available for pipeline %s. "
                "Allocating maximum available...",
                self.name,
            )
            self.num_nodes = sdp.compute_nodes.capacity
        self.logger.debug(
            "%d: %s - %s: Requesting %d compute nodes...",
            env.now,
            self.batch_name,
            self.name,
            self.num_nodes,
        )
        yield sdp.compute_nodes.get(self.num_nodes)
        pipeline_compute_wait_end = env.now
        log_event(
            self.event_log,
            self.batch_name,
            "compute_wait",
            pipeline_compute_wait_start,
            pipeline_compute_wait_end,
        )
        self.logger.debug(
            "%d: %s - %s: %d compute nodes allocated.",
            env.now,
            self.batch_name,
            self.name,
            self.num_nodes,
        )

    def allocate_capacity_storage_data_products(self, env, sdp):
        """Allocate storage for data products.

        Requests ``self.data_product_storage_gb`` GB from
        ``sdp.storage_data_product`` and records the wait time as a
        ``capacity_storage_wait_data_products`` event. Does nothing when the
        pipeline produces no data products (size <= 0).

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                The event for requesting and receiving capacity storage.
        """
        if self.data_product_storage_gb <= 0:
            # Some pipelines do not produce data products
            return
        capacity_storage_wait_start = env.now
        self.logger.debug(
            "%d: %s - %s: Requesting %d GB of data product storage for data "
            "products...",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        sdp.log_resource_usage()
        yield sdp.storage_data_product.get(self.data_product_storage_gb)
        sdp.log_resource_usage()
        capacity_storage_wait_end = env.now
        self.logger.debug(
            "%d: %s - %s: %d GB data product storage allocated for data "
            "products.",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        # NOTE(review): other events pass self.batch_name as the second
        # argument, this one passes self.name — confirm which log_event
        # expects.
        log_event(
            self.event_log,
            self.name,
            "capacity_storage_wait_data_products",
            capacity_storage_wait_start,
            capacity_storage_wait_end,
        )

    def retain_data_products(self, env, sdp):
        """Simulate retention of data products.

        Holds the data products in capacity storage for
        ``self.data_retention_hrs`` hours, then releases
        ``self.data_product_storage_gb`` GB back to
        ``sdp.storage_data_product`` and records a ``data_retention`` event.
        Does nothing when the pipeline produces no data products (size <= 0),
        mirroring ``allocate_capacity_storage_data_products``, which
        allocated nothing in that case.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor instance.

        Yields:
            simpy.events.Event:
                The retention timeout and the storage release event.
        """
        if self.data_product_storage_gb <= 0:
            # Nothing was allocated, so there is nothing to retain or release.
            # Presumably storage_data_product is a simpy.Container, whose
            # put() rejects non-positive amounts with ValueError.
            return
        self.logger.debug(
            "%d: %s - %s: Retaining %d GB of data products for %.1f hours...",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
            self.data_retention_hrs,
        )
        data_retention_start = env.now
        yield env.timeout(int(self.data_retention_hrs * SECONDS_IN_1HR))
        self.logger.debug(
            "%d: %s - %s: Deleting %d GB of data products from data product "
            "storage...",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        yield sdp.storage_data_product.put(self.data_product_storage_gb)
        sdp.log_resource_usage()
        self.logger.debug(
            "%d: %s - %s: Deleted %d GB of data products from data product "
            "storage.",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        data_retention_end = env.now
        # NOTE(review): passes self.name where run()/compute_wait pass
        # self.batch_name — confirm which log_event expects.
        log_event(
            self.event_log,
            self.name,
            "data_retention",
            data_retention_start,
            data_retention_end,
        )

    def get_runtime(self):
        """Calculate the runtime from node hours, percentage of parallelism,
        and number of nodes.

        Uses an Amdahl-style split of ``node_hours`` (converted to seconds).
        The parallel fraction is divided across ``self.num_nodes``. The
        serial fraction is not. The result is rounded up to a whole number
        of seconds.

        Returns:
            int: The runtime, also stored in ``self.runtime``.
        """
        parallel_fraction = self.pct_parallelism / 100
        total_seconds = SECONDS_IN_1HR * self.node_hours
        runtime_parallel = total_seconds * parallel_fraction / self.num_nodes
        runtime_serial = total_seconds * (1 - parallel_fraction)
        self.runtime = ceil(runtime_parallel + runtime_serial)
        return self.runtime

    def get_event_log(self):
        """Return the event log for the pipeline."""
        return self.event_log