SKA Tango Event Monitor API
ska_tango_event_monitor is a package for Tango event monitoring.
QueryEventSystem response parsing
QueryEventSystemResponse.from_json() can be used to parse the result of the QueryEventSystem() device server command into a Python dataclass object. For example:
>>> response = QueryEventSystemResponse.from_json(json.loads(admin_device_proxy.QueryEventSystem()))
- class ska_tango_event_monitor.QueryEventSystemResponse(version: int, server: ServerResponse | None, client: ClientResponse | None)
The root object returned by the QueryEventSystem command.
- client: ClientResponse | None
Client-side state response. None if the client has never subscribed to an event.
- classmethod from_json(json_object: str | dict[str, Any]) QueryEventSystemResponse
Convert the JSON response from QueryEventSystem into this dataclass.
See the QueryEventSystemResponse schema for the expected schema of json_object.
- Parameters:
json_object – Data to parse.
- server: ServerResponse | None
Server-side response. None if the server is not set up for ZMQ events.
- version: int
The schema version (currently 1).
- class ska_tango_event_monitor.ServerResponse(event_counters: dict[str, int], perf: list[ServerPerfSample] | None)
Response from the ZmqEventSupplier (server-side).
- event_counters: dict[str, int]
A map of event stream names to the number of events pushed on that stream.
- perf: list[ServerPerfSample] | None
Performance samples.
None if performance monitoring has not been enabled with EnableEventSystemPerfMon().
- class ska_tango_event_monitor.ServerPerfSample(micros_since_last_event: int | None, push_event_micros: int)
Performance monitoring data for a single server-side event.
- micros_since_last_event: int | None
Microseconds since the last event was sampled.
None for the first sample since performance monitoring started.
- push_event_micros: int
Microseconds it took for the event to be handed off to ZMQ.
- class ska_tango_event_monitor.ClientResponse(event_callbacks: dict[str, EventCallback], not_connected: list[DisconnectedEventStream], event_channels: dict[str, EventChannel], perf: list[ClientPerfSample] | None)
Response from the ZmqEventConsumer (client-side).
- event_callbacks: dict[str, EventCallback]
A map of event topics to their callback data.
- event_channels: dict[str, EventChannel]
A map of device servers to their ZMQ connection details.
- not_connected: list[DisconnectedEventStream]
A list of event streams currently being connected to for the first time.
- perf: list[ClientPerfSample] | None
Performance samples.
None if performance monitoring has not been enabled with EnableEventSystemPerfMon().
- class ska_tango_event_monitor.EventCallback(channel_name: str, callback_count: int, server_counter: int, event_count: int, missed_event_count: int, discarded_event_count: int, last_resubscribed: str | None)
Data describing a specific event subscription callback.
- callback_count: int
Number of user callbacks interested in the event topic.
- channel_name: str
Name of the device server supplying the event topic.
- discarded_event_count: int
The number of events discarded for this topic.
This corresponds to a duplicate event being sent by the ZMQ socket and is invisible to the application.
- event_count: int
The number of events received and not discarded for this topic.
- last_resubscribed: str | None
Timestamp of the last resubscription, or None if using the initial subscription.
- missed_event_count: int
The number of times the ZmqEventConsumer detected missed events.
This is accompanied by an error event.
- server_counter: int
The last counter received from the device server.
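As an illustration of how these counters might be used, the sketch below lists the topics whose missed_event_count is non-zero. It reads only the attributes documented above. The SimpleNamespace objects stand in for a parsed ClientResponse so that the snippet runs without a Tango device; the function name and the topic names are hypothetical.

```python
from types import SimpleNamespace


def topics_with_missed_events(client):
    """Return {topic: missed_event_count} for topics that missed events.

    ``client`` is any object with an ``event_callbacks`` mapping whose
    values expose ``missed_event_count``, as ClientResponse is documented to.
    """
    if client is None:  # The client has never subscribed to an event.
        return {}
    return {
        topic: callback.missed_event_count
        for topic, callback in client.event_callbacks.items()
        if callback.missed_event_count > 0
    }


# Stand-in data with hypothetical topic names, not real QueryEventSystem output.
fake_client = SimpleNamespace(
    event_callbacks={
        "topic/a": SimpleNamespace(missed_event_count=0),
        "topic/b": SimpleNamespace(missed_event_count=3),
    }
)
missed = topics_with_missed_events(fake_client)
```

With a real response, the same function could presumably be called as topics_with_missed_events(response.client).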
- class ska_tango_event_monitor.DisconnectedEventStream(device: str, attribute: str, event_type: str, last_heartbeat: str, tango_host: str | None)
Represents an event stream the consumer has not yet connected to.
- attribute: str
The name of the attribute associated with the event.
- device: str
The name of the Tango device that the client is attempting to connect to.
- event_type: str
The type of the event.
- last_heartbeat: str
The timestamp of the last attempt to connect to the device.
- tango_host: str | None
The TANGO_HOST used to locate the device, or None if dbase=no.
- class ska_tango_event_monitor.EventChannel(endpoint: str)
Describes a ZMQ endpoint connected to a device server.
- endpoint: str
The ZMQ endpoint URI the ZmqEventConsumer connects to for this server.
- class ska_tango_event_monitor.ClientPerfSample(attr_name: str, micros_since_last_event: int | None, sleep_micros: int, process_micros: int, first_callback_latency_micros: int | None, callback_count: int, wake_count: int, discarded: bool)
Performance monitoring data for a single client-side event.
- attr_name: str
First 31 characters of the attribute name.
Empty if the event is not associated with an attribute or the event was discarded.
- callback_count: int
Number of user callbacks called to process this event.
- discarded: bool
True if the event was discarded before processing. This occurs for events with a duplicate server counter.
- first_callback_latency_micros: int | None
Microseconds between server pushing the event and the first client callback processing the event.
- micros_since_last_event: int | None
Microseconds since the last event was sampled.
None for the first sample since performance monitoring started.
- process_micros: int
Microseconds taken to process the event, including all user callbacks.
- sleep_micros: int
Microseconds the ZmqEventConsumer thread slept before receiving this event.
- wake_count: int
Number of times the ZmqEventConsumer thread woke while waiting for this event.
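Because some ClientPerfSample fields may be None, code that aggregates samples has to skip the missing values. The sketch below shows one way to do that, computing the fraction of discarded events and the mean first-callback latency. It uses only the field names documented above; the function name and the stand-in samples are hypothetical, and this is not the package's own summary logic.

```python
from types import SimpleNamespace


def summarize_client_samples(perf):
    """Return (discarded_fraction, mean_latency_micros) for a list of samples.

    Returns None if ``perf`` is None or empty. The mean latency is None
    when no sample carries a latency value.
    """
    if not perf:
        return None
    discarded = sum(1 for sample in perf if sample.discarded)
    latencies = [
        sample.first_callback_latency_micros
        for sample in perf
        if sample.first_callback_latency_micros is not None
    ]
    mean_latency = sum(latencies) / len(latencies) if latencies else None
    return discarded / len(perf), mean_latency


# Stand-in samples; real ones would come from ClientResponse.perf.
samples = [
    SimpleNamespace(discarded=False, first_callback_latency_micros=100),
    SimpleNamespace(discarded=False, first_callback_latency_micros=300),
    SimpleNamespace(discarded=True, first_callback_latency_micros=None),
    SimpleNamespace(discarded=False, first_callback_latency_micros=None),
]
discarded_fraction, mean_latency = summarize_client_samples(samples)
```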
Summary computation
ResponseChangeSummary.from_responses() can be used to compare successive responses from the QueryEventSystem() device server command. For example, to compute the changes over a 10-second period:
>>> first = QueryEventSystemResponse.from_json(json.loads(admin_device_proxy.QueryEventSystem()))
>>> time.sleep(10)
>>> second = QueryEventSystemResponse.from_json(json.loads(admin_device_proxy.QueryEventSystem()))
>>> change_summary = ResponseChangeSummary.from_responses(second, first)
- class ska_tango_event_monitor.ResponseChangeSummary(server: ServerSummary | None, client: ClientSummary | None)
High-level summary of changes and performance for both server and client.
- client: ClientSummary | None
Summary of client-side differences and performance.
- classmethod from_responses(new: QueryEventSystemResponse, old: QueryEventSystemResponse | None) ResponseChangeSummary
Create a change summary from a pair of responses.
- Parameters:
new – The latest response from the QueryEventSystem command.
old – The previous response from the QueryEventSystem command, or None if this is the first response received.
- Returns:
A summary of the differences between old and new.
- server: ServerSummary | None
Summary of server-side differences and performance.
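For repeated polling, the pattern described by from_responses (pass None as old for the first response, then the previous response afterwards) can be sketched with a small generator. The helper below is hypothetical and deliberately generic: query and summarize are caller-supplied callables, so the snippet runs without Tango.

```python
def summarize_polls(query, summarize, polls):
    """Yield one summary per poll, comparing each response with the previous one.

    ``query`` returns the latest response; ``summarize(new, old)`` builds a
    summary, with ``old`` set to None for the first poll.
    """
    previous = None  # No earlier response exists for the first poll.
    for _ in range(polls):
        current = query()
        yield summarize(current, previous)
        previous = current


# Stand-in callables: integers play the role of responses here.
fake_responses = iter([1, 2, 3])
pairs = list(
    summarize_polls(lambda: next(fake_responses), lambda new, old: (new, old), 3)
)
```

In real use, query would presumably wrap QueryEventSystemResponse.from_json() around a QueryEventSystem() call, summarize would be ResponseChangeSummary.from_responses, and the caller would sleep between polls.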
- class ska_tango_event_monitor.ServerSummary(diff: ServerDiff, perf: ServerPerfSummary | None)
Combined summary of server-side structural changes and performance metrics.
- diff: ServerDiff
The differences in publishers compared to a previous state.
- classmethod from_responses(new: ServerResponse | None, old: ServerResponse | None) ServerSummary | None
Create a change summary from a pair of server responses.
- Parameters:
new – The latest response from the QueryEventSystem command, or None if no server data.
old – The previous response from the QueryEventSystem command, or None if no server data.
- Returns:
A summary of the differences between old and new.
- perf: ServerPerfSummary | None
The aggregated performance metrics for the current state.
- class ska_tango_event_monitor.ServerDiff(added_publishers: dict[str, int], changed_publishers: dict[str, tuple[int, int]], removed_publishers: list[str])
Summarizes the differences in server publishers between two states.
- added_publishers: dict[str, int]
Mapping of newly detected event streams to the count of the next event to be sent by the server.
- changed_publishers: dict[str, tuple[int, int]]
Mapping of changed event streams to tuples of (previous_count, current_count) for the next event to be sent by the server.
- removed_publishers: list[str]
List of publisher names that are no longer present.
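To make the three ServerDiff fields concrete, the sketch below computes an added/changed/removed split from two event_counters-style dictionaries. It illustrates the idea only and is not taken from the package; the function name and stream names are hypothetical.

```python
def diff_counters(new, old):
    """Split two {stream: counter} mappings into added, changed and removed."""
    added = {name: count for name, count in new.items() if name not in old}
    changed = {
        name: (old[name], count)  # (previous_count, current_count)
        for name, count in new.items()
        if name in old and old[name] != count
    }
    removed = sorted(name for name in old if name not in new)
    return added, changed, removed


# Stand-in counters with hypothetical stream names.
old_counters = {"stream/a": 5, "stream/b": 7, "stream/c": 1}
new_counters = {"stream/a": 9, "stream/b": 7, "stream/d": 2}
added, changed, removed = diff_counters(new_counters, old_counters)
```

Streams whose counter did not change ("stream/b" here) appear in none of the three results.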
- class ska_tango_event_monitor.ServerPerfSummary(event_gaps: StatsSummary | None, push_time: StatsSummary)
Aggregated statistical summaries of server-side performance metrics.
- event_gaps: StatsSummary | None
Statistical summary of the time intervals between generated events.
- classmethod from_samples(perf: list[ServerPerfSample] | None) ServerPerfSummary | None
Return statistical summary of server performance data.
- push_time: StatsSummary
Statistical summary of the time taken to send events to the network.
- class ska_tango_event_monitor.ClientSummary(diff: ClientDiff, perf: ClientPerfSummary | None)
Combined summary of client-side structural changes and performance metrics.
- diff: ClientDiff
The differences in subscriptions compared to a previous state.
- classmethod from_responses(new: ClientResponse | None, old: ClientResponse | None) ClientSummary | None
Create a change summary from a pair of client responses.
- Parameters:
new – The latest response from the QueryEventSystem command, or None if no client data.
old – The previous response from the QueryEventSystem command, or None if no client data.
- Returns:
A summary of the differences between old and new.
- perf: ClientPerfSummary | None
The aggregated performance metrics for the current state.
- class ska_tango_event_monitor.ClientDiff(added_subscriptions: dict[str, NewSubscriptionState], changed_subscriptions: dict[str, ChangedSubscriptionState], removed_subscriptions: list[str], added_not_connected: list[str], removed_not_connected: list[str])
Summarizes the differences in client subscriptions between two states.
- added_not_connected: list[str]
List of new event streams the client has not successfully connected to yet.
- added_subscriptions: dict[str, NewSubscriptionState]
Mapping of newly detected event streams.
- changed_subscriptions: dict[str, ChangedSubscriptionState]
Mapping of changed event streams.
- removed_not_connected: list[str]
List of event streams the client is no longer attempting to initially connect to.
This could be because the event stream has been connected to successfully, or because the client application has “given up”, i.e. has called unsubscribe_event().
- removed_subscriptions: list[str]
List of event streams that are no longer subscribed to.
- class ska_tango_event_monitor.ClientPerfSummary(event_gaps: StatsSummary | None, sleep_time: StatsSummary, latency: StatsSummary | None, callback_count: StatsSummary, wake_count: StatsSummary, processing_time: dict[str, StatsSummary])
Aggregated statistical summaries of client-side performance metrics.
- callback_count: StatsSummary
Summary of the number of user callbacks executed per event.
- event_gaps: StatsSummary | None
Summary of the time intervals between consecutive events.
- classmethod from_samples(perf: list[ClientPerfSample] | None) ClientPerfSummary | None
Return statistical summary of client performance data.
- latency: StatsSummary | None
Summary of the delay between the server sending and client processing.
- processing_time: dict[str, StatsSummary]
Mapping of attribute names to a processing time summary.
- sleep_time: StatsSummary
Summary of the time the consumer thread spent sleeping.
- wake_count: StatsSummary
Summary of the number of times the consumer thread woke up.
- class ska_tango_event_monitor.NewSubscriptionState(event_count: int, server_counter: int, callback_count: int, missed_event_count: int, discarded_event_count: int)
Snapshot of a subscription that has been newly detected.
- callback_count: int
The number of local callbacks registered for this subscription.
- discarded_event_count: int
The number of discarded events detected for this subscription.
This corresponds to duplicate events being sent by the ZMQ socket and is invisible to the application.
- event_count: int
The total number of events received for this subscription.
- missed_event_count: int
The number of missed events detected for this subscription.
This corresponds to a jump in the server_counter detected by the client and is accompanied by an error event.
- server_counter: int
The most recent counter value received from the server.
- class ska_tango_event_monitor.ChangedSubscriptionState(event_count: tuple[int, int] | None, server_counter: tuple[int, int] | None, callback_count: tuple[int, int] | None, missed_event_count: tuple[int, int] | None, discard_event_count: tuple[int, int] | None)
Snapshot of a subscription that has changed between two observations.
- callback_count: tuple[int, int] | None
Tuple of (previous, current) for registered callbacks.
None if there has been no change.
- discard_event_count: tuple[int, int] | None
Tuple of (previous, current) for discarded event counts.
None if there has been no change.
- event_count: tuple[int, int] | None
Tuple of (previous, current) for the number of processed events.
None if there has been no change.
- missed_event_count: tuple[int, int] | None
Tuple of (previous, current) for detected missed event counts.
None if there has been no change.
- server_counter: tuple[int, int] | None
Tuple of (previous, current) for the most recently received counter value.
None if there has been no change.
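The "(previous, current) tuple, or None if unchanged" convention used by these fields can be sketched as follows. The helper names and the plain dictionaries are hypothetical stand-ins; the package's own implementation may differ.

```python
def changed_pair(previous, current):
    """Return (previous, current) if the values differ, else None."""
    return None if previous == current else (previous, current)


def changed_fields(old, new, fields):
    """Map each field name to its (previous, current) pair, or None if unchanged."""
    return {name: changed_pair(old[name], new[name]) for name in fields}


# Stand-in counter snapshots for one subscription.
old_state = {"event_count": 10, "server_counter": 10, "missed_event_count": 0}
new_state = {"event_count": 14, "server_counter": 15, "missed_event_count": 0}
delta = changed_fields(old_state, new_state, list(old_state))
```

A consumer of such a result can test each field with "is not None" to find out what changed.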
- class ska_tango_event_monitor.StatsSummary(mean: float, stderr: float, quantiles: Quantiles, count: int)
Statistical summary of a data set.
- count: int
The number of elements in the data set.
- classmethod from_data(data: list[float] | list[int]) StatsSummary
Return a statistical summary of the data set.
The dataset must have at least one element.
- Parameters:
data – The data set to summarize
- Returns:
The summary
- Raises:
ValueError – if len(data) == 0
- mean: float
The arithmetic mean of the data set.
- stderr: float
The standard error of the mean.
- class ska_tango_event_monitor.Quantiles(median: float, deciles: tuple[float, float] | None, percentiles: tuple[float, float] | None, extrema: tuple[float, float])
Data quantiles describing the distribution of the data set.
- deciles: tuple[float, float] | None
The 10th and 90th percentiles, or None if fewer than 10 elements.
- extrema: tuple[float, float]
The minimum and maximum of the data set.
- median: float
The 50th percentile; the middle value of the sorted data set.
- percentiles: tuple[float, float] | None
The 1st and 99th percentiles, or None if fewer than 100 elements.
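The quantities described by StatsSummary and Quantiles can be approximated with the standard library, as in the sketch below. Several choices here are assumptions rather than documented behaviour: the "inclusive" quantile method, a standard error of 0.0 for a single element, and the plain dictionary return value.

```python
import math
import statistics


def summarize(data):
    """Return mean, stderr, quantiles and count for a non-empty data set."""
    if len(data) == 0:
        raise ValueError("data must contain at least one element")
    count = len(data)
    # Standard error of the mean: sample standard deviation / sqrt(count).
    # Assumption: 0.0 for a single element, where the deviation is undefined.
    stderr = statistics.stdev(data) / math.sqrt(count) if count > 1 else 0.0
    deciles = None
    if count >= 10:  # 10th and 90th percentiles need at least 10 elements.
        cuts = statistics.quantiles(data, n=10, method="inclusive")
        deciles = (cuts[0], cuts[-1])
    percentiles = None
    if count >= 100:  # 1st and 99th percentiles need at least 100 elements.
        cuts = statistics.quantiles(data, n=100, method="inclusive")
        percentiles = (cuts[0], cuts[-1])
    return {
        "mean": statistics.fmean(data),
        "stderr": stderr,
        "median": statistics.median(data),
        "deciles": deciles,
        "percentiles": percentiles,
        "extrema": (min(data), max(data)),
        "count": count,
    }


summary = summarize(list(range(1, 11)))
```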