Writing scripts for the OET
The Observation Execution Tool (OET) can run observing scripts in a headless non-interactive manner. For efficiency, OET script execution is split into two phases: an initialisation phase and an execution phase. Scripts that are expected to be run by the OET should be structured to have two entry points corresponding to these two phases, as the template below:
1def init(subarray: int, *args, **kwargs):
2 # Called by the OET when the script is loaded and initialised by someone
3 # calling 'oet prepare'. Add your script initialisation code here. Note that
4 # the target subarray is supplied to this function as the first argument.
5 pass
6
7def main(*args, **kwargs):
8 # Called by the OET when the prepared script is told to run by someone
9 # calling 'oet start'. Add the main body of your script to this function.
10 pass
The initialisation phase occurs when the script is loaded and the script’s
init function is called (if defined) to perform any preparation and/or
initialisation. Expensive and slow operations that can be performed ahead of the main
body of script execution can be run in the initialisation phase. Typical actions
performed in init are I/O intensive operations, e.g., cloning a git repository,
creating multiple Tango device proxies, subscribing to Tango events, etc. When run by
the Observation Execution Tool (OET), the init function is passed an integer
subarray ID declaring which subarray the control script is intended to control.
Subsequently, at some point a user may call oet start, requesting that the
initialised script begin the main body of its execution. When this occurs, the OET
calls the script’s main function, which should performs the main function of the
script. For an observing script, this would involve the configuration and control of a
subarray.
below is the real example script in the scripts folder of this project.
SKA : Allocate Resources and Perform Observation
Allocating resources and performing scans requires communication with TMC CentralNode and
TMC SubarrayNode, and targets a specific subarray. This script’s init function
pre-applies the subarray ID argument to the main function. Note that this script does not
perform any Tango calls directly, but uses ska_oso_scripting.api
functions to perform all the required Tango interactions (command invocation; event
subscriptions; event monitoring).
1"""
2Standard observing script for SBDefinition execution.
3
4Creates an AssignResources command from the SBDefinition and sends to TMC CentralNode, then
5creates ConfigureRequests and Scan commands to send to TMC SubarrayNode.
6
7At the end of the observation, all resources are released.
8
9This script and the transform functions to telescope commands should support
10all functionality currently available in the ODT (e.g. PST, 5 point scans, etc)
11"""
12import functools
13import logging
14import os
15from typing import Any
16
17from ska_oso_pdm import SBDefinition, TelescopeType
18
19from ska_oso_scripting import api
20from ska_oso_scripting.api import execution_block
21from ska_oso_scripting.api.objects import SubArray
22from ska_oso_scripting.core.execution import ValueTransitionError
23from ska_oso_scripting.pdm_transforms import (
24 create_cdm_assign_resources_request_from_scheduling_block,
25 create_cdm_configure_requests_from_scheduling_block,
26)
27
28from ska_oso_scripting.topics import user_topics
29from ska_oso_scripting.engineering.low import workarounds as LOW_WORKAROUNDS
30from ska_oso_scripting.engineering.scripting_workarounds import TEMP_WORKAROUNDS
31
32LOG = logging.getLogger(__name__)
33FORMAT = "%(asctime)-15s %(message)s"
34
35logging.basicConfig(level=logging.INFO, format=FORMAT)
36
37
38def init(subarray_id: int | str, **init_args):
39 """
40 Initialise the script, binding the script runtime args to the script.
41 """
42 try:
43 subarray_id = int(subarray_id)
44 except ValueError as err:
45 raise TypeError("subarray_id must be an integer") from err
46
47 LOG.debug(f"Initializing script {__name__} with subarray_id={subarray_id}")
48
49 for arg,value in init_args.items():
50 LOG.debug(f"Initializing script {__name__} with argument(s) {arg}={value}")
51
52 global main
53
54 main = functools.partial(_main, subarray_id=subarray_id)
55 LOG.info(f"Script bound to sub-array {subarray_id}")
56
57
58def assign_resources(subarray: SubArray, sbi: SBDefinition, apply_low_workarounds: bool =False):
59 """
60 assign resources to a target sub-array using a Scheduling Block (SB).
61 :param subarray: subarray ID
62 :param sbi: ska_oso_pdm.SBDefinition
63 :return:
64 """
65 LOG.info(
66 f"Running assign_resources(subarray={subarray.id}, sbi.sbd_id={sbi.sbd_id})"
67 )
68
69 cdm_allocation = create_cdm_assign_resources_request_from_scheduling_block(
70 subarray.id, sbi
71 )
72
73 if apply_low_workarounds:
74 cdm_allocation = LOW_WORKAROUNDS.update_assign_resources_request(cdm_allocation)
75 LOW_WORKAROUNDS.pre_obs_checks(
76 cdm_allocation.model_dump(mode="json", exclude_none=True, by_alias=True)
77 )
78
79 response = api.assign_resources_from_cdm(subarray.id, cdm_allocation)
80 LOG.info(f"Resources Allocated: {response}")
81
82 LOG.info("Allocation complete")
83
84
85def observe(*, subarray: SubArray, sbi: SBDefinition, runtime_args: dict[str,Any], apply_low_workarounds:bool =False):
86 """
87 Observe using a Scheduling Block (SB) and template CDM file.
88
89 :param subarray: SubArray instance containing subarray ID
90 :param sbi: Instance of a SBDefinition
91 :param runtime_args: Any script runtime arguments to be considered.
92 :return:
93 """
94
95 LOG.info(
96 f"Starting observing for Scheduling Block: {sbi.sbd_id}, subarray_id={subarray.id})"
97 )
98
99 if runtime_args:
100 for arg,value in runtime_args.items():
101 LOG.info(f"runtime argument(s) {arg}={value}")
102
103 scan_sequence = sbi.dish_allocations.scan_sequence if sbi.telescope == TelescopeType.SKA_MID else sbi.mccs_allocation.subarray_beams[0].scan_sequence
104
105 if not scan_sequence:
106 LOG.info(f"No scans defined in Scheduling Block {sbi.sbd_id}. No observation performed.")
107 return
108
109 cdm_configure_requests = (
110 create_cdm_configure_requests_from_scheduling_block(sbi,runtime_args)
111 )
112
113 if apply_low_workarounds:
114 LOW_WORKAROUNDS.update_configure_requests(cdm_configure_requests)
115
116 for scan_definition_idx in range(len(scan_sequence)):
117 cdm_configs = cdm_configure_requests[scan_definition_idx]
118 for index, cdm_config in enumerate(cdm_configs):
119 scan_id_string = (f"{scan_definition_idx} "
120 f"({str(index + 1)}/{str(len(cdm_configs))})" if len(cdm_configs) > 1 else '')
121 try:
122 # With the CDM modified, we can now issue the Configure instruction...
123 LOG.info(f"Configuring subarray {subarray.id} for scan: {scan_id_string}")
124 api.send_message(
125 user_topics.script.announce,
126 msg=f"Configuring subarray {subarray.id} for scan: {scan_id_string}"
127 )
128 api.configure_from_cdm(subarray.id, cdm_config)
129 except ValueTransitionError as err:
130 LOG.error(f"Error configuring subarray: {err}")
131 api.send_message(
132 user_topics.script.announce,
133 msg=f"Error configuring subarray for scan {scan_id_string}"
134 )
135 raise err
136 else:
137 LOG.info(f"Configuration for scan {scan_id_string} complete")
138 api.send_message(
139 user_topics.script.announce,
140 msg=f"Configuration for scan {scan_id_string} complete"
141 )
142 try:
143 # with configuration complete, we can begin the scan.
144 LOG.info(f"Starting scan: {scan_id_string}")
145 api.send_message(
146 user_topics.script.announce,
147 msg=f"Starting scan: {scan_id_string}"
148 )
149 api.scan(subarray.id)
150 except ValueTransitionError as err:
151 LOG.error(f"Error when executing scan: {scan_id_string}: {err}")
152 api.send_message(
153 user_topics.script.announce,
154 msg=f"Error when executing scan: {scan_id_string}"
155 )
156 raise err
157 else:
158 LOG.info(f"Scan {scan_id_string} complete")
159 api.send_message(
160 user_topics.script.announce,
161 msg=f"Scan {scan_id_string} complete"
162 )
163
164 # All scans are complete. Observations are concluded with an 'end'
165 # command.
166 LOG.info(f"End scheduling block: {sbi.sbd_id}")
167 api.end(subarray.id)
168
169 LOG.info("Observation script complete")
170
171
172def _main(subarray_id: int, sb_json: str, sbi_id: str, context: dict | None = None, **runtime_args):
173 LOG.info(f"Running OS process {os.getpid()}")
174 LOG.info(f"Called with main(subarray_id={subarray_id}, sbi_id={sbi_id})")
175 LOG.debug(f"main() sb_json={sb_json}")
176 sbd: SBDefinition = api.load_sbd(sb_json)
177
178 if context and 'wait_for_qa_ready' in context:
179 # whatever the OET provides takes priority, regardless of telescope
180 qa_enabled = context.get('wait_for_qa_ready')
181 else:
182 # .. otherwise fall back to the environment variable, if defined.
183 qa_enabled = TEMP_WORKAROUNDS.enable_adr111
184 api.init_qa_wait(qa_enabled)
185
186 apply_low_workarounds = apply_low_workarounds_in_args(runtime_args)
187
188 if apply_low_workarounds:
189 sbd = LOW_WORKAROUNDS.update_scheduling_block_definition(sbd)
190
191 eb_id = execution_block.create_eb(telescope=sbd.telescope, sbi_ref=sbi_id)
192 try:
193 LOG.info(f"Created Execution Block {eb_id}")
194 subarray = SubArray(subarray_id)
195 assign_resources(subarray, sbd, apply_low_workarounds)
196 # disabled until there is agreement over QA metric names is reached.
197 # api.configure_subarray_quality_monitor(subarray.id, sbd)
198 if apply_low_workarounds:
199 LOW_WORKAROUNDS.check_mccs_subarray_leaf_node_obsstate(subarray_id)
200 observe(subarray=subarray, sbi=sbd, apply_low_workarounds=apply_low_workarounds, runtime_args=runtime_args)
201 api.release_all_resources(subarray_id)
202 execution_block.mark_execution_block_observed(eb_id)
203 except Exception as err:
204 LOG.error(f"Scheduling block execution failed: {err}", exc_info=True)
205 if apply_low_workarounds:
206 LOW_WORKAROUNDS.back_to_empty(subarray_id)
207 execution_block.mark_execution_block_failed(eb_id=eb_id, error_message=str(err))
208 raise
209
210
211def apply_low_workarounds_in_args(runtime_args: dict) -> bool:
212 if (workarounds := runtime_args.get("workarounds")) is None:
213 return False
214
215 if "low" in str(workarounds).lower():
216 return True
217
218 return False