# Source code for ska_tango_base.subarray.subarray_device

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
SKASubarray.

A SubArray handling device. It allows the assigning/releasing of
resources into/from Subarray, configuring capabilities, and exposes the
related information like assigned resources, configured capabilities,
etc.
"""

from __future__ import annotations

import functools
import logging
from collections import namedtuple
from collections.abc import Callable
from typing import Any, TypeVar, cast

from ska_control_model import ObsState, PowerState, ResultCode

from ..base import BaseInterface
from ..commands import JsonValidator, SubmittedSlowCommand
from ..obs import SKAObsDevice
from ..type_hints import CommandTrackerProtocol, DevVarLongStringArrayType
from .subarray_component_manager import SubarrayComponentManager
from .subarray_interface import AbstractSubarrayInterface

__all__ = ["SKASubarray", "main"]


ComponentManagerT = TypeVar("ComponentManagerT", bound=SubarrayComponentManager)


class SKASubarray(
    SKAObsDevice[ComponentManagerT],
    AbstractSubarrayInterface,
):
    """
    A generic subarray device for SKA.

    :class:`~ska_tango_base.subarray.subarray_device.SKASubarray` derives
    from both
    :class:`~ska_tango_base.obs.obs_device.SKAObsDevice` and
    :class:`~ska_tango_base.subarray.subarray_interface.AbstractSubarrayInterface`.
    Concrete devices are expected to supply a component manager by
    implementing
    :meth:`~ska_tango_base.base.base_device.SKABaseDevice.create_component_manager`.
    """

    # Annotation-only declarations: each name is typed as an optional
    # callable and no value is bound to it in this class body.
    Reset: Callable[[BaseInterface], DevVarLongStringArrayType] | None
    Standby: Callable[[BaseInterface], DevVarLongStringArrayType] | None
    Off: Callable[[BaseInterface], DevVarLongStringArrayType] | None
    On: Callable[[BaseInterface], DevVarLongStringArrayType] | None

    def __init__(
        self: SKASubarray[ComponentManagerT],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new instance.

        The per-instance table of subarray commands is built first, and
        only then is the parent initialiser invoked.

        :param args: positional arguments, forwarded to the parent class.
        :param kwargs: keyword arguments, forwarded to the parent class.
        """
        CommandInfo = namedtuple(
            "CommandInfo", ["command_class", "state_model_hook", "expected_obs_state"]
        )

        # One row per subarray command:
        # (name, command class, state model hook, expected obsState).
        # "Abort" has no command class here, so init_command_objects()
        # skips it; it only contributes an expected obsState.
        command_rows = (
            ("AssignResources", self.AssignResourcesCommand, "assign", ObsState.IDLE),
            ("ReleaseResources", self.ReleaseResourcesCommand, "release", ObsState.IDLE),
            (
                "ReleaseAllResources",
                self.ReleaseAllResourcesCommand,
                "release",
                ObsState.EMPTY,
            ),
            ("Configure", self.ConfigureCommand, "configure", ObsState.READY),
            ("Scan", self.ScanCommand, None, ObsState.READY),
            ("EndScan", self.EndScanCommand, None, ObsState.READY),
            ("End", self.EndCommand, None, ObsState.IDLE),
            ("Abort", None, None, ObsState.ABORTED),
            ("ObsReset", self.ObsResetCommand, "obsreset", ObsState.IDLE),
            ("Restart", self.RestartCommand, "restart", ObsState.EMPTY),
        )
        # Dictionary of SKASubarray command info tuples, keyed by command name.
        self._SUBARRAY_COMMANDS = {
            name: CommandInfo(command_class, hook, obs_state)
            for name, command_class, hook, obs_state in command_rows
        }

        super().__init__(*args, **kwargs)

    class InitCommand(SKAObsDevice.InitCommand):
        """A class for the SKASubarray's init_device() "command"."""

        def do(
            self: SKASubarray.InitCommand,
            *args: Any,
            **kwargs: Any,
        ) -> tuple[ResultCode, str]:
            """
            Stateless hook for device initialisation.

            Runs the parent ``do()`` (without arguments), logs a
            completion message and calls ``self._completed()``.

            :param args: positional arguments to the command. This
                command does not take any, so this should be empty.
            :param kwargs: keyword arguments to the command. This
                command does not take any, so this should be empty.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            """
            super().do()

            status_message = "SKASubarray Init command completed OK"
            self.logger.info(status_message)
            self._completed()
            return ResultCode.OK, status_message

    def on_new_shared_bus(self) -> None:
        """
        Initialise signals.

        Delegates to the parent implementation and then sets
        ``_activation_time`` to ``0.0``.
        """
        super().on_new_shared_bus()
        self._activation_time = 0.0

    def create_component_manager(
        self: SKASubarray[ComponentManagerT],
    ) -> ComponentManagerT:
        """
        Create and return a component manager for this device.

        This base implementation never returns; subclasses must override
        it.

        :raises NotImplementedError: always, because this class does not
            implement it.
        """
        reason = (
            "'create_component_manager' method must be implemented by "
            f"'{self.__class__.__name__}'. "
            "The parent 'SKASubarray' is an abstract base class."
        )
        raise NotImplementedError(reason)

    def _component_state_changed(
        self,
        fault: bool | None = None,
        power: PowerState | None = None,
        obsfault: bool | None = None,
        resourced: bool | None = None,
        configured: bool | None = None,
        scanning: bool | None = None,
    ) -> None:
        """
        Handle a change in the state of the component.

        ``fault`` and ``power`` are passed on to the parent
        implementation. Each remaining argument is ignored when it is
        ``None``; otherwise the matching ``component_*`` method on this
        device is called.

        :param fault: forwarded unchanged to the parent class.
        :param power: forwarded unchanged to the parent class.
        :param obsfault: when truthy, ``component_obsfault()`` is called;
            ``False`` and ``None`` are both ignored.
        :param resourced: ``True`` calls ``component_resourced()``,
            ``False`` calls ``component_unresourced()``.
        :param configured: ``True`` calls ``component_configured()``,
            ``False`` calls ``component_unconfigured()``.
        :param scanning: ``True`` calls ``component_scanning()``,
            ``False`` calls ``component_not_scanning()``.
        """
        super()._component_state_changed(fault=fault, power=power)

        if obsfault:
            self.component_obsfault()

        if resourced is not None:
            notify = (
                self.component_resourced
                if resourced
                else self.component_unresourced
            )
            notify()

        if configured is not None:
            notify = (
                self.component_configured
                if configured
                else self.component_unconfigured
            )
            notify()

        if scanning is not None:
            notify = (
                self.component_scanning
                if scanning
                else self.component_not_scanning
            )
            notify()

    def read_assignedResources(self) -> list[str]:
        """
        Read the resources assigned to the device.

        The value is taken from the component manager's
        ``assigned_resources``.

        :return: Resources assigned to the device.
        """
        return self.component_manager.assigned_resources

    def read_configuredCapabilities(self) -> list[str]:
        """
        Read capabilities configured in the Subarray.

        A list of capability types with no. of instances in use on this
        subarray; e.g. Correlators:512, PssBeams:4 PstBeams:4,
        VlbiBeams:0. The value is taken from the component manager's
        ``configured_capabilities``.

        :return: A list of capability types with no. of instances used
            in the Subarray
        """
        return self.component_manager.configured_capabilities

    class AssignResourcesCommand(SubmittedSlowCommand):
        """A class for SKASubarray's AssignResources() command."""

        def __init__(
            self: SKASubarray.AssignResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            argument_validator = JsonValidator(
                "AssignResources", schema, logger=logger
            )
            super().__init__(
                "AssignResources",
                command_tracker,
                component_manager,
                "assign",
                callback=callback,
                logger=logger,
                validator=argument_validator,
            )

    class ReleaseResourcesCommand(SubmittedSlowCommand):
        """A class for SKASubarray's ReleaseResources() command."""

        def __init__(
            self: SKASubarray.ReleaseResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            argument_validator = JsonValidator(
                "ReleaseResources", schema, logger=logger
            )
            super().__init__(
                "ReleaseResources",
                command_tracker,
                component_manager,
                "release",
                callback=callback,
                logger=logger,
                validator=argument_validator,
            )

    class ReleaseAllResourcesCommand(SubmittedSlowCommand):
        """A class for SKASubarray's ReleaseAllResources() command."""

        def __init__(
            self: SKASubarray.ReleaseAllResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            No argument validator is supplied for this command.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "ReleaseAllResources",
                command_tracker,
                component_manager,
                "release_all",
                callback=callback,
                logger=logger,
            )

    class ConfigureCommand(SubmittedSlowCommand):
        """A class for SKASubarray's Configure() command."""

        def __init__(
            self: SKASubarray.ConfigureCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            argument_validator = JsonValidator("Configure", schema, logger=logger)
            super().__init__(
                "Configure",
                command_tracker,
                component_manager,
                "configure",
                callback=callback,
                logger=logger,
                validator=argument_validator,
            )

    class ScanCommand(SubmittedSlowCommand):
        """A class for SKASubarray's Scan() command."""

        def __init__(
            self: SKASubarray.ScanCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            argument_validator = JsonValidator("Scan", schema, logger=logger)
            super().__init__(
                "Scan",
                command_tracker,
                component_manager,
                "scan",
                callback=callback,
                logger=logger,
                validator=argument_validator,
            )

    class EndScanCommand(SubmittedSlowCommand):
        """A class for SKASubarray's EndScan() command."""

        def __init__(
            self: SKASubarray.EndScanCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            No argument validator is supplied for this command.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "EndScan",
                command_tracker,
                component_manager,
                "end_scan",
                callback=callback,
                logger=logger,
            )

    class EndCommand(SubmittedSlowCommand):
        """A class for SKASubarray's End() command."""

        def __init__(
            self: SKASubarray.EndCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            No argument validator is supplied for this command.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "End",
                command_tracker,
                component_manager,
                "deconfigure",
                callback=callback,
                logger=logger,
            )

    class ObsResetCommand(SubmittedSlowCommand):
        """A class for SKASubarray's ObsReset() command."""

        def __init__(
            self: SKASubarray.ObsResetCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            No argument validator is supplied for this command.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "ObsReset",
                command_tracker,
                component_manager,
                "obsreset",
                callback=callback,
                logger=logger,
            )

    class RestartCommand(SubmittedSlowCommand):
        """A class for SKASubarray's Restart() command."""

        def __init__(
            self: SKASubarray.RestartCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            No argument validator is supplied for this command.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "Restart",
                command_tracker,
                component_manager,
                "restart",
                callback=callback,
                logger=logger,
            )

    def init_command_objects(self: SKASubarray[ComponentManagerT]) -> None:
        """
        Set up the command objects.

        After the parent's command objects are initialised, one command
        object is registered for every entry of ``_SUBARRAY_COMMANDS``
        that has a command class. Entries with a state model hook get a
        callback that performs ``"<hook>_invoked"`` or
        ``"<hook>_completed"`` on the observation state model.
        """
        super().init_command_objects()

        def _callback(hook: str, running: bool) -> None:
            # Translate the boolean progress flag into the action suffix.
            suffix = "invoked" if running else "completed"
            self.obs_state_model.perform_action(f"{hook}_{suffix}")

        for name, info in self._SUBARRAY_COMMANDS.items():
            if info.command_class is None:
                # Nothing to instantiate for this entry (e.g. "Abort").
                continue

            if info.state_model_hook is None:
                progress_callback = None
            else:
                progress_callback = functools.partial(
                    _callback, info.state_model_hook
                )

            command_object = info.command_class(
                self._command_tracker,
                self.component_manager,
                callback=progress_callback,
                logger=None,
            )
            self._register_default_command_object(name, command_object)

    def _update_commanded_state(
        self: SKASubarray[ComponentManagerT], command_name: str
    ) -> None:
        """
        Record the obsState that the given command is expected to reach.

        The parent implementation runs first. Commands that are not in
        ``_SUBARRAY_COMMANDS`` leave ``_commanded_obs_state`` untouched.

        :param command_name: name of the command being tracked.
        """
        super()._update_commanded_state(command_name)

        # Update commandedObsState after a SKASubarray command's status is
        # 'IN_PROGRESS'
        if command_name not in self._SUBARRAY_COMMANDS:
            return

        target_obs_state = self._SUBARRAY_COMMANDS[command_name].expected_obs_state

        # Handle special case if any resourcing command was interrupted
        resetting_to_empty = (
            command_name == "ObsReset"
            and self.obs_state_model._obs_state_machine.state == "RESETTING_EMPTY"
        )
        if resetting_to_empty:
            target_obs_state = ObsState.EMPTY

        self._commanded_obs_state = target_obs_state

    # -----------------
    # Device Properties
    # -----------------

    # ----------
    # Attributes
    # ----------

    # --------
    # Commands
    # --------

    def execute_AssignResources(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Assign resources to this subarray.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: the resources to be assigned

        :return: A tuple containing a result code and a string message.
            If the result code indicates that the command was accepted,
            the message is the unique ID of the task that will execute
            the command. If the result code indicates that the command
            was not accepted, the message explains why.
        """
        command = self.get_command_object("AssignResources")
        result_code, message = command(argin)
        return ([result_code], [message])

    def execute_ReleaseResources(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Delta removal of assigned resources.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: the resources to be released

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("ReleaseResources")
        result_code, message = command(argin)
        return ([result_code], [message])

    def execute_ReleaseAllResources(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        Remove all resources to tear down to an empty subarray.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("ReleaseAllResources")
        result_code, message = command()
        return ([result_code], [message])

    def execute_Configure(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Configure the capabilities of this subarray.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: JSON-encoded string with the scan configuration
            specification

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("Configure")
        result_code, message = command(argin)
        return ([result_code], [message])

    def execute_Scan(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Start scanning.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: JSON-encoded string with the per-scan
            configuration

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("Scan")
        result_code, message = command(argin)
        return ([result_code], [message])

    def execute_EndScan(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        End the scan.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("EndScan")
        result_code, message = command()
        return ([result_code], [message])

    def execute_End(self: SKASubarray[ComponentManagerT]) -> DevVarLongStringArrayType:
        """
        End the scan block.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("End")
        result_code, message = command()
        return ([result_code], [message])

    def execute_ObsReset(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        Reset the current observation process.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("ObsReset")
        result_code, message = command()
        return ([result_code], [message])

    def execute_Restart(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        Restart the subarray. That is, deconfigure and release all resources.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of
            the command
        """
        command = self.get_command_object("Restart")
        result_code, message = command()
        return ([result_code], [message])
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module.

    Installs a ``create_component_manager`` implementation on
    :class:`SKASubarray` that returns a plain
    ``SubarrayComponentManager`` built from the device's logger, then
    runs the device server.

    :param args: positional arguments
    :param kwargs: named arguments

    :return: exit code
    """

    def _create_component_manager(
        self: SKASubarray[SubarrayComponentManager],
    ) -> SubarrayComponentManager:
        return SubarrayComponentManager(self.logger)

    # The base class raises NotImplementedError from this method, so it is
    # replaced on the class itself before the server starts.
    SKASubarray.create_component_manager = _create_component_manager  # type: ignore

    # An empty ``args`` tuple is passed on as None rather than ().
    server_args = args or None
    exit_code = SKASubarray.run_server(args=server_args, **kwargs)
    return cast(int, exit_code)
# Run the device server only when this module is executed as a script.
if __name__ == "__main__":
    main()