Source code for ska_low_mccs.subarray.subarray_device

#  -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements MCCS functionality for monitoring and control of subarrays."""

from __future__ import annotations  # allow forward references in type hints

import functools
import importlib  # allow forward references in type hints
import json
import logging
import sys
from typing import Any, Callable, Final, Optional

from ska_control_model import (
    CommunicationStatus,
    HealthState,
    ObsState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_tango_base.base import CommandTracker
from ska_tango_base.commands import JsonValidator, SlowCommand, SubmittedSlowCommand
from ska_tango_base.obs import SKAObsDevice
from ska_tango_base.subarray import SKASubarray
from tango.server import attribute, command, device_property

from ska_low_mccs.subarray.subarray_component_manager import SubarrayComponentManager
from ska_low_mccs.subarray.subarray_health_model import SubarrayHealthModel

# Public names exported by this module.
__all__ = ["MccsSubarray", "main"]


# Return annotation used by the command methods in this module that are
# declared with ``dtype_out="DevVarLongStringArray"``: a list of result codes
# paired with a list of (optional) strings.
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]


class MccsSubarray(SKASubarray):
    """
    Tango device class for the MCCS Subarray prototype.

    The device properties declared below are handed to the component
    manager when it is created.
    """

    # -----------------
    # Device Properties
    # -----------------
    # Identifier of this subarray.
    SubarrayId = device_property(dtype=int, default_value=0)

    # URL forwarded to the component manager.
    SkuidUrl = device_property(dtype=str, default_value="")

    # Timeout applied to observation commands.
    ObsCommandTimeout = device_property(
        dtype=int,
        default_value=60,
        doc="The timeout in seconds for Observation commands.",
    )

    # ---------------
    # Initialisation
    # ---------------
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Construct the device object and declare its instance attributes.

    Tango devices are meant to be set up in ``init_device`` rather than in
    an initialisation method. This one exists only so that the attributes
    are declared in one place, which keeps linters from reporting
    "attribute-defined-outside-init" and similar. ``init_device`` must still
    re-initialise every value that is given here.

    :param args: positional args to the init
    :param kwargs: keyword args to the init
    """
    super().__init__(*args, **kwargs)

    # Declarations only; no value is assigned to these here.
    self.component_manager: SubarrayComponentManager
    self._missed_events: int
    self._health_model: SubarrayHealthModel

    self._health_state: HealthState = HealthState.OK
def init_device(self: MccsSubarray) -> None:
    """
    Initialise the device.

    Resets the missed-event counter, runs the base-class initialisation,
    records the ``ska_low_mccs`` package version details on the device and
    logs a start-up summary (version info, software version and the
    ``SubarrayId`` / ``SkuidUrl`` properties).
    """
    self._missed_events = 0
    super().init_device()

    package = sys.modules["ska_low_mccs"]
    # Stored in string form so that this agrees with ``InitCommand.do``,
    # which writes ``str(__version_info__)`` to the same attribute.
    self._build_state = str(package.__version_info__)
    self._version_id = package.__version__

    device_name = type(self).__name__
    version = f"{device_name} Software Version: {self._version_id}"
    properties = (
        f"Initialised {device_name} device with properties:\n"
        f"\tSubarrayId: {self.SubarrayId:02}\n"
        f"\tSkuidUrl: {self.SkuidUrl}\n"
    )
    self.logger.info(
        "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
    )
def _init_state_model(self: MccsSubarray) -> None:
    """
    Set up the state model, the initial health state and the health model.

    After the base-class state model is initialised, the health state is
    preset to OK, a ``SubarrayHealthModel`` is created with this device's
    ``_health_changed`` as its callback, and change events are enabled for
    the ``healthState`` attribute.
    """
    super()._init_state_model()

    # Health state for EMPTY is OK. InitCommand.do() does this too late.
    self._health_state = HealthState.OK

    thresholds = {
        "station_degraded_threshold": 0.05,
        "station_failed_threshold": 0.2,
        "subarray_beam_degraded_threshold": 0.05,
        "subarray_beam_failed_threshold": 0.2,
    }
    self._health_model = SubarrayHealthModel(
        self._health_changed,
        ignore_power_state=True,
        thresholds=thresholds,
    )
    self.set_change_event("healthState", True, False)
def create_component_manager(
    self: MccsSubarray,
) -> SubarrayComponentManager:
    """
    Build the component manager that this device will use.

    The manager receives the ``SubarrayId``, ``SkuidUrl`` and
    ``ObsCommandTimeout`` device properties, the device logger, and the
    device's communication-state and component-state callbacks.

    :return: a component manager for this device.
    """
    callbacks = (
        self._communication_state_callback,
        self._component_state_callback,
    )
    return SubarrayComponentManager(
        self.SubarrayId,
        self.SkuidUrl,
        self.logger,
        self.ObsCommandTimeout,
        *callbacks,
    )
class AbortDeviceCommand(SlowCommand[tuple[ResultCode, str]]):
    """Command class behind MccsSubarray's AbortDevice() command."""

    def __init__(
        self: MccsSubarray.AbortDeviceCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Callable[[bool], None],
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise a new AbortDeviceCommand instance.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: callback to be called when this command starts
            and finishes
        :param logger: a logger for this command object to use
        """
        # Stored before the base initialiser runs, as in the original
        # ordering.
        self._command_tracker = command_tracker
        self._component_manager = component_manager
        super().__init__(callback=callback, logger=logger)

    def do(
        self: MccsSubarray.AbortDeviceCommand,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[ResultCode, str]:
        """
        Stateless hook for AbortDevice() command functionality.

        Registers a new "AbortDevice" command with the command tracker and
        asks the component manager to abort, giving it a callback that
        updates the tracked command's info.

        :param args: positional arguments to the command. This command
            does not take any, so this should be empty.
        :param kwargs: keyword arguments to the command. This command
            does not take any, so this should be empty.

        :return: ``ResultCode.STARTED`` together with the id that the
            command tracker assigned to this command.
        """
        command_id = self._command_tracker.new_command(
            "AbortDevice", completed_callback=self._completed
        )
        task_callback = functools.partial(
            self._command_tracker.update_command_info, command_id
        )
        status, _message = self._component_manager.abort_device(task_callback)

        # The component manager is expected to have started the task.
        assert status == TaskStatus.IN_PROGRESS
        return ResultCode.STARTED, command_id
# commands with a json schema
class AssignResourcesCommand(SKASubarray.AssignResourcesCommand):
    """
    MCCS version of SKASubarray's AssignResources() command.

    It passes the MCCS AssignResources JSON schema to the base command.
    """

    # Parsed once, when the class body is executed.
    SCHEMA_ASSIGN_RESOURCES: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray",
            "MccsSubarray_AssignResources_3_0.json",
        )
    )

    def __init__(
        self: MccsSubarray.AssignResourcesCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Optional[Callable] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: an optional callback to be called when this
            command starts and finishes.
        :param logger: a logger for this command to log with.
        """
        super().__init__(
            command_tracker,
            component_manager,
            schema=self.SCHEMA_ASSIGN_RESOURCES,
            callback=callback,
            logger=logger,
        )
class ConfigureCommand(SKASubarray.ConfigureCommand):
    """
    MCCS version of SKASubarray's Configure() command.

    It passes the MCCS Configure JSON schema to the base command.
    """

    # Parsed once, when the class body is executed.
    SCHEMA_CONFIGURE: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray",
            "MccsSubarray_Configure_3_0.json",
        )
    )

    def __init__(
        self: MccsSubarray.ConfigureCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Optional[Callable] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: an optional callback to be called when this
            command starts and finishes.
        :param logger: a logger for this command to log with.
        """
        super().__init__(
            command_tracker,
            component_manager,
            schema=self.SCHEMA_CONFIGURE,
            callback=callback,
            logger=logger,
        )
class ScanCommand(SKASubarray.ScanCommand):
    """
    MCCS version of SKASubarray's Scan() command.

    It passes the MCCS Scan JSON schema to the base command.
    """

    # Parsed once, when the class body is executed.
    SCHEMA_SCAN: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.common",
            "Mccs_Scan_3_0.json",
        )
    )

    def __init__(
        self: MccsSubarray.ScanCommand,
        command_tracker: CommandTracker,
        component_manager: SubarrayComponentManager,
        callback: Optional[Callable] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: an optional callback to be called when this
            command starts and finishes.
        :param logger: a logger for this command to log with.
        """
        super().__init__(
            command_tracker,
            component_manager,
            schema=self.SCHEMA_SCAN,
            callback=callback,
            logger=logger,
        )
def init_command_objects(self: MccsSubarray) -> None:
    """
    Initialise the command handlers for commands supported by this device.

    On top of the handlers registered by the base class, this registers a
    ``SubmittedSlowCommand`` for each entry of the table below and an
    ``AbortDeviceCommand`` for "AbortDevice".
    """
    super().init_command_objects()

    def _callback(hook: str, running: bool) -> None:
        # Drives the obs state model with "<hook>_invoked" when the command
        # starts and "<hook>_completed" when it finishes.
        phase = "invoked" if running else "completed"
        self.obs_state_model.perform_action(f"{hook}_{phase}")

    # (command name, component manager method name, JSON schema or None)
    submitted_commands = [
        ("SendTransientBuffer", "send_transient_buffer", None),
        # These commands are defined in the superclass, no need to redefine
        ("ReleaseResources", "release_resources", None),
        # ("ReleaseAllResources", "release__all_resources"),
        # ("EndScan", "end_scan"),
    ]
    for command_name, method_name, schema in submitted_commands:
        if schema is None:
            validator = None
        else:
            validator = JsonValidator(
                command_name,
                schema,
                logger=self.logger,
            )

        command_object = SubmittedSlowCommand(
            command_name,
            self._command_tracker,
            self.component_manager,
            method_name,
            callback=None,
            logger=self.logger,
            validator=validator,
        )
        self.register_command_object(command_name, command_object)

    abort_command = self.AbortDeviceCommand(
        self._command_tracker,
        self.component_manager,
        callback=functools.partial(_callback, "abort"),
        logger=self.logger,
    )
    self.register_command_object("AbortDevice", abort_command)
class InitCommand(SKAObsDevice.InitCommand):
    """Command class for device initialisation."""

    # pylint: disable-next=arguments-differ
    def do(  # type: ignore[override]
        self: MccsSubarray.InitCommand,
    ) -> tuple[ResultCode, str]:
        """
        Initialise the attributes and properties of MccsSubarray.

        Enables change and archive events for ``stationTrls``, records the
        ``ska_low_mccs`` package version details on the device, then runs
        the base-class initialisation.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information purpose
            only.
        """
        device = self._device
        device.set_change_event("stationTrls", True, True)
        device.set_archive_event("stationTrls", True, True)

        package = sys.modules["ska_low_mccs"]
        device._build_state = str(package.__version_info__)
        device._version_id = package.__version__

        super().do()
        return (ResultCode.OK, "Init command started")
# ----------
# Callbacks
# ----------
# pylint: disable=too-many-arguments
def _component_state_callback(
    self: MccsSubarray,
    power: Optional[PowerState] = None,
    health: Optional[HealthState] = None,
    trl: Optional[str] = None,
    resources_changed: Optional[list[set]] = None,
    configured_changed: Optional[bool] = None,
    scanning_changed: Optional[bool] = None,
    obsfault: Optional[bool] = None,
    obsstate_changed: Optional[ObsState] = None,
    station_power: Optional[PowerState] = None,
    task_completed: Optional[str] = None,
    missed_event: Optional[bool] = None,
) -> None:
    """
    Handle change in this device's state.

    This is a callback hook, called whenever the state changes. It is
    responsible for updating the tango side of things i.e. making sure the
    attribute is up to date, and events are pushed. Every argument is
    optional; only the ones that are supplied are acted upon, in the order
    in which they are handled below.

    :param power: An optional parameter with the new power state of the
        device. It is accepted but not acted on by this callback.
    :param health: An optional parameter with the new health state of this
        Subarray or a subservient Station device.
    :param trl: The TRL of the subservient device, or None if the update
        is for this Subarray.
    :param resources_changed: An optional parameter updating the resources
        for this Subarray: station TRLs, subarray beam TRLs and station
        beam TRLs, in that order.
    :param configured_changed: An optional flag indicating that this
        Subarray's configuration has changed.
    :param scanning_changed: An optional flag indicating that this
        Subarray's scanning state has changed.
    :param obsfault: An optional flag indicating whether this Subarray is
        entering or exiting an ObsFault state. Any non-None value triggers
        the "component_obsfault" action.
    :param obsstate_changed: An optional parameter with the new ObsState
        of a subservient device. Only used when ``trl`` is also given.
    :param station_power: An optional parameter with the new power state
        of a subservient Station device.
    :param task_completed: An optional parameter indicating that this
        Subarray has completed a slow command. The value of this parameter
        indicates the finished task and can be one of: restart, obsreset,
        release, assign or configure. This parameter is used to drive the
        Subarray ObsState.
    :param missed_event: whether the component manager has detected a
        missed change event.
    """
    if health is not None:
        self._health_state_changed(health, trl)

    if resources_changed is not None:
        station_trls = resources_changed[0]
        subarray_beam_trls = resources_changed[1]
        station_beam_trls = resources_changed[2]
        self._resources_changed(station_trls, subarray_beam_trls, station_beam_trls)

    if configured_changed is not None:
        self._configuration_changed(configured_changed)

    if task_completed is not None:
        self.obs_state_model.perform_action(f"{task_completed}_completed")

    if scanning_changed is not None:
        self._scanning_changed(scanning_changed)

    if obsfault is not None:
        self.obs_state_model.perform_action("component_obsfault")

    if obsstate_changed is not None and trl is not None:
        if obsstate_changed == ObsState.FAULT:
            self.component_manager._device_obs_state_fault(trl, obsstate_changed)
        else:
            self.component_manager._device_obs_state_changed(trl, obsstate_changed)

    if station_power is not None:
        self._station_power_state_changed(station_power, trl)

    if missed_event:
        self._missed_events += 1


def _communication_state_callback(
    self: MccsSubarray,
    communication_state: CommunicationStatus,
) -> None:
    """
    Handle change in communications status between component manager and component.

    This is a callback hook, called by the component manager when the
    communications status changes. It is implemented here to drive the
    op_state, and it also tells the health model whether communication is
    established.

    :param communication_state: the status of communications between the
        component manager and its component.
    """
    action_map = {
        CommunicationStatus.DISABLED: "component_disconnected",
        CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
        CommunicationStatus.ESTABLISHED: "component_on",
    }
    # Every entry maps to an action name, so no None check is needed.
    self.op_state_model.perform_action(action_map[communication_state])

    self._health_model.update_state(
        communicating=communication_state == CommunicationStatus.ESTABLISHED
    )


def _health_state_changed(
    self: MccsSubarray,
    health: HealthState,
    trl: Optional[str] = None,
) -> None:
    """
    Handle a health state change of this device or of a subservient device.

    Without a TRL the update is for this device and is applied directly.
    With a TRL the update is forwarded to the health model method chosen
    by the second "/"-separated component of the TRL. A TRL whose type is
    not recognised, or which has no second component, is logged as a
    warning and otherwise ignored.

    :param health: The new health of the device
    :param trl: The TRL of the device, or None for this device.
    """
    if trl is None:
        # Do regular health update. This device called the callback.
        self._health_changed(health)
        return

    valid_device_types: dict[str, Callable] = {
        "station": self._health_model.station_health_changed,
        "beam": self._health_model.station_beam_health_changed,
        "subarraybeam": self._health_model.subarray_beam_health_changed,
    }

    # Identify the subservient device type from the TRL. A TRL without a
    # "/" has no type component and falls through to the warning below
    # rather than raising IndexError.
    trl_parts = trl.split("/")
    device_type = trl_parts[1] if len(trl_parts) > 1 else None
    handler = valid_device_types.get(device_type)

    if handler is None:
        # We've somehow got a health update for a device type
        # we don't manage.
        msg = (
            f"Received a health state changed event for device {trl} "
            "which is not managed by this subarray."
        )
        self.logger.warning(msg)
        return

    handler(trl, health)


def _health_changed(self: MccsSubarray, health: HealthState) -> None:
    """
    Handle change in this device's health state.

    This is a callback hook, called whenever the HealthModel's evaluated
    health state changes. It is responsible for updating the tango side of
    things i.e. making sure the attribute is up to date, and events are
    pushed. Nothing happens when the value is unchanged.

    :param health: the new health value
    """
    if self._health_state != health:
        self._health_state = health
        self.push_change_event("healthState", health)


def _station_power_state_changed(
    self: MccsSubarray,
    station_power: Any,
    trl: Optional[str],
) -> None:
    """
    Forward a station power state change to the component manager.

    The update is forwarded whenever a power state and a TRL are both
    given. The power state is compared against None rather than tested for
    truthiness, so that a falsy enum member (``PowerState.UNKNOWN`` has the
    value 0) is not silently dropped.

    :param station_power: the new power state of the station
    :param trl: TRL of the device
    """
    if station_power is not None and trl:
        self.component_manager._station_power_state_changed(trl, station_power)


def _configuration_changed(self: MccsSubarray, is_configured: Any) -> None:
    """
    Handle the configuration change.

    :param is_configured: truthy if the component is now configured,
        falsy if it is now unconfigured
    """
    action = "component_configured" if is_configured else "component_unconfigured"
    self.obs_state_model.perform_action(action)


def _scanning_changed(self: MccsSubarray, is_scanning: Any) -> None:
    """
    Handle the scanning change.

    :param is_scanning: truthy if the component is now scanning, falsy if
        it has stopped scanning
    """
    action = "component_scanning" if is_scanning else "component_not_scanning"
    self.obs_state_model.perform_action(action)


def _resources_changed(
    self: MccsSubarray,
    station_trls: set[str],
    subarray_beam_trls: set[str],
    station_beam_trls: set[str],
) -> None:
    """
    Handle change in subarray resources.

    This is a callback hook, called by the component manager when the
    resources of the subarray changes. The obs state model is told whether
    any resource remains assigned, and the health model is given the new
    resource sets.

    :param station_trls: the TRLs of stations assigned to this subarray
    :param subarray_beam_trls: the TRLs of subarray beams assigned to this
        subarray
    :param station_beam_trls: the TRLs of station beams assigned to this
        subarray
    """
    if station_trls or subarray_beam_trls or station_beam_trls:
        self.obs_state_model.perform_action("component_resourced")
    else:
        self.obs_state_model.perform_action("component_unresourced")

    self._health_model.resources_changed(
        station_trls, subarray_beam_trls, station_beam_trls
    )


# ----------
# Attributes
# ----------
@attribute(
    dtype="DevString",
    format="%s",
)
def healthModelParams(self: MccsSubarray) -> str:
    """
    Get the health params from the health model.

    :return: the health params, serialised as a JSON string
    """
    return json.dumps(self._health_model.health_params)
@healthModelParams.write  # type: ignore[no-redef]
def healthModelParams(self: MccsSubarray, argin: str) -> None:
    """
    Set the params for health transition rules.

    The JSON string is decoded, stored on the health model, and the health
    model is then asked to update its health.

    :param argin: JSON-string of dictionary of health states
    """
    new_params = json.loads(argin)
    self._health_model.health_params = new_params
    self._health_model.update_health()
@attribute(dtype="DevLong", format="%i")
def scanId(self: MccsSubarray) -> int:
    """
    Return the scan id.

    :return: the component manager's scan id, or -1 when it has none
    """
    current_scan_id = self.component_manager.scan_id
    if current_scan_id is None:
        return -1
    return current_scan_id
@scanId.write  # type: ignore[no-redef]
def scanId(self: MccsSubarray, scan_id: int) -> None:
    """
    Set the scanId attribute.

    The value is stored on the component manager.

    :param scan_id: the new scanId
    """
    manager = self.component_manager
    manager.scan_id = scan_id
@attribute(dtype=("DevString",), max_dim_x=512, format="%s")
def stationTrls(self: MccsSubarray) -> list[str]:
    """
    Return the TRLs of stations assigned to this subarray.

    :return: sorted TRLs of stations assigned to this subarray
    """
    assigned = self.component_manager.station_trls
    return sorted(assigned)
@attribute(dtype=("DevString",), max_dim_x=512, format="%s")
def stationBeamTrls(self: MccsSubarray) -> list[str]:
    """
    Return the TRLs of station beams assigned to this subarray.

    :return: sorted TRLs of station beams assigned to this subarray
    """
    assigned = self.component_manager.station_beam_trls
    return sorted(assigned)
@attribute(dtype=("DevString",), max_dim_x=512, format="%s")
def subarrayBeamTrls(self: MccsSubarray) -> list[str]:
    """
    Return the TRLs of subarray beams assigned to this subarray.

    :return: sorted TRLs of subarray beams assigned to this subarray
    """
    assigned = self.component_manager.subarray_beam_trls
    return sorted(assigned)
@attribute(dtype=("DevString"), max_dim_x=1024)
def assignedResources(self: MccsSubarray) -> str:
    """
    Return this subarray's assigned resources.

    Stations, subarray beams and station beams are reported by the last
    "/"-separated component of their TRL, with leading zeros removed.

    :return: this subarray's assigned resources, as a JSON string.
    """

    def _short_ids(trls: Any) -> list[str]:
        # Last TRL component, stripped of leading "0" characters.
        return [trl.rsplit("/", 1)[-1].lstrip("0") for trl in trls]

    resource_dict = self.component_manager.assigned_resources_dict

    station_ids = _short_ids(resource_dict["stations"])
    subarray_beam_ids = _short_ids(resource_dict["subarray_beams"])
    station_beam_ids = _short_ids(resource_dict["station_beams"])
    channels = resource_dict["channels"]

    report = {
        "interface": "https://schema.skao.int/ska-low-mccs-assignedresources/1.0",
        "subarray_beam_ids": subarray_beam_ids,
        "station_beam_ids": station_beam_ids,
        "station_ids": station_ids,
        "apertures": list(resource_dict["apertures"]),
        "channels": channels,
    }
    return json.dumps(report)
@attribute(dtype="DevString")
def healthReport(self: MccsSubarray) -> str:
    """
    Get the health report.

    :return: the health report held by the health model.
    """
    health_model = self._health_model
    return health_model.health_report
@attribute(dtype="DevLong")
def missedEvents(self: MccsSubarray) -> int:
    """
    Get the number of missed change events.

    Some commands rely on change events from sub-devices. Sometimes those
    events are missed; this attribute reports how many are known to have
    been missed.

    :return: the number of missed change events
    """
    missed_count = self._missed_events
    return missed_count
# -------- # Commands # --------
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def ReleaseResources(self: MccsSubarray, argin: str) -> DevVarLongStringArrayType:
    """
    Release resources from this subarray.

    The call is delegated to the registered "ReleaseResources" command
    object.

    :param argin: the resources to be released

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    release = self.get_command_object("ReleaseResources")
    return_code, unique_id = release(argin)
    return ([return_code], [unique_id])
@command(dtype_out="DevVarLongStringArray")
def ReleaseAllResources(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Release all resources from this subarray.

    The call is delegated to the registered "ReleaseAllResources" command
    object.

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    release_all = self.get_command_object("ReleaseAllResources")
    return_code, unique_id = release_all()
    return ([return_code], [unique_id])
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def Scan(
    self: MccsSubarray,
    argin: str,
) -> DevVarLongStringArrayType:
    """
    Start scanning.

    The call is delegated to the registered "Scan" command object.

    :param argin: Json string containing scan_id and start_time.

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    scan = self.get_command_object("Scan")
    return_code, unique_id = scan(argin)
    return ([return_code], [unique_id])
@command(dtype_out="DevVarLongStringArray")
def EndScan(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Stop scanning.

    The call is delegated to the registered "EndScan" command object.

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    end_scan = self.get_command_object("EndScan")
    return_code, unique_id = end_scan()
    return ([return_code], [unique_id])
@command(dtype_out="DevVarLongStringArray")
def End(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Deconfigure resources.

    The call is delegated to the registered "End" command object.

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    end = self.get_command_object("End")
    return_code, unique_id = end()
    return ([return_code], [unique_id])
@command(dtype_out="DevVarLongStringArray")
def ObsReset(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Reset the observation by returning to unconfigured state.

    The call is delegated to the registered "ObsReset" command object.

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    obs_reset = self.get_command_object("ObsReset")
    return_code, unique_id = obs_reset()
    return ([return_code], [unique_id])
@command(dtype_out="DevVarLongStringArray")
def Restart(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Restart the subarray by returning to unresourced state.

    The call is delegated to the registered "Restart" command object.

    :return: two single-element lists: the return code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    restart = self.get_command_object("Restart")
    return_code, unique_id = restart()
    return ([return_code], [unique_id])
@command(dtype_in="DevVarLongArray", dtype_out="DevVarLongStringArray")
def SendTransientBuffer(
    self: MccsSubarray, argin: list[int]
) -> DevVarLongStringArrayType:
    """
    Cause the subarray to send the requested segment of the transient buffer to SDP.

    The requested segment is specified by:

    1. Start time (timestamp: milliseconds since UNIX epoch)
    2. End time (timestamp: milliseconds since UNIX epoch)
    3. Dispersion measure

    Together, these parameters narrow the selection of transient buffer
    data to the period of time and frequencies that are of interest.

    Additional metadata, such as the ID of a triggering Scheduling Block,
    may need to be supplied to allow SDP to assign data ownership
    correctly (TBD75).

    The call is delegated to the registered "SendTransientBuffer" command
    object.

    :todo: This method is a stub that does nothing but return a dummy
        string.

    :param argin: Specification of the segment of the transient buffer to
        send

    :return: two single-element lists: the result code, and the second
        value (named ``unique_id`` here) returned by the command object.
    """
    send_buffer = self.get_command_object("SendTransientBuffer")
    result_code, unique_id = send_buffer(argin)
    return ([result_code], [unique_id])
@command(dtype_out="DevVarLongStringArray")
def AbortDevice(self: MccsSubarray) -> DevVarLongStringArrayType:
    """
    Abort any long-running command such as ``Configure()`` or ``Scan()``.

    This will only cancel commands on this device, not further down the
    hierarchy, use Abort() for that use case.

    The call is delegated to the registered "AbortDevice" command object.

    :return: two single-element lists: the result code, and the second
        value returned by the command object.
    """
    abort_device = self.get_command_object("AbortDevice")
    result_code, message = abort_device()
    return ([result_code], [message])
# ---------- # Run server # ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Entry point for module.

    Runs the ``MccsSubarray`` device server. When no positional arguments
    are given, ``None`` is passed as ``args``.

    :param args: positional arguments
    :param kwargs: named arguments

    :return: exit code
    """
    server_args = args if args else None
    return MccsSubarray.run_server(args=server_args, **kwargs)


if __name__ == "__main__":
    main()