# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
CNIC FPGA Firmware ICL (Instrument Control Layer)
"""
import logging
import threading
import time
import typing
from itertools import islice
from warnings import warn
from ska_low_cbf_fpga import ArgsMap, FpgaHardwareInfo, FpgaPersonality, IclField
from ska_low_cbf_sw_cnic.icl.hbm_packet_controller import HbmPacketController
from ska_low_cbf_sw_cnic.icl.no_attributes import NoAttributes
from ska_low_cbf_sw_cnic.icl.ptp import Ptp
from ska_low_cbf_sw_cnic.icl.ptp_scheduler import PtpScheduler
from ska_low_cbf_sw_cnic.icl.spead_sps import SpeadSPS
from ska_low_cbf_sw_cnic.icl.vd_config import StreamConfig
from ska_low_cbf_sw_cnic.icl.vd_datagen import (
MAX_CONFIG_SOURCES,
SOURCES_PER_STREAM,
DataGenerator,
pad_source_list,
)
from ska_low_cbf_sw_cnic.icl.virtual_digitiser import VirtualDigitiser
from ska_low_cbf_sw_cnic.util.pcap import count_packets_in_pcap, packet_size_from_pcap
from ska_low_cbf_sw_cnic.util.vd_datagen import parse_config
RX_SLEEP_TIME = 5
"""wait this many seconds between checking if Rx is finished"""
LOAD_SLEEP_TIME = 5
"""wait this many seconds between checking if Load is finished"""
[docs]class CnicFpga(FpgaPersonality):
_peripheral_class = {
"hbm_pktcontroller": HbmPacketController,
"timeslave": PtpScheduler,
"timeslave_b": Ptp,
# Note: may want to see cmac_b attrs if 2nd port in use
"cmac_b": NoAttributes,
"drp": NoAttributes,
"vitis_shared": NoAttributes,
"vd": VirtualDigitiser,
"vd_datagen": DataGenerator,
"vd_datagen_2": DataGenerator,
"spead_sps": SpeadSPS,
}
[docs] def __init__(
self,
driver,
map_: ArgsMap,
hardware_info: FpgaHardwareInfo = None,
logger: logging.Logger = None,
ptp_domain: int = 24,
ptp_source_b: bool = False,
firmware_version_check: bool = True,
**kwargs,
) -> None:
"""
CNIC FPGA Personality ICL Class
:param driver: see FpgaPersonality
:param map_: see FpgaPersonality
:param logger: see FpgaPersonality
:param ptp_domain: PTP domain number
:param ptp_source_b: Use PTP source B?
(Note: only present for some firmware versions / FPGA cards)
:param firmware_version_check: Enforce firmware version restriction?
"""
super().__init__(driver, map_, logger=logger, hardware_info=hardware_info)
# check FW version (earlier versions lack some registers we use)
print(f"FW Version check: {firmware_version_check}")
if firmware_version_check:
if "vd" in self.peripherals:
self._check_fw("CNIC", ">=0.1.12")
else:
# Non-VD builds do not need to be so fresh
self._check_fw("CNIC", ">=0.1.6")
else:
self._logger.warning("Firmware version check skipped")
if self.info is None:
# info interface only implemented for ArgsXrt driver...
self._logger.error("FPGA info interface not available! No PTP possible!")
else:
# We don't always have 2x PTP cores
ethernet_ports = len(self.info["platform"]["macs"]) // 4
if ethernet_ports > 0:
self._configure_ptp(self["timeslave"], ptp_domain, 0)
if ethernet_ports > 1:
self._configure_ptp(self["timeslave_b"], ptp_domain, 1)
self._logger.info(f"PTP Source: {'B' if ptp_source_b else 'A'}")
self["timeslave"].ptp_source_select = ptp_source_b
else:
self["timeslave"].ptp_source_select = 0
if ptp_source_b:
self._logger.warning("No PTP source B available")
else:
self._logger.error(
"Could not read card MAC addresses! No PTP possible!"
)
self._rx_cancel = threading.Event()
self._rx_thread = None
self._rx_finished = False
self._load_thread = None
self._requested_pcap = None
def __del__(self):
"Cleanup; base class might hold an Alveo card lock"
super().__del__()
def _configure_ptp(
self, ptp: Ptp, ptp_domain: int, alveo_mac_index: int = 0
) -> None:
"""
Configure a PTP Peripheral
:param ptp: Ptp (FpgaPeripheral) object to configure
:param alveo_mac_index: which Alveo MAC address to use as basis for PTP
MAC address
:param ptp_domain: PTP domain number
"""
alveo_macs = [_["address"] for _ in self.info["platform"]["macs"]]
alveo_mac = alveo_macs[alveo_mac_index]
# MAC is str, colon-separated hex bytes "01:02:03:04:05:06"
self._logger.info(f"Alveo MAC address: {alveo_mac}")
# take low 3 bytes of mac, convert to int
alveo_mac_low = int("".join(alveo_mac.split(":")[-3:]), 16)
# configure the PTP core to use the same low 3 MAC bytes
# (high bytes are set by the PTP core)
ptp.startup(alveo_mac_low, ptp_domain)
self._logger.info(f" PTP MAC address: {ptp.mac_address.value}")
[docs] def prepare_transmit(
self,
in_filename: str,
n_loops: int = 1,
burst_size: int = 1,
burst_gap: typing.Union[int, None] = None,
rate: float = 100.0,
n_packets: int = 0,
) -> None:
"""
Prepare for transmission
:param str in_filename: input PCAP(NG) file path
:param int n_loops: number of loops
:param int burst_size: packets per burst
:param burst_gap: packet burst period (ns), overrides rate
:param float rate: transmission rate (Gigabits per sec),
ignored if burst_gap given
:param n_packets: Number of packets to transmit, if 0 then
send all in the file
"""
if self._load_thread_active:
raise RuntimeError(f"Loading {self._requested_pcap} still in progress!")
self._requested_pcap = in_filename
packet_size = packet_size_from_pcap(in_filename)
if self.hbm_pktcontroller.loaded_pcap.value == self._requested_pcap:
# if we've already loaded the pcap, use the old count
# (it may be less than the number of packets in the file!)
pcap_packets = self.hbm_pktcontroller.loaded_pcap_packets.value
self._logger.info(
f"File {in_filename} already loaded, {pcap_packets} packets"
)
else:
pcap_packets = count_packets_in_pcap(in_filename)
self._logger.info(f"Counted {pcap_packets} packets in {in_filename} file")
if pcap_packets == 0:
self._logger.error(f"ERROR: 0 packets in {in_filename} file")
return
if n_packets == 0:
tx_packets = pcap_packets
else:
tx_packets = min(pcap_packets, n_packets)
self._logger.info(f"Configured to send {n_packets}")
self.hbm_pktcontroller.tx_enable = False
self.hbm_pktcontroller.tx_reset = True
self.timeslave.schedule_control_reset = 1
self.hbm_pktcontroller.configure_tx(
packet_size, tx_packets, n_loops, burst_size, burst_gap, rate
)
if self.hbm_pktcontroller.loaded_pcap.value != self._requested_pcap:
self._logger.info(f"About to load {in_filename}")
self._load_thread = threading.Thread(
target=self.hbm_pktcontroller.load_pcap, args=(in_filename,)
)
self._load_thread.start()
self._logger.info(
f"Prepared to send {n_loops} loop(s), {tx_packets} packets "
f"of size {packet_size} @ {rate} rate"
)
@property
def _load_thread_active(self) -> bool:
"""Is the PCAP load thread active?"""
if self._load_thread:
if self._load_thread.is_alive():
return True
self._load_thread.join()
self._load_thread = None
return False
@property
def ready_to_transmit(self) -> IclField[bool]:
"""Can we transmit? i.e. Is our PCAP file loaded?"""
value = False
if self._requested_pcap and not self._load_thread_active:
value = self.hbm_pktcontroller.loaded_pcap.value == self._requested_pcap
return IclField(
description="CNIC Ready to Transmit",
type_=bool,
value=value,
user_write=False,
)
[docs] def begin_transmit(
self,
start_time: typing.Union[str, None] = None,
stop_time: typing.Union[str, None] = None,
) -> None:
"""
Begin Transmission (either now or later)
:param start_time: optional time to begin transmission at
(start now if not otherwise specified)
:param stop_time: optional time to end transmission at
"""
self.hbm_pktcontroller.tx_reset = False
self._logger.info(f"Scheduling Tx stop time: {stop_time}")
self.timeslave.tx_stop_time = stop_time
self._logger.info(f"Scheduling Tx start time: {start_time}")
self.timeslave.tx_start_time = start_time
self.timeslave.schedule_control_reset = 0
if not start_time:
self._logger.info("Starting transmission")
self.hbm_pktcontroller.start_tx()
[docs] def transmit_pcap(
self,
in_filename: str,
n_loops: int = 1,
burst_size: int = 1,
burst_gap: typing.Union[int, None] = None,
rate: float = 100.0,
start_time: typing.Union[str, None] = None,
stop_time: typing.Union[str, None] = None,
n_packets: int = 0,
) -> None:
"""
Transmit packets from a PCAP file
:param str in_filename: input PCAP(NG) file path
:param int n_loops: number of loops (default 1)
:param int burst_size: packets per burst (default 1)
:param int burst_gap: packet burst period (ns), overrides rate
:param float rate: transmission rate (Gigabits per sec),
ignored if burst_gap given (default 100.0)
:param start_time: optional time to begin transmission at
(start now if not otherwise specified)
:param stop_time: optional time to end transmission at
:param n_packets: Number of packets to transmit, defaults to all
"""
self.prepare_transmit(
in_filename, n_loops, burst_size, burst_gap, rate, n_packets
)
while not self.ready_to_transmit:
self._logger.info("Still loading PCAP file")
time.sleep(LOAD_SLEEP_TIME)
self.begin_transmit(start_time, stop_time)
[docs] def receive_pcap(
self,
out_filename: str,
packet_size: int,
n_packets: int = 0,
start_time: typing.Union[str, None] = None,
stop_time: typing.Union[str, None] = None,
) -> None:
"""
Receive packets into a PCAP file
:param out_filename: File path to write to
:param packet_size: only packets of this exact size are captured (bytes)
:param n_packets: number of packets to receive
:param start_time: optional time to begin reception at
:param stop_time: optional time to end reception at
"""
self._end_rx_thread() # cancel any existing Rx wait thread
self._rx_finished = False
self.timeslave.schedule_control_reset = 1
self._logger.info(f"Scheduling Rx stop time: {stop_time}")
self.timeslave.rx_stop_time = stop_time
self._logger.info(f"Scheduling Rx start time: {start_time}")
self.timeslave.rx_start_time = start_time
self.timeslave.schedule_control_reset = 0
self._logger.info("Setting receive parameters")
self.hbm_pktcontroller.configure_rx(packet_size, n_packets)
if not start_time:
self._logger.info("No PTP Rx start time, starting immediately")
self.hbm_pktcontroller.start_rx()
self._logger.info("Starting thread to wait for completion")
self._begin_rx_thread(out_filename, packet_size)
def _begin_rx_thread(self, out_filename, packet_size):
"""Start a thread to wait for receive completion"""
self._rx_cancel.clear()
self._rx_thread = threading.Thread(
target=self._dump_pcap_when_complete,
args=(out_filename, packet_size),
)
self._rx_thread.start()
def _end_rx_thread(self) -> None:
"""Close down our last Rx thread"""
self.stop_receive()
if self._rx_thread:
self._rx_thread.join()
if self._rx_thread.is_alive():
raise RuntimeError("Previous Rx thread didn't stop")
[docs] def stop_receive(self) -> None:
"""
Abort a 'receive_pcap' that's still waiting.
(e.g. if we set the wrong number of packets to wait for it may never
finish automatically)
"""
if self._rx_thread:
self.hbm_pktcontroller.flush()
self._rx_cancel.set()
def _dump_pcap_when_complete(
self,
out_filename: str,
packet_size: int,
) -> None:
"""
Wait for the FPGA to finish receiving packets then write them to disk
:param out_filename: File object to write to
:param packet_size: Number of Bytes used for each packet
"""
while not (
self.hbm_pktcontroller.rx_complete.value
or (
self.hbm_pktcontroller.rx_packet_count
>= self.hbm_pktcontroller.rx_packets_to_capture
)
):
if self._rx_cancel.wait(timeout=RX_SLEEP_TIME):
break
print(".", end="", flush=True)
print("")
self.hbm_pktcontroller.dump_pcap(out_filename, packet_size)
assert (
self.hbm_pktcontroller.last_dumped_pcap.value == out_filename
), "Wrong file written somehow?"
self._rx_finished = True
@property
def finished_receive(self) -> IclField[bool]:
"""Have we finished receiving, including writing to disk?"""
return IclField(
description="CNIC Finished Receiving & Writing",
type_=bool,
value=self._rx_finished,
)
@property
def finished_transmit(self) -> IclField[bool]:
"""Have we finished transmitting?"""
return self.hbm_pktcontroller.tx_complete
@property
def enable_vd(self) -> IclField[bool]:
"""Is Virtual Digitiser enabled?"""
if "vd" not in self.peripherals:
return IclField(
description="Virtual Digitiser Enabled", type_=bool, value=False
)
return IclField(
description="Virtual Digitiser Enabled",
type_=bool,
value=self.vd.enable_vd.value
and self.spead_sps.enable_packetiser.value
and self.vd.enable_vd_hbm_path.value,
)
@enable_vd.setter
def enable_vd(self, enable: bool):
"""Enable/Disable Virtual Digitiser."""
if "vd" not in self.peripherals:
raise NotImplementedError("No Virtual Digitiser Peripheral")
if (
self.hbm_pktcontroller.rx_enable_capture
and not self.hbm_pktcontroller.duplex
):
raise RuntimeError("Duplex not enabled and Rx already running!")
if enable: # Turning on
self.vd.enable_vd_hbm_path = True
self.spead_sps.enable_packetiser = True
self.vd.enable_vd = True
else: # Turning off
self.vd.enable_vd = False
self.vd_datagen.enable_vd = False
self.spead_sps.enable_packetiser = False
self.vd.reset_vd_data_gen_logic = True
time.sleep(1) # Allow HBM transfers to finish
self.vd.reset_vd_data_gen_logic = False
self.vd.enable_vd_hbm_path = False
[docs] def reset(self):
"""Reset Receive, Transmit, Virtual Digitiser, PTP, and turn Duplex off."""
self.hbm_pktcontroller.tx_enable = False
self.hbm_pktcontroller.tx_reset = True
self.hbm_pktcontroller.rx_enable_capture = False
self.hbm_pktcontroller.rx_reset_capture = True
self.hbm_pktcontroller.duplex = False
self.timeslave.reset()
if "vd" in self.peripherals:
self.enable_vd = False
self.vd.use_ptp_to_begin = False
# release reset signals
self.hbm_pktcontroller.tx_reset = False
self.hbm_pktcontroller.rx_reset_capture = False