# Source code for ska_tango_base.testing.reference.reference_subarray_device

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
A reference implementation of an SKA subarray device.

It inherits from SKASubarray but provides schemas for some commands.
"""

from __future__ import annotations

import logging
from collections.abc import Callable
from threading import Event
from typing import Final, cast

from tango.server import command

from ...long_running_commands import submit_lrc_task
from ...subarray.subarray_device import SKASubarray
from ...type_hints import (
    CommandTrackerProtocol,
    JSONData,
    TaskCallbackType,
    TaskFunctionType,
)
from ...validators import validate_json_args
from .reference_subarray_component_manager import ReferenceSubarrayComponentManager

# Names exported by a star-import of this module. "SKASubarray" is kept for
# backward compatibility; the reference device classes defined in this module
# are listed too, so that they are part of the declared public API.
__all__ = [
    "ReferenceDeprecatedSkaSubarray",
    "ReferenceSkaSubarray",
    "SKASubarray",
    "main",
]


class ReferenceSkaSubarray(SKASubarray[ReferenceSubarrayComponentManager]):
    """
    Implements a reference SKA Subarray device using the new interface.

    Each long running command is provided as an ``execute_<Command>`` method
    decorated with ``submit_lrc_task``. Every such method returns a task
    function that forwards to the component held by the component manager.
    Commands that take arguments are additionally decorated with
    ``validate_json_args`` and a JSON schema defined next to them.
    """

    __version__ = "2.0.0"

    def create_component_manager(
        self: ReferenceSkaSubarray,
    ) -> ReferenceSubarrayComponentManager:
        """
        Create and return a component manager for this device.

        The manager is built from this device's capability types, its logger
        and its communication-state / component-state change callbacks.

        :returns: a reference subarray component manager.
        """
        return ReferenceSubarrayComponentManager(
            self.CapabilityTypes,
            self.logger,
            self._communication_state_changed,
            self._component_state_changed,
        )

    # JSON schema for the AssignResources argument: an object with a
    # mandatory "resources" array of strings.
    AssignResources_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_AssignResources.json",  # noqa: E501
        "title": "ska-tango-base ReferenceSkaSubarray AssignResources schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray AssignResources command",  # noqa: E501
        "type": "object",
        "properties": {
            "resources": {
                "description": "Resources to assign",
                "type": "array",
                "items": {"type": "string"},
            },
        },
        "required": ["resources"],
    }

    @validate_json_args(schema=AssignResources_SCHEMA)
    @submit_lrc_task
    def execute_AssignResources(self, resources: list[str]) -> TaskFunctionType:
        """
        Assign resources to the component.

        :param resources: names of the resources to assign; duplicates are
            collapsed because the list is converted to a set.

        :returns: a task function that calls the component's ``assign``
            method with the task callback and abort event it is given.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.assign(
                set(resources),
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    # JSON schema for the ReleaseResources argument: an object with a
    # mandatory "resources" array of strings.
    ReleaseResources_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_ReleaseResources.json",  # noqa: E501
        "title": "ska-tango-base ReferenceSkaSubarray ReleaseResources schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray ReleaseResources command",  # noqa: E501
        "type": "object",
        "properties": {
            "resources": {
                "description": "Resources to release",
                "type": "array",
                "items": {"type": "string"},
            }
        },
        "required": ["resources"],
    }

    @validate_json_args(schema=ReleaseResources_SCHEMA)
    @submit_lrc_task
    def execute_ReleaseResources(self, resources: list[str]) -> TaskFunctionType:
        """
        Release resources from the component.

        :param resources: names of the resources to release; duplicates are
            collapsed because the list is converted to a set.

        :returns: a task function that calls the component's ``release``
            method with the task callback and abort event it is given.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.release(
                set(resources),
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @submit_lrc_task
    def execute_ReleaseAllResources(self) -> TaskFunctionType:
        """
        Release all resources from the component.

        :returns: a task function that calls the component's ``release_all``
            method with the task callback and abort event it is given.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.release_all(
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    # JSON schema for the Configure argument: an object with optional
    # non-negative integer "blocks" and "channels" properties.
    # NOTE(review): no property is listed as required here, while
    # ``execute_Configure`` declares ``blocks`` without a default -- confirm
    # whether "blocks" should be marked as required.
    Configure_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Configure.json",  # noqa: E501
        "title": "ska-tango-base ReferenceSkaSubarray Configure schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray Configure command",  # noqa: E501
        "type": "object",
        "properties": {
            "blocks": {
                "description": "Number of blocks in this scan",
                "type": "integer",
                "minimum": 0,
            },
            "channels": {
                "description": "Number of channels in this scan",
                "type": "integer",
                "minimum": 0,
            },
        },
    }

    @validate_json_args(schema=Configure_SCHEMA)
    @submit_lrc_task
    def execute_Configure(
        self, blocks: int, channels: int | None = None
    ) -> TaskFunctionType:
        """
        Configure the component.

        :param blocks: number of blocks in this scan.
        :param channels: number of channels in this scan, or ``None`` when
            not supplied; the value is passed to the component as given.

        :returns: a task function that calls the component's ``configure``
            method with ``blocks``, ``channels`` and the task callback and
            abort event it is given.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.configure(
                blocks,
                channels,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @submit_lrc_task
    def execute_End(self) -> TaskFunctionType:
        """
        Deconfigure component.

        :returns: the component's ``deconfigure`` method, used directly as
            the task function.
        """
        return self.component_manager._component.deconfigure

    # JSON schema for the Scan argument: an object with a mandatory string
    # "scan_id".
    Scan_SCHEMA: dict[str, JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Scan.json",
        "title": "ska-tango-base ReferenceSkaSubarray Scan schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray Scan command",  # noqa: E501
        "type": "object",
        "properties": {
            "scan_id": {
                "description": "Scan ID",
                "type": "string",
            },
        },
        "required": ["scan_id"],
    }

    @validate_json_args(schema=Scan_SCHEMA)
    @submit_lrc_task
    def execute_Scan(self, scan_id: str) -> TaskFunctionType:
        """
        Start scanning.

        :param scan_id: identifier of the scan to start.

        :returns: a task function that calls the component's ``scan`` method
            with ``scan_id`` and the task callback and abort event it is
            given.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.scan(
                scan_id,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @submit_lrc_task
    def execute_EndScan(self) -> TaskFunctionType:
        """
        End the scan.

        :returns: the component's ``end_scan`` method, used directly as the
            task function.
        """
        return self.component_manager._component.end_scan

    @submit_lrc_task
    def execute_Restart(self) -> TaskFunctionType:
        """
        Restart the component.

        :returns: a task function that calls the component's ``restart``
            method with the task callback and abort event it is given.
        """

        def task(task_callback: TaskCallbackType, task_abort_event: Event) -> None:
            self.component_manager._component.restart(
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @command()
    def SimulateFault(self: ReferenceSkaSubarray) -> None:
        """Simulate a fault state."""
        self.component_manager._component.set_fault()

    @command()
    def SimulateObsFault(self: ReferenceSkaSubarray) -> None:
        """Simulate an observation fault state."""
        # Abort the component manager's tasks before putting the component
        # into the simulated observation fault.
        self.component_manager.abort_tasks()
        self.component_manager._component.simulate_obsfault()


class ReferenceDeprecatedSkaSubarray(SKASubarray[ReferenceSubarrayComponentManager]):
    """
    Implements a reference SKA Subarray device using the deprecated interface.

    The nested command classes subclass the corresponding ``SKASubarray``
    command classes and only add a JSON schema, which they hand to the base
    class through the ``schema`` keyword argument.
    """

    __version__ = "1.0.0"

    def create_component_manager(
        self: ReferenceDeprecatedSkaSubarray,
    ) -> ReferenceSubarrayComponentManager:
        """
        Create and return a component manager for this device.

        The manager is built from this device's capability types, its logger
        and its communication-state / component-state change callbacks.

        :returns: a reference subarray component manager.
        """
        return ReferenceSubarrayComponentManager(
            self.CapabilityTypes,
            self.logger,
            self._communication_state_changed,
            self._component_state_changed,
        )

    class AssignResourcesCommand(SKASubarray.AssignResourcesCommand):
        """
        A class for SKASubarray's AssignResources() command.

        Passes ``SCHEMA`` to the base command class as its ``schema``.
        """

        # JSON schema for the command argument: an object with a mandatory
        # "resources" array of strings.
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_AssignResources.json",  # noqa: E501
            "title": "ska-tango-base ReferenceSkaSubarray AssignResources schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray AssignResources command",  # noqa: E501
            "type": "object",
            "properties": {
                "resources": {
                    "description": "Resources to assign",
                    "type": "array",
                    "items": {"type": "string"},
                },
            },
            "required": ["resources"],
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.AssignResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            All arguments are forwarded to the base class, together with
            this class's ``SCHEMA``.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                command_tracker,
                component_manager,
                callback=callback,
                logger=logger,
                schema=self.SCHEMA,
            )

    class ReleaseResourcesCommand(SKASubarray.ReleaseResourcesCommand):
        """
        A class for SKASubarray's ReleaseResources() command.

        Passes ``SCHEMA`` to the base command class as its ``schema``.
        """

        # JSON schema for the command argument: an object with a mandatory
        # "resources" array of strings.
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_ReleaseResources.json",  # noqa: E501
            "title": "ska-tango-base ReferenceSkaSubarray ReleaseResources schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray ReleaseResources command",  # noqa: E501
            "type": "object",
            "properties": {
                "resources": {
                    "description": "Resources to release",
                    "type": "array",
                    "items": {"type": "string"},
                }
            },
            "required": ["resources"],
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.ReleaseResourcesCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            All arguments are forwarded to the base class, together with
            this class's ``SCHEMA``.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                command_tracker,
                component_manager,
                callback=callback,
                logger=logger,
                schema=self.SCHEMA,
            )

    class ConfigureCommand(SKASubarray.ConfigureCommand):
        """
        A class for SKASubarray's Configure() command.

        Passes ``SCHEMA`` to the base command class as its ``schema``.
        """

        # JSON schema for the command argument: an object with optional
        # non-negative integer "blocks" and "channels" properties (no
        # property is listed as required).
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Configure.json",  # noqa: E501
            "title": "ska-tango-base ReferenceSkaSubarray Configure schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray Configure command",  # noqa: E501
            "type": "object",
            "properties": {
                "blocks": {
                    "description": "Number of blocks in this scan",
                    "type": "integer",
                    "minimum": 0,
                },
                "channels": {
                    "description": "Number of channels in this scan",
                    "type": "integer",
                    "minimum": 0,
                },
            },
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.ConfigureCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            All arguments are forwarded to the base class, together with
            this class's ``SCHEMA``.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                command_tracker,
                component_manager,
                callback=callback,
                logger=logger,
                schema=self.SCHEMA,
            )

    class ScanCommand(SKASubarray.ScanCommand):
        """
        A class for SKASubarray's Scan() command.

        Passes ``SCHEMA`` to the base command class as its ``schema``.
        """

        # JSON schema for the command argument: an object with a mandatory
        # string "scan_id".
        SCHEMA: Final = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_Scan.json",
            "title": "ska-tango-base ReferenceSkaSubarray Scan schema",
            "description": "Schema for ska-tango-base ReferenceSkaSubarray Scan command",  # noqa: E501
            "type": "object",
            "properties": {
                "scan_id": {
                    "description": "Scan ID",
                    "type": "string",
                },
            },
            "required": ["scan_id"],
        }

        def __init__(
            self: ReferenceDeprecatedSkaSubarray.ScanCommand,
            command_tracker: CommandTrackerProtocol,
            component_manager: ReferenceSubarrayComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            All arguments are forwarded to the base class, together with
            this class's ``SCHEMA``.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            super().__init__(
                command_tracker,
                component_manager,
                callback=callback,
                logger=logger,
                schema=self.SCHEMA,
            )

    @command()
    def SimulateFault(self: ReferenceDeprecatedSkaSubarray) -> None:
        """Simulate a fault state."""
        self.component_manager._component.set_fault()

    @command()
    def SimulateObsFault(self: ReferenceDeprecatedSkaSubarray) -> None:
        """Simulate an observation fault state."""
        # Abort the component manager's tasks before putting the component
        # into the simulated observation fault.
        self.component_manager.abort_tasks()
        self.component_manager._component.simulate_obsfault()


# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module.

    Calls ``ReferenceDeprecatedSkaSubarray.run_server`` and returns its
    result as the exit code.

    :param args: positional arguments, passed to ``run_server`` as ``args``
        (``None`` is passed instead when no positional arguments are given)
    :param kwargs: named arguments, passed through to ``run_server``

    :return: exit code
    """
    return cast(
        int, ReferenceDeprecatedSkaSubarray.run_server(args=args or None, **kwargs)
    )


if __name__ == "__main__":
    main()