# Source code for hbm_packet_controller

# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

# we use dynamic attributes that confuse pylint...
# pylint: disable=attribute-defined-outside-init
"""
HBM Packet Controller ICL (abstraction)
"""

import bisect
import math
import time
import typing
import warnings

import dpkt.pcap as pcap
import numpy as np
from ska_low_cbf_fpga import (
    DISCOVER_PROPERTIES,
    FpgaPeripheral,
    IclField,
    IclFpgaField,
    str_from_int_bytes,
)

from ska_low_cbf_sw_cnic.icl.ptp_scheduler import (
    TIMESTAMP_BITS,
    ptp_ts_from_float,
    unix_ts_from_ptp,
)
from ska_low_cbf_sw_cnic.util.pcap import eth_from_sll, get_reader, get_writer

# All sizes below are expressed in bytes.
ETHERNET_IPG_SIZE = 12  # Inter-Packet Gap, as configured in CNIC firmware
ETHERNET_FCS_SIZE = 4  # Frame Check Sequence
ETHERNET_SFD_SIZE = 1  # Start Frame Delimiter
ETHERNET_PREAMBLE_SIZE = 7
# Bytes each packet occupies on the wire in addition to the frame itself
ETHERNET_OVERHEAD_SIZE = sum(
    (
        ETHERNET_PREAMBLE_SIZE,
        ETHERNET_SFD_SIZE,
        ETHERNET_FCS_SIZE,
        ETHERNET_IPG_SIZE,
    )
)
AXI_TRANSACTION_SIZE = 4096
BEAT_SIZE = 64
MEM_ALIGN_SIZE = 64  # HBM data is aligned to multiples of this many bytes
TIMESTAMP_SIZE = TIMESTAMP_BITS // 8  # timestamp width, bytes
PACKET_LEN_SIZE = 2  # bytes holding each packet's length in HBM metadata
# Per-packet metadata record kept in HBM: raw timestamp bytes followed by a
# big-endian unsigned packet length.
hbm_metadata = np.dtype(
    [
        ("timestamp", "B", TIMESTAMP_SIZE),
        ("packet_size", ">u" + str(PACKET_LEN_SIZE)),
    ]
)
METADATA_SIZE = hbm_metadata.itemsize
"""total bytes of metadata stored in HBM for each packet"""


def _get_padded_size(data_size: int) -> int:
    """
    Round a byte count up to the next multiple of MEM_ALIGN_SIZE
    (i.e. the next whole 'beat' of HBM data).

    :param data_size: bytes
    :return: padded size in bytes; unchanged if already aligned
    """
    remainder = data_size % MEM_ALIGN_SIZE
    if not remainder:
        return data_size
    return data_size + (MEM_ALIGN_SIZE - remainder)


def _gap_from_rate(packet_size: int, rate: float, burst_size: int = 1) -> int:
    """
    Calculate packet burst gap (really a period) in nanoseconds.

    Each packet is assumed to occupy ``packet_size + ETHERNET_OVERHEAD_SIZE``
    bytes on the wire (preamble, SFD, FCS and inter-packet gap included).

    :param packet_size: bytes
    :param rate: Gigabits per second, must be positive
    :param burst_size: number of packets in a burst, must be at least 1
    :return: burst period in nanoseconds, rounded up
    :raises ValueError: if rate is not a positive number, or burst_size < 1
    """
    # "not rate > 0" also rejects NaN, which would otherwise propagate
    # into the hardware register as a meaningless value
    if not rate > 0:
        raise ValueError(f"rate must be positive, got {rate} Gbps")
    if burst_size < 1:
        raise ValueError(f"burst_size must be at least 1, got {burst_size}")
    # Effective packet size on wire
    line_bytes = packet_size + ETHERNET_OVERHEAD_SIZE
    # Desired packets/s
    packet_rate = (rate * 1e9) / (line_bytes * 8)
    # Convert to nanoseconds and apply burst size factor
    return math.ceil(1e9 * burst_size / packet_rate)


class HbmPacketController(FpgaPeripheral):
    """
    Class to represent an HbmPacketController FPGA Peripheral.

    Packets are stored in HBM as a metadata record (timestamp + length,
    padded to MEM_ALIGN_SIZE) followed by the packet bytes (also padded).
    The HBM buffers after the first one are treated as a single contiguous
    virtual address space (see ``_buffer_offsets``).
    """

    _user_attributes = {
        # Expose all the properties this class defines
        DISCOVER_PROPERTIES,
        # The below registers could be useful for GUIs, CI scripts,
        # or debugging
        # TODO - add legacy_rate_sel when we increment to 1.6
        # "legacy_rate_sel",
        "rx_complete",
        "rx_enable_capture",
        "rx_hbm_1_end_addr",
        "rx_hbm_2_end_addr",
        "rx_hbm_3_end_addr",
        "rx_hbm_4_end_addr",
        "rx_packet_size",
        "rx_packet_size_abs",
        "rx_packets_to_capture",
        "tx_burst_gap",
        "tx_complete",
        "tx_enable",
        "tx_packet_type",
        "tx_running",
    }

    _field_config = {
        # Tell control system about Read-Only FPGA registers
        "tx_running": IclFpgaField(user_write=False),
        "tx_looping": IclFpgaField(user_write=False),
        "tx_loop_count": IclFpgaField(user_write=False),
        "tx_axi_transaction_count": IclFpgaField(user_write=False),
        "tx_burst_count": IclFpgaField(user_write=False),
        "tx_packet_count_hi": IclFpgaField(user_write=False),
        "tx_packet_count_lo": IclFpgaField(user_write=False),
        "tx_packets_to_mac_hi": IclFpgaField(user_write=False),
        "tx_packets_to_mac_lo": IclFpgaField(user_write=False),
        "tx_complete": IclFpgaField(user_write=False),
        "debug_tx_current_hbm_rd_addr": IclFpgaField(user_write=False),
        "debug_tx_current_hbm_rd_buffer": IclFpgaField(user_write=False),
        "debug_tx_total_packet_beat_count": IclFpgaField(user_write=False),
        "debug_tx_packet_beat_count": IclFpgaField(user_write=False),
        "debug_tx_burst_packet_count": IclFpgaField(user_write=False),
        "debug_rd_fsm_debug": IclFpgaField(user_write=False),
        "debug_output_fsm_debug": IclFpgaField(user_write=False),
        "debug_input_fsm_debug": IclFpgaField(user_write=False),
        "debug_fifo_datacount": IclFpgaField(user_write=False),
        "debug_capture_filter_target": IclFpgaField(user_write=False),
        "debug_capture_filter_non_target": IclFpgaField(user_write=False),
        "ns_total_time": IclFpgaField(user_write=False),
        "ns_burst_timer": IclFpgaField(user_write=False),
        "rx_hbm_1_end_addr": IclFpgaField(user_write=False),
        "rx_hbm_2_end_addr": IclFpgaField(user_write=False),
        "rx_hbm_3_end_addr": IclFpgaField(user_write=False),
        "rx_hbm_4_end_addr": IclFpgaField(user_write=False),
        "rx_complete": IclFpgaField(user_write=False),
        "rx_packet_count_hi": IclFpgaField(user_write=False),
        "rx_packet_count_lo": IclFpgaField(user_write=False),
    }

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # we don't have nice interface to find the size of the buffers...
        self._fpga_interface = self._personality.driver
        # skip the first buffer (ARGS interchange),
        # get the sizes of all other shared buffers
        hbm_sizes = [hbm.size for hbm in self._fpga_interface._mem_config[1:]]
        # convert sizes to a list of virtual end addresses of each buffer
        # e.g. [1000, 1000, 1000] => [1000, 2000, 3000]
        hbm_end_addresses = np.cumsum(hbm_sizes)
        # insert a zero for the first buffer's start address:
        # [0, 1000, 2000, 3000]
        self._buffer_offsets = np.insert(hbm_end_addresses, 0, 0)
        """Virtual addresses of start/end of each HBM buffer
        (Note: n+1 elements, last element is end of last buffer)"""
        self._loaded_pcap: str = ""
        """Filename of the pcap file loaded to HBM"""
        self._loaded_pcap_packets: int = 0
        """Number of packets loaded into HBM"""
        self._dumped_pcap: str = ""
        """Filename of last pcap file dumped from HBM (updated when complete)"""

    @property
    def tx_packet_count(self) -> IclField[int]:
        """Get 64-bit total Tx packet count (combined from hi/lo registers)"""
        return IclField(
            description="Transmitted Packet Count",
            type_=int,
            value=(self.tx_packet_count_hi.value << 32)
            | self.tx_packet_count_lo.value,
        )

    @property
    def tx_packets_to_mac(self) -> IclField[int]:
        """Get 64-bit total Tx packets to MAC count (combined from hi/lo)"""
        return IclField(
            description="Transmitted to MAC Packet Count",
            type_=int,
            value=(self.tx_packets_to_mac_hi.value << 32)
            | self.tx_packets_to_mac_lo.value,
        )

    @property
    def rx_packet_count(self) -> IclField[int]:
        """Get 64-bit total Rx packet count (combined from hi/lo registers)"""
        return IclField(
            description="Received Packet Count",
            type_=int,
            value=(self.rx_packet_count_hi.value << 32)
            | self.rx_packet_count_lo.value,
        )

    def _virtual_write(self, data: np.ndarray, address: int) -> None:
        """
        Simple virtual address mapper for writing to multiple HBM buffers.

        Data that crosses one or more buffer boundaries is split into one
        write per buffer touched.

        :param data: numpy array (of bytes) to write
        :param address: byte-based virtual address
        :raises IndexError: if data cannot fit at address
        """
        n_bytes = len(data)
        # Note bisect works here because our first buffer to use is memory 1
        # (would need to add an offset if this was not the case)
        # e.g. if _buffer_offsets is [0, 1000, 2000, 3000]
        # address 50 will return 1; address 1500 will return 2
        start_buffer = bisect.bisect(self._buffer_offsets, address)
        # Locate the buffer holding the LAST byte (not one past it), so data
        # ending exactly on a boundary stays in one buffer and data ending
        # exactly at the end of the final buffer is accepted.
        last_byte_address = address + n_bytes - 1 if n_bytes else address
        end_buffer = bisect.bisect(self._buffer_offsets, last_byte_address)
        if address < 0 or end_buffer >= len(self._buffer_offsets):
            raise IndexError(
                f"Cannot fit {len(data)} bytes "
                f"starting from virtual address {address}. "
                f"Buffers end at {self._buffer_offsets[-1]}."
            )

        buffer_offset = int(address - self._buffer_offsets[start_buffer - 1])
        written = 0
        for buffer in range(start_buffer, end_buffer + 1):
            buffer_size = int(
                # calculate buffer size from address map
                self._buffer_offsets[buffer]
                - self._buffer_offsets[buffer - 1]
            )
            chunk = data[written : written + buffer_size - buffer_offset]
            self._fpga_interface.write_memory(buffer, chunk, buffer_offset)
            written += len(chunk)
            # every buffer after the first is written from its start
            buffer_offset = 0

    def dump_pcap(self, out_filename: str, packet_size: int) -> None:
        """
        Dump a PCAP(NG) file to disk from HBM

        :param str out_filename: file to save to
        :param int packet_size: Number of Bytes used for each packet
            (currently unused) # TODO - remove
        """
        self._logger.info(f"Writing to {out_filename}")
        with open(out_filename, "wb") as out_file:
            self._dump_pcap(out_file)

    def _read_rx_buffer(self, buffer: int, n_bytes: int) -> np.ndarray:
        """
        Read the first ``n_bytes`` of an HBM buffer as a uint8 array.

        WORKAROUND for weird bug when reading 2GB+ on some machines: the
        data is read in 1 GiB pages (hopefully we can remove this later and
        use a single ``self._personality.driver.read_memory(buffer, n_bytes)``).
        Prints a progress dot to stdout per page.

        :param buffer: HBM buffer index, starting from 1
        :param n_bytes: number of bytes to read from the start of the buffer
        """
        raw = np.empty(n_bytes, dtype=np.uint8)
        page_size = 1 << 30  # read 1GB
        for page_start in range(0, n_bytes, page_size):
            page_end = min(page_start + page_size, n_bytes)
            raw[page_start:page_end] = self._personality.read_memory(
                buffer, page_end - page_start, page_start
            ).view(dtype=np.uint8)
            print(".", end="", flush=True)
        print("")
        return raw

    def _dump_pcap(
        self,
        out_file: typing.BinaryIO,
    ) -> None:
        """
        Dump a PCAP(NG) file from HBM.

        Buffers are parsed as one continuous stream of (padded metadata,
        padded packet) records: a record that runs past the end of one
        buffer's data is carried over and completed with the next buffer.

        :param out_file: File object to write to. File type determined by
            extension, use .pcapng for next-gen.
        """
        writer = get_writer(out_file)
        n_packets_to_dump = min(
            self.rx_packet_count.value, self.rx_packets_to_capture.value
        )
        padded_metadata_size = _get_padded_size(METADATA_SIZE)
        n_packets = 0
        total_bytes = 0
        first_ts = None
        timestamp = None
        # start of an incomplete record left at the end of the previous buffer
        last_partial_packet = None
        # padding bytes of the previous record that spill into the next buffer
        skip_bytes = 0

        # start from 1 as our first buffer is #1
        for buffer in range(1, len(self._buffer_offsets)):
            if n_packets >= n_packets_to_dump:
                # everything requested is written; don't read more HBM
                break
            # skipping buffers for debugging
            if not self._rx_buffer_enabled(buffer):
                self._logger.debug(f"Skipping buffer {buffer}")
                continue
            buffer_end = getattr(self, f"rx_hbm_{buffer}_end_addr").value
            self._logger.info(f"Reading {buffer_end} B from HBM buffer {buffer} ")
            if buffer_end == 0:
                # No data in this buffer,
                # so we have already processed the last packet
                break

            raw = self._read_rx_buffer(buffer, buffer_end)

            self._logger.info(f"Writing buffer {buffer} packets to file")
            if last_partial_packet is not None:
                # insert tail of last buffer into head of this one
                raw = np.concatenate((last_partial_packet, raw))
                # consumed: it must never be prepended to a later buffer too
                last_partial_packet = None

            offset = skip_bytes
            skip_bytes = 0
            while offset < raw.nbytes and n_packets < n_packets_to_dump:
                start = offset + padded_metadata_size
                if start > raw.nbytes:
                    # even the metadata record continues in the next buffer
                    last_partial_packet = np.copy(raw[offset:])
                    break
                metadata = raw[offset : offset + METADATA_SIZE].view(
                    dtype=hbm_metadata
                )
                packet_size = int(metadata["packet_size"][0])
                end = start + packet_size
                if end > raw.nbytes:
                    # packet goes into the next buffer
                    last_partial_packet = np.copy(raw[offset:])
                    break

                ptp_ts = int.from_bytes(metadata["timestamp"].tobytes(), "big")
                timestamp = unix_ts_from_ptp(ptp_ts)
                if first_ts is None:
                    first_ts = timestamp
                if n_packets % 100 == 0:
                    # brief sleep to give the control system a chance to do things
                    time.sleep(0.001)
                writer.writepkt(raw[start:end].tobytes(), timestamp)
                n_packets += 1
                total_bytes += packet_size
                offset = start + _get_padded_size(packet_size)

            if last_partial_packet is None:
                # the last record's padding may extend past this buffer's data
                skip_bytes = max(offset - raw.nbytes, 0)
        # end for each buffer loop

        if last_partial_packet is not None and n_packets < n_packets_to_dump:
            self._logger.warning(
                f"Discarding {last_partial_packet.nbytes} B of incomplete "
                "packet data at end of capture"
            )

        self._logger.info(
            f"Finished writing {n_packets} packets,"
            f" {str_from_int_bytes(total_bytes)}"
        )
        if first_ts is None:
            # no packets written, so there is no first/last timestamp
            self._logger.error("Couldn't calculate duration of capture")
        else:
            duration = float(timestamp - first_ts)
            self._logger.info(f"Capture duration {duration:.9f} s")
            # guard against divide by zero
            # when PTP isn't active it marks all packets at t=0
            if duration > 0:
                line_bytes = total_bytes + n_packets * ETHERNET_OVERHEAD_SIZE
                data_rate_gbps = (8 * line_bytes / duration) / 1e9
                self._logger.info(f"Average data rate {data_rate_gbps:.3f} Gbps")
            else:
                self._logger.warning("Cannot calculate data rate")

        self._logger.info(
            (
                f"Wrote {n_packets} packets, "
                f"{str_from_int_bytes(total_bytes)} "
                f"to {out_file.name}"
            )
        )
        self._dumped_pcap = out_file.name

    @property
    def last_dumped_pcap(self) -> IclField[str]:
        """Get our last dumped PCAP file name"""
        return IclField(
            description="Last dumped PCAP file name",
            type_=str,
            value=self._dumped_pcap,
        )

    @property
    def loaded_pcap(self) -> IclField[str]:
        """Get our last loaded PCAP file name"""
        return IclField(
            description="Last loaded PCAP file name",
            type_=str,
            value=self._loaded_pcap,
        )

    @property
    def loaded_pcap_packets(self) -> IclField[int]:
        """Get our last loaded PCAP packet count."""
        return IclField(
            description="Last loaded PCAP packet count",
            type_=int,
            value=self._loaded_pcap_packets,
        )

    def load_pcap(self, in_filename: str) -> None:
        """
        Load a PCAP(NG) file from disk to FPGA.

        Does nothing if the same filename is already recorded as loaded.

        :param str in_filename: path to input PCAP(NG) file
        :raises: whatever opening or parsing the file raises (logged first)
        """
        if self._loaded_pcap == in_filename:
            self._logger.info(f"Won't load {in_filename} file again")
            return
        # forget the old file first: HBM contents are undefined if we fail
        self._loaded_pcap = ""
        try:
            with open(in_filename, "rb") as in_file:
                self._logger.info(f"Loading from {in_filename}")
                self._loaded_pcap_packets = self._load_pcap(in_file)
        except Exception:
            # leave some trace in case there are problems with the file
            self._logger.error(f"ERROR loading PCAP file {in_filename}")
            raise
        self._loaded_pcap = in_filename
        self._logger.info("Loading complete")

    def _load_pcap(self, in_file: typing.BinaryIO) -> int:
        """
        Load a PCAP(NG) file from disk to FPGA.

        Packets are written one after another until the file ends or HBM is
        full. In duplex mode loading starts at the middle HBM buffer.

        :param in_file: input PCAP(NG) file
        :raises NotImplementedError: if the file's link-layer type is not
            Ethernet or Linux "cooked" capture
        :returns: Number of packets loaded
        """
        reader = get_reader(in_file)
        virtual_address = 0  # byte address to write to
        if self.enable_duplex:
            buffer = len(self._buffer_offsets) // 2
            virtual_address = self._buffer_offsets[buffer]
            self._logger.debug(
                f"Duplex mode, loading into HBM {buffer + 1} "
                f"(virtual address {virtual_address})"
            )
        start_virt_addr = virtual_address
        metadata_padded_size = _get_padded_size(METADATA_SIZE)
        dot_print_increment = 128 << 20  # print progress every 128MiB
        print_next_dot = start_virt_addr
        n_packets = 0

        link_layer = reader.datalink()
        if link_layer not in [pcap.DLT_EN10MB, pcap.DLT_LINUX_SLL]:
            raise NotImplementedError(f"Link-Layer Header {link_layer} not supported!")

        for timestamp, packet in reader:
            if link_layer == pcap.DLT_LINUX_SLL:
                # convert "cooked" packet data back to standard Ethernet
                packet = eth_from_sll(packet)
            packet_size = len(packet)
            packet_padded_size = _get_padded_size(packet_size)
            data = np.zeros(packet_padded_size + metadata_padded_size, dtype=np.uint8)
            # TODO do we need to check that it's a valid ethernet packet?

            # Set CNIC control metadata in data array
            metadata = data[:METADATA_SIZE].view(dtype=hbm_metadata)
            metadata["timestamp"] = np.frombuffer(
                ptp_ts_from_float(timestamp).to_bytes(TIMESTAMP_SIZE, "big"),
                dtype=np.uint8,
            )
            metadata["packet_size"] = packet_size
            # Set Ethernet packet contents in data array
            data[
                metadata_padded_size : metadata_padded_size + packet_size
            ] = np.frombuffer(packet, dtype=np.uint8)

            try:
                self._virtual_write(data, virtual_address)
            except IndexError:
                # stop if we don't have enough memory left for the packet
                self._logger.debug(
                    f"Aborting load, {data.nbytes} B can't fit at"
                    f" virtual address {virtual_address}"
                )
                break
            n_packets += 1
            # virtual address is used as the DMA pointer for HBM memory.
            # this is advancing the ptr to the start position of the next packet
            # in memory. TX memory structure is metadata(mod64) + packetsize(mod64)
            virtual_address += data.nbytes

            if virtual_address >= print_next_dot:
                print(".", end="", flush=True)
                print_next_dot += dot_print_increment
            if n_packets % 1000 == 0:
                # brief sleep to give the control system a chance to do things
                time.sleep(0.0001)

        self._logger.info(
            f"Loaded {n_packets} packets, "
            f"{str_from_int_bytes(virtual_address - start_virt_addr)}"
        )
        # set AXI transaction limit to suit amount of HBM used
        # (can't be set in configure_tx because we need to parse all the packets first)
        self.tx_axi_transactions = math.ceil(
            (virtual_address - start_virt_addr) / AXI_TRANSACTION_SIZE
        )
        if n_packets < self.tx_packet_to_send.value:
            self._logger.warning(
                f"Expected {self.tx_packet_to_send.value} packets but only "
                f"got {n_packets} (will use {n_packets})"
            )
            self.tx_packet_to_send = n_packets
        return n_packets

    def configure_tx(
        self,
        packet_size: int,
        n_packets: int,
        n_loops: int = 1,
        burst_size: int = 1,
        burst_gap: typing.Union[int, None] = None,
        rate: float = 100.0,
    ) -> None:
        """
        Configure packet transmission parameters

        :param int packet_size: packet size (Bytes), all packets assumed same size
        :param int n_packets: number of packets to send
        :param int n_loops: number of loops
        :param int burst_size: packets per burst
        :param int burst_gap: packet burst period (ns), overrides rate
            (None or 0 means: derive the period from ``rate``)
        :param float rate: transmission rate (Gigabits per sec),
            ignored if burst_gap given
        """
        self._logger.info("Configuring Tx params")
        if burst_size != 1:
            warnings.warn("Packet burst not tested!")
        if burst_gap:
            self.tx_burst_gap = burst_gap
        else:
            self.tx_burst_gap = _gap_from_rate(packet_size, rate, burst_size)
            # only meaningful when the period really was derived from rate
            self._logger.info(
                (
                    f"{rate} Gbps with {packet_size} B packets "
                    f"in bursts of {burst_size} "
                    f"gives a burst period of {self.tx_burst_gap.value} ns"
                )
            )
        self.tx_packet_size = packet_size
        self.tx_packet_to_send = n_packets
        self.tx_packets_per_burst = burst_size
        self.tx_bursts = math.ceil(n_packets / burst_size)
        packet_padded_size = _get_padded_size(packet_size)
        self.tx_beats_per_packet = packet_padded_size // BEAT_SIZE
        self.tx_beats_per_burst = self.tx_beats_per_packet * burst_size
        # note: Tx AXI transaction counts is set in _load_pcap
        self.tx_loop_enable = n_loops > 1
        self.tx_loops = max(n_loops - 1, 0)  # FPGA loops tx_loops+1 times

    def start_tx(self) -> None:
        """
        Start transmitting packets

        :raises RuntimeError: if Rx capture is enabled and duplex is not
        """
        # if _loaded_pcap was stored in the FPGA, we could do something like:
        # if not _loaded_pcap:
        #     raise RuntimeError("No PCAP loaded")
        # since it's only in software, we will defer to the user's judgement
        if self.rx_enable_capture and not self.duplex:
            raise RuntimeError("Duplex not enabled and Rx already running!")
        # toggle low then high so the enable register sees a fresh 0 -> 1
        self.tx_enable = 0
        self.tx_enable = 1

    def configure_rx(self, packet_size: int, n_packets: int = 0) -> None:
        """
        Prepare for receiving packets

        :param int packet_size: only packets of this exact size are captured (bytes)
        :param int n_packets: number of packets to receive
        """
        self.rx_enable_capture = 0
        if not self.duplex:
            # Receiving won't wipe the loaded Tx PCAP in duplex mode
            self._loaded_pcap = ""
        self.rx_packet_size = packet_size
        self.rx_packets_to_capture = n_packets
        # pulse the capture reset
        self.rx_reset_capture = 1
        self.rx_reset_capture = 0

    def start_rx(self) -> None:
        """
        Begin receiving packets into FPGA memory. Call configure_rx first.

        :raises RuntimeError: if Tx is enabled and duplex is not
        """
        if self.tx_enable and not self.duplex:
            raise RuntimeError("Duplex not enabled and Tx already running!")
        self.rx_enable_capture = 1

    def _rx_buffer_enabled(self, buffer: int) -> bool:
        """
        Check if Rx buffer is enabled

        A SET bit ``buffer - 1`` in the optional debug register
        ``rx_bank_enable`` means the buffer is NOT used (despite the name).

        :param buffer: Buffer index, starting from 1
        :return: True = buffer in use
        """
        # check if debug register exists
        if "rx_bank_enable" not in self._fields:
            return True
        return not bool(self.rx_bank_enable.value & (1 << (buffer - 1)))

    def flush(self) -> None:
        """
        Flush internal FPGA Rx packet buffers into HBM.

        WARNING: After doing this, you won't get any more packets until you run
        start_rx to configure a new receiving session.
        """
        # this register appeared part-way through FW v0.1.4 development...
        if hasattr(self, "rx_flush_to_hbm"):
            self.rx_flush_to_hbm = 1
            self.rx_flush_to_hbm = 0

    @property
    def duplex(self) -> IclField[bool]:
        """Is duplex mode active?"""
        return self.enable_duplex

    @duplex.setter
    def duplex(self, enable: bool) -> None:
        """
        Control duplex mode.

        Note: set this before configuring Tx or Rx! Tx and Rx are both
        disabled and reset while the mode changes.

        :param enable: True=duplex (simultaneous Tx/Rx),
            False=simplex (one or the other)
        """
        self.tx_enable = False
        self.tx_reset = True
        self.rx_enable_capture = False
        self.rx_reset_capture = True
        # Tx loads to different address depending on duplex setting
        self._loaded_pcap = ""
        self.enable_duplex = enable
        self.tx_reset = False
        self.rx_reset_capture = False