ska_oso_oet.procedure

ska_oso_oet.procedure.application

The ska_oso_oet.procedure.application module holds classes and functionality that belong in the application layer of the OET. This layer holds the application interface, delegating to objects in the domain layer for business rules and actions.

class ska_oso_oet.procedure.application.AbortSummary(*args, **kwargs)[source]
class ska_oso_oet.procedure.application.ArgCapture(*args, **kwargs)[source]

ArgCapture is a struct that records a function call and its time of invocation.

class ska_oso_oet.procedure.application.PrepareProcessCommand(script, init_args)[source]

PrepareProcessCommand is the input argument dataclass for the ScriptExecutionService prepare command. It holds all the information required to load and prepare a Python script ready for execution.

__init__(script, init_args)[source]
class ska_oso_oet.procedure.application.ProcedureHistory(process_states=None, stacktrace=None)[source]

ProcedureHistory is a non-functional dataclass holding the execution history of a Procedure spanning all transactions.

process_states: records the time of each change of ProcedureState (list of tuples, where each tuple contains the ProcedureState and the time at which that state was entered)

stacktrace: None unless execution_error is True, in which case it stores the stacktrace from the process

__init__(process_states=None, stacktrace=None)[source]
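The shape of the history record can be illustrated with a small stand-in dataclass. This is a sketch only, not the OET class: HistorySketch and its record() helper are hypothetical, and the FAILED member of the stand-in enum is an assumption (CREATING, READY and RUNNING are the only state names that appear in this reference).

```python
import enum
import time
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


class ProcedureState(enum.Enum):
    # Stand-in enum; FAILED is a hypothetical member used for illustration.
    CREATING = enum.auto()
    READY = enum.auto()
    RUNNING = enum.auto()
    FAILED = enum.auto()


@dataclass
class HistorySketch:
    """Stand-in mirroring the documented ProcedureHistory fields."""

    process_states: List[Tuple[ProcedureState, float]] = field(default_factory=list)
    stacktrace: Optional[str] = None

    def record(self, state: ProcedureState, when: Optional[float] = None) -> None:
        # Append the new state with the time at which it was entered.
        self.process_states.append((state, time.time() if when is None else when))


history = HistorySketch()
history.record(ProcedureState.CREATING, when=1.0)
history.record(ProcedureState.READY, when=2.5)
```

In this sketch stacktrace stays None because no error was recorded.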
class ska_oso_oet.procedure.application.ProcedureSummary(id, script, script_args, history, state, uri=None)[source]

ProcedureSummary is a brief representation of a runtime Procedure. It captures essential information required to describe a Procedure and to distinguish it from other Procedures.

__init__(id, script, script_args, history, state, uri=None)[source]
class ska_oso_oet.procedure.application.ScriptContext(*args, **kwargs)[source]

ScriptContext holds the current execution context provided to scripts.

This includes persistent operator overrides and any other runtime context that should be available to scripts at start time.

class ska_oso_oet.procedure.application.ScriptExecutionService(abort_script=pydantic.BaseModel)[source]

ScriptExecutionService provides the high-level interface and facade for the script execution domain (i.e., the ‘procedure’ domain).

The interface is used to load and run Python scripts in their own independent Python child process.

The shutdown method should be called to ensure cleanup of any multiprocessing artefacts owned by this service.

__init__(abort_script=pydantic.BaseModel)[source]

Create a new ScriptExecutionService.

The stop() method of this ScriptExecutionService can run a second script once the current process has been terminated. By default, this second script calls SubArrayNode.abort() to halt further activities on the sub-array controlled by the terminated script. To run a different script, pass it as the abort_script argument to this constructor.

Parameters:

abort_script (ExecutableScript) – post-termination script for two-phase abort
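The two-phase abort described above can be sketched as follows. TwoPhaseStopSketch is a hypothetical stand-in, not the OET service: plain callables take the place of the running process and of the post-termination script, and the run_abort flag is borrowed from the StopProcessCommand signature.

```python
from typing import Callable, List, Optional


class TwoPhaseStopSketch:
    """Hypothetical stand-in illustrating two-phase abort; not the OET service."""

    def __init__(self, abort_script: Optional[Callable[[], None]] = None):
        # abort_script stands in for the post-termination script.
        self.abort_script = abort_script
        self.log: List[str] = []

    def stop(self, terminate: Callable[[], None], run_abort: bool) -> None:
        # Phase 1: terminate the running script process.
        terminate()
        self.log.append("terminated")
        # Phase 2: optionally run the follow-up abort script.
        if run_abort and self.abort_script is not None:
            self.abort_script()
            self.log.append("abort script run")


calls = []
svc = TwoPhaseStopSketch(abort_script=lambda: calls.append("abort"))
svc.stop(terminate=lambda: calls.append("terminate"), run_abort=True)
```

The ordering is the point of the sketch: the abort script only runs after termination of the first process has been requested.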

handle_wait_for_qa_ready_disable(msg_src)[source]

Handler for operator.wait_for_qa_ready.disable topic. Sets wait_for_qa_ready to False.

Parameters:

msg_src – component from which the request originated

Return type:

None

handle_wait_for_qa_ready_enable(msg_src)[source]

Handler for operator.wait_for_qa_ready.enable topic. Sets wait_for_qa_ready to True.

Parameters:

msg_src – component from which the request originated

Return type:

None

prepare(cmd)[source]

Load and prepare a Python script for execution, but do not commence execution.

Parameters:

cmd (PrepareProcessCommand) – dataclass argument capturing the script identity and load arguments

Return type:

ProcedureSummary

start(cmd)[source]

Start execution of a prepared procedure.

Parameters:

cmd (StartProcessCommand) – dataclass argument capturing the execution arguments

Return type:

ProcedureSummary

stop(cmd)[source]

Stop execution of a running procedure, optionally running a second script once the first process has terminated.

Parameters:

cmd (StopProcessCommand) – dataclass argument capturing the execution arguments

Return type:

list[ProcedureSummary]

summarise(pids=None)[source]

Return ProcedureSummary objects for Procedures with the requested IDs.

This method accepts an optional list of integers representing the Procedure IDs to summarise. If pids is left undefined, ProcedureSummary objects for all current Procedures will be returned.

Parameters:

pids (list[int] | None) – optional list of Procedure IDs to summarise.

Return type:

list[ProcedureSummary]

Returns:

list of ProcedureSummary objects
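The optional-filter behaviour of pids can be sketched with a plain dictionary. The function below is a hypothetical stand-in that maps IDs to strings rather than to ProcedureSummary objects; how the real method treats an unknown ID is not stated in this reference, so the sketch simply lets the lookup fail.

```python
from typing import Dict, List, Optional


def summarise(summaries: Dict[int, str], pids: Optional[List[int]] = None) -> List[str]:
    # With no pids given, return a summary for every known procedure.
    if pids is None:
        pids = list(summaries.keys())
    # Otherwise return only the requested summaries, in the order requested.
    return [summaries[pid] for pid in pids]


known = {1: "procedure 1", 2: "procedure 2"}
everything = summarise(known)
only_second = summarise(known, pids=[2])
```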

class ska_oso_oet.procedure.application.StartProcessCommand(process_uid, fn_name, run_args, force_start=False)[source]

StartProcessCommand is the input argument dataclass for the ScriptExecutionService start command. It holds the references required to start a prepared script process along with any late-binding runtime arguments the script may require.

__init__(process_uid, fn_name, run_args, force_start=False)[source]
class ska_oso_oet.procedure.application.StopProcessCommand(process_uid, run_abort)[source]

StopProcessCommand is the input argument dataclass for the ScriptExecutionService stop command. It holds the references required to stop a script process along with any late-binding runtime arguments the script may require.

__init__(process_uid, run_abort)[source]
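The three command objects can be pictured as simple dataclasses carrying the documented fields. The classes below are stand-ins written for illustration: the field names follow the signatures above, while the field types, the script URI and the argument values are assumptions.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class PrepareSketch:
    script: Any      # stand-in for an ExecutableScript
    init_args: Any   # stand-in for the script's load arguments


@dataclass
class StartSketch:
    process_uid: int  # assumed to be an integer Procedure ID
    fn_name: str
    run_args: Any
    force_start: bool = False


@dataclass
class StopSketch:
    process_uid: int
    run_abort: bool


# Hypothetical values showing one command per lifecycle step.
prepare = PrepareSketch(script="file:///path/to/script.py", init_args={"subarray_id": 1})
start = StartSketch(process_uid=1, fn_name="main", run_args={})
stop = StopSketch(process_uid=1, run_abort=True)
```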

ska_oso_oet.procedure.domain

The ska_oso_oet.procedure.domain module holds domain entities from the script execution domain. Entities in this domain are things like scripts, OS processes, process supervisors, signal handlers, etc.

class ska_oso_oet.procedure.domain.ExecutableScript(*args, **kwargs)[source]

Base class for all executable scripts.

Expected specialisations:

  • scripts on filesystem

  • scripts in git repository

  • scripts given as a string

  • scripts stored in the ODA

  • etc.

class ska_oso_oet.procedure.domain.FileSystemScript(*args, **kwargs)[source]

Represents a script stored on the file system.

class ska_oso_oet.procedure.domain.GitScript(*args, **kwargs)[source]

Represents a script in a git repository.

class ska_oso_oet.procedure.domain.LifecycleMessage(msg_src, new_state)[source]

LifecycleMessage is a message type for script lifecycle events.

__init__(msg_src, new_state)[source]
class ska_oso_oet.procedure.domain.ModuleFactory[source]

Factory class used to return Python Module instances from a variety of storage back-ends.

static get_module(script)[source]

Load Python code from storage, returning an executable Python module.

Parameters:

script (ExecutableScript) – Script object describing the script to load

Returns:

Python module
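For a script stored on the file system, loading code into a module can be done with the standard library's importlib machinery. The sketch below shows that general technique; it is not taken from ModuleFactory, and load_module_from_path and the throwaway script are hypothetical.

```python
import importlib.util
import os
import tempfile
from types import ModuleType


def load_module_from_path(path: str, name: str = "user_script") -> ModuleType:
    # Build a module spec for the file, then execute it to populate the module.
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Write a throwaway script to disk and load it.
with tempfile.TemporaryDirectory() as tmp:
    script_path = os.path.join(tmp, "hello.py")
    with open(script_path, "w") as f:
        f.write("def main():\n    return 'hello'\n")
    module = load_module_from_path(script_path)
    result = module.main()
```

The module is not registered in sys.modules here, which keeps separately loaded scripts from colliding on the module name.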

class ska_oso_oet.procedure.domain.ProcedureInput(*args: Any, **kwargs: Any)[source]

ProcedureInput is a non-functional dataclass holding the arguments passed to a script method.

__init__(*args, **kwargs)[source]
class ska_oso_oet.procedure.domain.ProcedureState(value)[source]

Represents the script execution state.

class ska_oso_oet.procedure.domain.ProcessManager(states=None, state_lock=None)[source]

ProcessManager tracks and coordinates ScriptWorker processes.

ProcessManager is responsible for tracking script execution state and coordinating with main_loop via pypubsub messages. Actual process creation and management is handled by main_loop in main.py.

ProcessManager uses pypubsub to send management requests (create, run, stop) to main_loop, which creates and manages the actual ScriptWorker processes.

Note: ProcessManager does not maintain a history of script execution. History is recorded and managed by the ScriptExecutionService.

__init__(states=None, state_lock=None)[source]

Create a new ProcessManager.

Parameters:
  • states (dict[int, ProcedureState] | None) – Optional external states dictionary to use instead of creating an internal one. This allows sharing state with other components (e.g., ScriptExecutionService) to avoid race conditions. When external states are provided, this manager will NOT subscribe to statechange events as the external owner is responsible for updates.

  • state_lock (RLock | None) – Optional external lock for state updates. Should be provided if states is provided to ensure thread-safe updates.
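The optional sharing of a states dictionary and lock can be sketched as below. StateTrackerSketch is a hypothetical stand-in, not ProcessManager: it stores states as plain strings and only shows how an externally owned dictionary and RLock would be used in place of private ones.

```python
import threading
from typing import Dict, Optional


class StateTrackerSketch:
    """Hypothetical stand-in showing optional shared state; not ProcessManager."""

    def __init__(self, states: Optional[Dict[int, str]] = None, state_lock=None):
        # Use the caller's dict and lock when given; otherwise create private ones.
        self.owns_states = states is None
        self.states = {} if states is None else states
        self.lock = threading.RLock() if state_lock is None else state_lock

    def get_state(self, pid: int) -> Optional[str]:
        # Read under the lock so updates by the owner are seen consistently.
        with self.lock:
            return self.states.get(pid)


shared_states: Dict[int, str] = {}
shared_lock = threading.RLock()
tracker = StateTrackerSketch(states=shared_states, state_lock=shared_lock)

# The external owner updates the shared dict; the tracker sees the change.
with shared_lock:
    shared_states[1] = "READY"
```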

create(script, *, init_args)[source]

Request creation of a new ScriptWorker process.

This method emits a procedure.management.create pypubsub message, which main_loop handles to create the actual process. The method returns immediately with state=CREATING.

Return type:

int

Returns:

PID assigned to this procedure
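The request-by-message pattern used by create can be sketched with a minimal publish/subscribe stand-in. TinyBus below is not pypubsub, and the message payload keys (pid, script) and the script URI are assumptions; only the topic name and the CREATING state come from the description above.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List


class TinyBus:
    """Minimal in-process publish/subscribe stand-in (not pypubsub)."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[..., None]]] = defaultdict(list)

    def subscribe(self, listener: Callable[..., None], topic: str) -> None:
        self._listeners[topic].append(listener)

    def send_message(self, topic: str, **kwargs: Any) -> None:
        # Deliver the keyword payload to every listener on the topic.
        for listener in list(self._listeners[topic]):
            listener(**kwargs)


bus = TinyBus()
states: Dict[int, str] = {}
received: List[Dict[str, Any]] = []

# Stand-in for main_loop: it receives the request and would create the process.
bus.subscribe(lambda **kw: received.append(kw), "procedure.management.create")


def create(pid: int, script: str) -> int:
    # Record the initial state, emit the request, and return immediately.
    states[pid] = "CREATING"
    bus.send_message("procedure.management.create", pid=pid, script=script)
    return pid


pid = create(1, "file:///path/to/script.py")
```

The caller gets its ID back without waiting for any process to exist, which matches the "returns immediately" behaviour described for create.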

get_state(pid)[source]

Get the current state of a procedure.

Parameters:

pid (int) – Procedure ID

Return type:

ProcedureState | None

Returns:

Current state or None if not found

has_running()[source]

Check if any procedure is currently running.

Return type:

bool

Returns:

True if any procedure is in RUNNING state

run(process_id, *, call, run_args, force_start=False)[source]

Request execution of a function in a prepared procedure.

This method emits a procedure.management.run pypubsub message, which main_loop handles to send the run command to the ScriptWorker.

Parameters:
  • process_id (int) – ID of Procedure to execute

  • call (str) – name of function to call

  • run_args (ProcedureInput) – late-binding arguments to provide to the script

  • force_start (bool) – Add run command even if not READY (ignored for terminal states)

Return type:

None
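One possible reading of the force_start rule is sketched below. The terminal state names are hypothetical, since this reference does not list them, and should_queue_run is an illustrative helper rather than part of ProcessManager.

```python
# Hypothetical terminal state names; the real ProcedureState members may differ.
TERMINAL_STATES = {"COMPLETE", "FAILED", "STOPPED"}


def should_queue_run(state: str, force_start: bool = False) -> bool:
    # Terminal procedures can never run again, whatever force_start says.
    if state in TERMINAL_STATES:
        return False
    # Otherwise run when READY, or when the caller forces the request.
    return state == "READY" or force_start


ready = should_queue_run("READY")
creating_unforced = should_queue_run("CREATING")
creating_forced = should_queue_run("CREATING", force_start=True)
failed_forced = should_queue_run("FAILED", force_start=True)
```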

stop(process_id)[source]

Request termination of a running procedure.

This method emits a procedure.management.stop pypubsub message, which main_loop handles to terminate the ScriptWorker.

Parameters:

process_id (int) – ID of Procedure to stop

Return type:

None

class ska_oso_oet.procedure.domain.RunThreadState(thread, fn_name, exception=None)[source]

Represents the state of a thread running a user function during its execution.

thread

The thread whose state is being represented.

fn_name

The name of the function associated with the thread.

exception

An exception that occurred during thread execution, if any.

__init__(thread, fn_name, exception=None)
class ska_oso_oet.procedure.domain.ScriptWorker(name, startup_event, shutdown_event, event_q, work_q, *args, environment=None, **kwargs)[source]

ScriptWorker loads user code in a child process, running functions of that user code on request.

ScriptWorker acts when a message is received on its work queue. It responds to four types of external message:

  1. LOAD - to load the specified code in this process

  2. ENV - to install the dependencies for the specified script in this process

  3. RUN - to run the named function in this process

  4. PUBSUB - external pubsub messages that should be published locally

In addition, the event loop also handles two internal messages originating from the user function execution thread:

  1. USER_FN_COMPLETE - indicates successful completion of the requested user function

  2. USER_FN_FAILED - indicates failure during user function execution

ScriptWorker converts external inter-process mptool pub/sub messages to intra-process pypubsub pub/sub messages. That is, EventMessages received on the local work queue are rebroadcast locally as pypubsub messages. Likewise, the ScriptWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other interested ScriptWorkers.
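The message-driven behaviour described above can be sketched as a dispatch table keyed on message type. This is a simplified stand-in: it uses a queue.Queue in a single process instead of the inter-process work queue, represents messages as (type, payload) tuples, and its handlers only record what they were asked to do.

```python
import queue
from typing import Any, Callable, Dict, List

handled: List[str] = []


def make_handler(label: str) -> Callable[[Any], None]:
    # Each stand-in handler just records what it was asked to do.
    def handler(payload: Any) -> None:
        handled.append(f"{label}:{payload}")

    return handler


# One entry per external and internal message type named in the text.
HANDLERS: Dict[str, Callable[[Any], None]] = {
    "LOAD": make_handler("load"),
    "ENV": make_handler("env"),
    "RUN": make_handler("run"),
    "PUBSUB": make_handler("pubsub"),
    "USER_FN_COMPLETE": make_handler("complete"),
    "USER_FN_FAILED": make_handler("failed"),
}


def drain(work_q: "queue.Queue") -> None:
    # Deliver queued messages to their handlers until the queue is empty.
    while True:
        try:
            msg_type, payload = work_q.get_nowait()
        except queue.Empty:
            return
        handler = HANDLERS.get(msg_type)
        if handler is None:
            handled.append(f"unknown:{msg_type}")
        else:
            handler(payload)


work_q: "queue.Queue" = queue.Queue()
for message in [("LOAD", "script.py"), ("RUN", "main"), ("USER_FN_COMPLETE", "main")]:
    work_q.put(message)
drain(work_q)
```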

__init__(name, startup_event, shutdown_event, event_q, work_q, *args, environment=None, **kwargs)[source]

Create a new ProcWorker.

Parameters:
  • name (str) – name of this worker

  • startup_event (Event) – event to set on startup completion

  • shutdown_event (Event) – event to monitor for shutdown

  • event_q (MPQueue) – queue for messages to/from MainWorker

  • args

main_loop()[source]

main_loop delivers each event received on the work queue to the main_func template method, while checking for shutdown notifications.

Event delivery will cease when the shutdown event is set or a special sentinel message is sent.

Return type:

None

publish_lifecycle(new_state)[source]

Broadcast a lifecycle status change event.

Parameters:

new_state (ProcedureState) – new lifecycle state

republish(topic=pubsub.pub.AUTO_TOPIC, **kwargs)[source]

Republish a local pypubsub event over the inter-process mptools event bus.

Parameters:
  • topic (Topic) – message topic, set automatically by pypubsub

  • kwargs – any metadata associated with pypubsub message

Return type:

None

static term_handler(exception_class, signal_num, current_stack_frame)

Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.

We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.

Parameters:
  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num (int) – POSIX signal ID

  • current_stack_frame – current stack frame

Return type:

None

ska_oso_oet.procedure.domain.script_signal_handler(signal_object, exception_class, signal_num, current_stack_frame)[source]

Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.

We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.

Parameters:
  • signal_object – SignalObject to modify to reflect signal-handling state

  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num (int) – POSIX signal ID

  • current_stack_frame – current stack frame

Return type:

None
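A handler with this four-argument shape can be registered with the standard signal module by binding its first two arguments with functools.partial. The sketch below assumes a counter attribute on the signal object and an exception class of its own; SignalStateSketch, TerminateInterrupt, handler_sketch and install are all hypothetical names.

```python
import functools
import signal


class TerminateInterrupt(Exception):
    """Hypothetical exception type raised to interrupt a script."""


class SignalStateSketch:
    """Hypothetical stand-in for the signal_object argument."""

    def __init__(self) -> None:
        self.terminate_called = 0


def handler_sketch(signal_object, exception_class, signal_num, current_stack_frame):
    # Record that the signal arrived, then interrupt whatever is running.
    signal_object.terminate_called += 1
    raise exception_class()


def install(signal_object, exception_class, signal_num=signal.SIGTERM):
    # signal.signal expects a two-argument callable, so bind the first two.
    bound = functools.partial(handler_sketch, signal_object, exception_class)
    signal.signal(signal_num, bound)
    return bound


state = SignalStateSketch()
bound_handler = functools.partial(handler_sketch, state, TerminateInterrupt)
try:
    # Call the handler directly, with the arguments the interpreter would pass.
    bound_handler(signal.SIGTERM, None)
    interrupted = False
except TerminateInterrupt:
    interrupted = True
```

Because the handler only raises, a script that does not catch the exception unwinds and ends, while sibling processes are unaffected.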

ska_oso_oet.procedure.environment

class ska_oso_oet.procedure.environment.Environment(env_id, creating, created, location, site_packages)[source]

Represents a Python virtual environment for script execution.

The creating and created Events are used to prevent duplicate environment creation when multiple procedures request the same environment.

These Events are created in the main process and inherited by ScriptWorker child processes, allowing cross-process coordination without pickling.

__init__(env_id, creating, created, location, site_packages)
class ska_oso_oet.procedure.environment.EnvironmentManager(mp_context=None, base_dir='/tmp/environments/')[source]

Manages Python virtual environments for script execution.

Creates the venv structure and Event objects in the main process. The actual pip install happens in ScriptWorker processes, coordinated via the creating/created Events.

__init__(mp_context=None, base_dir='/tmp/environments/')[source]
create_env(git_args)[source]

Get an existing environment or create a new one.

Creates the venv structure but does NOT run the pip install; that happens in ScriptWorker processes, coordinated via the creating/created Events.

Parameters:

git_args (GitArgs) – Git repository arguments

Return type:

Environment

Returns:

Environment object with the venv created but the pip install not yet run
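The coordination via the creating and created Events can be sketched as follows. This is an illustration under simplifying assumptions: it uses threading.Event in a single process where the OET uses Events shared across processes, EnvSketch and ensure_installed are hypothetical names, and the check-then-set on creating is not atomic, so a real implementation would need to guard it.

```python
import threading
from typing import Callable, List


class EnvSketch:
    """Hypothetical stand-in holding the two coordination Events."""

    def __init__(self) -> None:
        self.creating = threading.Event()
        self.created = threading.Event()


def ensure_installed(env: EnvSketch, install: Callable[[], None]) -> bool:
    """Return True if this caller performed the install."""
    if not env.creating.is_set():
        # First requester: claim the work, install, then signal completion.
        env.creating.set()
        install()
        env.created.set()
        return True
    # Later requesters wait for the first one to finish.
    env.created.wait()
    return False


installs: List[str] = []
env = EnvSketch()
first = ensure_installed(env, lambda: installs.append("pip install"))
second = ensure_installed(env, lambda: installs.append("pip install"))
```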

delete_env(env_id)[source]

Delete an environment.

Parameters:

env_id (str) – ID of environment to delete

Return type:

None

ska_oso_oet.procedure.gitmanager

Static helper functions for cloning and working with a Git repository.

class ska_oso_oet.procedure.gitmanager.GitArgs(git_repo='https://gitlab.com/ska-telescope/oso/ska-oso-scripting.git', git_branch=None, git_commit=None)[source]

GitArgs captures information required to identify scripts located in git repositories.

__init__(git_repo='https://gitlab.com/ska-telescope/oso/ska-oso-scripting.git', git_branch=None, git_commit=None)[source]

ska_oso_oet.procedure.ui

The ska_oso_oet.procedure.ui package contains code that belongs to the OET procedure UI layer. This consists of the Procedure REST resources.

class ska_oso_oet.procedure.ui.ProcedurePostRequest(*args, **kwargs)[source]
class ska_oso_oet.procedure.ui.ProcedurePutRequest(*args, **kwargs)[source]