# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
FPGA Driver using Xilinx's XRT Python bindings (pyxrt).

You will need to have the pyxrt module available!
"""
# NOTE: the docstring above must precede the imports; only the first statement
# of a module becomes its ``__doc__``.
import os
import threading
import typing

import numpy as np
import pyxrt

from ska_low_cbf_fpga.args_fpga import (
    ARGS_SHARED_ADDRESS,
    WORD_SIZE,
    ArgsFpgaDriver,
    ArgsWordType,
    str_from_int_bytes,
)

# Overridable via the FPGA_XRT_TIMEOUT environment variable (must parse as int).
XRT_TIMEOUT = int(os.getenv("FPGA_XRT_TIMEOUT", "5"))
"""Timeout for FPGA read/write actions (milliseconds)"""
def _wait_for_completion(kernel_task: pyxrt.run, timeout: int):
    """
    Wait for FPGA register transaction to complete.

    Similar purpose to ``pyxrt.run.wait()`` except we throw an exception
    for errors or timeouts rather than failing silently.

    TODO: If timeouts do happen sometimes, should they be handled gracefully
    e.g retry?

    :param kernel_task: pyxrt kernel run object to monitor
    :param timeout: timeout in milliseconds
    :raises RuntimeError: if the command does not succeed in a timely manner
        (error state, timeout, or any state other than completed)
    """
    state = kernel_task.wait(timeout)
    if state == pyxrt.ert_cmd_state.ERT_CMD_STATE_COMPLETED:
        return
    if state == pyxrt.ert_cmd_state.ERT_CMD_STATE_ERROR:
        raise RuntimeError("Error detected in FPGA read/write.")
    if state == pyxrt.ert_cmd_state.ERT_CMD_STATE_TIMEOUT:
        raise RuntimeError(
            f"Timeout ({timeout}ms) waiting for FPGA read/write."
        )
    # Anything else (e.g. still queued/running) is treated as a failure.
    raise RuntimeError(f"Unexpected state: {state}")
class ArgsXrt(ArgsFpgaDriver):
    """
    pyxrt-based FPGA Driver.

    Requires a .xclbin file.
    """

    MINUS_ONE_16BIT = 65535
    """-1 as a 16 bit signed integer (0xFFFF)"""
    # see Jira https://jira.skatelescope.org/browse/PERENTIE-1626

    def _setup(
        self,
        xcl_file: str,  # TODO use a file object instead?
        mem_config: typing.Union[list, str] = "",
        device: str = "0",
        **kwargs,
    ):
        """
        Initialise driver state and open the XRT device.

        :param xcl_file: path to a .xclbin FPGA kernel
        :param mem_config: a list of MemConfig tuples (<size in bytes>,
            <shared?>); the first list item is used to send/receive register
            values. Not read here: ``_init_buffers`` uses
            ``self._mem_config``, presumably populated from this argument by
            the base class (TODO confirm).
        :param device: PCIe Board:Device.Function address
        """
        self._buf = []  # pyxrt buffer objects, in mem_config order
        # indexes (into self._buf) of buffers shared between host & FPGA
        self._shared_bufs = []
        self._kernel = None
        # prevents multiple simultaneous kernel calls; every register
        # transfer is staged through the same exchange buffer, self._buf[0]
        self._lock = threading.Lock()
        self._xcl_file = xcl_file
        # memory bank of each kernel buffer argument, filled by _load_firmware
        self._mem_group_ids = []
        self.logger.debug(f"PCIe address: {device}")
        self._device = pyxrt.device(device)
        self.logger.debug(f"pyxrt device: {self._device}")

    def _load_firmware(self):
        """
        Load the binary to the FPGA (if required) and initialise our kernel.

        The first kernel listed in the xclbin is used. The memory bank (group
        id) of every kernel argument that reports one is appended to
        ``self._mem_group_ids``, in argument order.

        :raises RuntimeError: if the xclbin contains no kernels
        """
        self.logger.debug("Loading {}".format(self._xcl_file))
        xclbin = pyxrt.xclbin(self._xcl_file)
        uuid = self._device.load_xclbin(xclbin)
        kernel_names = [kernel.get_name() for kernel in xclbin.get_kernels()]
        if not kernel_names:
            raise RuntimeError(f"No kernels found in {self._xcl_file}")
        cu_name = kernel_names[0]
        self.logger.debug(f"xclbin has kernels: {kernel_names}")
        self.logger.debug(f"Using Compute Unit name {cu_name}")
        self._kernel = pyxrt.kernel(
            self._device, uuid, cu_name, pyxrt.kernel.cu_access_mode(0)
        )
        # Probe argument indexes until group_id() raises IndexError, i.e. we
        # have run past the last argument (probing stops after 100 args).
        num_args = 0
        for n in range(100):
            try:
                gid = self._kernel.group_id(n)
            except IndexError as e:
                self.logger.debug(f"_load_firmware; n:{n}, {e}")
                break
            num_args += 1
            # -1 / 0xFFFF mean "no memory bank" (presumably scalar arguments)
            if gid not in (-1, self.MINUS_ONE_16BIT):
                self._mem_group_ids.append(gid)
                self.logger.debug(f"Kernel group: {n}, memory bank {gid}")
        self.logger.debug(f"kernel has {num_args} args")

    def _init_buffers(self):
        """
        Create XRT buffer objects, one per entry of ``self._mem_config``.

        Entry ``i`` is bound to the ``i``-th memory bank recorded by
        ``_load_firmware``. Shared memories are allocated with
        ``flags.normal`` and their indexes recorded in ``self._shared_bufs``;
        the rest use ``flags.device_only``. Buffer 0 is the register exchange
        buffer and sets ``EXCHANGE_BUF_BYTES`` / ``EXCHANGE_BUF_WORDS``.

        :raises ValueError: if a shared memory size is not a multiple of
            WORD_SIZE, if more memories are configured than the kernel has
            buffer arguments, or if no memory is configured at all
        """
        flags = pyxrt.bo.flags
        mem_group_ids = self._mem_group_ids
        for index, memory in enumerate(self._mem_config):
            if index >= len(mem_group_ids):
                raise ValueError(
                    f"Memory configuration #{index} has no matching kernel "
                    f"buffer argument (kernel has {len(mem_group_ids)})"
                )
            mem_group_id = mem_group_ids[index]
            if memory.shared:
                self.logger.debug(
                    "allocating {} of shared memory".format(
                        str_from_int_bytes(memory.size)
                    )
                )
                if memory.size % WORD_SIZE:
                    raise ValueError(
                        "memory size not a multiple of {} bytes".format(
                            WORD_SIZE
                        )
                    )
                bo_flags = flags.normal
            else:
                self.logger.debug(
                    "allocating {} of device-only memory".format(
                        str_from_int_bytes(memory.size)
                    )
                )
                bo_flags = flags.device_only
            self._buf.append(
                pyxrt.bo(self._device, memory.size, bo_flags, mem_group_id)
            )
            # record shared index only once its buffer actually exists
            if memory.shared:
                self._shared_bufs.append(index)
        if not self._buf:
            raise ValueError(
                "No memory configured: buffer 0 is required to exchange "
                "register values"
            )
        self.EXCHANGE_BUF_BYTES = self._buf[0].size()
        self.EXCHANGE_BUF_WORDS = self.EXCHANGE_BUF_BYTES // WORD_SIZE

    def read(
        self, source: int, length: int = 1
    ) -> typing.Union[int, np.ndarray]:
        """
        Read values from FPGA.

        The transfer is split into chunks of at most ``EXCHANGE_BUF_BYTES``,
        each staged through the register exchange buffer (``self._buf[0]``).

        :param source: Byte address to start reading from
        :param length: Number of words to read
        :return: value(s) from FPGA - an ``int`` when ``length`` is 1,
            otherwise an array of ``ArgsWordType``
        """
        total_bytes = length * WORD_SIZE
        with self._lock:
            values = np.empty(total_bytes, dtype=np.uint8)
            for byte_offset in range(0, total_bytes, self.EXCHANGE_BUF_BYTES):
                valid_bytes = min(
                    total_bytes - byte_offset, self.EXCHANGE_BUF_BYTES
                )
                self._read_args_page(source + byte_offset, valid_bytes)
                chunk = self._buf[0].read(valid_bytes, 0)
                values[byte_offset : byte_offset + valid_bytes] = chunk
            result = values.view(dtype=ArgsWordType)
            self.logger.debug(
                "Read address 0x%x, length %d: %s",
                source,
                length,
                result,
            )
        if length == 1:
            return int(result[0])
        return result

    def _read_args_page(
        self, address_start: int, n_bytes: typing.Optional[int] = None
    ):
        """
        Read one block of register values into host memory buffer.

        Must be called with ``self._lock`` held.

        :param address_start: Byte address to start reading from
        :param n_bytes: Number of bytes to read (capped at
            ``EXCHANGE_BUF_BYTES``; the whole exchange buffer if None)
        :raises RuntimeError: if ``self._lock`` is not held
        """
        # NOTE: threading.Lock records no owner, so this only detects that
        # *some* thread holds the lock, not necessarily the caller.
        if not self._lock.locked():
            raise RuntimeError("_read_args_page called without lock")
        if n_bytes is None:
            n_bytes = self.EXCHANGE_BUF_BYTES
        else:
            n_bytes = min(n_bytes, self.EXCHANGE_BUF_BYTES)
        # Execute kernel with new args. Note the unintuitive syntax: run
        # function overloads the () operator, it's not instantiating a new
        # kernel, AFAICT
        # (See https://xilinx.github.io/XRT/master/html/xrt_native.main.html
        # and https://xilinx.github.io/XRT/master/html/pyxrt.html )
        # Returns reference to the pyxrt.run object being executed by the
        # kernel
        task = self._kernel(
            address_start,  # source address
            ARGS_SHARED_ADDRESS,  # destination address
            self._buf[0],
            n_bytes,
            *self._buf[1:],
        )
        _wait_for_completion(task, XRT_TIMEOUT)  # Like task.wait() but safer
        # migrate to host
        self._buf[0].sync(
            pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_FROM_DEVICE,
            n_bytes,
            0,
        )

    def write(self, destination: int, values: typing.Union[int, np.ndarray]):
        """
        Write values to FPGA.

        :param destination: FPGA byte address where writes should start
        :param values: value(s) to write, if more than one value they must be
            words (``ArgsWordType``), to be written to consecutive words (i.e.
            byte addresses increment by WORD_SIZE). Multi-dimensional arrays
            are written in C (row-major) order.
        """
        # transform integers and 0d arrays into 1d arrays, so slice works
        if isinstance(values, (int, np.integer)) or (
            isinstance(values, np.ndarray) and values.ndim == 0
        ):
            values = np.array(values, dtype=ArgsWordType, ndmin=1)
        assert values.dtype == ArgsWordType
        self.logger.debug("Write address 0x%x: %s", destination, values)
        # Flatten to a contiguous 1-D array: the slicing below indexes axis 0,
        # which for N-D input would select rows rather than words, and
        # _write_args_page copies each page as ``values.nbytes`` raw bytes.
        words = np.ascontiguousarray(values).ravel()
        with self._lock:
            for word_offset in range(0, words.size, self.EXCHANGE_BUF_WORDS):
                # number of words that are valid to write this iteration
                valid_words = min(
                    words.size - word_offset, self.EXCHANGE_BUF_WORDS
                )
                self._write_args_page(
                    destination + word_offset * WORD_SIZE,
                    words[word_offset : word_offset + valid_words],
                )

    def _write_args_page(self, destination: int, values: np.ndarray):
        """
        Write one block of register values from host buffer to FPGA.

        Must be called with ``self._lock`` held.

        :param destination: Byte address to start writing to
        :param values: Array of values to be written to consecutive words;
            ``write`` supplies at most ``EXCHANGE_BUF_WORDS`` words
        :raises RuntimeError: if ``self._lock`` is not held
        """
        # NOTE: threading.Lock records no owner, so this only detects that
        # *some* thread holds the lock, not necessarily the caller.
        if not self._lock.locked():
            raise RuntimeError("_write_args_page called without lock")
        exchange_buf = self._buf[0]
        # copy input data into shared buffer
        exchange_buf.write(values, 0)
        # migrate from host
        exchange_buf.sync(
            pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_TO_DEVICE,
            values.nbytes,
            0,
        )
        # Execute kernel with new arguments
        task = self._kernel(
            ARGS_SHARED_ADDRESS,  # source address
            destination,  # destination address
            exchange_buf,
            values.nbytes,
            *self._buf[1:],
        )
        _wait_for_completion(task, XRT_TIMEOUT)

    def read_memory(
        self,
        index: int,
        size_bytes: typing.Optional[int] = None,
        offset_bytes: int = 0,
    ) -> np.ndarray:
        """
        Read a shared memory buffer.

        :param index: Index of the shared buffer to read.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param size_bytes: number of bytes to transfer
            (if not specified or None, everything from ``offset_bytes`` to
            the end of the buffer)
        :param offset_bytes: starting byte offset within the buffer
        :return: shared memory buffer contents, viewed as ``ArgsWordType``
        """
        assert (
            index in self._shared_bufs
        ), f"Buffer #{index} is not a shared memory"
        if size_bytes is None:
            # only the remainder after the offset fits inside the buffer
            size_bytes = self._buf[index].size() - offset_bytes
        with self._lock:
            self._buf[index].sync(
                pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_FROM_DEVICE,
                size_bytes,
                offset_bytes,
            )
            return (
                self._buf[index]
                .read(size_bytes, offset_bytes)
                .view(ArgsWordType)
            )

    def write_memory(
        self, index: int, values: np.ndarray, offset_bytes: int = 0
    ):
        """
        Write to a shared memory buffer.

        :param index: Index of the shared buffer to write to.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param values: Data to write; its raw bytes are copied in C order.
        :param offset_bytes: Byte-based offset where values should start in
            buffer.
        """
        assert (
            index in self._shared_bufs
        ), f"Buffer #{index} is not a shared memory"
        # A contiguous array is needed for the uint8 view below to be valid
        # (e.g. for strided slices or Fortran-ordered input).
        data = np.ascontiguousarray(values)
        with self._lock:
            self._buf[index].write(data.view(dtype=np.uint8), offset_bytes)
            self._buf[index].sync(
                pyxrt.xclBOSyncDirection.XCL_BO_SYNC_BO_TO_DEVICE,
                data.nbytes,
                offset_bytes,
            )