Source code for ska_low_sps_tpm_api.boards.tpm

import binascii
import logging
import math
import socket
import struct
import time
import zlib
from math import ceil
from pathlib import Path
from time import sleep

import ska_low_sps_tpm_api.boards.tpm_hw_definitions as tpm_hw_definitions
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.boards.fpgaboard import DeviceNames, FPGABoard
from ska_low_sps_tpm_api.plugins.hw_lock import HwLock, _NoLock
from ska_low_sps_tpm_api.protocols.ucp import UCP


[docs] class TPM(FPGABoard): """FPGABoard subclass for communicating with a TPM board"""
[docs] def __init__(self, **kwargs): """Class constructor""" kwargs["fpgaBoard"] = BoardMake.TpmBoard kwargs["protocol"] = UCP # Set hardcoded CPLD xml offset address self._cpld_xml_offset = tpm_hw_definitions.CPLD_XML_OFFSET self._cpld_magic = tpm_hw_definitions.CPLD_MAGIC_TPM_V1_5 # Initialise variables self._simulator = False self._initialise = False self._enable_ada = False self.adas_enabled = False self._enable_adc = True # CPLD SelectMap Registers self._xil_registers = ["board.smap.xil_0", "board.smap.xil_1"] self._global_register = "board.smap.global" self._c2c_stream_enable = "board.regfile.ena_stream" self.download_firmware_required_cpld_version = 0 self.tpm_hw_rev_1_5 = tpm_hw_definitions.TPM_HW_REV_1_5 self.tpm_hw_rev_2_0 = tpm_hw_definitions.TPM_HW_REV_2_0 self.board_info = None self.hw_rev = None self.CPLD_FW_VER_LOCK_I2C_CHANGE = ( tpm_hw_definitions.CPLD_FW_VER_LOCK_I2C_CHANGE_TPM_V1_5 ) self.MCU_FW_VER_LOCK_I2C_CHANGE = ( tpm_hw_definitions.MCU_FW_VER_LOCK_I2C_CHANGE_TPM_V1_5 ) self.MCU_FW_VER_ERROR_NO_MCU = tpm_hw_definitions.MCU_FW_VER_ERROR_NO_MCU # Call superclass initialiser super(TPM, self).__init__(**kwargs) self.BIOS_REV_list = [ ["0.0.1", "CPLD_0x19121901-MCU_0xb0000010_0x0_0x0"], ["0.0.2", "CPLD_0x19122316-MCU_0xb0000010_0x0_0x0"], ["0.0.3", "CPLD_0x20011611-MCU_0xb0000010_0x0_0x0"], ["0.0.4", "CPLD_0x20011611-MCU_0xb0000016_0x20200123_0x113426"], ["0.0.5", "CPLD_0x20041613-MCU_0xb0000016_0x20200123_0x113426"], ["0.1.0", "CPLD_0x20111309-MCU_0xb0000100_0x20201120_0xe3f6fa2"], ["0.2.0", "CPLD_0x21033009-MCU_0xb0000107_0x20210114_0x0"], ["0.2.1", "CPLD_0x21061417-MCU_0xb0000117_0x20210615_0x0"], ["0.2.2", "CPLD_0x22032409-MCU_0xb0000118_0x20220831_0x0"], ["0.3.0", "CPLD_0x23011618-MCU_0xb000011a_0x20230126_0x0"], ["0.3.1", "CPLD_0x23011618-MCU_0xb000011a_0x20230209_0x0"], ["0.4.0", "CPLD_0x23090714-MCU_0xb000011a_0x20230209_0x0"], ["0.5.0", "CPLD_0x23092511-MCU_0xb000011a_0x20230209_0x0"], ["0.6.0", "CPLD_0x25050815-MCU_0xb000011b_0x20250509_0x29ae87a"], ["1.0.0", "CPLD_0x26031616-MCU_0xb000011c_0x20260318_0x828bd55"], ] self.BOARD_MODE = {"ada": 0, "no-ada": 0xFF} self._spi_lock = _NoLock() self._smap_lock = _NoLock() self._i2c_lock = _NoLock()
[docs] def connect( self, ip, port, fsample=800e6, ddc=False, fddc=0.0, adc_clock_divider=1, mono_channel_14_bit=False, mono_channel_sel=0, adc_low_bitrate=False, fullscale_voltage=1.59, **kwargs, ): """ Overload connect method. :param fsample: Sampling rate :param ip: IP address to connect to :param port: Port to connect to """ # Check if we are simulating or not self._simulator = kwargs.get("simulator", False) # Check if we are initialising or not self._initialise = kwargs.get("initialise", False) # TPM might or might not have an ADA self._enable_ada = kwargs.get("enable_ada", False) # You can skip ADC enabling for test/debug purpose (default True to preserve back compatibility) self._enable_adc = kwargs.get("enable_adc", True) # Call connect on super class super(TPM, self).connect(ip, port, **kwargs) # CPLD is assumed programmed self._programmed[Device.Board] = True # Try to read address 0x30000000 (CPLD version) check whether TPM is reachable try: self.read_address(tpm_hw_definitions.CPLD_VERSION_OFFSET) except Exception: self.status[Device.Board] = Status.NetworkError self._connected = False # Ensure transport resources are released on connect failure. self._protocol.close_connection() raise LibraryError("Could not reach TPM with address {}".format(ip)) _tmp = self.read_address(tpm_hw_definitions.CPLD_MAGIC_OFFSET) if _tmp != self._cpld_magic: raise LibraryError( "TPM CPLD_MAGIC mismatch. Expected 0x%x, got 0x%x." % (self._cpld_magic, _tmp) ) # Get XML file cpld_xml = str(self._get_xml_file(self.read_address(self._cpld_xml_offset))) cpld_xml = cpld_xml.replace('<?xml version="1.0" ?>', "") cpld_xml = "<node>\n{}\n</node>".format(cpld_xml) super(TPM, self).load_firmware(device=Device.Board, register_string=cpld_xml) self._logger.info("tpm.connect: CPLD XML File Downloaded") # Get register "board.regfile.date_code" # necessary to adapt driver behavior for different TPM BIOS # better to retrieve only one time at connect self.board_regfile_date_code = self["board.regfile.date_code"] # Load plugins self.load_plugins() # Check Global status alarm get_global_status_alarms = self.get_global_status_alarms() # Initialise devices if required if not self._simulator and self._connected: if self._initialise: self._initialise_devices( fsample=fsample, ddc=ddc, fddc=fddc, adc_clock_divider=adc_clock_divider, mono_channel_14_bit=mono_channel_14_bit, mono_channel_sel=mono_channel_sel, fullscale_voltage=fullscale_voltage, adc_low_bitrate=adc_low_bitrate, )
# else: # self.tpm_pll.pll_reset()
[docs] def load_plugins(self): """load tpm plugins""" # Load the CPLD plugin (does not require the board to be initialised) if self.memory_map.has_register("board.lock.lock_anaspi"): self._spi_lock = HwLock(board=self, lock_register="board.lock.lock_anaspi") if self.memory_map.has_register("board.lock.lock_smap"): self._smap_lock = HwLock(board=self, lock_register="board.lock.lock_smap") if self.memory_map.has_register("board.lock.lock_i2c"): self._i2c_lock = HwLock(board=self, lock_register="board.lock.lock_i2c") self.load_plugin("TpmProgFlash") self.load_plugin("TpmCpld") self.load_plugin("TpmEEP") self.load_plugin("TpmMcu") self.i2c_old_mode = False if ( self[tpm_hw_definitions.CPLD_MCU_VERSION_OFFSET] != self.MCU_FW_VER_ERROR_NO_MCU ): if ( self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE or self[tpm_hw_definitions.CPLD_MCU_VERSION_OFFSET] <= self.MCU_FW_VER_LOCK_I2C_CHANGE ): self.i2c_old_mode = True else: self._logger.warning( "Board is running without MCU, reduced functionality available!" ) self.i2c_old_mode = True # Load QSFP_ADAPTER plugins self.load_plugin("TpmQSFPAdapter", core_id=0) self.load_plugin("TpmQSFPAdapter", core_id=1) # Read Hardware information from EEPROM if self.board_info is None: self.board_info = self.get_board_info() self.hw_rev = self.get_hardware_revision() self._logger.info("Board SN: %s" % self.board_info["SN"]) self._logger.info(f"Board EXT_LABEL: {self.board_info['EXT_LABEL']}") self._logger.info("Board HARDWARE_REV: %s" % self.board_info["HARDWARE_REV"]) self._logger.info("Board bios: %s" % self.board_info["bios"]) # Check if TPM supports C2C-LVDS FPGA firmware self.tpm_c2c_lvds_supported = not self.has_register("board.regfile.c2c_pll") if self["board.regfile.key_ok"] == 0: self._logger.warning( "Board is running without License, functionality will be corrupted after 1 hour from power-up" ) if self["board.regfile.timebomb_status"] == 0: self._logger.error( "License timebomb already expired! Please power off and power on the board to recover functionality" ) raise LibraryError( "License timebomb already expired! Please power off and power on the board to recover functionality" ) if not self._simulator and self._connected: # Load PREADU plugins self.load_plugin("TpmPreAdu", preadu_id=0) self.load_plugin("TpmPreAdu", preadu_id=1) # Try initialising the board (will check if FPGAs are programmed) if not self._initialise_board(): return # Pre-load all required plugins. Board-level devices are loaded here [ self.load_plugin("TpmFirmwareInformation", firmware=x) for x in range(1, 4) ] # Load ADCs [ self.load_plugin("TpmAdc9695", adc_id=adc) for adc in [ "adc0", "adc1", "adc2", "adc3", "adc4", "adc5", "adc6", "adc7", "adc8", "adc9", "adc10", "adc11", "adc12", "adc13", "adc14", "adc15", ] ] # Load PLL self.load_plugin("TpmPll") # Update firmware information if self.is_programmed(): [info.update_information() for info in self.tpm_firmware_information]
[docs] def smap_select_fpga(self, fpga_list): for n in fpga_list: self[self._xil_registers[n]] = 0x10
[docs] def smap_deselect_fpga(self, fpga_list): for n in fpga_list: self[self._xil_registers[n]] = 0x0
[docs] def set_qsfp_led(self, id, val): leds = [ {"device": 0, "name": "LED0n"}, {"device": 0, "name": "LED1n"}, {"device": 0, "name": "LED2n"}, {"device": 0, "name": "LED3n"}, {"device": 1, "name": "LED0n"}, {"device": 1, "name": "LED1n"}, {"device": 1, "name": "LED2n"}, {"device": 1, "name": "LED3n"}, ] # | LED | COLOR | FUNCTION | # |-----|-------|-----------------------| # | 1 | GREEN | QSFP-1 link-up | # | 2 | GREEN | FPGAs programmed | # | 3 | RED | available (SW error?) | # | 4 | RED | MCU | # | 5 | GREEN | QSFP-2 link-up | # | 6 | GREEN | ADCs configured | # | 7 | RED | available (SW error?) | # | 8 | RED | MCU | # LED 4 8 are managed by MCU # | TPM STATUS | LED 4 | LED 8 | # |-----------------|-------|-------| # | OFF | OFF | OFF | # | STANDBY | ON | ON | # | SAFETY SHUTDOWN | ON | OFF | # | NA | OFF | ON | led = leds[id - 1] if val == 0: val = 1 else: val = 0 if self.tpm_qsfp_adapter[led["device"]].present: self.tpm_qsfp_adapter[led["device"]].set(led["name"], val)
[docs] def erase_fpga(self, force=True): """Erase FPGAs configuration memory""" def smap_data_format(dat): def reverse_bit(num): result = 0 for n in range(8): result = (result << 1) + (num & 1) num >>= 1 return result smap_dat = 0 for n in range(4): smap_dat |= reverse_bit(dat >> 8 * n) << 8 * n return smap_dat # Check if connected if not self.is_connected(): raise LibraryError("TPM needs to be connected in order to program FPGAs") self[self._c2c_stream_enable] = 0x0 # Disable C2C stream # Temporary self[self._global_register] = 0x3 # Erase FPGAs SRAM for fpga_id in range(len(self._xil_registers)): xil_register = self._xil_registers[fpga_id] self._logger.info("Erasing FPGA%d using PROG pin method" % fpga_id) self[xil_register] = 0x10 # Select FPGA self[self._global_register] = 0x0 # PROG = 0 max_attempt = 10 attempt = 0 fpga_still_programmed = 0 while self[xil_register] & 0x1 == 1: # wait for INIT if attempt == max_attempt: fpga_still_programmed = 1 break attempt += 1 sleep(0.1) self[self._global_register] = 0x3 while self[xil_register] & 0x1 == 0: # wait for INIT if attempt == max_attempt: fpga_still_programmed = 1 break attempt += 1 sleep(0.1) # If FPGA erase with PROG pin method timed out, try with ICAP3 method if fpga_still_programmed == 1: self._logger.error( "Erasing FPGA%d using PROG pin method failed" % fpga_id ) raise LibraryError( "Erasing FPGA%d using PROG pin method failed" % fpga_id ) self[self._global_register] = 0x3 try: # Power off led related to programming and QSFP link-up when erase fpga self.set_qsfp_led(2, 0) self.set_qsfp_led(1, 0) self.set_qsfp_led(5, 0) except LibraryError as e: if str(e) == "I2C/EEP request not accepted!": logging.error(f"Failed erasing FPGAs: {repr(e)}", exc_info=True) else: raise
[docs] def download_firmware(self, device, bitfile): """ Download bitfile to FPGA. :param device: FPGA to download bitfile :param bitfile: Bitfile to download """ if self["board.regfile.enable.fpga"] == 0: self._logger.info("FPGAs power disabled. Enabling") self["board.regfile.enable.fpga"] = 1 time.sleep(0.5) else: self._logger.info("FPGAs power already enabled.") # Lock SMAP interface to avoi MCU access conflict start_lock = time.time() if self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE: self._logger.info("Locking SMAP interface to avoid MCU access conflict") locked = False retry = 10 while retry > 0: self["board.lock.mlock1"] = 0x50000000 if self["board.lock.mlock1"] != 0x50000000: retry = retry - 1 time.sleep(0.01) else: locked = True break else: self._logger.info("Locking SMAP interface to avoid MCU access conflict") self._smap_lock.lock() locked = self._smap_lock.is_locked if locked is False: self._logger.error("ERROR: Can't Lock SMAP interface") raise LibraryError("ERROR: Can't Lock SMAP interface") # Program FPGA """ Download bitfile to FPGA :param device: FPGA to download bitfile :param bitfile: Bitfile to download """ # Check if connected if self.status[Device.Board] != Status.OK: raise LibraryError("TPM needs to be connected in order to program FPGAs") required_cpld_version = self.download_firmware_required_cpld_version cpld_version = self[tpm_hw_definitions.CPLD_VERSION_OFFSET] if cpld_version < required_cpld_version or cpld_version & 0xF0 == 0xB0: self._logger.error( "CPLD firmware version is too old. Required version is " + hex(required_cpld_version) ) raise LibraryError( "CPLD firmware version is too old. Required version is " + hex(required_cpld_version) ) # rearm lock if self.board_regfile_date_code > self.CPLD_FW_VER_LOCK_I2C_CHANGE: self._smap_lock.refresh() # Sanity checks on provided bitfile (exists and size) f = Path(bitfile) if not f.exists(): raise LibraryError("Provided bitfile ({}) does not exist".format(bitfile)) elif f.stat().st_size < 1024: raise LibraryError( "Provided bitfile ({}) is too small ({} bytes)".format( bitfile, f.stat().st_size ) ) # Disable C2C stream self[self._c2c_stream_enable] = 0x0 # Select FPGAs to program self.smap_deselect_fpga([0, 1]) self[self._global_register] = 0x3 # Erase FPGAs SRAM self.erase_fpga(force=True) # Read bitstream with open(bitfile, "rb") as fp: data = fp.read() self.smap_select_fpga([0, 1]) self[self._global_register] = 0x2 fifo_register = 0x50001000 # rearm lock if self.board_regfile_date_code > self.CPLD_FW_VER_LOCK_I2C_CHANGE: self._smap_lock.refresh() start = time.perf_counter() # Check if ucp_smap_write is supported, otherwise fallback to UCP writes # if ucp_smap_write is not supported, use normal UCP access if cpld_version < tpm_hw_definitions.CPLD_FW_VER_REQUIRED_FOR_SMAP_DOWNLOAD: self._logger.info("FPGA programming using UCP write") self._protocol.select_map_program( data, fifo_addr=fifo_register, ucp_smap_write=False ) else: if cpld_version <= self.CPLD_FW_VER_LOCK_I2C_CHANGE: self._logger.info("FPGA programming using fast SelectMap write") self._protocol.select_map_program( data, fifo_addr=None, ucp_smap_write=True ) else: self._logger.info( "FPGA programming using fast SelectMap with SMAP lock" ) self._protocol.select_map_program( data, fifo_addr=None, ucp_smap_write=True, lock_obj=self._smap_lock, ) # Wait for operation to complete status_read = 0 for n, xil_register in enumerate(self._xil_registers): while self[xil_register] & 0x2 != 0x2: # DONE high time.sleep(0.01) status_read += 1 if status_read == 100: self._logger.error( "FPGA" + str(n) + " DONE is not high. This means that either the FPGA is not \ programmed or the CPLD is not detecting the DONE signal correctly due to an hardware issue. \ In the first case further errors will be detected later. It is necessary to power cycle the board, \ re-try configuration and check if this error disappears. In the second case initialisation might \ complete without errors and this message can be ignored, however this still highlights that \ a minor hardware issue affects the TPM." ) break end = time.perf_counter() self._logger.info("FPGA programming time: " + str(end - start) + "s") self.smap_deselect_fpga([0, 1]) self[self._global_register] = 0x3 # Brute force check to make sure we can communicate with programmed TPM magic_ok = False for n in range(4): try: if self.tpm_c2c_lvds_supported: logging.info( "C2C PLL register not found, skipping calibrate_fpga_to_cpld" ) self.set_c2c_calibration(False) else: self.calibrate_fpga_to_cpld() magic0 = self[tpm_hw_definitions.FPGA_MAGIC_FPGA0_OFFSET] magic1 = self[tpm_hw_definitions.FPGA_MAGIC_FPGA1_OFFSET] if magic0 == magic1 == tpm_hw_definitions.FPGA_MAGIC: magic_ok = True break else: self._logger.info( "FPGA magic numbers are not correct %s, %s" % (hex(magic0), hex(magic1)) ) except: pass # !TODO: fix CPLD reset to work with TPM 1.6 # self._logger.info("Not possible to communicate with the FPGAs. Resetting CPLD...") # self.write_address(0x30000008, 0x8000, retry=False) # Global Reset CPLD # time.sleep(0.2) # self.write_address(0x30000008, 0x8000, retry=False) # Global Reset CPLD time.sleep(0.2) # Unlock SMAP interface end_lock = time.time() self._logger.info( "Unlocking SMAP interface: elapsed time %s", str(end_lock - start_lock) ) if self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE: self["board.lock.mlock1"] = 0xFFFFFFFF else: self._smap_lock.unlock() if magic_ok: self.set_qsfp_led(2, 1) return else: if self.tpm_c2c_lvds_supported: # Detected BIOS >= 1.0.0 requires FPGA firmware with C2C-LVDS functionality, version >= 11.0.0. raise LibraryError( f"Not possible to communicate with the FPGAs. Current TPM BIOS {self.tpm_bios_version_number} requires C2C-LVDS functionality. Check if the correct FPGA bitfile has been loaded (required >= 11.0.0)." ) else: # Detected BIOS < 1.0.0 requires FPGA firmware without C2C-LVDS functionality raise LibraryError( f"Not possible to communicate with the FPGAs. Current TPM BIOS {self.tpm_bios_version_number} requires FPGA firmware without C2C-LVDS functionality. Check if the correct FPGA bitfile has been loaded (required <= 10.0.0)." )
[docs] def calibrate_fpga_to_cpld(self): """Calibrate communication between FPGAs and CPLD""" self.set_c2c_calibration(True) # Disable c2c streaming self[self._c2c_stream_enable] = 0x0 while self[self._c2c_stream_enable] != 0x0: time.sleep(0.01) # PLL in CPLD calibrated in 64 steps (done twice) devices = ["fpga1", "fpga2"] for f in devices: if f == "fpga1": m = 0 phasesel = 0 else: m = 1 phasesel = 1 lo = -1 this_error = -1 mask = 0x1 << (4 + m) for n in range(128): self._smap_lock.refresh() time.sleep(0.01) previous_error = this_error this_error = (self[0x30000040] & mask) >> (4 + m) if ( this_error == 0 and ( previous_error == 1 or previous_error == 2 or previous_error == 3 ) and lo == -1 ): lo = n if ( (this_error == 1 or this_error == 2 or this_error == 3) and previous_error == 0 and lo != -1 ): k = (((n - 1) - lo) // 2) + 1 for x in range(k): self[0x30000028] = 0x010 + (phasesel << 8) self[0x30000028] = 0x011 + (phasesel << 8) self[0x30000028] = 0x010 + (phasesel << 8) time.sleep(0.02) self._logger.info("%s to CPLD calibrated", f.upper()) self._logger.info( "%s to CPLD. Start phase: %i, Stop phase %i " % (f.upper(), lo, n) ) # Disable calibration pattern transmission if m == 0: break else: self.set_c2c_calibration(False) return # Advancing PLL phase self[0x30000028] = 0x000 + (phasesel << 8) self[0x30000028] = 0x001 + (phasesel << 8) self[0x30000028] = 0x000 + (phasesel << 8) time.sleep(0.01) self._logger.fatal("Could not calibrate FPGA to CPLD streaming")
[docs] def set_c2c_calibration(self, enable=True): if enable: wr = 0x5A01 else: wr = 0x1 # self['board.regfile.c2c_ctrl.mm_read_stream'] = 0 self[0x3000002C] = 1 for ba in [0x0, 0x10000000]: try: self.write_address(ba + 0x90, wr, retry=False) # self[ba + 0x90] = 0x5A00 except: pass
[docs] def load_firmware( self, device, register_string=None, load_values=False, base_address=0 ): """ Override uperclass load_firmware to extract memory map from the bitfile. :param base_address: base address at which to load firmware :param device: Device on board to load firmware to :param register_string: String containing register information :param load_values: Load register values """ # Check if device is valid if device not in [Device.Board, Device.FPGA_1, Device.FPGA_2]: raise LibraryError("TPM devices can only be Board, FPGA_1 and FPGA_2") # Check if connected if not self._connected: raise LibraryError("Not connected to board, cannot load firmware") # Get FPGA base address and check if firmware is loaded in FPGA base_address = ( self["board.info.fpga1_base_add"] if device == Device.FPGA_1 else self["board.info.fpga2_base_add"] ) try: loaded = ( self["board.regfile.fpga1_programmed_fw"] if device == Device.FPGA_1 else self["board.regfile.fpga2_programmed_fw"] ) except: loaded = 1 register = "board.info.fw%d_xml_offset" % loaded if not self.memory_map.has_register(register): raise LibraryError( "CPLD register information must be loaded prior to loading firmware" ) # Get design from extended info BRAM design = "tpm_test" # default design try: # determine first and last address of extended info BRAM # and read contents offset = self["board.info.fw1_extended_info_offset"] size = self[int(offset)] data = self.read_address(offset + 4, n=int(ceil(size / 4.0))) # convert contents back to ascii and uncompress data = "".join([format(n, "08x") for n in data]) data = zlib.decompress(binascii.unhexlify(data[: 2 * size])) # parse extended info for "\n\nDESIGN: <design>\n\n" for item in data.decode().split("\n\n"): key = item.split(": ")[0].strip().lower() if key == "design": design = item.split(": ")[1] break except: pass # Get XML file offset and read XML file from board register_string = self._get_xml_file(self[register]) register_string = register_string.replace(b'<?xml version="1.0" ?>', b"") register_string = register_string.replace( f'id="{design}"'.encode(), b'id="fpga1"' if device == Device.FPGA_1 else b'id="fpga2"', ) register_string = "<node>\n{}\n</node>".format(register_string) # Call superclass with this file super(TPM, self).load_firmware( device=device, register_string=register_string, base_address=base_address, load_values=load_values, )
[docs] def is_programmed(self, fpga_id=None): """Returns True if Board is programmed""" if not self.is_connected(): self._logger.debug("tpm.is_programmed") return None self._logger.debug( "FPGAs is_programmed? Enable %d, Done[0-1] %d" % (self["board.regfile.enable.fpga"], self["board.regfile.xilinx.done"]) ) self._programmed[Device.FPGA_1] = self["board.regfile.enable.fpga"] and bool( self["board.regfile.xilinx.done"] & 0x1 ) self._programmed[Device.FPGA_2] = self["board.regfile.enable.fpga"] and bool( self["board.regfile.xilinx.done"] & 0x2 ) if fpga_id is None: if (not self._programmed[Device.FPGA_1]) and ( not self._programmed[Device.FPGA_2] ): self._logger.debug("Return False") return False self._logger.debug("Return True") return True else: if fpga_id == 0: self._logger.debug("Return %s" % self._programmed[Device.FPGA_1]) return bool(self._programmed[Device.FPGA_1]) elif fpga_id == 1: self._logger.debug("Return %s" % self._programmed[Device.FPGA_2]) return bool(self._programmed[Device.FPGA_2]) return None
[docs] def set_lmc_ip( self, ip="10.0.10.1", port=None, port2=None, port3=None, same_port=True ): """ Set the IP address for LMC data transfer. :param ip: IP address in string form :param port: Port :param port2: Port for stream 1 :param port3: Port for stream 2 :param same_port: Use the same port for all ports """ # Set Stream 0: # Set LMC IP self["board.regfile.stream_dst_ip0"] = struct.unpack( "!I", socket.inet_aton(ip) )[0] # Set LMC destination and source port if port is None or type(port) is not int: port = self["board.regfile.stream_dst_port0"] & 0xFFFF self["board.regfile.stream_src_port0"] = 10001 self["board.regfile.stream_dst_port0"] = port # Set Stream 1: # Set LMC IP self["board.regfile.stream_dst_ip1"] = struct.unpack( "!I", socket.inet_aton(ip) )[0] # Set LMC destination and source port self["board.regfile.stream_src_port1"] = 10001 if port2 is not None and type(port) is int: self["board.regfile.stream_dst_port1"] = port2 elif same_port: self["board.regfile.stream_dst_port1"] = port
# # Set Stream 2: # # Set LMC IP # self['board.regfile.stream_dst_ip2'] = struct.unpack("!I", socket.inet_aton(ip))[0] # # # Set LMC destination and source port # self['board.regfile.stream_src_port2'] = 10001 # if port3 is not None and type(port) is int: # self['board.regfile.stream_dst_port2'] = port3 # elif same_port: # self['board.regfile.stream_dst_port2'] = port
[docs] def temperature(self): """Get board temperature""" return self.tpm_monitor.get_temperature()
[docs] def voltage(self): """Get board voltage""" return self.tpm_monitor.get_voltage_5v0()
[docs] def set_shutdown_temperature(self, temp): self._logger.info( "Shutdown temperature not supported use set_board_alm_temp_thresholds in tpm_monitor" )
[docs] def get_firmware_list(self, device=Device.Board): """ Get list of downloaded firmware on TPM FLASH. :param device: This should always be the board for the TPM :return: List of design name as well as version """ # Got through all firmware information plugins and extract information firmware = [] for plugin in self.tpm_firmware_information: # Update information plugin.update_information() # Check if design is valid: if plugin.get_design() is not None: firmware.append( { "design": plugin.get_design(), "major": plugin.get_major_version(), "minor": plugin.get_minor_version(), } ) # All done, return return firmware
[docs] def get_part_number(self): return self.tpm_eep.get_field("PN")
[docs] def get_serial_number(self): return self.tpm_eep.get_field("SN")
[docs] def get_hardware_revision(self): hw_rev_arr = self.tpm_eep.get_field("HARDWARE_REV") hw_rev = 0 for byte in hw_rev_arr: hw_rev = hw_rev * 256 + byte if hw_rev == 0xFFFFFF or hw_rev == 0x0: self._logger.error( "Could not read HARDWARE_REV from EEPROM, returned: " + hex(hw_rev) ) return hw_rev
[docs] def get_bios(self): bios_string = ( f"CPLD_{self.tpm_cpld.get_version()}-MCU_{self.tpm_monitor.get_version()}" ) for version, version_bios_string in self.BIOS_REV_list: if version_bios_string == bios_string: self.tpm_bios_version_number = f"v{version}" return f"v{version} ({bios_string})" self._logger.critical( f"BIOS version {bios_string} is not on the BIOS revision List" ) self.tpm_bios_version_number = f"v.?.?.?" return f"v?.?.? ({bios_string})"
[docs] def get_mac(self): mac = self.tpm_eep.get_field("MAC") mac_str = "" for i in range(0, len(mac) - 1): mac_str += "{0:02x}".format(mac[i]) + ":" mac_str += "{0:02x}".format(mac[len(mac) - 1]) return mac_str
def __get_ext_label__(self): eep_rev = self.tpm_eep.get_field("eep_rev") _label = [] _label.append(self.tpm_eep.get_field("EXT_LABEL_PN")) _label.append(self.tpm_eep.get_field("EXT_LABEL_SN")) if eep_rev == tpm_hw_definitions.EEPROM_VER_FULL_EXT_LABEL: # NEW BEHAVIOR with whole EXT_LABEL semi-column separated value _label.append(self.tpm_eep.get_field("EXT_LABEL_2")) try: _label.append(self.tpm_qsfp_adapter[0].get_field("TPM_EXT_LABEL_3")) except: self._logger.warning("QSFP-1 missing, cannot get TPM_EXT_LABEL_3 field") return "".join(_label) elif eep_rev == tpm_hw_definitions.EEPROM_VER_OLD: # OLD BEHAVIOR with EXT_LABEL_PN-EXT_LABEL_SN, remove empty filed and "-".join _label = list(filter(None, _label)) # remove empty fields return "-".join(_label) else: raise BoardError(f"Unknown eep_rev {hex(eep_rev)}") def __set_ext_label__(self, _label): eep_rev = self.tpm_eep.get_field("eep_rev") if eep_rev != tpm_hw_definitions.EEPROM_VER_FULL_EXT_LABEL: self.tpm_eep.set_field( "eep_rev", tpm_hw_definitions.EEPROM_VER_FULL_EXT_LABEL, override_protected=True, ) fields = { "EXT_LABEL_PN": self.tpm_eep, "EXT_LABEL_SN": self.tpm_eep, "EXT_LABEL_2": self.tpm_eep, "TPM_EXT_LABEL_3": self.tpm_qsfp_adapter[0], } for key, obj in fields.items(): _str = _label[0 : obj.eep_sec[key]["size"]] _label = _label[obj.eep_sec[key]["size"] :] if key == "TPM_EXT_LABEL_3": try: obj.set_field(key, _str, override_protected=True) except: self._logger.warning( "QSFP-1 missing, cannot set TPM_EXT_LABEL_3 field" ) else: obj.set_field(key, _str, override_protected=True)
[docs] def get_board_info(self): if self.board_info is not None: return self.board_info if self.board_regfile_date_code <= self.CPLD_FW_VER_LOCK_I2C_CHANGE: tpm_info = { "ip_address": long2ip(self["board.regfile.eth_ip"]), "netmask": long2ip(self["board.regfile.eth_mask"]), "gateway": long2ip(self["board.regfile.eth_gway"]), "ip_address_eep": self.tpm_eep.get_field("ip_address"), "netmask_eep": self.tpm_eep.get_field("netmask"), "gateway_eep": self.tpm_eep.get_field("gateway"), "MAC": self.get_mac(), "SN": self.get_serial_number(), "PN": self.get_part_number(), "bios": self.get_bios(), "EXT_LABEL": self.__get_ext_label__(), } else: tpm_info = { "ip_address": long2ip(self["board.i2c.ip"]), "netmask": long2ip(self["board.i2c.netmask"]), "gateway": long2ip(self["board.i2c.gateway"]), "ip_address_eep": self.tpm_eep.get_field("ip_address"), "netmask_eep": self.tpm_eep.get_field("netmask"), "gateway_eep": self.tpm_eep.get_field("gateway"), "MAC": self.get_mac(), "SN": self.get_serial_number(), "PN": self.get_part_number(), "bios": self.get_bios(), "EXT_LABEL": self.__get_ext_label__(), } if self.tpm_eep.get_field("BOARD_MODE") == self.BOARD_MODE["ada"]: tpm_info["BOARD_MODE"] = "ADA" elif self.tpm_eep.get_field("BOARD_MODE") == self.BOARD_MODE["no-ada"]: tpm_info["BOARD_MODE"] = "NO-ADA" else: tpm_info["BOARD_MODE"] = "UNKNOWN" pcb_rev = self.tpm_eep.get_field("PCB_REV") if pcb_rev == 0xFF: pcb_rev_string = "" else: pcb_rev_string = str(pcb_rev) hw_rev = self.tpm_eep.get_field("HARDWARE_REV") tpm_info["HARDWARE_REV"] = ( "v" + str(hw_rev[0]) + "." + str(hw_rev[1]) + "." + str(hw_rev[2]) + pcb_rev_string ) ddr_size = self.tpm_eep.get_field("DDR_SIZE_GB") if ddr_size == 0xFF: tpm_info["DDR_SIZE_GB"] = "4" else: tpm_info["DDR_SIZE_GB"] = str(ddr_size) return tpm_info
[docs] def get_global_status_alarms(self): alarms = { "I2C_access_alm": (self["board.regfile.global_status"] & 0xF0000) >> 16, "temperature_alm": self["board.regfile.global_status.temperature"], "voltage_alm": self["board.regfile.global_status.voltage"], "SEM_wd": self["board.regfile.global_status.SEM"], "MCU_wd": self["board.regfile.global_status.MCU"], } active_alarms = list(filter(lambda key: alarms[key] == 2, alarms)) active_warnings = list(filter(lambda key: alarms[key] == 1, alarms)) if len(active_alarms): self._logger.error(f"Global status alarm: {active_alarms}") if "MCU_wd" in active_alarms: self._logger.error( f"Global status alarm: mcu_status - {self.tpm_monitor.get_mcu_last_exec_phase_id()}" ) self._logger.error( f"Global status alarm: wait 2s to verify if mcu is stuck" ) time.sleep(2) self._logger.error( f"Global status alarm: mcu_status - {self.tpm_monitor.get_mcu_last_exec_phase_id()}" ) if alarms["temperature_alm"] == 2 or alarms["voltage_alm"] == 2: self._logger.error( f"Global status alarm: err_status - {self.tpm_monitor.get_last_error()}" ) if len(active_warnings): self._logger.warning(f"Global status alarm: {active_warnings}") if alarms["temperature_alm"] == 1 or alarms["voltage_alm"] == 1: self._logger.warning( f"Global status alarm: warn_status - {self.tpm_monitor.get_last_warning()}" ) return alarms
[docs] def clear_global_status_alarms(self): """Clear global status alarm flags""" self["board.regfile.global_status_ack.voltage"] = 0x1 self["board.regfile.global_status_ack.temperature"] = 0x1 self._poll_reg( "board.regfile.global_status_ack.voltage", 0, "NOT voltage ACK Detected on clear_global_status_alarms", ) self._poll_reg( "board.regfile.global_status_ack.temperature", 0, "NOT temperature ACK Detected on clear_global_status_alarms ", )
[docs] def check_ad9550_pll_status(self): """ Status of TPM AD9550 PLLs chip for 40GbE if. This method returns lock status True if both AD9550 PLLs are locked. The lock loss counter increments for a loss of lock event on either PLL. :return: current lock status and lock loss counter value :rtype: tuple """ if self.has_register("board.regfile.eth10ge.lock"): lock = self["board.regfile.eth10ge.lock"] & 0x3 == 0x3 loss_of_lock = None if self.has_register("board.regfile.eth10ge.lol"): loss_of_lock = self["board.regfile.eth10ge.lol"] return lock, loss_of_lock return None
[docs] def clear_ad9550_pll_status(self): """ Resets the value in the AD9550 PLLs lock loss counter to 0. """ if self.has_register("board.regfile.eth10ge.lol"): self["board.regfile.eth10ge.lol"] = 0 return
[docs] def get_adc_temperature(self, adc_id=None): """ Get ADC temperature diode via FPGA sysmon. :param adc_id: range 0-15 or None :return: List of ADC temperatures if adc_id=None, selected ADC temperature otherwise """ diode = 1 result = [] if adc_id is None: adc_id = list(range(16)) else: adc_id = [adc_id] for adc_idx in adc_id: # Check if it is a TPM 2.0 which supports ADC temperature read if self.hw_rev < self.tpm_hw_rev_2_0: result.append(float("nan")) else: if adc_idx < 8: sysmon_line = "VM_ADC0" else: sysmon_line = "VM_ADC1" self.tpm_adc[adc_idx].adc_enable_temp_meas( diode_type=1 ) # select 1xdiode sleep(0.01) v_temp = self.tpm_sysmon[0].get_measure_val(sysmon_line) if diode == 1: temp_evaluated = ( v_temp * -542.85 ) + 377.14 # get from fig. 87 in ad9695 datasheet # elif diode==2: # self.tpm_adc[adc_idx].adc_enable_temp_meas(diode_type=2) #select 1xdiode # sleep(0.1) # v_temp_x20=self.tpm_sysmon[0].get_measure_val(sysmon_line) # vdelta=(v_temp_x20-v_temp) # temp_evaluated=(vdelta*3999.99)-284.99 #get from fig. 87 in ad9695 datasheet result.append(round(temp_evaluated, 2)) self.tpm_adc[adc_idx].adc_enable_temp_meas( diode_type=0 ) # set pin to High-Z if len(result) == 1: return result[0] else: return result
# ----------------------------------------- Private functions ----------------------------------------- def _initialise_board(self): """Initialise the TPM board""" # Load SPI devices if XML file is available if self.memory_map.has_register("board.info.spi_xml_offset"): spi_xml = self._get_xml_file( self.read_register("board.info.spi_xml_offset") ) self.load_spi_devices(spi_xml) # CPLD and SPI XML files have been loaded, check whether FPGA have been programmed # If FPGA is programmed, load the firmware's XML file if self.is_programmed(fpga_id=0): self.load_firmware(device=Device.FPGA_1) if self.is_programmed(fpga_id=1): self.load_firmware(device=Device.FPGA_2) return True def _initialise_devices( self, fsample=800e6, ddc=False, fddc=0.0, adc_clock_divider=1, mono_channel_14_bit=False, mono_channel_sel=0, adc_low_bitrate=False, disabled_adc=[], fullscale_voltage=1.59, ): """Initialise the SPI and other devices on the board""" # self.set_shutdown_temperature(65) # Switch voltages ON # self['board.regfile.ctrl.en_ddr_vdd'] = 1 available_adc = range(len(self.tpm_adc)) enabled_adc = [] for _ in available_adc: if _ in disabled_adc: pass else: enabled_adc.append(_) # TODO: this works on TPM 1.2 but not on TPM 1.6 # self['board.regfile.ctrl.ad_pdwn'] = 0x0 # for n in available_adc: # self.tpm_adc[n].adc_reset() # self.tpm_adc[n].adc_power_down_enable() # for n in disabled_adc: # self._logger.info("Disabling ADC " + str(n)) # self['board.regfile.ctrl.ad_pdwn'] = 0x1 # Initialise PLL (3 retries) self._logger.info("Configuring AD9528 PLL at " + str(fsample) + " Hz") for i in range(3): try: self.tpm_pll.pll_start(fsample) break except PluginError as err: if i == 2: raise err # Initialise ADAs if required # self.tpm_ada.disable_adas() # if self._enable_ada: # self._logger.info("Enabling ADAs") # self.tpm_ada.initialise_adas() # self.adas_enabled = True # else: # self._logger.info("Not enabling ADAs") # Initialise ADCs if self._enable_adc: if not ddc: if mono_channel_14_bit: self._logger.info( "Enabling single ADC channel with 14 bit sampling" ) for i in enabled_adc: self.tpm_adc[i].adc_single_start( mono_channel_14_bit=mono_channel_14_bit, mono_channel_sel=mono_channel_sel, fullscale_voltage=fullscale_voltage, ) else: self._logger.info("Enabling DDC, NCO frequency: %f Hz", fddc) for i in enabled_adc: self.tpm_adc[i].adc_single_start_dual_14_ddc( sampling_frequency=fsample, ddc_frequency=fddc, decimation_factor=20, low_bitrate=adc_low_bitrate, fullscale_voltage=fullscale_voltage, ) def _get_xml_file(self, xml_offset): """Get XML file from board :param xml_offset: Memory offset where XML file is stored :return: XML file as string """ # Read length of XML file (first 4 bytes from offset) xml_len = self.read_address(xml_offset) # Read XML file from board zipped_xml = self.read_address(xml_offset + 4, int(ceil(xml_len / 4.0))) # Convert to string, decompress and return zipped_xml = "".join([format(n, "08x") for n in zipped_xml]) return zlib.decompress(binascii.unhexlify(zipped_xml[: 2 * xml_len])) def _poll_reg(self, reg, exit_condition, exception_val, max_retry=1000): start_time = time.time() retry = 0 while True: if (self[reg]) == exit_condition: elapsed_time = time.time() - start_time # logging.error(f"poll_reg {exception_val} successful. Time taken: {elapsed_time:.4f} seconds.") break else: time.sleep(0.001) retry = retry + 1 # if self.ticket_num is not None: # self.i2c_refresh() if retry >= max_retry: logging.error("EEP Polling Reg timeout: ", exception_val) raise LibraryError(exception_val) # -------------------------- Methods for syntax candy -------------------------- def __getitem__(self, key): """Override __getitem__, return value from board""" # Check if the specified key is a memory address or register name if isinstance(key, int): return self.read_address(key) # Check if the specified key is a tuple, in which case we are reading from a device if type(key) is tuple: # Run checks if not self._checks(): return if len(key) == 2: return self.read_device(key[0], key[1]) else: raise LibraryError( "A device name and address need to be specified for writing to SPI devices" ) elif type(key) is str or isinstance(key, basestring): # Run checks if not self._checks(): return # Check if a device is specified in the register name if self.memory_map.has_register(key): reg = self.memory_map[key] return self.read_register(key, reg.size) else: raise LibraryError( "Unrecognised key type, must be register name, memory address or SPI device tuple" ) # Register not found raise LibraryError("Register %s not found" % key) def __setitem__(self, key, value): """Override __setitem__, set value on board""" # Check is the specified key is a memory address or register name if isinstance(key, int): return self.write_address(key, value) # Check if the specified key is a tuple, in which case we are writing to a device if type(key) is tuple: # Run checks if not self._checks(): return if len(key) == 2: return self.write_device(key[0], key[1], value) else: raise LibraryError( "A device name and address need to be specified for writing to SPI devices" ) elif type(key) is str or isinstance(key, basestring): # Run checks if not self._checks(): return # Check if device is specified in the register name if self.memory_map.has_register(key): return self.write_register(key, value) else: raise LibraryError( "Unrecognised key type (%s), must be register name or memory address" % key.__class__.__name__ ) # Register not found raise LibraryError("Register %s not found" % key) def __str__(self): """Override __str__ to print register information in a human readable format""" if not self._programmed: return "" # Run checks if not self._checks(): return # Split register list into devices registers = {} for k, v in self.memory_map.register_list.items(): if v["device"] not in list(registers.keys()): registers[v["device"]] = [] registers[v["device"]].append(v) # Loop over all devices string = "Device%sRegister%sAddress%sBitmask\n" % (" " * 2, " " * 37, " " * 8) string += "------%s--------%s-------%s-------\n" % (" " * 2, " " * 37, " " * 8) for k, v in registers.items(): for reg in sorted(v, key=lambda arg: arg["name"]): regspace = " " * (45 - len(reg["name"])) adspace = " " * (15 - len(hex(reg["address"]))) string += "%s\t%s%s%s%s0x%08X\n" % ( DeviceNames[k], reg["name"], regspace, hex(reg["address"]), adspace, reg["bitmask"], ) # Add SPI devices if len(self._spi_devices) > 0: string += "\nSPI Devices\n" string += "-----------\n" for key in sorted(self._spi_devices.keys()): info = self._spi_devices[key] string += "Name: %s\tspi_sclk: %d\tspi_en: %d\n" % ( info, info.spi_sclk, info.spi_en, ) # Return string representation return string
# ------------------------------------------------ Tile Class --------------------------------------- if __name__ == "__main__": # Enable debug logging from sys import stdout log = logging.getLogger("") log.setLevel(logging.DEBUG) line_format = logging.Formatter( "%(asctime)s - %(levelname)s - %(threadName)s - %(message)s" ) ch = logging.StreamHandler(stdout) ch.setFormatter(line_format) log.addHandler(ch) # Test TPM tpm = TPM(ip="10.0.10.2") tpm.connect(ip="10.0.10.2", port=10000) print(tpm.get_board_info())