ska_oso_oet

Reads values from the ska_oso_oet.ini file and initialises the feature-toggle constant that enables event-based polling/pubsub.

ska_oso_oet.main

class ska_oso_oet.main.ActivityServiceWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, mp_context: BaseContext, *args, **kwargs)[source]

ActivityServiceWorker listens for user request messages, calling the appropriate ActivityService function and broadcasting its response.

__init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, mp_context: BaseContext, *args, **kwargs)[source]

Create a new QueueProcWorker.

The events and MPQueues passed to this constructor should be created and managed within the scope of a MainContext context manager and shared with other ProcWorkers, so that the communication queues are shared correctly between Python processes and there is a common event that can be set to notify all processes when shutdown is required.

Parameters:
  • name – name of this worker

  • startup_event – event to trigger when startup is complete

  • shutdown_event – event to monitor for shutdown

  • event_q – outbox for posting messages to main context

  • work_q – inbox message queue for work messages

  • args – captures other anonymous arguments

  • kwargs – captures other keyword arguments

shutdown() None[source]

Disconnect republishing function from pypubsub.

startup() None[source]

Connect republishing function to pypubsub.

class ska_oso_oet.main.EventBusWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, **kwargs)[source]

EventBusWorker converts external inter-process pub/sub messages to and from local intra-process pubsub messages.

EventBusWorker uses the QueueProcWorker’s ‘work queue’ as an inbox for pub/sub EventMessages sent by other ProcWorkers. EventMessages received on this queue are rebroadcast locally as pypubsub messages. Likewise, the EventBusWorker listens to all pypubsub messages broadcast locally, converts them to pub/sub EventQueue messages, and puts them on the ‘main’ queue for transmission to other EventBusWorkers.
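The two-way bridging described above can be illustrated with a minimal sketch that uses only the standard library. Everything here is a hypothetical stand-in: `LocalBus` plays the role of pypubsub, `BridgeWorker` plays the role of the worker, and the `msg_src` field used to stop a rebroadcast message from being echoed back out is an assumption of this sketch, not a documented part of the OET API.

```python
import queue
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class EventMessage:
    """Hypothetical inter-process pub/sub message."""
    src: str
    topic: str
    kwargs: Dict[str, Any] = field(default_factory=dict)


class LocalBus:
    """Tiny in-process pub/sub, standing in for pypubsub."""

    def __init__(self) -> None:
        self._listeners: List[Callable[..., None]] = []

    def subscribe_all(self, listener: Callable[..., None]) -> None:
        self._listeners.append(listener)

    def publish(self, topic: str, **kwargs: Any) -> None:
        # Iterate over a copy so listeners may subscribe during delivery.
        for listener in list(self._listeners):
            listener(topic, **kwargs)


class BridgeWorker:
    """Hypothetical bridge between an inbox queue, an outbox queue and a local bus."""

    def __init__(self, name: str, bus: LocalBus, main_q: "queue.Queue") -> None:
        self.name = name
        self.bus = bus
        self.main_q = main_q
        bus.subscribe_all(self.republish)

    def main_func(self, evt: EventMessage) -> None:
        # External -> local: rebroadcast the message on the local bus.
        self.bus.publish(evt.topic, msg_src=evt.src, **evt.kwargs)

    def republish(self, topic: str, msg_src: Any = None, **kwargs: Any) -> None:
        # Local -> external: forward only messages that originated locally,
        # otherwise a rebroadcast external message would loop forever.
        if msg_src not in (None, self.name):
            return
        self.main_q.put(EventMessage(src=self.name, topic=topic, kwargs=kwargs))


bus = LocalBus()
work_q, main_q = queue.Queue(), queue.Queue()
worker = BridgeWorker("worker-a", bus, main_q)

received = []
bus.subscribe_all(lambda topic, **kw: received.append((topic, kw)))

# A locally published event is placed on the outbox queue.
bus.publish("user.script.start", script_id=7)
outgoing = main_q.get_nowait()

# An external event is rebroadcast locally and is not echoed back out.
work_q.put(EventMessage(src="worker-b", topic="script.started", kwargs={"script_id": 7}))
worker.main_func(work_q.get_nowait())
```

After this runs, `outgoing` holds the message destined for other workers, `received` holds both events seen by the local listener, and `main_q` is empty again.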

main_func(evt: EventMessage) None[source]

Republish external pub/sub message locally.

QueueProcWorker ensures that main_func is called for every item in the work queue. This function takes that work item - the external pub/sub EventMessage - and rebroadcasts it locally as a pypubsub message.

Parameters:

evt – pub/sub EventMessage to broadcast locally

republish(topic: pubsub.pub.Topic = pubsub.pub.AUTO_TOPIC, **kwargs) None[source]

Republish a local event over the inter-process event bus.

Parameters:
  • topic – message topic, set automatically by pypubsub

  • kwargs – any metadata associated with pypubsub message

shutdown() None[source]

Disconnect republishing function from pypubsub.

startup() None[source]

Connect republishing function to pypubsub.

class ska_oso_oet.main.FlaskWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, *args, **kwargs)[source]

FlaskWorker is an EventBusWorker that runs Flask.

By extending EventBusWorker, Flask functions can use pypubsub to subscribe to and publish messages, and these messages will be put on the main queue to be broadcast to other EventBusWorkers.

shutdown() None[source]

Disconnect republishing function from pypubsub.

startup() None[source]

Connect republishing function to pypubsub.

class ska_oso_oet.main.ScriptExecutionServiceWorker(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, mp_context: BaseContext, *args, **kwargs)[source]

ScriptExecutionServiceWorker listens for user request messages, calling the appropriate ScriptExecutionService function and broadcasting its response.

Actions that occur in the user request domain (‘user clicked start observation’, ‘user aborted observation using the CLI’, etc.) are broadcast as events. ScriptExecutionServiceWorker listens for events on these topics and triggers the required action in the script execution domain (‘start a script’, ‘abort a script’, etc.).

Currently, the result of the action that occurred in the script execution domain (=the return object from the ScriptExecutionService) is broadcast to the world by the ScriptExecutionServiceWorker. This could change so that the ScriptExecutionService itself sends the message.
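As a rough illustration of the request-to-action pattern described above, the sketch below maps request topics to service methods and broadcasts each result. The topic names, the `FakeScriptService` class, the `request_id` argument and the `RequestDispatcher` helper are all hypothetical and chosen for illustration only; they are not taken from the OET code.

```python
from typing import Any, Callable, Dict, List, Tuple


class FakeScriptService:
    """Hypothetical stand-in for a script execution service."""

    def start(self, script_id: int) -> str:
        return f"script {script_id} started"

    def abort(self, script_id: int) -> str:
        return f"script {script_id} aborted"


class RequestDispatcher:
    """Hypothetical worker logic: route request topics to service calls."""

    def __init__(self, service: FakeScriptService, broadcast: Callable[..., None]) -> None:
        self._broadcast = broadcast
        # Assumed topic names, one per user request type.
        self._handlers: Dict[str, Callable[..., Any]] = {
            "request.script.start": service.start,
            "request.script.abort": service.abort,
        }

    def on_request(self, topic: str, request_id: int, **kwargs: Any) -> None:
        handler = self._handlers.get(topic)
        if handler is None:
            return  # not a topic this worker handles
        result = handler(**kwargs)
        # The worker, not the service, broadcasts the result.
        response_topic = topic.replace("request.", "response.", 1)
        self._broadcast(response_topic, request_id=request_id, result=result)


responses: List[Tuple[str, Dict[str, Any]]] = []
dispatcher = RequestDispatcher(
    FakeScriptService(),
    broadcast=lambda topic, **kw: responses.append((topic, kw)),
)
dispatcher.on_request("request.script.start", request_id=1, script_id=42)
dispatcher.on_request("request.unknown", request_id=2)
```

Keeping the broadcast in the worker means the service remains a plain object with no knowledge of the event bus, which matches the current arrangement the text describes.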

__init__(name: str, startup_event: Event, shutdown_event: Event, event_q: MPQueue, work_q: MPQueue, mp_context: BaseContext, *args, **kwargs)[source]

Create a new QueueProcWorker.

The events and MPQueues passed to this constructor should be created and managed within the scope of a MainContext context manager and shared with other ProcWorkers, so that the communication queues are shared correctly between Python processes and there is a common event that can be set to notify all processes when shutdown is required.

Parameters:
  • name – name of this worker

  • startup_event – event to trigger when startup is complete

  • shutdown_event – event to monitor for shutdown

  • event_q – outbox for posting messages to main context

  • work_q – inbox message queue for work messages

  • args – captures other anonymous arguments

  • kwargs – captures other keyword arguments

shutdown() None[source]

Disconnect republishing function from pypubsub.

startup() None[source]

Connect republishing function to pypubsub.

ska_oso_oet.main.main(mp_ctx: BaseContext)[source]

Create the OET components and start an event loop that dispatches messages between them.

Parameters:

mp_ctx

ska_oso_oet.main.main_loop(main_ctx: MainContext, event_bus_queues: List[MPQueue])[source]

Main message parsing and routing loop, extracted from main() to increase testability.

Parameters:
  • main_ctx

  • event_bus_queues

ska_oso_oet.tango

class ska_oso_oet.tango.TangoDeviceProxyFactory[source]

A callable that creates Tango DeviceProxy clients. This class exists to allow unit tests to override the factory with an implementation that returns mock DeviceProxy instances.
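The factory-override idea can be sketched as follows. `ProxyFactory` and `Executor` are hypothetical simplifications rather than the OET classes themselves; the only Tango call assumed is `DeviceProxy.read_attribute(name)`, whose result exposes a `value` attribute in PyTango.

```python
from unittest import mock


class ProxyFactory:
    """Hypothetical default factory.

    Real code would return a Tango DeviceProxy for the named device; this
    sketch raises instead so that it runs without a Tango installation.
    """

    def __call__(self, device_name: str):
        raise RuntimeError("no Tango control system available in this sketch")


class Executor:
    """Hypothetical executor that obtains proxies only through the factory."""

    def __init__(self, proxy_factory=ProxyFactory()):
        self._proxy_factory = proxy_factory

    def read(self, device_name: str, attribute_name: str):
        proxy = self._proxy_factory(device_name)
        return proxy.read_attribute(attribute_name).value


# In a unit test, inject a factory that returns a mock proxy.
mock_proxy = mock.MagicMock()
mock_proxy.read_attribute.return_value.value = 3
factory = mock.MagicMock(return_value=mock_proxy)

executor = Executor(proxy_factory=factory)
value = executor.read("test/device/1", "obsState")
```

Because the executor never constructs a proxy directly, the test needs no running Tango device.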

class ska_oso_oet.tango.TangoExecutor(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]

TangoExecutor is the proxy between calling code and Tango devices. It accepts encapsulated Tango interactions and performs them on behalf of the calling code.

class SingleQueueEventStrategy(mgr: SubscriptionManager)[source]

SingleQueueEventStrategy encapsulates the event handling behaviour of the TangoExecutor from ~October 2021, when all events were added to a single queue and subscriptions were created and released after each attribute read operation.

We hope to replace this with a more advanced implementation that allows subscriptions to multiple events.

Parameters:

mgr – SubscriptionManager instance used to observe events

__init__(mgr: SubscriptionManager)[source]
notify(evt: tango.EventData)[source]

This implements the SubscriptionManager EventObserver interface. Tango ChangeEvents republished by the SubscriptionManager are received via this method.

Queue is thread-safe so we do not need to synchronise this method with read_event.

read_event(attr: Attribute) tango.EventData[source]

Read an event from the queue. This function blocks until an event is received.

With a single subscription active at any one time, the attribute is ignored by this implementation but is expected to be required by strategies that support multiple attribute subscriptions.

subscribe_event(attr: Attribute) int[source]

Subscribe to change events published by a Tango attribute.

This strategy only supports one active subscription at any time. An exception will be raised if a second subscription is attempted.

This method returns a subscription identifier which should be supplied to a subsequent unsubscribe_event method.

Parameters:

attr – attribute to subscribe to

Returns:

subscription identifier

unsubscribe_event(attr: Attribute, subscription_id: int) None[source]

Unsubscribe from change events published by a Tango attribute.

This strategy only supports one active subscription at any time. An exception will be raised if a second subscription is attempted.

Parameters:
  • attr – attribute to unsubscribe from

  • subscription_id – subscription identifier
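The behaviour described for this strategy (one active subscription, one thread-safe queue, a blocking read) can be sketched with the standard library alone. `SingleSubscriptionStrategy` is a hypothetical illustration, not the OET implementation: the exception types are assumptions, events are plain strings rather than tango.EventData, and the `timeout` argument is added here only so the example cannot hang.

```python
import itertools
import queue
import threading


class SingleSubscriptionStrategy:
    """Hypothetical sketch: one active subscription, one shared event queue."""

    def __init__(self) -> None:
        self._events: "queue.Queue" = queue.Queue()
        self._ids = itertools.count(1)
        self._active_id = None

    def subscribe_event(self, attr: str) -> int:
        if self._active_id is not None:
            raise RuntimeError("only one active subscription is supported")
        self._active_id = next(self._ids)
        return self._active_id

    def unsubscribe_event(self, attr: str, subscription_id: int) -> None:
        if subscription_id != self._active_id:
            raise ValueError(f"unknown subscription id: {subscription_id}")
        self._active_id = None

    def notify(self, evt) -> None:
        # queue.Queue is thread-safe, so no extra locking is needed here.
        self._events.put(evt)

    def read_event(self, attr: str, timeout: float = None):
        # attr is ignored: with one subscription there is only one source.
        return self._events.get(timeout=timeout)


strategy = SingleSubscriptionStrategy()
sub_id = strategy.subscribe_event("obsState")

try:
    strategy.subscribe_event("scanID")
    second_rejected = False
except RuntimeError:
    second_rejected = True

# Deliver an event from another thread while the caller blocks in read_event.
threading.Timer(0.05, strategy.notify, args=("READY",)).start()
event = strategy.read_event("obsState", timeout=5)

strategy.unsubscribe_event("obsState", sub_id)
next_id = strategy.subscribe_event("scanID")
```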

__init__(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]

Create a new TangoExecutor.

Parameters:

proxy_factory – a function or object which, when called, returns an object that conforms to the PyTango DeviceProxy interface.

execute(command: Command, **kwargs)[source]

Execute a Command on a Tango device.

Additional kwargs to the DeviceProxy can be specified if required.

Parameters:

command – the command to execute

Returns:

the response, if any, returned by the Tango device

read(attribute: Attribute)[source]

Read an attribute on a Tango device.

Parameters:

attribute – the attribute to read

Returns:

the attribute value

read_event(attr: Attribute) tango.EventData[source]

Get an event for the specified attribute.

subscribe_event(attribute: Attribute)[source]

Subscribe to an event on a Tango device.

Parameters:

attribute – the attribute to subscribe to

Returns:

subscription ID

unsubscribe_event(attribute: Attribute, event_id: int)[source]

Unsubscribe from an event on a Tango device.

Parameters:
  • attribute – the attribute to unsubscribe

  • event_id – event subscription ID

class ska_oso_oet.tango.SubscriptionManager(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]

SubscriptionManager is a proxy for Tango event subscriptions that prevents duplicate subscriptions and minimises subscribe/unsubscribe calls.

Previously, each time a script listened to an event, it would subscribe to an event, wait for reception of the appropriate event, then unsubscribe. These multiple subscribe/unsubscribe calls were found to create problems. SubscriptionManager was introduced to manage subscriptions, with the aim of having fewer, longer-lived subscriptions. Clients subscribe to the SubscriptionManager, and the SubscriptionManager handles any required subscriptions to Tango devices.

The SubscriptionManager component is responsible for managing events and event subscriptions in the OET. The SubscriptionManager sits as a proxy between client and Tango event subscriptions, moving the pub/sub layer accessed by clients away from the Tango layer and into the OET layer. Clients register with the SubscriptionManager as observers of an attribute. If required, one long-lived Tango subscription per attribute is created on demand by the SubscriptionManager. The SubscriptionManager relays received Tango events to all attribute observers registered at the time of event reception. Unregistering an observer from the SubscriptionManager prevents subsequent notifications but does not affect the underlying Tango event subscription, which continues to operate until the Python interpreter exits.

Legacy calling code expects a maximum of one subscription to be active at any one time. Additionally, the caller always sandwiched read_event calls between subscribe_attribute and unsubscribe_attribute calls. Together, this meant subscriptions were short-lived, existing for the duration of a single attribute monitoring operation, and that one Queue to hold events was sufficient as there would only ever be one Tango event subscription. To maintain this legacy behaviour, subscribe_attribute and unsubscribe_attribute register and unregister the TangoExecutor as an observer of events, with the TangoExecutor.notify method adding received events to the TangoExecutor queue read by the legacy TangoExecutor.read_event method.
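The proxy behaviour described above, with one long-lived subscription per attribute and events relayed to whichever observers are registered at the time, can be sketched as follows. `FakeDevice`, `Recorder` and `Manager` are hypothetical names introduced for illustration, and the device's `subscribe`/`emit` methods are invented stand-ins rather than Tango calls.

```python
from collections import defaultdict


class FakeDevice:
    """Hypothetical stand-in for a Tango device; counts subscribe calls."""

    def __init__(self) -> None:
        self.callbacks = {}
        self.subscribe_calls = 0

    def subscribe(self, attr, callback) -> None:
        self.subscribe_calls += 1
        self.callbacks[attr] = callback

    def emit(self, attr, value) -> None:
        self.callbacks[attr](value)


class Recorder:
    """Hypothetical observer that records every event it is notified of."""

    def __init__(self) -> None:
        self.events = []

    def notify(self, evt) -> None:
        self.events.append(evt)


class Manager:
    """Hypothetical manager: subscribe once per attribute, relay to observers."""

    def __init__(self, device: FakeDevice) -> None:
        self._device = device
        self._observers = defaultdict(list)
        self._subscribed = set()

    def register_observer(self, attr, observer) -> None:
        if attr not in self._subscribed:
            # Create the underlying subscription on demand, and only once.
            self._device.subscribe(attr, lambda evt, a=attr: self._relay(a, evt))
            self._subscribed.add(attr)
        self._observers[attr].append(observer)

    def unregister_observer(self, attr, observer) -> None:
        # Stops notifications; the device subscription is left in place.
        self._observers[attr].remove(observer)

    def _relay(self, attr, evt) -> None:
        # Copy the list so it reflects observers at the moment of reception.
        for observer in list(self._observers[attr]):
            observer.notify(evt)


device = FakeDevice()
manager = Manager(device)
first, second = Recorder(), Recorder()

manager.register_observer("obsState", first)
manager.register_observer("obsState", second)
device.emit("obsState", "READY")

manager.unregister_observer("obsState", first)
device.emit("obsState", "SCANNING")

# Re-registering does not create a second device subscription.
manager.register_observer("obsState", first)
```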

../_images/subscriptionmanager-class.svg

Class diagram for components involved in OET event handling

../_images/subscriptionmanager-sequence.svg

Sequence diagram for OET event handling

Members:

__init__(proxy_factory=<ska_oso_oet.tango.TangoDeviceProxyFactory object>)[source]
register_observer(attr: Attribute, observer)[source]

Register an EventObserver as an observer of a Tango attribute.

Once registered, the EventObserver will be notified of each Tango event published by the attribute.

Parameters:
  • attr – Tango attribute to observe

  • observer – the EventObserver to notify

unregister_observer(attr: Attribute, observer)[source]

Deregister an EventObserver as an observer of a Tango attribute.

Parameters:
  • attr – the observed Tango attribute

  • observer – the EventObserver to unsubscribe

class ska_oso_oet.tango.LocalScanIdGenerator(start=1)[source]

LocalScanIdGenerator is an abstraction of a service that will generate scan IDs as unique integers. Expect scan UID generation to be a database operation or similar in the production implementation.

__init__(start=1)[source]
next()[source]

Get the next scan ID.

Returns:

integer scan ID

property value

Get the current scan ID.
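A local generator with this interface might look like the sketch below. `CountingIdGenerator` is a hypothetical name, and two details are assumptions rather than documented behaviour: that `next()` increments first and then returns the new value, and that a lock is used to keep IDs unique across threads.

```python
import threading


class CountingIdGenerator:
    """Hypothetical in-memory scan ID generator."""

    def __init__(self, start: int = 1) -> None:
        self._lock = threading.Lock()
        self._value = start

    @property
    def value(self) -> int:
        """Return the current scan ID without advancing it."""
        with self._lock:
            return self._value

    def next(self) -> int:
        """Advance to the next scan ID and return it (assumed ordering)."""
        with self._lock:
            self._value += 1
            return self._value


generator = CountingIdGenerator(start=10)
current = generator.value
following = generator.next()
```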

class ska_oso_oet.tango.RemoteScanIdGenerator(hostname)[source]

RemoteScanIdGenerator connects to the skuid service to retrieve IDs.

__init__(hostname)[source]
next()[source]

Get the next scan ID.

Returns:

integer scan ID

property value

Get the current scan ID.

class ska_oso_oet.tango.Callback[source]

Callback is an observable that distributes Tango events received by the callback instance to all observers registered at the moment of event reception.

__init__()[source]
notify_observers(evt: tango.EventData)[source]

Distribute an event to all registered observers.

Parameters:

evt – event to distribute

register_observer(observer)[source]

Register an EventObserver.

Once registered, the observer will be notified of all Tango events received by this instance.

Parameters:

observer – observer to register

unregister_observer(observer)[source]

Unregister an EventObserver.

Unsubscribed observers will not receive Tango events subsequently received by this instance.

Parameters:

observer – observer to unregister

ska_oso_oet.ui

The ska_oso_oet.ui package contains code that presents the OET interface to the outside world. In practical terms, this means the OET application’s REST interface.

class ska_oso_oet.ui.Message(data: str | dict, type: str | None = None, id: float | int | str | None = None, retry: int | None = None)[source]

Data that is published as a server-sent event.

__init__(data: str | dict, type: str | None = None, id: float | int | str | None = None, retry: int | None = None)[source]

Create a server-sent event.

Parameters:
  • data – The event data.

  • type – An optional event type.

  • id – An optional event ID.

  • retry – An optional integer, to specify the reconnect time for disconnected clients of this stream.
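For context, these four fields correspond to the `data`, `event`, `id` and `retry` fields of the server-sent events wire format. The sketch below shows one way such a message could be serialised; `SseMessage` and its `encode` method are hypothetical and do not claim to match how the OET class formats its output.

```python
import json
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SseMessage:
    """Hypothetical server-sent event with the same four fields."""
    data: Union[str, dict]
    type: Optional[str] = None
    id: Optional[Union[float, int, str]] = None
    retry: Optional[int] = None

    def encode(self) -> str:
        # Dict payloads are sent as JSON; strings are sent as-is.
        payload = self.data if isinstance(self.data, str) else json.dumps(self.data)
        lines = []
        if self.type is not None:
            lines.append(f"event: {self.type}")
        # Each line of a multi-line payload needs its own "data:" field.
        lines.extend(f"data: {line}" for line in payload.splitlines() or [""])
        if self.id is not None:
            lines.append(f"id: {self.id}")
        if self.retry is not None:
            lines.append(f"retry: {self.retry}")
        # A blank line terminates the event.
        return "\n".join(lines) + "\n\n"


full = SseMessage({"a": 1}, type="update", id=3, retry=500).encode()
multiline = SseMessage("line1\nline2").encode()
```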

class ska_oso_oet.ui.ServerSentEventsBlueprint(*args: Any, **kwargs: Any)[source]

A flask.Blueprint subclass that knows how to subscribe to pypubsub topics and stream pubsub events as server-sent events.

__init__(*args, mp_context=None, **kwargs)[source]
messages() Generator[Message, None, None][source]

A generator of Message objects created from received pubsub events.

ska_oso_oet.ui.create_app(open_api_spec=None)[source]

Returns a Flask app created using Connexion.

ska_oso_oet.ui.get_openapi_spec() Dict[str, Any][source]

Parses and returns the OpenAPI spec.