Source code for ska_tango_base.obs.obs_device

# pylint: disable=invalid-name
# -*- coding: utf-8 -*-
#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
SKAObsDevice.

A generic base device for Observations for SKA. It inherits
SKABaseDevice class. Any device implementing an obsMode will inherit
from SKAObsDevice instead of just SKABaseDevice.
"""
from __future__ import annotations

from typing import Any, TypeVar, cast

from ska_control_model import ObsMode, ObsState, ResultCode
from tango.server import attribute

from ..base import BaseComponentManager, SKABaseDevice
from ..commands import DeviceInitCommand

__all__ = ["ObsDeviceComponentManager", "SKAObsDevice", "main"]


# pylint: disable-next=abstract-method
[docs] class ObsDeviceComponentManager(BaseComponentManager): """A stub for an observing device component manager."""
# TODO ComponentManagerT = TypeVar("ComponentManagerT", bound=ObsDeviceComponentManager)
[docs] class SKAObsDevice(SKABaseDevice[ComponentManagerT]): """A generic base device for Observations for SKA.""" def __init__( self: SKAObsDevice[ComponentManagerT], *args: Any, **kwargs: Any, ) -> None: """ Initialise a new instance. :param args: positional arguments. :param kwargs: keyword arguments. """ # This __init__ method is created for type-hinting purposes only. # Tango devices are not supposed to have __init__ methods, # And they have a strange __new__ method, # that calls __init__ when you least expect it. # So don't put anything executable in here # (other than the super() call). self._obs_state: ObsState self._commanded_obs_state: ObsState self._obs_mode: ObsMode self._config_progress: int self._config_delay_expected: int super().__init__(*args, **kwargs)
[docs] class InitCommand(DeviceInitCommand): # pylint: disable=protected-access # command classes are friend classes """A class for the SKAObsDevice's init_device() "command"."""
[docs] def do( self: SKAObsDevice.InitCommand, *args: Any, **kwargs: Any, ) -> tuple[ResultCode, str]: """ Stateless hook for device initialisation. :param args: positional arguments to the command. This command does not take any, so this should be empty. :param kwargs: keyword arguments to the command. This command does not take any, so this should be empty. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ for attribute_name in [ "obsState", "commandedObsState", "obsMode", "configurationProgress", "configurationDelayExpected", ]: self._device.set_change_event(attribute_name, True) self._device.set_archive_event(attribute_name, True) self._device._obs_state = ObsState.EMPTY self._device._commanded_obs_state = ObsState.EMPTY self._device._obs_mode = ObsMode.IDLE self._device._config_progress = 0 self._device._config_delay_expected = 0 message = "SKAObsDevice Init command completed OK" self.logger.info(message) self._completed() return (ResultCode.OK, message)
[docs] def create_component_manager( self: SKAObsDevice[ComponentManagerT], ) -> ComponentManagerT: """ Create and return a component manager for this device. :raises NotImplementedError: because it is not implemented. """ raise NotImplementedError("SKAObsDevice is abstract.")
# ----------------- # Device Properties # ----------------- # --------------- # General methods # --------------- def _update_obs_state( self: SKAObsDevice[ComponentManagerT], obs_state: ObsState ) -> None: """ Perform Tango operations in response to a change in obsState. This helper method is passed to the observation state model as a callback, so that the model can trigger actions in the Tango device. :param obs_state: the new obs_state value """ self._obs_state = obs_state self.push_change_event("obsState", obs_state) self.push_archive_event("obsState", obs_state) def _update_commanded_obs_state( self: SKAObsDevice[ComponentManagerT], commanded_obs_state: ObsState ) -> None: self._commanded_obs_state = commanded_obs_state self.push_change_event("commandedObsState", commanded_obs_state) self.push_archive_event("commandedObsState", commanded_obs_state) # ---------- # Attributes # ---------- @attribute( # type: ignore[misc] # "Untyped decorator makes function untyped" dtype=ObsState ) def obsState(self: SKAObsDevice[ComponentManagerT]) -> ObsState: """ Read the Observation State of the device. :return: the current ObsState enum value """ return self._obs_state @attribute( # type: ignore[misc] # "Untyped decorator makes function untyped" dtype=ObsState ) def commandedObsState(self: SKAObsDevice[ComponentManagerT]) -> ObsState: """ Read the last commanded stable Observation State of the device. Initial value is EMPTY. The only stable (nontransitional) state values it can change to is EMPTY, IDLE, READY or ABORTED following the start of any of the SKASubarray device's long running commands. :return: the commanded ObsState enum value. """ return self._commanded_obs_state @attribute( # type: ignore[misc] # "Untyped decorator makes function untyped" dtype=ObsMode ) def obsMode(self: SKAObsDevice[ComponentManagerT]) -> ObsMode: """ Read the Observation Mode of the device. :return: the current obs_mode value """ return self._obs_mode @attribute( # type: ignore[misc] # "Untyped decorator makes function untyped" dtype="uint16", unit="%", max_value=100, min_value=0, ) def configurationProgress(self: SKAObsDevice[ComponentManagerT]) -> int: """ Read the percentage configuration progress of the device. :return: the percentage configuration progress """ return self._config_progress @attribute( # type: ignore[misc] # "Untyped decorator makes function untyped" dtype="uint16", unit="seconds" ) def configurationDelayExpected(self: SKAObsDevice[ComponentManagerT]) -> int: """ Read the expected Configuration Delay in seconds. :return: the expected configuration delay """ return self._config_delay_expected
# -------- # Commands # -------- # ---------- # Run server # ----------
[docs] def main(*args: str, **kwargs: str) -> int: """ Entry point for module. :param args: positional arguments :param kwargs: named arguments :return: exit code """ return cast(int, SKAObsDevice.run_server(args=args or None, **kwargs))
if __name__ == "__main__": main()