import os.path
import re
import typing
import numpy as np
from .args_fpga import (
WORD_SIZE,
ArgsFpgaDriver,
ArgsWordType,
str_from_int_bytes,
)
from .args_map import load_map
"""
Simulate an ARGS-based FPGA, for development/testing.
"""
class ArgsSimulator(ArgsFpgaDriver):
    """
    FPGA Simulation driver.

    The register space is simulated by one flat numpy array of words
    (``_values``); shared memories are simulated by a list of numpy arrays
    (``_hbm``).
    """

    @classmethod
    def create_from_file(
        cls,
        fpgamap_path: str,
        mem_config: typing.Optional[list] = None,
        **kwargs,
    ):
        """
        Create a simulator from an FPGA map file.

        :param fpgamap_path: Path to fpgamap_<numbers>.py file to be simulated.
        :param mem_config: a list of MemConfig tuples (<size in bytes>, <shared?>).
            The first list item is the ARGS interchange buffer (which would be
            used to send/receive register values in a real FPGA).
            An empty list is used if not specified or None.
        :param kwargs: accepted, but currently not used by this method.
        :return: a new instance of ``cls`` simulating the given map
        :raises ValueError: if the file name does not look like
            'fpgamap_<numbers>.py'
        """
        # A fresh list per call: a mutable default would be one object shared
        # by every call (and by every instance created from it).
        if mem_config is None:
            mem_config = []
        directory = os.path.dirname(fpgamap_path)
        filename = os.path.basename(fpgamap_path)
        match = re.match(r"fpgamap_([\d]+)\.py", filename)
        if match is None:
            raise ValueError(
                f"Filename {filename} didn't match 'fpgamap_<numbers>.py'"
            )
        # the digits in the file name are read as a hexadecimal build number
        build = int(match[1], 16)
        fpga_map = load_map(build, directory)
        # NOTE(review): **kwargs is not forwarded to the constructor - confirm
        # against ArgsFpgaDriver whether it should be.
        return cls(fpga_map=fpga_map, mem_config=mem_config)

    def _setup(
        self,
        fpga_map: typing.Union[dict, None] = None,
        mem_config: typing.Union[list, str] = "",
        **kwargs,
    ):
        """
        Store the FPGA map and create empty simulation containers.

        :param fpga_map: FPGA address map to be simulated.
        :param mem_config: a list of MemConfig tuples (<size in bytes>, <shared?>).
            The first list item is the ARGS interchange buffer (which would be
            used to send/receive register values in a real FPGA).
            Not used directly by this method.
        """
        self._fpga_map = fpga_map
        self._shared_bufs = []
        """List of buffer indexes that are shared between host & FPGA"""
        self._hbm = []
        """numpy buffers used to simulate HBM"""

    def _load_firmware(self):
        """Create register simulation table & set initial values"""
        peripherals = self._fpga_map.values()

        # find address space and create appropriately sized buffer
        max_word_address = 0
        for peripheral in peripherals:
            max_word_address = max(max_word_address, peripheral["stop"])
        self._values = np.zeros(max_word_address, dtype=ArgsWordType)

        # populate initial values
        for peripheral in peripherals:
            for slave in peripheral["slaves"].values():
                if slave["type"] not in ("REG", "RAM"):
                    continue
                slave_word_address = peripheral["start"] + slave["start"]
                for field in slave["fields"].values():
                    # fields without a "default" entry start at zero
                    self._values[
                        slave_word_address + field["start"]
                    ] = field.get("default", 0)

    def _init_buffers(self):
        """
        Create HBM simulation buffers.

        Only memories flagged as shared get a buffer. ``_shared_bufs`` records
        the configuration index of each one, in the same order as the buffers
        in ``_hbm``.

        :raises ValueError: if a shared memory's size is not a whole number of
            words
        """
        for index, memory in enumerate(self._mem_config):
            if not memory.shared:
                continue
            self.logger.debug(
                "allocating %s of shared memory",
                str_from_int_bytes(memory.size),
            )
            if memory.size % WORD_SIZE:
                raise ValueError(
                    f"memory size not a multiple of {WORD_SIZE} bytes"
                )
            # allocate with the final dtype directly, avoiding a temporary
            # float64 array followed by a converting copy
            self._hbm.append(
                np.zeros(memory.size // WORD_SIZE, dtype=ArgsWordType)
            )
            self._shared_bufs.append(index)

    def read(self, source, length: int = 1):
        """
        Read values from simulated FPGA register space

        :param source: Byte address to start reading from
            (must be a multiple of WORD_SIZE)
        :param length: Number of words to read
        :return: value(s) from simulated registers: a Python int when length
            is 1, otherwise a numpy slice of the simulated register array
        :raises IndexError: if a single word is requested from an address
            beyond the simulated register space
        """
        assert source % WORD_SIZE == 0, "source must be word-aligned"
        word_address = source // WORD_SIZE
        result = self._values[word_address : word_address + length]
        self.logger.debug(
            "Read address 0x%x, length %d: %s",
            source,
            length,
            result,
        )
        if length == 1:
            # index explicitly: int() on an array with ndim > 0 is deprecated
            # in recent numpy versions
            return int(result[0])
        return result

    def write(self, destination, values):
        """
        Write values to simulated FPGA registers

        :param destination: FPGA byte address where writes should start
            (must be a multiple of WORD_SIZE)
        :param values: value(s) to write, if more than one value they must be words,
            to be written to consecutive words (i.e. byte addresses increment by WORD_SIZE)
        :raises TypeError: if values is neither an integer nor a numpy array
            of ArgsWordType
        """
        # use WORD_SIZE (not a literal) so read and write always agree
        assert destination % WORD_SIZE == 0, "destination must be word-aligned"
        word_address = destination // WORD_SIZE
        # ensure we have a numpy array, for consistent logging
        if isinstance(values, (int, np.integer)) or (
            isinstance(values, np.ndarray) and values.ndim == 0
        ):
            values = np.array(values, dtype=ArgsWordType, ndmin=1)
        if not isinstance(values, np.ndarray) or values.dtype != ArgsWordType:
            raise TypeError(f"Don't know how to write type {type(values)}")
        self.logger.debug("Write address 0x%x: %s", destination, values)
        self._values[word_address : word_address + values.size] = values

    def read_memory(
        self,
        index: int,
        size_bytes: typing.Optional[int] = None,
        offset_bytes: int = 0,
    ) -> np.ndarray:
        """
        Read a shared memory buffer.

        :param size_bytes: number of bytes to transfer
            (transfers up to the end of the buffer if not specified, None
            or zero)
        :param offset_bytes: starting address
        :param index: Index of the shared buffer to read.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :return: shared memory buffer, as words. This is a numpy view of the
            simulated memory, not a copy.
        """
        buffer = self._hbm_user_index(index)
        if not size_bytes and not offset_bytes:
            return buffer
        # honour the offset even when no size was given
        end_bytes = offset_bytes + size_bytes if size_bytes else None
        return buffer.view(dtype=np.uint8)[offset_bytes:end_bytes].view(
            ArgsWordType
        )

    def write_memory(
        self, index: int, values: np.ndarray, offset_bytes: int = 0
    ):
        """
        Write to a shared memory buffer.

        The raw bytes of ``values`` are copied into the buffer, whatever the
        element type of ``values`` is.

        :param index: Index of the shared buffer to write to.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param values: Data to write.
        :param offset_bytes: Byte-based offset where values should start in
            buffer.
        :raises TypeError: if values is not a numpy array
        """
        if not isinstance(values, np.ndarray):
            raise TypeError(f"Don't know how to write type {type(values)}")
        # Reinterpret the data as bytes. Counting elements (values.size)
        # instead of bytes would squeeze each multi-byte element into one byte.
        raw_bytes = np.ascontiguousarray(values).reshape(-1).view(np.uint8)
        self._hbm_user_index(index).view(dtype=np.uint8)[
            offset_bytes : offset_bytes + raw_bytes.size
        ] = raw_bytes

    def _hbm_user_index(self, user_index: int) -> np.ndarray:
        """
        Lookup a user-facing index number in our internal simulated HBM
        structures.

        :param user_index: reference to memory as specified on command line,
            e.g. if given "1Gi:1Gs" as memory config, then the only valid request
            here is 2 (there is no access to FPGA-internal memories, by definition)
        :return: the internal data buffer
        :raises AssertionError: if invalid index requested
        """
        try:
            position = self._shared_bufs.index(user_index)
        except ValueError:
            # raised explicitly so the documented exception type still occurs
            # when Python runs with assertions disabled (-O)
            raise AssertionError(
                f"Buffer #{user_index} is not a shared memory. "
                f"Shared memories are: {self._shared_bufs}"
            ) from None
        return self._hbm[position]