Source code for ska_low_cbf_fpga.args_xrt

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
import os
import threading
import typing

import numpy as np
import pyxrt

from ska_low_cbf_fpga.args_fpga import (
    ARGS_SHARED_ADDRESS,
    WORD_SIZE,
    ArgsFpgaDriver,
    ArgsWordType,
    str_from_int_bytes,
)

"""
FPGA Driver using Xilinx's XRT Python bindings (pyxrt).
You will need to have the pyxrt module available!
"""

# Read once at import time; the fallback of 5 applies when the variable is unset.
XRT_TIMEOUT = int(os.environ.get("FPGA_XRT_TIMEOUT", 5))
"""
Timeout for FPGA read/write actions (milliseconds).
Can be overridden with the ``FPGA_XRT_TIMEOUT`` environment variable.
"""


def _wait_for_completion(kernel_task: pyxrt.run, timeout: int) -> None:
    """
    Wait for an FPGA register transaction to complete.

    Similar purpose to ``pyxrt.run.wait()``, except an exception is raised for
    errors or timeouts rather than failing silently.

    TODO: If timeouts do happen sometimes, should they be handled gracefully
    e.g. retry?

    :param kernel_task: pyxrt kernel run object to monitor
    :param timeout: timeout in milliseconds
    :raises RuntimeError: if the command ends in an error state, times out,
        or finishes in any state other than "completed"
    """
    states = pyxrt.ert_cmd_state
    state = kernel_task.wait(timeout)

    # Success is the common case: leave early.
    if state == states.ERT_CMD_STATE_COMPLETED:
        return

    if state == states.ERT_CMD_STATE_ERROR:
        raise RuntimeError("Error detected in FPGA read/write.")
    if state == states.ERT_CMD_STATE_TIMEOUT:
        raise RuntimeError(
            f"Timeout ({timeout}ms) waiting for FPGA read/write."
        )
    # Any other state is not expected after wait() returns.
    raise RuntimeError(f"Unexpected state: {state}")
class ArgsXrt(ArgsFpgaDriver):
    """
    pyxrt-based FPGA Driver. Requires a .xclbin file.

    Register reads and writes are performed by running the FPGA kernel, which
    copies data between a register address and the exchange buffer
    (``self._buf[0]``). All kernel runs and buffer syncs are serialised with
    ``self._lock``.
    """

    MINUS_ONE_16BIT = 65535
    """-1 as a 16 bit signed integer (0xFFFF)"""
    # see Jira https://jira.skatelescope.org/browse/PERENTIE-1626

    # Upper bound on the number of kernel arguments probed in _load_firmware
    _MAX_KERNEL_ARGS = 100

    def _setup(
        self,
        xcl_file: str,  # TODO use a file object instead?
        mem_config: typing.Union[list, str] = "",
        device: str = "0",
        **kwargs,
    ):
        """
        Initialise driver state and open the pyxrt device.

        :param xcl_file: path to a .xclbin FPGA kernel
        :param mem_config: a list of MemConfig tuples
            (<size in bytes>, <shared?>)
            the first list item is used to send/receive register values.
            Not stored by this method; ``_init_buffers`` reads
            ``self._mem_config`` (presumably set by the base class - confirm).
        :param device: PCIe Board:Device.Function address
        :param kwargs: accepted for interface compatibility, unused here
        """
        self._buf = []  # buffer objects
        # List of buffer indexes that are shared between host & FPGA
        self._shared_bufs = []
        self._kernel = None
        # to prevent multiple simultaneous kernel calls
        self._lock = threading.Lock()
        self._xcl_file = xcl_file
        self._mem_group_ids = []

        self.logger.debug("PCIe address: %s", device)
        self._device = pyxrt.device(device)
        self.logger.debug("pyxrt device: %s", self._device)

    def _load_firmware(self):
        """
        Load the binary to the FPGA (if required) and initialise our kernel
        object.

        The first kernel listed in the xclbin is used. Kernel arguments are
        then probed one by one to collect the memory group id of each argument
        that is connected to a memory bank.
        """
        self.logger.debug("Loading %s", self._xcl_file)
        xclbin = pyxrt.xclbin(self._xcl_file)
        uuid = self._device.load_xclbin(xclbin)

        kernel_names = [kernel.get_name() for kernel in xclbin.get_kernels()]
        cu_name = kernel_names[0]
        self.logger.debug("xclbin has kernels: %s", kernel_names)
        self.logger.debug("Using Compute Unit name %s", cu_name)
        self._kernel = pyxrt.kernel(
            self._device, uuid, cu_name, pyxrt.kernel.cu_access_mode(0)
        )

        num_args = 0
        for n in range(self._MAX_KERNEL_ARGS):
            # Only the probe itself may signal "no more arguments"; keeping
            # the try body minimal stops unrelated IndexErrors being swallowed.
            try:
                gid = self._kernel.group_id(n)
            except IndexError as e:
                self.logger.debug("_load_firmware; n:%s, %s", n, e)
                break
            num_args += 1
            # -1 (or its 16 bit representation) is skipped: such arguments
            # are not recorded as memory groups.
            if gid not in (-1, self.MINUS_ONE_16BIT):
                self._mem_group_ids.append(gid)
                self.logger.debug("Kernel group: %s, memory bank %s", n, gid)
        self.logger.debug("kernel has %s args", num_args)

    def _init_buffers(self):
        """
        Create XRT & Numpy buffers.

        One buffer object is allocated per ``self._mem_config`` entry, using
        the memory group ids collected by ``_load_firmware`` in order.
        Buffer 0 defines the exchange buffer size used for register access.

        :raises IndexError: if there are more memory configurations than
            kernel memory groups
        :raises ValueError: if a shared memory size is not a multiple of
            WORD_SIZE
        """
        flags = pyxrt.bo.flags
        for index, memory in enumerate(self._mem_config):
            try:
                mem_group_id = self._mem_group_ids[index]
            except IndexError:
                raise IndexError(
                    f"No kernel memory group for memory config #{index} "
                    f"(kernel has {len(self._mem_group_ids)} memory groups)"
                ) from None

            size_text = str_from_int_bytes(memory.size)
            if memory.shared:
                self.logger.debug("allocating %s of shared memory", size_text)
                if memory.size % WORD_SIZE:
                    raise ValueError(
                        "memory size not a multiple of {} bytes".format(
                            WORD_SIZE
                        )
                    )
                buffer_flag = flags.normal
            else:
                self.logger.debug(
                    "allocating %s of device-only memory", size_text
                )
                buffer_flag = flags.device_only

            self._buf.append(
                pyxrt.bo(self._device, memory.size, buffer_flag, mem_group_id)
            )
            # Record the index only once the allocation has succeeded.
            if memory.shared:
                self._shared_bufs.append(index)

        self.EXCHANGE_BUF_BYTES = self._buf[0].size()
        self.EXCHANGE_BUF_WORDS = self.EXCHANGE_BUF_BYTES // WORD_SIZE

    def read(
        self, source: int, length: int = 1
    ) -> typing.Union[int, np.ndarray]:
        """
        Read values from FPGA.

        Reads longer than the exchange buffer are split into several pages.

        :param source: Byte address to start reading from
        :param length: Number of words to read
        :return: value(s) from FPGA: an ``int`` when ``length`` is 1,
            otherwise an array of words
        """
        total_bytes = length * WORD_SIZE
        values = np.empty(total_bytes, dtype=np.uint8)
        page_bytes = self.EXCHANGE_BUF_BYTES

        with self._lock:
            for byte_offset in range(0, total_bytes, page_bytes):
                # the final page may be shorter than the exchange buffer
                valid_bytes = min(total_bytes - byte_offset, page_bytes)
                self._read_args_page(source + byte_offset, valid_bytes)
                values[byte_offset : byte_offset + valid_bytes] = self._buf[
                    0
                ].read(valid_bytes, 0)

        result = values.view(dtype=ArgsWordType)
        self.logger.debug(
            "Read address 0x%x, length %d: %s",
            source,
            length,
            result,
        )
        if length == 1:
            return int(result[0])
        return result

    def _read_args_page(
        self, address_start: int, n_bytes: typing.Optional[int] = None
    ):
        """
        Read one block of register values into host memory buffer.

        ``self._lock`` must already be held. Note that the check only confirms
        the lock is held by *someone*, not by the calling thread.

        :param address_start: Byte address to start reading from
        :param n_bytes: Number of bytes to read (the whole exchange buffer if
            None; clamped to the exchange buffer size)
        :raises RuntimeError: if called while the lock is not held, or if the
            kernel run does not complete successfully
        """
        if not self._lock.locked():
            raise RuntimeError("_read_args_page called without lock")

        if n_bytes is None:
            n_bytes = self.EXCHANGE_BUF_BYTES
        else:
            n_bytes = min(n_bytes, self.EXCHANGE_BUF_BYTES)

        # Execute kernel with new args. Note the unintuitive syntax: run
        # function overloads the () operator, it's not instantiating a new
        # kernel, AFAICT
        # (See https://xilinx.github.io/XRT/master/html/xrt_native.main.html
        # and https://xilinx.github.io/XRT/master/html/pyxrt.html )
        # Returns reference to the pyxrt.run object being executed by the
        # kernel
        task = self._kernel(
            address_start,  # source address
            ARGS_SHARED_ADDRESS,  # destination address
            self._buf[0],
            n_bytes,
            *self._buf[1:],
        )
        _wait_for_completion(task, XRT_TIMEOUT)  # Like task.wait() but safer
        # migrate to host
        self._buf[0].sync(
            pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_FROM_DEVICE,
            n_bytes,
            0,
        )

    def write(self, destination: int, values: typing.Union[int, np.ndarray]):
        """
        Write values to FPGA.

        Writes longer than the exchange buffer are split into several pages.

        :param destination: FPGA byte address where writes should start
        :param values: value(s) to write, if more than one value they must be
            words, to be written to consecutive words
            (i.e. byte addresses increment by WORD_SIZE).
            Multi-dimensional arrays are written in flattened (C) order.
        :raises AssertionError: if an array is supplied whose dtype is not
            ArgsWordType
        """
        # transform integers and 0d arrays into 1d arrays, so slice works
        if isinstance(values, (int, np.integer)) or (
            isinstance(values, np.ndarray) and values.ndim == 0
        ):
            values = np.array(values, dtype=ArgsWordType, ndmin=1)

        # Explicit raise (rather than assert) so the check survives
        # "python -O"; AssertionError is kept for backward compatibility.
        if not (values.dtype == ArgsWordType):
            raise AssertionError(
                f"values dtype is {values.dtype}, expected {ArgsWordType}"
            )

        self.logger.debug("Write address 0x%x: %s", destination, values)

        # Flatten so paging slices by word: slicing a multi-dimensional array
        # acts on its first axis only and could exceed the exchange buffer.
        flat_values = np.ravel(values)
        page_words = self.EXCHANGE_BUF_WORDS

        with self._lock:
            for word_offset in range(0, flat_values.size, page_words):
                # number of words that are valid to write this loop iteration
                valid_words = min(flat_values.size - word_offset, page_words)
                self._write_args_page(
                    destination + word_offset * WORD_SIZE,
                    flat_values[word_offset : word_offset + valid_words],
                )

    def _write_args_page(self, destination, values):
        """
        Write one block of register values from host buffer to FPGA.

        ``self._lock`` must already be held. Note that the check only confirms
        the lock is held by *someone*, not by the calling thread.

        :param destination: Byte address to start writing to
        :param values: Array of values to be written to consecutive words
        :raises RuntimeError: if called while the lock is not held, or if the
            kernel run does not complete successfully
        """
        if not self._lock.locked():
            raise RuntimeError("_write_args_page called without lock")

        # copy input data into shared buffer
        self._buf[0].write(values, 0)
        # migrate from host
        self._buf[0].sync(
            pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_TO_DEVICE,
            values.nbytes,
            0,
        )
        # Execute kernel with new arguments
        task = self._kernel(
            ARGS_SHARED_ADDRESS,  # source address
            destination,  # destination address
            self._buf[0],
            values.nbytes,
            *self._buf[1:],
        )
        _wait_for_completion(task, XRT_TIMEOUT)

    def read_memory(
        self,
        index: int,
        size_bytes: typing.Optional[int] = None,
        offset_bytes: int = 0,
    ) -> np.ndarray:
        """
        Read a shared memory buffer.

        :param index: Index of the shared buffer to save.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param size_bytes: number of bytes to transfer (transfers from
            ``offset_bytes`` to the end of the buffer if not specified or
            None)
        :param offset_bytes: starting address
        :return: shared memory buffer
        :raises AssertionError: if ``index`` is not a shared buffer
        """
        # Explicit raise (rather than assert) so the check survives
        # "python -O"; AssertionError is kept for backward compatibility.
        if index not in self._shared_bufs:
            raise AssertionError(f"Buffer #{index} is not a shared memory")

        if size_bytes is None:
            # Default to what remains after the offset, so a non-zero offset
            # does not request bytes beyond the end of the buffer.
            size_bytes = self._buf[index].size() - offset_bytes

        with self._lock:
            self._buf[index].sync(
                pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_FROM_DEVICE,
                size_bytes,
                offset_bytes,
            )
        return (
            self._buf[index].read(size_bytes, offset_bytes).view(ArgsWordType)
        )

    def write_memory(
        self, index: int, values: np.ndarray, offset_bytes: int = 0
    ):
        """
        Write to a shared memory buffer.

        :param index: Index of the shared buffer to write to.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param values: Data to write.
        :param offset_bytes: Byte-based offset where values should start in
            buffer.
        :raises AssertionError: if ``index`` is not a shared buffer
        """
        # Explicit raise (rather than assert) so the check survives
        # "python -O"; AssertionError is kept for backward compatibility.
        if index not in self._shared_bufs:
            raise AssertionError(f"Buffer #{index} is not a shared memory")

        with self._lock:
            # copy into the host-side buffer, then migrate to the device
            self._buf[index].write(values.view(dtype=np.uint8), offset_bytes)
            self._buf[index].sync(
                pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_TO_DEVICE,
                values.nbytes,
                offset_bytes,
            )