# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
import json
import typing
import pyxrt
from ska_low_cbf_fpga import FpgaHardwareInfo, IclField
class XrtInfo(FpgaHardwareInfo):
    """
    Hardware info monitoring via pyxrt xrt_info_device.

    Access via item index, e.g. ``my_xrt_info["thermal"]``.
    Some flattening of data structures is performed: a JSON object with a
    single top-level key is replaced by that key's value, and a list with a
    single element is replaced by that element (see :py:meth:`__getitem__`).
    """

    _INFO_PARAMS = {
        "bdf": str,
        "dynamic_regions": "json",
        "electrical": "json",
        "host": "json",
        "interface_uuid": str,
        "kdma": bool,
        "m2m": bool,
        "max_clock_frequency_mhz": int,
        "mechanical": "json",
        "memory": "json",
        "name": str,
        "nodma": bool,
        "offline": bool,
        "pcie_info": "json",
        "platform": "json",
        "thermal": "json",
    }
    """Mapping from known hardware info item keys to their data types."""

    def __init__(self, device: typing.Union[str, pyxrt.device]):
        """
        Create a hardware info reader for one XRT device.

        :param device: either an existing ``pyxrt.device`` object, or a str
            which is passed to the ``pyxrt.device`` constructor.
        :raises TypeError: if ``device`` is neither of the supported types.
        """
        if isinstance(device, str):
            self._device = pyxrt.device(device)
        elif isinstance(device, pyxrt.device):
            self._device = device
        else:
            raise TypeError(
                "device must be str or pyxrt.device. "
                f"{type(device)} not supported."
            )

    @staticmethod
    def _is_present(flag) -> bool:
        """
        Interpret an ``is_present`` marker taken from a parsed report.

        :param flag: the raw marker value.
        :returns: True for the string ``"true"`` (the form this class has
            always matched) and also for a JSON boolean ``true``.
        """
        return flag is True or flag == "true"

    def _thermal_reading(self, location_id: str) -> int:
        """
        Look up one temperature sensor in the "thermal" report.

        :param location_id: value of the sensor's ``location_id`` entry.
        :returns: the sensor's ``temp_C`` as an int, or 0 if no sensor with
            that location is marked present.
        """
        sensors = self["thermal"]
        if isinstance(sensors, dict):
            # __getitem__ unwraps one-element lists, so a report holding a
            # single sensor arrives here as a bare dict; iterating it would
            # walk its keys instead of the sensor entries.
            sensors = [sensors]
        for sensor in sensors:
            if sensor["location_id"] == location_id and self._is_present(
                sensor["is_present"]
            ):
                return int(sensor["temp_C"])
        return 0

    def _power_rail_reading(
        self, rail_id: str, quantity: str, value_key: str
    ) -> float:
        """
        Look up one measurement of one power rail in the "electrical" report.

        :param rail_id: value of the rail's ``id`` entry, e.g. ``"12v_aux"``.
        :param quantity: measurement sub-entry, ``"voltage"`` or ``"current"``.
        :param value_key: key holding the number inside that sub-entry,
            ``"volts"`` or ``"amps"``.
        :returns: the measurement as a float, or 0.0 if no rail with that id
            has the measurement marked present.
        """
        for rail in self["electrical"]["power_rails"]:
            if rail["id"] != rail_id:
                continue
            measurement = rail[quantity]
            if self._is_present(measurement["is_present"]):
                return float(measurement[value_key])
        return 0.0

    @property
    def xclbin_uuid(self) -> str:
        """Get the UUID of the active xclbin firmware."""
        return self._device.get_xclbin_uuid().to_string()

    @property
    def fpga_temperature(self) -> IclField[int]:
        """
        Get FPGA temperature in degrees Celsius.

        :returns: read-only field; value is 0 if the sensor is not present.
        """
        # other locations are: pcb_top_front/rear etc
        return IclField(
            description="FPGA temperature in °C",
            format="%d",
            type_=int,
            value=self._thermal_reading("fpga0"),
            user_write=False,
        )

    @property
    def fpga_power(self) -> IclField[float]:
        """
        Get FPGA power consumption in Watts.

        :returns: read-only field; value is 0.0 if the "electrical" report
            has no power consumption entry.
        """
        power_key = "power_consumption_watts"
        readings = self["electrical"]
        power = float(readings[power_key]) if power_key in readings else 0.0
        return IclField(
            description="FPGA power consumption in W",
            format="%f",
            type_=float,
            value=power,
            user_write=False,
        )

    @property
    def hbm_temperature(self) -> IclField[int]:
        """
        Get HBM temperature in degrees Celsius.

        :returns: read-only field; value is 0 if the sensor is not present.
        """
        return IclField(
            description="HBM temperature in °C",
            format="%d",
            type_=int,
            value=self._thermal_reading("fpga_hbm"),
            user_write=False,
        )

    @property
    def mac_addresses(self) -> IclField[str]:
        """
        Get Ethernet MAC addresses.

        :returns: Array of str, formatted like "00:01:02:03:04:05".
        """
        addresses = [mac["address"] for mac in self["platform"]["macs"]]
        return IclField(
            description="Ethernet MAC addresses",
            format="%s",
            type_=str,
            value=addresses,
            length=len(addresses),
            user_write=False,
        )

    @property
    def power_supply_12v_voltage(self) -> IclField[float]:
        """
        Get power rail 12 volt reading.

        :returns: read-only field; value is 0.0 if the reading is not present.
        """
        return IclField(
            description="AUX 12 voltage in volts",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_aux", "voltage", "volts"),
            user_write=False,
        )

    @property
    def power_supply_12v_current(self) -> IclField[float]:
        """
        Get 12V power rail's current.

        :returns: read-only field; value is 0.0 if the reading is not present.
        """
        return IclField(
            description="AUX 12V current in Amperes",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_aux", "current", "amps"),
            user_write=False,
        )

    @property
    def pcie_12v_voltage(self) -> IclField[float]:
        """
        Get PCIe 12V power rail voltage reading.

        :returns: read-only field; value is 0.0 if the reading is not present.
        """
        return IclField(
            description="PCIe 12V power rail in volts",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_pex", "voltage", "volts"),
            user_write=False,
        )

    @property
    def pcie_12v_current(self) -> IclField[float]:
        """
        Get PCIe 12V power rail's current.

        :returns: read-only field; value is 0.0 if the reading is not present.
        """
        return IclField(
            description="PCIe 12V current in Amperes",
            format="%f",
            type_=float,
            value=self._power_rail_reading("12v_pex", "current", "amps"),
            user_write=False,
        )

    def __getitem__(self, item: str):
        """Access info parameters via item index syntax.

        :param item: probably one of the values defined in ``_INFO_PARAMS``
        :returns: the value converted to the type listed in ``_INFO_PARAMS``;
            for "json" items, the parsed (and possibly flattened) structure.
        :raises KeyError: if ``item`` is not listed in ``_INFO_PARAMS``.
        :raises ValueError: if the type listed for ``item`` is not supported.
        """
        # TODO there's scope to cache values retrieved by XRT driver; e.g. if
        #  we read 12V current in a quick succession after reading 12V
        #  voltage we could be using a previously cached
        #  pyxrt.xrt_info_device.electrical readings instead of fetching the
        #  readings from driver again
        if item not in self._INFO_PARAMS:
            raise KeyError(f"{item} is not an available health parameter")

        raw = self._device.get_info(getattr(pyxrt.xrt_info_device, item))
        type_ = self._INFO_PARAMS[item]

        if type_ == "json":
            parsed = json.loads(raw)
            # many of these are dicts with one top-level key
            # (guard the type: a top-level JSON array or scalar has no keys)
            if isinstance(parsed, dict) and len(parsed) == 1:
                parsed = parsed[next(iter(parsed))]
            # and then often a list with one element
            if isinstance(parsed, list) and len(parsed) == 1:
                parsed = parsed[0]
            return parsed

        if isinstance(type_, type):
            if type_ is bool:
                # We need str '0' to evaluate as False
                return bool(int(raw))
            return type_(raw)

        raise ValueError(f"Unsupported type {self._INFO_PARAMS[item]}")

    def __contains__(self, item):
        """Check whether ``item`` is a known hardware info key."""
        return item in self._INFO_PARAMS

    def __iter__(self):
        """Iterate over the known hardware info keys."""
        return iter(self._INFO_PARAMS)