Running the pipeline in SDP

The pointing offset calibration pipeline can be run either as a stand-alone pipeline (see Pointing offset CLI module) or as a Pointing Offset Script alongside the Visibility Receive script in SDP.

Some functionality is only used when the pipeline is run by the SDP processing script, and is not relevant for stand-alone use:

  • Pointing data sent to a Kafka queue

  • Connecting to the Configuration Database

  • Capturing errors

These tasks are described below.

Specifying data flow configuration

While running in SDP, the pointing offset calibration pipeline first retrieves the paths of the input files (Measurement Set files) and of the outputs (HDF5 files or a Kafka queue) from the data flows generated by the pointing-offset processing script. If the retrieval fails, the pipeline takes the paths from the CLI arguments (i.e., --msdir and --results_dir). SDP data flows are described in ADR-81.

  • /flow/pb-xyz:data-product:vis-receive-mswriter-processor: This flow contains pointers to the visibility data product generated by vis-receive. The pointing offset calibration pipeline will retrieve the path of each Measurement Set file.

  • /flow/pb-xyz:data-queue:pointing-offset: This flow contains pointers to the Kafka data queue that can be used to transfer the results calculated by the pointing offset pipeline.

  • /flow/pb-xyz:data-product:pointing-offset: This flow contains a pointer to the output directory where the pipeline saves its own pointing data products.
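The "data flow first, CLI argument second" rule can be sketched as a small helper. This is a minimal illustration under stated assumptions, not the pipeline's code: resolve_path is a hypothetical name, the data flow lookup is passed in as a callable because the actual Configuration Database calls are not shown here, and the paths are made-up example values.

```python
from typing import Callable, Optional


def resolve_path(lookup_flow: Callable[[], str], cli_value: Optional[str]) -> str:
    """Return a path from the data flow, falling back to the CLI argument.

    Hypothetical helper: `lookup_flow` stands in for reading a data flow
    (e.g. the vis-receive-mswriter-processor flow) from the Configuration DB.
    """
    try:
        return lookup_flow()
    except Exception:
        # Retrieval failed: use the CLI argument (e.g. --msdir or --results_dir).
        if cli_value is None:
            raise ValueError("path not found in data flow and not given on the CLI") from None
        return cli_value


def missing_flow() -> str:
    # Simulates a failed data flow lookup.
    raise KeyError("flow not found")


print(resolve_path(lambda: "/flow/ms/scan-1.ms", "/data/ms"))  # /flow/ms/scan-1.ms
print(resolve_path(missing_flow, "/data/ms"))                   # /data/ms
```

Passing the lookup as a callable keeps the fallback logic independent of how the flow is actually read.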

Pointing data sent to a Kafka queue

The pointing pipeline communicates its results to the rest of the telescope system via a Kafka queue. It publishes the following values as a single structured numpy array encoded with msgpack_numpy:

import numpy

# Structured dtype of the published array: a Unicode name field, then 64-bit floats.
DATA_STRUCTURE = numpy.dtype(
    [
        ("antenna_name", "U8"),
        ("last_scan_index", "f8"),
        ("xel_offset", "f8"),
        ("xel_offset_std", "f8"),
        ("el_offset", "f8"),
        ("el_offset_std", "f8"),
        ("expected_width_h", "f8"),
        ("expected_width_v", "f8"),
        ("fitted_width_h", "f8"),
        ("fitted_width_h_std", "f8"),
        ("fitted_width_v", "f8"),
        ("fitted_width_v_std", "f8"),
        ("fitted_height", "f8"),
        ("fitted_height_std", "f8"),
    ]
)

The Data Product page describes what these values mean. In the field names, xel stands for cross-elevation and el for elevation; h and v stand for horizontal and vertical co-polarisation respectively, and std denotes the standard deviation of the given value.
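As an illustration, the array above can be filled for a single antenna as follows. This is a sketch with made-up values: make_offsets_array is a hypothetical helper, one element per antenna is an assumption based on the antenna_name field, and the msgpack_numpy encoding and Kafka publishing steps are not shown. The dtype is rebuilt compactly so the block runs on its own.

```python
from typing import Dict, List

import numpy

# Same layout as DATA_STRUCTURE: a Unicode antenna name, then 13 float64 fields.
FLOAT_FIELDS = [
    "last_scan_index",
    "xel_offset", "xel_offset_std",
    "el_offset", "el_offset_std",
    "expected_width_h", "expected_width_v",
    "fitted_width_h", "fitted_width_h_std",
    "fitted_width_v", "fitted_width_v_std",
    "fitted_height", "fitted_height_std",
]
DATA_STRUCTURE = numpy.dtype(
    [("antenna_name", "U8")] + [(name, "f8") for name in FLOAT_FIELDS]
)


def make_offsets_array(rows: List[Dict[str, object]]) -> numpy.ndarray:
    """Build one structured array with one element per input row (antenna).

    Fields missing from a row keep numpy.zeros defaults (0.0, or "" for the name).
    """
    result = numpy.zeros(len(rows), dtype=DATA_STRUCTURE)
    for index, row in enumerate(rows):
        for field, value in row.items():
            result[field][index] = value  # result[field] is a view of that column
    return result


offsets = make_offsets_array([
    {"antenna_name": "SKA001", "last_scan_index": 5, "xel_offset": 1.5, "el_offset": -0.25},
])
print(offsets["antenna_name"][0], offsets["xel_offset"][0])  # SKA001 1.5
```

Note that a "U8" field holds at most eight characters, so longer antenna names would be truncated.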

By default, the results are published to the Kafka topic pointing_offset. A different topic can be set with the SDP_KAFKA_TOPIC environment variable.
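The topic selection amounts to an environment lookup with a default. A minimal sketch: kafka_topic is a hypothetical helper, and it assumes that an unset SDP_KAFKA_TOPIC means the default topic is used. The environment mapping can be passed in to make the behaviour easy to test.

```python
import os
from typing import Mapping, Optional

DEFAULT_TOPIC = "pointing_offset"


def kafka_topic(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return SDP_KAFKA_TOPIC if it is set, otherwise the default topic."""
    if environ is None:
        environ = os.environ
    return environ.get("SDP_KAFKA_TOPIC", DEFAULT_TOPIC)


print(kafka_topic({}))                                 # pointing_offset
print(kafka_topic({"SDP_KAFKA_TOPIC": "my-pointing"}))  # my-pointing
```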

Processing pointing-type scans

The pointing offset calibration pipeline is set up to process a user-specified number of scans. Normally these form an observation following a 5-point or 9-point scan pattern. When run in SDP, the pipeline must make sure that all of the scans have finished before processing any of the data. To know which Measurement Sets (MSs; the data from one scan is contained in one MS) to process, it needs to know the scan type and the related scan IDs.

The pipeline is set up to wait for any scan whose type is of the form pointing-<suffix>, where <suffix> can be any alphanumeric string. Once the first such scan is executed, the processing of scans will be done as follows:

  • Scans of the exact same type will be processed together.

  • Only the requested number of scans are processed in a single batch; if more than the expected number of scans of the same type appear, the additional scans will be considered as part of a new batch.

  • If a scan is ABORTED, it is excluded from processing, and the next FINISHED scan of the same type is counted towards the batch instead.

  • If a scan of a different (non-pointing) type is executed while the pipeline is still waiting for the right number of pointing scans, it is ignored, and the pipeline keeps waiting until the expected scan type appears again.

  • If a batch of pointing-type scans is interrupted by a scan of a different pointing type, the interrupted batch will NOT be processed; instead, the pipeline moves on to the next batch.

In short, a batch of pointing scans is processed only if:

  • all scans have exactly the same pointing type,

  • the expected number of them are FINISHED, and

  • the execution of scans has not been interrupted by a scan of a new pointing type.
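The batching rules above can be modelled as a small generator. This is an illustrative sketch, not the pipeline's implementation: Scan, is_pointing and pointing_batches are hypothetical names, scans are assumed to arrive in execution order with a status of FINISHED or ABORTED, and reading the execution block state from the Configuration Database is left out. One interpretive choice is made here: any scan of a new pointing type, even an aborted one, ends the current batch.

```python
from typing import Iterable, Iterator, List, NamedTuple, Optional


class Scan(NamedTuple):
    """Hypothetical record of one scan from the execution block state."""
    scan_id: int
    scan_type: str
    status: str  # "FINISHED" or "ABORTED"


def is_pointing(scan_type: str) -> bool:
    # Pointing scan types look like "pointing-<suffix>" with an alphanumeric suffix.
    prefix = "pointing-"
    return scan_type.startswith(prefix) and scan_type[len(prefix):].isalnum()


def pointing_batches(scans: Iterable[Scan], num_scans: int) -> Iterator[List[int]]:
    """Yield the scan IDs of each complete batch of pointing scans."""
    batch_type: Optional[str] = None
    batch: List[int] = []
    for scan in scans:
        if not is_pointing(scan.scan_type):
            continue  # non-pointing scans are ignored
        if scan.scan_type != batch_type:
            # A different pointing type interrupts the current batch: drop it.
            batch_type = scan.scan_type
            batch = []
        if scan.status != "FINISHED":
            continue  # ABORTED scans do not count towards the batch
        batch.append(scan.scan_id)
        if len(batch) == num_scans:
            yield batch
            batch = []  # further scans of this type start a new batch


scans = [
    Scan(1, "pointing-a", "FINISHED"),
    Scan(2, "science", "FINISHED"),     # non-pointing: ignored
    Scan(3, "pointing-a", "ABORTED"),   # aborted: not counted
    Scan(4, "pointing-a", "FINISHED"),
    Scan(5, "pointing-b", "FINISHED"),  # new pointing type: batch [1, 4] is dropped
    Scan(6, "pointing-b", "FINISHED"),
    Scan(7, "pointing-b", "FINISHED"),
]
print(list(pointing_batches(scans, num_scans=3)))  # [[5, 6, 7]]
```

Because the generator only yields complete batches, an unfinished batch simply stays pending until more scans of its type arrive.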

Connecting to the Configuration Database

The Configuration Database is the central information store of SDP. SDP components connect to it and carry out their tasks based on the information stored there. The pointing offset pipeline reads scan information from the execution block state stored in the Configuration Database.

The script can receive and process multiple batches of scans. Once the pointing script has been deployed, the processing block that executes the pipeline keeps running: it automatically identifies the last processed scan ID and processes new batches as they arrive.

If an exception occurs during processing, the error information will be stored in the processing block state for that batch of scans (see below).

The entry point for the pipeline is the pointing_offset command, which runs the CLI. To connect to the Configuration DB and perform the tasks described above, the SDP_PROCESSING_SCRIPT environment variable must be set to True. This run mode requires an execution block ID and an integer specifying how many pointing scans to expect before processing starts.

Capturing errors

The pipeline may fail to produce a result. It detects when a batch of pointing scans cannot deliver a valid array of offsets, and records the IDs of the affected scans together with the error messages.

The error messages are then stored in the state of the data flows (including the DataQueue and DataProduct flows), from where they can be passed on to the subarray device.
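A minimal sketch of the capture step, assuming a plain dictionary stands in for the data flow state. run_batch, the "errors" key and the record layout are hypothetical and do not reflect the schema the pipeline actually writes.

```python
from typing import Callable, Dict, List


def run_batch(scan_ids: List[int], process: Callable[[List[int]], object], state: Dict) -> bool:
    """Process one batch; on failure, record the scan IDs and the error message."""
    try:
        process(scan_ids)
    except Exception as err:  # any failure to produce a valid offsets array
        state.setdefault("errors", []).append(
            {"scan_ids": list(scan_ids), "message": str(err)}
        )
        return False
    return True


def failing_fit(scan_ids: List[int]) -> None:
    # Stand-in for a fit that cannot produce valid offsets.
    raise ValueError("fit did not converge")


state: Dict = {}
ok = run_batch([5, 6, 7], failing_fit, state)
print(ok, state)  # False {'errors': [{'scan_ids': [5, 6, 7], 'message': 'fit did not converge'}]}
```

Catching the exception per batch lets a long-running processing block record the failure and carry on with the next batch.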

The SDP command-line interface can be used to retrieve the processing block state and review the error message(s):

ska-sdp get /pb/{PB_ID}/state