# Source code for ska_low_sps_tpm_api.plugins.integrator

import time
from math import ceil, log

from ska_low_sps_tpm_api.base.definitions import *
from ska_low_sps_tpm_api.base.utils import *
from ska_low_sps_tpm_api.plugins.firmwareblock import FirmwareBlock

# Module-level author metadata.
__author__ = "Riccardo Chiello"


class TpmIntegrator(FirmwareBlock):
    """Plugin driving the ``lmc_integrated_gen`` register block of one TPM FPGA.

    Every register is addressed through ``self.board`` as
    ``<device>.lmc_integrated_gen<core>.<register>``. If the
    ``regfile_tx_demux`` register cannot be read at construction time,
    ``implemented`` is set to ``False`` and the configure/stop methods only
    log a warning and return without touching the board.
    """

    # Per-stage defaults used by configure() when the caller passes no widths.
    _DEFAULT_DOWNLOAD_BITS = {"beamf": 32, "channel": 16}
    _DEFAULT_DATA_BITS = {"beamf": 12, "channel": 8}
    # Wording used for each stage in the "Setting ... integration time" log.
    _STAGE_LABELS = {"beamf": "beam", "channel": "channel"}

    @compatibleboards(BoardMake.TpmBoard)
    @friendlyname("tpm_integrator")
    @maxinstances(16)
    def __init__(
        self,
        board,
        fsample=800e6,
        nof_frequency_channels=512,
        oversampling_factor=32.0 / 27.0,
        core="",
        logger=None,
        **kwargs,
    ):
        """
        TpmIntegrator initialiser.

        :param board: Pointer to board instance
        :param fsample: Sampling frequency (presumably in Hz -- confirm)
        :param nof_frequency_channels: Number of frequency channels; used to
            derive the spectra rate and the default ``last_channel``
        :param oversampling_factor: Oversampling factor applied to the
            spectra rate
        :param core: Suffix appended to ``lmc_integrated_gen`` in register names
        :param logger: Logger forwarded to the FirmwareBlock initialiser
        :param kwargs: Must contain ``device`` (``Device.FPGA_1`` or
            ``Device.FPGA_2``); may contain ``board_type`` (default ``"XTPM"``)
        :raises PluginError: If ``device`` is missing or is not a known FPGA
        """
        super().__init__(board, logger=logger)

        self._board_type = kwargs.get("board_type", "XTPM")

        if "device" not in kwargs:
            raise PluginError("TpmIntegrator: Require a node instance")

        device = kwargs["device"]
        if device == Device.FPGA_1:
            self._device = "fpga1"
        elif device == Device.FPGA_2:
            self._device = "fpga2"
        else:
            raise PluginError(f"TpmIntegrator: Invalid device {device}")

        self._core = core
        # sampling frequency
        self._fsample = fsample
        self.requested_integration_time = -1
        self.actual_integration_time = -1
        self.first_channel = 0
        self.last_channel = nof_frequency_channels - 1
        self.time_mux_factor = 2
        self.nof_frequency_channels = nof_frequency_channels
        self.oversampling_factor = oversampling_factor

        # Probe one register of the block: if the read fails, the integrator
        # is treated as absent from the loaded firmware. Only Exception is
        # caught so that KeyboardInterrupt/SystemExit still propagate.
        try:
            self.board[self._integrator_reg("regfile_tx_demux")]
        except Exception:
            self.implemented = False
        else:
            self.implemented = True

    def _integrator_reg(self, name):
        """Return the full register path of ``name`` in this integrator block."""
        return f"{self._device}.lmc_integrated_gen{self._core}.{name}"

    def configure(
        self,
        stage,
        integration_time,
        first_channel,
        last_channel,
        time_mux_factor,
        carousel_enable=0x0,
        download_bit_width=None,
        data_bit_width=None,
    ):
        """
        Configure and enable the integrator for one stage.

        The requested time is converted to a whole number of spectra. If the
        result is shorter than the minimum integration time, the minimum is
        used instead and a warning is logged. ``actual_integration_time`` and
        the scaling factor are always derived from the length that is really
        written to the board.

        :param stage: Register prefix of the stage, ``"beamf"`` or ``"channel"``
        :param integration_time: Requested integration time in seconds
        :param first_channel: First channel to read out
        :param last_channel: Last channel to read out
        :param time_mux_factor: Divisor applied to the channel indices before
            they are written to the board
        :param carousel_enable: Value written to ``<stage>_carousel_enable``
        :param download_bit_width: Bit width available for each downloaded
            value; defaults to 32 for ``"beamf"`` and 16 for ``"channel"``
        :param data_bit_width: Bit width of the input data; defaults to 12 for
            ``"beamf"`` and 8 for ``"channel"``
        """
        if not self.implemented:
            self.logger.warning("Integrator is not implemented.")
            return

        self.requested_integration_time = integration_time
        self.actual_integration_time = 0
        self.first_channel = first_channel
        self.last_channel = last_channel
        self.time_mux_factor = time_mux_factor

        # Explicit widths win; otherwise fall back to the per-stage defaults.
        download_bits = download_bit_width
        if download_bits is None:
            download_bits = self._DEFAULT_DOWNLOAD_BITS.get(stage)
        data_bits = data_bit_width
        if data_bits is None:
            data_bits = self._DEFAULT_DATA_BITS.get(stage)
        if download_bits is None or data_bits is None:
            self.logger.warning(f"Integrator {stage} is not implemented.")
            return

        # Calculate integration samples (including oversampling factor)
        spectra_per_second = (self._fsample * self.oversampling_factor) / (
            self.nof_frequency_channels * 2.0
        )
        # NOTE(review): the origin of the 50e6 constant is not visible in this
        # file -- confirm against the firmware documentation.
        min_integration_time = (
            50e6 / (self.nof_frequency_channels * 2.0) / spectra_per_second
        )

        integration_length = int(spectra_per_second * integration_time)
        achievable_time = integration_length / spectra_per_second

        # Sanity check. This runs before the bit-growth computation so that a
        # request truncating to zero spectra is clamped instead of raising a
        # math domain error.
        if achievable_time < min_integration_time:
            self.logger.warning(
                f"Integration time {achievable_time:.4f}s is less than the "
                f"minimum integration time ({min_integration_time}). "
                "Setting integration time to minimum"
            )
            integration_length = int(min_integration_time * spectra_per_second)

        self.actual_integration_time = integration_length / spectra_per_second

        # Summing N values grows the width by ceil(log2(N)) bits; for N >= 1
        # this is (N - 1).bit_length(), computed exactly in integer arithmetic.
        if integration_length > 1:
            growth_bits = (integration_length - 1).bit_length()
        else:
            growth_bits = 0
        max_bit_width = data_bits * 2 + 1 + growth_bits
        scaling_factor = max(0, max_bit_width - download_bits)

        stage_label = self._STAGE_LABELS.get(stage)
        if stage_label is not None:
            self.logger.info(
                f"Setting {stage_label} integration time on {self._device} "
                f"to {self.actual_integration_time:.4f}s"
            )

        # Send request to TPM. The stage is disabled while its parameters are
        # rewritten and re-enabled as the last step.
        self.board[self._integrator_reg(f"{stage}_start_read_channel")] = (
            self.first_channel // self.time_mux_factor
        )
        self.board[self._integrator_reg(f"{stage}_last_read_channel")] = (
            self.last_channel + 1
        ) // self.time_mux_factor - 1
        self.board[self._integrator_reg(f"{stage}_enable")] = 0x0
        self.board[self._integrator_reg(f"{stage}_scaling_factor")] = scaling_factor
        self.board[self._integrator_reg(f"{stage}_integration_length")] = (
            integration_length
        )
        self.board[self._integrator_reg(f"{stage}_carousel_enable")] = carousel_enable
        self.board[self._integrator_reg(f"{stage}_enable")] = 0x1

    def configure_download(
        self, download_mode, channel_payload_length=None, beam_payload_length=None
    ):
        """
        Configure link mode and payload size for packets.

        If no channel_payload_length is provided then the value remains unchanged.
        If no beam_payload_length is provided then the value remains unchanged.

        :param download_mode: "1g" or "10g" (1G means NSDN, 10G means 40G SDN,
            needs refactored)
        :type download_mode: str
        :param channel_payload_length: SPEAD payload length for integrated
            channel data
        :type channel_payload_length: int
        :param beam_payload_length: SPEAD payload length for integrated beam data
        :type beam_payload_length: int
        """
        if not self.implemented:
            self.logger.warning("Integrator is not implemented.")
            return

        mode = download_mode.upper()

        # Using 10G lane
        if mode == "10G":
            # Payload lengths are validated before any register is written.
            if channel_payload_length is not None and channel_payload_length >= 8193:
                self.logger.warning("Specified channel packet length too large for 10G")
                return
            if beam_payload_length is not None and beam_payload_length >= 8193:
                self.logger.warning("Specified beam packet length too large for 10G")
                return
            self.board[self._integrator_reg("regfile_tx_demux")] = 2
        # Using dedicated 1G link
        elif mode == "1G":
            self.board[self._integrator_reg("regfile_tx_demux")] = 1
        else:
            self.logger.warning("Supported modes are 1G, 10G")
            return

        if channel_payload_length is not None:
            self.board[self._integrator_reg("regfile_channel_payload_length")] = (
                channel_payload_length
            )
        if beam_payload_length is not None:
            self.board[self._integrator_reg("regfile_beamf_payload_length")] = (
                beam_payload_length
            )

    def stop_integrated_channel_data(self):
        """Stop receiving integrated channel data from the board"""
        if not self.implemented:
            self.logger.warning("Integrator is not implemented.")
            return
        self.board[self._integrator_reg("channel_enable")] = 0x0

    def stop_integrated_beam_data(self):
        """Stop receiving integrated beam data from the board"""
        if not self.implemented:
            self.logger.warning("Integrator is not implemented.")
            return
        self.board[self._integrator_reg("beamf_enable")] = 0x0

    def stop_integrated_data(self):
        """Stop transmission of integrated data (both channel and beam)"""
        if not self.implemented:
            self.logger.warning("Integrator is not implemented.")
            return
        self.stop_integrated_channel_data()
        self.stop_integrated_beam_data()

    ##################### Superclass method implementations #################################

    def initialise(self):
        """Initialise TpmIntegrator.

        :return: Always True
        """
        self.logger.info("TpmIntegrator has been initialised")
        return True

    def status_check(self):
        """Perform status check.

        :return: Status
        """
        self.logger.info("TpmIntegrator : Checking status")
        return Status.OK

    def clean_up(self):
        """Perform cleanup.

        :return: Success
        """
        self.logger.info("TpmIntegrator : Cleaning up")
        return True