import time
from copy import copy
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
__author__ = "Alessio Magro"
[docs]
class TpmPreAdu(FirmwareBlock):
"""TpmPreAdu plugin"""
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("tpm_preadu")
@maxinstances(2)
def __init__(self, board, logger=None, **kwargs):
"""
TpmPreAdu initialiser.
:param board: Pointer to board instance
"""
super(TpmPreAdu, self).__init__(board, logger=logger)
if "preadu_id" not in list(kwargs.keys()):
raise PluginError("PreAdu: preadu_id required")
self._preadu_id = kwargs["preadu_id"]
# Define a filter per channel in order to be able to enable and disable
# RF output from the channel
self._nof_channels = 16
self.channel_filters = [0x0] * self._nof_channels
# Define cpld register addresses
self._cpld_reg_addr = [0x70000000, 0x70000004, 0x70000008, 0x7000000C]
self._cpld_preadu_cntl_addr = 0x70000010
# Define cpld register constants
self._nof_cpld_regs = len(self._cpld_reg_addr)
self._cpld_subword_width = 8
self._nof_cpld_subwords = int(32 / self._cpld_subword_width)
# Read current configuration
self.read_configuration()
# Try to check if preadu is present, will fail if preADU is OFF
# sets self.is_present
self.check_present()
self.__info = None
#######################################################################################
@property
def info(self):
if self.__info is None:
ext_label = self.board.tpm_qsfp_adapter[0].get_field(
f"FE_{self._preadu_id}_LABEL"
)
self.__info = {
"EXT_LABEL": None,
"vendor": None,
"pn": None,
"ver": "1.0",
"sn": None,
}
if ext_label is not None:
self.__info["EXT_LABEL"] = ext_label
if len(ext_label.split(";")) >= 4:
self.__info["vendor"] = ext_label.split(";")[0]
self.__info["pn"] = ext_label.split(";")[1]
self.__info["ver"] = ext_label.split(";")[2]
self.__info["sn"] = ext_label.split(";")[3]
else: # if standard format missing assume default version 1.0
self.__info["sn"] = ext_label
return self.__info
@property
def powered_on(self):
return self.board["board.regfile.enable.fe"] > 0
[docs]
def switch_on(self):
"""Switch preadu on"""
# Switch on preadu
self.board["board.regfile.enable.fe"] = 1
time.sleep(0.2)
# Check that preadu has been switched on properly
value = self.board["board.regfile.enable.fe"]
if value != 1:
self.logger.warning(f"Error! preADU power is not high: {value}")
return
self.logger.debug("preADU power on done!")
# Check if preadu is present, sets self.is_present
time.sleep(0.5)
self.check_present()
[docs]
def switch_off(self):
"""Switch preadu off"""
# Switch off preadu
self.board["board.regfile.enable.fe"] = 0
time.sleep(0.2)
# Check that preadu has been switched off properly
value = self.board["board.regfile.enable.fe"]
if value == 0x0:
self.logger.debug("preADU power off done!")
else:
self.logger.warning(f"Error! preADU power is not low: {value}")
[docs]
def set_attenuation(self, attenuation, channels=None):
"""
Set attenuation level for a particular channel.
Supports attenuation from 0 to 31 with a precision of 0.25.
:param attenuation: Value of attenuation
:param channels: Preadu channels (None if to be applied to all channels"""
if channels is None:
channels = list(range(self._nof_channels))
# Sanity checks
if not 0 <= attenuation < 2**5:
self.logger.warning(
f"Cannot set PREADU attenuation {attenuation}, out of range"
)
return
for channel in channels:
if not 0 <= channel <= self._nof_channels:
self.logger.warning(
f"Cannot set attenuation for channel {channel}, invalid channel"
)
if attenuation % 0.25 != 0:
self.logger.warning(
f"PreADU attenuation precision of 0.25 supported. Value {attenuation} specified will be rounded to {attenuation // 0.25 / 4}"
)
# Apply attenuation with currently selected filter
attenuation_int = int(attenuation * 4)
attenuation_value = 0
attenuation_value = self.set_bits(
input=attenuation_value, bit_range=(1, 7), value=attenuation_int
)
self.channel_filters[channel] = attenuation_value
self.logger.debug(
f"preADU channel {channel} attenuation has been set to {attenuation_value}"
)
[docs]
def get_attenuation(self):
"""
Returns the current attenuation value.
Stored in channel_filters[7:1] with the 2 least significant bits of which
forming a fractional component.
For example if channel_filters[7:1] == 33 this corresponds to an attenuation
of 1000.01 (base 2) or 8.25 (base 10)
"""
output = [None] * self._nof_channels
for channel_index in range(self._nof_channels):
output[channel_index] = (
self.get_bits(
input=self.channel_filters[channel_index], bit_range=(1, 7)
)
* 0.25
)
return output
# Log warning if unsupported methods used
# Override inherited TPM 1.2 methods
# -----------------------------------------------------------------------------------
[docs]
def disable_channels(self, channels=None):
self.logger.warning("disable_channels method is only supported on TPM 1.2")
return
[docs]
def select_low_passband(self):
self.logger.warning("select_low_passband method is only supported on TPM 1.2")
return
[docs]
def select_high_passband(self):
self.logger.warning("select_high_passband method is only supported on TPM 1.2")
return
[docs]
def enable_channels(self, channels=None):
self.logger.warning("enable_channels method is only supported on TPM 1.2")
return
[docs]
def get_passband(self):
self.logger.warning("get_passband method is only supported on TPM 1.2")
return
[docs]
def write_configuration(self, update_preadu_reg=True):
"""Write configuration to preadu
WARNING if update_preadu_reg is false, the preadu shift registers and attenuation registers out of sync
"""
# Each channel occupies 8-bit subword within a 32-bit word (one cpld shift register)
for register_num, addr in enumerate(self._cpld_reg_addr):
config = 0
for subword_num in range(self._nof_cpld_subwords):
flat_channel_number = (
register_num * self._nof_cpld_subwords + subword_num
)
from_bit = self._cpld_subword_width * subword_num
to_bit = self._cpld_subword_width * (subword_num + 1) - 1
value_to_be_written = self.bit_reverse(
self.channel_filters[flat_channel_number]
)
config = self.set_bits(
input=config,
bit_range=(from_bit, to_bit),
value=value_to_be_written,
)
self.board[addr] = config
self.logger.debug(
f"cpld shift register {register_num} has been set to {hex(config)}"
)
cpld_preadu_cntl_value = 0
# Shifts the cpld shift register to the preadu shift register
cpld_preadu_cntl_value = self.set_bit(
input=cpld_preadu_cntl_value, bit=0, value=1
)
if update_preadu_reg:
# And updates the preADU attenuation register with the values in the preadu shift register
cpld_preadu_cntl_value = self.set_bit(
input=cpld_preadu_cntl_value, bit=1, value=1
)
# To the correct preADU
cpld_preadu_cntl_value = self.set_bit(
input=cpld_preadu_cntl_value, bit=2, value=self._preadu_id
)
# set preADU cntl register on the cpld to the correct value
self.board[self._cpld_preadu_cntl_addr] = cpld_preadu_cntl_value
# wait for the shifting of registers to have occurred
while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
time.sleep(0.01)
self.logger.debug(f"preADU registers have been written to")
[docs]
def read_configuration(self):
"""Read configuration from preadu"""
cpld_preadu_cntl_value = 0
# Shifts the cpld shift register to the preadu shift register,
# and the preadu shift register to the cpld shift register
cpld_preadu_cntl_value = self.set_bit(
input=cpld_preadu_cntl_value, bit=0, value=1
)
# of the correct preADU
cpld_preadu_cntl_value = self.set_bit(
input=cpld_preadu_cntl_value, bit=2, value=self._preadu_id
)
# set preADU cntl register on the cpld to the correct value
self.board[self._cpld_preadu_cntl_addr] = cpld_preadu_cntl_value
# wait for the shifting of registers to have occurred
while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
time.sleep(0.01)
# Each channel occupies 8-bit within a 32-bit word
for register_num, addr in enumerate(self._cpld_reg_addr):
config = self.board[addr]
# Extract channel values
for subword_num in range(self._nof_cpld_subwords):
flat_channel_number = (
register_num * self._nof_cpld_subwords + subword_num
)
from_bit = self._cpld_subword_width * subword_num
to_bit = self._cpld_subword_width * (subword_num + 1) - 1
subword_value = self.get_bits(
input=config, bit_range=(from_bit, to_bit)
)
self.channel_filters[flat_channel_number] = self.bit_reverse(
subword_value
)
self.logger.debug(
f"channel_filters[{flat_channel_number}] has been set to {hex(self.bit_reverse(subword_value))}"
)
# set preADU cntl register on the cpld, to switch back the cpld and preadu shift registers
self.board[self._cpld_preadu_cntl_addr] = cpld_preadu_cntl_value
# wait for the shifting of registers to have occurred
while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
time.sleep(0.01)
[docs]
def check_present(self):
"""
Detects if a preADU is present.
Sets attribute "is_present".
A gain of 21 is written to all channels and read back.
If the first gain value read back is 21, then a preADU
is present.
The starting configuration of the software gains and
preADU hardware gains are copied and restored at the
end of the method.
"""
if not self.powered_on:
self.logger.warning(f"Preadu powered OFF, unable to check if present")
self.is_present = False
return
# Save starting configuration
starting_configuration_sw = copy(self.channel_filters)
self.read_configuration()
starting_configuration_hw = copy(self.channel_filters)
self.set_attenuation(21)
self.write_configuration(update_preadu_reg=False)
self.read_configuration()
returned_value = self.get_attenuation()[0]
preadu_detected = True if returned_value == 21 else False
if not preadu_detected:
self.logger.debug(
f"got: {returned_value}, expected: 21, assuming preADU is not present"
)
self.is_present = preadu_detected
# Write back starting configuration
self.channel_filters = starting_configuration_hw
self.write_configuration(update_preadu_reg=False)
self.channel_filters = starting_configuration_sw
return
[docs]
@staticmethod
def bit_reverse(value):
"""
Reverse bits in value.
:param value: Value to bit reverse
"""
return int("%d" % (int("{:08b}".format(value)[::-1], 2)))
[docs]
@staticmethod
def set_bit(input, bit, value):
"""
Sets the specified bit in input with a value.
:param input: value before set bit.
:param bit: position of bit to set.
:param value: value of bit in returned value
"""
if value == 1:
return input | (1 << bit)
else:
return input & ~(1 << bit)
[docs]
@staticmethod
def get_bit(input, bit):
"""
Gets the specified bit in input.
:param input: value to get bit.
:param bit: position of bit.
"""
return (input & (1 << bit)) >> bit
[docs]
def set_bits(self, input, bit_range, value):
"""
Sets the specified bits in input with value.
:param input: value before set bits.
:param bit_range: position of bits to set (from,to).
:param value: value of bits in returned value
"""
output = input
for input_index, output_index in enumerate(
range(bit_range[0], bit_range[1] + 1)
):
output = self.set_bit(
input=output,
bit=output_index,
value=self.get_bit(input=value, bit=input_index),
)
return output
[docs]
def get_bits(self, input, bit_range):
"""
Gets the specified bits in an input.
:param input: value to get bit.
:param bit_range: position of bits to get (from,to).
"""
output = 0
for index in range(bit_range[0], bit_range[1] + 1):
output = self.set_bit(
input=output,
bit=index - bit_range[0],
value=self.get_bit(input=input, bit=index),
)
return output
##################### Superclass method implementations #################################
[docs]
def initialise(self):
"""Initialise AdcPowerMeter"""
self.logger.info("PreAdu has been initialised")
return True
[docs]
def status_check(self):
"""Perform status check.
:return: Status
"""
self.logger.info("PreAdu: Checking status")
return Status.OK
[docs]
def clean_up(self):
"""Perform cleanup.
:return: Success
"""
self.logger.info("PreAdu: Cleaning up")
return True