ska_oso_oet.procedure
ska_oso_oet.procedure.application
The ska_oso_oet.procedure.application module holds classes and functionality that belong in the application layer of the OET. This layer holds the application interface, delegating to objects in the domain layer for business rules and actions.
- class ska_oso_oet.procedure.application.ArgCapture(*args, **kwargs)[source]
ArgCapture is a struct that records a function call and its time of invocation.
- class ska_oso_oet.procedure.application.PrepareProcessCommand(script, init_args)[source]
PrepareProcessCommand is the input argument dataclass for the ScriptExecutionService prepare command. It holds all the information required to load and prepare a Python script ready for execution.
- class ska_oso_oet.procedure.application.ProcedureHistory(process_states=None, stacktrace=None)[source]
ProcedureHistory is a non-functional dataclass holding execution history of a Procedure spanning all transactions.
- process_states: records the time of each ProcedureState change, as a list of tuples, each holding a ProcedureState and the time at which that state was entered
- stacktrace: None unless execution_error is True, in which case it stores the stacktrace from the process
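The shape of the history record described above can be pictured with a small stand-in dataclass. This is a sketch only: HistorySketch and its record method are hypothetical names, states are plain strings rather than ProcedureState members, and timestamps are ordinary floats. The real ProcedureHistory may differ in all of these respects.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class HistorySketch:
    """Hypothetical stand-in for ProcedureHistory, for illustration only."""

    # (state, timestamp) pairs, appended in the order the changes occurred
    process_states: List[Tuple[str, float]] = field(default_factory=list)
    # stays None unless the script failed
    stacktrace: Optional[str] = None

    def record(self, state: str, when: Optional[float] = None) -> None:
        """Append a state change, stamped with the current time by default."""
        stamp = time.time() if when is None else when
        self.process_states.append((state, stamp))


history = HistorySketch()
history.record("CREATING", when=1.0)
history.record("READY", when=2.5)
```

Keeping the state and its timestamp together in one tuple means the list can be read as a timeline without any further bookkeeping.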
- class ska_oso_oet.procedure.application.ProcedureSummary(id, script, script_args, history, state, uri=None)[source]
ProcedureSummary is a brief representation of a runtime Procedure. It captures essential information required to describe a Procedure and to distinguish it from other Procedures.
- class ska_oso_oet.procedure.application.ScriptContext(*args, **kwargs)[source]
ScriptContext holds the current execution context provided to scripts.
This includes persistent operator overrides and any other runtime context that should be available to scripts at start time.
- class ska_oso_oet.procedure.application.ScriptExecutionService(abort_script=pydantic.BaseModel)[source]
ScriptExecutionService provides the high-level interface and facade for the script execution domain (i.e., the ‘procedure’ domain).
The interface is used to load and run Python scripts in their own independent Python child process.
The shutdown method should be called to ensure cleanup of any multiprocessing artefacts owned by this service.
- __init__(abort_script=pydantic.BaseModel)[source]
Create a new ScriptExecutionService.
The .stop() method of this ScriptExecutionService can run a second script once the current process has been terminated. By default, this second script calls SubArrayNode.abort() to halt further activities on the sub-array controlled by the terminated script. To run a different script, pass it as the abort_script argument to this constructor.
- Parameters:
abort_script (ExecutableScript) – post-termination script for two-phase abort
- handle_wait_for_qa_ready_disable(msg_src)[source]
Handler for operator.wait_for_qa_ready.disable topic. Sets wait_for_qa_ready to False.
- Parameters:
msg_src – component from which the request originated
- Return type:
- handle_wait_for_qa_ready_enable(msg_src)[source]
Handler for operator.wait_for_qa_ready.enable topic. Sets wait_for_qa_ready to True.
- Parameters:
msg_src – component from which the request originated
- Return type:
- prepare(cmd)[source]
Load and prepare a Python script for execution, but do not commence execution.
- Parameters:
cmd (PrepareProcessCommand) – dataclass argument capturing the script identity and load arguments
- Return type:
- Returns:
- start(cmd)[source]
Start execution of a prepared procedure.
- Parameters:
cmd (StartProcessCommand) – dataclass argument capturing the execution arguments
- Return type:
- Returns:
- stop(cmd)[source]
Stop execution of a running procedure, optionally running a second script once the first process has terminated.
- Parameters:
cmd (StopProcessCommand) – dataclass argument capturing the execution arguments
- Return type:
- Returns:
- summarise(pids=None)[source]
Return ProcedureSummary objects for Procedures with the requested IDs.
This method accepts an optional list of integers representing the Procedure IDs to summarise. If pids is left undefined, ProcedureSummary objects for all current Procedures are returned.
- class ska_oso_oet.procedure.application.StartProcessCommand(process_uid, fn_name, run_args, force_start=False)[source]
StartProcessCommand is the input argument dataclass for the ScriptExecutionService start command. It holds the references required to start a prepared script process along with any late-binding runtime arguments the script may require.
- class ska_oso_oet.procedure.application.StopProcessCommand(process_uid, run_abort)[source]
StopProcessCommand is the input argument dataclass for the ScriptExecutionService stop command. It holds the references required to stop a script process along with any late-binding runtime arguments the script may require.
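To show how the three command objects line up with the prepare, start and stop calls, the sketch below mirrors their documented constructor signatures with stand-in dataclasses. The class names, field types and all values (script location, function name, process ID, run arguments) are invented for illustration. In particular, treating run_abort as a boolean and process_uid as an integer are assumptions.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class PrepareSketch:
    """Stand-in mirroring PrepareProcessCommand(script, init_args)."""
    script: Any
    init_args: Any


@dataclass
class StartSketch:
    """Stand-in mirroring StartProcessCommand(process_uid, fn_name, run_args, force_start=False)."""
    process_uid: int
    fn_name: str
    run_args: Any
    force_start: bool = False


@dataclass
class StopSketch:
    """Stand-in mirroring StopProcessCommand(process_uid, run_abort)."""
    process_uid: int
    run_abort: bool  # assumed to be a flag selecting the post-termination script


# Hypothetical values: none of these come from the OET itself.
prepare_cmd = PrepareSketch(script="file:///path/to/script.py", init_args={})
start_cmd = StartSketch(process_uid=1, fn_name="main", run_args={"duration": 10})
stop_cmd = StopSketch(process_uid=1, run_abort=True)
```

Each command carries only what its phase needs: the script and its load arguments for prepare, a process reference plus late-binding arguments for start, and a process reference plus the abort choice for stop.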
ska_oso_oet.procedure.domain
The ska_oso_oet.procedure.domain module holds domain entities from the script execution domain. Entities in this domain are things like scripts, OS processes, process supervisors, signal handlers, etc.
- class ska_oso_oet.procedure.domain.ExecutableScript(*args, **kwargs)[source]
Base class for all executable scripts.
Expected specialisations:
scripts on filesystem
scripts in git repository
scripts given as a string
scripts stored in the ODA
etc.
- class ska_oso_oet.procedure.domain.FileSystemScript(*args, **kwargs)[source]
Represents a script stored on the file system.
- class ska_oso_oet.procedure.domain.GitScript(*args, **kwargs)[source]
Represents a script in a git repository.
- class ska_oso_oet.procedure.domain.LifecycleMessage(msg_src, new_state)[source]
LifecycleMessage is a message type for script lifecycle events.
- class ska_oso_oet.procedure.domain.ModuleFactory[source]
Factory class used to return Python Module instances from a variety of storage back-ends.
- static get_module(script)[source]
Load Python code from storage, returning an executable Python module.
- Parameters:
script (ExecutableScript) – Script object describing the script to load
- Returns:
Python module
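For the file system case, turning stored code into an executable module can be done with the standard library's importlib machinery. The sketch below shows that general technique and is not the OET implementation: load_module_from_path and demo are hypothetical helpers, and the script content is invented.

```python
import importlib.util
import os
import tempfile
from types import ModuleType


def load_module_from_path(path: str, module_name: str = "user_script") -> ModuleType:
    """Load the Python file at `path` and return it as an executed module."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the script's top-level code
    return module


def demo() -> str:
    """Write a throwaway script to disk, load it, and call one of its functions."""
    with tempfile.TemporaryDirectory() as tmp:
        script_path = os.path.join(tmp, "hello.py")
        with open(script_path, "w") as fh:
            fh.write("def main(name):\n    return 'hello ' + name\n")
        module = load_module_from_path(script_path)
        return module.main("OET")
```

Because the module is not registered in sys.modules here, each call yields a fresh, independent module object, which suits scripts that should not share state.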
- class ska_oso_oet.procedure.domain.ProcedureInput(*args: Any, **kwargs: Any)[source]
ProcedureInput is a non-functional dataclass holding the arguments passed to a script method.
- class ska_oso_oet.procedure.domain.ProcedureState(value)[source]
Represents the script execution state.
- class ska_oso_oet.procedure.domain.ProcessManager(states=None, state_lock=None)[source]
ProcessManager tracks and coordinates ScriptWorker processes.
ProcessManager is responsible for tracking script execution state and coordinating with main_loop via pypubsub messages. Actual process creation and management is handled by main_loop in main.py.
ProcessManager uses pypubsub to send management requests (create, run, stop) to main_loop, which creates and manages the actual ScriptWorker processes.
Note: ProcessManager does not maintain a history of script execution. History is recorded and managed by the ScriptExecutionService.
- __init__(states=None, state_lock=None)[source]
Create a new ProcessManager.
- Parameters:
states (dict[int, ProcedureState] | None) – Optional external states dictionary to use instead of creating an internal one. This allows sharing state with other components (e.g., ScriptExecutionService) to avoid race conditions. When external states are provided, this manager will NOT subscribe to statechange events as the external owner is responsible for updates.
state_lock (RLock | None) – Optional external lock for state updates. Should be provided if states is provided to ensure thread-safe updates.
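The sharing arrangement described for states and state_lock can be sketched with plain threading primitives. StateTrackerSketch is a hypothetical class, states are plain strings, and the owns_states flag is an invented way to mark which object would be responsible for subscribing to state-change events.

```python
import threading
from typing import Dict, Optional


class StateTrackerSketch:
    """Hypothetical component that either owns its state or borrows it."""

    def __init__(self, states: Optional[Dict[int, str]] = None, state_lock=None):
        # Only an owner would subscribe to state-change events; a borrower
        # leaves updates to whoever supplied the dictionary.
        self.owns_states = states is None
        self.states = {} if states is None else states
        self.state_lock = threading.RLock() if state_lock is None else state_lock

    def set_state(self, pid: int, state: str) -> None:
        with self.state_lock:
            self.states[pid] = state

    def get_state(self, pid: int) -> Optional[str]:
        with self.state_lock:
            return self.states.get(pid)


# Two components sharing one dictionary and one lock.
shared_states: Dict[int, str] = {}
shared_lock = threading.RLock()
service_view = StateTrackerSketch(shared_states, shared_lock)
manager_view = StateTrackerSketch(shared_states, shared_lock)

service_view.set_state(1, "READY")
```

Passing the same lock alongside the same dictionary is what makes the sharing safe: if each side guarded the dictionary with its own lock, updates could still interleave.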
- create(script, *, init_args)[source]
Request creation of a new ScriptWorker process.
This method emits a procedure.management.create pypubsub message, which main_loop handles to create the actual process. The method returns immediately with state=CREATING.
- Parameters:
script (ExecutableScript) – script to execute
init_args (ProcedureInput) – script initialisation arguments
- Return type:
- Returns:
PID assigned to this procedure
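The request-by-message pattern behind create can be sketched as follows. To stay self-contained, the example uses a tiny in-process publish/subscribe stand-in rather than pypubsub, so subscribe and send_message here are hypothetical helpers and not the pypubsub API. The message payload keys and the PID numbering are also assumptions.

```python
import itertools
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

# Minimal pub/sub stand-in for illustration; not the pypubsub API.
_subscribers: DefaultDict[str, List[Callable[..., None]]] = defaultdict(list)


def subscribe(topic: str, listener: Callable[..., None]) -> None:
    _subscribers[topic].append(listener)


def send_message(topic: str, **kwargs: Any) -> None:
    for listener in list(_subscribers[topic]):
        listener(**kwargs)


class ManagerSketch:
    """Hypothetical manager that requests work instead of doing it."""

    def __init__(self) -> None:
        self.states: Dict[int, str] = {}
        self._pid_counter = itertools.count(1)

    def create(self, script: Any, *, init_args: Any) -> int:
        pid = next(self._pid_counter)
        self.states[pid] = "CREATING"  # recorded before any process exists
        send_message("procedure.management.create",
                     pid=pid, script=script, init_args=init_args)
        return pid  # returned immediately; process creation happens elsewhere


received: List[Dict[str, Any]] = []
subscribe("procedure.management.create", lambda **kw: received.append(kw))

manager = ManagerSketch()
pid = manager.create("file:///path/to/script.py", init_args={})
```

The caller gets a PID back straight away and must track later state changes separately, since the process may not exist yet when create returns.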
- get_state(pid)[source]
Get the current state of a procedure.
- Parameters:
pid (int) – Procedure ID
- Return type:
- Returns:
Current state or None if not found
- has_running()[source]
Check if any procedure is currently running.
- Return type:
- Returns:
True if any procedure is in RUNNING state
- run(process_id, *, call, run_args, force_start=False)[source]
Request execution of a function in a prepared procedure.
This method emits a procedure.management.run pypubsub message, which main_loop handles to send the run command to the ScriptWorker.
- Parameters:
process_id (int) – ID of Procedure to execute
call (str) – name of function to call
run_args (ProcedureInput) – late-binding arguments to provide to the script
force_start (bool) – Add run command even if not READY (ignored for terminal states)
- Return type:
- class ska_oso_oet.procedure.domain.RunThreadState(thread, fn_name, exception=None)[source]
Represents the state of a thread running a user function during its execution.
- thread
The thread whose state is being represented.
- fn_name
The name of the function associated with the thread.
- exception
An exception that occurred during thread execution, if any.
- __init__(thread, fn_name, exception=None)
- class ska_oso_oet.procedure.domain.ScriptWorker(name, startup_event, shutdown_event, event_q, work_q, *args, environment=None, **kwargs)[source]
ScriptWorker loads user code in a child process, running functions of that user code on request.
ScriptWorker acts when a message is received on its work queue. It responds to four types of external message:
LOAD - to load the specified code in this process
ENV - to install the dependencies for the specified script in this process
RUN - to run the named function in this process
PUBSUB - external pubsub messages that should be published locally
In addition, the event loop also handles two internal messages originating from the user function execution thread:
USER_FN_COMPLETE - indicates successful completion of the requested user function
USER_FN_FAILED - indicates failure during user function execution
ScriptWorker converts external inter-process mptools pub/sub messages to intra-process pypubsub pub/sub messages. That is, EventMessages received on the local work queue are rebroadcast locally as pypubsub messages. Likewise, the ScriptWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other interested ScriptWorkers.
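The message handling described above, with four external and two internal message types, amounts to a dispatch loop over a work queue. The sketch below is a simplified illustration, not the ScriptWorker code: messages are assumed to be (type, payload) tuples, user functions run inline rather than in a separate thread, and the loop ends when the queue is empty instead of waiting on a shutdown event. drain_work_queue and fail are hypothetical names.

```python
import queue
from typing import Any, Callable, Dict, List


def drain_work_queue(work_q: "queue.Queue") -> List[str]:
    """Handle queued (type, payload) messages until the queue is empty."""
    functions: Dict[str, Callable[[], Any]] = {}
    log: List[str] = []

    while True:
        try:
            msg_type, payload = work_q.get_nowait()
        except queue.Empty:
            break  # a real worker would keep waiting for a shutdown signal

        if msg_type == "ENV":
            log.append(f"install dependencies for {payload}")
        elif msg_type == "LOAD":
            functions.update(payload)  # payload maps function name to callable
            log.append(f"loaded {sorted(payload)}")
        elif msg_type == "RUN":
            # Run inline for determinism; completion is reported back
            # through the same queue as an internal message.
            try:
                functions[payload]()
                work_q.put(("USER_FN_COMPLETE", payload))
            except Exception as exc:
                work_q.put(("USER_FN_FAILED", f"{payload}: {exc}"))
        elif msg_type == "PUBSUB":
            log.append(f"rebroadcast {payload}")
        elif msg_type in ("USER_FN_COMPLETE", "USER_FN_FAILED"):
            log.append(f"{msg_type} {payload}")
        else:
            log.append(f"ignored {msg_type}")
    return log


def fail() -> None:
    raise RuntimeError("boom")


q: "queue.Queue" = queue.Queue()
for message in [
    ("ENV", "script.py"),
    ("LOAD", {"main": lambda: None, "broken": fail}),
    ("RUN", "main"),
    ("RUN", "broken"),
]:
    q.put(message)
worker_log = drain_work_queue(q)
```

Routing completion and failure back through the same queue keeps all state changes on the event loop, so the loop never has to inspect the user function directly.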
- __init__(name, startup_event, shutdown_event, event_q, work_q, *args, environment=None, **kwargs)[source]
Create a new ProcWorker.
- main_loop()[source]
main_loop delivers each event received on the work queue to the main_func template method, while checking for shutdown notifications.
Event delivery will cease when the shutdown event is set or a special sentinel message is sent.
- Return type:
- publish_lifecycle(new_state)[source]
Broadcast a lifecycle status change event.
- Parameters:
new_state (ProcedureState) – new lifecycle state
- republish(topic=pubsub.pub.AUTO_TOPIC, **kwargs)[source]
Republish a local pypubsub event over the inter-process mptools event bus.
- Parameters:
topic (Topic) – message topic, set automatically by pypubsub
kwargs – any metadata associated with pypubsub message
- Return type:
- Returns:
- static term_handler(exception_class, signal_num, current_stack_frame)
Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.
We don’t want all sibling script processes to terminate, so this handler does not set shutdown_event.
- ska_oso_oet.procedure.domain.script_signal_handler(signal_object, exception_class, signal_num, current_stack_frame)[source]
Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.
We don’t want all sibling script processes to terminate, so this handler does not set shutdown_event.
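A handler of this kind can be sketched with the standard signal module. The exception class, the helper names and the use of functools.partial to bind the exception class ahead of the two arguments that Python passes to every handler are all assumptions made for illustration. The sketch must run in the main thread, because signal.signal refuses to install handlers elsewhere.

```python
import functools
import signal


class ScriptTerminated(Exception):
    """Hypothetical exception type raised when the termination signal arrives."""


def raise_on_signal(exception_class, signal_num, current_stack_frame):
    """Signal handler that only raises; it sets no shared shutdown flag."""
    raise exception_class(f"received signal {signal_num}")


def run_until_terminated() -> str:
    """Install the handler, deliver SIGTERM to this process, report the outcome."""
    handler = functools.partial(raise_on_signal, ScriptTerminated)
    previous = signal.signal(signal.SIGTERM, handler)
    try:
        signal.raise_signal(signal.SIGTERM)  # stands in for an external kill
        return "not interrupted"
    except ScriptTerminated as exc:
        return f"interrupted: {exc}"
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the earlier handler
```

Because the handler does nothing but raise, only the process that receives the signal is affected. A script that catches the exception broadly would, as the text notes, defeat the interruption.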
ska_oso_oet.procedure.environment
- class ska_oso_oet.procedure.environment.Environment(env_id, creating, created, location, site_packages)[source]
Represents a Python virtual environment for script execution.
The creating and created Events are used to prevent duplicate environment creation when multiple procedures request the same environment.
These Events are created in the main process and inherited by ScriptWorker child processes, allowing cross-process coordination without pickling.
- __init__(env_id, creating, created, location, site_packages)
- class ska_oso_oet.procedure.environment.EnvironmentManager(mp_context=None, base_dir='/tmp/environments/')[source]
Manages Python virtual environments for script execution.
Creates the venv structure and Event objects in the main process. The actual pip install happens in ScriptWorker processes, coordinated via the creating/created Events.
- create_env(git_args)[source]
Get an existing environment or create a new one.
Creates the venv structure but NOT the pip install - that happens in ScriptWorker processes coordinated via the creating/created Events.
- Parameters:
git_args (GitArgs) – Git repository arguments
- Return type:
- Returns:
Environment object with venv created but pip not yet installed
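The coordination that the creating and created Events provide can be sketched as a claim-then-wait protocol. This illustration uses threads and threading.Event so that it stays small and deterministic, whereas the text above describes Events shared across processes. EnvSketch, its lock and the install counter are hypothetical and are not part of the Environment class.

```python
import threading


class EnvSketch:
    """Hypothetical stand-in for an environment guarded by two events."""

    def __init__(self) -> None:
        self.creating = threading.Event()    # set once a worker claims the install
        self.created = threading.Event()     # set once the install has finished
        self._claim_lock = threading.Lock()  # makes the claim atomic in this sketch
        self.install_count = 0

    def ensure_installed(self) -> None:
        with self._claim_lock:
            claimed = not self.creating.is_set()
            if claimed:
                self.creating.set()
        if claimed:
            self.install_count += 1  # stands in for the slow dependency install
            self.created.set()
        else:
            self.created.wait(timeout=5)  # another worker is doing the install


def run_workers(n: int = 4) -> int:
    """Start n workers that all request the same environment."""
    env = EnvSketch()
    threads = [threading.Thread(target=env.ensure_installed) for _ in range(n)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return env.install_count
```

However many workers ask for the same environment, only the one that sets creating performs the install. The rest block on created and proceed once it is set.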
ska_oso_oet.procedure.gitmanager
Static helper functions for cloning and working with a Git repository
ska_oso_oet.procedure.ui
The ska_oso_oet.procedure.ui package contains code that belongs to the OET procedure UI layer. This consists of the Procedure REST resources.