emulator_engine.services.messaging package

Messaging/event management service implementations.

class emulator_engine.services.messaging.ApiQueueService(api_request_queue: str = '', api_callback_queue: str = '', subscriber_timeout: int = None, router_impl: RouterImpl = None, name: str | None = None)[source]

Bases: BaseMessageService

Message service which listens for API calls and processes them.

Implements BaseMessageService.

Parameters:
  • api_request_queue (str) – A unique identifier to give the API request queue.

  • api_callback_queue (str) – A unique identifier to give the API callback queue.

  • subscriber_timeout (int, optional) – If specified, a timeout in seconds after which the service automatically unsubscribes from events. If not specified, the service never times out.

  • router_impl (RouterImpl) – If specified, a router implementation to supply on initialization.

  • name (str, optional) – A name to give this service (primarily for logging purposes).

flush() None[source]

Flush all queues that this service is aware of.

Loops through every queue name available and attempts to flush it. Any queue names that do not exist or cannot be flushed are ignored.
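The "attempt every queue, ignore failures" behaviour described above can be sketched as follows. This is a minimal illustration rather than the service's actual code: flush_queues, fake_purge, and the in-memory fake_broker are hypothetical names that stand in for a real broker purge call.

```python
from typing import Callable, Iterable


def flush_queues(queue_names: Iterable[str], purge: Callable[[str], None]) -> list:
    """Try to purge every queue; return the names that were actually flushed."""
    flushed = []
    for queue_name in queue_names:
        try:
            purge(queue_name)
        except Exception:
            # Queues that do not exist or cannot be flushed are skipped.
            continue
        flushed.append(queue_name)
    return flushed


# Hypothetical in-memory stand-in for a broker: queue name -> pending messages.
fake_broker = {"api_request": ["a", "b"], "api_callback": ["c"]}


def fake_purge(queue_name: str) -> None:
    fake_broker[queue_name].clear()  # raises KeyError for unknown queues


flushed = flush_queues(["api_request", "missing_queue", "api_callback"], fake_purge)
```

Swallowing the error per queue keeps one bad name from preventing the remaining queues from being flushed.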

force_stop() None[source]

Force stop the API listener.

static get_rpc_response(request_queue: str, callback_queue: str, request_body: InternalRestRequest) InternalRestResponse[source]

Sends an RPC request via RabbitMQ, then waits for and returns the response.
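The request/response pattern behind this method can be sketched without a broker. The example below assumes the common correlation-ID approach to RPC over queues; whether the service matches replies this way is not stated here. The in-memory queues, server_once, and rpc_call are hypothetical stand-ins for the RabbitMQ queues and the real method.

```python
import queue
import threading
import uuid

# In-memory stand-ins for the request and callback queues.
request_queue: queue.Queue = queue.Queue()
callback_queue: queue.Queue = queue.Queue()


def server_once() -> None:
    """Hypothetical responder: echoes the body back with the same correlation ID."""
    correlation_id, body = request_queue.get()
    callback_queue.put((correlation_id, {"status": 200, "echo": body}))


def rpc_call(body: dict, timeout: float = 2.0) -> dict:
    """Send a request, then block until the matching response arrives."""
    correlation_id = str(uuid.uuid4())
    request_queue.put((correlation_id, body))
    while True:
        # Raises queue.Empty if no reply arrives within the timeout.
        reply_id, response = callback_queue.get(timeout=timeout)
        if reply_id == correlation_id:
            return response
        # Replies to other requests are dropped in this simplified sketch.


worker = threading.Thread(target=server_once, daemon=True)
worker.start()
response = rpc_call({"path": "/status"})
worker.join()
```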

static init_for_subcontroller(bitstream_emulator_id: str, ip_block_id: str) ApiQueueService[source]

Construct an instance of this service for a specific subcontroller.

This automatically populates the queue names based on the supplied bitstream emulator (controller) ID and IP block (subcontroller) ID.

Parameters:
  • bitstream_emulator_id (str) – The (unique) bitstream emulator ID.

  • ip_block_id (str) – The (unique) IP block ID associated with this subcontroller.

  • **kwargs – Other keyword arguments to pass to the service constructor/setup function.

Returns:

The created service instance for the subcontroller.

Return type:

ApiQueueService
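As a rough illustration of deriving queue names from the two IDs, consider the sketch below. The naming scheme shown is purely an assumption made for this example, since the real format is not documented here, and QueueNames and queue_names_for_subcontroller are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class QueueNames:
    api_request_queue: str
    api_callback_queue: str


def queue_names_for_subcontroller(bitstream_emulator_id: str, ip_block_id: str) -> QueueNames:
    # Hypothetical naming scheme: the actual format used by the service may differ.
    prefix = f"{bitstream_emulator_id}.{ip_block_id}"
    return QueueNames(
        api_request_queue=f"{prefix}.api.request",
        api_callback_queue=f"{prefix}.api.callback",
    )


names = queue_names_for_subcontroller("emu-1", "block-a")
```

Deriving both names from the same pair of unique IDs gives each subcontroller its own request and callback queues.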

publish(**kwargs) None[source]

Publish an event.

Parameters:
  • event (BaseEvent) – The event to publish.

  • **kwargs – Arbitrary keyword arguments which may be refined by individual subclass implementations.

subscribe(**kwargs) None[source]

Subscribe to API calls.

Starts the main listener thread.

unsubscribe() None[source]

Unsubscribe from events.

Signals the consumer thread to exit its main loop and waits for the thread to join.
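The subscribe/unsubscribe lifecycle described above (start a listener thread, later signal it to exit and join it) can be sketched with the standard library. ListenerSketch is a hypothetical class, and the optional timeout handling is an assumption about how subscriber_timeout might be applied.

```python
import threading
import time
from typing import Optional


class ListenerSketch:
    def __init__(self, subscriber_timeout: Optional[float] = None) -> None:
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._timeout = subscriber_timeout

    def _run(self) -> None:
        deadline = None if self._timeout is None else time.monotonic() + self._timeout
        while not self._stop.is_set():
            if deadline is not None and time.monotonic() >= deadline:
                break  # automatic unsubscribe once the timeout elapses
            # A real consumer would poll the broker here.
            self._stop.wait(0.01)

    def subscribe(self) -> None:
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def unsubscribe(self) -> None:
        self._stop.set()  # signal the loop to exit
        if self._thread is not None:
            self._thread.join()  # wait for the consumer thread to finish
            self._thread = None


listener = ListenerSketch()
listener.subscribe()
thread = listener._thread
listener.unsubscribe()
thread_alive_after = thread.is_alive()
```

Using an Event instead of a plain boolean flag lets the loop sleep and still wake up as soon as unsubscribe is called.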

class emulator_engine.services.messaging.EventQueueService(pulse_interval: float, bitstream_emulator_id: str = '', ip_block_id: str = '', downstream_ip_block_ids: list[str] = [], signal_output_map: dict[str, str] = {}, subscriber_timeout: int = None, expected_input_pulses: int = 1, name: str | None = None)[source]

Bases: BaseMessageService

Message service which listens for RabbitMQ events and processes or forwards them as necessary.

Implements BaseMessageService.

Parameters:
  • pulse_interval (float) – The interval in seconds between pulses being used by the system. Used to calibrate certain internal timings.

  • bitstream_emulator_id (str) – The bitstream emulator ID associated with this service.

  • ip_block_id (str) – The IP block ID associated with this service.

  • downstream_ip_block_ids (list[str], optional) – A list of the IDs of the downstream IP blocks which are directly connected to this one. Default is an empty list.

  • signal_output_map (dict[str, str], optional) – A mapping from signal event tags output by this emulator to intended destination IP block IDs. Default is an empty dict.

  • subscriber_timeout (int, optional) – If specified, a timeout in seconds after which the service automatically unsubscribes from events. If not specified, the service never times out.

  • expected_input_pulses (int, optional) – The number of parallel input pulses this block should listen for before processing other events. Default is 1.

  • name (str, optional) – A name to give this service (primarily for logging purposes).
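Two of the parameters above, expected_input_pulses and signal_output_map, can be illustrated with a small sketch. PulseGate and destination_for are hypothetical helpers, and the reset-after-ready behaviour is an assumption about how pulse counting might work, not a description of the service's internals.

```python
from typing import Optional


class PulseGate:
    """Reports ready once the expected number of input pulses has arrived."""

    def __init__(self, expected_input_pulses: int = 1) -> None:
        self.expected = expected_input_pulses
        self.seen = 0

    def on_pulse(self) -> bool:
        self.seen += 1
        if self.seen >= self.expected:
            self.seen = 0  # assumed: the count resets for the next cycle
            return True
        return False


def destination_for(tag: str, signal_output_map: dict) -> Optional[str]:
    """Look up the destination IP block ID for a signal event tag, if mapped."""
    return signal_output_map.get(tag)


gate = PulseGate(expected_input_pulses=2)
ready_states = [gate.on_pulse() for _ in range(3)]

signal_output_map = {"irq": "ip_block_b"}
mapped = destination_for("irq", signal_output_map)
unmapped = destination_for("unknown_tag", signal_output_map)
```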

flush() None[source]

Flush all queues that this service is aware of.

Loops through every queue name available and attempts to flush it. Any queue names that do not exist or cannot be flushed are ignored.

publish(event: BaseEvent, exchange: str, routing_key: str = '', channel: BlockingChannel | None = None, **kwargs) None[source]

Publish an event.

Encodes an event and publishes it using the provided exchange and/or routing key.

Parameters:
  • event (BaseEvent) – The event to publish.

  • exchange (str) – The exchange to publish to.

  • routing_key (str, optional) – The routing key to use for publishing. Default is the empty string.

  • channel (BlockingChannel, optional) – The channel to use for publishing. Default is None (will create a new connection and channel).

  • **kwargs – Arbitrary keyword arguments.
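The encode-then-publish flow can be sketched with a stub channel in place of a live broker. Several things here are assumptions: JSON is used as the encoding (the real wire format is not specified), DemoEvent stands in for a BaseEvent subclass, and RecordingChannel is a hypothetical stub whose basic_publish mirrors the exchange, routing_key, and body arguments of pika's BlockingChannel.basic_publish.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class DemoEvent:
    """Hypothetical stand-in for a BaseEvent subclass."""
    tag: str
    value: int


class RecordingChannel:
    """Stub channel that records what would have been published."""

    def __init__(self) -> None:
        self.published = []

    def basic_publish(self, exchange: str, routing_key: str, body: bytes) -> None:
        self.published.append((exchange, routing_key, body))


def publish(event: DemoEvent, exchange: str, routing_key: str = "", channel=None) -> None:
    if channel is None:
        # The documented service creates a new connection and channel here;
        # this sketch falls back to a throwaway stub instead.
        channel = RecordingChannel()
    body = json.dumps(asdict(event)).encode("utf-8")  # assumed JSON encoding
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)


channel = RecordingChannel()
publish(DemoEvent(tag="pulse", value=1), exchange="events", channel=channel)
```

Passing an existing channel, as in the last line, avoids opening a new connection for every published event.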

publish_pulse_with_signal_event_group(event_group: PulseWithSignalEventGroup, exchange: str, routing_key: str = '', channel: BlockingChannel | None = None, **kwargs) None[source]

Publish an event group.

Encodes an event group and publishes it using the provided exchange and/or routing key.

Parameters:
  • event_group (PulseWithSignalEventGroup) – The event group to publish.

  • exchange (str) – The exchange to publish to.

  • routing_key (str, optional) – The routing key to use for publishing. Default is the empty string.

  • channel (BlockingChannel, optional) – The channel to use for publishing. Default is None (will create a new connection and channel).

  • **kwargs – Arbitrary keyword arguments.

subscribe(**kwargs) None[source]

Subscribe to events.

Starts the main listener thread.

Parameters:
  • event_handler (Callable[[Any], Any]) – A callback function to handle an event.

  • **kwargs – Arbitrary keyword arguments.

unsubscribe() None[source]

Unsubscribe from events.

Signals the consumer thread to exit its main loop and waits for the thread to join.