Source code for ska_sdp_instrumental_calibration.instrumental_calibration

import copy
import logging
import tempfile

from pydantic import Field
from ska_sdp_piper.piper import CLIArgument, Pipeline, PiperBaseModel
from ska_sdp_piper.piper.constants import RUN_CLI_ARGS
from ska_sdp_piper.piper.stage import Stages
from ska_sdp_piper.piper.utils.io_utils import read_yml, write_yml

from . import __version__
from .data_managers.sdm import prepare_qa_path
from .scheduler import InstrumentalDaskRunner
from .stages import (
    bandpass_calibration_stage,
    bandpass_initialisation_stage,
    delay_calibration_stage,
    export_gaintable_stage,
    export_visibilities_stage,
    flag_gain_stage,
    generate_channel_rm_stage,
    ionospheric_delay_stage,
    load_data_stage,
    predict_vis_stage,
    smooth_gain_solution_stage,
)


class ExperimentalConfig(PiperBaseModel):
    """
    Configuration model for the ``experimental`` section of the global
    parameters.

    The ``experimental`` sub-command in this module reads the
    ``pipeline`` entry as an ordered list of single-key mappings, each
    mapping a stage name to that stage's configuration (which may be
    empty).
    """

    # Ordered stage list; defaults to a fresh empty list per instance.
    pipeline: list = Field(default_factory=list)
class GlobalConfig(PiperBaseModel):
    """
    Global configuration model handed to the pipeline as
    ``global_config_model``.

    It currently carries only the ``experimental`` section.
    """

    # Built through a factory so every instance gets its own
    # ExperimentalConfig object.
    experimental: ExperimentalConfig = Field(
        default_factory=ExperimentalConfig
    )
# No name is passed, so this is the root logger.
logger = logging.getLogger()

# Positional CLI argument: one or more input visibility paths.
input_cli_arg = CLIArgument(
    "input_ms",
    nargs="+",
    type=str,
    help="Input visibility path(s)",
)

# Optional CLI argument, stored under ``sdm_path``; None when not given.
sdm_cli_arg = CLIArgument(
    "--sdm-path",
    dest="sdm_path",
    type=str,
    default=None,
    help="Directory path to store the Science Data Models",
)

"""
This is the entrypoint for instrumental calibration pipeline.
"""
# The stages are registered in the order listed below. The ``run`` command
# is overridden so that it also accepts the two CLI arguments defined
# above and uses InstrumentalDaskRunner as its runner.
ska_sdp_instrumental_calibration = (
    Pipeline(
        "ska_sdp_instrumental_calibration",
        load_data_stage,
        predict_vis_stage,
        bandpass_initialisation_stage,
        bandpass_calibration_stage,
        delay_calibration_stage,
        flag_gain_stage,
        ionospheric_delay_stage,
        generate_channel_rm_stage,
        smooth_gain_solution_stage,
        export_visibilities_stage,
        export_gaintable_stage,
        global_config_model=GlobalConfig,
        version=__version__,
    )
    .with_qa_path_resolver(prepare_qa_path)
    .overide_run(
        input_cli_arg,
        sdm_cli_arg,
        runner=InstrumentalDaskRunner,
    )
)
@ska_sdp_instrumental_calibration.sub_command(
    "experimental",
    *RUN_CLI_ARGS,
    input_cli_arg,
    sdm_cli_arg,
    *InstrumentalDaskRunner.cli_args(),
    help="Allows reordering of stages via additional config section",
)
def experimental(**cli_args):
    """
    Reorder stages of INST pipeline.

    Use the config section ``global_parameters.experimental.pipeline``
    to provide the order of calibration stages. Each entry of that list
    is a single-key mapping ``{stage_name: stage_config}``. When
    ``stage_config`` is empty, the stage's entry from the top-level
    ``parameters`` section (if any) is used instead. A stage listed more
    than once is deep-copied and renamed ``<stage_name>_<n>`` for its
    n-th repetition.

    When a non-empty order is given, the pipeline's stages are replaced
    by the listed ones, and a temporary YAML config with rewritten
    ``parameters`` and ``pipeline`` sections is written and passed to
    the run in place of the original config path. The temporary file is
    not removed by this function.

    NOTE(review): the original documentation states that load data,
    predict and export stages are not reorder-able, but nothing in this
    function enforces that; the listed stages replace the whole stage
    list.

    Parameters
    ----------
    cli_args: varargs
        CLI arguments. ``config_path`` must be present as a key (it may
        be empty/None, in which case the default stage order is used).

    Raises
    ------
    KeyError
        If an entry names a stage that is not part of the pipeline.
    """
    stage_mapping = {
        stage.name: stage
        for stage in ska_sdp_instrumental_calibration._stages
    }

    logger.warning("=========== INST Experimental ============")
    if cli_args["config_path"]:
        # ``or {}`` / ``or []`` guard against an empty file or sections
        # that are present but null, which would otherwise come back as
        # None and break the lookups below.
        config = read_yml(cli_args["config_path"]) or {}
        global_parameters = config.get("global_parameters") or {}
        experimental_section = global_parameters.get("experimental") or {}
        stage_order = experimental_section.get("pipeline") or []
        parameters = config.get("parameters") or {}

        reconfigured_stages = []
        duplicate_counter = {}
        new_parameters = {}
        pipeline_state = {}

        for stage_dict in stage_order:
            # Only the first key/value pair of each entry is considered.
            stage_name, stage_config = list(stage_dict.items())[0]
            try:
                stage = stage_mapping[stage_name]
            except KeyError:
                raise KeyError(
                    f"Unknown stage '{stage_name}' in experimental "
                    f"pipeline. Available stages: {sorted(stage_mapping)}"
                ) from None

            if stage in reconfigured_stages:
                # Repeated stage: work on an independent, uniquely named
                # copy so it can carry its own parameters.
                stage = copy.deepcopy(stage)
                duplicate_counter[stage_name] = (
                    duplicate_counter.get(stage_name, 0) + 1
                )
                new_stage_name = (
                    f"{stage_name}_{duplicate_counter[stage_name]}"
                )
                stage.name = new_stage_name

            if stage_config:
                new_parameters[stage.name] = stage_config
            elif stage_config := parameters.get(stage_name):
                # Fall back to the stage's regular parameters; copy them
                # so repeated stages do not share one mutable object.
                new_parameters[stage.name] = copy.deepcopy(stage_config)

            pipeline_state[stage.name] = True
            reconfigured_stages.append(stage)

        if reconfigured_stages:
            stages = Stages(*reconfigured_stages)
            ska_sdp_instrumental_calibration._stages = stages
            config["parameters"] = new_parameters
            config["pipeline"] = pipeline_state

            # Reserve a unique path and close the handle right away so
            # no file descriptor is leaked; delete=False keeps the file
            # on disk for write_yml and the subsequent pipeline run.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".yml", delete=False
            ) as temp_file:
                temp_config = temp_file.name
            write_yml(temp_config, config)
            cli_args["config_path"] = temp_config
            logger.info("Created temprory experimental config %s", temp_config)
        else:
            logger.warning(
                "No stage reordering provided. Using the default stage "
                "order"
            )
    else:
        logger.warning("No Config provided. Using the default stage order")

    logger.warning("==========================================")
    ska_sdp_instrumental_calibration.run(**cli_args)