Source code for ska_low_sps_tpm_api.plugins.jesd
from time import sleep
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
__author__ = "Alessio Magro"
[docs]
class TpmJesd(FirmwareBlock):
    """TpmJesd plugin.

    Firmware-block plugin that reads and writes the
    ``<fpga>.jesd204_if.*`` registers of a board for one JESD core
    (``core_id_<core>``) of one FPGA (``fpga1`` or ``fpga2``).
    """
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("tpm_jesd")
@maxinstances(4)
def __init__(self, board, logger=None, **kwargs):
    """TpmJesd initialiser.

    :param board: Pointer to board instance
    :param logger: Logger forwarded to the FirmwareBlock initialiser
    :param kwargs: Plugin options:

        * ``device`` (required): ``Device.FPGA_1`` selects ``fpga1``,
          any other value selects ``fpga2``
        * ``core`` (required): index of the JESD core handled by this
          instance
        * ``frame_length`` (optional, default 216): the value written to
          ``regfile_axi4_tlast_period`` is ``frame_length - 1``
        * ``board_type`` (optional, default ``"XTPM"``)
    :raises PluginError: if ``device`` or ``core`` is missing
    """
    super(TpmJesd, self).__init__(board, logger=logger)

    # Membership is tested on the dict itself; no throwaway key list.
    if "device" not in kwargs:
        raise PluginError("TpmJesd: device required")
    if "core" not in kwargs:
        # The message names the keyword that is actually looked up.
        raise PluginError("TpmJesd: core required")

    if "frame_length" in kwargs:
        frame_length = kwargs["frame_length"] - 1
    else:
        self.logger.info("TpmJesd: frame_length not specified, using default 216.")
        frame_length = 216 - 1

    self._board_type = kwargs.get("board_type", "XTPM")
    self._fpga = "fpga1" if kwargs["device"] == Device.FPGA_1 else "fpga2"
    self._core = kwargs["core"]

    # The register is programmed with the frame length minus one.
    self.board[f"{self._fpga}.jesd204_if.regfile_axi4_tlast_period"] = frame_length
#######################################################################################
[docs]
def jesd_core_start(
    self,
    single_lane=False,
    octects_per_frame=None,
    lane_in_use=0xFF,
    disable_core=False,
):
    """Configure and reset this instance's FPGA-internal JESD core.

    Performs the JESD core configuration and initialisation procedure as
    implemented in the ADI demo.

    :param single_lane: when true ``octets_per_frame`` is first written
        as 0x3, otherwise as 0x0
    :param octects_per_frame: if not None, written to
        ``octets_per_frame`` after the default value
    :param lane_in_use: value written to ``lanes_in_use``
    :param disable_core: when true, first OR 0xFF (shifted to this core's
        byte) into ``regfile_channel_disable`` and OR this core's bit into
        ``regfile_sync_force``; the configuration below still runs
    """
    jesd_if = f"{self._fpga}.jesd204_if"
    core_prefix = f"{jesd_if}.core_id_{self._core}"

    if disable_core:
        self.logger.info(f"TpmJesd: disabling core{self._core}")
        # Read-modify-write so bits belonging to other cores are kept.
        current = self.board[f"{jesd_if}.regfile_channel_disable"]
        self.board[f"{jesd_if}.regfile_channel_disable"] = (
            0xFF << (self._core * 8)
        ) | current
        current = self.board[f"{jesd_if}.regfile_sync_force"]
        self.board[f"{jesd_if}.regfile_sync_force"] = (0x1 << self._core) | current

    # Per-core register writes, listed in the exact order they are issued.
    sequence = [
        ("ila_support", 0x1),
        ("sysref_handling", 0x10000),
        ("scrambling", 0x1),
        ("lanes_in_use", lane_in_use),
        ("octets_per_frame", 0x3 if single_lane else 0x0),
        ("frames_per_multiframe", 0x1F),
    ]
    if octects_per_frame is not None:
        # An explicit value is written on top of the default one.
        sequence.append(("octets_per_frame", octects_per_frame))
    sequence.extend(
        [
            ("error_reporting", 0x0),
            ("error_reporting", 0x101),
            ("subclass_mode", 0x1),
            ("reset", 0x1),
        ]
    )

    for suffix, value in sequence:
        self.board[f"{core_prefix}_{suffix}"] = value

    self.board[f"{jesd_if}.regfile_pol_switch"] = 0x0
[docs]
def jesd_core_restart(self):
    """Write 0x1 to the ``reset`` register of this instance's JESD core."""
    # NOTE: an earlier variant toggled regfile_ctrl.reset_n (0x0 then 0x1)
    # before this write; that step is disabled.
    reset_register = f"{self._fpga}.jesd204_if.core_id_{self._core}_reset"
    self.board[reset_register] = 0x1
[docs]
def jesd_core_check(self, max_retries=4, retry_delay=0.2):
    """Poll ``regfile_status`` until its low five bits read 0x1E.

    :param max_retries: maximum number of times the status register is
        read (at least one read is always performed)
    :param retry_delay: seconds to wait between two consecutive reads
    :raises BoardError: if the expected value is not seen within
        ``max_retries`` reads
    """
    attempts = max(1, max_retries)
    status_register = f"{self._fpga}.jesd204_if.regfile_status"

    for attempt in range(attempts):
        if (self.board[status_register] & 0x1F) == 0x1E:
            return
        # Only wait when another read follows; sleeping right before
        # raising would just delay the error.
        if attempt + 1 < attempts:
            sleep(retry_delay)

    raise BoardError("TpmJesd: Could not restart JESD cores")
[docs]
def jesd_lane_zero(self, lanes):
    """Write ``lanes`` to the ``regfile_channel_disable`` register.

    :param lanes: value stored in the register as-is (presumably a
        per-lane bitmask -- confirm against the firmware register map)
    """
    channel_disable = f"{self._fpga}.jesd204_if.regfile_channel_disable"
    self.board[channel_disable] = lanes
[docs]
def disable_all_lanes(self):
    """Write 0xFFFF to the ``regfile_channel_disable`` register."""
    channel_disable = f"{self._fpga}.jesd204_if.regfile_channel_disable"
    self.board[channel_disable] = 0xFFFF
[docs]
def enable_all_lanes(self):
    """Write 0x0 to the ``regfile_channel_disable`` register."""
    channel_disable = f"{self._fpga}.jesd204_if.regfile_channel_disable"
    self.board[channel_disable] = 0x0
[docs]
def check_link_error_status(self):
    """Check the per-lane error codes in ``link_error_status_0``.

    The register is decoded as eight consecutive 3-bit fields, lowest
    bits first; every non-zero field is logged as an error.

    :return: True if all eight fields are zero, False otherwise
    """
    # link_error_status_1 is not read because lanes 8-11 are unused
    status = self.board[
        f"{self._fpga}.jesd204_if.core_id_{self._core}_link_error_status_0"
    ]
    all_lanes_ok = True
    for lane, shift in enumerate(range(0, 24, 3)):
        code = (status >> shift) & 0x7
        if code == 0:
            continue
        self.logger.error(
            f"{self._fpga.upper()} Lane {self._core * 8 + lane} error detected! Error code: {code}"
        )
        all_lanes_ok = False
    return all_lanes_ok
[docs]
def check_sync_status(self):
    """Report whether bits 0 and 16 of ``sync_status`` are both set.

    Bit 16 - a SYSREF event has been captured.
    Bit 0  - link SYNC achieved.

    :return: True only if both bits are high
    """
    required_bits = (1 << 16) | 1
    status = self.board[f"{self._fpga}.jesd204_if.core_id_{self._core}_sync_status"]
    return bool((status & required_bits) == required_bits)
[docs]
def check_link_error_counter(self):
    """Read the link error count register of each of the eight lanes.

    :return: dict mapping ``"lane0"`` .. ``"lane7"`` to the value read
        from the matching ``lane_<n>_link_error_count`` register
    """
    core_prefix = f"{self._fpga}.jesd204_if.core_id_{self._core}"
    return {
        f"lane{lane}": self.board[f"{core_prefix}_lane_{lane}_link_error_count"]
        for lane in range(8)
    }
[docs]
def check_resync_counter(self, show_result=True):
    """Read the resync counter from ``regfile_status.resync_cnt``.

    Resync count is implemented in firmware to increment on each rising
    edge of the sync signal. Sync signal is the AND of both JESD cores.

    :param show_result: when true, the count is also logged at info level
    :return: the value read from the register
    """
    resync_count = self.board[f"{self._fpga}.jesd204_if.regfile_status.resync_cnt"]
    if not show_result:
        return resync_count
    self.logger.info(f"{self._fpga.upper()} resync count {resync_count}")
    return resync_count
[docs]
def check_qpll_lock_loss_counter(self, show_result=True):
    """Read the counter in ``regfile_status.qpll_lock_loss_cnt``.

    :param show_result: when true, the count is also logged at info level
    :return: the value read from the register
    """
    loss_count = self.board[
        f"{self._fpga}.jesd204_if.regfile_status.qpll_lock_loss_cnt"
    ]
    if not show_result:
        return loss_count
    self.logger.info(f"{self._fpga.upper()} qpll lock loss count {loss_count}")
    return loss_count
[docs]
def check_qpll_lock_status(self):
    """Report whether ``regfile_status.qpll_locked`` reads greater than zero.

    :return: result of comparing the register value with zero
    """
    qpll_locked = self.board[f"{self._fpga}.jesd204_if.regfile_status.qpll_locked"]
    return qpll_locked > 0
[docs]
def clear_error_counters(self):
    """Clear this core's error counters and the shared status counters.

    Writes 1, 0, 1 in turn to the core's ``error_reporting`` register and
    then writes 1 to ``regfile_status.cnt_reset``.
    """
    error_reporting = f"{self._fpga}.jesd204_if.core_id_{self._core}_error_reporting"
    for value in (1, 0, 1):
        self.board[error_reporting] = value

    # Clear resync counter and qpll lock loss counter
    self.board[f"{self._fpga}.jesd204_if.regfile_status.cnt_reset"] = 1
##################### Superclass method implementations #################################
[docs]
def initialise(self):
    """Initialise TpmJesd.

    Only logs an info message; no register is accessed here.

    :return: always True
    """
    log = self.logger
    log.info("TpmJesd has been initialised")
    return True
[docs]
def status_check(self):
    """Perform status check.

    Only logs an info message; no register is read.

    :return: always ``Status.OK``
    """
    log = self.logger
    log.info("TpmJesd : Checking status")
    return Status.OK
[docs]
def clean_up(self):
    """Perform cleanup.

    Only logs an info message; nothing is released or reset.

    :return: always True
    """
    log = self.logger
    log.info("TpmJesd : Cleaning up")
    return True