Source code for ska_sdp_exec_swiftly.api_helper

# pylint: disable=too-many-arguments
# pylint: disable=consider-using-set-comprehension
"""
Some helper function for Distributed Fourier Transform API
"""

import numpy

from .fourier_transform.fourier_algorithm import (
    make_facet_from_sources,
    make_subgrid_from_sources,
)


[docs] def make_subgrid(N, xA_size, sg_off0, sg_A0, sg_off1, sg_A1, sources): """make subgrid""" if isinstance(sg_A0, list) and isinstance(sg_A1, list): sg_A0 = make_mask_from_slice(sg_A0[0], sg_A0[1]) sg_A1 = make_mask_from_slice(sg_A1[0], sg_A1[1]) return make_subgrid_from_sources( sources, N, xA_size, [sg_off0, sg_off1], [sg_A0, sg_A1] )
[docs] def make_facet( N, yB_size, facet_off0, facet_B0, facet_off1, facet_B1, sources ): """make facet""" # Create facet if isinstance(facet_B0, list) and isinstance(facet_B1, list): facet_B0 = make_mask_from_slice(facet_B0[0], facet_B0[1]) facet_B1 = make_mask_from_slice(facet_B1[0], facet_B1[1]) return make_facet_from_sources( sources, N, yB_size, [facet_off0, facet_off1], [facet_B0, facet_B1] )
[docs] def check_facet( N, facet_off0, facet_B0, facet_off1, facet_B1, approx_facet, sources ): """ check facet using sources """ # Re-generate facet to compare against if isinstance(facet_B0, list) and isinstance(facet_B1, list): facet_B0 = make_mask_from_slice(facet_B0[0], facet_B0[1]) facet_B1 = make_mask_from_slice(facet_B1[0], facet_B1[1]) yB_size = approx_facet.shape[0] facet = make_facet( N, yB_size, facet_off0, facet_B0, facet_off1, facet_B1, sources ) # Compare against result return numpy.sqrt(numpy.average(numpy.abs(facet - approx_facet) ** 2))
[docs] def check_residual(residual_facet): """ check residual image """ return numpy.sqrt(numpy.average(numpy.abs(residual_facet) ** 2))
[docs] def check_subgrid(N, sg_off0, sg_A0, sg_off1, sg_A1, approx_subgrid, sources): """ check subgrid using sources """ # Compare against subgrid (normalised) if isinstance(sg_A0, list) and isinstance(sg_A1, list): sg_A0 = make_mask_from_slice(sg_A0[0], sg_A0[1]) sg_A1 = make_mask_from_slice(sg_A1[0], sg_A1[1]) subgrid = make_subgrid_from_sources( sources, N, approx_subgrid.shape[0], [sg_off0, sg_off1], [sg_A0, sg_A1] ) return numpy.sqrt(numpy.average(numpy.abs(subgrid - approx_subgrid) ** 2))
[docs] def sum_and_finish_subgrid( distributedFFT, NMBF_NMBF_tasks, facets_config_list, subgrid_mask0, subgrid_mask1, ): """sum faect contribution and finsh subgrid""" # Initialise facet sum summed_facet = numpy.zeros( (distributedFFT.xM_size, distributedFFT.xM_size), dtype=complex ) for facets_config, NMBF_NMBF in zip(facets_config_list, NMBF_NMBF_tasks): summed_facet += distributedFFT.add_facet_contribution( distributedFFT.add_facet_contribution( NMBF_NMBF, facets_config.off0, axis=0 ), facets_config.off1, axis=1, ) # Finish if (subgrid_mask0 is not None) and (subgrid_mask1 is not None): if isinstance(subgrid_mask0, list) and isinstance(subgrid_mask1, list): subgrid_mask0 = make_mask_from_slice( subgrid_mask0[0], subgrid_mask0[1] ) subgrid_mask1 = make_mask_from_slice( subgrid_mask1[0], subgrid_mask1[1] ) approx_subgrid = distributedFFT.finish_subgrid( summed_facet, [subgrid_mask0, subgrid_mask1], ) else: approx_subgrid = distributedFFT.finish_subgrid( summed_facet, ) return approx_subgrid
[docs] def prepare_and_split_subgrid(distributedFFT, Fn, facets_config_list, subgrid): """prepare NAF_NAF""" # Prepare subgrid prepared_subgrid = distributedFFT.prepare_subgrid(subgrid) # Extract subgrid facet contributions NAF_AFs = { off0: distributedFFT.extract_subgrid_contrib_to_facet( prepared_subgrid, off0, Fn, axis=0 ) for off0 in set( facet_config.off0 for facet_config in facets_config_list ) } NAF_NAFs = [ distributedFFT.extract_subgrid_contrib_to_facet( NAF_AFs[facet_config.off0], facet_config.off1, Fn, axis=1 ) for facet_config in facets_config_list ] return NAF_NAFs
[docs] def accumulate_column( distributedFFT, NAF_NAF, NAF_MNAF, facet_m0_trunc, subgrid_off1 ): """update NAF_MNAF""" # TODO: add_subgrid_contribution should add # directly to NAF_MNAF here at some point. if NAF_MNAF is None: return distributedFFT.add_subgrid_contribution( NAF_NAF, subgrid_off1, facet_m0_trunc, axis=1, ) return NAF_MNAF + distributedFFT.add_subgrid_contribution( NAF_NAF, subgrid_off1, facet_m0_trunc, axis=1, )
[docs] def accumulate_facet( distributedFFT, NAF_MNAF, MNAF_BMNAF, Fb, facet_m0_trunc, facet_mask1, off0, ): """update MNAF_BMNAF""" if isinstance(facet_mask1, list): facet_mask1 = make_mask_from_slice(facet_mask1[0], facet_mask1[1]) NAF_BMNAF = distributedFFT.finish_facet(NAF_MNAF, facet_mask1, Fb, axis=1) if MNAF_BMNAF is None: return distributedFFT.add_subgrid_contribution( NAF_BMNAF, off0, facet_m0_trunc, axis=0 ) # TODO: add_subgrid_contribution should add # directly to NAF_MNAF here at some point. MNAF_BMNAF = MNAF_BMNAF + distributedFFT.add_subgrid_contribution( NAF_BMNAF, off0, facet_m0_trunc, axis=0 ) return MNAF_BMNAF
[docs] def finish_facet(distriFFT, MNAF_BMNAF, Fb, facet_mask0): """wrapper of finish_facet""" if MNAF_BMNAF is not None: if isinstance(facet_mask0, list): facet_mask0 = make_mask_from_slice(facet_mask0[0], facet_mask0[1]) approx_facet = distriFFT.finish_facet( MNAF_BMNAF, facet_mask0, Fb, axis=0, ) else: approx_facet = numpy.zeros( (distriFFT.yB_size, distriFFT.yB_size), dtype=complex ) return approx_facet
[docs] def extract_column( distriFFT, BF_F, Fn_task, Fb_task, facet_m0_trunc_task, subgrid_off0 ): """extract column task""" return distriFFT.prepare_facet( distriFFT.extract_facet_contrib_to_subgrid( BF_F, subgrid_off0, facet_m0_trunc_task, Fn_task, axis=0, ), Fb_task, axis=1, )
[docs] def make_full_cover_config(N, chunk_size, class_name): """Generate a fully covered config list :param N: N :param chunk_size: facet size or subgrid size :param class_name: SubgridConfig or FacetConfig :return: config list """ offsets = chunk_size * numpy.arange(int(numpy.ceil(N / chunk_size))) border = (offsets + numpy.hstack([offsets[1:], [N + offsets[0]]])) // 2 config_list = [] for idx0, off0 in enumerate(offsets): for idx1, off1 in enumerate(offsets): left0 = (border[idx0 - 1] - off0 + chunk_size // 2) % N right0 = border[idx0] - off0 + chunk_size // 2 left1 = (border[idx1 - 1] - off1 + chunk_size // 2) % N right1 = border[idx1] - off1 + chunk_size // 2 config_list.append( class_name( off0, off1, [[slice(left0, right0)], chunk_size], [[slice(left1, right1)], chunk_size], ) ) return config_list
[docs] def make_mask_from_slice(slice_list, mask_size): """make mask from sparse_mask result :param slice_list: slice list :param mask_size: mask size :return: mask """ mask = numpy.zeros((mask_size,)) for sl in slice_list: mask[sl] = 1 return mask