Source code for ska_low_cbf_fpga.args_simulator

import os.path
import re
import typing

import numpy as np

from .args_fpga import (
    WORD_SIZE,
    ArgsFpgaDriver,
    ArgsWordType,
    str_from_int_bytes,
)
from .args_map import load_map

"""
Simulate an ARGS-based FPGA, for development/testing.
"""


[docs]class ArgsSimulator(ArgsFpgaDriver): """ FPGA Simulation driver. """
[docs] @classmethod def create_from_file( cls, fpgamap_path: str, mem_config: list = [], **kwargs ): """ :param fpgamap_path: Path to fpgamap_<numbers>.py file to be simulated. :param mem_config: a list of MemConfig tuples (<size in bytes>, <shared?>). The first list item is the ARGS interchange buffer (which would be used to send/receive register values in a real FPGA). """ directory = os.path.dirname(fpgamap_path) filename = os.path.basename(fpgamap_path) pattern = re.compile("fpgamap_([\\d]+)\\.py") try: build = int(pattern.match(filename)[1], 16) except (IndexError, TypeError): raise ValueError( f"Filename {filename} didn't match 'fpgamap_<numbers>.py'" ) fpga_map = load_map(build, directory) return cls(fpga_map=fpga_map, mem_config=mem_config)
def _setup( self, fpga_map: typing.Union[dict, None] = None, mem_config: typing.Union[list, str] = "", **kwargs, ): """ :param fpga_map: FPGA address map to be simulated. :param mem_config: a list of MemConfig tuples (<size in bytes>, <shared?>). The first list item is the ARGS interchange buffer (which would be used to send/receive register values in a real FPGA). """ self._fpga_map = fpga_map self._shared_bufs = [] """List of buffer indexes that are shared between host & FPGA""" self._hbm = [] """numpy buffers used to simulate HBM""" def _load_firmware(self): """Create register simulation table & set initial values""" max_word_address = 0 # find address space and create appropriately sized buffer for p_name, peripheral in self._fpga_map.items(): max_word_address = max(max_word_address, peripheral["stop"]) self._values = np.zeros(max_word_address, dtype=ArgsWordType) # populate initial values for p_name, peripheral in self._fpga_map.items(): for s_name, slave in peripheral["slaves"].items(): if slave["type"] in ["REG", "RAM"]: for f_name, field in slave["fields"].items(): field_word_address = ( peripheral["start"] + slave["start"] + field["start"] ) try: self._values[field_word_address] = field["default"] except KeyError: self._values[field_word_address] = 0 def _init_buffers(self): """Create HBM simulation buffers""" for index, memory in enumerate(self._mem_config): if memory.shared: self.logger.debug( "allocating {} of shared memory".format( str_from_int_bytes(memory.size) ) ) if memory.size % WORD_SIZE: raise ValueError( f"memory size not a multiple of {WORD_SIZE} bytes" ) else: self._hbm.append( np.zeros(memory.size // WORD_SIZE).astype(ArgsWordType) ) self._shared_bufs.append(index)
[docs] def read(self, source, length: int = 1): """ Read values from simulated FPGA register space :param source: Byte address to start reading from :param length: Number of words to read :return: value(s) from simulated registers """ assert source % WORD_SIZE == 0 word_address = source // WORD_SIZE result = self._values[word_address : word_address + length] self.logger.debug( "Read address 0x%x, length %d: %s", source, length, result, ) if length == 1: return int(result) return result
[docs] def write(self, destination, values): """ Write values to simulated FPGA registers :param destination: FPGA byte address where writes should start :param values: value(s) to write, if more than one value they must be words, to be written to consecutive words (i.e. byte addresses increment by WORD_SIZE) """ assert destination % 4 == 0 word_address = destination // 4 # ensure we have a numpy array, for consistent logging if isinstance(values, (int, np.integer)) or ( isinstance(values, np.ndarray) and values.ndim == 0 ): values = np.array(values, dtype=ArgsWordType, ndmin=1) if not isinstance(values, np.ndarray) or values.dtype != ArgsWordType: raise TypeError(f"Don't know how to write type {type(values)}") self.logger.debug("Write address 0x%x: %s", destination, values) self._values[word_address : word_address + values.size] = values
[docs] def read_memory( self, index: int, size_bytes: int = None, offset_bytes: int = 0 ) -> np.ndarray: """ Read a shared memory buffer. :param size_bytes: number of bytes to transfer (transfers the whole buffer if not specified or None) :param offset_bytes: starting address :param index: Index of the shared buffer to save. Zero is the ARGS interchange buffer, which you probably don't want. :return: shared memory buffer """ if size_bytes: return ( self._hbm_user_index(index) .view(dtype=np.uint8)[ offset_bytes : (offset_bytes + size_bytes) ] .view(ArgsWordType) ) return self._hbm_user_index(index)
[docs] def write_memory( self, index: int, values: np.ndarray, offset_bytes: int = 0 ): """ Write to a shared memory buffer. :param index: Index of the shared buffer to write to. Zero is the ARGS interchange buffer, which you probably don't want. :param values: Data to write. :param offset_bytes: Byte-based offset where values should start in buffer. """ self._hbm_user_index(index).view(dtype=np.uint8)[ offset_bytes : offset_bytes + values.size ] = values
def _hbm_user_index(self, user_index: int) -> np.ndarray: """ Lookup a user-facing index number in our internal simulated HBM structures. :param user_index: reference to memory as specified on command line, e.g. if given "1Gi:1Gs" as memory config, then the only valid request here is 2 (there is no access to FPGA-internal memories, by definition) :return: the internal data buffer :raises AssertionError: if invalid index requested """ assert user_index in self._shared_bufs, ( f"Buffer #{user_index} is not a shared memory. " f"Shared memories are: {self._shared_bufs}" ) return self._hbm[self._shared_bufs.index(user_index)]