ska_oso_oet.mptools

Top-level package for Multiprocessing Tools.

This package is substantially based on Pamela D McA’Nulty’s mptools project, which is hosted at

Pamela presents an excellent article given an overview of the MPTools package at


MPTools is subject to the MIT licence.

MIT License

Copyright (c) 2019, Pamela D McA’Nulty

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

class ska_oso_oet.mptools.EventMessage(msg_src: str, msg_type: str, msg: Any)[source]

EventMessage holds the message and message metadata for events sent on the event queue between MPTools ProcWorkers.

__init__(msg_src: str, msg_type: str, msg: Any)[source]
class ska_oso_oet.mptools.MPQueue(maxsize=0, *, ctx)[source]

MPQueue is a multiprocessing Queue extended with convenience methods that return booleans to reflect success and failure rather than raising exceptions.

MPQueue adds methods to:
  • get next item in an exception-free manner

  • put an item in an exception-free manner

  • drain queue to allow safe closure

  • close queue in an exception-free manner

__init__(maxsize=0, *, ctx)[source]
drain()[source]

Drain all items from this MPQueue, yielding each item until all items have been removed.

safe_close() int[source]

Drain and close this MPQueue.

No more items can be added to this MPQueue one safe_close has been called.

safe_get(timeout: float | None = 0.02)[source]

Remove and return an item from this MPQueue.

If optional arg timeout is None, safe_get returns an item if one is immediately available. If optional arg timeout is a positive number (the default), safe_get blocks at most timeout seconds for an item to become available. In either case, None is returned if no item is available.

Parameters:

timeout – maximum timeout in seconds, or None for no waiting period

Returns:

None if no item is available

safe_put(item, timeout: float | None = 0.02) bool[source]

Put an item on this MPQueue.

safe_put adds an item onto the queue if a free slot is available, blocking at most timeout seconds for a free slot and returning False if no free slot was available within that time.

Parameters:
  • item – item to add

  • timeout – timeout in seconds

Returns:

True if the operation succeeded within the timeout

class ska_oso_oet.mptools.MainContext(mp_ctx: BaseContext | None = None)[source]

MainContext is the parent context for a set of worker processes that communicate via message queues.

MPQueue(*args, **kwargs) MPQueue[source]

Create a new message queue managed by this context.

Parameters:
  • args – queue constructor args

  • kwargs – queue constructor kwargs

Returns:

message queue instance

Proc(name: str, worker_class: Type[ProcWorker], *args, **kwargs) Proc[source]

Create a new process managed by this context.

Parameters:
  • name – name for worker process

  • worker_class – worker process class

  • args – any worker class constructor args

  • kwargs – any worker class constructor kwargs

Returns:

worker instance

__init__(mp_ctx: BaseContext | None = None)[source]
stop_procs() Tuple[int, int][source]

Stop all ProcWorkers managed by this MPContext.

stop_procs requests cooperative shutdown of running ProcWorkers before escalating to more forceful methods using POSIX signals.

This function returns with a 2-tuple, the first item indicating the number of ProcWorkers that returned a non-zero exit status on termination, the second item indicating the number of ProcWorkers that required termination.

Returns:

tuple of process termination stats

stop_queues() int[source]

Drain all queues, blocking until they have stopped.

Returns:

number of items drained

class ska_oso_oet.mptools.Proc(mp: BaseContext, name: str, worker_class: Type[ProcWorker], shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]

Proc represents a child process of a MainContext.

Proc instances exist in the scope of a MainContext instance and in the same Python interpreter process as the MainContext. Procs are the MainContext’s link to the ProcWorkers which run in separate Python interpreters. Every ProcWorker running in a child process is associated with one Proc.

Each Proc is responsible for bootstrapping its ProcWorker and managing its lifecycle. Proc arranges for an instance of the ProcWorker class passed as a constructor argument to be initialised and start running in a new child Python interpreter. Proc checks that the ProcWorker has started successfully by checking the status of a multiprocessing Event passed to the ProcWorker as a constructor argument, which should be set by the ProcWorker on successful startup. If ProcWorker startup does not complete successfully and the event is left unset, Proc will forcibly terminate the child process and report the error.

Proc is able to terminate its associated ProcWorker, first by giving the ProcWorker chance to co-operatively exit by setting the shutdown event. If the ProcWorker does not respond by exiting within the grace period set by Proc.SHUTDOWN_WAIT_SECS, Proc will forcibly terminate the ProcWorker’s process.

Proc ensures that the shutdown event and MPQueues it receives are passed through to the ProcWorker. Note that by default only one shutdown event is created by the MainContext, so setting the shutdown event triggers shutdown in all ProcWorkers!

Proc does not contain any business logic or application-specific code, which should be contained in the ProcWorker - or more likely, a class that extends ProcWorker.

__init__(mp: BaseContext, name: str, worker_class: Type[ProcWorker], shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]
full_stop(wait_time=3.0) None[source]

Stop the ProcWorker child process.

The method will attempt to terminate ProcWorker execution, first by setting the shutdown event and giving the ProcWorker opportunity to cleanly exit. If the ProcWorker has not terminated after wait_time seconds, SIGTERM signals are sent to the child process hosting the ProcWorker.

Parameters:

wait_time – grace time before sending SIGTERM signals

terminate(max_retries=3, timeout=0.1) bool[source]

Terminate the child process using POSIX signals.

This function sends SIGTERM to the child process, waiting timeout seconds before checking process status and, if the process is still alive, trying again.

Parameters:
  • max_retries – max retry attempts

  • timeout – second to wait before retry

Returns:

True if process termination was successful

class ska_oso_oet.mptools.ProcWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]

ProcWorker is a template class for code that should execute in a child Python interpreter process.

ProcWorker contains the standard boilerplate code required to set up a well-behaved child process. It handles starting the process, connecting signal handlers, signalling the parent that startup completed, etc. ProcWorker does not contain any business logic, which should be defined in a subclass of ProcWorker.

The core ProcWorker template method is main_loop, which is called once startup is complete and main execution begins. In ProcWorker this method is left blank and should be overridden by the class extending ProcWorker. Once the main_loop method is complete, the ProcWorker is shut down.

MPTools provides some ProcWorker subclasses with main_loop implementations that provide different kinds of behaviour. For instance,

  • TimerProcWorker.main_loop has code calls a function on a fixed cadence;

  • QueueProcWorker.main_loop has code that gets items from a queue, calling a function with every item received.

__init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]

Create a new ProcWorker.

Parameters:
  • name – name of this worker

  • startup_event – event to set on startup completion

  • shutdown_event – event to monitor for shutdown

  • event_q – queue for messages to/from MainWorker

  • args

init_signals() SignalObject[source]

Initialise signal handlers for this worker process.

Calling this method will install SIGTERM and SIGINT signal handlers for the running process.

static int_handler(signal_object: SignalObject, exception_class, signal_num: int, current_stack_frame: frame | None) None

Custom signal handling function that requests co-operative ProcWorker shutdown by setting the shared Event, forcibly terminating the process by raising an instance of the given exception class if call limit has been exceeded.

Parameters:
  • signal_object – SignalObject to modify to reflect signal-handling state

  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num – POSIX signal ID

  • current_stack_frame – current stack frame

run() int[source]

Start ProcWorker execution.

This method performs the housekeeping required to set the worker instance running and starts the main loop. An exit code of 0 is returned if the main loop completes and exits cleanly.

Returns:

exit status code

static term_handler(signal_object: SignalObject, exception_class, signal_num: int, current_stack_frame: frame | None) None

Custom signal handling function that requests co-operative ProcWorker shutdown by setting the shared Event, forcibly terminating the process by raising an instance of the given exception class if call limit has been exceeded.

Parameters:
  • signal_object – SignalObject to modify to reflect signal-handling state

  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num – POSIX signal ID

  • current_stack_frame – current stack frame

class ska_oso_oet.mptools.QueueProcWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, **kwargs)[source]

QueueProcWorker is a ProcWorker that calls main_func with every item received on its work queue.

__init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, **kwargs)[source]

Create a new QueueProcWorker.

The events and MPQueues passed to this constructor should be created and managed within the scope of a MainContext context manager and shared with other ProcWorkers, so that the communication queues are shared correctly between Python processes and there is a common event that can be set to notify all processes when shutdown is required.

Parameters:
  • name – name of this worker

  • startup_event – event to trigger when startup is complete

  • shutdown_event – event to monitor for shutdown

  • event_q – outbox for posting messages to main context

  • work_q – inbox message queue for work messages

  • args – captures other anonymous arguments

  • kwargs – captures other keyword arguments

main_loop() None[source]

main_loop delivers each event received on the work queue to the main_func template method, while checking for shutdown notifications.

Event delivery will cease when the shutdown event is set or a special sentinel message is sent.

class ska_oso_oet.mptools.SignalObject(shutdown_event: Event)[source]

SignalObject is a struct holding properties and state referenced by mptools signal handlers during their processing.

Setting the SignalObject.shutdown_event will request all MPTools processes cooperatively shut down. SignalObject also records how many times a signal has been received, allowing escalation for processes that do not co-operate with shutdown_event requests.

__init__(shutdown_event: Event)[source]

Create a new SignalObject.

Parameters:

shutdown_event – shutdown Event shared between all MPTools processes

exception ska_oso_oet.mptools.TerminateInterrupt[source]
class ska_oso_oet.mptools.TimerProcWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, *args, logging_config: dict | None = None, **kwargs)[source]

TimerProcWorker is a ProcWorker that calls main_func on a fixed cadence.

ska_oso_oet.mptools.default_signal_handler(signal_object: SignalObject, exception_class, signal_num: int, current_stack_frame: frame | None) None[source]

Custom signal handling function that requests co-operative ProcWorker shutdown by setting the shared Event, forcibly terminating the process by raising an instance of the given exception class if call limit has been exceeded.

Parameters:
  • signal_object – SignalObject to modify to reflect signal-handling state

  • exception_class – Exception type to raise when call limit is exceeded

  • signal_num – POSIX signal ID

  • current_stack_frame – current stack frame

ska_oso_oet.mptools.init_signals(shutdown_event, int_handler, term_handler) SignalObject[source]

Install SIGINT and SIGTERM signal handlers for the running Python process.

This function returns the SignalObject shared with signal handlers that the handlers use to store signal handling state.

Parameters:
  • shutdown_event – Event to set when SIGINT or SIGTERM is received

  • int_handler – SIGINT handler function to install

  • term_handler – SIGTERM handler function to install

Returns:

SignalObject processed by signal handlers

ska_oso_oet.mptools.proc_worker_wrapper(proc_worker_class: Type[ProcWorker], name: str, startup_evt: Event, shutdown_evt: Event, event_q: MPQueue, *args, **kwargs)[source]

This function is called to launch the worker task from within the child process.

Parameters:
  • proc_worker_class – worker class to instantiate

  • name – name for this ProcWorker

  • startup_evt – start-up event to share with worker

  • shutdown_evt – shutdown event to share with worker

  • event_q – event queue to share with worker

  • args – any additional arguments to give to worker constructor

Returns: