ska_oso_oet.procedure

ska_oso_oet.procedure.application

The ska_oso_oet.procedure.application module holds classes and functionality that belong in the application layer of the OET. This layer holds the application interface, delegating to objects in the domain layer for business rules and actions.

class ska_oso_oet.procedure.application.ArgCapture(fn: str, fn_args: ProcedureInput, time: float | None = None)[source]

ArgCapture is a struct that records a function call and its time of invocation.

__init__(fn: str, fn_args: ProcedureInput, time: float | None = None) None
class ska_oso_oet.procedure.application.PrepareProcessCommand(script: ExecutableScript, init_args: ProcedureInput)[source]

PrepareProcessCommand is the input argument dataclass for the ScriptExecutionService prepare command. It holds all the information required to load and prepare a Python script ready for execution.

__init__(script: ExecutableScript, init_args: ProcedureInput) None
class ska_oso_oet.procedure.application.ProcedureHistory(process_states: List[Tuple[ProcedureState, float]] | None = None, stacktrace=None)[source]

ProcedureHistory is a non-functional dataclass holding execution history of a Procedure spanning all transactions.

process_states: records the time of each change of ProcedureState, as a list of tuples, each holding a ProcedureState and the time at which that state was entered.

stacktrace: None unless execution_error is True, in which case it stores the stacktrace from the process.

__init__(process_states: List[Tuple[ProcedureState, float]] | None = None, stacktrace=None)[source]
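
The shape of process_states can be pictured with a stand-in dataclass. This is a minimal sketch and not the OET class: HistorySketch, record() and the state strings are hypothetical names chosen for illustration. Only the list-of-(state, time)-tuples layout and the optional stacktrace come from the description above.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class HistorySketch:
    """Hypothetical stand-in mirroring the documented ProcedureHistory fields."""

    # Each entry is (state, time at which that state was entered).
    process_states: List[Tuple[str, float]] = field(default_factory=list)
    # Stays None unless the script process reported an error.
    stacktrace: Optional[str] = None

    def record(self, state: str, when: Optional[float] = None) -> None:
        """Append a state change, timestamping it if no time is supplied."""
        timestamp = time.time() if when is None else when
        self.process_states.append((state, timestamp))


history = HistorySketch()
history.record("CREATING", when=100.0)  # state names are illustrative only
history.record("READY", when=101.5)
```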
class ska_oso_oet.procedure.application.ProcedureSummary(id: int, script: ExecutableScript, script_args: List[ArgCapture], history: ProcedureHistory, state: ProcedureState)[source]

ProcedureSummary is a brief representation of a runtime Procedure. It captures essential information required to describe a Procedure and to distinguish it from other Procedures.

__init__(id: int, script: ExecutableScript, script_args: List[ArgCapture], history: ProcedureHistory, state: ProcedureState) None
class ska_oso_oet.procedure.application.ScriptExecutionService(mp_context: BaseContext | None = None, abort_script: ExecutableScript = FileSystemScript(script_uri='file:///home/docs/checkouts/readthedocs.org/user_builds/ska-telescope-ska-oso-oet/checkouts/latest/src/ska_oso_oet/procedure/abort.py'), on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]

ScriptExecutionService provides the high-level interface and facade for the script execution domain (i.e., the ‘procedure’ domain).

The interface is used to load and run Python scripts in their own independent Python child process.

The shutdown method should be called to ensure cleanup of any multiprocessing artefacts owned by this service.

__init__(mp_context: BaseContext | None = None, abort_script: ExecutableScript = FileSystemScript(script_uri='file:///home/docs/checkouts/readthedocs.org/user_builds/ska-telescope-ska-oso-oet/checkouts/latest/src/ska_oso_oet/procedure/abort.py'), on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]

Create a new ScriptExecutionService.

The .stop() method of this ScriptExecutionService can run a second script once the current process has been terminated. By default, this second script calls SubArrayNode.abort() to halt further activities on the sub-array controlled by the terminated script. To run a different script, pass it as the abort_script argument to this constructor.

Parameters:
  • mp_context – multiprocessing context to use or None for default

  • abort_script – post-termination script for two-phase abort

  • on_pubsub – callbacks to call when PUBSUB message is received

prepare(cmd: PrepareProcessCommand) ProcedureSummary[source]

Load and prepare a Python script for execution, but do not commence execution.

Parameters:

cmd – dataclass argument capturing the script identity and load arguments

Returns:

start(cmd: StartProcessCommand) ProcedureSummary[source]

Start execution of a prepared procedure.

Parameters:

cmd – dataclass argument capturing the execution arguments

Returns:

stop(cmd: StopProcessCommand) List[ProcedureSummary][source]

Stop execution of a running procedure, optionally running a second script once the first process has terminated.

Parameters:

cmd – dataclass argument capturing the execution arguments

Returns:

summarise(pids: List[int] | None = None) List[ProcedureSummary][source]

Return ProcedureSummary objects for Procedures with the requested IDs.

This method accepts an optional list of integers representing the Procedure IDs to summarise. If pids is left undefined, ProcedureSummary objects for all current Procedures will be returned.

Parameters:

pids – optional list of Procedure IDs to summarise.

Returns:

list of ProcedureSummary objects
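
The "all or selected" behaviour of the pids argument can be sketched as follows. This is an illustration under stated assumptions, not the service code: summarise_sketch and the dictionary of summaries are hypothetical, and the behaviour for unknown IDs is not specified above, so this sketch simply lets KeyError propagate.

```python
from typing import Dict, List, Optional


def summarise_sketch(summaries: Dict[int, str], pids: Optional[List[int]] = None) -> List[str]:
    """Return summaries for the requested IDs, or for every ID when pids is None."""
    if pids is None:
        # No filter supplied: report on all current procedures.
        pids = list(summaries.keys())
    return [summaries[pid] for pid in pids]


# Hypothetical store of summaries keyed by Procedure ID.
current = {1: "summary of procedure 1", 2: "summary of procedure 2"}
everything = summarise_sketch(current)
only_second = summarise_sketch(current, pids=[2])
```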

class ska_oso_oet.procedure.application.StartProcessCommand(process_uid: int, fn_name: str, run_args: ProcedureInput, force_start: bool = False)[source]

StartProcessCommand is the input argument dataclass for the ScriptExecutionService start command. It holds the references required to start a prepared script process along with any late-binding runtime arguments the script may require.

__init__(process_uid: int, fn_name: str, run_args: ProcedureInput, force_start: bool = False) None
class ska_oso_oet.procedure.application.StopProcessCommand(process_uid: int, run_abort: bool)[source]

StopProcessCommand is the input argument dataclass for the ScriptExecutionService stop command. It holds the reference required to stop a script process, along with the run_abort flag that controls whether the abort script is run once that process has terminated.

__init__(process_uid: int, run_abort: bool) None
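
The three command dataclasses are typically built in sequence: prepare, then start, then stop. The sketch below uses stand-in classes that copy the documented constructor signatures so that it runs without the OET installed; with the real package these names would instead be imported from ska_oso_oet.procedure.application and ska_oso_oet.procedure.domain. The script path, the argument values and the args/kwargs attribute names on the stand-in ProcedureInput are assumptions made for illustration.

```python
from dataclasses import dataclass


class ProcedureInput:
    """Stand-in: holds the positional and keyword arguments for a script function."""

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs


@dataclass
class FileSystemScript:
    script_uri: str


@dataclass
class PrepareProcessCommand:
    script: FileSystemScript
    init_args: ProcedureInput


@dataclass
class StartProcessCommand:
    process_uid: int
    fn_name: str
    run_args: ProcedureInput
    force_start: bool = False


@dataclass
class StopProcessCommand:
    process_uid: int
    run_abort: bool


# 1. Describe the script to load and its initialisation arguments.
prepare_cmd = PrepareProcessCommand(
    script=FileSystemScript("file:///path/to/observe.py"),  # hypothetical path
    init_args=ProcedureInput(subarray_id=1),  # hypothetical argument
)

# 2. Start the prepared procedure. In practice the ID would be taken from
#    the ProcedureSummary returned by prepare().
start_cmd = StartProcessCommand(
    process_uid=1, fn_name="main", run_args=ProcedureInput("sb-001")
)

# 3. Stop it, asking for the abort script to run afterwards.
stop_cmd = StopProcessCommand(process_uid=1, run_abort=True)
```

Each command would then be passed to the matching ScriptExecutionService method: prepare(prepare_cmd), start(start_cmd) and stop(stop_cmd).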

ska_oso_oet.procedure.domain

The ska_oso_oet.procedure.domain module holds domain entities from the script execution domain. Entities in this domain are things like scripts, OS processes, process supervisors, signal handlers, etc.

class ska_oso_oet.procedure.domain.ExecutableScript[source]

Base class for all executable scripts.

Expected specialisations:

  • scripts on filesystem

  • scripts in git repository

  • scripts given as a string

  • scripts stored in the ODA

  • etc.

__init__() None
class ska_oso_oet.procedure.domain.FileSystemScript(script_uri: str)[source]

Represents a script stored on the file system.

__init__(script_uri: str) None
class ska_oso_oet.procedure.domain.GitScript(script_uri: str, git_args: GitArgs, create_env: bool | None = False)[source]

Represents a script in a git repository.

__init__(script_uri: str, git_args: GitArgs, create_env: bool | None = False) None
class ska_oso_oet.procedure.domain.LifecycleMessage(msg_src: str, new_state: ProcedureState)[source]

LifecycleMessage is a message type for script lifecycle events.

__init__(msg_src: str, new_state: ProcedureState)[source]
class ska_oso_oet.procedure.domain.ModuleFactory[source]

Factory class used to return Python Module instances from a variety of storage back-ends.

static get_module(script: ExecutableScript)[source]

Load Python code from storage, returning an executable Python module.

Parameters:

script – Script object describing the script to load

Returns:

Python module
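
For the filesystem case, loading a script as a module can be done with the standard library alone. The following is a minimal sketch of that idea and is not taken from the ModuleFactory source: load_module_from_file_uri and the default module name are hypothetical, and only file:// URIs with an absolute path are handled.

```python
import importlib.util
from types import ModuleType
from urllib.parse import urlparse
from urllib.request import url2pathname


def load_module_from_file_uri(script_uri: str, module_name: str = "user_script") -> ModuleType:
    """Load the Python file named by a file:// URI and return it as a module."""
    parsed = urlparse(script_uri)
    if parsed.scheme != "file":
        raise ValueError(f"unsupported URI scheme: {parsed.scheme!r}")

    # Convert the URI path into a native filesystem path.
    path = url2pathname(parsed.path)

    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {script_uri}")

    module = importlib.util.module_from_spec(spec)
    # Executes the top-level code of the script inside the new module object.
    spec.loader.exec_module(module)
    return module
```

The returned module exposes the script's functions as attributes, so a caller could look one up by name with getattr(module, "main") before invoking it.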

class ska_oso_oet.procedure.domain.ProcedureInput(*args, **kwargs)[source]

ProcedureInput is a non-functional dataclass holding the arguments passed to a script method.

__init__(*args, **kwargs)[source]
class ska_oso_oet.procedure.domain.ProcedureState(value)[source]

Represents the script execution state.

class ska_oso_oet.procedure.domain.ProcessManager(mp_context: BaseContext | None = None, on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]

ProcessManager is the parent for all ScriptWorker processes.

ProcessManager is responsible for launching ScriptWorker processes and communicating API requests such as ‘run main() function’ or ‘stop execution’ to the running scripts. If a script execution process does not respond to the request, the process will be forcibly terminated. ProcessManager delegates to the mptools framework for process management functionality. Familiarity with mptools is useful in understanding ProcessManager functionality.

ProcessManager is also responsible for communicating script events to the rest of the system, such as events issued by the script or related to the script execution lifecycle.

It is recommended that ProcessManager.shutdown() be called before the ProcessManager is garbage collected. Failure to call shutdown could break any multiprocessing state held in the scope of the manager or its child processes. This may or may not be a problem, depending on what is held and whether that state is used elsewhere. In short, be safe and call shutdown().

Note: ProcessManager does not maintain a history of script execution. History is recorded and managed by the ScriptExecutionService.

__init__(mp_context: BaseContext | None = None, on_pubsub: List[Callable[[EventMessage], None]] | None = None)[source]

Create a new ProcessManager.

Functions passed in the on_pubsub argument will be called by the ProcessManager every time the ProcessManager’s message loop receives a PUBSUB EventMessage. Callbacks should not perform significant processing on the same thread, as this would block the ProcessManager event loop.

Parameters:
  • mp_context – multiprocessing context used to create multiprocessing primitives

  • on_pubsub – functions to call when a PUBSUB message is received

create(script: ExecutableScript, *, init_args: ProcedureInput) int[source]

Create a new Procedure that will, when executed, run the target Python script.

Objects that can only be shared through inheritance, such as multiprocessing objects, can be shared by providing them as init_args here. These arguments will be provided to the init function in the user script, where present.

Parameters:
  • script – script to run, e.g. a FileSystemScript with URI ‘file://myscript.py’

  • init_args – script initialisation arguments

Returns:

run(process_id: int, *, call: str, run_args: ProcedureInput, force_start: bool = False) None[source]

Run a prepared Procedure.

This starts execution of the script prepared by a previous create() call.

Parameters:
  • process_id – ID of Procedure to execute

  • call – name of function to call

  • run_args – late-binding arguments to provide to the script

  • force_start – Add run command to queue even if the script is not yet ready to run. Does not add command to queue if ProcedureState is FAILED, STOPPED, COMPLETE or UNKNOWN
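
The force_start rule can be read as a small decision function. This sketch is one interpretation of the parameter description, not the ProcessManager logic: State is a hypothetical stand-in for ProcedureState, and the LOADING and READY members are assumed names for "not yet ready" and "ready". Only FAILED, STOPPED, COMPLETE and UNKNOWN are named in the text above.

```python
from enum import Enum, auto


class State(Enum):
    """Hypothetical stand-in for ProcedureState."""

    LOADING = auto()  # assumed name for a not-yet-ready state
    READY = auto()  # assumed name for the ready state
    COMPLETE = auto()
    STOPPED = auto()
    FAILED = auto()
    UNKNOWN = auto()


# States in which a run command is never queued, per the description above.
NEVER_QUEUE = {State.FAILED, State.STOPPED, State.COMPLETE, State.UNKNOWN}


def should_queue_run(state: State, force_start: bool = False) -> bool:
    """Decide whether a run command would be added to the queue."""
    if state in NEVER_QUEUE:
        return False
    if state is State.READY:
        return True
    # Not yet ready: queue only when the caller explicitly forces it.
    return force_start
```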

Returns:

stop(process_id: int) None[source]

Stop a running Procedure.

This stops execution of a currently running script.

Parameters:

process_id – ID of Procedure to stop

Returns:

class ska_oso_oet.procedure.domain.ScriptWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, scan_counter: Value | None = None, environment: Environment | None = None, **kwargs)[source]

ScriptWorker loads user code in a child process, running functions of that user code on request.

ScriptWorker acts when a message is received on its work queue. It responds to four types of messages:

  1. LOAD - to load the specified code in this process

  2. ENV - to install the dependencies for the specified script in this process

  3. RUN - to run the named function in this process

  4. PUBSUB - external pubsub messages that should be published locally
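
Handling of the four message types listed above amounts to a dispatch on message type. The sketch below shows that pattern with a lookup table. It is illustrative only: the handlers just record what they were asked to do, and the names HANDLERS, handle and log are hypothetical rather than part of ScriptWorker.

```python
from typing import Any, Callable, Dict, List

log: List[str] = []


def _make_handler(action: str) -> Callable[[Any], None]:
    """Build a placeholder handler that records the action it would perform."""

    def handler(payload: Any) -> None:
        log.append(f"{action}: {payload}")

    return handler


# One entry per message type described above.
HANDLERS: Dict[str, Callable[[Any], None]] = {
    "LOAD": _make_handler("load code"),
    "ENV": _make_handler("install dependencies"),
    "RUN": _make_handler("run function"),
    "PUBSUB": _make_handler("publish locally"),
}


def handle(msg_type: str, payload: Any) -> None:
    """Route a work-queue message to the handler for its type."""
    try:
        handler = HANDLERS[msg_type]
    except KeyError:
        raise ValueError(f"unexpected message type: {msg_type}") from None
    handler(payload)
```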

ScriptWorker converts external inter-process mptools pub/sub messages to intra-process pypubsub pub/sub messages. That is, EventMessages received on the local work queue are rebroadcast locally as pypubsub messages. Likewise, the ScriptWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other interested ScriptWorkers.

__init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, scan_counter: Value | None = None, environment: Environment | None = None, **kwargs)[source]

Create a new ProcWorker.

Parameters:
  • name – name of this worker

  • startup_event – event to set on startup completion

  • shutdown_event – event to monitor for shutdown

  • event_q – queue for messages to/from MainWorker

  • args

main_loop() None[source]

main_loop delivers each event received on the work queue to the main_func template method, while checking for shutdown notifications.

Event delivery will cease when the shutdown event is set or a special sentinel message is sent.

publish_lifecycle(new_state: ProcedureState)[source]

Broadcast a lifecycle status change event.

Parameters:

new_state – new lifecycle state

republish(topic: pubsub.pub.Topic = pubsub.pub.AUTO_TOPIC, **kwargs) None[source]

Republish a local pypubsub event over the inter-process mptools event bus.

Parameters:
  • topic – message topic, set automatically by pypubsub

  • kwargs – any metadata associated with pypubsub message

Returns:

static term_handler(signal_object, exception_class, signal_num: int, current_stack_frame) None

Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.

We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.

Parameters:
  • signal_object – SignalObject to modify to reflect signal-handling state

  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num – POSIX signal ID

  • current_stack_frame – current stack frame

ska_oso_oet.procedure.domain.script_signal_handler(signal_object, exception_class, signal_num: int, current_stack_frame) None[source]

Custom signal handling function that simply raises an exception. Assuming the running Python script does not catch this exception, it will interrupt script execution and result in termination of that script.

We don’t want all sibling script processes to terminate, hence no setting of shutdown_event is done in this handler.

Parameters:
  • signal_object – SignalObject to modify to reflect signal-handling state

  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num – POSIX signal ID

  • current_stack_frame – current stack frame
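
The four-argument signature suggests that the first two arguments are bound in advance, leaving the (signal_num, current_stack_frame) pair that Python's signal module passes to a handler. The sketch below shows that pattern with functools.partial. It is an assumption-laden illustration, not the OET or mptools code: ScriptTerminated, SignalState, raise_on_signal and install are hypothetical names.

```python
import functools
import signal


class ScriptTerminated(Exception):
    """Hypothetical exception type raised to interrupt a running script."""


class SignalState:
    """Hypothetical stand-in for the signal_object argument."""

    def __init__(self) -> None:
        self.terminate_called = 0


def raise_on_signal(signal_object, exception_class, signal_num, current_stack_frame):
    """Record the signal, then raise so that the interrupted code unwinds."""
    signal_object.terminate_called += 1
    raise exception_class(f"received signal {signal_num}")


def install(signal_num: int, state: SignalState):
    """Bind the first two arguments and register the result as the handler.

    Must be called from the main thread. Returns the previous handler so the
    caller can restore it later.
    """
    handler = functools.partial(raise_on_signal, state, ScriptTerminated)
    return signal.signal(signal_num, handler)
```

Because the handler only raises in the process that received the signal and never sets a shared shutdown event, sibling processes are left running.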

ska_oso_oet.procedure.environment

class ska_oso_oet.procedure.environment.Environment(env_id: str, creating: Event, created: Event, location: str, site_packages: str)[source]
__init__(env_id: str, creating: Event, created: Event, location: str, site_packages: str) None

ska_oso_oet.procedure.gitmanager

Static helper functions for cloning and working with a Git repository

class ska_oso_oet.procedure.gitmanager.GitArgs(git_repo: str | None = 'https://gitlab.com/ska-telescope/oso/ska-oso-scripting.git', git_branch: str | None = None, git_commit: str | None = None)[source]

GitArgs captures information required to identify scripts located in git repositories.

__init__(git_repo: str | None = 'https://gitlab.com/ska-telescope/oso/ska-oso-scripting.git', git_branch: str | None = None, git_commit: str | None = None) None

ska_oso_oet.procedure.ui

The ska_oso_oet.procedure.ui package contains code that belongs to the OET procedure UI layer. This consists of the Procedure REST resources.

ska_oso_oet.procedure.ui.create_procedure()[source]

Create a new Procedure.

This method requests creation of a new Procedure as specified in the JSON payload POSTed to this function.

Returns:

JSON summary of created Procedure

ska_oso_oet.procedure.ui.get_procedure(procedure_id: int)[source]

Get a Procedure.

This returns the Procedure JSON representation of the requested Procedure.

Parameters:

procedure_id – ID of the Procedure to return

Returns:

Procedure JSON

ska_oso_oet.procedure.ui.get_procedures()[source]

List all Procedures.

This returns a list of Procedure JSON representations for all Procedures held by the service.

Returns:

list of Procedure JSON representations

ska_oso_oet.procedure.ui.make_public_procedure_summary(procedure: ProcedureSummary)[source]

Convert a ProcedureSummary into JSON ready for client consumption.

The main use of this function is to replace the internal Procedure ID with the resource URI, e.g., 1 -> http://localhost:5000/ska-oso-oet/oet/api/v1/procedures/1

Parameters:

procedure – Procedure to convert

Returns:

safe JSON representation
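
The ID-to-URI substitution described above can be sketched with plain dictionaries. This is a simplified illustration, not the function's implementation: to_public, the "id" and "uri" keys and the API_ROOT constant are hypothetical, with the root taken from the example URI in the description.

```python
import copy
from typing import Any, Dict

# Root taken from the example URI above; a real deployment may differ.
API_ROOT = "http://localhost:5000/ska-oso-oet/oet/api/v1/procedures"


def to_public(summary: Dict[str, Any], api_root: str = API_ROOT) -> Dict[str, Any]:
    """Return a copy of the summary with the internal ID replaced by a URI."""
    public = copy.deepcopy(summary)  # leave the caller's data untouched
    internal_id = public.pop("id")
    public["uri"] = f"{api_root}/{internal_id}"
    return public


internal = {"id": 1, "state": "READY"}  # illustrative fields only
public_view = to_public(internal)
```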

ska_oso_oet.procedure.ui.update_procedure(procedure_id: int)[source]

Update a Procedure resource using the desired Procedure state described in the PUT JSON payload.

Parameters:

procedure_id – ID of Procedure to modify

Returns:

ProcedureSummary reflecting the final state of the Procedure