Source code for ska_low_cbf_fpga.args_ami_tool

# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""ARGS FPGA driver using ``ami_tool``."""
import os
import subprocess
import warnings
from typing import List, Optional, Union

import numpy as np

from ska_low_cbf_fpga import WORD_SIZE, ArgsFpgaDriver


[docs]def _parse_read_string(read: str) -> list: """Decode the output from an ``ami_tool bar_rd`` command.""" values = [] for line in read.splitlines(): if not line.startswith("["): continue # after ] is a tab and a space! raw_values = line.split(" ] ")[1].split(" ") values += list(int(raw, 16) for raw in raw_values) return values
[docs]def _uuid_mismatch(cfgmem_program_output: str) -> bool: """ Check if incoming and current UUIDs differ. :param cfgmem_program_output: output of ``ami_tool cfgmem_program`` command :return: ``True`` if current and incoming UUIDs differ, ``False`` otherwise """ uuids = {"current": None, "incoming": None} section = None for line in cfgmem_program_output.splitlines(): if "Incoming" in line: section = "incoming" elif "Current" in line: section = "current" if section and "UUID" in line: uuids[section] = line.split(" | ")[-1] if uuids["current"] is None: return True return uuids["incoming"] != uuids["current"]
[docs]class ArgsAmi(ArgsFpgaDriver): """FPGA driver that wraps ``ami_tool``."""
[docs] def _check_magic(self): warnings.warn("Skipping ARGS magic number check!!!")
[docs] def _setup( self, device: str = "0", pdi_file: Optional[str] = None, **kwargs, ): """ Set up the FPGA driver. :param device: PCIe BDF address. Can be partial, don't need the zeros. :param pdi_file: Path to .pdi file to load. """ self._bdf = device self._bar = 0 # TODO - is it always 0? self._pdi_file = pdi_file if os.getuid() != 0: warnings.warn( "Not running as root, expect register access to fail!" )
[docs] def _load_firmware(self): """Load a firmware file into the FPGA.""" if self._pdi_file is None: self.logger.warning( "No .pdi file specified, using whatever is on the card..." ) return self.logger.info(f"Firmware requested: {self._pdi_file}") if not os.path.exists(self._pdi_file): raise FileNotFoundError(f"No such file: {self._pdi_file}") # Firmware loading is slow, so we check if it is needed # "-p 0" means partition zero, which is loaded on boot command = ( f"ami_tool cfgmem_program -d {self._bdf} -i {self._pdi_file} -p 0" ) check_command = subprocess.run( command.split(" "), capture_output=True, input=b"n", ) if _uuid_mismatch(check_command.stdout.decode("utf")): program_command = subprocess.run( command.split(" "), capture_output=True, input=b"Y", ) if "successfully" not in program_command.stdout.decode("utf"): raise RuntimeError("Loading firmware FAILED") self.logger.info("Loading firmware complete") else: self.logger.info("Not loading - firmware already active")
[docs] def _init_buffers(self): """Initialise memory buffers.""" pass
[docs] def read(self, source: int, length: int = 1) -> Union[List[int], int]: """Read FPGA registers.""" addr = source command = f"ami_tool bar_rd -d {self._bdf} -b {self._bar} -a {addr} -l {length}" read_command = subprocess.run(command.split(" "), capture_output=True) values = _parse_read_string(read_command.stdout.decode("utf")) if length == 1: return values[0] return values
[docs] def write( self, destination: int, values: Union[int, np.ndarray, List[int]] ) -> None: """Write to FPGA registers.""" if isinstance(values, int): values = [values] # Note that the "-I" option allows using a file with multiple values... # Use a single 32 bit hex value per line (e.g. "0xffffffff") for word, value in enumerate(values): addr = destination + word * WORD_SIZE command = f"ami_tool bar_wr -d {self._bdf} -b {self._bar} -a {addr} -i {value}" write_command = subprocess.run( command.split(" "), input=b"Y", capture_output=True ) if "Success" not in write_command.stdout.decode("utf"): raise RuntimeError("Write failed")
[docs] def read_memory( self, index: int, size_bytes: int = None, offset_bytes: int = 0 ) -> np.ndarray: """Read from HBM.""" # TODO - this should be quite similar to `read`, # with an address offset of 0x040_0000_0000_000 # Let's test read before getting too ahead of ourselves. raise NotImplementedError
[docs] def write_memory( self, index: int, values: np.ndarray, offset_bytes: int = 0 ): """Write to HBM.""" # TODO - this should be quite similar to `write`, # with an address offset of 0x040_0000_0000_000 # Let's test write before getting too ahead of ourselves. raise NotImplementedError