#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
SKASubarray.
A SubArray handling device. It allows the assigning/releasing of
resources into/from Subarray, configuring capabilities, and exposes the
related information like assigned resources, configured capabilities,
etc.
"""
from __future__ import annotations
import functools
import logging
from collections import namedtuple
from collections.abc import Callable
from typing import Any, TypeVar, cast
from ska_control_model import ObsState, PowerState, ResultCode
from ..base import BaseInterface
from ..commands import JsonValidator, SubmittedSlowCommand
from ..obs import SKAObsDevice
from ..type_hints import CommandTrackerProtocol, DevVarLongStringArrayType
from .subarray_component_manager import SubarrayComponentManager
from .subarray_interface import AbstractSubarrayInterface
__all__ = ["SKASubarray", "main"]
ComponentManagerT = TypeVar("ComponentManagerT", bound=SubarrayComponentManager)
[docs]
class SKASubarray(
    SKAObsDevice[ComponentManagerT],
    AbstractSubarrayInterface,
):
    """
    A generic subarray device for SKA.

    :class:`~ska_tango_base.subarray.subarray_device.SKASubarray` inherits from
    the :class:`~ska_tango_base.obs.obs_device.SKAObsDevice` and
    :class:`~ska_tango_base.subarray.subarray_interface.AbstractSubarrayInterface`, and
    expects a component manager to be provided by implementing the
    :meth:`~ska_tango_base.base.base_device.SKABaseDevice.create_component_manager`
    method.
    """

    # Annotation-only declarations (no value is assigned here): each of these
    # command attributes is typed as either a callable or ``None``.
    # NOTE(review): presumably this lets a subclass disable a command by
    # setting the attribute to None -- confirm against BaseInterface.
    Reset: Callable[[BaseInterface], DevVarLongStringArrayType] | None
    Standby: Callable[[BaseInterface], DevVarLongStringArrayType] | None
    Off: Callable[[BaseInterface], DevVarLongStringArrayType] | None
    On: Callable[[BaseInterface], DevVarLongStringArrayType] | None

    def __init__(
        self: SKASubarray[ComponentManagerT],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new instance.

        Builds the table of subarray commands and then delegates to the
        parent initialiser.

        :param args: positional arguments.
        :param kwargs: keyword arguments.
        """
        # Dictionary of SKASubarray command info tuples:
        #   command_class      -- class instantiated by init_command_objects()
        #                         (None means no command object is registered)
        #   state_model_hook   -- prefix of the obs state model action fired
        #                         when the command is invoked/completed
        #                         (None means no callback is attached)
        #   expected_obs_state -- value stored in _commanded_obs_state by
        #                         _update_commanded_state()
        CommandInfo = namedtuple(
            "CommandInfo", ["command_class", "state_model_hook", "expected_obs_state"]
        )
        # NOTE(review): the table is created before super().__init__(),
        # presumably because base initialisation already needs it -- confirm.
        # NOTE(review): ``ConfigureCommand`` is not defined in this class body;
        # presumably it is inherited from a base class -- confirm.
        self._SUBARRAY_COMMANDS = {
            "AssignResources": CommandInfo(
                self.AssignResourcesCommand, "assign", ObsState.IDLE
            ),
            "ReleaseResources": CommandInfo(
                self.ReleaseResourcesCommand, "release", ObsState.IDLE
            ),
            "ReleaseAllResources": CommandInfo(
                self.ReleaseAllResourcesCommand, "release", ObsState.EMPTY
            ),
            "Configure": CommandInfo(
                self.ConfigureCommand, "configure", ObsState.READY
            ),
            "Scan": CommandInfo(self.ScanCommand, None, ObsState.READY),
            "EndScan": CommandInfo(self.EndScanCommand, None, ObsState.READY),
            "End": CommandInfo(self.EndCommand, None, ObsState.IDLE),
            "Abort": CommandInfo(None, None, ObsState.ABORTED),
            "ObsReset": CommandInfo(self.ObsResetCommand, "obsreset", ObsState.IDLE),
            "Restart": CommandInfo(self.RestartCommand, "restart", ObsState.EMPTY),
        }
        super().__init__(*args, **kwargs)

    class InitCommand(SKAObsDevice.InitCommand):
        """A class for the SKASubarray's init_device() "command"."""

        def do(
            self: SKASubarray.InitCommand,
            *args: Any,
            **kwargs: Any,
        ) -> tuple[ResultCode, str]:
            """
            Stateless hook for device initialisation.

            Runs the parent ``do()``, logs a completion message and marks
            the command as completed.

            :param args: positional arguments to the command. This
                command does not take any, so this should be empty.
            :param kwargs: keyword arguments to the command. This
                command does not take any, so this should be empty.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            """
            # The parent's return value is deliberately not used; this
            # method reports its own result below.
            super().do()

            message = "SKASubarray Init command completed OK"
            self.logger.info(message)
            self._completed()
            return (ResultCode.OK, message)

    def on_new_shared_bus(self) -> None:
        """Initialise signals."""
        super().on_new_shared_bus()
        self._activation_time = 0.0

    def create_component_manager(
        self: SKASubarray[ComponentManagerT],
    ) -> ComponentManagerT:
        """
        Create and return a component manager for this device.

        Subclasses must override this method; this implementation only
        raises.

        :raises NotImplementedError: because it is not implemented.
        """
        raise NotImplementedError(
            "'create_component_manager' method must be implemented by "
            f"'{self.__class__.__name__}'. "
            "The parent 'SKASubarray' is an abstract base class."
        )

    def _component_state_changed(
        self,
        fault: bool | None = None,
        power: PowerState | None = None,
        obsfault: bool | None = None,
        resourced: bool | None = None,
        configured: bool | None = None,
        scanning: bool | None = None,
    ) -> None:
        """
        Handle a change in the state of the component.

        ``fault`` and ``power`` are forwarded to the parent implementation.
        Each remaining argument that is not ``None`` triggers the matching
        ``component_*`` method on this device.

        :param fault: forwarded unchanged to the parent implementation.
        :param power: forwarded unchanged to the parent implementation.
        :param obsfault: if truthy, ``component_obsfault()`` is called;
            ``False`` and ``None`` are both ignored.
        :param resourced: ``True`` calls ``component_resourced()``,
            ``False`` calls ``component_unresourced()``, ``None`` is ignored.
        :param configured: ``True`` calls ``component_configured()``,
            ``False`` calls ``component_unconfigured()``, ``None`` is ignored.
        :param scanning: ``True`` calls ``component_scanning()``,
            ``False`` calls ``component_not_scanning()``, ``None`` is ignored.
        """
        super()._component_state_changed(fault=fault, power=power)

        # Only the transition *into* obsfault is handled here; there is no
        # counterpart action for obsfault being cleared.
        if obsfault:
            self.component_obsfault()

        if resourced is not None:
            if resourced:
                self.component_resourced()
            else:
                self.component_unresourced()

        if configured is not None:
            if configured:
                self.component_configured()
            else:
                self.component_unconfigured()

        if scanning is not None:
            if scanning:
                self.component_scanning()
            else:
                self.component_not_scanning()

    def read_assignedResources(self) -> list[str]:
        """
        Read the resources assigned to the device.

        The list of resources assigned to the subarray.

        :return: Resources assigned to the device.
        """
        return self.component_manager.assigned_resources

    class AssignResourcesCommand(SubmittedSlowCommand):
        """A class for SKASubarray's AssignResources() command."""

        def __init__(
            self: SKASubarray.AssignResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            super().__init__(
                "AssignResources",
                command_tracker,
                component_manager,
                "assign",
                callback=callback,
                logger=logger,
                validator=JsonValidator("AssignResources", schema, logger=logger),
            )

    class ReleaseResourcesCommand(SubmittedSlowCommand):
        """A class for SKASubarray's ReleaseResources() command."""

        def __init__(
            self: SKASubarray.ReleaseResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            super().__init__(
                "ReleaseResources",
                command_tracker,
                component_manager,
                "release",
                callback=callback,
                logger=logger,
                validator=JsonValidator("ReleaseResources", schema, logger=logger),
            )

    class ReleaseAllResourcesCommand(SubmittedSlowCommand):
        """A class for SKASubarray's ReleaseAllResources() command."""

        def __init__(
            self: SKASubarray.ReleaseAllResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "ReleaseAllResources",
                command_tracker,
                component_manager,
                "release_all",
                callback=callback,
                logger=logger,
            )

    class ScanCommand(SubmittedSlowCommand):
        """A class for SKASubarray's Scan() command."""

        def __init__(
            self: SKASubarray.ScanCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
            schema: dict[str, Any] | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            :param schema: an optional JSON schema for the command
                argument.
            """
            super().__init__(
                "Scan",
                command_tracker,
                component_manager,
                "scan",
                callback=callback,
                logger=logger,
                validator=JsonValidator("Scan", schema, logger=logger),
            )

    class EndScanCommand(SubmittedSlowCommand):
        """A class for SKASubarray's EndScan() command."""

        def __init__(
            self: SKASubarray.EndScanCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "EndScan",
                command_tracker,
                component_manager,
                "end_scan",
                callback=callback,
                logger=logger,
            )

    class EndCommand(SubmittedSlowCommand):
        """A class for SKASubarray's End() command."""

        def __init__(
            self: SKASubarray.EndCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "End",
                command_tracker,
                component_manager,
                "deconfigure",
                callback=callback,
                logger=logger,
            )

    class ObsResetCommand(SubmittedSlowCommand):
        """A class for SKASubarray's ObsReset() command."""

        def __init__(
            self: SKASubarray.ObsResetCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "ObsReset",
                command_tracker,
                component_manager,
                "obsreset",
                callback=callback,
                logger=logger,
            )

    class RestartCommand(SubmittedSlowCommand):
        """A class for SKASubarray's Restart() command."""

        def __init__(
            self: SKASubarray.RestartCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: SubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                "Restart",
                command_tracker,
                component_manager,
                "restart",
                callback=callback,
                logger=logger,
            )

    def init_command_objects(self: SKASubarray[ComponentManagerT]) -> None:
        """
        Set up the command objects.

        After the parent's command objects are initialised, one command
        object is registered for every entry of ``_SUBARRAY_COMMANDS`` that
        has a command class. Entries without one (such as ``"Abort"``) are
        skipped.
        """
        super().init_command_objects()

        def _callback(hook: str, running: bool) -> None:
            # Translate the start/finish notification into the obs state
            # model action "<hook>_invoked" or "<hook>_completed".
            action = "invoked" if running else "completed"
            self.obs_state_model.perform_action(f"{hook}_{action}")

        for command_name, command_tuple in self._SUBARRAY_COMMANDS.items():
            command_class = command_tuple.command_class
            if command_class is None:
                continue

            # Commands without a state model hook get no callback at all.
            state_model_hook = command_tuple.state_model_hook
            callback = (
                None
                if state_model_hook is None
                else functools.partial(_callback, state_model_hook)
            )
            # No ``schema`` is passed, so schema-aware command classes fall
            # back to their default of ``None``.
            # NOTE(review): ``logger=None`` is passed explicitly rather than
            # ``self.logger`` -- confirm that this is intentional.
            self._register_default_command_object(
                command_name,
                command_class(
                    self._command_tracker,
                    self.component_manager,
                    callback=callback,
                    logger=None,
                ),
            )

    def _update_commanded_state(
        self: SKASubarray[ComponentManagerT], command_name: str
    ) -> None:
        """
        Record the observing state that a command is expected to lead to.

        Delegates to the parent implementation first. If ``command_name``
        is one of the subarray commands, ``_commanded_obs_state`` is set to
        that command's expected obs state.

        :param command_name: name of the command being tracked.
        """
        super()._update_commanded_state(command_name)

        # Update commandedObsState after a SKASubarray command's status is 'IN_PROGRESS'
        if command_name in self._SUBARRAY_COMMANDS:
            expected_obs_state = self._SUBARRAY_COMMANDS[
                command_name
            ].expected_obs_state

            # Handle special case if any resourcing command was interrupted
            if (
                command_name == "ObsReset"
                and self.obs_state_model._obs_state_machine.state == "RESETTING_EMPTY"
            ):
                expected_obs_state = ObsState.EMPTY

            self._commanded_obs_state = expected_obs_state

    # -----------------
    # Device Properties
    # -----------------

    # ----------
    # Attributes
    # ----------

    # --------
    # Commands
    # --------
    def execute_AssignResources(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Assign resources to this subarray.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: the resources to be assigned

        :return: A tuple containing a result code and a string message. If the result
            code indicates that the command was accepted, the message is the unique ID
            of the task that will execute the command. If the result code indicates that
            the command was not accepted, the message explains why.
        """
        handler = self.get_command_object("AssignResources")
        (result_code, message) = handler(argin)
        return ([result_code], [message])

    def execute_ReleaseResources(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Delta removal of assigned resources.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: the resources to be released

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("ReleaseResources")
        (result_code, message) = handler(argin)
        return ([result_code], [message])

    def execute_ReleaseAllResources(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        Remove all resources to tear down to an empty subarray.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("ReleaseAllResources")
        (result_code, message) = handler()
        return ([result_code], [message])

    def execute_Scan(
        self: SKASubarray[ComponentManagerT], argin: str
    ) -> DevVarLongStringArrayType:
        """
        Start scanning.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :param argin: JSON-encoded string with the per-scan configuration

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("Scan")
        (result_code, message) = handler(argin)
        return ([result_code], [message])

    def execute_EndScan(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        End the scan.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("EndScan")
        (result_code, message) = handler()
        return ([result_code], [message])

    def execute_End(self: SKASubarray[ComponentManagerT]) -> DevVarLongStringArrayType:
        """
        End the scan block.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("End")
        (result_code, message) = handler()
        return ([result_code], [message])

    def execute_ObsReset(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        Reset the current observation process.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("ObsReset")
        (result_code, message) = handler()
        return ([result_code], [message])

    def execute_Restart(
        self: SKASubarray[ComponentManagerT],
    ) -> DevVarLongStringArrayType:
        """
        Restart the subarray. That is, deconfigure and release all resources.

        To modify behaviour for this command, modify the do() method of
        the command class.

        :return: A tuple containing a result code and the unique ID of the command
        """
        handler = self.get_command_object("Restart")
        (result_code, message) = handler()
        return ([result_code], [message])
# ----------
# Run server
# ----------
[docs]
def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module.

    Installs a default ``create_component_manager`` on ``SKASubarray`` that
    builds a ``SubarrayComponentManager`` from the device's logger, then
    runs the device server.

    :param args: positional arguments; when none are given, ``None`` is
        passed to ``run_server`` instead of an empty tuple
    :param kwargs: named arguments, forwarded to ``run_server``

    :return: exit code
    """

    def _create_component_manager(
        self: SKASubarray[SubarrayComponentManager],
    ) -> SubarrayComponentManager:
        return SubarrayComponentManager(self.logger)

    # SKASubarray.create_component_manager only raises NotImplementedError,
    # so patch in a concrete factory before the server starts.
    SKASubarray.create_component_manager = _create_component_manager  # type: ignore

    server_args = args or None
    exit_code = SKASubarray.run_server(args=server_args, **kwargs)
    return cast(int, exit_code)
if __name__ == "__main__":
    # Start the device server only when run as a script, not on import.
    main()