IP Block Emulators
Overview
The bitstream emulator functions by pulling in several sub-emulators for individual IP blocks, called IP block emulators.
These are located in the same repositories as their respective IP block implementations, in the /emulator directory.
Required Structure
Within a full bitstream package, the path to each IP block emulator looks like:
emulators/<ip_block_type>/emulator(for example:emulators/ftile_ethernet/emulator).Each of these folders must contain the following component files:
__init__.py- Lets Python know this folder is part of a package.ip_block.py- Contains a class simulating basic functionality or values of this IP block for emulator use.state_machine.py- Contains a state machine definition for this IP block emulator, including states, transitions, condition functions, etc.api.py- Contains the API implementation for this IP block emulator.event_handler.py- Contains an event handler class with methods to handle various types of incoming events to this IP block emulator.
Other files may be included as necessary.
Note
To import relative files within an IP block emulator,
you should use the relative import syntax from .<component> import <object>, for example:
# wideband_frequency_shifter/api.py from .state_machine import WidebandFrequencyShifterStateThese imports will be resolved correctly by the emulator engine and should work for local testing as well, so long as the test script(s) are external to the IP block emulator package(s).
However, the following syntax will work, but is currently not convention as it requires the assumption that the emulator folder is placed on the python module path:
# wideband_frequency_shifter/api.py from wideband_frequency_shifter.state_machine import WidebandFrequencyShifterState
Simulated IP Block (ip_block.py)
Must define a class called
EmulatorIPBlock, which acts as a basic simulation of the real IP block, containing for example constants and/or variables which can be manipulated, helper functions, etcetera. The implementation of this class is generally unrestricted except that the__init__method must be defined and take no arguments exceptself(as no arguments will be supplied on instantiation). For example:class EmulatorIPBlock(): """IP block simulation for the B123VCC-OSPPFB Channelizer.""" def __init__(self: Self) -> None: # const defaults self.num_channels = 10 self.num_polarisations = 2 # variables self.sample_rate: int = 0 self.expected_sample_rate: int = 3_960_000_000 self.gains: list[dict[str, Any]] = [{ 'pol': p, 'channel': c, 'gain': 1.0 } for pol in range(self.num_polarisations) for c, p in (enumerate([pol] * self.num_channels))]
State Machine (state_machine.py)
Must define a set of states for this IP block, as a subclass of the
BaseState(ska_mid_cbf_emulators.common.BaseState) string enum. The name of this class may be arbitrary. Enum values must be strings (UPPER_SNAKE_CASErecommended). Please ensure this class resides instate_machine.pyand not in a separate file. For example:class FooState(BaseState): """Enum containing possible states for the Foo emulator. Implements :obj:`BaseState`. """ ACTIVE = 'ACTIVE' """The Foo is active.""" INACTIVE = 'INACTIVE' """The Foo is inactive.""" FAULT = 'FAULT' """The Foo has experienced a critical fault."""
Must define a set of transition “trigger” names for this IP block, as a subclass of the
BaseTransitionTrigger(ska_mid_cbf_emulators.common.BaseTransitionTrigger) string enum. The name of this class may be arbitrary. Enum values must be strings (lower_snake_caserecommended, e.g. using theenum.auto()function). Please ensure this class resides instate_machine.pyand not in a separate file. For example:from enum import auto class FooTransitionTrigger(BaseTransitionTrigger): """Enum containing transitions for the Foo emulator. Implements :obj:`BaseTransitionTrigger`. """ RECEIVE_PULSE = auto() """The Foo receives a new pulse.""" UPDATE_COUNT = auto() """The Foo's count changes.""" CRITICAL_FAULT = auto() """The Foo experiences a critical fault."""
Must define a class called
EmulatorStateMachinewhich implements the base classFiniteStateMachine(ska_mid_cbf_emulators.common.FiniteStateMachine). This class has 3 properties that must be overridden:The
_statesproperty returns the list of states used by this state machine. It must be a subset (likely the full set) of the previously defined state enum. For example:@override @property def _states(self: Self) -> list[FooState]: return [ FooState.ACTIVE, FooState.INACTIVE, FooState.FAULT ]
The
_initial_stateproperty returns the starting state of the state machine. For example:@override @property def _initial_state(self: Self) -> FooState: return FooState.INACTIVE
The
_transitionsproperty returns the list of possible transitions in this state machine. Each entry must be a dictionary with the following entries:'source': The source state(s) for this transition. It may be a single state or a list of states. It must only include states defined in the_statesproperty, or be a special state (see below).'dest': The destination state for this transition. It must be a single state. It must be a state defined in the_statesproperty, or a special state (see below).'trigger': The transition trigger which will execute this transition.'conditions'(OPTIONAL): A condition or list of conditions which determine whether this transition is permitted to execute. If specified, the transition will only occur if all conditions returnTrue. Each condition should be aTransitionCondition(ska_mid_cbf_emulators.common.TransitionCondition) instance, which takes in a display name and a condition function. The display name will be used for labeling generated diagrams. The condition function can either be a class method or a lambda, so long as it returns a boolean value. It may take both arguments and keyword arguments, which should then be provided when calling a trigger. Note that all condition functions for a single transition will receive the same set of arguments passed from the trigger when called.
There also exist two special states exclusively for transition definitions, in addition to any user-defined states, which are as follows:
RoutingState.FROM_ANY: Indicates that this transition may occur from any state in the state machine. This may only be used as a source state, and must be the only source state if used.RoutingState.TO_SAME: Indicates a reflexive transition, i.e. that any state listed as a source should remain in the same state when this transition executes. This may only be used as a destination state.
For example:
def is_count_nonzero(self, count=None, **kwargs): return count is not None and count > 0 @override @property def _transitions(self) -> list[dict[str, Any]]: return [ { 'source': FooState.INACTIVE, 'dest': FooState.ACTIVE, 'trigger': FooTransitionTrigger.UPDATE_COUNT, 'conditions': TransitionCondition( 'count > 0', self.is_count_nonzero ) }, { 'source': FooState.ACTIVE, 'dest': FooState.INACTIVE, 'trigger': FooTransitionTrigger.UPDATE_COUNT, 'conditions': TransitionCondition( 'count = 0', lambda count=None, **kwargs: count is not None and count == 0 ) }, { 'source': RoutingState.FROM_ANY, 'dest': RoutingState.TO_SAME, 'trigger': FooTransitionTrigger.RECEIVE_PULSE }, { 'source': RoutingState.FROM_ANY, 'dest': FooState.FAULT, 'trigger': FooTransitionTrigger.CRITICAL_FAULT } ]
With these definitions in place, the EmulatorSubcontroller (ska_mid_cbf_emulators.common.EmulatorSubcontroller) class defines the following methods
which can be leveraged in an API call or event handler, which have access to the IP block emulator’s subcontroller:
- EmulatorSubcontroller.get_state(self: Self) BaseState
Returns the current state of the IP block emulator.
- EmulatorSubcontroller.trigger(self: Self, trigger_name: BaseTransitionTrigger, *args, **kwargs) None
Triggers the transition with name
trigger_name. Passes down any additional provided arguments to the transition’s condition functions.
- EmulatorSubcontroller.may_trigger(self: Self, trigger_name: BaseTransitionTrigger, *args, **kwargs) bool
Checks whether the specified transition is currently possible (i.e. is allowed from the current state, and all conditions pass). Returns
Trueif possible, andFalseotherwise.
- EmulatorSubcontroller.trigger_if_allowed(self: Self, trigger_name: BaseTransitionTrigger, *args, **kwargs) bool
A combination of the above two methods. First checks if the transition is allowed, and then executes it if so. Returns
Trueif the check passes and the transition executes successfully. Otherwise, returnsFalse.
API (api.py)
Must define a class called
EmulatorApiwhich implements the base classBaseEmulatorApi(ska_mid_cbf_emulators.common.BaseEmulatorApi).Methods of this class are either API route definitions or private internal methods.
A route definition is indicated by the
@BaseEmulatorApi.route(ska_mid_cbf_emulators.common.BaseEmulatorApi.route()) decorator. It takes the HTTP method of the route as its sole argument. The generated route will always have the same name as the method defining it.To specify route parameters, use the
PathParam[T],QueryParam[T], orBodyParam[T]generic type annotations. For example, the routehost:8000/api/insert_data/{target_key}?index={index}, which expects a body, may look like:@BaseEmulatorApi.route(http_method=HttpMethod.POST) def insert_data( target_key: PathParam[str], index: QueryParam[int], data: BodyParam[dict] ) -> InternalRestResponse: self.targets[target_key][index] = data return InternalRestResponse.ok()
These parameters can be used within the function identically as if they were of type
T; the aliases simply provide additional type metadata to the emulator.Note that only a single path parameter and body are supported (per route). The number of query parameters is unrestricted.
Any method that is not decorated with
@BaseEmulatorApi.routeis considered a private internal method. They can be used within route definition methods as helper methods, but routes and metadata will not be generated for them.All methods have access to
self.subcontroller: EmulatorSubcontroller(for state manipulation; see the state machine section above) andself.ip_block: EmulatorIPBlock(for data manipulation).The
BaseEmulatorApibase class also exposes the basic logger methodsself.log_<level>(msg); see: Logging.Note that all methods, regardless of decorator, can be used as functions normally. If doing this, however, due to certain implementation details,
selfmust be explicitly passed as an argument to@route-decorated methods. So for example, in a test script, for the above method, you could callsome_api.insert_data(some_api, target_key='some_target', index=2, data={'some_key': 'some_value'})
and it would return the correct
InternalRestResponse(ska_mid_cbf_emulators.common.InternalRestResponse) object just as it would to the API caller.
Event Handler (event_handler.py)
Must define a class called
EmulatorEventHandlerwhich implements the base classBaseEventHandler(ska_mid_cbf_emulators.common.BaseEventHandler).This class may override any or all of the following three methods (additional helper methods are acceptable and ignored by validation):
- EmulatorEventHandler.handle_pulse_event(self: Self, event: PulseEvent, **kwargs) None
This method will be used to process all incoming pulse events. It is run once per pulse. Most blocks should not need to implement this. The method has no return value (anything returned is discarded). Does nothing of note if not overridden.
- EmulatorEventHandler.handle_signal_update_events(self: Self, event_list: SignalUpdateEventList, **kwargs) SignalUpdateEventList
This method will be used to process all incoming signal update events. It is run once per pulse, and takes in a list of signal update events associated with that pulse. It is responsible for adding, removing or updating signal values in some or all of the events in the list. The method should return a list of signal update events (can be the same list, a mutated list, or a new list); whatever list is returned by this method will automatically be sent down to the next block(s) by the engine. If not overridden, simply returns the original list.
- EmulatorEventHandler.handle_manual_event(self: Self, event: ManualEvent, **kwargs) None | list[ManualEvent]
This method will be used to process all incoming manual events. It is run once per manual event (possibly many times per pulse). Most blocks will probably have at minimum an implementation for this method. Pretty much any functionality desired can go in this method. It may have no return value, or it may optionally return a list of new or updated manual events; if it does, those returned events will automatically be sent downstream to the next block(s) for processing, similarly to the way signal updates work. (If it has no return value, or returns None, no further processing will occur.) This method does nothing of note if not overridden.
The subcontroller (
self.subcontroller) and emulator IP block (self.ip_block) are both exposed by the base class and their methods and properties can be used at will.Example:
class EmulatorEventHandler(BaseEventHandler): @override def handle_pulse_event(self: Self, event: PulseEvent, **kwargs) -> None: """Handle an incoming pulse event. Args: event (:obj:`PulseEvent`): The event to handle. **kwargs: Arbitrary keyword arguments. """ self.log_trace(f'Foo Pulse event handler called for {event}') self.subcontroller.trigger_if_allowed( FooTransitionTrigger.RECEIVE_PULSE, packet_rate=getattr(self, 'packet_rate', 0) ) @override def handle_signal_update_events(self: Self, event_list: SignalUpdateEventList, **kwargs) -> SignalUpdateEventList: """Handle an incoming Signal Update event list. Args: event_list (:obj:`SignalUpdateEventList`): The signal update event list to handle. **kwargs: Arbitrary keyword arguments. Returns: :obj:`SignalUpdateEventList` The signal update event list to send to the next block. """ self.log_trace(f'Foo Signal Update event handler called for {event_list}') if len(event_list) > 1: event_list.events = event_list.events[:1] return event_list @override def handle_manual_event(self: Self, event: ManualEvent, **kwargs) -> None | list[ManualEvent]: """Handle an incoming manual event. Args: event (:obj:`ManualEvent`): The manual event to handle. **kwargs: Arbitrary keyword arguments. Returns: :obj:`None | list[ManualEvent]` Optionally, a list of one or more new manual events \ to automatically forward downstream. """ self.log_trace(f'Foo manual event handler called for {event}') match event.subtype: case ManualEventSubType.GENERAL: self.log_debug(f'{event.subtype} implementation TBD') case ManualEventSubType.INJECTION: if event.value.get('injection_type') == 'update_count': self.subcontroller.trigger_if_allowed( FooTransitionTrigger.UPDATE_COUNT, count=int(event.value.get('count')) ) elif event.severity == EventSeverity.FATAL_ERROR: self.subcontroller.trigger_if_allowed( FooTransitionTrigger.CRITICAL_FAULT ) case _: self.log_debug(f'Unhandled event type {event.subtype}')
Validation
Any set of IP block emulators may be validated using the helper script
validate_bitstream_files.py.
Specify a parent folder name as the argument and the script will validate any IP block emulators
it finds within that folder, or additionally provide -i <ip_block> to check specific ones within that folder.
This script uses the same validation functionality as the full bitstream emulator,
so if validation passes here, it should pass in the real thing too.
See the script file or run poetry run python validate_bitstream_files.py --help for more details.
Miscellaneous Features
Forced Signal Updates
As part of the subcontroller, it is possible to force a signal update from an IP block emulator using the
force_signal_update (ska_mid_cbf_emulators.common.EmulatorSubcontroller.force_signal_update()) method.
This causes the IP block in question to re-run its Signal Update Event handler, based on the last known signal from the Signal Generator,
and flows the newly generated events downstream. For example:
def some_api_call(self, something):
self.ip_block.value_that_affects_the_signal = some_new_value
self.subcontroller.force_signal_update()
Logging
The API and Event Handler base classes both extend
the LoggingBase (ska_mid_cbf_emulators.common.LoggingBase) class, which provides the following basic logger methods:
self.log_critical(msg): Log a CRITICAL message. These should be used very sparingly and for matters requiring both immediate attention and action.
self.log_error(msg): Log an ERROR message. These should be used for errors and/or situations which should garner the rapid attention of a user/operator.
self.log_warning(msg): Log a WARNING message. These are messages that should be noted by the user, but may not necessarily require immediate attention.
self.log_info(msg): Log an INFO message. These should provide general neutral information that may be useful to an average user.
self.log_debug(msg): Log a DEBUG message. These should provide information that may be useful to an advanced user, sysop, or developer.
self.log_trace(msg): Log a TRACE message. These should provide a reference to a specific area of the code and are intended only for developers to locate potential issues.
Delayed Actions
In some scenarios, e.g. in some API calls, we may want to execute a sequence of actions on a time delay.
By default, each API call to an IP block emulator hogs the API thread for that IP block emulator.
One possible workaround is to use the delay_action (ska_mid_cbf_emulators.common.delay_action()) function
provided in ska_mid_cbf_emulators.common package:
from ska_mid_cbf_emulators.common import delay_action
def my_handler(self, something):
self.value = something
# Execute `self.subcontroller.trigger(SomeTransitionTrigger.START)` after 500ms
delay_action(500, self.subcontroller.trigger(SomeTransitionTrigger.START))
# Execute `print(self.ip_block.some_value_to_print)` after 1500ms
delay_action(1500, print, self.ip_block.some_value_to_print)
The function signature looks like:
- Parameters:
delay_ms – The amount of time to delay the action by, in milliseconds.
action_fn – The action to execute after the specified delay.
*fn_args – Arbitrary positional arguments to pass to
action_fnwhen executed.**fn_kwargs – Arbitrary keyword arguments to pass to
action_fnwhen executed.
- Returns:
(
threading.Timer) The createdTimerobject, in case any further manipulation is required.
Notes:
The
delay_actionfunction simply wraps Python’s built-inthreading.Timerthread object, thus each delayed action is run in its own sub-thread. Thedelay_actionfunction returns as soon as theTimeris started, making it effectively asynchronous.The delay time is relative to the calling thread. In the example above, this means that the second action will be performed (roughly) 1000ms after the first action, not 1500ms.
Attempting to
join(threading.Thread.join()) a delayed action will have the same effect as callingtime.sleep()on the calling thread. Delayed actions should only be used if the calling thread does not rely on their completion.Raising (or encountering) an exception inside the action function will log the error message via the emulator logger and kill the thread without impacting other functionality.
It is strongly advised to return
202 Accepted(ska_mid_cbf_emulators.common.InternalRestResponse.accepted()) instead of200 OKfor API functions using delayed actions.You may of course also just spawn a regular Python thread to perform an asynchronous task, if you don’t need a time delay.
Delayed actions may be chained recursively. For example, the following is perfectly valid:
from ska_mid_cbf_emulators.common import delay_action
def my_handler(self, something):
self.value = something
def action_to_delay():
self.subcontroller.trigger(MacTransitionTrigger.START)
# Execute `print(self.ip_block.some_value_to_print)`
# 5000ms after *this* delay_action call
# i.e. ~15000ms after the original delay_action call
delay_action(5000, lambda: print(self.ip_block.some_value_to_print))
# Execute `self.subcontroller.trigger(SomeTransitionTrigger.START)`
# (within `action_to_delay`) after 10000ms
delay_action(10000, action_to_delay)