Source code for ska_oso_oet.main

# pylint: disable=attribute-defined-outside-init
import logging
import logging.config
import logging.handlers
import multiprocessing
import os
import threading
import traceback

import ska_ser_logging
import uvicorn
from pubsub import pub
from ska_aaa_authhelpers import AuditLogFilter
from ska_ser_logging import configure_logging

from ska_oso_oet import ui
from ska_oso_oet.activity.application import ActivityCommand, ActivityService
from ska_oso_oet.event import topics
from ska_oso_oet.mptools import (
    EventMessage,
    MainContext,
    MPQueue,
    Proc,
    QueueProcWorker,
    default_signal_handler,
    init_signals,
)
from ska_oso_oet.procedure.application import (
    PrepareProcessCommand,
    ProcedureSummary,
    ScriptExecutionService,
    StartProcessCommand,
    StopProcessCommand,
)
from ska_oso_oet.procedure.domain import (
    PROC_TERMINATE_TIMEOUT_SECS,
    GitScript,
    ProcedureInput,
    ProcedureState,
    ScriptWorker,
)
from ska_oso_oet.procedure.environment import EnvironmentManager
from ska_oso_oet.utils.ui import API_PATH


class EventBusWorker(QueueProcWorker):
    """
    Bridge between the inter-process event bus and in-process pypubsub.

    The QueueProcWorker 'work queue' serves as the inbox for pub/sub
    EventMessages sent by other ProcWorkers: every EventMessage received on
    that queue is rebroadcast locally as a pypubsub message. In the opposite
    direction, the worker listens to all pypubsub messages broadcast locally,
    wraps them in EventMessages and puts them on the 'main' event queue for
    transmission to other EventBusWorkers.
    """

    def republish(self, topic: pub.Topic = pub.AUTO_TOPIC, **kwargs) -> None:
        """
        Forward a locally published pypubsub event to the inter-process bus.

        :param topic: message topic, set automatically by pypubsub
        :param kwargs: any metadata associated with pypubsub message
        """
        # A missing (or None) message source marks a fresh event that was
        # published on pypubsub inside this process.
        origin = kwargs.pop("msg_src", None)
        if origin is None:
            origin = self.name

        # Events that originated in another process must not be sent out
        # again, otherwise they would circulate forever.
        if origin != self.name:
            return

        # Wrap the pypubsub event in the equivalent mptools EventMessage.
        outgoing = EventMessage(
            self.name, "PUBSUB", {"topic": topic.name, "kwargs": kwargs}
        )
        # Note that this is a blocking put: if the queue is full, the call
        # waits until the queue has room to accept the message.
        self.log(logging.DEBUG, "Queueing internal event: %s", outgoing)
        self.event_q.put(outgoing)

    def startup(self) -> None:
        """
        Connect the republishing function to pypubsub.
        """
        super().startup()

        # AT2-591. Drop any subscriptions inherited from the parent process
        # during fork before registering our own.
        dropped_count = len(pub.unsubAll())
        self.log(
            logging.DEBUG,
            "Unsubscribed %s pypubsub subscriptions in Procedure #%s (PID=%s)",
            dropped_count,
            self.name,
            os.getpid(),
        )

        # From here on, republish is called for every pypubsub message.
        pub.subscribe(self.republish, pub.ALL_TOPICS)

    def shutdown(self) -> None:
        """
        Disconnect the republishing function from pypubsub.
        """
        super().shutdown()

        # Strictly speaking this is optional: pypubsub holds weak references
        # to listeners and drops listeners that have been deleted.
        pub.unsubscribe(self.republish, pub.ALL_TOPICS)

    # Pylint is relaxed to allow renaming 'item' to 'evt'. The base class
    # handles items of any type; this subclass narrows it to EventMessage.
    def main_func(  # pylint: disable=arguments-renamed
        self, evt: EventMessage
    ) -> None:
        """
        Rebroadcast an external pub/sub message as a local pypubsub message.

        QueueProcWorker calls main_func for every item taken from the work
        queue; here that item is the external pub/sub EventMessage.

        :param evt: pub/sub EventMessage to broadcast locally
        """
        # Events that originated from this worker are dropped to avoid an
        # infinite loop.
        if evt.msg_src == self.name:
            self.log(logging.DEBUG, "Discarding internal event: %s", evt)
            return

        self.log(logging.DEBUG, "Republishing external event: %s", evt)
        body = evt.msg
        pub.sendMessage(body["topic"], msg_src=evt.msg_src, **body["kwargs"])

    def send_message(self, topic, **kwargs):
        """
        Publish a pypubsub message stamped with this worker as its source.

        :param topic: pypubsub topic to publish on
        :param kwargs: message arguments passed through to pypubsub
        """
        pub.sendMessage(topic, msg_src=self.name, **kwargs)
class FastAPIWorker(EventBusWorker):
    """
    EventBusWorker that hosts the FastAPI application.

    Because it extends EventBusWorker, FastAPI functions can subscribe to and
    publish pypubsub messages, and those messages are put on the main queue
    to be broadcast to other EventBusWorkers.
    """

    def startup(self) -> None:
        """
        Create the FastAPI app and start serving it from a background thread.
        """
        # Enables pypubsub <-> event queue republishing.
        super().startup()

        uvicorn_level = os.getenv("LOG_LEVEL", "ERROR").lower()

        fastapi_app = ui.create_fastapi_app()
        self.app = fastapi_app
        # Replace the default msg_src with our real process name.
        fastapi_app.state.msg_src = self.name
        fastapi_app.state.sse_shutdown_event = threading.Event()

        self.server = uvicorn.Server(
            config=uvicorn.Config(
                app=fastapi_app,
                host="0.0.0.0",
                port=5000,
                log_config=None,  # allows config set in main() to be inherited
                log_level=uvicorn_level,
            )
        )

        # The server's run call blocks, so it gets its own thread. The thread
        # cannot be created in __init__ because it must belong to the child
        # process, not the spawning process.
        runner = threading.Thread(target=self.server.run)
        self.server_thread = runner
        runner.start()
        logging.info("OET listening for REST API requests at %s", API_PATH)

    def shutdown(self) -> None:
        """
        Signal the server to exit, wait briefly for its thread, then
        disconnect from pypubsub.
        """
        self.app.state.sse_shutdown_event.set()
        self.server.should_exit = True
        # Bounded wait: shutdown continues even if the thread is still alive.
        self.server_thread.join(timeout=3)

        # Disconnects the republishing function from pypubsub.
        super().shutdown()
class ScriptExecutionServiceWorker(EventBusWorker):
    """
    ScriptExecutionService listens for user request messages, calling the
    appropriate ScriptExecutionService function and broadcasting its response.

    Actions that occur in the user request domain ('user clicked start
    observation', 'user aborted observation using the CLI', etc.) are
    broadcast as events. ScriptExecutionServiceWorker listens for events on
    these topics and triggers the required action in the script execution
    domain ('start a script', 'abort a script', etc.).

    Currently, the result of the action that occurred in the script execution
    domain (=the return object from the ScriptExecutionService) is broadcast
    to the world by the ScriptExecutionServiceWorker. This could change so
    that the ScriptExecutionService itself sends the message.
    """

    def __init__(
        self,
        name: str,
        startup_event: multiprocessing.Event,
        shutdown_event: multiprocessing.Event,
        event_q: MPQueue,
        work_q: MPQueue,
        mp_context: multiprocessing.context.BaseContext,
        *args,
        **kwargs,
    ):
        """
        Create the worker and remember the multiprocessing context.

        All arguments other than mp_context are passed through to the
        EventBusWorker constructor.

        :param mp_context: multiprocessing context, stored on the instance
        """
        super().__init__(
            name, startup_event, shutdown_event, event_q, work_q, *args, **kwargs
        )
        self._mp_context = mp_context

    def prepare(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        cmd: PrepareProcessCommand,
    ):
        """
        Handle a 'create procedure' request and publish the outcome.

        The result (a summary on success, the exception on failure) is
        published on topics.procedure.lifecycle.created.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request, echoed in the reply
        :param cmd: prepare command passed to ScriptExecutionService.prepare
        """
        self.log(logging.DEBUG, "Prepare procedure request %s: %s", request_id, cmd)
        try:
            summary = self.ses.prepare(cmd)
        # Catch all exceptions so that they can be properly displayed by the
        # rest server
        except Exception as e:  # pylint: disable=broad-except
            self.log(logging.INFO, "Prepare procedure %s failed: %s", request_id, e)

            # TODO create failure topic for failures in procedure domain
            self.send_message(
                topics.procedure.lifecycle.created, request_id=request_id, result=e
            )
        else:
            self.log(
                logging.DEBUG, "Prepare procedure %s result: %s", request_id, summary
            )
            self.send_message(
                topics.procedure.lifecycle.created,
                request_id=request_id,
                result=summary,
            )

    def start(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        cmd: StartProcessCommand,
    ):
        """
        Handle a 'start procedure' request and publish the outcome.

        The result (a summary on success, the exception on failure) is
        published on topics.procedure.lifecycle.started.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request, echoed in the reply
        :param cmd: start command passed to ScriptExecutionService.start
        """
        try:
            self.log(logging.DEBUG, "Start procedure request %s: %s", request_id, cmd)
            summary = self.ses.start(cmd)
        except Exception as e:  # pylint: disable=broad-except
            self.log(logging.INFO, "Start procedure %s failed: %s", request_id, e)

            # TODO create failure topic for failures in procedure domain
            self.send_message(
                topics.procedure.lifecycle.started, request_id=request_id, result=e
            )
        else:
            self.log(
                logging.DEBUG, "Start procedure %s result: %s", request_id, summary
            )
            self.send_message(
                topics.procedure.lifecycle.started,
                request_id=request_id,
                result=summary,
            )

    def list(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        pids=None,
    ):
        """
        Handle a 'list procedures' request and publish the summaries.

        The summaries are published on topics.procedure.pool.list. An empty
        list is published if the service raises ValueError.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request, echoed in the reply
        :param pids: procedure IDs passed to ScriptExecutionService.summarise
        """
        self.log(logging.DEBUG, "List procedures for request %s", request_id)
        try:
            summaries = self.ses.summarise(pids)
        except ValueError:
            # ValueError raised when PID not found.
            summaries = []

        self.log(logging.DEBUG, "List result: %s", summaries)
        self.send_message(
            topics.procedure.pool.list, request_id=request_id, result=summaries
        )

    def stop(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
        cmd: StopProcessCommand,
    ):
        """
        Handle a 'stop procedure' request and publish the outcome.

        The result is published on topics.procedure.lifecycle.stopped. Only
        FileNotFoundError is converted to a failure reply; any other exception
        raised by the service propagates to the caller.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request, echoed in the reply
        :param cmd: stop command passed to ScriptExecutionService.stop
        """
        self.log(logging.DEBUG, "Stop procedure request %s: %s", request_id, cmd)
        try:
            summary = self.ses.stop(cmd)
        except FileNotFoundError as e:
            # FileNotFoundError raised when abort.py script not found
            self.log(logging.INFO, "Stop procedure %s failed: %s", request_id, e)

            # TODO create failure topic for failures in procedure domain
            #  (or refactor abortion script creation so that FileNotFound
            #  is caught only once in prepare)
            self.send_message(
                topics.procedure.lifecycle.stopped, request_id=request_id, result=e
            )
        else:
            self.log(logging.DEBUG, "Stop result: %s", summary)
            self.send_message(
                topics.procedure.lifecycle.stopped,
                request_id=request_id,
                result=summary,
            )

    def get_override_state(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: str,
    ):
        """
        Publish the service's script_context on topics.operator.override.state.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request, echoed in the reply
        """
        self.log(logging.DEBUG, "Get override state request %s", request_id)
        self.send_message(
            topics.operator.override.state,
            request_id=request_id,
            result=self.ses.script_context,
        )

    def startup(self) -> None:
        """
        Create the ScriptExecutionService and subscribe the request handlers.
        """
        super().startup()

        # self.ses can't be created in __init__ as we want the service to
        # belong to the child process, not the spawning process
        self.ses = ScriptExecutionService()

        # wire up topics to the corresponding SES methods
        pub.subscribe(self.prepare, topics.request.procedure.create)
        pub.subscribe(self.start, topics.request.procedure.start)
        pub.subscribe(self.list, topics.request.procedure.list)
        pub.subscribe(self.stop, topics.request.procedure.stop)

        # wire up wait_for_qa_ready override handlers
        pub.subscribe(
            self.ses.handle_wait_for_qa_ready_enable,
            topics.operator.wait_for_qa_ready.enable,
        )
        pub.subscribe(
            self.ses.handle_wait_for_qa_ready_disable,
            topics.operator.wait_for_qa_ready.disable,
        )
        pub.subscribe(
            self.get_override_state,
            topics.request.operator.override.state,
        )

    def shutdown(self) -> None:
        """
        Unsubscribe every handler registered in startup, then shut down the
        underlying EventBusWorker.
        """
        # Unsubscribing isn't technically required as pypubsub uses weak
        # references to clients, but for completeness we do it anyway.
        # Each handler is removed from the exact topic it was subscribed to
        # in startup(); unsubscribing from pub.ALL_TOPICS would leave these
        # topic-specific subscriptions in place.
        pub.unsubscribe(self.prepare, topics.request.procedure.create)
        pub.unsubscribe(self.start, topics.request.procedure.start)
        pub.unsubscribe(self.list, topics.request.procedure.list)
        pub.unsubscribe(self.stop, topics.request.procedure.stop)
        pub.unsubscribe(
            self.ses.handle_wait_for_qa_ready_enable,
            topics.operator.wait_for_qa_ready.enable,
        )
        pub.unsubscribe(
            self.ses.handle_wait_for_qa_ready_disable,
            topics.operator.wait_for_qa_ready.disable,
        )
        pub.unsubscribe(
            self.get_override_state,
            topics.request.operator.override.state,
        )

        super().shutdown()
class ActivityServiceWorker(EventBusWorker):
    """
    Worker that answers user activity requests.

    It listens for user request messages, calls the matching ActivityService
    function and broadcasts the response.
    """

    def __init__(
        self,
        name: str,
        startup_event: multiprocessing.Event,
        shutdown_event: multiprocessing.Event,
        event_q: MPQueue,
        work_q: MPQueue,
        mp_context: multiprocessing.context.BaseContext,
        *args,
        **kwargs,
    ):
        """
        Create the worker and keep a reference to the multiprocessing context.

        Everything except mp_context is forwarded to EventBusWorker.
        """
        self._mp_context = mp_context
        super().__init__(
            name, startup_event, shutdown_event, event_q, work_q, *args, **kwargs
        )

    def _topic_handlers(self):
        """Return the (handler, topic) pairs wired up by this worker."""
        return (
            (self.list, topics.request.activity.list),
            (self.prepare, topics.request.activity.run),
            (self.on_procedure_created, topics.procedure.lifecycle.created),
            (self.on_procedure_ready, topics.procedure.lifecycle.ready),
        )

    def startup(self) -> None:
        """
        Create the ActivityService and subscribe its handlers to their topics.
        """
        super().startup()

        # The service is created here rather than in __init__ so that it
        # belongs to the child process, not the spawning process.
        self.activity_service = ActivityService()

        for handler, topic in self._topic_handlers():
            pub.subscribe(handler, topic)

    def shutdown(self) -> None:
        """
        Unsubscribe the handlers registered in startup, then shut down.
        """
        for handler, topic in self._topic_handlers():
            pub.unsubscribe(handler, topic)

        # TODO ActivityService doesn't have same shutdown method SES does?
        super().shutdown()

    def list(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: int,
        activity_ids=None,
    ):
        """
        Publish activity summaries on topics.activity.pool.list.

        An empty list is published when the service raises ValueError.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request, echoed in the reply
        :param activity_ids: IDs passed to ActivityService.summarise
        """
        self.log(logging.DEBUG, "List activities for request %s", request_id)
        try:
            found = self.activity_service.summarise(activity_ids)
        except ValueError:
            # The service raises ValueError when an Activity ID is not found.
            found = []

        self.log(logging.DEBUG, "Activity List result: %s", found)
        self.send_message(
            topics.activity.pool.list, request_id=request_id, result=found
        )

    def prepare(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: int,
        cmd: ActivityCommand,
    ):
        """
        Ask the ActivityService to prepare and run an activity.

        Nothing is published on success. On failure the exception is published
        on topics.activity.lifecycle.running.

        :param msg_src: message source, supplied by pypubsub
        :param request_id: ID of the originating request
        :param cmd: activity command passed to the service
        """
        try:
            self.log(
                logging.DEBUG, "Preparing activity for request %s: %s", request_id, cmd
            )
            self.activity_service.prepare_run_activity(cmd, request_id)
        except Exception as err:  # pylint: disable=broad-except
            self.log(
                logging.ERROR,
                "Preparing activity for request %s failed: %s",
                request_id,
                err,
            )
            # TODO create failure topic for failures in activity domain
            self.send_message(
                topics.activity.lifecycle.running, request_id=request_id, result=err
            )

    def on_procedure_created(
        self,
        # msg_src MUST be part of method signature for pypubsub to function
        msg_src,  # pylint: disable=unused-argument
        request_id: int,
        result: ProcedureSummary,
    ):
        """
        Handle procedure.lifecycle.created by linking the Procedure to its
        requesting Activity.

        The "created" event fires at the LOADING->IDLE transition. At this
        point the OS subprocess has been spawned and the user module has been
        loaded into it (imported), but no user-defined functions (init, main)
        have been called yet. This handler records the procedure ID on the
        Activity so that later lifecycle events (e.g., READY) can be
        correlated back to the correct Activity.
        """
        try:
            self.log(
                logging.DEBUG,
                "Linking procedure to activity for request %s: %s",
                request_id,
                result,
            )
            linked = self.activity_service.link_procedure_to_activity(
                result, request_id
            )
        except Exception as err:  # pylint: disable=broad-except
            self.log(
                logging.ERROR,
                "Linking procedure to activity for request %s failed: %s",
                request_id,
                err,
            )
            # TODO create failure topic for failures in activity domain
            self.send_message(
                topics.activity.lifecycle.running, request_id=request_id, result=err
            )
            return

        # The service returned no summary, so there is nothing to announce.
        if linked is None:
            return

        self.log(logging.DEBUG, "Activity request %s result: %s", request_id, linked)
        self.send_message(
            topics.activity.lifecycle.running,
            request_id=request_id,
            result=linked,
        )

    def on_procedure_ready(
        self,
        msg_src,
        request_id,  # pylint: disable=unused-argument
        result: ProcedureSummary,
    ):
        """
        Handle procedure.lifecycle.ready by dispatching RUN(main) for the
        linked Activity.

        The "ready" event fires at the INITIALISING->READY transition, after
        the user-defined init() function has returned. At this point the
        subprocess is fully prepared: the user module is loaded and init has
        completed, so it is safe to call main() without colliding with a
        still-running init. This handler is skipped for prepare-only
        activities and for activities where main has already been dispatched.
        """
        try:
            self.log(
                logging.DEBUG,
                "Starting activity for procedure %s: %s",
                msg_src,
                result,
            )
            self.activity_service.dispatch_main(result)
        except Exception as err:  # pylint: disable=broad-except
            # Failures are logged only; nothing is published for them here.
            self.log(
                logging.ERROR,
                "Starting activity for procedure %s failed: %s",
                msg_src,
                err,
            )
[docs] class ScriptWorkerPool: """ Manages the lifecycle of ScriptWorker processes. This pool owns the queues and process references for all running ScriptWorkers, handling creation, messaging, and cleanup. """
[docs] def __init__( self, main_ctx: MainContext, env_manager: EnvironmentManager, ): self.main_ctx = main_ctx self.env_manager = env_manager self._queues: dict[int, MPQueue] = {} self._procs: dict[int, Proc] = {}
[docs] def create_worker( self, pid: int, script: GitScript, init_args: ProcedureInput, ) -> None: """ Create a new ScriptWorker process. :param pid: Process ID for the worker :param script: Script to execute :param init_args: Initialization arguments for the script """ self.main_ctx.log(logging.DEBUG, f"Creating ScriptWorker #{pid} for {script}") # Create environment if needed (done in main process for Event inheritance) environment = None if isinstance(script, GitScript) and script.create_env: environment = self.env_manager.create_env(script.git_args) msg_src = "main_loop" try: # Create work queue for this script work_q = self.main_ctx.MPQueue() # Prime the work queue with ENV, LOAD, RUN messages # ENV message (if environment is needed) if environment is not None: env_msg = EventMessage(msg_src=msg_src, msg_type="ENV", msg=script) work_q.safe_put(env_msg) # LOAD message load_msg = EventMessage(msg_src=msg_src, msg_type="LOAD", msg=script) work_q.safe_put(load_msg) # RUN message for init function init_msg = EventMessage(msg_src=msg_src, msg_type="RUN", msg=("init", None)) work_q.safe_put(init_msg) # Create the ScriptWorker process proc = self.main_ctx.Proc( str(pid), ScriptWorker, work_q, *init_args.args, environment=environment, **init_args.kwargs, ) # Store references self._queues[pid] = work_q self._procs[pid] = proc # Send success response self._queue_pubsub( msg_src=msg_src, topic="procedure.management.created", pid=pid, success=True, error=None, ) except Exception: # pylint: disable=broad-exception-caught # Send failure response for any error during ScriptWorker creation error_msg = traceback.format_exc() self.main_ctx.log( logging.ERROR, f"Failed to create ScriptWorker #{pid}: {error_msg}" ) self._queue_pubsub( msg_src=msg_src, topic="procedure.management.created", pid=pid, success=False, error=error_msg, )
[docs] def send_run(self, pid: int, fn_name: str, run_args: ProcedureInput) -> None: """ Send a RUN message to a ScriptWorker. :param pid: Process ID of the worker :param fn_name: Function name to run :param run_args: Arguments for the function """ self.main_ctx.log( logging.DEBUG, f"Sending 'run {fn_name}' message to PID {pid}" ) if pid not in self._queues: self.main_ctx.log(logging.ERROR, f"PID {pid} not found in script_queues") return work_q = self._queues[pid] msg = EventMessage( msg_src="main_loop", msg_type="RUN", msg=(fn_name, run_args), ) work_q.safe_put(msg)
[docs] def broadcast_to_workers(self, msg: EventMessage) -> None: """ Forward a message to all active ScriptWorker work queues. :param msg: EventMessage to forward """ for work_q in self._queues.values(): work_q.safe_put(msg)
[docs] def stop_worker(self, pid: int) -> None: """ Terminate a ScriptWorker and publish state change. :param pid: Process ID of the worker to stop """ self.main_ctx.log(logging.DEBUG, f"Stopping ScriptWorker #{pid}") if pid not in self._procs: self.main_ctx.log(logging.ERROR, f"PID {pid} not found in script_procs") return proc = self._procs[pid] if proc.proc.is_alive(): terminated = proc.terminate( max_retries=3, timeout=PROC_TERMINATE_TIMEOUT_SECS ) final_state = ( ProcedureState.STOPPED if terminated else ProcedureState.UNKNOWN ) # Publish state change via event queue # msg_src must be the PID for ScriptExecutionService._on_statechange self._queue_pubsub( msg_src=str(pid), topic="procedure.lifecycle.statechange", new_state=final_state, ) # Clean up zombie processes multiprocessing.active_children() # Only cleanup if termination succeeded, otherwise we clean up # resources for an unresponsive process that may still be running. # The orphaned process should be cleaned up during normal shutdown. if terminated: self.cleanup_worker(pid) else: # Process already dead, safe to cleanup self.cleanup_worker(pid)
[docs] def cleanup_worker(self, pid: int) -> None: """ Remove worker tracking (called on SHUTDOWN/FATAL). :param pid: Process ID of the worker to clean up """ if queue := self._queues.pop(pid, None): self.main_ctx.stop_queues([queue]) if proc := self._procs.pop(pid, None): self.main_ctx.remove_proc(proc)
[docs] def handle_fatal(self, msg_src: str, error_msg: str) -> None: """ Handle a ScriptWorker crash - publish FAILED state and stacktrace. :param msg_src: Source of the fatal event (PID as string) :param error_msg: Error message/stacktrace from the crashed worker """ self.main_ctx.log(logging.INFO, f"Fatal Event received: {error_msg}") try: pid = int(msg_src) except (ValueError, TypeError): return # Not a numeric PID, ignore # Publish FAILED state change via event queue # msg_src must be the PID for ScriptExecutionService._on_statechange self._queue_pubsub( msg_src=msg_src, topic="procedure.lifecycle.statechange", new_state=ProcedureState.FAILED, ) # Publish stacktrace via event queue # msg_src must be the PID for ScriptExecutionService._on_stacktrace self._queue_pubsub( msg_src=msg_src, topic="procedure.lifecycle.stacktrace", stacktrace=error_msg, ) # Clean up ScriptWorker resources self.cleanup_worker(pid)
[docs] def handle_management_topic(self, topic: str, kwargs: dict): """ Handle a procedure.management.* topic. :param topic: The management topic to handle :param kwargs: Topic arguments """ try: match topic: case "procedure.management.create": self.create_worker( kwargs["pid"], kwargs["script"], kwargs["init_args"] ) case "procedure.management.run": self.send_run(kwargs["pid"], kwargs["fn_name"], kwargs["run_args"]) case "procedure.management.stop": self.stop_worker(kwargs["pid"]) case _: self.main_ctx.log( logging.WARNING, f"Unknown management topic: {topic}" ) except KeyError as e: self.main_ctx.log(logging.ERROR, f"Missing required kwarg for {topic}: {e}")
[docs] def handle_shutdown(self, msg_src: str) -> None: """ Handle SHUTDOWN event - clean up worker resources. :param msg_src: Source of the shutdown event (PID as string) """ self.main_ctx.log(logging.INFO, f"Process complete (main loop): {msg_src}") try: pid = int(msg_src) self.cleanup_worker(pid) except (ValueError, TypeError): pass # Not a numeric PID, ignore
def _queue_pubsub(self, msg_src: str, topic: str, **kwargs) -> None: """ Queue a PUBSUB message to be distributed via _handle_pubsub_event. :param msg_src: message source, identifying where the message originates :param topic: PUBSUB topic :param kwargs: Topic arguments """ msg = EventMessage( msg_src=msg_src, msg_type="PUBSUB", msg=dict(topic=topic, kwargs=kwargs), ) self.main_ctx.event_queue.put(msg)
def main(
    mp_ctx: multiprocessing.context.BaseContext, logging_config_override: dict = None
):
    """
    Build the OET worker processes and run the event loop that dispatches
    messages between them.

    :param mp_ctx: multiprocessing context used for the MainContext, the
        EnvironmentManager and the worker processes
    :param logging_config_override: optional logging configuration passed to
        MainContext.init_logging when truthy
    """
    # Queues and processes are all created through a MainContext so that they
    # are shared correctly and have consistent lifecycle management.
    with MainContext(mp_ctx) as main_ctx:
        if logging_config_override:
            main_ctx.init_logging(logging_config_override)

        # Route SIGINT and SIGTERM to the shutdown_event monitored by all
        # processes, so every process learns that termination was requested.
        init_signals(
            main_ctx.shutdown_event, default_signal_handler, default_signal_handler
        )

        # One entry per OET component: process name, worker class, and the
        # extra positional arguments that follow the component's inbox queue.
        worker_specs = (
            ("SESWorker", ScriptExecutionServiceWorker, (mp_ctx,)),
            ("ActivityServiceWorker", ActivityServiceWorker, (mp_ctx,)),
            ("FastAPIWorker", FastAPIWorker, ()),
        )

        # One inbox queue per component, in the same order as worker_specs.
        # Event bus messages received on the main event_queue are pushed onto
        # every queue in this list.
        event_bus_queues = [main_ctx.MPQueue() for _ in worker_specs]

        # Environment manager for script execution environments.
        env_manager = EnvironmentManager(mp_ctx)

        # Create the components, each monitoring its own inbox queue.
        for (proc_name, worker_cls, extra_args), inbox in zip(
            worker_specs, event_bus_queues
        ):
            main_ctx.Proc(proc_name, worker_cls, inbox, *extra_args)

        # With all workers and queues in place, start processing messages.
        main_loop(main_ctx, event_bus_queues, env_manager)
def main_loop(
    main_ctx: MainContext,
    event_bus_queues: list[MPQueue],
    env_manager: EnvironmentManager,
):
    """
    Message parsing and routing loop, kept separate from main() to increase
    testability.

    The loop runs until the shutdown event is set or an END event arrives.

    :param main_ctx: Main context for queue and process management
    :param event_bus_queues: Queues for broadcasting to EventBusWorkers
    :param env_manager: Environment manager for script execution environments
    """
    pool = ScriptWorkerPool(main_ctx, env_manager)

    while not main_ctx.shutdown_event.is_set():
        event = main_ctx.event_queue.safe_get()
        if not event:
            # Nothing usable was returned; check the shutdown flag and retry.
            continue

        kind = event.msg_type

        if kind == "END":
            main_ctx.log(logging.INFO, f"Shutdown Event received: {event.msg}")
            break

        if kind == "PUBSUB":
            main_ctx.log(logging.DEBUG, f"Handling PUBSUB message: {event}")
            _handle_pubsub_event(event, pool, event_bus_queues)
        elif kind == "SHUTDOWN":
            main_ctx.log(
                logging.INFO, f"Process complete (main loop): {event.msg_src}"
            )
            _handle_shutdown_event(event, pool)
        elif kind == "FATAL":
            main_ctx.log(logging.INFO, f"Fatal Event received: {event.msg}")
            _handle_fatal_event(event, pool)
        else:
            main_ctx.log(logging.ERROR, f"Unhandled Event: {event}")
def _handle_pubsub_event(
    event: EventMessage,
    worker_pool: ScriptWorkerPool,
    event_bus_queues: list[MPQueue],
) -> None:
    """
    Handle PUBSUB event type.

    Management request topics are handed to the ScriptWorkerPool only. Every
    other topic is put on each EventBusWorker queue and forwarded to all
    active ScriptWorkers.

    :param event: The PUBSUB event message
    :param worker_pool: ScriptWorker pool instance
    :param event_bus_queues: Queues for broadcasting to EventBusWorkers
    """
    payload = event.msg
    topic = payload["topic"]

    # Only route request topics to ScriptWorkerPool
    management_request_topics = {
        "procedure.management.create",
        "procedure.management.run",
        "procedure.management.stop",
    }
    if topic in management_request_topics:
        worker_pool.handle_management_topic(topic, payload["kwargs"])
    else:
        # Broadcast to all EventBusWorkers
        for q in event_bus_queues:
            q.put(event)

        # Forward to all active ScriptWorkers
        worker_pool.broadcast_to_workers(event)


def _handle_shutdown_event(
    event: EventMessage,
    worker_pool: ScriptWorkerPool,
) -> None:
    """
    Handle SHUTDOWN event - clean up worker resources.

    :param event: The SHUTDOWN event message
    :param worker_pool: ScriptWorker pool instance
    """
    worker_pool.handle_shutdown(event.msg_src)


def _handle_fatal_event(
    event: EventMessage,
    worker_pool: ScriptWorkerPool,
) -> None:
    """
    Handle FATAL event - publish FAILED state and stacktrace.

    :param event: The FATAL event message
    :param worker_pool: ScriptWorker pool instance
    """
    worker_pool.handle_fatal(event.msg_src, event.msg)


if __name__ == "__main__":
    log_level = os.getenv("LOG_LEVEL", "ERROR")
    configure_logging(level=log_level, tags_filter=AuditLogFilter)

    # BTN-2667: spawn mode is now a hard requirement, otherwise parent
    # dependencies are not overridden in the child processes
    mp = multiprocessing.get_context("spawn")

    # The only way to access the SKA logging config is via a private property. :(
    # pylint: disable=protected-access
    base_logging_config = ska_ser_logging.configuration._LOGGING_CONFIG
    # Build a new top-level dict AND a new "root" dict. A plain .copy() is
    # shallow, so assigning into its "root" entry would also change the
    # library's shared module-level configuration.
    logging_config = {
        **base_logging_config,
        "root": {**base_logging_config["root"], "level": log_level},
    }

    main(mp, logging_config)