Source code for ska_low_cbf_fpga.args_fpga

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence Agreement
# See LICENSE.txt for more info.

"""
Base classes to interface with ARGS-based FPGA
"""

import logging
import os
import typing
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from math import log2
from time import sleep

import numpy as np


class ArgsWordType(np.uint32):
    """
    Data type used for ARGS words.

    The underlying type must match the internal FPGA addressing scheme!
    """
# Kept as a literal: Sphinx docs can't handle using ArgsWordType(1).nbytes here?
WORD_SIZE = 4
"""Size of ARGS words, in bytes."""

# Fixed ARGS addresses, in bytes (word index multiplied by WORD_SIZE).
ARGS_SHARED_ADDRESS = WORD_SIZE * 0x8000
ARGS_MAGIC_ADDRESS = WORD_SIZE * 0
ARGS_BUILD_ADDRESS = WORD_SIZE * 1

# Value expected in the word at ARGS_MAGIC_ADDRESS.
ARGS_MAGIC = 0xF96A7001
@dataclass
class MemConfig:
    """
    Memory buffer configuration info.

    ``size`` is the buffer size in bytes. ``shared`` is True for a shared
    buffer and False for an FPGA internal one.
    """

    size: int  # in bytes
    shared: bool  # False = FPGA internal
# 128 KiB shared buffer
EXCHANGE_BUF_CONFIG = MemConfig(size=128 * 1024, shared=True)
"""ARGS register interchange buffer configuration."""
def mem_config_check(mem_config: typing.Union[list, str] = "") -> list:
    """
    Validate a memory configuration and normalise it to a list of MemConfigs.

    :param mem_config: either a non-empty list of MemConfig objects
        (the first list item is used to send/receive register values),
        or a string in the format accepted by :func:`mem_parse`
    :return: the list that was passed in (unchanged) when it is a valid list
        of MemConfigs; the result of :func:`mem_parse` for a string; otherwise
        a list holding only the default interchange buffer configuration
        (a DeprecationWarning is issued in that case)
    """
    if isinstance(mem_config, str):
        return mem_parse(mem_config)

    if (
        isinstance(mem_config, list)
        and mem_config
        and all(isinstance(item, MemConfig) for item in mem_config)
    ):
        return mem_config

    # TODO next version, raise an exception instead
    # Note the trailing space: adjacent string literals are joined verbatim.
    message = (
        "you should supply a list of MemConfigs - using default "
        "interchange buffer {}".format(EXCHANGE_BUF_CONFIG)
    )
    warnings.warn(message, DeprecationWarning)
    return [EXCHANGE_BUF_CONFIG]
def str_from_int_bytes(n_bytes: int, precision: int = 2) -> str:
    """
    Automatically scale a number of bytes for printing.
    No decimals are shown for exact conversions, e.g. 1024 -> "1 KiB"

    Values too large for the biggest known prefix are expressed in that
    biggest unit (e.g. 2**90 -> "1024 YiB").

    :param n_bytes: Number of bytes
    :param precision: decimal places to show
    :return: A number and units, e.g. '1.5 KiB'
    """
    # IEC 80000 standard prefixes for multiple-byte units by powers of 1024
    units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]

    n_bytes = int(n_bytes)
    # log2(0) is undefined; zero is simply reported in plain bytes
    unit = int(log2(abs(n_bytes))) // 10 if n_bytes != 0 else 0
    # clamp so that huge values don't index past the end of the prefix list
    unit = min(unit, len(units) - 1)

    divisor = 2 ** (unit * 10)
    scaled = n_bytes / divisor
    if divisor * int(scaled) == n_bytes:
        # don't print ".0" for exact matches
        precision = 0
    return f"{scaled:.{precision}f} {units[unit]}"
def mem_parse(memory_config: str = "") -> typing.List[MemConfig]:
    """
    Convert a string-formatted list of memory configurations to a list of
    MemConfigs.
    The register interchange buffer is automatically included as the first
    element. Do not list the register buffer in the input string!

    Entries that are empty, too short to hold a unit and a sharing flag, or
    that use an unrecognised unit or sharing flag are skipped.

    :param memory_config: a colon-separated string
        <size><unit><s|i>
        size: int
        unit: k, M, G (powers of 1024)
        s: shared
        i: FPGA internal
        e.g. '128Ms:1Gi'
    :return: list of MemConfigs, decoded from the string
    :raises ValueError: if the size part of an otherwise well-formed entry
        is not an integer
    """
    unit_multipliers = {"k": 1 << 10, "M": 1 << 20, "G": 1 << 30}
    sharing_flags = {"s": True, "i": False}

    mem = [EXCHANGE_BUF_CONFIG]
    for memory in memory_config.split(":"):
        # An entry needs at least a unit and a sharing flag; without this
        # guard a one-character entry such as "s" would raise IndexError.
        if len(memory) < 2:
            continue
        shared = sharing_flags.get(memory[-1])
        unit = unit_multipliers.get(memory[-2])
        if shared is None or unit is None:
            continue
        size = int(memory[:-2])
        mem.append(MemConfig(size * unit, shared))
    return mem
class ArgsFpgaDriverIface(ABC):
    """
    Pure-virtual interface to an FPGA.

    Every method here is abstract; a concrete driver must override all of
    them before it can be instantiated.
    """

    # --- Group 1: abstract methods used to load firmware ---

    @abstractmethod
    def _setup(self, *args, **kwargs):
        """Initialisation code, to be provided by the derived class."""
        raise NotImplementedError

    @abstractmethod
    def _load_firmware(self):
        """Firmware loading code, to be provided by the derived class."""
        raise NotImplementedError

    @abstractmethod
    def _init_buffers(self):
        """Memory buffer initialisation code, to be provided by the derived class."""
        raise NotImplementedError

    # --- Group 2: abstract methods for reading/writing registers ---

    @abstractmethod
    def read(self, source, length=None):
        """
        Read from FPGA register(s).

        :param source: Start address (bytes)
        :param length: Number of words to read
        """
        raise NotImplementedError

    @abstractmethod
    def write(self, destination, values):
        """
        Write to FPGA register(s).

        :param destination: Start address (bytes)
        :param values: One or more values to write to consecutive words
        """
        raise NotImplementedError

    @abstractmethod
    def read_memory(
        self, index: int, size_bytes: int = None, offset_bytes: int = 0
    ) -> np.ndarray:
        """
        Read a shared memory buffer.

        :param index: Index of the shared buffer to read.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param size_bytes: number of bytes to transfer
            (transfers the whole buffer if not specified or None)
        :param offset_bytes: starting address
        :return: shared memory buffer
        """
        raise NotImplementedError

    @abstractmethod
    def write_memory(
        self, index: int, values: np.ndarray, offset_bytes: int = 0
    ):
        """
        Write to a shared memory buffer.

        :param index: Index of the shared buffer to write to.
            Zero is the ARGS interchange buffer, which you probably don't want.
        :param values: Data to write.
        :param offset_bytes: Byte-based offset where values should start in
            buffer.
        """
        raise NotImplementedError
class FpgaRegisterError(RuntimeError):
    """Error raised when a register read from the FPGA yields no value."""
class ArgsFpgaDriver(ArgsFpgaDriverIface):
    """
    Base class for ARGS FPGA Drivers.

    Adds the startup sequence shared by all ARGS FPGAs on top of the
    interface class.
    """

    def __init__(
        self, logger=None, mem_config: typing.Union[list, str] = "", **kwargs
    ):
        """
        Run the common startup sequence: set up, load firmware, initialise
        buffers, then verify the ARGS magic number.

        :param logger: a logging object with .debug, .info, etc functions
        :param mem_config: memory configuration, validated by
            mem_config_check
        :param kwargs: optional extra arguments passed on to the derived
            class's _setup function
        """
        logger_name = self.__class__.__name__
        self.logger = (
            logging.getLogger(logger_name)
            if logger is None
            else logger.getChild(logger_name)
        )
        self._mem_config = mem_config_check(mem_config)

        self._setup(**kwargs)
        self._load_firmware()
        # we need to init buffers before checking magic
        self._init_buffers()

        # A small sleep here may help with intermittent magic number faults?
        # The delay (seconds) comes from the environment and defaults to 0.
        post_load_delay = float(os.getenv("FPGA_POST_FW_LOAD_DELAY", 0))
        sleep(post_load_delay)

        self._check_magic()

    def _check_magic(self):
        """
        Check that FPGA contains the correct 'Magic Number'.

        :raises FpgaRegisterError: if the value read is falsy
        :raises RuntimeError: if the value read differs from ARGS_MAGIC
        """
        magic_number = self.read(ARGS_MAGIC_ADDRESS)
        if not magic_number:
            raise FpgaRegisterError("No ARGS magic number from FPGA")

        if magic_number != ARGS_MAGIC:
            template = (
                "ARGS Magic Number mismatch! expected: 0x{:x}, got: 0x{:x}"
            )
            raise RuntimeError(template.format(ARGS_MAGIC, magic_number))

    def get_map_build(self):
        """
        Get the ARGS map build timestamp from the FPGA.

        :return: the value read at ARGS_BUILD_ADDRESS, converted to int
        :raises FpgaRegisterError: if the value read is falsy
        """
        build_stamp = self.read(ARGS_BUILD_ADDRESS)
        if build_stamp:
            return int(build_stamp)
        raise FpgaRegisterError("No ARGS map build from FPGA")