Source code for ska_sdp_resource_model.simulate.scheduling_block_instance

"""Module containing SchedulingBlockInstance class that represents a specific
instance of a scheduling block type."""

from collections import defaultdict

from simpy.events import AllOf

from ska_sdp_resource_model.simulate.constants import SECONDS_IN_1HR
from ska_sdp_resource_model.simulate.logger import log_event, setup_logger
from ska_sdp_resource_model.simulate.pipeline import Pipeline


class SchedulingBlockInstance:
    """Class for holding a specific instance of a scheduling block type.

    Methods:
        run: Simulate resource usage of a single scheduling block instance.
        run_batch_processing: Allocate performance storage and run the
            batch processing pipelines.
        allocate_capacity_storage_raw_visibilities: Request capacity
            storage for raw visibilities.
        observe: Simulate an observation using the telescope.
        process: Run batch processing for the configured pipelines.
        allocate_performance_storage: Request performance storage for
            pre-processed visibilities.
        delete_pre_processed_visibilities: Return the performance storage
            held for pre-processed visibilities.
        run_pipelines: Run all pipelines, one priority group at a time.
        get_priority_pipeline_groups: Group pipeline names by priority.
        no_priority: Tell whether no pipeline has a priority set.
        get_event_log: Return the event log for the scheduling block
            instance.
    """

    # Priority given to a pipeline without one when at least one other
    # pipeline of the instance has an explicit priority.
    _DEFAULT_PRIORITY = 100

    def __init__(self, scheduling_block_instance, scheduling_block_group):
        """Initialise the SchedulingBlockInstance object.

        Args:
            scheduling_block_instance (dict): Configuration of the
                scheduling block instance. The keys read are "SBI_ID",
                "scheduling_block_instance_time_hrs", "raw_vis_gb",
                "processed_vis_gb", "pipeline_steps",
                "data_retention_hrs" and "total_data_products_gb".
            scheduling_block_group (SchedulingBlock): Scheduling block
                group to which the instance belongs.
        """
        sbi = scheduling_block_instance
        self.name = sbi["SBI_ID"]
        self.group = scheduling_block_group
        self.integration_time_hrs = sbi["scheduling_block_instance_time_hrs"]
        # Data volumes are rounded to whole gigabytes.
        self.raw_vis_gb = round(sbi["raw_vis_gb"])
        self.processed_vis_gb = round(sbi["processed_vis_gb"])
        self.pipelines = sbi["pipeline_steps"]
        self.data_retention_hrs = sbi["data_retention_hrs"]
        self.data_products_stored_gb = round(sbi["total_data_products_gb"])
        self.total_data_retained_gb = (
            self.data_products_stored_gb + self.raw_vis_gb
        )
        self.event_log = []
        self.status = "pending"
        self.logger = setup_logger()

    def run(self, env, telescope, sdp):
        """Simulate resource usage of a single scheduling block instance.

        The instance observes first. Batch processing is only started by
        the instance whose observation brings the group's completed count
        up to the number of instances in the group.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            telescope (simpy.Resource): A telescope resource.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Process: The observation process and, where
            applicable, the batch processing process.

        Returns:
            list: The event log of this instance (generator return value).
        """
        yield env.process(self.observe(env, telescope, sdp))
        self.group.observation_completed()

        all_observed = (
            self.group.num_observations_completed
            == self.group.num_sb_instances
        )
        if all_observed:
            yield env.process(self.run_batch_processing(env, sdp))

        self.status = "completed"
        return self.get_event_log()

    def run_batch_processing(self, env, sdp):
        """Run batch processing.

        Allocates performance storage for pre-processed visibilities,
        runs the pipelines when any are configured, and logs a "block"
        event spanning the processing.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Process: The storage allocation process and the
            pipeline processing process.
        """
        # Allocate performance storage for pre-processed visibilities
        yield env.process(self.allocate_performance_storage(env, sdp))

        # Run batch processing pipelines
        processing_start = env.now
        if self.pipelines:
            # NOTE(review): storage is only released in run_pipelines, so
            # with no pipelines the allocation above is never given back
            # by this class - confirm that is intended.
            yield env.process(self.process(env, sdp))
        log_event(
            self.event_log, self.name, "block", processing_start, env.now
        )

    def allocate_capacity_storage_raw_visibilities(self, env, sdp):
        """Request and allocate storage capacity for raw visibilities.

        Requests ``raw_vis_gb`` gigabytes from the capacity storage of the
        SDP and logs how long the request waited. Nothing is requested
        when the instance has no raw visibilities.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): ScienceDataProcessor instance.

        Yields:
            simpy.events.Event: An event that represents the request for
            storage capacity.
        """
        if self.raw_vis_gb <= 0:
            # Some scheduling blocks do not require storage
            return

        wait_start = env.now
        self.logger.debug(
            "%d: %s: Requesting %d GB of capacity storage for raw "
            "visibilities...",
            env.now,
            self.name,
            self.raw_vis_gb,
        )
        sdp.log_resource_usage()

        self.status = "waiting_for_capacity_storage"
        yield sdp.storage_capacity.get(self.raw_vis_gb)
        self.status = "in_progress"

        wait_end = env.now
        self.logger.debug(
            "%d: %s: %d GB capacity storage allocated for raw visibilities.",
            env.now,
            self.name,
            self.raw_vis_gb,
        )
        log_event(
            self.event_log,
            self.name,
            "capacity_storage_wait_raw_visibilities",
            wait_start,
            wait_end,
        )

    def observe(self, env, telescope, sdp):
        """Simulate an observation using the telescope.

        Waits for the telescope, registers the instance as started with
        its group, allocates capacity storage for the raw visibilities
        and then observes for the integration time. The observation is
        appended to the event log.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            telescope (simpy.Resource): A telescope resource.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: The event representing the request for
            the telescope resource.
            simpy.events.Process: The capacity storage allocation.
            simpy.events.Timeout: The timeout event representing the
            observation duration.
        """
        self.logger.debug(
            "%d: %s: Waiting for telescope...", env.now, self.name
        )
        # The telescope is held until the observation has finished.
        with telescope.request() as telescope_request:
            yield telescope_request

            # Track the scheduling block instance in the scheduling block
            # group
            self.group.scheduling_block_instance_started(self.name)

            # Allocate capacity storage for raw visibilities
            yield env.process(
                self.allocate_capacity_storage_raw_visibilities(env, sdp)
            )

            # Observe
            observation_start = env.now
            self.logger.debug(
                "%d: %s: Starting observation...",
                observation_start,
                self.name,
            )
            # The duration is truncated to whole seconds.
            duration_s = int(self.integration_time_hrs * SECONDS_IN_1HR)
            yield env.timeout(duration_s)
            observation_end = env.now
            self.logger.debug(
                "%d: %s: Observation complete!", observation_end, self.name
            )
            log_event(
                self.event_log,
                self.name,
                "observing",
                observation_start,
                observation_end,
            )

    def process(self, env, sdp):
        """Run batch processing for the configured pipelines.

        Runs all pipelines and logs a "batch" event with the start and
        end time of the processing.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Process: The process running the pipelines.
        """
        batch_start = env.now
        self.logger.debug(
            "%d: %s: Starting batch processing...", batch_start, self.name
        )

        # Run pipelines
        yield env.process(self.run_pipelines(env, sdp))

        batch_end = env.now
        self.logger.debug(
            "%d: %s: Batch processing completed!", env.now, self.name
        )
        log_event(self.event_log, self.name, "batch", batch_start, batch_end)

    def allocate_performance_storage(self, env, sdp):
        """Request and allocate storage for pre-processed visibilities.

        Requests ``processed_vis_gb`` gigabytes from the performance
        storage of the SDP and logs how long the request waited. The
        status is set to "waiting_for_performance_storage" even when
        nothing has to be requested.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: An event that represents the request for
            performance storage.
        """
        self.status = "waiting_for_performance_storage"
        if self.processed_vis_gb <= 0:
            # Some scheduling blocks do not require storage
            return

        wait_start = env.now
        self.logger.debug(
            "%d: %s: Requesting %d GB of performance storage for "
            "pre-processed visibilities...",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()

        yield sdp.storage_performance.get(self.processed_vis_gb)

        wait_end = env.now
        self.logger.debug(
            "%d: %s: %d GB performance storage allocated for "
            "pre-processed visibilities.",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()
        log_event(
            self.event_log,
            self.name,
            "performance_storage_wait",
            wait_start,
            wait_end,
        )

    def delete_pre_processed_visibilities(self, env, sdp):
        """Delete pre-processed visibilities from performance storage.

        Puts ``processed_vis_gb`` gigabytes back into the performance
        storage of the SDP and logs how long the release waited. Nothing
        is released when the instance never held any performance storage.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: An event that represents the release of
            the performance storage.
        """
        if self.processed_vis_gb <= 0:
            # Mirrors the guard in allocate_performance_storage: nothing
            # was allocated, and a put of a non-positive amount is
            # rejected if the storage is a simpy Container (assumed from
            # its get/put/capacity usage).
            return

        self.status = "deleting_preprocessed_vis"
        release_start = env.now
        self.logger.debug(
            "%d: %s: Deleting %d GB of pre-processed visibilities from "
            "performance storage.",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()

        yield sdp.storage_performance.put(self.processed_vis_gb)

        release_end = env.now
        self.logger.debug(
            "%d: %s: %d GB of pre-processed visibilities deleted from "
            "performance storage.",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()
        self.logger.debug(
            "Capacity of performance storage = %r",
            sdp.storage_performance.capacity,
        )
        log_event(
            self.event_log,
            self.name,
            "performance_storage_release_wait",
            release_start,
            release_end,
        )

    def _build_pipeline_config(self, pipeline_name):
        """Return a copy of a pipeline's config with the retention added.

        A shallow copy is used so that the configuration handed to the
        constructor is not modified.
        """
        return {
            **self.pipelines[pipeline_name],
            "data_retention_hrs": self.data_retention_hrs,
        }

    def run_pipelines(self, env, sdp):
        """Run all pipelines for the scheduling block instance.

        Pipelines of one priority group are started together and the next
        group only starts once all of them have finished. Afterwards the
        pre-processed visibilities are deleted and the group is told that
        processing of the scheduling block has completed.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: Events for the pipeline groups and for
            the clean-up processes.
        """
        pipeline_groups = self.get_priority_pipeline_groups()
        for priority, pipeline_names in pipeline_groups.items():
            self.logger.debug(
                "%d: %s: Running pipelines with priority %d: %s",
                env.now,
                self.name,
                priority,
                pipeline_names,
            )
            group_pipelines = []
            group_processes = []
            for pipeline_name in pipeline_names:
                self.status = "waiting_for_pipelines"
                pipeline = Pipeline(
                    pipeline_name, self._build_pipeline_config(pipeline_name)
                )
                group_pipelines.append(pipeline)
                group_processes.append(
                    env.process(pipeline.run(env, sdp, self.name))
                )

            # Wait for all pipelines in the group to complete
            yield AllOf(env, group_processes)

            for pipeline in group_pipelines:
                if pipeline.data_product_storage_gb > 0:
                    # Started but not waited for.
                    env.process(pipeline.retain_data_products(env, sdp))
                self.event_log.extend(pipeline.get_event_log())

        # Delete pre-processed visibilities from performance storage
        yield env.process(self.delete_pre_processed_visibilities(env, sdp))

        # Delete raw visibilities from capacity storage
        yield env.process(
            self.group.scheduling_block_processing_completed(sdp)
        )

    def get_priority_pipeline_groups(self):
        """Get dictionary of pipelines grouped by priority.

        Returns:
            dict: Maps a priority to the list of pipeline names that run
            together. When no pipeline has a priority every pipeline gets
            its own group, keyed by its position, which means serial
            execution. Otherwise the groups are ordered by ascending
            priority and pipelines without a priority use the default
            priority.
        """
        if self.no_priority():
            # If no pipelines have a priority set, assume serial execution
            return {
                position: [name]
                for position, name in enumerate(self.pipelines)
            }

        grouped = defaultdict(list)
        for name, config in self.pipelines.items():
            priority = config.get("priority")
            if priority is None:
                # Covers a missing key as well as an explicit None.
                priority = self._DEFAULT_PRIORITY
            grouped[priority].append(name)

        # Callers iterate the groups in dict order, so sort them instead
        # of relying on the order of the configuration.
        # NOTE(review): lower value is assumed to run first, in line with
        # the default of 100 for pipelines without a priority - confirm.
        return dict(sorted(grouped.items()))

    def no_priority(self):
        """Return True when no pipeline has a priority set."""
        return all(
            config.get("priority") is None
            for config in self.pipelines.values()
        )

    def get_event_log(self):
        """Return the event log for the scheduling block instance."""
        return self.event_log