Source code for cnic_fpga

# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

"""
CNIC FPGA Firmware ICL (Instrument Control Layer)
"""
import logging
import threading
import time
import typing
from itertools import islice
from warnings import warn

from ska_low_cbf_fpga import ArgsMap, FpgaHardwareInfo, FpgaPersonality, IclField

from ska_low_cbf_sw_cnic.icl.hbm_packet_controller import HbmPacketController
from ska_low_cbf_sw_cnic.icl.no_attributes import NoAttributes
from ska_low_cbf_sw_cnic.icl.ptp import Ptp
from ska_low_cbf_sw_cnic.icl.ptp_scheduler import PtpScheduler
from ska_low_cbf_sw_cnic.icl.spead_sps import SpeadSPS
from ska_low_cbf_sw_cnic.icl.vd_config import StreamConfig
from ska_low_cbf_sw_cnic.icl.vd_datagen import (
    MAX_CONFIG_SOURCES,
    SOURCES_PER_STREAM,
    DataGenerator,
    pad_source_list,
)
from ska_low_cbf_sw_cnic.icl.virtual_digitiser import VirtualDigitiser
from ska_low_cbf_sw_cnic.util.pcap import count_packets_in_pcap, packet_size_from_pcap
from ska_low_cbf_sw_cnic.util.vd_datagen import parse_config

RX_SLEEP_TIME = 5
"""wait this many seconds between checking if Rx is finished"""
LOAD_SLEEP_TIME = 5
"""wait this many seconds between checking if Load is finished"""


[docs]class CnicFpga(FpgaPersonality): _peripheral_class = { "hbm_pktcontroller": HbmPacketController, "timeslave": PtpScheduler, "timeslave_b": Ptp, # Note: may want to see cmac_b attrs if 2nd port in use "cmac_b": NoAttributes, "drp": NoAttributes, "vitis_shared": NoAttributes, "vd": VirtualDigitiser, "vd_datagen": DataGenerator, "vd_datagen_2": DataGenerator, "spead_sps": SpeadSPS, }
[docs] def __init__( self, driver, map_: ArgsMap, hardware_info: FpgaHardwareInfo = None, logger: logging.Logger = None, ptp_domain: int = 24, ptp_source_b: bool = False, firmware_version_check: bool = True, **kwargs, ) -> None: """ CNIC FPGA Personality ICL Class :param driver: see FpgaPersonality :param map_: see FpgaPersonality :param logger: see FpgaPersonality :param ptp_domain: PTP domain number :param ptp_source_b: Use PTP source B? (Note: only present for some firmware versions / FPGA cards) :param firmware_version_check: Enforce firmware version restriction? """ super().__init__(driver, map_, logger=logger, hardware_info=hardware_info) # check FW version (earlier versions lack some registers we use) print(f"FW Version check: {firmware_version_check}") if firmware_version_check: if "vd" in self.peripherals: self._check_fw("CNIC", ">=0.1.12") else: # Non-VD builds do not need to be so fresh self._check_fw("CNIC", ">=0.1.6") else: self._logger.warning("Firmware version check skipped") if self.info is None: # info interface only implemented for ArgsXrt driver... self._logger.error("FPGA info interface not available! No PTP possible!") else: # We don't always have 2x PTP cores ethernet_ports = len(self.info["platform"]["macs"]) // 4 if ethernet_ports > 0: self._configure_ptp(self["timeslave"], ptp_domain, 0) if ethernet_ports > 1: self._configure_ptp(self["timeslave_b"], ptp_domain, 1) self._logger.info(f"PTP Source: {'B' if ptp_source_b else 'A'}") self["timeslave"].ptp_source_select = ptp_source_b else: self["timeslave"].ptp_source_select = 0 if ptp_source_b: self._logger.warning("No PTP source B available") else: self._logger.error( "Could not read card MAC addresses! No PTP possible!" ) self._rx_cancel = threading.Event() self._rx_thread = None self._rx_finished = False self._load_thread = None self._requested_pcap = None
def __del__(self): "Cleanup; base class might hold an Alveo card lock" super().__del__() def _configure_ptp( self, ptp: Ptp, ptp_domain: int, alveo_mac_index: int = 0 ) -> None: """ Configure a PTP Peripheral :param ptp: Ptp (FpgaPeripheral) object to configure :param alveo_mac_index: which Alveo MAC address to use as basis for PTP MAC address :param ptp_domain: PTP domain number """ alveo_macs = [_["address"] for _ in self.info["platform"]["macs"]] alveo_mac = alveo_macs[alveo_mac_index] # MAC is str, colon-separated hex bytes "01:02:03:04:05:06" self._logger.info(f"Alveo MAC address: {alveo_mac}") # take low 3 bytes of mac, convert to int alveo_mac_low = int("".join(alveo_mac.split(":")[-3:]), 16) # configure the PTP core to use the same low 3 MAC bytes # (high bytes are set by the PTP core) ptp.startup(alveo_mac_low, ptp_domain) self._logger.info(f" PTP MAC address: {ptp.mac_address.value}")
[docs] def prepare_transmit( self, in_filename: str, n_loops: int = 1, burst_size: int = 1, burst_gap: typing.Union[int, None] = None, rate: float = 100.0, n_packets: int = 0, ) -> None: """ Prepare for transmission :param str in_filename: input PCAP(NG) file path :param int n_loops: number of loops :param int burst_size: packets per burst :param burst_gap: packet burst period (ns), overrides rate :param float rate: transmission rate (Gigabits per sec), ignored if burst_gap given :param n_packets: Number of packets to transmit, if 0 then send all in the file """ if self._load_thread_active: raise RuntimeError(f"Loading {self._requested_pcap} still in progress!") self._requested_pcap = in_filename packet_size = packet_size_from_pcap(in_filename) if self.hbm_pktcontroller.loaded_pcap.value == self._requested_pcap: # if we've already loaded the pcap, use the old count # (it may be less than the number of packets in the file!) pcap_packets = self.hbm_pktcontroller.loaded_pcap_packets.value self._logger.info( f"File {in_filename} already loaded, {pcap_packets} packets" ) else: pcap_packets = count_packets_in_pcap(in_filename) self._logger.info(f"Counted {pcap_packets} packets in {in_filename} file") if pcap_packets == 0: self._logger.error(f"ERROR: 0 packets in {in_filename} file") return if n_packets == 0: tx_packets = pcap_packets else: tx_packets = min(pcap_packets, n_packets) self._logger.info(f"Configured to send {n_packets}") self.hbm_pktcontroller.tx_enable = False self.hbm_pktcontroller.tx_reset = True self.timeslave.schedule_control_reset = 1 self.hbm_pktcontroller.configure_tx( packet_size, tx_packets, n_loops, burst_size, burst_gap, rate ) if self.hbm_pktcontroller.loaded_pcap.value != self._requested_pcap: self._logger.info(f"About to load {in_filename}") self._load_thread = threading.Thread( target=self.hbm_pktcontroller.load_pcap, args=(in_filename,) ) self._load_thread.start() self._logger.info( f"Prepared to send {n_loops} loop(s), {tx_packets} packets " f"of size {packet_size} @ {rate} rate" )
@property def _load_thread_active(self) -> bool: """Is the PCAP load thread active?""" if self._load_thread: if self._load_thread.is_alive(): return True self._load_thread.join() self._load_thread = None return False @property def ready_to_transmit(self) -> IclField[bool]: """Can we transmit? i.e. Is our PCAP file loaded?""" value = False if self._requested_pcap and not self._load_thread_active: value = self.hbm_pktcontroller.loaded_pcap.value == self._requested_pcap return IclField( description="CNIC Ready to Transmit", type_=bool, value=value, user_write=False, )
[docs] def begin_transmit( self, start_time: typing.Union[str, None] = None, stop_time: typing.Union[str, None] = None, ) -> None: """ Begin Transmission (either now or later) :param start_time: optional time to begin transmission at (start now if not otherwise specified) :param stop_time: optional time to end transmission at """ self.hbm_pktcontroller.tx_reset = False self._logger.info(f"Scheduling Tx stop time: {stop_time}") self.timeslave.tx_stop_time = stop_time self._logger.info(f"Scheduling Tx start time: {start_time}") self.timeslave.tx_start_time = start_time self.timeslave.schedule_control_reset = 0 if not start_time: self._logger.info("Starting transmission") self.hbm_pktcontroller.start_tx()
[docs] def transmit_pcap( self, in_filename: str, n_loops: int = 1, burst_size: int = 1, burst_gap: typing.Union[int, None] = None, rate: float = 100.0, start_time: typing.Union[str, None] = None, stop_time: typing.Union[str, None] = None, n_packets: int = 0, ) -> None: """ Transmit packets from a PCAP file :param str in_filename: input PCAP(NG) file path :param int n_loops: number of loops (default 1) :param int burst_size: packets per burst (default 1) :param int burst_gap: packet burst period (ns), overrides rate :param float rate: transmission rate (Gigabits per sec), ignored if burst_gap given (default 100.0) :param start_time: optional time to begin transmission at (start now if not otherwise specified) :param stop_time: optional time to end transmission at :param n_packets: Number of packets to transmit, defaults to all """ self.prepare_transmit( in_filename, n_loops, burst_size, burst_gap, rate, n_packets ) while not self.ready_to_transmit: self._logger.info("Still loading PCAP file") time.sleep(LOAD_SLEEP_TIME) self.begin_transmit(start_time, stop_time)
[docs] def receive_pcap( self, out_filename: str, packet_size: int, n_packets: int = 0, start_time: typing.Union[str, None] = None, stop_time: typing.Union[str, None] = None, ) -> None: """ Receive packets into a PCAP file :param out_filename: File path to write to :param packet_size: only packets of this exact size are captured (bytes) :param n_packets: number of packets to receive :param start_time: optional time to begin reception at :param stop_time: optional time to end reception at """ self._end_rx_thread() # cancel any existing Rx wait thread self._rx_finished = False self.timeslave.schedule_control_reset = 1 self._logger.info(f"Scheduling Rx stop time: {stop_time}") self.timeslave.rx_stop_time = stop_time self._logger.info(f"Scheduling Rx start time: {start_time}") self.timeslave.rx_start_time = start_time self.timeslave.schedule_control_reset = 0 self._logger.info("Setting receive parameters") self.hbm_pktcontroller.configure_rx(packet_size, n_packets) if not start_time: self._logger.info("No PTP Rx start time, starting immediately") self.hbm_pktcontroller.start_rx() self._logger.info("Starting thread to wait for completion") self._begin_rx_thread(out_filename, packet_size)
def _begin_rx_thread(self, out_filename, packet_size): """Start a thread to wait for receive completion""" self._rx_cancel.clear() self._rx_thread = threading.Thread( target=self._dump_pcap_when_complete, args=(out_filename, packet_size), ) self._rx_thread.start() def _end_rx_thread(self) -> None: """Close down our last Rx thread""" self.stop_receive() if self._rx_thread: self._rx_thread.join() if self._rx_thread.is_alive(): raise RuntimeError("Previous Rx thread didn't stop")
[docs] def stop_receive(self) -> None: """ Abort a 'receive_pcap' that's still waiting. (e.g. if we set the wrong number of packets to wait for it may never finish automatically) """ if self._rx_thread: self.hbm_pktcontroller.flush() self._rx_cancel.set()
def _dump_pcap_when_complete( self, out_filename: str, packet_size: int, ) -> None: """ Wait for the FPGA to finish receiving packets then write them to disk :param out_filename: File object to write to :param packet_size: Number of Bytes used for each packet """ while not ( self.hbm_pktcontroller.rx_complete.value or ( self.hbm_pktcontroller.rx_packet_count >= self.hbm_pktcontroller.rx_packets_to_capture ) ): if self._rx_cancel.wait(timeout=RX_SLEEP_TIME): break print(".", end="", flush=True) print("") self.hbm_pktcontroller.dump_pcap(out_filename, packet_size) assert ( self.hbm_pktcontroller.last_dumped_pcap.value == out_filename ), "Wrong file written somehow?" self._rx_finished = True @property def finished_receive(self) -> IclField[bool]: """Have we finished receiving, including writing to disk?""" return IclField( description="CNIC Finished Receiving & Writing", type_=bool, value=self._rx_finished, ) @property def finished_transmit(self) -> IclField[bool]: """Have we finished transmitting?""" return self.hbm_pktcontroller.tx_complete @property def enable_vd(self) -> IclField[bool]: """Is Virtual Digitiser enabled?""" if "vd" not in self.peripherals: return IclField( description="Virtual Digitiser Enabled", type_=bool, value=False ) return IclField( description="Virtual Digitiser Enabled", type_=bool, value=self.vd.enable_vd.value and self.spead_sps.enable_packetiser.value and self.vd.enable_vd_hbm_path.value, ) @enable_vd.setter def enable_vd(self, enable: bool): """Enable/Disable Virtual Digitiser.""" if "vd" not in self.peripherals: raise NotImplementedError("No Virtual Digitiser Peripheral") if ( self.hbm_pktcontroller.rx_enable_capture and not self.hbm_pktcontroller.duplex ): raise RuntimeError("Duplex not enabled and Rx already running!") if enable: # Turning on self.vd.enable_vd_hbm_path = True self.spead_sps.enable_packetiser = True self.vd.enable_vd = True else: # Turning off self.vd.enable_vd = False self.vd_datagen.enable_vd = False self.spead_sps.enable_packetiser = False self.vd.reset_vd_data_gen_logic = True time.sleep(1) # Allow HBM transfers to finish self.vd.reset_vd_data_gen_logic = False self.vd.enable_vd_hbm_path = False
[docs] def reset(self): """Reset Receive, Transmit, Virtual Digitiser, PTP, and turn Duplex off.""" self.hbm_pktcontroller.tx_enable = False self.hbm_pktcontroller.tx_reset = True self.hbm_pktcontroller.rx_enable_capture = False self.hbm_pktcontroller.rx_reset_capture = True self.hbm_pktcontroller.duplex = False self.timeslave.reset() if "vd" in self.peripherals: self.enable_vd = False self.vd.use_ptp_to_begin = False # release reset signals self.hbm_pktcontroller.tx_reset = False self.hbm_pktcontroller.rx_reset_capture = False
[docs] def configure_from_yaml(self, yamlstr: str) -> None: """Configure data generator using a YAML string. In case of too many sources (>8192) send the spill over to the 2nd peripheral vd_datagen_2 """ self.enable_vd = False # ensure a valid YAML and a dict containing the "sources" keys: cfg_dict = parse_config(yamlstr) SRC_KEY = "sources" sources = cfg_dict[SRC_KEY] sources_count = len(sources) needs_split = sources_count > MAX_CONFIG_SOURCES self._logger.warning(f"SPLIT: {needs_split}, total sources {sources_count}") if needs_split: # split dictionary into two chunks - one for each FPGA peripheral # vd_datagen and vd_datagen_2 low_src = { SRC_KEY: {k: sources[k] for k in islice(sources, 0, MAX_CONFIG_SOURCES)} } high_src = { SRC_KEY: { i: sources[k] for i, k in enumerate( islice(sources, MAX_CONFIG_SOURCES, sources_count) ) } } self._logger.warning( f"HIGH SRC count: {sources_count - MAX_CONFIG_SOURCES}" ) else: low_src = cfg_dict self.vd_datagen.configure_from_yaml(low_src) if needs_split and "vd_datagen_2" in self.peripherals: self.vd_datagen_2.configure_from_yaml(high_src)
[docs] def configure_vd( self, stream_configs: typing.List[StreamConfig], sps_packet_version: int = 2, spead_epoch: int = 0, spead_timestamp_10ns: int = 0, ska_time: float = 0, max_bursts: typing.Optional[int] = None, ) -> None: """ Configure the VD - SPEAD packetiser & Data Generator. :param stream_configs: All the VD Stream configs to apply :param sps_packet_version: SPS packet version; currently: 2, 3 :param spead_epoch: UNIX timestamp used as SPEAD epoch (SPS SPEAD v1, v2) :param spead_timestamp_10ns: Tens of nanoseconds after epoch (SPS SPEAD v1, v2) :param ska_time: Time since SKA epoch - seconds since 2000 (SPS SPEAD v3) :param max_bursts: Maximum output bursts (defaults to infinite) """ if "vd" not in self.peripherals: raise NotImplementedError("No Virtual Digitiser Peripheral") self.enable_vd = False spead_streams = [stream.spead_stream for stream in stream_configs] # for expediency of implementation, we enforce 8 data sources per stream sources = [ source for stream in stream_configs for source in pad_source_list(stream.sources, SOURCES_PER_STREAM) ] if len(sources) != len(spead_streams) * SOURCES_PER_STREAM: raise RuntimeError( "Mismatch between no. of sources & SPEAD streams (how?!)" ) self.vd.set_time_between_packets_and_bursts(len(spead_streams)) self.vd.sps_packet_version = sps_packet_version self.vd.configure_channels(spead_streams) if sps_packet_version < 3: self.vd.configure_time_v2(spead_epoch, spead_timestamp_10ns) if ska_time: warn("ska_time supplied - ignored when using SPS v2") else: self.vd.configure_time(ska_time) if spead_epoch or spead_timestamp_10ns: warn("SPEAD epoch/timestamp supplied - ignored when using SPS v3") self.vd.set_max_bursts(max_bursts) self.vd_datagen.configure(sources[:MAX_CONFIG_SOURCES]) self.vd_datagen_2.configure( sources[MAX_CONFIG_SOURCES : MAX_CONFIG_SOURCES * 2] ) if len(sources) > MAX_CONFIG_SOURCES * 2: warn(f"{len(sources)} Sources > limit {MAX_CONFIG_SOURCES * 2}") # hard-coding Ethernet settings to get started quickly self.spead_sps.ethernet_source = "00:11:22:33:44:55" self.spead_sps.ethernet_destination = "00:11:22:33:44:55" self.spead_sps.ipv4_source = "10.0.0.1" self.spead_sps.ipv4_destination = "10.0.0.2" self.spead_sps.udp_src_port = 4660 self.spead_sps.udp_dst_port = 4660
[docs] def configure_next_delay_polynomials( self, polynomials: typing.List[typing.List[float]], activation_time: int, data_start_sec: int = 0, data_start_ns: int = 0, ) -> None: """ Configure the entire set of delay polynomials to use at the next activation time. You must supply exactly one polynomial per source, in order. :param polynomials: one per source - see DataGenerator.set_delay_polynomial :param activation_time: datagen time at which this set of polynomials will activate :param data_start_sec: vd datagen start seconds, used only after reset :param data_start_ns: vd_datagen start nanoseconds, used only after reset """ # check length of supplied polynomials list expected_length = self.vd.configured_channels.value * SOURCES_PER_STREAM if len(polynomials) != expected_length: raise ValueError( f"Expected {expected_length} polys, got {len(polynomials)}" ) self.vd_datagen.configure_next_delay_polynomials( polynomials[:MAX_CONFIG_SOURCES], activation_time, data_start_sec, data_start_ns, ) if len(polynomials) > MAX_CONFIG_SOURCES: self.vd_datagen_2.configure_next_delay_polynomials( polynomials[MAX_CONFIG_SOURCES : MAX_CONFIG_SOURCES * 2], activation_time, data_start_sec, data_start_ns, )