# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class file for tracking commands."""
from __future__ import annotations
__all__ = ["CommandTracker"]
import json
import logging
import re
from typing import Any, Callable, Dict, Optional
from ska_control_model import ObsState
from ska_pst.lmc.device_proxy import PstDeviceProxy
from ska_tango_testing.mock.tango import MockTangoEventCallbackGroup
from tango import DevFailed
from .tango import TangoChangeEventHelper, TangoDeviceCommandChecker
[docs]class CommandTracker:
"""Class to track the progress and results of commands on a PstDeviceProxy.
This class also uses the `TangoDeviceCommandChecker` which is more low
level to check for updates of the long running process values. This
provides a high level view of a command. It also will record if the
commands fail and what was the state of the device proxy before the command
was executed.
"""
def __init__(
    self: CommandTracker,
    device_proxy: PstDeviceProxy,
    change_event_callbacks: MockTangoEventCallbackGroup,
    logger: logging.Logger | None = None,
    default_timeout: float = 60.0,
) -> None:
    """
    Initialise the command tracker.

    :param device_proxy: the device proxy that commands are tracked against.
    :type device_proxy: PstDeviceProxy
    :param change_event_callbacks: the callback group used to assert change
        events against.
    :type change_event_callbacks: MockTangoEventCallbackGroup
    :param logger: the logger for this instance; when not supplied a module
        level logger is used, defaults to None
    :type logger: logging.Logger | None, optional
    :param default_timeout: the default timeout for a command to complete,
        defaults to 60.0 (i.e. 1 minute)
    :type default_timeout: float, optional
    """
    self.default_timeout = default_timeout
    self.device_proxy = device_proxy
    self.logger = logger or logging.getLogger(__name__)

    # NOTE: both helpers are handed the ``logger`` argument exactly as it was
    # supplied (possibly None), not the fallback logger stored on ``self``.
    event_helper = TangoChangeEventHelper(
        device_under_test=self.device_proxy,
        change_event_callbacks=change_event_callbacks,
        logger=logger,
    )
    self.tango_change_event_helper = event_helper

    command_checker = TangoDeviceCommandChecker(
        tango_change_event_helper=self.tango_change_event_helper,
        change_event_callbacks=change_event_callbacks,
        logger=logger,
    )
    self.tango_device_command_checker = command_checker

    # Bookkeeping about the most recently executed command.
    self.prev_command: str = ""
    self.prev_obs_state: ObsState = ObsState.IDLE
    self.prev_command_err: Optional[Exception] = None

    # Lookup from Tango command name to the private method that performs it.
    self._command_dict: Dict[str, Callable[..., None]] = dict(
        ConfigureScan=self._configure_scan,
        GoToIdle=self._goto_idle,
        Scan=self._scan,
        Abort=self._abort,
        EndScan=self._end_scan,
        GoToFault=self._goto_fault,
        ObsReset=self._obsreset,
        Reset=self._reset,
        On=self._on,
        Off=self._off,
    )
[docs] def teardown(self: CommandTracker) -> None:
"""Teardown the command tracker.
This releases all of the subscriptions and change events used by the tracker.
"""
self.tango_change_event_helper.release()
def assert_previous_command_rejected(self: CommandTracker) -> None:
    """Assert previous command was rejected due to invalid state.

    The expected message is built from the name of the previous command and
    the name of the observation state recorded for it, and is then handed to
    :py:meth:`assert_previous_command_error_message_matches`.

    :raises AssertionError: if the previous command error is not set or its
        message does not contain the expected rejection text.
    """
    expected_message = (
        f"ska_tango_base.faults.StateModelError: {self.prev_command} command "
        f"not permitted in observation state {self.prev_obs_state.name}"
    )
    # The matcher treats its argument as a regular expression, so escape the
    # literal text; otherwise the dots in the exception path match any character.
    self.assert_previous_command_error_message_matches(re.escape(expected_message))
def assert_previous_command_failed(self: CommandTracker) -> None:
    """Assert that an error was recorded for the previous command."""
    recorded_error = self.prev_command_err
    assert recorded_error is not None, "previous command error is not initialised"
def assert_previous_command_error_message_matches(
    self: CommandTracker, expected_message_regexp: str
) -> None:
    """Assert previous command error message matches a regular expression.

    The pattern is searched for anywhere within the error message; it does
    not need to match the whole message.

    :param expected_message_regexp: the regular expression that the error
        message of the previous command is expected to contain a match for.
    :type expected_message_regexp: str
    :raises AssertionError: if no previous command error is recorded, or the
        message does not match the pattern.
    """
    error = self.prev_command_err
    assert error is not None, "previous command error is not initialised"

    if not isinstance(error, DevFailed):
        # For other errors the message is read from the result recorded by
        # the command checker.
        previous_result = self.tango_device_command_checker.prev_command_result
        assert previous_result is not None
        got_message = previous_result.result
    else:
        # For a DevFailed the description of its first argument is used.
        got_message = error.args[0].desc.strip()

    self.logger.info(f"Got error msg: '{got_message}'")
    self.logger.info(f"Expected error msg: '{expected_message_regexp}'")

    found = re.search(expected_message_regexp, got_message)
    assert (
        found is not None
    ), f"error message '{got_message}' doesn't match expected message '{expected_message_regexp}'"
def _configure_scan(
    self: CommandTracker,
    scan_configuration: dict,
    **kwargs: Any,
) -> None:
    """
    Perform a ConfigureScan request on device proxy.

    :param scan_configuration: the scan configuration, which is sent to the
        command as a JSON string.
    :type scan_configuration: dict
    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    configuration_json = json.dumps(scan_configuration)
    # Observation states expected to be seen, in order.
    expected_states = [ObsState.CONFIGURING, ObsState.READY]
    assert_command(
        command="ConfigureScan",
        command_args=(configuration_json,),
        expected_obs_state_events=expected_states,
        **kwargs,
    )
def _goto_idle(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform a GoToIdle request on device proxy.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    # The only observation state event expected is IDLE.
    expected_states = [ObsState.IDLE]
    assert_command(command="GoToIdle", expected_obs_state_events=expected_states, **kwargs)
def _scan(self: CommandTracker, scan_id: int, start_time: str | None = None, **kwargs: Any) -> None:
    """
    Perform a Scan request on device proxy.

    The request is sent to the command as a JSON string holding ``scan_id``
    and, when given, ``start_time``.

    :param scan_id: the ID of the scan
    :type scan_id: int
    :param start_time: the optional start time at which the PST system should start recording data,
        defaults to None
    :type start_time: str | None, optional
    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    payload: dict[str, Any] = {"scan_id": scan_id}
    # A falsy start time (None or an empty string) is left out of the request.
    if start_time:
        payload.update(start_time=start_time)

    assert_command = self.tango_device_command_checker.assert_command
    encoded_request = json.dumps(payload)
    expected_states = [ObsState.SCANNING]
    assert_command(
        command="Scan",
        command_args=(encoded_request,),
        expected_obs_state_events=expected_states,
        **kwargs,
    )
def _end_scan(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform an EndScan request on device proxy.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    # The only observation state event expected is READY.
    expected_states = [ObsState.READY]
    assert_command(command="EndScan", expected_obs_state_events=expected_states, **kwargs)
def _abort(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform an Abort request on device proxy.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    # Observation states expected to be seen, in order.
    expected_states = [ObsState.ABORTING, ObsState.ABORTED]
    assert_command(command="Abort", expected_obs_state_events=expected_states, **kwargs)
def _goto_fault(self: CommandTracker, fault_message: str, **kwargs: Any) -> None:
    """
    Perform a GoToFault request on device proxy.

    :param fault_message: the message sent as the single command argument.
    :type fault_message: str
    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    command_arguments = (fault_message,)
    # The only observation state event expected is FAULT.
    expected_states = [ObsState.FAULT]
    assert_command(
        command="GoToFault",
        command_args=command_arguments,
        expected_obs_state_events=expected_states,
        **kwargs,
    )
def _obsreset(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform an ObsReset request on device proxy.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    # Observation states expected to be seen, in order.
    expected_states = [ObsState.RESETTING, ObsState.IDLE]
    assert_command(command="ObsReset", expected_obs_state_events=expected_states, **kwargs)
def _reset(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform a Reset request on device proxy.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    # Observation states expected to be seen, in order.
    expected_states = [ObsState.RESETTING, ObsState.IDLE]
    assert_command(command="Reset", expected_obs_state_events=expected_states, **kwargs)
def _on(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform an On request on device proxy.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    assert_command = self.tango_device_command_checker.assert_command
    # The only observation state event expected is IDLE.
    expected_states = [ObsState.IDLE]
    assert_command(command="On", expected_obs_state_events=expected_states, **kwargs)
def _off(self: CommandTracker, **kwargs: Any) -> None:
    """
    Perform an Off request on device proxy.

    No expected observation state events are supplied for this command.

    :param kwargs: extra keyword arguments passed through to the command
        checker's ``assert_command``.
    """
    command_checker = self.tango_device_command_checker
    command_checker.assert_command(command="Off", **kwargs)