# Source code for ska_low_cbf_fpga.log

# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""Logging configuration & translation functions"""
import argparse
import json
import logging
import sys
import typing
from collections import OrderedDict
from dataclasses import dataclass
from enum import Enum, auto

import numpy as np
from parse import parse

from ska_low_cbf_fpga.args_map import (
    AddressInfo,
    all_register_addresses,
    get_name_and_offset,
    load_fpgamap_from_file,
)

# Name given to the file handler that log_to_file() attaches; stop_log_to_file()
# matches handlers against this name to decide which ones to remove.
# (Despite the identifier, it is used as a *handler* name, not a logger name.)
LOGGER_NAME = "ska-low-cbf-fpga register log"


def log_to_file(
    logger: logging.Logger,
    filename: str,
    level: int = logging.DEBUG,
    hex_vals: bool = False,
) -> None:
    """
    Attach a file handler using our predefined log format to a logger.

    numpy is also reconfigured so that complete arrays are printed on a
    single line.

    Warning: numpy print options are a global setting!

    :param logger: Logger to configure
    :param filename: File name to write (append) to
    :param level: Logging level, defaults to DEBUG to capture driver
        read/write
    :param hex_vals: convert values to hexadecimal?
    """
    logger.setLevel(level)

    file_handler = logging.FileHandler(filename, encoding="utf-8")
    # the handler is named so that it can be found again when logging stops
    file_handler.set_name(LOGGER_NAME)
    file_handler.setLevel(level)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s|%(name)s|%(levelname)s|%(message)s")
    )
    logger.addHandler(file_handler)

    # never summarise arrays with "..." and never wrap them across lines
    np.set_printoptions(threshold=sys.maxsize, linewidth=sys.maxsize)
    if not hex_vals:
        return
    np.set_printoptions(formatter={"int": hex})
def stop_log_to_file(logger: logging.Logger) -> None:
    """
    Stop register logging.

    Every handler attached to ``logger`` whose name equals ``LOGGER_NAME`` is
    removed from the logger and closed. Other handlers are left untouched.

    :param logger: Logger to configure
    """
    # Iterate over a snapshot: removeHandler() mutates logger.handlers, and
    # removing from the live list while iterating it skips the next handler.
    for handler in list(logger.handlers):
        if handler.get_name() == LOGGER_NAME:
            logger.removeHandler(handler)
            # release the underlying log file rather than waiting for exit
            handler.close()
# Substrings looked for in the "source" field of a log line; only lines whose
# source contains one of these are treated as FPGA register accesses.
KNOWN_DRIVERS = [
    "ArgsXrt",
    "ArgsSimulator",
]
class ConversionFormat(Enum):
    """Output formats that a log file can be converted to."""

    # one "|"-separated text line per log entry
    HUMAN = auto()
    # "[name][offset]" header followed by one hexadecimal value per line
    REGISTER = auto()
    # "wr <address> <byte count>" command followed by one value per line
    TESTBENCH = auto()
@dataclass
class LogEntry:
    """Details of one log entry."""

    # timestamp text, exactly as it appears in the log line
    time: str
    # "Write" or "Read"
    mode: str
    # register name that the address was decoded to
    name: str
    # word offset of the first value within the named register
    offset: int
    # register values that were written or read
    values: typing.List[int]
    # word count text taken from a "Read" message; None for writes
    length: typing.Optional[str]
    # numeric address taken from the log message
    address: int
    # log level text, e.g. "DEBUG"
    level: str
    # name of the logger that emitted the line
    source: str
def _parse_logged_values(text: str, line: str) -> typing.List[int]:
    """Convert a bracketed, whitespace-delimited list of numbers to ints."""
    if not (text.startswith("[") and text.endswith("]")):
        raise ValueError(f"Register values are not enclosed in []: {line!r}")
    # 0 here means "guess": 0x -> hex, default to decimal
    return [int(word, 0) for word in text[1:-1].split()]


def _trim_to_word_range(
    values: typing.List[int], offset: int, min_word: int, max_word: int
) -> typing.Optional[typing.List[int]]:
    """Keep the values whose word offsets are in [min_word, max_word].

    ``values[i]`` is taken to sit at word ``offset + i``. Returns ``None`` when
    the access does not overlap the range at all.
    """
    # an access with no values is treated as sitting at ``offset``
    last_word = offset + max(len(values), 1) - 1
    if offset > max_word or last_word < min_word:
        return None
    first_index = max(min_word - offset, 0)
    return values[first_index : max_word + 1 - offset]


def convert_log_contents(
    log: typing.TextIO,
    register_lookup: typing.Dict[int, AddressInfo],
    conversion_format: ConversionFormat,
    file_out: typing.TextIO,
    include_reads: bool = False,
    include_others: bool = False,
    hex_vals: bool = False,
    filter_spec: typing.Union[dict, typing.TextIO, None] = None,
) -> None:
    """
    Convert the contents of a log file.

    The input is rewound to its start before reading, so the same open file
    can be converted several times.

    :param log: input log file
    :param register_lookup: lookup table
    :param conversion_format: choice of output format
    :param file_out: output file to write to
    :param include_reads: include FPGA reads in output?
    :param include_others: include non-FPGA register log lines in output?
    :param hex_vals: convert values to hex? (only relevant for HUMAN mode)
    :param filter_spec: filtering configuration as dict or JSON in file. e.g.

        .. code-block:: python

            {
                "select_words": {
                    # word address range to keep (inclusive)
                    "spead_sdp.spead_params.data": [0, 7167],
                    "spead_sdp_2.spead_params.data": [0, 7167],
                },
                "ignore": [
                    "spead_sdp.spead_ctrl.trigger_packet",
                    "spead_sdp_2.spead_ctrl.trigger_packet",
                ],
                "keep_only_last": True,
            }

    :raises ValueError: if a register read/write line cannot be parsed
    """
    if filter_spec is None:
        filter_spec = {}
    elif not isinstance(filter_spec, dict):
        filter_spec = json.load(filter_spec)

    select_words = filter_spec.get("select_words") or {}
    ignored_names = filter_spec.get("ignore") or ()
    only_last = filter_spec.get("keep_only_last", False)
    # most recent entry per register name, ordered by when it was last seen
    last_log = OrderedDict()

    log.seek(0)
    for line in log:
        # Only three splits: the message is the final field and may itself
        # contain "|". Lines with fewer fields (blank lines, continuation
        # lines of multi-line messages) are not register accesses.
        fields = line.split("|", maxsplit=3)
        is_register_access = False
        if len(fields) == 4:
            time, source, level, message = fields
            is_register_access = (
                level == "DEBUG"
                and any(driver in source for driver in KNOWN_DRIVERS)
                and message.startswith(("Write address", "Read address"))
            )
        if not is_register_access:
            if include_others:
                file_out.write(line)
            continue

        # we now have a log message that requires conversion
        message = message.strip()
        length = None
        if message.startswith("Write address"):
            mode = "Write"
            parsed = parse("Write address {}: {}", message)
        else:
            if not include_reads:
                continue
            mode = "Read"
            parsed = parse("Read address {}, length {}: {}", message)
        if parsed is None:
            raise ValueError(f"Unrecognised register log message: {line!r}")
        if mode == "Write":
            address, values = parsed
        else:
            address, length, values = parsed

        address = int(address, 16)  # address is always hex
        name, offset = get_name_and_offset(register_lookup, address)
        if name in ignored_names:
            continue

        # values will be one or more whitespace delimited numbers inside []
        values = _parse_logged_values(values, line)

        if name in select_words:
            min_word, max_word = select_words[name]
            # NOTE(review): when leading words are trimmed, offset and address
            # still describe the untrimmed access (unchanged behaviour) -
            # confirm whether they should be advanced as well.
            values = _trim_to_word_range(values, offset, min_word, max_word)
            if values is None:
                continue

        entry = LogEntry(
            time, mode, name, offset, values, length, address, level, source
        )
        if only_last:
            last_log[name] = entry
            last_log.move_to_end(name)  # in case it was already in there
        else:
            _write_log_entry(conversion_format, entry, file_out, hex_vals)

    for entry in last_log.values():
        _write_log_entry(conversion_format, entry, file_out, hex_vals)
def _write_log_entry(
    conversion_format: ConversionFormat,
    entry: LogEntry,
    file_out: typing.TextIO,
    hex_vals: bool,
):
    """
    Emit a single log entry to ``file_out`` in the chosen format.

    :param conversion_format: output format to produce
    :param entry: the log entry to write
    :param file_out: output file to write to
    :param hex_vals: show values as hexadecimal? (only used for HUMAN format)
    """
    words = entry.values

    if conversion_format == ConversionFormat.REGISTER:
        file_out.write(f"[{entry.name}][{entry.offset}]\n")
        file_out.writelines(f"0x{word:08x}\n" for word in words)
        return

    if conversion_format == ConversionFormat.TESTBENCH:
        # the second number is the value count multiplied by 4
        file_out.write(f"wr {entry.address :08x} {len(words) * 4:08x}\n")
        file_out.writelines(f"{word:08x}\n" for word in words)
        return

    if conversion_format != ConversionFormat.HUMAN:
        # any other format produces no output
        return

    pieces = [f"{entry.mode} {entry.name}"]
    if entry.offset:
        pieces.append(f"[{entry.offset}]")
    if entry.length is not None and int(entry.length) > 1:
        pieces.append(f", {entry.length} words")
    if hex_vals:
        shown = "[" + ", ".join(f"0x{word:08x}" for word in words) + "]"
    else:
        shown = words
    pieces.append(f": {shown}")
    print(
        entry.time,
        entry.source,
        entry.level,
        "".join(pieces),
        sep="|",
        file=file_out,
    )
def convert_log():
    """
    Command-Line Interface - convert a log file.

    Reads the command line, decodes register addresses using the given
    fpgamap file and writes one converted file for each output option that
    was supplied (human-readable, register values, testbench).
    """
    parser = argparse.ArgumentParser(
        description="ska-low-cbf-fpga Log File Conversion Utility"
    )
    parser.add_argument(
        "log",
        type=argparse.FileType("r", encoding="utf-8"),
        help="input log file",
    )
    parser.add_argument(
        "-m",
        "--fpgamap",
        type=argparse.FileType("r"),
        # the map is dereferenced unconditionally below, so report a missing
        # option as a usage error instead of failing with AttributeError
        required=True,
        help="path to fpgamap_nnnnnnnn.py file to decode addresses",
    )
    parser.add_argument(
        "-u",
        "--human",
        type=argparse.FileType("w"),
        help="human-readable output file",
    )
    parser.add_argument(
        "-r",
        "--registers",
        type=argparse.FileType("w"),
        help="machine-readable register values output file",
    )
    parser.add_argument(
        "-t",
        "--testbench",
        type=argparse.FileType("w"),
        help="register values output file, formatted for use in simulation testbench",
    )
    parser.add_argument(
        "-f",
        "--filter",
        type=argparse.FileType("r"),
        help="filter specification file (JSON)",
    )
    parser.add_argument(
        "-x",
        "--hex",
        action="store_true",
        help="convert values to hexadecimal (for human-readable mode)",
    )
    args = parser.parse_args()

    # argparse.FileType opens these files but nothing closes them for us
    opened = [
        args.log,
        args.fpgamap,
        args.filter,
        args.human,
        args.registers,
        args.testbench,
    ]
    try:
        # only the path of the map file is used; it is loaded by name
        fpgamap = load_fpgamap_from_file(args.fpgamap.name)
        register_lookup = all_register_addresses(fpgamap)

        filter_spec = json.load(args.filter) if args.filter else {}

        conversions = (
            (
                args.human,
                ConversionFormat.HUMAN,
                {
                    "include_reads": True,
                    "include_others": True,
                    "hex_vals": args.hex,
                },
            ),
            (args.registers, ConversionFormat.REGISTER, {}),
            (args.testbench, ConversionFormat.TESTBENCH, {}),
        )
        for file_out, conversion_format, options in conversions:
            if file_out is None:
                continue
            convert_log_contents(
                args.log,
                register_lookup,
                conversion_format,
                file_out,
                filter_spec=filter_spec,
                **options,
            )
    finally:
        for stream in opened:
            # "-" makes argparse hand back the standard streams: leave those
            if (
                stream is None
                or stream is sys.stdin
                or stream is sys.stdout
            ):
                continue
            stream.close()