# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for the base TANGO device used in PST.LMC."""
from __future__ import annotations
import functools
from typing import Any, Callable, Generic, List, Optional, TypeVar, cast
import tango
from ska_control_model import CommunicationStatus, HealthState, LoggingLevel, ObsState, SimulationMode
from ska_csp_lmc_base import CspSubElementObsDevice
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.lmc.validation import PstConfigValidator, PstScanValidator
from ska_tango_base.base import BaseComponentManager
from ska_tango_base.commands import SubmittedSlowCommand
from tango import DebugIt
from tango.server import attribute, command, device_property
from ska_pst.common import TelescopeFacilityEnum
from .component_manager import PstComponentManager
from .obs_state_model import PstObsStateModel
__all__ = ["PstBaseDevice"]

T = TypeVar("T", bound=PstComponentManager)
"""
Generic type variable for the component manager, bound to :py:class:`PstComponentManager`.

Parameterising the base device with this type allows the component manager held by the
base to be cast to the correct concrete type, which in turn lets tools like
`mypy <http://mypy-lang.org/>`_ check for errors.
"""
MONITOR_DATA_ATTR_MAP: dict[str, str] = {
    # DSP pipeline attributes
    "processing_time": "dsp_processing_time",
    "processing_time_percent": "dsp_processing_time_percent",
    "data_time": "dsp_data_time",
    "bytes_processed": "dsp_bytes_processed",
    "bytes_processing_rate": "dsp_bytes_processing_rate",
    "overall_efficiency": "dsp_overall_efficiency",
    "efficiency": "dsp_efficiency",
}
"""
A dictionary to map a monitoring data attribute to a value exposed in the TANGO device.

The DSP pipeline attributes have a ``dsp_`` prefix to them, but it is possible
in the future that we may need to remap other attribute names.
"""


def as_device_attribute_name(attr_name: str) -> str:
    """
    Convert attribute name to a TANGO device attribute name.

    Device attribute names should be in lower camel case (i.e. availableDiskSpace) not Python's snake case
    (i.e. available_disk_space). This is a utility method that makes the conversion easier.

    Before the case conversion, the name is looked up in :py:data:`MONITOR_DATA_ATTR_MAP`
    and replaced by the mapped name if an entry exists (e.g. ``processing_time`` becomes
    ``dsp_processing_time`` and is then returned as ``dspProcessingTime``).

    :param attr_name: the attribute name to convert from snake case.
    :type attr_name: str
    :return: the lower camel case version of input string.
    :rtype: str
    """
    # apply any explicit remapping first; unmapped names pass through unchanged.
    attr_name = MONITOR_DATA_ATTR_MAP.get(attr_name, attr_name)

    # split attribute name by underscores; the first component keeps its case.
    head, *tail = attr_name.split("_")

    # capitalise the first letter of each component except first.
    # NOTE: str.title() also upper-cases a letter that follows a non-letter
    # (e.g. "10s" -> "10S"), which only matters for components containing digits.
    return head + "".join(component.title() for component in tail)
class PstBaseDevice(CspSubElementObsDevice, Generic[T]):
    """
    Base class for all the TANGO devices in PST.LMC.

    This extends from :py:class:`CspSubElementObsDevice` but is also
    generic in the type of the component manager.

    The `CspSubElementObsDevice` class is used as a base rather
    than a `SKASubarray` due to the fact this PST is a software
    subelement within CSP.
    """

    # -----------------
    # Device properties
    # -----------------
    Facility = device_property(
        dtype=str,
        default_value="Low",
        doc=(
            "The SKA facility that this device is being used for. The "
            "default value is 'Low' and the only valid values are 'Mid' or 'Low'."
        ),
    )

    SubsystemId = device_property(
        dtype=str,
        doc=(
            "The subsystem that PST device is deployed in. This should be "
            "equal to either 'pst-low' or 'pst-mid'."
        ),
    )

    DefaultMonitoringPollingRate = device_property(
        dtype=int,
        default_value=DEFAULT_MONITORING_INTERVAL_MS,
        doc="Rate at which monitor polling should happen, in milliseconds.",
    )

    DefaultHealthCheckInterval = device_property(
        dtype=int,
        default_value=DEFAULT_HEALTH_CHECK_INTERVAL_MS,
        doc="The interval with which the CORE apps report their health state, in milliseconds.",
    )

    # ---------------
    # General methods
    # ---------------
    def init_device(self: PstBaseDevice) -> None:
        """
        Initialise the attributes and properties of the device.

        This overrides the :py:class:`SKABaseDevice`.
        """
        # switch the TANGO device server to the NO_SYNC serialisation model.
        util = tango.Util.instance()
        util.set_serial_model(tango.SerialModel.NO_SYNC)

        # NOTE(review): these are set before ``super().init_device()`` - presumably so they
        # are available while the base class initialises the device; confirm before moving.
        self._monitoring_polling_rate_ms: int = self.DefaultMonitoringPollingRate
        self._health_check_interval: int = self.DefaultHealthCheckInterval
        super().init_device()

    def init_command_objects(self: PstBaseDevice) -> None:
        """
        Set up the command objects.

        In addition to the commands registered by the base class this registers
        ``Reset`` and ``GoToFault`` as :py:class:`SubmittedSlowCommand` instances that
        call the ``reset`` and ``go_to_fault`` methods of the component manager.
        """
        super().init_command_objects()

        def _callback(hook: str, running: bool) -> None:
            # translate the command lifecycle into an obs state model action,
            # e.g. "reset_invoked" / "reset_completed".
            action = "invoked" if running else "completed"
            self.obs_state_model.perform_action(f"{hook}_{action}")

        for command_name, method_name, state_model_hook in [
            ("Reset", "reset", "reset"),
            ("GoToFault", "go_to_fault", None),
        ]:
            # commands without a state model hook do not drive the obs state model.
            callback = None if state_model_hook is None else functools.partial(_callback, state_model_hook)
            self.register_command_object(
                command_name,
                SubmittedSlowCommand(
                    command_name,
                    self._command_tracker,
                    self.component_manager,
                    method_name,
                    callback=callback,
                    logger=None,
                ),
            )

    def _init_state_model(self: PstBaseDevice) -> None:
        """
        Set up the state model for the device.

        After the base class set up, the ``obs_state_model`` is replaced with a
        :py:class:`PstObsStateModel` that reports changes via ``_update_obs_state``.
        """
        super()._init_state_model()
        self.obs_state_model = PstObsStateModel(
            logger=self.logger,
            callback=self._update_obs_state,
        )

    def _init_logging(self: PstBaseDevice) -> None:
        """
        Initialize the logging mechanism, using default properties.

        This extends the _init_logging found in SKABaseDevice to monkeypatch the TANGO log streams. This is
        needed in 9.4.2 as PyTango added a key 'source' that is not part of the Python standard logging
        framework. This fix is sort of a backport from ska-tango-base v0.19.1 but is less verbose.

        When we migrate to ska-tango-base >= 0.19.1 this can be removed.
        """
        super()._init_logging()

        def _patch_stream(log_fn: Callable) -> Callable[..., None]:
            def _log_patch(
                *args: Any,
                source: str | None = None,  # pylint: disable=unused-argument
                **kwargs: Any,
            ) -> None:
                # drop the 'source' keyword and forward everything else to the Python logger.
                log_fn(*args, **kwargs)

            return _log_patch

        self.debug_stream = _patch_stream(self.logger.debug)
        self.info_stream = _patch_stream(self.logger.info)
        self.warn_stream = _patch_stream(self.logger.warning)
        self.error_stream = _patch_stream(self.logger.error)
        self.fatal_stream = _patch_stream(self.logger.critical)

    def always_executed_hook(self: PstBaseDevice) -> None:
        """Execute call before any TANGO command is executed."""

    def delete_device(self: PstBaseDevice) -> None:
        """
        Delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the init_device method to be
        released. This method is called by the device destructor and by the device Init command.
        """

    def create_component_manager(self: PstBaseDevice) -> T:
        """
        Create and return a component manager for this device.

        :raises NotImplementedError: for no implementation
        """
        raise NotImplementedError(
            "PstBaseDevice is abstract; implement 'create_component_manager' method in a subclass."
        )

    def handle_component_state_change(self: PstBaseDevice, *args: Any, **kwargs: Any) -> None:
        """
        Handle change in this device's state.

        This overrides the :py:class:`PstDeviceInterface` and calls `_component_state_changed` on
        this class.

        :param args: positional arguments forwarded to `_component_state_changed`.
        :param kwargs: keyword arguments forwarded to `_component_state_changed`.
        """
        self._component_state_changed(*args, **kwargs)

    def handle_communication_state_change(
        self: PstBaseDevice, communication_state: CommunicationStatus
    ) -> None:
        """
        Handle a change in device's communication state.

        This just calls the `SKABaseDevice._communication_state_changed` method

        :param communication_state: the new communication state.
        :type communication_state: CommunicationStatus
        """
        self._communication_state_changed(communication_state=communication_state)

    def handle_attribute_value_update(self: PstBaseDevice, attribute_name: str, value: Any) -> None:
        """
        Handle update of a device attribute value.

        The attribute name is converted with :py:func:`as_device_attribute_name` and both a
        change event and an archive event are pushed. Any error is logged as a warning
        rather than raised.

        :param attribute_name: the name of the attribute to update.
        :type attribute_name: str
        :param value: the new value of the attribute to update to.
        :type value: Any
        """
        try:
            attr_key = as_device_attribute_name(attribute_name)
            self.push_change_event(attr_key, value)
            self.push_archive_event(attr_key, value)
        except Exception:
            self.logger.warning(
                f"Error in attempting to set device attribute {attribute_name}.", exc_info=True
            )

    def _component_state_changed(  # type: ignore[override]
        self: PstBaseDevice,
        obsfault: Optional[bool] = None,
        reset: Optional[bool] = None,
        **kwargs: Any,
    ) -> None:
        """
        Handle change in this device's state.

        This overrides the `ska_tango_base.SKASubarray` method to allow
        for handling of when the device goes into a fault state.

        :param obsfault: whether there is a fault. If set to true this
            will put the system into a FAULT state.
        :param reset: accepted but not used by this method. As a named parameter it is
            not part of ``kwargs`` and so is not forwarded to the base class.
        :param kwargs: remaining state changes, forwarded to the base class.
        """
        super()._component_state_changed(**kwargs)
        if obsfault:
            self.obs_state_model.perform_action("component_obsfault")

    @property  # type: ignore[override]
    def component_manager(self: PstBaseDevice) -> T:  # type: ignore[override]
        """
        Get component manager.

        Overrides the super class property of component_manager to be typesafe.

        :returns: the component manager casted to type T.
        """
        return cast(T, self._component_manager)

    @component_manager.setter
    def component_manager(self: PstBaseDevice, component_manager: BaseComponentManager) -> None:
        self._component_manager = component_manager

    @property
    def beam_id(self: PstBaseDevice) -> int:
        """Get the ID of the beam this device belongs to."""
        return self.DeviceID

    @property
    def device_name(self: PstBaseDevice) -> str:
        """
        Get the name of the device.

        This is the relative device name (e.g. low_psi/beam/01) and not the FQDN which can include the Tango
        DB in a URL.
        """
        return self.get_name()

    @property
    def facility(self: PstBaseDevice) -> TelescopeFacilityEnum:
        """
        Get the facility that this device is being used for.

        :raises KeyError: if the ``Facility`` property is not the name of a
            :py:class:`TelescopeFacilityEnum` member.
        """
        return TelescopeFacilityEnum[self.Facility]

    @property
    def subsystem_id(self: PstBaseDevice) -> str:
        """Get the sub-system id where device is deployed."""
        return self.SubsystemId

    @property
    def obs_state(self: PstBaseDevice) -> ObsState:
        """
        Get the current observing state of the device.

        :return: the current observing state of the device.
        :rtype: ObsState
        """
        return self._obs_state

    def handle_fault(self: PstBaseDevice, fault_msg: str) -> None:
        """
        Handle putting the device into a fault state.

        The message is logged and recorded, the obs state model is notified of the fault
        and the health state is updated to ``FAILED``.

        :param fault_msg: the message describing the fault.
        :type fault_msg: str
        """
        self.logger.warning(f"{self.device_name} received a fault with error message: '{fault_msg}'")
        self._health_failure_msg = fault_msg
        self._component_state_changed(obsfault=True)
        self.update_health_state(health_state=HealthState.FAILED)

    def update_health_state(self: PstBaseDevice, health_state: HealthState) -> None:
        """
        Update the health state of the device.

        This delegates to the base class `_update_health_state`, but only
        when the value differs from the current health state.

        :param health_state: the health state of the beam to update to.
        :type health_state: HealthState
        """
        if self._health_state != health_state:
            self._update_health_state(health_state)

    def set_logging_level(self: PstBaseDevice, value: LoggingLevel) -> None:
        """
        Set the logging level for the device.

        Both the Python logger and the TANGO logger are updated.

        This calls :py:meth:`SKABaseDevice.set_logging_level` then delegates
        to the component manager to perform necessary updates. This
        is used by having the BEAM.MGMT to update the subordinate Tango
        devices, which in turn update the core apps.

        Propagating the level to the component manager is best effort: a failure
        there is logged as a warning and does not raise.

        :param value: Logging level for logger
        :raises LoggingLevelError: for invalid value
        """
        super().set_logging_level(value)

        # the component manager may not exist yet when this is called.
        if getattr(self, "_component_manager", None) is None:
            return

        try:
            cast(PstComponentManager, self.component_manager).set_logging_level(value)
        except Exception:
            # keep this best effort, but do not hide the failure.
            self.logger.warning(
                f"Error in attempting to set logging level {value} on component manager.", exc_info=True
            )

    # -----------
    # Commands
    # -----------
    class ScanCommand(CspSubElementObsDevice.ScanCommand):
        """A class for the PST's Scan command."""

        def __init__(self: PstBaseDevice.ScanCommand, *args: Any, **kwargs: Any) -> None:
            """
            Initialise a new ScanCommand instance.

            The validator set up by the base class is replaced with a
            :py:class:`PstScanValidator`.
            """
            super().__init__(*args, **kwargs)
            self._validator = PstScanValidator()

    # ----------
    # Attributes
    # ----------
    @attribute(
        dtype=SimulationMode,
        memorized=True,
        hw_memorized=True,
    )
    def simulationMode(self: PstBaseDevice) -> SimulationMode:
        """
        Report the simulation mode of the device.

        :return: the current simulation mode
        """
        return self.component_manager.simulation_mode

    def _simulation_mode_allowed_obs_states(self: PstBaseDevice) -> List[ObsState]:
        """
        Get the observing states in which the simulation mode may be changed.

        :return: the list of allowed observing states.
        """
        return [ObsState.EMPTY, ObsState.IDLE]

    @simulationMode.write  # type: ignore[no-redef]
    def simulationMode(self: PstBaseDevice, value: SimulationMode) -> None:
        """
        Set the simulation mode.

        :param value: The simulation mode, as a SimulationMode value
        :raises ValueError: if the device is not in one of the observing states returned
            by `_simulation_mode_allowed_obs_states`.
        """
        allowed_states = self._simulation_mode_allowed_obs_states()
        if self._obs_state not in allowed_states:
            # report the states that are actually allowed rather than a hard-coded one.
            allowed_names = " or ".join(state.name for state in allowed_states)
            self.logger.warning(
                f"Attempt to set simulation mode when not in {allowed_names} state. "
                f"Current state is {self._obs_state}"
            )
            raise ValueError(f"Unable to change simulation mode unless in {allowed_names} observation state")

        self._simulation_mode = value
        self.push_change_event("simulationMode", value)
        self.push_archive_event("simulationMode", value)
        self.component_manager.simulation_mode = value

    @command(
        dtype_in="DevString",
        doc_in="The reason for why the device needs to go to a FAULT state.",
        dtype_out="DevVarLongStringArray",
        doc_out="([Command ResultCode], [Unique ID of the command])",
    )
    @DebugIt()
    def GoToFault(self: PstBaseDevice, argin: str) -> Any:
        """
        Put the device and sub-devices and services into a FAULT state.

        This is implemented as a long running command as a service may take some time to respond.

        :param argin: the reason for why the device needs to go to a FAULT state.
        :type argin: str
        :return: A tuple containing a result code and the unique ID of the command
        :rtype: ([ResultCode], [str])
        """
        handler = self.get_command_object("GoToFault")
        result_code, message = handler(argin)
        return [[result_code], [message]]