IP Block Emulators

Overview

The bitstream emulator functions by pulling in several sub-emulators for individual IP blocks, called IP block emulators. These are located in the same repositories as their respective IP block implementations, in the /emulator directory.

Required Structure

  • Within a full bitstream package, the path to each IP block emulator looks like: emulators/<ip_block_type>/emulator (for example: emulators/ftile_ethernet/emulator).

  • Each of these folders must contain the following component files:

    • __init__.py - Lets Python know this folder is part of a package.

    • ip_block.py - Contains a class simulating basic functionality or values of this IP block for emulator use.

    • state_machine.py - Contains a state machine definition for this IP block emulator, including states, transitions, condition functions, etc.

    • api.py - Contains the API implementation for this IP block emulator.

    • event_handler.py - Contains an event handler class with methods to handle various types of incoming events to this IP block emulator.

  • Other files may be included as necessary.

Note

To import relative files within an IP block emulator, you should use the relative import syntax from .<component> import <object>, for example:

# wideband_frequency_shifter/api.py
from .state_machine import WidebandFrequencyShifterState

These imports will be resolved correctly by the emulator engine and should work for local testing as well, so long as the test script(s) are external to the IP block emulator package(s).

However, the following syntax will work, but is currently not convention as it requires the assumption that the emulator folder is placed on the python module path:

# wideband_frequency_shifter/api.py
from wideband_frequency_shifter.state_machine import WidebandFrequencyShifterState

Simulated IP Block (ip_block.py)

  • Must define a class called EmulatorIPBlock, which acts as a basic simulation of the real IP block, containing for example constants and/or variables which can be manipulated, helper functions, etcetera. The implementation of this class is generally unrestricted except that the __init__ method must be defined and take no arguments except self (as no arguments will be supplied on instantiation). For example:

    class EmulatorIPBlock():
        """IP block simulation for the B123VCC-OSPPFB Channelizer."""
    
        def __init__(self: Self) -> None:
            # const defaults
            self.num_channels = 10
            self.num_polarisations = 2
    
            # variables
            self.sample_rate: int = 0
            self.expected_sample_rate: int = 3_960_000_000
            self.gains: list[dict[str, Any]] = [{
                'pol': p,
                'channel': c,
                'gain': 1.0
            } for pol in range(self.num_polarisations) for c, p in (enumerate([pol] * self.num_channels))]
    

State Machine (state_machine.py)

  • Must define a set of states for this IP block, as a subclass of the BaseState (ska_mid_cbf_emulators.common.BaseState) string enum. The name of this class may be arbitrary. Enum values must be strings (UPPER_SNAKE_CASE recommended). Please ensure this class resides in state_machine.py and not in a separate file. For example:

    class FooState(BaseState):
        """Enum containing possible states for the Foo emulator.
    
        Implements :obj:`BaseState`.
        """
    
        ACTIVE = 'ACTIVE'
        """The Foo is active."""
    
        INACTIVE = 'INACTIVE'
        """The Foo is inactive."""
    
        FAULT = 'FAULT'
        """The Foo has experienced a critical fault."""
    
  • Must define a set of transition “trigger” names for this IP block, as a subclass of the BaseTransitionTrigger (ska_mid_cbf_emulators.common.BaseTransitionTrigger) string enum. The name of this class may be arbitrary. Enum values must be strings (lower_snake_case recommended, e.g. using the enum.auto() function). Please ensure this class resides in state_machine.py and not in a separate file. For example:

    from enum import auto
    
    class FooTransitionTrigger(BaseTransitionTrigger):
        """Enum containing transitions for the Foo emulator.
    
        Implements :obj:`BaseTransitionTrigger`.
        """
    
        RECEIVE_PULSE = auto()
        """The Foo receives a new pulse."""
    
        UPDATE_COUNT = auto()
        """The Foo's count changes."""
    
        CRITICAL_FAULT = auto()
        """The Foo experiences a critical fault."""
    
  • Must define a class called EmulatorStateMachine which implements the base class FiniteStateMachine (ska_mid_cbf_emulators.common.FiniteStateMachine). This class has 3 properties that must be overridden:

    • The _states property returns the list of states used by this state machine. It must be a subset (likely the full set) of the previously defined state enum. For example:

      @override
      @property
      def _states(self: Self) -> list[FooState]:
          return [
              FooState.ACTIVE,
              FooState.INACTIVE,
              FooState.FAULT
          ]
      
    • The _initial_state property returns the starting state of the state machine. For example:

      @override
      @property
      def _initial_state(self: Self) -> FooState:
          return FooState.INACTIVE
      
    • The _transitions property returns the list of possible transitions in this state machine. Each entry must be a dictionary with the following entries:

      • 'source': The source state(s) for this transition. It may be a single state or a list of states. It must only include states defined in the _states property, or be a special state (see below).

      • 'dest': The destination state for this transition. It must be a single state. It must be a state defined in the _states property, or a special state (see below).

      • 'trigger': The transition trigger which will execute this transition.

      • 'conditions' (OPTIONAL): A condition or list of conditions which determine whether this transition is permitted to execute. If specified, the transition will only occur if all conditions return True. Each condition should be a TransitionCondition (ska_mid_cbf_emulators.common.TransitionCondition) instance, which takes in a display name and a condition function. The display name will be used for labeling generated diagrams. The condition function can either be a class method or a lambda, so long as it returns a boolean value. It may take both arguments and keyword arguments, which should then be provided when calling a trigger. Note that all condition functions for a single transition will receive the same set of arguments passed from the trigger when called.

      There also exist two special states exclusively for transition definitions, in addition to any user-defined states, which are as follows:

      • RoutingState.FROM_ANY: Indicates that this transition may occur from any state in the state machine. This may only be used as a source state, and must be the only source state if used.

      • RoutingState.TO_SAME: Indicates a reflexive transition, i.e. that any state listed as a source should remain in the same state when this transition executes. This may only be used as a destination state.

      For example:

      def is_count_nonzero(self, count=None, **kwargs):
          return count is not None and count > 0
      
      @override
      @property
      def _transitions(self) -> list[dict[str, Any]]:
          return [
              {
                  'source': FooState.INACTIVE,
                  'dest': FooState.ACTIVE,
                  'trigger': FooTransitionTrigger.UPDATE_COUNT,
                  'conditions': TransitionCondition(
                      'count > 0',
                      self.is_count_nonzero
                  )
              },
              {
                  'source': FooState.ACTIVE,
                  'dest': FooState.INACTIVE,
                  'trigger': FooTransitionTrigger.UPDATE_COUNT,
                  'conditions': TransitionCondition(
                      'count = 0',
                      lambda count=None, **kwargs: count is not None and count == 0
                  )
              },
              {
                  'source': RoutingState.FROM_ANY,
                  'dest': RoutingState.TO_SAME,
                  'trigger': FooTransitionTrigger.RECEIVE_PULSE
              },
              {
                  'source': RoutingState.FROM_ANY,
                  'dest': FooState.FAULT,
                  'trigger': FooTransitionTrigger.CRITICAL_FAULT
              }
          ]
      

With these definitions in place, the EmulatorSubcontroller (ska_mid_cbf_emulators.common.EmulatorSubcontroller) class defines the following methods which can be leveraged in an API call or event handler, which have access to the IP block emulator’s subcontroller:

EmulatorSubcontroller.get_state(self: Self) BaseState

Returns the current state of the IP block emulator.

EmulatorSubcontroller.trigger(self: Self, trigger_name: BaseTransitionTrigger, *args, **kwargs) None

Triggers the transition with name trigger_name. Passes down any additional provided arguments to the transition’s condition functions.

EmulatorSubcontroller.may_trigger(self: Self, trigger_name: BaseTransitionTrigger, *args, **kwargs) bool

Checks whether the specified transition is currently possible (i.e. is allowed from the current state, and all conditions pass). Returns True if possible, and False otherwise.

EmulatorSubcontroller.trigger_if_allowed(self: Self, trigger_name: BaseTransitionTrigger, *args, **kwargs) bool

A combination of the above two methods. First checks if the transition is allowed, and then executes it if so. Returns True if the check passes and the transition executes successfully. Otherwise, returns False.

API (api.py)

  • Must define a class called EmulatorApi which implements the base class BaseEmulatorApi (ska_mid_cbf_emulators.common.BaseEmulatorApi).

  • Methods of this class are either API route definitions or private internal methods.

    • A route definition is indicated by the @BaseEmulatorApi.route (ska_mid_cbf_emulators.common.BaseEmulatorApi.route()) decorator. It takes the HTTP method of the route as its sole argument. The generated route will always have the same name as the method defining it.

    • To specify route parameters, use the PathParam[T], QueryParam[T], or BodyParam[T] generic type annotations. For example, the route host:8000/api/insert_data/{target_key}?index={index}, which expects a body, may look like:

      @BaseEmulatorApi.route(http_method=HttpMethod.POST)
      def insert_data(
          target_key: PathParam[str],
          index: QueryParam[int],
          data: BodyParam[dict]
      ) -> InternalRestResponse:
          self.targets[target_key][index] = data
          return InternalRestResponse.ok()
      
    • These parameters can be used within the function identically as if they were of type T; the aliases simply provide additional type metadata to the emulator.

    • Note that only a single path parameter and body are supported (per route). The number of query parameters is unrestricted.

    • Any method that is not decorated with @BaseEmulatorApi.route is considered a private internal method. They can be used within route definition methods as helper methods, but routes and metadata will not be generated for them.

    • All methods have access to self.subcontroller: EmulatorSubcontroller (for state manipulation; see the state machine section above) and self.ip_block: EmulatorIPBlock (for data manipulation).

    • The BaseEmulatorApi base class also exposes the basic logger methods self.log_<level>(msg); see: Logging.

    • Note that all methods, regardless of decorator, can be used as functions normally. If doing this, however, due to certain implementation details, self must be explicitly passed as an argument to @route-decorated methods. So for example, in a test script, for the above method, you could call

      some_api.insert_data(some_api, target_key='some_target', index=2, data={'some_key': 'some_value'})
      

      and it would return the correct InternalRestResponse (ska_mid_cbf_emulators.common.InternalRestResponse) object just as it would to the API caller.

Event Handler (event_handler.py)

  • Must define a class called EmulatorEventHandler which implements the base class BaseEventHandler (ska_mid_cbf_emulators.common.BaseEventHandler).

  • This class may override any or all of the following three methods (additional helper methods are acceptable and ignored by validation):

EmulatorEventHandler.handle_pulse_event(self: Self, event: PulseEvent, **kwargs) None

This method will be used to process all incoming pulse events. It is run once per pulse. Most blocks should not need to implement this. The method has no return value (anything returned is discarded). Does nothing of note if not overridden.

EmulatorEventHandler.handle_signal_update_events(self: Self, event_list: SignalUpdateEventList, **kwargs) SignalUpdateEventList

This method will be used to process all incoming signal update events. It is run once per pulse, and takes in a list of signal update events associated with that pulse. It is responsible for adding, removing or updating signal values in some or all of the events in the list. The method should return a list of signal update events (can be the same list, a mutated list, or a new list); whatever list is returned by this method will automatically be sent down to the next block(s) by the engine. If not overridden, simply returns the original list.

EmulatorEventHandler.handle_manual_event(self: Self, event: ManualEvent, **kwargs) None | list[ManualEvent]

This method will be used to process all incoming manual events. It is run once per manual event (possibly many times per pulse). Most blocks will probably have at minimum an implementation for this method. Pretty much any functionality desired can go in this method. It may have no return value, or it may optionally return a list of new or updated manual events; if it does, those returned events will automatically be sent downstream to the next block(s) for processing, similarly to the way signal updates work. (If it has no return value, or returns None, no further processing will occur.) This method does nothing of note if not overridden.

  • The subcontroller (self.subcontroller) and emulator IP block (self.ip_block) are both exposed by the base class and their methods and properties can be used at will.

  • Example:

    class EmulatorEventHandler(BaseEventHandler):
    
    @override
    def handle_pulse_event(self: Self, event: PulseEvent, **kwargs) -> None:
        """Handle an incoming pulse event.
    
        Args:
            event (:obj:`PulseEvent`): The event to handle.
            **kwargs: Arbitrary keyword arguments.
        """
        self.log_trace(f'Foo Pulse event handler called for {event}')
    
        self.subcontroller.trigger_if_allowed(
            FooTransitionTrigger.RECEIVE_PULSE,
            packet_rate=getattr(self, 'packet_rate', 0)
        )
    
    @override
    def handle_signal_update_events(self: Self, event_list: SignalUpdateEventList, **kwargs) -> SignalUpdateEventList:
        """Handle an incoming Signal Update event list.
    
        Args:
            event_list (:obj:`SignalUpdateEventList`): The signal update event list to handle.
            **kwargs: Arbitrary keyword arguments.
    
        Returns:
            :obj:`SignalUpdateEventList` The signal update event list to send to the next block.
        """
        self.log_trace(f'Foo Signal Update event handler called for {event_list}')
    
        if len(event_list) > 1:
            event_list.events = event_list.events[:1]
    
        return event_list
    
    @override
    def handle_manual_event(self: Self, event: ManualEvent, **kwargs) -> None | list[ManualEvent]:
        """Handle an incoming manual event.
    
        Args:
            event (:obj:`ManualEvent`): The manual event to handle.
            **kwargs: Arbitrary keyword arguments.
    
        Returns:
            :obj:`None | list[ManualEvent]` Optionally, a list of one or more new manual events \
                to automatically forward downstream.
        """
        self.log_trace(f'Foo manual event handler called for {event}')
    
        match event.subtype:
    
            case ManualEventSubType.GENERAL:
                self.log_debug(f'{event.subtype} implementation TBD')
    
            case ManualEventSubType.INJECTION:
                if event.value.get('injection_type') == 'update_count':
                    self.subcontroller.trigger_if_allowed(
                        FooTransitionTrigger.UPDATE_COUNT,
                        count=int(event.value.get('count'))
                    )
    
                elif event.severity == EventSeverity.FATAL_ERROR:
                    self.subcontroller.trigger_if_allowed(
                        FooTransitionTrigger.CRITICAL_FAULT
                    )
    
            case _:
                self.log_debug(f'Unhandled event type {event.subtype}')
    

Validation

Any set of IP block emulators may be validated using the helper script validate_bitstream_files.py. Specify a parent folder name as the argument and the script will validate any IP block emulators it finds within that folder, or additionally provide -i <ip_block> to check specific ones within that folder. This script uses the same validation functionality as the full bitstream emulator, so if validation passes here, it should pass in the real thing too. See the script file or run poetry run python validate_bitstream_files.py --help for more details.

Miscellaneous Features

Forced Signal Updates

As part of the subcontroller, it is possible to force a signal update from an IP block emulator using the force_signal_update (ska_mid_cbf_emulators.common.EmulatorSubcontroller.force_signal_update()) method. This causes the IP block in question to re-run its Signal Update Event handler, based on the last known signal from the Signal Generator, and flows the newly generated events downstream. For example:

def some_api_call(self, something):
  self.ip_block.value_that_affects_the_signal = some_new_value
  self.subcontroller.force_signal_update()

Logging

The API and Event Handler base classes both extend the LoggingBase (ska_mid_cbf_emulators.common.LoggingBase) class, which provides the following basic logger methods:

  • self.log_critical(msg): Log a CRITICAL message. These should be used very sparingly and for matters requiring both immediate attention and action.

  • self.log_error(msg): Log an ERROR message. These should be used for errors and/or situations which should garner the rapid attention of a user/operator.

  • self.log_warning(msg): Log a WARNING message. These are messages that should be noted by the user, but may not necessarily require immediate attention.

  • self.log_info(msg): Log an INFO message. These should provide general neutral information that may be useful to an average user.

  • self.log_debug(msg): Log a DEBUG message. These should provide information that may be useful to an advanced user, sysop, or developer.

  • self.log_trace(msg): Log a TRACE message. These should provide a reference to a specific area of the code and are intended only for developers to locate potential issues.

Delayed Actions

In some scenarios, e.g. in some API calls, we may want to execute a sequence of actions on a time delay. By default, each API call to an IP block emulator hogs the API thread for that IP block emulator. One possible workaround is to use the delay_action (ska_mid_cbf_emulators.common.delay_action()) function provided in ska_mid_cbf_emulators.common package:

from ska_mid_cbf_emulators.common import delay_action

def my_handler(self, something):
    self.value = something

    # Execute `self.subcontroller.trigger(SomeTransitionTrigger.START)` after 500ms
    delay_action(500, self.subcontroller.trigger(SomeTransitionTrigger.START))

    # Execute `print(self.ip_block.some_value_to_print)` after 1500ms
    delay_action(1500, print, self.ip_block.some_value_to_print)

The function signature looks like:

delay_action(delay_ms: int, action_fn: Callable, *fn_args, **fn_kwargs) Timer
Parameters:
  • delay_ms – The amount of time to delay the action by, in milliseconds.

  • action_fn – The action to execute after the specified delay.

  • *fn_args – Arbitrary positional arguments to pass to action_fn when executed.

  • **fn_kwargs – Arbitrary keyword arguments to pass to action_fn when executed.

Returns:
  • (threading.Timer) The created Timer object, in case any further manipulation is required.

Notes:

  • The delay_action function simply wraps Python’s built-in threading.Timer thread object, thus each delayed action is run in its own sub-thread. The delay_action function returns as soon as the Timer is started, making it effectively asynchronous.

  • The delay time is relative to the calling thread. In the example above, this means that the second action will be performed (roughly) 1000ms after the first action, not 1500ms.

  • Attempting to join (threading.Thread.join()) a delayed action will have the same effect as calling time.sleep() on the calling thread. Delayed actions should only be used if the calling thread does not rely on their completion.

  • Raising (or encountering) an exception inside the action function will log the error message via the emulator logger and kill the thread without impacting other functionality.

  • It is strongly advised to return 202 Accepted (ska_mid_cbf_emulators.common.InternalRestResponse.accepted()) instead of 200 OK for API functions using delayed actions.

  • You may of course also just spawn a regular Python thread to perform an asynchronous task, if you don’t need a time delay.

  • Delayed actions may be chained recursively. For example, the following is perfectly valid:

from ska_mid_cbf_emulators.common import delay_action

def my_handler(self, something):
    self.value = something
    def action_to_delay():
        self.subcontroller.trigger(MacTransitionTrigger.START)

        # Execute `print(self.ip_block.some_value_to_print)`
        # 5000ms after *this* delay_action call
        # i.e. ~15000ms after the original delay_action call
        delay_action(5000, lambda: print(self.ip_block.some_value_to_print))

    # Execute `self.subcontroller.trigger(SomeTransitionTrigger.START)`
    # (within `action_to_delay`) after 10000ms
    delay_action(10000, action_to_delay)