Source code for ska_oso_oet.event.topics

# Relax pylint in the face of some pypubsub requirements. Pypubsub topics use
# msg_src rather than self, and they define a topic hierarchy rather than a
# class hierarchy where implementation is required.
#
# pylint: disable=no-self-argument,too-few-public-methods


# pypubsub topic tree rooted at 'request'. Nested class names form the dotted
# topic name (e.g. request.procedure.create) and each msgDataSpec signature
# declares the message arguments for its topic. msgDataSpec bodies are
# intentionally empty; arguments are documented as '- name: description'.
class request:
    """
    Root topic for events emitted when a user or system component has made a
    request.
    """

    class procedure:
        """
        Topic for user requests related to procedures.
        """

        class create:
            """
            Emitted when a request to create a procedure is received.
            """

            def msgDataSpec(msg_src, request_id, cmd):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - cmd: PrepareProcessCommand containing request parameters
                """

        # The class name 'list' is the topic name and shadows the builtin only
        # inside this class namespace.
        class list:
            """
            Emitted when a request to enumerate all procedures is received.
            """

            def msgDataSpec(msg_src, request_id, pids=None):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - pids: Procedure IDs to list
                """

        class start:
            """
            Emitted when a request to start procedure execution is received.
            """

            def msgDataSpec(msg_src, request_id, cmd):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - cmd: StartProcessCommand containing request parameters
                """

        class stop:
            """
            Emitted when a request to stop a procedure is received.
            """

            # NOTE(review): cmd is documented as StartProcessCommand, the same
            # type named by the 'start' topic. This looks like a copy-paste;
            # confirm the command type actually published for stop requests.
            def msgDataSpec(msg_src, request_id, cmd):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - cmd: StartProcessCommand containing request parameters
                """

    class activity:
        """
        Topic for user requests related to activities.
        """

        class run:
            """
            Emitted when a request to run an activity is received.
            """

            def msgDataSpec(msg_src, request_id, cmd):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - cmd: ActivityCommand containing request parameters
                """

        class list:
            """
            Emitted when a request to enumerate all activities is received.
            """

            def msgDataSpec(msg_src, request_id, activity_ids=None):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - activity_ids: Activity IDs to list.
                """

    class operator:
        """
        Topic for user requests related to operator overrides.
        """

        class override:
            """
            Topic for requests to query operator override state.
            """

            class state:
                """
                Emitted when a request to query override state is received.
                """

                def msgDataSpec(msg_src, request_id):
                    """
                    - msg_src: component from which the request originated
                    - request_id: unique identifier for this request
                    """
# pypubsub topic tree rooted at 'procedure'. Nested class names form the
# dotted topic name (e.g. procedure.lifecycle.started) and each msgDataSpec
# signature declares the message arguments for its topic. msgDataSpec bodies
# are intentionally empty; arguments are documented as '- name: description'.
class procedure:
    """
    Root topic for events related to procedures.
    """

    class management:
        """
        Topic for management commands between ProcessManager and main_loop.
        """

        class create:
            """
            Request to create a ScriptWorker process.
            """

            def msgDataSpec(msg_src, pid, script, init_args):
                """
                - msg_src: component from which the request originated
                - pid: PID assigned by ProcessManager
                - script: ExecutableScript to execute
                - init_args: ProcedureInput initialisation arguments
                """

        class created:
            """
            Response to procedure.management.create request.
            """

            def msgDataSpec(msg_src, pid, success, error=None):
                """
                - msg_src: component from which the response originated
                - pid: PID from the create request
                - success: bool indicating if creation succeeded
                - error: str | None - full exception traceback if failed
                """

        class run:
            """
            Request to run a function in a prepared procedure.
            """

            def msgDataSpec(msg_src, pid, fn_name, run_args):
                """
                - msg_src: component from which the request originated
                - pid: PID of procedure to run
                - fn_name: name of function to call
                - run_args: ProcedureInput runtime arguments
                """

        class stop:
            """
            Request to stop a running procedure.
            """

            def msgDataSpec(msg_src, pid):
                """
                - msg_src: component from which the request originated
                - pid: PID of procedure to stop
                """

    class lifecycle:
        """
        Topic for events related to procedure lifecycle.
        """

        class statechange:
            """
            Emitted when a procedure status changes.

            To be amalgamated and rationalised with other lifecycle events to
            better handle rerunnable scripts.
            """

            def msgDataSpec(msg_src, new_state):
                """
                - msg_src: component from which the request originated
                - new_state: new state
                """

        class stacktrace:
            """
            Announces cause of a Procedure failure.
            """

            def msgDataSpec(msg_src, stacktrace):
                """
                - msg_src: component from which the request originated
                - stacktrace: stacktrace as a string
                """

        class complete:
            """
            Emitted when a Procedure has completed successfully and is no
            longer available to be called.
            """

            # NOTE(review): request_id and result are required by the
            # signature but were never documented for this topic; their
            # meaning here is not established by this file - confirm with
            # the publisher before documenting them.
            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: ID of Procedure that completed
                """

        class created:
            """
            Emitted when a procedure is created, i.e., a script is loaded and
            Python interpreter initialised.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: ProcedureSummary characterising the created procedure
                """

        class ready:
            """
            Emitted when a procedure is ready to accept run commands, i.e.,
            script function has completed or init() was not required.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: ProcedureSummary characterising the ready procedure
                """

        class started:
            """
            Emitted when any user function in a procedure is running, i.e.,
            script init is called
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: ProcedureSummary characterising the started procedure
                """

        class stopped:
            """
            Emitted when a procedure stops, e.g., script completes or is
            aborted.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: ProcedureSummary characterising the stopped procedure
                """

        class failed:
            """
            Emitted when a procedure fails.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the event originated
                - request_id: unique identifier for this event
                - result: ProcedureSummary characterising the failed procedure
                """

    class pool:
        """
        Topic for events on characterisation of the process pool.
        """

        # The class name 'list' is the topic name and shadows the builtin only
        # inside this class namespace.
        class list:
            """
            Emitted when current procedures and their status is enumerated.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: list of ProcedureSummary instances characterising
                  procedures and their states.
                """
# pypubsub topic tree rooted at 'activity'. Nested class names form the
# dotted topic name (e.g. activity.lifecycle.running) and each msgDataSpec
# signature declares the message arguments for its topic. msgDataSpec bodies
# are intentionally empty; arguments are documented as '- name: description'.
class activity:
    """
    Root topic for events related to activities.
    """

    class lifecycle:
        """
        Topic for events related to activity lifecycle.
        """

        class statechange:
            """
            Emitted when an activity status changes.
            """

            def msgDataSpec(msg_src, new_state):
                """
                - msg_src: component from which the request originated
                - new_state: new state
                """

        class running:
            """
            Emitted when an activity starts running.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: ActivitySummary characterising the running activity
                """

    class pool:
        """
        Topic for events on characterisation of the activity pool.
        """

        # The class name 'list' is the topic name and shadows the builtin only
        # inside this class namespace.
        class list:
            """
            Emitted when current activities and their status is enumerated.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the request originated
                - request_id: unique identifier for this request
                - result: list of ActivitySummary instances characterising
                  activities and their states.
                """
# pypubsub topic tree rooted at 'sb'. Nested class names form the dotted
# topic name (sb.lifecycle.started) and the msgDataSpec signature declares
# the message arguments for that topic. The msgDataSpec body is intentionally
# empty; arguments are documented as '- name: description'.
class sb:
    """
    Root topic for events emitted relating to Scheduling Blocks
    """

    class lifecycle:
        """
        Topic for events related to Scheduling Block lifecycle
        """

        class started:
            """
            Emitted when an observation is started
            """

            def msgDataSpec(msg_src, sbi_id):
                """
                - msg_src: component from which the request originated
                - sbi_id: Scheduling Block Instance ID
                """
# pypubsub topic tree rooted at 'operator'. Nested class names form the
# dotted topic name (e.g. operator.wait_for_qa_ready.enable) and each
# msgDataSpec signature declares the message arguments for its topic.
# msgDataSpec bodies are intentionally empty; arguments are documented as
# '- name: description'.
class operator:
    """
    Root topic for operator override events.
    """

    class wait_for_qa_ready:
        """
        Topic for QA wait-for-ready override events.
        """

        class override:
            """
            One-shot signal to override QA wait-for-ready for the current scan.
            """

            def msgDataSpec(msg_src):
                """
                - msg_src: component from which the request originated
                """

        class enable:
            """
            Enable QA wait-for-ready override for all scripts on this subarray.
            Persists until disabled.
            """

            def msgDataSpec(msg_src):
                """
                - msg_src: component from which the request originated
                """

        class disable:
            """
            Disable QA wait-for-ready override.
            """

            def msgDataSpec(msg_src):
                """
                - msg_src: component from which the request originated
                """

    # Distinct from wait_for_qa_ready.override above: this 'override' sits
    # directly under 'operator' and carries override state responses.
    class override:
        """
        Topic for operator override state responses.
        """

        class state:
            """
            Emitted when the current override state is queried.
            """

            def msgDataSpec(msg_src, request_id, result):
                """
                - msg_src: component from which the response originated
                - request_id: unique identifier for this request
                - result: ScriptContext with current override state
                """
# pypubsub topic tree rooted at 'test'. Nested class names form the dotted
# topic name (e.g. test.script.init_started). These topics deliberately
# declare no msgDataSpec.
class test:
    """
    Root topic for test events.

    These topics are used by tests to synchronize with script execution
    without requiring Barrier objects (which cannot be pickled).
    """

    class script:
        """
        Topic for test events from script execution.
        """

        class init_started:
            """
            Emitted when a test script's init() function starts.

            Note: No msgDataSpec defined as these messages have no required
            args.
            """

        class main_started:
            """
            Emitted when a test script's main() function starts.

            Note: No msgDataSpec defined as these messages have no required
            args.
            """
# NOTE(review): the topic that encloses 'finished' cannot be determined from
# this listing because the original indentation was lost. The sbi_id argument
# and the observation wording suggest a Scheduling Block lifecycle topic, yet
# the definition appears after the test.script topics. Confirm the intended
# nesting level against the real module before relying on this placement.
#
# TODO: Once Activity state management is implemented, send these messages
# when activity execution has completed.
class finished:
    """
    Emitted when an observation is finished
    """

    class succeeded:
        """
        Emitted when an observation is finished successfully
        """

        def msgDataSpec(msg_src, sbi_id):
            """
            - msg_src: component from which the request originated
            - sbi_id: Scheduling Block Instance ID
            """

    class failed:
        """
        Emitted when an error was encountered during observation execution
        """

        def msgDataSpec(msg_src, sbi_id):
            """
            - msg_src: component from which the request originated
            - sbi_id: Scheduling Block Instance ID
            """