# Source code for ska_low_mccs.subarray.subarray_component_manager

#  -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
# pylint: disable = too-many-lines
"""This module implements component management for subarrays."""
from __future__ import annotations

import functools
import json
import logging
import threading
import time
from typing import Any, Callable, Optional, Sequence

import ska_tango_base.subarray
from ska_control_model import (
    CommunicationStatus,
    ObsState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_low_mccs_common.component import ObsDeviceComponentManager
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
    CompositeCommandResultEvaluator,
    MccsCompositeCommandProxy,
    pretty_format,
)
from ska_ser_skuid.client import SkuidClient  # type: ignore
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager

__all__ = ["SubarrayComponentManager"]


class _StationProxy(ObsDeviceComponentManager):
    """
    A subarray's proxy to one of its stations.

    Each method forwards a single command to the station device through
    ``self._proxy`` and hands the command's response back to the caller.
    """

    # TODO: remove comments when commands implemented in station
    @check_communicating
    @check_on
    def deallocate_subarray(
        self: _StationProxy, subarray_id: int
    ) -> tuple[TaskStatus, str]:
        """
        Ask the station to drop what it holds for a subarray.

        :param subarray_id: the subarray whose configuration must be removed
        :return: the two elements of the ``DeallocateSubarray`` response,
            passed through as received.
        """
        assert self._proxy is not None

        response_code, command_id = self._proxy.DeallocateSubarray(subarray_id)
        return response_code, command_id

    @check_communicating
    @check_on
    def scan(
        self: _StationProxy,
        subarray_id: int,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start a scan on the station.

        The arguments are sent to the station's ``Scan`` command as a JSON
        object; ``start_time`` is only included when it is not None.

        :param subarray_id: the subarray ID for which the scan is started
        :param scan_id: the id of the scan
        :param start_time: UTC time for begin of scan, None for immediate start
        :param duration: Scan duration in seconds. 0.0 or omitted means forever
        :return: the single result code unpacked from the first element of
            the response, and the response's second element.
        """
        assert self._proxy is not None
        self.logger.debug(f"Scan command issued to station for scan {scan_id}")
        request: dict[str, int | float | str] = {
            "subarray_id": subarray_id,
            "scan_id": scan_id,
            "duration": duration,
        }
        if start_time is not None:
            request["start_time"] = start_time
        request_json = json.dumps(request)

        # The first element must be a one-item sequence, or unpacking raises.
        ([response_code], command_id) = self._proxy.Scan(request_json)
        return response_code, command_id

    @check_communicating
    @check_on
    def end_scan(self: _StationProxy, subarray_id: int) -> tuple[TaskStatus, str]:
        """
        Stop the scan running on the station for a subarray.

        :param subarray_id: the subarray ID for which the scan is stopped
        :return: the single result code unpacked from the first element of
            the response, and the response's second element.
        """
        assert self._proxy is not None

        # The first element must be a one-item sequence, or unpacking raises.
        ([response_code], command_id) = self._proxy.EndScan(subarray_id)
        return response_code, command_id


class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """
    A subarray's proxy to one of its subarray beams.

    Each method forwards a single command to the subarray beam device
    through ``self._proxy`` and hands the command's response back.
    """

    @check_communicating
    @check_on
    def configure(
        self: _SubarrayBeamProxy, configuration: dict
    ) -> tuple[TaskStatus, str]:
        """
        Configure the subarray beam.

        :param configuration: the configuration to be applied to this
            subarray beam; it is sent JSON-encoded
        :return: the two elements of the ``Configure`` response, passed
            through as received.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.Configure(json.dumps(configuration))
        return response_code, command_id

    @check_communicating
    @check_on
    def scan(
        self: _SubarrayBeamProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start the subarray beam scanning.

        The arguments are sent to the ``Scan`` command as a JSON object;
        ``start_time`` is only included when it is not None.

        :param scan_id: the id of the scan
        :param start_time: UTC time for begin of scan, None for immediate start
        :param duration: Scan duration in seconds. 0.0 or omitted means forever
        :return: the single result code unpacked from the first element of
            the response, and the response's second element.
        """
        assert self._proxy is not None
        request: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration,
        }
        if start_time is not None:
            request["start_time"] = start_time
        request_json = json.dumps(request)

        # The first element must be a one-item sequence, or unpacking raises.
        ([response_code], command_id) = self._proxy.Scan(request_json)
        return response_code, command_id

    @check_communicating
    @check_on
    def release_all(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release subarray beam resources.

        :return: the two elements of the ``ReleaseAllResources`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.ReleaseAllResources()
        return response_code, command_id

    @check_communicating
    @check_on
    def deconfigure(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        De-configure the subarray beam by sending it the ``End`` command.

        :return: the two elements of the ``End`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.End()
        return response_code, command_id

    @check_communicating
    @check_on
    def end_scan(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        End the current scan.

        :return: the two elements of the ``EndScan`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.EndScan()
        return response_code, command_id

    @check_communicating
    @check_on
    def abort(
        self: _SubarrayBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Abort the current scan associated with the subarray beam.

        :param task_callback: accepted for signature compatibility; this
            method does not call it. Defaults to None

        :return: the two elements of the ``Abort`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.Abort()
        return response_code, command_id

    @check_communicating
    @check_on
    def obsreset(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Send ``ObsReset`` to the subarray beam.

        :return: the two elements of the ``ObsReset`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.ObsReset()
        return response_code, command_id

    @check_communicating
    @check_on
    def restart(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Send ``Restart`` to the subarray beam.

        :return: the two elements of the ``Restart`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.Restart()
        return response_code, command_id


class _StationBeamProxy(ObsDeviceComponentManager):
    """
    A subarray's proxy to one of its station beams.

    Each method forwards a single command to the station beam device
    through ``self._proxy`` and hands the command's response back.
    """

    @check_communicating
    @check_on
    def configure(
        self: _StationBeamProxy, configuration: dict
    ) -> tuple[TaskStatus, str]:
        """
        Configure the station beam.

        :param configuration: the configuration to be applied to this
            station beam; it is sent JSON-encoded
        :return: the two elements of the ``Configure`` response, passed
            through as received.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.Configure(json.dumps(configuration))
        return response_code, command_id

    @check_communicating
    @check_on
    def scan(
        self: _StationBeamProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start the station beam scanning.

        The arguments are sent to the ``Scan`` command as a JSON object;
        ``start_time`` is only included when it is not None.

        :param scan_id: the id of the scan
        :param start_time: the start time of the scan, None to omit it
        :param duration: Scan duration in seconds. 0.0 or omitted means forever
        :return: the single result code unpacked from the first element of
            the response, and the response's second element.
        """
        assert self._proxy is not None
        request: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration,
        }
        if start_time is not None:
            request["start_time"] = start_time
        request_json = json.dumps(request)

        # The first element must be a one-item sequence, or unpacking raises.
        ([response_code], command_id) = self._proxy.Scan(request_json)
        return response_code, command_id

    @check_communicating
    @check_on
    def deconfigure(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        De-configure the station beam by sending it the ``End`` command.

        :return: the two elements of the ``End`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.End()
        return response_code, command_id

    @check_communicating
    @check_on
    def release_all(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release station beam resources.

        :return: the two elements of the ``ReleaseAllResources`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.ReleaseAllResources()
        return response_code, command_id

    @check_communicating
    @check_on
    def end_scan(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        End the current scan on the station beam.

        :return: the two elements of the ``EndScan`` response.
        """
        assert self._proxy is not None
        response_code, command_id = self._proxy.EndScan()
        return response_code, command_id


# pylint: disable=too-many-instance-attributes, too-many-public-methods
class SubarrayComponentManager(
    TaskExecutorComponentManager,
    ska_tango_base.subarray.SubarrayComponentManager,
):
    """A component manager for a subarray."""

    # Message fragment appended to the final report of a long running
    # command, keyed by the task status it ended with.
    _task_message = {
        TaskStatus.COMPLETED: "completed OK.",
        TaskStatus.ABORTED: "been aborted.",
        TaskStatus.FAILED: "timed out.",
    }

    # Result code reported for each final task status.
    _task_to_result = {
        TaskStatus.COMPLETED: ResultCode.OK,
        TaskStatus.ABORTED: ResultCode.ABORTED,
        TaskStatus.FAILED: ResultCode.FAILED,
    }

    # pylint: disable=too-many-arguments
[docs] def __init__( self: SubarrayComponentManager, subarray_id: int, skuid_url: str, logger: logging.Logger, obs_command_timeout: int, communication_state_callback: Callable[[CommunicationStatus], None], component_state_callback: Callable[..., None], ) -> None: """ Initialise a new instance. :param subarray_id: the subarray ID for this device :param skuid_url: The address at which a SKUID service is running. :param obs_command_timeout: the default timeout for obs commands in seconds. :param logger: the logger to be used by this object. :param communication_state_callback: callback to be called when the status of the communications channel between the component manager and its component changes :param component_state_callback: callback to be called when the component state changes. """ self._power_state_lock = threading.RLock() self._component_state_callback: Callable[..., None] self._component_state_callback = component_state_callback self._abort_complete = threading.Event() self._obs_command_timeout = obs_command_timeout self._device_communication_states: dict[str, CommunicationStatus] = {} self._station_power_modes: dict[str, Optional[PowerState]] = {} self._device_obs_states: dict[str, Optional[ObsState]] = {} self._is_assigning = False self._is_configured = False self._configuring_resources: set[str] = set() # these resources should go self._desired_state = ObsState.READY # to this end state self._stations: dict[str, _StationProxy] = {} self._subarray_beams: dict[str, _SubarrayBeamProxy] = {} self._station_beams: dict[str, _StationBeamProxy] = {} self._apertures: dict[str, str] = {} self._subarray_beam_trl: dict[int, str] = {} self._number_of_channels = 0 self._subarray_id = subarray_id self._skuid_url = skuid_url self._scan_id: Optional[int] = None self._scan_start_time: str | None = None self._scan_duration = 0.0 super().__init__( logger, communication_state_callback, component_state_callback, resources_changed=None, configured_changed=None, 
scanning_changed=None, task_completed=None, obsfault=None, obsstate_changed=None, station_power=None, power=None, fault=None, )
[docs] def start_communicating(self: SubarrayComponentManager) -> None: """Establish communication with the station components.""" if self.communication_state == CommunicationStatus.ESTABLISHED: return if self.communication_state == CommunicationStatus.DISABLED: self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED) if self._stations or self._subarray_beams or self._station_beams: for station_proxy in self._stations.values(): station_proxy.start_communicating() for subarray_beam_proxy in self._subarray_beams.values(): subarray_beam_proxy.start_communicating() for station_beam_proxy in self._station_beams.values(): station_beam_proxy.start_communicating() else: self._update_communication_state(CommunicationStatus.ESTABLISHED) with self._power_state_lock: self._update_component_state(power=PowerState.ON)
[docs] def stop_communicating(self: SubarrayComponentManager) -> None: """Break off communication with the station components.""" if self.communication_state == CommunicationStatus.DISABLED: return for station in self._stations.values(): station.stop_communicating() for subarray_beam in self._subarray_beams.values(): subarray_beam.stop_communicating() for station_beam in self._station_beams.values(): station_beam.stop_communicating() self._update_communication_state(CommunicationStatus.DISABLED) self._update_component_state(power=None, fault=None)
@property def max_executing_tasks(self) -> int: """ Get the max number of tasks that can be executing at once. :return: max number of simultaneously executing tasks. """ return 2 @property def scan_id(self: SubarrayComponentManager) -> Optional[int]: """ Return the scan id, or None if a scan is not current. :return: the scan id, or None if a scan is not current. """ return self._scan_id @property def subarray_beam_trls(self: SubarrayComponentManager) -> set[str]: """ Return the set of TRLs of subarray beams assigned to this subarray. :return: the set of TRLs of subarray beams assigned to this subarray. """ return set(self._subarray_beams.keys()) @property def station_beam_trls(self: SubarrayComponentManager) -> set[str]: """ Return the set of TRLs of station beams assigned to this subarray. :return: the set of TRLs of station beams assigned to this subarray. """ return set(self._station_beams.keys()) @property def station_trls(self: SubarrayComponentManager) -> set[str]: """ Return the set of TRLs of stations assigned to this subarray. :return: the set of TRLs of stations assigned to this subarray. """ return set(self._stations.keys()) @property def power_state(self: SubarrayComponentManager) -> Optional[PowerState]: """ Return my power state. :return: my power state """ return self._component_state["power"] def _check_aborted( self: SubarrayComponentManager, task_abort_event: Optional[threading.Event], task_callback: Optional[Callable], method_name: str, ) -> bool: if task_abort_event is None or task_callback is None: self.logger.warning(f"Cannot check {method_name} for abort status") return False if task_abort_event.is_set(): self.logger.info(f"{method_name} has been aborted") task_callback( status=TaskStatus.ABORTED, result=(ResultCode.ABORTED, f"{method_name} aborted"), ) return True return False
[docs] @check_communicating def assign( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, **kwargs: Any, ) -> tuple[TaskStatus, str]: """ Submit the `AssignResources` slow command. This method returns immediately after it is submitted for execution. :param task_callback: Update task state, defaults to None :param kwargs: keyword arguments to the command. Required keys are "subarray_id" (the ID of the subarray), and subarray_beams (resource specification for each beam). :return: a result code and response message. """ self._is_assigning = True return self.submit_task( self._assign, args=[kwargs["subarray_id"], kwargs["subarray_beams"]], task_callback=task_callback, )
# pylint: disable=too-many-branches, too-many-locals, too-many-statements @check_communicating def _assign( self: SubarrayComponentManager, # resources: dict[str, Any], subarray_id: int, subarray_beams: dict, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Assign resources to this subarray. This is just for communication and health roll-up, resource management is done by controller. :param subarray_id: ID of the subarray :param subarray_beams: resource specification for each beam :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "_assign"): return station_trls: list[str] = [] subarray_beam_trls: list[str] = [] station_beam_trls: list[str] = [] number_of_channels = 0 self._apertures.clear() self._subarray_beam_trl.clear() self._number_of_channels = 0 for beam in subarray_beams: subarray_beam_id = beam.get("subarray_beam_id", 1) subarray_beam_trl = beam.get( "subarray_beam_trl", f"low-mccs/subarraybeam/{subarray_beam_id:02d}", ) self._subarray_beam_trl[subarray_beam_id] = subarray_beam_trl subarray_beam_trls.append(subarray_beam_trl) number_of_channels += beam.get("number_of_channels") for aperture in beam.get("apertures", []): aperture_id = aperture.get("aperture_id", None) station_id = aperture.get("station_id") if aperture_id is None: aperture_id = f"AP{station_id}.1" station_trl = aperture.get("station_trl", None) if station_trl is not None and station_trl not in station_trls: station_trls.append(station_trl) station_beam_trl = aperture.get("station_beam_trl", None) station_beam_trls.append(station_beam_trl) self._apertures[aperture_id] = station_beam_trl if self._check_aborted(task_abort_event, task_callback, "_assign"): return self._number_of_channels = number_of_channels station_trls_to_add = 
sorted(station_trls) - self._stations.keys() subarray_beam_trls_to_add = subarray_beam_trls - self._subarray_beams.keys() station_beam_trls_to_add = station_beam_trls - self._station_beams.keys() trls_to_add = station_trls_to_add.union( subarray_beam_trls_to_add, station_beam_trls_to_add ) self._desired_state = ObsState.IDLE self._configuring_resources.clear() if trls_to_add: self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED) for trl in trls_to_add: self._device_communication_states[trl] = CommunicationStatus.DISABLED self._device_obs_states[trl] = ObsState.EMPTY if self._check_aborted(task_abort_event, task_callback, "_assign"): return self._evaluate_communication_state() for trl in station_trls_to_add: self._stations[trl] = _StationProxy( trl, self.logger, functools.partial(self._device_communication_state_changed, trl), functools.partial(self._component_state_callback, trl=trl), ) for trl in subarray_beam_trls_to_add: self._subarray_beams[trl] = _SubarrayBeamProxy( trl, self.logger, functools.partial(self._device_communication_state_changed, trl), functools.partial( self._component_state_callback, trl=trl, ), ) for trl in station_beam_trls_to_add: self._station_beams[trl] = _StationBeamProxy( trl, self.logger, functools.partial(self._device_communication_state_changed, trl), functools.partial( self._component_state_callback, trl=trl, ), ) self._is_assigning = True self._component_state_callback( resources_changed=[ set(self._stations.keys()), set(self._subarray_beams.keys()), set(self._station_beams.keys()), ] ) for trl in station_trls_to_add: self._stations[trl].start_communicating() for trl in subarray_beam_trls_to_add: self._configuring_resources.add(trl) self._subarray_beams[trl].start_communicating() for trl in station_beam_trls_to_add: self._configuring_resources.add(trl) self._station_beams[trl].start_communicating() if self._check_aborted(task_abort_event, task_callback, "_assign"): return # wait for all subdevices to reach 
ObsState.IDLE task_result = self._wait_for_obs_state( self._obs_command_timeout, task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "_assign"): return self._is_assigning = False self._is_configured = False if task_callback is not None: task_callback( status=task_result, result=( self._task_to_result[task_result], "Assign resources " + self._task_message[task_result], ), ) @property # type: ignore[misc] @check_communicating def assigned_resources( self: SubarrayComponentManager, ) -> list[str]: """ Return this subarray's resources. :return: this subarray's resources. """ return list( set(self._stations) | set(self._subarray_beams) | set(self._station_beams), ) @property # type: ignore[misc] @check_communicating def assigned_resources_dict( self: SubarrayComponentManager, ) -> dict[str, Sequence[Any]]: """ Return a dictionary of resource types and TRLs. :return: this subarray's resources. """ return { "stations": sorted(self._stations.keys()), "subarray_beams": sorted(self._subarray_beams.keys()), "station_beams": sorted(self._station_beams.keys()), "apertures": sorted(self._apertures.keys()), "channels": [self._number_of_channels], }
[docs] @check_communicating def release( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, # *, # resources: str, **kwargs: Any, ) -> tuple[TaskStatus, str]: """ Submit the `release` slow command. :param task_callback: Update task state, defaults to None :param kwargs: keyword arguments to the command. The only required key is "resources", which contains a list of resource names to release :return: A task status and response message. """ return self.submit_task( self._release, args=[kwargs["resources"]], task_callback=task_callback, )
@check_communicating def _release( self: SubarrayComponentManager, resources: dict[str, list[str]], task_callback: Optional[Callable], task_abort_event: threading.Event, ) -> None: """ Release resources from this subarray. :param resources: list of resource TRLs to release. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :raises NotImplementedError: because MCCS Subarray cannot perform a partial release of resources. """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "_release"): return if task_callback is not None: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.OK, ( "Not Implemented: MCCS Subarray cannot" "partially release resources." ), ), ) raise NotImplementedError("MCCS Subarray cannot partially release resources.")
[docs] @check_communicating def release_all( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Submit the `ReleaseAllResources` slow command. Release all resources from this subarray. :param task_callback: Update task state, defaults to None :return: a task status and response message """ return self.submit_task( self._release_all, args=[], task_callback=task_callback, )
@check_communicating def _release_all( self: SubarrayComponentManager, task_callback: Optional[Callable], task_abort_event: threading.Event, ) -> None: """ Release all resources from this subarray. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "_release_all"): return self._desired_state = ObsState.EMPTY self._configuring_resources.clear() # Release resources in subelements, and check return codes self.logger.debug("Deallocating resources in stations") for station_proxy in self._stations.values(): station_proxy.deallocate_subarray(self._subarray_id) self.logger.debug("Deallocating resources in subarray beams") for trl, subarray_beam_proxy in self._subarray_beams.items(): self._configuring_resources.add(trl) subarray_beam_proxy.release_all() if self._check_aborted(task_abort_event, task_callback, "_release_all"): return # Wait for beams to go to EMPTY task_status = self._wait_for_obs_state( self._obs_command_timeout, task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "_release_all"): return self._release_internal_resources() if task_status == TaskStatus.FAILED: self._component_state_callback(obsfault=True) if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "ReleaseAllResources has " + self._task_message[task_status], ), ) def _delete_proxies(self) -> None: self.logger.debug("Deleting proxies") self._device_communication_states.clear() # Before we clear the dicts, we must make the the underlying object removes # it's subcriptions. 
for proxy in ( list(self._stations.values()) + list(self._subarray_beams.values()) + list(self._station_beams.values()) ): if proxy._proxy is not None: proxy._proxy.unsubscribe_all_change_events() self._stations.clear() self._subarray_beams.clear() self._station_beams.clear() def _release_internal_resources(self) -> None: self.logger.debug("Deallocating resources") if self._stations or self._subarray_beams or self._station_beams: self._delete_proxies() self._device_obs_states.clear() self._apertures.clear() self._subarray_beam_trl.clear() self._number_of_channels = 0 self._configuring_resources.clear() self._component_state_callback( resources_changed=[ set(self._stations.keys()), set(self._subarray_beams.keys()), set(self._station_beams.keys()), ] ) self._evaluate_communication_state()
[docs] @check_communicating def configure( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, **kwargs: Any, ) -> tuple[TaskStatus, str]: """ Submit the `configure` slow command. This method returns immediately after it is submitted for execution. :param task_callback: Update task state, defaults to None :param kwargs: keyword arguments to the command. The only required key is "subarray_beams", which contains the beam configuration. :return: A task status and response message. """ subarray_beams = kwargs["subarray_beams"] for beam in subarray_beams: self.logger.debug(f"Configure received, beam: {beam}") return self.submit_task( self._configure, args=[subarray_beams], task_callback=task_callback, )
@check_communicating def _configure( self: SubarrayComponentManager, subarray_beams: list[dict[str, Any]], task_callback: Optional[Callable], task_abort_event: threading.Event, transaction_id: str = "", ) -> None: """ Configure the resources for a scan. :param subarray_beams: the beam configurations to be applied :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :param transaction_id: the transaction id for the configuration Will be generated if not provided. """ def _report_status(status: TaskStatus, **kwargs: Any) -> None: if task_callback is not None: task_callback(status=status, **kwargs) _report_status(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "_configure"): return if transaction_id == "": transaction_id = self._get_unique_id() was_empty = not self._is_configured for station_trl, station_proxy in self._stations.items(): station_proxy.deallocate_subarray(self._subarray_id) if self._check_aborted(task_abort_event, task_callback, "_configure"): return configuration: dict[str, dict] = {} for beam in subarray_beams: beam_id = beam.get("subarray_beam_id") assert isinstance(beam_id, int) beam_trl = self._subarray_beam_trl.get(beam_id, None) if beam_trl is None: self.logger.error(f"undefined beam {beam_id}") continue self.logger.debug(f"Processing beam {beam_id} ({beam_trl})") beam["subarray_id"] = self._subarray_id for aperture in beam["apertures"]: aperture_id = aperture["aperture_id"] station_beam_trl = self._apertures.get(aperture_id, None) if station_beam_trl is None: self.logger.error(f"undefined aperture {aperture_id}") continue aperture["station_beam_trl"] = station_beam_trl configuration[beam_trl] = beam if self._check_aborted(task_abort_event, task_callback, "_configure"): return # configure subarray beams. 
When configured, configure stations task_status = self._configure_subarray_beams(configuration) # Must wait for all configuration to complete if task_status == TaskStatus.COMPLETED: task_status = self._wait_for_obs_state( self._obs_command_timeout, task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "_configure"): return if task_status == TaskStatus.COMPLETED: self.logger.debug("configuring stations") result_code = self._configure_stations(transaction_id=transaction_id) if result_code != ResultCode.OK: task_status = TaskStatus.FAILED # Obsstate update if task_status == TaskStatus.COMPLETED: self._is_configured = True if was_empty: self._component_state_callback(configured_changed=True) elif task_status == TaskStatus.FAILED: self._component_state_callback(obsfault=True) _report_status( status=task_status, result=( self._task_to_result[task_status], "Configure has " + self._task_message[task_status], ), ) def _configure_stations( self: SubarrayComponentManager, transaction_id: str ) -> ResultCode: """ Configure the station resources for a scan. :param transaction_id: the transaction id for the configuration :return: a result code """ apply_configuration_commands = MccsCompositeCommandProxy(self.logger) for station_trl in self._stations: apply_configuration_commands += MccsCommandProxy( station_trl, "ApplyConfiguration", self.logger, default_args=transaction_id, ) result, message = apply_configuration_commands( command_evaluator=CompositeCommandResultEvaluator() ) if result != ResultCode.OK: self.logger.error( f"MccsCompositeCommandProxy was not happy {result=}" f" {pretty_format(message)}" ) return result def _configure_subarray_beams( self: SubarrayComponentManager, subarray_beam_configuration: dict[str, Any], ) -> TaskStatus: """ Configure the station beam resources for a scan. 
:param subarray_beam_configuration: the subarray beam configuration to be applied :return: a result code """ self._configuring_resources.clear() self._desired_state = ObsState.READY result_code = TaskStatus.COMPLETED for ( subarray_beam_trl, configuration, ) in subarray_beam_configuration.items(): subarray_beam_proxy = self._subarray_beams[subarray_beam_trl] proxy_result_code = subarray_beam_proxy.configure(configuration) if proxy_result_code == ResultCode.FAILED: result_code = TaskStatus.FAILED else: self._configuring_resources.add(subarray_beam_trl) return result_code # pylint: disable=arguments-differ
[docs] @check_communicating def scan( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, *, interface: Optional[str] = None, scan_id: int, start_time: Optional[str] = None, duration: Optional[float] = 0.0, ) -> tuple[TaskStatus, str]: """ Submit the `Scan` command. :param task_callback: Update task state, defaults to None :param interface: the schema version this is running against. :param scan_id: The ID for this scan :param start_time: UTC time for begin of scan, None for immediate start :param duration: Scan duration in seconds. 0.0 or omitted means foreve :return: A task status and response message. """ return self.submit_task( self._scan, args=[scan_id, start_time, duration], task_callback=task_callback, )
@check_communicating
def _scan(
    self: SubarrayComponentManager,
    scan_id: int,
    start_time: Optional[str],
    duration: float,
    task_callback: Optional[Callable],
    task_abort_event: threading.Event,
) -> None:
    """
    Start a scan on the subarray beams, then on the stations.

    The subarray beams are commanded first. If none of them failed, the
    method waits for them to reach ObsState.SCANNING and only then
    commands the stations. The outcome is reported through the component
    state callback and, if given, the task callback.

    :param scan_id: the id of the scan
    :param start_time: the start time of the scan
    :param duration: Scan duration in seconds. 0.0 or omitted means forever
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # The scan parameters are recorded even if the task is aborted below.
    self._scan_id = scan_id
    self._scan_start_time = start_time
    self._scan_duration = duration

    if self._check_aborted(task_abort_event, task_callback, "_scan"):
        return

    # NOTE(review): the proxies' scan() results are compared against bare
    # TaskStatus values; if a proxy returns a (status, message) tuple these
    # membership tests can never match -- confirm against the proxy classes.
    unsuccessful = (TaskStatus.FAILED, TaskStatus.REJECTED)

    self._desired_state = ObsState.SCANNING
    self._configuring_resources.clear()

    # Stays COMPLETED unless something below fails.
    overall_status = TaskStatus.COMPLETED
    for beam_trl, beam_proxy in self._subarray_beams.items():
        self._configuring_resources.add(beam_trl)
        if beam_proxy.scan(scan_id, start_time, duration) in unsuccessful:
            overall_status = TaskStatus.FAILED

    # Give the beams time to start scanning.
    if overall_status == TaskStatus.COMPLETED:
        overall_status = self._wait_for_obs_state(
            self._obs_command_timeout, task_abort_event
        )
    if self._check_aborted(task_abort_event, task_callback, "_scan"):
        return

    # Only start the stations once the beams are scanning.
    if overall_status == TaskStatus.COMPLETED:
        for station_trl, station_proxy in self._stations.items():
            self.logger.debug(f"Starting scan in station {station_trl}")
            station_outcome = station_proxy.scan(
                self._subarray_id,
                scan_id,
                start_time,
                duration,
            )
            if station_outcome in unsuccessful:
                overall_status = TaskStatus.FAILED
    if self._check_aborted(task_abort_event, task_callback, "_scan"):
        return

    if overall_status != TaskStatus.COMPLETED:
        self._component_state_callback(obsfault=True)
    else:
        self._component_state_callback(scanning_changed=True)

    if task_callback is not None:
        task_callback(
            status=overall_status,
            result=(
                self._task_to_result[overall_status],
                "Scan has " + self._task_message[overall_status],
            ),
        )
@check_communicating
def end_scan(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Queue the `end_scan` slow command for execution.

    The work is done by ``_end_scan``, which is submitted here as a task
    with no arguments.

    :param task_callback: Update task state, defaults to None

    :return: A task status and response message.
    """
    no_arguments: list = []
    return self.submit_task(
        self._end_scan,
        args=no_arguments,
        task_callback=task_callback,
    )
def _end_scan(
    self: SubarrayComponentManager,
    task_callback: Optional[Callable],
    task_abort_event: threading.Event,
) -> None:
    """
    Stop the current scan on the subarray beams, then on the stations.

    The stored scan parameters are reset, every subarray beam is told to
    end its scan and, if none of them failed, every station is told the
    same. The outcome is reported through the component state callback
    and, if given, the task callback.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "_end_scan"):
        return

    # Forget the parameters of the scan that is being ended.
    self._scan_id = None
    self._scan_start_time = None
    self._scan_duration = 0.0

    unsuccessful = (TaskStatus.FAILED, TaskStatus.REJECTED)

    # Tell the subarray beam devices to stop scanning.
    overall_status = TaskStatus.COMPLETED
    for beam_proxy in self._subarray_beams.values():
        if beam_proxy.end_scan() in unsuccessful:
            overall_status = TaskStatus.FAILED

    # Stop scan on stations, but only if every beam accepted the request.
    if overall_status == TaskStatus.COMPLETED:
        for station_trl, station_proxy in self._stations.items():
            self.logger.debug(f"Stopping scan in station {station_trl}")
            if station_proxy.end_scan(self._subarray_id) in unsuccessful:
                overall_status = TaskStatus.FAILED

    if self._check_aborted(task_abort_event, task_callback, "_end_scan"):
        return

    if overall_status != TaskStatus.COMPLETED:
        self._component_state_callback(obsfault=True)
    else:
        self._component_state_callback(scanning_changed=False)

    if task_callback is not None:
        task_callback(
            status=overall_status,
            result=(
                self._task_to_result[overall_status],
                "EndScan has " + self._task_message[overall_status],
            ),
        )
@check_communicating
def deconfigure(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Queue the `deconfigure` (End) slow command for execution.

    The work is done by ``_deconfigure``, which is submitted here as a
    task with no arguments.

    :param task_callback: Update task state, defaults to None

    :return: A task status and response message.
    """
    self.logger.debug("End command invoked")
    no_arguments: list = []
    return self.submit_task(
        self._deconfigure,
        args=no_arguments,
        task_callback=task_callback,
    )
@check_communicating
def _deconfigure(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable],
    task_abort_event: threading.Event,
) -> None:
    """
    Deconfigure resources.

    Sets the desired ObsState to IDLE, deconfigures the stations and
    subarray beams, and waits for the sub-devices to reach that state.
    The result of the wait is propagated to the reported task status, so
    a timeout is no longer reported as a successful deconfigure.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    self.logger.debug("deconfigure task invoked")
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "_deconfigure"):
        return

    self._desired_state = ObsState.IDLE
    task_status = self._deconfigure_subelements()
    if task_status == TaskStatus.COMPLETED:
        # Keep the outcome of the wait (as _configure and _scan do) so that
        # the final report reflects a timeout instead of hiding it.
        task_status = self._wait_for_obs_state(
            self._obs_command_timeout, task_abort_event
        )
    if self._check_aborted(task_abort_event, task_callback, "_deconfigure"):
        return

    self._component_state_callback(configured_changed=False)
    if task_callback is not None:
        task_callback(
            status=task_status,
            result=(
                self._task_to_result[task_status],
                "Subarray Deconfigure has " + self._task_message[task_status],
            ),
        )


@check_communicating
def _deconfigure_subelements(
    self: SubarrayComponentManager,
) -> TaskStatus:
    """
    Deconfigure the stations and subarray beams of this subarray.

    Every station is asked to deallocate this subarray and every subarray
    beam is asked to deconfigure; each beam is added to the set of
    resources being waited on. The subarray is marked as not configured
    regardless of the outcome.

    :return: ``TaskStatus.FAILED`` if any sub-device reported REJECTED or
        FAILED, otherwise ``TaskStatus.COMPLETED``
    """
    self.logger.debug("deconfigure subelements invoked")
    unsuccessful = (TaskStatus.REJECTED, TaskStatus.FAILED)
    task_status = TaskStatus.COMPLETED

    for trl, station_proxy in self._stations.items():
        self.logger.debug(f"deconfigure station {trl}")
        proxy_task_status, _ = station_proxy.deallocate_subarray(self._subarray_id)
        if proxy_task_status in unsuccessful:
            task_status = TaskStatus.FAILED

    #
    # Set all station beams to no logical bands, point statically at zenith
    #
    for trl, subarray_beam_proxy in self._subarray_beams.items():
        self.logger.debug(f"deconfigure subarray beam {trl}")
        self._configuring_resources.add(trl)
        proxy_task_status, _ = subarray_beam_proxy.deconfigure()
        if proxy_task_status in unsuccessful:
            task_status = TaskStatus.FAILED

    self._is_configured = False
    return task_status


def _abort_callback(
    self: SubarrayComponentManager,
    *,
    status: Optional[TaskStatus] = None,
    exception: Optional[Exception] = None,
    **kwargs: Any,
) -> None:
    """
    Track the progress of ``abort_commands``.

    Logs any exception that was reported and sets the ``_abort_complete``
    event once the reported status is COMPLETED.

    :param status: the task status being reported, if any
    :param exception: an exception raised by the task, if any
    :param kwargs: any other keyword arguments; ignored
    """
    if exception is not None:
        self.logger.error(f"abort_commands raised exception: {repr(exception)}")
    if status == TaskStatus.COMPLETED:
        self.logger.debug("abort_commands has finished, setting flag")
        self._abort_complete.set()
@check_communicating
def abort(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Abort the observation.

    ``_abort`` is started on its own thread, with its default of also
    aborting sub-devices, and this method returns straight away.

    :param task_callback: callback to be called when the status of
        the command changes

    :return: A task status and response message.
    """
    # submit_task would append to the task queue; a dedicated thread lets
    # the abort skip that queue.
    abort_thread = threading.Thread(
        target=self._abort,
        kwargs={"task_callback": task_callback},
    )
    abort_thread.start()
    return (TaskStatus.IN_PROGRESS, "Abort has started")
@check_communicating
def abort_device(
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Abort only this device, for use in RestartSubarray().

    ``_abort`` is started on its own thread with ``cascade=False``, so
    sub-devices are not aborted, and this method returns straight away.

    :param task_callback: callback to be called when the status of
        the command changes

    :return: A task status and response message.
    """
    # submit_task would append to the task queue; a dedicated thread lets
    # the abort skip that queue.
    abort_thread = threading.Thread(
        target=self._abort,
        kwargs={"cascade": False, "task_callback": task_callback},
    )
    abort_thread.start()
    return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort(  # type: ignore[override]
    self: SubarrayComponentManager,
    cascade: bool = True,
    task_callback: Optional[Callable] = None,
) -> None:
    """
    Abort command execution.

    First asks ``abort_commands`` to abort this device's own tasks and
    waits up to 10 seconds for ``_abort_callback`` to signal completion.
    If that succeeded and ``cascade`` is set, every subarray beam is then
    told to abort and the method waits up to a further 10 seconds for the
    sub-devices to reach ObsState.ABORTED. The final status is reported
    through the task callback, if one was given.

    :param cascade: whether or not to abort sub-devices.
    :param task_callback: Update task state, defaults to None.
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    task_status = TaskStatus.COMPLETED

    # This is sent in a thread, we record progress using the _abort_callback, which
    # will set an Event once the task is complete.
    self.abort_commands(task_callback=self._abort_callback)  # type:ignore

    # Wait until the timeout for the Event to be set by abort_commands
    timeout = 10.0
    if self._abort_complete.wait(timeout=timeout):
        # Completed in time: reset the event so it can be reused next time.
        self._abort_complete.clear()
    else:
        self.logger.error(f"Abort timed out in {timeout} seconds.")
        task_status = TaskStatus.FAILED
        # Reinitialize our event. The old one could get set at any point now.
        self._abort_complete = threading.Event()

    # Abort sub-devices
    if cascade:
        if task_status == TaskStatus.COMPLETED:
            self._desired_state = ObsState.ABORTED
            self._configuring_resources.clear()
            for trl, subarray_beam_proxy in self._subarray_beams.items():
                # Registered before the call so the wait below covers this beam.
                self._configuring_resources.add(trl)
                proxy_result_code, response = subarray_beam_proxy.abort()
                self.logger.debug(
                    f"Aborting {trl} has {ResultCode(proxy_result_code).name}"
                )
                if proxy_result_code in (
                    ResultCode.FAILED,
                    ResultCode.REJECTED,
                    ResultCode.NOT_ALLOWED,
                ):
                    # Stop at the first beam that refuses; remaining beams are
                    # not commanded.
                    task_status = TaskStatus.FAILED
                    break
        # Wait until subdevices reach ObsState.ABORTED
        if task_status == TaskStatus.COMPLETED:
            task_status = self._wait_for_obs_state(timeout=10.0)

    if task_callback is not None:
        task_callback(
            status=task_status,
            result=(
                self._task_to_result[task_status],
                "Abort command has " + self._task_message[task_status],
            ),
        )
@check_communicating
def obsreset(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Queue the `ObsReset` slow command for execution.

    The work is done by ``_obsreset``, which is submitted here as a task
    with no arguments.

    :param task_callback: Update task state, defaults to None

    :return: A task status and response message.
    """
    no_arguments: list = []
    return self.submit_task(
        self._obsreset,
        args=no_arguments,
        task_callback=task_callback,
    )
@check_communicating
def _obsreset(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable],
    task_abort_event: threading.Event,
) -> None:
    """
    Reset the observation by returning to unconfigured state.

    Every subarray beam is added to the set of resources being waited on
    and is told to reset its observation. The outcome is reported through
    the task callback, if one was given.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "_obsreset"):
        return

    unsuccessful = (TaskStatus.FAILED, TaskStatus.REJECTED)
    overall_status = TaskStatus.COMPLETED
    for beam_trl, beam_proxy in self._subarray_beams.items():
        self._configuring_resources.add(beam_trl)
        if beam_proxy.obsreset() in unsuccessful:
            overall_status = TaskStatus.FAILED

    if task_callback is not None:
        task_callback(
            status=overall_status,
            result=(
                self._task_to_result[overall_status],
                "ObsReset command has " + self._task_message[overall_status],
            ),
        )
@check_communicating
def restart(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Queue the `Restart` slow command for execution.

    The work is done by ``_restart``, which is submitted here as a task
    with no arguments.

    :param task_callback: Update task state, defaults to None

    :return: A task status and response message.
    """
    no_arguments: list = []
    return self.submit_task(
        self._restart,
        args=no_arguments,
        task_callback=task_callback,
    )
@check_communicating
def _restart(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable],
    task_abort_event: threading.Event,
) -> None:
    """
    Restart the subarray by returning to unresourced state.

    Every station is asked to deallocate this subarray, then the stored
    scan parameters, the configured flag and the internally held
    resources are cleared. The outcome is reported through the task
    callback, if one was given.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "_restart"):
        return

    task_status = TaskStatus.COMPLETED
    for station_proxy in self._stations.values():
        # deallocate_subarray returns a (status, message) pair; unpack it so
        # that the status itself is compared, not the whole tuple (which
        # could never equal FAILED or REJECTED).
        proxy_task_status, _ = station_proxy.deallocate_subarray(self._subarray_id)
        if proxy_task_status in (TaskStatus.FAILED, TaskStatus.REJECTED):
            task_status = TaskStatus.FAILED

    # Scanning related
    self._scan_id = None
    self._scan_start_time = None
    self._scan_duration = 0.0

    # Configure related
    self._is_configured = False

    # Allocate related
    self._release_internal_resources()

    if task_callback is not None:
        task_callback(
            status=task_status,
            result=(
                self._task_to_result[task_status],
                "Restart command has " + self._task_message[task_status],
            ),
        )
@check_communicating
def send_transient_buffer(
    self: SubarrayComponentManager,
    argin: list[int],
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Queue the send_transient_buffer slow task for execution.

    The task is only submitted here, so this method returns without
    waiting for it to run.

    :param argin: list of requested segments.
    :param task_callback: Update task state. Defaults to None.

    :return: Task status and response message.
    """
    task_arguments = [argin]
    return self.submit_task(
        self._send_transient_buffer,
        args=task_arguments,
        task_callback=task_callback,
    )
@check_communicating
def _send_transient_buffer(
    self: SubarrayComponentManager,
    argin: list[int],
    task_callback: Optional[Callable],
    task_abort_event: threading.Event,
) -> None:
    """
    Send the transient buffer.

    Currently a placeholder: it only reports IN_PROGRESS followed by
    COMPLETED through the task callback.

    :param argin: list of requested segments.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # do stuff here

    if task_callback is not None:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "send_transient_buffer command completed."),
        )


def _device_communication_state_changed(
    self: SubarrayComponentManager,
    trl: str,
    communication_state: CommunicationStatus,
) -> None:
    """
    Record a sub-device's communication status and re-evaluate our own.

    Events for devices that are not tracked are discarded. While this
    component manager's own communication state is DISABLED the new
    status is stored but no re-evaluation takes place.

    :param trl: TRL of the device whose communication status changed
    :param communication_state: the device's new communication status
    """
    if trl not in self._device_communication_states:
        self.logger.warning(
            "Received a communication status changed event for a device not "
            "managed by this subarray. Probably it was released just a moment ago. "
            "The event will be discarded."
        )
        return

    self._device_communication_states[trl] = communication_state
    if self.communication_state == CommunicationStatus.DISABLED:
        return
    self._evaluate_communication_state()


def _evaluate_communication_state(self: SubarrayComponentManager) -> None:
    """
    Derive this subarray's communication state from its sub-devices.

    Communication is ESTABLISHED only when no tracked device is DISABLED
    or NOT_ESTABLISHED; otherwise it is NOT_ESTABLISHED.
    """
    # The mapping is keyed by TRL, so membership has to be tested on its
    # values; testing the mapping itself would look at the keys and never
    # find a status.
    device_states = self._device_communication_states.values()
    if (
        CommunicationStatus.DISABLED in device_states
        or CommunicationStatus.NOT_ESTABLISHED in device_states
    ):
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
    else:
        self._update_communication_state(CommunicationStatus.ESTABLISHED)


def _station_power_state_changed(
    self: SubarrayComponentManager,
    trl: str,
    power_mode: PowerState,
) -> None:
    """
    Record a station's power state.

    If an assignment is in progress and every tracked station now has a
    known (non-None) power state, the assignment is marked as finished.

    :param trl: TRL of the station whose power state changed
    :param power_mode: the station's new power state
    """
    self._station_power_modes[trl] = power_mode
    if self._is_assigning and all(
        known_mode is not None for known_mode in self._station_power_modes.values()
    ):
        self._is_assigning = False


def _wait_for_obs_state(
    self: SubarrayComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for sub-device ObsState to reach desired state.

    Polls every 10 ms until the set of resources being waited on is
    empty. When the timeout expires, the sub-devices are polled directly
    one last time before a failure is declared.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: COMPLETED if the state was reached, ABORTED if the abort
        event was set, otherwise the result of the final poll (COMPLETED
        or FAILED) once the timeout has expired
    """
    ticks = int(timeout / 0.01)  # 10 ms resolution
    while self._configuring_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(0.01)
        ticks -= 1
        # "<=" rather than "==": a timeout below 10 ms starts at zero ticks
        # and would otherwise count down past zero and never time out.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for ObsState {self._desired_state.name}"
                f" resources {self._configuring_resources} "
                f"failed to transition in {timeout} seconds. Attempting final poll."
            )
            return self._final_poll()
    self.logger.debug(
        f"Waited ObsState {self._desired_state.name} for"
        f" {(timeout-ticks*0.01):.3f} seconds"
    )
    return TaskStatus.COMPLETED


def _device_obs_state_changed(
    self: SubarrayComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Record a sub-device's new ObsState.

    If the device has reached the desired state and is being waited on,
    it is removed from the set of resources being waited on.

    :param trl: TRL of the device whose ObsState changed
    :param obs_state: the device's new ObsState
    """
    old_state = self._device_obs_states.get(trl, None)
    # Compare against None explicitly: a zero-valued ObsState is falsy and
    # would otherwise be logged as "NONE".
    old_name = old_state.name if old_state is not None else "NONE"

    new_state = ObsState(obs_state)
    self._device_obs_states[trl] = new_state
    if obs_state == self._desired_state and trl in self._configuring_resources:
        self._configuring_resources.remove(trl)
        # if not self._configuring_resources:
        #     self._component_state_callback(task_completed="configure")
    self.logger.debug(
        f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
        f"waiting for {len(self._configuring_resources)} more devices"
    )


def _device_obs_state_fault(
    self: SubarrayComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Handle a sub-device moving to ObsState.FAULT.

    The state is recorded and an observation fault is reported through
    the component state callback. Calls with any other ObsState are
    logged and ignored.

    :param trl: TRL of the device that reported the state
    :param obs_state: the reported ObsState; expected to be FAULT
    """
    if obs_state != ObsState.FAULT:
        self.logger.warning(
            "_device_obs_state_fault called with"
            f" incorrect ObsState {trl} {obs_state.name}"
        )
        return

    self.logger.error(f"{trl} moved to FAULT.")
    self._device_obs_states[trl] = ObsState(obs_state)
    self._component_state_callback(obsfault=True)


def _final_poll(self: SubarrayComponentManager) -> TaskStatus:
    """
    Direct check of device obsstates as a last attempt before declaring failure.

    Sometimes it appears we miss change events in production. If for some
    reason we haven't received all the change events we expected, double
    check what we think the obsstate is against what it actually is on
    every sub-device still being waited on.

    :returns: COMPLETED if every device being waited on turned out to be
        in the desired state, otherwise FAILED.
    """
    # Iterate over a copy, because entries are removed inside the loop.
    for trl in list(self._configuring_resources):
        device = (
            self._station_beams.get(trl)
            or self._subarray_beams.get(trl)
            or self._stations.get(trl)
        )
        if (
            device
            and device._proxy
            and device._proxy.obsstate == self._desired_state
        ):
            self.logger.info(
                f"Change event for {trl} was missed, device was in correct state."
            )
            self._component_state_callback(missed_event=True)
            self._configuring_resources.remove(trl)

    return (
        TaskStatus.COMPLETED if not self._configuring_resources else TaskStatus.FAILED
    )


def _get_unique_id(self: SubarrayComponentManager) -> str:
    """
    Get a UID.

    The ID is currently always generated locally, whether or not a SKUID
    service URL is configured (see the TODO below).

    :return: A unique ID.
    """
    if self._skuid_url:
        client = SkuidClient(self._skuid_url)
        # TODO: hanging on client.fetch_transaction_id()
        # but this is a weak test anyhow:
        # we are passing a nonexistent URL,
        # which makes the client fail to fetch a transaction ID,
        # and fall back to generating one locally.
        # So for now I have switched over to
        # generating locally in the first place.
        return client.get_local_transaction_id()
        # return client.fetch_transaction_id()
    return SkuidClient.get_local_transaction_id()