.subscribing package

Subpackages

Submodules

.subscribing.base module

class AttributeInt(name: str, value: Any, timestamp=None)[source]

Bases: object

An abstraction of a Device Attribute that provides the minimum set of fields and methods necessary to work with the subscription module

class BaseProducer(name: str)[source]

Bases: object

name() str[source]

delivers a human-readable identifier for the producer; it should be the same as the name used to instantiate it

class EventDataInt(producer_name: str, attr_name: str, val: Optional[Any] = None, timestamp=None)[source]

Bases: object

An abstraction of event data that provides the minimum set of attributes and methods necessary to work with the subscription module. It is derived from the specific tango EventData so that the EventData from tango devices can also work in its place.

class EventItemBase[source]

Bases: object

TODO

event: ska_ser_skallop.subscribing.base.EventDataInt
abstract event_value() Any[source]
abstract handle_event()[source]
handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
subscription: ska_ser_skallop.subscribing.base.SubscriptionBase
class EventTimeInt(timestamp: Optional[float] = None)[source]

Bases: object

An abstraction of a Device event’s time of occurrence so as to be compatible with the Tango Device attribute’s time object

isoformat() str[source]

renders the datetime as an ISO-formatted string

Returns

the ISO-formatted string

todatetime() datetime.datetime[source]

renders the object as a datetime type
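As a rough illustration of what such a time abstraction could look like, the sketch below wraps a POSIX timestamp and exposes the two methods listed above. The class name SketchEventTime, the use of UTC and the fallback to the current time when no timestamp is given are all assumptions made for the example; this is not the library's implementation.

```python
from datetime import datetime, timezone
from typing import Optional


class SketchEventTime:
    """Hypothetical stand-in for EventTimeInt (not the library class)."""

    def __init__(self, timestamp: Optional[float] = None) -> None:
        # Assumption: a missing timestamp falls back to the current time.
        if timestamp is None:
            timestamp = datetime.now(tz=timezone.utc).timestamp()
        self.timestamp = timestamp

    def todatetime(self) -> datetime:
        # Render the stored POSIX timestamp as a timezone-aware datetime.
        return datetime.fromtimestamp(self.timestamp, tz=timezone.utc)

    def isoformat(self) -> str:
        # Delegate to the datetime rendering so both methods always agree.
        return self.todatetime().isoformat()


event_time = SketchEventTime(0.0)
print(event_time.isoformat())  # 1970-01-01T00:00:00+00:00
```

Deriving isoformat() from todatetime() keeps a single source of truth for the conversion.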

class EventsPusherBase[source]

Bases: ska_ser_skallop.subscribing.base.Subscriber

handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None[source]

the method that is called by the producer when an event occurs on a specific subscription

queue: queue.Queue[ska_ser_skallop.subscribing.base.EventItemBase]
class MessageBoardBase[source]

Bases: object

add_subscription(producer: ska_ser_skallop.subscribing.base.BaseProducer, attr: str, handler: ska_ser_skallop.subscribing.base.MessageHandlerBase, polling: bool = False) ska_ser_skallop.subscribing.base.SubscriptionBase[source]

adds a new subscription

property all_non_expendable_subscriptions_removed: bool
archived_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
board: queue.Queue
describe() str[source]
abstract get_current_items() List[ska_ser_skallop.subscribing.base.EventItemBase][source]

Get the current events contained in the buffer of a message board object.

Note this should be a method of the MessageBoard Class in skallop

Returns

A list of current event items

abstract get_items(timeout: float = 0) Iterator[ska_ser_skallop.subscribing.base.EventItemBase][source]

Iterates over the EventItem instances placed in its internal queue as a consequence of subscriptions to Producer instances. If the queue is currently empty it will wait for a given timeout until either returning with a StopIteration or raising an ska_ser_skallop.subscribing.exceptions.EventTimedOut exception (depending on whether a subscription is configured to suppress or not suppress timeouts)

Parameters

timeout – the maximum amount of time to wait for any events to be placed on the queue, defaults to 0 in the case of call back subscriptions, otherwise in case of a polling subscription, it will wait at least for the ska_ser_skallop.subscribing.message_board.MessageBoard.gathered_sleep_time amount of time.

Raises

EventTimedOut – if timeout is not suppressed by subscription (note a timeout condition will result in all remaining subscriptions being canceled)

Returns

a StopIteration during a suppressed timeout or when all subscriptions have been removed.

Yield

yields an item in the queue if not empty

abstract log(message: str, time: Optional[datetime.datetime] = None, label=None)[source]

puts a log message on the message board

property non_expendable_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
abstract play_log_book(filter_log: bool = True, log_filter_pattern: str = '') str[source]

Returns the contents of the logbook as a string of messages separated by newline characters

Parameters

filter_log – whether the log book should filter out messages labeled as “log_filter”, defaults to True

Returns

the logbook contents

abstract remove_all_subscriptions()[source]

removes all subscriptions

abstract remove_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase)[source]

removes a given subscription

subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
abstract task_done()[source]

indicates an item popped from the message board is done (in case a wait until all done is called)

class MessageHandlerBase[source]

Bases: object

handle_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, *args) None[source]
handle_timedout(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, *args, **kwargs) str[source]
print_event(event: ska_ser_skallop.subscribing.base.EventDataInt, ignore_first=False) str[source]
replay() str[source]
suppress_timeout() bool[source]
class Producer(name: str)[source]

Bases: ska_ser_skallop.subscribing.base.BaseProducer

something that can be subscribed to or unsubscribed from. The interface is a generalization of the DeviceProxy so that the DeviceProxy can be a specific instance of a producer without having to change its interface. A specific instance of a Producer must have at least the following methods:

get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

returns the events generated on a specific subscription that were held in an internal buffer. This is for when the subscription is polling based.

is_attribute_polled(attr: str) bool[source]

checks if an attribute is polled

poll_attribute(attr: str, period: int)[source]

sets a poll period on a producer to periodically check whether an attribute value has changed and only then publish it

subscribe_event(attr: str, event_type: Any, subscriber: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int[source]

sets up a subscription by registering a subscriber to be notified (via the push_event method) when a particular event type occurs on the specified attribute. If an integer is given instead of a Subscriber, the subscription is polled and the producer stores any events internally for collection by the subscriber at a later stage

unsubscribe_event(subscription_id: int) None[source]

removes a subscription identified by an id

class Subscriber[source]

Bases: object

push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None[source]

the method that is called by the producer when an event occurs on a specific subscription

class SubscriptionBase[source]

Bases: object

attr: str = ''
buffer_size = 100
describe() Any[source]
property expendable: bool
get_event_pushing_logs() str[source]
get_handler_logs() str[source]
get_internal_logs() str[source]
handle_timedout(*args, **kwargs) str[source]
handler: object = None
polled: bool = False
producer: ska_ser_skallop.subscribing.base.Producer = <ska_ser_skallop.subscribing.base.Producer object>
subscribe_buffer(buffersize=100) None[source]
subscribe_by_callback(board: queue.Queue) None[source]
suppress_timeout() bool[source]
unsubscribe() None[source]

.subscribing.configuration module

class MessageboardContainer(*args: Any, **kwargs: Any)[source]

Bases: object

get_current_provided_messageboard()[source]
get_message_board() ska_ser_skallop.subscribing.base.MessageBoardBase[source]

retrieves the message board for use in the app; if this is the first time the call has been made, the object shall first determine a default messageboard

inject(board: ska_ser_skallop.subscribing.base.MessageBoardBase, provided_by: str)[source]
reset()[source]
class Provider(provider, provided_by)[source]

Bases: NamedTuple

provided_by: str

Alias for field number 1

provider: ska_ser_skallop.subscribing.base.MessageBoardBase

Alias for field number 0

determine_messageboard() ska_ser_skallop.subscribing.base.MessageBoardBase[source]
get_messageboard(init_before_provided=True)[source]
patch_messageboard(board: ska_ser_skallop.subscribing.base.MessageBoardBase = None, provided_by: str = '')[source]
reset_configurations()[source]
set_messageboard(board: ska_ser_skallop.subscribing.base.MessageBoardBase, provided_by: str)[source]
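The lazy "determine a default on first use, unless something was injected" behaviour described for MessageboardContainer can be sketched as follows. This is only an illustration under stated assumptions: SketchContainer, SketchProvider, the default_factory argument and the "default" label are hypothetical names, and a plain list stands in for a message board.

```python
from typing import Callable, NamedTuple, Optional


class SketchProvider(NamedTuple):
    """Hypothetical pairing of a board with a note about who supplied it."""

    provider: object
    provided_by: str


class SketchContainer:
    """Hypothetical lazy holder for a shared board (not the library class)."""

    def __init__(self, default_factory: Callable[[], object]) -> None:
        self._default_factory = default_factory
        self._provided: Optional[SketchProvider] = None

    def inject(self, board: object, provided_by: str) -> None:
        # Replace whatever board is currently held with the supplied one.
        self._provided = SketchProvider(board, provided_by)

    def get_message_board(self) -> object:
        if self._provided is None:
            # First call with nothing injected: fall back to a default board.
            self._provided = SketchProvider(self._default_factory(), "default")
        return self._provided.provider

    def reset(self) -> None:
        # Forget the held board so the next call determines a fresh one.
        self._provided = None


container = SketchContainer(default_factory=list)
default_board = container.get_message_board()
custom_board = ["custom"]
container.inject(custom_board, provided_by="my_test")
```

After the inject call, get_message_board() returns custom_board until reset() is called.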

.subscribing.event_item module

class EventItem(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase])[source]

Bases: ska_ser_skallop.subscribing.base.EventItemBase

Class for tying an incoming event from a subscription to the subscription itself as well as a registered event handler, and allowing the extraction of these items

describe() Tuple[str, str, str, str][source]

Returns a description of the event data in the form of a tuple of strings in the following order:

  1. The name of the producer (device)

  2. The name of the attribute for which the subscription was based

  3. The value of the attribute

  4. The date at which the event was generated

Returns

[description]

event: ska_ser_skallop.subscribing.base.EventDataInt
event_value() Any[source]
get_attr_name() str[source]

Returns the attribute of the producer upon which events were raised

Returns

the name of the attribute

get_attr_value_str() str[source]

Returns the current value of the attribute rendered as a string

Returns

the current value of the attribute

get_date_init() datetime.datetime[source]

Returns the date the event item (not the event itself) was created

Returns

the date of event item creation

get_date_lodged() datetime.datetime[source]

Returns the date the event was created by the producer

Returns

the date when the event was lodged

get_date_lodged_isoformat() str[source]

Returns the date the event was generated, but rendered as a string in isoformat (useful for logging purposes)

Returns

the ISO-formatted date

get_producer_name() str[source]

Returns the name of the producer (or device) that generated the event

Returns

the name of the producer

handle_event()[source]
handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
subscription: ska_ser_skallop.subscribing.base.SubscriptionBase

.subscribing.exceptions module

exception EventTimedOut(message: str, remaining_events='')[source]

Bases: Exception

Used to indicate a timeout has occurred whilst waiting for events to come in from a set of subscriptions. The timeout message will contain a list of diagnostic messages from remaining subscriptions in order to assist with fault finding.

.subscribing.helpers module

class LogBook[source]

Bases: object

log(message: str, timestamp: Optional[datetime.datetime] = None, label=None)[source]
log_filer = 'log'
read(filter_log=True, log_filter_pattern='') str[source]
class LogMessage(time, log, label)

Bases: tuple

label

Alias for field number 2

log

Alias for field number 1

time

Alias for field number 0
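To make the (time, log, label) record and the filter_log flag more concrete, here is a minimal sketch of a log book. It assumes that filtering means skipping entries whose label equals the class-level filter value "log", as the play_log_book description suggests; the names SketchLogBook and SketchLogMessage and the output format are hypothetical, and the undocumented log_filter_pattern parameter is left out.

```python
from collections import namedtuple
from datetime import datetime
from typing import List, Optional

# Same field order as the LogMessage tuple documented above.
SketchLogMessage = namedtuple("SketchLogMessage", ["time", "log", "label"])


class SketchLogBook:
    """Hypothetical log book (not the library class)."""

    log_filter = "log"  # entries carrying this label can be filtered out

    def __init__(self) -> None:
        self._messages: List[SketchLogMessage] = []

    def log(
        self,
        message: str,
        timestamp: Optional[datetime] = None,
        label: Optional[str] = None,
    ) -> None:
        # Assumption: a missing timestamp means "now".
        if timestamp is None:
            timestamp = datetime.now()
        self._messages.append(SketchLogMessage(timestamp, message, label))

    def read(self, filter_log: bool = True) -> str:
        lines = []
        # Order by time so late-arriving entries still read chronologically.
        for entry in sorted(self._messages, key=lambda msg: msg.time):
            if filter_log and entry.label == self.log_filter:
                continue
            lines.append(f"{entry.time.isoformat()} - {entry.log}")
        return "\n".join(lines)


book = SketchLogBook()
book.log("subscription added", datetime(2023, 6, 20, 8, 0, 0), label="log")
book.log("state changed to ON", datetime(2023, 6, 20, 8, 0, 1))
print(book.read())  # 2023-06-20T08:00:01 - state changed to ON
```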

class Tracer(message: Optional[str] = None)[source]

Bases: object

class used to record messages at specific events

message(message: str) None[source]
print_messages() str[source]
class TracerMessage(time, message)[source]

Bases: NamedTuple

message: str

Alias for field number 1

time: datetime.datetime

Alias for field number 0

describe_event(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: datetime.datetime = datetime.datetime(2023, 6, 20, 8, 24, 51, 839901)) Tuple[str, str, str, str][source]

Return an event as a tuple of strings describing the event.

Returns

an event as a tuple of strings describing the event.

get_attr_name(event: ska_ser_skallop.subscribing.base.EventDataInt) str[source]

returns the event attribute for which the value has been set

get_attr_value_as_str(attr: ska_ser_skallop.subscribing.base.AttributeInt) str[source]

transform a tango base.DeviceAttribute value into a string as determined by its type (name)

get_attr_value_str(event: ska_ser_skallop.subscribing.base.EventDataInt) str[source]

returns the attribute value for an event as a string

get_date_lodged(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: Optional[datetime.datetime] = None) datetime.datetime[source]

returns the initial date an event was generated (if it exists). If it does not exist a new date can either be injected as a parameter or generated at the time of call
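The fallback order described here (the event's own date, then an injected date, then a date generated at call time) could be sketched as below. It assumes the event exposes its date through a timestamp attribute holding a datetime, which is an assumption made for the example; sketch_get_date_lodged is a hypothetical name. Using None as the default matters in Python: a default written as datetime.now() in the signature would be evaluated once, when the function is defined, rather than on each call.

```python
from datetime import datetime
from types import SimpleNamespace
from typing import Any, Optional


def sketch_get_date_lodged(
    event: Any, init_date: Optional[datetime] = None
) -> datetime:
    """Hypothetical helper showing the fallback order only."""
    # 1. Prefer the date recorded on the event itself, if there is one.
    lodged = getattr(event, "timestamp", None)
    if lodged is not None:
        return lodged
    # 2. Otherwise use the injected date, 3. or generate one at call time.
    return init_date if init_date is not None else datetime.now()


stamped = SimpleNamespace(timestamp=datetime(2023, 6, 20, 8, 24, 51))
unstamped = SimpleNamespace()
injected = datetime(2020, 1, 1)

print(sketch_get_date_lodged(stamped, injected))    # 2023-06-20 08:24:51
print(sketch_get_date_lodged(unstamped, injected))  # 2020-01-01 00:00:00
```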

get_date_lodged_isoformat(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: datetime.datetime = datetime.datetime(2023, 6, 20, 8, 24, 51, 839893)) str[source]

renders the date for an event as an ISO-formatted string

get_device_name(event: ska_ser_skallop.subscribing.base.EventDataInt) str[source]

returns the tango device owning the event

i_can_subscribe(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, event_type: Any) bool[source]
print_tracers(tracers: List[ska_ser_skallop.subscribing.helpers.Tracer]) str[source]
unpack_event(event: ska_ser_skallop.subscribing.base.EventDataInt, init_date: datetime.datetime = datetime.datetime(2023, 6, 20, 8, 24, 51, 839906)) Tuple[str, str, str, datetime.datetime][source]

returns a tuple of key attributes for an event as device name, attribute, value and date lodged

.subscribing.message_board module

Contains the MessageBoard class and related classes to implement a Message Board

class MessageBoard[source]

Bases: ska_ser_skallop.subscribing.base.MessageBoardBase

Encapsulates a queue containing events that get placed onto it by events pushers. Also keeps track of subscriptions and allows adding and removing of a subscription. Lastly, it keeps a log of various internal messages and transitions to assist in describing the occurrence of events.

add_subscription(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, handler: ska_ser_skallop.subscribing.message_handler.MessageHandler, polling: bool = False) ska_ser_skallop.subscribing.subscription.Subscription[source]

Registers and initiates a new subscription on a given producer with a specific attribute. A subscription can be seen as a request to be notified when an event occurs on a particular attribute of an entity. In this implementation an event is seen as when the value of an attribute of that entity has changed. The role of the producer is then to notify the subscriber in one of two possible ways:

  1. By means of a given call to be made by a producer (call back) on a given object

  2. By storing the event temporarily in a buffer for later retrieval (polling = True)

However, both of these options achieve the same external result on the MessageBoard by resulting in events being placed on its internal queue. These events can then be retrieved later from the MessageBoard, together with information about the subscription and a given handler to consume the results of the event.

Parameters
  • producer – the object responsible for generating the events and upon which the subscription must be placed.

  • attr – the attribute (item of interest) for which a change in value must result in a notification event

  • handler – the object responsible for consuming the contents of the event. Note this is different from the subscriber handler that will be given to the producer for handling the incoming event, in this case the handler is just an optional attribute of the EventItem that will be placed on the queue, providing a means for consuming events as they get pulled from the buffer.

  • polling – whether the internal event generation must be done via polling or call back. In general the call back mechanism is simpler but in cases where a producer is not able to call a subscriber directly then the polling method is also available, defaults to False

Returns

returns the created subscription in case the client code wants to use the subscription later
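The point that both notification routes end up as items on the same internal queue can be illustrated with a small stand-alone sketch. Everything here is hypothetical (the function names, the tuple layout and the plain list used as a producer-side buffer); it only shows the shape of the idea and not how MessageBoard is implemented.

```python
import queue
from typing import Any, List, Tuple

board: "queue.Queue[Tuple[str, Any]]" = queue.Queue()


def on_event(event: Any) -> None:
    """Call back route: the producer calls this as soon as an event occurs."""
    board.put(("callback", event))


def collect_buffered(buffer: List[Any]) -> None:
    """Polling route: drain events the producer stored since the last poll."""
    while buffer:
        board.put(("polled", buffer.pop(0)))


# Pretend a producer fired one event through each route.
on_event("ON")
producer_buffer = ["OFF"]
collect_buffered(producer_buffer)

# A consumer sees both events on the same queue, regardless of the route.
items = [board.get_nowait() for _ in range(board.qsize())]
print(items)  # [('callback', 'ON'), ('polled', 'OFF')]
```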

archived_subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
board: queue.Queue
gathered_sleep_time = 0.05
get_current_items() List[ska_ser_skallop.subscribing.event_item.EventItem][source]

Get the current events contained in the buffer of a message board object.

Note this should be a method of the MessageBoard Class in skallop

Returns

A list of current event items

get_items(timeout: float = 0) Iterator[ska_ser_skallop.subscribing.event_item.EventItem][source]

Iterates over the EventItem instances placed in its internal queue as a consequence of subscriptions to Producer instances. If the queue is currently empty it will wait for a given timeout until either returning with a StopIteration or raising an ska_ser_skallop.subscribing.exceptions.EventTimedOut exception (depending on whether a subscription is configured to suppress or not suppress timeouts)

Parameters

timeout – the maximum amount of time to wait for any events to be placed on the queue, defaults to 0 in the case of call back subscriptions, otherwise in case of a polling subscription, it will wait at least for the gathered_sleep_time amount of time.

Raises

EventTimedOut – if timeout is not suppressed by subscription (note a timeout condition will result in all remaining subscriptions being canceled)

Returns

returns a StopIteration during a suppressed timeout or when all subscriptions have been removed.

Yield

yields an item in the queue if not empty
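A generator with this contract (yield items while they arrive, then either stop quietly or raise on timeout) might look like the sketch below. It is a simplified stand-in: sketch_get_items, SketchEventTimedOut and the still_subscribed callable are hypothetical, and the real method derives the suppress decision from the subscriptions' handlers rather than from a flag.

```python
import queue
from typing import Any, Callable, Iterator


class SketchEventTimedOut(Exception):
    """Hypothetical stand-in for EventTimedOut."""


def sketch_get_items(
    board: "queue.Queue[Any]",
    timeout: float = 0,
    suppress_timeout: bool = False,
    still_subscribed: Callable[[], bool] = lambda: True,
) -> Iterator[Any]:
    # Keep going for as long as at least one subscription is active.
    while still_subscribed():
        try:
            if timeout > 0:
                item = board.get(timeout=timeout)
            else:
                item = board.get_nowait()
        except queue.Empty:
            if suppress_timeout:
                return  # the caller's for loop simply ends
            raise SketchEventTimedOut(
                f"no event arrived within {timeout} s"
            ) from None
        yield item


board: "queue.Queue[str]" = queue.Queue()
board.put("first")
board.put("second")
collected = list(sketch_get_items(board, suppress_timeout=True))
print(collected)  # ['first', 'second']
```

Returning from a generator is what surfaces as StopIteration to a caller using next(), which matches the "Returns" note above.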

log(message: str, time: Optional[datetime.datetime] = None, label=None) None[source]

Logs a message with optional time of occurrence

Parameters
  • message – the message to log

  • time – the time at which this message was deemed to take place (if None then it will use its own time), defaults to None

  • label – any labels to annotate the message with, defaults to None

log_filter = 'log'
play_log_book(filter_log: bool = True, log_filter_pattern: str = '') str[source]

Returns the contents of the logbook as a string of messages separated by newline characters

Parameters

filter_log – whether the log book should filter out messages labeled as log_filter, defaults to True

Returns

the logbook contents

print_remaining_subscriptions() str[source]

Returns a description of all current subscriptions

Returns

the description of the subscriptions

remove_all_subscriptions() None[source]

Remove all subscriptions by unsubscribing on the producers. Note all removed subscriptions are placed in the archived list for diagnostic purposes.

remove_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None[source]

Remove a given subscription by unsubscribing on the producer. Note the removed subscription is placed in the archived list for diagnostic purposes.

Parameters

subscription – the subscription to remove

replay_self() str[source]

Replay internal logs from a tracer object

Returns

the tracer logs

replay_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) str[source]

Replay the tracer logs generated on a particular subscription

Parameters

subscription – the subscription to replay

Returns

the tracer logs

replay_subscriptions() str[source]

Replay the tracer logs from current and archived subscriptions

Returns

the tracer logs

subscriptions: Set[ska_ser_skallop.subscribing.base.SubscriptionBase]
task_done()[source]

Gets called to indicate that the item taken from the queue has finished being worked on. This is useful for synchronization purposes when, for example, an event handler needs to wait for another event handler in a thread to finish before processing the next event.
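The synchronization described here mirrors the task_done()/join() pair of the standard library queue.Queue, which the board attribute is typed as. The sketch below uses only the standard library; the handle_next function and the upper-casing "work" are placeholders for real event handling.

```python
import queue
import threading
from typing import List

board: "queue.Queue[str]" = queue.Queue()
handled: List[str] = []


def handle_next() -> None:
    item = board.get()
    handled.append(item.upper())  # placeholder for real event handling
    board.task_done()  # tell the queue this item is finished


board.put("on")
worker = threading.Thread(target=handle_next)
worker.start()
board.join()  # returns only after task_done() was called for every item put
worker.join()
print(handled)  # ['ON']
```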

exception SubscriptionAlreadyRemoved(message: str)[source]

Bases: KeyError

Indicates that a subscription that was requested to be removed has already been removed from the set of subscriptions

.subscribing.message_handler module

class MessageHandler(board: ska_ser_skallop.subscribing.base.MessageBoardBase, handler_annotation: str = '', enable_pre_handling_annotations: bool = True)[source]

Bases: ska_ser_skallop.subscribing.base.MessageHandlerBase

A basic implementation of the MessageHandlerBase containing typical generic event handling behaviour for convenience. Developers can inherit from this base class in order to implement event handling. The following basic features are provided:

  1. Recording and updating of current state from event data (see load_event(), replay() )

  2. Subscription behavior: remove the subscription generating this event or remove all from messageboard

  3. A pre and post context for handling an event (see handle_context() and handle_event() )

  4. Logging/Printing of the data (see print_event(), replay() )

cancel_at_next_event: bool
describe_self() str[source]

Currently returns an empty string. When inheriting you should override this method and return a string representation of what the current event handler does

Returns

a description of what the handler does

handle_context(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) Iterator[None][source]

Context manager for use with a handler's event method. When you place your event handling code within this context, the basic pre and post functionality will be included in the handling:

  1. Pre-handling: determine the basic ordering state (whether it is the first, second or a subsequent event). In addition it may also unsubscribe automatically before processing the event if the attribute cancel_at_next_event has been set

  2. Post-handling: records on the message board that the retrieved event item has finished being handled.

When inheriting you can update the pre and post methods (_pre_handling, _post_handling) with more sophisticated behaviour.

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)
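The pre/post wrapping described for handle_context can be sketched with contextlib.contextmanager. SketchHandler below is a hypothetical stand-in that only records what happens and when; running the post step in a finally clause, so that it also runs when handling raises, is an assumption of this sketch and not something the documentation states.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


class SketchHandler:
    """Hypothetical handler showing the pre/post structure only."""

    def __init__(self) -> None:
        self.events_seen = 0
        self.log: List[str] = []

    def _pre_handling(self, event: Any, subscription: Any) -> None:
        # Track ordering: first, second or subsequent event.
        self.events_seen += 1
        self.log.append(f"pre: event #{self.events_seen}")

    def _post_handling(self, event: Any, subscription: Any) -> None:
        self.log.append("post: done")

    @contextmanager
    def handle_context(self, event: Any, subscription: Any) -> Iterator[None]:
        self._pre_handling(event, subscription)
        try:
            yield
        finally:
            # Assumption: post-handling runs even if the body raises.
            self._post_handling(event, subscription)

    def handle_event(self, event: Any, subscription: Any) -> None:
        with self.handle_context(event, subscription):
            self.log.append(f"handling {event}")


handler = SketchHandler()
handler.handle_event("ON", subscription=None)
print(handler.log)  # ['pre: event #1', 'handling ON', 'post: done']
```

A subclass would typically override handle_event and keep its own logic inside the with block.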

handle_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase, *args) None[source]

This is the basic method that normally gets called by the client to handle an event on the messageboard. It should be overridden when you want to create specific behaviour.

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)

handle_timedout(producer: ska_ser_skallop.subscribing.base.BaseProducer, attr: str, print_header: bool = True, print_tracer: bool = False) str[source]

called by the messageboard when it has waited too long for events on a subscription. The result of this is a string containing diagnostic data about the event handler to assist a tester in determining the cause of the time out.

Parameters
  • producer – The producer of the event data

  • attr – the attribute upon which the subscription is based

  • print_header – whether the diagnostic data should be preceded by a print header, defaults to True

Returns

the actual diagnostic data used to understand the cause of the time out

load_event(event: ska_ser_skallop.subscribing.base.EventDataInt, subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None[source]

Updates the state of the handler by setting the contents of current_event and current_subscription to the input parameters

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)

print_event(event: ska_ser_skallop.subscribing.base.EventDataInt, ignore_first: bool = False) str[source]

This method is typically called by the client to assist the testing script. It returns a string describing the received event with annotations about its order and context. It also writes the same string to the messageboard so that all messages coming from event handlers can be centralized and replayed later.

Parameters
  • event – The eventdata as extracted from the messageboard EventItem object

  • ignore_first – whether the first event resulting from a subscription should be ignored. This is typically when a subscription on a tango device results in an immediate event being generated before a test execution has run. In such cases it may be useful to treat those events as not part of the actual test scenario, defaults to False

Returns

the event described in a useful context in order to assist the tester in understanding the dynamic situation.

replay() str[source]

returns a string that lists all the messages logged by the handler during its lifetime in order to assist with diagnosis

Returns

the diagnostic string

suppress_timeout() bool[source]

This method is used by the messageboard to determine whether it should raise an exception when a timeout occurs or whether it should simply halt the iteration. Note, this is set to always return False. Override this method if you want your handler to behave differently.

Returns

indicates whether the timeout should be suppressed

tracer: ska_ser_skallop.subscribing.helpers.Tracer
unsubscribe(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase)[source]

Unsubscribes the given subscription from the messageboard. Use this typically if the incoming event indicates that no further events are expected (and thus no further waiting is necessary)

Parameters

subscription – The subscription used to generate the event data (also extracted from the messageboard EventItem object)

unsubscribe_all()[source]

Unsubscribe all currently running subscriptions on a messageboard.

Use this if a current event indicates that the behaviour is finished.

.subscribing.producers module

class BufferedSubscriber[source]

Bases: ska_ser_skallop.subscribing.base.Subscriber

push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None[source]

the method that is called by the producer when an event occurs on a specific subscription

class IndexedDictionary[source]

Bases: Dict

append(value) int[source]
class Producer(name: str)[source]

Bases: ska_ser_skallop.subscribing.base.Producer

An emulation of a producer with the ability to be subscribed to and to respond when being called to push events by in turn calling all its subscribers. This class could be useful to test subscriptions without needing externally running applications.

describe_subscription(subscription_id: int) Dict[source]

Gives a description of a current subscription

Parameters

subscription_id – the identification of the subscription

Returns

the description

get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

Used when a polling-based subscription is in place

Parameters

subscription_id – the subscription id

Returns

a list of events generated since last retrieval

name() str[source]

Returns the name of the producer

Returns

producer name

push_event(attr: str, event: ska_ser_skallop.subscribing.base.EventDataInt) None[source]
subscribe_event(attr: str, event_type: Any, subscriber: Union[ska_ser_skallop.subscribing.base.Subscriber, int]) int[source]

Registers a subscription on a producer based on a given attr.

Parameters
  • attr – the attribute for which events must be generated when an event has been pushed

  • event_type – The event type is to ensure the interface matches to the tango Device interface even though only events of Change type are considered.

  • subscriber – The object that will be called via its push_event() method when an event has occurred. If an integer is given instead of an object, the subscription will result in an internal buffer being populated when a new event is pushed (polling)

Returns

The subscription ID as a sequential index number starting from 0

unsubscribe_event(subscription_id: int) None[source]

Removes a subscriber from being notified when an event has been pushed for a particular subscription

Parameters

subscription_id – [description]
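To show how the methods of such an emulated producer fit together, here is a self-contained sketch. SketchProducer is a hypothetical stand-in written for this example (it takes a plain callable instead of a Subscriber object, and its internal bookkeeping is invented); only the method names, the integer-means-buffered rule and the sequential IDs starting from 0 come from the descriptions above.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

Callback = Callable[[Any], None]


class SketchProducer:
    """Hypothetical in-memory producer (not the library class)."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._next_id = 0
        # subscription id -> (attribute, callback or None when buffered)
        self._subscriptions: Dict[int, Tuple[str, Optional[Callback]]] = {}
        self._buffers: Dict[int, List[Any]] = {}

    def name(self) -> str:
        return self._name

    def subscribe_event(
        self, attr: str, event_type: Any, subscriber: Union[Callback, int]
    ) -> int:
        subscription_id = self._next_id
        self._next_id += 1
        if isinstance(subscriber, int):
            # An integer requests a buffered (polled) subscription.
            self._subscriptions[subscription_id] = (attr, None)
            self._buffers[subscription_id] = []
        else:
            self._subscriptions[subscription_id] = (attr, subscriber)
        return subscription_id

    def push_event(self, attr: str, event: Any) -> None:
        # Iterate over a copy so callbacks may unsubscribe while we loop.
        for sub_id, (sub_attr, callback) in list(self._subscriptions.items()):
            if sub_attr != attr or sub_id not in self._subscriptions:
                continue
            if callback is None:
                self._buffers[sub_id].append(event)
            else:
                callback(event)

    def get_events(self, subscription_id: int) -> List[Any]:
        # Hand over and clear whatever was buffered since the last call.
        events = self._buffers[subscription_id]
        self._buffers[subscription_id] = []
        return events

    def unsubscribe_event(self, subscription_id: int) -> None:
        self._subscriptions.pop(subscription_id)
        self._buffers.pop(subscription_id, None)


producer = SketchProducer("sys/tg_test/1")
received: List[Any] = []
callback_id = producer.subscribe_event("state", None, received.append)
polled_id = producer.subscribe_event("state", None, 100)
producer.push_event("state", "ON")
polled = producer.get_events(polled_id)
print(callback_id, polled_id, received, polled)  # 0 1 ['ON'] ['ON']
```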

class Subscript(attr, subscriber)

Bases: tuple

attr

Alias for field number 0

subscriber

Alias for field number 1

.subscribing.subscription module

class EventsPusher(queue: queue.Queue, handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase] = None)[source]

Bases: ska_ser_skallop.subscribing.base.EventsPusherBase

Object that pushes events onto a given buffer when called by a push event

handler: Optional[ska_ser_skallop.subscribing.base.MessageHandlerBase]
push_event(event: ska_ser_skallop.subscribing.base.EventDataInt) None[source]

Called by a callback from a producer with a new incoming event and results in a new EventItem being created and placed on the provided queue

Parameters

event – the incoming event

queue: queue.Queue[ska_ser_skallop.subscribing.base.EventItemBase]
set_subscription(subscription: ska_ser_skallop.subscribing.base.SubscriptionBase) None[source]

This method is used by the calling Subscription object during the initiation of a subscription on a Producer. Because the EventsPusher object is needed as part of the subscription call, the subscription can only be created after the EventsPusher, hence the need for this method.
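The ordering constraint described here (the pusher has to exist before the subscription it reports for) leads to a two-step wiring that can be sketched as follows. SketchWiredPusher and SketchWiredSubscription are hypothetical classes, and the tuple placed on the queue merely stands in for an EventItem.

```python
import queue
from typing import Any, Optional, Tuple


class SketchWiredPusher:
    """Hypothetical pusher that learns about its subscription afterwards."""

    def __init__(self, board: "queue.Queue[Tuple[Any, Any]]") -> None:
        self.board = board
        self.subscription: Optional[Any] = None

    def set_subscription(self, subscription: Any) -> None:
        self.subscription = subscription

    def push_event(self, event: Any) -> None:
        # Tie the incoming event to the subscription it arrived on.
        self.board.put((event, self.subscription))


class SketchWiredSubscription:
    """Hypothetical subscription showing the order of the wiring steps."""

    def __init__(self, attr: str) -> None:
        self.attr = attr
        self.pusher: Optional[SketchWiredPusher] = None

    def subscribe_by_callback(
        self, board: "queue.Queue[Tuple[Any, Any]]"
    ) -> None:
        # Step 1: create the pusher, since the subscribe call needs it.
        self.pusher = SketchWiredPusher(board)
        # Step 2: only now can the pusher be told which subscription it serves.
        self.pusher.set_subscription(self)
        # A real producer would be handed self.pusher as subscriber here.


board: "queue.Queue[Tuple[Any, Any]]" = queue.Queue()
subscription = SketchWiredSubscription("state")
subscription.subscribe_by_callback(board)
subscription.pusher.push_event("ON")
event, origin = board.get_nowait()
```

After this runs, event is "ON" and origin is the subscription object itself.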

class Subscription(producer: ska_ser_skallop.subscribing.base.Producer, attr: str, handler: ska_ser_skallop.subscribing.base.MessageHandlerBase, master: bool = False)[source]

Bases: ska_ser_skallop.subscribing.base.SubscriptionBase

class that ties a subscription to a producer in order to keep record of subscriptions. It handles the basic subscribing and unsubscribing behaviour to the producer.

buffer_size = 100
describe() Tuple[str, str, Optional[int]][source]

Describes itself as a tuple of key attributes of the subscription, in the following order:

  1. Producer Name

  2. Attribute Name

  3. Subscription id (may be None if a subscription has been removed or not executed yet)

Returns

the tuple of items

get_event_pushing_logs() str[source]

returns any logs generated by the events pusher object used as a subscriber onto a producer during callbacks

Returns

the diagnostic logs

get_handler_logs() str[source]

returns logs generated by the handler during the handling of events to do with this particular subscription

Returns

the log messages

get_internal_logs() str[source]

returns any internal logs generated by its tracing object for diagnostic purposes

Returns

the internal logs

handle_timedout(*args, **kwargs) str[source]

Delivers diagnostic information about a subscription in case of a timeout

Returns

the diagnostic data

poll() List[ska_ser_skallop.subscribing.event_item.EventItem][source]

Used when a polling based subscription is made on a producer and gets any internally buffered events that may have occurred since the last time it was queried.

Returns

A list (empty if none was generated) of EventItem instances

subscribe_buffer(buffersize: Optional[int] = 100) None[source]

Registers a subscription on its Producer by means of an internal events buffer that holds newly generated events temporarily until collected with the poll

Parameters

buffersize – the size of the internal buffer to be used by the producer, defaults to buffer_size

subscribe_by_callback(board: queue.Queue) None[source]

Registers a subscription on its Producer by means of a callback being called on a provided object that will result in placing the event as an EventItem on the provided queue

Parameters

board – the queue that must be used by the callback object to place new EventItem instances on

suppress_timeout() bool[source]

whether a subscription (implied by its handler) should suppress or not suppress a timeout

Returns

the result

unsubscribe()[source]

Removes a subscription on its producer.

Module contents

Subscribing allows the monitoring of events generated from diverse producers at a central place. If you have producers A and B, which you expect to produce events 1, 2 and 3 over the course of a test, then you set up a message board to which these producers can post if and when they generate the expected event. Then, at the appropriate time, you can access these events, check their order, analyse their timing and/or verify their content within your test.

The example below illustrates the concept:

import logging
import sys

from ska_ser_skallop.connectors import configuration
from ska_ser_skallop.subscribing import configuration as sub_conf
from ska_ser_skallop.subscribing.base import MessageBoardBase
from ska_ser_skallop.subscribing.message_handler import MessageHandler

logger = logging.getLogger(__name__)


def subscribe(board: MessageBoardBase):
    # adds two subscriptions
    board.add_subscription(
        configuration.get_producer("sys/tg_test/1", fast_load=True),
        "state",
        MessageHandler(board),
    )
    board.add_subscription(
        configuration.get_producer("sys/tg_test/1", fast_load=True),
        "short_scalar",
        MessageHandler(board),
    )


def handle_events(board: MessageBoardBase, option):
    # handle next item on the messageboard...
    if option == "handle most recent":
        next(board.get_items(timeout=1)).handle_event()
    # or keep handling events as they arrive and stop after the third one
    # (enumerate starts at 0, so index == 2 is the third event)
    elif option == "block until all handled":
        for index, item in enumerate(board.get_items()):
            item.handle_event()
            print(item.handler.print_event(item.event))
            if index == 2:
                board.remove_all_subscriptions()


def set_logging_for_stdout():
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)


if __name__ == "__main__":
    set_logging_for_stdout()
    brd = sub_conf.get_messageboard()
    subscribe(brd)
    handle_events(brd, "block until all handled")