Source code for ska_low_sps_tpm_api.plugins.station_beamf
import time
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.boards.tpm_hw_definitions import (
FPGA_FW_8_BIT_SUBARRAY_SUBSTATION,
)
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
__author__ = "Giovanni Comoretto"
[docs]
class StationBeamformer(FirmwareBlock):
"""
Ring (station) beamformer
"""
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("station_beamf")
@maxinstances(2)
def __init__(self, board, logger=None, **kwargs):
"""
StationBeamformer initialiser.
:param board: Pointer to board instance
"""
super(StationBeamformer, self).__init__(board, logger=logger)
if "device" not in list(kwargs.keys()):
raise PluginError("StationBeamformer: Require a node instance")
self._device = kwargs["device"]
if self._device == Device.FPGA_1:
self._device = "fpga1"
elif self._device == Device.FPGA_2:
self._device = "fpga2"
else:
raise PluginError(f"StationBeamformer: Invalid device {self._device}")
# Number of channels, beams, etc
self.max_nof_chans = 384
self.nof_chans = 384 # 192 channels/FPGA for now (384 total)
self.station_id = self.board[self._device + ".beamf_ring.frame_id.station_id"]
# Set some defaults
# These should all be overriden with current firmware values at the end of
# __init__ with calls to _read_channel_table and _read_start_stop_table
self.beam_table = 48 * [0]
self.freq_table = list(range(64, 512 - 64, 8))
self.subarray_table = 64 * [1]
self.subarray_channel_table = list(range(0, 512, 8))
self.subarray_beam_table = 64 * [0]
self.substation_table = 64 * [0]
self.aperture_table = 64 * [0]
self.scan_id_table = 64 * [0]
self.start_frame_table = 64 * [8]
self.stop_frame_table = 64 * [0]
fw_8bit_subarray_substation = (
self.board.tpm_fpga[0]._fpga_firmware >= FPGA_FW_8_BIT_SUBARRAY_SUBSTATION
)
self.subarray_mask = 0xFF if fw_8bit_subarray_substation else 0x3F
self.substation_mask = 0xFF if fw_8bit_subarray_substation else 0x3F
self.subarray_width = 8 if fw_8bit_subarray_substation else 6
# Lots of magic numbers. To be tuned for best performance
# Timing
# TPM to TPM frame time is enough to send all packets for
# the maximum number of channels, (8 packets for
# 4 channels in each FPGA, so 384 packets total),
# in 90% of the time required to receive them.
#
# CSP frame time is a bit less than twice: 4 packets sent every
# every 8 received from the other tiles.
# It is set to 85% of the TPM to TPM frame time, so packets
# may arrived a bit packed together without packet loss
self.clock_frequency = 237.037e6
self.tpm_frame_time = 1080e-9 * 2048 / self.max_nof_chans * 0.9
self.csp_frame_time = self.tpm_frame_time * 2 * 0.85
self.sps_frame_time = 1.08e-6 * 1000 # time of one frame packet in ms
# Corner turner block length in CSP blocks
# 1 means that CSP blocks cycle through all channels
# without further corner turning
self.int_block_length = 1
self.int_block_ovl = 0
# DDR timeout in clock cycles
self.ddr_timeout = 1500
# CSP scaling in bits
self.csp_scaling = self.board[self._device + ".beamf_ring.csp_scaling"]
if self.board[f"{self._device}.beamf_ring.control.first_tile"] == 1:
self.first_tile = True
else:
self.first_tile = False
if self.board[f"{self._device}.beamf_ring.control.last_tile"] == 1:
self.last_tile = True
else:
self.last_tile = False
self._read_channel_table() # tries to read the channel table
self._read_start_stop_table() # read back the start/stop time table
# set default DSP latency threshold for error detection to 10 ms
self.set_dsp_latency_error_threshold(10)
############################################################################
# Defines if a tile is first, last, both or intermediate.
############################################################################
[docs]
def set_first_last_tile(self, is_first, is_last):
"""
Defines if a tile is first, last, both or intermediate
One, and only one tile must be first, and last, in a chain
A tile can be both (one tile chain), or none
:param isFirst: Tile is first in the beamformer chain
:param isLast: Tile is first in the beamformer chain
"""
if self.is_running():
self.logger.warning("Beamformer is running: can not set first/last tile")
return False
self.board[self._device + ".beamf_ring.control.reset"] = 1
if is_first:
self.board[self._device + ".beamf_ring.control.first_tile"] = 1
self.first_tile = True
else:
self.board[self._device + ".beamf_ring.control.first_tile"] = 0
self.first_tile = False
if is_last:
self.board[self._device + ".beamf_ring.control.last_tile"] = 1
self.last_tile = True
else:
self.board[self._device + ".beamf_ring.control.last_tile"] = 0
self.last_tile = False
self.board[self._device + ".beamf_ring.control.reset"] = 0
return True
############################################################################
# Initialize
# Resets hardware.
############################################################################
[docs]
def initialise_beamf(self):
"""
Initialise Station Beamformer
"""
if self.is_running():
self.logger.warning("Beamformer is running: can not initialise")
return False
self._program_timing()
# set global start and stop time in the past, to stop the beamforming
# while other parameters are set
# Beamformer must be started only after synchronization
self.board[self._device + ".beamf_ring.start_frame"] = 8
self.board[self._device + ".beamf_ring.last_frame"] = 0
# set station beamformer number of channels and scaling to default
self.board[self._device + ".beamf_ring.ch_n"] = self.nof_chans
self.board[self._device + ".beamf_ring.csp_scaling"] = self.csp_scaling
# disable flagging by default
self.disable_flagging()
# reset errors
self.board[self._device + ".beamf_ring.control.error_rst"] = 1
self.board[self._device + ".beamf_ring.control.error_rst"] = 0
self.board[self._device + ".beamf_ring.control.reset"] = 0
# If the start&stop table is present, use that.
if self.board.has_register(self._device + ".beamf_ring.start_tab"):
# Initialise beamformer with all beams stopped.
# This fixes an issue where due to the firmware defaults for start_tab
# and stop_tab, the first call to start_beamformer after initialise started
# all configured beams regardless of argument.
self.board[self._device + ".beamf_ring.start_tab"] = 64 * [8]
self.board[self._device + ".beamf_ring.stop_tab"] = 64 * [0]
self._read_start_stop_table()
self.logger.info("StationBeamformer has been initialised")
return True
def _program_start_stop_time(
self,
start_time,
stop_time,
scan_id=0,
channel_groups=range(48),
):
"""
Set the start and stop time, private method.
Set the start and stop time for the channel groups specified in the
channel_groups list.
Packets are transmitted for frame count >= start_time and < stop_time
If multiple start and stop times are not supported in firmware, applies
to all channels.
Channel groups list groups of 8 channels, e.g. group 0 for channels 0-7
Modification applies immediately.
:param start_time: start time, in frame counts (256 channelized samples)
:param stop time: in frame counts
:param scan_id: scan ID for the affected channel groups
:param channel_groups: list of affected channel groups
"""
nof_groups = self.nof_chans // 8
# Program scan_id
for c in channel_groups:
if c < nof_groups:
self.scan_id_table[c] = scan_id
self.start_frame_table[c] = start_time
self.stop_frame_table[c] = stop_time
for i in range(nof_groups, 64):
self.start_frame_table[i] = 8
self.stop_frame_table[i] = 0
if self.board.has_register(self._device + ".beamf_ring.start_tab"):
self.board[self._device + ".beamf_ring.start_tab"] = self.start_frame_table
self.board[self._device + ".beamf_ring.stop_tab"] = self.stop_frame_table
# if global beamformer is stopped, then restart it
# Usually it happens at the first call after synchronisation
if (
self.board[self._device + ".beamf_ring.last_frame"]
<= self.board[self._device + ".beamf_ring.current_frame"]
):
self.board[self._device + ".beamf_ring.start_frame"] = start_time
self.board[self._device + ".beamf_ring.last_frame"] = 0xFFFFFFF8
else: # set global start and stop time
self.board[self._device + ".beamf_ring.start_frame"] = start_time
self.board[self._device + ".beamf_ring.last_frame"] = stop_time
if self.board.has_register(self._device + ".beamf_ring.scan_id_tab"):
self.board[self._device + ".beamf_ring.scan_id_tab"] = self.scan_id_table[
0:nof_groups
]
[docs]
def set_scan_id(self, scan_id=0, channel_groups=range(48)):
"""
Set the scan ID for the selected channel group.
:param scan_id: scan ID for the affected channel groups
:param channel_groups: list of affected channel groups
"""
nof_groups = self.nof_chans // 8
# Program scan_id
for c in channel_groups:
if c < nof_groups:
self.scan_id_table[c] = scan_id
if self.board.has_register(self._device + ".beamf_ring.scan_id_tab"):
self.board[self._device + ".beamf_ring.scan_id_tab"] = self.scan_id_table[
0:nof_groups
]
############################################################################
# Private method to program the channel table
# from the values stored in the object
# Valid only for the last tile
############################################################################
def _program_channels(self):
"""
Private method to program the channel table
from the values stored in the object
Data is actually used only in the last tile,
for other tiles is just stored there
"""
# Change the total number of channels to those used
# TODO: Check that if the number of channels has varied, those which
# have been deleted or added are currently not being transmitted.
#
self.board[self._device + ".beamf_ring.ch_n"] = self.nof_chans
# if not self.last_tile:
# return False
freq_beam_table = [0] * (self.nof_chans // 8)
subarray_table = [0] * (self.nof_chans // 8)
for i in range(self.nof_chans // 8):
subarray_channel = self.subarray_channel_table[i] & 0x1FF
subarray_beam = self.subarray_beam_table[i] & 0x3F
frequency = self.freq_table[i] & 0x1FF
# Substation and Subarray ID is either 6 or 8 bits depending on FW version
substation = self.substation_table[i] & self.substation_mask
subarray = self.subarray_table[i] & self.subarray_mask
freq_beam_table[i] = (
(subarray_channel << (9 + 6)) # 9-bit subarray channel
+ (subarray_beam << 9) # 6-bit subarray beam
+ frequency # 9-bit frequency
)
subarray_table[i] = (substation << self.subarray_width) + subarray
self.board[self._device + ".beamf_ring.freq_beam_tab"] = freq_beam_table
if self.board.has_register(self._device + ".beamf_ring.subarray_tab"):
self.board[self._device + ".beamf_ring.subarray_tab"] = subarray_table
self.board[self._device + ".beamf_ring.scan_id_tab"] = self.scan_id_table[0 : self.nof_chans] # fmt: skip
self.board[self._device + ".beamf_ring.frame_id.antenna_index"] = (self.aperture_table[0]) # fmt: skip
return True
############################################################################
# Read the channel tables and rebuild the internal software tables
############################################################################
def _read_channel_table(self):
"""Read the channel tables and rebuild the internal software tables."""
#
# readback is possible only in firmware which also
# supports start frame table
# if (not self.board[self._device+'.beamf_ring.control.last_tile']
if not self.board.has_register(self._device + ".beamf_ring.start_tab"):
return False
aperture_base = self.station_id * 100
self.nof_chans = self.board[self._device + ".beamf_ring.ch_n"]
freq_beam_table = self.board[self._device + ".beamf_ring.freq_beam_tab"]
subarray_table = self.board[self._device + ".beamf_ring.subarray_tab"]
self.scan_id_table = self.board[self._device + ".beamf_ring.scan_id_tab"]
for i in range(self.nof_chans // 8):
self.subarray_beam_table[i] = (freq_beam_table[i] >> 9) & 0x3F
self.freq_table[i] = freq_beam_table[i] & 0x1FF
self.subarray_channel_table[i] = (freq_beam_table[i] >> 15) & 0x1FF
self.substation_table[i] = (
subarray_table[i] >> self.subarray_width
) & self.substation_mask
self.subarray_table[i] = subarray_table[i] & self.subarray_mask
self.aperture_table[i] = aperture_base + self.substation_table[i]
for i in range(self.nof_chans // 8, 48):
self.subarray_beam_table[i] = 0
self.freq_table[i] = 0
self.beam_table[i] = 0
self.subarray_channel_table[i] = 0
self.substation_table[i] = 0
self.subarray_table[i] = 0
self.aperture_table[i] = 0
return True
[docs]
def read_nof_ch(self):
"""
Read the station beamformer number of channels table, return the read value.
:return: Number of channels
:rtype: int
"""
return self.board[self._device + ".beamf_ring.ch_n"]
############################################################################
# Read the channel tables and rebuild the internal software tables
############################################################################
def _read_start_stop_table(self):
"""Read the start and stop time table."""
#
# readback is possible only in firmware which also
# supports start frame table
if not self.board.has_register(self._device + ".beamf_ring.start_tab"):
return False
self.start_frame_table = self.board[self._device + ".beamf_ring.start_tab"]
self.stop_frame_table = self.board[self._device + ".beamf_ring.stop_tab"]
return True
############################################################################
# Define the channel table, for last tile
# Obsolete, legacy code, should be deleted.
# Replaced by define_channel_table()
############################################################################
[docs]
def defineChannelTable(self, region_array):
"""
Set frequency regions
(legacy version, note the CamelCase name).
Regions are defined in a 2-d array, for a maximum of 16 regions.
Each element in the array defines a region, with
the form [start_ch, nof_ch, beam_index]
- start_ch: region starting channel (currently must be a
multiple of 2, LS bit discarded)
- nof_ch: size of the region: must be multiple of 8 chans
- beam_index: beam used for this region, range [0:8)
Total number of channels must be <= 384
The routine computes the arrays beam_index, region_off, region_sel,
and the total number of channels nof_chans, and programs it in the HW
"""
self.logger.warning(
"Method defineChannelTable is deprecated: " "use define_channel_table"
)
region_idx = 0
self.beam_table = 64 * [0]
self.freq_table = 64 * [0]
for region in region_array:
start_ch = region[0]
reg_length = region[1] & 0x1F8
if reg_length > 384:
raise PluginError(
f"StationBeamformer: Invalid region length in {self._device}"
)
end_ch = start_ch + reg_length
if start_ch < 0 or end_ch > 512:
raise PluginError(
f"StationBeamformer: Invalid region position in {self._device}"
)
reg_length = reg_length // 8
if region_idx + reg_length > 64:
raise PluginError(
f"StationBeamformer: too many channels specified in {self._device}"
)
self.beam_table[region_idx : (region_idx + reg_length)] = [
region[2]
] * reg_length
self.freq_table[region_idx : (region_idx + reg_length)] = list(
range(start_ch, end_ch, 8)
)
region_idx = region_idx + reg_length
self.nof_chans = region_idx * 8
return self._program_channels()
############################################################################
# Define the channel table, for last tile. New version, old kept for legacy
############################################################################
[docs]
def define_channel_table(self, region_array):
"""
Set frequency regions.
Regions are defined in a 2-d array, for a maximum of 16 regions.
Each element in the array defines a region, with the form:
``[start_ch, nof_ch, beam_index, subarray_id,
subarray_logical_ch, aperture_id, substation_id]``
- 0: start_ch: region starting channel (currently must
be a multiple of 2, LS bit discarded).
- 1: nof_ch: size of the region. Must be multiple of
8 chans.
- 2: beam_index: hardware beam ID, unused for station beamformer.
beam_index is kept in the region_array because
the tile beamformer uses beam_index and
region_array is shared between the
tile and station beamformers [0:48)
- 3: subarray_id: ID of the subarray [1:48]
- 4: subarray_logical_channel: Logical channel in the subarray.
it is the same for all (sub)stations
in the subarray.
- 5: subarray_beam_id: ID of the subarray beam.
- 6: substation_ID: ID of the substation
- 7: aperture_id: ID of the aperture
(station*100+substation?)
Total number of channels must be <= 384
The routine computes the arrays beam_index, region_off, region_sel,
and the total number of channels nof_chans, and programs it in the hardware.
:param region_array: bidimensional array, one row for each
spectral region, 3 or 8 items long
:return: True if OK
:raises PluginError: if parameters are illegal
"""
region_idx = 0
self.beam_table = 64 * [0]
self.freq_table = 64 * [0]
self.subarray_table = 64 * [0]
self.subarray_channel_table = list(range(0, 512, 8))
self.subarray_beam_table = 64 * [0]
self.substation_table = 64 * [1]
self.aperture_table = 64 * [0]
if len(region_array[0]) != 8: # not full table
raise PluginError(
f"StationBeamformer: Invalid region array length in {self._device}."
)
self.beam_table = 64 * [0]
self.freq_table = 64 * [0]
for region in region_array:
start_ch = region[0] & 0x1FE
nof_chans = region[1] & 0x1F8
if nof_chans > 384:
raise PluginError(
f"StationBeamformer: Invalid region length in {self._device}"
)
end_ch = start_ch + nof_chans
if start_ch < 0 or end_ch > 512:
raise PluginError(
f"StationBeamformer: Invalid region position in {self._device}"
)
reg_length = nof_chans // 8
if region_idx + reg_length > 64:
raise PluginError(
f"StationBeamformer: too many channels specified in {self._device}"
)
self.beam_table[region_idx : (region_idx + reg_length)] = [
region[2]
] * reg_length
self.freq_table[region_idx : (region_idx + reg_length)] = list(
range(start_ch, end_ch, 8)
)
subarray_id = region[3] & self.subarray_mask
start_subarray_ch = region[4] & 0x1F8
subarray_beam_id = region[5] & 0x3F
substation_id = region[6] & self.substation_mask
aperture_id = region[7] & 0xFFFF
self.subarray_table[region_idx : (region_idx + reg_length)] = [
subarray_id
] * reg_length
self.subarray_channel_table[region_idx : (region_idx + reg_length)] = list(
range(start_subarray_ch, start_subarray_ch + nof_chans, 8)
)
self.subarray_beam_table[region_idx : (region_idx + reg_length)] = [
subarray_beam_id
] * reg_length
self.substation_table[region_idx : (region_idx + reg_length)] = [
substation_id
] * reg_length
self.aperture_table[region_idx : (region_idx + reg_length)] = [
aperture_id
] * reg_length
region_idx = region_idx + reg_length
self.nof_chans = region_idx * 8
return self._program_channels()
############################################################################
def _program_timing(self):
"""
Private method to set the timing registers
from constants set during initialization (or modified afterwards)
:return: False if the beamformer is running, True if OK
"""
if self.is_running():
self.logger.warning("Beamformer is running: can not set timing")
return False
self.board[self._device + ".beamf_ring.frame_rate.first_tile"] = int(
round(self.tpm_frame_time * self.clock_frequency)
)
self.board[self._device + ".beamf_ring.frame_rate.last_tile"] = int(
round(self.csp_frame_time * self.clock_frequency)
)
self.board[self._device + ".beamf_ring.timeout"] = self.ddr_timeout
self.board[self._device + ".beamf_ring.frame_timing.int_block_len"] = (
self.int_block_length
)
self.board[self._device + ".beamf_ring.frame_timing.int_block_ovl"] = (
self.int_block_ovl
)
return True
############################################################################
# Define the SPEAD header, for last tile
# With subarray enabled firmware, most values are specified in set_regions
# Only stationId and refEpoch are specified here.
############################################################################
[docs]
def define_spead_header(
self, stationId, subarrayId=0, apertureId=0, refEpoch=-1, startTime=0
):
"""
Define_spead_header() used to define SPEAD header for last tile
requires stationId, subarrayId and apertureId from LMC
Only stationId is require.
:param stationId: ID of the station: 1-512
:param subarrayId: ID of the subarray. can be overrided by defineChannelTable
:param apertureId: ID of the aperture.
:param refEpoch: Reference peoch. -1 (default) uses value already i
defined in set_epoch()
:param startTime: (in seconds): offset from frame time, default 0
:return: True if OK
"""
if self.is_running():
self.logger.warning("Beamformer is running: can not set global header")
return False
self.board[self._device + ".beamf_ring.frame_id.station_id"] = stationId
self.station_id = stationId
if apertureId == 0:
apertureId = self.aperture_table[0]
else:
self.aperture_table = [apertureId] * 64
self.board[self._device + ".beamf_ring.frame_id.antenna_index"] = apertureId
if subarrayId == 0:
subarrayId = self.subarray_table[0]
else:
self.subarray_table = [subarrayId] * 64
# self.board[self._device+'.beamf_ring.frame_id.sub_array_id'] = subarrayId
if refEpoch != -1:
self.set_epoch(refEpoch)
if self.board.memory_map.has_register(self._device + ".beamf_ring.start_time"):
self.board[self._device + ".beamf_ring.start_time"] = (
int(startTime * 1e9) & 0xFFFFFFFF
)
return True
############################################################################
# Set the Unix epoch in seconds since Unix reference time
############################################################################
[docs]
def set_epoch(self, epoch):
"""
Set the Unix epoch in seconds since Unix reference time
:param epoch: Unix time for reference time (TPM synch time) 48 bit int
:return: True if OK
"""
epoch = epoch & 0xFFFFFFFFFF
self.board[self._device + ".beamf_ring.ref_epoch_lo"] = epoch & 0xFFFFFFFF
self.board[self._device + ".beamf_ring.ref_epoch_hi"] = (epoch >> 32) & 0xFF
return True
############################################################################
# Get the channel table
############################################################################
[docs]
def get_channel_table(self):
"""
Returns a table with the following entries for each 8-channel block:
- 0: start physical channel (64-440)
- 1: beam_index: hardware beam ID, unused for station beamformer. beam_index is
kept in the region_array because the tile beamformer uses beam_index and
region_array is shared between the tile and station beamformers [0:48)
- 2: subarray_id: ID of the subarray [1:48]
Here is the same for all channels
- 3: subarray_logical_channel: Logical channel in the subarray
Here equal to the station logical channel
- 4: subarray_beam_id: ID of the subarray beam
- 5: substation_id: ID of the substation
- 6: aperture_id: ID of the aperture (station*100+substation?)
:return: Nx7 table with one row every 8 channels
"""
self._read_channel_table()
nof_blocks = self.nof_chans // 8
table = []
for block in range(nof_blocks):
table.append(
[
self.freq_table[block],
self.beam_table[block],
self.subarray_table[block],
self.subarray_channel_table[block],
self.subarray_beam_table[block],
self.substation_table[block],
self.aperture_table[block],
]
)
return table
############################################################################
# Recreate region_array
############################################################################
[docs]
def get_regions(self):
"""
Get frequency regions.
Regions are defined in a 2-d array, for a maximum of 16 (48) regions.
Each element in the array defines a region, with the form
``[start_ch, nof_ch, beam_index, subarray_id,
subarray_logical_ch, aperture_id, substation_id]``
- 0: start_ch: region starting channel (currently must
be a multiple of 2, LS bit discarded).
- 1: nof_ch: size of the region. Must be multiple of
8 chans.
- 2: beam_index: hardware beam ID, unused for station beamformer.
beam_index is kept in the region_array because
the tile beamformer uses beam_index and
region_array is shared between the
tile and station beamformers [0:48)
- 3: subarray_id: ID of the subarray [1:48]
- 4: subarray_logical_channel: Logical channel in the subarray.
it is the same for all (sub)stations
in the subarray.
- 5: subarray_beam_id: ID of the subarray beam.
- 6: substation_ID: ID of the substation
- 7: aperture_id: ID of the aperture
(station*100+substation?)
:return: Bidimensional array of regions
:rtype: list(list(int))
"""
channel_table = self.get_channel_table()
nof_ch = []
chan_tab_idx = [0]
region_array = []
tmp_beam_index = channel_table[0][1]
reg_cnt = 0
# Find nof_ch
# Find beam_index transitions which depict start of new region
for i, row in enumerate(channel_table):
beam_index = row[1]
if tmp_beam_index != beam_index:
nof_ch.append(reg_cnt)
tmp_beam_index = beam_index
reg_cnt = 0
reg_cnt += 8
nof_ch.append(reg_cnt)
# Find the channel table indexes where start of regions are located
for i in range(1, len(nof_ch)):
region_start = sum(nof_ch[j] // 8 for j in range(i))
chan_tab_idx.append(region_start)
# Find region_array information from channel_table
for nof_ch_idx, i in enumerate(chan_tab_idx):
(
start_ch,
beam_index,
subarray_id,
subarray_logical_channel,
subarray_beam_id,
substation_id,
aperture_id,
) = channel_table[i]
region_array.append(
[
start_ch,
nof_ch[nof_ch_idx],
beam_index,
subarray_id,
subarray_logical_channel,
subarray_beam_id,
substation_id,
aperture_id,
]
)
return region_array
############################################################################
# Set output rounding for CSP
############################################################################
[docs]
def set_csp_rounding(self, rounding):
"""
Sets the number of bits rounded off before sending the result to the CSP.
For white noise it should be ``log2(sqrt(nof_antennas))``,
i.e. 4 for 256 antennas
:param rounding: Either scalar or list, of number of bits rounded off
before sending the result to the CSP. If list, only 1st element
is used. In future firmware, one value per channel.
:return: True if OK
"""
if isinstance(rounding, list):
round_int = rounding[0]
else:
round_int = rounding
if round_int < 0:
round_int = 0
if round_int > 7:
round_int = 7
self.csp_rounding = round_int
self.board[f"{self._device}.beamf_ring.csp_scaling"] = round_int
return True
############################################################################
# Get output rounding value for CSP
############################################################################
[docs]
def get_csp_rounding(self):
"""
Reads the csp rounding value stored in the firmware register
:return: CSP rounding value from device firmware register
:rtype: int
"""
return self.board[f"{self._device}.beamf_ring.csp_scaling"]
############################################################################
# Return current frame
############################################################################
[docs]
def current_frame(self):
"""
Current frame as seen by the station beamformer.
:return: current frame, in units of 256 ADC frames (276,48 us)
"""
return self.board[self._device + ".beamf_ring.current_frame"]
############################################################################
# Enable/Disable transmission on incomplete frames (data will be flagged)
############################################################################
[docs]
def enable_flagging(self):
"""
This enables the transmission of incomplete frames, any packets in the
frame that are missing will be substituted for the reserved value
(flagged).
"""
self.board[f"{self._device}.beamf_ring.control.incomplete_frame_enable"] = 0x1
[docs]
def disable_flagging(self):
"""
This disables the transmission of incomplete frames, if a frame is not
complete, the entire frame will be dropped. No flagging will occur and
this will appear as packet loss to CSP.
"""
self.board[f"{self._device}.beamf_ring.control.incomplete_frame_enable"] = 0x0
[docs]
def is_flagging_enabled(self):
"""
Return True if station beam data flagging is enabled
:return: is station beam flag enabled (bool)
"""
return (
self.board[f"{self._device}.beamf_ring.control.incomplete_frame_enable"] > 0
)
############################################################################
# Start the beamformer
############################################################################
[docs]
def start(
self,
start_time=0,
duration=-1,
scan_id=0,
channel_groups=range(48),
):
"""
Starts an integration.
The integration is specified in units of 256 ADC frames
from start_frame (included) to stop_frame (excluded).
Default for stop_frame is -1 = "forever".
:param start_time: first frame (as seen by current_frame) in integration
:param duration: Integration duration, in frames. If -1, forever
:param channel_groups: channel groups to start, default all
:return: True if OK, False if not possible (integration already active)
"""
# Program start and stop times
if start_time == 0:
start_time = self.current_frame() + 48
start_time &= 0xFFFFFFF8
if duration == -1:
end_time = 0xFFFFFFFF
else:
duration &= 0xFFFFFFF8
end_time = start_time + duration # +1
self._program_start_stop_time(start_time, end_time, scan_id, channel_groups)
self.board[self._device + ".beamf_ring.control.reset"] = 0
return True
############################################################################
[docs]
def is_running(self, channel_groups=range(48)):
"""
Check if the beamformer is still running.
Compares current frame to programmed start and last frame
:param channel_groups: list of the channels to check
:return: True if beamformer is running in one of the selected channels
"""
current_frame = self.current_frame()
if self.board.has_register(self._device + ".beamf_ring.start_tab"):
# check individual channels start and stop time
running = False
for c in channel_groups:
if c < 64:
started = current_frame >= self.start_frame_table[c]
stopped = current_frame >= self.stop_frame_table[c]
running = running or (started and not stopped)
else:
running = True
# Check global start and stop time
started = current_frame >= self.board[self._device + ".beamf_ring.start_frame"]
stopped = current_frame >= self.board[self._device + ".beamf_ring.last_frame"]
global_running = started and not stopped
return running and global_running
############################################################################
[docs]
def abort(self, channel_groups=None):
"""
Stop the beamformer
:param channel_groups: Groups of channels which must stop the scan.
If None, stop all channels and the whole beamformer chain.
If it is an empty list, do nothing
:type channel_groups: list | None
:return: True if OK
"""
if channel_groups is None:
self._program_start_stop_time(8, 0, 0, range(48))
self.board[self._device + ".beamf_ring.start_frame"] = 8
self.board[self._device + ".beamf_ring.last_frame"] = 0
else:
self._program_start_stop_time(8, 0, 0, channel_groups)
return True
############################################################################
# Report errors
############################################################################
[docs]
def report_errors(self):
"""
:return: error flags
"""
# Error Flags
self.errors = (self.board[f"{self._device}.beamf_ring.errors"]) & 0x3FF
return self.errors
[docs]
def get_discarded_or_flagged_packet_count(self):
"""
When station beam flagging disabled, count of packets discarded.
Will always be a multiple of 8. When station beam flagging enabled,
count of packets substitued (flagged).
:return packet count
"""
if self.board.memory_map.has_register(
f"{self._device}.beamf_ring.discarded_or_flagged_packet_count"
):
return self.board[
f"{self._device}.beamf_ring.discarded_or_flagged_packet_count"
]
[docs]
def get_dsp_latency(self):
"""
Returns the DSP signal chain latency in ms
The DSP latency is calculated by measuring the difference between the SPEAD
transmission timestamp in the station beamformer and the ADC sampling timestamp
for the same packet.
NOTE: if this is run on on tile which is not the final one a latency of zero
will be returned
:return: DSP latency in ms
:rtype: float
"""
latency_reg = f"{self._device}.beamf_ring.dsp_latency"
if not self.board.memory_map.has_register(latency_reg):
return None
return self.board[latency_reg] * self.sps_frame_time
[docs]
def set_dsp_latency_error_threshold(self, latency_threshold):
"""
Sets the latency threshold in ms.
If any spead packet is sent out with a higher DSP latency than this the
latency error flag goes high.
param latency_threshold: DSP latency threshold in ms
:type latency_threshold: float
"""
latency_reg = f"{self._device}.beamf_ring.dsp_latency"
if not self.board.memory_map.has_register(latency_reg):
return
latency_threshold_reg = f"{self._device}.beamf_ring.latency_threshold"
self.board[latency_threshold_reg] = int(
latency_threshold // self.sps_frame_time
)
[docs]
def clear_dsp_latency_error(self):
"""
Clears the dsp latency error.
"""
if self.board.memory_map.has_register(f"{self._device}.beamf_ring.dsp_latency"):
self.board[f"{self._device}.beamf_ring.latency_reset"] = 1
[docs]
def get_dsp_latency_error(self):
"""
Returns False if DSP latency has exceeded the DSP latency threshold.
Default threshold is 10 ms.
:return: DSP latency error
:rtype: bool
"""
latency_reg = f"{self._device}.beamf_ring.dsp_latency"
if not self.board.memory_map.has_register(latency_reg):
return None
return self.board[f"{self._device}.beamf_ring.latency_error"] > 0
[docs]
def get_dsp_output_spead_timestamp(self):
"""
Returns the output spead timestamp of firmware in UTCs
:return: output spead timestamp in UTC
:rtype: integer
"""
latency_reg = f"{self._device}.beamf_ring.dsp_latency"
if not self.board.memory_map.has_register(latency_reg):
return None
else:
sync_time_val = self.board[f"{self._device}.pps_manager.sync_time_val"]
out_timestamp_reg = f"{self._device}.beamf_ring.output_spead_timestamp"
output_spead_timestamp = self.board[out_timestamp_reg]
return sync_time_val + output_spead_timestamp * self.sps_frame_time
[docs]
def clear_errors(self):
"""Clear frame errors and general error flag."""
self.board[f"{self._device}.beamf_ring.control.error_rst"] = 1
self.board[f"{self._device}.beamf_ring.control.error_rst"] = 0
return
[docs]
def check_ddr_parity_error_counter(self):
return self.board[f"{self._device}.beamf_ring.errors.ddr_parity_error_cnt"]
############################################################################
# Some default methods
############################################################################
[docs]
def status_check(self):
"""
Perform status check
Checks if framing errors are present
:return: Status
"""
self.logger.info("StationBeamformer: Checking status")
self.frame_errors = 0 # Removed from firmware
self.errors = (self.board[self._device + ".beamf_ring.error"]) & 0x3FF
if self.frame_errors == 0:
return Status.OK
self.logger.info(f"StationBeamformer: Frame errors {self.frame_errors}")
return Status.Error
[docs]
def clean_up(self):
"""
Perform cleanup
:return: Success
"""
self.logger.info("StationBeamformer : Cleaning up")
return True