Source code for ska_low_cbf_fpga.icl_field

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

"""
Instrument Control Layer (ICL) "Field" Objects.

These hold data (usually read from FPGA registers) and metadata.
"""
import typing
from dataclasses import dataclass, field

import numpy as np

from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsFpgaDriver, ArgsWordType
from ska_low_cbf_fpga.args_map import ArgsFieldInfo

T = typing.TypeVar("T")


[docs] @dataclass class IclField(ArgsFieldInfo, typing.Generic[T]): """ICL field, probably derived from one or more :py:class:`IclFpgaField` s""" # user configuration type_: type = field(default=T) format: str = field(default="%i") user_write: bool = True """ Should the control system allow writing to this field? Note: not enforced at ICL! """ user_error: bool = False """ Should the control system treat a non-zero value as an error/alarm? """ # dynamic value: T = field(default=None) # note: ARGS map fields inherited from ArgsFieldInfo def __int__(self): return int(self.value) def __bool__(self): return bool(self.value) # operators def __add__(self, other): if isinstance(other, IclField): return self.value + other.type_(other) return self.value + other def __radd__(self, other): return self.__add__(other) def __sub__(self, other): if isinstance(other, IclField): return self.value - other.type_(other) return self.value - other def __rsub__(self, other): if isinstance(other, IclField): return other.type_(other) - self.value return other - self.value def __mul__(self, other): if isinstance(other, IclField): return self.value * other.type_(other) return self.value * other def __rmul__(self, other): return self.__mul__(other) def __pow__(self, other): if isinstance(other, IclField): return self.value ** other.type_(other) return self.value**other def __rpow__(self, other): if isinstance(other, IclField): return other.type_(other) ** self.value return other**self.value def __truediv__(self, other): if isinstance(other, IclField): return self.value / other.type_(other) return self.value / other def __rtruediv__(self, other): if isinstance(other, IclField): return other.type_(other) / self.value return other / self.value def __floordiv__(self, other): if isinstance(other, IclField): return self.value // other.type_(other) return self.value // other def __rfloordiv__(self, other): if isinstance(other, IclField): return other.type_(other) // self.value return other // self.value def __mod__(self, other): if isinstance(other, IclField): return self.value % other.type_(other) return self.value % other def __rmod__(self, other): if isinstance(other, IclField): return other.type_(other) % self.value return other % self.value def __lshift__(self, other): if isinstance(other, IclField): return self.value << other.type_(other) return self.value << other def __rlshift__(self, other): if isinstance(other, IclField): return other.type_(other) << self.value return other << self.value def __rshift__(self, other): if isinstance(other, IclField): return self.value >> other.type_(other) return self.value >> other def __rrshift__(self, other): if isinstance(other, IclField): return other.type_(other) >> self.value return other >> self.value def __and__(self, other): if isinstance(other, IclField): return self.value & other.type_(other) return self.value & other def __rand__(self, other): return self.__and__(other) def __or__(self, other): if isinstance(other, IclField): return self.value | other.type_(other) return self.value | other def __ror__(self, other): return self.__or__(other) def __xor__(self, other): if isinstance(other, IclField): return self.value ^ other.type_(other) return self.value ^ other def __rxor__(self, other): return self.__xor__(other) def __invert__(self): return ~self.value # comparisons def __lt__(self, other): if isinstance(other, IclField): return self.value < other.type_(other) return self.value < other def __le__(self, other): if isinstance(other, IclField): return self.value <= other.type_(other) return self.value <= other def __eq__(self, other): if isinstance(other, IclField): return self.value == other.type_(other) return self.value == other def __ne__(self, other): if isinstance(other, IclField): return self.value != other.type_(other) return self.value != other def __gt__(self, other): if isinstance(other, IclField): return self.value > other.type_(other) return self.value > other def __ge__(self, other): if isinstance(other, IclField): return self.value >= other.type_(other) return self.value >= other
# The eq and order parameters are False to stop the dataclass decorator from # overriding the operators defined in IclField
[docs] @dataclass(eq=False, order=False) class IclFpgaField(IclField[ArgsWordType]): """An ICL field that is linked to an :py:class:`ArgsFpgaDriver` .""" # configuration driver: ArgsFpgaDriver = None type_: type = ArgsWordType # dynamic value: ArgsWordType = field(init=False) @property def value(self) -> typing.Union[int, np.ndarray, bool]: # noqa: F811 """Read current value from FPGA interface""" val = self.driver.read(self.address, self.length) if type(val) is int: # Single word length - apply bitoffset, then width masking if specified if self.bit_offset: val = val >> self.bit_offset if self.width: mask = (1 << self.width) - 1 val &= mask if self.width == 1: # single bit returned as bool return bool(val) return val if type(val) is np.ndarray: # do nothing with arrays return val @value.setter def value(self, value: typing.Union[int, bool, np.integer, np.ndarray, IclField]): """Write new value via FPGA interface""" # on initialisation, the property object is passed to the setter # we don't want to send this to the FPGA # (an alternative could be to set an "init_complete" bool in __post_init__) if type(value) is not property: if isinstance(value, IclField): # extract register value from IclField objects value = value.value if type(value) is np.ndarray: self.driver.write(self.address, value) return elif type(value) in [int, bool] or isinstance(value, np.integer): # FIXME: ideally we would lock the driver for this... # but the driver controls the lock internally so that's a job for later # if value is bool, convert to int value = int(value) bit_width = self.width or (WORD_SIZE * 8) # If updating partial register, we need to read/modify/write if bit_width != (WORD_SIZE * 8): inital_value = self.driver.read(self.address, self.length) bit_offset = self.bit_offset or 0 mask = ((1 << bit_width) - 1) << bit_offset masked_initial_value = inital_value & ~mask masked_value = (value << bit_offset) & mask value = masked_initial_value | masked_value self.driver.write(self.address, value) return else: raise TypeError( f"{self.__class__.__name__} does not know how " f"to set a value of type {type(value)}" )
[docs] def __getitem__(self, item: typing.Union[int, slice]) -> ArgsWordType: """Array access for multi-word values""" read_length = 1 if isinstance(item, slice): if item.step is not None and item.step != 1: raise NotImplementedError("only step size 1 supported") end = self.length if item.stop is not None: end = self._index(item.stop) self._check_index(end - 1) start = self._index(item.start or 0) # copying list/numpy array behaviour for e.g. list[-2:-4] read_length = max(end - start, 0) else: start = self._index(item) self._check_index(start) return self.driver.read(self.address + start * WORD_SIZE, read_length)
[docs] def __setitem__(self, key: typing.Union[int, slice], value: ArgsWordType): """ Array index style access for setting part of a multi-word value. e.g. my_field[4] = 32 my_field[3:6] = np.ones(3, dtype=ArgsWordType) If using a slice, value must be an array i.e. Don't do this: my_field[3:4] = 5. Instead, use my_field[3] = 5 or even my_field[3:4] = np.array([5]) """ if isinstance(key, slice): start = self._index(key.start or 0) if key.step is not None and key.step != 1: raise NotImplementedError("only step size 1 supported") if key.stop is not None: end = self._index(key.stop) slice_len = end - start else: slice_len = self.length - start if len(value) != slice_len: raise ValueError( f"value length {len(value)} is not equal " f"to slice length {slice_len}" ) else: start = self._index(key) self._check_index(start) self.driver.write(self.address + start * WORD_SIZE, value)
def _check_index(self, index: int): """Check if an element index is within our length""" if index >= self.length: raise IndexError("index out of range") def _index(self, index: int): """Transform a possibly-negative index""" if index < 0: self._check_index(abs(index) - 1) return self.length + index return index