# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""General driver utilities"""
import json
import logging
import os
import subprocess
import warnings
from tempfile import NamedTemporaryFile
from typing import Optional, Union
from ska_low_cbf_fpga import ArgsMap, ArgsSimulator
from ska_low_cbf_fpga.args_fpga import ArgsFpgaDriver, FpgaRegisterError
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo
[docs]def create_driver_map_info(
logger: Optional[logging.Logger] = None,
mem_config: Union[list, str] = "",
fpgamap_path: Optional[str] = None,
xcl_file: Optional[str] = None,
firmware_path: Optional[str] = None,
device: str = "0",
simulate: Optional[bool] = None,
) -> (ArgsFpgaDriver, Optional[ArgsMap], Optional[FpgaHardwareInfo]):
"""
Create :py:class:`ArgsFpgaDriver`, :py:class:`ArgsMap`, :py:class:`FpgaHardwareInfo`
Will use real FPGA hardware if present, unless simulate is explicitly set to True.
If simulate is ``True`` or no FPGA is present, ``fpgamap_path`` must be provided for
:py:class:`ArgsSimulator` to use.
:param logger: Logger object to pass along for log outputs
:param mem_config: FPGA memory access configuration
:param fpgamap_path: Path to ``fpgamap_<numbers>.py`` file to be used
:param xcl_file: Deprecated, use ``firmware_path`` instead (equivalent)
:param firmware_path: path to a .pdi FPGA image (for AMI FPGA cards),
or .xclbin FPGA kernel (for XRT FPGA cards)
:param device: PCIe Board:Device.Function address
:param simulate: if True, simulate FPGA register/memory access
:return: ``(driver, map, info)``
``driver`` is only created when either:
- fpgamap_path is supplied or
- an FPGA is present, either xcl_file or pdi_file is supplied, and simulate is
False.
To guarantee creation of driver, supply both fpgamap_path and a firmware file!
``map`` is only created when an fpgamap is present (either explicitly
specified or implicitly in the xclbin's directory). If no fpgamap,
the return value of map will be ``None``.
``info`` is only created if XRT FPGA hardware is present. If no XRT FPGA, the
return value of info will be ``None``. ``simulate`` has no effect here, if
info is not None then it is real hardware info.
:raises ValueError: if both types of firmware are given
:raises RuntimeError: if supplied firmware not suitable for detected FPGA
(AMI and XRT are mutually exclusive)
"""
if logger is None:
logger = logging.getLogger()
# Backwards compatibility for old argument name
if xcl_file:
warnings.warn(
"xcl_file is old news, use firmware_path", DeprecationWarning
)
firmware_path = firmware_path or xcl_file
firmware_ext = None
if firmware_path:
firmware_ext = os.path.splitext(firmware_path)[1].lower()
if firmware_ext not in (".pdi", ".xclbin"):
raise ValueError(
"firmware_path must end in either '.pdi' or '.xclbin'. "
f"Got {firmware_path}"
)
ami_fpga = ami_fpga_present()
if ami_fpga and firmware_ext == ".xclbin":
raise RuntimeError("xclbin file supplied but AMI FPGA detected")
xrt_fpga = xrt_fpga_present()
if xrt_fpga and firmware_ext == ".pdi":
raise RuntimeError("pdi file supplied but XRT FPGA detected")
hardware_present = ami_fpga or xrt_fpga
simulate_fpga = simulate or not hardware_present
logger.info(
f"Hardware {'is' if hardware_present else 'not'} present, "
f"using {'simulated' if simulate_fpga else 'real'} FPGA."
)
driver_class = None
if fpgamap_path and simulate_fpga:
driver_class = ArgsSimulator.create_from_file
elif firmware_path and not simulate_fpga:
if firmware_ext == ".xclbin":
from ska_low_cbf_fpga import ArgsXrt
driver_class = ArgsXrt
elif firmware_ext == ".pdi":
from ska_low_cbf_fpga import ArgsAmi
driver_class = ArgsAmi
else:
# impossible, but just in case of future expansion...
raise RuntimeError(
f"Unrecognized firmware extension {firmware_ext}"
)
driver = None
if driver_class:
logger.debug(f"Creating instance of {driver_class}")
driver = driver_class(
logger=logger,
mem_config=mem_config,
fpgamap_path=fpgamap_path,
xcl_file=firmware_path, # for ArgsXrt
pdi_file=firmware_path, # for ArgsAmi
device=device,
)
map_ = None
if fpgamap_path or firmware_path:
# use FPGA map if provided, else look in directory of firmware
map_dir = os.path.dirname(fpgamap_path or firmware_path)
try:
build = driver.get_map_build()
logger.debug(f"Looking for map build {hex(build)} in {map_dir}")
map_ = ArgsMap.create_from_file(build, map_dir)
except FpgaRegisterError as e:
# this is not necessarily a fatal problem, it will occur when user
# creates a driver for a real FPGA with no personality
message = f"Failed to read map build from FPGA: {e}"
logger.info(message)
info = None
if xrt_fpga:
from ska_low_cbf_fpga.args_xrt import ArgsXrt
from ska_low_cbf_fpga.xrt_info import XrtInfo
if isinstance(driver, ArgsXrt):
info_device = driver._device
else:
info_device = device
logger.debug(f"Creating hardware info object for device {device}")
info = XrtInfo(info_device)
return driver, map_, info
[docs]def xrt_fpga_present() -> bool:
"""
Check if XRT-based FPGA hardware is available.
:return: True if XRT FPGA present, False if not
:raises FileNotFoundError: if pyxrt is available but xbutil is not
:raises RuntimeError: if xbutil doesn't exit cleanly
"""
try:
import pyxrt # noqa: F401
temp = NamedTemporaryFile()
process = subprocess.run(
[
"xbutil",
"examine",
"--format",
"json",
"--output",
temp.name,
"--force", # required because we create the file first
],
capture_output=True,
)
if process.returncode != 0:
raise RuntimeError(
"Problem running xbutil.\n"
f"stdout: {process.stdout}\n"
f"stderr: {process.stderr}"
)
xbutil_examine = json.load(temp)
return bool(xbutil_examine["system"]["host"]["devices"])
except ModuleNotFoundError:
# no pyxrt library, cannot possibly access FPGA
return False
[docs]def ami_fpga_present() -> bool:
"""
Check if AMI-based FPGA hardware is available.
:return: True if AMI-based FPGA found, False otherwise
:raises RuntimeError: if ami_tool exists but doesn't exit cleanly
"""
try:
process = subprocess.run(["ami_tool", "overview"], capture_output=True)
except FileNotFoundError:
# no ami_tool, therefore no interface to AMI-based FPGA
return False
if process.returncode != 0:
raise RuntimeError(
"Problem running ami_tool.\n"
f"stdout: {process.stdout}\n"
f"stderr: {process.stderr}"
)
return "ALVEO" in process.stdout.decode("utf")