# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Instrument Control Layer
"""
import dataclasses
import logging
import os
import typing
import warnings
from dataclasses import dataclass, field
import numpy as np
from packaging import version
from packaging.specifiers import SpecifierSet
from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsFpgaDriver, ArgsWordType
from ska_low_cbf_fpga.args_map import ArgsFieldInfo, ArgsMap
from ska_low_cbf_fpga.hardware_info import FpgaHardwareInfo
T = typing.TypeVar("T")
[docs]@dataclass
class IclField(ArgsFieldInfo, typing.Generic[T]):
"""ICL field, probably derived from one or more :py:class:`IclFpgaField` s"""
# user configuration
type_: type = field(default=T)
format: str = field(default="%i")
user_write: bool = True
"""
Should the control system allow writing to this field?
Note: not enforced at ICL!
"""
user_error: bool = False
"""
Should the control system treat a non-zero value as an error/alarm?
"""
# dynamic
value: T = field(default=None)
# note: ARGS map fields inherited from ArgsFieldInfo
def __int__(self):
return int(self.value)
def __bool__(self):
return bool(self.value)
# operators
def __add__(self, other):
if isinstance(other, IclField):
return self.value + other.type_(other)
return self.value + other
def __radd__(self, other):
return self.__add__(other)
def __sub__(self, other):
if isinstance(other, IclField):
return self.value - other.type_(other)
return self.value - other
def __rsub__(self, other):
if isinstance(other, IclField):
return other.type_(other) - self.value
return other - self.value
def __mul__(self, other):
if isinstance(other, IclField):
return self.value * other.type_(other)
return self.value * other
def __rmul__(self, other):
return self.__mul__(other)
def __pow__(self, other):
if isinstance(other, IclField):
return self.value ** other.type_(other)
return self.value**other
def __rpow__(self, other):
if isinstance(other, IclField):
return other.type_(other) ** self.value
return other**self.value
def __truediv__(self, other):
if isinstance(other, IclField):
return self.value / other.type_(other)
return self.value / other
def __rtruediv__(self, other):
if isinstance(other, IclField):
return other.type_(other) / self.value
return other / self.value
def __floordiv__(self, other):
if isinstance(other, IclField):
return self.value // other.type_(other)
return self.value // other
def __rfloordiv__(self, other):
if isinstance(other, IclField):
return other.type_(other) // self.value
return other // self.value
def __mod__(self, other):
if isinstance(other, IclField):
return self.value % other.type_(other)
return self.value % other
def __rmod__(self, other):
if isinstance(other, IclField):
return other.type_(other) % self.value
return other % self.value
def __lshift__(self, other):
if isinstance(other, IclField):
return self.value << other.type_(other)
return self.value << other
def __rlshift__(self, other):
if isinstance(other, IclField):
return other.type_(other) << self.value
return other << self.value
def __rshift__(self, other):
if isinstance(other, IclField):
return self.value >> other.type_(other)
return self.value >> other
def __rrshift__(self, other):
if isinstance(other, IclField):
return other.type_(other) >> self.value
return other >> self.value
def __and__(self, other):
if isinstance(other, IclField):
return self.value & other.type_(other)
return self.value & other
def __rand__(self, other):
return self.__and__(other)
def __or__(self, other):
if isinstance(other, IclField):
return self.value | other.type_(other)
return self.value | other
def __ror__(self, other):
return self.__or__(other)
def __xor__(self, other):
if isinstance(other, IclField):
return self.value ^ other.type_(other)
return self.value ^ other
def __rxor__(self, other):
return self.__xor__(other)
def __invert__(self):
return ~self.value
# comparisons
def __lt__(self, other):
if isinstance(other, IclField):
return self.value < other.type_(other)
return self.value < other
def __le__(self, other):
if isinstance(other, IclField):
return self.value <= other.type_(other)
return self.value <= other
def __eq__(self, other):
if isinstance(other, IclField):
return self.value == other.type_(other)
return self.value == other
def __ne__(self, other):
if isinstance(other, IclField):
return self.value != other.type_(other)
return self.value != other
def __gt__(self, other):
if isinstance(other, IclField):
return self.value > other.type_(other)
return self.value > other
def __ge__(self, other):
if isinstance(other, IclField):
return self.value >= other.type_(other)
return self.value >= other
# The eq and order parameters are False to stop the dataclass decorator from
# overriding the operators defined in IclField
[docs]@dataclass(eq=False, order=False)
class IclFpgaField(IclField[ArgsWordType]):
"""An ICL field that is linked to an :py:class:`ArgsFpgaDriver` ."""
# configuration
driver: ArgsFpgaDriver = None
type_: type = ArgsWordType
# dynamic
value: ArgsWordType = field(init=False)
@property
def value(self) -> typing.Union[int, np.ndarray, bool]:
"""Read current value from FPGA interface"""
val = self.driver.read(self.address, self.length)
if type(val) is int:
# Single word length - apply bitoffset, then width masking if specified
if self.bit_offset:
val = val >> self.bit_offset
if self.width:
mask = (1 << self.width) - 1
val &= mask
if self.width == 1:
# single bit returned as bool
return bool(val)
return val
if type(val) is np.ndarray:
# do nothing with arrays
return val
@value.setter
def value(
self, value: typing.Union[int, bool, np.integer, np.ndarray, IclField]
):
"""Write new value via FPGA interface"""
# on initialisation, the property object is passed to the setter
# we don't want to send this to the FPGA
# (an alternative could be to set an "init_complete" bool in __post_init__)
if type(value) is not property:
if isinstance(value, IclField):
# extract register value from IclField objects
value = value.value
if type(value) is np.ndarray:
self.driver.write(self.address, value)
return
elif type(value) in [int, bool] or isinstance(value, np.integer):
# FIXME: ideally we would lock the driver for this...
# but the driver controls the lock internally so that's a job for later
# if value is bool, convert to int
value = int(value)
bit_width = self.width or (WORD_SIZE * 8)
# If updating partial register, we need to read/modify/write
if bit_width != (WORD_SIZE * 8):
inital_value = self.driver.read(self.address, self.length)
bit_offset = self.bit_offset or 0
mask = ((1 << bit_width) - 1) << bit_offset
masked_initial_value = inital_value & ~mask
masked_value = (value << bit_offset) & mask
value = masked_initial_value | masked_value
self.driver.write(self.address, value)
return
else:
raise TypeError(
f"{self.__class__.__name__} does not know how "
f"to set a value of type {type(value)}"
)
[docs] def __getitem__(self, item: typing.Union[int, slice]) -> ArgsWordType:
"""Array access for multi-word values"""
read_length = 1
if isinstance(item, slice):
if item.step is not None and item.step != 1:
raise NotImplementedError("only step size 1 supported")
end = self.length
if item.stop is not None:
end = self._index(item.stop)
self._check_index(end - 1)
start = self._index(item.start or 0)
# copying list/numpy array behaviour for e.g. list[-2:-4]
read_length = max(end - start, 0)
else:
start = self._index(item)
self._check_index(start)
return self.driver.read(self.address + start * WORD_SIZE, read_length)
[docs] def __setitem__(self, key: typing.Union[int, slice], value: ArgsWordType):
"""
Array index style access for setting part of a multi-word value.
e.g. my_field[4] = 32
my_field[3:6] = np.ones(3, dtype=ArgsWordType)
If using a slice, value must be an array
i.e. Don't do this: my_field[3:4] = 5.
Instead, use my_field[3] = 5 or even my_field[3:4] = np.array([5])
"""
if isinstance(key, slice):
start = self._index(key.start or 0)
if key.step is not None and key.step != 1:
raise NotImplementedError("only step size 1 supported")
if key.stop is not None:
end = self._index(key.stop)
slice_len = end - start
else:
slice_len = self.length - start
if len(value) != slice_len:
raise ValueError(
f"value length {len(value)} is not equal "
f"to slice length {slice_len}"
)
else:
start = self._index(key)
self._check_index(start)
self.driver.write(self.address + start * WORD_SIZE, value)
def _check_index(self, index: int):
"""Check if an element index is within our length"""
if index >= self.length:
raise IndexError("index out of range")
def _index(self, index: int):
"""Transform a possibly-negative index"""
if index < 0:
self._check_index(abs(index) - 1)
return self.length + index
return index
[docs]class FpgaUserInterface:
"""
A common interface used by :py:class:`FpgaPeripheral` and
:py:class:`FpgaPersonality` for exposing attributes and methods to the control
system.
"""
_user_attributes = set()
"""
Attributes to be exposed to the control system.
Defaults to properties (if configured), or FPGA registers.
Set to None if you really want no attributes exposed.
"""
_not_user_attributes = set()
"""Attribute names to exclude from auto-discovery for the control system."""
_admin_attributes = {"user_attributes", "user_methods"}
"""Names of attributes that should never be listed in user_attributes"""
_user_methods = set()
"""Methods to be exposed to the control system."""
_not_user_methods = set()
"""Method names to exclude from auto-discovery for the control system."""
[docs] def __init__(self):
# we do not want object instances to modify the base class values!
if self._user_attributes is FpgaUserInterface._user_attributes:
self._user_attributes = set(self._user_attributes)
if self._not_user_attributes is FpgaUserInterface._not_user_attributes:
self._not_user_attributes = set(self._not_user_attributes)
if self._user_methods is FpgaUserInterface._user_methods:
self._user_methods = set(self._user_methods)
if self._not_user_methods is FpgaUserInterface._not_user_methods:
self._not_user_methods = set(self._not_user_methods)
if self._admin_attributes is FpgaUserInterface._admin_attributes:
self._admin_attributes = set(self._admin_attributes)
# NOTE: don't use os.getenv() below - it won't reflect the variable
# value changes (e.g. during unit test)
value = os.environ.get("DEBUG_FPGA", "no")
self._expose_all = value.lower() in (
"true",
"yes",
"on",
"1",
)
@property
def user_attributes(self) -> typing.Set[str]:
"""
Attributes to be exposed to the control system.
"""
return self._user_attributes
@property
def user_methods(self) -> typing.Set[str]:
"""Methods to be exposed to the control system."""
return self._user_methods
def _discover_attributes(self):
"""Collect object's user attributes - unless it's None"""
if self._user_attributes is None:
self._user_attributes = set()
if not self._expose_all:
return
assert type(self._user_attributes) == set
# if user attributes is empty, or the user requested it,
# expose all properties
if self._discover_properties:
properties = {
attr
for attr in dir(self)
if isinstance(getattr(self.__class__, attr, None), property)
and not attr.startswith("_")
and attr not in self._admin_attributes
}
self._user_attributes |= set(properties)
# if user attributes is still empty, or the user requested it,
# add all registers as read-only
if isinstance(self, FpgaPeripheral) and self._discover_registers:
self._user_attributes |= set(self._fields.keys())
if not self._expose_all:
# remove anything that we've been told to explicitly exclude
self._user_attributes -= self._not_user_attributes
# remove any special configuration flags
for magic_value in MAGIC_USER_ATTRIBUTES:
if magic_value in self._user_attributes:
self._user_attributes.remove(magic_value)
@property
def _discover_properties(self) -> bool:
"""Return True if properties should be exposed, False otherwise"""
return (
self._expose_all
or len(self._user_attributes) == 0
or DISCOVER_PROPERTIES in self._user_attributes
or DISCOVER_ALL in self._user_attributes
)
@property
def _discover_registers(self) -> bool:
"""return True in case user attributes is still empty
or the user requested registers"""
return (
self._expose_all
or not self._user_attributes
or DISCOVER_REGISTERS in self._user_attributes
or DISCOVER_ALL in self._user_attributes
)
def _discover_methods(self):
"""Collect our (public) method names.
Exclude method names listed in _not_user_methods."""
if self._user_methods is None:
self._user_methods = set()
if not self._expose_all:
return
# discover methods if not already explicitly configured,
# or if we're exposing everything for debugging
if not self._user_methods or self._expose_all:
self._user_methods = {
name
for name in dir(self) # we can't use inspect.getmethods here
if isinstance(
getattr(self.__class__, name, None), typing.Callable
)
and not name.startswith("_")
}
if not self._expose_all:
# filter out configured exclusions
self._user_methods = {
name
for name in self._user_methods
if name not in self._not_user_methods
}
DISCOVER_REGISTERS = "__discover_registers__"
DISCOVER_PROPERTIES = "__discover_properties__"
DISCOVER_ALL = "__discover_all__"
MAGIC_USER_ATTRIBUTES = [DISCOVER_ALL, DISCOVER_PROPERTIES, DISCOVER_REGISTERS]
# "forward declaration" of sorts - there's circular reference between
# FpgaPeripheral and FpgaPersonality
# (the Peripheral stores a reference to the Personality for convenience)
FpgaPersonality = typing.NewType("FpgaPersonality", FpgaUserInterface)
[docs]class FpgaPeripheral(FpgaUserInterface):
_field_config = {}
"""
Configuration values for fields read from FPGA.
e.g. to indicate to the control system that a field should be treated as
an error condition:
.. code-block:: python
_field_config = {"example": IclFpgaField(user_error=True)}
"""
[docs] def __init__(
self,
driver: ArgsFpgaDriver,
map_field_info: typing.Dict[str, ArgsFieldInfo],
fpga_personality: FpgaPersonality = None,
):
"""
Base class that provides functions common to all FPGA Peripherals.
Creates an instance of :py:class:`IclFpgaField` for each field defined in the
map. Populates :py:attr:`_user_attributes` if not set by derived class.
:param driver: ArgsFpgaDriver instance, used to access FPGA
:param map_field_info: Most likely a portion of an ArgsMap.
:param fpga_personality: personality applicable to this peripheral
"""
super().__init__()
self._fields = {}
# configure fields 1st - they may be used in exposed attributes
for name, info in map_field_info.items():
# start with info from map
field_params = dataclasses.asdict(info)
if name in self._field_config:
# allow user config to replace info from map
# don't try to read the value property (if present) as it will crash
# filter out any 'None' values, so they don't overwrite defaults
user_config = {
field_.name: getattr(self._field_config[name], field_.name)
for field_ in dataclasses.fields(self._field_config[name])
if field_.name != "value"
and getattr(self._field_config[name], field_.name)
is not None
}
field_params.update(user_config)
# link to driver
field_params["driver"] = driver
# create field object
self._fields[name] = IclFpgaField(**field_params)
# use FpgaPersonality's logger - if available; we also *may* need
# access to personality itself, so store a reference
self._logger = (
fpga_personality._logger.getChild(self.__class__.__name__)
if fpga_personality
else logging.getLogger(self.__class__.__name__)
)
self._personality = fpga_personality
# discover things to expose via the control system
self._discover_attributes()
self._discover_methods()
self._init_complete = True
def __dir__(self) -> list:
"""Add our list of registers to the default directory"""
return list(super().__dir__()) + list(self._fields.keys())
[docs] def __getattr__(self, name: str) -> IclFpgaField:
"""Attribute access to fields"""
# TODO - is there a way to inspect the AttributeError that resulted in this
# function being called? It can mask some errors!!
if name in self._fields.keys():
return self._fields[name]
else:
if name in self.__dir__():
raise AttributeError(
f"AttributeError occurred reading "
f"{self.__class__.__name__}.{name}. Good luck!"
)
else:
raise AttributeError(
f"{self.__class__.__name__} has no attribute {name}"
)
[docs] def __setattr__(self, key: str, value):
"""Pass on attribute writes to the field, where appropriate"""
if "_init_complete" in self.__dict__ and key in self._fields:
self._fields[key].value = value
else:
super().__setattr__(key, value)
[docs] def __getitem__(self, item: str) -> IclFpgaField:
"""Index style access to fields"""
return self._fields[item]
[docs] def __setitem__(self, key: str, value):
"""Pass on writes to the field"""
self._fields[key].value = value
[docs]class FpgaPersonality(FpgaUserInterface):
_peripheral_class = {}
"""
Dict that maps from peripheral name (str) to class to be used.
If a peripheral is not listed in peripheral_class, a default FpgaPeripheral
object will be created.
e.g.
_peripheral_class = {"packetiser": Packetiser}
"""
_admin_attributes = FpgaUserInterface._admin_attributes | {
"peripherals",
"driver",
"default_interface",
"info",
"read_memory",
"write_memory",
}
"""Names of attributes that should never be exposed to the control system"""
[docs] def __init__(
self,
driver: ArgsFpgaDriver,
map_: ArgsMap,
hardware_info: FpgaHardwareInfo = None,
logger: logging.Logger = None,
):
"""
Base class that provides functions common to all FPGA Personalities.
Creates FpgaPeripheral objects for each peripheral.
:param driver: ArgsFpgaDriver instance, used to access FPGA
:param map_: ARGS peripheral/register map.
:param hardware_info: Hardware monitoring interface (XrtInfo).
:param logger: Logger to report log messages to. Passed to peripherals.
"""
super().__init__()
if logger is None:
self._logger = logging.getLogger(self.__class__.__name__)
else:
self._logger = logger.getChild(self.__class__.__name__)
self._logger.debug("FPGA Personality init")
self._peripherals = {}
self._driver = driver
# Note this is optional, downstream must allow for info=None
self.info = hardware_info
# create Peripherals
for peripheral_name in map_.keys():
if peripheral_name in self._peripherals:
raise NotImplementedError(
"Can't do multiple instances of a peripheral"
)
try:
class_ = self._peripheral_class[peripheral_name]
except KeyError:
# default to generic class
class_ = FpgaPeripheral
peripheral = class_(
driver, map_[peripheral_name], fpga_personality=self
)
self._logger.info(
f"Created peripheral {peripheral_name} using class {class_.__name__}"
)
self._peripherals[peripheral_name] = peripheral
# discover things to expose via the control system
self._discover_attributes()
self._discover_methods()
def __del__(self):
"Cleanup. XRT driver may hold an Alveo card lock"
if self.info:
del self.info
del self._driver
# NOTE: all the derived classes should call this manually - it
# wan't be invoked automatically (unlike C++ dtor)
# see https://docs.python.org/3/reference/datamodel.html#object.__del__
# for details:
# "If a base class has a __del__() method, the derived class’s
# __del__() method, if any, must explicitly call it to ensure proper
# deletion of the base class part of the instance."
@property
def driver(self) -> ArgsFpgaDriver:
"""Access to underlying ArgsFpgaDriver"""
return self._driver
@property
def read_memory(self) -> typing.Callable:
"""Access to read_memory function of underlying ArgsFpgaDriver"""
return self._driver.read_memory
@property
def write_memory(self) -> typing.Callable:
"""Access to write_memory function of underlying ArgsFpgaDriver"""
return self._driver.write_memory
@property
def default_interface(self) -> ArgsFpgaDriver:
"""Temporary backwards-compatibility layer"""
warnings.warn(
"Please use 'driver' instead of 'default_interface'",
DeprecationWarning,
)
return self._driver
[docs] def __getattr__(self, name: str):
"""Access peripherals by attribute syntax"""
# TODO - is there a way to inspect the AttributeError that resulted in this
# function being called? It can mask some errors!!
if name in self._peripherals:
return self._peripherals[name]
else:
if name in self.__dir__():
raise AttributeError(
f"AttributeError occurred reading "
f"{self.__class__.__name__}.{name}. Good luck!"
)
else:
raise AttributeError(
f"{self.__class__.__name__} has no peripheral '{name}'"
)
def __dir__(self):
"""Add our list of peripherals to the default directory"""
return list(super().__dir__()) + self.peripherals
[docs] def __getitem__(self, item: str) -> FpgaPeripheral:
"""Index style access to peripheral objects"""
return self._peripherals[item]
@property
def peripherals(self) -> typing.List[str]:
"""
:return: List of peripheral names
"""
return list(self._peripherals.keys())
def _check_fw(self, personality: str, version_spec: str) -> None:
"""
Check the FPGA firmware is the right personality & version.
NOTE: We deviate from PEP 440 in ACCEPTING dev versions.
If version spec is ">=0.1.2" and firmware is "0.1.2-dev.1234", we will accept
this because we often want to use the newly added registers to debug dev
versions of firmware.
(PEP 440 says that "0.1.2-dev.1234" does not satisfy ">=0.1.2")
:param personality: 4-character personality code
:param version_spec: version specification string (e.g. "~=1.2.3")
See PEP 440 for details.
(~= means major must match, minor/patch must be >= specified)
:raises: RuntimeError if requirements not met
"""
self._logger.debug(
f"Checking for {personality} personality, version {version_spec}"
)
actual_personality = self.fw_personality.value
if actual_personality != personality:
int_required = int.from_bytes(
personality.encode(encoding="ascii"), "big"
)
raise RuntimeError(
f"Wrong firmware personality: {actual_personality} "
f"(0x{self.system.firmware_personality.value:x})"
f". Expected: {personality} (0x{int_required:x})."
)
# Convert '-' to '+' as a way to nudge SpecifierSet.contains()
# i.e. if version_spec is ">=0.1.2":
# - contains("0.1.2-dev") => False
# - contains("0.1.2+dev") => True (our desired behaviour)
actual_version = self.fw_version.value.replace("-", "+")
spec = SpecifierSet(version_spec)
if not spec.contains(version.parse(actual_version)):
raise RuntimeError(
f"Wrong firmware version: {actual_version}. Expected: {version_spec}"
)
@property
def fw_version(self) -> IclField[str]:
"""
Get the FPGA Firmware Version.
:returns: version string formatted like the firmware packages:
``<major>.<minor>.<patch>[-<dev|main|pre|sim>.<commit hash>]``
"sim" indicates build type was zero (probably we are in simulation)
"""
fw_ver = (
f"{self.system.firmware_major_version.value}."
f"{self.system.firmware_minor_version.value}."
f"{self.system.firmware_patch_version.value}"
)
if hasattr(self.system, "build_type"):
# build_type register was added to firmware in Sep 2023
build_type = (
int.to_bytes(self.system.build_type.value, WORD_SIZE, "big")
.decode(encoding="ascii")
.lower()
.strip()
)
if build_type == "\x00\x00\x00\x00":
# fpgamap file does not contain build type or commit hash values,
# so we inject a dummy build type instead
build_type = "sim"
# For official releases, we do not append anything.
# Other packages should have the type (dev/main/pre) and hash added.
# We use a plus sign for PEP 440 compliance, so _check_fw will work.
if build_type != "rel":
# Note the minus sign here is swapped to a plus for internal comparison
# in _check_fw. Be aware if appending further suffixes.
fw_ver += (
f"-{build_type}.{self.system.commit_short_hash.value:08x}"
)
return IclField(
description="Firmware Version",
format="%s",
type_=str,
value=fw_ver,
user_error=False,
user_write=False,
)
@property
def fw_personality(self) -> IclField[str]:
"""
Get the FPGA Firmware personality, decoded to a string
"""
personality = int.to_bytes(
self.system.firmware_personality.value, WORD_SIZE, "big"
).decode(encoding="ascii")
return IclField(
description="Firmware Personality",
format="%s",
type_=str,
value=personality,
user_error=False,
user_write=False,
)