import time
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
MAX_RETRY = 1000
IOEXP_map = {
"LED0n": {"ch": 1, "output": True},
"LED1n": {"ch": 2, "output": True},
"LED2n": {"ch": 3, "output": True},
"LED3n": {"ch": 0, "output": True},
"ModPrsL": {"ch": 4, "output": False},
"IntL": {"ch": 5, "output": False},
"ResetL": {"ch": 6, "output": True},
"LPMode": {"ch": 7, "output": True},
}
i2c_status_busy = 0x1
i2c_status_ack_error = 0x2
itpm_cpld_i2c_mux = [1, 2]
PHY_ioexp = 0x40 # PCF8574TS
PHY_QSFP = 0xA0
PHY_EEP = 0xA6
[docs]
class TpmQSFPAdapter(FirmwareBlock):
"""TpmQSFPAdapter plugin"""
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("tpm_qsfp_adapter")
@maxinstances(2)
def __init__(self, board, logger=None, **kwargs):
"""
TpmQSFPAdapter initialiser.
:param board: Pointer to board instance
:param core_id: Index of plugin instance to select QSFP_ADAPTER 0 or 1
"""
super(TpmQSFPAdapter, self).__init__(board, logger=logger)
self.i2c_shadow = 0x90000270
self.i2c_req_reg = 0x50
self.i2c_ack_reg = 0x54
self.mcu_fwver_reg = self.board["board.bram_cpu"]
if "core_id" not in list(kwargs.keys()):
raise PluginError("TpmQSFPAdapter: Require a core_id parameter")
self._core_id = kwargs["core_id"]
if self._core_id < 0 or self._core_id > len(itpm_cpld_i2c_mux) - 1:
raise PluginError(
"TpmQSFPAdapter: core_id out of range, must be smaller than %d"
% len(itpm_cpld_i2c_mux)
)
self.i2c_mux = itpm_cpld_i2c_mux[self._core_id]
self.eep_sec = {
"SN": {
"offset": 0x00,
"size": 16,
"name": "SN",
"type": "string",
"protected": True,
},
"PN": {
"offset": 0x10,
"size": 16,
"name": "PN",
"type": "string",
"protected": True,
},
"HARDWARE_REV": {
"offset": 0x20,
"size": 3,
"name": "HARDWARE_REV",
"type": "bytearray",
"protected": True,
},
"PCB_REV": {
"offset": 0x23,
"size": 1,
"name": "PCB_REV",
"type": "string",
"protected": True,
},
"MAC": {
"offset": 0xFA,
"size": 6,
"name": "MAC",
"type": "bytearray",
"protected": True,
}, # READ-ONLY
}
if self._core_id == 0:
self.eep_sec["TPM_EXT_LABEL_3"] = {
"offset": 0x24,
"size": 12,
"name": "TPM_EXT_LABEL_3",
"type": "string",
"protected": True,
}
self.eep_sec["FE_0_LABEL"] = {
"offset": 0x30,
"size": 40,
"name": "FE_0_LABEL",
"type": "string",
"protected": True,
}
self.eep_sec["FE_1_LABEL"] = {
"offset": 0x58,
"size": 40,
"name": "FE_1_LABEL",
"type": "string",
"protected": True,
}
try:
self.__i2c_rd8(PHY_ioexp)
self.present = True
except:
self.logger.error(
f"TpmQSFPAdapter{self._core_id}: I2C busy or not acknowledge on IO expander! Assumed QSFPAdapter missing"
)
self.present = False
try:
self.__i2c_rd8(PHY_EEP)
self.eep_present = True
except:
self.logger.warning(
f"TpmQSFPAdapter{self._core_id}: I2C busy or not acknowledge on EEPROM! Assumed EEPROM of QSFPAdapter missing"
)
self.eep_present = False
self.__board_info = None
# self.get_board_info()
[docs]
def get_board_info(self):
if self.present:
if self.__board_info is None:
self.__board_info = {
"SN": None,
"PN": None,
"HARDWARE_REV": "",
}
if not self.eep_present:
self.__board_info["HARDWARE_REV"] = "<= v1.2.1"
return self.__board_info
self.__board_info["SN"] = self.get_field("SN")
self.__board_info["PN"] = self.get_field("PN")
pcb_rev = self.get_field("PCB_REV")
if pcb_rev == 0xFF:
pcb_rev_string = ""
else:
pcb_rev_string = str(pcb_rev)
hw_rev = self.get_field("HARDWARE_REV")
self.__board_info["HARDWARE_REV"] = (
"v"
+ str(hw_rev[0])
+ "."
+ str(hw_rev[1])
+ "."
+ str(hw_rev[2])
+ pcb_rev_string
)
return self.__board_info
[docs]
def set_passwd(self):
if self.board.i2c_old_mode:
rd = self.board["board.i2c.mac_hi"]
self.board["board.i2c.password"] = rd
rd = self.board["board.i2c.mac_lo"]
self.board["board.i2c.password_lo"] = rd
rd = self.board["board.i2c.password"]
if rd & 0x10000 == 0:
raise LibraryError("I2C/EEP password not accepted!")
else:
rd = self.board["board.i2c.mac_hi"]
self.board[self.i2c_shadow + 0x3C] = rd
rd = self.board["board.i2c.mac_lo"]
self.board[self.i2c_shadow + 0x38] = rd
[docs]
def check_passwd(self):
rd = self.board[self.i2c_shadow + 0x3C]
if (rd & 0x10000) != 0x10000:
self.logger.error("I2C/EEP password not accepted, rd %x!" % rd)
raise LibraryError("I2C/EEP password not accepted!")
[docs]
def remove_passwd(self):
if self.board.i2c_old_mode:
self.board["board.i2c.password"] = 0
self.board["board.i2c.password_lo"] = 0
else:
self.board[self.i2c_shadow + 0x3C] = 0
self.board[self.i2c_shadow + 0x38] = 0
[docs]
def poll_reg(self, reg, exit_condition, exception_val, log_warning=False):
retry = 0
while True:
if (self.board[reg]) == exit_condition:
break
else:
time.sleep(0.001)
retry = retry + 1
if self.board._i2c_lock.is_locked:
self.board._i2c_lock.refresh()
if retry >= MAX_RETRY:
message = f"EEP Polling Reg timeout: {exception_val}"
if log_warning:
self.logger.warning(message)
else:
self.logger.error(message)
raise LibraryError(exception_val)
def __i2c_rd8(
self, phy, nof_rd_byte=1, keep_locked=False, dbg_flag=False, log_warning=False
):
add = phy >> 1
nof_wr_byte = 0
offset = 0
cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add
if self.board.i2c_old_mode:
self.board["board.i2c.transmit"] = offset & 0xFF
self.board["board.i2c.command"] = cmd
status = i2c_status_busy
retry = 10
while status == i2c_status_busy:
status = self.board["board.i2c.status"]
time.sleep(0.1)
retry -= 1
pass
if retry == 0:
self.logger.error(
"tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy)
)
raise PluginError(
"tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy)
)
return None
if status == i2c_status_ack_error:
self.logger.error(
"tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy)
)
raise PluginError(
"tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy)
)
return None
if retry == 0:
self.logger.error(
"tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy)
)
raise PluginError(
"tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy)
)
return None
if status == i2c_status_ack_error:
self.logger.error(
"tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy)
)
raise PluginError(
"tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy)
)
return None
return self.board["board.i2c.receive"]
else:
self.logger.debug(
"I2C Read: Locking I2C interface to avoid MCU access conflict "
)
self.board._i2c_lock.lock()
if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
self.logger.warning("Detected Incomplete Previous I2C operation")
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP not ready!")
self.set_passwd()
self.board[self.i2c_shadow + 0x4] = offset & 0xFF
self.board[self.i2c_shadow + 0x0] = cmd
# send request
self.board[self.i2c_shadow + self.i2c_req_reg] = 1
# wait req acknowledge
self.logger.debug("i2c read wait req acknowledge")
# debug flag used to stop SW for debug use
if dbg_flag is True:
self.logger.error("ERROR: DEBUG I2C interface")
raise LibraryError("ERROR: DEBUG I2C interface")
self.poll_reg(
self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!"
)
# check passwd
self.logger.debug("i2c read check passwd")
self.check_passwd()
# check i2c op status: 0 ok, 1 busy, 2 NACK
self.logger.debug("i2c read check status")
self.poll_reg(
self.i2c_shadow + 0xC,
0,
"I2C/EEP busy or not acknowledge!",
log_warning=True,
)
read_data = self.board[self.i2c_shadow + 0x8]
# self.remove_passwd()
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
self.logger.debug("i2c read check end operation")
self.poll_reg(
self.i2c_shadow + self.i2c_ack_reg,
0,
"I2C/EEP operation complete not detected!",
)
# Unlock I2C interface
if not keep_locked:
self.board._i2c_lock.unlock()
time.sleep(0.020)
else:
self.board._i2c_lock.refresh()
return read_data
def __i2c_wr8(self, phy, value, offset=0, already_locked=False, nof_wr_byte=1):
add = phy >> 1
nof_rd_byte = 0
# nof_wr_byte = 1
cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add
if self.board.i2c_old_mode:
if nof_wr_byte == 1:
self.board["board.i2c.transmit"] = value & 0xFF
else:
self.board["board.i2c.transmit"] = ((value & 0xFF) << 8) + (
offset & 0xFF
)
self.board["board.i2c.command"] = cmd
status = i2c_status_busy
retry = 10
while status == i2c_status_busy and retry > 0:
status = self.board["board.i2c.status"]
time.sleep(0.1)
retry -= 1
pass
if retry == 0:
self.logger.error(
"tpm_qsfp_adapter.__i2c_wr8: i2c timeout error (PHY %s)" % hex(phy)
)
raise PluginError(
"tpm_qsfp_adapter.__i2c_wr8: i2c timeout error (PHY %s)" % hex(phy)
)
return None
if status == i2c_status_ack_error:
self.logger.error(
"tpm_qsfp_adapter.__i2c_wr8: i2c ack error (PHY %s)" % hex(phy)
)
raise PluginError(
"tpm_qsfp_adapter.__i2c_wr8: i2c ack error (PHY %s)" % hex(phy)
)
return None
return True
else:
# Lock I2C interface to avoid MCU access conflict
if not already_locked:
self.logger.debug(
"I2C Write: Locking I2C interface to avoid MCU access conflict "
)
self.board._i2c_lock.lock()
else:
self.board._i2c_lock.refresh()
if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
self.logger.warning("Detected Incomplete Previous I2C operation")
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP not ready!")
self.set_passwd()
if nof_wr_byte == 1:
self.board[self.i2c_shadow + 0x4] = value & 0xFF
else:
self.board[self.i2c_shadow + 0x4] = ((value & 0xFF) << 8) + (
offset & 0xFF
)
self.board[self.i2c_shadow + 0x0] = cmd
# send request
self.board[self.i2c_shadow + self.i2c_req_reg] = 1
# wait req acknowledge
self.logger.debug("wait req acknowledge")
self.poll_reg(
self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!"
)
# check passwd
self.logger.debug("check passwd")
self.check_passwd()
# check i2c op status: 0 ok, 1 busy, 2 NACK
self.logger.debug("check status")
self.poll_reg(self.i2c_shadow + 0xC, 0, "I2C/EEP busy or not acknowledge!")
# read_data = self.board[self.i2c_shadow + 0x8]
# self.remove_passwd()
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
self.logger.debug("check end operation")
self.poll_reg(
self.i2c_shadow + self.i2c_ack_reg,
0,
"I2C/EEP operation complete not detected!",
)
# Unlock I2C interface
self.board._i2c_lock.unlock()
time.sleep(0.020)
return True
[docs]
def status(self, line=None):
# if line is None:
# self.logger.info("= IOEXP_map " + str(self._core_id) +" =")
value = self.__i2c_rd8(PHY_ioexp)
if value is None:
return None
# self.logger.info(bin(value))
ret_value = {}
for key in IOEXP_map:
i = IOEXP_map[key]["ch"]
# for i in [1,2,3,0,4,5,6,7]:
if value & 2**i > 0:
ret_value[key] = 1
# if line is None:
# self.logger.info(key + "\t1")
# el
if line == key:
return True
else:
ret_value[key] = 0
# if line is None:
# self.logger.info(key + "\t0")
# el
if line == key:
return False
return ret_value
[docs]
def set(self, line, value):
if IOEXP_map[line]["output"]:
actual_value = self.__i2c_rd8(PHY_ioexp, keep_locked=True)
if actual_value is None:
return None
i = IOEXP_map[line]["ch"]
if value > 0:
actual_value = actual_value | 2**i
else:
actual_value = actual_value & ~(2**i)
# self.logger.info("IOEXP_map " + str(self._core_id) +" "+line+" "+hex(value))
# input ports must be set to 1 even if read at 0
for key in IOEXP_map:
if not IOEXP_map[key]["output"]:
i = IOEXP_map[key]["ch"]
actual_value = actual_value | 2**i
return self.__i2c_wr8(PHY_ioexp, actual_value, already_locked=True)
else:
self.logger.error("tpm_qsfp_adapter.set: line %s is NOT output" % line)
raise PluginError("tpm_qsfp_adapter.set: line %s is NOT output" % line)
[docs]
def get(self, line, dbg_flag=False):
actual_value = self.__i2c_rd8(PHY_ioexp, dbg_flag=dbg_flag)
i = IOEXP_map[line]["ch"]
value = (actual_value & 2**i) >> i
# self.logger.info("IOEXP_map " + str(self._core_id) +" "+line+" "+hex(value))
return value
[docs]
def qsfp_read(self, offset=0, len=16):
if not self.status("ModPrsL"):
arr = self.board.tpm_eep._rd_bytearray(
offset, len, mux=self.i2c_mux, add=PHY_QSFP
)
return arr
else:
self.logger.info("QSFP %d: NA cannot dump read from QSFP")
return None
[docs]
def adapter_eep_write(self, offset=0, value=0):
arr = bytearray()
string = ""
self.__i2c_wr8(PHY_EEP, value, offset, nof_wr_byte=2)
[docs]
def get_line_mapping(self):
return IOEXP_map
[docs]
def irq_status(self):
irq = self.board["board.regfile.eth10ge.psnt"] & 2**self._core_id
if irq > 0:
return False
else:
return True
[docs]
def wr32_eep(self, offset, data):
for n in range(4):
self.adapter_eep_write(offset + n, (data >> 8 * (3 - n)) & 0xFF)
return
[docs]
def eep_wr_string(self, partition, string):
return self._eep_wr_string(partition["offset"], string, partition["size"])
def _eep_wr_string(self, offset, string, max_len=16):
addr = offset
for i in range(len(string)):
self.adapter_eep_write(addr, ord(string[i]))
addr += 1
if addr >= offset + max_len:
break
if addr < offset + max_len:
self.adapter_eep_write(addr, ord("\n"))
[docs]
def get_field(self, key):
if not self.eep_present:
return None
if self.eep_sec[key]["type"] == "ip":
return long2ip(
self.board.tpm_eep.rd32(
self.eep_sec[key]["offset"], mux=self.i2c_mux, add=PHY_EEP
)
)
elif self.eep_sec[key]["type"] == "bytearray":
arr = self.board.tpm_eep._rd_bytearray(
self.eep_sec[key]["offset"],
self.eep_sec[key]["size"],
mux=self.i2c_mux,
add=PHY_EEP,
)
return arr
elif self.eep_sec[key]["type"] == "string":
return self.board.tpm_eep._rd_string(
self.eep_sec[key]["offset"],
self.eep_sec[key]["size"],
mux=self.i2c_mux,
add=PHY_EEP,
)
elif self.eep_sec[key]["type"] == "uint":
arr = self.board.tpm_eep._rd_bytearray(
self.eep_sec[key]["offset"],
self.eep_sec[key]["size"],
mux=self.i2c_mux,
add=PHY_EEP,
)
val = int.from_bytes(arr, "big")
return val
[docs]
def set_field(self, key, value, override_protected=False):
if not self.eep_present:
raise LibraryError("Writing attempt on EEPROM that is not available")
if self.eep_sec[key]["protected"] is False or override_protected:
if self.eep_sec[key]["type"] == "ip":
self.wr32_eep(self.eep_sec[key]["offset"], ip2long(value))
elif self.eep_sec[key]["type"] == "bytearray":
for offset in range(self.eep_sec[key]["size"]):
self.adapter_eep_write(self.eep_sec[key]["offset"] + offset,
((value & (0xff << (8*(self.eep_sec[key]["size"]-1-offset))))
>> (8*(self.eep_sec[key]["size"]-1-offset))) & 0xff) # fmt: skip
elif self.eep_sec[key]["type"] == "string":
self.eep_wr_string(self.eep_sec[key], value)
elif self.eep_sec[key]["type"] == "uint":
val = value
for offset in range(self.eep_sec[key]["size"]):
self.adapter_eep_write(
self.eep_sec[key]["offset"] + offset, val & 0xFF
)
val = val >> 8
else:
raise LibraryError("Writing attempt on protected sector %s" % key)
[docs]
def irq_clear(self):
self.__i2c_rd8(PHY_ioexp)