"""Module for reading and processing input files."""
import base64
import itertools as itt
import json
import logging
import operator as op
from collections import defaultdict
from pathlib import Path
import pandas as pd
from pydantic import ValidationError
from ska_sdp_resource_model.simulate.config_models import (
HardwareConfigModel,
ObservationSchedule,
PipelinesConfigModel,
SchedulingBlockConfigModel,
)
from ska_sdp_resource_model.simulate.generate_inputs import (
add_observing_schedule_id_columns,
generate_observing_schedule,
)
# Use a module-scoped logger rather than the root logger so that records
# carry this module's name and can be filtered or configured independently.
# Records still propagate to handlers attached to the root logger.
logger = logging.getLogger(__name__)

# Default locations of the packaged data files, resolved relative to this
# module's location on disk.
DEFAULT_DATA_PATH = Path(__file__).parent.parent / "data"
DEFAULT_CONFIG_PATH = DEFAULT_DATA_PATH / "config"
DEFAULT_SCHEDULES_PATH = DEFAULT_DATA_PATH / "schedules"

# Default input files used when callers do not supply their own paths.
HARDWARE_CONFIG_PATH = DEFAULT_CONFIG_PATH / "hardware.json"
PIPELINES_CONFIG_PATH = DEFAULT_CONFIG_PATH / "pipelines.json"
OBSERVING_SCHEDULE_PATH = DEFAULT_SCHEDULES_PATH / "observing_schedule.json"
SCHEDULING_BLOCK_TYPES_CONFIG_PATH = (
    DEFAULT_CONFIG_PATH / "scheduling_block_types.json"
)
[docs]
def load_config(file_path, expected_keys=None, config_model=None):
    """Read a JSON configuration file and optionally validate it.

    Args:
        file_path (str or Path):
            Location of the JSON configuration file.
        expected_keys (list):
            Optional keys that every item in the configuration must have
            (and no others). Only checked when the loaded configuration is
            non-empty.
        config_model (BaseModel):
            Optional Pydantic model. When given, the loaded data is passed
            to it as keyword arguments and the model's ``model_dump()``
            output is returned instead of the raw JSON data.

    Returns:
        config (dict): Configuration data loaded from file_path.

    Raises:
        RuntimeError: If the file does not exist, is not valid JSON, fails
            Pydantic validation, or fails the expected-keys check.
    """
    file_path = Path(file_path)

    try:
        config = json.loads(file_path.read_text(encoding="utf-8"))
        logger.debug("Configuration loaded from %s.", file_path)

        if config_model:
            config = config_model(**config).model_dump()
    except (json.JSONDecodeError, FileNotFoundError, ValidationError) as error:
        raise RuntimeError(
            f"Error loading configuration from {file_path}: {error}"
        ) from error

    # The schema check raises its own RuntimeError, which is never one of the
    # exception types handled above, so it can live outside the try block.
    if expected_keys and config:
        validate_config_schema(config, expected_keys, file_path)

    return config
[docs]
def read_base64_contents(file_path_or_contents):
    """Decode Base64-encoded JSON contents and return the parsed data.

    The input is normally a data-URI style string of the form
    ``"<prefix>,<base64 payload>"``; the segment after the first comma is
    decoded. A bare Base64 string without any prefix is also accepted.

    Args:
        file_path_or_contents (str): A Base64-encoded string of the JSON
            file, optionally preceded by a comma-terminated prefix.

    Returns:
        dict: The decoded JSON data (a dictionary when the top-level JSON
            value is an object).
    """
    segments = file_path_or_contents.split(",")
    # With a prefix the payload is the second segment; without one (no comma
    # present) the whole string is the payload. The standard Base64 alphabet
    # contains no comma, so a payload can never be split by mistake.
    content_string = segments[1] if len(segments) > 1 else segments[0]
    decoded = base64.b64decode(content_string)
    return json.loads(decoded)
[docs]
def validate_config_schema(config, expected_keys, file_path):
    """Check that every item in a configuration has exactly the expected keys.

    Items are checked in order; the first item with a problem aborts the
    validation. Missing keys are reported in preference to unrecognised
    keys when an item has both.

    Args:
        config (dict):
            Configuration data to validate. Each value is one item whose
            keys are checked.
        expected_keys (list):
            List of expected keys for each item in the configuration data.
        file_path (str or Path):
            Path to the configuration file (used in error messages only).

    Returns:
        None: The function returns normally only if every item is valid.

    Raises:
        RuntimeError: If any item lacks an expected key or contains a key
            that is not in ``expected_keys``.
    """
    for item in config.values():
        missing_keys = [key for key in expected_keys if key not in item]
        if missing_keys:
            raise RuntimeError(f"Missing keys: {missing_keys} in {file_path}")

        # Only scan for unknown keys once the item is known to be complete.
        unrecognized_keys = [key for key in item if key not in expected_keys]
        if unrecognized_keys:
            raise RuntimeError(
                f"Unrecognised keys: {unrecognized_keys} in {file_path}"
            )
[docs]
def add_pipeline_config_to_scheduling_blocks(
    scheduling_block_types_config, pipelines_config
):
    """Attach pipeline configurations to each scheduling block type.

    For every scheduling block type, the "pipeline_steps" list of pipeline
    names is replaced by a dictionary mapping each name to its sanitised
    pipeline configuration. A name with no entry in the pipelines
    configuration maps to an empty dictionary.

    Note that ``scheduling_block_types_config`` is modified in place; the
    same object is returned.

    Args:
        scheduling_block_types_config (dict):
            Scheduling block types configuration.
        pipelines_config (dict):
            Pipelines configuration.

    Returns:
        scheduling_block_types_config (dict):
            Scheduling block types configuration with pipeline configurations.
    """
    sanitised_pipelines = sanitise_pipelines_config(**pipelines_config)

    for block_config in scheduling_block_types_config.values():
        step_names = block_config.get("pipeline_steps", [])
        steps = {}
        for step_name in step_names:
            steps[step_name] = sanitised_pipelines.get(step_name, {})
        block_config["pipeline_steps"] = steps

    return scheduling_block_types_config
[docs]
def sanitise_pipelines_config(**config):
    """Sanitise pipeline configurations for resource usage simulation before
    scheduling.

    Each pipeline's configuration is passed through
    ``sanitise_pipeline_config``, which ensures the required keys
    "node_hours" and "pct_parallelism" exist (defaulting to the values of
    "node_hours_mean" and "pct_parallelism_min") and removes the superfluous
    keys {"node_hours_mean", "node_hours_uncertainty", "pct_parallelism_min",
    "pct_parallelism_max"}.

    Args:
        config (dict): Pipeline configurations, keyed by pipeline name.

    Returns:
        dict: Sanitised pipeline configurations, keyed by pipeline name.
    """
    sanitised = {}
    for pipeline_name, pipeline_settings in config.items():
        sanitised[pipeline_name] = sanitise_pipeline_config(
            **pipeline_settings
        )
    return sanitised
[docs]
def sanitise_pipeline_config(**config):
    """Sanitise the configuration of one pipeline.

    Guarantees that "node_hours" and "pct_parallelism" are present, copying
    them from "node_hours_mean" and "pct_parallelism_min" respectively when
    absent, then drops the keys that are not needed by the simulator:
    {"node_hours_mean", "node_hours_uncertainty", "pct_parallelism_min",
    "pct_parallelism_max"}. An empty configuration is returned unchanged.

    Args:
        config (dict): Pipeline configuration.

    Returns:
        dict: Sanitised pipeline configuration.

    Raises:
        KeyError: If a required key is absent and so is its fallback key.
    """
    if not config:
        return config

    # required key -> key whose value is used when the required one is absent
    fallbacks = {
        "node_hours": "node_hours_mean",
        "pct_parallelism": "pct_parallelism_min",
    }
    for required_key, fallback_key in fallbacks.items():
        if required_key not in config:
            config[required_key] = config[fallback_key]

    # keys that are not needed for ResourceUsageSimulator
    superfluous = {
        "node_hours_mean",
        "node_hours_uncertainty",
        "pct_parallelism_min",
        "pct_parallelism_max",
    }
    return {
        key: value for key, value in config.items() if key not in superfluous
    }
[docs]
def get_observations_config(observing_schedule, scheduling_block_types_config):
    """Get list of configurations for all observations in the observing
    schedule.

    Observations are laid out back to back: each one starts ("Start") where
    the previous one ended ("End"), beginning at time 0, and lasts for its
    scheduling block type's "scheduling_block_instance_time_hrs".

    GB values are rounded to the nearest whole number.

    Args:
        observing_schedule (dict):
            Observing schedule. Its "scheduling_blocks" entry is the ordered
            sequence of scheduling block type names to observe.
        scheduling_block_types_config (dict):
            Scheduling block types configuration with pipeline configurations.

    Returns:
        observations_config (list):
            List of configurations for all observations in the observing
            schedule.

    Raises:
        KeyError: If the schedule references a scheduling block type that is
            not present in ``scheduling_block_types_config``, or if a
            required configuration key is missing.
    """
    observations_config = []
    elapsed_hrs = 0  # start time of the next observation

    for scheduling_block_type in observing_schedule["scheduling_blocks"]:
        # Fail with a message naming the offending type rather than a
        # confusing KeyError about a missing "pipeline_steps" entry.
        if scheduling_block_type not in scheduling_block_types_config:
            raise KeyError(
                f"Unknown scheduling block type {scheduling_block_type!r} "
                "in observing schedule"
            )
        scheduling_block_config = scheduling_block_types_config[
            scheduling_block_type
        ]

        total_data_products_gb = sum(
            round(pipeline["data_product_storage_gb"])
            for pipeline in scheduling_block_config["pipeline_steps"].values()
        )
        duration_hrs = scheduling_block_config[
            "scheduling_block_instance_time_hrs"
        ]

        observations_config.append(
            {
                **scheduling_block_config,
                "SB_type": scheduling_block_type,
                "total_data_products_gb": round(total_data_products_gb),
                "raw_vis_gb": round(scheduling_block_config["raw_vis_gb"]),
                "processed_vis_gb": round(
                    scheduling_block_config["processed_vis_gb"]
                ),
                "Start": elapsed_hrs,
                "End": elapsed_hrs + duration_hrs,
            }
        )
        elapsed_hrs += duration_hrs

    # Round-trip through a DataFrame so the project helper can add the
    # schedule ID columns, then convert back to one dict per observation.
    observing_schedule_df = pd.DataFrame(observations_config)
    observing_schedule_df = add_observing_schedule_id_columns(
        observing_schedule_df
    )
    return observing_schedule_df.to_dict(orient="records")
[docs]
def get_observations(
    pipelines_config,
    observing_schedule_path=OBSERVING_SCHEDULE_PATH,
    scheduling_block_types_config_path=SCHEDULING_BLOCK_TYPES_CONFIG_PATH,
    generate_observing_schedule_hrs=0,
):
    """Build the list of observations with their pipeline configurations.

    Loads the scheduling block types file and adds the pipeline
    configurations to the scheduling blocks. The observing schedule is then
    either generated (when ``generate_observing_schedule_hrs`` is positive)
    or read from ``observing_schedule_path``.

    Args:
        pipelines_config (dict):
            The pipelines configuration.
        observing_schedule_path (str or Path):
            Path to the observing schedule file. Only read when no schedule
            is generated.
        scheduling_block_types_config_path (str or Path):
            Path to the scheduling block types file.
        generate_observing_schedule_hrs (int or float):
            When greater than 0, a schedule totalling this many hours of
            observations is generated instead of being read from file.

    Returns:
        observations (list):
            The observations schedule with pipeline configurations.
    """
    blocks_config = load_config(
        scheduling_block_types_config_path,
        config_model=SchedulingBlockConfigModel,
    )
    blocks = add_pipeline_config_to_scheduling_blocks(
        blocks_config, pipelines_config
    )

    should_generate = generate_observing_schedule_hrs > 0
    if not should_generate:
        observing_schedule = load_config(
            observing_schedule_path, config_model=ObservationSchedule
        )
    else:
        logger.debug(
            "Generating mixture of scheduling blocks totalling %d hours of "
            "observations...",
            generate_observing_schedule_hrs,
        )
        generated_df = generate_observing_schedule(
            blocks_config,
            schedule_length_hrs=generate_observing_schedule_hrs,
            seed=None,
        )
        block_types = generated_df["SB_type"].tolist()
        observing_schedule = {"scheduling_blocks": block_types}

    return get_observations_config(observing_schedule, blocks)
[docs]
def load_pipelines_config(path):
    """Load and validate the configuration of the simulated pipelines.

    Args:
        path (Path): Path to json file containing pipeline configuration data.

    Returns:
        pipelines_config (dict): Pipelines configuration data, validated
            against ``PipelinesConfigModel``.
    """
    pipelines_config = load_config(path, config_model=PipelinesConfigModel)
    return pipelines_config
[docs]
def load_hardware_configs(hardware_config_path=HARDWARE_CONFIG_PATH):
    """Load and validate the hardware configuration data.

    Args:
        hardware_config_path (str or Path):
            Path to json file containing hardware configuration data.
            Defaults to the packaged ``hardware.json``.

    Returns:
        hardware_config (dict): Hardware configuration data, validated
            against ``HardwareConfigModel``.
    """
    hardware_config = load_config(
        hardware_config_path, config_model=HardwareConfigModel
    )
    return hardware_config
[docs]
def get_hardware_config_options(hardware_config_path):
    """Get hardware configuration options for plotting.

    Args:
        hardware_config_path (str or Path):
            Path to json file containing hardware configuration data.

    Returns:
        list: One ``{"label": name, "value": name}`` dictionary per top-level
            key of the hardware configuration, sorted by label.
    """
    hardware_config = load_config(
        hardware_config_path, config_model=HardwareConfigModel
    )
    # Label and value are both the configuration name, so ordering the names
    # first gives the same result as sorting the option dicts by label.
    return [
        {"label": name, "value": name} for name in sorted(hardware_config)
    ]
[docs]
def get_processing_blocks(scheduling_block_types_config, pipelines_config):
    """Tabulate storage and compute requirements per processing block.

    Combines the scheduling block type and pipeline configurations into a
    DataFrame with one row per (scheduling block type, pipeline step) pair.
    Pipeline steps without an entry in the pipelines configuration are
    dropped by the inner merge. Columns are included for the serial and
    parallel node hours required.

    This is used for plotting resource requirements independently of the
    simulation. Actual runtimes in the simulation will depend on the number
    of nodes available to run each pipeline (the parallel node hours will be
    divided by the number of nodes allocated to a pipeline).

    Args:
        scheduling_block_types_config (dict): Scheduling block configuration.
        pipelines_config (dict): Pipeline configuration.

    Returns:
        processing_blocks (pd.DataFrame): DataFrame of processing blocks.
    """
    block_columns = [
        "sb_type",
        "pipeline_steps",
        "raw_vis_gb",
        "processed_vis_gb",
    ]
    pipeline_columns = [
        "pipeline_steps",
        "node_hours_mean",
        "pct_parallelism_min",
        "data_product_storage_gb",
    ]

    blocks = pd.DataFrame.from_dict(
        scheduling_block_types_config, orient="index"
    )
    pipelines = pd.DataFrame.from_dict(pipelines_config, orient="index")

    # One row per pipeline step, with the block type name as a column.
    blocks = blocks.explode("pipeline_steps").reset_index(names="sb_type")
    blocks = blocks.filter(items=block_columns)

    # Pipeline name becomes the join key column.
    pipelines = pipelines.reset_index(names="pipeline_steps")
    pipelines = pipelines.filter(items=pipeline_columns)

    processing_blocks_df = blocks.merge(pipelines, on="pipeline_steps")

    node_hours = processing_blocks_df["node_hours_mean"]
    parallel_fraction = processing_blocks_df["pct_parallelism_min"] / 100

    # Hours for serial processing (will not change with number of nodes used)
    processing_blocks_df["serial_node_hours"] = node_hours * (
        1 - parallel_fraction
    )
    # Hours for parallel processing (will change with number of nodes used)
    processing_blocks_df["parallel_node_hours"] = node_hours * (
        parallel_fraction
    )
    return processing_blocks_df
[docs]
def update_observations(observations, pipelines_config):
    """Refresh the pipeline step configurations of each observation.

    Every pipeline step of every observation is updated in place with the
    matching entry of ``pipelines_config``; keys not present in the new
    configuration are left as they were.

    Args:
        observations (list):
            A list of observations to update.
        pipelines_config (dict):
            A dictionary containing the configuration of each pipeline.

    Returns:
        updated_observations (list): The same list object, with its
            observations updated.

    Raises:
        KeyError: If an observation uses a pipeline that is missing from
            ``pipelines_config``.
    """
    for observation in observations:
        steps = observation["pipeline_steps"]
        for step_name, step_config in steps.items():
            step_config.update(pipelines_config[step_name])
    return observations
[docs]
def get_scheduling_blocks_data(observation_list):
    """Aggregate scheduling block instances by scheduling block ID.

    Instances are sorted and grouped by "SB_ID" using python builtins. For
    each group the visibility and data product sizes are summed, the
    instances are counted and the longest data retention time is kept.
    According to the original authors this implementation is around
    ~500-800 times faster than the previous one based on pandas DataFrames.

    Args:
        observation_list (list):
            Sequence of scheduling blocks with pipeline configurations.

    Returns:
        scheduling_blocks_grouped (dict):
            Dictionary containing scheduling block data, keyed by "SB_ID"
            in ascending order.
    """
    by_sb_id = op.itemgetter("SB_ID")
    ordered = sorted(observation_list, key=by_sb_id)  # groupby needs sorting

    scheduling_blocks_grouped = defaultdict(lambda: defaultdict(float))
    for sb_id, instances in itt.groupby(ordered, by_sb_id):
        totals = {
            "raw_vis_gb": 0,
            "processed_vis_gb": 0,
            "total_data_products_gb": 0,
        }
        retention_hrs = []
        num_instances = 0
        for num_instances, instance in enumerate(instances, start=1):
            for field in totals:
                totals[field] += instance[field]
            retention_hrs.append(instance["data_retention_hrs"])

        scheduling_blocks_grouped[sb_id] = {
            "data_retention_hrs": max(retention_hrs),
            "num_scheduling_block_instances": num_instances,
            "total_data_products_gb": totals["total_data_products_gb"],
            "processed_vis_gb": totals["processed_vis_gb"],
            "raw_vis_gb": totals["raw_vis_gb"],
        }
    return scheduling_blocks_grouped