Source code for ska_low_sps_tpm_api.boards.fpgaboard

import ast
import glob
import inspect
import logging
import re
import sys
from importlib.machinery import SourceFileLoader
from time import sleep

from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.memory_map import MemoryMap
from ska_low_sps_tpm_api.base.protocol import Protocol
from ska_low_sps_tpm_api.base.spi import SPI
from ska_low_sps_tpm_api.plugins import *

# --------------- Helpers ------------------------------
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock

# ------------------------------------------------------


# Wrap functionality for a TPM board
[docs] class FPGABoard(object): """Class which wraps LMC functionality for generic FPGA boards""" # Class constructor
[docs] def __init__(self, logger=None, **kwargs): """Class constructor for FPGABoard""" # Initialise logging (use default logger which can be set externally) if logger is None: self._logger = logging.getLogger("dummy") ch = logging.StreamHandler() ch.setLevel(logging.WARNING) formatter = logging.Formatter( "%(levelname)s\t%(asctime)s\t %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) ch.setFormatter(formatter) self._logger.addHandler(ch) else: self._logger = logger # Set defaults self.status = {Device.Board: Status.NotConnected} self._programmed = { Device.Board: False, Device.FPGA_1: False, Device.FPGA_2: False, } self._firmware_list = None self._fpga_board = 0 self._connected = False self._string_id = "Board" self._ip = None self._port = None # Override to make this compatible with IPython self.__methods__ = None self.trait_names = None self._getAttributeNames = None self.__members__ = None # Initialise list of available and loaded plugins self._available_plugins = {} self._loaded_plugins = {} # Used by __setattr__ to know how to handle new attributes self.__initialised = True # Check if FPGA board type is specified self._fpga_board = kwargs.get("fpgaBoard", None) if self._fpga_board is None: raise LibraryError("No BoarMake specified in FPGABoard initialiser") # A protocol class must be specified by the subclass such that a protocol instance can be created if "protocol" not in list(kwargs.keys()): raise LibraryError("No protocol class specified in FPGABoard initialiser") # Memory map which keeps track of board registers self.memory_map = MemoryMap(self._logger) # Sanity checks on protocol protocol = kwargs.get("protocol") if not (inspect.isclass(protocol) and issubclass(protocol, Protocol)): raise LibraryError("Protocol argument is invalid") # Create protocol instance self._protocol = protocol(self._fpga_board, logger=logger) # Placeholder for SPI device information object self._spi_devices = None # Function which will generate a list of all subclasses (and subsubclasses etc...) # which inherit from a specified class def _all_subclasses(cls): return set(cls.__subclasses__()).union( [s for c in cls.__subclasses__() for s in _all_subclasses(c)] ) # Store a list of all plugin classes self._all_available_plugins = _all_subclasses( sys.modules["ska_low_sps_tpm_api.plugins"].FirmwareBlock ) # Get list of available plugins which are compatible with board instance # noinspection PyUnresolvedReferences for plugin in self._all_available_plugins: try: constr = eval(plugin.__name__).__init__.__dict__ friendly_name = plugin.__name__ if "_friendly_name" in constr: friendly_name = constr["_friendly_name"] if "_compatible_boards" in constr: self._available_plugins[plugin.__name__] = friendly_name except: # This might fail when multiple threaded instances are loading a plugin directory at the same time pass
# ----------------------------- High-level functionality -------------------------
[docs] def initialise(self, config): """ Method for explicit initialisation. This is called by instrument when a configuration file is provided. :param config: Configuration dictionary """ self._string_id = config["id"] # Configure logging if "log" in config and eval(config["log"]): self._logger = logging.getLogger() # Get default logger else: self._logger = logging.getLogger("dummy") # Check if board is already connected, and if not, connect if not self._connected: if "ip" not in config and "port" not in config: raise LibraryError("IP and port are required for initialisation") self.connect(config["ip"], int(config["port"])) # Check if firmware was defined in config if "firmware" not in config: raise BoardError("Firmware must be specified in configuration file") # Load defined firmware self.load_firmware(Device.FPGA_1, config["firmware"]) # Load plugins if not already loaded if len(self._loaded_plugins) == 0: # Check if any plugins are required if "plugins" in config: for k, v in config["plugins"].items(): # Load and initialise plugins self.load_plugin(k) getattr(self, self._available_plugins[k]).initialise(**v)
[docs] def status_check(self): """Perform board and firmware status checks :return: Status """ # Run generic board tests status = self.get_status() if status is not Status.OK: return status # Loop over all plugins and perform checks if not all( [ getattr(self, plugin).status_check() == Status.OK for plugin in self._loaded_plugins ] ): return Status.FirmwareError # All check successful, return return Status.OK
# -------------------------- Firmware plugin functionality -----------------------
[docs] def add_plugin_directory(self, path): """ Load additional plugins from an external directory. :param path: Directory where plugins are available """ # Go through directory and detect python files for f in glob.glob("{}/*.py".format(path)): # Get list of classes in file try: classes = [ node.name for node in ast.walk(ast.parse(open(f).read())) if isinstance(node, ast.ClassDef) ] if len(classes) == 0: continue # Import the module to access the class module = SourceFileLoader("test_plugin", f).load_module() # Loop through all classes in file and check whether they are a subclass of FirmwareBlock for cls in classes: # Check if it is a subclass of any of the available plugin classes # or the base class itself current_class = getattr(module, cls) if issubclass(current_class, FirmwareBlock) or any( [ issubclass(current_class, cls) for cls in self._all_available_plugins ] ): # We have found a plugin, process it obj = getattr(module, cls) constr = obj.__init__.__dict__ friendly_name = cls if "_friendly_name" in constr: friendly_name = constr["_friendly_name"] if "_compatible_boards" in constr: self._available_plugins[cls] = friendly_name # Add class to board object globals()[cls] = obj self._logger.debug("Found external plugin '{}'".format(cls)) except Exception: self._logger.warning( "Error occured while examining {}. Ignoring".format(f) )
[docs] def load_plugin(self, plugin, **kwargs): """ Loads a firmware block plugin and incorporates its functionality. :param plugin: Plugin class name """ # Check if module is available if plugin not in list(self._available_plugins.keys()): raise LibraryError("Module %s is not available" % plugin) # Check if plugin is compatible with board make constr = eval(plugin).__init__.__dict__ if "_compatible_boards" in constr: if self._fpga_board not in constr["_compatible_boards"]: raise LibraryError( "Plugin %s is not compatible with %s" % (plugin, self._fpga_board) ) else: self._logger.warn( self.log("Plugin %s does not specify board compatability" % plugin) ) # Check if friendly name was defined for this plugin friendly_name = plugin if "_friendly_name" not in constr: self._logger.warn( self.log("Plugin %s does not specify a friendly name" % plugin) ) else: friendly_name = constr["_friendly_name"] # Check if number of plugin instances has been exceeded max_instances = 1 if "_max_instances" not in constr: self._logger.warn( self.log( "Plugin %s does not specify maximum number of instances" % plugin ) ) else: max_instances = constr["_max_instances"] # Count number of instances already loaded # 0 means an unlimited number can be loaded if max_instances > 0: if ( friendly_name in list(self.__dict__.keys()) and len(self.__dict__[friendly_name]) > max_instances ): raise LibraryError("Cannot load more instances on plugin %s" % plugin) # Check if a design name is specified in plugin decorator if "_design" in constr: # A design has been specified, check if it is available on the board available_firmware = self.get_firmware_list() # Check if firmware is available if len(available_firmware) == 0: raise LibraryError("No firmware available on board") if ( type(available_firmware[0]) is str and constr["_design"] not in available_firmware ): raise LibraryError( "Cannot load plugin %s because firmware %s is not available" % (plugin, constr["_design"]) ) elif type(available_firmware[0]) is dict: if constr["_design"] not in [x["design"] for x in available_firmware]: raise LibraryError( "Cannot load plugin %s because firmware %s is not available" % (plugin, constr["_design"]) ) # Loop over all designs with compatible designs compatible_design = None for i, design in enumerate( [x for x in available_firmware if x["design"] == constr["_design"]] ): # Loop over major and minor version numbers match = True for ver, dver in [("_major", "major"), ("_minor", "minor")]: # Check if version information is specified if ver in constr and dver in list(design.keys()): # If major version type is integer, a direct match is required if type(constr[ver]) is int and design[dver] != constr[ver]: match = False # If major version is a string, then a range of version can be defined elif ( type(constr[ver]) is str or isinstance(constr[ver], str) or type(constr[ver]) is type("s") ): if re.match("[<>=]+\d+", constr[ver]): if not eval(str(design[dver]) + constr[ver]): match = False elif re.match("\d+", constr[ver]): if int(constr[ver]) != design[dver]: match = False else: raise LibraryError( "Invalid plugin %s %s specification (%s)" % (plugin, dver, constr[ver]) ) else: raise LibraryError( "Invalid plugin %s %s specification (%s)" % (plugin, dver, str(constr[ver])) ) # If match is true, then the current design is compatible with plugin requirements if match: compatible_design = design break # Check if a compatible design was found if compatible_design is not None: if "device" not in list(kwargs.keys()): raise LibraryError( "Plugin %s with firmware association requires a device argument" % plugin ) else: # If no compatible design is found, raise error raise LibraryError( "No compatible firmware design %s for plugin %s available on board" % (constr["_design"], plugin) ) methods = [name for name, mtype in inspect.getmembers(eval(plugin), predicate=inspect.ismethod) if name not in [a for a, b in inspect.getmembers(FirmwareBlock, predicate=inspect.ismethod)] and not name.startswith('_')] # fmt: skip # Create plugin instances, passing arguments if provided if len(kwargs) == 0: instance = globals()[plugin](self, logger=self._logger) else: instance = globals()[plugin](self, logger=self._logger, **kwargs) if friendly_name in list(self.__dict__.keys()): # Plugin already loaded once, add to list self.__dict__[friendly_name].append(instance) else: # Plugin not loaded yet self.__dict__[friendly_name] = PluginList((instance,)) # Plugin loaded, add to list self._loaded_plugins[friendly_name] = [] # Some book-keeping for method in methods: self._loaded_plugins[friendly_name].append(method) self._logger.debug(self.log("Added plugin %s to class instance" % plugin)) return self.__dict__[friendly_name][-1]
[docs] def unload_plugin(self, plugin, instance=None): """ Unload plugin from instance. :param plugin: Plugin name :param instance: Unload a specific plugin instance """ # Check if plugin has been loaded if plugin in list(self._loaded_plugins.keys()): # If no instance is specified, remove all plugin instances if instance is None: del self.__dict__[plugin] elif type(instance) is int and len(getattr(self, plugin)) >= instance: getattr(self, plugin).remove(getattr(self, plugin)[instance]) else: self._logger.info( self.log( "Plugin %s instance %d does not exist" % (plugin, instance) ) ) else: self._logger.info(self.log("Plugin %s was not loaded." % plugin))
[docs] def unload_all_plugins(self): """Unload all plugins from instance""" for plugin in list(self._loaded_plugins.keys()): del self.__dict__[plugin] self._loaded_plugins = {}
[docs] def get_available_plugins(self): """ Get list of available plugins. :return: List of plugins """ return self._available_plugins
[docs] def get_loaded_plugins(self): """ Get the list of loaded plugins with associated methods. :return: List of loaded plugins """ return self._loaded_plugins
# ---------------------------- FPGA Board functionality --------------------------
[docs] def connect(self, ip, port, **kwargs): """ Connect to board. :param ip: Board IP :param port: Port to connect to :param kwargs: Additional parameters if required """ # Check if IP is valid, and if a hostname is provided, check whether it # exists and get IP address import socket try: socket.inet_aton(ip) except socket.error: try: ip = socket.gethostbyname(ip) except socket.gaierror: raise BoardError( "Provided IP address (%s) is invalid or does not exist" ) self._ip = ip self._port = port # Check if we are requesting to bind UCP socket to specific IP, needed for broadcast self._src_ip = kwargs.get("src_ip", None) # Connect to board if not self._protocol.create_connection(ip, port, self._src_ip): self.status[Device.Board] = Status.NetworkError raise BoardError("Could not connect to board with ip {}".format(ip)) else: self._logger.info(self.log("Connected to board {}".format(ip))) self.status[Device.Board] = Status.OK self._connected = True
[docs] def disconnect(self): """Disconnect from board""" # Check if board is connected if not self._connected: self._logger.warn( self.log("Call disconnect on board which was not connected") ) if self._protocol.close_connection(): # Clear memory map and spi list if self.memory_map is not None: self.memory_map.clear() if self._spi_devices is not None: self._spi_devices.clear() # Unload all plugins self.unload_all_plugins() # Clear programmed dictionary for device in list(self._programmed.keys()): self._programmed[device] = False self._connected = False self._logger.info( self.log("Disconnected from board with IP {}".format(self._ip)) )
[docs] def reset(self, device): """ Reset device on board. :param device: Device on board to reset """ pass
[docs] def get_status(self): """ Get board status. :return: Status """ return self.status
[docs] def get_firmware_list(self, device=Device.Board): """ Get list of firmware on board. :param device: Device on board to get list of firmware :return: List of firmware """ # Check if board is connected if not self._connected: raise LibraryError( "Call get_firmware_list for board which is not connected" ) # Call getFirmware on board self._firmware_list = self._protocol.list_firmware() return self._firmware_list
[docs] def load_firmware( self, device, register_string=None, load_values=False, base_address=0 ): """ Blocking call to load firmware. :param base_address: base address at which to load firmware :param device: Device on board to load firmware to :param register_string: String containing register information :param load_values: Load register values """ # Check if connected if not self._connected: raise LibraryError("Not connected to board, cannot load firmware") # Superclass method required filepath to be not null if register_string is None: raise LibraryError("Default load_memory_map requires a register_string") # Check if device argument is of type Device if not type(device) is Device: raise LibraryError( "Device argument for load_firmware should be of type Device" ) # All OK, call function self.status[device] = Status.LoadingFirmware try: self.memory_map.update_memory_map(register_string, device, base_address) self._programmed[device] = True self.status[device] = Status.OK self.get_register_list(load_values=load_values, reset=True) self._logger.debug(self.log("Successfully loaded memory map")) except Exception as e: self._programmed[device] = False self.status[device] = Status.LoadingFirmwareError raise BoardError("load_firmware failed on board: {}".format(e))
[docs] def download_firmware(self, device, bitfile): """ Download firmware onto the FPGA (or FLASH). :param device: Device to download firmware to """ raise LibraryError("Download firmware not implemented")
[docs] def get_register_list(self, reset=False, load_values=False): """ Get list of registers. :param reset: Force reload register list :param load_values: Load register values """ # Check if register list has already been acquired, and if so return it if self.memory_map is not None and not reset: return self.memory_map.register_list # Check if device is programmed if not self._programmed[Device.Board]: raise LibraryError( "Cannot get_register_list from board which has not been programmed" ) # If we need to load values, do so if load_values: for r in list(self.memory_map.register_list.values()): r.value = self.read_register(r.name, r.size, r.offset) # All done, return return self.memory_map.register_list
[docs] def read_register(self, register, n=1, offset=0): """ " Get register value. :param register: Register name :param n: Number of words to read :param offset: Memory address offset to read from :return: Values """ try: # Get register address, check offset and read from board reg_info = self.memory_map[register] if offset + n * 4 > reg_info.address + reg_info.size * 4: raise Exception( "Invalid offset secified for register {}".format(register) ) values = self._protocol.read_register(reg_info.address, n, offset) # Apply bitmask and shift to values for i in range(len(values)): values[i] = (values[i] & reg_info.bitmask) >> reg_info.shift if len(values) == 1: return values[0] else: return values except Exception as e: raise BoardError( "Failed to read_register {} from board: {}".format(register, e) )
[docs] def write_register(self, register, values, offset=0, retry=True): """ Set register value. :param register: Register name :param values: Values to write :param offset: Memory address offset to write to """ try: # Get register address and check offset reg_info = self.memory_map[register] if type(values) is not list: values = [values] if offset + len(values) * 4 > reg_info.address + reg_info.size * 4: raise Exception( "Invalid offset secified for register {}".format(register) ) # Apply shift and bitmask to values for i in range(len(values)): values[i] = (values[i] << reg_info.shift) & reg_info.bitmask # Check if we have to apply bitmask if reg_info.bitmask != 0xFFFFFFFF: read_values = self._protocol.read_register( reg_info.address, len(values), offset ) for i in range(len(read_values)): values[i] = ( read_values[i] & (reg_info.bitmask ^ 0xFFFFFFFF) ) | values[i] # Writes values to register self._protocol.write_register(reg_info.address, values, offset, retry) except Exception as e: raise BoardError( "Failed to write_register {} on board: {}".format(register, e) )
[docs] def read_address(self, address, n=1): """ " Get register value. :param address: Memory address to read from :param n: Number of words to read :return: Values """ # Call function and return try: values = self._protocol.read_register(address, n, 0) if len(values) == 1: return values[0] else: return values except Exception as e: raise BoardError( "Failed to read_address {} on board: {}".format(hex(address), e) )
[docs] def write_address(self, address, values, retry=True): """ Set register value. :param address: Memory address to write to :param values: Values to write """ # Call function and return try: if type(values) is not list: values = [values] self._protocol.write_register(address, values, 0, retry) except Exception as e: import traceback traceback.print_exc() raise BoardError( "Failed to write_address {} on board: {}".format(hex(address), e) )
[docs] def read_device(self, device, address, unlock=True): """ Get device value. :param device: SPI Device to read from :param address: Address on device to read from :return: Value """ # Check if device is in device list if not self._spi_devices.has_device(device): raise LibraryError("Device not found in SPI Device list") spi_device = self._spi_devices[device] self._spi_lock.lock() # Wait for SPI switch to be ready while True: ret = self.read_address(self._spi_devices.cmd_address) if ret & self._spi_devices.cmd_start_mask == 0: break self._spi_lock.refresh() sleep(0.05) # Issue request as an array of values # Address, 0, 0, spi_en, spi_sclk, read operation request = [ address, 0, 0, 1 << spi_device.spi_en, 1 << spi_device.spi_sclk, 0x03, ] self.write_address(self._spi_devices.spi_address, request) # Wait for request to be completed on board while True: ret = self.read_address(self._spi_devices.cmd_address) if ret & self._spi_devices.cmd_start_mask == 0: break self._spi_lock.refresh() sleep(0.05) # Request ready on device, grab data result = self.read_address(self._spi_devices.read_data) & 0xFF if unlock: self._spi_lock.unlock() return result
[docs] def write_device(self, device, address, value, unlock=False): """ Set device value. :param device: SPI device to write to :param address: Address on device to write to :param value: Value to write """ # Check if device is in device list if not self._spi_devices.has_device(device): raise LibraryError("Device not found in SPI Device list") spi_device = self._spi_devices[device] self._spi_lock.lock() # Wait for SPI switch to be ready while True: self._spi_lock.refresh() ret = self.read_address(self._spi_devices.cmd_address) if ret & self._spi_devices.cmd_start_mask == 0: break sleep(0.05) # Issue request as an array of values # Address, value to write, 0, spi_en, spi_sclk, write operation self._spi_lock.refresh() request = [ address, (value & 0xFF) << 8, 0, 1 << spi_device.spi_en, 1 << spi_device.spi_sclk, 0x01, ] self.write_address(self._spi_devices.spi_address, request) # Wait for request to be completed on board while True: self._spi_lock.refresh() ret = self.read_address(self._spi_devices.cmd_address) if ret & self._spi_devices.cmd_start_mask == 0: break sleep(0.05) # All done, return True if unlock: self._spi_lock.unlock() return True
[docs] def load_spi_devices(self, xml_string): """ Load SPI devices. :param xml_string: XML string containing SPI device information """ # Check if memory map includes SPI information if not self.memory_map.has_register("board.spi"): raise LibraryError("No SPI information present in memory map") # Parse XML string self._spi_devices = SPI(xml_string) # Populate general SPI properties # fmt: off self._spi_devices.spi_address = self.memory_map["board.spi.address"].address self._spi_devices.spi_address_mask = self.memory_map["board.spi.address"].bitmask self._spi_devices.write_data = self.memory_map["board.spi.write_data"].address self._spi_devices.write_data_mask = self.memory_map["board.spi.write_data"].bitmask self._spi_devices.read_data = self.memory_map["board.spi.read_data"].address self._spi_devices.read_data_mask = self.memory_map["board.spi.read_data"].bitmask self._spi_devices.chip_select = self.memory_map["board.spi.chip_select"].address self._spi_devices.chip_select_mask = self.memory_map["board.spi.chip_select"].bitmask self._spi_devices.sclk = self.memory_map["board.spi.sclk"].address self._spi_devices.sclk_mask = self.memory_map["board.spi.sclk"].bitmask self._spi_devices.cmd_address = self.memory_map["board.spi.cmd"].address self._spi_devices.cmd_start_mask = self.memory_map["board.spi.cmd.start"].bitmask self._spi_devices.cmd_rnw_mask = self.memory_map["board.spi.cmd.rnw"].bitmask
# fmt: on
[docs] def is_programmed(self): """Returns True if Board is programmed""" return any(self._programmed.values())
[docs] def list_register_names(self): """Print list of register names""" if not self._programmed[Device.Board]: return # Run checks if not self._checks(): return # Split register list into devices registers = {} for k, v in self.memory_map.items(): if v["device"] not in list(registers.keys()): registers[v["device"]] = [] registers[v["device"]].append(k) # Loop over all devices for k, v in registers.items(): print(DeviceNames[k]) print("-" * len(DeviceNames[k])) for regname in sorted(v): print("\t" + str(regname))
[docs] def list_device_names(self): """Print list of SPI device names""" # Run check if not self._checks(): return # Loop over all SPI devices print("List of SPI Devices") print("-------------------") for k in list(self._spi_devices.spi_map.keys()): print(k)
[docs] def has_register(self, register): """ Check if board has specified register. :param register: Register name :return: True if register exists in memory map """ return self.memory_map.has_register(register)
[docs] def find_register(self, string, display=False, info=False): """ Return register information for provided search string. :param string: Regular expression to search against :param display: True to output result to console :return: List of found registers """ # Run checks if not self._checks(): return # Go through all registers and store the name of registers # which generate a match matches = [] for k, v in self.memory_map.register_list.items(): if re.search(string, k) is not None: matches.append(v) # Display to screen if required if display: string = "\n" if info: for v in sorted(matches, key=lambda l: l.name): string += "%s:\n%s\n" % (v.name, "-" * len(v.name)) string += "Address:\t\t%s\n" % (hex(v.address)) string += "Type:\t\t\t%s\n" % str(v.type) string += "Device:\t\t\t%s\n" % str(v.device) string += "Permission:\t\t%s\n" % str(v.permission) string += "Bitmask:\t\t0x%X\n" % v.bitmask string += "Bits:\t\t\t%d\n" % v.bits string += "Size:\t\t\t%d\n" % v.size string += "Description:\t%s\n\n" % v.desc else: for v in sorted(matches, key=lambda l: l.name): string += "%s\n" % v.name print(string) if not display: return matches
[docs] def find_device(self, string, display=False): """ Return SPI device information for provided search string. :param string: Regular expression to search against :param display: True to output result to console :return: List of found devices """ # Run check if not self._checks(): return # Loop over all devices matches = [] for k, v in self._spi_devices.spi_map: if re.match(string, k) is not None: matches.append(v) # Display to screen if required if display: string = "\n" for v in sorted(matches, key=lambda l: l["name"]): string += "Name: %s, spi_sclk: %d, spi_en: %d\n" % ( v["name"], v["spi_sclk"], v["spi_en"], ) print(string) if not display: return matches
def __len__(self): """Override __len__, return number of registers""" return len(self.memory_map)
[docs] def is_connected(self): """Check if board is connected""" return self._connected
def _checks(self, device=Device.Board): """Check prior to function calls""" # Check if board is connected # Check if connected if not self._connected: raise LibraryError("Cannot perform operation on unconnected board") # Check if device is programmed if device is not None and not self._programmed[device]: raise LibraryError( "Cannot get memory map from board which has not been programmed" ) # Check if register list has been populated if self.memory_map is None: self.get_register_list() return True
[docs] def log(self, string): """ Format string for logging output. :param string: String to log :return: Formatted string """ return "%s (%s)" % (string, self._string_id)