Source code for ska_oso_oet.procedure.domain

"""
The ska_oso_oet.procedure.domain module holds domain entities from the script
execution domain. Entities in this domain are things like scripts,
OS processes, process supervisors, signal handlers, etc.
"""

import abc
import enum
import errno
import functools
import importlib.machinery
import inspect
import itertools
import logging
import multiprocessing
import os
import signal
import subprocess
import sys
import sysconfig
import threading
import types
from dataclasses import dataclass
from typing import Any, Callable, Literal, Optional, Type

from pubsub import pub
from pubsub.core.topicargspec import SenderUnknownMsgDataError
from pydantic import BaseModel, Field, model_serializer, model_validator
from pydantic_core.core_schema import SerializerFunctionWrapHandler

from ska_oso_oet import mptools
from ska_oso_oet.mptools import EventMessage
from ska_oso_oet.procedure.environment import PYPI_EXTRA_URLS, Environment
from ska_oso_oet.procedure.gitmanager import GitArgs, GitManager

LOGGER = logging.getLogger(__name__)

# Upper bound, in seconds, on how long environment creation may take.
# Creation that has not finished within this window is treated as a failure,
# whatever state the installation process happens to be in at that moment.
# In practice 600 seconds (10 minutes) is ample for a standard Python
# project. A project whose installation needs compilation (for example, one
# that depends on a new Tango release) could in principle take longer, but
# the Docker image ships without compiler tools, so compilation cannot
# happen there anyway.
ENV_CREATION_TIMEOUT_SECS = 600.0

# Timeout applied to each SIGTERM retry when terminating a script process.
# It can be overridden through the PROC_TERMINATE_TIMEOUT_SECS environment
# variable. On slow CI runners, coverage's atexit cleanup can keep the
# process alive longer than the 0.1 s default, which causes terminate() to
# report failure (state=UNKNOWN).
_PROC_TERMINATE_TIMEOUT_ENV_VALUE = os.environ.get("PROC_TERMINATE_TIMEOUT_SECS", "0.1")
PROC_TERMINATE_TIMEOUT_SECS = float(_PROC_TERMINATE_TIMEOUT_ENV_VALUE)

# The SIGTERM handler that was installed at the time this module was imported.
DEFAULT_SIGTERM_HANDLER = signal.getsignal(signal.SIGTERM)


def script_signal_handler(
    signal_object,
    exception_class,
    signal_num: int,  # pylint: disable=unused-argument
    current_stack_frame,
) -> None:
    """
    Signal handler whose only action is to raise an exception.

    Unless the running Python script catches the raised exception, it
    interrupts script execution and the script terminates. The
    shutdown_event is deliberately left untouched so that sibling script
    processes are not terminated along with this one.

    :param signal_object: SignalObject to modify to reflect signal-handling
        state (not used by this implementation)
    :param exception_class: exception type that is instantiated with no
        arguments and raised
    :param signal_num: POSIX signal ID (not used)
    :param current_stack_frame: current stack frame (not used)
    :raises exception_class: unconditionally
    """
    error = exception_class()
    raise error
class ProcedureState(str, enum.Enum):
    """
    Lifecycle state of a script execution.

    Because the enum also subclasses str, each member compares equal to its
    plain string value (e.g. ``ProcedureState.READY == "READY"``).
    """

    # no state could be determined
    UNKNOWN = "UNKNOWN"
    # the child process has started and is waiting for work
    IDLE = "IDLE"
    # the child process is being created
    CREATING = "CREATING"
    # the script's Python environment is being prepared
    PREP_ENV = "PREP_ENV"
    # the user script is being loaded
    LOADING = "LOADING"
    # the script's init function is executing
    INITIALISING = "INITIALISING"
    # the script is ready to have a function run
    READY = "READY"
    # a script function other than init is executing
    RUNNING = "RUNNING"
    # the script process finished normally
    COMPLETE = "COMPLETE"
    # the script was stopped
    STOPPED = "STOPPED"
    # the script or its process failed
    FAILED = "FAILED"
class LifecycleMessage(EventMessage):
    """
    EventMessage announcing a script lifecycle event.

    The message type is always "LIFECYCLE"; the message payload is the new
    ProcedureState.
    """

    def __init__(self, msg_src: str, new_state: ProcedureState):
        """
        :param msg_src: identifier of the message sender
        :param new_state: lifecycle state being announced
        """
        super().__init__(msg_src=msg_src, msg_type="LIFECYCLE", msg=new_state)
class ExecutableScript(BaseModel, abc.ABC):
    """
    Base class for every kind of executable script.

    Concrete subclasses supply a ``script_uri`` field plus the static methods
    ``get_prefix()`` (the URI prefix ``script_uri`` must begin with) and
    ``get_type()`` (the value written to ``script_type`` on serialisation).

    Anticipated specialisations:
      - scripts on filesystem
      - scripts in git repository
      - scripts given as a string
      - scripts stored in the ODA
      - etc.
    """

    @model_validator(mode="after")
    def validate_prefix(self):
        """
        Reject instances whose script_uri lacks this class's URI prefix.

        :raises ValueError: if script_uri does not start with get_prefix()
        """
        uri = self.script_uri
        if uri.startswith(self.get_prefix()):
            return self
        raise ValueError(
            f"Incorrect prefix for {self.__class__.__name__}: {self.script_uri}"
        )

    @model_serializer(mode="wrap")
    def _serialize_executable_script(
        self, default_serializer: SerializerFunctionWrapHandler
    ) -> dict[str, Any]:
        """
        Serialise with pydantic's default serialiser and then stamp the
        ``script_type`` key with the value returned by get_type().
        """
        dumped = default_serializer(self)
        dumped["script_type"] = self.get_type()
        return dumped
class FileSystemScript(ExecutableScript):
    """
    A script stored on the local file system, addressed by a file:// URI.
    """

    script_uri: str
    # discriminator value identifying this script type
    script_type: Literal["filesystem"] = "filesystem"

    @staticmethod
    def get_prefix():
        """Return the URI prefix that script_uri must start with."""
        return "file://"

    @staticmethod
    def get_type():
        """Return the script_type discriminator for this class."""
        return "filesystem"
class GitScript(ExecutableScript):
    """
    A script held in a git repository, addressed by a git:// URI.
    """

    script_uri: str
    # repository coordinates handed to GitManager.clone_repo
    git_args: GitArgs = Field(default=GitArgs())
    # NOTE(review): not read in this module; presumably requests that a
    # dedicated environment be created for the script — confirm with callers
    create_env: bool = False
    # discriminator value identifying this script type
    script_type: Literal["git"] = "git"

    @staticmethod
    def get_prefix():
        """Return the URI prefix that script_uri must start with."""
        return "git://"

    @staticmethod
    def get_type():
        """Return the script_type discriminator for this class."""
        return "git"
class ProcedureInput(BaseModel):
    """
    ProcedureInput is a non-functional container holding the arguments
    passed to a script function: positional arguments in ``args`` and
    keyword arguments in ``kwargs``.
    """

    args: list = Field(default_factory=list)
    kwargs: dict = Field(default_factory=dict)

    def __init__(self, *args, **kwargs):
        """
        Capture positional and keyword arguments exactly as they should later
        be passed to the target function.
        """
        super().__init__(args=args, kwargs=kwargs)

    def __add__(self, other):
        """
        Return a new ProcedureInput combining the keyword arguments of self
        and other.

        Positional arguments of self are kept. Keyword arguments of other
        override same-named keyword arguments of self.

        :param other: ProcedureInput whose kwargs should be merged in
        :return: new ProcedureInput
        :raises NotImplementedError: if other has positional arguments
        """
        if other.args:
            raise NotImplementedError("Combining positional arguments not supported")
        combined_kwargs = self.kwargs.copy()  # pylint: disable=no-member
        combined_kwargs.update(other.kwargs)
        return ProcedureInput(*self.args, **combined_kwargs)

    def __eq__(self, other):
        if not isinstance(other, ProcedureInput):
            return False
        return self.args == other.args and self.kwargs == other.kwargs

    def __repr__(self):
        args = ", ".join(str(a) for a in self.args)
        kwargs = ", ".join(
            "{!s}={!r}".format(k, v)
            for k, v in self.kwargs.items()  # pylint: disable=no-member
        )
        # Skip empty sections so inputs with only positional or only keyword
        # arguments don't render a dangling ", " separator.
        body = ", ".join(part for part in (args, kwargs) if part)
        return "<ProcedureInput({})>".format(body)
[docs] @dataclass class RunThreadState: """ Represents the state of a thread running a user function during its execution. Attributes: thread: The thread whose state is being represented. fn_name: The name of the function associated with the thread. exception: An exception that occurred during thread execution, if any. """ thread: threading.Thread fn_name: str exception: BaseException | None = None
class ScriptWorker(mptools.ProcWorker):
    """
    ScriptWorker loads user code in a child process, running functions of
    that user code on request.

    ScriptWorker acts when a message is received on its work queue. It
    responds to four types of external message:

    1. LOAD - to load the specified code in this process
    2. ENV - to install the dependencies for the specified script in this
       process
    3. RUN - to run the named function in this process
    4. PUBSUB - external pubsub messages that should be published locally

    In addition, the event loop also handles two internal messages
    originating from the user function execution thread:

    5. USER_FN_COMPLETE - indicates successful completion of the requested
       user function
    6. USER_FN_FAILED - indicates failure during user function execution

    ScriptWorker converts external inter-process mptools pub/sub messages to
    intra-process pypubsub pub/sub messages. That is, EventMessages received
    on the local work queue are rebroadcast locally as pypubsub messages.
    Likewise, the ScriptWorker listens to all pypubsub messages broadcast
    locally, converts them to pub/sub EventMessages, and puts them on the
    'main' event queue for transmission to other interested ScriptWorkers.
    """

    # install our custom signal handler that raises an exception on SIGTERM
    term_handler = staticmethod(script_signal_handler)  # noqa: E731

    def __init__(
        self,
        name: str,
        startup_event: multiprocessing.Event,
        shutdown_event: multiprocessing.Event,
        event_q: mptools.MPQueue,
        work_q: mptools.MPQueue,
        *args,
        environment: Environment | None = None,
        **kwargs,
    ):
        # Message is rolled by hand and sent via a direct message to the
        # ProcessManager as we want to announce CREATING at the earliest
        # possible moment; we can't announce via pypubsub just yet as the
        # intraprocess<->interprocess republish function is not registered
        # till later in the construction process
        #
        # This message is now ignored by the message loop as the managers set
        # the initial CREATING state themselves to avoid a race
        msg = EventMessage(
            msg_src=name,
            msg_type="PUBSUB",
            msg=dict(
                topic="procedure.lifecycle.statechange",
                kwargs=dict(new_state=ProcedureState.CREATING),
            ),
        )
        event_q.put(msg)

        self.name = name
        self._environment = environment
        self.work_q = work_q

        # user_module will be set on LOAD message
        self.user_module = None

        # run_state will be set on RUN message and cleared on completion
        self._run_state: RunThreadState | None = None

        super().__init__(name, startup_event, shutdown_event, event_q, *args, **kwargs)

        # Register a callback function so that all pypubsub messages broadcast
        # in this process are also queued for distribution to remote processes
        pub.subscribe(self.republish, pub.ALL_TOPICS)

    def init_args(self, args, kwargs):
        """
        Store the arguments later passed to the user script's init function.
        """
        self.init_input = ProcedureInput(*args, **kwargs)

    def startup(self) -> None:
        super().startup()
        # mark state as IDLE to signify that this child process started up
        # successfully
        self.publish_lifecycle(ProcedureState.IDLE)

    def shutdown(self) -> None:
        super().shutdown()
        # Technically, unsubscribing is unnecessary as pypubsub holds weak
        # references to listeners and automatically unsubscribes listeners
        # that have been deleted
        pub.unsubscribe(self.republish, pub.ALL_TOPICS)

    def publish_lifecycle(self, new_state: ProcedureState):
        """
        Broadcast a lifecycle status change event.

        :param new_state: new lifecycle state
        """
        # This message could be broadcast on pypubsub, letting the republish
        # callback rebroadcast it on the mptools bus. But, we know there are no
        # local subscribers so bypass the pypubsub step and broadcast directly to
        # the inter-process event bus.
        msg = EventMessage(
            msg_src=self.name,
            msg_type="PUBSUB",
            msg=dict(
                topic="procedure.lifecycle.statechange",
                kwargs=dict(new_state=new_state),
            ),
        )
        self.event_q.put(msg)

    def republish(self, topic: pub.Topic = pub.AUTO_TOPIC, **kwargs) -> None:
        """
        Republish a local pypubsub event over the inter-process mptools event
        bus.

        Events carrying a msg_src other than this worker's name originated
        elsewhere and are not republished, avoiding an infinite loop.

        :param topic: message topic, set automatically by pypubsub
        :param kwargs: any metadata associated with pypubsub message
        """
        # avoid infinite loop - do not republish external events
        try:
            msg_src = kwargs.pop("msg_src")
        except KeyError:
            # No message source = virgin event published on pypubsub
            msg_src = self.name

        # ... but if this is a local message (message source = us), send it
        # out to the main queue and hence on to other EventBusWorkers
        if msg_src == self.name:
            # Convert pypubsub event to the equivalent mptools EventMessage
            msg = EventMessage(
                self.name, "PUBSUB", dict(topic=topic.name, kwargs=kwargs)
            )

            # note that this is a blocking put. If the queue is full, this call
            # will block until the queue has room to accept the message
            self.log(logging.DEBUG, "Republishing local pypubsub event: %s", msg)
            self.event_q.put(msg)

    def _on_pubsub(self, evt: EventMessage) -> None:
        """
        Rebroadcast an external pub/sub EventMessage locally via pypubsub.

        :param evt: PUBSUB EventMessage whose msg holds 'topic' and 'kwargs'
        """
        # take the work item - the external pub/sub EventMessage - and
        # rebroadcast it locally as a pypubsub message, avoiding an infinite
        # loop by ignoring events that originated from us.
        if evt.msg_src != self.name:
            self.log(logging.DEBUG, "Republishing external event: %s", evt)
            payload = evt.msg
            topic = payload["topic"]
            try:
                pub.sendMessage(topic, msg_src=evt.msg_src, **payload["kwargs"])
            except SenderUnknownMsgDataError:
                # Topic doesn't accept msg_src — deliver without it.
                # Loop prevention still works: the local republish callback
                # will re-emit with msg_src=self.name, which _on_pubsub
                # discards on the next round-trip.
                pub.sendMessage(topic, **payload["kwargs"])
        else:
            self.log(logging.DEBUG, "Discarding internal event: %s", evt)

    def _on_env(self, evt: EventMessage) -> None:
        """
        Configure the Python environment for script execution.

        The venv is created in the main process, but pip install happens here
        in the ScriptWorker process. The creating/created Events coordinate
        multiple ScriptWorkers requesting the same environment.

        :param evt: ENV EventMessage whose msg is the script to install
        :raises RuntimeError: if no environment is defined, the script type
            cannot be installed, or pip fails
        :raises TimeoutError: if another process is creating the environment
            and does not finish within ENV_CREATION_TIMEOUT_SECS
        """
        self.publish_lifecycle(ProcedureState.PREP_ENV)

        if self._environment is None:
            raise RuntimeError("Install failed, environment has not been defined")

        if not self._environment.created.is_set():
            # NOTE(review): the is_set()/set() pair on 'creating' is not
            # atomic, so two workers arriving simultaneously could both decide
            # to install. Confirm whether callers serialise ENV requests.
            if not self._environment.creating.is_set():
                self._environment.creating.set()
                self._install_environment(evt.msg)
                # TODO: How to handle if another process is waiting on created_condition but install fails?
                self._environment.created.set()
            else:
                # Environment is being created by another script. Wait for the
                # other process to finish environment installation before
                # proceeding. Event.wait() returns False when the timeout
                # elapses; a timeout likely means the installation failed, so
                # abort rather than switch to a half-installed environment.
                if not self._environment.created.wait(
                    timeout=ENV_CREATION_TIMEOUT_SECS
                ):
                    raise TimeoutError(
                        "Timed out after"
                        f" {ENV_CREATION_TIMEOUT_SECS}s waiting for another"
                        " process to create the script environment"
                    )

        # Clear cached modules that might have been imported from parent environment
        self._cleanup_modules()
        # replace inherited sys.path
        sys.path = self._compute_venv_sys_path(self._environment.location)

        self.publish_lifecycle(ProcedureState.IDLE)

    def _install_environment(self, script: ExecutableScript) -> None:
        """
        Clone the script's repository and pip-install it into this worker's
        environment.

        :param script: script whose project should be installed
        :raises RuntimeError: if script is not a GitScript or pip fails
        """
        if not isinstance(script, GitScript):
            raise RuntimeError(
                "Cannot create virtual environment for script type"
                f" {script.__class__.__name__}"
            )

        clone_dir = GitManager.clone_repo(script.git_args)
        pip_path = os.path.join(self._environment.location, "bin", "pip")
        pypi_extras = [
            item for s in PYPI_EXTRA_URLS for item in ["--extra-index-url", s]
        ]

        try:
            # upgrade pip as the version packaged by default has bugs
            # w.r.t. installing dependencies
            subprocess.check_output(
                [
                    pip_path,
                    "install",
                    "--index-url=https://pypi.org/simple",
                    "--upgrade",
                    "pip",
                ]
            )

            # Previously, we used poetry to export requirements and
            # then use pip to install from that requirements file. Full
            # PEP 517/PEP 621 support for reading metadata and
            # installing dependencies during 'pip install .' was
            # introduced with pip >= 21.3, making this unnecessary.
            subprocess.check_output(
                [pip_path, "install", ".", *pypi_extras],
                cwd=clone_dir,
            )
        except subprocess.CalledProcessError as e:
            LOGGER.exception("pip install failed: %s", e)
            raise RuntimeError(
                "Something went wrong during script environment"
                f" installation: {e}"
            ) from None

    def _compute_venv_sys_path(self, venv_path: str) -> list[str]:
        """
        Compute what sys.path should look like if the given virtual
        environment were activated.

        :param venv_path: path to the virtual environment
        :return: list of paths in predicted sys.path
        """
        venv_path = os.path.abspath(venv_path)
        pyver = f"python{sys.version_info.major}.{sys.version_info.minor}"

        # Reconstruct sys.path
        venv_sys_path = []

        # Python adds '' first
        venv_sys_path.append("")

        # Then stdlib (from base)
        stdlib_path = sysconfig.get_path("stdlib")
        venv_sys_path.append(stdlib_path)

        # Then site-packages (from venv)
        site_packages_candidates = [
            os.path.join(venv_path, "lib", pyver, "site-packages"),
            os.path.join(venv_path, "lib64", pyver, "site-packages"),
        ]
        site_packages = next(
            (p for p in site_packages_candidates if os.path.isdir(p)), None
        )
        if site_packages:
            venv_sys_path.append(site_packages)

        return venv_sys_path

    def _cleanup_modules(self) -> None:
        """
        Cleans up cached Python modules except for essential system modules.
        This prevents conflicts between parent and child environment imports.

        When using `multiprocessing.spawn`, Python imports the parent module in
        the child process first to get access to the objects that need to be
        pickled/unpickled. During this import, Python caches module imports in
        sys.modules. So, while ScriptWorker sets up a new venv and adds it to
        sys.path, by this time some modules may already be imported and cached
        from the parent's original environment.
        """
        essential_modules = sys.stdlib_module_names | {
            # avoid '_pickle.PicklingError: Can't pickle X: it's not the same object as ska_oso_oet.XXX' errors
            "ska_oso_oet",
            "pubsub",  # This ensures the same pubsub instance is used by the OET and scripts
            "tango",  # work around issue reimporting tango
        }

        # Every entry in essential_modules is a top-level name
        # (sys.stdlib_module_names lists packages only, not sub-modules), so a
        # module is essential exactly when its top-level package is in the
        # set. E.g., don't remove 'ska_oso_oet' or 'ska_oso_oet.XXX', but DO
        # remove 'ska_oso_oet_blahblahblah'. One set lookup per module avoids
        # scanning the whole essential set for each cached module.
        modules_to_remove = [
            module_name
            for module_name in list(sys.modules)
            if module_name.partition(".")[0] not in essential_modules
        ]

        # Remove non-essential modules
        for module_name in modules_to_remove:
            sys.modules.pop(module_name, None)

    def _on_load(self, evt: EventMessage) -> None:
        """
        Load the user script given in the LOAD message.

        :param evt: LOAD EventMessage whose msg is the ExecutableScript
        :raises FileNotFoundError: if the script file cannot be found
        """
        self.publish_lifecycle(ProcedureState.LOADING)
        script: ExecutableScript = evt.msg
        self.log(logging.DEBUG, "Loading user script %s", script)
        try:
            self.user_module = ModuleFactory.get_module(script)
        except FileNotFoundError:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), script.script_uri
            ) from None
        self.publish_lifecycle(ProcedureState.IDLE)

    def _run_user_function(
        self, fn: Callable[..., Any], fn_args: ProcedureInput
    ) -> None:
        """
        Executes a user-provided function with the given arguments and report
        its final completion state via messaging.

        This function should be run on a separate thread. The result of calling
        the user function is communicated back to the event loop running on the
        main thread by adding a message to the work queue, where it can be
        handled by the event loop like any other message.

        :param fn: The callable user function to execute.
        :param fn_args: Arguments for the user function
        """
        msg = EventMessage(self.name, "USER_FN_COMPLETE", "User function complete")
        try:
            fn(*fn_args.args, **fn_args.kwargs)
        except BaseException as exc:  # pylint: disable=broad-except
            # Do not pass the exception in the EventMessage as the
            # multiprocessing Queue's feeder thread then tries to pickle the
            # EventMessage containing the exception to write it to the
            # pipe, which fails because tblib.pickling_support is not importable
            # in this thread. The feeder thread silently swallows the failure,
            # which means the state never transitions to FAILED.
            self._run_state.exception = exc
            msg = EventMessage(self.name, "USER_FN_FAILED", "User function failed")
        finally:
            # this is a critical message, so we use a blocking put over safe_put,
            # just in case safe_put times out
            self.work_q.put(msg)

    def _on_user_fn_complete(self, _: EventMessage) -> Type[StopIteration] | None:
        """
        Happy path handler for when a user function completes successfully.

        :param _: unused EventMessage on USER_FN_COMPLETE topic
        :return: StopIteration if user function was 'main', else None
        """
        self.publish_lifecycle(ProcedureState.READY)

        is_main = self._run_state.fn_name == "main"
        self._run_state = None

        # This is a historical artefact due for some refactoring. StopIteration
        # is used to signal 'main' has completed, which will safely end the event
        # loop and thus the script process - the intended behaviour being that
        # scripts are one-shot, and once run they can't be re-run. However,
        # reusing scripts would be way more efficient, e.g., the same script
        # process can be run multiple times in sequence for multiple SBs.
        return StopIteration if is_main else None

    def _on_user_fn_failed(self, _: EventMessage) -> None:
        """
        Sad path handler for when a user function raises an exception.

        :param _: unused EventMessage on USER_FN_FAILED topic
        :raises: exception attached to RunThreadState
        """
        # this runs on the main thread, as do all the other handlers, so
        # there's no race with those other handlers modifying _run_state
        exc = self._run_state.exception
        self._run_state = None
        raise exc

    def _on_run(self, evt: EventMessage) -> None:
        """
        Start the requested user function on a daemon thread.

        :param evt: RUN EventMessage whose msg is a (fn_name, ProcedureInput)
            pair
        :raises RuntimeError: if a user function is already running
        """
        fn_name, fn_args = evt.msg

        if self._run_state:
            raise RuntimeError(
                f"Cannot execute {fn_name}: {self._run_state.fn_name} is already"
                " running"
            )

        # special case: get init args from instance, check for method.
        # we may want to revisit whether init remains a special case
        if fn_name == "init":
            if not hasattr(self.user_module, "init"):
                self.publish_lifecycle(ProcedureState.READY)
                return
            fn_args = self.init_input

        fn = getattr(self.user_module, fn_name)

        # Strip 'context' kwarg if the function signature doesn't accept it,
        # for backward compatibility with scripts that don't expect it
        if "context" in fn_args.kwargs:
            sig = inspect.signature(fn)
            accepts_context = "context" in sig.parameters or any(
                p.kind == inspect.Parameter.VAR_KEYWORD
                for p in sig.parameters.values()
            )
            if not accepts_context:
                fn_args = fn_args.model_copy(
                    update={
                        "kwargs": {
                            k: v for k, v in fn_args.kwargs.items() if k != "context"
                        }
                    },
                )

        self.log(
            logging.DEBUG,
            "Calling user function %s",
            repr(fn_args).replace("<ProcedureInput", fn_name)[:-1],
        )

        next_state = (
            ProcedureState.INITIALISING if fn_name == "init" else ProcedureState.RUNNING
        )
        self.publish_lifecycle(next_state)

        self._run_state = RunThreadState(
            fn_name=fn_name,
            exception=None,
            thread=threading.Thread(
                target=self._run_user_function,
                args=(fn, fn_args),
                name=self.name,
                # this MUST be a daemon thread otherwise a non-cooperative user
                # thread will prevent the script process from exiting. Yes, it
                # means the user thread will be terminated abruptly, but that's
                # desired for a 'force stop' operation involving SIGTERMs being
                # sent to the ScriptWorker process.
                daemon=True,
            ),
        )
        self._run_state.thread.start()

    def _get_message_handler(self, msg_type: str):
        """
        Retrieves the appropriate message handler function for the given
        message type.

        :param msg_type: The type of the message to handle. Accepted values
            include "PUBSUB", "ENV", "LOAD", "RUN", "USER_FN_COMPLETE", and
            "USER_FN_FAILED".
        :return: The handler function corresponding to the given message type,
            or None if no handler is found for the message type.
        """
        handlers = {
            "PUBSUB": self._on_pubsub,
            "ENV": self._on_env,
            "LOAD": self._on_load,
            "RUN": self._on_run,
            "USER_FN_COMPLETE": self._on_user_fn_complete,
            "USER_FN_FAILED": self._on_user_fn_failed,
        }
        return handlers.get(msg_type)

    def main_loop(self) -> None:
        """
        main_loop delivers each event received on the work queue to the
        matching handler, while checking for shutdown notifications.

        Event delivery will cease when the shutdown event is set, a special
        sentinel message is sent, or a handler returns StopIteration. The
        COMPLETE lifecycle state is published only when the loop exits without
        being interrupted by termination.
        """
        self.log(logging.DEBUG, "Entering ScriptWorker.main_loop")

        # stop processing as soon as the shutdown_event is set. Once set, this
        # while loop terminates, thus ending main_loop and starting shutdown
        # of this ProcWorker.
        try:
            while not self.shutdown_event.is_set():
                # Get next work item. This call returns after the default safe_get
                # timeout unless an item is in the queue.
                item = self.work_q.safe_get()

                # Go back to the top of the while loop if no message was received,
                # thus checking the shutdown event again.
                if not item:
                    continue

                # ok - an item was received from queue
                self.log(
                    logging.DEBUG, "ScriptWorker.main_loop received '%s' message", item
                )
                # if item is the sentinel message, break to exit out of main_loop
                # and start shutdown
                if item == "END":
                    break

                # otherwise handle the event based on type
                handler = self._get_message_handler(item.msg_type)
                if handler:
                    ret = handler(item)
                    if ret == StopIteration:
                        break
                else:
                    self.log(logging.WARNING, "Unexpected message: %s", item)

        except mptools.TerminateInterrupt:
            # raised by the signal handler on Proc.terminate()
            pass
        else:
            self.publish_lifecycle(ProcedureState.COMPLETE)
class ProcessManager:
    """
    ProcessManager tracks and coordinates ScriptWorker processes.

    ProcessManager is responsible for tracking script execution state and
    coordinating with main_loop via pypubsub messages. Actual process creation
    and management is handled by main_loop in main.py.

    ProcessManager uses pypubsub to send management requests (create, run, stop)
    to main_loop, which creates and manages the actual ScriptWorker processes.

    Note: ProcessManager does not maintain a history of script execution.
    History is recorded and managed by the ScriptExecutionService.
    """

    _DELETEABLE_STATES = [
        ProcedureState.COMPLETE,
        ProcedureState.FAILED,
        ProcedureState.STOPPED,
        ProcedureState.UNKNOWN,
    ]

    def __init__(
        self,
        states: Optional[dict[int, ProcedureState]] = None,
        state_lock: Optional[threading.RLock] = None,
    ) -> None:
        """
        Create a new ProcessManager.

        :param states: Optional external states dictionary to use instead of
            creating an internal one. This allows sharing state with other
            components (e.g., ScriptExecutionService) to avoid race conditions.
            When external states are provided, this manager will NOT subscribe
            to statechange events as the external owner is responsible for updates.
        :param state_lock: Optional external lock for state updates. Should be
            provided if states is provided to ensure thread-safe updates. The
            lock is acquired by this class whenever it reads or writes the
            states dict, so it must be reentrant (an RLock) if the owner may
            call into this class while already holding it.
        """
        # counter used to generate process ID for new processes
        self._pid_counter = itertools.count(1)

        # Track if we're using external states (to control subscription behavior)
        self._using_external_states = states is not None

        # maps Proc ID to current state - use external dict if provided
        self.states: dict[int, ProcedureState] = states if states is not None else {}

        # Use external lock if provided, otherwise create our own
        self._state_updating = (
            state_lock if state_lock is not None else threading.RLock()
        )

        # Partial function for sending pubsub messages with msg_src=None,
        # which signals to EventBusWorker.republish to use the worker's name
        # as the source, ensuring the message is queued correctly
        self._send_message = functools.partial(pub.sendMessage, msg_src=None)

        # Subscribe to management responses for async failure handling
        pub.subscribe(self._on_created, "procedure.management.created")

        # Subscribe to lifecycle events to keep state in sync ONLY if we own states
        # When using external states, the owner (e.g., ScriptExecutionService)
        # is responsible for updating the shared dict to avoid subscription order issues
        if not self._using_external_states:
            pub.subscribe(self._on_statechange, "procedure.lifecycle.statechange")

    def _on_created(  # pylint: disable=unused-argument
        self, msg_src: str, pid: int, success: bool, error: str | None = None
    ):
        """
        Handle procedure.management.created response.

        Updates state to FAILED if creation was unsuccessful.

        :param msg_src: message source
        :param pid: PID from the create request
        :param success: whether creation succeeded
        :param error: error message if failed
        """
        if not success:
            LOGGER.warning("ScriptWorker #%s creation failed: %s", pid, error)
            with self._state_updating:
                self.states[pid] = ProcedureState.FAILED

    def _on_statechange(self, msg_src: str, new_state: ProcedureState):
        """
        Handle procedure.lifecycle.statechange events.

        Updates internal state tracking and cleans up resources for terminal
        states.

        :param msg_src: PID of the procedure
        :param new_state: new procedure state
        """
        try:
            pid = int(msg_src)
        except (ValueError, TypeError):
            return

        with self._state_updating:
            self.states[pid] = new_state

            # Clean up state for terminal states
            if new_state in self._DELETEABLE_STATES:
                self.states.pop(pid, None)

    def get_state(self, pid: int) -> ProcedureState | None:
        """
        Get the current state of a procedure.

        :param pid: Procedure ID
        :return: Current state or None if not found
        """
        return self.states.get(pid)

    def has_running(self) -> bool:
        """
        Check if any procedure is currently running.

        :return: True if any procedure is in RUNNING state
        """
        # Iterate under the lock: pubsub callbacks may insert or pop entries
        # concurrently, which would otherwise break the iteration.
        with self._state_updating:
            return any(
                state == ProcedureState.RUNNING for state in self.states.values()
            )

    def create(self, script: ExecutableScript, *, init_args: ProcedureInput) -> int:
        """
        Request creation of a new ScriptWorker process.

        This method emits a procedure.management.create pypubsub message,
        which main_loop handles to create the actual process. The method
        returns immediately with state=CREATING.

        :param script: script to execute
        :param init_args: script initialisation arguments
        :return: PID assigned to this procedure
        """
        pid = next(self._pid_counter)
        LOGGER.debug("Requesting ScriptWorker #%s for %s", pid, script)

        # Register initial state, using the same lock as the other writers
        with self._state_updating:
            self.states[pid] = ProcedureState.CREATING

        # Emit management request via pypubsub. The lock is not held here, as
        # pypubsub delivers synchronously and listeners may take the lock.
        # Note: environment creation is handled by main_loop
        self._send_message(
            "procedure.management.create",
            pid=pid,
            script=script,
            init_args=init_args,
        )

        return pid

    def run(
        self,
        process_id: int,
        *,
        call: str,
        run_args: ProcedureInput,
        force_start: bool = False,
    ) -> None:
        """
        Request execution of a function in a prepared procedure.

        This method emits a procedure.management.run pypubsub message,
        which main_loop handles to send the run command to the ScriptWorker.

        :param process_id: ID of Procedure to execute
        :param call: name of function to call
        :param run_args: late-binding arguments to provide to the script
        :param force_start: Add run command even if not READY (ignored for
            terminal states)
        :raises ValueError: if the PID is unknown, in a state that cannot be
            run, or another procedure is RUNNING
        """
        # Take one consistent snapshot under the lock rather than reading the
        # shared dict repeatedly while pubsub callbacks may be mutating it.
        with self._state_updating:
            current_state = self.states.get(process_id)
            running_pids = [
                pid
                for pid, state in self.states.items()
                if state == ProcedureState.RUNNING
            ]

        if current_state is None:
            raise ValueError(f"PID #{process_id} not found")

        # These are states where the Procedure cannot be run
        final_states = [
            ProcedureState.COMPLETE,
            ProcedureState.FAILED,
            ProcedureState.STOPPED,
            ProcedureState.UNKNOWN,
        ]
        if current_state in final_states:
            raise ValueError(f"PID #{process_id} unrunnable in state {current_state}")

        # These are intermediate states where the script is not ready to run
        # (unless force_start is True)
        intermediate_states = [
            ProcedureState.CREATING,
            ProcedureState.LOADING,
            ProcedureState.INITIALISING,
        ]
        if not force_start and current_state in intermediate_states:
            raise ValueError(
                f"PID #{process_id} not ready to run in state {current_state}"
            )

        # Check if already running (unless forcing)
        if not force_start and current_state == ProcedureState.RUNNING:
            raise ValueError(f"PID #{process_id} is already RUNNING")

        # Ensure no other script is running
        if running_pids:
            raise ValueError(
                f"Cannot start PID {process_id}: PID #{running_pids[0]} is RUNNING"
            )

        LOGGER.debug("Requesting 'run %s' for PID %d", call, process_id)

        # Emit management request via pypubsub
        self._send_message(
            "procedure.management.run",
            pid=process_id,
            fn_name=call,
            run_args=run_args,
        )

    def stop(self, process_id: int) -> None:
        """
        Request termination of a running procedure.

        This method emits a procedure.management.stop pypubsub message,
        which main_loop handles to terminate the ScriptWorker.

        :param process_id: ID of Procedure to stop
        :raises ValueError: if the PID is unknown or not in a stoppable state
        """
        # Single lookup: a separate membership test followed by indexing could
        # raise KeyError if a terminal statechange removes the entry between.
        state = self.states.get(process_id)
        if state is None:
            raise ValueError(f"Process {process_id} not found")

        stoppable_states = [
            ProcedureState.IDLE,
            ProcedureState.INITIALISING,
            ProcedureState.READY,
            ProcedureState.RUNNING,
            ProcedureState.LOADING,
        ]
        if state not in stoppable_states:
            raise ValueError(f"Cannot stop PID {process_id} with state {state.name}")

        LOGGER.debug("Requesting stop for PID %d", process_id)

        self._send_message(
            "procedure.management.stop",
            pid=process_id,
        )
class ModuleFactory:
    """
    Factory class used to return Python Module instances from a variety of
    storage back-ends.
    """

    @staticmethod
    def get_module(script: ExecutableScript):
        """
        Load Python code from storage, returning an executable Python module.

        :param script: Script object describing the script to load
        :return: Python module
        :raises ValueError: if the script type is not supported
        """
        if isinstance(script, GitScript):
            return ModuleFactory._load_module_from_git(script)

        if isinstance(script, FileSystemScript):
            return ModuleFactory._load_module_from_file(script.script_uri)

        raise ValueError(f"Script type not handled: {script.__class__.__name__}")

    @staticmethod
    def _load_module_from_file(script_uri: str) -> types.ModuleType:
        """
        Load Python module from file storage.

        This method handles file:// URIs; a URI without that prefix is
        treated as a plain filesystem path.

        :param script_uri: URI of script to load.
        :return: Python module
        """
        # strip the scheme using the class's own prefix rather than a
        # hard-coded character count
        path = script_uri.removeprefix(FileSystemScript.get_prefix())
        return ModuleFactory._load_module_from_path(path)

    @staticmethod
    def _load_module_from_git(script: GitScript) -> types.ModuleType:
        """
        Load Python module from a git repository. Clones the repository if
        repo has not yet been cloned. The repository will not have been
        cloned if default environment is being used.

        This method handles git:// URIs.

        :param script: GitScript object with information on script location
        :return: Python module
        """
        clone_path = GitManager.clone_repo(script.git_args)
        # remove prefix and any leading slashes so the result is relative to
        # the clone directory
        relative_script_path = script.script_uri[len(script.get_prefix()) :].lstrip(
            "/"
        )
        script_path = os.path.join(clone_path, relative_script_path)
        return ModuleFactory._load_module_from_path(script_path)

    @staticmethod
    def _load_module_from_path(path: str) -> types.ModuleType:
        """
        Execute the Python source file at path as a fresh module named
        'user_module'.

        :param path: filesystem path of the Python source file
        :return: Python module
        :raises FileNotFoundError: if path does not exist
        """
        loader = importlib.machinery.SourceFileLoader("user_module", path)
        user_module = types.ModuleType(loader.name)
        loader.exec_module(user_module)
        return user_module