import time
import numpy as np
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
__author__ = "Riccardo Chiello"
[docs]
def antenna_buffer_implemented(func):
"""
Checks if the antenna buffer is implemented in the firmware. This is called as a decorator from tile methods
:return: True if antenna buffer is implemented, else raise Exception error
"""
def inner_func(self, *args, **kwargs):
try:
implemented = (
self.tpm[f"fpga1.dsp_regfile.feature.antenna_buffer_implemented"]
and self.tpm[f"fpga2.dsp_regfile.feature.antenna_buffer_implemented"]
)
if not implemented:
raise LibraryError(f"Antenna Buffer not implemented by FPGA firmware")
except Exception as e:
raise LibraryError(f"Antenna Buffer not implemented by FPGA firmware")
return func(self, *args, **kwargs)
return inner_func
[docs]
class AntennaBuffer(FirmwareBlock):
"""AntennaBuffer plugin"""
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("tpm_antenna_buffer")
@maxinstances(2)
def __init__(self, board, samples_per_frame=864, logger=None, **kwargs):
"""AntennaBuffer initialiser.
:param board: Pointer to board instance
"""
super(AntennaBuffer, self).__init__(board, logger=logger)
if "device" not in list(kwargs.keys()):
raise PluginError("AntennaBuffer: Require a node instance")
self._device = kwargs["device"]
if self._device == Device.FPGA_1:
self._device = "fpga1"
elif self._device == Device.FPGA_2:
self._device = "fpga2"
else:
raise PluginError(f"AntennaBuffer: Invalid device {self._device}")
# FPGA DDR capacity in number of GB
self._ddr_capacity_gigabyte = int(self.board.board_info["DDR_SIZE_GB"])
# tracks antenna buffer mode: continuous or non-continuous
self._continuous_mode = False
# antenna buffer payload length, default set to 1024
self._antenna_buffer_payload_length = 1024
# max number of buffered antennas
self._max_nof_antenna = 2
# number of buffered antennas
self._nof_antenna = 2
# number of ADC samples in a frame
self._samples_per_frame = samples_per_frame
# number of frames in one timestamp
self._nof_frame_in_timestamp = 256
# number of buffered timestamp, calculated furing buffer allocation
self._nof_ddr_timestamp = 0
# ddr_timestamp_byte_size = nof_samples * pols * nof_antenna * nof_frames, 1 byte per sample
self._ddr_timestamp_byte_size = (
self._samples_per_frame
* 2
* self._nof_antenna
* self._nof_frame_in_timestamp
)
# antenna buffer ddr write length in bytes
self._ddr_write_length_byte = 0
#######################################################################################
[docs]
def select_nof_antenna(self, antennas):
"""
Selects the number of antenna to calculate the DDR byte size per timestamp and to assign the antenna IDs
:param antennas: nof_antenna
:type antennas: list of int
"""
# Check a none empty antenna list is given
if not antennas:
raise Exception(f"AntennaBuffer ERROR: {self._device} no antennas given")
# Check that the number of antennas selected is not more than the maximum supported per FPGA (currently this is 2)
if len(antennas) > self._max_nof_antenna:
raise Exception(
f"AntennaBuffer ERROR: {self._device} the number of selected antennas: {len(antennas)}, exceeds the number of supported antennas: {self._max_nof_antenna}"
)
# Update the nof selected antennas to the Hardware register
self._nof_antenna = len(antennas)
self.board[f"{self._device}.antenna_buffer.input_sel.no_of_antenna"] = (
self._nof_antenna
)
# Calcualte the DDR byte size per timestamp required for the selected number of antenna/s
self._ddr_timestamp_byte_size = (
self._samples_per_frame
* 2
* self._nof_antenna
* self._nof_frame_in_timestamp
)
# Assigning the antenna ID (0-7 per FPGA) to the selected antenna/s
self.logger.info(
f"AntennaBuffer: {self._device} Number of antennas selected = {self._nof_antenna}"
)
# Clear all sel_antenna_ids
for antenna_index in range(4):
self.board[
f"{self._device}.antenna_buffer.input_sel.sel_antenna_id_{antenna_index}"
] = 0
# Set new sel_antenna_ids
for antenna_index, antenna_ID in enumerate(antennas):
self.board[
f"{self._device}.antenna_buffer.input_sel.sel_antenna_id_{antenna_index}"
] = antenna_ID
self.logger.info(
f"{self._device} Antenna ID {antenna_index} = {antenna_ID}"
)
return
[docs]
def set_download(self, mode="SDN", payload_length=8192):
"""
Selects the Tx mode and payload length
"""
# Reconfiguring the payload length
self._antenna_buffer_payload_length = payload_length
if mode.upper() == "NSDN":
self.board[f"{self._device}.antenna_buffer.control.tx_demux"] = 0
self.board[f"{self._device}.antenna_buffer.payload_length"] = (
self._antenna_buffer_payload_length
)
elif mode.upper() == "SDN":
self.board[f"{self._device}.antenna_buffer.control.tx_demux"] = 1
self.board[f"{self._device}.antenna_buffer.payload_length"] = (
self._antenna_buffer_payload_length
)
else:
raise Exception(
f"AntennaBuffer ERROR: invalid mode selected. Should be 'NSDN' or 'SDN'"
)
return
[docs]
def write_ddr(self, start_time=-1, delay=256, continuous_mode=False):
"""
Method to write antenna buffer data into the DDR
"""
if self.board[f"{self._device}.antenna_buffer.control.start_read"] == 1:
raise Exception(
"AntennaBuffer: Still reading from DDR buffer, not possible to write yet."
)
if not self._nof_ddr_timestamp:
raise Exception(
f"AntennaBuffer: DDR buffer {self._device} not configured, not writing"
)
# Track the Antenna Buffer mode
self._continuous_mode = continuous_mode
self.board[f"{self._device}.antenna_buffer.control.ddr_pointer_reset"] = 1
t0 = self.board[f"{self._device}.pps_manager.timestamp_read_val"]
if start_time == -1:
start_time = t0 + delay
else:
if t0 > start_time:
raise Exception(
f"AntennaBuffer: {self._device} antenna buffer write failed, requested start_time passed"
)
return False
if self._continuous_mode:
stop_time = 0
self.logger.info(
f"AntennaBuffer: {self._device} Continuous mode selected - Start Time = {start_time}, no Stop Time"
)
else:
stop_time = start_time + self._nof_ddr_timestamp
self.logger.info(
f"AntennaBuffer: {self._device} Non-continuous mode selected - Start Time = {start_time}, Stop Time = {stop_time}"
)
# Setting the start and stop time to registers
self.board[f"{self._device}.antenna_buffer.start_frame_write"] = start_time
self.board[f"{self._device}.antenna_buffer.stop_frame_write"] = stop_time
# Toggle register to write AntennaBuffer data to DDR
self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 1
self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 0
t1 = self.board[f"{self._device}.pps_manager.timestamp_read_val"]
if t1 >= start_time:
raise Exception(
f"AntennaBuffer: {self._device} antenna buffer write failed, write buffer activated after start frame timestamp: t1 >= start_time"
)
return False
return True
@property
def ddr_write_busy(self):
"""
Checks if antenna buffer is currently writing to DDR and checks if DDR is ready for read access
:return: True if DDR still busy and False when DDR is ready for read access
:type: bool
"""
current_timestamp = self.board[f"{self._device}.pps_manager.timestamp_read_val"]
# Checking if Antenna Buffer is still writing to DDR by checking the timestamps
if (
current_timestamp
< self.board[f"{self._device}.antenna_buffer.stop_frame_write"]
):
self.logger.info(
"AntennaBuffer: Still writing into DDR, not possible to read yet"
)
return True
# Checking if the DDR is ready for READ access
if self.board[f"{self._device}.antenna_buffer.write_status.busy"]:
self.logger.info(
"AntennaBuffer: DDR write complete, waiting for DDR read access"
)
return True
return False
[docs]
def wait_for_ddr_read_access(self):
"""
Waits until DDR is ready for read access
"""
while self.ddr_write_busy:
time.sleep(0.1)
self.logger.info("Antenna Buffer: Ready to read DDR")
return
[docs]
def read_ddr(self):
"""
Reads AntennaBuffer data from DDR and configures AntennaBuffer timestamp header
"""
# Wait for DDR read access rights
self.wait_for_ddr_read_access()
# Configure the Read registers:
# Set the 'Read DDR Start ADDR' equal to the same as the 'Write DDR Start ADDR'
# Set the 'End Read DDR ADDR' to the Byte before/infront the 'WriteDDR Start ADDR': This allows the DDR Read pointer to access the entire DDR
# Set the 'Read DDR Length' equal to the same as the 'Write DDR Length'
self.board[f"{self._device}.antenna_buffer.ddr_read_start_addr"] = self.board[
f"{self._device}.antenna_buffer.ddr_write_start_addr"
]
self.board[f"{self._device}.antenna_buffer.ddr_read_high_addr"] = (
self.board[f"{self._device}.antenna_buffer.ddr_write_start_addr"] - 8
)
self.board[f"{self._device}.antenna_buffer.ddr_read_length"] = self.board[
f"{self._device}.antenna_buffer.ddr_write_length"
]
# Set the AntennaBuffer SPEAD timestamp to the Timestamp when data is first captured into DDR
self.board[f"{self._device}.antenna_buffer.spead_timestamp"] = self.board[
f"{self._device}.antenna_buffer.first_frame"
]
# Start antenna buffer read
self.logger.info(f"AntennaBuffer: {self._device} Starting to read from DDR")
self.board[f"{self._device}.antenna_buffer.control.start_read"] = 1
return True
[docs]
def one_shot(self, start_time=-1, delay=256):
"""
Method that will run the DDR write and read antenna buffer operations
one after another for 1 timestamp worth of data.
Mainly used for debugging the Plugin DDR write & read methods
"""
self.write_ddr(start_time, delay, continuous_mode=False)
while not self.read_ddr():
time.sleep(0.1)
[docs]
def stop_now(self):
"""
Method to stop the antenna buffer from writing.
This must be used to stop the antenna buffer when it runs in continuous mode
"""
self.logger.info(f"AntennaBuffer: {self._device} Stopping Antenna Buffer")
# Stop the AntennaBuffer
self.board[f"{self._device}.antenna_buffer.control.stop_now"] = 1
while self.board[f"{self._device}.antenna_buffer.write_status.busy"] == 1:
self.logger.info("AntennaBuffer: Waiting for busy bit to go low...")
time.sleep(0.1)
# Set the timestamp of the Write Start & Stop Timestamp to the lowest possible Timestamp value to
# prevent the Antenna Buffer getting stuck by waiting for the PPS timestamp to pass the Write Start Timestamp
self.board[f"{self._device}.antenna_buffer.start_frame_write"] = 0x1
self.board[f"{self._device}.antenna_buffer.stop_frame_write"] = 0x1
# Toggle the 'Frame Write Update' register for write register changes to take effect
# and disable the AntennaBuffer Stop Now register
self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 1
self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 0
self.board[f"{self._device}.antenna_buffer.control.stop_now"] = 0
self.logger.info(
f"AntennaBuffer: {self._device} Antenna Buffer stopped and registers reset"
)
[docs]
def check_ddr_errors(self, show_result=True):
"""
Check there are no DDR read or write erros
:return: False if there are any errors, True if no erros
:type: bool
"""
errors = []
errors.append(
self.board[f"{self._device}.antenna_buffer.write_status.fifo_overflow"]
)
errors.append(self.board[f"{self._device}.antenna_buffer.read_status.timeout"])
errors.append(
self.board[f"{self._device}.antenna_buffer.read_status.fifo_overflow"]
)
if show_result:
self.logger.info(f"Errors{errors}")
return False if any(errors) else True
##################### Superclass method implementations #################################
[docs]
def initialise(self):
"""Initialise AntennaBuffer"""
self.logger.debug("AntennaBuffer has been initialised")
return True
[docs]
def status_check(self):
"""Perform status check.
:return: Status
"""
self.logger.debug("AntennaBuffer : Checking status")
return Status.OK
[docs]
def clean_up(self):
"""Perform cleanup.
:return: Success
"""
self.logger.debug("AntennaBuffer : Cleaning up")
return True