# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements MCCS functionality for monitoring and control of subarrays."""
from __future__ import annotations  # allow forward references in type hints

import importlib
import importlib.resources  # needed for importlib.resources.read_text (schemas)
import json
import sys
import threading
from typing import Any, Callable, Final, Optional

import ska_tango_base as stb
from ska_control_model import (
    CommunicationStatus,
    HealthState,
    ObsState,
    PowerState,
    TaskStatus,
)
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.subarray import SKASubarray
from tango.server import attribute, device_property

from ska_low_mccs.subarray.subarray_component_manager import SubarrayComponentManager
from ska_low_mccs.subarray.subarray_health_model import SubarrayHealthModel
__all__ = ["MccsSubarray", "main"]


# pylint: disable=too-many-ancestors, too-many-public-methods
class MccsSubarray(MccsBaseDevice, SKASubarray):
    """MccsSubarray is the Tango device class for the MCCS Subarray prototype."""

    # The base class's InitCommand is replaced with None; this device finishes
    # its own initialisation directly in init_device() via init_completed().
    InitCommand = None  # type: ignore[assignment]

    # -----------------
    # Device Properties
    # -----------------
    SubarrayId = device_property(dtype=int, default_value=0)
    SkuidUrl = device_property(dtype=str, default_value="")
    ObsCommandTimeout = device_property(
        dtype=int,
        default_value=30,
        doc="The timeout in seconds for Observation commands.",
    )
    DefaultSolutionType = device_property(
        dtype=str,
        default_value="fitted",
        doc="Default calibration solution type ('fitted' or 'raw') used when "
        "an aperture in Configure does not specify solution_type.",
    )

    # ---------------
    # Initialisation
    # ---------------
    def __init__(self: MccsSubarray, *args: Any, **kwargs: Any) -> None:
        """
        Initialise this device object.

        :param args: positional args to the init
        :param kwargs: keyword args to the init
        """
        # We aren't supposed to define initialisation methods for Tango
        # devices; we are only supposed to define an `init_device` method. But
        # we insist on doing so here, just so that we can define some
        # attributes, thereby stopping the linters from complaining about
        # "attribute-defined-outside-init" etc. We still need to make sure that
        # `init_device` re-initialises any values defined in here.
        super().__init__(*args, **kwargs)
        self.component_manager: SubarrayComponentManager
        self._missed_events: int
        # NOTE(review): this assignment runs after super().__init__(), which for
        # Tango devices normally invokes init_device(); it therefore overrides
        # any health value computed during initialisation. Confirm intended.
        self._health_state: HealthState = HealthState.OK
        self._health_model: SubarrayHealthModel
        self._qa_metrics: str

    def init_device(self: MccsSubarray) -> None:
        """
        Initialise the device.

        Resets the per-device counters/caches, runs the base-class
        initialisation, logs version and property information and finally
        marks initialisation as complete.
        """
        # Reset local state *before* base initialisation so that any component
        # callback fired during super().init_device() is not clobbered later.
        self._missed_events = 0
        self._qa_metrics = ""
        super().init_device()

        self._build_state = sys.modules["ska_low_mccs"].__version_info__
        self._version_id = sys.modules["ska_low_mccs"].__version__
        device_name = type(self).__name__
        version = f"{device_name} Software Version: {self._version_id}"
        properties = (
            f"Initialised {device_name} device with properties:\n"
            f"\tSubarrayId: {self.SubarrayId:02}\n"
            f"\tSkuidUrl: {self.SkuidUrl}\n"
            f"\tObsCommandTimeout: {self.ObsCommandTimeout}\n"
            f"\tDefaultSolutionType: {self.DefaultSolutionType}\n"
        )
        self.logger.info(
            "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
        )
        self.init_completed()

    def _init_state_model(self: MccsSubarray) -> None:
        """
        Initialise the state models, the health model and manual events.

        Health starts at OK, the health model is created with fixed
        degraded/failed thresholds, and change events for ``healthState``
        and ``qualityAssuranceMetrics`` are declared as manually pushed
        (implemented=True, detect=False).
        """
        super()._init_state_model()
        # Health state for EMPTY is OK. InitCommand.do() does this too late.
        self._health_state = HealthState.OK
        self._health_model = SubarrayHealthModel(
            self._health_changed,
            ignore_power_state=True,
            thresholds={
                "station_degraded_threshold": 0.05,
                "station_failed_threshold": 0.2,
                "subarray_beam_degraded_threshold": 0.05,
                "subarray_beam_failed_threshold": 0.2,
            },
        )
        self.set_change_event("healthState", True, False)
        self.set_change_event("qualityAssuranceMetrics", True, False)

    def create_component_manager(
        self: MccsSubarray,
    ) -> SubarrayComponentManager:
        """
        Create and return a component manager for this device.

        :return: a component manager for this device.
        """
        return SubarrayComponentManager(
            self.SubarrayId,
            self.SkuidUrl,
            self.logger,
            self.ObsCommandTimeout,
            self._communication_state_callback,
            self._component_state_callback,
            default_solution_type=self.DefaultSolutionType,
            event_serialiser=self._event_serialiser,
        )

    # JSON schemas for command arguments, read from package resources when the
    # class is defined. NOTE(review): presumably looked up by
    # ``validate_json_args`` via the ``<Command>_SCHEMA`` naming convention.
    AssignResources_SCHEMA: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray",
            "MccsSubarray_AssignResources_3_0.json",
        )
    )
    Configure_SCHEMA: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray",
            "MccsSubarray_Configure_5_3.json",
        )
    )
    Scan_SCHEMA: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.common",
            "Mccs_Scan_3_0.json",
        )
    )

    # ----------
    # Callbacks
    # ----------
    # NOTE(review): it is unclear whether all of these callbacks are still needed.
    def started_AssignResources(self: MccsSubarray) -> None:
        """Assign Resources command started callback."""
        self.obs_state_model.perform_action("assign_invoked")

    def completed_AssignResources(self: MccsSubarray) -> None:
        """Assign Resources command completed callback."""
        self.obs_state_model.perform_action("assign_completed")

    def started_ReleaseAllResources(self: MccsSubarray) -> None:
        """Release All Resources command started callback."""
        self.obs_state_model.perform_action("release_invoked")

    def completed_ReleaseAllResources(self: MccsSubarray) -> None:
        """Release All Resources command completed callback."""
        self.obs_state_model.perform_action("release_completed")

    def started_ObsReset(self: MccsSubarray) -> None:
        """Obs Reset command started callback."""
        self.obs_state_model.perform_action("obsreset_invoked")

    def completed_ObsReset(self: MccsSubarray) -> None:
        """Obs Reset command completed callback."""
        self.obs_state_model.perform_action("obsreset_completed")

    def started_Restart(self: MccsSubarray) -> None:
        """Restart command started callback."""
        self.obs_state_model.perform_action("restart_invoked")

    def completed_Restart(self: MccsSubarray) -> None:
        """Restart command completed callback."""
        self.obs_state_model.perform_action("restart_completed")

    def started_Abort(self: MccsSubarray) -> None:
        """Abort command started callback."""
        self.obs_state_model.perform_action("abort_invoked")

    def completed_Abort(self: MccsSubarray) -> None:
        """Abort command completed callback."""
        self.obs_state_model.perform_action("abort_completed")

    def started_AbortDevice(self: MccsSubarray) -> None:
        """Abort device command started callback."""
        # AbortDevice shares the Abort transitions of the obs state model.
        self.obs_state_model.perform_action("abort_invoked")

    def completed_AbortDevice(self: MccsSubarray) -> None:
        """Abort device command completed callback."""
        self.obs_state_model.perform_action("abort_completed")

    # pylint: disable=too-many-arguments, too-many-locals
    def _component_state_callback(
        self: MccsSubarray,
        power: Optional[PowerState] = None,
        health: Optional[HealthState] = None,
        trl: Optional[str] = None,
        resources_changed: Optional[list[set]] = None,
        configured_changed: Optional[bool] = None,
        scanning_changed: Optional[bool] = None,
        obsfault: Optional[bool] = None,
        obsstate_changed: Optional[ObsState] = None,
        station_power: Optional[PowerState] = None,
        task_completed: Optional[str] = None,
        missed_event: Optional[bool] = None,
        qa_metrics: Optional[str] = None,
    ) -> None:
        """
        Handle change in this device's state.

        This is a callback hook, called by the component manager whenever
        component state changes. Each non-None argument is dispatched to
        the relevant handler, in the fixed order below.

        :param power: the new power state of the device. Currently not used
            by this handler.
        :param health: An optional parameter with the new health state of this
            Subarray or a subservient Station device.
        :param trl: The TRL of the subservient Station device or None if the
            update is for this Subarray.
        :param resources_changed: An optional sequence of three sets holding
            the station, subarray beam and station beam TRLs assigned to
            this Subarray, in that order.
        :param configured_changed: An optional flag indicating whether this
            Subarray is now configured.
        :param scanning_changed: An optional flag indicating whether this
            Subarray is now scanning.
        :param obsfault: An optional flag indicating whether this Subarray is entering
            (True) or exiting (False) an ObsFault state. Only True drives the
            obs state model into FAULT.
        :param obsstate_changed: An optional parameter with the new ObsState of
            a subservient device identified by ``trl``.
        :param station_power: An optional parameter with the new power state of
            a subservient Station device identified by ``trl``.
        :param task_completed: An optional parameter indicating that this Subarray has
            completed a slow command. The value of this parameter indicates the finished
            task and can be one of: restart, obsreset, release, assign or configure.
            This parameter is used to drive the Subarray ObsState.
        :param missed_event: whether the component manager has detected a missed change
            event.
        :param qa_metrics: JSON-serialised quality assurance metrics.
        """
        if health is not None:
            self._health_state_changed(health, trl)
        if resources_changed is not None:
            station_trls = resources_changed[0]
            subarray_beam_trls = resources_changed[1]
            station_beam_trls = resources_changed[2]
            self._resources_changed(station_trls, subarray_beam_trls, station_beam_trls)
        if configured_changed is not None:
            self._configuration_changed(configured_changed)
        if task_completed is not None:
            self.obs_state_model.perform_action(f"{task_completed}_completed")
        if scanning_changed is not None:
            self._scanning_changed(scanning_changed)
        # Only a True flag means the component entered ObsFault; an explicit
        # False ("exiting") must not drive the obs state model into FAULT.
        if obsfault:
            self.obs_state_model.perform_action("component_obsfault")
        if obsstate_changed is not None and trl is not None:
            if obsstate_changed == ObsState.FAULT:
                self.component_manager._device_obs_state_fault(trl, obsstate_changed)
            else:
                self.component_manager._device_obs_state_changed(trl, obsstate_changed)
        if station_power is not None:
            self._station_power_state_changed(station_power, trl)
        if missed_event:
            self._missed_events += 1
        if qa_metrics is not None:
            self._qa_metrics = qa_metrics
            self.push_change_event("qualityAssuranceMetrics", qa_metrics)

    def _communication_state_callback(
        self: MccsSubarray,
        communication_state: CommunicationStatus,
    ) -> None:
        """
        Handle change in communications status between component manager and component.

        This is a callback hook, called by the component manager when
        the communications status changes. It drives the op_state model
        and tells the health model whether communication is established.

        :param communication_state: the status of communications between
            the component manager and its component.
        """
        action_map: dict[CommunicationStatus, str] = {
            CommunicationStatus.DISABLED: "component_disconnected",
            CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
            CommunicationStatus.ESTABLISHED: "component_on",
        }
        # .get() so an unmapped status skips the op-state action instead of
        # raising KeyError; the health model is still updated below.
        action = action_map.get(communication_state)
        if action is not None:
            self.op_state_model.perform_action(action)
        self._health_model.update_state(
            communicating=communication_state == CommunicationStatus.ESTABLISHED
        )

    def _health_state_changed(
        self: MccsSubarray,
        health: HealthState,
        trl: Optional[str] = None,
    ) -> None:
        """
        Handle a health update for this device or one of its subservient devices.

        :param health: The new health of the device
        :param trl: The TRL of the subservient device, or None if the update
            is for this Subarray itself.
        """
        if trl is None:
            # Regular health update: this device called the callback.
            self._health_changed(health)
            return

        valid_device_types: dict[str, Callable] = {
            "station": self._health_model.station_health_changed,
            "beam": self._health_model.station_beam_health_changed,
            "subarraybeam": self._health_model.subarray_beam_health_changed,
        }
        # The second "/"-separated segment of the TRL selects the device type;
        # guard against malformed TRLs rather than raising IndexError.
        segments = trl.split("/")
        device_type = segments[1] if len(segments) > 1 else ""
        handler = valid_device_types.get(device_type)
        if handler is None:
            # We've somehow got a health update for a device type
            # we don't manage.
            msg = (
                f"Received a health state changed event for device {trl} "
                "which is not managed by this subarray."
            )
            self.logger.warning(msg)
            return
        handler(trl, health)

    def _health_changed(self: MccsSubarray, health: HealthState) -> None:
        """
        Handle change in this device's health state.

        This is a callback hook, called whenever the HealthModel's
        evaluated health state changes. It updates the cached health
        state that the ``healthState`` attribute reports.

        NOTE(review): no ``healthState`` change event is pushed here even
        though manual pushing is enabled in ``_init_state_model``; confirm
        the base class pushes it.

        :param health: the new health value
        """
        if self._health_state != health:
            self._health_state = health

    def _station_power_state_changed(
        self: MccsSubarray,
        station_power: Optional[PowerState],
        trl: Optional[str],
    ) -> None:
        """
        Forward a subservient station's new power state to the component manager.

        :param station_power: the station's new power state
        :param trl: TRL of the station device
        """
        # Compare against None explicitly: PowerState.UNKNOWN is 0 and would
        # otherwise be silently dropped by a truthiness test.
        if station_power is not None and trl:
            self.component_manager._station_power_state_changed(trl, station_power)

    def _configuration_changed(self: MccsSubarray, is_configured: Any) -> None:
        """
        Drive the obs state model when the configured status changes.

        :param is_configured: whether the subarray is now configured
        """
        if is_configured:
            self.obs_state_model.perform_action("component_configured")
        else:
            self.obs_state_model.perform_action("component_unconfigured")

    def _scanning_changed(self: MccsSubarray, is_scanning: Any) -> None:
        """
        Drive the obs state model when the scanning status changes.

        :param is_scanning: whether the subarray is now scanning
        """
        if is_scanning:
            self.obs_state_model.perform_action("component_scanning")
        else:
            self.obs_state_model.perform_action("component_not_scanning")

    def _resources_changed(
        self: MccsSubarray,
        station_trls: set[str],
        subarray_beam_trls: set[str],
        station_beam_trls: set[str],
    ) -> None:
        """
        Handle change in subarray resources.

        This is a callback hook, called by the component manager when
        the resources of the subarray change. Any non-empty resource set
        marks the subarray as resourced; the health model is updated too.

        :param station_trls: the TRLs of stations assigned to this subarray
        :param subarray_beam_trls: the TRLs of subarray beams assigned
            to this subarray
        :param station_beam_trls: the TRLs of station beams assigned
            to this subarray
        """
        if station_trls or subarray_beam_trls or station_beam_trls:
            self.obs_state_model.perform_action("component_resourced")
        else:
            self.obs_state_model.perform_action("component_unresourced")
        self._health_model.resources_changed(
            station_trls, subarray_beam_trls, station_beam_trls
        )

    # ----------
    # Attributes
    # ----------
    @attribute(
        dtype="DevString",
        format="%s",
    )
    def healthModelParams(self: MccsSubarray) -> str:
        """
        Get the health params from the health model.

        :return: the health params, as a JSON string
        """
        return json.dumps(self._health_model.health_params)

    @healthModelParams.write  # type: ignore[no-redef]
    def healthModelParams(self: MccsSubarray, argin: str) -> None:
        """
        Set the params for health transition rules and re-evaluate health.

        :param argin: JSON string of a dictionary of health-model parameters
        """
        self._health_model.health_params = json.loads(argin)
        self._health_model.update_health()

    @attribute(dtype="DevString", format="%s")
    def qualityAssuranceMetrics(self: MccsSubarray) -> str:
        """
        Expose quality assurance metrics.

        The returned structure is:

        >>> {
        >>>     "subarray_beams": {
        >>>         "<subarray_beam_id>": {
        >>>             "apertures": {
        >>>                 "<aperture_id>": {
        >>>                     "is_beam_locked": bool,
        >>>                     "<metric_name>": value
        >>>                 },
        >>>                 ...
        >>>             },
        >>>             "is_beam_locked": bool
        >>>         },
        >>>         ...
        >>>     },
        >>>     "beam_locked_percent": float
        >>> }

        Example:

        >>> {
        >>>     "subarray_beams": {
        >>>         "1" : {
        >>>             "apertures": {
        >>>                 "AP001.01": {
        >>>                     "is_beam_locked": True,
        >>>                 },
        >>>                 "AP002.01": {
        >>>                     "is_beam_locked": False,
        >>>                 }
        >>>             },
        >>>             "is_beam_locked": False,
        >>>         },
        >>>         "2" : {
        >>>             "apertures": {
        >>>                 "AP003.01": {
        >>>                     "is_beam_locked": False,
        >>>                 }
        >>>             },
        >>>             "is_beam_locked": False,
        >>>         }
        >>>     },
        >>>     "beam_locked_percent": 0.0
        >>> }

        :return: A json serialised dictionary (empty string until the
            component manager has reported metrics)
        """
        return self._qa_metrics

    @attribute(dtype="DevLong", format="%i")
    def scanId(self: MccsSubarray) -> int:
        """
        Return the scan id.

        :return: the scan id, or -1 if no scan id is set
        """
        scan_id = self.component_manager.scan_id
        return scan_id if scan_id is not None else -1

    @scanId.write  # type: ignore[no-redef]
    def scanId(self: MccsSubarray, scan_id: int) -> None:
        """
        Set the scanId attribute.

        :param scan_id: the new scanId
        """
        self.component_manager.scan_id = scan_id

    @attribute(dtype=("DevString",), max_dim_x=512, format="%s")
    def stationTrls(self: MccsSubarray) -> list[str]:
        """
        Return the TRLs of stations assigned to this subarray.

        :return: sorted TRLs of stations assigned to this subarray
        """
        return sorted(self.component_manager.station_trls)

    @attribute(dtype=("DevString",), max_dim_x=512, format="%s")
    def stationBeamTrls(self: MccsSubarray) -> list[str]:
        """
        Return the TRLs of station beams assigned to this subarray.

        :return: sorted TRLs of station beams assigned to this subarray
        """
        return sorted(self.component_manager.station_beam_trls)

    @attribute(dtype=("DevString",), max_dim_x=512, format="%s")
    def subarrayBeamTrls(self: MccsSubarray) -> list[str]:
        """
        Return the TRLs of subarray beams assigned to this subarray.

        :return: sorted TRLs of subarray beams assigned to this subarray
        """
        return sorted(self.component_manager.subarray_beam_trls)

    # A scalar string holding a JSON document (not a string spectrum).
    @attribute(dtype="DevString", max_dim_x=1024)
    def assignedResources(self: MccsSubarray) -> str:
        """
        Return this subarray's assigned resources.

        Device TRLs are reduced to their last "/"-separated segment.

        :return: this subarray's assigned resources, as a JSON string.
        """
        resource_dict = self.component_manager.assigned_resources_dict
        stations = [station.rsplit("/", 1)[-1] for station in resource_dict["stations"]]
        subarray_beams = [
            subarray_beam.rsplit("/", 1)[-1]
            for subarray_beam in resource_dict["subarray_beams"]
        ]
        station_beams = [
            station_beam.rsplit("/", 1)[-1]
            for station_beam in resource_dict["station_beams"]
        ]
        channels = resource_dict["channels"]
        interface = "https://schema.skao.int/ska-low-mccs-assignedresources/1.0"
        return json.dumps(
            {
                "interface": interface,
                "subarray_beam_ids": subarray_beams,
                "station_beam_ids": station_beams,
                "station_ids": stations,
                "apertures": list(resource_dict["apertures"]),
                "channels": channels,
            }
        )

    @attribute(dtype="DevString")
    def healthReport(self: MccsSubarray) -> str:
        """
        Get the health report.

        :return: the health report.
        """
        return self._health_model.health_report

    @attribute(dtype="DevLong")
    def missedEvents(self: MccsSubarray) -> int:
        """
        Get the number of missed change events.

        Some commands rely on change events from sub-devices; sometimes we miss these
        events. This attribute keeps track of how many we know we have missed.

        :return: the number of missed change events
        """
        return self._missed_events

    @attribute(dtype="DevBoolean")
    def resetCspIngestOnScan(self: MccsSubarray) -> bool:
        """
        Return whether this subarray resets the CSP Ingest of its stations on Scan.

        A toggle to control whether this subarray calls ResetCspIngest on its stations
        when Scan is called.

        :return: whether CSP Ingest is reset on the stations when Scan is called.
        """
        return self.component_manager.reset_csp_ingest_on_scan

    @resetCspIngestOnScan.write  # type: ignore[no-redef]
    def resetCspIngestOnScan(self: MccsSubarray, reset: bool) -> None:
        """
        Set the resetCspIngestOnScan attribute.

        :param reset: Whether to reset CSP Ingest on stations when Scan is called.
        """
        self.component_manager.reset_csp_ingest_on_scan = reset

    # --------
    # Commands
    # --------
    @stb.long_running_commands.long_running_command
    @stb.validators.validate_json_args
    # pylint: disable=W0221
    def AssignResources(
        self: MccsSubarray, **kwargs: Any
    ) -> stb.type_hints.TaskFunctionType:
        """
        Assign resources to this subarray with all relevant parameters.

        :param kwargs: the validated JSON arguments, forwarded unchanged to
            the component manager's ``do_assign``. Mandatory arguments:

            - subarray_id: the ID of the subarray
            - subarray_beams: The beams to be assigned

        :return: a task function, wrapped by the ``long_running_command``
            decorator, that performs the assignment.
        """

        def task(
            task_callback: stb.type_hints.TaskCallbackType,
            task_abort_event: threading.Event,
        ) -> None:
            self.component_manager.do_assign(
                **kwargs,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    def is_ReleaseAllResources_allowed(
        self: MccsSubarray, request_type: LRCReqType | None = None
    ) -> bool:
        """
        Return whether the :py:meth:`!ReleaseAllResources()` command may be called.

        Overriding base class behaviour (and ADR-8) due to behavioural changes in
        ska-tango-base 1.4.2: the command is always allowed in IDLE or EMPTY,
        otherwise the base class decides.

        :param request_type: ENQUEUE_REQ when the LRC is enqueued by the Tango command
            and EXECUTE_REQ when the LRC is about to be executed by the executor.

        :return: whether the command may be called in the current device
            state
        """
        if self._obs_state in [ObsState.IDLE, ObsState.EMPTY]:
            return True
        return super().is_ReleaseAllResources_allowed(request_type)

    @stb.long_running_commands.long_running_command
    def ReleaseAllResources(
        self: MccsSubarray,
    ) -> stb.type_hints.TaskFunctionType:
        """
        Release all allocated resources from this subarray.

        :return: the component manager's ``release_all`` task function,
            wrapped by the ``long_running_command`` decorator.
        """
        return self.component_manager.release_all

    @stb.long_running_commands.long_running_command
    @stb.validators.validate_json_args
    # pylint: disable=W0237
    def Scan(
        self: MccsSubarray,
        scan_id: int,
        start_time: Optional[str] = None,
        duration: float = 0.0,
        **kwargs: Any,
    ) -> stb.type_hints.TaskFunctionType:
        """
        Start the scan associated with this subarray.

        :param scan_id: The ID for this scan
        :param start_time: the start time of the scan
        :param duration: Scan duration in seconds. 0.0 or omitted means forever
        :param kwargs: Any other validated arguments; accepted and ignored.

        :return: a task function, wrapped by the ``long_running_command``
            decorator, that starts the scan via the component manager.
        """

        def task(
            task_callback: stb.type_hints.TaskCallbackType,
            task_abort_event: threading.Event,
        ) -> None:
            self.component_manager.scan(
                scan_id=scan_id,
                start_time=start_time,
                duration=duration,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @stb.long_running_commands.long_running_command
    def EndScan(self: MccsSubarray) -> stb.type_hints.TaskFunctionType:
        """
        Stop the current scan associated with this subarray.

        :return: the component manager's ``end_scan`` task function,
            wrapped by the ``long_running_command`` decorator.
        """
        return self.component_manager.end_scan

    @stb.long_running_commands.long_running_command
    def End(self: MccsSubarray) -> stb.type_hints.TaskFunctionType:
        """
        Deconfigure resources for this subarray.

        :return: the component manager's ``deconfigure`` task function,
            wrapped by the ``long_running_command`` decorator.
        """
        return self.component_manager.deconfigure

    @stb.long_running_commands.long_running_command
    def ObsReset(self: MccsSubarray) -> stb.type_hints.TaskFunctionType:
        """
        Reset this subarray to IDLE.

        :return: the component manager's ``obsreset`` task function,
            wrapped by the ``long_running_command`` decorator.
        """
        return self.component_manager.obsreset

    @stb.long_running_commands.long_running_command
    def Restart(self: MccsSubarray) -> stb.type_hints.TaskFunctionType:
        """
        Restart this subarray to EMPTY.

        :return: the component manager's ``restart`` task function,
            wrapped by the ``long_running_command`` decorator.
        """
        return self.component_manager.restart

    @stb.long_running_commands.long_running_command
    def SendTransientBuffer(
        self: MccsSubarray, argin: list[int]
    ) -> stb.type_hints.TaskFunctionType:
        """
        Cause the subarray to send the requested segment of the transient buffer to SDP.

        The requested segment is specified by:

        1. Start time (timestamp: milliseconds since UNIX epoch)
        2. End time (timestamp: milliseconds since UNIX epoch)
        3. Dispersion measure

        Together, these parameters narrow the selection of transient
        buffer data to the period of time and frequencies that are of
        interest.

        Additional metadata, such as the ID of a triggering Scheduling
        Block, may need to be supplied to allow SDP to assign data
        ownership correctly (TBD75).

        NOTE(review): the actual behaviour lives in the component manager's
        ``send_transient_buffer``; it was previously documented as a stub.

        :param argin: Specification of the segment of the transient
            buffer to send

        :return: a task function, wrapped by the ``long_running_command``
            decorator, that forwards the request to the component manager.
        """

        def task(
            task_callback: stb.type_hints.TaskCallbackType,
            task_abort_event: threading.Event,
        ) -> None:
            self.component_manager.send_transient_buffer(
                argin=argin,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    def schedule_abort_task(
        self: MccsSubarray, task_callback: stb.type_hints.TaskCallbackType
    ) -> tuple[TaskStatus, str]:
        """
        Schedule an Abort task to begin executing immediately.

        Overrides the base class hook used by the :py:meth:`!Abort()` command,
        delegating to the component manager's ``abort``.

        :param task_callback: Notified of progress of the abort command.

        :return: A tuple containing TaskStatus.IN_PROGRESS and a message
        """
        return self.component_manager.abort(task_callback=task_callback)

    @stb.long_running_commands.long_running_command
    def AbortDevice(self: MccsSubarray) -> stb.type_hints.TaskFunctionType:
        """
        Abort any long-running command such as ``Configure()`` or ``Scan()``.

        This will only cancel commands on this device, not further down the hierarchy;
        use Abort() for that use case.

        :return: the component manager's ``abort_device`` task function,
            wrapped by the ``long_running_command`` decorator.
        """
        return self.component_manager.abort_device
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Entry point for module.

    Runs the MccsSubarray Tango device server.

    :param args: positional arguments; when empty, ``None`` is passed to
        ``run_server`` instead of an empty tuple
    :param kwargs: named arguments forwarded to ``run_server``

    :return: exit code
    """
    return MccsSubarray.run_server(args=args or None, **kwargs)


if __name__ == "__main__":
    main()