import binascii
import logging
import math
import socket
import struct
import time
import zlib
from math import ceil
from pathlib import Path
from time import sleep
import ska_low_sps_tpm_api.boards.tpm_hw_definitions as tpm_hw_definitions
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.boards.fpgaboard import DeviceNames, FPGABoard
from ska_low_sps_tpm_api.plugins.hw_lock import HwLock, _NoLock
from ska_low_sps_tpm_api.protocols.ucp import UCP
[docs]
class TPM(FPGABoard):
"""FPGABoard subclass for communicating with a TPM board"""
[docs]
def __init__(self, **kwargs):
    """
    Class constructor.

    Forces the board make and the protocol, prepares the pre-connection
    state and then runs the FPGABoard initialiser.

    :param kwargs: Passed on to the superclass initialiser; "fpgaBoard" and
        "protocol" are always overwritten here
    """
    # Whatever the caller passed, a TPM is a TpmBoard reached through UCP
    kwargs.update(fpgaBoard=BoardMake.TpmBoard, protocol=UCP)

    hw = tpm_hw_definitions

    # Hardcoded CPLD XML offset address and expected CPLD magic number
    self._cpld_xml_offset = hw.CPLD_XML_OFFSET
    self._cpld_magic = hw.CPLD_MAGIC_TPM_V1_5

    # Connection options (connect() overwrites them from its kwargs)
    self._simulator = False
    self._initialise = False
    self._enable_ada = False
    self.adas_enabled = False
    self._enable_adc = True

    # CPLD SelectMap registers
    self._xil_registers = ["board.smap.xil_0", "board.smap.xil_1"]
    self._global_register = "board.smap.global"
    self._c2c_stream_enable = "board.regfile.ena_stream"

    # Minimum CPLD version accepted by download_firmware
    self.download_firmware_required_cpld_version = 0

    # Hardware revision constants and the values read back from the board
    self.tpm_hw_rev_1_5 = hw.TPM_HW_REV_1_5
    self.tpm_hw_rev_2_0 = hw.TPM_HW_REV_2_0
    self.board_info = None
    self.hw_rev = None

    # Firmware versions used to pick the lock / I2C access scheme
    self.CPLD_FW_VER_LOCK_I2C_CHANGE = hw.CPLD_FW_VER_LOCK_I2C_CHANGE_TPM_V1_5
    self.MCU_FW_VER_LOCK_I2C_CHANGE = hw.MCU_FW_VER_LOCK_I2C_CHANGE_TPM_V1_5
    self.MCU_FW_VER_ERROR_NO_MCU = hw.MCU_FW_VER_ERROR_NO_MCU

    # Call superclass initialiser
    super(TPM, self).__init__(**kwargs)

    # Known BIOS releases as [version, "CPLD_<ver>-MCU_<ver>"] pairs
    known_bios = (
        ("0.0.1", "CPLD_0x19121901-MCU_0xb0000010_0x0_0x0"),
        ("0.0.2", "CPLD_0x19122316-MCU_0xb0000010_0x0_0x0"),
        ("0.0.3", "CPLD_0x20011611-MCU_0xb0000010_0x0_0x0"),
        ("0.0.4", "CPLD_0x20011611-MCU_0xb0000016_0x20200123_0x113426"),
        ("0.0.5", "CPLD_0x20041613-MCU_0xb0000016_0x20200123_0x113426"),
        ("0.1.0", "CPLD_0x20111309-MCU_0xb0000100_0x20201120_0xe3f6fa2"),
        ("0.2.0", "CPLD_0x21033009-MCU_0xb0000107_0x20210114_0x0"),
        ("0.2.1", "CPLD_0x21061417-MCU_0xb0000117_0x20210615_0x0"),
        ("0.2.2", "CPLD_0x22032409-MCU_0xb0000118_0x20220831_0x0"),
        ("0.3.0", "CPLD_0x23011618-MCU_0xb000011a_0x20230126_0x0"),
        ("0.3.1", "CPLD_0x23011618-MCU_0xb000011a_0x20230209_0x0"),
        ("0.4.0", "CPLD_0x23090714-MCU_0xb000011a_0x20230209_0x0"),
        ("0.5.0", "CPLD_0x23092511-MCU_0xb000011a_0x20230209_0x0"),
        ("0.6.0", "CPLD_0x25050815-MCU_0xb000011b_0x20250509_0x29ae87a"),
        ("1.0.0", "CPLD_0x26031616-MCU_0xb000011c_0x20260318_0x828bd55"),
    )
    self.BIOS_REV_list = [[release, identity] for release, identity in known_bios]

    # EEPROM BOARD_MODE field values
    self.BOARD_MODE = {"ada": 0, "no-ada": 0xFF}

    # Placeholder locks; load_plugins() may replace them with HwLock objects
    self._spi_lock = _NoLock()
    self._smap_lock = _NoLock()
    self._i2c_lock = _NoLock()
[docs]
def connect(
    self,
    ip,
    port,
    fsample=800e6,
    ddc=False,
    fddc=0.0,
    adc_clock_divider=1,
    mono_channel_14_bit=False,
    mono_channel_sel=0,
    adc_low_bitrate=False,
    fullscale_voltage=1.59,
    **kwargs,
):
    """
    Overload connect method.

    Connects through the superclass, checks that the CPLD answers and carries
    the expected magic number, loads the CPLD register map read from the
    board, loads the plugins, reads the global status alarms and, when
    requested, initialises the on-board devices.

    :param ip: IP address to connect to
    :param port: Port to connect to
    :param fsample: Sampling rate
    :param ddc: Forwarded to _initialise_devices
    :param fddc: Forwarded to _initialise_devices
    :param adc_clock_divider: Forwarded to _initialise_devices
    :param mono_channel_14_bit: Forwarded to _initialise_devices
    :param mono_channel_sel: Forwarded to _initialise_devices
    :param adc_low_bitrate: Forwarded to _initialise_devices
    :param fullscale_voltage: Forwarded to _initialise_devices
    :param kwargs: Forwarded to the superclass connect. The keys "simulator",
        "initialise", "enable_ada" and "enable_adc" are also read here
    :raises LibraryError: If the CPLD version register cannot be read or the
        CPLD magic number does not match
    """
    # Options read here; kwargs itself is forwarded untouched to the superclass
    self._simulator = kwargs.get("simulator", False)
    self._initialise = kwargs.get("initialise", False)
    # TPM might or might not have an ADA
    self._enable_ada = kwargs.get("enable_ada", False)
    # ADC enabling can be skipped for test/debug (default True for back compatibility)
    self._enable_adc = kwargs.get("enable_adc", True)

    # Call connect on super class
    super(TPM, self).connect(ip, port, **kwargs)

    # CPLD is assumed programmed
    self._programmed[Device.Board] = True

    # Read the CPLD version register to check whether the TPM is reachable
    try:
        self.read_address(tpm_hw_definitions.CPLD_VERSION_OFFSET)
    except Exception as err:
        self.status[Device.Board] = Status.NetworkError
        self._connected = False
        # Ensure transport resources are released on connect failure.
        self._protocol.close_connection()
        # Keep the underlying failure attached for diagnosis
        raise LibraryError(
            "Could not reach TPM with address {}".format(ip)
        ) from err

    cpld_magic = self.read_address(tpm_hw_definitions.CPLD_MAGIC_OFFSET)
    if cpld_magic != self._cpld_magic:
        raise LibraryError(
            "TPM CPLD_MAGIC mismatch. Expected 0x%x, got 0x%x."
            % (self._cpld_magic, cpld_magic)
        )

    # Get XML file and wrap it in a single root node
    cpld_xml = str(self._get_xml_file(self.read_address(self._cpld_xml_offset)))
    cpld_xml = cpld_xml.replace('<?xml version="1.0" ?>', "")
    cpld_xml = "<node>\n{}\n</node>".format(cpld_xml)
    super(TPM, self).load_firmware(device=Device.Board, register_string=cpld_xml)
    self._logger.info("tpm.connect: CPLD XML File Downloaded")

    # "board.regfile.date_code" is needed to adapt driver behavior to the
    # TPM BIOS in use; it is read only once, here
    self.board_regfile_date_code = self["board.regfile.date_code"]

    # Load plugins
    self.load_plugins()

    # Check Global status alarm (called for the logging it performs)
    self.get_global_status_alarms()

    # Initialise devices if required
    if not self._simulator and self._connected and self._initialise:
        self._initialise_devices(
            fsample=fsample,
            ddc=ddc,
            fddc=fddc,
            adc_clock_divider=adc_clock_divider,
            mono_channel_14_bit=mono_channel_14_bit,
            mono_channel_sel=mono_channel_sel,
            fullscale_voltage=fullscale_voltage,
            adc_low_bitrate=adc_low_bitrate,
        )
[docs]
def load_plugins(self):
"""
Load the TPM plugins.

Installs a hardware lock for each lock register found in the memory map,
loads the CPLD-level plugins, works out whether the old I2C access mode has
to be used, reads the board information from the EEPROM and checks the
license registers. When not simulating and connected, it also loads the
PREADU, firmware information, ADC and PLL plugins.

:raises LibraryError: When the license timebomb is found expired
"""
# Load the CPLD plugin (does not require the board to be initialised)
# Each lock replaces its _NoLock placeholder only if the CPLD exposes the register
if self.memory_map.has_register("board.lock.lock_anaspi"):
self._spi_lock = HwLock(board=self, lock_register="board.lock.lock_anaspi")
if self.memory_map.has_register("board.lock.lock_smap"):
self._smap_lock = HwLock(board=self, lock_register="board.lock.lock_smap")
if self.memory_map.has_register("board.lock.lock_i2c"):
self._i2c_lock = HwLock(board=self, lock_register="board.lock.lock_i2c")
self.load_plugin("TpmProgFlash")
self.load_plugin("TpmCpld")
self.load_plugin("TpmEEP")
self.load_plugin("TpmMcu")
# Old I2C mode is flagged from the MCU version register and from the
# CPLD date code / MCU version compared with the "lock I2C change" versions
self.i2c_old_mode = False
if (
self[tpm_hw_definitions.CPLD_MCU_VERSION_OFFSET]
!= self.MCU_FW_VER_ERROR_NO_MCU
):
if (
self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE
or self[tpm_hw_definitions.CPLD_MCU_VERSION_OFFSET]
<= self.MCU_FW_VER_LOCK_I2C_CHANGE
):
self.i2c_old_mode = True
else:
self._logger.warning(
"Board is running without MCU, reduced functionality available!"
)
self.i2c_old_mode = True
# Load QSFP_ADAPTER plugins
self.load_plugin("TpmQSFPAdapter", core_id=0)
self.load_plugin("TpmQSFPAdapter", core_id=1)
# Read Hardware information from EEPROM
if self.board_info is None:
self.board_info = self.get_board_info()
self.hw_rev = self.get_hardware_revision()
self._logger.info("Board SN: %s" % self.board_info["SN"])
self._logger.info(f"Board EXT_LABEL: {self.board_info['EXT_LABEL']}")
self._logger.info("Board HARDWARE_REV: %s" % self.board_info["HARDWARE_REV"])
self._logger.info("Board bios: %s" % self.board_info["bios"])
# Check if TPM supports C2C-LVDS FPGA firmware
# (supported means the CPLD map has no "board.regfile.c2c_pll" register)
self.tpm_c2c_lvds_supported = not self.has_register("board.regfile.c2c_pll")
# License check: key_ok == 0 is only warned about, an expired timebomb is fatal
if self["board.regfile.key_ok"] == 0:
self._logger.warning(
"Board is running without License, functionality will be corrupted after 1 hour from power-up"
)
if self["board.regfile.timebomb_status"] == 0:
self._logger.error(
"License timebomb already expired! Please power off and power on the board to recover functionality"
)
raise LibraryError(
"License timebomb already expired! Please power off and power on the board to recover functionality"
)
if not self._simulator and self._connected:
# Load PREADU plugins
self.load_plugin("TpmPreAdu", preadu_id=0)
self.load_plugin("TpmPreAdu", preadu_id=1)
# Try initialising the board (will check if FPGAs are programmed)
if not self._initialise_board():
return
# Pre-load all required plugins. Board-level devices are loaded here
# (list comprehension used only for its side effect: firmware 1, 2 and 3)
[
self.load_plugin("TpmFirmwareInformation", firmware=x)
for x in range(1, 4)
]
# Load ADCs
# One TpmAdc9695 plugin per ADC, "adc0" to "adc15"
[
self.load_plugin("TpmAdc9695", adc_id=adc)
for adc in [
"adc0",
"adc1",
"adc2",
"adc3",
"adc4",
"adc5",
"adc6",
"adc7",
"adc8",
"adc9",
"adc10",
"adc11",
"adc12",
"adc13",
"adc14",
"adc15",
]
]
# Load PLL
self.load_plugin("TpmPll")
# Update firmware information
if self.is_programmed():
[info.update_information() for info in self.tpm_firmware_information]
[docs]
def smap_select_fpga(self, fpga_list):
    """
    Select FPGAs on the SelectMap interface.

    Writes 0x10 to the SelectMap register of every listed FPGA.

    :param fpga_list: Indexes into the list of SelectMap registers
    """
    select_value = 0x10
    for fpga_index in fpga_list:
        register_name = self._xil_registers[fpga_index]
        self[register_name] = select_value
[docs]
def smap_deselect_fpga(self, fpga_list):
    """
    Deselect FPGAs on the SelectMap interface.

    Writes 0x0 to the SelectMap register of every listed FPGA.

    :param fpga_list: Indexes into the list of SelectMap registers
    """
    deselect_value = 0x0
    for fpga_index in fpga_list:
        register_name = self._xil_registers[fpga_index]
        self[register_name] = deselect_value
[docs]
def set_qsfp_led(self, id, val):
    """
    Drive one of the eight LEDs hosted on the two QSFP adapters.

    Nothing is written when the adapter hosting the LED is not present.

    :param id: LED number, counted from 1 (1-4 on adapter 0, 5-8 on adapter 1)
    :param val: 0 is written to the adapter as 1, any other value as 0
        (the lines are named LEDxn, presumably active low - confirm)
    """
    # | LED | COLOR | FUNCTION |
    # |-----|-------|-----------------------|
    # | 1 | GREEN | QSFP-1 link-up |
    # | 2 | GREEN | FPGAs programmed |
    # | 3 | RED | available (SW error?) |
    # | 4 | RED | MCU |
    # | 5 | GREEN | QSFP-2 link-up |
    # | 6 | GREEN | ADCs configured |
    # | 7 | RED | available (SW error?) |
    # | 8 | RED | MCU |
    # LED 4 8 are managed by MCU
    # | TPM STATUS | LED 4 | LED 8 |
    # |-----------------|-------|-------|
    # | OFF | OFF | OFF |
    # | STANDBY | ON | ON |
    # | SAFETY SHUTDOWN | ON | OFF |
    # | NA | OFF | ON |
    led_table = [
        {"device": adapter_index, "name": "LED%dn" % line}
        for adapter_index in (0, 1)
        for line in range(4)
    ]
    selected = led_table[id - 1]

    # The requested state is inverted before it is written
    level = 1 if val == 0 else 0

    if self.tpm_qsfp_adapter[selected["device"]].present:
        self.tpm_qsfp_adapter[selected["device"]].set(selected["name"], level)
[docs]
def erase_fpga(self, force=True):
"""
Erase FPGAs configuration memory.

Disables the C2C stream and then, for each SelectMap register in
self._xil_registers, selects the FPGA, drives the global register to 0x0 and
back to 0x3 and polls bit 0 of the SelectMap register after each write.
Finally LEDs 2, 1 and 5 are switched off through set_qsfp_led.

:param force: Not referenced anywhere in this method body
:raises LibraryError: If the board is not connected, or if the polling of
the SelectMap register runs out of attempts
"""
# NOTE(review): smap_data_format (with its reverse_bit helper) is defined
# here but is not called anywhere in this method.
def smap_data_format(dat):
# Reverses the bit order of the low 8 bits of num
def reverse_bit(num):
result = 0
for n in range(8):
result = (result << 1) + (num & 1)
num >>= 1
return result
smap_dat = 0
# Bit-reverse each of the 4 bytes of dat in place
for n in range(4):
smap_dat |= reverse_bit(dat >> 8 * n) << 8 * n
return smap_dat
# Check if connected
if not self.is_connected():
raise LibraryError("TPM needs to be connected in order to program FPGAs")
self[self._c2c_stream_enable] = 0x0 # Disable C2C stream
# Temporary
self[self._global_register] = 0x3
# Erase FPGAs SRAM
for fpga_id in range(len(self._xil_registers)):
xil_register = self._xil_registers[fpga_id]
self._logger.info("Erasing FPGA%d using PROG pin method" % fpga_id)
self[xil_register] = 0x10 # Select FPGA
self[self._global_register] = 0x0 # PROG = 0
# The attempt counter is not reset between the two polling loops below,
# so both share the same budget of max_attempt polls (0.1 s apart)
max_attempt = 10
attempt = 0
fpga_still_programmed = 0
while self[xil_register] & 0x1 == 1: # wait for INIT
if attempt == max_attempt:
fpga_still_programmed = 1
break
attempt += 1
sleep(0.1)
self[self._global_register] = 0x3
while self[xil_register] & 0x1 == 0: # wait for INIT
if attempt == max_attempt:
fpga_still_programmed = 1
break
attempt += 1
sleep(0.1)
# If FPGA erase with PROG pin method timed out, try with ICAP3 method
# NOTE(review): no ICAP3 fallback is visible here; the timeout is logged
# and raised as a LibraryError instead.
if fpga_still_programmed == 1:
self._logger.error(
"Erasing FPGA%d using PROG pin method failed" % fpga_id
)
raise LibraryError(
"Erasing FPGA%d using PROG pin method failed" % fpga_id
)
self[self._global_register] = 0x3
try:
# Power off led related to programming and QSFP link-up when erase fpga
self.set_qsfp_led(2, 0)
self.set_qsfp_led(1, 0)
self.set_qsfp_led(5, 0)
except LibraryError as e:
# Only the "I2C/EEP request not accepted!" failure is tolerated (and
# logged through the root logging module); anything else propagates
if str(e) == "I2C/EEP request not accepted!":
logging.error(f"Failed erasing FPGAs: {repr(e)}", exc_info=True)
else:
raise
[docs]
def download_firmware(self, device, bitfile):
"""
Download bitfile to FPGA.

Enables the FPGA power if needed, takes the SMAP lock, erases both FPGAs,
streams the bitfile through the protocol's select_map_program, waits for the
DONE bit of each SelectMap register and then checks the FPGA magic numbers
(up to 4 tries) before releasing the SMAP lock.

:param device: FPGA to download bitfile (not referenced in this method
body; both SelectMap registers are always selected)
:param bitfile: Bitfile to download
:raises LibraryError: If the SMAP interface cannot be locked, the board
status is not OK, the CPLD version is rejected, the bitfile is missing or
smaller than 1024 bytes, or the FPGA magic numbers cannot be read back
"""
if self["board.regfile.enable.fpga"] == 0:
self._logger.info("FPGAs power disabled. Enabling")
self["board.regfile.enable.fpga"] = 1
time.sleep(0.5)
else:
self._logger.info("FPGAs power already enabled.")
# Lock SMAP interface to avoid MCU access conflict
# Two locking schemes: CPLD date codes up to CPLD_FW_VER_LOCK_I2C_CHANGE
# write 0x50000000 to "board.lock.mlock1" (up to 10 tries), newer ones use
# the self._smap_lock object
start_lock = time.time()
if self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE:
self._logger.info("Locking SMAP interface to avoid MCU access conflict")
locked = False
retry = 10
while retry > 0:
self["board.lock.mlock1"] = 0x50000000
if self["board.lock.mlock1"] != 0x50000000:
retry = retry - 1
time.sleep(0.01)
else:
locked = True
break
else:
self._logger.info("Locking SMAP interface to avoid MCU access conflict")
self._smap_lock.lock()
locked = self._smap_lock.is_locked
if locked is False:
self._logger.error("ERROR: Can't Lock SMAP interface")
raise LibraryError("ERROR: Can't Lock SMAP interface")
# NOTE(review): the LibraryError raises between here and the unlock near the
# end of this method happen while the SMAP lock is still held; no unlock is
# visible on those paths - confirm whether a try/finally is needed.
# Program FPGA
""" Download bitfile to FPGA
:param device: FPGA to download bitfile
:param bitfile: Bitfile to download
"""
# Check if connected
if self.status[Device.Board] != Status.OK:
raise LibraryError("TPM needs to be connected in order to program FPGAs")
required_cpld_version = self.download_firmware_required_cpld_version
cpld_version = self[tpm_hw_definitions.CPLD_VERSION_OFFSET]
# Rejected when older than the required version, or when bits 7..4 of the
# version word equal 0xB
if cpld_version < required_cpld_version or cpld_version & 0xF0 == 0xB0:
self._logger.error(
"CPLD firmware version is too old. Required version is "
+ hex(required_cpld_version)
)
raise LibraryError(
"CPLD firmware version is too old. Required version is "
+ hex(required_cpld_version)
)
# rearm lock
if self.board_regfile_date_code > self.CPLD_FW_VER_LOCK_I2C_CHANGE:
self._smap_lock.refresh()
# Sanity checks on provided bitfile (exists and size)
f = Path(bitfile)
if not f.exists():
raise LibraryError("Provided bitfile ({}) does not exist".format(bitfile))
elif f.stat().st_size < 1024:
raise LibraryError(
"Provided bitfile ({}) is too small ({} bytes)".format(
bitfile, f.stat().st_size
)
)
# Disable C2C stream
self[self._c2c_stream_enable] = 0x0
# Select FPGAs to program
self.smap_deselect_fpga([0, 1])
self[self._global_register] = 0x3
# Erase FPGAs SRAM
self.erase_fpga(force=True)
# Read bitstream
with open(bitfile, "rb") as fp:
data = fp.read()
self.smap_select_fpga([0, 1])
self[self._global_register] = 0x2
# FIFO address, used only by the plain UCP write path below
fifo_register = 0x50001000
# rearm lock
if self.board_regfile_date_code > self.CPLD_FW_VER_LOCK_I2C_CHANGE:
self._smap_lock.refresh()
start = time.perf_counter()
# Check if ucp_smap_write is supported, otherwise fallback to UCP writes
# if ucp_smap_write is not supported, use normal UCP access
if cpld_version < tpm_hw_definitions.CPLD_FW_VER_REQUIRED_FOR_SMAP_DOWNLOAD:
self._logger.info("FPGA programming using UCP write")
self._protocol.select_map_program(
data, fifo_addr=fifo_register, ucp_smap_write=False
)
else:
if cpld_version <= self.CPLD_FW_VER_LOCK_I2C_CHANGE:
self._logger.info("FPGA programming using fast SelectMap write")
self._protocol.select_map_program(
data, fifo_addr=None, ucp_smap_write=True
)
else:
self._logger.info(
"FPGA programming using fast SelectMap with SMAP lock"
)
self._protocol.select_map_program(
data,
fifo_addr=None,
ucp_smap_write=True,
lock_obj=self._smap_lock,
)
# Wait for operation to complete
# status_read is shared by both FPGAs and is not reset between them; the
# error is only logged, it does not abort the download
status_read = 0
for n, xil_register in enumerate(self._xil_registers):
while self[xil_register] & 0x2 != 0x2: # DONE high
time.sleep(0.01)
status_read += 1
if status_read == 100:
self._logger.error(
"FPGA"
+ str(n)
+ " DONE is not high. This means that either the FPGA is not \
programmed or the CPLD is not detecting the DONE signal correctly due to an hardware issue. \
In the first case further errors will be detected later. It is necessary to power cycle the board, \
re-try configuration and check if this error disappears. In the second case initialisation might \
complete without errors and this message can be ignored, however this still highlights that \
a minor hardware issue affects the TPM."
)
break
end = time.perf_counter()
self._logger.info("FPGA programming time: " + str(end - start) + "s")
self.smap_deselect_fpga([0, 1])
self[self._global_register] = 0x3
# Brute force check to make sure we can communicate with programmed TPM
magic_ok = False
for n in range(4):
try:
if self.tpm_c2c_lvds_supported:
logging.info(
"C2C PLL register not found, skipping calibrate_fpga_to_cpld"
)
self.set_c2c_calibration(False)
else:
self.calibrate_fpga_to_cpld()
magic0 = self[tpm_hw_definitions.FPGA_MAGIC_FPGA0_OFFSET]
magic1 = self[tpm_hw_definitions.FPGA_MAGIC_FPGA1_OFFSET]
if magic0 == magic1 == tpm_hw_definitions.FPGA_MAGIC:
magic_ok = True
break
else:
self._logger.info(
"FPGA magic numbers are not correct %s, %s"
% (hex(magic0), hex(magic1))
)
# NOTE(review): bare except silently discards every failure of an attempt,
# including KeyboardInterrupt; consider narrowing and logging.
except:
pass
# !TODO: fix CPLD reset to work with TPM 1.6
# self._logger.info("Not possible to communicate with the FPGAs. Resetting CPLD...")
# self.write_address(0x30000008, 0x8000, retry=False) # Global Reset CPLD
# time.sleep(0.2)
# self.write_address(0x30000008, 0x8000, retry=False) # Global Reset CPLD
time.sleep(0.2)
# Unlock SMAP interface
end_lock = time.time()
self._logger.info(
"Unlocking SMAP interface: elapsed time %s", str(end_lock - start_lock)
)
# Release with the same scheme that was used to take the lock
if self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE:
self["board.lock.mlock1"] = 0xFFFFFFFF
else:
self._smap_lock.unlock()
if magic_ok:
# LED 2 is the "FPGAs programmed" LED
self.set_qsfp_led(2, 1)
return
else:
if self.tpm_c2c_lvds_supported:
# Detected BIOS >= 1.0.0 requires FPGA firmware with C2C-LVDS functionality, version >= 11.0.0.
raise LibraryError(
f"Not possible to communicate with the FPGAs. Current TPM BIOS {self.tpm_bios_version_number} requires C2C-LVDS functionality. Check if the correct FPGA bitfile has been loaded (required >= 11.0.0)."
)
else:
# Detected BIOS < 1.0.0 requires FPGA firmware without C2C-LVDS functionality
raise LibraryError(
f"Not possible to communicate with the FPGAs. Current TPM BIOS {self.tpm_bios_version_number} requires FPGA firmware without C2C-LVDS functionality. Check if the correct FPGA bitfile has been loaded (required <= 10.0.0)."
)
[docs]
def calibrate_fpga_to_cpld(self):
"""
Calibrate communication between FPGAs and CPLD.

Enables the calibration pattern, disables C2C streaming and then, for
"fpga1" and "fpga2" in turn, steps through up to 128 phase positions while
sampling one error bit of address 0x30000040. The window in which the error
bit reads 0 is located from its transitions and k pulses are then written
to 0x30000028, where k is half the window width plus one. A fatal message
is logged when the calibration cannot be completed.
"""
self.set_c2c_calibration(True)
# Disable c2c streaming
self[self._c2c_stream_enable] = 0x0
# Wait until the register reads back as disabled
while self[self._c2c_stream_enable] != 0x0:
time.sleep(0.01)
# PLL in CPLD calibrated in 64 steps (done twice)
devices = ["fpga1", "fpga2"]
for f in devices:
# m selects the error bit (bit 4 for fpga1, bit 5 for fpga2) and phasesel
# is shifted into bit 8 of the value written to 0x30000028
if f == "fpga1":
m = 0
phasesel = 0
else:
m = 1
phasesel = 1
# lo is the step at which the error bit first dropped to 0 (-1 = not seen yet)
lo = -1
this_error = -1
mask = 0x1 << (4 + m)
for n in range(128):
self._smap_lock.refresh()
time.sleep(0.01)
previous_error = this_error
# mask has a single bit set, so this_error is always 0 or 1 (the
# comparisons with 2 and 3 below can never match)
this_error = (self[0x30000040] & mask) >> (4 + m)
# Error -> no error transition: start of the good window
if (
this_error == 0
and (
previous_error == 1
or previous_error == 2
or previous_error == 3
)
and lo == -1
):
lo = n
# No error -> error transition after a window start: end of the good window
if (
(this_error == 1 or this_error == 2 or this_error == 3)
and previous_error == 0
and lo != -1
):
# Number of pulses: half the window width plus one
# (presumably steps the phase back to the window centre - confirm)
k = (((n - 1) - lo) // 2) + 1
for x in range(k):
self[0x30000028] = 0x010 + (phasesel << 8)
self[0x30000028] = 0x011 + (phasesel << 8)
self[0x30000028] = 0x010 + (phasesel << 8)
time.sleep(0.02)
self._logger.info("%s to CPLD calibrated", f.upper())
self._logger.info(
"%s to CPLD. Start phase: %i, Stop phase %i "
% (f.upper(), lo, n)
)
# Disable calibration pattern transmission
if m == 0:
break
else:
self.set_c2c_calibration(False)
return
# Advancing PLL phase
self[0x30000028] = 0x000 + (phasesel << 8)
self[0x30000028] = 0x001 + (phasesel << 8)
self[0x30000028] = 0x000 + (phasesel << 8)
time.sleep(0.01)
self._logger.fatal("Could not calibrate FPGA to CPLD streaming")
[docs]
def set_c2c_calibration(self, enable=True):
    """
    Switch the C2C calibration pattern on or off.

    Writes 1 to address 0x3000002C and then writes the control word to offset
    0x90 of base addresses 0x0 and 0x10000000 (presumably one per FPGA -
    confirm). The two control writes are best effort: a failure is logged at
    debug level and does not stop the other write.

    :param enable: True writes 0x5A01, False writes 0x1
    """
    control_word = 0x5A01 if enable else 0x1

    self[0x3000002C] = 1

    for base_address in (0x0, 0x10000000):
        try:
            self.write_address(base_address + 0x90, control_word, retry=False)
        except Exception as err:
            # Best effort: the write is allowed to fail, but leave a trace and
            # never swallow KeyboardInterrupt / SystemExit
            self._logger.debug(
                "set_c2c_calibration: write to %s failed: %r",
                hex(base_address + 0x90),
                err,
            )
[docs]
def load_firmware(
    self, device, register_string=None, load_values=False, base_address=0
):
    """
    Override superclass load_firmware to extract memory map from the bitfile.

    The register map and the base address are read back from the board, so
    the ``register_string`` and ``base_address`` arguments are accepted for
    interface compatibility but are overwritten before they are used.

    :param device: Device on board to load firmware to. Every device other
        than FPGA_1 uses the FPGA 2 registers
    :param register_string: Ignored, replaced by the XML read from the board
    :param load_values: Load register values
    :param base_address: Ignored, replaced by the FPGA base address register
    :raises LibraryError: If the device is not valid, the board is not
        connected, or the CPLD register information is not loaded yet
    """
    # Check if device is valid
    if device not in [Device.Board, Device.FPGA_1, Device.FPGA_2]:
        raise LibraryError("TPM devices can only be Board, FPGA_1 and FPGA_2")

    # Check if connected
    if not self._connected:
        raise LibraryError("Not connected to board, cannot load firmware")

    # Get FPGA base address and check if firmware is loaded in FPGA
    base_address = (
        self["board.info.fpga1_base_add"]
        if device == Device.FPGA_1
        else self["board.info.fpga2_base_add"]
    )
    try:
        loaded = (
            self["board.regfile.fpga1_programmed_fw"]
            if device == Device.FPGA_1
            else self["board.regfile.fpga2_programmed_fw"]
        )
    except Exception:
        # The "programmed_fw" register could not be read: assume firmware 1
        loaded = 1

    register = "board.info.fw%d_xml_offset" % loaded
    if not self.memory_map.has_register(register):
        raise LibraryError(
            "CPLD register information must be loaded prior to loading firmware"
        )

    # Get design from extended info BRAM
    design = "tpm_test"  # default design
    try:
        # determine first and last address of extended info BRAM
        # and read contents
        offset = self["board.info.fw1_extended_info_offset"]
        size = self[int(offset)]
        data = self.read_address(offset + 4, n=int(ceil(size / 4.0)))
        # convert contents back to ascii and uncompress
        data = "".join([format(n, "08x") for n in data])
        data = zlib.decompress(binascii.unhexlify(data[: 2 * size]))
        # parse extended info for "\n\nDESIGN: <design>\n\n"
        for item in data.decode().split("\n\n"):
            key = item.split(": ")[0].strip().lower()
            if key == "design":
                design = item.split(": ")[1]
                break
    except Exception:
        # Best effort: without readable extended info the default design
        # name is kept
        pass

    # Get XML file offset and read XML file from board
    register_string = self._get_xml_file(self[register])
    register_string = register_string.replace(b'<?xml version="1.0" ?>', b"")
    # Rename the design node to the id of the FPGA being loaded
    register_string = register_string.replace(
        f'id="{design}"'.encode(),
        b'id="fpga1"' if device == Device.FPGA_1 else b'id="fpga2"',
    )
    # NOTE(review): the replace() calls above use bytes patterns; if
    # _get_xml_file returns bytes, this format() embeds its repr - confirm
    # the return type.
    register_string = "<node>\n{}\n</node>".format(register_string)

    # Call superclass with this file
    super(TPM, self).load_firmware(
        device=device,
        register_string=register_string,
        base_address=base_address,
        load_values=load_values,
    )
[docs]
def is_programmed(self, fpga_id=None):
    """
    Report whether the FPGAs are programmed.

    Refreshes the programmed flag of both FPGAs from the FPGA enable register
    and the DONE bits (bit 0 for FPGA 1, bit 1 for FPGA 2).

    :param fpga_id: None to ask about the board as a whole, 0 or 1 to ask
        about a single FPGA
    :return: None when not connected or when fpga_id is neither None, 0 nor
        1; with fpga_id None, False only if neither FPGA is programmed;
        otherwise the state of the selected FPGA
    """
    if not self.is_connected():
        self._logger.debug("tpm.is_programmed")
        return None

    self._logger.debug(
        "FPGAs is_programmed? Enable %d, Done[0-1] %d"
        % (self["board.regfile.enable.fpga"], self["board.regfile.xilinx.done"])
    )

    # Registers are read again for each FPGA; DONE is only consulted when the
    # FPGA enable register is non-zero
    for device, done_bit in ((Device.FPGA_1, 0x1), (Device.FPGA_2, 0x2)):
        self._programmed[device] = self["board.regfile.enable.fpga"] and bool(
            self["board.regfile.xilinx.done"] & done_bit
        )

    if fpga_id is None:
        if self._programmed[Device.FPGA_1] or self._programmed[Device.FPGA_2]:
            self._logger.debug("Return True")
            return True
        self._logger.debug("Return False")
        return False

    for index, device in ((0, Device.FPGA_1), (1, Device.FPGA_2)):
        if fpga_id == index:
            self._logger.debug("Return %s" % self._programmed[device])
            return bool(self._programmed[device])
    return None
[docs]
def set_lmc_ip(
    self, ip="10.0.10.1", port=None, port2=None, port3=None, same_port=True
):
    """
    Set the IP address for LMC data transfer.

    Both streams get the same destination IP and source port 10001.

    :param ip: IP address in string form
    :param port: Destination port for stream 0. When it is None or not an
        int, the port currently held in the register (low 16 bits) is kept
    :param port2: Destination port for stream 1. Used only when it is an int
    :param port3: Port for stream 2 (currently unused: stream 2 is not
        configured)
    :param same_port: When port2 is not usable, reuse the stream 0 port for
        stream 1; if False the stream 1 destination port is left untouched
    """
    source_port = 10001
    destination_ip = struct.unpack("!I", socket.inet_aton(ip))[0]

    # Stream 0: destination IP, then source and destination ports
    self["board.regfile.stream_dst_ip0"] = destination_ip
    if port is None or type(port) is not int:
        port = self["board.regfile.stream_dst_port0"] & 0xFFFF
    self["board.regfile.stream_src_port0"] = source_port
    self["board.regfile.stream_dst_port0"] = port

    # Stream 1: destination IP, then source and destination ports
    self["board.regfile.stream_dst_ip1"] = destination_ip
    self["board.regfile.stream_src_port1"] = source_port
    # Validate port2 itself: the check used to look at port, which is always
    # an int at this point, so any non-None port2 was written as is
    if type(port2) is int:
        self["board.regfile.stream_dst_port1"] = port2
    elif same_port:
        self["board.regfile.stream_dst_port1"] = port
[docs]
def temperature(self):
    """
    Get board temperature.

    :return: Value reported by the monitor plugin's get_temperature
    """
    monitor = self.tpm_monitor
    return monitor.get_temperature()
[docs]
def voltage(self):
    """
    Get board voltage.

    :return: Value reported by the monitor plugin's get_voltage_5v0
    """
    monitor = self.tpm_monitor
    return monitor.get_voltage_5v0()
[docs]
def set_shutdown_temperature(self, temp):
    """
    Not supported on this board: only logs where the replacement lives.

    :param temp: Ignored
    """
    message = "Shutdown temperature not supported use set_board_alm_temp_thresholds in tpm_monitor"
    self._logger.info(message)
[docs]
def get_firmware_list(self, device=Device.Board):
    """
    Get list of downloaded firmware on TPM FLASH.

    Every firmware information plugin is refreshed; plugins that report no
    design are left out of the result.

    :param device: This should always be the board for the TPM
    :return: List of dictionaries with the "design", "major" and "minor"
        of each valid firmware
    """
    found = []
    for info in self.tpm_firmware_information:
        # Refresh before reading anything from the plugin
        info.update_information()

        # No design means nothing valid is stored for this plugin
        if info.get_design() is None:
            continue

        entry = {
            "design": info.get_design(),
            "major": info.get_major_version(),
            "minor": info.get_minor_version(),
        }
        found.append(entry)
    return found
[docs]
def get_part_number(self):
    """
    Get the board part number.

    :return: The "PN" field read through the EEPROM plugin
    """
    part_number = self.tpm_eep.get_field("PN")
    return part_number
[docs]
def get_serial_number(self):
    """
    Get the board serial number.

    :return: The "SN" field read through the EEPROM plugin
    """
    serial_number = self.tpm_eep.get_field("SN")
    return serial_number
[docs]
def get_hardware_revision(self):
    """
    Get the hardware revision as a single integer.

    The "HARDWARE_REV" EEPROM field is folded most significant element first,
    256 per position. An error is logged (the value is still returned) when
    the result is 0xFFFFFF or 0.

    :return: Hardware revision number
    """
    revision = 0
    for element in self.tpm_eep.get_field("HARDWARE_REV"):
        revision = revision * 256 + element

    if revision in (0xFFFFFF, 0x0):
        self._logger.error(
            "Could not read HARDWARE_REV from EEPROM, returned: " + hex(revision)
        )
    return revision
[docs]
def get_bios(self):
    """
    Get the BIOS version of the board.

    The identity string "CPLD_<cpld version>-MCU_<mcu version>" is looked up
    in the BIOS revision list. As a side effect the short version is stored
    in ``self.tpm_bios_version_number`` ("v?.?.?" when the identity is not on
    the list, which is also logged as critical).

    :return: "v<version> (<identity>)", or "v?.?.? (<identity>)" when unknown
    """
    bios_string = (
        f"CPLD_{self.tpm_cpld.get_version()}-MCU_{self.tpm_monitor.get_version()}"
    )
    for version, version_bios_string in self.BIOS_REV_list:
        if version_bios_string == bios_string:
            self.tpm_bios_version_number = f"v{version}"
            return f"v{version} ({bios_string})"

    self._logger.critical(
        f"BIOS version {bios_string} is not on the BIOS revision List"
    )
    # Same placeholder as in the returned string (it used to carry a stray
    # dot: "v.?.?.?")
    unknown_version = "v?.?.?"
    self.tpm_bios_version_number = unknown_version
    return f"{unknown_version} ({bios_string})"
[docs]
def get_mac(self):
    """
    Get the board MAC address as text.

    :return: The elements of the "MAC" EEPROM field as two-digit lowercase
        hex values joined by ":"; an empty string when the field is empty
    """
    mac = self.tpm_eep.get_field("MAC")
    # join copes with an empty field, where indexing the last element would
    # raise IndexError
    return ":".join("{0:02x}".format(octet) for octet in mac)
def __get_ext_label__(self):
    """
    Build the external label of the board from its EEPROM fields.

    The layout depends on the "eep_rev" field: with the full external label
    revision the four label fields are concatenated as they are, with the old
    revision the non-empty part number and serial number fields are joined
    with "-".

    :return: External label string
    :raises BoardError: If "eep_rev" is neither of the two known revisions
    """
    eep_rev = self.tpm_eep.get_field("eep_rev")
    _label = [
        self.tpm_eep.get_field("EXT_LABEL_PN"),
        self.tpm_eep.get_field("EXT_LABEL_SN"),
    ]

    if eep_rev == tpm_hw_definitions.EEPROM_VER_FULL_EXT_LABEL:
        # NEW BEHAVIOR with whole EXT_LABEL semicolon separated value
        _label.append(self.tpm_eep.get_field("EXT_LABEL_2"))
        try:
            _label.append(self.tpm_qsfp_adapter[0].get_field("TPM_EXT_LABEL_3"))
        except Exception:
            # The last part of the label is stored on QSFP adapter 0; the
            # label is returned without it when it cannot be read
            self._logger.warning("QSFP-1 missing, cannot get TPM_EXT_LABEL_3 field")
        return "".join(_label)

    if eep_rev == tpm_hw_definitions.EEPROM_VER_OLD:
        # OLD BEHAVIOR with EXT_LABEL_PN-EXT_LABEL_SN: drop empty fields and
        # join the rest with "-"
        _label = list(filter(None, _label))
        return "-".join(_label)

    raise BoardError(f"Unknown eep_rev {hex(eep_rev)}")
def __set_ext_label__(self, _label):
    """
    Store the external label of the board.

    The "eep_rev" field is first moved to the full external label revision if
    needed. The label is then cut into consecutive pieces, each as long as
    the "size" of its destination field, and written in this order:
    EXT_LABEL_PN, EXT_LABEL_SN and EXT_LABEL_2 on the board EEPROM, then
    TPM_EXT_LABEL_3 on QSFP adapter 0.

    :param _label: Whole external label
    """
    eep_rev = self.tpm_eep.get_field("eep_rev")
    if eep_rev != tpm_hw_definitions.EEPROM_VER_FULL_EXT_LABEL:
        self.tpm_eep.set_field(
            "eep_rev",
            tpm_hw_definitions.EEPROM_VER_FULL_EXT_LABEL,
            override_protected=True,
        )

    # Destination of each piece, in label order
    fields = {
        "EXT_LABEL_PN": self.tpm_eep,
        "EXT_LABEL_SN": self.tpm_eep,
        "EXT_LABEL_2": self.tpm_eep,
        "TPM_EXT_LABEL_3": self.tpm_qsfp_adapter[0],
    }
    for key, obj in fields.items():
        size = obj.eep_sec[key]["size"]
        _str = _label[0:size]
        _label = _label[size:]
        if key == "TPM_EXT_LABEL_3":
            try:
                obj.set_field(key, _str, override_protected=True)
            except Exception:
                # Best effort: this piece lives on the QSFP adapter
                self._logger.warning(
                    "QSFP-1 missing, cannot set TPM_EXT_LABEL_3 field"
                )
        else:
            obj.set_field(key, _str, override_protected=True)
[docs]
def get_board_info(self):
    """
    Collect the board information.

    Returns ``self.board_info`` when it is already set. Otherwise the network
    settings are read from the CPLD registers (whose names depend on the CPLD
    date code) and the remaining fields from the EEPROM. This method does not
    store the result in ``self.board_info`` itself.

    :return: Dictionary with the keys "ip_address", "netmask", "gateway",
        "ip_address_eep", "netmask_eep", "gateway_eep", "MAC", "SN", "PN",
        "bios", "EXT_LABEL", "BOARD_MODE", "HARDWARE_REV" and "DDR_SIZE_GB"
    """
    if self.board_info is not None:
        return self.board_info

    # Only the names of the network registers differ between CPLD versions
    if self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE:
        ip_register = "board.regfile.eth_ip"
        netmask_register = "board.regfile.eth_mask"
        gateway_register = "board.regfile.eth_gway"
    else:
        ip_register = "board.i2c.ip"
        netmask_register = "board.i2c.netmask"
        gateway_register = "board.i2c.gateway"

    tpm_info = {
        "ip_address": long2ip(self[ip_register]),
        "netmask": long2ip(self[netmask_register]),
        "gateway": long2ip(self[gateway_register]),
        "ip_address_eep": self.tpm_eep.get_field("ip_address"),
        "netmask_eep": self.tpm_eep.get_field("netmask"),
        "gateway_eep": self.tpm_eep.get_field("gateway"),
        "MAC": self.get_mac(),
        "SN": self.get_serial_number(),
        "PN": self.get_part_number(),
        "bios": self.get_bios(),
        "EXT_LABEL": self.__get_ext_label__(),
    }

    # Read the field once instead of once per comparison
    board_mode = self.tpm_eep.get_field("BOARD_MODE")
    if board_mode == self.BOARD_MODE["ada"]:
        tpm_info["BOARD_MODE"] = "ADA"
    elif board_mode == self.BOARD_MODE["no-ada"]:
        tpm_info["BOARD_MODE"] = "NO-ADA"
    else:
        tpm_info["BOARD_MODE"] = "UNKNOWN"

    # A PCB revision of 0xFF adds nothing to the hardware revision string
    pcb_rev = self.tpm_eep.get_field("PCB_REV")
    pcb_rev_string = "" if pcb_rev == 0xFF else str(pcb_rev)
    hw_rev = self.tpm_eep.get_field("HARDWARE_REV")
    tpm_info["HARDWARE_REV"] = (
        f"v{hw_rev[0]}.{hw_rev[1]}.{hw_rev[2]}{pcb_rev_string}"
    )

    # A DDR size of 0xFF is reported as "4"
    ddr_size = self.tpm_eep.get_field("DDR_SIZE_GB")
    tpm_info["DDR_SIZE_GB"] = "4" if ddr_size == 0xFF else str(ddr_size)

    return tpm_info
[docs]
def get_global_status_alarms(self):
    """
    Read and log the global status alarms.

    A level of 2 is logged as an error and a level of 1 as a warning. For a
    temperature or voltage error/warning the last error/warning of the
    monitor is logged too. For an MCU watchdog error the MCU execution phase
    is logged twice, 2 seconds apart.

    :return: Dictionary with the level of "I2C_access_alm", "temperature_alm",
        "voltage_alm", "SEM_wd" and "MCU_wd"
    """
    alarms = {
        # Bits 19..16 of the whole status word
        "I2C_access_alm": (self["board.regfile.global_status"] & 0xF0000) >> 16,
        "temperature_alm": self["board.regfile.global_status.temperature"],
        "voltage_alm": self["board.regfile.global_status.voltage"],
        "SEM_wd": self["board.regfile.global_status.SEM"],
        "MCU_wd": self["board.regfile.global_status.MCU"],
    }
    active_alarms = [name for name, level in alarms.items() if level == 2]
    active_warnings = [name for name, level in alarms.items() if level == 1]
    environment_levels = (alarms["temperature_alm"], alarms["voltage_alm"])

    if active_alarms:
        self._logger.error(f"Global status alarm: {active_alarms}")
        if "MCU_wd" in active_alarms:
            self._logger.error(
                f"Global status alarm: mcu_status - {self.tpm_monitor.get_mcu_last_exec_phase_id()}"
            )
            self._logger.error(
                "Global status alarm: wait 2s to verify if mcu is stuck"
            )
            # Second reading after the pause, to compare with the first one
            time.sleep(2)
            self._logger.error(
                f"Global status alarm: mcu_status - {self.tpm_monitor.get_mcu_last_exec_phase_id()}"
            )
        if 2 in environment_levels:
            self._logger.error(
                f"Global status alarm: err_status - {self.tpm_monitor.get_last_error()}"
            )

    if active_warnings:
        self._logger.warning(f"Global status alarm: {active_warnings}")
        if 1 in environment_levels:
            self._logger.warning(
                f"Global status alarm: warn_status - {self.tpm_monitor.get_last_warning()}"
            )

    return alarms
[docs]
def clear_global_status_alarms(self):
    """
    Clear global status alarm flags.

    Writes 1 to the voltage and temperature acknowledge registers and then
    polls each of them for the value 0 through _poll_reg.
    """
    voltage_ack = "board.regfile.global_status_ack.voltage"
    temperature_ack = "board.regfile.global_status_ack.temperature"

    # Both acknowledges are written before either is polled
    for ack_register in (voltage_ack, temperature_ack):
        self[ack_register] = 0x1

    pending = (
        (voltage_ack, "NOT voltage ACK Detected on clear_global_status_alarms"),
        (
            temperature_ack,
            "NOT temperature ACK Detected on clear_global_status_alarms ",
        ),
    )
    for ack_register, failure_message in pending:
        self._poll_reg(ack_register, 0, failure_message)
[docs]
def check_ad9550_pll_status(self):
    """
    Status of TPM AD9550 PLLs chip for 40GbE if.

    The lock status is True only when both of the two lowest bits of the
    lock register are set. The loss of lock counter is None when its register
    does not exist.

    :return: current lock status and lock loss counter value, or None when
        the lock register does not exist
    :rtype: tuple
    """
    lock_register = "board.regfile.eth10ge.lock"
    counter_register = "board.regfile.eth10ge.lol"

    if not self.has_register(lock_register):
        return None

    both_locked = (self[lock_register] & 0x3) == 0x3
    if self.has_register(counter_register):
        lock_losses = self[counter_register]
    else:
        lock_losses = None
    return both_locked, lock_losses
[docs]
def clear_ad9550_pll_status(self):
    """
    Resets the value in the AD9550 PLLs lock loss counter to 0.

    Nothing is written when the counter register does not exist.
    """
    counter_register = "board.regfile.eth10ge.lol"
    if not self.has_register(counter_register):
        return
    self[counter_register] = 0
[docs]
def get_adc_temperature(self, adc_id=None):
    """
    Get ADC temperature diode via FPGA sysmon.

    For every requested ADC the temperature diode output is enabled, the
    matching sysmon line is sampled, and the diode output is switched back
    to High-Z. The diode output is released even when the sysmon read
    raises, so a failed read does not leave it enabled.

    :param adc_id: range 0-15 or None
    :return: List of ADC temperatures if adc_id=None, selected ADC temperature
        otherwise. NaN is reported for hardware revisions older than TPM 2.0.
    """
    adc_indices = list(range(16)) if adc_id is None else [adc_id]

    # Only TPM 2.0 and later support the ADC temperature read. The revision
    # does not change between ADCs, so it is checked once.
    if self.hw_rev < self.tpm_hw_rev_2_0:
        result = [float("nan")] * len(adc_indices)
    else:
        result = []
        for adc_idx in adc_indices:
            # ADCs 0-7 are read on VM_ADC0, the remaining ones on VM_ADC1
            sysmon_line = "VM_ADC0" if adc_idx < 8 else "VM_ADC1"
            adc = self.tpm_adc[adc_idx]
            adc.adc_enable_temp_meas(diode_type=1)  # select 1xdiode
            try:
                # NOTE(review): presumably a settling delay before sampling
                sleep(0.01)
                v_temp = self.tpm_sysmon[0].get_measure_val(sysmon_line)
            finally:
                adc.adc_enable_temp_meas(diode_type=0)  # set pin to High-Z
            # Voltage to temperature, get from fig. 87 in ad9695 datasheet.
            # NOTE: a second mode (diode_type=2, delta between the x20 and x1
            # readings, temp = vdelta * 3999.99 - 284.99) was sketched in an
            # earlier revision but is not implemented.
            temp_evaluated = (v_temp * -542.85) + 377.14
            result.append(round(temp_evaluated, 2))

    if len(result) == 1:
        return result[0]
    return result
# ----------------------------------------- Private functions -----------------------------------------
def _initialise_board(self):
    """Initialise the TPM board.

    Loads the SPI device description when the board exposes its XML
    offset, then loads the firmware description of every FPGA that is
    reported as programmed.

    :return: always True
    """
    spi_offset_register = "board.info.spi_xml_offset"
    # The SPI XML is optional: load it only when its offset register exists
    if self.memory_map.has_register(spi_offset_register):
        spi_xml_offset = self.read_register(spi_offset_register)
        self.load_spi_devices(self._get_xml_file(spi_xml_offset))

    # CPLD and SPI XML files are loaded at this point; pick up the firmware
    # XML of each FPGA that has already been programmed
    for fpga_id in (0, 1):
        if not self.is_programmed(fpga_id=fpga_id):
            continue
        target = Device.FPGA_1 if fpga_id == 0 else Device.FPGA_2
        self.load_firmware(device=target)
    return True
def _initialise_devices(
    self,
    fsample=800e6,
    ddc=False,
    fddc=0.0,
    adc_clock_divider=1,
    mono_channel_14_bit=False,
    mono_channel_sel=0,
    adc_low_bitrate=False,
    disabled_adc=None,
    fullscale_voltage=1.59,
):
    """Initialise the SPI and other devices on the board

    Starts the AD9528 PLL (up to 3 attempts) and then, when ADCs are
    enabled on this object, starts every ADC that is not listed in
    ``disabled_adc``.

    :param fsample: sampling frequency in Hz, passed to the PLL and to the
        DDC ADC start
    :param ddc: start the ADCs in DDC mode when True, in plain mode otherwise
    :param fddc: DDC NCO frequency in Hz (DDC mode only)
    :param adc_clock_divider: accepted for interface compatibility, not used
        by this method
    :param mono_channel_14_bit: forwarded to the plain ADC start
    :param mono_channel_sel: forwarded to the plain ADC start
    :param adc_low_bitrate: forwarded to the DDC ADC start as ``low_bitrate``
    :param disabled_adc: indexes of ADCs to leave untouched, None for no
        exclusion
    :param fullscale_voltage: forwarded to the ADC start
    :raises PluginError: when the PLL fails to start on the last attempt
    """
    # self.set_shutdown_temperature(65)
    # Switch voltages ON
    # self['board.regfile.ctrl.en_ddr_vdd'] = 1

    # None replaces the former shared mutable default list
    excluded_adc = () if disabled_adc is None else disabled_adc
    enabled_adc = [
        adc_idx for adc_idx in range(len(self.tpm_adc)) if adc_idx not in excluded_adc
    ]

    # TODO: this works on TPM 1.2 but not on TPM 1.6
    # self['board.regfile.ctrl.ad_pdwn'] = 0x0
    # for n in available_adc:
    #     self.tpm_adc[n].adc_reset()
    #     self.tpm_adc[n].adc_power_down_enable()
    # for n in disabled_adc:
    #     self._logger.info("Disabling ADC " + str(n))
    # self['board.regfile.ctrl.ad_pdwn'] = 0x1

    # Initialise PLL (3 retries)
    pll_attempts = 3
    self._logger.info("Configuring AD9528 PLL at " + str(fsample) + " Hz")
    for attempt in range(1, pll_attempts + 1):
        try:
            self.tpm_pll.pll_start(fsample)
            break
        except PluginError as err:
            if attempt == pll_attempts:
                # Out of attempts: propagate the failure with its traceback
                raise
            # Leave a trace of the failed attempt instead of hiding it
            self._logger.warning(
                "AD9528 PLL start failed (attempt %d of %d): %s",
                attempt,
                pll_attempts,
                err,
            )

    # Initialise ADAs if required
    # self.tpm_ada.disable_adas()
    # if self._enable_ada:
    #     self._logger.info("Enabling ADAs")
    #     self.tpm_ada.initialise_adas()
    #     self.adas_enabled = True
    # else:
    #     self._logger.info("Not enabling ADAs")

    # Initialise ADCs
    if not self._enable_adc:
        return

    if ddc:
        self._logger.info("Enabling DDC, NCO frequency: %f Hz", fddc)
        for adc_idx in enabled_adc:
            self.tpm_adc[adc_idx].adc_single_start_dual_14_ddc(
                sampling_frequency=fsample,
                ddc_frequency=fddc,
                decimation_factor=20,
                low_bitrate=adc_low_bitrate,
                fullscale_voltage=fullscale_voltage,
            )
        return

    if mono_channel_14_bit:
        self._logger.info("Enabling single ADC channel with 14 bit sampling")
    for adc_idx in enabled_adc:
        self.tpm_adc[adc_idx].adc_single_start(
            mono_channel_14_bit=mono_channel_14_bit,
            mono_channel_sel=mono_channel_sel,
            fullscale_voltage=fullscale_voltage,
        )
def _get_xml_file(self, xml_offset):
    """Fetch a zlib-compressed XML file from board memory and inflate it

    :param xml_offset: Memory offset where XML file is stored
    :return: Decompressed XML file, as returned by ``zlib.decompress``
    """
    # The value at the offset is the length in bytes of the compressed file
    compressed_len = self.read_address(xml_offset)
    # The compressed file follows, read as whole 4-byte values (rounded up)
    value_count = int(ceil(compressed_len / 4.0))
    values = self.read_address(xml_offset + 4, value_count)
    # Render each value as 8 hex digits, then cut the padding past the length
    hex_digits = "".join(format(value, "08x") for value in values)
    compressed = binascii.unhexlify(hex_digits[: 2 * compressed_len])
    return zlib.decompress(compressed)
def _poll_reg(self, reg, exit_condition, exception_val, max_retry=1000):
    """Poll a register until it reads the expected value

    The register is read through ``self[reg]``; every read that does not
    match is followed by a 1 ms pause.

    :param reg: register key to poll
    :param exit_condition: value that ends the polling
    :param exception_val: message logged and raised on timeout
    :param max_retry: number of non-matching reads after which polling
        gives up (the register is always read at least once)
    :raises LibraryError: when the value is not seen within ``max_retry``
        reads
    """
    failed_reads = 0
    while self[reg] != exit_condition:
        time.sleep(0.001)
        failed_reads += 1
        # if self.ticket_num is not None:
        #     self.i2c_refresh()
        if failed_reads >= max_retry:
            # The %s placeholder is required: without it the logging module
            # fails to format the record and the message is lost
            logging.error("EEP Polling Reg timeout: %s", exception_val)
            raise LibraryError(exception_val)
# -------------------------- Methods for syntax candy --------------------------
def __getitem__(self, key):
    """Override __getitem__, return value from board

    :param key: memory address (int), register name (str) or
        ``(device, address)`` tuple for an SPI device
    :return: value read from the board, or None when ``_checks()`` fails
    :raises LibraryError: for a tuple that does not have two items, an
        unknown register name or an unsupported key type
    """
    # An integer key is a raw memory address
    if isinstance(key, int):
        return self.read_address(key)

    # A tuple key addresses a device
    if isinstance(key, tuple):
        # Run checks
        if not self._checks():
            return None
        if len(key) != 2:
            raise LibraryError(
                "A device name and address need to be specified for reading from SPI devices"
            )
        device, address = key
        return self.read_device(device, address)

    # A string key is a register name. ``basestring`` does not exist in
    # Python 3, so ``str`` (subclasses included) is tested directly.
    if isinstance(key, str):
        # Run checks
        if not self._checks():
            return None
        if self.memory_map.has_register(key):
            reg = self.memory_map[key]
            return self.read_register(key, reg.size)
        # Register not found
        raise LibraryError("Register %s not found" % key)

    raise LibraryError(
        "Unrecognised key type, must be register name, memory address or SPI device tuple"
    )
def __setitem__(self, key, value):
    """Override __setitem__, set value on board

    :param key: memory address (int), register name (str) or
        ``(device, address)`` tuple for an SPI device
    :param value: value to write
    :return: result of the underlying write call, or None when
        ``_checks()`` fails
    :raises LibraryError: for a tuple that does not have two items, an
        unknown register name or an unsupported key type
    """
    # An integer key is a raw memory address
    if isinstance(key, int):
        return self.write_address(key, value)

    # A tuple key addresses a device
    if isinstance(key, tuple):
        # Run checks
        if not self._checks():
            return None
        if len(key) != 2:
            raise LibraryError(
                "A device name and address need to be specified for writing to SPI devices"
            )
        device, address = key
        return self.write_device(device, address, value)

    # A string key is a register name. ``basestring`` does not exist in
    # Python 3, so ``str`` (subclasses included) is tested directly.
    if isinstance(key, str):
        # Run checks
        if not self._checks():
            return None
        if self.memory_map.has_register(key):
            return self.write_register(key, value)
        # Register not found
        raise LibraryError("Register %s not found" % key)

    raise LibraryError(
        "Unrecognised key type (%s), must be register name or memory address"
        % key.__class__.__name__
    )
def __str__(self):
    """Override __str__ to print register information in a human readable format

    :return: table of the registers in the memory map (grouped by device,
        sorted by name) followed by the loaded SPI devices; an empty string
        when the board is not programmed or ``_checks()`` fails
    """
    if not self._programmed:
        return ""

    # Run checks. __str__ has to return a string: returning None here would
    # make str() raise a TypeError.
    if not self._checks():
        return ""

    # Split register list into devices
    registers = {}
    for info in self.memory_map.register_list.values():
        registers.setdefault(info["device"], []).append(info)

    # Collect the output pieces and join them once at the end
    pieces = [
        "Device%sRegister%sAddress%sBitmask\n" % (" " * 2, " " * 37, " " * 8),
        "------%s--------%s-------%s-------\n" % (" " * 2, " " * 37, " " * 8),
    ]

    # Loop over all devices
    for device, device_registers in registers.items():
        for reg in sorted(device_registers, key=lambda arg: arg["name"]):
            address = hex(reg["address"])
            # Pad name and address so the columns line up
            regspace = " " * (45 - len(reg["name"]))
            adspace = " " * (15 - len(address))
            pieces.append(
                "%s\t%s%s%s%s0x%08X\n"
                % (
                    DeviceNames[device],
                    reg["name"],
                    regspace,
                    address,
                    adspace,
                    reg["bitmask"],
                )
            )

    # Add SPI devices
    if len(self._spi_devices) > 0:
        pieces.append("\nSPI Devices\n")
        pieces.append("-----------\n")
        for key in sorted(self._spi_devices.keys()):
            info = self._spi_devices[key]
            # NOTE(review): the name column prints the device object itself,
            # not its key - confirm this is intended
            pieces.append(
                "Name: %s\tspi_sclk: %d\tspi_en: %d\n"
                % (info, info.spi_sclk, info.spi_en)
            )

    # Return string representation
    return "".join(pieces)
# ------------------------------------------------ Tile Class ---------------------------------------
if __name__ == "__main__":
    # Send every log record, DEBUG and up, to standard output
    from sys import stdout

    console = logging.StreamHandler(stdout)
    console.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(threadName)s - %(message)s")
    )
    root_logger = logging.getLogger("")
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(console)

    # Test TPM: connect to the board and print its information
    board_ip = "10.0.10.2"
    board = TPM(ip=board_ip)
    board.connect(ip=board_ip, port=10000)
    print(board.get_board_info())