# Source code for pyfabil.plugins.tpm.adc_power_meter

from __future__ import division
from builtins import range
from math import fabs, cos, sin, ceil, log, sqrt
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import bit_list_to_mask

import logging
import time

__author__ = 'gcomoretto'


class AdcPowerMeter(FirmwareBlock):
    """Total power meter and RFI detector.

    The total power meter measures the broadband total power level of the
    input signals over a programmable integration time. Integration time is
    expressed in seconds, total power is returned in ADC counts squared and
    RMS amplitude in ADC counts.

    Specific methods:

    initialise()
        Resets the core, then programs and starts the total power (TP)
        detector with the current ``intTime`` (0.01 s after construction).
        RFI flagging is not touched (not yet implemented in firmware).
    set_intTime(intTime)
        Sets the integration time in seconds. Does not modify the running
        status. The value is rounded to a whole number of frames and clamped
        to 1 .. 65535 frames (about 1.08 us .. 70.8 ms with the default
        800 MHz sampling and 864 samples per frame).
    start_intTime(intTime)
        Stops integration, changes the integration time and restarts it.
    start_TP(), stop_TP()
        Start and stop the TP detector. Synchronous start is not yet
        supported.
    enable_rfi_flagging(antennas), disable_rfi_flagging(antennas)
        Enable/disable RFI flagging for a list of antennas. Not yet
        implemented: both currently raise NotImplementedError.
    set_broadband_rfi_factor(rfi_factor)
        Sets the RFI factor controlling how much RFI flagging occurs. Not
        yet implemented: currently raises NotImplementedError.
    wait_TpReady()
        If TP is running, waits for data to become ready, else returns
        False.
    read_data()
        Reads and normalises the TP counters without checking readiness.
    read_TpData()
        Reads the TP data if ready, else returns an empty list.
    wait_TpData()
        Waits for data to become ready and returns them. Returns an empty
        list if TP is not running or no data became ready.
    get_RmsAmplitude()
        Same as wait_TpData(), but returns RMS amplitude instead of power.
    read_RfiData()
        Returns the RFI counters (number of RFI affected frames).
    """

    @compatibleboards(BoardMake.TpmBoard)
    @friendlyname('adc_power_meter')
    @maxinstances(2)
    def __init__(self, board, fsample=800e6, samples_per_frame=864, **kwargs):
        """AdcPowerMeter initialiser.

        :param board: Pointer to board instance
        :param fsample: ADC sampling frequency in hertz
        :param samples_per_frame: Number of samples per frame (tick)
        :param kwargs: Must contain ``device`` (Device.FPGA_1 or
            Device.FPGA_2)
        :raises PluginError: If ``device`` is missing or not a known FPGA
        """
        super(AdcPowerMeter, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("AdcPowerMeter: Require a node instance")
        self._device = kwargs['device']

        # Translate the device enumeration into the register-name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError(f"AdcPowerMeter: Invalid device {self._device}")

        self._fsample = fsample  # sampling frequency, hertz
        self._frameLen = samples_per_frame  # samples per tick
        self._minDiscard = 2  # Min. number of discarded bits
        self.intTime = 0.01  # seconds
        self.discardBits = 0  # bits discarded

        # Each antenna maps onto two consecutive bits of the RFI enable mask
        self.antenna_to_mask_bit_dict = {
            antenna: [2 * antenna, 2 * antenna + 1] for antenna in range(8)
        }

        # This is what i think the value of 1 represents for the rfi_factor
        # TODO This needs to be confirmed in simulation
        self.rfi_factor_scale = 4096
        # RFI factor is only 16 bits
        self.min_rfi_factor = 0
        self.max_rfi_factor = (2**16 - 1) / self.rfi_factor_scale

        # Touch the hardware only once every attribute exists, so that
        # initialise() may safely use any of the helpers of this class.
        self.initialise()

    #########################################################################
    def initialise(self):
        """Initialise AdcPowerMeter.

        Resets the core, programs the current integration time and starts
        the total power detector.

        :return: True
        """
        logging.debug(f"Initialising ADC power meter {self._device}: ")

        # Reset core
        # NOTE(review): the register name below ends with a space; it is kept
        # as found - confirm that the board register lookup tolerates it.
        self.board[f"{self._device}.adc_power_meter.control.reset "] = 0x1
        self.board[f'{self._device}.adc_power_meter.control'] = 0x0

        # TODO uncomment when implemented in the firmware
        # self.disable_rfi_flagging()

        self.stop_TP()
        self.set_intTime(self.intTime)
        self.start_TP()
        return True

    ############################################################################
    # Set integration time
    # Compute number of frames in given integration time and associated
    # number of bits to discard
    ############################################################################
    def set_intTime(self, integrationTime):
        """Set integration time and discard bits.

        The requested time is rounded to a whole number of frames and clamped
        to the 1 .. 2**16 - 1 frames accepted by the integration_time
        register. ``self.intTime`` is updated with the time actually
        programmed. The number of discarded bits is derived from the number
        of samples actually integrated, so it stays consistent with the
        programmed time and is well defined for any finite request
        (including zero or negative values, which clamp to one frame).

        :param integrationTime: Requested integration time in seconds
        :return: True
        """
        max_ticks = 2**16 - 1
        integration_ticks = round(integrationTime * self._fsample / self._frameLen)
        integration_ticks = max(1, min(integration_ticks, max_ticks))
        self.intTime = integration_ticks * self._frameLen / self._fsample

        # Samples accumulated per integration; always >= one frame, so the
        # logarithm below cannot hit a domain error.
        integrated_samples = integration_ticks * self._frameLen
        nBits = int(ceil(log(integrated_samples) / log(4.0) - 15.5 + 8)) - self._minDiscard
        self.discardBits = max(0, min(nBits, 7))

        # The register holds the number of frames minus one
        self.board[f"{self._device}.adc_power_meter.integration_time"] = int(integration_ticks - 1)
        self.board[f"{self._device}.adc_power_meter.control.discard_bits"] = self.discardBits
        return True

    def start_intTime(self, intTime):
        """Stop the detector, change the integration time and restart it.

        :param intTime: Requested integration time in seconds
        :return: True
        """
        self.stop_TP()
        self.set_intTime(intTime)
        self.start_TP()
        return True

    ############################################################################
    # Start/stop total power detector
    ############################################################################
    def start_TP(self, sync_time=None):
        """Start the total power detector.

        :param sync_time: Currently unused; synchronous start is not yet
            supported
        :return: True
        """
        # NOTE(review): register name with trailing space kept as found.
        self.board[f"{self._device}.adc_power_meter.control.reset "] = 0
        self.board[f"{self._device}.adc_power_meter.control.tp_run"] = 1
        return True

    def stop_TP(self):
        """Stop the total power detector.

        :return: True
        """
        self.board[f"{self._device}.adc_power_meter.control.tp_run"] = 0
        return True

    ############################################################################
    # Enable and disable RFI flagging
    ############################################################################
    def disable_rfi_flagging(self, antennas=range(8)):
        """Disable the broadband RFI flagging. Not yet implemented.

        :param antennas: list of antennas where rfi flagging will be disabled
        :type antennas: list(int)
        :raises KeyError: If an antenna is not in the range 0..7
        :raises NotImplementedError: Always, until supported by the firmware
        """
        mask_bit_list = []
        for antenna in antennas:
            mask_bit_list.extend(self.antenna_to_mask_bit_dict[antenna])

        # TODO uncomment when implemented in the firmware
        # mask = bit_list_to_mask(mask_bit_list)
        # get_current_mask = self.board[f"{self._device}.adc_power_meter.control.rfi_enable"]
        # self.board[f"{self._device}.adc_power_meter.control.rfi_enable"] = ~mask & get_current_mask
        # return True
        raise NotImplementedError("disabling rfi flagging is not yet implemented")

    def enable_rfi_flagging(self, antennas=range(8)):
        """Enable the broadband RFI flagging. Not yet implemented.

        :param antennas: list of antennas where rfi flagging will be enabled
        :type antennas: list(int)
        :raises KeyError: If an antenna is not in the range 0..7
        :raises NotImplementedError: Always, until supported by the firmware
        """
        mask_bit_list = []
        for antenna in antennas:
            mask_bit_list.extend(self.antenna_to_mask_bit_dict[antenna])

        # TODO uncomment when implemented in the firmware
        # mask = bit_list_to_mask(mask_bit_list)
        # get_current_mask = self.board[f"{self._device}.adc_power_meter.control.rfi_enable"]
        # self.board[f"{self._device}.adc_power_meter.control.rfi_enable"] = mask | get_current_mask
        # return True
        raise NotImplementedError("enabling rfi flagging is not yet implemented")

    def set_broadband_rfi_factor(self, rfi_factor=1.0):
        """Set the rfi factor for broadband rfi detection. Not yet implemented.

        The higher the rfi factor, the less rfi is detected/flagged. This is
        because data is flagged if the short term power is greater than
        the long term power * rfi factor * 32/27.

        Values outside [min_rfi_factor, max_rfi_factor] are clamped with a
        warning before being scaled to the integer register value.

        :param rfi_factor: the sensitivity value for the rfi detection
        :type rfi_factor: double
        :raises NotImplementedError: Always, until supported by the firmware
        """
        if rfi_factor > self.max_rfi_factor:
            logging.warning(f"rfi_factor of {rfi_factor} is greater than max allowed value, setting to {self.max_rfi_factor}")
            rfi_factor = self.max_rfi_factor
        elif rfi_factor < self.min_rfi_factor:
            logging.warning(f"rfi_factor of {rfi_factor} is less than min allowed value, setting to {self.min_rfi_factor}")
            rfi_factor = self.min_rfi_factor

        # Register value is always an integer, also on the clamped paths
        rfi_factor_scaled = int(rfi_factor * self.rfi_factor_scale)

        # TODO uncomment when implemented in firmware
        # self.board[f"{self._device}.adc_power_meter.rfi_factor"] = rfi_factor_scaled
        raise NotImplementedError("setting rfi factor is not yet implemented")

    def read_data(self):
        """Read the 16 total power counters and normalise them.

        Readiness is not checked. The raw counts are rescaled using the
        programmed discard bits and integration time.

        :return: The 16 normalised total power values
        """
        norm = (4.0 ** self.discardBits) * (2 ** self._minDiscard) / (self._fsample * self.intTime)
        tp = self.board.read_register(f"{self._device}.adc_power_meter.tp_counts_0", 16)
        for i in range(16):
            tp[i] = tp[i] * norm
        return tp

    ############################################################################
    # Read total power data if available
    # Returns empty array if no power is available
    ############################################################################
    def read_TpData(self):
        """Read total power data if available.

        When the ready flag is set the data are read and the flag is cleared
        by pulsing clear_ready; otherwise a warning is logged.

        :return: Total power in units of ADC units squared, or an empty list
            if no data is available
        """
        ready = self.board[f"{self._device}.adc_power_meter.status.ready"]
        if ready == 1:
            tp = self.read_data()
            self.board[f"{self._device}.adc_power_meter.control.clear_ready"] = 1
            self.board[f"{self._device}.adc_power_meter.control.clear_ready"] = 0
        else:
            logging.warning(f"AdcPowerMeter : {self._device} read_TpData not ready. ")
            tp = []
        return tp

    # Wait for total power data to be ready
    def wait_TpReady(self):
        """Wait for total power data to become ready.

        Returns immediately with False if the core is held in reset or the
        detector is not running. Otherwise polls the ready flag every
        millisecond, giving up after roughly one second to avoid locking if
        something does not work.

        :return: True if data is ready, False otherwise
        """
        # Check that the TP function is enabled
        # NOTE(review): register name with trailing space kept as found.
        if self.board[f"{self._device}.adc_power_meter.control.reset "] == 1:
            return False
        if self.board[f"{self._device}.adc_power_meter.control.tp_run"] == 0:
            return False

        # Fixed budget of 1 ms polls (about 1 s in total); it is not derived
        # from the integration time.
        timeout = 1.0 / 0.001
        while self.board[f"{self._device}.adc_power_meter.status.ready"] == 0:
            time.sleep(0.001)
            if timeout < 0:
                return False
            timeout = timeout - 1
        return True

    # Get total power data waiting for it to be available
    def wait_TpData(self):
        """Wait for TP data to be available and return it.

        :return: Total power in units of ADC units squared, or an empty list
            if the TP detector is not running or data did not become ready
        """
        if self.wait_TpReady():
            return self.read_TpData()

        logging.warning(f"AdcPowerMeter : {self._device} wait_TpData not ready. ")
        return []

    # Wait for data to be available and returns RMS amplitude
    def get_RmsAmplitude(self, sync=True):
        """Return the RMS amplitude (square root of the total power).

        :param sync: If True, wait for the ready flag and read through
            read_TpData(); if False, read the counters immediately with
            read_data() without checking readiness
        :return: RMS amplitudes in ADC counts, or an empty list if ``sync``
            is set and no data became ready
        """
        if sync:
            if not self.wait_TpReady():
                logging.warning(f"AdcPowerMeter : {self._device} get_RmsAmplitude failed. ")
                return []
            tp = self.read_TpData()
        else:
            tp = self.read_data()

        # Convert in place so the container returned by the board is kept
        for i, power in enumerate(tp):
            tp[i] = sqrt(power)
        return tp

    ############################################################################
    # Read RFI data. Does not check availability
    ############################################################################
    def read_RfiData(self):
        """Read the 16 RFI counters without checking availability.

        :return: Values of the rfi_0 register block (16 words)
        """
        tp = self.board.read_register(f"{self._device}.adc_power_meter.rfi_0", 16)
        return tp

    ############################################################################
    def status_check(self):
        """Report the plugin status.

        :return: Status.OK
        """
        logging.info("AdcPowerMeter : Checking status")
        return Status.OK

    def clean_up(self):
        """Perform cleanup.

        :return: Success
        """
        logging.info("AdcPowerMeter : Cleaning up")
        return True