Consumers
After receiving, decoding and aggregating payloads, a receiver passes the data to a consumer. Consumers are a simple mechanism for decoupling data reception from any further data processing.
The realtime-receive-modules package
currently comes with a number of built-in consumers,
but arbitrary consumers can be used as well.
null_consumer
A consumer that drops all incoming payloads, useful for testing and benchmarking purposes.
accumulating_consumer
A consumer that accumulates all incoming data in memory, useful for testing purposes.
example_consumer
A slightly more complex consumer to be used as an example. It performs simple validations over incoming payloads.
mswriter
The mswriter consumer, as its name suggests,
writes incoming payloads into a Measurement Set.
If payloads are missing,
the resulting Measurement Set still contains the corresponding rows,
but they are filled with invalid data.
When consumer = mswriter is specified the following config section
can be used to configure its behaviour:
[reception]: Section label
outputfilename: str, defaults to recv-vis.ms - The output measurement set data name. Names typically contain the extension ‘.ms’.
max_payloads: str, defaults to None - When specified, the mswriter will always begin a new measurement set after the specified number of payloads, effectively controlling the maximum measurement set storage size.
command_template: str, defaults to None - When specified, this command is triggered on a background thread once writing of a measurement set has finished. The special sequence ‘%s’ will be substituted with the newly written ms data location.
timestamp_output: bool, defaults to False - When enabled, output measurement set names are appended with timestamps in the format name.%Y-%m-%dT%H:%M:%SZ.ms
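The options above can be illustrated with a minimal sketch. It assumes an INI-style configuration text in which the consumer key lives in the [reception] section; only the option names come from the list above. The helpers timestamped_name and expand_command are hypothetical and merely mimic the documented naming format and ‘%s’ substitution; they are not the package's implementation.

```python
import configparser
import os.path
from datetime import datetime, timezone

# Assumed INI-style layout; only the option names are taken from the docs.
CONFIG_TEXT = """
[reception]
consumer = mswriter
outputfilename = recv-vis.ms
command_template = ls -l %s
timestamp_output = true
"""

# interpolation=None keeps the literal '%s' in command_template readable.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG_TEXT)
reception = parser["reception"]


def timestamped_name(outputfilename, when):
    """Hypothetical helper: build name.%Y-%m-%dT%H:%M:%SZ.ms from name.ms."""
    root, ext = os.path.splitext(outputfilename)
    stamp = when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{root}.{stamp}{ext}"


def expand_command(command_template, ms_path):
    """Hypothetical helper: substitute '%s' with the written ms location."""
    return command_template.replace("%s", ms_path)


ms_name = reception["outputfilename"]
if reception.getboolean("timestamp_output", fallback=False):
    # A fixed time is used here so the result is reproducible.
    fixed_time = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
    ms_name = timestamped_name(ms_name, fixed_time)

command = expand_command(reception["command_template"], ms_name)
```

With these inputs, ms_name becomes recv-vis.2024-01-02T03:04:05Z.ms and command becomes the same name prefixed with "ls -l".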
plasma_writer
The plasma_writer consumer puts the incoming payloads
into a shared plasma store
using the ska-sdp-dal.
See Plasma Processors for more details.
When consumer = plasma_writer is specified the following config section
can be used to configure its behaviour:
[reception]: Section label
plasma_path: str, defaults to /tmp/plasma - The plasma store socket location.
payloads_in_flight: int, defaults to 10 - Configures the maximum number of plasma object references in the store to retain whilst receiving. The caller that processes the plasma payload must hold references to plasma objects thereafter to keep the objects alive in the store.
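The effect of payloads_in_flight can be pictured as a bounded window of retained references. The sketch below is only an analogy using plain Python objects: the InFlightReferences class is hypothetical and does not use ska-sdp-dal or any plasma API. Once the window is full, the oldest reference is dropped, which is why a downstream caller has to hold its own reference if it still needs the object.

```python
from collections import deque


class InFlightReferences:
    """Hypothetical stand-in: keeps at most `payloads_in_flight` references."""

    def __init__(self, payloads_in_flight=10):
        # A deque with maxlen silently discards the oldest entry when full.
        self._refs = deque(maxlen=payloads_in_flight)

    def retain(self, obj_ref):
        """Remember a reference to a newly written object."""
        self._refs.append(obj_ref)

    def __len__(self):
        return len(self._refs)

    def __contains__(self, obj_ref):
        return obj_ref in self._refs


window = InFlightReferences(payloads_in_flight=10)
for seq_no in range(12):
    # Strings stand in for plasma object references in this sketch.
    window.retain(f"payload-{seq_no}")
```

After twelve payloads, only the ten most recent references remain in the window; the first two are no longer retained by the writer.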
Adding Custom Consumers
Third-party consumers are also supported, which users can provide within their own code bases. Consumers are implemented as classes with the following signatures:
They should ideally subclass realtime.receive.modules.consumers.consumer.Consumer.
They should specify their configuration class via a config_class member.
The consumer is created with three arguments, in this order: config, tm and uvw_engine. The config argument contains the full receiver configuration object, which will be of type config_class. tm is an instance of a subclass of realtime.receive.core.base_tm.BaseTM containing most metadata about the observation. uvw_engine is an instance of a subclass of realtime.receive.core.uvw_engine.UVWEngine and can be used to calculate UVWs for incoming payloads if necessary. The base realtime.receive.modules.consumers.consumer.Consumer class already has an __init__ method for initialization that receives these arguments. Subclasses implementing their own __init__ method should match this signature and forward all arguments to the parent class.
An async def consume(self, payload, payload_seq_no, scan) method for payload consumption. The payload argument is an instance of realtime.receive.core.icd.Payload, and the scan argument is an instance of realtime.receive.core.scan.Scan. payload_seq_no is the sequential integration number of this payload. Note that this is a coroutine, so potentially long-running tasks should be spawned off using executors to avoid hanging the event loop.
An async astop() coroutine for final resource cleanup.
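The requirements above can be sketched as follows. To keep the example runnable without the package installed, a small stand-in Consumer base class is defined locally; in real code one would subclass realtime.receive.modules.consumers.consumer.Consumer instead. CountingConfig, CountingConsumer, the attributes stored by the stand-in base class and the None values passed for tm, uvw_engine and scan are all hypothetical.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class Consumer:
    """Local stand-in for the package's Consumer base class (assumption)."""

    def __init__(self, config, tm, uvw_engine):
        self.config = config
        self.tm = tm
        self.uvw_engine = uvw_engine


class CountingConfig:
    """Hypothetical configuration class for the consumer below."""

    def __init__(self, name="counting_consumer"):
        self.name = name


class CountingConsumer(Consumer):
    # The configuration class is advertised via the config_class member.
    config_class = CountingConfig

    def __init__(self, config, tm, uvw_engine):
        # Match the base signature and forward all arguments to the parent.
        super().__init__(config, tm, uvw_engine)
        self.seen = []
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _process(self, payload, payload_seq_no):
        # Placeholder for potentially long-running, blocking work.
        self.seen.append(payload_seq_no)

    async def consume(self, payload, payload_seq_no, scan):
        # Offload blocking work to an executor so the event loop stays free.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor, self._process, payload, payload_seq_no
        )

    async def astop(self):
        # Final cleanup: wait for pending work and release the worker thread.
        self._executor.shutdown(wait=True)


async def demo():
    consumer = CountingConsumer(CountingConfig(), tm=None, uvw_engine=None)
    for seq_no in range(3):
        await consumer.consume(payload=object(), payload_seq_no=seq_no, scan=None)
    await consumer.astop()
    return consumer.seen


seen = asyncio.run(demo())
```

A single-worker executor is used here so that payloads are processed in arrival order; a consumer whose work is order-independent could use a larger pool.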
To choose an arbitrary consumer,
users need to pass the consumer’s class name
via the consumers.Config.name
configuration option.
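How a class name given as a string can be turned into a class is sketched below. This is a generic Python illustration and an assumption, not the package's loader: the text above only states that the class name is passed via consumers.Config.name, and whether a fully qualified dotted name is expected is not confirmed here. The load_class helper is hypothetical, and a standard-library class is used in place of a real consumer.

```python
import importlib


def load_class(qualified_name):
    """Hypothetical helper: resolve 'package.module.ClassName' to a class."""
    module_name, _, class_name = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted name, got {qualified_name!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# A standard-library class stands in for a third-party consumer class.
consumer_class = load_class("collections.OrderedDict")
```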