import json
from typing import List
from ska_ser_skallop.utils.nrgen import get_id
# Base payload used by the generator functions in this module. They override
# "subarray_id", "dish.receptor_ids", "sdp.eb_id" and every processing
# block's "pb_id"; all remaining values are emitted as written here.
standard_composition_template = {
    "interface": "https://schema.skao.int/ska-tmc-assignresources/2.0",
    "transaction_id": "txn-....-00001",
    "subarray_id": 1,  # placeholder, overridden per call
    "dish": {"receptor_ids": ["0001", "0002", "0003", "0004"]},  # placeholder
    "sdp": {
        "interface": "https://schema.skao.int/ska-sdp-assignres/0.3",
        "eb_id": "eb-mvp01-20200325-09059",  # placeholder, overridden per call
        "max_length": 100.0,
        # Two scan types that differ only in id and pointing (ra/dec); their
        # channel definitions are identical.
        "scan_types": [
            {
                "scan_type_id": "science_A",
                "reference_frame": "ICRS",
                "ra": "02:42:40.771",
                "dec": "-00:00:47.84",
                "channels": [
                    {
                        "count": 744,
                        "start": 0,
                        "stride": 2,
                        "freq_min": 350000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[0, 0], [200, 1], [744, 2], [944, 3]],
                    },
                    {
                        "count": 744,
                        "start": 2000,
                        "stride": 1,
                        "freq_min": 360000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[2000, 4], [2200, 5]],
                    },
                ],
            },
            {
                "scan_type_id": "calibration_B",
                "reference_frame": "ICRS",
                "ra": "12:29:06.699",
                "dec": "02:03:08.598",
                "channels": [
                    {
                        "count": 744,
                        "start": 0,
                        "stride": 2,
                        "freq_min": 350000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[0, 0], [200, 1], [744, 2], [944, 3]],
                    },
                    {
                        "count": 744,
                        "start": 2000,
                        "stride": 1,
                        "freq_min": 360000000.0,
                        "freq_max": 368000000.0,
                        "link_map": [[2000, 4], [2200, 5]],
                    },
                ],
            },
        ],
        "processing_blocks": [
            {
                "pb_id": "pb-mvp01-20200325-09059",  # placeholder, overridden per call
                "workflow": {
                    "kind": "realtime",
                    "name": "test-receive-addresses",
                    "version": "0.4.0",
                },
                "parameters": {},
            }
        ],
    },
}
def generate_standard_comp_to_file(
    subarray_id: int, dish_ids: List[int], location: str, sb_id: str
) -> str:
    """Write a standard composition (assign-resources) payload to a JSON file.

    The payload is built from ``standard_composition_template`` with the
    subarray id, receptor ids, ``eb_id`` and processing-block ids replaced.

    :param subarray_id: value stored under ``"subarray_id"``.
    :param dish_ids: dish numbers; each is rendered as a string left-padded
        with zeros to at least four characters (e.g. ``1`` -> ``"0001"``).
    :param location: directory in which ``standard_comp.json`` is written.
    :param sb_id: value stored under ``sdp.eb_id``.
    :return: path of the file written, ``"<location>/standard_comp.json"``.
    :raises OSError: if the file cannot be opened for writing.
    """
    from copy import deepcopy

    # dict.copy() is shallow: the nested "dish" / "sdp" dicts would still be
    # the module-level objects, so the assignments below would permanently
    # alter the shared template for every later caller.
    template = deepcopy(standard_composition_template)
    template["subarray_id"] = subarray_id
    template["dish"]["receptor_ids"] = [f"{dish_id:0>4}" for dish_id in dish_ids]
    template["sdp"]["eb_id"] = sb_id
    for processing_block in template["sdp"]["processing_blocks"]:
        # get_id is an external helper; presumably it fills the '*'
        # placeholders to yield a fresh id -- confirm in
        # ska_ser_skallop.utils.nrgen.
        processing_block["pb_id"] = get_id("pb-mvp01-********-*****")
    file_name = f"{location}/standard_comp.json"
    with open(file_name, "w", encoding="utf-8") as file:
        json.dump(template, file)
    return file_name
def generate_standard_comp(subarray_id: int, dish_ids: List[int], sb_id: str) -> str:
    """Return a standard composition (assign-resources) payload as JSON text.

    The payload is built from ``standard_composition_template`` with the
    subarray id, receptor ids, ``eb_id`` and processing-block ids replaced.

    :param subarray_id: value stored under ``"subarray_id"``.
    :param dish_ids: dish numbers; each is rendered as a string left-padded
        with zeros to at least four characters (e.g. ``1`` -> ``"0001"``).
    :param sb_id: value stored under ``sdp.eb_id``.
    :return: the payload serialised with ``json.dumps``.
    """
    from copy import deepcopy

    # dict.copy() is shallow: the nested "dish" / "sdp" dicts would still be
    # the module-level objects, so the assignments below would permanently
    # alter the shared template for every later caller.
    template = deepcopy(standard_composition_template)
    template["subarray_id"] = subarray_id
    template["dish"]["receptor_ids"] = [f"{dish_id:0>4}" for dish_id in dish_ids]
    template["sdp"]["eb_id"] = sb_id
    for processing_block in template["sdp"]["processing_blocks"]:
        # get_id is an external helper; presumably it fills the '*'
        # placeholders to yield a fresh id -- confirm in
        # ska_ser_skallop.utils.nrgen.
        processing_block["pb_id"] = get_id("pb-mvp01-********-*****")
    return json.dumps(template)
def generate_tear_down_all_resources(subarray_id: int) -> str:
    """Return a release-resources payload, as JSON text, with ``release_all`` set.

    :param subarray_id: value stored under ``"subarray_id"``.
    :return: the payload serialised with ``json.dumps``; ``"release_all"`` is
        ``true`` and ``"receptor_ids"`` is an empty list.
    """
    configuration = {
        "interface": "https://schema.skao.int/ska-tmc-releaseresources/2.0",
        "transaction_id": "txn-....-00001",
        "subarray_id": subarray_id,
        "release_all": True,
        "receptor_ids": [],
    }
    return json.dumps(configuration)