Source code for ska_low_sps_tpm_api.plugins.antenna_buffer

import time

import numpy as np

from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock

__author__ = "Riccardo Chiello"


[docs] def antenna_buffer_implemented(func): """ Checks if the antenna buffer is implemented in the firmware. This is called as a decorator from tile methods :return: True if antenna buffer is implemented, else raise Exception error """ def inner_func(self, *args, **kwargs): try: implemented = ( self.tpm[f"fpga1.dsp_regfile.feature.antenna_buffer_implemented"] and self.tpm[f"fpga2.dsp_regfile.feature.antenna_buffer_implemented"] ) if not implemented: raise LibraryError(f"Antenna Buffer not implemented by FPGA firmware") except Exception as e: raise LibraryError(f"Antenna Buffer not implemented by FPGA firmware") return func(self, *args, **kwargs) return inner_func
[docs] class AntennaBuffer(FirmwareBlock): """AntennaBuffer plugin"""
[docs] @compatibleboards(BoardMake.TpmBoard) @friendlyname("tpm_antenna_buffer") @maxinstances(2) def __init__(self, board, samples_per_frame=864, logger=None, **kwargs): """AntennaBuffer initialiser. :param board: Pointer to board instance """ super(AntennaBuffer, self).__init__(board, logger=logger) if "device" not in list(kwargs.keys()): raise PluginError("AntennaBuffer: Require a node instance") self._device = kwargs["device"] if self._device == Device.FPGA_1: self._device = "fpga1" elif self._device == Device.FPGA_2: self._device = "fpga2" else: raise PluginError(f"AntennaBuffer: Invalid device {self._device}") # FPGA DDR capacity in number of GB self._ddr_capacity_gigabyte = int(self.board.board_info["DDR_SIZE_GB"]) # tracks antenna buffer mode: continuous or non-continuous self._continuous_mode = False # antenna buffer payload length, default set to 1024 self._antenna_buffer_payload_length = 1024 # max number of buffered antennas self._max_nof_antenna = 2 # number of buffered antennas self._nof_antenna = 2 # number of ADC samples in a frame self._samples_per_frame = samples_per_frame # number of frames in one timestamp self._nof_frame_in_timestamp = 256 # number of buffered timestamp, calculated furing buffer allocation self._nof_ddr_timestamp = 0 # ddr_timestamp_byte_size = nof_samples * pols * nof_antenna * nof_frames, 1 byte per sample self._ddr_timestamp_byte_size = ( self._samples_per_frame * 2 * self._nof_antenna * self._nof_frame_in_timestamp ) # antenna buffer ddr write length in bytes self._ddr_write_length_byte = 0
#######################################################################################
[docs] def select_nof_antenna(self, antennas): """ Selects the number of antenna to calculate the DDR byte size per timestamp and to assign the antenna IDs :param antennas: nof_antenna :type antennas: list of int """ # Check a none empty antenna list is given if not antennas: raise Exception(f"AntennaBuffer ERROR: {self._device} no antennas given") # Check that the number of antennas selected is not more than the maximum supported per FPGA (currently this is 2) if len(antennas) > self._max_nof_antenna: raise Exception( f"AntennaBuffer ERROR: {self._device} the number of selected antennas: {len(antennas)}, exceeds the number of supported antennas: {self._max_nof_antenna}" ) # Update the nof selected antennas to the Hardware register self._nof_antenna = len(antennas) self.board[f"{self._device}.antenna_buffer.input_sel.no_of_antenna"] = ( self._nof_antenna ) # Calcualte the DDR byte size per timestamp required for the selected number of antenna/s self._ddr_timestamp_byte_size = ( self._samples_per_frame * 2 * self._nof_antenna * self._nof_frame_in_timestamp ) # Assigning the antenna ID (0-7 per FPGA) to the selected antenna/s self.logger.info( f"AntennaBuffer: {self._device} Number of antennas selected = {self._nof_antenna}" ) # Clear all sel_antenna_ids for antenna_index in range(4): self.board[ f"{self._device}.antenna_buffer.input_sel.sel_antenna_id_{antenna_index}" ] = 0 # Set new sel_antenna_ids for antenna_index, antenna_ID in enumerate(antennas): self.board[ f"{self._device}.antenna_buffer.input_sel.sel_antenna_id_{antenna_index}" ] = antenna_ID self.logger.info( f"{self._device} Antenna ID {antenna_index} = {antenna_ID}" ) return
[docs] def set_download(self, mode="SDN", payload_length=8192): """ Selects the Tx mode and payload length """ # Reconfiguring the payload length self._antenna_buffer_payload_length = payload_length if mode.upper() == "NSDN": self.board[f"{self._device}.antenna_buffer.control.tx_demux"] = 0 self.board[f"{self._device}.antenna_buffer.payload_length"] = ( self._antenna_buffer_payload_length ) elif mode.upper() == "SDN": self.board[f"{self._device}.antenna_buffer.control.tx_demux"] = 1 self.board[f"{self._device}.antenna_buffer.payload_length"] = ( self._antenna_buffer_payload_length ) else: raise Exception( f"AntennaBuffer ERROR: invalid mode selected. Should be 'NSDN' or 'SDN'" ) return
[docs] def configure_nof_ddr_timestamps(self, ddr_start_byte_address, nof_timestamp): """ Configures the DDR write memory from the timestamp duration to capture. Alternatively, the buffer size can be configured using the method configure_ddr_write_length() """ if ddr_start_byte_address % 64 != 0: raise Exception( f"AntennaBuffer: ddr_start_byte_address {self._device} must be aligned to 64 bytes boundary" ) # Verify that the number of timestamps given is more than zero if nof_timestamp < 1: raise Exception( f"AntennaBuffer: nof_timestamp {self._device} must be more than zero" ) self._nof_ddr_timestamp = nof_timestamp self._ddr_write_length_byte = ( self._nof_ddr_timestamp * self._ddr_timestamp_byte_size ) # Checks that the DDR Write Length is not more than what is available on the TPM's DDR if ( self._ddr_write_length_byte + ddr_start_byte_address > self._ddr_capacity_gigabyte * (1024**3) ): raise Exception( f"AntennaBuffer: {self._device} Unable to allocate enough DDR memory for requested {self._nof_ddr_timestamp} timestamps for {self._nof_antenna} antennas.\n" f"DDR length {format_data(self._ddr_write_length_byte)} starting at address {format_data(ddr_start_byte_address)} requires more memory than available capacity ({self._ddr_capacity_gigabyte} GiB).\n" f"You are {format_data(self._ddr_write_length_byte + ddr_start_byte_address - self._ddr_capacity_gigabyte*1024**3)} short!" ) # divide by 64 as per DDR addressing self.board[f"{self._device}.antenna_buffer.ddr_write_start_addr"] = ( ddr_start_byte_address // 8 ) self.board[f"{self._device}.antenna_buffer.ddr_write_length"] = ( self._ddr_write_length_byte // 64 - 1 ) return self._ddr_write_length_byte
[docs] def configure_ddr_write_length(self, ddr_start_byte_address, write_byte_size): """ Configures the amount of DDR to use for Writing. Alternatively, the buffer size can be configured using the method configure_nof_ddr_timestamps() """ if ddr_start_byte_address % 64 != 0: raise Exception( f"AntennaBuffer: ddr_start_address {self._device} must be aligned to 64 bytes boundary" ) if write_byte_size < self._ddr_timestamp_byte_size: raise Exception( f"AntennaBuffer: allocated buffer size must be larger than {self._ddr_timestamp_byte_size} bytes" ) self._nof_ddr_timestamp = write_byte_size // self._ddr_timestamp_byte_size self._ddr_write_length_byte = ( self._nof_ddr_timestamp * self._ddr_timestamp_byte_size ) # divide by 64 as per DDR addressing self.board[f"{self._device}.antenna_buffer.ddr_write_start_addr"] = ( ddr_start_byte_address // 8 ) self.board[f"{self._device}.antenna_buffer.ddr_write_length"] = ( self._ddr_write_length_byte // 64 - 1 ) return self._ddr_write_length_byte
[docs] def write_ddr(self, start_time=-1, delay=256, continuous_mode=False): """ Method to write antenna buffer data into the DDR """ if self.board[f"{self._device}.antenna_buffer.control.start_read"] == 1: raise Exception( "AntennaBuffer: Still reading from DDR buffer, not possible to write yet." ) if not self._nof_ddr_timestamp: raise Exception( f"AntennaBuffer: DDR buffer {self._device} not configured, not writing" ) # Track the Antenna Buffer mode self._continuous_mode = continuous_mode self.board[f"{self._device}.antenna_buffer.control.ddr_pointer_reset"] = 1 t0 = self.board[f"{self._device}.pps_manager.timestamp_read_val"] if start_time == -1: start_time = t0 + delay else: if t0 > start_time: raise Exception( f"AntennaBuffer: {self._device} antenna buffer write failed, requested start_time passed" ) return False if self._continuous_mode: stop_time = 0 self.logger.info( f"AntennaBuffer: {self._device} Continuous mode selected - Start Time = {start_time}, no Stop Time" ) else: stop_time = start_time + self._nof_ddr_timestamp self.logger.info( f"AntennaBuffer: {self._device} Non-continuous mode selected - Start Time = {start_time}, Stop Time = {stop_time}" ) # Setting the start and stop time to registers self.board[f"{self._device}.antenna_buffer.start_frame_write"] = start_time self.board[f"{self._device}.antenna_buffer.stop_frame_write"] = stop_time # Toggle register to write AntennaBuffer data to DDR self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 1 self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 0 t1 = self.board[f"{self._device}.pps_manager.timestamp_read_val"] if t1 >= start_time: raise Exception( f"AntennaBuffer: {self._device} antenna buffer write failed, write buffer activated after start frame timestamp: t1 >= start_time" ) return False return True
@property def ddr_write_busy(self): """ Checks if antenna buffer is currently writing to DDR and checks if DDR is ready for read access :return: True if DDR still busy and False when DDR is ready for read access :type: bool """ current_timestamp = self.board[f"{self._device}.pps_manager.timestamp_read_val"] # Checking if Antenna Buffer is still writing to DDR by checking the timestamps if ( current_timestamp < self.board[f"{self._device}.antenna_buffer.stop_frame_write"] ): self.logger.info( "AntennaBuffer: Still writing into DDR, not possible to read yet" ) return True # Checking if the DDR is ready for READ access if self.board[f"{self._device}.antenna_buffer.write_status.busy"]: self.logger.info( "AntennaBuffer: DDR write complete, waiting for DDR read access" ) return True return False
[docs] def wait_for_ddr_read_access(self): """ Waits until DDR is ready for read access """ while self.ddr_write_busy: time.sleep(0.1) self.logger.info("Antenna Buffer: Ready to read DDR") return
[docs] def read_ddr(self): """ Reads AntennaBuffer data from DDR and configures AntennaBuffer timestamp header """ # Wait for DDR read access rights self.wait_for_ddr_read_access() # Configure the Read registers: # Set the 'Read DDR Start ADDR' equal to the same as the 'Write DDR Start ADDR' # Set the 'End Read DDR ADDR' to the Byte before/infront the 'WriteDDR Start ADDR': This allows the DDR Read pointer to access the entire DDR # Set the 'Read DDR Length' equal to the same as the 'Write DDR Length' self.board[f"{self._device}.antenna_buffer.ddr_read_start_addr"] = self.board[ f"{self._device}.antenna_buffer.ddr_write_start_addr" ] self.board[f"{self._device}.antenna_buffer.ddr_read_high_addr"] = ( self.board[f"{self._device}.antenna_buffer.ddr_write_start_addr"] - 8 ) self.board[f"{self._device}.antenna_buffer.ddr_read_length"] = self.board[ f"{self._device}.antenna_buffer.ddr_write_length" ] # Set the AntennaBuffer SPEAD timestamp to the Timestamp when data is first captured into DDR self.board[f"{self._device}.antenna_buffer.spead_timestamp"] = self.board[ f"{self._device}.antenna_buffer.first_frame" ] # Start antenna buffer read self.logger.info(f"AntennaBuffer: {self._device} Starting to read from DDR") self.board[f"{self._device}.antenna_buffer.control.start_read"] = 1 return True
[docs] def one_shot(self, start_time=-1, delay=256): """ Method that will run the DDR write and read antenna buffer operations one after another for 1 timestamp worth of data. Mainly used for debugging the Plugin DDR write & read methods """ self.write_ddr(start_time, delay, continuous_mode=False) while not self.read_ddr(): time.sleep(0.1)
[docs] def stop_now(self): """ Method to stop the antenna buffer from writing. This must be used to stop the antenna buffer when it runs in continuous mode """ self.logger.info(f"AntennaBuffer: {self._device} Stopping Antenna Buffer") # Stop the AntennaBuffer self.board[f"{self._device}.antenna_buffer.control.stop_now"] = 1 while self.board[f"{self._device}.antenna_buffer.write_status.busy"] == 1: self.logger.info("AntennaBuffer: Waiting for busy bit to go low...") time.sleep(0.1) # Set the timestamp of the Write Start & Stop Timestamp to the lowest possible Timestamp value to # prevent the Antenna Buffer getting stuck by waiting for the PPS timestamp to pass the Write Start Timestamp self.board[f"{self._device}.antenna_buffer.start_frame_write"] = 0x1 self.board[f"{self._device}.antenna_buffer.stop_frame_write"] = 0x1 # Toggle the 'Frame Write Update' register for write register changes to take effect # and disable the AntennaBuffer Stop Now register self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 1 self.board[f"{self._device}.antenna_buffer.control.frame_write_update"] = 0 self.board[f"{self._device}.antenna_buffer.control.stop_now"] = 0 self.logger.info( f"AntennaBuffer: {self._device} Antenna Buffer stopped and registers reset" )
[docs] def check_ddr_errors(self, show_result=True): """ Check there are no DDR read or write erros :return: False if there are any errors, True if no erros :type: bool """ errors = [] errors.append( self.board[f"{self._device}.antenna_buffer.write_status.fifo_overflow"] ) errors.append(self.board[f"{self._device}.antenna_buffer.read_status.timeout"]) errors.append( self.board[f"{self._device}.antenna_buffer.read_status.fifo_overflow"] ) if show_result: self.logger.info(f"Errors{errors}") return False if any(errors) else True
##################### Superclass method implementations #################################
[docs] def initialise(self): """Initialise AntennaBuffer""" self.logger.debug("AntennaBuffer has been initialised") return True
[docs] def status_check(self): """Perform status check. :return: Status """ self.logger.debug("AntennaBuffer : Checking status") return Status.OK
[docs] def clean_up(self): """Perform cleanup. :return: Success """ self.logger.debug("AntennaBuffer : Cleaning up") return True