Consumers

After receiving, decoding and aggregating payloads, a receiver passes the data to a consumer. Consumers are a simple mechanism for decoupling data reception from any further data processing.

The realtime-receive-modules package currently comes with a number of built-in consumers, but arbitrary consumers can be used as well.

null_consumer

A consumer that drops all incoming payloads, useful for testing and benchmarking purposes.

accumulating_consumer

A consumer that accumulates all incoming data in memory, useful for testing purposes.

example_consumer

A slightly more complex consumer intended as an example. It performs simple validations on incoming payloads.

mswriter

As its name suggests, the mswriter consumer writes incoming payloads into a Measurement Set. If payloads are missing, the resulting Measurement Set still contains the corresponding rows, but with invalid data.

When consumer = mswriter is specified, the following config section can be used to configure its behaviour:

  • [reception]: Section label

  • outputfilename: str, defaults to recv-vis.ms - The name of the output measurement set. Names typically end with the extension ‘.ms’.

  • max_payloads: str, defaults to None - When specified, the mswriter will always begin a new measurement set after the specified number of payloads, effectively controlling the maximum measurement set storage size.

  • command_template: str, defaults to None - When specified, this command is run on a background thread once a measurement set has been written. The special sequence ‘%s’ is replaced with the location of the newly written measurement set.

  • timestamp_output: bool, defaults to False - When enabled, a timestamp is added to output measurement set names, using the format name.%Y-%m-%dT%H:%M:%SZ.ms
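
As an illustration of how these options fit together, the following Python sketch parses a hypothetical INI-style configuration and derives the output name and the post-write command. It is not the mswriter implementation: the INI syntax, the use of UTC for the timestamp, the sample values and the helper names timestamped_name and build_command are all assumptions made for this example.

```python
import configparser
import os
import shlex
from datetime import datetime, timezone

# Hypothetical configuration text; only the option names come from the list above.
CONFIG_TEXT = """
[reception]
consumer = mswriter
outputfilename = recv-vis.ms
timestamp_output = true
command_template = echo finished %s
"""


def timestamped_name(outputfilename: str, when: datetime) -> str:
    """Place a timestamp before the extension (assumed to be in UTC)."""
    root, ext = os.path.splitext(outputfilename)
    stamp = when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{root}.{stamp}{ext}"


def build_command(command_template: str, ms_path: str) -> list:
    """Replace '%s' with the measurement set location and split into arguments."""
    return shlex.split(command_template.replace("%s", shlex.quote(ms_path)))


# interpolation=None keeps the literal '%s' in command_template intact.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG_TEXT)
reception = parser["reception"]

ms_name = reception.get("outputfilename", fallback="recv-vis.ms")
if reception.getboolean("timestamp_output", fallback=False):
    # A fixed time is used here so that the result is reproducible.
    fixed_time = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
    ms_name = timestamped_name(ms_name, fixed_time)

command = build_command(reception["command_template"], ms_name)
```

With these sample values, ms_name becomes recv-vis.2024-01-02T03:04:05Z.ms and command is the argument list for "echo finished" followed by that name.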

plasma_writer

The plasma_writer consumer puts incoming payloads into a shared plasma store using ska-sdp-dal. See Plasma Processors for more details.

When consumer = plasma_writer is specified, the following config section can be used to configure its behaviour:

  • [reception]: Section label

  • plasma_path: str, defaults to /tmp/plasma - The plasma store socket location.

  • payloads_in_flight: int, defaults to 10 - The maximum number of plasma object references that the consumer retains in the store whilst receiving. Once a reference is no longer retained, the caller that processes the plasma payload must hold its own reference to the plasma object to keep it alive in the store.
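
The retention behaviour described for payloads_in_flight can be pictured as a bounded queue of references. The sketch below only illustrates that policy and makes no plasma calls; the InFlightReferences class and the string placeholders standing in for plasma object references are hypothetical and are not part of plasma_writer.

```python
from collections import deque


class InFlightReferences:
    """Keeps at most `payloads_in_flight` references alive (illustrative only)."""

    def __init__(self, payloads_in_flight: int = 10):
        self._refs = deque(maxlen=payloads_in_flight)

    def retain(self, ref) -> None:
        # Appending to a full deque discards the oldest reference, so that
        # object stays alive only if another holder still references it.
        self._refs.append(ref)

    def retained(self) -> list:
        return list(self._refs)


refs = InFlightReferences(payloads_in_flight=10)
for seq_no in range(12):
    # Strings stand in for plasma object references in this sketch.
    refs.retain(f"object-{seq_no}")
```

After twelve payloads, only the ten most recent placeholders remain retained; the first two would need another holder to stay alive.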

Adding Custom Consumers

Third-party consumers, which users provide within their own code bases, are also supported. Consumers are implemented as classes that meet the following requirements:

  • They should ideally subclass realtime.receive.modules.consumers.consumer.Consumer.

  • They should specify their configuration class via a config_class member.

  • The consumer is created with three arguments, in this order: config, tm and uvw_engine. The config argument contains the full receiver configuration object, which will be of type config_class. tm is an instance of a derivation of realtime.receive.core.base_tm.BaseTM containing most metadata about the observation. uvw_engine is an instance of a derivation of realtime.receive.core.uvw_engine.UVWEngine and can be used to calculate UVWs for incoming payloads if necessary. The base realtime.receive.modules.consumers.consumer.Consumer class already has an __init__ method for initialization that receives these arguments. Subclasses implementing their own __init__ method should match this signature and forward all arguments to the parent class.

  • An async def consume(self, payload, payload_seq_no, scan) method for payload consumption. The payload argument is an instance of realtime.receive.core.icd.Payload, and the scan argument is an instance of realtime.receive.core.scan.Scan. payload_seq_no is the sequential integration number of this payload. Note that this is a coroutine, so potentially long-running tasks should be spawned off using executors to avoid hanging the event loop.

  • An async astop() coroutine for final resource cleanup.
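
The requirements above can be sketched as a minimal consumer. To keep the example runnable without the package installed, the Consumer class below is a stand-in for realtime.receive.modules.consumers.consumer.Consumer, and CountingConfig, CountingConsumer and the placeholder arguments are hypothetical names chosen for illustration; real code would subclass the package's base class instead.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class Consumer:
    """Stand-in for the package's base Consumer class (assumption)."""

    def __init__(self, config, tm, uvw_engine):
        self.config = config
        self.tm = tm
        self.uvw_engine = uvw_engine


class CountingConfig:
    """Hypothetical configuration class for the consumer below."""

    def __init__(self, name="CountingConsumer"):
        self.name = name


class CountingConsumer(Consumer):
    # The consumer declares its configuration class via config_class.
    config_class = CountingConfig

    def __init__(self, config, tm, uvw_engine):
        # Match the base signature and forward all arguments to the parent.
        super().__init__(config, tm, uvw_engine)
        self.seen = []
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _process(self, payload, payload_seq_no):
        # Placeholder for potentially long-running, blocking work.
        self.seen.append(payload_seq_no)

    async def consume(self, payload, payload_seq_no, scan):
        # Run blocking work in an executor so the event loop is not held up.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor, self._process, payload, payload_seq_no
        )

    async def astop(self):
        # Final resource cleanup.
        self._executor.shutdown(wait=True)


async def main():
    # None and object() are placeholders for the tm, uvw_engine, scan and payload.
    consumer = CountingConsumer(CountingConfig(), tm=None, uvw_engine=None)
    for seq_no in range(3):
        await consumer.consume(payload=object(), payload_seq_no=seq_no, scan=None)
    await consumer.astop()
    return consumer.seen


seen = asyncio.run(main())
```

A single-worker executor is used here so that payloads are processed in arrival order; a consumer whose work is order-independent could use a larger pool.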

To choose an arbitrary consumer, users need to pass the consumer’s class name via the consumers.Config.name configuration option.