"""Module used for checking and verifying correct occurrence of events."""
import datetime
from functools import reduce
from typing import List, NamedTuple, Tuple, Union
from assertpy import assert_that
from ska_ser_skallop.event_handling import base
from ska_ser_skallop.subscribing import helpers
[docs]class Occurrence(NamedTuple):
"""Bundles data related to a single occurrence into a single object."""
time_of_occurrence: datetime.datetime
time_of_recording: datetime.datetime
value: str
attr: str
device: str
[docs]class Ahead(NamedTuple):
"""Bundles data about a device that are transitioning ahead of the main one."""
device: str
ahead_by: datetime.timedelta
[docs]class Behind(NamedTuple):
"""Bundles data about a device that are transitioning behind that of the main one."""
device: str
behind_by: datetime.timedelta
[docs]class Outcome(NamedTuple):
"""Bundles data about an outcome for the relative order of occurrences into a single object."""
device: str
transition: str
devices_ahead: List[Ahead]
devices_behind: List[Behind]
transit_time: Union[datetime.datetime, str]
[docs]class AssertionOnOccurrence:
"""Intermediate object to manage the assertion of correct occurrences."""
def __init__(self, device: str, occurrences: "Occurrences") -> None:
"""Initialise the object.
:param device: The device that will be investigated
:param occurrences: the given event occurrences that took place
"""
self.device = device
self.occurrences = occurrences
[docs] def is_behind_all_on_transit(self, transit: str):
"""Assert that events occurred on this device behind all others for a given transition.
:param transit: The name of the transition (e.g. the value indicated by the change event)
"""
outcomes = self.occurrences._generate_outcomes_for(self.device)
if outcomes:
outcomes_for_device = [o for o in outcomes if o.device == self.device]
outcome = [o for o in outcomes_for_device if o.transition == transit][0]
assert_that(outcome.devices_behind).is_equal_to([])
[docs] def is_ahead_of_all_on_transit(self, transit: str):
"""Assert that events occurred on this device ahead of all others for a given transition.
:param transit: The name of the transition (e.g. the value indicated by the change event)
"""
outcomes = self.occurrences._generate_outcomes_for(self.device)
if outcomes:
outcomes_for_device = [o for o in outcomes if o.device == self.device]
outcome = [o for o in outcomes_for_device if o.transition == transit][0]
assert_that(outcome.devices_ahead).is_equal_to([])
[docs]class Occurrences(base.WireTap):
"""Class used to keep track of the occurrences of events.
It is given a list of values representing expected transitions
and then keeps track of actual events occuring by means of handler
objects making calls on it by seeing it as a WireTap object.
"""
def __init__(self, transits: List[Union[str, Tuple[str, str]]]):
"""Initialise the object.
TODO evaluate if using a tuple for events is redundant
:param transits: the expected transits that may occur, this can
either be a list of simple strings or tuple for which
the second value indicates if the transit is expected to be behind or ahead
of all devices.
"""
self.transits = transits
self.expected = [t if isinstance(t, str) else t[0] for t in transits]
self._occurrences: List[Occurrence] = []
self._subject_device = None
def _get_devices(self) -> List[str]:
devices = []
for item in self._occurrences:
if item.device not in devices:
devices.append(item.device)
return list(devices)
def _get_transits(self) -> List[str]:
devices = []
for item in self._occurrences:
if item.value not in devices:
devices.append(item.value)
return devices
def _get_outcomes_for_transit(self, transit: str) -> List[Occurrence]:
outcomes = [o for o in self._occurrences if o.value == transit]
outcomes.sort(key=lambda o: o.time_of_occurrence)
return outcomes
@staticmethod
def _get_time_of_occurrence(
outcomes_for_transit: List[Occurrence], device: str
) -> Union[datetime.datetime, str]:
device_time_of_occurrence_select = [
o.time_of_occurrence for o in outcomes_for_transit if o.device == device
]
if device_time_of_occurrence_select:
return device_time_of_occurrence_select[0]
return "transition never occurred"
def _generate_outcomes(self) -> List[Outcome]:
outcomes = []
for device in self._get_devices():
for transit in self._get_transits():
outcomes_for_transit = self._get_outcomes_for_transit(transit)
device_transit_occurred = False
devices_behind, devices_ahead = [], []
device_time_of_occurrence = self._get_time_of_occurrence(
outcomes_for_transit, device
)
if not isinstance(device_time_of_occurrence, str):
for occurrence in outcomes_for_transit:
if occurrence.device == device:
device_transit_occurred = True
elif device_transit_occurred:
devices_behind.append(
Behind(
occurrence.device,
occurrence.time_of_occurrence
- device_time_of_occurrence,
)
)
else:
devices_ahead.append(
Ahead(
occurrence.device,
device_time_of_occurrence
- occurrence.time_of_occurrence,
)
)
outcomes.append(
Outcome(
device,
transit,
devices_ahead,
devices_behind,
device_time_of_occurrence,
)
)
return outcomes
[docs] def set_subject_device(self, device: str):
"""Specify the main device or subject of investigating relative occurrences.
:param device: the name of the device as FQDN string.
"""
self._subject_device = device
@property
def subject_device(self) -> str:
"""Get the main device or subject of investigating relative occurrences.
:returns: the name of the device as FQDN string.
"""
device = (
self._subject_device
if self._subject_device is not None
else self._get_devices()[0]
)
return device
@staticmethod
def _agg_strings(x_arg, y_arg) -> str:
return f"{x_arg}\n{y_arg}"
def _generate_outcomes_for(self, device: str) -> List[Outcome]:
return [o for o in self._generate_outcomes() if o.device == device]
[docs] def print_outcome_for(self, device: str) -> str:
"""Print a human readable description of events occurring relative to the given device.
:param device: the FQDN name of the device to be printing an outcome for
:return: A string containing the text documenting the results.
"""
lines = []
outcomes_for_device = self._generate_outcomes_for(device)
previous_time_of_occurrence: Union[None, datetime.datetime, str] = None
for outcome in outcomes_for_device:
if not previous_time_of_occurrence:
previous_time_of_occurrence = outcome.transit_time
transition_period = "na"
else:
if isinstance(outcome.transit_time, str) or isinstance(
previous_time_of_occurrence, str
):
transition_period = "not calculated as transition was skipped"
else:
transition_period = (
outcome.transit_time - previous_time_of_occurrence
)
devices_behind = [
f"\t\t\t{b.device} after by {b.behind_by}"
for b in outcome.devices_behind
]
devices_ahead = [
f"\t\t\t{a.device} before by {a.ahead_by}"
for a in outcome.devices_ahead
]
if devices_behind:
devices_behind = reduce(self._agg_strings, devices_behind)
devices_behind = f"\n{devices_behind}"
else:
devices_behind = "none"
if devices_ahead:
devices_ahead = reduce(self._agg_strings, devices_ahead)
devices_ahead = f"\n{devices_ahead}"
else:
devices_ahead = "none"
lines.append(
f"transition: {outcome.transition} (duration: {transition_period})\n"
f"\t\tdevices transitioning afterwards: {devices_behind}\n"
f"\t\tdevices transioning before: {devices_ahead}"
)
if lines:
agg_lines = reduce(self._agg_strings, lines)
else:
agg_lines = ""
return f"outcome for {device}:\n" f"{agg_lines}"
[docs] def tap_in_on_event(self, event: base.EventDataInt):
"""Implement the abstract `tap_in_on_event`.
This will result in a new occurrence to be added with
the event as the input argument.
:param event: The event object that has occurred.
"""
self.add_occurrence(event)
[docs] def add_occurrence(self, event: base.EventDataInt):
"""Add the occurrence of a new event.
:param event: the event, caused by a subscription on a producer
(e.g tango device) and typically called by an handler,
handling that event.
"""
self._occurrences.append(
Occurrence(
helpers.get_date_lodged(event),
datetime.datetime.now(),
helpers.get_attr_value_str(event),
helpers.get_attr_name(event),
helpers.get_device_name(event),
)
)
[docs] def assert_that(self, device: str) -> AssertionOnOccurrence:
"""Generate an assertion check on the occurrences for a particular device.
:param device: the FQDN name for the device being checked.
:returns: an object to specify the assertion in a chain like fashion
"""
return AssertionOnOccurrence(device, self)