# Source code for ska_pst.lmc.component.component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides the base Component Manager for PST.LMC."""

from __future__ import annotations

import logging
from threading import Event
from typing import Any, Generic, Optional, Tuple, TypeVar, cast

from overrides import EnforceOverrides, final, override
from ska_control_model import (
    CommunicationStatus,
    LoggingLevel,
    ObsState,
    PowerState,
    ResultCode,
    SimulationMode,
)
from ska_csp_lmc_base import CspObsComponentManager
from ska_pst.lmc.component.pst_device_interface import PstDeviceInterface
from ska_pst.lmc.util.callback import Callback, callback_safely
from ska_tango_base.base import check_communicating
from ska_tango_base.executor import TaskExecutorComponentManager, TaskStatus

# Names exported by ``from ska_pst.lmc.component.component_manager import *``.
__all__ = [
    "PstComponentManager",
    "TaskResponse",
]


# Return type of the command methods on the component manager: a task status
# paired with a string (presumably a human-readable message - confirm against
# ``TaskExecutorComponentManager.submit_task``).
TaskResponse = Tuple[TaskStatus, str]

# Type variable used as the generic parameter of ``PstComponentManager``; it is
# bounded by ``PstDeviceInterface``, so only that type or its subclasses apply.
DeviceInterface = TypeVar("DeviceInterface", bound=PstDeviceInterface)


class PstComponentManager(
    CspObsComponentManager,
    TaskExecutorComponentManager,
    EnforceOverrides,
    Generic[DeviceInterface],
):
    """
    Base Component Manager for the PST.LMC subsystem.

    This base class is used to provide the common functionality of the
    PST TANGO components, such as providing the communication with
    processes that are running (i.e. RECV, DSP, or SMRB).

    This class also helps abstract away calling out to whether we're
    using a simulated process or a real subprocess.

    This component manager extends from the :py:class:`CspObsComponentManager`.
    For more details about this check the
    `CSP obs component manager
    <https://developer.skao.int/projects/ska-tango-base/en/latest/api/csp/obs/component_manager.html>`_
    docs.

    Most command methods on this class raise :py:class:`NotImplementedError`;
    concrete component managers are expected to override them.
    """

    _simulation_mode: SimulationMode

    def __init__(
        self: PstComponentManager,
        *,
        device_interface: DeviceInterface,
        simulation_mode: SimulationMode = SimulationMode.TRUE,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise instance of the component manager.

        :param device_interface: an interface view of the TANGO device that
            the component manager is for. This is used so that a normal unit
            test doesn't need to use the TANGO testing infrastructure.
        :type device_interface: DeviceInterface
        :param simulation_mode: enum to track if component should be in
            simulation mode or not, defaults to SimulationMode.TRUE
        :type simulation_mode: SimulationMode, optional
        :param logger: a logger for this object to use, defaults to None,
            in which case a module-level logger is used.
        :type logger: logging.Logger | None, optional
        :param kwargs: any further keyword arguments, passed unchanged to the
            parent class initialiser.
        :type kwargs: dict
        """
        logger = logger or logging.getLogger(__name__)
        self._device_interface = device_interface
        self._attribute_value_updated_callback = device_interface.handle_attribute_value_update
        self._simulation_mode = simulation_mode
        self._scan_id = 0
        self._config_id = ""
        # State attributes are assigned before delegating to the parent
        # initialiser, preserving the original initialisation order.
        super().__init__(
            logger=logger,
            communication_state_callback=device_interface.handle_communication_state_change,
            component_state_callback=device_interface.handle_component_state_change,
            **kwargs,
        )

    @property
    def beam_id(self: PstComponentManager) -> int:
        """
        Return the beam id for the current component.

        This value is set during the construction of the component manager,
        and is injected from the `DeviceID` property of the TANGO device.
        """
        return cast(PstDeviceInterface, self._device_interface).beam_id

    @property
    def device_name(self: PstComponentManager) -> str:
        """Get the name of the current device."""
        return cast(PstDeviceInterface, self._device_interface).device_name

    @property
    def obs_state(self: PstComponentManager) -> ObsState:
        """Get the observation state for the current device."""
        return cast(PstDeviceInterface, self._device_interface).obs_state

    @override
    def start_communicating(self: PstComponentManager) -> None:
        """
        Establish communication with the component, then start monitoring.

        This is the place to do things like:

        * Initiate a connection to the component (if your communication
          is connection-oriented)
        * Subscribe to component events (if using a "push" model)
        * Start a polling loop to monitor the component (if using a
          "pull" model)

        This base implementation does nothing if communication is already
        ESTABLISHED. If communication is DISABLED it requests a transition to
        NOT_ESTABLISHED through :py:meth:`_handle_communication_state_change`.
        Any other state is left untouched.
        """
        if self._communication_state == CommunicationStatus.ESTABLISHED:
            return
        if self._communication_state == CommunicationStatus.DISABLED:
            self._handle_communication_state_change(CommunicationStatus.NOT_ESTABLISHED)

    @override
    def stop_communicating(self: PstComponentManager) -> None:
        """
        Cease monitoring the component, and break off all communication with it.

        For example,

        * If you are communicating over a connection, disconnect.
        * If you have subscribed to events, unsubscribe.
        * If you are running a polling loop, stop it.

        This base implementation does nothing if communication is already
        DISABLED; otherwise it requests a transition to DISABLED through
        :py:meth:`_handle_communication_state_change`.
        """
        if self._communication_state == CommunicationStatus.DISABLED:
            return
        self._handle_communication_state_change(CommunicationStatus.DISABLED)

    def _handle_communication_state_change(
        self: PstComponentManager, communication_state: CommunicationStatus
    ) -> None:
        """
        Handle a requested change of communication state.

        Subclasses must provide the implementation.

        :param communication_state: the communication state to move to.
        :type communication_state: CommunicationStatus
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    @property
    def simulation_mode(self: PstComponentManager) -> SimulationMode:
        """
        Get value of simulation mode state.

        :returns: current simulation mode state.
        """
        return self._simulation_mode

    @simulation_mode.setter
    def simulation_mode(self: PstComponentManager, simulation_mode: SimulationMode) -> None:
        """
        Set simulation mode state.

        The change hook :py:meth:`_simulation_mode_changed` is only invoked
        when the new value differs from the current one.

        :param simulation_mode: the new simulation mode value.
        :type simulation_mode: :py:class:`SimulationMode`
        """
        if self._simulation_mode != simulation_mode:
            self._simulation_mode = simulation_mode
            self._simulation_mode_changed()

    def _simulation_mode_changed(self: PstComponentManager) -> None:
        """
        Handle change of simulation mode.

        Default implementation of this is to do nothing. It is up to the
        individual devices to handle what it means when the simulation
        mode changes.
        """

    @property
    def config_id(self: PstComponentManager) -> str:
        """Return the configuration id."""
        return self._config_id

    @config_id.setter
    def config_id(self: PstComponentManager, config_id: str) -> None:
        """Set the configuration id."""
        self._config_id = config_id

    @property
    def scan_id(self: PstComponentManager) -> int:
        """Return the scan id."""
        return self._scan_id

    @scan_id.setter
    def scan_id(self: PstComponentManager, scan_id: int) -> None:
        """Set the scan id."""
        self._scan_id = scan_id

    # ---------------
    # Commands
    # ---------------

    def _submit_power_state_task(
        self: PstComponentManager, power: PowerState, task_callback: Callback = None
    ) -> TaskResponse:
        """
        Submit a task that updates the component's power state.

        Shared implementation behind :py:meth:`off`, :py:meth:`standby` and
        :py:meth:`on`, which differ only in the target power state.

        :param power: the power state to report for the component.
        :type power: PowerState
        :param task_callback: callback to be called when the status of
            the command changes
        :type task_callback: Callback
        :returns: whatever ``submit_task`` returns for the submitted task.
        """

        def _task(
            *args: Any,
            task_callback: Callback = None,
            task_abort_event: Optional[Event] = None,
            **kwargs: Any,
        ) -> None:
            # NOTE: ``task_abort_event`` is accepted to satisfy the call
            # signature but is not inspected; the update is a single step.
            callback_safely(task_callback, status=TaskStatus.IN_PROGRESS)
            self._update_component_state(power=power)
            callback_safely(
                task_callback,
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "Completed successfully"),
            )

        return self.submit_task(_task, task_callback=task_callback)

    @check_communicating
    @override
    def off(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Turn the component off.

        :param task_callback: callback to be called when the status of
            the command changes
        """
        return self._submit_power_state_task(PowerState.OFF, task_callback=task_callback)

    @check_communicating
    @override
    def standby(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Put the component into low-power standby mode.

        :param task_callback: callback to be called when the status of
            the command changes
        """
        return self._submit_power_state_task(PowerState.STANDBY, task_callback=task_callback)

    @check_communicating
    @override
    def on(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Turn the component on.

        :param task_callback: callback to be called when the status of
            the command changes
        """
        return self._submit_power_state_task(PowerState.ON, task_callback=task_callback)

    @check_communicating
    @override
    def reset(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Reset the component (from fault state).

        :param task_callback: callback to be called when the status of
            the command changes
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    def configure_beam(
        self: PstComponentManager, task_callback: Callback = None, **kwargs: Any
    ) -> TaskResponse:
        """
        Configure the beam specific configuration of the component.

        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :param kwargs: configuration for beam
        :type kwargs: dict
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    def deconfigure_beam(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Deconfigure the component's beam configuration.

        This will release all the resources associated with the component, including the SMRBs.

        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    @override
    def configure_scan(
        self: PstComponentManager,
        task_callback: Callback = None,
        **kwargs: Any,
    ) -> TaskResponse:
        """
        Configure the component for a scan.

        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :param kwargs: the configuration to be configured
        :type kwargs: dict
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    @override
    @final
    def deconfigure(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Deconfigure this component for current scan configuration.

        This simply delegates to :meth:`deconfigure_scan`.

        .. deprecated:: 0.2.2
            Use :meth:`deconfigure_scan`

        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        """
        return self.deconfigure_scan(task_callback=task_callback)

    def deconfigure_scan(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Deconfigure this component for current scan configuration.

        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    @override
    def obsreset(self: PstComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        Reset the component to unconfigured but do not release resources.

        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract.")

    def go_to_fault(
        self: PstComponentManager, fault_msg: str, task_callback: Callback = None
    ) -> TaskResponse:
        """
        Set the component into a FAULT state.

        For BEAM this will make the sub-devices be put into a FAULT state. For
        API backed component managers it is expected that the service backing
        that API should be put into a FAULT state.

        :param fault_msg: message describing the fault.
        :type fault_msg: str
        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract class")

    def set_logging_level(self: PstComponentManager, log_level: LoggingLevel) -> None:
        """
        Set LoggingLevel.

        :param log_level: The required TANGO LoggingLevel
        :returns: None.
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("PstComponentManager is abstract class")