Source code for ska_low_sps_tpm_api.plugins.integrator
import time
from math import ceil, log
from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock
__author__ = "Riccardo Chiello"
[docs]
class TpmIntegrator(FirmwareBlock):
"""TpmIntegrator tests class"""
[docs]
@compatibleboards(BoardMake.TpmBoard)
@friendlyname("tpm_integrator")
@maxinstances(16)
def __init__(
self,
board,
fsample=800e6,
nof_frequency_channels=512,
oversampling_factor=32.0 / 27.0,
core="",
logger=None,
**kwargs,
):
"""
TpmIntegrator initialiser.
:param board: Pointer to board instance
"""
super(TpmIntegrator, self).__init__(board, logger=logger)
self._board_type = kwargs.get("board_type", "XTPM")
if "device" not in list(kwargs.keys()):
raise PluginError("TpmFpga: Require a node instance")
self._device = kwargs["device"]
if self._device == Device.FPGA_1:
self._device = "fpga1"
elif self._device == Device.FPGA_2:
self._device = "fpga2"
else:
raise PluginError(f"TpmFpga: Invalid device {self._device}")
self._core = core
# sampling frequency
self._fsample = fsample
self.requested_integration_time = -1
self.actual_integration_time = -1
self.first_channel = 0
self.last_channel = nof_frequency_channels - 1
self.time_mux_factor = 2
self.nof_frequency_channels = nof_frequency_channels
self.oversampling_factor = oversampling_factor
try:
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.regfile_tx_demux"
]
self.implemented = True
except:
self.implemented = False
[docs]
def configure(
self,
stage,
integration_time,
first_channel,
last_channel,
time_mux_factor,
carousel_enable=0x0,
download_bit_width=None,
data_bit_width=None,
):
if not self.implemented:
self.logger.warning("Integrator is not implemented.")
return
self.requested_integration_time = integration_time
self.actual_integration_time = 0
self.first_channel = first_channel
self.last_channel = last_channel
self.time_mux_factor = time_mux_factor
if download_bit_width != None:
download_bits = download_bit_width
elif stage == "beamf":
download_bits = 32
elif stage == "channel":
download_bits = 16
else:
self.logger.warning(f"Integrator {stage} is not implemented.")
return
if data_bit_width != None:
data_bits = data_bit_width
elif stage == "beamf":
data_bits = 12
elif stage == "channel":
data_bits = 8
else:
self.logger.warning(f"Integrator {stage} is not implemented.")
return
freq = self._fsample
# Calculate integration samples (including oversampling factor)
spectra_per_second = (freq * self.oversampling_factor) / (
self.nof_frequency_channels * 2.0
)
integration_length = spectra_per_second * self.requested_integration_time
integration_length = int(integration_length)
self.actual_integration_time = integration_length / spectra_per_second
min_integration_time = (
50e6 / (self.nof_frequency_channels * 2.0) / spectra_per_second
)
max_bit_width = data_bits * 2 + 1 + int(ceil(log(integration_length, 2)))
scaling_factor = 0
if max_bit_width > download_bits:
scaling_factor = max_bit_width - download_bits
if stage == "beamf":
self.logger.info(
f"Setting beam integration time on {self._device} to {self.actual_integration_time:.4f}s"
)
elif stage == "channel":
self.logger.info(
f"Setting channel integration time on {self._device} to {self.actual_integration_time:.4f}s"
)
# Sanity check
if self.actual_integration_time < min_integration_time:
self.logger.warning(
f"Integration time {self.actual_integration_time:.4f}s is less than the minimum integration time ({min_integration_time}). Settings "
"integration time to minimum"
)
integration_length = int(min_integration_time * spectra_per_second)
# Send request to TPM
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.{stage}_start_read_channel"
] = (self.first_channel // self.time_mux_factor)
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.{stage}_last_read_channel"
] = (self.last_channel + 1) // self.time_mux_factor - 1
self.board[f"{self._device}.lmc_integrated_gen{self._core}.{stage}_enable"] = (
0x0
)
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.{stage}_scaling_factor"
] = scaling_factor
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.{stage}_integration_length"
] = integration_length
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.{stage}_carousel_enable"
] = carousel_enable
self.board[f"{self._device}.lmc_integrated_gen{self._core}.{stage}_enable"] = (
0x1
)
[docs]
def configure_download(
self, download_mode, channel_payload_length=None, beam_payload_length=None
):
"""
Configure link mode and payload size for packets.
If no channel_payload_length is provided then the value remains unchanged.
If no beam_payload_length is provided then the value remains unchanged.
:param download_mode: "1g" or "10g" (1G means NSDN, 10G means 40G SDN, needs refactored)
:type download_mode: str
:param channel_payload_length: SPEAD payload length for integrated channel data
:type channel_payload_length: int
:param beam_payload_length: SPEAD payload length for integrated beam data
:type beam_payload_length: int
"""
if not self.implemented:
self.logger.warning("Integrator is not implemented.")
return
mode = download_mode.upper()
# Using 10G lane
if mode == "10G":
if channel_payload_length is not None and channel_payload_length >= 8193:
self.logger.warning("Specified channel packet length too large for 10G")
return
if beam_payload_length is not None and beam_payload_length >= 8193:
self.logger.warning("Specified beam packet length too large for 10G")
return
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.regfile_tx_demux"
] = 2
# Using dedicated 1G link
elif mode == "1G":
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.regfile_tx_demux"
] = 1
else:
self.logger.warning("Supported modes are 1G, 10G")
return
if channel_payload_length is not None:
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.regfile_channel_payload_length"
] = channel_payload_length
if beam_payload_length is not None:
self.board[
f"{self._device}.lmc_integrated_gen{self._core}.regfile_beamf_payload_length"
] = beam_payload_length
[docs]
def stop_integrated_channel_data(self):
"""Stop receiving integrated beam data from the board"""
if not self.implemented:
self.logger.warning("Integrator is not implemented.")
return
self.board[f"{self._device}.lmc_integrated_gen{self._core}.channel_enable"] = (
0x0
)
[docs]
def stop_integrated_beam_data(self):
"""Stop receiving integrated beam data from the board"""
if not self.implemented:
self.logger.warning("Integrator is not implemented.")
return
self.board[f"{self._device}.lmc_integrated_gen{self._core}.beamf_enable"] = 0x0
[docs]
def stop_integrated_data(self):
"""Stop transmission of integrated data"""
if not self.implemented:
self.logger.warning("Integrator is not implemented.")
return
self.stop_integrated_channel_data()
self.stop_integrated_beam_data()
##################### Superclass method implementations #################################
[docs]
def initialise(self):
"""Initialise TpmIntegrator"""
self.logger.info("TpmIntegrator has been initialised")
return True
[docs]
def status_check(self):
"""Perform status check.
:return: Status
"""
self.logger.info("TpmIntegrator : Checking status")
return Status.OK
[docs]
def clean_up(self):
"""Perform cleanup.
:return: Success
"""
self.logger.info("TpmIntegrator : Cleaning up")
return True