""" Pipeline functions using SkyModel. SDP standard pipelines expressed as functions.
"""
__all__ = [
"ical_skymodel_list_rsexecute_workflow",
"continuum_imaging_skymodel_list_rsexecute_workflow",
"spectral_line_imaging_skymodel_list_rsexecute_workflow",
]
import logging
from ska_sdp_datamodels.sky_model.sky_functions import export_skymodel_to_text
from ska_sdp_datamodels.sky_model.sky_model import SkyModel
from ska_sdp_func_python.image import calculate_frequency_taylor_terms_from_image_list
from ska_sdp_func_python.sky_component import (
calculate_skycomponent_list_taylor_terms,
gather_skycomponents_from_channels,
)
from ska_sdp_func_python.visibility import concatenate_visibility_frequency
from rascil.processing_components.flagging.operations import flagging_aoflagger
from rascil.processing_components.parameters import get_parameter, rascil_path
from rascil.workflows.rsexecute import calibrate_list_rsexecute_workflow
from rascil.workflows.rsexecute.execution_support.rsexecute import rsexecute
from rascil.workflows.rsexecute.imaging.imaging_rsexecute import (
invert_list_rsexecute_workflow,
predict_list_rsexecute_workflow,
subtract_list_rsexecute_workflow,
)
from rascil.workflows.rsexecute.skymodel.skymodel_rsexecute import (
deconvolve_skymodel_list_rsexecute_workflow,
invert_skymodel_list_rsexecute_workflow,
predict_skymodel_list_rsexecute_workflow,
residual_skymodel_list_rsexecute_workflow,
restore_centre_skymodel_list_rsexecute_workflow,
restore_skymodel_list_rsexecute_workflow,
)
log = logging.getLogger("rascil-logger")
[docs]
def ical_skymodel_list_rsexecute_workflow(
vis_list,
model_imagelist,
context,
skymodel_list=None,
calibration_context="TG",
controls=None,
do_selfcal=True,
pipeline_name="ical",
**kwargs,
):
"""Create graph for ICAL pipeline using SkyModel
:param vis_list: List of vis (or graph)
:param model_imagelist: list of models (or graph)
:param skymodel_list: list of SkyModels
:param context: imaging context e.g. '2d'
:param calibration_context: Sequence of calibration steps e.g. TGB
:param do_selfcal: Do the selfcalibration?
:param perform_flagging: Run flagging strategy
:param kwargs: Parameters for functions in components
:return:
"""
perform_flagging = get_parameter(kwargs, "perform_flagging", False)
if perform_flagging:
# To perform flagging, all frequencies must be available in the same bvis entry
bvis = rsexecute.execute(concatenate_visibility_frequency)(vis_list)
flagging_strategy_name = get_parameter(kwargs, "flagging_strategy_name", "")
bvis = rsexecute.execute(flagging_aoflagger)(bvis, flagging_strategy_name)
# Undo the frequency concatenation
def reorder_freqs(bvis_list, flagged_bvis):
out_vis_list = bvis_list.copy()
for freq_idx in range(len(bvis_list)):
out_vis_list[freq_idx].flags.data = flagged_bvis.flags.data[
:, :, freq_idx : freq_idx + 1, :
]
return out_vis_list
vis_list = rsexecute.execute(reorder_freqs, nout=len(vis_list))(vis_list, bvis)
gt_list = list()
# Create PSFs
psf_imagelist = invert_list_rsexecute_workflow(
vis_list, model_imagelist, context=context, dopsf=True, **kwargs
)
def trim(x):
return x[0]
psf_imagelist_trimmed = [rsexecute.execute(trim)(d) for d in psf_imagelist]
def copy_vis(v, deep=True, zero=False):
return v.copy(deep=deep, zero=zero)
# Create a list of copied input visibilities
model_vislist = [
rsexecute.execute(copy_vis)(v, deep=True, zero=True) for v in vis_list
]
# Ensure that we always have a skymodel to work with
if skymodel_list is None:
skymodel_list = [
rsexecute.execute(SkyModel)(image=model) for model in model_imagelist
]
if do_selfcal:
# Make the predicted visibilities, selfcalibrate against it correcting the gains, then
# form the residual visibility, then make the residual image
if kwargs.get("calibrate_with_dp3", False):
if kwargs.get("input_dp3_skymodel") is None:
# The skymodel must be converted in a format which DP3 understands
x = [
rsexecute.execute(export_skymodel_to_text, nout=1)(
skymodel, rascil_path("test.skymodel")
)
for skymodel in skymodel_list
]
rsexecute.compute(x, sync=True)
predicted_model_vislist = predict_skymodel_list_rsexecute_workflow(
model_vislist,
skymodel_list,
context=context,
docal=True,
**kwargs,
)
# Selfcalibrate against it correcting the gains
cal_vis_list, gt_list = calibrate_list_rsexecute_workflow(
vis_list,
predicted_model_vislist,
gt_list,
calibration_context=calibration_context,
controls=controls,
**kwargs,
)
# Erase data in the input skymodel_list?
reset_skymodel = get_parameter(kwargs, "reset_skymodel", True)
if reset_skymodel:
def pipeline_zero_skymodel_image(sm):
new_sm = SkyModel(image=sm.image.copy(deep=True))
log.info(
"ical_list_rsexecute_workflow: setting initial model to zero after initial selfcal"
)
if new_sm.image is not None:
new_sm.image["pixels"].data[...] = 0.0
return new_sm
skymodel_list = [
rsexecute.execute(pipeline_zero_skymodel_image, nout=1)(s)
for s in skymodel_list
]
# Make the residual images for the skymodels
residual_imagelist = residual_skymodel_list_rsexecute_workflow(
cal_vis_list,
model_imagelist,
context=context,
skymodel_list=skymodel_list,
**kwargs,
)
else:
cal_vis_list = vis_list
residual_imagelist = residual_skymodel_list_rsexecute_workflow(
cal_vis_list,
model_imagelist,
context=context,
skymodel_list=skymodel_list,
**kwargs,
)
def trim(x):
return x[0]
residual_imagelist_trimmed = [
rsexecute.execute(trim)(d) for d in residual_imagelist
]
skymodel_list = deconvolve_skymodel_list_rsexecute_workflow(
residual_imagelist_trimmed,
psf_imagelist_trimmed,
skymodel_list,
prefix=f"{pipeline_name} cycle 0",
fit_skymodel=True,
**kwargs,
)
# Next major cycles, if nmajor>1
nmajor = get_parameter(kwargs, "nmajor", 5)
if nmajor > 1:
for cycle in range(nmajor):
if do_selfcal:
# Predict the visibility for the skymodel
model_vislist = predict_skymodel_list_rsexecute_workflow(
model_vislist,
skymodel_list,
context=context,
docal=True,
**kwargs,
)
cal_vis_list = [
rsexecute.execute(copy_vis)(v, deep=True, zero=False)
for v in vis_list
]
# Calibrate using the observed visibility and the model visibility
cal_vis_list, gt_list = calibrate_list_rsexecute_workflow(
cal_vis_list,
model_vislist,
gt_list,
calibration_context=calibration_context,
controls=controls,
iteration=cycle,
**kwargs,
)
# Calculate the residual visibility
residual_vislist = subtract_list_rsexecute_workflow(
cal_vis_list, model_vislist
)
# ... and the residual images
residual_imagelist = invert_skymodel_list_rsexecute_workflow(
residual_vislist,
skymodel_list,
docal=True,
dopsf=False,
iteration=0,
**kwargs,
)
else:
# Calculate the residual images
residual_imagelist = residual_skymodel_list_rsexecute_workflow(
cal_vis_list,
skymodel_list,
context=context,
skymodel_list=skymodel_list,
**kwargs,
)
def trim(x):
return x[0]
residual_imagelist_trimmed = [
rsexecute.execute(trim)(d) for d in residual_imagelist
]
# Deconvolve to get an updated skymodel
skymodel_list = deconvolve_skymodel_list_rsexecute_workflow(
residual_imagelist_trimmed,
psf_imagelist_trimmed,
skymodel_list,
prefix=f"{pipeline_name} cycle {cycle + 1}",
**kwargs,
)
# Most of the computations are done here. We noticed that the
# graph size increases non-linearly for each major cycle. We
# faced issues on computing the entire graph at once and so here
# we are persisting the graph so that a smaller graph size
# starts executing as it gets constructed for each major cycle.
# The idea is to introduce a convergence criteria so that we can
# break the loop when criteria is met. More details can
# be found here: https://jira.skatelescope.org/browse/SIM-1015
skymodel_list = rsexecute.persist(skymodel_list)
# We've finished so now we update the residual images and calculate the restored image
residual_imagelist = residual_skymodel_list_rsexecute_workflow(
cal_vis_list,
skymodel_list,
context=context,
skymodel_list=skymodel_list,
**kwargs,
)
(
skymodel_list,
residual_imagelist,
restored_imagelist,
) = restore_skymodel_pipeline_rsexecute_workflow(
skymodel_list, psf_imagelist, residual_imagelist, **kwargs
)
return (residual_imagelist, restored_imagelist, skymodel_list, gt_list)
def restore_skymodel_pipeline_rsexecute_workflow(
skymodel_list, psf_imagelist, residual_imagelist, **kwargs
):
"""Restore images using one of a number of methods selected by restored_output
list: lists of images
integrated: restored is the central channel image plus the average residual
taylor: lists of Taylor terms of all images
:param skymodel_imagelist: List (or graph) of skymodels
:param psf_imagelist: List (or graph) of point spread functions
:param residual_imagelist: List (or graph) of residual images
:param kwargs:
:return:
"""
output = get_parameter(kwargs, "restored_output", "list")
if output == "integrated":
restored_imagelist = restore_centre_skymodel_list_rsexecute_workflow(
skymodel_list, psf_imagelist, residual_imagelist, **kwargs
)
elif output == "taylor":
# In this case, we overwrite the originals with Taylor term versions
nmoment = get_parameter(kwargs, "nmoment", 1)
def extract_sm_image(s):
return s.image
deconvolve_model_imagelist = [
rsexecute.execute(extract_sm_image, nout=1)(sm) for sm in skymodel_list
]
deconvolve_model_imagelist = rsexecute.execute(
calculate_frequency_taylor_terms_from_image_list, nout=nmoment
)(deconvolve_model_imagelist, nmoment=nmoment)
def trim(x):
return x[0]
residual_imagelist_trimmed = [
rsexecute.execute(trim)(d) for d in residual_imagelist
]
residual_imagelist = rsexecute.execute(
calculate_frequency_taylor_terms_from_image_list, nout=nmoment
)(residual_imagelist_trimmed, nmoment=nmoment)
def untrim(x):
return (x, 0.0)
residual_imagelist = [rsexecute.execute(untrim)(d) for d in residual_imagelist]
# Now we need to create a skymodel in Taylor space
skymodel_list = rsexecute.execute(
convert_skycomponents_taylor_terms_list, nout=nmoment
)(deconvolve_model_imagelist, nmoment=nmoment, skymodel_list=skymodel_list)
# Note that the psf has not been converted to Taylor form
restored_imagelist = restore_skymodel_list_rsexecute_workflow(
skymodel_list, psf_imagelist, residual_imagelist, **kwargs
)
elif output == "list":
restored_imagelist = restore_skymodel_list_rsexecute_workflow(
skymodel_list, psf_imagelist, residual_imagelist, **kwargs
)
else:
raise ValueError(
f"continuum_imaging_skymodel_list_rsexecute_workflow: Unknown restored_output {output}"
)
return skymodel_list, residual_imagelist, restored_imagelist
def convert_skycomponents_taylor_terms_list(
deconvolve_model_imagelist, nmoment, skymodel_list
):
"""Convert skycomponents into Taylor term form and update skymodel
:param deconvolve_model_imagelist: Deconvolved model in Taylor term sequence
:param nmoment: Number of moments/Taylor terms
:param skymodel_list: Skymodel in Frequency space, to be updated
:return:
"""
# Extract the skycomponents in frequency space
skycomponent_list = [sm.components for sm in skymodel_list]
# Gather the different frequency components into a set of multifrequency skycomponents
channel_sky_component_list = gather_skycomponents_from_channels(skycomponent_list)
if len(channel_sky_component_list) == 0:
skymodel_list = [
SkyModel(
image=deconvolve_model_imagelist[moment],
)
for moment in range(nmoment)
]
return skymodel_list
# Convert to Taylor term components [source][taylor term]
skycomponent_list = calculate_skycomponent_list_taylor_terms(
channel_sky_component_list, nmoment=nmoment
)
skymodel_list = [
SkyModel(
image=deconvolve_model_imagelist[moment],
components=[scl[moment] for scl in skycomponent_list],
)
for moment in range(nmoment)
]
return skymodel_list
[docs]
def continuum_imaging_skymodel_list_rsexecute_workflow(
vis_list, model_imagelist, context, skymodel_list=None, **kwargs
):
"""Create graph for the continuum imaging pipeline.
Same as ICAL but with no selfcal.
:param vis_list: List of vis (or graph)
:param model_imagelist: List of models (or graph)
:param skymodel_list: list of SkyModels
:param context: Imaging context
:param skymodel_list: list of SkyModels
:param kwargs: Parameters for functions in components
:return:
"""
(
residual_imagelist,
restore_imagelist,
skymodel_list,
gt_list,
) = ical_skymodel_list_rsexecute_workflow(
vis_list,
model_imagelist,
context=context,
skymodel_list=skymodel_list,
calibration_context="",
do_selfcal=False,
pipeline_name="cip",
**kwargs,
)
return (
residual_imagelist,
restore_imagelist,
skymodel_list,
)
[docs]
def spectral_line_imaging_skymodel_list_rsexecute_workflow(
vis_list,
model_imagelist,
context,
continuum_model_imagelist=None,
**kwargs,
):
"""Create graph for spectral line imaging pipeline
Uses the continuum imaging rsexecute pipeline after subtraction of a continuum model
:param vis_list: List of vis (or graph)
:param model_imagelist: List of Spectral line model (or graph)
:param continuum_model_imagelist: Continuum model list (or graph)
:param context: Imaging context e.g. ng or 2d
:param kwargs: Parameters for functions in components
:return: list of (deconvolved model, residual, restored) or graph
"""
if continuum_model_imagelist is not None:
vis_list = predict_list_rsexecute_workflow(
vis_list,
continuum_model_imagelist,
context=context,
**kwargs,
)
return continuum_imaging_skymodel_list_rsexecute_workflow(
vis_list,
model_imagelist,
context=context,
pipeline_name="slip",
**kwargs,
)