emulator_engine.services.messaging package
Messaging/event management service implementations.
- class emulator_engine.services.messaging.ApiQueueService(api_request_queue: str = '', api_callback_queue: str = '', subscriber_timeout: int = None, router_impl: RouterImpl = None, name: str | None = None)
Bases: BaseMessageService
Message service which listens for API calls and processes them.
Implements BaseMessageService.
- Parameters:
api_request_queue (str) – A unique identifier to give the API request queue.
api_callback_queue (str) – A unique identifier to give the API callback queue.
subscriber_timeout (int) – If specified, a timeout value in seconds after which the service will automatically unsubscribe from events. If not specified, the service never times out.
router_impl (RouterImpl) – If specified, a router implementation to supply on initialization.
name (str) – A name to give this service (primarily for logging purposes).
- flush() -> None
Flush all queues that this service is aware of.
Loops through every queue name available and attempts to flush it. Any queue names that do not exist or cannot be flushed are ignored.
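The flush behaviour described above can be pictured with a minimal sketch. Everything in it is a hypothetical stand-in: `FakeBroker`, `QueueNotFound`, and `flush_all` are not part of `emulator_engine`, and the in-memory broker replaces whatever backend the service really talks to. The sketch only illustrates the loop that tries every known queue name and skips the ones that fail.

```python
class QueueNotFound(Exception):
    """Hypothetical error raised when a queue name is unknown to the broker."""


class FakeBroker:
    """In-memory stand-in for a message broker (an assumption, not the real backend)."""

    def __init__(self, queues):
        # Map of queue name -> list of pending messages.
        self.queues = {name: list(msgs) for name, msgs in queues.items()}

    def purge(self, name):
        if name not in self.queues:
            raise QueueNotFound(name)
        self.queues[name].clear()


def flush_all(broker, queue_names):
    """Attempt to flush every queue; return the names that were actually flushed."""
    flushed = []
    for name in queue_names:
        try:
            broker.purge(name)
        except QueueNotFound:
            # Queues that do not exist are skipped rather than treated as fatal.
            continue
        flushed.append(name)
    return flushed


broker = FakeBroker({"api_requests": ["a", "b"], "api_callbacks": ["c"]})
flushed = flush_all(broker, ["api_requests", "missing_queue", "api_callbacks"])
print(flushed)
```

Catching the error per queue, rather than around the whole loop, is what lets one missing queue leave the remaining queues unaffected.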
- static get_rpc_response(request_queue: str, callback_queue: str, request_body: InternalRestRequest) -> InternalRestResponse
Sends an RPC request via RabbitMQ, then waits for and returns the response.
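The request-then-wait pattern can be sketched without a broker. This is an illustration only: it uses `queue.Queue` in place of RabbitMQ queues and plain dicts in place of `InternalRestRequest` and `InternalRestResponse`. Matching replies by correlation ID is a common convention in RabbitMQ RPC setups, and assuming it here does not mean this method works that way. `serve_one` and `rpc_call` are hypothetical names.

```python
import queue
import threading
import uuid


def serve_one(request_queue, callback_queue, handler):
    """Hypothetical worker: take one request, reply on the callback queue."""
    correlation_id, body = request_queue.get()
    callback_queue.put((correlation_id, handler(body)))


def rpc_call(request_queue, callback_queue, body, timeout=5.0):
    """Send a request, then block until the matching response arrives."""
    correlation_id = str(uuid.uuid4())
    request_queue.put((correlation_id, body))
    while True:
        # Raises queue.Empty if nothing arrives within the timeout.
        reply_id, response = callback_queue.get(timeout=timeout)
        if reply_id == correlation_id:
            return response
        # Replies with a different correlation ID are ignored in this sketch.


requests = queue.Queue()
callbacks = queue.Queue()
worker = threading.Thread(
    target=serve_one,
    args=(requests, callbacks, lambda body: {"status": 200, "echo": body}),
)
worker.start()
response = rpc_call(requests, callbacks, {"path": "/status"})
worker.join()
print(response)
```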
- static init_for_subcontroller(bitstream_emulator_id: str, ip_block_id: str) -> ApiQueueService
Construct an instance of this service for a specific subcontroller.
This will automatically populate queue names based on the controller and subcontroller IDs.
- Parameters:
bitstream_emulator_id (str)
ip_block_id (str)
- Returns:
The created service instance for the subcontroller.
- Return type:
ApiQueueService
- class emulator_engine.services.messaging.EventQueueService(pulse_interval: float, bitstream_emulator_id: str = '', ip_block_id: str = '', downstream_ip_block_ids: list[str] = [], signal_output_map: dict[str, str] = {}, subscriber_timeout: int = None, expected_input_pulses: int = 1, name: str | None = None)
Bases: BaseMessageService
Message service which listens for RabbitMQ events, and processes and/or forwards them as necessary.
Implements BaseMessageService.
- Parameters:
pulse_interval (float) – The interval in seconds between pulses being used by the system. Used to calibrate certain internal timings.
bitstream_emulator_id (str) – The bitstream emulator ID associated with this service.
ip_block_id (str) – The IP block ID associated with this service.
downstream_ip_block_ids (list[str]) – A list of the downstream IP block IDs which are directly connected to this one.
signal_output_map (dict[str, str], optional) – A mapping from signal event tags output by this emulator to intended destination IP block IDs. Default is an empty dict.
subscriber_timeout (int) – If specified, a timeout value in seconds after which the service will automatically unsubscribe from events. If not specified, the service never times out.
expected_input_pulses (int, optional) – The number of parallel input pulses this block should listen for before processing other events. Default is 1.
name (str) – A name to give this service (primarily for logging purposes).
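Two of the parameters above, `expected_input_pulses` and `signal_output_map`, may be easier to picture with a small sketch. It is an interpretation of the parameter descriptions, not the service's actual implementation: `PulseGate` and `route_signals` are hypothetical helpers, and dropping unmapped tags is an assumption.

```python
class PulseGate:
    """Hypothetical helper: opens once the expected number of pulses has arrived."""

    def __init__(self, expected_input_pulses=1):
        self.expected = expected_input_pulses
        self.received = 0

    def on_pulse(self):
        """Record one pulse; return True when the full set has arrived."""
        self.received += 1
        if self.received >= self.expected:
            self.received = 0  # reset for the next pulse cycle
            return True
        return False


def route_signals(signal_tags, signal_output_map):
    """Group signal event tags by destination IP block ID.

    Tags with no entry in the map are dropped (an assumption of this sketch).
    """
    destinations = {}
    for tag in signal_tags:
        block_id = signal_output_map.get(tag)
        if block_id is not None:
            destinations.setdefault(block_id, []).append(tag)
    return destinations


gate = PulseGate(expected_input_pulses=2)
gate_states = [gate.on_pulse() for _ in range(3)]
routed = route_signals(
    ["irq", "data_ready", "unused"],
    {"irq": "block_b", "data_ready": "block_c"},
)
print(gate_states, routed)
```

With two expected pulses, the gate stays closed on the first pulse, opens on the second, and starts counting again on the third.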
- flush() -> None
Flush all queues that this service is aware of.
Loops through every queue name available and attempts to flush it. Any queue names that do not exist or cannot be flushed are ignored.
- publish(event: BaseEvent, exchange: str, routing_key: str = '', channel: BlockingChannel | None = None, **kwargs) -> None
Publish an event.
Encodes an event and publishes it using the provided exchange and/or routing key.
- Parameters:
event (BaseEvent) – The event to publish.
exchange (str) – The exchange to publish to.
routing_key (str, optional) – The routing key to use for publishing. Default is the empty string.
channel (BlockingChannel, optional) – The channel to use for publishing. Default is None (will create a new connection and channel).
**kwargs – Arbitrary keyword arguments.
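The encode-then-publish flow can be sketched as follows. The sketch makes several assumptions: `DemoEvent` stands in for a `BaseEvent` subclass, JSON is used as the encoding (the real encoding is not stated here), and `RecordingChannel` is a fake that only mirrors the `basic_publish(exchange, routing_key, body)` call shape of pika's `BlockingChannel` so that nothing touches a real broker.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class DemoEvent:
    """Hypothetical stand-in for a BaseEvent subclass."""
    tag: str
    value: int


class RecordingChannel:
    """Stand-in channel that records publishes instead of contacting a broker."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


def publish(event, exchange, routing_key="", channel=None):
    """Encode the event and publish it; returns the channel that was used."""
    if channel is None:
        # The documented method creates a new connection and channel here;
        # this sketch just creates a fresh recording channel instead.
        channel = RecordingChannel()
    body = json.dumps(asdict(event)).encode("utf-8")
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
    return channel


channel = RecordingChannel()
publish(DemoEvent(tag="pulse", value=1), exchange="events", channel=channel)
print(channel.published)
```

Passing an existing channel, as in the usage lines, avoids opening a new connection for every publish.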
- publish_pulse_with_signal_event_group(event_group: PulseWithSignalEventGroup, exchange: str, routing_key: str = '', channel: BlockingChannel | None = None, **kwargs) -> None
Publish an event group.
Encodes an event group and publishes it using the provided exchange and/or routing key.
- Parameters:
event_group (PulseWithSignalEventGroup) – The event group to publish.
exchange (str) – The exchange to publish to.
routing_key (str, optional) – The routing key to use for publishing. Default is the empty string.
channel (BlockingChannel, optional) – The channel to use for publishing. Default is None (will create a new connection and channel).
**kwargs – Arbitrary keyword arguments.