# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
# pylint: disable = too-many-lines
"""This module implements component management for subarrays."""
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from typing import Any, Callable, Optional, Sequence
import katpoint
import ska_tango_base.subarray
from ska_control_model import (
CommunicationStatus,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_low_mccs_common import EventSerialiser
from ska_low_mccs_common.communication_manager import CommunicationManager
from ska_low_mccs_common.component import ObsDeviceComponentManager
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
CompositeCommandResultEvaluator,
MccsCompositeCommandProxy,
)
from ska_ser_skuid.client import SkuidClient # type: ignore
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from tango import EnsureOmniThread
__all__ = ["SubarrayComponentManager"]
class _StationProxy(ObsDeviceComponentManager):
"""A subarray's proxy to its stations."""
# TODO: remove comments when commands implemented in station
@check_communicating
def deallocate_subarray(
self: _StationProxy, subarray_id: int
) -> tuple[ResultCode, str]:
"""
Deconfigure the station.
:param subarray_id: the subarray which configuration must be removed
:return: A task status and response message.
"""
assert self._proxy is not None
([result_code], [message]) = self._proxy.DeallocateSubarray(subarray_id)
return (ResultCode(result_code), message)
@check_communicating
def end_scan(self: _StationProxy, subarray_id: int) -> tuple[TaskStatus, str]:
"""
Stop the subarray beam scanning.
:param subarray_id: the subarray ID which scan is stopped
:return: A task status and response message.
"""
assert self._proxy is not None
([result_code], unique_id) = self._proxy.EndScan(subarray_id)
return (result_code, unique_id)
@check_communicating
def beamformer_running(self: _StationProxy, subarray_id: int) -> bool:
"""
Check that the beamformer is running for given subarray.
:param subarray_id: The ID of the subarray to be checked
:return: beamformer running status
"""
assert self._proxy is not None
return self._proxy.BeamformerRunningForSubarray(subarray_id)
@check_communicating
def stop_tracking_all(self: _StationProxy) -> ResultCode:
"""
Stop tracking.
:return: a result code.
"""
assert self._proxy is not None
return self._proxy.StopTrackingAll()
@check_communicating
def set_csp_ingest(self: _StationProxy, argin: str) -> ResultCode:
"""
Configure link for beam data packets to CSP.
:param argin: json dictionary with optional keywords:
* destination_ip - (string) Destination IP
* source_port - (int) Source port for integrated data streams
* destination_port - (int) Destination port for integrated data streams
:return: result code of SetCspIngest
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
([result_code], _) = self._proxy._device.SetCspIngest(argin)
return result_code
@check_communicating
def reset_csp_ingest(self: _StationProxy) -> tuple[list[ResultCode], list[str]]:
"""
Reset link for beam data packets to CSP to defaults.
:return: result code of ResetCspIngest
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return self._proxy._device.ResetCspIngest()
class _SubarrayBeamProxy(ObsDeviceComponentManager):
"""A subarray's proxy to its subarray beams."""
@check_communicating
@check_on
def configure(
self: _SubarrayBeamProxy, configuration: dict
) -> tuple[TaskStatus, str]:
"""
Configure the subarray beam.
:param configuration: the configuration to be applied to this
subarray beam
:return: A task status and response message.
"""
assert self._proxy is not None
configuration_str = json.dumps(configuration)
(result_code, unique_id) = self._proxy.Configure(configuration_str)
return (result_code, unique_id)
@check_communicating
@check_on
def scan(
self: _SubarrayBeamProxy,
scan_id: int,
start_time: Optional[str],
duration: float,
) -> tuple[TaskStatus, str]:
"""
Start the subarray beam scanning.
:param scan_id: the id of the scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:return: A task status and response message.
"""
assert self._proxy is not None
scan_arg: dict[str, int | float | str] = {
"scan_id": scan_id,
"duration": duration,
}
if start_time is not None:
scan_arg["start_time"] = start_time
([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
return (result_code, unique_id)
@check_communicating
@check_on
def release_all(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
Release subarray beam resources.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.ReleaseAllResources()
return (result_code, unique_id)
@check_communicating
@check_on
def deconfigure(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
De-configure the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.End()
return (result_code, unique_id)
@check_communicating
def end_scan(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
End the current scan.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.EndScan()
return (result_code, unique_id)
@check_communicating
def abort(
self: _SubarrayBeamProxy, task_callback: TaskCallbackType | None = None
) -> tuple[TaskStatus, str]:
"""
Abort the current scan associated with the subarray beam.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.Abort()
return (result_code, unique_id)
@check_communicating
def obsreset(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
Reset to IDLE the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.ObsReset()
return (result_code, unique_id)
@check_communicating
def restart(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
Restart to EMPTY the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.Restart()
return (result_code, unique_id)
class _StationBeamProxy(ObsDeviceComponentManager):
"""A subarray's proxy to its station beams."""
@check_communicating
@check_on
def configure(
self: _StationBeamProxy, configuration: dict
) -> tuple[TaskStatus, str]:
"""
Configure the station beam.
:param configuration: the configuration to be applied to this
station beam
:return: A task status and response message.
"""
assert self._proxy is not None
configuration_str = json.dumps(configuration)
(result_code, unique_id) = self._proxy.Configure(configuration_str)
return (result_code, unique_id)
@check_communicating
@check_on
def scan(
self: _StationBeamProxy,
scan_id: int,
start_time: Optional[str],
duration: float,
) -> tuple[TaskStatus, str]:
"""
Start the subarray beam scanning.
:param scan_id: the id of the scan
:param start_time: the start time of the scan
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:return: A task status and response message.
"""
assert self._proxy is not None
scan_arg: dict[str, int | float | str] = {
"scan_id": scan_id,
"duration": duration,
}
if start_time is not None:
scan_arg["start_time"] = start_time
([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
return (result_code, unique_id)
@check_communicating
@check_on
def deconfigure(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
"""
Configure the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.End()
return (result_code, unique_id)
@check_communicating
@check_on
def release_all(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
"""
Release station beam resources.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.ReleaseAllResources()
return (result_code, unique_id)
@check_communicating
def end_scan(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
"""
Release station beam resources.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.EndScan()
return (result_code, unique_id)
# pylint: disable=too-many-instance-attributes, too-many-public-methods
[docs]
class SubarrayComponentManager(
TaskExecutorComponentManager,
ska_tango_base.subarray.SubarrayComponentManager,
):
"""A component manager for a subarray."""
# messages for long running command task callback
_task_message = {
TaskStatus.COMPLETED: "completed OK.",
TaskStatus.ABORTED: "been aborted.",
TaskStatus.FAILED: "timed out.",
}
_task_to_result = {
TaskStatus.COMPLETED: ResultCode.OK,
TaskStatus.ABORTED: ResultCode.ABORTED,
TaskStatus.FAILED: ResultCode.FAILED,
}
# pylint: disable=too-many-arguments
[docs]
def __init__(
self: SubarrayComponentManager,
subarray_id: int,
skuid_url: str,
logger: logging.Logger,
obs_command_timeout: int,
communication_state_callback: Callable[[CommunicationStatus], None],
component_state_callback: Callable[..., None],
event_serialiser: Optional[EventSerialiser] = None,
) -> None:
"""
Initialise a new instance.
:param subarray_id: the subarray ID for this device
:param skuid_url: The address at which a SKUID service is running.
:param obs_command_timeout: the default timeout for obs
commands in seconds.
:param logger: the logger to be used by this object.
:param communication_state_callback: callback to be called when
the status of the communications channel between the
component manager and its component changes
:param component_state_callback: callback to be called when the
component state changes.
:param event_serialiser: the event serialiser to be used by this object.
"""
self._event_serialiser = event_serialiser
self._power_state_lock = threading.RLock()
self._device_communication_state_lock = threading.Lock()
self._component_state_callback: Callable[..., None]
self._component_state_callback = component_state_callback
self._abort_complete = threading.Event()
self._obs_command_timeout = obs_command_timeout
self._device_communication_states: dict[str, CommunicationStatus] = {}
self._station_power_modes: dict[str, Optional[PowerState]] = {}
self._device_obs_states: dict[str, Optional[ObsState]] = {}
self._is_assigning = False
self._is_configured = False
self._configuring_resources: set[str] = set() # these resources should go
self._desired_state = ObsState.READY # to this end state
self._stations: dict[str, _StationProxy] = {}
self._subarray_beams: dict[str, _SubarrayBeamProxy] = {}
self._station_beams: dict[str, _StationBeamProxy] = {}
self._apertures: dict[str, str] = {}
self._subarray_beam_trl: dict[int, str] = {}
self._beam_to_station: dict[str, str] = {}
self._calibration_ids: dict[str, str] = {}
self._number_of_channels = 0
self._subarray_id = subarray_id
self._skuid_url = skuid_url
self._scan_id: Optional[int] = None
self._scan_start_time: str | None = None
self._scan_duration = 0.0
self.reset_csp_ingest_on_scan: bool = True
super().__init__(
logger,
communication_state_callback,
component_state_callback,
resources_changed=None,
configured_changed=None,
scanning_changed=None,
task_completed=None,
obsfault=None,
obsstate_changed=None,
station_power=None,
power=None,
fault=None,
)
self._communication_manager = CommunicationManager(
self._update_communication_state, self._update_component_state, self.logger
)
[docs]
def start_communicating(self: SubarrayComponentManager) -> None:
"""Establish communication with the station components."""
self._communication_manager.start_communicating()
[docs]
def stop_communicating(self: SubarrayComponentManager) -> None:
"""Break off communication with the station components."""
self._communication_manager.stop_communicating()
@property
def max_executing_tasks(self) -> int:
"""
Get the max number of tasks that can be executing at once.
:return: max number of simultaneously executing tasks.
"""
return 2
@property
def scan_id(self: SubarrayComponentManager) -> Optional[int]:
"""
Return the scan id, or None if a scan is not current.
:return: the scan id, or None if a scan is not current.
"""
return self._scan_id
@property
def subarray_beam_trls(self: SubarrayComponentManager) -> set[str]:
"""
Return the set of TRLs of subarray beams assigned to this subarray.
:return: the set of TRLs of subarray beams assigned to this subarray.
"""
return set(self._subarray_beams.keys())
@property
def station_beam_trls(self: SubarrayComponentManager) -> set[str]:
"""
Return the set of TRLs of station beams assigned to this subarray.
:return: the set of TRLs of station beams assigned to this subarray.
"""
return set(self._station_beams.keys())
@property
def station_trls(self: SubarrayComponentManager) -> set[str]:
"""
Return the set of TRLs of stations assigned to this subarray.
:return: the set of TRLs of stations assigned to this subarray.
"""
return set(self._stations.keys())
@property
def power_state(self: SubarrayComponentManager) -> Optional[PowerState]:
"""
Return my power state.
:return: my power state
"""
return self._component_state["power"]
def _check_aborted(
self: SubarrayComponentManager,
task_abort_event: Optional[threading.Event],
task_callback: Optional[Callable],
method_name: str,
) -> bool:
if task_abort_event is None or task_callback is None:
self.logger.warning(f"Cannot check {method_name} for abort status")
return False
if task_abort_event.is_set():
self.logger.info(f"{method_name} has been aborted")
task_callback(
status=TaskStatus.ABORTED,
result=(ResultCode.ABORTED, f"{method_name} aborted"),
)
return True
return False
[docs]
@check_communicating
def assign( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
**kwargs: Any,
) -> tuple[TaskStatus, str]:
"""
Submit the `AssignResources` slow command.
This method returns immediately after it is submitted for execution.
:param task_callback: Update task state, defaults to None
:param kwargs: keyword arguments to the command.
Required keys are "subarray_id" (the ID of the subarray),
and subarray_beams (resource specification for each beam).
:return: a result code and response message.
"""
self._is_assigning = True
return self.submit_task(
self._assign,
args=[kwargs["subarray_id"], kwargs["subarray_beams"]],
task_callback=task_callback,
)
# pylint: disable=too-many-branches, too-many-locals, too-many-statements
@check_communicating
def _assign(
self: SubarrayComponentManager,
# resources: dict[str, Any],
subarray_id: int,
subarray_beams: dict,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Assign resources to this subarray.
This is just for communication and health roll-up, resource management
is done by controller.
:param subarray_id: ID of the subarray
:param subarray_beams: resource specification for each beam
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
station_trls: list[str] = []
subarray_beam_trls: list[str] = []
station_beam_trls: list[str] = []
number_of_channels = 0
self._apertures.clear()
self._subarray_beam_trl.clear()
self._number_of_channels = 0
for beam in subarray_beams:
subarray_beam_id = beam.get("subarray_beam_id", 1)
subarray_beam_trl = beam.get(
"subarray_beam_trl",
f"low-mccs/subarraybeam/{subarray_id:02d}-{subarray_beam_id:02d}",
)
self._subarray_beam_trl[subarray_beam_id] = subarray_beam_trl
subarray_beam_trls.append(subarray_beam_trl)
number_of_channels += beam.get("number_of_channels")
for aperture in beam.get("apertures", []):
aperture_id = aperture.get("aperture_id", None)
station_id = aperture.get("station_id")
if aperture_id is None:
aperture_id = f"AP{station_id}.1"
station_trl = aperture.get("station_trl", None)
if station_trl is not None and station_trl not in station_trls:
station_trls.append(station_trl)
station_beam_trl = aperture.get("station_beam_trl", None)
station_beam_trls.append(station_beam_trl)
self._beam_to_station[station_beam_trl] = station_trl
self._apertures[aperture_id] = station_beam_trl
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
self._number_of_channels = number_of_channels
station_trls_to_add = sorted(station_trls) - self._stations.keys()
subarray_beam_trls_to_add = subarray_beam_trls - self._subarray_beams.keys()
station_beam_trls_to_add = station_beam_trls - self._station_beams.keys()
trls_to_add = station_trls_to_add.union(
subarray_beam_trls_to_add, station_beam_trls_to_add
)
self._desired_state = ObsState.IDLE
self._configuring_resources.clear()
if trls_to_add:
for trl in trls_to_add:
self._device_communication_states[trl] = CommunicationStatus.DISABLED
self._device_obs_states[trl] = ObsState.EMPTY
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
for trl in station_trls_to_add:
self._stations[trl] = _StationProxy(
trl,
self.logger,
functools.partial(self._device_communication_state_changed, trl),
functools.partial(self._component_state_callback, trl=trl),
event_serialiser=self._event_serialiser,
)
for trl in subarray_beam_trls_to_add:
self._subarray_beams[trl] = _SubarrayBeamProxy(
trl,
self.logger,
functools.partial(self._device_communication_state_changed, trl),
functools.partial(
self._component_state_callback,
trl=trl,
),
event_serialiser=self._event_serialiser,
)
self._configuring_resources.add(trl)
for trl in station_beam_trls_to_add:
self._station_beams[trl] = _StationBeamProxy(
trl,
self.logger,
functools.partial(self._device_communication_state_changed, trl),
functools.partial(
self._component_state_callback,
trl=trl,
),
event_serialiser=self._event_serialiser,
)
self._configuring_resources.add(trl)
self._communication_manager.replace_device_pool(
self._stations, self._station_beams, self._subarray_beams
)
self._is_assigning = True
self._component_state_callback(
resources_changed=[
set(self._stations.keys()),
set(self._subarray_beams.keys()),
set(self._station_beams.keys()),
]
)
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
# wait for all subdevices to reach ObsState.IDLE
task_result = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
self._is_assigning = False
self._is_configured = False
if task_callback is not None:
task_callback(
status=task_result,
result=(
self._task_to_result[task_result],
"Assign resources " + self._task_message[task_result],
),
)
@property # type: ignore[misc]
@check_communicating
def assigned_resources(
self: SubarrayComponentManager,
) -> list[str]:
"""
Return this subarray's resources.
:return: this subarray's resources.
"""
return list(
set(self._stations) | set(self._subarray_beams) | set(self._station_beams),
)
@property # type: ignore[misc]
@check_communicating
def assigned_resources_dict(
self: SubarrayComponentManager,
) -> dict[str, Sequence[Any]]:
"""
Return a dictionary of resource types and TRLs.
:return: this subarray's resources.
"""
return {
"stations": sorted(self._stations.keys()),
"subarray_beams": sorted(self._subarray_beams.keys()),
"station_beams": sorted(self._station_beams.keys()),
"apertures": sorted(self._apertures.keys()),
"channels": [self._number_of_channels],
}
[docs]
@check_communicating
def release( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
# *,
# resources: str,
**kwargs: Any,
) -> tuple[TaskStatus, str]:
"""
Submit the `release` slow command.
:param task_callback: Update task state, defaults to None
:param kwargs: keyword arguments to the command. The
only required key is "resources", which
contains a list of resource names to release
:return: A task status and response message.
"""
return self.submit_task(
self._release,
args=[kwargs["resources"]],
task_callback=task_callback,
)
@check_communicating
def _release(
self: SubarrayComponentManager,
resources: dict[str, list[str]],
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Release resources from this subarray.
:param resources: list of resource TRLs to release.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
:raises NotImplementedError: because MCCS Subarray cannot
perform a partial release of resources.
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_release"):
return
if task_callback is not None:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.OK,
(
"Not Implemented: MCCS Subarray cannot"
"partially release resources."
),
),
)
raise NotImplementedError("MCCS Subarray cannot partially release resources.")
[docs]
@check_communicating
def release_all( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `ReleaseAllResources` slow command.
Release all resources from this subarray.
:param task_callback: Update task state, defaults to None
:return: a task status and response message
"""
return self.submit_task(
self._release_all,
args=[],
task_callback=task_callback,
)
@check_communicating
def _release_all(
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Release all resources from this subarray.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_release_all"):
return
self._desired_state = ObsState.EMPTY
self._configuring_resources.clear()
# Release resources in subelements, and check return codes
self.logger.debug("Deallocating resources in stations")
for station_proxy in self._stations.values():
station_proxy.deallocate_subarray(self._subarray_id)
self.logger.debug("Deallocating resources in subarray beams")
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
subarray_beam_proxy.release_all()
if self._check_aborted(task_abort_event, task_callback, "_release_all"):
return
# Wait for beams to go to EMPTY
task_status = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_release_all"):
return
self._release_internal_resources()
if task_status == TaskStatus.FAILED:
self._component_state_callback(obsfault=True)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"ReleaseAllResources has " + self._task_message[task_status],
),
)
[docs]
def cleanup(self: SubarrayComponentManager) -> None:
"""
Cleanup resources held by the component manager.
This includes cleaning up resources held by all sub-component managers.
"""
self._delete_proxies()
self._communication_manager.shutdown()
super().cleanup()
def _delete_proxies(self) -> None:
self.logger.debug("Deleting proxies")
self._device_communication_states.clear()
self._communication_manager.replace_device_pool()
for station_proxy in self._stations.values():
station_proxy.cleanup()
self._stations.clear()
for subarray_beam_proxy in self._subarray_beams.values():
subarray_beam_proxy.cleanup()
self._subarray_beams.clear()
for station_beam_proxy in self._station_beams.values():
station_beam_proxy.cleanup()
self._station_beams.clear()
def _release_internal_resources(self) -> None:
self.logger.debug("Deallocating resources")
if self._stations or self._subarray_beams or self._station_beams:
self._delete_proxies()
self._device_obs_states.clear()
self._apertures.clear()
self._subarray_beam_trl.clear()
self._beam_to_station.clear()
self._calibration_ids.clear()
self._number_of_channels = 0
self._configuring_resources.clear()
self._component_state_callback(
resources_changed=[
set(self._stations.keys()),
set(self._subarray_beams.keys()),
set(self._station_beams.keys()),
]
)
# pylint: disable=too-many-return-statements
@check_communicating
def _configure(
self: SubarrayComponentManager,
subarray_beams: list[dict[str, Any]],
task_callback: Optional[Callable],
task_abort_event: threading.Event,
transaction_id: str = "",
) -> None:
"""
Configure the resources for a scan.
:param subarray_beams: the beam configurations to be applied
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
:param transaction_id: the transaction id for the configuration
Will be generated if not provided.
"""
def _report_status(status: TaskStatus, **kwargs: Any) -> None:
if task_callback is not None:
task_callback(status=status, **kwargs)
def __fail(result_code: ResultCode, message: str) -> None:
"""
Handle failure reporting and state update.
:param result_code: the ResultCode to report.
:param message: For information.
"""
self._component_state_callback(obsfault=True)
_report_status(
status=TaskStatus.FAILED,
result=(result_code, message),
)
_report_status(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
if transaction_id == "":
transaction_id = self._get_unique_id()
was_empty = not self._is_configured
for station_trl, station_proxy in self._stations.items():
station_proxy.deallocate_subarray(self._subarray_id)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
configuration: dict[str, dict] = {}
for beam in subarray_beams:
beam_id = beam.get("subarray_beam_id")
assert isinstance(beam_id, int)
beam_trl = self._subarray_beam_trl.get(beam_id, None)
if beam_trl is None:
self.logger.error(f"undefined beam {beam_id}")
continue
self.logger.debug(f"Processing beam {beam_id} ({beam_trl})")
beam["subarray_id"] = self._subarray_id
for aperture in beam["apertures"]:
aperture_id = aperture["aperture_id"]
station_beam_trl = self._apertures.get(aperture_id, None)
calibration_id = aperture.get("calibration_id", "")
if station_beam_trl is None:
self.logger.error(f"undefined aperture {aperture_id}")
continue
aperture["station_beam_trl"] = station_beam_trl
station_trl = self._beam_to_station[station_beam_trl]
if station_trl in self._calibration_ids:
self.logger.warning(
f"Aperture {aperture_id} for station {station_trl} "
f"given calibration ID {calibration_id}. "
f"MCCS currently only supports one calibration ID per "
f"station, using {self._calibration_ids[station_trl]}"
)
else:
self._calibration_ids[station_trl] = calibration_id
configuration[beam_trl] = beam
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
# Configure subarray beams.
if configuration:
result_code, message = self._configure_subarray_beams(
configuration, task_abort_event=task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
if result_code != ResultCode.OK:
__fail(result_code, message)
return
self._wait_for_obs_state(0.0, task_abort_event)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
result_code, message = self._configure_stations(
transaction_id=transaction_id, task_abort_event=task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
if result_code != ResultCode.OK:
__fail(result_code, message)
return
# Obsstate update
self._is_configured = True
if was_empty:
self._component_state_callback(configured_changed=True)
_report_status(
status=TaskStatus.COMPLETED,
result=(
self._task_to_result[TaskStatus.COMPLETED],
"Configure has " + self._task_message[TaskStatus.COMPLETED],
),
)
def _configure_stations(
self: SubarrayComponentManager,
transaction_id: str,
task_abort_event: threading.Event,
) -> tuple[ResultCode, str]:
"""
Configure the station resources for a scan.
:param transaction_id: the transaction id for the configuration
:param task_abort_event: abort event to cancel waiting for result.
:return: a tuple containing a result code and a json serialised string
"""
apply_configuration_commands = MccsCompositeCommandProxy(
self.logger, task_abort_event=task_abort_event
)
for station_trl in self._stations:
argin = {"transaction_id": transaction_id, "subarray_id": self._subarray_id}
if self._calibration_ids[station_trl]:
argin["calibration_id"] = self._calibration_ids[station_trl]
apply_configuration_commands += MccsCommandProxy(
device_name=station_trl,
command_name="ApplyConfiguration",
logger=self.logger,
default_args=json.dumps(argin),
)
result, message = apply_configuration_commands(
command_evaluator=CompositeCommandResultEvaluator(),
timeout=self._obs_command_timeout,
)
return ResultCode(result), message
def _configure_subarray_beams(
self: SubarrayComponentManager,
subarray_beam_configuration: dict[str, Any],
task_abort_event: threading.Event,
) -> tuple[ResultCode, str]:
"""
Configure the station beam resources for a scan.
:param subarray_beam_configuration: the subarray beam configuration
to be applied
:param task_abort_event: abort event to cancel waiting for result.
:return: a tuple containing a result code and json serialised string.
"""
self._configuring_resources.clear()
self._desired_state = ObsState.READY
configure_subarraybeams_command = MccsCompositeCommandProxy(
self.logger, task_abort_event=task_abort_event
)
for (
subarray_beam_trl,
configuration,
) in subarray_beam_configuration.items():
configure_subarraybeams_command += MccsCommandProxy(
device_name=subarray_beam_trl,
command_name="Configure",
logger=self.logger,
default_args=json.dumps(configuration),
)
self._configuring_resources.add(subarray_beam_trl)
result_code, message = configure_subarraybeams_command(
command_evaluator=CompositeCommandResultEvaluator(),
timeout=self._obs_command_timeout,
)
return ResultCode(result_code), message
# pylint: disable=arguments-differ
[docs]
@check_communicating
def scan( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
*,
interface: Optional[str] = None,
scan_id: int,
start_time: Optional[str] = None,
duration: Optional[float] = 0.0,
) -> tuple[TaskStatus, str]:
"""
Submit the `Scan` command.
:param task_callback: Update task state, defaults to None
:param interface: the schema version this is running against.
:param scan_id: The ID for this scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means foreve
:return: A task status and response message.
"""
return self.submit_task(
self._scan,
args=[scan_id, start_time, duration],
task_callback=task_callback,
)
@check_communicating
def _scan(
self: SubarrayComponentManager,
scan_id: int,
start_time: Optional[str],
duration: float,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Start scanning.
:param scan_id: the id of the scan
:param start_time: the start time of the scan
:param duration: Scan duration in seconds. 0.0 or omitted means foreve
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
self._scan_id = scan_id
self._scan_start_time = start_time
self._scan_duration = duration
if self._check_aborted(task_abort_event, task_callback, "_scan"):
return
if self.reset_csp_ingest_on_scan:
# Reset CSPIngest to default.
self.logger.debug(
"Resetting CSPIngest for all stations allocated to this Subarray."
)
csp_reset_result = []
msg = ""
for trl, station in self._stations.items():
[rc], [_] = station.reset_csp_ingest()
if rc != ResultCode.OK:
csp_reset_result.append(trl)
if csp_reset_result:
msg = (
"Scan has failed to reset CSPIngest on "
f"stations: {csp_reset_result}"
)
self.logger.error(msg)
self._desired_state = ObsState.SCANNING
self._configuring_resources.clear()
task_status = TaskStatus.COMPLETED # if everything below does not fail
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
proxy_result_code = subarray_beam_proxy.scan(scan_id, start_time, duration)
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
# wait for beams to start
if task_status == TaskStatus.COMPLETED:
task_status = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_scan"):
return
# Start scan on stations
if task_status == TaskStatus.COMPLETED:
result_code, message = self.__start_station_scan(
subarray_id=self._subarray_id,
scan_id=scan_id,
start_time=start_time,
duration=duration,
)
self.logger.warning(message)
if result_code != ResultCode.OK:
task_status = TaskStatus.FAILED
if self._check_aborted(task_abort_event, task_callback, "_scan"):
return
if task_status == TaskStatus.COMPLETED:
self._component_state_callback(scanning_changed=True)
else:
self._component_state_callback(obsfault=True)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
(
"Scan has "
+ self._task_message[task_status]
+ (f" {msg}" if msg else "")
),
),
)
def __start_station_scan(
self: SubarrayComponentManager,
subarray_id: int,
scan_id: int,
start_time: Optional[str],
duration: float,
) -> tuple[ResultCode, str]:
scan_arg: dict[str, int | float | str] = {
"subarray_id": subarray_id,
"scan_id": scan_id,
"duration": duration,
}
if start_time is not None:
scan_arg["start_time"] = start_time
start_scan_command = MccsCompositeCommandProxy(self.logger)
for station_trl in self._stations:
start_scan_command += MccsCommandProxy(
station_trl,
"Scan",
self.logger,
default_args=json.dumps(scan_arg),
)
result, message = start_scan_command(
command_evaluator=CompositeCommandResultEvaluator(),
timeout=self._obs_command_timeout,
)
return ResultCode(result), message
def __end_station_scan(
self: SubarrayComponentManager,
subarray_id: int,
) -> tuple[ResultCode, str]:
end_scan_command = MccsCompositeCommandProxy(self.logger)
for station_trl in self._stations:
end_scan_command += MccsCommandProxy(
station_trl,
"EndScan",
self.logger,
default_args=subarray_id,
)
result, message = end_scan_command(
command_evaluator=CompositeCommandResultEvaluator(),
timeout=self._obs_command_timeout,
)
return ResultCode(result), message
[docs]
@check_communicating
def end_scan( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `end_scan` slow command.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
return self.submit_task(
self._end_scan,
args=[],
task_callback=task_callback,
)
def _end_scan(
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Submit the `end_scan` slow command.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_end_scan"):
return
self._scan_id = None
self._scan_start_time = None
self._scan_duration = 0.0
# Stuff goes here. This should tell the subarray beam device to
# stop scanning.
task_status = TaskStatus.COMPLETED
for subarray_beam_proxy in self._subarray_beams.values():
proxy_result_code = subarray_beam_proxy.end_scan()
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
# Start scan on stations
if task_status == TaskStatus.COMPLETED:
result_code, message = self.__end_station_scan(
subarray_id=self._subarray_id,
)
self.logger.warning(message)
if result_code != ResultCode.OK:
task_status = TaskStatus.FAILED
# check that beamformer has stopped in all stations
if self._wait_for_end_scan():
self.logger.info("Beamformer stopped")
else:
self.logger.error("Beamformer still running after 3 seconds")
task_status = TaskStatus.FAILED
if self._check_aborted(task_abort_event, task_callback, "_end_scan"):
return
if task_status == TaskStatus.COMPLETED:
self._component_state_callback(scanning_changed=False)
else:
self._component_state_callback(obsfault=True)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"EndScan has " + self._task_message[task_status],
),
)
@check_communicating
def _deconfigure( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Deconfigure resources.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
self.logger.debug("deconfigure task invoked")
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_deconfigure"):
return
self._desired_state = ObsState.IDLE
task_status = self._deconfigure_subelements()
if task_status == TaskStatus.COMPLETED:
self._wait_for_obs_state(self._obs_command_timeout, task_abort_event)
if self._check_aborted(task_abort_event, task_callback, "_deconfigure"):
return
self._component_state_callback(configured_changed=False)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Subarray Deconfigure has " + self._task_message[task_status],
),
)
@check_communicating
def _deconfigure_subelements(
self: SubarrayComponentManager,
) -> TaskStatus:
"""
Deconfigure resources.
:return: result code
"""
self.logger.debug("deconfigure subelements invoked")
task_status = TaskStatus.COMPLETED
for trl, station_proxy in self._stations.items():
self.logger.debug(f"deconfigure station {trl}")
proxy_result_code, _ = station_proxy.deallocate_subarray(self._subarray_id)
if proxy_result_code != ResultCode.OK:
task_status = TaskStatus.FAILED
#
# Set all station beams to no logical bands, point statically at zenith
#
for trl, subarray_beam_proxy in self._subarray_beams.items():
self.logger.debug(f"deconfigure subarray beam {trl}")
self._configuring_resources.add(trl)
proxy_task_status, response = subarray_beam_proxy.deconfigure()
if proxy_task_status in (TaskStatus.REJECTED, TaskStatus.FAILED):
task_status = TaskStatus.FAILED
self._is_configured = False
return task_status
def _abort_callback(
self: SubarrayComponentManager,
*,
status: Optional[TaskStatus] = None,
exception: Optional[Exception] = None,
**kwargs: Any,
) -> None:
if exception is not None:
self.logger.error(f"abort_tasks raised exception: {repr(exception)}")
if status == TaskStatus.COMPLETED:
self.logger.debug("abort_tasks has finished, setting flag")
self._abort_complete.set()
[docs]
@check_communicating
def abort( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Abort the observation.
:param task_callback: callback to be called when the status of
the command changes
:return: A task status and response message.
"""
# We have to spin up a separate thread such that we skip the queue which
# we would append to if we used submit_task
threading.Thread(
target=self._abort,
kwargs={"task_callback": task_callback},
).start()
return (TaskStatus.IN_PROGRESS, "Abort has started")
[docs]
def abort_device(
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Abort only this device, for use in RestartSubarray().
:param task_callback: callback to be called when the status of
the command changes
:return: A task status and response message.
"""
# We have to spin up a separate thread such that we skip the queue which
# we would append to if we used submit_task
threading.Thread(
target=self._abort,
kwargs={"cascade": False, "task_callback": task_callback},
).start()
return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort( # type: ignore[override]
self: SubarrayComponentManager,
cascade: bool = True,
task_callback: Optional[Callable] = None,
) -> None:
"""
Abort command execution.
:param cascade: whether or not to abort sub-devices.
:param task_callback: Update task state, defaults to None.
"""
with EnsureOmniThread():
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
task_status = TaskStatus.COMPLETED
# This is sent in a thread, we record progress using the _abort_callback,
# which will set an Event once the task is complete.
self.abort_tasks(task_callback=self._abort_callback) # type:ignore
# Wait until the timeout for the Event to be set by abort_tasks
timeout = 10.0
if self._abort_complete.wait(timeout=timeout):
self._abort_complete.clear()
else:
self.logger.error(f"Abort timed out in {timeout} seconds.")
task_status = TaskStatus.FAILED
# Reinitialize our event. The old one could get set at any point now.
self._abort_complete = threading.Event()
# Abort sub-devices
if cascade:
if task_status == TaskStatus.COMPLETED:
self._desired_state = ObsState.ABORTED
self._configuring_resources.clear()
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
proxy_result_code, response = subarray_beam_proxy.abort()
self.logger.debug(
f"Aborting {trl} has {ResultCode(proxy_result_code).name}"
)
if proxy_result_code in (
ResultCode.FAILED,
ResultCode.REJECTED,
ResultCode.NOT_ALLOWED,
):
task_status = TaskStatus.FAILED
break
# Wait until subdevices reach ObsState.ABORTED
if task_status == TaskStatus.COMPLETED:
task_status = self._wait_for_obs_state(timeout=10.0)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Abort command has " + self._task_message[task_status],
),
)
@check_communicating
def _wait_for_end_scan(self: SubarrayComponentManager) -> bool:
"""
Wait for the scan to have stopped.
:return: True if OK, False if timed out
"""
timeout = 3.0 # seconds
while timeout > 0.0:
time.sleep(0.1)
timeout -= 0.1
if all(
station_proxy.beamformer_running(self._subarray_id) is False
for station_proxy in self._stations.values()
):
return True
return False
[docs]
@check_communicating
def obsreset( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `ObsReset` slow command.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
return self.submit_task(
self._obsreset,
args=[],
task_callback=task_callback,
)
@check_communicating
def _obsreset( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Reset the observation by returning to unconfigured state.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_obsreset"):
return
task_status = TaskStatus.COMPLETED
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
proxy_result_code = subarray_beam_proxy.obsreset()
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"ObsReset command has " + self._task_message[task_status],
),
)
[docs]
@check_communicating
def restart( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `Restart` slow command.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
return self.submit_task(
self._restart,
args=[],
task_callback=task_callback,
)
@check_communicating
def _restart( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Restart the subarray by returning to unresourced state.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_restart"):
return
task_status = TaskStatus.COMPLETED
for trl, station_proxy in self._stations.items():
proxy_result_code, _ = station_proxy.deallocate_subarray(self._subarray_id)
if proxy_result_code != ResultCode.OK:
task_status = TaskStatus.FAILED
# Scanning related
self._scan_id = None
self._scan_start_time = None
self._scan_duration = 0.0
# Configure related
self._is_configured = False
# Allocate related
self._release_internal_resources()
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Restart command has " + self._task_message[task_status],
),
)
[docs]
@check_communicating
def send_transient_buffer(
self: SubarrayComponentManager,
argin: list[int],
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the send_transient_buffer slow task.
This method returns immediately after it is submitted for
execution.
:param argin: list of requested segments.
:param task_callback: Update task state. Defaults to None.
:return: Task status and response message.
"""
return self.submit_task(
self._send_transient_buffer,
args=[argin],
task_callback=task_callback,
)
@check_communicating
def _send_transient_buffer(
self: SubarrayComponentManager,
argin: list[int],
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Send the transient buffer.
:param argin: list of list of requested segment.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
# do stuff here
if task_callback is not None:
task_callback(
status=TaskStatus.COMPLETED,
result=(ResultCode.OK, "send_transient_buffer command completed."),
)
def _device_communication_state_changed(
self: SubarrayComponentManager,
trl: str,
communication_state: CommunicationStatus,
) -> None:
self._communication_manager.update_communication_status(
trl, communication_state
)
def _station_power_state_changed(
self: SubarrayComponentManager,
trl: str,
power_mode: PowerState,
) -> None:
self._station_power_modes[trl] = power_mode
if self._is_assigning and all(
power_mode is not None for power_mode in self._station_power_modes.values()
):
self._is_assigning = False
def _wait_for_obs_state(
self: SubarrayComponentManager,
timeout: float,
task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
"""
Wait for sub-device ObsState to reach desired state.
:param timeout: Time to wait, in seconds.
:param task_abort_event: Check for abort, defaults to None
:return: completed if status reached, FAILED if timed out, ABORTED if aborted
"""
ticks = int(timeout / 0.01) # 10 ms resolution
while self._configuring_resources:
if task_abort_event and task_abort_event.is_set():
return TaskStatus.ABORTED
time.sleep(0.01)
ticks -= 1
if ticks == 0:
self.logger.warning(
f"Timed out waiting for ObsState {self._desired_state.name}"
f" resources {self._configuring_resources} "
f"failed to transition in {timeout} seconds. Attempting final poll."
)
return self._final_poll()
self.logger.debug(
f"Waited ObsState {self._desired_state.name} for"
f" {(timeout-ticks*0.01):.3f} seconds"
)
return TaskStatus.COMPLETED
def _device_obs_state_changed(
self: SubarrayComponentManager,
trl: str,
obs_state: ObsState,
) -> None:
old_state = self._device_obs_states.get(trl, None)
if old_state:
old_name = old_state.name
else:
old_name = "NONE"
new_state = ObsState(obs_state)
self._device_obs_states[trl] = new_state
if obs_state == self._desired_state and trl in self._configuring_resources:
self._configuring_resources.remove(trl)
# if not self._configuring_resources:
# self._component_state_callback(task_completed="configure")
self.logger.debug(
f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
f"waiting for {len(self._configuring_resources)} more devices"
)
def _device_obs_state_fault(
self: SubarrayComponentManager,
trl: str,
obs_state: ObsState,
) -> None:
if obs_state != ObsState.FAULT:
self.logger.warning(
"_device_obs_state_fault called with"
f" incorrect ObsState {trl} {obs_state.name}"
)
return
self.logger.error(f"{trl} moved to FAULT.")
self._device_obs_states[trl] = ObsState(obs_state)
self._component_state_callback(obsfault=True)
def _final_poll(self: SubarrayComponentManager) -> TaskStatus:
"""
Direct check of device obsstates as a last attempt before declaring failure.
Sometimes it appears we miss change events in production, if for some reason
we haven't received all the change events we expected, double check what we
think the obsstate is, vs what it actually is on all subdevices that we need to.
:returns: a taskstatus dependent on whether or not the device was actually in
the correct state.
"""
for trl in list(self._configuring_resources):
device = (
self._station_beams.get(trl)
or self._subarray_beams.get(trl)
or self._stations.get(trl)
)
if (
device
and device._proxy
and device._proxy.obsstate == self._desired_state
):
self.logger.info(
f"Change event for {trl} was missed, device was in correct state."
)
self._component_state_callback(missed_event=True)
self._configuring_resources.remove(trl)
return (
TaskStatus.COMPLETED
if not self._configuring_resources
else TaskStatus.FAILED
)
def _get_unique_id(self: SubarrayComponentManager) -> str:
"""
Get a UID.
Retrieves a unique ID from a running SKUID service if available
otherwise returns a locally unique ID.
:return: A unique ID.
"""
if self._skuid_url:
client = SkuidClient(self._skuid_url)
# TODO: hanging on client.fetch_transaction_id()
# but this is a weak test anyhow:
# we are passing a nonexistent URL,
# which makes the client fail to fetch a transaction ID,
# and fall back to generating one locally.
# So for now I have switched over to
# generating locally in the first place.
return client.get_local_transaction_id()
# return client.fetch_transaction_id()
return SkuidClient.get_local_transaction_id()