# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""ARGS FPGA driver using ``ami_tool``."""
import json
import os
import subprocess
import warnings
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from io import FileIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List, Optional, Union
import numpy as np
from ska_low_cbf_fpga import WORD_SIZE, ArgsFpgaDriver, ArgsWordType
DMA_MAX_SIZE = 256 << 20
"""Maximum number of bytes per DMA transaction.
Not sure exactly what the limiting factor is, possibly QDMA configuration options."""
[docs]
def _parse_read_string(read: str, n_words: int) -> np.ndarray:
"""
Decode the output from an ``ami_tool bar_rd`` command.
:param read: output of ``ami_tool bar_rd`` command.
:param n_words: number of words that were read.
"""
values = np.empty(n_words, dtype=ArgsWordType)
n_read = 0
for line in read.splitlines():
if not line.startswith("["):
continue
line_ints = [int(i, 16) for i in line.split(" ]\t ")[1].split(" ")]
values[n_read : n_read + len(line_ints)] = np.array(
line_ints, dtype=ArgsWordType
)
n_read += len(line_ints)
if n_read != n_words:
raise ValueError("Did not get expected number of words")
return values
[docs]
def _uuid_mismatch(cfgmem_program_output: str) -> bool:
"""
Check if incoming and current UUIDs differ.
:param cfgmem_program_output: output of ``ami_tool cfgmem_program`` command
:return: ``True`` if current and incoming UUIDs differ, ``False`` otherwise
"""
uuids = {"current": None, "incoming": None}
section = None
for line in cfgmem_program_output.splitlines():
if "Incoming" in line:
section = "incoming"
elif "Current" in line:
section = "current"
if section and "UUID" in line:
uuids[section] = line.split(" | ")[-1].strip()
if uuids["current"] is None:
return True
if uuids["incoming"] == "N/A":
return True
return uuids["incoming"] != uuids["current"]
[docs]
def parse_hbm_addresses_file(filename: Path | str) -> tuple[int, ...]:
"""
Read HBM address offsets from a file.
:param filename: Path to an HBM address map file.
The File must contain one integer value per line,
which is used as the QDMA address offset for the respective HBM buffer.
"""
addresses = Path(filename)
if not addresses.exists():
raise FileNotFoundError(f"{filename} does not exist")
return tuple(map(lambda x: int(x, 0), addresses.read_text().splitlines()))
[docs]
class BootDeviceType(Enum):
"""AMI boot device types."""
PRIMARY = "primary"
SECONDARY = "secondary"
[docs]
@dataclass
class PartitionSpec:
"""AMI Partition Specifier."""
boot_type: str | BootDeviceType
partition: int
def __post_init__(self):
if isinstance(self.boot_type, str):
self.boot_type = BootDeviceType(self.boot_type)
[docs]
class ArgsAmi(ArgsFpgaDriver):
"""FPGA driver that wraps ``ami_tool``."""
[docs]
def _setup(
self,
*,
device: str = "0",
partition: Optional[PartitionSpec] = None,
pdi_file: Optional[str] = None,
bars: Optional[tuple[int, ...]] = (2, 4),
bar_address_split: int = 0xFFFF_FFFF_FFFF_FFFF,
qdma_read: Optional[str] = None,
qdma_write: Optional[str] = None,
hbm_offsets: tuple[int, ...] = (),
**kwargs,
):
"""
Set up the FPGA driver.
:param device: PCIe BDF address. Can be partial, don't need the zeros.
:param boot_type: The boot device type.
:param partition: Flash memory partition to activate and boot from.
If ``pdi_file`` is given, it will be loaded to this partition unless the
image in the partition has a matching UUID.
:param pdi_file: Path to .pdi file to load.
If not given, use whatever is already on the card.
:param bars: PCIe BAR numbers to use, defaults to ``(2, 4)``.
:param bar_address_split: Byte address of first register in second PCIe BAR.
The default value is the largest possible (64 bit maximum unsigned int)
so we can read ARGS magic number and map build before loading the map.
Then, using the map, a user can find the address of the real split value.
This driver **does not automatically read the true split value**,
as we have no knowledge of the ARGS register address map.
:param qdma_read: QDMA read (c2h) character device path, used for HBM access.
:param qdma_write: QDMA write (h2c) character device path, used for HBM access.
:param hbm_offsets: HBM address offsets.
One value per HBM buffer, in order.
"""
if partition is None:
partition = PartitionSpec(partition=0, boot_type=BootDeviceType.PRIMARY)
self._bdf = device
self._bars = bars
self.bar_address_split = 0xFFFF_FFFF_FFFF_FFFF
"""
Byte address of first register in second PCIe BAR.
This value will be set externally, after loading the ``ArgsMap``.
"""
self._boot_type = partition.boot_type.value
self._partition = partition.partition
self._pdi_file = pdi_file
self._qdma_read = qdma_read
self._qdma_write = qdma_write
self._hbm_offsets = hbm_offsets
if os.getuid() != 0:
warnings.warn("Not running as root, expect register access to fail!")
if not self._hbm_offsets:
warnings.warn("No HBM offsets given, can't read/write HBM.")
if self._qdma_read is None:
warnings.warn("No QDMA read device given, can't read HBM.")
if self._qdma_write is None:
warnings.warn("No QDMA write device given, can't write HBM.")
# the partition check has to come after setting _bdf and _boot_type
if self._partition > self.n_partitions:
raise ValueError(
f"Partition {self._partition} not valid. "
f"FPT has {self.n_partitions} partitions."
)
[docs]
def _load_firmware(self):
"""Load a firmware file into the FPGA."""
if self._pdi_file is None:
self.logger.info(
f"Activating partition {self._partition} (without programming)"
)
# strangely, the device_boot command doesn't take a boot type argument...
command = f"ami_tool device_boot -d {self._bdf} -p {self._partition}"
boot_command = subprocess.run(command.split(" "), capture_output=True)
if boot_command.returncode != 0:
self.logger.error(
"Error when activating partition %d: %s",
self._partition,
boot_command.stderr.decode("utf"),
)
raise RuntimeError(f"Failed to activate partition {self._partition}")
self.logger.info(f"Activated partition {self._partition}")
return
self.logger.info(f"Using partition {self._partition}, {self._pdi_file}")
if not os.path.exists(self._pdi_file):
raise FileNotFoundError(f"No such file: {self._pdi_file}")
# Firmware loading is slow, so we check UUID first
command = (
f"ami_tool cfgmem_program -d {self._bdf} -t {self._boot_type} "
f"-i {self._pdi_file} -p {self._partition}"
)
check_command = subprocess.run(
command.split(" "),
capture_output=True,
input=b"n",
)
if _uuid_mismatch(check_command.stdout.decode("utf")):
self.logger.info("UUID differs. Programming via cfgmem_program.")
program_command = subprocess.run(
command.split(" "),
capture_output=True,
input=b"Y",
)
if program_command.returncode != 0:
self.logger.error(program_command.stderr.decode("utf"))
raise RuntimeError("Programming FAILED")
self.logger.info("Programming complete")
else:
# The UUID reported by cfgmem_program is the one that is currently in use
# by the card.
# So we don't need to trigger a device_boot or anything.
self.logger.info(
"UUID matches. Firmware already active. Will not re-program."
)
[docs]
def _init_buffers(self):
"""Initialise memory buffers."""
pass
[docs]
def read(self, source: int, length: int = 1) -> Union[np.ndarray, int]:
"""Read FPGA registers."""
bar, addr = self._get_bar_address(source)
command = f"ami_tool bar_rd -d {self._bdf} -b {bar} -a {addr} -l {length}"
read_command = subprocess.run(command.split(" "), capture_output=True)
if read_command.returncode != 0:
self.logger.error(read_command.stderr.decode("utf"))
raise RuntimeError(f"read({source=}, {length=}) failed")
values = _parse_read_string(read_command.stdout.decode("utf"), length)
if length == 1:
return int(values[0])
return values
[docs]
def write(
self, destination: int, values: Union[int, np.ndarray, List[int]]
) -> None:
"""Write to FPGA registers."""
bar, addr = self._get_bar_address(destination)
# Start of our command, without -i or -I and value or filename.
# A list is used rather than a str to be split later,
# just in case our temp file has a space in it.
command = f"ami_tool bar_wr -d {self._bdf} -b {bar} -a {addr}".split(" ")
if isinstance(values, (int, np.integer)):
# Single value, no point creating temp. file
command += ["-i", f"{values}"]
write_command = subprocess.run(command, input=b"Y", capture_output=True)
if write_command.returncode != 0:
self.logger.error(write_command.stderr.decode("utf"))
raise RuntimeError(f"Single value write({destination=}) failed")
return
if len(values) == 0:
# Shouldn't happen, but can with bugs in higher-level software.
# ArgsXrt tolerates this, so we should too.
self.logger.warning("Ignoring zero-length array")
# just in case we get a 0 dimensional 'array'
# (not sure if still in use anywhere, but ArgsXrt tolerates them)
if isinstance(values, np.ndarray) and values.ndim == 0:
values = np.array(values, dtype=ArgsWordType, ndmin=1)
with NamedTemporaryFile("w", delete=True) as temp:
# ami_tool says to use hex, but decimal works just fine
# this 'join' scheme seemed the fastest way
# (better than np.savetxt or looping one line at a time)
temp.write("\n".join(f"{value}" for value in values))
temp.flush()
command += ["-I", temp.name]
write_command = subprocess.run(command, input=b"Y", capture_output=True)
if write_command.returncode != 0:
self.logger.error(write_command.stderr.decode("utf"))
raise RuntimeError(
f"Multiple value write({destination=}, {len(values)=}) failed"
)
[docs]
def read_memory(
self, index: int, size_bytes: int = None, offset_bytes: int = 0
) -> np.ndarray:
"""
Read from HBM.
Note: Unlike ``ArgsXrt``, we do not care if the memory buffer is marked as
'shared' in the memory configuration string.
:param size_bytes: Number of bytes to transfer.
Should be a multiple of ``WORD_SIZE``,
will be rounded down to a multiple of ``WORD_SIZE``.
(transfers the whole buffer if not specified or None)
:param offset_bytes: Starting address.
:param index: Index of the HBM buffer to read.
For compatibility with :py:class:`ArgsXrt`,
the ``index`` of the first HBM buffer is 1.
:return: HBM contents, as :py:class:`ArgsWordType`
"""
if size_bytes is None:
size_bytes = self._mem_config[index].size
self.logger.debug("Reading %d Bytes from HBM %d", size_bytes, index)
# Note: numpy should create this as 8-byte aligned,
# which might be good for speed
buf = np.empty(size_bytes // WORD_SIZE, dtype=ArgsWordType)
n_bytes = 0
with FileIO(self._qdma_read, "rb") as f:
while n_bytes < buf.nbytes:
f.seek(self._hbm_offsets[index - 1] + offset_bytes + n_bytes)
start = n_bytes // WORD_SIZE
end = (n_bytes + DMA_MAX_SIZE) // WORD_SIZE
n = f.readinto(buf[start:end])
if n == 0:
raise RuntimeError("Read failed")
# if we read a partial word, wind back the clock,
# read the whole thing next loop
# (not sure if this is possible)
n_bytes += n - n % WORD_SIZE
return buf
[docs]
def write_memory(self, index: int, values: np.ndarray, offset_bytes: int = 0):
"""
Write to HBM.
Note: Unlike ``ArgsXrt``, we do not care if the memory buffer is marked as
'shared' in the memory configuration string.
:param index: Index of the HBM buffer to write to.
For compatibility with :py:class:`ArgsXrt`,
the ``index`` of the first HBM buffer is 1.
:param values: Data to write.
:param offset_bytes: Starting address.
"""
self.logger.debug("Writing %d Bytes to HBM %d", values.nbytes, index)
n_bytes = 0
with FileIO(self._qdma_write, "wb") as f:
f.seek(self._hbm_offsets[index - 1] + offset_bytes)
# .write docs tell us that it:
# Only makes one system call, so not all of the data may be written.
while n_bytes < values.nbytes:
start = n_bytes // values.itemsize
end = (n_bytes + DMA_MAX_SIZE) // values.itemsize
n = f.write(values[start:end])
if n == 0 or n is None:
raise RuntimeError("Write failed")
# if we write a partial array element, wind back the clock,
# write the whole thing next loop
# (not sure if this is possible)
n_bytes += n - (n % values.itemsize)
@cached_property
def n_partitions(self) -> int:
"""Number of Flash Partition Table (FPT) entries."""
json_filename = None
with NamedTemporaryFile() as temp_file:
json_filename = temp_file.name
# ami_tool demands a filename that does not exist, no option to overwrite!
# (above context handler will delete the file when it closes)
command = (
f"ami_tool cfgmem_info -d {self._bdf} -t {self._boot_type} "
f"-f json -o {json_filename}"
)
info_command = subprocess.run(command.split(" "), capture_output=True)
if info_command.returncode != 0:
self.logger.error(info_command.stderr.decode("utf"))
raise RuntimeError("Failed to read number of partitions")
with open(json_filename, "r", encoding="utf-8") as temp_file:
fpt = json.load(temp_file)
# to be polite, we should delete the file
os.remove(json_filename)
return len(fpt["partitions"])
[docs]
def _get_bar_address(self, raw_address) -> tuple[int, int]:
"""Get PCIe BAR number and byte address from a given raw byte address."""
if raw_address < self.bar_address_split:
return self._bars[0], raw_address
return self._bars[1], raw_address - self.bar_address_split