Source code for ska_ser_skallop.mvp_control.configuration.composition

import json
from typing import List

from ska_ser_skallop.utils.nrgen import get_id

standard_composition_template = {
    "interface": "https://schema.skao.int/ska-tmc-assignresources/2.0",
    "transaction_id": "txn-....-00001",
    "subarray_id": 1,
    "dish": {"receptor_ids": ["0001", "0002", "0003", "0004"]},
    "sdp": {
        "interface": "https://schema.skao.int/ska-sdp-assignres/0.3",
        "eb_id": "eb-mvp01-20200325-09059",
        "max_length": 100.0,
        "scan_types": [
            {
                "scan_type_id": "science_A",
                "reference_frame": "ICRS",
                "ra": "02:42:40.771",
                "dec": "-00:00:47.84",
                "channels": [
                    {
                        "count": 744,
                        "start": 0,
                        "stride": 2,
                        "freq_min": 350000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[0, 0], [200, 1], [744, 2], [944, 3]],
                    },
                    {
                        "count": 744,
                        "start": 2000,
                        "stride": 1,
                        "freq_min": 360000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[2000, 4], [2200, 5]],
                    },
                ],
            },
            {
                "scan_type_id": "calibration_B",
                "reference_frame": "ICRS",
                "ra": "12:29:06.699",
                "dec": "02:03:08.598",
                "channels": [
                    {
                        "count": 744,
                        "start": 0,
                        "stride": 2,
                        "freq_min": 350000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[0, 0], [200, 1], [744, 2], [944, 3]],
                    },
                    {
                        "count": 744,
                        "start": 2000,
                        "stride": 1,
                        "freq_min": 360000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[2000, 4], [2200, 5]],
                    },
                ],
            },
        ],
        "processing_blocks": [
            {
                "pb_id": "pb-mvp01-20200325-09059",
                "workflow": {
                    "kind": "realtime",
                    "name": "test-receive-addresses",
                    "version": "0.4.0",
                },
                "parameters": {},
            }
        ],
    },
}


[docs]def generate_standard_comp_to_file( subarray_id: int, dish_ids: List[int], location: str, sb_id: str ) -> str: template = standard_composition_template.copy() template["subarray_id"] = subarray_id template["dish"]["receptor_ids"] = [f"{id:0>4}" for id in dish_ids] template["sdp"]["eb_id"] = sb_id for pb in template["sdp"]["processing_blocks"]: pb["pb_id"] = get_id("pb-mvp01-********-*****") file_name = f"{location}/standard_comp.json" with open(file_name, "w") as file: json.dump(template, file) return file_name
[docs]def generate_standard_comp(subarray_id: int, dish_ids: List[int], sb_id: str) -> str: template = standard_composition_template.copy() template["subarray_id"] = subarray_id template["dish"]["receptor_ids"] = [f"{id:0>4}" for id in dish_ids] template["sdp"]["eb_id"] = sb_id for pb in template["sdp"]["processing_blocks"]: pb["pb_id"] = get_id("pb-mvp01-********-*****") return json.dumps(template)
[docs]def generate_tear_down_all_resources(subarray_id: int) -> str: configuration = { "interface": "https://schema.skao.int/ska-tmc-releaseresources/2.0", "transaction_id": "txn-....-00001", "subarray_id": subarray_id, "release_all": True, "receptor_ids": [], } return json.dumps(configuration)