Source code for ska_low_sps_tpm_api.plugins.qsfp_adapter

import time

from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock

# Upper bound on the number of polls performed by TpmQSFPAdapter.poll_reg
# before it gives up and raises.
MAX_RETRY = 1000

# IO expander line map.
#   "ch"     : bit position of the line inside the byte exchanged with the
#              IO expander (the class tests/sets bit ``2**ch``).
#   "output" : True when the line may be driven through TpmQSFPAdapter.set();
#              lines flagged False are always written back as 1.
IOEXP_map = {
    "LED0n": dict(ch=1, output=True),
    "LED1n": dict(ch=2, output=True),
    "LED2n": dict(ch=3, output=True),
    "LED3n": dict(ch=0, output=True),
    "ModPrsL": dict(ch=4, output=False),
    "IntL": dict(ch=5, output=False),
    "ResetL": dict(ch=6, output=True),
    "LPMode": dict(ch=7, output=True),
}

# Values of the legacy "board.i2c.status" register checked by the class.
i2c_status_busy = 0x1
i2c_status_ack_error = 0x2

# I2C mux selector used for each adapter instance, indexed by core_id.
itpm_cpld_i2c_mux = [1, 2]

# Device addresses; the class shifts them right by one bit before placing
# them in the I2C command word.
PHY_ioexp = 0x40  # PCF8574TS
PHY_QSFP = 0xA0
PHY_EEP = 0xA6


class TpmQSFPAdapter(FirmwareBlock):
    """TpmQSFPAdapter plugin.

    Gives access to one QSFP adapter through the board I2C interface: the
    IO expander carrying the LED / QSFP control lines and, when it answers,
    the adapter EEPROM holding the identification fields listed in
    ``eep_sec``.
    """

    @compatibleboards(BoardMake.TpmBoard)
    @friendlyname("tpm_qsfp_adapter")
    @maxinstances(2)
    def __init__(self, board, logger=None, **kwargs):
        """
        TpmQSFPAdapter initialiser.

        :param board: Pointer to board instance
        :param logger: Logger forwarded to the FirmwareBlock initialiser
        :param core_id: Index of plugin instance to select QSFP_ADAPTER 0 or 1
        :raises PluginError: if ``core_id`` is missing or out of range
        """
        super(TpmQSFPAdapter, self).__init__(board, logger=logger)

        # Base address and request/acknowledge offsets of the register
        # window used for I2C transactions when not in legacy mode.
        self.i2c_shadow = 0x90000270
        self.i2c_req_reg = 0x50
        self.i2c_ack_reg = 0x54
        self.mcu_fwver_reg = self.board["board.bram_cpu"]

        if "core_id" not in kwargs:
            raise PluginError("TpmQSFPAdapter: Require a core_id parameter")
        self._core_id = kwargs["core_id"]
        if self._core_id < 0 or self._core_id > len(itpm_cpld_i2c_mux) - 1:
            raise PluginError(
                "TpmQSFPAdapter: core_id out of range, must be smaller than %d"
                % len(itpm_cpld_i2c_mux)
            )
        self.i2c_mux = itpm_cpld_i2c_mux[self._core_id]

        # EEPROM layout: one entry per field (byte offset, size in bytes,
        # decoding type, and whether set_field needs override_protected).
        self.eep_sec = {
            "SN": {
                "offset": 0x00,
                "size": 16,
                "name": "SN",
                "type": "string",
                "protected": True,
            },
            "PN": {
                "offset": 0x10,
                "size": 16,
                "name": "PN",
                "type": "string",
                "protected": True,
            },
            "HARDWARE_REV": {
                "offset": 0x20,
                "size": 3,
                "name": "HARDWARE_REV",
                "type": "bytearray",
                "protected": True,
            },
            "PCB_REV": {
                "offset": 0x23,
                "size": 1,
                "name": "PCB_REV",
                "type": "string",
                "protected": True,
            },
            "MAC": {
                "offset": 0xFA,
                "size": 6,
                "name": "MAC",
                "type": "bytearray",
                "protected": True,
            },  # READ-ONLY
        }
        # Label fields that are only declared for adapter 0.
        if self._core_id == 0:
            self.eep_sec["TPM_EXT_LABEL_3"] = {
                "offset": 0x24,
                "size": 12,
                "name": "TPM_EXT_LABEL_3",
                "type": "string",
                "protected": True,
            }
            self.eep_sec["FE_0_LABEL"] = {
                "offset": 0x30,
                "size": 40,
                "name": "FE_0_LABEL",
                "type": "string",
                "protected": True,
            }
            self.eep_sec["FE_1_LABEL"] = {
                "offset": 0x58,
                "size": 40,
                "name": "FE_1_LABEL",
                "type": "string",
                "protected": True,
            }

        # Presence detection: a failed read of the IO expander / EEPROM is
        # interpreted as "device not fitted". Only ordinary exceptions are
        # treated that way, so Ctrl-C and interpreter exit still propagate.
        try:
            self.__i2c_rd8(PHY_ioexp)
            self.present = True
        except Exception:
            self.logger.error(
                f"TpmQSFPAdapter{self._core_id}: I2C busy or not acknowledge "
                "on IO expander! Assumed QSFPAdapter missing"
            )
            self.present = False

        try:
            self.__i2c_rd8(PHY_EEP)
            self.eep_present = True
        except Exception:
            self.logger.warning(
                f"TpmQSFPAdapter{self._core_id}: I2C busy or not acknowledge "
                "on EEPROM! Assumed EEPROM of QSFPAdapter missing"
            )
            self.eep_present = False

        # Filled on the first call to get_board_info().
        self.__board_info = None

    def get_board_info(self):
        """
        Return the adapter identification data, reading it on first use.

        :return: dict with keys ``SN``, ``PN`` and ``HARDWARE_REV``, or
            ``None`` when the adapter was not detected. Without an EEPROM
            only ``HARDWARE_REV`` is filled, with the text ``"<= v1.2.1"``.
        """
        if not self.present or self.__board_info is not None:
            return self.__board_info

        info = {
            "SN": None,
            "PN": None,
            "HARDWARE_REV": "",
        }
        self.__board_info = info
        if not self.eep_present:
            info["HARDWARE_REV"] = "<= v1.2.1"
            return info

        info["SN"] = self.get_field("SN")
        info["PN"] = self.get_field("PN")
        pcb_rev = self.get_field("PCB_REV")
        # NOTE(review): PCB_REV is declared as a "string" field, so this
        # comparison with the integer 0xFF only matches if the string reader
        # can return an int - confirm against tpm_eep._rd_string.
        pcb_rev_string = "" if pcb_rev == 0xFF else str(pcb_rev)
        hw_rev = self.get_field("HARDWARE_REV")
        info["HARDWARE_REV"] = (
            f"v{hw_rev[0]}.{hw_rev[1]}.{hw_rev[2]}{pcb_rev_string}"
        )
        return info

    def set_passwd(self):
        """
        Copy the MAC registers into the I2C password registers.

        :raises LibraryError: in legacy mode, when bit 16 of the password
            register reads back as 0 after the write
        """
        mac_hi = self.board["board.i2c.mac_hi"]
        if self.board.i2c_old_mode:
            self.board["board.i2c.password"] = mac_hi
            self.board["board.i2c.password_lo"] = self.board["board.i2c.mac_lo"]
            if self.board["board.i2c.password"] & 0x10000 == 0:
                raise LibraryError("I2C/EEP password not accepted!")
        else:
            self.board[self.i2c_shadow + 0x3C] = mac_hi
            self.board[self.i2c_shadow + 0x38] = self.board["board.i2c.mac_lo"]

    def check_passwd(self):
        """
        Verify that bit 16 of the password register (offset 0x3C) is set.

        :raises LibraryError: if the bit is clear
        """
        rd = self.board[self.i2c_shadow + 0x3C]
        if rd & 0x10000:
            return
        self.logger.error("I2C/EEP password not accepted, rd %x!" % rd)
        raise LibraryError("I2C/EEP password not accepted!")

    def remove_passwd(self):
        """Write zero to both password registers of the active I2C mode."""
        if self.board.i2c_old_mode:
            registers = ("board.i2c.password", "board.i2c.password_lo")
        else:
            registers = (self.i2c_shadow + 0x3C, self.i2c_shadow + 0x38)
        for register in registers:
            self.board[register] = 0

    def poll_reg(self, reg, exit_condition, exception_val, log_warning=False):
        """
        Poll a register until it equals ``exit_condition``.

        The register is read up to ``MAX_RETRY`` times with a 1 ms sleep
        after each mismatch; a held I2C lock is refreshed while waiting.

        :param reg: register name or address to read from the board
        :param exit_condition: value that ends the polling
        :param exception_val: message used for the log entry and exception
        :param log_warning: log the timeout as a warning instead of an error
        :raises LibraryError: if the value is not seen within ``MAX_RETRY`` polls
        """
        for _ in range(MAX_RETRY):
            if self.board[reg] == exit_condition:
                return
            time.sleep(0.001)
            if self.board._i2c_lock.is_locked:
                self.board._i2c_lock.refresh()

        message = f"EEP Polling Reg timeout: {exception_val}"
        if log_warning:
            self.logger.warning(message)
        else:
            self.logger.error(message)
        raise LibraryError(exception_val)

    def __legacy_i2c_wait(self, caller, phy):
        """
        Wait for the legacy I2C controller to finish the current command.

        The status register is read at most 10 times, 100 ms apart.

        :param caller: method name inserted in the error messages
        :param phy: device address, used only in the error messages
        :raises PluginError: if the controller is still busy after the last
            poll, or if it reports an acknowledge error
        """
        status = i2c_status_busy
        retry = 10
        while status == i2c_status_busy and retry > 0:
            status = self.board["board.i2c.status"]
            time.sleep(0.1)
            retry -= 1

        # Decide on the final status, not on the retry counter: a
        # transaction completing on the very last poll is a success.
        if status == i2c_status_busy:
            message = "tpm_qsfp_adapter.%s: i2c timeout error (PHY %s)" % (
                caller,
                hex(phy),
            )
            self.logger.error(message)
            raise PluginError(message)
        if status == i2c_status_ack_error:
            message = "tpm_qsfp_adapter.%s: i2c ack error (PHY %s)" % (
                caller,
                hex(phy),
            )
            self.logger.error(message)
            raise PluginError(message)

    def __i2c_rd8(
        self, phy, nof_rd_byte=1, keep_locked=False, dbg_flag=False, log_warning=False
    ):
        """
        Read from an I2C device behind this adapter's mux.

        :param phy: device address (shifted right by one bit before use)
        :param nof_rd_byte: number of bytes requested in the command word
        :param keep_locked: keep (and refresh) the I2C lock on success
            instead of releasing it; ignored in legacy mode
        :param dbg_flag: when exactly ``True``, abort right after raising the
            request (debug aid that leaves the operation incomplete)
        :param log_warning: accepted for interface compatibility; the status
            poll always logs its timeout as a warning
        :return: content of the receive register
        :raises PluginError: legacy mode timeout or acknowledge error
        :raises LibraryError: handshake, password or status failure
        """
        add = phy >> 1
        nof_wr_byte = 0
        offset = 0
        cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add

        if self.board.i2c_old_mode:
            self.board["board.i2c.transmit"] = offset & 0xFF
            self.board["board.i2c.command"] = cmd
            self.__legacy_i2c_wait("__i2c_rd8", phy)
            return self.board["board.i2c.receive"]

        self.logger.debug(
            "I2C Read: Locking I2C interface to avoid MCU access conflict "
        )
        self.board._i2c_lock.lock()
        # NOTE(review): the lock stays held if any step below raises -
        # confirm that this is intended.

        if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
            self.logger.warning("Detected Incomplete Previous I2C operation")
            self.board[self.i2c_shadow + self.i2c_req_reg] = 0
        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP not ready!")

        self.set_passwd()
        self.board[self.i2c_shadow + 0x4] = offset & 0xFF
        self.board[self.i2c_shadow + 0x0] = cmd
        # send request
        self.board[self.i2c_shadow + self.i2c_req_reg] = 1

        # wait req acknowledge
        self.logger.debug("i2c read wait req acknowledge")
        # debug flag used to stop SW for debug use
        if dbg_flag is True:
            self.logger.error("ERROR: DEBUG I2C interface")
            raise LibraryError("ERROR: DEBUG I2C interface")
        self.poll_reg(
            self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!"
        )

        # check passwd
        self.logger.debug("i2c read check passwd")
        self.check_passwd()

        # check i2c op status: 0 ok, 1 busy, 2 NACK
        self.logger.debug("i2c read check status")
        self.poll_reg(
            self.i2c_shadow + 0xC,
            0,
            "I2C/EEP busy or not acknowledge!",
            log_warning=True,
        )
        read_data = self.board[self.i2c_shadow + 0x8]

        # Drop the request and wait for the acknowledge to clear.
        self.board[self.i2c_shadow + self.i2c_req_reg] = 0
        self.logger.debug("i2c read check end operation")
        self.poll_reg(
            self.i2c_shadow + self.i2c_ack_reg,
            0,
            "I2C/EEP operation complete not detected!",
        )

        # Unlock I2C interface
        if not keep_locked:
            self.board._i2c_lock.unlock()
            time.sleep(0.020)
        else:
            self.board._i2c_lock.refresh()
        return read_data

    def __i2c_wr8(self, phy, value, offset=0, already_locked=False, nof_wr_byte=1):
        """
        Write to an I2C device behind this adapter's mux.

        :param phy: device address (shifted right by one bit before use)
        :param value: data byte; only the low 8 bits are sent
        :param offset: byte sent before ``value`` when ``nof_wr_byte`` is not 1
        :param already_locked: the I2C lock is already held by the caller and
            is refreshed instead of acquired; ignored in legacy mode
        :param nof_wr_byte: number of bytes written, placed in the command word
        :return: ``True`` on success
        :raises PluginError: legacy mode timeout or acknowledge error
        :raises LibraryError: handshake, password or status failure
        """
        add = phy >> 1
        nof_rd_byte = 0
        cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add

        # Transmit word: the value alone, or offset (low byte) plus value.
        if nof_wr_byte == 1:
            payload = value & 0xFF
        else:
            payload = ((value & 0xFF) << 8) + (offset & 0xFF)

        if self.board.i2c_old_mode:
            self.board["board.i2c.transmit"] = payload
            self.board["board.i2c.command"] = cmd
            self.__legacy_i2c_wait("__i2c_wr8", phy)
            return True

        # Lock I2C interface to avoid MCU access conflict
        if not already_locked:
            self.logger.debug(
                "I2C Write: Locking I2C interface to avoid MCU access conflict "
            )
            self.board._i2c_lock.lock()
        else:
            self.board._i2c_lock.refresh()

        if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
            self.logger.warning("Detected Incomplete Previous I2C operation")
            self.board[self.i2c_shadow + self.i2c_req_reg] = 0
        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP not ready!")

        self.set_passwd()
        self.board[self.i2c_shadow + 0x4] = payload
        self.board[self.i2c_shadow + 0x0] = cmd
        # send request
        self.board[self.i2c_shadow + self.i2c_req_reg] = 1

        # wait req acknowledge
        self.logger.debug("wait req acknowledge")
        self.poll_reg(
            self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!"
        )

        # check passwd
        self.logger.debug("check passwd")
        self.check_passwd()

        # check i2c op status: 0 ok, 1 busy, 2 NACK
        self.logger.debug("check status")
        self.poll_reg(self.i2c_shadow + 0xC, 0, "I2C/EEP busy or not acknowledge!")

        # Drop the request and wait for the acknowledge to clear.
        self.board[self.i2c_shadow + self.i2c_req_reg] = 0
        self.logger.debug("check end operation")
        self.poll_reg(
            self.i2c_shadow + self.i2c_ack_reg,
            0,
            "I2C/EEP operation complete not detected!",
        )

        # Unlock I2C interface
        self.board._i2c_lock.unlock()
        time.sleep(0.020)
        return True

    def status(self, line=None):
        """
        Read the IO expander and report the level of its lines.

        :param line: optional line name from ``IOEXP_map``
        :return: ``True``/``False`` for the requested line, or a dict mapping
            every line name to 1 or 0 when ``line`` matches no entry;
            ``None`` if the read returned ``None``
        """
        value = self.__i2c_rd8(PHY_ioexp)
        if value is None:
            return None

        levels = {}
        for name, config in IOEXP_map.items():
            level = (value >> config["ch"]) & 1
            if line == name:
                return level == 1
            levels[name] = level
        return levels

    def set(self, line, value):
        """
        Drive one output line of the IO expander (read-modify-write).

        :param line: line name from ``IOEXP_map``; must be flagged as output
        :param value: a value greater than 0 sets the line, anything else
            clears it
        :return: result of the I2C write, or ``None`` if the read returned
            ``None``
        :raises PluginError: if the line is not an output
        """
        if not IOEXP_map[line]["output"]:
            message = "tpm_qsfp_adapter.set: line %s is NOT output" % line
            self.logger.error(message)
            raise PluginError(message)

        # Keep the I2C lock between the read and the write.
        actual_value = self.__i2c_rd8(PHY_ioexp, keep_locked=True)
        if actual_value is None:
            return None

        mask = 1 << IOEXP_map[line]["ch"]
        if value > 0:
            actual_value |= mask
        else:
            actual_value &= ~mask

        # input ports must be set to 1 even if read at 0
        for config in IOEXP_map.values():
            if not config["output"]:
                actual_value |= 1 << config["ch"]
        return self.__i2c_wr8(PHY_ioexp, actual_value, already_locked=True)

    def get(self, line, dbg_flag=False):
        """
        Read the level of a single IO expander line.

        :param line: line name from ``IOEXP_map``
        :param dbg_flag: forwarded to the I2C read (debug abort)
        :return: 1 or 0
        """
        actual_value = self.__i2c_rd8(PHY_ioexp, dbg_flag=dbg_flag)
        return (actual_value >> IOEXP_map[line]["ch"]) & 1

    def qsfp_read(self, offset=0, len=16):
        """
        Read bytes from the QSFP module plugged into this adapter.

        The read is attempted only when the ``ModPrsL`` line reads 0.

        :param offset: first byte offset to read
        :param len: number of bytes to read
        :return: the bytes returned by the board EEPROM reader, or ``None``
            when ``ModPrsL`` is not 0
        """
        if self.status("ModPrsL"):
            self.logger.info(
                "QSFP %d: NA cannot dump read from QSFP" % self._core_id
            )
            return None
        return self.board.tpm_eep._rd_bytearray(
            offset, len, mux=self.i2c_mux, add=PHY_QSFP
        )

    def adapter_eep_write(self, offset=0, value=0):
        """
        Write one byte to the adapter EEPROM.

        :param offset: byte offset inside the EEPROM
        :param value: byte to write
        """
        self.__i2c_wr8(PHY_EEP, value, offset, nof_wr_byte=2)

    def get_line_mapping(self):
        """Return the module-level IO expander line map (``IOEXP_map``)."""
        return IOEXP_map

    def irq_status(self):
        """
        Report the state of this adapter's bit in ``board.regfile.eth10ge.psnt``.

        :return: ``True`` when bit ``core_id`` of the register is 0,
            ``False`` when it is 1
        """
        masked = self.board["board.regfile.eth10ge.psnt"] & (1 << self._core_id)
        return not masked > 0

    def wr32_eep(self, offset, data):
        """
        Write a 32-bit value to the adapter EEPROM, most significant byte first.

        :param offset: offset of the first (most significant) byte
        :param data: value to write
        """
        for index, shift in enumerate((24, 16, 8, 0)):
            self.adapter_eep_write(offset + index, (data >> shift) & 0xFF)

    def eep_wr_string(self, partition, string):
        """
        Write a string into an EEPROM section.

        :param partition: section descriptor providing ``offset`` and ``size``
        :param string: text to store
        """
        return self._eep_wr_string(partition["offset"], string, partition["size"])

    def _eep_wr_string(self, offset, string, max_len=16):
        """
        Write ``string`` one character per byte starting at ``offset``.

        Writing stops once ``max_len`` bytes have been written; when room
        is left, a newline byte is appended after the last character.

        :param offset: EEPROM offset of the first character
        :param string: text to store
        :param max_len: size of the destination area in bytes
        """
        end = offset + max_len
        addr = offset
        for char in string:
            self.adapter_eep_write(addr, ord(char))
            addr += 1
            if addr >= end:
                break
        if addr < end:
            self.adapter_eep_write(addr, ord("\n"))

    def get_field(self, key):
        """
        Read and decode one field of the adapter EEPROM.

        :param key: field name, a key of ``eep_sec``
        :return: decoded value according to the field type ("ip",
            "bytearray", "string" or "uint", the latter big-endian);
            ``None`` if the EEPROM is absent or the type is unknown
        """
        if not self.eep_present:
            return None

        section = self.eep_sec[key]
        kind = section["type"]
        eep = self.board.tpm_eep
        if kind == "ip":
            return long2ip(eep.rd32(section["offset"], mux=self.i2c_mux, add=PHY_EEP))
        if kind == "bytearray":
            return eep._rd_bytearray(
                section["offset"], section["size"], mux=self.i2c_mux, add=PHY_EEP
            )
        if kind == "string":
            return eep._rd_string(
                section["offset"], section["size"], mux=self.i2c_mux, add=PHY_EEP
            )
        if kind == "uint":
            raw = eep._rd_bytearray(
                section["offset"], section["size"], mux=self.i2c_mux, add=PHY_EEP
            )
            return int.from_bytes(raw, "big")
        return None

    def set_field(self, key, value, override_protected=False):
        """
        Encode and write one field of the adapter EEPROM.

        Multi-byte integers are stored most significant byte first, which
        is the order get_field uses to decode "uint" fields.

        :param key: field name, a key of ``eep_sec``
        :param value: value to store: dotted IP for "ip", text for "string",
            int for "uint", int or bytes/bytearray of the exact section
            size for "bytearray"
        :param override_protected: allow writing a protected field
        :raises LibraryError: if the EEPROM is absent, the field is protected
            and not overridden, or a bytes value has the wrong length
        """
        if not self.eep_present:
            raise LibraryError("Writing attempt on EEPROM that is not available")

        section = self.eep_sec[key]
        if not (section["protected"] is False or override_protected):
            raise LibraryError("Writing attempt on protected sector %s" % key)

        kind = section["type"]
        base = section["offset"]
        size = section["size"]
        if kind == "ip":
            self.wr32_eep(base, ip2long(value))
        elif kind == "string":
            self.eep_wr_string(section, value)
        elif kind in ("bytearray", "uint"):
            if kind == "bytearray" and isinstance(value, (bytes, bytearray)):
                # Raw bytes, as returned by get_field, are written verbatim.
                if len(value) != size:
                    raise LibraryError(
                        "Field %s requires %d bytes, got %d" % (key, size, len(value))
                    )
                data = list(value)
            else:
                data = [
                    (value >> (8 * (size - 1 - index))) & 0xFF
                    for index in range(size)
                ]
            for index, byte in enumerate(data):
                self.adapter_eep_write(base + index, byte)

    def irq_clear(self):
        """Read the IO expander once; the returned value is discarded."""
        self.__i2c_rd8(PHY_ioexp)