Tile subpackage

This subpackage implements tile functionality for the MCCS.

It includes an operational Tango device, a demonstrator Tango device, modules for driving and simulating TPM hardware, and a CLI.

class DemoTile(*args, **kwargs)

Version of the MccsTile tango device with extra methods for testing/demos.

init_device()

Tango hook for initialisation code.

Overridden here to log the fact that this is a demo tile.

Return type:

None

class DynamicTileSimulator(logger)

A simulator for a TPM, with dynamic value updates to certain attributes.

__init__(logger)

Initialise a new Dynamic Tile simulator instance.

Parameters:

logger (Logger) – a logger for this simulator to use

property board_temperature: float | None

Return the temperature of the TPM.

Returns:

the temperature of the TPM

cleanup()

Garbage-collection hook.

Return type:

None

property current: float

Return the current of the TPM.

Returns:

the current of the TPM

property fpga1_temperature: float

Return the temperature of FPGA 1.

Returns:

the temperature of FPGA 1

property fpga2_temperature: float

Return the temperature of FPGA 2.

Returns:

the temperature of FPGA 2

get_board_temperature()
Return type:

Optional[float]

Returns:

the mocked board temperature.

get_current()
Return type:

Optional[float]

Returns:

the mocked current.

get_fpga0_temperature()
Return type:

Optional[float]

Returns:

the mocked fpga0 temperature.

get_fpga1_temperature()
Return type:

Optional[float]

Returns:

the mocked fpga1 temperature.

get_voltage()
Return type:

Optional[float]

Returns:

the mocked voltage.

classmethod random_antenna_generator()

Generate a random antenna/pol number.

Yields:

a random antenna/pol number.

property voltage: float

Return the voltage of the TPM.

Returns:

the voltage of the TPM

class DynamicValuesGenerator(soft_min, soft_max, window_size=20, in_range_rate=0.95)

A generator of dynamic values with the following properties.

  • We want the values to gradually walk around their range rather than randomly jumping around, i.e. we want values to be temporally correlated. We achieve this by calculating values as a sliding window sum of a sequence of independent (uncorrelated) random values.

  • The provided range is a “soft” range – we allow values to walk outside this range occasionally. The proportion of time that the values should stay within the required range is exposed as an argument. This is useful for testing the alarm conditions of TANGO attributes: we set the soft range of this generator to the attribute’s alarm range, and we specify how often the attribute should exceed that range and thus start alarming.

__init__(soft_min, soft_max, window_size=20, in_range_rate=0.95)

Create a new instance.

Parameters:
  • soft_min (float) – a “soft” minimum value. For TANGO device attributes, this should be the alarm minimum.

  • soft_max (float) – a “soft” maximum value. For TANGO device attributes, this should be the alarm maximum.

  • window_size (int) – the size of the sliding window to sum over. A value of 1 will give uncorrelated values. Increasing the value increases correlation – a graph of how the value changes over time will be smoother. The default is 20.

  • in_range_rate (float) – the proportion of time during which the value should remain within the [soft_min, soft_max] range. The default is 0.95. Don’t change this to 1.0 unless you want the variance to collapse: you’ll get the mean of the range every time.
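
The sliding-window idea described above can be sketched as follows. This is an illustrative sketch only, not the package's implementation: the function name, the use of Gaussian draws and the normal-quantile scaling are all assumptions made for the example. Unlike the class documented here, the sketch rejects an in_range_rate of exactly 1.0 rather than collapsing to the mean.

```python
import math
import random
from collections import deque
from statistics import NormalDist
from typing import Iterator, Optional


def sliding_window_values(
    soft_min: float,
    soft_max: float,
    window_size: int = 20,
    in_range_rate: float = 0.95,
    seed: Optional[int] = None,
) -> Iterator[float]:
    """Yield temporally correlated values (hypothetical helper, for illustration)."""
    if not 0.0 < in_range_rate < 1.0:
        raise ValueError("this sketch needs 0 < in_range_rate < 1")
    rng = random.Random(seed)
    centre = (soft_min + soft_max) / 2.0
    half_width = (soft_max - soft_min) / 2.0
    # z such that a standard normal lies in [-z, z] with probability in_range_rate
    z = NormalDist().inv_cdf((1.0 + in_range_rate) / 2.0)
    # The sum of window_size independent N(0, 1) draws has standard
    # deviation sqrt(window_size); rescale so that z sigma == half_width.
    scale = half_width / (z * math.sqrt(window_size))
    window = deque(
        (rng.gauss(0.0, 1.0) for _ in range(window_size)), maxlen=window_size
    )
    while True:
        # Consecutive outputs share all but one draw, hence the correlation.
        yield centre + scale * sum(window)
        window.append(rng.gauss(0.0, 1.0))


generator = sliding_window_values(10.0, 20.0, seed=1)
samples = [next(generator) for _ in range(5000)]
fraction_inside = sum(10.0 <= v <= 20.0 for v in samples) / len(samples)
print(f"fraction inside soft range: {fraction_inside:.3f}")
```

With window_size=1 the outputs of this sketch are uncorrelated; larger windows give a smoother walk, matching the parameter description above.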

class DynamicValuesUpdater(update_rate=1.0)

A dynamic updater of values, for use in a dynamic simulator.

__init__(update_rate=1.0)

Create a new instance.

Parameters:

update_rate (float) – how often, in seconds, the target values should be updated. Defaults to 1 second.

add_target(generator, callback)

Add a new target to be updated.

Parameters:
  • generator (Any) – the generator of values to be used as updates

  • callback (Callable) – the callback to be called with updates

Return type:

None

cleanup()

Things to do before this object is garbage collected.

Return type:

None

start()

Start the updater thread.

Return type:

None

stop()

Stop the updater thread.

Return type:

None

class FirmwareThresholds(_fpga1_warning_threshold='Undefined', _fpga1_alarm_threshold='Undefined', _fpga2_warning_threshold='Undefined', _fpga2_alarm_threshold='Undefined', _board_warning_threshold='Undefined', _board_alarm_threshold='Undefined', _MGT_AVCC_min_alarm_threshold='Undefined', _MGT_AVCC_max_alarm_threshold='Undefined', _MGT_AVTT_min_alarm_threshold='Undefined', _MGT_AVTT_max_alarm_threshold='Undefined', _SW_AVDD1_min_alarm_threshold='Undefined', _SW_AVDD1_max_alarm_threshold='Undefined', _SW_AVDD2_min_alarm_threshold='Undefined', _SW_AVDD2_max_alarm_threshold='Undefined', _AVDD3_min_alarm_threshold='Undefined', _AVDD3_max_alarm_threshold='Undefined', _MAN_1V2_min_alarm_threshold='Undefined', _MAN_1V2_max_alarm_threshold='Undefined', _DDR0_VREF_min_alarm_threshold='Undefined', _DDR0_VREF_max_alarm_threshold='Undefined', _DDR1_VREF_min_alarm_threshold='Undefined', _DDR1_VREF_max_alarm_threshold='Undefined', _VM_DRVDD_min_alarm_threshold='Undefined', _VM_DRVDD_max_alarm_threshold='Undefined', _VIN_min_alarm_threshold='Undefined', _VIN_max_alarm_threshold='Undefined', _MON_3V3_min_alarm_threshold='Undefined', _MON_3V3_max_alarm_threshold='Undefined', _MON_1V8_min_alarm_threshold='Undefined', _MON_1V8_max_alarm_threshold='Undefined', _MON_5V0_min_alarm_threshold='Undefined', _MON_5V0_max_alarm_threshold='Undefined', _FE0_mVA_min_alarm_threshold='Undefined', _FE0_mVA_max_alarm_threshold='Undefined', _FE1_mVA_min_alarm_threshold='Undefined', _FE1_mVA_max_alarm_threshold='Undefined')

Dataclass containing firmware thresholds for the TPM.

__init__(_fpga1_warning_threshold='Undefined', _fpga1_alarm_threshold='Undefined', _fpga2_warning_threshold='Undefined', _fpga2_alarm_threshold='Undefined', _board_warning_threshold='Undefined', _board_alarm_threshold='Undefined', _MGT_AVCC_min_alarm_threshold='Undefined', _MGT_AVCC_max_alarm_threshold='Undefined', _MGT_AVTT_min_alarm_threshold='Undefined', _MGT_AVTT_max_alarm_threshold='Undefined', _SW_AVDD1_min_alarm_threshold='Undefined', _SW_AVDD1_max_alarm_threshold='Undefined', _SW_AVDD2_min_alarm_threshold='Undefined', _SW_AVDD2_max_alarm_threshold='Undefined', _AVDD3_min_alarm_threshold='Undefined', _AVDD3_max_alarm_threshold='Undefined', _MAN_1V2_min_alarm_threshold='Undefined', _MAN_1V2_max_alarm_threshold='Undefined', _DDR0_VREF_min_alarm_threshold='Undefined', _DDR0_VREF_max_alarm_threshold='Undefined', _DDR1_VREF_min_alarm_threshold='Undefined', _DDR1_VREF_max_alarm_threshold='Undefined', _VM_DRVDD_min_alarm_threshold='Undefined', _VM_DRVDD_max_alarm_threshold='Undefined', _VIN_min_alarm_threshold='Undefined', _VIN_max_alarm_threshold='Undefined', _MON_3V3_min_alarm_threshold='Undefined', _MON_3V3_max_alarm_threshold='Undefined', _MON_1V8_min_alarm_threshold='Undefined', _MON_1V8_max_alarm_threshold='Undefined', _MON_5V0_min_alarm_threshold='Undefined', _MON_5V0_max_alarm_threshold='Undefined', _FE0_mVA_min_alarm_threshold='Undefined', _FE0_mVA_max_alarm_threshold='Undefined', _FE1_mVA_min_alarm_threshold='Undefined', _FE1_mVA_max_alarm_threshold='Undefined')
to_device_property_dict()

Return dict of all thresholds with current values.

Return type:

dict

Returns:

a dictionary with structure required for loading into database

to_device_property_keys_only()

Return dict of all threshold keys with None values.

Return type:

dict

Returns:

a dictionary with structure required for reading from the database.

update_from_dict(thresholds, logger)

Update FirmwareThresholds properties from a Tango-style dict.

The tango style dict is dict<str, dict<str, seq<str>>>

Example input:

{'fpga1_alarm_threshold': ['80'], 'VIN_max_alarm_threshold': ['5.0']}

Parameters:
  • thresholds (dict[str, StdStringVector]) – the tango dictionary.

  • logger (Optional[Logger]) – an optional logger for information.

Raises:

ValueError – if a value is neither 'Undefined' nor numeric.

Return type:

None
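
The conversion rule described above (each entry is either 'Undefined' or a numeric string, anything else raises ValueError) can be sketched in isolation. The function name and the handling of empty sequences are assumptions for illustration; the real method updates the dataclass fields rather than returning a dictionary.

```python
from typing import Dict, Mapping, Sequence, Union

ThresholdValue = Union[float, str]


def parse_threshold_properties(
    properties: Mapping[str, Sequence[str]],
) -> Dict[str, ThresholdValue]:
    """Convert {"name": ["value"]} entries to floats or the literal "Undefined"."""
    parsed: Dict[str, ThresholdValue] = {}
    for name, raw_values in properties.items():
        if not raw_values:
            continue  # assumption: an empty sequence means no value is stored
        raw = raw_values[0]
        if raw == "Undefined":
            parsed[name] = "Undefined"
            continue
        try:
            parsed[name] = float(raw)
        except ValueError as exc:
            raise ValueError(
                f"{name}: expected 'Undefined' or a number, got {raw!r}"
            ) from exc
    return parsed


example = {"fpga1_alarm_threshold": ["80"], "VIN_max_alarm_threshold": ["5.0"]}
print(parse_threshold_properties(example))
```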

class FirmwareThresholdsDbAdapter(device_name, thresholds, db_connection=None, logger=None)

Tango DB interface for reading/writing FirmwareThresholds.

__init__(device_name, thresholds, db_connection=None, logger=None)

Initialise an interface to database.

Parameters:
  • device_name (str) – the TRL (Tango Resource Locator) of the device these thresholds belong to.

  • thresholds (FirmwareThresholds) – A class containing the FirmwareThresholds.

  • db_connection (Database | None) – An optional database connection to inject for testing.

  • logger (logging.Logger | None) – an optional logger for information.

resync_with_db()

Resync class with db values.

Return type:

None

write_threshold_to_db()

Put thresholds into database.

Return type:

None

class MccsTile(*args, **kwargs)

An implementation of a Tile Tango device for MCCS.

ApplyCalibration(switch_time)

Load the calibration coefficients at the specified time delay.

Parameters:

switch_time (str) – switch time, in ISO formatted time

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("ApplyCalibration", "")
ApplyPointingDelays(start_time)

Apply the pointing delays at the specified time delay.

Parameters:

start_time (str) – time for applying the delays (default = 0)

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("ApplyPointingDelays", "")
BeamformerRunningForChannels(channel_groups=None)

Check whether the beamformer is running for the given channel groups.

A json dictionary with optional keywords:

Parameters:

channel_groups (Optional[list[int]]) – (list) List of channel groups

Return type:

bool

Returns:

Whether the beamformer is running

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"channel_groups": [0,1,4,5]}
>>> jstr = json.dumps(dict)
>>> running = dp.command_inout("BeamformerRunningForChannels", jstr)
ClearBroadbandRfi()

Clear all RFI counts registers.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Configure(antenna_ids=None, fixed_delays=None)

Configure the tile device attributes.

A json string with the keywords:

Parameters:
  • antenna_ids (Optional[list[int]]) – list of antenna ids to configure, indexed from 1. If not provided, all antennas will be configured.

  • fixed_delays (Optional[list[float]]) – list of fixed delay values to apply to each channel. If not provided, no static delays will be applied.

Return type:

None

Configure40GCore(core_id=0, arp_table_entry=0, source_mac=None, source_ip=None, source_port=None, destination_ip=None, destination_port=None, rx_port_filter=None, netmask=None, gateway_ip=None)

Configure 40g core_id with specified parameters.

A json dictionary with only optional keywords:

Parameters:
  • core_id (int) – (int) core id

  • arp_table_entry (int) – (int) ARP table entry ID

  • source_mac (Optional[int]) – (int) mac address

  • source_ip (Optional[str]) – (string) IP dot notation.

  • source_port (Optional[int]) – (int) source port

  • destination_ip (Optional[str]) – (string) IP dot notation

  • destination_port (Optional[int]) – (int) destination port

  • rx_port_filter (Optional[int]) – (int) Filter for incoming packets

  • netmask (Optional[str]) – (string) 40g (science data) subnet mask

  • gateway_ip (Optional[str]) – (string) IP address of 40g (science) subnet gateway

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"core_id":2, "arp_table_entry":0, "source_mac":0x62000a0a01c9,
...         "source_ip":"10.0.99.3", "source_port":4000,
...         "destination_ip":"10.0.99.3", "destination_port":5000}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("Configure40GCore", jstr)
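
Because source_mac is passed as an integer, a MAC address written in the usual colon-separated form has to be converted first. The helper below is a hypothetical convenience for that step, not part of the device API. Note that JSON has no hexadecimal literals, so json.dumps serialises the value in decimal.

```python
import json


def mac_to_int(mac: str) -> int:
    """Convert a colon- or hyphen-separated MAC address to a 48-bit integer."""
    octets = mac.replace("-", ":").split(":")
    if len(octets) != 6:
        raise ValueError(f"expected 6 octets, got {len(octets)}: {mac!r}")
    value = 0
    for octet in octets:
        byte = int(octet, 16)  # raises ValueError on non-hex input
        if not 0 <= byte <= 0xFF:
            raise ValueError(f"octet out of range in {mac!r}")
        value = (value << 8) | byte
    return value


config = {
    "core_id": 2,
    "arp_table_entry": 0,
    "source_mac": mac_to_int("62:00:0a:0a:01:c9"),
}
jstr = json.dumps(config)
print(jstr)
```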
ConfigureIntegratedBeamData(integration_time=0.5, first_channel=0, last_channel=191)

Configure the transmission of integrated beam data.

The transmission uses the provided integration time, first channel and last channel. The data are sent continuously until the StopIntegratedData command is run.

A json dictionary with optional keywords:

Parameters:
  • integration_time (float) – (float) in seconds (default = 0.5)

  • first_channel (int) – (int) default 0

  • last_channel (int) – (int) default 191

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"integration_time": 0.2, "first_channel":0, "last_channel": 191}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("ConfigureIntegratedBeamData", jstr)
ConfigureIntegratedChannelData(integration_time=0.5, first_channel=0, last_channel=511)

Configure and start the transmission of integrated channel data.

The transmission uses the provided integration time, first channel and last channel. Data are sent continuously until the StopIntegratedData command is run.

A json dictionary with optional keywords:

Parameters:
  • integration_time (float) – (float) in seconds (default = 0.5)

  • first_channel (int) – (int) default 0

  • last_channel (int) – (int) default 511

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"integration_time": 0.2, "first_channel":0, "last_channel": 191}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("ConfigureIntegratedChannelData", jstr)
ConfigurePatternGenerator(stage, pattern, adders, start=False, shift=0, zero=0, ramp1=None, ramp2=None)

Set the test pattern generator using the provided configuration.

A JSON dictionary with the following keywords:

Parameters:
  • stage (str) – The stage in the signal chain where the pattern is injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of the channelizer), or ‘beamf’ (output of the tile beamformer).

  • pattern (list[int]) – The data pattern in time order. Must be an array of length 1 to 1024. Represents values in time order, not for antennas or polarizations.

  • adders (list[int]) – A list of 32 integers that expands the pattern to cover 16 antennas and 2 polarizations. The adders map the pattern to hardware signals.

  • start (bool) – Boolean flag to indicate whether to start the pattern immediately. If False, the pattern can be started manually later.

  • shift (int) – Optional bit shift (divides by 2^shift). Must not be used in ‘beamf’ stage, where it is always overridden to 4.

  • zero (int) – Integer (0-65535) used as a mask to disable the pattern on specific antennas and polarizations. Applied to both FPGAs, supports up to 8 antennas and 2 polarizations.

  • ramp1 (Optional[dict[str, int]]) – An optional ramp1 applied after pattern. polarisation: The polarisation to apply the ramp for. This must be 0, 1 or -1 to use all stages.

  • ramp2 (Optional[dict[str, int]]) – An optional ramp2 applied after pattern. (note: ramp2 = ramp1 + 1234) polarisation: The polarisation to apply the ramp for. This must be 0, 1 or -1 to use all stages.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> test_pattern = list(range(1024))
>>> for n in range(1024):
...     if n % 2 == 0:
...         test_pattern[n] = n
...     else:
...         test_pattern[n] = random.randrange(0, 255, 1)
>>> test_adders = list(range(32))
>>> config = {"stage": "jesd", "pattern": test_pattern, "adders":
...           test_adders, "start": True}
>>> jstr = json.dumps(config)
>>> values = dp.command_inout("ConfigurePatternGenerator", jstr)
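
The constraints listed in the parameter descriptions can be checked on the client side before the JSON string is sent. The following is a minimal sketch under stated assumptions: the function name is hypothetical, and rejecting a non-zero shift for the 'beamf' stage is one possible reading of "must not be used"; the device may perform its own, different validation.

```python
import json
from typing import List

VALID_STAGES = {"jesd", "channel", "beamf"}


def build_pattern_config(
    stage: str,
    pattern: List[int],
    adders: List[int],
    start: bool = False,
    shift: int = 0,
    zero: int = 0,
) -> str:
    """Validate the documented limits and return the JSON argument string."""
    if stage not in VALID_STAGES:
        raise ValueError(f"stage must be one of {sorted(VALID_STAGES)}")
    if not 1 <= len(pattern) <= 1024:
        raise ValueError("pattern must contain between 1 and 1024 values")
    if len(adders) != 32:
        raise ValueError("adders must contain exactly 32 integers")
    if not 0 <= zero <= 0xFFFF:
        raise ValueError("zero must be in the range 0-65535")
    if stage == "beamf" and shift != 0:
        # assumption: treat a user-supplied shift as an error for this stage
        raise ValueError("shift must not be set for the 'beamf' stage")
    return json.dumps(
        {
            "stage": stage,
            "pattern": pattern,
            "adders": adders,
            "start": start,
            "shift": shift,
            "zero": zero,
        }
    )


jstr = build_pattern_config("jesd", list(range(1024)), list(range(32)), start=True)
```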
ConfigureStationBeamformer(start_channel=192, n_channels=8, is_first=False, is_last=False)

Initialise and start the station beamformer.

Initial configuration of the tile-station beamformer. Optionally set the observed region (the default is 6.25 MHz starting at 150 MHz) and whether the tile is the first or last in the beamformer chain.

A json dictionary with mandatory keywords:

Parameters:
  • start_channel (int) – (int) start channel of the observed region default = 192 (150 MHz)

  • n_channels (int) – (int) is the number of channels in the observed region default = 8 (6.25 MHz)

  • is_first (bool) – (bool) whether the tile is the first one in the station default False

  • is_last (bool) – (bool) whether the tile is the last one in the station default False

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – when the specified observed region is invalid

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"start_channel": 64, "n_channels": 10, "is_first": True,
...         "is_last": True}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("ConfigureStationBeamformer", jstr)
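
The defaults quoted above (channel 192 at 150 MHz, 8 channels spanning 6.25 MHz) imply a channel spacing of 150 MHz / 192 = 781.25 kHz. The sketch below works through that arithmetic. It assumes that frequency is linear in the channel index starting from 0 Hz, which is inferred from the two documented defaults rather than stated in this reference.

```python
from typing import Tuple

# Inferred from "start_channel = 192 (150 MHz)"; an assumption, not a documented constant.
CHANNEL_WIDTH_HZ = 150e6 / 192  # 781250.0 Hz


def observed_region(start_channel: int = 192, n_channels: int = 8) -> Tuple[float, float]:
    """Return (start frequency, bandwidth) in Hz for an observed region."""
    start_hz = start_channel * CHANNEL_WIDTH_HZ
    bandwidth_hz = n_channels * CHANNEL_WIDTH_HZ
    return start_hz, bandwidth_hz


start_hz, bandwidth_hz = observed_region()
print(start_hz / 1e6, "MHz start,", bandwidth_hz / 1e6, "MHz wide")
```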

ConfigureTestGenerator(set_time=None, tone_frequency=None, tone_amplitude=None, tone_2_frequency=None, tone_2_amplitude=None, noise_amplitude=None, pulse_frequency=None, pulse_amplitude=None, adc_channels=None, **kwargs)

Set the test signal generator.

A json dictionary with keywords:

Parameters:
  • tone_frequency (Optional[float]) – first tone frequency, in Hz. The frequency is rounded to the resolution of the generator. If this is not specified, the tone generator is disabled.

  • tone_amplitude (Optional[float]) – peak tone amplitude, normalized to 31.875 ADC units. The amplitude is rounded to 1/8 ADC unit. Default is 1.0. A value of -1.0 keeps the previously set value.

  • tone_2_frequency (Optional[float]) – frequency for the second tone. Same as tone_frequency.

  • tone_2_amplitude (Optional[float]) – peak tone amplitude for the second tone. Same as tone_amplitude.

  • noise_amplitude (Optional[float]) – RMS amplitude of the pseudorandom Gaussian white noise, normalized to 26.03 ADC units.

  • pulse_frequency (Optional[int]) – frequency of the periodic pulse. A code in the range 0 to 7, corresponding to (16, 12, 8, 6, 4, 3, 2) times the ADC frame frequency.

  • pulse_amplitude (Optional[float]) – peak amplitude of the periodic pulse, normalized to 127 ADC units. Default is 1.0. A value of -1.0 keeps the previously set value.

  • set_time (Optional[str]) – time at which the generator is set, for synchronization among different TPMs. In UTC ISO format (string)

  • adc_channels (Optional[list[int]]) – list of adc channels which will be substituted with the generated signal. It is a 32-bit integer, with each bit representing an input channel. Default: all if at least one source is specified, none otherwise.

  • kwargs (Optional[Any]) – optional keyword arguments: Currently supports: delays

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"tone_frequency": 150e6, "tone_amplitude": 0.1,
...         "noise_amplitude": 0.9, "pulse_frequency": 7,
...         "set_time": "2022-08-09T12:34:56.7Z"}
>>> jstr = json.dumps(dict)
>>> values = dp.command_inout("ConfigureTestGenerator", jstr)
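
To illustrate the amplitude rounding described for tone_amplitude, the sketch below reads "normalized to 31.875 ADC units" as meaning that an amplitude of 1.0 corresponds to 31.875 ADC units, and rounds to the nearest 1/8 unit. Both that reading and the rounding mode (Python's round, which rounds halves to even) are assumptions; the firmware may behave differently at the boundaries.

```python
TONE_FULL_SCALE_ADU = 31.875  # ADC units for a normalized amplitude of 1.0
STEP_ADU = 0.125              # documented resolution: 1/8 ADC unit


def quantise_tone_amplitude(amplitude: float) -> float:
    """Return the amplitude in ADC units after rounding to the 1/8-unit grid."""
    adu = amplitude * TONE_FULL_SCALE_ADU
    steps = round(adu / STEP_ADU)  # integer number of 1/8-unit steps
    return steps * STEP_ADU


for requested in (0.0, 0.5, 1.0):
    print(requested, "->", quantise_tone_amplitude(requested), "ADC units")
```

Full scale corresponds to 255 steps of 1/8 unit, which suggests an 8-bit amplitude code, although this reference does not say so explicitly.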
DisableBroadbandRfiBlanking(argin)

Disable broadband RFI blanking on specified antennas.

Parameters:

argin (list[int]) – List of antenna IDs to disable blanking on (0-15).

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

DisableStationBeamFlagging()

Disable station beam flagging.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

TODO THORN-68: Currently we can’t verify if the flag has been set correctly, this functionality will get added later

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("DisableStationBeamFlagging")
DownloadFirmware(argin)

Download the firmware contained in bitfile to all FPGAs on the board.

This should also update the internal register mapping, such that registers become available for use.

Parameters:

argin (str) – can either be the design name returned from GetFirmwareAvailable() command, or a path to a file

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("DownloadFirmware", "/tmp/firmware/bitfile")
EnableBroadbandRfiBlanking(argin)

Enable broadband RFI blanking on specified antennas.

Parameters:

argin (list[int]) – List of antenna IDs to enable blanking on (0-15).

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

EnableStationBeamFlagging()

Enable station beam flagging.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

TODO THORN-68: Currently we can’t verify if the flag has been set correctly, this functionality will get added later

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("EnableStationBeamFlagging")
EvaluateTileProgrammingState()

Re-evaluate the TileProgrammingState.

Evaluate and update the TileProgrammingState. Return True if the re-evaluation returned a different value to the value from automatic detection. (A value of True could signify a race condition, or that there is a bug in the automatic evaluation.)

Return type:

bool

Returns:

True if the re-evaluation of TpmStatus returns a different value.

Get40GCoreConfiguration(core_id=-1, arp_table_entry=0)

Get 40g core configuration for core_id.

This is required to chain up TPMs to form a station.

A json dictionary with optional keywords:

Parameters:
  • core_id (int) – (int) core id

  • arp_table_entry (int) – (int) ARP table entry ID to use

Return type:

str

Returns:

the configuration is a json string describing a list (possibly empty), each list entry comprising: core_id, arp_table_entry, source_mac, source_ip, source_port, destination_ip, destination_port, netmask, gateway_ip

Raises:

ValueError – if no configuration is found for the specified core_id and arp_table_entry

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"core_id": 2, "arp_table_entry": 0}
>>> jstr = json.dumps(dict)
>>> argout = dp.command_inout("Get40GCoreConfiguration", jstr)
>>> params = json.loads(argout)
GetArpTable()

Return a dictionary with populated ARP table for all used cores.

40G interfaces use cores 0 (fpga0) and 1 (fpga1) and ARP ID 0 for beamformer, 1 for LMC. 10G interfaces use cores 0, 1 (fpga0) and 4, 5 (fpga1) for beamforming, and 2, 6 for LMC with only one ARP.

Return type:

str

Returns:

a JSON-encoded dictionary of coreId and populated arpID table

Example:

>>> argout = dp.command_inout("GetArpTable")
>>> dict = json.loads(argout)
>>> dict
{
"core_id0": [0, 1],
"core_id1": [0],
"core_id3": [],
}
GetFirmwareAvailable()

Get available firmware.

Return a dictionary containing the following information for each firmware stored on the board (such as in Flash memory).

For each firmware, a dictionary containing the following keys with their respective values should be provided: ‘design’, which is a textual name for the firmware, ‘major’, which is the major version number, and ‘minor’, which is the minor version number.

Return type:

str

Returns:

a JSON-encoded dictionary of firmware details

Example:
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> jstr = dp.command_inout("GetFirmwareAvailable")
>>> dict = json.loads(jstr)
{
"firmware1": {"design": "model1", "major": 2, "minor": 3},
"firmware2": {"design": "model2", "major": 3, "minor": 7},
"firmware3": {"design": "model3", "major": 2, "minor": 6},
}
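
Once decoded, the firmware dictionary can be searched with ordinary Python. The sketch below picks the entry with the highest (major, minor) version from a string shaped like the example output above. The helper name is hypothetical and the sample data simply mirrors the illustrative values shown here.

```python
import json
from typing import Any, Dict, Tuple


def newest_firmware(jstr: str) -> Tuple[str, Dict[str, Any]]:
    """Return the (name, details) pair with the highest (major, minor) version."""
    firmware = json.loads(jstr)
    name = max(firmware, key=lambda k: (firmware[k]["major"], firmware[k]["minor"]))
    return name, firmware[name]


# Sample data copied from the example above, encoded as valid JSON.
sample = json.dumps(
    {
        "firmware1": {"design": "model1", "major": 2, "minor": 3},
        "firmware2": {"design": "model2", "major": 3, "minor": 7},
        "firmware3": {"design": "model3", "major": 2, "minor": 6},
    }
)
name, details = newest_firmware(sample)
print(name, details["design"])
```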
GetRegisterList()

Return a list of descriptions of the exposed firmware (and CPLD) registers.

Return type:

list[str]

Returns:

a list of register names

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> reglist = dp.command_inout("GetRegisterList")
I2C_access_alm()

Return the I2C alarm reading.

0 -> OK 1 -> WARN 2 -> ALARM

Return type:

Optional[int]

Returns:

The alarm state for I2C.
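
Client code may find it convenient to map the 0/1/2 readings onto named values. The enum below is a hypothetical client-side convenience, not a type exported by this package, and treating None as "reading unavailable" is an assumption based on the Optional[int] return type.

```python
from enum import IntEnum
from typing import Optional


class AlarmReading(IntEnum):
    """Names for the documented alarm readings."""

    OK = 0
    WARN = 1
    ALARM = 2


def describe_alarm(reading: Optional[int]) -> str:
    """Return a readable name for an alarm reading."""
    if reading is None:
        return "UNKNOWN"  # assumption: None means no reading is available
    return AlarmReading(reading).name


print(describe_alarm(2))
```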

Initialise()

Perform all required initialisation.

This switches on the on-board devices, locks the PLL, and performs synchronisation and other operations required to start configuring the signal processing functions of the firmware, such as channelisation and beamforming.

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("Initialise")
LoadCalibrationCoefficients(argin)

Load the calibration coefficients, but do not apply them.

Applying them is performed by apply_calibration. The calibration coefficients may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:

argin (list[float]) – list comprises:

  • antenna - (int) is the antenna to which the coefficients will be applied.

  • calibration_coefficients - [array] a bidimensional complex array comprising calibration_coefficients[channel, polarization], with each element representing a normalized coefficient, with (1.0, 0.0) being the normal, expected response for an ideal antenna.

    • channel - (int) channel is the index specifying the channels at the beamformer output, i.e. considering only those channels actually processed and beam assignments.

    • polarization index ranges from 0 to 3.

      • 0: X polarization direct element

      • 1: X->Y polarization cross element

      • 2: Y->X polarization cross element

      • 3: Y polarization direct element

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – when the inputs are invalid

Example:

>>> antenna = 2
>>> complex_coefficients = [[complex(3.4, 1.2), complex(2.3, 4.1),
...                          complex(4.6, 8.2), complex(6.8, 2.4)]] * 5
>>> inp = list(itertools.chain.from_iterable(complex_coefficients))
>>> out = [[v.real, v.imag] for v in inp]
>>> coefficients = list(itertools.chain.from_iterable(out))
>>> coefficients.insert(0, float(antenna))
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("LoadCalibrationCoefficients", coefficients)
LoadCalibrationCoefficientsForChannels(argin)

Load the calibration coefficients, but do not apply them.

Applying them is performed by apply_calibration. The calibration coefficients may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:

argin (list[float]) – list comprises:

  • start_channel - (int) is the first channel to which the coefficients will be applied.

  • calibration_coefficients - [array] a tridimensional complex array comprising calibration_coefficients[channel, antenna, polarization], with each element representing a normalized coefficient, with (1.0, 0.0) being the normal, expected response for an ideal antenna.

    • channel - (int) channel is the index specifying the channels at the beamformer output, i.e. considering only those channels actually processed and beam assignments.

    • antenna - index ranging 0 to 15, for the 16 antennas managed by the tile

    • polarization index ranges from 0 to 3.

      • 0: X polarization direct element

      • 1: X->Y polarization cross element

      • 2: Y->X polarization cross element

      • 3: Y polarization direct element

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – when the inputs are invalid

Example:

>>> start_channel = 2
>>> complex_coefficients = [[[complex(3.4, 1.2), complex(2.3, 4.1),
...                           complex(4.6, 8.2), complex(6.8, 2.4)]] * 16] * 4
>>> per_antenna = itertools.chain.from_iterable(complex_coefficients)
>>> inp = list(itertools.chain.from_iterable(per_antenna))
>>> out = [[v.real, v.imag] for v in inp]
>>> coefficients = list(itertools.chain.from_iterable(out))
>>> coefficients.insert(0, float(start_channel))
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("LoadCalibrationCoefficientsForChannels", coefficients)
LoadPointingDelays(argin)

Specify the delay in seconds and the delay rate in seconds/second.

The delay_array specifies the delay and delay rate for each antenna. beam_index specifies which beam is desired (range 0-7)

Parameters:

argin (list[float]) – An array containing: beam index, the delay in seconds and the delay rate in seconds/second, for each antenna.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – If the number of parameters is insufficient or if the beam index is invalid.

Example:

>>> # example delays: 16 values from -2 to +1.75 ns, rates = 0
>>> delays = [step * 0.25e-9 for step in range(-8, 8)]
>>> rates = [0.0]*16
>>> beam = 0.0
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> arg = [beam]
>>> for i in range(16):
...     arg.append(delays[i])
...     arg.append(rates[i])
>>> dp.command_inout("LoadPointingDelays", arg)
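
The argument layout (beam index followed by interleaved delay and rate pairs) can be wrapped in a small helper that also checks the documented beam range before the command is sent. The helper is hypothetical, and the default of 16 antennas is taken from the example above rather than from a documented constant.

```python
from typing import List, Sequence


def pack_pointing_delays(
    beam_index: int,
    delays: Sequence[float],
    rates: Sequence[float],
    n_antennas: int = 16,  # assumption: 16 antennas per tile, as in the example
) -> List[float]:
    """Return [beam, delay0, rate0, delay1, rate1, ...] as a flat list of floats."""
    if not 0 <= beam_index <= 7:
        raise ValueError("beam_index must be in the range 0-7")
    if len(delays) != n_antennas or len(rates) != n_antennas:
        raise ValueError(f"expected {n_antennas} delays and {n_antennas} rates")
    arg = [float(beam_index)]
    for delay, rate in zip(delays, rates):
        arg.extend((float(delay), float(rate)))
    return arg


delays = [step * 0.25e-9 for step in range(-8, 8)]
rates = [0.0] * 16
arg = pack_pointing_delays(0, delays, rates)
print(len(arg))
```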
MCU_wd()

Return the MCUwd alarm reading.

0 -> OK 1 -> WARN 2 -> ALARM

Return type:

Optional[int]

Returns:

The alarm state for MCUwd.

MaxBroadbandRfi(argin)

Get max of RFI counts of specified antennas.

This returns the RFI count of the antenna with the maximum RFI count.

Parameters:

argin (list[int]) – list of antennas whose RFI counters to read

Returns:

Maximum RFI counts

Return type:

int

Raises:

ValueError – if input arguments are invalid

ReadAddress(argin)

Read n 32-bit values from address.

Parameters:

argin (list[int]) – [0] = address to read from [1] = number of values to read, default 1

Return type:

list[int]

Returns:

list of values

Raises:

ValueError – if no address is provided, or if more than 2 parameters are provided

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> reglist = dp.command_inout("ReadAddress", [address, nvalues])
ReadAntennaBuffer()

Read the data from the antenna buffer.

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

ReadBroadbandRfi(argin)

Read out the broadband RFI counters for the specified antennas.

Parameters:

argin (list[int]) – list of antennas whose RFI counters to read

Return type:

list[int]

Returns:

RFI counters per pol flattened as a 1D list

Raises:

ValueError – if input arguments are invalid

ReadRegister(register_name)

Return the value(s) of the specified register.

Parameters:

register_name (str) – full hierarchical register name

Return type:

list[int]

Returns:

a list of register values

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> values = dp.command_inout("ReadRegister", "test-reg1")
SEM_wd()

Return the SEMwd alarm reading.

0 -> OK, 1 -> WARN, 2 -> ALARM

Return type:

Optional[int]

Returns:

The alarm state for SEMwd.

SendDataSamples(**kwargs)

Transmit a snapshot containing raw antenna data.

Parameters:

kwargs (Optional[Any]) – json dictionary with optional keywords:

  • data_type - type of snapshot data (mandatory): “raw”, “channel”, “channel_continuous”, “narrowband”, “beam”

  • start_time - Time (UTC string) to start sending data. Default immediately

  • seconds - (float) Delay if timestamp is not specified. Default 0.2 seconds

Depending on the data type:

raw:

  • sync - (bool) send synchronised samples for all antennas, vs. round robin larger snapshot from each antenna

channel:

  • n_samples - (int) number of samples per channel, default 1024

  • first_channel - (int) first channel to send, default 0

  • last_channel - (int) last channel to send, default 511

channel_continuous:

  • channel_id - (int) channel_id (Mandatory)

  • n_samples - (int) number of samples to send per packet, default 128

narrowband:

  • frequency - (int) Sky frequency for band centre, in Hz (Mandatory)

  • round_bits - (int) Specify how many bits to round

  • n_samples - (int) number of spectra to send

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – if the provided arguments are invalid

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"data_type": "raw", "sync": True, "seconds": 0.2}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SendDataSamples", jstr)
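
The keywords listed above can be assembled and checked before the JSON string is sent. A minimal sketch, assuming a hypothetical helper `build_send_data_samples_arg` (not part of this package); the mandatory keys are taken from the keyword list above, and the device itself may apply further checks:

```python
import json

# Data types and mandatory per-type keys as listed in the documentation above.
VALID_DATA_TYPES = ("raw", "channel", "channel_continuous", "narrowband", "beam")
MANDATORY_KEYS = {
    "channel_continuous": ("channel_id",),
    "narrowband": ("frequency",),
}


def build_send_data_samples_arg(data_type, **options):
    """Return the JSON string argument for SendDataSamples (hypothetical helper)."""
    if data_type not in VALID_DATA_TYPES:
        raise ValueError(f"unknown data_type: {data_type!r}")
    missing = [key for key in MANDATORY_KEYS.get(data_type, ()) if key not in options]
    if missing:
        raise ValueError(f"missing mandatory keys for {data_type}: {missing}")
    return json.dumps({"data_type": data_type, **options})
```

The resulting string would be passed as the second argument of `dp.command_inout("SendDataSamples", ...)`.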
SetAttributeThresholds(argin)

Set the ALARM and WARNING thresholds on attributes.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing ResultCode and a message.

Example:

>>> thresholds = {"boardTemperature": {
...         "max_alarm": "79",
...         "min_alarm": "25",
...         "max_warning": "74",
...         "min_warning": "27",
...     },
... }
>>> tile_proxy.SetAttributeThresholds(json.dumps(thresholds))
Parameters:

argin (str) – a serialised dictionary containing attribute names and threshold limits.

SetBeamFormerRegions(argin)

Set the frequency regions which are going to be beamformed into each beam.

region_array is defined as a flattened 2D array, for a maximum of 48 regions. Total number of channels must be <= 384.

Parameters:

argin (list[int]) – list of regions. Each region comprises:

  • start_channel - (int) region starting channel, must be even in range 0 to 510

  • num_channels - (int) size of the region, must be a multiple of 8

  • beam_index - (int) beam used for this region with range 0 to 47

  • subarray_id - (int) Subarray

  • subarray_logical_channel - (int) logical channel # in the subarray

  • subarray_beam_id - (int) ID of the subarray beam

  • substation_id - (int) Substation

  • aperture_id - (int) ID of the aperture (station*100+substation?)

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – when the inputs are invalid

Example:

>>> regions = [[4, 24, 0, 0, 0, 3, 1, 101], [26, 40, 1, 0, 24, 4, 2, 102]]
>>> input = list(itertools.chain.from_iterable(regions))
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("SetBeamFormerRegions", input)
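
The documented constraints can be checked on the client before the regions are flattened. A minimal sketch, assuming a hypothetical helper `flatten_regions` (not part of this package) that enforces only the limits stated above; the device may enforce others:

```python
from itertools import chain

MAX_REGIONS = 48          # maximum number of regions, as documented above
MAX_TOTAL_CHANNELS = 384  # maximum total number of channels, as documented above


def flatten_regions(regions):
    """Validate a list of 8-element regions and return the flattened list."""
    if len(regions) > MAX_REGIONS:
        raise ValueError("too many regions")
    total_channels = 0
    for region in regions:
        if len(region) != 8:
            raise ValueError("each region needs 8 values")
        start_channel, num_channels, beam_index = region[:3]
        if start_channel % 2 != 0 or not 0 <= start_channel <= 510:
            raise ValueError("start_channel must be even, in range 0 to 510")
        if num_channels % 8 != 0:
            raise ValueError("num_channels must be a multiple of 8")
        if not 0 <= beam_index <= 47:
            raise ValueError("beam_index must be in range 0 to 47")
        total_channels += num_channels
    if total_channels > MAX_TOTAL_CHANNELS:
        raise ValueError("total number of channels must be <= 384")
    return list(chain.from_iterable(regions))
```

The returned list has the same form as the `input` value in the example above.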
SetBroadbandRfiFactor(rfi_factor)

Set the RFI factor for broadband RFI detection.

The higher the RFI factor, the less RFI is detected/flagged. This is because data is flagged if the short term power is greater than the long term power * RFI factor * 32/27.

Parameters:

rfi_factor (float) – the sensitivity value for the RFI detection

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.
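
The flagging rule described for SetBroadbandRfiFactor can be written out directly. A minimal sketch of the comparison only; `is_flagged` is a hypothetical name, and how the firmware computes the short and long term powers is not covered here:

```python
def is_flagged(short_term_power, long_term_power, rfi_factor):
    """Apply the documented rule: flag when short > long * rfi_factor * 32/27."""
    return short_term_power > long_term_power * rfi_factor * 32 / 27
```

With a long term power of 1.0 and a short term power of 3.0, a factor of 2.0 gives a threshold of about 2.37 and the data is flagged, while a factor of 3.0 gives about 3.56 and it is not, which illustrates why a higher factor flags less.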

SetCspDownload(destination_ip_1, destination_ip_2, is_last, destination_port=None, source_port=None, netmask=None, gateway=None)

Set CSP Destination per tile.

A json dictionary with optional keywords:

Parameters:
  • source_port (Optional[int]) – Source port

  • destination_ip_1 (str) – Destination IP FPGA1

  • destination_ip_2 (str) – Destination IP FPGA2

  • destination_port (Optional[int]) – Destination port

  • is_last (bool) – True for last tile in beamforming chain

  • netmask (Optional[str]) – Netmask

  • gateway (Optional[str]) – Gateway IP

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"source_port": 4661, "destination_ip_1": "10.0.10.2",
...         "destination_ip_2": "10.0.10.3", "destination_port": 4660,
...         "is_last": False, "netmask": "255.255.255.0",
...         "gateway": "10.0.10.1"}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SetCspDownload", jstr)
SetLmcDownload(mode, payload_length=1024, destination_ip='10.0.10.1', source_port=61648, destination_port=4660, netmask_40g=None, gateway_40g=None)

Specify whether control data will be transmitted over 1G or 40G networks.

A json dictionary with optional keywords:

Parameters:
  • mode (str) – (string) ‘1G’ or ‘10G’ (Mandatory) (use ‘10G’ for 40G also)

  • payload_length (int) – (int) SPEAD payload length for channel data

  • destination_ip (str) – (string) Destination IP.

  • source_port (Optional[int]) – (int) Source port for integrated data streams

  • destination_port (Optional[int]) – (int) Destination port for integrated data streams

  • netmask_40g (Optional[str]) – (string) 40g (science data) subnet mask

  • gateway_40g (Optional[str]) – (string) IP address of 40g (science) subnet gateway

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"mode": "1G", "payload_length": 4, "destination_ip": "10.0.1.23"}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SetLmcDownload", jstr)

SetLmcIntegratedDownload(mode, channel_payload_length=1024, beam_payload_length=1024, destination_ip='10.0.10.1', source_port=61648, destination_port=4660, netmask_40g=None, gateway_40g=None)

Configure link and size of control data.

A json dictionary with optional keywords:

Parameters:
  • mode (str) – (string) ‘1G’ or ‘10G’ (Mandatory)

  • channel_payload_length (int) – (int) SPEAD payload length for integrated channel data

  • beam_payload_length (int) – (int) SPEAD payload length for integrated beam data

  • destination_ip (str) – (string) Destination IP

  • source_port (int) – (int) Source port for integrated data streams

  • destination_port (int) – (int) Destination port for integrated data streams

  • netmask_40g (Optional[str]) – (string) 40g (science data) subnet mask

  • gateway_40g (Optional[str]) – (string) IP address of 40g (science) subnet gateway

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"mode": "1G", "channel_payload_length": 4,
...         "beam_payload_length": 1024, "destination_ip": "10.0.1.23"}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SetLmcIntegratedDownload", jstr)
SetUpAntennaBuffer(mode='SDN', ddr_start_byte_address=536870912, max_ddr_byte_size=None)

Set up the antenna buffer.

A json serialised dictionary containing the following keys:

Parameters:
  • mode (str) – network to transmit antenna buffer data to. Options: ‘SDN’ (Science Data Network) and ‘NSDN’ (Non-Science Data Network)

  • ddr_start_byte_address (int) – first address in the DDR for antenna buffer data to be written in (in bytes).

  • max_ddr_byte_size (Optional[int]) – last address for writing antenna buffer data (in bytes). If ‘None’ is chosen, the method will assume the last address to be the final address of the DDR chip

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

StartADCs()

Start the ADCs.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StartADCs")
StartAcquisition(start_time=None, global_reference_time=None, delay=2)

Start data acquisition.

A json dictionary with optional keywords:

Parameters:
  • start_time (Optional[str]) – the acquisition start time

  • delay (Optional[int]) – a delay to the acquisition start (default 2s)

  • global_reference_time (Optional[str]) – the start time assumed for starting the timestamp

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"start_time": "2021-11-22", "delay": 20}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("StartAcquisition", jstr)
StartAntennaBuffer(antennas, start_time=-1, timestamp_capture_duration=75, continuous_mode=False)

Start recording to the antenna buffer.

Parameters:
  • antennas (list[int]) – a list of antenna IDs to be used by the buffer, from 0 to 15. One or two antennas can be used for each FPGA, or 1 to 4 per buffer.

  • start_time (int) – the first time stamp that will be written into the DDR. When set to -1, the buffer will begin writing as soon as possible.

  • timestamp_capture_duration (int) – the capture duration in timestamps. Timestamps are in units of 256 ADC samples (256*1.08us).

  • continuous_mode (bool) – “True” for continuous capture. If enabled, the timestamp capture duration is ignored.

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.
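
The `timestamp_capture_duration` of StartAntennaBuffer can be converted to seconds using the unit stated above (256*1.08us per timestamp). A minimal sketch; `capture_seconds` is a hypothetical helper, and the 1.08 us figure is taken from this documentation rather than measured:

```python
# One timestamp is documented as 256 * 1.08 us, i.e. 276.48 us.
TIMESTAMP_SECONDS = 256 * 1.08e-6


def capture_seconds(timestamp_capture_duration):
    """Convert a capture duration in timestamps to seconds."""
    return timestamp_capture_duration * TIMESTAMP_SECONDS
```

Under these figures the default of 75 timestamps corresponds to roughly 20.7 ms of captured data.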

StartBeamformer(start_time=None, duration=-1, channel_groups=None, scan_id=0)

Start the beamformer at the specified time delay.

Parameters:
  • start_time (Optional[str]) – Start time as ISO formatted time

  • duration (int) – Scan duration, in frames, default “forever”

  • channel_groups (Optional[list[int]]) – Channel groups to be started. Command affects only beamformed channels for given groups. Default: all channels

  • scan_id (int) – ID of the scan to be started. Default 0

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"start_time": "2021-11-22T00:00:00Z", "duration": 20, "channel_groups": [0, 1, 4]}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("StartBeamformer", jstr)
StartPatternGenerator(stage)

Start the pattern generator at the specified stage.

The stage can be the output of the JESD, the channelizer, or the beamformer.

Parameters:

stage (str) – A positional string argument specifying the stage in the signal chain where the pattern was injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), ‘beamf’ (output of tile beamformer), or ‘all’ for all stages.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StartPatternGenerator", "channel")
StopADCs()

Stop the ADCs.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StopADCs")
StopAntennaBuffer()

Stop writing to the antenna buffer.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

StopBeamformer(channel_groups=None)

Stop the beamformer.

Parameters:

channel_groups (Optional[list[int]]) – list of channel groups to be stopped. Command affects only beamformed channels for given groups. Default: all channels

Return type:

TaskFunctionType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"channel_groups": [0,1,4] }
>>> jstr = json.dumps(dict)
>>> dp.command_inout("StopBeamformer", jstr)
StopDataTransmission()

Stop data transmission from board.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StopDataTransmission")
StopIntegratedData()

Stop the integrated data.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

StopPatternGenerator(stage)

Stop the pattern generator at the specified stage.

The stage can be the output of the JESD, the channelizer, or the beamformer.

Parameters:

stage (str) – A positional string argument specifying the stage in the signal chain where the pattern was injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), ‘beamf’ (output of tile beamformer), or ‘all’ for all stages.

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StopPatternGenerator", "jesd")
UpdateThresholdCache()

Re-sync the threshold caches.

Re-sync the db thresholds and the firmware thresholds and compare the two, transitioning to fault when they do not match.

NOTE: this command is deprecated. It was added to alleviate potential issues with the ADR-115 firmware threshold work, in case of bugs.

Return type:

bool

Returns:

True if the database matches the firmware, False otherwise.

WriteAddress(argin)

Write list of values at address.

Parameters:

argin (list[int]) – [0] = address to write to; [1..n] = list of values to write

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Raises:

ValueError – if fewer than 2 parameters are provided

Example:

>>> values = [.....]
>>> address = 0xfff
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("WriteAddress", [address] + values)
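
Because `argin` is a single flat list, the address and the values have to be packed together. A minimal sketch, assuming a hypothetical helper `pack_write_address` (not part of this package) that mirrors the documented layout and the documented minimum of 2 parameters:

```python
def pack_write_address(address, values):
    """Return [address, value_1, ..., value_n] as expected by WriteAddress."""
    if len(values) < 1:
        # Address plus at least one value: fewer than 2 parameters is an error.
        raise ValueError("at least one value is required")
    return [address, *values]
```

The packed list would be passed as the second argument of `dp.command_inout("WriteAddress", ...)`.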
WriteRegister(register_name, values)

Write values to the specified register.

A json dictionary with mandatory keywords:

Parameters:
  • register_name (str) – (string) register fully qualified string representation

  • values (list[int]) – (list) is a list containing the 32-bit values to write

Return type:

DevVarLongStringArrayType

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"register_name": "test-reg1", "values": values,
...         "offset": 0}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("WriteRegister", jstr)
__init__(*args, **kwargs)

Initialise this device object.

Parameters:
  • args (Any) – positional args to the init

  • kwargs (Any) – keyword args to the init

adcHealth()

Read the ADC Health State of the device.

This is an aggregated quantity representing if any of the ADC monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

ADC Health State of the device

adcPower()

Return the RMS power of every ADC signal.

As a TPM processes 16 antennas, this should return 32 RMS values.

Return type:

Optional[list[float]]

Returns:

RMS power of ADC signals

adc_pll_lock_status()

Return the pll status of all 16 ADCs.

The first list represents the pll status of the ADCs in order. The second list represents the lock lost counter for ADCs in order.

Expected: 1 if PLL locked and loss of lock flag is low (lock has not fallen).

Example:
>>> tile.adc_pll_lock_status
[[1]*16,[1]*16]
Return type:

ndarray

Returns:

the pll status of all ADCs

adc_sysref_counter()

Return the sysref_counter of all ADCs.

Expected: 1 if SYSREF counter is incrementing (SYSREF is present), 0 if not present.

Example:
>>> tile.adc_sysref_counter
[1] * 16
Return type:

str

Returns:

the sysref_counter of all ADCs idx0->ADC0, idx1->ADC1, … idx15->ADC15

adc_sysref_timing_requirements()

Return the sysref_timing_requirements of all ADCs.

Expected: 1 if setup and hold requirements for SYSREF are met, else return 0.

Example:
>>> tile.adc_sysref_timing_requirements
[1] * 16
Return type:

str

Returns:

the sysref_timing_requirements of all ADCs idx0->ADC0, idx1->ADC1, … idx15->ADC15

adcs()

Return the ADC status.

Return type:

str

Returns:

the ADC status

alarmHealth()

Read the alarm Health State of the device.

This is an aggregated quantity representing if any of the alarm monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

alarm Health State of the device

allLiveCal()

Read all live calibration coefficients.

Return type:

str

Returns:

JSON string of all live calibration coefficients.

allStagedCal()

Read all staged calibration coefficients.

Return type:

str

Returns:

JSON string of all staged calibration coefficients.

antennaBufferMode()

Return if antenna buffer is sending over SDN or NSDN.

Return type:

str

Returns:

string of SDN or NSDN

antennaIds(antenna_ids)

Set the antenna IDs.

Parameters:

antenna_ids (list[int]) – the antenna IDs

Return type:

None

arp()

Return the arp status.

Expected: True if table entries are valid and resolved.

Example:
>>> tile.arp
True
Return type:

bool

Returns:

the arp status.

beamformerRegions()

Get beamformer region table.

Bidimensional array of one row for each 8 channels, with elements:

  0. start physical channel

  1. number of channels

  2. beam index

  3. subarray ID

  4. subarray_logical_channel

  5. subarray_beam_id

  6. substation_id

  7. aperture_id

Each row is a set of 8 consecutive elements in the list.

Return type:

Optional[list[int]]

Returns:

list of up to 8*48 values
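
Since each row is a set of 8 consecutive elements, the flat list returned by beamformerRegions can be regrouped on the client side. A minimal sketch; `unflatten_regions` and the field labels are illustrative names based on the element list above, not part of this package:

```python
# Labels follow the documented element order of each 8-element row.
REGION_FIELDS = (
    "start_channel",
    "num_channels",
    "beam_index",
    "subarray_id",
    "subarray_logical_channel",
    "subarray_beam_id",
    "substation_id",
    "aperture_id",
)


def unflatten_regions(flat):
    """Split a flat region table into one dict per 8-element row."""
    if flat is None:  # the attribute is documented as Optional
        return []
    if len(flat) % len(REGION_FIELDS) != 0:
        raise ValueError("length must be a multiple of 8")
    step = len(REGION_FIELDS)
    return [
        dict(zip(REGION_FIELDS, flat[i:i + step]))
        for i in range(0, len(flat), step)
    ]
```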

beamformerTable()

Get beamformer region table.

Bidimensional array of one row for each 8 channels, with elements:

  0. start physical channel

  1. beam number

  2. subarray ID

  3. subarray_logical_channel

  4. subarray_beam_id

  5. substation_id

  6. aperture_id

Each row is a set of 7 consecutive elements in the list.

Return type:

Optional[list[int]]

Returns:

list of up to 7*48 values

broadbandRfiFactor()

Get the RFI factor for broadband RFI detection.

Note: Only the RFI factor of FPGA1 is read, since the same value is loaded into all FPGAs.

Returns:

rfi_factor: the sensitivity value for the RFI detection

Return type:

float

channeliserRounding(truncation)

Set channeliser rounding.

Parameters:

truncation (list[int]) – List with either a single value (applies to all channels) or a list of 512 values. Range 0 (no truncation) to 7

Return type:

None

clockPresent()

Report if 10 MHz clock signal is present at the TPM input.

Raises:

NotImplementedError – not implemented in ska-low-sps-tpm-api.

Return type:

NoReturn

coreCommunicationStatus()

Return status of connection to TPM, CPLD and FPGAs.

Return True if communication is OK else False

Example:

>>> core_communication_status = tile_proxy.coreCommunicationStatus
>>> print(core_communication_status)
{'CPLD': True, 'FPGA0': True, 'FPGA1': True}
Return type:

Optional[str]

Returns:

dictionary containing if the CPLD and FPGAs are connectable or None if not yet polled.

create_component_manager()

Create and return a component manager for this device.

Return type:

TileComponentManager

Returns:

a component manager for this device.

cspDestinationIp()

Return the cspDestinationIp attribute.

Return type:

str

Returns:

the IP address of the csp destination

cspDestinationMac()

Return the cspDestinationMac attribute.

Return type:

str

Returns:

the MAC address of the csp destination

cspDestinationPort()

Return the cspDestinationPort attribute.

Return type:

int

Returns:

the port of the csp destination

cspRounding(rounding)

Set CSP formatter rounding.

Parameters:

rounding (ndarray) – list of up to 384 values in the range 0-7. Current hardware supports only a single value, so only the first value is used

Return type:

None

cspSpeadFormat(spead_format)

Set CSP SPEAD format.

CSP format is either “AAVS”, the format used in the AAVS2-AAVS3 systems, which uses a reference Unix time specified in the header, or “SKA”, the format defined in the SPS-CBF ICD, which is based on the TAI2000 epoch.

Parameters:

spead_format (str) – format used in CBF SPEAD header: “AAVS” or “SKA”

Return type:

None

currentFE0()

Handle a Tango attribute read of the FE0 current.

Return type:

Optional[float]

Returns:

FE0 current

currentFE1()

Handle a Tango attribute read of the FE1 current.

Return type:

Optional[float]

Returns:

FE1 current

currentFrame()

Return current frame.

In units of 256 ADC frames (276.48 us). Currently this is required; it is not certain that it will remain so.

Return type:

int

Returns:

current frame

currentHealth()

Read the current Health State of the device.

This is an aggregated quantity representing if any of the current monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

current Health State of the device

currentTileBeamformerFrame()

Return current frame.

In units of 256 ADC frames (276.48 us). Currently this is required; it is not certain that it will remain so.

Return type:

int

Returns:

current frame

currents()

Return all the current values available.

Return type:

str

Returns:

currents available

dataTransmissionMode()

Return if we’re sending data through 1G or 10G port.

Return type:

str

Returns:

Either 1G or 10G string

data_router_discarded_packets()

Return the number of discarded packets.

Expected: 0 if no packets are discarded.

Example:
>>> tile.data_router_discarded_packets
'{"FPGA0": [0, 0], "FPGA1": [0, 0]}'
Return type:

str

Returns:

the number of discarded packets per FPGA.
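
The example value above is a JSON string keyed by FPGA, so a client can decode it and check that nothing was discarded. A minimal sketch, assuming the attribute always has the shape shown in the example; `total_discarded` is a hypothetical helper:

```python
import json


def total_discarded(report):
    """Sum the discarded-packet counters across all FPGAs in the JSON report."""
    counts = json.loads(report)
    return sum(sum(per_fpga) for per_fpga in counts.values())
```

A result of 0 corresponds to the expected state in which no packets are discarded.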

ddr_initialisation()

Return the ddr initialisation status.

Expected: True if DDR interface was successfully initialised.

Example:
>>> tile.ddr_initialisation
True
Return type:

bool

Returns:

the ddr initialisation status.

ddr_write_size()

Return the ddr write size obtained from running start_antenna_buffer.

Example:
>>> tile.ddr_write_size
Return type:

int

Returns:

ddr write size of a frame

delete_device()

Prepare to delete the device.

This must be done explicitly; otherwise polling threads are not cleaned up after init_device().

Return type:

None

dev_state()

Calculate this device state.

The base device offers some automatic state discovery. However, we have some attributes that require explicit analysis as to whether they are in ALARM or not, e.g. DevBoolean.

Return type:

DevState

Returns:

the ‘tango.DevState’ calculated

dsp()

Return the tile beamformer and station beamformer status.

Return type:

str

Returns:

the tile beamformer and station beamformer status

dspHealth()

Read the dsp Health State of the device.

This is an aggregated quantity representing if any of the dsp monitoring points do not have a permitted value. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

dsp Health State of the device

execute_Off()

Turn the device off.

To modify behaviour for this command, modify the do() method of the command class.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

execute_On()

Turn device on.

To modify behaviour for this command, modify the do() method of the command class.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purposes only.

f2f_hard_errors()

Return the f2f interface hard error count.

Expected: 0 if no hard errors detected in FPGA-to-FPGA interface.

Hard errors require the interface to be reset. This likely means reinitialising the TPM entirely due to the impact on beamformers.

Example:
>>> tile.f2f_hard_errors
0
Return type:

int

Returns:

the f2f interface hard error count.

f2f_pll_counter()

Return the PLL lock loss counter.

Expected: 0 if no PLL lock loss events detected. Increments for each lock loss event.

Example:
>>> tile.f2f_pll_counter
'0'
Return type:

int

Returns:

the PLL lock loss counter.

f2f_pll_lock_status()

Return the PLL lock status.

Expected: 1 if PLL locked, 0 otherwise.

Example:
>>> tile.f2f_pll_lock_status
'1'
Return type:

int

Returns:

the PLL lock status.

f2f_soft_errors()

Return the f2f interface soft error count.

Expected: 0 if no soft errors detected in FPGA-to-FPGA interface.

Example:

>>> tile.f2f_soft_errors
0

Return type:

int

Returns:

the f2f interface soft error count.

faultReport()

Get the fault report.

Return type:

str

Returns:

the fault report.

firmwareCurrentThresholds(value)

Set the Current thresholds in firmware.

Parameters:

value (str) – A json serialised string with the thresholds.

Raises:

ValueError – When only one threshold is defined, you must define min and max.

Return type:

None

firmwareName(value)

Set the firmware name.

Parameters:

value (str) – firmware name

Return type:

None

firmwareTemperatureThresholds(value)

Write the temperature thresholds in firmware.

Parameters:

value (str) – A json serialised string with the thresholds.

Return type:

None

firmwareVersion(value)

Set the firmware version.

Parameters:

value (str) – firmware version

Return type:

None

firmwareVoltageThresholds(value)

Set the voltage thresholds in the firmware.

Parameters:

value (str) – A json serialised string with the thresholds.

Raises:

ValueError – When only one threshold is defined, you must define min and max.

Return type:

None

fortyGPacketCount()

Get 40G packet counts.

The return value depends on how many 40G cores are active. Typically, only one core is active.

Example:

# 0 cores active
{}

# 1 core active
{
    'FPGA0': {
        'rx_received': 2921,
        'rx_forwarded': 0,
        'tx_transmitted': 6973024
    }
}

# 2 cores active
{
    'FPGA0': {
        'rx_received': 3881,
        'rx_forwarded': 0,
        'tx_transmitted': 7321460
    },
    'FPGA1': {
        'rx_received': 1,
        'rx_forwarded': 0,
        'tx_transmitted': 3122
    }
}
Return type:

str

Returns:

Packet counts per active 40G core. Returns an empty dictionary if no 40G cores are active.

fortyGbDestinationIps()

Return the destination IPs for all 40Gb ports on the tile.

Return type:

list[str]

Returns:

IP addresses

fortyGbDestinationPorts()

Return the destination ports for all 40Gb ports on the tile.

Return type:

list[int]

Returns:

ports

fpga0_bip_error_count()

Return the bip error count for FPGA0.

Expected: 0 if no bit-interleaved parity (BIP) errors detected.

Example:
>>> tile.fpga0_bip_error_count
[0, 0, 0, 0]
Return type:

list[int]

Returns:

the bip error count for FPGA0.

fpga0_clock_managers_count()

Return the PLL lock loss counter for C2C, JESD and DSP.

Expected: 0 per interface if no lock loss events.

Example:
>>> tile.fpga0_clock_managers_count
[0, 0, 0]

3 rows → one for each MMCM type: [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”]

Return type:

list[int]

Returns:

the lock loss counter for [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”].

fpga0_clock_managers_status()

Return the PLL lock status for C2C, JESD and DSP.

Expected: 1 if MMCM clock locked, 0 otherwise

Example:
>>> tile.fpga0_clock_managers_status
[0, 0, 0]

3 rows → one for each MMCM type: [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”]

Return type:

list[int]

Returns:

the clock status for [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”].

fpga0_clocks()

Return the status of clocks for the interfaces of FPGA0.

Expected: 1 per interface if status is OK. 0 if not OK.

Example:
>>> tile.fpga0_clocks
[1, 1, 1]
Return type:

list[int]

Returns:

the status of clocks for the interfaces of FPGA0. [1, 1, 1] == [JESD, DDR, UDP]
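
The positional status list can be paired with the interface names given above to report which clocks are not OK. A minimal sketch, assuming the [JESD, DDR, UDP] order stated in the return description; `failing_clocks` is a hypothetical helper:

```python
# Order as documented: [1, 1, 1] == [JESD, DDR, UDP].
CLOCK_INTERFACES = ("JESD", "DDR", "UDP")


def failing_clocks(status):
    """Return the names of interfaces whose clock status is not 1 (OK)."""
    if len(status) != len(CLOCK_INTERFACES):
        raise ValueError("expected one status value per interface")
    return [name for name, ok in zip(CLOCK_INTERFACES, status) if ok != 1]
```

An empty list means all three interface clocks report OK.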

fpga0_crc_error_count()

Return the crc error count for FPGA0.

Expected: 0 if no Cyclic Redundancy Check (CRC) errors detected.

Example:
>>> tile.fpga0_crc_error_count
0
Return type:

int

Returns:

the crc error count for FPGA0.

fpga0_data_router_status()

Return the status of the data router.

Expected: 0 if status is OK.

Example:
>>> tile.fpga0_data_router_status
0
Return type:

int

Returns:

the status of the data router.

fpga0_ddr_reset_counter()

Return the ddr reset count.

Expected: 0 if no reset events have occurred.

Example:
>>> tile.fpga0_ddr_reset_counter
0
Return type:

int

Returns:

the ddr reset count.

fpga0_decode_error_count()

Return the decode error count per FPGA.

Expected: 0 if errors have not been detected.

Note: This counter increments when at least one error is detected in a clock cycle.

Example:
>>> tile.fpga0_decode_error_count
[0, 0, 0, 0]
Return type:

list[int]

Returns:

the decode error count per FPGA.

fpga0_lane_error_count()

Return the error count per lane, per core.

Expected: 0 for all lanes.

Example:
>>> tile.fpga0_lane_error_count
[ [0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0] ]
Return type:

list[int]

Returns:

the error count per lane, per core [[Core0lanes],[Core1lanes]]

fpga0_linkup_loss_count()

Return the linkup loss count.

Expected: 0 if no link loss events are detected.

Example:
>>> tile.fpga0_linkup_loss_count
0
Return type:

int

Returns:

the linkup loss count.

fpga0_qpll_counter()

Return the QPLL lock loss counter.

Expected: 0 if no lock loss events detected. Increments for each lock loss event.

Example:
>>> tile.fpga0_qpll_counter
0
Return type:

int

Returns:

the QPLL lock loss counter.

fpga0_qpll_status()

Return the QPLL lock status.

Expected: 1 if QPLL locked.

Example:
>>> tile.fpga0_qpll_status
1
Return type:

int

Returns:

the QPLL lock status.

fpga0_resync_count()

Return the resync count.

Expected: 0 if no resync events have occurred.

Example:
>>> tile.fpga0_resync_count
0
Return type:

int

Returns:

the resync count

fpga0_station_beamformer_error_count()

Return the station beamformer error count for FPGA0.

Expected: 0 if no parity errors detected.

Example:
>>> tile.fpga0_station_beamformer_error_count
0
Return type:

int

Returns:

the station beamformer error count for FPGA0.

fpga0_station_beamformer_flagged_count()

Return the station beamformer flagged count for FPGA0.

Note: When station beam flagging is enabled, this returns a count of packets flagged, but when station beam flagging is disabled, this instead returns a count of packets discarded/dropped

Expected: 0 if no parity errors detected.

Example:
>>> tile.fpga0_station_beamformer_flagged_count
0
Return type:

int

Returns:

the station beamformer flagged count for FPGA0.

fpga1Temperature()

Return the temperature of FPGA 1.

Return type:

Optional[tuple[Optional[float], float, AttrQuality]]

Returns:

the temperature of FPGA 1

fpga1_bip_error_count()

Return the bip error count for FPGA1.

Expected: 0 if no bit-interleaved parity (BIP) errors detected.

Example:
>>> tile.fpga1_bip_error_count
[0, 0, 0, 0]
Return type:

list[int]

Returns:

the bip error count for FPGA1.

fpga1_clock_managers_count()

Return the PLL lock loss counter for C2C, JESD and DSP.

Expected: 0 per interface if no lock loss events.

Example:
>>> tile.fpga1_clock_managers_count
[0, 0, 0]

3 rows → one for each MMCM type: [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”]

Return type:

list[int]

Returns:

the lock loss counter for [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”].

fpga1_clock_managers_status()

Return the PLL lock status for C2C, JESD and DSP.

Expected: 1 if the MMCM clock is locked, 0 otherwise.

Example:
>>> tile.fpga1_clock_managers_status
[0, 0, 0]

3 rows → one for each MMCM type: [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”]

Return type:

list[int]

Returns:

the clock status for [“C2C_MMCM”, “JESD_MMCM”, “DSP_MMCM”].
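
Because the three values are positional, pairing them with the MMCM names makes a reading easier to interpret. This is a hedged sketch only: `unlocked_mmcms` is a hypothetical helper, and the name order is taken from the description above.

```python
# Order as documented above: one entry per MMCM type.
MMCM_NAMES = ("C2C_MMCM", "JESD_MMCM", "DSP_MMCM")


def unlocked_mmcms(status):
    """Return the names of the MMCMs whose lock status is not 1."""
    if len(status) != len(MMCM_NAMES):
        raise ValueError("expected one status value per MMCM type")
    return [name for name, locked in zip(MMCM_NAMES, status) if locked != 1]


print(unlocked_mmcms([1, 1, 1]))
print(unlocked_mmcms([1, 0, 1]))
```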

fpga1_clocks()

Return the status of clocks for the interfaces of FPGA1.

Expected: 1 per interface if status is OK. 0 if not OK.

Example:
>>> tile.fpga1_clocks
[1, 1, 1]
Return type:

str

Returns:

the status of clocks for the interfaces of FPGA1. [1, 1, 1] == [JESD, DDR, UDP]

fpga1_crc_error_count()

Return the crc error count for FPGA1.

Expected: 0 if no Cyclic Redundancy Check (CRC) errors detected.

Example:
>>> tile.fpga1_crc_error_count
0
Return type:

int

Returns:

the crc error count for FPGA1.

fpga1_data_router_status()

Return the status of the data router.

Expected: 0 if status OK.

Example:
>>> tile.fpga1_data_router_status
0
Return type:

int

Returns:

the status of the data router.

fpga1_ddr_reset_counter()

Return the ddr reset count.

Expected: 0 if no reset events have occurred.

Example:
>>> tile.fpga1_ddr_reset_counter
0
Return type:

int

Returns:

the ddr reset count.

fpga1_decode_error_count()

Return the decode error count per FPGA.

Expected: 0 if errors have not been detected.

Note: This counter increments when at least one error is detected in a clock cycle.

Example:
>>> tile.fpga1_decode_error_count
[0, 0, 0, 0]
Return type:

list[int]

Returns:

the decode error count per FPGA.

fpga1_lane_error_count()

Return the error count per lane, per core.

Expected: 0 for all lanes.

Example:
>>> tile.fpga1_lane_error_count
[ [0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0] ]
Return type:

list[int]

Returns:

the error count per lane, per core [[Core0],[Core1]]

fpga1_linkup_loss_count()

Return the linkup loss count.

Expected: 0 if no link loss events are detected.

Example:
>>> tile.fpga1_linkup_loss_count
0
Return type:

int

Returns:

the linkup loss count.

fpga1_qpll_counter()

Return the QPLL lock loss counter.

Expected: 0 if no lock loss events detected. Increments for each lock loss event.

Example:
>>> tile.fpga1_qpll_counter
0
Return type:

int

Returns:

the QPLL lock loss counter.

fpga1_qpll_status()

Return the QPLL lock status.

Expected: 1 if QPLL locked.

Example:
>>> tile.fpga1_qpll_status
1
Return type:

int

Returns:

the QPLL lock status.

fpga1_resync_count()

Return the resync count.

Expected: 0 if no resync events have occurred.

Example:
>>> tile.fpga1_resync_count
0
Return type:

int

Returns:

the resync count

fpga1_station_beamformer_error_count()

Return the station beamformer error count for FPGA1.

Expected: 0 if no parity errors detected.

Example:
>>> tile.fpga1_station_beamformer_error_count
0
Return type:

int

Returns:

the station beamformer error count for FPGA1.

fpga1_station_beamformer_flagged_count()

Return the station beamformer flagged count for FPGA1.

Note: When station beam flagging is enabled, this returns a count of packets flagged; when station beam flagging is disabled, it instead returns a count of packets discarded/dropped.

Expected: 0 if no parity errors detected.

Example:
>>> tile.fpga1_station_beamformer_flagged_count
0
Return type:

int

Returns:

the station beamformer flagged count for FPGA1.

fpga2Temperature()

Return the temperature of FPGA 2.

Return type:

Optional[tuple[Optional[float], float, AttrQuality]]

Returns:

the temperature of FPGA 2

fpgaFrameTime()

Return the FPGA synchronization timestamp.

Return type:

str

Returns:

the FPGA timestamp, in UTC format

fpgaReferenceTime()

Return the FPGA synchronization timestamp.

Return type:

str

Returns:

the FPGA timestamp, in UTC format

fpgaTime()

Return the FPGA internal time.

Return type:

str

Returns:

the FPGA time, in UTC format

fpgasUnixTime()

Return the time for FPGAs.

Return type:

list[int]

Returns:

the time for FPGAs

globalReferenceTime(reference_time)

Set the global synchronization timestamp.

Parameters:

reference_time (str) – the synchronization time, in ISO 8601 format, or “”

Return type:

None
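
As an illustration of building a timestamp string for reference_time, the sketch below formats a UTC datetime. The exact layout accepted by the device is not stated in this section; the format string used here (microseconds plus a "Z" suffix) is an assumption, and `format_reference_time` is a hypothetical helper.

```python
from datetime import datetime, timezone

# Assumed layout, not confirmed by this section: UTC, microseconds, "Z" suffix.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def format_reference_time(moment):
    """Format a timezone-aware datetime as a UTC timestamp string."""
    return moment.astimezone(timezone.utc).strftime(TIME_FORMAT)


example = datetime(2024, 5, 29, 2, 0, 36, tzinfo=timezone.utc)
print(format_reference_time(example))  # 2024-05-29T02:00:36.000000Z
```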

healthModelParams(argin)

Set the params for health transition rules.

Parameters:

argin (str) – JSON-string of dictionary of health states

Raises:

NotImplementedError – if UseAttributesForHealth is True

Return type:

None
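
Since argin is a JSON string rather than a dictionary, the dictionary has to be serialised before it is written. A minimal sketch follows; the key names and values are purely hypothetical placeholders, as the accepted parameters are not listed in this section.

```python
import json

# Hypothetical parameter names and values, for illustration only.
params = {"example_threshold": 5, "example_limits": {"min": 0.0, "max": 1.0}}

# The JSON string that would be passed as ``argin``.
argin = json.dumps(params)
print(argin)
```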

healthReport()

Get the health report.

Return type:

str

Returns:

the health report.

init_device()

Initialise the device.

Raises:

TypeError – when attributes have a converter that is not callable.

Return type:

None

integratedDataTransmissionMode()

Return whether integrated data is being sent through the 1G or the 10G port.

Return type:

str

Returns:

Either 1G or 10G string

io()

Return a dictionary of I/O interfaces status available.

Return type:

str

Returns:

I/O interfaces status

ioHealth()

Read the io Health State of the device.

This is an aggregated quantity representing if any of the io monitoring points do not have a permitted value. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

io Health State of the device

isBeamformerRunning()

Check if beamformer is running.

Return type:

Optional[bool]

Returns:

whether the beamformer is running

isProgrammed()

Return a flag indicating whether or not the board is programmed.

Return type:

bool

Returns:

whether or not the board is programmed

is_ConfigureTestGenerator_allowed()

Check if command is allowed.

It is allowed only in engineering mode.

Return type:

bool

Returns:

whether the command is allowed

is_engineering()

Return a flag representing whether we are in Engineering mode.

Return type:

bool

Returns:

True if Tile is in Engineering Mode.

is_firmware_threshold_allowed(req_type)

Return a flag representing whether we are allowed to access the attribute.

Parameters:

req_type (AttReqType) – the request type

Return type:

bool

Returns:

True if access is allowed.

lane_status()

Return the lane status.

Expected: True if no errors detected on any lane.

Example:
>>> tile.lane_status
True
Return type:

bool

Returns:

the lane status.

lastPointingDelays()

Return last pointing delays applied to the tile.

Values are initialised to 0.0 if they haven’t been set. These values are in channel order, with each pair corresponding to a delay and delay rate.

Return type:

list[list]

Returns:

last pointing delays applied to the tile.
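
To work with delays and delay rates separately, the pairs can be split into two parallel lists. This sketch assumes the value is a list of [delay, delay rate] pairs as described above; `split_pointing_delays` is a hypothetical helper, not part of the device.

```python
def split_pointing_delays(pairs):
    """Split [[delay, rate], ...] into (delays, rates), preserving order."""
    delays = [pair[0] for pair in pairs]
    rates = [pair[1] for pair in pairs]
    return delays, rates


# Made-up data: the first pair is still at its initial value of 0.0.
delays, rates = split_pointing_delays([[0.0, 0.0], [1.5, 0.1]])
print(delays, rates)
```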

link_status()

Return the JESD link status.

Expected: True if link up and synchronised.

Example:
>>> tile.link_status
True
Return type:

bool

Returns:

the link status.

logicalTileId(value)

Set the logicalTileId attribute.

The logical tile id is the id of the tile in the station.

Parameters:

value (int) – the new logical tile id

Return type:

None

notify_emission(signal, value)

Handle signal emissions, pushing tango events and checking alarms.

Overrides the base class to trigger alarm checking for signal-based attributes after their tango events have been pushed.

Parameters:
  • signal (str) – the absolute name of the signal that was emitted

  • value (Any) – the emitted value

Return type:

None

pendingDataRequests()

Check for pending data requests.

Return type:

Optional[bool]

Returns:

whether there are data requests pending

pfbVersion()

Return the version of the polyphase filter firmware.

Return type:

str

Returns:

the version of the polyphase filter firmware

phaseTerminalCount(value)

Set the phase terminal count.

Parameters:

value (int) – the phase terminal count

Return type:

None

pllLocked()

Report if ADC clock PLL is in locked state.

Return type:

Optional[bool]

Returns:

PLL lock state

pointingDelays()

Get the pointing delays for beam 0.

Return type:

list[float]

Returns:

list of pointing delays for beam 0

post_change_event(name, attr_value, attr_time, attr_quality)

Post an Archive and Change TANGO event.

Parameters:
  • name (str) – the name of the TANGO attribute to push

  • attr_value (Any) – The value of the attribute.

  • attr_time (float) – A parameter specifying the time the attribute was updated.

  • attr_quality (AttrQuality) – A parameter specifying the quality factor of the attribute.

Return type:

None

ppsDelay()

Return the delay between PPS and 10 MHz clock.

Return type:

Optional[int]

Returns:

the PPS delay in 1.25 ns units.

ppsDelayCorrection(pps_delay_correction)

Set a correction to make to the pps delay.

Note: will be applied during next initialisation.

Parameters:

pps_delay_correction (int) – a correction to apply to the pps_delay.

Return type:

None

ppsDrift()

Return the observed drift in the ppsDelay of this Tile.

ppsDrift measures the difference between the TPM’s currently reported ppsDelay and the value it had when the Tile Tango device was initialised. It is initially zero and then changes as the reported value of ppsDelay changes over time.

Return type:

int

Returns:

Return the pps delay drift in 1.25ns units or None if not initialised
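
The arithmetic behind the drift can be sketched as follows. The helper names are hypothetical, and the sign convention (current value minus initial value) is an assumption; only the 1.25 ns unit size comes from the text above.

```python
PPS_UNIT_NS = 1.25  # duration of one ppsDelay/ppsDrift unit, in nanoseconds


def pps_drift(initial_delay, current_delay):
    """Drift in 1.25 ns units (assumed sign: current minus initial)."""
    return current_delay - initial_delay


def units_to_ns(units):
    """Convert a value in 1.25 ns units to nanoseconds."""
    return units * PPS_UNIT_NS


drift = pps_drift(initial_delay=20, current_delay=24)
print(drift, units_to_ns(drift))  # 4 units, i.e. 5.0 ns
```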

ppsPresent()

Report if PPS signal is present at the TPM input.

Return type:

tuple[Optional[bool], float, AttrQuality]

Returns:

a tuple with attribute_value, time, quality

preaduLevels(levels)

Set attenuator level of preADU channels, one per input channel.

Parameters:

levels (ndarray) – attenuator level of preADU channels, one per input channel, in dB

Return type:

None

rfiBlankingEnabledAntennas()

Get the list of antennas for which broadband RFI blanking is currently enabled.

Returns:

list of antennas with RFI blanking enabled

Return type:

list[int]

rfiCount()

Return the RFI count per antenna/pol.

Return type:

list[list]

Returns:

the RFI count per antenna/pol.

runningBeams()

List running status for each SubarrayBeam.

Return type:

list[bool]

Returns:

list of hardware beam running states

shutdown_on_max_alarm(attr_name)

Turn off TPM when attribute in question is in max_alarm state.

Parameters:

attr_name (str) – the name of the attribute causing the shutdown.

Return type:

None

simulationMode(value)

Set the simulation mode.

Writing this attribute is deliberately unimplemented. The simulation mode should instead be set by setting the device’s SimulationConfig property at launch.

Parameters:

value (SimulationMode) – The simulation mode, as a SimulationMode value

Return type:

None

srcip40gfpga1(argin)

Set source IP for FPGA1.

Parameters:

argin (str) – source IP for FPGA1

Return type:

None

srcip40gfpga2(argin)

Set source IP for FPGA2.

Parameters:

argin (str) – source IP for FPGA2

Return type:

None

staticTimeDelays(delays)

Set static time delay.

Parameters:

delays (list[float]) – Delay in nanoseconds (positive = increase the signal delay) to correct for static delay mismatches, e.g. cable length.

Return type:

None

stationBeamFlagEnabled()

Return True if station beam data flagging is enabled.

Return type:

list[bool]

Returns:

a list of bool values corresponding to the fpgas

stationId(value)

Set the id of the station to which this tile is assigned.

Parameters:

value (int) – the station id

Return type:

None

station_beamformer_status()

Return the status of the station beamformer.

Expected: True if status OK.

Example:
>>> tile.station_beamformer_status
True
Return type:

bool

Returns:

the status of the station beamformer.

sysrefPresent()

Report if SYSREF signal is present at the FPGA.

Raises:

NotImplementedError – not implemented in ska-low-sps-tpm-api.

Return type:

NoReturn

temperatureADC0()

Handle a Tango attribute read of the ADC 0 temperature.

Return type:

Optional[float]

Returns:

ADC 0 temperature

temperatureADC1()

Handle a Tango attribute read of the ADC 1 temperature.

Return type:

Optional[float]

Returns:

ADC 1 temperature

temperatureADC10()

Handle a Tango attribute read of the ADC 10 temperature.

Return type:

Optional[float]

Returns:

ADC 10 temperature

temperatureADC11()

Handle a Tango attribute read of the ADC 11 temperature.

Return type:

Optional[float]

Returns:

ADC 11 temperature

temperatureADC12()

Handle a Tango attribute read of the ADC 12 temperature.

Return type:

Optional[float]

Returns:

ADC 12 temperature

temperatureADC13()

Handle a Tango attribute read of the ADC 13 temperature.

Return type:

Optional[float]

Returns:

ADC 13 temperature

temperatureADC14()

Handle a Tango attribute read of the ADC 14 temperature.

Return type:

Optional[float]

Returns:

ADC 14 temperature

temperatureADC15()

Handle a Tango attribute read of the ADC 15 temperature.

Return type:

Optional[float]

Returns:

ADC 15 temperature

temperatureADC2()

Handle a Tango attribute read of the ADC 2 temperature.

Return type:

Optional[float]

Returns:

ADC 2 temperature

temperatureADC3()

Handle a Tango attribute read of the ADC 3 temperature.

Return type:

Optional[float]

Returns:

ADC 3 temperature

temperatureADC4()

Handle a Tango attribute read of the ADC 4 temperature.

Return type:

Optional[float]

Returns:

ADC 4 temperature

temperatureADC5()

Handle a Tango attribute read of the ADC 5 temperature.

Return type:

Optional[float]

Returns:

ADC 5 temperature

temperatureADC6()

Handle a Tango attribute read of the ADC 6 temperature.

Return type:

Optional[float]

Returns:

ADC 6 temperature

temperatureADC7()

Handle a Tango attribute read of the ADC 7 temperature.

Return type:

Optional[float]

Returns:

ADC 7 temperature

temperatureADC8()

Handle a Tango attribute read of the ADC 8 temperature.

Return type:

Optional[float]

Returns:

ADC 8 temperature

temperatureADC9()

Handle a Tango attribute read of the ADC 9 temperature.

Return type:

Optional[float]

Returns:

ADC 9 temperature

temperatureHealth()

Read the temperature Health State of the device.

This is an aggregated quantity representing if any of the temperature monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

temperature Health State of the device

temperature_alm()

Return the Temperature alarm reading.

0 -> OK, 1 -> WARN, 2 -> ALARM

Return type:

Optional[int]

Returns:

The alarm state for temperature.
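
The integer codes can be given readable labels with an enumeration. This is an illustrative sketch: `AlarmLevel` and `describe_alarm` are hypothetical names, and treating None as "no reading available" is an assumption.

```python
from enum import IntEnum


class AlarmLevel(IntEnum):
    """Alarm codes as documented above (class name is hypothetical)."""

    OK = 0
    WARN = 1
    ALARM = 2


def describe_alarm(reading):
    """Translate an Optional[int] alarm reading into a label."""
    if reading is None:
        return "UNKNOWN"  # assumption: None means no reading is available
    return AlarmLevel(reading).name


print(describe_alarm(0), describe_alarm(2), describe_alarm(None))
```

The same mapping applies to any reading that uses the 0/1/2 codes.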

temperatures()

Return all the temperature values available.

Return type:

str

Returns:

temperatures available

testGeneratorActive()

Report if the test generator is used for some channels.

Return type:

bool

Returns:

test generator status

testMode(value)

Set the test mode.

Writing this attribute is deliberately unimplemented. The test mode should instead be set by setting the device’s TestConfig property at launch.

Parameters:

value (int) – The test mode, as a TestMode value

Return type:

None

tileProgrammingState()

Get the tile programming state.

Return type:

Optional[str]

Returns:

a string describing the programming state of the tile

tile_beamformer_status()

Return the status of the tile beamformer.

Expected: True if status OK.

Example:
>>> tile.tile_beamformer_status
True
Return type:

bool

Returns:

the status of the tile beamformer.

tile_info()

Return all the tile info available.

Example:
>>> tile.tile_info
'{"hardware": {"ip_address_eep": "10.0.10.2",
"netmask_eep": "255.255.255.0", "gateway_eep": "255.255.255.255",
"SN": "0850423050008", "PN": "iTPM_ADU_2.0",
"bios": "v0.6.0 (CPLD_0x23092511-MCU_0xb000011a_0x20230209_0x0)",
"BOARD_MODE": "NO-ADA", "EXT_LABEL": "00291;163-010013;2.0;36240080",
"HARDWARE_REV": "v2.0.1a", "DDR_SIZE_GB": "4"},
"fpga_firmware": {"design": "tpm_test", "build": "2004",
"compile_time": "2024-05-29 02:00:36.158315",
"compile_user": "gitlab-runner (created by john holden)", "compile_host":
"te7nelson linux-4.18.0-553.44.1.el8_10.x86_64-x86_64-with-glibc2.28",
"git_branch": "detached head", "git_commit":
"a22da05fe4cc7078c966 merge branch 'rel-2069-release-v-6-3-0' into 'main'",
"version": "6.3.0"},
"network": {"1g_ip_address": "10.132.0.46",
"1g_mac_address": "fc:0f:e7:e6:43:6c", "1g_netmask": "255.255.255.0",
"1g_gateway": "10.132.0.254", "40g_ip_address_p1": "10.130.0.108",
"40g_mac_address_p1": "62:00:0A:82:00:6C", "40g_gateway_p1": "10.130.0.126",
"40g_netmask_p1": "255.255.255.128", "40g_ip_address_p2": "0.0.0.0",
"40g_mac_address_p2": "02:00:00:00:00:00",
"40g_gateway_p2": "10.130.0.126", "40g_netmask_p2": "255.255.255.128"}}'
Return type:

str

Returns:

info available
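
Because the value is returned as a string, it needs to be decoded before individual fields can be read. The sketch below uses a shortened stand-in for the string in the example above and decodes it with the standard json module.

```python
import json

# Shortened stand-in for the string shown in the example above.
tile_info = (
    '{"hardware": {"SN": "0850423050008", "PN": "iTPM_ADU_2.0"}, '
    '"fpga_firmware": {"design": "tpm_test", "version": "6.3.0"}, '
    '"network": {"1g_ip_address": "10.132.0.46"}}'
)

info = json.loads(tile_info)
serial_number = info["hardware"]["SN"]
firmware_version = info["fpga_firmware"]["version"]
print(serial_number, firmware_version)
```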

timing()

Return a dictionary of the timing signals status.

Return type:

str

Returns:

timing signals status

timingHealth()

Read the timing Health State of the device.

This is an aggregated quantity representing if any of the timing monitoring points do not have a permitted value. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

timing Health State of the device

timing_pll_40g_count()

Return the PLL 40G lock loss counter.

Expected: 0 if PLL 40G has no lock loss events detected. Increments for each lock loss event. These are combined readings for both PLLs within the AD9528.

Example:
>>> tile.timing_pll_40g_count
0
Return type:

int

Returns:

the PLL lock loss counter.

timing_pll_40g_lock_status()

Return the PLL 40G lock status.

Expected: 1 if PLL 40G locked.

Example:
>>> tile.timing_pll_40g_lock_status
1
Return type:

int

Returns:

the PLL 40G lock status.

timing_pll_count()

Return the PLL lock loss counter.

Expected: 0 if no lock loss events detected. Increments for each lock loss event. These are combined readings for both PLLs within the AD9528.

Example:
>>> tile.timing_pll_count
0
Return type:

int

Returns:

the lock loss counter.

timing_pll_lock_status()

Return the PLL lock status.

Expected: 1 if PLL locked, 0 otherwise.

Example:
>>> tile.timing_pll_lock_status
1
Return type:

int

Returns:

the PLL lock status.

udp_status()

Return the UDP status.

Expected: True if virtual lanes aligned and no BIP or CRC errors.

Example:
>>> tile.udp_status
False
Return type:

bool

Returns:

the UDP status.

unpack_alarms(alarms, mark_invalid=False)

Unpack a dictionary of alarms.

Parameters:
  • alarms (dict[str, int]) – The alarms we want to unpack.

  • mark_invalid (bool) – mark attribute as invalid.

Return type:

None

update_tile_health_attributes(mark_invalid=False)

Update TANGO attributes from the tile health structure dictionary.

Parameters:

mark_invalid (bool) – True when values being reported are not valid.

Return type:

None

useAttributesForHealth()

Return whether ADR-115 is in use.

Return type:

bool

Returns:

True if attributes quality is being evaluated in health.

voltageAVDD3()

Handle a Tango attribute read of the Analog 2.5 V voltage.

Return type:

Optional[float]

Returns:

Analog 2.5 V voltage

voltageHealth()

Read the voltage Health State of the device.

This is an aggregated quantity representing if any of the voltage monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

voltage Health State of the device

voltageMGT_AVCC()

Handle a Tango attribute read of the FPGA MGT AV voltage.

Return type:

Optional[float]

Returns:

FPGA MGT AV voltage

voltageMGT_AVTT()

Handle a Tango attribute read of the FPGA MGT AVTT voltage.

Return type:

Optional[float]

Returns:

FPGA MGT AVTT voltage

voltageMan1V2()

Handle a Tango attribute read of the Management 1.2V voltage.

Return type:

Optional[float]

Returns:

Management 1.2V voltage

voltageMon1V8()

Return the Management 1.8 V supply of the TPM.

Note: sensor values have a measurement bias.

Return type:

Optional[float]

Returns:

Management supply of the TPM

voltageMon3V3()

Return the Management 3.3 V supply of the TPM.

Note: sensor values have a measurement bias.

Return type:

Optional[float]

Returns:

Management supply of the TPM

voltageMon5V0()

Return the Management 5V supply of the TPM.

Return type:

Optional[float]

Returns:

Management supply of the TPM

voltageSW_AVDD1()

Handle a Tango attribute read of the SW Analog 1.1 V voltage.

Return type:

Optional[float]

Returns:

SW Analog 1.1 V voltage

voltageSW_AVDD2()

Handle a Tango attribute read of the SW Analog 2.3 V voltage.

Return type:

Optional[float]

Returns:

SW Analog 2.3 V voltage

voltageVIN()

Handle a Tango attribute read of the input supply voltage.

Return type:

Optional[float]

Returns:

input supply voltage

voltageVM_AGP0()

Handle a Tango attribute read of the AD AGP group 0 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 0 voltage

voltageVM_AGP1()

Handle a Tango attribute read of the AD AGP group 1 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 1 voltage

voltageVM_AGP2()

Handle a Tango attribute read of the AD AGP group 2 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 2 voltage

voltageVM_AGP3()

Handle a Tango attribute read of the AD AGP group 3 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 3 voltage

voltageVM_AGP4()

Handle a Tango attribute read of the AD AGP group 4 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 4 voltage

voltageVM_AGP5()

Handle a Tango attribute read of the AD AGP group 5 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 5 voltage

voltageVM_AGP6()

Handle a Tango attribute read of the AD AGP group 6 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 6 voltage

voltageVM_AGP7()

Handle a Tango attribute read of the AD AGP group 7 Voltage Monitor.

Return type:

Optional[float]

Returns:

AD AGP group 7 voltage

voltageVM_CLK0B()

Handle a Tango attribute read of Clock Buffer0 3.3V Voltage Monitor.

Return type:

Optional[float]

Returns:

Clock Buffer0 3.3V voltage

voltageVM_CLK1B()

Handle a Tango attribute read of Clock Buffer1 3.3V Voltage Monitor.

Return type:

Optional[float]

Returns:

Clock Buffer1 3.3V voltage

voltageVM_DDR0_VTT()

Handle a Tango attribute read of DDR FPGA0 Vtt Voltage Monitor.

Return type:

Optional[float]

Returns:

DDR FPGA0 Vtt voltage

voltageVM_DDR1_VDD()

Handle a Tango attribute read of DDR4 Voltage Monitor.

Return type:

Optional[float]

Returns:

DDR4 voltage

voltageVM_DDR1_VTT()

Handle a Tango attribute read of DDR FPGA1 Vtt Voltage Monitor.

Return type:

Optional[float]

Returns:

DDR FPGA1 Vtt voltage

voltageVM_DRVDD()

Handle a Tango attribute read of the SW DRVDD 1.8V voltage.

Return type:

Optional[float]

Returns:

SW DRVDD 1.8V voltage

voltageVM_DVDD()

Handle a Tango attribute read of AD DVDD Voltage Monitor.

Return type:

Optional[float]

Returns:

AD DVDD voltage

voltageVM_FE0()

Handle a Tango attribute read of FE0 Voltage Monitor.

Note: PreADU must be on.

Return type:

Optional[float]

Returns:

FE0 voltage

voltageVM_FE1()

Handle a Tango attribute read of FE1 Voltage Monitor.

Note: PreADU must be on.

Return type:

Optional[float]

Returns:

FE1 voltage

voltageVM_MGT0_AUX()

Handle a Tango attribute read of FPGA MGT0 AUX Voltage Monitor.

Return type:

Optional[float]

Returns:

FPGA MGT0 AUX voltage

voltageVM_MGT1_AUX()

Handle a Tango attribute read of FPGA MGT1 AUX Voltage Monitor.

Return type:

Optional[float]

Returns:

FPGA MGT1 AUX voltage

voltageVM_PLL()

Handle a Tango attribute read of the ANALOG PLL Voltage Monitor.

Return type:

Optional[float]

Returns:

ANALOG PLL voltage

voltageVM_SW_AMP()

Handle a Tango attribute read of VGA DC-DC Voltage Monitor.

Return type:

Optional[float]

Returns:

VGA DC-DC voltage

voltageVrefDDR0()

Handle a Tango attribute read of the Vref voltage for DDR0.

Return type:

Optional[float]

Returns:

Vref voltage for DDR0

voltageVrefDDR1()

Handle a Tango attribute read of the Vref voltage for DDR1.

Return type:

Optional[float]

Returns:

Vref voltage for DDR1

voltage_alm()

Return the Voltage alarm reading.

0 -> OK, 1 -> WARN, 2 -> ALARM

Return type:

Optional[int]

Returns:

The alarm state for voltage.

voltages()

Return all the voltage values available.

Return type:

str

Returns:

voltages available

class MockTpm(logger)

Simulator for a ska_low_sps_tpm_api.boards::Tpm class.

__init__(logger)

Initialise the MockTPM.

Parameters:

logger (Logger) – a logger for this simulator to use

find_register(string, display=False, info=False)

Find an item in a dictionary.

This mocks the reading of a register for the purpose of testing TPM_driver.

Parameters:
  • string (str) – Regular expression to search against

  • display (bool | None) – True to output result to console

  • info (bool | None) – for linter.

Return type:

List[RegisterInfo | None]

Returns:

registers found at address.

get_40g_core_configuration(core_id=-1, arp_table_entry=0)

Return a 40G configuration.

Parameters:
  • core_id (int) – id of the core for which a configuration is to be returned. Defaults to -1, in which case all core configurations are returned

  • arp_table_entry (int) – ARP table entry to use

Return type:

dict[str, Any]

Returns:

core configuration or list of core configurations or none

get_bios()

Return a mock bios string.

Return type:

str

Returns:

A string bios version.

get_board_info()

Retrieve TPM board information.

Return type:

dict[str, Any]

Returns:

A dictionary of board info.

get_gateway()

Return a mock gateway string.

Return type:

str

Returns:

A string gateway.

get_gateway_eep()

Return a mock gateway_eep string.

Return type:

str

Returns:

A string gateway_eep.

get_ip()

Return a mock ip string.

Return type:

str

Returns:

A string ip.

get_ip_eep()

Return a mock ip_eep string.

Return type:

str

Returns:

A string ip_eep.

get_mac()

Return a mock mac address string.

Return type:

str

Returns:

A string mac address.

get_netmask()

Return a mock netmask string.

Return type:

str

Returns:

A string netmask.

get_netmask_eep()

Return a mock netmask_eep string.

Return type:

str

Returns:

A string netmask_eep.

get_part_number()

Return a mock part_number string.

Return type:

str

Returns:

A string part_number.

get_serial_number()

Return a mock serial_number string.

Return type:

str

Returns:

A string serial_number.

property info: dict[str, Any]

Report MockTPM information.

Returns:

the info

read_address(address, n=1)

Get address value.

Parameters:
  • address (int) – Memory address to read from

  • n (int) – Number of items to read

Return type:

Any

Returns:

Values

read_register(register, n=1, offset=0)

Get register value.

Parameters:
  • register (int | str) – Memory register to read from

  • n (int) – Number of words to read

  • offset (int) – Memory address offset to read from

Raises:

NotImplementedError – if trying to read more than one value

Return type:

list[int]

Returns:

Values

property station_beamf: List[StationBeamformer]

Station beamf.

Returns:

the station_beamf.

property tpm_preadu: List[PreAdu]

Tpm pre adu.

Returns:

the preadu.

write_address(address, values, retry=True)

Write address value.

Parameters:
  • address (int) – Memory address to write

  • values (list[int]) – value to write

  • retry (bool) – retry (does nothing yet.)

Return type:

None

write_register(register, values, offset=0, retry=True)

Set register value.

Parameters:
  • register (int | str) – Register name

  • values (list[int]) – Values to write

  • offset (int) – Memory address offset to write to

  • retry (bool) – retry

Raises:
  • LibraryError – Attempting to set a register not in the memory address.

  • NotImplementedError – if trying to write more than one value

Return type:

None

class TileComponentManager(simulation_mode, test_mode, logger, poll_rate, tile_id, station_id, tpm_ip, tpm_cpld_port, preadu_levels, static_time_delays, subrack_fqdn, subrack_tpm_id, preadu_present, communication_state_changed_callback, component_state_changed_callback, update_attribute_callback, _tile=None, event_serialiser=None, default_lock_timeout=0.4, poll_timeout=6.0, power_callback_timeout=6.0)

A component manager for a Tile (simulator or driver) and its power supply.

__init__(simulation_mode, test_mode, logger, poll_rate, tile_id, station_id, tpm_ip, tpm_cpld_port, preadu_levels, static_time_delays, subrack_fqdn, subrack_tpm_id, preadu_present, communication_state_changed_callback, component_state_changed_callback, update_attribute_callback, _tile=None, event_serialiser=None, default_lock_timeout=0.4, poll_timeout=6.0, power_callback_timeout=6.0)

Initialise a new instance.

Parameters:
  • simulation_mode (SimulationMode) – the simulation mode of this component manager. If SimulationMode.TRUE, then this component manager will launch an internal TPM simulator and interact with it; if SimulationMode.FALSE, this component manager will attempt to connect with an external TPM at the configured IP address and port.

  • test_mode (TestMode) – the test mode of this component manager. This has no effect when the device is in SimulationMode.FALSE. But when the simulation mode is SimulationMode.TRUE, then this determines some properties of the simulator: if the test mode is TestMode.TEST, then the simulator will return static “canned” values that are easy to assert against during testing; if TestMode.NONE, the simulator will return dynamically changing values for attributes such as temperatures and voltages, making for a nice demo but not so easy to test against.

  • logger (Logger) – a logger for this object to use

  • poll_rate (float) – the poll rate

  • tile_id (int) – the unique ID for the tile

  • station_id (int) – the unique ID for the station to which this tile belongs.

  • tpm_ip (str) – the IP address of the tile

  • tpm_cpld_port (int) – the port at which the tile is accessed for control

  • preadu_levels (Optional[list[float]]) – preADU gain attenuation settings to apply for this TPM.

  • static_time_delays (list[float]) – Delays in nanoseconds to account for static delay mismatches.

  • subrack_fqdn (str) – FQDN of the subrack that controls power to this tile

  • subrack_tpm_id (int) – This tile’s position in its subrack

  • preadu_present (list[bool]) – A list representing if the PreAdu is attached.

  • communication_state_changed_callback (Callable[[CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes

  • update_attribute_callback (Callable[..., None]) – Callback to call when attribute is updated.

  • component_state_changed_callback (Callable[..., None]) – callback to be called when the component state changes

  • event_serialiser (Optional[EventSerialiser]) – serialiser for events

  • _tile (Optional[TileSimulator]) – Optional tile to inject.

  • default_lock_timeout (float) – default timeout for hardware serialisation lock.

  • poll_timeout (float) – max time for a poll to wait on hardware serialisation lock before giving up.

  • power_callback_timeout (float) – max time for power callback to wait for hardware serialisation lock to determine if initialisation is required.

property adcs: dict[str, Any]

Return the ADC status in the TPM.

Returns:

ADC status in the TPM

property alarms: dict[str, Any]

Return the alarms status in the TPM.

Returns:

alarms status in the TPM

apply_calibration(load_time='')

Load the calibration coefficients at the specified time delay.

Parameters:

load_time (str) – switch time as ISO formatted time

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message.

apply_pointing_delays(load_time='')

Load the pointing delays at the specified time delay.

Parameters:

load_time (str) – switch time as ISO formatted time

Raises:

ValueError – invalid time

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

property arp_table: dict[int, list[int]] | None

Check that the ARP table has been populated for all used cores.

40G interfaces use cores 0 (fpga0) and 1 (fpga1), with ARP ID 0 for the beamformer and 1 for LMC. 10G interfaces use cores 0, 1 (fpga0) and 4, 5 (fpga1) for beamforming, and 2, 6 for LMC with only one ARP.

Returns:

list of core id and arp table populated

beamformer_running_for_channels(channel_groups)

Check if the beamformer is running in a list of channel blocks.

Parameters:

channel_groups (Optional[list[int]]) – List of channel blocks to check

Return type:

bool

Returns:

True if the beamformer is running

property board_temperature: float

Return the temperature of the TPM.

Returns:

the temperature of the TPM

property broadband_rfi_factor: float

Return the broadband RFI factor.

Returns:

the broadband RFI factor

property channeliser_truncation: list[int] | None

Read the value for the channeliser truncation.

Returns:

value for the channeliser truncation

cleanup()

Perform necessary cleanup before device teardown.

Return type:

None

clear_broadband_rfi()

Clear the broadband RFI levels.

Return type:

None

property clock_present: NoReturn

Check if 10 MHz clock signal is present.

Raises:

NotImplementedError – not implemented in ska-low-sps-tpm-api.

configure_40g_core(core_id=0, arp_table_entry=0, source_mac=None, source_ip=None, source_port=None, destination_ip=None, destination_port=None, rx_port_filter=None, netmask=None, gateway_ip=None)

Configure the 40G core.

Parameters:
  • core_id (int) – id of the core

  • arp_table_entry (int) – ARP table entry to use

  • source_mac (Optional[int]) – MAC address of the source

  • source_ip (Optional[str]) – IP address of the source

  • source_port (Optional[int]) – port of the source

  • destination_ip (Optional[str]) – IP address of the destination

  • destination_port (Optional[int]) – port of the destination

  • rx_port_filter (Optional[int]) – Filter for incoming packets

  • netmask (Optional[str]) – Netmask

  • gateway_ip (Optional[str]) – Gateway IP

Return type:

tuple[list[ResultCode], list[str]]

Returns:

result code and message

configure_integrated_beam_data(integration_time=0.5, first_channel=0, last_channel=191)

Configure and start the transmission of integrated beam data.

Configure with the provided integration time, first channel and last channel. Data are sent continuously until the StopIntegratedData command is run.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

configure_integrated_channel_data(integration_time=0.5, first_channel=0, last_channel=511)

Configure and start the transmission of integrated channel data.

Configure with the provided integration time, first channel and last channel. Data are sent continuously until the StopIntegratedData command is run.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

configure_pattern_generator(stage, pattern, adders, start=False, shift=0, zero=0, ramp1=None, ramp2=None)

Configure the TPM pattern generator.

Parameters:
  • stage (str) – The stage in the signal chain where the pattern is injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), or ‘beamf’ (output of tile beamformer) or ‘all’ for all stages.

  • pattern (list[int]) – The data pattern in time order. This must be a list of integers with a length between 1 and 1024. The pattern represents values in time order (not antennas or polarizations).

  • adders (list[int]) – A list of 32 integers that expands the pattern to cover 16 antennas and 2 polarizations in hardware. This list maps the pattern to the corresponding signals for the antennas and polarizations.

  • start (bool) – Boolean flag indicating whether to start the pattern immediately. If False, the pattern will need to be started manually later.

  • shift (int) – Optional bit shift (divides the pattern by 2^shift). This must not be used in the ‘beamf’ stage, where it is always overridden to 4. The default value is 0.

  • zero (int) – An integer (0-65535) used as a mask to disable the pattern on specific antennas and polarizations. The same mask is applied to both FPGAs, supporting up to 8 antennas and 2 polarizations. The default value is 0.

  • ramp1 (Optional[dict[str, int]]) – if defined, a ramp will be applied for ramp1 after the pattern is set. A mandatory kwarg polarisation is used as configuration. This must be 0, 1 or -1 to use all stages.

  • ramp2 (Optional[dict[str, int]]) – if defined, a ramp will be applied for ramp2 after the pattern is set. A mandatory kwarg polarisation is used as configuration. This must be 0, 1 or -1 to use all stages.

Return type:

None
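
The limits listed above can be checked before the call is made. The following is a minimal sketch only: check_pattern_args is a hypothetical helper, not part of this package, and it encodes nothing beyond the constraints stated in the parameter descriptions.

```python
def check_pattern_args(stage, pattern, adders, shift=0, zero=0):
    """Hypothetical helper: validate arguments against the documented limits."""
    if stage not in ("jesd", "channel", "beamf", "all"):
        raise ValueError(f"unknown stage: {stage!r}")
    if not 1 <= len(pattern) <= 1024:
        raise ValueError("pattern must hold between 1 and 1024 integers")
    if len(adders) != 32:
        raise ValueError("adders must hold 32 integers (16 antennas x 2 pols)")
    if not 0 <= zero <= 0xFFFF:
        raise ValueError("zero must be in the range 0-65535")
    if stage == "beamf" and shift != 0:
        # The documentation says shift is always overridden to 4 in this stage.
        raise ValueError("shift must not be used in the 'beamf' stage")
    return True


# A short ramp pattern with no per-signal offsets.
ok = check_pattern_args("jesd", list(range(16)), [0] * 32)
```

Validating on the caller side gives an early, readable error; the real method may apply its own checks as well.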

configure_test_generator(frequency0, amplitude0, frequency1, amplitude1, amplitude_noise, pulse_code, amplitude_pulse, delays=None, load_time=None)

Test generator setting.

Parameters:
  • frequency0 (float) – Tone frequency in Hz of DDC 0

  • amplitude0 (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • frequency1 (float) – Tone frequency in Hz of DDC 1

  • amplitude1 (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • amplitude_noise (float) – Amplitude of pseudorandom noise normalized to 26.03 ADC units, resolution 0.102 ADU

  • pulse_code (int) – Code for pulse frequency. Range 0 to 7: 16,12,8,6,4,3,2 times frame frequency

  • amplitude_pulse (float) – pulse peak amplitude, normalized to 127.5 ADC units, resolution 0.5 ADU

  • delays (Optional[list[float]]) – delays to load into the test generator, list of 32 floats.

  • load_time (Optional[str]) – Time to start the generator. in UTC ISO formatted string.

Return type:

None
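
The amplitude figures above imply 255 steps per range (31.875 / 0.125 = 255). The sketch below shows that arithmetic under one assumption that this reference does not confirm: that an amplitude argument is a fraction of full scale in [0, 1]. The function name tone_amplitude_adu is hypothetical.

```python
def tone_amplitude_adu(amplitude, full_scale=31.875, step=0.125):
    """Return the amplitude in ADC units, rounded to the documented step.

    Assumes (unconfirmed) that `amplitude` is a fraction of full scale.
    """
    if not 0.0 <= amplitude <= 1.0:
        raise ValueError("amplitude is assumed to be normalised to [0, 1]")
    return round(amplitude * full_scale / step) * step


levels = round(31.875 / 0.125)      # number of steps across the tone range
full = tone_amplitude_adu(1.0)      # full-scale tone
quarter = tone_amplitude_adu(0.25)  # rounded to the nearest 0.125 ADU
```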

connect()

Check we can connect to the TPM.

Raises:

ConnectionError – when unable to connect to TPM

Return type:

None

property csp_rounding: list[int]

Read the cached value for the final rounding in the CSP samples.

Needs to be specified only for the last tile.

Returns:

Final rounding for the CSP samples. Up to 384 values

property csp_spead_format: str

Get CSP SPEAD format.

The CSP format is either AAVS, the format used in the AAVS2-AAVS3 system, which uses a reference Unix time specified in the header, or SKA, the format defined in the SPS-CBF ICD, which is based on the TAI2000 epoch.

Returns:

CSP Spead format. AAVS or SKA

property current_tile_beamformer_frame: int

Return current tile beamformer frame, in units of 256 ADC frames.

Returns:

current tile beamformer frame

property data_router_discarded_packets: dict

Return the data router values.

Returns:

The number of discarded packets

property data_router_status: dict

Return the data router values.

Returns:

The status of both FPGAs

disable_broadband_rfi_blanking(antennas)

Disable broadband RFI blanking for a list of antennas.

Parameters:

antennas (list[int]) – list of antennas to disable broadband RFI blanking

Return type:

None

disable_station_beam_flagging()

Disable station beam flagging.

Return type:

None

download_firmware(argin, task_callback)

Submit the download_firmware slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • argin (str) – can either be the design name returned from GetFirmwareAvailable command, or a path to a file

  • task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a result code and a unique id string to identify the command

enable_broadband_rfi_blanking(antennas)

Enable broadband RFI blanking for a list of antennas.

Parameters:

antennas (list[int]) – list of antennas to enable broadband RFI blanking

Return type:

None

enable_station_beam_flagging()

Enable station beam flagging.

Return type:

None

property firmware_available: List[dict[str, Any]]

Return the list of the firmware loaded in the system.

Returns:

the firmware list

property firmware_name: str

Return the name of the firmware that this TPM simulator is running.

Returns:

firmware name

property firmware_version: str

Return the version of the firmware that this TPM simulator is running.

Returns:

firmware version (major.minor)

property flagged_packets: dict

Return the total number of flagged packets by the TPM.

Returns:

the total number of flagged packets by the TPM

property formatted_fpga_reference_time: str

Return FPGA reference time in UTC format.

Reference time is set as part of start_observation. It represents the timestamp for the first frame.

Returns:

FPGA reference time

property fpga1_temperature: float

Return the temperature of FPGA 1.

Returns:

the temperature of FPGA 1

property fpga2_temperature: float

Return the temperature of FPGA 2.

Returns:

the temperature of FPGA 2

property fpga_current_frame: int

Return the FPGA current frame counter.

Returns:

the FPGA_1 current frame counter

Raises:

ConnectionError – if communication with tile failed

property fpga_frame_time: str

Return FPGA frame time in UTC format.

Frame time is the timestamp for the current frame being processed. The value reported here refers to the ADC frames, but the total processing delay is < 1 ms and thus irrelevant on the timescales of MCCS response time.

Returns:

FPGA frame time

property fpga_reference_time: int

Return the FPGA reference time.

Required to map the FPGA timestamps, expressed in frames to UTC time

Returns:

the FPGA_1 reference time, in Unix seconds

property fpga_time: str

Return FPGA internal time in UTC format.

Returns:

FPGA internal time

property fpgas_time: list[int]

Return the FPGAs clock time.

Useful for detecting clock skew, propagation delays, contamination delays, etc.

Returns:

the FPGAs clock time

Raises:

ConnectionError – if communication with tile failed

frame_from_utc_time(utc_time)

Return the frame from utc time.

Parameters:

utc_time (str) – the time in utc format.

Return type:

int

Returns:

the frame from utc time.
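
As a rough illustration of how a UTC time could map to a frame count relative to the FPGA reference time, consider the sketch below. It is not the implementation: frame_from_utc, its arguments, and the 0.5 second frame period are all hypothetical, and the real frame period is not stated in this reference.

```python
from datetime import datetime, timezone


def frame_from_utc(utc_time, reference_unix, frame_period_s):
    """Hypothetical mapping from an ISO UTC string to a frame number."""
    t = datetime.fromisoformat(utc_time)
    if t.tzinfo is None:
        # Treat naive timestamps as UTC.
        t = t.replace(tzinfo=timezone.utc)
    elapsed = t.timestamp() - reference_unix
    if elapsed < 0:
        raise ValueError("time precedes the reference time")
    return int(elapsed // frame_period_s)


# Reference time in Unix seconds; the frame period is an arbitrary example value.
ref = datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp()
frame = frame_from_utc("2024-01-01T00:00:10+00:00", ref, 0.5)
```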

get_40g_configuration(core_id=-1, arp_table_entry=0)

Return a 40G configuration.

Parameters:
  • core_id (int) – id of the core for which a configuration is to be returned. Defaults to -1, in which case all core configurations are returned

  • arp_table_entry (int) – ARP table entry to use

Return type:

list[dict]

Returns:

core configuration or list of core configurations

get_40g_packet_counts()

Get 40G packet counts.

The return value depends on how many 40G cores are active. Typically, only one core is active.

Example:

# 0 cores active
{}

# 1 core active
{
    'FPGA0': {
        'rx_received': 2921,
        'rx_forwarded': 0,
        'tx_transmitted': 6973024
    }
}

# 2 cores active
{
    'FPGA0': {
        'rx_received': 3881,
        'rx_forwarded': 0,
        'tx_transmitted': 7321460
    },
    'FPGA1': {
        'rx_received': 1,
        'rx_forwarded': 0,
        'tx_transmitted': 3122
    }
}
Returns:

Packet counts per active 40G core. Returns an empty dictionary if no 40G cores are active.

Return type:

dict
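
Because the number of keys varies with the number of active cores, code that consumes this dictionary should iterate over whatever is present rather than index a fixed key. A minimal sketch, using the "2 cores active" example above; total_transmitted is a hypothetical helper name:

```python
def total_transmitted(packet_counts):
    """Sum 'tx_transmitted' over every active core in the returned dict."""
    return sum(core["tx_transmitted"] for core in packet_counts.values())


# Values copied from the "2 cores active" example.
two_cores = {
    "FPGA0": {"rx_received": 3881, "rx_forwarded": 0, "tx_transmitted": 7321460},
    "FPGA1": {"rx_received": 1, "rx_forwarded": 0, "tx_transmitted": 3122},
}
total = total_transmitted(two_cores)
none_active = total_transmitted({})
```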

get_all_pointing_delays()

Read pointing delays from the TPM for all beams.

Return type:

ndarray

Returns:

pointing delays for all beams as (8, 32) ndarray.

get_current_warning_thresholds(current='')

Get the current warning thresholds.

Parameters:

current (str) – The current type to get the thresholds for.

Return type:

str

Returns:

a jsonified dictionary with the current warning thresholds or a message if the specified current is not recognized.
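
Since the return value is either a JSON string or a plain message, a caller has to tell the two apart. One possible way to do that is sketched below. parse_thresholds is a hypothetical helper, and both reply strings are invented for illustration; the real keys and message text are not given in this reference.

```python
import json


def parse_thresholds(reply):
    """Split a reply into (thresholds, message); exactly one is not None."""
    try:
        parsed = json.loads(reply)
    except json.JSONDecodeError:
        # Not JSON, so treat the reply as the "not recognised" message.
        return None, reply
    if isinstance(parsed, dict):
        return parsed, None
    return None, reply


# Both reply strings below are invented for illustration.
thresholds, message = parse_thresholds('{"example_rail": {"min": 1.0, "max": 2.5}}')
unknown, note = parse_thresholds("current type not recognised")
```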

get_request()

Return the action/s to be taken in the next poll.

Raises:

AssertionError – if the request provider is not initialised, i.e. has a None value.

Return type:

Union[TileRequest, TileLRCRequest, None]

Returns:

request to be executed in the next poll.

get_static_delays()

Read the cached value for the static delays, in nanoseconds.

Return type:

list[float]

Returns:

static delays in nanoseconds, one per TPM input

get_tpm_temperature_thresholds()

Return the temperature thresholds in firmware.

Return type:

Optional[dict[str, float]]

Returns:

A dictionary containing the thresholds or None if lock could not be acquired in 0.4 seconds.

get_voltage_warning_thresholds(voltage='')

Get the voltage warning thresholds.

Parameters:

voltage (str) – The voltage type to get the thresholds for.

Return type:

str

Returns:

a jsonified dictionary with the voltage warning thresholds or a message if the specified voltage is not recognized.

property global_reference_time: str | None

Return the Unix time used as global synchronization time.

Returns:

Unix time used as global synchronization time

initialise(task_callback=None, force_reprogramming=True)

Submit the initialise slow task.

This method returns immediately after it is submitted for polling.

Parameters:
  • task_callback (Optional[Callable]) – Update task state, defaults to None

  • force_reprogramming (bool) – Force FPGA reprogramming, for complete initialisation

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a task status and a unique id string to identify the command

initialise_beamformer(start_channel, nof_channels, is_first, is_last)

Initialise the beamformer.

Parameters:
  • start_channel (int) – the start channel

  • nof_channels (int) – number of channels

  • is_first (bool) – whether this is the first (?)

  • is_last (bool) – whether this is the last (?)

Return type:

None

property is_beamformer_running: bool | None

Check if the beamformer is running.

Returns:

True if the beamformer is running

property is_programmed: bool

Return whether this TPM is programmed (i.e. firmware has been downloaded to it).

Returns:

whether this TPM is programmed

property is_station_beam_flagging_enabled: list

Return station beam data flagging state for each fpga.

Returns:

a list of bool values corresponding to the fpgas

load_calibration_coefficients(antenna, calibration_coefficients)

Load calibration coefficients.

These may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:
  • antenna (int) – the antenna to which the coefficients apply

  • calibration_coefficients (list[list[complex]]) – a bidimensional complex array of coefficients, flattened into a list

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message.

load_calibration_coefficients_for_channels(start_channel, calibration_coefficients)

Load calibration coefficients for all antennas and a subset of channels.

These may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:
  • start_channel (int) – the first channel to which the coefficients apply

  • calibration_coefficients (list[list[list[complex]]]) – a tridimensional complex array of coefficients

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message.

load_pointing_delays(delay_array, beam_index)

Specify the delay in seconds and the delay rate in seconds/second.

The delay_array specifies the delay and delay rate for each antenna. beam_index specifies which beam is desired (range 0-7)

Parameters:
  • delay_array (list[list[float]]) – list of [delay in seconds, and delay rate in seconds/second] per antenna.

  • beam_index (int) – the beam to which the pointing delay should be applied

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message
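
The delay_array argument pairs each antenna's delay with its delay rate. A minimal sketch of assembling it from two separate sequences follows; build_delay_array and check_beam_index are hypothetical helpers, and the use of 16 antennas in the example is taken from the pattern generator description rather than from this method.

```python
def build_delay_array(delays_s, rates_s_per_s):
    """Pair each delay (seconds) with its rate (seconds/second)."""
    if len(delays_s) != len(rates_s_per_s):
        raise ValueError("need one delay rate per delay")
    return [[float(d), float(r)] for d, r in zip(delays_s, rates_s_per_s)]


def check_beam_index(beam_index):
    """Reject beam indices outside the documented range 0-7."""
    if not 0 <= beam_index <= 7:
        raise ValueError("beam_index must be in the range 0-7")
    return beam_index


# Illustrative values: delays of 0..15 ns, all with zero delay rate.
delay_array = build_delay_array([i * 1e-9 for i in range(16)], [0.0] * 16)
beam = check_beam_index(3)
```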

max_broadband_rfi(antennas)

Read the maximum broadband RFI levels for a list of antennas.

Parameters:

antennas (list[int]) – list of antennas to read maximum broadband RFI levels

Return type:

int

Returns:

maximum broadband RFI levels for the specified antennas

off(task_callback=None)

Tell the upstream power supply proxy to turn the tpm off.

Parameters:

task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

a result code and a unique_id or message.

on(task_callback=None)

Tell the upstream power supply proxy to turn the tpm on.

Parameters:

task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

a result code and a unique_id or message.

Raises:

AssertionError – request_provider is not yet initialised.

property pending_data_requests: bool | None

Check for pending data requests.

Returns:

whether there are pending send data requests

ping()

Check we can communicate with TPM.

Return type:

None

poll(poll_request)

Poll request for TileComponentManager.

Execute a command or read some values.

Parameters:

poll_request (TileRequest | TileLRCRequest) – specification of the actions to be taken in this poll.

Return type:

TileResponse

Returns:

responses to queries in this poll

poll_failed(exception)

Handle a failed poll.

This is a hook called by the poller when an exception was raised.

NOTE: This implementation may be a bit simplistic as of MCCS-1507. The exception code could be used to give more information to the user, and potentially to inform the poll prioritisation.

Parameters:

exception (Exception) – exception code raised from poll.

Return type:

None

poll_succeeded(poll_response)

Handle the receipt of new polling values.

This is a hook called by the poller when values have been read during a poll.

Parameters:

poll_response (TileResponse) – response to the poll, including any values read.

Return type:

None

polling_started()

Initialise the request provider and start connecting.

Return type:

None

polling_stopped()

Uninitialise the request provider and set state UNKNOWN.

Return type:

None

property pps_delay: int | None

Return the pps delay from the TPM.

Returns:

the pps_delay from the TPM.

property pps_present: bool

Check if PPS signal is present.

Returns:

True if PPS is present. Checked in poll loop, cached

read_address(address, nvalues)

Return a list of values from a given address.

Parameters:
  • address (int) – address of start of read

  • nvalues (int) – number of values to read

Return type:

list[int]

Returns:

values at the address

read_all_live_calibration_coefficients()

Read all live calibration coefficients from the TPM.

Return type:

list[list[list[complex]]]

Returns:

Calibration coefficient array

read_all_staged_calibration_coefficients()

Read all staged calibration coefficients from the TPM.

Return type:

list[list[list[complex]]]

Returns:

Calibration coefficient array

read_broadband_rfi(antennas)

Read the broadband RFI levels for a list of antennas.

Parameters:

antennas (range | list[int]) – list of antennas to read broadband RFI levels

Return type:

ndarray

Returns:

broadband RFI levels for the specified antennas

read_firmware_thresholds()

Read all thresholds from firmware and save in cache.

Return type:

FirmwareThresholds

Returns:

the FirmwareThresholds.

read_register(register_name)

Read the values in a named register.

Parameters:

register_name (str) – name of the register

Return type:

list[int]

Returns:

values read from the register

Raises:

ValueError – if the tpm is None.

reevaluate_tpm_status()

Reevaluate the TpmStatus.

NOTE: This method should not be needed. But can be used as a sanity check on the TileProgrammingState

Return type:

bool

Returns:

True if the re-evaluated TpmStatus differs from the automated evaluation.

property register_list: list[str]

Return a list of registers available on each device.

Returns:

list of registers

Raises:

ValueError – if the tpm is None.

property rfi_blanking_enabled_antennas: list[int]

Return the list of antennas with RFI blanking enabled.

Returns:

list of antennas with RFI blanking enabled

property running_beams: list[bool]

List hardware beams currently running.

Returns:

list of hardware beam running states

send_data_samples(data_type='', start_time=None, seconds=0.2, n_samples=1024, sync=False, first_channel=0, last_channel=511, channel_id=128, frequency=100.0, round_bits=3, **params)

Front end for send_xxx_data methods.

Parameters:
  • data_type (str) – sample type. “raw”, “channel”, “channel_continuous”, “narrowband”, “beam”

  • start_time (Optional[str]) – UTC time at which to start sending data. Defaults to now

  • seconds (float) – Delay if timestamp is not specified. Default 0.2 seconds

  • n_samples (int) – number of samples to send per packet

  • sync (bool) – (raw) send synchronised antenna samples, vs. round robin

  • first_channel (int) – (channel) first channel to send, default 0

  • last_channel (int) – (channel) last channel to send, default 511

  • channel_id (int) – (channel_continuous) channel to send

  • frequency (float) – (narrowband) Sky frequency for band centre, in Hz

  • round_bits (int) – (narrowband) how many bits to round

  • params (Any) – any additional keyword arguments

Raises:

ValueError – error in time specification

Return type:

None

set_beamformer_regions(regions)

Set the frequency regions to be beamformed into a single beam.

The input list contains up to 48 blocks which represent at most 16 contiguous channel regions. Each block has 8 entries which represent:

  • starting physical channel

  • number of channels

  • hardware beam number

  • subarray ID

  • subarray logical channel

  • subarray beam ID

  • substation ID

Parameters:

regions (list[list[int]]) – a list encoding up to 48 regions

Raises:

ValueError – if the tpm is None.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message
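
The shape of the regions argument can be checked before it is sent. The sketch below only enforces the two size limits stated above (at most 48 blocks, 8 entries per block). check_regions is a hypothetical helper and the block values are placeholder integers, not a meaningful configuration.

```python
def check_regions(regions):
    """Hypothetical shape check: at most 48 blocks of exactly 8 integers."""
    if len(regions) > 48:
        raise ValueError("at most 48 blocks are allowed")
    for index, block in enumerate(regions):
        if len(block) != 8:
            raise ValueError(f"block {index} must have 8 entries")
        if not all(isinstance(value, int) for value in block):
            raise ValueError(f"block {index} must contain only integers")
    return len(regions)


# Placeholder values, used only to show the expected shape.
n_blocks = check_regions([[64, 8, 0, 1, 0, 1, 1, 1]])
```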

set_broadband_rfi_factor(factor)

Set the broadband RFI factor.

Parameters:

factor (float) – the broadband RFI factor

Return type:

None

set_csp_download(src_port, dst_ip_1, dst_ip_2, dst_port, is_last, netmask, gateway)

Set CSP Destination per tile.

Parameters:
  • src_port (int) – Source port

  • dst_ip_1 (str) – Destination IP FPGA1

  • dst_ip_2 (str) – Destination IP FPGA2

  • dst_port (int) – Destination port

  • is_last (bool) – True for last tile in beamforming chain

  • netmask (str) – Netmask

  • gateway (str) – Gateway IP

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message for information.

set_current_warning_thresholds(current, min_thr, max_thr)

Set the current warning thresholds.

Parameters:
  • current (str) – The current type to set the thresholds for. Must be one of the keys in the current warning thresholds dictionary.

  • min_thr (float) – The minimum threshold value.

  • max_thr (float) – The maximum threshold value.

Return type:

Optional[dict[str, Any]]

Returns:

The set current thresholds if successful else None

set_lmc_download(mode, payload_length=1024, dst_ip='10.0.10.1', src_port=61648, dst_port=4660, netmask_40g=None, gateway_40g=None)

Specify whether control data will be transmitted over 1G or 40G networks.

Parameters:
  • mode (str) – “1G” or “10G”

  • payload_length (int) – SPEAD payload length for integrated channel data, defaults to 1024

  • dst_ip (str) – destination IP, defaults to “10.0.10.1”

  • src_port (Optional[int]) – source port, defaults to 0xF0D0

  • dst_port (Optional[int]) – destination port, defaults to 4660

  • netmask_40g (Optional[str]) – netmask of the 40g subnet

  • gateway_40g (Optional[str]) – IP address of the 40g subnet gateway, if it exists.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

set_lmc_integrated_download(mode, channel_payload_length, beam_payload_length, dst_ip='10.0.10.1', src_port=61648, dst_port=4660, netmask_40g=None, gateway_40g=None)

Configure link and size of control data.

Parameters:
  • mode (str) – ‘1G’ or ‘10G’

  • channel_payload_length (int) – SPEAD payload length for integrated channel data

  • beam_payload_length (int) – SPEAD payload length for integrated beam data

  • dst_ip (str) – Destination IP, defaults to “10.0.10.1”

  • src_port (int) – source port, defaults to 0xF0D0

  • dst_port (int) – destination port, defaults to 4660

  • netmask_40g (Optional[str]) – netmask of the 40g subnet

  • gateway_40g (Optional[str]) – IP address of the 40g subnet gateway, if it exists.

Return type:

None

set_phase_terminal_count(value)

Set the phase terminal count.

Parameters:

value (int) – the phase terminal count

Return type:

None

set_pps_delay_correction(correction)

Set the ppsDelay correction.

Parameters:

correction (int) – the correction to set

Return type:

None

set_preadu_levels(levels)

Set preadu levels in dB.

Parameters:

levels (ndarray) – Preadu attenuation levels in dB

Raises:
  • ValueError – When attempting to set preaduLevels with a list of length not equal to 32 (i.e. 32 ADC channels).

  • HardwareVerificationError – When the readback from hardware is unexpected.

Return type:

None

set_static_delays(delays)

Set the static delays.

Parameters:

delays (list[float]) – Static zenith delays, one per input channel, in nanoseconds, nominal = 0, positive delay adds delay to the signal stream

Return type:

None

set_station_id(value)

Set Station ID.

Parameters:

value (int) – assigned station Id value

Return type:

None

set_tile_id(value)

Set Tile ID.

Parameters:

value (int) – assigned tile Id value

Return type:

None

set_tpm_temperature_thresholds(max_board_alarm_threshold=None, max_fpga1_alarm_threshold=None, max_fpga2_alarm_threshold=None)

Set the temperature thresholds.

NOTE: This method can configure the shutdown temperature of components and must be used with care. Thresholds are capped to a maximum of 50 degrees Celsius, and the method is ONLY supported in tpm1_6.

Parameters:
  • max_board_alarm_threshold (Optional[float]) – The maximum alarm thresholds for the board (unit: Degree Celsius)

  • max_fpga1_alarm_threshold (Optional[float]) – The maximum alarm thresholds for the fpga1 (unit: Degree Celsius)

  • max_fpga2_alarm_threshold (Optional[float]) – The maximum alarm thresholds for the fpga2 (unit: Degree Celsius)

Return type:

dict[str, Any]

Returns:

The set temperature thresholds if successful else None

set_voltage_warning_thresholds(voltage, min_thr, max_thr)

Set the voltage warning thresholds.

Parameters:
  • voltage (str) – The voltage type to set the thresholds for. Must be one of the keys in the voltage warning thresholds dictionary.

  • min_thr (float) – The minimum threshold value.

  • max_thr (float) – The maximum threshold value.

Return type:

Optional[dict[str, Any]]

Returns:

The set voltage thresholds if successful else None

start_acquisition(task_callback=None, start_time=None, delay=2, global_reference_time=None)

Submit the start_acquisition slow task.

Parameters:
  • task_callback (Optional[Callable]) – Update task state, defaults to None

  • start_time (Optional[str]) – the acquisition start time

  • delay (int) – a delay to the acquisition start

  • global_reference_time (Optional[str]) – the start time assumed for starting the timestamp

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a result code and a unique id string to identify the command

start_adcs()

Start the ADCs.

Return type:

None

start_beamformer(task_callback=None, start_time=None, duration=-1, channel_groups=None, scan_id=0)

Start beamforming on a specific subset of the beamformed channels.

The current firmware version does not support channel mask and scan ID; these are ignored.

Parameters:
  • task_callback (Optional[Callable]) – Update task state, defaults to None

  • start_time (Optional[str]) – Start time as ISO formatted time

  • duration (int) – Scan duration, in frames, default “forever”

  • channel_groups (Optional[list[int]]) – Channel groups to be started. The command affects only beamformed channels for the given groups. Default: all channels

  • scan_id (int) – ID of the scan to be started. Default 0

Raises:

ValueError – invalid time specified

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a task status and a unique id string to identify the command

start_pattern_generator(stage)

Start the pattern generator.

Parameters:

stage (str) – The stage in the signal chain where the pattern was injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), or ‘beamf’ (output of tile beamformer) or ‘all’ for all stages.

Return type:

None

stop_adcs()

Stop the ADCs.

Return type:

None

stop_beamformer(task_callback=None, channel_groups=None)

Stop the beamformer.

Parameters:
  • task_callback (Optional[Callable]) – Update task state, defaults to None

  • channel_groups (Optional[list[int]]) – Channel groups to be stopped. The command affects only beamformed channels for the given groups. Default: all channels

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a result code and a unique id string to identify the command

stop_data_transmission()

Stop data transmission for send_channelised_data_continuous.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

stop_integrated_data()

Stop the integrated data.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

stop_pattern_generator(stage)

Stop the pattern generator.

Parameters:

stage (str) – The stage in the signal chain where the pattern was injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), or ‘beamf’ (output of tile beamformer) or ‘all’ for all stages.

Return type:

None

property sysref_present: NoReturn

Check if SYSREF signal is present.

Raises:

NotImplementedError – not implemented in ska-low-sps-tpm-api.

property test_generator_active: bool

Check if the test generator is active.

Returns:

whether the test generator is active

test_generator_input_select(inputs=0)

Specify which ADC inputs are substituted with the test signal.

Specified using a 32 bit mask, with LSB for ADC input 0.

Parameters:

inputs (int) – Bit mask of inputs using test signal

Return type:

None
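
A bit mask of this form can be built from a list of input indices, with bit 0 (the LSB) standing for ADC input 0. A minimal sketch; inputs_to_mask is a hypothetical helper name:

```python
def inputs_to_mask(inputs):
    """Build a 32-bit mask with one bit set per selected ADC input."""
    mask = 0
    for index in inputs:
        if not 0 <= index < 32:
            raise ValueError("ADC input index must be in the range 0-31")
        mask |= 1 << index
    return mask


# Select ADC inputs 0, 1 and 31.
mask = inputs_to_mask([0, 1, 31])
```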

tile_info()

Return information about the tile.

Return type:

dict[str, Any]

Returns:

information relevant to tile.

property tpm_status: TpmStatus

Return the TPM status.

Returns:

the TPM status

update_fault_state(poll_success, exception_code=None)

Update fault state.

This method evaluates whether the current state is faulty. Depending on the previous fault state, it updates the fault_state to allow navigation of the Opstate machine.

NOTE: As evaluation becomes more complex we may want to refactor this method into a class. Currently this is a very simple evaluation, only checking for an inconsistent state.

Parameters:
  • poll_success (bool) – a bool representing if the poll was a success

  • exception_code (Optional[Any]) – the exception code raised in last poll.

Return type:

None

property voltage_mon: float

Return the internal 5V supply of the TPM.

Returns:

the internal 5V supply of the TPM

write_address(address, values)

Write a list of values to a given address.

Parameters:
  • address (int) – address of start of write

  • values (list[int]) – values to write

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

write_register(register_name, values)

Write values to a register.

Parameters:
  • register_name (str) – name of the register

  • values (list[Any] | int) – values to write

Raises:

ValueError – if the tpm is None.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

Result code and message

class TileData

This class contains data/facts about a tile needed by multiple classes.

For example, the channelized sample and beamformer frame periods, and the number of antennas per tile. Rather than storing these facts in separate places, we store them here.

classmethod generate_tile_defaults()

Compute the default values for tile monitoring points.

These are computed to be halfway between the minimum and maximum values. Where a permitted value is given instead, the permitted value is used.

Return type:

dict[str, Any]

Returns:

the default values for tile monitoring points
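
The rule above (midpoint of minimum and maximum, or the permitted value when one is given) can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the function compute_defaults, the key names "min", "max" and "permitted", and the example limits are all hypothetical.

```python
from typing import Any


def compute_defaults(limits: dict[str, Any]) -> dict[str, Any]:
    """Recursively turn limit specifications into default values.

    Assumes every entry is a dict: a leaf holds either "permitted" or
    both "min" and "max"; anything else is treated as a nested group.
    These key names are assumptions made for this sketch.
    """
    defaults: dict[str, Any] = {}
    for name, spec in limits.items():
        if "permitted" in spec:
            defaults[name] = spec["permitted"]
        elif "min" in spec and "max" in spec:
            defaults[name] = (spec["min"] + spec["max"]) / 2
        else:
            # Not a leaf: descend into the nested group.
            defaults[name] = compute_defaults(spec)
    return defaults


# Illustrative limits only; the numbers are not real TPM thresholds.
example_limits = {
    "temperatures": {"board": {"min": 10.0, "max": 68.0}},
    "timing": {"pll_locked": {"permitted": True}},
}
print(compute_defaults(example_limits))
```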

classmethod get_tile_defaults()

Get the defaults for tile monitoring points.

If these have not been computed, compute them first.

Return type:

dict[str, Any]

Returns:

the defaults for tile monitoring points.

class TileHealthModel(health_changed_callback, hw_version, bios_version, preadu_presence, thresholds=None)

A health model for a tile.

At present this uses the base health model; this is a placeholder for a future, better implementation.

__init__(health_changed_callback, hw_version, bios_version, preadu_presence, thresholds=None)

Initialise a new instance.

Parameters:
  • health_changed_callback (HealthChangedCallbackProtocol) – callback to be called whenever there is a change to this health model’s evaluated health state.

  • hw_version (str) – the TPM version.

  • bios_version (str) – the TPM bios version.

  • preadu_presence (list[bool]) – whether a preADU is present, on a per-channel basis.

  • thresholds (Optional[dict[str, Any]]) – the threshold parameters for the health rules

evaluate_health()

Compute overall health of the tile.

The overall health is based on the fault and communication status of the tile.

Return type:

tuple[HealthState, str]

Returns:

an overall health of the tile

property health_params: dict[str, Any]

Get the thresholds for health rules.

Returns:

the thresholds for health rules

property intermediate_healths: dict[str, tuple[ska_control_model.HealthState, str]]

Get the intermediate health roll-up states.

Returns:

the intermediate health roll-up states

set_logger(logger)

Set logger for debugging.

Parameters:

logger (Any) – a logger.

Return type:

None

update_data(new_states)

Update this health model with state relevant to evaluating health.

Parameters:

new_states (dict) – New states of the data points.

Return type:

None

class TileRequestProvider(stale_attribute_callback=None, _request_iterator=None)

A class that manages requests for the Tile.

It ensures that:

  • commands get executed as promptly as possible

  • only attributes allowed to be polled given a TpmStatus are returned.

__init__(stale_attribute_callback=None, _request_iterator=None)

Initialise a new instance.

Parameters:
  • stale_attribute_callback (Optional[Callable]) – an optional callback to call with attributes no longer being updated.

  • _request_iterator (Optional[RequestIterator]) – an optional RequestIterator to supply for testing.

cleanup()

Clean up and notify callbacks.

Return type:

None

desire_configuration_read()

Register a request to read configuration from the TPM.

Return type:

None

desire_connection()

Register a request to connect with the TPM.

Return type:

None

enqueue_lrc(request, priority=999, wipe_time=None)

Register a request to be executed on the Tile.

Parameters:
  • priority (int) – The priority of the request. Lower number means higher priority.

  • request (TileLRCRequest) – The LRC request to execute on a poll.

  • wipe_time (Optional[float]) – the approx time at which to wipe this command.

Return type:

None
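
The "lower number means higher priority" ordering can be illustrated with a small heap-based queue. This is a sketch of the general technique only; the class PriorityRequestQueue is hypothetical, it is not how TileRequestProvider is implemented, and it ignores wipe_time entirely.

```python
import heapq
import itertools
from typing import Any, Optional


class PriorityRequestQueue:
    """Hypothetical queue: lowest priority number first, FIFO among equals."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, Any]] = []
        # Monotonic counter keeps insertion order for equal priorities.
        self._counter = itertools.count()

    def enqueue(self, request: Any, priority: int = 999) -> None:
        """Add a request; a lower priority number is served earlier."""
        heapq.heappush(self._heap, (priority, next(self._counter), request))

    def pop(self) -> Optional[Any]:
        """Return the most urgent request, or None if the queue is empty."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


queue = PriorityRequestQueue()
queue.enqueue("routine request")            # default priority 999
queue.enqueue("urgent request", priority=1)
print(queue.pop())
```

Pairing each entry with a counter avoids comparing the request objects themselves when two priorities are equal.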

get_request(tpm_status)

Get the next request to execute on the Tile.

Parameters:

tpm_status (TpmStatus) – the tpm_status at the time of the request.

Return type:

UnionType[str, TileRequest, None]

Returns:

the next request to execute on the Tile.

inform_configuration_read()

Remove request to read configuration from TPM.

Return type:

None

class TileSimulator(logger)

This attempts to simulate ska_low_sps_tpm_api.Tile.

This is used for testing the tpm_driver. It implements __getitem__ and __setitem__ so that the TileSimulator can interface with the TPMSimulator in the same way that ska_low_sps_tpm_api.Tile interfaces with ska_low_sps_tpm_api.boards.TPM. Instead of writing to a register, we write to a dictionary. It overrides read_address, write_address, read_register and write_register for simplicity.

__init__(logger)

Initialise a new TPM simulator instance.

Parameters:

logger (Logger) – a logger for this simulator to use

beamformer_is_running(mask=None, beam=None, channel_groups=None)

Beamformer is running.

Parameters:
  • mask (Optional[int]) – Bitmask of the channels to be started. Ignored if beam is specified.

  • beam (Optional[int]) – beam number to start. Computes the mask using beam table

  • channel_groups (Optional[list[int]]) – list of channel groups, in range 0:48. group 0 for channels 0-7, to group 47 for channels 380-383.

Return type:

bool

Returns:

whether the beam is running

property broadband_rfi_factor: float

Return broadband RFI factor.

Returns:

broadband RFI factor

check_arp_table(timeout=30.0)

Check arp table.

Parameters:

timeout (float) – Timeout in seconds

Return type:

bool

Returns:

a bool representing if arp table is healthy.

check_communication()

Return status of connection to TPM CPLD and FPGAs.

Example:

>> OK Status:

{‘CPLD’: True, ‘FPGA0’: True, ‘FPGA1’: True}

>> TPM ON, FPGAs not programmed or TPM overtemperature self shutdown:

{‘CPLD’: True, ‘FPGA0’: False, ‘FPGA1’: False}

>> TPM OFF or Network Issue:

{‘CPLD’: False, ‘FPGA0’: False, ‘FPGA1’: False}

Return type:

dict[str, bool]

Returns:

a dictionary with the key communication information.
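
A minimal sketch of how a caller might interpret the returned dictionary, following the three example cases above. The function summarise_communication and its return strings are hypothetical; only the keys CPLD, FPGA0 and FPGA1 come from the example.

```python
def summarise_communication(status: dict[str, bool]) -> str:
    """Map a check_communication-style dictionary to a short description.

    Hypothetical helper; missing keys are treated as False.
    """
    cpld = status.get("CPLD", False)
    fpgas = status.get("FPGA0", False) and status.get("FPGA1", False)
    if cpld and fpgas:
        return "OK"
    if cpld:
        # CPLD reachable but at least one FPGA is not.
        return "TPM on, FPGAs not programmed or overtemperature shutdown"
    return "TPM off or network issue"


print(summarise_communication({"CPLD": True, "FPGA0": False, "FPGA1": False}))
```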

check_global_status_alarms()

Check global status alarms.

Return type:

dict[str, int]

Returns:

a dictionary with the simulated alarm status.

check_pending_data_requests()
Return type:

bool

Returns:

the pending data requests flag.

check_pll_locked()

Check in hardware if PLL is locked.

Return type:

bool

Returns:

True if PLL is locked.

cleanup()

Cleanup logic for gc.

Return type:

None

clear_broadband_rfi()

Clear broadband RFI counters.

Return type:

None

compute_calibration_coefficients()

Compute calibration coefficients.

Return type:

None

configure_40g_core(core_id=0, arp_table_entry=0, src_mac=None, src_ip=None, src_port=None, dst_ip=None, dst_port=None, rx_port_filter=None, netmask=None, gateway_ip=None)

Configure the 40G core.

The dst_mac parameter is ignored in true 40G core (ARP resolution used instead)

Parameters:
  • core_id (int) – id of the core

  • arp_table_entry (int) – ARP table entry to use

  • src_mac (Optional[int]) – MAC address of the source

  • src_ip (Optional[str]) – IP address of the source

  • src_port (Optional[int]) – port of the source

  • dst_ip (Optional[str]) – IP address of the destination

  • dst_port (Optional[int]) – port of the destination

  • rx_port_filter (Optional[int]) – Filter for incoming packets

  • netmask (Optional[str]) – Netmask

  • gateway_ip (Optional[str]) – Gateway IP

Raises:

ValueError – when core_id is not 0 or 1

Return type:

None

configure_integrated_beam_data(integration_time=0.5, first_channel=0, last_channel=191)

Configure and start continuous integrated beam data.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

None

configure_integrated_channel_data(integration_time=0.5, first_channel=0, last_channel=511)

Configure and start continuous integrated channel data.

TODO: Implement generation of integrated packets.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

None

configure_ramp_pattern(stage, polarisation, ramp)

Configure a ramp pattern.

Parameters:
  • stage (str) – one of (“jesd”, “channel”, “beamf”, “all”)

  • polarisation (int) – one of (0, 1, -1). Select -1 for “all” polarisations

  • ramp (str) – (“ramp1”, “ramp2”, “all”)

Return type:

None

connect(initialise=False, load_plugin=True, enable_ada=False, enable_adc=True, dsp_core=True, adc_mono_channel_14_bit=False, adc_mono_channel_sel=0, adc_fullscale_voltage=1.59)

Attempt to form a connection with TPM.

Parameters:
  • initialise (bool) – Initialises the TPM object

  • load_plugin (bool) – loads software plugins

  • enable_ada (bool) – Enable ADC amplifier (usually not present)

  • enable_adc (bool) – Enable ADC

  • dsp_core (bool) – Enable loading of DSP core plugins

  • adc_mono_channel_14_bit (bool) – Enable ADC mono channel 14bit mode

  • adc_mono_channel_sel (int) – Select channel in mono channel mode (0=A, 1=B)

  • adc_fullscale_voltage (float) – Configure ADC full-scale voltage (default 1.59 V)

Return type:

None

current_tile_beamformer_frame()
Return type:

int

Returns:

beamformer frame.

define_channel_table(region_array, fpga_id=None)

Set frequency regions.

Regions are defined in a 2-d array, for a maximum of 16 regions. Each element in the array defines a region, with the form [start_ch, nof_ch, beam_index, <optional> subarray_id, subarray_logical_ch, aperture_id, substation_id]:

  • 0: start_ch: region starting channel (currently must be a multiple of 2, LS bit discarded)

  • 1: nof_ch: size of the region: must be multiple of 8 chans

  • 2: beam_index: subarray beam used for this region, range [0:48)

  • 3: subarray_id: ID of the subarray [1:48]

  • 4: subarray_logical_channel: Logical channel in the subarray; it is the same for all (sub)stations in the subarray. Defaults to station logical channel

  • 5: subarray_beam_id: ID of the subarray beam. Defaults to beam index

  • 6: substation_ID: ID of the substation. Defaults to 0 (no substation)

  • 7: aperture_id: ID of the aperture (station*100+substation?). Defaults to

Total number of channels must be <= 384. The routine computes the arrays beam_index, region_off, region_sel, and the total number of channels nof_chans, and programs it in the hardware. Optional parameters are placeholders for firmware supporting more than 1 subarray. Current firmware supports only one subarray and substation, so corresponding IDs must be the same in each row.

Parameters:
  • fpga_id (Optional[int]) – the id of the fpga we want to define the channel table for. if None both are configured.

  • region_array (list[list[int]]) – bidimensional array, one row for each spectral region, 3 or 8 items long

Return type:

bool

Returns:

True if OK
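
The constraints stated above can be checked before a region array is passed in. The sketch below is a hypothetical pre-flight validator, validate_region_array, written only from the limits given in this description (at most 16 regions, rows of 3 or 8 items, start channel a multiple of 2, region size a multiple of 8, beam index in [0:48), total of at most 384 channels). It is not the check performed by the simulator or the hardware.

```python
def validate_region_array(region_array: list[list[int]]) -> int:
    """Check a region array against the documented limits.

    Hypothetical helper. Returns the total number of channels,
    or raises ValueError on the first violated constraint.
    """
    if len(region_array) > 16:
        raise ValueError("at most 16 regions are allowed")
    total_channels = 0
    for index, row in enumerate(region_array):
        if len(row) not in (3, 8):
            raise ValueError(f"region {index}: expected 3 or 8 items")
        start_ch, nof_ch, beam_index = row[:3]
        if start_ch % 2:
            raise ValueError(f"region {index}: start_ch must be a multiple of 2")
        if nof_ch % 8:
            raise ValueError(f"region {index}: nof_ch must be a multiple of 8")
        if not 0 <= beam_index < 48:
            raise ValueError(f"region {index}: beam_index must be in [0:48)")
        total_channels += nof_ch
    if total_channels > 384:
        raise ValueError("total number of channels must be <= 384")
    return total_channels


# Two regions: 8 channels from channel 64 on beam 0, 16 from channel 128 on beam 1.
print(validate_region_array([[64, 8, 0], [128, 16, 1]]))
```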

define_spead_header(station_id, subarray_id, nof_antennas, ref_epoch=-1, start_time=0, ska_spead_header_format=False)

Define the SPEAD header for the given parameters.

Parameters:
  • station_id (int) – The ID of the station.

  • subarray_id (int) – The ID of the subarray.

  • nof_antennas (int) – Number of antennas in the station

  • ref_epoch (int) – Unix time of epoch. -1 uses value defined in set_epoch

  • start_time (Optional[int]) – start time

  • ska_spead_header_format (bool) – True for new (SKA) CBF SPEAD header format

Return type:

bool

Returns:

a bool representing if command executed without error.

disable_all_adcs()

Disable all simulated ADC channels.

Return type:

None

disable_broadband_rfi_blanking(antennas=range(0, 16))

Disable broadband RFI blanking on specified antennas.

Parameters:

antennas (range | list[int]) – list of antennas on which to disable RFI blanking

Return type:

None

disable_station_beam_flagging(fpga_id=None)

Disable station beam flagging.

Parameters:

fpga_id (Optional[int]) – id of the fpga.

Return type:

None

disconnect()

Disconnect the simulator by executing cleanup logic.

Return type:

None

enable_all_adcs()

Enable all simulated ADC channels.

Return type:

None

enable_broadband_rfi_blanking(antennas=range(0, 16))

Enable broadband RFI blanking on specified antennas.

Parameters:

antennas (range | list[int]) – list of antennas on which to enable RFI blanking

Return type:

None

enable_station_beam_flagging(fpga_id=None)

Enable station beam flagging.

Parameters:

fpga_id (Optional[int]) – id of the fpga.

Return type:

None

erase_fpgas()

Erase the fpga firmware.

Return type:

None

evaluate_mcu_action()

Evaluate temperatures against thresholds.

In the case of overheating, we will mock the action of the MCU (micro controller unit).

Return type:

None

find_register(register_name='', display=False, info=False)

Return register information from a provided search string.

Note: this is a wrapper method of ‘ska_low_sps_tpm_api.boards.tpm.find_register’

Parameters:
  • register_name (str) – Regular expression to search against

  • display (bool) – True to output result to console

  • info (bool) – print a message with additional information if True.

Return type:

list[None | RegisterInfo]

Returns:

List of found registers

get_40g_core_configuration(core_id, arp_table_entry=0)

Return a 40G configuration.

Parameters:
  • core_id (int) – id of the core for which a configuration is to be returned. If -1, all core configurations are returned.

  • arp_table_entry (int) – ARP table entry to use

Return type:

UnionType[dict[str, Any], list[dict], None]

Returns:

core configuration or list of core configurations or none

get_40g_packet_counts()

Get 40G packet counts.

The return value depends on how many 40G cores are active. Typically, only one core is active.

Return type:

dict[str, dict[str, int]]

Returns:

mocked 40G packet counts.

get_adc_rms(sync=False)

Get ADC power, immediate.

Parameters:

sync (Optional[bool]) – Synchronise RMS read

Return type:

list[float]

Returns:

the mock ADC rms values.

get_arp_table()

Get arp table.

Return type:

dict[int, list[int]]

Returns:

the ARP table

get_beamformer_regions()

Get frequency regions.

Read the beamformer regions in the channelizer, with all the parameters for each region. Only FPGA1 tables are read because the same region_array is written to FPGA2. Regions are defined in a 2-d array, for a maximum of 16 (48) regions. Each element in the array defines a region, with the form [start_ch, nof_ch, beam_index]

  • start_ch: region starting channel (currently must be a multiple of 2, LS bit discarded)

  • nof_ch: size of the region: must be multiple of 8 chans

  • beam_index: beam used for this region, range [0:8)

  • subarray_id: ID of the subarray [1:48]

  • subarray_logical_channel: Logical channel in the subarray; it is the same for all (sub)stations in the subarray. Defaults to station logical channel

  • subarray_beam_id: ID of the subarray beam. Defaults to beam index

  • substation_ID: ID of the substation. Defaults to 0 (no substation)

  • aperture_id: ID of the aperture (station*100+substation?). Defaults to antenna ID = 1, substation ID

Return type:

list[list[int]]

Returns:

Bidimensional array of regions

get_beamformer_table(fpga_id=0)

Return the beamformer table.

Returns a table with the following entries for each 8-channel block:

  • 0: start physical channel (64-440)

  • 1: beam_index: subarray beam used for this region, range [0:48)

  • 2: subarray_id: ID of the subarray [1:48]. Here it is the same for all channels

  • 3: subarray_logical_channel: Logical channel in the subarray. Here equal to the station logical channel

  • 4: subarray_beam_id: ID of the subarray beam

  • 5: substation_id: ID of the substation

  • 6: aperture_id: ID of the aperture (station*100+substation?)

Parameters:

fpga_id (int) – A parameter to specify what fpga we want to return the beamformer table for. (Default fpga_id = 0)

Note: this is a wrapper method of ‘ska_low_sps_tpm_api.boards.tpm.station_beamf.get_channel_table’

Return type:

list[list[int]]

Returns:

Nx7 table with one row every 8 channels

get_channeliser_truncation()

Get the channeliser truncation.

Return type:

list[int]

Returns:

the channeliser truncation

get_firmware_list()
Return type:

List[dict[str, Any]]

Returns:

firmware list.

get_fpga_time(device)
Parameters:

device (Device) – device.

Return type:

int

Returns:

the fpga_time.

Raises:

LibraryError – If invalid device specified.

get_fpga_timestamp(device=ska_low_sps_tpm_api.base.definitions.Device.FPGA_1)

Get timestamp from FPGA.

Parameters:

device (Device) – device.

Return type:

int

Returns:

the simulated timestamp.

Raises:

LibraryError – Invalid device specified

get_health_status(**kwargs)

Get the health state of the tile.

Parameters:

kwargs (Any) – Any kwargs to identify health group. see ska_low_sps_tpm_api.Tile

Return type:

dict[str, Any]

Returns:

mocked fetch of health.

get_phase_terminal_count()

Get PPS phase terminal count.

Return type:

int

Returns:

the simulated phase terminal count.

get_pointing_delay(beam_index)

Get pointing delay for a given beam.

Parameters:

beam_index (int) – beam number

Return type:

list[list[list[float]]]

Returns:

pointing delay in seconds

get_pps_delay(enable_correction=True)

Get the pps delay.

Parameters:

enable_correction (bool) – enable correction.

Return type:

int

Returns:

the pps delay.

get_preadu_levels()

Get preADU attenuation levels.

Return type:

list[float]

Returns:

Attenuation levels corresponding to each ADC channel, in dB.

get_station_id()

Get station ID.

Returns:

station ID programmed in HW

Return type:

int

get_temperature()

Get the board temperature.

Return type:

float

Returns:

a float with the board temperature.

Raises:

BoardError – when the CPLD is not connectable.

get_tile_id()
Return type:

int

Returns:

the mocked tile_id.

get_tpm_temperature_thresholds()

Return a dictionary of temperature thresholds.

The returned structure looks like:

    {
        “board_warning_threshold”: max,
        “board_alarm_threshold”: max,
        “fpga1_warning_threshold”: max,
        “fpga1_alarm_threshold”: max,
        “fpga2_warning_threshold”: max,
        “fpga2_alarm_threshold”: max,
    }

Returns:

A dictionary containing the temperature thresholds.

Return type:

dict
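
A sketch of how such a dictionary might be used to classify a temperature reading. The function classify_temperature, the ">=" comparison, the returned labels and the example numbers are all assumptions made for illustration; only the key names follow the structure described above.

```python
def classify_temperature(
    sensor: str, value: float, thresholds: dict[str, float]
) -> str:
    """Classify a reading for "board", "fpga1" or "fpga2".

    Hypothetical helper: a reading at or above a threshold is assumed
    to trigger it.
    """
    if value >= thresholds[f"{sensor}_alarm_threshold"]:
        return "ALARM"
    if value >= thresholds[f"{sensor}_warning_threshold"]:
        return "WARNING"
    return "OK"


# Illustrative numbers only, not real TPM thresholds.
example_thresholds = {
    "board_warning_threshold": 60.0,
    "board_alarm_threshold": 70.0,
    "fpga1_warning_threshold": 80.0,
    "fpga1_alarm_threshold": 90.0,
    "fpga2_warning_threshold": 80.0,
    "fpga2_alarm_threshold": 90.0,
}
print(classify_temperature("board", 65.0, example_thresholds))
```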

initialise(station_id=0, tile_id=0, lmc_use_40g=False, lmc_dst_ip=None, lmc_dst_port=4660, lmc_integrated_use_40g=False, lmc_integrated_dst_ip=None, lmc_integrated_dst_port=4660, src_ip_fpga1=None, src_ip_fpga2=None, dst_ip_fpga1=None, dst_ip_fpga2=None, src_port=4661, dst_port=4660, netmask_40g=None, gateway_ip_40g=None, active_40g_ports_setting='port1-only', enable_adc=True, enable_ada=False, use_internal_pps=False, pps_delay=0, time_delays=0, pps_period=1, is_first_tile=False, is_last_tile=False, qsfp_detection='auto', adc_mono_channel_14_bit=False, adc_mono_channel_sel=0, adc_fullscale_voltage=1.59, global_start_time=None)

Initialise tile.

Parameters:
  • station_id (int) – station ID

  • tile_id (int) – Tile ID in the station

  • lmc_use_40g (bool) – if True use 40G interface to transmit LMC data, otherwise use 1G

  • lmc_dst_ip (Optional[str]) – destination IP address for LMC data packets

  • lmc_dst_port (int) – destination UDP port for LMC data packets

  • lmc_integrated_use_40g (bool) – if True use 40G interface to transmit LMC integrated data, otherwise use 1G

  • lmc_integrated_dst_ip (Optional[str]) – the destination IP for LMC integrated data.

  • lmc_integrated_dst_port (int) – the destination port for LMC integrated data.

  • src_ip_fpga1 (Optional[str]) – source IP address for FPGA1 40G interface

  • src_ip_fpga2 (Optional[str]) – source IP address for FPGA2 40G interface

  • dst_ip_fpga1 (Optional[str]) – destination IP address for beamformed data from FPGA1 40G interface

  • dst_ip_fpga2 (Optional[str]) – destination IP address for beamformed data from FPGA2 40G interface

  • src_port (int) – source UDP port for beamformed data packets

  • dst_port (int) – destination UDP port for beamformed data packets

  • enable_ada (bool) – enable adc amplifier, Not present in most TPM versions

  • enable_adc (bool) – Enable ADC

  • active_40g_ports_setting (str) – placeholder docstring

  • gateway_ip_40g (Optional[str]) – placeholder docstring

  • netmask_40g (Optional[str]) – placeholder docstring

  • use_internal_pps (bool) – use internal PPS generator synchronised across FPGAs

  • pps_delay (int) – PPS delay correction in 625ps units

  • time_delays (float | int | list) – time domain delays for 32 inputs

  • pps_period (int) – PPS period in seconds

  • is_first_tile (bool) – True if this tile is the first tile in the beamformer chain

  • is_last_tile (bool) – True if this tile is the last tile in the beamformer chain

  • qsfp_detection (str) – “auto” detects QSFP cables automatically; “qsfp1” forces QSFP1 cable detected, QSFP2 cable not detected; “qsfp2” forces QSFP1 cable not detected, QSFP2 cable detected; “all” forces QSFP1 and QSFP2 cables detected; “flyover_test” forces QSFP1 and QSFP2 cables detected and adjusts polarity for board-to-board cable; “none” forces no cable detected

  • adc_mono_channel_14_bit (bool) – Enable ADC mono channel 14bit mode

  • adc_mono_channel_sel (int) – Select channel in mono channel mode (0=A, 1=B)

  • adc_fullscale_voltage (float) – Configure ADC full-scale voltage (default 1.59 V)

  • global_start_time (Optional[int]) – TPM will act as if it is started at this time (seconds)

Return type:

None
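
Since pps_delay is expressed in 625 ps units, a delay known in picoseconds has to be converted before being passed in. A minimal sketch, assuming rounding to the nearest unit is acceptable; the helper pps_delay_from_picoseconds is hypothetical.

```python
PPS_DELAY_UNIT_PS = 625  # one pps_delay step, as stated for the pps_delay parameter


def pps_delay_from_picoseconds(delay_ps: int) -> int:
    """Convert a delay in picoseconds to the nearest number of 625 ps units.

    Hypothetical helper for illustration only.
    """
    return round(delay_ps / PPS_DELAY_UNIT_PS)


# A 10 ns correction expressed in pps_delay units.
print(pps_delay_from_picoseconds(10_000))
```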

initialise_beamformer(start_channel, nof_channels, beam_index=0)

Mock set the beamformer parameters.

Parameters:
  • start_channel (int) – start_channel

  • nof_channels (int) – nof_channels

  • beam_index (int) – index of the beam

Raises:

ValueError – For out of range values.

Return type:

None

is_programmed()

Return whether the mock has been programmed.

Return type:

bool

Returns:

the mocked programmed state

is_station_beam_flagging_enabled(fpga_id=None)

Get the station beam flag state.

Parameters:

fpga_id (Optional[int]) – id of the fpga.

Return type:

list

Returns:

whether station beam flagging is enabled

load_calibration_coefficients(antenna, calibration_coefficients)

Load calibration coefficients.

calibration_coefficients is a bi-dimensional complex array of the form calibration_coefficients[channel, polarization], with each element representing a normalized coefficient, with (1.0, 0.0) the normal, expected response for an ideal antenna.

Channel is the index specifying the channels at the beamformer output, i.e. considering only those channels actually processed and beam assignments.

The polarization index ranges from 0 to 3.

  • 0: X polarization direct element

  • 1: X->Y polarization cross element

  • 2: Y->X polarization cross element

  • 3: Y polarization direct element

The calibration coefficients may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:
  • antenna (int) – Antenna number (0-15)

  • calibration_coefficients (list[list[complex]]) – Calibration coefficient array (384x4 complex)

Raises:

ValueError – If inputs do not pass validation.

Return type:

None
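
As an illustration of the expected array shape, the sketch below builds a 384x4 "unity" coefficient array for one antenna: direct elements set to (1.0, 0.0) and cross elements set to zero. The helper identity_calibration is hypothetical, and setting the cross elements to zero is an assumption for this example rather than something stated above.

```python
NOF_BEAMFORMER_CHANNELS = 384  # array size given for calibration_coefficients


def identity_calibration(
    nof_channels: int = NOF_BEAMFORMER_CHANNELS,
) -> list[list[complex]]:
    """Return a [channel][polarization] array with unity direct terms.

    Polarization order: 0 = X direct, 1 = X->Y cross,
    2 = Y->X cross, 3 = Y direct.
    """
    unity = complex(1.0, 0.0)
    zero = complex(0.0, 0.0)
    return [[unity, zero, zero, unity] for _ in range(nof_channels)]


coefficients = identity_calibration()
print(len(coefficients), len(coefficients[0]))
```

An array of this shape is what would be supplied as calibration_coefficients for a single antenna number in the range 0-15.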

load_calibration_coefficients_for_channels(first_channel, calibration_coefficients)

Load calibration coefficients for all antennas and specific channels.

calibration_coefficients is a tri-dimensional complex array of the form calibration_coefficients[channel, antenna, polarization], with each element representing a normalized coefficient, with (1.0, 0.0) the normal, expected response for an ideal antenna.

channel is the index specifying the channels at the beamformer output, i.e. considering only those channels actually processed and beam assignments. First channel is specified in the parameter, the number of channels is determined by the array shape.

antenna is the antenna index, ranging 0 to 15.

The polarization index ranges from 0 to 3.

  • 0: X polarization direct element

  • 1: X->Y polarization cross element

  • 2: Y->X polarization cross element

  • 3: Y polarization direct element

The calibration coefficients may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:
  • first_channel (int) – First channel index at the beamformer output for which calibration coefficients are provided.

  • calibration_coefficients (ndarray) – Calibration coefficient array [channel, antenna, polarization]

Return type:

None

load_pointing_delay(load_time=0, load_delay=64)

Load pointing delay.

Parameters:
  • load_time (int) – load time

  • load_delay (int) – delay (in ADC frames/256) to apply when load_time == 0

Return type:

None

max_broadband_rfi(antennas=range(0, 16))

Return maximum broadband RFI counter value.

Parameters:

antennas (range | list[int]) – list of antenna IDs whose RFI counters to read

Return type:

int

Returns:

maximum broadband RFI counter value

mock_off(lock=False)

Fake a connection by constructing the TPM.

Parameters:

lock (bool) – True if we want to lock this state.

Return type:

None

mock_on(lock=False)

Fake a connection by constructing the TPM.

Parameters:

lock (bool) – True if we want to lock this state.

Return type:

None

program_fpgas(bitfile)

Mock programmed state to True.

Parameters:

bitfile (str) – the name of the bitfile to download

Raises:

LibraryError – if bitfile is None.

Return type:

None

read_all_live_calibration_coefficients()

Read all live calibration coefficients.

Return type:

list[list[list[complex]]]

Returns:

Calibration coefficient array

read_all_staged_calibration_coefficients()

Read all staged calibration coefficients.

Return type:

list[list[list[complex]]]

Returns:

Calibration coefficient array

read_broadband_rfi(antennas=range(0, 16))

Read out the broadband RFI counters.

Parameters:

antennas (range | list[int]) – list of antennas whose RFI counters to read

Return type:

ndarray

Returns:

rfi counters

read_polyfilter_name()

Return the polyfilter name.

Return type:

str

Returns:

string filter name

reset_eth_errors()

Reset Ethernet errors.

Return type:

None

property rfi_blanking_enabled_antennas: list[int]

Return list of antennas with broadband RFI blanking enabled.

Returns:

list of antennas with rfi blanking enabled

send_beam_data(timeout=0, timestamp=None, seconds=0.2)

Send beam data.

Parameters:
  • timeout (int) – timeout

  • timestamp (Optional[int]) – timestamp

  • seconds (float) – When to synchronise

Return type:

None

send_channelised_data(number_of_samples=1024, first_channel=0, last_channel=511, timestamp=None, seconds=0.4)

Send channelised data from the TPM.

Parameters:
  • number_of_samples (int) – Number of spectra to send

  • first_channel (int) – First channel to send

  • last_channel (int) – Last channel to send

  • timestamp (Optional[int]) – When to start transmission

  • seconds (float) – When to synchronise

Return type:

None

send_channelised_data_continuous(channel_id, number_of_samples=128, wait_seconds=0, timestamp=None, seconds=0.2)

Continuously send channelised data from a single channel.

Parameters:
  • channel_id (int) – Channel ID

  • number_of_samples (int) – Number of spectra to send

  • wait_seconds (int) – Wait time before sending data

  • timestamp (Optional[int]) – When to start

  • seconds (float) – When to synchronise

Return type:

None

send_channelised_data_narrowband(frequency, round_bits, number_of_samples=128, wait_seconds=0, timestamp=None, seconds=0.2)

Continuously send channelised data from a single channel.

Parameters:
  • frequency (float) – Sky frequency to transmit

  • round_bits (int) – Specify which bits to round

  • number_of_samples (int) – Number of spectra to send

  • wait_seconds (int) – Wait time before sending data

  • timestamp (Optional[int]) – When to start

  • seconds (float) – When to synchronise

Return type:

None

send_raw_data(sync=False, timestamp=None, seconds=0.2, fpga_id=None)

Send raw data.

Parameters:
Return type:

None

set_beamformer_regions(region_array)

Set beamformer region_array.

Parameters:

region_array (list[list[int]]) – region_array

Return type:

None

set_broadband_rfi_factor(rfi_factor=1.0)

Set broadband RFI factor.

Parameters:

rfi_factor (float) – broadband RFI factor

Return type:

None

set_channeliser_truncation(trunc)

Set the channeliser coefficients to modify the bandpass.

Parameters:

trunc (list[int]) – list with M values, one for each of the frequency channels. Same truncation is applied to the corresponding frequency channels in all inputs.

Return type:

None

set_csp_download(src_port=None, dst_ip_1=None, dst_ip_2=None, dst_port=None, is_last=False, netmask=None, gateway=None)

Set CSP Destination.

Determines where station beams are sent on the Science Data Network.

Parameters:
Return type:

None

set_csp_rounding(rounding)

Set the final rounding in the CSP samples, one value per beamformer channel.

Parameters:

rounding (list[int]) – Number of bits rounded in final 8 bit requantization to CSP

Return type:

bool

Returns:

True if the write is a success.

set_first_last_tile(is_first, is_last)

Set whether this tile is the first and/or last tile in the chain.

Parameters:
  • is_first (bool) – true if first

  • is_last (bool) – true if last

Return type:

bool

Returns:

a bool representing if command executed without error.

set_lmc_download(mode, data_type=None, payload_length=None, dst_ip='10.0.10.1', src_port=61648, dst_port=4660, netmask_40g=None, gateway_ip_40g=None)

Specify where the control data will be transmitted.

With the simulator no traffic will leave the cluster. To transmit data from the pod hosting the simulator to the DAQ (data acquisition) receiver, a Kubernetes service is required. Therefore dst_ip is the name of the service to use rather than the IP.

Parameters:
  • mode (str) – “1G” or “10G”

  • data_type (Optional[str]) – Specify which data type to configure, or None for all.

  • payload_length (Optional[int]) – SPEAD payload length for integrated channel data, defaults to 1024

  • dst_ip (str) – destination service.

  • src_port (Optional[int]) – source port, defaults to 0xF0D0

  • dst_port (Optional[int]) – destination port, defaults to 4660

  • netmask_40g (Optional[str]) – the mask to apply

  • gateway_ip_40g (Optional[str]) – the gateway ip.

Return type:

None

set_lmc_integrated_download(mode, data_type=None, channel_payload_length=None, beam_payload_length=None, dst_ip='10.0.10.1', src_port=61648, dst_port=4660, netmask_40g=None, gateway_ip_40g=None)

Configure link and size of control data for integrated LMC packets.

Parameters:
  • mode (str) – ‘1G’ or ‘10G’

  • data_type (Optional[str]) – Specify which data type to configure, or None for all.

  • channel_payload_length (Optional[int]) – SPEAD payload length for integrated channel data

  • beam_payload_length (Optional[int]) – SPEAD payload length for integrated beam data

  • dst_ip (str) – Destination IP

  • src_port (int) – Source port for integrated data streams

  • dst_port (int) – Destination port for integrated data streams

  • netmask_40g (Optional[str]) – the mask to apply to the 40g.

  • gateway_ip_40g (Optional[str]) – the gateway ip for the 40g.

Return type:

None

set_pattern(stage, pattern, adders, start=False, shift=0, zero=0)

Configure the TPM pattern generator.

Parameters:
  • stage (str) – The stage in the signal chain where the pattern is injected. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), or ‘beamf’ (output of tile beamformer) or ‘all’ for all stages.

  • pattern (list[int]) – The data pattern in time order. This must be a list of integers with a length between 1 and 1024. The pattern represents values in time order (not antennas or polarizations).

  • adders (list[int]) – A list of 32 integers that expands the pattern to cover 16 antennas and 2 polarizations in hardware. This list maps the pattern to the corresponding signals for the antennas and polarizations.

  • start (bool) – Boolean flag indicating whether to start the pattern immediately. If False, the pattern will need to be started manually later.

  • shift (int) – Optional bit shift (divides the pattern by 2^shift). This must not be used in the ‘beamf’ stage, where it is always overridden to 4. The default value is 0.

  • zero (int) – An integer (0-65535) used as a mask to disable the pattern on specific antennas and polarizations. The same mask is applied to both FPGAs, supporting up to 8 antennas and 2 polarizations. The default value is 0.

Return type:

None
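
The argument limits listed above can be checked before calling set_pattern. The following is a sketch only: check_pattern_args is a hypothetical helper, and the real method may validate more, or differently. It returns the shift that would take effect, assuming the documented override to 4 applies only when stage is ‘beamf’:

```python
def check_pattern_args(stage, pattern, adders, shift=0, zero=0):
    """Validate arguments against the limits documented for set_pattern.

    Hypothetical helper, not part of the tile package.
    Returns the shift value that would take effect.
    """
    if stage not in ("jesd", "channel", "beamf", "all"):
        raise ValueError(f"unknown stage: {stage!r}")
    if not 1 <= len(pattern) <= 1024:
        raise ValueError("pattern must contain between 1 and 1024 values")
    if len(adders) != 32:
        raise ValueError("adders must contain exactly 32 values")
    if not 0 <= zero <= 0xFFFF:
        raise ValueError("zero must be in the range 0-65535")
    # The documentation states that shift is always overridden to 4 for 'beamf'.
    return 4 if stage == "beamf" else shift


# A 16-sample ramp with no per-signal offset, injected at the ADC output.
effective_shift = check_pattern_args("jesd", list(range(16)), [0] * 32, shift=2)
print(effective_shift)
```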

set_phase_terminal_count(value)

Set PPS phase terminal count.

Parameters:

value (int) – phase terminal count to apply.

Return type:

None

set_pointing_delay(delay_array, beam_index)

Set pointing delay.

Parameters:
  • delay_array (list[list[float]]) – delay array

  • beam_index (int) – beam index

Return type:

None

set_preadu_levels(levels)

Set preADU attenuation levels.

Parameters:

levels (ndarray) – Desired attenuation levels for each ADC channel, in dB.

Return type:

None

set_spead_format(ska_spead_header_format)

Set CSP SPEAD format.

Parameters:

ska_spead_header_format (bool) – True for new (SKA) format, False for old (AAVS)

Return type:

None

set_station_id(station_id, tile_id)

Set mock registers to some value.

Parameters:
  • station_id (int) – station_id

  • tile_id (int) – tile_id

Return type:

None

set_test_generator_pulse(freq_code, amplitude=0.0)

Set test generator pulse.

Parameters:
  • freq_code (int) – Code for pulse frequency. Range 0 to 7: 16,12,8,6,4,3,2 times frame frequency

  • amplitude (float) – Tone peak amplitude, normalized to 127.5 ADC units, resolution 0.5 ADU

Return type:

None

set_time_delays(delays)

Set coarse zenith delay for input ADC streams.

Parameters:

delays (int | float | list[float]) – the delay in input streams, specified in nanoseconds. A positive delay adds delay to the signal stream.

Return type:

bool

Returns:

True if command executed to completion.

set_tpm_temperature_thresholds(max_board_alarm_threshold=None, max_fpga1_alarm_threshold=None, max_fpga2_alarm_threshold=None)

Set the temperature thresholds.

WARNING: this method can configure the shutdown temperature of components and must be used with care. Thresholds are capped at a maximum of 50 degrees Celsius, and the method is ONLY supported on tpm1_6.

Parameters:
  • max_board_alarm_threshold (Optional[float]) – The maximum alarm threshold for the board (unit: degrees Celsius)

  • max_fpga1_alarm_threshold (Optional[float]) – The maximum alarm threshold for FPGA 1 (unit: degrees Celsius)

  • max_fpga2_alarm_threshold (Optional[float]) – The maximum alarm threshold for FPGA 2 (unit: degrees Celsius)

Raises:

ValueError – if a value is not within the permitted range.

Return type:

None
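
The cap and the ValueError described above could be enforced along the following lines. This is an illustrative sketch, not the package's implementation: check_thresholds is a hypothetical helper, only the upper cap of 50 is taken from the note above, and treating None as "leave unchanged" is an assumption:

```python
MAX_THRESHOLD_C = 50.0  # cap quoted in the note above


def check_thresholds(board=None, fpga1=None, fpga2=None):
    """Return the thresholds that were supplied, rejecting any above the cap."""
    supplied = {"board": board, "fpga1": fpga1, "fpga2": fpga2}
    checked = {}
    for name, value in supplied.items():
        if value is None:
            continue  # assumed meaning: leave this threshold unchanged
        if value > MAX_THRESHOLD_C:
            raise ValueError(
                f"{name} threshold {value} exceeds the cap of {MAX_THRESHOLD_C}"
            )
        checked[name] = float(value)
    return checked


print(check_thresholds(board=45, fpga2=48.5))
```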

simulate_health_value(path, value)

Simulate a value in the health structure.

Parameters:
  • path (list[str]) – The dictionary path you want to inject a value at

  • value (Any) – The value to inject.

Return type:

None
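
A "dictionary path" here is a list of keys walked from the top of a nested dictionary. The sketch below shows one way such an injection can work; inject_at_path and the keys in the example dictionary are made up for illustration and do not reflect the real health structure:

```python
from typing import Any


def inject_at_path(health: dict, path: list[str], value: Any) -> None:
    """Set health[path[0]][path[1]]...[path[-1]] = value in place."""
    if not path:
        raise ValueError("path must contain at least one key")
    node = health
    for key in path[:-1]:
        node = node[key]  # raises KeyError if an intermediate key is missing
    node[path[-1]] = value


# Hypothetical health structure, for illustration only.
health = {"temperatures": {"board": 40.0, "FPGA0": 45.0}}
inject_at_path(health, ["temperatures", "board"], 70.0)
print(health["temperatures"]["board"])
```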

property ska_spead_header: bool

Return the format of the CSP SPEAD header.

Returns:

True for new (SKA) format, False for old (AAVS)

property spead_ska_format_supported: bool

Check if new (SKA) format for CSP SPEAD header is supported.

Returns:

True if new (SKA) format for CSP SPEAD header is supported

start_acquisition(start_time=None, delay=2, global_start_time=None)

Start data acquisition.

Parameters:
  • start_time (Optional[int]) – Time for starting (frames)

  • delay (int) – delay after start_time (frames)

  • global_start_time (Optional[int]) – TPM will act as if it is started at this time (seconds)

Return type:

None

start_beamformer(start_time=0, duration=-1, scan_id=0, mask=None, beam=None, channel_groups=None)

Start beamformer.

Parameters:
  • start_time (int) – start time UTC

  • duration (int) – duration

  • scan_id (int) – ID of the scan, to be specified in the CSP SPEAD header

  • mask (Optional[int]) – Bitmask of the channels to be started. Ignored if beam is specified.

  • beam (Optional[int]) – beam number to start. Computes the mask using beam table

  • channel_groups (Optional[list[int]]) – list of channel groups, in range 0:48. Each group covers 8 channels, from group 0 for channels 0-7 to group 47 for channels 376-383.

Return type:

bool

Returns:

True if the beamformer was started successfully.
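
The mapping from channel group to channels can be sketched as follows, assuming each group spans 8 consecutive channels (as the "group 0 for channels 0-7" example suggests) and that there are 48 groups. Under that assumption the last group, 47, ends at channel 383. The helper channels_in_group is hypothetical:

```python
CHANNELS_PER_GROUP = 8  # inferred from "group 0 for channels 0-7"
N_GROUPS = 48           # groups are numbered 0 to 47


def channels_in_group(group: int) -> range:
    """Return the channel numbers covered by one channel group."""
    if not 0 <= group < N_GROUPS:
        raise ValueError(f"group must be in 0..{N_GROUPS - 1}, got {group}")
    first = group * CHANNELS_PER_GROUP
    return range(first, first + CHANNELS_PER_GROUP)


for group in (0, 1, 47):
    channels = channels_in_group(group)
    print(group, channels[0], channels[-1])
```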

start_pattern(stage)

Start the pattern generator at the specified stage.

Parameters:

stage (str) – The stage in the signal chain where the pattern should be started. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), or ‘beamf’ (output of tile beamformer), or ‘all’ for all stages.

Return type:

None

stop_beamformer(mask=None, beam=None, channel_groups=None)

Stop beamformer.

Parameters:
  • mask (Optional[bool]) – Bitmask of the channels to be stopped. Ignored if beam is specified.

  • beam (Optional[int]) – beam number to stop. Computes the mask using beam table

  • channel_groups (Optional[list[int]]) – list of channel groups, in range 0:48. Each group covers 8 channels, from group 0 for channels 0-7 to group 47 for channels 376-383.

Return type:

None

stop_data_transmission()

Stop data transmission.

Return type:

None

stop_integrated_data()

Stop integrated data.

Return type:

None

stop_pattern(stage)

Stop the pattern generator at the specified stage.

Parameters:

stage (str) – The stage in the signal chain where the pattern should be stopped. Options are: ‘jesd’ (output of ADCs), ‘channel’ (output of channelizer), or ‘beamf’ (output of tile beamformer), or ‘all’ for all stages.

Return type:

None

switch_calibration_bank(switch_time=0)

Switch calibration bank.

Parameters:

switch_time (int) – switch time

Return type:

None

test_generator_input_select(inputs)

Test generator input select.

Parameters:

inputs (int) – inputs

Return type:

None

test_generator_set_delay(delays)

Set delay values for test generator channels.

Parameters:

delays (list[float]) – one delay value per ADC channel (32 entries).

Raises:

ValueError – if delays does not contain exactly 32 entries.

Return type:

None
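
The length check described above amounts to the following. This is a sketch under the stated 32-channel requirement; check_delays is a hypothetical helper, not the method's actual code:

```python
N_ADC_CHANNELS = 32  # one delay per ADC channel, as documented above


def check_delays(delays):
    """Return delays as a list of floats, or raise ValueError on a bad length."""
    delays = [float(d) for d in delays]
    if len(delays) != N_ADC_CHANNELS:
        raise ValueError(f"expected {N_ADC_CHANNELS} delays, got {len(delays)}")
    return delays


print(len(check_delays([0] * 32)))
```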

test_generator_set_noise(amplitude=0.0, load_time=0)

Set test generator noise.

Parameters:
  • amplitude (float) – amplitude of noise

  • load_time (int) – load time

Return type:

None

test_generator_set_tone(generator, frequency=100000000.0, amplitude=0.0, phase=0.0, load_time=0)

Test generator tone setting.

Parameters:
  • generator (int) – generator select. 0 or 1

  • frequency (float) – Tone frequency in Hz

  • amplitude (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • phase (float) – Initial tone phase, in turns

  • load_time (int) – Time to start the tone.

Return type:

None
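
One reading of the amplitude and phase units above: an amplitude of 1.0 corresponds to 31.875 ADU, values are rounded to steps of 0.125 ADU, and a phase in turns is a fraction of a full cycle. The sketch below illustrates that reading only; the [0, 1] amplitude range and both helper names are assumptions, not documented behaviour:

```python
FULL_SCALE_ADU = 31.875  # peak amplitude for a normalised value of 1.0
RESOLUTION_ADU = 0.125   # smallest amplitude step


def quantise_amplitude(amplitude: float) -> float:
    """Return the peak amplitude in ADU after rounding to the stated resolution."""
    if not 0.0 <= amplitude <= 1.0:
        raise ValueError("amplitude is assumed to lie in [0, 1]")
    steps = round(amplitude * FULL_SCALE_ADU / RESOLUTION_ADU)
    return steps * RESOLUTION_ADU


def degrees_to_turns(degrees: float) -> float:
    """Convert a phase in degrees to turns (1 turn = 360 degrees)."""
    return (degrees % 360.0) / 360.0


print(quantise_amplitude(1.0), degrees_to_turns(90.0))
```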

property tile_info: str

Report tile firmware information.

Returns:

A string of tile information.

class TileTime(reference_time=0)

Library to convert from RFC 3339 strings to internal Tile time and back.

Frame time is the time expressed as an offset in frames from a timestamp. Unix time is used for the timestamp; it is expressed in integer seconds from 1970-01-01T00:00:00.000000Z. Time is otherwise expressed as an ISO-8601 (RFC3339) string, e.g. 2021-03-02T12:34:56.789000Z.

__init__(reference_time=0)

Set the internal reference time. To be used each time it is reset.

Parameters:

reference_time (int) – Unix timestamp of the (integer) reference time

format_time_from_frame(frame_count)

Format a time expressed as frame count into ISO-8601 (RFC3339) string.

e.g. 2021-03-02T12:34:56.789000Z. Returns the Unix zero time if the object is not yet initialised.

Parameters:

frame_count (int) – Frame number expressed as a multiple of 256 channelised samples

Return type:

str

Returns:

ISO-8601 formatted time
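
The conversion from a frame count to a formatted time can be sketched as a reference timestamp plus a whole number of frame periods. The frame period used here (256 samples of 1.08 microseconds each) is an assumption not stated in this reference, and the function below is a standalone illustration rather than the TileTime implementation:

```python
from datetime import datetime, timedelta, timezone

# Assumed frame period: 256 channelised samples of 1.08 microseconds each.
FRAME_PERIOD_S = 256 * 1.08e-6


def format_time_from_frame(reference_time: int, frame_count: int) -> str:
    """Format reference_time plus frame_count frames as a UTC time string."""
    start = datetime.fromtimestamp(reference_time, tz=timezone.utc)
    moment = start + timedelta(seconds=frame_count * FRAME_PERIOD_S)
    return moment.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


print(format_time_from_frame(0, 0))
print(format_time_from_frame(0, 15625))
```

With a reference time of 0, a frame count of 0 yields the Unix zero time, consistent with the behaviour described for an uninitialised object.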

format_time_from_timestamp(timestamp)

Format a time expressed as a Unix timestamp into ISO-8601.

The result is a properly formatted ISO-8601 (RFC3339) string, e.g. 2021-03-02T12:34:56.789000Z.

Parameters:

timestamp (int) – Unix timestamp of the (integer) reference time

Return type:

str

Returns:

ISO-8601 formatted time

frame_from_utc_time(utc_time)

Return first frame after specified time.

Parameters:

utc_time (str) – UTC time in standard RFC 3339 format

Return type:

int

Returns:

Frame number at or after the specified time; -1 on error.

set_reference_time(reference_time)

Set the internal reference time. To be used each time it is reset.

Parameters:

reference_time (int) – Unix timestamp of the (integer) reference time

Return type:

None

timestamp_from_utc_time(utc_time)

Return the first Unix timestamp, in whole seconds, at or after the specified time.

Does not account for leap seconds; utc_time must avoid them.

Parameters:

utc_time (str) – UTC time in standard RFC 3339 format

Return type:

int

Returns:

Unix timestamp at or after the specified time; -1 on error.
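
The rounding behaviour described above (first whole second at or after the given time, -1 on error) can be illustrated with the standard library. This standalone sketch accepts only the fractional-seconds "Z" form used in the examples in this reference; the real method may accept other RFC 3339 variants:

```python
from datetime import datetime, timezone


def timestamp_from_utc_time(utc_time: str) -> int:
    """Return the first whole Unix second at or after utc_time, or -1 on error."""
    try:
        parsed = datetime.strptime(utc_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        return -1
    # Work in whole seconds, then round up if there is any fractional part.
    whole = parsed.replace(microsecond=0, tzinfo=timezone.utc)
    seconds = int(whole.timestamp())
    return seconds + (1 if parsed.microsecond else 0)


print(timestamp_from_utc_time("1970-01-01T00:00:00.500000Z"))
print(timestamp_from_utc_time("not a time"))
```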

class TpmStatus(value)

Enumerated type for tile status.

Used in initialisation to know what long-running commands have been issued.

INITIALISED = 5

Initialise command has been issued.

OFF = 1

The TPM is not powered.

PROGRAMMED = 4

The TPM is powered on and FPGAs are programmed.

SYNCHRONISED = 6

Time has been synchronised with UTC, timestamp is valid.

UNCONNECTED = 2

The TPM is not connected.

UNKNOWN = 0

The status is not known.

UNPROGRAMMED = 3

The TPM is powered on but FPGAs are not programmed.

pretty_name()

Return string representation.

Return type:

str

Returns:

String representation in camelcase
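
Listed in numeric order, the status values form a progression from UNKNOWN (0) to SYNCHRONISED (6). The sketch below mirrors those values in a stand-in enum; the class name TpmStatusSketch, the use of IntEnum, and the pretty_name behaviour (first letter capitalised only) are all assumptions made for illustration:

```python
import enum


class TpmStatusSketch(enum.IntEnum):
    """Stand-in for TpmStatus, using the values listed above."""

    UNKNOWN = 0
    OFF = 1
    UNCONNECTED = 2
    UNPROGRAMMED = 3
    PROGRAMMED = 4
    INITIALISED = 5
    SYNCHRONISED = 6

    def pretty_name(self) -> str:
        # Assumed behaviour: capitalise the first letter only, e.g. "Off".
        return self.name.capitalize()


status = TpmStatusSketch(4)
print(status.pretty_name())
# With IntEnum, ordering comparisons follow the numeric values.
print(status >= TpmStatusSketch.PROGRAMMED)
```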