# Source code for ska_sdp_resource_model.simulate.generate_inputs

"""Module to generate inputs for the simulation."""

import itertools

import numpy as np
import pandas as pd

from ska_sdp_resource_model.simulate.constants import SECONDS_IN_1HR


def generate_observing_schedule(
    scheduling_block_types_config,
    randomise=True,
    schedule_length_hrs=7 * 24,
    seed=42,
):
    """Build an observing schedule covering the requested number of hours.

    Args:
        scheduling_block_types_config (dict): Scheduling block types
            configuration.
        randomise (bool): If truthy, the scheduling blocks are drawn with
            ``sample_scheduling_blocks``; otherwise they are produced by
            ``cycle_scheduling_blocks``.
        schedule_length_hrs (int): Total length of the observing schedule in
            hours.
        seed (int): Random seed for sampling scheduling blocks. Only
            forwarded when ``randomise`` is truthy.

    Returns:
        pd.DataFrame: Observing schedule DataFrame produced by
            ``fill_observing_schedule_df`` from the selected scheduling
            blocks.
    """
    # Deterministic path first: walk the configured types in order.
    if not randomise:
        return fill_observing_schedule_df(
            cycle_scheduling_blocks(
                scheduling_block_types_config, schedule_length_hrs
            )
        )

    # Randomised path: seeded sampling of the configured types.
    return fill_observing_schedule_df(
        sample_scheduling_blocks(
            scheduling_block_types_config, schedule_length_hrs, seed
        )
    )
def cycle_scheduling_blocks(
    scheduling_block_types_config, schedule_length_hrs=7 * 24
):
    """Repeat the configured scheduling block types in order.

    The configuration is walked round-robin. Each visited type contributes
    one scheduling block, and its ``scheduling_block_instance_time_hrs`` is
    added to a running total. The walk stops right after the block that
    takes the running total strictly above ``schedule_length_hrs``; that
    block is still part of the result.

    Args:
        scheduling_block_types_config (dict): Scheduling block types
            configuration.
        schedule_length_hrs (int): Total length of the observation schedule
            in hours.

    Returns:
        scheduling_blocks (list): List of scheduling blocks (one dict per
            block). Empty when the configuration is empty.
    """
    blocks = []
    elapsed_hrs = 0

    round_robin = itertools.cycle(scheduling_block_types_config.items())
    for sb_type, sb_config in round_robin:
        instance_hrs = sb_config["scheduling_block_instance_time_hrs"]
        blocks.append(
            {
                "SB_type": sb_type,
                "integration_time_hrs": sb_config["integration_time_hrs"],
                "scheduling_block_instance_time_hrs": instance_hrs,
                "short_name": sb_config["short_name"],
            }
        )

        elapsed_hrs += instance_hrs
        if elapsed_hrs > schedule_length_hrs:
            break

    return blocks
def sample_scheduling_blocks(
    scheduling_block_types_config, schedule_length_hrs=7 * 24, seed=42
):
    """Sample scheduling block types to compile a list for generating an
    observing schedule.

    Scheduling block types are drawn at random, with probability
    proportional to their ``sampling_weight``. A drawn block is appended
    only if it still fits in the remaining schedule time; otherwise it is
    rejected and another one is drawn. Sampling stops once the schedule is
    full or the remaining time is shorter than the shortest block type that
    can actually be drawn.

    Args:
        scheduling_block_types_config (dict): Scheduling block types
            configuration. Each entry must provide
            ``scheduling_block_instance_time_hrs``, ``sampling_weight`` and
            ``short_name``; ``integration_time_hrs`` is used when present.
        schedule_length_hrs (int): Total length of the observing schedule in
            hours.
        seed (int): Random seed for sampling scheduling blocks.

    Returns:
        observing_schedule (list): Sampled scheduling blocks

    Raises:
        ValueError: If the configuration is empty or no scheduling block
            type has a positive sampling weight.
    """
    if not scheduling_block_types_config:
        raise ValueError(
            "scheduling_block_types_config must define at least one "
            "scheduling block type."
        )

    rng = np.random.default_rng(seed=seed)

    sb_keys = list(scheduling_block_types_config.keys())
    sb_weights = [
        scheduling_block_types_config[sb]["sampling_weight"] for sb in sb_keys
    ]
    if not any(weight > 0 for weight in sb_weights):
        raise ValueError(
            "At least one scheduling block type must have a positive "
            "sampling_weight."
        )
    sb_weights_norm = sb_weights / np.sum(sb_weights)

    # Only block types that can actually be drawn may keep the loop alive.
    # Using the minimum over *all* types would spin forever when the
    # shortest type has zero weight and nothing else fits any more.
    sb_durations_min = min(
        scheduling_block_types_config[sb]["scheduling_block_instance_time_hrs"]
        for sb, weight in zip(sb_keys, sb_weights)
        if weight > 0
    )

    observing_schedule = []
    total_time = 0
    while schedule_can_be_filled(
        total_time, schedule_length_hrs, sb_durations_min
    ):
        sb_type = rng.choice(sb_keys, p=sb_weights_norm)
        sb_config = scheduling_block_types_config[sb_type]
        sb_instance_time = sb_config["scheduling_block_instance_time_hrs"]

        # Rejection step: redraw when this block would overrun the schedule.
        if scheduling_block_too_long(
            total_time, schedule_length_hrs, sb_instance_time
        ):
            continue

        total_time += sb_instance_time
        observing_schedule.append(
            {
                "SB_type": sb_type,
                # Use the configured integration time, consistent with
                # cycle_scheduling_blocks; fall back to the instance time
                # for configurations that do not define it.
                "integration_time_hrs": sb_config.get(
                    "integration_time_hrs", sb_instance_time
                ),
                "scheduling_block_instance_time_hrs": sb_instance_time,
                "short_name": sb_config["short_name"],
            }
        )

    return observing_schedule
def scheduling_block_too_long(
    current_schedule_length, target_schedule_length, block_duration
):
    """Tell whether appending a block would overrun the target length.

    Args:
        current_schedule_length (float): Current length of the schedule
            (hrs).
        target_schedule_length (float): Target length of the schedule (hrs).
        block_duration (float): Duration of the scheduling block (hrs).

    Returns:
        bool: True if the schedule length after adding the block would be
            strictly greater than the target length, False otherwise.
    """
    projected_length = current_schedule_length + block_duration
    return projected_length > target_schedule_length
def schedule_can_be_filled(
    current_schedule_length, target_schedule_length, minimum_block_duration
):
    """Decide whether another scheduling block can still be placed.

    Two conditions are combined: the schedule has not yet reached its target
    length (``schedule_incomplete``), and the time left is enough for the
    smallest scheduling block
    (``schedule_remainder_fits_smallest_block``).

    Args:
        current_schedule_length (float): Current length of the schedule
            (hrs).
        target_schedule_length (float): Target length of the schedule (hrs).
        minimum_block_duration (float): Minimum duration of a scheduling
            block (hrs).

    Returns:
        bool: True if the schedule can be filled with scheduling blocks,
            False otherwise.
    """
    # Both checks are evaluated up front, before either result is inspected.
    has_gap = schedule_incomplete(
        current_schedule_length, target_schedule_length
    )
    smallest_block_fits = schedule_remainder_fits_smallest_block(
        current_schedule_length, target_schedule_length, minimum_block_duration
    )

    if not has_gap:
        return has_gap
    return smallest_block_fits
def schedule_remainder_fits_smallest_block(
    current_schedule_length, target_schedule_length, minimum_block_duration
):
    """Check whether the time left in the schedule covers the smallest block.

    Args:
        current_schedule_length (float): Current length of the schedule
            (hrs).
        target_schedule_length (float): Target length of the schedule (hrs).
        minimum_block_duration (float): Minimum duration of a scheduling
            block (hrs).

    Returns:
        bool: True if the target length minus the current length is at
            least the minimum block duration, False otherwise.
    """
    return (
        target_schedule_length - current_schedule_length
        >= minimum_block_duration
    )
def schedule_incomplete(current_schedule_length, target_schedule_length):
    """Tell whether the schedule is still shorter than its target length.

    Args:
        current_schedule_length (float): Current length of the schedule
            (hrs).
        target_schedule_length (float): Target length of the schedule (hrs).

    Returns:
        bool: True if the target length is strictly greater than the current
            length, False otherwise.
    """
    return target_schedule_length > current_schedule_length
def fill_observing_schedule_df(scheduling_blocks):
    """Turn a list of scheduling blocks into an observing schedule table.

    The blocks are loaded into a DataFrame, which is then passed through
    ``add_observing_schedule_id_columns``.

    Args:
        scheduling_blocks (list): List of scheduling blocks.

    Returns:
        observing_schedule_df (pd.DataFrame): Observing schedule DataFrame.
    """
    schedule_table = pd.DataFrame(scheduling_blocks)
    return add_observing_schedule_id_columns(schedule_table)
def add_observing_schedule_id_columns(observing_schedule_df):
    """Add SB_ID and SBI_ID columns to the observing schedule DataFrame.

    SB_ID identifies a Scheduling Block and has the format
    ``{short_name}_{00X}``, where ``short_name`` is taken from the
    "short_name" column and ``00X`` is a zero-padded counter kept separately
    for each short name.

    SBI_ID identifies a Scheduling Block instance and has the format
    ``{SB_ID}_{00X}``, where ``00X`` is a zero-padded counter of the
    instances belonging to that Scheduling Block.

    Rows are processed in order, independently for each short name. The
    instance durations are accumulated; once the accumulated duration
    reaches the row's integration time, the Scheduling Block is considered
    complete: the SB counter is incremented and the instance counter and the
    accumulated duration are reset. Otherwise only the instance counter is
    incremented.

    The DataFrame is modified in place and also returned. Rows whose
    "short_name" is missing get NaN in both ID columns.

    Args:
        observing_schedule_df (pd.DataFrame): Observing schedule DataFrame.
            Must contain the columns "short_name",
            "scheduling_block_instance_time_hrs" and "integration_time_hrs".

    Returns:
        observing_schedule_df (pd.DataFrame): Observing schedule DataFrame
            with ID columns added.
    """
    # Durations are compared in seconds; keep them as local Series instead
    # of writing temporary columns onto the caller's DataFrame.
    instance_seconds = (
        observing_schedule_df["scheduling_block_instance_time_hrs"]
        * SECONDS_IN_1HR
    )
    integration_seconds = (
        observing_schedule_df["integration_time_hrs"] * SECONDS_IN_1HR
    )
    short_names = observing_schedule_df["short_name"]

    n_rows = len(observing_schedule_df)
    sb_ids = [np.nan] * n_rows
    sbi_ids = [np.nan] * n_rows

    # Per short name: [sb_id_counter, sbi_id_counter, cumulative_duration].
    # A single pass in row order is equivalent to handling each short-name
    # group separately, because the groups do not share any state.
    progress = {}
    rows = zip(short_names, instance_seconds, integration_seconds)
    for position, (short_name, instance, integration) in enumerate(rows):
        if pd.isna(short_name):
            # No short name to build an ID from; leave both IDs as NaN.
            continue

        state = progress.setdefault(short_name, [0, 0, 0])
        sb_id_counter, sbi_id_counter, _ = state

        sb_ids[position] = f"{short_name}_{sb_id_counter:03d}"
        sbi_ids[position] = (
            f"{short_name}_{sb_id_counter:03d}_{sbi_id_counter:03d}"
        )

        state[2] += instance
        if state[2] >= integration:
            # Scheduling Block complete: start the next one from scratch.
            state[:] = [sb_id_counter + 1, 0, 0]
        else:
            state[1] = sbi_id_counter + 1

    # Assign whole columns at once rather than growing them cell by cell.
    observing_schedule_df["SB_ID"] = sb_ids
    observing_schedule_df["SBI_ID"] = sbi_ids
    return observing_schedule_df