Source code for ska_low_mccs.transient_buffer.transient_buffer_device

#  -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS transient buffer device."""
from __future__ import annotations

import sys
from typing import Any, Optional

import tango
from ska_control_model import CommunicationStatus, HealthState, ResultCode
from ska_tango_base.base import SKABaseDevice
from tango.server import attribute

from ska_low_mccs.transient_buffer.transient_buffer_component_manager import (
    TransientBufferComponentManager,
)
from ska_low_mccs.transient_buffer.transient_buffer_health_model import (
    TransientBufferHealthModel,
)

__all__ = ["MccsTransientBuffer", "main"]


[docs]class MccsTransientBuffer(SKABaseDevice): """An implementation of a transient buffer Tango device for MCCS.""" # --------------- # Initialisation # ---------------
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None: """ Initialise this device object. :param args: positional args to the init :param kwargs: keyword args to the init """ # We aren't supposed to define initialisation methods for Tango # devices; we are only supposed to define an `init_device` method. But # we insist on doing so here, just so that we can define some # attributes, thereby stopping the linters from complaining about # "attribute-defined-outside-init" etc. We still need to make sure that # `init_device` re-initialises any values defined in here. super().__init__(*args, **kwargs) self.component_manager: TransientBufferComponentManager self._health_state: HealthState = HealthState.UNKNOWN self._health_model: TransientBufferHealthModel
[docs] def init_device(self: MccsTransientBuffer) -> None: """ Initialise the device. This is overridden here to change the Tango serialisation model. """ util = tango.Util.instance() util.set_serial_model(tango.SerialModel.NO_SYNC) super().init_device() self._build_state = sys.modules["ska_low_mccs"].__version_info__ self._version_id = sys.modules["ska_low_mccs"].__version__ device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}' version = f"{device_name} Software Version: {self._version_id}" properties = f"Initialised {device_name}. Device has no properties." self.logger.info( "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties )
def _init_state_model(self: MccsTransientBuffer) -> None: super()._init_state_model() self._health_state = HealthState.UNKNOWN # InitCommand.do() does this too late. self._health_model = TransientBufferHealthModel(self._component_state_callback) self.set_change_event("healthState", True, False)
[docs] def create_component_manager( self: MccsTransientBuffer, ) -> TransientBufferComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ return TransientBufferComponentManager( self.logger, self._communication_state_callback, self._component_state_callback, )
[docs] class InitCommand(SKABaseDevice.InitCommand): """ A class for :py:class:`~.MccsTransientBuffer`'s Init command. The :py:meth:`~.MccsTransientBuffer.InitCommand.do` method below is called upon :py:class:`~.MccsTransientBuffer`'s initialisation. """
[docs] def do( # type: ignore[override] self: MccsTransientBuffer.InitCommand, *args: Any, **kwargs: Any, ) -> tuple[ResultCode, str]: """ Initialise the attributes and properties of the MccsTransientBuffer. :param args: positional args to the component manager method :param kwargs: keyword args to the component manager method :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ return (ResultCode.OK, "Init command completed OK")
# ---------- # Callbacks # ---------- def _communication_state_callback( self: MccsTransientBuffer, communication_state: CommunicationStatus, ) -> None: """ Handle change in communications status between component manager and component. This is a callback hook, called by the component manager when the communications status changes. It is implemented here to drive the op_state. :param communication_state: the status of communications between the component manager and its component. """ action_map = { CommunicationStatus.DISABLED: "component_disconnected", CommunicationStatus.NOT_ESTABLISHED: "component_unknown", CommunicationStatus.ESTABLISHED: None, # wait for a power mode update } action = action_map[communication_state] if action is not None: self.op_state_model.perform_action(action) self._health_model.is_communicating( communication_state == CommunicationStatus.ESTABLISHED ) def _component_state_callback( self: MccsTransientBuffer, health: Optional[HealthState] = None, ) -> None: """ Handle change in the state of the component. This is a callback hook, called by the component manager when the state of the component changes. :param health: An optional parameter with the new health state of the device. """ if health is not None: if self._health_state != health: self._health_state = health self.push_change_event("healthState", health) # ---------- # Attributes # ----------
[docs] @attribute(dtype="DevString", label="stationId") def stationId(self: MccsTransientBuffer) -> int: """ Return the station id. :return: the station id """ return self.component_manager.station_id
[docs] @attribute(dtype="DevString", label="transientBufferJobId") def transientBufferJobId(self: MccsTransientBuffer) -> int: """ Return the transient buffer job id. :return: the transient buffer job id """ return self.component_manager.transient_buffer_job_id
[docs] @attribute(dtype="DevLong", label="resamplingBits") def resamplingBits(self: MccsTransientBuffer) -> int: """ Return the resampling bit depth. :return: the resampling bit depth """ return self.component_manager.resampling_bits
[docs] @attribute(dtype="DevShort", label="nStations") def nStations(self: MccsTransientBuffer) -> int: """ Return the number of stations. :return: the number of stations """ return self.component_manager.n_stations
[docs] @attribute( dtype=("DevDouble",), max_dim_x=100, label="transientFrequencyWindow", ) def transientFrequencyWindow(self: MccsTransientBuffer) -> list[float]: """ Return the transient frequency window. :return: the transient frequency window """ return self.component_manager.transient_frequency_window
[docs] @attribute(dtype=("DevString",), max_dim_x=100, label="stationIds") def stationIds(self: MccsTransientBuffer) -> list[str]: """ Return the station ids. :return: the station ids """ return self.component_manager.station_ids
# ---------- # Run server # ----------
[docs]def main(*args: str, **kwargs: str) -> int: # pragma: no cover """ Entry point for module. :param args: positional arguments :param kwargs: named arguments :return: exit code """ return MccsTransientBuffer.run_server(args=args or None, **kwargs)
if __name__ == "__main__": main()