Source code for ska_low_mccs.controller.controller_component_manager

#  -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for the MCCS controller."""
# pylint: disable=too-many-lines
from __future__ import annotations

import functools
import json
import logging
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Iterable, Optional

import erfa  # type: ignore
import numpy as np
from ska_control_model import (
    CommunicationStatus,
    HealthState,
    ObsState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_low_mccs_common import EventSerialiser
from ska_low_mccs_common.communication_manager import CommunicationManager
from ska_low_mccs_common.component import (
    DeviceComponentManager,
    ObsDeviceComponentManager,
)
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
    CompositeCommandResultEvaluator,
    MccsCompositeCommandProxy,
)
from ska_low_mccs_common.resource_manager import ResourceManager, ResourcePool
from ska_low_mccs_common.utils import lock_power_state
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from tango import DevFailed, DeviceProxy, EnsureOmniThread

from ska_low_mccs.controller.controller_resource_manager import (
    ControllerResourceManager,
)
from ska_low_mccs.utils import subarraybeam_trl_from_ids

__all__ = ["ControllerComponentManager"]

# Map a terminal TaskStatus onto the ResultCode reported to task callbacks.
_task_to_result = {
    TaskStatus.COMPLETED: ResultCode.OK,
    TaskStatus.ABORTED: ResultCode.ABORTED,
    TaskStatus.FAILED: ResultCode.FAILED,
    # The power commands treat a REJECTED response from a station as a
    # rejection and look the status up here; without this entry that lookup
    # raises KeyError instead of reporting the rejection.
    TaskStatus.REJECTED: ResultCode.REJECTED,
}

# strftime/strptime format for UTC timestamps with microsecond precision.
RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# SKA longitude location. Used for RMS calculation
# NOTE(review): presumably degrees east of Greenwich — confirm.
LON_SKA_LOW = 116.764


class _StationProxy(DeviceComponentManager):
    """
    A controller's proxy to a station.

    Each station proxy keeps track of the channel blocks and hardware
    beams it has available. Each station proxy also has its own resource
    manager to keep track of which of those resources are assigned to
    each subarray.
    """

    # Size of the per-station channel block and hardware beam pools.
    _NUM_CHANNEL_BLOCKS = 48
    _NUM_HARDWARE_BEAMS = 48

    # pylint: disable=too-many-arguments
    def __init__(
        self: _StationProxy,
        trl: str,
        subarray_trls: Iterable[str],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        event_serialiser: Optional[EventSerialiser] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param trl: the TRL of the device
        :param subarray_trls: the TRLs of subarrays which channel
            blocks can be assigned to.

        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be
            called when the status of the communications channel
            between the component manager and its component changes
        :param component_state_callback: callback to be
            called when the component state changes
        :param event_serialiser: an optional event serialiser to
            use when serialising events
        """
        # Resources that are currently free on this station.
        self._channel_block_pool = ResourcePool(
            channel_blocks=range(self._NUM_CHANNEL_BLOCKS),
            hardware_beams=range(self._NUM_HARDWARE_BEAMS),
        )
        # Which resources are allocated to which subarray.
        self._resource_manager = ResourceManager(
            subarray_trls,
            channel_blocks=range(self._NUM_CHANNEL_BLOCKS),
            hardware_beams=range(self._NUM_HARDWARE_BEAMS),
        )

        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
            event_serialiser=event_serialiser,
        )

    def allocate_blocks(
        self: _StationProxy, subarray_trl: str, channel_blocks: int
    ) -> list:
        """
        Allocate channel blocks to a subarray.

        This method removes the requested number of channel blocks from
        the available pool and assigns them to the provided subarray
        trl. Allocation is all-or-nothing: if it fails part-way, every
        block taken from the pool during this call is returned to it.

        :param subarray_trl: The TRL of the subarray to which the
            channel blocks are to be assigned.
        :param channel_blocks: The number of channel blocks to assign to
            the subarray.
        :return: the list of allocated channel blocks
        :raises ValueError: if the resources cannot be allocated
        """
        channel_blocks_to_allocate: list[int] = []
        for _ in range(channel_blocks):
            try:
                block = self._channel_block_pool.get_free_resource("channel_blocks")
            except ValueError as exc:
                # Roll back the partial allocation so the pool is unchanged.
                self._channel_block_pool.free_resources(
                    {"channel_blocks": channel_blocks_to_allocate}
                )
                raise ValueError(
                    f"Unable to allocate {channel_blocks} channel blocks to "
                    f"{subarray_trl}: {exc}"
                ) from exc
            channel_blocks_to_allocate.append(block)  # type: ignore [arg-type]

        try:
            self._resource_manager.allocate(
                subarray_trl, channel_blocks=channel_blocks_to_allocate
            )
        except Exception:
            # Keep the pool consistent with the resource manager.
            self._channel_block_pool.free_resources(
                {"channel_blocks": channel_blocks_to_allocate}
            )
            raise
        return channel_blocks_to_allocate

    def allocate_beam(self: _StationProxy, subarray_trl: str) -> int:
        """
        Allocate a hardware beam to a subarray.

        This method removes one hardware beam from the available pool
        and assigns it to the provided subarray TRL.

        :param subarray_trl: The TRL of the subarray to which the
            hardware beam is to be assigned.
        :return: index of the allocated beam
        :raises ValueError: propagated from the pool if a free hardware
            beam cannot be obtained
        """
        beam_to_allocate = self._channel_block_pool.get_free_resource("hardware_beams")
        try:
            self._resource_manager.allocate(
                subarray_trl, hardware_beams=[beam_to_allocate]
            )
        except Exception:
            # Return the beam to the pool so it is not leaked.
            self._channel_block_pool.free_resources(
                {"hardware_beams": [beam_to_allocate]}
            )
            raise
        return beam_to_allocate  # type: ignore [return-value]

    def release_from_subarray(self: _StationProxy, subarray_trl: str) -> None:
        """
        Release all channel blocks and beams assigned to a subarray.

        The resources allocated to the subarray are returned to the
        station proxy's free pool, and the subarray's allocation record
        is cleared.

        :param subarray_trl: The TRL of the subarray from which this
            station proxy's resources are to be released.
        """
        resources_to_release = self._resource_manager.get_allocated(subarray_trl)
        self._channel_block_pool.free_resources(resources_to_release)
        self._resource_manager.deallocate_from(subarray_trl)

    @check_communicating
    def start_acquisition(self: _StationProxy, argin: str) -> ResultCode:
        """
        Invoke StartAcquisition on the station.

        :param argin: json dictionary with optional keywords

            * start_time - (str) start time
            * delay - (int) delay start

        :return: result code of StartAcquisition
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.StartAcquisition(argin)
        return result_code

    @check_communicating
    def initialise(self: _StationProxy) -> ResultCode:
        """
        Initialise the station's tiles.

        :return: result code of Initialise
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.Initialise()
        return result_code

    @check_communicating
    def trigger_adc_equalisation(
        self: _StationProxy, args: str
    ) -> tuple[list[ResultCode], list[str]]:
        """
        Start a station's Adc Equalisation.

        :param args: the arguments as json formatted string

        :return: the (result codes, messages) reply of
            TriggerAdcEqualisation
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        return self._proxy._device.TriggerAdcEqualisation(args)

    def check_lrc_status(self: _StationProxy, uid: str) -> str:
        """
        Return the status of a long running command on the station.

        :param uid: command id

        :returns: the status reported by the station's
            CheckLongRunningCommandStatus command for ``uid``
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        return self._proxy._device.CheckLongRunningCommandStatus(uid)

    @property
    def issynchronised(self: _StationProxy) -> bool:
        """
        Return True if all tiles in this station are synchronised.

        :returns: True if all tiles in this station are synchronised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        return self._proxy._device.issynchronised

    @property
    def isinitialised(self: _StationProxy) -> bool:
        """
        Return True if all tiles in this station are initialised.

        :returns: True if all tiles in this station are initialised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        return self._proxy._device.isinitialised

    def get_available_blocks(self: _StationProxy) -> int:
        """
        Get the number of unallocated channel blocks.

        :returns: the number of channel blocks not allocated to any
            subarray
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        free_resources = self._resource_manager.get_unallocated()
        available_blocks = list(free_resources["channel_blocks"])
        return len(available_blocks)


class _SubarrayProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayProxy, subarray_id: int, subarray_beams: Iterable[dict]
    ) -> ResultCode:
        """
        Tell the subarray what resources are assigned to it.

        The Subarray receives the allocated resources, together with the
        subarray allocation description.
        This implicitly deallocates all resources.
        Subarray is described as a set of station beams

        :param subarray_id: This is mainly used for cross check. Subarray knows its ID
        :param subarray_beams: list of subarray beam description. Structure containing
            * subarray_beam_id: integer
            * subarray_beam_trl: str
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels
            * apertures: list of dictionaries with each entry containing
            *   * station_id: (int) in range 1-512
            *   * aperture_id: (str) with format APx.y; x must match station_ID)
            *   * station_beam_trl: (str)
            *   * channel_blocks: Allocated channel blocks for this station
            *   * hardware_beam: Allocated hardware beam for this station

        TRLs of stations are not specified, as they are not used in the Subarray.
        Station beam TRLs could also be omitted.

        :return: a result code; FAILED if the command raised.
        """
        assert self._proxy is not None
        args = json.dumps(
            # json can only serialise lists/tuples, so materialise the
            # iterable (it may be a generator).
            {"subarray_id": subarray_id, "subarray_beams": list(subarray_beams)}
        )
        self.logger.debug(f"Subarray AssignResources: {args}")
        try:
            ([result_code], _) = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            # The failure is converted into a result code, so make it visible.
            self.logger.error(f"Caught exception: {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    @check_on
    def release_all_resources(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Tell the subarray that it no longer has any resources.

        :return: a result code.
        """
        assert self._proxy is not None
        release_command = MccsCommandProxy(
            self._name, "ReleaseAllResources", self._logger
        )
        result_code, _ = release_command()
        return ResultCode(result_code)

    @check_communicating
    def restart(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Restart the subarray and all related sub-elements.

        :return: a result code.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    def abort_device(self: _SubarrayProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code


class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray beam."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayBeamProxy,
        subarray_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Assign resources to the Subarray Beam.

        :param subarray_beam: subarray beam description. Structure containing
            * subarray_id: This is used for cross check. Subarray knows its ID
            * subarray_beam_id: integer
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels: The number of SPS channels assigned to subarray
            * apertures: list of dictionaries with each entry containing
            *   * station_id: (int) in range 1-512
            *   * aperture_id: (str) with format APx.y; x must match station_ID
            *   * station_trl: (str)
            *   * station_beam_trl: (str)
            *   * channel_blocks: Allocated channel blocks for this station
            *   * hardware_beam: Allocated hardware beam for this station

        :return: the command reply, or ResultCode.FAILED if the command raised
        """
        assert self._proxy is not None

        self.logger.debug(f"SubarrayBeam AssignResources: {subarray_beam}")
        args = json.dumps(subarray_beam)
        try:
            # NOTE(review): unlike _SubarrayProxy.assign_resources, the reply is
            # returned as-is rather than unpacked as ``([result_code], _)``, so
            # the success and failure paths return different shapes — confirm
            # what callers expect before normalising.
            result_code = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            self.logger.error(f"Caught exception {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    def abort_device(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code

    @check_communicating
    def restart(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Restart this device.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    def set_parent_trl(self: _SubarrayBeamProxy, new_parent_trl: str) -> ResultCode:
        """
        Update the device with a new parent TRL.

        :param new_parent_trl: the trl of the new parent device.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.SetParentTrl(new_parent_trl)
        return result_code

    @check_communicating
    def reset_parent_trl(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Reset the device's parent TRL via its ResetParentTrl command.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.ResetParentTrl()
        return result_code


class _StationBeamProxy(ObsDeviceComponentManager):
    """A controller's proxy to a station beam."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _StationBeamProxy,
        station_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Assign resources to a station beam.

        :param station_beam: Dictionary with command arguments
            * param subarray_id: The ID of the subarray to which this beam is assigned
            * param subarray_beam_id: The ID of the subarray beam to which this beam
            is assigned
            * param station_id: ID of the station which implements the beam
            * param aperture_id: ID of the form APx.y with X equal to the station ID
            * param channel_blocks: Channel blocks allocated to this beam
            * param hardware_beam: Hardware beam allocated to this beam

        :return: the command reply, or ResultCode.FAILED if the command raised
        """
        assert self._proxy is not None

        self.logger.debug(f"StationBeam AssignResources: {station_beam}")
        json_args = json.dumps(station_beam)

        try:
            # NOTE(review): the reply is returned as-is rather than unpacked as
            # ``([result_code], _)`` like the other proxies' commands — confirm
            # what callers expect before normalising.
            result_code = self._proxy.AssignResources(json_args)
        except Exception as exc:  # pylint: disable=broad-except
            # Same wording and level as _SubarrayBeamProxy, so swallowed
            # failures are visible in the logs.
            self.logger.error(f"Caught exception {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    def restart(self: _StationBeamProxy) -> ResultCode:
        """
        Restart this device.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    def abort(
        self: _StationBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Abort this device and its station.

        :param task_callback: accepted for interface compatibility; not
            used by this implementation.

        :return: a result code
        """
        assert self._proxy is not None
        # NOTE(review): the annotation promises (TaskStatus, str) but the
        # unpacked result code is returned; kept for override compatibility.
        ([result_code], _) = self._proxy.Abort()
        return result_code

    @check_communicating
    def set_parent_trl(self: _StationBeamProxy, new_parent_trl: str) -> ResultCode:
        """
        Update the device with a new parent TRL.

        :param new_parent_trl: the trl of the new parent device.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.SetParentTrl(new_parent_trl)
        return result_code

    @check_communicating
    def reset_parent_trl(self: _StationBeamProxy) -> ResultCode:
        """
        Reset the device's parent TRL via its ResetParentTrl command.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.ResetParentTrl()
        return result_code


# pylint: disable=too-many-instance-attributes
[docs] class ControllerComponentManager(TaskExecutorComponentManager): """ A component manager for an MCCS controller. This component manager has three jobs: * Monitoring of the devices in the MCCS subsystem * Powering the MCCS subsystem off and on * Allocating resources to subarrays """ # pylint: disable=too-many-arguments, too-many-locals
def __init__(
    self: ControllerComponentManager,
    subarray_trls: Iterable[str],
    station_trls: Iterable[str],
    subarray_beam_trls: Iterable[str],
    station_beam_trls: Iterable[str],
    logger: logging.Logger,
    obs_command_timeout: int,
    communication_state_callback: Callable[[CommunicationStatus], None],
    component_state_callback: Callable[..., None],
    event_serialiser: Optional[EventSerialiser] = None,
) -> None:
    """
    Initialise a new instance.

    :param subarray_trls: TRLS of all subarray devices
    :param station_trls: TRLS of all station devices
    :param subarray_beam_trls: TRLS of all subarray beam devices
    :param station_beam_trls: TRLS of all station beam devices
    :param logger: the logger to be used by this object.
    :param obs_command_timeout: the default timeout for obs commands in seconds.
    :param communication_state_callback: callback to be
        called when the status of the communications channel
        between the component manager and its component changes
    :param component_state_callback: callback to be
        called when the component state changes
    :param event_serialiser: an optional event serialiser to
        use when serialising events
    """
    # Each TRL collection is iterated several times below (one of them in a
    # nested loop). A one-shot iterable such as a generator would be empty on
    # every pass after the first, so materialise them once up front.
    subarray_trls = list(subarray_trls)
    station_trls = list(station_trls)
    subarray_beam_trls = list(subarray_beam_trls)
    station_beam_trls = list(station_beam_trls)

    self._event_serialiser = event_serialiser
    self._communication_state_callback = communication_state_callback
    self._component_state_callback = component_state_callback
    self._obs_command_timeout = obs_command_timeout
    self._power_state_lock = threading.RLock()
    self._device_communication_states: dict[str, CommunicationStatus] = {}
    self._device_obs_states: dict[str, ObsState] = {}
    self._device_power_states: dict[str, PowerState] = {}
    # these resources should go
    self._configuring_resources: dict[str, set[str]] = defaultdict(set)
    # to this obs state
    self._desired_obs_state: dict[str, ObsState] = {}
    # these stations should go
    self._powering_resources: set[str] = set()
    # to this power state
    self._desired_power_state: Optional[PowerState] = None
    # Used to lock PowerState during power command execution.
    self._power_command_in_progress = threading.Lock()

    # this set of stations should go
    self._synchronising_resources: set[str] = set()
    # to this synchronisation state
    self._desired_synchronisation_state: Optional[bool] = None
    self._initialising_resources: set[str] = set()
    self._desired_initialisation_state: Optional[bool] = None
    self._station_ids = [station_trl.split(r"/")[-1] for station_trl in station_trls]

    for trl in subarray_trls:
        self._device_communication_states[trl] = CommunicationStatus.DISABLED
    for trl in station_trls:
        self._device_communication_states[trl] = CommunicationStatus.DISABLED
    for trl in subarray_beam_trls:
        self._device_communication_states[trl] = CommunicationStatus.DISABLED
    for trl in station_beam_trls:
        self._device_communication_states[trl] = CommunicationStatus.DISABLED

    self._resource_manager = ControllerResourceManager(
        subarrays=subarray_trls,
        subarray_beams=subarray_beam_trls,
        station_beams=station_beam_trls,
        channel_blocks=range(1, 49),
    )

    self._subarrays: dict[str, _SubarrayProxy] = {
        trl: _SubarrayProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in subarray_trls
    }
    self._stations: dict[str, _StationProxy] = {
        trl: _StationProxy(
            trl,
            subarray_trls,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in station_trls
    }
    self._subarray_beams: dict[str, _SubarrayBeamProxy] = {
        trl: _SubarrayBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in subarray_beam_trls
    }
    self._station_beams: dict[str, _StationBeamProxy] = {
        trl: _StationBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in station_beam_trls
    }

    # dictionaries to map IDs to TRLs
    self._subarray_trl: dict[int, str] = dict(enumerate(subarray_trls, start=1))

    # Map from (subarray_id, local_beam_id) pairs to TRLs. The last TRL
    # field is parsed as "<subarray_id>-<local_beam_id>".
    self._subarray_beam_trl: dict[tuple[int, int], str] = {}
    for subarray_beam_trl in subarray_beam_trls:
        subarray_id = int(subarray_beam_trl.rsplit("/", 1)[-1].split("-")[0])
        local_subarray_beam_id = int(
            subarray_beam_trl.rsplit("/", 1)[-1].split("-")[1]
        )
        self._subarray_beam_trl[
            (subarray_id, local_subarray_beam_id)
        ] = subarray_beam_trl

    # Station ID to (station_beam_id -> station_beam_trl)
    self.station_beam_trl: dict[str, dict[int, str]] = {}
    for station_id in self._station_ids:
        # Get all station beam TRLS which are associated with a given station.
        # NOTE(review): this is a plain substring test, so a station ID that
        # is contained in another (e.g. "...-1" inside "...-10") would also
        # collect that station's beams — confirm the naming rules this out.
        station_beam_trl_in_station = [
            station_beam_trl
            for station_beam_trl in station_beam_trls
            if station_id in station_beam_trl
        ]
        # Map station beam ID to station beam TRL
        self.station_beam_trl[station_id] = dict(
            enumerate(station_beam_trl_in_station, start=1)
        )

    self._station_id_trl_map: dict[int, str] = {}

    self._device_proxies: dict[str, dict] = {
        "subarray": self._subarrays,
        "station": self._stations,
        "subarraybeam": self._subarray_beams,
        "beam": self._station_beams,
    }

    self._started_acquisition = False
    self._assigning_subarray: Optional[int] = None
    self._abort_complete = threading.Event()

    super().__init__(
        logger,
        communication_state_callback,
        component_state_callback,
        power=None,
        fault=None,
    )
    self._communication_manager = CommunicationManager(
        self._update_communication_state,
        self._update_component_state,
        self.logger,
        self._stations,
        self._subarrays,
        self._subarray_beams,
        self._station_beams,
    )
def cleanup(self: ControllerComponentManager) -> None:
    """
    Release every resource this component manager holds.

    The communication manager is shut down first. Then each station,
    subarray, subarray beam and station beam proxy is cleaned up, in that
    order. Finally the task executor cleanup of the base class runs.
    """
    self._communication_manager.shutdown()
    proxy_groups = (
        self._stations,
        self._subarrays,
        self._subarray_beams,
        self._station_beams,
    )
    for group in proxy_groups:
        for proxy in group.values():
            proxy.cleanup()
    super().cleanup()
def start_communicating(self: ControllerComponentManager) -> None:
    """Ask the communication manager to start talking to the sub-devices."""
    manager = self._communication_manager
    manager.start_communicating()
def stop_communicating(self: ControllerComponentManager) -> None:
    """Ask the communication manager to stop talking to the sub-devices."""
    manager = self._communication_manager
    manager.stop_communicating()
def _device_communication_state_changed(
    self: ControllerComponentManager,
    trl: str,
    communication_state: CommunicationStatus,
) -> None:
    """
    Forward a sub-device's communication status to the communication manager.

    :param trl: TRL of changed device
    :param communication_state: new status
    """
    manager = self._communication_manager
    manager.update_communication_status(trl, communication_state)


def _evaluate_power_state(self: ControllerComponentManager) -> None:
    """
    Derive the controller's power state from its sub-devices' power states.

    Precedence is: any ON gives ON; otherwise any STANDBY gives STANDBY;
    otherwise any OFF gives OFF; otherwise all UNKNOWN gives UNKNOWN;
    anything else gives ON. With no sub-devices at all the result is ON.
    Nothing is evaluated while ``_power_command_in_progress`` is held.
    """
    # Suppress power state evaluation whilst power command in progress.
    # This is to prevent the Controller changing to PowerState.ON before all
    # sub-devices have had a chance to turn on/off.
    if self._power_command_in_progress.locked():
        return

    states = list(self._device_power_states.values())

    def present(wanted: PowerState) -> bool:
        return any(state == wanted for state in states)

    if not states:
        # related to test_controller_no_subserviants: with no sub-devices the
        # controller reports ON.
        target = PowerState.ON
    elif present(PowerState.ON):
        target = PowerState.ON
    elif present(PowerState.STANDBY):
        target = PowerState.STANDBY
    elif present(PowerState.OFF):
        target = PowerState.OFF
    elif all(state == PowerState.UNKNOWN for state in states):
        target = PowerState.UNKNOWN
    else:
        target = PowerState.ON

    self.logger.info(
        "In ControllerComponentManager._evaluatePowerState with:\n"
        f"\tdevices: {self._device_power_states}\n"
        f"\tresult: {str(target)}"
    )
    self._update_component_state(power=target)


@property
def power_state(self: ControllerComponentManager) -> Optional[PowerState]:
    """
    Return my power state.

    :return: the "power" entry of the component state
    """
    component_state = self._component_state
    return component_state["power"]


def _subarray_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a subarray.

    :param trl: the TRL of the subarray whose health has changed.
    :param health: the new health state of the subarray, or None if the
        subarray's health should not be taken into account.
    """
    # What we're really interested in is whether the subarray is in the right
    # adminMode. When it is in the wrong adminMode, health is reported as
    # None, so rather than subscribing to adminMode we infer it through the
    # healthState we are already subscribed to.
    is_ready = health is not None
    self._resource_manager.set_ready(trl, is_ready)


def _subarray_beam_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a subarray_beam.

    This handler currently does nothing.

    :param trl: the TRL of the subarray_beam whose health has changed.
    :param health: the new health state of the subarray_beam, or None if
        the subarray_beam's health should not be taken into account.
    """


def _station_beam_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a station_beam.

    This handler currently does nothing.

    :param trl: the TRL of the station_beam whose health has changed.
    :param health: the new health state of the station_beam, or None if
        the station_beam's health should not be taken into account.
    """
def get_health_trl(
    self: ControllerComponentManager, trl: str
) -> Optional[HealthState]:
    """
    Return the health of a subdevice with given TRL.

    The device type is taken from the second ``/``-separated field of the
    TRL and used to select the matching collection of proxies.

    :param trl: TRL of device to return health of
    :return: health of device given by TRL
    :raises ValueError: if TRL is for an invalid device, including a TRL
        with no second ``/``-separated field
    """
    fields = trl.split("/")
    # A TRL without a device-type field cannot name a known device. Report it
    # like any other unknown device rather than leaking an IndexError.
    proxies = self._device_proxies.get(fields[1]) if len(fields) > 1 else None
    if proxies is None or trl not in proxies:
        raise ValueError(f"{trl} is an invalid device.")
    return proxies[trl].health
def get_healths(
    self: ControllerComponentManager, device_type: str = "all"
) -> dict[str, dict[str, Optional[str]]]:
    """
    Report the health of sub-devices, grouped by device type.

    :param device_type: the device type whose instances should be reported,
        or "all" (the default) for every known device type.

    :return: mapping of device type to a mapping of TRL to the HealthState
        name, or None where a device's health is unknown.

    :raises ValueError: if device_type is neither "all" nor a known type.
    """
    if device_type == "all":
        selected = list(self._device_proxies)
    elif device_type in self._device_proxies:
        selected = [device_type]
    else:
        raise ValueError(f"{device_type} is not a valid device type.")

    def health_name(trl: str) -> Optional[str]:
        state = self.get_health_trl(trl)
        return None if state is None else HealthState(state).name

    return {
        group: {trl: health_name(trl) for trl in self._device_proxies[group]}
        for group in selected
    }
@check_communicating
@lock_power_state
def off(  # type: ignore[override]
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn off the MCCS subsystem.

    This is simpler than turning ON MCCS: every MccsStation that is not
    already OFF is commanded off, and we wait for that to happen.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn OFF which aren't already OFF.
    self._desired_power_state = PowerState.OFF
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states.get(station_trl) != PowerState.OFF:
            results.append((station_trl, station_proxy.off()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # Fall back to REJECTED so a mapping without a
                        # REJECTED entry cannot raise KeyError here.
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to OFF command with {status}",
                    ),
                )
            return

    # All MccsStations not OFF already are turning OFF, let's wait
    # for that to happen.
    wait_for_stations = self._wait_for_power_state(600)
    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn off",
                ),
            )
        return
    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn off",
                ),
            )
        return

    if task_callback:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The off command has completed"),
        )
@check_communicating
@lock_power_state
def standby(  # type: ignore[override]
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Put the MCCS subsystem into low power standby mode.

    Every MccsStation is commanded to go to standby. The command is
    reported as failed if any station answers its request with FAILED,
    REJECTED or ABORTED. This matches how ``on``/``off`` treat rejected or
    aborted station requests.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None. Not
        consulted directly by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    results = [station_proxy.standby() for station_proxy in self._stations.values()]

    # The first element of each station's reply is its TaskStatus.
    unsuccessful = (TaskStatus.FAILED, TaskStatus.REJECTED, TaskStatus.ABORTED)
    completed = not any(result[0] in unsuccessful for result in results)

    if task_callback:
        if completed:
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The standby command has completed"),
            )
        else:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The standby command has failed"),
            )
# pylint: disable=too-many-branches
@check_communicating
@lock_power_state
def on(  # type: ignore[override]
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn on the MCCS subsystem.

    The procedure turns on all Stations, waits for them to be ON, then
    (re)initialises and synchronises them as needed. The command
    completes only if acquisition is started on the stations.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None. Not
        consulted directly by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn ON which aren't already ON.
    self._desired_power_state = PowerState.ON
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states.get(station_trl) != PowerState.ON:
            results.append((station_trl, station_proxy.on()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                # _task_to_result has no REJECTED entry; map it explicitly
                # rather than raising KeyError.
                result_code = (
                    ResultCode.REJECTED
                    if status == TaskStatus.REJECTED
                    else _task_to_result[status]
                )
                task_callback(
                    status=status,
                    result=(
                        result_code,
                        f"{station_trl} responded to ON command with {status}",
                    ),
                )
            return

    # All MccsStations not ON already are turning ON, let's wait
    # for that to happen.
    wait_for_stations = self._wait_for_power_state(600)
    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn on",
                ),
            )
        return
    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn on",
                ),
            )
        return

    # All of our MccsStations are now ON. So we must do required
    # Initialisation/Synchronisation.
    self._started_acquisition = False

    # All MccsStations already Synchronised, we're happy to call the job done.
    if all(
        station_proxy.issynchronised for station_proxy in self._stations.values()
    ):
        self.logger.debug("All stations synchronsied.")
        self._started_acquisition = True
    elif all(
        station_proxy.isinitialised for station_proxy in self._stations.values()
    ):
        self.logger.debug("Reinitialising tiles, temporary hack")
        initialised = self._intialise_stations()
        if initialised:
            self.logger.debug(
                "All stations initialised, beginning synchronisation."
            )
            self._started_acquisition = self._synchronise_stations()
    # Our MccsStations are all mismatched, lets re-initialise all of them,
    # then resynchronise all of them.
    elif all(
        station_proxy.isinitialised or station_proxy.issynchronised
        for station_proxy in self._stations.values()
    ):
        self.logger.debug(
            "Some stations are intialised, others"
            " synchronised. Reinitialising and synchronising all stations."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()
    # If any of our MccsStations are not in Initialised or Synchronised,
    # something is amiss. There is currently a bug whereby the MccsTile can be
    # ON when not Initialised or Synchronised, so try to recover by
    # initialising and synchronising. This is temporary; MCCS-2104 should
    # resolve this.
    else:
        self.logger.warning(
            "There are stations which are not Initialised or "
            "Synchronised, they should be in either of these states."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()

    if task_callback:
        if self._started_acquisition:
            self.logger.info("The On command has completed")
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The On command has completed"),
            )
        else:
            self.logger.info("The On command has failed")
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The On command has failed"),
            )
def _synchronise_stations(self: ControllerComponentManager) -> bool:
    """
    Ask every station to start acquisition and wait for synchronisation.

    :return: True if no station's start_acquisition reply is REJECTED or
        ABORTED and the subsequent synchronisation wait ends neither
        FAILED nor ABORTED; False otherwise.
    """
    # NOTE(review): the membership test below compares each raw reply with
    # a TaskStatus. If start_acquisition returns a (TaskStatus, message)
    # tuple, as off() does on the same proxies, it can never match.
    # Confirm the return type.
    acquisition_replies = []
    for trl, proxy in self._stations.items():
        self._desired_synchronisation_state = True
        acquisition_replies.append(proxy.start_acquisition(json.dumps({})))
        self._synchronising_resources.add(trl)

    refused = (TaskStatus.REJECTED, TaskStatus.ABORTED)
    if any(reply in refused for reply in acquisition_replies):
        return False
    return self._wait_for_station_synchronisation_state(30) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )

def _intialise_stations(self: ControllerComponentManager) -> bool:
    """
    Ask every station to initialise and wait for initialisation.

    :return: True if no station's initialise reply is REJECTED or
        ABORTED and the subsequent initialisation wait ends neither
        FAILED nor ABORTED; False otherwise.
    """
    # NOTE(review): as in _synchronise_stations, confirm that initialise()
    # returns a bare TaskStatus so the membership test can match.
    initialise_replies = []
    for trl, proxy in self._stations.items():
        self._desired_initialisation_state = True
        initialise_replies.append(proxy.initialise())
        self._initialising_resources.add(trl)

    refused = (TaskStatus.REJECTED, TaskStatus.ABORTED)
    if any(reply in refused for reply in initialise_replies):
        return False
    return self._wait_for_initialisation_state(120) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )

# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
@check_communicating
@check_on
def allocate(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    *,
    interface: Optional[str] = None,
    subarray_id: int,
    subarray_beams: list[dict],
) -> None:
    """
    Allocate and distribute a set of unallocated MCCS resources to a subarray.

    The keyword arguments specify the overall sub-array composition, in
    terms of which stations should be allocated to each subarray beam.
    The work is delegated to ``_assign_resources``. The abort event is
    forwarded so that its abort checks can take effect.

    :param task_callback: callback to signal end of command
    :param task_abort_event: Check for abort, defaults to None
    :param interface: the schema version this is running against.
    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam allocation
    """
    self._assign_resources(
        subarray_id,
        subarray_beams,
        task_callback=task_callback,
        task_abort_event=task_abort_event,
    )
def _create_station_id_trl_map(self: ControllerComponentManager) -> None:
    """
    Populate a dictionary with mapping from station_id to TRL.

    The result is stored in ``self._station_id_trl_map``. A falsy
    ``stationID`` (e.g. None or 0) is treated as missing.

    :raises AttributeError: If the station does not have expected
        station_id property.
    """
    tmp_map = {}
    station_id_property = "StationId"
    for station_trl, station in self._stations.items():
        assert station._proxy is not None  # for the type checker
        station_id = station._proxy.stationID
        if station_id:
            tmp_map[station_id] = station_trl
        else:
            raise AttributeError(
                f"Station {station_trl} has no property value {station_id_property}"
            )
    self._station_id_trl_map = tmp_map

# pylint: disable=too-many-return-statements
def _allocate_controller_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
    task_abort_event: Optional[threading.Event] = None,
) -> list[dict]:
    """
    Allocate a set of unallocated MCCS resources to a subarray.

    Resources previously held by the subarray are released first.
    ``subarray_beams`` is updated in place:

    - beams whose subarray beam device is not present are removed;
    - the remaining beams get their subarray channel range;
    - each aperture gets its station TRL, hardware beam, channel blocks
      and station beam TRL.

    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam allocation
    :param task_abort_event: optional event to check for task abort
        (currently not consulted here)

    :return: the updated list of dictionaries, each specifying a beam
        allocation, or a list holding a single dictionary with a single
        "Error" element. The returned list is never empty.
    """
    subarray_trl = self._subarray_trl.get(subarray_id, None)
    if subarray_trl is None:
        return [{"Error": f"Subarray {subarray_id} not present"}]
    if not subarray_beams:
        return [{"Error": "No beams defined"}]
    self.logger.debug(
        f"Device {subarray_trl} power: "
        f"{self._subarrays[subarray_trl]._component_state['power']}"
    )

    # Start from a clean slate: release whatever the subarray holds and
    # wait for it to report EMPTY.
    self._desired_obs_state[subarray_trl] = ObsState.EMPTY
    self._configuring_resources[subarray_trl].add(subarray_trl)
    self._release_from_subarray(subarray_trl)
    if (
        self._wait_for_obs_state(subarray_trl, self._obs_command_timeout)
        == TaskStatus.FAILED
    ):
        return [{"Error": f"Subarray {subarray_id} didn't reach EMPTY"}]

    # Map from the station_id to station_trl
    if not self._station_id_trl_map:
        self._create_station_id_trl_map()

    # Check that devices are available. Drop from the configuration those
    # which are not physically present.
    first_subarray_channel = 0
    subarray_beam_list: list[str] = []
    station_beam_list: list[str] = []
    for beam in list(subarray_beams):  # copy to allow safe removal
        local_subarray_beam_id = beam["subarray_beam_id"]
        trl = subarraybeam_trl_from_ids(subarray_id, local_subarray_beam_id)
        self.logger.debug(
            f"Start processing beam local={local_subarray_beam_id}, "
            f"(subarray={subarray_id}) -> trl={trl}",
        )
        if trl not in self._subarray_beams:
            subarray_beams.remove(beam)
            self.logger.warning(
                f"Subarray beam {local_subarray_beam_id} (trl {trl}) not present "
                f"in subarray {subarray_id}, dropped from allocation"
            )
            continue
        self.logger.debug(f"Allocated beam {trl}")
        subarray_beam_list.append(trl)

        # Channels are allocated in blocks of 8; round the request up.
        requested_channels = beam["number_of_channels"]
        number_of_blocks = (requested_channels + 7) // 8
        number_of_channels = number_of_blocks * 8
        if number_of_channels != requested_channels:
            self.logger.warning(
                f"Number of channels requested for {trl} ({requested_channels}) "
                "is not a multiple of 8; "
                f"{number_of_channels} channels will be allocated",
            )
        beam["first_subarray_channel"] = first_subarray_channel
        beam["number_of_channels"] = number_of_channels

        # Allocate resources to apertures. Each aperture requires
        # - one station beam device
        # - one station hardware beam
        # - <number_of_blocks> blocks of station channels
        for aperture in beam["apertures"]:
            station_id = aperture["station_id"]
            self.logger.debug(f"Start processing station {station_id}")
            self.logger.debug(
                f"chans {first_subarray_channel}:" f"{number_of_channels}"
            )
            station_trl = self._station_id_trl_map.get(station_id, None)
            if station_trl is None:
                self.logger.error(f"Cannot allocate resources: {station_id}")
                return [{"Error": f"Cannot allocate resources: {station_id}"}]
            aperture["station_trl"] = station_trl
            station_name = station_trl.split(r"/")[-1]
            available_blocks = self._stations[station_trl].get_available_blocks()
            try:
                beam_id = aperture["hardware_beam"] = self._stations[
                    station_trl
                ].allocate_beam(subarray_trl)
                aperture["channel_blocks"] = self._stations[
                    station_trl
                ].allocate_blocks(subarray_trl, number_of_blocks)
                beam_trls = self.station_beam_trl.get(station_name, None)
                if beam_trls is None:
                    msg = f"Cannot allocate station beam: {station_id}, {beam_id}"
                    self.logger.error(msg)
                    return [{"Error": msg}]
                # Station beam devices are keyed 1-based by hardware beam.
                beam_trl = beam_trls.get(beam_id + 1, None)
                if beam_trl is None:
                    msg = f"Cannot allocate station beam: {station_id}, {beam_id}"
                    self.logger.error(msg)
                    return [{"Error": msg}]
                station_beam_list.append(beam_trl)
            except ValueError as err:
                error_response = (
                    "Failed to allocate controller resources due to the following"
                    f" error: {str(err)}. Requested number of channels/blocks: "
                    f"{number_of_channels}/{number_of_blocks} out of "
                    f"{available_blocks*8}/{available_blocks}, "
                    f"on {station_trl} for beam {local_subarray_beam_id} in "
                    f"subarray {subarray_id} and aperture "
                    f"{aperture['aperture_id']}"
                )
                self.logger.error(error_response)
                return [{"Error": error_response}]
            aperture["station_beam_trl"] = beam_trl
            self.logger.debug(f"Allocated station beam {beam_trl}")
        # Each beam occupies its own contiguous range of subarray channels.
        first_subarray_channel += number_of_channels

    # Every requested beam may have been dropped above. Report that as an
    # error rather than returning an empty list, which callers would index.
    if not subarray_beams:
        msg = (
            "None of the requested subarray beams are present "
            f"in subarray {subarray_id}"
        )
        self.logger.error(msg)
        return [{"Error": msg}]

    self.logger.debug(
        f"Allocating to {subarray_trl}: {subarray_beam_list},"
        f"{station_beam_list}"
    )
    try:
        self._resource_manager.allocate(
            subarray_trl,
            subarray_beams=subarray_beam_list,
            station_beams=station_beam_list,
        )
    except ValueError as e:
        self._resource_manager.resource_pool.free_resources(
            {"station_beams": station_beam_list}
        )
        self.logger.error(str(e))
        return [{"Error": str(e)}]
    return subarray_beams

def _check_aborted(
    self: ControllerComponentManager,
    task_abort_event: Optional[threading.Event],
    task_callback: Optional[Callable],
    method_name: str,
) -> bool:
    """
    Check whether a running command has been asked to abort.

    When the abort event is set, the abort is logged and ABORTED is
    reported through ``task_callback`` if one was given. The abort is
    honoured even when there is no callback to report it to.

    :param task_abort_event: the command's abort event, if any
    :param task_callback: callback to report ABORTED to, if any
    :param method_name: command name used in log and result messages

    :return: True if the command has been aborted, False otherwise
        (including when there is no abort event to check).
    """
    if task_abort_event is None:
        self.logger.warning(f"Cannot check {method_name} for abort status")
        return False
    if not task_abort_event.is_set():
        return False
    self.logger.info(f"{method_name} has been aborted")
    if task_callback is not None:
        task_callback(
            status=TaskStatus.ABORTED,
            result=(ResultCode.ABORTED, f"{method_name} aborted"),
        )
    return True

def _assign_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Assign resources to a subarray.

    The steps are:

    1. allocate controller-side resources;
    2. assign them to each subarray beam and station beam;
    3. assign them to the subarray itself;
    4. wait for the subarray to reach IDLE.

    ``self._assigning_subarray`` records the subarray being assigned for
    the duration of the call. ``_abort_subarray`` consults it to decide
    whether running tasks must be aborted. It is always cleared on exit,
    whether the assignment succeeds, fails or is aborted.

    :param subarray_id: id of the subarray to which resources are to be allocated
    :param subarray_beams: list of subarray beam resources as dictionary
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    self._assigning_subarray = subarray_id
    try:
        if self._check_aborted(task_abort_event, task_callback, "AssignResources"):
            return
        self.logger.debug(f"Allocate command started for subarray {subarray_id}")
        self.logger.debug(f"subarray definition: {subarray_beams}")

        # Allocate resources in controller resource managers and update the
        # beam descriptors. On failure, report the error message.
        new_subarray_beams = self._allocate_controller_resources(
            subarray_id, subarray_beams, task_abort_event
        )
        message = new_subarray_beams[0].get("Error", None)
        if message:
            if task_callback:
                task_callback(
                    status=TaskStatus.FAILED, result=(ResultCode.FAILED, message)
                )
            return

        subarray_trl = self._subarray_trl[subarray_id]
        for beam in new_subarray_beams:
            if self._check_aborted(
                task_abort_event, task_callback, "AssignResources"
            ):
                return
            local_id = beam["subarray_beam_id"]
            beam_trl = self._subarray_beam_trl[(subarray_id, local_id)]
            self._subarray_beams[beam_trl].set_parent_trl(subarray_trl)
            result_code = self._subarray_beams[beam_trl].assign_resources(
                {
                    "subarray_id": subarray_id,
                    "subarray_beam_id": beam["subarray_beam_id"],
                    "apertures": beam["apertures"],
                    "first_subarray_channel": beam["first_subarray_channel"],
                    "number_of_channels": beam["number_of_channels"],
                }
            )
            if result_code == ResultCode.FAILED:
                self.release(subarray_id=subarray_id)
                if task_callback:
                    task_callback(
                        status=TaskStatus.FAILED,
                        result=(
                            ResultCode.FAILED,
                            "The SubarrayBeam.assign_resources command has failed",
                        ),
                    )
                return
            for aperture in beam["apertures"]:
                if self._check_aborted(
                    task_abort_event, task_callback, "AssignResources"
                ):
                    return
                station_beam_trl = aperture["station_beam_trl"]
                self._station_beams[station_beam_trl].set_parent_trl(subarray_trl)
                args = {
                    "subarray_id": subarray_id,
                    "subarray_beam_id": beam["subarray_beam_id"],
                    "station_id": aperture["station_id"],
                    "aperture_id": aperture["aperture_id"],
                    "channel_blocks": aperture["channel_blocks"],
                    "hardware_beam": aperture["hardware_beam"],
                    "station_trl": aperture["station_trl"],
                    "first_subarray_channel": beam["first_subarray_channel"],
                    "number_of_channels": beam["number_of_channels"],
                }
                result_code = self._station_beams[station_beam_trl].assign_resources(
                    args
                )
                if result_code == ResultCode.FAILED:
                    self.release(subarray_id=subarray_id)
                    if task_callback:
                        task_callback(
                            status=TaskStatus.FAILED,
                            result=(
                                ResultCode.FAILED,
                                "StationBeam.assign_resources command has failed",
                            ),
                        )
                    return
            # Add beam TRL to configuration, so that subarray knows mapping
            # between subarray beams ID and TRL
            beam["subarray_beam_trl"] = beam_trl

        if self._check_aborted(task_abort_event, task_callback, "AssignResources"):
            return
        result_code = self._subarrays[subarray_trl].assign_resources(
            subarray_id, new_subarray_beams
        )
        self._desired_obs_state[subarray_trl] = ObsState.IDLE
        self._configuring_resources[subarray_trl].add(subarray_trl)
        if (
            self._wait_for_obs_state(
                subarray_trl, self._obs_command_timeout, task_abort_event
            )
            == TaskStatus.FAILED
        ):
            if task_callback:
                task_callback(
                    status=TaskStatus.FAILED,
                    result=(
                        ResultCode.FAILED,
                        f"Subarray {subarray_id} didn't reach IDLE",
                    ),
                )
            return
        if self._check_aborted(task_abort_event, task_callback, "AssignResources"):
            return

        if task_callback:
            if result_code == ResultCode.FAILED:
                task_callback(
                    status=TaskStatus.FAILED,
                    result=(
                        ResultCode.FAILED,
                        "The AssignResources command has failed",
                    ),
                )
            else:
                task_callback(
                    status=TaskStatus.COMPLETED,
                    result=(ResultCode.OK, "The AssignResources command has completed"),
                )
    finally:
        # Never leave a stale marker behind, otherwise a later abort of this
        # subarray would needlessly call abort_tasks and wait for it.
        self._assigning_subarray = None
@check_communicating
@check_on
def trigger_adc_equalisation(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    *,
    station_args: list[dict],
) -> None:
    """
    Trigger ADC equalisation on a set of stations.

    Each entry of ``station_args`` names a station by ``"station_id"``.
    Every other key is forwarded as the JSON argument of that station's
    TriggerAdcEqualisation command. When ``"target_adc"`` is not given,
    the value from ``calculate_target_adc`` is used. Entries with a
    missing or unknown station ID are reported in the result message.
    An empty list means "every station known to the controller". The
    caller's list and dictionaries are not modified.

    :param station_args: a list of per-station argument dictionaries
    :param task_callback: callback to signal end of command
    :param task_abort_event: Check for abort, defaults to None. Not
        consulted by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    self.logger.debug(
        "Starting station equalisation for the following stations: "
        f"{station_args}"
    )
    if not self._station_id_trl_map:
        self._create_station_id_trl_map()

    # Given no arguments, the command assumes users meant to equalise all
    # stations. Use the station IDs actually known to the controller rather
    # than assuming they are numbered 1..N.
    if not station_args:
        station_args = [
            {"station_id": station_id}
            for station_id in sorted(self._station_id_trl_map)
        ]

    apply_configuration_commands = MccsCompositeCommandProxy(self.logger)
    station_args_failed: list = []
    for station_arg in station_args:
        station_id = station_arg.get("station_id", None)
        # Input validation
        if station_id is None:
            self.logger.error(f"Station ID not supplied for values {station_arg}")
            station_args_failed.append(station_arg)
            continue
        if station_id not in self._station_id_trl_map:
            self.logger.error(f"Station with ID {station_id} is not available")
            station_args_failed.append(station_arg)
            continue
        # Build the sub-command argument from a copy. "station_id" is not
        # part of the subcommand, and the caller's dict stays untouched.
        command_arg = {
            key: value for key, value in station_arg.items() if key != "station_id"
        }
        # If the target adc is not supplied, use the default calculated value
        if "target_adc" not in command_arg:
            command_arg["target_adc"] = self.calculate_target_adc()
        apply_configuration_commands += MccsCommandProxy(
            device_name=self._station_id_trl_map[station_id],
            command_name="TriggerAdcEqualisation",
            logger=self.logger,
            default_args=json.dumps(command_arg),
        )

    if len(station_args_failed) != len(station_args):
        result, message = apply_configuration_commands(
            command_evaluator=CompositeCommandResultEvaluator(),
            timeout=self._obs_command_timeout,
        )
    status = TaskStatus.COMPLETED
    if 0 < len(station_args_failed) < len(station_args):
        # Some of the arguments were correct, so the command executed for
        # those stations. Add a list of the incorrect arguments to the
        # result of the command.
        message = (
            f"Adc Equalisation command finished with result: {message}; but had "
            f"the following incorrect arguments: {station_args_failed}"
        )
        self.logger.error(message)
    if len(station_args_failed) == len(station_args):
        # Overwrite both result code and message. The command cannot be run.
        message = (
            "Rejected Adc Equalisation command: all given arguments for the station"
            f" where incorrect: {station_args_failed}"
        )
        self.logger.error(message)
        result = ResultCode.REJECTED
        status = TaskStatus.REJECTED

    if task_callback:
        task_callback(
            status=status,
            result=(ResultCode(result), message),
        )
@check_communicating
@check_on
def release(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    *,
    subarray_id: int,
    **kwargs: Any,
) -> None:
    """
    Release every resource held by one subarray.

    The command is reported FAILED if the controller is not ON or the
    subarray ID is unknown. Otherwise the work is delegated to
    ``_release_from_subarray``, and the command is reported COMPLETED
    only when that returns ``ResultCode.OK``.

    :param subarray_id: ID of the subarray which requires release
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    :param kwargs: additional arguments; ignored
    """
    controller_is_on = self.power_state == PowerState.ON
    if not controller_is_on:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "Controller is not turned on"),
            )
        return

    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    target_trl = self._subarray_trl.get(subarray_id)
    if target_trl is None:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, f"Subarray {subarray_id} not present"),
            )
        return

    outcome = self._release_from_subarray(target_trl)
    if not task_callback:
        return
    succeeded = outcome == ResultCode.OK
    task_callback(
        status=TaskStatus.COMPLETED if succeeded else TaskStatus.FAILED,
        result=(outcome, f"Release command finished: {outcome.name}"),
    )
def release_all(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Release all subarrays resources.

    ``_release_from_subarray`` is called for every subarray. The command
    is reported FAILED, with the list of failing subarrays, if any
    release does not return ``ResultCode.OK``.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None. Not
        consulted by this method.
    """
    if self.power_state != PowerState.ON:
        if task_callback:
            # Report through the same status=/result= keywords used by
            # every other command.
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "Controller is not turned on."),
            )
        return

    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    failures = []
    for subarray_trl in self._subarrays:
        result_code = self._release_from_subarray(subarray_trl)
        if result_code != ResultCode.OK:
            failures.append((subarray_trl, result_code.name))

    overall_result = ResultCode.OK if not failures else ResultCode.FAILED
    if failures:
        self.logger.error(
            f"Release all command had failures: {failures}",
        )
        message = f"Failures during release of subarrays: {failures}"
    else:
        message = "Release all command completed successfully."

    if task_callback:
        task_callback(
            status=(
                TaskStatus.COMPLETED
                if overall_result == ResultCode.OK
                else TaskStatus.FAILED
            ),
            result=(overall_result, message),
        )
def _release_from_subarray(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> ResultCode:
    """
    Release every resource the controller has allocated to a subarray.

    The steps are:

    1. ask the subarray to release all its resources;
    2. detach the station beams and subarray beams recorded as allocated
       to it;
    3. ask every station to release whatever it holds for the subarray;
    4. return the station beams to the resource pool and clear the
       allocation record.

    TODO: Actually forward the ReleaseAllResources() to subdevices

    :param subarray_trl: subarray from which resources must be released
    :return: the result code returned by the subarray's
        ``release_all_resources`` call
    """
    outcome = self._subarrays[subarray_trl].release_all_resources()
    allocated = self._resource_manager.get_allocated(subarray_trl)
    self.logger.debug(
        f"Releasing resources for subarray {subarray_trl}: {allocated}"
    )

    # Detach every beam device that was parented to this subarray.
    for station_beam in allocated.get("station_beams", []):
        self._station_beams[station_beam].reset_parent_trl()
    for subarray_beam in allocated.get("subarray_beams", []):
        self._subarray_beams[subarray_beam].reset_parent_trl()

    for station_proxy in self._stations.values():
        station_proxy.release_from_subarray(subarray_trl)

    freed_station_beams = allocated.get("station_beams")
    if freed_station_beams is not None:
        self._resource_manager.resource_pool.free_resources(
            {"station_beams": freed_station_beams}
        )
    self._resource_manager.deallocate_from(subarray_trl)
    return outcome
def get_resources(self: ControllerComponentManager, subarray_id: int) -> str:
    """
    Return a JSON dictionary of the resources assigned to a given subarray.

    :param subarray_id: The subarray ID of the resources
    :return: json formatted dictionary
    :raises ValueError: if subarray_id is not valid
    """
    try:
        trl = self._subarray_trl[subarray_id]
    except KeyError as e:
        # A dict lookup raises KeyError; translate it into the documented
        # ValueError.
        raise ValueError(
            "Please use a valid Subarray ID. Subarrays are 1-indexed. "
            f"Valid Subarray IDs are 1 to {len(self._subarrays)}"
        ) from e
    resources = json.dumps(self._resource_manager.get_allocated(trl))
    self.logger.debug(f"get_resources for {trl}: {resources}")
    return resources
@check_communicating
@check_on
def restart_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Restart an MCCS subarray.

    The subarray's subdevices are aborted and then restarted, and the
    beams allocated to it are detached. The allocation record is cleared
    only if the restart completes. The command is reported COMPLETED
    only when the abort and restart both complete; any other outcome is
    reported FAILED.

    :param subarray_id: an integer subarray_id.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None. Not
        consulted by this method.
    """
    if self.power_state != PowerState.ON:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "Controller is not turned on."),
            )
        return

    subarray_trl = self._subarray_trl.get(subarray_id)
    if subarray_trl is None:
        # Report rather than raising KeyError inside the task.
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, f"Subarray {subarray_id} not present"),
            )
        return

    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    self._configuring_resources[subarray_trl].clear()
    self._check_subdevice_obs_states()  # Local cache can get out of sync.
    task_status = self._abort_subdevices(subarray_trl)
    if task_status == TaskStatus.COMPLETED:
        task_status = self._restart_subdevices(subarray_trl)

    resources = self._resource_manager.get_allocated(subarray_trl)
    for station_beam_trl in resources.get("station_beams", []):
        self._station_beams[station_beam_trl].reset_parent_trl()
    for subarray_beam_trl in resources.get("subarray_beams", []):
        self._subarray_beams[subarray_beam_trl].reset_parent_trl()
    if task_status == TaskStatus.COMPLETED:
        self._resource_manager.deallocate_from(subarray_trl)

    if task_callback:
        # Only a COMPLETED abort+restart is success. Anything else (FAILED,
        # ABORTED, ...) left the subarray unrestarted.
        if task_status == TaskStatus.COMPLETED:
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The restart command has completed"),
            )
        else:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The restart command has failed"),
            )
@check_communicating
@check_on
def abort_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Abort an MCCS subarray.

    The request is validated here. The abort itself runs on a dedicated
    thread (``_abort_subarray``), so it does not wait behind tasks
    already queued on the task executor.

    :param subarray_id: an integer subarray_id.
    :param task_callback: Update task state, defaults to None
    :return: A tuple of (TaskStatus, message string).
    """
    if not self._subarrays:
        message = "No subservient subarray devices to abort"
        if task_callback:
            task_callback(
                status=TaskStatus.REJECTED,
                result=(ResultCode.REJECTED, message),
            )
        return (TaskStatus.REJECTED, message)

    if self.power_state != PowerState.ON:
        message = "Controller is not turned on"
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, message),
            )
        return (TaskStatus.FAILED, message)

    if subarray_id not in self._subarray_trl:
        # Otherwise the worker thread would die on a KeyError and the task
        # would never leave IN_PROGRESS.
        message = f"Subarray {subarray_id} not present"
        if task_callback:
            task_callback(
                status=TaskStatus.REJECTED,
                result=(ResultCode.REJECTED, message),
            )
        return (TaskStatus.REJECTED, message)

    # Report IN_PROGRESS before starting the worker, so it can never
    # overwrite a final status the worker has already reported.
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Spawn a raw thread to bypass the TaskExecutor queue: abort must
    # run immediately, not wait behind already-queued tasks.
    threading.Thread(
        target=self._abort_subarray,
        kwargs={"subarray_id": subarray_id, "task_callback": task_callback},
    ).start()
    return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort_callback(
    self: ControllerComponentManager,
    *,
    status: Optional[TaskStatus] = None,
    exception: Optional[Exception] = None,
    abort_complete: Optional[threading.Event] = None,
    **kwargs: Any,
) -> None:
    """
    Handle status updates from ``abort_tasks``.

    :param status: task status reported by ``abort_tasks``.
    :param exception: exception reported by ``abort_tasks``, if any.
    :param abort_complete: event to set on completion. When omitted, the
        event stored in ``self._abort_complete`` at call time is used.
    :param kwargs: any other callback keyword arguments; ignored.
    """
    if exception is not None:
        self.logger.error(f"abort_tasks raised exception: {repr(exception)}")
    if status == TaskStatus.COMPLETED:
        self.logger.debug("abort_tasks has finished, setting flag")
        event = abort_complete if abort_complete is not None else self._abort_complete
        event.set()


def _abort_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
) -> None:
    """
    Abort an MCCS subarray (worker body, run on its own thread).

    If ``self._assigning_subarray`` equals ``subarray_id``, the component
    manager's tasks are aborted first (waiting up to 10 s). The subarray and
    its allocated beams are then aborted. The final status is always reported
    through ``task_callback``.

    :param subarray_id: subarray_id to abort.
    :param task_callback: Update task state, defaults to None
    """
    with EnsureOmniThread():
        if task_callback:
            task_callback(status=TaskStatus.IN_PROGRESS)
        try:
            task_status = TaskStatus.COMPLETED
            if self._assigning_subarray == subarray_id:
                # Hand *this* event object to the callback. A completion that
                # arrives after a timeout then sets the orphaned event rather
                # than whichever event is current at that moment.
                abort_complete = self._abort_complete
                self.abort_tasks(  # type:ignore
                    functools.partial(
                        self._abort_callback, abort_complete=abort_complete
                    )
                )
                timeout = 10.0
                if abort_complete.wait(timeout=timeout):
                    abort_complete.clear()
                else:
                    self.logger.error(f"Abort timed out in {timeout} seconds.")
                    task_status = TaskStatus.FAILED
                    # Use a fresh event from now on; a late completion will
                    # only set the old one bound above.
                    self._abort_complete = threading.Event()
            if task_status == TaskStatus.COMPLETED:
                self.logger.debug(f"Aborting subarray {subarray_id}")
                task_status = self._abort_subdevices(self._subarray_trl[subarray_id])
        # This is the thread's top-level frame. An escaping exception would end
        # the thread without the final task_callback, leaving IN_PROGRESS.
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.logger.error(f"Abort of subarray {subarray_id} failed: {e!r}")
            task_status = TaskStatus.FAILED
        if task_callback:
            task_callback(
                status=task_status,
                result=(
                    (
                        ResultCode.OK
                        if task_status == TaskStatus.COMPLETED
                        else ResultCode.FAILED
                    ),
                    f"Abort command finished: {task_status.name}",
                ),
            )


def _abort_subdevice(
    self: ControllerComponentManager,
    subarray_trl: str,
    device_trl: str,
    abort_command: Callable[[], Any],
) -> None:
    """
    Send Abort to one sub-device unless its cached ObsState makes it moot.

    The device is registered as configuring *before* the command is sent, so
    an ObsState change event that arrives immediately is not missed. If the
    command fails, the registration is undone.

    :param subarray_trl: TRL of the subarray that owns the device.
    :param device_trl: TRL of the sub-device to abort.
    :param abort_command: zero-argument callable that issues the abort.
    """
    # A device that is RESTARTING is about to be EMPTY; the others are
    # already aborted/aborting, idle or faulted.
    if self._device_obs_states[device_trl] in (
        ObsState.FAULT,
        ObsState.ABORTED,
        ObsState.ABORTING,
        ObsState.EMPTY,
        ObsState.RESTARTING,
    ):
        return
    self.logger.info(f"Aborting {device_trl}")
    configuring = self._configuring_resources[subarray_trl]
    configuring.add(device_trl)
    try:
        abort_command()
    except DevFailed as e:
        # If we're in the wrong ObsState we can get here. This happens if the
        # device has changed ObsState but self._device_obs_states has not been
        # updated yet.
        configuring.discard(device_trl)
        self.logger.warning("Could not execute command: %s", repr(e))


def _abort_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Abort a subarray and its allocated beams, then wait for ABORTED.

    :param subarray_trl: TRL of the subarray to abort.
    :return: the result of waiting for the aborted devices to reach ABORTED.
    """
    resources_allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state[subarray_trl] = ObsState.ABORTED
    for station_beam_trl in resources_allocated.get("station_beams", []):
        self._abort_subdevice(
            subarray_trl,
            station_beam_trl,
            self._station_beams[station_beam_trl].abort,
        )
    for subarray_beam_trl in resources_allocated.get("subarray_beams", []):
        self._abort_subdevice(
            subarray_trl,
            subarray_beam_trl,
            self._subarray_beams[subarray_beam_trl].abort_device,
        )
    self._abort_subdevice(
        subarray_trl,
        subarray_trl,
        lambda: self._subarrays[subarray_trl].abort_device(),
    )
    return self._wait_for_obs_state(subarray_trl, self._obs_command_timeout)


def _restart_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Restart a subarray and its allocated beams, then wait for EMPTY.

    Only devices whose cached ObsState is FAULT or ABORTED are sent Restart.

    :param subarray_trl: TRL of the subarray to restart.
    :return: the result of waiting for the restarted devices to reach EMPTY.
    """
    resources_allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state[subarray_trl] = ObsState.EMPTY
    restartable = (ObsState.FAULT, ObsState.ABORTED)
    for station_beam_trl in resources_allocated.get("station_beams", []):
        station_beam_proxy = self._station_beams[station_beam_trl]
        if self._device_obs_states[station_beam_trl] in restartable:
            self.logger.info(f"Restarting {station_beam_trl}")
            self._configuring_resources[subarray_trl].add(station_beam_trl)
            station_beam_proxy.restart()
    for subarray_beam_trl in resources_allocated.get("subarray_beams", []):
        subarray_beam_proxy = self._subarray_beams[subarray_beam_trl]
        if self._device_obs_states[subarray_beam_trl] in restartable:
            self.logger.info(f"Restarting {subarray_beam_trl}")
            self._configuring_resources[subarray_trl].add(subarray_beam_trl)
            subarray_beam_proxy.restart()
    if self._device_obs_states[subarray_trl] in restartable:
        self.logger.info(f"Restarting {subarray_trl}")
        self._configuring_resources[subarray_trl].add(subarray_trl)
        self._subarrays[subarray_trl].restart()
    return self._wait_for_obs_state(subarray_trl, self._obs_command_timeout)


def _check_subdevice_obs_states(self: ControllerComponentManager) -> None:
    """Poll subdevices for their ObsState and update local cache."""

    def _check_and_update(devices: dict[str, Any]) -> None:
        # Values are component-manager wrappers exposing ``_proxy``.
        for trl, device in devices.items():
            try:
                device_obs_state = ObsState(device._proxy.obsstate)
            # pylint: disable=broad-exception-caught
            except Exception as e:
                self.logger.warning(
                    f"Failed to read ObsState for {trl=}, {device=}: {e}"
                )
                continue
            if self._device_obs_states[trl] != device_obs_state:
                self.logger.debug(
                    f"Updating {trl} ObsState: "
                    f"{self._device_obs_states[trl]} -> {device_obs_state}"
                )
                self._device_obs_states[trl] = device_obs_state

    _check_and_update(self._subarrays)
    _check_and_update(self._subarray_beams)
    _check_and_update(self._station_beams)


def _device_obs_state_changed(
    self: ControllerComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Record a sub-device ObsState change and update pending waits.

    If the new state is the desired state of a subarray that is waiting on
    this device, the device is removed from that subarray's configuring set.

    :param trl: TRL of the device whose ObsState changed.
    :param obs_state: the new ObsState.
    """
    old_state = self._device_obs_states.get(trl, None)
    # ObsState.EMPTY has value 0 and is falsy, so test against None explicitly.
    old_name = old_state.name if old_state is not None else "NONE"
    new_state = ObsState(obs_state)
    self._device_obs_states[trl] = new_state
    # Iterate over a snapshot: other threads add entries to this dict.
    for subarray_trl, desired_obs_state in list(self._desired_obs_state.items()):
        if (
            obs_state == desired_obs_state
            and trl in self._configuring_resources[subarray_trl]
        ):
            self._configuring_resources[subarray_trl].discard(trl)
            self.logger.debug(
                f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
                f"waiting for {len(self._configuring_resources[subarray_trl])} "
                "more devices"
            )


def _station_power_state_changed(
    self: ControllerComponentManager,
    trl: str,
    power_state: PowerState,
) -> None:
    """
    Record a station PowerState change and update the pending power wait.

    :param trl: TRL of the station whose PowerState changed.
    :param power_state: the new PowerState.
    """
    old_state = self._device_power_states.get(trl, None)
    old_name = old_state.name if old_state is not None else "NONE"
    new_state = PowerState(power_state)
    self._device_power_states[trl] = new_state
    if power_state == self._desired_power_state and trl in self._powering_resources:
        self._powering_resources.discard(trl)
        self.logger.debug(
            f"State for {trl} changed: {old_name} -> {new_state.name}, "
            f"waiting for {len(self._powering_resources)} more devices"
        )


def _wait_for_obs_state(
    self: ControllerComponentManager,
    subarray_trl: str,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for sub-device ObsState to reach desired state.

    :param subarray_trl: TRL of subarray whose sub-devices to monitor
    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None
    :return: COMPLETED if the state was reached; ABORTED if aborted;
        otherwise the result of a final direct poll after the timeout.
    """
    assert self._desired_obs_state[subarray_trl] is not None
    resolution = 0.01  # seconds
    ticks = int(timeout / resolution)
    while self._configuring_resources[subarray_trl]:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # ``<=`` rather than ``==``: a timeout shorter than one resolution step
        # starts at zero ticks and would otherwise never time out.
        if ticks <= 0:
            self.logger.warning(
                "Timed out waiting for ObsState "
                f"{self._desired_obs_state[subarray_trl].name}"
                f" in {timeout} seconds. Attempting final poll."
            )
            return self._final_poll(subarray_trl)
    self.logger.debug(f"Waited ObsState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED


def _final_poll(self: ControllerComponentManager, subarray_trl: str) -> TaskStatus:
    """
    Directly read device ObsStates as a last attempt before declaring failure.

    Change events sometimes appear to be missed in production. For every
    device still marked as configuring, read its actual obsstate and compare
    it with the desired one. A device whose state cannot be read is treated as
    not yet in the desired state.

    :param subarray_trl: TRL of subarray whose sub-devices to monitor
    :returns: COMPLETED if every pending device is actually in the desired
        state, FAILED otherwise.
    """
    desired = self._desired_obs_state[subarray_trl]
    for trl in list(self._configuring_resources[subarray_trl]):
        device = (
            self._subarrays.get(trl)
            or self._subarray_beams.get(trl)
            or self._station_beams.get(trl)
        )
        if not (device and device._proxy):
            continue
        try:
            in_desired_state = device._proxy.obsstate == desired
        except DevFailed as e:
            self.logger.warning(f"Final poll could not read ObsState of {trl}: {e!r}")
            continue
        if in_desired_state:
            self.logger.info(
                f"Change event for {trl} was missed, device was in correct state."
            )
            if self._component_state_callback is not None:
                self._component_state_callback(missed_event=True)
            # discard: the late change event may have removed it concurrently.
            self._configuring_resources[subarray_trl].discard(trl)
    return (
        TaskStatus.COMPLETED
        if not self._configuring_resources[subarray_trl]
        else TaskStatus.FAILED
    )


def _wait_for_power_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station PowerState to reach desired state.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None
    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_power_state is not None
    resolution = 1  # seconds
    ticks = int(timeout / resolution)
    while self._powering_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # ``<=``: a sub-second timeout starts at zero ticks.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for PowerState in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._powering_resources} "
            f"PowerState {self._desired_power_state.name},"
            f" waiting for {ticks*resolution} more seconds"
        )
    self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED


def _wait_for_station_synchronisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Synchronisation to reach desired state.

    Stations already seen as synchronised are not polled again. Each pass
    stops at the first station that is still unsynchronised. Returns as soon
    as every station is synchronised, immediately if there are no stations.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None
    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_synchronisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    synchronising_stations = set(self._stations.keys())
    self.logger.debug(
        f"Waiting for stations to synchronise: {synchronising_stations}"
    )
    while synchronising_stations:
        for station_trl in list(synchronising_stations):
            if not self._stations[station_trl].issynchronised:
                break
            synchronising_stations.discard(station_trl)
            self.logger.debug(f"{station_trl} achieved synchronisation!")
        if not synchronising_stations:
            break
        self.logger.debug(f"{ticks*resolution}s remaining on wait time.")
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {synchronising_stations} "
                f"for Synchronisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {synchronising_stations} Synchronisation"
            f" {self._desired_synchronisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
    self.logger.debug(
        f"Waited Synchronisation for {timeout-ticks*resolution} seconds"
    )
    return TaskStatus.COMPLETED


def _wait_for_initialisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Initialisation to reach desired state.

    Polls the stations in ``self._initialising_resources``. On success they
    are removed from that set; on timeout or abort the set is left unchanged.
    Returns immediately if the set is empty.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None
    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_initialisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    initialised_stations: set[str] = set()
    while True:
        all_initialised = True
        for station_trl in list(self._initialising_resources):
            if station_trl in initialised_stations:
                continue
            if not self._stations[station_trl].isinitialised:
                all_initialised = False
                break
            initialised_stations.add(station_trl)
        if all_initialised:
            break
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {self._initialising_resources} "
                f"for Initialisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._initialising_resources} Initialisation"
            f" {self._desired_initialisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
    self._initialising_resources -= initialised_stations
    self.logger.debug(
        f"Waited Initialisation for {timeout-ticks*resolution} seconds"
    )
    return TaskStatus.COMPLETED
@staticmethod
def calculate_target_adc(epoch_time: float | None = None) -> float:
    """
    Calculate the target RMS for ADC equalisation.

    The average power received by the antennas changes with sidereal time,
    so the telescope needs periodic re-equalisation to keep the ADC RMS in
    its optimal range. This returns the target RMS for the local sidereal
    time at the SKA-Low longitude (``LON_SKA_LOW``), using the fit from
    Memo 4:
    https://confluence.skatelescope.org/pages/viewpage.action?spaceKey=LC&title=Memo+004+-+Antenna+ADC+RMS+target+vs+LST

    :param epoch_time: Time in seconds since the epoch; ``None`` means the
        current time. Exists for testing.
    :return: target ADC RMS value in ADUs
    """
    utc = time.gmtime(epoch_time)
    # Two-part Julian Date: (day number, time of day).
    # https://pyerfa.readthedocs.io/en/latest/api/erfa.dtf2d.html#erfa.dtf2d
    jd_day, jd_time = erfa.dtf2d(
        "UTC",
        utc.tm_year,
        utc.tm_mon,
        utc.tm_mday,
        utc.tm_hour,
        utc.tm_min,
        utc.tm_sec,
    )
    # Greenwich sidereal time, in radians.
    # https://pyerfa.readthedocs.io/en/latest/api/erfa.gst00b.html#erfa.gst00b
    # NOTE(review): gst00b is documented to take UT1; UTC is passed here
    # (|UT1 - UTC| < 0.9 s) — confirm this is acceptable.
    gst_rad = erfa.gst00b(jd_day, jd_time)
    # Shift to the SKA-Low longitude, normalise the angle, convert to hours.
    lst_hr = erfa.anp(gst_rad + LON_SKA_LOW * np.pi / 180.0) * (12.0 / np.pi)
    # Memo 4 fit: constant term plus first and second LST harmonics.
    fundamental = 4.76497 * np.cos(lst_hr / 24 * 2 * np.pi + 1.8347)
    second_harmonic = 1.17475 * np.cos(2 * lst_hr / 24 * 2 * np.pi - 3.1031)
    return 20 + fundamental + second_harmonic