Source code for ska_low_mccs.controller.controller_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for the MCCS controller."""
# pylint: disable=too-many-lines
from __future__ import annotations

import functools
import json
import logging
import threading
import time
from datetime import datetime
from typing import Any, Callable, Iterable, Optional

from ska_control_model import (
    CommunicationStatus,
    HealthState,
    ObsState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_low_mccs_common.component import (
    DeviceComponentManager,
    ObsDeviceComponentManager,
)
from ska_low_mccs_common.resource_manager import ResourceManager, ResourcePool
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager

from ska_low_mccs.controller.controller_resource_manager import (
    ControllerResourceManager,
)

__all__ = ["ControllerComponentManager"]

# Translation from the TaskStatus reported for a command to the ResultCode the
# controller hands to its own task callback. REJECTED must be present: _off and
# _on look up rejected station responses in this table, and a missing key
# would raise KeyError instead of reporting the rejection.
_task_to_result = {
    TaskStatus.COMPLETED: ResultCode.OK,
    TaskStatus.ABORTED: ResultCode.ABORTED,
    TaskStatus.FAILED: ResultCode.FAILED,
    TaskStatus.REJECTED: ResultCode.REJECTED,
}

# Timestamp pattern: date "T" time with microseconds, ending in a literal "Z".
RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class _StationProxy(DeviceComponentManager):
    """
    A controller's proxy to a station.

    Each station proxy keeps track of the channel blocks and hardware
    beams it has available (a free pool), and has its own resource
    manager recording which of them are assigned to each subarray.
    """

    def __init__(
        self: _StationProxy,
        trl: str,
        subarray_trls: Iterable[str],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param trl: the TRL of the device
        :param subarray_trls: the TRLs of subarrays which channel
            blocks can be assigned to.
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be
            called when the status of the communications channel
            between the component manager and its component changes
        :param component_state_callback: callback to be
            called when the component state changes
        """
        # Free pool: what is still available on this station.
        self._channel_block_pool = ResourcePool(
            channel_blocks=range(48),
            hardware_beams=range(48),
        )
        # Ownership ledger: which subarray holds which resource.
        self._resource_manager = ResourceManager(
            subarray_trls,
            channel_blocks=range(48),
            hardware_beams=range(48),
        )

        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )

    def allocate_blocks(
        self: _StationProxy, subarray_trl: str, channel_blocks: int
    ) -> list[int]:
        """
        Allocate channel blocks to a subarray.

        This method removes the requested number of channel blocks from
        the available pool and assigns them to the provided subarray
        TRL. If either step fails, every block already taken from the
        pool is handed back before the exception propagates, so a failed
        call leaves the pool as it found it.

        :param subarray_trl: The TRL of the subarray to which the
            channel blocks are to be assigned.
        :param channel_blocks: The number of channel blocks to assign to
            the subarray.
        :return: the list of channel blocks that were allocated
        :raises ValueError: if the pool cannot supply the requested
            number of channel blocks
        """
        channel_blocks_to_allocate: list[int] = []
        for _ in range(channel_blocks):
            try:
                block = self._channel_block_pool.get_free_resource("channel_blocks")
            except ValueError as exc:
                # Roll back the partial allocation before reporting failure.
                self._channel_block_pool.free_resources(
                    {"channel_blocks": channel_blocks_to_allocate}
                )
                raise ValueError(
                    f"Cannot allocate {channel_blocks} channel blocks "
                    f"to {subarray_trl}"
                ) from exc
            channel_blocks_to_allocate.append(block)  # type: ignore [arg-type]

        try:
            self._resource_manager.allocate(
                subarray_trl, channel_blocks=channel_blocks_to_allocate
            )
        except Exception:  # pylint: disable=broad-except
            # The blocks were never recorded against the subarray, so
            # release_from_subarray could not recover them later.
            self._channel_block_pool.free_resources(
                {"channel_blocks": channel_blocks_to_allocate}
            )
            raise
        return channel_blocks_to_allocate

    def allocate_beam(self: _StationProxy, subarray_trl: str) -> int:
        """
        Allocate a hardware beam to a subarray.

        This method removes one hardware beam from the available pool
        and assigns it to the provided subarray TRL. If the assignment
        fails, the beam is returned to the pool before the exception
        propagates.

        :param subarray_trl: The TRL of the subarray to which the
            hardware beam is to be assigned.
        :return: index of the allocated beam
        """
        beam_to_allocate = self._channel_block_pool.get_free_resource("hardware_beams")
        try:
            self._resource_manager.allocate(
                subarray_trl, hardware_beams=[beam_to_allocate]
            )
        except Exception:  # pylint: disable=broad-except
            self._channel_block_pool.free_resources(
                {"hardware_beams": [beam_to_allocate]}
            )
            raise
        return beam_to_allocate  # type: ignore [return-value]

    def release_from_subarray(self: _StationProxy, subarray_trl: str) -> None:
        """
        Release all resources assigned to a subarray.

        Channel blocks and beams are released from the subarray and
        marked as free in the station proxy's pool for reallocation
        whenever needed.

        :param subarray_trl: The TRL of the subarray from which this
            station proxy's resources are to be released.
        """
        # Read the allocation first: once deallocated, the ledger no longer
        # knows what to hand back to the pool.
        resources_to_release = self._resource_manager.get_allocated(subarray_trl)
        self._channel_block_pool.free_resources(resources_to_release)
        self._resource_manager.deallocate_from(subarray_trl)

    @check_communicating
    def start_acquisition(self: _StationProxy, argin: str) -> ResultCode:
        """
        Start the acquisition synchronously for all tiles, checks for synchronisation.

        :param argin: json dictionary with optional keywords

            * start_time - (str) start time
            * delay - (int) delay start

        :return: result code of StartAcquisition
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.StartAcquisition(argin)
        return result_code

    @check_communicating
    def initialise(self: _StationProxy) -> ResultCode:
        """
        Initialise the station's tiles.

        :return: result code of Initialise
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.Initialise()
        return result_code

    @property
    def issynchronised(self: _StationProxy) -> bool:
        """
        Return True if all tiles in this station are synchronised.

        :returns: True if all tiles in this station are synchronised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        return self._proxy._device.issynchronised

    @property
    def isinitialised(self: _StationProxy) -> bool:
        """
        Return True if all tiles in this station are initialised.

        :returns: True if all tiles in this station are initialised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        return self._proxy._device.isinitialised


class _SubarrayProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayProxy, subarray_id: int, subarray_beams: Iterable[dict]
    ) -> ResultCode:
        """
        Tell the subarray what resources are assigned to it.

        The Subarray receives the allocated resources, together with the
        subarray allocation description.
        This implicitly deallocates all resources.
        Subarray is described as a set of station beams

        :param subarray_id: This is mainly used for cross check. Subarray knows its ID
        :param subarray_beams: list of subarray beam description. Structure containing
            * subarray_beam_id: integer
            * subarray_beam_trl: str
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels
            * apertures: list of dictionaries with each entry containing
            *   * station_id: (int) in range 1-512
            *   * aperture_id: (str) with format APx.y; x must match station_ID
            *   * station_beam_trl: (str)
            *   * channel_blocks: Allocated channel blocks for this station
            *   * hardware_beam: Allocated hardware beam for this station

        TRLs of stations are not specified, as they are not used in the Subarray.
        Station beam TRLs could also be omitted.

        :return: the result code reported by the subarray, or
            ``ResultCode.FAILED`` if the command raised.
        """
        assert self._proxy is not None
        args = json.dumps(
            {"subarray_id": subarray_id, "subarray_beams": subarray_beams}
        )
        self.logger.debug(f"Subarray AssignResources: {args}")
        try:
            ([result_code], _) = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            # The exception is converted into a FAILED result rather than
            # propagated, so log it at error level or it is invisible at
            # normal logging levels.
            self.logger.error(f"Caught exception: {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    @check_on
    def release_all_resources(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Tell the subarray that it no longer has any resources.

        :return: the result code reported by ReleaseAllResources.
        """
        assert self._proxy is not None

        ([result_code], _) = self._proxy.ReleaseAllResources()
        return result_code

    @check_communicating
    @check_on
    def restart(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Restart the subarray and all related sub_elements.

        :return: the result code reported by Restart.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    @check_on
    def abort_device(self: _SubarrayProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: the result code reported by AbortDevice.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code


class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray beam."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayBeamProxy,
        subarray_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Assign resources to the Subarray Beam.

        :param subarray_beam: subarray beam description. Structure containing
            * subarray_id: This is used for cross check. Subarray knows its ID
            * subarray_beam_id: integer
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels: The number of SPS channels assigned to subarray
            * apertures: list of dictionaries with each entry containing
            *   * station_id: (int) in range 1-512
            *   * aperture_id: (str) with format APx.y; x must match station_ID
            *   * station_trl: (str)
            *   * station_beam_trl: (str)
            *   * channel_blocks: Allocated channel blocks for this station
            *   * hardware_beam: Allocated hardware beam for this station

        :return: the result code reported by the subarray beam, or
            ``ResultCode.FAILED`` if the command raised or its reply
            could not be unpacked.
        """
        assert self._proxy is not None

        self.logger.debug(f"SubarrayBeam AssignResources: {subarray_beam}")
        args = json.dumps(subarray_beam)
        try:
            # Unpack the reply the same way every other command wrapper in
            # this module does, so that success and failure paths both
            # return a bare result code rather than code-or-raw-reply.
            ([result_code], _) = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            self.logger.error(f"Caught exception {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    @check_on
    def abort_device(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: the result code reported by AbortDevice.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code

    @check_communicating
    @check_on
    def restart(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Restart this device without aborting subdevices.

        :return: the result code reported by Restart.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code


class _StationBeamProxy(ObsDeviceComponentManager):
    """A controller's proxy to a station beam."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _StationBeamProxy,
        station_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Assign resources to a station beam.

        :param station_beam: Dictionary with command arguments
            * subarray_id: The ID of the subarray to which this beam is assigned
            * subarray_beam_id: The ID of the subarray beam to which this beam
              is assigned
            * station_id: ID of the station which implements the beam
            * aperture_id: ID of the form APx.y with x equal to the station ID
            * channel_blocks: Channel blocks allocated to this beam
            * hardware_beam: Hardware beam allocated to this beam

        :return: the result code reported by the station beam, or
            ``ResultCode.FAILED`` if the command raised or its reply
            could not be unpacked.
        """
        assert self._proxy is not None

        self.logger.debug(f"StationBeam AssignResources: {station_beam}")
        json_args = json.dumps(station_beam)

        try:
            # Unpack the reply like the other command wrappers in this
            # module, so both paths return a bare result code.
            ([result_code], _) = self._proxy.AssignResources(json_args)
        except Exception as exc:  # pylint: disable=broad-except
            # The failure is swallowed into a result code, so make it
            # visible at error level (as the subarray beam proxy does).
            self.logger.error(f"Caught exception {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    @check_on
    def restart(self: _StationBeamProxy) -> ResultCode:
        """
        Restart this device.

        :return: the result code reported by Restart.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    @check_on
    def abort(
        self: _StationBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Abort this device and its station.

        :param task_callback: accepted for signature compatibility; this
            implementation never invokes it.

        :return: the result code unpacked from the device's Abort reply.
            NOTE(review): the annotation promises ``(TaskStatus, str)``
            but a single result code is returned -- confirm which one
            callers rely on before changing either.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Abort()
        return result_code


# pylint: disable=too-many-instance-attributes
[docs]class ControllerComponentManager(TaskExecutorComponentManager): """ A component manager for an MCCS controller. This component manager has three jobs: * Monitoring of the devices in the MCCS subsystem * Powering the MCCS subsystem off and on * Allocating resources to subarrays """ # pylint: disable=too-many-arguments
def __init__(
    self: ControllerComponentManager,
    subarray_trls: Iterable[str],
    station_trls: Iterable[str],
    subarray_beam_trls: Iterable[str],
    station_beam_trls: Iterable[str],
    logger: logging.Logger,
    obs_command_timeout: int,
    communication_state_callback: Callable[[CommunicationStatus], None],
    component_state_callback: Callable[..., None],
) -> None:
    """
    Initialise a new instance.

    :param subarray_trls: TRLs of all subarray devices
    :param station_trls: TRLs of all station devices
    :param subarray_beam_trls: TRLs of all subarray beam devices
    :param station_beam_trls: TRLs of all station beam devices
    :param obs_command_timeout: the default timeout for obs commands in
        seconds.
    :param logger: the logger to be used by this object.
    :param communication_state_callback: callback to be
        called when the status of the communications channel
        between the component manager and its component changes
    :param component_state_callback: callback to be
        called when the component state changes
    """
    # Every TRL collection is walked several times below. Snapshot them
    # once so that a one-shot iterable (e.g. a generator) is not silently
    # exhausted after its first use.
    subarray_trls = list(subarray_trls)
    station_trls = list(station_trls)
    subarray_beam_trls = list(subarray_beam_trls)
    station_beam_trls = list(station_beam_trls)

    # Stored before super().__init__() because the proxies created below
    # already bind self._component_state_callback.
    self._communication_state_callback = communication_state_callback
    self._component_state_callback = component_state_callback
    self._obs_command_timeout = obs_command_timeout

    self.__communication_lock = threading.Lock()
    self._power_state_lock = threading.RLock()

    self._device_communication_states: dict[str, CommunicationStatus] = {}
    self._device_obs_states: dict[str, ObsState] = {}
    self._device_power_states: dict[str, PowerState] = {}

    # Each pair below reads "this set of resources should go to this state".
    self._configuring_resources: set[str] = set()
    self._desired_obs_state: Optional[ObsState] = None
    self._powering_resources: set[str] = set()
    self._desired_power_state: Optional[PowerState] = None
    self._synchronising_resources: set[str] = set()
    self._desired_synchronisation_state: Optional[bool] = None
    self._initialising_resources: set[str] = set()
    self._desired_initialisation_state: Optional[bool] = None

    # The station ID is the last "/"-separated field of the station TRL.
    self._station_ids = [station_trl.split(r"/")[-1] for station_trl in station_trls]

    # Every managed device starts out with communication DISABLED.
    for trl in (
        *subarray_trls,
        *station_trls,
        *subarray_beam_trls,
        *station_beam_trls,
    ):
        self._device_communication_states[trl] = CommunicationStatus.DISABLED

    self._resource_manager = ControllerResourceManager(
        subarrays=subarray_trls,
        subarray_beams=subarray_beam_trls,
        station_beams=station_beam_trls,
        channel_blocks=range(1, 49),
    )

    # One proxy per device; each proxy's callbacks are pre-bound to its TRL
    # so the shared handlers know which device an event came from.
    self._subarrays: dict[str, _SubarrayProxy] = {
        trl: _SubarrayProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in subarray_trls
    }
    self._stations: dict[str, _StationProxy] = {
        trl: _StationProxy(
            trl,
            subarray_trls,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in station_trls
    }
    self._subarray_beams: dict[str, _SubarrayBeamProxy] = {
        trl: _SubarrayBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in subarray_beam_trls
    }
    self._station_beams: dict[str, _StationBeamProxy] = {
        trl: _StationBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in station_beam_trls
    }

    # Dictionaries to map 1-based IDs to TRLs.
    # NOTE(review): in the flattened listing this comment is followed by a
    # lone "#"; the _subarray_trl assignment is assumed to be live code
    # rather than commented out -- confirm against the upstream file.
    self._subarray_trl: dict[int, str] = dict(enumerate(subarray_trls, start=1))
    self._subarray_beam_trl: dict[int, str] = dict(
        enumerate(subarray_beam_trls, start=1)
    )

    # Station ID -> (1-based station beam ID -> station beam TRL).
    # NOTE(review): a beam is attributed to a station by substring match of
    # the station ID in the beam TRL; verify the naming scheme cannot make
    # one station's ID a substring of another station's beam TRL.
    self.station_beam_trl: dict[str, dict[int, str]] = {}
    for station_id in self._station_ids:
        station_beam_trl_in_station = [
            station_beam_trl
            for station_beam_trl in station_beam_trls
            if station_id in station_beam_trl
        ]
        self.station_beam_trl[station_id] = dict(
            enumerate(station_beam_trl_in_station, start=1)
        )

    self._station_id_trl_map: dict[int, str] = {}

    # Keyed by the device-type field of a TRL (see get_health_trl).
    self._device_proxies: dict[str, dict] = {
        "subarray": self._subarrays,
        "station": self._stations,
        "subarraybeam": self._subarray_beams,
        "beam": self._station_beams,
    }
    self._started_acquisition = False

    super().__init__(
        logger,
        communication_state_callback,
        component_state_callback,
        power=None,
        fault=None,
    )
def start_communicating(self: ControllerComponentManager) -> None:
    """Establish communication with the station components."""
    # NOTE(review): statement nesting was reconstructed from a flattened
    # listing; the three checks below are assumed to be siblings.
    if self.communication_state == CommunicationStatus.ESTABLISHED:
        return

    if self.communication_state == CommunicationStatus.DISABLED:
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)

    # With no subservient devices there is nothing to wait for.
    if not self._device_communication_states:
        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        return

    # Same order as always: subarrays, stations, subarray beams, station beams.
    proxy_groups = (
        self._subarrays,
        self._stations,
        self._subarray_beams,
        self._station_beams,
    )
    for group in proxy_groups:
        for device_proxy in group.values():
            device_proxy.start_communicating()
def stop_communicating(self: ControllerComponentManager) -> None:
    """Break off communication with the station components."""
    if self.communication_state == CommunicationStatus.DISABLED:
        return

    self._update_communication_state(CommunicationStatus.DISABLED)
    self._update_component_state(power=None, fault=None)

    # Copy the values to a list to stop the dicts from changing underneath
    # us. The station dict gets the same protection as the other three.
    for subarray_proxy in list(self._subarrays.values()):
        subarray_proxy.stop_communicating()
    for station_proxy in list(self._stations.values()):
        station_proxy.stop_communicating()
    for subarray_beam_proxy in list(self._subarray_beams.values()):
        subarray_beam_proxy.stop_communicating()
    for station_beam_proxy in list(self._station_beams.values()):
        station_beam_proxy.stop_communicating()
def _device_communication_state_changed(
    self: ControllerComponentManager,
    trl: str,
    communication_state: CommunicationStatus,
) -> None:
    """
    Handle communication changes.

    Records the new status of a managed device and, unless the controller
    itself is DISABLED, re-evaluates the controller's overall status.

    :param trl: TRL of changed device
    :param communication_state: new status
    """
    tracked_states = self._device_communication_states
    if trl not in tracked_states:
        self.logger.warning(
            f"Received a communication status changed event for device {trl} "
            "which is not managed by this controller. "
            "Probably it was released just a moment ago. "
            "The event will be discarded."
        )
        return

    tracked_states[trl] = communication_state
    if self.communication_state != CommunicationStatus.DISABLED:
        self._evaluate_communication_state()


def _evaluate_communication_state(
    self: ControllerComponentManager,
) -> None:
    """Derive the controller's communication status from its devices."""
    # Several callback threads can arrive here at once, and a thread may be
    # suspended between deciding on an update and performing it. That makes
    # callbacks appear out of order, so the whole evaluation is serialised.
    with self.__communication_lock:
        device_states = self._device_communication_states.values()
        any_device_not_ready = (
            CommunicationStatus.DISABLED in device_states
            or CommunicationStatus.NOT_ESTABLISHED in device_states
        )
        if any_device_not_ready:
            self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
            return

        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        # NOTE(review): the fault reset is assumed to belong to the
        # ESTABLISHED branch only; the flattened listing does not show the
        # nesting -- confirm against the upstream file.
        self._update_component_state(fault=False)


def _evaluate_power_state(self: ControllerComponentManager) -> None:
    """Derive the controller's power state from its devices and publish it."""
    reported_states = self._device_power_states.values()
    # First match in priority order wins; ON is what remains when none of
    # UNKNOWN, OFF or STANDBY has been reported.
    # NOTE(review): that includes the case where nothing has been reported.
    power_state = PowerState.ON
    for candidate in (PowerState.UNKNOWN, PowerState.OFF, PowerState.STANDBY):
        if candidate in reported_states:
            power_state = candidate
            break

    self.logger.info(
        "In ControllerComponentManager._evaluatePowerState with:\n"
        f"\tdevices: {self._device_power_states}\n"
        f"\tresult: {str(power_state)}"
    )
    self._update_component_state(power=power_state)


@property
def power_state(self: ControllerComponentManager) -> Optional[PowerState]:
    """
    Return my power state.

    :return: the "power" entry of the component state
    """
    return self._component_state["power"]


def _subarray_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a subarray.

    :param trl: the TRL of the subarray whose health has changed.
    :param health: the new health state of the subarray, or None if the
        subarray's health should not be taken into account.
    """
    # Health is used as a stand-in for adminMode: a subarray in the wrong
    # adminMode reports a health of None, so "has a health" is treated as
    # "is ready" without a separate adminMode subscription.
    subarray_is_ready = health is not None
    self._resource_manager.set_ready(trl, subarray_is_ready)


def _subarray_beam_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a subarray_beam.

    Currently a no-op.

    :param trl: the TRL of the subarray_beam whose health has changed.
    :param health: the new health state of the subarray_beam, or None if
        the subarray_beam's health should not be taken into account.
    """


def _station_beam_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a station_beam.

    Currently a no-op.

    :param trl: the TRL of the station_beam whose health has changed.
    :param health: the new health state of the station_beam, or None if
        the station_beam's health should not be taken into account.
    """
def get_health_trl(
    self: ControllerComponentManager, trl: str
) -> Optional[HealthState]:
    """
    Return the health of a subdevice with given TRL.

    The second "/"-separated field of the TRL selects the device type,
    and the full TRL selects the proxy within that type.

    :param trl: TRL of device to return health of
    :return: health of device given by TRL
    :raises ValueError: if TRL is for an invalid device, including a TRL
        that has no device-type field at all
    """
    try:
        proxy = self._device_proxies[trl.split("/")[1]][trl]
    except (IndexError, KeyError) as exc:
        # IndexError: no second field in the TRL; KeyError: unknown device
        # type or unknown TRL. All are reported as the documented ValueError.
        raise ValueError(f"{trl} is an invalid device.") from exc
    return proxy.health
def get_healths(
    self: ControllerComponentManager, device_type: str = "all"
) -> dict[str, dict[str, Optional[str]]]:
    """
    Return subdevice healths.

    :param device_type: the type of device to return the health state
        of its instances.
    :return: dictionary of [device_type: [TRL : HealthState]] for each
        device of that device type, or all devices if called without
        an argument. Health is given by name, or None when the device
        reports no health.
    :raises ValueError: if device_type is not a valid device type.
    """
    if device_type == "all":
        selected_types = list(self._device_proxies)
    elif device_type in self._device_proxies:
        selected_types = [device_type]
    else:
        raise ValueError(f"{device_type} is not a valid device type.")

    healths_by_type: dict[str, dict[str, Optional[str]]] = {}
    for selected_type in selected_types:
        healths_by_trl: dict[str, Optional[str]] = {}
        for trl in self._device_proxies[selected_type]:
            raw_health = self.get_health_trl(trl)
            healths_by_trl[trl] = (
                None if raw_health is None else HealthState(raw_health).name
            )
        healths_by_type[selected_type] = healths_by_trl
    return healths_by_type
@check_communicating
def off(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Turn off the MCCS subsystem.

    The request is rejected outright when there are no stations;
    otherwise the work is queued as a task.

    :param task_callback: Update task state, defaults to None

    :return: a TaskStatus and message
    """
    if not self._stations:
        return (TaskStatus.REJECTED, "No subservient devices to turn off")
    return self.submit_task(self._off, task_callback=task_callback)
def _off(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn off the MCCS subsystem.

    This is simpler than turning ON MCCS, we simply need to command all
    MccsStations to turn off and wait for it to happen.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn OFF which aren't already OFF.
    # NOTE(review): the station is assumed to be added to
    # _powering_resources only when it was actually commanded; the
    # flattened listing does not show the nesting.
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states[station_trl] != PowerState.OFF:
            results.append((station_trl, station_proxy.off()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # .get with a default: the lookup table may have no
                        # REJECTED entry, and indexing it would raise
                        # KeyError instead of reporting the rejection.
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to OFF command with {status}",
                    ),
                )
            return

    # All MccsStations not OFF already are turning OFF, let's wait
    # for that to happen.
    self._desired_power_state = PowerState.OFF
    wait_for_stations = self._wait_for_power_state(600)

    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn off",
                ),
            )
        return

    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn off",
                ),
            )
        return

    if task_callback:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The off command has completed"),
        )
@check_communicating
def standby(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Put the MCCS subsystem in standby mode.

    The request is rejected outright when there are no stations;
    otherwise the work is queued as a task.

    :param task_callback: Update task state, defaults to None

    :return: task status and message
    """
    if not self._stations:
        return (
            TaskStatus.REJECTED,
            "No subservient devices to put into standby",
        )
    return self.submit_task(self._standby, task_callback=task_callback)
def _standby(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Command every station into low power standby mode.

    The command is issued to all stations first; the task is then
    reported as FAILED if any station answered with a FAILED status and
    as COMPLETED otherwise. This method does not wait for the stations
    to actually reach standby.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Every station is commanded before any outcome is inspected.
    outcomes = [proxy.standby() for proxy in self._stations.values()]
    any_failed = any(outcome[0] == TaskStatus.FAILED for outcome in outcomes)

    if not task_callback:
        return

    if any_failed:
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, "The standby command has failed"),
        )
    else:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The standby command has completed"),
        )
@check_communicating
def on(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit a task that turns on the MCCS subsystem.

    The request is rejected immediately when the controller has no
    stations to command.

    :param task_callback: Update task state, defaults to None

    :return: task status and message
    """
    if self._stations:
        return self.submit_task(self._on, task_callback=task_callback)

    rejection_message = "No subservient devices to turn on"
    return (TaskStatus.REJECTED, rejection_message)
# pylint: disable=too-many-branches
def _on(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn on the MCCS subsystem.

    The procedure is:

    1. command every MccsStation that is not already ON to turn on;
    2. wait (up to 600 seconds) for the commanded stations to report ON;
    3. initialise and/or synchronise the stations so that acquisition
       is started on all of them.

    The outcome is reported only through ``task_callback``; nothing is
    returned.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn ON which aren't already ON.
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states[station_trl] != PowerState.ON:
            results.append((station_trl, station_proxy.on()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # _task_to_result has no REJECTED entry, so a plain
                        # subscript would raise KeyError for a rejection.
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to ON command with {status}",
                    ),
                )
            return

    # All MccsStations not ON already are turning ON, let's wait
    # for that to happen.
    self._desired_power_state = PowerState.ON
    # Forward the abort event: without it the wait can never be interrupted
    # and the ABORTED branch below is unreachable.
    wait_for_stations = self._wait_for_power_state(
        600, task_abort_event=task_abort_event
    )

    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn on",
                ),
            )
        return

    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn on",
                ),
            )
        return

    # All of our MccsStations are now ON. So we must do required
    # Initialisation/Synchronisation.
    self._started_acquisition = False

    # All MccsStations already Synchronised, we're happy to call the job done.
    if all(
        station_proxy.issynchronised for station_proxy in self._stations.values()
    ):
        self.logger.debug("All stations synchronsied.")
        self._started_acquisition = True

    elif all(
        station_proxy.isinitialised for station_proxy in self._stations.values()
    ):
        self.logger.debug("Reinitialising tiles, temporary hack")
        initialised = self._intialise_stations()
        if initialised:
            self.logger.debug(
                "All stations initialised, beginning synchronisation."
            )
            self._started_acquisition = self._synchronise_stations()

    # Our MccsStations are all mismatched, lets re-initialise all of them,
    # then resynchronise all of them.
    elif all(
        station_proxy.isinitialised or station_proxy.issynchronised
        for station_proxy in self._stations.values()
    ):
        self.logger.debug(
            "Some stations are intialised, others"
            " synchronised. Reinitialising and synchronising all stations."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()

    # If any of our MccsStations are not in Initialised or Synchronised, something
    # is amiss and we have not succeeded.
    # There is currently a bug whereby the MccsTile can be
    # ON when not Initialised or Synchronised
    # This is temporary MCCS-2104 should resolve this.
    else:
        self.logger.warning(
            "There are stations which are not Initialised or "
            "Synchronised, they should be in either of these states."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()

    if task_callback:
        if self._started_acquisition:
            self.logger.info("The On command has completed")
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The On command has completed"),
            )
        else:
            self.logger.info("The On command has failed")
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The On command has failed"),
            )


def _synchronise_stations(self: ControllerComponentManager) -> bool:
    """
    Start acquisition on every station and wait for them to synchronise.

    All stations are given the same start time, two seconds from now,
    and are then given up to 30 seconds to report synchronisation.

    :return: True if no station rejected/aborted the request and the
        wait for synchronisation neither failed nor was aborted.
    """
    # Local import: only ``datetime`` itself is imported at module level.
    from datetime import timezone

    # RFC_FORMAT ends in a literal "Z" (UTC designator), so the timestamp
    # must be rendered in UTC rather than in the host's local time zone.
    start_time = datetime.fromtimestamp(
        int(time.time()) + 2, tz=timezone.utc
    ).strftime(RFC_FORMAT)
    start_acquisition_args = json.dumps({"start_time": start_time})

    self._desired_synchronisation_state = True
    start_acq_results = []
    for station_trl, station_proxy in self._stations.items():
        start_acq_results.append(
            station_proxy.start_acquisition(start_acquisition_args)
        )
        self._synchronising_resources.add(station_trl)

    # NOTE(review): this membership test assumes start_acquisition returns a
    # bare TaskStatus (not a (status, message) tuple) - confirm against
    # _StationProxy.
    if any(
        result in (TaskStatus.REJECTED, TaskStatus.ABORTED)
        for result in start_acq_results
    ):
        return False

    return self._wait_for_station_synchronisation_state(30) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )


def _intialise_stations(self: ControllerComponentManager) -> bool:
    """
    Initialise every station and wait for initialisation to finish.

    Stations are given up to 120 seconds to report that they are
    initialised.

    :return: True if no station rejected/aborted the request and the
        wait for initialisation neither failed nor was aborted.
    """
    self._desired_initialisation_state = True
    initialise_results = []
    for station_trl, station_proxy in self._stations.items():
        initialise_results.append(station_proxy.initialise())
        self._initialising_resources.add(station_trl)

    # NOTE(review): this membership test assumes initialise returns a bare
    # TaskStatus (not a (status, message) tuple) - confirm against
    # _StationProxy.
    if any(
        result in (TaskStatus.REJECTED, TaskStatus.ABORTED)
        for result in initialise_results
    ):
        return False

    return self._wait_for_initialisation_state(120) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )


# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
@check_communicating
@check_on
def allocate(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    *,
    interface: Optional[str] = None,
    subarray_id: int,
    subarray_beams: list[dict],
) -> tuple[TaskStatus, str]:
    """
    Allocate and distribute a set of unallocated MCCS resources to a subarray.

    The keyword arguments describe the overall sub-array composition,
    i.e. which stations should be allocated to each subarray beam. The
    actual work is done by a submitted ``_assign_resources`` task.

    :param interface: the schema version this is running against.
    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam
        allocation
    :param task_callback: callback to signal end of command

    :return: A tuple containing a task status and a unique id
    """
    logger = self.logger
    logger.debug(f"Allocate command started for subarray {subarray_id}")
    logger.debug(f"subarray definition: {subarray_beams}")

    task_args = [subarray_id, subarray_beams]
    return self.submit_task(
        self._assign_resources,
        args=task_args,
        task_callback=task_callback,
    )
def _create_station_id_trl_map(self: ControllerComponentManager) -> None:
    """
    Populate a dictionary with mapping from station_id to TRL.

    The ID of each station is read from the ``StationId`` property of
    its device proxy. The map is only stored once every station has
    been read successfully.

    :raises AttributeError: If the station does not have expected
        station_id property.
    """
    station_id_property = "StationId"
    tmp_map = {}
    for station_trl, station in self._stations.items():
        assert station._proxy is not None  # for the type checker
        station_id = station._proxy.get_property(station_id_property)[
            station_id_property
        ]
        if not station_id:
            raise AttributeError(
                f"Station {station_trl} has no property value {station_id_property}"
            )
        # station_id looks like ['4']
        tmp_map[int(station_id[0])] = station_trl
    self._station_id_trl_map = tmp_map


# pylint: disable=too-many-return-statements
def _allocate_controller_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
) -> list[dict]:
    """
    Allocate a set of unallocated MCCS resources to a subarray.

    Any resources currently held by the subarray are released first.
    Beams whose subarray beam device is not present are dropped from
    ``subarray_beams`` (the list is modified in place); the remaining
    beam and aperture dictionaries are updated with the allocated
    channels, hardware beams, channel blocks and device TRLs.

    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam
        allocation

    :return: updated list of dictionaries, each specifying a beam
        allocation, or a list holding a single dictionary with a single
        "Error" element
    """
    subarray_trl = self._subarray_trl.get(subarray_id, None)
    if subarray_trl is None:
        return [{"Error": f"Subarray {subarray_id} not present"}]
    if len(subarray_beams) == 0:
        return [{"Error": "No beams defined"}]

    # first release all resources in resource managers
    self.logger.debug(
        f"Device {subarray_trl} power: "
        f"{self._subarrays[subarray_trl]._component_state['power']}"
    )
    self._release_from_subarray(subarray_trl)
    self._desired_obs_state = ObsState.EMPTY
    self._configuring_resources.add(subarray_trl)
    if self._wait_for_obs_state(self._obs_command_timeout) == TaskStatus.FAILED:
        return [{"Error": f"Subarray {subarray_id} didn't reach EMPTY"}]

    # Map from the station_id to station_trl
    if not self._station_id_trl_map:
        self._create_station_id_trl_map()

    # Check that devices are available. Drop from the configuration those
    # which are not physically present.
    first_subarray_channel = 0
    subarray_beam_list = []
    station_beam_list = []
    # Iterate over a snapshot: beams are removed from subarray_beams below,
    # and removing from a list while iterating it skips the next element.
    for beam in list(subarray_beams):
        subarray_beam_id = beam["subarray_beam_id"]
        self.logger.debug(f"Start processing beam {subarray_beam_id}")
        trl = self._subarray_beam_trl.get(subarray_beam_id, None)
        if trl is None:
            subarray_beams.remove(beam)
            self.logger.warning(
                f"Subarray beam {subarray_beam_id} not present, "
                "dropped from allocation"
            )
            continue
        subarray_beam_list.append(trl)

        # Channels are allocated in blocks of 8, so round the request up.
        requested_channels = beam["number_of_channels"]
        number_of_blocks = (requested_channels + 7) // 8
        number_of_channels = number_of_blocks * 8
        if number_of_channels != requested_channels:
            self.logger.warning(
                f"Number of channels for beam {subarray_beam_id} is "
                f"{requested_channels}, not a multiple of 8. "
                f"{number_of_channels} channels will be allocated"
            )
        beam["first_subarray_channel"] = first_subarray_channel
        beam["number_of_channels"] = number_of_channels

        #
        # Allocate resources to apertures. Each aperture requires
        # - one station beam device
        # - one station hardware beam
        # - <number_of_blocks> blocks of station channels
        #
        for aperture in beam["apertures"]:
            station_id = aperture["station_id"]
            self.logger.debug(f"Start processing station {station_id}")
            self.logger.debug(
                f"chans {first_subarray_channel}:" f"{number_of_channels}"
            )
            station_trl = self._station_id_trl_map.get(station_id, None)
            if station_trl is None:
                self.logger.error(f"Cannot allocate resources: {station_id}")
                return [{"Error": f"Cannot allocate resources: {station_id}"}]
            aperture["station_trl"] = station_trl
            station_name = station_trl.split(r"/")[-1]
            try:
                beam_id = aperture["hardware_beam"] = self._stations[
                    station_trl
                ].allocate_beam(subarray_trl)
                aperture["channel_blocks"] = self._stations[
                    station_trl
                ].allocate_blocks(subarray_trl, number_of_blocks)
                beam_trls = self.station_beam_trl.get(station_name, None)
                if beam_trls is None:
                    msg = f"Cannot allocate station beam: {station_id}, {beam_id}"
                    self.logger.error(msg)
                    return [{"Error": msg}]
                # The hardware beam index is offset by one to form the key
                # into the per-station beam TRL map.
                beam_trl = beam_trls.get(beam_id + 1, None)
                if beam_trl is None:
                    msg = f"Cannot allocate station beam: {station_id}, {beam_id}"
                    self.logger.error(msg)
                    return [{"Error": msg}]
                station_beam_list.append(beam_trl)
            except ValueError:
                # Report the station that ran out of resources, not the
                # subarray beam currently being processed.
                msg = (
                    f"Cannot allocate resources: Station {station_trl} has no"
                    f" {number_of_blocks} channel blocks to allocate."
                )
                self.logger.error(msg)
                return [{"Error": msg}]
            aperture["station_beam_trl"] = beam_trl
            self.logger.debug(f"Allocated station beam {beam_trl}")

        # Advance once per beam: every aperture of a beam shares the beam's
        # channel range.
        first_subarray_channel += number_of_channels

    # Every requested beam may have been dropped above; callers inspect the
    # first element of the result, so never hand back an empty list.
    if not subarray_beams:
        msg = f"No subarray beams available for subarray {subarray_id}"
        self.logger.error(msg)
        return [{"Error": msg}]

    self.logger.debug(
        f"Allocating to {subarray_trl}: {subarray_beam_list},"
        f"{station_beam_list}"
    )
    try:
        self._resource_manager.allocate(
            subarray_trl,
            subarray_beams=subarray_beam_list,
            station_beams=station_beam_list,
        )
    except ValueError as e:
        self._resource_manager.resource_pool.free_resources(
            {"station_beams": station_beam_list}
        )
        self.logger.error(str(e))
        return [{"Error": str(e)}]
    return subarray_beams


def _assign_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Assign resources to a subarray.

    Resources are first allocated in the controller's resource
    managers; the resulting beam descriptors are then pushed to each
    subarray beam, each station beam and finally the subarray itself,
    which is expected to reach ObsState IDLE.

    The outcome is reported only through ``task_callback``; nothing is
    returned.

    :param subarray_id: id of the subarray to which resources are to be
        allocated
    :param subarray_beams: list of subarray beam resources as dictionary
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Allocate resources in controller resource managers,
    # and update the beam descriptors
    # If failed, return error message
    new_subarray_beams = self._allocate_controller_resources(
        subarray_id, subarray_beams
    )
    message = new_subarray_beams[0].get("Error", None)
    if message:
        if task_callback:
            task_callback(
                status=TaskStatus.REJECTED, result=(ResultCode.REJECTED, message)
            )
        return

    for beam in new_subarray_beams:
        subarray_beam_id = beam["subarray_beam_id"]
        beam_trl = self._subarray_beam_trl[subarray_beam_id]
        result_code = self._subarray_beams[beam_trl].assign_resources(
            {
                "subarray_id": subarray_id,
                "subarray_beam_id": beam["subarray_beam_id"],
                "apertures": beam["apertures"],
                "first_subarray_channel": beam["first_subarray_channel"],
                "number_of_channels": beam["number_of_channels"],
            }
        )
        if result_code == ResultCode.FAILED:
            # Roll back whatever has been allocated so far.
            self.release(subarray_id)
            if task_callback:
                task_callback(
                    status=TaskStatus.FAILED,
                    result=(
                        ResultCode.FAILED,
                        "The SubarrayBeam.assign_resources command has failed",
                    ),
                )
            return

        for aperture in beam["apertures"]:
            station_beam_trl = aperture["station_beam_trl"]
            args = {
                "subarray_id": subarray_id,
                "subarray_beam_id": subarray_beam_id,
                "station_id": aperture["station_id"],
                "aperture_id": aperture["aperture_id"],
                "channel_blocks": aperture["channel_blocks"],
                "hardware_beam": aperture["hardware_beam"],
                "station_trl": aperture["station_trl"],
                "first_subarray_channel": beam["first_subarray_channel"],
                "number_of_channels": beam["number_of_channels"],
            }
            result_code = self._station_beams[station_beam_trl].assign_resources(
                args
            )
            if result_code == ResultCode.FAILED:
                # Roll back whatever has been allocated so far.
                self.release(subarray_id)
                if task_callback:
                    task_callback(
                        status=TaskStatus.FAILED,
                        result=(
                            ResultCode.FAILED,
                            "StationBeam.assign_resources command has failed",
                        ),
                    )
                return

        # Add beam TRL to configuration, so that subarray knows mapping
        # between subarray beams ID and TRL
        beam["subarray_beam_trl"] = beam_trl

    subarray_trl = self._subarray_trl[subarray_id]
    result_code = self._subarrays[subarray_trl].assign_resources(
        subarray_id, new_subarray_beams
    )
    self._desired_obs_state = ObsState.IDLE
    self._configuring_resources.add(subarray_trl)
    if self._wait_for_obs_state(self._obs_command_timeout) == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    f"Subarray {subarray_id} didn't reach IDLE",
                ),
            )
        return

    if task_callback:
        if ResultCode.FAILED == result_code:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "The AssignResources command has failed",
                ),
            )
        else:
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The AssignResources command has completed"),
            )
@check_communicating
@check_on
def release(
    self: ControllerComponentManager,
    subarray_id: int,
) -> tuple[ResultCode, str]:
    """
    Release the resources held by one subarray.

    The request is rejected when the controller has no subarrays, and
    fails when the controller is not ON.

    :param subarray_id: ID of the subarray which requires release

    :return: a result code and message
    """
    if not self._subarrays:
        rejection_message = "No subservient subarray devices to release"
        return (ResultCode.REJECTED, rejection_message)

    if self.power_state != PowerState.ON:
        return (ResultCode.FAILED, "Controller is not turned on.")

    self._release_from_subarray(self._subarray_trl[subarray_id])
    return (ResultCode.OK, "Release command completed OK")
def release_all(
    self: ControllerComponentManager,
) -> tuple[ResultCode, str]:
    """
    Release the resources held by every subarray.

    The request is rejected when the controller has no subarrays, and
    fails when the controller is not ON.

    :return: a result code and message
    """
    if not self._subarrays:
        rejection_message = "No subservient subarray devices to release"
        return (ResultCode.REJECTED, rejection_message)

    if self.power_state != PowerState.ON:
        return (ResultCode.FAILED, "Controller is not turned on.")

    for trl in self._subarrays:
        self._release_from_subarray(trl)
    return (ResultCode.OK, "Release command completed OK")
def _release_from_subarray(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> None:
    """
    Release resources from a subarray.

    The subarray is told to release all of its resources, every station
    releases what it had assigned to the subarray, any station beams are
    returned to the resource pool, and finally the subarray's
    allocation is removed from the resource manager.

    TODO: Actually forward the ReleaseAllResources() to subdevices

    :param subarray_trl: subarray from which resources must be released
    """
    self._subarrays[subarray_trl].release_all_resources()

    manager = self._resource_manager
    allocated = manager.get_allocated(subarray_trl)
    self.logger.debug(
        f"Releasing resources for subarray {subarray_trl}: {allocated}"
    )

    for station_proxy in self._stations.values():
        station_proxy.release_from_subarray(subarray_trl)

    station_beam_trls = allocated.get("station_beams", None)
    if station_beam_trls is not None:
        manager.resource_pool.free_resources({"station_beams": station_beam_trls})

    manager.deallocate_from(subarray_trl)
def get_resources(self: ControllerComponentManager, subarray_id: int) -> str:
    """
    Return the resources assigned to a given subarray as a JSON string.

    :param subarray_id: The subarray ID of the resources

    :return: json formatted dictionary
    """
    trl = self._subarray_trl[subarray_id]
    allocated = self._resource_manager.get_allocated(trl)
    resources = json.dumps(allocated)
    self.logger.debug(f"get_resources for {trl}: {resources}")
    return resources
@check_communicating
@check_on
def restart_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit a task that restarts an MCCS subarray.

    The request is rejected when the controller has no subarrays, and
    fails when the controller is not ON.

    :param subarray_id: an integer subarray_id.
    :param task_callback: Update task state, defaults to None

    :return: a task status and a message
    """
    if not self._subarrays:
        rejection_message = "No subservient subarray devices to restart"
        return (TaskStatus.REJECTED, rejection_message)

    if self.power_state != PowerState.ON:
        return (TaskStatus.FAILED, "Controller is not turned on.")

    task_args = [self._subarray_trl[subarray_id]]
    return self.submit_task(
        self._restart_subarray,
        task_args,
        task_callback=task_callback,
    )
def _abort_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Abort a subarray and the beams allocated to it.

    Devices that are already in FAULT, ABORTED or ABORTING are left
    alone. Every device that is commanded is added to the set of
    resources we then wait on to reach ObsState ABORTED.

    :param subarray_trl: TRL of the subarray to abort

    :return: the outcome of waiting for the devices to reach ABORTED
    """
    skip_states = [ObsState.FAULT, ObsState.ABORTED, ObsState.ABORTING]
    resources_allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state = ObsState.ABORTED

    # A resource type may be absent when nothing of that type is allocated
    # (_release_from_subarray makes the same allowance).
    for station_beam_trl in resources_allocated.get("station_beams") or ():
        station_beam_proxy = self._station_beams[station_beam_trl]
        if self._device_obs_states[station_beam_trl] not in skip_states:
            self.logger.info(f"Aborting {station_beam_trl}")
            self._configuring_resources.add(station_beam_trl)
            station_beam_proxy.abort()

    for subarray_beam_trl in resources_allocated.get("subarray_beams") or ():
        subarray_beam_proxy = self._subarray_beams[subarray_beam_trl]
        if self._device_obs_states[subarray_beam_trl] not in skip_states:
            self.logger.info(f"Aborting {subarray_beam_trl}")
            self._configuring_resources.add(subarray_beam_trl)
            subarray_beam_proxy.abort_device()

    if self._device_obs_states[subarray_trl] not in skip_states:
        self.logger.info(f"Aborting {subarray_trl}")
        self._configuring_resources.add(subarray_trl)
        self._subarrays[subarray_trl].abort_device()

    return self._wait_for_obs_state(self._obs_command_timeout)


def _restart_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Restart a subarray and the beams allocated to it.

    Only devices currently in FAULT or ABORTED are restarted. Every
    device that is commanded is added to the set of resources we then
    wait on to reach ObsState EMPTY.

    :param subarray_trl: TRL of the subarray to restart

    :return: the outcome of waiting for the devices to reach EMPTY
    """
    restartable_states = [ObsState.FAULT, ObsState.ABORTED]
    resources_allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state = ObsState.EMPTY

    # A resource type may be absent when nothing of that type is allocated
    # (_release_from_subarray makes the same allowance).
    for station_beam_trl in resources_allocated.get("station_beams") or ():
        station_beam_proxy = self._station_beams[station_beam_trl]
        if self._device_obs_states[station_beam_trl] in restartable_states:
            self.logger.info(f"Restarting {station_beam_trl}")
            self._configuring_resources.add(station_beam_trl)
            station_beam_proxy.restart()

    for subarray_beam_trl in resources_allocated.get("subarray_beams") or ():
        subarray_beam_proxy = self._subarray_beams[subarray_beam_trl]
        if self._device_obs_states[subarray_beam_trl] in restartable_states:
            self.logger.info(f"Restarting {subarray_beam_trl}")
            self._configuring_resources.add(subarray_beam_trl)
            subarray_beam_proxy.restart()

    if self._device_obs_states[subarray_trl] in restartable_states:
        self.logger.info(f"Restarting {subarray_trl}")
        self._configuring_resources.add(subarray_trl)
        self._subarrays[subarray_trl].restart()

    return self._wait_for_obs_state(self._obs_command_timeout)


def _restart_subarray(
    self: ControllerComponentManager,
    subarray_trl: str,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Abort and restart a subarray, then deallocate all of its resources.

    The restart is only attempted if the abort completed, and the
    resources are only deallocated if the restart completed. The outcome
    is reported only through ``task_callback``.

    :param subarray_trl: TRL of the subarray from which all resources
        are to be deallocated
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    task_status = self._abort_subdevices(subarray_trl)
    if task_status == TaskStatus.COMPLETED:
        task_status = self._restart_subdevices(subarray_trl)
    if task_status == TaskStatus.COMPLETED:
        self._resource_manager.deallocate_from(subarray_trl)

    if task_callback:
        if task_status == TaskStatus.FAILED:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The restart command has failed"),
            )
        else:
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The restart command has completed"),
            )


def _device_obs_state_changed(
    self: ControllerComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Record a device's new ObsState and update the set of pending devices.

    If the device was being waited on and has reached the desired
    ObsState, it is removed from the set of configuring resources.

    :param trl: TRL of the device whose ObsState changed
    :param obs_state: the new ObsState of the device
    """
    old_state = self._device_obs_states.get(trl, None)
    # Compare against None explicitly: an enum member whose value is 0 is
    # falsy and would otherwise be logged as "NONE".
    old_name = old_state.name if old_state is not None else "NONE"
    new_state = ObsState(obs_state)
    self._device_obs_states[trl] = new_state
    if obs_state == self._desired_obs_state and trl in self._configuring_resources:
        self._configuring_resources.remove(trl)
    self.logger.debug(
        f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
        f"waiting for {len(self._configuring_resources)} more devices"
    )


def _station_power_state_changed(
    self: ControllerComponentManager,
    trl: str,
    power_state: PowerState,
) -> None:
    """
    Record a station's new PowerState and update the set of pending stations.

    If the station was being waited on and has reached the desired
    PowerState, it is removed from the set of powering resources.

    :param trl: TRL of the station whose PowerState changed
    :param power_state: the new PowerState of the station
    """
    old_state = self._device_power_states.get(trl, None)
    old_name = old_state.name if old_state is not None else "NONE"
    new_state = PowerState(power_state)
    self._device_power_states[trl] = new_state
    if power_state == self._desired_power_state and trl in self._powering_resources:
        self._powering_resources.remove(trl)
    self.logger.debug(
        f"State for {trl} changed: {old_name} -> {new_state.name}, "
        f"waiting for {len(self._powering_resources)} more devices"
    )


def _wait_for_obs_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for sub-device ObsState to reach desired state.

    Polls every 10 ms until the set of configuring resources is empty.
    On timeout the devices are polled directly once before failure is
    declared.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_obs_state is not None
    resolution = 0.01  # seconds
    ticks = int(timeout / resolution)  # 10 ms resolution
    while self._configuring_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" rather than "==": a timeout shorter than one tick starts at
        # zero ticks and would otherwise count down forever.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for ObsState {self._desired_obs_state.name}"
                f" in {timeout} seconds. Attempting final poll."
            )
            return self._final_poll()
    self.logger.debug(f"Waited ObsState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED


def _final_poll(self: ControllerComponentManager) -> TaskStatus:
    """
    Direct check of device obsstates as a last attempt before declaring failure.

    Sometimes it appears we miss change events in production, if for some reason
    we haven't received all the change events we expected, double check what we
    think the obsstate is, vs what it actually is on all subdevices that
    we need to.

    :returns: a taskstatus dependent on whether or not the device was actually
        in the correct state.
    """
    # Iterate over a copy because entries are removed as we go.
    for trl in list(self._configuring_resources):
        device = (
            self._subarrays.get(trl)
            or self._subarray_beams.get(trl)
            or self._station_beams.get(trl)
        )
        if (
            device
            and device._proxy
            and device._proxy.obsstate == self._desired_obs_state
        ):
            self.logger.info(
                f"Change event for {trl} was missed, device was in correct state."
            )
            if self._component_state_callback is not None:
                self._component_state_callback(missed_event=True)
            self._configuring_resources.remove(trl)
    return (
        TaskStatus.COMPLETED
        if not self._configuring_resources
        else TaskStatus.FAILED
    )


def _wait_for_power_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station PowerState to reach desired state.

    Polls once per second until the set of powering resources is empty.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_power_state is not None
    resolution = 1  # seconds
    ticks = int(timeout / resolution)
    while self._powering_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" rather than "==": a timeout shorter than one tick starts at
        # zero ticks and would otherwise count down forever.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for PowerState in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._powering_resources} "
            f"PowerState {self._desired_power_state.name},"
            f" waiting for {ticks*resolution} more seconds"
        )
    self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED


def _wait_for_station_synchronisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Synchronisation to reach desired state.

    Every 0.5 seconds the stations still outstanding are checked;
    stations seen to be synchronised are dropped from later passes.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_synchronisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    synchronised = False
    synchronising_stations = set(self._stations)
    synchronised_stations: set[str] = set()
    self.logger.debug(
        f"Waiting for stations to synchronise: {synchronising_stations}"
    )
    while not synchronised:
        # Wait for ALL stations to sync: the pass stops at the first station
        # that is not yet synchronised.
        for station_trl in synchronising_stations:
            station_proxy = self._stations[station_trl]
            if not station_proxy.issynchronised:
                synchronised = False
                break
            synchronised_stations.add(station_trl)
            synchronised = True
            self.logger.debug(f"{station_trl} achieved synchronisation!")
        self.logger.debug(f"{ticks*resolution}s remaining on wait time.")
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" rather than "==": a timeout shorter than one tick starts at
        # zero ticks and would otherwise count down forever.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {synchronising_stations} "
                f"for Synchronisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {synchronising_stations} Synchronisation"
            f" {self._desired_synchronisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
        # Only re-check the stations that have not yet been seen synchronised.
        synchronising_stations -= synchronised_stations
    self.logger.debug(
        f"Waited Synchronisation for {timeout-ticks*resolution} seconds"
    )
    return TaskStatus.COMPLETED


def _wait_for_initialisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Initialisation to reach desired state.

    Every 0.5 seconds the stations still in the set of initialising
    resources are checked; stations seen to be initialised are removed
    from that set.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_initialisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    initialised = False
    initialised_stations: set[str] = set()
    while not initialised:
        # The pass stops at the first station that is not yet initialised.
        for station_trl in self._initialising_resources:
            station_proxy = self._stations[station_trl]
            if not station_proxy.isinitialised:
                initialised = False
                break
            initialised_stations.add(station_trl)
            initialised = True
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" rather than "==": a timeout shorter than one tick starts at
        # zero ticks and would otherwise count down forever.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {self._initialising_resources} "
                f"for Initialisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._initialising_resources} Initialisation"
            f" {self._desired_initialisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
        # Only re-check the stations that have not yet been seen initialised.
        self._initialising_resources -= initialised_stations
    self.logger.debug(
        f"Waited Initialisation for {timeout-ticks*resolution} seconds"
    )
    return TaskStatus.COMPLETED