# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS station beam device."""
from __future__ import annotations # allow forward references in type hints
import importlib.resources
import json
import sys
import threading
import time
from typing import Any, Final, Optional
import ska_tango_base as stb
from ska_control_model import (
CommunicationStatus,
HealthState,
ObsStateModel,
PowerState,
ResultCode,
TaskStatus,
)
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.obs import SKAObsDevice
from tango import AttrQuality
from tango.server import attribute, command, device_property
from ska_low_mccs.station_beam.station_beam_component_manager import (
StationBeamComponentManager,
)
from ska_low_mccs.station_beam.station_beam_health_model import StationBeamHealthModel
from ska_low_mccs.utils import complex_list_to_interleaved_float_list
from .station_beam_component_manager import FAILED_POINTING_UPDATES_NOT_TRACKING
__all__ = ["MccsStationBeam", "main"]
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]
# pylint: disable=too-many-public-methods, too-many-instance-attributes
# pylint: disable=too-many-ancestors
[docs]
class MccsStationBeam(MccsBaseDevice, SKAObsDevice):
"""An implementation of a station beam Tango device for MCCS."""
InitCommand = None # type: ignore[assignment]
# -----------------
# Device Properties
# -----------------
BeamId = device_property(dtype=int, default_value=0)
StationTRL = device_property(dtype=str, default_value="")
BeamWeightStoreTRL = device_property(dtype=str, default_value="")
DegradedPointingThreshold = device_property(dtype=int, default_value=1)
FailedPointingThreshold = device_property(dtype=int, default_value=3)
# ---------------
# Initialisation
# ---------------
[docs]
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initialise this device object.
:param args: positional args to the init
:param kwargs: keyword args to the init
"""
# We aren't supposed to define initialisation methods for Tango
# devices; we are only supposed to define an `init_device` method. But
# we insist on doing so here, just so that we can define some
# attributes, thereby stopping the linters from complaining about
# "attribute-defined-outside-init" etc. We still need to make sure that
# `init_device` re-initialises any values defined in here.
super().__init__(*args, **kwargs)
self.component_manager: StationBeamComponentManager
self._health_state: HealthState = HealthState.OK
self._health_model: StationBeamHealthModel
self.obs_state_model: ObsStateModel
self._is_beam_locked: bool | None
self._failed_pointing_updates: int | None
[docs]
def init_device(self: MccsStationBeam) -> None:
"""Initialise the device."""
super().init_device()
self._build_state = sys.modules["ska_low_mccs"].__version_info__
self._version_id = sys.modules["ska_low_mccs"].__version__
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
version = f"{device_name} Software Version: {self._version_id}"
properties = (
f"Initialised {device_name} device with properties:\n"
f"\tBeamId: {self.BeamId}\n"
f"\tStationTrl: {self.StationTRL}\n"
f"\tBeamWeightStoreTrl: {self.BeamWeightStoreTRL}\n"
f"\tDegradedPointingUpdatesThreshold: {self.DegradedPointingThreshold}\n"
f"\tFailedPointingUpdatesThreshold: {self.FailedPointingThreshold}\n"
)
self.logger.info(
"\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
)
self._is_beam_locked = None
self._failed_pointing_updates = None
self.init_completed()
def _init_state_model(self: MccsStationBeam) -> None:
super()._init_state_model()
self._health_state = (
HealthState.OK
) # Beam starts empty which is OK. InitCommand.do() does this too late.
self._health_model = StationBeamHealthModel(
self._health_changed,
ignore_power_state=True,
thresholds={
"antenna_degraded_threshold": StationBeamHealthModel.DEGRADED_CRITERIA,
"antenna_failed_threshold": StationBeamHealthModel.FAILED_CRITERIA,
"degraded_pointing_updates_threshold": self.DegradedPointingThreshold,
"failed_pointing_updates_threshold": self.FailedPointingThreshold,
},
beam_locked_callback=self._update_beam_locked,
)
self.set_change_event("healthState", True, False)
self.set_archive_event("healthState", True, False)
self.set_change_event("isBeamLocked", True, False)
self.set_archive_event("isBeamLocked", True, False)
self.set_change_event("failedPointingUpdates", True, False)
self.set_archive_event("failedPointingUpdates", True, False)
self.obs_state_model = ObsStateModel(
logger=self.logger, callback=self._update_obs_state
)
[docs]
def create_component_manager(
self: MccsStationBeam,
) -> StationBeamComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
return StationBeamComponentManager(
self.BeamId,
self.StationTRL,
self.BeamWeightStoreTRL,
self.logger,
self._communication_state_changed,
self._component_state_callback,
event_serialiser=self._event_serialiser,
)
AssignResources_SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.station_beam",
"MccsStationBeam_AssignResources_3_1.json",
)
)
Configure_SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.station_beam",
"MccsStationBeam_Configure_4_3.json",
)
)
Scan_SCHEMA: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.common",
"Mccs_Scan_3_0.json",
)
)
[docs]
def started_AssignResources(self) -> None:
"""Assign Resources command started callback."""
self.obs_state_model.perform_action("assign_invoked")
[docs]
def completed_AssignResources(self) -> None:
"""Assign Resources command completed callback."""
self.obs_state_model.perform_action("assign_completed")
[docs]
def started_ReleaseAllResources(self) -> None:
"""Release All Resources command started callback."""
self.obs_state_model.perform_action("release_invoked")
[docs]
def completed_ReleaseAllResources(self) -> None:
"""Release All Resources command completed callback."""
self.obs_state_model.perform_action("release_completed")
[docs]
def started_ObsReset(self) -> None:
"""Obs Reset command started callback."""
self.obs_state_model.perform_action("obsreset_invoked")
[docs]
def completed_ObsReset(self) -> None:
"""Obs Reset command completed callback."""
self.obs_state_model.perform_action("obsreset_completed")
[docs]
def started_Restart(self) -> None:
"""Restart command started callback."""
self.obs_state_model.perform_action("restart_invoked")
[docs]
def completed_Restart(self) -> None:
"""Restart command completed callback."""
self.obs_state_model.perform_action("restart_completed")
[docs]
def started_Abort(self) -> None:
"""Abort command started callback."""
self.obs_state_model.perform_action("abort_invoked")
[docs]
def completed_Abort(self) -> None:
"""Abort command completed callback."""
self.obs_state_model.perform_action("abort_completed")
# ----------
# Callbacks
# ----------
# pylint: disable=too-many-arguments
def _component_state_callback(
self: MccsStationBeam,
fault: Optional[bool] = None,
power: Optional[PowerState] = None,
health: Optional[HealthState] = None,
trl: Optional[str] = None,
resources_changed: Optional[bool] = None,
configured_changed: Optional[bool] = None,
scanning_changed: Optional[bool] = None,
failed_pointing_updates: Optional[bool] = None,
) -> None:
"""
Handle change in this device's state.
This is a callback hook, called whenever the state changes. It
is responsible for updating the tango side of things i.e. making
sure the attribute is up to date, and events are pushed.
:param fault: An optional flag if the device is entering or
exiting a fault state.
:param power: An optional parameter with the new power state of the device.
:param health: An optional parameter with the new health state of the device.
:param trl: The TRL of the calling device or `None` if this
device is the caller.
:param resources_changed: An optional flag to update the resourced state
:param configured_changed: An optional flag to update the configuration state
:param scanning_changed: An optional flag indicating that this beam
scanning state has changed.
:param failed_pointing_updates: number of failed updates in a row for pointing.
-1 indicates tracking has stopped.
"""
if health is not None:
if trl is None:
# Do regular health update. This device called the callback.
if self._health_state != health:
self._health_state = health
else:
# Call station health changed.
self._health_model.station_health_changed(HealthState(health))
# Probably more to do here with fault.
if fault is not None:
self._health_model.station_fault_changed(fault)
self.obs_state_model.perform_action("component_obsfault")
if failed_pointing_updates is not None:
self._failed_pointing_updates_changed(failed_pointing_updates)
if resources_changed is not None:
self._resources_changed(resources_changed)
if configured_changed is not None:
self._configuration_changed(configured_changed)
if scanning_changed is not None:
self._scanning_changed(scanning_changed)
def _communication_state_changed(
self: MccsStationBeam,
communication_state: CommunicationStatus,
) -> None:
"""
Handle change in communications status between component manager and component.
This is a callback hook, called by the component manager when
the communications status changes. It is implemented here to
drive the op_state.
:param communication_state: the status of communications between
the component manager and its component.
"""
action_map = {
CommunicationStatus.DISABLED: "component_disconnected",
CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
CommunicationStatus.ESTABLISHED: "component_on", # it's an always-on device
}
self.op_state_model.perform_action(action_map[communication_state])
self._health_model.update_state(
communicating=communication_state == CommunicationStatus.ESTABLISHED
)
def _failed_pointing_updates_changed(
self: MccsStationBeam, failed_pointing_updates: int | None
) -> None:
quality = (
AttrQuality.ATTR_INVALID
if failed_pointing_updates is not None and failed_pointing_updates < 0
else AttrQuality.ATTR_VALID
)
# Normalise failures to None if not tracking.
normalized_failures = (
None
if failed_pointing_updates == FAILED_POINTING_UPDATES_NOT_TRACKING
else failed_pointing_updates
)
self._health_model.failed_pointing_updates_changed(normalized_failures)
if failed_pointing_updates != self._failed_pointing_updates:
self._failed_pointing_updates = failed_pointing_updates
timestamp = time.time()
self.push_change_event(
"failedPointingUpdates",
self._failed_pointing_updates,
timestamp,
quality,
)
self.push_archive_event(
"failedPointingUpdates",
self._failed_pointing_updates,
timestamp,
quality,
)
def _update_beam_locked(self: MccsStationBeam, beam_locked: bool | None) -> None:
if beam_locked != self._is_beam_locked:
quality = (
AttrQuality.ATTR_INVALID
if beam_locked is None
else AttrQuality.ATTR_VALID
)
timestamp = time.time()
self._is_beam_locked = beam_locked
self.push_change_event(
"isBeamLocked", self._is_beam_locked, timestamp, quality
)
self.push_archive_event(
"isBeamLocked", self._is_beam_locked, timestamp, quality
)
def _configuration_changed(self: MccsStationBeam, is_configured: Any) -> None:
"""
Handle the configuration change.
:param is_configured: if the configuration has changed
"""
if is_configured:
self.obs_state_model.perform_action("component_configured")
else:
self.obs_state_model.perform_action("component_unconfigured")
def _resources_changed(self: MccsStationBeam, is_resourced: Any) -> None:
"""
Handle change in station beam resources.
This is a callback hook, called by the component manager when
the resources of the station beam changes.
:param is_resourced: if the component has been resourced
"""
if is_resourced:
self.logger.debug("component_resourced")
self.obs_state_model.perform_action("component_resourced")
else:
self.logger.debug("component_unresourced")
self.obs_state_model.perform_action("component_unresourced")
def _scanning_changed(self: MccsStationBeam, is_scanning: bool) -> None:
"""
Handle change in station beam scanning status.
This is a callback hook, called by the component manager when
the scanning status of the station beam changes.
:param is_scanning: if the component is performing a scan
"""
if is_scanning:
self.logger.debug("component_scanning")
self.obs_state_model.perform_action("component_scanning")
else:
self.logger.debug("component_not_scanning")
self.obs_state_model.perform_action("component_not_scanning")
def _health_changed(self: MccsStationBeam, health: HealthState) -> None:
"""
Handle change in this device's health state.
This is a callback hook, called whenever the HealthModel's
evaluated health state changes. It is responsible for updating
the tango side of things i.e. making sure the attribute is up to
date, and events are pushed.
:param health: the new health value
"""
if self._health_state != health:
self._health_state = health
# ----------
# Attributes
# ----------
@attribute(
dtype="DevString",
format="%s",
)
def healthModelParams(self: MccsStationBeam) -> str:
"""
Get the health params from the health model.
:return: the health params
"""
return json.dumps(self._health_model.health_params)
[docs]
@healthModelParams.write # type: ignore[no-redef]
def healthModelParams(self: MccsStationBeam, argin: str) -> None:
"""
Set the params for health transition rules.
:param argin: JSON-string of dictionary of health states
"""
self._health_model.health_params = json.loads(argin)
self._health_model.update_health()
[docs]
@attribute(dtype="DevLong", format="%i")
def subarrayId(self: MccsStationBeam) -> int:
"""
Return the subarray id.
:return: the subarray id
"""
return self.component_manager.subarray_id
[docs]
@attribute(dtype="DevLong", format="%i", max_value=47, min_value=0)
def beamId(self: MccsStationBeam) -> int:
"""
Return the station beam id.
:return: the station beam id
"""
return self.component_manager.beam_id
[docs]
@attribute(dtype=str)
def stationTrl(self: MccsStationBeam) -> str:
"""
Return the station TRL.
:return: the station TRL
"""
return self.component_manager._station_trl
[docs]
@attribute(dtype=str)
def beamWeightStoreTrl(self: MccsStationBeam) -> str:
"""
Return the beam weight store TRL.
:return: the beam weight store TRL
"""
return self.component_manager._beam_weight_store_trl
[docs]
@attribute(dtype="DevLong")
def stationId(self: MccsStationBeam) -> int:
"""
Return the station id.
:return: the station id
"""
return self.component_manager.station_id
[docs]
@attribute(dtype="DevLong", format="%i", max_value=7, min_value=0)
def logicalBeamId(self: MccsStationBeam) -> int:
"""
Return the logical beam id.
:todo: this documentation needs to differentiate logical beam id
from beam id
:return: the logical beam id
"""
return self.component_manager.logical_beam_id
[docs]
@attribute(
dtype="DevDouble",
unit="s",
max_value=1e37,
min_value=0,
)
def updateRate(self: MccsStationBeam) -> float:
"""
Return the update rate (in seconds) for this station beam.
:return: the update rate for this station beam
"""
return self.component_manager.update_rate
[docs]
@attribute(dtype="DevBoolean")
def isBeamLocked(self: MccsStationBeam) -> bool | None:
"""
Return a flag indicating whether the beam is locked or not.
:return: whether the beam is locked or not
"""
return self._is_beam_locked
[docs]
@attribute(dtype="DevLong")
def failedPointingUpdates(self: MccsStationBeam) -> int | None:
"""
Return how many failed pointing updates we have had in a row.
:return: how many failed pointing updates we have had in a row.
"""
return self._failed_pointing_updates
[docs]
@attribute(dtype=(("DevLong",),), max_dim_y=48, max_dim_x=8)
def channels(self: MccsStationBeam) -> list[list[int]]:
"""
Return the ids of the channels configured for this beam.
:return: channel ids
"""
return self.component_manager.channels
[docs]
@attribute(dtype=("DevFloat",), max_dim_x=384)
def antennaWeights(self: MccsStationBeam) -> list[float]:
"""
Return the antenna weights configured for this beam.
:return: antenna weightd
"""
return complex_list_to_interleaved_float_list(
self.component_manager.antenna_weights
)
[docs]
@attribute(dtype=("DevDouble",), max_dim_x=5)
def desiredPointing(self: MccsStationBeam) -> list[float] | str:
"""
Return the desired pointing of this beam.
:return: the desired point of this beam, conforming to the Sky
Coordinate Set definition
"""
return self.component_manager.desired_pointing
[docs]
@attribute(dtype=("DevDouble",), max_dim_x=384)
def pointingDelay(self: MccsStationBeam) -> list[float]:
"""
Return the pointing delay of this beam.
:return: the pointing delay of this beam
"""
return self.component_manager.pointing_delay
[docs]
@attribute(dtype=("DevDouble",), max_dim_x=384)
def pointingDelayRate(self: MccsStationBeam) -> list[float]:
"""
Return the pointing delay rate of this beam.
:return: the pointing delay rate of this beam
"""
return self.component_manager.pointing_delay_rate
[docs]
@attribute(dtype=("DevDouble",), max_dim_x=5)
def phaseCentre(self: MccsStationBeam) -> list[float]:
"""
Return the phase centre.
:return: the phase centre
"""
return self.component_manager.phase_centre
[docs]
@attribute(dtype="DevString")
def apertureId(self: MccsStationBeam) -> str:
"""
Return the Aperture Id.
:return: the station beam Aperture Id, in the form APx.y y=substation
"""
return self.component_manager.aperture_id
[docs]
@attribute(dtype="DevString")
def pointingReferenceFrame(self: MccsStationBeam) -> str:
"""
Return the Reference Frame ID for the pointing coordinates.
:return: the Reference Frame ID for the pointing coordinates
"""
return self.component_manager._pointing_reference_frame
[docs]
@attribute(dtype="DevString")
def pointingTimestamp(self: MccsStationBeam) -> str:
"""
Return the timestamp used for the pointing coordinates.
:return: the timestamp used for the pointing coordinates.
"""
return self.component_manager._pointing_timestamp
[docs]
@attribute(dtype="DevString")
def calibrationId(self: MccsStationBeam) -> str:
"""
Return the calibration ID set during the last Configure.
:return: the calibration ID, or an empty string if none was resolved.
"""
return self.component_manager._calibration_id
[docs]
@attribute(dtype="DevLong")
def hardwareBeamId(self: MccsStationBeam) -> int:
"""
Return the timestamp used for the pointing coordinates.
:return: the timestamp used for the pointing coordinates.
"""
return self.component_manager._hardware_beam_id
[docs]
@attribute(dtype="DevLong")
def scanId(self: MccsStationBeam) -> int:
"""
Return the scan ID for the current scan.
:return: the scan ID or 0 if not scanning
"""
return self.component_manager._scan_id
[docs]
@attribute(dtype="DevString")
def healthReport(self: MccsStationBeam) -> str:
"""
Get the health report.
:return: the health report.
"""
return self._health_model.health_report
# --------
# Commands
# --------
[docs]
@stb.long_running_commands.long_running_command
@stb.validators.validate_json_args
def AssignResources(
self: MccsStationBeam,
subarray_id: int,
subarray_beam_id: int,
station_id: int,
station_trl: str,
channel_blocks: list[int],
hardware_beam: int,
aperture_id: str,
first_subarray_channel: int,
number_of_channels: int,
**kwargs: Any,
) -> stb.type_hints.TaskFunctionType:
"""
Assign resources to the station beam with all relevant parameters.
:param subarray_id: ID of the subarray to which the beam belongs
:param subarray_beam_id: ID of the subarray beam
:param station_id: ID of the associated station
:param station_trl: TRL of the associated station
:param channel_blocks: List of the allocated station channel blocks
:param hardware_beam: Allocated station hardware beam
:param first_subarray_channel: First channel
:param number_of_channels: Number of channels
:param aperture_id: ID of the aperture in format "APx.y"
:param kwargs: optional arguments including:
- interface.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.do_assign(
subarray_id=subarray_id,
subarray_beam_id=subarray_beam_id,
station_id=station_id,
station_trl=station_trl,
channel_blocks=channel_blocks,
hardware_beam=hardware_beam,
aperture_id=aperture_id,
first_subarray_channel=first_subarray_channel,
number_of_channels=number_of_channels,
**kwargs,
task_callback=task_callback,
task_abort_event=task_abort_event,
)
return task
[docs]
@stb.long_running_commands.long_running_command
def ApplyPointing(self: MccsStationBeam) -> stb.type_hints.TaskFunctionType:
"""
Apply pointing delays to antennas associated with the station_beam.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.apply_pointing(
task_callback=task_callback,
task_abort_event=task_abort_event,
)
return task
[docs]
@stb.long_running_commands.long_running_command
@stb.validators.validate_json_args
def Scan(
self: MccsStationBeam, scan_id: int, **kwargs: Any
) -> stb.type_hints.TaskFunctionType:
"""
Start the scan associated with the station_beam.
:param scan_id: The ID for this scan
:param kwargs: Optional arguments including:
- start_time: UTC time for begin of scan, None for immediate start
- duration: Scan duration in seconds. 0.0 or omitted means forever
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.scan(
scan_id=scan_id,
**kwargs,
task_callback=task_callback,
task_abort_event=task_abort_event,
)
return task
[docs]
@stb.long_running_commands.long_running_command
def EndScan(self: MccsStationBeam) -> stb.type_hints.TaskFunctionType:
"""
Stop the current scan associated with the station_beam.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.component_manager.end_scan
[docs]
def schedule_abort_task(
self: MccsStationBeam, task_callback: stb.type_hints.TaskCallbackType
) -> tuple[TaskStatus, str]:
"""
Schedule an Abort task to begin executing immediately.
Subclasses should override this to change the behaviour of the
:py:meth:`!Abort()` command.
:param task_callback: Notified of progress of the abort command.
:return: A tuple containing TaskStatus.IN_PROGRESS and a message
"""
return self.component_manager.abort(task_callback)
[docs]
@stb.long_running_commands.long_running_command
def ObsReset(self: MccsStationBeam) -> stb.type_hints.TaskFunctionType:
"""
Reset to IDLE the station_beam.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.component_manager.obsreset
[docs]
@stb.long_running_commands.long_running_command
def Restart(self: MccsStationBeam) -> stb.type_hints.TaskFunctionType:
"""
Restart to EMPTY the station_beam.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.component_manager.restart
[docs]
@stb.long_running_commands.long_running_command
def End(self: MccsStationBeam) -> stb.type_hints.TaskFunctionType:
"""
Deconfigure the station_beam.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.component_manager.end
[docs]
@stb.long_running_commands.long_running_command
def ReleaseAllResources(
self: MccsStationBeam,
) -> stb.type_hints.TaskFunctionType:
"""
Release all allocated resources from the station_beam.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
return self.component_manager.release_all_resources
[docs]
@command(dtype_out="DevVarLongStringArray")
def ToFault(
self: MccsStationBeam,
) -> DevVarLongStringArrayType:
"""
Put this station beam in ObsState.FAULT.
This is for use in testing only.
:return: A tuple containing a return code and a string message
indicating status. The message is for information
purpose only.
"""
self.obs_state_model._straight_to_state("FAULT")
return ([ResultCode.OK], ["Device sent to FAULT state."])
# ----------
# Run server
# ----------
[docs]
def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return MccsStationBeam.run_server(args=args or None, **kwargs)
if __name__ == "__main__":
main()