Source code for ska_low_cbf_fpga.fpga_icl

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

"""
Instrument Control Layer
"""
import dataclasses
import logging
import os
import typing
import warnings

from packaging import version
from packaging.specifiers import SpecifierSet

from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsFpgaDriver
from ska_low_cbf_fpga.args_map import ArgsFieldInfo, ArgsMap
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo
from ska_low_cbf_fpga.icl_field import IclField, IclFpgaField


[docs] class FpgaUserInterface: """ A common interface used by :py:class:`FpgaPeripheral` and :py:class:`FpgaPersonality` for exposing attributes and methods to the control system. """ _user_attributes = set() """ Attributes to be exposed to the control system. Defaults to properties (if configured), or FPGA registers if no properties defined. Set to ``None`` if you really want no attributes exposed. """ _not_user_attributes = set() """Attribute names to exclude from auto-discovery for the control system.""" _admin_attributes = {"user_attributes", "user_methods"} """Names of attributes that should never be listed in user_attributes""" _user_methods = set() """ Methods to be exposed to the control system. Set to ``None`` if you want no methods exposed. """ _not_user_methods = set() """Method names to exclude from auto-discovery for the control system."""
[docs] def __init__(self): # we do not want object instances to modify the base class values, # so we copy them to convert to instance variables if self._user_attributes is not None: self._user_attributes = set(self._user_attributes) if self._user_methods is not None: self._user_methods = set(self._user_methods) self._not_user_attributes = set(self._not_user_attributes) self._not_user_methods = set(self._not_user_methods) self._admin_attributes = set(self._admin_attributes) # NOTE: don't use os.getenv() below - it won't reflect the variable # value changes (e.g. during unit test) value = os.environ.get("DEBUG_FPGA", "no") self._expose_all = value.lower() in ( "true", "yes", "on", "1", )
@property def user_attributes(self) -> typing.Set[str]: """ Attributes to be exposed to the control system. """ return self._user_attributes @property def user_methods(self) -> typing.Set[str]: """Methods to be exposed to the control system.""" return self._user_methods def _discover_attributes(self): """Collect object's user attributes - unless it's None""" if self._user_attributes is None: self._user_attributes = set() if not self._expose_all: return if not isinstance(self._user_attributes, set): raise TypeError("_user_attributes must be a set") # if user attributes is empty, or the user requested it, # expose all properties if self._discover_properties: properties = { attr for attr in dir(self) if isinstance(getattr(self.__class__, attr, None), property) and not attr.startswith("_") and attr not in self._admin_attributes } self._user_attributes |= set(properties) # if user attributes is still empty, or the user requested it, # add all registers as read-only if isinstance(self, FpgaPeripheral) and self._discover_registers: self._user_attributes |= set(self._fields.keys()) if not self._expose_all: # remove anything that we've been told to explicitly exclude self._user_attributes -= self._not_user_attributes # remove any special configuration flags for magic_value in MAGIC_USER_ATTRIBUTES: if magic_value in self._user_attributes: self._user_attributes.remove(magic_value) @property def _discover_properties(self) -> bool: """Return True if properties should be exposed, False otherwise""" return ( self._expose_all or len(self._user_attributes) == 0 or DISCOVER_PROPERTIES in self._user_attributes or DISCOVER_ALL in self._user_attributes ) @property def _discover_registers(self) -> bool: """return True in case user attributes is still empty or the user requested registers""" return ( self._expose_all or not self._user_attributes or DISCOVER_REGISTERS in self._user_attributes or DISCOVER_ALL in self._user_attributes ) def _discover_methods(self): """Collect our (public) method names. Exclude method names listed in _not_user_methods.""" if self._user_methods is None: self._user_methods = set() if not self._expose_all: return # discover methods if not already explicitly configured, # or if we're exposing everything for debugging if not self._user_methods or self._expose_all: self._user_methods = { name for name in dir(self) # we can't use inspect.getmethods here if isinstance(getattr(self.__class__, name, None), typing.Callable) and not name.startswith("_") } if not self._expose_all: # filter out configured exclusions self._user_methods = { name for name in self._user_methods if name not in self._not_user_methods }
DISCOVER_REGISTERS = "__discover_registers__" DISCOVER_PROPERTIES = "__discover_properties__" DISCOVER_ALL = "__discover_all__" MAGIC_USER_ATTRIBUTES = [DISCOVER_ALL, DISCOVER_PROPERTIES, DISCOVER_REGISTERS] # "forward declaration" of sorts - there's circular reference between # FpgaPeripheral and FpgaPersonality # (the Peripheral stores a reference to the Personality for convenience) FpgaPersonality = typing.NewType("FpgaPersonality", FpgaUserInterface)
[docs] class FpgaPeripheral(FpgaUserInterface): _field_config = {} """ Configuration values for fields read from FPGA. e.g. to indicate to the control system that a field should be treated as an error condition: .. code-block:: python _field_config = {"example": IclFpgaField(user_error=True)} """
[docs] def __init__( self, driver: ArgsFpgaDriver, map_field_info: typing.Dict[str, ArgsFieldInfo], fpga_personality: FpgaPersonality = None, ): """ Base class that provides functions common to all FPGA Peripherals. Creates an instance of :py:class:`IclFpgaField` for each field defined in the map. Populates :py:attr:`_user_attributes` if not set by derived class. :param driver: ArgsFpgaDriver instance, used to access FPGA :param map_field_info: Most likely a portion of an ArgsMap. :param fpga_personality: personality applicable to this peripheral """ super().__init__() self._fields = {} # configure fields 1st - they may be used in exposed attributes for name, info in map_field_info.items(): # start with info from map field_params = dataclasses.asdict(info) if name in self._field_config: # allow user config to replace info from map # don't try to read the value property (if present) as it will crash # filter out any 'None' values, so they don't overwrite defaults user_config = { field_.name: getattr(self._field_config[name], field_.name) for field_ in dataclasses.fields(self._field_config[name]) if field_.name != "value" and getattr(self._field_config[name], field_.name) is not None } field_params.update(user_config) # link to driver field_params["driver"] = driver # create field object self._fields[name] = IclFpgaField(**field_params) # use FpgaPersonality's logger - if available; we also *may* need # access to personality itself, so store a reference self._logger = ( fpga_personality._logger.getChild(self.__class__.__name__) if fpga_personality else logging.getLogger(self.__class__.__name__) ) self._personality = fpga_personality # discover things to expose via the control system self._discover_attributes() self._discover_methods() self._init_complete = True
def __dir__(self) -> list: """Add our list of registers to the default directory""" return list(super().__dir__()) + list(self._fields.keys())
[docs] def __getattr__(self, name: str) -> IclFpgaField: """Attribute access to fields""" # TODO - is there a way to inspect the AttributeError that resulted in this # function being called? It can mask some errors!! if name in self._fields.keys(): return self._fields[name] else: if name in self.__dir__(): raise AttributeError( f"AttributeError occurred reading " f"{self.__class__.__name__}.{name}. Good luck!" ) else: raise AttributeError( f"{self.__class__.__name__} has no attribute {name}" )
[docs] def __setattr__(self, key: str, value): """Pass on attribute writes to the field, where appropriate""" if "_init_complete" in self.__dict__ and key in self._fields: # FpgaPeripheral __init__ is finished, key is an FPGA field, write to FPGA self._fields[key].value = value return if not key.startswith("_") and key not in dir(self): self._logger.warning( f"attr '{key}' is not an FPGA field name or peripheral attribute, " "so a new Python instance variable will be created. " "Prefix variable names with _ to suppress this message." ) super().__setattr__(key, value)
[docs] def __getitem__(self, item: str) -> IclFpgaField: """Index style access to fields""" return self._fields[item]
[docs] def __setitem__(self, key: str, value): """Pass on writes to the field""" self._fields[key].value = value
[docs] class FpgaPersonality(FpgaUserInterface): _peripheral_class = {} """ Dict that maps from peripheral name (str) to class to be used. If a peripheral is not listed in peripheral_class, a default FpgaPeripheral object will be created. e.g. _peripheral_class = {"packetiser": Packetiser} """ _admin_attributes = FpgaUserInterface._admin_attributes | { "peripherals", "driver", "default_interface", "info", "read_memory", "write_memory", } """Names of attributes that should never be exposed to the control system"""
[docs] def __init__( self, driver: ArgsFpgaDriver, map_: ArgsMap, hardware_info: FpgaHardwareInfo = None, logger: logging.Logger = None, ): """ Base class that provides functions common to all FPGA Personalities. Creates FpgaPeripheral objects for each peripheral. :param driver: ArgsFpgaDriver instance, used to access FPGA :param map_: ARGS peripheral/register map. :param hardware_info: Hardware monitoring interface (XrtInfo). :param logger: Logger to report log messages to. Passed to peripherals. """ super().__init__() if logger is None: self._logger = logging.getLogger(self.__class__.__name__) else: self._logger = logger.getChild(self.__class__.__name__) self._logger.debug("FPGA Personality init") self._peripherals = {} self._driver = driver # Note this is optional, downstream must allow for info=None self.info = hardware_info # create Peripherals for peripheral_name in map_.keys(): if peripheral_name in self._peripherals: raise NotImplementedError("Can't do multiple instances of a peripheral") try: class_ = self._peripheral_class[peripheral_name] except KeyError: # default to generic class class_ = FpgaPeripheral peripheral = class_(driver, map_[peripheral_name], fpga_personality=self) self._logger.info( f"Created peripheral {peripheral_name} using class {class_.__name__}" ) self._peripherals[peripheral_name] = peripheral # discover things to expose via the control system self._discover_attributes() self._discover_methods()
def __del__(self): "Cleanup. XRT driver may hold an Alveo card lock" if self.info: del self.info del self._driver # NOTE: all the derived classes should call this manually - it # wan't be invoked automatically (unlike C++ dtor) # see https://docs.python.org/3/reference/datamodel.html#object.__del__ # for details: # "If a base class has a __del__() method, the derived class’s # __del__() method, if any, must explicitly call it to ensure proper # deletion of the base class part of the instance." @property def driver(self) -> ArgsFpgaDriver: """Access to underlying ArgsFpgaDriver""" return self._driver @property def read_memory(self) -> typing.Callable: """Access to read_memory function of underlying ArgsFpgaDriver""" return self._driver.read_memory @property def write_memory(self) -> typing.Callable: """Access to write_memory function of underlying ArgsFpgaDriver""" return self._driver.write_memory @property def default_interface(self) -> ArgsFpgaDriver: """Temporary backwards-compatibility layer""" warnings.warn( "Please use 'driver' instead of 'default_interface'", DeprecationWarning, ) return self._driver
[docs] def __getattr__(self, name: str): """Access peripherals by attribute syntax""" # TODO - is there a way to inspect the AttributeError that resulted in this # function being called? It can mask some errors!! if name in self._peripherals: return self._peripherals[name] else: if name in self.__dir__(): raise AttributeError( f"AttributeError occurred reading " f"{self.__class__.__name__}.{name}. Good luck!" ) else: raise AttributeError( f"{self.__class__.__name__} has no peripheral '{name}'" )
def __dir__(self): """Add our list of peripherals to the default directory""" return list(super().__dir__()) + self.peripherals
[docs] def __getitem__(self, item: str) -> FpgaPeripheral: """Index style access to peripheral objects""" return self._peripherals[item]
@property def peripherals(self) -> typing.List[str]: """ :return: List of peripheral names """ return list(self._peripherals.keys()) def _check_fw(self, personality: str, version_spec: str) -> None: """ Check the FPGA firmware is the right personality & version. NOTE: We deviate from PEP 440 in ACCEPTING dev versions. If version spec is ">=0.1.2" and firmware is "0.1.2-dev.1234", we will accept this because we often want to use the newly added registers to debug dev versions of firmware. (PEP 440 says that "0.1.2-dev.1234" does not satisfy ">=0.1.2") :param personality: 4-character personality code :param version_spec: version specification string (e.g. "~=1.2.3") See PEP 440 for details. (~= means major must match, minor/patch must be >= specified) :raises: RuntimeError if requirements not met """ self._logger.debug( f"Checking for {personality} personality, version {version_spec}" ) actual_personality = self.fw_personality.value if actual_personality != personality: int_required = int.from_bytes(personality.encode(encoding="ascii"), "big") raise RuntimeError( f"Wrong firmware personality: {actual_personality} " f"(0x{self.system.firmware_personality.value:x})" f". Expected: {personality} (0x{int_required:x})." ) # Convert '-' to '+' as a way to nudge SpecifierSet.contains() # i.e. if version_spec is ">=0.1.2": # - contains("0.1.2-dev") => False # - contains("0.1.2+dev") => True (our desired behaviour) actual_version = self.fw_version.value.replace("-", "+") spec = SpecifierSet(version_spec) if not spec.contains(version.parse(actual_version)): raise RuntimeError( f"Wrong firmware version: {actual_version}. Expected: {version_spec}" ) @property def fw_version(self) -> IclField[str]: """ Get the FPGA Firmware Version. :returns: version string formatted like the firmware packages: ``<major>.<minor>.<patch>[-<dev|main|pre|sim>.<commit hash>]`` "sim" indicates build type was zero (probably we are in simulation) """ fw_ver = ( f"{self.system.firmware_major_version.value}." f"{self.system.firmware_minor_version.value}." f"{self.system.firmware_patch_version.value}" ) if hasattr(self.system, "build_type"): # build_type register was added to firmware in Sep 2023 build_type = ( int.to_bytes(self.system.build_type.value, WORD_SIZE, "big") .decode(encoding="ascii") .lower() .strip() ) if build_type == "\x00\x00\x00\x00": # fpgamap file does not contain build type or commit hash values, # so we inject a dummy build type instead build_type = "sim" # For official releases, we do not append anything. # Other packages should have the type (dev/main/pre) and hash added. # We use a plus sign for PEP 440 compliance, so _check_fw will work. if build_type != "rel": # Note the minus sign here is swapped to a plus for internal comparison # in _check_fw. Be aware if appending further suffixes. fw_ver += f"-{build_type}.{self.system.commit_short_hash.value:08x}" return IclField( description="Firmware Version", format="%s", type_=str, value=fw_ver, user_error=False, user_write=False, ) @property def fw_personality(self) -> IclField[str]: """ Get the FPGA Firmware personality, decoded to a string """ personality = int.to_bytes( self.system.firmware_personality.value, WORD_SIZE, "big" ).decode(encoding="ascii") return IclField( description="Firmware Personality", format="%s", type_=str, value=personality, user_error=False, user_write=False, )