# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for the MCCS controller."""
# pylint: disable=too-many-lines
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from datetime import datetime
from typing import Any, Callable, Iterable, Optional
from ska_control_model import (
CommunicationStatus,
HealthState,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_low_mccs_common.component import (
DeviceComponentManager,
ObsDeviceComponentManager,
)
from ska_low_mccs_common.resource_manager import ResourceManager, ResourcePool
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from ska_low_mccs.controller.controller_resource_manager import (
ControllerResourceManager,
)
__all__ = ["ControllerComponentManager"]
# Translates the TaskStatus returned by a sub-device command into the
# ResultCode that is reported through a task callback.
_task_to_result: dict[TaskStatus, ResultCode] = {
    TaskStatus.COMPLETED: ResultCode.OK,
    TaskStatus.ABORTED: ResultCode.ABORTED,
    TaskStatus.FAILED: ResultCode.FAILED,
    # _off() and _on() look up REJECTED in this table; without this entry a
    # rejected station command raised KeyError instead of being reported.
    TaskStatus.REJECTED: ResultCode.REJECTED,
}
# strftime pattern used to build the "start_time" sent with StartAcquisition.
RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
class _StationProxy(DeviceComponentManager):
    """
    A controller's proxy to a station.

    Each station proxy keeps a pool of the channel blocks and hardware
    beams it still has free, and its own resource manager recording
    which of them are assigned to each subarray.
    """

    def __init__(
        self: _StationProxy,
        trl: str,
        subarray_trls: Iterable[str],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param trl: the TRL of the device
        :param subarray_trls: the TRLs of subarrays which channel
            blocks can be assigned to.
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be
            called when the status of the communications channel
            between the component manager and its component changes
        :param component_state_callback: callback to be
            called when the component state changes
        """
        # Despite its name this pool tracks free hardware beams as well as
        # free channel blocks.
        self._channel_block_pool = ResourcePool(
            channel_blocks=range(48),
            hardware_beams=range(48),
        )
        self._resource_manager = ResourceManager(
            subarray_trls,
            channel_blocks=range(48),
            hardware_beams=range(48),
        )
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )

    def allocate_blocks(
        self: _StationProxy, subarray_trl: str, channel_blocks: int
    ) -> list[int]:
        """
        Allocate channel blocks to a subarray.

        The requested number of channel blocks is taken from the free
        pool and assigned to the given subarray. If the request cannot
        be satisfied, every block taken so far is handed back to the
        pool so that a failed call leaves the pool unchanged.

        :param subarray_trl: The TRL of the subarray to which the
            channel blocks are to be assigned.
        :param channel_blocks: The number of channel blocks to assign to
            the subarray.

        :return: the list of allocated channel blocks

        :raises ValueError: if the pool cannot supply enough blocks
        """
        channel_blocks_to_allocate: list[int] = []
        for _ in range(channel_blocks):
            try:
                block = self._channel_block_pool.get_free_resource("channel_blocks")
            except ValueError as exc:
                # Roll back the partial allocation before reporting failure.
                self._channel_block_pool.free_resources(
                    {"channel_blocks": channel_blocks_to_allocate}
                )
                raise ValueError(
                    f"Cannot allocate {channel_blocks} channel blocks to "
                    f"{subarray_trl}: not enough free channel blocks"
                ) from exc
            channel_blocks_to_allocate.append(block)  # type: ignore [arg-type]
        try:
            self._resource_manager.allocate(
                subarray_trl, channel_blocks=channel_blocks_to_allocate
            )
        except Exception:
            # The blocks have left the pool but were never assigned; return
            # them so they are not lost, then let the error propagate.
            self._channel_block_pool.free_resources(
                {"channel_blocks": channel_blocks_to_allocate}
            )
            raise
        return channel_blocks_to_allocate

    def allocate_beam(self: _StationProxy, subarray_trl: str) -> int:
        """
        Allocate a hardware beam to a subarray.

        One hardware beam is taken from the free pool and assigned to
        the given subarray. Any error from the pool or the resource
        manager propagates to the caller; if the assignment fails the
        beam is handed back to the pool first.

        :param subarray_trl: The TRL of the subarray to which the
            hardware beam is to be assigned.

        :return: index of the allocated beam
        """
        beam_to_allocate = self._channel_block_pool.get_free_resource("hardware_beams")
        try:
            self._resource_manager.allocate(
                subarray_trl, hardware_beams=[beam_to_allocate]
            )
        except Exception:
            # Do not leak the beam if it could not be assigned.
            self._channel_block_pool.free_resources(
                {"hardware_beams": [beam_to_allocate]}
            )
            raise
        return beam_to_allocate  # type: ignore [return-value]

    def release_from_subarray(self: _StationProxy, subarray_trl: str) -> None:
        """
        Release everything this station has assigned to a subarray.

        Channel blocks and beams are released from the subarray and
        marked as free in the station proxy's pool for reallocation
        whenever needed.

        :param subarray_trl: The TRL of the subarray from which this
            station proxy's resources are to be released.
        """
        resources_to_release = self._resource_manager.get_allocated(subarray_trl)
        self._channel_block_pool.free_resources(resources_to_release)
        self._resource_manager.deallocate_from(subarray_trl)

    @check_communicating
    def start_acquisition(self: _StationProxy, argin: str) -> ResultCode:
        """
        Start the acquisition synchronously for all tiles.

        :param argin: json dictionary with optional keywords

            * start_time - (str) start time
            * delay - (int) delay start

        :return: result code of StartAcquisition
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        ([result_code], _) = self._proxy._device.StartAcquisition(argin)
        return result_code

    @check_communicating
    def initialise(self: _StationProxy) -> ResultCode:
        """
        Initialise the station's tiles.

        :return: result code of Initialise
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        ([result_code], _) = self._proxy._device.Initialise()
        return result_code

    @property
    def issynchronised(self: _StationProxy) -> bool:
        """
        Return the station device's ``issynchronised`` attribute.

        :returns: True if all tiles in this station are synchronised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.issynchronised

    @property
    def isinitialised(self: _StationProxy) -> bool:
        """
        Return the station device's ``isinitialised`` attribute.

        :returns: True if all tiles in this station are initialised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.isinitialised
class _SubarrayProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayProxy, subarray_id: int, subarray_beams: Iterable[dict]
    ) -> ResultCode:
        """
        Tell the subarray what resources are assigned to it.

        The Subarray receives the allocated resources, together with the
        subarray allocation description.
        This implicitly deallocates all resources.
        Subarray is described as a set of station beams

        :param subarray_id: This is mainly used for cross check. Subarray knows its ID
        :param subarray_beams: list of subarray beam description. Structure containing

            * subarray_beam_id: integer
            * subarray_beam_trl: str
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels
            * apertures: list of dictionaries with each entry containing
            * * station_id: (int) in range 1-512
            * * aperture_id: (str) with format APx.y; x must match station_ID
            * * station_beam_trl: (str)
            * * channel_blocks: Allocated channel blocks for this station
            * * hardware_beam: Allocated hardware beam for this station

            TRLs of stations are not specified, as they are not used in the Subarray.
            Station beam TRLs could also be omitted.

        :return: a result code; ``ResultCode.FAILED`` if the command raised.
        """
        assert self._proxy is not None
        # json.dumps only accepts lists/tuples, so materialise whatever
        # iterable the caller supplied (e.g. a generator or dict view).
        args = json.dumps(
            {"subarray_id": subarray_id, "subarray_beams": list(subarray_beams)}
        )
        self.logger.debug(f"Subarray AssignResources: {args}")
        try:
            ([result_code], _) = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            # The failure is converted into a result code, so make sure it is
            # visible in the log at a level operators will actually see.
            self.logger.error(f"Caught exception: {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    @check_on
    def release_all_resources(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Tell the subarray that it no longer has any resources.

        :return: a result code.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.ReleaseAllResources()
        return result_code

    @check_communicating
    @check_on
    def restart(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Restart the subarray and all related sub_elements.

        :return: a result code.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    @check_on
    def abort_device(self: _SubarrayProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code
class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """Proxy through which the controller drives one subarray beam device."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayBeamProxy,
        subarray_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Send the AssignResources command to the subarray beam.

        :param subarray_beam: subarray beam description. Structure containing

            * subarray_id: This is used for cross check. Subarray knows its ID
            * subarray_beam_id: integer
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels: The number of SPS channels assigned to subarray
            * apertures: list of dictionaries with each entry containing
            * * station_id: (int) in range 1-512
            * * aperture_id: (str) with format APx.y; x must match station_ID
            * * station_trl: (str)
            * * station_beam_trl: (str)
            * * channel_blocks: Allocated channel blocks for this station
            * * hardware_beam: Allocated hardware beam for this station

        :return: whatever the device command returned, or
            ``ResultCode.FAILED`` if the command raised.
        """
        assert self._proxy is not None
        self.logger.debug(f"SubarrayBeam AssignResources: {subarray_beam}")
        payload = json.dumps(subarray_beam)
        # NOTE(review): unlike the other commands in this class the reply is
        # returned as-is rather than unpacked into a single code — confirm
        # that callers expect the raw reply.
        try:
            return self._proxy.AssignResources(payload)
        except Exception as exc:  # pylint: disable=broad-except
            self.logger.error(f"Caught exception {exc}")
            return ResultCode.FAILED

    @check_communicating
    @check_on
    def abort_device(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Send AbortDevice, which aborts this device but not its subdevices.

        :return: the single result code from the command reply
        """
        assert self._proxy is not None
        codes, _ = self._proxy.AbortDevice()
        (code,) = codes
        return code

    @check_communicating
    @check_on
    def restart(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Send the Restart command to this device.

        :return: the single result code from the command reply
        """
        assert self._proxy is not None
        codes, _ = self._proxy.Restart()
        (code,) = codes
        return code
class _StationBeamProxy(ObsDeviceComponentManager):
    """Proxy through which the controller drives one station beam device."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _StationBeamProxy,
        station_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Send the AssignResources command to the station beam.

        :param station_beam: Dictionary with command arguments

            * param subarray_id: The ID of the subarray to which this beam is assigned
            * param subarray_beam_id: The ID of the subarray beam to which this beam
              is assigned
            * param station_id: ID of the station which implements the beam
            * param aperture_id: ID of the form APx.y with X equal to the station ID
            * param channel_blocks: Channel blocks allocated to this beam
            * param hardware_beam: Hardware beam allocated to this beam

        :return: whatever the device command returned, or
            ``ResultCode.FAILED`` if the command raised.
        """
        assert self._proxy is not None
        self.logger.debug(f"StationBeam AssignResources: {station_beam}")
        payload = json.dumps(station_beam)
        # NOTE(review): the reply is returned without being unpacked, unlike
        # restart()/abort() below — confirm that callers expect the raw reply.
        try:
            return self._proxy.AssignResources(payload)
        except Exception as exc:  # pylint: disable=broad-except
            self.logger.debug(f"Catched exception {exc}")
            return ResultCode.FAILED

    @check_communicating
    @check_on
    def restart(self: _StationBeamProxy) -> ResultCode:
        """
        Send the Restart command to this device.

        :return: the single result code from the command reply
        """
        assert self._proxy is not None
        codes, _ = self._proxy.Restart()
        (code,) = codes
        return code

    @check_communicating
    @check_on
    def abort(
        self: _StationBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Send the Abort command to this device.

        :param task_callback: accepted for signature compatibility; it is
            not used by this implementation.

        :return: the single result code from the command reply.
            NOTE(review): this does not match the annotated
            ``tuple[TaskStatus, str]`` — confirm which one callers rely on.
        """
        assert self._proxy is not None
        codes, _ = self._proxy.Abort()
        (code,) = codes
        return code
# pylint: disable=too-many-instance-attributes
[docs]class ControllerComponentManager(TaskExecutorComponentManager):
"""
A component manager for an MCCS controller.
This component manager has three jobs:
* Monitoring of the devices in the MCCS subsystem
* Powering the MCCS subsystem off and on
* Allocating resources to subarrays
"""
# pylint: disable=too-many-arguments
def __init__(
    self: ControllerComponentManager,
    subarray_trls: Iterable[str],
    station_trls: Iterable[str],
    subarray_beam_trls: Iterable[str],
    station_beam_trls: Iterable[str],
    logger: logging.Logger,
    obs_command_timeout: int,
    communication_state_callback: Callable[[CommunicationStatus], None],
    component_state_callback: Callable[..., None],
) -> None:
    """
    Initialise a new instance.

    Builds one proxy per subarray, station, subarray beam and station
    beam, the controller-level resource manager, and the lookup tables
    that map numeric IDs to TRLs.

    :param subarray_trls: TRLS of all subarray devices
    :param station_trls: TRLS of all station devices
    :param subarray_beam_trls: TRLS of all subarray beam devices
    :param station_beam_trls: TRLS of all station beam devices
    :param logger: the logger to be used by this object.
    :param obs_command_timeout: the default timeout for obs
        commands in seconds.
    :param communication_state_callback: callback to be
        called when the status of the communications channel
        between the component manager and its component changes
    :param component_state_callback: callback to be
        called when the component state changes
    """
    # Each TRL collection is traversed several times below. Materialise
    # them once so that a one-shot iterable (e.g. a generator) is not
    # silently exhausted after its first use.
    subarray_trls = list(subarray_trls)
    station_trls = list(station_trls)
    subarray_beam_trls = list(subarray_beam_trls)
    station_beam_trls = list(station_beam_trls)

    self._communication_state_callback = communication_state_callback
    self._component_state_callback = component_state_callback
    self._obs_command_timeout = obs_command_timeout

    self.__communication_lock = threading.Lock()
    self._power_state_lock = threading.RLock()

    self._device_communication_states: dict[str, CommunicationStatus] = {}
    self._device_obs_states: dict[str, ObsState] = {}
    self._device_power_states: dict[str, PowerState] = {}

    # Each "set of resources" below is paired with the state those
    # resources are expected to reach.
    self._configuring_resources: set[str] = set()
    self._desired_obs_state: Optional[ObsState] = None
    self._powering_resources: set[str] = set()
    self._desired_power_state: Optional[PowerState] = None
    self._synchronising_resources: set[str] = set()
    self._desired_synchronisation_state: Optional[bool] = None
    self._initialising_resources: set[str] = set()
    self._desired_initialisation_state: Optional[bool] = None

    # The station ID is taken to be the last path element of its TRL.
    self._station_ids = [
        station_trl.split(r"/")[-1] for station_trl in station_trls
    ]

    # Every managed device starts out with communication disabled.
    for trl in (
        *subarray_trls,
        *station_trls,
        *subarray_beam_trls,
        *station_beam_trls,
    ):
        self._device_communication_states[trl] = CommunicationStatus.DISABLED

    self._resource_manager = ControllerResourceManager(
        subarrays=subarray_trls,
        subarray_beams=subarray_beam_trls,
        station_beams=station_beam_trls,
        channel_blocks=range(1, 49),
    )

    self._subarrays: dict[str, _SubarrayProxy] = {
        trl: _SubarrayProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in subarray_trls
    }
    self._stations: dict[str, _StationProxy] = {
        trl: _StationProxy(
            trl,
            subarray_trls,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in station_trls
    }
    self._subarray_beams: dict[str, _SubarrayBeamProxy] = {
        trl: _SubarrayBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in subarray_beam_trls
    }
    self._station_beams: dict[str, _StationBeamProxy] = {
        trl: _StationBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
        )
        for trl in station_beam_trls
    }

    # Dictionaries to map 1-based IDs to TRLs, in the order supplied.
    self._subarray_trl: dict[int, str] = dict(enumerate(subarray_trls, start=1))
    self._subarray_beam_trl: dict[int, str] = dict(
        enumerate(subarray_beam_trls, start=1)
    )

    # Station ID -> (station beam ID -> station beam TRL)
    self.station_beam_trl: dict[str, dict[int, str]] = {}
    for station_id in self._station_ids:
        # A station beam is associated with a station when the station ID
        # occurs anywhere in the beam's TRL.
        # NOTE(review): this is a substring match, so an ID that is a
        # prefix of another ID would match both — confirm the TRL naming
        # scheme rules this out.
        station_beam_trl_in_station = [
            station_beam_trl
            for station_beam_trl in station_beam_trls
            if station_id in station_beam_trl
        ]
        self.station_beam_trl[station_id] = dict(
            enumerate(station_beam_trl_in_station, start=1)
        )

    self._station_id_trl_map: dict[int, str] = {}

    # Keyed by the device-family element of a TRL (see get_health_trl).
    self._device_proxies: dict[str, dict] = {
        "subarray": self._subarrays,
        "station": self._stations,
        "subarraybeam": self._subarray_beams,
        "beam": self._station_beams,
    }

    self._started_acquisition = False

    super().__init__(
        logger,
        communication_state_callback,
        component_state_callback,
        power=None,
        fault=None,
    )
def start_communicating(self: ControllerComponentManager) -> None:
    """
    Establish communication with all managed devices.

    Does nothing if communication is already established. With no
    managed devices at all, communication is declared established
    straight away; otherwise every proxy is asked to start communicating.
    """
    current = self.communication_state
    if current == CommunicationStatus.ESTABLISHED:
        return
    if current == CommunicationStatus.DISABLED:
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)

    if not self._device_communication_states:
        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        return

    # Same order as always: subarrays, stations, subarray beams, station beams.
    for proxies in (
        self._subarrays,
        self._stations,
        self._subarray_beams,
        self._station_beams,
    ):
        for proxy in proxies.values():
            proxy.start_communicating()
def stop_communicating(self: ControllerComponentManager) -> None:
    """
    Break off communication with all managed devices.

    Does nothing if communication is already disabled. Otherwise the
    controller's own communication state is set to DISABLED, its power
    and fault state are cleared, and every proxy is asked to stop
    communicating.
    """
    if self.communication_state == CommunicationStatus.DISABLED:
        return
    self._update_communication_state(CommunicationStatus.DISABLED)
    self._update_component_state(power=None, fault=None)

    # Snapshot the values of every proxy dict to stop the dict from
    # changing underneath us. The stations loop used to iterate the live
    # view, unlike the other three.
    for subarray_proxy in list(self._subarrays.values()):
        subarray_proxy.stop_communicating()
    for station_proxy in list(self._stations.values()):
        station_proxy.stop_communicating()
    for subarray_beam_proxy in list(self._subarray_beams.values()):
        subarray_beam_proxy.stop_communicating()
    for station_beam_proxy in list(self._station_beams.values()):
        station_beam_proxy.stop_communicating()
def _device_communication_state_changed(
self: ControllerComponentManager,
trl: str,
communication_state: CommunicationStatus,
) -> None:
"""
Handle communication changes.
:param trl: TRL of changed device
:param communication_state: new status
"""
if trl not in self._device_communication_states:
self.logger.warning(
f"Received a communication status changed event for device {trl} "
"which is not managed by this controller. "
"Probably it was released just a moment ago. "
"The event will be discarded."
)
return
self._device_communication_states[trl] = communication_state
if self.communication_state == CommunicationStatus.DISABLED:
return
self._evaluate_communication_state()
def _evaluate_communication_state(
    self: ControllerComponentManager,
) -> None:
    """
    Derive the controller's communication state from its devices.

    If any device is DISABLED or NOT_ESTABLISHED the controller is
    NOT_ESTABLISHED; otherwise it is ESTABLISHED and the fault flag is
    cleared.
    """
    # Several callback threads can arrive here at once, and a thread may be
    # suspended between deciding on an update and performing it. That makes
    # callbacks appear out of order, so the whole evaluation is serialised.
    with self.__communication_lock:
        states = self._device_communication_states.values()
        all_established = (
            CommunicationStatus.DISABLED not in states
            and CommunicationStatus.NOT_ESTABLISHED not in states
        )
        if not all_established:
            self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
            return
        self._update_communication_state(CommunicationStatus.ESTABLISHED)
        self._update_component_state(fault=False)
def _evaluate_power_state(self: ControllerComponentManager) -> None:
    """
    Derive the controller's power state from its devices and publish it.

    The first of UNKNOWN, OFF, STANDBY, ON that any device reports wins.
    If no device reports any of these, the result is ON.
    """
    reported = self._device_power_states.values()
    precedence = (
        PowerState.UNKNOWN,
        PowerState.OFF,
        PowerState.STANDBY,
        PowerState.ON,
    )
    # Falls back to the last candidate (ON) when nothing matches.
    power_state = next(
        (candidate for candidate in precedence if candidate in reported),
        PowerState.ON,
    )
    self.logger.info(
        "In ControllerComponentManager._evaluatePowerState with:\n"
        f"\tdevices: {self._device_power_states}\n"
        f"\tresult: {str(power_state)}"
    )
    self._update_component_state(power=power_state)
@property
def power_state(self: ControllerComponentManager) -> Optional[PowerState]:
    """
    Return the power state currently held in the component state.

    :return: the value stored under ``"power"`` in the component state
    """
    current_power = self._component_state["power"]
    return current_power
def _subarray_health_changed(
self: ControllerComponentManager,
trl: str,
health: HealthState | None,
) -> None:
"""
Handle a change in the health of a subarray.
:param trl: the TRL of the subarray whose health has changed.
:param health: the new health state of the subarray, or None if
the subarray's health should not be taken into account.
"""
# What we're really interested in here is whether the subarray is in the right
# adminMode. We know that when it's in the wrong adminMode, health will be
# reported as None. So instead of subscribing to adminMode, we might as well be
# lazy here and get at the adminMode thorough the healthState that we're already
# subscribed to.
self._resource_manager.set_ready(trl, health is not None)
def _subarray_beam_health_changed(
self: ControllerComponentManager,
trl: str,
health: HealthState | None,
) -> None:
"""
Handle a change in the health of a subarray_beam.
:param trl: the TRL of the subarray_beam whose health has
changed.
:param health: the new health state of the subarray_beam, or
None if the subarray_beam's health should not be taken
into account.
"""
def _station_beam_health_changed(
self: ControllerComponentManager,
trl: str,
health: HealthState | None,
) -> None:
"""
Handle a change in the health of a station_beam.
:param trl: the TRL of the station_beam whose health has
changed.
:param health: the new health state of the station_beam, or None
if the station_beam's health should not be taken into account.
"""
def get_health_trl(
    self: ControllerComponentManager, trl: str
) -> Optional[HealthState]:
    """
    Return the health of a subdevice with given TRL.

    The second ``/``-separated element of the TRL selects the device
    family; the full TRL is then looked up among that family's proxies.

    :param trl: TRL of device to return health of

    :return: health of device given by TRL

    :raises ValueError: if TRL is for an invalid device, including a
        TRL that has no ``/`` separator at all.
    """
    segments = trl.split("/")
    # A TRL without a separator has no family element; treat it as an
    # invalid device rather than letting an IndexError escape.
    family = segments[1] if len(segments) > 1 else None
    proxies = self._device_proxies.get(family)
    if proxies is None or trl not in proxies:
        raise ValueError(f"{trl} is an invalid device.")
    return proxies[trl].health
def get_healths(
    self: ControllerComponentManager, device_type: str = "all"
) -> dict[str, dict[str, Optional[str]]]:
    """
    Return the health names of subdevices, grouped by device type.

    :param device_type: the device type whose instances should be
        reported, or ``"all"`` for every device type.

    :return: dictionary of [device_type: [TRL : HealthState name]];
        a device whose health is None is reported as None.

    :raises ValueError: if device_type is not a valid device type.
    """
    if device_type == "all":
        selected = list(self._device_proxies)
    elif device_type in self._device_proxies:
        selected = [device_type]
    else:
        raise ValueError(f"{device_type} is not a valid device type.")

    def _health_name(trl: str) -> Optional[str]:
        raw = self.get_health_trl(trl)
        return None if raw is None else HealthState(raw).name

    return {
        family: {trl: _health_name(trl) for trl in self._device_proxies[family]}
        for family in selected
    }
@check_communicating
def off(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the task that turns off the MCCS subsystem.

    The request is rejected outright when there are no stations.

    :param task_callback: Update task state, defaults to None

    :return: a TaskStatus and message
    """
    if not self._stations:
        return (TaskStatus.REJECTED, "No subservient devices to turn off")
    return self.submit_task(self._off, task_callback=task_callback)
def _off(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn off the MCCS subsystem.

    This is simpler than turning ON MCCS, we simply need to
    command all MccsStations to turn off and wait for it to happen.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None.
        It is not consulted by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn OFF which aren't already OFF.
    # A station whose power state has not been recorded yet is treated as
    # "not OFF" instead of raising KeyError.
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states.get(station_trl) != PowerState.OFF:
            results.append((station_trl, station_proxy.off()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # The lookup table may have no REJECTED entry; fall
                        # back explicitly so a rejection is reported rather
                        # than crashing the task with KeyError.
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to OFF command with {status}",
                    ),
                )
            return

    # All MccsStations not OFF already are turning OFF, let's wait
    # for that to happen.
    self._desired_power_state = PowerState.OFF
    wait_for_stations = self._wait_for_power_state(600)

    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn off",
                ),
            )
        return

    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn off",
                ),
            )
        return

    if task_callback:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The off command has completed"),
        )
@check_communicating
def standby(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the task that puts the MCCS subsystem in standby mode.

    The request is rejected outright when there are no stations.

    :param task_callback: Update task state, defaults to None

    :return: task status and message
    """
    if not self._stations:
        return (
            TaskStatus.REJECTED,
            "No subservient devices to put into standby",
        )
    return self.submit_task(self._standby, task_callback=task_callback)
def _standby(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Put the MCCS subsystem into low power standby mode.

    Every station is commanded to standby; the task is reported as
    failed only if at least one station returned ``TaskStatus.FAILED``.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None.
        It is not consulted by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command every station before inspecting any of the replies.
    outcomes = [proxy.standby() for proxy in self._stations.values()]
    any_failed = any(outcome[0] == TaskStatus.FAILED for outcome in outcomes)

    if not task_callback:
        return
    if any_failed:
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, "The standby command has failed"),
        )
    else:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The standby command has completed"),
        )
@check_communicating
def on(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the task that turns on the MCCS subsystem.

    The request is rejected outright when there are no stations.

    :param task_callback: Update task state, defaults to None

    :return: task status and message
    """
    if not self._stations:
        return (TaskStatus.REJECTED, "No subservient devices to turn on")
    return self.submit_task(self._on, task_callback=task_callback)
# pylint: disable=too-many-branches
def _on(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn on the MCCS subsystem.

    The procedure turns on all stations, waits for them to report ON,
    and then makes sure they are initialised and synchronised. The task
    completes successfully only if acquisition has been started.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None.
        It is not consulted by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn ON which aren't already ON.
    # A station whose power state has not been recorded yet is treated as
    # "not ON" instead of raising KeyError.
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states.get(station_trl) != PowerState.ON:
            results.append((station_trl, station_proxy.on()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # The lookup table may have no REJECTED entry; fall
                        # back explicitly so a rejection is reported rather
                        # than crashing the task with KeyError.
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to ON command with {status}",
                    ),
                )
            return

    # All MccsStations not ON already are turning ON, let's wait
    # for that to happen.
    self._desired_power_state = PowerState.ON
    wait_for_stations = self._wait_for_power_state(600)

    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn on",
                ),
            )
        return

    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn on",
                ),
            )
        return

    # All of our MccsStations are now ON. So we must do required
    # Initialisation/Synchronisation.
    self._started_acquisition = False

    # All MccsStations already Synchronised, we're happy to call the job done.
    if all(
        station_proxy.issynchronised for station_proxy in self._stations.values()
    ):
        self.logger.debug("All stations synchronsied.")
        self._started_acquisition = True
    # All MccsStations report Initialised: they are re-initialised anyway
    # (see the log message) and then synchronised.
    elif all(
        station_proxy.isinitialised for station_proxy in self._stations.values()
    ):
        self.logger.debug("Reinitialising tiles, temporary hack")
        initialised = self._intialise_stations()
        if initialised:
            self.logger.debug(
                "All stations initialised, beginning synchronisation."
            )
            self._started_acquisition = self._synchronise_stations()
    # Our MccsStations are all mismatched, lets re-initialise all of them,
    # then resynchronise all of them.
    elif all(
        station_proxy.isinitialised or station_proxy.issynchronised
        for station_proxy in self._stations.values()
    ):
        self.logger.debug(
            "Some stations are intialised, others"
            " synchronised. Reinitialising and synchronising all stations."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()
    # If any of our MccsStations are not in Initialised or Synchronised, something
    # is amiss and we have not succeeded.
    # There is currently a bug whereby the MccsTile can be
    # ON when not Initialised or Synchronised
    # This is temporary MCCS-2104 should resolve this.
    else:
        self.logger.warning(
            "There are stations which are not Initialised or "
            "Synchronised, they should be in either of these states."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()

    if task_callback:
        if self._started_acquisition:
            self.logger.info("The On command has completed")
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The On command has completed"),
            )
        else:
            self.logger.info("The On command has failed")
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The On command has failed"),
            )
def _synchronise_stations(self: ControllerComponentManager) -> bool:
    """
    Start acquisition on every station and wait for them to synchronise.

    The same start time, two seconds in the future, is sent to every
    station proxy.

    :return: True if no station rejected or aborted the request and the
        wait for synchronisation neither failed nor was aborted.
    """
    # pylint: disable-next=import-outside-toplevel
    from datetime import timezone

    # RFC_FORMAT ends in a literal "Z" (the UTC designator), so the
    # timestamp must be rendered in UTC, not in the host's local time zone.
    start_time = datetime.fromtimestamp(
        int(time.time()) + 2, tz=timezone.utc
    ).strftime(RFC_FORMAT)
    request = json.dumps({"start_time": start_time})

    self._desired_synchronisation_state = True
    start_acq_results = []
    for station_trl, station_proxy in self._stations.items():
        start_acq_results.append(station_proxy.start_acquisition(request))
        self._synchronising_resources.add(station_trl)

    refused = (TaskStatus.REJECTED, TaskStatus.ABORTED)
    if any(result in refused for result in start_acq_results):
        return False
    return self._wait_for_station_synchronisation_state(30) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )
def _intialise_stations(self: ControllerComponentManager) -> bool:
    """
    Ask every station to initialise, then wait for them to do so.

    :return: True if no station rejected or aborted the request and the
        wait for initialisation neither failed nor was aborted.
    """
    outcomes = []
    for trl, proxy in self._stations.items():
        self._desired_initialisation_state = True
        outcomes.append(proxy.initialise())
        self._initialising_resources.add(trl)

    refused = (TaskStatus.REJECTED, TaskStatus.ABORTED)
    if any(outcome in refused for outcome in outcomes):
        return False

    wait_status = self._wait_for_initialisation_state(120)
    return wait_status not in (TaskStatus.FAILED, TaskStatus.ABORTED)
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
@check_communicating
@check_on
def allocate(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    *,
    interface: Optional[str] = None,
    subarray_id: int,
    subarray_beams: list[dict],
) -> tuple[TaskStatus, str]:
    """
    Submit a task that allocates unallocated MCCS resources to a subarray.

    The keyword arguments describe the overall sub-array composition in
    terms of which stations should be allocated to each subarray beam.
    The work itself is done by ``_assign_resources``.

    :param task_callback: callback to signal end of command
    :param interface: the schema version this is running against.
    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam allocation

    :return: A tuple containing a task status and a unique id
    """
    log = self.logger
    log.debug(f"Allocate command started for subarray {subarray_id}")
    log.debug(f"subarray definition: {subarray_beams}")

    submission = self.submit_task(
        self._assign_resources,
        args=[subarray_id, subarray_beams],
        task_callback=task_callback,
    )
    return submission
def _create_station_id_trl_map(self: ControllerComponentManager) -> None:
"""
Populate a dictionary with mapping from station_id to TRL.
:raises AttributeError: If the station does not have
expected station_id property.
"""
tmp_map = {}
station_id_property = "StationId"
for station_trl, station in self._stations.items():
assert station._proxy is not None # for the type checker
station_id = station._proxy.get_property(station_id_property)[
station_id_property
]
if station_id:
# station_id looks like ['4']
tmp_map[int(station_id[0])] = station_trl
else:
raise AttributeError(
f"Station {station_trl} has no property value {station_id_property}"
)
self._station_id_trl_map = tmp_map
# pylint: disable=too-many-return-statements
def _allocate_controller_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
) -> list[dict]:
    """
    Allocate a set of unallocated MCCS resources to a subarray.

    Resources currently held by the subarray are released first. Beams whose
    subarray beam device is unknown are removed in place from
    ``subarray_beams``. The remaining beam and aperture dictionaries are
    updated in place with the allocated channel range, hardware beam,
    channel blocks and device TRLs.

    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam allocation

    :return: the updated list of dictionaries, each specifying a beam
        allocation, or a list holding a single dictionary with a single
        "Error" element
    """
    subarray_trl = self._subarray_trl.get(subarray_id, None)
    if subarray_trl is None:
        return [{"Error": f"Subarray {subarray_id} not present"}]
    if len(subarray_beams) == 0:
        return [{"Error": "No beams defined"}]

    # first release all resources in resource managers
    self.logger.debug(
        f"Device {subarray_trl} power: "
        f"{self._subarrays[subarray_trl]._component_state['power']}"
    )
    self._release_from_subarray(subarray_trl)
    self._desired_obs_state = ObsState.EMPTY
    self._configuring_resources.add(subarray_trl)
    if self._wait_for_obs_state(self._obs_command_timeout) == TaskStatus.FAILED:
        return [{"Error": f"Subarray {subarray_id} didn't reach EMPTY"}]

    # Map from the station_id to station_trl
    if not self._station_id_trl_map:
        self._create_station_id_trl_map()

    # Check that devices are available. Drop from the configuration those
    # which are not present.
    first_subarray_channel = 0
    subarray_beam_list = []
    station_beam_list = []
    # Iterate over a snapshot: unknown beams are removed from the caller's
    # list inside the loop, and removing while iterating the same list would
    # silently skip the following beam.
    for beam in list(subarray_beams):
        subarray_beam_id = beam["subarray_beam_id"]
        self.logger.debug(f"Start processing beam {subarray_beam_id}")
        trl = self._subarray_beam_trl.get(subarray_beam_id, None)
        if trl is None:
            subarray_beams.remove(beam)
            self.logger.warning(
                f"Subarray beam {subarray_beam_id} not present, "
                "dropped from allocation"
            )
            continue

        self.logger.debug(f"Allocated beam {trl}")
        subarray_beam_list.append(trl)

        # Channels are handed out in blocks of 8, so round the request up.
        requested_channels = beam["number_of_channels"]
        number_of_blocks = (requested_channels + 7) // 8
        number_of_channels = number_of_blocks * 8
        if number_of_channels != requested_channels:
            self.logger.warning(
                f"Number of channels for beam {subarray_beam_id} is "
                f"{requested_channels}, not a multiple of 8. "
                f"{number_of_channels} channels will be allocated"
            )
        beam["first_subarray_channel"] = first_subarray_channel
        beam["number_of_channels"] = number_of_channels

        #
        # Allocate resources to apertures. Each aperture requires
        # - one station beam device
        # - one station hardware beam
        # - <number_of_blocks> blocks of station channels
        #
        # NOTE(review): on the error returns below, resources already taken
        # from station proxies for earlier apertures are not rolled back
        # here - confirm the caller is expected to release them.
        for aperture in beam["apertures"]:
            station_id = aperture["station_id"]
            self.logger.debug(f"Start processing station {station_id}")
            self.logger.debug(
                f"chans {first_subarray_channel}:" f"{number_of_channels}"
            )
            station_trl = self._station_id_trl_map.get(station_id, None)
            if station_trl is None:
                self.logger.error(f"Cannot allocate resources: {station_id}")
                return [{"Error": f"Cannot allocate resources: {station_id}"}]
            aperture["station_trl"] = station_trl
            station_name = station_trl.split(r"/")[-1]
            station = self._stations[station_trl]

            # Only the station allocation calls are handled for ValueError.
            try:
                beam_id = station.allocate_beam(subarray_trl)
                aperture["hardware_beam"] = beam_id
                aperture["channel_blocks"] = station.allocate_blocks(
                    subarray_trl, number_of_blocks
                )
            except ValueError:
                msg = (
                    f"Cannot allocate resources: Station {station_trl} has no"
                    f" {number_of_blocks} channel blocks to allocate."
                )
                self.logger.error(msg)
                return [{"Error": msg}]

            # Station beam devices are looked up under hardware beam id + 1.
            beam_trls = self.station_beam_trl.get(station_name, None)
            beam_trl = None if beam_trls is None else beam_trls.get(beam_id + 1, None)
            if beam_trl is None:
                msg = f"Cannot allocate station beam: {station_id}, {beam_id}"
                self.logger.error(msg)
                return [{"Error": msg}]
            station_beam_list.append(beam_trl)
            aperture["station_beam_trl"] = beam_trl
            self.logger.debug(f"Allocated station beam {beam_trl}")
        first_subarray_channel += number_of_channels

    if not subarray_beams:
        # Every requested beam was dropped. Returning an empty list would
        # make the caller fail when it inspects the first element.
        msg = "No beams left to allocate: no requested subarray beam is present"
        self.logger.error(msg)
        return [{"Error": msg}]

    self.logger.debug(
        f"Allocating to {subarray_trl}: {subarray_beam_list}," f"{station_beam_list}"
    )
    try:
        self._resource_manager.allocate(
            subarray_trl,
            subarray_beams=subarray_beam_list,
            station_beams=station_beam_list,
        )
    except ValueError as e:
        self._resource_manager.resource_pool.free_resources(
            {"station_beams": station_beam_list}
        )
        self.logger.error(str(e))
        return [{"Error": str(e)}]
    return subarray_beams
def _assign_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Assign resources to a subarray.

    Resources are first allocated in the controller's resource managers,
    then pushed to each subarray beam, each station beam and finally to the
    subarray, which is then expected to reach ObsState IDLE.

    :param subarray_id: id of the subarray to which resources are to
        be allocated
    :param subarray_beams: list of subarray beam resources
        as dictionary
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
        (currently not consulted by this method)
    """

    def report(status: TaskStatus, code: ResultCode, text: str) -> None:
        """Send a final status and result to the task callback, if any."""
        if task_callback:
            task_callback(status=status, result=(code, text))

    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Allocate resources in controller resource managers,
    # and update the beam descriptors
    # If failed, return error message
    new_subarray_beams = self._allocate_controller_resources(
        subarray_id, subarray_beams
    )
    if not new_subarray_beams:
        # Nothing was allocated; indexing element 0 below would raise.
        report(
            TaskStatus.REJECTED,
            ResultCode.REJECTED,
            "No subarray beams could be allocated",
        )
        return
    message = new_subarray_beams[0].get("Error", None)
    if message:
        report(TaskStatus.REJECTED, ResultCode.REJECTED, message)
        return

    for beam in new_subarray_beams:
        subarray_beam_id = beam["subarray_beam_id"]
        beam_trl = self._subarray_beam_trl[subarray_beam_id]
        result_code = self._subarray_beams[beam_trl].assign_resources(
            {
                "subarray_id": subarray_id,
                "subarray_beam_id": beam["subarray_beam_id"],
                "apertures": beam["apertures"],
                "first_subarray_channel": beam["first_subarray_channel"],
                "number_of_channels": beam["number_of_channels"],
            }
        )
        if result_code == ResultCode.FAILED:
            self.release(subarray_id)
            report(
                TaskStatus.FAILED,
                ResultCode.FAILED,
                "The SubarrayBeam.assign_resources command has failed",
            )
            return

        for aperture in beam["apertures"]:
            station_beam_trl = aperture["station_beam_trl"]
            args = {
                "subarray_id": subarray_id,
                "subarray_beam_id": subarray_beam_id,
                "station_id": aperture["station_id"],
                "aperture_id": aperture["aperture_id"],
                "channel_blocks": aperture["channel_blocks"],
                "hardware_beam": aperture["hardware_beam"],
                "station_trl": aperture["station_trl"],
                "first_subarray_channel": beam["first_subarray_channel"],
                "number_of_channels": beam["number_of_channels"],
            }
            result_code = self._station_beams[station_beam_trl].assign_resources(args)
            if result_code == ResultCode.FAILED:
                self.release(subarray_id)
                report(
                    TaskStatus.FAILED,
                    ResultCode.FAILED,
                    "StationBeam.assign_resources command has failed",
                )
                return
        # Add beam TRL to configuration, so that subarray knows mapping
        # between subarray beams ID and TRL
        beam["subarray_beam_trl"] = beam_trl

    subarray_trl = self._subarray_trl[subarray_id]
    result_code = self._subarrays[subarray_trl].assign_resources(
        subarray_id, new_subarray_beams
    )
    self._desired_obs_state = ObsState.IDLE
    self._configuring_resources.add(subarray_trl)
    if self._wait_for_obs_state(self._obs_command_timeout) == TaskStatus.FAILED:
        report(
            TaskStatus.FAILED,
            ResultCode.FAILED,
            f"Subarray {subarray_id} didn't reach IDLE",
        )
        return

    if ResultCode.FAILED == result_code:
        report(
            TaskStatus.FAILED,
            ResultCode.FAILED,
            "The AssignResources command has failed",
        )
    else:
        report(
            TaskStatus.COMPLETED,
            ResultCode.OK,
            "The AssignResources command has completed",
        )
@check_communicating
@check_on
def release(
    self: ControllerComponentManager,
    subarray_id: int,
) -> tuple[ResultCode, str]:
    """
    Release the resources held by one subarray.

    The request is rejected when there are no subarray devices, and fails
    when the controller is not powered on.

    :param subarray_id: ID of the subarray which requires release

    :return: a result code and message
    """
    if not self._subarrays:
        outcome = (
            ResultCode.REJECTED,
            "No subservient subarray devices to release",
        )
    elif self.power_state != PowerState.ON:
        outcome = (ResultCode.FAILED, "Controller is not turned on.")
    else:
        self._release_from_subarray(self._subarray_trl[subarray_id])
        outcome = (ResultCode.OK, "Release command completed OK")
    return outcome
def release_all(
    self: ControllerComponentManager,
) -> tuple[ResultCode, str]:
    """
    Release the resources held by every subarray.

    The request is rejected when there are no subarray devices, and fails
    when the controller is not powered on.

    :return: a result code and message
    """
    if not self._subarrays:
        outcome = (
            ResultCode.REJECTED,
            "No subservient subarray devices to release",
        )
    elif self.power_state != PowerState.ON:
        outcome = (ResultCode.FAILED, "Controller is not turned on.")
    else:
        for trl in self._subarrays:
            self._release_from_subarray(trl)
        outcome = (ResultCode.OK, "Release command completed OK")
    return outcome
def _release_from_subarray(
self: ControllerComponentManager,
subarray_trl: str,
) -> None:
"""
Release resources from a subarray.
TODO: Actually forward the ReleaseAllResources() to subdevices
:param subarray_trl: subarray from which resources must be released
"""
self._subarrays[subarray_trl].release_all_resources()
resources = self._resource_manager.get_allocated(subarray_trl)
self.logger.debug(
f"Releasing resources for subarray {subarray_trl}: {resources}"
)
for station in self._stations.values():
station.release_from_subarray(subarray_trl)
trls = resources.get("station_beams", None)
if trls is not None:
self._resource_manager.resource_pool.free_resources({"station_beams": trls})
self._resource_manager.deallocate_from(subarray_trl)
def get_resources(self: ControllerComponentManager, subarray_id: int) -> str:
    """
    Report the resources assigned to a given subarray as JSON.

    :param subarray_id: The subarray ID of the resources

    :return: json formatted dictionary
    """
    subarray_trl = self._subarray_trl[subarray_id]
    allocated = self._resource_manager.get_allocated(subarray_trl)
    encoded = json.dumps(allocated)
    self.logger.debug(f"get_resources for {subarray_trl}: {encoded}")
    return encoded
@check_communicating
@check_on
def restart_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit a task that restarts an MCCS subarray.

    The request is rejected when there are no subarray devices, and fails
    when the controller is not powered on.

    :param subarray_id: an integer subarray_id.
    :param task_callback: Update task state, defaults to None

    :return: a task status and a message
    """
    if not self._subarrays:
        return (
            TaskStatus.REJECTED,
            "No subservient subarray devices to restart",
        )
    if self.power_state != PowerState.ON:
        return (TaskStatus.FAILED, "Controller is not turned on.")

    target_trl = self._subarray_trl[subarray_id]
    return self.submit_task(
        self._restart_subarray,
        [target_trl],
        task_callback=task_callback,
    )
def _abort_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Abort a subarray and the beam devices allocated to it.

    Devices already in FAULT, ABORTED or ABORTING are left alone. Every
    device that is sent an abort is added to the set of devices being
    waited on.

    :param subarray_trl: TRL of the subarray to abort

    :return: the status returned by the wait for ObsState ABORTED
    """
    allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state = ObsState.ABORTED
    settled = (ObsState.FAULT, ObsState.ABORTED, ObsState.ABORTING)

    def needs_abort(trl: str) -> bool:
        """Log and track the device if it still has to be aborted."""
        if self._device_obs_states[trl] in settled:
            return False
        self.logger.info(f"Aborting {trl}")
        self._configuring_resources.add(trl)
        return True

    for trl in allocated["station_beams"]:
        proxy = self._station_beams[trl]
        if needs_abort(trl):
            proxy.abort()

    for trl in allocated["subarray_beams"]:
        proxy = self._subarray_beams[trl]
        if needs_abort(trl):
            proxy.abort_device()

    if needs_abort(subarray_trl):
        self._subarrays[subarray_trl].abort_device()

    return self._wait_for_obs_state(self._obs_command_timeout)
def _restart_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Restart a subarray and the beam devices allocated to it.

    Only devices currently in FAULT or ABORTED are restarted. Every device
    that is sent a restart is added to the set of devices being waited on.

    :param subarray_trl: TRL of the subarray to restart

    :return: the status returned by the wait for ObsState EMPTY
    """
    allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state = ObsState.EMPTY
    restartable = (ObsState.FAULT, ObsState.ABORTED)

    def needs_restart(trl: str) -> bool:
        """Log and track the device if it has to be restarted."""
        if self._device_obs_states[trl] not in restartable:
            return False
        self.logger.info(f"Restarting {trl}")
        self._configuring_resources.add(trl)
        return True

    for trl in allocated["station_beams"]:
        proxy = self._station_beams[trl]
        if needs_restart(trl):
            proxy.restart()

    for trl in allocated["subarray_beams"]:
        proxy = self._subarray_beams[trl]
        if needs_restart(trl):
            proxy.restart()

    if needs_restart(subarray_trl):
        self._subarrays[subarray_trl].restart()

    return self._wait_for_obs_state(self._obs_command_timeout)
def _restart_subarray(
    self: ControllerComponentManager,
    subarray_trl: str,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Abort, restart and then deallocate the resources of a subarray.

    Each step runs only if the previous one completed. The resource
    manager allocation is dropped only after the restart has completed.

    :param subarray_trl: TRL of the subarray from which all
        resources are to be deallocated
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    outcome = self._abort_subdevices(subarray_trl)
    if outcome == TaskStatus.COMPLETED:
        outcome = self._restart_subdevices(subarray_trl)
        if outcome == TaskStatus.COMPLETED:
            self._resource_manager.deallocate_from(subarray_trl)

    if not task_callback:
        return
    if outcome == TaskStatus.FAILED:
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, "The restart command has failed"),
        )
        return
    task_callback(
        status=TaskStatus.COMPLETED,
        result=(ResultCode.OK, "The restart command has completed"),
    )
def _device_obs_state_changed(
    self: ControllerComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Record a device's new ObsState and update the set being waited on.

    If the device is being waited on and has reached the desired ObsState,
    it is removed from the set of configuring resources.

    :param trl: TRL of the device whose ObsState changed
    :param obs_state: the new ObsState of the device
    """
    old_state = self._device_obs_states.get(trl, None)
    # Compare against None explicitly: an ObsState member with integer
    # value 0 is falsy and would otherwise be logged as "NONE".
    if old_state is not None:
        old_name = old_state.name
    else:
        old_name = "NONE"
    new_state = ObsState(obs_state)
    self._device_obs_states[trl] = new_state
    if obs_state == self._desired_obs_state and trl in self._configuring_resources:
        self._configuring_resources.remove(trl)
    self.logger.debug(
        f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
        f"waiting for {len(self._configuring_resources)} more devices"
    )
def _station_power_state_changed(
    self: ControllerComponentManager,
    trl: str,
    power_state: PowerState,
) -> None:
    """
    Record a station's new PowerState and update the set being waited on.

    If the station is being waited on and has reached the desired
    PowerState, it is removed from the set of powering resources.

    :param trl: TRL of the station whose PowerState changed
    :param power_state: the new PowerState of the station
    """
    previous = self._device_power_states.get(trl, None)
    previous_name = "NONE" if previous is None else previous.name

    current = PowerState(power_state)
    self._device_power_states[trl] = current

    reached_target = power_state == self._desired_power_state
    if reached_target and trl in self._powering_resources:
        self._powering_resources.remove(trl)

    remaining = len(self._powering_resources)
    self.logger.debug(
        f"State for {trl} changed: {previous_name} -> {current.name}, "
        f"waiting for {remaining} more devices"
    )
def _wait_for_obs_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for sub-device ObsState to reach desired state.

    Polls every 10 ms until the set of configuring resources is empty. On
    timeout the outcome is decided by ``_final_poll``.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_obs_state is not None
    resolution = 0.01  # seconds
    ticks = int(timeout / resolution)  # 10 ms resolution
    while self._configuring_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" rather than "==": a timeout shorter than one tick starts the
        # counter at 0, and an equality test would then never fire.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for ObsState {self._desired_obs_state.name}"
                f" in {timeout} seconds. Attempting final poll."
            )
            return self._final_poll()
    self.logger.debug(f"Waited ObsState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED
def _final_poll(self: ControllerComponentManager) -> TaskStatus:
    """
    Read device ObsStates directly as a last attempt before declaring failure.

    Change events sometimes appear to be missed in production. For every
    device still being waited on, the ObsState is read from the device
    proxy. Devices found in the desired state are removed from the waiting
    set and reported through the component state callback.

    :returns: COMPLETED if no device is left waiting afterwards, otherwise
        FAILED.
    """
    for trl in tuple(self._configuring_resources):
        device = (
            self._subarrays.get(trl)
            or self._subarray_beams.get(trl)
            or self._station_beams.get(trl)
        )
        if not (device and device._proxy):
            continue
        if device._proxy.obsstate == self._desired_obs_state:
            self.logger.info(
                f"Change event for {trl} was missed, device was in correct state."
            )
            if self._component_state_callback is not None:
                self._component_state_callback(missed_event=True)
            self._configuring_resources.remove(trl)

    if self._configuring_resources:
        return TaskStatus.FAILED
    return TaskStatus.COMPLETED
def _wait_for_power_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station PowerState to reach desired state.

    Polls once per second until the set of powering resources is empty.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_power_state is not None
    resolution = 1  # seconds
    ticks = int(timeout / resolution)
    while self._powering_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" rather than "==": a timeout shorter than one tick starts the
        # counter at 0, and an equality test would then never fire.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for PowerState in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._powering_resources} "
            f"PowerState {self._desired_power_state.name},"
            f" waiting for {ticks*resolution} more seconds"
        )
    self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED
def _wait_for_station_synchronisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Synchronisation to reach desired state.

    Every station is polled every 0.5 s. A station seen as synchronised is
    not checked again. The method returns as soon as no station is left
    pending, which includes the case where there are no stations at all.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_synchronisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    synchronising_stations = set(self._stations)
    self.logger.debug(f"Waiting for stations to synchronise: {synchronising_stations}")
    while True:
        newly_synchronised = {
            station_trl
            for station_trl in synchronising_stations
            if self._stations[station_trl].issynchronised
        }
        for station_trl in newly_synchronised:
            self.logger.debug(f"{station_trl} achieved synchronisation!")
        synchronising_stations -= newly_synchronised

        # Check for success before abort/timeout so that synchronisation
        # reached on the last tick is not reported as a failure.
        if not synchronising_stations:
            break
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {synchronising_stations} "
                f"for Synchronisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {synchronising_stations} Synchronisation"
            f" {self._desired_synchronisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
        time.sleep(resolution)
        ticks -= 1
    self.logger.debug(f"Waited Synchronisation for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED
def _wait_for_initialisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Initialisation to reach desired state.

    The stations in the set of initialising resources are polled every
    0.5 s. A station seen as initialised is removed from that set. The
    method returns as soon as the set is empty, which includes the case
    where it was empty on entry.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_initialisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    while True:
        initialised_stations = {
            station_trl
            for station_trl in self._initialising_resources
            if self._stations[station_trl].isinitialised
        }
        self._initialising_resources -= initialised_stations

        # Check for success before abort/timeout so that initialisation
        # reached on the last tick is not reported as a failure.
        if not self._initialising_resources:
            break
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {self._initialising_resources} "
                f"for Initialisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._initialising_resources} Initialisation"
            f" {self._desired_initialisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
        time.sleep(resolution)
        ticks -= 1
    self.logger.debug(f"Waited Initialisation for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED