ska_oso_oet

Reading ska_oso_oet.ini file value and initializing constant of feature toggle with enabling event based polling/pubsub

ska_oso_oet.main

class ska_oso_oet.main.ActivityServiceWorker(name, startup_event, shutdown_event, event_q, work_q, mp_context, *args, **kwargs)[source]

ActivityServiceWorker listens for user request messages, calling the appropriate ActivityService function and broadcasting its response.

__init__(name, startup_event, shutdown_event, event_q, work_q, mp_context, *args, **kwargs)[source]

Create a new QueueProcWorker.

The events and MPQueues passed to this constructor should be created and managed within the scope of a MainContext context manager and shared with other ProcWorkers, so that the communication queues are shared correctly between Python processes and there is a common event that can be set to notify all processes when shutdown is required.

Parameters:
  • name (str) – name of this worker

  • startup_event (Event) – event to trigger when startup is complete

  • shutdown_event (Event) – event to monitor for shutdown

  • event_q (MPQueue) – outbox for posting messages to main context

  • work_q (MPQueue) – inbox message queue for work messages

  • args – captures other anonymous arguments

  • kwargs – captures other keyword arguments

on_procedure_created(msg_src, request_id, result)[source]

Handle procedure.lifecycle.created by linking the Procedure to its requesting Activity.

The “created” event fires at the LOADING->IDLE transition. At this point the OS subprocess has been spawned and the user module has been loaded into it (imported), but no user-defined functions (init, main) have been called yet. This handler records the procedure ID on the Activity so that later lifecycle events (e.g., READY) can be correlated back to the correct Activity.

on_procedure_ready(msg_src, request_id, result)[source]

Handle procedure.lifecycle.ready by dispatching RUN(main) for the linked Activity.

The “ready” event fires at the INITIALISING->READY transition, after the user-defined init() function has returned. At this point the subprocess is fully prepared: the user module is loaded and init has completed, so it is safe to call main() without colliding with a still-running init. This handler is skipped for prepare-only activities and for activities where main has already been dispatched.

shutdown()[source]

Disconnect republishing function from pypubsub

Return type:

None

startup()[source]

Connect republishing function to pypubsub.

Return type:

None

class ska_oso_oet.main.EventBusWorker(name, startup_event, shutdown_event, event_q, work_q, *args, **kwargs)[source]

EventBusWorker converts external inter-process pub/sub messages to and from local intra-process pubsub messages.

EventBusWorker uses the QueueProcWorker’s ‘work queue’ as an inbox for pub/sub EventMessages sent by other ProcWorkers. EventMessages received on this queue are rebroadcast locally as pypubsub messages. Likewise, the EventBusWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other EventBusWorkers.

main_func(evt)[source]

Republish external pub/sub message locally.

QueueProcWorker ensures that main_func is called for every item in the work queue. This function takes that work item - the external pub/sub EventMessage - and rebroadcasts it locally as a pypubsub message.

Parameters:

evt (EventMessage) – pub/sub EventMessage to broadcast locally

Return type:

None

republish(topic=pubsub.pub.AUTO_TOPIC, **kwargs)[source]

Republish a local event over the inter-process event bus.

Parameters:
  • topic (Topic) – message topic, set automatically by pypubsub

  • kwargs – any metadata associated with pypubsub message

Return type:

None

Returns:

shutdown()[source]

Disconnect republishing function from pypubsub

Return type:

None

startup()[source]

Connect republishing function to pypubsub.

Return type:

None

class ska_oso_oet.main.FastAPIWorker(name, startup_event, shutdown_event, event_q, work_q, *args, **kwargs)[source]

FastAPIWorker is an EventBusWorker that runs a FastAPI app.

By extending EventBusWorker, FastAPI functions can use pypubsub to subscribe to and publish messages, and these messages will put on the main queue to be broadcast to other EventBusWorkers.

shutdown()[source]

Disconnect republishing function from pypubsub

Return type:

None

startup()[source]

Connect republishing function to pypubsub.

Return type:

None

class ska_oso_oet.main.ScriptExecutionServiceWorker(name, startup_event, shutdown_event, event_q, work_q, mp_context, *args, **kwargs)[source]

ScriptExecutionService listens for user request messages, calling the appropriate ScriptExecutionService function and broadcasting its response.

Actions that occur in the user request domain (‘user clicked start observation’, ‘user aborted observation using the CLI’, etc.) are broadcast as events. ScriptExecutionServiceWorker listens for events on these topics and triggers the required action in the script execution domain (‘start a script’, ‘abort a script’, etc.).

Currently, the result of the action that occurred in the script execution domain (=the return object from the ScriptExecutionService) is broadcast to the world by the ScriptExecutionServiceWorker. This could change so that the ScriptExecutionService itself sends the message.

__init__(name, startup_event, shutdown_event, event_q, work_q, mp_context, *args, **kwargs)[source]

Create a new QueueProcWorker.

The events and MPQueues passed to this constructor should be created and managed within the scope of a MainContext context manager and shared with other ProcWorkers, so that the communication queues are shared correctly between Python processes and there is a common event that can be set to notify all processes when shutdown is required.

Parameters:
  • name (str) – name of this worker

  • startup_event (Event) – event to trigger when startup is complete

  • shutdown_event (Event) – event to monitor for shutdown

  • event_q (MPQueue) – outbox for posting messages to main context

  • work_q (MPQueue) – inbox message queue for work messages

  • args – captures other anonymous arguments

  • kwargs – captures other keyword arguments

shutdown()[source]

Disconnect republishing function from pypubsub

Return type:

None

startup()[source]

Connect republishing function to pypubsub.

Return type:

None

class ska_oso_oet.main.ScriptWorkerPool(main_ctx, env_manager)[source]

Manages the lifecycle of ScriptWorker processes.

This pool owns the queues and process references for all running ScriptWorkers, handling creation, messaging, and cleanup.

__init__(main_ctx, env_manager)[source]
broadcast_to_workers(msg)[source]

Forward a message to all active ScriptWorker work queues.

Parameters:

msg (EventMessage) – EventMessage to forward

Return type:

None

cleanup_worker(pid)[source]

Remove worker tracking (called on SHUTDOWN/FATAL).

Parameters:

pid (int) – Process ID of the worker to clean up

Return type:

None

create_worker(pid, script, init_args)[source]

Create a new ScriptWorker process.

Parameters:
  • pid (int) – Process ID for the worker

  • script (GitScript) – Script to execute

  • init_args (ProcedureInput) – Initialization arguments for the script

Return type:

None

handle_fatal(msg_src, error_msg)[source]

Handle a ScriptWorker crash - publish FAILED state and stacktrace.

Parameters:
  • msg_src (str) – Source of the fatal event (PID as string)

  • error_msg (str) – Error message/stacktrace from the crashed worker

Return type:

None

handle_management_topic(topic, kwargs)[source]

Handle a procedure.management.* topic.

Parameters:
  • topic (str) – The management topic to handle

  • kwargs (dict) – Topic arguments

handle_shutdown(msg_src)[source]

Handle SHUTDOWN event - clean up worker resources.

Parameters:

msg_src (str) – Source of the shutdown event (PID as string)

Return type:

None

send_run(pid, fn_name, run_args)[source]

Send a RUN message to a ScriptWorker.

Parameters:
  • pid (int) – Process ID of the worker

  • fn_name (str) – Function name to run

  • run_args (ProcedureInput) – Arguments for the function

Return type:

None

stop_worker(pid)[source]

Terminate a ScriptWorker and publish state change.

Parameters:

pid (int) – Process ID of the worker to stop

Return type:

None

ska_oso_oet.main.main(mp_ctx, logging_config_override=None)[source]

Create the OET components and start an event loop that dispatches messages between them.

Parameters:

logging_config_override (dict | None)

ska_oso_oet.main.main_loop(main_ctx, event_bus_queues, env_manager)[source]

Main message parsing and routing loop, extracted from main() to increase testability.

Parameters:
  • main_ctx (MainContext) – Main context for queue and process management

  • event_bus_queues (list[MPQueue]) – Queues for broadcasting to EventBusWorkers

  • env_manager (EnvironmentManager) – Environment manager for script execution environments

ska_oso_oet.ui

The ska_oso_oet.ui package contains code that present the OET interface to the outside world. In practical terms, this means the OET application’s REST interface

class ska_oso_oet.ui.Message(*args, **kwargs)[source]

Data that is published as a server-sent event.

ska_oso_oet.ui.messages(shutdown_event)[source]

A generator of Message objects created from received pubsub events

Return type:

Generator[Message, None, None]