# Source code for ska_sdp_resource_model.simulate.resource_usage

"""Module for simulating resource usage of an observing schedule."""

import pandas as pd
import simpy

from ska_sdp_resource_model.simulate.constants import (
    SECONDS_IN_1HR,
    SECONDS_IN_DAY,
)
from ska_sdp_resource_model.simulate.logger import log_event, setup_logger
from ska_sdp_resource_model.simulate.process_inputs import (
    get_scheduling_blocks_data,
)
from ska_sdp_resource_model.simulate.scheduling_block import SchedulingBlock
from ska_sdp_resource_model.simulate.scheduling_block_instance import (
    SchedulingBlockInstance,
)
from ska_sdp_resource_model.simulate.science_data_processor import (
    ScienceDataProcessor,
)
from ska_sdp_resource_model.simulate.validate_inputs import (
    configuration_ok,
    inputs_ok,
)


class ResourceUsageSimulator:
    """Class for simulating resource usage.

    Methods:
        __init__(): Initialises the ResourceUsageSimulator with empty event
            and resource logs.
        initialise_logs(): Initialises the event and resource logs.
        initialise_environment(): Initialises the simulation environment
            using the SimPy library.
        initialise_resources(hardware_config): Initialises the telescope and
            ScienceDataProcessor resources.
        run_simulation(observations, hardware_config): Simulates resource
            usage of the SDP.
        event_log_df(): Returns the simulation event log as a DataFrame.
        add_events_from_batch(batch): Appends the event logs of a batch of
            scheduling blocks / scheduling block instances to the event log.
        get_failed_blocks(scheduling_block_instances): Returns the
            scheduling block instances that did not complete.
    """

    def __init__(self):
        self.initialise_logs()
        self.initialise_environment()
        # Resources are (re)created per run by initialise_resources(),
        # because they are bound to the SimPy environment of that run.
        self.telescope = None
        self.sdp = None
        self.logger = setup_logger()

    def initialise_logs(self):
        """Initialise lists for containing logging."""
        self.event_log = []
        self.resource_log = []

    def initialise_environment(self):
        """Initialise the simulation environment.

        This method sets up the simulation environment using the SimPy
        library. It creates an instance of the simpy.Environment class and
        assigns it to the 'env' attribute of the object.

        Returns:
            None
        """
        self.env = simpy.Environment()

    def initialise_resources(self, hardware_config):
        """Initialise the resources required for the simulation.

        This method sets up the telescope resource (a SimPy resource with
        capacity 1, i.e. one observation at a time) and the
        ScienceDataProcessor (SDP) using the provided hardware configuration.

        Args:
            hardware_config (dict): A dictionary containing the hardware
                configuration for the Science Data Processor.
        """
        self.telescope = simpy.Resource(self.env, capacity=1)
        self.sdp = ScienceDataProcessor(self.env, hardware_config)

    def run_simulation(self, observations, hardware_config):
        """Simulate resource usage of the SDP.

        Every call starts from fresh logs, a fresh SimPy environment and
        fresh resources, so one simulator instance can be reused for
        several runs.

        Args:
            observations (list): Sequence of scheduling blocks with pipeline
                configurations to use as input to simulation.
            hardware_config (dict): Hardware configuration.

        Returns:
            dict: A dictionary with the keys "resource_usage" (resource
            usage DataFrame), "event_log" (event log DataFrame) and
            "num_successes" (1 if every scheduling block instance completed,
            otherwise 0).

        Raises:
            ValueError: If the observations or the hardware configuration
                fail validation.
        """
        self.initialise_logs()
        self.initialise_environment()
        self.initialise_resources(hardware_config)

        self.logger.debug(
            "Running simulation of resource usage...\n"
            "Hardware configuration: %s\n"
            "Observations: %s",
            hardware_config,
            observations,
        )
        if not (
            inputs_ok(observations, hardware_config)
            and configuration_ok(observations, hardware_config)
        ):
            raise ValueError(
                "Invalid inputs. Please check the logs for more information."
            )
        self.logger.debug("Inputs are valid...")

        scheduling_blocks = self._initialise_scheduling_blocks(observations)
        scheduling_block_instances = self._schedule_observations(
            observations, scheduling_blocks
        )

        self.logger.info(f"{' Simulation starts ':-^80}")
        self.env.run()
        self.logger.info(f"{' Done! ':-^80}")

        self.add_events_from_batch(scheduling_block_instances)
        self.add_events_from_batch(scheduling_blocks.values())
        # Records a "Failed: <status>" event for every unfinished instance,
        # so it must run before the event log DataFrame is built.
        failed_blocks = self.get_failed_blocks(scheduling_block_instances)
        log_event(
            self.event_log,
            batch_name="Simulation",
            step_name="Simulation",
            start=0,
            end=self.env.now,
        )

        self.logger.debug("Fetching logs...")
        event_log_df = self.event_log_df()
        resource_log_df = self.sdp.resource_usage_df()
        self.logger.debug(
            "Capacity storage log:\n: %s\n"
            "Performance storage log:\n: %s\n"
            "Compute nodes log:\n: %s\n"
            "Event log:\n: %s",
            self.sdp.storage_capacity_log,
            self.sdp.storage_performance_log,
            self.sdp.nodes_log,
            self.event_log,
        )

        success = self._report_outcome(
            failed_blocks, len(scheduling_block_instances), resource_log_df
        )
        return {
            "resource_usage": resource_log_df,
            "event_log": event_log_df,
            "num_successes": success,
        }

    def _initialise_scheduling_blocks(self, observations):
        """Create one SchedulingBlock per entry of the scheduling-block data.

        Args:
            observations (list): The observations passed to run_simulation.

        Returns:
            dict: Mapping of scheduling block ID to SchedulingBlock.
        """
        scheduling_blocks = {}
        for sb_id, sb_data in get_scheduling_blocks_data(observations).items():
            scheduling_blocks[sb_id] = SchedulingBlock(
                self.env,
                sb_id,
                sb_data["num_scheduling_block_instances"],
                sb_data["data_retention_hrs"],
                sb_data["raw_vis_gb"],
                sb_data["total_data_products_gb"],
            )
            self.logger.debug("%s: Scheduling block initialised.", sb_id)
        return scheduling_blocks

    def _schedule_observations(self, observations, scheduling_blocks):
        """Register one SimPy process per observation.

        Args:
            observations (list): The observations passed to run_simulation;
                each one's "SB_ID" must be a key of scheduling_blocks.
            scheduling_blocks (dict): Mapping of SB ID to SchedulingBlock.

        Returns:
            list: The SchedulingBlockInstance objects, in observation order.
        """
        scheduling_block_instances = []
        for observation in observations:
            scheduling_block_instance = SchedulingBlockInstance(
                observation, scheduling_blocks[observation.get("SB_ID")]
            )
            self.env.process(
                scheduling_block_instance.run(
                    self.env, self.telescope, self.sdp
                )
            )
            scheduling_block_instances.append(scheduling_block_instance)
            self.logger.debug(
                "%s: Observation scheduled.", scheduling_block_instance.name
            )
        self.logger.debug("Observations scheduled...")
        return scheduling_block_instances

    def _report_outcome(self, failed_blocks, num_instances, resource_log_df):
        """Log the overall outcome of a simulation run.

        Args:
            failed_blocks (list): Output of get_failed_blocks.
            num_instances (int): Total number of scheduling block instances.
            resource_log_df (pd.DataFrame): Resource usage log; only read on
                success, for its "time_s" column.

        Returns:
            int: 0 if any scheduling block instance failed, otherwise 1.
        """
        if failed_blocks:
            # Logging formats lazily: every argument needs a placeholder,
            # otherwise the record fails to format and the message is lost.
            self.logger.critical(
                f"{' %d/%d scheduling blocks failed to complete! ':-^80}\n"
                "Failed scheduling blocks: %s\n"
                "Capacity storage log:\n%s",
                len(failed_blocks),
                num_instances,
                failed_blocks,
                self.sdp.storage_capacity_log,
            )
            return 0
        self.logger.info("Simulation completed successfully!")
        self.logger.info(
            "Time taken: %.2f days",
            resource_log_df["time_s"].max() / SECONDS_IN_DAY,
        )
        return 1

    def event_log_df(self):
        """Return the event log for the simulation as a DataFrame.

        Duplicate rows are dropped, events are sorted by simulation start
        time and the duration of each event is calculated in seconds, hours
        and days. An SB_type column is added holding the first two
        characters of the part of batch_name before the first underscore.

        Returns:
            event_log_df (pd.DataFrame): A DataFrame containing the event log.
        """
        event_log_df = pd.DataFrame(self.event_log)
        event_log_df = event_log_df.astype(
            {
                "batch_name": "string",
                "step": "string",
                "start": int,
                "end": int,
            }
        )
        event_log_df = event_log_df.drop_duplicates().sort_values("start")
        event_log_df["duration"] = event_log_df["end"] - event_log_df["start"]
        event_log_df["duration_hrs"] = (
            event_log_df["duration"] / SECONDS_IN_1HR
        )
        event_log_df["duration_days"] = event_log_df["duration_hrs"] / 24
        event_log_df["SB_type"] = event_log_df["batch_name"].apply(
            lambda x: x.split("_")[0][:2]
        )
        return event_log_df

    def add_events_from_batch(self, batch):
        """Add events from a batch to the simulation event log.

        Args:
            batch (iterable): SchedulingBlock or SchedulingBlockInstance
                objects; each one's get_event_log() entries are appended in
                order.
        """
        for batch_object in batch:
            self.event_log.extend(batch_object.get_event_log())

    def get_failed_blocks(self, scheduling_block_instances):
        """Return a list of failed scheduling blocks.

        A scheduling block instance counts as failed when its status is not
        "completed". For each failed instance a "Failed: <status>" event is
        also recorded in the event log at the current simulation time.

        Args:
            scheduling_block_instances (list): List of
                SchedulingBlockInstance objects.

        Returns:
            list: One single-entry dict {instance name: latest status} per
            failed scheduling block instance.
        """
        failed_blocks = []
        for scheduling_block_instance in scheduling_block_instances:
            status = scheduling_block_instance.status
            if status != "completed":
                batch_name = scheduling_block_instance.name
                failed_blocks.append({batch_name: status})
                log_event(
                    self.event_log,
                    batch_name=batch_name,
                    step_name=f"Failed: {status}",
                    start=self.env.now,
                    end=self.env.now,
                )
        return failed_blocks