import copy
import logging
import tempfile
from pydantic import Field
from ska_sdp_piper.piper import CLIArgument, Pipeline, PiperBaseModel
from ska_sdp_piper.piper.constants import RUN_CLI_ARGS
from ska_sdp_piper.piper.stage import Stages
from ska_sdp_piper.piper.utils.io_utils import read_yml, write_yml
from . import __version__
from .data_managers.sdm import prepare_qa_path
from .scheduler import InstrumentalDaskRunner
from .stages import (
bandpass_calibration_stage,
bandpass_initialisation_stage,
delay_calibration_stage,
export_gaintable_stage,
export_visibilities_stage,
flag_gain_stage,
generate_channel_rm_stage,
ionospheric_delay_stage,
load_data_stage,
predict_vis_stage,
smooth_gain_solution_stage,
)
[docs]
# Settings consumed by the ``experimental`` sub-command defined in this module.
class ExperimentalConfig(PiperBaseModel):
    # Ordered stage entries. The ``experimental`` sub-command reads each entry
    # as a mapping and uses its first item as ``stage_name: stage_config``.
    # Defaults to a new empty list.
    pipeline: list = Field(default_factory=list)
[docs]
# Global configuration model; handed to ``Pipeline`` in this module through the
# ``global_config_model`` keyword.
class GlobalConfig(PiperBaseModel):
    # Settings for the ``experimental`` sub-command. Defaults to a fresh
    # ``ExperimentalConfig``, i.e. an empty ``pipeline`` list.
    experimental: ExperimentalConfig = Field(
        default_factory=ExperimentalConfig
    )
# Root logger: ``getLogger`` is called without a name.
logger = logging.getLogger()

# Positional CLI argument accepting one or more input visibility paths.
input_cli_arg = CLIArgument(
    "input_ms",
    type=str,
    nargs="+",
    help="Input visibility path(s)",
)

# Optional ``--sdm-path`` flag, stored under ``sdm_path``; ``None`` when the
# flag is not given.
sdm_cli_arg = CLIArgument(
    "--sdm-path",
    type=str,
    default=None,
    dest="sdm_path",
    help="Directory path to store the Science Data Models",
)
"""
This is the entrypoint for instrumental calibration pipeline.
"""
# Module-level pipeline object. The stages are passed to ``Pipeline`` in the
# order listed below; the ``experimental`` sub-command in this module may
# replace that stage list at run time by assigning to ``_stages``.
ska_sdp_instrumental_calibration = (
    Pipeline(
        "ska_sdp_instrumental_calibration",
        load_data_stage,
        predict_vis_stage,
        bandpass_initialisation_stage,
        bandpass_calibration_stage,
        delay_calibration_stage,
        flag_gain_stage,
        ionospheric_delay_stage,
        generate_channel_rm_stage,
        smooth_gain_solution_stage,
        export_visibilities_stage,
        export_gaintable_stage,
        # Global config model declaring the ``experimental`` section.
        global_config_model=GlobalConfig,
        version=__version__,
    )
    # NOTE(review): presumably registers ``prepare_qa_path`` as the resolver
    # for the QA output path -- confirm against ska_sdp_piper.
    .with_qa_path_resolver(prepare_qa_path)
    # NOTE(review): ``overide_run`` (sic) is the method name as called here;
    # check the ska_sdp_piper API before "correcting" the spelling.
    .overide_run(
        input_cli_arg,
        sdm_cli_arg,
        runner=InstrumentalDaskRunner,
    )
)
[docs]
@ska_sdp_instrumental_calibration.sub_command(
    "experimental",
    *RUN_CLI_ARGS,
    input_cli_arg,
    sdm_cli_arg,
    *InstrumentalDaskRunner.cli_args(),
    help="Allows reordering of stages via additional config section",
)
def experimental(**cli_args):
    """
    Reorder stages of INST pipeline.

    Use the config section ``global_parameters.experimental.pipeline`` to
    provide the order of calibration stages, as a list of single-key
    mappings ``{stage_name: stage_config}``. A stage listed more than once
    is deep-copied and renamed ``<stage_name>_<n>``. A non-empty inline
    ``stage_config`` is used as the stage parameters; otherwise the entry
    for ``stage_name`` in the top-level ``parameters`` section is copied.

    When an order is provided, the pipeline's stages are replaced by exactly
    the listed stages, the rewritten config is written to a temporary
    ``.yml`` file and ``config_path`` is pointed at that file before the
    pipeline is run. The temporary file is not removed by this function.

    Load data, predict and export stages are not reorder-able.
    NOTE(review): this function does not enforce that restriction itself.

    Parameters
    ----------
    cli_args: varargs
        CLI arguments. ``config_path`` must be present; when it is empty
        the default stage order is used.

    Raises
    ------
    KeyError
        If a listed stage name is not a stage of the pipeline.
    """
    stage_mapping = {
        stage.name: stage for stage in ska_sdp_instrumental_calibration._stages
    }

    logger.warning("=========== INST Experimental ============")

    if cli_args["config_path"]:
        # ``or {}`` keeps an empty file or a null section from crashing on
        # ``.get``; such configs simply fall back to the default order.
        config = read_yml(cli_args["config_path"]) or {}
        global_parameters = config.get("global_parameters") or {}
        experimental_section = global_parameters.get("experimental") or {}
        stage_order = experimental_section.get("pipeline") or []
        parameters = config.get("parameters") or {}

        reconfigured_stages = []
        duplicate_counter = {}
        new_parameters = {}
        pipeline_state = {}

        for stage_dict in stage_order:
            stage_name, stage_config = list(stage_dict.items())[0]

            try:
                stage = stage_mapping[stage_name]
            except KeyError:
                known_stages = ", ".join(str(name) for name in stage_mapping)
                raise KeyError(
                    f"Unknown stage '{stage_name}' in "
                    "global_parameters.experimental.pipeline; "
                    f"available stages: {known_stages}"
                ) from None

            if stage in reconfigured_stages:
                # Repeated stage: run an independent copy under a unique
                # name so that it gets its own parameters and state entry.
                stage = copy.deepcopy(stage)
                duplicate_counter[stage_name] = (
                    duplicate_counter.get(stage_name, 0) + 1
                )
                stage.name = f"{stage_name}_{duplicate_counter[stage_name]}"

            if stage_config:
                new_parameters[stage.name] = stage_config
            elif default_config := parameters.get(stage_name):
                # Looked up by the original name so that renamed duplicates
                # inherit the parameters of the stage they were copied from.
                new_parameters[stage.name] = copy.deepcopy(default_config)

            pipeline_state[stage.name] = True
            reconfigured_stages.append(stage)

        if reconfigured_stages:
            ska_sdp_instrumental_calibration._stages = Stages(
                *reconfigured_stages
            )
            config["parameters"] = new_parameters
            config["pipeline"] = pipeline_state

            # The context manager closes the underlying descriptor (the
            # previous ``mkstemp`` call left it open); ``delete=False``
            # keeps the file on disk for the pipeline run.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".yml", delete=False
            ) as temp_file:
                temp_config = temp_file.name

            write_yml(temp_config, config)
            cli_args["config_path"] = temp_config
            logger.info("Created temprory experimental config %s", temp_config)
        else:
            logger.warning(
                "No stage reordering provided. Using the default stage "
                "order"
            )
    else:
        logger.warning("No Config provided. Using the default stage order")

    logger.warning("==========================================")
    ska_sdp_instrumental_calibration.run(**cli_args)