Source code for ska_ser_skallop.event_handling.occurrences

"""Module used for checking  and verifying correct occurrence of events."""
import datetime
from functools import reduce
from typing import List, NamedTuple, Tuple, Union

from assertpy import assert_that

from ska_ser_skallop.event_handling import base
from ska_ser_skallop.subscribing import helpers


class Occurrence(NamedTuple):
    """Bundles data related to a single occurrence into a single object."""

    # time stamp obtained from the event itself (``helpers.get_date_lodged``)
    time_of_occurrence: datetime.datetime
    # ``datetime.datetime.now()`` at the moment the occurrence was added
    time_of_recording: datetime.datetime
    # attribute value carried by the event, as a string; this value is what the
    # rest of the module treats as the "transition"
    value: str
    # name of the attribute the event relates to
    attr: str
    # name of the device that produced the event
    device: str
class Ahead(NamedTuple):
    """Bundles data about a device that are transitioning ahead of the main one."""

    # name of the device that transitioned before the main device
    device: str
    # main device's time of occurrence minus this device's time of occurrence
    ahead_by: datetime.timedelta
class Behind(NamedTuple):
    """Bundles data about a device that are transitioning behind that of the main one."""

    # name of the device that transitioned after the main device
    device: str
    # this device's time of occurrence minus the main device's time of occurrence
    behind_by: datetime.timedelta
class Outcome(NamedTuple):
    """Bundles data about an outcome for the relative order of occurrences.

    One outcome describes a single transition as seen from a single device.
    """

    # the device the outcome is reported for
    device: str
    # the transition, i.e. the attribute value carried by the change events
    transition: str
    # devices whose occurrence of the transition came before that of ``device``
    devices_ahead: List[Ahead]
    # devices whose occurrence of the transition came after that of ``device``
    devices_behind: List[Behind]
    # when ``device`` went through the transition, or the text
    # "transition never occurred" when no such occurrence was recorded for it
    transit_time: Union[datetime.datetime, str]
class AssertionOnOccurrence:
    """Intermediate object to manage the assertion of correct occurrences."""

    def __init__(self, device: str, occurrences: "Occurrences") -> None:
        """Initialise the object.

        :param device: The device that will be investigated
        :param occurrences: the given event occurrences that took place
        """
        self.device = device
        self.occurrences = occurrences

    def _outcome_on_transit(self, transit: str) -> Union["Outcome", None]:
        """Look up the outcome of this device for the given transition.

        :param transit: The name of the transition (e.g. the value indicated by
            the change event)
        :returns: the first matching outcome, or None when no outcomes at all are
            available for the device
        :raises IndexError: when outcomes exist for the device but none of them is
            for the given transition
        """
        outcomes = self.occurrences._generate_outcomes_for(self.device)
        if not outcomes:
            # NOTE(review): nothing recorded means the caller's assertion is
            # skipped (passes silently) - confirm that this is intended.
            return None
        for outcome in outcomes:
            if outcome.device == self.device and outcome.transition == transit:
                return outcome
        # Same exception type as the former ``[...][0]`` lookup, but with a
        # message that tells the reader what was missing.
        raise IndexError(
            f"no outcome found for transition {transit!r} on device {self.device!r}"
        )

    def is_behind_all_on_transit(self, transit: str):
        """Assert that events occurred on this device behind all others for a given transition.

        The check passes when no device is listed as transitioning after this one.

        :param transit: The name of the transition (e.g. the value indicated by
            the change event)
        :raises IndexError: when outcomes exist for the device but none of them is
            for the given transition
        """
        outcome = self._outcome_on_transit(transit)
        if outcome is not None:
            assert_that(outcome.devices_behind).is_equal_to([])

    def is_ahead_of_all_on_transit(self, transit: str):
        """Assert that events occurred on this device ahead of all others for a given transition.

        The check passes when no device is listed as transitioning before this one.

        :param transit: The name of the transition (e.g. the value indicated by
            the change event)
        :raises IndexError: when outcomes exist for the device but none of them is
            for the given transition
        """
        outcome = self._outcome_on_transit(transit)
        if outcome is not None:
            assert_that(outcome.devices_ahead).is_equal_to([])
class Occurrences(base.WireTap):
    """Class used to keep track of the occurrences of events.

    It is given a list of values representing expected transitions and then keeps
    track of actual events occurring by means of handler objects making calls on it
    by seeing it as a WireTap object.
    """

    def __init__(self, transits: List[Union[str, Tuple[str, str]]]):
        """Initialise the object.

        TODO evaluate if using a tuple for events is redundant

        :param transits: the expected transits that may occur, this can either be
            a list of simple strings or tuple for which the second value indicates
            if the transit is expected to be behind or ahead of all devices.
        """
        self.transits = transits
        # only the transit names; the optional second tuple item is dropped
        self.expected = [t if isinstance(t, str) else t[0] for t in transits]
        self._occurrences: List[Occurrence] = []
        self._subject_device: Union[str, None] = None

    def _get_devices(self) -> List[str]:
        """Return the distinct device names recorded, in order of first appearance."""
        # dict keys keep insertion order, giving an ordered de-duplication
        return list(dict.fromkeys(item.device for item in self._occurrences))

    def _get_transits(self) -> List[str]:
        """Return the distinct event values recorded, in order of first appearance."""
        return list(dict.fromkeys(item.value for item in self._occurrences))

    def _get_outcomes_for_transit(self, transit: str) -> List[Occurrence]:
        """Return the occurrences of the given transit, sorted by time of occurrence."""
        return sorted(
            (o for o in self._occurrences if o.value == transit),
            key=lambda o: o.time_of_occurrence,
        )

    @staticmethod
    def _get_time_of_occurrence(
        outcomes_for_transit: List[Occurrence], device: str
    ) -> Union[datetime.datetime, str]:
        """Return when the device first appears in the given occurrences.

        :param outcomes_for_transit: occurrences of a single transit
        :param device: the name of the device to look for
        :returns: the time of occurrence of the first entry belonging to the device,
            or the text "transition never occurred" when there is no such entry
        """
        return next(
            (
                o.time_of_occurrence
                for o in outcomes_for_transit
                if o.device == device
            ),
            "transition never occurred",
        )

    @staticmethod
    def _split_around_device(
        outcomes_for_transit: List[Occurrence],
        device: str,
        device_time_of_occurrence: datetime.datetime,
    ) -> Tuple[List[Ahead], List[Behind]]:
        """Split the other devices into those ahead of and those behind the device.

        :param outcomes_for_transit: occurrences of a single transit, ordered by
            time of occurrence
        :param device: the device the others are compared with
        :param device_time_of_occurrence: when that device went through the transit
        :returns: the devices listed before the device and the devices listed after
            it, each with the time difference to the device
        """
        devices_ahead: List[Ahead] = []
        devices_behind: List[Behind] = []
        device_transit_occurred = False
        for occurrence in outcomes_for_transit:
            if occurrence.device == device:
                # everything listed from here on happened after the device
                device_transit_occurred = True
            elif device_transit_occurred:
                devices_behind.append(
                    Behind(
                        occurrence.device,
                        occurrence.time_of_occurrence - device_time_of_occurrence,
                    )
                )
            else:
                devices_ahead.append(
                    Ahead(
                        occurrence.device,
                        device_time_of_occurrence - occurrence.time_of_occurrence,
                    )
                )
        return devices_ahead, devices_behind

    def _generate_outcomes(self) -> List[Outcome]:
        """Generate an outcome for every recorded device and transit combination.

        :returns: the outcomes grouped per device (devices and transits both in
            order of first appearance)
        """
        transits = self._get_transits()
        # The time ordered occurrences of a transit do not depend on the device
        # being looked at, so they are worked out once instead of once per device.
        ordered_per_transit = {
            transit: self._get_outcomes_for_transit(transit) for transit in transits
        }
        outcomes = []
        for device in self._get_devices():
            for transit in transits:
                outcomes_for_transit = ordered_per_transit[transit]
                device_time_of_occurrence = self._get_time_of_occurrence(
                    outcomes_for_transit, device
                )
                devices_ahead: List[Ahead] = []
                devices_behind: List[Behind] = []
                # a string means the device never went through this transit, in
                # which case there is nothing to compare the other devices with
                if not isinstance(device_time_of_occurrence, str):
                    devices_ahead, devices_behind = self._split_around_device(
                        outcomes_for_transit, device, device_time_of_occurrence
                    )
                outcomes.append(
                    Outcome(
                        device,
                        transit,
                        devices_ahead,
                        devices_behind,
                        device_time_of_occurrence,
                    )
                )
        return outcomes

    def set_subject_device(self, device: str):
        """Specify the main device or subject of investigating relative occurrences.

        :param device: the name of the device as FQDN string.
        """
        self._subject_device = device

    @property
    def subject_device(self) -> str:
        """Get the main device or subject of investigating relative occurrences.

        When no subject device was set, the first device for which an occurrence
        was recorded is used.

        :returns: the name of the device as FQDN string.
        :raises IndexError: when no subject device was set and no occurrences have
            been recorded yet
        """
        if self._subject_device is not None:
            return self._subject_device
        devices = self._get_devices()
        if not devices:
            raise IndexError(
                "no subject device has been set and no occurrences were recorded"
            )
        return devices[0]

    @staticmethod
    def _agg_strings(x_arg, y_arg) -> str:
        """Join the two arguments with a newline in between."""
        return f"{x_arg}\n{y_arg}"

    def _generate_outcomes_for(self, device: str) -> List[Outcome]:
        """Return only the generated outcomes that belong to the given device."""
        return [o for o in self._generate_outcomes() if o.device == device]

    def print_outcome_for(self, device: str) -> str:
        """Describe in human readable form the events occurring relative to the given device.

        Nothing is written to stdout; the text is returned to the caller. The
        duration reported for a transition is the time elapsed since the previous
        transition listed for the device.

        :param device: the FQDN name of the device to be printing an outcome for
        :return: A string containing the text documenting the results.
        """
        lines = []
        previous_time_of_occurrence: Union[None, datetime.datetime, str] = None
        for outcome in self._generate_outcomes_for(device):
            transition_period: Union[str, datetime.timedelta]
            if previous_time_of_occurrence is None:
                # first transition listed: there is nothing to measure against
                transition_period = "na"
            elif isinstance(outcome.transit_time, str) or isinstance(
                previous_time_of_occurrence, str
            ):
                transition_period = "not calculated as transition was skipped"
            else:
                transition_period = outcome.transit_time - previous_time_of_occurrence
            # Move the reference point along on every iteration so that each
            # duration is relative to the preceding transition and not to the
            # very first one.
            previous_time_of_occurrence = outcome.transit_time

            devices_behind = "none"
            if outcome.devices_behind:
                devices_behind = "\n" + "\n".join(
                    f"\t\t\t{b.device} after by {b.behind_by}"
                    for b in outcome.devices_behind
                )
            devices_ahead = "none"
            if outcome.devices_ahead:
                devices_ahead = "\n" + "\n".join(
                    f"\t\t\t{a.device} before by {a.ahead_by}"
                    for a in outcome.devices_ahead
                )
            # "transioning" (sic): the spelling is part of the emitted text and is
            # deliberately left as it was.
            lines.append(
                f"transition: {outcome.transition} (duration: {transition_period})\n"
                f"\t\tdevices transitioning afterwards: {devices_behind}\n"
                f"\t\tdevices transioning before: {devices_ahead}"
            )
        agg_lines = "\n".join(lines)
        return f"outcome for {device}:\n{agg_lines}"

    def tap_in_on_event(self, event: base.EventDataInt):
        """Implement the abstract `tap_in_on_event`.

        This will result in a new occurrence to be added with the event as the
        input argument.

        :param event: The event object that has occurred.
        """
        self.add_occurrence(event)

    def add_occurrence(self, event: base.EventDataInt):
        """Add the occurrence of a new event.

        :param event: the event, caused by a subscription on a producer (e.g tango
            device) and typically called by an handler, handling that event.
        """
        self._occurrences.append(
            Occurrence(
                helpers.get_date_lodged(event),
                # time of recording, as opposed to the time carried by the event
                datetime.datetime.now(),
                helpers.get_attr_value_str(event),
                helpers.get_attr_name(event),
                helpers.get_device_name(event),
            )
        )

    def assert_that(self, device: str) -> AssertionOnOccurrence:
        """Generate an assertion check on the occurrences for a particular device.

        :param device: the FQDN name for the device being checked.
        :returns: an object to specify the assertion in a chain like fashion
        """
        return AssertionOnOccurrence(device, self)