# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Instrument Control Layer (ICL) "Field" Objects.
These hold data (usually read from FPGA registers) and metadata.
"""
import typing
from dataclasses import dataclass, field
import numpy as np
from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsFpgaDriver, ArgsWordType
from ska_low_cbf_fpga.args_map import ArgsFieldInfo
T = typing.TypeVar("T")
[docs]
@dataclass
class IclField(ArgsFieldInfo, typing.Generic[T]):
"""ICL field, probably derived from one or more :py:class:`IclFpgaField` s"""
# user configuration
type_: type = field(default=T)
format: str = field(default="%i")
user_write: bool = True
"""
Should the control system allow writing to this field?
Note: not enforced at ICL!
"""
user_error: bool = False
"""
Should the control system treat a non-zero value as an error/alarm?
"""
# dynamic
value: T = field(default=None)
# note: ARGS map fields inherited from ArgsFieldInfo
def __int__(self):
return int(self.value)
def __bool__(self):
return bool(self.value)
# operators
def __add__(self, other):
if isinstance(other, IclField):
return self.value + other.type_(other)
return self.value + other
def __radd__(self, other):
return self.__add__(other)
def __sub__(self, other):
if isinstance(other, IclField):
return self.value - other.type_(other)
return self.value - other
def __rsub__(self, other):
if isinstance(other, IclField):
return other.type_(other) - self.value
return other - self.value
def __mul__(self, other):
if isinstance(other, IclField):
return self.value * other.type_(other)
return self.value * other
def __rmul__(self, other):
return self.__mul__(other)
def __pow__(self, other):
if isinstance(other, IclField):
return self.value ** other.type_(other)
return self.value**other
def __rpow__(self, other):
if isinstance(other, IclField):
return other.type_(other) ** self.value
return other**self.value
def __truediv__(self, other):
if isinstance(other, IclField):
return self.value / other.type_(other)
return self.value / other
def __rtruediv__(self, other):
if isinstance(other, IclField):
return other.type_(other) / self.value
return other / self.value
def __floordiv__(self, other):
if isinstance(other, IclField):
return self.value // other.type_(other)
return self.value // other
def __rfloordiv__(self, other):
if isinstance(other, IclField):
return other.type_(other) // self.value
return other // self.value
def __mod__(self, other):
if isinstance(other, IclField):
return self.value % other.type_(other)
return self.value % other
def __rmod__(self, other):
if isinstance(other, IclField):
return other.type_(other) % self.value
return other % self.value
def __lshift__(self, other):
if isinstance(other, IclField):
return self.value << other.type_(other)
return self.value << other
def __rlshift__(self, other):
if isinstance(other, IclField):
return other.type_(other) << self.value
return other << self.value
def __rshift__(self, other):
if isinstance(other, IclField):
return self.value >> other.type_(other)
return self.value >> other
def __rrshift__(self, other):
if isinstance(other, IclField):
return other.type_(other) >> self.value
return other >> self.value
def __and__(self, other):
if isinstance(other, IclField):
return self.value & other.type_(other)
return self.value & other
def __rand__(self, other):
return self.__and__(other)
def __or__(self, other):
if isinstance(other, IclField):
return self.value | other.type_(other)
return self.value | other
def __ror__(self, other):
return self.__or__(other)
def __xor__(self, other):
if isinstance(other, IclField):
return self.value ^ other.type_(other)
return self.value ^ other
def __rxor__(self, other):
return self.__xor__(other)
def __invert__(self):
return ~self.value
# comparisons
def __lt__(self, other):
if isinstance(other, IclField):
return self.value < other.type_(other)
return self.value < other
def __le__(self, other):
if isinstance(other, IclField):
return self.value <= other.type_(other)
return self.value <= other
def __eq__(self, other):
if isinstance(other, IclField):
return self.value == other.type_(other)
return self.value == other
def __ne__(self, other):
if isinstance(other, IclField):
return self.value != other.type_(other)
return self.value != other
def __gt__(self, other):
if isinstance(other, IclField):
return self.value > other.type_(other)
return self.value > other
def __ge__(self, other):
if isinstance(other, IclField):
return self.value >= other.type_(other)
return self.value >= other
# The eq and order parameters are False to stop the dataclass decorator from
# overriding the operators defined in IclField
[docs]
@dataclass(eq=False, order=False)
class IclFpgaField(IclField[ArgsWordType]):
"""An ICL field that is linked to an :py:class:`ArgsFpgaDriver` ."""
# configuration
driver: ArgsFpgaDriver = None
type_: type = ArgsWordType
# dynamic
value: ArgsWordType = field(init=False)
@property
def value(self) -> typing.Union[int, np.ndarray, bool]: # noqa: F811
"""Read current value from FPGA interface"""
val = self.driver.read(self.address, self.length)
if type(val) is int:
# Single word length - apply bitoffset, then width masking if specified
if self.bit_offset:
val = val >> self.bit_offset
if self.width:
mask = (1 << self.width) - 1
val &= mask
if self.width == 1:
# single bit returned as bool
return bool(val)
return val
if type(val) is np.ndarray:
# do nothing with arrays
return val
@value.setter
def value(self, value: typing.Union[int, bool, np.integer, np.ndarray, IclField]):
"""Write new value via FPGA interface"""
# on initialisation, the property object is passed to the setter
# we don't want to send this to the FPGA
# (an alternative could be to set an "init_complete" bool in __post_init__)
if type(value) is not property:
if isinstance(value, IclField):
# extract register value from IclField objects
value = value.value
if type(value) is np.ndarray:
self.driver.write(self.address, value)
return
elif type(value) in [int, bool] or isinstance(value, np.integer):
# FIXME: ideally we would lock the driver for this...
# but the driver controls the lock internally so that's a job for later
# if value is bool, convert to int
value = int(value)
bit_width = self.width or (WORD_SIZE * 8)
# If updating partial register, we need to read/modify/write
if bit_width != (WORD_SIZE * 8):
inital_value = self.driver.read(self.address, self.length)
bit_offset = self.bit_offset or 0
mask = ((1 << bit_width) - 1) << bit_offset
masked_initial_value = inital_value & ~mask
masked_value = (value << bit_offset) & mask
value = masked_initial_value | masked_value
self.driver.write(self.address, value)
return
else:
raise TypeError(
f"{self.__class__.__name__} does not know how "
f"to set a value of type {type(value)}"
)
[docs]
def __getitem__(self, item: typing.Union[int, slice]) -> ArgsWordType:
"""Array access for multi-word values"""
read_length = 1
if isinstance(item, slice):
if item.step is not None and item.step != 1:
raise NotImplementedError("only step size 1 supported")
end = self.length
if item.stop is not None:
end = self._index(item.stop)
self._check_index(end - 1)
start = self._index(item.start or 0)
# copying list/numpy array behaviour for e.g. list[-2:-4]
read_length = max(end - start, 0)
else:
start = self._index(item)
self._check_index(start)
return self.driver.read(self.address + start * WORD_SIZE, read_length)
[docs]
def __setitem__(self, key: typing.Union[int, slice], value: ArgsWordType):
"""
Array index style access for setting part of a multi-word value.
e.g. my_field[4] = 32
my_field[3:6] = np.ones(3, dtype=ArgsWordType)
If using a slice, value must be an array
i.e. Don't do this: my_field[3:4] = 5.
Instead, use my_field[3] = 5 or even my_field[3:4] = np.array([5])
"""
if isinstance(key, slice):
start = self._index(key.start or 0)
if key.step is not None and key.step != 1:
raise NotImplementedError("only step size 1 supported")
if key.stop is not None:
end = self._index(key.stop)
slice_len = end - start
else:
slice_len = self.length - start
if len(value) != slice_len:
raise ValueError(
f"value length {len(value)} is not equal "
f"to slice length {slice_len}"
)
else:
start = self._index(key)
self._check_index(start)
self.driver.write(self.address + start * WORD_SIZE, value)
def _check_index(self, index: int):
"""Check if an element index is within our length"""
if index >= self.length:
raise IndexError("index out of range")
def _index(self, index: int):
"""Transform a possibly-negative index"""
if index < 0:
self._check_index(abs(index) - 1)
return self.length + index
return index