from math import cos, fabs, sin
import numpy as np
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import to_signed
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
__author__ = "Carolina Belli"
[docs]
class BeamfFD(FirmwareBlock):
"""Frequency Domain Beamformer plugin"""
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("beamf_fd")
@maxinstances(2)
def __init__(self, board, logger=None, **kwargs):
    """
    BeamformerFD initialiser.

    Selects the FPGA driven by this instance, creates the software copies
    of the region tables and of the (obsolete) calibration quantities, and
    probes the firmware for optional features.

    :param board: Pointer to board instance
    :param logger: Logger forwarded to the FirmwareBlock initialiser
    :param kwargs: Must contain "device" (Device.FPGA_1 or Device.FPGA_2)
    :raises PluginError: If "device" is missing or is not a known FPGA
    """
    super(BeamfFD, self).__init__(board, logger=logger)
    if "device" not in kwargs:
        raise PluginError("BeamfFD: Require a node instance")
    self._device = kwargs["device"]
    if self._device == Device.FPGA_1:
        self._device = "fpga1"
    elif self._device == Device.FPGA_2:
        self._device = "fpga2"
    else:
        raise PluginError(f"BeamfFD: Invalid device {self._device}")

    # Software copies of the hardware region tables
    self.beam_index = [0] * 64
    self.region_off = [0] * 64
    self.region_sel = [0] * 64
    self.nof_chans = 384  # True number of channels
    self.nof_hw_chans = 384  # FIXED number of chans in output packet.

    # Number of independent channel blocks, taken from the size of the
    # region_off table; falls back to 16 when the register cannot be found.
    # NOTE(review): this and the feature probes below always query "fpga1",
    # even when this instance drives fpga2 - presumably both FPGAs run the
    # same firmware; confirm.
    try:
        region_off_reg = self.board.find_register("fpga1.beamf_fd.region_off")[0]
        self.nof_regions = region_off_reg.size
    except Exception:
        self.nof_regions = 16

    # Calibration quantities.
    # Every nesting level is built with a comprehension so that each beam,
    # antenna and channel owns its own list. The "[...] * 8" form would make
    # all beams share one row, so load_cal_curve() for one beam would change
    # the curve of every beam.
    self.cal_curve = [
        [[[1 + 0j, 0 + 0j, 0 + 0j, 1 + 0j] for _ in range(512)] for _ in range(8)]
        for _ in range(8)
    ]
    self.antenna_tapering = [1.0] * 8
    self.beam_angle = [0.0] * 8
    # Rotation matrix, one independent [xx, xy, yx, yy] row per beam
    self.rot_matrix = [[1.0, 0.0, 0.0, 1.0] for _ in range(8)]

    # Coefficient scale: 2048 when the tile_beamformer_implemented feature
    # register can be read, 128 otherwise
    try:
        self.board["fpga1.dsp_regfile.feature.tile_beamformer_implemented"]
        self.scale = 2048
    except Exception:
        self.scale = 128

    # cal_data read is not supported on older Firmware versions
    # Pre Firmware change cal_data table size is 512 while new Firmware size is 1024
    self._cal_data_read_supported = bool(
        self.board.find_register("fpga1.beamf_cal.cal_data")[0].size == 1024
    )
    # cal coefficient addressing scheme after JANUS-300. Channel-centric
    self._cal_data_by_channel_supported = bool(
        self.board["fpga1.beamf_cal.date_code"] > 0x20251201
    )
    self._48_beams_supported = self.board.has_register(
        f"{self._device}.beamf_fd.delay_antenna_0"
    )
    # Define Max number of Beams supported
    self.max_beams = 48 if self._48_beams_supported else 8
############################################################################
[docs]
def initialise_beamf(self):
    """
    Initialise Frequency Domain Beamformer.

    Resets the beamformer, clears the region/beam tables and the beam 0
    delays in hardware, restores the default calibration quantities, then
    recomputes and loads the calibration coefficients and forces a
    calibration bank switch.

    The software copies of the region tables are cleared together with the
    hardware tables, so that compute_calibration_coefs() works on the same
    (empty) region layout that has just been programmed.
    """
    # Reset beamformer
    self.board[f"{self._device}.beamf_fd.control.reset"] = 1
    self.board[f"{self._device}.beamf_fd.control.reset"] = 0
    # Load Delay
    self.board[f"{self._device}.beamf_fd.control.load_delay"] = 0
    # Load Immediate Delay
    self.board[f"{self._device}.beamf_fd.control.load_delay_immediate"] = 0
    # Number of processed channels, programmed in units of 8 channels (384/8)
    self.nof_chans = 384
    self.nof_hw_chans = 384
    self.board[f"{self._device}.beamf_fd.nof_chans"] = self.nof_hw_chans // 8
    # Test point select
    self.board[f"{self._device}.beamf_fd.tp_sel"] = 0
    # Load time as frame number
    self.board[f"{self._device}.beamf_fd.load_time"] = 0

    # Region select, region offset and beam index tables: clear the software
    # copies and program the same values in hardware
    self.region_sel = [0] * 64
    self.region_off = [0] * self.nof_regions
    self.beam_index = [0] * self.nof_regions
    self.board[f"{self._device}.beamf_fd.region_sel"] = self.region_sel
    self.board[f"{self._device}.beamf_fd.region_off"] = self.region_off
    self.board[f"{self._device}.beamf_fd.beam_index"] = self.beam_index

    # Antenna delay and delay rate from beam X antenna Y
    # Initialise Beam 0 for all Antennas
    del_x_y_init = [0] * 8
    if self._48_beams_supported:
        for antenna in range(8):
            self.board.write_register(
                register=f"{self._device}.beamf_fd.delay_antenna_{antenna}",
                values=del_x_y_init[antenna],
                offset=0,
            )
    else:
        self.board[f"{self._device}.beamf_fd.delay_0_0"] = del_x_y_init

    # Default calibration quantities. Comprehensions give every beam and
    # antenna its own list; "[...] * 8" would alias one row across all beams.
    self.cal_curve = [
        [
            [[complex(1.0), complex(0.0), complex(0.0), complex(1.0)] for _ in range(512)]
            for _ in range(8)
        ]
        for _ in range(8)
    ]
    self.antenna_tapering = [1.0] * 8
    self.beam_angle = [0.0] * 8
    # Rotation matrix
    self.rot_matrix = [[1.0, 0.0, 0.0, 1.0] for _ in range(8)]

    self.compute_calibration_coefs()
    self.switch_calibration_bank(force=True)
    self.logger.info("BeamfFD has been initialised")
[docs]
def current_frame(self):
    """
    Return current frame processed by the tile beamformer.

    The value is read from the beamf_cal.current_time register of this FPGA.

    :returns: current frame number, in units of 256 ADC frames (276.48 us)
    """
    frame_register = f"{self._device}.beamf_cal.current_time"
    return self.board[frame_register]
# Set regions. The method computes the total number of channels and the arrays and writes in the registers
[docs]
def set_regions(self, region_array):
    """
    Set frequency regions.

    Regions are defined in a 2-d array. The maximum number of regions is
    self.nof_regions (the size of the hardware region_off table).
    Each element in the array defines a region, with the form
    [start_ch, nof_ch, beam_index].

    - start_ch: region starting channel
    - nof_ch: size of the region. It is masked with 0x1F8, i.e. truncated
      to a multiple of 8 channels
    - beam_index: beam used for this region, range [0:max_beams)

    Total number of channels must be <= 384.

    The routine computes the arrays beam_index, region_off, region_sel,
    and the total number of channels nof_chans, and programs the three
    tables in the HW. All regions are validated before any software or
    hardware table is modified.

    :param region_array: Bidimensional array of regions
    :raises PluginError: If there are too many regions, or a region has an
        invalid length, position or beam index, or the total number of
        channels exceeds 384
    """
    region_start = 0
    beam_index = [0] * self.nof_regions
    region_off = [0] * self.nof_regions
    region_sel = [0] * 64
    for region_idx, region in enumerate(region_array):
        # Reject extra regions explicitly instead of indexing past the tables
        if region_idx >= self.nof_regions:
            raise PluginError(f"BeamfFD: Too many regions in {self._device}")
        start_ch = region[0]
        reg_length = region[1] & 0x1F8
        if reg_length > 384:
            raise PluginError(
                f"BeamfFD: Invalid region length {reg_length} in {self._device}"
            )
        if start_ch < 0 or start_ch + reg_length > 512:
            raise PluginError(f"BeamfFD: Invalid region position in {self._device}")
        beam = region[2]
        if beam >= self.max_beams or beam < 0:
            raise PluginError(f"BeamfFD: Invalid beam index in {self._device}")
        # Check the running total BEFORE filling region_sel: the table has
        # only 64 entries, so filling first could raise IndexError instead
        # of the intended PluginError
        if region_start + reg_length > 384:
            raise PluginError(
                f"BeamfFD: Invalid number of channels in {self._device}"
            )
        beam_index[region_idx] = beam
        # Offset between the physical start channel and the position of the
        # region in the (contiguous) beamformer output
        region_off[region_idx] = start_ch - region_start
        first_block = region_start // 8
        for j in range(reg_length // 8):
            region_sel[first_block + j] = region_idx
        region_start += reg_length

    self.region_sel = region_sel
    self.region_off = region_off
    self.beam_index = beam_index
    # Patch: changing nof_chans on the run desynchronizes the two FPGAs
    # Hardware nof_chans MUST stay constant, only the software value changes
    self.nof_chans = region_start
    self.board[f"{self._device}.beamf_fd.region_sel"] = self.region_sel
    self.board[f"{self._device}.beamf_fd.region_off"] = self.region_off
    self.board[f"{self._device}.beamf_fd.beam_index"] = self.beam_index
[docs]
def get_regions(self):
    """
    Get frequency regions and rebuild internal software tables.

    Reads the region_sel, region_off and beam_index tables from the
    hardware, stores them in the software copies and reconstructs the
    region array from them. Nothing is written to the hardware.

    Each element in the returned array defines a region, with
    the form [start_ch, nof_ch, beam_index]

    - start_ch: region starting channel
    - nof_ch: size of the region, a multiple of 8 channels
    - beam_index: beam used for this region

    The number of regions is derived from the highest region index found
    in region_sel. Counting non-zero region_off entries is not reliable,
    because a region may legitimately have a zero offset, e.g. when the
    first region starts at channel 0.

    Unused region_sel entries are 0, so when only region 0 is in use its
    length cannot be recovered and is reported as 384.

    :return: Bidimensional array of regions
    :rtype: list(list(int))
    """
    self.region_sel = list(self.board[f"{self._device}.beamf_fd.region_sel"])
    self.region_off = self.board[f"{self._device}.beamf_fd.region_off"]
    self.beam_index = self.board[f"{self._device}.beamf_fd.beam_index"]

    # Never report more regions than the region_off table can describe
    nof_regions = min(max(self.region_sel, default=0) + 1, len(self.region_off))

    region_array = []
    region_start = 0  # position of the current region in the output channels
    for region_idx in range(nof_regions):
        if region_idx == 0:
            # Region 0 owns the leading zeros of region_sel; trailing
            # (unused) zeros must not be attributed to it
            nof_ch = 0
            for sel in self.region_sel:
                if sel != 0:
                    break
                nof_ch += 8
            nof_ch = min(nof_ch, 384)
        else:
            nof_ch = self.region_sel.count(region_idx) * 8
        # region_off stores (start_ch - position in output), see set_regions
        start_ch = self.region_off[region_idx] + region_start
        region_array.append([start_ch, nof_ch, self.beam_index[region_idx]])
        region_start += nof_ch
    return region_array
# Set delay for a beam
[docs]
def set_delay(self, delay_array, beam_index):
    """
    Program the delay and delay rate of each antenna for one beam.

    The method specifies the delay in seconds and the delay rate in
    seconds/seconds. The delay_array specifies the delay and delay rate for
    each antenna. beam_index specifies which beam is described, in the
    range 0 to max_beams - 1.
    Delay is updated inside the delay engine at the time specified
    by method load_delay.

    Each value is converted to hardware units and packed in one 32-bit
    word per antenna: delay as a signed 20-bit field in bits [31:12], delay
    rate as a signed 12-bit field in bits [11:0]. Nothing is written to the
    hardware unless all 8 antennas hold valid values.

    :param delay_array: bidimensional array [8,2]. Each row contains 2 elements,
        as delay (in s) and delay rate (in s/s).
        One delay is specified for both polarization in each antenna
    :param beam_index: hardware station beam to program
    :raises PluginError: If beam_index is not an integer in range, or a
        delay / delay rate does not fit its signed hardware field
    """
    # Valid beam_index input data checks
    if not isinstance(beam_index, int):
        raise PluginError(
            f"BeamfFD: Invalid Beam Index format ({beam_index}) - must be an integer"
        )
    elif beam_index >= self.max_beams or beam_index < 0:
        raise PluginError(
            f"BeamfFD: Invalid Beam Index ({beam_index}) in {self._device} - Beam Index must be from 0 to {self.max_beams -1}"
        )
    # Checking valid delay_array data and building the register words
    delay_x_y = [0] * 8
    for antenna in range(8):
        # Convert seconds and s/s to hardware units
        delay_hw = int(round(delay_array[antenna][0] * (2**23 / 1280e-9)))
        rate_hw = int(
            round(delay_array[antenna][1] * ((1024.0 * 1080e-9 * 2**37) / 1280e-9))
        )
        # Two's complement ranges: [-2**19, 2**19) and [-2**11, 2**11).
        # The positive limit is exclusive: +2**19 (or +2**11) would be
        # stored as the most negative value, i.e. with the wrong sign.
        delay_ok = -(2**19) <= delay_hw < 2**19
        rate_ok = -(2**11) <= rate_hw < 2**11
        if not (delay_ok and rate_ok):
            raise PluginError(
                f"BeamfFD: Invalid Delay - Delay rate in {self._device}"
            )
        # Concatenate delay and delay rate into a 32-bit register:
        # Delay[31:12] - (20 MSB)
        # Delay_Rate[11:0] - (12 LSB)
        delay_x_y[antenna] = ((delay_hw & 0xFFFFF) << 12) | (rate_hw & 0xFFF)
    if self._48_beams_supported:
        for antenna in range(8):
            # The delays for all the beams are stored per antenna: one
            # register per antenna, with the beam index as write offset.
            self.board.write_register(
                register=f"{self._device}.beamf_fd.delay_antenna_{antenna}",
                values=delay_x_y[antenna],
                offset=beam_index,
            )
    else:
        self.board[f"{self._device}.beamf_fd.delay_{beam_index}_0"] = delay_x_y
# Gets the delay values by reading the registers
[docs]
def get_delay(self, beam_index):
    """
    Retrieve the approximate delay and delay rate of all 8 antennas for a beam.

    The delay registers of the given beam are read back and converted to
    seconds and seconds/second with the inverse of the conversion used in
    set_delay(). The values will not exactly match the ones passed to
    set_delay(), because of rounding and type conversion.

    :param beam_index: hardware station beam to read, from 0 to max_beams - 1
    :type beam_index: int
    :return: A list of the delay and delay rate of all 8 antennas for the
        given beam index
    :rtype: list(list(float, float) * 8)
    :raises PluginError: If beam_index is not an integer in range
    """
    # Valid input data checks
    if not isinstance(beam_index, int):
        raise PluginError(
            f"BeamfFD: Invalid Beam Index format ({beam_index}) - must be an integer"
        )
    if beam_index >= self.max_beams or beam_index < 0:
        raise PluginError(
            f"BeamfFD: Invalid Beam Index ({beam_index}) in {self._device} - Beam Index must be from 0 to {self.max_beams -1}"
        )

    # Read the raw register words, one per antenna
    if self._48_beams_supported:
        # One register per antenna holds the delays of all beams; read a
        # single row (n=1) at the offset of the requested beam
        raw_words = [
            self.board.read_register(
                register=f"{self._device}.beamf_fd.delay_antenna_{antenna}",
                n=1,
                offset=beam_index,
            )
            for antenna in range(8)
        ]
    else:
        raw_words = self.board.read_register(
            f"{self._device}.beamf_fd.delay_{beam_index}_0", 8
        )

    # Split each word into its fields and convert back to physical units:
    # delay is signed 20 bits [31:12], delay rate is signed 12 bits [11:0]
    delays = []
    for antenna in range(8):
        word = raw_words[antenna]
        delay_field = (word >> 12) & 0xFFFFF
        rate_field = word & 0xFFF
        delay_s = to_signed(delay_field, 20) * (1280.0e-9 / 2**23)
        rate_s_per_s = to_signed(rate_field, 12) * (
            1280e-9 / (1024.0 * 1080e-9 * 2**37)
        )
        delays.append([delay_s, rate_s_per_s])
    return delays
# loads the delay
[docs]
def load_delay(self, load_time=0, beam_mask=0xFFFFFFFFFFFF):
    """
    Transfer the programmed delays to the delay computing hardware.

    With load_time = 0 the load_delay_immediate control bit is pulsed and
    the delays are transferred immediately. Otherwise load_time is written
    to the hardware and the load_delay control bit is pulsed, so that the
    transfer takes place at the prescribed frame number.

    :param load_time: Prescribed load time (frame number). 0 = immediate
    :param beam_mask: beams to be activated. Default: all. unsupported in this FW
    """
    if load_time == 0:
        strobe = f"{self._device}.beamf_fd.control.load_delay_immediate"
    else:
        self.board[f"{self._device}.beamf_fd.load_time"] = load_time
        strobe = f"{self._device}.beamf_fd.control.load_delay"
    # Pulse the selected control bit: set it, then clear it
    for level in (1, 0):
        self.board[strobe] = level
# Calibration
[docs]
def load_calibration(self, antenna, calibration_coefs):
    """
    Load the calibration coefficients of one antenna.

    calibration_coefs is a bidimensional complex array of the form
    calibration_coefs[channel, polarization], with each element representing
    a normalized coefficient, with (1.0, 0.0) the normal, expected response for
    an ideal antenna.

    Channel is the index specifying the channels at the beamformer output,
    i.e. considering only those channels actually processed and beam assignments.

    The polarization index ranges from 0 to 3.

    - 0: X polarization direct element
    - 1: X->Y polarization cross element
    - 2: Y->X polarization cross element
    - 3: Y polarization direct element

    The calibration coefficients may include any rotation matrix (e.g.
    the parallactic angle), but do not include the geometric delay.

    The work is delegated to the loader matching the coefficient addressing
    scheme of the firmware (channel-centric or antenna-centric).

    :param antenna: Antenna number. Integer in range 0:8
    :param calibration_coefs: Calibration coefficients. array [384, 4]
    """
    loader = (
        self._load_calibration_v2
        if self._cal_data_by_channel_supported
        else self._load_calibration_v1
    )
    loader(antenna, calibration_coefs)
def _load_calibration_v1(self, antenna, calibration_coefs):
    """
    Load calibration coefficients, physical bank order.

    For each of the 4 polarization elements, the coefficients of all the
    channels are converted to hardware format and written as one table.

    block_sel register contains antenna index (bits 4-2) and
    polarization (bits 1-0)
    cal_data is addressed by channel (bits 8-0) and bank select
    (bit 9, 1= active, 0 = staged coefficients bank)

    :param antenna: Antenna number
    :param calibration_coefs: Calibration coefficients, indexed as
        [channel][polarization]
    """
    to_hw = self._complex_coef_to_hw_coef
    for pol in range(4):
        # One hardware word per channel for this polarization element
        words = [to_hw(channel_coefs[pol]) for channel_coefs in calibration_coefs]
        self.board[f"{self._device}.beamf_cal.block_sel"] = antenna * 4 + pol
        self.board[f"{self._device}.beamf_cal.cal_data"] = words
def _load_calibration_v2(self, antenna, calibration_coefs):
    """
    Load calibration coefficients, one bank every 16 channels.

    Firmware calibration coefficients addressing after JANUS-300.
    cal_data is addressed by:

    - polarization index (bits 1-0)
    - antenna index (bits 4-2)
    - 4 LS bits of channel index (bits 8-5)
    - bank select (bit 9, 1= active, 0 = staged coefficients bank)

    block_sel register contains upper 5 bits of channel index

    :param antenna: Antenna number, only the 3 LS bits are used
    :param calibration_coefs: Calibration coefficients, indexed as
        [channel][polarization]
    """
    nchans = len(calibration_coefs)
    antenna_bits = (antenna & 7) << 2
    for group_start in range(0, nchans, 16):
        # Select the group of 16 channels addressed through cal_data
        self.board[f"{self._device}.beamf_cal.block_sel"] = group_start >> 4
        group_end = min(nchans, group_start + 16)
        for chan in range(group_start, group_end):
            # The 4 polarization elements of one antenna are contiguous
            words = [
                self._complex_coef_to_hw_coef(calibration_coefs[chan][pol])
                for pol in range(4)
            ]
            offset = antenna_bits + ((chan & 0xF) << 5)
            self.board.write_register(
                f"{self._device}.beamf_cal.cal_data", words, offset=offset
            )
def _complex_coef_to_hw_coef(self, calibration_coef):
    """
    Convert complex cal coefficient to hardware format.

    Real and imaginary parts are multiplied by self.scale, rounded to the
    nearest integer and truncated to 16 bits each (two's complement).

    :param calibration_coef: Complex calibration coefficient, normalized to 1.0
    :return: 32 bit integer with 16 MS bits for the imaginary part and
        16 LS bits for real part
    """
    scale = self.scale
    imag_word = round(calibration_coef.imag * scale) & 0xFFFF
    real_word = round(calibration_coef.real * scale) & 0xFFFF
    return real_word | (imag_word << 16)
def _hw_coef_to_complex_coef(self, hw_coef):
    """
    Convert cal coefficient in hardware format to complex number.

    Inverse of _complex_coef_to_hw_coef: the two 16-bit halves of the word
    are interpreted as two's complement integers and divided by self.scale.

    :param hw_coef: 32 bit integer with 16 MS bits for the imaginary part and
        16 LS bits for real part
    :return: Complex calibration coefficient, normalized to 1.0
    """

    # TODO: use to_signed from utils.py
    def signed16(word):
        # Two's complement sign extension of the 16 LS bits of word
        return ((word + 0x8000) & 0xFFFF) - 0x8000

    imag_part = signed16(hw_coef >> 16) / self.scale
    real_part = signed16(hw_coef) / self.scale
    return complex(real_part, imag_part)
[docs]
def read_calibration(self, antenna, read_bank):
    """
    Read calibration coefficients, one antenna at a time.

    The result is a bidimensional complex array of the form
    calibration_coefs[channel, polarization], with each element representing
    a normalized coefficient, with (1.0, 0.0) the normal, expected response for
    an ideal antenna.

    Channel is the index specifying the channels at the beamformer output,
    i.e. considering only those channels actually processed and beam assignments.

    The polarization index ranges from 0 to 3.

    - 0: X polarization direct element
    - 1: X->Y polarization cross element
    - 2: Y->X polarization cross element
    - 3: Y polarization direct element

    When the firmware does not support reading the table, a warning is
    logged and an array of zeros is returned.

    :param antenna: Antenna number. Integer in range 0:8
    :param read_bank: Calibration Table bank.
        0 = Staged Calibration Coefficients
        1 = Live Calibration Coefficients
    :return: Calibration coefficients. array [384, 4]
    :rtype: numpy.ndarray of complex
    """
    nchans = 384
    # The bank is selected by bit 9 of the read offset
    bank_offset = (1 << 9) if read_bank != 0 else 0
    cal_coefs = np.zeros((nchans, 4), dtype=complex)
    if not self._cal_data_read_supported:
        self.logger.warning("Calibration Coefficient table read is not supported")
        return cal_coefs

    block_sel_reg = f"{self._device}.beamf_cal.block_sel"
    cal_data_reg = f"{self._device}.beamf_cal.cal_data"
    decode = self._hw_coef_to_complex_coef

    if self._cal_data_by_channel_supported:
        # block_sel selects a group of 16 channels; each read returns all
        # antennas and polarizations of that group
        for group_start in range(0, 384, 16):
            self.board[block_sel_reg] = group_start >> 4
            words = self.board.read_register(cal_data_reg, 512, offset=bank_offset)
            for pol in range(4):
                column = antenna * 4 + pol
                for chan in range(16):
                    cal_coefs[group_start + chan][pol] = decode(
                        words[(chan << 5) + column]
                    )
        return cal_coefs

    # block_sel selects antenna and polarization; each read returns all
    # the channels of that selection
    for pol in range(4):
        self.board[block_sel_reg] = antenna * 4 + pol
        words = self.board.read_register(cal_data_reg, 384, offset=bank_offset)
        for chan in range(nchans):
            cal_coefs[chan][pol] = decode(words[chan])
    return cal_coefs
[docs]
def load_calibration_for_channels(self, first_channel, calibration_coefs):
    """
    Load calibration coefficients for a group of channels, all antennas.

    calibration_coefs is a tridimensional complex array of the form
    calibration_coefs[channel, antenna, polarization], with each element
    representing a normalized coefficient, with (1.0, 0.0) the normal,
    expected response for an ideal antenna.

    Channel is the index specifying the channels at the beamformer output,
    i.e. considering only those channels actually processed and beam
    assignments, and starts at first_channel.
    E.g. if first_channel = 10 and 12 channels are specified, the
    calibration is specified for logical channels 10-21.
    Usually calibration is specified for a block of 8N channels
    starting at a multiple of 8 channels.
    Other channels are not affected.

    The polarization index ranges from 0 to 3.

    - 0: X polarization direct element
    - 1: X->Y polarization cross element
    - 2: Y->X polarization cross element
    - 3: Y polarization direct element

    The calibration coefficients may include any rotation matrix (e.g.
    the parallactic angle), but do not include the geometric delay.

    On firmware without channel-centric addressing the work is delegated
    to _load_calibration_for_channels_v1.

    :param first_channel: first channel in block. Integer in range 0:384
    :param calibration_coefs: Calibration coefficients. array [nchans, 8, 4]
    :raises ValueError: If the second dimension of calibration_coefs is not 8
    """
    if not self._cal_data_by_channel_supported:
        return self._load_calibration_for_channels_v1(
            first_channel, calibration_coefs
        )
    nchans = calibration_coefs.shape[0]
    if calibration_coefs.shape[1] != 8:
        raise ValueError(
            "Only 8 antennas supported in load_calibration_for_channels"
        )

    block_sel_reg = f"{self._device}.beamf_cal.block_sel"
    cal_data_reg = f"{self._device}.beamf_cal.cal_data"
    to_hw = self._complex_coef_to_hw_coef

    # Data is sent to the HW in groups of up to 16 channels (512
    # coefficients). Each group starts at a multiple of 16 logical channels
    # and is selected by the block_sel register. Only the first group may
    # start in the middle, at a non-zero write offset.
    position_in_group = first_channel & 0xF
    write_offset = position_in_group << 5
    # [lo, hi) is the slice of calibration_coefs sent with the current group
    lo = 0
    hi = min(0x10 - position_in_group, nchans)
    for group_start in range(first_channel & 0x1F0, first_channel + nchans, 16):
        self.board[block_sel_reg] = group_start >> 4
        # Hardware order: channel, then antenna, then polarization
        words = [
            to_hw(calibration_coefs[chan][antenna][pol])
            for chan in range(lo, hi)
            for antenna in range(8)
            for pol in range(4)
        ]
        self.board.write_register(cal_data_reg, words, offset=write_offset)
        # Following groups are aligned: they start at offset 0
        write_offset = 0
        lo = hi
        hi = min(lo + 0x10, nchans)
def _load_calibration_for_channels_v1(self, first_channel, calibration_coefs):
    """
    Load the cal coefficients for a block of channels, for old firmware.

    For every antenna and polarization element, block_sel is set to
    (antenna << 2) + polarization and the coefficients of all the given
    channels are written to cal_data, starting at offset first_channel.

    :param first_channel: first channel in block, used as write offset
    :param calibration_coefs: Calibration coefficients, array indexed as
        [channel, antenna, polarization]
    """
    nchans = calibration_coefs.shape[0]
    nof_antennas = calibration_coefs.shape[1]
    to_hw = self._complex_coef_to_hw_coef
    for antenna in range(nof_antennas):
        antenna_bits = (antenna & 7) << 2
        for pol in range(4):
            self.board[f"{self._device}.beamf_cal.block_sel"] = antenna_bits + pol
            words = [
                to_hw(calibration_coefs[chan][antenna][pol]) for chan in range(nchans)
            ]
            self.board.write_register(
                f"{self._device}.beamf_cal.cal_data", words, offset=first_channel
            )
[docs]
def read_all_calibration(self, read_bank):
    """
    Read all calibration coefficients.

    Read all complex calibration coefficients, for 384 channels, 8 antennas,
    4 polarizations, using the addressing scheme supported by the firmware.

    :param read_bank: Calibration Table bank.
        0 = Staged Calibration Coefficients
        1 = Live Calibration Coefficients
    :return: 3D array of complex, indexed by [channel, antenna, polarization]
    """
    calibration_coefs = np.zeros([384, 8, 4], complex)
    # The second half of the cal_data table holds the other bank
    offset = 512 if read_bank != 0 else 0
    block_sel_reg = f"{self._device}.beamf_cal.block_sel"
    cal_data_reg = f"{self._device}.beamf_cal.cal_data"
    decode = self._hw_coef_to_complex_coef

    if self._cal_data_by_channel_supported:
        # One read per group of 16 channels; inside the group the words are
        # ordered by channel, then antenna, then polarization
        for first_channel in range(0, 384, 16):
            self.board[block_sel_reg] = first_channel >> 4
            words = self.board.read_register(cal_data_reg, 512, offset=offset)
            for hw_index in range(512):
                channel, remainder = divmod(hw_index, 32)
                antenna, pol = divmod(remainder, 4)
                calibration_coefs[channel + first_channel, antenna, pol] = decode(
                    words[hw_index]
                )
    else:
        # One read of 384 channels per antenna and polarization
        for antenna in range(8):
            antenna_bits = (antenna & 7) << 2
            for pol in range(4):
                self.board[block_sel_reg] = antenna_bits + pol
                words = self.board.read_register(cal_data_reg, 384, offset=offset)
                for channel in range(384):
                    calibration_coefs[channel, antenna, pol] = decode(words[channel])
    return calibration_coefs
[docs]
def switch_calibration_bank(self, time=0, force=False):
    """
    Switch the calibration bank.

    To be performed after all calibration quantities have been updated on
    both FPGAs. The bank selection is toggled, then the switch is either
    forced by pulsing sw_table_bank or scheduled by writing the load time.

    :param time: Time to perform bank switch (frame number).
        Default 0 = current time + 64 frames
    :param force: Force immediate switch
    :return: bank used after bank switch (0 or 1)
    """
    bank_register = f"{self._device}.beamf_cal.control.cal_table_bank"
    new_bank = 1 - self.board[bank_register]
    self.board[bank_register] = new_bank

    if force:
        # Pulse the software switch bit
        for level in (1, 0):
            self.board[f"{self._device}.beamf_cal.control.sw_table_bank"] = level
        return new_bank

    if time == 0:
        time = self.board[f"{self._device}.beamf_cal.current_time"] + 64
    self.board[f"{self._device}.beamf_cal.load_time"] = time
    return new_bank
[docs]
def load_cal_curve(self, antenna, beam, cal_coefficients):
    """
    Store the calibration curve of one antenna for one beam.

    Calibration curve is specified for 512 frequency channels over the
    whole (0-400) MHz range.
    Calibration for unused frequency regions may assume any value (e.g. 0).
    Default (at initialization) is 1.0 for diagonal terms and 0.0 for cross
    diagonal terms.

    The curve is only stored in software; it is used by
    compute_calibration_coefs().

    Obsolete, does not support subarrays, kept for backward compatibility

    :param antenna: Antenna number, index in the beam row of cal_curve
    :param beam: Beam number, index in cal_curve
    :param cal_coefficients: Calibration curve, indexed as
        [channel][polarization]
    """
    # The rows of cal_curve may be one list object shared by all the beams
    # (e.g. when the table was built with "[row] * 8"). Give this beam its
    # own row before writing, so that the other beams are not modified.
    beam_row = list(self.cal_curve[beam])
    beam_row[antenna] = cal_coefficients
    self.cal_curve[beam] = beam_row
[docs]
def compute_calibration_coefs(self):
    """
    Compute the calibration coefficients and load them in the hardware.

    For every antenna and output channel, the calibration curve of the beam
    assigned to the channel is scaled by the antenna tapering and combined
    with the rotation matrix of that beam. The result is loaded with
    load_calibration(), one antenna at a time.

    To be used after load_cal_curve(), load_antenna_tapering() and
    load_beam_angle

    Obsolete, does not support subarrays, kept for backward compatibility
    """
    for ant in range(8):
        antenna_coefs = []
        for ch in range(self.nof_chans):
            # Region and beam of this output channel (regions are defined
            # in blocks of 8 channels)
            region = self.region_sel[int(ch // 8)]
            beam = self.beam_index[region]
            # Physical frequency channel, index in the calibration curve
            freq = ch + self.region_off[region]
            curve = self.cal_curve[beam][ant][freq]
            rot = self.rot_matrix[beam]
            taper = self.antenna_tapering[ant]
            # Tapered curve elements
            xx = curve[0] * taper
            xy = curve[1] * taper
            yx = curve[2] * taper
            yy = curve[3] * taper
            antenna_coefs.append(
                [
                    rot[0] * xx + rot[2] * xy,
                    rot[1] * xx + rot[3] * xy,
                    rot[0] * yx + rot[2] * yy,
                    rot[1] * yx + rot[3] * yy,
                ]
            )
        self.load_calibration(ant, antenna_coefs)
[docs]
def check_errors(self):
    """
    Check the beamformer error flags.

    Reads the tlast_not_aligned, fifo_write and fifo_read error registers.
    All three registers are always read.

    :return: True if no error flag is set, False otherwise
    """
    error_registers = (
        f"{self._device}.beamf_fd.errors.tlast_not_aligned",
        f"{self._device}.beamf_fd.errors.fifo_write",
        f"{self._device}.beamf_fd.errors.fifo_read",
    )
    # Build the full list first, so that no read is skipped
    flags = [self.board[register] for register in error_registers]
    return not any(flags)
[docs]
def clear_errors(self):
    """Clear the beamformer error flags by writing 1 to errors_rst."""
    reset_register = f"{self._device}.beamf_fd.errors.errors_rst"
    self.board[reset_register] = 1