# Source code for ska_low_mccs.controller.controller_device

#  -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module contains the SKA Low MCCS Controller device prototype."""

from __future__ import annotations  # allow forward references in type hints

import importlib  # allow forward references in type hints
import importlib.resources  # submodule must be imported explicitly before use
import json
import logging
import sys
import threading
from typing import Any, Callable, Final, Optional, cast

import tango
from ska_control_model import (
    CommunicationStatus,
    HealthState,
    ObsState,
    PowerState,
    ResultCode,
)
from ska_tango_base.base import BaseComponentManager, CommandTracker, SKABaseDevice
from ska_tango_base.commands import (
    DeviceInitCommand,
    FastCommand,
    JsonValidator,
    SubmittedSlowCommand,
)
from tango.server import attribute, command, device_property

from ska_low_mccs.controller.controller_component_manager import (
    ControllerComponentManager,
)
from ska_low_mccs.controller.controller_health_model import ControllerHealthModel

# Names exported by ``from ... import *``.
__all__ = ["MccsController", "main"]

# Python-side shape of the value returned by the commands in this module that
# are declared with ``dtype_out="DevVarLongStringArray"``: a list of result
# codes paired with a list of (optional) messages.
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]


# pylint: disable=too-many-instance-attributes, too-many-public-methods
class MccsController(SKABaseDevice):
    """An implementation of a controller Tango device for MCCS."""

    # -----------------
    # Device Properties
    # -----------------
    MccsSubarrays = device_property(dtype="DevVarStringArray", default_value=[])
    MccsStations = device_property(dtype="DevVarStringArray", default_value=[])
    MccsSubarrayBeams = device_property(dtype="DevVarStringArray", default_value=[])
    MccsStationBeams = device_property(dtype="DevVarStringArray", default_value=[])
    ObsCommandTimeout = device_property(
        dtype="int",
        default_value=60,
        doc="The timeout in seconds for Observation commands.",
    )

    # Op-state-model action to perform for each reported power state. Shared
    # by the communication-state and component-state callbacks so that both
    # always agree on the mapping.
    _POWER_STATE_ACTIONS: Final = {
        PowerState.OFF: "component_off",
        PowerState.STANDBY: "component_standby",
        PowerState.ON: "component_on",
        PowerState.UNKNOWN: "component_unknown",
    }

    # ---------------
    # Initialisation
    # ---------------
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise this device object.

        :param args: positional args to the init
        :param kwargs: keyword args to the init
        """
        # We aren't supposed to define initialisation methods for Tango
        # devices; we are only supposed to define an `init_device` method. But
        # we insist on doing so here, just so that we can define some
        # attributes, thereby stopping the linters from complaining about
        # "attribute-defined-outside-init" etc. We still need to make sure that
        # `init_device` re-initialises any values defined in here.
        super().__init__(*args, **kwargs)

        self._communication_state: Optional[CommunicationStatus]
        self._num_subservients: int
        self._missed_events: int
        self._health_state: HealthState = HealthState.UNKNOWN
        self._health_model: ControllerHealthModel
        self.component_manager: ControllerComponentManager  # type: ignore[assignment]

    def init_device(self: MccsController) -> None:
        """
        Initialise the device.

        This is overridden here to change the Tango serialisation model.
        """
        util = tango.Util.instance()
        util.set_serial_model(tango.SerialModel.NO_SYNC)

        # These must be set *before* calling the base class: the base
        # ``init_device`` is what ends up invoking ``create_component_manager``,
        # which overwrites ``_num_subservients`` with the real count.
        self._power_state_lock = threading.RLock()
        self._communication_state = None
        self._component_power_state: Optional[PowerState] = None
        self._num_subservients = 0
        self._missed_events = 0
        super().init_device()

        package = sys.modules["ska_low_mccs"]
        self._build_state = package.__version_info__
        self._version_id = package.__version__

        # Use the class name directly rather than slicing it out of the
        # ``"<class 'pkg.mod.Name'>"`` repr string.
        device_name = type(self).__name__
        version = f"{device_name} Software Version: {self._version_id}"
        properties = (
            f"Initialised {device_name} device with properties:\n"
            f"\tMccsSubarrays: {self.MccsSubarrays}\n"
            f"\tMccsStations: {self.MccsStations}\n"
            f"\tMccsSubarrayBeams: {self.MccsSubarrayBeams}\n"
            f"\tMccsStationBeams: {self.MccsStationBeams}\n"
        )
        self.logger.info(
            "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
        )

    @staticmethod
    def _non_empty_trls(trls: Any) -> list[str]:
        """
        Return the given TRLs with empty-string entries removed.

        :param trls: an iterable of TRL strings, as read from a device
            property.

        :return: a new list holding every entry that is not ``""``.
        """
        return [trl for trl in trls if trl != ""]

    def _init_state_model(self: MccsController) -> None:
        """Initialise the state model."""
        super()._init_state_model()
        self._health_state = HealthState.UNKNOWN  # InitCommand.do() does this too late.
        self._health_model = ControllerHealthModel(
            self._health_changed,
            self._non_empty_trls(self.MccsStations),
            self._non_empty_trls(self.MccsSubarrayBeams),
            self._non_empty_trls(self.MccsStationBeams),
            {
                "stations_failed_threshold": 0.05,
                "stations_degraded_threshold": 0.2,
                "subarray_beams_failed_threshold": 0.05,
                "subarray_beams_degraded_threshold": 0.2,
                "station_beams_failed_threshold": 0.05,
                "station_beams_degraded_threshold": 0.2,
            },
        )
        self.set_change_event("healthState", True, False)

    def create_component_manager(
        self: MccsController,
    ) -> ControllerComponentManager:
        """
        Create and return a component manager for this device.

        As a side effect, this records the total number of subservient
        devices in ``_num_subservients``.

        :return: a component manager for this device.
        """
        subarrays = self._non_empty_trls(self.MccsSubarrays)
        stations = self._non_empty_trls(self.MccsStations)
        subarraybeams = self._non_empty_trls(self.MccsSubarrayBeams)
        stationbeams = self._non_empty_trls(self.MccsStationBeams)

        self._num_subservients = (
            len(subarrays) + len(stations) + len(subarraybeams) + len(stationbeams)
        )

        return ControllerComponentManager(
            subarrays,
            stations,
            subarraybeams,
            stationbeams,
            self.logger,
            self.ObsCommandTimeout,
            self._communication_state_callback,
            self._component_state_callback,
        )

    def init_command_objects(self: MccsController) -> None:
        """Set up the handler objects for Commands."""
        super().init_command_objects()

        #
        # Fast commands
        #
        for command_name, command_object in [
            ("ReleaseAll", self.ReleaseAllCommand),
            ("Release", self.ReleaseCommand),
        ]:
            self.register_command_object(
                command_name,
                command_object(self.component_manager, self.logger),
            )

        #
        # Submitted slow commands
        #
        # NOTE(review): the GetHealthTrl Tango command below calls the
        # component manager directly and never looks up this command object;
        # the registration is kept unchanged for backward compatibility.
        for command_name, method_name in [
            ("RestartSubarray", "restart_subarray"),
            ("GetHealthTrl", "get_health_trl"),
        ]:
            self.register_command_object(
                command_name,
                SubmittedSlowCommand(
                    command_name,
                    self._command_tracker,
                    cast(BaseComponentManager, self.component_manager),
                    method_name,
                    callback=None,
                    logger=self.logger,
                ),
            )

        self.register_command_object(
            "Allocate",
            self.AllocateCommand(
                self._command_tracker,
                self.component_manager,
                callback=None,
                logger=self.logger,
            ),
        )

    class InitCommand(DeviceInitCommand):
        """
        A class for :py:class:`~.MccsController`'s Init command.

        The :py:meth:`~.MccsController.InitCommand.do` method below is
        called during :py:class:`~.MccsController`'s initialisation.
        """

        def do(  # type: ignore[override]
            self: MccsController.InitCommand,
            *args: Any,
            **kwargs: Any,
        ) -> tuple[ResultCode, str]:
            """
            Initialise the attributes and properties of the `MccsController`.

            :param args: positional args to the component manager method
            :param kwargs: keyword args to the component manager method

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            """
            package = sys.modules["ska_low_mccs"]
            self._device._build_state = package.__version_info__
            self._device._version_id = package.__version__

            return (ResultCode.OK, "Initialisation complete")

    # ----------
    # Callbacks
    # ----------
    def _communication_state_callback(
        self: MccsController,
        communication_state: CommunicationStatus,
    ) -> None:
        """
        Handle change in communications status between component manager and component.

        This is a callback hook, called by the component manager when
        the communications status changes. It is implemented here to
        drive the op_state.

        :param communication_state: the status of communications
            between the component manager and its component.
        """
        # TODO: This method and the next are implemented to work around
        # the following issue:
        # * The controller component manager's communication status
        #   depends on the status of communication with all subservient
        #   devices.
        # * But at present its power mode depends only on the power mode
        #   of stations.
        # Once communication with the stations is established, these
        # start reporting power mode, and if they all report their power
        # mode before communications with all other devices is
        # established, then the component manager ends up publishing a
        # power mode change *before* it has announced that comms is
        # established. This leads to problems.
        # To cope with that, the component state callback caches the last
        # reported power state, and it is replayed here once comms is up.
        # Eventually we should figure out a more elegant way to handle
        # this.
        self._communication_state = communication_state

        if communication_state == CommunicationStatus.DISABLED:
            self.op_state_model.perform_action("component_disconnected")
        elif communication_state == CommunicationStatus.NOT_ESTABLISHED:
            self.op_state_model.perform_action("component_unknown")
        elif self._num_subservients == 0:
            self.op_state_model.perform_action("component_on")
            # If we had subdevices, in a callback for a power change, they
            # would trigger this, since we don't have any, we'll have to
            # trigger it manually.
            self.component_manager._evaluate_power_state()
        elif self._component_power_state is not None:
            # Replay the power state that was reported before comms came up.
            action = self._POWER_STATE_ACTIONS.get(self._component_power_state)
            if action is not None:
                self.op_state_model.perform_action(action)

        self._health_model.update_state(
            communicating=communication_state == CommunicationStatus.ESTABLISHED
        )

    # pylint: disable=too-many-branches, too-many-arguments
    def _component_state_callback(
        self: MccsController,
        fault: Optional[bool] = None,
        power: Optional[PowerState] = None,
        health: Optional[HealthState] = None,
        trl: Optional[str] = None,
        station_id: Optional[str] = None,
        obsstate_changed: Optional[ObsState] = None,
        missed_event: Optional[bool] = None,
    ) -> None:
        """
        Handle change in the state of the component.

        This is a callback hook, called by the component manager when
        the state of the component changes. A change reported without a
        ``trl`` concerns this device itself; a change reported with a
        ``trl`` concerns the subservient device with that TRL.

        :param fault: An optional flag if the device is entering or
            exiting a fault state.
        :param power: An optional parameter with the new power state
            of the device.
        :param health: An optional parameter with the new health state
            of the device.
        :param trl: The TRL of the device.
        :param station_id: The station ID of the component, currently
            only for station beam pooling by station.
        :param obsstate_changed: An optional parameter with the new
            ObsState of a subservient ObsDevice.
        :param missed_event: whether the component manager has detected
            a missed change event.
        """
        if power is not None:
            if trl is None:
                # Cache the power state so that the communication state
                # callback can apply it if comms is established later.
                self._component_power_state = power
                if self._communication_state == CommunicationStatus.ESTABLISHED:
                    self.op_state_model.perform_action(
                        self._POWER_STATE_ACTIONS[power]
                    )
                self._health_model.update_state(power=power)
            else:
                # The device family is the second field of the TRL.
                device_family = trl.split("/")[1]
                if device_family == "station":
                    with self._power_state_lock:
                        self.component_manager._station_power_state_changed(
                            trl, power
                        )
                        self.component_manager._device_power_states[trl] = power
                        self.component_manager._evaluate_power_state()

        if health is not None:
            if trl is None:
                self._health_changed(health)
            else:
                device_family = trl.split("/")[1]
                if device_family == "subarray":
                    self.component_manager._subarray_health_changed(trl, health)
                elif device_family == "station":
                    self._health_model.station_health_changed(trl, health)
                elif device_family == "subarraybeam":
                    self.component_manager._subarray_beam_health_changed(
                        trl, health
                    )
                    self._health_model.subarray_beam_health_changed(trl, health)
                elif device_family == "beam":
                    self.component_manager._station_beam_health_changed(trl, health)
                    self._health_model.station_beam_health_changed(trl, health)

        if fault is not None:
            if fault:
                self.op_state_model.perform_action("component_fault")
            elif self.component_manager.power_state is not None:
                # Leaving fault: return to the state that matches the
                # component manager's current power state.
                self.op_state_model.perform_action(
                    self._POWER_STATE_ACTIONS[self.component_manager.power_state]
                )
            self._health_model.update_state(fault=fault)

        if obsstate_changed is not None and trl is not None:
            self.component_manager._device_obs_state_changed(trl, obsstate_changed)

        if missed_event:
            self._missed_events += 1

    def _health_changed(self: MccsController, health: HealthState) -> None:
        """
        Handle change in this device's health state.

        This is a callback hook, called whenever the HealthModel's
        evaluated health state changes. It is responsible for updating
        the tango side of things i.e. making sure the attribute is up to
        date, and events are pushed.

        :param health: the new health value
        """
        if self._health_state != health:
            self._health_state = health
            self.push_change_event("healthState", health)

    # ----------
    # Attributes
    # ----------
    @attribute(
        dtype="DevString",
        format="%s",
    )
    def healthModelParams(self: MccsController) -> str:
        """
        Get the health params from the health model.

        :return: the health params, as a JSON string
        """
        return json.dumps(self._health_model.health_params)

    @healthModelParams.write  # type: ignore[no-redef]
    def healthModelParams(self: MccsController, argin: str) -> None:
        """
        Set the params for health transition rules.

        :param argin: JSON-string of dictionary of health states
        """
        self._health_model.health_params = json.loads(argin)
        self._health_model.update_health()

    @attribute(dtype="str")
    def buildState(self: MccsController) -> str:
        """
        Read the Build State of the device.

        :return: the build state of the device
        """
        return f"MCCS build state: {self._build_state}"

    @attribute(dtype="str")
    def versionId(self: MccsController) -> str:
        """
        Read the Version Id of the device.

        :return: the version id of the device
        """
        return self._version_id

    @attribute(dtype="str")
    def stationHealths(self: MccsController) -> str:
        """
        Read the health of stations controlled by the device.

        :return: health of stations in a json format
        """
        healths = self.component_manager.get_healths("station")
        return json.dumps(healths["station"])

    @attribute(dtype="str")
    def subarrayHealths(self: MccsController) -> str:
        """
        Read the health of subarrays controlled by the device.

        :return: health of subarrays in a json format
        """
        healths = self.component_manager.get_healths("subarray")
        return json.dumps(healths["subarray"])

    @attribute(dtype="str")
    def stationBeamHealths(self: MccsController) -> str:
        """
        Read the health of station beams controlled by the device.

        :return: health of station beams in a json format
        """
        healths = self.component_manager.get_healths("beam")
        return json.dumps(healths["beam"])

    @attribute(dtype="str")
    def subarrayBeamHealths(self: MccsController) -> str:
        """
        Read the health of subarray beams controlled by the device.

        :return: health of subarray beams in a json format
        """
        healths = self.component_manager.get_healths("subarraybeam")
        return json.dumps(healths["subarraybeam"])

    @attribute(dtype="str")
    def subDeviceHealths(self: MccsController) -> str:
        """
        Read the health of all subdevices controlled by the device.

        :return: health of subdevices in a json tree format
        """
        return json.dumps(self.component_manager.get_healths())

    @attribute(dtype="DevString")
    def healthReport(self: MccsController) -> str:
        """
        Get the health report.

        :return: the health report.
        """
        return self._health_model.health_report

    @attribute(dtype="DevLong")
    def missedEvents(self: MccsController) -> int:
        """
        Get the amount of missed change events.

        Some commands rely on change events from sub-devices, sometime we miss
        these events, this attribute keeps track of how many we know we have
        missed.

        :return: the amount of missed change events
        """
        return self._missed_events

    # --------
    # Commands
    # --------
    @command(dtype_out="DevVarLongStringArray")
    def StandbyFull(self: MccsController) -> DevVarLongStringArrayType:
        """
        Put MCCS into standby mode.

        Some elements of SKA Mid have both low and full standby modes,
        but SKA Low has no such elements. We just need a Standby
        command, not separate StandbyLow and StandbyFull. This command
        therefore simply runs the registered "Standby" command object.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        handler = self.get_command_object("Standby")
        (result_code, status) = handler()
        return ([result_code], [status])

    @command(dtype_out="DevVarLongStringArray")
    def StandbyLow(self: MccsController) -> DevVarLongStringArrayType:
        """
        Put MCCS into standby mode.

        Some elements of SKA Mid have both low and full standby modes,
        but SKA Low has no such elements. We just need a Standby
        command, not separate StandbyLow and StandbyFull. This command
        therefore simply runs the registered "Standby" command object.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        handler = self.get_command_object("Standby")
        (result_code, status) = handler()
        return ([result_code], [status])

    class AllocateCommand(SubmittedSlowCommand):
        """
        Class for handling the Allocate() command.

        This command takes as input a JSON string that conforms to the
        following schema:

        schemas/MccsController_Allocate_3_0.json
        """  # noqa: E501

        SCHEMA: Final = json.loads(
            importlib.resources.read_text(
                "ska_low_mccs.schemas.controller",
                "MccsController_Allocate_3_0.json",
            )
        )

        SUCCEEDED_MESSAGE = "Allocate command completed OK"

        def __init__(
            self: MccsController.AllocateCommand,
            command_tracker: CommandTracker,
            component_manager: ControllerComponentManager,
            callback: Optional[Callable] = None,
            logger: Optional[logging.Logger] = None,
        ) -> None:
            """
            Initialise a new AllocateCommand instance.

            The argument of the command is validated against
            :py:attr:`SCHEMA` before the component manager's ``allocate``
            method is submitted.

            :param command_tracker: the command tracker used to track this command
            :param component_manager: the component manager to which this command
                belongs.
            :param callback: the callback to be called at command completion
            :param logger: a logger for this command to use.
            """
            self._component_manager = component_manager

            validator = JsonValidator("Allocate", self.SCHEMA, logger)
            super().__init__(
                "Allocate",
                command_tracker,
                component_manager,
                "allocate",
                callback=callback,
                logger=logger,
                validator=validator,
            )

    @command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
    def Allocate(self: MccsController, argin: str) -> DevVarLongStringArrayType:
        """
        Allocate a set of unallocated MCCS resources to a sub-array.

        The JSON argument specifies the overall sub-array composition in
        terms of which stations should be allocated to the specified Sub-Array.

        :param argin: JSON-formatted string containing an integer
            subarray ID, and resources to be allocated to that subarray

        :return: A tuple containing a return code, a string
            message indicating status and message UID.
            The string message is for information purposes only, but
            the message UID is for message management use.

        :example:

        >>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
        >>> proxy.Allocate(
                json.dumps(
                    {
                        "interface": "https://schema.skao.int/ska-low-mccs-controller-allocate/3.0",
                        "subarray_id": 1,
                        "subarray_beams": [
                            {
                                "subarray_beam_id": 3,
                                "apertures": [
                                    {"station_id": 1, "aperture_id": "1.1"},
                                    {"station_id": 2, "aperture_id": "2.2"},
                                    {"station_id": 2, "aperture_id": "2.3"},
                                    {"station_id": 3, "aperture_id": "3.1"},
                                    {"station_id": 4, "aperture_id": "4.1"},
                                ],
                                "number_of_channels": 32,
                            },
                        ],
                    }
                )
            )
        """  # noqa: E501
        handler = self.get_command_object("Allocate")
        (result_code, message) = handler(argin)
        return ([result_code], [message])

    @command(dtype_in=int, dtype_out="DevVarLongStringArray")
    def RestartSubarray(self: MccsController, argin: int) -> DevVarLongStringArrayType:
        """
        Restart an MCCS subarray.

        :param argin: an integer subarray_id.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        handler = self.get_command_object("RestartSubarray")
        (result_code, status) = handler(argin)
        return ([result_code], [status])

    class ReleaseCommand(FastCommand):
        """
        Class for handling the Release() command.

        This command takes as input a JSON string that conforms to the
        following schema:

        schemas/MccsController_Release_2_0.json
        """  # noqa: E501

        SCHEMA: Final = json.loads(
            importlib.resources.read_text(
                "ska_low_mccs.schemas.controller",
                "MccsController_Release_2_0.json",
            )
        )

        SUCCEEDED_MESSAGE = "Release command completed OK"

        def __init__(
            self: MccsController.ReleaseCommand,
            component_manager: ControllerComponentManager,
            logger: Optional[logging.Logger] = None,
        ) -> None:
            """
            Initialise a new ReleaseCommand instance.

            :param component_manager: the component manager on which this
                command acts.
            :param logger: a logger for this command to use.
            """
            self._component_manager = component_manager

            validator = JsonValidator("Release", self.SCHEMA, logger)
            super().__init__(logger, validator)

        def do(
            self: MccsController.ReleaseCommand,
            *args: Any,
            **kwargs: Any,
        ) -> tuple[ResultCode, str]:
            """
            Implement :py:meth:`.MccsController.Release` command functionality.

            Everything is released when no ``subarray_id`` is given or when
            ``release_all`` is truthy; otherwise only the resources of the
            given subarray are released.

            :param args: Positional arguments. This should be empty and
                is provided for type hinting purposes only.
            :param kwargs: keyword arguments unpacked from the JSON
                argument to the command.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            """
            subarray_id = kwargs.get("subarray_id")
            if subarray_id is None or kwargs.get("release_all", False):
                return self._component_manager.release_all()

            return self._component_manager.release(subarray_id)

    @command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
    def Release(self: MccsController, argin: str) -> DevVarLongStringArrayType:
        """
        Release resources from an MCCS Sub-Array.

        :param argin: JSON-formatted string

        :return: A tuple containing a return code, a string
            message indicating status and message UID.
            The string message is for information purposes only, but
            the message UID is for message management use.

        :example:

        >>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
        >>> proxy.Release(
                json.dumps(
                    {
                        "subarray_id": 1,
                        "release_all": True
                    }
                )
            )
        """
        handler = self.get_command_object("Release")
        (result_code, status) = handler(argin)
        return ([result_code], [status])

    class ReleaseAllCommand(FastCommand):
        """
        Class for handling the ReleaseAll() command.

        This command takes no argument, so no JSON validator is set up.
        """

        SUCCEEDED_MESSAGE = "ReleaseAll command completed OK"

        def __init__(
            self: MccsController.ReleaseAllCommand,
            component_manager: ControllerComponentManager,
            logger: Optional[logging.Logger] = None,
        ) -> None:
            """
            Initialise a new ReleaseAllCommand instance.

            :param component_manager: the component manager on which this
                command acts.
            :param logger: a logger for this command to use.
            """
            self._component_manager = component_manager
            super().__init__(logger)

        def do(
            self: MccsController.ReleaseAllCommand,
            *args: Any,
            **kwargs: Any,
        ) -> tuple[ResultCode, str]:
            """
            Implement :py:meth:`.MccsController.ReleaseAll` command functionality.

            :param args: Positional arguments. Not used.
            :param kwargs: Keyword arguments. Not used.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            """
            return self._component_manager.release_all()

    @command(dtype_out="DevVarLongStringArray")
    def ReleaseAll(self: MccsController) -> DevVarLongStringArrayType:
        """
        Release all resources from an MCCS Array.

        :return: A tuple containing a return code, a string
            message indicating status and message UID.
            The string message is for information purposes only, but
            the message UID is for message management use.

        :example:

        >>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
        >>> proxy.ReleaseAll()
        """
        handler = self.get_command_object("ReleaseAll")
        (result_code, status) = handler()
        return ([result_code], [status])

    @command(dtype_in="DevLong", dtype_out="DevString")
    def GetAssignedResources(self: MccsController, subarray_id: int) -> str:
        """
        Return a dictionary of the resources assigned to a given subarray.

        :param subarray_id: The subarray ID of the resources

        :return: json formatted dictionary
        """
        return self.component_manager.get_resources(subarray_id)

    @command(dtype_in="str", dtype_out="str")
    def GetHealthTrl(self: MccsController, argin: str) -> str:
        """
        Return health of device given by TRL.

        :param argin: TRL of device to return health of.

        :return: the name of the health state of the device given by TRL,
            or the string ``"None"`` if the component manager reports no
            health for it.
        """
        health = self.component_manager.get_health_trl(argin)
        if health is None:
            # The command is declared with a string output, so "no health"
            # is reported as the literal text "None".
            return "None"
        return HealthState(health).name
# ---------- # Run server # ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Run the MccsController device server.

    :param args: positional arguments; when none are given, ``None`` is
        passed on as the server's ``args``
    :param kwargs: named arguments, forwarded unchanged

    :return: exit code
    """
    server_args = args if args else None
    return MccsController.run_server(args=server_args, **kwargs)


if __name__ == "__main__":
    main()