# Source code for pyfabil.plugins.tpm.adc_power_meter_simple

from __future__ import division
from builtins import range
__author__ = 'chiello'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import numpy as np
import logging
import time


class AdcPowerMeterSimple(FirmwareBlock):
    """ AdcPowerMeter plugin

    Drives the ``<device>.adc_power_meter`` register block of one FPGA:
    configures the integration length, requests a measurement and converts
    the 64-bit power accumulators that are read back into RMS amplitudes. """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('adc_power_meter')
    @maxinstances(2)
    def __init__(self, board, fsample=800e6, samples_per_frame=864, integration_time=0.01, **kwargs):
        """ AdcPowerMeter initialiser

        The core is configured immediately (see :meth:`initialise`).

        :param board: Pointer to board instance
        :param fsample: ADC sampling frequency; its reciprocal is used as the
                        sample period, in the same time unit as integration_time
        :param samples_per_frame: Number of samples in one frame
        :param integration_time: Integration time in seconds
        :param kwargs: Must contain ``device`` (Device.FPGA_1 or Device.FPGA_2)
        :raises PluginError: If ``device`` is missing or is not a supported FPGA
        """
        super(AdcPowerMeterSimple, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("AdcPowerMeter: Require a node instance")
        self._device = kwargs['device']

        # Translate the device identifier into the register name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("AdcPowerMeter: Invalid device %s" % self._device)

        self._fsample = fsample
        self._sample_period = 1.0 / fsample
        # Provisional value, replaced by the firmware feature register in configure()
        self._nof_signals = 16
        self._frame_length = samples_per_frame
        self._integration_time = integration_time
        self._integration_length = self.get_integration_length(integration_time)

        self.initialise()

    #######################################################################################

    def configure(self, integration_time=None):
        """ Initialise AdcPowerMeter core

        Reads the number of signals from the firmware, then programs the
        integration length while the core is held in reset.

        :param integration_time: New integration time in seconds; when None
                                 the currently stored value is kept
        """
        logging.debug("Initialising ADC power meter %s: ", self._device)
        prefix = '%s.adc_power_meter' % self._device

        self._nof_signals = self.board['%s.feature.nof_signals' % prefix]

        if integration_time is None:
            integration_time = self._integration_time
        else:
            self._integration_time = integration_time
        self._integration_length = self.get_integration_length(integration_time)

        # Assert reset, clear any pending request, program the length, release reset
        self.board['%s.control.rst' % prefix] = 0x1
        self.board['%s.control.req' % prefix] = 0x0
        # The register is written with the length minus one
        self.board['%s.integration_length' % prefix] = self._integration_length - 1
        self.board['%s.control.rst' % prefix] = 0x0

    def get_integration_length(self, integration_time):
        """ Convert an integration time into a whole number of frames

        :param integration_time: Integration time in seconds
        :return: Number of frames (rounded down), never less than 1
        """
        frame_period = self._sample_period * self._frame_length
        # NOTE(review): float floor division can land one frame short when
        # integration_time is an exact multiple of the frame period - confirm
        # whether that matters for the callers.
        nof_frames = int(integration_time // frame_period)
        # A length below one frame would write -1 to the integration_length
        # register and cause a division by zero in get_RmsAmplitude
        return max(1, nof_frames)

    def get_RmsAmplitude(self):
        """ Request a power measurement and return the RMS amplitude per signal

        Sets the ``control.req`` bit, then polls it roughly every millisecond,
        for up to twice the integration time, until it reads back 0
        (presumably cleared by the firmware once a measurement is available).

        :return: List with one RMS value (float) per signal, or an empty list
                 if the request was not served before the timeout
        """
        poll_period = 0.001
        req_register = '%s.adc_power_meter.control.req' % self._device

        self.board[req_register] = 0x1

        # Loop-invariant: total number of samples accumulated per measurement
        nof_samples = self._integration_length * self._frame_length

        remaining_polls = (2 * self._integration_time) / poll_period
        while remaining_polls > 0:
            if self.board[req_register] == 0:
                # Two 32-bit words per signal, low word first
                data = self.board.read_register("%s.adc_power_meter.power_0" % self._device,
                                                self._nof_signals * 2)
                accu64 = np.zeros(self._nof_signals, dtype=np.uint64)
                for n in range(self._nof_signals):
                    # Coerce to Python int so the 32-bit shift cannot overflow
                    # if the words come back as fixed-width integers
                    low = int(data[2 * n])
                    high = int(data[2 * n + 1])
                    accu64[n] = ((high << 32) + low) // nof_samples
                return np.sqrt(accu64).tolist()
            time.sleep(poll_period)
            remaining_polls -= 1

        logging.warning("ADC power meter simple %s is not receiving data", self._device)
        return []

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise AdcPowerMeter

        :return: True once the core has been configured
        """
        self.configure()
        logging.debug("AdcPowerMeter has been initialised")
        return True

    def status_check(self):
        """ Perform status check

        :return: Status
        """
        logging.debug("AdcPowerMeter : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup

        :return: Success
        """
        logging.debug("AdcPowerMeter : Cleaning up")
        return True