Tile subpackage

This subpackage implements tile functionality for the MCCS.

It includes an operational Tango device, a demonstrator Tango device, modules for driving and simulating TPM hardware, and a CLI.

class DemoTile(*args, **kwargs)

Version of the MccsTile tango device with extra methods for testing/demos.

init_device()

Tango hook for initialisation code.

Overridden here to log the fact that this is a demo tile.

Return type:

None

class DynamicTileSimulator(logger)

A simulator for a TPM, with dynamic value updates to certain attributes.

__init__(logger)

Initialise a new Dynamic Tile simulator instance.

Parameters:

logger (Logger) – a logger for this simulator to use

property board_temperature: float | None

Return the temperature of the TPM.

Returns:

the temperature of the TPM

property current: float

Return the current of the TPM.

Returns:

the current of the TPM

property fpga1_temperature: float

Return the temperature of FPGA 1.

Returns:

the temperature of FPGA 1

property fpga2_temperature: float

Return the temperature of FPGA 2.

Returns:

the temperature of FPGA 2

get_board_temperature()
Return type:

Optional[float]

Returns:

the mocked board temperature.

get_current()
Return type:

Optional[float]

Returns:

the mocked current.

get_fpga0_temperature()
Return type:

Optional[float]

Returns:

the mocked fpga0 temperature.

get_fpga1_temperature()
Return type:

Optional[float]

Returns:

the mocked fpga1 temperature.

get_voltage()
Return type:

Optional[float]

Returns:

the mocked voltage.

property voltage: float

Return the voltage of the TPM.

Returns:

the voltage of the TPM

class DynamicValuesGenerator(soft_min, soft_max, window_size=20, in_range_rate=0.95)

A generator of dynamic values with the following properties.

  • We want the values to gradually walk around their range rather than randomly jumping around. i.e. we want values to be temporally correlated. We achieve this by calculating values as a sliding window sum of a sequence of independent (uncorrelated) random values.

  • The provided range is a “soft” range – we allow values to walk outside this range occasionally. The proportion of time that the values should stay within the required range is exposed as an argument. This is useful for testing the alarm conditions of TANGO attributes: we set the soft range of this generator to the attribute’s alarm range, and we specify how often the attribute should exceed that range and thus start alarming.

__init__(soft_min, soft_max, window_size=20, in_range_rate=0.95)

Create a new instance.

Parameters:
  • soft_min (float) – a “soft” minimum value. For TANGO device attributes, this should be the alarm minimum.

  • soft_max (float) – a “soft” maximum value. For TANGO device attributes, this should be the alarm maximum.

  • window_size (int) – the size of the sliding window to sum over. A value of 1 will give uncorrelated values. Increasing the value increases correlation – a graph of how the value changes over time will be smoother. The default is 20.

  • in_range_rate (float) – the proportion of time during which the value should remain within the [soft_min, soft_max] range. The default is 0.95. Don’t change this to 1.0 unless you want the variance to collapse: you’ll get the mean of the range every time.
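
The sliding-window approach described above can be sketched as follows. This is only an illustration and not the MCCS implementation: the class name SlidingWindowGenerator, the seed argument, the use of uniform random draws and the normal-approximation scaling are all assumptions made for this example.

```python
import random
from collections import deque
from statistics import NormalDist


class SlidingWindowGenerator:
    """Hypothetical stand-in for DynamicValuesGenerator (illustration only)."""

    def __init__(self, soft_min, soft_max, window_size=20,
                 in_range_rate=0.95, seed=None):
        # Assumes 0 < in_range_rate <= 1.
        self._rng = random.Random(seed)
        self._mean = (soft_min + soft_max) / 2.0
        half_range = (soft_max - soft_min) / 2.0
        # Standard deviation of a sum of `window_size` uniform(-0.5, 0.5) draws.
        sum_std = (window_size / 12.0) ** 0.5
        if in_range_rate >= 1.0:
            # No excursions allowed: the spread collapses to zero,
            # so every value is the mean of the range.
            self._scale = 0.0
        else:
            # Treat the window sum as roughly normal and choose the scale so
            # that a fraction `in_range_rate` of values lands in the soft range.
            z = NormalDist().inv_cdf((1.0 + in_range_rate) / 2.0)
            self._scale = half_range / (z * sum_std)
        self._window = deque(
            (self._draw() for _ in range(window_size)), maxlen=window_size
        )

    def _draw(self):
        return self._rng.uniform(-0.5, 0.5)

    def __iter__(self):
        return self

    def __next__(self):
        # Replacing only the oldest draw keeps successive values correlated.
        self._window.append(self._draw())
        return self._mean + self._scale * sum(self._window)


generator = SlidingWindowGenerator(0.0, 10.0, seed=1)
samples = [next(generator) for _ in range(5)]
```

Because each step swaps a single draw in the window, consecutive values in this sketch can differ by at most the scale factor, which is what produces the gradual walk.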

class DynamicValuesUpdater(update_rate=1.0)

A dynamic updater of values, for use in a dynamic simulator.

__init__(update_rate=1.0)

Create a new instance.

Parameters:

update_rate (float) – how often, in seconds, the target values should be updated. Defaults to 1 second.

add_target(generator, callback)

Add a new target to be updated.

Parameters:
  • generator (Any) – the generator of values to be used as updates

  • callback (Callable) – the callback to be called with updates

Return type:

None

start()

Start the updater thread.

Return type:

None

stop()

Stop the updater thread.

Return type:

None
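
The add_target / start / stop interface above could be realised with a background thread along the following lines. This is a sketch under assumptions, not the MCCS code: the class name PeriodicUpdater and the use of threading.Event as an interruptible sleep are choices made for this example.

```python
import itertools
import threading


class PeriodicUpdater:
    """Hypothetical stand-in for DynamicValuesUpdater (illustration only)."""

    def __init__(self, update_rate=1.0):
        self._update_rate = update_rate
        self._targets = []
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def add_target(self, generator, callback):
        # Each target pairs a source of values with a consumer of them.
        self._targets.append((generator, callback))

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        self._thread.join()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() has set
        # the event, so it acts as a sleep that can be interrupted.
        while not self._stop_event.wait(self._update_rate):
            for generator, callback in self._targets:
                callback(next(generator))


received = []
updater = PeriodicUpdater(update_rate=0.01)
updater.add_target(itertools.count(), received.append)
```

In this sketch targets should be added before start() is called, and an instance cannot be restarted after stop(), because a Python thread object can only be started once.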

class MccsTile(*args, **kwargs)

An implementation of a Tile Tango device for MCCS.

ApplyCalibration(argin)

Load the calibration coefficients at the specified time delay.

Parameters:

argin (str) – switch time, in ISO formatted time

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("ApplyCalibration", "")
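
Since argin is described as an ISO formatted switch time, a non-empty argument could be built as sketched below. The helper name iso_time_from_now is hypothetical, and the exact ISO 8601 variant the device accepts (for example a trailing "Z" rather than "+00:00") is not stated here, so treat the output format as an assumption.

```python
from datetime import datetime, timedelta, timezone


def iso_time_from_now(delay_seconds):
    """Return a UTC timestamp `delay_seconds` in the future, in ISO 8601 form."""
    when = datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
    return when.isoformat()


switch_time = iso_time_from_now(2.0)

# With a reachable device (not executed here):
# dp = tango.DeviceProxy("mccs/tile/01")
# dp.command_inout("ApplyCalibration", switch_time)
```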
class ApplyCalibrationCommand(component_manager, logger=None)

Class for handling the ApplyCalibration(argin) command.

__init__(component_manager, logger=None)

Initialise a new ApplyCalibrationCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ApplyCalibration() command functionality.

Parameters:
  • args (Any) – switch time

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

ApplyPointingDelays(argin)

Apply the pointing delays at the specified time delay.

Parameters:

argin (str) – time delay (default = 0)

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("ApplyPointingDelays", "")
class ApplyPointingDelaysCommand(component_manager, logger=None)

Class for handling the ApplyPointingDelays(argin) command.

__init__(component_manager, logger=None)

Initialise a new ApplyPointingDelaysCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ApplyPointingDelays() command functionality.

Parameters:
  • args (Any) – load time

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Configure(argin)

Configure the tile device attributes.

Parameters:

argin (str) – the configuration for the device in stringified json format

Return type:

None

Configure40GCore(argin)

Configure 40g core_id with specified parameters.

Parameters:

argin (str) –

json dictionary with only optional keywords:

  • core_id - (int) core id

  • arp_table_entry - (int) ARP table entry ID

  • source_mac - (int) mac address

  • source_ip - (string) IP dot notation.

  • source_port - (int) source port

  • destination_ip - (string) IP dot notation

  • destination_port - (int) destination port

  • netmask - (int) 40g (science data) subnet mask

  • gateway_ip - (int) IP address of 40g (science) subnet gateway

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> config = {"core_id":2, "arp_table_entry":0, "source_mac":0x62000a0a01c9,
...           "source_ip":"10.0.99.3", "source_port":4000,
...           "destination_ip":"10.0.99.3", "destination_port":5000}
>>> jstr = json.dumps(config)
>>> dp.command_inout("Configure40GCore", jstr)
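
The source_mac, netmask and gateway_ip keywords are integers, whereas addresses are usually written as strings. A minimal sketch of the conversion, using only the standard library, is shown here. The helper names ipv4_to_int and mac_to_int are hypothetical, and the netmask and gateway addresses are made-up values for illustration.

```python
import ipaddress
import json


def ipv4_to_int(dotted):
    """Convert dotted-quad notation, e.g. "255.255.255.0", to an integer."""
    return int(ipaddress.IPv4Address(dotted))


def mac_to_int(mac):
    """Convert a colon-separated MAC address to an integer."""
    return int(mac.replace(":", ""), 16)


config = {
    "core_id": 2,
    "arp_table_entry": 0,
    "source_mac": mac_to_int("62:00:0a:0a:01:c9"),
    "source_ip": "10.0.99.3",
    "source_port": 4000,
    "destination_ip": "10.0.99.3",
    "destination_port": 5000,
    "netmask": ipv4_to_int("255.255.255.0"),   # illustrative value
    "gateway_ip": ipv4_to_int("10.0.99.1"),    # illustrative value
}
jstr = json.dumps(config)
```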
class Configure40GCoreCommand(component_manager, logger=None)

Class for handling the Configure40GCore() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_Configure40gCore.json",
    "title": "MccsTile Configure40gCore schema",
    "description": "Schema for MccsTile's Configure40gCore command",
    "type": "object",
    "properties": {
        "core_id": {
            "description": "Core ID",
            "type": "integer"
        },
        "arp_table_entry": {
            "description": "ARP table entry",
            "type": "integer"
        },
        "source_mac": {
            "description": "Source MAC address",
            "type": "integer"
        },
        "source_ip": {
            "description": "Source IP address",
            "type": "string",
            "format": "ipv4"
        },
        "source_port": {
            "description": "Source port",
            "type": "integer",
            "minimum": 1
        },
        "destination_ip": {
            "description": "Destination IP address",
            "type": "string",
            "format": "ipv4"
        },
        "destination_port": {
            "description": "Destination port",
            "type": "integer",
            "minimum": 1
        },
        "rx_port_filter": {
            "description": "Receive ports filter",
            "type": "integer",
            "minimum": 1
        },
        "netmask": {
            "description": "Integer netmask for the 40g (science data) subnet",
            "type": "integer",
            "minimum": 0,
            "maximum": 4294967296
        },
        "gateway_ip": {
            "description": "Integer IP address of the 40g (science data) subnet gateway",
            "minimum": 0,
            "maximum": 4294967296
        }
    },
    "additionalProperties": false
}
__init__(component_manager, logger=None)

Initialise a new Configure40GCoreCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.Configure40GCore() command functionality.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

ConfigureIntegratedBeamData(argin)

Configure the transmission of integrated beam data.

The transmission uses the provided integration time, first channel and last channel. The data are sent continuously until the StopIntegratedData command is run.

Parameters:

argin (str) – json dictionary with optional keywords:

  • integration_time - (float) in seconds (default = 0.5)

  • first_channel - (int) default 0

  • last_channel - (int) default 191

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> config = {"integration_time": 0.2, "first_channel":0, "last_channel": 191}
>>> jstr = json.dumps(config)
>>> dp.command_inout("ConfigureIntegratedBeamData", jstr)
class ConfigureIntegratedBeamDataCommand(component_manager, logger=None)

Class for handling the ConfigureIntegratedBeamData() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_ConfigureIntegratedBeamData.json",
    "title": "MccsTile ConfigureIntegratedBeamData schema",
    "description": "Schema for MccsTile's ConfigureIntegratedBeamData command",
    "type": "object",
    "properties": {
        "integration_time": {
            "description": "Integration time in seconds",
            "type": "number",
            "minimum": 0.0
        },
        "first_channel": {
            "description": "First channel",
            "type": "integer",
            "minimum": 0,
            "maximum": 511
        },
        "last_channel": {
            "description": "Last channel",
            "type": "integer",
            "minimum": 0,
            "maximum": 511
        }
    }
}
__init__(component_manager, logger=None)

Initialise a new ConfigureIntegratedBeamDataCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ConfigureIntegratedBeamData() commands.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

ConfigureIntegratedChannelData(argin)

Configure and start the transmission of integrated channel data.

The transmission uses the provided integration time, first channel and last channel. Data are sent continuously until the StopIntegratedData command is run.

Parameters:

argin (str) – json dictionary with optional keywords:

  • integration_time - (float) in seconds (default = 0.5)

  • first_channel - (int) default 0

  • last_channel - (int) default 511

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> config = {"integration_time": 0.2, "first_channel":0, "last_channel": 191}
>>> jstr = json.dumps(config)
>>> dp.command_inout("ConfigureIntegratedChannelData", jstr)
class ConfigureIntegratedChannelDataCommand(component_manager, logger=None)

Class for handling the ConfigureIntegratedChannelData(argin) command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_ConfigureIntegratedChannelData.json",
    "title": "MccsTile ConfigureIntegratedChannelData schema",
    "description": "Schema for MccsTile's ConfigureIntegratedChannelData command",
    "type": "object",
    "properties": {
        "integration_time": {
            "description": "Integration time in seconds",
            "type": "number",
            "minimum": 0.0
        },
        "first_channel": {
            "description": "First channel",
            "type": "integer",
            "minimum": 0,
            "maximum": 511
        },
        "last_channel": {
            "description": "Last channel",
            "type": "integer",
            "minimum": 0,
            "maximum": 511
        }
    }
}
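
The limits in the schema above can be checked on the client side before the command is sent. The following is a minimal sketch using only the standard library; the function name build_integrated_data_arg is hypothetical, and the check that first_channel does not exceed last_channel is an added assumption that the schema itself does not state.

```python
import json


def build_integrated_data_arg(integration_time=0.5, first_channel=0,
                              last_channel=511):
    """Return the JSON argument, raising ValueError on out-of-range input."""
    if integration_time < 0.0:
        raise ValueError("integration_time must be >= 0")
    for name, channel in (("first_channel", first_channel),
                          ("last_channel", last_channel)):
        if not isinstance(channel, int) or not 0 <= channel <= 511:
            raise ValueError(f"{name} must be an integer in the range 0 to 511")
    if first_channel > last_channel:
        # Assumption: not part of the schema, added as a sanity check.
        raise ValueError("first_channel must not exceed last_channel")
    return json.dumps({
        "integration_time": integration_time,
        "first_channel": first_channel,
        "last_channel": last_channel,
    })


jstr = build_integrated_data_arg(0.2, 0, 191)
```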
__init__(component_manager, logger=None)

Initialise a new ConfigureIntegratedChannelDataCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ConfigureIntegratedChannelData() commands.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

ConfigureStationBeamformer(argin)

Initialise and start the station beamformer.

Initial configuration of the tile-station beamformer. Optionally set the observed region (default is 6.25 MHz starting at 150 MHz) and whether the tile is the first or last in the beamformer chain.

Parameters:

argin (str) –

json dictionary with mandatory keywords:

  • start_channel - (int) start channel of the observed region default = 192 (150 MHz)

  • n_channels - (int) is the number of channels in the observed region default = 8 (6.25 MHz)

  • is_first - (bool) whether the tile is the first one in the station default False

  • is_last - (bool) whether the tile is the last one in the station default False

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> config = {"start_channel": 64, "n_channels": 10, "is_first": True,
...           "is_last": True}
>>> jstr = json.dumps(config)
>>> dp.command_inout("ConfigureStationBeamformer", jstr)
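
The defaults quoted for this command (start channel 192 at 150 MHz, 8 channels spanning 6.25 MHz) imply a channel spacing of 0.78125 MHz. The arithmetic is sketched here; the constant and function names are hypothetical, and the assumption that channel k sits at k times the spacing is inferred from those two defaults only.

```python
# Channel spacing inferred from the stated default: channel 192 is 150 MHz.
CHANNEL_WIDTH_MHZ = 150.0 / 192  # 0.78125 MHz


def channel_to_mhz(channel):
    """Frequency of a channel, assuming channel k lies at k * spacing."""
    return channel * CHANNEL_WIDTH_MHZ


def region_bandwidth_mhz(n_channels):
    """Bandwidth covered by a region of n_channels contiguous channels."""
    return n_channels * CHANNEL_WIDTH_MHZ


default_start_mhz = channel_to_mhz(192)        # 150.0
default_width_mhz = region_bandwidth_mhz(8)    # 6.25
```

Under the same assumption, a region of 10 channels starting at channel 64 would start at 50 MHz and span 7.8125 MHz.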

class ConfigureStationBeamformerCommand(component_manager, logger=None)

Class for handling the ConfigureStationBeamformer() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_ConfigureStationBeamformer.json",
    "title": "MccsTile ConfigureStationBeamformer schema",
    "description": "Schema for MccsTile's ConfigureStationBeamformer command",
    "type": "object",
    "properties": {
        "start_channel": {
            "description": "Start channel",
            "type": "integer",
            "minimum": 2,
            "maximum": 504
        },
        "n_channels": {
            "description": "Number of channels",
            "type": "integer",
            "minimum": 0,
            "maximum": 511
        },
        "is_first": {
            "description": "Is first",
            "type": "boolean"
        },
        "is_last": {
            "description": "Is last",
            "type": "boolean"
        }
    }
}
__init__(component_manager, logger=None)

Initialise a new ConfigureStationBeamformerCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ConfigureStationBeamformer() commands.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Raises:

ValueError – if the argin argument does not have the right length / structure

ConfigureTestGenerator(argin)

Set the test signal generator.

Parameters:

argin (str) – json dictionary with keywords:

  • tone_frequency: first tone frequency, in Hz. The frequency is rounded to the resolution of the generator. If this is not specified, the tone generator is disabled.

  • tone_amplitude: peak tone amplitude, normalized to 31.875 ADC units. The amplitude is rounded to 1/8 ADC unit. Default is 1.0. A value of -1.0 keeps the previously set value.

  • tone_2_frequency: frequency for the second tone. Same as tone_frequency.

  • tone_2_amplitude: peak tone amplitude for the second tone. Same as tone_amplitude.

  • noise_amplitude: RMS amplitude of the pseudorandom Gaussian white noise, normalized to 26.03 ADC units.

  • pulse_frequency: frequency of the periodic pulse. A code in the range 0 to 7, corresponding to (16, 12, 8, 6, 4, 3, 2) times the ADC frame frequency.

  • pulse_amplitude: peak amplitude of the periodic pulse, normalized to 127 ADC units. Default is 1.0. A value of -1.0 keeps the previously set value.

  • set_time: time at which the generator is set, for synchronization among different TPMs. In UTC ISO format (string).

  • adc_channels: list of ADC channels which will be substituted with the generated signal. It is a 32-bit integer, with each bit representing an input channel. Default: all if at least one source is specified, none otherwise.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> config = {"tone_frequency": 150e6, "tone_amplitude": 0.1,
...           "noise_amplitude": 0.9, "pulse_frequency": 7,
...           "set_time": "2022-08-09T12:34:56.7Z"}
>>> jstr = json.dumps(config)
>>> values = dp.command_inout("ConfigureTestGenerator", jstr)
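
The amplitude keywords are normalized values that are rounded to a fixed step in ADC units. The sketch below shows that arithmetic for the tone amplitude (full scale 31.875 ADC units, step 1/8 ADC unit) and for the pulse amplitude figures given in the command schema (full scale 127.5, step 0.5). The function name quantize_amplitude is hypothetical, and the rounding rule actually used by the generator is not stated, so Python's round() is only an assumption.

```python
def quantize_amplitude(normalized, full_scale_adu, step_adu):
    """Round a normalized amplitude to the nearest step.

    Returns (number_of_steps, amplitude_in_adc_units).
    """
    steps = round(normalized * full_scale_adu / step_adu)
    return steps, steps * step_adu


# Full-scale tone: 31.875 ADC units in steps of 0.125 ADC units.
tone_steps, tone_adu = quantize_amplitude(1.0, 31.875, 0.125)

# Full-scale pulse, using the schema figures: 127.5 ADC units in steps of 0.5.
pulse_steps, pulse_adu = quantize_amplitude(1.0, 127.5, 0.5)
```

Both full-scale values come out at 255 steps in this sketch.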
class ConfigureTestGeneratorCommand(component_manager, logger=None)

Class for handling the ConfigureTestGenerator() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_ConfigureTestGenerator.json",
    "title": "MccsTile ConfigureTestGenerator schema",
    "description": "Schema for MccsTile's ConfigureTestGenerator command",
    "type": "object",
    "properties": {
        "set_time": {
            "description": "Time to start the generator, in UTC ISO format",
            "type": "string"
        },
        "tone_frequency": {
            "description": "Tone 1 frequency in Hz of DDC 0",
            "type": "number"
        },
        "tone_amplitude": {
            "description": "Tone 1 peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU",
            "type": "number"
        },
        "tone_2_frequency": {
            "description": "Tone 2 frequency in Hz of DDC 1",
            "type": "number"
        },
        "tone_2_amplitude": {
            "description": "Tone 2 peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU",
            "type": "number"
        },
        "noise_amplitude": {
            "description": "Amplitude of pseudorandom noise normalized to 26.03 ADC units, resolution 0.102 ADU",
            "type": "number"
        },
        "pulse_frequency": {
            "description": "Code for pulse frequency. Range 0 to 7: 16,12,8,6,4,3,2 times frame frequency",
            "type": "integer",
            "minimum": 0,
            "maximum": 7
        },
        "pulse_amplitude": {
            "description": "pulse peak amplitude, normalized to 127.5 ADC units, resolution 0.5 ADU",
            "type": "number"
        },
        "adc_channels": {
            "description": "ADC channels",
            "type": "array",
            "items": {
                "type": "integer"
            }
        }
    }
}
__init__(component_manager, logger=None)

Initialise a new ConfigureTestGeneratorCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ConfigureTestGenerator() commands.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

DownloadFirmware(argin)

Download the firmware contained in bitfile to all FPGAs on the board.

This should also update the internal register mapping, such that registers become available for use.

Parameters:

argin (str) – can either be the design name returned from GetFirmwareAvailable() command, or a path to a file

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("DownloadFirmware", "/tmp/firmware/bitfile")
Get40GCoreConfiguration(argin)

Get 40g core configuration for core_id.

This is required to chain up TPMs to form a station.

Parameters:

argin (str) – json dictionary with optional keywords:

  • core_id - (int) core id

  • arp_table_entry - (int) ARP table entry ID to use

Return type:

str

Returns:

the configuration, as a JSON string describing a list (possibly empty) of entries, each comprising: core_id, arp_table_entry, source_mac, source_ip, source_port, destination_ip, destination_port, netmask, gateway_ip

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> core_id = 2
>>> arp_table_entry = 0
>>> argin = json.dumps({"core_id": core_id, "arp_table_entry": arp_table_entry})
>>> argout = dp.command_inout("Get40GCoreConfiguration", argin)
>>> params = json.loads(argout)
class Get40GCoreConfigurationCommand(component_manager, logger=None)

Class for handling the Get40GCoreConfiguration() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_Get40gCoreConfiguration.json",
    "title": "MccsTile Get40gCoreConfiguration schema",
    "description": "Schema for MccsTile's Get40gCoreConfiguration command",
    "type": "object",
    "properties": {
        "core_id": {
            "description": "Core ID",
            "type": "integer"
        },
        "arp_table_entry": {
            "description": "ARP table entry",
            "type": "integer"
        }
    }
}
__init__(component_manager, logger=None)

Initialise a new Get40GCoreConfigurationCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.Get40GCoreConfiguration() commands.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

str

Returns:

json string with configuration

Raises:

ValueError – if the argin is an invalid core id

GetArpTable()

Return a dictionary with populated ARP table for all used cores.

40G interfaces use cores 0 (fpga0) and 1 (fpga1), with ARP ID 0 for the beamformer and 1 for LMC. 10G interfaces use cores 0, 1 (fpga0) and 4, 5 (fpga1) for beamforming, and 2, 6 for LMC, with only one ARP.

Return type:

str

Returns:

a JSON-encoded dictionary of coreId and populated arpID table

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> argout = dp.command_inout("GetArpTable")
>>> arp_table = json.loads(argout)
>>> arp_table
{
"core_id0": [0, 1],
"core_id1": [0],
"core_id3": [],
}
class GetArpTableCommand(component_manager, logger=None)

Class for handling the GetArpTable() command.

__init__(component_manager, logger=None)

Initialise a new GetArpTableCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.GetArpTable() commands.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

str

Returns:

a JSON-encoded dictionary of coreId and populated arpID table

GetFirmwareAvailable()

Get available firmware.

Return a dictionary containing the following information for each firmware stored on the board (such as in Flash memory).

For each firmware, a dictionary containing the following keys with their respective values should be provided: ‘design’, which is a textual name for the firmware, ‘major’, which is the major version number, and ‘minor’, which is the minor version number.

Return type:

str

Returns:

a JSON-encoded dictionary of firmware details

Example:
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> jstr = dp.command_inout("GetFirmwareAvailable")
>>> firmware = json.loads(jstr)
{
"firmware1": {"design": "model1", "major": 2, "minor": 3},
"firmware2": {"design": "model2", "major": 3, "minor": 7},
"firmware3": {"design": "model3", "major": 2, "minor": 6},
}
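
Once decoded, the firmware dictionary can be searched with ordinary Python. The sketch below picks the entry with the highest (major, minor) version, reusing the sample values from the example above in place of a live device reply. Treating the highest version as the one to download is an assumption made for illustration.

```python
import json

# Stand-in for the string returned by the GetFirmwareAvailable command.
jstr = json.dumps({
    "firmware1": {"design": "model1", "major": 2, "minor": 3},
    "firmware2": {"design": "model2", "major": 3, "minor": 7},
    "firmware3": {"design": "model3", "major": 2, "minor": 6},
})

firmware = json.loads(jstr)

# Tuples compare element by element, so major is compared before minor.
newest_name, newest = max(
    firmware.items(),
    key=lambda item: (item[1]["major"], item[1]["minor"]),
)

# With a reachable device (not executed here), the design name could then
# be passed on:
# dp.command_inout("DownloadFirmware", newest["design"])
```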
class GetFirmwareAvailableCommand(component_manager, logger=None)

Class for handling the GetFirmwareAvailable() command.

__init__(component_manager, logger=None)

Initialise a new GetFirmwareAvailableCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.GetFirmwareAvailable() command functionality.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

str

Returns:

json encoded string containing list of dictionaries

GetRegisterList()

Return a list of descriptions of the exposed firmware (and CPLD) registers.

Return type:

list[str]

Returns:

a list of register names

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> reglist = dp.command_inout("GetRegisterList")
class GetRegisterListCommand(component_manager, logger=None)

Class for handling the GetRegisterList() command.

__init__(component_manager, logger=None)

Initialise a new GetRegisterListCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.GetRegisterList() command functionality.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

list[str]

Returns:

a list of firmware & cpld registers

class InitCommand(*args, **kwargs)

Class that implements device initialisation for the MCCS Tile device.

do(*args, **kwargs)

Initialise the attributes and properties of the MCCS Tile device.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Initialise()

Perform all required initialisation.

This switches on on-board devices, locks the PLL, performs synchronisation and carries out the other operations required to start configuring the signal processing functions of the firmware, such as channelisation and beamforming.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("Initialise")
LoadCalibrationCoefficients(argin)

Load the calibration coefficients, but do not apply them.

Applying them is performed by apply_calibration. The calibration coefficients may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:

argin (list[float]) – list comprises:

  • antenna - (int) is the antenna to which the coefficients will be applied.

  • calibration_coefficients - [array] a bidimensional complex array comprising calibration_coefficients[channel, polarization], with each element representing a normalized coefficient, with (1.0, 0.0) being the normal, expected response for an ideal antenna.

    • channel - (int) channel is the index specifying the channels at the beamformer output, i.e. considering only those channels actually processed and beam assignments.

    • polarization index ranges from 0 to 3.

      • 0: X polarization direct element

      • 1: X->Y polarization cross element

      • 2: Y->X polarization cross element

      • 3: Y polarization direct element

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> import itertools
>>> antenna = 2
>>> complex_coefficients = [[complex(3.4, 1.2), complex(2.3, 4.1),
...            complex(4.6, 8.2), complex(6.8, 2.4)]]*5
>>> inp = list(itertools.chain.from_iterable(complex_coefficients))
>>> out = [[v.real, v.imag] for v in inp]
>>> coefficients = list(itertools.chain.from_iterable(out))
>>> coefficients.insert(0, float(antenna))
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("LoadCalibrationCoefficients", coefficients)
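
The flattening performed in this example can be wrapped in a small helper that also checks the shape of its input. This is a sketch: the function name pack_calibration_argin is hypothetical, and the layout (antenna index first, then real and imaginary parts for four polarizations per channel) is taken from the argin description for this command.

```python
def pack_calibration_argin(antenna, coefficients):
    """Flatten per-channel complex coefficients into a list of floats.

    `coefficients` is a sequence of channels, each holding four complex
    values (one per polarization index 0 to 3).
    """
    argin = [float(antenna)]
    for channel in coefficients:
        if len(channel) != 4:
            raise ValueError("each channel needs exactly 4 polarization values")
        for value in channel:
            argin.extend((value.real, value.imag))
    return argin


channel_coefficients = [complex(3.4, 1.2), complex(2.3, 4.1),
                        complex(4.6, 8.2), complex(6.8, 2.4)]
argin = pack_calibration_argin(2, [channel_coefficients] * 5)
```

For five channels the result holds 1 + 5 × 4 × 2 = 41 floats.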
class LoadCalibrationCoefficientsCommand(component_manager, logger=None)

Class for handling the LoadCalibrationCoefficients(argin) command.

__init__(component_manager, logger=None)

Initialise a new LoadCalibrationCoefficientsCommand instance.

Parameters:
do(argin, *args, **kwargs)

Implement MccsTile.LoadCalibrationCoefficients() commands.

Parameters:
  • argin (list[float]) – calibration coefficients

  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Raises:

ValueError – if the argin argument does not have the right length / structure

LoadPointingDelays(argin)

Specify the delay in seconds and the delay rate in seconds/second.

The delay_array specifies the delay and delay rate for each antenna. beam_index specifies which beam is desired (range 0-7)

Parameters:

argin (list[float]) – An array containing: beam index, the delay in seconds and the delay rate in seconds/second, for each antenna.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> # example delays: 16 values from -2 to +1.75 ns, rates = 0
>>> delays = [step * 0.25e-9 for step in range(-8, 8)]
>>> rates = [0.0]*16
>>> beam = 0.0
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> arg = [beam]
>>> for i in range(16):
...   arg.append(delays[i])
...   arg.append(rates[i])
>>> dp.command_inout("LoadPointingDelays", arg)
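
The interleaving of delays and rates can be sketched as a standalone function. This is an illustrative helper under stated assumptions, not part of the device: the name pack_pointing_delays is hypothetical, and the beam range check simply mirrors the 0-7 range quoted in the description above.

```python
from typing import Sequence


def pack_pointing_delays(
    beam_index: int, delays: Sequence[float], rates: Sequence[float]
) -> list[float]:
    """Build [beam, delay0, rate0, delay1, rate1, ...] (hypothetical helper)."""
    if len(delays) != len(rates):
        raise ValueError("delays and rates must have the same length")
    if not 0 <= beam_index <= 7:
        raise ValueError("beam_index must be in the range 0-7")
    argin = [float(beam_index)]
    for delay, rate in zip(delays, rates):
        # One (delay, delay rate) pair per antenna, in antenna order.
        argin.extend([delay, rate])
    return argin


delays = [step * 0.25e-9 for step in range(-8, 8)]
argin = pack_pointing_delays(0, delays, [0.0] * 16)
```

For 16 antennas this yields 33 values: the beam index followed by 16 delay/rate pairs.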
class LoadPointingDelaysCommand(component_manager, logger)

Class for handling the LoadPointingDelays(argin) command.

__init__(component_manager, logger)

Initialise a new LoadPointingDelaysCommand instance.

Parameters:
  • component_manager (TileComponentManager) – the device to which this command belongs.

  • logger (Logger) – the logger to be used by this Command. If not provided, then a default module logger will be used.

do(argin, *args, **kwargs)

Implement MccsTile.LoadPointingDelays() command functionality.

Parameters:
  • argin (list[float]) – an array containing a beam index and antenna delays

  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Raises:

ValueError – if the argin argument does not have the right length / structure

ReadAddress(argin)

Read n 32-bit values from address.

Parameters:

argin (list[int]) – [0] = address to read from; [1] = number of values to read, default 1

Return type:

list[int]

Returns:

list of values

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> reglist = dp.command_inout("ReadAddress", [address, nvalues])
class ReadAddressCommand(component_manager, logger=None)

Class for handling the ReadAddress(argin) command.

__init__(component_manager, logger=None)

Initialise a new ReadAddressCommand instance.

Parameters:
do(argin, *args, **kwargs)

Implement MccsTile.ReadAddress() command functionality.

Parameters:
  • argin (list[int]) – sequence of length two, containing an address and the number of values to read

  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

[values, ]

Raises:

ValueError – if the argin argument has the wrong length or structure

ReadRegister(register_name)

Return the value(s) of the specified register.

Parameters:

register_name (str) – full hierarchical register name

Return type:

list[int]

Returns:

a list of register values

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> values = dp.command_inout("ReadRegister", "test-reg1")
class ReadRegisterCommand(component_manager, logger=None)

Class for handling the ReadRegister(argin) command.

__init__(component_manager, logger=None)

Initialise a new ReadRegisterCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.ReadRegister() command functionality.

Parameters:
  • args (Any) – the register name

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

list[int]

Returns:

list of register values

Raises:

ValueError – if the name is invalid

SendDataSamples(argin)

Transmit a snapshot containing raw antenna data.

Parameters:

argin (str) – json dictionary with optional keywords:

  • data_type - type of snapshot data (mandatory): “raw”, “channel”, “channel_continuous”, “narrowband”, “beam”

  • start_time - Time (UTC string) to start sending data. Default immediately

  • seconds - (float) Delay if timestamp is not specified. Default 0.2 seconds

Depending on the data type:

raw:

  • sync - (bool) send synchronised samples for all antennas, vs. round robin larger snapshot from each antenna

channel:

  • n_samples: Number of samples per channel, default 1024

  • first_channel - (int) first channel to send, default 0

  • last_channel - (int) last channel to send, default 511

channel_continuous:

  • channel_id - (int) channel_id (Mandatory)

  • n_samples - (int) number of samples to send per packet, default 128

narrowband:

  • frequency - (int) Sky frequency for band centre, in Hz (Mandatory)

  • round_bits - (int) Specify how many bits to round

  • n_samples - (int) number of spectra to send

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"data_type": "raw", "sync": True, "seconds": 0.2}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SendDataSamples", jstr)
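
Because the mandatory keywords depend on data_type, it can help to check them on the client before sending. The sketch below is an assumption-laden illustration: build_send_data_samples_arg and REQUIRED_KEYS are hypothetical names, and the table only encodes the "(Mandatory)" markers from the parameter list above. The device performs its own validation.

```python
import json

# Mandatory keywords per data type, taken from the parameter description.
REQUIRED_KEYS = {
    "raw": [],
    "channel": [],
    "channel_continuous": ["channel_id"],
    "narrowband": ["frequency"],
    "beam": [],
}


def build_send_data_samples_arg(data_type: str, **options) -> str:
    """Return the JSON argument string (hypothetical helper)."""
    if data_type not in REQUIRED_KEYS:
        raise ValueError(f"unknown data_type: {data_type!r}")
    missing = [key for key in REQUIRED_KEYS[data_type] if key not in options]
    if missing:
        raise ValueError(f"{data_type} requires: {', '.join(missing)}")
    return json.dumps({"data_type": data_type, **options})


jstr = build_send_data_samples_arg("channel_continuous", channel_id=2, n_samples=128)
```

The returned string is what would be passed to dp.command_inout("SendDataSamples", jstr).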
class SendDataSamplesCommand(component_manager, logger=None)

Class for handling the SendDataSamples() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_SendDataSamples.json",
    "title": "MccsTile SendDataSamples schema",
    "description": "Schema for MccsTile's SendDataSamples command",
    "type": "object",
    "properties": {
        "data_type": {
            "description": "type of snapshot data (mandatory): raw, channel, channel_continuous, narrowband, beam",
            "type": "string",
            "enum": ["raw", "channel", "channel_continuous", "narrowband", "beam"]
        }
    },
    "required": [ "data_type" ],
    "allOf": [
        {
            "if": {
                "properties": {
                    "data_type": {
                        "const": "channel"
                    }
                }
            },
            "then": {
                "properties": {
                    "first_channel": {
                        "description": "First channel to send",
                        "type": "integer",
                        "minimum": 0,
                        "maximum": 511
                    },
                    "last_channel": {
                        "description": "Last channel to send",
                        "type": "integer",
                        "minimum": 0,
                        "maximum": 511
                    }
                }
            }
        },
        {
            "if": {
                "properties": {
                    "data_type": {
                        "const": "channel_continuous"
                    }
                }
            },
            "then": {
                "properties": {
                    "channel_id": {
                        "description": "Channel ID",
                        "type": "integer",
                        "minimum": 1,
                        "maximum": 511
                    }
                },
                "required": [ "channel_id" ]
            }
        },
        {
            "if": {
                "properties": {
                    "data_type": {
                        "const": "narrowband"
                    }
                }
            },
            "then": {
                "properties": {
                    "frequency": {
                        "description": "Sky frequency of band centre, in Hz",
                        "type": "number",
                        "minimum": 1e6,
                        "maximum": 399e6
                    }
                },
                "required": [ "frequency" ]
            }
        }
    ]
}
__init__(component_manager, logger=None)

Initialise a new SendDataSamplesCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.SendDataSamples() command functionality.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Raises:

ValueError – if mandatory parameters are missing

SetBeamFormerRegions(argin)

Set the frequency regions which are going to be beamformed into each beam.

region_array is defined as a flattened 2D array, for a maximum of 48 regions. Total number of channels must be <= 384.

Parameters:

argin (list[int]) – list of regions. Each region comprises:

  • start_channel - (int) region starting channel, must be even in range 0 to 510

  • num_channels - (int) size of the region, must be a multiple of 8

  • beam_index - (int) beam used for this region with range 0 to 47

  • subarray_id - (int) Subarray

  • subarray_logical_channel - (int) logical channel # in the subarray

  • subarray_beam_id - (int) ID of the subarray beam

  • substation_id - (int) Substation

  • aperture_id - (int) ID of the aperture (station*100+substation?)

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> regions = [[4, 24, 0, 0, 0, 3, 1, 101], [26, 40, 1, 0, 24, 4, 2, 102]]
>>> input = list(itertools.chain.from_iterable(regions))
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("SetBeamFormerRegions", input)
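
The constraints listed above (at most 48 regions, at most 384 channels in total, even start channel, region size a multiple of 8, beam index 0 to 47) can be checked before flattening. This is a minimal client-side sketch; flatten_regions is a hypothetical name and the device may apply further checks of its own.

```python
from itertools import chain

MAX_REGIONS = 48
MAX_CHANNELS = 384


def flatten_regions(regions: list[list[int]]) -> list[int]:
    """Validate 8-element regions and flatten them (hypothetical helper)."""
    if len(regions) > MAX_REGIONS:
        raise ValueError(f"at most {MAX_REGIONS} regions are allowed")
    total_channels = 0
    for index, region in enumerate(regions):
        if len(region) != 8:
            raise ValueError(f"region {index}: expected 8 values")
        start_channel, num_channels, beam_index = region[:3]
        if start_channel % 2 or not 0 <= start_channel <= 510:
            raise ValueError(f"region {index}: start_channel must be even, 0 to 510")
        if num_channels <= 0 or num_channels % 8:
            raise ValueError(f"region {index}: num_channels must be a multiple of 8")
        if not 0 <= beam_index <= 47:
            raise ValueError(f"region {index}: beam_index must be 0 to 47")
        total_channels += num_channels
    if total_channels > MAX_CHANNELS:
        raise ValueError(f"total channels {total_channels} exceeds {MAX_CHANNELS}")
    return list(chain.from_iterable(regions))


regions = [[4, 24, 0, 0, 0, 3, 1, 101], [26, 40, 1, 0, 24, 4, 2, 102]]
argin = flatten_regions(regions)
```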
class SetBeamFormerRegionsCommand(component_manager, logger=None)

Class for handling the SetBeamFormerRegions(argin) command.

__init__(component_manager, logger=None)

Initialise a new SetBeamFormerRegionsCommand instance.

Parameters:
do(argin, *args, **kwargs)

Implement MccsTile.SetBeamFormerRegions() command functionality.

Parameters:
  • argin (list[int]) – a region array

  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Raises:

ValueError – if the argin argument does not have the right length / structure

SetLmcDownload(argin)

Specify whether control data will be transmitted over 1G or 40G networks.

Parameters:

argin (str) –

json dictionary with optional keywords:

  • mode - (string) ‘1G’ or ‘10G’ (Mandatory) (use ‘10G’ for 40G also)

  • payload_length - (int) SPEAD payload length for channel data

  • destination_ip - (string) Destination IP.

  • source_port - (int) Source port for integrated data streams

  • destination_port - (int) Destination port for integrated data streams

  • netmask_40g - (int) 40g (science data) subnet mask

  • gateway_40g - (int) IP address of 40g (science) subnet gateway

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"mode": "1G", "payload_length": 4, "destination_ip": "10.0.1.23"}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SetLmcDownload", jstr)
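
netmask_40g and gateway_40g are passed as integers rather than dotted strings. The sketch below shows one way to produce them with the standard ipaddress module. It assumes the integer is the usual big-endian 32-bit value of the IPv4 address, which this page does not state explicitly; ipv4_to_int and the gateway address are illustrative only.

```python
import ipaddress
import json


def ipv4_to_int(address: str) -> int:
    """Convert a dotted-quad IPv4 string to its 32-bit integer value."""
    return int(ipaddress.IPv4Address(address))


arg = {
    "mode": "10G",
    "destination_ip": "10.0.1.23",
    # Assumed encoding: 255.255.255.0 becomes 4294967040.
    "netmask_40g": ipv4_to_int("255.255.255.0"),
    # Placeholder gateway address, not taken from the source.
    "gateway_40g": ipv4_to_int("10.0.1.1"),
}
jstr = json.dumps(arg)
```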

class SetLmcDownloadCommand(component_manager, logger=None)

Class for handling the SetLmcDownload() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_SetLmcDownload.json",
    "title": "MccsTile SetLmcDownload schema",
    "description": "Schema for MccsTile's SetLmcDownload command",
    "type": "object",
    "properties": {
        "mode": {
            "description": "Mode: 1G or 10G",
            "type": "string",
            "enum": [
                "1g",
                "1G",
                "10g",
                "10G"
            ]
        },
        "payload_length": {
            "description": "SPEAD payload length for channel data",
            "type": "integer",
            "minimum": 0
        },
        "destination_ip": {
            "description": "Destination IP address",
            "type": "string",
            "format": "ipv4"
        },
        "source_port": {
            "description": "Source port for integrated data streams",
            "type": "integer",
            "minimum": 0
        },
        "destination_port": {
            "description": "Destination port for integrated data streams",
            "type": "integer",
            "minimum": 0
        },
        "netmask_40g": {
            "description": "Integer netmask for the 40g (science data) subnet",
            "type": "integer",
            "minimum": 0,
            "maximum": 4294967296
        },
        "gateway_40g": {
            "description": "Integer IP address of the 40g (science data) subnet gateway",
            "minimum": 0,
            "maximum": 4294967296
        }
    },
    "required": [
        "mode"
    ],
    "additionalProperties": false
}
__init__(component_manager, logger=None)

Initialise a new SetLmcDownloadCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.SetLmcDownload() command functionality.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

SetLmcIntegratedDownload(argin)

Configure link and size of control data.

Parameters:

argin (str) –

json dictionary with optional keywords:

  • mode - (string) ‘1G’ or ‘10G’ (Mandatory)

  • channel_payload_length - (int) SPEAD payload length for integrated channel data

  • beam_payload_length - (int) SPEAD payload length for integrated beam data

  • destination_ip - (string) Destination IP

  • source_port - (int) Source port for integrated data streams

  • destination_port - (int) Destination port for integrated data streams

  • netmask_40g - (int) 40g (science data) subnet mask

  • gateway_40g - (int) IP address of 40g (science) subnet gateway

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"mode": "1G", "channel_payload_length": 4,
...         "beam_payload_length": 1024, "destination_ip": "10.0.1.23"}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("SetLmcIntegratedDownload", jstr)
class SetLmcIntegratedDownloadCommand(component_manager, logger=None)

Class for handling the SetLmcIntegratedDownload() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_SetLmcIntegratedDownload.json",
    "title": "MccsTile SetLmcIntegratedDownload schema",
    "description": "Schema for MccsTile's SetLmcIntegratedDownload command",
    "type": "object",
    "properties": {
        "mode": {
            "description": "Mode: 1G or 10G",
            "type": "string",
            "enum": [
                "1g",
                "1G",
                "10g",
                "10G"
            ]
        },
        "channel_payload_length": {
            "description": "SPEAD payload length for integrated channel data",
            "type": "integer",
            "minimum": 0
        },
        "beam_payload_length": {
            "description": "SPEAD payload length for integrated beam data",
            "type": "integer",
            "minimum": 0
        },
        "destination_ip": {
            "description": "Destination IP address",
            "type": "string",
            "format": "ipv4"
        },
        "source_port": {
            "description": "Source port for integrated data streams",
            "type": "integer",
            "minimum": 0
        },
        "destination_port": {
            "description": "Destination port for integrated data streams",
            "type": "integer",
            "minimum": 0
        },
        "netmask_40g": {
            "description": "Integer netmask for the 40g (science data) subnet",
            "type": "integer",
            "minimum": 0,
            "maximum": 4294967296
        },
        "gateway_40g": {
            "description": "Integer IP address of the 40g (science data) subnet gateway",
            "minimum": 0,
            "maximum": 4294967296
        }
    },
    "required": [
        "mode"
    ],
    "additionalProperties": false
}
__init__(component_manager, logger=None)

Initialise a new SetLmcIntegratedDownloadCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.SetLmcIntegratedDownload() commands.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

StartAcquisition(argin)

Start data acquisition.

Parameters:

argin (str) – json dictionary with optional keywords:

  • start_time - (ISO UTC time) start time

  • delay - (int) delay start if start_time is not specified, default 0.2s

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"start_time": "2021-11-22", "delay": 20}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("StartAcquisition", jstr)
class StartAcquisitionCommand(command_tracker, component_manager, callback=None, logger=None)

Class for handling the StartAcquisition() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_StartAcquisition.json",
    "title": "MccsTile StartAcquisition schema",
    "description": "Schema for MccsTile's StartAcquisition command",
    "type": "object",
    "properties": {
        "start_time": {
            "description": "",
            "type": "string",
            "format": "time"
        },
        "delay": {
            "description": "An acquisition start delay",
            "type": "integer"
        }
    }
}
__init__(command_tracker, component_manager, callback=None, logger=None)

Initialise a new instance.

Parameters:
  • command_tracker (CommandTracker) – the device’s command tracker

  • component_manager (TileComponentManager) – the device’s component manager

  • callback (Optional[Callable]) – an optional callback to be called when this command starts and finishes.

  • logger (Optional[Logger]) – a logger for this command to log with.

StartBeamformer(argin)

Start the beamformer at the specified time delay.

Parameters:

argin (str) – json dictionary with optional keywords:

  • start_time - (str, ISO UTC time) start time

  • duration - (int) if > 0, a duration in CSP frames (2211.84 us); if == -1, run forever

  • subarray_beam_id - (int) Subarray beam ID of the channels to be started. Command affects only beamformed channels for given subarray ID. Default -1: all channels

  • scan_id - (int) The unique ID for the started scan. Default 0

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"start_time": None, "duration": 20}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("StartBeamformer", jstr)
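
Since duration is expressed in CSP frames of 2211.84 us, a duration given in seconds has to be converted first. A minimal sketch of that arithmetic follows; seconds_to_csp_frames is a hypothetical helper and rounding to the nearest frame is an assumption, not documented behaviour.

```python
import json

CSP_FRAME_SECONDS = 2211.84e-6  # one CSP frame, from the parameter description


def seconds_to_csp_frames(seconds: float) -> int:
    """Convert a positive duration in seconds to a whole number of CSP frames."""
    if seconds <= 0:
        raise ValueError("seconds must be positive; use -1 frames to run forever")
    # Round to the nearest frame, but never return 0 frames.
    return max(1, round(seconds / CSP_FRAME_SECONDS))


jstr = json.dumps({"duration": seconds_to_csp_frames(10.0), "scan_id": 0})
```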
class StartBeamformerCommand(component_manager, logger=None)

Class for handling the StartBeamformer(argin) command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_StartBeamformer.json",
    "title": "MccsTile StartBeamformer schema",
    "description": "Schema for MccsTile's StartBeamformer command",
    "type": "object",
    "properties": {
        "start_time": {
            "description": "",
            "oneOf": [
                {"type": "string", "format": "time"},
                {"type": "null"}
            ]
        },
        "duration": {
            "description": "Duration in CSP frames (or -1 to run forever)",
            "oneOf": [
                {"type": "integer", "minimum": -1},
                {"type": "null"}
            ]
        },
        "subarray_beam_id": {
            "description": "Subarray beam ID of the changes to be started.",
            "type": "integer",
            "minimum": -1
        },
        "scan_id": {
            "description": "The unique ID for the started scan",
            "type": "integer",
            "minimum": 0
        }
    }
}
__init__(component_manager, logger=None)

Initialise a new StartBeamformerCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.StartBeamformer() command functionality.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

StopBeamformer()

Stop the beamformer.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StopBeamformer")
class StopBeamformerCommand(component_manager, logger=None)

Class for handling the StopBeamformer() command.

__init__(component_manager, logger=None)

Initialise a new StopBeamformerCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.StopBeamformer() command functionality.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

StopDataTransmission()

Stop data transmission from board.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("StopDataTransmission")
class StopDataTransmissionCommand(component_manager, logger=None)

Class for handling the StopDataTransmission() command.

__init__(component_manager, logger=None)

Initialise a new StopDataTransmissionCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.StopDataTransmission() command functionality.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

StopIntegratedData()

Stop the integrated data.

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

class StopIntegratedDataCommand(component_manager, logger=None)

Class for handling the StopIntegratedData command.

__init__(component_manager, logger=None)

Initialise a new StopIntegratedDataCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.StopIntegratedData() command functionality.

Parameters:
  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

WriteAddress(argin)

Write list of values at address.

Parameters:

argin (list[int]) – [0] = address to write to [1..n] = list of values to write

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> values = [.....]
>>> address = 0xfff
>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dp.command_inout("WriteAddress", [address, *values])
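
The argument is a single flat list: the address followed by the values. A small sketch of building it is shown here. pack_write_address is a hypothetical name, and the unsigned 32-bit range check is an assumption based on the 32-bit values mentioned for ReadAddress and WriteRegister.

```python
def pack_write_address(address: int, values: list[int]) -> list[int]:
    """Return [address, value0, value1, ...] (hypothetical helper)."""
    if not values:
        raise ValueError("at least one value is required")
    for value in values:
        # Assumed range: unsigned 32-bit words.
        if not 0 <= value <= 0xFFFFFFFF:
            raise ValueError(f"value {value} does not fit in 32 bits")
    return [address, *values]


argin = pack_write_address(0xFFF, [1, 2, 3])
```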
class WriteAddressCommand(component_manager, logger=None)

Class for handling the WriteAddress(argin) command.

__init__(component_manager, logger=None)

Initialise a new WriteAddressCommand instance.

Parameters:
do(argin, *args, **kwargs)

Implement MccsTile.WriteAddress() command functionality.

Parameters:
  • argin (list[int]) – sequence containing an address followed by the values to write

  • args (Any) – unspecified positional arguments. This should be empty and is provided for type hinting only

  • kwargs (Any) – unspecified keyword arguments. This should be empty and is provided for type hinting only

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Raises:

ValueError – if the argin has the wrong length/structure

WriteRegister(argin)

Write values to the specified register.

Parameters:

argin (str) –

json dictionary with mandatory keywords:

  • register_name - (string) register fully qualified string representation

  • values - (list) is a list containing the 32-bit values to write

Return type:

tuple[list[ResultCode], list[str]]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

Example:

>>> dp = tango.DeviceProxy("mccs/tile/01")
>>> dict = {"register_name": "test-reg1", "values": values,
...         "offset": 0}
>>> jstr = json.dumps(dict)
>>> dp.command_inout("WriteRegister", jstr)
class WriteRegisterCommand(component_manager, logger=None)

Class for handling the WriteRegister() command.

This command takes as input a JSON string that conforms to the following schema:

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "$id": "https://skao.int/MccsTile_WriteRegister.json",
    "title": "MccsTile WriteRegister schema",
    "description": "Schema for MccsTile's WriteRegister command",
    "type": "object",
    "properties": {
        "register_name": {
            "description": "Register fully qualified string representation",
            "type": "string"
        },
        "values": {
            "description": "List of 32-bit values to write",
            "type": "array"
        }
    },
    "required": ["register_name", "values"]
}
__init__(component_manager, logger=None)

Initialise a new WriteRegisterCommand instance.

Parameters:
do(*args, **kwargs)

Implement MccsTile.WriteRegister() command functionality.

Parameters:
  • args (Any) – Positional arguments. This should be empty and is provided for type hinting purposes only.

  • kwargs (Any) – keyword arguments unpacked from the JSON argument to the command.

Return type:

tuple[ResultCode, str]

Returns:

A tuple containing a return code and a string message indicating status. The message is for information purpose only.

__init__(*args, **kwargs)

Initialise this device object.

Parameters:
  • args (Any) – positional args to the init

  • kwargs (Any) – keyword args to the init

adcHealth()

Read the ADC Health State of the device.

This is an aggregated quantity representing if any of the ADC monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

ADC Health State of the device

adcPower()

Return the RMS power of every ADC signal.

A TPM processes 16 antennas, so this should return 32 RMS values.

Return type:

list[float]

Returns:

RMS power of ADC signals

adcs()

Return the ADC status.

Return type:

str

Returns:

the ADC status

alarmHealth()

Read the alarm Health State of the device.

This is an aggregated quantity representing if any of the alarm monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

alarm Health State of the device

alarms()

Return the TPM’s alarm status.

Return type:

str

Returns:

the TPM’s alarm status

antennaIds(antenna_ids)

Set the antenna IDs.

Parameters:

antenna_ids (list[int]) – the antenna IDs

Return type:

None

beamformerTable()

Get beamformer region table.

Bidimensional array of one row for each 8 channels, with elements:

  • 0: start physical channel

  • 1: beam number

  • 2: subarray ID

  • 3: subarray_logical_channel

  • 4: subarray_beam_id

  • 5: substation_id

  • 6: aperture_id

Each row is a set of 7 consecutive elements in the list.

Return type:

list[int]

Returns:

list of up to 7*48 values
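
Because the attribute is returned as a flat list, a client usually regroups it into rows of 7. The following is a sketch of that step: BeamformerRow and unpack_beamformer_table are hypothetical names, with fields following the element order given above, and the sample values are made up for illustration.

```python
from typing import NamedTuple


class BeamformerRow(NamedTuple):
    """One row of the beamformer table (hypothetical container)."""

    start_channel: int
    beam: int
    subarray_id: int
    subarray_logical_channel: int
    subarray_beam_id: int
    substation_id: int
    aperture_id: int


def unpack_beamformer_table(flat: list[int]) -> list[BeamformerRow]:
    """Split the flat attribute value into rows of 7 elements."""
    if len(flat) % 7:
        raise ValueError("table length must be a multiple of 7")
    return [BeamformerRow(*flat[i : i + 7]) for i in range(0, len(flat), 7)]


# Illustrative values only, not read from a device.
rows = unpack_beamformer_table([4, 0, 0, 0, 3, 1, 101, 12, 0, 0, 8, 3, 1, 101])
```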

boardTemperature()

Return the board temperature.

Return type:

tuple[Optional[float], float, AttrQuality]

Returns:

the board temperature

channeliserRounding(truncation)

Set channeliser rounding.

Parameters:

truncation (list[int]) – List with either a single value (applies to all channels) or a list of 512 values. Range 0 (no truncation) to 7

Return type:

None
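
The two accepted shapes (a single value or 512 values) and the 0-7 range can be checked on the client before writing the attribute. This is a sketch only: expand_truncation is a hypothetical helper, and how the device itself expands a single value is not described here.

```python
N_CHANNELS = 512


def expand_truncation(truncation: list[int]) -> list[int]:
    """Return one truncation value per channel (hypothetical helper)."""
    if len(truncation) == 1:
        # A single value applies to all channels.
        values = truncation * N_CHANNELS
    elif len(truncation) == N_CHANNELS:
        values = list(truncation)
    else:
        raise ValueError(f"expected 1 or {N_CHANNELS} values, got {len(truncation)}")
    if any(not 0 <= value <= 7 for value in values):
        raise ValueError("truncation values must be in the range 0 to 7")
    return values


per_channel = expand_truncation([4])
```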

clockPresent()

Report if 10 MHz clock signal is present at the TPM input.

Return type:

bool

Returns:

presence of 10 MHz clock signal

create_component_manager()

Create and return a component manager for this device.

Return type:

TileComponentManager

Returns:

a component manager for this device.

cspDestinationIp()

Return the cspDestinationIp attribute.

Return type:

str

Returns:

the IP address of the csp destination

cspDestinationMac()

Return the cspDestinationMac attribute.

Return type:

str

Returns:

the MAC address of the csp destination

cspDestinationPort()

Return the cspDestinationPort attribute.

Return type:

int

Returns:

the port of the csp destination

cspRounding(rounding)

Set CSP formatter rounding.

Parameters:

rounding (ndarray) – list of up to 384 values in the range 0-7. Current hardware supports only a single value, thus only the first value is used

Return type:

None

currentFrame()

Return current frame.

In units of 256 ADC frames (276.48 us). Currently this is required, not sure if it will remain so.

Return type:

int

Returns:

current frame
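
The frame counter converts to elapsed time by multiplying by the frame period. A minimal sketch of the arithmetic, assuming the 276.48 us period quoted above (frames_to_seconds is a hypothetical name):

```python
FRAME_SECONDS = 276.48e-6  # 256 ADC frames, from the description above


def frames_to_seconds(frame_count: int) -> float:
    """Convert a frame count to seconds (hypothetical helper)."""
    if frame_count < 0:
        raise ValueError("frame_count must be non-negative")
    return frame_count * FRAME_SECONDS


# Eight of these frames span one CSP frame of 2211.84 us.
csp_frame_seconds = frames_to_seconds(8)
```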

currentHealth()

Read the current Health State of the device.

This is an aggregated quantity representing if any of the current monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

current Health State of the device

currentTileBeamformerFrame()

Return current tile beamformer frame.

In units of 256 ADC frames (276.48 µs). Currently this is required; it is not certain that it will remain so.

Return type:

int

Returns:

current frame

currents()

Return all the available current values.

Return type:

str

Returns:

currents available

dev_state()

Calculate this device state.

The base device offers some automatic state discovery. However, some attributes require explicit analysis as to whether they are in ALARM or not, e.g. DevBoolean.

Return type:

DevState

Returns:

the ‘tango.DevState’ calculated

dsp()

Return the tile beamformer and station beamformer status.

Return type:

str

Returns:

the tile beamformer and station beamformer status

dspHealth()

Read the dsp Health State of the device.

This is an aggregated quantity representing if any of the dsp monitoring points do not have a permitted value. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

dsp Health State of the device

firmwareName(value)

Set the firmware name.

Parameters:

value (str) – firmware name

Return type:

None

firmwareVersion(value)

Set the firmware version.

Parameters:

value (str) – firmware version

Return type:

None

fortyGbDestinationIps()

Return the destination IPs for all 40Gb ports on the tile.

Return type:

list[str]

Returns:

IP addresses

fortyGbDestinationPorts()

Return the destination ports for all 40Gb ports on the tile.

Return type:

list[int]

Returns:

ports

fpga1Temperature()

Return the temperature of FPGA 1.

Return type:

tuple[Optional[float], float, AttrQuality]

Returns:

the temperature of FPGA 1

fpga2Temperature()

Return the temperature of FPGA 2.

Return type:

tuple[Optional[float], float, AttrQuality]

Returns:

the temperature of FPGA 2

fpgaFrameTime()

Return the FPGA frame time.

Return type:

str

Returns:

the FPGA timestamp, in UTC format

fpgaReferenceTime()

Return the FPGA synchronization timestamp.

Return type:

str

Returns:

the FPGA timestamp, in UTC format

fpgaTime()

Return the FPGA internal time.

Return type:

str

Returns:

the FPGA time, in UTC format

fpgasUnixTime()

Return the time for FPGAs.

Return type:

list[int]

Returns:

the time for FPGAs

healthModelParams(argin)

Set the params for health transition rules.

Parameters:

argin (str) – JSON-string of dictionary of health states

Return type:

None

init_command_objects()

Set up the handler objects for Commands.

Return type:

None

init_device()

Initialise the device.

Return type:

None

io()

Return a dictionary of I/O interfaces status available.

Return type:

str

Returns:

I/O interfaces status

ioHealth()

Read the io Health State of the device.

This is an aggregated quantity representing if any of the io monitoring points do not have a permitted value. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

io Health State of the device

isBeamformerRunning()

Check if beamformer is running.

Return type:

bool

Returns:

whether the beamformer is running

isProgrammed()

Return a flag indicating whether or not the board is programmed.

Return type:

bool

Returns:

whether or not the board is programmed

is_ConfigureTestGenerator_allowed()

Check if command is allowed.

It is allowed only in maintenance mode.

Return type:

bool

Returns:

whether the command is allowed

is_Off_allowed()

Check if command Off is allowed in the current device state.

Return type:

bool

Returns:

True if the command is allowed

is_On_allowed()

Check if command On is allowed in the current device state.

Return type:

bool

Returns:

True if the command is allowed

logicalTileId(value)

Set the logicalTileId attribute.

The logical tile id is the id of the tile in the station.

Parameters:

value (int) – the new logical tile id

Return type:

None

pendingDataRequests()

Check for pending data requests.

Return type:

bool

Returns:

whether there are data requests pending

phaseTerminalCount(value)

Set the phase terminal count.

Parameters:

value (int) – the phase terminal count

Return type:

None

pllLocked()

Report if ADC clock PLL is in locked state.

Return type:

bool

Returns:

PLL lock state

ppsDelay()

Return the delay between PPS and 10 MHz clock.

Return type:

int

Returns:

the PPS delay in nanoseconds

ppsDelayCorrection(pps_delay_correction)

Set a correction to make to the pps delay.

Note: will be applied during next initialisation.

Parameters:

pps_delay_correction (int) – a correction to apply to the pps_delay.

Return type:

None

ppsPresent()

Report if PPS signal is present at the TPM input.

Return type:

tuple[Optional[bool], float, AttrQuality]

Returns:

a tuple with attribute_value, time, quality

preaduLevels(levels)

Set attenuator level of preADU channels, one per input channel.

Parameters:

levels (ndarray) – attenuator level of preADU channels, one per input channel, in dB

Return type:

None

simulationMode(value)

Set the simulation mode.

Writing this attribute is deliberately unimplemented. The simulation mode should instead be set by setting the device’s SimulationConfig property at launch.

Parameters:

value (SimulationMode) – The simulation mode, as a SimulationMode value

Return type:

None

staticTimeDelays(delays)

Set static time delay.

Parameters:

delays (list[float]) – Delay in samples (positive = increase the signal delay) to correct for static delay mismatches, e.g. cable length.

Return type:

None

stationId(value)

Set the id of the station to which this tile is assigned.

Parameters:

value (int) – the station id

Return type:

None

sysrefPresent()

Report if SYSREF signal is present at the FPGA.

Return type:

bool

Returns:

presence of SYSREF signal

temperatureHealth()

Read the temperature Health State of the device.

This is an aggregated quantity representing if any of the temperature monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

temperature Health State of the device

temperatures()

Return all the temperature values available.

Return type:

str

Returns:

temperatures available

testGeneratorActive()

Report if the test generator is used for some channels.

Return type:

bool

Returns:

test generator status

testMode(value)

Set the test mode.

Writing this attribute is deliberately unimplemented. The test mode should instead be set by setting the device’s TestConfig property at launch.

Parameters:

value (int) – The test mode, as a TestMode value

Return type:

None

tileProgrammingState()

Get the tile programming state.

Return type:

str

Returns:

a string describing the programming state of the tile

timing()

Return a dictionary of the timing signals status.

Return type:

str

Returns:

timing signals status

timingHealth()

Read the timing Health State of the device.

This is an aggregated quantity representing if any of the timing monitoring points do not have a permitted value. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

timing Health State of the device

unpack_monitoring_point(health_structure, dictionary_path)

Unpack the monitoring point value from dictionary.

Parameters:
  • health_structure (dict[str, Any]) – A nested health_structure dictionary

  • dictionary_path (list[str]) – A list of strings used to traverse the dictionary.

Example:

>>> tile_health = {'timing': {'pps': {'status': False}}}
>>> pps = ['timing', 'pps', 'status']
>>> value = unpack_monitoring_point(tile_health, pps)
>>> print(value)
False

Return type:

Optional[Any]

Returns:

the monitoring point value or None.
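
One way such a traversal could be written is shown below. This is a sketch under assumptions, not the device's actual code: the function name unpack_monitoring_point_sketch is hypothetical, and returning None for any missing key is inferred only from the Optional return type.

```python
from typing import Any, Optional


def unpack_monitoring_point_sketch(
    health_structure: dict[str, Any], dictionary_path: list[str]
) -> Optional[Any]:
    """Walk a nested dictionary along a path of keys (hypothetical sketch)."""
    node: Any = health_structure
    for key in dictionary_path:
        # Stop as soon as the path leaves the dictionary structure.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node
```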

update_tile_health_attributes()

Update TANGO attributes from the tile health structure dictionary.

Return type:

None

voltageHealth()

Read the voltage Health State of the device.

This is an aggregated quantity representing if any of the voltage monitoring points are outside of their thresholds. This is used to compute the overall healthState of the tile.

Return type:

HealthState

Returns:

voltage Health State of the device

voltageMon()

Return the internal 5V supply of the TPM.

Return type:

float

Returns:

Internal supply of the TPM

voltages()

Return all the voltage values available.

Return type:

str

Returns:

voltages available

class MockTpm(logger)

Simulator for a pyfabil::Tpm class.

__init__(logger)

Initialise the MockTPM.

Parameters:

logger (Logger) – a logger for this simulator to use

find_register(string, display=False, info=False)

Find an item in a dictionary.

This mocks the reading of a register for the purpose of testing TPM_driver.

Parameters:
  • string (str) – Regular expression to search against

  • display (bool | None) – True to output result to console

  • info (bool | None) – for linter.

Return type:

List[RegisterInfo | None]

Returns:

registers found at address.
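
A regular-expression lookup over a dictionary of registers can be pictured as follows. This is a simplified sketch: it returns matching names rather than RegisterInfo objects, and both the helper name and the register names in the usage are hypothetical.

```python
import re


def find_matching_registers(registers: dict[str, int], pattern: str) -> list[str]:
    """Return the names of all registers matching a regular expression."""
    regex = re.compile(pattern)
    return [name for name in registers if regex.search(name)]
```

For example, with a dictionary holding the made-up keys "fpga1.example_a" and "fpga2.example_a", the pattern "fpga1" selects only the first.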

read_address(address, n=1)

Get address value.

Parameters:
  • address (int) – Memory address to read from

  • n (int) – Number of items to read

Return type:

Optional[Any]

Returns:

Values

read_register(register, n=1, offset=0)

Get register value.

Parameters:
  • register (int | str) – Memory register to read from

  • n (int) – Number of words to read

  • offset (int) – Memory address offset to read from

Return type:

Optional[Any]

Returns:

Values

property station_beamf: List[StationBeamformer]

Station beamf.

Returns:

the station_beamf.

property tpm_preadu: List[PreAdu]

Tpm pre adu.

Returns:

the preadu.

write_address(address, values, retry=True)

Write address value.

Parameters:
  • address (int) – Memory address to write

  • values (list[int]) – value to write

  • retry (bool) – retry (does nothing yet.)

Return type:

None

write_register(register, values, offset=0, retry=True)

Set register value.

Parameters:
  • register (int | str) – Register name

  • values (int) – Values to write

  • offset (int) – Memory address offset to write to

  • retry (bool) – retry

Return type:

None

Raises:

LibraryError – Attempting to set a register not in the memory address.

class TileComponentManager(simulation_mode, test_mode, logger, max_workers, tile_id, station_id, tpm_ip, tpm_cpld_port, tpm_version, subrack_fqdn, subrack_tpm_id, communication_state_changed_callback, component_state_changed_callback, _tpm_driver=None)

A component manager for a TPM (simulator or driver) and its power supply.

__init__(simulation_mode, test_mode, logger, max_workers, tile_id, station_id, tpm_ip, tpm_cpld_port, tpm_version, subrack_fqdn, subrack_tpm_id, communication_state_changed_callback, component_state_changed_callback, _tpm_driver=None)

Initialise a new instance.

Parameters:
  • simulation_mode (SimulationMode) – the simulation mode of this component manager. If SimulationMode.TRUE, then this component manager will launch an internal TPM simulator and interact with it; if SimulationMode.FALSE, this component manager will attempt to connect with an external TPM at the configured IP address and port.

  • test_mode (TestMode) – the test mode of this component manager. This has no effect when the device is in SimulationMode.FALSE. But when the simulation mode is SimulationMode.TRUE, then this determines some properties of the simulator: if the test mode is TestMode.TEST, then the simulator will return static “canned” values that are easy to assert against during testing; if TestMode.NONE, the simulator will return dynamically changing values for attributes such as temperatures and voltages, making for a nice demo but not so easy to test against.

  • logger (Logger) – a logger for this object to use

  • max_workers (int) – number of worker threads

  • tile_id (int) – the unique ID for the tile

  • station_id (int) – the unique ID for the station to which this tile belongs.

  • tpm_ip (str) – the IP address of the tile

  • tpm_cpld_port (int) – the port at which the tile is accessed for control

  • tpm_version (str) – TPM version: “tpm_v1_2” or “tpm_v1_6”

  • subrack_fqdn (str) – FQDN of the subrack that controls power to this tile

  • subrack_tpm_id (int) – This tile’s position in its subrack

  • communication_state_changed_callback (Callable[[CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes

  • component_state_changed_callback (Callable[..., None]) – callback to be called when the component state changes

  • _tpm_driver (Optional[TpmDriver]) – an optional TpmDriver to inject for testing.

apply_calibration(load_time='')

Load the calibration coefficients at the specified time delay.

Parameters:

load_time (str) – switch time as ISO formatted time

Raises:

ValueError – invalid time

Return type:

None
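
Validation of an ISO formatted load time might look like the sketch below. It assumes that an empty string means "no specific time" and that a time without a timezone is UTC; neither is stated above, and parse_load_time is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_load_time(load_time: str) -> Optional[datetime]:
    """Parse an ISO formatted time, raising ValueError if it is invalid."""
    if load_time == "":
        return None  # assumption: empty string means no specific time
    parsed = datetime.fromisoformat(load_time)  # raises ValueError on bad input
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed
```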

apply_pointing_delays(load_time='')

Load the pointing delays at the specified time delay.

Parameters:

load_time (str) – switch time as ISO formatted time

Raises:

ValueError – invalid time

Return type:

None

configure_test_generator(frequency0, amplitude0, frequency1, amplitude1, amplitude_noise, pulse_code, amplitude_pulse, load_time=None)

Test generator setting.

Parameters:
  • frequency0 (float) – Tone frequency in Hz of DDC 0

  • amplitude0 (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • frequency1 (float) – Tone frequency in Hz of DDC 1

  • amplitude1 (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • amplitude_noise (float) – Amplitude of pseudorandom noise normalized to 26.03 ADC units, resolution 0.102 ADU

  • pulse_code (int) – Code for pulse frequency. Range 0 to 7: 16,12,8,6,4,3,2 times frame frequency

  • amplitude_pulse (float) – pulse peak amplitude, normalized to 127.5 ADC units, resolution 0.5 ADU

  • load_time (Optional[str]) – Time to start the generator, as a UTC ISO formatted string.

Raises:

ValueError – invalid time specified

Return type:

None
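
The full-scale and resolution figures quoted for each amplitude imply a fixed number of steps (for the tones, 31.875 / 0.125 = 255). The sketch below snaps a requested amplitude to the nearest step. It assumes that a normalised value of 1.0 corresponds to the stated full scale; quantise_amplitude is a hypothetical helper and does not describe how the firmware encodes the value.

```python
def quantise_amplitude(normalised: float, full_scale: float, resolution: float) -> float:
    """Return the amplitude in ADC units, snapped to the nearest resolution step."""
    if not 0.0 <= normalised <= 1.0:
        raise ValueError("normalised amplitude must be in the range 0.0 to 1.0")
    steps = round(normalised * full_scale / resolution)
    return steps * resolution
```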

download_firmware(argin, task_callback=None)

Submit the download_firmware slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • argin (str) – can either be the design name returned from GetFirmwareAvailable command, or a path to a file

  • task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

A tuple containing a task status and a unique id string to identify the command

property fpga_frame_time: str

Return FPGA frame time in UTC format.

Frame time is the timestamp for the current frame being processed. The value reported here refers to the ADC frames, but the total processing delay is < 1 ms and thus irrelevant on the timescales of MCCS response time.

Returns:

FPGA frame time

property fpga_reference_time: str

Return FPGA reference time in UTC format.

Reference time is set as part of start_observation. It represents the timestamp for the first frame.

Returns:

FPGA reference time

property fpga_time: str

Return FPGA internal time in UTC format.

Returns:

FPGA internal time

property fpgas_unix_time: list[int]

Return FPGA internal Unix time.

Used to check proper synchronization.

Returns:

list of two Unix time integers

initialise(task_callback=None, program_fpga=True)

Submit the initialise slow task.

This method returns immediately after it is submitted for execution.

Parameters:
  • task_callback (Optional[Callable]) – Update task state, defaults to None

  • program_fpga (bool) – Force FPGA reprogramming, for complete initialisation

Return type:

tuple[TaskStatus, str]

Returns:

A tuple containing a task status and a unique id string to identify the command

off(task_callback=None)

Tell the upstream power supply proxy to turn the tpm off.

Parameters:

task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

a result code and a unique_id or message.

on(task_callback=None)

Tell the upstream power supply proxy to turn the tpm on.

Parameters:

task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

a result code and a unique_id or message.

post_synchronisation(task_callback=None)

Submit the post_synchronisation slow task.

This method returns immediately after it is submitted for execution.

Parameters:

task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

A tuple containing a task status and a unique id string to identify the command

send_data_samples(data_type='', start_time=None, seconds=0.2, n_samples=1024, sync=False, first_channel=0, last_channel=511, channel_id=128, frequency=100.0, round_bits=3, **params)

Front end for send_xxx_data methods.

Parameters:
  • data_type (str) – sample type. “raw”, “channel”, “channel_continuous”, “narrowband”, “beam”

  • start_time (Optional[str]) – UTC time at which to start sending data. Defaults to now

  • seconds (float) – Delay if timestamp is not specified. Default 0.2 seconds

  • n_samples (int) – number of samples to send per packet

  • sync (bool) – (raw) send synchronised antenna samples, vs. round robin

  • first_channel (int) – (channel) first channel to send, default 0

  • last_channel (int) – (channel) last channel to send, default 511

  • channel_id (int) – (channel_continuous) channel to send

  • frequency (float) – (narrowband) Sky frequency for band centre, in Hz

  • round_bits (int) – (narrowband) how many bits to round

  • params (Any) – any additional keyword arguments

Raises:

ValueError – error in time specification

Return type:

None

set_power_state(power_state)

Set the power state of the tile.

If the power state changed, re-evaluate the tile programming state and update it inside the driver. This pushes a callback if it changed.

Parameters:

power_state (PowerState) – The desired power state

Return type:

None

standby(task_callback=None)

Tell the upstream power supply proxy to turn the tpm on.

Parameters:

task_callback (Optional[Callable]) – Update task state, defaults to None

Return type:

tuple[TaskStatus, str]

Returns:

a result code, or None if there was nothing to do.

start_acquisition(task_callback=None, *, start_time=None, delay=2)

Submit the start_acquisition slow task.

Parameters:
  • task_callback (Optional[Callable]) – Update task state, defaults to None

  • start_time (Optional[str]) – the acquisition start time

  • delay (int) – a delay to the acquisition start

Return type:

tuple[TaskStatus, str]

Returns:

A tuple containing a task status and a unique id string to identify the command

start_beamformer(start_time=None, duration=-1, subarray_beam_id=-1, scan_id=0)

Start beamforming on a specific subset of the beamformed channels.

The current firmware version does not support channel mask and scan ID; these are ignored.

Parameters:
  • start_time (Optional[str]) – Start time as ISO formatted time

  • duration (int) – Scan duration, in frames, default “forever”

  • subarray_beam_id (int) – Subarray beam ID of the channels to be started. The command affects only beamformed channels for the given subarray ID. Default -1: all channels

  • scan_id (int) – ID of the scan to be started. Default 0

Raises:

ValueError – invalid time specified

Return type:

None

start_communicating()

Establish communication with the tpm and the upstream power supply.

Return type:

None

stop_communicating()

Break off communication with the tpm and the upstream power supply.

Return type:

None

property tpm_status: TpmStatus

Return the TPM status.

Returns:

the TPM status

update_tpm_power_state(power_state)

Update the power state, calling callbacks as required.

If power state is ON, then the TPM is checked for initialisation, and initialised if not already so.

Parameters:

power_state (PowerState) – the new power state of the component. This can be None, in which case the internal value is updated but no callback is called. This is useful to ensure that the callback is called next time a real value is pushed.

Return type:

None

class TileData

This class contains data/facts about a tile needed by multiple classes.

For example the channelized sample and beamformer frame period, the number of antennas per tile. So rather than store this fact in separate places, we store it here.

classmethod generate_tile_defaults()

Compute the default values for tile monitoring points.

These are computed to be halfway between the minimum and maximum values. In cases where there is a permitted value instead, the permitted value is used

Return type:

dict[str, Any]

Returns:

the default values for tile monitoring points
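
The rule described above (midpoint of the range, or the permitted value where one is given) can be sketched as follows. The layout of the limits dictionary, with "min", "max" and "permitted" keys, is an assumption made for this example and may not match the real tile data; midpoint_defaults is a hypothetical helper.

```python
from typing import Any


def midpoint_defaults(limits: dict[str, Any]) -> dict[str, Any]:
    """Build default values from a nested dictionary of limits (hypothetical layout)."""
    defaults: dict[str, Any] = {}
    for name, spec in limits.items():
        if "permitted" in spec:
            # A permitted value is used directly.
            defaults[name] = spec["permitted"]
        elif "min" in spec and "max" in spec:
            # Otherwise take the point halfway between minimum and maximum.
            defaults[name] = (spec["min"] + spec["max"]) / 2
        else:
            # Anything else is treated as a nested group of monitoring points.
            defaults[name] = midpoint_defaults(spec)
    return defaults
```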

classmethod get_tile_defaults()

Get the defaults for tile monitoring points.

If these have not been computed, compute them first.

Return type:

dict[str, Any]

Returns:

the defaults for tile monitoring points.

class TileHealthModel(health_changed_callback, thresholds=None)

A health model for a tile.

At present this uses the base health model; this is a placeholder for a future, better implementation.

__init__(health_changed_callback, thresholds=None)

Initialise a new instance.

Parameters:
  • health_changed_callback (HealthChangedCallbackProtocol) – callback to be called whenever there is a change to this health model’s evaluated health state.

  • thresholds (Optional[dict[str, Any]]) – the threshold parameters for the health rules

evaluate_health()

Compute overall health of the station.

The overall health is based on the fault and communication status of the station overall, together with the health of the tiles that it manages.

This implementation simply sets the health of the station to the health of its least healthy component.

Return type:

HealthState

Returns:

an overall health of the station
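
The "least healthy component" rule amounts to taking the worst value over an ordered set of health states. The sketch below uses a stand-in enumeration rather than the real HealthState; the ordering (higher is worse) and the OK result for an empty list are assumptions.

```python
from enum import IntEnum


class Health(IntEnum):
    """Stand-in for HealthState; this ordering (higher is worse) is an assumption."""

    OK = 0
    DEGRADED = 1
    FAILED = 2


def least_healthy(component_healths: list[Health]) -> Health:
    """Return the worst health among the components, or OK if there are none."""
    return max(component_healths, default=Health.OK)
```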

property health_params: dict[str, Any]

Get the thresholds for health rules.

Returns:

the thresholds for health rules

class TileSimulator(logger)

This attempts to simulate pyaavs Tile.

This is used for testing the tpm_driver. It implements __getitem__ and __setitem__ so that the TileSimulator can interface with the TPMSimulator in the same way as the AAVS Tile interfaces with the pyfabil TPM. Instead of writing to a register we write to a dictionary. It overrides read_address, write_address, read_register and write_register for simplicity.
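
The idea of standing in for register access with a dictionary can be pictured with the minimal sketch below. The class DictBackedRegisters and the register name used with it are hypothetical; the real simulator has more behaviour than this.

```python
class DictBackedRegisters:
    """Store register values in a dictionary instead of hardware (hypothetical sketch)."""

    def __init__(self) -> None:
        self._registers: dict[str, int] = {}

    def __getitem__(self, name: str) -> int:
        # Reading an unknown register raises KeyError in this sketch.
        return self._registers[name]

    def __setitem__(self, name: str, value: int) -> None:
        self._registers[name] = value

    def read_register(self, name: str) -> int:
        """Read through the same item interface used by callers."""
        return self[name]

    def write_register(self, name: str, value: int) -> None:
        """Write through the same item interface used by callers."""
        self[name] = value
```

Code written against the item interface then behaves the same whether the backing store is hardware or a dictionary.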

__init__(logger)

Initialise a new TPM simulator instance.

Parameters:

logger (Logger) – a logger for this simulator to use

beamformer_is_running()

Beamformer is running.

Return type:

bool

Returns:

whether the beamformer is running

check_arp_table(timeout=30.0)

Check arp table.

Parameters:

timeout (float) – Timeout in seconds

Return type:

bool

Returns:

a bool representing if arp table is healthy.

check_global_status_alarms()

Check global status alarms.

Return type:

dict[str, int]

Returns:

a dictionary with the simulated alarm status.

check_pending_data_requests()
Return type:

bool

Returns:

the pending data requests flag.

compute_calibration_coefficients()

Compute calibration coefficients.

Return type:

None

configure_40g_core(core_id=0, arp_table_entry=0, src_mac=None, src_ip=None, src_port=None, dst_ip=None, dst_port=None, rx_port_filter=None, netmask=None, gateway_ip=None)

Configure the 40G core.

The dst_mac parameter is ignored in true 40G core (ARP resolution used instead).

Parameters:
  • core_id (int) – id of the core

  • arp_table_entry (int) – ARP table entry to use

  • src_mac (Optional[int]) – MAC address of the source

  • src_ip (Optional[str]) – IP address of the source

  • src_port (Optional[int]) – port of the source

  • dst_ip (Optional[str]) – IP address of the destination

  • dst_port (Optional[int]) – port of the destination

  • rx_port_filter (Optional[int]) – Filter for incoming packets

  • netmask (Optional[int]) – Netmask

  • gateway_ip (Optional[int]) – Gateway IP

Raises:

ValueError – when the core_id is not 0 or 1

Return type:

None

configure_integrated_beam_data(integration_time=0.5, first_channel=0, last_channel=192)

Configure and start continuous integrated beam data.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

None

configure_integrated_channel_data(integration_time=0.5, first_channel=0, last_channel=512)

Configure and start continuous integrated channel data.

TODO Implement generation of integrated packets

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

None

connect(initialise=False, load_plugin=True, enable_ada=False, enable_adc=True, dsp_core=True, adc_mono_channel_14_bit=False, adc_mono_channel_sel=0)

Attempt to form a connection with TPM.

Parameters:
  • initialise (bool) – Initialises the TPM object

  • load_plugin (bool) – loads software plugins

  • enable_ada (bool) – Enable ADC amplifier (usually not present)

  • enable_adc (bool) – Enable ADC

  • dsp_core (bool) – Enable loading of DSP core plugins

  • adc_mono_channel_14_bit (bool) – Enable ADC mono channel 14bit mode

  • adc_mono_channel_sel (int) – Select channel in mono channel mode (0=A, 1=B)

Return type:

None

current_tile_beamformer_frame()
Return type:

int

Returns:

beamformer frame.

define_spead_header(station_id, subarray_id, nof_antennas, ref_epoch=-1, start_time=0)

Define the SPEAD header for the given parameters.

Parameters:
  • station_id (int) – The ID of the station.

  • subarray_id (int) – The ID of the subarray.

  • nof_antennas (int) – Number of antennas in the station

  • ref_epoch (int) – Unix time of epoch. -1 uses value defined in set_epoch

  • start_time (Optional[int]) – start time

Return type:

bool

Returns:

a bool representing if command executed without error.

erase_fpga(force=True)

Erase the fpga firmware.

Parameters:

force (bool) – force the erase.

Return type:

None

get_40g_core_configuration(core_id=-1, arp_table_entry=0)

Return a 40G configuration.

Parameters:
  • core_id (int) – id of the core for which a configuration is to be returned. Defaults to -1, in which case all core configurations are returned

  • arp_table_entry (int) – ARP table entry to use

Return type:

UnionType[dict, list[dict], None]

Returns:

core configuration or list of core configurations or none
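
The three possible return shapes (one configuration, all configurations, or none) can be sketched as below. The helper select_core_configuration and the "core_id" key inside each configuration dictionary are assumptions for this example only.

```python
from typing import Union


def select_core_configuration(
    configs: list[dict], core_id: int = -1
) -> Union[dict, list[dict], None]:
    """Return all configurations for -1, else the matching one, else None."""
    if core_id == -1:
        return configs
    for config in configs:
        if config.get("core_id") == core_id:
            return config
    return None
```

Callers therefore need to check whether they received a list, a single dictionary, or None.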

get_adc_rms(sync=False)

Get ADC power, immediate.

Parameters:

sync (Optional[bool]) – Synchronise RMS read

Return type:

list[float]

Returns:

the mock ADC rms values.

get_arp_table()

Get arp table.

Return type:

dict[int, list[int]]

Returns:

the ARP table

get_firmware_list()
Return type:

List[dict[str, Any]]

Returns:

firmware list.

get_fpga_time(device)
Parameters:

device (Device) – device.

Return type:

int

Returns:

the fpga_time.

Raises:

LibraryError – If invalid device specified.

get_fpga_timestamp(device=pyfabil.base.definitions.Device.FPGA_1)

Get timestamp from FPGA.

Parameters:

device (Device) – device.

Return type:

int

Returns:

the simulated timestamp.

Raises:

LibraryError – Invalid device specified

get_health_status(**kwargs)

Get the health state of the tile.

Parameters:

kwargs (Any) – Any kwargs to identify health group. see aavs-system.Tile

Return type:

dict[str, Any]

Returns:

mocked fetch of health.

get_phase_terminal_count()

Get PPS phase terminal count.

Return type:

int

Returns:

the simulated phase terminal count.

get_pps_delay(enable_correction=True)

Get the pps delay.

Parameters:

enable_correction (bool) – enable correction.

Return type:

int

Returns:

the pps delay.

get_preadu_levels()

Get preADU attenuation levels.

Return type:

list[float]

Returns:

Attenuation levels corresponding to each ADC channel, in dB.

get_station_id()

Get station ID.

Returns:

station ID programmed in HW

Return type:

int

get_tile_id()
Return type:

int

Returns:

the mocked tile_id.

initialise(station_id=0, tile_id=0, lmc_use_40g=False, lmc_dst_ip=None, lmc_dst_port=4660, lmc_integrated_use_40g=False, src_ip_fpga1=None, src_ip_fpga2=None, dst_ip_fpga1=None, dst_ip_fpga2=None, src_port=4661, dst_port=4660, dst_port_single_port_mode=4662, rx_port_single_port_mode=4662, netmask_40g=None, gateway_ip_40g=None, active_40g_ports_setting='port1-only', enable_adc=True, enable_ada=False, enable_test=False, use_internal_pps=False, pps_delay=0, time_delays=0, is_first_tile=False, is_last_tile=False, qsfp_detection='auto', adc_mono_channel_14_bit=False, adc_mono_channel_sel=0)

Initialise tile.

Parameters:
  • station_id (int) – station ID

  • tile_id (int) – Tile ID in the station

  • lmc_use_40g (bool) – if True use 40G interface to transmit LMC data, otherwise use 1G

  • lmc_dst_ip (Optional[str]) – destination IP address for LMC data packets

  • lmc_dst_port (int) – destination UDP port for LMC data packets

  • lmc_integrated_use_40g (bool) – if True use 40G interface to transmit LMC integrated data, otherwise use 1G

  • src_ip_fpga1 (Optional[str]) – source IP address for FPGA1 40G interface

  • src_ip_fpga2 (Optional[str]) – source IP address for FPGA2 40G interface

  • dst_ip_fpga1 (Optional[str]) – destination IP address for beamformed data from FPGA1 40G interface

  • dst_ip_fpga2 (Optional[str]) – destination IP address for beamformed data from FPGA2 40G interface

  • src_port (int) – source UDP port for beamformed data packets

  • dst_port (int) – destination UDP port for beamformed data packets

  • enable_ada (bool) – enable adc amplifier, Not present in most TPM versions

  • enable_adc (bool) – Enable ADC

  • active_40g_ports_setting (str) – placeholder docstring

  • dst_port_single_port_mode (int) – placeholder docstring

  • gateway_ip_40g (Optional[str]) – placeholder docstring

  • netmask_40g (Optional[str]) – placeholder docstring

  • rx_port_single_port_mode (int) – placeholder docstring

  • enable_test (bool) – setup internal test signal generator instead of ADC

  • use_internal_pps (bool) – use internal PPS generator synchronised across FPGAs

  • pps_delay (int) – PPS delay correction in 625ps units

  • time_delays (int) – time domain delays for 32 inputs

  • is_first_tile (bool) – True if this tile is the first tile in the beamformer chain

  • is_last_tile (bool) – True if this tile is the last tile in the beamformer chain

  • qsfp_detection (str) – QSFP cable detection mode: “auto” detects QSFP cables automatically; “qsfp1” forces QSFP1 cable detected, QSFP2 cable not detected; “qsfp2” forces QSFP1 cable not detected, QSFP2 cable detected; “all” forces QSFP1 and QSFP2 cables detected; “flyover_test” forces QSFP1 and QSFP2 cables detected and adjusts polarity for board-to-board cable; “none” forces no cable detected

  • adc_mono_channel_14_bit (bool) – Enable ADC mono channel 14bit mode

  • adc_mono_channel_sel (int) – Select channel in mono channel mode (0=A, 1=B)

Return type:

None

initialise_beamformer(start_channel, nof_channels)

Mock set the beamformer parameters.

Parameters:
  • start_channel (int) – start_channel

  • nof_channels (int) – nof_channels

Raises:

ValueError – For out of range values.

Return type:

None

is_programmed()

Return whether the mock has been programmed.

Return type:

bool

Returns:

the mocked programmed state

load_antenna_tapering(beam, tapering_coefficients)

Load antenna tapering.

Parameters:
  • beam (int) – beam

  • tapering_coefficients (list[int]) – tapering coefficients

Return type:

None

load_beam_angle(angle_coefficients)

Load beam angle.

Parameters:

angle_coefficients (list[float]) – angle coefficients.

Return type:

None

load_calibration_coefficients(antenna, calibration_coefficients)

Load calibration coefficients.

calibration_coefficients is a bi-dimensional complex array of the form calibration_coefficients[channel, polarization], with each element representing a normalized coefficient, with (1.0, 0.0) the normal, expected response for an ideal antenna. Channel is the index specifying the channels at the beamformer output, i.e. considering only those channels actually processed and beam assignments. The polarization index ranges from 0 to 3:

  • 0: X polarization direct element

  • 1: X->Y polarization cross element

  • 2: Y->X polarization cross element

  • 3: Y polarization direct element

The calibration coefficients may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:
  • antenna (int) – Antenna number (0-15)

  • calibration_coefficients (list[complex]) – Calibration coefficient array

Return type:

None
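
To illustrate the layout described above, the following sketch builds the coefficients for an ideal antenna: unity on the two direct elements and zero on the two cross elements, for every channel. The helper names `ideal_coefficients` and `flatten`, the channel count of 384, and the row-major flattening into a list[complex] are assumptions made for this example; they are not taken from the library.

```python
def ideal_coefficients(n_channels: int) -> list[list[complex]]:
    """Return coefficients[channel][polarization] for an ideal antenna."""
    # Index 0 (X direct) and 3 (Y direct) are unity;
    # indices 1 and 2 (the cross elements) are zero.
    ideal = [1 + 0j, 0j, 0j, 1 + 0j]
    return [list(ideal) for _ in range(n_channels)]


def flatten(coefficients: list[list[complex]]) -> list[complex]:
    """Flatten channel by channel (assumed ordering: polarization varies fastest)."""
    return [value for row in coefficients for value in row]


coefficients = ideal_coefficients(384)  # 384 channels is an assumed example size
flat = flatten(coefficients)
```

With this ordering, the coefficient for a given channel and polarization sits at index `4 * channel + polarization` in the flattened list.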

load_pointing_delay(load_time=0, load_delay=64)

Load pointing delay.

Parameters:
  • load_time (int) – load time

  • load_delay (int) – delay (in ADC frames/256) to apply when load_time == 0

Return type:

None

mock_off()

Fake a connection by constructing the TPM.

NOTE:

This method exists in the Simulator Only

Return type:

None

mock_on()

Fake a connection by constructing the TPM.

NOTE:

This method exists in the Simulator Only

Return type:

None

program_fpgas(bitfile)

Mock programmed state to True.

Parameters:

bitfile (str) – the name of the bitfile to download

Raises:

LibraryError – if bitfile is None.

Return type:

None

reset_eth_errors()

Reset Ethernet errors.

Return type:

None

send_beam_data(timeout=0, timestamp=0, seconds=0.2)

Send beam data.

Parameters:
  • timeout (int) – timeout

  • timestamp (int) – timestamp

  • seconds (float) – When to synchronise

Return type:

None

send_channelised_data(number_of_samples=1024, first_channel=0, last_channel=511, timestamp=None, seconds=0.2)

Send channelised data from the TPM.

Parameters:
  • number_of_samples (int) – Number of spectra to send

  • first_channel (int) – First channel to send

  • last_channel (int) – Last channel to send

  • timestamp (Optional[int]) – When to start transmission

  • seconds (float) – When to synchronise

Return type:

None

send_channelised_data_continuous(channel_id, number_of_samples=128, wait_seconds=0, timestamp=None, seconds=0.2)

Continuously send channelised data from a single channel.

Parameters:
  • channel_id (int) – Channel ID

  • number_of_samples (int) – Number of spectra to send

  • wait_seconds (int) – Wait time before sending data

  • timestamp (Optional[int]) – When to start

  • seconds (float) – When to synchronise

Return type:

None

send_channelised_data_narrowband(frequency, round_bits, number_of_samples=128, wait_seconds=0, timestamp=None, seconds=0.2)

Continuously send narrowband channelised data centred on a sky frequency.

Parameters:
  • frequency (int) – Sky frequency to transmit

  • round_bits (int) – Specify which bits to round

  • number_of_samples (int) – Number of spectra to send

  • wait_seconds (int) – Wait time before sending data

  • timestamp (Optional[int]) – When to start

  • seconds (float) – When to synchronise

Return type:

None

send_raw_data(sync=False, timestamp=None, seconds=0.2, fpga_id=None)

Send raw data.

Return type:

None

set_beamformer_regions(region_array)

Set beamformer region_array.

Parameters:

region_array (list[list[int]]) – region_array

Return type:

None

set_channeliser_truncation(trunc, signal=None)

Set the channeliser coefficients to modify the bandpass.

Parameters:
  • trunc (list[int]) – list with M values, one for each of the frequency channels. Same truncation is applied to the corresponding frequency channels in all inputs.

  • signal (Optional[int]) – Input signal, 0 to 31. If None, apply to all

Return type:

None

set_csp_rounding(rounding)

Set the final rounding in the CSP samples, one value per beamformer channel.

Parameters:

rounding (list[int]) – Number of bits rounded in final 8 bit requantization to CSP

Return type:

bool

Returns:

True if the write succeeded.

set_first_last_tile(is_first, is_last)

Set whether this tile is the first or last tile in the beamformer chain.

Parameters:
  • is_first (bool) – true if first

  • is_last (bool) – true if last

Return type:

bool

Returns:

a bool representing if command executed without error.

set_lmc_download(mode, payload_length=1024, dst_ip=None, src_port=61648, dst_port=4660, netmask_40g=None, gateway_ip_40g=None)

Specify where the control data will be transmitted.

With the simulator no traffic will leave the cluster. To transmit data from the pod hosting the simulator to the DAQ (data acquisition) receiver, a Kubernetes service is required. Therefore dst_ip is the name of the service to use rather than the IP.

Parameters:
  • mode (str) – “1G” or “10G”

  • payload_length (int) – SPEAD payload length for integrated channel data, defaults to 1024

  • dst_ip (Optional[str]) – destination service.

  • src_port (Optional[int]) – source port, defaults to 0xF0D0

  • dst_port (Optional[int]) – destination port, defaults to 4660

  • netmask_40g (Optional[str]) – the mask to apply

  • gateway_ip_40g (Optional[str]) – the gateway ip.

Return type:

None
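
The signature shows the port defaults in decimal while the parameter description gives the source port in hexadecimal. The short sketch below checks that the two notations agree and assembles an illustrative set of keyword arguments. The service name `daq-receiver` is a hypothetical placeholder, and the dictionary is only an example of how the arguments might be gathered.

```python
# Defaults as they appear in the signature (decimal) and the description (hex).
DEFAULT_SRC_PORT = 0xF0D0  # 61648 in decimal
DEFAULT_DST_PORT = 4660    # 0x1234 in hexadecimal

lmc_download_kwargs = {
    "mode": "10G",
    "payload_length": 1024,
    # With the simulator, dst_ip names a Kubernetes service rather than an IP.
    "dst_ip": "daq-receiver",  # hypothetical service name
    "src_port": DEFAULT_SRC_PORT,
    "dst_port": DEFAULT_DST_PORT,
}
```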

set_lmc_integrated_download(mode, channel_payload_length, beam_payload_length, dst_ip=None, src_port=61648, dst_port=4660, netmask_40g=None, gateway_ip_40g=None)

Configure link and size of control data for integrated LMC packets.

Parameters:
  • mode (str) – ‘1G’ or ‘10G’

  • channel_payload_length (int) – SPEAD payload length for integrated channel data

  • beam_payload_length (int) – SPEAD payload length for integrated beam data

  • dst_ip (Optional[str]) – Destination IP

  • src_port (int) – Source port for integrated data streams

  • dst_port (int) – Destination port for integrated data streams

  • netmask_40g (Optional[str]) – the mask to apply to the 40g.

  • gateway_ip_40g (Optional[str]) – the gateway ip for the 40g.

Return type:

None

set_pointing_delay(delay_array, beam_index)

Set pointing delay.

Parameters:
  • delay_array (list[list[float]]) – delay array

  • beam_index (int) – beam index

Return type:

None

set_preadu_levels(levels)

Set preADU attenuation levels.

Parameters:

levels (list[float]) – Desired attenuation levels for each ADC channel, in dB.

Return type:

None

set_station_id(station_id, tile_id)

Set mock registers to some value.

Parameters:
  • tile_id (int) – tile_id

  • station_id (int) – station_id

Return type:

None

set_test_generator_pulse(freq_code, amplitude=0.0)

Set test generator pulse.

Parameters:
  • freq_code (int) – Code for pulse frequency. Range 0 to 7: 16,12,8,6,4,3,2 times frame frequency

  • amplitude (float) – Tone peak amplitude, normalized to 127.5 ADC units, resolution 0.5 ADU

Return type:

None

set_time_delays(delays)

Set coarse zenith delay for input ADC streams.

Parameters:

delays (list[float]) – the delay in input streams, specified in nanoseconds. A positive delay adds delay to the signal stream

Return type:

bool

Returns:

True if command executed to completion.

start_acquisition(start_time=None, delay=2, tpm_start_time=None)

Start data acquisition.

Parameters:
  • start_time (Optional[int]) – Time for starting (frames)

  • delay (int) – delay after start_time (frames)

  • tpm_start_time (Optional[int]) – TPM will act as if it is started at this time (seconds)

Return type:

None

start_beamformer(start_time=0, duration=-1, scan_id=0, mask=1099511627775)

Start beamformer.

Parameters:
  • start_time (int) – start time UTC

  • duration (int) – duration

  • scan_id (int) – ID of the scan, to be specified in the CSP SPEAD header

  • mask (int) – Bitmask of the channels to be started. Unsupported by FW

Return type:

bool

Returns:

true if the beamformer was started successfully.
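
The default mask in the signature is easier to read in binary terms. This small check shows that 1099511627775 is a value with the 40 least significant bits set. The description above notes that the mask is unsupported by the firmware, so this is for orientation only.

```python
DEFAULT_MASK = 1099511627775

# The default is all ones across the lowest 40 bits.
mask_hex = f"{DEFAULT_MASK:#x}"
bits_set = bin(DEFAULT_MASK).count("1")
```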

stop_beamformer()

Stop beamformer.

Return type:

None

stop_data_transmission()

Stop data transmission.

Return type:

None

stop_integrated_data()

Stop integrated data.

Return type:

None

switch_calibration_bank(switch_time=0)

Switch calibration bank.

Parameters:

switch_time (int) – switch time

Return type:

None

test_generator_input_select(inputs)

Test generator input select.

Parameters:

inputs (int) – inputs

Return type:

None

test_generator_set_noise(amplitude_noise=0.0, load_time=0)

Set generator test noise.

Parameters:
  • amplitude_noise (float) – amplitude of noise

  • load_time (int) – load time

Return type:

None

test_generator_set_tone(generator, frequency=100000000.0, amplitude=0.0, phase=0.0, load_time=0)

Test generator tone setting.

Parameters:
  • generator (int) – generator select. 0 or 1

  • frequency (float) – Tone frequency in Hz

  • amplitude (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • phase (float) – Initial tone phase, in turns

  • load_time (int) – Time to start the tone.

Return type:

None
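
The amplitude parameters of the test generator are given as a normalized value together with a full scale and a resolution in ADC units (ADU). The sketch below shows one way to read those figures: a normalized amplitude is scaled to the full scale and rounded to the nearest resolution step. The function name `quantise_amplitude`, the accepted range of 0.0 to 1.0 and the round-to-nearest behaviour are assumptions for illustration; the hardware may behave differently.

```python
def quantise_amplitude(
    amplitude: float, full_scale_adu: float, resolution_adu: float
) -> float:
    """Return the representable amplitude in ADU for a normalized amplitude."""
    if not 0.0 <= amplitude <= 1.0:
        raise ValueError("amplitude must lie between 0.0 and 1.0")
    # Number of whole resolution steps closest to the requested amplitude.
    steps = round(amplitude * full_scale_adu / resolution_adu)
    return steps * resolution_adu


# Tone: full scale 31.875 ADU in steps of 0.125 ADU (255 steps).
tone_full = quantise_amplitude(1.0, 31.875, 0.125)
tone_quarter = quantise_amplitude(0.25, 31.875, 0.125)

# Pulse: full scale 127.5 ADU in steps of 0.5 ADU (255 steps).
pulse_full = quantise_amplitude(1.0, 127.5, 0.5)
```

Both the tone and the pulse figures work out to 255 steps at full scale, which is consistent with an 8-bit amplitude setting.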

class TileTime(reference_time=0)

Library to convert from rfc3339 strings to internal Tile time and back.

Frame time is the time expressed as an offset in frames from a timestamp. Unix time is used for the timestamp. It is expressed in integer seconds from 1970-01-01T00:00:00.000000Z. Time is otherwise expressed as an ISO-8601 (RFC3339) string, e.g. 2021-03-02T12:34:56.789000Z.

__init__(reference_time=0)

Set the internal reference time. To be used each time it is reset.

Parameters:

reference_time (int) – Unix timestamp of the (integer) reference time

format_time_from_frame(frame_count)

Format a time expressed as a frame count into an ISO-8601 (RFC3339) string.

e.g. 2021-03-02T12:34:56.789000Z. Returns the Unix zero time if the object is not yet initialised.

Parameters:

frame_count (int) – Frame number expressed as a multiple of 256 channelised samples

Return type:

str

Returns:

ISO-8601 formatted time
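
The frame-to-time conversion can be sketched as the reference time plus the frame count multiplied by the frame period. The frame period used here, 256 channelised samples of 1.08 microseconds each (276.48 microseconds), is an assumption for this example and is not stated in this reference. The standalone function below only mirrors the method name and is not the library implementation.

```python
from datetime import datetime, timedelta, timezone

RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Assumed frame period: 256 channelised samples of 1080 ns each.
FRAME_PERIOD_NS = 256 * 1080


def format_time_from_frame(reference_time: int, frame_count: int) -> str:
    """Format reference_time (Unix seconds) plus frame_count frames as a string."""
    # Integer arithmetic avoids accumulating floating point error.
    offset_us = frame_count * FRAME_PERIOD_NS // 1000
    moment = datetime.fromtimestamp(reference_time, tz=timezone.utc)
    moment += timedelta(microseconds=offset_us)
    return moment.strftime(RFC_FORMAT)


start = format_time_from_frame(0, 0)
later = format_time_from_frame(1614688496, 100000)
```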

format_time_from_timestamp(timestamp)

Format a time expressed as a Unix timestamp into ISO-8601.

Format a time expressed as a Unix timestamp into a properly formatted ISO-8601 (RFC3339) string, e.g. 2021-03-02T12:34:56.789000Z.

Parameters:

timestamp (int) – Unix timestamp of the (integer) reference time

Returns:

ISO-8601 formatted time

Return type:

str
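
The conversion between Unix timestamps and RFC3339 strings can be sketched with the standard library alone. The functions below are standalone illustrations that mirror the method names of this class. They are not the library implementation, and the fixed format string with a trailing "Z" is an assumption based on the examples in this section.

```python
from datetime import datetime, timezone

RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def format_time_from_timestamp(timestamp: int) -> str:
    """Format a Unix timestamp (integer seconds) as an RFC3339 string."""
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return moment.strftime(RFC_FORMAT)


def timestamp_from_utc_time(utc_time: str) -> int:
    """Return the first Unix second at or after utc_time, or -1 on error."""
    try:
        moment = datetime.strptime(utc_time, RFC_FORMAT)
    except ValueError:
        return -1
    delta = moment.replace(tzinfo=timezone.utc) - EPOCH
    seconds = delta.days * 86400 + delta.seconds
    # Round up when there is a fractional part, so the result is never early.
    return seconds + 1 if delta.microseconds else seconds


rounded_up = timestamp_from_utc_time("2021-03-02T12:34:56.789000Z")
round_trip = format_time_from_timestamp(rounded_up)
```

Like the method described in this class, the sketch takes no account of leap seconds.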

frame_from_utc_time(utc_time)

Return first frame after specified time.

Parameters:

utc_time (str) – UTC time in standard RFC3339 format

Return type:

int

Returns:

frame number at or after the specified time; -1 on error

set_reference_time(reference_time)

Set the internal reference time. To be used each time it is reset.

Parameters:

reference_time (int) – Unix timestamp of the (integer) reference time

Return type:

None

timestamp_from_utc_time(utc_time)

Return first timestamp (Unix) second after specified time.

Does not account for leap seconds, utc_time must avoid them.

Parameters:

utc_time (str) – UTC time in standard RFC3339 format

Return type:

int

Returns:

Unix timestamp at or after the specified time; -1 on error

class TpmDriver(logger, tile_id, station_id, tile, tpm_version, communication_state_changed_callback, component_state_changed_callback)

Hardware driver for a TPM.

__init__(logger, tile_id, station_id, tile, tpm_version, communication_state_changed_callback, component_state_changed_callback)

Initialise a new TPM driver instance passing in the Tile object.

Parameters:
  • logger (Logger) – a logger for this driver to use

  • tile_id (int) – the unique ID for the tile

  • station_id (int) – the unique ID for the station to which this tile belongs.

  • tile (Tile) – the tile driven by this TpmDriver

  • tpm_version (str) – TPM version: “tpm_v1_2” or “tpm_v1_6”

  • communication_state_changed_callback (Callable[[CommunicationStatus], None]) – callback to be called when the status of the communications channel between the component manager and its component changes

  • component_state_changed_callback (Callable[..., None]) – callback to be called when the component state changes.

property adc_rms: list[float]

Return the last measured RMS power of the TPM’s analog-to-digital converter.

Returns:

the RMS power of the TPM’s ADC

property adcs: dict[str, Any]

Return the ADC status in the TPM.

Returns:

ADC status in the TPM

property alarms: dict[str, Any]

Return the alarms status in the TPM.

Returns:

alarms status in the TPM

apply_calibration(switch_time=0)

Switch the calibration bank.

(i.e. apply the calibration coefficients previously loaded by load_calibration_coefficients()).

Parameters:

switch_time (Optional[int]) – an optional time at which to perform the switch

Return type:

None

apply_pointing_delays(load_time)

Load the pointing delay at a specified time.

Parameters:

load_time (int) – time at which to load the pointing delay

Return type:

None

property arp_table: dict[int, list[int]]

Check that the ARP table has been populated for all used cores.

40G interfaces use cores 0 (fpga0) and 1 (fpga1), with ARP ID 0 for the beamformer and 1 for LMC. 10G interfaces use cores 0, 1 (fpga0) and 4, 5 (fpga1) for beamforming, and 2, 6 for LMC, with only one ARP.

Returns:

core IDs and their populated ARP table entries

property beamformer_table: list[list[int]]

Fetch internal beamformer table.

Fetch the table used by the hardware beamformer to define beams and logical bands.

Returns:

bidimensional table, with 48 entries, one every 8 channels

  • start physical channel

  • tile hardware beam

  • subarray ID

  • subarray start logical channel

  • subarray_beam_id - (int) ID of the subarray beam

  • substation_id - (int) Substation

  • aperture_id: ID of the aperture (station*100+substation?)

property board_temperature: float

Return the temperature of the TPM.

Returns:

the temperature of the TPM

property channeliser_truncation: list[int]

Read the cached value for the channeliser truncation.

Returns:

cached value for the channeliser truncation

property clock_present: bool

Check if 10 MHz clock signal is present.

Returns:

True if 10 MHz clock is present. Checked in poll loop, cached

configure_40g_core(core_id=0, arp_table_entry=0, src_mac=None, src_ip=None, src_port=None, dst_ip=None, dst_port=None, rx_port_filter=None, netmask=None, gateway_ip=None)

Configure the 40G core.

Parameters:
  • core_id (int) – id of the core

  • arp_table_entry (int) – ARP table entry to use

  • src_mac (Optional[int]) – MAC address of the source

  • src_ip (Optional[str]) – IP address of the source

  • src_port (Optional[int]) – port of the source

  • dst_ip (Optional[str]) – IP address of the destination

  • dst_port (Optional[int]) – port of the destination

  • rx_port_filter (Optional[int]) – Filter for incoming packets

  • netmask (Optional[int]) – Netmask

  • gateway_ip (Optional[int]) – Gateway IP

Return type:

None
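
The signature types src_mac, netmask and gateway_ip as integers, while addresses are usually written as strings. The sketch below shows one conventional way to obtain integer forms using the standard library. The helper name `mac_to_int` and the example addresses are hypothetical, and whether the driver expects exactly this encoding is an assumption.

```python
import ipaddress


def mac_to_int(mac: str) -> int:
    """Convert a colon-separated MAC address string to an integer."""
    octets = mac.split(":")
    if len(octets) != 6 or any(len(octet) != 2 for octet in octets):
        raise ValueError(f"not a MAC address: {mac!r}")
    # int() raises ValueError if any octet is not hexadecimal.
    return int("".join(octets), 16)


src_mac = mac_to_int("62:00:0a:01:02:03")          # example address only
netmask = int(ipaddress.IPv4Address("255.255.255.0"))
gateway_ip = int(ipaddress.IPv4Address("10.0.0.1"))  # example address only
```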

configure_integrated_beam_data(integration_time=0.5, first_channel=0, last_channel=191)

Configure and start the transmission of integrated beam data.

Configure with the provided integration time, first channel and last channel. Data are sent continuously until the StopIntegratedData command is run.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

None

configure_integrated_channel_data(integration_time=0.5, first_channel=0, last_channel=511)

Configure and start the transmission of integrated channel data.

Configure with the provided integration time, first channel and last channel. Data are sent continuously until the StopIntegratedData command is run.

Parameters:
  • integration_time (float) – integration time in seconds, defaults to 0.5

  • first_channel (int) – first channel

  • last_channel (int) – last channel

Return type:

None

configure_test_generator(frequency0, amplitude0, frequency1, amplitude1, amplitude_noise, pulse_code, amplitude_pulse, load_time=0)

Test generator setting.

Parameters:
  • frequency0 (float) – Tone frequency in Hz of DDC 0

  • amplitude0 (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • frequency1 (float) – Tone frequency in Hz of DDC 1

  • amplitude1 (float) – Tone peak amplitude, normalized to 31.875 ADC units, resolution 0.125 ADU

  • amplitude_noise (float) – Amplitude of pseudorandom noise normalized to 26.03 ADC units, resolution 0.102 ADU

  • pulse_code (int) – Code for pulse frequency. Range 0 to 7: 16,12,8,6,4,3,2 times frame frequency

  • amplitude_pulse (float) – pulse peak amplitude, normalized to 127.5 ADC units, resolution 0.5 ADU

  • load_time (int) – Time to start the generator.

Return type:

None

property csp_rounding: numpy.ndarray | None

Read the cached value for the final rounding in the CSP samples.

Needs to be specified only for the last tile.

Returns:

Final rounding for the CSP samples. Up to 384 values

property current_tile_beamformer_frame: int

Return current tile beamformer frame, in units of 256 ADC frames.

Returns:

current tile beamformer frame

property currents: dict[str, Any]

Return a dictionary of all current values available in the TPM.

Returns:

currents in the TPM

download_firmware(bitfile)

Download the provided firmware bitfile onto the TPM.

Parameters:

bitfile (str) – a binary firmware blob

Return type:

None

property dsp: dict[str, Any]

Return the tile beamformer and station beamformer status in the TPM.

Returns:

tile beamformer and station beamformer status in the TPM

erase_fpga()

Erase FPGA programming to reduce FPGA power consumption.

Return type:

None

property firmware_available: list[dict[str, Any]]

Return the list of the firmware loaded in the system.

Returns:

the firmware list

property firmware_name: str

Return the name of the firmware that this TPM is running.

Returns:

firmware name

property firmware_version: str

Return the version of the firmware that this TPM is running.

Returns:

firmware version (major.minor)

property fpga1_temperature: float

Return the temperature of FPGA 1.

Returns:

the temperature of FPGA 1

property fpga2_temperature: float

Return the temperature of FPGA 2.

Returns:

the temperature of FPGA 2

property fpga_current_frame: int

Return the FPGA current frame counter.

Returns:

the FPGA_1 current frame counter

Raises:

ConnectionError – if communication with tile failed

property fpga_reference_time: int

Return the FPGA reference time.

Required to map the FPGA timestamps, expressed in frames to UTC time

Returns:

the FPGA_1 reference time, in Unix seconds

property fpgas_time: list[int]

Return the FPGAs clock time.

Useful for detecting clock skew, propagation delays, contamination delays, etc.

Returns:

the FPGAs clock time

Raises:

ConnectionError – if communication with tile failed

get_40g_configuration(core_id=-1, arp_table_entry=0)

Return a 40G configuration.

Parameters:
  • core_id (int) – id of the core for which a configuration is to be returned. Defaults to -1, in which case the configurations of all cores are returned

  • arp_table_entry (int) – ARP table entry to use

Return type:

list[dict]

Returns:

core configuration or list of core configurations

get_tile_id()

Get the tile ID stored in the FPGA.

Return type:

int

Returns:

tile ID

Raises:

LibraryError

property hardware_version: str

Return whether this TPM is 1.2 or 1.6.

Returns:

TPM hardware version. 120 or 160

initialise()

Download firmware, if not already downloaded, and initialise the tile.

Return type:

None

initialise_beamformer(start_channel, nof_channels, is_first, is_last)

Initialise the beamformer.

Parameters:
  • start_channel (int) – the start channel

  • nof_channels (int) – number of channels

  • is_first (bool) – whether this is the first tile in the beamformer chain

  • is_last (bool) – whether this is the last tile in the beamformer chain

Return type:

None

property io: dict[str, Any]

Return a dictionary of I/O Interfaces status available in the TPM.

Returns:

io in the TPM

property is_beamformer_running: bool

Whether the beamformer is currently running.

Returns:

whether the beamformer is currently running

property is_programmed: bool

Return whether this TPM is programmed (i.e. firmware has been downloaded to it).

Returns:

whether this TPM is programmed

load_calibration_coefficients(antenna, calibration_coefficients)

Load calibration coefficients.

These may include any rotation matrix (e.g. the parallactic angle), but do not include the geometric delay.

Parameters:
  • antenna (int) – the antenna to which the coefficients apply

  • calibration_coefficients (list[complex]) – a bidimensional complex array of coefficients, flattened into a list

Return type:

None

load_pointing_delays(delay_array, beam_index)

Specify the delay in seconds and the delay rate in seconds/second.

The delay_array specifies the delay and delay rate for each antenna. beam_index specifies which beam is desired (range 0-7)

Parameters:
  • delay_array (list[list[float]]) – delay in seconds, and delay rate in seconds/second

  • beam_index (int) – the beam to which the pointing delay should be applied

Return type:

None

property pending_data_requests: bool

Check for pending data requests.

Returns:

whether there are pending send data requests

property phase_terminal_count: int

Return the phase terminal count.

Returns:

the phase terminal count

property pll_locked: bool

Check if ADC clock PLL is locked.

Returns:

True if PLL is locked. Checked in poll loop, cached

post_synchronisation()

Perform post tile configuration synchronization.

TODO Private method or must be available externally?

Return type:

None

property pps_delay: int | None

Return last measured delay between PPS and 10 MHz clock.

Returns:

PPS delay correction in nanoseconds. Rounded to 1.25 ns units

property pps_delay_correction: int | None

Return last measured PPS delay correction.

Returns:

PPS delay correction in nanoseconds. Rounded to 1.25 ns units

property pps_present: bool

Check if PPS signal is present.

Returns:

True if PPS is present. Checked in poll loop, cached

property preadu_levels: list[float] | None

Get preadu levels in dB.

Returns:

cached values of Preadu attenuation level in dB

read_address(address, nvalues)

Return a list of values from a given address.

Parameters:
  • address (int) – address of start of read

  • nvalues (int) – number of values to read

Return type:

list[int]

Returns:

values at the address

read_register(register_name)

Read the values in a named register.

Parameters:

register_name (str) – name of the register

Return type:

list[int]

Returns:

values read from the register

property register_list: list[str]

Return a list of registers available on each device.

Returns:

list of registers

send_data_samples(data_type='', timestamp=0, seconds=0.2, n_samples=1024, sync=False, first_channel=0, last_channel=511, channel_id=128, frequency=150000000.0, round_bits=3)

Front end for send_xxx_data methods.

Parameters:
  • data_type (str) – sample type. “raw”, “channel”, “channel_continuous”, “narrowband”, “beam”

  • timestamp (int) – Timestamp for start sending data. Default 0 start now

  • seconds (float) – Delay if timestamp is not specified. Default 0.2 seconds

  • n_samples (int) – number of samples to send per packet

  • sync (bool) – (raw) send synchronised antenna samples, vs. round robin

  • first_channel (int) – (channel) first channel to send, default 0

  • last_channel (int) – (channel) last channel to send, default 511

  • channel_id (int) – (channel_continuous) channel to send

  • frequency (float) – (narrowband) Sky frequency for band centre, in Hz

  • round_bits (int) – (narrowband) how many bits to round

Raises:

ValueError – if values wrong

Return type:

None

set_beamformer_regions(regions)

Set the frequency regions to be beamformed into a single beam.

The input list contains up to 48 blocks which represent at most 16 contiguous channel regions. Each block has 8 entries which represent:

  • starting physical channel

  • number of channels

  • hardware beam number

  • subarray ID

  • subarray logical channel

  • subarray beam ID

  • substation ID

Parameters:

regions (list[list[int]]) – a list encoding up to 48 regions

Return type:

None
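
The relationship between regions and the 48-entry beamformer table (one entry every 8 channels) can be sketched as follows. This is an illustration under stated assumptions, not the driver's logic: it uses only the seven fields listed above, assumes the number of channels in a region is a multiple of 8, and assumes physical and logical channels advance together in steps of 8. The function name `expand_regions` is hypothetical.

```python
CHANNELS_PER_ENTRY = 8
MAX_ENTRIES = 48


def expand_regions(regions: list[list[int]]) -> list[list[int]]:
    """Expand regions into one table row per group of 8 channels."""
    table = []
    for start, nof, beam, subarray, logical, subarray_beam, substation in regions:
        if nof <= 0 or nof % CHANNELS_PER_ENTRY:
            raise ValueError("number of channels must be a positive multiple of 8")
        for offset in range(0, nof, CHANNELS_PER_ENTRY):
            # Physical and logical channels are assumed to advance together.
            table.append(
                [start + offset, beam, subarray, logical + offset,
                 subarray_beam, substation]
            )
    if len(table) > MAX_ENTRIES:
        raise ValueError("regions exceed the 48 table entries available")
    return table


table = expand_regions([[64, 16, 0, 1, 0, 1, 1]])
```

In this example a single 16-channel region starting at physical channel 64 occupies two table entries.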

set_lmc_download(mode, payload_length=1024, dst_ip=None, src_port=61648, dst_port=4660, netmask_40g=None, gateway_40g=None)

Specify whether control data will be transmitted over 1G or 40G networks.

Parameters:
  • mode (str) – “1G” or “10G”

  • payload_length (int) – SPEAD payload length for integrated channel data, defaults to 1024

  • dst_ip (Optional[str]) – destination IP, defaults to None

  • src_port (Optional[int]) – source port, defaults to 0xF0D0

  • dst_port (Optional[int]) – destination port, defaults to 4660

  • netmask_40g (Optional[int]) – netmask of the 40g subnet

  • gateway_40g (Optional[int]) – IP address of the 40g subnet gateway, if it exists.

Return type:

None

set_lmc_integrated_download(mode, channel_payload_length, beam_payload_length, dst_ip=None, src_port=61648, dst_port=4660, netmask_40g=None, gateway_40g=None)

Configure link and size of control data.

Parameters:
  • mode (str) – ‘1G’ or ‘10G’

  • channel_payload_length (int) – SPEAD payload length for integrated channel data

  • beam_payload_length (int) – SPEAD payload length for integrated beam data

  • dst_ip (Optional[str]) – Destination IP, defaults to None

  • src_port (int) – source port, defaults to 0xF0D0

  • dst_port (int) – destination port, defaults to 4660

  • netmask_40g (Optional[int]) – netmask of the 40g subnet

  • gateway_40g (Optional[int]) – IP address of the 40g subnet gateway, if it exists.

Return type:

None

start_acquisition(start_time=None, delay=2)

Start data acquisition.

This must be run as a long running command

Parameters:
  • start_time (Optional[int]) – the time at which to start data acquisition, defaults to None

  • delay (Optional[int]) – delay start, defaults to 2

Return type:

bool

Returns:

True if data acquisition started correctly

start_beamformer(start_time=0, duration=-1, subarray_beam_id=1, scan_id=0)

Start the beamformer at the specified time.

Parameters:
  • start_time (int) – time at which to start the beamformer, defaults to 0

  • duration (int) – duration for which to run the beamformer, defaults to -1 (run forever)

  • subarray_beam_id (int) – ID of the subarray beam to start. Default = -1, all

  • scan_id (int) – ID of the scan which is started.

Return type:

None

start_communicating()

Establish communication with the TPM.

Return type:

None

start_connection()

Try to connect the tile to the TPM.

Return type:

None

Returns:

None

property static_delays: list[float]

Read the cached value for the static delays, in samples.

Returns:

static delays in samples, one per TPM input

property station_id: int

Get the Station ID.

Returns:

assigned station Id value

stop_beamformer()

Stop the beamformer.

Return type:

None

stop_communicating()

Stop communicating with the TPM.

Todo:

is there a better way to do this? Should Tile16 have a disconnect() method that we can call here?

Return type:

None

stop_data_transmission()

Stop data transmission for send_channelised_data_continuous.

Return type:

None

stop_integrated_data()

Stop the integrated data.

Return type:

None

property sysref_present: bool

Check if SYSREF signal is present.

Returns:

True if SYSREF is present. Checked in poll loop, cached

property temperatures: dict[str, Any]

Return a dictionary of all temperature values available in the TPM.

Returns:

temperatures in the TPM

property test_generator_active: bool

Check if the test generator is active.

Returns:

whether the test generator is active

test_generator_input_select(inputs=0)

Specify the ADC inputs that are replaced by the test signal.

Specified using a 32 bit mask, with LSB for ADC input 0.

Parameters:

inputs (int) – Bit mask of inputs using test signal

Return type:

None
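
The 32-bit mask described above, with the least significant bit standing for ADC input 0, can be built from a list of input numbers. The helper name `input_mask` is hypothetical and exists only for this sketch.

```python
from typing import Iterable


def input_mask(inputs: Iterable[int]) -> int:
    """Return a 32-bit mask with one bit set per selected ADC input."""
    mask = 0
    for index in inputs:
        if not 0 <= index < 32:
            raise ValueError(f"ADC input out of range: {index}")
        mask |= 1 << index  # bit 0 (the LSB) is ADC input 0
    return mask


first_only = input_mask([0])
selection = input_mask([0, 3, 31])
all_inputs = input_mask(range(32))
```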

property tile_id: int

Get the Tile ID.

Returns:

assigned tile Id value

property timing: dict[str, Any]

Return a dictionary of the Timing Signals status available in the TPM.

Returns:

timings in the TPM

tpm_connected()

Tile connected to tpm.

Return type:

None

tpm_disconnected(intentional_disconnect=False)

Tile disconnected from TPM.

Parameters:

intentional_disconnect (bool) – True if disconnection was expected.

Return type:

None

property tpm_status: TpmStatus

Return the TPM status.

Returns:

the TPM status

property voltage_mon: float

Return the internal 5V supply of the TPM.

Returns:

the internal 5V supply of the TPM

property voltages: dict[str, Any]

Return a dictionary of all voltage values available in the TPM.

Returns:

voltages in the TPM

write_address(address, values)

Write a list of values to a given address.

Parameters:
  • address (int) – address of start of write

  • values (list[int]) – values to write

Return type:

None

write_register(register_name, values)

Write values to a named register.

Parameters:
  • register_name (str) – name of the register

  • values (list[Any] | int) – values to write

Return type:

None

class TpmStatus(value)

Enumerated type for tile status.

Used in initialisation to know what long running commands have been issued

INITIALISED = 5

Initialise command has been issued.

OFF = 1

The TPM is not powered.

PROGRAMMED = 4

The TPM is powered on and FPGAs are programmed.

SYNCHRONISED = 6

Time has been synchronised with UTC, timestamp is valid.

UNCONNECTED = 2

The TPM is not connected.

UNKNOWN = 0

The status is not known.

UNPROGRAMMED = 3

The TPM is powered on but FPGAs are not programmed.

pretty_name()

Return string representation.

Return type:

str

Returns:

String representation in camelcase
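
The enumeration values listed above increase as the TPM progresses from unknown to synchronised. The sketch below reproduces the values in a stand-in class to show how that ordering might be used. The class name `TpmStatusSketch`, the helper `is_at_least` and the formatting produced by `pretty_name` (capitalising the member name) are assumptions for illustration and may differ from the real class.

```python
from enum import Enum


class TpmStatusSketch(Enum):
    """Stand-in for TpmStatus, with the values listed in this reference."""

    UNKNOWN = 0
    OFF = 1
    UNCONNECTED = 2
    UNPROGRAMMED = 3
    PROGRAMMED = 4
    INITIALISED = 5
    SYNCHRONISED = 6

    def pretty_name(self) -> str:
        # Assumed formatting: capitalise the member name.
        return self.name.capitalize()

    def is_at_least(self, other: "TpmStatusSketch") -> bool:
        # Hypothetical helper: compare progress using the numeric values.
        return self.value >= other.value


status = TpmStatusSketch(5)
ready_for_sync = status.is_at_least(TpmStatusSketch.PROGRAMMED)
```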