# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS transient buffer device."""
from __future__ import annotations
import sys
from typing import Any, Optional
import tango
from ska_control_model import CommunicationStatus, HealthState, ResultCode
from ska_tango_base.base import SKABaseDevice
from tango.server import attribute
from ska_low_mccs.transient_buffer.transient_buffer_component_manager import (
TransientBufferComponentManager,
)
from ska_low_mccs.transient_buffer.transient_buffer_health_model import (
TransientBufferHealthModel,
)
__all__ = ["MccsTransientBuffer", "main"]
[docs]class MccsTransientBuffer(SKABaseDevice):
"""An implementation of a transient buffer Tango device for MCCS."""
# ---------------
# Initialisation
# ---------------
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initialise this device object.
:param args: positional args to the init
:param kwargs: keyword args to the init
"""
# We aren't supposed to define initialisation methods for Tango
# devices; we are only supposed to define an `init_device` method. But
# we insist on doing so here, just so that we can define some
# attributes, thereby stopping the linters from complaining about
# "attribute-defined-outside-init" etc. We still need to make sure that
# `init_device` re-initialises any values defined in here.
super().__init__(*args, **kwargs)
self.component_manager: TransientBufferComponentManager
self._health_state: HealthState = HealthState.UNKNOWN
self._health_model: TransientBufferHealthModel
[docs] def init_device(self: MccsTransientBuffer) -> None:
"""
Initialise the device.
This is overridden here to change the Tango serialisation model.
"""
util = tango.Util.instance()
util.set_serial_model(tango.SerialModel.NO_SYNC)
super().init_device()
self._build_state = sys.modules["ska_low_mccs"].__version_info__
self._version_id = sys.modules["ska_low_mccs"].__version__
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
version = f"{device_name} Software Version: {self._version_id}"
properties = f"Initialised {device_name}. Device has no properties."
self.logger.info(
"\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
)
def _init_state_model(self: MccsTransientBuffer) -> None:
super()._init_state_model()
self._health_state = HealthState.UNKNOWN # InitCommand.do() does this too late.
self._health_model = TransientBufferHealthModel(self._component_state_callback)
self.set_change_event("healthState", True, False)
[docs] def create_component_manager(
self: MccsTransientBuffer,
) -> TransientBufferComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
return TransientBufferComponentManager(
self.logger,
self._communication_state_callback,
self._component_state_callback,
)
[docs] class InitCommand(SKABaseDevice.InitCommand):
"""
A class for :py:class:`~.MccsTransientBuffer`'s Init command.
The :py:meth:`~.MccsTransientBuffer.InitCommand.do` method below is
called upon :py:class:`~.MccsTransientBuffer`'s initialisation.
"""
[docs] def do( # type: ignore[override]
self: MccsTransientBuffer.InitCommand,
*args: Any,
**kwargs: Any,
) -> tuple[ResultCode, str]:
"""
Initialise the attributes and properties of the MccsTransientBuffer.
:param args: positional args to the component manager method
:param kwargs: keyword args to the component manager method
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
return (ResultCode.OK, "Init command completed OK")
# ----------
# Callbacks
# ----------
def _communication_state_callback(
self: MccsTransientBuffer,
communication_state: CommunicationStatus,
) -> None:
"""
Handle change in communications status between component manager and component.
This is a callback hook, called by the component manager when
the communications status changes. It is implemented here to
drive the op_state.
:param communication_state: the status of communications between
the component manager and its component.
"""
action_map = {
CommunicationStatus.DISABLED: "component_disconnected",
CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
CommunicationStatus.ESTABLISHED: None, # wait for a power mode update
}
action = action_map[communication_state]
if action is not None:
self.op_state_model.perform_action(action)
self._health_model.is_communicating(
communication_state == CommunicationStatus.ESTABLISHED
)
def _component_state_callback(
self: MccsTransientBuffer,
health: Optional[HealthState] = None,
) -> None:
"""
Handle change in the state of the component.
This is a callback hook, called by the component manager when
the state of the component changes.
:param health: An optional parameter with the new health state of the device.
"""
if health is not None:
if self._health_state != health:
self._health_state = health
self.push_change_event("healthState", health)
# ----------
# Attributes
# ----------
[docs] @attribute(dtype="DevString", label="stationId")
def stationId(self: MccsTransientBuffer) -> int:
"""
Return the station id.
:return: the station id
"""
return self.component_manager.station_id
[docs] @attribute(dtype="DevString", label="transientBufferJobId")
def transientBufferJobId(self: MccsTransientBuffer) -> int:
"""
Return the transient buffer job id.
:return: the transient buffer job id
"""
return self.component_manager.transient_buffer_job_id
[docs] @attribute(dtype="DevLong", label="resamplingBits")
def resamplingBits(self: MccsTransientBuffer) -> int:
"""
Return the resampling bit depth.
:return: the resampling bit depth
"""
return self.component_manager.resampling_bits
[docs] @attribute(dtype="DevShort", label="nStations")
def nStations(self: MccsTransientBuffer) -> int:
"""
Return the number of stations.
:return: the number of stations
"""
return self.component_manager.n_stations
[docs] @attribute(
dtype=("DevDouble",),
max_dim_x=100,
label="transientFrequencyWindow",
)
def transientFrequencyWindow(self: MccsTransientBuffer) -> list[float]:
"""
Return the transient frequency window.
:return: the transient frequency window
"""
return self.component_manager.transient_frequency_window
[docs] @attribute(dtype=("DevString",), max_dim_x=100, label="stationIds")
def stationIds(self: MccsTransientBuffer) -> list[str]:
"""
Return the station ids.
:return: the station ids
"""
return self.component_manager.station_ids
# ----------
# Run server
# ----------
[docs]def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return MccsTransientBuffer.run_server(args=args or None, **kwargs)
if __name__ == "__main__":
main()