"""
"""
__all__ = ["calibrate_list_rsexecute_workflow"]
import logging
import os
import numpy as np
from ska_sdp_func_python.calibration import (
apply_calibration_chain,
dp3_gaincal,
solve_calibrate_chain,
)
from ska_sdp_func_python.visibility import (
concatenate_visibility,
divide_visibility,
integrate_visibility_by_channel,
)
from rascil.processing_components.parameters import rascil_path
from rascil.workflows.rsexecute.execution_support.rsexecute import rsexecute
log = logging.getLogger("rascil-logger")
[docs]
def calibrate_list_rsexecute_workflow(
vis_list,
model_vislist,
gt_list=None,
calibration_context="TG",
controls=None,
global_solution=True,
**kwargs,
):
"""
Create a set of components for (optionally global) calibration of a
list of visibilities
If global solution is true then visibilities are gathered to a single
visibility data set which is then self-calibrated. The resulting gaintable
is then effectively scattered out for application to each visibility
set. If global solution is false then the solutions are performed locally.
:param vis_list: list of visibilities (or graph)
:param model_vislist: list of model visibilities (or graph)
:param calibration_context: String giving terms to be calibrated e.g. 'TGB'
:param controls: Calibration controls dictionary
:param global_solution: Solve for global gains
:param kwargs: Parameters for functions in components
:return: list of calibrated vis, list of dictionaries of gaintables
"""
if global_solution:
log.info("calibration_solve: Performing global solution of gains for all vis")
else:
log.info(
"calibration_solve: Performing separate solution of gains for each vis"
)
def calibration_solve(vis, modelvis=None, gt=None):
return solve_calibrate_chain(
vis,
modelvis,
gaintables=gt,
calibration_context=calibration_context,
controls=controls,
iteration=kwargs.get("iteration", 0),
tol=kwargs.get("tol", 1e-6),
)
def calibration_apply(vis, gt):
assert gt is not None
new_vis = vis.copy(deep=True)
return apply_calibration_chain(
new_vis,
gt,
calibration_context=calibration_context,
controls=controls,
iteration=kwargs.get("iteration", 0),
inverse=True,
)
if kwargs.get("calibrate_with_dp3", False):
log.info(
"Using DP3 for selfcal. Please note that the gain tables are not "
"available in a RASCIL-known format when using this technique"
)
skymodel = kwargs.get("input_dp3_skymodel", None)
if skymodel is None:
skymodel = rascil_path("test.skymodel")
if not (os.path.exists(skymodel)):
raise RuntimeError(
f"The file {skymodel} needed for DP3 calibration is not found"
)
return (
[
rsexecute.execute(dp3_gaincal, nout=1)(
v, calibration_context, global_solution, skymodel
)
for v in vis_list
],
None,
)
# Here we do a global solution over all vis and channels or
# just solutions per vis
if global_solution and (len(vis_list) > 1):
# The conversion is a no op if it's actually a vis
point_vislist = [
rsexecute.execute(divide_visibility, nout=1)(vis_list[i], model_vislist[i])
for i, _ in enumerate(vis_list)
]
global_point_vis_list = rsexecute.execute(concatenate_visibility, nout=1)(
point_vislist, dim="frequency"
)
global_point_vis_list = rsexecute.execute(
integrate_visibility_by_channel, nout=1
)(global_point_vis_list)
# This is a global solution so we only compute one gain table
if gt_list is None or len(gt_list) < 1:
new_gt_list = [
rsexecute.execute(calibration_solve, pure=True, nout=1)(
global_point_vis_list,
)
]
else:
new_gt_list = [
rsexecute.execute(calibration_solve, pure=True, nout=1)(
global_point_vis_list,
gt=gt_list[0],
)
]
return (
[
rsexecute.execute(calibration_apply, nout=1)(v, new_gt_list[0])
for v in vis_list
],
new_gt_list,
)
else:
if gt_list is not None and len(gt_list) > 0:
new_gt_list = [
rsexecute.execute(calibration_solve, pure=True, nout=1)(
v,
model_vislist[i],
gt_list[i],
)
for i, v in enumerate(vis_list)
]
else:
new_gt_list = [
rsexecute.execute(calibration_solve, pure=True, nout=1)(
v,
model_vislist[i],
)
for i, v in enumerate(vis_list)
]
return (
[
rsexecute.execute(calibration_apply)(v, new_gt_list[i])
for i, v in enumerate(vis_list)
],
new_gt_list,
)