"""Module containing SchedulingBlockInstance class that represents a specific
instance of a scheduling block type."""
from collections import defaultdict
from simpy.events import AllOf
from ska_sdp_resource_model.simulate.constants import SECONDS_IN_1HR
from ska_sdp_resource_model.simulate.logger import log_event, setup_logger
from ska_sdp_resource_model.simulate.pipeline import Pipeline
class SchedulingBlockInstance:
    """Class for holding a specific instance of a scheduling block type.

    Methods:
        run:
            Simulate resource usage of a single scheduling block instance.
        run_batch_processing:
            Allocate performance storage and run the batch pipelines.
        allocate_capacity_storage_raw_visibilities:
            Request and allocate capacity storage for raw visibilities.
        allocate_performance_storage:
            Request and allocate performance storage for pre-processed
            visibilities.
        observe:
            Simulate an observation using the telescope.
        process:
            Run batch processing for a given set of pipelines.
        delete_pre_processed_visibilities:
            Give back the performance storage used by pre-processed
            visibilities.
        run_pipelines:
            Run all pipelines, one priority group at a time.
        get_priority_pipeline_groups:
            Group pipeline names by priority.
        no_priority:
            Report whether no pipeline has a priority set.
        get_event_log:
            Return the event log for the scheduling block instance.
    """

    # Priority assigned to a pipeline whose config has no (or a None)
    # "priority" entry while at least one other pipeline has one.
    _DEFAULT_PRIORITY = 100

    def __init__(self, scheduling_block_instance, scheduling_block_group):
        """Initialise the SchedulingBlockInstance object.

        Args:
            scheduling_block_instance (dict):
                Configuration of the scheduling block instance. The keys
                read are "SBI_ID", "scheduling_block_instance_time_hrs",
                "raw_vis_gb", "processed_vis_gb", "pipeline_steps",
                "data_retention_hrs" and "total_data_products_gb".
            scheduling_block_group (SchedulingBlock):
                Scheduling block group to which the instance belongs.
        """
        self.name = scheduling_block_instance["SBI_ID"]
        self.group = scheduling_block_group
        self.integration_time_hrs = scheduling_block_instance[
            "scheduling_block_instance_time_hrs"
        ]
        # Data volumes are rounded to whole gigabytes.
        self.raw_vis_gb = round(scheduling_block_instance["raw_vis_gb"])
        self.processed_vis_gb = round(
            scheduling_block_instance["processed_vis_gb"]
        )
        # Mapping of pipeline name -> pipeline configuration dict.
        self.pipelines = scheduling_block_instance["pipeline_steps"]
        self.data_retention_hrs = scheduling_block_instance[
            "data_retention_hrs"
        ]
        self.data_products_stored_gb = round(
            scheduling_block_instance["total_data_products_gb"]
        )
        self.total_data_retained_gb = (
            self.data_products_stored_gb + self.raw_vis_gb
        )
        self.event_log = []
        self.status = "pending"
        self.logger = setup_logger()

    def run(self, env, telescope, sdp):
        """Simulate resource usage of a single scheduling block instance.

        The instance observes first. Batch processing is only started by
        the instance whose observation brings the group's completed
        observation count up to the group's number of instances.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            telescope (simpy.Resource): A telescope resource.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                Events for the observation and, when triggered, the batch
                processing.

        Returns:
            list: The event log of this scheduling block instance (the
            generator's return value).
        """
        yield env.process(self.observe(env, telescope, sdp))
        self.group.observation_completed()

        all_observed = (
            self.group.num_observations_completed
            == self.group.num_sb_instances
        )
        if all_observed:
            yield env.process(self.run_batch_processing(env, sdp))

        self.status = "completed"
        return self.get_event_log()

    def run_batch_processing(self, env, sdp):
        """Run batch processing.

        Allocates performance storage for the pre-processed visibilities,
        runs the pipelines (if any are configured) and logs a "block"
        event spanning from the start of processing to its end.

        Args:
            env (simpy.Environment):
                A simpy simulation environment.
            sdp (ScienceDataProcessor):
                A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                Events for allocating storage and running pipelines.
        """
        # Allocate performance storage for pre-processed visibilities
        yield env.process(self.allocate_performance_storage(env, sdp))

        # Run batch processing pipelines
        processing_start = env.now
        if self.pipelines:
            yield env.process(self.process(env, sdp))
        log_event(
            self.event_log, self.name, "block", processing_start, env.now
        )

    def allocate_capacity_storage_raw_visibilities(self, env, sdp):
        """Request and allocate storage capacity for raw visibilities.

        Requests ``raw_vis_gb`` gigabytes from ``sdp.storage_capacity``,
        waits until the request is granted and logs the time spent
        waiting. Nothing is requested when ``raw_vis_gb`` is not positive.

        Args:
            env (simpy.Environment):
                A simpy simulation environment.
            sdp (ScienceDataProcessor):
                ScienceDataProcessor instance.

        Yields:
            simpy.events.Event:
                An event that represents the request for storage capacity.
        """
        if self.raw_vis_gb <= 0:
            # Some scheduling blocks do not require storage
            return

        wait_start = env.now
        self.logger.debug(
            "%d: %s: Requesting %d GB of capacity storage for raw "
            "visibilities...",
            env.now,
            self.name,
            self.raw_vis_gb,
        )
        sdp.log_resource_usage()

        self.status = "waiting_for_capacity_storage"
        yield sdp.storage_capacity.get(self.raw_vis_gb)
        self.status = "in_progress"

        wait_end = env.now
        self.logger.debug(
            "%d: %s: %d GB capacity storage allocated for raw visibilities.",
            env.now,
            self.name,
            self.raw_vis_gb,
        )
        log_event(
            self.event_log,
            self.name,
            "capacity_storage_wait_raw_visibilities",
            wait_start,
            wait_end,
        )

    def allocate_performance_storage(self, env, sdp):
        """Request and allocate performance storage for pre-processed
        visibilities.

        Counterpart of ``delete_pre_processed_visibilities``: requests
        ``processed_vis_gb`` gigabytes from ``sdp.storage_performance``,
        waits until the request is granted and logs the time spent
        waiting. Nothing is requested when ``processed_vis_gb`` is not
        positive.

        NOTE(review): the status value and event name used here follow
        the naming of the sibling storage methods -- confirm them against
        the consumers of the event log.

        Args:
            env (simpy.Environment):
                A simpy simulation environment.
            sdp (ScienceDataProcessor):
                ScienceDataProcessor instance.

        Yields:
            simpy.events.Event:
                An event that represents the request for performance
                storage.
        """
        if self.processed_vis_gb <= 0:
            # No pre-processed visibilities, so nothing to allocate
            return

        wait_start = env.now
        self.logger.debug(
            "%d: %s: Requesting %d GB of performance storage for "
            "pre-processed visibilities...",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()

        self.status = "waiting_for_performance_storage"
        yield sdp.storage_performance.get(self.processed_vis_gb)
        self.status = "in_progress"

        wait_end = env.now
        self.logger.debug(
            "%d: %s: %d GB performance storage allocated for "
            "pre-processed visibilities.",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        log_event(
            self.event_log,
            self.name,
            "performance_storage_wait",
            wait_start,
            wait_end,
        )

    def observe(self, env, telescope, sdp):
        """Simulate an observation using the telescope.

        Waits for the telescope, registers the instance as started with
        its group, allocates capacity storage for the raw visibilities
        and then holds the telescope for the integration time. The
        observation interval is appended to the event log.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            telescope (simpy.Resource): A telescope resource.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event: The event representing the request for the
                telescope resource.
            simpy.events.Process: The raw-visibility storage allocation.
            simpy.events.Timeout: The timeout event representing the
                observation duration.
        """
        self.logger.debug(
            "%d: %s: Waiting for telescope...", env.now, self.name
        )
        # The telescope stays held until the end of the ``with`` block.
        with telescope.request() as telescope_request:
            yield telescope_request

            # Track the scheduling block instance in the scheduling block
            # group
            self.group.scheduling_block_instance_started(self.name)

            # Allocate capacity storage for raw visibilities
            yield env.process(
                self.allocate_capacity_storage_raw_visibilities(env, sdp)
            )

            # Observe
            observation_start = env.now
            self.logger.debug(
                "%d: %s: Starting observation...",
                observation_start,
                self.name,
            )
            # The duration is truncated to a whole number of seconds.
            yield env.timeout(int(self.integration_time_hrs * SECONDS_IN_1HR))
            observation_end = env.now
            self.logger.debug(
                "%d: %s: Observation complete!", observation_end, self.name
            )
            log_event(
                self.event_log,
                self.name,
                "observing",
                observation_start,
                observation_end,
            )

    def process(self, env, sdp):
        """Run batch processing for a given set of pipelines.

        Runs all pipelines of the instance and logs a "batch" event that
        spans the start and end of the pipeline execution.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                The event for running the pipelines.
        """
        batch_start = env.now
        self.logger.debug(
            "%d: %s: Starting batch processing...", batch_start, self.name
        )

        # Run pipelines
        yield env.process(self.run_pipelines(env, sdp))

        batch_end = env.now
        self.logger.debug(
            "%d: %s: Batch processing completed!", env.now, self.name
        )
        log_event(self.event_log, self.name, "batch", batch_start, batch_end)

    def delete_pre_processed_visibilities(self, env, sdp):
        """Delete pre-processed visibilities from performance storage.

        Puts ``processed_vis_gb`` gigabytes back into
        ``sdp.storage_performance`` and logs how long the release took.
        Nothing is released when ``processed_vis_gb`` is not positive.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                The event for returning the storage.
        """
        if self.processed_vis_gb <= 0:
            # Nothing was stored, so there is nothing to give back. This
            # matches the guard used when allocating raw-visibility
            # storage; presumably the store is a simpy.Container, whose
            # put() rejects non-positive amounts -- TODO confirm.
            return

        self.status = "deleting_preprocessed_vis"
        release_start = env.now
        self.logger.debug(
            "%d: %s: Deleting %d GB of pre-processed visibilities from "
            "performance storage.",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()

        yield sdp.storage_performance.put(self.processed_vis_gb)

        release_end = env.now
        self.logger.debug(
            "%d: %s: %d GB of pre-processed visibilities deleted from "
            "performance storage.",
            env.now,
            self.name,
            self.processed_vis_gb,
        )
        sdp.log_resource_usage()
        self.logger.debug(
            "Capacity of performance storage = %r",
            sdp.storage_performance.capacity,
        )
        log_event(
            self.event_log,
            self.name,
            "performance_storage_release_wait",
            release_start,
            release_end,
        )

    def run_pipelines(self, env, sdp):
        """Run all pipelines for the scheduling block instance.

        Pipelines are started group by group, in the order returned by
        ``get_priority_pipeline_groups``. All pipelines of a group run
        concurrently and the next group only starts once every pipeline
        of the current group has finished. Afterwards the pre-processed
        visibilities are deleted and the group is told that processing
        of this scheduling block has completed.

        Args:
            env (simpy.Environment): A simpy simulation environment.
            sdp (ScienceDataProcessor): A ScienceDataProcessor object.

        Yields:
            simpy.events.Event:
                Events for the pipeline groups and the final clean-up.
        """
        pipeline_groups = self.get_priority_pipeline_groups()
        for priority, pipelines in pipeline_groups.items():
            self.logger.debug(
                "%d: %s: Running pipelines with priority %d: %s",
                env.now,
                self.name,
                priority,
                pipelines,
            )
            self.status = "waiting_for_pipelines"
            pipeline_objects = []
            pipeline_processes = []
            for pipeline_name in pipelines:
                # Work on a copy so that the configuration passed in by
                # the caller is not modified as a side effect.
                config = dict(self.pipelines[pipeline_name])
                config["data_retention_hrs"] = self.data_retention_hrs
                pipeline = Pipeline(pipeline_name, config)
                pipeline_objects.append(pipeline)
                pipeline_processes.append(
                    env.process(pipeline.run(env, sdp, self.name))
                )

            # Wait for all pipelines in the group to complete
            yield AllOf(env, pipeline_processes)

            for pipeline in pipeline_objects:
                if pipeline.data_product_storage_gb > 0:
                    # Started but deliberately not waited for: retention
                    # runs alongside the remaining processing.
                    env.process(pipeline.retain_data_products(env, sdp))
                self.event_log.extend(pipeline.get_event_log())

        # Delete pre-processed visibilities from performance storage
        yield env.process(self.delete_pre_processed_visibilities(env, sdp))

        # Delete raw visibilities from capacity storage
        yield env.process(
            self.group.scheduling_block_processing_completed(sdp)
        )

    def get_priority_pipeline_groups(self):
        """Get dictionary of pipelines grouped by priority.

        When no pipeline has a priority set, every pipeline gets its own
        group keyed by its position, which results in serial execution.
        Otherwise pipelines are grouped by their "priority" value, and
        pipelines without one (missing or None) use the default of 100.

        NOTE(review): groups are returned in the order in which each
        priority first appears in the configuration, not sorted by
        priority value -- confirm that this is the intended run order.

        Returns:
            dict: Mapping of group key to a list of pipeline names.
        """
        if self.no_priority():
            # If no pipelines have a priority set, assume serial execution
            return {
                index: [pipeline]
                for index, pipeline in enumerate(self.pipelines)
            }

        priority_groups = defaultdict(list)
        for pipeline, config in self.pipelines.items():
            priority = config.get("priority")
            if priority is None:
                # Same notion of "not set" as in no_priority()
                priority = self._DEFAULT_PRIORITY
            priority_groups[priority].append(pipeline)
        # Hand back a plain dict so that a lookup of an unknown priority
        # by the caller does not silently create an empty group.
        return dict(priority_groups)

    def no_priority(self):
        """Check whether no pipeline has a priority set.

        Returns:
            bool: True if the "priority" entry is missing or None for
            every pipeline (also when there are no pipelines at all),
            False as soon as one pipeline has a priority.
        """
        return all(
            config.get("priority") is None
            for config in self.pipelines.values()
        )

    def get_event_log(self):
        """Return the event log for the scheduling block instance."""
        return self.event_log