Source code for ska_low_sps_tpm_api.plugins.sysmon

import time

from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock

__author__ = "Gabriele Sorrenti"

ad_16_fpga = [
    {
        "name": "VM_FE0",
        "ref": 3.5000,
        "fpga": 0,
        "ch": 0,
        "divider": 11.5 / (31.6 + 11.5),
    },
    {
        "name": "VM_FE1",
        "ref": 3.5000,
        "fpga": 1,
        "ch": 9,
        "divider": 11.5 / (31.6 + 11.5),
    },
    {
        "name": "FE0_mVA",
        "ref": 2.5,
        "fpga": 0,
        "ch": 2,
        "divider": 0.01 * 3920 / 100,
        "unit": "A",
    },  # divider = R201*G_op_amp, G_op_amp=(R172/R173)=(3920/100) before was 0.400 ACS70311 400 mV/A TYPE
    {
        "name": "FE1_mVA",
        "ref": 2.5,
        "fpga": 1,
        "ch": 8,
        "divider": 0.01 * 3920 / 100,
        "unit": "A",
    },  # divider = R201*G_op_amp, G_op_amp=(R172/R173)=(3920/100) before was 0.400 ACS70311 400 mV/A TYPE
    {"name": "VM_DDR0_VTT", "ref": 0.6000, "fpga": 0, "ch": 3},
    {"name": "VM_AGP0", "ref": 0.9160, "fpga": 0, "ch": 1},
    {"name": "VM_AGP1", "ref": 0.9160, "fpga": 0, "ch": 4},
    {"name": "VM_AGP2", "ref": 0.9160, "fpga": 0, "ch": 7},
    {"name": "VM_AGP3", "ref": 0.9160, "fpga": 0, "ch": 10},
    {"name": "VM_AGP4", "ref": 0.9160, "fpga": 1, "ch": 2},
    {"name": "VM_AGP5", "ref": 1, "fpga": 1, "ch": 0},
    {"name": "VM_AGP6", "ref": 0.9160, "fpga": 1, "ch": 3},
    {"name": "VM_AGP7", "ref": 0.9160, "fpga": 1, "ch": 4},
    {
        "name": "VM_CLK0B",
        "ref": 3.3000,
        "fpga": 0,
        "ch": 5,
        "divider": 11.5 / (31.6 + 11.5),
    },
    {
        "name": "VM_CLK1B",
        "ref": 3.3000,
        "fpga": 1,
        "ch": 1,
        "divider": 11.5 / (31.6 + 11.5),
    },
    {
        "name": "VM_MGT0_AUX",
        "ref": 1.8000,
        "fpga": 0,
        "ch": 6,
        "divider": 11.5 / (20 + 11.5),
    },
    {
        "name": "VM_MGT1_AUX",
        "ref": 1.8000,
        "fpga": 1,
        "ch": 5,
        "divider": 11.5 / (20 + 11.5),
    },
    {
        "name": "VM_ADC0",
        "ref": 3.3000,
        "fpga": 0,
        "ch": 8,
    },
    {
        "name": "VM_ADC1",
        "ref": 3.3000,
        "fpga": 0,
        "ch": 9,
    },
    {
        "name": "VM_PLL",
        "ref": 3.3000,
        "fpga": 0,
        "ch": 11,
        "divider": 11.5 / (31.6 + 11.5),
    },
    {"name": "VM_DDR1_VTT", "ref": 0.6000, "fpga": 1, "ch": 6},
    {
        "name": "VM_DDR1_VDD",
        "ref": 1.2000,
        "fpga": 1,
        "ch": 11,
        "divider": 11 / (4.99 + 11),
    },
    {"name": "VM_DVDD", "ref": 1.1000, "fpga": 1, "ch": 7, "divider": 11 / (4.99 + 11)},
    {
        "name": "VM_SW_AMP",
        "ref": 3.5000,
        "fpga": 1,
        "ch": 10,
        "divider": 4.53 / (20 + 4.53),
    },
]


[docs] def get_value_index(name, device): index = -1 if device == "fpga1": fpga_ind = 0 else: fpga_ind = 1 for i in range(0, len(ad_16_fpga)): if (name == ad_16_fpga[i]["name"]) and (fpga_ind == ad_16_fpga[i]["fpga"]): index = i break return index
[docs] class TpmSysmon(FirmwareBlock): """TpmSysmon plugin"""
[docs] @compatibleboards(BoardMake.TpmBoard) @friendlyname("tpm_sysmon") @maxinstances(2) def __init__(self, board, logger=None, **kwargs): """ TpmSysmon initialiser. :param board: Pointer to board instance """ super(TpmSysmon, self).__init__(board, logger=logger) self._board_type = kwargs.get("board_type", "XTPM") if "device" not in list(kwargs.keys()): raise PluginError("TpmFpga: Require a node instance") self._device = kwargs["device"] if self._device == Device.FPGA_1: self._device = "fpga1" elif self._device == Device.FPGA_2: self._device = "fpga2" else: raise PluginError("TpmFpga: Invalid device %s" % self._device)
[docs] def initialize(self): """Reset System Monitor""" self.logger.warning( f"{self._device} initialize sysmon, enable average for all channels" ) self.board["%s.sys_mon.sysmonrr" % self._device] = 0x1 self.board["%s.sys_mon.sysmonrr" % self._device] = 0x0 self.board["%s.sys_mon.conf_reg_0" % self._device] = ( 1 << 12 ) # enable 16 averages (0=0, 1=16, 2=64, 3=256)) self.board["%s.sys_mon.seq_reg2" % self._device] = 0xFFFF # enable CH self.board["%s.sys_mon.seq_reg3" % self._device] = 0xFFFF # enable CH
[docs] def get_fpga_temperature(self): """Read FPGA temperature""" rdval = self.board["%s.sys_mon.temp" % self._device] temp = ((rdval * 501.3743) / 65536) - 273.67777 return temp
[docs] def read_adx(self, ad_idx): """Read specified AD to get VAUX""" if int(ad_idx) > 15 or int(ad_idx) < 0: self.logger.error( "%s sysmon, invalid AD index %s" % (self._device, str(ad_idx)) ) return -1 else: # reg='%s.sys_mon.vaux_%s' % (self._device, str(ad_idx)) rdval = self.board["%s.sys_mon.vaux_%s" % (self._device, str(ad_idx))] # print(f"read_adx {reg} {self.board.find_register(reg)} {rdval}") voltage = float(rdval) * 1 / 65536 return voltage
[docs] def read_conf_regs(self): """Read consifguration registers""" rdval0 = self.board["%s.sys_mon.conf_reg_0" % self._device] rdval1 = self.board["%s.sys_mon.conf_reg_1" % self._device] rdval2 = self.board["%s.sys_mon.conf_reg_2" % self._device] rdval3 = self.board["%s.sys_mon.conf_reg_3" % self._device] return rdval0, rdval1, rdval2, rdval3
[docs] def get_vcc_int(self): """Read internal VCC""" val = self.board["%s.sys_mon.vccint" % self._device] val = float(val) / 65536 * 3 return val
[docs] def get_vcc_aux(self): """Read internal VAUX""" val = self.board["%s.sys_mon.vccaux" % self._device] val = float(val) / 65536 * 3 return val
[docs] def get_vcc_fe(self): if self._device == "fpga1": vcc = self.read_adx(ad_16_fpga[0]["ch"]) if "divider" in ad_16_fpga[0]: vcc = vcc / ad_16_fpga[0]["divider"] else: vcc = self.read_adx(ad_16_fpga[1]["ch"]) if "divider" in ad_16_fpga[1]: vcc = vcc / ad_16_fpga[1]["divider"] return vcc
[docs] def get_vm_ddr0_vtt(self): vcc = self.read_adx(ad_16_fpga[0]["ch"]) if "divider" in ad_16_fpga[0]: vcc = vcc / ad_16_fpga[0]["divider"] return vcc
[docs] def get_available_measure(self): measures = [] if self._device == "fpga1": fpga_ind = 0 else: fpga_ind = 1 for i in range(0, len(ad_16_fpga)): if fpga_ind == ad_16_fpga[i]["fpga"]: measures.append(ad_16_fpga[i]["name"]) print(measures)
[docs] def get_available_voltages(self): voltages = [] if self._device == "fpga1": fpga_ind = 0 else: fpga_ind = 1 for i in range(0, len(ad_16_fpga)): if fpga_ind == ad_16_fpga[i]["fpga"] and ad_16_fpga[i].get("unit") != "A": voltages.append(ad_16_fpga[i]["name"]) return voltages
[docs] def get_available_currents(self): currents = [] if self._device == "fpga1": fpga_ind = 0 else: fpga_ind = 1 for i in range(0, len(ad_16_fpga)): if fpga_ind == ad_16_fpga[i]["fpga"] and ad_16_fpga[i].get("unit") == "A": currents.append(ad_16_fpga[i]["name"]) return currents
[docs] def get_measure_val(self, name): index = get_value_index(name, self._device) if index == -1: self.logger.error("ERROR: measure not available check provided name") return -1 else: val = self.read_adx(ad_16_fpga[index]["ch"]) if "divider" in ad_16_fpga[index]: val = val / ad_16_fpga[index]["divider"] return val
[docs] def get_voltage(self, voltage=None): if voltage is None: voltages = self.get_available_voltages() else: if voltage not in self.get_available_voltages(): # raise PluginError(f"No voltage named '{voltage}' \n Options are {self.get_available_voltages()}") return {} voltages = [voltage] rt = {} for name in voltages: rt[name] = round(self.get_measure_val(name), 3) return rt
[docs] def get_current(self, current=None): if current is None: currents = self.get_available_currents() else: if current not in self.get_available_currents(): # raise PluginError(f"No current named '{current}' \n Options are {self.get_available_currents()}") return {} currents = [current] rt = {} for name in currents: rt[name] = round(self.get_measure_val(name), 3) return rt
##################### Superclass method implementations #################################
[docs] def initialise(self): """Initialise TpmSysmon""" self.logger.info("TpmSysmon has been initialised") return True
[docs] def status_check(self): """Perform status check. :return: Status """ self.logger.info("TpmSysmon : Checking status") return Status.OK
[docs] def clean_up(self): """Perform cleanup. :return: Success """ self.logger.info("TpmSysmon : Cleaning up") return True