"""
Workflows for imaging, including predict, invert, residual, restore,
deconvolve, weight, taper, zero, subtract and sum results from invert
"""
__all__ = [
"predict_list_rsexecute_workflow",
"invert_list_rsexecute_workflow",
"residual_list_rsexecute_workflow",
"restore_list_rsexecute_workflow",
"deconvolve_list_rsexecute_workflow",
"deconvolve_list_channel_rsexecute_workflow",
"weight_list_rsexecute_workflow",
"taper_list_rsexecute_workflow",
"zero_list_rsexecute_workflow",
"subtract_list_rsexecute_workflow",
"sum_invert_results_rsexecute",
"sum_predict_results_rsexecute",
"restore_centre_rsexecute_workflow",
"threshold_list_rsexecute",
]
import collections
import copy
import logging
import numpy
from ska_sdp_datamodels.gridded_visibility import create_griddata_from_image
from ska_sdp_datamodels.image.image_model import Image
from ska_sdp_func_python.grid_data import (
grid_visibility_weight_to_griddata,
griddata_merge_weights,
griddata_visibility_reweight,
)
from ska_sdp_func_python.image import (
fit_psf,
image_gather_facets,
image_scatter_channels,
image_scatter_facets,
restore_cube,
)
from ska_sdp_func_python.image.deconvolution import (
deconvolve_cube,
deconvolve_list,
radler_deconvolve_list,
)
from ska_sdp_func_python.imaging import (
invert_visibility,
normalise_sumwt,
predict_visibility,
remove_sumwt,
sum_invert_results,
sum_predict_results,
taper_visibility_gaussian,
)
from rascil.processing_components.parameters import get_parameter
from rascil.workflows.rsexecute.execution_support.rsexecute import rsexecute
from rascil.workflows.rsexecute.image import (
image_gather_channels_rsexecute,
sum_images_rsexecute,
)
log = logging.getLogger("rascil-logger")
[docs]
def predict_list_rsexecute_workflow(vis_list, model_imagelist, context, **kwargs):
"""Predict, iterating over both the scattered vis_list and image
The visibility and image are scattered, the visibility is predicted on each part, and then the
parts are assembled.
:param vis_list: list of vis (or graph)
:param model_imagelist: list of models (or graph)
:param context: Type of processing e.g. 2d, ng
:param kwargs: Parameters for functions in components
:return: List of vis_lists
For example::
dprepb_model = [rsexecute.execute(create_low_test_image_from_gleam)
(npixel=npixel, frequency=[frequency[f]], channel_bandwidth=[channel_bandwidth[f]],
cellsize=cellsize, phasecentre=phasecentre, polarisation_frame=PolarisationFrame("stokesI"),
flux_limit=3.0, applybeam=True)
for f, freq in enumerate(frequency)]
dprepb_model_list = rsexecute.persist(dprepb_model_list)
predicted_vis_list = predict_list_rsexecute_workflow(vis_list, model_imagelist=dprepb_model_list,
context='wstack', vis_slices=51)
predicted_vis_list = rsexecute.compute(predicted_vis_list , sync=True)
""" # noqa=E501
# Predict_visibility does not clear the vis so we have to do it here.
vis_list = zero_list_rsexecute_workflow(vis_list, copy=False)
# Loop over all windows
assert len(model_imagelist) == len(vis_list)
predict_results = [
rsexecute.execute(predict_visibility, pure=True, nout=1)(
vis, model_imagelist[ivis], context=context, **kwargs
)
for ivis, vis in enumerate(vis_list)
]
return rsexecute.optimize(predict_results)
[docs]
def invert_list_rsexecute_workflow(
vis_list, template_model_imagelist, context, dopsf=False, normalise=True, **kwargs
):
"""Sum results from invert, iterating over the scattered image and vis_list
:param vis_list: list of vis (or graph)
:param template_model_imagelist: list of template models (or graph)
:param dopsf: Make the PSF instead of the dirty image
:param normalise: normalise by sumwt
:param context: Imaging context
:param kwargs: Parameters for functions in components
:return: List of (image, sumwt) tuples, one per vis in vis_list
For example::
model_list = [rsexecute.execute(create_image_from_visibility)
(v, npixel=npixel, cellsize=cellsize, polarisation_frame=pol_frame)
for v in vis_list]
model_list = rsexecute.persist(model_list)
dirty_list = invert_list_rsexecute_workflow(vis_list, template_model_imagelist=model_list, context='wstack',
vis_slices=51)
dirty_sumwt_list = rsexecute.compute(dirty_list, sync=True)
dirty, sumwt = dirty_sumwt_list[centre]
""" # noqa=E501
if not isinstance(template_model_imagelist, collections.abc.Iterable):
template_model_imagelist = [template_model_imagelist]
# Loop over all vis_lists independently
assert len(template_model_imagelist) == len(vis_list)
invert_results = [
rsexecute.execute(invert_visibility, nout=2)(
vis,
template_model_imagelist[ivis],
dopsf=dopsf,
normalise=normalise,
context=context,
**kwargs,
)
for ivis, vis in enumerate(vis_list)
]
return rsexecute.optimize(invert_results)
[docs]
def residual_list_rsexecute_workflow(vis, model_imagelist, context="2d", **kwargs):
"""Create a graph to calculate (list or graph) of residual images
:param vis: List of vis (or graph)
:param model_imagelist: Model used to determine image parameters
:param context: Imaging context e.g. '2d', 'ng'
:param kwargs: Parameters for functions in components
:return: list of (image, sumwt) tuples or graph
"""
model_vis = zero_list_rsexecute_workflow(vis)
model_vis = predict_list_rsexecute_workflow(
model_vis, model_imagelist, context=context, **kwargs
)
residual_vis = subtract_list_rsexecute_workflow(vis, model_vis)
result = invert_list_rsexecute_workflow(
residual_vis,
model_imagelist,
context=context,
dopsf=False,
normalise=True,
**kwargs,
)
return rsexecute.optimize(result)
def restore_list_singlefacet_rsexecute_workflow(
model_imagelist, psf_imagelist, residual_imagelist=None, clean_beam=None, **kwargs
):
"""Create a graph to calculate the restored images
This restores each frequency plane using a clean_beam specified
or fitted from the frequency-summed PSF.
The output is an image for each frequency. Note that the noise in the residual is
(correctly) that for each frequency.
:param model_imagelist: Model list (or graph)
:param psf_imagelist: PSF list (or graph)
:param residual_imagelist: Residual list (or graph)
:param clean_beam: dict e.g.
{"bmaj":0.1, "bmin":0.05, "bpa":-60.0}. Units are deg, deg, deg
:param kwargs: Parameters for functions in components
:return: list of restored images (or graph)
"""
if residual_imagelist is not None:
if len(residual_imagelist) != len(model_imagelist):
log.error("Model and residual list have different lengths")
raise ValueError("Model and residual list have different lengths")
if clean_beam is None:
psf_list = sum_invert_results_rsexecute(psf_imagelist)
psf = rsexecute.execute(normalise_sumwt)(psf_list[0], psf_list[1])
clean_beam = rsexecute.execute(fit_psf, nout=1)(psf)
if residual_imagelist is not None:
residual_list = rsexecute.execute(remove_sumwt, nout=len(residual_imagelist))(
residual_imagelist
)
restored_list = [
rsexecute.execute(restore_cube, nout=1)(
model_imagelist[i], residual=residual_list[i], clean_beam=clean_beam
)
for i, _ in enumerate(model_imagelist)
]
else:
restored_list = [
rsexecute.execute(restore_cube, nout=1)(
model_imagelist[i],
residual=None,
clean_beam=clean_beam,
)
for i, _ in enumerate(model_imagelist)
]
return rsexecute.optimize(restored_list)
[docs]
def restore_list_rsexecute_workflow(
model_imagelist,
psf_imagelist,
residual_imagelist=None,
restore_facets=1,
restore_overlap=8,
restore_taper="tukey",
clean_beam=None,
**kwargs,
):
"""Create a graph to calculate the restored image
:param model_imagelist: Model list (or graph)
:param psf_imagelist: PSF list (or graph)
:param residual_imagelist: Residual list (or graph)
:param kwargs: Parameters for functions in components
:param restore_facets: Number of facets used per axis (used to distribute)
:param restore_overlap: Overlap in pixels (0 is best)
:param restore_taper: Type of taper between facets
:return: list of restored images (or graph)
"""
if residual_imagelist is not None:
if len(residual_imagelist) != len(model_imagelist):
log.error("Model and residual list have different lengths")
raise ValueError("Model and residual list have different lengths")
if restore_overlap < 0:
raise ValueError("Number of pixels for restore overlap must be >= 0")
if restore_facets % 2 == 0 or restore_facets == 1:
actual_number_facets = restore_facets
else:
actual_number_facets = max(1, (restore_facets - 1))
if clean_beam is None:
clean_beam_list = sum_invert_results_rsexecute(psf_imagelist)
psf = rsexecute.execute(normalise_sumwt)(clean_beam_list[0], clean_beam_list[1])
clean_beam = rsexecute.execute(fit_psf, nout=1)(psf)
# Scatter each list element into a list. We will then run restore_cube on each
facet_model_list = [
rsexecute.execute(
image_scatter_facets, nout=actual_number_facets * actual_number_facets
)(model, facets=restore_facets, overlap=restore_overlap, taper=restore_taper)
for model in model_imagelist
]
if residual_imagelist is not None:
residual_list = rsexecute.execute(remove_sumwt, nout=len(residual_imagelist))(
residual_imagelist
)
facet_residual_list = [
rsexecute.execute(
image_scatter_facets, nout=actual_number_facets * actual_number_facets
)(
residual,
facets=restore_facets,
overlap=restore_overlap,
taper=restore_taper,
)
for residual in residual_list
]
facet_restored_list = [
[
rsexecute.execute(
restore_cube, nout=actual_number_facets * actual_number_facets
)(
model=facet_model_list[i][im],
residual=facet_residual_list[i][im],
clean_beam=clean_beam,
)
for im, _ in enumerate(facet_model_list[i])
]
for i, _ in enumerate(model_imagelist)
]
else:
facet_restored_list = [
[
rsexecute.execute(
restore_cube, nout=actual_number_facets * actual_number_facets
)(model=facet_model_list[i][im], clean_beam=clean_beam)
for im, _ in enumerate(facet_model_list[i])
]
for i, _ in enumerate(model_imagelist)
]
# Now we run restore_cube on each and gather the results across all facets
restored_imagelist = [
rsexecute.execute(image_gather_facets)(
facet_restored_list[i],
model_imagelist[i],
facets=restore_facets,
overlap=restore_overlap,
taper=restore_taper,
)
for i, _ in enumerate(model_imagelist)
]
def set_clean_beam(r, cb):
r.attrs["clean_beam"] = cb
return r
restored_imagelist = [
rsexecute.execute(set_clean_beam, nout=1)(r, clean_beam)
for r in restored_imagelist
]
return rsexecute.optimize(restored_imagelist)
[docs]
def restore_centre_rsexecute_workflow(
model_imagelist, psf_imagelist, residual_imagelist=None, **kwargs
):
"""Create a graph to calculate the restored image
This does the following:
- Takes the centre frequency slice of the model
- Integrates the residual across the band
- Fits to the band-integrated PSF
- Restores the model, clean_beam, and residual
This will not give any information on the spectral behaviour,
use residual_list_rsexecute_workflow for that purpose.
:param model_imagelist: Model list (or graph)
:param psf_imagelist: PSF list (or graph)
:param residual_imagelist: Residual list (or graph)
:param kwargs: Parameters for functions in components
:return: list of restored images (or graphs)
"""
if residual_imagelist is not None:
if len(residual_imagelist) != len(model_imagelist):
log.error("Model and residual list have different lengths")
raise ValueError("Model and residual list have different lengths")
# Find the PSF by summing over all channels, fit to this psf
psf = sum_invert_results_rsexecute(psf_imagelist)[0]
clean_beam = rsexecute.execute(fit_psf, nout=1)(psf)
# Add the model over all channels
centre = len(model_imagelist) // 2
model = model_imagelist[centre]
if residual_imagelist is not None:
# Get residual calculated across the band
residual = sum_invert_results_rsexecute(residual_imagelist)[0]
restored = rsexecute.execute(restore_cube, nout=1)(
model,
residual=residual,
clean_beam=clean_beam,
)
else:
restored = rsexecute.execute(restore_cube, nout=1)(
model,
clean_beam=clean_beam,
)
# optimize the graph to reduce size
return rsexecute.optimize(restored)
def deconvolve_list_singlefacet_rsexecute_workflow(
dirty_list,
psf_list,
model_imagelist,
sensitivity_list=None,
prefix="",
mask=None,
**kwargs,
):
"""Create a graph for deconvolution of a single image, adding to the model
:param dirty_list: list of dirty images (or graph)
:param psf_list: list of psfs (or graph)
:param model_imagelist: list of models (or graph)
:param sensitivity_list: (optional) sensitivity images
:param prefix: Informative prefix to log messages
:param mask: Mask for deconvolution
:param kwargs: Parameters for functions
:return: graph for the deconvolution
For example::
dirty_imagelist = invert_list_rsexecute_workflow(vis_list, model_imagelist, context='2d',
dopsf=False, normalise=True)
psf_imagelist = invert_list_rsexecute_workflow(vis_list, model_imagelist, context='2d',
dopsf=True, normalise=True)
dirty_imagelist = rsexecute.persist(dirty_imagelist)
psf_imagelist = rsexecute.persist(psf_imagelist)
dec_imagelist = deconvolve_list_singlefacet_rsexecute_workflow(dirty_imagelist, psf_imagelist,
model_imagelist, niter=1000, fractional_threshold=0.01,
scales=[0, 3, 10], algorithm='mmclean', nmoment=3, nchan=freqwin,
threshold=0.1, gain=0.7)
dec_imagelist = rsexecute.persist(dec_imagelist)
""" # noqa=E501
# Now do the deconvolution for a single facet.
def imaging_deconvolve(dirty, psf, model, sens, gthreshold, msk):
if not isinstance(dirty, collections.abc.Iterable):
raise ValueError(
"deconvolve_list_singlefacet_rsexecute_workflow: dirty is not Iterable"
)
if not isinstance(dirty[0], Image):
raise ValueError(
f"deconvolve_list_singlefacet_rsexecute_workflow: "
f"dirty[0] is not an Image: {dirty[0]}"
)
log.info("deconvolve_list_singlefacet_rsexecute_workflow: Starting clean")
this_peak = numpy.max([numpy.max(numpy.abs(dd["pixels"].data)) for dd in dirty])
if this_peak > 1.1 * gthreshold:
kwargs["threshold"] = gthreshold
use_radler = get_parameter(kwargs, "use_radler", False)
if use_radler:
try:
# pylint: disable-next=import-error
import radler # noqa=F401
except ImportError:
log.info(
"Radler module not available. "
"RASCIL deconvolution is used instead."
)
use_radler = False
if use_radler:
# deconvolve with radler.
comp = radler_deconvolve_list(dirty, psf, prefix=prefix, **kwargs)
else:
comp, _ = deconvolve_list(
dirty, psf, prefix=prefix, mask=msk, sensitivity=sens, **kwargs
)
for ic, c in enumerate(comp):
c["pixels"].data = c["pixels"].data + model[ic]["pixels"].data
return comp
else:
return model
threshold = get_parameter(kwargs, "threshold", 0.0)
clean_list = rsexecute.execute(imaging_deconvolve, nout=len(dirty_list))(
dirty_list,
psf_list,
model_imagelist,
sensitivity_list,
threshold,
msk=mask,
)
return clean_list
[docs]
def deconvolve_list_rsexecute_workflow(
dirty_list,
psf_list,
model_imagelist,
sensitivity_list=None,
prefix="",
mask=None,
**kwargs,
):
"""Create a graph for deconvolution, adding to the model
note dirty_list and psf_list must have sumwt trimmed before calling this function
:param dirty_list: list of dirty images (or graph)
:param psf_list: list of psfs (or graph)
:param model_imagelist: list of models (or graph)
:param prefix: Informative prefix to log messages
:param mask: Mask for deconvolution
:param kwargs: Parameters for functions
:return: graph for the deconvolution
For example::
dirty_imagelist = invert_list_rsexecute_workflow(vis_list, model_imagelist, context='2d',
dopsf=False, normalise=True)
psf_imagelist = invert_list_rsexecute_workflow(vis_list, model_imagelist, context='2d',
dopsf=True, normalise=True)
dirty_imagelist = rsexecute.persist(dirty_imagelist)
psf_imagelist = rsexecute.persist(psf_imagelist)
dec_imagelist = deconvolve_list_rsexecute_workflow(dirty_imagelist, psf_imagelist,
model_imagelist, niter=1000, fractional_threshold=0.01,
scales=[0, 3, 10], algorithm='mmclean', nmoment=3, nchan=freqwin,
threshold=0.1, gain=0.7)
dec_imagelist = rsexecute.persist(dec_imagelist)
""" # noqa=E501
# We can divide the processing up into overlapping facets and then
# run the single facet deconvolution on each
deconvolve_facets = get_parameter(kwargs, "deconvolve_facets", 1)
if deconvolve_facets == 1:
return deconvolve_list_singlefacet_rsexecute_workflow(
dirty_list,
psf_list,
model_imagelist,
sensitivity_list=sensitivity_list,
prefix=prefix,
mask=None,
**kwargs,
)
nchan = len(dirty_list)
# Find the global threshold. This uses the peak in the
# average on the frequency axis since we want to use it
# in a stopping criterion in a moment clean
global_threshold = threshold_list_rsexecute(
dirty_list,
prefix=prefix,
**kwargs,
)
# Dask is apparently smart enough to evaluate futures in kwargs
kwargs["threshold"] = global_threshold
# Single facet case
deconvolve_overlap = get_parameter(kwargs, "deconvolve_overlap", 0)
deconvolve_taper = get_parameter(kwargs, "deconvolve_taper", None)
deconvolve_number_facets = deconvolve_facets**2
# Each list is a sequence in frequency. We want to scatter each channel
# image into facet-space and then transpose from [channel][facet] to
# [facet][channel],
scattered_facets_model_list = scatter_facets_and_transpose(
deconvolve_facets,
deconvolve_number_facets,
deconvolve_overlap,
deconvolve_taper,
model_imagelist,
nchan,
)
scattered_facets_dirty_list = scatter_facets_and_transpose(
deconvolve_facets,
deconvolve_number_facets,
deconvolve_overlap,
deconvolve_taper,
dirty_list,
nchan,
)
if sensitivity_list is not None:
scattered_facets_sensitivity_list = scatter_facets_and_transpose(
deconvolve_facets,
deconvolve_number_facets,
deconvolve_overlap,
deconvolve_taper,
sensitivity_list,
nchan,
)
else:
scattered_facets_sensitivity_list = [
None for facet in range(deconvolve_number_facets)
]
# For the PSF we extract the parts around the phasecentres so no scattering
# is appropriate
def imaging_extract_psf(psf, facets):
assert not numpy.isnan(numpy.sum(psf["pixels"].data)), "NaNs present in PSF"
cx = psf["pixels"].shape[3] // 2
cy = psf["pixels"].shape[2] // 2
wx = psf["pixels"].shape[3] // facets
wy = psf["pixels"].shape[2] // facets
xbeg = cx - wx // 2
xend = cx + wx // 2
ybeg = cy - wy // 2
yend = cy + wy // 2
spsf_data = (
psf["pixels"].isel({"x": slice(xbeg, xend), "y": slice(ybeg, yend)}).data
)
wcs = copy.deepcopy(psf.image_acc.wcs)
wcs.wcs.crpix[0] -= xbeg
wcs.wcs.crpix[1] -= ybeg
spsf = Image.constructor(
data=spsf_data, polarisation_frame=psf.image_acc.polarisation_frame, wcs=wcs
)
return spsf
psf_list_extracted = [
rsexecute.execute(imaging_extract_psf)(p, deconvolve_facets) for p in psf_list
]
if mask is not None:
mask_list = rsexecute.execute(
image_scatter_facets, nout=deconvolve_number_facets
)(mask, facets=deconvolve_facets, overlap=deconvolve_overlap)
else:
mask_list = [None for _ in range(deconvolve_number_facets)]
# Do the deconvolution for each facet in turn.
# Each item of the scattered_results_list
# contains the clean image cube and lists of
# list components (a number for each channel)
scattered_results_list = [
deconvolve_list_singlefacet_rsexecute_workflow(
d,
psf_list_extracted,
m,
sensitivity_list=sens,
prefix=f"{prefix} subimage {subimage}",
msk=msk,
**kwargs,
)
for subimage, (d, m, msk, sens) in enumerate(
zip(
scattered_facets_dirty_list,
scattered_facets_model_list,
mask_list,
scattered_facets_sensitivity_list,
)
)
]
# We want to avoid constructing the entire cube so we do
# the inverse of how we got here:
# i.e. SCATTER BY CHANNEL then GATHER BY FACET
# Gather the results back into one image, correcting for
# overlaps as necessary. The taper function is used to
# feather the facets together
result = [
rsexecute.execute(image_gather_facets, nout=1)(
[
scattered_results_list[facet][chan]
for facet in range(deconvolve_number_facets)
],
model_imagelist[chan],
facets=deconvolve_facets,
overlap=deconvolve_overlap,
)
for chan in range(nchan)
]
# optimize the graph to reduce size
return rsexecute.optimize(result)
def scatter_facets_and_transpose(
deconvolve_facets,
deconvolve_number_facets,
deconvolve_overlap,
deconvolve_taper,
model_imagelist,
nchan,
):
"""Scatter images by facet and then gather by channels
:param deconvolve_facets: Number of facets per axis
:param deconvolve_number_facets: Square of deconvolve_facets
:param deconvolve_overlap: Overlap in pixels
:param deconvolve_taper: Type of taper
:param model_imagelist: List of models
:param nchan: Number of channels
:return: List of frequency image cubes (or graph), arranged by facet
"""
# Scatter into [channel][facet] space
scattered_channels_facets_list = [
rsexecute.execute(image_scatter_facets, nout=deconvolve_number_facets)(
model_imagelist[chan],
facets=deconvolve_facets,
overlap=deconvolve_overlap,
taper=deconvolve_taper,
)
for chan in range(nchan)
]
# Transpose from [channel][facet] to [facet][channel]
# A direct would create too many (channel*facet) tasks. We double
# delay it to reduce number of tasks.
scattered_facets_channels_list = [
rsexecute.execute(
[scattered_channels_facets_list[chan][facet] for chan in range(nchan)],
nout=nchan,
)
for facet in range(deconvolve_number_facets)
]
return rsexecute.optimize(scattered_facets_channels_list)
[docs]
def deconvolve_list_channel_rsexecute_workflow(
dirty_list, psf_list, model_imagelist, subimages, **kwargs
):
"""Create a graph for deconvolution by channels, adding to the model
Does deconvolution channel by channel.
:param dirty_list: list or graph of dirty images
:param psf_list: list or graph of psf images. The psfs must be the size of a facet
:param model_imagelist: list of graph of models
:param subimages: Number of channels to split into
:param kwargs: Parameters for functions in components
:return: list of updated models (or graphs)
"""
def imaging_deconvolve_channel(dirty, psf):
comp, _ = deconvolve_cube(dirty, psf, **kwargs)
return comp
def imaging_add_comp_model(sum_model, model):
sum_model.data += model.data
return sum_model
def _create_empty_image(model):
return Image.constructor(
data=numpy.zeros_like(model["pixels"].data),
polarisation_frame=model.image_acc.polarisation_frame,
wcs=model.image_acc.wcs,
clean_beam=model.attrs["clean_beam"],
)
output = rsexecute.execute(_create_empty_image, nout=1, pure=True)(model_imagelist)
dirty_lists = rsexecute.execute(image_scatter_channels, nout=subimages, pure=True)(
dirty_list[0], subimages=subimages
)
results = [
rsexecute.execute(imaging_deconvolve_channel)(dirty_list, psf_list[0])
for dirty_list in dirty_lists
]
result = image_gather_channels_rsexecute(results, output)
result = rsexecute.execute(imaging_add_comp_model, nout=1, pure=True)(
result, model_imagelist
)
return rsexecute.optimize(result)
def griddata_merge_weights_rsexecute(gd_list):
"""Merge weights into one grid using a reduction
:param gd_list: List of griddatas or graph
:return:
"""
split = 2
if len(gd_list) >= split:
centre = len(gd_list) // split
result = [
griddata_merge_weights_rsexecute(gd_list[:centre]),
griddata_merge_weights_rsexecute(gd_list[centre:]),
]
return rsexecute.execute(griddata_merge_weights, nout=1)(result)
else:
return rsexecute.execute(griddata_merge_weights, nout=1)(gd_list)
[docs]
def weight_list_rsexecute_workflow(
vis_list, model_imagelist, weighting="uniform", robustness=0.0, **kwargs
):
"""Weight the visibility data
This is done collectively so the weights are summed over all vis_lists and then
corrected
:param vis_list:
:param model_imagelist: Model required to determine weighting parameters
:param weighting: Type of weighting
:param kwargs: Parameters for functions in graphs
:return: List of vis_graphs
For example::
vis_list = weight_list_rsexecute_workflow(vis_list, model_list, weighting='uniform')
""" # noqa=E501
if weighting == "natural":
return [
rsexecute.execute(griddata_visibility_reweight)(
v, None, weighting=weighting
)
for i, v in enumerate(vis_list)
]
def imaging_grid_weights(vis, model):
if vis is not None:
if model is not None:
griddata = create_griddata_from_image(
model, polarisation_frame=vis.visibility_acc.polarisation_frame
)
griddata = grid_visibility_weight_to_griddata(vis, griddata)
return griddata
else:
return None
else:
return None
weight_list = [
rsexecute.execute(imaging_grid_weights, pure=True, nout=1)(
vis_list[i], model_imagelist[i]
)
for i in range(len(vis_list))
]
merged_weight_grid = griddata_merge_weights_rsexecute(weight_list)
def imaging_re_weight(vis, gd):
if gd is not None:
if vis is not None:
vis = griddata_visibility_reweight(
vis, gd[0], weighting=weighting, robustness=robustness, sumwt=gd[1]
)
return vis
else:
return None
else:
return vis
result = [
rsexecute.execute(imaging_re_weight, nout=1)(v, merged_weight_grid)
for i, v in enumerate(vis_list)
]
return rsexecute.optimize(result)
[docs]
def taper_list_rsexecute_workflow(vis_list, size_required):
"""Taper to desired size
:param vis_list: List of vis (or graph)
:param size_required: Size in radians
:return: List of vis (or graph)
"""
result = [
rsexecute.execute(taper_visibility_gaussian, nout=1)(v, beam=size_required)
for v in vis_list
]
return rsexecute.optimize(result)
[docs]
def zero_list_rsexecute_workflow(vis_list, copy=True):
"""Creates a new vis_list and initialises all to zero
:param vis_list: List of vis (or graph)
:param copy: Make a new copy?
:return: List of vis (or graph)
"""
def imaging_zero_vis(vis):
if vis is not None:
if copy:
zerovis = vis.copy(deep=True, zero=True)
return zerovis
else:
vis["vis"][...] = 0.0
return vis
else:
return None
result = [
rsexecute.execute(imaging_zero_vis, pure=True, nout=1)(v) for v in vis_list
]
return rsexecute.optimize(result)
[docs]
def subtract_list_rsexecute_workflow(vis_list, model_vislist):
"""Initialise vis to zero
:param vis_list: List of vis (or graph)
:param model_vislist: Model to be subtracted (or graph)
:return: List of vis or graph
"""
def imaging_subtract_vis(vis, model_vis):
if vis is not None and model_vis is not None:
assert vis.vis.shape == model_vis.vis.shape
subvis = vis.copy(deep=True)
subvis["vis"].data[...] -= model_vis["vis"].data[...]
return subvis
else:
return None
result = [
rsexecute.execute(imaging_subtract_vis, pure=True, nout=1)(
vis=vis_list[i], model_vis=model_vislist[i]
)
for i in range(len(vis_list))
]
return rsexecute.optimize(result)
[docs]
def sum_predict_results_rsexecute(bvis_list, split=2):
"""Sum a set of predict results
:param bvis_list: List of (image, sum weights) tuples
:param split: Split into
:return: BlockVis
"""
if len(bvis_list) > split:
centre = len(bvis_list) // split
result = [
sum_predict_results_rsexecute(bvis_list[:centre]),
sum_predict_results_rsexecute(bvis_list[centre:]),
]
return rsexecute.execute(sum_predict_results, nout=2)(result)
else:
return rsexecute.execute(sum_predict_results, nout=2)(bvis_list)
[docs]
def sum_invert_results_rsexecute(image_list):
"""Sum a set of invert results with appropriate weighting
Note that in the case of a single element of image_list a copy is made
:param image_list: List of (image, sum weights) tuples
:return: image, sum of weights
"""
split = 2
if len(image_list) >= split:
centre = len(image_list) // split
result = [
sum_invert_results_rsexecute(image_list[:centre]),
sum_invert_results_rsexecute(image_list[centre:]),
]
return rsexecute.execute(sum_invert_results, nout=2)(result)
else:
return rsexecute.execute(sum_invert_results, nout=2)(image_list)
[docs]
def threshold_list_rsexecute(imagelist, prefix="", **kwargs):
"""Find actual threshold for list of results
:param prefix: Prefix in log messages
:param imagelist: List of images
:return:
"""
# Use the sum since it's quickest
frequency_plane = sum_images_rsexecute(imagelist)
norm = 1.0 / len(imagelist)
threshold = get_parameter(kwargs, "threshold", 0.0)
fractional_threshold = get_parameter(kwargs, "fractional_threshold", 0.1)
nfacets = get_parameter(kwargs, "deconvolve_facets", 1)
overlap = get_parameter(kwargs, "deconvolve_overlap", 0)
def find_peak_moment0(m0):
if nfacets > 1:
ny, nx = m0["pixels"].shape[2:]
blcx = (nfacets - 1) * overlap
blcy = (nfacets - 1) * overlap
trcx = nx - (nfacets - 1) * overlap
trcy = ny - (nfacets - 1) * overlap
this_peak = norm * numpy.max(
numpy.abs(
numpy.average(m0["pixels"].data[..., blcy:trcy, blcx:trcx], axis=0)
)
)
log.info(
"threshold_list_rsexecute: peak in cleaned area = %f," % (this_peak)
)
else:
this_peak = norm * numpy.max(
numpy.abs(numpy.average(m0["pixels"].data, axis=0))
)
log.info("threshold_list_rsexecute: peak entire image = %f," % (this_peak))
actual = max(this_peak * fractional_threshold, threshold)
log.info(
"threshold_list_rsexecute: Global peak = %.6f, "
"sub-image clean threshold will be %.6f" % (this_peak, actual)
)
return actual
return rsexecute.execute(find_peak_moment0, nout=1)(frequency_plane)