# Source code for ska_oso_scripting.core.execution

"""
Command execution with state monitoring.
"""
import logging
from typing import Iterable

from ska_oso_scripting.core.iterators import TimeoutIterator
from ska_oso_scripting.core.monitoring.wait import (
    WAIT_FOR_TRANSITION_FAILURE_RESPONSE,
    DeviceStateTypes,
    wait_for_transition,
)
from ska_oso_scripting.core.tango import Attribute, Command, tangoclient
from ska_oso_scripting.exceptions import DeviceControlError

LOGGER = logging.getLogger(__name__)


class ValueTransitionError(DeviceControlError):
    """
    Signal that a monitored attribute reported a value other than the one
    that was expected.

    The received value, the expected value and the monitored attribute are
    stored on the instance and all appear in the string form of the error.
    """

    def __init__(
        self,
        value,
        expected_value,
        attribute,
        msg="Unexpected value transition",
    ):
        """
        :param value: value that was actually received
        :param expected_value: value the attribute was expected to reach
        :param attribute: attribute that was being monitored
        :param msg: leading text of the rendered error message
        """
        super().__init__(msg)
        self.attribute = attribute
        self.expected_value = expected_value
        self.value = value
        self.msg = msg

    def __str__(self):
        # Rendered in two halves purely for line length; the joined text is
        # "<msg>: Expected <attribute> to transition to <expected> but
        # instead received <value>".
        expectation = f"{self.msg}: Expected {self.attribute} to transition to"
        outcome = f"{self.expected_value} but instead received {self.value}"
        return f"{expectation} {outcome}"


def call_and_wait_for_transition(
    command: Command,
    target_transitions: Iterable[DeviceStateTypes],
    attribute_name: str,
    device_to_monitor: str = None,
    optional_transitions: Iterable[DeviceStateTypes] = None,
    timeout: float = None,
):
    """
    Execute a command and block until the named attribute has transitioned
    to/through each of the requested target states.

    This is a convenience wrapper: it builds the Attribute to watch from a
    device and attribute name, then delegates to
    execute_command_and_wait_for_transition.

    :param command: command to execute
    :param target_transitions: the happy-path values the attribute is
        expected to pass through, in order
    :param attribute_name: name of the attribute to monitor
    :param device_to_monitor: device owning the monitored attribute. When
        None, the device given by command.device is monitored.
    :param optional_transitions: values that are not targets but should not
        be treated as errors either
    :param timeout: custom timeout, forwarded unchanged to
        execute_command_and_wait_for_transition
    :return: Command response
    :raises EventTimeoutError: if timeout triggered before state transition
        occurs
    """
    # Monitor the command's own device unless the caller nominated another.
    monitored_device = (
        command.device if device_to_monitor is None else device_to_monitor
    )
    return execute_command_and_wait_for_transition(
        command=command,
        target_transitions=target_transitions,
        attribute=Attribute(monitored_device, attribute_name),
        optional_transitions=optional_transitions,
        timeout=timeout,
    )


def execute_command_and_wait_for_transition(
    command: Command,
    target_transitions: Iterable[DeviceStateTypes],
    attribute: Attribute,
    optional_transitions: Iterable[DeviceStateTypes] = None,
    timeout: float = None,
):
    """
    Execute a command and block until the attribute has transitioned
    to/through each of the requested target states.

    :param command: command to execute
    :param target_transitions: the happy-path values the attribute is
        expected to pass through, in order
    :param attribute: attribute to monitor
    :param optional_transitions: values that are not targets but should not
        be treated as errors either
    :param timeout: custom timeout applied while waiting for events after
        the command has been sent
    :return: Command response
    :raises EventTimeoutError: if timeout triggered before state transition
        occurs
    :raises ValueTransitionError: if waiting for a target value ends with the
        failure response
    """
    # Open the event stream BEFORE sending the command so that events emitted
    # while the command runs are captured. Nothing is read from the stream
    # until the command has been issued.
    with tangoclient().events_stream([attribute]) as events:
        response = tangoclient().execute(command)

        # "idle" mode allows at most `timeout` secs between events; switching
        # to mode="absolute" would constrain the total duration instead.
        with TimeoutIterator(events, timeout, mode="idle") as bounded_events:
            for expected in target_transitions:
                outcome = wait_for_transition(
                    event_stream=bounded_events,
                    attribute=attribute,
                    target_transition=expected,
                    optional_transitions=optional_transitions,
                )
                failed = (
                    outcome.response_msg == WAIT_FOR_TRANSITION_FAILURE_RESPONSE
                )
                if failed:
                    raise ValueTransitionError(
                        outcome.final_value, expected, attribute
                    )

    return response