API

ska_tango_testing

This package provides test harness for testing of SKA Tango devices.

ska_tango_testing.context

This module provides support for tango testing contexts.

ska_tango_testing.context.DeviceProxy = <ska_tango_testing.context._DeviceProxyFactory object>

A drop-in replacement for tango.DeviceProxy.

There is a known bug in tango.test_context.MultiDeviceTestContext for which the workaround is a patch to tango.DeviceProxy. This drop-in replacement makes it possible for ThreadedTestTangoContextManager to apply this patch. Until the bug is fixed, all production code that will be tested in that context must use this class instead of tango.DeviceProxy.

(For more information, see https://gitlab.com/tango-controls/pytango/-/issues/459.)

class ska_tango_testing.context.TangoContextProtocol(*args, **kwargs)

Protocol for a tango context.

__init__(*args, **kwargs)
get_device(device_name)

Return a proxy to a specified device.

Parameters:

device_name (str) – name of the device

Return type:

DeviceProxy

Returns:

a proxy to the device

class ska_tango_testing.context.ThreadedTestTangoContextManager

A lightweight context for testing Tango devices.

__init__()

Initialise a new instance.

add_device(device_name, device_class, **properties)

Add a device to the context managed by this manager.

Parameters:
  • device_name (str) – name of the device to be added

  • device_class (Union[str, Type[Device]]) – the class of the device to be added. This can be the class itself, or its name.

  • properties (Any) – a dictionary of device properties

Return type:

None

add_mock_device(device_name, device_mock)

Register a mock at a given device name.

Registering this mock means that when an attempts is made to create a tango.DeviceProxy to that device name, this mock is returned instead.

Parameters:
  • device_name (str) – name of the device for which the mock is to be registered.

  • device_mock (DeviceProxy) – the mock to be registered at this name.

Return type:

None

class ska_tango_testing.context.TrueTangoContextManager

A Tango context in which the device has already been deployed.

For example, Tango has been deployed into a k8s cluster, and now we want to run tests against it.

get_device(device_name)

Return a proxy to a specified device.

Parameters:

device_name (str) – name of the device

Return type:

DeviceProxy

Returns:

a proxy to the device

ska_tango_testing.harness

This module provides a test harness factory for testing Tango devices.

class ska_tango_testing.harness.TangoTestHarness

A test harness for Tango devices.

__init__()

Initialise a new instance.

add_context_manager(context_name, context_manager)

Add a context manager to this test harness.

When we enter the context of this test harness, we enter the contexts of any context managers registered with this method.

For example, suppose we want to test our FooDevice Tango device, but first we need to stand up a HTTP server interface to a FooSimulator, so that the FooDevice has something to monitor and control. To achieve this, we create a context manager that provides a context in which the required FooSimulator HTTP server is running; and we register that context manager with this test harness. When we enter this harness, we also enter the simulator context, which means the HTTP server gets launched.

Parameters:
  • context_name (str) – name of the context. We can use this to recover the context once entered.

  • context_manager (ContextManager[Any]) – the context manager to launch.

Return type:

None

add_device(device_name, device_class, **properties)

Add a Tango device to this test harness.

Parameters:
  • device_name (str) – name of the device to be added

  • device_class (type[Device] | str) – the class of the device to be added. This can be the class itself, or its name.

  • properties (Any) – a dictionary of device properties

Return type:

None

add_mock_device(device_name, device_mock)

Register a mock at a given Tango device name.

Registering this mock means that when an attempts is made to create a tango.DeviceProxy to that device name, this mock is returned instead.

Parameters:
  • device_name (str) – name of the device for which the mock is to be registered.

  • device_mock (DeviceProxy) – the mock to be registered at this name.

Return type:

None

class ska_tango_testing.harness.TangoTestHarnessContext(tango_context, contexts)

Representation of a test harness context.

__init__(tango_context, contexts)

Initialise a new instance.

Parameters:
  • tango_context (TangoContextProtocol) – the Tango context for this test harness.

  • contexts (dict[str, Any]) – other contexts for this test harness.

get_context(context_name)

Return a sub-context of this test harness.

For example, if this test harness contains a context manager that launches a simulator as a TCP server, and returns the address of that server, then this method can be used to recover that address.

Parameters:

context_name (str) – name under which the context manager was added.

Return type:

Any

Returns:

the context hook for the named context.

get_device(device_name)

Return a proxy to a Tango device running in this test harness context.

Parameters:

device_name (str) – name of the Tango device.

Return type:

DeviceProxy

Returns:

a proxy to the Tango device.

ska_tango_testing.mock

This subpackage provides mocks for testing of SKA Tango devices.

class ska_tango_testing.mock.MockCallable(timeout=1.0, wraps=None)

A class for a single mock callable.

__call__(*args, **kwargs)

Register a call on this callable.

Parameters:
  • args (Any) – positional arguments in the call

  • kwargs (Any) – keyword arguments in the call

Return type:

Any

Returns:

whatever this callable is configured to return

__init__(timeout=1.0, wraps=None)

Initialise a new instance.

Parameters:
  • timeout (Optional[float]) – how long to wait for the call, in seconds, or None to wait forever. The default is 1 second.

  • wraps (Optional[Callable]) – a callable to be wrapped by this one. See wraps() for details.

assert_against_call(lookahead=None, consume_nonmatches=False, **kwargs)

Assert that this callable has been called as characterised.

Parameters:
  • lookahead (Optional[int]) – The number of calls to examine in search of a matching call. The default is 1, which means we are asserting against the next call.

  • consume_nonmatches (bool) – whether calls that are examined but do not match should be consumed rather than left on the queue to be examined by future assert calls.

  • kwargs (Any) – the characteristics that we are asserting the call to have. For details see MockCallableGroup.assert_against_call().

Return type:

Dict[str, Any]

Returns:

details of the call

Raises:

AssertionError – if the asserted call has not occurred within the timeout period

assert_call(*args, **kwargs)

Assert that this callable has been called as specified.

For example, assert_call(“b”, c=1, lookahead=2) asserts that one of the next 2 calls to this callable will have call signature (“b”, c=1).

This is syntactic sugar, which simplifies the expression of assertions, but also muddles up the arguments to assert_call with the arguments that we are asserting the call to have. It is equivalent to the more principled and flexible, but long-winded:

assert_against_call(
    call_args=("b",),
    call_kwargs={"c": 1},
    lookahead=2,
)
Parameters:
  • args (Any) – positional arguments asserted to be in the call.

  • kwargs (Any) –

    If a “lookahead” keyword argument is provided, this specifies the number of calls to examine in search of a matching call. The default is 1, in which case we are asserting against the next call.

    If a “consume_nonmatches” keyword argument is provided, this indicates whether calls that are examined but do not match should be consumed rather than left on the queue to be examined by future assert calls.

    All other keyword arguments are keyword arguments asserted to be in the call.

Return type:

Dict[str, Any]

Returns:

details of the call

Raises:

AssertionError – if the asserted call has not occurred within the timeout period

assert_not_called()

Assert that this callable has not been called.

Raises:

AssertionError – if this callable has been called.

Return type:

None

configure_mock(**configuration)

Configure the underlying mock.

Parameters:

configuration (Any) – keyword arguments to be passed to the underlying mock.

Return type:

None

wraps(wrapped)

Specify a callable for this mock callable to wrap.

This allows use of this class as a shim between a caller and a called method. For example, suppose we need to provide an important_callable that we expect to be called with specific arguments. When called, this callable will do some very important work that cannot be mocked out.

In testing, instead of mocking out the callable, we can wrap it in a shim:

important_shim = MockCallable()
important_shim.wraps(important_callable)

This way, important_callable will still be called, but we can also assert on the call(s) as they pass through the shim:

important_shim.assert_call(0.0)

Note: calls to this method override any previous call to configure_mock().

Parameters:

wrapped (Callable) – a callable for this mock callable to wrap

Return type:

None

class ska_tango_testing.mock.MockCallableGroup(*callables, timeout=1.0, **special_callables)

This class implements a group of callables.

__init__(*callables, timeout=1.0, **special_callables)

Initialise a new instance.

Parameters:
  • callables (str) – names of simple callables in this group; that is, callables that do not need a special characterizer.

  • timeout (Optional[float]) – number of seconds to wait for the callable to be called, or None to wait forever. The default is 1.0 seconds.

  • special_callables (Callable[[Dict[str, Any]], Dict[str, Any]]) – keyword argument for special callables that need a special characterizer. Each argument is of the form callable_name=characterizer.

assert_against_call(callable_name, lookahead=None, consume_nonmatches=False, **kwargs)

Assert that the specified callable has been called as characterised.

Parameters:
  • callable_name (str) – name of the callable that we are asserting to have been called

  • lookahead (Optional[int]) – The number of calls to examine in search of a matching call. The default is 1, which means we are asserting against the next call.

  • consume_nonmatches (bool) – whether calls that are examined but do not match should be consumed rather than left on the queue to be examined by future assert calls.

  • kwargs (Any) –

    the characteristics that we are asserting the call to have. All call have

    • call_args and call_kwargs characteristics. For example,

      example_callback("a", b=1)
      example_callback("c", d=1)
      
      assert_against_call(
          "example",
          lookahead=2,
          call_args=("c",),
          call_kwargs={"d": 1},
      )
      

      asserts that one of the next two calls to callback “a” will have the signature (“b”, c=1).

    • argN characteristics for N up to the number of positional arguments. For example, if a callable was called with three positional arguments, the captured call will have characteristics arg0, arg1 and arg2:

      assert_against_call(
          "example",
          lookahead=2,
          arg0="c",
      )
      
    • a characteristic for each keyword argument. For example, if a callable was called with keyword argument power=PowerState.ON, then the captured call will have characteristic power with value PowerState.ON:

      assert_against_call(
          "a",
          lookahead=2,
          d=1,
      )
      

    If a characterizer was provided for the callback in this group’s constructor, then there may be other characteristics that this method can assert against. For example, suppose we expect callable “a” to have been called with signature

    callable_a(
        named_tuple(name="a", value=2, timestamp=1234567890)
    )
    

    but the timestamp is unknown. If we don’t know the timestamp then we can’t construct an equal object to assert:

    assert_against_call(
        "a",
        arg0=named_tuple(name="a", value=2, timestamp=UNKNOWN)
    )
    

    Instead we can provide a characterizer that unpacks the “name” and “value” arguments for us, and then

    assert_against_call(
        "a",
        lookahead=2,
        name="a",
        value=2,
    )
    

Return type:

Dict[str, Any]

Returns:

details of the call

Raises:

AssertionError – if the asserted call has not occurred within the timeout period

assert_call(callable_name, *args, **kwargs)

Assert that the specified callable has been called as specified.

For example, assert_call(“a”, “b”, c=1, lookahead=2) will assert that one of the next 2 calls to callable “a” will have call signature (“b”, c=1).

This is syntactic sugar, which simplifies the expression of assertions, but also muddles up the arguments to assert_call with the arguments that we are asserting the call to have. It is equivalent to the more principled and flexible, but long-winded:

assert_against_call(
    "a",
    call_args=("b",),
    call_kwargs={"c": 1},
    lookahead=2,
)
Parameters:
  • callable_name (str) – name of the callable that we are asserting to have been called

  • args (Any) – positional arguments asserted to be in the call.

  • kwargs (Any) –

    If a “lookahead” keyword argument is provided, this specifies the number of calls to examine in search of a matching call. The default is 1, in which case we are asserting against the next call.

    If a “consume_nonmatches” keyword argument is provided, this indicates whether calls that are examined but do not match should be consumed rather than left on the queue to be examined by future assert calls.

    All other keyword arguments are keyword arguments asserted to be in the call.

Return type:

Dict[str, Any]

Returns:

details of the call

Raises:

AssertionError – if the asserted call has not occurred within the timeout period

assert_not_called()

Assert that no callable in this group has been called.

Raises:

AssertionError – if one of the callables in this group has been called.

Return type:

None

class ska_tango_testing.mock.MockConsumerGroup(producer, categorizer, timeout, *consumers, **special_consumers)

A group of consumers of items from a single producer.

__init__(producer, categorizer, timeout, *consumers, **special_consumers)

Initialise a new instance.

Parameters:
  • producer (Callable[[Optional[float]], TypeVar(ItemType)]) – the producer from which this consumer gets items

  • categorizer (Callable[[Any], str]) – a callable that categorizes items.

  • timeout (Optional[float]) – optional number of seconds to wait for an item. If omitted, the default is 1 second. If explicitly set to None, the wait is forever.

  • consumers (str) – list of simple consumers in this group

  • special_consumers (Optional[Callable[[Any], Dict]]) – keyword arguments specifying special consumers in this group. Consumers are special if they have their own characterizer. Here, each key-value pair is the name of the consumer and the characterizer that it uses.

assert_item(*args, lookahead=1, consume_nonmatches=False, **kwargs)

Assert that an item is available in any category.

Parameters:
  • args (Any) – a single optional positional argument is allowed. If provided, it is asserted that there is an item available that is equal to the argument.

  • lookahead (int) – how many items to look through for the item that we are asserting. The default is 1, in which case we are asserting what the very next item will be. This will be the usual case in deterministic situations where we know the exact order in which items will arrive. In non-deterministic situations, we can provide a higher value. For example, a lookahead of 2 means that we are asserting the item will be one of the first two items.

  • consume_nonmatches (bool) –

    whether to consume items that were examined but did not match the assertion.

    An example where we would set this to True is: we have changed the target fan speed from 3000 to 6000 RPM. We want to assert that the fan speed will become 6000, but we know it will reach that speed only gradually. We expect to see a sequence of items something like [3859, 5104, 5934, 6001], so we assert like:

    assert_item(
        "fan_speed",
        pytest.approx(6000, abs=10),
        lookahead=4,
        consume_nonmatches=True,
    )
    

    The first three items do not match, but they are still consumed. The fourth items matches, and hence the assertion passes.

  • kwargs (Any) – characteristics that the item is expected to have

Return type:

Dict[str, Any]

Returns:

the matched item

assert_no_item()

Assert that no item is available in any category.

Return type:

None

ska_tango_testing.mock.placeholders

This module provides some special cases for equality checking.

Two special cases are provided: Anything and OneOf:

  • Anything can be used as a placeholder for assertions, in situations

    where any item should be matched.

    For example, suppose we want to assert a call with keyword arguments name, value and timestamp, but we don’t know exactly what the value of the timestamp will be. One way to make such an assertion is

    from ska_tango_testing.mock.placeholders import Anything
    
    mock_callback.assert_call(
        name="voltage",
        value=0.0,
        timestamp=Anything,
    )
    

    and this assertion will match irrespective of the actual value of the timestamp keyword.

  • OneOf can be used as a placeholder for assertions, in situations

    where we want to assert that the item will be a member of a specified set. See below for details.

class ska_tango_testing.mock.placeholders.OneOf(*options)

Equality placeholder that is equal if any of its args is equal.

When first initialised, an object of this class is provided with some number of arguments. Whenever we check if this object is equal to some other object, it returns True if and only if the other object is equal to one of its arguments.

This can be thus used as an assertion placeholder in situations where one does not know precisely what value will be returned:

from ska_tango_testing.mock.placeholders import OneOf

mock_callback.assert_call(
    name="state",
    value=OneOf(DevState.ON, DevState.ALARM),
)

and this assertion will match as long as one of the arguments to OneOf is met.

__init__(*options)

Initialise a new instance.

Parameters:

options (Any) – any number of options against which to check equality.

ska_tango_testing.mock.tango

This subpackage provides test harness for testing SKA Tango devices.

class ska_tango_testing.mock.tango.MockTangoEventCallbackGroup(*callables, timeout=1.0, assert_no_error=True)

This class implements a group of Tango change event callbacks.

__init__(*callables, timeout=1.0, assert_no_error=True)

Initialise a new instance.

Parameters:
  • callables (str) – positional arguments providing the names of callables in this group.

  • timeout (Optional[float]) – number of seconds to wait for the callable to be called, or None to wait forever. The default is 1.0 seconds.

  • assert_no_error (bool) – defaults to True, in which case this callback group will assert that each event to arrive is not an error event. Tests can then proceed on that assumption. If False, this callback group will not assert that events are not error events, but rather will return “err” and “errors” values. Tests then have to be written to check for error events.

assert_change_event(callback_name, attribute_value, lookahead=None, consume_nonmatches=False)

Assert that the callback received a change event with the given value.

Parameters:
  • callback_name (str) – name of the change event callback that we are asserting to have been called

  • attribute_value (Any) – new value of the attribute for which the change event has been sent

  • lookahead (Optional[int]) – The number of events to examine in search of a matching call. The default is 1, which means we are asserting against the next call.

  • consume_nonmatches (bool) – whether events that are examined but do not match should be consumed rather than left on the queue to be examined by future assert calls.

Return type:

Dict[str, Any]

Returns:

details of the change event

Raises:

AssertionError – if the asserted call has not occurred within the timeout period

ska_tango_testing.integration

A set of utility tools for integration testing of SKA Tango devices.

This module provides a set of utility tools for integration testing of SKA Tango devices. In particular, it provides tools to subscribe to events, query them (within a timeout), log them in real-time, and build complex queries and assertions to verify the behaviour of a complex set of devices.

For a quick start, you can use the TangoEventTracer class to subscribe to events from a Tango device and then use it with the custom assertions provided by ska_tango_testing.integration.assertions to make assertions on the received events.

from assertpy import assert_that
from ska_tango_testing.integration import TangoEventTracer

def test_a_device_changes_state_when_triggered():

    # create the tracer
    tracer = TangoEventTracer()

    # subscribe to events from a device
    tracer.subscribe_event("sys/tg_test/1", "obsState")

    # do something that triggers the event
    # ...

    # use an assertion to check a state change happened
    assert_that(tracer).described_as(
        "The device should change state"
    ).within_timeout(10).has_change_event_occurred(
        device_name="sys/tg_test/1",
        attribute_name="obsState",
        current_value="ON",
        previous_value="OFF",
    )

If you need to log events in real-time, you can use a quick utility function log_events() to log events from a set of devices and attributes. This is useful for debugging purposes and to see which events are received in real-time while running a test.

# (other imports)

from ska_tango_testing.integration import log_events

def test_a_device_changes_state_when_triggered():

    # log events in real-time
    log_events({
        "sys/tg_test/1": ["obsState"],
        "sys/other_device/100": ["attr1", "attr2"],
    })

    # (rest of the test)

For more advanced usage of the event tracer, we recommend reading the documentation of the TangoEventTracer class, and then give a look at ska_tango_testing.integration.assertions, ska_tango_testing.integration.event, and ska_tango_testing.integration.query.

For more advanced usage of the event logger, we recommend reading the documentation of the TangoEventLogger class.

class ska_tango_testing.integration.TangoEventTracer(event_enum_mapping=None)

Tango proxy client which can trace change events from Tango devices.

MISSION: to represent a tango proxy client that can subscribe to change events from multiple attributes and multiple devices, store the received events as they are notified (in a thread-safe way), and support queries with timeouts to check if and when and who sent certain events.

This class allows you to:

  • subscribe to change events for a specific attribute of a Tango device (see subscribe_event());

  • store and access the events in a thread-safe way (see events);

  • query the stored events based on a predicate function that selects which events satisfy some criteria and a timeout, which permits you to wait for that criteria to be satisfied (see query_events()) or based on any custom query (see evaluate_query()).

Here there follows 3 usage examples: a first very minimal one, a second one more suitable for most of the end-users, and a third one that shows how you can evaluate any kind of custom object-queries.

Usage Example 1: test where you subscribe to a device and assert that it exists exactly one state change event to a TARGET_STATE within 10 seconds:

def test_attribute_change():

    tracer = TangoEventTracer()
    tracer.subscribe_event("sys/tg_test/1", "State")

    # do something that triggers the event
    # ...

    assert len(tracer.query_events(
        lambda e:
            e.has_device("sys/tg_test/1") and
            e.has_attribute("State") and
            e.attribute_value == TARGET_STATE,
        timeout=10)) == 1

Usage Example 2: as an end-user of this module, you can combine this tracer with assertpy custom assertions to write readable and powerful tests. Here is an example of how to use the assertions we provide in ska_tango_testing.integration.assertions:

from assertpy import assert_that

def test_attribute_change(tracer): # tracer is a fixture

    tracer.subscribe_event("sys/tg_test/1", "State")

    # do something that triggers the event

    assert_that(tracer).described_as(
        "There must be a state change from "
        "INITIAL_STATE to TARGET_STATE within 10 seconds."
    ).within_timeout(10).has_change_event_occurred(
        device_name="sys/tg_test/1",
        attribute_name="State",
        attribute_value=TARGET_STATE,
        previous_value=INITIAL_STATE,
    )

Usage Example 3: evaluate any kind of query. The usage shown in the first example is just a quick shortcut to use the tracer in a simplified way. The tracer, potentially, can evaluate any kind of query object, given it is a subclass of EventQuery. You can find a collection of (configurable) query objects in the query module. Here an example of how you can make the same exact interrogation of the first example, but using query objects:

query = NSateChangesQuery(
    device_name="sys/tg_test/1",
    attribute_name="State",
    attribute_value=TARGET_STATE,
    timeout=10,
)
tracer.evaluate_query(query)

Tracer Mechanics: here it follows a brief explanation of how the tracer works internally. The tracer is a tool that captures, stores and interrogates events. It does so by using four main support classes:

  • ReceivedEvent is the class that represents the events captured by the tracer;

  • TangoSubscriber is used to subscribe to the events and react to new events by storing them in the event storage;

  • EventStorage is used to store the events in a thread-safe way and to update all the pending queries when a new event is received;

  • EventQuery and its subclasses are the interrogations on the stored events.

The queries are reactive objects capable of putting the process in a waiting state until the query is satisfied or the timeout is reached and in the meantime auto-update themselves when new events are received.

Thread Safety: the tracer is thread-safe. It uses a thread-safe storage for the events and a thread-safe subscriber to the events. The only part that is not yet fully thread-safe are the queries. The queries base class is thread-safe, but the custom queries you can write may expose variables that could be not thread-safe (so, don’t access queries variables from outside until the evaluation is done and you will be safe; query base method such as status(), describe(), etc. are instead thread safe and can be accessed).

__init__(event_enum_mapping=None)

Initialize the event collection and the lock.

Parameters:

event_enum_mapping (Optional[dict[str, type[Enum]]]) – An optional mapping of attribute names to enums (to handle typed events).

clear_events()

Clear all stored events.

Return type:

None

evaluate_query(query)

Evaluate a query over the current and future captured events.

A EventQuery is a query over the tracer’s present and eventually future events (if a timeout is specified). This method takes an already built and not yet evaluated query object and evaluates it. The evaluation is a blocking operation that waits for the query to be satisfied or for the timeout to be reached.

To know more about the queries, please check the query module, where you will find the base class and some already built query objects you can use.

This method returns nothing, because eventual query results are supposed to be accessed through the query object itself. The most basic and common result is its success status, which can be accessed through the succeeded() method.

Parameters:

query (EventQuery) – The query to evaluate.

Raises:

ValueError – If the query you are trying to evaluate is already being evaluated by another thread.

Return type:

None

property events: list[ReceivedEvent]

A copy of the currently stored events (thread-safe).

Returns:

A copy of the stored events.

query_events(predicate, timeout=0.0, target_n_events=1)

Query stored and future events with a predicate and a timeout.

This method is a shortcut that lets you select the events that match a certain criteria (predicate), optionally waiting for a certain time span (timeout) if the criteria are not satisfied immediately. The method returns all the matching events or an empty list if there are any. The predicate is essentially a function that takes a ReceivedEvent as input and evaluates if the event matches the desired criteria (returning True if it does) or not (False otherwise).

NOTE: If you don’t provide a timeout, the method will evaluate all the events that are already stored and return immediately the matching ones. If you provide a timeout, the method will act as a a blocking operation that waits target_n_events to match the predicate or the timeout to be reached.

Usage example:

# (you already made the right subscriptions)

# query just past events to get all events from a device X
# with attribute Y
all_events = tracer.query_events(
    lambda e: e.has_device("sys/tg_test/1") and
              e.has_attribute("State")
              # NOTE: making this call instead of
              # e.attribute_value == "State" prevents
              # case sensitivity issues
)

# query events aiming to get at least one event
# from device X with attribute Y that has a certain value
# (waiting at most 10 seconds if the event is not there yet)
future_query = tracer.query_events(
    # you can use directly the device proxy instead of the name
    lambda e: e.has_device(X_dev_proxy) and
              e.has_attribute("State") and
              e.current_value == TARGET_STATE,
    timeout=10
)

FINAL NOTE: this method is a shortcut to use the tracer in a simplified way. The tracer, potentially, can evaluate any kind of query object. Please check evaluate_query() for more.

Parameters:
  • predicate (Callable[[ReceivedEvent], bool]) – A function that takes an event as input and returns. True if the event matches the desired criteria.

  • timeout (SupportsFloat) –

    The time span in seconds to wait for a matching event (optional). If not specified or passed 0, the method returns immediately.

    NOTE: if the timeout is < 0 or infinite, it will be considered 0. None values are in theory not supported, but they are converted to 0.0 to guarantee retro-compatibility.

    TECHNICAL NOTE: Timeout may not always be a number but something that can be casted to a float. This is useful for guaranteeing retro-compatibility in custom assertions written before 0.7.2, where the timeout was a number and not an object and some users may still have code where they directly pass the timeout object, ignoring that now it is not a number anymore.

  • target_n_events (int) –

    How many events do you expect to find with this query? If in past events (events which happen prior to the moment in which the query is evaluated) you don’t reach the target number, the method will wait till you reach the target number or you reach the timeout. Defaults to 1 so in case of a waiting loop, the method will return the first event.

    If you set this to a number greater than 1 (and ``timeout`` is not ``None``) the method will wait until you reach the target number of events that match the predicate. E.g., if you set this to 10, at query time there are 4 matching events it will wait for 6 more events to match the predicate. If you set this to 10, at query time there are 12 matching events it will return immediately all the 12 matching events.

    It must be greater or equal to 1.

Return type:

list[ReceivedEvent]

Returns:

all matching events within the timeout period if there are any, or an empty list if there are none.

Raises:

ValueError – If the timeout or the target number of events does not meet the requirements (see above).

subscribe_event(device_name, attribute_name, dev_factory=None)

Subscribe to change events for a Tango device attribute.

It’s the same as subscribing to a change event in a Tango device, but the received events are stored in the tracer instance (in a thread-safe way) and can be accessed later with query_events(), with events or with custom assertions. Every time a change event will happen on Tango device attribute, the tracer will receive it and store it.

Usage example:

# you can provide just the device name and the attribute name
tracer.subscribe_event("sys/tg_test/1", "State")

# if you already have a device proxy, you can pass it directly
tracer.subscribe_event(device_proxy, "State")

# if you have the name of the device, but for some reason
# you don't want us to create the device proxy using the
# default constructor DeviceProxy, you can provide a factory method

def custom_factory(device_name: str) -> tango.DeviceProxy:
    return tango.DeviceProxy(device_name)

tracer.subscribe_event(
    "sys/tg_test/1", "State",
    dev_factory=custom_factory
)

NOTE: when you subscribe to an event, you will automatically receive the current attribute value as an event (or, in other words, the last “change” that happened). Take this into account when you write your queries.

NOTE: if you subscribe to the same attribute of the same device multiple times, the subscription will NOT be duplicated.

Parameters:
  • device_name – The name of the Tango target device. Alternatively, if you already have a device proxy, you can pass it directly.

  • attribute_name – The name of the attribute to subscribe to.

  • dev_factory – A device factory method to get the device proxy. If not specified, the device proxy is created using the default constructor tango.DeviceProxy.

Raises:
  • tango.DevFailed – If the subscription fails. A common reason for this is that the attribute is not subscribable (because the developer didn’t set it to be “event-firing” or pollable). An alternative reason is that the device cannot be reached or it has no such attribute.

  • ValueError – If the device_name is not a string or a DeviceProxy.

unsubscribe_all()

Unsubscribe from all subscriptions.

Return type:

None

ska_tango_testing.integration.log_events(device_attribute_map, dev_factory=None, event_enum_mapping=None)

Log events from a set of devices and attributes.

Quick utility function to log events from a set of devices and attributes. It uses a ska_tango_testing.integration.logger.TangoEventLogger instance to log the events in real-time using the default logger. This is useful for debugging purposes and to see the events in real-time while running a test.

Usage example:

# basic usage
log_events({
    "sys/tg_test/1": ["attr1", "attr2"],
    "sys/tg_test/2": ["State"],
})

# usage with proxy instead of device name
log_events({dev_proxy: ["attr"]})

# usage providing a custom factory to create the device proxy
log_events({
    "sys/tg_test/1": ["attr1", "attr2"],
    "sys/tg_test/2": ["State"],
}, dev_factory=my_custom_dev_factory)

For more advanced usage, you can see ska_tango_testing.integration.logger.TangoEventLogger class directly, which allows you to customize the logging policy (filtering some messages) and the message builder (formatting the messages in a custom way).

NOTE: some events attributes even if technically they are primitive types (like integers or strings), they can be semantically typed with an Enum (e.g., a state machine attribute can be represented as an integer, but it is semantically a state). To handle those cases, when you call log_events(...), you can provide a mapping of attribute names to enums through the event_enum_mapping parameter (see the ska_tango_testing.integration.event.EventEnumMapper class). Typed events attribute values will be logged using the corresponding Enum labels instead of the raw values.

Parameters:
  • device_attribute_map (dict[str | tango.DeviceProxy, list[str]]) – A dictionary mapping devices to a list of attribute names you are interested in logging. Each device could be specified either as a device name (str) or as a tango.DeviceProxy instance.

  • dev_factory (Optional[Callable[[str], DeviceProxy]]) – An optional factory function that can be used instead of the default tango.DeviceProxy constructor (if you need to customize the device proxy creation).

  • event_enum_mapping (Optional[dict[str, type[Enum]]]) – An optional dictionary to map attribute names to Enums. By default, it is an empty dictionary.

Return type:

TangoEventLogger

Returns:

The TangoEventLogger instance that is used to log the given events.

ska_tango_testing.integration.assertions

Custom assertions and related utilities for integration tests.

Basic custom event-based assertions for TangoEventTracer.

This module provides some basic custom assertpy assertions to be used with TangoEventTracer instances to assert that certain events have occurred or not occurred.

Essentially, assertions are query calls to the tracer, within a timeout, to check if there are events which match an expected more or less complex predicate, which include:

  • specifics about the source of the event (device name and attribute name);

  • specifics about the event value (e.g., the event value is 5);

  • specifics about the event value change (e.g., the event value changes from “old_value” to “new_value”);

  • specifics about how many events with certain characteristics you expect to have occurred;

  • further custom matching rules;

  • a way to define a timeout (a maximum time to wait for an assertion to pass or fail) and share it between all the chained assertions (i.e., verify that multiple conditions are met within the same timeout);

  • a way to stop the evaluation of some assertions early and trigger a special failure if a certain condition is met.

Assertions are designed to be used in a chain (in a classic assertpy style), where each assertion is called after the previous one, and the timeout is shared between all of them. At the moment, the main assertions you can use are:

  • has_change_event_occurred(), which asserts that one or more events have occurred (within a timeout);

  • hasnt_change_event_occurred(), which is the negation of the previous one and so asserts that no events occur within a timeout;

  • within_timeout(), which is the way you have to set a timeout for the next chain of assertions.

  • with_early_stop(), which is a way to stop the evaluation of the chain of assertions early if a certain condition is met.

Usage example:

from assertpy import assert_that, add_extension
from ska_tango_testing.integration import (
    TangoEventTracer
)
from ska_tango_testing.integration.assertions (
    has_change_event_occurred,
    within_timeout,
)

def test_event_occurs_within_timeout(sut, tracer: TangoEventTracer):

    # subscribe to the events
    tracer.subscribe_event("devname", "attrname")
    tracer.subscribe_event("devname", "attr2")

    # ... do something that triggers the event

    # Check that a generic event has occurred
    assert_that(tracer).has_change_event_occurred(
        device_name="devname",
        attribute_name="attrname",
        attribute_value=5,
    )

    # Check that an attr change from "old_value" to "new_value"
    # has occurred or will occur within 5 seconds in any device
    # without errors.
    # Describe the eventual failure with an evocative message.
    assert_that(tracer).described_as(
        "An event from 'old_value' to 'new_value' for 'attr2' should have"
        " occurred within 5 seconds in some device."
    ).within_timeout(5).with_early_stop(
       # if I detect an event that contains the "error" keyword,
       # the evaluation will fail.
       # if there is a timeout, the evaluation will be interrupted
       lambda event: str(event.attribute_value).contains("error")
    ).has_change_event_occurred(
        # (if I don't care about the device name, ANY will match)
        attribute_name="attr2",
        attribute_value="new_value",
        previous_value="old_value",
    )

If you feel those assertions aren’t enough for your test cases, you can create your own custom assertions. The assertions provided here may serve as examples. If you are willing to create your own assertions, we suggest:

NOTE: Custom assertions of this module are already exported to the assertpy context in ska_tango_testing.integration, so if you are an end-user, if you import the module somewhere in your tests you already have access to the assertions. Sometimes your IDE may not recognise the custom assertions, but they are there.

ANOTHER NOTE: To make assertions about the events order - i.e., assertions which include a verification with the shape “event1 happens before event2”, like when you use previous_value - we are currently using the reception time (reception_time) as a way to compare events. It’s important to remember that we are dealing with a distributed system and the reception time may be misleading in some cases (e.g., the reception time of the event may not be the same as the time the event was generated by the device). We noticed that in tango.EventData there is a timestamp which tells when the Tango server received the event. Maybe in the future it would be better to use that instead of the reception time as a way to compare events (if it comes from a centralised server and not from the device itself, because it is important to remember that in distributed systems the devices’ clocks may not be perfectly synchronised).

class ska_tango_testing.integration.assertions.ChainedAssertionsTimeout(timeout)

A utility for using the same timeout for multiple chained assertions.

(It is used internally)

This class is used to set a timeout once and share it between multiple chained assertions. It permits you to:

  • Initialise the timeout once, with a specified value in seconds.

  • Start the timeout

  • In various moments, get an updated timeout value that is the remaining time from the initial timeout.

By default, the initialisation is done when within_timeout() is called. The updated timeout should then be used in the next chained assertions. When you print an error message, you can also access the original timeout value.

Usage example:

if this is the assertion code you want to achieve (where the three events must occur within the same 10 seconds timeout):

assert_that(event_tracer).within_timeout(10).has_change_event_occurred(
    ...
).has_change_event_occurred(
    ...
).has_change_event_occurred(
    ...
)

You can use the ChainedAssertionsTimeout inside the has_change_event_occurred method like this:

def has_change_event_occurred(self, ...):
    # ... some code ...
    timeout = self.timeout.get_remaining_timeout()
    query = tracer.query_events(..., timeout=timeout)
    # ... some code ...

    # if I need to access the original timeout value
    # (e.g., for composing an error message)
    error_message = (
        "Expected a change event to occur within"
        f" {self.timeout.initial_timeout} seconds"
    )

Some further notes:

  • For the evaluation to begin, you have to call the start() method, which is automatically called in the within_timeout() assertion. The method can be called many times, but it will only set the start time once.

  • You can directly pass a timeout object both to the within_timeout() assertion and to the query objects and methods. A casting to a float will automatically return the remaining timeout value. Sharing the same timeout object between multiple blocks of assertions is also possible and will lead to the same timeout value for all the blocks. E.g.,

timeout = ChainedAssertionsTimeout(10)

# this asserton block automatically starts the timeout
assert_that(tracer).within_timeout(timeout).has_change_event_occurred(
    ...
)

# this assertion block will access a decreased timeout value
assert_that(tracer).within_timeout(timeout).has_change_event_occurred(
    ...
)
  • The object is protected by an internal lock for ensuring eventual parallel access to the timeout object. This is probably strictly necessary (since those kinds of parallel accesses are edge cases), but we still do it for safety.

__init__(timeout)

Initialise a new timeout for chained assertions.

Parameters:

timeout (float) – The initial timeout value in seconds. If the timeout is < 0 or infinite, it will be set to 0.

get_remaining_timeout()

Get the remaining timeout value, since the initialization time.

Return type:

float

Returns:

The remaining timeout value in seconds. It is at least 0 and at most the initial timeout value. It will decrease over time.

static get_timeout_object(timeout)

Get a timeout object from a timeout value.

This method is a factory method that creates a new timeout object from a timeout value. If the timeout value is already a timeout object, it will return the same object.

Parameters:

timeout (SupportsFloat) – The timeout value in seconds, or a timeout object.

Return type:

ChainedAssertionsTimeout

Returns:

A timeout object.

property initial_timeout: float

Get the initial timeout value in seconds.

Returns:

The initial timeout value in seconds.

is_started()

Check if the timeout has been started.

Return type:

bool

Returns:

True if the timeout has been started, False otherwise.

start()

Start the timeout.

This method sets the start time of the timeout to the current time. It can be called multiple times, but it will only set the start time once.

Return type:

None

property start_time: datetime | None

Get the start time of the timeout.

Returns:

The start time of the timeout.

ska_tango_testing.integration.assertions.get_context_early_stop(assertpy_context)

Retrieve the early stop sentinel from the assertpy context.

(It is used internally by the chained assertions)

This function retrieves the early stop sentinel from the assertpy context. It is used internally by the chained assertions to get the sentinel predicate and evaluate it.

Parameters:

assertpy_context (Any) – The assertpy context object (It is passed automatically)

Return type:

Optional[Callable[[ReceivedEvent], bool]]

Returns:

The sentinel predicate function or None if it is not set.

ska_tango_testing.integration.assertions.get_context_timeout(assertpy_context)

Get the timeout value from the given assertpy context.

(It is used internally)

This function retrieves the timeout value from the given assertpy context. It is used internally in the assertions to get the timeout value to use in the next chained assertions. To set the timeout, you should use the within_timeout() assertion.

(It is used internally)

Parameters:

assertpy_context (Any) – The assertpy context object

Return type:

SupportsFloat

Returns:

An object that supports float operations, representing the timeout value in seconds for this context.

ska_tango_testing.integration.assertions.get_context_tracer(assertpy_context)

Get the TangoEventTracer instance from the assertpy context.

(It is used internally)

Helper method to get the TangoEventTracer instance from the assertpy context which is stored in the ‘val’. It fails if the instance is not found.

Parameters:

assertpy_context (Any) – The assertpy context object.

Return type:

TangoEventTracer

Returns:

The TangoEventTracer instance.

Raises:

ValueError – If the TangoEventTracer instance is not found (i.e., the assertion is not called with a tracer instance).

ska_tango_testing.integration.assertions.has_change_event_occurred(assertpy_context, device_name=None, attribute_name=None, attribute_value=None, previous_value=None, custom_matcher=None, min_n_events=1)

Verify that an event matching a given predicate occurs.

Custom assertpy assertion to verify that a certain event occurs, eventually within a specified timeout. When it fails, it provides a detailed error message with the events captured by the tracer, the passed parameters and some timing information.

If you wish, you can also specify a minimum number of events that must match the predicate (through the min_n_events parameter), to verify that at least a certain number of events occurred [within the timeout]. By default, it checks that at least one event matches the predicate.

To describe the event to match, you can pass the following parameters (all optional):

  • the name of the device you are interested in

  • the name of the attribute you are interested in

  • the current value of the attribute (the value that the attribute has when the event is captured)

  • the previous value of the attribute (the value that the attribute had before the event is captured - pretty useful to catch state transitions from a value to another)

  • an arbitrary predicate over the event (to deal with tricky cases where a simple value comparison is not enough or is not possible)

Usage example:

# (given a subscribed tracer)

# Check that an attr change from "old_value" to "new_value"
assert_that(tracer).has_change_event_occurred(
    device_name="devname",
    attribute_name="attrname",
    attribute_value="new_value",
    previous_value="old_value",
)

# Just check that there is an event with the value "new_value"
# (from any device and with any previous value)
assert_that(tracer).has_change_event_occurred(
    attribute_name="attrname",
    attribute_value="new_value",
)

# Add an arbitrary condition
assert_that(tracer).has_change_event_occurred(
    attribute_name="other_attrname",
    custom_matcher=lambda e: e.attribute_value > 5,
)

# Perform the same check, but look for AT LEAST 3 matching events.
assert_that(tracer).has_change_event_occurred(
    attribute_name="attrname",
    attribute_value="new_value",
    min_n_events=3,
)
Parameters:
  • assertpy_context – The assertpy context object (It is passed automatically)

  • device_name – The device name to match. If not provided, it will match any device name.

  • attribute_name – The attribute name to match. If not provided, it will match any attribute name.

  • attribute_value – The current value to match. If not provided, it will match any current value.

  • previous_value – The previous value to match. If not provided, it will match any previous value.

  • custom_matcher – An arbitrary predicate over the event. It is essentially a function or a lambda that takes an event and returns True if it satisfies your condition. NOTE: it is put in and with the other specified parameters.

  • min_n_events – The minimum number of events to match for the assertion to pass; verifies that at least n events have occurred. If not provided, it defaults to 1. If used without a timeout, the assertion will only check events received up to the time of calling. If specified, it must be a positive integer >= 1.

Returns:

The assertpy context object.

Raises:

ValueError – If the TangoEventTracer instance is not found (i.e., the method is called outside an assert_that(tracer) context).

ska_tango_testing.integration.assertions.hasnt_change_event_occurred(assertpy_context, device_name=None, attribute_name=None, attribute_value=None, previous_value=None, custom_matcher=None, max_n_events=1)

Verify that an event matching a given predicate does not occur.

It is the opposite of has_change_event_occurred(). It verifies that no event(s) matching the given conditions occurs, eventually within a specified timeout. When it fails, it provides a detailed error message with the events captured by the tracer, the passed parameters and some timing information.

If you wish, you can also specify a maximum number of events that must match the predicate (through the max_n_events parameter), to verify that no more than a certain number of events occurred [within the timeout]. By default, it checks that no more than one event matches the predicate.

The parameters are the same as has_change_event_occurred().

Usage example:

# (given a subscribed tracer)

# Check that none of the captured events has the value "ERROR"
assert_that(tracer).hasnt_change_event_occurred(
    attribute_value="ERROR",
)
Parameters:
  • assertpy_context – The assertpy context object (It is passed automatically)

  • device_name – The device name to match. If not provided, it will match any device name.

  • attribute_name – The attribute name to match. If not provided, it will match any attribute name.

  • attribute_value – The current value to match. If not provided, it will match any current value.

  • previous_value – The previous value to match. If not provided, it will match any previous value.

  • custom_matcher – An arbitrary predicate over the event. It is essentially a function or a lambda that takes an event and returns True if it satisfies your condition. NOTE: it is put in and with the other specified parameters.

  • max_n_events – The maximum number of events to match before the assertion fails; verifies that no more than n-1 events have occurred. If not provided, it defaults to 1. If used without a timeout, the assertion will only check events received up to the time of calling. If specified, it must be a positive integer >= 1.

Returns:

The assertpy context object.

Raises:

ValueError – If the TangoEventTracer instance is not found (i.e., the method is called outside an assert_that(tracer) context).

ska_tango_testing.integration.assertions.with_early_stop(assertpy_context, sentinel_predicate=None)

Define a sentinel predicate to stop the chained assertions early.

This function is an assertpy extension that permits setting a sentinel predicate for the following chained TangoEventTracer assertions. The sentinel predicate is essentially a function that receives a ReceivedEvent and determines if the evaluation of the chained assertions should stop early. The function acts as a sort of “sentinel” and evaluates all the new (and old) events every time a new event is received by the tracer. If the function returns True for some event, the evaluation of the chained assertions immediately stops and a failure is raised.

This is particularly useful when a long timeout is set (e.g., because of network delays, slow systems, etc.) but occasionally you are able to detect an error early and thus save a lot of time by avoiding a useless long wait. If you use the early stop sentinel without a timeout, it will still evaluate all the events once and fail if the sentinel predicate is met, even if your assertion would not have failed (in other words, the early stop criteria always have priority).

Usage example:

LONG_TIMEOUT = 250  # seconds
assert_that(event_tracer).described_as(
    "A set of events must occur within a long timeout "
    "AND no error code should be detected in the meantime."
).within_timeout(
    LONG_TIMEOUT
).with_early_stop(
    lambda event: event.has_attribute("longRunningCommandResult") and
        "error code 3: exception" in str(event.attribute_value)
).has_change_event_occurred(
    ...
).has_change_event_occurred(
    ...
).has_change_event_occurred(
    ...
)

NOTE: if you chain multiple with_early_stop assertions, at the moment only the last one will be considered. In the future, we may consider supporting multiple sentinel predicates. At the moment, if you want to have multiple sentinel predicates, you should combine them in a single function (e.g., with an and operator). At the moment, if you chain a with_early_stop(None) assertion after other with_early_stop assertions, it will deactivate them.

NOTE: the behaviour of the early stop sentinel will likely not work for your old custom assertions. If you have some, please update them to use the new mechanisms and explicitly check the sentinel predicate as done in the new assertions.

Parameters:
  • assertpy_context (Any) – The assertpy context object (It is passed automatically)

  • sentinel_predicate (Optional[Callable[[ReceivedEvent], bool]]) –

    The sentinel predicate to stop the chained assertions evaluation early. It is a function that receives a ReceivedEvent and returns a boolean. If the predicate returns True, it means that it detected a condition that requires stopping the evaluation of the chained assertions.

    IMPORTANT NOTE: The sentinel predicate is evaluated every time a new event is received by the tracer. This means that it will evaluate a very heterogeneous set of events, so make it solid and robust.

Return type:

Any

Returns:

The decorated assertion context, with the given sentinel predicate stored in the context ‘early_stop’ attribute.

ska_tango_testing.integration.assertions.within_timeout(assertpy_context, timeout)

Add a timeout for the next chain of tracer assertions.

TangoEventTracer allows querying events within a timeout. In other words, you can make assertions about events that will occur in the future within a certain time frame and “await” for them (if they didn’t occur yet). This method when called inside an assertion context permits you to set a timeout for the next chain of assertions.

IMPORTANT NOTE: The timeout, like one may intuitively expect, is shared between all the chained assertions. This means that if you set a timeout of 10 seconds and you have 3 chained assertions, the total time to wait for all the events to occur is 10 seconds, not 30 seconds. Concretely, each assertion will consume some time from the timeout, until it reaches zero.

Usage example:

# (given a subscribed tracer)

# non-blocking long operation that triggers an event at the end
sut.long_operation_that_triggers_an_event()

# Check that the operation is done within 30 seconds
assert_that(tracer).within_timeout(30).has_change_event_occurred(
    attribute_name="operation_state",
    attribute_value="DONE",
)

Alternatively, when you want to verify a set of events occurring within a certain shared timeout:

# Check that the 3 events occur within 30 seconds
assert_that(tracer).within_timeout(30).has_change_event_occurred(
    attribute_name="operation_state",
    attribute_value="INITIAL_STATE",
).has_change_event_occurred(
    attribute_name="operation_state",
    attribute_value="PROCESSING",
).has_change_event_occurred(
    attribute_name="operation_state",
    attribute_value="DONE",
)

# IMPORTANT NOTE: this will NOT verify that the events occur in the
# given order, just that they occur within the same timeout!

NOTE: this assertion always passes, its only purpose is to set the timeout for the following assertions.

NOTE: Using a (small) timeout is a good practice even in not so long operations, because it makes the test more robust and less prone to flakiness and false positives.

Parameters:
  • assertpy_context (Any) – The assertpy context object (It is passed automatically)

  • timeout (SupportsFloat) – The time in seconds to wait for the event to occur, or a timeout object that supports float operations. NOTE: you can pass a ChainedAssertionsTimeout object to share the same timeout between multiple blocks of assertions.

Return type:

Any

Returns:

The decorated assertion context, with a ChainedAssertionsTimeout instance stored in the event_timeout attribute.

ska_tango_testing.integration.logger

Tango proxy client which can log events from Tango devices.

ska_tango_testing.integration.logger.DEFAULT_LOG_ALL_EVENTS(_)

Log all events.

This is the default filtering rule for TangoEventLogger. It logs all events without any filtering. You can write custom rules by defining a function that takes a received event and returns a boolean. For example:

def custom_filter(e: ReceivedEvent) -> bool:
    return e.attribute_value > 10

logger.log_events_from_device(
    device, "attribute_name",
    filtering_rule=custom_filter
)

It could also be an inline lambda function. For example:

logger.log_events_from_device(
    device, "attribute_name",
    # log only events with attribute_value > 10
    filtering_rule=lambda e: e.attribute_value > 10
)
Parameters:

_ – The received event.

Return type:

bool

Returns:

always True.

ska_tango_testing.integration.logger.DEFAULT_LOG_MESSAGE_BUILDER(event)

Log the event in a human-readable format.

This is the default message builder for TangoEventLogger. It logs the events in a human-readable format, including the device name, attribute name, and the new value of the attribute.

You can write custom message builders by defining a function that takes a received event and returns a string. For example:

def custom_message_builder(e: ReceivedEvent) -> str:
    return (
        f"CUSTOM MESSAGE: At {e.reception_time}, {e.device_name} "
        + f"{e.attribute_name} changed to {e.attribute_value}."
    )

logger.log_events_from_device(
    device, "attribute_name",
    message_builder=custom_message_builder
)

It could also be an inline lambda function. For example:

logger.log_events_from_device(
    device, "attribute_name",
    # log using to string default method
    message_builder=lambda e: str(e)
)
Parameters:

event (ReceivedEvent) – The received event.

Return type:

str

Returns:

The message that will be logged.

class ska_tango_testing.integration.logger.TangoEventLogger(event_enum_mapping=None)

A Tango event logger that logs change events from Tango devices.

The logger subscribes to change events from a Tango device attribute and logs them using a filtering rule and a message builder. By default, all events are logged in a human-readable format.

The logger can be used to log events from multiple devices and attributes.

Usage example:

logger = TangoEventLogger()

# log all events from attribute "attr" of device "A"
logger.log_events_from_device("A", "attr")

# log only events from attribute "attr2" of device "A"
# when value > 10
logger.log_events_from_device(
    "A", "attr2",
    filtering_rule=lambda e: e.attribute_value > 10
)

# display a custom message when "B" changes its state
logger.log_events_from_device(
    "B", "State",
    message_builder=lambda e:
        f"B STATE CHANGED INTO {e.attribute_value}"
)

NOTE: some events attributes even if technically they are primitive types (like integers or strings), they can be semantically typed with an Enum (e.g., a state machine attribute can be represented as an integer, but it is semantically a state). To handle those cases, when you create an instance of the logger, you can provide a mapping of attribute names to enums (see the ska_tango_testing.integration.event.EventEnumMapper class). Typed events attribute values will be logged using the corresponding Enum labels instead of the raw values.

All messages are displayed with the INFO logging level, except the events containing errors that are displayed with the ERROR level.

__init__(event_enum_mapping=None)

Initialise the Tango event logger.

Parameters:

event_enum_mapping (Optional[dict[str, type[Enum]]]) – An optional mapping of attribute names to enums (to handle typed events).

log_events_from_device(device_name, attribute_name, filtering_rule=<function DEFAULT_LOG_ALL_EVENTS>, message_builder=<function DEFAULT_LOG_MESSAGE_BUILDER>, dev_factory=None)

Log change events from a Tango device attribute.

This method subscribes to change events from a Tango device attribute and logs them using a filtering rule and a message builder. By default, all events are logged in a human-readable format.

Usage example:

logger = TangoEventLogger()

# log all events from attribute "attr" of device "A"
logger.log_events_from_device("A", "attr")

# log only events from attribute "attr2" of device "A"
# when value > 10
logger.log_events_from_device(
    "A", "attr2",
    filtering_rule=lambda e: e.attribute_value > 10
)

# display a custom message when "B" changes its state
logger.log_events_from_device(
    "B", "State",
    message_builder=lambda e:
        f"B STATE CHANGED INTO {e.attribute_value}"
)

# subscribe specifying a custom device factory
def custom_factory(device_name: str) -> tango.DeviceProxy:
    return tango.DeviceProxy(device_name)

logger.log_events_from_device(
    "A", "attr",
    dev_factory=custom_factory
)

NOTE: when you subscribe to an event, you will automatically receive the current attribute value as an event (or, in other words, the last “change” that happened). Take this into account.

Parameters:
  • device_name – The name of the Tango target device (e.g., “sys/tg_test/1”) or a tango.DeviceProxy instance.

  • attribute_name – The name of the attribute to subscribe to.

  • filtering_rule – A function that takes a received event and returns whether it should be logged or not. By default, all events are logged. See DEFAULT_LOG_ALL_EVENTS() for more details.

  • message_builder – A function that takes a received event and returns the (str) message to log. By default, it logs the event in a human-readable format. See DEFAULT_LOG_MESSAGE_BUILDER() for more details.

  • dev_factory – A device factory method to get the device proxy. If not specified, the device proxy is created using the default constructor tango.DeviceProxy.

Raises:
  • tango.DevFailed – If the subscription fails. A common reason for this is that the attribute is not subscribable (because the developer didn’t set it to be “event-firing” or pollable). An alternative reason is that the device cannot be reached or it has no such attribute.

  • ValueError – If device_name is not a string or a tango.DeviceProxy instance.

unsubscribe_all()

Unsubscribe from all events.

Return type:

None

ska_tango_testing.integration.event

Event mechanism forming the basis of the tracer and the logger.

The foundation of the TangoEventTracer and the related logging mechanism is the capturing, storing, and reacting to Tango events. In practice, the tracer and the logging core mechanism are both based on the following classes and concepts:

  • ReceivedEvent is the base class to represent a received event from a Tango device.

  • A ReceivedEvent is usually generated through a subscription to a Tango device and attribute; the subscription is managed by a TangoSubscriber.

  • Particular kinds of events can be represented by ReceivedEvent subclasses, such as TypedEvent, which represents an event where the attribute value is supposed to be read as an Enum value.

  • (for the tracer) The events are stored in a EventStorage, which can thread-safely store events and notify observers of changes.

class ska_tango_testing.integration.event.EventEnumMapper(mapping=None)

A class to map attribute names to Enums.

This class allows the association of attribute names with Enums. This is useful for state machine attributes, so when the event is received, the state is automatically converted to the corresponding Enum. This is useful so when you print the event as a string, you can see the state as a human readable label, instead of an integer number.

You use this class as follows:

Usage example:

from enum import Enum

class MyEnum(Enum):
    STATE1 = 1
    STATE2 = 2
    STATE3 = 3

mapper = EventEnumMapper()
mapper.map_attribute_to_enum("State", MyEnum)
# or equivalently
# mapper = EventEnumMapper({"State": MyEnum})


typed_event = mapper.get_typed_event(event_w_state_as_attr_name)
# (this now is a TypedEvent with the attribute value as MyEnum)

unchanged_event = mapper.get_typed_event(event_wo_state_as_attr_name)
# (this is still just a ReceivedEvent)
__init__(mapping=None)

Create a new EventEnumMapper.

Parameters:

mapping (Optional[dict[str, type[Enum]]]) – An optional dictionary to map attribute names to Enums. By default, it is an empty dictionary.

Raises:

TypeError – if any of the values in the mapping is not an Enum.

get_typed_event(event)

Get a TypedEvent if the attribute is associated with an Enum.

Parameters:

event (ReceivedEvent) – The event to type.

Return type:

ReceivedEvent

Returns:

The TypedEvent instance if the attribute is associated with an Enum and the attribute value is effectively mappable to the given enum, the original event otherwise.

map_attribute_to_enum(attribute_name, enum_class)

Associate an attribute name with an Enum.

Parameters:
  • attribute_name (str) – The name of the attribute to associate.

  • enum_class (type[Enum]) – The Enum to associate.

Raises:

TypeError – if enum_class is not an Enum.

Return type:

None

class ska_tango_testing.integration.event.EventStorage

Thread-safe storage for Tango events.

This class provides a thread-safe storage for ReceivedEvent instances. An instance of this class can be used to store the events that are received from multiple Tango devices concurrently.

This class also offers a subscription mechanism to notify observers of changes in the stored events. An observer must implement the EventStorageObserver interface. The observer will be notified of changes in the events list 1) the first time it subscribes, and 2) every time a new event is stored. Every notification will include a full copy of the current events list (maybe in future we will also pass the new event separately).

Both the storing and the notification mechanisms are thread-safe.

The subscription mechanism is inspired by the Observer Design Pattern.

__init__()

Initialise the events storage.

clear_events()

Clear all stored events.

Return type:

None

property events: list[ReceivedEvent]

Get a copy of all stored events.

Returns:

A copy of the current events list

store(event)

Store a new event and notify observers.

Parameters:

event (ReceivedEvent) – The event to store

Return type:

list[ReceivedEvent]

Returns:

A copy of the current events list

subscribe(observer)

Add an observer to be notified of events changes.

Parameters:

observer (EventStorageObserver) – The observer to add

Return type:

None

unsubscribe(observer)

Remove an observer.

Parameters:

observer (EventStorageObserver) – The observer to remove

Return type:

None

class ska_tango_testing.integration.event.EventStorageObserver(*args, **kwargs)

Observer interface for EventStorage changes.

This class is a protocol that must be implemented by classes that want to observe changes in the EventStorage class. See the class documentation for more information.

__init__(*args, **kwargs)
on_events_change(events)

Handle events list change.

Parameters:

events (list[ReceivedEvent]) – Current list of events

Return type:

None

class ska_tango_testing.integration.event.ReceivedEvent(event_data)

A Tango change event received by a device to notify a change.

This class represents a received change event from a Tango device device_name, regarding an attribute attribute_name which contains a new value attribute_value. The event has been received at reception_time in this testing context.

This class is a wrapper around the Tango tango.EventData, which extracts and exposes the most relevant information for testing purposes. If you need to access the original Tango event data, you can use the event_data attribute.

Since in SKAO tests developers do not always use the (string) device name, a method has_device() is provided to check if the event comes from a given device (the same method accepts a string too).

Since the attribute name received by the Tango event is always lower case, a method has_attribute() is provided to check if the event comes from a given attribute (to make it case insensitive).

When printed as a string, a ReceivedEvent will show the device name, the attribute name, the attribute value, and the reception time in a concise and human-readable way.

__init__(event_data)

Initialise the ReceivedEvent with the event data.

Parameters:

event_data (EventData) – The event data.

property attribute_name: str

The (short) name of the attribute that sent the event.

Examples: ‘attribute1’, ‘state’, etc.

IMPORTANT NOTE: The attribute name is always lower case, as it is returned by the Tango event data. To avoid case sensitivity issues, always use lower case when comparing attribute names or use the has_attribute() method.

Example: an event from an attribute ‘State’

event.attribute_name # 'state'
event.attribute_name == 'State' # False
event.attribute_name == 'state' # True
event.has_attribute('State') # True
event.has_attribute('state') # True
Returns:

The name of the attribute.

property attribute_value: Any

The new value of the attribute when the event was sent.

Returns:

The new value of the attribute. The type of the value depends on the attribute type. If for some reason the value is missing, None is returned.

property device_name: str

The name of the device that sent the event.

Example: ‘sys/tg_test/1’

Returns:

The name of the device.

event_data: EventData

The original received tango.EventData object.

has_attribute(target_attribute_name)

Check if the event comes from a given attribute.

IMPORTANT NOTE: A lower case comparison is used to avoid case sensitivity. This is preferred because the attribute name in tango.EventData is always lower case.

Example: an event from an attribute ‘State’

event.attribute_name # 'state'
event.attribute_name == 'State' # False
event.attribute_name == 'state' # True
event.has_attribute('State') # True
event.has_attribute('state') # True
Parameters:

target_attribute_name (str) – The name of the attribute to check against.

Return type:

bool

Returns:

True if the event comes from the given attribute.

has_device(target_device_name)

Check if the event comes from a given device.

Parameters:

target_device_name – The name of the device or the device proxy to check against.

Returns:

True if the event comes from the given device.

property is_error: bool

Check if the event is an error event.

Returns:

True if the event is an error event, False otherwise.

reception_age()

Return the age of the event in seconds since it was received.

The age is calculated as the difference between the current time (local) and the (local) time when the event was received reception_time.

Return type:

float

Returns:

The age of the event in seconds.

reception_time: datetime

The (local) timestamp of when the event was received.

class ska_tango_testing.integration.event.TangoSubscriber(event_enum_mapping=None)

Manager for Tango device event subscriptions.

This class manages subscriptions to Tango device change events in a thread-safe way. It allows you to:

  • subscribe to change events for specific device attributes, providing a callback that will be called when events are received

  • automatically convert event values to enum types when appropriate

  • safely unsubscribe from all events when done

Usage example:

def my_callback(event: ReceivedEvent) -> None:
    print(f"Received event: {event}")

# Create manager with optional enum mapping
manager = TangoSubscriptionManager({
    "State": MyStateEnum
})

# Subscribe to events
manager.subscribe_event("sys/tg_test/1", "State", my_callback)

# ... do something ...

# Clean up
manager.unsubscribe_all()

NOTE: When you subscribe to an event, the callback will be called with the current attribute value as an event.

TECHNICAL NOTE: The subscriptions are protected by a lock to ensure thread safety, potentially you can subscribe/unsubscribe from different threads (even if this probably will not be needed).

__init__(event_enum_mapping=None)

Initialise the subscriber.

Parameters:

event_enum_mapping (Optional[dict[str, type[Enum]]]) – Optional mapping of attribute names to event types (Enum). If you specify this, the event data will be converted to the appropriate type before being passed to the user-defined callback and so eventual prints, string conversions etc. will be more readable.

attribute_enum_mapping

Mapping of attribute names to event types (Enum).

subscribe_event(device_name, attribute_name, callback, dev_factory=None)

Subscribe to change events for a Tango device attribute.

This method sets up a subscription to CHANGE_EVENT events for a specific attribute of a Tango device. When an event is received, it is:

  1. Wrapped in a ReceivedEvent object

  2. Converted to the appropriate enum type if configured

  3. Passed to the provided callback function

Parameters:
  • device_name – Either a device name (str) or an existing DeviceProxy. If a string is provided, a new DeviceProxy will be created using the dev_factory (or tango.DeviceProxy if no factory is provided).

  • attribute_name – Name of the device attribute to subscribe to. Case-sensitive, should match exactly the attribute name in the device.

  • callback – Function that will be called for each received event. Must accept a single ReceivedEvent parameter.

  • dev_factory – Optional factory function to create DeviceProxy instances. Useful for testing or custom proxy creation. If None, tango.DeviceProxy will be used.

Raises:

ValueError – If the device_name is not a string or DeviceProxy

NOTE: Upon subscription, you will immediately receive an event with the current value of the attribute.

NOTE: if you subscribe to the same attribute of the same device multiple times, the subscription will NOT be duplicated.

TECHNICAL NOTE: This method is thread-safe. The subscription ID is stored in a thread-safe way to allow concurrent subscriptions and un-subscriptions.

unsubscribe_all()

Unsubscribe from all active subscriptions.

Return type:

None

class ska_tango_testing.integration.event.TypedEvent(event_data, enum_class)

A ReceivedEvent that is typed with an Enum.

This class is a subclass of ska_tango_testing.integration.event.ReceivedEvent that adds the possibility to associate an Enum with an attribute name. This is useful for state machine attributes, so when the event is received, the state is automatically converted to the corresponding Enum. This is useful so when you print the event as a string, you can see the state as a human readable label, instead of an integer number.

To use this class, you need to define an Enum that represents the states of the attribute, and then associate the attribute name with the Enum using the EventEnumMapper class.

Usage example:

from enum import Enum

class MyEnum(Enum):
    STATE1 = 1
    STATE2 = 2
    STATE3 = 3

event = TypedEvent(event_data, MyEnum)

print(str(event.attribute_value))  # MyEnum.STATE1
__init__(event_data, enum_class)

Create a new TypedEvent.

Parameters:
  • event_data (EventData) – The event data received from Tango.

  • enum_class (type[Enum]) – The Enum class to use for the attribute value.

Raises:

TypeError – if enum_class is not an Enum.

property attribute_value: Enum

The attribute value, eventually cast to the given enum.

NOTE: remember that if you want to have it printed with the Enum label instead of the integer value, you need to cast it to a string, as shown here:

from enum import Enum

class MyEnum(Enum):
    STATE1 = 1
    STATE2 = 2
    STATE3 = 3

event = TypedEvent(event_data, MyEnum)

print(event.attribute_value) # 1
print(str(event.attribute_value))  # MyEnum.STATE1
Returns:

the attribute value, eventually converted to an enum.

ska_tango_testing.integration.query

Query the tracer for events.

This module provides classes to represent the various queries that can be made to the tracer to verify conditions on the events that have been recorded and/or to retrieve events.

The main base class for the query hierarchy is EventQuery, which provides the basic interface for a query.

Some useful queries are provided in this module, such as:

  • NEventsMatchQuery: A query to check if a certain number of events match a given predicate.

  • NStateChangesQuery: A query to check if a certain number of changes in an attribute value are recorded.

  • QueryWithFailCondition: A sort of decorator query that adds a fail condition to another query to make it fail early if some kind of event is detected.

class ska_tango_testing.integration.query.EventQuery(timeout=0.0)

Abstract class for querying events with a timeout mechanism.

An events query is a mechanism to query a set of events within a timeout and succeed if some criteria are met or fail otherwise. A query has the following characteristics.

  • It has a lifecycle (accessible through the status method and represented by EventQueryStatus):

    • it is instantiated and initialised with all the needed parameters (such as the timeout)

    • it is evaluated through an event tracer

    • if it has a >0 timeout, it will wait for the timeout to expire or for the success criteria to be met; if it has a 0 timeout, it will just evaluate the events once and return immediately; in both cases, the success criteria are defined by the succeeded method

    • when the query is completed (because the timeout expired or the success criteria are met), the evaluation ends and the query exposes its success or failed status (through the succeeded method) and further information about the evaluation (like the duration, the initial and remaining timeout and a description of the results and the criteria) through the other methods (evaluation_duration, initial_timeout, remaining_timeout, describe)

  • it is an abstract class, so it cannot be instantiated directly. Instead, a subclass has to be created to define which are exactly the success criteria and the evaluation logic. The subclass must implement the following protected methods:

    • _succeeded defines the success criteria of your query, write here some logic to check if your query is satisfied (return True if it is, False otherwise). Consider that by default a timeout will be awaited if the query is not completed yet.

    • _evaluate_events is a callback method you can implement to analyse new events and update some kind of your internal state. The same internal state can be used in the _succeeded method. The method is activated once when the query evaluation begins and every time new events are received. Consider that every time this method is called all the received events are passed to it (not only the new ones). Consider also that both this method and the _succeeded method are protected by a lock, so you can safely access your internal state

    • you may also want to override the _is_stop_criteria_met method to add more criteria to stop the evaluation (e.g., an early stop condition)

    • you may also want to override the description private method _describe_results to provide a custom description of the query results and the _describe_criteria method to provide a custom description of the query criteria you are using.

From a user’s perspective, to evaluate the query you can simply pass it to a ska_tango_testing.integration.tracer.TangoEventTracer instance. Inside it, the query will automatically subscribe to the events storage and wait for the evaluation to complete. Example:

tracer = TangoEventTracer()
# (do all your subscriptions here)

class MyQuery(EventQuery):

    def _evaluate_events(self, events: List[ReceivedEvent]) -> None:
        # (your logic here, that saves some kind of state)

    def _succeeded(self):
        # (your logic here, that checks if the query is satisfied
        # using the state saved in the _evaluate_events method)

    def _describe_results(self):
        # (your logic here, that describes the results of the query)
        # (optional but recommended)

    def _describe_criteria(self):
        # (your logic here, that describes the criteria of the query)
        # (optional but recommended)

# simple evaluation without timeout (non blocking)
query = MyQuery()
tracer.evaluate_query(query)

# evaluation with timeout (blocking, for at most 10 seconds)
query_with_timeout = MyQuery(timeout=10.0)
tracer.evaluate_query(query_with_timeout)

A SMALL DESCRIPTION OF THE QUERY EVALUATION MECHANISM: the query is evaluated by connecting it to an EventStorage, through a subscription mechanism. The query is then notified when new events are received and it can evaluate them. The query is also notified once when the evaluation begins. This way, when the evaluate method is called, the query is put in evaluation mode, it receives at least once all the events that are in the storage and every time a new event is stored it updates its internal state and checks if the success criteria are met. The timeout mechanism is implemented using a signal that is set when the timeout expires or when the success criteria are met (this way a client can be blocked until the evaluation completes or the timeout expires).

Important inspiration for the query mechanism comes from:

IMPORTANT NOTE: the query is internally thread safe, since all the attributes access are protected by a lock. As you may notice, the protected internal methods are not thread safe by themselves, but the lock is always and only acquired in the public methods. If you implement your own query:

  • do not acquire the lock in the protected methods, because it is already acquired in the public methods and that would cause a deadlock;

  • do not call query public methods from the protected template methods (like _succeeded and _evaluate_events) you implement, because they may acquire the lock again and cause a deadlock (instead, limit yourself to access the internal state and eventually the protected methods, you will already have the lock acquired, so don’t worry about acquiring it again).

Essentially, you can not worry about concurrent access to your internal state, because it is protected by the lock (at least from eventual concurrent internal calls). The only tricky case may be if you wish to expose your internal state to the outside world and you think that some external client may access it concurrently to the evaluation phase. In this case, it’s recommended that your internal state is protected in the same way as the query internal state is protected (with a lock, called just by public methods, that acquire the lock and orchestrate other unprotected private methods). If you think no client will access your state during the evaluation (which is the most common case), you can also not worry about it.

__init__(timeout=0.0)

Initialise the events query.

Parameters:

timeout (SupportsFloat) –

The timeout for the query in seconds. By default, the query will not wait for any timeout.

NOTE: timeouts < 0 and infinite timeouts are automatically converted to 0 timeouts (no timeout).

describe()

Describe the query status, criteria and results.

By default the status is described including the status, the start and end time of the evaluation (if available), the timeout and the remaining timeout. You can override this method to provide a custom description of the query results by overriding the _describe_results method and the query criteria by overriding the _describe_criteria method.

Return type:

str

Returns:

The description of the query. It is a 6 lines string divided in 3 sections: status, criteria and results.

evaluate(storage)

Start the evaluation of the query (USED BY THE TRACER).

This method is used by the tracer to put the query in evaluation mode so it can receive events and evaluate them. This method is blocking and will return only when the query evaluation completes or the timeout expires.

NOTE: this method is meant to be called only one time. If you try to evaluate again or if you try to evaluate a query that is already in an evaluation state, a ValueError will be raised.

NOTE: this method is meant to be called by the tracer, not by the end user. The end user should simply pass the query to the tracer and let it handle the evaluation.

NOTE: this method is not meant to be overridden. If you want to implement the evaluation logic, you should implement the _succeeded and _evaluate_events methods. You can also override the _is_stop_criteria_met method to add more criteria to stop the evaluation.

Parameters:

storage (EventStorage) – The event storage to use to receive events.

Raises:

ValueError – If the evaluation is already started.

Return type:

None

evaluation_duration()

Get the duration of the query evaluation in seconds.

The duration is the time elapsed between the evaluation start and the evaluation end. If the evaluation has not started yet, the duration is None. If the evaluation is in progress, the duration is the time elapsed between the evaluation start and the current time.

Return type:

float | None

Returns:

The duration of the query evaluation in seconds.

initial_timeout()

Get the initial timeout in seconds.

The initial timeout is the timeout set when the evaluation begins. If the evaluation did not start yet, the initial timeout is the value that can be read now from the timeout attribute.

Return type:

float

Returns:

The initial timeout in seconds.

is_completed()

Check if the query is completed.

Return type:

bool

Returns:

True if the query is completed, False otherwise.

on_events_change(events)

Handle events change and evaluate them against the query criteria.

This method is the callback that is called when new events are received by the events manager. It evaluates again all the events and stops the evaluation if the criteria are met.

NOTE: this method is meant to be called by the events manager when new events are received. The end user should not call this method directly.

NOTE: this method is not meant to be overridden. If you want to implement the evaluation logic, you should implement the _succeeded and _evaluate_events methods. You can also override the _is_stop_criteria_met method to add more criteria to stop the evaluation.

Parameters:

events (List[ReceivedEvent]) – The updated list of events (with both new and old events).

Return type:

None

remaining_timeout()

Get the remaining timeout in seconds.

The remaining timeout is the time left before the timeout expires. If the evaluation did not start yet, the remaining timeout is the value of the timeout attribute.

Return type:

float

Returns:

The remaining timeout in seconds.

set_timeout(timeout)

Change the timeout of the query (only before evaluation).

Before the evaluation begins, you can change this query’s timeout.

Parameters:

timeout (SupportsFloat) – The new timeout for the query in seconds.

Raises:

RuntimeError – If the evaluation already started.

Return type:

None

status()

Get the status of the query.

The query can be in one of the following states:

  • NOT_STARTED: the query is created but not evaluated yet.

  • IN_PROGRESS: the query is being evaluated.

  • SUCCEEDED: the query evaluation terminated and succeeded.

  • FAILED: the query evaluation terminated and failed.

Return type:

EventQueryStatus

Returns:

The status of the query.

succeeded()

Check if the query succeeded.

Return type:

bool

Returns:

True if the query succeeded, False otherwise.

class ska_tango_testing.integration.query.EventQueryStatus(value)

Enumeration for the status of an events query.

The status of an events query can be one of the following:

  • NOT_STARTED: the query is created but not evaluated yet.

  • IN_PROGRESS: the query is being evaluated.

  • SUCCEEDED: the query evaluation terminated and succeeded.

  • FAILED: the query evaluation terminated and failed.

class ska_tango_testing.integration.query.NEventsMatchQuery(predicate, target_n_events=1, timeout=0.0)

Query that looks for N events that match a given predicate.

This query will succeed when there are N received events that match a certain predicate (without duplicates). The query will evaluate the predicate for each event and store the matching events (avoiding duplicates) and will succeed when the number of matching events is equal to or greater than the target number of events.

Here follows an example of how to use this query:

def predicate(event: ReceivedEvent, all_events: list[ReceivedEvent]) -> bool:
    return (
        event.has_device("sys/tg_test/1") and
        event.has_attribute("attr1") and
        event.attribute_value >= 42
    )

# query for 3 events that match from a certain device and attribute
# with a value greater or equal to 42
query = NEventsMatchQuery(predicate, target_n_events=3, timeout=10)

# evaluate the query
tracer.evaluate_query(query)

# access the matching events
if query.succeeded():
    first_matching_event = query.matching_events[0]

# description will include some information about the criteria
# (e.g., the target number of events) and the results
# (e.g., the number of matching events)
logging.info(query.describe())

IMPORTANT NOTE: At the moment, the internal matching events list is not protected from external modifications with a lock, so if a user modifies the list while the query is being evaluated, there may be unexpected results. But this is not likely to happen if the client is using the query as intended.

__init__(predicate, target_n_events=1, timeout=0.0)

Initialise the query with the predicate and target number of events.

Parameters:
  • predicate (Callable[[ReceivedEvent, list[ReceivedEvent]], bool]) – A function that takes an event and the list of all events as input and returns True if the event matches the desired criteria. The predicate can evaluate just the event in isolation or also the event in the context of the other events. The list of events is supposed to be ordered by the time they were received.

  • target_n_events (int) – The target number of events to match. Defaults to 1. It must be greater or equal to 1.

  • timeout (SupportsFloat) – The timeout for the query in seconds. Defaults to 0.

Raises:

ValueError – If the target number of events is less than 1.

class ska_tango_testing.integration.query.NStateChangesQuery(device_name=None, attribute_name=None, attribute_value=None, previous_value=None, custom_matcher=None, target_n_events=1, timeout=0.0)

Query that looks for N state change events.

This query extends NEventsMatchQuery and will succeed when N state change events are received using the provided criteria. The supported criteria are the following:

  • the device name

  • the attribute name

  • the current value

  • the previous value

  • a custom matcher function

All the criteria are optional and can be combined to define the state change events you are looking for. The query will evaluate the criteria for each event and store the matching events (avoiding duplicates) and will succeed when the number of matching events is equal to or greater than the target number of events.

NOTE: passing None to any of the criteria will match any value for that criterion.

Here follows an example of how to use this query:

# query to detect a state change from OFF to ON
query = NStateChangesQuery(
    device_name=device, # device name or device proxy, it's the same
    attribute_name="state",
    attribute_value=tango.DevState.ON,
    previous_value=tango.DevState.OFF,
    timeout=10,
)

# evaluate the query
tracer.evaluate_query(query)

# access the matching events
if query.succeeded():
    first_matching_event = query.matching_events[0]

# another more elaborate query that replicates the NEventsMatchQuery
# example but with state change criteria
query2 = NStateChangesQuery(
    device_name="sys/tg_test/1",
    attribute_name="attr1",
    custom_matcher=lambda event: event.attribute_value >= 42,
    target_n_events=3,
    timeout=10,
)

# ...

IMPORTANT NOTE: At the moment, the internal matching events list is not protected from external modifications with a lock, so if a user modifies the list while the query is being evaluated, there may be unexpected results. But this is not likely to happen if the client is using the query as intended.

__init__(device_name=None, attribute_name=None, attribute_value=None, previous_value=None, custom_matcher=None, target_n_events=1, timeout=0.0)

Initialise the query with the state change parameters.

Parameters:
  • device_name – The name of the device to match. Optional, by default it will match any device.

  • attribute_name – The name of the attribute to match. Optional, by default it will match any attribute.

  • attribute_value – The current value of the attribute. Optional, by default it will match any current value.

  • previous_value – The previous value of the attribute. Optional, by default it will match any previous value.

  • custom_matcher – A custom matcher function to apply to events to define further rules. Optional, by default no further rules are applied.

  • target_n_events – The target number of events to match. Defaults to 1.

  • timeout – The timeout for the query in seconds. Defaults to 0.

NOTE: passing None to any of the criteria will match any value for that criterion.

as_predicate(event, events)

Define this query as a predicate to evaluate the events.

Parameters:
Return type:

bool

Returns:

True if the event matches the state change criteria, False otherwise.

class ska_tango_testing.integration.query.QueryWithFailCondition(wrapped_query, stop_condition)

A query that wraps another query and stops early if a condition is met.

This query wraps another query and stops the evaluation early if a given stop condition is met. The stop condition is a function that takes an event as input and returns True if the query should stop evaluating events. Each new event is evaluated by the stop condition before being passed to the wrapped query. A few notes:

  • this query will succeed if the wrapped query succeeds (and the stop condition is not met);

  • the stop condition is evaluated before the wrapped query, so the wrapped query will not be evaluated if the stop condition is met;

  • this query’s timeout is exactly the same as the wrapped query’s timeout.

Here follows an example of how to use this query:

# define a wrapped query
wrapped_query = NStateChangesQuery(
    device_name="sys/tg_test/1",
    attribute_name="attr1",
    custom_matcher=lambda event: event.attribute_value >= 42,
    target_n_events=3,
    timeout=10, # this timeout will be used
)

# define a stop condition that detects error events from any device
def stop_condition(event: ReceivedEvent) -> bool:
    return (
        event.has_attribute("longRunningCommandResult") and
        "error code 3: exception" in str(event.attribute_value)
    )

# wrap the query with the stop condition
query = QueryWithFailCondition(wrapped_query, stop_condition)

# evaluate the query
tracer.evaluate_query(query)

if query.succeeded():
    # access the matching events
    first_matching_event = wrapped_query.matching_events[0]
elif query.failed_event is not None:
    # query failed early because of the stop condition
    # ...
else:
    # query failed for another reason (e.g., timeout)
    # ...

# description will combine the wrapped query description with
# the stop condition description and the eventual detected
# early stop event
logging.info(query.describe())

The general idea of this kind of query comes from the Decorator Design Pattern, because this query is a subclass that wraps a generic query to add a new behaviour (the early stop condition).

IMPORTANT NOTE: At the moment, the internal fail event variable is not protected from external modifications with a lock, so if a user modifies the value while the query is being evaluated, there may be unexpected results. But this is not likely to happen if the client is using the query as intended.

__init__(wrapped_query, stop_condition)

Initialise the query with the wrapped query and stop condition.

Parameters:
  • wrapped_query (EventQuery) – The query to wrap.

  • stop_condition (Callable[[ReceivedEvent], bool]) – A function that takes an event as input and returns True if the stop condition is met.

ska_tango_testing.integration.predicates (DEPRECATED)

Predicates to filter TangoEventTracer events in queries.

WARNING: This module is deprecated and will be removed in future. It is replaced by queries objects, which are able to print by themselves their details; see query module.

A collection of predicates to filter ReceivedEvent instances when calling the ska_tango_testing.integration.TangoEventTracer.query_events() method. The main purpose of these predicates is to allow the user to compose complex queries to filter events based on their attributes but also on their position in the event sequence.

If you are an end-user of this module, you will probably not need to write or use these predicates directly. Instead, you will use the custom assertpy assertions (see ska_tango_testing.integration.assertions). If you wish to write custom predicates we still recommend to check the custom code for usage examples.

WARNING: This module is deprecated and will be removed in future. It is replaced by queries objects, which are able to print by themselves their details; see query.

ska_tango_testing.integration.predicates.event_has_previous_value(target_event, tracer, previous_value)

Check if an event has a specific previous value.

This predicate can be used to match events based on the value they had before the current one. It is useful to check if an event was triggered by a specific value change. If the event has no previous value, it will return False.

WARNING: This method is deprecated and will be removed in future. It is replaced by queries objects, which are able to print by themselves their details; see query.

Parameters:
  • target_event (ReceivedEvent) – The event to check.

  • tracer (TangoEventTracer) – The event tracer containing the events.

  • previous_value (Any) – The value to match.

Return type:

bool

Returns:

True if the event has the provided previous value, False if the event has no previous value or if the previous value does not match.

Deprecated since version 0.8.0: This method is deprecated and will likely be removed in future. It is replaced by queries objects, which are able to print by themselves their details; see ska_tango_testing.integration.query.

ska_tango_testing.integration.predicates.event_matches_parameters(target_event, device_name=None, attribute_name=None, attribute_value=None)

Check if an event matches the provided criteria.

If a criterion is not given (ANY_VALUE), the predicate will always return True (only the given and not None criteria will be checked).

WARNING: This method is deprecated and will be removed in future. It is replaced by queries objects, which are able to print by themselves their details; see query.

Parameters:
  • target_event – The event to check.

  • device_name – The device name to match. If not provided, it will match any device name.

  • attribute_name – The attribute name to match. If not provided, it will match any attribute name.

  • attribute_value – The current value to match. If not provided, it will match any current value.

Returns:

True if the event matches the provided criteria, False otherwise.

Deprecated since version 0.8.0: This method is deprecated and will likely be removed in future. It is replaced by queries objects, which are able to print by themselves their details; see ska_tango_testing.integration.query.