Source code for ska_low_cbf_fpga.fpga_icl

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

"""
Instrument Control Layer
"""
import dataclasses
import logging
import os
import typing
import warnings
from dataclasses import dataclass, field

import numpy as np
from packaging import version
from packaging.specifiers import SpecifierSet

from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsFpgaDriver, ArgsWordType
from ska_low_cbf_fpga.args_map import ArgsFieldInfo, ArgsMap
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo

T = typing.TypeVar("T")


[docs]@dataclass class IclField(ArgsFieldInfo, typing.Generic[T]): """ICL field, probably derived from one or more :py:class:`IclFpgaField` s""" # user configuration type_: type = field(default=T) format: str = field(default="%i") user_write: bool = True """ Should the control system allow writing to this field? Note: not enforced at ICL! """ user_error: bool = False """ Should the control system treat a non-zero value as an error/alarm? """ # dynamic value: T = field(default=None) # note: ARGS map fields inherited from ArgsFieldInfo def __int__(self): return int(self.value) def __bool__(self): return bool(self.value) # operators def __add__(self, other): if isinstance(other, IclField): return self.value + other.type_(other) return self.value + other def __radd__(self, other): return self.__add__(other) def __sub__(self, other): if isinstance(other, IclField): return self.value - other.type_(other) return self.value - other def __rsub__(self, other): if isinstance(other, IclField): return other.type_(other) - self.value return other - self.value def __mul__(self, other): if isinstance(other, IclField): return self.value * other.type_(other) return self.value * other def __rmul__(self, other): return self.__mul__(other) def __pow__(self, other): if isinstance(other, IclField): return self.value ** other.type_(other) return self.value**other def __rpow__(self, other): if isinstance(other, IclField): return other.type_(other) ** self.value return other**self.value def __truediv__(self, other): if isinstance(other, IclField): return self.value / other.type_(other) return self.value / other def __rtruediv__(self, other): if isinstance(other, IclField): return other.type_(other) / self.value return other / self.value def __floordiv__(self, other): if isinstance(other, IclField): return self.value // other.type_(other) return self.value // other def __rfloordiv__(self, other): if isinstance(other, IclField): return other.type_(other) // self.value return other // self.value def __mod__(self, other): if isinstance(other, IclField): return self.value % other.type_(other) return self.value % other def __rmod__(self, other): if isinstance(other, IclField): return other.type_(other) % self.value return other % self.value def __lshift__(self, other): if isinstance(other, IclField): return self.value << other.type_(other) return self.value << other def __rlshift__(self, other): if isinstance(other, IclField): return other.type_(other) << self.value return other << self.value def __rshift__(self, other): if isinstance(other, IclField): return self.value >> other.type_(other) return self.value >> other def __rrshift__(self, other): if isinstance(other, IclField): return other.type_(other) >> self.value return other >> self.value def __and__(self, other): if isinstance(other, IclField): return self.value & other.type_(other) return self.value & other def __rand__(self, other): return self.__and__(other) def __or__(self, other): if isinstance(other, IclField): return self.value | other.type_(other) return self.value | other def __ror__(self, other): return self.__or__(other) def __xor__(self, other): if isinstance(other, IclField): return self.value ^ other.type_(other) return self.value ^ other def __rxor__(self, other): return self.__xor__(other) def __invert__(self): return ~self.value # comparisons def __lt__(self, other): if isinstance(other, IclField): return self.value < other.type_(other) return self.value < other def __le__(self, other): if isinstance(other, IclField): return self.value <= other.type_(other) return self.value <= other def __eq__(self, other): if isinstance(other, IclField): return self.value == other.type_(other) return self.value == other def __ne__(self, other): if isinstance(other, IclField): return self.value != other.type_(other) return self.value != other def __gt__(self, other): if isinstance(other, IclField): return self.value > other.type_(other) return self.value > other def __ge__(self, other): if isinstance(other, IclField): return self.value >= other.type_(other) return self.value >= other
# The eq and order parameters are False to stop the dataclass decorator from # overriding the operators defined in IclField
[docs]@dataclass(eq=False, order=False) class IclFpgaField(IclField[ArgsWordType]): """An ICL field that is linked to an :py:class:`ArgsFpgaDriver` .""" # configuration driver: ArgsFpgaDriver = None type_: type = ArgsWordType # dynamic value: ArgsWordType = field(init=False) @property def value(self) -> typing.Union[int, np.ndarray, bool]: """Read current value from FPGA interface""" val = self.driver.read(self.address, self.length) if type(val) is int: # Single word length - apply bitoffset, then width masking if specified if self.bit_offset: val = val >> self.bit_offset if self.width: mask = (1 << self.width) - 1 val &= mask if self.width == 1: # single bit returned as bool return bool(val) return val if type(val) is np.ndarray: # do nothing with arrays return val @value.setter def value( self, value: typing.Union[int, bool, np.integer, np.ndarray, IclField] ): """Write new value via FPGA interface""" # on initialisation, the property object is passed to the setter # we don't want to send this to the FPGA # (an alternative could be to set an "init_complete" bool in __post_init__) if type(value) is not property: if isinstance(value, IclField): # extract register value from IclField objects value = value.value if type(value) is np.ndarray: self.driver.write(self.address, value) return elif type(value) in [int, bool] or isinstance(value, np.integer): # FIXME: ideally we would lock the driver for this... # but the driver controls the lock internally so that's a job for later # if value is bool, convert to int value = int(value) bit_width = self.width or (WORD_SIZE * 8) # If updating partial register, we need to read/modify/write if bit_width != (WORD_SIZE * 8): inital_value = self.driver.read(self.address, self.length) bit_offset = self.bit_offset or 0 mask = ((1 << bit_width) - 1) << bit_offset masked_initial_value = inital_value & ~mask masked_value = (value << bit_offset) & mask value = masked_initial_value | masked_value self.driver.write(self.address, value) return else: raise TypeError( f"{self.__class__.__name__} does not know how " f"to set a value of type {type(value)}" )
[docs] def __getitem__(self, item: typing.Union[int, slice]) -> ArgsWordType: """Array access for multi-word values""" read_length = 1 if isinstance(item, slice): if item.step is not None and item.step != 1: raise NotImplementedError("only step size 1 supported") end = self.length if item.stop is not None: end = self._index(item.stop) self._check_index(end - 1) start = self._index(item.start or 0) # copying list/numpy array behaviour for e.g. list[-2:-4] read_length = max(end - start, 0) else: start = self._index(item) self._check_index(start) return self.driver.read(self.address + start * WORD_SIZE, read_length)
[docs] def __setitem__(self, key: typing.Union[int, slice], value: ArgsWordType): """ Array index style access for setting part of a multi-word value. e.g. my_field[4] = 32 my_field[3:6] = np.ones(3, dtype=ArgsWordType) If using a slice, value must be an array i.e. Don't do this: my_field[3:4] = 5. Instead, use my_field[3] = 5 or even my_field[3:4] = np.array([5]) """ if isinstance(key, slice): start = self._index(key.start or 0) if key.step is not None and key.step != 1: raise NotImplementedError("only step size 1 supported") if key.stop is not None: end = self._index(key.stop) slice_len = end - start else: slice_len = self.length - start if len(value) != slice_len: raise ValueError( f"value length {len(value)} is not equal " f"to slice length {slice_len}" ) else: start = self._index(key) self._check_index(start) self.driver.write(self.address + start * WORD_SIZE, value)
def _check_index(self, index: int): """Check if an element index is within our length""" if index >= self.length: raise IndexError("index out of range") def _index(self, index: int): """Transform a possibly-negative index""" if index < 0: self._check_index(abs(index) - 1) return self.length + index return index
[docs]class FpgaUserInterface: """ A common interface used by :py:class:`FpgaPeripheral` and :py:class:`FpgaPersonality` for exposing attributes and methods to the control system. """ _user_attributes = set() """ Attributes to be exposed to the control system. Defaults to properties (if configured), or FPGA registers. Set to None if you really want no attributes exposed. """ _not_user_attributes = set() """Attribute names to exclude from auto-discovery for the control system.""" _admin_attributes = {"user_attributes", "user_methods"} """Names of attributes that should never be listed in user_attributes""" _user_methods = set() """Methods to be exposed to the control system.""" _not_user_methods = set() """Method names to exclude from auto-discovery for the control system."""
[docs] def __init__(self): # we do not want object instances to modify the base class values! if self._user_attributes is FpgaUserInterface._user_attributes: self._user_attributes = set(self._user_attributes) if self._not_user_attributes is FpgaUserInterface._not_user_attributes: self._not_user_attributes = set(self._not_user_attributes) if self._user_methods is FpgaUserInterface._user_methods: self._user_methods = set(self._user_methods) if self._not_user_methods is FpgaUserInterface._not_user_methods: self._not_user_methods = set(self._not_user_methods) if self._admin_attributes is FpgaUserInterface._admin_attributes: self._admin_attributes = set(self._admin_attributes) # NOTE: don't use os.getenv() below - it won't reflect the variable # value changes (e.g. during unit test) value = os.environ.get("DEBUG_FPGA", "no") self._expose_all = value.lower() in ( "true", "yes", "on", "1", )
@property def user_attributes(self) -> typing.Set[str]: """ Attributes to be exposed to the control system. """ return self._user_attributes @property def user_methods(self) -> typing.Set[str]: """Methods to be exposed to the control system.""" return self._user_methods def _discover_attributes(self): """Collect object's user attributes - unless it's None""" if self._user_attributes is None: self._user_attributes = set() if not self._expose_all: return assert type(self._user_attributes) == set # if user attributes is empty, or the user requested it, # expose all properties if self._discover_properties: properties = { attr for attr in dir(self) if isinstance(getattr(self.__class__, attr, None), property) and not attr.startswith("_") and attr not in self._admin_attributes } self._user_attributes |= set(properties) # if user attributes is still empty, or the user requested it, # add all registers as read-only if isinstance(self, FpgaPeripheral) and self._discover_registers: self._user_attributes |= set(self._fields.keys()) if not self._expose_all: # remove anything that we've been told to explicitly exclude self._user_attributes -= self._not_user_attributes # remove any special configuration flags for magic_value in MAGIC_USER_ATTRIBUTES: if magic_value in self._user_attributes: self._user_attributes.remove(magic_value) @property def _discover_properties(self) -> bool: """Return True if properties should be exposed, False otherwise""" return ( self._expose_all or len(self._user_attributes) == 0 or DISCOVER_PROPERTIES in self._user_attributes or DISCOVER_ALL in self._user_attributes ) @property def _discover_registers(self) -> bool: """return True in case user attributes is still empty or the user requested registers""" return ( self._expose_all or not self._user_attributes or DISCOVER_REGISTERS in self._user_attributes or DISCOVER_ALL in self._user_attributes ) def _discover_methods(self): """Collect our (public) method names. Exclude method names listed in _not_user_methods.""" if self._user_methods is None: self._user_methods = set() if not self._expose_all: return # discover methods if not already explicitly configured, # or if we're exposing everything for debugging if not self._user_methods or self._expose_all: self._user_methods = { name for name in dir(self) # we can't use inspect.getmethods here if isinstance( getattr(self.__class__, name, None), typing.Callable ) and not name.startswith("_") } if not self._expose_all: # filter out configured exclusions self._user_methods = { name for name in self._user_methods if name not in self._not_user_methods }
DISCOVER_REGISTERS = "__discover_registers__" DISCOVER_PROPERTIES = "__discover_properties__" DISCOVER_ALL = "__discover_all__" MAGIC_USER_ATTRIBUTES = [DISCOVER_ALL, DISCOVER_PROPERTIES, DISCOVER_REGISTERS] # "forward declaration" of sorts - there's circular reference between # FpgaPeripheral and FpgaPersonality # (the Peripheral stores a reference to the Personality for convenience) FpgaPersonality = typing.NewType("FpgaPersonality", FpgaUserInterface)
[docs]class FpgaPeripheral(FpgaUserInterface): _field_config = {} """ Configuration values for fields read from FPGA. e.g. to indicate to the control system that a field should be treated as an error condition: .. code-block:: python _field_config = {"example": IclFpgaField(user_error=True)} """
[docs] def __init__( self, driver: ArgsFpgaDriver, map_field_info: typing.Dict[str, ArgsFieldInfo], fpga_personality: FpgaPersonality = None, ): """ Base class that provides functions common to all FPGA Peripherals. Creates an instance of :py:class:`IclFpgaField` for each field defined in the map. Populates :py:attr:`_user_attributes` if not set by derived class. :param driver: ArgsFpgaDriver instance, used to access FPGA :param map_field_info: Most likely a portion of an ArgsMap. :param fpga_personality: personality applicable to this peripheral """ super().__init__() self._fields = {} # configure fields 1st - they may be used in exposed attributes for name, info in map_field_info.items(): # start with info from map field_params = dataclasses.asdict(info) if name in self._field_config: # allow user config to replace info from map # don't try to read the value property (if present) as it will crash # filter out any 'None' values, so they don't overwrite defaults user_config = { field_.name: getattr(self._field_config[name], field_.name) for field_ in dataclasses.fields(self._field_config[name]) if field_.name != "value" and getattr(self._field_config[name], field_.name) is not None } field_params.update(user_config) # link to driver field_params["driver"] = driver # create field object self._fields[name] = IclFpgaField(**field_params) # use FpgaPersonality's logger - if available; we also *may* need # access to personality itself, so store a reference self._logger = ( fpga_personality._logger.getChild(self.__class__.__name__) if fpga_personality else logging.getLogger(self.__class__.__name__) ) self._personality = fpga_personality # discover things to expose via the control system self._discover_attributes() self._discover_methods() self._init_complete = True
def __dir__(self) -> list: """Add our list of registers to the default directory""" return list(super().__dir__()) + list(self._fields.keys())
[docs] def __getattr__(self, name: str) -> IclFpgaField: """Attribute access to fields""" # TODO - is there a way to inspect the AttributeError that resulted in this # function being called? It can mask some errors!! if name in self._fields.keys(): return self._fields[name] else: if name in self.__dir__(): raise AttributeError( f"AttributeError occurred reading " f"{self.__class__.__name__}.{name}. Good luck!" ) else: raise AttributeError( f"{self.__class__.__name__} has no attribute {name}" )
[docs] def __setattr__(self, key: str, value): """Pass on attribute writes to the field, where appropriate""" if "_init_complete" in self.__dict__ and key in self._fields: self._fields[key].value = value else: super().__setattr__(key, value)
[docs] def __getitem__(self, item: str) -> IclFpgaField: """Index style access to fields""" return self._fields[item]
[docs] def __setitem__(self, key: str, value): """Pass on writes to the field""" self._fields[key].value = value
[docs]class FpgaPersonality(FpgaUserInterface): _peripheral_class = {} """ Dict that maps from peripheral name (str) to class to be used. If a peripheral is not listed in peripheral_class, a default FpgaPeripheral object will be created. e.g. _peripheral_class = {"packetiser": Packetiser} """ _admin_attributes = FpgaUserInterface._admin_attributes | { "peripherals", "driver", "default_interface", "info", "read_memory", "write_memory", } """Names of attributes that should never be exposed to the control system"""
[docs] def __init__( self, driver: ArgsFpgaDriver, map_: ArgsMap, hardware_info: FpgaHardwareInfo = None, logger: logging.Logger = None, ): """ Base class that provides functions common to all FPGA Personalities. Creates FpgaPeripheral objects for each peripheral. :param driver: ArgsFpgaDriver instance, used to access FPGA :param map_: ARGS peripheral/register map. :param hardware_info: Hardware monitoring interface (XrtInfo). :param logger: Logger to report log messages to. Passed to peripherals. """ super().__init__() if logger is None: self._logger = logging.getLogger(self.__class__.__name__) else: self._logger = logger.getChild(self.__class__.__name__) self._logger.debug("FPGA Personality init") self._peripherals = {} self._driver = driver # Note this is optional, downstream must allow for info=None self.info = hardware_info # create Peripherals for peripheral_name in map_.keys(): if peripheral_name in self._peripherals: raise NotImplementedError( "Can't do multiple instances of a peripheral" ) try: class_ = self._peripheral_class[peripheral_name] except KeyError: # default to generic class class_ = FpgaPeripheral peripheral = class_( driver, map_[peripheral_name], fpga_personality=self ) self._logger.info( f"Created peripheral {peripheral_name} using class {class_.__name__}" ) self._peripherals[peripheral_name] = peripheral # discover things to expose via the control system self._discover_attributes() self._discover_methods()
def __del__(self): "Cleanup. XRT driver may hold an Alveo card lock" if self.info: del self.info del self._driver # NOTE: all the derived classes should call this manually - it # wan't be invoked automatically (unlike C++ dtor) # see https://docs.python.org/3/reference/datamodel.html#object.__del__ # for details: # "If a base class has a __del__() method, the derived class’s # __del__() method, if any, must explicitly call it to ensure proper # deletion of the base class part of the instance." @property def driver(self) -> ArgsFpgaDriver: """Access to underlying ArgsFpgaDriver""" return self._driver @property def read_memory(self) -> typing.Callable: """Access to read_memory function of underlying ArgsFpgaDriver""" return self._driver.read_memory @property def write_memory(self) -> typing.Callable: """Access to write_memory function of underlying ArgsFpgaDriver""" return self._driver.write_memory @property def default_interface(self) -> ArgsFpgaDriver: """Temporary backwards-compatibility layer""" warnings.warn( "Please use 'driver' instead of 'default_interface'", DeprecationWarning, ) return self._driver
[docs] def __getattr__(self, name: str): """Access peripherals by attribute syntax""" # TODO - is there a way to inspect the AttributeError that resulted in this # function being called? It can mask some errors!! if name in self._peripherals: return self._peripherals[name] else: if name in self.__dir__(): raise AttributeError( f"AttributeError occurred reading " f"{self.__class__.__name__}.{name}. Good luck!" ) else: raise AttributeError( f"{self.__class__.__name__} has no peripheral '{name}'" )
def __dir__(self): """Add our list of peripherals to the default directory""" return list(super().__dir__()) + self.peripherals
[docs] def __getitem__(self, item: str) -> FpgaPeripheral: """Index style access to peripheral objects""" return self._peripherals[item]
@property def peripherals(self) -> typing.List[str]: """ :return: List of peripheral names """ return list(self._peripherals.keys()) def _check_fw(self, personality: str, version_spec: str) -> None: """ Check the FPGA firmware is the right personality & version. NOTE: We deviate from PEP 440 in ACCEPTING dev versions. If version spec is ">=0.1.2" and firmware is "0.1.2-dev.1234", we will accept this because we often want to use the newly added registers to debug dev versions of firmware. (PEP 440 says that "0.1.2-dev.1234" does not satisfy ">=0.1.2") :param personality: 4-character personality code :param version_spec: version specification string (e.g. "~=1.2.3") See PEP 440 for details. (~= means major must match, minor/patch must be >= specified) :raises: RuntimeError if requirements not met """ self._logger.debug( f"Checking for {personality} personality, version {version_spec}" ) actual_personality = self.fw_personality.value if actual_personality != personality: int_required = int.from_bytes( personality.encode(encoding="ascii"), "big" ) raise RuntimeError( f"Wrong firmware personality: {actual_personality} " f"(0x{self.system.firmware_personality.value:x})" f". Expected: {personality} (0x{int_required:x})." ) # Convert '-' to '+' as a way to nudge SpecifierSet.contains() # i.e. if version_spec is ">=0.1.2": # - contains("0.1.2-dev") => False # - contains("0.1.2+dev") => True (our desired behaviour) actual_version = self.fw_version.value.replace("-", "+") spec = SpecifierSet(version_spec) if not spec.contains(version.parse(actual_version)): raise RuntimeError( f"Wrong firmware version: {actual_version}. Expected: {version_spec}" ) @property def fw_version(self) -> IclField[str]: """ Get the FPGA Firmware Version. :returns: version string formatted like the firmware packages: ``<major>.<minor>.<patch>[-<dev|main|pre|sim>.<commit hash>]`` "sim" indicates build type was zero (probably we are in simulation) """ fw_ver = ( f"{self.system.firmware_major_version.value}." f"{self.system.firmware_minor_version.value}." f"{self.system.firmware_patch_version.value}" ) if hasattr(self.system, "build_type"): # build_type register was added to firmware in Sep 2023 build_type = ( int.to_bytes(self.system.build_type.value, WORD_SIZE, "big") .decode(encoding="ascii") .lower() .strip() ) if build_type == "\x00\x00\x00\x00": # fpgamap file does not contain build type or commit hash values, # so we inject a dummy build type instead build_type = "sim" # For official releases, we do not append anything. # Other packages should have the type (dev/main/pre) and hash added. # We use a plus sign for PEP 440 compliance, so _check_fw will work. if build_type != "rel": # Note the minus sign here is swapped to a plus for internal comparison # in _check_fw. Be aware if appending further suffixes. fw_ver += ( f"-{build_type}.{self.system.commit_short_hash.value:08x}" ) return IclField( description="Firmware Version", format="%s", type_=str, value=fw_ver, user_error=False, user_write=False, ) @property def fw_personality(self) -> IclField[str]: """ Get the FPGA Firmware personality, decoded to a string """ personality = int.to_bytes( self.system.firmware_personality.value, WORD_SIZE, "big" ).decode(encoding="ascii") return IclField( description="Firmware Personality", format="%s", type_=str, value=personality, user_error=False, user_write=False, )