# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
# pylint: disable = too-many-lines
"""This module implements component management for subarrays."""
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from typing import Any, Callable, Optional, Sequence
import ska_tango_base.subarray
from ska_control_model import (
CommunicationStatus,
ObsState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_low_mccs_common.component import ObsDeviceComponentManager
from ska_low_mccs_common.component.command_proxy import MccsCommandProxy
from ska_low_mccs_common.component.composite_command_proxy import (
CompositeCommandResultEvaluator,
MccsCompositeCommandProxy,
pretty_format,
)
from ska_ser_skuid.client import SkuidClient # type: ignore
from ska_tango_base.base import TaskCallbackType, check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
__all__ = ["SubarrayComponentManager"]
class _StationProxy(ObsDeviceComponentManager):
"""A subarray's proxy to its stations."""
# TODO: remove comments when commands implemented in station
@check_communicating
@check_on
def deallocate_subarray(
self: _StationProxy, subarray_id: int
) -> tuple[TaskStatus, str]:
"""
Deconfigure the station.
:param subarray_id: the subarray which configuration must be removed
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.DeallocateSubarray(subarray_id)
return (result_code, unique_id)
@check_communicating
@check_on
def scan(
self: _StationProxy,
subarray_id: int,
scan_id: int,
start_time: Optional[str],
duration: float,
) -> tuple[TaskStatus, str]:
"""
Start the subarray beam scanning.
:param subarray_id: the subarray ID which scan is started
:param scan_id: the id of the scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:return: A task status and response message.
"""
assert self._proxy is not None
self.logger.debug(f"Scan command issued to station for scan {scan_id}")
scan_arg: dict[str, int | float | str] = {
"subarray_id": subarray_id,
"scan_id": scan_id,
"duration": duration,
}
if start_time is not None:
scan_arg["start_time"] = start_time
([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
return (result_code, unique_id)
@check_communicating
@check_on
def end_scan(self: _StationProxy, subarray_id: int) -> tuple[TaskStatus, str]:
"""
Stop the subarray beam scanning.
:param subarray_id: the subarray ID which scan is stopped
:return: A task status and response message.
"""
assert self._proxy is not None
([result_code], unique_id) = self._proxy.EndScan(subarray_id)
return (result_code, unique_id)
class _SubarrayBeamProxy(ObsDeviceComponentManager):
"""A subarray's proxy to its subarray beams."""
@check_communicating
@check_on
def configure(
self: _SubarrayBeamProxy, configuration: dict
) -> tuple[TaskStatus, str]:
"""
Configure the subarray beam.
:param configuration: the configuration to be applied to this
subarray beam
:return: A task status and response message.
"""
assert self._proxy is not None
configuration_str = json.dumps(configuration)
(result_code, unique_id) = self._proxy.Configure(configuration_str)
return (result_code, unique_id)
@check_communicating
@check_on
def scan(
self: _SubarrayBeamProxy,
scan_id: int,
start_time: Optional[str],
duration: float,
) -> tuple[TaskStatus, str]:
"""
Start the subarray beam scanning.
:param scan_id: the id of the scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:return: A task status and response message.
"""
assert self._proxy is not None
scan_arg: dict[str, int | float | str] = {
"scan_id": scan_id,
"duration": duration,
}
if start_time is not None:
scan_arg["start_time"] = start_time
([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
return (result_code, unique_id)
@check_communicating
@check_on
def release_all(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
Release subarray beam resources.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.ReleaseAllResources()
return (result_code, unique_id)
@check_communicating
@check_on
def deconfigure(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
De-configure the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.End()
return (result_code, unique_id)
@check_communicating
@check_on
def end_scan(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
End the current scan.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.EndScan()
return (result_code, unique_id)
@check_communicating
@check_on
def abort(
self: _SubarrayBeamProxy, task_callback: TaskCallbackType | None = None
) -> tuple[TaskStatus, str]:
"""
Abort the current scan associated with the subarray beam.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.Abort()
return (result_code, unique_id)
@check_communicating
@check_on
def obsreset(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
Reset to IDLE the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.ObsReset()
return (result_code, unique_id)
@check_communicating
@check_on
def restart(self: _SubarrayBeamProxy) -> tuple[TaskStatus, str]:
"""
Restart to EMPTY the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.Restart()
return (result_code, unique_id)
class _StationBeamProxy(ObsDeviceComponentManager):
"""A subarray's proxy to its station beams."""
@check_communicating
@check_on
def configure(
self: _StationBeamProxy, configuration: dict
) -> tuple[TaskStatus, str]:
"""
Configure the station beam.
:param configuration: the configuration to be applied to this
station beam
:return: A task status and response message.
"""
assert self._proxy is not None
configuration_str = json.dumps(configuration)
(result_code, unique_id) = self._proxy.Configure(configuration_str)
return (result_code, unique_id)
@check_communicating
@check_on
def scan(
self: _StationBeamProxy,
scan_id: int,
start_time: Optional[str],
duration: float,
) -> tuple[TaskStatus, str]:
"""
Start the subarray beam scanning.
:param scan_id: the id of the scan
:param start_time: the start time of the scan
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:return: A task status and response message.
"""
assert self._proxy is not None
scan_arg: dict[str, int | float | str] = {
"scan_id": scan_id,
"duration": duration,
}
if start_time is not None:
scan_arg["start_time"] = start_time
([result_code], unique_id) = self._proxy.Scan(json.dumps(scan_arg))
return (result_code, unique_id)
@check_communicating
@check_on
def deconfigure(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
"""
Configure the station beam.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.End()
return (result_code, unique_id)
@check_communicating
@check_on
def release_all(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
"""
Release station beam resources.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.ReleaseAllResources()
return (result_code, unique_id)
@check_communicating
@check_on
def end_scan(self: _StationBeamProxy) -> tuple[TaskStatus, str]:
"""
Release station beam resources.
:return: A task status and response message.
"""
assert self._proxy is not None
(result_code, unique_id) = self._proxy.EndScan()
return (result_code, unique_id)
# pylint: disable=too-many-instance-attributes, too-many-public-methods
[docs]class SubarrayComponentManager(
TaskExecutorComponentManager,
ska_tango_base.subarray.SubarrayComponentManager,
):
"""A component manager for a subarray."""
# messages for long running command task callback
_task_message = {
TaskStatus.COMPLETED: "completed OK.",
TaskStatus.ABORTED: "been aborted.",
TaskStatus.FAILED: "timed out.",
}
_task_to_result = {
TaskStatus.COMPLETED: ResultCode.OK,
TaskStatus.ABORTED: ResultCode.ABORTED,
TaskStatus.FAILED: ResultCode.FAILED,
}
# pylint: disable=too-many-arguments
[docs] def __init__(
self: SubarrayComponentManager,
subarray_id: int,
skuid_url: str,
logger: logging.Logger,
obs_command_timeout: int,
communication_state_callback: Callable[[CommunicationStatus], None],
component_state_callback: Callable[..., None],
) -> None:
"""
Initialise a new instance.
:param subarray_id: the subarray ID for this device
:param skuid_url: The address at which a SKUID service is running.
:param obs_command_timeout: the default timeout for obs
commands in seconds.
:param logger: the logger to be used by this object.
:param communication_state_callback: callback to be called when
the status of the communications channel between the
component manager and its component changes
:param component_state_callback: callback to be called when the
component state changes.
"""
self._power_state_lock = threading.RLock()
self._component_state_callback: Callable[..., None]
self._component_state_callback = component_state_callback
self._abort_complete = threading.Event()
self._obs_command_timeout = obs_command_timeout
self._device_communication_states: dict[str, CommunicationStatus] = {}
self._station_power_modes: dict[str, Optional[PowerState]] = {}
self._device_obs_states: dict[str, Optional[ObsState]] = {}
self._is_assigning = False
self._is_configured = False
self._configuring_resources: set[str] = set() # these resources should go
self._desired_state = ObsState.READY # to this end state
self._stations: dict[str, _StationProxy] = {}
self._subarray_beams: dict[str, _SubarrayBeamProxy] = {}
self._station_beams: dict[str, _StationBeamProxy] = {}
self._apertures: dict[str, str] = {}
self._subarray_beam_trl: dict[int, str] = {}
self._number_of_channels = 0
self._subarray_id = subarray_id
self._skuid_url = skuid_url
self._scan_id: Optional[int] = None
self._scan_start_time: str | None = None
self._scan_duration = 0.0
super().__init__(
logger,
communication_state_callback,
component_state_callback,
resources_changed=None,
configured_changed=None,
scanning_changed=None,
task_completed=None,
obsfault=None,
obsstate_changed=None,
station_power=None,
power=None,
fault=None,
)
[docs] def start_communicating(self: SubarrayComponentManager) -> None:
"""Establish communication with the station components."""
if self.communication_state == CommunicationStatus.ESTABLISHED:
return
if self.communication_state == CommunicationStatus.DISABLED:
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
if self._stations or self._subarray_beams or self._station_beams:
for station_proxy in self._stations.values():
station_proxy.start_communicating()
for subarray_beam_proxy in self._subarray_beams.values():
subarray_beam_proxy.start_communicating()
for station_beam_proxy in self._station_beams.values():
station_beam_proxy.start_communicating()
else:
self._update_communication_state(CommunicationStatus.ESTABLISHED)
with self._power_state_lock:
self._update_component_state(power=PowerState.ON)
[docs] def stop_communicating(self: SubarrayComponentManager) -> None:
"""Break off communication with the station components."""
if self.communication_state == CommunicationStatus.DISABLED:
return
for station in self._stations.values():
station.stop_communicating()
for subarray_beam in self._subarray_beams.values():
subarray_beam.stop_communicating()
for station_beam in self._station_beams.values():
station_beam.stop_communicating()
self._update_communication_state(CommunicationStatus.DISABLED)
self._update_component_state(power=None, fault=None)
@property
def max_executing_tasks(self) -> int:
"""
Get the max number of tasks that can be executing at once.
:return: max number of simultaneously executing tasks.
"""
return 2
@property
def scan_id(self: SubarrayComponentManager) -> Optional[int]:
"""
Return the scan id, or None if a scan is not current.
:return: the scan id, or None if a scan is not current.
"""
return self._scan_id
@property
def subarray_beam_trls(self: SubarrayComponentManager) -> set[str]:
"""
Return the set of TRLs of subarray beams assigned to this subarray.
:return: the set of TRLs of subarray beams assigned to this subarray.
"""
return set(self._subarray_beams.keys())
@property
def station_beam_trls(self: SubarrayComponentManager) -> set[str]:
"""
Return the set of TRLs of station beams assigned to this subarray.
:return: the set of TRLs of station beams assigned to this subarray.
"""
return set(self._station_beams.keys())
@property
def station_trls(self: SubarrayComponentManager) -> set[str]:
"""
Return the set of TRLs of stations assigned to this subarray.
:return: the set of TRLs of stations assigned to this subarray.
"""
return set(self._stations.keys())
@property
def power_state(self: SubarrayComponentManager) -> Optional[PowerState]:
"""
Return my power state.
:return: my power state
"""
return self._component_state["power"]
def _check_aborted(
self: SubarrayComponentManager,
task_abort_event: Optional[threading.Event],
task_callback: Optional[Callable],
method_name: str,
) -> bool:
if task_abort_event is None or task_callback is None:
self.logger.warning(f"Cannot check {method_name} for abort status")
return False
if task_abort_event.is_set():
self.logger.info(f"{method_name} has been aborted")
task_callback(
status=TaskStatus.ABORTED,
result=(ResultCode.ABORTED, f"{method_name} aborted"),
)
return True
return False
[docs] @check_communicating
def assign( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
**kwargs: Any,
) -> tuple[TaskStatus, str]:
"""
Submit the `AssignResources` slow command.
This method returns immediately after it is submitted for execution.
:param task_callback: Update task state, defaults to None
:param kwargs: keyword arguments to the command.
Required keys are "subarray_id" (the ID of the subarray),
and subarray_beams (resource specification for each beam).
:return: a result code and response message.
"""
self._is_assigning = True
return self.submit_task(
self._assign,
args=[kwargs["subarray_id"], kwargs["subarray_beams"]],
task_callback=task_callback,
)
# pylint: disable=too-many-branches, too-many-locals, too-many-statements
@check_communicating
def _assign(
self: SubarrayComponentManager,
# resources: dict[str, Any],
subarray_id: int,
subarray_beams: dict,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Assign resources to this subarray.
This is just for communication and health roll-up, resource management
is done by controller.
:param subarray_id: ID of the subarray
:param subarray_beams: resource specification for each beam
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
station_trls: list[str] = []
subarray_beam_trls: list[str] = []
station_beam_trls: list[str] = []
number_of_channels = 0
self._apertures.clear()
self._subarray_beam_trl.clear()
self._number_of_channels = 0
for beam in subarray_beams:
subarray_beam_id = beam.get("subarray_beam_id", 1)
subarray_beam_trl = beam.get(
"subarray_beam_trl",
f"low-mccs/subarraybeam/{subarray_beam_id:02d}",
)
self._subarray_beam_trl[subarray_beam_id] = subarray_beam_trl
subarray_beam_trls.append(subarray_beam_trl)
number_of_channels += beam.get("number_of_channels")
for aperture in beam.get("apertures", []):
aperture_id = aperture.get("aperture_id", None)
station_id = aperture.get("station_id")
if aperture_id is None:
aperture_id = f"AP{station_id}.1"
station_trl = aperture.get("station_trl", None)
if station_trl is not None and station_trl not in station_trls:
station_trls.append(station_trl)
station_beam_trl = aperture.get("station_beam_trl", None)
station_beam_trls.append(station_beam_trl)
self._apertures[aperture_id] = station_beam_trl
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
self._number_of_channels = number_of_channels
station_trls_to_add = sorted(station_trls) - self._stations.keys()
subarray_beam_trls_to_add = subarray_beam_trls - self._subarray_beams.keys()
station_beam_trls_to_add = station_beam_trls - self._station_beams.keys()
trls_to_add = station_trls_to_add.union(
subarray_beam_trls_to_add, station_beam_trls_to_add
)
self._desired_state = ObsState.IDLE
self._configuring_resources.clear()
if trls_to_add:
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
for trl in trls_to_add:
self._device_communication_states[trl] = CommunicationStatus.DISABLED
self._device_obs_states[trl] = ObsState.EMPTY
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
self._evaluate_communication_state()
for trl in station_trls_to_add:
self._stations[trl] = _StationProxy(
trl,
self.logger,
functools.partial(self._device_communication_state_changed, trl),
functools.partial(self._component_state_callback, trl=trl),
)
for trl in subarray_beam_trls_to_add:
self._subarray_beams[trl] = _SubarrayBeamProxy(
trl,
self.logger,
functools.partial(self._device_communication_state_changed, trl),
functools.partial(
self._component_state_callback,
trl=trl,
),
)
for trl in station_beam_trls_to_add:
self._station_beams[trl] = _StationBeamProxy(
trl,
self.logger,
functools.partial(self._device_communication_state_changed, trl),
functools.partial(
self._component_state_callback,
trl=trl,
),
)
self._is_assigning = True
self._component_state_callback(
resources_changed=[
set(self._stations.keys()),
set(self._subarray_beams.keys()),
set(self._station_beams.keys()),
]
)
for trl in station_trls_to_add:
self._stations[trl].start_communicating()
for trl in subarray_beam_trls_to_add:
self._configuring_resources.add(trl)
self._subarray_beams[trl].start_communicating()
for trl in station_beam_trls_to_add:
self._configuring_resources.add(trl)
self._station_beams[trl].start_communicating()
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
# wait for all subdevices to reach ObsState.IDLE
task_result = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_assign"):
return
self._is_assigning = False
self._is_configured = False
if task_callback is not None:
task_callback(
status=task_result,
result=(
self._task_to_result[task_result],
"Assign resources " + self._task_message[task_result],
),
)
@property # type: ignore[misc]
@check_communicating
def assigned_resources(
self: SubarrayComponentManager,
) -> list[str]:
"""
Return this subarray's resources.
:return: this subarray's resources.
"""
return list(
set(self._stations) | set(self._subarray_beams) | set(self._station_beams),
)
@property # type: ignore[misc]
@check_communicating
def assigned_resources_dict(
self: SubarrayComponentManager,
) -> dict[str, Sequence[Any]]:
"""
Return a dictionary of resource types and TRLs.
:return: this subarray's resources.
"""
return {
"stations": sorted(self._stations.keys()),
"subarray_beams": sorted(self._subarray_beams.keys()),
"station_beams": sorted(self._station_beams.keys()),
"apertures": sorted(self._apertures.keys()),
"channels": [self._number_of_channels],
}
[docs] @check_communicating
def release( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
# *,
# resources: str,
**kwargs: Any,
) -> tuple[TaskStatus, str]:
"""
Submit the `release` slow command.
:param task_callback: Update task state, defaults to None
:param kwargs: keyword arguments to the command. The
only required key is "resources", which
contains a list of resource names to release
:return: A task status and response message.
"""
return self.submit_task(
self._release,
args=[kwargs["resources"]],
task_callback=task_callback,
)
@check_communicating
def _release(
self: SubarrayComponentManager,
resources: dict[str, list[str]],
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Release resources from this subarray.
:param resources: list of resource TRLs to release.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
:raises NotImplementedError: because MCCS Subarray cannot
perform a partial release of resources.
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_release"):
return
if task_callback is not None:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.OK,
(
"Not Implemented: MCCS Subarray cannot"
"partially release resources."
),
),
)
raise NotImplementedError("MCCS Subarray cannot partially release resources.")
[docs] @check_communicating
def release_all( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `ReleaseAllResources` slow command.
Release all resources from this subarray.
:param task_callback: Update task state, defaults to None
:return: a task status and response message
"""
return self.submit_task(
self._release_all,
args=[],
task_callback=task_callback,
)
@check_communicating
def _release_all(
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Release all resources from this subarray.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_release_all"):
return
self._desired_state = ObsState.EMPTY
self._configuring_resources.clear()
# Release resources in subelements, and check return codes
self.logger.debug("Deallocating resources in stations")
for station_proxy in self._stations.values():
station_proxy.deallocate_subarray(self._subarray_id)
self.logger.debug("Deallocating resources in subarray beams")
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
subarray_beam_proxy.release_all()
if self._check_aborted(task_abort_event, task_callback, "_release_all"):
return
# Wait for beams to go to EMPTY
task_status = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_release_all"):
return
self._release_internal_resources()
if task_status == TaskStatus.FAILED:
self._component_state_callback(obsfault=True)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"ReleaseAllResources has " + self._task_message[task_status],
),
)
def _delete_proxies(self) -> None:
self.logger.debug("Deleting proxies")
self._device_communication_states.clear()
# Before we clear the dicts, we must make the the underlying object removes
# it's subcriptions.
for proxy in (
list(self._stations.values())
+ list(self._subarray_beams.values())
+ list(self._station_beams.values())
):
if proxy._proxy is not None:
proxy._proxy.unsubscribe_all_change_events()
self._stations.clear()
self._subarray_beams.clear()
self._station_beams.clear()
def _release_internal_resources(self) -> None:
self.logger.debug("Deallocating resources")
if self._stations or self._subarray_beams or self._station_beams:
self._delete_proxies()
self._device_obs_states.clear()
self._apertures.clear()
self._subarray_beam_trl.clear()
self._number_of_channels = 0
self._configuring_resources.clear()
self._component_state_callback(
resources_changed=[
set(self._stations.keys()),
set(self._subarray_beams.keys()),
set(self._station_beams.keys()),
]
)
self._evaluate_communication_state()
@check_communicating
def _configure(
self: SubarrayComponentManager,
subarray_beams: list[dict[str, Any]],
task_callback: Optional[Callable],
task_abort_event: threading.Event,
transaction_id: str = "",
) -> None:
"""
Configure the resources for a scan.
:param subarray_beams: the beam configurations to be applied
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
:param transaction_id: the transaction id for the configuration
Will be generated if not provided.
"""
def _report_status(status: TaskStatus, **kwargs: Any) -> None:
if task_callback is not None:
task_callback(status=status, **kwargs)
_report_status(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
if transaction_id == "":
transaction_id = self._get_unique_id()
was_empty = not self._is_configured
for station_trl, station_proxy in self._stations.items():
station_proxy.deallocate_subarray(self._subarray_id)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
configuration: dict[str, dict] = {}
for beam in subarray_beams:
beam_id = beam.get("subarray_beam_id")
assert isinstance(beam_id, int)
beam_trl = self._subarray_beam_trl.get(beam_id, None)
if beam_trl is None:
self.logger.error(f"undefined beam {beam_id}")
continue
self.logger.debug(f"Processing beam {beam_id} ({beam_trl})")
beam["subarray_id"] = self._subarray_id
for aperture in beam["apertures"]:
aperture_id = aperture["aperture_id"]
station_beam_trl = self._apertures.get(aperture_id, None)
if station_beam_trl is None:
self.logger.error(f"undefined aperture {aperture_id}")
continue
aperture["station_beam_trl"] = station_beam_trl
configuration[beam_trl] = beam
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
# configure subarray beams. When configured, configure stations
task_status = self._configure_subarray_beams(configuration)
# Must wait for all configuration to complete
if task_status == TaskStatus.COMPLETED:
task_status = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_configure"):
return
if task_status == TaskStatus.COMPLETED:
self.logger.debug("configuring stations")
result_code = self._configure_stations(transaction_id=transaction_id)
if result_code != ResultCode.OK:
task_status = TaskStatus.FAILED
# Obsstate update
if task_status == TaskStatus.COMPLETED:
self._is_configured = True
if was_empty:
self._component_state_callback(configured_changed=True)
elif task_status == TaskStatus.FAILED:
self._component_state_callback(obsfault=True)
_report_status(
status=task_status,
result=(
self._task_to_result[task_status],
"Configure has " + self._task_message[task_status],
),
)
def _configure_stations(
self: SubarrayComponentManager, transaction_id: str
) -> ResultCode:
"""
Configure the station resources for a scan.
:param transaction_id: the transaction id for the configuration
:return: a result code
"""
apply_configuration_commands = MccsCompositeCommandProxy(self.logger)
for station_trl in self._stations:
apply_configuration_commands += MccsCommandProxy(
station_trl,
"ApplyConfiguration",
self.logger,
default_args=transaction_id,
)
result, message = apply_configuration_commands(
command_evaluator=CompositeCommandResultEvaluator()
)
if result != ResultCode.OK:
self.logger.error(
f"MccsCompositeCommandProxy was not happy {result=}"
f" {pretty_format(message)}"
)
return result
def _configure_subarray_beams(
self: SubarrayComponentManager,
subarray_beam_configuration: dict[str, Any],
) -> TaskStatus:
"""
Configure the station beam resources for a scan.
:param subarray_beam_configuration: the subarray beam configuration
to be applied
:return: a result code
"""
self._configuring_resources.clear()
self._desired_state = ObsState.READY
result_code = TaskStatus.COMPLETED
for (
subarray_beam_trl,
configuration,
) in subarray_beam_configuration.items():
subarray_beam_proxy = self._subarray_beams[subarray_beam_trl]
proxy_result_code = subarray_beam_proxy.configure(configuration)
if proxy_result_code == ResultCode.FAILED:
result_code = TaskStatus.FAILED
else:
self._configuring_resources.add(subarray_beam_trl)
return result_code
# pylint: disable=arguments-differ
[docs] @check_communicating
def scan( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
*,
interface: Optional[str] = None,
scan_id: int,
start_time: Optional[str] = None,
duration: Optional[float] = 0.0,
) -> tuple[TaskStatus, str]:
"""
Submit the `Scan` command.
:param task_callback: Update task state, defaults to None
:param interface: the schema version this is running against.
:param scan_id: The ID for this scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means foreve
:return: A task status and response message.
"""
return self.submit_task(
self._scan,
args=[scan_id, start_time, duration],
task_callback=task_callback,
)
@check_communicating
def _scan(
self: SubarrayComponentManager,
scan_id: int,
start_time: Optional[str],
duration: float,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Start scanning.
:param scan_id: the id of the scan
:param start_time: the start time of the scan
:param duration: Scan duration in seconds. 0.0 or omitted means foreve
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
self._scan_id = scan_id
self._scan_start_time = start_time
self._scan_duration = duration
if self._check_aborted(task_abort_event, task_callback, "_scan"):
return
self._desired_state = ObsState.SCANNING
self._configuring_resources.clear()
task_status = TaskStatus.COMPLETED # if everything below does not fail
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
proxy_result_code = subarray_beam_proxy.scan(scan_id, start_time, duration)
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
# wait for beams to start
if task_status == TaskStatus.COMPLETED:
task_status = self._wait_for_obs_state(
self._obs_command_timeout, task_abort_event
)
if self._check_aborted(task_abort_event, task_callback, "_scan"):
return
# Start scan on stations
if task_status == TaskStatus.COMPLETED:
for trl, station_proxy in self._stations.items():
self.logger.debug(f"Starting scan in station {trl}")
proxy_result_code = station_proxy.scan(
self._subarray_id,
scan_id,
start_time,
duration,
)
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
if self._check_aborted(task_abort_event, task_callback, "_scan"):
return
if task_status == TaskStatus.COMPLETED:
self._component_state_callback(scanning_changed=True)
else:
self._component_state_callback(obsfault=True)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Scan has " + self._task_message[task_status],
),
)
[docs] @check_communicating
def end_scan( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `end_scan` slow command.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
return self.submit_task(
self._end_scan,
args=[],
task_callback=task_callback,
)
def _end_scan(
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Submit the `end_scan` slow command.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_end_scan"):
return
self._scan_id = None
self._scan_start_time = None
self._scan_duration = 0.0
# Stuff goes here. This should tell the subarray beam device to
# stop scanning.
task_status = TaskStatus.COMPLETED
for subarray_beam_proxy in self._subarray_beams.values():
proxy_result_code = subarray_beam_proxy.end_scan()
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
# Stop scan on stations
if task_status == TaskStatus.COMPLETED:
for trl, station_proxy in self._stations.items():
self.logger.debug(f"Stopping scan in station {trl}")
proxy_result_code = station_proxy.end_scan(self._subarray_id)
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
if self._check_aborted(task_abort_event, task_callback, "_end_scan"):
return
if task_status == TaskStatus.COMPLETED:
self._component_state_callback(scanning_changed=False)
else:
self._component_state_callback(obsfault=True)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"EndScan has " + self._task_message[task_status],
),
)
@check_communicating
def _deconfigure( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Deconfigure resources.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
self.logger.debug("deconfigure task invoked")
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_deconfigure"):
return
self._desired_state = ObsState.IDLE
task_status = self._deconfigure_subelements()
if task_status == TaskStatus.COMPLETED:
self._wait_for_obs_state(self._obs_command_timeout, task_abort_event)
if self._check_aborted(task_abort_event, task_callback, "_deconfigure"):
return
self._component_state_callback(configured_changed=False)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Subarray Deconfigure has " + self._task_message[task_status],
),
)
@check_communicating
def _deconfigure_subelements(
self: SubarrayComponentManager,
) -> TaskStatus:
"""
Deconfigure resources.
:return: result code
"""
self.logger.debug("deconfigure subelements invoked")
task_status = TaskStatus.COMPLETED
for trl, station_proxy in self._stations.items():
self.logger.debug(f"deconfigure station {trl}")
proxy_task_status, response = station_proxy.deallocate_subarray(
self._subarray_id
)
if proxy_task_status in (TaskStatus.REJECTED, TaskStatus.FAILED):
task_status = TaskStatus.FAILED
#
# Set all station beams to no logical bands, point statically at zenith
#
for trl, subarray_beam_proxy in self._subarray_beams.items():
self.logger.debug(f"deconfigure subarray beam {trl}")
self._configuring_resources.add(trl)
proxy_task_status, response = subarray_beam_proxy.deconfigure()
if proxy_task_status in (TaskStatus.REJECTED, TaskStatus.FAILED):
task_status = TaskStatus.FAILED
self._is_configured = False
return task_status
def _abort_callback(
self: SubarrayComponentManager,
*,
status: Optional[TaskStatus] = None,
exception: Optional[Exception] = None,
**kwargs: Any,
) -> None:
if exception is not None:
self.logger.error(f"abort_commands raised exception: {repr(exception)}")
if status == TaskStatus.COMPLETED:
self.logger.debug("abort_commands has finished, setting flag")
self._abort_complete.set()
[docs] @check_communicating
def abort( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Abort the observation.
:param task_callback: callback to be called when the status of
the command changes
:return: A task status and response message.
"""
# We have to spin up a separate thread such that we skip the queue which
# we would append to if we used submit_task
threading.Thread(
target=self._abort,
kwargs={"task_callback": task_callback},
).start()
return (TaskStatus.IN_PROGRESS, "Abort has started")
[docs] @check_communicating
def abort_device(
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Abort only this device, for use in RestartSubarray().
:param task_callback: callback to be called when the status of
the command changes
:return: A task status and response message.
"""
# We have to spin up a separate thread such that we skip the queue which
# we would append to if we used submit_task
threading.Thread(
target=self._abort,
kwargs={"cascade": False, "task_callback": task_callback},
).start()
return (TaskStatus.IN_PROGRESS, "Abort has started")
def _abort( # type: ignore[override]
self: SubarrayComponentManager,
cascade: bool = True,
task_callback: Optional[Callable] = None,
) -> None:
"""
Abort command execution.
:param cascade: whether or not to abort sub-devices.
:param task_callback: Update task state, defaults to None.
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
task_status = TaskStatus.COMPLETED
# This is sent in a thread, we record progress using the _abort_callback, which
# will set an Event once the task is complete.
self.abort_commands(task_callback=self._abort_callback) # type:ignore
# Wait until the timeout for the Event to be set by abort_commands
timeout = 10.0
if self._abort_complete.wait(timeout=timeout):
self._abort_complete.clear()
else:
self.logger.error(f"Abort timed out in {timeout} seconds.")
task_status = TaskStatus.FAILED
# Reinitialize our event. The old one could get set at any point now.
self._abort_complete = threading.Event()
# Abort sub-devices
if cascade:
if task_status == TaskStatus.COMPLETED:
self._desired_state = ObsState.ABORTED
self._configuring_resources.clear()
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
proxy_result_code, response = subarray_beam_proxy.abort()
self.logger.debug(
f"Aborting {trl} has {ResultCode(proxy_result_code).name}"
)
if proxy_result_code in (
ResultCode.FAILED,
ResultCode.REJECTED,
ResultCode.NOT_ALLOWED,
):
task_status = TaskStatus.FAILED
break
# Wait until subdevices reach ObsState.ABORTED
if task_status == TaskStatus.COMPLETED:
task_status = self._wait_for_obs_state(timeout=10.0)
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Abort command has " + self._task_message[task_status],
),
)
[docs] @check_communicating
def obsreset( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `ObsReset` slow command.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
return self.submit_task(
self._obsreset,
args=[],
task_callback=task_callback,
)
@check_communicating
def _obsreset( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Reset the observation by returning to unconfigured state.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_obsreset"):
return
task_status = TaskStatus.COMPLETED
for trl, subarray_beam_proxy in self._subarray_beams.items():
self._configuring_resources.add(trl)
proxy_result_code = subarray_beam_proxy.obsreset()
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"ObsReset command has " + self._task_message[task_status],
),
)
[docs] @check_communicating
def restart( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the `Restart` slow command.
:param task_callback: Update task state, defaults to None
:return: A task status and response message.
"""
return self.submit_task(
self._restart,
args=[],
task_callback=task_callback,
)
@check_communicating
def _restart( # type: ignore[override]
self: SubarrayComponentManager,
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Restart the subarray by returning to unresourced state.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if self._check_aborted(task_abort_event, task_callback, "_restart"):
return
task_status = TaskStatus.COMPLETED
for trl, station_proxy in self._stations.items():
proxy_result_code = station_proxy.deallocate_subarray(self._subarray_id)
if proxy_result_code in (TaskStatus.FAILED, TaskStatus.REJECTED):
task_status = TaskStatus.FAILED
# Scanning related
self._scan_id = None
self._scan_start_time = None
self._scan_duration = 0.0
# Configure related
self._is_configured = False
# Allocate related
self._release_internal_resources()
if task_callback is not None:
task_callback(
status=task_status,
result=(
self._task_to_result[task_status],
"Restart command has " + self._task_message[task_status],
),
)
[docs] @check_communicating
def send_transient_buffer(
self: SubarrayComponentManager,
argin: list[int],
task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
"""
Submit the send_transient_buffer slow task.
This method returns immediately after it is submitted for
execution.
:param argin: list of requested segments.
:param task_callback: Update task state. Defaults to None.
:return: Task status and response message.
"""
return self.submit_task(
self._send_transient_buffer,
args=[argin],
task_callback=task_callback,
)
@check_communicating
def _send_transient_buffer(
self: SubarrayComponentManager,
argin: list[int],
task_callback: Optional[Callable],
task_abort_event: threading.Event,
) -> None:
"""
Send the transient buffer.
:param argin: list of list of requested segment.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
# do stuff here
if task_callback is not None:
task_callback(
status=TaskStatus.COMPLETED,
result=(ResultCode.OK, "send_transient_buffer command completed."),
)
def _device_communication_state_changed(
self: SubarrayComponentManager,
trl: str,
communication_state: CommunicationStatus,
) -> None:
if trl not in self._device_communication_states:
self.logger.warning(
"Received a communication status changed event for a device not "
"managed by this subarray. Probably it was released just a moment ago. "
"The event will be discarded."
)
return
self._device_communication_states[trl] = communication_state
if self.communication_state == CommunicationStatus.DISABLED:
return
self._evaluate_communication_state()
def _evaluate_communication_state(self: SubarrayComponentManager) -> None:
if CommunicationStatus.DISABLED in self._device_communication_states:
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
elif CommunicationStatus.NOT_ESTABLISHED in self._device_communication_states:
self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
else:
self._update_communication_state(CommunicationStatus.ESTABLISHED)
def _station_power_state_changed(
self: SubarrayComponentManager,
trl: str,
power_mode: PowerState,
) -> None:
self._station_power_modes[trl] = power_mode
if self._is_assigning and all(
power_mode is not None for power_mode in self._station_power_modes.values()
):
self._is_assigning = False
def _wait_for_obs_state(
self: SubarrayComponentManager,
timeout: float,
task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
"""
Wait for sub-device ObsState to reach desired state.
:param timeout: Time to wait, in seconds.
:param task_abort_event: Check for abort, defaults to None
:return: completed if status reached, FAILED if timed out, ABORTED if aborted
"""
ticks = int(timeout / 0.01) # 10 ms resolution
while self._configuring_resources:
if task_abort_event and task_abort_event.is_set():
return TaskStatus.ABORTED
time.sleep(0.01)
ticks -= 1
if ticks == 0:
self.logger.warning(
f"Timed out waiting for ObsState {self._desired_state.name}"
f" resources {self._configuring_resources} "
f"failed to transition in {timeout} seconds. Attempting final poll."
)
return self._final_poll()
self.logger.debug(
f"Waited ObsState {self._desired_state.name} for"
f" {(timeout-ticks*0.01):.3f} seconds"
)
return TaskStatus.COMPLETED
def _device_obs_state_changed(
self: SubarrayComponentManager,
trl: str,
obs_state: ObsState,
) -> None:
old_state = self._device_obs_states.get(trl, None)
if old_state:
old_name = old_state.name
else:
old_name = "NONE"
new_state = ObsState(obs_state)
self._device_obs_states[trl] = new_state
if obs_state == self._desired_state and trl in self._configuring_resources:
self._configuring_resources.remove(trl)
# if not self._configuring_resources:
# self._component_state_callback(task_completed="configure")
self.logger.debug(
f"ObsState for {trl} changed: {old_name} -> {new_state.name}, "
f"waiting for {len(self._configuring_resources)} more devices"
)
def _device_obs_state_fault(
self: SubarrayComponentManager,
trl: str,
obs_state: ObsState,
) -> None:
if obs_state != ObsState.FAULT:
self.logger.warning(
"_device_obs_state_fault called with"
f" incorrect ObsState {trl} {obs_state.name}"
)
return
self.logger.error(f"{trl} moved to FAULT.")
self._device_obs_states[trl] = ObsState(obs_state)
self._component_state_callback(obsfault=True)
def _final_poll(self: SubarrayComponentManager) -> TaskStatus:
"""
Direct check of device obsstates as a last attempt before declaring failure.
Sometimes it appears we miss change events in production, if for some reason
we haven't received all the change events we expected, double check what we
think the obsstate is, vs what it actually is on all subdevices that we need to.
:returns: a taskstatus dependent on whether or not the device was actually in
the correct state.
"""
for trl in list(self._configuring_resources):
device = (
self._station_beams.get(trl)
or self._subarray_beams.get(trl)
or self._stations.get(trl)
)
if (
device
and device._proxy
and device._proxy.obsstate == self._desired_state
):
self.logger.info(
f"Change event for {trl} was missed, device was in correct state."
)
self._component_state_callback(missed_event=True)
self._configuring_resources.remove(trl)
return (
TaskStatus.COMPLETED
if not self._configuring_resources
else TaskStatus.FAILED
)
def _get_unique_id(self: SubarrayComponentManager) -> str:
"""
Get a UID.
Retrieves a unique ID from a running SKUID service if available
otherwise returns a locally unique ID.
:return: A unique ID.
"""
if self._skuid_url:
client = SkuidClient(self._skuid_url)
# TODO: hanging on client.fetch_transaction_id()
# but this is a weak test anyhow:
# we are passing a nonexistent URL,
# which makes the client fail to fetch a transaction ID,
# and fall back to generating one locally.
# So for now I have switched over to
# generating locally in the first place.
return client.get_local_transaction_id()
# return client.fetch_transaction_id()
return SkuidClient.get_local_transaction_id()