# Source code for ska_sdp_resource_model.simulate.pipeline

"""Class for simulating a pipeline step in a scheduling block instance."""

from math import ceil

from ska_sdp_resource_model.simulate.constants import SECONDS_IN_1HR
from ska_sdp_resource_model.simulate.logger import log_event, setup_logger


class Pipeline:
    """Class for simulating a pipeline step in a scheduling block instance.

    Methods:
        __init__(name, config): Initialises the Pipeline object with the
            given name and configuration.
        run(env, sdp, batch_name): Simulates the run of a single pipeline
            step in a scheduling block instance.
        allocate_compute_nodes(env, sdp): Allocates compute nodes for a
            given pipeline.
        get_runtime(): Calculates the total runtime based on node hours,
            percentage of parallelism, and nodes available.
        get_event_log(): Return the event log for the pipeline.
        retain_data_products(env, sdp): Simulate retention of data
            products. This method will simulate holding data in capacity
            storage until the time given by the data retention parameter in
            the scheduling block types config and then releasing them from
            storage after this time.
        allocate_capacity_storage_data_products(env, sdp): Allocates
            capacity storage for data products.
    """

    def __init__(self, name, config):
        """Initialise the Pipeline object with the given name and configuration.

        Args:
            name (str): The name of the pipeline.
            config (dict): Configuration of the pipeline. Must contain the
                keys "description", "node_hours", "pct_parallelism",
                "data_product_storage_gb", "data_retention_hrs" and
                "num_nodes".
        """
        self.name = name
        self.description = config["description"]
        self.node_hours = config["node_hours"]
        self.pct_parallelism = config["pct_parallelism"]
        # Rounded so a whole number of GB is taken from / returned to the
        # data product storage container.
        self.data_product_storage_gb = round(config["data_product_storage_gb"])
        self.data_retention_hrs = config["data_retention_hrs"]
        # Set when run() is called.
        self.batch_name = None
        self.num_nodes = config["num_nodes"]
        # Set by get_runtime().
        self.runtime = None
        self.event_log = []
        self.logger = setup_logger()

    def run(self, env, sdp, batch_name):
        """Simulate resource usage of a single pipeline step in a scheduling block instance.

        The step first acquires data product storage (if any), then compute
        nodes, holds the nodes for the computed runtime and finally returns
        them to the pool. Data product storage is NOT released here; see
        retain_data_products().

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.
            batch_name (str): The name of the scheduling block instance.

        Yields:
            simpy.events.Event: Events for allocating resources and running
            the pipeline.
        """
        pipeline_start = env.now
        self.batch_name = batch_name
        self.logger.debug(
            "%d: %s - %s: Starting pipeline...",
            env.now,
            self.batch_name,
            self.name,
        )

        # Allocate storage for data products
        yield env.process(
            self.allocate_capacity_storage_data_products(env, sdp)
        )

        # Allocate compute nodes
        yield env.process(self.allocate_compute_nodes(env, sdp))

        # Run pipeline
        pipeline_execution_start = env.now
        self.get_runtime()
        yield env.timeout(self.runtime)

        # Release compute nodes
        yield sdp.compute_nodes.put(self.num_nodes)
        pipeline_execution_end = env.now

        self.logger.debug(
            "%d: %s - %s: %d compute nodes released.",
            env.now,
            self.batch_name,
            self.name,
            self.num_nodes,
        )
        self.logger.debug(
            "%d: %s - %s: Pipeline completed!",
            env.now,
            self.batch_name,
            self.name,
        )

        log_event(
            self.event_log,
            self.batch_name,
            self.name + "_execution",
            pipeline_execution_start,
            pipeline_execution_end,
        )
        log_event(
            self.event_log,
            self.batch_name,
            self.name + "_total",
            pipeline_start,
            pipeline_execution_end,
        )

    def allocate_compute_nodes(self, env, sdp):
        """Allocate compute nodes for a given pipeline.

        Requests ``self.num_nodes`` compute nodes from the SDP compute node
        pool. If more nodes are configured than the pool can ever hold, the
        request is clamped to the pool capacity (and ``self.num_nodes`` is
        updated permanently, which also affects get_runtime()). Logs the
        request and allocation, and records the wait time as a
        "compute_wait" event.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: The event for receiving compute nodes.
        """
        pipeline_compute_wait_start = env.now

        # A request larger than the pool's capacity could never be served,
        # so fall back to the whole pool.
        if self.num_nodes > sdp.compute_nodes.capacity:
            self.logger.warning(
                "Insufficient compute nodes available for pipeline %s. "
                "Allocating maximum available...",
                self.name,
            )
            self.num_nodes = sdp.compute_nodes.capacity

        self.logger.debug(
            "%d: %s - %s: Requesting %d compute nodes...",
            env.now,
            self.batch_name,
            self.name,
            self.num_nodes,
        )
        yield sdp.compute_nodes.get(self.num_nodes)
        pipeline_compute_wait_end = env.now

        log_event(
            self.event_log,
            self.batch_name,
            "compute_wait",
            pipeline_compute_wait_start,
            pipeline_compute_wait_end,
        )
        self.logger.debug(
            "%d: %s - %s: %d compute nodes allocated.",
            env.now,
            self.batch_name,
            self.name,
            self.num_nodes,
        )

    def allocate_capacity_storage_data_products(self, env, sdp):
        """Allocate storage for data products.

        Requests ``self.data_product_storage_gb`` GB from the SDP data product
        storage. Logs the request and allocation, and records the wait time
        for the storage allocation. Does nothing for pipelines that produce
        no data products (size <= 0).

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: The event for receiving capacity storage.
        """
        if self.data_product_storage_gb <= 0:
            # Some pipelines do not produce data products
            return

        capacity_storage_wait_start = env.now
        self.logger.debug(
            "%d: %s - %s: Requesting %d GB of data product storage for data "
            "products...",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        sdp.log_resource_usage()
        yield sdp.storage_data_product.get(self.data_product_storage_gb)
        sdp.log_resource_usage()
        capacity_storage_wait_end = env.now

        self.logger.debug(
            "%d: %s - %s: %d GB data product storage allocated for data "
            "products.",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        # NOTE(review): run() and allocate_compute_nodes() pass
        # self.batch_name as the second log_event argument; this passes
        # self.name. Left unchanged in case consumers rely on it — confirm.
        log_event(
            self.event_log,
            self.name,
            "capacity_storage_wait_data_products",
            capacity_storage_wait_start,
            capacity_storage_wait_end,
        )

    def retain_data_products(self, env, sdp):
        """Simulate retention of data products.

        Holds the data products in capacity storage for
        ``self.data_retention_hrs`` hours and then returns the storage to the
        SDP data product storage. Pipelines that produce no data products
        (size <= 0) never allocated any storage, so nothing is retained or
        released for them.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor instance.

        Yields:
            simpy.events.Event: The retention timeout and the event for
            returning storage.
        """
        if self.data_product_storage_gb <= 0:
            # Mirrors allocate_capacity_storage_data_products(): nothing was
            # taken from storage, so there is nothing to give back (and a
            # simpy Container rejects non-positive put amounts).
            return

        self.logger.debug(
            "%d: %s - %s: Retaining %d GB of data products for %.1f hours...",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
            self.data_retention_hrs,
        )
        data_retention_start = env.now
        yield env.timeout(int(self.data_retention_hrs * SECONDS_IN_1HR))

        self.logger.debug(
            "%d: %s - %s: Deleting %d GB of data products from data product "
            "storage...",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        yield sdp.storage_data_product.put(self.data_product_storage_gb)
        sdp.log_resource_usage()
        self.logger.debug(
            "%d: %s - %s: Deleted %d GB of data products from data product "
            "storage.",
            env.now,
            self.batch_name,
            self.name,
            self.data_product_storage_gb,
        )
        data_retention_end = env.now
        # NOTE(review): second log_event argument is self.name here but
        # self.batch_name in run(); left unchanged — confirm intent.
        log_event(
            self.event_log,
            self.name,
            "data_retention",
            data_retention_start,
            data_retention_end,
        )

    def get_runtime(self):
        """Calculate the total runtime based on node hours, percentage of parallelism, and nodes available.

        The serial fraction of the work runs at full length, while the
        parallel fraction is divided across ``self.num_nodes`` nodes. The
        result is rounded up to a whole number of seconds.

        Returns:
            int: The runtime in seconds (also stored in ``self.runtime``).
        """
        parallel_fraction = self.pct_parallelism / 100
        total_seconds = SECONDS_IN_1HR * self.node_hours
        runtime_parallel = total_seconds * parallel_fraction / self.num_nodes
        runtime_serial = total_seconds * (1 - parallel_fraction)
        self.runtime = ceil(runtime_parallel + runtime_serial)
        return self.runtime

    def get_event_log(self):
        """Return the event log for the pipeline."""
        return self.event_log