# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS subarray beam device."""
from __future__ import annotations # allow forward references in type hints
import importlib
import json
import sys
import threading
from typing import Any, Final, Optional
import ska_tango_base as stb
from ska_control_model import (
CommunicationStatus,
HealthState,
ObsState,
ObsStateModel,
PowerState,
TaskStatus,
)
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.obs import SKAObsDevice
from tango.server import attribute, device_property
from ska_low_mccs.subarray_beam.subarray_beam_component_manager import (
SubarrayBeamComponentManager,
)
from ska_low_mccs.subarray_beam.subarray_beam_health_model import (
SubarrayBeamHealthModel,
)
from ska_low_mccs.utils import complex_list_to_interleaved_float_list
__all__ = ["MccsSubarrayBeam", "main"]
# pylint: disable=too-many-public-methods, too-many-ancestors
# pylint: disable=too-many-instance-attributes
class MccsSubarrayBeam(MccsBaseDevice, SKAObsDevice):
    """An implementation of a subarray beam Tango device for MCCS."""

    InitCommand = None  # type: ignore[assignment]

    ObsCommandTimeout = device_property(
        dtype=int,
        default_value=60,
        doc="The timeout in seconds for Observation commands.",
    )

    # ---------------
    # Initialisation
    # ---------------
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialise this device object.

        :param args: positional args to the init
        :param kwargs: keyword args to the init
        """
        # We aren't supposed to define initialisation methods for Tango
        # devices; we are only supposed to define an `init_device` method. But
        # we insist on doing so here, just so that we can define some
        # attributes, thereby stopping the linters from complaining about
        # "attribute-defined-outside-init" etc. We still need to make sure that
        # `init_device` re-initialises any values defined in here.
        super().__init__(*args, **kwargs)

        self.component_manager: SubarrayBeamComponentManager
        self._health_state: HealthState = HealthState.OK
        self._health_model: SubarrayBeamHealthModel
        self.obs_state_model: ObsStateModel
        self._missed_events: int
        self._qa_metrics: str

    def init_device(self: MccsSubarrayBeam) -> None:
        """
        Initialise the device.

        Resets the missed-event counter and the cached QA metrics, records
        the package version information and logs it.
        """
        # Set before super().init_device() so the counter exists as soon as
        # the base class starts wiring up the component manager.
        self._missed_events = 0
        super().init_device()

        self._build_state = sys.modules["ska_low_mccs"].__version_info__
        self._version_id = sys.modules["ska_low_mccs"].__version__
        device_name = type(self).__name__
        version = f"{device_name} Software Version: {self._version_id}"
        self.logger.info("\n%s\n%s", str(self.GetVersionInfo()), version)

        self._qa_metrics = ""
        self.init_completed()

    def _init_state_model(self: MccsSubarrayBeam) -> None:
        """Set up the health model, change events and observation state model."""
        super()._init_state_model()
        self._health_state = (
            HealthState.OK
        )  # Beam starts EMPTY, which is OK. InitCommand.do() does this too late.
        self._health_model = SubarrayBeamHealthModel(
            [], self._health_changed, ignore_power_state=True
        )
        self.set_change_event("healthState", True, False)
        self.set_change_event("qualityAssuranceMetrics", True, False)
        self.obs_state_model = ObsStateModel(
            logger=self.logger, callback=self._update_obs_state
        )

    def create_component_manager(
        self: MccsSubarrayBeam,
    ) -> SubarrayBeamComponentManager:
        """
        Create and return a component manager for this device.

        :return: a component manager for this device.
        """
        return SubarrayBeamComponentManager(
            self.logger,
            self.ObsCommandTimeout,
            self._communication_state_callback,
            self._component_state_callback,
            event_serialiser=self._event_serialiser,
        )

    # commands with a json schema
    AssignResources_SCHEMA: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray_beam",
            "MccsSubarrayBeam_AssignResources_3_1.json",
        )
    )
    Configure_SCHEMA: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.subarray_beam",
            "MccsSubarrayBeam_Configure_4_2.json",
        )
    )
    Scan_SCHEMA: Final = json.loads(
        importlib.resources.read_text(
            "ska_low_mccs.schemas.common",
            "Mccs_Scan_3_0.json",
        )
    )

    def started_AssignResources(self) -> None:
        """Assign Resources command started callback."""
        self.obs_state_model.perform_action("assign_invoked")

    def completed_AssignResources(self) -> None:
        """Assign Resources command completed callback."""
        self.obs_state_model.perform_action("assign_completed")

    def started_ReleaseAllResources(self) -> None:
        """Release All Resources command started callback."""
        self.obs_state_model.perform_action("release_invoked")

    def completed_ReleaseAllResources(self) -> None:
        """Release All Resources command completed callback."""
        self.obs_state_model.perform_action("release_completed")

    def started_ObsReset(self) -> None:
        """Obs Reset command started callback."""
        self.obs_state_model.perform_action("obsreset_invoked")

    def completed_ObsReset(self) -> None:
        """Obs Reset command completed callback."""
        self.obs_state_model.perform_action("obsreset_completed")

    def started_Restart(self) -> None:
        """Restart command started callback."""
        self.obs_state_model.perform_action("restart_invoked")

    def completed_Restart(self) -> None:
        """Restart command completed callback."""
        self.obs_state_model.perform_action("restart_completed")

    def started_Abort(self) -> None:
        """Abort command started callback."""
        self.obs_state_model.perform_action("abort_invoked")

    def completed_Abort(self) -> None:
        """Abort command completed callback."""
        self.obs_state_model.perform_action("abort_completed")

    def started_AbortDevice(self) -> None:
        """Abort device command started callback."""
        self.obs_state_model.perform_action("abort_invoked")

    def completed_AbortDevice(self) -> None:
        """Abort device command completed callback."""
        self.obs_state_model.perform_action("abort_completed")

    # ----------
    # Callbacks
    # ----------
    # pylint: disable=too-many-arguments, too-many-branches
    def _component_state_callback(
        self: MccsSubarrayBeam,
        power: Optional[PowerState] = None,
        obsfault: Optional[bool] = None,
        health: Optional[HealthState] = None,
        beam_locked: Optional[bool] = None,
        resources_changed: Optional[set] = None,
        configured_changed: Optional[bool] = None,
        scanning_changed: Optional[bool] = None,
        trl: Optional[str] = None,
        obsstate_changed: Optional[ObsState] = None,
        missed_event: Optional[bool] = None,
        qa_metrics: Optional[str] = None,
    ) -> None:
        """
        Handle change in the state of the component.

        This is a callback hook, called by the component manager when
        the state of the component changes.

        :param power: An optional parameter with the new power state of
            this device. It is accepted but not acted upon here.
        :param obsfault: An optional flag if the device is entering or
            exiting a fault state. Only a true value drives the
            observation state model into fault.
        :param health: An optional parameter with the new health state
            of this device, or of the sub-device named by ``trl``.
        :param beam_locked: An optional flag with the new beam locked
            state of this device.
        :param resources_changed: An optional parameter updating the resources for
            this SubarrayBeam.
        :param configured_changed: An optional flag indicating that this SubarrayBeam's
            configuration has changed.
        :param scanning_changed: An optional flag indicating that this SubarrayBeam's
            scanning status has changed.
        :param trl: An optional parameter with the TRL of the subdevice
            that the update refers to.
        :param obsstate_changed: An optional parameter with the new ObsState of
            a subservient device.
        :param missed_event: whether the component manager has detected a missed change
            event.
        :param qa_metrics: A json serialised dictionary containing
            quality assurance metrics
        """
        if trl is None:
            # No TRL: the health update is about this device itself.
            if health is not None and self._health_state != health:
                self._health_state = health
        else:
            # The update is about the sub-device identified by ``trl``.
            if health is not None:
                self._health_model.station_beam_health_changed(trl, health)
            if obsstate_changed is not None:
                if obsstate_changed == ObsState.FAULT:
                    self.component_manager._device_obs_state_fault(
                        trl, obsstate_changed
                    )
                else:
                    self.component_manager._device_obs_state_changed(
                        trl, obsstate_changed
                    )

        if beam_locked is not None:
            self._health_model.is_beam_locked_changed(beam_locked)
        if resources_changed is not None:
            self._resources_changed(resources_changed)
        if configured_changed is not None:
            self._configuration_changed(configured_changed)
        if scanning_changed is not None:
            self._scanning_changed(scanning_changed)
        # Only a true flag reports a fault; ``obsfault=False`` must not
        # push the observation state model into fault.
        if obsfault:
            self.obs_state_model.perform_action("component_obsfault")
        if missed_event:
            self._missed_events += 1
        if qa_metrics is not None:
            self._qa_metrics = qa_metrics
            self.push_change_event("qualityAssuranceMetrics", qa_metrics)

    def _communication_state_callback(
        self: MccsSubarrayBeam,
        communication_state: CommunicationStatus,
    ) -> None:
        """
        Handle change in communications status between component manager and component.

        This is a callback hook, called by the component manager when
        the communications status changes. It is implemented here to
        drive the op_state.

        :param communication_state: the status of communications between
            the component manager and its component.
        """
        action_map = {
            CommunicationStatus.DISABLED: "component_disconnected",
            CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
            CommunicationStatus.ESTABLISHED: "component_on",  # always-on device
        }

        # Every mapped value is an action name, so no None check is needed.
        self.op_state_model.perform_action(action_map[communication_state])
        # Report the actual state: this callback also fires when
        # communication is disabled or not yet established.
        self.logger.debug(
            "Communication state is now %s", communication_state.name
        )
        self._health_model.update_state(
            communicating=communication_state == CommunicationStatus.ESTABLISHED
        )

    def _configuration_changed(self: MccsSubarrayBeam, is_configured: Any) -> None:
        """
        Handle the configuration change.

        :param is_configured: truthy if the component is now configured,
            falsy if it has been unconfigured
        """
        if is_configured:
            self.obs_state_model.perform_action("component_configured")
        else:
            self.obs_state_model.perform_action("component_unconfigured")

    def _resources_changed(
        self: MccsSubarrayBeam,
        station_beam_trls: set[str],
    ) -> None:
        """
        Handle change in subarray_beam resources.

        This is a callback hook, called by the component manager when
        the resources of the subarray_beam changes.

        :param station_beam_trls: the TRLs of station beams assigned
            to this subarray
        """
        if station_beam_trls:
            self.logger.debug("component_resourced")
            self.obs_state_model.perform_action("component_resourced")
        else:
            self.logger.debug("component_unresourced")
            self.obs_state_model.perform_action("component_unresourced")
        self._health_model.update_station_beams(station_beam_trls)

    def _scanning_changed(self: MccsSubarrayBeam, is_scanning: bool) -> None:
        """
        Handle change in station beam scanning status.

        This is a callback hook, called by the component manager when
        the scanning status of the station beam changes.

        :param is_scanning: if the component is performing a scan
        """
        if is_scanning:
            self.logger.debug("component_scanning")
            self.obs_state_model.perform_action("component_scanning")
        else:
            self.logger.debug("component_not_scanning")
            self.obs_state_model.perform_action("component_not_scanning")

    def _health_changed(self: MccsSubarrayBeam, health: HealthState) -> None:
        """
        Handle change in this device's health state.

        This is a callback hook, called whenever the HealthModel's
        evaluated health state changes. It stores the new value on the
        device when it differs from the current one.

        :param health: the new health value
        """
        if self._health_state != health:
            self._health_state = health

    # ----------
    # Attributes
    # ----------
    @attribute(
        dtype="DevString",
        format="%s",
    )
    def healthModelParams(self: MccsSubarrayBeam) -> str:
        """
        Get the health params from the health model.

        :return: the health params
        """
        return json.dumps(self._health_model.health_params)

    @attribute(dtype=str)
    def qualityAssuranceMetrics(self: MccsSubarrayBeam) -> str:
        """
        Expose quality assurance metrics.

        Structure:

        >>> {
        >>>     "apertures": {
        >>>         <aperture_id>: {
        >>>             "is_beam_locked": <bool>
        >>>         },
        >>>         ...
        >>>     },
        >>>     "is_beam_locked": <bool>
        >>> }

        :return: A json serialised dictionary
        """
        return self._qa_metrics

    @healthModelParams.write  # type: ignore[no-redef]
    def healthModelParams(self: MccsSubarrayBeam, argin: str) -> None:
        """
        Set the params for health transition rules.

        :param argin: JSON-string of dictionary of health states
        """
        self._health_model.health_params = json.loads(argin)
        self._health_model.update_health()

    @attribute(dtype="DevLong", format="%i")
    def subarrayId(self: MccsSubarrayBeam) -> int:
        """
        Return the subarray id.

        :return: the subarray id
        """
        return self.component_manager.subarray_id

    # Local Subarray Beam ID (1-48). TRL uses composite of subarray and beam IDs;
    # See documentation.
    @attribute(dtype="DevLong", format="%i", max_value=48, min_value=1)
    def subarrayBeamId(self: MccsSubarrayBeam) -> int:
        """
        Return the local subarray beam id (scoped per subarray, 1-48).

        :return: the local subarray beam id
        """
        return self.component_manager.subarray_beam_id

    @attribute(dtype=(str,), max_dim_x=512)
    def stationBeamIds(self: MccsSubarrayBeam) -> list[str]:
        """
        Return the ids of station beams assigned to this subarray beam.

        :return: the station beam ids
        """
        return self.component_manager.station_beam_ids

    @stationBeamIds.write  # type: ignore[no-redef]
    def stationBeamIds(self: MccsSubarrayBeam, station_beam_ids: list[str]) -> None:
        """
        Set the station beam ids.

        :param station_beam_ids: ids of the station beams for this subarray beam
        """
        self.component_manager.station_beam_ids = station_beam_ids

    @attribute(dtype=("DevLong",), max_dim_x=512, format="%i")
    def stationIds(self: MccsSubarrayBeam) -> list[int]:
        """
        Return the station ids.

        :return: the station ids
        """
        return self.component_manager.station_ids

    @stationIds.write  # type: ignore[no-redef]
    def stationIds(self: MccsSubarrayBeam, station_ids: list[int]) -> None:
        """
        Set the station ids.

        :param station_ids: ids of the stations for this beam
        """
        self.component_manager.station_ids = station_ids

    @attribute(dtype="DevLong", format="%i", max_value=7, min_value=0)
    def logicalBeamId(self: MccsSubarrayBeam) -> int:
        """
        Return the logical beam id.

        :todo: this documentation needs to differentiate logical beam id
            from beam id

        :return: the logical beam id
        """
        return self.component_manager.logical_beam_id

    @logicalBeamId.write  # type: ignore[no-redef]
    def logicalBeamId(self: MccsSubarrayBeam, logical_beam_id: int) -> None:
        """
        Set the logical beam id.

        :param logical_beam_id: the logical beam id
        """
        self.component_manager.logical_beam_id = logical_beam_id

    @attribute(
        dtype="DevDouble",
        unit="Hz",
        standard_unit="s^-1",
        max_value=1e37,
        min_value=0,
    )
    def updateRate(self: MccsSubarrayBeam) -> float:
        """
        Return the update rate (in hertz) for this subarray beam.

        :return: the update rate for this subarray beam
        """
        return self.component_manager.update_rate

    @attribute(dtype="DevBoolean")
    def isBeamLocked(self: MccsSubarrayBeam) -> bool:
        """
        Return a flag indicating whether the beam is locked or not.

        :return: whether the beam is locked or not
        """
        return self.component_manager.is_beam_locked

    @isBeamLocked.write  # type: ignore[no-redef]
    def isBeamLocked(self: MccsSubarrayBeam, value: bool) -> None:
        """
        Set a flag indicating whether the beam is locked or not.

        :param value: whether the beam is locked or not
        """
        self.component_manager.is_beam_locked = value

    @attribute(dtype=(("DevLong",),), max_dim_y=384, max_dim_x=4)
    def channels(self: MccsSubarrayBeam) -> list[list[int]]:
        """
        Return the ids of the channels configured for this beam.

        :return: channel ids
        """
        return self.component_manager.channels

    @attribute(dtype=("DevFloat",), max_dim_x=384)
    def antennaWeights(self: MccsSubarrayBeam) -> list[float]:
        """
        Return the antenna weights configured for this beam.

        The component manager's weights are passed through
        ``complex_list_to_interleaved_float_list`` before being returned.

        :return: antenna weight
        """
        return complex_list_to_interleaved_float_list(
            self.component_manager.antenna_weights
        )

    @attribute(dtype=("DevDouble",), max_dim_x=5)
    def desiredPointing(self: MccsSubarrayBeam) -> list[float]:
        """
        Return the desired pointing of this beam.

        :return: the desired point of this beam, conforming to the Sky
            Coordinate Set definition
        """
        return self.component_manager.desired_pointing

    @desiredPointing.write  # type:ignore[no-redef]
    def desiredPointing(self: MccsSubarrayBeam, values: list[float]) -> None:
        """
        Set the desired pointing of this beam.

        * activation time (s) -- value range 0-10^37
        * azimuth position (deg) -- value range 0-360
        * azimuth speed (deg/s) -- value range 0-10^37
        * elevation position (deg) -- value range 0-90
        * elevation rate (deg/s) -- value range 0-10^37

        :param values: the desired pointing of this beam, expressed as a
            sky coordinate set
        """
        self.component_manager.desired_pointing = values

    @attribute(dtype=("DevDouble",), max_dim_x=5)
    def phaseCentre(self: MccsSubarrayBeam) -> list[float]:
        """
        Return the phase centre.

        :return: the phase centre
        """
        return self.component_manager.phase_centre

    @attribute(dtype="DevLong")
    def firstSubarrayChannel(self: MccsSubarrayBeam) -> int:
        """
        Return the first logical channel allocated to the beam.

        :return: the first logical channel allocated to the beam.
        """
        return self.component_manager.first_channel

    @attribute(dtype="DevLong")
    def numberOfChannels(self: MccsSubarrayBeam) -> int:
        """
        Return the number of channels allocated to the beam.

        :return: the number of channels allocated to the beam.
        """
        return self.component_manager.number_of_channels

    @attribute(dtype="DevString")
    def healthReport(self: MccsSubarrayBeam) -> str:
        """
        Get the health report.

        :return: the health report.
        """
        return self._health_model.health_report

    @attribute(dtype="DevLong")
    def missedEvents(self: MccsSubarrayBeam) -> int:
        """
        Get the amount of missed change events.

        Some commands rely on change events from sub-devices. Sometimes we
        miss these events; this attribute keeps track of how many we know
        we have missed.

        :return: the amount of missed change events
        """
        return self._missed_events

    @attribute(
        dtype="DevString",
        format="%s",
    )
    def targetName(self: MccsSubarrayBeam) -> str:
        """
        Get the Name of the current target.

        :return: the name of the object being pointed at.
        """
        return self.component_manager._target_name

    # --------
    # Commands
    # --------
    @stb.long_running_commands.long_running_command
    @stb.validators.validate_json_args
    def AssignResources(
        self: MccsSubarrayBeam,
        subarray_id: int,
        subarray_beam_id: int,
        first_subarray_channel: int,
        number_of_channels: int,
        apertures: dict,
    ) -> stb.type_hints.TaskFunctionType:
        """
        Assign resources to the subarray_beam with all relevant parameters.

        :param subarray_id: ID of the subarray to which the beam belongs
        :param subarray_beam_id: Id of (this) beam
        :param first_subarray_channel: First logical channel assigned to subarray
        :param number_of_channels: Number of channels assigned to beam
        :param apertures: list of dictionaries with each entry containing

            * station_id: (int) in range 1-512
            * aperture_id: (str) with format APx.y; x must match station_ID
            * station_beam_trl: (str)
            * channel_blocks: Allocated channel blocks for this station
            * hardware_beam: Allocated hardware beam for this station

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """

        def task(
            task_callback: stb.type_hints.TaskCallbackType,
            task_abort_event: threading.Event,
        ) -> None:
            # Closure binding the command arguments to the component
            # manager call; this is what the method returns.
            self.component_manager.do_assign_resources(
                subarray_id=subarray_id,
                subarray_beam_id=subarray_beam_id,
                first_subarray_channel=first_subarray_channel,
                number_of_channels=number_of_channels,
                apertures=apertures,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @stb.long_running_commands.long_running_command
    @stb.validators.validate_json_args
    def Scan(
        self: MccsSubarrayBeam, scan_id: int, **kwargs: Any
    ) -> stb.type_hints.TaskFunctionType:
        """
        Start the scan associated with the subarray_beam.

        :param scan_id: The ID for this scan
        :param kwargs: Optional arguments including:

            - start_time: UTC time for begin of scan, None for immediate start
            - duration: Scan duration in seconds. 0.0 or omitted means forever

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """

        def task(
            task_callback: stb.type_hints.TaskCallbackType,
            task_abort_event: threading.Event,
        ) -> None:
            # Forward the scan id and any optional arguments unchanged.
            self.component_manager.scan(
                scan_id=scan_id,
                **kwargs,
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task

    @stb.long_running_commands.long_running_command
    def EndScan(self: MccsSubarrayBeam) -> stb.type_hints.TaskFunctionType:
        """
        Stop the current scan associated with the subarray_beam.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """
        return self.component_manager.end_scan

    @stb.long_running_commands.long_running_command
    def ReleaseAllResources(
        self: MccsSubarrayBeam,
    ) -> stb.type_hints.TaskFunctionType:
        """
        Release all allocated resources from the subarray_beam.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """
        return self.component_manager.release_all_resources

    @stb.long_running_commands.long_running_command
    def End(self: MccsSubarrayBeam) -> stb.type_hints.TaskFunctionType:
        """
        Deconfigure the subarray_beam.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """
        return self.component_manager.end

    @stb.long_running_commands.long_running_command
    def ObsReset(self: MccsSubarrayBeam) -> stb.type_hints.TaskFunctionType:
        """
        Reset to IDLE the subarray_beam.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """
        return self.component_manager.obsreset

    @stb.long_running_commands.long_running_command
    def Restart(self: MccsSubarrayBeam) -> stb.type_hints.TaskFunctionType:
        """
        Restart to EMPTY the subarray_beam.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """
        return self.component_manager.restart

    def schedule_abort_task(
        self: MccsSubarrayBeam, task_callback: stb.type_hints.TaskCallbackType
    ) -> tuple[TaskStatus, str]:
        """
        Schedule an Abort task to begin executing immediately.

        Subclasses should override this to change the behaviour of the
        :py:meth:`!Abort()` command.

        :param task_callback: Notified of progress of the abort command.

        :return: A tuple containing TaskStatus.IN_PROGRESS and a message
        """
        return self.component_manager.abort(task_callback=task_callback)

    @stb.long_running_commands.long_running_command
    def AbortDevice(self: MccsSubarrayBeam) -> stb.type_hints.TaskFunctionType:
        """
        Abort any long-running command such as ``Configure()`` or ``Scan()``.

        This will only cancel commands on this device, not further down the hierarchy,
        use Abort() for that use case.

        :return: A tuple containing a return code and a string message
            indicating status. The message is for information
            purpose only.
        """
        return self.component_manager.abort_device
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Launch the subarray beam device server.

    :param args: positional arguments, handed to ``run_server`` as its
        ``args`` keyword (``None`` when no positional arguments are given)
    :param kwargs: named arguments, forwarded to ``run_server`` unchanged

    :return: exit code
    """
    server_args = args if args else None
    return MccsSubarrayBeam.run_server(args=server_args, **kwargs)
# Start the device server only when this module is executed as a script.
if __name__ == "__main__":
    main()