Source code for ska_low_mccs.subarray.subarray_component_manager

#  -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
# pylint: disable = too-many-lines
"""This module implements component management for subarrays."""
from __future__ import annotations

import functools
import json
import logging
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Optional, Sequence

import katpoint
import ska_tango_base.subarray
from ska_control_model import (
    CommunicationStatus,
    ObsState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_low_mccs_common import EventSerialiser
from ska_low_mccs_common.communication_manager import CommunicationManager
from ska_low_mccs_common.component import ObsDeviceComponentManager
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
    CompositeCommandResultEvaluator,
    MccsCompositeCommandProxy,
)
from ska_ser_skuid.client import SkuidClient  # type: ignore
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from tango import AttrQuality, EnsureOmniThread

from ska_low_mccs.subarray.qa_metrics_builder import QAMetricsBuilder

__all__ = ["SubarrayComponentManager"]


class _StationProxy(ObsDeviceComponentManager):
    """A subarray's proxy to its stations."""

    # TODO: remove comments when commands implemented in station
    @check_communicating
    def deallocate_subarray(
        self: _StationProxy, subarray_id: int
    ) -> tuple[ResultCode, str]:
        """
        Deconfigure the station.

        :param subarray_id: the subarray which configuration must be removed
        :return: A task status and response message.
        """
        assert self._proxy is not None
        ([result_code], [message]) = self._proxy.DeallocateSubarray(subarray_id)
        return (ResultCode(result_code), message)

    @check_communicating
    def end_scan(self: _StationProxy, subarray_id: int) -> tuple[TaskStatus, str]:
        """
        Stop the subarray beam scanning.

        :param subarray_id: the subarray ID which scan is stopped
        :return: A task status and response message.
        """
        assert self._proxy is not None

        ([result_code], unique_id) = self._proxy.EndScan(subarray_id)
        return (result_code, unique_id)

    @check_communicating
    def beamformer_running(self: _StationProxy, subarray_id: int) -> bool:
        """
        Check that the beamformer is running for given subarray.

        :param subarray_id: The ID of the subarray to be checked
        :return: beamformer running status
        """
        assert self._proxy is not None
        return self._proxy.BeamformerRunningForSubarray(subarray_id)

    @check_communicating
    def stop_tracking_all(self: _StationProxy) -> ResultCode:
        """
        Stop tracking.

        :return: a result code.
        """
        assert self._proxy is not None
        return self._proxy.StopTrackingAll()

    @check_communicating
    def set_csp_ingest(self: _StationProxy, argin: str) -> ResultCode:
        """
        Configure link for beam data packets to CSP.

        :param argin: json dictionary with optional keywords:

            * destination_ip - (string) Destination IP
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data streams

        :return: result code of SetCspIngest
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SetCspIngest(argin)
        return result_code

    @check_communicating
    def reset_csp_ingest(self: _StationProxy) -> tuple[list[ResultCode], list[str]]:
        """
        Reset link for beam data packets to CSP to defaults.

        :return: result code of ResetCspIngest
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        return self._proxy._device.ResetCspIngest()


class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """A subarray's proxy to its subarray beams."""

    # pylint: disable=too-many-arguments
    def __init__(
        self: _SubarrayBeamProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        qa_metric_callback: Callable[[str], None],
        event_serialiser: Optional[EventSerialiser] = None,
    ) -> None:
        self._qa_metric_callback = qa_metric_callback
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
            event_serialiser=event_serialiser,
        )

    @check_communicating
    @check_on
    def configure(
        self: _SubarrayBeamProxy, configuration: dict
    ) -> tuple[TaskStatus, str]:
        """
        Configure the subarray beam.

        :param configuration: the configuration to be applied to this
            subarray beam
        :return: A task status and response message.
        """
        assert self._proxy is not None
        configuration_str = json.dumps(configuration)
        (result_code, unique_id) = self._proxy.Configure(configuration_str)
        return (result_code, unique_id)

    def get_change_event_callbacks(self: _SubarrayBeamProxy) -> dict[str, Callable]:
        return {
            **super().get_change_event_callbacks(),
            "qualityAssuranceMetrics": self._qa_changed,
        }

    def _qa_changed(
        self: _SubarrayBeamProxy,
        attr_name: str,
        attr_value: str,
        attr_quality: AttrQuality,
    ) -> None:
        if attr_name.lower() != "qualityassurancemetrics":
            self._logger.warning(
                f"_qa_changed callback called by unexpected callback {attr_name}."
            )
            return
        self._qa_metric_callback(attr_value)

    @check_communicating
    @check_on
    def scan(
        self: _SubarrayBeamProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start the subarray beam scanning.

        :param scan_id: the id of the scan
        :param start_time: UTC time for begin of scan, None for immediate start
        :param duration: Scan duration in seconds. 0.0 or omitted means forever
        :return: A task status and response message.
        """
        assert self._proxy is not None
        scan_arg: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration,
        }
        if start_time is not None:
            scan_arg["start_time"] = start_time

        ([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
        return (result_code, unique_id)

    @check_communicating
    @check_on
    def release_all(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release subarray beam resources.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.ReleaseAllResources()
        return (result_code, unique_id)

    @check_communicating
    @check_on
    def deconfigure(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        De-configure the station beam.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.End()
        return (result_code, unique_id)

    @check_communicating
    def end_scan(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        End the current scan.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.EndScan()
        return (result_code, unique_id)

    @check_communicating
    def abort(
        self: _SubarrayBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Abort the current scan associated with the subarray beam.

        :param task_callback: Update task state, defaults to None

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.Abort()
        return (result_code, unique_id)

    @check_communicating
    def obsreset(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Reset to IDLE the station beam.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.ObsReset()
        return (result_code, unique_id)

    @check_communicating
    def restart(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Restart to EMPTY the station beam.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.Restart()
        return (result_code, unique_id)


class _StationBeamProxy(ObsDeviceComponentManager):
    """A subarray's proxy to its station beams."""

    @check_communicating
    @check_on
    def configure(
        self: _StationBeamProxy, configuration: dict
    ) -> tuple[TaskStatus, str]:
        """
        Configure the station beam.

        :param configuration: the configuration to be applied to this
            station beam
        :return: A task status and response message.
        """
        assert self._proxy is not None
        configuration_str = json.dumps(configuration)
        (result_code, unique_id) = self._proxy.Configure(configuration_str)
        return (result_code, unique_id)

    @check_communicating
    @check_on
    def scan(
        self: _StationBeamProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start the subarray beam scanning.

        :param scan_id: the id of the scan
        :param start_time: the start time of the scan
        :param duration: Scan duration in seconds. 0.0 or omitted means forever
        :return: A task status and response message.
        """
        assert self._proxy is not None
        scan_arg: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration,
        }
        if start_time is not None:
            scan_arg["start_time"] = start_time

        ([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
        return (result_code, unique_id)

    @check_communicating
    @check_on
    def deconfigure(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        Configure the station beam.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.End()
        return (result_code, unique_id)

    @check_communicating
    @check_on
    def release_all(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release station beam resources.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.ReleaseAllResources()
        return (result_code, unique_id)

    @check_communicating
    def end_scan(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release station beam resources.

        :return: A task status and response message.
        """
        assert self._proxy is not None
        (result_code, unique_id) = self._proxy.EndScan()
        return (result_code, unique_id)


# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs] class SubarrayComponentManager( TaskExecutorComponentManager, ska_tango_base.subarray.SubarrayComponentManager, ): """A component manager for a subarray.""" # messages for long running command task callback _task_message = { TaskStatus.COMPLETED: "completed OK.", TaskStatus.ABORTED: "been aborted.", TaskStatus.FAILED: "timed out.", } _task_to_result = { TaskStatus.COMPLETED: ResultCode.OK, TaskStatus.ABORTED: ResultCode.ABORTED, TaskStatus.FAILED: ResultCode.FAILED, } # pylint: disable=too-many-arguments
[docs] def __init__( self: SubarrayComponentManager, subarray_id: int, skuid_url: str, logger: logging.Logger, obs_command_timeout: int, communication_state_callback: Callable[[CommunicationStatus], None], component_state_callback: Callable[..., None], default_solution_type: str = "fitted", event_serialiser: Optional[EventSerialiser] = None, ) -> None: """ Initialise a new instance. :param subarray_id: the subarray ID for this device :param skuid_url: The address at which a SKUID service is running. :param obs_command_timeout: the default timeout for obs commands in seconds. :param logger: the logger to be used by this object. :param communication_state_callback: callback to be called when the status of the communications channel between the component manager and its component changes :param component_state_callback: callback to be called when the component state changes. :param default_solution_type: default calibration solution type used when an aperture in Configure does not specify solution_type. :param event_serialiser: the event serialiser to be used by this object. """ self._event_serialiser = event_serialiser self._power_state_lock = threading.RLock() self._device_communication_state_lock = threading.Lock() self._component_state_callback: Callable[..., None] self._component_state_callback = component_state_callback self._abort_complete = threading.Event() self._obs_command_timeout = obs_command_timeout self._device_communication_states: dict[str, CommunicationStatus] = {} self._station_power_modes: dict[str, Optional[PowerState]] = {} self._device_obs_states: dict[str, Optional[ObsState]] = {} self._is_assigning = False self._is_configured = False self._configuring_resources: set[str] = set() # these resources should go self._desired_state = ObsState.READY # to this end state self._stations: dict[str, _StationProxy] = {} self._subarray_beams: dict[str, _SubarrayBeamProxy] = {} self._station_beams: dict[str, _StationBeamProxy] = {} self._apertures: dict[int, dict[str, str]] = defaultdict(dict) self._subarray_beam_trl: dict[int, str] = {} self._trl_to_subarray_beam_id: dict[str, int] = {} self._beam_to_station: dict[str, str] = {} self._calibration_ids: dict[str, str] = {} self._solution_types: dict[str, str] = {} self._number_of_channels = 0 self._subarray_id = subarray_id self._skuid_url = skuid_url self._default_solution_type = default_solution_type self._scan_id: Optional[int] = None self._scan_start_time: str | None = None self._scan_duration = 0.0 self.reset_csp_ingest_on_scan: bool = True self._qa_metric_builder = QAMetricsBuilder(logger) super().__init__( logger, communication_state_callback, component_state_callback, resources_changed=None, configured_changed=None, scanning_changed=None, task_completed=None, obsfault=None, obsstate_changed=None, station_power=None, power=None, fault=None, qa_metrics=json.dumps(self._qa_metric_builder.build()), ) self._communication_manager = CommunicationManager( self._update_communication_state, self._update_component_state, self.logger )
[docs] def start_communicating(self: SubarrayComponentManager) -> None: """Establish communication with the station components.""" self._communication_manager.start_communicating()
[docs] def stop_communicating(self: SubarrayComponentManager) -> None: """Break off communication with the station components.""" self._communication_manager.stop_communicating()
@property def max_executing_tasks(self) -> int: """ Get the max number of tasks that can be executing at once. :return: max number of simultaneously executing tasks. """ return 2 @property def scan_id(self: SubarrayComponentManager) -> Optional[int]: """ Return the scan id, or None if a scan is not current. :return: the scan id, or None if a scan is not current. """ return self._scan_id @property def subarray_beam_trls(self: SubarrayComponentManager) -> set[str]: """ Return the set of TRLs of subarray beams assigned to this subarray. :return: the set of TRLs of subarray beams assigned to this subarray. """ return set(self._subarray_beams.keys()) @property def station_beam_trls(self: SubarrayComponentManager) -> set[str]: """ Return the set of TRLs of station beams assigned to this subarray. :return: the set of TRLs of station beams assigned to this subarray. """ return set(self._station_beams.keys()) @property def station_trls(self: SubarrayComponentManager) -> set[str]: """ Return the set of TRLs of stations assigned to this subarray. :return: the set of TRLs of stations assigned to this subarray. """ return set(self._stations.keys()) @property def power_state(self: SubarrayComponentManager) -> Optional[PowerState]: """ Return my power state. :return: my power state """ return self._component_state["power"] def _check_aborted( self: SubarrayComponentManager, task_abort_event: Optional[threading.Event], task_callback: Optional[Callable], method_name: str, ) -> bool: if task_abort_event is None or task_callback is None: self.logger.warning(f"Cannot check {method_name} for abort status") return False if task_abort_event.is_set(): self.logger.info(f"{method_name} has been aborted") task_callback( status=TaskStatus.ABORTED, result=(ResultCode.ABORTED, f"{method_name} aborted"), ) return True return False # pylint: disable=too-many-branches, too-many-locals, too-many-statements
[docs] @check_communicating def do_assign( self: SubarrayComponentManager, subarray_id: int, subarray_beams: dict, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Assign resources to this subarray. This is just for communication and health roll-up, resource management is done by controller. :param subarray_id: ID of the subarray :param subarray_beams: resource specification for each beam :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ self._is_assigning = True if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "do_assign"): return station_trls: list[str] = [] subarray_beam_trls: list[str] = [] station_beam_trls: list[str] = [] number_of_channels = 0 self._apertures.clear() self._subarray_beam_trl.clear() self._trl_to_subarray_beam_id.clear() self._number_of_channels = 0 for beam in subarray_beams: subarray_beam_id = beam.get("subarray_beam_id", 1) subarray_beam_trl = beam.get( "subarray_beam_trl", f"low-mccs/subarraybeam/{subarray_id:02d}-{subarray_beam_id:02d}", ) self._subarray_beam_trl[subarray_beam_id] = subarray_beam_trl self._trl_to_subarray_beam_id[subarray_beam_trl] = subarray_beam_id subarray_beam_trls.append(subarray_beam_trl) number_of_channels += beam.get("number_of_channels") for aperture in beam.get("apertures", []): aperture_id = aperture.get("aperture_id", None) station_id = aperture.get("station_id") if aperture_id is None: aperture_id = f"AP{station_id}.1" station_trl = aperture.get("station_trl", None) if station_trl is not None and station_trl not in station_trls: station_trls.append(station_trl) station_beam_trl = aperture.get("station_beam_trl", None) station_beam_trls.append(station_beam_trl) self._beam_to_station[station_beam_trl] = station_trl self._apertures[subarray_beam_id][aperture_id] = station_beam_trl # Define beam for qa_metrics. self._qa_metric_builder.define_beam( str(subarray_beam_id), [str(k) for k in self._apertures], ) if self._check_aborted(task_abort_event, task_callback, "do_assign"): return self._number_of_channels = number_of_channels station_trls_to_add = sorted(station_trls) - self._stations.keys() subarray_beam_trls_to_add = subarray_beam_trls - self._subarray_beams.keys() station_beam_trls_to_add = station_beam_trls - self._station_beams.keys() trls_to_add = station_trls_to_add.union( subarray_beam_trls_to_add, station_beam_trls_to_add ) self._desired_state = ObsState.IDLE self._configuring_resources.clear() if trls_to_add: for trl in trls_to_add: self._device_communication_states[trl] = CommunicationStatus.DISABLED self._device_obs_states[trl] = ObsState.EMPTY if self._check_aborted(task_abort_event, task_callback, "do_assign"): return for trl in station_trls_to_add: self._stations[trl] = _StationProxy( trl, self.logger, functools.partial(self._device_communication_state_changed, trl), functools.partial(self._component_state_callback, trl=trl), event_serialiser=self._event_serialiser, ) for trl in subarray_beam_trls_to_add: self._subarray_beams[trl] = _SubarrayBeamProxy( trl, self.logger, functools.partial(self._device_communication_state_changed, trl), functools.partial( self._component_state_callback, trl=trl, ), functools.partial( self._qa_attribute_changed, trl, ), event_serialiser=self._event_serialiser, ) self._configuring_resources.add(trl) for trl in station_beam_trls_to_add: self._station_beams[trl] = _StationBeamProxy( trl, self.logger, functools.partial(self._device_communication_state_changed, trl), functools.partial( self._component_state_callback, trl=trl, ), event_serialiser=self._event_serialiser, ) self._configuring_resources.add(trl) self._communication_manager.replace_device_pool( self._stations, self._station_beams, self._subarray_beams ) self._is_assigning = True self._component_state_callback( resources_changed=[ set(self._stations.keys()), set(self._subarray_beams.keys()), set(self._station_beams.keys()), ] ) if self._check_aborted(task_abort_event, task_callback, "do_assign"): return # wait for all subdevices to reach ObsState.IDLE task_result = self._wait_for_obs_state( self._obs_command_timeout, task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "do_assign"): return self._is_assigning = False self._is_configured = False if task_callback is not None: task_callback( status=task_result, result=( self._task_to_result[task_result], "Assign resources " + self._task_message[task_result], ), )
def _qa_attribute_changed( self: SubarrayComponentManager, trl: str, qa_metric: str, ) -> None: subarray_beam_id: int = self._trl_to_subarray_beam_id[trl] loaded_qa_metric = json.loads(qa_metric) if qa_metric else {} self._qa_metric_builder.update_beam_metrics( str(subarray_beam_id), loaded_qa_metric ) processed_metrics = self._qa_metric_builder.build() self._update_component_state(qa_metrics=json.dumps(processed_metrics)) @property # type: ignore[misc] @check_communicating def assigned_resources( self: SubarrayComponentManager, ) -> list[str]: """ Return this subarray's resources. :return: this subarray's resources. """ return list( set(self._stations) | set(self._subarray_beams) | set(self._station_beams), ) @property # type: ignore[misc] @check_communicating def assigned_resources_dict( self: SubarrayComponentManager, ) -> dict[str, Sequence[Any]]: """ Return a dictionary of resource types and TRLs. :return: this subarray's resources. """ return { "stations": sorted(self._stations.keys()), "subarray_beams": sorted(self._subarray_beams.keys()), "station_beams": sorted(self._station_beams.keys()), "apertures": sorted( [ aperture_id for apertures in self._apertures.values() for aperture_id in apertures ] ), "channels": [self._number_of_channels], }
[docs] @check_communicating def release( # type: ignore[override] # pylint: disable=arguments-differ self: SubarrayComponentManager, beam_resources: dict[str, list[str]], task_callback: Optional[Callable], task_abort_event: Optional[threading.Event], ) -> None: """ Release resources from this subarray. :param beam_resources: list of resource TRLs to release. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :raises NotImplementedError: because MCCS Subarray cannot perform a partial release of beam_resources. """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "release"): return if task_callback is not None: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.OK, ( "Not Implemented: MCCS Subarray cannot" "partially release resources." ), ), ) raise NotImplementedError("MCCS Subarray cannot partially release resources.")
[docs] @check_communicating def release_all( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable], task_abort_event: Optional[threading.Event] = None, ) -> None: """ Release all resources from this subarray. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "release_all"): return self._desired_state = ObsState.EMPTY self._configuring_resources.clear() # Release resources in subelements, and check return codes self.logger.debug("Deallocating resources in stations") for station_proxy in self._stations.values(): station_proxy.deallocate_subarray(self._subarray_id) self.logger.debug("Deallocating resources in subarray beams") for trl, subarray_beam_proxy in self._subarray_beams.items(): self._configuring_resources.add(trl) subarray_beam_proxy.release_all() if self._check_aborted(task_abort_event, task_callback, "release_all"): return # Wait for beams to go to EMPTY task_status = self._wait_for_obs_state( self._obs_command_timeout, task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "release_all"): return self._release_internal_resources() if task_status == TaskStatus.FAILED: self._component_state_callback(obsfault=True) if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "ReleaseAllResources has " + self._task_message[task_status], ), )
[docs] def cleanup(self: SubarrayComponentManager) -> None: """ Cleanup resources held by the component manager. This includes cleaning up resources held by all sub-component managers. """ self._delete_proxies() self._communication_manager.shutdown() super().cleanup()
def _delete_proxies(self) -> None: self.logger.debug("Deleting proxies") self._device_communication_states.clear() self._communication_manager.replace_device_pool() for station_proxy in self._stations.values(): station_proxy.cleanup() self._stations.clear() for subarray_beam_proxy in self._subarray_beams.values(): subarray_beam_proxy.cleanup() self._subarray_beams.clear() for station_beam_proxy in self._station_beams.values(): station_beam_proxy.cleanup() self._station_beams.clear() def _release_internal_resources(self) -> None: self.logger.debug("Deallocating resources") if self._stations or self._subarray_beams or self._station_beams: self._delete_proxies() self._device_obs_states.clear() self._apertures.clear() self._subarray_beam_trl.clear() self._trl_to_subarray_beam_id.clear() self._qa_metric_builder.reset() processed_metrics = self._qa_metric_builder.build() self._update_component_state(qa_metrics=json.dumps(processed_metrics)) self._beam_to_station.clear() self._calibration_ids.clear() self._solution_types.clear() self._number_of_channels = 0 self._configuring_resources.clear() self._component_state_callback( resources_changed=[ set(self._stations.keys()), set(self._subarray_beams.keys()), set(self._station_beams.keys()), ] )
[docs] @check_communicating def get_coords( # type: ignore[override] self: SubarrayComponentManager, subarray_beams: list[dict], ) -> list[dict]: """ Modify the coordinates to have correct pointing. :param subarray_beams: subarray beams :raises KeyError: coordinates are invalid :return: The modified beams with correct coords """ # while we reformat the input, check for inconsitent values subarray_beams_correct = True error_message = "" for beam in subarray_beams: beam_id = beam.get("subarray_beam_id", None) if beam_id: sub_beam_trl = self._subarray_beam_trl.get(beam_id, None) if sub_beam_trl is None: subarray_beams_correct = False error_message += ( f"\n{beam_id =} is not assigned to subarray {self._subarray_id}" ) apertures = beam.get("apertures", None) if apertures: for aperture in apertures: aperture_id = aperture.get("aperture_id", None) apertures_for_beam = ( self._apertures.get(beam_id, {}) if beam_id is not None else {} ) station_beam_trl = apertures_for_beam.get(aperture_id, None) if station_beam_trl is None: subarray_beams_correct = False error_message += ( f"\n{aperture_id =} is not assigned to subarray " f"{self._subarray_id}" ) self.logger.debug(f"Configure received, beam: {beam}") sky_coordinates = beam.pop("sky_coordinates", None) if sky_coordinates: field = { key: val for key, val in sky_coordinates.items() if key in ["timestamp", "reference_frame", "target_name"] } if sky_coordinates.get("reference_frame").casefold() in [ "topocentric", "altaz", "icrs", "galactic", ]: field["attrs"] = { key: val for key, val in sky_coordinates.items() if key in ["c1", "c1_rate", "c2", "c2_rate"] } field.setdefault("target_name", "No name provided") if sky_coordinates.get("reference_frame").casefold() == "tle": field["attrs"] = { key: val for key, val in sky_coordinates.items() if key in ["line1", "line2"] } field.setdefault("target_name", "No name provided") beam["field"] = field field = beam["field"] if field.get("reference_frame").casefold() == "tle": line1 = field["attrs"]["line1"] line2 = field["attrs"]["line2"] try: katpoint.Target(f"tle, {line1}, {line2}") except ValueError as err: self.logger.error(f"tle format incorrect: {err}") error_message += f"tle format incorrect: {err}" subarray_beams_correct = False if not subarray_beams_correct: raise KeyError(error_message) return subarray_beams
# pylint: disable=too-many-return-statements
[docs] @check_communicating # pylint: disable-next=arguments-differ def configure( # type: ignore[override] # noqa: C901 self: SubarrayComponentManager, subarray_beams: list[dict[str, Any]], task_callback: Optional[Callable], task_abort_event: Optional[threading.Event] = None, transaction_id: Optional[str] = "", ) -> None: """ Configure the resources for a scan. :param subarray_beams: the beam configurations to be applied :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :param transaction_id: the transaction id for the configuration Will be generated if not provided. """ try: new_beams = self.get_coords(subarray_beams) except KeyError as err: msg = f"Configuration values supplied are incorrect: {err}" self.logger.error(msg) if task_callback is not None: task_callback( status=TaskStatus.REJECTED, result=( ResultCode.REJECTED, "Configuration values supplied are incorrect", ), ) return def _report_status(status: TaskStatus, **kwargs: Any) -> None: if task_callback is not None: task_callback(status=status, **kwargs) def __fail(result_code: ResultCode, message: str) -> None: """ Handle failure reporting and state update. :param result_code: the ResultCode to report. :param message: For information. """ self._component_state_callback(obsfault=True) _report_status( status=TaskStatus.FAILED, result=(result_code, message), ) _report_status(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "configure"): return if transaction_id == "" or transaction_id is None: transaction_id = self._get_unique_id() was_empty = not self._is_configured for station_trl, station_proxy in self._stations.items(): station_proxy.deallocate_subarray(self._subarray_id) if self._check_aborted(task_abort_event, task_callback, "configure"): return configuration: dict[str, dict] = {} for beam in new_beams: beam_id = beam.get("subarray_beam_id") assert isinstance(beam_id, int) beam_trl = self._subarray_beam_trl.get(beam_id, None) if beam_trl is None: self.logger.error(f"undefined beam {beam_id}") continue self.logger.debug(f"Processing beam {beam_id} ({beam_trl})") beam["subarray_id"] = self._subarray_id for aperture in beam["apertures"]: aperture_id = aperture["aperture_id"] apertures_for_beam = self._apertures.get(beam_id, {}) station_beam_trl = apertures_for_beam.get(aperture_id, None) calibration_id = aperture.get("calibration_id", "") solution_type = aperture.get( "solution_type", self._default_solution_type ) if station_beam_trl is None: self.logger.error(f"undefined aperture {aperture_id}") continue aperture["station_beam_trl"] = station_beam_trl station_trl = self._beam_to_station[station_beam_trl] self._calibration_ids[station_trl] = calibration_id self._solution_types[station_trl] = solution_type configuration[beam_trl] = beam if self._check_aborted(task_abort_event, task_callback, "configure"): return # Configure subarray beams. if configuration: result_code, message = self._configure_subarray_beams( configuration, task_abort_event=task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "configure"): return if result_code != ResultCode.OK: __fail(result_code, message) return self._wait_for_obs_state(0.0, task_abort_event) if self._check_aborted(task_abort_event, task_callback, "configure"): return result_code, message = self._configure_stations( transaction_id=transaction_id, task_abort_event=task_abort_event, ) if self._check_aborted(task_abort_event, task_callback, "configure"): return if result_code != ResultCode.OK: __fail(result_code, message) return # Obsstate update self._is_configured = True if was_empty: self._component_state_callback(configured_changed=True) _report_status( status=TaskStatus.COMPLETED, result=( self._task_to_result[TaskStatus.COMPLETED], "Configure has " + self._task_message[TaskStatus.COMPLETED], ), )
def _configure_stations( self: SubarrayComponentManager, transaction_id: str, task_abort_event: Optional[threading.Event], ) -> tuple[ResultCode, str]: """ Configure the station resources for a scan. :param transaction_id: the transaction id for the configuration :param task_abort_event: abort event to cancel waiting for result. :return: a tuple containing a result code and a json serialised string """ apply_configuration_commands = MccsCompositeCommandProxy( self.logger, task_abort_event=task_abort_event ) for station_trl in self._stations: argin: dict[str, Any] = { "transaction_id": transaction_id, "subarray_id": self._subarray_id, "solution_type": self._solution_types.get( station_trl, self._default_solution_type ), } if self._calibration_ids[station_trl]: argin["calibration_id"] = self._calibration_ids[station_trl] apply_configuration_commands += MccsCommandProxy( device_name=station_trl, command_name="ApplyConfiguration", logger=self.logger, default_args=json.dumps(argin), ) result, message = apply_configuration_commands( command_evaluator=CompositeCommandResultEvaluator(), timeout=self._obs_command_timeout, ) return ResultCode(result), message def _configure_subarray_beams( self: SubarrayComponentManager, subarray_beam_configuration: dict[str, Any], task_abort_event: Optional[threading.Event], ) -> tuple[ResultCode, str]: """ Configure the station beam resources for a scan. :param subarray_beam_configuration: the subarray beam configuration to be applied :param task_abort_event: abort event to cancel waiting for result. :return: a tuple containing a result code and json serialised string. """ self._configuring_resources.clear() self._desired_state = ObsState.READY configure_subarraybeams_command = MccsCompositeCommandProxy( self.logger, task_abort_event=task_abort_event ) for ( subarray_beam_trl, configuration, ) in subarray_beam_configuration.items(): configure_subarraybeams_command += MccsCommandProxy( device_name=subarray_beam_trl, command_name="Configure", logger=self.logger, default_args=json.dumps(configuration), ) self._configuring_resources.add(subarray_beam_trl) result_code, message = configure_subarraybeams_command( command_evaluator=CompositeCommandResultEvaluator(), timeout=self._obs_command_timeout, ) if result_code != ResultCode.OK: obj = json.loads(message) pretty = json.dumps(obj, indent=4) self.logger.error("Failed to configure subarrays with:\n%s", pretty) return ResultCode(result_code), message
[docs] @check_communicating # pylint: disable-next=arguments-differ,arguments-renamed def scan( # type: ignore[override] self: SubarrayComponentManager, scan_id: int = 0, start_time: Optional[str] = None, duration: float = 0.0, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Start scanning. :param scan_id: the id of the scan :param start_time: the start time of the scan :param duration: Scan duration in seconds. 0.0 or omitted means forever :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) self._scan_id = scan_id self._scan_start_time = start_time self._scan_duration = duration msg = "" failure_message = "" if self._check_aborted(task_abort_event, task_callback, "scan"): return if self.reset_csp_ingest_on_scan: # Reset CSPIngest to default. self.logger.debug( "Resetting CSPIngest for all stations allocated to this Subarray." ) csp_reset_result = [] for trl, station in self._stations.items(): [rc], [_] = station.reset_csp_ingest() if rc != ResultCode.OK: csp_reset_result.append(trl) if csp_reset_result: msg = ( "Scan has failed to reset CSPIngest on " f"stations: {csp_reset_result}" ) self.logger.error(msg) self._desired_state = ObsState.SCANNING self._configuring_resources.clear() task_status = TaskStatus.COMPLETED # if everything below does not fail for trl, subarray_beam_proxy in self._subarray_beams.items(): self._configuring_resources.add(trl) proxy_result_code = subarray_beam_proxy.scan(scan_id, start_time, duration) if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED): task_status = TaskStatus.FAILED # wait for beams to start if task_status == TaskStatus.COMPLETED: task_status = self._wait_for_obs_state( self._obs_command_timeout, task_abort_event ) if self._check_aborted(task_abort_event, task_callback, "scan"): return # Start scan on stations if task_status == TaskStatus.COMPLETED: result_code, message = self.__start_station_scan( subarray_id=self._subarray_id, scan_id=scan_id, start_time=start_time, duration=duration, ) self.logger.warning(message) if result_code != ResultCode.OK: task_status = TaskStatus.FAILED failure_message = message if self._check_aborted(task_abort_event, task_callback, "scan"): return if task_status == TaskStatus.COMPLETED: self._component_state_callback(scanning_changed=True) else: self._component_state_callback(obsfault=True) if task_callback is not None: result_message = ( failure_message if task_status == TaskStatus.FAILED and failure_message else "Scan has " + self._task_message[task_status] ) task_callback( status=task_status, result=( self._task_to_result[task_status], result_message + (f" {msg}" if msg else ""), ), )
def __start_station_scan( self: SubarrayComponentManager, subarray_id: int, scan_id: int, start_time: Optional[str], duration: float, ) -> tuple[ResultCode, str]: scan_arg: dict[str, int | float | str] = { "subarray_id": subarray_id, "scan_id": scan_id, "duration": duration, } if start_time is not None: scan_arg["start_time"] = start_time start_scan_command = MccsCompositeCommandProxy(self.logger) for station_trl in self._stations: start_scan_command += MccsCommandProxy( station_trl, "Scan", self.logger, default_args=json.dumps(scan_arg), ) result, message = start_scan_command( command_evaluator=CompositeCommandResultEvaluator(), timeout=self._obs_command_timeout, ) return ResultCode(result), message def __end_station_scan( self: SubarrayComponentManager, subarray_id: int, ) -> tuple[ResultCode, str]: end_scan_command = MccsCompositeCommandProxy(self.logger) for station_trl in self._stations: end_scan_command += MccsCommandProxy( station_trl, "EndScan", self.logger, default_args=subarray_id, ) result, message = end_scan_command( command_evaluator=CompositeCommandResultEvaluator(), timeout=self._obs_command_timeout, ) return ResultCode(result), message
[docs] def end_scan( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Execute the `end_scan` slow command. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "end_scan"): return self._scan_id = None self._scan_start_time = None self._scan_duration = 0.0 # Stuff goes here. This should tell the subarray beam device to # stop scanning. task_status = TaskStatus.COMPLETED for subarray_beam_proxy in self._subarray_beams.values(): proxy_result_code = subarray_beam_proxy.end_scan() if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED): task_status = TaskStatus.FAILED # Start scan on stations if task_status == TaskStatus.COMPLETED: result_code, message = self.__end_station_scan( subarray_id=self._subarray_id, ) self.logger.warning(message) if result_code != ResultCode.OK: task_status = TaskStatus.FAILED # check that beamformer has stopped in all stations if self._wait_for_end_scan(): self.logger.info("Beamformer stopped") else: self.logger.error("Beamformer still running after 3 seconds") task_status = TaskStatus.FAILED if self._check_aborted(task_abort_event, task_callback, "end_scan"): return if task_status == TaskStatus.COMPLETED: self._component_state_callback(scanning_changed=False) else: self._component_state_callback(obsfault=True) if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "EndScan has " + self._task_message[task_status], ), )
[docs] @check_communicating def deconfigure( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Deconfigure resources. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ self.logger.debug("deconfigure task invoked") if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "deconfigure"): return self._desired_state = ObsState.IDLE task_status = self._deconfigure_subelements() if task_status == TaskStatus.COMPLETED: self._wait_for_obs_state(self._obs_command_timeout, task_abort_event) if self._check_aborted(task_abort_event, task_callback, "deconfigure"): return self._component_state_callback(configured_changed=False) if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "Subarray Deconfigure has " + self._task_message[task_status], ), )
@check_communicating def _deconfigure_subelements( self: SubarrayComponentManager, ) -> TaskStatus: """ Deconfigure resources. :return: result code """ self.logger.debug("deconfigure subelements invoked") task_status = TaskStatus.COMPLETED for trl, station_proxy in self._stations.items(): self.logger.debug(f"deconfigure station {trl}") proxy_result_code, _ = station_proxy.deallocate_subarray(self._subarray_id) if proxy_result_code != ResultCode.OK: task_status = TaskStatus.FAILED # # Set all station beams to no logical bands, point statically at zenith # for trl, subarray_beam_proxy in self._subarray_beams.items(): self.logger.debug(f"deconfigure subarray beam {trl}") self._configuring_resources.add(trl) proxy_task_status, response = subarray_beam_proxy.deconfigure() if proxy_task_status in (TaskStatus.REJECTED, TaskStatus.FAILED): task_status = TaskStatus.FAILED self._is_configured = False return task_status def _abort_callback( self: SubarrayComponentManager, *, status: Optional[TaskStatus] = None, exception: Optional[Exception] = None, **kwargs: Any, ) -> None: if exception is not None: self.logger.error(f"abort_tasks raised exception: {repr(exception)}") if status == TaskStatus.COMPLETED: self.logger.debug("abort_tasks has finished, setting flag") self._abort_complete.set()
[docs] @check_communicating def abort( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> tuple[TaskStatus, str]: """ Abort the observation. :param task_callback: callback to be called when the status of the command changes :param task_abort_event: Check for abort, defaults to None :return: A task status and response message. """ # We have to spin up a separate thread such that we skip the queue which # we would append to if we used submit_task threading.Thread( target=self._abort, kwargs={ "task_callback": task_callback, "task_abort_event": task_abort_event, }, ).start() return (TaskStatus.IN_PROGRESS, "Abort has started")
[docs] def abort_device( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> Any: """ Abort only this device, for use in RestartSubarray(). :param task_callback: callback to be called when the status of the command changes :param task_abort_event: Check for abort, defaults to None :return: A task status and response message. """ # We have to spin up a separate thread such that we skip the queue which # we would append to if we used submit_task threading.Thread( target=self._abort, kwargs={ "cascade": False, "task_callback": task_callback, "task_abort_event": task_abort_event, }, ).start() return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, cascade: bool = True, ) -> None: """ Abort command execution. :param cascade: whether or not to abort sub-devices. :param task_callback: Update task state, defaults to None. :param task_abort_event: Check for abort, defaults to None """ with EnsureOmniThread(): if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) task_status = TaskStatus.COMPLETED # This is sent in a thread, we record progress using the _abort_callback, # which will set an Event once the task is complete. self.abort_tasks(task_callback=self._abort_callback) # type:ignore # Wait until the timeout for the Event to be set by abort_tasks timeout = 10.0 if self._abort_complete.wait(timeout=timeout): self._abort_complete.clear() else: self.logger.error(f"Abort timed out in {timeout} seconds.") task_status = TaskStatus.FAILED # Reinitialize our event. The old one could get set at any point now. self._abort_complete = threading.Event() # Abort sub-devices if cascade: if task_status == TaskStatus.COMPLETED: self._desired_state = ObsState.ABORTED self._configuring_resources.clear() for trl, subarray_beam_proxy in self._subarray_beams.items(): self._configuring_resources.add(trl) proxy_result_code, response = subarray_beam_proxy.abort() self.logger.debug( f"Aborting {trl} has {ResultCode(proxy_result_code).name}" ) if proxy_result_code in ( ResultCode.FAILED, ResultCode.REJECTED, ResultCode.NOT_ALLOWED, ): task_status = TaskStatus.FAILED break # Wait until subdevices reach ObsState.ABORTED if task_status == TaskStatus.COMPLETED: task_status = self._wait_for_obs_state(timeout=10.0) if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "Abort command has " + self._task_message[task_status], ), ) @check_communicating def _wait_for_end_scan(self: SubarrayComponentManager) -> bool: """ Wait for the scan to have stopped. :return: True if OK, False if timed out """ timeout = 3.0 # seconds while timeout > 0.0: time.sleep(0.1) timeout -= 0.1 if all( station_proxy.beamformer_running(self._subarray_id) is False for station_proxy in self._stations.values() ): return True return False
[docs] @check_communicating def obsreset( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Reset the observation by returning to unconfigured state. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "obsreset"): return task_status = TaskStatus.COMPLETED for trl, subarray_beam_proxy in self._subarray_beams.items(): self._configuring_resources.add(trl) proxy_result_code = subarray_beam_proxy.obsreset() if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED): task_status = TaskStatus.FAILED if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "ObsReset command has " + self._task_message[task_status], ), )
[docs] @check_communicating def restart( # type: ignore[override] self: SubarrayComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Restart the subarray by returning to unresourced state. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) if self._check_aborted(task_abort_event, task_callback, "restart"): return task_status = TaskStatus.COMPLETED for trl, station_proxy in self._stations.items(): proxy_result_code, _ = station_proxy.deallocate_subarray(self._subarray_id) if proxy_result_code != ResultCode.OK: task_status = TaskStatus.FAILED # Scanning related self._scan_id = None self._scan_start_time = None self._scan_duration = 0.0 # Configure related self._is_configured = False # Allocate related self._release_internal_resources() if task_callback is not None: task_callback( status=task_status, result=( self._task_to_result[task_status], "Restart command has " + self._task_message[task_status], ), )
[docs] @check_communicating def send_transient_buffer( self: SubarrayComponentManager, argin: list[int], task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Send the transient buffer. :param argin: list of list of requested segment. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) self.logger.info("Command currently has no implementation.") # do stuff here if task_callback is not None: task_callback( status=TaskStatus.COMPLETED, result=(ResultCode.OK, "send_transient_buffer command completed."), )
def _device_communication_state_changed( self: SubarrayComponentManager, trl: str, communication_state: CommunicationStatus, ) -> None: self._communication_manager.update_communication_status( trl, communication_state ) def _station_power_state_changed( self: SubarrayComponentManager, trl: str, power_mode: PowerState, ) -> None: self._station_power_modes[trl] = power_mode if self._is_assigning and all( power_mode is not None for power_mode in self._station_power_modes.values() ): self._is_assigning = False def _wait_for_obs_state( self: SubarrayComponentManager, timeout: float, task_abort_event: Optional[threading.Event] = None, ) -> TaskStatus: """ Wait for sub-device ObsState to reach desired state. :param timeout: Time to wait, in seconds. :param task_abort_event: Check for abort, defaults to None :return: completed if status reached, FAILED if timed out, ABORTED if aborted """ ticks = int(timeout / 0.01) # 10 ms resolution while self._configuring_resources: if task_abort_event and task_abort_event.is_set(): return TaskStatus.ABORTED time.sleep(0.01) ticks -= 1 if ticks == 0: self.logger.warning( f"Timed out waiting for ObsState {self._desired_state.name}" f" resources {self._configuring_resources} " f"failed to transition in {timeout} seconds. Attempting final poll." ) return self._final_poll() self.logger.debug( f"Waited ObsState {self._desired_state.name} for" f" {(timeout-ticks*0.01):.3f} seconds" ) return TaskStatus.COMPLETED def _device_obs_state_changed( self: SubarrayComponentManager, trl: str, obs_state: ObsState, ) -> None: old_state = self._device_obs_states.get(trl, None) if old_state: old_name = old_state.name else: old_name = "NONE" new_state = ObsState(obs_state) self._device_obs_states[trl] = new_state if obs_state == self._desired_state and trl in self._configuring_resources: self._configuring_resources.remove(trl) # if not self._configuring_resources: # self._component_state_callback(task_completed="configure") self.logger.debug( f"ObsState for {trl} changed: {old_name} -> {new_state.name}, " f"waiting for {len(self._configuring_resources)} more devices" ) def _device_obs_state_fault( self: SubarrayComponentManager, trl: str, obs_state: ObsState, ) -> None: if obs_state != ObsState.FAULT: self.logger.warning( "_device_obs_state_fault called with" f" incorrect ObsState {trl} {obs_state.name}" ) return self.logger.error(f"{trl} moved to FAULT.") self._device_obs_states[trl] = ObsState(obs_state) self._component_state_callback(obsfault=True) def _final_poll(self: SubarrayComponentManager) -> TaskStatus: """ Direct check of device obsstates as a last attempt before declaring failure. Sometimes it appears we miss change events in production, if for some reason we haven't received all the change events we expected, double check what we think the obsstate is, vs what it actually is on all subdevices that we need to. :returns: a taskstatus dependent on whether or not the device was actually in the correct state. """ for trl in list(self._configuring_resources): device = ( self._station_beams.get(trl) or self._subarray_beams.get(trl) or self._stations.get(trl) ) if ( device and device._proxy and device._proxy.obsstate == self._desired_state ): self.logger.info( f"Change event for {trl} was missed, device was in correct state." ) self._component_state_callback(missed_event=True) self._configuring_resources.remove(trl) return ( TaskStatus.COMPLETED if not self._configuring_resources else TaskStatus.FAILED ) def _get_unique_id(self: SubarrayComponentManager) -> str: """ Get a UID. Retrieves a unique ID from a running SKUID service if available otherwise returns a locally unique ID. :return: A unique ID. """ if self._skuid_url: client = SkuidClient(self._skuid_url) # TODO: hanging on client.fetch_transaction_id() # but this is a weak test anyhow: # we are passing a nonexistent URL, # which makes the client fail to fetch a transaction ID, # and fall back to generating one locally. # So for now I have switched over to # generating locally in the first place. return client.get_local_transaction_id() # return client.fetch_transaction_id() return SkuidClient.get_local_transaction_id()