Source code for ska_low_sps_tpm_api.plugins.preadu

import time
from copy import copy

from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock

# Module metadata: author credited for this plugin.
__author__ = "Alessio Magro"


class TpmPreAdu(FirmwareBlock):
    """TpmPreAdu plugin.

    Keeps one 8-bit configuration word per channel in ``channel_filters``
    (the attenuation lives in bits 7:1) and moves those words to and from the
    preADU through the CPLD shift registers and the CPLD preADU control
    register.
    """

    @compatibleboards(BoardMake.TpmBoard)
    @friendlyname("tpm_preadu")
    @maxinstances(2)
    def __init__(self, board, logger=None, **kwargs):
        """
        TpmPreAdu initialiser.

        :param board: Pointer to board instance
        :param logger: Logger forwarded to the FirmwareBlock initialiser
        :param kwargs: Must contain ``preadu_id``, identifying which preADU
            this instance drives
        :raises PluginError: If ``preadu_id`` is not supplied
        """
        super().__init__(board, logger=logger)

        if "preadu_id" not in kwargs:
            raise PluginError("PreAdu: preadu_id required")
        self._preadu_id = kwargs["preadu_id"]

        # Lazily populated cache for the ``info`` property. Initialised before
        # any hardware access so the attribute always exists.
        self.__info = None

        # Define a filter per channel in order to be able to enable and disable
        # RF output from the channel
        self._nof_channels = 16
        self.channel_filters = [0x0] * self._nof_channels

        # Define cpld register addresses
        self._cpld_reg_addr = [0x70000000, 0x70000004, 0x70000008, 0x7000000C]
        self._cpld_preadu_cntl_addr = 0x70000010

        # Define cpld register constants
        self._nof_cpld_regs = len(self._cpld_reg_addr)
        self._cpld_subword_width = 8
        self._nof_cpld_subwords = int(32 / self._cpld_subword_width)

        # Read current configuration
        self.read_configuration()

        # Try to check if preadu is present, will fail if preADU is OFF
        # sets self.is_present
        self.check_present()

    #######################################################################################

    @property
    def info(self):
        """
        Identification data parsed from the ``FE_<preadu_id>_LABEL`` field.

        The label is read once and the parsed result is cached. A label with at
        least four ``;``-separated fields is interpreted as
        ``vendor;pn;ver;sn``; any other non-None label is stored whole as the
        serial number and the version keeps its default of "1.0".

        :return: dict with keys EXT_LABEL, vendor, pn, ver and sn
        """
        if self.__info is None:
            ext_label = self.board.tpm_qsfp_adapter[0].get_field(
                f"FE_{self._preadu_id}_LABEL"
            )
            info = {
                "EXT_LABEL": None,
                "vendor": None,
                "pn": None,
                "ver": "1.0",
                "sn": None,
            }
            if ext_label is not None:
                info["EXT_LABEL"] = ext_label
                fields = ext_label.split(";")
                if len(fields) >= 4:
                    info["vendor"] = fields[0]
                    info["pn"] = fields[1]
                    info["ver"] = fields[2]
                    info["sn"] = fields[3]
                else:
                    # if standard format missing assume default version 1.0
                    info["sn"] = ext_label
            # Cache only once parsing has completed
            self.__info = info
        return self.__info

    @property
    def powered_on(self):
        """True when the ``board.regfile.enable.fe`` register is non-zero."""
        return self.board["board.regfile.enable.fe"] > 0

    def switch_on(self):
        """Switch preadu on, then re-run the presence check."""
        # Switch on preadu
        self.board["board.regfile.enable.fe"] = 1
        time.sleep(0.2)

        # Check that preadu has been switched on properly
        value = self.board["board.regfile.enable.fe"]
        if value != 1:
            self.logger.warning(f"Error! preADU power is not high: {value}")
            return
        self.logger.debug("preADU power on done!")

        # Check if preadu is present, sets self.is_present
        time.sleep(0.5)
        self.check_present()

    def switch_off(self):
        """Switch preadu off and log whether the enable register reads back 0."""
        # Switch off preadu
        self.board["board.regfile.enable.fe"] = 0
        time.sleep(0.2)

        # Check that preadu has been switched off properly
        value = self.board["board.regfile.enable.fe"]
        if value == 0x0:
            self.logger.debug("preADU power off done!")
        else:
            self.logger.warning(f"Error! preADU power is not low: {value}")

    def set_attenuation(self, attenuation, channels=None):
        """
        Set attenuation level for a particular channel.

        Supports attenuation from 0 to 31 with a precision of 0.25. Only the
        cached ``channel_filters`` are updated; nothing is written to the
        board by this method.

        :param attenuation: Value of attenuation
        :param channels: Preadu channels (None if to be applied to all channels)
        """
        if channels is None:
            channels = range(self._nof_channels)

        # Sanity checks
        if not 0 <= attenuation < 2**5:
            self.logger.warning(
                f"Cannot set PREADU attenuation {attenuation}, out of range"
            )
            return

        # The encoding does not depend on the channel: warn and compute it once
        if attenuation % 0.25 != 0:
            self.logger.warning(
                f"PreADU attenuation precision of 0.25 supported. Value {attenuation} specified will be rounded to {attenuation // 0.25 / 4}"
            )

        # Attenuation is stored in units of 0.25 in bits 7:1 of the channel word
        attenuation_int = int(attenuation * 4)
        attenuation_value = self.set_bits(
            input=0, bit_range=(1, 7), value=attenuation_int
        )

        for channel in channels:
            # Valid indices are 0 .. _nof_channels - 1; skip anything else
            # instead of indexing channel_filters out of range.
            if not 0 <= channel < self._nof_channels:
                self.logger.warning(
                    f"Cannot set attenuation for channel {channel}, invalid channel"
                )
                continue

            self.channel_filters[channel] = attenuation_value
            self.logger.debug(
                f"preADU channel {channel} attenuation has been set to {attenuation_value}"
            )

    def get_attenuation(self):
        """
        Returns the current attenuation value of every channel.

        Stored in channel_filters[7:1] with the 2 least significant bits of
        which forming a fractional component. For example if
        channel_filters[7:1] == 33 this corresponds to an attenuation of
        1000.01 (base 2) or 8.25 (base 10)

        :return: list with one attenuation value per channel
        """
        return [
            self.get_bits(input=channel_filter, bit_range=(1, 7)) * 0.25
            for channel_filter in self.channel_filters
        ]

    # Log warning if unsupported methods used
    # Override inherited TPM 1.2 methods
    # -----------------------------------------------------------------------------------

    def disable_channels(self, channels=None):
        """Unsupported here: only logs a warning."""
        self.logger.warning("disable_channels method is only supported on TPM 1.2")

    def select_low_passband(self):
        """Unsupported here: only logs a warning."""
        self.logger.warning("select_low_passband method is only supported on TPM 1.2")

    def select_high_passband(self):
        """Unsupported here: only logs a warning."""
        self.logger.warning("select_high_passband method is only supported on TPM 1.2")

    def enable_channels(self, channels=None):
        """Unsupported here: only logs a warning."""
        self.logger.warning("enable_channels method is only supported on TPM 1.2")

    def get_passband(self):
        """Unsupported here: only logs a warning."""
        self.logger.warning("get_passband method is only supported on TPM 1.2")

    def _subword_bit_range(self, subword_num):
        """Return the (from_bit, to_bit) span of a channel subword in a cpld word."""
        from_bit = self._cpld_subword_width * subword_num
        return from_bit, from_bit + self._cpld_subword_width - 1

    def _control_word(self, update_preadu_reg=False):
        """Build the value for the cpld preADU control register.

        Bit 0 requests the shift, bit 1 (optional) requests the update of the
        preADU attenuation register, bit 2 carries the preADU id.
        """
        cntl_value = self.set_bit(input=0, bit=0, value=1)
        if update_preadu_reg:
            cntl_value = self.set_bit(input=cntl_value, bit=1, value=1)
        return self.set_bit(input=cntl_value, bit=2, value=self._preadu_id)

    def _shift_registers(self, cntl_value):
        """Write the control register, then poll until bit 0 reads back as 0.

        There is no timeout: the call blocks for as long as bit 0 stays set.
        """
        self.board[self._cpld_preadu_cntl_addr] = cntl_value
        while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
            time.sleep(0.01)

    def write_configuration(self, update_preadu_reg=True):
        """
        Write configuration to preadu.

        WARNING if update_preadu_reg is false, the preadu shift registers and
        attenuation registers out of sync

        :param update_preadu_reg: Also update the preADU attenuation register
            from the preADU shift register
        """
        # Each channel occupies 8-bit subword within a 32-bit word (one cpld shift register)
        for register_num, addr in enumerate(self._cpld_reg_addr):
            config = 0
            for subword_num in range(self._nof_cpld_subwords):
                flat_channel_number = (
                    register_num * self._nof_cpld_subwords + subword_num
                )
                # Channel words are written to the cpld bit-reversed
                config = self.set_bits(
                    input=config,
                    bit_range=self._subword_bit_range(subword_num),
                    value=self.bit_reverse(self.channel_filters[flat_channel_number]),
                )
            self.board[addr] = config
            self.logger.debug(
                f"cpld shift register {register_num} has been set to {hex(config)}"
            )

        # Shift the cpld shift register to the preadu shift register (and
        # optionally update the attenuation register) of the correct preADU,
        # then wait for the shifting of registers to have occurred
        self._shift_registers(self._control_word(update_preadu_reg))
        self.logger.debug("preADU registers have been written to")

    def read_configuration(self):
        """Read configuration from preadu into ``channel_filters``."""
        # Shifts the cpld shift register to the preadu shift register,
        # and the preadu shift register to the cpld shift register
        # of the correct preADU
        cpld_preadu_cntl_value = self._control_word()
        self._shift_registers(cpld_preadu_cntl_value)

        # Each channel occupies 8-bit within a 32-bit word
        for register_num, addr in enumerate(self._cpld_reg_addr):
            config = self.board[addr]

            # Extract channel values
            for subword_num in range(self._nof_cpld_subwords):
                flat_channel_number = (
                    register_num * self._nof_cpld_subwords + subword_num
                )
                subword_value = self.get_bits(
                    input=config, bit_range=self._subword_bit_range(subword_num)
                )
                channel_value = self.bit_reverse(subword_value)
                self.channel_filters[flat_channel_number] = channel_value
                self.logger.debug(
                    f"channel_filters[{flat_channel_number}] has been set to {hex(channel_value)}"
                )

        # set preADU cntl register on the cpld, to switch back the cpld and
        # preadu shift registers
        self._shift_registers(cpld_preadu_cntl_value)

    def check_present(self):
        """
        Detects if a preADU is present. Sets attribute "is_present".

        A gain of 21 is written to all channels and read back. If the first
        gain value read back is 21, then a preADU is present. The starting
        configuration of the software gains and preADU hardware gains are
        copied and restored at the end of the method.
        """
        if not self.powered_on:
            self.logger.warning("Preadu powered OFF, unable to check if present")
            self.is_present = False
            return

        # Save starting configuration
        starting_configuration_sw = copy(self.channel_filters)
        self.read_configuration()
        starting_configuration_hw = copy(self.channel_filters)

        # Write the probe value without touching the attenuation register
        self.set_attenuation(21)
        self.write_configuration(update_preadu_reg=False)
        self.read_configuration()
        returned_value = self.get_attenuation()[0]

        preadu_detected = returned_value == 21
        if not preadu_detected:
            self.logger.debug(
                f"got: {returned_value}, expected: 21, assuming preADU is not present"
            )
        self.is_present = preadu_detected

        # Write back starting configuration
        self.channel_filters = starting_configuration_hw
        self.write_configuration(update_preadu_reg=False)
        self.channel_filters = starting_configuration_sw

    @staticmethod
    def bit_reverse(value):
        """
        Reverse bits in value.

        The value is formatted as binary, zero-padded to at least 8 digits,
        and the digit string is reversed.

        :param value: Value to bit reverse
        :return: Integer with the bit order reversed
        """
        return int("{:08b}".format(value)[::-1], 2)

    @staticmethod
    def set_bit(input, bit, value):
        """
        Sets the specified bit in input with a value.

        :param input: value before set bit.
        :param bit: position of bit to set.
        :param value: value of bit in returned value; the bit is set only when
            this equals 1 and cleared otherwise
        :return: input with the bit updated
        """
        if value == 1:
            return input | (1 << bit)
        return input & ~(1 << bit)

    @staticmethod
    def get_bit(input, bit):
        """
        Gets the specified bit in input.

        :param input: value to get bit.
        :param bit: position of bit.
        :return: 0 or 1
        """
        return (input & (1 << bit)) >> bit

    def set_bits(self, input, bit_range, value):
        """
        Sets the specified bits in input with value.

        Bits ``from``..``to`` (inclusive) of the result take the low bits of
        ``value``; all other bits of ``input`` are kept.

        :param input: value before set bits.
        :param bit_range: position of bits to set (from,to).
        :param value: value of bits in returned value
        :return: input with the bit field replaced
        """
        low, high = bit_range[0], bit_range[1]
        if high < low:
            # Empty range: nothing to replace
            return input
        field_mask = ((1 << (high - low + 1)) - 1) << low
        return (input & ~field_mask) | ((value << low) & field_mask)

    def get_bits(self, input, bit_range):
        """
        Gets the specified bits in an input.

        :param input: value to get bit.
        :param bit_range: position of bits to get (from,to).
        :return: the selected bit field, shifted down to bit 0
        """
        low, high = bit_range[0], bit_range[1]
        if high < low:
            # Empty range: no bits selected
            return 0
        return (input >> low) & ((1 << (high - low + 1)) - 1)

    ##################### Superclass method implementations #################################

    def initialise(self):
        """Initialise PreAdu.

        :return: True
        """
        self.logger.info("PreAdu has been initialised")
        return True

    def status_check(self):
        """Perform status check.

        :return: Status
        """
        self.logger.info("PreAdu: Checking status")
        return Status.OK

    def clean_up(self):
        """Perform cleanup.

        :return: Success
        """
        self.logger.info("PreAdu: Cleaning up")
        return True