# Source code for rfi.rfi_interface.rfi_data_cube

# pylint: disable=too-many-arguments,unexpected-keyword-arg
# pylint: disable=too-many-instance-attributes

"""Dataclasses for RFI"""
import logging
from dataclasses import dataclass, field

import h5py
import numpy as np

LOG = logging.getLogger("rfi-logger")


@dataclass
class RFISource:
    """Data for a single RFI Source"""

    id: str
    source_type: str
    central_frequency: float = field(metadata={"unit": "Hz"})
    bandwidth: float = field(metadata={"unit": "Hz"})
    polarisation: str

    # position of the source; for aircraft and satellite
    # these would change with time
    height: float
    latitude: float = field(metadata={"unit": "degree"})
    longitude: float = field(metadata={"unit": "degree"})

    def _get_unit(self, field_name):
        """Return the unit string stored in the metadata of a field."""
        # pylint: disable=no-member
        dataclass_fields = self.__dataclass_fields__
        return dataclass_fields[field_name].metadata["unit"]

    def __repr__(self):
        """Human-readable representation including units where defined."""
        parts = [
            f"id={self.id}",
            f"source_type={self.source_type}",
            # pylint: disable=line-too-long
            f"central_frequency={self.central_frequency}[{self._get_unit('central_frequency')}]",  # noqa: E501
            f"bandwidth={self.bandwidth}[{self._get_unit('bandwidth')}]",
            f"polarisation={self.polarisation}",
            f"height={self.height}",
            f"latitude={self.latitude}[{self._get_unit('latitude')}]",
            f"longitude={self.longitude}[{self._get_unit('longitude')}]",
        ]
        return f"{self.__class__.__name__}(" + ", ".join(parts) + ")"


@dataclass
class RFISignal:
    """Data for RFI Signal"""

    # id of the RFISource this signal originates from
    source_id: str
    # array of time samples of the simulation
    time: np.ndarray
    station_id: np.ndarray  # 1D array of strings of all stations
    frequency: np.ndarray = field(
        metadata={"unit": "Hz"}
    )  # 1D array of frequency samples

    # 3D array: list of data per time per station
    # [ [ [...], [...], [...] ], [ [...], [...], [...] ] ]
    # NOTE(review): the unit in the metadata ("W/Hz???") is marked as
    # uncertain by the original author — confirm against the producer
    apparent_power: np.ndarray = field(metadata={"unit": "W/Hz???"})

    # coordinates at SKA stations; 2D-arrays, coordinates per time
    azimuth: np.ndarray = field(metadata={"unit": "degree"})
    elevation: np.ndarray = field(metadata={"unit": "degree"})
    distance: np.ndarray = field(metadata={"unit": "m"})

    # reshape az, el, dist, into a single array:
    # [[[az, el, dist], [az, el, dist]], [[az, el, dist], [az, el, dist]]]
    # arr[0][0] --> first time, first station values, arr[0][1]
    #   --> first time, second station values, etc.
    apparent_coordinates: np.ndarray


@dataclass
class DataCubePerSource:
    """
    DataCube for a single RFI source.

    :param rfi_source: RFISource object describing the emitter
    :param rfi_signal: RFISignal object with the signal data of that emitter

    :raises ValueError: on construction, if the two inputs refer to
        different source ids (see validate_input)
    """

    # Both fields are required. The previous defaults were broken:
    # ``field(default_factory=RFISource)`` raised at instantiation because
    # RFISource has required arguments, and
    # ``field(default_factory=[RFISignal])`` raised because a list is not
    # callable, so no caller could ever have relied on them.
    rfi_source: "RFISource"
    rfi_signal: "RFISignal"

    def validate_input(self):
        """
        Make sure input data belong together.
        Both rfi_source and rfi_signal have to refer to the same source_id

        :return valid: bool, True if valid, False if not
                message: "" if valid, corresponding error message if not valid
        """
        valid = True
        message = ""

        if self.rfi_signal.source_id != self.rfi_source.id:
            valid = False
            message = (
                "Mismatch between input data source ids. "
                "All of the input RFISignal objects "
                "have to refer to the same source "
                "as the provided input source_id!"
            )

        return valid, message

    def __post_init__(self):
        # Fail fast at construction time if the source ids do not match.
        valid, message = self.validate_input()
        if not valid:
            raise ValueError(message)


class DataCube:
    """
    Class to transform RFI data and save the result in an HDF5 file.

    :param times: list of time samples the simulation ran for
    :param freqs: list of frequency channels the simulation ran for
    :param station_ids: list of station ids that were used in the simulation
    :param rmax: maximum distance of SKA station from its array centre
    :param station_skip: ...
        rmax and station_skip are needed for transferring information
        from the propagation script to the visibility simulation script
    """

    def __init__(
        self,
        times: list,
        freqs: list,
        station_ids: list,
        rmax=None,
        station_skip=None,
    ):
        self.time_samples = np.array(times).astype("S")
        self.frequency_channels = np.array(freqs).astype("f")
        self.station_id = np.array(station_ids).astype("S")

        # Placeholder entries (empty byte strings); removed once the
        # first real source has been appended (see append_data).
        self.source_id = np.empty(1, dtype=str).astype("S")
        self.source_type = np.empty(1, dtype=str).astype("S")

        # Pre-allocate room for a single source; append_data grows the
        # first axis by one slot per additional source.
        self.coordinates = np.empty_like(
            [None], shape=(1, len(times), len(station_ids), 3), dtype=float
        )
        self.signal = np.empty_like(
            [None],
            shape=(1, len(times), len(station_ids), len(freqs)),
            dtype=complex,
        )

        self._number_of_sources = 0
        # maximum distance of each station from the array centre
        self._rmax = rmax
        self._station_skip = station_skip

    def validate_input_data(self, input_data):
        """
        Validate input data. Data are valid if:
            - source_id exists
            - time samples of the input match the ones that
              the DataCube was initialized with
            - frequency channels of the input match the ones that
              the DataCube was initialized with
            - station ids of the input match the ones that
              the DataCube was initialized with

        :param input_data: input DataCubePerSource object containing
            RFI data for a single source
        :raises ValueError: if any of the above checks fails
        """
        if not input_data.rfi_source.id:
            raise ValueError("New data cube cannot have an empty source_id.")

        # Bug fix: use .any() instead of .all() in the three checks
        # below -- a single mismatching element is enough to invalidate
        # the input; previously the error was only raised when *every*
        # element differed.
        if (input_data.rfi_signal.time.astype("S") != self.time_samples).any():
            raise ValueError(
                "The time_samples that initialized the DataCube object are "
                "different from the appended DataCubePerSource().time_samples."
            )

        if (
            input_data.rfi_signal.frequency.astype("f")
            != self.frequency_channels
        ).any():
            raise ValueError(
                "The frequency_channels that initialized "
                "the DataCube object are different from the "
                "appended DataCubePerSource() frequencies."
            )

        if (
            input_data.rfi_signal.station_id.astype("S") != self.station_id
        ).any():
            raise ValueError(
                "The station ids that initialized the DataCube object are "
                "different from the appended DataCubePerSource() station ids."
            )

    def append_data(self, new_rfi_data: "DataCubePerSource"):
        """
        Append data from a DataCubePerSource object to the existing arrays.

        :param new_rfi_data: input DataCubePerSource object containing
            RFI data for a single source
        :raises ValueError: if the new data fail validate_input_data
        """
        self.validate_input_data(new_rfi_data)

        self.source_id = np.append(
            self.source_id, np.array([new_rfi_data.rfi_source.id]), axis=0
        ).astype("S")
        self.source_type = np.append(
            self.source_type,
            np.array([new_rfi_data.rfi_source.source_type]),
            axis=0,
        ).astype("S")

        # Grow the data arrays by exactly one source slot when full.
        # (Bug fix: previously the arrays were doubled on IndexError,
        # leaving uninitialized rows that ended up in the exported HDF5
        # file, and the assignment code was duplicated in the except
        # branch.)
        if self._number_of_sources >= self.signal.shape[0]:
            self.coordinates = np.append(
                self.coordinates, self.coordinates[-1:], axis=0
            )
            self.signal = np.append(self.signal, self.signal[-1:], axis=0)

        self.signal[self._number_of_sources] = (
            new_rfi_data.rfi_signal.apparent_power.astype("F")
        )
        self.coordinates[self._number_of_sources] = (
            new_rfi_data.rfi_signal.apparent_coordinates.astype("f")
        )
        self._number_of_sources = self._number_of_sources + 1

        # Remove the placeholder entries created in __init__ once real
        # data are present.
        if self.source_id[0] == b"":
            self.source_id = np.delete(self.source_id, 0, axis=0)
            self.source_type = np.delete(self.source_type, 0, axis=0)

    def _convert_data_to_hdf5(self, hdf5_file):
        # Write every public attribute as an HDF5 dataset; private
        # attributes (leading underscore) are attached as metadata below.
        for key, value in self.__dict__.items():
            if not key.startswith("_"):
                hdf5_file[key] = value

        # add metadata
        hdf5_file["coordinates"].attrs["dimensions"] = [
            "Nsources x Ntimes x Nstations x 3(az, el, dist)"
        ]
        hdf5_file["signal"].attrs["dimensions"] = [
            "Nsources x Ntimes x Nstations x Nfreqs"
        ]
        hdf5_file["station_id"].attrs[
            "max_station_distance_from_centre"
        ] = self._rmax
        hdf5_file["station_id"].attrs["station_skip"] = self._station_skip

    def export_to_hdf5(self, filename):
        """
        Save transformed data to HDF5

        :param filename: name of output file
        """
        with h5py.File(filename, "w") as hdf5_file:
            self._convert_data_to_hdf5(hdf5_file)
            hdf5_file.flush()

        LOG.info("HDF5 file saved with name: %s", filename)
def _get_input():
    """Prompt the user on stdin and return the raw reply string."""
    prompt = "Continue? (yes, no) "
    return input(prompt)
class BeamGainDataCube:
    """
    Data Cube to contain Beam Gain information calculated by OSKAR.

    :param ra: right ascension of observed source
    :param dec: declination of observed source
    :param obs_time: time of observation
    :param freq_chans: array of frequency channels
    :param rfi_ids: array of RFI source IDs
    :param nstations: number of SKA stations
    """

    def __init__(
        self,
        ra: float,
        dec: float,
        obs_time: str,
        freq_chans: np.ndarray,
        rfi_ids: np.ndarray,
        nstations: int,
    ):
        self.right_ascension = ra
        self.declination = dec
        self.observation_time = obs_time
        self.freq_channels = freq_chans
        self.rfi_source_ids = rfi_ids.astype("S")
        self.num_stations = nstations

        # filled in through the beam_gain property setter
        self._beam_gain = None

    @property
    def beam_gain(self):
        """Beam gain value"""
        return self._beam_gain

    @beam_gain.setter
    def beam_gain(self, value):
        """
        Set beam_gain value.

        :param value: numpy array with dimensions
            Nrfi_sources x Nska_stations x Nfreq_channels x 4 (Nstokes)
        :raises TypeError: if value is not a numpy array
        :raises ValueError: if value has the wrong dimensions
        """
        if not isinstance(value, np.ndarray):
            raise TypeError(
                "Provide beam gain values as numpy arrays with dimensions: "
                "Nrfi_sources x Nska_stations x Nfreq_channels x 4 (Nstokes)"
            )

        # Existing data are only overwritten after interactive
        # confirmation from the user.
        if self._beam_gain is not None:
            LOG.warning(
                "Overwriting existing data in %s.beam_gain.",
                self.__class__.__name__,
            )
            cont = _get_input()
            if cont.lower() != "yes":
                LOG.info("Aborting")
                return

        value_dimensions = value.shape
        if not value_dimensions == (
            len(self.rfi_source_ids),
            self.num_stations,
            len(self.freq_channels),
            4,
        ):
            raise ValueError(
                "Input data has the wrong dimensions. It needs to match: \n"
                # Bug fix: report the expected dimensions in the same
                # order as the shape check above (sources, stations,
                # channels, 4); the stations and channels entries were
                # previously swapped in this message.
                # pylint: disable=line-too-long
                f"{len(self.rfi_source_ids), self.num_stations, len(self.freq_channels), 4} \n"  # noqa: E501
                f"[Nrfi_sources x Nska_stations x Nfreq_channels x 4 (Nstokes)]"  # noqa: E501
            )

        self._beam_gain = value

    def _convert_data_to_hdf5(self, hdf5_file):
        # lstrip("_") exposes the private _beam_gain attribute under the
        # public dataset name "beam_gain"
        for key in self.__dict__:
            hdf5_file[key.lstrip("_")] = self.__dict__[key]

        hdf5_file["beam_gain"].attrs["dimensions"] = [
            "Nrfi_sources x Nska_stations x Nfreq_channels "
            "x 4 (stokesI, stokesQ, stokesU, stokesV)"
        ]

    def export_to_hdf5(self, filename):
        """
        Save transformed data to HDF5

        :param filename: name of output file
        """
        with h5py.File(filename, "w") as hdf5_file:
            self._convert_data_to_hdf5(hdf5_file)
            hdf5_file.flush()

        LOG.info("HDF5 file saved with name: %s", filename)