Source code for ska_pst.lmc.component.pst_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for the base TANGO device used in PST.LMC."""

from __future__ import annotations

import functools
from typing import Any, Callable, Generic, List, Optional, TypeVar, cast

import tango
from ska_control_model import CommunicationStatus, HealthState, LoggingLevel, ObsState, SimulationMode
from ska_csp_lmc_base import CspSubElementObsDevice
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.lmc.validation import PstConfigValidator, PstScanValidator
from ska_tango_base.base import BaseComponentManager
from ska_tango_base.commands import SubmittedSlowCommand
from tango import DebugIt
from tango.server import attribute, command, device_property

from ska_pst.common import TelescopeFacilityEnum

from .component_manager import PstComponentManager
from .obs_state_model import PstObsStateModel

__all__ = [
    "PstBaseDevice",
]

T = TypeVar("T", bound=PstComponentManager)
"""
Create a generic type for the component manager.

Doing this allows us to cast the component manager used in the base to have the correct type and allow for
tools like
`mypy <http://mypy-lang.org/>`_
to check if there are
errors.
"""

MONITOR_DATA_ATTR_MAP: dict = {
    # DSP pipeline attributes
    "processing_time": "dsp_processing_time",
    "processing_time_percent": "dsp_processing_time_percent",
    "data_time": "dsp_data_time",
    "bytes_processed": "dsp_bytes_processed",
    "bytes_processing_rate": "dsp_bytes_processing_rate",
    "overall_efficiency": "dsp_overall_efficiency",
    "efficiency": "dsp_efficiency",
}
"""
A dictionary to map a monitoring data attribute to a value exposed in the TANGO device.

The DSP pipeline attributes have a ``dsp_`` prefix to them, but it is possible
in the future that we may need to remap other attribute names.
"""


[docs]def as_device_attribute_name(attr_name: str) -> str: """ Convert attribute name to a TANGO device attribute name. Device attribute names should be in lower camel case (i.e. availableDiskSpace) not Python's snake case (i.e. available_disk_space). This is a utility method that makes the conversion easier. :param attr_name: the attribute name to convert from snake case. :type attr_name: str :return: the lower camel case version of input string. :rtype: str """ attr_name = MONITOR_DATA_ATTR_MAP.get(attr_name, attr_name) # split attribute name by underscores head, *tail = attr_name.split("_") # capitalise the first letter of each component except first. return head + "".join(x.title() for x in tail)
[docs]class PstBaseDevice(CspSubElementObsDevice, Generic[T]): """ Base class for all the TANGO devices in PST.LMC. This extends from :py:class:`CspSubElementObsDevice` but is also generic in the type of the component manager. The `CspSubElementObsDevice` class is used as a base rather than a `SKASubarray` due to the fact this PST is a software subelement within CSP. """ # --------------- # Device properties # --------------- Facility = device_property( dtype=str, default_value="Low", doc=( "The SKA facility that this device is being used for. The " "default value is 'Low' and the only valid values are 'Mid' or 'Low'." ), ) SubsystemId = device_property( dtype=str, doc=( "The subsystem that PST device is deployed in. This should be " "equal to either 'pst-low' or 'pst-mid'." ), ) DefaultMonitoringPollingRate = device_property( dtype=int, default_value=DEFAULT_MONITORING_INTERVAL_MS, doc="Rate at which monitor polling should happen, in milliseconds.", ) DefaultHealthCheckInterval = device_property( dtype=int, default_value=DEFAULT_HEALTH_CHECK_INTERVAL_MS, doc="The interval with which the CORE apps report their health state, in milliseconds.", ) # --------------- # General methods # ---------------
[docs] def init_device(self: PstBaseDevice) -> None: """ Initialise the attributes and properties of the device. This overrides the :py:class:`SKABaseDevice`. """ util = tango.Util.instance() util.set_serial_model(tango.SerialModel.NO_SYNC) self._monitoring_polling_rate_ms: int = self.DefaultMonitoringPollingRate self._health_check_interval: int = self.DefaultHealthCheckInterval super().init_device()
[docs] def init_command_objects(self: PstBaseDevice) -> None: """Set up the command objects.""" super().init_command_objects() def _callback(hook: str, running: bool) -> None: action = "invoked" if running else "completed" self.obs_state_model.perform_action(f"{hook}_{action}") for command_name, method_name, state_model_hook in [ ("Reset", "reset", "reset"), ("GoToFault", "go_to_fault", None), ]: callback = None if state_model_hook is None else functools.partial(_callback, state_model_hook) self.register_command_object( command_name, SubmittedSlowCommand( command_name, self._command_tracker, self.component_manager, method_name, callback=callback, logger=None, ), )
def _init_state_model(self: PstBaseDevice) -> None: """Set up the state model for the device.""" super()._init_state_model() self.obs_state_model = PstObsStateModel( logger=self.logger, callback=self._update_obs_state, ) def _init_logging(self: PstBaseDevice) -> None: """ Initialize the logging mechanism, using default properties. This extends the _init_logging found in SKABaseDevice to monkeypatch of TANGO log streams. This is needed in 9.4.2 as PyTango added a key 'source' that is not part of the Python standard logging framework. This is fix here is sort of a backport from ska-tango-base v0.19.1 but is less verbose. When we migrate to ska-tango-base >= 0.19.1 this can be removed. """ super()._init_logging() def _patch_stream(log_fn: Callable) -> Callable[..., None]: def _log_patch( *args: Any, source: str | None = None, # pylint: disable=unused-argument **kwargs: Any, ) -> None: log_fn(*args, **kwargs) return _log_patch self.debug_stream = _patch_stream(self.logger.debug) self.info_stream = _patch_stream(self.logger.info) self.info_stream = _patch_stream(self.logger.info) self.warn_stream = _patch_stream(self.logger.warning) self.error_stream = _patch_stream(self.logger.error) self.fatal_stream = _patch_stream(self.logger.critical)
[docs] def always_executed_hook(self: PstBaseDevice) -> None: """Execute call before any TANGO command is executed."""
[docs] def delete_device(self: PstBaseDevice) -> None: """ Delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command. """
[docs] def create_component_manager(self: PstBaseDevice) -> T: """ Create and return a component manager for this device. :raises NotImplementedError: for no implementation """ raise NotImplementedError( "PstBaseDevice is abstract; implement 'create_component_manager` method in " "a subclass.`" )
[docs] def handle_component_state_change(self: PstBaseDevice, *args: Any, **kwargs: Any) -> None: """ Handle change in this device's state. This overrides the :py:class:`PstDeviceInterface` and calls `_component_state_changed` on this class. """ self._component_state_changed(*args, **kwargs)
[docs] def handle_communication_state_change( self: PstBaseDevice, communication_state: CommunicationStatus ) -> None: """ Handle a change in device's communication state. This just calls the `SKABaseDevice._communication_state_changed` method """ self._communication_state_changed(communication_state=communication_state)
[docs] def handle_attribute_value_update(self: PstBaseDevice, attribute_name: str, value: Any) -> None: """ Handle update of a device attribute value. :param attribute_name: the name of the attribute to update. :type attribute_name: str :param value: the new value of the attribute to update to. :type value: Any """ try: attr_key = as_device_attribute_name(attribute_name) self.push_change_event(attr_key, value) self.push_archive_event(attr_key, value) except Exception: self.logger.warning( f"Error in attempting to set device attribute {attribute_name}.", exc_info=True )
def _component_state_changed( # type: ignore[override] self: PstBaseDevice, obsfault: Optional[bool] = None, reset: Optional[bool] = None, **kwargs: Any, ) -> None: """ Handle change in this device's state. This overrides the `ska_tango_base.SKASubarray` method to allow for handling of when the device goes into a fault state. :param obsfault: whether there is a fault. If set to true this will put the system into a FAULT state. """ super()._component_state_changed(**kwargs) if obsfault: self.obs_state_model.perform_action("component_obsfault") @property # type: ignore[override] def component_manager(self: PstBaseDevice) -> T: # type: ignore[override] """ Get component manager. Overrides the super class property of component_manager to be typesafe. :returns: the component manager casted to type T. """ return cast(T, self._component_manager) @component_manager.setter def component_manager(self: PstBaseDevice, component_manager: BaseComponentManager) -> None: self._component_manager = component_manager @property def beam_id(self: PstBaseDevice) -> int: """Get the ID of the beam this device belongs to.""" return self.DeviceID @property def device_name(self: PstBaseDevice) -> str: """ Get the name of the device. This is the relative device name (e.g. low_psi/beam/01) and not the FQDN which can include the Tango DB in a URL. """ return self.get_name() @property def facility(self: PstBaseDevice) -> TelescopeFacilityEnum: """Get the facility that this device is being used for.""" return TelescopeFacilityEnum[self.Facility] @property def subsystem_id(self: PstBaseDevice) -> str: """Get the sub-system id where device is deployed.""" return self.SubsystemId @property def obs_state(self: PstBaseDevice) -> ObsState: """ Get the current observing state of the device. :return: the current observing state of the device. :rtype: ObsState """ return self._obs_state
[docs] def handle_fault(self: PstBaseDevice, fault_msg: str) -> None: """Handle putting the device into a fault state.""" self.logger.warning(f"{self.device_name} received a fault with error message: '{fault_msg}'") self._health_failure_msg = fault_msg self._component_state_changed(obsfault=True) self.update_health_state(health_state=HealthState.FAILED)
[docs] def update_health_state(self: PstBaseDevice, health_state: HealthState) -> None: """ Update the health state of the device. This delegates to the base class `_update_health_state` :param health_state: the health state of the beam to update to. :type health_state: HealthState """ if self._health_state != health_state: self._update_health_state(health_state)
[docs] def set_logging_level(self: PstBaseDevice, value: LoggingLevel) -> None: """ Set the logging level for the device. Both the Python logger and the TANGO logger are updated. This calls :py:meth:`SKABaseDevice.set_logging_level` then delegates to the component manager to perform necessary updates. This is used by having the BEAM.MGMT to update the subordinate Tango devices, which in turn update the core apps. :param value: Logging level for logger :raises LoggingLevelError: for invalid value """ super().set_logging_level(value) try: if hasattr(self, "_component_manager") and self._component_manager is not None: cast(PstComponentManager, self.component_manager).set_logging_level(value) except Exception: pass
# ----------- # Commands # -----------
[docs] class ConfigureScanCommand(CspSubElementObsDevice.ConfigureScanCommand): """ A class for the ObsDevice ConfigureScan command. This overrides the constructor to ensure that the correct validator is used. The base CSP ObsDevice ConfigureScanCommand doesn't allow passing through a validator. """ def __init__(self: PstBaseDevice.ConfigureScanCommand, *args: Any, **kwargs: Any) -> None: """Initialise a new ConfigureScanCommand instance.""" super().__init__(*args, **kwargs) self._validator = PstConfigValidator()
[docs] class ScanCommand(CspSubElementObsDevice.ScanCommand): """A class for the PST's ConfigureScan command.""" def __init__(self: PstBaseDevice.ScanCommand, *args: Any, **kwargs: Any) -> None: """Initialise a new ScanCommand instance.""" super().__init__(*args, **kwargs) self._validator = PstScanValidator()
# ---------- # Attributes # ---------- @attribute( dtype=SimulationMode, memorized=True, hw_memorized=True, ) def simulationMode(self: PstBaseDevice) -> SimulationMode: """ Report the simulation mode of the device. :return: the current simulation mode """ return self.component_manager.simulation_mode def _simulation_mode_allowed_obs_states(self: PstBaseDevice) -> List[ObsState]: return [ObsState.EMPTY, ObsState.IDLE] @simulationMode.write # type: ignore[no-redef] def simulationMode(self: PstBaseDevice, value: SimulationMode) -> None: """ Set the simulation mode. :param value: The simulation mode, as a SimulationMode value """ if self._obs_state in self._simulation_mode_allowed_obs_states(): self._simulation_mode = value self.push_change_event("simulationMode", value) self.push_archive_event("simulationMode", value) self.component_manager.simulation_mode = value else: self.logger.warning( f"Attempt to set simulation mode when not in EMPTY state. Current state is {self._obs_state}" ) raise ValueError("Unable to change simulation mode unless in EMPTY observation state") @command( dtype_in="DevString", doc_in="The reason for why the device needs to go to a FAULT state.", dtype_out="DevVarLongStringArray", doc_out="([Command ResultCode], [Unique ID of the command])", ) @DebugIt() def GoToFault(self: PstBaseDevice, argin: str) -> Any: """ Put the device and sub-devices and services into a FAULT state. This is implemented as a long running command as a service may take some time to respond. :return: A tuple containing a result code and the unique ID of the command :rtype: ([ResultCode], [str]) """ handler = self.get_command_object("GoToFault") result_code, message = handler(argin) return [[result_code], [message]]