# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module contains the SKA Low MCCS Controller device prototype."""
from __future__ import annotations # allow forward references in type hints
import importlib # allow forward references in type hints
import json
import logging
import sys
import threading
from typing import Any, Callable, Final, Optional, cast
import tango
from ska_control_model import (
CommunicationStatus,
HealthState,
ObsState,
PowerState,
ResultCode,
)
from ska_tango_base.base import BaseComponentManager, CommandTracker, SKABaseDevice
from ska_tango_base.commands import (
DeviceInitCommand,
FastCommand,
JsonValidator,
SubmittedSlowCommand,
)
from tango.server import attribute, command, device_property
from ska_low_mccs.controller.controller_component_manager import (
ControllerComponentManager,
)
from ska_low_mccs.controller.controller_health_model import ControllerHealthModel
__all__ = ["MccsController", "main"]
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]
# pylint: disable=too-many-instance-attributes, too-many-public-methods
[docs]class MccsController(SKABaseDevice):
"""An implementation of a controller Tango device for MCCS."""
# -----------------
# Device Properties
# -----------------
MccsSubarrays = device_property(dtype="DevVarStringArray", default_value=[])
MccsStations = device_property(dtype="DevVarStringArray", default_value=[])
MccsSubarrayBeams = device_property(dtype="DevVarStringArray", default_value=[])
MccsStationBeams = device_property(dtype="DevVarStringArray", default_value=[])
ObsCommandTimeout = device_property(
dtype="int",
default_value=60,
doc="The timeout in seconds for Observation commands.",
)
# ---------------
# Initialisation
# ---------------
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initialise this device object.
:param args: positional args to the init
:param kwargs: keyword args to the init
"""
# We aren't supposed to define initialisation methods for Tango
# devices; we are only supposed to define an `init_device` method. But
# we insist on doing so here, just so that we can define some
# attributes, thereby stopping the linters from complaining about
# "attribute-defined-outside-init" etc. We still need to make sure that
# `init_device` re-initialises any values defined in here.
super().__init__(*args, **kwargs)
self._communication_state: Optional[CommunicationStatus]
self._num_subservients: int
self._missed_events: int
self._health_state: HealthState = HealthState.UNKNOWN
self._health_model: ControllerHealthModel
self.component_manager: ControllerComponentManager # type: ignore[assignment]
[docs] def init_device(self: MccsController) -> None:
"""
Initialise the device.
This is overridden here to change the Tango serialisation model.
"""
util = tango.Util.instance()
util.set_serial_model(tango.SerialModel.NO_SYNC)
self._power_state_lock = threading.RLock()
self._communication_state = None
self._component_power_state: Optional[PowerState] = None
self._num_subservients = 0
self._missed_events = 0
super().init_device()
self._build_state = sys.modules["ska_low_mccs"].__version_info__
self._version_id = sys.modules["ska_low_mccs"].__version__
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
version = f"{device_name} Software Version: {self._version_id}"
properties = (
f"Initialised {device_name} device with properties:\n"
f"\tMccsSubarrays: {self.MccsSubarrays}\n"
f"\tMccsStations: {self.MccsStations}\n"
f"\tMccsSubarrayBeams: {self.MccsSubarrayBeams}\n"
f"\tMccsStationBeams: {self.MccsStationBeams}\n"
)
self.logger.info(
"\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
)
def _init_state_model(self: MccsController) -> None:
"""Initialise the state model."""
super()._init_state_model()
self._health_state = HealthState.UNKNOWN # InitCommand.do() does this too late.
self._health_model = ControllerHealthModel(
self._health_changed,
[trl for trl in self.MccsStations if trl != ""],
[trl for trl in self.MccsSubarrayBeams if trl != ""],
[trl for trl in self.MccsStationBeams if trl != ""],
{
"stations_failed_threshold": 0.05,
"stations_degraded_threshold": 0.2,
"subarray_beams_failed_threshold": 0.05,
"subarray_beams_degraded_threshold": 0.2,
"station_beams_failed_threshold": 0.05,
"station_beams_degraded_threshold": 0.2,
},
)
self.set_change_event("healthState", True, False)
[docs] def create_component_manager(
self: MccsController,
) -> ControllerComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
subarrays = [trl for trl in self.MccsSubarrays if trl != ""]
stations = [trl for trl in self.MccsStations if trl != ""]
subarraybeams = [trl for trl in self.MccsSubarrayBeams if trl != ""]
stationbeams = [trl for trl in self.MccsStationBeams if trl != ""]
self._num_subservients = len(
subarrays + stations + subarraybeams + stationbeams
)
return ControllerComponentManager(
subarrays,
stations,
subarraybeams,
stationbeams,
self.logger,
self.ObsCommandTimeout,
self._communication_state_callback,
self._component_state_callback,
)
[docs] def init_command_objects(self: MccsController) -> None:
"""Set up the handler objects for Commands."""
super().init_command_objects()
#
# Fast commands
#
for command_name, command_object in [
("ReleaseAll", self.ReleaseAllCommand),
("Release", self.ReleaseCommand),
]:
self.register_command_object(
command_name,
command_object(self.component_manager, self.logger),
)
for command_name, method_name in [
("RestartSubarray", "restart_subarray"),
("GetHealthTrl", "get_health_trl"),
]:
self.register_command_object(
command_name,
SubmittedSlowCommand(
command_name,
self._command_tracker,
cast(BaseComponentManager, self.component_manager),
method_name,
callback=None,
logger=self.logger,
),
)
self.register_command_object(
"Allocate",
self.AllocateCommand(
self._command_tracker,
self.component_manager,
callback=None,
logger=self.logger,
),
)
[docs] class InitCommand(DeviceInitCommand):
"""
A class for :py:class:`~.MccsController`'s Init command.
The :py:meth:`~.MccsController.InitCommand.do` method below is
called during :py:class:`~.MccsController`'s initialisation.
"""
[docs] def do( # type: ignore[override]
self: MccsController.InitCommand,
*args: Any,
**kwargs: Any,
) -> tuple[ResultCode, str]:
"""
Initialise the attributes and properties of the `MccsController`.
:param args: positional args to the component manager method
:param kwargs: keyword args to the component manager method
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
self._device._build_state = sys.modules["ska_low_mccs"].__version_info__
self._device._version_id = sys.modules["ska_low_mccs"].__version__
return (ResultCode.OK, "Initialisation complete")
# ----------
# Callbacks
# ----------
def _communication_state_callback(
self: MccsController,
communication_state: CommunicationStatus,
) -> None:
"""
Handle change in communications status between component manager and component.
This is a callback hook, called by the component manager when
the communications status changes. It is implemented here to
drive the op_state.
:param communication_state: the status of communications between
the component manager and its component.
"""
# TODO: This method and the next are implemented to work around
# the following issue:
# * The controller component manager's communication status
# depends on the status of communication with all subservient
# devices.
# * But at present its power mode depends only on the power mode
# of stations.
# Once communication with the stations is
# established, these start reporting power mode, and if they all
# report their power mode before communications with all other
# devices is established, then the component manager ends up
# publishing a power mode change *before* is has announced that
# comms is established. This leads to problems.
# Eventually we should figure out a more elegant way to handle
# this.
self._communication_state = communication_state
if communication_state == CommunicationStatus.DISABLED:
self.op_state_model.perform_action("component_disconnected")
elif communication_state == CommunicationStatus.NOT_ESTABLISHED:
self.op_state_model.perform_action("component_unknown")
elif self._num_subservients == 0:
self.op_state_model.perform_action("component_on")
# If we had subdevices, in a callback for a power change, they would
# trigger this, since we don't have any, we'll have to trigger it manually.
self.component_manager._evaluate_power_state()
elif self._component_power_state == PowerState.OFF:
self.op_state_model.perform_action("component_off")
elif self._component_power_state == PowerState.STANDBY:
self.op_state_model.perform_action("component_standby")
elif self._component_power_state == PowerState.ON:
self.op_state_model.perform_action("component_on")
elif self._component_power_state == PowerState.UNKNOWN:
self.op_state_model.perform_action("component_unknown")
self._health_model.update_state(
communicating=communication_state == CommunicationStatus.ESTABLISHED
)
# pylint: disable=too-many-branches, too-many-arguments
def _component_state_callback(
self: MccsController,
fault: Optional[bool] = None,
power: Optional[PowerState] = None,
health: Optional[HealthState] = None,
trl: Optional[str] = None,
station_id: Optional[str] = None,
obsstate_changed: Optional[ObsState] = None,
missed_event: Optional[bool] = None,
) -> None:
"""
Handle change in the state of the component.
This is a callback hook, called by the component manager when
the state of the component changes.
:param fault: An optional flag if the device is entering or
exiting a fault state.
:param power: An optional parameter with the new power state of the device.
:param health: An optional parameter with the new health state of the device.
:param trl: The TRL of the device.
:param station_id: The station ID of the component, currently only for
station beam pooling by station.
:param obsstate_changed: An optional parameter with the new ObsState of
a subservient ObsDevice.
:param missed_event: whether the component manager has detected a missed change
event.
"""
action_map = {
PowerState.OFF: "component_off",
PowerState.STANDBY: "component_standby",
PowerState.ON: "component_on",
PowerState.UNKNOWN: "component_unknown",
}
if power is not None:
if trl is None:
if self._communication_state == CommunicationStatus.ESTABLISHED:
self.op_state_model.perform_action(action_map[power])
self._health_model.update_state(power=power)
else:
device_family = trl.split("/")[1]
if device_family in ["station"]:
with self._power_state_lock:
self.component_manager._station_power_state_changed(trl, power)
self.component_manager._device_power_states[trl] = power
self.component_manager._evaluate_power_state()
if health is not None:
if trl is None:
if self._health_state != health:
self._health_state = health
self.push_change_event("healthState", health)
else:
device_family = trl.split("/")[1]
if device_family == "subarray":
self.component_manager._subarray_health_changed(trl, health)
elif device_family == "station":
self._health_model.station_health_changed(trl, health)
elif device_family == "subarraybeam":
self.component_manager._subarray_beam_health_changed(trl, health)
self._health_model.subarray_beam_health_changed(trl, health)
elif device_family == "beam":
self.component_manager._station_beam_health_changed(trl, health)
self._health_model.station_beam_health_changed(trl, health)
if fault is not None:
if fault:
self.op_state_model.perform_action("component_fault")
else:
if self.component_manager.power_state is not None:
self.op_state_model.perform_action(
action_map[self.component_manager.power_state]
)
self._health_model.update_state(fault=fault)
if obsstate_changed is not None and trl is not None:
self.component_manager._device_obs_state_changed(trl, obsstate_changed)
if missed_event:
self._missed_events += 1
def _health_changed(self: MccsController, health: HealthState) -> None:
"""
Handle change in this device's health state.
This is a callback hook, called whenever the HealthModel's
evaluated health state changes. It is responsible for updating
the tango side of things i.e. making sure the attribute is up to
date, and events are pushed.
:param health: the new health value
"""
if self._health_state != health:
self._health_state = health
self.push_change_event("healthState", health)
# ----------
# Attributes
# ----------
@attribute(
dtype="DevString",
format="%s",
)
def healthModelParams(self: MccsController) -> str:
"""
Get the health params from the health model.
:return: the health params
"""
return json.dumps(self._health_model.health_params)
[docs] @healthModelParams.write # type: ignore[no-redef]
def healthModelParams(self: MccsController, argin: str) -> None:
"""
Set the params for health transition rules.
:param argin: JSON-string of dictionary of health states
"""
self._health_model.health_params = json.loads(argin)
self._health_model.update_health()
[docs] @attribute(dtype="str")
def buildState(self: MccsController) -> str:
"""
Read the Build State of the device.
:return: the build state of the device
"""
return f"MCCS build state: {self._build_state}"
[docs] @attribute(dtype="str")
def versionId(self: MccsController) -> str:
"""
Read the Version Id of the device.
:return: the version id of the device
"""
return self._version_id
[docs] @attribute(dtype="str")
def stationHealths(self: MccsController) -> str:
"""
Read the health of stations controlled by the device.
:return: health of stations in a json format
"""
return json.dumps(self.component_manager.get_healths("station")["station"])
[docs] @attribute(dtype="str")
def subarrayHealths(self: MccsController) -> str:
"""
Read the health of subarrays controlled by the device.
:return: health of subarrays in a json format
"""
return json.dumps(self.component_manager.get_healths("subarray")["subarray"])
[docs] @attribute(dtype="str")
def stationBeamHealths(self: MccsController) -> str:
"""
Read the health of station beams controlled by the device.
:return: health of station beams in a json format
"""
return json.dumps(self.component_manager.get_healths("beam")["beam"])
[docs] @attribute(dtype="str")
def subarrayBeamHealths(self: MccsController) -> str:
"""
Read the health of subarray beams controlled by the device.
:return: health of subarray beams in a json format
"""
return json.dumps(
self.component_manager.get_healths("subarraybeam")["subarraybeam"]
)
[docs] @attribute(dtype="str")
def subDeviceHealths(self: MccsController) -> str:
"""
Read the health of all subdevices controlled by the device.
:return: health of subdevices in a json tree format
"""
return json.dumps(self.component_manager.get_healths())
[docs] @attribute(dtype="DevString")
def healthReport(self: MccsController) -> str:
"""
Get the health report.
:return: the health report.
"""
return self._health_model.health_report
[docs] @attribute(dtype="DevLong")
def missedEvents(self: MccsController) -> int:
"""
Get the amount of missed change events.
Some commands rely on change events from sub-devices, sometime we miss these
events, this attribute keeps track of how many we know we have missed.
:return: the amount of missed change events
"""
return self._missed_events
# --------
# Commands
# --------
[docs] @command(dtype_out="DevVarLongStringArray")
def StandbyFull(self: MccsController) -> DevVarLongStringArrayType:
"""
Put MCCS into standby mode.
Some elements of SKA Mid have both low and full standby modes,
but SKA Low has no such elements. We just need a Standby
command, not separate StandbyLow and StandbyFull.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
handler = self.get_command_object("Standby")
(result_code, status) = handler()
return ([result_code], [status])
[docs] @command(dtype_out="DevVarLongStringArray")
def StandbyLow(self: MccsController) -> DevVarLongStringArrayType:
"""
Put MCCS into standby mode.
Some elements of SKA Mid have both low and full standby modes,
but SKA Low has no such elements. We just need a Standby
command, not separate StandbyLow and StandbyFull.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
handler = self.get_command_object("Standby")
(result_code, status) = handler()
return ([result_code], [status])
[docs] class AllocateCommand(SubmittedSlowCommand):
"""
Class for handling the Allocate() command.
This command takes as input a JSON string that conforms to the
following schema:
schemas/MccsController_Allocate_3_0.json
""" # noqa: E501
SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.controller",
"MccsController_Allocate_3_0.json",
)
)
[docs] def __init__(
self: MccsController.AllocateCommand,
command_tracker: CommandTracker,
component_manager: ControllerComponentManager,
callback: Optional[Callable] = None,
logger: Optional[logging.Logger] = None,
) -> None:
"""
Initialise a new AllocateCommand instance.
:param command_tracker: the command tracker used to track this command
:param component_manager: the component manager to which
this command belongs.
:param callback: the callback to be called at command completion
:param logger: a logger for this command to use.
"""
self._component_manager = component_manager
validator = JsonValidator("Allocate", self.SCHEMA, logger)
super().__init__(
"Allocate",
command_tracker,
component_manager,
"allocate",
callback=callback,
logger=logger,
validator=validator,
)
SUCCEEDED_MESSAGE = "Allocate command completed OK"
[docs] @command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def Allocate(self: MccsController, argin: str) -> DevVarLongStringArrayType:
"""
Allocate a set of unallocated MCCS resources to a sub-array.
The JSON argument
specifies the overall sub-array composition in terms of which stations should be
allocated to the specified Sub-Array.
:param argin: JSON-formatted string containing an integer
subarray ID, and resources to be allocated to that subarray
:return: A tuple containing a return code, a string
message indicating status and message UID.
The string message is for information purposes only, but
the message UID is for message management use.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.Allocate(
json.dumps(
{
"interface":
"https://schema.skao.int/ska-low-mccs-controller-allocate/3.0"
"subarray_id": 1,
"subarray_beams": [
{
"subarray_beam_id": 3,
"apertures": [
{"station_id": 1, "aperture_id": "1.1" },
{"station_id": 2, "aperture_id": "2.2" },
{"station_id": 2, "aperture_id": "2.3" },
{"station_id": 3, "aperture_id": "3.1" },
{"station_id": 4, "aperture_id": "4.1" },
],
}
"number_of_channels": 32,
],
}
)
)
"""
handler = self.get_command_object("Allocate")
(result_code, message) = handler(argin)
return ([result_code], [message])
[docs] @command(dtype_in=int, dtype_out="DevVarLongStringArray")
def RestartSubarray(self: MccsController, argin: int) -> DevVarLongStringArrayType:
"""
Restart an MCCS subarray.
:param argin: an integer subarray_id.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
handler = self.get_command_object("RestartSubarray")
(result_code, status) = handler(argin)
return ([result_code], [status])
[docs] class ReleaseCommand(FastCommand):
"""
Class for handling the Release() command.
This command takes as input a JSON string that conforms to the
following schema:
schemas/MccsController_Release_2_0.json
""" # noqa: E501
SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.controller",
"MccsController_Release_2_0.json",
)
)
[docs] def __init__(
self: MccsController.ReleaseCommand,
component_manager: ControllerComponentManager,
logger: Optional[logging.Logger] = None,
) -> None:
"""
Initialise a new ReleaseCommand instance.
:param component_manager: the device to which this command belongs.
:param logger: a logger for this command to use.
"""
self._component_manager = component_manager
validator = JsonValidator("Release", self.SCHEMA, logger)
super().__init__(logger, validator)
SUCCEEDED_MESSAGE = "Release command completed OK"
[docs] def do(
self: MccsController.ReleaseCommand,
*args: Any,
**kwargs: Any,
) -> tuple[ResultCode, str]:
"""
Implement :py:meth:`.MccsController.Release` command functionality.
:param args: Positional arguments. This should be empty and
is provided for type hinting purposes only.
:param kwargs: keyword arguments unpacked from the JSON
argument to the command.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
subarray_id = kwargs.get("subarray_id", None)
if subarray_id is None or kwargs.get("release_all", False):
return self._component_manager.release_all()
return self._component_manager.release(subarray_id)
[docs] @command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def Release(self: MccsController, argin: str) -> DevVarLongStringArrayType:
"""
Release resources from an MCCS Sub-Array.
:param argin: JSON-formatted string
:return: A tuple containing a return code, a string
message indicating status and message UID.
The string message is for information purposes only, but
the message UID is for message management use.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.Release(
json.dumps(
{
"subarray_id": 1,
"release_all": true
}
)
)
"""
handler = self.get_command_object("Release")
(result_code, status) = handler(argin)
return ([result_code], [status])
[docs] class ReleaseAllCommand(FastCommand):
"""
Class for handling the ReleaseAll() command.
schemas/MccsController_Release_2_0.json
""" # noqa: E501
[docs] def __init__(
self: MccsController.ReleaseAllCommand,
component_manager: ControllerComponentManager,
logger: Optional[logging.Logger] = None,
) -> None:
"""
Initialise a new ReleaseAllCommand instance.
:param component_manager: the device to which this command belongs.
:param logger: a logger for this command to use.
"""
self._component_manager = component_manager
super().__init__(logger)
SUCCEEDED_MESSAGE = "ReleaseAll command completed OK"
[docs] def do(
self: MccsController.ReleaseAllCommand,
*args: Any,
**kwargs: Any,
) -> tuple[ResultCode, str]:
"""
Implement :py:meth:`.MccsController.Release` command functionality.
:param args: Positional arguments. This should be empty and
is provided for type hinting purposes only.
:param kwargs: keyword arguments unpacked from the JSON
argument to the command.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
return self._component_manager.release_all()
[docs] @command(dtype_out="DevVarLongStringArray")
def ReleaseAll(self: MccsController) -> DevVarLongStringArrayType:
"""
Release all resources from an MCCS Array.
:return: A tuple containing a return code, a string
message indicating status and message UID.
The string message is for information purposes only, but
the message UID is for message management use.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.ReleaseAll()
"""
handler = self.get_command_object("ReleaseAll")
(result_code, status) = handler()
return ([result_code], [status])
[docs] @command(dtype_in="DevLong", dtype_out="DevString")
def GetAssignedResources(self: MccsController, subarray_id: int) -> str:
"""
Return a dictionary of the resources assigned to a given subarray.
:param subarray_id: The subarray ID of the resources
:return: json formatted dictionary
"""
return self.component_manager.get_resources(subarray_id)
[docs] @command(dtype_in="str", dtype_out="str")
def GetHealthTrl(self: MccsController, argin: str) -> Optional[str]:
"""
Return health of device given by TRL.
:param argin: TRL of device to return health of.
:return: health of device given by TRL.
"""
health_trl = self.component_manager.get_health_trl(argin)
if health_trl is not None:
return f"{HealthState(health_trl).name}"
return f"{health_trl}"
# ----------
# Run server
# ----------
[docs]def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return MccsController.run_server(args=args or None, **kwargs)
if __name__ == "__main__":
main()