# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""ARGS FPGA driver using ``ami_tool``."""
import os
import subprocess
import warnings
from typing import List, Optional, Union
import numpy as np
from ska_low_cbf_fpga import WORD_SIZE, ArgsFpgaDriver
[docs]def _parse_read_string(read: str) -> list:
"""Decode the output from an ``ami_tool bar_rd`` command."""
values = []
for line in read.splitlines():
if not line.startswith("["):
continue
# after ] is a tab and a space!
raw_values = line.split(" ] ")[1].split(" ")
values += list(int(raw, 16) for raw in raw_values)
return values
[docs]def _uuid_mismatch(cfgmem_program_output: str) -> bool:
"""
Check if incoming and current UUIDs differ.
:param cfgmem_program_output: output of ``ami_tool cfgmem_program`` command
:return: ``True`` if current and incoming UUIDs differ, ``False`` otherwise
"""
uuids = {"current": None, "incoming": None}
section = None
for line in cfgmem_program_output.splitlines():
if "Incoming" in line:
section = "incoming"
elif "Current" in line:
section = "current"
if section and "UUID" in line:
uuids[section] = line.split(" | ")[-1]
if uuids["current"] is None:
return True
return uuids["incoming"] != uuids["current"]
class ArgsAmi(ArgsFpgaDriver):
    """FPGA driver that wraps ``ami_tool``."""

    def _check_magic(self):
        """Warn that the ARGS magic number check is being skipped."""
        warnings.warn("Skipping ARGS magic number check!!!")

    def _setup(
        self,
        device: str = "0",
        pdi_file: Optional[str] = None,
        **kwargs,
    ):
        """
        Set up the FPGA driver.

        :param device: PCIe BDF address. Can be partial, don't need the zeros.
        :param pdi_file: Path to .pdi file to load.
        :param kwargs: accepted but not used by this method.
        """
        self._bdf = device
        self._bar = 0  # TODO - is it always 0?
        self._pdi_file = pdi_file
        if os.getuid() != 0:
            warnings.warn(
                "Not running as root, expect register access to fail!"
            )

    def _load_firmware(self):
        """
        Load a firmware file into the FPGA.

        Does nothing (beyond logging a warning) when no .pdi file was given.
        Otherwise ``ami_tool cfgmem_program`` is first run and declined, just
        to obtain the current and incoming UUIDs; the card is only programmed
        when those UUIDs differ.

        :raises FileNotFoundError: if the .pdi file does not exist
        :raises RuntimeError: if ``ami_tool`` does not report success
        """
        if self._pdi_file is None:
            self.logger.warning(
                "No .pdi file specified, using whatever is on the card..."
            )
            return
        self.logger.info(f"Firmware requested: {self._pdi_file}")
        if not os.path.exists(self._pdi_file):
            raise FileNotFoundError(f"No such file: {self._pdi_file}")
        # Firmware loading is slow, so we check if it is needed
        # "-p 0" means partition zero, which is loaded on boot
        # The arguments are kept as a list (rather than splitting a string on
        # spaces) so that a .pdi path containing spaces stays one argument.
        command = [
            "ami_tool",
            "cfgmem_program",
            "-d",
            str(self._bdf),
            "-i",
            str(self._pdi_file),
            "-p",
            "0",
        ]
        # Answer "n" to the prompt: we only want the UUID report here.
        check_command = subprocess.run(
            command,
            capture_output=True,
            input=b"n",
        )
        if _uuid_mismatch(check_command.stdout.decode("utf")):
            program_command = subprocess.run(
                command,
                capture_output=True,
                input=b"Y",
            )
            if "successfully" not in program_command.stdout.decode("utf"):
                raise RuntimeError("Loading firmware FAILED")
            self.logger.info("Loading firmware complete")
        else:
            self.logger.info("Not loading - firmware already active")

    def _init_buffers(self):
        """Initialise memory buffers (nothing to do for this driver)."""

    def read(self, source: int, length: int = 1) -> Union[List[int], int]:
        """
        Read FPGA registers.

        :param source: address, passed to ``ami_tool bar_rd`` as ``-a``
        :param length: passed to ``ami_tool bar_rd`` as ``-l``
        :return: a single value when ``length`` is 1, otherwise a list of
            every value parsed from the command output
        :raises RuntimeError: if ``length`` is 1 and no value could be parsed
            from the command output
        """
        command = [
            "ami_tool",
            "bar_rd",
            "-d",
            str(self._bdf),
            "-b",
            str(self._bar),
            "-a",
            str(source),
            "-l",
            str(length),
        ]
        read_command = subprocess.run(command, capture_output=True)
        values = _parse_read_string(read_command.stdout.decode("utf"))
        if length == 1:
            if not values:
                # Report the failure explicitly instead of letting an
                # IndexError escape from the indexing below.
                details = read_command.stderr.decode("utf", "replace").strip()
                raise RuntimeError(
                    f"Read failed (address {source}): {details}"
                )
            return values[0]
        return values

    def write(
        self, destination: int, values: Union[int, np.ndarray, List[int]]
    ) -> None:
        """
        Write to FPGA registers.

        Each value is written with its own ``ami_tool bar_wr`` call, at
        ``destination`` plus ``WORD_SIZE`` times the value's position.

        :param destination: address of the first value
        :param values: a single integer (Python or NumPy scalar) or a
            sequence of values to write to consecutive words
        :raises RuntimeError: if ``ami_tool`` does not report success for any
            of the writes
        """
        # NumPy integer scalars are not iterable, so treat them like ``int``.
        if isinstance(values, (int, np.integer)):
            values = [values]
        # Note that the "-I" option allows using a file with multiple values...
        # Use a single 32 bit hex value per line (e.g. "0xffffffff")
        for word, value in enumerate(values):
            addr = destination + word * WORD_SIZE
            command = [
                "ami_tool",
                "bar_wr",
                "-d",
                str(self._bdf),
                "-b",
                str(self._bar),
                "-a",
                str(addr),
                "-i",
                str(value),
            ]
            write_command = subprocess.run(
                command, input=b"Y", capture_output=True
            )
            if "Success" not in write_command.stdout.decode("utf"):
                raise RuntimeError("Write failed")

    def read_memory(
        self,
        index: int,
        size_bytes: Optional[int] = None,
        offset_bytes: int = 0,
    ) -> np.ndarray:
        """
        Read from HBM.

        :raises NotImplementedError: always; not implemented yet
        """
        # TODO - this should be quite similar to `read`,
        # with an address offset of 0x040_0000_0000_000
        # Let's test read before getting too ahead of ourselves.
        raise NotImplementedError

    def write_memory(
        self, index: int, values: np.ndarray, offset_bytes: int = 0
    ):
        """
        Write to HBM.

        :raises NotImplementedError: always; not implemented yet
        """
        # TODO - this should be quite similar to `write`,
        # with an address offset of 0x040_0000_0000_000
        # Let's test write before getting too ahead of ourselves.
        raise NotImplementedError