ska_oso_scripting.core
Core SKA OSO scripting functionality.
This package provides fundamental utilities for command execution and monitoring of Tango devices in the SKA Observatory Scripting framework.
- ska_oso_scripting.core.call_and_wait_for_transition(command: Command, target_transitions: Iterable[ska_tango_base.control_model.ObsState | tango.DevState], attribute_name: str, device_to_monitor: str | None = None, optional_transitions: Iterable[ska_tango_base.control_model.ObsState | tango.DevState] | None = None, timeout: float | None = None)
Send a command and block until attribute value has transitioned to/through the requested target state(s).
- Parameters:
command – command to execute
target_transitions – happy path transitions
attribute_name – attribute which is monitored
device_to_monitor – optional device for attribute monitoring; if None, the device defined in command.device is used
optional_transitions – optional states that are not the target state but should not be counted as error states either
timeout – custom timeout applied while the command executes; if the system does not respond within this time, EventTimeoutError is raised
- Returns:
Command response
- Raises:
EventTimeoutError – if timeout triggered before state transition occurs
- ska_oso_scripting.core.execute_command_and_wait_for_transition(command: Command, target_transitions: Iterable[ska_tango_base.control_model.ObsState | tango.DevState], attribute: Attribute, optional_transitions: Iterable[ska_tango_base.control_model.ObsState | tango.DevState] | None = None, timeout: float | None = None)
Send a command and block until attribute value has transitioned to/through the requested target state(s).
- Parameters:
command – command to execute
target_transitions – happy path transitions
attribute – attribute which is monitored
timeout – custom timeout applied while the command executes; if the system does not respond within this time, EventTimeoutError is raised
optional_transitions – optional states that are not the target state but should not be counted as error states either
- Returns:
Command response
- Raises:
EventTimeoutError – if timeout triggered before state transition occurs
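The phrase "to/through the requested target state(s)" can be illustrated with a small, self-contained sketch. This is not the library's implementation: the ObsState enum below is a local stand-in, and wait_through and UnexpectedState are hypothetical names. The sketch assumes that target transitions must be observed in order, that optional transitions are skipped, and that any other value counts as an error.

```python
from enum import Enum
from typing import Iterable, Optional


class ObsState(Enum):
    """Local stand-in for ska_tango_base.control_model.ObsState (illustration only)."""
    IDLE = 0
    CONFIGURING = 1
    READY = 2
    FAULT = 3


class UnexpectedState(Exception):
    """Hypothetical error raised for a state outside the expected path."""


def wait_through(
    states: Iterable[ObsState],
    target_transitions: Iterable[ObsState],
    optional_transitions: Optional[Iterable[ObsState]] = None,
) -> Optional[ObsState]:
    """Consume states until every target has been seen, in order."""
    stream = iter(states)  # a single iterator, so each target resumes where the last stopped
    optional = set(optional_transitions or ())
    last = None
    for target in target_transitions:
        for state in stream:
            if state == target:
                last = state
                break
            if state in optional:
                continue  # tolerated, but not the state we are waiting for
            raise UnexpectedState(f"expected {target}, got {state}")
        else:
            raise UnexpectedState(f"stream ended before {target}")
    return last


# Usage: IDLE is tolerated, then CONFIGURING and READY are seen in order.
observed = [ObsState.IDLE, ObsState.CONFIGURING, ObsState.READY]
final = wait_through(
    observed,
    target_transitions=[ObsState.CONFIGURING, ObsState.READY],
    optional_transitions=[ObsState.IDLE],
)
```

In the real functions the states come from Tango change events rather than from a list, and a timeout bounds the wait.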
ska_oso_scripting.core.iterators
Iterator utilities for event monitoring and timeout handling.
This package provides iterator-based utilities for monitoring and handling events from Tango devices with timeout capabilities and merging multiple event streams.
- class ska_oso_scripting.core.iterators.TimeoutIterator(iterator: Iterator[T], timeout: float | None, mode: Literal['idle', 'absolute'] = 'idle', *, daemon: bool | None = None, cleanup_timeout: float | None = 1.0)
Iterator wrapper that raises EventTimeoutError if no item yielded within timeout.
A background thread pulls items from the wrapped iterator into a queue. The main thread pulls from the queue with a timeout. If no item is available within the timeout period, EventTimeoutError is raised.
Here’s the flow:
Main thread (consumer) calls __next__(), which does a queue.get(timeout=…) on self._queue. That timeout is either per-item (mode="idle") or measured from the first __next__ call (mode="absolute").
Worker thread (self._producer_thread, target _pull_from_iterator) is the only thread that interacts with the wrapped iterator. It iterates over items in the wrapped iterator, pushing items into self._queue.
Termination: The worker thread forwards _ITERATOR_DONE (sentinel) or any exception once and then stops. Exceptions raised by the wrapped iterator are passed through to the main thread and re-raised there.
Timeout / stop behaviour: If the main thread times out (queue.Empty), it sets _stop_event and _iteration_complete, attempts to close the wrapped iterator, and raises EventTimeoutError. __exit__ / _cleanup() sets the stop event, closes the iterator, and joins the worker thread with cleanup_timeout.
- Parameters:
iterator – The iterator to wrap
timeout – Timeout in seconds, or None for no timeout
mode – ‘idle’ (timeout resets per item) or ‘absolute’ (timeout from first call)
daemon – Whether the worker thread should be a daemon thread
cleanup_timeout – Timeout for thread cleanup in seconds, or None to wait indefinitely
Example:
timed = TimeoutIterator(iter([1, 2, 3]), timeout=5.0, mode='idle')
list(timed)  # Raises EventTimeoutError if items don't arrive within 5s
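The producer/consumer flow described above can be sketched with the standard library alone. The following is a simplified illustration, not the TimeoutIterator source: iterate_with_timeout and SketchTimeout are hypothetical names, the worker is always a daemon thread, and cleanup is reduced to setting a stop flag.

```python
import queue
import threading
import time
from typing import Iterator, Optional, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking exhaustion of the wrapped iterator


class SketchTimeout(Exception):
    """Hypothetical stand-in for EventTimeoutError."""


def iterate_with_timeout(
    source: Iterator[T], timeout: Optional[float], mode: str = "idle"
) -> Iterator[T]:
    """Yield items from source, raising SketchTimeout if the wait is too long."""
    items: queue.Queue = queue.Queue()
    stop = threading.Event()

    def produce() -> None:
        # Only this worker thread touches the wrapped iterator.
        try:
            for item in source:
                if stop.is_set():
                    return
                items.put((True, item))
            items.put((True, _DONE))
        except Exception as exc:  # forward errors to the consumer
            items.put((False, exc))

    threading.Thread(target=produce, daemon=True).start()
    # The generator body starts on the first next() call, so this is the
    # "absolute" deadline measured from that first call.
    deadline = None if timeout is None else time.monotonic() + timeout
    try:
        while True:
            if timeout is None:
                wait = None  # block indefinitely
            elif mode == "idle":
                wait = timeout  # the budget resets for every item
            else:
                wait = max(0.0, deadline - time.monotonic())  # one shared budget
            try:
                ok, value = items.get(timeout=wait)
            except queue.Empty:
                raise SketchTimeout(f"no item within {timeout}s") from None
            if not ok:
                raise value  # re-raise the wrapped iterator's exception here
            if value is _DONE:
                return
            yield value
    finally:
        stop.set()  # ask the worker to stop at its next item


def slow() -> Iterator[int]:
    yield 1
    time.sleep(0.5)  # longer than the idle timeout used below
    yield 2


fast_items = list(iterate_with_timeout(iter([1, 2, 3]), timeout=1.0))

received = []
timed_out = False
try:
    for item in iterate_with_timeout(slow(), timeout=0.1):
        received.append(item)
except SketchTimeout:
    timed_out = True
```

Keeping the wrapped iterator on a single worker thread is what lets the consumer apply a timeout to an iterator that has no timeout support of its own.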
- class ska_oso_scripting.core.iterators.MergedIterator(sources: dict[str, Iterator[T]], *, mode: Literal['any', 'all'] = 'all', daemon: bool | None = None)
Iterator that merges multiple source iterators into a single tagged iterable.
Each source iterator runs in its own worker thread, pulling items and placing them into a shared queue. The main iterator yields items from the queue as they become available, tagged with their source identifier.
- Parameters:
sources – Dictionary mapping source names to their iterators
mode – “any” = stop when ANY source exhausts, “all” = continue until ALL sources exhaust
daemon – Whether worker threads should be daemon threads. If None (the default), worker threads are daemon threads
- Raises:
ValueError – If sources dict is empty or mode is invalid
- ska_oso_scripting.core.iterators.merge_iterators(sources: dict[str, Iterator[T]], *, mode: Literal['any', 'all'] = 'all', daemon: bool | None = None) -> Iterator[tuple[str, T]]
Merge multiple iterators into a single tagged iterable.
Each source iterator runs in its own worker thread. Items are yielded as (tag, item) tuples where tag is the key from the sources dict.
- Parameters:
sources – Dict mapping tag names to iterators
mode – “any” = stop when ANY source exhausts, “all” = continue until ALL sources exhaust
daemon – Whether worker threads should be daemon threads
- Yields:
(tag, item) tuples from whichever source produces first
- Raises:
ValueError – If sources dict is empty
Any exception from input iterators (propagated immediately)
Example:
events = iter([event1, event2])
timeouts = iter([timeout1])
for tag, item in merge_iterators({"events": events, "timeouts": timeouts}):
    if tag == "events":
        handle_event(item)
    elif tag == "timeouts":
        handle_timeout(item)
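The one-thread-per-source design can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's code: merge_tagged is a hypothetical name, only the behaviour of mode="all" is modelled, and worker threads are always daemon threads.

```python
import queue
import threading
from typing import Dict, Iterator, Tuple, TypeVar

T = TypeVar("T")


def merge_tagged(sources: Dict[str, Iterator[T]]) -> Iterator[Tuple[str, T]]:
    """Yield (tag, item) pairs until every source is exhausted."""
    if not sources:
        raise ValueError("sources must not be empty")
    shared: queue.Queue = queue.Queue()

    def pump(tag: str, source: Iterator[T]) -> None:
        # Each source is drained by its own worker thread.
        try:
            for item in source:
                shared.put(("item", tag, item))
            shared.put(("done", tag, None))
        except Exception as exc:
            shared.put(("error", tag, exc))

    for tag, source in sources.items():
        threading.Thread(target=pump, args=(tag, source), daemon=True).start()

    remaining = len(sources)
    while remaining:
        kind, tag, payload = shared.get()
        if kind == "error":
            raise payload  # propagate the source's exception immediately
        if kind == "done":
            remaining -= 1
            continue
        yield tag, payload


merged = list(merge_tagged({"a": iter([1, 2]), "b": iter([10])}))
```

Items from different sources may interleave in any order, but items from the same source keep their relative order because each source has exactly one worker. Because this sketch is a generator function, its ValueError surfaces on the first iteration rather than at call time.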
- ska_oso_scripting.core.iterators.timeout_iterator(iterator: Iterator[T], timeout: float | None, mode: Literal['idle', 'absolute'] = 'idle', *, daemon: bool | None = None, cleanup_timeout: float | None = 1.0) -> Iterator[TimeoutIterator[T]]
Context manager for timeout-wrapped iteration.
Provides a convenient way to create a TimeoutIterator with automatic resource management via the context manager protocol.
- Parameters:
iterator – The iterator to wrap
timeout – Timeout in seconds, or None for no timeout
mode – ‘idle’ (timeout resets per item) or ‘absolute’ (timeout from first call)
daemon – Whether the worker thread should be a daemon thread
cleanup_timeout – Timeout for thread cleanup in seconds, or None to wait indefinitely
- Yields:
A TimeoutIterator wrapping the input iterator
Example:
with timeout_iterator(iter([1, 2, 3]), timeout=5.0, mode='idle') as timed:
    for item in timed:
        print(item)
- exception ska_oso_scripting.core.iterators.EventTimeoutError(msg='Timeout error occurred while read event from queue', value=None)
Exception raised to unblock function while reading an event from the queue. Subclasses queue.Empty to add custom messages and value.
- class ska_oso_scripting.core.iterators.PubSubIterator(topic: str)
Iterator that subscribes to a PyPubSub topic and yields messages.
Warning
Publishing to this topic from the same thread that is iterating will cause a deadlock. Messages MUST be published from a different thread. If in doubt, wrap this in a MergedIterator, where each iterator is iterated over in a separate thread.
The iterator subscribes to the topic immediately upon creation, meaning messages published before iteration begins are queued and will be yielded when iteration starts. The iterator automatically unsubscribes when iteration stops (via break, exception, or exhaustion).
PyPubSub calls listeners synchronously during sendMessage(), so no background thread is needed - messages are placed in the queue during the publisher’s sendMessage() call.
- Parameters:
topic – The PyPubSub topic name to subscribe to
Example:
for msg in PubSubIterator('user_topics.script.announce'):
    print(msg)  # {'msg_src': 'MainThread', 'msg': 'hello'}
    if some_condition(msg):
        break  # Automatically unsubscribes
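The queueing behaviour and the deadlock warning can be illustrated without PyPubSub. The sketch below uses a tiny hypothetical in-process message bus (subscribe, unsubscribe and send_message are stand-ins, not PyPubSub calls) whose listeners run synchronously in the publisher's thread, which is the property the text above relies on. QueueBackedSubscriber is likewise a hypothetical name.

```python
import queue
import threading
from typing import Any, Callable, Dict, List

# Hypothetical mini message bus: topic name -> list of listener callables.
_listeners: Dict[str, List[Callable[[Any], None]]] = {}


def subscribe(topic: str, listener: Callable[[Any], None]) -> None:
    _listeners.setdefault(topic, []).append(listener)


def unsubscribe(topic: str, listener: Callable[[Any], None]) -> None:
    _listeners[topic].remove(listener)


def send_message(topic: str, msg: Any) -> None:
    # Listeners are invoked synchronously, in the publisher's thread.
    for listener in list(_listeners.get(topic, [])):
        listener(msg)


class QueueBackedSubscriber:
    """Iterator that queues messages from the moment it is created."""

    def __init__(self, topic: str) -> None:
        self._topic = topic
        self._queue: queue.Queue = queue.Queue()
        self._listener = self._queue.put
        subscribe(topic, self._listener)  # subscribe immediately

    def __iter__(self) -> "QueueBackedSubscriber":
        return self

    def __next__(self) -> Any:
        # Blocks the calling thread, so that thread cannot also publish.
        return self._queue.get()

    def close(self) -> None:
        unsubscribe(self._topic, self._listener)


sub = QueueBackedSubscriber("demo.topic")
send_message("demo.topic", "early")  # published before iteration: queued
threading.Timer(0.05, send_message, args=("demo.topic", "late")).start()
first = next(sub)   # already queued
second = next(sub)  # delivered by the timer thread
sub.close()
```

If the second message were sent from the iterating thread instead of the timer thread, the call to next(sub) would block forever, which is the deadlock the warning describes.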
- ska_oso_scripting.core.iterators.pubsub_iterator(topic: str) -> PubSubIterator
Convenience function to create a PubSubIterator.
- Parameters:
topic – The PyPubSub topic name to subscribe to
- Returns:
A new PubSubIterator instance
Example:
for msg in pubsub_iterator('user_topics.script.announce'):
    print(msg)
ska_oso_scripting.core.monitoring
Pure monitoring utilities for observing Tango device state without controlling devices.
This package provides functions to wait for device attribute values and transitions. These are observation utilities only - they do not execute commands or control devices.
- ska_oso_scripting.core.monitoring.wait_for_value(event_stream: Iterator[tuple[tango.EventData, Attribute]], target_values: Iterable[Any], key=<function <lambda>>) -> Any
Block until a Tango device attribute has reached one of target values.
This function requires a subscription to have been established to the target attribute before calling this function. Subscribing is not handled within this function in case multiple reads of the same attribute are required, when we would NOT want to subscribe/unsubscribe after each event.
If defined, the optional ‘key’ function will be used to process the device attribute value before comparison to the target value.
- Parameters:
event_stream – iterable of attribute change events
target_values – target values to wait for
key – function to process each attribute value before comparison
- Returns:
Attribute value read from device (one of target_values)
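The role of the key function can be shown with a simplified sketch. It is an assumption-laden illustration rather than the library's logic: first_matching is a hypothetical name, the stream carries plain values instead of (event, attribute) tuples, and the processed value (not the raw one) is returned.

```python
from typing import Any, Callable, Iterable


def first_matching(
    values: Iterable[Any],
    target_values: Iterable[Any],
    key: Callable[[Any], Any] = lambda value: value,
) -> Any:
    """Return the first processed value that is one of target_values."""
    targets = list(target_values)
    for raw in values:
        candidate = key(raw)  # normalise before comparing
        if candidate in targets:
            return candidate
    raise RuntimeError("stream ended before a target value was seen")


# Usage: compare case-insensitively by upper-casing each value first.
result = first_matching(["idle", "Ready", "fault"], ["READY"], key=str.upper)
```

With real Tango events, a key function would typically extract and convert the attribute value carried by the event before the comparison.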
- ska_oso_scripting.core.monitoring.wait_for_transition(event_stream: Iterator[tuple[tango.EventData, Attribute]], attribute: Attribute, target_transition: ska_tango_base.control_model.ObsState | tango.DevState, optional_transitions: Iterable[ska_tango_base.control_model.ObsState | tango.DevState] | None = None) -> WaitForTransitionResponse
Block until a Tango attribute has reached a target enumeration value.
An error response is returned if the attribute changes to a value that is neither the target value nor one of the optional transitions.
This function is intended to be used to monitor enumerated device states such as telescopeState and ObsState, which emit a change event when the device transitions to a new state.
- Parameters:
event_stream – iterable of attribute change events
attribute – attribute to monitor
target_transition – target state to wait for
optional_transitions – optional states that are not the target state but should not be counted as error states either
- Returns:
WaitForTransitionResponse with status and final value
ska_oso_scripting.core.tango
Tango-specific infrastructure for device control.
- class ska_oso_scripting.core.tango.Attribute(device: str, name: str)
An abstraction of a Tango attribute.
- Parameters:
device – the FQDN of the target Tango device
name – the name of the attribute to read
- static from_trl(trl: str) -> Attribute
Creates an Attribute instance from a given TRL (Tango Resource Locator) string.
The attribute component of the input TRL is mandatory. Put another way, the component after the final ‘/’ will ALWAYS be interpreted as an attribute specifier and not part of the device name.
- Parameters:
trl – the TRL string to parse
- Raises:
ValueError – if the TRL is invalid.
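The rule that the component after the final '/' is always the attribute can be sketched in a few lines. This is an illustration only: split_attribute_trl is a hypothetical helper, and the validation shown (rejecting an empty device or attribute part) is an assumption, since the checks performed by from_trl are not documented here.

```python
from typing import Tuple


def split_attribute_trl(trl: str) -> Tuple[str, str]:
    """Split a TRL into (device, attribute) at the final '/'."""
    device, separator, name = trl.rpartition("/")
    if not separator or not device or not name:
        raise ValueError(f"not a valid attribute TRL: {trl!r}")
    return device, name


# The last component is the attribute name.
full = split_attribute_trl("sys/tg_test/1/double_scalar")

# A bare device name is still split: its last component is taken as the attribute.
bare = split_attribute_trl("sys/tg_test/1")
```

The second call shows why the attribute component is mandatory: passing a device name without an attribute does not fail, it silently yields the wrong device.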
- class ska_oso_scripting.core.tango.Command(device: str, command_name: str, *args)
An abstraction of a Tango command.
- class ska_oso_scripting.core.tango.TangoClient(proxy_factory=<ska_oso_scripting.core.tango.client.TangoDeviceProxyFactory object>)
TangoClient is the proxy between calling code and Tango devices. It accepts encapsulated Tango interactions and performs them on behalf of the calling code.
- execute(command: Command, **kwargs)
Execute a Command on a Tango device.
Additional kwargs to the DeviceProxy can be specified if required.
- Parameters:
command – the command to execute
- Returns:
the response, if any, returned by the Tango device
- read(attribute: Attribute)
Perform a blocking attribute read on a Tango device.
- Parameters:
attribute – the attribute to read
- Returns:
the attribute value
- read_event(attr: Attribute, timeout: float | None = None) -> tango.EventData
DEPRECATED.
Use TangoClient.events_stream() instead.
- events_stream(attributes: list[Attribute]) -> AbstractContextManager[Iterator[tuple[tango.EventData, Attribute]]]
Create a merged event stream from multiple subscribed attributes.
Multiple concurrent streams are supported. Each stream receives events for its subscribed attributes independently.
The stream yields (event, attribute) tuples and continues indefinitely until the context manager exits. This is the primary API for consuming Tango events in a streaming fashion.
Example usage:
attr1 = Attribute("device/1", "state")
attr2 = Attribute("device/2", "state")
with client.events_stream([attr1, attr2]) as stream:
    for event, attr in stream:
        print(f"Event from {attr.device}/{attr.name}")
- Parameters:
attributes – List of Attributes to subscribe to and stream
- Returns:
Context manager that yields an iterator over (event, attribute) tuples
- class ska_oso_scripting.core.tango.TangoDeviceProxyFactory(default_source: DevSource | None = None, default_timeout_millis: int = 10000)
A factory for creating Tango DeviceProxy clients. This class exists to allow unit tests to override the factory with an implementation that returns mock DeviceProxy instances.
- class ska_oso_scripting.core.tango.TangoRegistry
Registry used to look up Tango Resource Locators (TRLs).
This is a simple class used to decouple TangoClient from TRLs. At some point, this class could change to do something more complicated, e.g., perform database lookups, or perhaps the TRLs might be specified on the domain classes, but for now these are simple string formatting methods.
- static get_subarray_node(subarray_id: int)
Get the TRL of the Subarray Node for the given subarray ID.
- static get_csp_subarray_leaf_node(subarray_id: int)
Get the TRL of a CSP Subarray Leaf Node.
- Parameters:
subarray_id – The subarray ID.
- static get_mccs_subarray_leaf_node(subarray_id: int) -> str
Get the TRL for a TMC MCCS Subarray Leaf Node.
- Parameters:
subarray_id – Subarray ID
- Returns:
full TRL for the TMC MCCS Subarray Leaf Node
- static get_subarray_quality_monitor(subarray_id: int) -> str
Get the TRL of the SubarrayQualityMonitor device.
- Parameters:
subarray_id – The subarray ID.
- ska_oso_scripting.core.tango.tangoclient() -> TangoClient
Get the current TangoClient.
This function returns the client from the current context, or the default client if no context is set. This allows tests to inject mock clients using client_var.set() for testing without affecting production code.
- Returns:
The current client (either context-local or default TangoClient)
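The context-local lookup with a default fallback can be sketched with the standard contextvars module. This is a guess at the general pattern, not the module's source: FakeClient and current_client are hypothetical names, and only client_var is taken from the description above.

```python
import contextvars


class FakeClient:
    """Hypothetical stand-in for TangoClient."""

    def __init__(self, label: str) -> None:
        self.label = label


_default_client = FakeClient("default")

# Holds an override for the current context; None means "use the default".
client_var = contextvars.ContextVar("client_var", default=None)


def current_client() -> FakeClient:
    """Return the context-local client, or the default if none is set."""
    override = client_var.get()
    return override if override is not None else _default_client


before = current_client().label

token = client_var.set(FakeClient("mock"))  # what a test would do
inside = current_client().label
client_var.reset(token)  # restore the previous state

after = current_client().label
```

Resetting with the token returned by set() restores the previous value, so a test that injects a mock client does not leak it into later tests.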