OET command line tool
The oet command can be used to control a remote OET deployment [1]. The oet command has two sub-commands, procedure and activity.
oet procedure commands are used to control individual observing scripts, which includes loading scripts and starting and stopping script execution.
oet activity commands are used to execute more general activities on the telescope, for example running the allocate activity on the SB with ID xxx.
See the Procedure and Activity sections for further details on the commands available for each approach.
General and command-specific help is available at the command line by adding the --help argument. For example:
# get a general overview of the OET CLI
$ oet procedure --help
$ oet activity --help
# get specific help on the oet procedure create command
$ oet procedure create --help
# get specific help on the oet activity describe command
$ oet activity describe --help
Installation
The OET command line tool is available as the oet command at the terminal.
The OET CLI is packaged separately so it can be installed without OET backend
dependencies, such as PyTango. It can be installed into a Python environment,
and configured to access a remote OET deployment as detailed below:
$ pip install --upgrade ska_oso_oet_client
By default, the OET image has the CLI installed, meaning the CLI is accessible from inside the running OET pod.
Configuration
The address of the remote OET backend can be specified at the command line via the --server-url argument, or set session-wide by setting the OET_REST_URI environment variable, e.g.,
# provide the server URL when running the command, e.g.
$ oet --server-url=http://my-oet-deployment.com:5000/api/v1.0 procedure list
# alternatively, set the server URL for a session by defining an environment variable
$ export OET_REST_URI=http://my-oet-deployment.com:5000/api/v1.0
$ oet procedure list
$ oet activity describe
$ oet procedure create ...
By default, the client assumes it is operating within a SKAMPI environment and attempts to connect to a REST server using the default REST service name of http://ska-oso-oet-rest:5000/api/v1.0. If the OET client is running within a SKAMPI pod, the OET_REST_URI environment variable should be set automatically.
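The lookup order described above can be sketched in a few lines of Python. This is a minimal illustration rather than the client's actual implementation: the function name resolve_server_url is hypothetical, and the assumption that a command-line value takes precedence over the environment variable is ours.

```python
import os

# Default REST service URL quoted in the text above.
DEFAULT_SERVER_URL = "http://ska-oso-oet-rest:5000/api/v1.0"


def resolve_server_url(cli_value=None, environ=None):
    """Pick the server URL: CLI value first, then OET_REST_URI, then the default.

    The precedence order is an assumption made for this sketch.
    """
    environ = os.environ if environ is None else environ
    if cli_value:
        return cli_value
    return environ.get("OET_REST_URI", DEFAULT_SERVER_URL)
```

Passing the environment as a parameter keeps the function easy to test without modifying the real process environment.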
Commands
Common
The oet CLI tool has a listen command, which is specific to neither activities nor procedures. It is used to observe OET messages and script messages published on the procedure, activity, and several other topics.
OET CLI action | Parameters | Default | Description |
---|---|---|---|
listen | server-url | See Configuration section | Get real-time script events. Get a real-time delivery of events published by the OET server and scripts. |
Examples
A ‘listen’ command will give the real time delivery of oet events published by scripts:
$ oet listen
event: request.procedure.list
data: args=() kwargs={'msg_src': 'FlaskWorker', 'request_id': 1604056049.4846392, 'pids': None}
event: procedure.pool.list
data: args=() kwargs={'msg_src': 'SESWorker', 'request_id': 1604056049.4846392, 'result': []}
event: activity.pool.list
data: args=() kwargs={'msg_src': 'ActivityWorker', 'request_id': 1604056078.4847652, 'result': []}
event: request.procedure.create
data: args=() kwargs={'msg_src': 'FlaskWorker', 'request_id': 1604056247.0666442, 'cmd': PrepareProcessCommand(script_uri='file://scripts/eventbus.py', init_args=<ProcedureInput(, subarray_id=1)>)}
event: procedure.lifecycle.created
data: args=() kwargs={'msg_src': 'SESWorker', 'request_id': 1604056247.0666442, 'result': ProcedureSummary(id=1, script_uri='file://scripts/eventbus.py', script_args={'init': <ProcedureInput(, subarray_id=1)>, 'run': <ProcedureInput(, )>}, history=<ProcessHistory(process_states=[(ProcedureState.READY, 1604056247.713874)], stacktrace=None)>, state=<ProcedureState.READY: 1>)}
Press Control-c to exit from oet listen.
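Output like the transcript above can be post-processed by pairing each event line with the data line that follows it. The sketch below is illustrative only: the helper names are hypothetical, it treats the data payload as an opaque string, and it assumes the two-line event/data layout shown in the example.

```python
def parse_listen_output(lines):
    """Group 'event:' / 'data:' line pairs into (topic, data) tuples."""
    events = []
    topic = None
    for line in lines:
        line = line.strip()
        if line.startswith("event:"):
            topic = line[len("event:"):].strip()
        elif line.startswith("data:") and topic is not None:
            events.append((topic, line[len("data:"):].strip()))
            topic = None  # wait for the next 'event:' line
    return events


def filter_by_prefix(events, prefix):
    """Keep only events whose topic starts with the given prefix."""
    return [event for event in events if event[0].startswith(prefix)]
```

For instance, filtering with the prefix "procedure." would keep the procedure.pool.list and procedure.lifecycle.created events from the transcript and drop the request and activity events.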
Procedure
Using oet procedure, a remote OET deployment can be instructed to:
- load a Python script using oet procedure create;
- run a function contained in the Python script using oet procedure start;
- stop a running Python function using oet procedure stop.
In addition, the current and historic state of Python processes running on the backend can be inspected with:
- oet procedure list, to list all scripts that are prepared to run or are currently running;
- oet procedure describe, to inspect the current and historic state of a specific process.
The commands available via oet procedure are described below.
OET CLI action | Parameters | Default | Description |
---|---|---|---|
create | server-url | See Configuration section | Prepare a new procedure. Load the requested script and prepare it for execution. Arguments provided here are passed to the script init function, if defined. OET maintains a record of the 10 newest scripts, which means that creating an 11th script will remove the oldest script from the record. |
 | script-uri | None | |
 | args | None | |
 | kwargs | --subarray_id=1 --git_repo="http://gitlab.com/ska-telescope/oso/ska-oso-scripting" --git_branch="master" --git_commit=None --create_env=False | |
list | server-url | See Configuration section | List procedures. Return info on the collection of the 10 newest procedures, or info on the one specified by process ID (pid). |
 | pid | None | |
start | server-url | See Configuration section | Start a procedure executing. Start a process executing the procedure specified by process ID (pid) or, if none is specified, start the last one loaded. Only one procedure can be executing at any time. The listen flag is set to True by default, which means that events are shown on the command line unless it is explicitly set to False. |
 | pid | None | |
 | args | None | |
 | kwargs | None | |
 | listen | True | |
stop | server-url | See Configuration section | Stop procedure execution. Stop a running process executing the procedure specified by process ID (pid) or, if none is specified, stop the currently running process. If the run_abort flag is True, OET will send the Abort command to the SubArray as part of script termination. |
 | pid | None | |
 | run_abort | True | |
describe | server-url | See Configuration section | Investigate a procedure. Displays the call arguments, state history and, if the procedure failed, the stack trace of a specified process ID (pid). If no pid is specified, describe the last process created. |
 | pid | None | |
In the table, ‘args’ refers to parameters specified by position on the command line and ‘kwargs’ to those specified by name, e.g. --myparam=12.
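The distinction between ‘args’ and ‘kwargs’ can be illustrated with a short Python sketch that separates positional tokens from --name=value tokens. This is not the parser used by the oet client; the function name is hypothetical and, as a simplifying assumption, values are left as strings rather than converted to numbers or booleans.

```python
def split_cli_arguments(tokens):
    """Separate positional tokens from --name=value tokens.

    Values are kept as strings; any type conversion is out of scope here.
    """
    args, kwargs = [], {}
    for token in tokens:
        if token.startswith("--") and "=" in token:
            name, value = token[2:].split("=", 1)
            kwargs[name] = value
        else:
            args.append(token)
    return args, kwargs
```

Applied to the tokens hello and --verbose=true, the sketch yields one positional argument and one keyword argument named verbose.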
Examples
This section runs through an example session in which we will load three new ‘Procedures’ [2] and then run one of them. First we load the first procedure, and see the backend report that it is creating a process with ID=1 to run the script.
$ oet procedure create file://test.py 'hello' --verbose=true
ID Script Creation time State
---- --------------- ------------------- -------
1 file://test.py 2020-09-30 10:30:12 CREATING
Note the use of both positional and keyword/value arguments for the procedure on the command line. Now create a second procedure:
$ oet procedure create file://test2.py 'goodbye'
ID Script Creation time State
---- --------------- ------------------- -------
2 file://test2.py 2020-09-30 10:35:12 CREATING
Now create a third procedure that will be pulled from git:
$ oet procedure create git://test3.py --git_repo="http://foo.git" --git_branch="test" --create_env=True
ID Script Creation time State
---- --------------- ------------------- -------
3 git://test3.py 2020-09-30 10:40:12 CREATING
We can check the state of the procedures currently loaded:
$ oet procedure list
ID Script Creation time State
---- --------------- ------------------- -------
1 file://test.py 2020-09-30 10:30:12 READY
2 file://test2.py 2020-09-30 10:35:12 READY
3 git://test3.py 2020-09-30 10:40:12 READY
Alternatively, we could check the state of procedure 2 alone:
$ oet procedure list --pid=2
ID Script Creation time State
---- --------------- ------------------- -------
2 file://test2.py 2020-09-30 10:35:12 READY
Now that we have our procedures loaded we can start one of them running. At this point we supply the ID of the procedure to run, and some runtime arguments to pass to it if required. The backend responds with the new status of the procedure.
$ oet procedure start --pid=2 'bob' --simulate=false
ID Script Creation time State
---- --------------- ------------------- -------
2 file://test2.py 2020-09-30 10:35:12 RUNNING
An oet procedure list command also shows the updated status of procedure #2:
$ oet procedure list
ID Script Creation time State
---- --------------- ------------------- -------
1 file://test.py 2020-09-30 10:30:12 READY
2 file://test2.py 2020-09-30 10:35:12 RUNNING
3 git://test3.py 2020-09-30 10:40:12 READY
An oet procedure describe command will give further detail on a procedure, no matter its state.
$ oet procedure describe --pid=2
ID Script URI
---- --------------- -----------------------------------------
2 file://test2.py http://0.0.0.0:5000/api/v1.0/procedures/2
Time State
-------------------------- -------
2020-09-30 10:19:38.011584 CREATING
2020-09-30 10:19:38.016266 IDLE
2020-09-30 10:19:38.017883 LOADING
2020-09-30 10:19:38.018880 IDLE
2020-09-30 10:19:38.019006 RUNNING 1
2020-09-30 10:19:38.019021 READY
2020-09-30 10:35:12.605270 RUNNING 2
Index Method Arguments Keyword Arguments
-------- --------- ----------- -------------------
1 init ['goodbye'] {'subarray_id': 1}
2 run ['bob'] {'simulate': false}
Describing a script from git shows additional information on the repository:
$ oet procedure describe --pid=3
ID Script URI
---- --------------- -----------------------------------------
3 git://test3.py http://0.0.0.0:5000/api/v1.0/procedures/3
Time State
-------------------------- -------
2020-09-30 10:40:12.435305 CREATING
2020-09-30 10:40:12.435332 IDLE
2020-09-30 10:40:12.435364 LOADING
2020-09-30 10:40:12.435401 IDLE
2020-09-30 10:40:12.435433 RUNNING 1
2020-09-30 10:40:12.435642 READY
Index Method Arguments Keyword Arguments
-------- -------- ---------- -------------------
1 init [] {'subarray_id': 1}
2 run [] {}
Repository Branch Commit
--------------- ------- -------------------
http://foo.git test
If the procedure failed, then the stack trace will also be displayed.
Example session in a SKAMPI environment
From a shell, you can use the ‘oet procedure’ command to trigger remote execution of a full observation, e.g.,
# create process for telescope start-up and execute it
oet procedure create file:///scripts/startup.py
oet procedure start
# create process for resource allocation script
oet procedure create file:///scripts/allocate_from_file_sb.py --subarray_id=3
oet procedure start scripts/example_sb.json
# create process for configure/scan script
oet procedure create file:///scripts/observe_sb.py --subarray_id=3
# run the script, specifying scheduling block JSON which defines
# the configurations, and the order and number of scans
oet procedure start scripts/example_sb.json
# create process for resource deallocation script
oet procedure create file:///scripts/deallocate.py --subarray_id=3
# run with no arguments, which requests deallocation of all resources
oet procedure start
# create process for telescope standby script
oet procedure create file:///scripts/standby.py
oet procedure start
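The same session could be driven from Python by building the command lines and handing them to subprocess. The sketch below is only an illustration: the helper names are hypothetical, it reuses the script paths and subarray ID from the session above, and it assumes each oet invocation returns a non-zero exit code on failure.

```python
import subprocess


def session_commands(subarray_id=3, sb_file="scripts/example_sb.json"):
    """Return the oet invocations of the session above as argv lists, in order."""
    # (script URI, takes --subarray_id?, runtime arguments passed to 'start')
    steps = [
        ("file:///scripts/startup.py", False, []),
        ("file:///scripts/allocate_from_file_sb.py", True, [sb_file]),
        ("file:///scripts/observe_sb.py", True, [sb_file]),
        ("file:///scripts/deallocate.py", True, []),
        ("file:///scripts/standby.py", False, []),
    ]
    commands = []
    for script_uri, per_subarray, run_args in steps:
        create = ["oet", "procedure", "create", script_uri]
        if per_subarray:
            create.append(f"--subarray_id={subarray_id}")
        commands.append(create)
        commands.append(["oet", "procedure", "start", *run_args])
    return commands


def run_session(commands, runner=subprocess.run):
    """Run each command in order; check=True stops at the first failure."""
    for argv in commands:
        runner(argv, check=True)
```

Keeping the command construction separate from execution makes it possible to inspect or log the planned commands before anything is sent to the backend.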
Activity
Using oet activity, a remote OET deployment can be instructed to:
- execute an observing activity of a Scheduling Block with oet activity run.
In addition, the current and historic state of Activities can be inspected with:
- oet activity list, to list all activities that have been started;
- oet activity describe, to inspect the current and historic state of a specific activity.
The commands available via oet activity are described below.
OET CLI action | Parameters | Default | Description |
---|---|---|---|
run | server-url | See Configuration section | Run an activity of an SB. Create and run a script referenced by an activity defined in an SB. The activity-name and sbd-id are mandatory arguments. script-args is a dictionary defining the function name as a key (e.g. ‘init’) and any keyword arguments to be passed to the function on top of the arguments present in the SB. Only keyword args are currently allowed. prepare-only should be set to True if the script referred to by the SB and activity is not to be run yet. To start a prepared script, use the oet procedure commands. The create-env flag should be set to True if the script referred to by the SB is a Git script and requires a non-default environment to run. |
 | activity-name | None | |
 | sbd-id | None | |
 | script-args | None | |
 | prepare-only | False | |
 | create-env | False | |
 | listen | True | |
list | server-url | See Configuration section | List activities. Return info on the collection of the 10 newest activities, or info on the one specified by activity ID (aid). |
 | aid | None | |
describe | server-url | See Configuration section | Investigate an activity. Displays the script arguments and the state history of a specified activity ID (aid). If no aid is specified, describe the last activity created. |
 | aid | None | |
Examples
This section runs through an example session in which we will run an activity with arguments to the script. We will also demonstrate the more advanced use of controlling activity execution with additional oet procedure commands. For this we will prepare an activity without executing it and use the oet procedure commands to run the prepared activity.
$ oet activity run allocate sbd-123 --script-args='{"init": {"kwargs": {"foo": "bar"}}}'
ID Activity SB ID Creation Time Procedure ID State
---- ---------- ------- ------------------- -------------- ---------
1 allocate sbd-123 2023-01-06 13:56:47 1 REQUESTED
Note the use of keyword arguments for the script arguments. These will be passed as arguments when each function in the script is run. If the given keyword argument is already defined in the Scheduling Block, the value will be overwritten with the user provided one.
The activity has now been started and will complete without any further interaction from the user.
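The override rule for script arguments can be sketched as a dictionary merge. This is an illustration of the behaviour described above, not the OET implementation: the function name is hypothetical, and the assumption that SB-defined arguments use the same {"function": {"args": [...], "kwargs": {...}}} layout as --script-args is ours.

```python
import copy
import json


def merge_script_args(sb_args, user_args):
    """Overlay user-supplied keyword arguments on those defined in the SB.

    User values win when the same keyword appears in both places.
    """
    merged = copy.deepcopy(sb_args)  # never mutate the SB definition
    for function_name, user_fn_args in user_args.items():
        fn_args = merged.setdefault(function_name, {"args": [], "kwargs": {}})
        fn_args.setdefault("kwargs", {}).update(user_fn_args.get("kwargs", {}))
    return merged


# Hypothetical SB-defined arguments, merged with the --script-args value used above.
sb_defined = {"init": {"args": [1, "foo"], "kwargs": {"foo": "baz", "subarray_id": 1}}}
user_supplied = json.loads('{"init": {"kwargs": {"foo": "bar"}}}')
merged = merge_script_args(sb_defined, user_supplied)
```

In this example the user-provided value "bar" replaces the SB value for foo, while the positional arguments and the other SB keyword arguments are left untouched.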
For an example of more advanced use of the activity interface, run an activity but set the prepare-only flag to True:
$ oet activity run observe sbd-123 --prepare-only=True
ID Activity SB ID Creation Time Procedure ID State
---- ---------- ------- ------------------- -------------- ---------
2 observe sbd-123 2023-01-06 13:56:56 2 REQUESTED
We can check the state of the activities currently present:
$ oet activity list
ID Activity SB ID Creation Time Procedure ID State
---- ---------- ------- ------------------- -------------- ---------
1 allocate sbd-123 2023-01-06 13:56:47 1 COMPLETE
2 observe sbd-123 2023-01-06 13:56:56 2 PREPARED
Note that the first activity prepared and ran its script automatically, whereas the second one only prepared the script without running it. To run the script of the second activity, we need to note the Procedure ID for the activity and use oet procedure commands to run the script:
$ oet procedure start --pid=2
ID Script Creation time State
---- --------------- ------------------- -------
2 file://observe.py 2023-01-06 13:57:25 RUNNING
An oet activity describe command will give further detail on an activity.
$ oet activity describe --aid=1
ID Activity SB ID Procedure ID State
---- ---------- ------- -------------- ---------
1 allocate sbd-123 1 COMPLETE
URI Prepare Only
----------------------------------------- --------------
http://0.0.0.0:5000/api/v1.0/activities/1 False
Time State
-------------------------- ---------
2023-01-06 13:56:47.655175 REQUESTED
2023-01-06 13:56:47.934723 PREPARED
2023-01-06 13:56:48.004753 RUNNING
2023-01-06 13:56:50.382756 COMPLETE
Script Arguments
----------------
Method Arguments Keyword Arguments
-------- ----------- -------------------
init [1, 'foo'] {'foo': 'bar'}
You can also view the details of the script that was run by the activity:
$ oet procedure describe --pid=1
ID Script URI
---- --------------- -----------------------------------------
1 file://allocate.py http://0.0.0.0:5000/api/v1.0/procedures/1
Time State
-------------------------- -------
2023-01-06 13:56:47.655175 CREATING
2023-01-06 13:56:47.663742 IDLE
2023-01-06 13:56:47.665741 LOADING
2023-01-06 13:56:47.730696 IDLE
2023-01-06 13:56:47.731965 RUNNING 1
2023-01-06 13:56:47.934723 READY
2023-01-06 13:56:48.004753 RUNNING 2
2023-01-06 13:56:50.382756 READY
Index Method Arguments Keyword Arguments
-------- -------- ---------- -------------------
1 init [1, 'foo'] {'foo': 'bar'}
2 run [] {}
Footnotes
Environment Variables
Telescope
The SKA comprises two telescopes: SKA MID (Dishes) and SKA LOW (Antennas). The behaviour of code in the ska_oso_scripting module differs depending on whether it is running in an SKA MID environment (default) or an SKA LOW environment. For example, when configured for SKA MID, the code will reject CDM payloads intended for SKA LOW.
The ska-oso-scripting code is configured for MID or LOW by setting the SKA_TELESCOPE environment variable to either ‘skamid’ or ‘skalow’. If no environment variable is specified, the code assumes it is controlling SKA MID.
The telescope setting is also exposed as a configurable value in the ska-oso-scripting Helm charts, with a default value also set to SKA MID. The ska-oso-scripting definitions in the skamid and skalow SKAMPI Helm charts set the appropriate value for their respective deployments.
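The selection logic described above can be sketched as follows. This is a minimal illustration, not the ska-oso-scripting code: the function name is hypothetical, and normalising case and raising an error for unrecognised values are assumptions of this sketch.

```python
import os

VALID_TELESCOPES = ("skamid", "skalow")


def get_telescope(environ=None):
    """Return 'skamid' or 'skalow' based on SKA_TELESCOPE, defaulting to 'skamid'."""
    environ = os.environ if environ is None else environ
    value = environ.get("SKA_TELESCOPE", "skamid").strip().lower()
    if value not in VALID_TELESCOPES:
        # Rejecting unknown values is an assumption; the real code may differ.
        raise ValueError(f"SKA_TELESCOPE must be one of {VALID_TELESCOPES}, got {value!r}")
    return value
```

Code that behaves differently for MID and LOW could then branch on the returned value in one place rather than reading the environment repeatedly.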
Tango Device FQDNs
The SKA, and so by extension the OET, makes use of Tango Controls to control the telescope hardware. The Fully Qualified Domain Names (FQDNs) or prefixes of the Tango devices used to control the central node (telescope) and sub-arrays are set as environment variables CENTRALNODE_FQDN and SUBARRAYNODE_FQDN_PREFIX respectively. These environment variables are set to the values defined in values.yaml when ska-oso-scripting/SKAMPI is deployed.
C&C view: OET client and OET backend
This view is a component and connector (C&C) view of the OET that depicts the primary OET clients and their connection to the OET backend, and how the components of the backend are connected.
Primary Presentation
Elements and their properties
Components
Component | Description |
---|---|
FlaskWorker | FlaskWorker is a Flask application that presents a RESTful OET API, functioning as a REST adapter for the ScriptExecutionService. Scripts can be created, controlled, and terminated via the REST API. The FlaskWorker presents a REST resource for each script process created and managed by the ProcessManager. |
main | The main component is the first component to be started when the OET backend is launched. It has two major responsibilities: first, it launches and thereafter manages the lifecycle of all components comprising the OET backend apart from the ‘script process’, whose lifecycle is managed separately by the script supervisor component. Second, it manages the OET event bus, routing OET events between backend components. |
OET Web UI | NOT IMPLEMENTED YET |
OST | NOT IMPLEMENTED YET |
RestClientUI | RestClientUI provides a command-line interface for invoking actions on the OET backend. The CLI is a general interface whose operations (currently) focus on the script execution perspective (load script, abort script, etc.) rather than the telescope-domain use cases (assign resources to subarray, execute SB, etc.). |
ScriptExecutionServiceWorker | ScriptExecutionServiceWorker responds to requests received by the FlaskWorker, relaying the request to the ScriptExecutionService and publishing the response as an event that can be received by the FlaskWorker and returned to the user in the appropriate format. |
ScriptExecutionService | ScriptExecutionService presents the high-level API for script execution. The ScriptExecutionService orchestrates control of internal OET objects to satisfy an API request. ScriptExecutionService is also responsible for recording script execution history. ScriptExecutionService can return a presentation model of a script, its current state, and its execution history. See ProcedureSummary in the backend module view. |
ScriptWorker | ScriptWorker represents the child Python process running the requested user script. For SKA operations, most scripts executed by the OET, and hence scripts that will run in a Script Process, will be ‘observing scripts’ that control an SKA subarray. The content and purpose of these ‘observing scripts’ is contained and defined in the ska-oso-scripting project. |
Connectors
Connectors | Description |
---|---|
REST over HTTP | REST over HTTP defines a request/response connector that is used by a client to invoke services on a server using REST over HTTP. Script processes are presented as REST resources by the OET backend. Using the REST over HTTP connector, clients can control the lifecycle and/or inspect the status of scripts running in the OET backend. |
OET event bus | OET event bus connector defines an internal pub/sub connector used by an OET component to publish and subscribe to OET events (messages) sent on a topic. |
Server-Sent Event | SSE connector defines a connector that is used by a client to listen to a continuous data stream of SSE events sent over an HTTP connection from an SSE server. SSE connectors have a client role and a server role. The SSE connector is used to give clients visibility of OET events published on the OET event bus. |
Context
Variability Guide
The OET CLI reads the OET_REST_URI environment variable to find the URL of the OET REST server.
Rationale
REST over HTTP
REST over HTTP was selected as the protocol for remote control of the backend for two reasons. First, we needed a protocol that was supported by multiple languages, anticipating that the OET web UI might not be Python based. Second, we preferred a stable and mature protocol with good library support. REST satisfies both requirements, with good Python library support for both REST clients and REST servers.
Server-Sent Events
Insights into remote OET activities and script execution are obtained by monitoring events sent on the OET event bus. OET components, scripting libraries, and user scripts can all announce events of interest by publishing an event on the OET event bus. Events are published on various topics, from the script lifecycle (script loaded, script running, script aborting, script aborted, etc.), through to the SB lifecycle (SB resources allocated, observation started, observation complete, etc.) and subarray lifecycle (resources allocated, resources configured, scan started, etc.).
We needed a mechanism that would give the OET CLI, and possibly the OET web UI at some future date, a tap into these events broadcast inside a remote OET backend. This use case requires the server to push events as they happen and have the client process/display them as they are received. Standard synchronous HTTP request/response does not map easily onto this use case and so we searched for a standard that would allow server-pushed messages. Any mechanism would also need to be language independent, mature, easily implemented and easily deployable in a Kubernetes setting, just as for REST over HTTP.
Server-Sent Events (SSE) was selected as it satisfies all of these criteria. SSE operates over HTTP, and the SSE API is standardised as part of HTML5. SSE has growing language support, including Python server and client library support, which helps keep the OET implementation simple. As it operates over HTTP, it can be delivered via the same Kubernetes ingress as the OET REST API.
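To make the wire format concrete, the sketch below serialises an event into the text/event-stream layout (an event line, a data line, and a blank line as terminator) and parses it back. It is a simplified illustration rather than the OET implementation: the function names are hypothetical, encoding the payload as JSON is an assumption, and only single-line data fields are handled.

```python
import json


def format_sse(topic, payload):
    """Serialise one event in the text/event-stream wire format."""
    return f"event: {topic}\ndata: {json.dumps(payload)}\n\n"


def parse_sse(stream_text):
    """Parse a stream of events written by format_sse (single-line data only)."""
    events = []
    for block in stream_text.split("\n\n"):
        fields = {}
        for line in block.splitlines():
            name, sep, value = line.partition(":")
            if not sep or not name:
                continue  # skip lines without a field name, e.g. ':' comments
            # A single leading space after the colon is not part of the value.
            fields[name] = value[1:] if value.startswith(" ") else value
        if "data" in fields:
            events.append((fields.get("event", "message"), json.loads(fields["data"])))
    return events
```

Because the format is plain text over HTTP, a client in any language can consume the stream with a line-oriented reader, which is part of what makes SSE attractive here.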
No dedicated message broker
Systems that use a message-oriented architecture often use a dedicated message broker component such as RabbitMQ or Kafka whose sole responsibility is the delivery of messages to subscribers. Using a dedicated message broker can increase scalability by allowing multiple distributed brokers, increase reliability by allowing guaranteed message delivery, and promote system modifiability and composability by allowing routing of messages to inhomogeneous, loosely coupled, and potentially distributed subscribers via the network.
The OET does not currently use an external message broker as simplicity of deployment and reduced system complexity are currently prioritised over the advantages that an external message broker brings. Routing messages via a network broker would introduce complexity, overhead, and failure modes that are unnecessary in a homogeneous system with message publishers and message subscribers running in the same process space on the same host. We assume that message delivery through Python multiprocessing queues - essentially, communication via UNIX pipes - is robust and does not require message delivery guarantees. Additionally, telescope control scripts are not designed to be resumed in the event of failure, hence there is no value in resending any message lost to a failed ScriptWorker to a new replacement ScriptWorker. There is also a desire to keep the OET deployment footprint small and with minimal dependencies so that the OET can be easily incorporated and/or deployed in a simulator context for other OSO use.
That said, the OET architecture does allow the introduction of a dedicated message broker if the OET requirements change.
Module view: Script Execution UI and Service API
Note
Diagrams are embedded as SVG images. If the text is too small, please use your web browser to zoom in to the images, which should be magnified without losing detail. Alternatively open image in a new tab with right click + Open in a new tab.
This view is a module view showing the key components responsible for the OET interface, how they relay requests from remote OET clients to the internal OET components responsible for meeting that request, and how the response makes its way back to the client.
Primary Presentation
Major classes involved in the user interface and remote control of the script execution API.
Element Catalogue
Elements and Their Properties
Component | Description |
---|---|
app | app is the Flask web application that makes the OET available over HTTP. app is the local variable created during FlaskWorker startup. The web application has the API blueprint and ServerSentEventsBlueprint registered, which makes the OET REST API and the OET event stream available when the web app is run. |
ProcedureAPI | ProcedureAPI is a Flask blueprint containing the Python functions that implement the OET REST API. HTTP resources in this blueprint are accessed and modified to control script execution. As the resources are accessed, the API implementation publishes an equivalent request event, which triggers the ScriptExecutionServiceWorker to take the appropriate action to satisfy that request. The API also converts the response back to a suitable HTTP response. |
Blueprint | A Flask Blueprint collects a set of HTTP operations that can be registered on a Flask web application. Registering a Blueprint to a Flask application makes the HTTP operations in that blueprint available when the web application is deployed. |
EventBusWorker | EventBusWorker is a base class that bridges the independent pypubsub publish-subscribe networks so that a pypubsub message seen in one EventBusWorker process is also seen by other EventBusWorker processes. EventBusWorker is intended to be inherited by classes that register their methods as subscribers to pypubsub topics, so that the subclass method is called whenever an event on that topic is received. |
Flask | Flask (https://flask.palletsprojects.com) is a third-party Python framework for developing web applications. It provides an easy way to expose a Python function as an HTTP endpoint. Flask is used to present the functions in the restserver module as HTTP REST resources. |
FlaskWorker | FlaskWorker runs the ‘app’ Flask application. As a subclass of EventBusWorker, FlaskWorker also relays pypubsub messages to and from other Python processes. |
mptools | mptools is a Python framework for creating robust Python applications that run code concurrently in independent Python processes. See Module view: Script Execution for details. |
PrepareProcessCommand | PrepareProcessCommand encapsulates all the information required to prepare a script for execution. It references both the script location and arguments that should be passed to the script initialisation function, if such a function is present. |
ProcedureHistory | ProcedureHistory represents the state history of a script execution process, holding a timeline of state transitions and any stacktrace resulting from script execution failure. |
ProcedureSummary | ProcedureSummary is a presentation model capturing information on a script and its execution history. Through the ProcedureSummary, information identifying the script, the process running it, the current and historic process state, plus a timeline of all functions called on the script and any resulting stacktrace can be resolved. |
pypubsub | pypubsub (https://pypubsub.readthedocs.io) is a third-party Python library that provides an implementation of the Observer pattern. It provides a publish-subscribe API that clients can use to subscribe to topics. pypubsub notifies each subscriber whenever a message is received on that topic, passing the message to the client. pypubsub offers in-process publish-subscribe; it has no means of communicating messages to other Python processes. |
RestClientUI | RestClientUI is a command line utility that accesses the OET REST API over the network. The RestClientUI provides commands for creating new script execution processes, invoking methods on user scripts, terminating script execution, listing user processes on the remote machine, and inspecting the state of a particular user script process. |
ScriptExecutionService | ScriptExecutionService provides the high-level API for the script execution domain, presenting methods that ‘start script X’ or ‘run method Y of user script Z’. See Module view: Script Execution for details on how this is achieved. |
ServerSentEventsBlueprint | ServerSentEventsBlueprint is a Flask Blueprint that contains the functions required to expose the OET event bus as a server-sent events stream (https://en.wikipedia.org/wiki/Server-sent_events). This SSE stream republishes all events sent over the OET event bus as HTTP data. This provides the mechanism for external visibility of OET actions, significant milestones, and user events emitted by the script such as ‘subarray resources allocated’, ‘scan started’, ‘scan stopped’, etc. |
StartProcessCommand | StartProcessCommand encapsulates all the information required to call a method of a user script running on the OET backend. It captures information on the script process to target, the script function to call, and any arguments to be passed to the function. |
StopProcessCommand | StopProcessCommand encapsulates the information required to terminate a process. It holds information on which script process should be terminated and whether the ‘abort subarray activity’ follow-on script should be run. |
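The three command objects described in the table can be pictured as simple data containers. The sketch below is hypothetical and is not the OET source: only script_uri and init_args appear in the oet listen output shown earlier, while every other field name (process_uid, fn_name, run_args, run_abort) is an assumption chosen to match the descriptions above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ProcedureInput:
    """Positional and keyword arguments for one script function call."""
    args: List[Any] = field(default_factory=list)
    kwargs: Dict[str, Any] = field(default_factory=dict)


@dataclass
class PrepareProcessCommand:
    script_uri: str            # where to load the script from
    init_args: ProcedureInput  # passed to the script's init function, if present


@dataclass
class StartProcessCommand:
    process_uid: int           # hypothetical name: which script process to target
    fn_name: str               # hypothetical name: which script function to call
    run_args: ProcedureInput   # hypothetical name: arguments for that function


@dataclass
class StopProcessCommand:
    process_uid: int           # hypothetical name: which script process to stop
    run_abort: bool            # whether to run the abort follow-on script


# Commands mirroring the example CLI session for procedure 2.
prepare = PrepareProcessCommand(
    script_uri="file://test2.py",
    init_args=ProcedureInput(args=["goodbye"], kwargs={"subarray_id": 1}),
)
start = StartProcessCommand(
    process_uid=2, fn_name="run", run_args=ProcedureInput(args=["bob"])
)
stop = StopProcessCommand(process_uid=2, run_abort=True)
```

Grouping each request into one object means the REST layer only has to build a command and publish it, leaving the interpretation to the service that receives it.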
Element Interfaces
The major interface between the UI and OET backend is the REST API presented by the FlaskWorker, which is documented separately in Module View: REST API.
Element Behaviour
API invocation via HTTP REST
The sequence diagram below illustrates how the components above interact to invoke a call on a remote ScriptExecutionService instance in response to a request from a client. This diagram shows how the user request is received by the FlaskWorker REST backend, how that triggers actions on the independent ScriptExecutionServiceWorker process hosting the ScriptExecutionService instance, and how the response is returned to the user.
Inter-process publish-subscribe
The sequence diagram below illustrates how in-process pypubsub messages are communicated to other processes, which is an essential part of the communication between FlaskWorker and ScriptExecutionServiceWorker and forms the basis for how event messages emitted by scripts can be published to the outside world in an HTTP SSE stream.
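The bridging idea can be sketched without any third-party packages. In the illustration below, LocalBus is a tiny stand-in for pypubsub and queue.Queue stands in for a multiprocessing queue (both expose put and get); the class names and the direct pairing of two workers are assumptions of this sketch, not the OET design.

```python
import queue
from collections import defaultdict


class LocalBus:
    """Tiny in-process publish-subscribe bus (a stand-in for pypubsub)."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, **kwargs):
        for callback in list(self._subscribers[topic]):
            callback(topic=topic, **kwargs)


class BridgedWorker:
    """Hypothetical worker owning a local bus plus inbound and outbound queues."""

    def __init__(self, name, inbox, outbox):
        self.name = name
        self.bus = LocalBus()
        self.inbox = inbox
        self.outbox = outbox

    def publish(self, topic, **kwargs):
        # Deliver locally, then forward a copy tagged with its origin.
        self.bus.publish(topic, msg_src=self.name, **kwargs)
        self.outbox.put({"topic": topic, "msg_src": self.name, "kwargs": kwargs})

    def drain_inbox(self):
        # Republish remote messages on the local bus only; they are not
        # forwarded again, which prevents messages looping between workers.
        while True:
            try:
                message = self.inbox.get_nowait()
            except queue.Empty:
                return
            self.bus.publish(
                message["topic"], msg_src=message["msg_src"], **message["kwargs"]
            )
```

A subscriber registered on one worker's local bus therefore sees messages published by the other worker once its inbox has been drained, with msg_src identifying where the message originated.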
Context Diagram
Variability Guide
N/A
Rationale
N/A
Module view: Activity UI and Service API
Note
Diagrams are embedded as SVG images. If the text is too small, please use your web browser to zoom in to the images, which should be magnified without losing detail. Alternatively open image in a new tab with right click + Open in a new tab.
This view is a module view depicting the key components involved in SB activity execution; that is, requesting an activity described by a Scheduling Block to be run.
Primary Presentation
Major classes responsible for the execution and management of activities.
Element Catalogue
Elements and their properties
Component |
Description |
---|---|
ActivityState is an enumeration defining the states that an Activity (a concept linking Scheduling Blocks
to Procedures) can be in. The state machine for activities has not yet been completely defined, and currently
an Activity can only be in state |
|
ActivityService provides the high-level API for the activity domain, presenting methods that
‘run a script referenced by activity X of scheduling block Y’. The ActivityService completes user requests
by translating the activity requests into Procedure domain commands which then execute the scripts.
|
|
|
For the OET REST deployment, ActivityServiceWorker is the client sending requests to the ActivityService.
|
Element Interfaces
The major public interface in these interactions is the ActivityService API. For more information on this interface, please see the Module View: REST API.
Element Behaviour
Activity API invocation via HTTP REST
The sequence diagram below illustrates how the components above interact to invoke a call on a remote ActivityService instance in response to a request from a client. This diagram shows how the user request is received by the FlaskWorker REST backend, how that triggers actions on the independent ActivityWorker process hosting the ActivityService instance, and how the response is returned to the user.
Inter-process publish-subscribe
The Activity domain uses the same publish-subscribe system as Procedure domain for both communication between FlaskWorker and ActivityServiceWorker and for the ActivityService to communicate with the ScriptExecutionServiceWorker. For a diagram illustrating the flow of in-process pypubsub messages, see the Inter-process publish-subscribe section in the script execution API documentation.
Variability Guide
N/A
Rationale
Storing Scheduling Block in the Filesystem
It is currently only possible to deploy the activity and procedure services as one service. This means that a Scheduling Block written to file by the ActivityService is still available to the Procedure domain. In the future, the Activity and Procedure services could be deployed in different locations, so the current approach of saving SBs to a file should be refactored so that a script running on a different server can also access the SB.
Scheduling Block URI
Currently, the Scheduling Block URI used in the OET system is a simple path string to a JSON file, referred to by the keyword argument sb_json. In the future this will be expanded into a proper URI with several allowed prefixes, such as file:// for an SB located in a file and oda:// for an SB that should be retrieved from the ODA.
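The prefix-based scheme described above could be handled roughly as in the sketch below. This is only an illustration of the idea, not OET code: the function name parse_sb_uri and the ALLOWED_SCHEMES set are hypothetical, and the oda:// identifier used in the usage note is made up for the example.

```python
from typing import Tuple
from urllib.parse import urlparse

# Hypothetical set of allowed prefixes, taken from the two examples in the text.
ALLOWED_SCHEMES = {"file", "oda"}


def parse_sb_uri(sb_uri: str) -> Tuple[str, str]:
    """Split a Scheduling Block URI into (scheme, location).

    Raises ValueError for a bare path or an unknown prefix.
    """
    parsed = urlparse(sb_uri)
    if parsed.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"Unsupported SB URI scheme: {parsed.scheme!r}")
    if parsed.scheme == "file":
        # file:///path/to/sb.json -> the path component holds the location
        return parsed.scheme, parsed.path
    # oda://some-id -> the identifier is parsed as the network location
    return parsed.scheme, parsed.netloc + parsed.path
```

With this sketch, parse_sb_uri("file:///path/to/scheduling_block_123.json") yields the scheme "file" and the path on disk, while a plain path string such as the one used today would be rejected until it is given a prefix.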
Module view: Script Execution
Note
Diagrams are embedded as SVG images. If the text is too small, please use your web browser to zoom in to the images, which should be magnified without losing detail. Alternatively, open the image in a new tab with right click + Open in a new tab.
This view is a module view depicting the key components involved in script execution; that is, creating new Python processes that load a user script and run functions in that user script when requested.
Primary Presentation
Major classes responsible for the execution and management of user scripts.
Element Catalogue
Elements and their properties
Component |
Description |
---|---|
EmbeddedStringScript |
NOT IMPLEMENTED YET
|
Environment is a dataclass that holds the information required to identify a Python virtual environment and its location on disk. In addition, it holds synchronisation primitives to avoid race conditions between multiple requests to create the same environment, as would be the case for multiple requests to create virtual environments for the same git project and git commit hash. |
|
|
EnvironmentManager is responsible for creating and managing Environments, the custom Python virtual environments
in which a user script that requires a non-default environment runs. Typically, this is the case for a request
to run a script located in a git repository, where the request requires a more recent version of the
ska-oso-scripting library or control scripts than was packaged with the OET.
|
Event |
The Event class manages a flag that can be set and/or inspected by multiple Python processes. Events are commonly used to signify to observers of the Event that a condition has occurred. Event is part of the standard Python library. |
|
EventBusWorker is a QueueProcWorker that relays pubsub events seen in one EventBusWorker process to other EventBusWorker processes. See Module view: Script Execution UI and Service API for more information. |
ExecutableScript is an abstract class for any class that defines a Python script to be executed. |
|
|
FilesystemScript captures the information required to run a Python script located within the filesystem of a deployed OET backend. As an example, in a Kubernetes context this could point to a script contained in the default preinstalled scripting environment, or a script made available in a persistent volume mounted by the OET pod. |
GitScript captures the information required to run a Python script that is located in a git repository. It collects a set of identifying information that together can conclusively identify the specific script to be run, such as git repository, branch, tag, and commit hash. |
|
MainContext is the parent context for a set of worker processes that communicate via message queues. It defines
a consistent architecture for event-based communication between Python processes and consistent behaviour for
POSIX signal handling and process management.
|
|
MPQueue is an extension of the standard library multiprocessing.Queue that adds get/set methods that return booleans when the operation fails rather than raising exceptions, which makes the class easier to use in some contexts. |
|
Proc represents a child Python process of a MainContext.
|
|
ProcedureInput captures the anonymous positional arguments and named keyword arguments for a Python function call. ProcedureInput is used in the presentation model to help describe historic function calls as well as in the PrepareProcessCommand and StartProcessCommand to define the arguments for an upcoming call. |
|
ProcedureState is an enumeration defining the states that a Procedure (a child ScriptWorker process running a Python script) can be in. The states are:
|
|
ProcessManager is the parent for all script execution processes. Specifically, it is the parent of all the
ScriptWorker instances that run user code in a child Python process. ProcessManager is responsible for launching
ScriptWorker processes and relaying requests such as ‘load user script X from git repository
Y’, ‘run main() function’, or ‘stop execution’ to the running scripts.
|
|
ProcWorker is a template class for code that should execute in a child Python interpreter process.
|
|
Queue |
Queue is a class that implements a multi-consumer, multi-producer FIFO queue that can be shared between Python processes. Queue is part of the standard Python library. |
QueueProcWorker is a ProcWorker that loops over items received on a message queue, calling the abstract main_func() function for every item received. Together with the ProcWorker base class functionality, QueueProcWorker will call main_func() for every event received for as long as the shutdown event is not set. |
|
ScriptExecutionService provides the high-level API for the script execution domain, presenting methods that
‘start script _Y_’ or ‘run method _Y_ of user script _Z_’. The ScriptExecutionService orchestrates control of the
ProcessManager and associated domain objects in order to satisfy an API request.
|
|
ScriptWorker is a class that can load a user script in a child process, running functions of that user script on
request.
|
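The message-loop pattern described for QueueProcWorker in the table above can be sketched as follows. This is a simplified, single-process illustration that uses the standard library queue.Queue and threading.Event rather than the OET classes; the helper names safe_get and run_queue_worker and the "END" sentinel are assumptions made for the example.

```python
import queue
import threading


def safe_get(work_q, timeout=0.01):
    """Return (True, item) if an item arrived in time, else (False, None).

    Mirrors the idea of a get method that reports failure with a boolean
    instead of raising an exception.
    """
    try:
        return True, work_q.get(timeout=timeout)
    except queue.Empty:
        return False, None


def run_queue_worker(work_q, shutdown_event, main_func):
    """Call main_func for every item received until shutdown is requested."""
    handled = 0
    while not shutdown_event.is_set():
        ok, item = safe_get(work_q)
        if not ok:
            continue  # nothing received yet; re-check the shutdown event
        if item == "END":  # hypothetical sentinel that ends the loop
            break
        main_func(item)
        handled += 1
    return handled
```

In this sketch the loop stops either when the shutdown event is set or when the sentinel is received, which keeps the worker responsive to shutdown even when the queue is idle.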
Element Interfaces
The major public interface in these interactions is the ScriptExecutionService API. For more information on this interface, please see the API documentation for ScriptExecutionService.
Element Behaviour
ScriptExecutionService
The sequence diagram below gives a high-level overview of how the ScriptExecutionService controls objects in the domain module to meet requests to prepare, start, and stop user script execution.
ScriptExecutionService.prepare
The diagram below gives more detail on how the domain layer handles a request to prepare a script for execution.
ScriptWorker
The diagram below illustrates how a ScriptWorker is created and how it communicates startup success with the parent process.
ScriptWorker.main_loop
The diagram below depicts the main ScriptWorker message loop, illustrating how the various messages from the parent ProcessManager are handled by the child ScriptWorker.
Context Diagram
Variability Guide
N/A
Rationale
N/A
Module View: REST API
1. Interface Identity
OET REST API presents REST resources that can be used to manage the lifecycle of Python scripts running on a remote server and to inspect their status.
2. Resources
A ‘Procedure’ represents a Python script to run, or that is running, on the backend. The REST API operates on Procedures.
The standard workflow is to use the API to:
Instruct the backend to prepare a script for execution by using HTTP POST to upload a JSON Procedure to /api/v1/procedures.
Start script execution by uploading an updated JSON Procedure with a ProcedureState of RUNNING.
(optional) A running script can be terminated by using PUT to upload a JSON Procedure with a ProcedureState of STOPPED.
The current status of a script execution can be inspected at any time by reading the JSON Procedure with HTTP GET.
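As a rough illustration of this workflow from Python, the sketch below builds (but does not send) the corresponding HTTP requests with the standard library. The base URL is an assumption that mirrors the localhost examples in the Usage Guide, the request bodies follow the curl sessions shown there, and the helper names are hypothetical.

```python
import json
from urllib.request import Request

# Assumed base URL, matching the localhost examples in the Usage Guide.
BASE_URL = "http://localhost:5000/api/v1.0/procedures"
JSON_HEADERS = {"Content-Type": "application/json"}


def prepare_request(script_uri, init_kwargs):
    """Build the POST request asking the backend to prepare a script."""
    body = {
        "script_uri": script_uri,
        "script_args": {"init": {"kwargs": init_kwargs}},
    }
    return Request(BASE_URL, data=json.dumps(body).encode(),
                   headers=JSON_HEADERS, method="POST")


def state_change_request(procedure_uri, state, run_kwargs=None):
    """Build the PUT request that moves a procedure to RUNNING or STOPPED."""
    body = {"state": state}
    if run_kwargs:
        # late-binding arguments for the script's main execution function
        body["script_args"] = {"run": {"kwargs": run_kwargs}}
    return Request(procedure_uri, data=json.dumps(body).encode(),
                   headers=JSON_HEADERS, method="PUT")
```

Passing one of these Request objects to urllib.request.urlopen would send it to a running backend; the procedure URI for the PUT requests is the uri value returned when the procedure is created.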
This workflow has been mapped to the following REST resources:
HTTP Method | Resource URL | Description |
---|---|---|
GET | | List procedures |
GET | | Return a procedure definition |
GET | | Streaming realtime OET events |
POST | | Prepare a new procedure |
PUT | | Modify the state of a prepared procedure |
An ‘Activity’ represents an action which a user will command the telescope to perform, e.g., ‘allocate’.
HTTP Method | Resource URL | Description |
---|---|---|
GET | | List activities |
GET | | Get activity |
POST | | Prepare a new activity |
3. Data Types and Constants
Type: Procedure
Procedure is used to represent a Python script running in a Python process on the OET backend. Attributes are:
string uri: read-only URI of this procedure. Defined by the server on procedure creation.
FileSystemScript/GitScript script: script details containing script_uri, e.g., file:///path/to/obsscript.py, and additional information like git arguments.
CallArgs script_args: arguments provided to the script at initialisation time and main execution time.
ProcedureState state: current state of this Procedure.
ProcedureHistory history: timestamped execution history for this Procedure.
Example
Below is an example Procedure JSON object. This resource (located at URI http://localhost:5000/api/v1.0/procedures/1) represents a script (located on disk at /path/to/observing_script.py) that has been loaded and its initialisation method called with two arguments (e.g., the script init function was called as init(subarray_id=1, sb_uri='file:///path/to/scheduling_block_123.json')). The script is ready to execute but is not yet executing, as shown by its state being READY:
{
"script_args": {
"init": {
"args": [],
"kwargs": {
"sb_uri": "file:///path/to/scheduling_block_123.json",
"subarray_id": 1
}
},
"run": {
"args": [],
"kwargs": {}
}
},
"script": {
"script_type": "filesystem",
"script_uri": "file:///path/to/observing_script.py"
},
"history": {
"process_states": [
("CREATING", 1601463545.57689632),
("IDLE", 1601463545.57843814),
("LOADING", 1601463545.58043561),
("IDLE", 1601463545.58865546),
("RUNNING", 1601463545.62904726),
("READY", 1601463545.7789776)
],
"stacktrace": null
},
"state": "READY",
"uri": "http://localhost:5000/api/v1.0/procedures/1"
}
If a user wanted to run a script located in the git repository http://gitrepo.git on branch test, the script JSON would look as below:
{ ...
"script": {
"script_type": "git",
"script_uri": "git:///path/to/observing_script.py",
"git_args": {"git_repo": "http://gitrepo.git", "git_branch": "test"}
} ...
}
Type: FileSystemScript
FileSystemScript represents the script to be run from the file system. It has a script_uri argument, which points to an observing script present on the file system, and a script_type, which has the value filesystem.
Type: GitScript
GitScript inherits from FileSystemScript, which means it also has a script_uri argument, and it has a script_type of git. Additionally, it has an argument, GitArgs, which points to the git repository the given script is located in. The arguments for GitArgs are:
git_repo: the full URL of the repository.
git_branch: the branch to use, if specifying a branch other than the default master branch.
git_commit: the commit to use, if wanting to point to a specific commit within the repository.
Type: CallArgs
CallArgs represents the arguments to be passed to functions in the user script when those functions are called. Attributes are:
FunctionArgs init: arguments passed to the script init function at script creation and initialisation time.
FunctionArgs run: arguments passed to the script main function when the main execution function is called.
Type: FunctionArgs
FunctionArgs captures the positional arguments and keyword arguments (to be) passed to a Python function. Attributes are:
list args: list of positional arguments for the Python function, e.g., "args": [1, 2, 3]
dict kwargs: dictionary of keyword arguments, e.g., "kwargs": {"subarray_id": 1}
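To make the mapping from FunctionArgs to a Python call concrete, the sketch below unpacks the args and kwargs entries into a function call. It is illustrative only: call_with_function_args and the stand-in init function are hypothetical and are not part of the OET.

```python
import json


def call_with_function_args(func, function_args):
    """Call func with the positional and keyword arguments of a FunctionArgs dict."""
    args = function_args.get("args", [])
    kwargs = function_args.get("kwargs", {})
    return func(*args, **kwargs)


def init(subarray_id, sb_uri=None):
    """Stand-in for a user script's init function (hypothetical)."""
    return f"init subarray {subarray_id} with {sb_uri}"


payload = json.loads(
    '{"args": [], "kwargs": {"subarray_id": 1, '
    '"sb_uri": "file:///path/to/scheduling_block_123.json"}}'
)
result = call_with_function_args(init, payload)
```

Here the JSON payload is equivalent to calling init(subarray_id=1, sb_uri="file:///path/to/scheduling_block_123.json").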
Type: ProcedureState
ProcedureState is an enumeration representing the current lifecycle state of the Python process running the user script. It can be one of:
IDLE: state between script preparation steps where no action is ongoing.
CREATING: script creation has been started.
LOADING: loading the specified script file to be executed.
READY: script is ready to run the specified function, e.g., init or main.
RUNNING: script is running, i.e., the script’s init or main function is currently executing.
STOPPED: script was terminated by the OET before the script could complete.
COMPLETE: the script completed successfully, i.e., the main function completed and no exception was raised.
FAILED: an exception was raised during script preparation or execution.
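A client that consumes the state string from the REST API might model these values with a small enumeration, as in the sketch below. This is a client-side stand-in, not the OET's own ProcedureState class, and treating STOPPED, COMPLETE and FAILED as final states is an interpretation of the descriptions above rather than something the API states explicitly.

```python
from enum import Enum


class ProcedureState(Enum):
    """Client-side stand-in mirroring the state names listed above."""

    IDLE = "IDLE"
    CREATING = "CREATING"
    LOADING = "LOADING"
    READY = "READY"
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    COMPLETE = "COMPLETE"
    FAILED = "FAILED"


# Assumption: these three states are the ones a script cannot leave again.
FINAL_STATES = frozenset(
    {ProcedureState.STOPPED, ProcedureState.COMPLETE, ProcedureState.FAILED}
)


def is_final(state_name):
    """Return True if the named state marks the end of a script's lifecycle."""
    return ProcedureState(state_name) in FINAL_STATES
```

Looking up an unknown state name raises ValueError, which can help a client notice when the backend introduces a state it does not yet handle.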
Type: ProcedureHistory
ProcedureHistory represents a timeline of ProcedureStates that the Procedure has passed through. Attributes are:
list process_states: a list of ProcedureStates and the timestamps when each ProcedureState was reached, e.g., process_states: [('CREATING', 18392174.543), ('RUNNING', 18392143.546), ('COMPLETE', 183925456.744)].
string stacktrace: populated with the stacktrace from the script if the final ProcedureState is FAILED. This attribute is set to None for any other final state.
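As an example of what can be derived from a ProcedureHistory, the sketch below computes how long a procedure spent in each state. The helper time_in_states is hypothetical, the timestamps are copied from the example Procedure shown earlier, and the entries are written as two-element lists on the assumption that this is how they appear once decoded from JSON.

```python
def time_in_states(process_states):
    """Return (state, seconds) pairs for every state that has been left.

    The final entry is the current state, so it has no duration yet.
    """
    durations = []
    for (state, entered), (_, left) in zip(process_states, process_states[1:]):
        durations.append((state, left - entered))
    return durations


history = {
    "process_states": [
        ["CREATING", 1601463545.57689632],
        ["IDLE", 1601463545.57843814],
        ["LOADING", 1601463545.58043561],
        ["IDLE", 1601463545.58865546],
        ["RUNNING", 1601463545.62904726],
        ["READY", 1601463545.7789776],
    ],
    "stacktrace": None,
}

durations = time_in_states(history["process_states"])
current_state = history["process_states"][-1][0]
```

In this example the longest interval is the RUNNING entry, which corresponds to the script's init function executing before the procedure became READY.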
4. Error Handling
Accessing the URL of a Procedure that does not exist on the backend or whose history has expired will result in an HTTP 404 error:
tangodev@buster:~/ska/ska-oso-oet$ curl -i http://localhost:5000/api/v1.0/procedures/4
HTTP/1.0 404 NOT FOUND
Content-Type: application/json
Content-Length: 103
Server: Werkzeug/1.0.1 Python/3.7.3
Date: Thu, 18 Feb 2021 17:40:30 GMT
{"error": "404 Not Found", "type": "ResourceNotFound", "Message": "No information available for PID=4"}
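A client could turn such an error body into a readable message along the lines of the sketch below. The key names ("type" and "Message") are taken from the response shown above; the function name describe_error is hypothetical.

```python
import json


def describe_error(status, body):
    """Summarise a JSON error body returned by the REST API."""
    payload = json.loads(body)
    return f"HTTP {status} {payload['type']}: {payload['Message']}"


# Body copied from the 404 response shown above.
body = (
    '{"error": "404 Not Found", "type": "ResourceNotFound", '
    '"Message": "No information available for PID=4"}'
)
message = describe_error(404, body)
```

When using urllib.request, a 404 response is raised as urllib.error.HTTPError, whose code attribute and read() method provide the status and body passed to a helper like this.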
5. Variability
None
6. Quality Attribute Characteristics
None
7. Rationale and Design Issues
The procedure history is limited, and at some point a Procedure REST resource will become unavailable as it becomes superseded by new Procedures and that history slot is reclaimed. This is not expected to be a problem as a maximum of one script can run at any one time, so even a small history allows a reasonable amount of time for that Procedure history to be inspected.
8. Usage Guide
The following examples show some interactions with the REST service from the command line, using curl to send input to the service and with responses output to the terminal.
Creating a procedure
The session below creates a new procedure, which loads the script and calls the script’s init() function, but does not commence execution. The created procedure is returned as JSON. Note that in the return JSON the procedure URI is defined. This URI can be used in a PUT request that commences script execution:
tangodev@buster:~/ska/ska-oso-oet$ curl -i -H "Content-Type: application/json" -X POST -d '{"script_uri":"file:///path/to/observing_script.py", "script_args": {"init": { "kwargs": {"subarray_id": 1, "sb_uri": "file:///path/to/scheduling_block_123.json"} } }}' http://localhost:5000/api/v1.0/procedures
HTTP/1.0 201 CREATED
Content-Type: application/json
Content-Length: 424
Server: Werkzeug/0.16.0 Python/3.7.3
Date: Wed, 15 Jan 2020 10:08:01 GMT
{
"procedure": {
"script_args": {
"init": {
"args": [],
"kwargs": {
"sb_uri": "file:///path/to/scheduling_block_123.json",
"subarray_id": 1
}
},
"run": {
"args": [],
"kwargs": {}
}
},
"script": {
"script_type": "filesystem",
"script_uri": "file:///path/to/observing_script.py"
},
"history": {
"process_states": [
("CREATING", 1601463545.7589678),
("IDLE", 1601463545.7598525),
("LOADING", 1601463545.7649524),
("IDLE", 1601463545.7668241),
("RUNNING", 1601463545.7694371),
("READY", 1601463545.7748005)
],
"stacktrace": null
},
"state": "READY",
"uri": "http://localhost:5000/api/v1.0/procedures/2"
}
}
Listing all procedures
The session below lists all procedures, both running and non-running. This example shows two procedures have been created: procedure #1 that will run resource_allocation.py, and procedure #2 that will run observing_script.py:
tangodev@buster:~/ska/ska-oso-oet$ curl -i http://localhost:5000/api/v1.0/procedures
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 913
Server: Werkzeug/0.16.0 Python/3.7.3
Date: Wed, 15 Jan 2020 10:11:42 GMT
{
"procedures": [
{
"script_args": {
"init": {
"args": [],
"kwargs": {
"dishes": [
1,
2,
3
]
}
},
"run": {
"args": [],
"kwargs": {}
}
},
"script": {
"script_type": "filesystem",
"script_uri": "file:///path/to/resource_allocation.py"
},
"history": {
"process_states": [
("CREATING", 1601463545.7589678),
("IDLE", 1601463545.7598525),
("LOADING", 1601463545.7649524),
("IDLE", 1601463545.7668241),
("RUNNING", 1601463545.7694371),
("READY", 1601463545.7748005)
],
"stacktrace": null
},
"state": "READY",
"uri": "http://localhost:5000/api/v1.0/procedures/1"
},
{
"script_args": {
"init": {
"args": [],
"kwargs": {
"sb_uri": "file:///path/to/scheduling_block_123.json",
"subarray_id": 1
}
},
"run": {
"args": [],
"kwargs": {}
}
},
"script": {
"script_type": "filesystem",
"script_uri": "file:///path/to/observing_script.py"
},
"history": {
"process_states": [
("CREATING", 1601463545.7589678),
("IDLE", 1601463545.7598525),
("LOADING", 1601463545.7649524),
("IDLE", 1601463545.7668241),
("RUNNING", 1601463545.7694371),
("READY", 1601463545.7748005)
],
"stacktrace": null
},
"state": "READY",
"uri": "http://localhost:5000/api/v1.0/procedures/2"
}
]
}
Listing one procedure
A specific procedure can be listed by a GET request to its specific URI. The session below lists procedure #1:
tangodev@buster:~/ska/ska-oso-oet$ curl -i http://localhost:5000/api/v1.0/procedures/1
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 417
Server: Werkzeug/0.16.0 Python/3.7.3
Date: Wed, 15 Jan 2020 10:18:26 GMT
{
"procedure": {
"script_args": {
"init": {
"args": [],
"kwargs": {
"dishes": [
1,
2,
3
]
}
},
"run": {
"args": [],
"kwargs": {}
}
},
"script": {
"script_type": "filesystem",
"script_uri": "file:///path/to/resource_allocation.py"
},
"history": {
"process_states": [
("CREATING", 1601463545.7589678),
("IDLE", 1601463545.7598525),
("LOADING", 1601463545.7649524),
("IDLE", 1601463545.7668241),
("RUNNING", 1601463545.7694371),
("READY", 1601463545.7748005)
],
"stacktrace": null
},
"state": "READY",
"uri": "http://localhost:5000/api/v1.0/procedures/1"
}
}
Starting procedure execution
The signal to begin script execution is to change the state of a procedure to RUNNING. This is achieved with a PUT request to the resource. Any additional late-binding arguments to pass to the script’s run() function should be defined in the ‘run’ script_args key.
The example below requests execution of procedure #2, with the late-binding keyword argument scan_duration=14:
tangodev@buster:~/ska/ska-oso-oet$ curl -i -H "Content-Type: application/json" -X PUT -d '{"script_args": {"run": {"kwargs": {"scan_duration": 14.0}}}, "state": "RUNNING"}' http://localhost:5000/api/v1.0/procedures/2
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 467
Server: Werkzeug/0.16.0 Python/3.7.3
Date: Wed, 15 Jan 2020 10:14:06 GMT
{
"procedure": {
"script_args": {
"init": {
"args": [],
"kwargs": {
"sb_uri": "file:///path/to/scheduling_block_123.json",
"subarray_id": 1
}
},
"run": {
"args": [],
"kwargs": {
"scan_duration": 14.0
}
}
},
"script": {
"script_type": "filesystem",
"script_uri": "file:///path/to/observing_script.py"
},
"history": {
"process_states": [
("CREATING", 1601463545.7589678),
("IDLE", 1601463545.7598525),
("LOADING", 1601463545.7649524),
("IDLE", 1601463545.7668241),
("RUNNING", 1601463545.7694371),
("READY", 1601463545.7748005)
],
"stacktrace": null
},
"state": "READY",
"uri": "http://localhost:5000/api/v1.0/procedures/2"
}
}
Terminate process execution
The signal to abort a script mid-execution is to change the state of a procedure to STOPPED. This is achieved with a PUT request to the resource. An additional argument, abort, can be provided in the request which, when true, will execute an abort script that sends the Abort command to the sub-array device. The default value of abort is False.
tangodev@buster:~/ska/ska-oso-oet$ curl -i -H "Content-Type: application/json" -X PUT -d '{"abort": true, "state": "STOPPED"}' http://localhost:5000/api/v1.0/procedures/2
HTTP/1.0 200 OK
Content-Type: application/json
Content-Length: 467
Server: Werkzeug/0.16.0 Python/3.7.3
Date: Wed, 15 Jan 2020 10:14:09 GMT
{"abort_message":"Successfully stopped script with ID 2 and aborted subarray activity "}
Listen to OET events
The session below streams the events published by OET scripts. This example shows two events: #1, a request to list the available procedures, and #2, a request to get the details of all the created procedures.
tangodev@buster:~/ska/ska-oso-oet$ curl -i http://localhost:5000/api/v1.0/stream
HTTP/1.0 200 OK
Content-Type: text/event-stream; charset=utf-8
Connection: close
Server: Werkzeug/1.0.1 Python/3.7.3
Date: Mon, 02 Nov 2020 06:57:40 GMT
data:{"msg_src": "FlaskWorker", "pids": null, "topic": "request.procedure.list"}
id:1605017762.46912
data:{"msg_src": "SESWorker", "result": [], "topic": "procedure.pool.list"}
id:1605017762.46912
data:{"msg_src": "FlaskWorker", "cmd": {"py/object": "oet.procedure.application.application.PrepareProcessCommand", "script_uri": "file://scripts/eventbus.py", "init_args": {"py/object": "oet.procedure.domain.ProcedureInput", "args": {"py/tuple": []}, "kwargs": {"subarray_id": 1}}}, "topic": "request.procedure.create"}
id:1605017784.1536236
data:{"msg_src": "SESWorker", "result": {"py/object": "oet.procedure.application.application.ProcedureSummary", "id": 1, "script_uri": "file://scripts/eventbus.py", "script_args": {"init": {"py/object": "oet.procedure.domain.ProcedureInput", "args": {"py/tuple": []}, "kwargs": {"subarray_id": 1}}, "run": {"py/object": "oet.procedure.domain.ProcedureInput", "args": {"py/tuple": []}, "kwargs": {}}}, "history": {"py/object": "oet.procedure.domain.ProcedureHistory", "process_states": {"py/reduce": [{"py/type": "collections.OrderedDict"}, {"py/tuple": []}, null, null, {"py/tuple": [{"py/tuple": [{"py/reduce": [{"py/type": "oet.procedure.domain.ProcedureState"}, {"py/tuple": [1]}]}, 1605017786.0569353]}]}]}, "stacktrace": null}, "state": {"py/id": 5}}, "topic": "procedure.lifecycle.created"}
id:1605017784.1536236
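A client reading this stream needs to pair each data: line with its id: line. The sketch below does this for lines laid out as in the session above; it assumes every event consists of a data: line followed by an id: line, which is a simplification of the full Server-Sent Events format, and the function name parse_sse is hypothetical.

```python
import json


def parse_sse(lines):
    """Group 'data:' / 'id:' line pairs into a list of event dictionaries."""
    events = []
    current = {}
    for raw in lines:
        line = raw.strip()
        if line.startswith("data:"):
            current["data"] = json.loads(line[len("data:"):])
        elif line.startswith("id:"):
            # Assumption: the id line closes the event, as in the session above.
            current["id"] = line[len("id:"):].strip()
            events.append(current)
            current = {}
    return events


# The first two events from the session above.
sample = [
    'data:{"msg_src": "FlaskWorker", "pids": null, "topic": "request.procedure.list"}',
    "id:1605017762.46912",
    'data:{"msg_src": "SESWorker", "result": [], "topic": "procedure.pool.list"}',
    "id:1605017762.46912",
]
events = parse_sse(sample)
topics = [event["data"]["topic"] for event in events]
```

The topic field of each decoded event can then be used to route messages, for example to separate request topics from procedure lifecycle topics.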
ska_oso_oet.tango
The ska_oso_oet.tango module contains code that could be called from observing scripts. Primarily, this will involve interactions with ska_oso_oet.tango.TangoExecutor.
- class ska_oso_oet.tango.TangoExecutor(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]
TangoExecutor is the proxy between calling code and Tango devices. It accepts encapsulated Tango interactions and performs them on behalf of the calling code.
- __init__(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]
Create a new TangoExecutor.
- Parameters:
proxy_factory – a function or object which, when called, returns an object that conforms to the PyTango DeviceProxy interface.
- execute(command: Command, **kwargs)[source]
Execute a Command on a Tango device.
Additional kwargs to the DeviceProxy can be specified if required.
- Parameters:
command – the command to execute
- Returns:
the response, if any, returned by the Tango device
- read(attribute: Attribute)[source]
Read an attribute on a Tango device.
- Parameters:
attribute – the attribute to read
- Returns:
the attribute value
- class ska_oso_oet.tango.Attribute(device: str, name: str)[source]
An abstraction of a Tango attribute.
- class ska_oso_oet.tango.Command(device: str, command_name: str, *args, **kwargs)[source]
An abstraction of a Tango command.
- __init__(device: str, command_name: str, *args, **kwargs)[source]
Create a Tango command.
- Parameters:
device – the FQDN of the target Tango device
command_name – the name of the command to execute
args – unnamed arguments to be passed to the command
kwargs – keyword arguments to be passed to the command
ska_oso_oet.features
The features module contains code handling the setting and reading of OET feature flags. OET feature flags are configured once, at deployment time, and are not reconfigured during execution.
Feature flag values are set from, in order:
environment variables,
an .ini file
default values set in code
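The lookup order above can be sketched as follows. This is an illustration of the precedence only, not the Features implementation: the OET_ environment variable prefix, the features section name, and the use_pubsub flag name are all hypothetical.

```python
import os
from configparser import ConfigParser

TRUE_VALUES = ("1", "true", "yes", "on")


def read_flag(name, config, default, environ=None):
    """Resolve a boolean flag from the environment, then an .ini file, then a default."""
    environ = os.environ if environ is None else environ
    env_key = f"OET_{name.upper()}"  # hypothetical naming convention
    if env_key in environ:
        return environ[env_key].strip().lower() in TRUE_VALUES
    if config.has_option("features", name):  # hypothetical section name
        return config.getboolean("features", name)
    return default


config = ConfigParser()
config.read_string("[features]\nuse_pubsub = true\n")
from_ini = read_flag("use_pubsub", config, default=False, environ={})
from_env = read_flag("use_pubsub", config, default=True,
                     environ={"OET_USE_PUBSUB": "false"})
```

In this sketch the environment variable overrides the .ini value, and the coded default is only used when neither source defines the flag.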
- class ska_oso_oet.features.Features(config_parser: ConfigParser)[source]
The Features class holds flags for OET features that can be toggled.
- __init__(config_parser: ConfigParser)[source]
ska_oso_oet
Reads values from the ska_oso_oet.ini file and initialises the feature toggle constants, including the toggle that enables event-based polling/pubsub.
ska_oso_oet.main
ska_oso_oet.tango
- class ska_oso_oet.tango.TangoDeviceProxyFactory[source]
A call to create Tango DeviceProxy clients. This class exists to allow unit tests to override the factory with an implementation that returns mock DeviceProxy instances.
- class ska_oso_oet.tango.TangoExecutor(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]
TangoExecutor is the proxy between calling code and Tango devices. It accepts encapsulated Tango interactions and performs them on behalf of the calling code.
- class SingleQueueEventStrategy(mgr: SubscriptionManager)[source]
SingleQueueEventStrategy encapsulates the event handling behaviour of the TangoExecutor from ~October 2021, when all events were added to a single queue and subscriptions were created and released after each attribute read operation.
We hope to replace this with a more advanced implementation that allows subscriptions to multiple events.
- Parameters:
mgr – SubscriptionManager instance used to observe events
- __init__(mgr: SubscriptionManager)[source]
- notify(evt: tango.EventData)[source]
This implements the SubscriptionManager EventObserver interface. Tango ChangeEvents republished by the SubscriptionManager are received via this method.
Queue is thread-safe so we do not need to synchronise this method with read_event.
- read_event(attr: Attribute) tango.EventData [source]
Read an event from the queue. This function blocks until an event is received.
With a single subscription active at any one time, the attribute is ignored by this implementation but is expected to be required by strategies that support multiple attribute subscriptions.
- subscribe_event(attr: Attribute) int [source]
Subscribe to change events published by a Tango attribute.
This strategy only supports one active subscription at any time. An exception will be raised if a second subscription is attempted.
This method returns a subscription identifier which should be supplied to a subsequent unsubscribe_event method.
- Parameters:
attr – attribute to subscribe to
- Returns:
subscription identifier
- unsubscribe_event(attr: Attribute, subscription_id: int) None [source]
Unsubscribe from change events published by a Tango attribute.
This strategy only supports one active subscription at any time. An exception will be raised if a second subscription is attempted.
- Parameters:
attr – attribute to unsubscribe from
subscription_id – subscription identifier
- __init__(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]
Create a new TangoExecutor.
- Parameters:
proxy_factory – a function or object which, when called, returns an object that conforms to the PyTango DeviceProxy interface.
- execute(command: Command, **kwargs)[source]
Execute a Command on a Tango device.
Additional kwargs to the DeviceProxy can be specified if required.
- Parameters:
command – the command to execute
- Returns:
the response, if any, returned by the Tango device
- read(attribute: Attribute)[source]
Read an attribute on a Tango device.
- Parameters:
attribute – the attribute to read
- Returns:
the attribute value
- class ska_oso_oet.tango.SubscriptionManager(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]
SubscriptionManager is a proxy for Tango event subscriptions that prevents duplicate subscriptions and minimises subscribe/unsubscribe calls.
Previously, each time a script listened to an event, it would subscribe to an event, wait for reception of the appropriate event, then unsubscribe. These multiple subscribe/unsubscribe calls were found to create problems. SubscriptionManager was introduced to manage subscriptions, with the aim of having fewer, longer-lived subscriptions. Clients subscribe to the SubscriptionManager, and the SubscriptionManager handles any required subscriptions to Tango devices.
The SubscriptionManager component is responsible for managing events and event subscriptions in the OET. The SubscriptionManager sits as a proxy between client and Tango event subscriptions, moving the pub/sub layer accessed by clients away from the Tango layer and into the OET layer. Clients register with the SubscriptionManager as observers of an attribute. If required, one long-lived Tango subscription per attribute is created on demand by the SubscriptionManager. The SubscriptionManager relays received Tango events to all attribute observers registered at the time of event reception. Unregistering an observer from the SubscriptionManager prevents subsequent notifications but does not affect the underlying Tango event subscription, which continues to operate until the Python interpreter exits.
Legacy calling code expects a maximum of one subscription to be active at any one time. Additionally, the caller always sandwiched read_event calls between subscribe_attribute and unsubscribe_attribute calls. Together, this meant subscriptions were short-lived, existing for the duration of a single attribute monitoring operation, and that one Queue to hold events was sufficient as there would only ever be one Tango event subscription. To maintain this legacy behaviour, subscribe_attribute and unsubscribe_attribute register and unregister the TangoExecutor as an observer of events, with the TangoExecutor.notify method adding received events to the TangoExecutor queue read by the legacy TangoExecutor.read_event method.
Class diagram for components involved in OET event handling
Sequence diagram from OET event handling
- Members:
- register_observer(attr: Attribute, observer)[source]
Register an EventObserver as an observer of a Tango attribute.
Once registered, the EventObserver will be notified of each Tango event published by the attribute.
- Parameters:
attr – Tango attribute to observe
observer – the EventObserver to notify
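The behaviour described for SubscriptionManager, with one long-lived subscription per attribute and events relayed to whichever observers are registered at the time, can be sketched without Tango as below. This is not the OET implementation: SketchSubscriptionManager, the subscribe callable that stands in for creating a Tango event subscription, and the unregister_observer method name are assumptions for illustration. Observers are expected to offer a notify(evt) method, as described for the EventObserver interface.

```python
from collections import defaultdict


class SketchSubscriptionManager:
    """Relay events from one underlying subscription per attribute to observers."""

    def __init__(self, subscribe):
        # subscribe(attr, callback) stands in for a Tango event subscription
        self._subscribe = subscribe
        self._observers = defaultdict(list)
        self._subscribed = set()

    def register_observer(self, attr, observer):
        if attr not in self._subscribed:
            # create the long-lived subscription on first demand only
            self._subscribe(attr, lambda evt, attr=attr: self._relay(attr, evt))
            self._subscribed.add(attr)
        if observer not in self._observers[attr]:
            self._observers[attr].append(observer)

    def unregister_observer(self, attr, observer):
        # stops notifications but leaves the underlying subscription in place
        if observer in self._observers[attr]:
            self._observers[attr].remove(observer)

    def _relay(self, attr, evt):
        # copy the list so observers may unregister while being notified
        for observer in list(self._observers[attr]):
            observer.notify(evt)
```

Registering a second observer for the same attribute reuses the existing subscription, which is the property that avoids repeated subscribe and unsubscribe calls against the device.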
- class ska_oso_oet.tango.LocalScanIdGenerator(start=1)[source]
LocalScanIdGenerator is an abstraction of a service that will generate scan IDs as unique integers. Expect scan UID generation to be a database operation or similar in the production implementation.
- property value
Get the current scan ID.
- class ska_oso_oet.tango.RemoteScanIdGenerator(hostname)[source]
RemoteScanIdGenerator connects to the skuid service to retrieve IDs
- property value
Get the current scan ID.
- class ska_oso_oet.tango.Callback[source]
Callback is an observable that distributes Tango events received by the callback instance to all observers registered at the moment of event reception.
- notify_observers(evt: tango.EventData)[source]
Distribute an event to all registered observers.
- Parameters:
evt – event to distribute
ska_oso_oet.ui
ska_oso_oet.activity
ska_oso_oet.activity.application
The ska_oso_oet.activity.application module contains code related to OET ‘activities’ that belong in the application layer. This application layer holds the application interface, delegating to objects in the domain layer for business rules and actions.
- class ska_oso_oet.activity.application.ActivityService[source]
ActivityService provides the high-level interface and facade for the activity domain.
The interface is used to run activities referenced by Scheduling Blocks. Each activity will run a script (or procedure), but ActivityService will create the necessary commands for the Procedure domain to create and execute the scripts.
- complete_run_activity(prepared_summary: ProcedureSummary, request_id: int) ActivitySummary | None [source]
Complete the request to run the Activity, using the ProcedureSummary that is now available. This includes updating the Activity with the procedure_id, sending the request to start the procedure if prepare_only is not set to True, and returning the ActivitySummary.
- Parameters:
prepared_summary – the ProcedureSummary for the Procedure related to the requested Activity
request_id – The original request_id from the REST layer
- Returns:
an ActivitySummary describing the state of the Activity that the Procedure is linked to, or None if the Procedure was not created from an Activity
- prepare_run_activity(cmd: ActivityCommand, request_id: int) None [source]
Prepare to run the activity of a Scheduling Block. This includes retrieving the script from the scheduling block and sending the request messages to the ScriptExecutionService to prepare the script.
The request_id is required to be propagated through the messages sent to the Procedure layer, so the REST layer can wait for the correct response event.
- Parameters:
cmd – dataclass argument capturing the activity name and SB ID
request_id – The original request_id from the REST layer
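The two-phase prepare/complete flow can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: the Sketch* classes, the outbox list standing in for messages sent to the Procedure layer, the made-up SB ID, and passing a bare procedure ID instead of a ProcedureSummary. It only shows how a request_id might tie the second phase back to the first.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchActivityCommand:
    activity_name: str
    sbd_id: str
    prepare_only: bool = False


@dataclass
class SketchActivity:
    activity_id: int
    procedure_id: Optional[int]
    sbd_id: str
    activity_name: str
    prepare_only: bool


class SketchActivityService:
    def __init__(self):
        self._pending = {}  # request_id -> Activity awaiting its Procedure
        self._next_id = 1
        self.outbox = []    # stand-in for messages sent to the Procedure layer

    def prepare_run_activity(self, cmd, request_id):
        activity = SketchActivity(
            self._next_id, None, cmd.sbd_id, cmd.activity_name, cmd.prepare_only
        )
        self._next_id += 1
        self._pending[request_id] = activity
        # The request_id travels with the message so the reply can be matched.
        self.outbox.append(("prepare", request_id))

    def complete_run_activity(self, procedure_id, request_id):
        activity = self._pending.pop(request_id, None)
        if activity is None:
            # The Procedure was not created from an Activity.
            return None
        activity.procedure_id = procedure_id
        if not activity.prepare_only:
            self.outbox.append(("start", procedure_id))
        return activity


service = SketchActivityService()
service.prepare_run_activity(SketchActivityCommand("allocate", "sbd-123"), request_id=42)
summary = service.complete_run_activity(procedure_id=7, request_id=42)
unrelated = service.complete_run_activity(procedure_id=8, request_id=99)
```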
- summarise(activity_ids: List[int] | None = None) List[ActivitySummary] [source]
Return ActivitySummary objects for Activities with the requested IDs.
This method accepts an optional list of integers, representing the Activity IDs to summarise. If the IDs are left undefined, ActivitySummary objects for all current Activities will be returned.
- Parameters:
activity_ids – optional list of Activity IDs to summarise.
- Returns:
list of ActivitySummary objects
ska_oso_oet.activity.domain
The ska_oso_oet.activity.domain module contains code that belongs to the activity domain layer. Classes and definitions contained in this domain layer define the high-level concepts used to describe and launch scheduling block activities.
- class ska_oso_oet.activity.domain.Activity(activity_id: int, procedure_id: int | None, sbd_id: str, activity_name: str, prepare_only: bool)[source]
Activity represents an action taken on a scheduling block.
An activity maps to a script that accomplishes the activity’s goal. In a telescope control context, activities and goals could be ‘allocate resources for this SB’, ‘observe this SB’, etc. That is, users talk about doing something with the SB; their focus is not on which script needs to run and what script parameters are required to accomplish that task.
ska_oso_oet.activity.ui
The ska_oso_oet.activity.ui module contains code that belongs to the activity UI/presentation layer. This layer is the means by which external users or systems would interact with activities.
- ska_oso_oet.activity.ui.make_public_activity_summary(activity: ActivitySummary)[source]
Convert an ActivitySummary into JSON ready for client consumption.
The main use of this function is to replace the internal Activity ID with the resource URI, e.g., 1 -> http://localhost:5000/api/v1.0/procedures/1
- Parameters:
activity – ActivitySummary to convert
- Returns:
safe JSON representation
ska_oso_oet.event.topics
- class ska_oso_oet.event.topics.activity[source]
Root topic for events related to activities.
- class ska_oso_oet.event.topics.procedure[source]
Root topic for events related to procedures.
- class lifecycle[source]
Topic for events related to procedure lifecycle.
- class complete[source]
Emitted when a Procedure has completed successfully and is no longer available to be called.
- class created[source]
Emitted when a procedure is created, i.e., a script is loaded and Python interpreter initialised.
- class started[source]
Emitted when any user function in a procedure is running, i.e., script init is called
- class ska_oso_oet.event.topics.request[source]
Root topic for events emitted when a user or system component has made a request.
- class activity[source]
Topic for user requests related to activities.
- class procedure[source]
Topic for user requests related to procedures.
- class ska_oso_oet.event.topics.sb[source]
Root topic for events emitted relating to Scheduling Blocks
- class lifecycle[source]
Topic for events related to Scheduling Block lifecycle
- class ska_oso_oet.event.topics.scan[source]
Root topic for events emitted relating to Scans in the context of SB execution
- class lifecycle[source]
Topic for events related to SB scan lifecycle
- class configure[source]
Emitted when sub-array resources are configured for a scan
- class ska_oso_oet.event.topics.subarray[source]
Root topic for events emitted relating to individual Subarray activities
- class resources[source]
Topic for events relating to Subarray resources
ska_oso_oet.mptools
Top-level package for Multiprocessing Tools.
This package is substantially based on Pamela D McA’Nulty’s mptools project, which is hosted at
Pamela presents an excellent article giving an overview of the MPTools package at
MPTools is subject to the MIT licence.
MIT License
Copyright (c) 2019, Pamela D McA’Nulty
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- class ska_oso_oet.mptools.EventMessage(msg_src: str, msg_type: str, msg: Any)[source]
EventMessage holds the message and message metadata for events sent on the event queue between MPTools ProcWorkers.
- class ska_oso_oet.mptools.MPQueue(maxsize=0, *, ctx)[source]
MPQueue is a multiprocessing Queue extended with convenience methods that return booleans to reflect success and failure rather than raising exceptions.
- MPQueue adds methods to:
get next item in an exception-free manner
put an item in an exception-free manner
drain queue to allow safe closure
close queue in an exception-free manner
- drain()[source]
Drain all items from this MPQueue, yielding each item until all items have been removed.
- safe_close() int [source]
Drain and close this MPQueue.
No more items can be added to this MPQueue once safe_close has been called.
- safe_get(timeout: float | None = 0.02)[source]
Remove and return an item from this MPQueue.
If optional arg timeout is None, safe_get returns an item if one is immediately available. If optional arg timeout is a positive number (the default), safe_get blocks at most timeout seconds for an item to become available. In either case, None is returned if no item is available.
- Parameters:
timeout – maximum timeout in seconds, or None for no waiting period
- Returns:
None if no item is available
- safe_put(item, timeout: float | None = 0.02) bool [source]
Put an item on this MPQueue.
safe_put adds an item onto the queue if a free slot is available, blocking at most timeout seconds for a free slot and returning False if no free slot was available within that time.
- Parameters:
item – item to add
timeout – timeout in seconds
- Returns:
True if the operation succeeded within the timeout
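The exception-free style of these methods can be sketched as follows. This is an illustrative stand-in rather than the MPQueue source: it subclasses the thread-based queue.Queue instead of a multiprocessing queue so that the example is deterministic, and the class name SafeQueue is hypothetical.

```python
import queue


class SafeQueue(queue.Queue):
    """Thread-queue stand-in showing boolean/None results instead of exceptions."""

    def safe_get(self, timeout=0.02):
        try:
            if timeout is None:
                # No waiting period: only return an item that is already there.
                return self.get(block=False)
            return self.get(block=True, timeout=timeout)
        except queue.Empty:
            return None

    def safe_put(self, item, timeout=0.02):
        try:
            if timeout is None:
                self.put(item, block=False)
            else:
                self.put(item, block=True, timeout=timeout)
            return True
        except queue.Full:
            return False

    def drain(self):
        # Yield items until the queue reports that it is empty.
        while True:
            try:
                item = self.get(block=False)
            except queue.Empty:
                return
            yield item


q = SafeQueue(maxsize=1)
put_first = q.safe_put("a")
put_second = q.safe_put("b", timeout=0.01)  # queue is full, so this fails
got = q.safe_get()
missing = q.safe_get(timeout=None)          # queue is now empty
q.safe_put("c")
drained = list(q.drain())
```

One consequence of this design is that a caller cannot distinguish an empty queue from a queued None item when using safe_get.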
- class ska_oso_oet.mptools.MainContext(mp_ctx: BaseContext | None = None)[source]
MainContext is the parent context for a set of worker processes that communicate via message queues.
- MPQueue(*args, **kwargs) MPQueue [source]
Create a new message queue managed by this context.
- Parameters:
args – queue constructor args
kwargs – queue constructor kwargs
- Returns:
message queue instance
- Proc(name: str, worker_class: Type[ProcWorker], *args, **kwargs) Proc [source]
Create a new process managed by this context.
- Parameters:
name – name for worker process
worker_class – worker process class
args – any worker class constructor args
kwargs – any worker class constructor kwargs
- Returns:
worker instance
- stop_procs() Tuple[int, int] [source]
Stop all ProcWorkers managed by this MainContext.
stop_procs requests cooperative shutdown of running ProcWorkers before escalating to more forceful methods using POSIX signals.
This function returns a 2-tuple, the first item indicating the number of ProcWorkers that returned a non-zero exit status on termination, the second item indicating the number of ProcWorkers that required termination.
- Returns:
tuple of process termination stats
- class ska_oso_oet.mptools.Proc(mp: BaseContext, name: str, worker_class: Type[ProcWorker], shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]
Proc represents a child process of a MainContext.
Proc instances exist in the scope of a MainContext instance and in the same Python interpreter process as the MainContext. Procs are the MainContext’s link to the ProcWorkers which run in separate Python interpreters. Every ProcWorker running in a child process is associated with one Proc.
Each Proc is responsible for bootstrapping its ProcWorker and managing its lifecycle. Proc arranges for an instance of the ProcWorker class passed as a constructor argument to be initialised and start running in a new child Python interpreter. Proc checks that the ProcWorker has started successfully by checking the status of a multiprocessing Event passed to the ProcWorker as a constructor argument, which should be set by the ProcWorker on successful startup. If ProcWorker startup does not complete successfully and the event is left unset, Proc will forcibly terminate the child process and report the error.
Proc is able to terminate its associated ProcWorker, first by giving the ProcWorker a chance to co-operatively exit by setting the shutdown event. If the ProcWorker does not respond by exiting within the grace period set by Proc.SHUTDOWN_WAIT_SECS, Proc will forcibly terminate the ProcWorker’s process.
Proc ensures that the shutdown event and MPQueues it receives are passed through to the ProcWorker. Note that by default only one shutdown event is created by the MainContext, so setting the shutdown event triggers shutdown in all ProcWorkers!
Proc does not contain any business logic or application-specific code, which should be contained in the ProcWorker - or more likely, a class that extends ProcWorker.
- __init__(mp: BaseContext, name: str, worker_class: Type[ProcWorker], shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]
- full_stop(wait_time=3.0) None [source]
Stop the ProcWorker child process.
The method will attempt to terminate ProcWorker execution, first by setting the shutdown event and giving the ProcWorker opportunity to cleanly exit. If the ProcWorker has not terminated after wait_time seconds, SIGTERM signals are sent to the child process hosting the ProcWorker.
- Parameters:
wait_time – grace time before sending SIGTERM signals
- terminate(max_retries=3, timeout=0.1) bool [source]
Terminate the child process using POSIX signals.
This function sends SIGTERM to the child process, waiting timeout seconds before checking process status and, if the process is still alive, trying again.
- Parameters:
max_retries – max retry attempts
timeout – seconds to wait before retry
- Returns:
True if process termination was successful
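The retry behaviour described for terminate could look roughly like the sketch below. It assumes a process object exposing the multiprocessing.Process methods is_alive, terminate and join; the function name terminate_with_retries and the StubbornProcess test double are hypothetical, and join(timeout) is used here as the waiting step.

```python
def terminate_with_retries(proc, max_retries=3, timeout=0.1):
    """Request termination until the process exits or retries run out."""
    for _ in range(max_retries):
        if not proc.is_alive():
            return True
        proc.terminate()    # multiprocessing.Process.terminate sends SIGTERM on POSIX
        proc.join(timeout)  # wait up to `timeout` seconds for the process to exit
    return not proc.is_alive()


class StubbornProcess:
    """Hypothetical process stand-in that ignores the first N terminate calls."""

    def __init__(self, ignore_count):
        self._ignore_count = ignore_count
        self._alive = True
        self.terminate_calls = 0

    def is_alive(self):
        return self._alive

    def terminate(self):
        self.terminate_calls += 1
        if self.terminate_calls > self._ignore_count:
            self._alive = False

    def join(self, timeout=None):
        pass  # nothing to wait for in this stand-in


slow = StubbornProcess(ignore_count=1)
slow_stopped = terminate_with_retries(slow, timeout=0)

stuck = StubbornProcess(ignore_count=10)
stuck_stopped = terminate_with_retries(stuck, max_retries=3, timeout=0)
```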
- class ska_oso_oet.mptools.ProcWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]
ProcWorker is a template class for code that should execute in a child Python interpreter process.
ProcWorker contains the standard boilerplate code required to set up a well-behaved child process. It handles starting the process, connecting signal handlers, signalling the parent that startup completed, etc. ProcWorker does not contain any business logic, which should be defined in a subclass of ProcWorker.
The core ProcWorker template method is main_loop, which is called once startup is complete and main execution begins. In ProcWorker this method is left blank and should be overridden by the class extending ProcWorker. Once the main_loop method is complete, the ProcWorker is shut down.
MPTools provides some ProcWorker subclasses with main_loop implementations that provide different kinds of behaviour. For instance,
TimerProcWorker.main_loop has code that calls a function on a fixed cadence;
QueueProcWorker.main_loop has code that gets items from a queue, calling a function with every item received.
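The template-method structure described above can be sketched in a single process. This is an assumption-laden illustration, not the mptools code: threading.Event and queue.Queue stand in for their multiprocessing equivalents, the class names are hypothetical, and the "END" sentinel is chosen only for the example.

```python
import queue
import threading


class SketchWorker:
    """Base class: run() does the housekeeping and calls main_loop()."""

    def __init__(self, name, startup_event, shutdown_event):
        self.name = name
        self.startup_event = startup_event
        self.shutdown_event = shutdown_event

    def main_loop(self):
        """Template method, left blank here and overridden by subclasses."""

    def run(self):
        self.startup_event.set()  # signal the parent that startup completed
        self.main_loop()
        return 0                  # exit status for a clean exit


class SketchQueueWorker(SketchWorker):
    """Calls main_func with every item received on the work queue."""

    def __init__(self, name, startup_event, shutdown_event, work_q):
        super().__init__(name, startup_event, shutdown_event)
        self.work_q = work_q
        self.handled = []

    def main_func(self, item):
        self.handled.append(item)

    def main_loop(self):
        while not self.shutdown_event.is_set():
            try:
                item = self.work_q.get(timeout=0.02)
            except queue.Empty:
                continue  # nothing to do yet; re-check the shutdown event
            if item == "END":
                break     # sentinel message ends the loop
            self.main_func(item)


work_q = queue.Queue()
for item in ("a", "b", "END"):
    work_q.put(item)

startup_event, shutdown_event = threading.Event(), threading.Event()
worker = SketchQueueWorker("demo", startup_event, shutdown_event, work_q)
exit_code = worker.run()
```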
- __init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]
Create a new ProcWorker.
- Parameters:
name – name of this worker
startup_event – event to set on startup completion
shutdown_event – event to monitor for shutdown
event_q – queue for messages to/from MainWorker
args –
- init_signals() SignalObject [source]
Initialise signal handlers for this worker process.
Calling this method will install SIGTERM and SIGINT signal handlers for the running process.
- static int_handler(signal_object: SignalObject, exception_class, signal_num: int, current_stack_frame: frame | None) None
Custom signal handling function that requests co-operative ProcWorker shutdown by setting the shared Event, forcibly terminating the process by raising an instance of the given exception class if call limit has been exceeded.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
- run() int [source]
Start ProcWorker execution.
This method performs the housekeeping required to set the worker instance running and starts the main loop. An exit code of 0 is returned if the main loop completes and exits cleanly.
- Returns:
exit status code
- static term_handler(signal_object: SignalObject, exception_class, signal_num: int, current_stack_frame: frame | None) None
Custom signal handling function that requests co-operative ProcWorker shutdown by setting the shared Event, forcibly terminating the process by raising an instance of the given exception class if call limit has been exceeded.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
- class ska_oso_oet.mptools.QueueProcWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, **kwargs)[source]
QueueProcWorker is a ProcWorker that calls main_func with every item received on its work queue.
- __init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, **kwargs)[source]
Create a new QueueProcWorker.
The events and MPQueues passed to this constructor should be created and managed within the scope of a MainContext context manager and shared with other ProcWorkers, so that the communication queues are shared correctly between Python processes and there is a common event that can be set to notify all processes when shutdown is required.
- Parameters:
name – name of this worker
startup_event – event to trigger when startup is complete
shutdown_event – event to monitor for shutdown
event_q – outbox for posting messages to main context
work_q – inbox message queue for work messages
args – captures other anonymous arguments
kwargs – captures other keyword arguments
- class ska_oso_oet.mptools.SignalObject(shutdown_event: Event)[source]
SignalObject is a struct holding properties and state referenced by mptools signal handlers during their processing.
Setting the SignalObject.shutdown_event will request that all MPTools processes cooperatively shut down. SignalObject also records how many times a signal has been received, allowing escalation for processes that do not co-operate with shutdown_event requests.
- class ska_oso_oet.mptools.TimerProcWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]
TimerProcWorker is a ProcWorker that calls main_func on a fixed cadence.
- ska_oso_oet.mptools.default_signal_handler(signal_object: SignalObject, exception_class, signal_num: int, current_stack_frame: frame | None) None [source]
Custom signal handling function that requests co-operative ProcWorker shutdown by setting the shared Event, forcibly terminating the process by raising an instance of the given exception class if call limit has been exceeded.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
- ska_oso_oet.mptools.init_signals(shutdown_event, int_handler, term_handler) SignalObject [source]
Install SIGINT and SIGTERM signal handlers for the running Python process.
This function returns the SignalObject shared with signal handlers that the handlers use to store signal handling state.
- Parameters:
shutdown_event – Event to set when SIGINT or SIGTERM is received
int_handler – SIGINT handler function to install
term_handler – SIGTERM handler function to install
- Returns:
SignalObject processed by signal handlers
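The escalation behaviour of these handlers (set the shared event first, raise once a call limit is exceeded) can be sketched as below. The names SketchSignalObject, MAX_CALLS, calls and ForcedShutdown are hypothetical, the limit of 3 is arbitrary, and a threading.Event stands in for the multiprocessing Event. The handler is invoked directly rather than installed with signal.signal, so the example does not alter the handlers of the running interpreter.

```python
import functools
import signal
import threading


class ForcedShutdown(Exception):
    """Hypothetical exception raised when cooperative shutdown is ignored."""


class SketchSignalObject:
    MAX_CALLS = 3  # illustrative call limit

    def __init__(self, shutdown_event):
        self.shutdown_event = shutdown_event
        self.calls = 0


def sketch_signal_handler(signal_object, exception_class, signal_num, current_stack_frame):
    signal_object.calls += 1
    signal_object.shutdown_event.set()  # request cooperative shutdown
    if signal_object.calls >= signal_object.MAX_CALLS:
        raise exception_class()         # escalate: interrupt the running code


shutdown_event = threading.Event()
signal_object = SketchSignalObject(shutdown_event)

# Bind the state and exception type, leaving the (signum, frame) signature
# that signal.signal expects of a handler.
handler = functools.partial(sketch_signal_handler, signal_object, ForcedShutdown)

handler(signal.SIGINT, None)  # first signal: cooperative request only
handler(signal.SIGINT, None)  # second signal: still cooperative
try:
    handler(signal.SIGINT, None)  # third signal reaches the limit
    escalated = False
except ForcedShutdown:
    escalated = True
```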
- ska_oso_oet.mptools.proc_worker_wrapper(proc_worker_class: Type[ProcWorker], name: str, startup_evt: Event, shutdown_evt: Event, event_q: MPQueue, *args, **kwargs)[source]
This function is called to launch the worker task from within the child process.
- Parameters:
proc_worker_class – worker class to instantiate
name – name for this ProcWorker
startup_evt – start-up event to share with worker
shutdown_evt – shutdown event to share with worker
event_q – event queue to share with worker
args – any additional arguments to give to worker constructor
- Returns:
ska_oso_oet.procedure
ska_oso_oet.procedure.application
The ska_oso_oet.procedure.application module holds classes and functionality that belong in the application layer of the OET. This layer holds the application interface, delegating to objects in the domain layer for business rules and actions.
- class ska_oso_oet.procedure.application.ArgCapture(fn: str, fn_args: ProcedureInput, time: float | None = None)[source]
ArgCapture is a struct to record function call and time of invocation.
- class ska_oso_oet.procedure.application.PrepareProcessCommand(script: ExecutableScript, init_args: ProcedureInput)[source]
PrepareProcessCommand is the input argument dataclass for the ScriptExecutionService prepare command. It holds all the information required to load and prepare a Python script ready for execution.
- __init__(script: ExecutableScript, init_args: ProcedureInput) None
- class ska_oso_oet.procedure.application.ProcedureHistory(process_states: List[Tuple[ProcedureState, float]] | None = None, stacktrace=None)[source]
ProcedureHistory is a non-functional dataclass holding execution history of a Procedure spanning all transactions.
- process_states: records the time of each change of ProcedureState, as a list of tuples where each tuple contains the ProcedureState and the time at which that state was entered
- stacktrace: None unless execution_error is True, in which case it stores the stacktrace from the process
- class ska_oso_oet.procedure.application.ProcedureSummary(id: int, script: ExecutableScript, script_args: List[ArgCapture], history: ProcedureHistory, state: ProcedureState)[source]
ProcedureSummary is a brief representation of a runtime Procedure. It captures essential information required to describe a Procedure and to distinguish it from other Procedures.
- __init__(id: int, script: ExecutableScript, script_args: List[ArgCapture], history: ProcedureHistory, state: ProcedureState) None
- class ska_oso_oet.procedure.application.ScriptExecutionService(mp_context: BaseContext | None = None, abort_script: ExecutableScript = FileSystemScript(script_uri='file:///home/docs/checkouts/readthedocs.org/user_builds/ska-telescope-ska-oso-oet/checkouts/5.2.0/src/ska_oso_oet/procedure/abort.py'), on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]
ScriptExecutionService provides the high-level interface and facade for the script execution domain (i.e., the ‘procedure’ domain).
The interface is used to load and run Python scripts in their own independent Python child process.
The shutdown method should be called to ensure cleanup of any multiprocessing artefacts owned by this service.
- __init__(mp_context: BaseContext | None = None, abort_script: ExecutableScript = FileSystemScript(script_uri='file:///home/docs/checkouts/readthedocs.org/user_builds/ska-telescope-ska-oso-oet/checkouts/5.2.0/src/ska_oso_oet/procedure/abort.py'), on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]
Create a new ScriptExecutionService.
The .stop() method of this ScriptExecutionService can run a second script once the current process has been terminated. By default, this second script calls SubArrayNode.abort() to halt further activities on the sub-array controlled by the terminated script. To run a different script, pass that script as the abort_script argument to this constructor.
- Parameters:
mp_context – multiprocessing context to use or None for default
abort_script – post-termination script for two-phase abort
on_pubsub – callbacks to call when PUBSUB message is received
- prepare(cmd: PrepareProcessCommand) ProcedureSummary [source]
Load and prepare a Python script for execution, but do not commence execution.
- Parameters:
cmd – dataclass argument capturing the script identity and load arguments
- Returns:
- start(cmd: StartProcessCommand) ProcedureSummary [source]
Start execution of a prepared procedure.
- Parameters:
cmd – dataclass argument capturing the execution arguments
- Returns:
- stop(cmd: StopProcessCommand) List[ProcedureSummary] [source]
Stop execution of a running procedure, optionally running a second script once the first process has terminated.
- Parameters:
cmd – dataclass argument capturing the execution arguments
- Returns:
- summarise(pids: List[int] | None = None) List[ProcedureSummary] [source]
Return ProcedureSummary objects for Procedures with the requested IDs.
This method accepts an optional list of integers, representing the Procedure IDs to summarise. If pids is left undefined, ProcedureSummary objects for all current Procedures will be returned.
- Parameters:
pids – optional list of Procedure IDs to summarise.
- Returns:
list of ProcedureSummary objects
- class ska_oso_oet.procedure.application.StartProcessCommand(process_uid: int, fn_name: str, run_args: ProcedureInput, force_start: bool = False)[source]
StartProcessCommand is the input argument dataclass for the ScriptExecutionService start command. It holds the references required to start a prepared script process along with any late-binding runtime arguments the script may require.
- class ska_oso_oet.procedure.application.StopProcessCommand(process_uid: int, run_abort: bool)[source]
StopProcessCommand is the input argument dataclass for the ScriptExecutionService stop command. It holds the references required to stop a script process along with any late-binding runtime arguments the script may require.
ska_oso_oet.procedure.domain
The ska_oso_oet.procedure.domain module holds domain entities from the script execution domain. Entities in this domain are things like scripts, OS processes, process supervisors, signal handlers, etc.
- class ska_oso_oet.procedure.domain.ExecutableScript[source]
Base class for all executable scripts.
Expected specialisations:
scripts on filesystem
scripts in git repository
scripts given as a string
scripts stored in the ODA
etc.
- class ska_oso_oet.procedure.domain.FileSystemScript(script_uri: str)[source]
Represents a script stored on the file system.
- class ska_oso_oet.procedure.domain.GitScript(script_uri: str, git_args: GitArgs, create_env: bool | None = False)[source]
Represents a script in a git repository.
- class ska_oso_oet.procedure.domain.LifecycleMessage(msg_src: str, new_state: ProcedureState)[source]
LifecycleMessage is a message type for script lifecycle events.
- __init__(msg_src: str, new_state: ProcedureState)[source]
- class ska_oso_oet.procedure.domain.ModuleFactory[source]
Factory class used to return Python Module instances from a variety of storage back-ends.
- static get_module(script: ExecutableScript)[source]
Load Python code from storage, returning an executable Python module.
- Parameters:
script – Script object describing the script to load
- Returns:
Python module
- class ska_oso_oet.procedure.domain.ProcedureInput(*args, **kwargs)[source]
ProcedureInput is a non-functional dataclass holding the arguments passed to a script method.
- class ska_oso_oet.procedure.domain.ProcedureState(value)[source]
Represents the script execution state.
- class ska_oso_oet.procedure.domain.ProcessManager(mp_context: BaseContext | None = None, on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]
ProcessManager is the parent for all ScriptWorker processes.
ProcessManager is responsible for launching ScriptWorker processes and communicating API requests such as ‘run main() function’ or ‘stop execution’ to the running scripts. If a script execution process does not respond to the request, the process will be forcibly terminated. ProcessManager delegates to the mptools framework for process management functionality. Familiarity with mptools is useful in understanding ProcessManager functionality.
ProcessManager is also responsible for communicating script events to the rest of the system, such as events issued by the script or related to the script execution lifecycle.
It is recommended that ProcessManager.shutdown() be called before the ProcessManager is garbage collected. Failure to call shutdown could break any multiprocessing state held in the scope of the manager or its child processes. This may or may not be a problem, depending on what is held and whether that state is used elsewhere. In short, be safe and call shutdown().
Note: ProcessManager does not maintain a history of script execution. History is recorded and managed by the ScriptExecutionService.
- __init__(mp_context: BaseContext | None = None, on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]
Create a new ProcessManager.
Functions passed in the on_pubsub argument will be called by the ProcessManager every time the ProcessManager’s message loop receives a PUBSUB EventMessage. Callbacks should not perform significant processing on the same thread, as this would block the ProcessManager event loop.
- Parameters:
mp_context – multiprocessing context used to create multiprocessing primitives
on_pubsub – functions to call when a PUBSUB message is received
- create(script: ExecutableScript, *, init_args: ProcedureInput) int [source]
Create a new Procedure that will, when executed, run the target Python script.
Objects that can only be shared through inheritance, such as multiprocessing objects, can be shared by providing them as init_args here. These arguments will be provided to the init function in the user script, where present.
- Parameters:
script – script URI, e.g. ‘file://myscript.py’
init_args – script initialisation arguments
- Returns:
- run(process_id: int, *, call: str, run_args: ProcedureInput, force_start: bool = False) None [source]
Run a prepared Procedure.
This starts execution of the script prepared by a previous create() call.
- Parameters:
process_id – ID of Procedure to execute
call – name of function to call
run_args – late-binding arguments to provide to the script
force_start – Add run command to queue even if the script is not yet ready to run. Does not add command to queue if ProcedureState is FAILED, STOPPED, COMPLETE or UNKNOWN
- Returns:
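The force_start rule can be expressed as a small decision function. This is only a sketch of the rule as worded above: the enum SketchState and the function should_queue_run are hypothetical, and the CREATING and READY members are assumed names for "not yet ready" and "ready" states, which the text does not name. How the real ProcessManager reports a rejected request is not stated here, so the sketch simply returns a boolean.

```python
from enum import Enum, auto


class SketchState(Enum):
    CREATING = auto()  # assumed name for a "not yet ready" state
    READY = auto()     # assumed name for the "ready to run" state
    FAILED = auto()
    STOPPED = auto()
    COMPLETE = auto()
    UNKNOWN = auto()


# States for which a run command is never queued, per the parameter description.
NEVER_QUEUE = {
    SketchState.FAILED,
    SketchState.STOPPED,
    SketchState.COMPLETE,
    SketchState.UNKNOWN,
}


def should_queue_run(state, force_start=False):
    """Return True if a run command would be added to the queue."""
    if state in NEVER_QUEUE:
        return False
    if state is SketchState.READY:
        return True
    # Not yet ready: queue only when the caller forces the start.
    return force_start


ready_default = should_queue_run(SketchState.READY)
creating_default = should_queue_run(SketchState.CREATING)
creating_forced = should_queue_run(SketchState.CREATING, force_start=True)
failed_forced = should_queue_run(SketchState.FAILED, force_start=True)
```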
- class ska_oso_oet.procedure.domain.ScriptWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, scan_counter: Value | None = None, environment: Environment | None = None, **kwargs)[source]
ScriptWorker loads user code in a child process, running functions of that user code on request.
ScriptWorker acts when a message is received on its work queue. It responds to four types of messages:
LOAD - to load the specified code in this process
ENV - to install the dependencies for the specified script in this process
RUN - to run the named function in this process
PUBSUB - external pubsub messages that should be published locally
ScriptWorker converts external inter-process mptools pub/sub messages to intra-process pypubsub pub/sub messages. That is, EventMessages received on the local work queue are rebroadcast locally as pypubsub messages. Likewise, the ScriptWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other interested ScriptWorkers.
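Handling of the four message types listed above could be dispatched roughly as in this sketch. It is illustrative only: SketchEventMessage mirrors the documented EventMessage fields (msg_src, msg_type, msg), while SketchScriptWorker, its handler methods, the message payloads and the actions list are hypothetical and merely record what a real worker would do.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchEventMessage:
    msg_src: str
    msg_type: str
    msg: Any


class SketchScriptWorker:
    def __init__(self):
        self.actions = []  # record of what each message would trigger
        self._handlers = {
            "LOAD": self._on_load,
            "ENV": self._on_env,
            "RUN": self._on_run,
            "PUBSUB": self._on_pubsub,
        }

    def main_func(self, evt):
        handler = self._handlers.get(evt.msg_type)
        if handler is None:
            self.actions.append(("ignored", evt.msg_type))
            return
        handler(evt)

    def _on_load(self, evt):
        self.actions.append(("load script", evt.msg))

    def _on_env(self, evt):
        self.actions.append(("install dependencies", evt.msg))

    def _on_run(self, evt):
        self.actions.append(("call function", evt.msg))

    def _on_pubsub(self, evt):
        # A real worker would rebroadcast this locally via pypubsub.
        self.actions.append(("republish locally", evt.msg))


worker = SketchScriptWorker()
for message in (
    SketchEventMessage("manager", "ENV", "script environment"),
    SketchEventMessage("manager", "LOAD", "script to load"),
    SketchEventMessage("manager", "RUN", "main"),
    SketchEventMessage("other worker", "PUBSUB", {"topic": "example"}),
    SketchEventMessage("manager", "OTHER", None),
):
    worker.main_func(message)
```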
- __init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, scan_counter: Value | None = None, environment: Environment | None = None, **kwargs)[source]
Create a new ProcWorker.
- Parameters:
name – name of this worker
startup_event – event to set on startup completion
shutdown_event – event to monitor for shutdown
event_q – queue for messages to/from MainWorker
args –
- main_loop() None [source]
main_loop delivers each event received on the work queue to the main_func template method, while checking for shutdown notifications.
Event delivery will cease when the shutdown event is set or a special sentinel message is sent.
- publish_lifecycle(new_state: ProcedureState)[source]
Broadcast a lifecycle status change event.
- Parameters:
new_state – new lifecycle state
- republish(topic: pubsub.pub.Topic = pubsub.pub.AUTO_TOPIC, **kwargs) None [source]
Republish a local pypubsub event over the inter-process mptools event bus.
- Parameters:
topic – message topic, set automatically by pypubsub
kwargs – any metadata associated with pypubsub message
- Returns:
- static term_handler(signal_object, exception_class, signal_num: int, current_stack_frame) None
Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.
We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
- ska_oso_oet.procedure.domain.script_signal_handler(signal_object, exception_class, signal_num: int, current_stack_frame) None [source]
Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.
We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
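The handler behaviour described above, raising an exception so that an uncaught signal interrupts the running script, can be sketched with the standard library alone. This is a simplified illustration under stated assumptions: the ScriptTerminated exception, the raising_handler and run_script names are hypothetical, the signal_object bookkeeping argument is omitted, and signal.raise_signal (Python 3.8+) stands in for an external termination request. Signal handlers can only be installed from the main thread.

```python
import functools
import signal


class ScriptTerminated(Exception):
    """Hypothetical exception type, used only for this illustration."""


def raising_handler(exception_class, signal_num, current_stack_frame):
    # Raise inside whatever code the main thread is currently executing.
    # No shutdown event is set here, so sibling processes are unaffected.
    raise exception_class(f"interrupted by signal {signal_num}")


def run_script():
    # Bind the exception type up front; the signal module supplies the
    # remaining (signal_num, frame) arguments when the signal arrives.
    handler = functools.partial(raising_handler, ScriptTerminated)
    previous = signal.signal(signal.SIGTERM, handler)
    try:
        signal.raise_signal(signal.SIGTERM)  # stand-in for an external terminate request
        return "completed"
    except ScriptTerminated:
        return "interrupted"
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the earlier handler


result = run_script()
```

Here the "script" catches the exception only so that the example can report what happened; a script that does not catch it would terminate, as the description above states.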
ska_oso_oet.procedure.environment
- class ska_oso_oet.procedure.environment.Environment(env_id: str, creating: Event, created: Event, location: str, site_packages: str)[source]
ska_oso_oet.procedure.gitmanager
Static helper functions for cloning and working with a Git repository
ska_oso_oet.procedure.ui
The ska_oso_oet.procedure.ui package contains code that belongs to the OET procedure UI layer. This consists of the Procedure REST resources.
- ska_oso_oet.procedure.ui.create_procedure()[source]
Create a new Procedure.
This method requests creation of a new Procedure as specified in the JSON payload POSTed to this function.
- Returns:
JSON summary of created Procedure
- ska_oso_oet.procedure.ui.get_procedure(procedure_id: int)[source]
Get a Procedure.
This returns the Procedure JSON representation of the requested Procedure.
- Parameters:
procedure_id – ID of the Procedure to return
- Returns:
Procedure JSON
- ska_oso_oet.procedure.ui.get_procedures()[source]
List all Procedures.
This returns a list of Procedure JSON representations for all Procedures held by the service.
- Returns:
list of Procedure JSON representations
- ska_oso_oet.procedure.ui.make_public_procedure_summary(procedure: ProcedureSummary)[source]
Convert a ProcedureSummary into JSON ready for client consumption.
The main use of this function is to replace the internal Procedure ID with the resource URI, e.g., 1 -> http://localhost:5000/api/v1.0/procedures/1
- Parameters:
procedure – Procedure to convert
- Returns:
safe JSON representation
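The ID-to-URI substitution described above might look like the sketch below. It is an illustration only: the summary is modelled as a plain dict, and the "id", "uri" and "state" keys, the make_public helper and the base_url default are assumptions for the example rather than the actual ProcedureSummary structure.

```python
def make_public(summary: dict, base_url: str = "http://localhost:5000/api/v1.0") -> dict:
    """Return a copy of summary with the internal ID replaced by a resource URI."""
    # Copy every field except the internal ID, which clients should not see.
    public = {key: value for key, value in summary.items() if key != "id"}
    public["uri"] = f"{base_url}/procedures/{summary['id']}"
    return public


internal = {"id": 1, "state": "READY"}  # hypothetical internal summary
public = make_public(internal)
```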
ska_oso_oet.utils
The ska_oso_oet.utils.ui module contains common helper code for the UI layers.
- ska_oso_oet.utils.ui.convert_request_dict_to_procedure_input(fn_dict: dict) ProcedureInput [source]
Convert the dict of arguments for a single function into the domain.ProcedureInput
- Parameters:
fn_dict – Dict of the args and kwargs, eg {‘args’: [1, 2], ‘kwargs’: {‘subarray_id’: 42}}
- Returns:
The ProcedureInput, eg <ProcedureInput(1, 2, subarray_id=42)>
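The conversion above can be sketched as follows. The ProcedureInput class here is a hypothetical stand-in written for the example, not the domain.ProcedureInput class itself, and the convert function name is likewise an assumption.

```python
class ProcedureInput:
    """Hypothetical stand-in that captures positional and keyword arguments."""

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __repr__(self):
        parts = [repr(arg) for arg in self.args]
        parts += [f"{key}={value!r}" for key, value in self.kwargs.items()]
        return f"<ProcedureInput({', '.join(parts)})>"


def convert(fn_dict: dict) -> ProcedureInput:
    """Unpack the 'args' list and 'kwargs' dict into a ProcedureInput."""
    # Missing keys are treated as "no arguments of that kind".
    return ProcedureInput(*fn_dict.get("args", []), **fn_dict.get("kwargs", {}))


converted = convert({"args": [1, 2], "kwargs": {"subarray_id": 42}})
```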
Observation Execution Tool
Project description
The ska-oso-oet project contains the code for the Observation Execution Tool (OET), the application which provides on-demand Python script execution for the SKA.
Overview
The core of the OET is a script execution engine which runs a requested script in a child Python process. The engine supervises script execution, in that it can terminate the script at any time when requested, and captures the output and/or errors generated by the script for inspection by a (remote) client.
A REST layer makes the Python API for the script execution engine available via REST over HTTP. This project also contains a command line client to allow users to submit script execution requests to a remote OET backend.
The REST layer is made up of two components that work together to provide the remote script execution functionality:
The OET REST server maintains a list of the scripts that have been loaded and their current state. The server implements the interface specified by the OET Module View: REST API.
The OET command line tool provides a Command Line Interface (CLI) to the OET backend.
More details on the OET architecture can be found in C&C view: OET client and OET backend.
Note
SKA control scripts are not packaged as part of this project. The repository of observing scripts executed by the OET can be found in the ska-oso-scripting project.
Quickstart
Build a new OET image:
make oci-build
Execute the test suite and lint the project with:
make python-test
make python-lint
Format and lint on commit
We recommend you use pre-commit to automatically format and lint your commits. The commands below should be enough to get you up and running. Refer to the official documentation for full installation details.
Pre-commit installation on Linux
# install pre-commit
sudo pip3 install pre-commit
# install git hook scripts
pre-commit install
# uninstall git hook scripts
pre-commit uninstall
Pre-commit installation on MacOS
The commands below were tested on MacOS 10.15.
# install pre-commit
pip3 install --user pre-commit
# install git hook scripts
~/Library/Python/3.8/bin/pre-commit install
# uninstall git hook scripts
~/Library/Python/3.8/bin/pre-commit uninstall
Makefile targets
This project extends the standard SKA Make targets with a few additional Make targets that can be useful for developers. These targets are:
| Makefile target | Description |
|---|---|
| dev-up | deploy the OET using the current developer image, exposing REST ingress on the host |
| dev-down | tear down the developer OET deployment |
| rest | start the OET backend in a Docker container |
| diagrams | recreate PlantUML diagrams whose source has been modified |
| k8s-chart-test | run helm chart unit tests (note: requires helm unittest plugin: https://github.com/quintush/helm-unittest ) |
| help | show a summary of the makefile targets above |
Local development with k8s
The OET REST server can be deployed locally using Helm and Kubernetes, and the OET command line tool can be used to communicate with the server. The OET CLI is installed as part of the Poetry virtual environment (see README) or can be used inside a running OET container/pod.
If using the OET CLI within the Poetry virtual environment, these steps are needed for the CLI to access the REST server:
set rest.ingress.enabled to true in charts/ska-oso-oet/values.yaml
set OET_REST_URI environment variable with export OET_REST_URI=http://<minikube IP>/<kube namespace>/ska-oso-oet/api/v1.0
To deploy the OET REST server, run:
make k8s-chart-install && make k8s-wait
Feature flags
OET feature flags are configured via environment variables and configuration files. The configuration file, ska_oso_oet.ini, can be located either in the user’s home directory, or the root of the installation folder.
Feature flags are read in this order:
environment variable;
ska_oso_oet.ini configuration file;
default flag value as specified in OET code.
No feature flags are available at this time.
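The lookup order above (environment variable, then configuration file, then default) can be sketched with the standard library. This is an illustration under stated assumptions, not the OET code: the read_flag function, the example_flag name, the [ska_oso_oet] section name and the set of strings treated as true are all hypothetical, and the INI content is passed as text rather than read from ska_oso_oet.ini.

```python
import configparser
import os

TRUE_STRINGS = {"1", "true", "yes", "on"}  # assumed spellings of "enabled"


def read_flag(name, default, env=None, ini_text=""):
    """Resolve a boolean flag: environment first, then INI text, then default."""
    env = os.environ if env is None else env
    if name in env:
        return env[name].strip().lower() in TRUE_STRINGS

    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    # fallback=None covers both a missing section and a missing option.
    value = parser.get("ska_oso_oet", name, fallback=None)
    if value is not None:
        return value.strip().lower() in TRUE_STRINGS

    return default


ini = "[ska_oso_oet]\nexample_flag = true\n"
from_file = read_flag("example_flag", False, env={}, ini_text=ini)
from_env = read_flag("example_flag", True, env={"example_flag": "false"}, ini_text=ini)
from_default = read_flag("example_flag", True, env={}, ini_text="")
```

In the second call the environment variable disables the flag even though the file enables it, which reflects the precedence listed above.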