"""Module containing the fanned out command actions."""
import json
import logging
from typing import Callable, Optional
import tango
from ska_control_model import AdminMode, ResultCode, TaskStatus
from ska_mid_dish_dcp_lib.device.b5dc_device_mappings import B5dcFrequency
from ska_mid_dish_manager.models.action_handlers import (
Action,
ActionHandler,
SequentialActionHandler,
)
from ska_mid_dish_manager.models.constants import (
DEFAULT_ACTION_TIMEOUT_S,
DSC_MIN_POWER_LIMIT_KW,
OPERATOR_TAG,
)
from ska_mid_dish_manager.models.dish_enums import (
Band,
DishMode,
DSOperatingMode,
DSPowerState,
IndexerPosition,
PointingState,
SPFOperatingMode,
SPFRxOperatingMode,
)
from ska_mid_dish_manager.models.fanned_out_command import (
DishManagerCMMethod,
DishManagerCMMethodResultCode,
FannedOutTangoCommand,
FannedOutTangoLongRunningCommand,
)
from ska_mid_dish_manager.utils.action_helpers import (
update_task_status,
)
# -------------------------
# Concrete Actions
# -------------------------
[docs]class SetStandbyLPModeAction(Action):
"""Transition the dish to STANDBY_LP mode."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
self.spf_command = FannedOutTangoCommand(
logger=self.logger,
device="SPF",
command_name="SetStandbyLPMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["SPF"],
command_argument=None,
awaited_component_state={"operatingmode": SPFOperatingMode.STANDBY_LP},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("SPF"),
)
self.spfrx_command = FannedOutTangoCommand(
logger=self.logger,
device="SPFRX",
command_name="SetStandbyMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["SPFRX"],
command_argument=None,
awaited_component_state={"operatingmode": SPFRxOperatingMode.STANDBY},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("SPFRX"),
)
self.ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="SetStandbyMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
command_argument=None,
awaited_component_state={
"operatingmode": DSOperatingMode.STANDBY,
"powerstate": DSPowerState.LOW_POWER,
},
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
self.logger,
"SetStandbyLPMode",
[self.spf_command, self.spfrx_command, self.ds_command],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
awaited_component_state={"dishmode": DishMode.STANDBY_LP},
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=self.timeout_s,
)
[docs] def execute(self, task_callback, task_abort_event, completed_response_msg: str = ""):
if not self.dish_manager_cm.is_device_ignored("SPFRX"):
spfrx_cm = self.dish_manager_cm.sub_component_managers["SPFRX"]
if spfrx_cm._component_state["adminmode"] == AdminMode.ENGINEERING:
try:
spfrx_cm.write_attribute_value("adminmode", AdminMode.ONLINE)
except tango.DevFailed:
self.handler._trigger_failure(
task_callback,
task_abort_event,
"Failed to transition SPFRx from AdminMode ENGINEERING to ONLINE",
)
return
return super().execute(task_callback, task_abort_event, completed_response_msg)
[docs]class SetStandbyFPModeAction(Action):
"""Transition the dish to STANDBY_FP mode."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
ds_set_standby_mode = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="SetStandbyMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={"operatingmode": DSOperatingMode.STANDBY},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("DS"),
)
# Action to set the power mode of DS to Full Power
dsc_power_limit = dish_manager_cm._component_state.get(
"dscpowerlimitkw", DSC_MIN_POWER_LIMIT_KW
)
ds_set_full_power_mode = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="SetPowerMode",
command_argument=[False, dsc_power_limit],
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={"powerstate": DSPowerState.FULL_POWER},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("DS"),
)
spf_set_operate_mode = FannedOutTangoCommand(
logger=self.logger,
device="SPF",
command_name="SetOperateMode",
command_argument=None,
device_component_manager=self.dish_manager_cm.sub_component_managers["SPF"],
awaited_component_state={"operatingmode": SPFOperatingMode.OPERATE},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("SPF"),
)
self._handler = ActionHandler(
self.logger,
"SetStandbyFPMode",
[ds_set_standby_mode, ds_set_full_power_mode, spf_set_operate_mode],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
awaited_component_state={"dishmode": DishMode.STANDBY_FP},
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=self.timeout_s,
)
[docs]class SetOperateModeAction(Action):
"""Transition the dish to OPERATE mode."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
spf_command = FannedOutTangoCommand(
logger=self.logger,
device="SPF",
command_name="SetOperateMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["SPF"],
awaited_component_state={"operatingmode": SPFOperatingMode.OPERATE},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("SPF"),
)
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="SetPointMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={"operatingmode": DSOperatingMode.POINT},
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
self.logger,
"SetOperateMode",
[spf_command, ds_command],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
awaited_component_state={"dishmode": DishMode.OPERATE},
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=self.timeout_s,
)
[docs] def execute(self, task_callback, task_abort_event, completed_response_msg: str = ""):
if self.dish_manager_cm._component_state["configuredband"] in [Band.NONE, Band.UNKNOWN]:
self.handler._trigger_failure(
task_callback,
task_abort_event,
"No configured band: SetOperateMode execution not allowed",
task_status=TaskStatus.REJECTED,
result_code=ResultCode.NOT_ALLOWED,
)
return
return super().execute(task_callback, task_abort_event, completed_response_msg)
[docs]class SetMaintenanceModeAction(Action):
"""Transition the dish to MAINTENANCE mode."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
# This is not a FannedOutTangoLongRunningCommand because DSManager bypasses the lrcQueue
# and does not return the command ID in its response.
ds_command = FannedOutTangoCommand(
logger=self.logger,
device="DS",
command_name="Stow",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={"operatingmode": DSOperatingMode.STOW},
progress_callback=self._progress_callback,
)
spfrx_command = FannedOutTangoCommand(
logger=self.logger,
device="SPFRX",
command_name="SetStandbyMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["SPFRX"],
awaited_component_state={"operatingmode": SPFRxOperatingMode.STANDBY},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("SPFRX"),
)
spf_command = FannedOutTangoCommand(
logger=self.logger,
device="SPF",
command_name="SetMaintenanceMode",
device_component_manager=self.dish_manager_cm.sub_component_managers["SPF"],
awaited_component_state={"operatingmode": SPFOperatingMode.MAINTENANCE},
progress_callback=self._progress_callback,
is_device_ignored=self.dish_manager_cm.is_device_ignored("SPF"),
)
self._handler = ActionHandler(
self.logger,
"SetMaintenanceMode",
[ds_command, spfrx_command, spf_command],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
awaited_component_state={"dishmode": DishMode.STOW},
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=timeout_s,
)
[docs] def execute(self, task_callback, task_abort_event, completed_response_msg: str = ""):
if not self.dish_manager_cm.is_device_ignored("SPFRX"):
# TODO: Wait for the SPFRx to implement maintenance mode
self.logger.debug("Nothing done on SPFRx, awaiting implementation on it.")
return super().execute(task_callback, task_abort_event, completed_response_msg)
[docs]class TrackAction(Action):
"""Transition the dish to Track mode."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
""":param logger: Logger instance.
:type logger: logging.Logger
:param dish_manager_cm: The DishManagerComponentManager instance.
:type dish_manager_cm: DishManagerComponentManager
:param action_on_success: Optional Action to execute automatically if this action succeeds.
:type action_on_success: Optional[Action]
:param action_on_failure: Optional Action to execute automatically if this action fails.
:type action_on_failure: Optional[Action]
:param waiting_callback: Optional callback executed periodically while waiting on commands.
:type waiting_callback: Optional[Callable]
"""
super().__init__(
logger,
dish_manager_cm,
0, # no timeout for the track action since there is no awaited_component_state
action_on_success,
action_on_failure,
waiting_callback,
)
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="Track",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
self.logger,
"Track",
[ds_command],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
)
self.completed_message = (
"Track command has been executed on DS. "
"Monitor the achievedTargetLock attribute to determine when the dish is on source."
)
[docs] def execute(self, task_callback, task_abort_event, completed_response_msg: str = ""):
return super().execute(
task_callback, task_abort_event, completed_response_msg=self.completed_message
)
[docs]class TrackStopAction(Action):
"""Stop Tracking."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="TrackStop",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={"pointingstate": PointingState.READY},
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
self.logger,
"TrackStop",
[ds_command],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
awaited_component_state={"pointingstate": PointingState.READY},
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=timeout_s,
)
[docs]def apply_pointing_model(
band_param_name: str,
band_name: str,
task_callback,
logger: logging.Logger,
dish_manager_cm,
):
"""Apply pointing model parameters for a given band if they exist and are not all zeros.
Args:
band_param_name: The key in component_state for this band.
band_name: Name/identifier of the band (for logging).
task_callback: Callback to update task status on failure.
logger: Logger instance for info/debug/error messages.
dish_manager_cm: Component manager with component_state and update_pointing_model_params().
Returns:
TaskStatus.FAILED and error string if an error occurs, otherwise None.
"""
try:
values = dish_manager_cm.component_state[band_param_name]
if values:
dish_manager_cm.update_pointing_model_params(band_param_name, values)
logger.info(
f"Pointing model for band {band_name} applied successfully", extra=OPERATOR_TAG
)
else:
logger.info(
f"Skipped applying pointing model for band {band_name} due to invalid params: []",
extra=OPERATOR_TAG,
)
except (tango.DevFailed, ValueError) as err:
logger.error(
f"Failed to apply pointing model for band {band_name}: {err}", extra=OPERATOR_TAG
)
update_task_status(
task_callback,
status=TaskStatus.FAILED,
result=(ResultCode.FAILED, "Apply pointing model failed"),
)
return TaskStatus.FAILED, str(err)
[docs]class SlewAction(Action):
"""Slew the dish to the specified target coordinates."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
target: list[float],
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
""":param logger: Logger instance.
:type logger: logging.Logger
:param dish_manager_cm: The DishManagerComponentManager instance.
:type dish_manager_cm: DishManagerComponentManager
:target: The target coordinates to slew to.
:type target: list[float]
:param action_on_success: Optional Action to execute automatically if this action succeeds.
:type action_on_success: Optional[Action]
:param action_on_failure: Optional Action to execute automatically if this action fails.
:type action_on_failure: Optional[Action]
:param waiting_callback: Optional callback executed periodically while waiting on commands.
:type waiting_callback: Optional[Callable]
"""
super().__init__(
logger,
dish_manager_cm,
0, # no timeout for the slew action since there is no awaited_component_state
action_on_success,
action_on_failure,
waiting_callback,
)
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="Slew",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
command_argument=target,
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
logger=self.logger,
action_name="Slew",
fanned_out_commands=[ds_command],
component_state=self.dish_manager_cm._component_state,
action_on_success=action_on_success,
action_on_failure=action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
)
self.completed_message = (
f"The DS has been commanded to Slew to {target}. "
"Monitor the pointing attributes for the completion status of the task."
)
[docs] def execute(self, task_callback, task_abort_event, completed_response_msg: str = ""):
return super().execute(
task_callback, task_abort_event, completed_response_msg=self.completed_message
)
[docs]class TrackLoadStaticOffAction(Action):
"""TrackLoadStaticOff action."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
off_xel,
off_el,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="TrackLoadStaticOff",
device_component_manager=dish_manager_cm.sub_component_managers["DS"],
command_argument=[off_xel, off_el],
awaited_component_state={
"actstaticoffsetvaluexel": off_xel,
"actstaticoffsetvalueel": off_el,
},
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
logger=self.logger,
action_name="TrackLoadStaticOff",
fanned_out_commands=[ds_command],
component_state=self.dish_manager_cm._component_state,
awaited_component_state={
"actstaticoffsetvaluexel": off_xel,
"actstaticoffsetvalueel": off_el,
},
action_on_success=action_on_success,
action_on_failure=action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=timeout_s,
)
[docs]class AbortScanSequence(Action):
"""Sequence to stop dish movement.
Reset the track table and then clear the scan_id
"""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
self.completed_message = "Abort sequence completed."
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="TrackStop",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={"pointingstate": PointingState.READY},
progress_callback=self._progress_callback,
timeout_s=DEFAULT_ACTION_TIMEOUT_S,
)
reset_track_table = DishManagerCMMethodResultCode(
logger,
dish_manager_cm.reset_track_table,
self.dish_manager_cm._component_state,
timeout_s=DEFAULT_ACTION_TIMEOUT_S,
)
reset_scan_id = DishManagerCMMethod(
logger,
dish_manager_cm._update_component_state,
self.dish_manager_cm._component_state,
command_kwargs={"scanid": ""},
timeout_s=DEFAULT_ACTION_TIMEOUT_S,
)
self._handler = SequentialActionHandler(
logger=self.logger,
action_name="Abort LRC Tasks",
fanned_out_commands=[reset_scan_id, reset_track_table, ds_command],
component_state=self.dish_manager_cm._component_state,
awaited_component_state={},
action_on_success=action_on_success,
action_on_failure=action_on_failure,
waiting_callback=waiting_callback,
progress_callback=self._progress_callback,
timeout_s=timeout_s,
)
[docs]class InterlockAckAction(Action):
"""Acknowledge Interlock on DSC."""
[docs] def __init__(
self,
logger: logging.Logger,
dish_manager_cm,
timeout_s: float = DEFAULT_ACTION_TIMEOUT_S,
action_on_success: Optional["Action"] = None,
action_on_failure: Optional["Action"] = None,
waiting_callback: Optional[Callable] = None,
):
super().__init__(
logger,
dish_manager_cm,
timeout_s,
action_on_success,
action_on_failure,
waiting_callback,
)
ds_command = FannedOutTangoLongRunningCommand(
logger=self.logger,
device="DS",
command_name="InterLockAck",
device_component_manager=self.dish_manager_cm.sub_component_managers["DS"],
awaited_component_state={},
progress_callback=self._progress_callback,
)
self._handler = ActionHandler(
self.logger,
"InterLockAck",
[ds_command],
# use _dish_manager_cm._component_state to pass the dict by reference
# _dish_manager_cm.component_state will use the tango base property which will do a
# deep copy
component_state=self.dish_manager_cm._component_state,
awaited_component_state={},
action_on_success=self.action_on_success,
action_on_failure=self.action_on_failure,
waiting_callback=self.waiting_callback,
progress_callback=self._progress_callback,
timeout_s=timeout_s,
)