"""
The ska_oso_oet.procedure.domain module holds domain entities from the script
execution domain. Entities in this domain are things like scripts,
OS processes, process supervisors, signal handlers, etc.
"""
import abc
import enum
import errno
import functools
import importlib.machinery
import inspect
import itertools
import logging
import multiprocessing
import os
import signal
import subprocess
import sys
import sysconfig
import threading
import types
from dataclasses import dataclass
from typing import Any, Callable, Literal, Optional, Type
from pubsub import pub
from pubsub.core.topicargspec import SenderUnknownMsgDataError
from pydantic import BaseModel, Field, model_serializer, model_validator
from pydantic_core.core_schema import SerializerFunctionWrapHandler
from ska_oso_oet import mptools
from ska_oso_oet.mptools import EventMessage
from ska_oso_oet.procedure.environment import PYPI_EXTRA_URLS, Environment
from ska_oso_oet.procedure.gitmanager import GitArgs, GitManager
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Maximum time, in seconds, allowed for environment creation. This is how
# long a ScriptWorker waits for another worker to finish installing a shared
# environment. Experience shows that the default timeout of 600 secs
# (10 minutes) is more than sufficient for any standard Python project.
# Installing a project that requires compilation, such as a project that
# depends on a new version of Tango, could in principle exceed this timeout.
# However, as the Docker image does not include compiler tools, compilation
# is not possible anyway.
ENV_CREATION_TIMEOUT_SECS = 600.0
# Timeout, in seconds, per SIGTERM retry when terminating a script process.
# Read from the PROC_TERMINATE_TIMEOUT_SECS environment variable, defaulting
# to 0.1 secs. On slow CI runners, coverage atexit cleanup keeps the process
# alive longer than the default 0.1s, causing terminate() to report failure
# (state=UNKNOWN), so the value can be raised there.
PROC_TERMINATE_TIMEOUT_SECS = float(os.getenv("PROC_TERMINATE_TIMEOUT_SECS", "0.1"))
# The SIGTERM handler that was installed at the moment this module was
# imported.
DEFAULT_SIGTERM_HANDLER = signal.getsignal(signal.SIGTERM)
[docs]
def script_signal_handler(
    signal_object,
    exception_class,
    signal_num: int,  # pylint: disable=unused-argument
    current_stack_frame,
) -> None:
    """
    Signal handler that interrupts the running script by raising an exception.

    An instance of ``exception_class`` is raised unconditionally. Unless the
    running Python script catches that exception, script execution is
    interrupted and the script terminates.

    The handler deliberately does not set shutdown_event: only the process
    that received the signal should stop, not all sibling script processes.

    :param signal_object: not used by this handler; presumably accepted so
        that the signature matches the other mptools signal handlers
    :param exception_class: exception type to instantiate (with no
        arguments) and raise
    :param signal_num: POSIX signal ID (unused)
    :param current_stack_frame: current stack frame (unused)
    :raises exception_class: always
    """
    interrupt = exception_class()
    raise interrupt
[docs]
class ProcedureState(str, enum.Enum):
    """
    Represents the script execution state.

    Members are also ``str`` instances whose value equals the member name.
    """

    # reported when terminate() fails; treated as a terminal state by
    # ProcessManager
    UNKNOWN = "UNKNOWN"
    # published by ScriptWorker after startup, after environment preparation
    # and after script loading
    IDLE = "IDLE"
    # initial state, registered by ProcessManager.create and announced by
    # ScriptWorker.__init__
    CREATING = "CREATING"
    # published while ScriptWorker handles an ENV message
    PREP_ENV = "PREP_ENV"
    # published while ScriptWorker handles a LOAD message
    LOADING = "LOADING"
    # published when the script's init function is started
    INITIALISING = "INITIALISING"
    # published when a user function completes, or when the script has no
    # init function
    READY = "READY"
    # published when any user function other than init is started
    RUNNING = "RUNNING"
    # published when ScriptWorker.main_loop ends without being terminated
    COMPLETE = "COMPLETE"
    # terminal state; not set anywhere in this module (presumably set by the
    # component that terminates the process - TODO confirm)
    STOPPED = "STOPPED"
    # terminal state; set by ProcessManager when process creation fails
    FAILED = "FAILED"
[docs]
class LifecycleMessage(EventMessage):
    """
    EventMessage specialisation for script lifecycle events.

    The message type is fixed to "LIFECYCLE" and the message payload is the
    new ProcedureState.
    """

    def __init__(self, msg_src: str, new_state: ProcedureState):
        """
        Create a new LifecycleMessage.

        :param msg_src: identifier of the component whose state changed
        :param new_state: the state that the source has just entered
        """
        super().__init__(msg_src=msg_src, msg_type="LIFECYCLE", msg=new_state)
[docs]
class ExecutableScript(BaseModel, abc.ABC):
    """
    Common base for every kind of script that can be executed.

    Concrete subclasses describe where the script lives, e.g.

    - scripts on filesystem
    - scripts in git repository
    - scripts given as a string
    - scripts stored in the ODA
    - etc.

    Subclasses are expected to declare a ``script_uri`` field and to provide
    ``get_prefix()`` and ``get_type()``; both the validator and the
    serialiser below rely on them.
    """

    @model_validator(mode="after")
    def validate_prefix(self):
        """
        Reject a script_uri that lacks the URI prefix of this script type.

        :return: the validated model instance
        :raises ValueError: if script_uri does not start with get_prefix()
        """
        uri = self.script_uri
        if uri.startswith(self.get_prefix()):
            return self
        raise ValueError(
            f"Incorrect prefix for {self.__class__.__name__}: {self.script_uri}"
        )

    @model_serializer(mode="wrap")
    def _serialize_executable_script(
        self, default_serializer: SerializerFunctionWrapHandler
    ) -> dict[str, Any]:
        """
        Serialise with the default serialiser, then force the
        ``script_type`` entry to the value returned by get_type().

        :param default_serializer: pydantic's standard serialiser for this
            model
        :return: serialised model including the script_type discriminator
        """
        serialised = default_serializer(self)
        serialised["script_type"] = self.get_type()
        return serialised
[docs]
class FileSystemScript(ExecutableScript):
    """
    Represents a script stored on the file system.

    The script_uri must start with the ``file://`` prefix; this is enforced
    by the prefix validator inherited from ExecutableScript.
    """

    # URI of the script, e.g. a file:// URI followed by the script path
    script_uri: str
    # discriminator identifying this script type in serialised form
    script_type: Literal["filesystem"] = "filesystem"

    @staticmethod
    def get_type() -> str:
        """Return the script type discriminator for filesystem scripts."""
        return "filesystem"

    @staticmethod
    def get_prefix() -> str:
        """Return the URI prefix that every filesystem script URI must have."""
        return "file://"
[docs]
class GitScript(ExecutableScript):
    """
    Represents a script in a git repository.

    The script_uri must start with the ``git://`` prefix; this is enforced
    by the prefix validator inherited from ExecutableScript.
    """

    # URI of the script; the part after the git:// prefix is treated as the
    # script path relative to the root of the cloned repository
    script_uri: str
    # arguments identifying what to clone, passed to GitManager.clone_repo
    # NOTE(review): the default GitArgs instance is built once, at class
    # definition time - confirm pydantic copies it per model instance, or
    # consider default_factory
    git_args: GitArgs = Field(default=GitArgs())
    # presumably requests a dedicated virtual environment for the script;
    # not read anywhere in this module - TODO confirm against the caller
    create_env: bool = False
    # discriminator identifying this script type in serialised form
    script_type: Literal["git"] = "git"

    @staticmethod
    def get_type() -> str:
        """Return the script type discriminator for git scripts."""
        return "git"

    @staticmethod
    def get_prefix() -> str:
        """Return the URI prefix that every git script URI must have."""
        return "git://"
[docs]
@dataclass
class RunThreadState:
    """
    Represents the state of a thread running a user function during its
    execution.

    ScriptWorker creates one instance per RUN request and discards it when
    the user function completes or fails.

    Attributes:
    thread: The thread whose state is being represented.
    fn_name: The name of the function associated with the thread.
    exception: An exception that occurred during thread execution, if any.
    """

    # thread on which the user function is executed
    thread: threading.Thread
    # name of the user function, e.g. "init" or "main"
    fn_name: str
    # set by the user function thread when the function raises; re-raised
    # on the main thread by the USER_FN_FAILED handler
    exception: BaseException | None = None
[docs]
class ScriptWorker(mptools.ProcWorker):
"""
ScriptWorker loads user code in a child process, running functions of that
user code on request.
ScriptWorker acts when a message is received on its work queue. It responds to four
types of external message:
1. LOAD - to load the specified code in this process
2. ENV - to install the dependencies for the specified script in this process
3. RUN - to run the named function in this process
4. PUBSUB - external pubsub messages that should be published locally
In addition, the event loop also handles two internal messages originating from
the user function execution thread:
5. USER_FN_COMPLETE - indicates successful completion of the requested user function
6. USER_FN_FAILED - indicates failure during user function execution
ScriptWorker converts external inter-process mptool pub/sub messages to
intra-process pypubsub pub/sub messages. That is, EventMessages received on the
local work queue are rebroadcast locally as pypubsub messages. Likewise, the
ScriptWorker listens to all pypubsub messages broadcast locally,
converts them to pub/sub EventQueue messages, and puts them on the 'main'
queue for transmission to other interested ScriptWorkers.
"""
# install our custom signal handler that raises an exception on SIGTERM
term_handler = staticmethod(script_signal_handler) # noqa: E731
[docs]
def __init__(
    self,
    name: str,
    startup_event: multiprocessing.Event,
    shutdown_event: multiprocessing.Event,
    event_q: mptools.MPQueue,
    work_q: mptools.MPQueue,
    *args,
    environment: Environment | None = None,
    **kwargs,
):
    """
    Create a new ScriptWorker.

    :param name: name of this worker, used as the source of its messages
    :param startup_event: event passed through to the parent class
    :param shutdown_event: event passed through to the parent class
    :param event_q: queue for outbound messages to the main process
    :param work_q: queue on which this worker receives its work items
    :param args: extra positional arguments passed to the parent class
    :param environment: optional Environment to install script
        dependencies into when an ENV message is received
    :param kwargs: extra keyword arguments passed to the parent class
    """
    # CREATING should be announced at the earliest possible moment. The
    # message is built by hand and put directly on the event queue because
    # the intraprocess<->interprocess republish callback is only registered
    # at the end of construction, so pypubsub cannot be used yet.
    #
    # This message is now ignored by the message loop as the managers set
    # the initial CREATING state themselves to avoid a race
    creating_announcement = EventMessage(
        msg_src=name,
        msg_type="PUBSUB",
        msg={
            "topic": "procedure.lifecycle.statechange",
            "kwargs": {"new_state": ProcedureState.CREATING},
        },
    )
    event_q.put(creating_announcement)

    self.name = name
    self._environment = environment
    self.work_q = work_q

    # populated when a LOAD message is handled
    self.user_module = None

    # populated when a RUN message is handled, reset to None once the user
    # function has completed or failed
    self._run_state: RunThreadState | None = None

    super().__init__(name, startup_event, shutdown_event, event_q, *args, **kwargs)

    # From now on, every pypubsub message broadcast in this process is also
    # offered to republish for distribution to remote processes
    pub.subscribe(self.republish, pub.ALL_TOPICS)
def init_args(self, args, kwargs):
    """
    Capture the script initialisation arguments.

    The arguments are wrapped in a ProcedureInput and stored as
    ``init_input``, which is what the RUN handler passes to the user
    script's ``init`` function.

    :param args: positional arguments for the script's init function
    :param kwargs: keyword arguments for the script's init function
    """
    self.init_input = ProcedureInput(*args, **kwargs)
def startup(self) -> None:
    """
    Run the parent class startup, then announce that this worker is IDLE.
    """
    super().startup()
    # mark state as IDLE to signify that this child process started up
    # successfully
    self.publish_lifecycle(ProcedureState.IDLE)
def shutdown(self) -> None:
    """
    Run the parent class shutdown, then stop republishing local pypubsub
    messages.
    """
    super().shutdown()
    # Technically, unsubscribing is unnecessary as pypubsub holds weak
    # references to listeners and automatically unsubscribes listeners
    # that have been deleted
    pub.unsubscribe(self.republish, pub.ALL_TOPICS)
[docs]
def publish_lifecycle(self, new_state: ProcedureState):
    """
    Broadcast a lifecycle status change event.

    The event is put straight onto the inter-process event queue as a
    PUBSUB message for the ``procedure.lifecycle.statechange`` topic. It
    could instead be sent on pypubsub and left for the republish callback
    to forward, but as there are no local subscribers that extra hop is
    skipped.

    :param new_state: new lifecycle state
    """
    statechange = EventMessage(
        msg_src=self.name,
        msg_type="PUBSUB",
        msg={
            "topic": "procedure.lifecycle.statechange",
            "kwargs": {"new_state": new_state},
        },
    )
    self.event_q.put(statechange)
[docs]
def republish(self, topic: pub.Topic = pub.AUTO_TOPIC, **kwargs) -> None:
    """
    Republish a local pypubsub event over the inter-process mptools event
    bus.

    Only events that originate in this process are forwarded. An event
    carrying a ``msg_src`` other than this worker's name was injected from
    outside and is dropped, which prevents an infinite republish loop.

    :param topic: message topic, set automatically by pypubsub
    :param kwargs: any metadata associated with pypubsub message
    :return:
    """
    # An event without a message source is a brand new event published on
    # pypubsub in this process, so it counts as ours
    origin = kwargs.pop("msg_src", self.name)
    if origin != self.name:
        # external event: do not send it back out
        return

    # Convert pypubsub event to the equivalent mptools EventMessage
    outbound = EventMessage(
        self.name, "PUBSUB", {"topic": topic.name, "kwargs": kwargs}
    )
    self.log(logging.DEBUG, "Republishing local pypubsub event: %s", outbound)
    # note that this is a blocking put. If the queue is full, this call
    # will block until the queue has room to accept the message
    self.event_q.put(outbound)
def _on_pubsub(self, evt: EventMessage) -> None:
    """
    Rebroadcast an external pub/sub EventMessage locally on pypubsub.

    Events that originated from this worker are discarded to avoid an
    infinite loop.

    :param evt: PUBSUB message whose payload holds the pypubsub topic name
        and the keyword arguments to send
    """
    if evt.msg_src == self.name:
        self.log(logging.DEBUG, "Discarding internal event: %s", evt)
        return

    self.log(logging.DEBUG, "Republishing external event: %s", evt)
    body = evt.msg
    topic_name = body["topic"]
    try:
        pub.sendMessage(topic_name, msg_src=evt.msg_src, **body["kwargs"])
    except SenderUnknownMsgDataError:
        # The topic does not accept msg_src, so deliver without it. Loop
        # prevention still works: the local republish callback re-emits
        # the event with msg_src=self.name, and that copy is discarded
        # above when it comes back round.
        pub.sendMessage(topic_name, **body["kwargs"])
def _on_env(self, evt: EventMessage) -> None:
    """
    Configure the Python environment for script execution.

    The venv is created in the main process, but pip install happens here
    in the ScriptWorker process. The creating/created Events coordinate
    multiple ScriptWorkers requesting the same environment: the first
    worker to find neither Event set performs the installation, while any
    other worker waits for ``created`` to be set.

    Once the environment is available, cached non-essential modules are
    dropped and sys.path is replaced so that subsequent imports resolve
    against the virtual environment.

    :param evt: ENV message whose payload is the script to install
        dependencies for
    :raises RuntimeError: if no environment was supplied, if the script is
        not a GitScript, or if pip installation fails
    :raises TimeoutError: if another worker does not finish creating the
        environment within ENV_CREATION_TIMEOUT_SECS
    """
    self.publish_lifecycle(ProcedureState.PREP_ENV)
    if self._environment is None:
        raise RuntimeError("Install failed, environment has not been defined")

    if not self._environment.created.is_set():
        if not self._environment.creating.is_set():
            self._environment.creating.set()
            script = evt.msg
            if not isinstance(script, GitScript):
                raise RuntimeError(
                    "Cannot create virtual environment for script type"
                    f" {script.__class__.__name__}"
                )

            clone_dir = GitManager.clone_repo(script.git_args)
            pip_path = os.path.join(self._environment.location, "bin", "pip")
            # flatten to ['--extra-index-url', url1, '--extra-index-url', ...]
            pypi_extras = [
                item for s in PYPI_EXTRA_URLS for item in ["--extra-index-url", s]
            ]
            try:
                # upgrade pip, as the version packaged as the default has
                # bugs w.r.t. installing dependencies
                subprocess.check_output(
                    [
                        pip_path,
                        "install",
                        "--index-url=https://pypi.org/simple",
                        "--upgrade",
                        "pip",
                    ]
                )
                # Previously, we used poetry to export requirements and
                # then use pip to install from that requirements file. Full
                # PEP 517/PEP 621 support for reading metadata and
                # installing dependencies during 'pip install .' was
                # introduced with pip >= 21.3, making this unnecessary.
                subprocess.check_output(
                    [pip_path, "install", ".", *pypi_extras],
                    cwd=clone_dir,
                )
            except subprocess.CalledProcessError as e:
                LOGGER.exception("pip install failed: %s", e)
                raise RuntimeError(
                    "Something went wrong during script environment"
                    f" installation: {e}"
                ) from None
            # TODO: if the install fails, 'created' is never set; workers
            # waiting on it only find out when their wait times out.
            self._environment.created.set()
        else:
            # Environment is being created by another script. Wait for the
            # other process to finish environment installation before
            # proceeding. wait() returns False if the timeout elapsed
            # without the event being set, which likely means that the
            # environment installation has failed, so we must not carry on
            # as if the environment were usable.
            created_in_time = self._environment.created.wait(
                timeout=ENV_CREATION_TIMEOUT_SECS
            )
            if not created_in_time:
                raise TimeoutError(
                    f"Timed out after {ENV_CREATION_TIMEOUT_SECS} secs waiting"
                    " for script environment to be created"
                )

    # Clear cached modules that might have been imported from parent environment
    self._cleanup_modules()
    # replace inherited sys.path
    sys.path = self._compute_venv_sys_path(self._environment.location)
    self.publish_lifecycle(ProcedureState.IDLE)
def _compute_venv_sys_path(self, venv_path: str) -> list[str]:
"""
Compute what sys.path should look like if the given virtual environment were
activated.
:param venv_path: path to the virtual environment
:return: list of paths in predicted sys.path
"""
venv_path = os.path.abspath(venv_path)
pyver = f"python{sys.version_info.major}.{sys.version_info.minor}"
# Reconstruct sys.path
venv_sys_path = []
# Python adds '' first
venv_sys_path.append("")
# Then stdlib (from base)
stdlib_path = sysconfig.get_path("stdlib")
venv_sys_path.append(stdlib_path)
# Then site-packages (from venv)
site_packages_candidates = [
os.path.join(venv_path, "lib", pyver, "site-packages"),
os.path.join(venv_path, "lib64", pyver, "site-packages"),
]
site_packages = next(
(p for p in site_packages_candidates if os.path.isdir(p)), None
)
if site_packages:
venv_sys_path.append(site_packages)
return venv_sys_path
def _cleanup_modules(self) -> None:
"""
Cleans up cached Python modules except for essential system modules.
This prevents conflicts between parent and child environment imports.
When using `multiprocessing.spawn`, Python imports the parent module
in the child process first to get access to the objects that need to
be pickled/unpickled. During this import, Python caches module
imports in sys.modules. So, while ScriptWorker sets up a new venv and
adds it to sys.path, by this time some modules may already be imported
and cached from the parent's original environment.
"""
ESSENTIAL_MODULES = sys.stdlib_module_names | {
# avoid '_pickle.PicklingError: Can't pickle X: it's not the same object as ska_oso_oet.XXX' errors
"ska_oso_oet",
"pubsub", # This ensures the same pubsub instance is used by the OET and scripts
"tango", # work around issue reimporting tango
}
# Get modules to remove (excluding essential ones)
modules_to_remove = [
module_name
for module_name in sys.modules.keys()
if not any(
(
# e.g., don't remove 'ska_oso_oet' or 'ska_oso_oet.XXX', but DO
# remove 'ska_oso_oet_blahblahblah'
module_name == essential or module_name.startswith(f"{essential}.")
for essential in ESSENTIAL_MODULES
)
)
]
# Remove non-essential modules
for module_name in modules_to_remove:
sys.modules.pop(module_name)
def _on_load(self, evt: EventMessage) -> None:
    """
    Load the user script described by the LOAD message into this process.

    The state is LOADING while the module is obtained from ModuleFactory
    and returns to IDLE once it has been stored as ``user_module``.

    :param evt: LOAD message whose payload is the ExecutableScript to load
    :raises FileNotFoundError: if the script could not be found; the error
        carries the script URI as its filename
    """
    self.publish_lifecycle(ProcedureState.LOADING)
    script: ExecutableScript = evt.msg
    self.log(logging.DEBUG, "Loading user script %s", script)
    try:
        loaded = ModuleFactory.get_module(script)
    except FileNotFoundError:
        # re-raise with a uniform ENOENT error that names the script URI
        missing = FileNotFoundError(
            errno.ENOENT, os.strerror(errno.ENOENT), script.script_uri
        )
        raise missing from None
    self.user_module = loaded
    self.publish_lifecycle(ProcedureState.IDLE)
def _run_user_function(
    self, fn: Callable[..., Any], fn_args: ProcedureInput
) -> None:
    """
    Call a user function and report how it ended via the work queue.

    This function is intended to run on a separate thread. Rather than
    touching worker state directly, the outcome is put on the work queue
    as a USER_FN_COMPLETE or USER_FN_FAILED message, so that the event
    loop on the main thread handles it like any other message.

    :param fn: The callable user function to execute.
    :param fn_args: Arguments for the user function
    """
    outcome = EventMessage(self.name, "USER_FN_COMPLETE", "User function complete")
    try:
        fn(*fn_args.args, **fn_args.kwargs)
    except BaseException as failure:  # pylint: disable=broad-except
        # The exception is stored on the run state instead of being sent
        # in the EventMessage. The multiprocessing Queue's feeder thread
        # would have to pickle a message containing the exception to write
        # it to the pipe, which fails because tblib.pickling_support is
        # not importable in that thread. The feeder thread silently
        # swallows the failure, so the state would never transition to
        # FAILED.
        self._run_state.exception = failure
        outcome = EventMessage(self.name, "USER_FN_FAILED", "User function failed")
    finally:
        # this is a critical message, so we use a blocking put over
        # safe_put, just in case safe_put times out
        self.work_q.put(outcome)
def _on_user_fn_complete(self, _: EventMessage) -> Type[StopIteration] | None:
    """
    Handle successful completion of a user function.

    Publishes READY and clears the run state so that another function can
    be run.

    :param _: unused EventMessage on USER_FN_COMPLETE topic
    :return: StopIteration if user function was 'main', else None
    """
    self.publish_lifecycle(ProcedureState.READY)
    finished_fn = self._run_state.fn_name
    self._run_state = None

    # This is a historical artefact due for some refactoring. Returning
    # the StopIteration class signals that 'main' has completed, which
    # safely ends the event loop and thus the script process: scripts are
    # intended to be one-shot and cannot be re-run once run. However,
    # reusing scripts would be way more efficient, e.g., the same script
    # process could be run multiple times in sequence for multiple SBs.
    if finished_fn == "main":
        return StopIteration
    return None
def _on_user_fn_failed(self, _: EventMessage) -> None:
    """
    Handle a user function that ended by raising an exception.

    The exception recorded by the user function thread is re-raised here,
    after the run state has been cleared.

    :param _: unused EventMessage on USER_FN_FAILED topic
    :raises: exception attached to RunThreadState
    """
    # All handlers, this one included, run on the main thread, so no other
    # handler can modify _run_state while it is read and reset here
    finished_run = self._run_state
    self._run_state = None
    raise finished_run.exception
def _on_run(self, evt: EventMessage) -> None:
    """
    Start the requested user function on a dedicated daemon thread.

    ``init`` is a special case: its arguments come from ``init_input``
    rather than from the message, and a script without an ``init``
    function goes straight to READY. For any function, a ``context``
    keyword argument is dropped when the function cannot accept it.

    :param evt: RUN message whose payload is a (function name, arguments)
        pair
    :raises RuntimeError: if a user function is already running
    """
    fn_name, fn_args = evt.msg
    if self._run_state:
        raise RuntimeError(
            f"Cannot execute {fn_name}: {self._run_state.fn_name} is already"
            " running"
        )

    # special case: get init args from instance, check for method.
    # we may want to revisit whether init remains a special case
    is_init = fn_name == "init"
    if is_init:
        if not hasattr(self.user_module, "init"):
            self.publish_lifecycle(ProcedureState.READY)
            return
        fn_args = self.init_input

    fn = getattr(self.user_module, fn_name)

    # For backward compatibility with scripts that don't expect it, strip
    # the 'context' kwarg unless the function names it as a parameter or
    # takes arbitrary keyword arguments
    if "context" in fn_args.kwargs:
        params = inspect.signature(fn).parameters
        accepts_context = "context" in params or any(
            param.kind == inspect.Parameter.VAR_KEYWORD for param in params.values()
        )
        if not accepts_context:
            kwargs_without_context = {
                key: value
                for key, value in fn_args.kwargs.items()
                if key != "context"
            }
            fn_args = fn_args.model_copy(update={"kwargs": kwargs_without_context})

    call_repr = repr(fn_args).replace("<ProcedureInput", fn_name)[:-1]
    self.log(logging.DEBUG, "Calling user function %s", call_repr)

    if is_init:
        next_state = ProcedureState.INITIALISING
    else:
        next_state = ProcedureState.RUNNING
    self.publish_lifecycle(next_state)

    # this MUST be a daemon thread otherwise a non-cooperative user thread
    # will prevent the script process from exiting. Yes, it means the user
    # thread will be terminated abruptly, but that's desired for a 'force
    # stop' operation involving SIGTERMs being sent to the ScriptWorker
    # process.
    worker_thread = threading.Thread(
        target=self._run_user_function,
        args=(fn, fn_args),
        name=self.name,
        daemon=True,
    )
    self._run_state = RunThreadState(
        thread=worker_thread,
        fn_name=fn_name,
        exception=None,
    )
    worker_thread.start()
def _get_message_handler(self, msg_type: str):
"""
Retrieves the appropriate message handler function for the given message type.
The function maps specific message types to their corresponding handler
methods.
:param msg_type: The type of the message to handle. Accepted values include
"PUBSUB", "ENV", "LOAD", "RUN", "USER_FN_COMPLETE", and "USER_FN_FAILED".
:return: The handler function corresponding to the given message type, or
None if no handler is found for the message type.
"""
handlers = {
"PUBSUB": self._on_pubsub,
"ENV": self._on_env,
"LOAD": self._on_load,
"RUN": self._on_run,
"USER_FN_COMPLETE": self._on_user_fn_complete,
"USER_FN_FAILED": self._on_user_fn_failed,
}
return handlers.get(msg_type)
[docs]
def main_loop(self) -> None:
    """
    Deliver each item received on the work queue to the handler for its
    message type, while checking for shutdown notifications.

    Event delivery ceases when the shutdown event is set, when the "END"
    sentinel message is received, or when a handler returns StopIteration.
    COMPLETE is then published, unless the loop was interrupted by a
    TerminateInterrupt, in which case nothing is published here. Any other
    exception raised by a handler propagates to the caller.
    """
    self.log(logging.DEBUG, "Entering ScriptWorker.main_loop")

    # stop processing as soon as the shutdown_event is set. Once set, this
    # while loop terminates, thus ending main_loop and starting shutdown
    # of this ProcWorker.
    try:
        while not self.shutdown_event.is_set():
            # Get next work item. This call returns after the default
            # safe_get timeout unless an item is in the queue.
            item = self.work_q.safe_get()

            # Go back to the top of the while loop if no message was
            # received, thus checking the shutdown event again.
            if not item:
                continue

            # ok - an item was received from queue. Formatting is left to
            # the logger, as for every other log call in this class, so
            # the item is only rendered when DEBUG logging is enabled.
            self.log(
                logging.DEBUG, "ScriptWorker.main_loop received '%s' message", item
            )

            # if item is the sentinel message, break to exit out of
            # main_loop and start shutdown
            if item == "END":
                break

            # otherwise handle the event based on type
            handler = self._get_message_handler(item.msg_type)
            if handler:
                ret = handler(item)
                # handlers return the StopIteration class itself (not an
                # instance) to request that the loop ends
                if ret is StopIteration:
                    break
            else:
                self.log(logging.WARN, "Unexpected message: %s", item)

    except mptools.TerminateInterrupt:
        # raised by the signal handler on Proc.terminate()
        pass

    else:
        self.publish_lifecycle(ProcedureState.COMPLETE)
[docs]
class ProcessManager:
"""
ProcessManager tracks and coordinates ScriptWorker processes.
ProcessManager is responsible for tracking script execution state and
coordinating with main_loop via pypubsub messages. Actual process
creation and management is handled by main_loop in main.py.
ProcessManager uses pypubsub to send management requests (create, run,
stop) to main_loop, which creates and manages the actual ScriptWorker
processes.
Note: ProcessManager does not maintain a history of script execution.
History is recorded and managed by the ScriptExecutionService.
"""
_DELETEABLE_STATES = [
ProcedureState.COMPLETE,
ProcedureState.FAILED,
ProcedureState.STOPPED,
ProcedureState.UNKNOWN,
]
[docs]
def __init__(
    self,
    states: Optional[dict[int, ProcedureState]] = None,
    state_lock: Optional[threading.RLock] = None,
) -> None:
    """
    Create a new ProcessManager.

    :param states: Optional external states dictionary to use instead of
        creating an internal one. Sharing the dictionary with another
        component (e.g., ScriptExecutionService) avoids race conditions.
        When it is given, this manager does NOT subscribe to statechange
        events: the external owner is responsible for updating it.
    :param state_lock: Optional external lock for state updates. Should be
        provided if states is provided to ensure thread-safe updates.
    """
    # counter used to generate process ID for new processes
    self._pid_counter = itertools.count(1)

    # Remember whether the states dict is shared, as that decides whether
    # this manager keeps it up to date itself
    owns_states = states is None
    self._using_external_states = not owns_states

    # maps Proc ID to current state
    if owns_states:
        states = {}
    self.states: dict[int, ProcedureState] = states

    # lock guarding updates to the states dict
    if state_lock is None:
        state_lock = threading.RLock()
    self._state_updating = state_lock

    # Sends pubsub messages with msg_src=None, which signals to
    # EventBusWorker.republish to use the worker's name as the source,
    # ensuring the message is queued correctly
    self._send_message = functools.partial(pub.sendMessage, msg_src=None)

    # Management responses are needed for async failure handling
    pub.subscribe(self._on_created, "procedure.management.created")

    # Lifecycle events keep the states in sync ONLY if we own them. With
    # external states, the owner (e.g., ScriptExecutionService) updates
    # the shared dict itself, which avoids subscription order issues
    if owns_states:
        pub.subscribe(self._on_statechange, "procedure.lifecycle.statechange")
def _on_created(  # pylint: disable=unused-argument
    self, msg_src: str, pid: int, success: bool, error: str | None = None
):
    """
    Handle a procedure.management.created response.

    A successful creation needs no action. A failed one is logged and the
    procedure's state is set to FAILED.

    :param msg_src: message source
    :param pid: PID from the create request
    :param success: whether creation succeeded
    :param error: error message if failed
    """
    if success:
        return

    LOGGER.warning("ScriptWorker #%s creation failed: %s", pid, error)
    with self._state_updating:
        self.states[pid] = ProcedureState.FAILED
def _on_statechange(self, msg_src: str, new_state: ProcedureState):
    """
    Handle procedure.lifecycle.statechange events.

    Records the new state of the procedure. A procedure entering a
    terminal state is removed from the states dict straight away, so
    get_state returns None for it afterwards. Events whose source is not
    an integer PID are ignored.

    :param msg_src: PID of the procedure
    :param new_state: new procedure state
    """
    try:
        pid = int(msg_src)
    except (ValueError, TypeError):
        # not a message from a procedure tracked by this manager
        return

    is_terminal = new_state in self._DELETEABLE_STATES
    with self._state_updating:
        self.states[pid] = new_state
        # Clean up state for terminal states
        if is_terminal:
            self.states.pop(pid, None)
[docs]
def get_state(self, pid: int) -> ProcedureState | None:
    """
    Get the current state of a procedure.

    The states dict is read without taking the state lock. When this
    manager maintains the dict itself, procedures that reached a terminal
    state have already been removed from it and so report None.

    :param pid: Procedure ID
    :return: Current state or None if not found
    """
    return self.states.get(pid)
[docs]
def has_running(self) -> bool:
    """
    Report whether at least one tracked procedure is in the RUNNING state.

    :return: True if any procedure is in RUNNING state
    """
    return ProcedureState.RUNNING in self.states.values()
[docs]
def create(self, script: ExecutableScript, *, init_args: ProcedureInput) -> int:
    """
    Request creation of a new ScriptWorker process.

    This method emits a procedure.management.create pypubsub message,
    which main_loop handles to create the actual process. The method
    returns immediately with state=CREATING.

    :param script: script to execute
    :param init_args: script initialisation arguments
    :return: PID assigned to this procedure
    """
    pid = next(self._pid_counter)
    LOGGER.debug("Requesting ScriptWorker #%s for %s", pid, script)

    # Register initial state. The states dict may be shared with other
    # components, so it is written under the same lock as every other
    # state update in this class. The lock is released before the request
    # is sent so that message handlers never run while it is held.
    with self._state_updating:
        self.states[pid] = ProcedureState.CREATING

    # Emit management request via pypubsub
    # Note: environment creation is handled by main_loop
    self._send_message(
        "procedure.management.create",
        pid=pid,
        script=script,
        init_args=init_args,
    )
    return pid
[docs]
def run(
    self,
    process_id: int,
    *,
    call: str,
    run_args: ProcedureInput,
    force_start: bool = False,
) -> None:
    """
    Request execution of a function in a prepared procedure.

    After the state checks pass, a procedure.management.run pypubsub
    message is emitted, which main_loop handles to send the run command
    to the ScriptWorker.

    :param process_id: ID of Procedure to execute
    :param call: name of function to call
    :param run_args: late-binding arguments to provide to the script
    :param force_start: Add run command even if not READY (ignored for terminal states)
    :raises ValueError: if the procedure is unknown, is in a state in which
        it cannot be run, or if any procedure is currently RUNNING
    """
    if process_id not in self.states:
        raise ValueError(f"PID #{process_id} not found")
    current_state = self.states[process_id]

    # A Procedure in one of these states can never be run, forced or not
    unrunnable = (
        ProcedureState.COMPLETE,
        ProcedureState.FAILED,
        ProcedureState.STOPPED,
        ProcedureState.UNKNOWN,
    )
    if current_state in unrunnable:
        raise ValueError(f"PID #{process_id} unrunnable in state {current_state}")

    if not force_start:
        # intermediate states where the script is not ready to run
        not_ready = (
            ProcedureState.CREATING,
            ProcedureState.LOADING,
            ProcedureState.INITIALISING,
        )
        if current_state in not_ready:
            raise ValueError(
                f"PID #{process_id} not ready to run in state {current_state}"
            )
        if current_state == ProcedureState.RUNNING:
            raise ValueError(f"PID #{process_id} is already RUNNING")

    # Ensure no other script is running. This check applies even when
    # force_start is set.
    running_pid = next(
        (pid for pid, state in self.states.items() if state == ProcedureState.RUNNING),
        None,
    )
    if running_pid is not None:
        raise ValueError(
            f"Cannot start PID {process_id}: PID #{running_pid} is RUNNING"
        )

    LOGGER.debug("Requesting 'run %s' for PID %d", call, process_id)

    # Emit management request via pypubsub
    self._send_message(
        "procedure.management.run",
        pid=process_id,
        fn_name=call,
        run_args=run_args,
    )
[docs]
def stop(self, process_id: int) -> None:
    """
    Request termination of a running procedure.

    If the procedure is in a stoppable state, a procedure.management.stop
    pypubsub message is emitted, which main_loop handles to terminate the
    ScriptWorker.

    :param process_id: ID of Procedure to stop
    :raises ValueError: if the procedure is unknown or is not in a
        stoppable state
    """
    if process_id not in self.states:
        raise ValueError(f"Process {process_id} not found")
    state = self.states[process_id]

    can_stop = state in (
        ProcedureState.IDLE,
        ProcedureState.INITIALISING,
        ProcedureState.READY,
        ProcedureState.RUNNING,
        ProcedureState.LOADING,
    )
    if not can_stop:
        raise ValueError(f"Cannot stop PID {process_id} with state {state.name}")

    LOGGER.debug("Requesting stop for PID %d", process_id)
    self._send_message("procedure.management.stop", pid=process_id)
[docs]
class ModuleFactory:
    """
    Factory class used to return Python Module instances from a variety of
    storage back-ends.
    """

    @staticmethod
    def get_module(script: ExecutableScript):
        """
        Load Python code from storage, returning an executable Python module.

        :param script: Script object describing the script to load
        :return: Python module
        :raises ValueError: if the script type is not supported
        """
        if isinstance(script, GitScript):
            return ModuleFactory._load_module_from_git(script)
        if isinstance(script, FileSystemScript):
            return ModuleFactory._load_module_from_file(script.script_uri)
        raise ValueError(f"Script type not handled: {script.__class__.__name__}")

    @staticmethod
    def _load_module_from_file(script_uri: str) -> types.ModuleType:
        """
        Load Python module from file storage. This method handles file://
        URIs.

        :param script_uri: URI of script to load.
        :return: Python module
        """
        # strip the scheme using the prefix declared by the script type,
        # rather than a hard-coded character count
        path = script_uri.removeprefix(FileSystemScript.get_prefix())
        return ModuleFactory._exec_source_file(path)

    @staticmethod
    def _load_module_from_git(script: GitScript) -> types.ModuleType:
        """
        Load Python module from a git repository. Clones the repository if
        repo has not yet been cloned. The repository will not have been
        cloned if default environment is being used. This method handles
        git:// URIs.

        :param script: GitScript object with information on script location
        :return: Python module
        """
        clone_path = GitManager.clone_repo(script.git_args)
        # remove prefix and any leading slashes, leaving the script path
        # relative to the root of the clone
        relative_script_path = script.script_uri.removeprefix(
            script.get_prefix()
        ).lstrip("/")
        script_path = os.path.join(clone_path, relative_script_path)
        return ModuleFactory._exec_source_file(script_path)

    @staticmethod
    def _exec_source_file(path: str) -> types.ModuleType:
        """
        Execute the Python source file at the given path in a new module
        named 'user_module'.

        The module is not registered in sys.modules.

        :param path: file system path of the Python source file
        :return: Python module holding the executed code
        """
        loader = importlib.machinery.SourceFileLoader("user_module", path)
        user_module = types.ModuleType(loader.name)
        loader.exec_module(user_module)
        return user_module