Source code for ska_low_cbf_fpga.driver

# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""General driver utilities"""
import json
import logging
import os
import subprocess
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Optional, Union

from ska_low_cbf_fpga import ArgsMap, ArgsSimulator
from ska_low_cbf_fpga.ami_info import AmiInfo
from ska_low_cbf_fpga.args_ami_tool import (
    ArgsAmi,
    PartitionSpec,
    parse_hbm_addresses_file,
)
from ska_low_cbf_fpga.args_fpga import ArgsFpgaDriver, FpgaRegisterError
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo


def create_driver_map_info(
    logger: Optional[logging.Logger] = None,
    mem_config: Union[list, str] = "",
    fpgamap_path: Optional[str] = None,
    firmware_path: Optional[str] = None,
    device: str = "0",
    partition: Optional[PartitionSpec] = None,
    simulate: Optional[bool] = None,
    verify_registers: Optional[dict[str, dict[str, int]]] = None,
    qdma_read: Optional[str] = None,
    qdma_write: Optional[str] = None,
) -> tuple[
    Optional[ArgsFpgaDriver], Optional[ArgsMap], Optional[FpgaHardwareInfo]
]:
    """
    Create :py:class:`ArgsFpgaDriver`, :py:class:`ArgsMap`, :py:class:`FpgaHardwareInfo`

    Will use real FPGA hardware if present, unless simulate is explicitly set to
    True. If simulate is ``True`` or no FPGA is present, ``fpgamap_path`` must be
    provided for :py:class:`ArgsSimulator` to use.

    :param logger: Logger object to pass along for log outputs.
        Defaults to the root logger.
    :param mem_config: FPGA memory access configuration
    :param fpgamap_path: Path to ``fpgamap_<numbers>.py`` file to be used
    :param firmware_path: path to a .pdi FPGA image (for AMI FPGA cards), or
        .xclbin FPGA kernel (for XRT FPGA cards).
        Optional for AMI FPGA cards (will use whatever is active on the card),
        required for XRT FPGA cards.
    :param device: PCIe Board:Device.Function address
    :param partition: Partition number and boot device type to use for AMI FPGA
        cards.
    :param simulate: if True, simulate FPGA register/memory access
    :param verify_registers: Register values used to verify that we have loaded
        the correct firmware image. Allows bypassing the long programming step
        for AMI cards without relying on UUIDs.
        e.g. ``{"system": {"commit_short_hash": 0x12345678} }``
        This option may disappear if we can get the .pdi file UUID working.
    :param qdma_read: Path to QDMA read (c2h) character device. AMI only.
    :param qdma_write: Path to QDMA write (h2c) character device. AMI only.
    :return: ``(driver, map, info)``

        ``driver`` is ``None`` only when a real XRT FPGA is used and no
        ``firmware_path`` was supplied. When simulating, a driver is always
        created (``fpgamap_path`` is mandatory in that case); for a real AMI
        FPGA a driver is always created.

        ``map`` is only created when an fpgamap is present (either explicitly
        specified or implicitly in the firmware directory). If no fpgamap
        provided or found, the return value of map will be ``None``.

        ``info`` is created from real hardware only: an ``XrtInfo`` if an XRT
        FPGA is present, or an ``AmiInfo`` if an AMI FPGA is present and the
        ``FPGA_SYS_DIR_0`` and ``FPGA_HWMON_DIR`` environment variables are
        set. Otherwise ``None``. ``simulate`` has no effect here, if info is
        not None then it is real hardware info.
    :raises ValueError: if ``firmware_path`` does not end in ``.pdi`` or
        ``.xclbin``
    :raises RuntimeError: if supplied firmware is not suitable for the detected
        FPGA (AMI and XRT are mutually exclusive), ``partition`` is missing for
        an AMI FPGA or supplied for an XRT FPGA, ``fpgamap_path`` is missing
        when simulating, or there is a mismatch between ``verify_registers``
        and the FPGA
    """
    if logger is None:
        logger = logging.getLogger()

    firmware_ext = None
    if firmware_path:
        firmware_ext = os.path.splitext(firmware_path)[1].lower()
        if firmware_ext not in (".pdi", ".xclbin"):
            raise ValueError(
                "firmware_path must end in either '.pdi' or '.xclbin'. "
                f"Got {firmware_path}"
            )

    # Detect hardware and reject argument combinations that cannot work with it
    ami_fpga = ami_fpga_present()
    if ami_fpga and firmware_ext == ".xclbin":
        raise RuntimeError("xclbin file supplied but AMI FPGA detected")
    if ami_fpga and partition is None:
        raise RuntimeError("partition argument required for AMI FPGA")

    xrt_fpga = xrt_fpga_present()
    if xrt_fpga and firmware_ext == ".pdi":
        raise RuntimeError("pdi file supplied but XRT FPGA detected")
    if xrt_fpga and partition is not None:
        raise RuntimeError("partition argument not supported for XRT FPGA")

    hardware_present = ami_fpga or xrt_fpga
    simulate_fpga = simulate or not hardware_present
    logger.info(
        f"Hardware {'is' if hardware_present else 'not'} present, "
        f"using {'simulated' if simulate_fpga else 'real'} FPGA."
    )

    driver: ArgsFpgaDriver | None = None
    map_ = None
    driver_args: dict[str, Any] = {
        "device": device,
        "logger": logger,
        "mem_config": mem_config,
    }
    if simulate_fpga:
        if fpgamap_path is None:
            raise RuntimeError("fpgamap_path must be supplied when simulating FPGA")
        driver_args["fpgamap_path"] = fpgamap_path
        driver = _create_driver(ArgsSimulator.create_from_file, driver_args, logger)
        map_ = _load_map(driver, firmware_path, fpgamap_path, logger)
    elif xrt_fpga:
        if firmware_path is None:
            # Use the caller's logger (not the root logging module) so this
            # warning ends up wherever the rest of our output goes.
            logger.warning("XRT FPGA, no firmware, no driver will be created")
        else:
            from ska_low_cbf_fpga import ArgsXrt

            # For XRT, we always provide the firmware file
            # (lower level software skips the upload step if UUID matches)
            driver_args["xcl_file"] = firmware_path
            driver = _create_driver(ArgsXrt, driver_args, logger)
            map_ = _load_map(driver, firmware_path, fpgamap_path, logger)
    elif ami_fpga:
        driver_args["pdi_file"] = firmware_path
        driver_args["partition"] = partition
        driver_args["qdma_read"] = qdma_read
        driver_args["qdma_write"] = qdma_write
        try:
            driver_args["hbm_offsets"] = parse_hbm_addresses_file(
                Path(firmware_path or fpgamap_path).parent / "addresses.hbm"
            )
        except (FileNotFoundError, TypeError):
            # FileNotFoundError if no addresses.hbm file,
            # TypeError if both paths are None.
            # We won't have HBM access, driver will emit a warning.
            driver_args["hbm_offsets"] = None
        driver = _create_driver(ArgsAmi, driver_args, logger)
        map_ = _load_map(driver, firmware_path, fpgamap_path, logger)
        if map_ is not None:
            # Now we have our AMI driver and ARGS map, we can set the BAR split value
            split = driver.read(map_["system"]["v80_args_bar_address_split"].address)
            logger.debug("Setting BAR split to %#010x", split)
            driver.bar_address_split = split
    else:
        # impossible, but just in case of future expansion...
        raise RuntimeError(f"Unrecognized firmware extension {firmware_ext}")

    _verify_registers(verify_registers, driver, map_, logger)

    info = None
    if ami_fpga:
        ami_info_env_vars = {"FPGA_SYS_DIR_0", "FPGA_HWMON_DIR"}
        if ami_info_env_vars.issubset(os.environ):
            logger.debug("Creating AmiInfo from environment variables.")
            info = AmiInfo(
                sys_dir_0=os.environ["FPGA_SYS_DIR_0"],
                hwmon_dir=os.environ["FPGA_HWMON_DIR"],
            )
        else:
            logger.error(
                "No AmiInfo object will be created! "
                "Missing required environment variable(s): %s",
                ami_info_env_vars.difference(os.environ),
            )
    if xrt_fpga:
        from ska_low_cbf_fpga.args_xrt import ArgsXrt
        from ska_low_cbf_fpga.xrt_info import XrtInfo

        # Re-use the driver's already-open device handle when we have one,
        # otherwise fall back to the device address we were given.
        if isinstance(driver, ArgsXrt):
            info_device = driver._device
        else:
            info_device = device
        logger.debug(f"Creating hardware info object for device {device}")
        info = XrtInfo(info_device)

    return driver, map_, info
def _verify_registers(
    verify_registers: dict[str, dict[str, int]] | None,
    driver: ArgsFpgaDriver | None,
    map_: ArgsMap | None,
    logger: logging.Logger,
) -> None:
    """
    Verify register values in FPGA.

    Each requested register is read exactly once; the values that were
    compared are the same values that get logged on a mismatch.

    :param verify_registers: Register values to verify, keyed by peripheral
        name then field name. ``None`` or an empty dict skips verification.
        e.g. To check for Correlator firmware:

        .. code:: python

            {
                "system": {"firmware_personality": int.from_bytes(b"CORR", "big")}
            }
    :param driver: Driver used to read the registers.
    :param map_: Register map used to look up each field's address.
    :param logger: For logging.
    :raises RuntimeError: if verification was requested but there is no driver
        or no map to verify with, or if any register value differs from the
        requested value
    """
    if not verify_registers:
        logger.info("Not verifying any FPGA register values")
        return

    if driver is None or map_ is None:
        # Without both a driver and a map there is nothing to read from;
        # fail with a clear message rather than an opaque TypeError.
        raise RuntimeError(
            "Cannot verify registers: "
            f"driver {'missing' if driver is None else 'present'}, "
            f"map {'missing' if map_ is None else 'present'}"
        )

    # Read every requested register once, so that the values we compare are
    # exactly the values we report if the check fails.
    actual = {
        peripheral: {
            field: driver.read(map_[peripheral][field].address) for field in fields
        }
        for peripheral, fields in verify_registers.items()
    }

    # check if active image matches request
    if all(
        actual[peripheral][field] == value
        for peripheral, fields in verify_registers.items()
        for field, value in fields.items()
    ):
        logger.info(f"Verified registers: {verify_registers}")
        return

    logger.error("Required register values: %s", verify_registers)
    logger.error("Actual register values: %s", actual)
    raise RuntimeError("Register verification failed")
def _create_driver(
    driver_class: type[ArgsFpgaDriver], driver_args: dict, logger: logging.Logger
) -> ArgsFpgaDriver:
    """
    Instantiate a driver, logging what is about to be built.

    :param driver_class: Callable (normally an ``ArgsFpgaDriver``-derived class)
        that produces the driver; its ``__name__`` is used in the log message.
    :param driver_args: Keyword arguments forwarded to ``driver_class``.
    :param logger: Receives a debug message describing the construction.
    :return: Whatever ``driver_class(**driver_args)`` returns.
    """
    description = f"Creating {driver_class.__name__} with arguments: {driver_args}"
    logger.debug(description)
    new_driver = driver_class(**driver_args)
    return new_driver
def _load_map(
    driver: ArgsFpgaDriver,
    firmware_path: str | None,
    fpgamap_path: str | None,
    logger: logging.Logger,
) -> ArgsMap | None:
    """
    Create an ArgsMap (if possible), using the build timestamp from the driver.

    :param driver: Driver queried (via ``get_map_build``) for the map build.
    :param firmware_path: Firmware file; its directory is searched for the map
        when ``fpgamap_path`` is not given. May be ``None``/empty.
    :param fpgamap_path: Map file; its directory takes precedence when given.
        May be ``None``/empty.
    :param logger: For logging.
    :return: The loaded map, or ``None`` if neither path was supplied, the map
        build could not be read from the FPGA, or no matching map file exists.
    """
    if not (fpgamap_path or firmware_path):
        return None

    # use FPGA map if provided, else look in directory of firmware
    map_dir = os.path.dirname(fpgamap_path or firmware_path)

    # Reading the build and loading the file are handled separately so that
    # each handler only deals with the step that can actually raise it, and
    # ``build`` is always bound by the time it is used in a log message.
    try:
        build = driver.get_map_build()
    except FpgaRegisterError as e:
        # this is not necessarily a fatal problem, it will occur when a user
        # creates a driver for a real FPGA with no personality
        message = f"Failed to read map build from FPGA: {e}"
        logger.info(message)
        return None

    logger.debug(f"Looking for map build {hex(build)} in {map_dir}")
    try:
        return ArgsMap.create_from_file(build, map_dir)
    except FileNotFoundError as fnf_error:
        logger.warning(
            f"Failed to find map file for build {hex(build)} in {map_dir}: {fnf_error}"
        )
    return None
def xrt_fpga_present() -> bool:
    """
    Check if XRT-based FPGA hardware is available.

    Runs ``xbutil examine`` with JSON output written to a temporary file and
    checks whether the report lists any devices. The temporary file is closed
    (and so removed) before returning.

    :return: True if XRT FPGA present, False if not
        (including when the ``pyxrt`` library is not installed)
    :raises FileNotFoundError: if pyxrt is available but xbutil is not
    :raises RuntimeError: if xbutil doesn't exit cleanly
    """
    try:
        import pyxrt  # noqa: F401
    except ModuleNotFoundError:
        # no pyxrt library, cannot possibly access FPGA
        return False

    # Context manager guarantees the temporary file is closed and cleaned up,
    # even if xbutil fails or its output cannot be parsed.
    with NamedTemporaryFile() as temp:
        process = subprocess.run(
            [
                "xbutil",
                "examine",
                "--format",
                "json",
                "--output",
                temp.name,
                "--force",  # required because we create the file first
            ],
            capture_output=True,
        )
        if process.returncode != 0:
            raise RuntimeError(
                "Problem running xbutil.\n"
                f"stdout: {process.stdout}\n"
                f"stderr: {process.stderr}"
            )
        xbutil_examine = json.load(temp)

    return bool(xbutil_examine["system"]["host"]["devices"])
def ami_fpga_present() -> bool:
    """
    Report whether an AMI-based FPGA can be seen on this host.

    Runs ``ami_tool overview`` and looks for the text ``ALVEO`` in its
    standard output.

    :return: True if ``ALVEO`` appears in the tool's output; False if it does
        not, or if ``ami_tool`` is not installed
    :raises RuntimeError: if ami_tool exists but exits with a non-zero status
    """
    command = ["ami_tool", "overview"]
    try:
        result = subprocess.run(command, capture_output=True)
    except FileNotFoundError:
        # no ami_tool, therefore no interface to AMI-based FPGA
        return False

    if result.returncode == 0:
        overview_text = result.stdout.decode("utf")
        return "ALVEO" in overview_text

    raise RuntimeError(
        "Problem running ami_tool.\n"
        f"stdout: {result.stdout}\n"
        f"stderr: {result.stderr}"
    )