.event_handling Package

Module contents

Event handlers and configuration scripts for handling and checking correct occurrence of events.

This subpackage is a specific use of the ska_ser_skallop.subscribing package to manage Tango-related events that occur during the execution of runtime tests. It consists of a set of connectors that call the ska_ser_skallop.subscribing package to create the objects needed for managing Tango events.

The example below illustrates the concept:

foo = 'bar'

Submodules

.base Module

Contains imported types and base objects used by event_handling package.

class EventDataInt(producer_name: str, attr_name: str, val: Optional[Any] = None, timestamp=None)[source]

Bases: object

An abstraction of event data that provides the minimum set of attributes and methods necessary to work with the subscription module. It is derived from the specific tango EventData so that the EventData from tango devices can also be used in its place.

class MessageBoard[source]

Bases: ska_ser_skallop.subscribing.base.MessageBoardBase

Encapsulates a queue containing events that get placed onto it by event pushers. It also keeps track of subscriptions and allows adding and removing a subscription. Lastly, it keeps a log of various internal messages and transitions to assist in describing the occurrence of events.

add_subscription(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, handler: ska_ser_skallop.subscribing.message_handler.MessageHandler, polling: bool = False) ska_ser_skallop.subscribing.subscription.Subscription[source]

Registers and initiates a new subscription on a given producer for a specific attribute. A subscription can be seen as a request to be notified when an event occurs on a particular attribute of an entity. In this implementation, an event occurs when the value of an attribute of that entity changes. The producer then notifies the subscriber in one of two ways:

  1. By means of a call made by the producer (call back) on a given object

  2. By storing the event temporarily in a buffer for later retrieval (polling = True)

However, both of these options achieve the same external result on the MessageBoard by resulting in events being placed on its internal queue. These events can then be retrieved later from the MessageBoard, together with information about the subscription and a given handler to consume the results of the event.

Parameters
  • producer – the object responsible for generating the events and upon which the subscription must be placed.

  • attr – the attribute (item of interest) for which a change in value must result in a notification event

  • handler – the object responsible for consuming the contents of the event. Note that this is different from the subscriber handler that is given to the producer for handling the incoming event. In this case the handler is just an optional attribute of the EventItem that is placed on the queue, providing a means of consuming events as they get pulled from the buffer.

  • polling – whether the internal event generation must be done via polling or call back. In general the call back mechanism is simpler but in cases where a producer is not able to call a subscriber directly then the polling method is also available, defaults to False

Returns

returns the created subscription in case the client code wants to use the subscription later
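The two notification routes described above can be sketched with the standard library alone. This is a minimal illustration, not the skallop implementation: SketchEvent, SketchProducer, its subscribe and change methods, and the device name are all hypothetical stand-ins. It only shows that a call back subscription and a polled subscription both end with events sitting on the same queue.

```python
import queue
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple


class SketchEvent(NamedTuple):
    """Hypothetical stand-in for EventDataInt."""
    producer_name: str
    attr_name: str
    val: Any


class SketchProducer:
    """Hypothetical in-memory producer; not the skallop Producer."""

    def __init__(self, name: str) -> None:
        self.name = name
        # subscription id -> (attribute, call back or None when polled)
        self._subs: Dict[int, Tuple[str, Optional[Callable[[SketchEvent], None]]]] = {}
        # subscription id -> events buffered for polled subscriptions
        self._buffers: Dict[int, List[SketchEvent]] = {}

    def subscribe(self, attr: str,
                  callback: Optional[Callable[[SketchEvent], None]] = None) -> int:
        sub_id = len(self._subs)
        self._subs[sub_id] = (attr, callback)
        if callback is None:
            self._buffers[sub_id] = []
        return sub_id

    def get_events(self, sub_id: int) -> List[SketchEvent]:
        # Hand over the buffered events and start a fresh buffer.
        events, self._buffers[sub_id] = self._buffers[sub_id], []
        return events

    def change(self, attr: str, val: Any) -> None:
        # Simulate a value change: notify by call back or store in the buffer.
        event = SketchEvent(self.name, attr, val)
        for sub_id, (sub_attr, callback) in self._subs.items():
            if sub_attr != attr:
                continue
            if callback is not None:
                callback(event)
            else:
                self._buffers[sub_id].append(event)


board = queue.Queue()
producer = SketchProducer("sys/tg_test/1")  # hypothetical device name

# Option 1: call back; the producer pushes straight onto the board.
producer.subscribe("state", callback=board.put)
# Option 2: polling; the producer buffers and the board side drains it later.
polled_id = producer.subscribe("state")

producer.change("state", "ON")
for event in producer.get_events(polled_id):
    board.put(event)

# Both subscriptions delivered the same change onto the one queue.
values = [board.get_nowait().val for _ in range(board.qsize())]
```

In the real package the draining of polled buffers is presumably done by the message board itself; here it is written out by hand to keep the sketch short.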

archived_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
board: queue.Queue
gathered_sleep_time = 0.05
get_current_items() List[ska_ser_skallop.subscribing.event_item.EventItem][source]

Get the current events contained in the buffer of a message board object.

Note this should be a method of the MessageBoard Class in skallop

Returns

A list of current event items

get_items(timeout: float = 0) Iterator[ska_ser_skallop.subscribing.event_item.EventItem][source]

Iterates over the EventItem instances placed in its internal queue as a consequence of subscriptions to Producer instances. If the queue is currently empty, it waits up to the given timeout and then either ends the iteration with a StopIteration or raises an ska_ser_skallop.subscribing.exceptions.EventTimedOut exception, depending on whether the subscription is configured to suppress timeouts.

Parameters

timeout – the maximum amount of time to wait for any events to be placed on the queue, defaults to 0 in the case of call back subscriptions, otherwise in case of a polling subscription, it will wait at least for the gathered_sleep_time amount of time.

Raises

EventTimedOut – if timeout is not suppressed by subscription (note a timeout condition will result in all remaining subscriptions being canceled)

Returns

returns a StopIteration during a suppressed timeout or when all subscriptions have been removed.

Yield

yields an item in the queue if not empty
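The timeout behaviour of get_items can be pictured as a generator over a queue.Queue. The sketch below is an assumption-laden illustration, not the library code: SketchTimedOut stands in for EventTimedOut, the suppress_timeout flag is passed in directly instead of being read from a subscription, and the "all subscriptions removed" stop condition is left out.

```python
import queue
from typing import Any, Iterator


class SketchTimedOut(Exception):
    """Hypothetical stand-in for EventTimedOut."""


def get_items(board: queue.Queue, timeout: float,
              suppress_timeout: bool) -> Iterator[Any]:
    """Yield queued items until the queue stays empty for `timeout` seconds."""
    while True:
        try:
            item = board.get(timeout=timeout)
        except queue.Empty:
            if suppress_timeout:
                return  # ends the iteration quietly (StopIteration for the caller)
            raise SketchTimedOut(f"no event within {timeout}s")
        yield item


board = queue.Queue()
board.put("first")
board.put("second")

# With timeouts suppressed the loop simply stops once the queue runs dry.
received = list(get_items(board, timeout=0.01, suppress_timeout=True))
```

Calling the same generator on an empty queue with suppress_timeout=False raises SketchTimedOut instead of ending the iteration.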

log(message: str, time: Optional[datetime.datetime] = None, label=None) None[source]

Logs a message with optional time of occurrence

Parameters
  • message – the message to log

  • time – the time at which this message was deemed to take place (if None then it will use its own time), defaults to None

  • label – any labels to annotate the message with, defaults to None

log_filter = 'log'
play_log_book(filter_log: bool = True, log_filter_pattern: str = '') str[source]

Returns the contents of the logbook as a string of messages separated by newline characters

Parameters

filter_log – whether the log book should filter out messages labeled as log_filter, defaults to True

Returns

the logbook contents

print_remaining_subscriptions() str[source]

Returns a description of all current subscriptions

Returns

the description of the subscriptions

remove_all_subscriptions() None[source]

Remove all subscriptions by unsubscribing on the producers. Note all removed subscriptions are placed in the archived list for diagnostic purposes.

remove_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None[source]

Remove a given subscription by unsubscribing on the producer. Note the removed subscription is placed in the archived list for diagnostic purposes.

Parameters

subscription – the subscription to remove

replay_self() str[source]

Replay internal logs from a tracer object

Returns

the tracer logs

replay_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) str[source]

Replay the tracer logs generated on a particular subscription

Parameters

subscription – the subscription to replay

Returns

the tracer logs

replay_subscriptions() str[source]

Replay the tracer logs from current and archived subscriptions

Returns

the tracer logs

subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
task_done()[source]

Gets called to indicate that the item taken from the queue has finished being worked on. This is useful for synchronization purposes when, for example, an event handler needs to wait for another event handler in a thread to finish before processing the next event.
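Since board is a queue.Queue, the synchronization described here presumably mirrors the standard task_done() and join() pairing of that class. The following sketch uses only the standard library and assumes nothing about the MessageBoard internals; the worker function and the None sentinel are illustrative choices.

```python
import queue
import threading
from typing import List

board = queue.Queue()
handled: List[str] = []


def worker() -> None:
    """Consume items in a thread and mark each one as done."""
    while True:
        item = board.get()
        if item is None:  # sentinel telling the worker to stop
            board.task_done()
            break
        handled.append(item)
        board.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for value in ("ON", "OFF"):
    board.put(value)

# join() blocks until task_done() has been called for every item put so far,
# so at this point both events are known to be fully handled.
board.join()
snapshot = list(handled)

board.put(None)
thread.join()
```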

class MessageBoardBase[source]

Bases: object

add_subscription(producer: ska_ser_skallop.subscribing.base.BaseProducer, attr: str, handler: ska_ser_skallop.subscribing.base.MessageHandlerBase, polling: bool = False) ska_ser_skallop.subscribing.base.SubscriptionBase[source]

adds a new subscription

property all_non_expendable_subscriptions_removed: bool
archived_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
board: queue.Queue
describe() str[source]
abstract get_current_items() List[ska_ser_skallop.subscribing.base.EventItemBase][source]

Get the current events contained in the buffer of a message board object.

Note this should be a method of the MessageBoard Class in skallop

Returns

A list of current event items

abstract get_items(timeout: float = 0) Iterator[ska_ser_skallop.subscribing.base.EventItemBase][source]

Iterates over the EventItem instances placed in its internal queue as a consequence of subscriptions to Producer instances. If the queue is currently empty, it waits up to the given timeout and then either ends the iteration with a StopIteration or raises an ska_ser_skallop.subscribing.exceptions.EventTimedOut exception, depending on whether the subscription is configured to suppress timeouts.

Parameters

timeout – the maximum amount of time to wait for any events to be placed on the queue, defaults to 0 in the case of call back subscriptions, otherwise in case of a polling subscription, it will wait at least for the ska_ser_skallop.subscribing.message_board.MessageBoard.gathered_sleep_time amount of time.

Raises

EventTimedOut – if timeout is not suppressed by subscription (note a timeout condition will result in all remaining subscriptions being canceled)

Returns

a StopIteration during a suppressed timeout or when all subscriptions have been removed.

Yield

yields an item in the queue if not empty

abstract log(message: str, time: Optional[datetime.datetime] = None, label=None)[source]

puts a log message on the message board

property non_expendable_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
abstract play_log_book(filter_log: bool = True, log_filter_pattern: str = '') str[source]

Returns the contents of the logbook as a string of messages separated by newline characters

Parameters

filter_log – whether the log book should filter out messages labeled as “log_filter”, defaults to True

Returns

the logbook contents

abstract remove_all_subscriptions()[source]

removes all subscriptions

abstract remove_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase)[source]

removes a given subscription

subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
abstract task_done()[source]

indicates an item popped from the message board is done (in case a wait until all done is called)

class MessageHandler(board: ska_ser_skallop.subscribing.base.MessageBoardBase, handler_annotation: str = '', enable_pre_handling_annotations: bool = True)[source]

Bases: ska_ser_skallop.subscribing.base.MessageHandlerBase

A basic implementation of the MessageHandlerBase containing typical generic event handling behaviour for convenience. Developers can inherit from this base class in order to implement event handling. The following basic features are provided:

  1. Recording and updating of current state from event data (see load_event(), replay() )

  2. Subscription behavior: remove the subscription generating this event or remove all from messageboard

  3. A pre and post context for handling an event (see handle_context() and handle_event() )

  4. Logging/Printing of the data (see print_event(), replay() )

describe_self() str[source]

Currently returns an empty string. When inheriting you should override this method and return a string representation of what the current event handler does

Returns

a description of what the handler does

handle_context(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) Iterator[None][source]

Context manager for use with a handler's event method. When you place your event handling code within this context, the basic pre- and post-handling functionality is included:

  1. Pre-handling: determines the basic ordering state (whether it is the first, second or a subsequent event). It may also unsubscribe automatically before processing the event if the attribute cancel_at_next_event has been set

  2. Post-handling: records on the message board that handling of the retrieved event item has finished.

When inheriting you can update the pre and post methods (_pre_handling, _post_handling) with more sophisticated behaviour.

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)
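The pre and post pattern described above can be sketched with contextlib.contextmanager. Everything here is an assumption made for illustration: SketchHandler is not the library's MessageHandler, the hook signatures are simplified to take only the event, and the log list stands in for the message board.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


class SketchHandler:
    """Hypothetical handler showing the pre/post context pattern."""

    def __init__(self) -> None:
        self.events_seen = 0
        self.log: List[str] = []

    def _pre_handling(self, event: Any) -> None:
        # Work out whether this is the first, second or a subsequent event.
        self.events_seen += 1
        order = {1: "first", 2: "second"}.get(self.events_seen, "subsequent")
        self.log.append(f"pre: {order} event {event!r}")

    def _post_handling(self, event: Any) -> None:
        # Record that handling of this event has finished.
        self.log.append(f"post: done with {event!r}")

    @contextmanager
    def handle_context(self, event: Any) -> Iterator[None]:
        self._pre_handling(event)
        try:
            yield
        finally:
            # Runs even if the handling code inside the context raises.
            self._post_handling(event)

    def handle_event(self, event: Any) -> None:
        with self.handle_context(event):
            self.log.append(f"handling {event!r}")


handler = SketchHandler()
handler.handle_event("ON")
```

A subclass would override handle_event and keep its own logic inside the with block, so that the pre and post steps run around it unchanged.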

handle_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, *args) None[source]

This is the basic method that normally gets called by the client to handle an event on the messageboard. It should be overridden when you want to create specific behaviour.

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)

handle_timedout(producer: ska_ser_skallop.subscribing.base.BaseProducer, attr: str, print_header: bool = True, print_tracer: bool = False) str[source]

called by the messageboard when it has waited too long for events on a subscription. The result of this is a string containing diagnostic data about the event handler to assist a tester in determining the cause of the time out.

Parameters
  • producer – The producer of the event data

  • attr – the attribute upon which the subscription is based

  • print_header – whether the diagnostic data should be preceded by a print header, defaults to True

Returns

the actual diagnostic data used to understand the cause of the time out

load_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None[source]

Updates the state of the handler by setting the contents of current_event and current_subscription to the input parameters

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)

print_event(event: ska_ser_skallop.subscribing.base.EventDataInt, ignore_first: bool = False) str[source]

This method is typically called by the client to assist the testing script. It returns a string describing the received event with annotations about its order and context. It also writes the same string to the messageboard so that all messages coming from event handlers can be centralized and replayed later.

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • ignore_first – whether the first event resulting from a subscription should be ignored. This is typically the case when a subscription on a tango device results in an immediate event being generated before a test execution has run. In such cases it may be useful to treat those events as not part of the actual test scenario, defaults to False

Returns

the event described in a useful context in order to assist the tester in understanding the dynamic situation.

replay() str[source]

returns a string that lists all the messages logged by the handler during its lifetime in order to assist with diagnosis

Returns

the diagnostic string

suppress_timeout() bool[source]

This method is used by the messageboard to determine whether it should raise an exception when the timeout occurs or whether it should simply halt the iteration. Note that this is set to always return False. Override this method if you want your handler to behave differently.

Returns

indicates whether the timeout should be suppressed

tracer: ska_ser_skallop.subscribing.helpers.Tracer
unsubscribe(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase)[source]

Unsubscribes the given subscription from the messageboard. Typically used when the incoming event indicates that no further events are expected (and thus no further waiting is necessary)

Parameters

subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)

unsubscribe_all()[source]

Unsubscribe all currently running subscriptions on a messageboard.

Use this if a current event indicates that the behaviour is finished.

class MessageHandlerBase[source]

Bases: object

handle_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, *args) None[source]
handle_timedout(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, *args, **kwargs) str[source]
print_event(event: ska_ser_skallop.subscribing.base.EventDataInt, ignore_first=False) str[source]
replay() str[source]
suppress_timeout() bool[source]
class Producer(name: str)[source]

Bases: ska_ser_skallop.subscribing.base.BaseProducer

Something that can be subscribed to or unsubscribed from. The interface is a generalization of the DeviceProxy, so that the DeviceProxy can be a specific instance of a producer without having to change its interface. A specific instance of a Producer must have at least the following methods:

get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

returns events generated on a specific subscription that were held in an internal buffer. This is for when the subscription is poll based.

is_attribute_polled(attr: str) bool[source]

checks if an attribute is polled

poll_attribute(attr: str, period: int)[source]

sets a poll period on a producer to periodically check if an attribute value has changed and only then publish it

subscribe_event(attr: str, event_type: Any, subscriber: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int[source]

sets up a subscription by registering a subscriber to be notified (via the push_event method) when a particular event type occurs on the specified attribute. If an integer is given instead of a Subscriber, the subscription is polled and the producer stores any events internally for collection by the subscriber at a later stage

unsubscribe_event(subscription_id: int) None[source]

removes a subscription identified by an id
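The "check periodically, publish only on change" idea behind poll_attribute can be sketched as follows. This is a hypothetical stand-in, not the skallop Producer: the read callable, the poll_once method and the published list are invented for the example, and the period is only stored, since its unit is not stated here. Treating the very first reading as a change is also an assumption.

```python
from typing import Any, Callable, Dict, List, Tuple


class SketchPolledProducer:
    """Hypothetical producer that publishes only when a polled value changes."""

    def __init__(self, read: Callable[[str], Any]) -> None:
        self._read = read                    # how to read an attribute value
        self._periods: Dict[str, int] = {}   # attr -> poll period
        self._last: Dict[str, Any] = {}      # attr -> last published value
        self.published: List[Tuple[str, Any]] = []

    def poll_attribute(self, attr: str, period: int) -> None:
        self._periods[attr] = period

    def is_attribute_polled(self, attr: str) -> bool:
        return attr in self._periods

    def poll_once(self, attr: str) -> None:
        """One polling tick; a real producer would repeat this every period."""
        value = self._read(attr)
        if attr not in self._last or self._last[attr] != value:
            self._last[attr] = value
            self.published.append((attr, value))


# Three successive readings of the same attribute, two of them identical.
readings = iter(["OFF", "OFF", "ON"])
producer = SketchPolledProducer(read=lambda attr: next(readings))
producer.poll_attribute("state", period=100)
for _ in range(3):
    producer.poll_once("state")
```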

class SubscriptionBase[source]

Bases: object

attr: str = ''
buffer_size = 100
describe() Any[source]
property expendable: bool
get_event_pushing_logs() str[source]
get_handler_logs() str[source]
get_internal_logs() str[source]
handle_timedout(*args, **kwargs) str[source]
handler: object = None
polled: bool = False
producer: ska_ser_skallop.subscribing.base.Producer = <ska_ser_skallop.subscribing.base.Producer object>
subscribe_buffer(buffersize=100) None[source]
subscribe_by_callback(board: queue.Queue) None[source]
suppress_timeout() bool[source]
unsubscribe() None[source]
class Tracer(message: Optional[str] = None)[source]

Bases: object

class used to record messages at specific events

message(message: str) None[source]
print_messages() str[source]
print_tracers(tracers: List[ska_ser_skallop.subscribing.helpers.Tracer]) str[source]

.builders Module

.handlers Module

.logging Module

.occurrences Module

Module used for checking and verifying correct occurrence of events.

class Ahead(device: str, ahead_by: datetime.timedelta)[source]

Bases: NamedTuple

Bundles data about a device that is transitioning ahead of the main one.

ahead_by: datetime.timedelta

Alias for field number 1

device: str

Alias for field number 0

class AssertionOnOccurrence(device: str, occurrences: ska_ser_skallop.event_handling.occurrences.Occurrences)[source]

Bases: object

Intermediate object to manage the assertion of correct occurrences.

is_ahead_of_all_on_transit(transit: str)[source]

Assert that events occurred on this device ahead of all others for a given transition.

Parameters

transit – The name of the transition (e.g. the value indicated by the change event)

is_behind_all_on_transit(transit: str)[source]

Assert that events occurred on this device behind all others for a given transition.

Parameters

transit – The name of the transition (e.g. the value indicated by the change event)

class Behind(device: str, behind_by: datetime.timedelta)[source]

Bases: NamedTuple

Bundles data about a device that is transitioning behind the main one.

behind_by: datetime.timedelta

Alias for field number 1

device: str

Alias for field number 0

class Occurrence(time_of_occurrence: datetime.datetime, time_of_recording: datetime.datetime, value: str, attr: str, device: str)[source]

Bases: NamedTuple

Bundles data related to a single occurrence into a single object.

attr: str

Alias for field number 3

device: str

Alias for field number 4

time_of_occurrence: datetime.datetime

Alias for field number 0

time_of_recording: datetime.datetime

Alias for field number 1

value: str

Alias for field number 2
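The "Alias for field number N" entries mean that each named field is also reachable by its position in the tuple. A small sketch, using a local SketchOccurrence that copies the documented field order (the class name, timestamps and device name are made up for the example):

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class SketchOccurrence(NamedTuple):
    """Local copy of the documented field order, for illustration only."""
    time_of_occurrence: datetime
    time_of_recording: datetime
    value: str
    attr: str
    device: str


occurred = datetime(2021, 1, 1, 12, 0, 0)
occurrence = SketchOccurrence(
    time_of_occurrence=occurred,
    time_of_recording=occurred + timedelta(milliseconds=5),
    value="ON",
    attr="state",
    device="sys/tg_test/1",  # hypothetical device name
)

# Field number 2 and the name `value` refer to the same element.
by_index = occurrence[2]
by_name = occurrence.value

# The delay between the event occurring and it being recorded.
recording_delay = occurrence.time_of_recording - occurrence.time_of_occurrence
```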

class Occurrences(transits: List[Union[str, Tuple[str, str]]])[source]

Bases: ska_ser_skallop.event_handling.base.WireTap

Class used to keep track of the occurrences of events.

It is given a list of values representing expected transitions and then keeps track of actual events occurring by means of handler objects making calls on it, treating it as a WireTap object.

add_occurrence(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Add the occurrence of a new event.

Parameters

event – the event, caused by a subscription on a producer (e.g. a tango device) and typically called by a handler handling that event.

assert_that(device: str) ska_ser_skallop.event_handling.occurrences.AssertionOnOccurrence[source]

Generate an assertion check on the occurrences for a particular device.

Parameters

device – the FQDN name for the device being checked.

Returns

an object to specify the assertion in a chain like fashion
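The chain-like style referred to here can be sketched as an object that hands out a small assertion helper. The classes below are hypothetical stand-ins: SketchOccurrences and its record method are invented for the example, and only the method names assert_that and is_ahead_of_all_on_transit are taken from this page. How the real classes store their data is not shown here.

```python
from datetime import datetime, timedelta
from typing import Dict


class SketchAssertion:
    """Hypothetical intermediate object returned by assert_that()."""

    def __init__(self, device: str,
                 times: Dict[str, Dict[str, datetime]]) -> None:
        self._device = device
        self._times = times  # transit -> device -> time of the transition

    def is_ahead_of_all_on_transit(self, transit: str) -> None:
        times = self._times[transit]
        mine = times[self._device]
        # Any other device that transitioned at the same time or earlier
        # means this device was not ahead of all of them.
        offenders = [d for d, t in times.items()
                     if d != self._device and t <= mine]
        assert not offenders, (
            f"{self._device} was not ahead of {offenders} on {transit}")


class SketchOccurrences:
    """Hypothetical record of when each device made each transition."""

    def __init__(self) -> None:
        self._times: Dict[str, Dict[str, datetime]] = {}

    def record(self, device: str, transit: str, when: datetime) -> None:
        self._times.setdefault(transit, {})[device] = when

    def assert_that(self, device: str) -> SketchAssertion:
        return SketchAssertion(device, self._times)


base = datetime(2021, 1, 1, 12, 0, 0)
occurrences = SketchOccurrences()
occurrences.record("device/a", "ON", base)
occurrences.record("device/b", "ON", base + timedelta(milliseconds=20))

# Reads as a sentence: assert that device/a is ahead of all on transit ON.
occurrences.assert_that("device/a").is_ahead_of_all_on_transit("ON")
```

The same call made for device/b raises an AssertionError, because device/a made the transition first.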

print_outcome_for(device: str) str[source]

Print a human readable description of events occurring relative to the given device.

Parameters

device – the FQDN name of the device to be printing an outcome for

Returns

A string containing the text documenting the results.

set_subject_device(device: str)[source]

Specify the main device or subject of investigating relative occurrences.

Parameters

device – the name of the device as FQDN string.

property subject_device: str

Get the main device or subject of investigating relative occurrences.

Returns

the name of the device as FQDN string.

tap_in_on_event(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Implement the abstract tap_in_on_event.

This will result in a new occurrence to be added with the event as the input argument.

Parameters

event – The event object that has occurred.

class Outcome(device: str, transition: str, devices_ahead: List[ska_ser_skallop.event_handling.occurrences.Ahead], devices_behind: List[ska_ser_skallop.event_handling.occurrences.Behind], transit_time: Union[datetime.datetime, str])[source]

Bases: NamedTuple

Bundles data about an outcome for the relative order of occurrences into a single object.

device: str

Alias for field number 0

devices_ahead: List[ska_ser_skallop.event_handling.occurrences.Ahead]

Alias for field number 2

devices_behind: List[ska_ser_skallop.event_handling.occurrences.Behind]

Alias for field number 3

transit_time: Union[datetime.datetime, str]

Alias for field number 4

transition: str

Alias for field number 1
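One plausible way to fill the devices_ahead and devices_behind fields is to compare each device's transition time with that of the subject device. The sketch below assumes that reading of the field names; SketchAhead, SketchBehind and split_relative_to are hypothetical, and devices that transition at exactly the same time as the subject end up in neither list.

```python
from datetime import datetime, timedelta
from typing import Dict, List, NamedTuple, Tuple


class SketchAhead(NamedTuple):
    device: str
    ahead_by: timedelta


class SketchBehind(NamedTuple):
    device: str
    behind_by: timedelta


def split_relative_to(
    subject: str, times: Dict[str, datetime]
) -> Tuple[List[SketchAhead], List[SketchBehind]]:
    """Split the other devices into those ahead of and behind the subject."""
    subject_time = times[subject]
    ahead = [SketchAhead(d, subject_time - t)
             for d, t in times.items() if d != subject and t < subject_time]
    behind = [SketchBehind(d, t - subject_time)
              for d, t in times.items() if d != subject and t > subject_time]
    return ahead, behind


base = datetime(2021, 1, 1, 12, 0, 0)
transit_times = {
    "device/a": base,
    "device/b": base + timedelta(milliseconds=20),
    "device/c": base + timedelta(milliseconds=35),
}

devices_ahead, devices_behind = split_relative_to("device/b", transit_times)
```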