Source code for rascil.processing_components.skycomponent.plot_skycomponent

"""
Functions to manage plotting skycomponents in comparisons.
"""

__all__ = [
    "plot_skycomponents_positions",
    "plot_skycomponents_position_distance",
    "plot_skycomponents_flux",
    "plot_skycomponents_flux_ratio",
    "plot_skycomponents_flux_histogram",
    "plot_skycomponents_position_quiver",
    "plot_gaussian_beam_position",
    "plot_multifreq_spectral_index",
]

import logging

import astropy.units as u
import matplotlib.pyplot as plt
import numpy
from ska_sdp_func_python.sky_component import (
    find_skycomponent_matches,
    fit_skycomponent,
    fit_skycomponent_spectral_index,
)

log = logging.getLogger("rascil-logger")


[docs] def plot_skycomponents_positions( comps_test, comps_ref=None, img_size=1.0, plot_file=None, tol=1e-5, plot_error=True, **kwargs, ): """ Generate position scatter plot for two lists of skycomponents :param comps_test: List of components to be tested :param img_size: Cell size per pixel in the image to compare :param comps_ref: List of reference components :param plot_file: Filename of the plot :param tol: Tolerance in rad :param plot_error: If True, plot error, else just plot absolute values :return: [ra_error, dec_error]: The error array for users to check """ # This wraps the angles larger than 180 degrees to (-180, 180] angle_wrap = 180.0 * u.deg # If the angle values cross over -180 OR 180 degrees, don't apply wrapping # Input a list of astropy Angle object def unwrap_around_180(angles): angles_degree = [angle.degree for angle in angles] if ( max(angles_degree) > 180.0 and max(angles_degree) < 270.0 and min(angles_degree) < 180.0 and min(angles_degree) > 90.0 ) or ( max(angles_degree) > -180.0 and max(angles_degree) < -90.0 and min(angles_degree) < -180.0 and min(angles_degree) > -270.0 ): return numpy.array(angles_degree) else: log.info("Wrap angles to (-180, 180]") wrap = [angle.wrap_at(angle_wrap).degree for angle in angles] return numpy.array(wrap) if comps_ref is None: # No comparison needed ra_test = [comp.direction.ra for comp in comps_test] dec_test = [comp.direction.dec.degree for comp in comps_test] ra_test = unwrap_around_180(ra_test) plt.plot(ra_test, dec_test, "o", color="b", markersize=5, label="Components") else: matches = find_skycomponent_matches(comps_test, comps_ref, tol) ra_test = [None] * len(matches) dec_test = numpy.zeros(len(matches)) ra_ref = [None] * len(matches) dec_ref = numpy.zeros(len(matches)) ra_error = numpy.zeros(len(matches)) dec_error = numpy.zeros(len(matches)) for i, match in enumerate(matches): m_comp = comps_test[match[0]] ra_test[i] = m_comp.direction.ra dec_test[i] = m_comp.direction.dec.degree m_ref = comps_ref[match[1]] ra_ref[i] = m_ref.direction.ra dec_ref[i] = m_ref.direction.dec.degree if img_size > 0.0: ra_error[i] = ( ( m_comp.direction.ra.wrap_at(angle_wrap).degree - m_ref.direction.ra.wrap_at(angle_wrap).degree ) * numpy.cos(m_ref.direction.dec.rad) / img_size ) dec_error[i] = ( m_comp.direction.dec.degree - m_ref.direction.dec.degree ) / img_size else: log.info("Wrong input image resolution. Plot absolute values instead.") ra_error[i] = ( m_comp.direction.ra.wrap_at(angle_wrap).degree - m_ref.direction.ra.wrap_at(angle_wrap).degree ) * numpy.cos(m_ref.direction.dec.rad) dec_error[i] = m_comp.direction.dec.degree - m_ref.direction.dec.degree ra_test = unwrap_around_180(ra_test) ra_ref = unwrap_around_180(ra_ref) ax = plt.gca() ax.set_aspect(1.0) ax.plot( ra_test, dec_test, "o", color="b", markersize=5, label="Tested components" ) ax = plt.gca() ax.set_aspect(1.0) ax.plot( ra_ref, dec_ref, "x", color="r", markersize=8, alpha=0.5, label="Original components", ) plt.title("Positions of sources") plt.xlabel("RA (deg)") plt.ylabel("Dec (deg)") plt.legend(loc="best") if plot_file is not None: plt.savefig(plot_file + "_position_value.png") plt.show(block=False) plt.clf() if plot_error is True: if comps_ref is None: log.info("Error: No reference components. No position errors are plotted.") else: ax = plt.gca() ax.set_aspect(1.0) ax.plot(ra_error, dec_error, "o", markersize=5, alpha=0.5) err_r = max(numpy.max(ra_error), numpy.max(dec_error)) err_l = min(numpy.min(ra_error), numpy.min(dec_error)) plt.xlim([err_l, err_r]) plt.ylim([err_l, err_r]) plt.xlabel(r"$\Delta\ RA * cos(Dec) / \Delta x$") plt.ylabel(r"$\Delta\ Dec/ \Delta x$") plt.title("Errors in RA and Dec") if plot_file is not None: plt.savefig(plot_file + "_position_error.png") plt.show(block=False) plt.clf() return [ra_test, dec_test]
[docs] def plot_skycomponents_position_distance( comps_test, comps_ref, phasecentre, img_size, plot_file=None, tol=1e-5, **kwargs ): """Generate position error plot vs distance for two lists of skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param plot_file: Filename of the plot :param tol: Tolerance in rad :param phasecentre: Centre of image in SkyCoords :param img_size: Cell size per pixel in the image to compare :return: [ra_error, dec_error]: The error array for users to check """ angle_wrap = 180.0 * u.deg matches = find_skycomponent_matches(comps_test, comps_ref, tol) ra_error = numpy.zeros(len(matches)) dec_error = numpy.zeros(len(matches)) dist = numpy.zeros(len(matches)) for i, match in enumerate(matches): m_comp = comps_test[match[0]] m_ref = comps_ref[match[1]] if img_size > 0.0: ra_error[i] = ( ( m_comp.direction.ra.wrap_at(angle_wrap).degree - m_ref.direction.ra.wrap_at(angle_wrap).degree ) * numpy.cos(m_ref.direction.dec.rad) / img_size ) dec_error[i] = ( m_comp.direction.dec.degree - m_ref.direction.dec.degree ) / img_size dist[i] = m_comp.direction.separation(phasecentre).degree else: log.info("Wrong input image resolution. Plot absolute values instead.") ra_error[i] = ( m_comp.direction.ra.wrap_at(angle_wrap).degree - m_ref.direction.ra.wrap_at(angle_wrap).degree ) * numpy.cos(m_ref.direction.dec.rad) dec_error[i] = m_comp.direction.dec.degree - m_ref.direction.dec.degree dist[i] = m_comp.direction.separation(phasecentre).degree err_r = max(numpy.max(ra_error), numpy.max(dec_error)) err_l = min(numpy.min(ra_error), numpy.min(dec_error)) fig, (ax1, ax2) = plt.subplots(2, sharex=True) fig.suptitle("Position error vs. Distance") ax1.plot(dist, ra_error, "o", color="b", markersize=5, alpha=0.5) ax2.plot(dist, dec_error, "o", color="b", markersize=5, alpha=0.5) ax1.set_ylabel(r"$\Delta\ RA/ \Delta x$") ax2.set_ylabel(r"$\Delta\ Dec/ \Delta x$") ax2.set_xlabel(r"Separation To Center (deg)") ax1.set_ylim([err_l, err_r]) ax2.set_ylim([err_l, err_r]) if plot_file is not None: plt.savefig(plot_file + "_position_distance.png") plt.show(block=False) plt.clf() return [ra_error, dec_error]
[docs] def plot_skycomponents_flux( comps_test, comps_ref, plot_file=None, tol=1e-5, refchan=None, **kwargs ): """Generate flux scatter plot for two lists of skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param plot_file: Filename of the plot :param tol: Tolerance in rad :param refchan: Reference channel for comparison, default is centre channel :return: [flux_in, flux_out]: The flux array for users to check """ matches = find_skycomponent_matches(comps_test, comps_ref, tol) flux_in = numpy.zeros(len(matches)) flux_out = numpy.zeros(len(matches)) for i, match in enumerate(matches): m_comp = comps_test[match[0]] m_ref = comps_ref[match[1]] # Take the first polarisation if refchan is None: nchan, _ = m_ref.flux.shape flux_in[i] = m_ref.flux[nchan // 2, 0] flux_out[i] = m_comp.flux[nchan // 2, 0] else: flux_in[i] = m_ref.flux[refchan, 0] flux_out[i] = m_comp.flux[refchan, 0] plt.loglog(flux_in, flux_out, "o", color="b", markersize=5, alpha=0.5) plt.title("Flux in vs. flux out") plt.xlabel("Flux in (Jy)") plt.ylabel("Flux out (Jy)") if plot_file is not None: plt.savefig(plot_file + "_flux_value.png") plt.show(block=False) plt.clf() return [flux_in, flux_out]
[docs] def plot_skycomponents_flux_ratio( comps_test, comps_ref, phasecentre, plot_file=None, tol=1e-5, refchan=None, max_ratio=2, **kwargs, ): """Generate flux ratio plot vs distance for two lists of skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param plot_file: Filename of the plot :param tol: Tolerance in rad :param phasecentre: Centre of image in SkyCoords :param refchan: Reference channel for comparison, default is centre channel :param max_ratio: Maximum ratio to plot (default is 2.0) :return: [dist, flux_ratio]: The flux array for users to check """ angle_wrap = 180.0 * u.deg matches = find_skycomponent_matches(comps_test, comps_ref, tol) flux_ratio = [] dist = [] ra = [] dec = [] for i, match in enumerate(matches): m_comp = comps_test[match[0]] m_ref = comps_ref[match[1]] # Take the first polarisation if refchan is None: nchan, _ = m_ref.flux.shape if m_ref.flux[nchan // 2, 0] > 0.0: fr = m_comp.flux[nchan // 2, 0] / m_ref.flux[nchan // 2, 0] else: if m_ref.flux[refchan, 0] > 0.0: fr = m_comp.flux[refchan, 0] / m_ref.flux[refchan, 0] if fr > 0.0 and fr < max_ratio: flux_ratio.append(fr) dist.append(m_comp.direction.separation(phasecentre).degree) ra.append(m_comp.direction.ra.wrap_at(angle_wrap).degree) dec.append(m_comp.direction.dec.degree) if len(dist) == 0: raise ValueError("No valid points found for flux ratio plot") plt.plot(dist, flux_ratio, "o", color="b", markersize=5, alpha=0.5) plt.title("Flux ratio vs. distance") plt.xlabel("Distance to center (Deg)") plt.ylabel("Flux Ratio (Out/In)") if plot_file is not None: plt.savefig(plot_file + "_flux_ratio.png") plt.show(block=False) plt.clf() # Flux ratio vs. RA & Dec fig, (ax1, ax2) = plt.subplots(1, 2, sharey=True) fig.suptitle("Flux ratio vs. Position") ax1.plot(ra, flux_ratio, "o", color="b", markersize=5, alpha=0.5) ax2.plot(dec, flux_ratio, "o", color="b", markersize=5, alpha=0.5) ax1.set_xlabel("RA (deg)") ax2.set_xlabel("Dec (deg)") ax1.set_ylabel("Flux ratio (Out/In)") if plot_file is not None: plt.savefig(plot_file + "_flux_position.png") plt.show(block=False) plt.clf() return [dist, flux_ratio]
[docs] def plot_skycomponents_flux_histogram( comps_test, comps_ref, plot_file=None, nbins=10, tol=1e-5, refchan=None, **kwargs ): """Generate flux ratio plot vs distance for two lists of skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param plot_file: Filename of the plot :param tol: Tolerance in rad :param nbins: Number of bins for the histrogram :param refchan: Reference channel for comparison, default is centre channel :return: hist: The flux array for users to check """ if refchan is None: nchan, _ = comps_ref[0].flux.shape flux_in = numpy.array([comp.flux[nchan // 2, 0] for comp in comps_ref]) flux_out = numpy.array([comp.flux[nchan // 2, 0] for comp in comps_test]) else: flux_in = numpy.array([comp.flux[refchan, 0] for comp in comps_ref]) flux_out = numpy.array([comp.flux[refchan, 0] for comp in comps_test]) flux_in = flux_in[flux_in > 0.0] flux_out = flux_out[flux_out > 0.0] hist = [flux_in, flux_out] labels = ["Flux In", "Flux Out"] colors = ["r", "b"] hist_min = min(numpy.min(flux_in), numpy.min(flux_out)) hist_max = max(numpy.max(flux_in), numpy.max(flux_out)) hist_bins = numpy.logspace(numpy.log10(hist_min), numpy.log10(hist_max), nbins) fig, ax = plt.subplots() ax.hist(hist, bins=hist_bins, log=True, color=colors, label=labels) ax.set_title("Flux histogram") ax.set_xlabel("Flux (Jy)") ax.set_xscale("log") ax.set_ylabel("Source Count") plt.legend(loc="best") if plot_file is not None: plt.savefig(plot_file + "_flux_histogram.png") plt.show(block=False) plt.clf() return hist
[docs] def plot_skycomponents_position_quiver( comps_test, comps_ref, phasecentre, num=100, plot_file=None, tol=1e-5, **kwargs ): """Generate position error quiver diagram for two lists of skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param phasecentre: Centre of image in SkyCoords :param num: Number of the brightest sources to plot :param plot_file: Filename of the plot :param tol: Tolerance in rad :return: [ra_error, dec_error]: The error array for users to check """ angle_wrap = 180.0 * u.deg comps_test_sorted = sorted(comps_test, key=lambda cmp: numpy.max(cmp.flux)) matches = find_skycomponent_matches(comps_test_sorted, comps_ref, tol) num = min(num, len(matches)) ra_ref = numpy.zeros(num) dec_ref = numpy.zeros(num) ra_error = numpy.zeros(num) dec_error = numpy.zeros(num) for i in range(num): m_comp = comps_test_sorted[matches[i][0]] m_ref = comps_ref[matches[i][1]] ra_ref[i] = m_ref.direction.ra.wrap_at(angle_wrap).degree dec_ref[i] = m_ref.direction.dec.degree ra_error[i] = ( m_comp.direction.ra.wrap_at(angle_wrap).degree - m_ref.direction.ra.wrap_at(angle_wrap).degree ) * numpy.cos(m_ref.direction.dec.rad) dec_error[i] = m_comp.direction.dec.degree - m_ref.direction.dec.degree ref = max(numpy.max(numpy.abs(ra_error)), numpy.max(numpy.abs(dec_error))) scale_factor = 10 * ref log.info(f" Scale factor is {scale_factor}") fig, ax = plt.subplots() if numpy.mean(numpy.deg2rad(dec_ref)) != 0.0: ax.set_aspect(1.0 / numpy.cos(numpy.mean(numpy.deg2rad(dec_ref)))) q = ax.quiver(ra_ref, dec_ref, ra_error, dec_error, color="b") ax.scatter(ra_ref, dec_ref, color="r", s=8) ax.set_xlabel("RA (deg)") ax.set_ylabel("Dec (deg)") plt.title(f"Brightest {num} sources") if plot_file is not None: plt.savefig(plot_file + "_position_quiver.png") plt.show(block=False) plt.clf() return [ra_error, dec_error]
[docs] def plot_gaussian_beam_position( comps_test, comps_ref, phasecentre, image, num=100, plot_file=None, tol=1e-5, **kwargs, ): """Plot the major and minor size of beams for two lists of skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param phasecentre: Centre of image in SkyCoords :param image: Image to fit the skycomponents :param num: Number of the brightest sources to plot :param plot_file: Filename of the plot :param tol: Tolerance in rad :return: [bmaj, bmin]: The beam parameters for users to check """ angle_wrap = 180.0 * u.deg comps_test_sorted = sorted(comps_test, key=lambda cmp: numpy.max(cmp.flux)) matches = find_skycomponent_matches(comps_test_sorted, comps_ref, tol) num = min(num, len(matches)) # Only put in the items that can be fitted ra_dist = numpy.zeros(num) dec_dist = numpy.zeros(num) bmaj = numpy.zeros(num) bmin = numpy.zeros(num) dist = numpy.zeros(num) count = 0 i = 0 while count < num: match = matches[i] m_comp = comps_test_sorted[match[0]] m_ref = comps_ref[match[1]] log.info(f"Processing {match[0]}") i = i + 1 try: fitted = fit_skycomponent(image, m_comp, force_point_sources=False) log.info("{}".format(fitted.params)) ra_dist[count] = ( m_comp.direction.ra.wrap_at(angle_wrap).degree - phasecentre.ra.wrap_at(angle_wrap).deg ) * numpy.cos(m_ref.direction.dec.rad) dec_dist[count] = m_comp.direction.dec.degree - phasecentre.dec.deg dist[count] = m_comp.direction.separation(phasecentre).degree bmaj[count] = fitted.params["bmaj"] bmin[count] = fitted.params["bmin"] count = count + 1 # If fitting failed, no items will be found in the params dictionary except KeyError: log.warning(f"Fit skycomponent failed for component number {match[0]} ") log.info(f"Fitted {i} components, selected {num}") pos_dist = numpy.sqrt(numpy.array(ra_dist) ** 2.0 + numpy.array(dec_dist) ** 2.0) dist_r = numpy.max(pos_dist) dist_l = numpy.min(pos_dist) beam_r = max(numpy.max(bmaj), numpy.max(bmin)) beam_l = min(numpy.min(bmin), numpy.min(bmin)) ax1 = plt.subplot(212) ax1.plot(dist, bmaj, "o", color="b", markersize=5, alpha=0.5, label="Bmaj") ax1.plot(dist, bmin, "o", color="r", markersize=5, alpha=0.5, label="Bmin") ax1.legend(loc="best") ax1.set_ylabel("Beam size (deg)") ax1.set_xlabel(r"Distance to phase centre (deg)") ax2 = plt.subplot(221) ax2.plot(ra_dist, bmaj, "o", color="b", markersize=5, alpha=0.5, label="Bmaj") ax2.plot(ra_dist, bmin, "o", color="r", markersize=5, alpha=0.5, label="Bmin") ax2.set_ylabel(r"Beam size (deg)") ax2.set_title(r"$\Delta RA (deg)$") ax2.set_xlim([dist_l, dist_r]) ax2.set_ylim([beam_l, beam_r]) ax3 = plt.subplot(222) ax3.plot(dec_dist, bmaj, "o", color="b", markersize=5, alpha=0.5, label="Bmaj") ax3.plot(dec_dist, bmin, "o", color="r", markersize=5, alpha=0.5, label="Bmin") ax3.set_title(r"$\Delta Dec (deg)$") ax3.set_xlim([dist_l, dist_r]) ax3.set_ylim([beam_l, beam_r]) if plot_file is not None: plt.savefig(plot_file + "_gaussian_beam_position.png") plt.show(block=False) plt.clf() return [bmaj, bmin]
[docs] def plot_multifreq_spectral_index( comps_test, comps_ref, phasecentre, plot_file=None, tol=1e-5, flux_limit=0.0, spec_indx_test=None, spec_indx_ref=None, plot_diagnostics=False, **kwargs, ): """Generate spectral index plot for two lists of multi-frequency skycomponents :param comps_test: List of components to be tested :param comps_ref: List of reference components :param phasecentre: Centre of image in SkyCoords :param plot_file: Filename of the plot :param tol: Tolerance in rad :param flux_limit: Cutoff for plot (only components with central flux larger than this are plotted) :param spec_indx_test: Spectral index of comps_test if provided (if None, fit from components) :param spec_indx_ref: Spectral index of comps_ref if provided (if None, fit from components) :param plot_diagnostics: Whether to plot diagnostics plot (flux in vs. spectral index out) :return: [spec_in, spec_out]: The spectral index array for users to check """ matches = find_skycomponent_matches(comps_test, comps_ref, tol) spec_in = numpy.zeros(len(matches)) spec_out = numpy.zeros(len(matches)) flux_in = numpy.zeros(len(matches)) dist = numpy.zeros(len(matches)) for i, match in enumerate(matches): m_comp = comps_test[match[0]] m_ref = comps_ref[match[1]] if spec_indx_ref is None or len(spec_indx_ref) < i: spec_in[i] = fit_skycomponent_spectral_index(m_ref) else: spec_in[i] = spec_indx_ref[match[1]] if spec_indx_test is None or len(spec_indx_test) < i: spec_out[i] = fit_skycomponent_spectral_index(m_comp) else: spec_out[i] = spec_indx_test[match[0]] flux_in[i] = m_ref.flux[m_ref.flux.shape[0] // 2][0] dist[i] = m_comp.direction.separation(phasecentre).degree # mask out the ones that didn't get fitted properly mask_spec = (spec_in != 0.0) & (spec_out != 0.0) mask_cutoff = flux_in > flux_limit spec_in = spec_in[mask_spec & mask_cutoff] spec_out = spec_out[mask_spec & mask_cutoff] flux_in = flux_in[mask_spec & mask_cutoff] dist = dist[mask_spec & mask_cutoff] plt.plot(spec_in, spec_out, "o", color="b", markersize=5, alpha=0.5) plt.title("Spectral Indexes") plt.xlabel("Spectral index in") plt.ylabel("Spectral index out") if plot_file is not None: plt.savefig(plot_file + "_spec_index.png") plt.show(block=False) plt.clf() # Plot diagnostics plots: spectral index out vs flux in and vs distance if plot_diagnostics: plt.plot(flux_in, spec_out, "o", color="b", markersize=5, alpha=0.5) plt.xlabel("Flux In (Jy)") plt.ylabel("Spectral Index") if plot_file is not None: plt.savefig(plot_file + "_spec_index_diagnostics_flux.png") plt.show(block=False) plt.clf() plt.plot(dist, spec_out - spec_in, "o", color="b", markersize=5, alpha=0.5) plt.xlabel("Distance to centre (Deg)") plt.ylabel("Spectral Index Out-In") if plot_file is not None: plt.savefig(plot_file + "_spec_index_diagnostics_dist.png") plt.show(block=False) plt.clf() return [spec_in, spec_out]