Source code for ska_low_cbf_fpga.driver

# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""General driver utilities"""

import json
import logging
import os
import subprocess
import warnings
from tempfile import NamedTemporaryFile
from typing import Optional, Union

from ska_low_cbf_fpga import ArgsMap, ArgsSimulator
from ska_low_cbf_fpga.args_fpga import ArgsFpgaDriver, FpgaRegisterError
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo


def create_driver_map_info(
    logger: Optional[logging.Logger] = None,
    mem_config: Union[list, str] = "",
    fpgamap_path: Optional[str] = None,
    xcl_file: Optional[str] = None,
    firmware_path: Optional[str] = None,
    device: str = "0",
    simulate: Optional[bool] = None,
) -> (Optional[ArgsFpgaDriver], Optional[ArgsMap], Optional[FpgaHardwareInfo]):
    """
    Create :py:class:`ArgsFpgaDriver`, :py:class:`ArgsMap`,
    :py:class:`FpgaHardwareInfo`

    Will use real FPGA hardware if present, unless simulate is explicitly set
    to True.
    If simulate is ``True`` or no FPGA is present, ``fpgamap_path`` must be
    provided for :py:class:`ArgsSimulator` to use.

    :param logger: Logger object to pass along for log outputs (the root
      logger is used if omitted)
    :param mem_config: FPGA memory access configuration
    :param fpgamap_path: Path to ``fpgamap_<numbers>.py`` file to be used
    :param xcl_file: Deprecated, use ``firmware_path`` instead (equivalent).
      Ignored if ``firmware_path`` is also given.
    :param firmware_path: path to a .pdi FPGA image (for AMI FPGA cards),
      or .xclbin FPGA kernel (for XRT FPGA cards)
    :param device: PCIe Board:Device.Function address
    :param simulate: if True, simulate FPGA register/memory access
    :return: ``(driver, map, info)``

      ``driver`` is only created when either:

      - the FPGA is being simulated (``simulate`` is True or no FPGA is
        present) and ``fpgamap_path`` is supplied, or
      - an FPGA is present, ``simulate`` is not True, and a firmware file is
        supplied.

      Otherwise ``driver`` is ``None``. To guarantee creation of driver,
      supply both ``fpgamap_path`` and a firmware file!

      ``map`` is only created when a driver exists and its map build could be
      read. The map file is looked up in the directory of ``fpgamap_path``
      if given, else in the directory of the firmware file. Otherwise the
      return value of map will be ``None``.

      ``info`` is only created if XRT FPGA hardware is present. If no XRT
      FPGA, the return value of info will be ``None``. ``simulate`` has no
      effect here, if info is not None then it is real hardware info.
    :raises ValueError: if the firmware file extension is neither ``.pdi``
      nor ``.xclbin``
    :raises RuntimeError: if supplied firmware not suitable for detected FPGA
      (AMI and XRT are mutually exclusive)
    """
    if logger is None:
        logger = logging.getLogger()

    # Backwards compatibility for old argument name
    if xcl_file:
        warnings.warn(
            "xcl_file is old news, use firmware_path",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to our caller's line
        )
        firmware_path = firmware_path or xcl_file

    firmware_ext = None
    if firmware_path:
        firmware_ext = os.path.splitext(firmware_path)[1].lower()
        if firmware_ext not in (".pdi", ".xclbin"):
            raise ValueError(
                "firmware_path must end in either '.pdi' or '.xclbin'. "
                f"Got {firmware_path}"
            )

    # Detect hardware and reject firmware meant for the other card family
    ami_fpga = ami_fpga_present()
    if ami_fpga and firmware_ext == ".xclbin":
        raise RuntimeError("xclbin file supplied but AMI FPGA detected")
    xrt_fpga = xrt_fpga_present()
    if xrt_fpga and firmware_ext == ".pdi":
        raise RuntimeError("pdi file supplied but XRT FPGA detected")

    hardware_present = ami_fpga or xrt_fpga
    simulate_fpga = simulate or not hardware_present
    logger.info(
        f"Hardware {'is' if hardware_present else 'not'} present, "
        f"using {'simulated' if simulate_fpga else 'real'} FPGA."
    )

    driver_class = None
    if fpgamap_path and simulate_fpga:
        driver_class = ArgsSimulator.create_from_file
    elif firmware_path and not simulate_fpga:
        # Hardware back-ends are imported lazily, only when actually needed
        if firmware_ext == ".xclbin":
            from ska_low_cbf_fpga import ArgsXrt

            driver_class = ArgsXrt
        elif firmware_ext == ".pdi":
            from ska_low_cbf_fpga import ArgsAmi

            driver_class = ArgsAmi
        else:
            # impossible, but just in case of future expansion...
            raise RuntimeError(
                f"Unrecognized firmware extension {firmware_ext}"
            )

    driver = None
    if driver_class:
        logger.debug(f"Creating instance of {driver_class}")
        driver = driver_class(
            logger=logger,
            mem_config=mem_config,
            fpgamap_path=fpgamap_path,
            xcl_file=firmware_path,  # for ArgsXrt
            pdi_file=firmware_path,  # for ArgsAmi
            device=device,
        )

    map_ = None
    if fpgamap_path or firmware_path:
        if driver is None:
            # Without a driver there is nothing to read the map build from
            # (e.g. only an fpgamap given while real hardware is in use).
            logger.info(
                "No driver was created, so no FPGA map will be loaded."
            )
        else:
            # use FPGA map if provided, else look in directory of firmware
            map_dir = os.path.dirname(fpgamap_path or firmware_path)
            try:
                build = driver.get_map_build()
                logger.debug(
                    f"Looking for map build {hex(build)} in {map_dir}"
                )
                map_ = ArgsMap.create_from_file(build, map_dir)
            except FpgaRegisterError as e:
                # this is not necessarily a fatal problem, it will occur when
                # user creates a driver for a real FPGA with no personality
                message = f"Failed to read map build from FPGA: {e}"
                logger.info(message)

    info = None
    if xrt_fpga:
        from ska_low_cbf_fpga.args_xrt import ArgsXrt
        from ska_low_cbf_fpga.xrt_info import XrtInfo

        # Prefer the device object the XRT driver already holds, if any
        if isinstance(driver, ArgsXrt):
            info_device = driver._device
        else:
            info_device = device
        logger.debug(f"Creating hardware info object for device {device}")
        info = XrtInfo(info_device)

    return driver, map_, info


def xrt_fpga_present() -> bool:
    """
    Check if XRT-based FPGA hardware is available.

    If the ``pyxrt`` module cannot be imported, no check is attempted.
    Otherwise ``xbutil examine`` is run with JSON output and the list of
    devices it reports is inspected.

    :return: True if XRT FPGA present, False if not
    :raises FileNotFoundError: if pyxrt is available but xbutil is not
    :raises RuntimeError: if xbutil doesn't exit cleanly
    """
    try:
        import pyxrt  # noqa: F401
    except ModuleNotFoundError:
        # no pyxrt library, cannot possibly access FPGA
        return False

    # xbutil writes its report to a file; the context manager makes sure the
    # temporary file is closed (and removed) on every exit path.
    with NamedTemporaryFile() as temp:
        process = subprocess.run(
            [
                "xbutil",
                "examine",
                "--format",
                "json",
                "--output",
                temp.name,
                "--force",  # required because we create the file first
            ],
            capture_output=True,
        )
        if process.returncode != 0:
            raise RuntimeError(
                "Problem running xbutil.\n"
                f"stdout: {process.stdout}\n"
                f"stderr: {process.stderr}"
            )
        xbutil_examine = json.load(temp)

    return bool(xbutil_examine["system"]["host"]["devices"])


def ami_fpga_present() -> bool:
    """
    Check if AMI-based FPGA hardware is available.

    Runs ``ami_tool overview`` and looks for the text ``ALVEO`` in its
    standard output.

    :return: True if AMI-based FPGA found, False otherwise
    :raises RuntimeError: if ami_tool exists but doesn't exit cleanly
    """
    try:
        process = subprocess.run(["ami_tool", "overview"], capture_output=True)
    except FileNotFoundError:
        # no ami_tool, therefore no interface to AMI-based FPGA
        return False

    if process.returncode != 0:
        raise RuntimeError(
            "Problem running ami_tool.\n"
            f"stdout: {process.stdout}\n"
            f"stderr: {process.stderr}"
        )

    # Search the raw bytes: the marker is plain ASCII, so this matches the
    # same output as decoding first, without failing on stray non-UTF-8 bytes.
    return b"ALVEO" in process.stdout