# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
# pylint: disable = too-many-lines
"""This module implements component management for subarrays."""
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Optional, Sequence
import katpoint
import ska_tango_base.subarray
from ska_control_model import (
CommunicationStatus,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_low_mccs_common import EventSerialiser
from ska_low_mccs_common.communication_manager import CommunicationManager
from ska_low_mccs_common.component import ObsDeviceComponentManager
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
CompositeCommandResultEvaluator,
MccsCompositeCommandProxy,
)
from ska_ser_skuid.client import SkuidClient # type: ignore
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from tango import AttrQuality, EnsureOmniThread
from ska_low_mccs.subarray.qa_metrics_builder import QAMetricsBuilder
__all__ = ["SubarrayComponentManager"]
class _StationProxy(ObsDeviceComponentManager):
    """Proxy through which a subarray commands one of its station devices."""

    # TODO: remove comments when commands implemented in station

    @check_communicating
    def deallocate_subarray(
        self: _StationProxy, subarray_id: int
    ) -> tuple[ResultCode, str]:
        """
        Remove a subarray's configuration from the station.

        :param subarray_id: the subarray which configuration must be removed

        :return: the result code and message returned by the station.
        """
        assert self._proxy is not None
        reply = self._proxy.DeallocateSubarray(subarray_id)
        # Both halves of the reply are unpacked as one-element sequences.
        ([code], [text]) = reply
        return (ResultCode(code), text)

    @check_communicating
    def end_scan(self: _StationProxy, subarray_id: int) -> tuple[TaskStatus, str]:
        """
        Stop the scan that the station is running for a subarray.

        :param subarray_id: the subarray ID which scan is stopped

        :return: the first element of the reply's code list, and the
            second half of the reply unchanged.
        """
        assert self._proxy is not None
        reply = self._proxy.EndScan(subarray_id)
        ([status], command_id) = reply
        return (status, command_id)

    @check_communicating
    def beamformer_running(self: _StationProxy, subarray_id: int) -> bool:
        """
        Report whether the beamformer is running for a subarray.

        :param subarray_id: The ID of the subarray to be checked

        :return: beamformer running status
        """
        assert self._proxy is not None
        running = self._proxy.BeamformerRunningForSubarray(subarray_id)
        return running

    @check_communicating
    def stop_tracking_all(self: _StationProxy) -> ResultCode:
        """
        Stop tracking.

        :return: whatever the station's StopTrackingAll command returns.
        """
        assert self._proxy is not None
        outcome = self._proxy.StopTrackingAll()
        return outcome

    @check_communicating
    def set_csp_ingest(self: _StationProxy, argin: str) -> ResultCode:
        """
        Configure link for beam data packets to CSP.

        :param argin: json dictionary with optional keywords:

            * destination_ip - (string) Destination IP
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data streams

        :return: result code of SetCspIngest
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        # The command is sent through the proxy's underlying device handle.
        reply = self._proxy._device.SetCspIngest(argin)
        ([code], _) = reply
        return code

    @check_communicating
    def reset_csp_ingest(self: _StationProxy) -> tuple[list[ResultCode], list[str]]:
        """
        Reset link for beam data packets to CSP to defaults.

        :return: the unmodified reply of ResetCspIngest
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        reply = self._proxy._device.ResetCspIngest()
        return reply
class _SubarrayBeamProxy(ObsDeviceComponentManager):
    """Proxy through which a subarray commands one of its subarray beams."""

    # pylint: disable=too-many-arguments
    def __init__(
        self: _SubarrayBeamProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        qa_metric_callback: Callable[[str], None],
        event_serialiser: Optional[EventSerialiser] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param trl: passed to the base class as its first argument.
        :param logger: the logger to be used by this object.
        :param communication_state_callback: passed through to the base class.
        :param component_state_callback: passed through to the base class.
        :param qa_metric_callback: called with the new value whenever the
            ``qualityAssuranceMetrics`` attribute changes.
        :param event_serialiser: passed through to the base class.
        """
        # Stored before the base initialiser runs, as in the original ordering.
        self._qa_metric_callback = qa_metric_callback
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
            event_serialiser=event_serialiser,
        )

    @check_communicating
    @check_on
    def configure(
        self: _SubarrayBeamProxy, configuration: dict
    ) -> tuple[TaskStatus, str]:
        """
        Send a configuration to the subarray beam.

        :param configuration: the configuration to be applied to this
            subarray beam; it is JSON-serialised before sending.

        :return: the two elements of the Configure reply.
        """
        assert self._proxy is not None
        payload = json.dumps(configuration)
        reply = self._proxy.Configure(payload)
        (status, command_id) = reply
        return (status, command_id)

    def get_change_event_callbacks(self: _SubarrayBeamProxy) -> dict[str, Callable]:
        """
        Extend the base class callbacks with the QA metrics handler.

        :return: the base class callbacks plus one for
            ``qualityAssuranceMetrics``.
        """
        callbacks = dict(super().get_change_event_callbacks())
        callbacks["qualityAssuranceMetrics"] = self._qa_changed
        return callbacks

    def _qa_changed(
        self: _SubarrayBeamProxy,
        attr_name: str,
        attr_value: str,
        attr_quality: AttrQuality,
    ) -> None:
        """
        Forward a QA metrics change to the registered callback.

        :param attr_name: name of the attribute that changed; anything
            other than ``qualityAssuranceMetrics`` (case-insensitive) is
            logged and ignored.
        :param attr_value: the new attribute value.
        :param attr_quality: the attribute quality (unused).
        """
        if attr_name.lower() == "qualityassurancemetrics":
            self._qa_metric_callback(attr_value)
            return
        self._logger.warning(
            f"_qa_changed callback called by unexpected callback {attr_name}."
        )

    @check_communicating
    @check_on
    def scan(
        self: _SubarrayBeamProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start the subarray beam scanning.

        :param scan_id: the id of the scan
        :param start_time: UTC time for begin of scan, None for immediate start
        :param duration: Scan duration in seconds. 0.0 or omitted means forever

        :return: the first element of the reply's code list, and the
            second half of the reply unchanged.
        """
        assert self._proxy is not None
        request: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration,
        }
        # The start time is only sent when one was requested.
        if start_time is not None:
            request["start_time"] = start_time
        reply = self._proxy.Scan(json.dumps(request))
        ([status], command_id) = reply
        return (status, command_id)

    @check_communicating
    @check_on
    def release_all(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release subarray beam resources.

        :return: the two elements of the ReleaseAllResources reply.
        """
        assert self._proxy is not None
        reply = self._proxy.ReleaseAllResources()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    @check_on
    def deconfigure(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        De-configure the subarray beam by sending it the End command.

        :return: the two elements of the End reply.
        """
        assert self._proxy is not None
        reply = self._proxy.End()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    def end_scan(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        End the current scan.

        :return: the two elements of the EndScan reply.
        """
        assert self._proxy is not None
        reply = self._proxy.EndScan()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    def abort(
        self: _SubarrayBeamProxy, task_callback: TaskCallbackType | None = None
    ) -> tuple[TaskStatus, str]:
        """
        Abort the current scan associated with the subarray beam.

        :param task_callback: Update task state, defaults to None
            (accepted but not used here).

        :return: the two elements of the Abort reply.
        """
        assert self._proxy is not None
        reply = self._proxy.Abort()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    def obsreset(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Send ObsReset to the subarray beam.

        :return: the two elements of the ObsReset reply.
        """
        assert self._proxy is not None
        reply = self._proxy.ObsReset()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    def restart(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
        """
        Send Restart to the subarray beam.

        :return: the two elements of the Restart reply.
        """
        assert self._proxy is not None
        reply = self._proxy.Restart()
        (status, command_id) = reply
        return (status, command_id)
class _StationBeamProxy(ObsDeviceComponentManager):
    """Proxy through which a subarray commands one of its station beams."""

    @check_communicating
    @check_on
    def configure(
        self: _StationBeamProxy, configuration: dict
    ) -> tuple[TaskStatus, str]:
        """
        Send a configuration to the station beam.

        :param configuration: the configuration to be applied to this
            station beam; it is JSON-serialised before sending.

        :return: the two elements of the Configure reply.
        """
        assert self._proxy is not None
        payload = json.dumps(configuration)
        reply = self._proxy.Configure(payload)
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    @check_on
    def scan(
        self: _StationBeamProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float,
    ) -> tuple[TaskStatus, str]:
        """
        Start the station beam scanning.

        :param scan_id: the id of the scan
        :param start_time: the start time of the scan; omitted from the
            request when None.
        :param duration: Scan duration in seconds. 0.0 or omitted means forever

        :return: the first element of the reply's code list, and the
            second half of the reply unchanged.
        """
        assert self._proxy is not None
        request: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration,
        }
        if start_time is not None:
            request["start_time"] = start_time
        reply = self._proxy.Scan(json.dumps(request))
        ([status], command_id) = reply
        return (status, command_id)

    @check_communicating
    @check_on
    def deconfigure(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        De-configure the station beam by sending it the End command.

        :return: the two elements of the End reply.
        """
        assert self._proxy is not None
        reply = self._proxy.End()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    @check_on
    def release_all(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        Release station beam resources.

        :return: the two elements of the ReleaseAllResources reply.
        """
        assert self._proxy is not None
        reply = self._proxy.ReleaseAllResources()
        (status, command_id) = reply
        return (status, command_id)

    @check_communicating
    def end_scan(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
        """
        End the current scan on the station beam.

        :return: the two elements of the EndScan reply.
        """
        assert self._proxy is not None
        reply = self._proxy.EndScan()
        (status, command_id) = reply
        return (status, command_id)
# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs]
class SubarrayComponentManager(
TaskExecutorComponentManager,
ska_tango_base.subarray.SubarrayComponentManager,
):
"""A component manager for a subarray."""
# messages for long running command task callback
# Each value is the tail of a sentence: callers prepend the command name
# (e.g. "Assign resources " or "ReleaseAllResources has ").
# NOTE(review): every FAILED status is worded as a timeout - confirm that
# no other failure cause is reported through this table.
_task_message = {
TaskStatus.COMPLETED: "completed OK.",
TaskStatus.ABORTED: "been aborted.",
TaskStatus.FAILED: "timed out.",
}
# Result code handed to the task callback for each final task status.
_task_to_result = {
TaskStatus.COMPLETED: ResultCode.OK,
TaskStatus.ABORTED: ResultCode.ABORTED,
TaskStatus.FAILED: ResultCode.FAILED,
}
# pylint: disable=too-many-arguments
[docs]
def __init__(
    self: SubarrayComponentManager,
    subarray_id: int,
    skuid_url: str,
    logger: logging.Logger,
    obs_command_timeout: int,
    communication_state_callback: Callable[[CommunicationStatus], None],
    component_state_callback: Callable[..., None],
    default_solution_type: str = "fitted",
    event_serialiser: Optional[EventSerialiser] = None,
) -> None:
    """
    Initialise a new instance.

    :param subarray_id: the subarray ID for this device
    :param skuid_url: The address at which a SKUID service is running.
    :param logger: the logger to be used by this object.
    :param obs_command_timeout: the default timeout for obs
        commands in seconds.
    :param communication_state_callback: callback to be called when
        the status of the communications channel between the
        component manager and its component changes
    :param component_state_callback: callback to be called when the
        component state changes.
    :param default_solution_type: default calibration solution type used
        when an aperture in Configure does not specify solution_type.
    :param event_serialiser: the event serialiser to be used by this object.
    """
    # --- identity and static configuration ---------------------------
    self._subarray_id = subarray_id
    self._skuid_url = skuid_url
    self._obs_command_timeout = obs_command_timeout
    self._default_solution_type = default_solution_type
    self._event_serialiser = event_serialiser
    self._component_state_callback: Callable[..., None] = component_state_callback

    # --- synchronisation primitives ----------------------------------
    self._power_state_lock = threading.RLock()
    self._device_communication_state_lock = threading.Lock()
    self._abort_complete = threading.Event()

    # --- per-device state, keyed by TRL ------------------------------
    self._device_communication_states: dict[str, CommunicationStatus] = {}
    self._station_power_modes: dict[str, Optional[PowerState]] = {}
    self._device_obs_states: dict[str, Optional[ObsState]] = {}

    # --- progress of the command currently being executed ------------
    self._is_assigning = False
    self._is_configured = False
    self._configuring_resources: set[str] = set()  # these resources should go
    self._desired_state = ObsState.READY  # to this end state

    # --- proxies to assigned devices, keyed by TRL -------------------
    self._stations: dict[str, _StationProxy] = {}
    self._subarray_beams: dict[str, _SubarrayBeamProxy] = {}
    self._station_beams: dict[str, _StationBeamProxy] = {}

    # --- lookup tables filled in while assigning / configuring -------
    self._apertures: dict[int, dict[str, str]] = defaultdict(dict)
    self._subarray_beam_trl: dict[int, str] = {}
    self._trl_to_subarray_beam_id: dict[str, int] = {}
    self._beam_to_station: dict[str, str] = {}
    self._calibration_ids: dict[str, str] = {}
    self._solution_types: dict[str, str] = {}
    self._number_of_channels = 0

    # --- scan bookkeeping --------------------------------------------
    self._scan_id: Optional[int] = None
    self._scan_start_time: str | None = None
    self._scan_duration = 0.0
    self.reset_csp_ingest_on_scan: bool = True

    # The builder has to exist before the base initialiser runs because
    # its initial output seeds the ``qa_metrics`` component state.
    self._qa_metric_builder = QAMetricsBuilder(logger)
    initial_qa_metrics = json.dumps(self._qa_metric_builder.build())

    super().__init__(
        logger,
        communication_state_callback,
        component_state_callback,
        resources_changed=None,
        configured_changed=None,
        scanning_changed=None,
        task_completed=None,
        obsfault=None,
        obsstate_changed=None,
        station_power=None,
        power=None,
        fault=None,
        qa_metrics=initial_qa_metrics,
    )

    # Created last: it is wired to methods and the logger of this object.
    self._communication_manager = CommunicationManager(
        self._update_communication_state,
        self._update_component_state,
        self.logger,
    )
[docs]
def start_communicating(self: SubarrayComponentManager) -> None:
    """Ask the communication manager to start communicating."""
    manager = self._communication_manager
    manager.start_communicating()
[docs]
def stop_communicating(self: SubarrayComponentManager) -> None:
    """Ask the communication manager to stop communicating."""
    manager = self._communication_manager
    manager.stop_communicating()
@property
def max_executing_tasks(self) -> int:
    """
    Report how many tasks may execute simultaneously.

    :return: the fixed limit of two concurrently executing tasks.
    """
    concurrent_task_limit = 2
    return concurrent_task_limit
@property
def scan_id(self: SubarrayComponentManager) -> Optional[int]:
    """
    Expose the stored scan id.

    :return: the scan id, or None if a scan is not current.
    """
    current_scan_id = self._scan_id
    return current_scan_id
@property
def subarray_beam_trls(self: SubarrayComponentManager) -> set[str]:
    """
    List the subarray beams currently held by this subarray.

    :return: a new set with the TRL of every subarray beam proxy.
    """
    return set(self._subarray_beams)
@property
def station_beam_trls(self: SubarrayComponentManager) -> set[str]:
    """
    List the station beams currently held by this subarray.

    :return: a new set with the TRL of every station beam proxy.
    """
    return set(self._station_beams)
@property
def station_trls(self: SubarrayComponentManager) -> set[str]:
    """
    List the stations currently held by this subarray.

    :return: a new set with the TRL of every station proxy.
    """
    return set(self._stations)
@property
def power_state(self: SubarrayComponentManager) -> Optional[PowerState]:
    """
    Read the power entry of the component state.

    :return: the value stored under ``"power"`` in the component state.
    """
    component_state = self._component_state
    return component_state["power"]
def _check_aborted(
self: SubarrayComponentManager,
task_abort_event: Optional[threading.Event],
task_callback: Optional[Callable],
method_name: str,
) -> bool:
if task_abort_event is None or task_callback is None:
self.logger.warning(f"Cannot check {method_name} for abort status")
return False
if task_abort_event.is_set():
self.logger.info(f"{method_name} has been aborted")
task_callback(
status=TaskStatus.ABORTED,
result=(ResultCode.ABORTED, f"{method_name} aborted"),
)
return True
return False
# pylint: disable=too-many-branches, too-many-locals, too-many-statements
[docs]
@check_communicating
def do_assign(
    self: SubarrayComponentManager,
    subarray_id: int,
    subarray_beams: list[dict[str, Any]],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Assign resources to this subarray.

    This is just for communication and health roll-up, resource management
    is done by controller.

    The method records the beam/aperture layout, creates a proxy for every
    station, subarray beam and station beam that is not already held, and
    then waits for the devices to reach the desired obs state before
    reporting the outcome through ``task_callback``.

    :param subarray_id: ID of the subarray
    :param subarray_beams: resource specification for each beam; each
        entry is a dictionary that may carry ``subarray_beam_id``,
        ``subarray_beam_trl``, ``number_of_channels`` and ``apertures``.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    self._is_assigning = True
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "do_assign"):
        return

    station_trls: list[str] = []
    subarray_beam_trls: list[str] = []
    station_beam_trls: list[str] = []
    number_of_channels = 0

    # Start from a clean slate: the layout is rebuilt from the request.
    self._apertures.clear()
    self._subarray_beam_trl.clear()
    self._trl_to_subarray_beam_id.clear()
    self._number_of_channels = 0

    for beam in subarray_beams:
        subarray_beam_id = beam.get("subarray_beam_id", 1)
        subarray_beam_trl = beam.get(
            "subarray_beam_trl",
            f"low-mccs/subarraybeam/{subarray_id:02d}-{subarray_beam_id:02d}",
        )
        self._subarray_beam_trl[subarray_beam_id] = subarray_beam_trl
        self._trl_to_subarray_beam_id[subarray_beam_trl] = subarray_beam_id
        subarray_beam_trls.append(subarray_beam_trl)
        # A beam without a channel count contributes nothing instead of
        # failing with ``int += None``.
        number_of_channels += beam.get("number_of_channels") or 0

        for aperture in beam.get("apertures", []):
            aperture_id = aperture.get("aperture_id", None)
            station_id = aperture.get("station_id")
            if aperture_id is None:
                aperture_id = f"AP{station_id}.1"
            station_trl = aperture.get("station_trl", None)
            if station_trl is not None and station_trl not in station_trls:
                station_trls.append(station_trl)
            station_beam_trl = aperture.get("station_beam_trl", None)
            station_beam_trls.append(station_beam_trl)
            self._beam_to_station[station_beam_trl] = station_trl
            self._apertures[subarray_beam_id][aperture_id] = station_beam_trl

        # Define beam for qa_metrics, using the aperture ids of *this*
        # beam (the keys of self._apertures itself are subarray beam ids).
        self._qa_metric_builder.define_beam(
            str(subarray_beam_id),
            [
                str(aperture_id)
                for aperture_id in self._apertures.get(subarray_beam_id, {})
            ],
        )

    if self._check_aborted(task_abort_event, task_callback, "do_assign"):
        return
    self._number_of_channels = number_of_channels

    # Only devices that are not already held need a new proxy.
    station_trls_to_add = set(station_trls).difference(self._stations)
    subarray_beam_trls_to_add = set(subarray_beam_trls).difference(
        self._subarray_beams
    )
    station_beam_trls_to_add = set(station_beam_trls).difference(
        self._station_beams
    )
    trls_to_add = station_trls_to_add.union(
        subarray_beam_trls_to_add, station_beam_trls_to_add
    )

    self._desired_state = ObsState.IDLE
    self._configuring_resources.clear()

    if trls_to_add:
        for trl in trls_to_add:
            self._device_communication_states[trl] = CommunicationStatus.DISABLED
            self._device_obs_states[trl] = ObsState.EMPTY
        if self._check_aborted(task_abort_event, task_callback, "do_assign"):
            return

        for trl in station_trls_to_add:
            self._stations[trl] = _StationProxy(
                trl,
                self.logger,
                functools.partial(self._device_communication_state_changed, trl),
                functools.partial(self._component_state_callback, trl=trl),
                event_serialiser=self._event_serialiser,
            )
        for trl in subarray_beam_trls_to_add:
            self._subarray_beams[trl] = _SubarrayBeamProxy(
                trl,
                self.logger,
                functools.partial(self._device_communication_state_changed, trl),
                functools.partial(
                    self._component_state_callback,
                    trl=trl,
                ),
                functools.partial(
                    self._qa_attribute_changed,
                    trl,
                ),
                event_serialiser=self._event_serialiser,
            )
            # Beams are the resources whose obs state is waited for below.
            self._configuring_resources.add(trl)
        for trl in station_beam_trls_to_add:
            self._station_beams[trl] = _StationBeamProxy(
                trl,
                self.logger,
                functools.partial(self._device_communication_state_changed, trl),
                functools.partial(
                    self._component_state_callback,
                    trl=trl,
                ),
                event_serialiser=self._event_serialiser,
            )
            self._configuring_resources.add(trl)

        self._communication_manager.replace_device_pool(
            self._stations, self._station_beams, self._subarray_beams
        )
        self._is_assigning = True
        self._component_state_callback(
            resources_changed=[
                set(self._stations.keys()),
                set(self._subarray_beams.keys()),
                set(self._station_beams.keys()),
            ]
        )

    if self._check_aborted(task_abort_event, task_callback, "do_assign"):
        return

    # wait for all subdevices to reach ObsState.IDLE
    task_result = self._wait_for_obs_state(
        self._obs_command_timeout, task_abort_event
    )
    if self._check_aborted(task_abort_event, task_callback, "do_assign"):
        return

    self._is_assigning = False
    self._is_configured = False
    if task_callback is not None:
        task_callback(
            status=task_result,
            result=(
                self._task_to_result[task_result],
                "Assign resources " + self._task_message[task_result],
            ),
        )
def _qa_attribute_changed(
self: SubarrayComponentManager,
trl: str,
qa_metric: str,
) -> None:
subarray_beam_id: int = self._trl_to_subarray_beam_id[trl]
loaded_qa_metric = json.loads(qa_metric) if qa_metric else {}
self._qa_metric_builder.update_beam_metrics(
str(subarray_beam_id), loaded_qa_metric
)
processed_metrics = self._qa_metric_builder.build()
self._update_component_state(qa_metrics=json.dumps(processed_metrics))
@property  # type: ignore[misc]
@check_communicating
def assigned_resources(
    self: SubarrayComponentManager,
) -> list[str]:
    """
    List every device currently assigned to this subarray.

    :return: the TRLs of all stations, subarray beams and station beams,
        without duplicates and in no particular order.
    """
    combined = (
        set(self._stations) | set(self._subarray_beams) | set(self._station_beams)
    )
    return list(combined)
@property  # type: ignore[misc]
@check_communicating
def assigned_resources_dict(
    self: SubarrayComponentManager,
) -> dict[str, Sequence[Any]]:
    """
    Describe this subarray's resources grouped by type.

    :return: sorted TRL lists for stations, subarray beams and station
        beams, the sorted aperture ids of all beams, and a one-element
        list holding the number of channels.
    """
    aperture_ids = sorted(
        aperture_id
        for beam_apertures in self._apertures.values()
        for aperture_id in beam_apertures
    )
    return {
        "stations": sorted(self._stations),
        "subarray_beams": sorted(self._subarray_beams),
        "station_beams": sorted(self._station_beams),
        "apertures": aperture_ids,
        "channels": [self._number_of_channels],
    }
[docs]
@check_communicating
def release(  # type: ignore[override]  # pylint: disable=arguments-differ
    self: SubarrayComponentManager,
    beam_resources: dict[str, list[str]],
    task_callback: Optional[Callable],
    task_abort_event: Optional[threading.Event],
) -> None:
    """
    Release resources from this subarray.

    Partial release is not supported: unless the task has been aborted,
    the failure is reported through ``task_callback`` and
    NotImplementedError is raised.

    :param beam_resources: list of resource TRLs to release.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None

    :raises NotImplementedError: because MCCS Subarray cannot
        perform a partial release of beam_resources.
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "release"):
        return
    if task_callback is not None:
        # The command did nothing, so it must not be reported as a
        # successful completion.
        task_callback(
            status=TaskStatus.FAILED,
            result=(
                ResultCode.FAILED,
                (
                    "Not Implemented: MCCS Subarray cannot "
                    "partially release resources."
                ),
            ),
        )
    raise NotImplementedError("MCCS Subarray cannot partially release resources.")
[docs]
@check_communicating
def release_all(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable],
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Release every resource held by this subarray.

    Stations are told to deallocate the subarray, subarray beams are told
    to release their resources, and after waiting for the desired obs
    state (EMPTY) the internal bookkeeping is cleared.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "release_all"):
        return

    self._desired_state = ObsState.EMPTY
    self._configuring_resources.clear()

    # Release resources in subelements, and check return codes
    self.logger.debug("Deallocating resources in stations")
    for station in self._stations.values():
        station.deallocate_subarray(self._subarray_id)

    self.logger.debug("Deallocating resources in subarray beams")
    for beam_trl, beam in self._subarray_beams.items():
        # Register the beam before commanding it, so that it is among
        # the resources being waited for.
        self._configuring_resources.add(beam_trl)
        beam.release_all()
    if self._check_aborted(task_abort_event, task_callback, "release_all"):
        return

    # Wait for beams to go to EMPTY
    outcome = self._wait_for_obs_state(self._obs_command_timeout, task_abort_event)
    if self._check_aborted(task_abort_event, task_callback, "release_all"):
        return

    self._release_internal_resources()
    if outcome == TaskStatus.FAILED:
        self._component_state_callback(obsfault=True)

    if task_callback is None:
        return
    task_callback(
        status=outcome,
        result=(
            self._task_to_result[outcome],
            "ReleaseAllResources has " + self._task_message[outcome],
        ),
    )
[docs]
def cleanup(self: SubarrayComponentManager) -> None:
    """
    Release everything this component manager holds.

    The device proxies are deleted first, then the communication manager
    is shut down, and finally the base class performs its own cleanup.
    """
    self._delete_proxies()
    manager = self._communication_manager
    manager.shutdown()
    super().cleanup()
def _delete_proxies(self) -> None:
self.logger.debug("Deleting proxies")
self._device_communication_states.clear()
self._communication_manager.replace_device_pool()
for station_proxy in self._stations.values():
station_proxy.cleanup()
self._stations.clear()
for subarray_beam_proxy in self._subarray_beams.values():
subarray_beam_proxy.cleanup()
self._subarray_beams.clear()
for station_beam_proxy in self._station_beams.values():
station_beam_proxy.cleanup()
self._station_beams.clear()
def _release_internal_resources(self) -> None:
"""
Clear the subarray's internal record of its assigned resources.

Proxies are deleted when any station, subarray beam or station beam
is held; the lookup tables are emptied, the QA metrics are reset and
republished, and the resulting resource sets are reported through the
component state callback.
"""
self.logger.debug("Deallocating resources")
# Proxy teardown is only needed when at least one pool is non-empty.
if self._stations or self._subarray_beams or self._station_beams:
self._delete_proxies()
# Forget the obs states and the beam/aperture layout.
self._device_obs_states.clear()
self._apertures.clear()
self._subarray_beam_trl.clear()
self._trl_to_subarray_beam_id.clear()
# Reset the QA metrics builder and publish what it builds afterwards.
self._qa_metric_builder.reset()
processed_metrics = self._qa_metric_builder.build()
self._update_component_state(qa_metrics=json.dumps(processed_metrics))
# Forget the per-station / per-beam configuration lookups.
self._beam_to_station.clear()
self._calibration_ids.clear()
self._solution_types.clear()
self._number_of_channels = 0
self._configuring_resources.clear()
# Report the current station, subarray beam and station beam TRL sets.
self._component_state_callback(
resources_changed=[
set(self._stations.keys()),
set(self._subarray_beams.keys()),
set(self._station_beams.keys()),
]
)
[docs]
@check_communicating
def get_coords(  # type: ignore[override]
    self: SubarrayComponentManager,
    subarray_beams: list[dict],
) -> list[dict]:
    """
    Validate the beams and rewrite their coordinates into ``field`` form.

    Each beam is modified in place: a ``sky_coordinates`` entry, when
    present, is removed and replaced by a ``field`` entry. All problems
    found across all beams are collected and reported together.

    :param subarray_beams: subarray beams

    :raises KeyError: a beam or aperture is not assigned to this
        subarray, or the coordinates are missing or invalid. The message
        lists every problem found.

    :return: The modified beams with correct coords
    """
    # while we reformat the input, check for inconsistent values
    errors: list[str] = []

    for beam in subarray_beams:
        beam_id = beam.get("subarray_beam_id", None)
        if (
            beam_id is not None
            and self._subarray_beam_trl.get(beam_id, None) is None
        ):
            errors.append(
                f"\n{beam_id =} is not assigned to subarray {self._subarray_id}"
            )

        apertures_for_beam = (
            self._apertures.get(beam_id, {}) if beam_id is not None else {}
        )
        for aperture in beam.get("apertures", None) or []:
            aperture_id = aperture.get("aperture_id", None)
            if apertures_for_beam.get(aperture_id, None) is None:
                errors.append(
                    f"\n{aperture_id =} is not assigned to subarray "
                    f"{self._subarray_id}"
                )

        self.logger.debug(f"Configure received, beam: {beam}")

        sky_coordinates = beam.pop("sky_coordinates", None)
        if sky_coordinates:
            field = {
                key: val
                for key, val in sky_coordinates.items()
                if key in ["timestamp", "reference_frame", "target_name"]
            }
            frame = sky_coordinates.get("reference_frame")
            # A missing or non-string frame is reported further down.
            frame_key = frame.casefold() if isinstance(frame, str) else None
            if frame_key in ("topocentric", "altaz", "icrs", "galactic"):
                field["attrs"] = {
                    key: val
                    for key, val in sky_coordinates.items()
                    if key in ["c1", "c1_rate", "c2", "c2_rate"]
                }
                field.setdefault("target_name", "No name provided")
            elif frame_key == "tle":
                field["attrs"] = {
                    key: val
                    for key, val in sky_coordinates.items()
                    if key in ["line1", "line2"]
                }
                field.setdefault("target_name", "No name provided")
            beam["field"] = field

        field = beam.get("field")
        if field is None:
            errors.append(f"\nbeam {beam_id} has neither sky_coordinates nor field")
            continue
        frame = field.get("reference_frame")
        if not isinstance(frame, str):
            errors.append(f"\nbeam {beam_id} has no valid reference_frame")
            continue

        if frame.casefold() == "tle":
            attrs = field.get("attrs") or {}
            line1 = attrs.get("line1")
            line2 = attrs.get("line2")
            if line1 is None or line2 is None:
                errors.append(
                    f"\nbeam {beam_id} tle coordinates need both line1 and line2"
                )
                continue
            try:
                # Constructing the target is used purely as a format check.
                katpoint.Target(f"tle, {line1}, {line2}")
            except ValueError as err:
                self.logger.error(f"tle format incorrect: {err}")
                errors.append(f"tle format incorrect: {err}")

    if errors:
        raise KeyError("".join(errors))
    return subarray_beams
# pylint: disable=too-many-return-statements
def _configure_stations(
    self: SubarrayComponentManager,
    transaction_id: str,
    task_abort_event: Optional[threading.Event],
) -> tuple[ResultCode, str]:
    """
    Send ApplyConfiguration to every station of this subarray.

    One command per station is collected into a composite command, which
    is then executed with the obs command timeout.

    :param transaction_id: the transaction id for the configuration
    :param task_abort_event: abort event to cancel waiting for result.

    :return: a tuple containing a result code and a json serialised string
    """
    apply_configuration_commands = MccsCompositeCommandProxy(
        self.logger, task_abort_event=task_abort_event
    )
    for station_trl in self._stations:
        argin: dict[str, Any] = {
            "transaction_id": transaction_id,
            "subarray_id": self._subarray_id,
            "solution_type": self._solution_types.get(
                station_trl, self._default_solution_type
            ),
        }
        # A station without a recorded calibration id is configured
        # without one, mirroring the defaulted solution type above.
        calibration_id = self._calibration_ids.get(station_trl)
        if calibration_id:
            argin["calibration_id"] = calibration_id
        apply_configuration_commands += MccsCommandProxy(
            device_name=station_trl,
            command_name="ApplyConfiguration",
            logger=self.logger,
            default_args=json.dumps(argin),
        )

    result, message = apply_configuration_commands(
        command_evaluator=CompositeCommandResultEvaluator(),
        timeout=self._obs_command_timeout,
    )
    return ResultCode(result), message
def _configure_subarray_beams(
    self: SubarrayComponentManager,
    subarray_beam_configuration: dict[str, Any],
    task_abort_event: Optional[threading.Event],
) -> tuple[ResultCode, str]:
    """
    Send Configure to the subarray beams named in the configuration.

    The desired obs state becomes READY and the configured beams are
    recorded as the resources being configured. A failure is logged but
    not raised; the result code is returned to the caller.

    :param subarray_beam_configuration: the configuration to apply,
        keyed by subarray beam TRL.
    :param task_abort_event: abort event to cancel waiting for result.

    :return: a tuple containing a result code and json serialised string.
    """
    self._configuring_resources.clear()
    self._desired_state = ObsState.READY
    configure_subarraybeams_command = MccsCompositeCommandProxy(
        self.logger, task_abort_event=task_abort_event
    )
    for (
        subarray_beam_trl,
        configuration,
    ) in subarray_beam_configuration.items():
        configure_subarraybeams_command += MccsCommandProxy(
            device_name=subarray_beam_trl,
            command_name="Configure",
            logger=self.logger,
            default_args=json.dumps(configuration),
        )
        self._configuring_resources.add(subarray_beam_trl)

    result_code, message = configure_subarraybeams_command(
        command_evaluator=CompositeCommandResultEvaluator(),
        timeout=self._obs_command_timeout,
    )
    if result_code != ResultCode.OK:
        # Pretty-print the message when it is JSON, but never let the
        # logging of a failure raise and hide the failure itself.
        try:
            pretty = json.dumps(json.loads(message), indent=4)
        except (TypeError, ValueError):
            pretty = str(message)
        self.logger.error("Failed to configure subarrays with:\n%s", pretty)
    return ResultCode(result_code), message
[docs]
@check_communicating
# pylint: disable-next=arguments-differ,arguments-renamed
def scan(  # type: ignore[override]
    self: SubarrayComponentManager,
    scan_id: int = 0,
    start_time: Optional[str] = None,
    duration: float = 0.0,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Start scanning.

    The scan is started on every subarray beam first. Once the beams have
    reached ``ObsState.SCANNING`` the scan is started on the stations.
    When ``reset_csp_ingest_on_scan`` is set, CSPIngest is reset on every
    station beforehand; a failed reset is logged and appended to the final
    result message but does not fail the scan.

    :param scan_id: the id of the scan
    :param start_time: the start time of the scan
    :param duration: Scan duration in seconds. 0.0 or omitted means forever
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)

    # Remember the parameters of the scan that is being started.
    self._scan_id, self._scan_start_time, self._scan_duration = (
        scan_id,
        start_time,
        duration,
    )
    csp_reset_note = ""
    station_failure = ""

    if self._check_aborted(task_abort_event, task_callback, "scan"):
        return

    if self.reset_csp_ingest_on_scan:
        # Reset CSPIngest to default.
        self.logger.debug(
            "Resetting CSPIngest for all stations allocated to this Subarray."
        )
        stations_not_reset = []
        for station_trl, station_proxy in self._stations.items():
            [reset_code], [_] = station_proxy.reset_csp_ingest()
            if reset_code != ResultCode.OK:
                stations_not_reset.append(station_trl)
        if stations_not_reset:
            csp_reset_note = (
                "Scan has failed to reset CSPIngest on "
                f"stations: {stations_not_reset}"
            )
            self.logger.error(csp_reset_note)

    self._desired_state = ObsState.SCANNING
    self._configuring_resources.clear()

    # Stays COMPLETED unless one of the steps below fails.
    task_status = TaskStatus.COMPLETED
    for beam_trl, beam_proxy in self._subarray_beams.items():
        self._configuring_resources.add(beam_trl)
        beam_outcome = beam_proxy.scan(scan_id, start_time, duration)
        if beam_outcome in (TaskStatus.FAILED, TaskStatus.REJECTED):
            task_status = TaskStatus.FAILED

    # Wait for the beams to start, but only if all of them took the command.
    if task_status == TaskStatus.COMPLETED:
        task_status = self._wait_for_obs_state(
            self._obs_command_timeout, task_abort_event
        )

    if self._check_aborted(task_abort_event, task_callback, "scan"):
        return

    # Start the scan on the stations.
    if task_status == TaskStatus.COMPLETED:
        station_code, station_message = self.__start_station_scan(
            subarray_id=self._subarray_id,
            scan_id=scan_id,
            start_time=start_time,
            duration=duration,
        )
        self.logger.warning(station_message)
        if station_code != ResultCode.OK:
            task_status = TaskStatus.FAILED
            station_failure = station_message

    if self._check_aborted(task_abort_event, task_callback, "scan"):
        return

    # Anything other than a clean completion is reported as an obs fault.
    if task_status != TaskStatus.COMPLETED:
        self._component_state_callback(obsfault=True)
    else:
        self._component_state_callback(scanning_changed=True)

    if task_callback is None:
        return

    # A station failure message takes precedence over the generic text.
    if task_status == TaskStatus.FAILED and station_failure:
        summary = station_failure
    else:
        summary = "Scan has " + self._task_message[task_status]
    if csp_reset_note:
        summary = summary + f" {csp_reset_note}"
    task_callback(
        status=task_status,
        result=(self._task_to_result[task_status], summary),
    )
def __start_station_scan(
    self: SubarrayComponentManager,
    subarray_id: int,
    scan_id: int,
    start_time: Optional[str],
    duration: float,
) -> tuple[ResultCode, str]:
    """
    Send ``Scan`` to every station of this subarray as one composite command.

    :param subarray_id: value of the ``subarray_id`` field of the command
        argument
    :param scan_id: value of the ``scan_id`` field of the command argument
    :param start_time: value of the ``start_time`` field; the field is left
        out of the command argument when this is None
    :param duration: value of the ``duration`` field of the command argument

    :return: the result code and message of the composite command
    """
    command_args: dict[str, int | float | str] = {
        "subarray_id": subarray_id,
        "scan_id": scan_id,
        "duration": duration,
        # Only present when a start time was actually requested.
        **({} if start_time is None else {"start_time": start_time}),
    }
    composite = MccsCompositeCommandProxy(self.logger)
    for trl in self._stations:
        composite += MccsCommandProxy(
            trl,
            "Scan",
            self.logger,
            default_args=json.dumps(command_args),
        )
    outcome, detail = composite(
        command_evaluator=CompositeCommandResultEvaluator(),
        timeout=self._obs_command_timeout,
    )
    return ResultCode(outcome), detail
def __end_station_scan(
    self: SubarrayComponentManager,
    subarray_id: int,
) -> tuple[ResultCode, str]:
    """
    Send ``EndScan`` to every station of this subarray as one composite command.

    :param subarray_id: the subarray id, passed unchanged as the argument
        of each ``EndScan`` command

    :return: the result code and message of the composite command
    """
    composite = MccsCompositeCommandProxy(self.logger)
    for trl in self._stations:
        composite += MccsCommandProxy(
            trl,
            "EndScan",
            self.logger,
            default_args=subarray_id,
        )
    outcome, detail = composite(
        command_evaluator=CompositeCommandResultEvaluator(),
        timeout=self._obs_command_timeout,
    )
    return ResultCode(outcome), detail
[docs]
def end_scan(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Execute the `end_scan` slow command.

    The stored scan parameters are cleared, the scan is ended on every
    subarray beam and then on the stations, and finally the stations are
    polled until their beamformers report that they have stopped.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "end_scan"):
        return

    # Forget the parameters of the scan that is being ended.
    self._scan_id = None
    self._scan_start_time = None
    self._scan_duration = 0.0

    # Tell every subarray beam to stop scanning.
    task_status = TaskStatus.COMPLETED
    for beam_proxy in self._subarray_beams.values():
        beam_outcome = beam_proxy.end_scan()
        if beam_outcome in (TaskStatus.FAILED, TaskStatus.REJECTED):
            task_status = TaskStatus.FAILED

    # End the scan on the stations, but only if no beam refused or failed.
    if task_status == TaskStatus.COMPLETED:
        station_code, station_message = self.__end_station_scan(
            subarray_id=self._subarray_id,
        )
        self.logger.warning(station_message)
        if station_code != ResultCode.OK:
            task_status = TaskStatus.FAILED

    # Check that the beamformer has stopped in all stations.
    beamformer_stopped = self._wait_for_end_scan()
    if not beamformer_stopped:
        self.logger.error("Beamformer still running after 3 seconds")
        task_status = TaskStatus.FAILED
    else:
        self.logger.info("Beamformer stopped")

    if self._check_aborted(task_abort_event, task_callback, "end_scan"):
        return

    # Anything other than a clean completion is reported as an obs fault.
    if task_status != TaskStatus.COMPLETED:
        self._component_state_callback(obsfault=True)
    else:
        self._component_state_callback(scanning_changed=False)

    if task_callback is None:
        return
    task_callback(
        status=task_status,
        result=(
            self._task_to_result[task_status],
            "EndScan has " + self._task_message[task_status],
        ),
    )
@check_communicating
def _deconfigure_subelements(
    self: SubarrayComponentManager,
) -> TaskStatus:
    """
    Deconfigure resources.

    Deallocates this subarray from each of its stations, then deconfigures
    each of its subarray beams, and marks the subarray as not configured.
    Every device is attempted even after an earlier one has failed.

    :return: ``TaskStatus.COMPLETED`` if every device accepted the request,
        ``TaskStatus.FAILED`` otherwise
    """
    self.logger.debug("deconfigure subelements invoked")
    outcome = TaskStatus.COMPLETED

    for station_trl, station in self._stations.items():
        self.logger.debug(f"deconfigure station {station_trl}")
        station_code, _ = station.deallocate_subarray(self._subarray_id)
        if station_code != ResultCode.OK:
            outcome = TaskStatus.FAILED

    #
    # Set all station beams to no logical bands, point statically at zenith
    #
    for beam_trl, beam in self._subarray_beams.items():
        self.logger.debug(f"deconfigure subarray beam {beam_trl}")
        self._configuring_resources.add(beam_trl)
        beam_status, _ = beam.deconfigure()
        if beam_status in (TaskStatus.REJECTED, TaskStatus.FAILED):
            outcome = TaskStatus.FAILED

    self._is_configured = False
    return outcome
def _abort_callback(
    self: SubarrayComponentManager,
    *,
    status: Optional[TaskStatus] = None,
    exception: Optional[Exception] = None,
    **kwargs: Any,
) -> None:
    """
    Follow the progress of ``abort_tasks``.

    Any reported exception is logged. When a ``COMPLETED`` status is
    reported the ``_abort_complete`` event is set.

    :param status: the reported task status, if any
    :param exception: the reported exception, if any
    :param kwargs: further keyword arguments, accepted and ignored
    """
    if exception is not None:
        self.logger.error(f"abort_tasks raised exception: {exception!r}")
    if status != TaskStatus.COMPLETED:
        return
    self.logger.debug("abort_tasks has finished, setting flag")
    self._abort_complete.set()
[docs]
@check_communicating
def abort(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> tuple[TaskStatus, str]:
    """
    Abort the observation.

    The work is done by ``_abort`` on a newly started thread; this method
    returns as soon as that thread has been started.

    :param task_callback: callback to be called when the status of
        the command changes
    :param task_abort_event: Check for abort, defaults to None

    :return: A task status and response message.
    """
    # A dedicated thread is used so that the abort skips the task queue,
    # which submit_task would have appended it to.
    abort_thread = threading.Thread(
        target=self._abort,
        kwargs=dict(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
        ),
    )
    abort_thread.start()
    return (TaskStatus.IN_PROGRESS, "Abort has started")
[docs]
def abort_device(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> Any:
    """
    Abort only this device, for use in RestartSubarray().

    The work is done by ``_abort`` with ``cascade=False`` on a newly
    started thread; this method returns as soon as that thread has been
    started.

    :param task_callback: callback to be called when the status of
        the command changes
    :param task_abort_event: Check for abort, defaults to None

    :return: A task status and response message.
    """
    # A dedicated thread is used so that the abort skips the task queue,
    # which submit_task would have appended it to.
    abort_thread = threading.Thread(
        target=self._abort,
        kwargs=dict(
            cascade=False,
            task_callback=task_callback,
            task_abort_event=task_abort_event,
        ),
    )
    abort_thread.start()
    return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    cascade: bool = True,
) -> None:
    """
    Abort command execution.

    This is the thread target started by ``abort`` and ``abort_device``.
    It proceeds in three steps:

    1. ``abort_tasks`` is called with ``_abort_callback`` as its callback,
       and the ``_abort_complete`` event is awaited for up to 10 seconds.
       If the event is not set in time the abort is marked as FAILED.
    2. When ``cascade`` is true and step 1 succeeded, ``abort`` is called
       on each subarray beam in turn, stopping at the first beam that
       answers FAILED, REJECTED or NOT_ALLOWED. If no beam refused, the
       beams are then awaited (up to 10 seconds) to reach
       ``ObsState.ABORTED``.
    3. The final status and message are reported through ``task_callback``.

    :param cascade: whether or not to abort sub-devices.
    :param task_callback: Update task state, defaults to None.
    :param task_abort_event: Check for abort, defaults to None. It is
        accepted for interface compatibility but not read by this method.
    """
    with EnsureOmniThread():
        if task_callback is not None:
            task_callback(status=TaskStatus.IN_PROGRESS)
        task_status = TaskStatus.COMPLETED
        # This is sent in a thread, we record progress using the _abort_callback,
        # which will set an Event once the task is complete.
        self.abort_tasks(task_callback=self._abort_callback)  # type:ignore
        # Wait until the timeout for the Event to be set by abort_tasks
        timeout = 10.0
        if self._abort_complete.wait(timeout=timeout):
            # Re-arm the event so that the next abort starts from a clean state.
            self._abort_complete.clear()
        else:
            self.logger.error(f"Abort timed out in {timeout} seconds.")
            task_status = TaskStatus.FAILED
            # Reinitialize our event. The old one could get set at any point now.
            # NOTE(review): _abort_callback reads self._abort_complete at the
            # time it runs, so a late callback would set this replacement
            # event rather than the old one - confirm that is intended.
            self._abort_complete = threading.Event()
        # Abort sub-devices
        if cascade:
            if task_status == TaskStatus.COMPLETED:
                # Every beam that is sent Abort is awaited to reach ABORTED.
                self._desired_state = ObsState.ABORTED
                self._configuring_resources.clear()
                for trl, subarray_beam_proxy in self._subarray_beams.items():
                    self._configuring_resources.add(trl)
                    proxy_result_code, response = subarray_beam_proxy.abort()
                    self.logger.debug(
                        f"Aborting {trl} has {ResultCode(proxy_result_code).name}"
                    )
                    if proxy_result_code in (
                        ResultCode.FAILED,
                        ResultCode.REJECTED,
                        ResultCode.NOT_ALLOWED,
                    ):
                        # Give up at the first refusal; the remaining beams
                        # are not sent Abort.
                        task_status = TaskStatus.FAILED
                        break
            # Wait until subdevices reach ObsState.ABORTED
            if task_status == TaskStatus.COMPLETED:
                task_status = self._wait_for_obs_state(timeout=10.0)
        if task_callback is not None:
            task_callback(
                status=task_status,
                result=(
                    self._task_to_result[task_status],
                    "Abort command has " + self._task_message[task_status],
                ),
            )
@check_communicating
def _wait_for_end_scan(
    self: SubarrayComponentManager,
    timeout: float = 3.0,
    poll_interval: float = 0.1,
) -> bool:
    """
    Wait for the scan to have stopped.

    Every station is asked whether its beamformer is still running for
    this subarray. The stations are polled until all of them answer
    ``False`` or until the timeout expires. The first poll happens
    immediately and at least one poll is always made.

    :param timeout: maximum time to wait, in seconds
    :param poll_interval: time to sleep between two polls, in seconds

    :return: True if OK, False if timed out
    """
    # A monotonic deadline keeps the wait bounded by wall-clock time: time
    # spent inside the station calls counts towards the timeout, and no
    # rounding error accumulates as it would when subtracting 0.1 per loop.
    deadline = time.monotonic() + timeout
    while True:
        if all(
            station_proxy.beamformer_running(self._subarray_id) is False
            for station_proxy in self._stations.values()
        ):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0.0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
[docs]
@check_communicating
def obsreset(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Reset the observation by returning to unconfigured state.

    ``obsreset`` is called on every subarray beam; each beam is also added
    to the set of awaited resources. The command fails if any beam answers
    FAILED or REJECTED.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "obsreset"):
        return

    task_status = TaskStatus.COMPLETED
    for beam_trl, beam_proxy in self._subarray_beams.items():
        self._configuring_resources.add(beam_trl)
        beam_outcome = beam_proxy.obsreset()
        if beam_outcome in (TaskStatus.FAILED, TaskStatus.REJECTED):
            task_status = TaskStatus.FAILED

    if task_callback is None:
        return
    task_callback(
        status=task_status,
        result=(
            self._task_to_result[task_status],
            "ObsReset command has " + self._task_message[task_status],
        ),
    )
[docs]
@check_communicating
def restart(  # type: ignore[override]
    self: SubarrayComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Restart the subarray by returning to unresourced state.

    This subarray is deallocated from each of its stations, after which
    the stored scan parameters, the configured flag and the internal
    resources are all reset. The reset happens even when a station
    deallocation failed; the failure is reported in the final status.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)
    if self._check_aborted(task_abort_event, task_callback, "restart"):
        return

    task_status = TaskStatus.COMPLETED
    for station in self._stations.values():
        dealloc_code, _ = station.deallocate_subarray(self._subarray_id)
        if dealloc_code != ResultCode.OK:
            task_status = TaskStatus.FAILED

    # Scanning related
    self._scan_id = None
    self._scan_start_time = None
    self._scan_duration = 0.0
    # Configure related
    self._is_configured = False
    # Allocate related
    self._release_internal_resources()

    if task_callback is None:
        return
    task_callback(
        status=task_status,
        result=(
            self._task_to_result[task_status],
            "Restart command has " + self._task_message[task_status],
        ),
    )
[docs]
@check_communicating
def send_transient_buffer(
    self: SubarrayComponentManager,
    argin: list[int],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Send the transient buffer.

    The command is currently a placeholder: it logs that it has no
    implementation and reports success. ``argin`` and ``task_abort_event``
    are not used yet.

    :param argin: the requested segment
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback is None:
        # Nobody to report to: only the log entry is produced.
        self.logger.info("Command currently has no implementation.")
        return

    task_callback(status=TaskStatus.IN_PROGRESS)
    self.logger.info("Command currently has no implementation.")
    # The real work will go here once the command is implemented.
    task_callback(
        status=TaskStatus.COMPLETED,
        result=(ResultCode.OK, "send_transient_buffer command completed."),
    )
def _device_communication_state_changed(
    self: SubarrayComponentManager,
    trl: str,
    communication_state: CommunicationStatus,
) -> None:
    """
    Pass a sub-device's communication status on to the communication manager.

    :param trl: TRL of the device whose communication status changed
    :param communication_state: the new communication status of that device
    """
    update_status = self._communication_manager.update_communication_status
    update_status(trl, communication_state)
def _station_power_state_changed(
    self: SubarrayComponentManager,
    trl: str,
    power_mode: PowerState,
) -> None:
    """
    Record a station's power state.

    While an assignment is in progress, the ``_is_assigning`` flag is
    cleared as soon as a power state (anything but None) is known for
    every station in ``_station_power_modes``.

    :param trl: TRL of the station whose power state changed
    :param power_mode: the new power state of that station
    """
    known_modes = self._station_power_modes
    known_modes[trl] = power_mode
    if not self._is_assigning:
        return
    if all(mode is not None for mode in known_modes.values()):
        self._is_assigning = False
def _wait_for_obs_state(
    self: SubarrayComponentManager,
    timeout: float,
    task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
    """
    Wait for sub-device ObsState to reach desired state.

    The set of awaited resources is polled every 10 ms until it is empty.
    If the timeout expires first, a final direct poll of the devices
    decides the outcome.

    :param timeout: Time to wait, in seconds. A timeout shorter than one
        polling period (including zero or negative values) expires after
        a single polling period.
    :param task_abort_event: Check for abort, defaults to None

    :return: completed if status reached, FAILED if timed out, ABORTED if aborted
    """
    ticks = int(timeout / 0.01)  # 10 ms resolution
    while self._configuring_resources:
        if task_abort_event and task_abort_event.is_set():
            return TaskStatus.ABORTED
        time.sleep(0.01)
        ticks -= 1
        # "<=" rather than "==": with a timeout below 10 ms the counter
        # starts at zero and goes negative, so an equality test would
        # never fire and the wait would never time out.
        if ticks <= 0:
            self.logger.warning(
                f"Timed out waiting for ObsState {self._desired_state.name}"
                f" resources {self._configuring_resources} "
                f"failed to transition in {timeout} seconds. Attempting final poll."
            )
            return self._final_poll()
    self.logger.debug(
        f"Waited ObsState {self._desired_state.name} for"
        f" {(timeout-ticks*0.01):.3f} seconds"
    )
    return TaskStatus.COMPLETED
def _device_obs_state_changed(
    self: SubarrayComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Record a sub-device's new ObsState.

    The new state is stored in ``_device_obs_states``. If it equals the
    desired state and the device is one of the awaited resources, the
    device is removed from the set of awaited resources.

    :param trl: TRL of the device whose ObsState changed
    :param obs_state: the new ObsState of that device
    """
    old_state = self._device_obs_states.get(trl)
    # Test against None explicitly: ObsState.EMPTY has the integer value 0
    # and is therefore falsy, so a plain truthiness test would log a
    # previous EMPTY state as "NONE".
    old_name = old_state.name if old_state is not None else "NONE"
    new_state = ObsState(obs_state)
    self._device_obs_states[trl] = new_state
    if obs_state == self._desired_state and trl in self._configuring_resources:
        self._configuring_resources.remove(trl)
    self.logger.debug(
        f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
        f"waiting for {len(self._configuring_resources)} more devices"
    )
def _device_obs_state_fault(
    self: SubarrayComponentManager,
    trl: str,
    obs_state: ObsState,
) -> None:
    """
    Handle a sub-device moving to ``ObsState.FAULT``.

    The fault is logged, recorded in ``_device_obs_states`` and reported
    through the component state callback. A call with any other ObsState
    is logged as a warning and otherwise ignored.

    :param trl: TRL of the device reporting the fault
    :param obs_state: the ObsState reported by that device
    """
    if obs_state == ObsState.FAULT:
        self.logger.error(f"{trl} moved to FAULT.")
        self._device_obs_states[trl] = ObsState(obs_state)
        self._component_state_callback(obsfault=True)
    else:
        self.logger.warning(
            "_device_obs_state_fault called with"
            f" incorrect ObsState {trl} {obs_state.name}"
        )
def _final_poll(self: SubarrayComponentManager) -> TaskStatus:
    """
    Read device obsstates directly as a last attempt before declaring failure.

    Change events appear to be missed occasionally in production. For each
    resource that is still awaited, the device's current obsstate is read
    through its proxy; a device that is already in the desired state is
    removed from the set of awaited resources and a missed event is
    reported through the component state callback.

    :returns: ``TaskStatus.COMPLETED`` if no awaited resource is left
        afterwards, ``TaskStatus.FAILED`` otherwise.
    """
    # Iterate over a snapshot: entries are removed from the set in the loop.
    for pending_trl in tuple(self._configuring_resources):
        component = (
            self._station_beams.get(pending_trl)
            or self._subarray_beams.get(pending_trl)
            or self._stations.get(pending_trl)
        )
        # Unknown device, or one without a proxy: nothing can be read.
        if not (component and component._proxy):
            continue
        if component._proxy.obsstate != self._desired_state:
            continue
        self.logger.info(
            f"Change event for {pending_trl} was missed, device was in correct state."
        )
        self._component_state_callback(missed_event=True)
        self._configuring_resources.remove(pending_trl)

    if self._configuring_resources:
        return TaskStatus.FAILED
    return TaskStatus.COMPLETED
def _get_unique_id(self: SubarrayComponentManager) -> str:
    """
    Get a UID.

    The ID is currently always generated locally. When a SKUID URL is
    configured, a client is created for it and asked for a local ID;
    otherwise the ID is requested from the ``SkuidClient`` class itself.

    :return: A unique ID.
    """
    if not self._skuid_url:
        return SkuidClient.get_local_transaction_id()

    skuid_client = SkuidClient(self._skuid_url)
    # TODO: hanging on client.fetch_transaction_id()
    # but this is a weak test anyhow:
    # we are passing a nonexistent URL,
    # which makes the client fail to fetch a transaction ID,
    # and fall back to generating one locally.
    # So for now I have switched over to
    # generating locally in the first place,
    # instead of returning client.fetch_transaction_id().
    return skuid_client.get_local_transaction_id()