# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module contains the SKA Low MCCS Controller device prototype."""
from __future__ import annotations # allow forward references in type hints
import importlib # allow forward references in type hints
import json
import sys
import threading
from typing import Any, Dict, Final, Optional, TypedDict, cast
import numpy as np
import ska_tango_base as stb
from ska_control_model import (
AdminMode,
CommunicationStatus,
HealthState,
ObsState,
PowerState,
ResultCode,
)
from ska_control_model.health_rollup import HealthRollup, HealthSummary
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.faults import CmdNotAllowedError
from ska_tango_base.long_running_commands import LRCReqType
from tango.server import attribute, command, device_property
from ska_low_mccs.controller.controller_component_manager import (
ControllerComponentManager,
)
from ska_low_mccs.controller.controller_health_model import ControllerHealthModel
__all__ = ["MccsController", "main"]
DevVarLongStringArrayType = tuple[list[ResultCode], list[str]]
class ChannelBlockInfo(TypedDict, total=False):
"""TypedDict for channel block information."""
total: int
allocated: int
available: int
allocation_details: Dict[str, int]
# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines
# pylint: disable=too-many-ancestors
[docs]
class MccsController(MccsBaseDevice):
"""An implementation of a controller Tango device for MCCS."""
InitCommand = None # type: ignore[assignment]
# -----------------
# Device Properties
# -----------------
MccsSubarrays = device_property(dtype="DevVarStringArray", default_value=[])
MccsStations = device_property(dtype="DevVarStringArray", default_value=[])
MccsSubarrayBeams = device_property(dtype="DevVarStringArray", default_value=[])
MccsStationBeams = device_property(dtype="DevVarStringArray", default_value=[])
ObsCommandTimeout = device_property(
dtype="int",
default_value=60,
doc="The timeout in seconds for Observation commands.",
)
# ---------------
# Initialisation
# ---------------
[docs]
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initialise this device object.
:param args: positional args to the init
:param kwargs: keyword args to the init
"""
# We aren't supposed to define initialisation methods for Tango
# devices; we are only supposed to define an `init_device` method. But
# we insist on doing so here, just so that we can define some
# attributes, thereby stopping the linters from complaining about
# "attribute-defined-outside-init" etc. We still need to make sure that
# `init_device` re-initialises any values defined in here.
self._use_new_health_model: bool = True
super().__init__(*args, **kwargs)
self._communication_state: Optional[CommunicationStatus]
self._num_subservients: int
self._missed_events: int
self._health_state: HealthState = HealthState.UNKNOWN
self._health_model: ControllerHealthModel
self._health_report: str = ""
self._health_rollup: HealthRollup
self.component_manager: ControllerComponentManager
[docs]
def init_device(self: MccsController) -> None:
"""Initialise the device."""
self._power_state_lock = threading.RLock()
self._communication_state = None
self._component_power_state: Optional[PowerState] = None
self._num_subservients = 0
self._missed_events = 0
super().init_device()
self._health_thresholds: dict[str, Any] = {
"subarrays": (0, 1, 1),
"stations": (
0, # All stations must fail for controller to fail
max(np.ceil(len(self.MccsStations) * 0.05), 1), # 5% failed -> Degraded
max(
np.ceil(len(self.MccsStations) * 0.05), 2
), # 5% degraded -> Degraded
),
"subarraybeams": (0, 1, 1),
"stationbeams": (0, 1, 1),
}
self._health_rollup = self._setup_health_rollup()
self._build_state = sys.modules["ska_low_mccs"].__version_info__
self._version_id = sys.modules["ska_low_mccs"].__version__
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
version = f"{device_name} Software Version: {self._version_id}"
properties = (
f"Initialised {device_name} device with properties:\n"
f"\tMccsSubarrays: {self.MccsSubarrays}\n"
f"\tMccsStations: {self.MccsStations}\n"
f"\tMccsSubarrayBeams: {self.MccsSubarrayBeams}\n"
f"\tMccsStationBeams: {self.MccsStationBeams}\n"
)
self.logger.info(
"\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
)
self.init_completed()
def _init_state_model(self: MccsController) -> None:
"""Initialise the state model."""
super()._init_state_model()
self._health_state = HealthState.UNKNOWN # InitCommand.do() does this too late.
self._health_model = ControllerHealthModel(
self._old_health_changed,
[trl for trl in self.MccsStations if trl != ""],
[trl for trl in self.MccsSubarrayBeams if trl != ""],
[trl for trl in self.MccsStationBeams if trl != ""],
{
"stations_failed_threshold": 0.05,
"stations_degraded_threshold": 0.2,
"subarray_beams_failed_threshold": 0.05,
"subarray_beams_degraded_threshold": 0.2,
"station_beams_failed_threshold": 0.05,
"station_beams_degraded_threshold": 0.2,
},
)
self.set_change_event("healthState", True, False)
def _setup_health_rollup(
self: MccsController,
) -> HealthRollup:
# Rollup is based on three configurable thresholds:
# * the number of FAILED (or UNKNOWN) sources that cause health
# to roll up to overall FAILED;
# * the number of FAILED (or UNKNOWN) sources that cause health
# to roll up to overall DEGRADED;
# * the number of DEGRADED sources that cause health to roll up to
# overall DEGRADED.
# Default health thresholds:
# Subarrays
# All failed is failed.
# Any failed is degraded.
# Any degraded is degraded.
# Stations
# ~25% failed is failed.
# Any failed is degraded.
# 2+ degraded is degraded.
# SubarrayBeams
# All failed is failed.
# Any failed is degraded.
# Any degraded is degraded.
# StationBeams
# All failed is failed.
# Any failed is degraded.
# Any degraded is degraded.
# Here the "self" entry represets MccsController specific health changes
# of which there are currently none.
rollup_members = ["self"]
# TODO: Make these thresholds fully dynamic based on deployment.
thresholds = {"self": (1, 1, 1)}
if len(self.MccsSubarrays) > 0:
rollup_members.append("subarrays")
thresholds["subarrays"] = self._health_thresholds["subarrays"]
if len(self.MccsStations) > 0:
rollup_members.append("stations")
thresholds["stations"] = self._health_thresholds["stations"]
if len(self.MccsSubarrayBeams) > 0:
rollup_members.append("subarraybeams")
thresholds["subarraybeams"] = self._health_thresholds["subarraybeams"]
if len(self.MccsStationBeams) > 0:
rollup_members.append("stationbeams")
thresholds["stationbeams"] = self._health_thresholds["stationbeams"]
health_rollup = HealthRollup(
rollup_members,
thresholds["self"],
self._health_changed,
self._health_summary_changed,
)
if "subarrays" in thresholds:
health_rollup.define(
"subarrays", self.MccsSubarrays, thresholds["subarrays"]
)
if "stations" in thresholds:
health_rollup.define("stations", self.MccsStations, thresholds["stations"])
if "subarraybeams" in thresholds:
health_rollup.define(
"subarraybeams",
self.MccsSubarrayBeams,
thresholds["subarraybeams"],
)
if "stationbeams" in thresholds:
health_rollup.define(
"stationbeams", self.MccsStationBeams, thresholds["stationbeams"]
)
return health_rollup
def _redefine_health_rollup(self: MccsController) -> None:
"""
Redefine the health rollup members and thresholds.
Redefines the health rollup following a change in subdevice thresholds.
This pulls the old/current healths from the health report, instantiates
a new health_rollup instance and restores those healthstates.
"""
def _flatten_dict(d: dict[str, Any]) -> dict[str, Any]:
"""
Return a flattened dictionary given nested dicts.
Returns a flattened dictionary containing the key-value pairs
of the nested dictionaries. Where a key-value pair is itself
a dictionary this will also be flattened and the parent key
omitted.
:param d: the nested dictionary to flatten
:return: flattened dictionary.
"""
def _flatten(d: dict[str, Any]) -> dict[str, Any]:
items: list[Any] = []
for k, v in d.items():
if isinstance(v, dict):
items.extend(_flatten(v).items())
else:
items.append((k, v))
return dict(items)
return _flatten(d)
# Pull out the old healthstates.
old_report = json.loads(self._health_report)
old_subdevice_healths = _flatten_dict(old_report)
old_online = self._health_rollup.online
self._health_rollup = self._setup_health_rollup()
self._health_rollup.online = old_online
# Restore old healthstates.
for subdevice, health in old_subdevice_healths.items():
self._health_rollup.health_changed(subdevice, cast(HealthState, health))
def _update_admin_mode(self: MccsController, admin_mode: AdminMode) -> None:
super()._update_admin_mode(admin_mode)
self._health_rollup.online = admin_mode in [
AdminMode.ENGINEERING,
AdminMode.ONLINE,
]
[docs]
def create_component_manager(
self: MccsController,
) -> ControllerComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
subarrays = [trl for trl in self.MccsSubarrays if trl != ""]
stations = [trl for trl in self.MccsStations if trl != ""]
subarraybeams = [trl for trl in self.MccsSubarrayBeams if trl != ""]
stationbeams = [trl for trl in self.MccsStationBeams if trl != ""]
self._num_subservients = len(
subarrays + stations + subarraybeams + stationbeams
)
return ControllerComponentManager(
subarrays,
stations,
subarraybeams,
stationbeams,
self.logger,
self.ObsCommandTimeout,
self._communication_state_callback,
self._component_state_callback,
event_serialiser=self._event_serialiser,
)
# ----------
# Callbacks
# ----------
def _communication_state_callback(
self: MccsController,
communication_state: CommunicationStatus,
) -> None:
"""
Handle change in communications status between component manager and component.
This is a callback hook, called by the component manager when
the communications status changes. It is implemented here to
drive the op_state.
:param communication_state: the status of communications between
the component manager and its component.
"""
super()._communication_state_changed(communication_state)
if (
communication_state == CommunicationStatus.ESTABLISHED
and self._num_subservients == 0
):
self.op_state_model.perform_action("component_on")
# If we had subdevices, in a callback for a power change, they would
# trigger this, since we don't have any, we'll have to trigger it manually.
self.component_manager._evaluate_power_state()
# Old health model.
self._health_model.update_state(
communicating=communication_state == CommunicationStatus.ESTABLISHED
)
# New health model.
if communication_state == CommunicationStatus.ESTABLISHED:
self._component_state_callback(trl="self", health=HealthState.OK)
# pylint: disable=too-many-branches, too-many-arguments
def _component_state_callback(
self: MccsController,
fault: Optional[bool] = None,
power: Optional[PowerState] = None,
health: Optional[HealthState] = None,
trl: Optional[str] = None,
station_id: Optional[str] = None,
obsstate_changed: Optional[ObsState] = None,
missed_event: Optional[bool] = None,
) -> None:
"""
Handle change in the state of the component.
This is a callback hook, called by the component manager when
the state of the component changes.
:param fault: An optional flag if the device is entering or
exiting a fault state.
:param power: An optional parameter with the new power state of the device.
:param health: An optional parameter with the new health state of the device.
:param trl: The TRL of the device.
:param station_id: The station ID of the component, currently only for
station beam pooling by station.
:param obsstate_changed: An optional parameter with the new ObsState of
a subservient ObsDevice.
:param missed_event: whether the component manager has detected a missed change
event.
"""
if power is not None:
if trl is None:
self._health_model.update_state(power=power)
else:
device_family = trl.split("/")[1]
if device_family in ["station"]:
with self._power_state_lock:
self.component_manager._station_power_state_changed(trl, power)
self.component_manager._device_power_states[trl] = power
self.component_manager._evaluate_power_state()
if health is not None and trl is not None:
self._health_rollup.health_changed(source=trl, health=health)
if trl != "self":
device_family = trl.split("/")[1]
if device_family == "subarray":
self.component_manager._subarray_health_changed(
trl, health
) # Calls set_ready on subarray. Should be looking at adminMode.
elif device_family == "station":
self._health_model.station_health_changed(trl, health)
elif device_family == "subarraybeam":
self.component_manager._subarray_beam_health_changed(trl, health)
self._health_model.subarray_beam_health_changed(trl, health)
elif device_family == "beam":
self.component_manager._station_beam_health_changed(trl, health)
self._health_model.station_beam_health_changed(trl, health)
if fault is not None:
self._health_model.update_state(fault=fault)
if obsstate_changed is not None and trl is not None:
self.component_manager._device_obs_state_changed(trl, obsstate_changed)
if missed_event:
self._missed_events += 1
if trl is None:
self._component_state_changed(fault=fault, power=power)
def _health_changed(self: MccsController, health: HealthState) -> None:
"""
Handle change in this device's health state.
This is a callback hook, called whenever the HealthModel's
evaluated health state changes. It is responsible for updating
the tango side of things i.e. making sure the attribute is up to
date, and events are pushed.
:param health: the new health value
"""
if self._use_new_health_model:
self._health_state = health
def _old_health_changed(self: MccsController, health: HealthState) -> None:
"""
Handle change in this device's health state.
This is a callback hook, called whenever the HealthModel's
evaluated health state changes. It is responsible for updating
the tango side of things i.e. making sure the attribute is up to
date, and events are pushed.
:param health: the new health value
"""
if not self._use_new_health_model:
if self._health_state != health:
self._health_state = health
def _health_summary_changed(
self: MccsController, health_summary: HealthSummary
) -> None:
"""
Handle change in this device's health summary.
This is a callback hook, called whenever this device's
evaluated health summary changes. It is responsible for updating
the tango side of things i.e. making sure the attribute is up to
date, and events are pushed.
:param health_summary: the new health summary
"""
self._health_report = json.dumps(health_summary)
def _get_channel_blocks_info(
self: MccsController, stations: dict[str, Any], allocation_manager: Any
) -> dict[str, Any]:
"""
Collect and organize detailed information about channel blocks in the system.
This method gathers information about channel blocks, which represent frequency
channels that can be allocated to stations for signal processing. It provides
a comprehensive view of:
- Total channel blocks in the system
- Which channel blocks are currently allocated and to which subarrays
- Available channel blocks that can be allocated
- Detailed breakdown of channel blocks by station
The returned data structure includes allocation counts, usage patterns,
and station-specific allocation details.
:param stations: Dictionary mapping station TRLs (Tango Resource Locators)
to their corresponding station device proxies
:param allocation_manager: The resource manager responsible for tracking
resource allocations across subarrays
:return: Dictionary with structured channel blocks information containing:
- Allocation statistics (count, usage by subarray)
- Availability information
- Station-specific breakdowns of channel block allocations
"""
# Get all channel blocks
all_channel_blocks = self._collect_all_channel_blocks(stations)
# Get allocated channel blocks and their usage
(
allocated_channel_blocks,
channel_block_usage,
) = self._get_allocated_channel_blocks(stations)
# Calculate available channel blocks
available_channel_blocks = all_channel_blocks - allocated_channel_blocks
# Group channel blocks by station
channel_blocks_by_station = self._group_channel_blocks_by_station(
all_channel_blocks, channel_block_usage
)
return {
"allocated": {
"count": len(allocated_channel_blocks),
"usage": channel_block_usage, # Grouped by subarray with station counts
},
"available": {
"count": len(available_channel_blocks),
},
"by_station": {
station_id: {
"total": info["total"],
"allocated": info["allocated"],
"available": info["available"],
"allocation_details": info["allocation_details"],
}
for station_id, info in channel_blocks_by_station.items()
},
}
def _collect_all_channel_blocks(
self: MccsController, stations: dict[str, Any]
) -> set[str]:
"""
Collect and return a set of all channel blocks from all stations in the system.
This method queries each station in the system to identify all available
channel blocks, which represent frequency channels that can be allocated
for signal processing. The result is a comprehensive inventory of all
channel blocks across all stations.
:param stations: Dictionary mapping station TRLs (Tango Resource Locators)
to their corresponding station device proxies
:return: Set of all channel block identifiers in the system, represented
as strings in the format required for resource allocation
"""
all_channel_blocks = set()
for station_trl, _ in stations.items():
station_id = station_trl.split("/")[-1]
for block in range(48): # Each station has 48 channel blocks
all_channel_blocks.add(f"{station_id}:{block}")
return all_channel_blocks
def _get_allocated_channel_blocks(
self: MccsController, stations: dict[str, Any]
) -> tuple[set[str], dict[str, dict[str, int]]]:
"""
Identify all allocated channel blocks and compile their usage information.
This method determines which channel blocks are currently allocated to
subarrays across all stations in the system. It provides both:
1. A set of all allocated channel block identifiers
2. A detailed breakdown of how these blocks are distributed across subarrays
The usage information is structured to show which subarrays are using
channel blocks from which stations, and how many blocks are allocated
to each subarray from each station.
:param stations: Dictionary mapping station TRLs (Tango Resource Locators)
to their corresponding station device proxies
:return: A tuple containing:
- A set of all allocated channel block identifiers
- A nested dictionary mapping subarray IDs to station IDs to counts,
showing how many channel blocks from each station are allocated
to each subarray
"""
allocated_channel_blocks = set()
# New structure: {subarray_station_key: {station_id: count}}
channel_block_usage: dict[str, dict[str, int]] = {}
# For each station, check its resource manager for allocations
for station_trl, station_proxy in stations.items():
station_id = station_trl.split("/")[-1]
station_resource_manager = station_proxy._resource_manager
# Check allocations for each subarray
for subarray_trl in self.MccsSubarrays:
# Get resources allocated to this subarray
allocated_resources = station_resource_manager.get_allocated(
subarray_trl
)
# Process channel blocks if any are allocated
if "channel_blocks" in allocated_resources:
# Initialize if not exists
if subarray_trl not in channel_block_usage:
channel_block_usage[subarray_trl] = {}
# Initialize station count if not exists
if station_id not in channel_block_usage[subarray_trl]:
channel_block_usage[subarray_trl][station_id] = 0
# Add blocks to the count
channel_block_usage[subarray_trl][station_id] += len(
allocated_resources["channel_blocks"]
)
# Add individual blocks to the allocated set
for block in allocated_resources["channel_blocks"]:
block_id = f"{station_id}:{block}"
allocated_channel_blocks.add(block_id)
return allocated_channel_blocks, channel_block_usage
def _group_channel_blocks_by_station(
self: MccsController,
all_channel_blocks: set[str],
channel_block_usage: dict[str, dict[str, int]],
) -> dict[str, ChannelBlockInfo]:
"""
Group channel blocks by station for better readability.
:param all_channel_blocks: Set of all channel blocks
:param channel_block_usage: Dict mapping subarray+station to station counts
:return: Dictionary with channel blocks grouped by station
"""
# Define a type where each station has specific fields with their own types
channel_blocks_by_station: dict[str, ChannelBlockInfo] = {}
# Initialize the structure with all blocks
for block_id in all_channel_blocks:
station_id, _ = block_id.split(":")
if station_id not in channel_blocks_by_station:
channel_blocks_by_station[station_id] = {
"total": 0,
"allocated": 0,
"available": 0,
"allocation_details": {},
}
# Increment the total count
total = channel_blocks_by_station[station_id]["total"]
channel_blocks_by_station[station_id]["total"] = total + 1
# Process allocations - calculate total allocated blocks per station
for subarray_station_key, station_counts in channel_block_usage.items():
for station_id, count in station_counts.items():
if station_id in channel_blocks_by_station:
# Ensure allocated is an integer before incrementing
if not isinstance(
channel_blocks_by_station[station_id]["allocated"], int
):
channel_blocks_by_station[station_id]["allocated"] = 0
allocated = channel_blocks_by_station[station_id]["allocated"]
channel_blocks_by_station[station_id]["allocated"] = (
allocated + count
)
# Use the full TRL for consistency with other resource types
# subarray_station_key is the full TRL like "low-mccs/subarray/01"
subarray_fqdn = subarray_station_key
# Ensure allocation_details is a dictionary
if (
"allocation_details"
not in channel_blocks_by_station[station_id]
):
channel_blocks_by_station[station_id]["allocation_details"] = {}
allocation_details = channel_blocks_by_station[station_id][
"allocation_details"
]
# Now we can safely assign to the dictionary
allocation_details[subarray_fqdn] = count
# Calculate available blocks
for station_id, station_data in channel_blocks_by_station.items():
# Ensure total and allocated are integers
total = (
int(station_data["total"])
if isinstance(station_data["total"], int)
else 0
)
allocated = (
int(station_data["allocated"])
if isinstance(station_data["allocated"], int)
else 0
)
station_data["available"] = total - allocated
return channel_blocks_by_station
def _get_station_beams_info(
self: MccsController, resource_pool: Any, allocation_manager: Any
) -> dict[str, Any]:
"""
Collect and organize detailed information about all station beams in the system.
Station beams are beamforming resources that process signals from multiple tiles
within a station. This method provides a comprehensive view of:
- Total station beams available in the system
- Current allocation status of station beams to subarrays
- Available station beams that can be allocated
- Distribution of station beams across different stations
The information is organized hierarchically to show both system-wide statistics
and station-specific details, making it useful for resource management and
diagnostic purposes.
:param resource_pool: The pool of resources managed by the controller,
containing information about all available station beams
:param allocation_manager: The resource manager responsible for tracking
current allocations of station beams across subarrays
:return: Dictionary with structured station beam information containing:
- Total counts (system-wide and per-station)
- Allocation statistics by subarray
- Availability information by station
- Detailed allocation mappings
"""
# Get all station beams
all_station_beams, station_beams_by_station = self._collect_all_station_beams(
resource_pool
)
# Get allocated station beams and their usage
allocated_station_beams, station_beam_usage = self._get_allocated_station_beams(
allocation_manager
)
# Update station allocation data
self._update_station_beam_allocation(
allocated_station_beams, station_beams_by_station, station_beam_usage
)
# Count station beams allocated to each subarray
subarray_allocations = self._count_beams_by_subarray(station_beam_usage)
# Create result dictionaries
result = {}
station_data_maps = self._create_station_data_maps(station_beams_by_station)
# Build the final result structure
result["total"] = {
"count": len(all_station_beams),
"by_station": station_data_maps["total"],
}
result["allocated"] = {
"count": len(allocated_station_beams),
"by_subarray": subarray_allocations,
}
result["available"] = {
"count": sum(station_data_maps["available"].values()),
"by_station": station_data_maps["available"],
}
return result
def _update_station_beam_allocation(
self: MccsController,
allocated_station_beams: set[str],
station_beams_by_station: dict[str, ChannelBlockInfo],
station_beam_usage: dict[str, str],
) -> None:
"""
Update station beam allocation data.
:param allocated_station_beams: Set of allocated station beams
:param station_beams_by_station: Dictionary of station beam data by station
:param station_beam_usage: Dictionary mapping beam IDs to subarray TRLs
"""
# Reset allocated counts and allocation
# details for each station before recounting
for station_data in station_beams_by_station.values():
station_data["allocated"] = 0
station_data["allocation_details"] = {}
# Update allocation counts and details based on current allocations
for beam_id in allocated_station_beams:
beam_str = str(beam_id)
station_id = beam_str.rsplit("/", maxsplit=1)[-1].rsplit("-", maxsplit=1)[0]
if station_id in station_beams_by_station:
station_beams_by_station[station_id]["allocated"] += 1
# Track allocation details by subarray
subarray_trl = station_beam_usage.get(beam_str, "unknown")
if (
subarray_trl
not in station_beams_by_station[station_id]["allocation_details"]
):
station_beams_by_station[station_id]["allocation_details"][
subarray_trl
] = 0
station_beams_by_station[station_id]["allocation_details"][
subarray_trl
] += 1
# Calculate available beams by station
for station_data in station_beams_by_station.values():
station_data["available"] = (
station_data["total"] - station_data["allocated"]
)
def _count_beams_by_subarray(
self: MccsController, station_beam_usage: dict
) -> dict:
"""
Count station beams allocated to each subarray.
:param station_beam_usage: Dictionary mapping beam IDs to subarray IDs
:return: Dictionary of beam counts by subarray
"""
subarray_allocations = {}
for _, subarray_id in station_beam_usage.items():
if subarray_id not in subarray_allocations:
subarray_allocations[subarray_id] = 0
subarray_allocations[subarray_id] += 1
return subarray_allocations
def _create_station_data_maps(
self: MccsController, station_beams_by_station: dict
) -> dict:
"""
Create maps of station data for total and available beams.
:param station_beams_by_station: Dictionary of station beam data by station
:return: Dictionary containing maps for total and available beams by station
"""
result: dict[str, dict[str, int]] = {"total": {}, "available": {}}
for station_id, station_data in station_beams_by_station.items():
result["total"][station_id] = station_data["total"]
result["available"][station_id] = station_data["available"]
return result
def _collect_all_station_beams(
self: MccsController, resource_pool: Any
) -> tuple[set[str], dict[str, ChannelBlockInfo]]:
"""
Collect all station beams from the resource pool and organize them by station.
This method retrieves info about all station beams available in the system
from the resource pool. Station beams are beamforming resources that process
signals from multiple tiles within a station. The method provides:
1. A complete set of all station beam identifiers in the system
2. A mapping of station IDs to detailed information about station beams,
including counts of total, allocated, and available beams
This information forms the foundation for tracking allocation status and
availability of station beams across the telescope.
:param resource_pool: The pool of resources managed by the controller,
containing information about all available station beams
:return: A tuple containing:
- A set of all station beam identifiers in the system
- A dictionary mapping station IDs to dictionaries containing:
* total: Total number of beams for this station
* allocated: Number of beams currently allocated
* available: Number of beams available for allocation
* allocation_details: Details about which subarrays are using the beams
"""
all_station_beams = set()
station_beams_by_station: Dict[str, ChannelBlockInfo] = {}
if (
hasattr(resource_pool, "_resources")
and "station_beams" in resource_pool._resources
):
all_station_beams = set(resource_pool._resources["station_beams"])
# Group station beams by station
for beam in all_station_beams:
beam_str = str(beam)
station_id = beam_str.rsplit("/", maxsplit=1)[-1].rsplit(
"-", maxsplit=1
)[0]
if station_id not in station_beams_by_station:
station_beams_by_station[station_id] = {
"total": 0,
"allocated": 0,
"available": 0,
"allocation_details": {},
}
# Increment the total count
station_beams_by_station[station_id]["total"] += 1
return all_station_beams, station_beams_by_station
def _get_allocated_station_beams(
self: MccsController,
allocation_manager: Any,
) -> tuple[set[str], dict[str, str]]:
"""
Identify all allocated station beams and compile their usage information.
This method determines which station beams are currently allocated to
subarrays across the system. It queries the resource manager to find
all station beams that have been assigned to subarrays and creates:
1. A set of all allocated station beam identifiers
2. A mapping showing which subarray each station beam is allocated to
This information is essential for tracking resource utilization and
ensuring proper resource management across the telescope.
:param allocation_manager: The resource manager responsible for tracking
current allocations of station beams across subarrays
:return: A tuple containing:
- A set of all allocated station beam identifiers
- A dictionary mapping station beam identifiers to the subarray TRLs
they are allocated to
"""
allocated_station_beams: set[str] = set()
station_beam_usage: dict[str, str] = {}
# Process allocations for each subarray
for subarray_trl in self.MccsSubarrays:
# Get resources allocated to this subarray
allocated_resources = allocation_manager.get_allocated(subarray_trl)
# Process station beams if any are allocated
if "station_beams" in allocated_resources:
# Get the actual list of station beams, not the string representation
resource_list = allocated_resources["station_beams"]
if isinstance(resource_list, str):
# If it's a string, try to parse it as JSON
try:
resource_list = json.loads(resource_list)
except json.JSONDecodeError:
# If it can't be parsed as JSON, treat it as a single resource
resource_list = [resource_list]
# Ensure resource_list is iterable (list, tuple, etc.)
if not isinstance(resource_list, (list, tuple, set)):
resource_list = [resource_list]
# Convert each item to string and update the sets/dictionaries
str_resources = [str(r) for r in resource_list]
allocated_station_beams.update(str_resources)
for beam_str in str_resources:
station_beam_usage[beam_str] = subarray_trl
return allocated_station_beams, station_beam_usage
def _get_subarray_beams_info(
self: MccsController, allocation_manager: Any
) -> dict[str, Any]:
"""
Collect and organize detailed information about subarray beams in the system.
Subarray beams are higher-level beams that combine data from multiple station
beams to form coherent beams for scientific observations. This method provides:
- Total count of subarray beams in the system
- Current allocation status of subarray beams to specific subarrays
- Available subarray beams that can be allocated for observations
- Detailed resource identifiers for available beams
This information is critical for understanding the system's capacity to support
multiple concurrent observations and for diagnosing allocation issues.
:param allocation_manager: The resource manager responsible for tracking
current allocations of subarray beams across subarrays
:return: Dictionary with structured subarray beam information containing:
- Total count of subarray beams in the system
- Allocation statistics (count and usage by subarray)
- Availability information including specific resource identifiers
- Allocation mapping showing which beams are assigned to which subarrays
"""
# Get all subarray beams
all_subarray_beams = set()
for trl in self.component_manager._subarray_beams.keys():
all_subarray_beams.add(trl)
# Get allocated subarray beams and their usage
(
allocated_subarray_beams,
subarray_beam_usage,
) = self._get_allocated_subarray_beams(allocation_manager)
# Calculate available subarray beams
available_subarray_beams = all_subarray_beams - allocated_subarray_beams
return {
"total": {
"count": len(all_subarray_beams),
},
"allocated": {
"count": len(allocated_subarray_beams),
"usage": subarray_beam_usage,
},
"available": {
"count": len(available_subarray_beams),
"resources": [str(r) for r in available_subarray_beams],
},
}
def _get_allocated_subarray_beams(
self: MccsController, allocation_manager: Any
) -> tuple[set[str], dict[str, str]]:
"""
Identify all allocated subarray beams and compile their usage information.
This method determines which subarray beams are currently allocated to
specific subarrays for observations. Subarray beams are higher-level beams
that combine data from multiple station beams to form coherent beams for
scientific observations.
The method queries the resource manager to find all subarray beams that
have been assigned and creates:
1. A set of all allocated subarray beam identifiers
2. A mapping showing which subarray each beam is allocated to
This information is essential for tracking resource utilization and
ensuring proper resource management across the telescope.
:param allocation_manager: The resource manager responsible for tracking
current allocations of subarray beams across subarrays
:return: A tuple containing:
- A set of all allocated subarray beam identifiers
- A dictionary mapping subarray beam identifiers to the subarray TRLs
they are allocated to
"""
allocated_subarray_beams = set()
subarray_beam_usage = {}
# Process allocations for each subarray
for subarray_trl in self.MccsSubarrays:
# Get resources allocated to this subarray
allocated_resources = allocation_manager.get_allocated(subarray_trl)
# Process subarray beams if any are allocated
if "subarray_beams" in allocated_resources:
# Get the actual list of subarray beams
resource_list = allocated_resources["subarray_beams"]
if isinstance(resource_list, str):
# If it's a string, try to parse it as JSON
try:
resource_list = json.loads(resource_list)
except json.JSONDecodeError:
# If it can't be parsed as JSON, treat it as a single resource
resource_list = [resource_list]
# Ensure resource_list is iterable (list, tuple, etc.)
if not isinstance(resource_list, (list, tuple, set)):
resource_list = [resource_list]
# Convert each item to string and update the sets/dictionaries
str_resources = [str(r) for r in resource_list]
allocated_subarray_beams.update(str_resources)
for beam_str in str_resources:
subarray_beam_usage[beam_str] = subarray_trl
return allocated_subarray_beams, subarray_beam_usage
# ----------
# Attributes
# ----------
[docs]
@attribute(
dtype="DevString",
format="%s",
doc="Detailed summary of all resources in the system",
)
def resourceSummary(self: MccsController) -> str:
"""
Return a detailed summary of all resources managed by the controller.
This attribute provides comprehensive information about the allocation and
availability status of all resources in the system, structured as a JSON object.
The summary includes:
1) Total resources present in the system (inventory of all managed resources)
2) Resources currently in use, including which subarray they're allocated to
3) Available resources that are not currently allocated
4) Detailed breakdown of resources by station/location
Resources tracked include:
- Channel blocks: Frequency channels allocated to stations for processing
- Station beams: Beamforming resources that process signals from tiles
- Subarray beams: Higher-level beams that combine data from station beams
This attribute is useful for:
- Monitoring system capacity and utilization
- Diagnosing resource allocation issues
- Planning observations based on available resources
- Verifying the success of allocation/release operations
:return: A JSON-formatted string containing detailed resource information
organized by resource type with allocation statistics
"""
# Get resource managers
controller_resource_manager = self.component_manager._resource_manager
allocation_manager = controller_resource_manager._resource_manager
resource_pool = controller_resource_manager.resource_pool
stations = self.component_manager._stations
# Format the summary with all resource types
summary = {
"resources": {
"channel_blocks": self._get_channel_blocks_info(
stations, allocation_manager
),
"station_beams": self._get_station_beams_info(
resource_pool, allocation_manager
),
"subarray_beams": self._get_subarray_beams_info(allocation_manager),
}
}
return json.dumps(summary, indent=2, ensure_ascii=False)
@attribute(dtype="DevBoolean")
def useNewHealthModel(self: MccsController) -> bool:
"""
Return a flag indicating whether this controller is using the new health model.
:return: a flag indicating whether this controller is currently
using the new health model.
"""
return self._use_new_health_model
[docs]
@useNewHealthModel.write # type: ignore[no-redef]
def useNewHealthModel(self: MccsController, argin: bool) -> None:
"""
Set a flag indicating whether this controller is using the new health model.
:param argin: a flag indicating whether this controller is currently
using the new health model.
"""
self._use_new_health_model = argin
@attribute(
dtype="DevString",
format="%s",
)
def healthModelParams(self: MccsController) -> str:
"""
Get the health params from the health model.
:return: the health params
"""
if self._use_new_health_model:
self.logger.warning(
"These are the thresholds for the old health model. "
"New health model is currently in use. "
"To see new health model thresholds use healthThresholds."
)
return json.dumps(self._health_model.health_params)
[docs]
@healthModelParams.write # type: ignore[no-redef]
def healthModelParams(self: MccsController, argin: str) -> None:
"""
Set the params for health transition rules.
:param argin: JSON-string of dictionary of health states
"""
if self._use_new_health_model:
self.logger.warning(
"New health model is in use. "
"These thresholds are for the old health model."
"Thresholds will be updated but will not "
"be used unless the old health model is activated. "
"To update new health model thresholds use healthThresholds."
)
self._health_model.health_params = json.loads(argin)
self._health_model.update_health()
@attribute(
dtype="DevString",
format="%s",
)
def healthThresholds(self: MccsController) -> str:
"""
Get the health params from the health model.
Default health thresholds:
"devices": (f2f, d2f, d2d),
tuple(int, int, int): Number of devices failed before health failed,
Number of devices degraded before health failed,
Number of devices degraded before health degraded
Valid devices are: "subarrays", "stations", "subarraybeams", "stationbeams".
:return: the health params
"""
if not self._use_new_health_model:
self.logger.warning(
"These are thresholds used by the new health model. "
"Old health model is in use. "
"To see old health model thresholds use healthModelParams."
)
return json.dumps(self._health_thresholds)
[docs]
@healthThresholds.write # type: ignore[no-redef]
def healthThresholds(self: MccsController, argin: str) -> None:
"""
Set the params for health transition rules.
Default health thresholds:
"devices": (f2f, d2f, d2d),
tuple(int, int, int): Number of devices failed before health failed,
Number of devices degraded before health failed,
Number of devices degraded before health degraded
Valid devices are: "subarrays", "stations", "subarraybeams", "stationbeams".
:param argin: JSON-string of dictionary of health thresholds
"""
if not self._use_new_health_model:
self.logger.warning(
"Old health model is in use. "
"These thresholds are for the new health model. "
"Thresholds will be updated but will not be used unless the "
"new health model is activated. "
"To update old health model thresholds use healthModelParams."
)
thresholds = json.loads(argin)
for key, threshold in thresholds.items():
if key not in self._health_thresholds:
self.logger.info(
"Invalid Key Supplied: %s. Allowed keys: %s",
key,
self._health_thresholds.keys(),
)
continue
self._health_thresholds[key] = threshold
# TODO: Modify rollup classes to allow this.
# Redefine health thresholds if needed.
# if key == "subarrays":
# self._health_rollup.define(
# "subarrays", self.MccsSubarrays, thresholds["subarrays"]
# )
# if key == "stations":
# self._health_rollup.define(
# "stations", self.MccsStations, thresholds["stations"]
# )
# if key == "subarraybeams":
# self._health_rollup.define(
# "subarraybeams",
# self.MccsSubarrayBeams,
# thresholds["subarraybeams"],
# )
# if key == "stationbeams":
# self._health_rollup.define(
# "stationbeams", self.MccsStationBeams, thresholds["stationbeams"]
# )
# If we changed thresholds for subdevices, redefine health rollup.
if any(
subdevice in thresholds
for subdevice in ["subarrays", "stations", "subarraybeams", "stationbeams"]
):
self.logger.info("Reconfiguring subdevice health thresholds.")
self._redefine_health_rollup()
# If old health model is around, update it too.
if self._health_model is not None:
self._health_model.health_params = (
self._health_model.health_params | self._health_thresholds
)
[docs]
@attribute(dtype="str")
def buildState(self: MccsController) -> str:
"""
Read the Build State of the device.
:return: the build state of the device
"""
return f"MCCS build state: {self._build_state}"
[docs]
@attribute(dtype="str")
def versionId(self: MccsController) -> str:
"""
Read the Version Id of the device.
:return: the version id of the device
"""
return self._version_id
[docs]
@attribute(dtype="str")
def stationHealths(self: MccsController) -> str:
"""
Read the health of stations controlled by the device.
:return: health of stations in a json format
"""
return json.dumps(self.component_manager.get_healths("station")["station"])
[docs]
@attribute(dtype="str")
def subarrayHealths(self: MccsController) -> str:
"""
Read the health of subarrays controlled by the device.
:return: health of subarrays in a json format
"""
return json.dumps(self.component_manager.get_healths("subarray")["subarray"])
[docs]
@attribute(dtype="str")
def stationBeamHealths(self: MccsController) -> str:
"""
Read the health of station beams controlled by the device.
:return: health of station beams in a json format
"""
return json.dumps(self.component_manager.get_healths("beam")["beam"])
[docs]
@attribute(dtype="str")
def subarrayBeamHealths(self: MccsController) -> str:
"""
Read the health of subarray beams controlled by the device.
:return: health of subarray beams in a json format
"""
return json.dumps(
self.component_manager.get_healths("subarraybeam")["subarraybeam"]
)
[docs]
@attribute(dtype="str")
def subDeviceHealths(self: MccsController) -> str:
"""
Read the health of all subdevices controlled by the device.
:return: health of subdevices in a json tree format
"""
return json.dumps(self.component_manager.get_healths())
[docs]
@attribute(dtype="DevString")
def healthReport(self: MccsController) -> str:
"""
Get the health report.
:return: the health report.
"""
if self._use_new_health_model:
return self._health_report
return self._health_model.health_report
[docs]
@attribute(dtype="DevLong")
def missedEvents(self: MccsController) -> int:
"""
Get the amount of missed change events.
Some commands rely on change events from sub-devices, sometime we miss these
events, this attribute keeps track of how many we know we have missed.
:return: the amount of missed change events
"""
return self._missed_events
# --------
# Commands
# --------
[docs]
def is_On_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the On command is allowed.
:param request_type: The request type
:return: False if command is not allowed by base class
:raises CmdNotAllowedError: if there are no subservient devices
"""
if not super().is_On_allowed(request_type=request_type):
return False
if not self.component_manager._stations:
raise CmdNotAllowedError("No subservient devices to turn on")
return True
[docs]
@stb.long_running_commands.submit_lrc_task
def execute_On(self) -> stb.type_hints.TaskFunctionType:
"""Put the Controller ON.
:return: A tuple containing a return code and a string message
indicating status.
"""
return self.component_manager.on
[docs]
def is_Standby_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the Standby command is allowed.
:param request_type: The request type
:return: False if command is not allowed by base class
:raises CmdNotAllowedError: if there are no subservient devices
"""
if not super().is_Standby_allowed(request_type=request_type):
return False
if not self.component_manager._stations:
raise CmdNotAllowedError("No subservient devices to put into standby")
return True
[docs]
@stb.long_running_commands.submit_lrc_task
def execute_Standby(self) -> stb.type_hints.TaskFunctionType:
"""Put the Controller into STANDBY.
:return: A tuple containing a return code and a string message
indicating status.
"""
return self.component_manager.standby
[docs]
def is_Off_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the OFF command is allowed.
:param request_type: The request type
:return: True if OFF command is allowed
:raises CmdNotAllowedError: if there are no devices to turn off
"""
if not super().is_Off_allowed(request_type=request_type):
return False
if not self.component_manager._stations:
raise CmdNotAllowedError("No subservient devices to turn off")
return True
[docs]
@stb.long_running_commands.submit_lrc_task
def execute_Off(self) -> stb.type_hints.TaskFunctionType:
"""Put the Controller OFF.
:return: A tuple containing a return code and a string message
indicating status.
"""
return self.component_manager.off
[docs]
def is_StandbyFull_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the StandbyFull command is allowed.
:param request_type: The request type
:return: False if command is not allowed by base class
:raises CmdNotAllowedError: if there are no subservient devices
"""
if not super().is_Standby_allowed(request_type=request_type):
return False
if not self.component_manager._stations:
raise CmdNotAllowedError("No subservient devices to put into standby")
return True
[docs]
@command(dtype_out="DevVarLongStringArray")
def StandbyFull(self: MccsController) -> DevVarLongStringArrayType:
"""
Put MCCS into standby mode.
Some elements of SKA Mid have both low and full standby modes,
but SKA Low has no such elements. We just need a Standby
command, not separate StandbyLow and StandbyFull.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.execute_Standby()
[docs]
def is_StandbyLow_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the StandbyLow command is allowed.
:param request_type: The request type
:return: False if command is not allowed by base class
:raises CmdNotAllowedError: if there are no subservient devices
"""
if not super().is_Standby_allowed(request_type=request_type):
return False
if not self.component_manager._stations:
raise CmdNotAllowedError("No subservient devices to put into standby")
return True
[docs]
@command(dtype_out="DevVarLongStringArray")
def StandbyLow(self: MccsController) -> DevVarLongStringArrayType:
"""
Put MCCS into standby mode.
Some elements of SKA Mid have both low and full standby modes,
but SKA Low has no such elements. We just need a Standby
command, not separate StandbyLow and StandbyFull.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.execute_Standby()
Allocate_SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.controller",
"MccsController_Allocate_3_0.json",
)
)
[docs]
@stb.long_running_commands.long_running_command
@stb.validators.validate_json_args
def Allocate(
self: MccsController,
subarray_id: int,
subarray_beams: list[dict] | None = None,
interface: str | None = None,
) -> stb.type_hints.TaskFunctionType:
"""
Allocate a set of unallocated MCCS resources to a sub-array.
The JSON argument specifies the overall sub-array composition in terms of
which stations should be allocated to the specified Sub-Array.
:param subarray_id: integer subarray ID
:param subarray_beams: list of subarray beam allocation dicts
:param interface: schema interface version string
:return: A tuple containing a return code, a string
message indicating status and message UID.
The string message is for information purposes only, but
the message UID is for message management use.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.Allocate(
json.dumps(
{
"interface":
"https://schema.skao.int/ska-low-mccs-controller-allocate/3.0"
"subarray_id": 1,
"subarray_beams": [
{
"subarray_beam_id": 3,
"apertures": [
{"station_id": 1, "aperture_id": "1.1" },
{"station_id": 2, "aperture_id": "2.2" },
{"station_id": 2, "aperture_id": "2.3" },
{"station_id": 3, "aperture_id": "3.1" },
{"station_id": 4, "aperture_id": "4.1" },
],
}
"number_of_channels": 32,
],
}
)
)
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.allocate(
task_callback=task_callback,
subarray_id=subarray_id,
subarray_beams=subarray_beams or [],
interface=interface,
)
return task
[docs]
def is_RestartSubarray_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the Release command is allowed.
:param request_type: The request type
:return: False if command is not allowed by base class
:raises CmdNotAllowedError: if there are no subservient devices
"""
if not self.component_manager._subarrays:
raise CmdNotAllowedError("No subservient subarray devices to restart")
return True
[docs]
@stb.long_running_commands.long_running_command(dtype_in=int)
def RestartSubarray(
self: MccsController, argin: int
) -> stb.type_hints.TaskFunctionType:
"""
Restart an MCCS subarray.
:param argin: an integer subarray_id.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.restart_subarray(argin, task_callback=task_callback)
return task
[docs]
@command(dtype_in=int, dtype_out="DevVarLongStringArray")
def AbortSubarray(self: MccsController, argin: int) -> DevVarLongStringArrayType:
"""
Abort an MCCS subarray.
:param argin: an integer subarray_id.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
command_id, task_callback = self.allocate_lrc("AbortSubarray")
status, message = self.component_manager.abort_subarray(
argin, task_callback=task_callback
)
return self.convert_submission_result_to_lrc_return(command_id, status, message)
[docs]
def is_Release_allowed(
self,
request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ,
) -> bool:
"""
Return whether the Release command is allowed.
:param request_type: The request type
:return: False if command is not allowed by base class
:raises CmdNotAllowedError: if there are no subservient devices
"""
if not self.component_manager._subarrays:
raise CmdNotAllowedError("No subservient subarray devices to release")
return True
Release_SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.controller",
"MccsController_Release_2_0.json",
)
)
[docs]
@stb.long_running_commands.long_running_command
@stb.validators.validate_json_args
def Release(
self: MccsController,
subarray_id: int,
interface: str | None = None,
) -> stb.type_hints.TaskFunctionType:
"""
Release resources from an MCCS Sub-Array.
:param subarray_id: integer ID of the subarray to release
:param interface: schema interface version string
:return: A tuple containing a return code, a string
message indicating status and message UID.
The string message is for information purposes only, but
the message UID is for message management use.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.Release(
json.dumps(
{
"subarray_id": 1
}
)
)
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.release(
subarray_id=subarray_id,
task_callback=task_callback,
task_abort_event=task_abort_event,
)
return task
[docs]
@stb.long_running_commands.long_running_command
def ReleaseAll(self: MccsController) -> stb.type_hints.TaskFunctionType:
"""
Release all resources from an MCCS Array.
:return: A tuple containing a return code, a string
message indicating status and message UID.
The string message is for information purposes only, but
the message UID is for message management use.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.ReleaseAll()
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> Any:
return self.component_manager.release_all(
task_callback=task_callback,
task_abort_event=task_abort_event,
)
return task
[docs]
@command(dtype_in="DevLong", dtype_out="DevString")
def GetAssignedResources(self: MccsController, subarray_id: int) -> str:
"""
Return a dictionary of the resources assigned to a given subarray.
:param subarray_id: The subarray ID of the resources
:return: json formatted dictionary
"""
return self.component_manager.get_resources(subarray_id)
[docs]
@command(dtype_in="str", dtype_out="str")
def GetHealthTrl(self: MccsController, argin: str) -> Optional[str]:
"""
Return health of device given by TRL.
:param argin: TRL of device to return health of.
:return: health of device given by TRL.
"""
health_trl = self.component_manager.get_health_trl(argin)
if health_trl is not None:
return f"{HealthState(health_trl).name}"
return f"{health_trl}"
TriggerAdcEqualisation_SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.controller",
"MccsController_TriggerAdcEqualisation_3_0.json",
)
)
[docs]
@stb.long_running_commands.long_running_command
@stb.validators.validate_json_args
def TriggerAdcEqualisation(
self: MccsController, station_args: list[dict]
) -> stb.type_hints.TaskFunctionType:
"""
Trigger ADC equalisation.
:param station_args: list of per-station dicts
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purposes only.
:example:
>>> proxy = tango.DeviceProxy("ska-low-mccs/control/control")
>>> proxy.TriggerAdcEqualisation(
json.dumps(
{
"station_args":
[
{
station_id: 1,
target_adc: 17,
bias: 1,
},
{
station_id: 2,
target_adc: 1,
},
]
}
)
)
>>> # For all stations:
>>> proxy.TriggerAdcEqualisation(
json.dumps("")
)
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> Any:
return self.component_manager.trigger_adc_equalisation(
task_callback=task_callback,
task_abort_event=task_abort_event,
station_args=station_args,
)
return task
# ----------
# Run server
# ----------
[docs]
def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return MccsController.run_server(args=args or None, **kwargs)
if __name__ == "__main__":
main()