Consumers
Upon the reception, decoding and aggregation of payloads, a receiver passes data to a consumer. Consumers are a simple mechanism for decoupling data reception from any further data processing.
The realtime-receive-modules
package
currently comes with a number of built-in consumers,
but arbitrary consumers can be used as well.
null_consumer
A consumer that drops all incoming payloads, useful for testing and benchmarking purposes.
accumulating_consumer
A consumer that accumulates all incoming data in memory, useful for testing purposes.
example_consumer
A slightly more complex consumer to be used as an example. It performs simple validations over incoming payloads.
mswriter
The mswriter
consumer, as derived from its name,
writes incoming payloads into a Measurement Set.
If payloads are missing
the resulting Measurement Set will still have the missing rows,
but with invalid data.
When consumer = mswriter
is specified the following config section
can be used to configure its behaviour:
[reception]
: Section labeloutputfilename
: str, defaults to recv-vis.ms - The output measurement set data name. Names typically contain the extension ‘.ms’.max_payloads
: str, defaults to None - When specified, the mswriter will always begin a new measurement set after the specified number of payloads, effectively controlling the maximum measurement set storage size.command_template
: str, defaults to None - When specified, measurement set writing when finished will trigger this command on a background thread. Special sequence ‘%s’ will be substituted with the newly written ms data location.timestamp_output
: bool, defaults to False - When enabled, output measurement set names are appended with timestamps in the formatname.%Y-%m-%dT%H:%M:%SZ.ms
plasma_writer
The plasma_writer
consumer puts the incoming payloads
into a shared plasma store
using the ska-sdp-dal.
See Plasma Processors for more details.
When consumer = plasma_writer
is specified the following config section
can be used to configure its behaviour:
[reception]
: Section labelplasma_path
str, defaults to/tmp/plasma
- the plasma store socket locationpayloads_in_flight
int, defaults to 10 - Configures the maximum number of plasma object references in the store to retain whilst receiving. The caller that processes the plasma payload must hold a references to plasma objects thereafter to keep the objects alive in the store.
Adding Custom Consumers
Third-party consumers are also supported, which users can provide within their own code bases. Consumers are implemented as classes with the following signatures:
They should ideally subclass
realtime.receive.modules.consumers.consumer.Consumer
.They should specify their configuration class via a
config_class
member.The consumer is created with three arguments, in this order:
config
,tm
anduvw_engine
. Theconfig
argument contains the full receiver configuration object, which will be of typeconfig_class
.tm
is an instance of a derivation ofrealtime.receive.core.base_tm.BaseTM
containing most metadata about the observation.uvw_engine
is an instance of a derivation ofrealtime.receive.core.uvw_engine.UVWEngine
and can be used to calculate UVWs for incoming payloads if necessary. The baserealtime.receive.modules.consumers.consumer.Consumer
class already has an__init__
method for initialization that receives these arguments. Subclasses implementing their own__init__
method should match this signature and forward all arguments to the parent class.An
async def consume(self, payload, payload_seq_no, scan)
method for payload consumption. Thepayload
argument is an instance ofrealtime.receive.core.icd.Payload
, and thescan
argument is an instance ofrealtime.receive.core.scan.Scan
.payload_seq_no
is the sequential integration number of this payload. Note that this is a coroutine, so potentially long-running tasks should be spawned off using executors to avoid hanging the event loop.A
async astop()
coroutine for final resource cleanup.
To choose an arbitrary consumer,
users need to pass the consumer’s class name
via the consumers.Config.name
configuration option.