Source code for ska_sdp_resource_model.simulate.validate_inputs

"""Module for validating simulation inputs."""

import logging

from ska_sdp_resource_model.simulate.constants import GB_IN_PB, GB_IN_TB
from ska_sdp_resource_model.simulate.process_inputs import (
    get_scheduling_blocks_data,
)

# Logger used by every validation helper in this module. It is looked up under
# the fixed name "simulation" rather than ``__name__``.
# NOTE(review): presumably this matches a logger configured by the simulation
# entry point - confirm before renaming.
logger = logging.getLogger("simulation")


def inputs_ok(observations, hardware_config):
    """Report whether both simulation inputs contain any data.

    Each empty (falsy) input is logged as an error; if at least one input is
    empty a final critical message is logged as well.

    Args:
        observations (list): Sequence of scheduling blocks with pipeline
            configurations.
        hardware_config (dict): Hardware configuration data.

    Returns:
        bool: True when both inputs are non-empty, False otherwise.
    """
    problems = []
    if not observations:
        problems.append("No observations found.")
    if not hardware_config:
        problems.append("No hardware configuration found.")

    # Report every missing input before the overall failure message.
    for problem in problems:
        logger.error(problem)

    if problems:
        logger.critical(
            "SIMULATION FAILED - check your input files contain valid data!"
        )
    return not problems
def configuration_ok(observations, hardware_config):
    """Check that the hardware can satisfy every storage and compute demand.

    The per-scheduling-block data returned by ``get_scheduling_blocks_data``
    is checked first, then each observation is checked for storage and for
    compute nodes. Every check is always executed so that each shortfall is
    logged, not just the first one found.

    Args:
        observations: Iterable of observation records.
        hardware_config (dict): Hardware configuration; the keys
            "capacity_storage_pb" and "data_product_storage_pb" are read here.

    Returns:
        bool: True if every check passed, False otherwise.
    """
    blocks = get_scheduling_blocks_data(observations)

    # A list (not a generator) so no check is skipped by short-circuiting.
    block_results = [
        scheduling_block_storage_ok(
            block_id,
            block,
            hardware_config["capacity_storage_pb"],
            hardware_config["data_product_storage_pb"],
        )
        for block_id, block in blocks.items()
    ]

    observation_results = []
    for observation in observations:
        observation_results.append(storage_ok(observation, hardware_config))
        observation_results.append(
            compute_nodes_ok(observation, hardware_config)
        )

    everything_ok = all(block_results) and all(observation_results)
    if not everything_ok:
        logger.critical(
            "SIMULATION FAILED - make sure your hardware configuration can "
            "support the requirements of your observing schedule!"
        )
    return everything_ok
def scheduling_block_storage_ok(
    sb_id, scheduling_block, hardware_capacity_pb, hardware_data_product_pb
):
    """Check capacity and data product storage for one scheduling block.

    Both checks are always performed, so a block that exceeds both limits
    produces two critical log messages.

    Args:
        sb_id: Identifier of the scheduling block; only used in log messages.
        scheduling_block (dict): Scheduling block data providing the keys
            "raw_vis_gb" and "total_data_products_gb" (requirements in GB).
        hardware_capacity_pb: Capacity storage limit in PB.
        hardware_data_product_pb: Data product storage limit in PB.

    Returns:
        bool: True if the raw visibilities fit in capacity storage and the
        data products fit in data product storage, False otherwise.
    """
    raw_vis_gb = scheduling_block["raw_vis_gb"]
    # Limits are supplied in PB; convert once and reuse for check and message.
    capacity_limit_gb = hardware_capacity_pb * GB_IN_PB
    sb_ok_raw = raw_vis_gb <= capacity_limit_gb
    if not sb_ok_raw:
        logger.critical(
            "Insufficient capacity storage for %s: requires %d GB but capacity"
            " is %d GB.",
            sb_id,
            raw_vis_gb,
            capacity_limit_gb,
        )

    data_products_gb = scheduling_block["total_data_products_gb"]
    data_product_limit_gb = hardware_data_product_pb * GB_IN_PB
    sb_ok_dp = data_products_gb <= data_product_limit_gb
    if not sb_ok_dp:
        logger.critical(
            "Insufficient data product storage for %s: requires %d GB but "
            "capacity is %d GB.",
            sb_id,
            data_products_gb,
            data_product_limit_gb,
        )

    return sb_ok_raw and sb_ok_dp
def storage_ok(observation, hardware_config):
    """Check all three storage tiers for a single observation.

    Capacity, data product and performance storage are each checked. All
    three checks always run (no short-circuiting) so that every shortfall is
    logged by the respective helper.

    Args:
        observation (dict): Observation record passed through to the
            per-tier checks.
        hardware_config (dict): Hardware configuration providing
            "capacity_storage_pb", "data_product_storage_pb" and
            "performance_storage_tb".

    Returns:
        bool: True only if all three storage checks pass.
    """
    capacity_ok = capacity_storage_ok(
        observation, hardware_config["capacity_storage_pb"]
    )
    data_product_ok = data_product_storage_ok(
        observation, hardware_config["data_product_storage_pb"]
    )
    performance_ok = performance_storage_ok(
        observation, hardware_config["performance_storage_tb"]
    )
    return bool(capacity_ok and data_product_ok and performance_ok)
def capacity_storage_ok(observation, hardware_capacity_pb):
    """Check that an observation's raw visibilities fit in capacity storage.

    Args:
        observation (dict): Observation record; "raw_vis_gb" is the
            requirement and "SBI_ID" is used in the log message.
        hardware_capacity_pb: Capacity storage limit in PB.

    Returns:
        bool: False (after logging a critical message) if the requirement
        exceeds the limit, True otherwise.
    """
    needed_gb = observation["raw_vis_gb"]
    available_gb = hardware_capacity_pb * GB_IN_PB

    if not needed_gb > available_gb:
        return True

    logger.critical(
        "Insufficient capacity storage for %s: requested %d GB but "
        "capacity is %d GB.",
        observation["SBI_ID"],
        needed_gb,
        available_gb,
    )
    return False
def data_product_storage_ok(observation, hardware_data_product_pb):
    """Check that an observation's data products fit in their storage tier.

    Args:
        observation (dict): Observation record; "total_data_products_gb" is
            the requirement and "SBI_ID" is used in the log message.
        hardware_data_product_pb: Data product storage limit in PB.

    Returns:
        bool: False (after logging a critical message) if the requirement
        exceeds the limit, True otherwise.
    """
    needed_gb = observation["total_data_products_gb"]
    available_gb = hardware_data_product_pb * GB_IN_PB

    if not needed_gb > available_gb:
        return True

    logger.critical(
        "Insufficient data product storage for %s: requested %d GB but "
        "data product storage is %d GB.",
        observation["SBI_ID"],
        needed_gb,
        available_gb,
    )
    return False
def performance_storage_ok(observation, hardware_performance_tb):
    """Check that processed visibilities fit in performance storage.

    Args:
        observation (dict): Observation record; "processed_vis_gb" is the
            requirement and "SBI_ID" is used in the log message.
        hardware_performance_tb: Performance storage limit in TB.

    Returns:
        bool: False (after logging a critical message) if the requirement
        exceeds the limit, True otherwise.
    """
    needed_gb = observation["processed_vis_gb"]
    available_gb = hardware_performance_tb * GB_IN_TB

    if not needed_gb > available_gb:
        return True

    logger.critical(
        "Insufficient performance storage for %s: requested %d GB but "
        "capacity is %d GB.",
        observation["SBI_ID"],
        needed_gb,
        available_gb,
    )
    return False
def compute_nodes_ok(observation, hardware_config):
    """Check that no pipeline step asks for more nodes than are available.

    Every step in the observation is inspected, and each step that exceeds
    the limit is logged as critical.

    Args:
        observation (dict): Observation record whose "pipeline_steps" maps a
            pipeline name to a config dict containing "num_nodes".
        hardware_config (dict): Hardware configuration; "compute_nodes" is
            the number of nodes available.

    Returns:
        bool: True if every pipeline step fits within the node limit.
    """
    nodes_available = hardware_config["compute_nodes"]
    steps_over_limit = 0

    for step_name, step_config in observation["pipeline_steps"].items():
        nodes_requested = step_config["num_nodes"]
        if not nodes_requested > nodes_available:
            continue
        logger.critical(
            "Insufficient compute nodes for %s: requested %d but only %d "
            "available.",
            step_name,
            nodes_requested,
            nodes_available,
        )
        steps_over_limit += 1

    return steps_over_limit == 0