Source code for rascil.processing_components.simulation.simulation_helpers

""" Functions that help with SKA simulations

"""

__all__ = [
    "plot_visibility",
    "plot_visibility_pol",
    "find_times_above_elevation_limit",
    "plot_uvcoverage",
    "plot_uwcoverage",
    "plot_vwcoverage",
    "plot_configuration",
    "plot_azel",
    "plot_gaintable",
    "plot_pointingtable",
    "find_pb_width_null",
    "create_mid_simulation_components",
    "plot_pa",
]

import logging

import matplotlib.pyplot as plt
import numpy
from ska_sdp_datamodels.image.image_create import create_image
from ska_sdp_datamodels.science_data_model.polarisation_model import PolarisationFrame
from ska_sdp_func_python.sky_component import (
    apply_beam_to_skycomponent,
    filter_skycomponents_by_flux,
)
from ska_sdp_func_python.util import hadec_to_azel
from ska_sdp_func_python.visibility.visibility_geometry import (
    calculate_visibility_azel,
    calculate_visibility_hourangles,
    calculate_visibility_parallactic_angles,
)

from rascil import phyconst
from rascil.processing_components.image.operations import show_image
from rascil.processing_components.imaging.primary_beams import create_pb

log = logging.getLogger("rascil-logger")


[docs] def find_times_above_elevation_limit( start_times, end_times, location, phasecentre, elevation_limit ): """Find all times for which a phasecentre is above the elevation limit :param start_times: :param end_times: :param location: :param phasecentre: :param elevation_limit: :return: """ assert len(start_times) == len(end_times) def valid_elevation(time, location, phasecentre): ha = numpy.pi * time / 43200.0 dec = phasecentre.dec.rad az, el = hadec_to_azel(ha, dec, location.lat.rad) return el > elevation_limit * numpy.pi / 180.0 number_valid_times = 0 valid_start_times = [] for it, t in enumerate(start_times): if valid_elevation(start_times[it], location, phasecentre) or valid_elevation( end_times[it], location, phasecentre ): valid_start_times.append(t) number_valid_times += 1 assert number_valid_times > 0, "No data above elevation limit" return valid_start_times
[docs] def plot_visibility( vis_list, colors=None, title="Visibility", y="amp", x="uvdist", plot_file=None, chan=0, markersize=0.2, **kwargs ): """Standard plot of visibility :param vis_list: :param plot_file: :param kwargs: :return: """ plt.clf() if colors is None: colors = numpy.repeat(["b"], len(vis_list)) for ivis, vis in enumerate(vis_list): ntimes, nbaselines, nchan, npol = vis["vis"].data.shape if x == "time": xvalue = numpy.repeat(vis["time"].data, nbaselines) else: uvdist = numpy.hypot(vis.visibility_acc.u.data, vis.visibility_acc.v.data) xvalue = uvdist.flat if y == "amp": yvalue = numpy.abs(vis.visibility_acc.flagged_vis[..., chan, 0]).flat plt.plot( xvalue[yvalue > 0.0], yvalue[yvalue > 0.0], ".", color=colors[ivis], markersize=markersize, ) else: yvalue = numpy.angle(vis.visibility_acc.flagged_vis[..., chan, 0]).flat plt.plot( xvalue, yvalue, ".", color=colors[ivis], markersize=markersize, ) plt.xlabel(x) plt.ylabel(y) plt.title(title) if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_visibility_pol( vis_list, title="Visibility_pol", y="amp", x="uvdist", plot_file=None, chan=0, **kwargs ): """Standard plot of visibility :param vis_list: :param plot_file: :param kwargs: :return: """ plt.clf() for ivis, vis in enumerate(vis_list): pols = vis.visibility_acc.polarisation_frame.names colors = ["red", "blue", "green", "purple"] for pol in range(vis.visibility_acc.npol): if y == "amp": yvalue = numpy.abs(vis.visibility_acc.flagged_vis[..., chan, pol]).flat else: yvalue = numpy.angle( vis.visibility_acc.flagged_vis[..., chan, pol] ).flat if x == "time": xvalue = numpy.repeat(vis["time"].data, len(yvalue)) else: uvdist = numpy.hypot( vis.visibility_acc.u.data, vis.visibility_acc.v.data ) xvalue = uvdist.flat if ivis == 0: plt.plot( xvalue[yvalue > 0.0], yvalue[yvalue > 0.0], ".", color=colors[pol], label=pols[pol], ) else: plt.plot( xvalue[yvalue > 0.0], yvalue[yvalue > 0.0], ".", color=colors[pol] ) plt.xlabel(x) plt.ylabel(y) plt.title(title) plt.legend() if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_uvcoverage(vis_list, ax=None, plot_file=None, title="UV coverage", **kwargs): """Standard plot of uv coverage :param vis_list: :param plot_file: :param kwargs: :return: """ for ivis, ovis in enumerate(vis_list): gvis = ovis.where(ovis["flags"] == 0) bvis = ovis.where(ovis["flags"] > 0) # After selection, the uvw has been broadcasted into 5 dimensions: # ("time", "baselines", "spatial","frequency", "polarisation") # We are dropping the last two dimensions gvis["uvw"] = gvis["uvw"][..., 0, 0] bvis["uvw"] = bvis["uvw"][..., 0, 0] u = numpy.array(gvis.visibility_acc.uvw_lambda[..., 0].flat) v = numpy.array(gvis.visibility_acc.uvw_lambda[..., 1].flat) # If the entire visibility is flagged, u and v will be NAN, # In this case, we skip the plotting. if ~numpy.isnan(u).all() or ~numpy.isnan(v).all(): if ivis == 0: plt.plot(u, v, ".", color="b", markersize=0.2, label="Unflagged") else: plt.plot( u, v, ".", color="b", markersize=0.2, ) plt.plot(-u, -v, ".", color="b", markersize=0.2) else: log.warning("There is no unflagged visibility. Skip plotting unflagged.") u = numpy.array(bvis.visibility_acc.uvw_lambda[..., 0].flat) v = numpy.array(bvis.visibility_acc.uvw_lambda[..., 1].flat) if ~numpy.isnan(u).all() or ~numpy.isnan(v).all(): if ivis == 0: plt.plot(u, v, ".", color="r", markersize=0.2, label="Flagged") else: plt.plot(u, v, ".", color="r", markersize=0.2) plt.plot(-u, -v, ".", color="r", markersize=0.2) else: log.warning("There is no flagged visibility. Skip plotting flagged.") plt.xlabel("U (wavelengths)") plt.ylabel("V (wavelengths)") plt.legend() plt.title(title) if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_configuration( config, ax=None, plot_file=None, title="Configuration", label=False, **kwargs ): """Standard plot of uv coverage :param vis_list: :param plot_file: :param kwargs: :return: """ antxyz = config.xyz.data names = config.names.data if label: plt.plot(antxyz[:, 0], antxyz[:, 1], ".", color="b", markersize=2.4) for iant, name in enumerate(names): plt.annotate(name, (antxyz[iant, 0], antxyz[iant, 1])) else: plt.plot(antxyz[:, 0], antxyz[:, 1], ".", color="b", markersize=10.0) plt.xlabel("X (m)") plt.ylabel("Y (m)") plt.title(title) if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_uwcoverage(vis_list, ax=None, plot_file=None, title="UW coverage", **kwargs): """Standard plot of uw coverage :param vis_list: :param plot_file: :param kwargs: :return: """ for ivis, vis in enumerate(vis_list): u = numpy.array(vis.visibility_acc.u.data[...].flat) w = numpy.array(vis.visibility_acc.w.data[...].flat) k = vis["frequency"].data / phyconst.c_m_s u = numpy.array(numpy.outer(u, k).flat) w = numpy.array(numpy.outer(w, k).flat) plt.plot(u, w, ".", color="b", markersize=0.2) plt.plot(-u, -w, ".", color="b", markersize=0.2) plt.xlabel("U (wavelengths)") plt.ylabel("W (wavelengths)") plt.title(title) if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_vwcoverage(vis_list, ax=None, plot_file=None, title="VW coverage", **kwargs): """Standard plot of vw coverage :param vis_list: :param plot_file: :param kwargs: :return: """ for ivis, vis in enumerate(vis_list): v = numpy.array(vis.visibility_acc.v.data[...].flat) w = numpy.array(vis.visibility_acc.w.data[...].flat) k = vis["frequency"].data / phyconst.c_m_s v = numpy.array(numpy.outer(v, k).flat) w = numpy.array(numpy.outer(w, k).flat) plt.plot(v, w, ".", color="b", markersize=0.2) plt.plot(-v, -w, ".", color="b", markersize=0.2) plt.xlabel("V (wavelengths)") plt.ylabel("W (wavelengths)") plt.title(title) if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_azel(bvis_list, plot_file=None, **kwargs): """Standard plot of az el coverage :param bvis_list: :param plot_file: :param kwargs: :return: """ plt.clf() r2h = 12.0 / numpy.pi for ibvis, bvis in enumerate(bvis_list): ha = r2h * calculate_visibility_hourangles(bvis).value az, el = calculate_visibility_azel(bvis) if ibvis == 0: plt.plot(ha, az.deg, ".", color="r", label="Azimuth (deg)") plt.plot(ha, el.deg, ".", color="b", label="Elevation (deg)") else: plt.plot(ha, az.deg, ".", color="r") plt.plot(ha, el.deg, ".", color="b") plt.xlabel("HA (hours)") plt.ylabel("Angle") plt.legend() plt.title("Azimuth and elevation vs hour angle") if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_pa(bvis_list, plot_file=None, **kwargs): """Standard plot of parallactic angle coverage :param bvis_list: :param plot_file: :param kwargs: :return: """ plt.clf() for ibvis, bvis in enumerate(bvis_list): ha = calculate_visibility_hourangles(bvis).value pa = calculate_visibility_parallactic_angles(bvis) if ibvis == 0: plt.plot(ha, pa.deg, ".", color="r", label="PA (deg)") else: plt.plot(ha, pa.deg, ".", color="r") plt.xlabel("HA (hours)") plt.ylabel("Parallactic Angle") plt.legend() plt.title("Parallactic angle vs hour angle") if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_gaintable(gt_list, title="", value="amp", plot_file=None, **kwargs): """Standard plot of gain table :param gt_list: :param title: :param plot_file: :param kwargs: :return: """ plt.clf() for igt, gt in enumerate(gt_list): nrec = gt[0].gaintable_acc.nrec names = gt[0].receptor1.data if nrec > 1: recs = [0, 1] else: recs = [1] colors = ["r", "b"] for irec, rec in enumerate(recs): amp = numpy.abs(gt[0].gain[:, 0, 0, rec, rec]) if value == "phase": y = numpy.angle(gt[0].gain[:, 0, 0, rec, rec]) if igt == 0: plt.plot( gt[0].time[amp > 0.0], y[amp > 0.0], ".", color=colors[rec], label=names[rec], ) else: plt.plot( gt[0].time[amp > 0.0], y[amp > 0.0], ".", color=colors[rec] ) else: y = amp if igt == 0: plt.plot( gt[0].time[amp > 0.0], 1.0 / y[amp > 0.0], ".", color=colors[rec], label=names[rec], ) else: plt.plot( gt[0].time[amp > 0.0], 1.0 / y[amp > 0.0], ".", color=colors[rec], ) plt.title(title) plt.xlabel("Time (s)") plt.legend() if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def plot_pointingtable(pt_list, plot_file, title, **kwargs): """Standard plot of pointing table :param pt_list: :param plot_file: :param title: :param kwargs: :return: """ plt.clf() r2a = 180.0 * 3600.0 / numpy.pi rms_az = 0.0 rms_el = 0.0 num = 0 for pt in pt_list: num += len(pt.pointing[:, 0, 0, 0, 0]) rms_az += numpy.sum((r2a * pt.pointing[:, 0, 0, 0, 0]) ** 2) rms_el += numpy.sum((r2a * pt.pointing[:, 0, 0, 0, 1]) ** 2) plt.plot(pt.time, r2a * pt.pointing[:, 0, 0, 0, 0], ".", color="r") plt.plot(pt.time, r2a * pt.pointing[:, 0, 0, 0, 1], ".", color="b") rms_az = numpy.sqrt(rms_az / num) rms_el = numpy.sqrt(rms_el / num) plt.title("%s az, el rms %.2f %.2f (arcsec)" % (title, rms_az, rms_el)) plt.xlabel("Time (s)") plt.ylabel("Offset (arcsec)") if plot_file is not None: plt.savefig(plot_file) plt.show(block=False)
[docs] def find_pb_width_null(pbtype, frequency, **kwargs): """Rough estimates of HWHM and null locations :param pbtype: :param frequency: :param kwargs: :return: """ if pbtype == "MID": HWHM_deg = 0.596 * 1.36e9 / frequency[0] null_az_deg = 2.0 * HWHM_deg null_el_deg = 2.0 * HWHM_deg elif pbtype == "MID_FEKO_B1": null_az_deg = 1.0779 * 1.36e9 / frequency[0] null_el_deg = 1.149 * 1.36e9 / frequency[0] HWHM_deg = 0.447 * 1.36e9 / frequency[0] elif pbtype == "MID_FEKO_B2": null_az_deg = 1.0779 * 1.36e9 / frequency[0] null_el_deg = 1.149 * 1.36e9 / frequency[0] HWHM_deg = 0.447 * 1.36e9 / frequency[0] elif pbtype == "MID_FEKO_Ku": null_az_deg = 1.0779 * 1.36e9 / frequency[0] null_el_deg = 1.149 * 1.36e9 / frequency[0] HWHM_deg = 0.447 * 1.36e9 / frequency[0] else: null_az_deg = 1.145 * 1.36e9 / frequency[0] null_el_deg = 1.145 * 1.36e9 / frequency[0] HWHM_deg = 0.447 * 1.36e9 / frequency[0] return HWHM_deg, null_az_deg, null_el_deg
[docs] def create_mid_simulation_components( phasecentre, frequency, flux_limit, pbradius, pb_npixel, pb_cellsize, show=False, fov=10, polarisation_frame=PolarisationFrame("stokesI"), flux_max=10.0, pb_type="MID", apply_pb=True, ): """Construct components for simulation :param context: singlesource or null or s3sky :param phasecentre: Centre of components :param frequency: Frequency :param pbtype: Type of primary beam :param offset_dir: Offset in ra, dec degrees :param flux_limit: Lower limit flux :param pbradius: Radius of components in radians :param pb_npixel: Number of pixels in the primary beam model :param pb_cellsize: Cellsize in primary beam model :param fov: FOV in degrees (used to select catalog) :param flux_max: Maximum flux in model before application of primary beam :param polarisation_frame: :param apply_pb: Apply the primary beam to the output components :param show: :return: """ # Make a skymodel from S3 max_flux = 0.0 total_flux = 0.0 log.info("create_simulation_components: Constructing s3sky components") from rascil.processing_components.simulation import ( create_test_skycomponents_from_s3, ) all_components = create_test_skycomponents_from_s3( flux_limit=flux_limit, phasecentre=phasecentre, polarisation_frame=polarisation_frame, frequency=numpy.array(frequency), radius=pbradius, fov=fov, ) original_components = filter_skycomponents_by_flux( all_components, flux_max=flux_max ) log.info( "create_simulation_components: %d components before filtering with primary beam" % (len(original_components)) ) if numpy.isscalar(frequency): pbmodel = create_image( npixel=pb_npixel, cellsize=pb_cellsize, phasecentre=phasecentre, frequency=frequency, nchan=1, polarisation_frame=polarisation_frame, ) # if frequency is an array pbmodel = create_image( npixel=pb_npixel, cellsize=pb_cellsize, phasecentre=phasecentre, frequency=frequency[0], nchan=len(frequency), polarisation_frame=polarisation_frame, ) pb = create_pb(pbmodel, pb_type, pointingcentre=phasecentre, use_local=False) pb_applied_components = apply_beam_to_skycomponent(original_components, pb) filtered_components = [] filtered_pb_components = [] reference_component = -1 reference_flux = 0.0 for icomp, comp in enumerate(original_components): if pb_applied_components[icomp].flux[0, 0] > flux_limit: total_flux += pb_applied_components[icomp].flux[0, 0] nchan, npol = comp.flux.shape scomp = comp.copy() npol = polarisation_frame.npol iflux = numpy.zeros([nchan, npol]) iflux[:, 0] = scomp.flux[:, 0] scomp.flux = iflux scomp.polarisation_frame = polarisation_frame filtered_components.append(scomp) if scomp.flux[0, 0] > reference_flux: reference_component = len(filtered_components) - 1 filtered_pb_components.append(pb_applied_components[icomp]) log.info( "create_simulation_components: %d components > %.6f Jy" " after filtering with primary beam" % (len(filtered_components), flux_limit) ) log.info( "create_simulation_components: Total flux in components is %g (Jy)" % total_flux ) if show: plt.clf() show_image(pb, components=filtered_components) plt.show(block=False) log.info( "create_simulation_components: Created %d components" % len(filtered_components) ) # If applying primary beam, return components before and after the primary beam # If not, return the components, and reference component. if apply_pb: return filtered_pb_components, filtered_components else: return filtered_components, reference_component