#! /usr/bin/env python
from pyaavs.slack import get_slack_instance
from pyaavs.tile_wrapper import Tile
from pyfabil import Device
from future.utils import iteritems
from multiprocessing import Pool
from threading import Thread
from builtins import input
from ipaddress import IPv4Interface
import threading
import logging
import socket
import yaml
import time
import math
import sys
import os
# Define default configuration
configuration = {'tiles': None,
'time_delays': None,
'station': {
'id': 0,
'name': "Unnamed",
"number_of_antennas": 256,
'program': False,
'initialise': False,
'program_cpld': False,
'enable_test': False,
'qsfp_detection': "auto",
'start_beamformer': False,
'bitfile': None,
'channel_truncation': 5,
'channel_integration_time': -1,
'beam_integration_time': -1,
'equalize_preadu': 0,
'default_preadu_attenuation': 0,
'beamformer_scaling': 4,
'pps_delays': 0,
'use_internal_pps': False},
'observation': {
'bandwidth': 8 * (400e6 / 512.0),
'start_frequency_channel': 50e6},
'network': {
'tile_40g_subnet' : None,
'lmc': {
'tpm_cpld_port': 10000,
'lmc_ip': "10.0.10.200",
'use_teng': True,
'lmc_port': 4660,
'lmc_mac': 0x248A078F9D38,
'integrated_data_ip': "10.0.0.2",
'integrated_data_port': 5000,
'use_teng_integrated': True},
'csp_ingest': {
'src_ip': "10.0.10.254",
'dst_mac': 0x248A078F9D38,
'src_port': None,
'dst_port': 4660,
'dst_ip': "10.0.10.200",
'src_mac': None}
}
}
[docs]
def create_tile_instance(config, tile_number):
""" Add a new tile to the station
:param config: Station configuration
:param tile_number: TPM to associate tile to """
# If all traffic is going through 1G then set the destination port to
# the lmc_port. If only integrated data is going through the 1G set the
# destination port to integrated_data_port
dst_port = config['network']['lmc']['lmc_port']
lmc_ip = config['network']['lmc']['lmc_ip']
if not config['network']['lmc']['use_teng_integrated'] and \
config['network']['lmc']['use_teng']:
dst_port = config['network']['lmc']['integrated_data_port']
lmc_ip = config['network']['lmc']['integrated_data_ip']
return Tile(config['tiles'][tile_number],
config['network']['lmc']['tpm_cpld_port'],
lmc_ip,
dst_port)
[docs]
def program_cpld(params):
""" Update tile CPLD.
:param params: Contain 0) Station configuration and 1) Tile number to program CPLD """
config, tile_number = params
try:
threading.current_thread().name = config['tiles'][tile_number]
logging.info("Initialising Tile {}".format(config['tiles'][tile_number]))
# Create station instance and program CPLD
station_tile = create_tile_instance(config, tile_number)
station_tile.program_cpld(config['station']['bitfile'])
return True
except Exception as e:
logging.error("Could not program CPLD of {}: {}".format(config['tiles'][tile_number], e))
return False
[docs]
def program_fpgas(params):
""" Program FPGAs
:param params: Contain 0) Station configuration and 1) Tile number to program FPGAs """
config, tile_number = params
try:
threading.current_thread().name = config['tiles'][tile_number]
logging.info("Initialising Tile {}".format(config['tiles'][tile_number]))
# Create station instance and program FPGAs
station_tile = create_tile_instance(config, tile_number)
station_tile.program_fpgas(config['station']['bitfile'])
return True
except Exception as e:
logging.error("Could not program FPGAs of {}: {}".format(config['tiles'][tile_number], e))
return False
[docs]
def initialise_tile(params):
""" Internal connect method to thread connection
:param params: Contain 0) Station configuration and 1) Tile number to initialise """
config, tile_number = params
nof_tiles = len(config['tiles'])
this_tile_ip = socket.gethostbyname(config['tiles'][tile_number])
next_tile_ip = socket.gethostbyname(config['tiles'][(tile_number + 1) % nof_tiles])
# Default 40G network config as used at AAVS1 & AAVS2
src_ip_40g_fpga1 = f"10.0.1.{this_tile_ip.split('.')[3]}"
src_ip_40g_fpga2 = f"10.0.2.{this_tile_ip.split('.')[3]}"
dst_ip_40g_fpga1 = f"10.0.1.{next_tile_ip.split('.')[3]}"
dst_ip_40g_fpga2 = f"10.0.2.{next_tile_ip.split('.')[3]}"
netmask_40g = None
gateway_ip_40g = None
# Support for SKA-Low 40G network config with specified subnet
if config['network']['tile_40g_subnet'] is not None:
next_tile_number = tile_number + 1
ip_intf_40g = IPv4Interface(config['network']['tile_40g_subnet'])
first_40g_ip = ip_intf_40g.ip
netmask_40g = ip_intf_40g.network.netmask
gateway_ip_40g = ip_intf_40g.network.broadcast_address - 1
src_ip_40g_fpga1 = str(first_40g_ip + 2*tile_number)
src_ip_40g_fpga2 = str(first_40g_ip + 2*tile_number + 1)
dst_ip_40g_fpga1 = str(first_40g_ip + 2*next_tile_number)
dst_ip_40g_fpga2 = str(first_40g_ip + 2*next_tile_number + 1)
logging.info(f"Configuring TPM {tile_number} 40G Interfaces with IPs {src_ip_40g_fpga1} & {src_ip_40g_fpga2}, netmask {netmask_40g} and gateway {gateway_ip_40g}")
# Convert IPv4Address type to hex if not None
netmask_40g = int(netmask_40g) if netmask_40g is not None else netmask_40g
gateway_ip_40g = int(gateway_ip_40g) if gateway_ip_40g is not None else gateway_ip_40g
src_port_40g = config['network']['csp_ingest']['src_port']
dst_port_40g = config['network']['csp_ingest']['dst_port']
rx_port_40g_single_port_mode = config['network']['csp_ingest']['dst_port'] + 2
dst_port_40g_single_port_mode = rx_port_40g_single_port_mode
is_first = tile_number == 0
is_last = tile_number == nof_tiles - 1
if is_last:
if config['network']['csp_ingest']['dst_ip'] != "0.0.0.0":
dst_ip_40g_fpga1=config['network']['csp_ingest']['dst_ip']
dst_ip_40g_fpga2=config['network']['csp_ingest']['dst_ip']
dst_port_40g_single_port_mode = config['network']['csp_ingest']['dst_port']
# If all traffic is going through 40G then set the destination port to
# the lmc_port. If only integrated data is going through the 40G set the
# destination port to integrated_data_port
lmc_dst_port=config['network']['lmc']['lmc_port']
lmc_dst_ip = config['network']['lmc']['lmc_ip']
if config['network']['lmc']['use_teng_integrated'] and not config['network']['lmc']['use_teng']:
lmc_dst_port=config['network']['lmc']['integrated_data_port']
lmc_dst_ip = config['network']['lmc']['integrated_data_ip']
# get pps delay for current tile
pps_delay = 0
if config['station'].get('pps_delays', None) is not None:
logging.info("Loading PPS delays")
if type(config['station']['pps_delays']) is int:
pps_delay = config['station']['pps_delays']
elif len(config['station']['pps_delays']) != nof_tiles:
logging.warning("Incorrect number of PPS delays specified, must match number of TPMs. Ignoring")
else:
pps_delay = config['station']['pps_delays'][tile_number]
# get time delays for current tile
time_delays = 0
if config['time_delays'] is not None:
if len(config['time_delays']) != nof_tiles:
logging.warning("Incorrect number of time delays specified, must match number of TPMs. Ignoring")
else:
logging.info("Setting a delay of {}ns to tile {}".format(config['time_delays'][tile_number], tile_number))
time_delays = configuration['time_delays'][tile_number]
try:
threading.current_thread().name = config['tiles'][tile_number]
logging.info("Initialising Tile {}".format(config['tiles'][tile_number]))
threading.current_thread().name = config['tiles'][tile_number]
# Create station instance and initialise
station_tile = create_tile_instance(config, tile_number)
station_tile.initialise(
station_id=config['station']['id'],
tile_id=tile_number,
lmc_use_40g=config['network']['lmc']['use_teng'],
lmc_dst_ip=lmc_dst_ip,
lmc_dst_port=lmc_dst_port,
lmc_integrated_use_40g=config['network']['lmc']['use_teng_integrated'],
src_ip_fpga1=src_ip_40g_fpga1,
src_ip_fpga2=src_ip_40g_fpga2,
dst_ip_fpga1=dst_ip_40g_fpga1,
dst_ip_fpga2=dst_ip_40g_fpga2,
src_port=src_port_40g,
dst_port=dst_port_40g,
dst_port_single_port_mode=dst_port_40g_single_port_mode,
rx_port_single_port_mode=rx_port_40g_single_port_mode,
netmask_40g=netmask_40g,
gateway_ip_40g=gateway_ip_40g,
active_40g_ports_setting="port1-only", # "port2-only", "both-ports"
qsfp_detection=config['station']['qsfp_detection'],
enable_test=config['station']['enable_test'],
use_internal_pps=config['station']['use_internal_pps'],
pps_delay=pps_delay,
time_delays=time_delays,
is_first_tile=is_first,
is_last_tile=is_last
)
# Set channeliser truncation
station_tile.set_channeliser_truncation(config['station']['channel_truncation'])
# Configure channel and beam integrated data
station_tile.stop_integrated_data()
if config['station']['channel_integration_time'] != -1:
station_tile.configure_integrated_channel_data(
config['station']['channel_integration_time'])
if config['station']['beam_integration_time'] != -1:
station_tile.configure_integrated_beam_data(
config['station']['beam_integration_time'])
return True
except Exception as e:
logging.warning("Could not initialise Tile {}: {}".format(config['tiles'][tile_number], e))
return False
[docs]
class Station(object):
""" Class representing an AAVS station """
def __init__(self, config):
""" Class constructor
:param config: Configuration dictionary for station """
# Save configuration locally
self.configuration = config
self._station_id = config['station']['id']
# Check if station name is specified
self._slack = None
if config['station']['name'] == "":
logging.warning("Station name not defined, will be able to push notifications to Slack")
else:
self._slack = get_slack_instance(config['station']['name'])
# Add tiles to station
self.tiles = []
for tile in config['tiles']:
self.add_tile(tile)
# Default duration of sleeps
self._seconds = 1.0
# Set if the station is properly configured
self.properly_formed_station = None
# Cache plugin directory
# __import__("pyaavs.tpm_test_firmware", fromlist=[None])
[docs]
def add_tile(self, tile_ip):
""" Add a new tile to the station
:param tile_ip: Tile IP to be added to station """
# If all traffic is going through 1G then set the destination port to
# the lmc_port. If only integrated data is going through the 1G set the
# destination port to integrated_data_port
dst_port = self.configuration['network']['lmc']['lmc_port']
lmc_ip = self.configuration['network']['lmc']['lmc_ip']
if not self.configuration['network']['lmc']['use_teng_integrated'] and \
self.configuration['network']['lmc']['use_teng']:
dst_port = self.configuration['network']['lmc']['integrated_data_port']
lmc_ip = self.configuration['network']['lmc']['integrated_data_ip']
self.tiles.append(Tile(tile_ip,
self.configuration['network']['lmc']['tpm_cpld_port'],
lmc_ip,
dst_port))
[docs]
def connect(self):
""" Initialise all tiles """
# Start with the assumption that the station will be properly formed
self.properly_formed_station = True
# Create a pool of nof_tiles processes
pool = None
if any([self.configuration['station']['program_cpld'],
self.configuration['station']['program'],
self.configuration['station']['initialise']]):
pool = Pool(len(self.tiles))
# Create parameters for processes
params = tuple([(self.configuration, i) for i in range(len(self.tiles))])
# Check if we are programming the CPLD, and if so program
if self.configuration['station']['program_cpld']:
logging.info("Programming CPLD")
self._slack.info("CPLD is being updated for tiles: {}".format(self.tiles))
res = pool.map(program_cpld, params)
if not all(res):
logging.error("Could not program TPM CPLD!")
self.properly_formed_station = False
# Check if programming is required, and if so program
if self.configuration['station']['program'] and self.properly_formed_station:
logging.info("Programming tiles")
self._slack.info("Station is being programmed")
res = pool.map(program_fpgas, params)
if not all(res):
logging.error("Could not program tiles!")
self.properly_formed_station = False
# Check if initialisation is required, and if so initialise
if self.configuration['station']['initialise'] and self.properly_formed_station:
logging.info("Initialising tiles")
self._slack.info("Station is being initialised")
res = pool.map(initialise_tile, params)
if not all(res):
logging.error("Could not initialise tiles!")
self.properly_formed_station = False
# Ready from pool
if pool is not None:
pool.terminate()
# Connect all tiles
for tile in self.tiles:
tile.connect()
# Initialise if required
if self.configuration['station']['initialise'] and self.properly_formed_station:
logging.info("Initializing tile and station beamformer")
start_channel = int(round(self.configuration['observation']['start_frequency_channel'] / (400e6 / 512.0)))
nof_channels = max(int(round(self.configuration['observation']['bandwidth'] / (400e6 / 512.0))), 8)
if self.configuration['station']['start_beamformer']:
logging.info("Station beamformer enabled")
self._slack.info("Station beamformer enabled with start frequency {:.2f} MHz and bandwidth {:.2f} MHz".format(
self.configuration['observation']['start_frequency_channel'] * 1e-6,
self.configuration['observation']['bandwidth'] * 1e-6))
# configure beamformer
if self.tiles[0].tpm.tpm_test_firmware[0].tile_beamformer_implemented and \
self.tiles[0].tpm.tpm_test_firmware[0].station_beamformer_implemented:
for i, tile in enumerate(self.tiles):
# Initialise beamformer
tile.initialise_beamformer(start_channel, nof_channels)
# Define SPEAD header
# TODO: Insert proper values here
if i == len(self.tiles) - 1:
tile.define_spead_header(station_id=self._station_id,
subarray_id=0,
nof_antennas=self.configuration['station']['number_of_antennas'],
ref_epoch=-1,
start_time=0)
# Start beamformer
if self.configuration['station']['start_beamformer']:
logging.info("Starting station beamformer")
tile.start_beamformer(start_time=0, duration=-1)
# Set beamformer scaling
for tile in self.tiles:
for station_beamf in tile.tpm.station_beamf:
station_beamf.set_csp_rounding(
self.configuration['station']['beamformer_scaling']
)
# If in testing mode, override tile-specific test generators
if self.configuration['station']['enable_test']:
for tile in self.tiles:
for gen in tile.tpm.test_generator:
gen.channel_select(0x0000)
gen.disable_prdg()
for tile in self.tiles:
for gen in tile.tpm.test_generator:
gen.set_tone(0, 100 * 400e6 / 512, 1)
gen.set_tone(1, 100 * 400e6 / 512, 0)
gen.channel_select(0xFFFF)
# if self['fpga1.regfile.feature.xg_eth_implemented'] == 1:
# for tile in self.tiles:
# tile.reset_eth_errors()
# time.sleep(1)
# for tile in self.tiles:
# tile.check_arp_table()
# If initialising, synchronise all tiles in station
logging.info("Synchronising station")
self._check_pps_sampling_synchronisation()
self._check_time_synchronisation()
self._start_acquisition()
# Setting PREADU values
att_value = self.configuration['station']['default_preadu_attenuation']
if not (self.configuration['station']['enable_test'] or att_value == -1):
# Set default preadu attenuation
time.sleep(1)
logging.info("Setting default PREADU attenuation to {}".format(att_value))
for tile in self.tiles:
tile.set_preadu_attenuation(att_value)
# If equalization is required, do it
if self.configuration['station']['equalize_preadu'] != 0:
logging.info("Equalizing PREADU signals to ADU RMS {}".format(
self.configuration['station']['equalize_preadu'])
)
self._slack.info("Station gains are being equalized to ADU RMS {}".format(
self.configuration['station']['equalize_preadu'])
)
for tile in self.tiles:
tile.equalize_preadu_gain(self.configuration['station']['equalize_preadu'])
elif not self.properly_formed_station:
logging.warning("Some tiles were not initialised or programmed. Not forming station")
# If not initialising, check that station is formed properly
else:
self.check_station_status()
[docs]
def check_station_status(self):
""" Check that the station is still valid """
tile_ids = []
for tile in self.tiles:
if tile.tpm is None:
self.properly_formed_station = False
break
tile_id = tile.get_tile_id()
if tile.get_tile_id() < len(self.tiles) and tile_id not in tile_ids:
tile_ids.append(tile_id)
else:
self.properly_formed_station = False
break
if not self.properly_formed_station:
logging.warning("Station configuration is incorrect (unreachable TPMs or incorrect tile ids)!")
def _check_time_synchronisation(self):
""" Check UTC time synchronisation across tiles and FPGAs. Re-write UTC time when check fails """
pps_detect = self['fpga1.pps_manager.pps_detected']
logging.debug("FPGA1 PPS detection register is ({})".format(pps_detect))
pps_detect = self['fpga2.pps_manager.pps_detected']
logging.debug("FPGA2 PPS detection register is ({})".format(pps_detect))
# Repeat operation until Tiles are synchronised, synchronise all tiles to the first tile
max_attempts = 5
for n in range(max_attempts):
# Read the current time on first tile
self.tiles[0].wait_pps_event()
# PPS edge detected, read time from first tile
curr_time = self.tiles[0].get_fpga_time(Device.FPGA_1)
times = set()
for tile in self.tiles:
times.add(tile.get_fpga_time(Device.FPGA_1))
times.add(tile.get_fpga_time(Device.FPGA_2))
if len(times) == 1:
logging.info("Tiles in station synchronised, time is %d" % curr_time)
break
else:
logging.info("Re-Synchronising tiles in station with time %d" % curr_time)
for tile in self.tiles:
tile.set_fpga_time(Device.FPGA_1, curr_time)
tile.set_fpga_time(Device.FPGA_2, curr_time)
if n == max_attempts - 1:
logging.error("Not possible to synchronise station UTC time across tiles")
def _start_acquisition(self):
# Check if ARP table is populated before starting
for tile in self.tiles:
tile.reset_eth_errors()
tile.check_arp_table()
# Start data acquisition on all boards
delay = 2
t0 = self.tiles[0].get_fpga_time(Device.FPGA_1)
for tile in self.tiles:
tile.start_acquisition(start_time=t0, delay=delay)
t1 = self.tiles[0].get_fpga_time(Device.FPGA_1)
if t0 + delay <= t1:
logging.error("Start data acquisition not synchronised! Rerun initialisation")
exit()
logging.info("Waiting for start acquisition")
wait_timeout = 2000
for n in range(wait_timeout):
rd_vld = self['fpga1.dsp_regfile.stream_status.channelizer_vld'] + \
self['fpga2.dsp_regfile.stream_status.channelizer_vld']
if all(rd_vld):
break
else:
time.sleep(0.001)
if n == wait_timeout - 1:
logging.error("Start data acquisition timeout! Rerun initialisation")
exit()
def _check_pps_sampling_synchronisation(self):
""" Post tile configuration synchronization """
# Station synchronisation loop
max_delay_skew = 4
sync_loop = 0
max_sync_loop = 5
while sync_loop < max_sync_loop:
self.tiles[0].wait_pps_event()
sync_loop += 1
# get the PPS delay from tile
measured_delay = [tile.get_pps_delay() for tile in self.tiles]
# check if there is too much skew
synced = True
for n in range(len(self.tiles) - 1):
if abs(measured_delay[0] - measured_delay[n + 1]) > max_delay_skew:
synced = False
if synced:
# if skew is not too much, the tiles are synchronised
phase1 = [hex(tile['fpga1.pps_manager.sync_phase']) for tile in self.tiles]
phase2 = [hex(tile['fpga2.pps_manager.sync_phase']) for tile in self.tiles]
logging.debug("Final FPGA1 clock phase ({})".format(phase1))
logging.debug("Final FPGA2 clock phase ({})".format(phase2))
logging.info("Finished PPS sampling synchronisation check ({})".format(measured_delay))
return measured_delay
else:
# if skew is too much, repeat the synchronisation using the first tile as reference
logging.warning("Resynchronizing station ({})".format(measured_delay))
for n in range(len(self.tiles)):
self.tiles[n].set_pps_sampling(measured_delay[0], 4)
logging.error("Station PPS sampling synchronisation check failed!")
# ------------------------------------------------------------------------------------------------
[docs]
def test_generator_set_tone(self, dds, frequency=100e6, ampl=0.0, phase=0.0, delay=512):
t0 = self.tiles[0]["fpga1.pps_manager.timestamp_read_val"] + delay
for tile in self.tiles:
for gen in tile.tpm.test_generator:
gen.set_tone(dds, frequency, ampl, phase, t0)
t1 = self.tiles[0]["fpga1.pps_manager.timestamp_read_val"]
if t1 > t0:
logging.info("Set tone test pattern generators synchronisation failed.")
[docs]
def test_generator_disable_tone(self, dds, delay=512):
t0 = self.tiles[0]["fpga1.pps_manager.timestamp_read_val"] + delay
for tile in self.tiles:
for gen in tile.tpm.test_generator:
gen.set_tone(dds, 0, 0, 0, t0)
t1 = self.tiles[0]["fpga1.pps_manager.timestamp_read_val"]
if t1 > t0:
logging.info("Set tone test pattern generators synchronisation failed.")
[docs]
def test_generator_set_noise(self, ampl=0.0, delay=512):
t0 = self.tiles[0]["fpga1.pps_manager.timestamp_read_val"] + delay
for tile in self.tiles:
for gen in tile.tpm.test_generator:
gen.enable_prdg(ampl, t0)
t1 = self.tiles[0]["fpga1.pps_manager.timestamp_read_val"]
if t1 > t0:
logging.info("Set tone test pattern generators synchronisation failed.")
# --------------------------------- CALIBRATION OPERATIONS ---------------------------------------
[docs]
def calibrate_station(self, coefficients, switch_time=2048):
"""Coefficients is a 3D complex array of the form [antenna, channel, polarization], with each
element representing a normalized coefficient, with (1.0, 0.0) the normal, expected response
for an ideal antenna. Antenna is the index specifying the antenna within the index (using
correlator indexing). Channel is the index specifying the channels at the beamformer output,
i.e. considering only those channels actually processed and beam assignments. The polarization
index ranges from 0 to 3.
0: X polarization direct element
1: X->Y polarization cross element
2: Y->X polarization cross element
3: Y polarization direct element"""
# Check that we have the correct coefficients shape
nof_channels = int(round(self.configuration['observation']['bandwidth'] / (400e6 / 512.0)))
if coefficients.shape != (len(self.tiles) * 16, nof_channels, 4):
logging.error("Coefficients shape mismatch. Should be ({},{},4), is ({}). Not calibrating".format(
len(self.tiles) * 16, nof_channels, coefficients.shape))
return
t0 = time.time()
# Download coefficients
for i, tile in enumerate(self.tiles):
for antenna in range(16):
tile.load_calibration_coefficients(antenna,
coefficients[i * 16 + antenna, :, :].tolist())
t1 = time.time()
logging.info("Downloaded coefficients to tiles in {0:.2}s".format(t1 - t0))
# Done downloading coefficient, switch calibration bank
self.switch_calibration_banks(switch_time)
self._slack.info("Calibration coefficients loaded to station")
logging.info("Switched calibration banks")
[docs]
def switch_calibration_banks(self, switch_time=0):
""" Switch calibration bank on all tiles"""
if switch_time == 0:
switch_time = self.tiles[0].current_tile_beamformer_frame() + 64
else:
switch_time = self.tiles[0].current_tile_beamformer_frame() + switch_time
# Switch calibration bank on all tiles
for tile in self.tiles:
tile.beamf_fd[0].switch_calibration_bank(switch_time)
tile.beamf_fd[1].switch_calibration_bank(switch_time)
if switch_time < self.tiles[0].current_tile_beamformer_frame():
logging.warning("Calibration switching not synchronised!")
[docs]
def load_pointing_delay(self, load_time=0):
""" Load pointing delays on all tiles """
if load_time == 0:
load_time = self.tiles[0].current_tile_beamformer_frame() + 64
else:
load_time = self.tiles[0].current_tile_beamformer_frame() + load_time
# Load pointing delays
for tile in self.tiles:
tile.tpm.beamf_fd[0].load_delay(load_time)
tile.tpm.beamf_fd[1].load_delay(load_time)
if load_time < self.tiles[0].current_tile_beamformer_frame():
logging.warning("Delay loading not synchronised!")
self._slack.info("Pointing delays loaded to station")
# ------------------------------------ DATA OPERATIONS -------------------------------------------
[docs]
def send_raw_data(self, sync=False):
""" Send raw data from all Tiles """
self._wait_available()
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.send_raw_data(sync=sync, timestamp=t0, seconds=self._seconds)
return self._check_data_sync(t0)
[docs]
def send_raw_data_synchronised(self):
""" Send synchronised raw data from all Tiles """
self._wait_available()
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.send_raw_data_synchronised(timestamp=t0, seconds=self._seconds)
return self._check_data_sync(t0)
[docs]
def send_channelised_data(self, number_of_samples=1024, first_channel=0, last_channel=511):
""" Send channelised data from all Tiles """
self._wait_available()
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.send_channelised_data(number_of_samples=number_of_samples, first_channel=first_channel,
last_channel=last_channel,
timestamp=t0, seconds=self._seconds)
return self._check_data_sync(t0)
[docs]
def send_beam_data(self):
""" Send beam data from all Tiles """
self._wait_available()
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.send_beam_data(timestamp=t0, seconds=self._seconds)
return self._check_data_sync(t0)
[docs]
def send_channelised_data_continuous(self, channel_id, number_of_samples=65536):
""" Send continuous channelised data from all Tiles """
self.stop_data_transmission()
self._wait_available()
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.send_channelised_data_continuous(channel_id=channel_id, number_of_samples=number_of_samples,
timestamp=t0, seconds=self._seconds)
return self._check_data_sync(t0)
[docs]
def send_channelised_data_narrowband(self, frequency, round_bits, number_of_samples=256):
""" Send narrowband continuous channel data """
# Check if feature is available
if len(self.tiles[0].find_register("fpga1.lmc_gen.channelized_ddc_mode")) == 0:
logging.warning("Downloaded firwmare does not support narrowband channels")
return
self.stop_data_transmission()
self._wait_available()
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.send_channelised_data_narrowband(frequency=frequency, round_bits=round_bits,
number_of_samples=number_of_samples,
timestamp=t0, seconds=self._seconds)
return self._check_data_sync(t0)
[docs]
def stop_data_transmission(self):
""" Stop data transmission """
logging.info("Stopping data transmission")
for tile in self.tiles:
tile.stop_data_transmission()
[docs]
def stop_integrated_data(self):
""" Stop integrated data transmission """
for tile in self.tiles:
tile.stop_integrated_data()
def _wait_available(self):
""" Make sure all boards can send data """
while any([tile.check_pending_data_requests() for tile in self.tiles]):
logging.info("Waiting for pending data requests to finish")
time.sleep(0.1)
def _check_data_sync(self, t0):
""" Check whether data synchronisation worked """
delay = self._seconds * (1 / (1080 * 1e-9) / 256)
timestamps = [tile.get_fpga_timestamp(Device.FPGA_1) for tile in self.tiles]
logging.debug("Data sync check: timestamp={}, delay={}".format(str(timestamps), delay))
return all([(t0 + delay) > t1 for t1 in timestamps])
# ------------------------------------ MULTICHANNEL TX DATA OPERATIONS -----------------------------
[docs]
def set_multi_channel_tx(self, instance_id, channel_id, destination_id):
""" Set multichannel transmitter instance
:param instance_id: Transmitter instance ID
:param channel_id: Channel ID
:param destination_id: 40G destination ID"""
for tile in self.tiles:
tile.set_multi_channel_tx(instance_id, channel_id, destination_id)
[docs]
def start_multi_channel_tx(self, instances, seconds=0.2):
""" Start multichannel data transmission from the TPM
:param instances: 64 bit integer, each bit addresses the corresponding TX transmitter
:param seconds: synchronisation delay ID"""
t0 = self.tiles[0].get_fpga_timestamp(Device.FPGA_1)
for tile in self.tiles:
tile.start_multi_channel_tx(instances, t0, seconds=1)
[docs]
def stop_multi_channel_tx(self):
""" Stop multichannel TX data transmission """
for tile in self.tiles:
tile.stop_multi_channel_tx()
[docs]
def set_multi_channel_dst_ip(self, dst_ip, destination_id):
for tile in self.tiles:
tile.set_multi_channel_dst_ip(dst_ip, destination_id)
# ------------------------------------------- TEST FUNCTIONS ---------------------------------------
# ------------------------------------------- OVERLOADED FUNCTIONS ---------------------------------------
def __getitem__(self, key):
""" Read register across all tiles """
return [tile.tpm[key] for tile in self.tiles]
def __setitem__(self, key, value):
""" Write register across all tiles """
for tile in self.tiles:
tile.tpm[key] = value
[docs]
def apply_config_file(input_dict, output_dict):
""" Recursively copy value from input_dict to output_dict"""
for k, v in iteritems(input_dict):
if type(v) is dict:
apply_config_file(v, output_dict[k])
elif k not in list(output_dict.keys()):
logging.warning("{} not a valid configuration item. Skipping".format(k))
else:
output_dict[k] = v
[docs]
def load_configuration_file(filepath):
""" Load station configuration from configuration file """
if filepath is not None:
if not os.path.exists(filepath) or not os.path.isfile(filepath):
logging.error("Specified configuration file ({}) does not exist. Exiting".format(filepath))
exit()
# Configuration file defined, load and update default configuration
with open(filepath, 'r') as f:
c = yaml.load(f, Loader=yaml.FullLoader)
apply_config_file(c, configuration)
# Fix beam bandwidth and start frequency (in case they were written in scientific notation)
configuration['observation']['bandwidth'] = \
float(configuration['observation']['bandwidth'])
configuration['observation']['start_frequency_channel'] = \
float(configuration['observation']['start_frequency_channel'])
else:
logging.error("No configuration file specified. Exiting")
exit()
[docs]
def load_station_configuration(config_params):
""" Combine configuration defined in configuration file with command-line arguments """
# If a configuration file is defined, check if it exists and load it
load_configuration_file(config_params.config)
# Go through command line options and update where necessary
if config_params.beam_bandwidth is not None:
configuration['observation']['bandwidth'] = config_params.beam_bandwidth
if config_params.beam_integ is not None:
configuration['station']['beam_integration_time'] = config_params.beam_integ
if config_params.beam_scaling is not None:
configuration['station']['beamformer_scaling'] = config_params.beam_scaling
if config_params.beamf_start is not None:
configuration['station']['start_beamformer'] = config_params.beamf_start
if config_params.bitfile is not None:
configuration['station']['bitfile'] = config_params.bitfile
if config_params.chan_trunc is not None:
configuration['station']['channel_truncation'] = config_params.chan_trunc
if config_params.channel_integ is not None:
configuration['station']['channel_integration_time'] = config_params.channel_integ
if config_params.enable_test is not None:
configuration['station']['enable_test'] = config_params.enable_test
if config_params.qsfp_detection is not None:
configuration['station']['qsfp_detection'] = config_params.qsfp_detection
# if config_params.use_internal_pps is True: # Not clear how to use the command line option wrt value set in config file
# configuration['station']['use_internal_pps'] = True
if config_params.initialise is not None:
configuration['station']['initialise'] = config_params.initialise
if config_params.lmc_ip is not None:
configuration['network']['lmc']['lmc_ip'] = config_params.lmc_ip
if config_params.lmc_mac is not None:
configuration['network']['lmc']['lmc_mac'] = config_params.lmc_mac
if config_params.lmc_port is not None:
configuration['network']['lmc']['lmc_port'] = config_params.lmc_port
if config_params.port is not None:
configuration['network']['lmc']['tpm_cpld_port'] = config_params.port
if config_params.program is not None:
configuration['station']['program'] = config_params.program
if config_params.program_cpld is not None:
configuration['station']['program_cpld'] = config_params.program_cpld
if config_params.start_frequency_channel is not None:
configuration['observation']['start_frequency_channel'] = config_params.start_frequency_channel
if config_params.tiles is not None:
configuration['tiles'] = config_params.tiles.split(',')
if config_params.use_teng is not None:
configuration['network']['lmc']['use_teng'] = config_params.use_teng
return configuration
if __name__ == "__main__":
import pyaavs.logger
from optparse import OptionParser
from sys import argv, stdout
parser = OptionParser(usage="usage: %station [options]")
parser.add_option("--config", action="store", dest="config",
type="str", default=None, help="Configuration file [default: None]")
parser.add_option("--port", action="store", dest="port",
type="int", default=None, help="Port [default: None]")
parser.add_option("--lmc_ip", action="store", dest="lmc_ip",
default=None, help="IP [default: None]")
parser.add_option("--lmc_port", action="store", dest="lmc_port",
type="int", default=None, help="Port [default: None]")
parser.add_option("--lmc-mac", action="store", dest="lmc_mac",
type="int", default=None, help="LMC MAC address [default: None]")
parser.add_option("-f", "--bitfile", action="store", dest="bitfile",
default=None, help="Bitfile to use (-P still required) [default: None]")
parser.add_option("-t", "--tiles", action="store", dest="tiles",
default=None, help="Tiles to add to station [default: None]")
parser.add_option("-P", "--program", action="store_true", dest="program",
default=False, help="Program FPGAs [default: False]")
parser.add_option("-I", "--initialise", action="store_true", dest="initialise",
default=False, help="Initialise TPM [default: False]")
parser.add_option("-C", "--program_cpld", action="store_true", dest="program_cpld",
default=False, help="Update CPLD firmware (requires -f option) [default: False]")
parser.add_option("-T", "--enable-test", action="store_true", dest="enable_test",
default=False, help="Enable test pattern [default: False]")
# parser.add_option("--use_internal_pps", action="store_true", dest="use_internal_pps",
# default=False, help="Enable internal PPS generator ['default: False]")
parser.add_option("--qsfp_detection", action="store", dest="qsfp_detection",
default=None, help="Force QSFP cable detection: auto, qsfp1, qsfp2, all, none [default: auto]")
parser.add_option("--use_teng", action="store_true", dest="use_teng",
default=None, help="Use 10G for LMC [default: None]")
parser.add_option("--chan-trunc", action="store", dest="chan_trunc",
default=None, type="int", help="Channeliser truncation [default: None]")
parser.add_option("-B", "--beamf_start", action="store_true", dest="beamf_start",
default=False, help="Start network beamformer [default: False]")
parser.add_option("--channel-integration-time", action="store", dest="channel_integ",
type="float", default=None, help="Integrated channel integration time [default: None]")
parser.add_option("--beam-integration-time", action="store", dest="beam_integ",
type="float", default=None, help="Integrated beam integration time [default: None]")
parser.add_option("--beamformer-scaling", action="store", dest="beam_scaling",
type="int", default=None, help="Beamformer scaling [default: None]")
parser.add_option("--beam-start_frequency", action="store", dest="start_frequency_channel",
type="float", default=None, help="Beamformer scaling [default: None]")
parser.add_option("--beam-bandwidth", action="store", dest="beam_bandwidth",
type="float", default=None, help="Beamformer scaling [default: None]")
parser.add_option("--fft_sign_invert", action="store_true", dest="fft_sign_invert",
default=False, help="Conjugate FFT output [default: False]")
parser.add_option("--debug", action="store_true", dest="debug",
default=False, help="Set console output to DEBUG log level [default: False]")
(conf, args) = parser.parse_args(argv[1:])
if conf.debug:
pyaavs.logger.set_console_log_level("DEBUG")
# Set current thread name
threading.current_thread().name = "Station"
# Load station configuration
configuration = load_station_configuration(conf)
# Create station
station = Station(configuration)
# Connect station (program, initialise and configure if required)
station.connect()
if conf.fft_sign_invert:
station['fpga1.dsp_regfile.channelizer_config.fft_conjugate'] = 1
station['fpga2.dsp_regfile.channelizer_config.fft_conjugate'] = 1