# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for the MCCS controller."""
# pylint: disable=too-many-lines
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Iterable, Optional
import erfa # type: ignore
import numpy as np
from ska_control_model import (
CommunicationStatus,
HealthState,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_low_mccs_common import EventSerialiser
from ska_low_mccs_common.communication_manager import CommunicationManager
from ska_low_mccs_common.component import (
DeviceComponentManager,
ObsDeviceComponentManager,
)
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
CompositeCommandResultEvaluator,
MccsCompositeCommandProxy,
)
from ska_low_mccs_common.resource_manager import ResourceManager, ResourcePool
from ska_low_mccs_common.utils import lock_power_state
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from tango import DevFailed, DeviceProxy, EnsureOmniThread
from ska_low_mccs.controller.controller_resource_manager import (
ControllerResourceManager,
)
from ska_low_mccs.utils import subarraybeam_trl_from_ids
__all__ = ["ControllerComponentManager"]

# Map the final status of a sub-device task to the ResultCode reported to our
# own task callback. REJECTED is included because on()/off() report a station
# that rejected its power command through this table; without the entry that
# lookup raised KeyError.
_task_to_result = {
    TaskStatus.COMPLETED: ResultCode.OK,
    TaskStatus.ABORTED: ResultCode.ABORTED,
    TaskStatus.FAILED: ResultCode.FAILED,
    TaskStatus.REJECTED: ResultCode.REJECTED,
}

# strftime/strptime format for RFC 3339-style timestamps with microseconds;
# the trailing "Z" is a literal, i.e. times are expected to be in UTC.
RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# SKA-Low site longitude (presumably degrees east). Used for RMS calculation.
# NOTE(review): "RMS" may be a typo for LMST (local mean sidereal time) -
# confirm against the code that uses this constant.
LON_SKA_LOW = 116.764
class _StationProxy(DeviceComponentManager):
    """
    A controller's proxy to a station.

    Each station proxy keeps track of the channel blocks and hardware
    beams it has available. Each station proxy also has its own
    resource manager to keep track of which of those resources are
    assigned to each subarray.
    """

    # Number of channel blocks and hardware beams this station can hand out.
    _NUM_CHANNEL_BLOCKS = 48
    _NUM_HARDWARE_BEAMS = 48

    # pylint: disable=too-many-arguments
    def __init__(
        self: _StationProxy,
        trl: str,
        subarray_trls: Iterable[str],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        event_serialiser: Optional[EventSerialiser] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param trl: the TRL of the device
        :param subarray_trls: the TRLs of subarrays which channel
            blocks can be assigned to.
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be
            called when the status of the communications channel
            between the component manager and its component changes
        :param component_state_callback: callback to be
            called when the component state changes
        :param event_serialiser: an optional event serialiser to
            use when serialising events
        """
        # Pool of this station's free resources: items are taken out on
        # allocation and put back on release.
        self._channel_block_pool = ResourcePool(
            channel_blocks=range(self._NUM_CHANNEL_BLOCKS),
            hardware_beams=range(self._NUM_HARDWARE_BEAMS),
        )
        # Record of which of this station's resources belong to which subarray.
        self._resource_manager = ResourceManager(
            subarray_trls,
            channel_blocks=range(self._NUM_CHANNEL_BLOCKS),
            hardware_beams=range(self._NUM_HARDWARE_BEAMS),
        )
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
            event_serialiser=event_serialiser,
        )

    def allocate_blocks(
        self: _StationProxy, subarray_trl: str, channel_blocks: int
    ) -> list[int]:
        """
        Allocate channel blocks to a subarray.

        This method removes the requested number of channel blocks from
        the available pool and assigns them to the provided subarray
        TRL. On failure, any blocks already taken from the pool during
        this call are returned to it.

        :param subarray_trl: The TRL of the subarray to which the
            channel blocks are to be assigned.
        :param channel_blocks: The number of channel blocks to assign to
            the subarray.
        :return: the list of allocated channel blocks.
        :raises ValueError: if the resources cannot be allocated
        """
        allocated: list[int] = []
        try:
            for _ in range(channel_blocks):
                block = self._channel_block_pool.get_free_resource("channel_blocks")
                allocated.append(block)  # type: ignore [arg-type]
            self._resource_manager.allocate(subarray_trl, channel_blocks=allocated)
        except ValueError as exc:
            # Put back whatever was taken so the pool does not leak blocks.
            self._channel_block_pool.free_resources({"channel_blocks": allocated})
            raise ValueError(
                f"Could not allocate {channel_blocks} channel blocks "
                f"to {subarray_trl}"
            ) from exc
        return allocated

    def allocate_beam(self: _StationProxy, subarray_trl: str) -> int:
        """
        Allocate a hardware beam to a subarray.

        This method removes one hardware beam from the available pool
        and assigns it to the provided subarray TRL. If the assignment
        fails, the beam is returned to the pool.

        :param subarray_trl: The TRL of the subarray to which the
            hardware beam is to be assigned.
        :return: index of the allocated beam
        :raises ValueError: if no hardware beam can be allocated
        """
        beam = self._channel_block_pool.get_free_resource("hardware_beams")
        try:
            self._resource_manager.allocate(subarray_trl, hardware_beams=[beam])
        except ValueError:
            # Don't leak the beam from the pool if the assignment failed.
            self._channel_block_pool.free_resources({"hardware_beams": [beam]})
            raise
        return beam  # type: ignore [return-value]

    def release_from_subarray(self: _StationProxy, subarray_trl: str) -> None:
        """
        Release all channel blocks and beams assigned to a subarray.

        Channel blocks and beams are released from the subarray and
        marked as free in the station proxy's pool for reallocation
        whenever needed.

        :param subarray_trl: The TRL of the subarray from which this
            station proxy's resources are to be released.
        """
        resources_to_release = self._resource_manager.get_allocated(subarray_trl)
        self._channel_block_pool.free_resources(resources_to_release)
        self._resource_manager.deallocate_from(subarray_trl)

    @check_communicating
    def start_acquisition(self: _StationProxy, argin: str) -> ResultCode:
        """
        Start the acquisition synchronously for all tiles, checks for synchronisation.

        :param argin: json dictionary with optional keywords

            * start_time - (str) start time
            * delay - (int) delay start

        :return: result code of StartAcquisition
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        ([result_code], _) = self._proxy._device.StartAcquisition(argin)
        return result_code

    @check_communicating
    def initialise(self: _StationProxy) -> ResultCode:
        """
        Initialise the station's tiles.

        :return: result code of Initialise
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        ([result_code], _) = self._proxy._device.Initialise()
        return result_code

    @check_communicating
    def trigger_adc_equalisation(
        self: _StationProxy, args: str
    ) -> tuple[list[ResultCode], list[str]]:
        """
        Start a station's ADC equalisation.

        :param args: the arguments as json formatted string
        :return: the result codes and messages returned by the
            station's TriggerAdcEqualisation command
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.TriggerAdcEqualisation(args)

    def check_lrc_status(self: _StationProxy, uid: str) -> str:
        """
        Return the status of a long running command on the station.

        :param uid: command id
        :returns: the value returned by the station's
            CheckLongRunningCommandStatus command for ``uid``
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.CheckLongRunningCommandStatus(uid)

    @property
    def issynchronised(self: _StationProxy) -> bool:
        """
        Return True if all tiles in this station are synchronised.

        :returns: True if all tiles in this station are synchronised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.issynchronised

    @property
    def isinitialised(self: _StationProxy) -> bool:
        """
        Return True if all tiles in this station are initialised.

        :returns: True if all tiles in this station are initialised
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.isinitialised

    def get_available_blocks(self: _StationProxy) -> int:
        """
        Get the number of channel blocks not allocated to any subarray.

        :returns: the number of unallocated channel blocks
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        free_resources = self._resource_manager.get_unallocated()
        available_blocks = list(free_resources["channel_blocks"])
        return len(available_blocks)
class _SubarrayProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayProxy, subarray_id: int, subarray_beams: Iterable[dict]
    ) -> ResultCode:
        """
        Tell the subarray what resources are assigned to it.

        The subarray receives the allocated resources, together with the
        subarray allocation description. This implicitly deallocates all
        previously assigned resources. The subarray is described as a set
        of subarray beams.

        :param subarray_id: This is mainly used for cross check. Subarray knows its ID
        :param subarray_beams: list of subarray beam descriptions. Structure containing

            * subarray_beam_id: integer
            * subarray_beam_trl: str
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels
            * apertures: list of dictionaries with each entry containing

                * station_id: (int) in range 1-512
                * aperture_id: (str) with format APx.y; x must match station_ID
                * station_beam_trl: (str)
                * channel_blocks: Allocated channel blocks for this station
                * hardware_beam: Allocated hardware beam for this station

            TRLs of stations are not specified, as they are not used in the Subarray.
            Station beam TRLs could also be omitted.

        :return: a result code; ``ResultCode.FAILED`` if the command raised.
        """
        assert self._proxy is not None
        # json.dumps only handles lists/tuples, so materialise any other
        # iterable (e.g. a generator) before serialising it.
        args = json.dumps(
            {"subarray_id": subarray_id, "subarray_beams": list(subarray_beams)}
        )
        self.logger.debug(f"Subarray AssignResources: {args}")
        try:
            ([result_code], _) = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            # A failed assignment is an error, not debug noise.
            self.logger.error(f"Caught exception: {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    @check_on
    def release_all_resources(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Tell the subarray that it no longer has any resources.

        :return: a result code.
        """
        assert self._proxy is not None
        release_command = MccsCommandProxy(
            self._name, "ReleaseAllResources", self._logger
        )
        result_code, _ = release_command()
        return ResultCode(result_code)

    @check_communicating
    def restart(
        self: _SubarrayProxy,
    ) -> ResultCode:
        """
        Restart the subarray and all related sub-elements.

        :return: a result code.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    def abort_device(self: _SubarrayProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code
class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """A controller's proxy to a subarray beam."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _SubarrayBeamProxy,
        subarray_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Assign resources to the Subarray Beam.

        :param subarray_beam: subarray beam description. Structure containing

            * subarray_id: This is used for cross check. Subarray knows its ID
            * subarray_beam_id: integer
            * first_subarray_channel: (int) First logical channel assigned to subarray
            * number_of_channels: The number of SPS channels assigned to subarray
            * apertures: list of dictionaries with each entry containing

                * station_id: (int) in range 1-512
                * aperture_id: (str) with format APx.y; x must match station_ID
                * station_trl: (str)
                * station_beam_trl: (str)
                * channel_blocks: Allocated channel blocks for this station
                * hardware_beam: Allocated hardware beam for this station

        :return: on success, the raw reply of the device's AssignResources
            command; ``ResultCode.FAILED`` if the call raised.
        """
        assert self._proxy is not None
        self.logger.debug(f"SubarrayBeam AssignResources: {subarray_beam}")
        args = json.dumps(subarray_beam)
        try:
            # NOTE(review): unlike the other commands in this file, the
            # ([result_code], message) reply is not unpacked here, so the
            # success path does not return a bare ResultCode. Left as-is
            # because callers may rely on it - confirm and align.
            result_code = self._proxy.AssignResources(args)
        except Exception as exc:  # pylint: disable=broad-except
            self.logger.error(f"Caught exception {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    def abort_device(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Abort this device without aborting subdevices.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.AbortDevice()
        return result_code

    @check_communicating
    def restart(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Invoke the device's Restart command.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    def set_parent_trl(self: _SubarrayBeamProxy, new_parent_trl: str) -> ResultCode:
        """
        Update the device with a new parent TRL.

        :param new_parent_trl: the trl of the new parent device.
        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.SetParentTrl(new_parent_trl)
        return result_code

    @check_communicating
    def reset_parent_trl(self: _SubarrayBeamProxy) -> ResultCode:
        """
        Reset the device's parent TRL via its ResetParentTrl command.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.ResetParentTrl()
        return result_code
class _StationBeamProxy(ObsDeviceComponentManager):
    """A controller's proxy to a station beam."""

    @check_communicating
    @check_on
    def assign_resources(
        self: _StationBeamProxy,
        station_beam: dict[str, Any],
    ) -> ResultCode:
        """
        Assign resources to a station beam.

        :param station_beam: Dictionary with command arguments

            * subarray_id: The ID of the subarray to which this beam is assigned
            * subarray_beam_id: The ID of the subarray beam to which this beam
              is assigned
            * station_id: ID of the station which implements the beam
            * aperture_id: ID of the form APx.y with x equal to the station ID
            * channel_blocks: Channel blocks allocated to this beam
            * hardware_beam: Hardware beam allocated to this beam

        :return: on success, the raw reply of the device's AssignResources
            command; ``ResultCode.FAILED`` if the call raised.
        """
        assert self._proxy is not None
        self.logger.debug(f"StationBeam AssignResources: {station_beam}")
        json_args = json.dumps(station_beam)
        try:
            # NOTE(review): the ([result_code], message) reply is not
            # unpacked here, unlike the other commands in this file.
            result_code = self._proxy.AssignResources(json_args)
        except Exception as exc:  # pylint: disable=broad-except
            # A failed assignment is an error; don't hide it at debug level.
            self.logger.error(f"Catched exception {exc}")
            result_code = ResultCode.FAILED
        return result_code

    @check_communicating
    def restart(self: _StationBeamProxy) -> ResultCode:
        """
        Restart this device.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Restart()
        return result_code

    @check_communicating
    def abort(
        self: _StationBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Abort this device and its station.

        :param task_callback: Update task state, defaults to None.
            NOTE(review): currently not used by this method.
        :return: the result code of the device's Abort command.
            NOTE(review): this does not match the ``tuple[TaskStatus, str]``
            annotation, which is kept for compatibility with the base class.
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.Abort()
        return result_code

    @check_communicating
    def set_parent_trl(self: _StationBeamProxy, new_parent_trl: str) -> ResultCode:
        """
        Update the device with a new parent TRL.

        :param new_parent_trl: the trl of the new parent device.
        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.SetParentTrl(new_parent_trl)
        return result_code

    @check_communicating
    def reset_parent_trl(self: _StationBeamProxy) -> ResultCode:
        """
        Reset the device's parent TRL via its ResetParentTrl command.

        :return: a result code
        """
        assert self._proxy is not None
        ([result_code], _) = self._proxy.ResetParentTrl()
        return result_code
# pylint: disable=too-many-instance-attributes
[docs]
class ControllerComponentManager(TaskExecutorComponentManager):
"""
A component manager for an MCCS controller.
This component manager has three jobs:
* Monitoring of the devices in the MCCS subsystem
* Powering the MCCS subsystem off and on
* Allocating resources to subarrays
"""
# pylint: disable=too-many-arguments, too-many-locals
[docs]
def __init__(
    self: ControllerComponentManager,
    subarray_trls: Iterable[str],
    station_trls: Iterable[str],
    subarray_beam_trls: Iterable[str],
    station_beam_trls: Iterable[str],
    logger: logging.Logger,
    obs_command_timeout: int,
    communication_state_callback: Callable[[CommunicationStatus], None],
    component_state_callback: Callable[..., None],
    event_serialiser: Optional[EventSerialiser] = None,
) -> None:
    """
    Initialise a new instance.

    :param subarray_trls: TRLs of all subarray devices
    :param station_trls: TRLs of all station devices
    :param subarray_beam_trls: TRLs of all subarray beam devices. The last
        path component must have the form ``<subarray_id>-<beam_id>``, both
        integers.
    :param station_beam_trls: TRLs of all station beam devices
    :param logger: the logger to be used by this object.
    :param obs_command_timeout: the default timeout for obs
        commands in seconds.
    :param communication_state_callback: callback to be
        called when the status of the communications channel
        between the component manager and its component changes
    :param component_state_callback: callback to be
        called when the component state changes
    :param event_serialiser: an optional event serialiser to
        use when serialising events
    """
    # Each TRL collection is traversed several times below (the subarray TRLs
    # are also handed to every station proxy, and the station beam TRLs are
    # scanned once per station). Materialise them so that a one-shot iterable,
    # such as a generator, is not silently exhausted after its first use.
    subarray_trls = list(subarray_trls)
    station_trls = list(station_trls)
    subarray_beam_trls = list(subarray_beam_trls)
    station_beam_trls = list(station_beam_trls)

    self._event_serialiser = event_serialiser
    self._communication_state_callback = communication_state_callback
    self._component_state_callback = component_state_callback
    self._obs_command_timeout = obs_command_timeout
    self._power_state_lock = threading.RLock()
    self._device_communication_states: dict[str, CommunicationStatus] = {}
    self._device_obs_states: dict[str, ObsState] = {}
    self._device_power_states: dict[str, PowerState] = {}
    # These resources should go to the obs state held in _desired_obs_state.
    self._configuring_resources: dict[str, set[str]] = defaultdict(set)
    self._desired_obs_state: dict[str, ObsState] = {}
    # These stations should go to _desired_power_state.
    self._powering_resources: set[str] = set()
    self._desired_power_state: Optional[PowerState] = None
    # Used to lock PowerState during power command execution;
    # _evaluate_power_state does nothing while this lock is held.
    self._power_command_in_progress = threading.Lock()
    # These stations should go to _desired_synchronisation_state.
    self._synchronising_resources: set[str] = set()
    self._desired_synchronisation_state: Optional[bool] = None
    # These stations should go to _desired_initialisation_state.
    self._initialising_resources: set[str] = set()
    self._desired_initialisation_state: Optional[bool] = None
    # A station's ID is the last path component of its TRL.
    self._station_ids = [station_trl.split(r"/")[-1] for station_trl in station_trls]

    for trl in (
        *subarray_trls,
        *station_trls,
        *subarray_beam_trls,
        *station_beam_trls,
    ):
        self._device_communication_states[trl] = CommunicationStatus.DISABLED

    self._resource_manager = ControllerResourceManager(
        subarrays=subarray_trls,
        subarray_beams=subarray_beam_trls,
        station_beams=station_beam_trls,
        channel_blocks=range(1, 49),
    )

    self._subarrays: dict[str, _SubarrayProxy] = {
        trl: _SubarrayProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in subarray_trls
    }
    self._stations: dict[str, _StationProxy] = {
        trl: _StationProxy(
            trl,
            subarray_trls,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in station_trls
    }
    self._subarray_beams: dict[str, _SubarrayBeamProxy] = {
        trl: _SubarrayBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in subarray_beam_trls
    }
    self._station_beams: dict[str, _StationBeamProxy] = {
        trl: _StationBeamProxy(
            trl,
            logger,
            functools.partial(self._device_communication_state_changed, trl),
            functools.partial(self._component_state_callback, trl=trl),
            event_serialiser=self._event_serialiser,
        )
        for trl in station_beam_trls
    }

    # Dictionaries to map IDs to TRLs. Subarray IDs are 1-based positions.
    self._subarray_trl: dict[int, str] = dict(enumerate(subarray_trls, start=1))
    # Map from (subarray_id, local_beam_id) pairs to TRLs.
    self._subarray_beam_trl: dict[tuple[int, int], str] = {}
    for subarray_beam_trl in subarray_beam_trls:
        id_parts = subarray_beam_trl.rsplit("/", 1)[-1].split("-")
        subarray_id = int(id_parts[0])
        local_subarray_beam_id = int(id_parts[1])
        self._subarray_beam_trl[
            (subarray_id, local_subarray_beam_id)
        ] = subarray_beam_trl

    # Station ID to (station_beam_id -> station_beam_trl)
    self.station_beam_trl: dict[str, dict[int, str]] = {}
    for station_id in self._station_ids:
        # Get all station beam TRLs which are associated with a given station.
        # NOTE(review): this is a substring test, so a station ID that is a
        # prefix of another (e.g. "ci-1" vs "ci-10") also collects the other
        # station's beams. Confirm the TRL format and tighten the match.
        station_beam_trl_in_station = [
            station_beam_trl
            for station_beam_trl in station_beam_trls
            if station_id in station_beam_trl
        ]
        # Map 1-based station beam ID to station beam TRL.
        self.station_beam_trl[station_id] = dict(
            enumerate(station_beam_trl_in_station, start=1)
        )
    self._station_id_trl_map: dict[int, str] = {}

    self._device_proxies: dict[str, dict] = {
        "subarray": self._subarrays,
        "station": self._stations,
        "subarraybeam": self._subarray_beams,
        "beam": self._station_beams,
    }
    self._started_acquisition = False
    self._assigning_subarray: Optional[int] = None
    self._abort_complete = threading.Event()

    super().__init__(
        logger,
        communication_state_callback,
        component_state_callback,
        power=None,
        fault=None,
    )
    self._communication_manager = CommunicationManager(
        self._update_communication_state,
        self._update_component_state,
        self.logger,
        self._stations,
        self._subarrays,
        self._subarray_beams,
        self._station_beams,
    )
[docs]
def cleanup(self: ControllerComponentManager) -> None:
    """
    Release everything this component manager holds.

    The communication manager is shut down first, then every station,
    subarray, subarray beam and station beam proxy is cleaned up, in that
    order, and finally the base class (task executor) is cleaned up.
    """
    self._communication_manager.shutdown()
    proxy_groups = (
        self._stations,
        self._subarrays,
        self._subarray_beams,
        self._station_beams,
    )
    for group in proxy_groups:
        for proxy in group.values():
            proxy.cleanup()
    super().cleanup()
[docs]
def start_communicating(self: ControllerComponentManager) -> None:
    """
    Establish communication with the subservient devices.

    Delegates to the communication manager, which covers the station,
    subarray, subarray beam and station beam proxies.
    """
    manager = self._communication_manager
    manager.start_communicating()
[docs]
def stop_communicating(self: ControllerComponentManager) -> None:
    """
    Break off communication with the subservient devices.

    Delegates to the communication manager.
    """
    manager = self._communication_manager
    manager.stop_communicating()
def _device_communication_state_changed(
    self: ControllerComponentManager,
    trl: str,
    communication_state: CommunicationStatus,
) -> None:
    """
    Forward a sub-device's communication status to the communication manager.

    :param trl: TRL of the device whose communication status changed
    :param communication_state: the device's new communication status
    """
    manager = self._communication_manager
    manager.update_communication_status(trl, communication_state)
def _evaluate_power_state(self: ControllerComponentManager) -> None:
    """
    Derive the controller's power state from its sub-devices' power states.

    Precedence: ON if any device is ON; else STANDBY if any is STANDBY;
    else OFF if any is OFF; else UNKNOWN if all are UNKNOWN; otherwise ON.
    With no sub-devices at all the result is ON. Nothing is evaluated while
    ``_power_command_in_progress`` is held.
    """
    # Suppress evaluation during a power command so the controller does not
    # report ON before every sub-device has had a chance to turn on/off.
    if self._power_command_in_progress.locked():
        return

    states = list(self._device_power_states.values())
    if not states:
        # Related to test_controller_no_subserviants: with no devices the
        # controller has historically reported ON.
        target = PowerState.ON
    elif any(state == PowerState.ON for state in states):
        target = PowerState.ON
    elif any(state == PowerState.STANDBY for state in states):
        target = PowerState.STANDBY
    elif any(state == PowerState.OFF for state in states):
        target = PowerState.OFF
    elif all(state == PowerState.UNKNOWN for state in states):
        target = PowerState.UNKNOWN
    else:
        target = PowerState.ON

    self.logger.info(
        "In ControllerComponentManager._evaluatePowerState with:\n"
        f"\tdevices: {self._device_power_states}\n"
        f"\tresult: {str(target)}"
    )
    self._update_component_state(power=target)
@property
def power_state(self: ControllerComponentManager) -> Optional[PowerState]:
    """
    Return the power state currently recorded in the component state.

    :return: the ``"power"`` entry of the component state
    """
    current = self._component_state["power"]
    return current
def _subarray_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a subarray.

    A subarray is marked ready for resource allocation exactly when its
    health is reported (i.e. is not None).

    :param trl: the TRL of the subarray whose health has changed.
    :param health: the new health state of the subarray, or None if
        the subarray's health should not be taken into account.
    """
    # What matters is really whether the subarray is in the right adminMode.
    # When it is not, its health is reported as None, so the healthState we
    # already subscribe to is used as a proxy for adminMode.
    is_ready = health is not None
    self._resource_manager.set_ready(trl, is_ready)
def _subarray_beam_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a subarray beam.

    This handler currently does nothing; both arguments are ignored.

    :param trl: the TRL of the subarray beam whose health has changed.
    :param health: the new health state of the subarray beam, or None
        if its health should not be taken into account.
    """
    return None
def _station_beam_health_changed(
    self: ControllerComponentManager,
    trl: str,
    health: HealthState | None,
) -> None:
    """
    Handle a change in the health of a station beam.

    This handler currently does nothing; both arguments are ignored.

    :param trl: the TRL of the station beam whose health has changed.
    :param health: the new health state of the station beam, or None
        if its health should not be taken into account.
    """
    return None
[docs]
def get_health_trl(
    self: ControllerComponentManager, trl: str
) -> Optional[HealthState]:
    """
    Return the health of a subdevice with given TRL.

    The device type is the second ``/``-separated component of the TRL
    and must be one of ``"subarray"``, ``"station"``, ``"subarraybeam"``
    or ``"beam"``.

    :param trl: TRL of device to return health of
    :return: health of device given by TRL
    :raises ValueError: if TRL is for an invalid device, including a TRL
        that has no ``/``-separated device-type component at all.
    """
    parts = trl.split("/")
    # Without a second component there is no device type; report it as an
    # invalid device instead of letting an IndexError escape.
    proxies = self._device_proxies.get(parts[1]) if len(parts) > 1 else None
    if proxies is None or trl not in proxies:
        raise ValueError(f"{trl} is an invalid device.")
    return proxies[trl].health
[docs]
def get_healths(
    self: ControllerComponentManager, device_type: str = "all"
) -> dict[str, dict[str, Optional[str]]]:
    """
    Report the health of subdevices, grouped by device type.

    :param device_type: the device type whose instances should be
        reported, or ``"all"`` (the default) for every device type.
    :return: mapping of device type to a mapping of TRL to the name of
        that device's HealthState (None when its health is unknown).
    :raises ValueError: if device_type is not a valid device type.
    """
    if device_type == "all":
        selected_types = list(self._device_proxies)
    elif device_type in self._device_proxies:
        selected_types = [device_type]
    else:
        raise ValueError(f"{device_type} is not a valid device type.")

    def _health_name(trl: str) -> Optional[str]:
        # Compare against None explicitly: HealthState.OK is falsy.
        value = self.get_health_trl(trl)
        return None if value is None else HealthState(value).name

    return {
        kind: {trl: _health_name(trl) for trl in self._device_proxies[kind]}
        for kind in selected_types
    }
[docs]
@check_communicating
@lock_power_state
def off(  # type: ignore[override]
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn off the MCCS subsystem.

    This is simpler than turning ON MCCS: every MccsStation that is not
    already OFF is commanded off, then we wait (up to 600 s) for that to
    happen.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None.
        NOTE(review): not currently consulted by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn OFF which aren't already OFF.
    self._desired_power_state = PowerState.OFF
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states.get(station_trl) != PowerState.OFF:
            results.append((station_trl, station_proxy.off()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # Fall back instead of raising KeyError when the
                        # mapping has no entry for this status (REJECTED).
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to OFF command with {status}",
                    ),
                )
            return

    # All MccsStations not OFF already are turning OFF, let's wait
    # for that to happen.
    wait_for_stations = self._wait_for_power_state(600)
    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn off",
                ),
            )
        return
    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn off",
                ),
            )
        return

    if task_callback:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The off command has completed"),
        )
[docs]
@check_communicating
@lock_power_state
def standby(  # type: ignore[override]
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Put the MCCS subsystem into low power standby mode.

    Every station is sent a standby command. The task fails if any station
    reports FAILED, and completes otherwise. The method does not wait for
    the stations to actually reach standby.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    responses = [proxy.standby() for proxy in self._stations.values()]
    # NOTE(review): only FAILED counts as failure here; REJECTED/ABORTED
    # responses are treated as success, unlike in on()/off().
    any_failed = any(response[0] == TaskStatus.FAILED for response in responses)

    if not task_callback:
        return
    if any_failed:
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, "The standby command has failed"),
        )
    else:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The standby command has completed"),
        )
# pylint: disable=too-many-branches
[docs]
@check_communicating
@lock_power_state
def on(  # type: ignore[override]
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn on the MCCS subsystem.

    Every MccsStation that is not already ON is commanded on, and we wait
    (up to 600 s) for that to happen. Then, unless every station is already
    synchronised, the stations are (re)initialised and synchronised. The
    task completes only if acquisition was started.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None.
        NOTE(review): not currently consulted by this method.
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Command all our MccsStations to turn ON which aren't already ON.
    self._desired_power_state = PowerState.ON
    results: list[tuple[str, TaskStatus]] = []
    for station_trl, station_proxy in self._stations.items():
        if self._device_power_states.get(station_trl) != PowerState.ON:
            results.append((station_trl, station_proxy.on()[0]))
            self._powering_resources.add(station_trl)

    # Did any MccsStations reject our request?
    for station_trl, status in results:
        if status in (TaskStatus.REJECTED, TaskStatus.ABORTED):
            if task_callback:
                task_callback(
                    status=status,
                    result=(
                        # Fall back instead of raising KeyError when the
                        # mapping has no entry for this status (REJECTED).
                        _task_to_result.get(status, ResultCode.REJECTED),
                        f"{station_trl} responded to ON command with {status}",
                    ),
                )
            return

    # All MccsStations not ON already are turning ON, let's wait
    # for that to happen.
    wait_for_stations = self._wait_for_power_state(600)
    if wait_for_stations == TaskStatus.FAILED:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    "Timed out waiting for MccsStations to turn on",
                ),
            )
        return
    if wait_for_stations == TaskStatus.ABORTED:
        if task_callback:
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    "Aborted waiting for MccsStations to turn on",
                ),
            )
        return

    # All of our MccsStations are now ON. So we must do required
    # Initialisation/Synchronisation.
    self._started_acquisition = False
    if all(
        station_proxy.issynchronised for station_proxy in self._stations.values()
    ):
        # All MccsStations already synchronised, the job is done.
        self.logger.debug("All stations synchronsied.")
        self._started_acquisition = True
    elif all(
        station_proxy.isinitialised for station_proxy in self._stations.values()
    ):
        self.logger.debug("Reinitialising tiles, temporary hack")
        initialised = self._intialise_stations()
        if initialised:
            self.logger.debug(
                "All stations initialised, beginning synchronisation."
            )
            self._started_acquisition = self._synchronise_stations()
    elif all(
        station_proxy.isinitialised or station_proxy.issynchronised
        for station_proxy in self._stations.values()
    ):
        # Our MccsStations are mismatched: re-initialise all of them,
        # then resynchronise all of them.
        self.logger.debug(
            "Some stations are intialised, others"
            " synchronised. Reinitialising and synchronising all stations."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()
    else:
        # Some MccsStations are neither Initialised nor Synchronised, which
        # should not happen. There is currently a bug whereby an MccsTile
        # can be ON when not Initialised or Synchronised; this is temporary
        # and MCCS-2104 should resolve it.
        self.logger.warning(
            "There are stations which are not Initialised or "
            "Synchronised, they should be in either of these states."
        )
        initialised = self._intialise_stations()
        if initialised:
            self._started_acquisition = self._synchronise_stations()

    if task_callback:
        if self._started_acquisition:
            self.logger.info("The On command has completed")
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The On command has completed"),
            )
        else:
            self.logger.info("The On command has failed")
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "The On command has failed"),
            )
def _synchronise_stations(self: ControllerComponentManager) -> bool:
    """
    Start acquisition on every station and wait for them to synchronise.

    :return: True if no StartAcquisition call came back REJECTED/ABORTED and
        the wait for synchronisation neither failed nor was aborted.
    """
    # The desired state is the same for every station: set it once, before
    # any command is issued, and also when there are no stations at all so
    # the wait below never finds it unset.
    self._desired_synchronisation_state = True
    start_acq_results = []
    for station_trl, station_proxy in self._stations.items():
        start_acq_results.append(station_proxy.start_acquisition(json.dumps({})))
        self._synchronising_resources.add(station_trl)
    if any(
        result in (TaskStatus.REJECTED, TaskStatus.ABORTED)
        for result in start_acq_results
    ):
        return False
    return self._wait_for_station_synchronisation_state(30) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )
def _intialise_stations(self: ControllerComponentManager) -> bool:
    """
    Initialise every station and wait for the initialisation to complete.

    :return: True if no Initialise call came back REJECTED/ABORTED and the
        wait for initialisation neither failed nor was aborted.
    """
    # The desired state is the same for every station: set it once, before
    # any command is issued, and also when there are no stations at all so
    # the wait below never finds it unset.
    self._desired_initialisation_state = True
    initialise_results = []
    for station_trl, station_proxy in self._stations.items():
        initialise_results.append(station_proxy.initialise())
        self._initialising_resources.add(station_trl)
    if any(
        result in (TaskStatus.REJECTED, TaskStatus.ABORTED)
        for result in initialise_results
    ):
        return False
    return self._wait_for_initialisation_state(120) not in (
        TaskStatus.FAILED,
        TaskStatus.ABORTED,
    )
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
[docs]
@check_communicating
@check_on
def allocate(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    *,
    interface: Optional[str] = None,
    subarray_id: int,
    subarray_beams: list[dict],
) -> None:
    """
    Allocate and distribute a set of unallocated MCCS resources to a subarray.

    This is a thin entry point: the work, including progress and result
    reporting through ``task_callback``, is done by ``_assign_resources``.

    :param task_callback: callback to signal end of command
    :param interface: the schema version this is running against (accepted
        for interface compatibility; not used by this method)
    :param subarray_id: ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying one subarray
        beam allocation (its ``subarray_beam_id``, ``number_of_channels`` and
        ``apertures``)
    """
    self._assign_resources(
        subarray_id=subarray_id,
        subarray_beams=subarray_beams,
        task_callback=task_callback,
    )
def _create_station_id_trl_map(self: ControllerComponentManager) -> None:
    """
    Build ``self._station_id_trl_map``, mapping each station ID to its TRL.

    The station ID is read from each station proxy's ``stationID``. The map
    is only replaced once every station has been read successfully.

    :raises AttributeError: If a station reports no (or a falsy) station ID.
    """
    id_to_trl = {}
    for trl, station_manager in self._stations.items():
        assert station_manager._proxy is not None  # for the type checker
        reported_id = station_manager._proxy.stationID
        if not reported_id:
            raise AttributeError(f"Station {trl} has no property value StationId")
        id_to_trl[reported_id] = trl
    self._station_id_trl_map = id_to_trl
# pylint: disable=too-many-return-statements
def _allocate_controller_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
    task_abort_event: Optional[threading.Event] = None,
) -> list[dict]:
    """
    Allocate a set of unallocated MCCS resources to a subarray.

    The subarray is first released and must reach EMPTY. Requested subarray
    beams whose device is not known to the controller are dropped from
    ``subarray_beams`` (the list is modified in place). For each remaining
    beam, the channel count is rounded up to a multiple of 8. For each of the
    beam's apertures, a station hardware beam, the required channel blocks and
    the matching station beam device are allocated. The beam and aperture
    dictionaries are updated with the allocation details.

    If allocation fails after some station resources were already taken,
    everything the stations hold for this subarray is released again. A
    failed request therefore does not leave hardware beams or channel blocks
    reserved.

    :param subarray_id: int, ID of the subarray which requires allocation
    :param subarray_beams: list of dictionaries, each specifying a beam allocation
    :param task_abort_event: optional event to check for task abort
        (currently not consulted by this method)
    :return: the updated list of beam dictionaries, or a list holding a
        single dictionary with a single "Error" element
    """
    subarray_trl = self._subarray_trl.get(subarray_id, None)
    if subarray_trl is None:
        return [{"Error": f"Subarray {subarray_id} not present"}]
    if not subarray_beams:
        return [{"Error": "No beams defined"}]

    def _fail(message: str) -> list[dict]:
        # The subarray is released before any allocation starts, so anything
        # the stations hold for subarray_trl at this point was taken by this
        # call: give it back before reporting the error.
        self.logger.error(message)
        for station in self._stations.values():
            station.release_from_subarray(subarray_trl)
        return [{"Error": message}]

    self.logger.debug(
        f"Device {subarray_trl} power: "
        f"{self._subarrays[subarray_trl]._component_state['power']}"
    )
    self._desired_obs_state[subarray_trl] = ObsState.EMPTY
    self._configuring_resources[subarray_trl].add(subarray_trl)
    self._release_from_subarray(subarray_trl)
    if (
        self._wait_for_obs_state(subarray_trl, self._obs_command_timeout)
        == TaskStatus.FAILED
    ):
        return [{"Error": f"Subarray {subarray_id} didn't reach EMPTY"}]
    # Map from the station_id to station_trl
    if not self._station_id_trl_map:
        self._create_station_id_trl_map()
    # Check that devices are available. Drop from the configuration those
    # which are not physically present.
    first_subarray_channel = 0
    subarray_beam_list: list[str] = []
    station_beam_list: list[str] = []
    for beam in list(subarray_beams):  # iterate a copy to allow safe removal
        local_subarray_beam_id = beam["subarray_beam_id"]
        trl = subarraybeam_trl_from_ids(subarray_id, local_subarray_beam_id)
        self.logger.debug(
            f"Start processing beam local={local_subarray_beam_id}, "
            f"(subarray={subarray_id}) -> trl={trl}",
        )
        if trl not in self._subarray_beams:
            subarray_beams.remove(beam)
            self.logger.warning(
                f"Subarray beam {local_subarray_beam_id} (trl {trl}) not present "
                f"in subarray {subarray_id}, dropped from allocation"
            )
            continue
        self.logger.debug(f"Allocated beam {trl}")
        subarray_beam_list.append(trl)
        # Channels are allocated in blocks of 8: round the request up.
        requested_channels = beam["number_of_channels"]
        number_of_blocks = (requested_channels + 7) // 8
        number_of_channels = number_of_blocks * 8
        if number_of_channels != requested_channels:
            self.logger.warning(
                f"Number of channels requested for {trl} ({requested_channels}) "
                "is not a multiple of 8; "
                f"{number_of_channels} channels will be allocated",
            )
        beam["first_subarray_channel"] = first_subarray_channel
        beam["number_of_channels"] = number_of_channels
        #
        # Allocate resources to apertures. Each aperture requires
        # - one station beam device
        # - one station hardware beam
        # - <number_of_blocks> blocks of station channels
        #
        for aperture in beam["apertures"]:
            station_id = aperture["station_id"]
            self.logger.debug(f"Start processing station {station_id}")
            self.logger.debug(
                f"chans {first_subarray_channel}:" f"{number_of_channels}"
            )
            station_trl = self._station_id_trl_map.get(station_id, None)
            if station_trl is None:
                return _fail(f"Cannot allocate resources: {station_id}")
            aperture["station_trl"] = station_trl
            station_name = station_trl.split("/")[-1]
            station = self._stations[station_trl]
            available_blocks = station.get_available_blocks()
            try:
                beam_id = aperture["hardware_beam"] = station.allocate_beam(
                    subarray_trl
                )
                aperture["channel_blocks"] = station.allocate_blocks(
                    subarray_trl, number_of_blocks
                )
            except ValueError as err:
                return _fail(
                    "Failed to allocate controller resources due to the following"
                    f" error: {str(err)}. Requested number of channels/blocks: "
                    f"{number_of_channels}/{number_of_blocks} out of "
                    f"{available_blocks*8}/{available_blocks}, "
                    f"on {station_trl} for beam {local_subarray_beam_id} in "
                    f"subarray {subarray_id} and aperture "
                    f"{aperture['aperture_id']}"
                )
            # Station beam devices are numbered from 1, hardware beams from 0.
            beam_trls = self.station_beam_trl.get(station_name, None)
            beam_trl = None if beam_trls is None else beam_trls.get(beam_id + 1, None)
            if beam_trl is None:
                return _fail(f"Cannot allocate station beam: {station_id}, {beam_id}")
            station_beam_list.append(beam_trl)
            aperture["station_beam_trl"] = beam_trl
            self.logger.debug(f"Allocated station beam {beam_trl}")
        # Subarray channels of consecutive beams are laid out contiguously.
        first_subarray_channel += number_of_channels
    if not subarray_beams:
        # Every requested beam was dropped: there is nothing to allocate, and
        # returning an empty list would leave the caller nothing to inspect.
        return _fail(
            f"None of the requested subarray beams exist in subarray {subarray_id}"
        )
    self.logger.debug(
        f"Allocating to {subarray_trl}: {subarray_beam_list},"
        f"{station_beam_list}"
    )
    try:
        self._resource_manager.allocate(
            subarray_trl,
            subarray_beams=subarray_beam_list,
            station_beams=station_beam_list,
        )
    except ValueError as e:
        self._resource_manager.resource_pool.free_resources(
            {"station_beams": station_beam_list}
        )
        return _fail(str(e))
    return subarray_beams
def _check_aborted(
    self: ControllerComponentManager,
    task_abort_event: Optional[threading.Event],
    task_callback: Optional[Callable],
    method_name: str,
) -> bool:
    """
    Tell whether the running task has been asked to abort.

    Both the abort event and the task callback are required. If either is
    missing, a warning is logged and the task is treated as not aborted. When
    the event is set, ABORTED is reported through the callback.

    :param task_abort_event: event that is set when the task must abort
    :param task_callback: callback used to report the ABORTED status
    :param method_name: name of the calling command, used in messages
    :return: True if the task has been aborted, otherwise False
    """
    if task_abort_event is None or task_callback is None:
        self.logger.warning(f"Cannot check {method_name} for abort status")
        return False
    if not task_abort_event.is_set():
        return False
    self.logger.info(f"{method_name} has been aborted")
    task_callback(
        status=TaskStatus.ABORTED,
        result=(ResultCode.ABORTED, f"{method_name} aborted"),
    )
    return True
def _assign_resources(
    self: ControllerComponentManager,
    subarray_id: int,
    subarray_beams: list[dict],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Assign resources to a subarray.

    The resources are first allocated in the controller resource manager.
    Each subarray beam and each of its station beams then gets its
    assignment. Finally, the subarray is told about its beams and the method
    waits for it to reach IDLE. Abort is checked between steps.

    While this runs, ``self._assigning_subarray`` holds ``subarray_id``, so an
    abort of that subarray knows an assignment is in progress. It is always
    cleared on exit, whether the method completes, fails, is aborted or
    raises.

    :param subarray_id: id of the subarray to which resources are to
        be allocated
    :param subarray_beams: list of subarray beam resources
        as dictionary
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """

    def _fail(message: str) -> None:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED, result=(ResultCode.FAILED, message)
            )

    def _aborted() -> bool:
        return self._check_aborted(
            task_abort_event, task_callback, "AssignResources"
        )

    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    self._assigning_subarray = subarray_id
    try:
        if _aborted():
            return
        self.logger.debug(f"Allocate command started for subarray {subarray_id}")
        self.logger.debug(f"subarray definition: {subarray_beams}")
        # Allocate resources in controller resource managers and update the
        # beam descriptors; on failure a single {"Error": ...} is returned.
        new_subarray_beams = self._allocate_controller_resources(
            subarray_id, subarray_beams, task_abort_event
        )
        if not new_subarray_beams:
            _fail(f"No subarray beams could be allocated for subarray {subarray_id}")
            return
        message = new_subarray_beams[0].get("Error", None)
        if message:
            _fail(message)
            return
        subarray_trl = self._subarray_trl[subarray_id]
        for beam in new_subarray_beams:
            if _aborted():
                return
            local_id = beam["subarray_beam_id"]
            beam_trl = self._subarray_beam_trl[(subarray_id, local_id)]
            self._subarray_beams[beam_trl].set_parent_trl(subarray_trl)
            result_code = self._subarray_beams[beam_trl].assign_resources(
                {
                    "subarray_id": subarray_id,
                    "subarray_beam_id": beam["subarray_beam_id"],
                    "apertures": beam["apertures"],
                    "first_subarray_channel": beam["first_subarray_channel"],
                    "number_of_channels": beam["number_of_channels"],
                }
            )
            if result_code == ResultCode.FAILED:
                self.release(subarray_id=subarray_id)
                _fail("The SubarrayBeam.assign_resources command has failed")
                return
            for aperture in beam["apertures"]:
                if _aborted():
                    return
                station_beam_trl = aperture["station_beam_trl"]
                self._station_beams[station_beam_trl].set_parent_trl(subarray_trl)
                args = {
                    "subarray_id": subarray_id,
                    "subarray_beam_id": beam["subarray_beam_id"],
                    "station_id": aperture["station_id"],
                    "aperture_id": aperture["aperture_id"],
                    "channel_blocks": aperture["channel_blocks"],
                    "hardware_beam": aperture["hardware_beam"],
                    "station_trl": aperture["station_trl"],
                    "first_subarray_channel": beam["first_subarray_channel"],
                    "number_of_channels": beam["number_of_channels"],
                }
                result_code = self._station_beams[station_beam_trl].assign_resources(
                    args
                )
                if result_code == ResultCode.FAILED:
                    self.release(subarray_id=subarray_id)
                    _fail("StationBeam.assign_resources command has failed")
                    return
            # Add beam TRL to configuration, so that subarray knows mapping
            # between subarray beams ID and TRL
            beam["subarray_beam_trl"] = beam_trl
        if _aborted():
            return
        result_code = self._subarrays[subarray_trl].assign_resources(
            subarray_id, new_subarray_beams
        )
        self._desired_obs_state[subarray_trl] = ObsState.IDLE
        self._configuring_resources[subarray_trl].add(subarray_trl)
        if (
            self._wait_for_obs_state(
                subarray_trl, self._obs_command_timeout, task_abort_event
            )
            == TaskStatus.FAILED
        ):
            _fail(f"Subarray {subarray_id} didn't reach IDLE")
            return
        if _aborted():
            return
        if result_code == ResultCode.FAILED:
            _fail("The AssignResources command has failed")
        elif task_callback:
            task_callback(
                status=TaskStatus.COMPLETED,
                result=(ResultCode.OK, "The AssignResources command has completed"),
            )
    finally:
        self._assigning_subarray = None
[docs]
@check_communicating
@check_on
def trigger_adc_equalisation(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    *,
    station_args: list[dict],
) -> None:
    """
    Trigger ADC equalisation on a set of stations.

    Each entry of ``station_args`` names a station by ``station_id`` and may
    carry a ``target_adc``. When it does not, the value returned by
    ``calculate_target_adc()`` is used. All keys other than ``station_id`` are
    forwarded as the JSON argument of the station's ``TriggerAdcEqualisation``
    command. An empty ``station_args`` means "every station known to the
    controller". Entries with a missing or unknown station ID are skipped and
    reported in the result message. If every entry is invalid, the command is
    rejected. The caller's list and dictionaries are not modified.

    :param station_args: a list of dictionaries, each with a ``station_id``
        and optionally a ``target_adc``
    :param task_callback: callback to signal end of command
    :param task_abort_event: Check for abort, defaults to None (not
        consulted by this method)
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    self.logger.debug(
        "Starting station equalisation for the following stations: "
        f"{station_args}"
    )
    if not self._station_id_trl_map:
        self._create_station_id_trl_map()
    if not station_args:
        # Given no arguments, equalise every station. Use the real station
        # IDs rather than assuming they are numbered 1..N.
        station_args = [
            {"station_id": station_id} for station_id in self._station_id_trl_map
        ]
    apply_configuration_commands = MccsCompositeCommandProxy(self.logger)
    station_args_failed: list = []
    for station_arg in station_args:
        station_id = station_arg.get("station_id", None)
        # Input validation
        if station_id is None:
            self.logger.error(f"Station ID not supplied for values {station_arg}")
            station_args_failed.append(station_arg)
            continue
        if station_id not in self._station_id_trl_map:
            self.logger.error(f"Station with ID {station_id} is not available")
            station_args_failed.append(station_arg)
            continue
        # Build the subcommand argument from a copy: station_id is not part
        # of it, and the caller's dictionary must stay untouched.
        subcommand_args = {
            key: value for key, value in station_arg.items() if key != "station_id"
        }
        # If the target adc is not supplied, use the default calculated value
        if "target_adc" not in subcommand_args:
            subcommand_args["target_adc"] = self.calculate_target_adc()
        apply_configuration_commands += MccsCommandProxy(
            device_name=self._station_id_trl_map[station_id],
            command_name="TriggerAdcEqualisation",
            logger=self.logger,
            default_args=json.dumps(subcommand_args),
        )
    if len(station_args_failed) == len(station_args):
        # Overwrite both result code and message. The command cannot be run.
        message = (
            "Rejected Adc Equalisation command: all given arguments for the station"
            f" where incorrect: {station_args_failed}"
        )
        self.logger.error(message)
        result = ResultCode.REJECTED
        status = TaskStatus.REJECTED
    else:
        result, message = apply_configuration_commands(
            command_evaluator=CompositeCommandResultEvaluator(),
            timeout=self._obs_command_timeout,
        )
        status = TaskStatus.COMPLETED
        if station_args_failed:
            # Some of the arguments were correct, so the command executed for
            # those stations. Append the incorrect arguments to the message.
            message = (
                f"Adc Equalisation command finished with result: {message}; but had "
                f"the following incorrect arguments: {station_args_failed}"
            )
            self.logger.error(message)
    if task_callback:
        task_callback(
            status=status,
            result=(ResultCode(result), message),
        )
[docs]
@check_communicating
@check_on
def release(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    *,
    subarray_id: int,
    **kwargs: Any,
) -> None:
    """
    Release every resource held by one subarray.

    Fails if the controller is not ON or the subarray ID is unknown.
    Otherwise, the task status follows the result code returned by
    ``_release_from_subarray``: COMPLETED for OK, FAILED for anything else.

    :param subarray_id: ID of the subarray which requires release
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    :param kwargs: additional arguments
    """

    def _report(**update: Any) -> None:
        if task_callback:
            task_callback(**update)

    if self.power_state != PowerState.ON:
        _report(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, "Controller is not turned on"),
        )
        return
    _report(status=TaskStatus.IN_PROGRESS)
    subarray_trl = self._subarray_trl.get(subarray_id, None)
    if subarray_trl is None:
        _report(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, f"Subarray {subarray_id} not present"),
        )
        return
    result_code = self._release_from_subarray(subarray_trl)
    if not task_callback:
        return
    released_ok = result_code == ResultCode.OK
    task_callback(
        status=TaskStatus.COMPLETED if released_ok else TaskStatus.FAILED,
        result=(result_code, f"Release command finished: {result_code.name}"),
    )
[docs]
def release_all(
    self: ControllerComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Release all subarrays resources.

    Every subarray is released in turn. Failures are collected and reported
    together instead of stopping at the first one.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None (not
        consulted by this method)
    """
    if self.power_state != PowerState.ON:
        if task_callback:
            # Report with status/result keywords, like every other
            # task_callback call in this class (a positional tuple is not a
            # valid task update).
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "Controller is not turned on."),
            )
        return
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    failures = []
    for subarray_trl in self._subarrays:
        result_code = self._release_from_subarray(subarray_trl)
        if result_code != ResultCode.OK:
            failures.append((subarray_trl, result_code.name))
    if failures:
        self.logger.error(
            f"Release all command had failures: {failures}",
        )
        overall_result = ResultCode.FAILED
        overall_status = TaskStatus.FAILED
        message = f"Failures during release of subarrays: {failures}"
    else:
        overall_result = ResultCode.OK
        overall_status = TaskStatus.COMPLETED
        message = "Release all command completed successfully."
    if task_callback:
        task_callback(status=overall_status, result=(overall_result, message))
def _release_from_subarray(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> ResultCode:
    """
    Release resources from a subarray.

    Steps, in order:

    1. Ask the subarray to release all of its resources.
    2. Detach the allocated station beams and subarray beams from their
       parent.
    3. Tell every station to release what it holds for the subarray.
    4. Return the station beams to the resource pool.
    5. Drop the subarray's allocation from the resource manager.

    TODO: Actually forward the ReleaseAllResources() to subdevices

    :param subarray_trl: subarray from which resources must be released
    :return: the result code returned by the subarray's
        ``release_all_resources``
    """
    subarray_result = self._subarrays[subarray_trl].release_all_resources()
    allocated = self._resource_manager.get_allocated(subarray_trl)
    self.logger.debug(
        f"Releasing resources for subarray {subarray_trl}: {allocated}"
    )
    for beam_trl in allocated.get("station_beams", []):
        self._station_beams[beam_trl].reset_parent_trl()
    for beam_trl in allocated.get("subarray_beams", []):
        self._subarray_beams[beam_trl].reset_parent_trl()
    for station_manager in self._stations.values():
        station_manager.release_from_subarray(subarray_trl)
    pooled_station_beams = allocated.get("station_beams", None)
    if pooled_station_beams is not None:
        self._resource_manager.resource_pool.free_resources(
            {"station_beams": pooled_station_beams}
        )
    self._resource_manager.deallocate_from(subarray_trl)
    return subarray_result
[docs]
def get_resources(self: ControllerComponentManager, subarray_id: int) -> str:
    """
    Return a dictionary of the resources assigned to a given subarray.

    :param subarray_id: The subarray ID of the resources
    :return: json formatted dictionary
    :raises ValueError: if subarray_id is not valid
    """
    try:
        trl = self._subarray_trl[subarray_id]
    except KeyError as e:
        # _subarray_trl is a dict keyed by subarray ID, so an unknown ID
        # raises KeyError; translate it into the documented ValueError.
        raise ValueError(
            "Please use a valid Subarray ID. Subarrays are 1-indexed. "
            f"Valid Subarray IDs are 1 to {len(self._subarrays)}"
        ) from e
    resources = json.dumps(self._resource_manager.get_allocated(trl))
    self.logger.debug(f"get_resources for {trl}: {resources}")
    return resources
[docs]
@check_communicating
@check_on
def restart_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Restart an MCCS subarray.

    The subarray and its allocated beams are aborted and then restarted.
    Afterwards, the allocated beams are detached from their parent. The
    subarray's allocation is dropped from the resource manager only if both
    steps completed.

    :param subarray_id: an integer subarray_id.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if self.power_state != PowerState.ON:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "Controller is not turned on."),
            )
        return
    subarray_trl = self._subarray_trl[subarray_id]
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    # Start clean: forget any devices still being waited on, and refresh the
    # cached ObsStates because the local cache can get out of sync.
    self._configuring_resources[subarray_trl].clear()
    self._check_subdevice_obs_states()
    outcome = self._abort_subdevices(subarray_trl)
    if outcome == TaskStatus.COMPLETED:
        outcome = self._restart_subdevices(subarray_trl)
    allocated = self._resource_manager.get_allocated(subarray_trl)
    for beam_trl in allocated.get("station_beams", []):
        self._station_beams[beam_trl].reset_parent_trl()
    for beam_trl in allocated.get("subarray_beams", []):
        self._subarray_beams[beam_trl].reset_parent_trl()
    if outcome == TaskStatus.COMPLETED:
        self._resource_manager.deallocate_from(subarray_trl)
    if not task_callback:
        return
    if outcome == TaskStatus.FAILED:
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, "The restart command has failed"),
        )
    else:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.OK, "The restart command has completed"),
        )
[docs]
@check_communicating
@check_on
def abort_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Abort an MCCS subarray.

    The abort runs on a dedicated thread (see ``_abort_subarray``), which
    reports the final status through ``task_callback``.

    :param subarray_id: an integer subarray_id.
    :param task_callback: Update task state, defaults to None
    :return: A tuple of (TaskStatus, message string).
    """
    if not self._subarrays:
        if task_callback:
            task_callback(
                status=TaskStatus.REJECTED,
                result=(
                    ResultCode.REJECTED,
                    "No subservient subarray devices to abort",
                ),
            )
        return (TaskStatus.REJECTED, "No subservient subarray devices to abort")
    if self.power_state != PowerState.ON:
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(ResultCode.FAILED, "Controller is not turned on"),
            )
        return (TaskStatus.FAILED, "Controller is not turned on")
    # Report IN_PROGRESS *before* starting the worker. Otherwise the worker
    # could report its final status first and then be overwritten here.
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    # Spawn a raw thread to bypass the TaskExecutor queue — abort must
    # run immediately, not wait behind already-queued tasks.
    threading.Thread(
        target=self._abort_subarray,
        kwargs={"subarray_id": subarray_id, "task_callback": task_callback},
    ).start()
    return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort_callback(
    self: ControllerComponentManager,
    *,
    status: Optional[TaskStatus] = None,
    exception: Optional[Exception] = None,
    **kwargs: Any,
) -> None:
    """
    Receive task updates from ``abort_tasks``.

    Any reported exception is logged. Once the status is COMPLETED,
    ``self._abort_complete`` is set so that a thread waiting on it can
    proceed.

    :param status: reported task status, if any
    :param exception: reported exception, if any
    :param kwargs: other task-update fields, ignored
    """
    if exception is not None:
        self.logger.error(f"abort_tasks raised exception: {exception!r}")
    if status != TaskStatus.COMPLETED:
        return
    self.logger.debug("abort_tasks has finished, setting flag")
    self._abort_complete.set()
def _abort_subarray(
    self: ControllerComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
) -> None:
    """
    Abort an MCCS subarray (worker-thread body).

    If an AssignResources for this subarray is in progress, the executor's
    tasks are aborted first. The method waits up to 10 s for
    ``_abort_callback`` to signal completion. The subarray's sub-devices are
    then aborted. The final status is reported through ``task_callback``.

    :param subarray_id: subarray_id to abort.
    :param task_callback: Update task state, defaults to None
    """
    with EnsureOmniThread():
        if task_callback:
            task_callback(status=TaskStatus.IN_PROGRESS)
        outcome = TaskStatus.COMPLETED
        if self._assigning_subarray == subarray_id:
            self.abort_tasks(self._abort_callback)  # type:ignore
            wait_limit = 10.0
            if self._abort_complete.wait(timeout=wait_limit):
                self._abort_complete.clear()
            else:
                self.logger.error(f"Abort timed out in {wait_limit} seconds.")
                outcome = TaskStatus.FAILED
                # A late callback could still set the old event at any
                # moment, so replace it with a fresh one.
                self._abort_complete = threading.Event()
        if outcome == TaskStatus.COMPLETED:
            self.logger.debug(f"Aborting subarray {subarray_id}")
            outcome = self._abort_subdevices(self._subarray_trl[subarray_id])
        if not task_callback:
            return
        final_code = (
            ResultCode.OK if outcome == TaskStatus.COMPLETED else ResultCode.FAILED
        )
        task_callback(
            status=outcome,
            result=(final_code, f"Abort command finished: {outcome.name}"),
        )
def _abort_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Send Abort to a subarray and its allocated beams, then wait for ABORTED.

    Devices whose cached ObsState is FAULT, ABORTED, ABORTING, EMPTY or
    RESTARTING are skipped. A RESTARTING device, for example, is about to be
    EMPTY anyway. Every device sent Abort is added to the set of resources
    being waited on. A DevFailed from the command is logged and ignored: it
    can happen when the device changed ObsState before the local cache was
    updated.

    :param subarray_trl: TRL of the subarray to abort
    :return: result of waiting for the devices to reach ABORTED
    """
    no_abort_needed = (
        ObsState.FAULT,
        ObsState.ABORTED,
        ObsState.ABORTING,
        ObsState.EMPTY,
        ObsState.RESTARTING,
    )
    allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state[subarray_trl] = ObsState.ABORTED
    # Station beams are aborted with abort(), subarray beams with abort_device().
    for resource_kind, devices, command_name in (
        ("station_beams", self._station_beams, "abort"),
        ("subarray_beams", self._subarray_beams, "abort_device"),
    ):
        for beam_trl in allocated.get(resource_kind, []):
            beam_device = devices[beam_trl]
            if self._device_obs_states[beam_trl] in no_abort_needed:
                continue
            self.logger.info(f"Aborting {beam_trl}")
            try:
                getattr(beam_device, command_name)()
                self._configuring_resources[subarray_trl].add(beam_trl)
            except DevFailed as e:
                self.logger.warning("Could not execute command: %s", repr(e))
    if self._device_obs_states[subarray_trl] not in no_abort_needed:
        self.logger.info(f"Aborting {subarray_trl}")
        try:
            self._subarrays[subarray_trl].abort_device()
            self._configuring_resources[subarray_trl].add(subarray_trl)
        except DevFailed as e:
            self.logger.warning("Could not execute command: %s", repr(e))
    return self._wait_for_obs_state(subarray_trl, self._obs_command_timeout)
def _restart_subdevices(
    self: ControllerComponentManager,
    subarray_trl: str,
) -> TaskStatus:
    """
    Send Restart to a subarray and its allocated beams, then wait for EMPTY.

    Only devices whose cached ObsState is FAULT or ABORTED are restarted.
    Each of them is added to the set of resources being waited on before the
    command is sent.

    :param subarray_trl: TRL of the subarray to restart
    :return: result of waiting for the devices to reach EMPTY
    """
    restartable = (ObsState.FAULT, ObsState.ABORTED)
    allocated = self._resource_manager.get_allocated(subarray_trl)
    self._desired_obs_state[subarray_trl] = ObsState.EMPTY
    for resource_kind, devices in (
        ("station_beams", self._station_beams),
        ("subarray_beams", self._subarray_beams),
    ):
        for beam_trl in allocated.get(resource_kind, []):
            beam_device = devices[beam_trl]
            if self._device_obs_states[beam_trl] not in restartable:
                continue
            self.logger.info(f"Restarting {beam_trl}")
            self._configuring_resources[subarray_trl].add(beam_trl)
            beam_device.restart()
    if self._device_obs_states[subarray_trl] in restartable:
        self.logger.info(f"Restarting {subarray_trl}")
        self._configuring_resources[subarray_trl].add(subarray_trl)
        self._subarrays[subarray_trl].restart()
    return self._wait_for_obs_state(subarray_trl, self._obs_command_timeout)
def _check_subdevice_obs_states(self: ControllerComponentManager) -> None:
    """
    Poll subdevices for their ObsState and update local cache.

    Subarrays, subarray beams and station beams are read directly from their
    proxies, in that order. The cached value in ``self._device_obs_states`` is
    updated (with a debug log) only when it differs. Devices whose ObsState
    cannot be read are logged and skipped.
    """

    def _refresh(devices: dict[str, Any]) -> None:
        for trl, device in devices.items():
            try:
                device_obs_state = ObsState(device._proxy.obsstate)
            # pylint: disable=broad-exception-caught
            except Exception as e:
                self.logger.warning(
                    f"Failed to read ObsState for {trl=}, {device=}: {e}"
                )
                continue
            cached_obs_state = self._device_obs_states[trl]
            if cached_obs_state == device_obs_state:
                continue
            self.logger.debug(
                f"Updating {trl} ObsState: "
                f"{cached_obs_state} -> {device_obs_state}"
            )
            self._device_obs_states[trl] = device_obs_state

    for device_group in (self._subarrays, self._subarray_beams, self._station_beams):
        _refresh(device_group)
def _device_obs_state_changed(
    self: ControllerComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Record a sub-device ObsState change and update the pending-device sets.

    The new state is cached in ``self._device_obs_states``. For every subarray
    whose desired ObsState ``trl`` has now reached, ``trl`` is removed from
    that subarray's set of devices still being waited on.

    :param trl: TRL of the device whose ObsState changed
    :param obs_state: the device's new ObsState
    """
    old_state = self._device_obs_states.get(trl, None)
    # Compare with None explicitly: ObsState.EMPTY has value 0, so a plain
    # truthiness test would log an EMPTY previous state as "NONE".
    old_name = old_state.name if old_state is not None else "NONE"
    new_state = ObsState(obs_state)
    self._device_obs_states[trl] = new_state
    for subarray_trl, desired_obs_state in self._desired_obs_state.items():
        if (
            obs_state == desired_obs_state
            and trl in self._configuring_resources[subarray_trl]
        ):
            self._configuring_resources[subarray_trl].remove(trl)
            self.logger.debug(
                f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
                f"waiting for {len(self._configuring_resources[subarray_trl])} "
                "more devices"
            )
def _station_power_state_changed(
    self: ControllerComponentManager,
    trl: str,
    power_state: PowerState,
) -> None:
    """
    Record a station PowerState change and update the pending-device set.

    The new state is cached in ``self._device_power_states``. If it is the
    desired PowerState and the station is being waited on, the station is
    removed from ``self._powering_resources``.

    :param trl: TRL of the station whose PowerState changed
    :param power_state: the station's new PowerState
    """
    previous = self._device_power_states.get(trl, None)
    old_name = "NONE" if previous is None else previous.name
    new_state = PowerState(power_state)
    self._device_power_states[trl] = new_state
    if power_state != self._desired_power_state or trl not in self._powering_resources:
        return
    self._powering_resources.remove(trl)
    self.logger.debug(
        f"State for {trl} changed: {old_name} -> {new_state.name}, "
        f"waiting for {len(self._powering_resources)} more devices"
    )
def _wait_for_obs_state(
    self: ControllerComponentManager,
    subarray_trl: str,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for sub-device ObsState to reach desired state.

    Polls every 10 ms until ``self._configuring_resources[subarray_trl]`` is
    empty. Devices are removed from it as their ObsState change events
    arrive. On timeout, ``_final_poll`` reads the devices directly before
    giving up.

    :param subarray_trl: TRL of subarray whose sub-devices to monitor
    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None
    :return: COMPLETED if the state was reached, ABORTED if aborted,
        otherwise the result of the final poll (COMPLETED or FAILED)
    """
    assert self._desired_obs_state[subarray_trl] is not None
    resolution = 0.01  # seconds
    ticks = int(timeout / resolution)  # 10 ms resolution
    while self._configuring_resources[subarray_trl]:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" not "==": a timeout shorter than one step starts at ticks == 0,
        # which "==" would never match after the first decrement.
        if ticks <= 0:
            self.logger.warning(
                "Timed out waiting for ObsState "
                f"{self._desired_obs_state[subarray_trl].name}"
                f" in {timeout} seconds. Attempting final poll."
            )
            return self._final_poll(subarray_trl)
    self.logger.debug(f"Waited ObsState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED
def _final_poll(self: ControllerComponentManager, subarray_trl: str) -> TaskStatus:
    """
    Direct check of device obsstates as a last attempt before declaring failure.

    Sometimes it appears we miss change events in production. For each device
    still pending for this subarray, the ObsState is read straight from the
    device proxy. A device already in the desired state counts as a missed
    change event: it is reported via ``missed_event=True`` on the component
    state callback and removed from the pending set. A device whose ObsState
    cannot be read stays pending.

    :param subarray_trl: TRL of subarray whose sub-devices to monitor
    :returns: COMPLETED if no devices remain pending, otherwise FAILED.
    """
    for trl in list(self._configuring_resources[subarray_trl]):
        device = (
            self._subarrays.get(trl)
            or self._subarray_beams.get(trl)
            or self._station_beams.get(trl)
        )
        if not (device and device._proxy):
            continue
        try:
            actual_obs_state = device._proxy.obsstate
        except DevFailed as err:
            # The state cannot be confirmed: leave the device pending (the
            # result becomes FAILED) instead of propagating the error.
            self.logger.warning(
                f"Final poll could not read ObsState of {trl}: {err!r}"
            )
            continue
        if actual_obs_state != self._desired_obs_state[subarray_trl]:
            continue
        self.logger.info(
            f"Change event for {trl} was missed, device was in correct state."
        )
        if self._component_state_callback is not None:
            self._component_state_callback(missed_event=True)
        # discard, not remove: a late change event may have removed it already.
        self._configuring_resources[subarray_trl].discard(trl)
    return (
        TaskStatus.COMPLETED
        if not self._configuring_resources[subarray_trl]
        else TaskStatus.FAILED
    )
def _wait_for_power_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station PowerState to reach desired state.

    Polls every second until ``self._powering_resources`` is empty. Stations
    are removed from it as their PowerState change events arrive.

    :param timeout: Time to wait, in seconds.
    :param task_abort_event: Check for abort, defaults to None
    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_power_state is not None
    resolution = 1  # seconds
    ticks = int(timeout / resolution)
    while self._powering_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(resolution)
        ticks -= 1
        # "<=" not "==": a timeout below one second starts at ticks == 0,
        # which "==" would never match after the first decrement.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for PowerState in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._powering_resources} "
            f"PowerState {self._desired_power_state.name},"
            f" waiting for {ticks*resolution} more seconds"
        )
    self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds")
    return TaskStatus.COMPLETED
def _wait_for_station_synchronisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Synchronisation to reach desired state.

    Every station in ``self._stations`` is polled every half second until
    all of them report ``issynchronised``. A station that has reported
    synchronised once is not polled again. With no stations the wait
    completes immediately.

    :param timeout: Time to wait, in seconds. A timeout shorter than one
        polling interval fails as soon as stations are still pending.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_synchronisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    synchronising_stations = set(self._stations)
    self.logger.debug(
        f"Waiting for stations to synchronise: {synchronising_stations}"
    )
    while True:
        # Wait for ALL stations to sync. Stop at the first station that is
        # still unsynchronised; it is polled again on the next tick.
        # Iterate a snapshot because synced stations are removed in-loop.
        for station_trl in list(synchronising_stations):
            if not self._stations[station_trl].issynchronised:
                break
            synchronising_stations.discard(station_trl)
            self.logger.debug(f"{station_trl} achieved synchronisation!")
        # Test for completion before abort/timeout/sleep. This way success
        # on the final poll (or an empty station set) is not reported as a
        # timeout, and we do not sleep after everything is synchronised.
        if not synchronising_stations:
            break
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        # ``<=`` (not ``==``) so a timeout shorter than one resolution,
        # where ticks starts at 0, cannot spin forever.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {synchronising_stations} "
                f"for Synchronisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {synchronising_stations} Synchronisation"
            f" {self._desired_synchronisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
        time.sleep(resolution)
        ticks -= 1
    self.logger.debug(
        f"Waited Synchronisation for {timeout-ticks*resolution} seconds"
    )
    return TaskStatus.COMPLETED
def _wait_for_initialisation_state(
    self: ControllerComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for station Initialisation to reach desired state.

    Polls every station listed in ``self._initialising_resources`` every
    half second. Stations reporting ``isinitialised`` are removed from that
    set. The wait completes once it is empty, including when it is already
    empty on entry.

    :param timeout: Time to wait, in seconds. A timeout shorter than one
        polling interval fails as soon as stations are still pending.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    assert self._desired_initialisation_state is not None
    resolution = 0.5  # seconds
    ticks = int(timeout / resolution)
    while True:
        # Stop at the first station that is still not initialised; it is
        # polled again on the next tick. Iterate a snapshot rather than the
        # attribute itself, so modifying the attribute cannot break the
        # iteration. NOTE(review): confirm whether other threads touch it.
        initialised_stations = set()
        for station_trl in list(self._initialising_resources):
            if not self._stations[station_trl].isinitialised:
                break
            initialised_stations.add(station_trl)
        self._initialising_resources -= initialised_stations
        # Test for completion before abort/timeout/sleep. This way success
        # on the final poll (or nothing to wait for) is not reported as a
        # timeout.
        if not self._initialising_resources:
            break
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        # ``<=`` (not ``==``) so a timeout shorter than one resolution,
        # where ticks starts at 0, cannot spin forever.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting {self._initialising_resources} "
                f"for Initialisation in {timeout} seconds"
            )
            return TaskStatus.FAILED
        self.logger.debug(
            f"Waiting for {self._initialising_resources} Initialisation"
            f" {self._desired_initialisation_state},"
            f" waiting for {ticks*resolution} more seconds"
        )
        time.sleep(resolution)
        ticks -= 1
    self.logger.debug(
        f"Waited Initialisation for {timeout-ticks*resolution} seconds"
    )
    return TaskStatus.COMPLETED
@staticmethod
def calculate_target_adc(epoch_time: float | None = None) -> float:
    """
    Calculate the target RMS for ADC Equalisation.

    To keep ADC RMS values in their optimal range, the telescope needs
    re-equalisation as the average power received by the antennas changes
    with sidereal time. This function calculates a target value for ADC
    re-equalisation from the local sidereal time at the SKA-Low longitude.

    More details can be found in Memo 4:
    https://confluence.skatelescope.org/pages/viewpage.action?spaceKey=LC&title=Memo+004+-+Antenna+ADC+RMS+target+vs+LST

    :param epoch_time: Time in seconds since the epoch; ``None`` means "now".
        Exists for testing. Fractional seconds are discarded.
    :return: target ADC RMS value in ADUs
    """
    # Broken-down UTC time (fractional seconds are dropped by gmtime).
    gm_time = time.gmtime(epoch_time)
    # Returns a Julian Date in two parts:
    # ejd1 = day number
    # ejd2 = time of day
    # https://pyerfa.readthedocs.io/en/latest/api/erfa.dtf2d.html#erfa.dtf2d
    ejd1, ejd2 = erfa.dtf2d(
        "UTC",
        gm_time.tm_year,
        gm_time.tm_mon,
        gm_time.tm_mday,
        gm_time.tm_hour,
        gm_time.tm_min,
        gm_time.tm_sec,
    )
    # https://pyerfa.readthedocs.io/en/latest/api/erfa.gst00b.html#erfa.gst00b
    # gst00b is documented to take UT1. UTC is used as an approximation,
    # which is acceptable here because |UT1 - UTC| is kept below 0.9 s.
    # The result is in radians.
    greenwich_sidereal_time = erfa.gst00b(ejd1, ejd2)
    # Shift the Greenwich sidereal time to the SKA-Low longitude, normalise
    # to [0, 2*pi), and convert to hours.
    lst_hr = erfa.anp(greenwich_sidereal_time + LON_SKA_LOW * np.pi / 180.0) * (
        12.0 / np.pi
    )
    # MEMO 4 equation
    target_rms = (
        20
        + 4.76497 * np.cos(lst_hr / 24 * 2 * np.pi + 1.8347)
        + 1.17475 * np.cos(2 * lst_hr / 24 * 2 * np.pi - 3.1031)
    )
    # Plain Python float, matching the annotated return type.
    return float(target_rms)