API documentation

CLI interface

Arguments that plasma_processor does not recognise are passed on to the create() function of your processor.

plasma_processor [-h] [-v] [-s PLASMA_SOCKET] [-r READINESS_FILE] [--max-scans MAX_SCANS]
                 [--input INPUT] [--input-channel-range INPUT_CHANNEL_RANGE]
                 [--use-sdp-metadata USE_SDP_METADATA]
                 user_processor_class

plasma_processor positional arguments

  • user_processor_class

plasma_processor options

  • -h, --help - show this help message and exit

  • -v, --verbose - If set, more verbose output will be produced

  • -s PLASMA_SOCKET, --plasma_socket PLASMA_SOCKET - The socket where Plasma is listening for connections (default: /tmp/plasma)

  • -r READINESS_FILE, --readiness-file READINESS_FILE - An empty file that will be created after the processor has finished setting up, signalling it’s ready to receive data (default: None)

  • --max-scans MAX_SCANS - The number of scans to process data for before automatically exiting. (default: None)

  • --input INPUT - Emulate data reception using a measurement set file (default: None)

  • --input-channel-range INPUT_CHANNEL_RANGE - Map the channel data in the measurement set to the specified channel ids using start:count[:stride] notation. (default: None)

  • --use-sdp-metadata USE_SDP_METADATA - Use SDP metadata support, ignored when used with --input (default: True)
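
The start:count[:stride] notation accepted by --input-channel-range can be illustrated with a small sketch. The helper name expand_channel_range is hypothetical and is not part of this package. The sketch also assumes that the stride defaults to 1 when omitted and that the resulting ids are start, start + stride, and so on for count entries; the actual parser may differ.

```python
def expand_channel_range(spec: str) -> list[int]:
    """Expand a ``start:count[:stride]`` string into explicit channel ids.

    Hypothetical helper for illustration only; not the package's own parser.
    """
    parts = spec.split(":")
    if len(parts) not in (2, 3):
        raise ValueError(f"expected start:count[:stride], got {spec!r}")
    start, count = int(parts[0]), int(parts[1])
    # Assumption: a missing stride means consecutive channel ids.
    stride = int(parts[2]) if len(parts) == 3 else 1
    return [start + i * stride for i in range(count)]


print(expand_channel_range("0:4"))      # [0, 1, 2, 3]
print(expand_channel_range("100:3:2"))  # [100, 102, 104]
```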

Processor Interface

class realtime.receive.processors.sdp.base_processor.BaseProcessor

Base class for all Processors

Subclasses should override the create, process, timeout and close methods as appropriate.

abstract async close()

Stop any pending actions in this processor.

abstract static create(argv: Iterable[str]) BaseProcessor

Creates an instance of this class from the given command line parameters. This allows user-provided classes to have their own command line parsing logic, and receive arbitrary user-provided parameters.

Parameters:

argv – A list of command line parameters.

Returns:

A new instance of this class.

async end_scan(scan_id: int) None

Called when a scan has ended. The default implementation ignores this event, but subclasses might want to react to this.

Parameters:

scan_id – the ID of the scan that has ended.

abstract async process(dataset: Visibility) None

Processes the given visibilities dataset.

Parameters:

dataset – A dataset read from Plasma.

set_plasma_socket(plasma_socket: str)

Sets the path to the socket through which communication with Plasma takes place. The processor should not usually need to communicate with Plasma, but it can if necessary.

Parameters:

plasma_socket – The plasma socket used by this processor.

async start_scan(scan_id: int) None

Called when a new scan has started. The default implementation ignores this event, but subclasses might want to react to this.

Parameters:

scan_id – the ID of the scan that has started.

property storage: Storage

Gets the Storage object associated with this BaseProcessor.
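
The following is a minimal sketch of how a user-provided processor might implement this interface. So that it runs without the package installed, it defines a stand-in BaseProcessor that only mirrors the method signatures documented above; in real code you would import the class from realtime.receive.processors.sdp.base_processor instead. The CountingProcessor class, its --label option, and the order in which the driver code calls the methods are assumptions made for illustration.

```python
import argparse
import asyncio
from abc import ABC, abstractmethod
from typing import Iterable


class BaseProcessor(ABC):
    """Stand-in mirroring the documented interface; NOT the real class."""

    @staticmethod
    @abstractmethod
    def create(argv: Iterable[str]) -> "BaseProcessor": ...

    @abstractmethod
    async def process(self, dataset) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...

    async def start_scan(self, scan_id: int) -> None:
        pass  # default: ignore the event

    async def end_scan(self, scan_id: int) -> None:
        pass  # default: ignore the event


class CountingProcessor(BaseProcessor):
    """Hypothetical processor that counts the datasets seen in each scan."""

    def __init__(self, label: str):
        self.label = label
        self.counts: dict[int, int] = {}
        self._current_scan = None

    @staticmethod
    def create(argv: Iterable[str]) -> "CountingProcessor":
        # The processor parses the arguments the CLI did not recognise.
        parser = argparse.ArgumentParser(prog="CountingProcessor")
        parser.add_argument("--label", default="demo")  # hypothetical option
        args = parser.parse_args(list(argv))
        return CountingProcessor(args.label)

    async def start_scan(self, scan_id: int) -> None:
        self._current_scan = scan_id
        self.counts[scan_id] = 0

    async def process(self, dataset) -> None:
        if self._current_scan is not None:
            self.counts[self._current_scan] += 1

    async def end_scan(self, scan_id: int) -> None:
        self._current_scan = None

    async def close(self) -> None:
        pass  # nothing pending in this sketch


async def main() -> CountingProcessor:
    # Assumed call order; the real runner decides when each hook fires.
    proc = CountingProcessor.create(["--label", "test"])
    await proc.start_scan(1)
    await proc.process(object())
    await proc.process(object())
    await proc.end_scan(1)
    await proc.close()
    return proc


processor = asyncio.run(main())
print(processor.label, processor.counts)  # test {1: 2}
```

Keeping argument parsing inside create() means the processor can evolve its own options without any change to the plasma_processor command line.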

Storage

Storage organisation classes

class realtime.receive.processors.storage.File(sdp_file: File | None = None, path: str | None = None)

A representation of a file to be written by a processor.

Users shouldn’t create this directly; instead, they should use the Storage.declare_new_file method.

Parameters:
  • sdp_file – An SDP metadata File class pointing to the path of the file.

  • path – The path to the file when not using the SDP MetaData class.

property local_path: str

The path to this file on the local filesystem

property sdp_path: str | None

The path to this file as a globally accessible SDP path. If not running in the context of SDP, returns None.

update_status(status)

Updates the status of this file to status.

Parameters:

status – The new status of the file.

class realtime.receive.processors.storage.Storage(use_sdp_metadata: bool = True)

A class managing storage for processors.

Processors should use this class to declare each new file that will be written to disk. It allows processors to use the MetaData machinery or not, based on a runtime toggle.

Parameters:

use_sdp_metadata – Whether to use the SDP MetaData machinery or not.

declare_new_file(path: str, description: str)

Declares that a new file will be created.

Parameters:
  • path – The path where the file will be created.

  • description – A description of the new file.

Returns:

A File object representing the new file.

In-built Processors

class realtime.receive.processors.sdp.mswriter_processor.MSWriterProcessor(output_ms_path: str | Path, use_plasmastman: bool = False, timestamp_output: bool = False, pointing_source: PointingKafkaSource | None = None, event_handlers: list[MsEventHandler] | None = None)

A Processor that writes incoming payloads into a Measurement Set

Others

class realtime.receive.processors.file_executor.FileExecutor

Executes jobs in the background for files that get scheduled to this instance.

class realtime.receive.processors.file_executor.FunctionFileExecutor(fun: Callable[[str], None])

A file executor that schedules the execution of a function on the output filename in a background thread.

Functions are executed on a single background thread, one after another.

class realtime.receive.processors.file_executor.CommandFileExecutor(command_template: List[str])

A file executor that schedules the execution of a bash command on the output filename in a background thread.

Commands are executed on a single background thread, one after another.
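
The "single background thread, one after another" behaviour described for these executors can be sketched with the standard library. This is an illustration of the general technique, not the implementation used by FunctionFileExecutor or CommandFileExecutor; the SerialFileRunner class and its schedule and shutdown methods are hypothetical names.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


class SerialFileRunner:
    """Illustrative stand-in; not the package's FileExecutor classes."""

    def __init__(self, fun: Callable[[str], None]):
        self._fun = fun
        # A pool with one worker runs submitted jobs strictly in
        # submission order, off the caller's thread.
        self._pool = ThreadPoolExecutor(max_workers=1)

    def schedule(self, filename: str) -> Future:
        """Queue ``fun(filename)`` for background execution."""
        return self._pool.submit(self._fun, filename)

    def shutdown(self) -> None:
        """Block until every queued job has finished."""
        self._pool.shutdown(wait=True)


seen: list[str] = []
runner = SerialFileRunner(seen.append)
for name in ["a.ms", "b.ms", "c.ms"]:
    runner.schedule(name)
runner.shutdown()
print(seen)  # ['a.ms', 'b.ms', 'c.ms']
```

Running jobs serially keeps post-processing of output files off the data path while avoiding concurrent access to the same resources, at the cost of a slow job delaying every job queued behind it.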