API documentation
CLI interface
Unknown arguments will be passed to your processor create() function
plasma_processor [-h] [-v] [-s PLASMA_SOCKET] [-r READINESS_FILE] [--max-scans MAX_SCANS]
[--input INPUT] [--input-channel-range INPUT_CHANNEL_RANGE]
[--use-sdp-metadata USE_SDP_METADATA]
user_processor_class
plasma_processor positional arguments
user_processor_class
- The class implementing the Processor (default:None
)
plasma_processor options
-v
,--verbose
- If set, more verbose output will be produced-s
PLASMA_SOCKET
,--plasma_socket
PLASMA_SOCKET
- The socket where Plasma is listening for connections (default:/tmp/plasma
)-r
READINESS_FILE
,--readiness-file
READINESS_FILE
- An empty file that will be created after the processor has finished setting up, signalling it’s ready to receive data (default:None
)--max-scans
MAX_SCANS
- The number of scans to process data for before automatically exiting. (default:None
)--input
INPUT
- Emulate data reception using a measurement set file (default:None
)--input-channel-range
INPUT_CHANNEL_RANGE
- Map the channel data in the measurement set to the specified channel ids using start:count[:stride] notation. (default:None
)--use-sdp-metadata
USE_SDP_METADATA
- Use SDP metadata support, ignored when used with –input (default:True
)
Processor Interface
- class realtime.receive.processors.sdp.base_processor.BaseProcessor
Base class for all Processors
Subclasses should override the create, process, timeout and close methods as appropriate.
- abstract async close()
Stop any pending actions in this processor.
- abstract static create(argv: Iterable[str]) BaseProcessor
Creates an instance of this class from the given command line parameters. This allows user-provided classes to have their own command line parsing logic, and receive arbitrary user-provided parameters.
- Parameters:
argv – A list of command line parameters.
- Returns:
A new instance of this class.
- async end_scan(scan_id: int) None
Called when a scan has ended. The default implementation ignores this event, but subclasses might want to react to this.
- Parameters:
scan_id – the ID of the scan that has ended.
- abstract async process(dataset: Visibility) None
Processes the given visibilities dataset.
- Parameters:
dataset – A dataset read from Plasma.
- set_plasma_socket(plasma_socket: str)
Sets the path to the socket through which communication with Plasma takes place. The processor should not usually need to communicate with Plasma, but it can if necessary.
- Parameters:
plasma_socket – The plasma socket used by this processor.
- async start_scan(scan_id: int) None
Called when a new scan has started. The default implementation ignores this event, but subclasses might want to react to this.
- Parameters:
scan_id – the ID of the scan that has started.
Storage
Storage organisation classes
- class realtime.receive.processors.storage.File(sdp_file: File | None = None, path: str | None = None)
A representation of a file to be written by a processor.
Users shouldn’t create this directly, instead they should use the Storage.declare_new_file method.
- Parameters:
metadata_file – An SDP metadata File class pointing to the path of the file.
path – The path to the file when not using the SPD MetaData class.
- property local_path: str
The path to this file on the local filesystem
- property sdp_path: str | None
The path to this file as a globally accessible SDP path. If not running in the context of SDP, returns None.
- update_status(status)
Updates the status of this file to status.
- Parameters:
status – The new status of the file.
- class realtime.receive.processors.storage.Storage(use_sdp_metadata: bool = True)
A class managing storage for processors.
This class should be used by processors to declare a new file that should be written to disk. allows processors to use the MetaData machinery or not based on a runtime toggle.
- Parameters:
use_sdp_metadata – Whether to use the SDP MetaData machinery or not.
- declare_new_file(path: str, description: str)
Declares that a new file will be created.
- Parameters:
path – The path where the file will be created.
description – A description of the new file.
- Returns:
A File object representing the new file.
In-built Processors
- class realtime.receive.processors.sdp.mswriter_processor.MSWriterProcessor(output_ms_path: str | Path, use_plasmastman: bool = False, timestamp_output: bool = False, pointing_source: PointingKafkaSource | None = None, event_handlers: list[MsEventHandler] | None = None)
A Processor that writes incoming payloads into a Measurement Set
Others
- class realtime.receive.processors.file_executor.FileExecutor
Executes jobs in the background for files that get scheduled to this instance.
- class realtime.receive.processors.file_executor.FunctionFileExecutor(fun: Callable[[str], None])
A file executor that schedules the execution of a function on the output filename in a background thread.
Functions are executed on a single background thread, one after another.
- class realtime.receive.processors.file_executor.CommandFileExecutor(command_template: List[str])
A file executor that schedules the execution of a bash command on the output filename in a background thread.
Commands are executed on a single background thread, one after another.