# Source code for virtual_digitiser

# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Virtual Digitiser ICL (abstraction)
"""
import json
import math
import typing
import warnings
from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
from typing import Sequence

import numpy as np
from ska_low_cbf_fpga import DISCOVER_ALL, FpgaPeripheral, IclField
from ska_low_cbf_fpga.args_fpga import ArgsWordType

from ska_low_cbf_sw_cnic.icl.ptp_scheduler import TIME_STR_FORMAT


class SpeadVersion(IntEnum):
    """
    SPS SPEAD packet format versions that this software supports.

    Packet contents are described in the `CNIC firmware docs
    <https://developer.skao.int/projects/ska-low-cbf-fw-cnic/en/latest>`_
    and in the `SPS v3 ECP
    <https://confluence.skatelescope.org/display/CMI/ECP-230134+-+Update+time+fields+in+the+LOW+CBF+-+SPS+ICD>`_.
    """

    # Member values equal the protocol version number they represent.
    v2 = 2
    v3 = 3
@dataclass
class VDChannelConfig:
    """Virtual Digitiser Channel Configuration (one row of the config table)."""

    scan: int
    beam: int
    frequency: int
    substation: int
    subarray: int
    station: int

    def as_registers(self, spead_version: int = SpeadVersion.v2) -> np.ndarray:
        """
        Render as np array, ready to write to FPGA data RAM.

        Seven words are produced: the low 32 bits of the scan ID, the five
        other IDs, then the high 16 bits of the scan ID. That last word is
        only populated for SPEAD v3; for any other version it is zero.

        :param spead_version: SPS SPEAD protocol version to render for
        :return: array of register words (dtype ``ArgsWordType``)
        """
        if spead_version == SpeadVersion.v3:
            upper_scan_bits = (self.scan >> 32) & 0x0_FFFF
        else:
            upper_scan_bits = 0
        lower_scan_bits = self.scan & 0x0_FFFF_FFFF

        words = [
            lower_scan_bits,
            self.beam,
            self.frequency,
            self.substation,
            self.subarray,
            self.station,
            upper_scan_bits,
        ]
        return np.array(words, dtype=ArgsWordType)

    @classmethod
    def from_registers(
        cls, registers: np.ndarray, spead_version: int = SpeadVersion.v2
    ):
        """
        Create a VDChannelConfig object from a row of config table registers.

        Word 6 (high 16 bits of the scan ID) is only consulted for SPEAD v3.

        :param registers: a slice of CONFIG_ROW_WORDS registers
        :param spead_version: SPS SPEAD protocol version the row was written for
        :return: VDChannelConfig object to match supplied registers
        """
        scan = int(registers[0])
        if spead_version == SpeadVersion.v3:
            # apply the high portion of scan ID
            scan |= int(0x0_FFFF & registers[6]) << 32

        # words 1..5 map one-to-one onto the remaining fields
        beam, frequency, substation, subarray, station = (
            int(registers[index]) for index in range(1, 6)
        )
        return cls(
            scan=scan,
            beam=beam,
            frequency=frequency,
            substation=substation,
            subarray=subarray,
            station=station,
        )

    def __iter__(self):
        """Yield ``(name, value)`` for every public, non-callable attribute."""
        # allows VDChannelConfig to be converted to JSON (via ``dict(obj)``)
        for name in dir(self):
            if name.startswith("_"):
                continue
            attribute = getattr(self, name)
            if callable(attribute):
                continue
            yield name, attribute
# Each row of the config table is this many words # (note: not equal to the number of parameters in a channel config) CONFIG_ROW_WORDS = 8 r""" ====== ======================= =============== ================== Word v1 v2 v3 (proposed) ====== ======================= =============== ================== 0 Frequency Hz low 32b Scan ID Scan ID low 32b 1 Beam ID Beam ID Beam ID 2 Frequency ID Frequency ID Frequency ID 3 Substation ID Substation ID Substation ID 4 Subarray ID Subarray ID Subarray ID 5 Station ID Station ID Station ID 6 Frequency Hz high 16b [unused] Scan ID high 16b 7 [unused] [unused] [unused] ====== ======================= =============== ================== """ MAX_CONFIG_ROWS = 4096 CONFIG_EMPTY = 65535 # special startup value for 'number of valid lines' MAX_BURST_ENABLE_BIT = 1 << 31 """Top bit of Max. Bursts register used as enable bit.""" SPS_PACKET_PERIOD = 2048 * 1080e-9 """SPS packet period in seconds.""" def _split_into_two(x: int) -> (int, int): """ Split a number into two 32-bit register values Returns: High word, Low word """ return (x >> 32), x & 0xFFFF_FFFF def _time_between_streams(n_streams: int) -> int: """ Calculate required time delay between packets, given number of SPEAD streams. We want a data rate of ~30Gbps as expected from SPS for 576 streams (505 cycles), need to run faster for more streams. Equation based on 100 cycles @ 1728 streams. :param n_streams: number of SPEAD streams :returns: time delay between packets in FPGA clock cycles (300MHz) """ return max(round(505 - 0.3515625 * max(n_streams - 576, 0)), 0)
class VirtualDigitiser(FpgaPeripheral):
    """
    ICL abstraction of the Virtual Digitiser FPGA peripheral.

    Provides helpers to configure the channel table, time/packet counters,
    pulsar mode and burst limit, plus read-only properties that merge
    registers which the firmware splits into upper/lower words.
    """

    _user_attributes = {DISCOVER_ALL}
    _not_user_attributes = {
        "max_number_of_vd_bursts",  # bitfield decoded in our properties
        "no_of_packets_this_config_l",  # lower/upper values merged in property
        "no_of_packets_this_config_u",
        "no_of_sps_packets_l",
        "no_of_sps_packets_u",
        "packet_counter",
        "packet_counter_u",
        "unix_epoch_time_l",
        "unix_epoch_time_u",
        "timestamp_l",
        "timestamp_u",
        "data",
        "number_of_valid_lines_in_vd_ram",
        "sps_packet_version_select",
    }
    # No methods should be driven by control system directly
    _user_methods = None

    def configure_channels(self, configs: Sequence[VDChannelConfig]) -> None:
        """
        Configure all the Virtual Digitiser channels you want to use

        :param configs: all the channel configurations
        :raises RuntimeError: if more than MAX_CONFIG_ROWS configs are given
        """
        n_configs = len(configs)
        if n_configs > MAX_CONFIG_ROWS:
            raise RuntimeError(f"Too many channels - max {MAX_CONFIG_ROWS}")
        if n_configs == 0:
            # "valid lines minus one" cannot express an empty table (it would
            # be -1), so write the startup marker that `configured_channels`
            # reads back as zero channels.
            self.number_of_valid_lines_in_vd_ram = CONFIG_EMPTY
            return
        # note: register wants to be told "valid lines minus one"
        self.number_of_valid_lines_in_vd_ram = n_configs - 1
        # read the version register once rather than once per row
        spead_version = self.sps_packet_version.value
        for n, config in enumerate(configs):
            self.data[n * CONFIG_ROW_WORDS] = config.as_registers(spead_version)

    def configure_time(self, ska_time: float = 0) -> None:
        """
        Configure time for SPS SPEAD protocol v3 headers.

        Disables the VD, then loads the packet counter with the number of
        SPS packet periods (rounded up) contained in ``ska_time``.

        :param ska_time: SKA epoch based time - seconds since 2000
        :raises ValueError: if the packet count does not fit in 40 bits
        """
        self.enable_vd = 0
        high_word, low_word = _split_into_two(
            math.ceil(ska_time / SPS_PACKET_PERIOD)
        )
        if high_word > 0xFF:
            raise ValueError(
                f"Overflow! Packet count at t={ska_time} exceeds 40 bits"
            )
        self.packet_counter_u = high_word
        self.packet_counter = low_word

    def configure_time_v2(self, epoch: int = 0, timestamp: int = 0) -> None:
        """
        Set time & packet count parameters for SPS SPEAD protocol v1/v2 headers.

        :param epoch: UNIX timestamp used as SPEAD epoch
        :param timestamp: 10s of nanoseconds from epoch
        """
        self.enable_vd = 0
        # timestamps are good, ...
        self.timestamp_u, self.timestamp_l = _split_into_two(timestamp)
        self.unix_epoch_time_u, self.unix_epoch_time_l = _split_into_two(epoch)
        # but only the packet counter is used by the correlator
        # NOTE(review): for a present-day UNIX epoch this count is larger than
        # 32 bits while the upper word is forced to 0 - confirm this is
        # intended (configure_time splits the count across both registers).
        self.packet_counter = math.ceil(epoch / SPS_PACKET_PERIOD)
        self.packet_counter_u = 0

    def configure_pulsar_mode(
        self, enable: bool, sample_count: tuple = ()
    ) -> None:
        """
        Configure Virtual Digitiser pulsar mode.

        Write 'count' registers before enabling the mode. The counts are only
        written when exactly three values are supplied; any other length
        leaves the count registers untouched.

        :param enable: enable/disable pulsar mode
        :param sample_count: sample counts for start/on/off conditions,
          defaults to ()
        """
        if len(sample_count) == 3:
            # temporarily disable the mode until 'count' registers are updated
            self.enable_pulsar_timing = 0
            # NOTE the caller (CnicDevice.ConfigurePulsarMode) has already
            # ensured the 'count' values are within the allowed range
            self.pulsar_start_sample_count = sample_count[0]
            self.pulsar_on_sample_count = sample_count[1]
            self.pulsar_off_sample_count = sample_count[2]
        self.enable_pulsar_timing = 1 if enable else 0

    @property
    def packet_count(self) -> IclField[int]:
        """Get combined value of packet_counter & packet_counter_u."""
        total_packets = (
            self.packet_counter_u.value << 32
        ) | self.packet_counter.value
        return IclField(
            description="SPS packet counter", type_=int, value=total_packets
        )

    @property
    def sps_packets(self) -> IclField[int]:
        """Get combined value of no_of_sps_packets_u & no_of_sps_packets_l."""
        total_packets = (
            self.no_of_sps_packets_u.value << 32
        ) | self.no_of_sps_packets_l.value
        return IclField(
            description="Total SPS Packets Sent",
            type_=int,
            value=total_packets,
        )

    @property
    def session_packets(self) -> IclField[int]:
        """Get number of packets sent since VD last enabled."""
        total_packets = (
            self.no_of_packets_this_config_u.value << 32
        ) | self.no_of_packets_this_config_l.value
        return IclField(
            description="Total packets sent this session (i.e. since enabled)",
            type_=int,
            value=total_packets,
        )

    @property
    def epoch(self) -> IclField[str]:
        """Get the v1/v2 SPEAD epoch registers as a formatted time string."""
        timestamp = (
            self.unix_epoch_time_u.value << 32
        ) | self.unix_epoch_time_l.value
        return IclField(
            description="SPEAD epoch for SPS protocol v1/v2",
            type_=str,
            # fromtimestamp() without tz renders in the host's local timezone
            value=datetime.fromtimestamp(timestamp).strftime(TIME_STR_FORMAT),
        )

    @property
    def timestamp(self) -> IclField[int]:
        """Get combined value of timestamp_u & timestamp_l."""
        timestamp = (self.timestamp_u.value << 32) | self.timestamp_l.value
        return IclField(
            description="SPEAD timestamp (10s of ns) for SPS protocol v1/v2",
            type_=int,
            value=timestamp,
        )

    @property
    def configured_channels(self) -> IclField[int]:
        """Number of configured SPEAD channels (1 means 1)."""
        # register is "table size minus one"
        n_lines = int(self.number_of_valid_lines_in_vd_ram)
        if n_lines == CONFIG_EMPTY:
            n_channels = 0
        else:
            n_channels = n_lines + 1
        return IclField(
            description="Configured SPEAD channels",
            type_=int,
            value=n_channels,
        )

    def get_channel_config(self, n: int) -> VDChannelConfig:
        """
        Get one SPEAD channel's configuration.

        :param n: zero-indexed channel number (row in data table)
        :raises RuntimeError: if not configured (including negative ``n``)
          or beyond the table limit
        :return: configuration decoded from row ``n`` of the data table
        """
        # Compare against the plain integer, as `configuration` does. A
        # negative index would otherwise slip through and produce an empty
        # register slice.
        if n < 0 or n >= self.configured_channels.value:
            raise RuntimeError(f"Channel {n} is not configured.")
        if n >= MAX_CONFIG_ROWS:
            raise RuntimeError(f"Channel {n} is beyond table limit.")
        reg_slice = slice(n * CONFIG_ROW_WORDS, (n + 1) * CONFIG_ROW_WORDS)
        return VDChannelConfig.from_registers(
            self.data[reg_slice], self.sps_packet_version.value
        )

    @property
    def configuration(self) -> IclField[str]:
        """Get the current configuration (JSON)."""
        configs = [
            self.get_channel_config(n)
            for n in range(self.configured_channels.value)
        ]
        # default=dict relies on VDChannelConfig being iterable as pairs
        return IclField(type_=str, value=json.dumps(configs, default=dict))

    @property
    def last_used_datagen_buffer(self) -> IclField[int]:
        """
        The last used (i.e. idle) Data Generator config buffer.

        The buffer is an internal interface between the DataGenerator and
        packetiser

        vd_buffer_gen_status is a 4-bit wide FPGA register
        b0: last buffer written 0
        b1: last buffer written 1
        b2: buffer 0 generation in progress
        b3: buffer 1 generation in progress

        :returns: -1 if neither has been used
        """
        status = self.vd_buffer_gen_status.value
        last = -1
        if status & 1:
            last = 0
        elif status & 2:
            last = 1
        return IclField(
            description="Last used VD Datagen config buffer",
            type_=int,
            value=last,
        )

    # It might have been nice to combine the two max_bursts properties,
    # using 'None' for 'infinite', but I have a feeling it would break Tango.
    @property
    def max_bursts_enabled(self) -> IclField[bool]:
        """Output burst limit active?"""
        enabled = False
        # older firmware may not have this register
        if hasattr(self, "max_number_of_vd_bursts"):
            enabled = bool(
                self.max_number_of_vd_bursts.value & MAX_BURST_ENABLE_BIT
            )
        return IclField(
            type_=bool, value=enabled, description="Is VD burst limit enabled?"
        )

    @property
    def max_bursts(self) -> IclField[int]:
        """Output burst limit value."""
        n = 0
        if hasattr(self, "max_number_of_vd_bursts"):
            # Keep the bits below the enable bit. A positive mask is used
            # because ~MAX_BURST_ENABLE_BIT is a negative Python int, which
            # does not combine safely with unsigned numpy scalars.
            n = self.max_number_of_vd_bursts.value & (MAX_BURST_ENABLE_BIT - 1)
        return IclField(type_=int, value=n, description="VD output burst limit")

    def set_max_bursts(self, max_bursts: typing.Optional[int]) -> None:
        """
        Set the output burst limit.

        :param max_bursts: Maximum VD output bursts.
          Use ``None`` to run forever.
        :raises NotImplementedError: if a limit is requested but the firmware
          has no burst limit register
        :raises ValueError: if the limit is negative or would collide with
          the enable bit
        """
        supported = hasattr(self, "max_number_of_vd_bursts")
        if max_bursts is None:
            # Without the register there is nothing to clear. Assigning
            # anyway would create a plain attribute that later hasattr()
            # checks mistake for the register.
            if supported:
                self.max_number_of_vd_bursts = 0
            return
        if not supported:
            raise NotImplementedError(
                "Firmware does not support maximum burst setting"
            )
        if not 0 <= max_bursts < MAX_BURST_ENABLE_BIT:
            raise ValueError(
                f"max_bursts must be in range 0 to {MAX_BURST_ENABLE_BIT - 1}"
            )
        self.max_number_of_vd_bursts = max_bursts | MAX_BURST_ENABLE_BIT

    def set_time_between_packets_and_bursts(self, n_streams: int) -> None:
        """
        Configure VD packet delays.

        :param n_streams: Number of SPEAD streams
        """
        # Matches SPS packet rate [ns]
        self.time_between_channel_bursts = 2_211_840
        self.time_between_packets = _time_between_streams(n_streams)

    @property
    def sps_packet_version(self) -> IclField[int]:
        """SPS packet version in use (1 means v1)."""
        # register holds "version minus one"
        fpga_value = self.sps_packet_version_select.value
        return IclField(
            type_=int,
            value=fpga_value + 1,
            description="SPS SPEAD protocol version",
        )

    @sps_packet_version.setter
    def sps_packet_version(self, version) -> None:
        """
        Select SPS packet version to use.

        Versions other than v2/v3 are still written, but with a warning.

        :param version: packet/protocol version - 1 means v1
        """
        if version not in (SpeadVersion.v2, SpeadVersion.v3):
            warnings.warn(
                f"Using v{version}, but SW only handles v2 and v3 properly. "
                "Here there be dragons!"
            )
        self.sps_packet_version_select = version - 1