Source code for ska_low_cbf_fpga.xrt_info

# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
import json
import typing

import pyxrt

from ska_low_cbf_fpga import FpgaHardwareInfo, IclField


class XrtInfo(FpgaHardwareInfo):
    """
    Hardware info monitoring via pyxrt xrt_info_device.

    Access via item index, e.g. ``my_xrt_info["thermal"]``.
    Some flattening of data structures is performed (see ``__getitem__``).
    """

    _INFO_PARAMS = {
        "bdf": str,
        "dynamic_regions": "json",
        "electrical": "json",
        "host": "json",
        "interface_uuid": str,
        "kdma": bool,
        "m2m": bool,
        "max_clock_frequency_mhz": int,
        "mechanical": "json",
        "memory": "json",
        "name": str,
        "nodma": bool,
        "offline": bool,
        "pcie_info": "json",
        "platform": "json",
        "thermal": "json",
    }
    """Mapping from known hardware info item keys to their data types."""

    def __init__(self, device: typing.Union[str, pyxrt.device]):
        """
        Attach to an XRT device.

        :param device: either an existing ``pyxrt.device``, or a str that is
            passed to the ``pyxrt.device`` constructor
        :raises TypeError: if ``device`` is neither a str nor a
            ``pyxrt.device``
        """
        if isinstance(device, str):
            self._device = pyxrt.device(device)
        elif isinstance(device, pyxrt.device):
            self._device = device
        else:
            raise TypeError(
                "device must be str or pyxrt.device. "
                f"{type(device)} not supported."
            )

    def _thermal_reading(self, location_id: str) -> int:
        """
        Find the temperature reported for one thermal sensor location.

        :param location_id: value to match against each reading's
            ``location_id`` entry, e.g. "fpga0"
        :returns: ``temp_C`` of the first matching reading that is marked
            present, as int; 0 if there is no such reading
        """
        sensors = self["thermal"]
        if isinstance(sensors, dict):
            # __getitem__ collapses a one-element list into its sole element,
            # so a lone sensor arrives as a bare dict. Re-wrap it, otherwise
            # the loop below would iterate over the dict's keys.
            sensors = [sensors]
        for sensor in sensors:
            if (
                sensor.get("location_id") == location_id
                and sensor.get("is_present") == "true"
            ):
                return int(sensor["temp_C"])
        return 0

    def _power_rail_reading(
        self, rail_id: str, quantity: str, value_key: str
    ) -> float:
        """
        Find one measurement of one power rail in the electrical readings.

        :param rail_id: value to match against each rail's ``id`` entry,
            e.g. "12v_aux"
        :param quantity: which measurement of the rail to read
            ("voltage" or "current")
        :param value_key: key holding the number inside that measurement
            ("volts" or "amps")
        :returns: the first matching measurement that is marked present, as
            float; 0.0 if there is no such measurement
        """
        for rail in self["electrical"]["power_rails"]:
            if (
                rail["id"] == rail_id
                and rail[quantity]["is_present"] == "true"
            ):
                return float(rail[quantity][value_key])
        return 0.0

    @property
    def xclbin_uuid(self) -> str:
        """Get the UUID of the active xclbin firmware."""
        return self._device.get_xclbin_uuid().to_string()

    @property
    def fpga_temperature(self) -> IclField[int]:
        """
        Get FPGA temperature in degrees Celsius.

        The value is 0 if no present "fpga0" reading is found.
        """
        # other locations are: pcb_top_front/rear etc
        return IclField(
            description="FPGA temperature in °C",
            format="%d",
            type_=int,
            value=self._thermal_reading("fpga0"),
            user_write=False,
        )

    @property
    def fpga_power(self) -> IclField[float]:
        """
        Get FPGA power consumption in Watts.

        The value is 0.0 if the electrical readings have no
        "power_consumption_watts" entry.
        """
        power_key = "power_consumption_watts"
        readings = self["electrical"]
        power = float(readings[power_key]) if power_key in readings else 0.0
        return IclField(
            description="FPGA power consumption in W",
            format="%f",
            type_=float,
            value=power,
            user_write=False,
        )

    @property
    def hbm_temperature(self) -> IclField[int]:
        """
        Get HBM temperature in degrees Celsius.

        The value is 0 if no present "fpga_hbm" reading is found.
        """
        return IclField(
            description="HBM temperature in °C",
            format="%d",
            type_=int,
            value=self._thermal_reading("fpga_hbm"),
            user_write=False,
        )

    @property
    def mac_addresses(self) -> IclField[str]:
        """
        Get Ethernet MAC addresses.

        :returns: Array of str, formatted like "00:01:02:03:04:05".
        """
        addresses = [mac["address"] for mac in self["platform"]["macs"]]
        return IclField(
            description="Ethernet MAC addresses",
            format="%s",
            type_=str,
            value=addresses,
            length=len(addresses),
            user_write=False,
        )

    @property
    def power_supply_12v_voltage(self) -> IclField[float]:
        """
        Get power rail 12 volt reading.

        The value is 0.0 if no present "12v_aux" voltage is found.
        """
        return IclField(
            description="AUX 12 voltage in volts",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_aux", "voltage", "volts"),
            user_write=False,
        )

    @property
    def power_supply_12v_current(self) -> IclField[float]:
        """
        Get 12V power rail's current.

        The value is 0.0 if no present "12v_aux" current is found.
        """
        return IclField(
            description="AUX 12V current in Amperes",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_aux", "current", "amps"),
            user_write=False,
        )

    @property
    def pcie_12v_voltage(self) -> IclField[float]:
        """
        Get PCIe 12V power rail voltage reading.

        The value is 0.0 if no present "12v_pex" voltage is found.
        """
        return IclField(
            description="PCIe 12V power rail in volts",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_pex", "voltage", "volts"),
            user_write=False,
        )

    @property
    def pcie_12v_current(self) -> IclField[float]:
        """
        Get PCIe 12V power rail's current.

        The value is 0.0 if no present "12v_pex" current is found.
        """
        return IclField(
            description="PCIe 12V current in Amperes",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_pex", "current", "amps"),
            user_write=False,
        )

    def __getitem__(self, item: str):
        """Access info parameters via item index syntax.

        Values are fetched from the device on every access and converted
        according to ``_INFO_PARAMS``. For "json" items, a dict with a single
        top-level key is replaced by that key's value, and a list with a
        single element is then replaced by that element.

        :param item: probably one of the values defined in ``_INFO_PARAMS``
        :raises KeyError: if ``item`` is not a key of ``_INFO_PARAMS``
        :raises ValueError: if the data type configured for ``item`` is
            neither "json" nor a Python type
        """
        # TODO there's scope to cache values retrieved by XRT driver; e.g. if
        #  we read 12V current in a quick succession after reading 12V
        #  voltage we could be using a previously cached
        #  pyxrt.xrt_info_device.electrical readings instead of fetching the
        #  readings from driver again
        if item not in self._INFO_PARAMS:
            raise KeyError(f"{item} is not an available health parameter")

        raw = self._device.get_info(getattr(pyxrt.xrt_info_device, item))
        type_ = self._INFO_PARAMS[item]

        if type_ == "json":
            parsed = json.loads(raw)
            # many of these are dicts with one top-level key
            # (the isinstance guard keeps non-object JSON from crashing here)
            if isinstance(parsed, dict) and len(parsed) == 1:
                parsed = next(iter(parsed.values()))
            # and then often a list with one element
            if isinstance(parsed, list) and len(parsed) == 1:
                parsed = parsed[0]
            return parsed

        if isinstance(type_, type):
            if type_ is bool:
                # We need str '0' to evaluate as False
                return bool(int(raw))
            return type_(raw)

        raise ValueError(f"Unsupported type {type_}")

    def __contains__(self, item):
        """Check whether ``item`` is a key of ``_INFO_PARAMS``."""
        return item in self._INFO_PARAMS

    def __iter__(self):
        """Iterate over the keys of ``_INFO_PARAMS``."""
        return iter(self._INFO_PARAMS)