Source code for rascil.workflows.rsexecute.calibration.calibration_rsexecute

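"""
Workflow for calibrating a list of visibilities against model visibilities,
with each processing step wrapped by rsexecute.
"""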

__all__ = ["calibrate_list_rsexecute_workflow"]

import logging
import os

import numpy as np
from ska_sdp_func_python.calibration import (
    apply_calibration_chain,
    dp3_gaincal,
    solve_calibrate_chain,
)
from ska_sdp_func_python.visibility import (
    concatenate_visibility,
    divide_visibility,
    integrate_visibility_by_channel,
)

from rascil.processing_components.parameters import rascil_path
from rascil.workflows.rsexecute.execution_support.rsexecute import rsexecute

log = logging.getLogger("rascil-logger")


def calibrate_list_rsexecute_workflow(
    vis_list,
    model_vislist,
    gt_list=None,
    calibration_context="TG",
    controls=None,
    global_solution=True,
    **kwargs,
):
    """Create a set of components for (optionally global) calibration of a list of visibilities

    If global solution is true (and there is more than one visibility) then
    each visibility is divided by its model, the results are concatenated
    along frequency and integrated by channel into a single visibility data
    set, which is then self-calibrated. The resulting single gaintable is
    applied to every visibility set.

    If global solution is false (or only one visibility is given) then each
    visibility is solved against its own model and corrected with its own
    solution.

    If ``calibrate_with_dp3`` is set in kwargs, ``dp3_gaincal`` is used for
    every visibility instead and no gaintables are returned.

    Every processing step is wrapped with ``rsexecute.execute``.

    :param vis_list: list of visibilities (or graph)
    :param model_vislist: list of model visibilities (or graph); indexed in
        step with vis_list
    :param gt_list: Optional list of existing gaintables passed to the solver.
        Only the first entry is used for a global solution; otherwise entry i
        is used for visibility i. None or an empty list means none are passed.
    :param calibration_context: String giving terms to be calibrated e.g. 'TGB'
    :param controls: Calibration controls dictionary
    :param global_solution: Solve for global gains
    :param kwargs: Parameters for functions in components. Keys read here:
        ``iteration`` (default 0), ``tol`` (default 1e-6),
        ``calibrate_with_dp3`` (default False) and ``input_dp3_skymodel``
        (default None, meaning ``rascil_path("test.skymodel")``)
    :return: list of calibrated vis, list of dictionaries of gaintables
        (one entry for a global solution, one per visibility otherwise);
        with DP3 the second element is None
    :raises RuntimeError: if DP3 calibration is requested and the sky model
        file does not exist
    """
    if global_solution:
        log.info("calibration_solve: Performing global solution of gains for all vis")
    else:
        log.info(
            "calibration_solve: Performing separate solution of gains for each vis"
        )

    # These do not change between tasks, so look them up once.
    iteration = kwargs.get("iteration", 0)
    tol = kwargs.get("tol", 1e-6)

    def calibration_solve(vis, modelvis=None, gt=None):
        return solve_calibrate_chain(
            vis,
            modelvis,
            gaintables=gt,
            calibration_context=calibration_context,
            controls=controls,
            iteration=iteration,
            tol=tol,
        )

    def calibration_apply(vis, gt):
        # Explicit check rather than an `assert` statement so the guard is
        # still active when Python runs with -O; the exception type is
        # the same as before.
        if gt is None:
            raise AssertionError("calibration_apply: no gaintable to apply")
        # Work on a deep copy of the input visibility.
        new_vis = vis.copy(deep=True)
        return apply_calibration_chain(
            new_vis,
            gt,
            calibration_context=calibration_context,
            controls=controls,
            iteration=iteration,
            inverse=True,
        )

    if kwargs.get("calibrate_with_dp3", False):
        log.info(
            "Using DP3 for selfcal. Please note that the gain tables are not "
            "available in a RASCIL-known format when using this technique"
        )

        skymodel = kwargs.get("input_dp3_skymodel", None)
        if skymodel is None:
            skymodel = rascil_path("test.skymodel")

        if not os.path.exists(skymodel):
            raise RuntimeError(
                f"The file {skymodel} needed for DP3 calibration is not found"
            )

        return (
            [
                rsexecute.execute(dp3_gaincal, nout=1)(
                    v, calibration_context, global_solution, skymodel
                )
                for v in vis_list
            ],
            None,
        )

    # None and an empty list both mean "no starting gaintables".
    has_initial_gt = gt_list is not None and len(gt_list) > 0

    # Here we do a global solution over all vis and channels or
    # just solutions per vis
    if global_solution and (len(vis_list) > 1):
        # The conversion is a no op if it's actually a vis
        point_vislist = [
            rsexecute.execute(divide_visibility, nout=1)(vis, model_vislist[i])
            for i, vis in enumerate(vis_list)
        ]
        global_point_vis_list = rsexecute.execute(concatenate_visibility, nout=1)(
            point_vislist, dim="frequency"
        )
        global_point_vis_list = rsexecute.execute(
            integrate_visibility_by_channel, nout=1
        )(global_point_vis_list)

        # This is a global solution so we only compute one gain table.
        # No model is passed: the visibilities were already divided by it.
        solve_kwargs = {"gt": gt_list[0]} if has_initial_gt else {}
        new_gt_list = [
            rsexecute.execute(calibration_solve, pure=True, nout=1)(
                global_point_vis_list, **solve_kwargs
            )
        ]

        return (
            [
                rsexecute.execute(calibration_apply, nout=1)(v, new_gt_list[0])
                for v in vis_list
            ],
            new_gt_list,
        )

    # Separate solution for each visibility against its own model.
    new_gt_list = []
    for i, vis in enumerate(vis_list):
        solve_args = [vis, model_vislist[i]]
        if has_initial_gt:
            solve_args.append(gt_list[i])
        new_gt_list.append(
            rsexecute.execute(calibration_solve, pure=True, nout=1)(*solve_args)
        )

    # NOTE(review): unlike the global branch, this apply step is wrapped
    # without nout=1. Left as it was; confirm whether that is intentional.
    return (
        [
            rsexecute.execute(calibration_apply)(vis, gt)
            for vis, gt in zip(vis_list, new_gt_list)
        ],
        new_gt_list,
    )