#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
A reference implementation of an SKA subarray device.
It inherits from SKASubarray but provides schemas for some commands.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from threading import Event
from typing import Final, cast
from tango.server import command
from ...long_running_commands import submit_lrc_task
from ...subarray.subarray_device import SKASubarray
from ...type_hints import (
CommandTrackerProtocol,
JSONData,
TaskCallbackType,
TaskFunctionType,
)
from ...validators import validate_json_args
from .reference_subarray_component_manager import ReferenceSubarrayComponentManager
# Public API of this module: the two reference device classes defined here,
# the re-exported SKASubarray base class (kept for backward compatibility)
# and the server entry point.
__all__ = [
    "ReferenceDeprecatedSkaSubarray",
    "ReferenceSkaSubarray",
    "SKASubarray",
    "main",
]
class ReferenceSkaSubarray(SKASubarray[ReferenceSubarrayComponentManager]):
    """
    Implements a reference SKA Subarray device using the new interface.

    Every ``execute_<Command>`` method is decorated with ``submit_lrc_task``
    and returns a task function. Commands that take a JSON argument also
    declare a ``<Command>_SCHEMA`` which is passed to ``validate_json_args``.
    """

    __version__ = "2.0.0"

    def create_component_manager(
        self: ReferenceSkaSubarray,
    ) -> ReferenceSubarrayComponentManager:
        """
        Create and return a component manager for this device.

        :returns: a reference subarray component manager.
        """
        return ReferenceSubarrayComponentManager(
            self.CapabilityTypes,
            self.logger,
            self._communication_state_changed,
            self._component_state_changed,
        )

    # JSON schema for the AssignResources argument: an object with a
    # mandatory "resources" array of strings.
    AssignResources_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_AssignResources.json",  # noqa: E501
        "title": "ska-tango-base ReferenceSkaSubarray AssignResources schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray AssignResources command",  # noqa: E501
        "type": "object",
        "properties": {
            "resources": {
                "description": "Resources to assign",
                "type": "array",
                "items": {"type": "string"},
            },
        },
        "required": ["resources"],
    }

    @validate_json_args(schema=AssignResources_SCHEMA)
    @submit_lrc_task
    def execute_AssignResources(self, resources: list[str]) -> TaskFunctionType:
        """
        Assign resources to the component.

        :param resources: the resources to assign; the list is converted to
            a set before being handed to the component, so duplicates are
            collapsed.

        :returns: a task function that calls the component's ``assign``.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.assign(
                set(resources),
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    # JSON schema for the ReleaseResources argument: an object with a
    # mandatory "resources" array of strings.
    ReleaseResources_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_ReleaseResources.json",  # noqa: E501
        "title": "ska-tango-base ReferenceSkaSubarray ReleaseResources schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray ReleaseResources command",  # noqa: E501
        "type": "object",
        "properties": {
            "resources": {
                "description": "Resources to release",
                "type": "array",
                "items": {"type": "string"},
            }
        },
        "required": ["resources"],
    }

    @validate_json_args(schema=ReleaseResources_SCHEMA)
    @submit_lrc_task
    def execute_ReleaseResources(self, resources: list[str]) -> TaskFunctionType:
        """
        Release resources from the component.

        :param resources: the resources to release; the list is converted to
            a set before being handed to the component, so duplicates are
            collapsed.

        :returns: a task function that calls the component's ``release``.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.release(
                set(resources),
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @submit_lrc_task
    def execute_ReleaseAllResources(self) -> TaskFunctionType:
        """
        Release all resources from the component.

        :returns: a task function that calls the component's ``release_all``.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.release_all(
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    # JSON schema for the Configure argument: an object with optional
    # non-negative integer "blocks" and "channels" properties.
    # NOTE(review): the schema has no "required" list, yet execute_Configure
    # declares ``blocks`` without a default -- confirm how a payload that
    # omits "blocks" is meant to be handled.
    Configure_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Configure.json",  # noqa: E501
        "title": "ska-tango-base ReferenceSkaSubarray Configure schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray Configure command",  # noqa: E501
        "type": "object",
        "properties": {
            "blocks": {
                "description": "Number of blocks in this scan",
                "type": "integer",
                "minimum": 0,
            },
            "channels": {
                "description": "Number of channels in this scan",
                "type": "integer",
                "minimum": 0,
            },
        },
    }

    @validate_json_args(schema=Configure_SCHEMA)
    @submit_lrc_task
    def execute_Configure(
        self, blocks: int, channels: int | None = None
    ) -> TaskFunctionType:
        """
        Configure the component.

        :param blocks: number of blocks in this scan
        :param channels: number of channels in this scan; passed through
            as ``None`` when not supplied

        :returns: a task function that calls the component's ``configure``.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.configure(
                blocks,
                channels,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @submit_lrc_task
    def execute_End(self) -> TaskFunctionType:
        """
        Deconfigure component.

        :returns: the component's ``deconfigure`` method, used directly as
            the task function.
        """
        return self.component_manager._component.deconfigure

    # JSON schema for the Scan argument: an object with a mandatory string
    # "scan_id".
    Scan_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Scan.json",
        "title": "ska-tango-base ReferenceSkaSubarray Scan schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray Scan command",  # noqa: E501
        "type": "object",
        "properties": {
            "scan_id": {
                "description": "Scan ID",
                "type": "string",
            },
        },
        "required": ["scan_id"],
    }

    @validate_json_args(schema=Scan_SCHEMA)
    @submit_lrc_task
    def execute_Scan(self, scan_id: str) -> TaskFunctionType:
        """
        Start scanning.

        :param scan_id: the scan ID handed to the component

        :returns: a task function that calls the component's ``scan``.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.scan(
                scan_id,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @submit_lrc_task
    def execute_EndScan(self) -> TaskFunctionType:
        """
        End the scan.

        :returns: the component's ``end_scan`` method, used directly as the
            task function.
        """
        return self.component_manager._component.end_scan

    @submit_lrc_task
    def execute_Restart(self) -> TaskFunctionType:
        """
        Restart the component.

        :returns: a task function that calls the component's ``restart``.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.restart(
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    # The explicit ``self`` annotations below name this class; annotating
    # them with the sibling ReferenceDeprecatedSkaSubarray is not a valid
    # self-type for methods defined here.
    @command()
    def SimulateFault(self: ReferenceSkaSubarray) -> None:
        """Simulate a fault state."""
        self.component_manager._component.set_fault()

    @command()
    def SimulateObsFault(self: ReferenceSkaSubarray) -> None:
        """
        Simulate an observation fault state.

        The component manager's tasks are aborted before the fault is
        simulated on the component.
        """
        self.component_manager.abort_tasks()
        self.component_manager._component.simulate_obsfault()
class ReferenceDeprecatedSkaSubarray(SKASubarray[ReferenceSubarrayComponentManager]):
    """
    Reference SKA Subarray device built on the deprecated interface.

    The nested command classes subclass the corresponding ``SKASubarray``
    command classes and pass a JSON ``SCHEMA`` to the parent constructor.
    """

    __version__ = "1.0.0"

    def create_component_manager(
        self: ReferenceDeprecatedSkaSubarray,
    ) -> ReferenceSubarrayComponentManager:
        """
        Build the component manager used by this device.

        :returns: a reference subarray component manager.
        """
        manager = ReferenceSubarrayComponentManager(
            self.CapabilityTypes,
            self.logger,
            self._communication_state_changed,
            self._component_state_changed,
        )
        return manager

    class AssignResourcesCommand(SKASubarray.AssignResourcesCommand):
        """AssignResources() command that supplies the reference schema."""

        # Argument schema: object with a mandatory "resources" string array.
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_AssignResources.json",  # noqa: E501
            "title": "ska-tango-base ReferenceSkaSubarray AssignResources schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray AssignResources command",  # noqa: E501
            "type": "object",
            "properties": {
                "resources": {
                    "description": "Resources to assign",
                    "type": "array",
                    "items": {"type": "string"},
                },
            },
            "required": ["resources"],
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.AssignResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Set up the command, handing ``SCHEMA`` to the parent class.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: optional callback to be called when this
                command starts and finishes
            :param logger: optional logger for this command to log with
            """
            super().__init__(
                command_tracker,
                component_manager,
                schema=self.SCHEMA,
                logger=logger,
                callback=callback,
            )

    class ReleaseResourcesCommand(SKASubarray.ReleaseResourcesCommand):
        """ReleaseResources() command that supplies the reference schema."""

        # Argument schema: object with a mandatory "resources" string array.
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_ReleaseResources.json",  # noqa: E501
            "title": "ska-tango-base ReferenceSkaSubarray ReleaseResources schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray ReleaseResources command",  # noqa: E501
            "type": "object",
            "properties": {
                "resources": {
                    "description": "Resources to release",
                    "type": "array",
                    "items": {"type": "string"},
                }
            },
            "required": ["resources"],
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.ReleaseResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Set up the command, handing ``SCHEMA`` to the parent class.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: optional callback to be called when this
                command starts and finishes
            :param logger: optional logger for this command to log with
            """
            super().__init__(
                command_tracker,
                component_manager,
                schema=self.SCHEMA,
                logger=logger,
                callback=callback,
            )

    class ConfigureCommand(SKASubarray.ConfigureCommand):
        """Configure() command that supplies the reference schema."""

        # Argument schema: object with optional non-negative integer
        # "blocks" and "channels" properties.
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Configure.json",  # noqa: E501
            "title": "ska-tango-base ReferenceSkaSubarray Configure schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray Configure command",  # noqa: E501
            "type": "object",
            "properties": {
                "blocks": {
                    "description": "Number of blocks in this scan",
                    "type": "integer",
                    "minimum": 0,
                },
                "channels": {
                    "description": "Number of channels in this scan",
                    "type": "integer",
                    "minimum": 0,
                },
            },
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.ConfigureCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Set up the command, handing ``SCHEMA`` to the parent class.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: optional callback to be called when this
                command starts and finishes
            :param logger: optional logger for this command to log with
            """
            super().__init__(
                command_tracker,
                component_manager,
                schema=self.SCHEMA,
                logger=logger,
                callback=callback,
            )

    class ScanCommand(SKASubarray.ScanCommand):
        """Scan() command that supplies the reference schema."""

        # Argument schema: object with a mandatory string "scan_id".
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Scan.json",
            "title": "ska-tango-base ReferenceSkaSubarray Scan schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray Scan command",  # noqa: E501
            "type": "object",
            "properties": {
                "scan_id": {
                    "description": "Scan ID",
                    "type": "string",
                },
            },
            "required": ["scan_id"],
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.ScanCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Set up the command, handing ``SCHEMA`` to the parent class.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: optional callback to be called when this
                command starts and finishes
            :param logger: optional logger for this command to log with
            """
            super().__init__(
                command_tracker,
                component_manager,
                schema=self.SCHEMA,
                logger=logger,
                callback=callback,
            )

    @command()
    def SimulateFault(self: ReferenceDeprecatedSkaSubarray) -> None:
        """
        Simulate a fault state.

        Delegates to ``set_fault`` on the component manager's component.
        """
        self.component_manager._component.set_fault()

    @command()
    def SimulateObsFault(self: ReferenceDeprecatedSkaSubarray) -> None:
        """
        Simulate an observation fault state.

        The component manager's tasks are aborted first, then
        ``simulate_obsfault`` is called on the component.
        """
        self.component_manager.abort_tasks()
        self.component_manager._component.simulate_obsfault()
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:
    """
    Run the ``ReferenceDeprecatedSkaSubarray`` device server.

    :param args: positional arguments forwarded to ``run_server``; when
        none are given, ``None`` is passed instead of an empty tuple
    :param kwargs: named arguments forwarded to ``run_server``

    :return: exit code
    """
    server_args = args or None
    exit_code = ReferenceDeprecatedSkaSubarray.run_server(args=server_args, **kwargs)
    return cast(int, exit_code)
# Start the device server only when this module is executed as a script;
# the value returned by main() is not used here.
if __name__ == "__main__":
    main()