Test Mock Data Script
The test-mock-data
script is designed to provide SDP data products
in the form of data files on storage or data on Tango attributes of the
QueueConnector device, without the need to execute vis-receive or any of
the pipelines generating the data. The detailed design of the script can
be found on
this Confluence page.
It executes the different scenarios by deploying various execution engines that carry out the required processing.
It implements the following scenarios (list to grow as development progresses):
"pointing"
: Write pointing-offset results to HDF and/or the relevant Queue Connector tango attributes"measurement-set"
: Copy user-provided MS file(s) to output directory
A full description of the processing block parameters of this script can be found in the Processing block parameters section.
Pointing offset
There are three available options that can be chosen for this scenario:
"write-hdf"
: HDF files that follow a pointing data template are written to disk at the standard output directory. Corresponding metadata files are also added."send-to-kafka"
: Pointing offset results are sent to Kafka and the QueueConnector device is configured to read these data and display them on its dish-specific pointing attributes."both"
: Runs bothwrite-hdf
andsend-to-kafka
options.
The output pointing HDF files follow the data structure of the data product of the pointing offset calibration pipeline.
An internal CSV file and HDF template are used to obtain the pointing offsets. The CSV file allows for a set of fifty different sources. The pointing engine listens for any finished scans, and takes pointing offsets from the CSV file every time a completed 5-point observation has been observed. If more observations are completed than contained in the CSV file, it will loop back to start at the beginning again.
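As a rough illustration of that behaviour (not the script's actual implementation; the file name is hypothetical), the offsets could be drawn cyclically from the CSV like this:

import csv
import itertools

def offset_source(csv_path):
    # Read the pointing-offset rows once, then cycle through them indefinitely,
    # so the sequence restarts when more observations complete than there are rows.
    with open(csv_path, newline="") as f:
        rows = list(csv.DictReader(f))
    return itertools.cycle(rows)

offsets = offset_source("pointing_offsets.csv")  # hypothetical file name
# next(offsets) would be taken each time a completed 5-point observation is seen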
Note that the configuration information for each dish in the HDF output is identical. This is because the template HDF file contains only one dish, and the coordinates etc. are copied for all other dishes - i.e. all dishes are placed at the same physical coordinates.
Based on the selected option, the script also configures data flow objects to be created in the SDP configuration database: a DataProduct flow is used for HDF files and a DataQueue flow is used to send the data via Kafka. The Queue Connector is also configured using a TangoAttributeMap flow object so that it picks up the offsets that were sent to Kafka.
Write Measurement Sets data
Measurement Set (MS) files for each scan are written to disk at the standard output directory as defined by ADR-55. Input MS file paths must be provided on the shared PVC for each scan and these are copied to the output. A corresponding metadata file is also generated.
The measurement set engine listens for any new scan, and when the scan starts, copies the next MS to the standard output directory. The output MS is renamed to match the observed scan id.
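A minimal sketch of that copy-and-rename step (illustrative only; the output naming pattern shown is an assumption):

import shutil
from pathlib import Path

def copy_ms_for_scan(input_ms, output_dir, scan_id):
    # Copy the input Measurement Set (a directory) into the output
    # directory and rename it after the observed scan ID.
    destination = Path(output_dir) / f"output.scan-{scan_id}.ms"  # assumed naming pattern
    shutil.copytree(input_ms, destination)
    return destination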
Testing
The script uses the following environment variables.
Name | Description | Default
---|---|---
 | Host address of the Configuration DB |
 | Port of the Configuration DB |
 | Kafka server (host) |
 | PVC name |
 | K8s namespace used for data product flow |
 | Timeout used when waiting for scans | 60 s
Deploy SDP and make sure the iTango console pod is also running.
After entering the iTango pod, obtain a handle to a subarray device and turn it on:
d = DeviceProxy('test-sdp/subarray/01')
d.On()
If you are not sure what devices are available, list them with lsdev.
The customised processing block parameters that can be included in the configuration string can be found in the Processing block parameters section and summarised in the following table.
Name | Description | Default
---|---|---
scenario | Name of scenario to run | None
input_data | List containing path(s) of MS to copy | None
kafka_topic | Kafka topic name | pointing_offset
pointing_option | Flag to choose optional behaviour for the pointing scenario | both
scenario needs to be set to pointing or measurement-set for the different scenarios to be run. If no scenario is specified, the script sets up the EB and PB correctly but does not deploy any execution engine.

input_data is a list where each element is the path to the Measurement Set to be written for the measurement-set scenario. These paths must be located on a PVC accessible to the test-mock-data engine. Note that the path should start at the root of the storage - i.e. it should not include the mount point. The input data can contain any number of Measurement Sets.

pointing_option sets the options for the pointing scenario. It must be set to write-hdf, send-to-kafka or both to determine the operations performed by the scenario.
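For example, a pointing scenario that only writes HDF files could be requested with the following parameters (illustrative values):

"parameters": {
    "scenario": "pointing",
    "pointing_option": "write-hdf"
}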
An example for the measurement set scenario with 5 scans is:
"parameters": {
"scenario": "measurement-set",
"input_data": ["product/eb-orcatest-20240814-94773/ska-sdp/pb-orcatestvr-20240814-94773/output.scan-1.ms",
"product/eb-orcatest-20240814-94773/ska-sdp/pb-orcatestvr-20240814-94773/output.scan-2.ms",
"product/eb-orcatest-20240814-94773/ska-sdp/pb-orcatestvr-20240814-94773/output.scan-3.ms",
"product/eb-orcatest-20240814-94773/ska-sdp/pb-orcatestvr-20240814-94773/output.scan-4.ms",
"product/eb-orcatest-20240814-94773/ska-sdp/pb-orcatestvr-20240814-94773/output.scan-5.ms"]
}
Start the execution block with the AssignResources
command:
d.AssignResources(config)
config is a full AssignResources configuration string for SDP; see the telescope model example.
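As a minimal sketch (assuming the configuration string has been saved to a local JSON file with a hypothetical name), it can be loaded in the iTango session and passed to the command:

import json

# Round-trip through json to catch syntax errors before sending the string
with open("assign_resources.json") as f:
    config = json.dumps(json.load(f))

d.AssignResources(config)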
The dishes that will appear in the output data for the pointing scenario are set from the receptors config parameter in the execution_block.resources section of the configuration string.
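For illustration, the relevant fragment of the configuration string might look like this (the dish IDs are example values):

"execution_block": {
    "resources": {
        "receptors": ["SKA001", "SKA002", "SKA003"]
    }
}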
The script will request the start of the engine pod called mock-data. This pod watches for the scans that are commanded on the subarray. The measurement-set scenario writes the relevant measurement set files when each scan is started, and the pointing scenario waits for the end of the 5th scan in the pointing observation before writing files.
If the offsets were sent to the Queue Connector, you can then access the data in itango3 by running the following code (replace the dish ID as required):
q = DeviceProxy("test-sdp/queueconnector/01")
q.pointing_offset_SKA001
To remove the deployment, you need to release all the resources and return the subarray to an EMPTY state (in itango):
d.End()
d.ReleaseAllResources()
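To confirm that the subarray has returned to EMPTY (assuming the standard obsState attribute is available on the subarray device), you can read:

d.obsState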
Processing block parameters
- pydantic settings generate_mock_data.test_mock_data_params.TestMockDataParams
test-mock-data script parameters
JSON schema:
{ "title": "test-mock-data", "description": "test-mock-data script parameters", "type": "object", "properties": { "kafka_topic": { "default": "pointing_offset", "description": "Kafka topic name. If not supplied, the topic will be set to 'pointing_offset'.", "title": "Kafka topic", "type": "string" }, "input_data": { "anyOf": [ { "items": {}, "type": "array" }, { "type": "null" } ], "default": null, "description": "List of Measurement Sets to use in the measurement-set scenario - e.g. ['path/scan-1.ms', 'path/scan-2.ms'].", "title": "List of Measurement Sets" }, "scenario": { "default": null, "description": "The name of the mock data scenario. Allowed: 'pointing' for sending pointing data to Kafka and/or writing pointing HDF files, 'measurement-set' for copying MS to the output directory.", "enum": [ "pointing", "measurement-set", null ], "title": "Scenario name" }, "pointing_option": { "default": "both", "description": "Specifies the output of the 'pointing' scenario. Allowed: 'send-to-kafka' for sending pointing data to kafka, 'write-hdf' for writing pointing HDF files, 'both' for sending pointing data to kafka and writing pointing HDF files.", "enum": [ "write-hdf", "send-to-kafka", "both" ], "title": "Flag to specify options for pointing scenario", "type": "string" } }, "additionalProperties": false }
- Config:
strict: bool = True
extra: str = forbid
arbitrary_types_allowed: bool = False
validate_assignment: bool = True
title: str = test-mock-data
- Fields:
- field input_data: list | None = None
List of Measurement Sets to use in the measurement-set scenario - e.g. [‘path/scan-1.ms’, ‘path/scan-2.ms’].
- field kafka_topic: str = 'pointing_offset'
Kafka topic name. If not supplied, the topic will be set to ‘pointing_offset’.
- field pointing_option: Literal['write-hdf', 'send-to-kafka', 'both'] = 'both'
Specifies the output of the ‘pointing’ scenario. Allowed: ‘send-to-kafka’ for sending pointing data to kafka, ‘write-hdf’ for writing pointing HDF files, ‘both’ for sending pointing data to kafka and writing pointing HDF files.
- field scenario: Literal['pointing', 'measurement-set', None] = None
The name of the mock data scenario. Allowed: ‘pointing’ for sending pointing data to Kafka and/or writing pointing HDF files, ‘measurement-set’ for copying MS to the output directory.
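As a minimal sketch (assuming the script's Python package is installed in your environment), the parameters can be validated against this model before submitting them:

from generate_mock_data.test_mock_data_params import TestMockDataParams

# Invalid values (e.g. an unknown pointing_option) raise a pydantic ValidationError
params = TestMockDataParams(scenario="pointing", pointing_option="write-hdf")
print(params.model_dump())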
Changelog
1.0.0
Update dependencies, documentation and bug fixes (MR246)
Remove code to configure queue connector before v5 (MR240)
Add function to create data flows in pointing scenario (MR238)
Combine “kafka” and “hdf” scenarios into a single “pointing” scenario (MR235)
Remove using specific scan IDs (MR232)
Remove option that allows users to provide input data for the pointing-offset-hdf and pointing-offset-queue-connector scenarios (MR232)
Update Dockerfile to use SKA Python base image (MR211)
Update pointing-offset-queue-connector to wait until scans are run/completed (MR206)
Update behaviour when no input_data is specified to use internal data (MR206)
0.1.0
[!WARNING] This version only works with SDP 0.24.0, and only if the QueueConnector device version is set to 4.1.0
Add the pointing data CSV file to the docker image (MR202)
Update ska-sdp-scripting to 0.12.0 (MR202)
Add functionality to check which antennas to simulate and update all relevant parameters in the template HDF5 to match number of antennas (MR195)
Update file writing to occur when scans are run/completed (MR196)
Add functionality to the mock-test-data processing script to generate data files (MR187)
Processing script reports internal errors in pb state (MR185)
Pydantic model included in documentation (MR189)
JSON parameter schema added to tmdata (MR186)
Validate processing block parameters using scripting library 0.10.0 (MR180)
Added processing block parameter JSON schema and Pydantic model (MR180)
Update script to write basic metadata yaml file (MR181)
Update script to execute required functions to produce data in an execution engine (MR179)
Initial version of the script, which sends mock pointing offset data to Kafka and configures the QueueConnector to display this data in tango attributes. (MR175)