# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Instrument Control Layer
"""
import dataclasses
import logging
import os
import typing
import warnings
from packaging import version
from packaging.specifiers import SpecifierSet
from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsFpgaDriver
from ska_low_cbf_fpga.args_map import ArgsFieldInfo, ArgsMap
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo
from ska_low_cbf_fpga.icl_field import IclField, IclFpgaField
# "[docs]" - appears to be link residue from rendered HTML documentation, not module code
class FpgaUserInterface:
    """
    A common interface used by :py:class:`FpgaPeripheral` and
    :py:class:`FpgaPersonality` for exposing attributes and methods to the control
    system.

    Derived classes configure what is exposed through the class-level sets below.
    ``__init__`` copies those sets onto the instance, and the ``_discover_*``
    methods (called by the derived classes once they are set up) fill them in.
    Setting the ``DEBUG_FPGA`` environment variable to a "true" value exposes
    everything and ignores the configured exclusions.
    """

    _user_attributes = set()
    """
    Attributes to be exposed to the control system.
    Defaults to properties (if configured), or FPGA registers if no properties defined.
    Set to ``None`` if you really want no attributes exposed.
    """
    _not_user_attributes = set()
    """Attribute names to exclude from auto-discovery for the control system."""
    _admin_attributes = {"user_attributes", "user_methods"}
    """Names of attributes that should never be listed in user_attributes"""
    _user_methods = set()
    """
    Methods to be exposed to the control system.
    Set to ``None`` if you want no methods exposed.
    """
    _not_user_methods = set()
    """Method names to exclude from auto-discovery for the control system."""

    def __init__(self):
        """
        Take per-instance copies of the class-level configuration and read the
        ``DEBUG_FPGA`` environment variable.
        """
        # We do not want object instances to modify the base class values,
        # so we copy them to convert to instance variables.
        # ``None`` is preserved: it means "expose nothing".
        if self._user_attributes is not None:
            self._user_attributes = set(self._user_attributes)
        if self._user_methods is not None:
            self._user_methods = set(self._user_methods)
        self._not_user_attributes = set(self._not_user_attributes)
        self._not_user_methods = set(self._not_user_methods)
        self._admin_attributes = set(self._admin_attributes)

        # Read from os.environ at construction time so that changes made at
        # run time (e.g. by unit tests) are picked up by new instances.
        debug_setting = os.environ.get("DEBUG_FPGA", "no")
        self._expose_all = debug_setting.lower() in ("true", "yes", "on", "1")

    @property
    def user_attributes(self) -> typing.Set[str]:
        """
        Attributes to be exposed to the control system.
        """
        return self._user_attributes

    @property
    def user_methods(self) -> typing.Set[str]:
        """Methods to be exposed to the control system."""
        return self._user_methods

    def _discover_attributes(self):
        """
        Collect the object's user attributes.

        If ``_user_attributes`` is ``None`` nothing is exposed, unless
        ``DEBUG_FPGA`` is active. Otherwise public properties and/or FPGA
        registers are added according to :py:attr:`_discover_properties` and
        :py:attr:`_discover_registers`, then exclusions and the special
        ``DISCOVER_*`` marker values are removed.

        :raises TypeError: if ``_user_attributes`` is not a set
        """
        if self._user_attributes is None:
            self._user_attributes = set()
            if not self._expose_all:
                return

        if not isinstance(self._user_attributes, set):
            raise TypeError("_user_attributes must be a set")

        # if user attributes is empty, or the user requested it,
        # expose all properties
        if self._discover_properties:
            cls = self.__class__
            self._user_attributes |= {
                attr
                for attr in dir(self)
                if not attr.startswith("_")
                and attr not in self._admin_attributes
                and isinstance(getattr(cls, attr, None), property)
            }

        # if user attributes is still empty, or the user requested it,
        # add all registers (evaluated *after* properties were added above)
        if isinstance(self, FpgaPeripheral) and self._discover_registers:
            self._user_attributes |= set(self._fields)

        if not self._expose_all:
            # remove anything that we've been told to explicitly exclude
            self._user_attributes -= self._not_user_attributes

        # remove any special configuration flags
        self._user_attributes.difference_update(MAGIC_USER_ATTRIBUTES)

    @property
    def _discover_properties(self) -> bool:
        """Return True if properties should be exposed, False otherwise"""
        return (
            self._expose_all
            or not self._user_attributes
            or DISCOVER_PROPERTIES in self._user_attributes
            or DISCOVER_ALL in self._user_attributes
        )

    @property
    def _discover_registers(self) -> bool:
        """Return True if user attributes is still empty
        or the user requested registers"""
        return (
            self._expose_all
            or not self._user_attributes
            or DISCOVER_REGISTERS in self._user_attributes
            or DISCOVER_ALL in self._user_attributes
        )

    def _discover_methods(self):
        """
        Collect our (public) method names.

        If ``_user_methods`` is ``None`` nothing is exposed, unless ``DEBUG_FPGA``
        is active. Names listed in ``_not_user_methods`` are excluded, except
        when ``DEBUG_FPGA`` is active.
        """
        if self._user_methods is None:
            self._user_methods = set()
            if not self._expose_all:
                return

        # discover methods if not already explicitly configured,
        # or if we're exposing everything for debugging
        if not self._user_methods or self._expose_all:
            cls = self.__class__
            # Look the name up on the class, not the instance, so properties are
            # not evaluated (we can't use inspect.getmembers here).
            self._user_methods = {
                name
                for name in dir(self)
                if not name.startswith("_") and callable(getattr(cls, name, None))
            }

        if not self._expose_all:
            # filter out configured exclusions
            self._user_methods.difference_update(self._not_user_methods)
# Special marker strings that may be placed in ``_user_attributes`` to request
# auto-discovery. ``_discover_attributes`` strips them from the final set.
DISCOVER_REGISTERS = "__discover_registers__"
DISCOVER_PROPERTIES = "__discover_properties__"
DISCOVER_ALL = "__discover_all__"
MAGIC_USER_ATTRIBUTES = [
    DISCOVER_ALL,
    DISCOVER_PROPERTIES,
    DISCOVER_REGISTERS,
]

# FpgaPeripheral and FpgaPersonality refer to each other (the Peripheral keeps
# a reference to its Personality for convenience), so this placeholder serves
# as a "forward declaration" of sorts for use in type annotations.
FpgaPersonality = typing.NewType("FpgaPersonality", FpgaUserInterface)
# "[docs]" - appears to be link residue from rendered HTML documentation, not module code
class FpgaPeripheral(FpgaUserInterface):
    """
    Base class that provides functions common to all FPGA Peripherals.

    Each field of the peripheral is wrapped in an :py:class:`IclFpgaField` and
    is reachable by attribute (``peripheral.name``) or by index
    (``peripheral["name"]``). Assigning to a field name after construction
    writes to the field's ``value``.
    """

    _field_config = {}
    """
    Configuration values for fields read from FPGA.
    e.g. to indicate to the control system that a field should be treated as
    an error condition:

    .. code-block:: python

        _field_config = {"example": IclFpgaField(user_error=True)}
    """

    def __init__(
        self,
        driver: ArgsFpgaDriver,
        map_field_info: typing.Dict[str, ArgsFieldInfo],
        fpga_personality: typing.Optional[FpgaPersonality] = None,
    ):
        """
        Create an instance of :py:class:`IclFpgaField` for each field defined in
        the map. Populates :py:attr:`_user_attributes` if not set by derived class.

        :param driver: ArgsFpgaDriver instance, used to access FPGA
        :param map_field_info: Most likely a portion of an ArgsMap.
        :param fpga_personality: personality applicable to this peripheral
        """
        super().__init__()
        self._fields = {}

        # configure fields 1st - they may be used in exposed attributes
        for name, info in map_field_info.items():
            # start with info from map
            field_params = dataclasses.asdict(info)
            if name in self._field_config:
                # allow user config to replace info from map
                configured = self._field_config[name]
                user_config = {}
                for field_ in dataclasses.fields(configured):
                    # don't try to read the value property (if present) as it
                    # will crash
                    if field_.name == "value":
                        continue
                    setting = getattr(configured, field_.name)
                    # filter out any 'None' values, so they don't overwrite
                    # defaults
                    if setting is not None:
                        user_config[field_.name] = setting
                field_params.update(user_config)
            # link to driver
            field_params["driver"] = driver
            # create field object
            self._fields[name] = IclFpgaField(**field_params)

        # use FpgaPersonality's logger - if available; we also *may* need
        # access to personality itself, so store a reference
        own_name = self.__class__.__name__
        self._logger = (
            fpga_personality._logger.getChild(own_name)
            if fpga_personality
            else logging.getLogger(own_name)
        )
        self._personality = fpga_personality

        # discover things to expose via the control system
        self._discover_attributes()
        self._discover_methods()

        # from here on, __setattr__ redirects writes to field names to the field
        self._init_complete = True

    def __dir__(self) -> list:
        """Add our list of registers to the default directory"""
        # Read _fields from the instance dict: going through normal attribute
        # access would re-enter __getattr__ if _fields has not been set yet.
        return list(super().__dir__()) + list(self.__dict__.get("_fields", ()))

    def __getattr__(self, name: str) -> IclFpgaField:
        """
        Attribute access to fields.

        Only called when normal attribute lookup fails.

        :param name: attribute name being looked up
        :raises AttributeError: if ``name`` is not a field of this peripheral
        """
        # TODO - is there a way to inspect the AttributeError that resulted in this
        # function being called? It can mask some errors!!
        class_name = self.__class__.__name__

        # _fields may be absent (e.g. on a copied/unpickled object whose state
        # has not been restored yet, or before __init__ has run). Using
        # self._fields here would recurse into __getattr__ without end.
        fields = self.__dict__.get("_fields")
        if fields is None:
            raise AttributeError(f"{class_name} has no attribute {name}")

        if name in fields:
            return fields[name]

        if name in self.__dir__():
            # the name exists, so the lookup that brought us here (presumably a
            # property getter) raised AttributeError itself
            raise AttributeError(
                f"AttributeError occurred reading "
                f"{class_name}.{name}. Good luck!"
            )
        raise AttributeError(f"{class_name} has no attribute {name}")

    def __setattr__(self, key: str, value):
        """
        Pass on attribute writes to the field, where appropriate.

        Before ``__init__`` completes, and for names that are not fields, this
        behaves like a normal attribute assignment. A warning is logged when a
        new public instance variable is created.
        """
        state = self.__dict__
        if "_init_complete" in state and key in state.get("_fields", ()):
            # FpgaPeripheral __init__ is finished, key is an FPGA field,
            # write to FPGA
            state["_fields"][key].value = value
            return

        if not key.startswith("_") and key not in dir(self):
            # _logger may not exist yet if a derived class assigns a public
            # attribute before calling our __init__; fall back to a logger
            # named after the class rather than failing the assignment.
            logger = state.get("_logger")
            if logger is None:
                logger = logging.getLogger(self.__class__.__name__)
            logger.warning(
                f"attr '{key}' is not an FPGA field name or peripheral attribute, "
                "so a new Python instance variable will be created. "
                "Prefix variable names with _ to suppress this message."
            )
        super().__setattr__(key, value)

    def __getitem__(self, item: str) -> IclFpgaField:
        """Index style access to fields"""
        return self._fields[item]

    def __setitem__(self, key: str, value):
        """Pass on writes to the field"""
        self._fields[key].value = value
# "[docs]" - appears to be link residue from rendered HTML documentation, not module code
class FpgaPersonality(FpgaUserInterface):
    """
    Base class that provides functions common to all FPGA Personalities.

    Holds one peripheral object per entry in the register map. Peripherals are
    reachable by attribute (``personality.name``) or by index
    (``personality["name"]``).
    """

    _peripheral_class = {}
    """
    Dict that maps from peripheral name (str) to class to be used.
    If a peripheral is not listed in peripheral_class, a default FpgaPeripheral
    object will be created.
    e.g.
    _peripheral_class = {"packetiser": Packetiser}
    """
    _admin_attributes = FpgaUserInterface._admin_attributes | {
        "peripherals",
        "driver",
        "default_interface",
        "info",
        "read_memory",
        "write_memory",
    }
    """Names of attributes that should never be exposed to the control system"""

    def __init__(
        self,
        driver: ArgsFpgaDriver,
        map_: ArgsMap,
        hardware_info: FpgaHardwareInfo = None,
        logger: logging.Logger = None,
    ):
        """
        Create FpgaPeripheral objects for each peripheral.

        :param driver: ArgsFpgaDriver instance, used to access FPGA
        :param map_: ARGS peripheral/register map.
        :param hardware_info: Hardware monitoring interface (XrtInfo).
        :param logger: Logger to report log messages to. Passed to peripherals.
        :raises NotImplementedError: if a peripheral name occurs more than once
        """
        super().__init__()
        own_name = self.__class__.__name__
        if logger is None:
            self._logger = logging.getLogger(own_name)
        else:
            self._logger = logger.getChild(own_name)
        self._logger.debug("FPGA Personality init")

        self._peripherals = {}
        self._driver = driver
        # Note this is optional, downstream must allow for info=None
        self.info = hardware_info

        # create Peripherals
        for peripheral_name in map_.keys():
            if peripheral_name in self._peripherals:
                raise NotImplementedError("Can't do multiple instances of a peripheral")
            try:
                class_ = self._peripheral_class[peripheral_name]
            except KeyError:
                # default to generic class
                class_ = FpgaPeripheral
            peripheral = class_(driver, map_[peripheral_name], fpga_personality=self)
            self._logger.info(
                f"Created peripheral {peripheral_name} using class {class_.__name__}"
            )
            self._peripherals[peripheral_name] = peripheral

        # discover things to expose via the control system
        self._discover_attributes()
        self._discover_methods()

    def __del__(self):
        """Cleanup. XRT driver may hold an Alveo card lock"""
        # Work on the instance dict so this is safe on a partially constructed
        # object: __del__ still runs if __init__ raised, and plain attribute
        # access on a missing name would go through __getattr__.
        state = self.__dict__
        if state.get("info"):
            del self.info
        state.pop("_driver", None)
        # NOTE: all the derived classes should call this manually - it
        # won't be invoked automatically (unlike C++ dtor)
        # see https://docs.python.org/3/reference/datamodel.html#object.__del__
        # for details:
        # "If a base class has a __del__() method, the derived class's
        # __del__() method, if any, must explicitly call it to ensure proper
        # deletion of the base class part of the instance."

    @property
    def driver(self) -> ArgsFpgaDriver:
        """Access to underlying ArgsFpgaDriver"""
        return self._driver

    @property
    def read_memory(self) -> typing.Callable:
        """Access to read_memory function of underlying ArgsFpgaDriver"""
        return self._driver.read_memory

    @property
    def write_memory(self) -> typing.Callable:
        """Access to write_memory function of underlying ArgsFpgaDriver"""
        return self._driver.write_memory

    @property
    def default_interface(self) -> ArgsFpgaDriver:
        """Temporary backwards-compatibility layer"""
        warnings.warn(
            "Please use 'driver' instead of 'default_interface'",
            DeprecationWarning,
        )
        return self._driver

    def __getattr__(self, name: str):
        """
        Access peripherals by attribute syntax.

        Only called when normal attribute lookup fails.

        :param name: attribute name being looked up
        :raises AttributeError: if ``name`` is not a peripheral
        """
        # TODO - is there a way to inspect the AttributeError that resulted in this
        # function being called? It can mask some errors!!
        class_name = self.__class__.__name__

        # _peripherals may be absent (before __init__ has set it, or on a
        # copied/unpickled object whose state has not been restored yet).
        # Using self._peripherals here would recurse into __getattr__ forever.
        peripherals = self.__dict__.get("_peripherals")
        if peripherals is None:
            raise AttributeError(
                f"{class_name} has no attribute '{name}' (peripherals not initialised)"
            )

        if name in peripherals:
            return peripherals[name]

        if name in self.__dir__():
            # the name exists, so the lookup that brought us here (presumably a
            # property getter) raised AttributeError itself
            raise AttributeError(
                f"AttributeError occurred reading "
                f"{class_name}.{name}. Good luck!"
            )
        raise AttributeError(f"{class_name} has no peripheral '{name}'")

    def __dir__(self):
        """Add our list of peripherals to the default directory"""
        # guard against _peripherals not being set yet (see __getattr__)
        names = self.peripherals if "_peripherals" in self.__dict__ else []
        return list(super().__dir__()) + names

    def __getitem__(self, item: str) -> FpgaPeripheral:
        """Index style access to peripheral objects"""
        return self._peripherals[item]

    @property
    def peripherals(self) -> typing.List[str]:
        """
        :return: List of peripheral names
        """
        return list(self._peripherals.keys())

    def _check_fw(self, personality: str, version_spec: str) -> None:
        """
        Check the FPGA firmware is the right personality & version.

        NOTE: We deviate from PEP 440 in ACCEPTING dev versions.
        If version spec is ">=0.1.2" and firmware is "0.1.2-dev.1234", we will accept
        this because we often want to use the newly added registers to debug dev
        versions of firmware.
        (PEP 440 says that "0.1.2-dev.1234" does not satisfy ">=0.1.2")

        :param personality: 4-character personality code
        :param version_spec: version specification string (e.g. "~=1.2.3")
            See PEP 440 for details.
            (~= means major must match, minor/patch must be >= specified)
        :raises: RuntimeError if requirements not met
        """
        self._logger.debug(
            f"Checking for {personality} personality, version {version_spec}"
        )
        actual_personality = self.fw_personality.value
        if actual_personality != personality:
            int_required = int.from_bytes(personality.encode(encoding="ascii"), "big")
            raise RuntimeError(
                f"Wrong firmware personality: {actual_personality} "
                f"(0x{self.system.firmware_personality.value:x})"
                f". Expected: {personality} (0x{int_required:x})."
            )

        # Convert '-' to '+' as a way to nudge SpecifierSet.contains()
        # i.e. if version_spec is ">=0.1.2":
        # - contains("0.1.2-dev") => False
        # - contains("0.1.2+dev") => True (our desired behaviour)
        actual_version = self.fw_version.value.replace("-", "+")
        spec = SpecifierSet(version_spec)
        if not spec.contains(version.parse(actual_version)):
            raise RuntimeError(
                f"Wrong firmware version: {actual_version}. Expected: {version_spec}"
            )

    @property
    def fw_version(self) -> IclField[str]:
        """
        Get the FPGA Firmware Version.

        :returns: version string formatted like the firmware packages:
            ``<major>.<minor>.<patch>[-<dev|main|pre|sim>.<commit hash>]``
            "sim" indicates build type was zero (probably we are in simulation)
        """
        system = self.system
        fw_ver = (
            f"{system.firmware_major_version.value}."
            f"{system.firmware_minor_version.value}."
            f"{system.firmware_patch_version.value}"
        )
        if hasattr(system, "build_type"):
            # build_type register was added to firmware in Sep 2023
            build_type = (
                int.to_bytes(system.build_type.value, WORD_SIZE, "big")
                .decode(encoding="ascii")
                .lower()
                .strip()
            )
            # str.strip() does not remove NUL characters, so an all-zero
            # register is still WORD_SIZE NULs at this point.
            if build_type == "\x00" * WORD_SIZE:
                # fpgamap file does not contain build type or commit hash values,
                # so we inject a dummy build type instead
                build_type = "sim"
            # For official releases, we do not append anything.
            # Other packages should have the type (dev/main/pre) and hash added.
            if build_type != "rel":
                # The suffix is joined with a minus sign here, to match the
                # firmware package naming. _check_fw swaps it to a plus sign for
                # its internal PEP 440 comparison. Be aware if appending further
                # suffixes.
                fw_ver += f"-{build_type}.{system.commit_short_hash.value:08x}"
        return IclField(
            description="Firmware Version",
            format="%s",
            type_=str,
            value=fw_ver,
            user_error=False,
            user_write=False,
        )

    @property
    def fw_personality(self) -> IclField[str]:
        """
        Get the FPGA Firmware personality, decoded to a string
        """
        personality = int.to_bytes(
            self.system.firmware_personality.value, WORD_SIZE, "big"
        ).decode(encoding="ascii")
        return IclField(
            description="Firmware Personality",
            format="%s",
            type_=str,
            value=personality,
            user_error=False,
            user_write=False,
        )