.. _run_in_sdp: Running the pipeline in SDP =========================== The pointing offset calibration pipeline can be run either as a stand-alone pipeline (see :ref:`cli`) or as a :external+ska-sdp-script:doc:`scripts/pointing-offset` alongside the :external+ska-sdp-script:doc:`scripts/vis-receive` script in SDP. Some functionality is only used when the pipeline is run by the SDP processing script, and is not relevant for stand-alone use: - Pointing data sent to a Kafka queue - Connecting to the Configuration Database - Capturing errors These tasks are described below. Specifying data flow configuration ---------------------------------- While running in SDP, the pointing offset calibration pipeline will first retrieve the path of the input files (Measurement Sets files) and output files (HDF5 or Kafka queue) from data flows generated by the pointing-offset processing script. If the retrieval fails, the pipeline will get the path from the CLI arguments (i.e., ``--msdir`` and ``--results_dir``). SDP data flows are described in `ADR-81 `_. - ``/flow/pb-xyz:data-product:vis-receive-mswriter-processor``: This flow contains pointers to the visibility data product generated by vis-receive. The pointing offset calibration pipeline will retrieve the path of each Measurement Set file. - ``/flow/pb-xyz:data-queue:pointing-offset``: This flow contains pointers to the Kafka data queue that can be used to transfer the results calculated by pointing offset pipeline. - ``/flow/pb-xyz:data-product:pointing-offset``: This flow contains pointers the output directory where the pipeline needs to save its own pointing data products. Pointing data sent to a Kafka queue ----------------------------------- The pointing pipeline communicates its results with the rest of the telescope system via a Kafka queue. It publishes the following values as a single, structured numpy array encoded with `msgpack_numpy `_: .. code-block:: python DATA_STRUCTURE = numpy.dtype( [ ("antenna_name", "U8"), ("last_scan_index", "f8"), ("xel_offset", "f8"), ("xel_offset_std", "f8"), ("el_offset", "f8"), ("el_offset_std", "f8"), ("expected_width_h", "f8"), ("expected_width_v", "f8"), ("fitted_width_h", "f8"), ("fitted_width_h_std", "f8"), ("fitted_width_v", "f8"), ("fitted_width_v_std", "f8"), ("fitted_height", "f8"), ("fitted_height_std", "f8"), ] ) The :ref:`Data Product ` page describes what these values mean. ``xel`` stands for cross-elevation and ``el`` for elevation. ``v`` stands for vertical, ``h`` for horizontal co-polarisation, while ``std`` is the standard deviation of the given value. The default Kafka topic where the results are published to is ``pointing_offset``. This can be customized using the ``SDP_KAFKA_TOPIC`` environment variable. Processing pointing-type scans ------------------------------ The pointing offset calibration pipeline is set up to process "x" number of scans, as requested by the user. Normally this would be an observation consisting of a 5-point or 9-point scan pattern. When run in the SDP, the pipeline needs to make sure all of the scans have been finished before processing any of the data. In order to know what Measurement Sets (MS; data from one scan is contained in one MS) it needs to process, it needs to know the scan type and the related scan IDs. The pipeline is set up to wait for any scan whose type is of the form ``pointing-``, where ```` can be any alphanumeric string. Once the first such scan is executed, the processing of scans will be done as follows: - Scans of the exact same type will be processed together. - Only the requested number of scans are processed in a single batch; if more than the expected number of scans of the same type appear, the additional scans will be considered as part of a new batch. - If a scan is ``ABORTED``, it is not considered in the processing, and the next scan of the same type that is ``FINISHED`` will be considered as part of the batch. - If a different (non-pointing!) scan type is executed while still waiting for the right number of pointing scans, it is ignored, and the code keeps waiting until the expected scan type appears again. - If a batch of pointing-type scans is interrupted with a different pointing-type scan, the interrupted batch WILL NOT BE processed, instead the pipeline will move onto the next set. In short, a batch of pointing scans is processed as long as - They have the exact same pointing-type, - The expected number of them are ``FINISHED``, - The execution of scans has not been interrupted by a new pointing-type scan. The :external+ska-sdp-config:doc:`Configuration Database ` is the central information store of SDP. The SDP components connect to it and navigate, perform their tasks, based on the information stored there. The pointing offset pipeline takes scan information from the execution block state stored in the Configuration Database. The script can receive and process multiple batches of scans. Once the pointing script has been deployed, the processing block that executes the pipeline will be kept running, automatically identify the last processed scan ID, and process the batches as they are sent. If an exception occurs during processing, the error information will be stored in the processing block state for that batch of scans (see below). The entry point for the pipeline is the ``pointing_offset`` command, which runs the `cli`. In order to connect to the Configuration DB and perform the tasks described above, the ``SDP_PROCESSING_SCRIPT`` environment variable must be set to ``True``. This run-mode requires an execution block ID and an integer telling the pipeline how many pointing scans we expect to happen before processing. Capturing errors ---------------- Sometimes the pipeline will output an error. The current design of the pipeline captures if a batch of pointing scans are unable to deliver a valid array of offsets, and stores the erroneous scan IDs as well as the error messages. The error messages will then be stored in the state of the data flow (including :external:class:`~ska_sdp_config.entity.flow.DataQueue` and :external:class:`~ska_sdp_config.entity.flow.DataProduct`), which allows them to be further transferred to the subarray device. It is possible to use the :external+ska-sdp-config:doc:`cli` to retrieve the processing block state and review the error message(s) using:: ska-sdp get /pb/{PB_ID}/state