SKA Tango Event Monitor API

ska_tango_event_monitor is a package for Tango event monitoring.

QueryEventSystem response parsing

QueryEventSystemResponse.from_json() parses the result of the QueryEventSystem() device server command into a Python dataclass object. For example:

>>> response = QueryEventSystemResponse.from_json(json.loads(admin_device_proxy.QueryEventSystem()))
class ska_tango_event_monitor.QueryEventSystemResponse(version: int, server: ServerResponse | None, client: ClientResponse | None)

The root object returned by the QueryEventSystem command.

client: ClientResponse | None

Client-side state response. None if the client has never subscribed to an event.

classmethod from_json(json_object: str | dict[str, Any]) QueryEventSystemResponse

Convert the JSON response from QueryEventSystem into this dataclass.

See QueryEventSystemResponse schema for the expected schema of json_object.

Parameters:

json_object – Data to parse.

server: ServerResponse | None

Server-side response. None if the server is not set up for ZMQ events.

version: int

The schema version (currently 1).
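
Because both server and client may be None, callers will usually want to check each side before reading it. The following is a minimal sketch that relies only on the attribute names documented above. describe_response is a hypothetical helper, not part of the package, and SimpleNamespace stands in for a parsed response so that the snippet runs without a Tango device. The stream name is a placeholder.

```python
from types import SimpleNamespace


def describe_response(response):
    """Return a short description of a parsed response.

    `response` is any object exposing the documented `version`, `server`
    and `client` attributes (hypothetical helper, not part of the package).
    """
    parts = [f"schema v{response.version}"]
    if response.server is None:
        parts.append("server: not set up for ZMQ events")
    else:
        count = len(response.server.event_counters)
        parts.append(f"server: {count} event stream(s)")
    if response.client is None:
        parts.append("client: never subscribed")
    else:
        count = len(response.client.event_callbacks)
        parts.append(f"client: {count} subscription(s)")
    return "; ".join(parts)


# Stand-in for a device server that publishes events but never subscribes.
example = SimpleNamespace(
    version=1,
    server=SimpleNamespace(event_counters={"stream-a": 12}, perf=None),
    client=None,
)
print(describe_response(example))
```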

class ska_tango_event_monitor.ServerResponse(event_counters: dict[str, int], perf: list[ServerPerfSample] | None)

Response from the ZmqEventSupplier (server-side).

event_counters: dict[str, int]

A map of event stream names to the number of events pushed on that stream.

perf: list[ServerPerfSample] | None

Performance samples.

None if performance monitoring has not been enabled with EnableEventSystemPerfMon().

class ska_tango_event_monitor.ServerPerfSample(micros_since_last_event: int | None, push_event_micros: int)

Performance monitoring data for a single server-side event.

micros_since_last_event: int | None

Microseconds since the last event was sampled.

None for the first sample since performance monitoring started.

push_event_micros: int

Microseconds it took for the event to be handed off to ZMQ.

class ska_tango_event_monitor.ClientResponse(event_callbacks: dict[str, EventCallback], not_connected: list[DisconnectedEventStream], event_channels: dict[str, EventChannel], perf: list[ClientPerfSample] | None)

Response from the ZmqEventConsumer (client-side).

event_callbacks: dict[str, EventCallback]

A map of event topics to their callback data.

event_channels: dict[str, EventChannel]

A map of device servers to their ZMQ connection details.

not_connected: list[DisconnectedEventStream]

A list of event streams currently being connected to for the first time.

perf: list[ClientPerfSample] | None

Performance samples.

None if performance monitoring has not been enabled with EnableEventSystemPerfMon().

class ska_tango_event_monitor.EventCallback(channel_name: str, callback_count: int, server_counter: int, event_count: int, missed_event_count: int, discarded_event_count: int, last_resubscribed: str | None)

Data describing a specific event subscription callback.

callback_count: int

Number of user callbacks interested in the event topic.

channel_name: str

Name of the device server supplying the event topic.

discarded_event_count: int

The number of events discarded for this topic.

This corresponds to a duplicate event being sent by the ZMQ socket and is invisible to the application.

event_count: int

The number of events received and not discarded for this topic.

last_resubscribed: str | None

Timestamp of the last resubscription, or None if using the initial subscription.

missed_event_count: int

The number of times the ZmqEventConsumer detected missed events.

This is accompanied by an error event.

server_counter: int

The last counter received from the device server.
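
One common use of these counters is to find subscriptions that have lost events. The sketch below filters a mapping shaped like ClientResponse.event_callbacks by missed_event_count. topics_with_missed_events is a hypothetical helper, and the topic names and stand-in objects are placeholders so the snippet runs on its own.

```python
from types import SimpleNamespace


def topics_with_missed_events(event_callbacks):
    """Map each topic with a non-zero missed_event_count to that count."""
    return {
        topic: callback.missed_event_count
        for topic, callback in event_callbacks.items()
        if callback.missed_event_count > 0
    }


# Stand-ins for EventCallback objects; only the field used here is set.
callbacks = {
    "topic-a": SimpleNamespace(missed_event_count=0),
    "topic-b": SimpleNamespace(missed_event_count=3),
}
print(topics_with_missed_events(callbacks))  # {'topic-b': 3}
```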

class ska_tango_event_monitor.DisconnectedEventStream(device: str, attribute: str, event_type: str, last_heartbeat: str, tango_host: str | None)

Represents an event stream the consumer has not yet connected to.

attribute: str

The name of the attribute associated with the event.

device: str

The name of the Tango device the client is attempting to connect to.

event_type: str

The type of the event.

last_heartbeat: str

The timestamp of the last attempt to connect to the device.

tango_host: str | None

The TANGO_HOST used to locate the device, or None if dbase=no.

class ska_tango_event_monitor.EventChannel(endpoint: str)

Describes a ZMQ endpoint connected to a device server.

endpoint: str

The ZMQ endpoint URI the ZmqEventConsumer connects to for this server.

class ska_tango_event_monitor.ClientPerfSample(attr_name: str, micros_since_last_event: int | None, sleep_micros: int, process_micros: int, first_callback_latency_micros: int | None, callback_count: int, wake_count: int, discarded: bool)

Performance monitoring data for a single client-side event.

attr_name: str

First 31 characters of the attribute name.

Empty if the event is not associated with an attribute or the event was discarded.

callback_count: int

Number of user callbacks called to process this event.

discarded: bool

True if the event was discarded before processing.

This occurs for events with a duplicate server counter.

first_callback_latency_micros: int | None

Microseconds between server pushing the event and the first client callback processing the event.

micros_since_last_event: int | None

Microseconds since the last event was sampled.

None for the first sample since performance monitoring started.

process_micros: int

Microseconds taken to process the event, including all user callbacks.

sleep_micros: int

Microseconds the ZmqEventConsumer thread slept before receiving this event.

wake_count: int

Number of times the ZmqEventConsumer thread woke while waiting for this event.
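
Several of these fields are optional or only meaningful for some samples, so raw samples usually need filtering before they are aggregated. A minimal sketch follows, using only the field names documented above. Both helpers are hypothetical. The documentation above does not state when first_callback_latency_micros is None, so the sketch simply skips such samples.

```python
from types import SimpleNamespace


def discarded_fraction(samples):
    """Fraction of samples whose event was discarded (0.0 for no samples)."""
    if not samples:
        return 0.0
    return sum(1 for sample in samples if sample.discarded) / len(samples)


def callback_latencies(samples):
    """Collect the latencies that are present, skipping None values."""
    return [
        sample.first_callback_latency_micros
        for sample in samples
        if sample.first_callback_latency_micros is not None
    ]


# Stand-ins for ClientPerfSample objects; only the fields used here are set.
samples = [
    SimpleNamespace(discarded=False, first_callback_latency_micros=250),
    SimpleNamespace(discarded=True, first_callback_latency_micros=None),
    SimpleNamespace(discarded=False, first_callback_latency_micros=400),
    SimpleNamespace(discarded=False, first_callback_latency_micros=None),
]
print(discarded_fraction(samples))
print(callback_latencies(samples))
```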

Summary computation

ResponseChangeSummary.from_responses() can be used to compare successive responses from the QueryEventSystem() device server command. For example, to compute the changes over a 10-second period:

>>> first = QueryEventSystemResponse.from_json(json.loads(admin_device_proxy.QueryEventSystem()))
>>> time.sleep(10)
>>> second = QueryEventSystemResponse.from_json(json.loads(admin_device_proxy.QueryEventSystem()))
>>> change_summary = ResponseChangeSummary.from_responses(second, first)
class ska_tango_event_monitor.ResponseChangeSummary(server: ServerSummary | None, client: ClientSummary | None)

High-level summary of changes and performance for both server and client.

client: ClientSummary | None

Summary of client-side differences and performance.

classmethod from_responses(new: QueryEventSystemResponse, old: QueryEventSystemResponse | None) ResponseChangeSummary

Create a change summary from a pair of responses.

Parameters:
  • new – The latest response from the QueryEventSystem command.

  • old – The previous response from the QueryEventSystem command, or None if this is the first response received.

Returns:

A summary of the differences between old and new.

server: ServerSummary | None

Summary of server-side differences and performance.
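
Since old may be None for the first response, a monitoring loop can treat the first iteration like any other. The sketch below shows one way to structure such a loop. poll_summaries and its parameters are hypothetical, and the query and summarize callables are injected so the snippet runs without a Tango device.

```python
import time


def poll_summaries(query, summarize, count, interval_seconds, sleep=time.sleep):
    """Collect `count` summaries, passing None as `old` for the first one.

    `query` returns the latest response; `summarize(new, old)` compares two
    responses (hypothetical helper, not part of the package).
    """
    summaries = []
    previous = None
    for index in range(count):
        if index > 0:
            sleep(interval_seconds)  # wait between consecutive queries
        current = query()
        summaries.append(summarize(current, previous))
        previous = current
    return summaries


# Stand-ins: integers play the role of responses, tuples the role of summaries.
fake_responses = iter(range(100))
result = poll_summaries(
    query=lambda: next(fake_responses),
    summarize=lambda new, old: (new, old),
    count=3,
    interval_seconds=10,
    sleep=lambda seconds: None,  # skip the real delay in this demonstration
)
print(result)  # [(0, None), (1, 0), (2, 1)]
```

In real use, query would presumably wrap QueryEventSystemResponse.from_json() around the QueryEventSystem() command, and summarize would be ResponseChangeSummary.from_responses.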

class ska_tango_event_monitor.ServerSummary(diff: ServerDiff, perf: ServerPerfSummary | None)

Combined summary of server-side structural changes and performance metrics.

diff: ServerDiff

The differences in publishers compared to a previous state.

classmethod from_responses(new: ServerResponse | None, old: ServerResponse | None) ServerSummary | None

Create a change summary from a pair of server responses.

Parameters:
  • new – The latest response from the QueryEventSystem command, or None if no server data.

  • old – The previous response from the QueryEventSystem command, or None if no server data.

Returns:

A summary of the differences between old and new.

perf: ServerPerfSummary | None

The aggregated performance metrics for the current state.

class ska_tango_event_monitor.ServerDiff(added_publishers: dict[str, int], changed_publishers: dict[str, tuple[int, int]], removed_publishers: list[str])

Summarizes the differences in server publishers between two states.

added_publishers: dict[str, int]

Mapping of newly detected event streams to the count of the next event to be sent by the server.

changed_publishers: dict[str, tuple[int, int]]

Mapping of changed event streams to tuples of (previous_count, current_count) for the next event to be sent by the server.

removed_publishers: list[str]

List of publisher names that are no longer present.
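
The (previous_count, current_count) pairs lend themselves to a rate calculation when the time between the two responses is known. The following is a minimal sketch. publisher_rates is a hypothetical helper, the interval is assumed to be measured by the caller, and the counters are assumed not to wrap or reset between the two responses.

```python
def publisher_rates(changed_publishers, interval_seconds):
    """Convert (previous, current) counter pairs into events per second."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    return {
        name: (current - previous) / interval_seconds
        for name, (previous, current) in changed_publishers.items()
    }


# Placeholder stream names with counters taken 10 seconds apart.
changed = {"stream-a": (10, 60), "stream-b": (5, 7)}
print(publisher_rates(changed, 10.0))  # {'stream-a': 5.0, 'stream-b': 0.2}
```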

class ska_tango_event_monitor.ServerPerfSummary(event_gaps: StatsSummary | None, push_time: StatsSummary)

Aggregated statistical summaries of server-side performance metrics.

event_gaps: StatsSummary | None

Statistical summary of the time intervals between generated events.

classmethod from_samples(perf: list[ServerPerfSample] | None) ServerPerfSummary | None

Return statistical summary of server performance data.

push_time: StatsSummary

Statistical summary of the time taken to send events to the network.

class ska_tango_event_monitor.ClientSummary(diff: ClientDiff, perf: ClientPerfSummary | None)

Combined summary of client-side structural changes and performance metrics.

diff: ClientDiff

The differences in subscriptions compared to a previous state.

classmethod from_responses(new: ClientResponse | None, old: ClientResponse | None) ClientSummary | None

Create a change summary from a pair of client responses.

Parameters:
  • new – The latest response from the QueryEventSystem command, or None if no client data.

  • old – The previous response from the QueryEventSystem command, or None if no client data.

Returns:

A summary of the differences between old and new.

perf: ClientPerfSummary | None

The aggregated performance metrics for the current state.

class ska_tango_event_monitor.ClientDiff(added_subscriptions: dict[str, NewSubscriptionState], changed_subscriptions: dict[str, ChangedSubscriptionState], removed_subscriptions: list[str], added_not_connected: list[str], removed_not_connected: list[str])

Summarizes the differences in client subscriptions between two states.

added_not_connected: list[str]

List of new event streams the client has not successfully connected to yet.

added_subscriptions: dict[str, NewSubscriptionState]

Mapping of newly detected event streams.

changed_subscriptions: dict[str, ChangedSubscriptionState]

Mapping of changed event streams.

removed_not_connected: list[str]

List of event streams the client is no longer attempting to initially connect to.

This could be because the event stream has been connected to successfully, or because the client application has “given up”, i.e. has called unsubscribe_event().

removed_subscriptions: list[str]

List of event streams that are no longer subscribed to.

class ska_tango_event_monitor.ClientPerfSummary(event_gaps: StatsSummary | None, sleep_time: StatsSummary, latency: StatsSummary | None, callback_count: StatsSummary, wake_count: StatsSummary, processing_time: dict[str, StatsSummary])

Aggregated statistical summaries of client-side performance metrics.

callback_count: StatsSummary

Summary of the number of user callbacks executed per event.

event_gaps: StatsSummary | None

Summary of the time intervals between consecutive events.

classmethod from_samples(perf: list[ClientPerfSample] | None) ClientPerfSummary | None

Return statistical summary of client performance data.

latency: StatsSummary | None

Summary of the delay between the server sending and client processing.

processing_time: dict[str, StatsSummary]

Mapping of attribute names to a processing time summary.

sleep_time: StatsSummary

Summary of the time the consumer thread spent sleeping.

wake_count: StatsSummary

Summary of the number of times the consumer thread woke up.

class ska_tango_event_monitor.NewSubscriptionState(event_count: int, server_counter: int, callback_count: int, missed_event_count: int, discarded_event_count: int)

Snapshot of a subscription that has been newly detected.

callback_count: int

The number of local callbacks registered for this subscription.

discarded_event_count: int

The number of discarded events detected for this subscription.

This corresponds to duplicate events being sent by the ZMQ socket and is invisible to the application.

event_count: int

The total number of events received for this subscription.

missed_event_count: int

The number of missed events detected for this subscription.

This corresponds to a jump in the server_counter detected by the client and is accompanied by an error event.

server_counter: int

The most recent counter value received from the server.

class ska_tango_event_monitor.ChangedSubscriptionState(event_count: tuple[int, int] | None, server_counter: tuple[int, int] | None, callback_count: tuple[int, int] | None, missed_event_count: tuple[int, int] | None, discard_event_count: tuple[int, int] | None)

Snapshot of a subscription that has changed between two observations.

callback_count: tuple[int, int] | None

Tuple of (previous, current) for registered callbacks.

None if there has been no change.

discard_event_count: tuple[int, int] | None

Tuple of (previous, current) for discarded event counts.

None if there has been no change.

event_count: tuple[int, int] | None

Tuple of (previous, current) for the number of processed events.

None if there has been no change.

missed_event_count: tuple[int, int] | None

Tuple of (previous, current) for detected missed event counts.

None if there has been no change.

server_counter: tuple[int, int] | None

Tuple of (previous, current) for the most recently received counter value.

None if there has been no change.
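
Because every field is either None or a (previous, current) tuple, a consumer can reduce a changed subscription to per-field deltas. The sketch below does this using the attribute names documented above. field_deltas is a hypothetical helper and SimpleNamespace stands in for a ChangedSubscriptionState.

```python
from types import SimpleNamespace

# Attribute names as documented for ChangedSubscriptionState.
FIELDS = (
    "event_count",
    "server_counter",
    "callback_count",
    "missed_event_count",
    "discard_event_count",
)


def field_deltas(changed):
    """Return current - previous for every field that actually changed."""
    deltas = {}
    for name in FIELDS:
        pair = getattr(changed, name)
        if pair is not None:  # None means the field did not change
            previous, current = pair
            deltas[name] = current - previous
    return deltas


# Stand-in for a subscription that received 30 events and missed one.
changed_state = SimpleNamespace(
    event_count=(100, 130),
    server_counter=(100, 131),
    callback_count=None,
    missed_event_count=(0, 1),
    discard_event_count=None,
)
print(field_deltas(changed_state))
```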

class ska_tango_event_monitor.StatsSummary(mean: float, stderr: float, quantiles: Quantiles, count: int)

Statistical summary of a data set.

count: int

The number of elements in the data set.

classmethod from_data(data: list[float] | list[int]) StatsSummary

Return a statistical summary of the data set.

The dataset must have at least one element.

Parameters:

data – The data set to summarize

Returns:

The summary

Raises:

ValueError – if len(data) == 0

mean: float

The arithmetic mean of the data set.

quantiles: Quantiles

The distribution of the data set.

stderr: float

The standard error on the mean.

class ska_tango_event_monitor.Quantiles(median: float, deciles: tuple[float, float] | None, percentiles: tuple[float, float] | None, extrema: tuple[float, float])

Data quantiles describing the distribution of the data set.

deciles: tuple[float, float] | None

The 10th and 90th percentiles, or None if fewer than 10 elements.

extrema: tuple[float, float]

The minimum and maximum of the data set.

median: float

The 50th percentile; the middle value of the sorted data set.

percentiles: tuple[float, float] | None

The 1st and 99th percentiles, or None if fewer than 100 elements.
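
To illustrate the quantities described by StatsSummary and Quantiles, the sketch below computes comparable values with the Python standard library. It is not the package's implementation. The summarize function, the dictionary layout, the choice of zero standard error for a single element, and the default interpolation method of statistics.quantiles are all assumptions made for this example.

```python
import math
import statistics


def summarize(data):
    """Compute summary statistics similar to those documented above."""
    if len(data) == 0:
        raise ValueError("data must contain at least one element")
    count = len(data)
    # The sample standard deviation needs two points; a single point is
    # treated as having zero spread (an assumption of this sketch).
    stderr = statistics.stdev(data) / math.sqrt(count) if count > 1 else 0.0
    deciles = None
    if count >= 10:
        cuts = statistics.quantiles(data, n=10)  # 9 cut points
        deciles = (cuts[0], cuts[-1])  # 10th and 90th percentiles
    percentiles = None
    if count >= 100:
        cuts = statistics.quantiles(data, n=100)  # 99 cut points
        percentiles = (cuts[0], cuts[-1])  # 1st and 99th percentiles
    return {
        "count": count,
        "mean": statistics.fmean(data),
        "stderr": stderr,
        "median": statistics.median(data),
        "deciles": deciles,
        "percentiles": percentiles,
        "extrema": (min(data), max(data)),
    }


small = summarize([1, 2, 3])
large = summarize(list(range(1, 101)))
print(small["mean"], small["deciles"])
print(large["mean"], large["extrema"])
```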