import ast
import glob
import inspect
import logging
import re
import sys
from importlib.machinery import SourceFileLoader
from time import sleep
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.memory_map import MemoryMap
from ska_low_sps_tpm_api.base.protocol import Protocol
from ska_low_sps_tpm_api.base.spi import SPI
from ska_low_sps_tpm_api.plugins import *
# --------------- Helpers ------------------------------
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
# ------------------------------------------------------
# Wrap functionality for a TPM board
[docs]
class FPGABoard(object):
"""Class which wraps LMC functionality for generic FPGA boards"""
# Class constructor
[docs]
def __init__(self, logger=None, **kwargs):
"""Class constructor for FPGABoard"""
# Initialise logging (use default logger which can be set externally)
if logger is None:
self._logger = logging.getLogger("dummy")
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING)
formatter = logging.Formatter(
"%(levelname)s\t%(asctime)s\t %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
ch.setFormatter(formatter)
self._logger.addHandler(ch)
else:
self._logger = logger
# Set defaults
self.status = {Device.Board: Status.NotConnected}
self._programmed = {
Device.Board: False,
Device.FPGA_1: False,
Device.FPGA_2: False,
}
self._firmware_list = None
self._fpga_board = 0
self._connected = False
self._string_id = "Board"
self._ip = None
self._port = None
# Override to make this compatible with IPython
self.__methods__ = None
self.trait_names = None
self._getAttributeNames = None
self.__members__ = None
# Initialise list of available and loaded plugins
self._available_plugins = {}
self._loaded_plugins = {}
# Used by __setattr__ to know how to handle new attributes
self.__initialised = True
# Check if FPGA board type is specified
self._fpga_board = kwargs.get("fpgaBoard", None)
if self._fpga_board is None:
raise LibraryError("No BoarMake specified in FPGABoard initialiser")
# A protocol class must be specified by the subclass such that a protocol instance can be created
if "protocol" not in list(kwargs.keys()):
raise LibraryError("No protocol class specified in FPGABoard initialiser")
# Memory map which keeps track of board registers
self.memory_map = MemoryMap(self._logger)
# Sanity checks on protocol
protocol = kwargs.get("protocol")
if not (inspect.isclass(protocol) and issubclass(protocol, Protocol)):
raise LibraryError("Protocol argument is invalid")
# Create protocol instance
self._protocol = protocol(self._fpga_board, logger=logger)
# Placeholder for SPI device information object
self._spi_devices = None
# Function which will generate a list of all subclasses (and subsubclasses etc...)
# which inherit from a specified class
def _all_subclasses(cls):
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in _all_subclasses(c)]
)
# Store a list of all plugin classes
self._all_available_plugins = _all_subclasses(
sys.modules["ska_low_sps_tpm_api.plugins"].FirmwareBlock
)
# Get list of available plugins which are compatible with board instance
# noinspection PyUnresolvedReferences
for plugin in self._all_available_plugins:
try:
constr = eval(plugin.__name__).__init__.__dict__
friendly_name = plugin.__name__
if "_friendly_name" in constr:
friendly_name = constr["_friendly_name"]
if "_compatible_boards" in constr:
self._available_plugins[plugin.__name__] = friendly_name
except:
# This might fail when multiple threaded instances are loading a plugin directory at the same time
pass
# ----------------------------- High-level functionality -------------------------
[docs]
def initialise(self, config):
"""
Method for explicit initialisation. This is called by instrument when
a configuration file is provided.
:param config: Configuration dictionary
"""
self._string_id = config["id"]
# Configure logging
if "log" in config and eval(config["log"]):
self._logger = logging.getLogger() # Get default logger
else:
self._logger = logging.getLogger("dummy")
# Check if board is already connected, and if not, connect
if not self._connected:
if "ip" not in config and "port" not in config:
raise LibraryError("IP and port are required for initialisation")
self.connect(config["ip"], int(config["port"]))
# Check if firmware was defined in config
if "firmware" not in config:
raise BoardError("Firmware must be specified in configuration file")
# Load defined firmware
self.load_firmware(Device.FPGA_1, config["firmware"])
# Load plugins if not already loaded
if len(self._loaded_plugins) == 0:
# Check if any plugins are required
if "plugins" in config:
for k, v in config["plugins"].items():
# Load and initialise plugins
self.load_plugin(k)
getattr(self, self._available_plugins[k]).initialise(**v)
[docs]
def status_check(self):
"""Perform board and firmware status checks
:return: Status
"""
# Run generic board tests
status = self.get_status()
if status is not Status.OK:
return status
# Loop over all plugins and perform checks
if not all(
[
getattr(self, plugin).status_check() == Status.OK
for plugin in self._loaded_plugins
]
):
return Status.FirmwareError
# All check successful, return
return Status.OK
# -------------------------- Firmware plugin functionality -----------------------
[docs]
def add_plugin_directory(self, path):
"""
Load additional plugins from an external directory.
:param path: Directory where plugins are available
"""
# Go through directory and detect python files
for f in glob.glob("{}/*.py".format(path)):
# Get list of classes in file
try:
classes = [
node.name
for node in ast.walk(ast.parse(open(f).read()))
if isinstance(node, ast.ClassDef)
]
if len(classes) == 0:
continue
# Import the module to access the class
module = SourceFileLoader("test_plugin", f).load_module()
# Loop through all classes in file and check whether they are a subclass of FirmwareBlock
for cls in classes:
# Check if it is a subclass of any of the available plugin classes
# or the base class itself
current_class = getattr(module, cls)
if issubclass(current_class, FirmwareBlock) or any(
[
issubclass(current_class, cls)
for cls in self._all_available_plugins
]
):
# We have found a plugin, process it
obj = getattr(module, cls)
constr = obj.__init__.__dict__
friendly_name = cls
if "_friendly_name" in constr:
friendly_name = constr["_friendly_name"]
if "_compatible_boards" in constr:
self._available_plugins[cls] = friendly_name
# Add class to board object
globals()[cls] = obj
self._logger.debug("Found external plugin '{}'".format(cls))
except Exception:
self._logger.warning(
"Error occured while examining {}. Ignoring".format(f)
)
[docs]
def load_plugin(self, plugin, **kwargs):
"""
Loads a firmware block plugin and incorporates its functionality.
:param plugin: Plugin class name
"""
# Check if module is available
if plugin not in list(self._available_plugins.keys()):
raise LibraryError("Module %s is not available" % plugin)
# Check if plugin is compatible with board make
constr = eval(plugin).__init__.__dict__
if "_compatible_boards" in constr:
if self._fpga_board not in constr["_compatible_boards"]:
raise LibraryError(
"Plugin %s is not compatible with %s" % (plugin, self._fpga_board)
)
else:
self._logger.warn(
self.log("Plugin %s does not specify board compatability" % plugin)
)
# Check if friendly name was defined for this plugin
friendly_name = plugin
if "_friendly_name" not in constr:
self._logger.warn(
self.log("Plugin %s does not specify a friendly name" % plugin)
)
else:
friendly_name = constr["_friendly_name"]
# Check if number of plugin instances has been exceeded
max_instances = 1
if "_max_instances" not in constr:
self._logger.warn(
self.log(
"Plugin %s does not specify maximum number of instances" % plugin
)
)
else:
max_instances = constr["_max_instances"]
# Count number of instances already loaded
# 0 means an unlimited number can be loaded
if max_instances > 0:
if (
friendly_name in list(self.__dict__.keys())
and len(self.__dict__[friendly_name]) > max_instances
):
raise LibraryError("Cannot load more instances on plugin %s" % plugin)
# Check if a design name is specified in plugin decorator
if "_design" in constr:
# A design has been specified, check if it is available on the board
available_firmware = self.get_firmware_list()
# Check if firmware is available
if len(available_firmware) == 0:
raise LibraryError("No firmware available on board")
if (
type(available_firmware[0]) is str
and constr["_design"] not in available_firmware
):
raise LibraryError(
"Cannot load plugin %s because firmware %s is not available"
% (plugin, constr["_design"])
)
elif type(available_firmware[0]) is dict:
if constr["_design"] not in [x["design"] for x in available_firmware]:
raise LibraryError(
"Cannot load plugin %s because firmware %s is not available"
% (plugin, constr["_design"])
)
# Loop over all designs with compatible designs
compatible_design = None
for i, design in enumerate(
[x for x in available_firmware if x["design"] == constr["_design"]]
):
# Loop over major and minor version numbers
match = True
for ver, dver in [("_major", "major"), ("_minor", "minor")]:
# Check if version information is specified
if ver in constr and dver in list(design.keys()):
# If major version type is integer, a direct match is required
if type(constr[ver]) is int and design[dver] != constr[ver]:
match = False
# If major version is a string, then a range of version can be defined
elif (
type(constr[ver]) is str
or isinstance(constr[ver], str)
or type(constr[ver]) is type("s")
):
if re.match("[<>=]+\d+", constr[ver]):
if not eval(str(design[dver]) + constr[ver]):
match = False
elif re.match("\d+", constr[ver]):
if int(constr[ver]) != design[dver]:
match = False
else:
raise LibraryError(
"Invalid plugin %s %s specification (%s)"
% (plugin, dver, constr[ver])
)
else:
raise LibraryError(
"Invalid plugin %s %s specification (%s)"
% (plugin, dver, str(constr[ver]))
)
# If match is true, then the current design is compatible with plugin requirements
if match:
compatible_design = design
break
# Check if a compatible design was found
if compatible_design is not None:
if "device" not in list(kwargs.keys()):
raise LibraryError(
"Plugin %s with firmware association requires a device argument"
% plugin
)
else:
# If no compatible design is found, raise error
raise LibraryError(
"No compatible firmware design %s for plugin %s available on board"
% (constr["_design"], plugin)
)
methods = [name for name, mtype in
inspect.getmembers(eval(plugin), predicate=inspect.ismethod)
if name not in
[a for a, b in inspect.getmembers(FirmwareBlock, predicate=inspect.ismethod)]
and not name.startswith('_')] # fmt: skip
# Create plugin instances, passing arguments if provided
if len(kwargs) == 0:
instance = globals()[plugin](self, logger=self._logger)
else:
instance = globals()[plugin](self, logger=self._logger, **kwargs)
if friendly_name in list(self.__dict__.keys()):
# Plugin already loaded once, add to list
self.__dict__[friendly_name].append(instance)
else:
# Plugin not loaded yet
self.__dict__[friendly_name] = PluginList((instance,))
# Plugin loaded, add to list
self._loaded_plugins[friendly_name] = []
# Some book-keeping
for method in methods:
self._loaded_plugins[friendly_name].append(method)
self._logger.debug(self.log("Added plugin %s to class instance" % plugin))
return self.__dict__[friendly_name][-1]
[docs]
def unload_plugin(self, plugin, instance=None):
"""
Unload plugin from instance.
:param plugin: Plugin name
:param instance: Unload a specific plugin instance
"""
# Check if plugin has been loaded
if plugin in list(self._loaded_plugins.keys()):
# If no instance is specified, remove all plugin instances
if instance is None:
del self.__dict__[plugin]
elif type(instance) is int and len(getattr(self, plugin)) >= instance:
getattr(self, plugin).remove(getattr(self, plugin)[instance])
else:
self._logger.info(
self.log(
"Plugin %s instance %d does not exist" % (plugin, instance)
)
)
else:
self._logger.info(self.log("Plugin %s was not loaded." % plugin))
[docs]
def unload_all_plugins(self):
"""Unload all plugins from instance"""
for plugin in list(self._loaded_plugins.keys()):
del self.__dict__[plugin]
self._loaded_plugins = {}
[docs]
def get_available_plugins(self):
"""
Get list of available plugins.
:return: List of plugins
"""
return self._available_plugins
[docs]
def get_loaded_plugins(self):
"""
Get the list of loaded plugins with associated methods.
:return: List of loaded plugins
"""
return self._loaded_plugins
# ---------------------------- FPGA Board functionality --------------------------
[docs]
def connect(self, ip, port, **kwargs):
"""
Connect to board.
:param ip: Board IP
:param port: Port to connect to
:param kwargs: Additional parameters if required
"""
# Check if IP is valid, and if a hostname is provided, check whether it
# exists and get IP address
import socket
try:
socket.inet_aton(ip)
except socket.error:
try:
ip = socket.gethostbyname(ip)
except socket.gaierror:
raise BoardError(
"Provided IP address (%s) is invalid or does not exist"
)
self._ip = ip
self._port = port
# Check if we are requesting to bind UCP socket to specific IP, needed for broadcast
self._src_ip = kwargs.get("src_ip", None)
# Connect to board
if not self._protocol.create_connection(ip, port, self._src_ip):
self.status[Device.Board] = Status.NetworkError
raise BoardError("Could not connect to board with ip {}".format(ip))
else:
self._logger.info(self.log("Connected to board {}".format(ip)))
self.status[Device.Board] = Status.OK
self._connected = True
[docs]
def disconnect(self):
"""Disconnect from board"""
# Check if board is connected
if not self._connected:
self._logger.warn(
self.log("Call disconnect on board which was not connected")
)
if self._protocol.close_connection():
# Clear memory map and spi list
if self.memory_map is not None:
self.memory_map.clear()
if self._spi_devices is not None:
self._spi_devices.clear()
# Unload all plugins
self.unload_all_plugins()
# Clear programmed dictionary
for device in list(self._programmed.keys()):
self._programmed[device] = False
self._connected = False
self._logger.info(
self.log("Disconnected from board with IP {}".format(self._ip))
)
[docs]
def reset(self, device):
"""
Reset device on board.
:param device: Device on board to reset
"""
pass
[docs]
def get_status(self):
"""
Get board status.
:return: Status
"""
return self.status
[docs]
def get_firmware_list(self, device=Device.Board):
"""
Get list of firmware on board.
:param device: Device on board to get list of firmware
:return: List of firmware
"""
# Check if board is connected
if not self._connected:
raise LibraryError(
"Call get_firmware_list for board which is not connected"
)
# Call getFirmware on board
self._firmware_list = self._protocol.list_firmware()
return self._firmware_list
[docs]
def load_firmware(
self, device, register_string=None, load_values=False, base_address=0
):
"""
Blocking call to load firmware.
:param base_address: base address at which to load firmware
:param device: Device on board to load firmware to
:param register_string: String containing register information
:param load_values: Load register values
"""
# Check if connected
if not self._connected:
raise LibraryError("Not connected to board, cannot load firmware")
# Superclass method required filepath to be not null
if register_string is None:
raise LibraryError("Default load_memory_map requires a register_string")
# Check if device argument is of type Device
if not type(device) is Device:
raise LibraryError(
"Device argument for load_firmware should be of type Device"
)
# All OK, call function
self.status[device] = Status.LoadingFirmware
try:
self.memory_map.update_memory_map(register_string, device, base_address)
self._programmed[device] = True
self.status[device] = Status.OK
self.get_register_list(load_values=load_values, reset=True)
self._logger.debug(self.log("Successfully loaded memory map"))
except Exception as e:
self._programmed[device] = False
self.status[device] = Status.LoadingFirmwareError
raise BoardError("load_firmware failed on board: {}".format(e))
[docs]
def download_firmware(self, device, bitfile):
"""
Download firmware onto the FPGA (or FLASH).
:param device: Device to download firmware to
"""
raise LibraryError("Download firmware not implemented")
[docs]
def get_register_list(self, reset=False, load_values=False):
"""
Get list of registers.
:param reset: Force reload register list
:param load_values: Load register values
"""
# Check if register list has already been acquired, and if so return it
if self.memory_map is not None and not reset:
return self.memory_map.register_list
# Check if device is programmed
if not self._programmed[Device.Board]:
raise LibraryError(
"Cannot get_register_list from board which has not been programmed"
)
# If we need to load values, do so
if load_values:
for r in list(self.memory_map.register_list.values()):
r.value = self.read_register(r.name, r.size, r.offset)
# All done, return
return self.memory_map.register_list
[docs]
def read_register(self, register, n=1, offset=0):
""" "
Get register value.
:param register: Register name
:param n: Number of words to read
:param offset: Memory address offset to read from
:return: Values
"""
try:
# Get register address, check offset and read from board
reg_info = self.memory_map[register]
if offset + n * 4 > reg_info.address + reg_info.size * 4:
raise Exception(
"Invalid offset secified for register {}".format(register)
)
values = self._protocol.read_register(reg_info.address, n, offset)
# Apply bitmask and shift to values
for i in range(len(values)):
values[i] = (values[i] & reg_info.bitmask) >> reg_info.shift
if len(values) == 1:
return values[0]
else:
return values
except Exception as e:
raise BoardError(
"Failed to read_register {} from board: {}".format(register, e)
)
[docs]
def write_register(self, register, values, offset=0, retry=True):
"""
Set register value.
:param register: Register name
:param values: Values to write
:param offset: Memory address offset to write to
"""
try:
# Get register address and check offset
reg_info = self.memory_map[register]
if type(values) is not list:
values = [values]
if offset + len(values) * 4 > reg_info.address + reg_info.size * 4:
raise Exception(
"Invalid offset secified for register {}".format(register)
)
# Apply shift and bitmask to values
for i in range(len(values)):
values[i] = (values[i] << reg_info.shift) & reg_info.bitmask
# Check if we have to apply bitmask
if reg_info.bitmask != 0xFFFFFFFF:
read_values = self._protocol.read_register(
reg_info.address, len(values), offset
)
for i in range(len(read_values)):
values[i] = (
read_values[i] & (reg_info.bitmask ^ 0xFFFFFFFF)
) | values[i]
# Writes values to register
self._protocol.write_register(reg_info.address, values, offset, retry)
except Exception as e:
raise BoardError(
"Failed to write_register {} on board: {}".format(register, e)
)
[docs]
def read_address(self, address, n=1):
""" "
Get register value.
:param address: Memory address to read from
:param n: Number of words to read
:return: Values
"""
# Call function and return
try:
values = self._protocol.read_register(address, n, 0)
if len(values) == 1:
return values[0]
else:
return values
except Exception as e:
raise BoardError(
"Failed to read_address {} on board: {}".format(hex(address), e)
)
[docs]
def write_address(self, address, values, retry=True):
"""
Set register value.
:param address: Memory address to write to
:param values: Values to write
"""
# Call function and return
try:
if type(values) is not list:
values = [values]
self._protocol.write_register(address, values, 0, retry)
except Exception as e:
import traceback
traceback.print_exc()
raise BoardError(
"Failed to write_address {} on board: {}".format(hex(address), e)
)
[docs]
def read_device(self, device, address, unlock=True):
"""
Get device value.
:param device: SPI Device to read from
:param address: Address on device to read from
:return: Value
"""
# Check if device is in device list
if not self._spi_devices.has_device(device):
raise LibraryError("Device not found in SPI Device list")
spi_device = self._spi_devices[device]
self._spi_lock.lock()
# Wait for SPI switch to be ready
while True:
ret = self.read_address(self._spi_devices.cmd_address)
if ret & self._spi_devices.cmd_start_mask == 0:
break
self._spi_lock.refresh()
sleep(0.05)
# Issue request as an array of values
# Address, 0, 0, spi_en, spi_sclk, read operation
request = [
address,
0,
0,
1 << spi_device.spi_en,
1 << spi_device.spi_sclk,
0x03,
]
self.write_address(self._spi_devices.spi_address, request)
# Wait for request to be completed on board
while True:
ret = self.read_address(self._spi_devices.cmd_address)
if ret & self._spi_devices.cmd_start_mask == 0:
break
self._spi_lock.refresh()
sleep(0.05)
# Request ready on device, grab data
result = self.read_address(self._spi_devices.read_data) & 0xFF
if unlock:
self._spi_lock.unlock()
return result
[docs]
def write_device(self, device, address, value, unlock=False):
"""
Set device value.
:param device: SPI device to write to
:param address: Address on device to write to
:param value: Value to write
"""
# Check if device is in device list
if not self._spi_devices.has_device(device):
raise LibraryError("Device not found in SPI Device list")
spi_device = self._spi_devices[device]
self._spi_lock.lock()
# Wait for SPI switch to be ready
while True:
self._spi_lock.refresh()
ret = self.read_address(self._spi_devices.cmd_address)
if ret & self._spi_devices.cmd_start_mask == 0:
break
sleep(0.05)
# Issue request as an array of values
# Address, value to write, 0, spi_en, spi_sclk, write operation
self._spi_lock.refresh()
request = [
address,
(value & 0xFF) << 8,
0,
1 << spi_device.spi_en,
1 << spi_device.spi_sclk,
0x01,
]
self.write_address(self._spi_devices.spi_address, request)
# Wait for request to be completed on board
while True:
self._spi_lock.refresh()
ret = self.read_address(self._spi_devices.cmd_address)
if ret & self._spi_devices.cmd_start_mask == 0:
break
sleep(0.05)
# All done, return True
if unlock:
self._spi_lock.unlock()
return True
[docs]
def load_spi_devices(self, xml_string):
"""
Load SPI devices.
:param xml_string: XML string containing SPI device information
"""
# Check if memory map includes SPI information
if not self.memory_map.has_register("board.spi"):
raise LibraryError("No SPI information present in memory map")
# Parse XML string
self._spi_devices = SPI(xml_string)
# Populate general SPI properties
# fmt: off
self._spi_devices.spi_address = self.memory_map["board.spi.address"].address
self._spi_devices.spi_address_mask = self.memory_map["board.spi.address"].bitmask
self._spi_devices.write_data = self.memory_map["board.spi.write_data"].address
self._spi_devices.write_data_mask = self.memory_map["board.spi.write_data"].bitmask
self._spi_devices.read_data = self.memory_map["board.spi.read_data"].address
self._spi_devices.read_data_mask = self.memory_map["board.spi.read_data"].bitmask
self._spi_devices.chip_select = self.memory_map["board.spi.chip_select"].address
self._spi_devices.chip_select_mask = self.memory_map["board.spi.chip_select"].bitmask
self._spi_devices.sclk = self.memory_map["board.spi.sclk"].address
self._spi_devices.sclk_mask = self.memory_map["board.spi.sclk"].bitmask
self._spi_devices.cmd_address = self.memory_map["board.spi.cmd"].address
self._spi_devices.cmd_start_mask = self.memory_map["board.spi.cmd.start"].bitmask
self._spi_devices.cmd_rnw_mask = self.memory_map["board.spi.cmd.rnw"].bitmask
# fmt: on
[docs]
def is_programmed(self):
"""Returns True if Board is programmed"""
return any(self._programmed.values())
[docs]
def list_register_names(self):
"""Print list of register names"""
if not self._programmed[Device.Board]:
return
# Run checks
if not self._checks():
return
# Split register list into devices
registers = {}
for k, v in self.memory_map.items():
if v["device"] not in list(registers.keys()):
registers[v["device"]] = []
registers[v["device"]].append(k)
# Loop over all devices
for k, v in registers.items():
print(DeviceNames[k])
print("-" * len(DeviceNames[k]))
for regname in sorted(v):
print("\t" + str(regname))
[docs]
def list_device_names(self):
"""Print list of SPI device names"""
# Run check
if not self._checks():
return
# Loop over all SPI devices
print("List of SPI Devices")
print("-------------------")
for k in list(self._spi_devices.spi_map.keys()):
print(k)
[docs]
def has_register(self, register):
"""
Check if board has specified register.
:param register: Register name
:return: True if register exists in memory map
"""
return self.memory_map.has_register(register)
[docs]
def find_register(self, string, display=False, info=False):
"""
Return register information for provided search string.
:param string: Regular expression to search against
:param display: True to output result to console
:return: List of found registers
"""
# Run checks
if not self._checks():
return
# Go through all registers and store the name of registers
# which generate a match
matches = []
for k, v in self.memory_map.register_list.items():
if re.search(string, k) is not None:
matches.append(v)
# Display to screen if required
if display:
string = "\n"
if info:
for v in sorted(matches, key=lambda l: l.name):
string += "%s:\n%s\n" % (v.name, "-" * len(v.name))
string += "Address:\t\t%s\n" % (hex(v.address))
string += "Type:\t\t\t%s\n" % str(v.type)
string += "Device:\t\t\t%s\n" % str(v.device)
string += "Permission:\t\t%s\n" % str(v.permission)
string += "Bitmask:\t\t0x%X\n" % v.bitmask
string += "Bits:\t\t\t%d\n" % v.bits
string += "Size:\t\t\t%d\n" % v.size
string += "Description:\t%s\n\n" % v.desc
else:
for v in sorted(matches, key=lambda l: l.name):
string += "%s\n" % v.name
print(string)
if not display:
return matches
[docs]
def find_device(self, string, display=False):
"""
Return SPI device information for provided search string.
:param string: Regular expression to search against
:param display: True to output result to console
:return: List of found devices
"""
# Run check
if not self._checks():
return
# Loop over all devices
matches = []
for k, v in self._spi_devices.spi_map:
if re.match(string, k) is not None:
matches.append(v)
# Display to screen if required
if display:
string = "\n"
for v in sorted(matches, key=lambda l: l["name"]):
string += "Name: %s, spi_sclk: %d, spi_en: %d\n" % (
v["name"],
v["spi_sclk"],
v["spi_en"],
)
print(string)
if not display:
return matches
def __len__(self):
"""Override __len__, return number of registers"""
return len(self.memory_map)
[docs]
def is_connected(self):
"""Check if board is connected"""
return self._connected
def _checks(self, device=Device.Board):
"""Check prior to function calls"""
# Check if board is connected
# Check if connected
if not self._connected:
raise LibraryError("Cannot perform operation on unconnected board")
# Check if device is programmed
if device is not None and not self._programmed[device]:
raise LibraryError(
"Cannot get memory map from board which has not been programmed"
)
# Check if register list has been populated
if self.memory_map is None:
self.get_register_list()
return True
[docs]
def log(self, string):
"""
Format string for logging output.
:param string: String to log
:return: Formatted string
"""
return "%s (%s)" % (string, self._string_id)