ska_oso_oet.procedure
ska_oso_oet.procedure.application
The ska_oso_oet.procedure.application module holds classes and functionality that belong in the application layer of the OET. This layer holds the application interface, delegating to objects in the domain layer for business rules and actions.
- class ska_oso_oet.procedure.application.ArgCapture(*args: Any, **kwargs: Any)
ArgCapture is a struct that records a function call and the time of its invocation.
- class ska_oso_oet.procedure.application.PrepareProcessCommand(*args: Any, **kwargs: Any)
PrepareProcessCommand is the input argument dataclass for the ScriptExecutionService prepare command. It holds all the information required to load and prepare a Python script ready for execution.
- __init__(script: ExecutableScript, init_args: ProcedureInput)
- class ska_oso_oet.procedure.application.ProcedureHistory(*args: Any, **kwargs: Any)
ProcedureHistory is a non-functional dataclass holding the execution history of a Procedure spanning all transactions.
- process_states: records the time of each ProcedureState change, as a list of tuples where each tuple holds a ProcedureState and the time at which that state was entered
- stacktrace: None unless execution_error is True, in which case it stores the stacktrace from the process
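The shape of the two fields described above can be illustrated with a small stand-in. This is a minimal sketch and not the OET class itself: SketchState and SketchHistory are hypothetical names, and the CREATING and RUNNING members are illustrative assumptions rather than confirmed ProcedureState values.

```python
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import List, Optional, Tuple


class SketchState(Enum):
    """Hypothetical stand-in for ProcedureState; member names are illustrative."""

    CREATING = auto()
    RUNNING = auto()
    FAILED = auto()


@dataclass
class SketchHistory:
    """Hypothetical stand-in mirroring the two fields described above."""

    # (state, time the state was entered), in the order the changes occurred
    process_states: List[Tuple[SketchState, float]] = field(default_factory=list)
    # None unless the script failed, in which case the formatted stacktrace
    stacktrace: Optional[str] = None


history = SketchHistory()
history.process_states.append((SketchState.CREATING, time.time()))
history.process_states.append((SketchState.RUNNING, time.time()))

# The most recent entry describes the current state and when it was entered.
latest_state, entered_at = history.process_states[-1]
```

Keeping the states as an append-only list of tuples preserves the order of transitions, so the current state is always the last entry.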
- class ska_oso_oet.procedure.application.ProcedureSummary(*args: Any, **kwargs: Any)
ProcedureSummary is a brief representation of a runtime Procedure. It captures essential information required to describe a Procedure and to distinguish it from other Procedures.
- __init__(id: int, script: ExecutableScript | None, script_args: List[ArgCapture] | None, history: ProcedureHistory | None, state: ProcedureState | None, uri: str | None = None)
- class ska_oso_oet.procedure.application.ScriptExecutionService(mp_context: BaseContext | None = None, abort_script: ExecutableScript = pydantic.BaseModel, on_pubsub: List[Callable[[EventMessage], None]] | None = None)
ScriptExecutionService provides the high-level interface and facade for the script execution domain (i.e., the ‘procedure’ domain).
The interface is used to load and run Python scripts in their own independent Python child process.
The shutdown method should be called to ensure cleanup of any multiprocessing artefacts owned by this service.
- __init__(mp_context: BaseContext | None = None, abort_script: ExecutableScript = pydantic.BaseModel, on_pubsub: List[Callable[[EventMessage], None]] | None = None)
Create a new ScriptExecutionService.
The .stop() method of this ScriptExecutionService can run a second script once the current process has been terminated. By default, this second script calls SubArrayNode.abort() to halt further activities on the sub-array controlled by the terminated script. To run a different script, pass it as the abort_script argument to this constructor.
- Parameters:
mp_context – multiprocessing context to use or None for default
abort_script – post-termination script for two-phase abort
on_pubsub – callbacks to call when PUBSUB message is received
- prepare(cmd: PrepareProcessCommand) → ProcedureSummary
Load and prepare a Python script for execution, but do not commence execution.
- Parameters:
cmd – dataclass argument capturing the script identity and load arguments
- Returns:
- start(cmd: StartProcessCommand) → ProcedureSummary
Start execution of a prepared procedure.
- Parameters:
cmd – dataclass argument capturing the execution arguments
- Returns:
- stop(cmd: StopProcessCommand) → List[ProcedureSummary]
Stop execution of a running procedure, optionally running a second script once the first process has terminated.
- Parameters:
cmd – dataclass argument capturing the execution arguments
- Returns:
- summarise(pids: List[int] | None = None) → List[ProcedureSummary]
Return ProcedureSummary objects for Procedures with the requested IDs.
This method accepts an optional list of integers, representing the Procedure IDs to summarise. If pids is left undefined, ProcedureSummary objects for all current Procedures will be returned.
- Parameters:
pids – optional list of Procedure IDs to summarise.
- Returns:
list of ProcedureSummary objects
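The optional-argument behaviour of summarise can be sketched as follows. This is an illustration only: summarise_sketch and the in-memory _summaries store are hypothetical, plain strings stand in for ProcedureSummary objects, and raising KeyError for an unknown ID is an assumption of this sketch rather than documented OET behaviour.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory store: Procedure ID -> summary (a string stand-in).
_summaries: Dict[int, str] = {1: "procedure 1", 2: "procedure 2", 3: "procedure 3"}


def summarise_sketch(pids: Optional[List[int]] = None) -> List[str]:
    """Return summaries for the requested IDs, or for all IDs when pids is None."""
    if pids is None:
        pids = list(_summaries.keys())
    # An unknown ID raises KeyError here; that choice is an assumption.
    return [_summaries[pid] for pid in pids]


all_summaries = summarise_sketch()
one_summary = summarise_sketch([2])
```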
- class ska_oso_oet.procedure.application.StartProcessCommand(*args: Any, **kwargs: Any)
StartProcessCommand is the input argument dataclass for the ScriptExecutionService start command. It holds the references required to start a prepared script process along with any late-binding runtime arguments the script may require.
- class ska_oso_oet.procedure.application.StopProcessCommand(*args: Any, **kwargs: Any)
StopProcessCommand is the input argument dataclass for the ScriptExecutionService stop command. It holds the references required to stop a script process along with any late-binding runtime arguments the script may require.
ska_oso_oet.procedure.domain
The ska_oso_oet.procedure.domain module holds domain entities from the script execution domain. Entities in this domain are things like scripts, OS processes, process supervisors, signal handlers, etc.
- class ska_oso_oet.procedure.domain.ExecutableScript(*args: Any, **kwargs: Any)
Base class for all executable scripts.
Expected specialisations:
scripts on filesystem
scripts in git repository
scripts given as a string
scripts stored in the ODA
etc.
- class ska_oso_oet.procedure.domain.FileSystemScript(*args: Any, **kwargs: Any)
Represents a script stored on the file system.
- class ska_oso_oet.procedure.domain.GitScript(*args: Any, **kwargs: Any)
Represents a script in a git repository.
- class ska_oso_oet.procedure.domain.LifecycleMessage(msg_src: str, new_state: ProcedureState)
LifecycleMessage is a message type for script lifecycle events.
- __init__(msg_src: str, new_state: ProcedureState)
- class ska_oso_oet.procedure.domain.ModuleFactory
Factory class used to return Python Module instances from a variety of storage back-ends.
- static get_module(script: ExecutableScript)
Load Python code from storage, returning an executable Python module.
- Parameters:
script – Script object describing the script to load
- Returns:
Python module
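For the filesystem case, loading code from storage into an executable module can be done with the standard library. The sketch below is an assumption about one plausible approach, not the ModuleFactory implementation; load_module_from_path, the module name "user_script", and the throwaway script are all hypothetical.

```python
import importlib.util
import tempfile
from pathlib import Path
from types import ModuleType


def load_module_from_path(path: str) -> ModuleType:
    """Load the Python file at `path` and return it as an executed module."""
    spec = importlib.util.spec_from_file_location("user_script", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot create an import spec for {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the script's top-level code
    return module


# Write a throwaway script to disk so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    script_path = Path(tmp) / "myscript.py"
    script_path.write_text("def main(x):\n    return x * 2\n")
    module = load_module_from_path(str(script_path))

# Functions defined by the script are now attributes of the module.
result = module.main(21)
```

Other storage back-ends, such as a git repository, would need to fetch the file to a local path first; that step is outside the scope of this sketch.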
- class ska_oso_oet.procedure.domain.ProcedureInput(*args: Any, **kwargs: Any)
ProcedureInput is a non-functional dataclass holding the arguments passed to a script method.
- class ska_oso_oet.procedure.domain.ProcedureState(value)
Represents the script execution state.
- class ska_oso_oet.procedure.domain.ProcessManager(mp_context: BaseContext | None = None, on_pubsub: List[Callable[[EventMessage], None]] | None = None)
ProcessManager is the parent for all ScriptWorker processes.
ProcessManager is responsible for launching ScriptWorker processes and communicating API requests such as ‘run main() function’ or ‘stop execution’ to the running scripts. If a script execution process does not respond to the request, the process will be forcibly terminated. ProcessManager delegates to the mptools framework for process management functionality. Familiarity with mptools is useful in understanding ProcessManager functionality.
ProcessManager is also responsible for communicating script events to the rest of the system, such as events issued by the script or related to the script execution lifecycle.
It is recommended that ProcessManager.shutdown() be called before the ProcessManager is garbage collected. Failure to call shutdown could break any multiprocessing state held in the scope of the manager or its child processes. This may or may not be a problem, depending on what is held and whether that state is used elsewhere. In short, be safe and call shutdown().
Note: ProcessManager does not maintain a history of script execution. History is recorded and managed by the ScriptExecutionService.
- __init__(mp_context: BaseContext | None = None, on_pubsub: List[Callable[[EventMessage], None]] | None = None)
Create a new ProcessManager.
Functions passed in the on_pubsub argument will be called by the ProcessManager every time the ProcessManager’s message loop receives a PUBSUB EventMessage. Callbacks should not perform significant processing on the same thread, as this would block the ProcessManager event loop.
- Parameters:
mp_context – multiprocessing context used to create multiprocessing primitives
on_pubsub – functions to call when a PUBSUB message is received
- create(script: ExecutableScript, *, init_args: ProcedureInput) → int
Create a new Procedure that will, when executed, run the target Python script.
Objects that can only be shared through inheritance, such as multiprocessing objects, can be shared by providing them as init_args here. These arguments will be provided to the init function in the user script, where present.
- Parameters:
script – script URI, e.g. ‘file://myscript.py’
init_args – script initialisation arguments
- Returns:
- run(process_id: int, *, call: str, run_args: ProcedureInput, force_start: bool = False) → None
Run a prepared Procedure.
This starts execution of the script prepared by a previous create() call.
- Parameters:
process_id – ID of Procedure to execute
call – name of function to call
run_args – late-binding arguments to provide to the script
force_start – Add run command to queue even if the script is not yet ready to run. Does not add command to queue if ProcedureState is FAILED, STOPPED, COMPLETE or UNKNOWN
- Returns:
- class ska_oso_oet.procedure.domain.ScriptWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, environment: Environment | None = None, **kwargs)
ScriptWorker loads user code in a child process, running functions of that user code on request.
ScriptWorker acts when a message is received on its work queue. It responds to four types of messages:
LOAD - to load the specified code in this process
ENV - to install the dependencies for the specified script in this process
RUN - to run the named function in this process
PUBSUB - external pubsub messages that should be published locally
ScriptWorker converts external inter-process mptools pub/sub messages to intra-process pypubsub pub/sub messages. That is, EventMessages received on the local work queue are rebroadcast locally as pypubsub messages. Likewise, the ScriptWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other interested ScriptWorkers.
- __init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, environment: Environment | None = None, **kwargs)
Create a new ProcWorker.
- Parameters:
name – name of this worker
startup_event – event to set on startup completion
shutdown_event – event to monitor for shutdown
event_q – queue for messages to/from MainWorker
args
- main_loop() → None
main_loop delivers each event received on the work queue to the main_func template method, while checking for shutdown notifications.
Event delivery will cease when the shutdown event is set or a special sentinel message is sent.
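The loop described above, which delivers queued events until a shutdown event is set or a sentinel arrives, can be sketched with standard-library primitives. This is an illustration and not the ScriptWorker code: main_loop_sketch is a hypothetical name, the sentinel value "END" is an assumption, and threading primitives are used here in place of the multiprocessing ones the real worker receives.

```python
import queue
import threading
from typing import Any, Callable, List

SENTINEL = "END"  # hypothetical sentinel value; the real one may differ


def main_loop_sketch(
    work_q: "queue.Queue[Any]",
    shutdown_event: threading.Event,
    main_func: Callable[[Any], None],
) -> None:
    """Deliver queued items to main_func until shutdown or a sentinel arrives."""
    while not shutdown_event.is_set():
        try:
            # Short timeout so the shutdown event is re-checked regularly.
            item = work_q.get(timeout=0.05)
        except queue.Empty:
            continue
        if item == SENTINEL:
            break
        main_func(item)


handled: List[Any] = []
work_q: "queue.Queue[Any]" = queue.Queue()
for item in ("LOAD", "RUN", SENTINEL, "ignored"):
    work_q.put(item)

# Items queued after the sentinel are never delivered.
main_loop_sketch(work_q, threading.Event(), handled.append)
```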
- publish_lifecycle(new_state: ProcedureState)
Broadcast a lifecycle status change event.
- Parameters:
new_state – new lifecycle state
- republish(topic: pubsub.pub.Topic = pubsub.pub.AUTO_TOPIC, **kwargs) → None
Republish a local pypubsub event over the inter-process mptools event bus.
- Parameters:
topic – message topic, set automatically by pypubsub
kwargs – any metadata associated with pypubsub message
- Returns:
- static term_handler(signal_object, exception_class, signal_num: int, current_stack_frame) → None
Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.
We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
- ska_oso_oet.procedure.domain.script_signal_handler(signal_object, exception_class, signal_num: int, current_stack_frame) → None
Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.
We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.
- Parameters:
signal_object – SignalObject to modify to reflect signal-handling state
exception_class – Exception type to raise when call limit is exceeded
signal_num – POSIX signal ID
current_stack_frame – current stack frame
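A handler with this four-argument signature can be adapted to the two-argument form that Python's signal module expects by binding the first two arguments with functools.partial. The sketch below assumes that approach; SignalObjectSketch, ScriptTerminated, and raising_handler are hypothetical names, and unlike the documented handler this version raises on every call rather than applying a call limit.

```python
import functools
import signal
from dataclasses import dataclass


@dataclass
class SignalObjectSketch:
    """Hypothetical stand-in that counts how often the handler has fired."""

    terminate_called: int = 0


class ScriptTerminated(Exception):
    """Hypothetical exception type raised to interrupt the running script."""


def raising_handler(signal_object, exception_class, signal_num, current_stack_frame):
    """Record the signal on signal_object, then raise to unwind the script."""
    signal_object.terminate_called += 1
    raise exception_class(f"received signal {signal_num}")


state = SignalObjectSketch()
# Bind the first two arguments so the result has the (signum, frame)
# signature that signal.signal() expects.
handler = functools.partial(raising_handler, state, ScriptTerminated)

# Invoke the handler directly, as the interpreter would on signal delivery.
try:
    handler(signal.SIGTERM, None)
    interrupted = False
except ScriptTerminated:
    interrupted = True
```

In a real process the bound handler would be installed from the main thread with signal.signal(signal.SIGTERM, handler). Because the handler only raises and does not set any shared shutdown event, sibling processes are unaffected.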
ska_oso_oet.procedure.environment
ska_oso_oet.procedure.gitmanager
Static helper functions for cloning and working with a Git repository
ska_oso_oet.procedure.ui
The ska_oso_oet.procedure.ui package contains code that belongs to the OET procedure UI layer. This consists of the Procedure REST resources.