Source code for ska_sdp_func_python.sky_component.operations

"""
Functions to manage sky components operations.
"""

__all__ = [
    "apply_beam_to_skycomponent",
    "apply_voltage_pattern_to_skycomponent",
    "filter_skycomponents_by_flux",
    "find_nearest_skycomponent",
    "find_nearest_skycomponent_index",
    "find_separation_skycomponents",
    "find_skycomponents",
    "find_skycomponent_matches",
    "find_skycomponent_matches_atomic",
    "fit_skycomponent",
    "fit_skycomponent_spectral_index",
    "image_voronoi_iter",
    "insert_skycomponent",
    "partition_skycomponent_neighbours",
    "remove_neighbouring_components",
    "select_components_by_separation",
    "select_neighbouring_components",
    "voronoi_decomposition",
]

import collections
import copy
import logging
import warnings
from itertools import compress
from typing import List, Union

import astropy.units as u
import numpy
from astropy.convolution import Gaussian2DKernel, convolve
from astropy.coordinates import SkyCoord, match_coordinates_sky
from astropy.modeling import fitting, models
from astropy.stats import gaussian_fwhm_to_sigma
from astropy.wcs.utils import pixel_to_skycoord, skycoord_to_pixel
from photutils import segmentation
from scipy import interpolate
from scipy.optimize import minpack
from scipy.spatial import Voronoi  # pylint: disable=no-name-in-module
from ska_sdp_datamodels.image.image_model import Image
from ska_sdp_datamodels.science_data_model.polarisation_functions import (
    convert_pol_frame,
)
from ska_sdp_datamodels.science_data_model.polarisation_model import (
    PolarisationFrame,
)
from ska_sdp_datamodels.sky_model.sky_model import SkyComponent

from ska_sdp_func_python.calibration.jones import apply_jones
from ska_sdp_func_python.image.operations import convert_clean_beam_to_pixels
from ska_sdp_func_python.util.array_functions import (
    insert_array,
    insert_function_L,
    insert_function_pswf,
    insert_function_sinc,
)

log = logging.getLogger("func-python-logger")


[docs] def find_nearest_skycomponent_index(home, comps) -> int: """Find the nearest component in a list to a given direction (home). :param home: Home direction :param comps: List of SkyComponents :return: Index of best in comps """ if len(comps) == 0: raise ValueError("find_nearest_skycomponent_index: Catalog is empty") catalog = SkyCoord( ra=[c.direction.ra for c in comps], dec=[c.direction.dec for c in comps], ) idx, _, _ = match_coordinates_sky(home, catalog) return idx
[docs] def find_nearest_skycomponent(home: SkyCoord, comps) -> (SkyComponent, float): """Find the nearest component to a given direction. :param home: Home direction :param comps: List of SkyComponents :return: Index of the nearest SkyComponent """ best_index = find_nearest_skycomponent_index(home, comps) best = comps[best_index] return best, best.direction.separation(home).rad
[docs] def find_separation_skycomponents(comps_test, comps_ref=None): """Find the matrix of separations for two lists of components. :param comps_test: List of SkyComponents to be tested :param comps_ref: List of SkyComponents to compare with, If None then set to comps_test :return: Distance matrix """ if comps_ref is None: ncomps = len(comps_test) distances = numpy.zeros([ncomps, ncomps]) for i in range(ncomps): for j in range(i + 1, ncomps): distances[i, j] = ( comps_test[i] .direction.separation(comps_test[j].direction) .rad ) distances[j, i] = distances[i, j] return distances ncomps_ref = len(comps_ref) ncomps_test = len(comps_test) separations = numpy.zeros([ncomps_ref, ncomps_test]) for ref in range(ncomps_ref): for test in range(ncomps_test): separations[ref, test] = ( comps_test[test] .direction.separation(comps_ref[ref].direction) .rad ) return separations
[docs] def find_skycomponent_matches_atomic(comps_test, comps_ref, tol=1e-7): """ Match a list of candidates to a reference set of SkyComponents. find_skycomponent_matches is faster since it uses the astropy catalog matching. Many to one is allowed. :param comps_test: SkyComponents to test :param comps_ref: reference SkyComponents :param tol: Tolerance in rad :return: List of matched SkyComponents """ separations = find_separation_skycomponents(comps_test, comps_ref) matches = [] for test, _ in enumerate(comps_test): best = numpy.argmin(separations[:, test]) best_sep = separations[best, test] if best_sep < tol: matches.append((test, best, best_sep)) assert len(matches) <= len(comps_test) return matches
[docs] def find_skycomponent_matches(comps_test, comps_ref, tol=1e-7): """Match a list of candidates to a reference set of SkyComponents. Many to one is allowed. :param comps_test: SkyComponents to test :param comps_ref: Reference SkyComponents :param tol: Tolerance in rad :return: List of matched SkyComponents """ catalog_test = SkyCoord( ra=[c.direction.ra for c in comps_test], dec=[c.direction.dec for c in comps_test], ) catalog_ref = SkyCoord( ra=[c.direction.ra for c in comps_ref], dec=[c.direction.dec for c in comps_ref], ) idx, dist2d, _ = match_coordinates_sky(catalog_test, catalog_ref) matches = [] for test, _ in enumerate(comps_test): best = idx[test] best_sep = dist2d[test].rad if best_sep < tol: matches.append((test, best, best_sep)) return matches
[docs] def select_components_by_separation( home, comps, rmax=2 * numpy.pi, rmin=0.0 ) -> [SkyComponent]: """ Select components with a range in separation. :param home: Home direction :param comps: List of SkyComponents :param rmin: Minimum range :param rmax: Maximum range :return: Selected SkyComponents """ selected = [] for comp in comps: comp_sep = comp.direction.separation(home).rad if rmin <= comp_sep <= rmax: selected.append(comp) return selected
[docs] def select_neighbouring_components(comps, target_comps): """ Assign components to nearest in the target. :param comps: List of SkyComponents :param target_comps: Target SkyComponents :return: Indices of components in target_comps and the separations """ target_catalog = SkyCoord( [c.direction.ra.rad for c in target_comps] * u.rad, [c.direction.dec.rad for c in target_comps] * u.rad, ) all_catalog = SkyCoord( [c.direction.ra.rad for c in comps] * u.rad, [c.direction.dec.rad for c in comps] * u.rad, ) idx, d2d, _ = match_coordinates_sky(all_catalog, target_catalog) return idx, d2d
[docs] def remove_neighbouring_components(comps, distance): """ Remove the faintest of a pair of components that are within a specified distance. :param comps: List of SkyComponents :param distance: Minimum distance :return: Indices of components in target_comps, selected components """ ncomps = len(comps) ok = ncomps * [True] for i in range(ncomps): if ok[i]: for j in range(i + 1, ncomps): if ok[j]: d = comps[i].direction.separation(comps[j].direction).rad if d < distance: if numpy.max(comps[i].flux) > numpy.max(comps[j].flux): ok[j] = False else: ok[i] = False break idx = list(compress(list(range(ncomps)), ok)) comps_sel = list(compress(comps, ok)) return idx, comps_sel
[docs] def find_skycomponents( im: Image, fwhm=1.0, threshold=1.0, npixels=5 ) -> List[SkyComponent]: """Find gaussian components in Image above a certain threshold as SkyComponent. :param im: Image to be searched :param fwhm: Full width half maximum of gaussian in pixels :param threshold: Threshold for component detection. Default: 1 Jy. :param npixels: Number of connected pixels required :return: List of SkyComponents """ log.debug( "find_skycomponents: Finding components in Image by segmentation" ) # We use photutils segmentation - this first segments the image # into pieces that are thought to contain individual sources, then # identifies the concrete source properties. Having these two # steps makes it straightforward to extract polarisation and # spectral information. # Make filter kernel sigma = fwhm * gaussian_fwhm_to_sigma kernel_size = int(1.5 * fwhm) if kernel_size % 2 == 0: kernel_size += 1 kernel = Gaussian2DKernel(sigma, x_size=kernel_size, y_size=kernel_size) kernel.normalize() # Segment the average over all channels of Stokes I image_sum = numpy.sum(im["pixels"].data, axis=0)[0, ...] / float( im["pixels"].data.shape[0] ) image_sum = convolve(image_sum, kernel) segments = segmentation.detect_sources( image_sum, threshold, npixels=npixels ) if segments is None: raise ValueError("find_skycomponents: Failed to find any components") log.info("find_skycomponents: Identified %d segments", segments.nlabels) comp_catalog = [ [ segmentation.SourceCatalog( im["pixels"].data[chan, pol], convolved_data=convolve(im["pixels"].data[chan, pol], kernel), segment_img=segments, ) for pol in [0] ] for chan in range(im.image_acc.nchan) ] def comp_prop(comp, prop_name): return [ [getattr(comp_catalog[chan][pol][comp], prop_name) for pol in [0]] for chan in range(im.image_acc.nchan) ] # Generate components comps = [] for segment in range(segments.nlabels): # Get flux and position. Astropy's quantities make this # unnecessarily complicated. flux = numpy.array(comp_prop(segment, "max_value")) xs = u.Quantity( list(map(u.Quantity, comp_prop(segment, "maxval_xindex"))) ) ys = u.Quantity( list(map(u.Quantity, comp_prop(segment, "maxval_yindex"))) ) sc = pixel_to_skycoord(xs, ys, im.image_acc.wcs, 0) ras = sc.ra decs = sc.dec # Determine "true" position by weighting aflux = numpy.abs(flux) flux_sum = numpy.sum(aflux) ra = numpy.sum(aflux * ras) / flux_sum dec = numpy.sum(aflux * decs) / flux_sum xs = numpy.sum(aflux * xs) / flux_sum ys = numpy.sum(aflux * ys) / flux_sum # pylint: disable=no-member point_flux = im["pixels"].data[ :, :, numpy.round(ys.value).astype("int"), numpy.round(xs.value).astype("int"), ] # Add component comps.append( SkyComponent( direction=SkyCoord(ra=ra, dec=dec), frequency=im.frequency, name=f"Segment {segment}", flux=point_flux, shape="Point", polarisation_frame=im.image_acc.polarisation_frame, params={}, ) ) return comps
[docs] def apply_beam_to_skycomponent( sc: Union[SkyComponent, List[SkyComponent]], beam: Image, phasecentre=None, inverse=False, ) -> Union[SkyComponent, List[SkyComponent]]: """Apply a primary beam to a SkyComponent. if inverse==True, do an inverse where we subtract the primary beam from the skycomponents. if inverse==False, do a multiplication of beam and skycomponent fluxes. :param sc: SkyComponent or list of SkyComponents :param beam: Primary beam (Image) :param phasecentre: Phase Centre of beam (astropy.SkyCoord) :param inverse: do multiplication or subtraction of fluxes (default false) :return: List of SkyComponents """ single = not isinstance(sc, collections.abc.Iterable) if single: sc = [sc] ny = beam["pixels"].data.shape[2] nx = beam["pixels"].data.shape[3] log.debug("apply_beam_to_skycomponent: Processing %d components", len(sc)) ras = [comp.direction.ra.radian for comp in sc] decs = [comp.direction.dec.radian for comp in sc] skycoords = SkyCoord(ras * u.rad, decs * u.rad, frame="icrs") if beam.image_acc.wcs.wcs.ctype[0] == "RA---SIN": pixlocs = skycoord_to_pixel( skycoords, beam.image_acc.wcs, origin=1, mode="wcs" ) else: wcs = copy.deepcopy(beam.image_acc.wcs) wcs.wcs.ctype[0] = "RA---SIN" wcs.wcs.ctype[1] = "DEC--SIN" wcs.wcs.crval[0] = phasecentre.ra.deg wcs.wcs.crval[1] = phasecentre.dec.deg pixlocs = skycoord_to_pixel(skycoords, wcs, origin=1, mode="wcs") newsc = [] total_flux = numpy.zeros_like(sc[0].flux) for icomp, comp in enumerate(sc): assert comp.shape == "Point", f"Cannot handle shape {comp.shape}" pixloc = (pixlocs[0][icomp], pixlocs[1][icomp]) if not numpy.isnan(pixloc).any(): x, y = int(round(float(pixloc[0]))), int(round(float(pixloc[1]))) if 0 <= x < nx and 0 <= y < ny: if inverse and (beam["pixels"].data[:, :, y, x] != 0.0).all(): comp_flux = comp.flux / beam["pixels"].data[:, :, y, x] else: comp_flux = comp.flux * beam["pixels"].data[:, :, y, x] total_flux += comp_flux else: comp_flux = 0.0 * comp.flux newsc.append( SkyComponent( comp.direction, comp.frequency, comp.name, comp_flux, shape=comp.shape, polarisation_frame=comp.polarisation_frame, ) ) log.debug( "apply_beam_to_skycomponent: %d components with total flux %s", len(newsc), total_flux, ) if single: return newsc[0] return newsc
[docs] def apply_voltage_pattern_to_skycomponent( sc: Union[SkyComponent, List[SkyComponent]], vp: Image, inverse=False, phasecentre=None, ) -> Union[SkyComponent, List[SkyComponent]]: """Apply a voltage pattern to a SkyComponent. For inverse==False, input polarisation_frame must be stokesIQUV, and output polarisation_frame is same as voltage pattern. For inverse==True, input polarisation_frame must be same as voltage pattern, and output polarisation_frame is "stokesIQUV". Requires a complex Image with the correct ordering of polarisation axes: e.g. RR, LL, RL, LR or XX, YY, XY, YX. :param sc: SkyComponent or list of SkyComponents :param vp: voltage pattern as complex image :param inverse: input and output polarisation frame (default False) :param phasecentre: Phasecentre (Skycoord) :return: List of SkyComponents """ assert ( vp.image_acc.polarisation_frame == PolarisationFrame("linear") ) or (vp.image_acc.polarisation_frame == PolarisationFrame("circular")) # assert vp["pixels"].data.dtype == "complex128" single = not isinstance(sc, collections.abc.Iterable) if single: sc = [sc] nchan, npol, ny, nx = vp["pixels"].data.shape log.debug("apply_vp_to_skycomponent: Processing %d components", len(sc)) ras = [comp.direction.ra.radian for comp in sc] decs = [comp.direction.dec.radian for comp in sc] skycoords = SkyCoord(ras * u.rad, decs * u.rad, frame="icrs") if vp.image_acc.wcs.wcs.ctype[0] == "RA---SIN": pixlocs = skycoord_to_pixel( skycoords, vp.image_acc.wcs, origin=1, mode="wcs" ) else: assert phasecentre is not None, "Need to know the phasecentre" wcs = copy.deepcopy(vp.image_acc.wcs) wcs.wcs.ctype[0] = "RA---SIN" wcs.wcs.ctype[1] = "DEC--SIN" wcs.wcs.crval[0] = phasecentre.ra.deg wcs.wcs.crval[1] = phasecentre.dec.deg pixlocs = skycoord_to_pixel(skycoords, wcs, origin=1, mode="wcs") newsc = [] total_flux = numpy.zeros([nchan, npol], dtype="complex") for icomp, comp in enumerate(sc): assert comp.shape == "Point", f"Cannot handle shape {comp.shape}" # Convert to linear (xx, xy, yx, yy) or circular (rr, rl, lr, ll) nchan, npol = comp.flux.shape assert npol == 4 if not inverse: assert comp.polarisation_frame == PolarisationFrame("stokesIQUV") comp_flux_cstokes = convert_pol_frame( comp.flux, comp.polarisation_frame, vp.image_acc.polarisation_frame ).reshape([nchan, 2, 2]) comp_flux = numpy.zeros([nchan, npol], dtype="complex") pixloc = (pixlocs[0][icomp], pixlocs[1][icomp]) if not numpy.isnan(pixloc).any(): x, y = int(round(float(pixloc[0]))), int(round(float(pixloc[1]))) if 0 <= x < nx and 0 <= y < ny: # Now we want to left and right multiply by the Jones matrices # comp_flux = vp["pixels"].data[:, :, y, x] * comp_flux_cstokes # * numpy.vp["pixels"].data[:, :, y, x] for chan in range(nchan): ej = vp["pixels"].data[chan, :, y, x].reshape([2, 2]) cfs = comp_flux_cstokes[chan].reshape([2, 2]) comp_flux[chan, :] = apply_jones(ej, cfs, inverse).reshape( [4] ) total_flux += comp_flux if inverse: comp_flux = convert_pol_frame( comp_flux, vp.image_acc.polarisation_frame, PolarisationFrame("stokesIQUV"), ) comp.polarisation_frame = PolarisationFrame("stokesIQUV") newsc.append( SkyComponent( comp.direction, comp.frequency, comp.name, comp_flux, shape=comp.shape, polarisation_frame=vp.image_acc.polarisation_frame, ) ) log.debug( "apply_vp_to_skycomponent: %d components with total flux %s", len(newsc), total_flux, ) if single: return newsc[0] return newsc
[docs] def filter_skycomponents_by_flux(sc, flux_min=-numpy.inf, flux_max=numpy.inf): """Filter sky components by stokes I flux. :param sc: List of SkyComponents :param flux_min: Minimum I flux :param flux_max: Maximum I flux :return: Filtered list of SkyComponents """ newcomps = [] for comp in sc: if (numpy.max(comp.flux[:, 0]) > flux_min) and ( numpy.max(comp.flux[:, 0]) < flux_max ): newcomps.append(comp) return newcomps
[docs] def insert_skycomponent( im: Image, sc: Union[SkyComponent, List[SkyComponent]], insert_method="Nearest", bandwidth=1.0, support=8, ) -> Image: """Insert a SkyComponent into an Image. :param im: Image :param sc: SkyComponent or list of SkyComponents :param insert_method: 'Nearest' | 'Sinc' | 'Lanczos' | 'PSWF' :param bandwidth: Fractional of uv plane to optimise over (1.0) :param support: Support of kernel (7) :return: Image """ support = int(support / bandwidth) nchan, npol, ny, nx = im["pixels"].data.shape if not isinstance(sc, collections.abc.Iterable): sc = [sc] log.debug("insert_skycomponent: Using insert method %s", insert_method) image_frequency = im.frequency.data ras = [comp.direction.ra.radian for comp in sc] decs = [comp.direction.dec.radian for comp in sc] skycoords = SkyCoord(ras * u.rad, decs * u.rad, frame="icrs") pixlocs = skycoord_to_pixel( skycoords, im.image_acc.wcs, origin=0, mode="wcs" ) insert_method_map = { "Lanczos": insert_function_L, "Sinc": insert_function_sinc, "PSWF": insert_function_pswf, } nbad = 0 for icomp, comp in enumerate(sc): if not comp.shape == "Point": raise ValueError(f"Cannot handle shape {comp.shape}") pixloc = (pixlocs[0][icomp], pixlocs[1][icomp]) flux = numpy.zeros([nchan, npol]) if comp.flux.shape[0] > 1: for pol in range(npol): fint = interpolate.interp1d( comp.frequency.data, comp.flux[:, pol], kind="cubic" ) flux[:, pol] = fint(image_frequency) else: flux = comp.flux try: insert_array( im["pixels"].data, pixloc[0], pixloc[1], flux, bandwidth, support, insert_function=insert_method_map[insert_method], ) except KeyError: # this is for insert_method = "Nearest" y, x = ( numpy.round(pixloc[1]).astype("int"), numpy.round(pixloc[0]).astype("int"), ) if 0 <= x < nx and 0 <= y < ny: im["pixels"].data[:, :, y, x] += flux[...] else: nbad += 1 if nbad > 0: log.warning( "insert_skycomponent: %s components of %s do not fit on image", nbad, len(sc), ) return im
def restore_skycomponent( im: Image, sc: Union[SkyComponent, List[SkyComponent]], clean_beam=None, ) -> Image: """Restore a SkyComponent into an image :param im: Image :param sc: SkyComponent or list of SkyComponents :param clean_beam: dict e.g. {"bmaj":0.1, "bmin":0.05, "bpa":-60.0}. Units are deg, deg, deg :return: Image """ nchan = im["pixels"].data.shape[0] npol = im["pixels"].data.shape[1] if not isinstance(sc, collections.abc.Iterable): sc = [sc] image_frequency = im.frequency.data ras = [comp.direction.ra.radian for comp in sc] decs = [comp.direction.dec.radian for comp in sc] skycoords = SkyCoord(ras * u.rad, decs * u.rad, frame="icrs") pixlocs = skycoord_to_pixel( skycoords, im.image_acc.wcs, origin=0, mode="wcs" ) beam_pixels = convert_clean_beam_to_pixels(im, clean_beam) for icomp, comp in enumerate(sc): if comp.shape != "Point": raise ValueError( f"restore_skycomponent: Cannot handle shape {comp.shape}" ) pixloc = (pixlocs[0][icomp], pixlocs[1][icomp]) flux = numpy.zeros([nchan, npol]) if ( len(comp.frequency.data) == len(image_frequency) and numpy.max(numpy.abs(comp.frequency.data - image_frequency)) < 1e-7 ): flux = comp.flux elif comp.flux.shape[0] > 1: for pol in range(npol): fint = interpolate.interp1d( comp.frequency.data, comp.flux[:, pol], kind="cubic" ) flux[:, pol] = fint(image_frequency) else: flux = comp.flux gaussian = models.Gaussian2D( amplitude=1.0, x_mean=pixloc[0], y_mean=pixloc[1], x_stddev=beam_pixels[0], y_stddev=beam_pixels[1], theta=beam_pixels[2], ) xi, yi = numpy.indices(im["pixels"].data.shape[-2:]) im["pixels"].data[...] += ( flux[..., numpy.newaxis, numpy.newaxis] * gaussian(yi, xi)[numpy.newaxis, numpy.newaxis, ...] ) im.attrs["clean_beam"] = clean_beam return im
[docs] def voronoi_decomposition(im, comps): """Construct a Voronoi decomposition of a set of components. The array return contains the index into the scipy.spatial.qhull.Voronoi structure. :param im: Image :param comps: List of SkyComponents :return: Voronoi structure, vertex Image """ def voronoi_vertex(vy, vx, vertex_y, vertex_x): """Return the nearest Voronoi vertex. :param vy: Voronoi y index :param vx: Voronoi x index :param vertex_y: Vertex y index :param vertex_x: Vertex x index :return: """ return numpy.argmin(numpy.hypot(vy - vertex_y, vx - vertex_x)) directions = SkyCoord( [u.rad * c.direction.ra.rad for c in comps], [u.rad * c.direction.dec.rad for c in comps], ) x, y = skycoord_to_pixel(directions, im.image_acc.wcs, 1, "wcs") points = [(x_elem, y[i]) for i, x_elem in enumerate(x)] vor = Voronoi(points) ny = im["pixels"].data.shape[2] nx = im["pixels"].data.shape[3] vertex_image = numpy.zeros([ny, nx]).astype("int") for j in range(ny): for i in range(nx): vertex_image[j, i] = voronoi_vertex( j, i, vor.points[:, 1], vor.points[:, 0] ) return vor, vertex_image
[docs] def image_voronoi_iter( im: Image, components: list ) -> collections.abc.Iterable: """Iterate through Voronoi decomposition, returning a generator yielding fullsize images. :param im: Image :param components: Components to define Voronoi decomposition :returns: Generator of Images """ if len(components) == 1: mask = numpy.ones(im["pixels"].data.shape) # need to pass data here yield Image.constructor( data=mask, polarisation_frame=im.image_acc.polarisation_frame, wcs=im.image_acc.wcs, ) else: _, vertex_array = voronoi_decomposition(im, components) nregions = numpy.max(vertex_array) + 1 for region in range(nregions): mask = numpy.zeros(im["pixels"].data.shape) mask[:, :, (vertex_array == region)] = 1.0 yield Image.constructor( data=mask, polarisation_frame=im.image_acc.polarisation_frame, wcs=im.image_acc.wcs, )
[docs] def partition_skycomponent_neighbours(comps, targets): """Partition sky components by nearest target source. :param comps: List of SkyComponents :param targets: List of targets :return: Partitioned SkyComponents """ idx, _ = select_neighbouring_components(comps, targets) comps_lists = [] for comp_id in numpy.unique(idx): selected_comps = list(compress(comps, idx == comp_id)) comps_lists.append(selected_comps) return comps_lists
[docs] def fit_skycomponent(im: Image, sc: SkyComponent, **kwargs): """Fit a two-dimensional Gaussian skycomponent using astropy.modeling. :params im: Input Image :params sc: Single SkyComponent :return: SkyComponent after fitting """ pixloc = numpy.round( skycoord_to_pixel(sc.direction, im.image_acc.wcs, origin=0) ).astype("int") sl_y = slice(pixloc[1] - 7, pixloc[1] + 8) sl_x = slice(pixloc[0] - 7, pixloc[0] + 8) y, x = numpy.mgrid[sl_y, sl_x] z = im["pixels"].data[0, 0, sl_y, sl_x] image_shape = im["pixels"].data[0, 0].shape # isotropic at the moment! newsc = sc.copy() try: p_init = models.Gaussian2D( amplitude=numpy.max(z), x_mean=numpy.mean(x), y_mean=numpy.mean(y) ) fit_p = fitting.LevMarLSQFitter() with warnings.catch_warnings(): warnings.simplefilter("ignore") fit = fit_p(p_init, x, y, z) # Now fill in the new skycomponent values newsc.direction = pixel_to_skycoord( fit.x_mean, fit.y_mean, im.image_acc.wcs, 0 ) iy = round(fit.y_mean.value) ix = round(fit.x_mean.value) # We could fit each frequency separately. For the moment, we just scale if 0 <= iy < image_shape[0] and 0 <= ix < image_shape[1]: newsc.flux = im["pixels"].data[:, :, iy, ix] try: force_point_sources = kwargs["force_point_sources"] except KeyError: log.info( "fit_skycomponent: force_point_sources not give, " "setting as default: True" ) force_point_sources = True if force_point_sources or (fit.x_fwhm <= 0.0 or fit.y_fwhm <= 0.0): newsc.shape = "Point" else: newsc.shape = "Gaussian" # cellsize in radians cellsize = numpy.abs((im["x"][0].data - im["x"][-1].data)) / len( im["x"] ) gaussian_pixels = (fit.x_fwhm, fit.y_fwhm, fit.theta) if gaussian_pixels[1] > gaussian_pixels[0]: clean_gaussian = { "bmaj": numpy.rad2deg(gaussian_pixels[1] * cellsize), "bmin": numpy.rad2deg(gaussian_pixels[0] * cellsize), "bpa": numpy.rad2deg(gaussian_pixels[2]), } else: clean_gaussian = { "bmaj": numpy.rad2deg(gaussian_pixels[0] * cellsize), "bmin": numpy.rad2deg(gaussian_pixels[1] * cellsize), "bpa": numpy.rad2deg(gaussian_pixels[2]) + 90.0, } newsc.shape = "Gaussian" newsc.params = clean_gaussian except (minpack.error, ValueError) as err: log.warning("fit_skycomponent: fit failed %s", err) return sc return newsc
[docs] def fit_skycomponent_spectral_index(sc: SkyComponent): """ Fit the spectral index for a multi frequency SkyComponent. :param sc: SkyComponent :return: Spectral index (float) """ nchan = sc.frequency.shape[0] if nchan <= 1: log.warning("Single frequency skycomponent, skip fitting") spec_indx = 0.0 else: centre = nchan // 2 if sc.frequency[centre] > 0.0 and sc.flux[centre, 0] > 0.0: xdata = numpy.log10(sc.frequency / sc.frequency[centre]) ydata = numpy.log10(sc.flux[:, 0] / sc.flux[centre, 0]) out = numpy.polyfit(xdata, ydata, 1) spec_indx = out[0] else: log.warning("Wrong values encountered, no fitting performed.") spec_indx = 0.0 return spec_indx