"""
Command execution with state monitoring.
"""
import logging
from typing import Iterable
from ska_oso_scripting.core.iterators import TimeoutIterator
from ska_oso_scripting.core.monitoring.wait import (
WAIT_FOR_TRANSITION_FAILURE_RESPONSE,
DeviceStateTypes,
wait_for_transition,
)
from ska_oso_scripting.core.tango import Attribute, Command, tangoclient
from ska_oso_scripting.exceptions import DeviceControlError
LOGGER = logging.getLogger(__name__)
class ValueTransitionError(DeviceControlError):
    """
    Error raised when a monitored attribute does not transition as expected.

    The observed value, the expected value and the attribute concerned are
    stored on the instance and are all included in the string form of the
    exception.

    :param value: value that was actually received
    :param expected_value: value the attribute was expected to transition to
    :param attribute: attribute whose transition was being monitored
    :param msg: leading text of the rendered error message
    """

    def __init__(
        self,
        value,
        expected_value,
        attribute,
        msg="Unexpected value transition",
    ):
        # The base class only receives the short message; the details are
        # kept as attributes and rendered on demand by __str__.
        super().__init__(msg)
        self.attribute = attribute
        self.expected_value = expected_value
        self.value = value
        self.msg = msg

    def __str__(self):
        """Return the message followed by the expected and received values."""
        prefix = self.msg
        monitored = self.attribute
        expected = self.expected_value
        received = self.value
        return (
            f"{prefix}: Expected {monitored} to transition to {expected}"
            f" but instead received {received}"
        )
[docs]
def call_and_wait_for_transition(
    command: Command,
    target_transitions: Iterable[DeviceStateTypes],
    attribute_name: str,
    device_to_monitor: str = None,
    optional_transitions: Iterable[DeviceStateTypes] = None,
    timeout: float = None,
):
    """
    Execute a command, then block until the named attribute has transitioned
    to/through each of the requested target state(s).

    This is a convenience wrapper that builds the Attribute to monitor from a
    device and attribute name, then delegates to
    execute_command_and_wait_for_transition.

    :param command: command to execute
    :param target_transitions: happy path transitions, in the order expected
    :param attribute_name: name of the attribute to monitor
    :param device_to_monitor: optional device whose attribute is monitored;
        if None, the device given by command.device is used
    :param optional_transitions: optional states that are not the target
        state but should not be counted as error states either
    :param timeout: custom timeout forwarded unchanged to
        execute_command_and_wait_for_transition
    :return: Command response
    :raises EventTimeoutError: if timeout triggered before state transition occurs
    """
    # Fall back to the commanded device when no monitoring device is given.
    monitored_device = (
        command.device if device_to_monitor is None else device_to_monitor
    )
    return execute_command_and_wait_for_transition(
        command=command,
        target_transitions=target_transitions,
        attribute=Attribute(monitored_device, attribute_name),
        optional_transitions=optional_transitions,
        timeout=timeout,
    )
[docs]
def execute_command_and_wait_for_transition(
    command: Command,
    target_transitions: Iterable[DeviceStateTypes],
    attribute: Attribute,
    optional_transitions: Iterable[DeviceStateTypes] = None,
    timeout: float = None,
):
    """
    Execute a command, then block until the attribute has transitioned
    to/through each of the requested target state(s).

    :param command: command to execute
    :param target_transitions: happy path transitions, waited for one after
        another in iteration order
    :param attribute: attribute which is monitored
    :param optional_transitions: optional states that are not the target
        state but should not be counted as error states either
    :param timeout: custom timeout handed to the TimeoutIterator in "idle"
        mode, i.e. the maximum time allowed between events
    :return: Command response
    :raises EventTimeoutError: if timeout triggered before state transition occurs
    :raises ValueTransitionError: if waiting for a target transition reports
        the failure response
    """
    # Open the event stream BEFORE executing the command so that events
    # emitted as a result of the command are captured. The stream must not
    # be iterated until the command has been sent.
    with tangoclient().events_stream([attribute]) as events:
        response = tangoclient().execute(command)

        # "idle" mode limits the gap between consecutive events to `timeout`
        # seconds; mode='absolute' would constrain the total duration instead.
        with TimeoutIterator(events, timeout, mode="idle") as bounded_events:
            for expected in target_transitions:
                outcome = wait_for_transition(
                    event_stream=bounded_events,
                    attribute=attribute,
                    target_transition=expected,
                    optional_transitions=optional_transitions,
                )
                transition_failed = (
                    outcome.response_msg == WAIT_FOR_TRANSITION_FAILURE_RESPONSE
                )
                if transition_failed:
                    raise ValueTransitionError(
                        outcome.final_value, expected, attribute
                    )

    return response