Source code for rascil.processing_components.simulation.testing_support

"""
Functions that aid testing in various ways. A typical use would be::

        lowcore = create_named_configuration('LOWBD2-CORE')
        times = numpy.linspace(-3, +3, 13) * (numpy.pi / 12.0)

        frequency = numpy.array([1e8])
        channel_bandwidth = numpy.array([1e7])

        # Define the component and give it some polarisation and spectral behaviour
        f = numpy.array([100.0])
        flux = numpy.array([f])

        phasecentre =
            SkyCoord(ra=+15.0 * u.deg, dec=-35.0 * u.deg, frame='icrs', equinox='J2000')
        compabsdirection =
            SkyCoord(ra=17.0 * u.deg, dec=-36.5 * u.deg, frame='icrs', equinox='J2000')

        comp = SkyComponent(flux=flux, frequency=frequency, direction=compabsdirection,
                                        polarisation_frame=PolarisationFrame('stokesI'))
        image = create_test_image(frequency=frequency,
                                  phasecentre=phasecentre,
                                  cellsize=0.001,
                                  polarisation_frame=PolarisationFrame('stokesI'))
        vis = create_visibility(lowcore, times=times, frequency=frequency,
                                     channel_bandwidth=channel_bandwidth,
                                     phasecentre=phasecentre, weight=1,
                                     polarisation_frame=PolarisationFrame('stokesI'),
                                     integration_time=1.0)

"""

__all__ = [
    "create_low_test_image_from_gleam",
    "create_low_test_skycomponents_from_gleam",
    "create_low_test_skymodel_from_gleam",
    "create_test_image",
    "create_test_image_from_s3",
    "create_test_skycomponents_from_s3",
    "create_unittest_components",
    "create_unittest_model",
    "ingest_unittest_visibility",
    "insert_unittest_errors",
    "replicate_image",
    "simulate_gaintable",
]

import csv
import logging
from typing import List

import astropy.units as u
import numpy
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.wcs import WCS
from astropy.wcs.utils import pixel_to_skycoord
from scipy import interpolate
from ska_sdp_datamodels.calibration import GainTable, create_gaintable_from_visibility
from ska_sdp_datamodels.image.image_model import Image
from ska_sdp_datamodels.science_data_model.polarisation_model import PolarisationFrame
from ska_sdp_datamodels.sky_model.sky_model import SkyComponent, SkyModel
from ska_sdp_datamodels.visibility import create_visibility
from ska_sdp_func_python.calibration import apply_gaintable, create_calibration_controls
from ska_sdp_func_python.imaging import advise_wide_field, create_image_from_visibility
from ska_sdp_func_python.sky_component import (
    apply_beam_to_skycomponent,
    filter_skycomponents_by_flux,
    insert_skycomponent,
)

from rascil.processing_components.image.operations import import_image_from_fits
from rascil.processing_components.imaging.primary_beams import create_pb
from rascil.processing_components.parameters import rascil_data_path
from rascil.processing_components.util.installation_checks import check_data_directory

log = logging.getLogger("rascil-logger")


[docs] def create_test_image( cellsize=None, frequency=None, channel_bandwidth=None, phasecentre=None, polarisation_frame=None, ) -> Image: """Create a useful test image This is the test image M31 widely used in ALMA and other simulations. It is actually part of an Halpha region in M31. :param cellsize: :param frequency: Frequency (array) in Hz :param channel_bandwidth: Channel bandwidth (array) in Hz :param phasecentre: Phase centre of image (SkyCoord) :param polarisation_frame: Polarisation frame :return: Image """ check_data_directory() im = import_image_from_fits(rascil_data_path("models/M31_canonical.model.fits")) if frequency is None: frequency = [1e8] if polarisation_frame is None: polarisation_frame = im.image_acc.polarisation_frame im = replicate_image(im, frequency=frequency, polarisation_frame=polarisation_frame) wcs = im.image_acc.wcs.deepcopy() if cellsize is not None: wcs.wcs.cdelt[0] = -180.0 * cellsize / numpy.pi wcs.wcs.cdelt[1] = +180.0 * cellsize / numpy.pi if frequency is not None: wcs.wcs.crval[3] = frequency[0] if channel_bandwidth is not None: wcs.wcs.cdelt[3] = channel_bandwidth[0] else: if len(frequency) > 1: wcs.wcs.cdelt[3] = frequency[1] - frequency[0] else: wcs.wcs.cdelt[3] = 0.001 * frequency[0] wcs.wcs.radesys = "ICRS" wcs.wcs.equinox = 2000.00 if phasecentre is None: phasecentre = im.image_acc.phasecentre else: wcs.wcs.crval[0] = phasecentre.ra.deg wcs.wcs.crval[1] = phasecentre.dec.deg # WCS is 1 relative wcs.wcs.crpix[0] = im["pixels"].data.shape[3] // 2 + 1 wcs.wcs.crpix[1] = im["pixels"].data.shape[2] // 2 + 1 return Image.constructor( data=im["pixels"].data, polarisation_frame=polarisation_frame, wcs=wcs )
[docs] def create_test_image_from_s3( npixel=16384, polarisation_frame=PolarisationFrame("stokesI"), cellsize=0.000015, frequency=numpy.array([1e8]), channel_bandwidth=numpy.array([1e6]), phasecentre=None, fov=20, flux_limit=1e-3, ) -> Image: """Create MID test image from S3 The input catalog was generated using the following query:: Database: s3_sex SQL: select * from Galaxies where (pow(10,itot_151)*1000 > 1.0) and (right_ascension between -5 and 5) and (declination between -5 and 5);; Number of rows returned: 29966 For frequencies < 610MHz, there are three tables to use:: data/models/S3_151MHz_10deg.csv, use fov=10 data/models/S3_151MHz_20deg.csv, use fov=20 data/models/S3_151MHz_40deg.csv, use fov=40 For frequencies > 610MHz, there are three tables: data/models/S3_1400MHz_1mJy_10deg.csv, use flux_limit>= 1e-3 data/models/S3_1400MHz_100uJy_10deg.csv, use flux_limit < 1e-3 data/models/S3_1400MHz_10uJy_10deg.csv, use flux_limit < 1e-4 data/models/S3_1400MHz_1mJy_18deg.csv, use flux_limit>= 1e-3 data/models/S3_1400MHz_100uJy_18deg.csv, use flux_limit < 1e-3 The component spectral index is calculated from the 610MHz and 151MHz or 1400MHz and 610MHz, and then calculated for the specified frequencies. If polarisation_frame is not stokesI then the image will a polarised axis but the values will be zero. :param npixel: Number of pixels :param polarisation_frame: Polarisation frame (default PolarisationFrame("stokesI")) :param cellsize: cellsize in radians :param frequency: :param channel_bandwidth: Channel width (Hz) :param phasecentre: phasecentre (SkyCoord) :param fov: fov 10 | 20 | 40 :param flux_limit: Minimum flux (Jy) :return: Image """ check_data_directory() ras = [] decs = [] fluxes = [] if phasecentre is None: phasecentre = SkyCoord( ra=+180.0 * u.deg, dec=-60.0 * u.deg, frame="icrs", equinox="J2000" ) if polarisation_frame is None: polarisation_frame = PolarisationFrame("stokesI") npol = polarisation_frame.npol nchan = len(frequency) shape = [nchan, npol, npixel, npixel] w = WCS(naxis=4) # The negation in the longitude is needed by definition of RA, DEC w.wcs.cdelt = [ -cellsize * 180.0 / numpy.pi, cellsize * 180.0 / numpy.pi, 1.0, channel_bandwidth[0], ] w.wcs.crpix = [npixel // 2 + 1, npixel // 2 + 1, 1.0, 1.0] w.wcs.ctype = ["RA---SIN", "DEC--SIN", "STOKES", "FREQ"] w.wcs.crval = [phasecentre.ra.deg, phasecentre.dec.deg, 1.0, frequency[0]] w.naxis = 4 w.wcs.radesys = "ICRS" w.wcs.equinox = 2000.0 model = Image.constructor( data=numpy.zeros(shape), polarisation_frame=polarisation_frame, wcs=w ) if numpy.max(frequency) > 6.1e8: if fov > 10: fovstr = "18" else: fovstr = "10" if flux_limit >= 1e-3: csvfilename = rascil_data_path("models/S3_1400MHz_1mJy_%sdeg.csv" % fovstr) elif flux_limit >= 1e-4: csvfilename = rascil_data_path( "models/S3_1400MHz_100uJy_%sdeg.csv" % fovstr ) else: fovstr = "10" csvfilename = rascil_data_path("models/S3_1400MHz_10uJy_%sdeg.csv" % fovstr) log.info( "create_test_image_from_s3: Reading S3-SEX sources from %s " % csvfilename ) else: assert fov in [10, 20, 40], "Field of view invalid: use one of %s" % ( [10, 20, 40] ) csvfilename = rascil_data_path("models/S3_151MHz_%ddeg.csv" % (fov)) log.info( "create_test_image_from_s3: Reading S3-SEX sources from %s " % csvfilename ) with open(csvfilename) as csvfile: readCSV = csv.reader(csvfile, delimiter=",") r = 0 for row in readCSV: # Skip first row if r > 0: ra = float(row[4]) / numpy.cos(phasecentre.dec.rad) + phasecentre.ra.deg dec = float(row[5]) + phasecentre.dec.deg if numpy.max(frequency) > 4.68e9: alpha = (float(row[13]) - float(row[12])) / numpy.log10( 18000.0 / 4680.0 ) flux = numpy.power(10, float(row[12])) * numpy.power( frequency / 4.68e9, alpha ) elif numpy.max(frequency) > 1.4e9: alpha = (float(row[12]) - float(row[11])) / numpy.log10( 4860.0 / 1400.0 ) flux = numpy.power(10, float(row[11])) * numpy.power( frequency / 1.4e9, alpha ) elif numpy.max(frequency) > 6.1e8: alpha = (float(row[11]) - float(row[10])) / numpy.log10( 1400.0 / 610.0 ) flux = numpy.power(10, float(row[10])) * numpy.power( frequency / 6.1e8, alpha ) else: alpha = (float(row[10]) - float(row[9])) / numpy.log10( 610.0 / 151.0 ) flux = numpy.power(10, float(row[9])) * numpy.power( frequency / 1.51e8, alpha ) if numpy.max(flux) > flux_limit: ras.append(ra) decs.append(dec) fluxes.append(flux) r += 1 csvfile.close() assert len(fluxes) > 0, "No sources found above flux limit %s" % flux_limit log.info("create_test_image_from_s3: %d sources read" % (len(fluxes))) p = w.sub(2).wcs_world2pix(numpy.array(ras), numpy.array(decs), 1) fluxes = numpy.array(fluxes) total_flux = numpy.sum(fluxes) ip = numpy.round(p).astype("int") ok = numpy.where( (0 <= ip[0, :]) & (npixel > ip[0, :]) & (0 <= ip[1, :]) & (npixel > ip[1, :]) )[0] ps = ip[:, ok] fluxes = fluxes[ok] actual_flux = numpy.sum(fluxes) log.info("create_test_image_from_s3: %d sources inside the image" % (ps.shape[1])) log.info( "create_test_image_from_s3: average channel flux in S3-SEX model = %.3f," " actual average channel flux in " "image = %.3f" % (total_flux / float(nchan), actual_flux / float(nchan)) ) for chan in range(nchan): for iflux, flux in enumerate(fluxes): model["pixels"].data[chan, 0, ps[1, iflux], ps[0, iflux]] = flux[chan] return model
[docs] def create_test_skycomponents_from_s3( polarisation_frame=PolarisationFrame("stokesI"), frequency=numpy.array([1e8]), channel_bandwidth=numpy.array([1e6]), phasecentre=None, fov=20, flux_limit=1e-3, radius=None, ): """Create test image from S3 The input catalog was generated using the following query:: Database: s3_sex SQL: select * from Galaxies where (pow(10,itot_151)*1000 > 1.0) and (right_ascension between -5 and 5) and (declination between -5 and 5);; Number of rows returned: 29966 For frequencies < 610MHz, there are three tables to use:: data/models/S3_151MHz_10deg.csv, use fov=10 data/models/S3_151MHz_20deg.csv, use fov=20 data/models/S3_151MHz_40deg.csv, use fov=40 For frequencies > 610MHz, there are three tables: data/models/S3_1400MHz_1mJy_10deg.csv, use flux_limit>= 1e-3 data/models/S3_1400MHz_100uJy_10deg.csv, use flux_limit < 1e-3 data/models/S3_1400MHz_10uJy_10deg.csv, use flux_limit < 1e-4 data/models/S3_1400MHz_1mJy_18deg.csv, use flux_limit>= 1e-3 data/models/S3_1400MHz_100uJy_18deg.csv, use flux_limit < 1e-3 The component spectral index is calculated from the 610MHz and 151MHz or 1400MHz and 610MHz, and then calculated for the specified frequencies. If polarisation_frame is not stokesI then the image will a polarised axis but the values will be zero. :param polarisation_frame: Polarisation frame (default PolarisationFrame("stokesI")) :param frequency: :param channel_bandwidth: Channel width (Hz) :param phasecentre: phasecentre (SkyCoord) :param fov: fov 10 | 20 | 40 :param flux_limit: Minimum flux (Jy) :param radius: radius of search area in radians (Default is half-width of the axis) :return: SkyComponents """ check_data_directory() ras = [] decs = [] fluxes = [] names = [] if phasecentre is None: phasecentre = SkyCoord( ra=+180.0 * u.deg, dec=-60.0 * u.deg, frame="icrs", equinox="J2000" ) if polarisation_frame is None: polarisation_frame = PolarisationFrame("stokesI") if numpy.max(frequency) > 6.1e8: if fov > 10: fovstr = "18" else: fovstr = "10" if flux_limit >= 1e-3: csvfilename = rascil_data_path("models/S3_1400MHz_1mJy_%sdeg.csv" % fovstr) elif flux_limit >= 1e-4: csvfilename = rascil_data_path( "models/S3_1400MHz_100uJy_%sdeg.csv" % fovstr ) else: fovstr = "10" csvfilename = rascil_data_path("models/S3_1400MHz_10uJy_%sdeg.csv" % fovstr) log.info( "create_test_skycomponents_from_s3: Reading S3-SEX sources from %s " % csvfilename ) else: assert fov in [10, 20, 40], "Field of view invalid: use one of %s" % ( [10, 20, 40] ) csvfilename = rascil_data_path("models/S3_151MHz_%ddeg.csv" % (fov)) log.info( "create_test_skycomponents_from_s3: Reading S3-SEX sources from %s " % csvfilename ) skycomps = list() with open(csvfilename) as csvfile: readCSV = csv.reader(csvfile, delimiter=",") r = 0 for row in readCSV: # Skip first row if r > 0: ra = float(row[4]) / numpy.cos(phasecentre.dec.rad) + phasecentre.ra.deg dec = float(row[5]) + phasecentre.dec.deg if numpy.max(frequency) > 4.68e9: alpha = (float(row[13]) - float(row[12])) / numpy.log10( 18000.0 / 4680.0 ) flux = numpy.power(10, float(row[12])) * numpy.power( frequency / 4.68e9, alpha ) elif numpy.max(frequency) > 1.4e9: alpha = (float(row[12]) - float(row[11])) / numpy.log10( 4860.0 / 1400.0 ) flux = numpy.power(10, float(row[11])) * numpy.power( frequency / 1.4e9, alpha ) elif numpy.max(frequency) > 6.1e8: alpha = (float(row[11]) - float(row[10])) / numpy.log10( 1400.0 / 610.0 ) flux = numpy.power(10, float(row[10])) * numpy.power( frequency / 6.1e8, alpha ) else: alpha = (float(row[10]) - float(row[9])) / numpy.log10( 610.0 / 151.0 ) flux = numpy.power(10, float(row[9])) * numpy.power( frequency / 1.51e8, alpha ) if numpy.max(flux) > flux_limit: ras.append(ra) decs.append(dec) if polarisation_frame == PolarisationFrame("stokesIQUV"): polscale = numpy.array([1.0, 0.0, 0.0, 0.0]) fluxes.append(numpy.outer(flux, polscale)) else: fluxes.append([[f] for f in flux]) names.append("S3_%s" % row[0]) r += 1 csvfile.close() assert len(fluxes) > 0, "No sources found above flux limit %s" % flux_limit directions = SkyCoord(ra=ras * u.deg, dec=decs * u.deg) if phasecentre is not None: separations = directions.separation(phasecentre).to("rad").value else: separations = numpy.zeros(len(names)) for isource, name in enumerate(names): direction = directions[isource] if separations[isource] < radius: if not numpy.isnan(flux).any(): skycomps.append( SkyComponent( direction=direction, flux=fluxes[isource], frequency=frequency, name=names[isource], shape="Point", polarisation_frame=polarisation_frame, ) ) log.info( "create_test_skycomponents_from_s3: %d sources found above " "fluxlimit inside search radius" % len(skycomps) ) return skycomps
[docs] def create_low_test_image_from_gleam( npixel=512, polarisation_frame=PolarisationFrame("stokesI"), cellsize=0.000015, frequency=numpy.array([1e8]), channel_bandwidth=None, phasecentre=None, kind="cubic", applybeam=False, flux_limit=0.1, flux_max=numpy.inf, flux_min=-numpy.inf, radius=None, insert_method="Nearest", ) -> Image: """Create LOW test image from the GLEAM survey Stokes I is estimated from a cubic spline fit to the measured fluxes. The polarised flux is always zero. See http://www.mwatelescope.org/science/gleam-survey The catalog is available from Vizier. VIII/100 GaLactic and Extragalactic All-sky MWA survey (Hurley-Walker+, 2016) GaLactic and Extragalactic All-sky Murchison Wide Field Array (GLEAM) survey. I: A low-frequency extragalactic catalogue. Hurley-Walker N., et al., Mon. Not. R. Astron. Soc., 464, 1146-1167 (2017), 2017MNRAS.464.1146H :param npixel: Number of pixels :param polarisation_frame: Polarisation frame (default PolarisationFrame("stokesI")) :param cellsize: cellsize in radians :param frequency: :param channel_bandwidth: Channel width (Hz) :param phasecentre: phasecentre (SkyCoord) :param kind: Kind of interpolation (see scipy.interpolate.interp1d) Default: linear :param radius: radius of search area in radians (Default is half-width of the diagonal) :return: Image """ check_data_directory() if phasecentre is None: phasecentre = SkyCoord( ra=+15.0 * u.deg, dec=-35.0 * u.deg, frame="icrs", equinox="J2000" ) if radius is None: radius = npixel * cellsize / numpy.sqrt(2.0) sc = create_low_test_skycomponents_from_gleam( flux_limit=flux_limit, polarisation_frame=polarisation_frame, frequency=frequency, phasecentre=phasecentre, kind=kind, radius=radius, ) sc = filter_skycomponents_by_flux(sc, flux_min=flux_min, flux_max=flux_max) if polarisation_frame is None: polarisation_frame = PolarisationFrame("stokesI") if channel_bandwidth is None: nchan = len(frequency) if nchan > 1: channel_bandwidth = numpy.array(nchan * [frequency[1] - frequency[0]]) else: channel_bandwidth = numpy.array([1e6]) npol = polarisation_frame.npol nchan = len(frequency) shape = [nchan, npol, npixel, npixel] w = WCS(naxis=4) # The negation in the longitude is needed by definition of RA, DEC w.wcs.cdelt = [ -cellsize * 180.0 / numpy.pi, cellsize * 180.0 / numpy.pi, 1.0, channel_bandwidth[0], ] w.wcs.crpix = [npixel // 2 + 1, npixel // 2 + 1, 1.0, 1.0] w.wcs.ctype = ["RA---SIN", "DEC--SIN", "STOKES", "FREQ"] w.wcs.crval = [phasecentre.ra.deg, phasecentre.dec.deg, 1.0, frequency[0]] w.naxis = 4 w.wcs.radesys = "ICRS" w.wcs.equinox = 2000.0 model = Image.constructor( data=numpy.zeros(shape), polarisation_frame=polarisation_frame, wcs=w ) model = insert_skycomponent(model, sc, insert_method=insert_method) if applybeam: beam = create_pb(model, telescope="LOW", use_local=False) model["pixels"].data[...] *= beam["pixels"].data[...] return model
[docs] def create_low_test_skymodel_from_gleam( npixel=512, polarisation_frame=PolarisationFrame("stokesI"), cellsize=0.000015, frequency=numpy.array([1e8]), channel_bandwidth=numpy.array([1e6]), phasecentre=None, kind="cubic", applybeam=True, flux_limit=0.1, flux_max=numpy.inf, flux_threshold=1.0, insert_method="Nearest", telescope="LOW", radius=None, ) -> SkyModel: """Create LOW test skymodel from the GLEAM survey Stokes I is estimated from a cubic spline fit to the measured fluxes. The polarised flux is always zero. See http://www.mwatelescope.org/science/gleam-survey The catalog is available from Vizier. VIII/100 GaLactic and Extragalactic All-sky MWA survey (Hurley-Walker+, 2016) GaLactic and Extragalactic All-sky Murchison Wide Field Array (GLEAM) survey. I: A low-frequency extragalactic catalogue. Hurley-Walker N., et al., Mon. Not. R. Astron. Soc., 464, 1146-1167 (2017), 2017MNRAS.464.1146H :param telescope: :param npixel: Number of pixels :param polarisation_frame: Polarisation frame (default PolarisationFrame("stokesI")) :param cellsize: cellsize in radians :param frequency: :param channel_bandwidth: Channel width (Hz) :param phasecentre: phasecentre (SkyCoord) :param kind: Kind of interpolation (see scipy.interpolate.interp1d) Default: cubic :param applybeam: Apply the primary beam? :param flux_limit: Weakest component :param flux_max: Maximum strength component to be included in components :param flux_threshold: Split between components (brighter) and image (weaker) :param insert_method: Nearest | PSWF | Lanczos :param radius: radius of search area in radians (Default is half-width of the axis) :return: SkyModel """ check_data_directory() if phasecentre is None: phasecentre = SkyCoord( ra=+15.0 * u.deg, dec=-35.0 * u.deg, frame="icrs", equinox="J2000" ) if radius is None: radius = npixel * cellsize sc = create_low_test_skycomponents_from_gleam( flux_limit=flux_limit, polarisation_frame=polarisation_frame, frequency=frequency, phasecentre=phasecentre, kind=kind, radius=radius, ) sc = filter_skycomponents_by_flux(sc, flux_max=flux_max) if polarisation_frame is None: polarisation_frame = PolarisationFrame("stokesI") npol = polarisation_frame.npol nchan = len(frequency) shape = [nchan, npol, npixel, npixel] w = WCS(naxis=4) # The negation in the longitude is needed by definition of RA, DEC w.wcs.cdelt = [ -cellsize * 180.0 / numpy.pi, cellsize * 180.0 / numpy.pi, 1.0, channel_bandwidth[0], ] w.wcs.crpix = [npixel // 2 + 1, npixel // 2 + 1, 1.0, 1.0] w.wcs.ctype = ["RA---SIN", "DEC--SIN", "STOKES", "FREQ"] w.wcs.crval = [phasecentre.ra.deg, phasecentre.dec.deg, 1.0, frequency[0]] w.naxis = 4 w.wcs.radesys = "ICRS" w.wcs.equinox = 2000.0 model = Image.constructor( data=numpy.zeros(shape), polarisation_frame=polarisation_frame, wcs=w ) if applybeam: beam = create_pb(model, telescope=telescope, use_local=False) sc = apply_beam_to_skycomponent(sc, beam, phasecentre=phasecentre) weaksc = filter_skycomponents_by_flux(sc, flux_max=flux_threshold) brightsc = filter_skycomponents_by_flux(sc, flux_min=flux_threshold) model = insert_skycomponent(model, weaksc, insert_method=insert_method) log.info( "create_low_test_skymodel_from_gleam: %d bright sources above " "flux threshold %.3f, %d weak sources below" % (len(brightsc), flux_threshold, len(weaksc)) ) return SkyModel(components=brightsc, image=model, mask=None, gaintable=None)
[docs] def create_low_test_skycomponents_from_gleam( flux_limit=0.1, polarisation_frame=PolarisationFrame("stokesI"), frequency=numpy.array([1e8]), kind="cubic", phasecentre=None, radius=1.0, ) -> List[SkyComponent]: """Create sky components from the GLEAM survey Stokes I is estimated from a cubic spline fit to the measured fluxes. The polarised flux is always zero. See http://www.mwatelescope.org/science/gleam-survey The catalog is available from Vizier. VIII/100 GaLactic and Extragalactic All-sky MWA survey (Hurley-Walker+, 2016) GaLactic and Extragalactic All-sky Murchison Wide Field Array (GLEAM) survey. I: A low-frequency extragalactic catalogue. Hurley-Walker N., et al., Mon. Not. R. Astron. Soc., 464, 1146-1167 (2017), 2017MNRAS.464.1146H :param flux_limit: Only write components brighter than this (Jy) :param polarisation_frame: Polarisation frame (default PolarisationFrame("stokesI")) :param frequency: Frequencies at which the flux will be estimated :param kind: Kind of interpolation (see scipy.interpolate.interp1d) Default: linear :param phasecentre: Desired phase centre (SkyCoord) default None implies all sources :param radius: Radius of sources selected around phasecentre (default 1.0 rad) :return: List of SkyComponents """ check_data_directory() fitsfile = rascil_data_path("models/GLEAM_EGC.fits") rad2deg = 180.0 / numpy.pi decmin = phasecentre.dec.to("deg").value - rad2deg * radius / 2.0 decmax = phasecentre.dec.to("deg").value + rad2deg * radius / 2.0 hdulist = fits.open(fitsfile, lazy_load_hdus=False) recs = hdulist[1].data[0].array fluxes = recs["peak_flux_wide"] mask = fluxes > flux_limit filtered_recs = recs[mask] decs = filtered_recs["DEJ2000"] mask = decs > decmin filtered_recs = filtered_recs[mask] decs = filtered_recs["DEJ2000"] mask = decs < decmax filtered_recs = filtered_recs[mask] ras = filtered_recs["RAJ2000"] decs = filtered_recs["DEJ2000"] names = filtered_recs["Name"] if polarisation_frame is None: polarisation_frame = PolarisationFrame("stokesI") npol = polarisation_frame.npol nchan = len(frequency) # For every source, we read all measured fluxes and interpolate to the # required frequencies gleam_freqs = numpy.array( [ 76, 84, 92, 99, 107, 115, 122, 130, 143, 151, 158, 166, 174, 181, 189, 197, 204, 212, 220, 227, ] ) gleam_flux_freq = numpy.zeros([len(names), len(gleam_freqs)]) for i, f in enumerate(gleam_freqs): gleam_flux_freq[:, i] = filtered_recs["int_flux_%03d" % (f)][:] skycomps = [] directions = SkyCoord(ra=ras * u.deg, dec=decs * u.deg) if phasecentre is not None: separations = directions.separation(phasecentre).to("rad").value else: separations = numpy.zeros(len(names)) for isource, name in enumerate(names): direction = directions[isource] if separations[isource] < radius: fint = interpolate.interp1d( gleam_freqs * 1.0e6, gleam_flux_freq[isource, :], kind=kind ) flux = numpy.zeros([nchan, npol]) flux[:, 0] = fint(frequency) if not numpy.isnan(flux).any(): skycomps.append( SkyComponent( direction=direction, flux=flux, frequency=frequency, name=name, shape="Point", polarisation_frame=polarisation_frame, ) ) log.info( "create_low_test_skycomponents_from_gleam: %d sources above flux limit %.3f" % (len(skycomps), flux_limit) ) hdulist.close() return skycomps
[docs] def replicate_image( im: Image, polarisation_frame=PolarisationFrame("stokesI"), frequency=numpy.array([1e8]), ) -> Image: """Make a new canonical shape Image, extended along third and fourth axes by replication. The order of the data is [chan, pol, dec, ra] :param frequency: :param im: :param polarisation_frame: Polarisation_frame :return: Image """ newwcs = WCS(naxis=4) newwcs.wcs.crpix = [ im.image_acc.wcs.wcs.crpix[0] + 1.0, im.image_acc.wcs.wcs.crpix[1] + 1.0, 1.0, 1.0, ] newwcs.wcs.cdelt = [ im.image_acc.wcs.wcs.cdelt[0], im.image_acc.wcs.wcs.cdelt[1], 1.0, 1.0, ] newwcs.wcs.crval = [ im.image_acc.wcs.wcs.crval[0], im.image_acc.wcs.wcs.crval[1], 1.0, frequency[0], ] newwcs.wcs.ctype = [ im.image_acc.wcs.wcs.ctype[0], im.image_acc.wcs.wcs.ctype[1], "STOKES", "FREQ", ] phasecentre = SkyCoord(newwcs.wcs.crval[0] * u.deg, newwcs.wcs.crval[1] * u.deg) nchan = len(frequency) npol = polarisation_frame.npol fshape = [nchan, npol, im["pixels"].data.shape[-2], im["pixels"].data.shape[-1]] data = numpy.zeros(fshape) log.info( "replicate_image: replicating shape %s to %s" % (im["pixels"].data.shape, data.shape) ) if len(im["pixels"].data.shape) == 2: data[...] = im["pixels"].data[numpy.newaxis, numpy.newaxis, ...] else: for pol in range(npol): data[:, pol] = im["pixels"][:, 0] return Image.constructor( data=data, polarisation_frame=polarisation_frame, wcs=newwcs )
[docs] def simulate_gaintable( gt: GainTable, phase_error=0.1, amplitude_error=0.0, smooth_channels=1, leakage=0.0, seed=180550721, **kwargs ) -> GainTable: """Simulate a gain table :type gt: GainTable :param phase_error: std of normal distribution, zero mean :param amplitude_error: std of log normal distribution :param leakage: std of cross hand leakage :param smooth_channels: Use bspline over smooth_channels :param kwargs: :return: Gaintable """ from numpy.random import default_rng if seed is None: rng = default_rng(1805550721) else: rng = default_rng(seed) from scipy.ndimage import convolve1d def moving_average(a, n=3, axis=0): return convolve1d(a, numpy.ones((n,)) / n, mode="wrap", axis=axis) log.debug( "simulate_gaintable: Simulating amplitude error = %.4f, phase error = %.4f" % (amplitude_error, phase_error) ) amps = 1.0 phases = 1.0 ntimes, nant, nchan, nrec, _ = gt["gain"].data.shape if phase_error > 0.0: phases = rng.normal(0, phase_error, gt["gain"].data.shape) if smooth_channels > 1: phases = moving_average(phases, smooth_channels, axis=2) if amplitude_error > 0.0: amps = rng.lognormal(0.0, amplitude_error, gt["gain"].data.shape) if smooth_channels > 1: amps = moving_average(amps, smooth_channels, axis=2) amps = amps / numpy.average(amps) gt["gain"].data = amps * numpy.exp(0 + 1j * phases) nrec = gt["gain"].data.shape[-1] if nrec > 1: if leakage > 0.0: leak = rng.normal( 0, leakage, gt["gain"].data[..., 0, 0].shape ) + 1j * rng.normal(0, leakage, gt["gain"].data[..., 0, 0].shape) gt["gain"].data[..., 0, 1] = gt["gain"].data[..., 0, 0] * leak leak = rng.normal( 0, leakage, gt["gain"].data[..., 1, 1].shape ) + 1j * rng.normal(0, leakage, gt["gain"].data[..., 1, 1].shape) gt["gain"].data[..., 1, 0] = gt["gain"].data[..., 1, 1].data * leak else: gt["gain"].data[..., 0, 1] = 0.0 gt["gain"].data[..., 1, 0] = 0.0 return gt
[docs] def ingest_unittest_visibility( config, frequency, channel_bandwidth, times, vis_pol, phasecentre, zerow=False, times_are_ha=True, ): """Make a standard visibility simulation :param config: Configuration :param frequency: Frequency (array in Hz) :param channel_bandwidth: Channel bandwidth (array in Hz) :param times: Times (radians, utc or hour angle depending on times_are_ha :param vis_pol: Polarisation frame :param phasecentre: Phase centre (SkyCoord) :param zerow: Zero the w terms? :param times_are_ha: Are the times hourangles or utc (in radians) :return: """ vt = create_visibility( config, times, frequency, channel_bandwidth=channel_bandwidth, phasecentre=phasecentre, weight=1.0, polarisation_frame=vis_pol, zerow=zerow, times_are_ha=times_are_ha, ) vt["vis"].data[...] = 0.0 return vt
[docs] def create_unittest_components( model, flux, applypb=False, telescope="LOW", npixel=None, scale=1.0, single=False, symmetric=False, angular_scale=1.0, offset=[0.0, 0.0], ): # Fill the visibility with exactly computed point sources. if npixel is None: _, _, _, npixel = model["pixels"].data.shape spacing_pixels = int(scale * npixel) // 4 log.info("Spacing in pixels = %s" % spacing_pixels) if not symmetric: centers = [(0.2 * angular_scale, 1.1 * angular_scale)] else: centers = list() if not single: centers.append([0.0, 0.0]) for x in numpy.linspace(-1.2 * angular_scale, 1.2 * angular_scale, 7): if abs(x) > 1e-15: centers.append([x, x]) centers.append([x, -x]) model_pol = model.image_acc.polarisation_frame # Make the list of components rpix = model.image_acc.wcs.wcs.crpix - 1.0 components = [] for center in centers: ix, iy = center # The phase center in 0-relative coordinates is n // 2 so we centre the grid of # components on ny // 2, nx // 2. The wcs must be defined consistently. p = ( int( round( rpix[0] + ix * spacing_pixels * numpy.sign(model.image_acc.wcs.wcs.cdelt[0]) ) ) + offset[0], int( round( rpix[1] + iy * spacing_pixels * numpy.sign(model.image_acc.wcs.wcs.cdelt[1]) ) ) + offset[1], ) sc = pixel_to_skycoord(p[0], p[1], model.image_acc.wcs, origin=0) log.info("Component at (%f, %f) [0-rel] %s" % (p[0], p[1], str(sc))) # Channel images comp = SkyComponent( direction=sc, flux=flux, frequency=model.frequency, polarisation_frame=model_pol, ) components.append(comp) if applypb: beam = create_pb(model, telescope=telescope, use_local=False) components = apply_beam_to_skycomponent(components, beam) return components
[docs] def create_unittest_model(vis, model_pol, npixel=None, cellsize=None, nchan=1): advice = advise_wide_field( vis, guard_band_image=2.0, delA=0.02, facets=1, oversampling_synthesised_beam=4.0, ) if cellsize is None: cellsize = advice["cellsize"] if npixel is None: npixel = advice["npixels2"] model = create_image_from_visibility( vis, npixel=npixel, cellsize=cellsize, nchan=nchan, polarisation_frame=model_pol ) return model
[docs] def insert_unittest_errors( vt, seed=1805550721, calibration_context="TG", amp_errors=None, phase_errors=None ): """Simulate gain errors and apply :param vt: :param seed: Random number seed, set to big integer repeat values from run to run :param phase_errors: e.g. {'T': 1.0, 'G': 0.1, 'B': 0.01} :param amp_errors: e.g. {'T': 0.0, 'G': 0.01, 'B': 0.01} :return: """ controls = create_calibration_controls() if amp_errors is None: amp_errors = {"T": 0.0, "G": 0.01, "B": 0.01} if phase_errors is None: phase_errors = {"T": 1.0, "G": 0.1, "B": 0.01} for c in calibration_context: gaintable = create_gaintable_from_visibility( vt, timeslice=controls[c]["timeslice"], jones_type=c ) gaintable = simulate_gaintable( gaintable, phase_error=phase_errors[c], amplitude_error=amp_errors[c], timeslice=controls[c]["timeslice"], phase_only=controls[c]["phase_only"], crosspol=controls[c]["shape"] == "matrix", seed=seed, ) vt = apply_gaintable(vt, gaintable, inverse=True, use_flags=False) return vt