# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements MCCS functionality for monitoring and control of subarrays."""
from __future__ import annotations # allow forward references in type hints
import functools
import importlib  # importlib.resources is used below to load the JSON command schemas
import json
import logging
import sys
from typing import Any, Callable, Final, Optional
from ska_control_model import (
CommunicationStatus,
HealthState,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_tango_base.base import CommandTracker
from ska_tango_base.commands import JsonValidator, SlowCommand, SubmittedSlowCommand
from ska_tango_base.obs import SKAObsDevice
from ska_tango_base.subarray import SKASubarray
from tango.server import attribute, command, device_property
from ska_low_mccs.subarray.subarray_component_manager import SubarrayComponentManager
from ska_low_mccs.subarray.subarray_health_model import SubarrayHealthModel
# Names exported by ``from <this module> import *``.
__all__ = ["MccsSubarray", "main"]
# Return annotation used by the command methods declared with
# dtype_out="DevVarLongStringArray": a list of result codes paired with a
# list of (optional) strings.
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]
class MccsSubarray(SKASubarray):
    """Tango device class for the MCCS Subarray prototype."""

    # -----------------
    # Device Properties
    # -----------------
    # Numeric id of this subarray; logged by init_device and passed to the
    # component manager.
    SubarrayId = device_property(
        dtype=int,
        default_value=0,
    )
    # SKUID URL; logged by init_device and passed to the component manager.
    SkuidUrl = device_property(
        dtype=str,
        default_value="",
    )
    # Passed to the component manager when it is created.
    ObsCommandTimeout = device_property(
        dtype=int,
        default_value=60,
        doc="The timeout in seconds for Observation commands.",
    )
# ---------------
# Initialisation
# ---------------
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialise this device object.

    :param args: positional args to the init
    :param kwargs: keyword args to the init
    """
    # Tango devices are meant to be set up through `init_device` only, and
    # defining `__init__` is discouraged. It is defined here regardless, so
    # that the instance attributes are declared in one place and linters do
    # not report "attribute-defined-outside-init" and the like. Anything
    # given a value here must still be re-initialised by `init_device`.
    super().__init__(*args, **kwargs)

    # Type declarations only: no value is bound by these three lines.
    self.component_manager: SubarrayComponentManager
    self._health_model: SubarrayHealthModel
    self._missed_events: int

    self._health_state: HealthState = HealthState.OK
def init_device(self: MccsSubarray) -> None:
    """
    Initialise the device.

    Resets the missed-event counter, runs the base-class initialisation,
    records the version details of the ``ska_low_mccs`` package and logs a
    summary of the device version and its properties.
    """
    self._missed_events = 0
    super().init_device()

    package = sys.modules["ska_low_mccs"]
    # Stored as a string, consistent with what InitCommand.do() stores;
    # this assignment runs after the base-class initialisation and would
    # otherwise replace that string with the raw ``__version_info__`` object.
    self._build_state = str(package.__version_info__)
    self._version_id = package.__version__

    device_name = type(self).__name__
    version = f"{device_name} Software Version: {self._version_id}"
    properties = (
        f"Initialised {device_name} device with properties:\n"
        f"\tSubarrayId: {self.SubarrayId:02}\n"
        f"\tSkuidUrl: {self.SkuidUrl}\n"
    )
    self.logger.info(
        "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
    )
def _init_state_model(self: MccsSubarray) -> None:
    """
    Set up the state models and this device's health model.

    After the base-class state models are initialised, the health state is
    preset to OK, a SubarrayHealthModel reporting to ``_health_changed`` is
    created, and change events are enabled for ``healthState``.
    """
    super()._init_state_model()

    # Health state for EMPTY is OK. InitCommand.do() does this too late.
    self._health_state = HealthState.OK

    health_thresholds = {
        "station_degraded_threshold": 0.05,
        "station_failed_threshold": 0.2,
        "subarray_beam_degraded_threshold": 0.05,
        "subarray_beam_failed_threshold": 0.2,
    }
    self._health_model = SubarrayHealthModel(
        self._health_changed,
        ignore_power_state=True,
        thresholds=health_thresholds,
    )

    self.set_change_event("healthState", True, False)
def create_component_manager(
    self: MccsSubarray,
) -> SubarrayComponentManager:
    """
    Create and return a component manager for this device.

    The manager receives the ``SubarrayId``, ``SkuidUrl`` and
    ``ObsCommandTimeout`` device properties, the device logger, and the
    device's communication-state and component-state callbacks.

    :return: a component manager for this device.
    """
    component_manager = SubarrayComponentManager(
        self.SubarrayId,
        self.SkuidUrl,
        self.logger,
        self.ObsCommandTimeout,
        self._communication_state_callback,
        self._component_state_callback,
    )
    return component_manager
class AbortDeviceCommand(SlowCommand[tuple[ResultCode, str]]):
    """A class for MccsSubarray's AbortDevice() command."""

    def __init__(
        self: MccsSubarray.AbortDeviceCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Callable[[bool], None],
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise a new AbortDeviceCommand instance.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: callback to be called when this command
            starts and finishes
        :param logger: a logger for this command object to use
        """
        self._command_tracker = command_tracker
        self._component_manager = component_manager
        super().__init__(callback=callback, logger=logger)

    def do(
        self: MccsSubarray.AbortDeviceCommand,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[ResultCode, str]:
        """
        Stateless hook for AbortDevice() command functionality.

        Registers a new "AbortDevice" command with the command tracker and
        asks the component manager to abort, passing it a callback that
        updates the tracker entry for that command.

        :param args: positional arguments to the command. This
            command does not take any, so this should be empty.
        :param kwargs: keyword arguments to the command. This
            command does not take any, so this should be empty.

        :return: A tuple containing ``ResultCode.STARTED`` and the id
            that the command tracker assigned to this command.
        """
        command_id = self._command_tracker.new_command(
            "AbortDevice", completed_callback=self._completed
        )
        # Task callback bound to this command's tracker entry.
        task_callback = functools.partial(
            self._command_tracker.update_command_info, command_id
        )
        status, _ = self._component_manager.abort_device(task_callback)

        # The abort is expected to be reported as in progress.
        assert status == TaskStatus.IN_PROGRESS
        return ResultCode.STARTED, command_id
# commands with a json schema
class AssignResourcesCommand(SKASubarray.AssignResourcesCommand):
    """A class for SKASubarray's AssignResources() command."""

    # JSON schema for the command argument, loaded from the package's
    # schema resources when the class body is executed.
    SCHEMA_ASSIGN_RESOURCES: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray",
            "MccsSubarray_AssignResources_3_0.json",
        )
    )

    def __init__(
        self: MccsSubarray.AssignResourcesCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Optional[Callable] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """
        Initialise a new instance.

        The base-class initialiser is called with the schema held in
        ``SCHEMA_ASSIGN_RESOURCES``.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: an optional callback to be called when this
            command starts and finishes.
        :param logger: a logger for this command to log with.
        """
        super().__init__(
            command_tracker,
            component_manager,
            schema=self.SCHEMA_ASSIGN_RESOURCES,
            callback=callback,
            logger=logger,
        )
class ScanCommand(SKASubarray.ScanCommand):
    """A class for SKASubarray's Scan() command."""

    # JSON schema for the command argument, loaded from the package's
    # schema resources when the class body is executed.
    SCHEMA_SCAN: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.common",
            "Mccs_Scan_3_0.json",
        )
    )

    def __init__(
        self: MccsSubarray.ScanCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Optional[Callable] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """
        Initialise a new instance.

        The base-class initialiser is called with the schema held in
        ``SCHEMA_SCAN``.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: an optional callback to be called when this
            command starts and finishes.
        :param logger: a logger for this command to log with.
        """
        super().__init__(
            command_tracker,
            component_manager,
            schema=self.SCHEMA_SCAN,
            callback=callback,
            logger=logger,
        )
def init_command_objects(self: MccsSubarray) -> None:
    """
    Initialise the command handlers for commands supported by this device.

    On top of the handlers registered by the base class, this registers
    submitted slow commands for ``SendTransientBuffer`` and
    ``ReleaseResources``, and the ``AbortDevice`` command.
    """
    super().init_command_objects()

    def _callback(hook: str, running: bool) -> None:
        # Drives the obs state model with "<hook>_invoked" when the command
        # starts running and "<hook>_completed" when it stops.
        suffix = "invoked" if running else "completed"
        self.obs_state_model.perform_action(f"{hook}_{suffix}")

    # (command name, component manager method name, optional JSON schema).
    # ReleaseAllResources and EndScan are deliberately not listed: per the
    # original author's note they are defined in the superclass.
    submitted_commands = [
        ("SendTransientBuffer", "send_transient_buffer", None),
        ("ReleaseResources", "release_resources", None),
    ]
    for command_name, method_name, schema in submitted_commands:
        validator = None
        if schema is not None:
            validator = JsonValidator(
                command_name,
                schema,
                logger=self.logger,
            )
        command_object = SubmittedSlowCommand(
            command_name,
            self._command_tracker,
            self.component_manager,
            method_name,
            callback=None,
            logger=self.logger,
            validator=validator,
        )
        self.register_command_object(command_name, command_object)

    abort_device_command = self.AbortDeviceCommand(
        self._command_tracker,
        self.component_manager,
        callback=functools.partial(_callback, "abort"),
        logger=self.logger,
    )
    self.register_command_object("AbortDevice", abort_device_command)
class InitCommand(SKAObsDevice.InitCommand):
    """Command class for device initialisation."""

    # pylint: disable-next=arguments-differ
    def do(  # type: ignore[override]
        self: MccsSubarray.InitCommand,
    ) -> tuple[ResultCode, str]:
        """
        Initialise the attributes and properties of MccsSubarray.

        Enables change and archive events for ``stationTrls``, stores the
        ``ska_low_mccs`` package version details on the device, then runs
        the base-class initialisation (whose return value is not used).

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        device = self._device
        device.set_change_event("stationTrls", True, True)
        device.set_archive_event("stationTrls", True, True)

        package = sys.modules["ska_low_mccs"]
        device._build_state = str(package.__version_info__)
        device._version_id = package.__version__

        super().do()
        return (ResultCode.OK, "Init command started")
# ----------
# Callbacks
# ----------
# pylint: disable=too-many-arguments
def _component_state_callback(
    self: MccsSubarray,
    power: Optional[PowerState] = None,
    health: Optional[HealthState] = None,
    trl: Optional[str] = None,
    resources_changed: Optional[list[set]] = None,
    configured_changed: Optional[bool] = None,
    scanning_changed: Optional[bool] = None,
    obsfault: Optional[bool] = None,
    obsstate_changed: Optional[ObsState] = None,
    station_power: Optional[PowerState] = None,
    task_completed: Optional[str] = None,
    missed_event: Optional[bool] = None,
) -> None:
    """
    Handle change in this device's state.

    This is a callback hook, called whenever the state changes. It
    is responsible for updating the tango side of things i.e. making
    sure the attribute is up to date, and events are pushed. Each
    argument that is supplied is dispatched, in a fixed order, to the
    matching handler.

    :param power: An optional parameter with the new power state of the
        device. It is accepted but not read by this method.
    :param health: An optional parameter with the new health state of this
        Subarray or a subservient Station device.
    :param trl: The TRL of the subservient device or None if the
        update is for this Subarray.
    :param resources_changed: An optional parameter updating the resources for
        this Subarray; its first three items are used as the station,
        subarray beam and station beam TRLs respectively.
    :param configured_changed: An optional flag indicating that this Subarray's
        configuration has changed.
    :param scanning_changed: An optional flag indicating that this Subarray's
        scanning state has changed.
    :param obsfault: An optional flag indicating whether this Subarray is entering
        or exiting an ObsFault state.
    :param obsstate_changed: An optional parameter with the new ObsState of
        a subservient Station device. Only acted on when ``trl`` is given.
    :param station_power: An optional parameter with the new power state of
        a subservient Station device.
    :param task_completed: An optional parameter indicating that this Subarray has
        completed a slow command. The value of this parameter indicates the finished
        task and can be one of: restart, obsreset, release, assign or configure.
        This parameter is used to drive the Subarray ObsState.
    :param missed_event: whether the component manager has detected a missed change
        event.
    """
    if health is not None:
        self._health_state_changed(health, trl)

    if resources_changed is not None:
        self._resources_changed(
            resources_changed[0],  # station TRLs
            resources_changed[1],  # subarray beam TRLs
            resources_changed[2],  # station beam TRLs
        )

    if configured_changed is not None:
        self._configuration_changed(configured_changed)

    if task_completed is not None:
        self.obs_state_model.perform_action(f"{task_completed}_completed")

    if scanning_changed is not None:
        self._scanning_changed(scanning_changed)

    # NOTE(review): the action fires for any non-None value, so
    # obsfault=False also triggers "component_obsfault" - confirm intent.
    if obsfault is not None:
        self.obs_state_model.perform_action("component_obsfault")

    if obsstate_changed is not None and trl is not None:
        manager = self.component_manager
        if obsstate_changed == ObsState.FAULT:
            manager._device_obs_state_fault(trl, obsstate_changed)
        else:
            manager._device_obs_state_changed(trl, obsstate_changed)

    if station_power is not None:
        self._station_power_state_changed(station_power, trl)

    if missed_event:
        self._missed_events += 1
def _communication_state_callback(
    self: MccsSubarray,
    communication_state: CommunicationStatus,
) -> None:
    """
    Handle change in communications status between component manager and component.

    This is a callback hook, called by the component manager when
    the communications status changes. It is implemented here to
    drive the op_state, and it also tells the health model whether
    communication is established.

    :param communication_state: the status of communications between
        the component manager and its component.
    """
    actions_by_status = {
        CommunicationStatus.DISABLED: "component_disconnected",
        CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
        CommunicationStatus.ESTABLISHED: "component_on",
    }
    op_state_action = actions_by_status[communication_state]
    # Every status currently maps to an action; the guard only matters if
    # a None entry is ever added to the table above.
    if op_state_action is not None:
        self.op_state_model.perform_action(op_state_action)

    is_established = communication_state == CommunicationStatus.ESTABLISHED
    self._health_model.update_state(communicating=is_established)
def _health_state_changed(
    self: MccsSubarray,
    health: HealthState,
    trl: Optional[str] = None,
) -> None:
    """
    Handle a health state change of this device or of a subservient device.

    Without a TRL the update concerns this device: the stored health state
    is replaced and a ``healthState`` change event is pushed, but only when
    the value actually differs. With a TRL, the update is forwarded to the
    health model method matching the device type, which is taken from the
    second "/"-separated component of the TRL.

    :param health: The new health of the device
    :param trl: The TRL of the device, or None if the update is for this
        device itself.
    """
    if trl is None:
        # Do regular health update. This device called the callback.
        if self._health_state != health:
            self._health_state = health
            self.push_change_event("healthState", health)
        return

    handlers: dict[str, Callable] = {
        "station": self._health_model.station_health_changed,
        "beam": self._health_model.station_beam_health_changed,
        "subarraybeam": self._health_model.subarray_beam_health_changed,
    }

    # A TRL without a second component has no device type; treat it like an
    # unmanaged device rather than failing with an IndexError.
    trl_parts = trl.split("/")
    handler = handlers.get(trl_parts[1]) if len(trl_parts) > 1 else None

    if handler is None:
        # We've somehow got a health update for a device type we don't manage.
        msg = (
            f"Received a health state changed event for device {trl} "
            "which is not managed by this subarray."
        )
        self.logger.warning(msg)
        return

    handler(trl, health)
def _health_changed(self: MccsSubarray, health: HealthState) -> None:
    """
    Handle change in this device's health state.

    This is a callback hook, called whenever the HealthModel's
    evaluated health state changes. It is responsible for updating
    the tango side of things i.e. making sure the attribute is up to
    date, and events are pushed.

    :param health: the new health value
    """
    # Nothing to record or publish when the value is unchanged.
    if self._health_state == health:
        return

    self._health_state = health
    self.push_change_event("healthState", health)
def _station_power_state_changed(
    self: MccsSubarray,
    station_power: Any,
    trl: Optional[str],
) -> None:
    """
    Forward a station's new power state to the component manager.

    The update is forwarded only when a power state is supplied and the
    TRL is non-empty.

    :param station_power: the new power state of the station, or None
        when there is nothing to report
    :param trl: TRL of the station the power state belongs to
    """
    # Compare against None instead of relying on truthiness: a power state
    # whose integer value is 0 is a real state and must still be forwarded.
    if station_power is not None and trl:
        self.component_manager._station_power_state_changed(trl, station_power)
def _configuration_changed(self: MccsSubarray, is_configured: Any) -> None:
    """
    Drive the obs state model after a configuration change.

    :param is_configured: truthy when the subarray is now configured,
        falsy when it is now unconfigured
    """
    action = "component_configured" if is_configured else "component_unconfigured"
    self.obs_state_model.perform_action(action)
def _scanning_changed(self: MccsSubarray, is_scanning: Any) -> None:
    """
    Drive the obs state model after a change in scanning state.

    :param is_scanning: truthy when the subarray is now scanning,
        falsy when it is no longer scanning
    """
    action = "component_scanning" if is_scanning else "component_not_scanning"
    self.obs_state_model.perform_action(action)
def _resources_changed(
    self: MccsSubarray,
    station_trls: set[str],
    subarray_beam_trls: set[str],
    station_beam_trls: set[str],
) -> None:
    """
    Handle change in subarray resources.

    This is a callback hook, called by the component manager when
    the resources of the subarray changes. The obs state model is told
    whether the subarray holds any resource at all, and the health model
    is then given the three sets of TRLs.

    :param station_trls: the TRLs of stations assigned to this subarray
    :param subarray_beam_trls: the TRLs of subarray beams assigned
        to this subarray
    :param station_beam_trls: the TRLs of station beams assigned
        to this subarray
    """
    has_resources = any((station_trls, subarray_beam_trls, station_beam_trls))
    action = "component_resourced" if has_resources else "component_unresourced"
    self.obs_state_model.perform_action(action)

    self._health_model.resources_changed(
        station_trls, subarray_beam_trls, station_beam_trls
    )
# ----------
# Attributes
# ----------
@attribute(
    dtype="DevString",
    format="%s",
)
def healthModelParams(self: MccsSubarray) -> str:
    """
    Get the health params from the health model.

    :return: the health params, serialised as a JSON string
    """
    health_params = self._health_model.health_params
    return json.dumps(health_params)
@healthModelParams.write  # type: ignore[no-redef]
def healthModelParams(self: MccsSubarray, argin: str) -> None:
    """
    Set the params for health transition rules.

    The decoded params are stored on the health model, which is then
    asked to update its health.

    :param argin: JSON-string of dictionary of health states
    """
    new_params = json.loads(argin)
    self._health_model.health_params = new_params
    self._health_model.update_health()
@attribute(dtype="DevLong", format="%i")
def scanId(self: MccsSubarray) -> int:
    """
    Return the scan id.

    :return: the component manager's scan id, or -1 when it is None
    """
    current_scan_id = self.component_manager.scan_id
    if current_scan_id is None:
        return -1
    return current_scan_id
@scanId.write  # type: ignore[no-redef]
def scanId(self: MccsSubarray, scan_id: int) -> None:
    """
    Set the scanId attribute.

    The value is stored on the component manager.

    :param scan_id: the new scanId
    """
    manager = self.component_manager
    manager.scan_id = scan_id
@attribute(dtype=("DevString",), max_dim_x=512, format="%s")
def stationTrls(self: MccsSubarray) -> list[str]:
    """
    Return the TRLs of stations assigned to this subarray.

    :return: TRLs of stations assigned to this subarray, in sorted order
    """
    trls = list(self.component_manager.station_trls)
    trls.sort()
    return trls
@attribute(dtype=("DevString",), max_dim_x=512, format="%s")
def stationBeamTrls(self: MccsSubarray) -> list[str]:
    """
    Return the TRLs of station beams assigned to this subarray.

    :return: TRLs of station beams assigned to this subarray, in sorted order
    """
    trls = list(self.component_manager.station_beam_trls)
    trls.sort()
    return trls
@attribute(dtype=("DevString",), max_dim_x=512, format="%s")
def subarrayBeamTrls(self: MccsSubarray) -> list[str]:
    """
    Return the TRLs of subarray beams assigned to this subarray.

    :return: TRLs of subarray beams assigned to this subarray, in sorted order
    """
    trls = list(self.component_manager.subarray_beam_trls)
    trls.sort()
    return trls
@attribute(dtype=("DevString"), max_dim_x=1024)
def assignedResources(self: MccsSubarray) -> str:
    """
    Return this subarray's assigned resources.

    Station, subarray beam and station beam entries are reduced to the
    last "/"-separated component of each TRL with leading zeros removed;
    apertures and channels are reported as held by the component manager.

    :return: this subarray's assigned resources, as a JSON string.
    """

    def _short_ids(trls: Any) -> list[str]:
        # Last TRL component without leading zeros. An all-zero component
        # would strip down to an empty string, so fall back to "0".
        return [trl.rsplit("/", 1)[-1].lstrip("0") or "0" for trl in trls]

    resource_dict = self.component_manager.assigned_resources_dict
    stations = _short_ids(resource_dict["stations"])
    subarray_beams = _short_ids(resource_dict["subarray_beams"])
    station_beams = _short_ids(resource_dict["station_beams"])
    channels = resource_dict["channels"]
    interface = "https://schema.skao.int/ska-low-mccs-assignedresources/1.0"
    return json.dumps(
        {
            "interface": interface,
            "subarray_beam_ids": subarray_beams,
            "station_beam_ids": station_beams,
            "station_ids": stations,
            "apertures": list(resource_dict["apertures"]),
            "channels": channels,
        }
    )
@attribute(dtype="DevString")
def healthReport(self: MccsSubarray) -> str:
    """
    Get the health report.

    :return: the health report held by the health model.
    """
    report = self._health_model.health_report
    return report
@attribute(dtype="DevLong")
def missedEvents(self: MccsSubarray) -> int:
    """
    Get the number of missed change events.

    Some commands rely on change events from sub-devices. Sometimes those
    events are missed; this attribute reports how many are known to have
    been missed.

    :return: the number of missed change events
    """
    missed_count = self._missed_events
    return missed_count
# --------
# Commands
# --------
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def ReleaseResources(self: MccsSubarray, argin: str) -> DevVarLongStringArrayType:
    """
    Release resources from this subarray.

    :param argin: the resources to be released

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    release_resources = self.get_command_object("ReleaseResources")
    result_code, unique_id = release_resources(argin)
    return [result_code], [unique_id]
@command(dtype_out="DevVarLongStringArray")
def ReleaseAllResources(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Release all resources from this subarray.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    release_all_resources = self.get_command_object("ReleaseAllResources")
    result_code, unique_id = release_all_resources()
    return [result_code], [unique_id]
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def Scan(
    self: MccsSubarray,
    argin: str,
) -> DevVarLongStringArrayType:
    """
    Start scanning.

    :param argin: Json string containing scan_id and start_time.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    scan = self.get_command_object("Scan")
    result_code, unique_id = scan(argin)
    return [result_code], [unique_id]
@command(dtype_out="DevVarLongStringArray")
def EndScan(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Stop scanning.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    end_scan = self.get_command_object("EndScan")
    result_code, unique_id = end_scan()
    return [result_code], [unique_id]
@command(dtype_out="DevVarLongStringArray")
def End(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Deconfigure resources.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    end = self.get_command_object("End")
    result_code, unique_id = end()
    return [result_code], [unique_id]
@command(dtype_out="DevVarLongStringArray")
def ObsReset(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Reset the observation by returning to unconfigured state.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    obs_reset = self.get_command_object("ObsReset")
    result_code, unique_id = obs_reset()
    return [result_code], [unique_id]
@command(dtype_out="DevVarLongStringArray")
def Restart(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Restart the subarray by returning to unresourced state.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    restart = self.get_command_object("Restart")
    result_code, unique_id = restart()
    return [result_code], [unique_id]
@command(dtype_in="DevVarLongArray", dtype_out="DevVarLongStringArray")
def SendTransientBuffer(
    self: MccsSubarray, argin: list[int]
) -> DevVarLongStringArrayType:
    """
    Cause the subarray to send the requested segment of the transient buffer to SDP.

    The requested segment is specified by:

    1. Start time (timestamp: milliseconds since UNIX epoch)
    2. End time (timestamp: milliseconds since UNIX epoch)
    3. Dispersion measure

    Together, these parameters narrow the selection of transient
    buffer data to the period of time and frequencies that are of
    interest.

    Additional metadata, such as the ID of a triggering Scheduling
    Block, may need to be supplied to allow SDP to assign data
    ownership correctly (TBD75).

    :todo: The original note describes this method as a stub that does
        nothing but return a dummy string; what the underlying
        command does is not visible here and should be confirmed.

    :param argin: Specification of the segment of the transient
        buffer to send

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
    """
    send_transient_buffer = self.get_command_object("SendTransientBuffer")
    result_code, unique_id = send_transient_buffer(argin)
    return [result_code], [unique_id]
@command(dtype_out="DevVarLongStringArray")
def AbortDevice(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Abort any long-running command such as ``Configure()`` or ``Scan()``.

    This will only cancel commands on this device, not further down the hierarchy,
    use Abort() for that use case.

    :return: A tuple of two single-item lists, holding the return code
        and the accompanying string produced by the command object.
        The string is for information purpose only.
    """
    abort_device = self.get_command_object("AbortDevice")
    result_code, message = abort_device()
    return [result_code], [message]
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Entry point for module.

    Runs the MccsSubarray device server. When no positional arguments are
    given, ``args=None`` is passed to ``run_server``.

    :param args: positional arguments
    :param kwargs: named arguments

    :return: exit code
    """
    server_args = args if args else None
    return MccsSubarray.run_server(args=server_args, **kwargs)


if __name__ == "__main__":
    main()