# pylint: disable=attribute-defined-outside-init
import logging
import logging.config
import logging.handlers
import multiprocessing
import os
import threading
import traceback
import ska_ser_logging
import uvicorn
from pubsub import pub
from ska_aaa_authhelpers import AuditLogFilter
from ska_ser_logging import configure_logging
from ska_oso_oet import ui
from ska_oso_oet.activity.application import ActivityCommand, ActivityService
from ska_oso_oet.event import topics
from ska_oso_oet.mptools import (
EventMessage,
MainContext,
MPQueue,
Proc,
QueueProcWorker,
default_signal_handler,
init_signals,
)
from ska_oso_oet.procedure.application import (
PrepareProcessCommand,
ProcedureSummary,
ScriptExecutionService,
StartProcessCommand,
StopProcessCommand,
)
from ska_oso_oet.procedure.domain import (
PROC_TERMINATE_TIMEOUT_SECS,
GitScript,
ProcedureInput,
ProcedureState,
ScriptWorker,
)
from ska_oso_oet.procedure.environment import EnvironmentManager
from ska_oso_oet.utils.ui import API_PATH
class EventBusWorker(QueueProcWorker):
    """
    Bridge between the inter-process event bus and in-process pypubsub.

    Messages flow in two directions:

    * Inbound: 'PUBSUB' EventMessages arriving on this worker's work queue
      (sent by other ProcWorkers via the main queue) are rebroadcast locally
      as pypubsub messages by ``main_func``.
    * Outbound: ``republish`` listens to every pypubsub topic. Messages that
      originated in this process are wrapped in a 'PUBSUB' EventMessage and
      put on the event ('main') queue so they can be relayed to the other
      EventBusWorkers.
    """

    def republish(self, topic: pub.Topic = pub.AUTO_TOPIC, **kwargs) -> None:
        """
        Forward a locally published pypubsub message onto the event queue.

        Messages whose ``msg_src`` names a different process came from the
        event bus in the first place; they are not forwarded again, as that
        would bounce them between EventBusWorkers forever.

        :param topic: message topic, filled in automatically by pypubsub
        :param kwargs: message data carried by the pypubsub message
        """
        origin = kwargs.pop("msg_src", self.name)
        if origin is None:
            # A message published without any source counts as local
            origin = self.name

        if origin != self.name:
            return

        # Wrap the pypubsub message in the equivalent mptools EventMessage
        outbound = EventMessage(
            self.name, "PUBSUB", dict(topic=topic.name, kwargs=kwargs)
        )
        self.log(logging.DEBUG, "Queueing internal event: %s", outbound)
        # Note that this is a blocking put: if the queue is full, the call
        # waits until the queue has room to accept the message.
        self.event_q.put(outbound)

    def startup(self) -> None:
        """
        Hook this worker's republish function into pypubsub.

        Any subscriptions inherited from the parent process are dropped
        first (AT2-591) so that only this process's listeners remain.
        """
        super().startup()

        dropped = pub.unsubAll()
        self.log(
            logging.DEBUG,
            "Unsubscribed %s pypubsub subscriptions in Procedure #%s (PID=%s)",
            len(dropped),
            self.name,
            os.getpid(),
        )

        # Every pypubsub message, whatever its topic, is offered to republish
        pub.subscribe(self.republish, pub.ALL_TOPICS)

    def shutdown(self) -> None:
        """
        Unhook the republish function from pypubsub.
        """
        super().shutdown()
        # pypubsub only holds weak references to listeners, so this is not
        # strictly required, but it makes the teardown explicit.
        pub.unsubscribe(self.republish, pub.ALL_TOPICS)

    # The base class accepts work items of any type; this subclass narrows
    # the item to an EventMessage, hence the renamed argument.
    def main_func(self, evt: EventMessage) -> None:  # pylint: disable=arguments-renamed
        """
        Rebroadcast an external pub/sub EventMessage as a local pypubsub message.

        QueueProcWorker calls this once for every item taken from the work
        queue.

        :param evt: pub/sub EventMessage to broadcast locally
        """
        if evt.msg_src == self.name:
            # Our own event came back round; reprocessing it would loop
            self.log(logging.DEBUG, "Discarding internal event: %s", evt)
            return

        self.log(logging.DEBUG, "Republishing external event: %s", evt)
        body = evt.msg
        pub.sendMessage(body["topic"], msg_src=evt.msg_src, **body["kwargs"])

    def send_message(self, topic, **kwargs):
        """
        Publish a pypubsub message tagged with this worker as its source.

        Since ``msg_src`` is this worker's name, ``republish`` (once
        subscribed in ``startup``) will also forward it to the event queue.

        :param topic: pypubsub topic to publish on
        :param kwargs: message data
        """
        pub.sendMessage(topic, msg_src=self.name, **kwargs)
class FastAPIWorker(EventBusWorker):
    """
    FastAPIWorker is an EventBusWorker that runs a FastAPI app.

    By extending EventBusWorker, FastAPI functions can use pypubsub to
    subscribe to and publish messages, and these messages will be put on the
    main queue to be broadcast to other EventBusWorkers.
    """

    # Seconds shutdown() waits for the uvicorn server thread to finish
    SERVER_STOP_TIMEOUT_SECS = 3

    def startup(self) -> None:
        """
        Create the FastAPI app and start uvicorn in a background thread.
        """
        # Call super.startup to enable pypubsub <-> event queue republishing
        super().startup()

        server_log_level = os.getenv("LOG_LEVEL", "ERROR")
        self.app = ui.create_fastapi_app()
        config = uvicorn.Config(
            app=self.app,
            host="0.0.0.0",
            port=5000,
            log_config=None,  # allows config set in main() to be inherited
            log_level=server_log_level.lower(),
        )
        self.server = uvicorn.Server(config=config)

        # override default msg_src with our real process name
        self.app.state.msg_src = self.name
        self.app.state.sse_shutdown_event = threading.Event()

        # Start the server in a thread because Server.run is a blocking call.
        # The thread can't be created in __init__ as it must belong to the
        # child process, not the spawning process.
        self.server_thread = threading.Thread(target=self.server.run)
        self.server_thread.start()
        logging.info("OET listening for REST API requests at %s", API_PATH)

    def shutdown(self) -> None:
        """
        Stop the uvicorn server, then disconnect from pypubsub.

        This tolerates a startup() that failed part-way. Any attribute
        startup() did not get as far as creating is skipped. The base-class
        shutdown always runs, so pypubsub is disconnected even if stopping
        the server fails.
        """
        try:
            self._stop_server()
        finally:
            # Call super.shutdown to disconnect from pypubsub
            super().shutdown()

    def _stop_server(self) -> None:
        """Signal the app and uvicorn to stop and wait briefly for the server thread."""
        app = getattr(self, "app", None)
        if app is not None:
            # Presumably lets long-lived SSE handlers finish - TODO confirm in ui
            sse_shutdown_event = getattr(app.state, "sse_shutdown_event", None)
            if sse_shutdown_event is not None:
                sse_shutdown_event.set()

        server = getattr(self, "server", None)
        if server is not None:
            server.should_exit = True

        server_thread = getattr(self, "server_thread", None)
        if server_thread is not None and server_thread.is_alive():
            server_thread.join(timeout=self.SERVER_STOP_TIMEOUT_SECS)
            if server_thread.is_alive():
                self.log(
                    logging.WARNING,
                    "FastAPI server thread did not stop within %s seconds",
                    self.SERVER_STOP_TIMEOUT_SECS,
                )
class ScriptExecutionServiceWorker(EventBusWorker):
    """
    ScriptExecutionServiceWorker listens for user request messages, calling
    the appropriate ScriptExecutionService function and broadcasting its
    response.

    Actions that occur in the user request domain ('user clicked start
    observation', 'user aborted observation using the CLI', etc.) are
    broadcast as events. ScriptExecutionServiceWorker listens for events on
    these topics and triggers the required action in the script execution
    domain ('start a script', 'abort a script', etc.).

    Currently, the result of the action that occurred in the script execution
    domain (= the return object from the ScriptExecutionService) is broadcast
    to the world by the ScriptExecutionServiceWorker. This could change so
    that the ScriptExecutionService itself sends the message.
    """

    def __init__(
        self,
        name: str,
        startup_event: multiprocessing.Event,
        shutdown_event: multiprocessing.Event,
        event_q: MPQueue,
        work_q: MPQueue,
        mp_context: multiprocessing.context.BaseContext,
        *args,
        **kwargs,
    ):
        super().__init__(
            name, startup_event, shutdown_event, event_q, work_q, *args, **kwargs
        )
        self._mp_context = mp_context

    def prepare(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        cmd: PrepareProcessCommand,
    ):
        """
        Prepare a procedure and publish the result on procedure.lifecycle.created.

        On failure, the exception itself is published as the result.
        """
        self.log(logging.DEBUG, "Prepare procedure request %s: %s", request_id, cmd)
        try:
            summary = self.ses.prepare(cmd)
        # Catch all exceptions so that they can be properly displayed by the rest server
        except Exception as e:  # pylint: disable=broad-except
            self.log(logging.INFO, "Prepare procedure %s failed: %s", request_id, e)
            # TODO create failure topic for failures in procedure domain
            self.send_message(
                topics.procedure.lifecycle.created, request_id=request_id, result=e
            )
        else:
            self.log(
                logging.DEBUG, "Prepare procedure %s result: %s", request_id, summary
            )
            self.send_message(
                topics.procedure.lifecycle.created,
                request_id=request_id,
                result=summary,
            )

    def start(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        cmd: StartProcessCommand,
    ):
        """
        Start a procedure and publish the result on procedure.lifecycle.started.

        On failure, the exception itself is published as the result.
        """
        try:
            self.log(logging.DEBUG, "Start procedure request %s: %s", request_id, cmd)
            summary = self.ses.start(cmd)
        except Exception as e:  # pylint: disable=broad-except
            self.log(logging.INFO, "Start procedure %s failed: %s", request_id, e)
            # TODO create failure topic for failures in procedure domain
            self.send_message(
                topics.procedure.lifecycle.started, request_id=request_id, result=e
            )
        else:
            self.log(
                logging.DEBUG, "Start procedure %s result: %s", request_id, summary
            )
            self.send_message(
                topics.procedure.lifecycle.started,
                request_id=request_id,
                result=summary,
            )

    def list(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        pids=None,
    ):
        """
        Summarise procedures and publish them on procedure.pool.list.

        An unknown PID (reported by the service as ValueError) yields an
        empty list.
        """
        self.log(logging.DEBUG, "List procedures for request %s", request_id)
        try:
            summaries = self.ses.summarise(pids)
        except ValueError:
            # ValueError raised when PID not found.
            summaries = []
        self.log(logging.DEBUG, "List result: %s", summaries)
        self.send_message(
            topics.procedure.pool.list, request_id=request_id, result=summaries
        )

    def stop(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        cmd: StopProcessCommand,
    ):
        """
        Stop a procedure and publish the result on procedure.lifecycle.stopped.

        On failure, the exception itself is published as the result.
        """
        self.log(logging.DEBUG, "Stop procedure request %s: %s", request_id, cmd)
        try:
            summary = self.ses.stop(cmd)
        # Catch all exceptions, as prepare() and start() do, so the failure is
        # reported to the requester instead of propagating out of this
        # pypubsub handler. For example, FileNotFoundError is raised when the
        # abort.py script is not found.
        except Exception as e:  # pylint: disable=broad-except
            self.log(logging.INFO, "Stop procedure %s failed: %s", request_id, e)
            # TODO create failure topic for failures in procedure domain
            # (or refactor abortion script creation so that FileNotFound
            # is caught only once in prepare)
            self.send_message(
                topics.procedure.lifecycle.stopped, request_id=request_id, result=e
            )
        else:
            self.log(logging.DEBUG, "Stop result: %s", summary)
            self.send_message(
                topics.procedure.lifecycle.stopped,
                request_id=request_id,
                result=summary,
            )

    def get_override_state(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
    ):
        """
        Publish the service's script_context on operator.override.state.
        """
        self.log(logging.DEBUG, "Get override state request %s", request_id)
        self.send_message(
            topics.operator.override.state,
            request_id=request_id,
            result=self.ses.script_context,
        )

    def startup(self) -> None:
        """
        Create the ScriptExecutionService and subscribe its handlers to pypubsub.
        """
        super().startup()
        # self.ses can't be created in __init__ as we want the service to belong to
        # the child process, not the spawning process
        self.ses = ScriptExecutionService()

        # (listener, topic) pairs, kept so that shutdown() can unsubscribe
        # each listener from the same topic it was subscribed to.
        self._subscriptions = [
            # wire up topics to the corresponding SES methods
            (self.prepare, topics.request.procedure.create),
            (self.start, topics.request.procedure.start),
            (self.list, topics.request.procedure.list),
            (self.stop, topics.request.procedure.stop),
            # wire up wait_for_qa_ready override handlers
            (
                self.ses.handle_wait_for_qa_ready_enable,
                topics.operator.wait_for_qa_ready.enable,
            ),
            (
                self.ses.handle_wait_for_qa_ready_disable,
                topics.operator.wait_for_qa_ready.disable,
            ),
            (self.get_override_state, topics.request.operator.override.state),
        ]
        for listener, topic in self._subscriptions:
            pub.subscribe(listener, topic)

    def shutdown(self) -> None:
        """
        Unsubscribe this worker's handlers, then disconnect from pypubsub.

        Each listener is removed from the topic it was subscribed to. In
        pypubsub, unsubscribing from pub.ALL_TOPICS only detaches listeners
        registered on the root topic, so it would not remove these
        topic-specific subscriptions. If startup() never got as far as
        subscribing, there is nothing to remove.
        """
        # unsubscribing isn't technically required as pypubsub uses weak
        # references to clients, but for completeness we do it anyway.
        for listener, topic in getattr(self, "_subscriptions", ()):
            pub.unsubscribe(listener, topic)
        self._subscriptions = []
        super().shutdown()
class ActivityServiceWorker(EventBusWorker):
    """
    Worker that routes activity requests to an ActivityService.

    It answers activity list/run requests from users. It also follows
    procedure lifecycle events so that procedures started on behalf of an
    activity can be linked back to it and driven to their main function.
    """

    def __init__(
        self,
        name: str,
        startup_event: multiprocessing.Event,
        shutdown_event: multiprocessing.Event,
        event_q: MPQueue,
        work_q: MPQueue,
        mp_context: multiprocessing.context.BaseContext,
        *args,
        **kwargs,
    ):
        super().__init__(
            name, startup_event, shutdown_event, event_q, work_q, *args, **kwargs
        )
        self._mp_context = mp_context

    def _topic_handlers(self):
        """Return the (listener, topic) pairs this worker attaches to pypubsub."""
        return (
            (self.list, topics.request.activity.list),
            (self.prepare, topics.request.activity.run),
            (self.on_procedure_created, topics.procedure.lifecycle.created),
            (self.on_procedure_ready, topics.procedure.lifecycle.ready),
        )

    def startup(self) -> None:
        """
        Build the ActivityService and subscribe its handlers to pypubsub.
        """
        super().startup()
        # Built here rather than in __init__ so that the service is owned by
        # the child process, not by the process that spawned it.
        self.activity_service = ActivityService()
        for handler, topic in self._topic_handlers():
            pub.subscribe(handler, topic)

    def shutdown(self) -> None:
        """
        Detach this worker's handlers, then disconnect from pypubsub.
        """
        for handler, topic in self._topic_handlers():
            pub.unsubscribe(handler, topic)
        # TODO ActivityService doesn't have same shutdown method SES does?
        super().shutdown()

    def list(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: int,
        activity_ids=None,
    ):
        """
        Publish activity summaries on activity.pool.list.

        An unknown activity ID (reported by the service as ValueError) yields
        an empty list.
        """
        self.log(logging.DEBUG, "List activities for request %s", request_id)
        try:
            found = self.activity_service.summarise(activity_ids)
        except ValueError:
            found = []
        self.log(logging.DEBUG, "Activity List result: %s", found)
        self.send_message(
            topics.activity.pool.list, request_id=request_id, result=found
        )

    def prepare(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: int,
        cmd: ActivityCommand,
    ):
        """
        Ask the ActivityService to prepare the requested activity run.

        Nothing is published on success here. If preparation fails, the
        exception is published as the result on activity.lifecycle.running.
        """
        try:
            self.log(
                logging.DEBUG, "Preparing activity for request %s: %s", request_id, cmd
            )
            self.activity_service.prepare_run_activity(cmd, request_id)
        except Exception as err:  # pylint: disable=broad-except
            self.log(
                logging.ERROR,
                "Preparing activity for request %s failed: %s",
                request_id,
                err,
            )
            # TODO create failure topic for failures in activity domain
            self.send_message(
                topics.activity.lifecycle.running, request_id=request_id, result=err
            )

    def on_procedure_created(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: int,
        result: ProcedureSummary,
    ):
        """
        Link a newly created Procedure to the Activity that requested it.

        The "created" event fires at the LOADING->IDLE transition. At that
        point the script subprocess exists and the user module has been
        imported, but no user functions (init, main) have run. Recording the
        procedure ID on the Activity lets later lifecycle events (e.g. READY)
        be matched back to it.

        If the service returns a summary, it is published on
        activity.lifecycle.running. If linking fails, the exception is
        published instead.
        """
        try:
            self.log(
                logging.DEBUG,
                "Linking procedure to activity for request %s: %s",
                request_id,
                result,
            )
            summary = self.activity_service.link_procedure_to_activity(
                result, request_id
            )
        except Exception as err:  # pylint: disable=broad-except
            self.log(
                logging.ERROR,
                "Linking procedure to activity for request %s failed: %s",
                request_id,
                err,
            )
            # TODO create failure topic for failures in activity domain
            self.send_message(
                topics.activity.lifecycle.running, request_id=request_id, result=err
            )
            return

        if summary is None:
            return

        self.log(logging.DEBUG, "Activity request %s result: %s", request_id, summary)
        self.send_message(
            topics.activity.lifecycle.running,
            request_id=request_id,
            result=summary,
        )

    def on_procedure_ready(
        self,
        msg_src,
        request_id,  # pylint: disable=unused-argument
        result: ProcedureSummary,
    ):
        """
        Dispatch RUN(main) for the Activity linked to a procedure that is ready.

        The "ready" event fires at the INITIALISING->READY transition, after
        the user's init() has returned. main() can therefore be called
        without overlapping a running init(). Whether a given activity
        actually gets main dispatched (e.g. prepare-only activities, or main
        already sent) is decided by ActivityService.dispatch_main. Failures
        are logged, not published.
        """
        try:
            self.log(
                logging.DEBUG,
                "Starting activity for procedure %s: %s",
                msg_src,
                result,
            )
            self.activity_service.dispatch_main(result)
        except Exception as err:  # pylint: disable=broad-except
            self.log(
                logging.ERROR,
                "Starting activity for procedure %s failed: %s",
                msg_src,
                err,
            )
[docs]
class ScriptWorkerPool:
"""
Manages the lifecycle of ScriptWorker processes.
This pool owns the queues and process references for all running
ScriptWorkers, handling creation, messaging, and cleanup.
"""
[docs]
def __init__(
self,
main_ctx: MainContext,
env_manager: EnvironmentManager,
):
self.main_ctx = main_ctx
self.env_manager = env_manager
self._queues: dict[int, MPQueue] = {}
self._procs: dict[int, Proc] = {}
[docs]
def create_worker(
self,
pid: int,
script: GitScript,
init_args: ProcedureInput,
) -> None:
"""
Create a new ScriptWorker process.
:param pid: Process ID for the worker
:param script: Script to execute
:param init_args: Initialization arguments for the script
"""
self.main_ctx.log(logging.DEBUG, f"Creating ScriptWorker #{pid} for {script}")
# Create environment if needed (done in main process for Event inheritance)
environment = None
if isinstance(script, GitScript) and script.create_env:
environment = self.env_manager.create_env(script.git_args)
msg_src = "main_loop"
try:
# Create work queue for this script
work_q = self.main_ctx.MPQueue()
# Prime the work queue with ENV, LOAD, RUN messages
# ENV message (if environment is needed)
if environment is not None:
env_msg = EventMessage(msg_src=msg_src, msg_type="ENV", msg=script)
work_q.safe_put(env_msg)
# LOAD message
load_msg = EventMessage(msg_src=msg_src, msg_type="LOAD", msg=script)
work_q.safe_put(load_msg)
# RUN message for init function
init_msg = EventMessage(msg_src=msg_src, msg_type="RUN", msg=("init", None))
work_q.safe_put(init_msg)
# Create the ScriptWorker process
proc = self.main_ctx.Proc(
str(pid),
ScriptWorker,
work_q,
*init_args.args,
environment=environment,
**init_args.kwargs,
)
# Store references
self._queues[pid] = work_q
self._procs[pid] = proc
# Send success response
self._queue_pubsub(
msg_src=msg_src,
topic="procedure.management.created",
pid=pid,
success=True,
error=None,
)
except Exception: # pylint: disable=broad-exception-caught
# Send failure response for any error during ScriptWorker creation
error_msg = traceback.format_exc()
self.main_ctx.log(
logging.ERROR, f"Failed to create ScriptWorker #{pid}: {error_msg}"
)
self._queue_pubsub(
msg_src=msg_src,
topic="procedure.management.created",
pid=pid,
success=False,
error=error_msg,
)
[docs]
def send_run(self, pid: int, fn_name: str, run_args: ProcedureInput) -> None:
"""
Send a RUN message to a ScriptWorker.
:param pid: Process ID of the worker
:param fn_name: Function name to run
:param run_args: Arguments for the function
"""
self.main_ctx.log(
logging.DEBUG, f"Sending 'run {fn_name}' message to PID {pid}"
)
if pid not in self._queues:
self.main_ctx.log(logging.ERROR, f"PID {pid} not found in script_queues")
return
work_q = self._queues[pid]
msg = EventMessage(
msg_src="main_loop",
msg_type="RUN",
msg=(fn_name, run_args),
)
work_q.safe_put(msg)
[docs]
def broadcast_to_workers(self, msg: EventMessage) -> None:
"""
Forward a message to all active ScriptWorker work queues.
:param msg: EventMessage to forward
"""
for work_q in self._queues.values():
work_q.safe_put(msg)
[docs]
def stop_worker(self, pid: int) -> None:
"""
Terminate a ScriptWorker and publish state change.
:param pid: Process ID of the worker to stop
"""
self.main_ctx.log(logging.DEBUG, f"Stopping ScriptWorker #{pid}")
if pid not in self._procs:
self.main_ctx.log(logging.ERROR, f"PID {pid} not found in script_procs")
return
proc = self._procs[pid]
if proc.proc.is_alive():
terminated = proc.terminate(
max_retries=3, timeout=PROC_TERMINATE_TIMEOUT_SECS
)
final_state = (
ProcedureState.STOPPED if terminated else ProcedureState.UNKNOWN
)
# Publish state change via event queue
# msg_src must be the PID for ScriptExecutionService._on_statechange
self._queue_pubsub(
msg_src=str(pid),
topic="procedure.lifecycle.statechange",
new_state=final_state,
)
# Clean up zombie processes
multiprocessing.active_children()
# Only cleanup if termination succeeded, otherwise we clean up
# resources for an unresponsive process that may still be running.
# The orphaned process should be cleaned up during normal shutdown.
if terminated:
self.cleanup_worker(pid)
else:
# Process already dead, safe to cleanup
self.cleanup_worker(pid)
[docs]
def cleanup_worker(self, pid: int) -> None:
"""
Remove worker tracking (called on SHUTDOWN/FATAL).
:param pid: Process ID of the worker to clean up
"""
if queue := self._queues.pop(pid, None):
self.main_ctx.stop_queues([queue])
if proc := self._procs.pop(pid, None):
self.main_ctx.remove_proc(proc)
[docs]
def handle_fatal(self, msg_src: str, error_msg: str) -> None:
"""
Handle a ScriptWorker crash - publish FAILED state and stacktrace.
:param msg_src: Source of the fatal event (PID as string)
:param error_msg: Error message/stacktrace from the crashed worker
"""
self.main_ctx.log(logging.INFO, f"Fatal Event received: {error_msg}")
try:
pid = int(msg_src)
except (ValueError, TypeError):
return # Not a numeric PID, ignore
# Publish FAILED state change via event queue
# msg_src must be the PID for ScriptExecutionService._on_statechange
self._queue_pubsub(
msg_src=msg_src,
topic="procedure.lifecycle.statechange",
new_state=ProcedureState.FAILED,
)
# Publish stacktrace via event queue
# msg_src must be the PID for ScriptExecutionService._on_stacktrace
self._queue_pubsub(
msg_src=msg_src,
topic="procedure.lifecycle.stacktrace",
stacktrace=error_msg,
)
# Clean up ScriptWorker resources
self.cleanup_worker(pid)
[docs]
def handle_management_topic(self, topic: str, kwargs: dict):
"""
Handle a procedure.management.* topic.
:param topic: The management topic to handle
:param kwargs: Topic arguments
"""
try:
match topic:
case "procedure.management.create":
self.create_worker(
kwargs["pid"], kwargs["script"], kwargs["init_args"]
)
case "procedure.management.run":
self.send_run(kwargs["pid"], kwargs["fn_name"], kwargs["run_args"])
case "procedure.management.stop":
self.stop_worker(kwargs["pid"])
case _:
self.main_ctx.log(
logging.WARNING, f"Unknown management topic: {topic}"
)
except KeyError as e:
self.main_ctx.log(logging.ERROR, f"Missing required kwarg for {topic}: {e}")
[docs]
def handle_shutdown(self, msg_src: str) -> None:
"""
Handle SHUTDOWN event - clean up worker resources.
:param msg_src: Source of the shutdown event (PID as string)
"""
self.main_ctx.log(logging.INFO, f"Process complete (main loop): {msg_src}")
try:
pid = int(msg_src)
self.cleanup_worker(pid)
except (ValueError, TypeError):
pass # Not a numeric PID, ignore
def _queue_pubsub(self, msg_src: str, topic: str, **kwargs) -> None:
"""
Queue a PUBSUB message to be distributed via _handle_pubsub_event.
:param msg_src: message source, identifying where the message originates
:param topic: PUBSUB topic
:param kwargs: Topic arguments
"""
msg = EventMessage(
msg_src=msg_src,
msg_type="PUBSUB",
msg=dict(topic=topic, kwargs=kwargs),
)
self.main_ctx.event_queue.put(msg)
def main(
    mp_ctx: multiprocessing.context.BaseContext,
    logging_config_override: dict | None = None,
):
    """
    Create the OET components and start an event loop that dispatches messages
    between them.

    :param mp_ctx: multiprocessing context used to create the child processes
        (the ``__main__`` entry point uses a 'spawn' context)
    :param logging_config_override: optional logging config dict passed to
        ``MainContext.init_logging``; skipped when None or empty
    """
    # All queues and processes are created via a MainContext so that they are
    # shared correctly and have consistent lifecycle management
    with MainContext(mp_ctx) as main_ctx:
        if logging_config_override:
            main_ctx.init_logging(logging_config_override)

        # wire SIGINT and SIGTERM signal handlers to the shutdown_event Event
        # monitored by all processes, so that the processes know when
        # application termination has been requested.
        init_signals(
            main_ctx.shutdown_event, default_signal_handler, default_signal_handler
        )

        # create our message queues:
        # script_executor_q is the message queue for messages from the ScriptExecutionServiceWorker
        # activity_q is the message queue for messages from the ActivityServiceWorker
        # fastapi_q is the queue for messages intended for the FastAPIWorker process
        script_executor_q = main_ctx.MPQueue()
        activity_q = main_ctx.MPQueue()
        fastapi_q = main_ctx.MPQueue()

        # event bus messages received on the event_queue (the main queue that
        # child processes push to and which the main loop listens to)
        # will be pushed onto the queues in this list
        event_bus_queues = [script_executor_q, activity_q, fastapi_q]

        # Create environment manager for script execution environments
        env_manager = EnvironmentManager(mp_ctx)

        # create the OET components, which will run in child Python processes
        # and monitor the message queues here for event bus messages
        main_ctx.Proc(
            "SESWorker", ScriptExecutionServiceWorker, script_executor_q, mp_ctx
        )
        main_ctx.Proc(
            "ActivityServiceWorker", ActivityServiceWorker, activity_q, mp_ctx
        )
        main_ctx.Proc("FastAPIWorker", FastAPIWorker, fastapi_q)

        # with all workers and queues set up, start processing messages
        main_loop(main_ctx, event_bus_queues, env_manager)
[docs]
def main_loop(
main_ctx: MainContext,
event_bus_queues: list[MPQueue],
env_manager: EnvironmentManager,
):
"""
Main message parsing and routing loop, extracted from main() to increase
testability.
:param main_ctx: Main context for queue and process management
:param event_bus_queues: Queues for broadcasting to EventBusWorkers
:param env_manager: Environment manager for script execution environments
"""
worker_pool = ScriptWorkerPool(main_ctx, env_manager)
while not main_ctx.shutdown_event.is_set():
event = main_ctx.event_queue.safe_get()
if not event:
continue
match event.msg_type:
case "END":
main_ctx.log(logging.INFO, f"Shutdown Event received: {event.msg}")
break
case "PUBSUB":
main_ctx.log(logging.DEBUG, f"Handling PUBSUB message: {event}")
_handle_pubsub_event(event, worker_pool, event_bus_queues)
case "SHUTDOWN":
main_ctx.log(
logging.INFO, f"Process complete (main loop): {event.msg_src}"
)
_handle_shutdown_event(event, worker_pool)
case "FATAL":
main_ctx.log(logging.INFO, f"Fatal Event received: {event.msg}")
_handle_fatal_event(event, worker_pool)
case _:
main_ctx.log(logging.ERROR, f"Unhandled Event: {event}")
def _handle_pubsub_event(
    event: EventMessage,
    worker_pool: ScriptWorkerPool,
    event_bus_queues: list[MPQueue],
) -> None:
    """
    Route a PUBSUB event.

    procedure.management.create/run/stop requests are consumed by the
    ScriptWorkerPool and go no further. Every other topic is copied to each
    EventBusWorker queue and to every active ScriptWorker.

    :param event: The PUBSUB event message
    :param worker_pool: ScriptWorker pool instance
    :param event_bus_queues: Queues for broadcasting to EventBusWorkers
    """
    payload = event.msg
    topic = payload["topic"]

    if topic in {
        "procedure.management.create",
        "procedure.management.run",
        "procedure.management.stop",
    }:
        worker_pool.handle_management_topic(topic, payload["kwargs"])
        return

    for bus_queue in event_bus_queues:
        bus_queue.put(event)
    worker_pool.broadcast_to_workers(event)
def _handle_shutdown_event(
    event: EventMessage,
    worker_pool: ScriptWorkerPool,
) -> None:
    """
    Pass a SHUTDOWN event's source to the pool so the worker can be released.

    :param event: The SHUTDOWN event message
    :param worker_pool: ScriptWorker pool instance
    """
    source = event.msg_src
    worker_pool.handle_shutdown(source)
def _handle_fatal_event(
    event: EventMessage,
    worker_pool: ScriptWorkerPool,
) -> None:
    """
    Pass a FATAL event to the pool, which reports the failure and cleans up.

    :param event: The FATAL event message (its msg carries the error text)
    :param worker_pool: ScriptWorker pool instance
    """
    source, error_text = event.msg_src, event.msg
    worker_pool.handle_fatal(source, error_text)
if __name__ == "__main__":
    import copy

    log_level = os.getenv("LOG_LEVEL", "ERROR")
    configure_logging(level=log_level, tags_filter=AuditLogFilter)

    # BTN-2667: spawn mode is now a hard requirement, otherwise parent dependencies
    # are not overridden in the child processes
    mp = multiprocessing.get_context("spawn")

    # The only way to access the SKA logging config is via a private property. :(
    # Deep copy it: a shallow .copy() shares the nested "root" dict with
    # ska_ser_logging's module-level config, so setting the level below would
    # silently modify the library's own configuration.
    # pylint: disable=protected-access
    logging_config = copy.deepcopy(ska_ser_logging.configuration._LOGGING_CONFIG)
    logging_config["root"]["level"] = log_level
    main(mp, logging_config)