# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS station device."""
# pylint: disable=too-many-lines
from __future__ import annotations
import functools
import importlib.resources
import json
import logging
import sys
from typing import Any, Callable, Final, Optional
import numpy as np
import tango
from ska_control_model import CommunicationStatus, HealthState, PowerState, ResultCode
from ska_tango_base.base import CommandTracker
from ska_tango_base.commands import FastCommand, JsonValidator, SubmittedSlowCommand
from ska_tango_base.obs import SKAObsDevice
from tango.server import attribute, command, device_property
from ska_low_mccs.station.station_component_manager import StationComponentManager
from ska_low_mccs.station.station_health_model import StationHealthModel
from ska_low_mccs.station.station_obs_state_model import StationObsStateModel
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]
__all__ = ["MccsStation", "main"]
# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs]class MccsStation(SKAObsDevice):
"""An implementation of a station Tango device for MCCS."""
# -----------------
# Device Properties
# -----------------
# All properties below are handed to the StationComponentManager in
# ``create_component_manager``; the defaults describe a station with no
# sub-devices configured.
# Identifier of this station (logged zero-padded to three digits).
StationId = device_property(dtype=int, default_value=0)
# Name of the field station sub-device; an empty string means "none".
FieldStationName = device_property(dtype=str, default_value="")
# TRLs of the antenna sub-devices monitored by the health model.
AntennaTrls = device_property(dtype=(str,), default_value=[])
# Per-antenna coordinates; the three lists are zipped together into a
# single location array in ``create_component_manager``.
# NOTE(review): presumably all three lists are expected to have the same
# length as AntennaTrls - confirm against the component manager.
AntennaXs = device_property(dtype=(float,), default_value=[])
AntennaYs = device_property(dtype=(float,), default_value=[])
AntennaZs = device_property(dtype=(float,), default_value=[])
# Per-antenna identifiers, passed through to the component manager.
AntennaIDs = device_property(dtype=(int,), default_value=[])
# TRLs of the calibrator and SPS station sub-devices; "" means "none".
StationCalibratorTrl = device_property(dtype=str, default_value="")
SpsStationTrl = device_property(dtype=str, default_value="")
# Station reference position, passed through to the component manager.
RefLatitude = device_property(dtype=float, default_value=0.0)
RefLongitude = device_property(dtype=float, default_value=0.0)
RefHeight = device_property(dtype=float, default_value=0.0)
# ---------------
# Initialisation
# ---------------
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Construct the device object and declare its instance attributes.

    :param args: positional args to the init
    :param kwargs: keyword args to the init
    """
    # Tango devices are meant to be set up through `init_device` rather
    # than through an initialiser. One is defined here regardless, purely
    # so that the attributes below are declared in `__init__` and linters
    # do not report "attribute-defined-outside-init". `init_device` must
    # still (re-)initialise every value declared here.
    super().__init__(*args, **kwargs)

    # Declarations only: these annotate the attribute types without
    # assigning a value.
    self._refLatitude: float
    self._refLongitude: float
    self._refHeight: float
    self._delay_centre: list[float]
    self.component_manager: StationComponentManager
    self._obs_state_model: StationObsStateModel
    self._health_model: StationHealthModel

    # The only attribute that receives a value at construction time.
    self._health_state: HealthState = HealthState.UNKNOWN
def init_device(self: MccsStation) -> None:
    """
    Initialise the device.

    This is overridden here to change the Tango serialisation model to
    ``NO_SYNC`` before the base class initialisation runs. It also logs
    the software version together with the device properties, and warns
    when the device has been deployed without any sub-devices.
    """
    util = tango.Util.instance()
    util.set_serial_model(tango.SerialModel.NO_SYNC)
    super().init_device()

    package = sys.modules["ska_low_mccs"]
    # Stored as a string, matching what InitCommand.do() stores for the
    # same attribute.
    self._build_state = str(package.__version_info__)
    self._version_id = package.__version__

    device_name = type(self).__name__
    version = f"{device_name} Software Version: {self._version_id}"
    properties = (
        f"Initialised {device_name} device with properties:\n"
        f"\tStationId: {self.StationId:03}\n"
        f"\tReferenceLatitude: {self.RefLatitude:03}\n"
        f"\tReferenceLongitude: {self.RefLongitude:03}\n"
        f"\tReferenceHeight: {self.RefHeight:03}\n"
        f"\tFieldStationName: {self.FieldStationName}\n"
        f"\tStationCalibratorTrl: {self.StationCalibratorTrl}\n"
        f"\tSpsStationTrl: {self.SpsStationTrl}\n"
        f"\tAntennaTrls: {self.AntennaTrls}\n"
        f"\tAntennaXs: {self.AntennaXs}\n"
        f"\tAntennaYs: {self.AntennaYs}\n"
        f"\tAntennaZs: {self.AntennaZs}\n"
        f"\tAntennaIDs: {self.AntennaIDs}\n"
    )
    self.logger.info(
        "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
    )

    single_subdevices = (
        self.FieldStationName,
        self.StationCalibratorTrl,
        self.SpsStationTrl,
    )
    # AntennaTrls is a sequence of strings, so "no antennas" is simply an
    # empty sequence; comparing each element against [] can never match.
    has_no_subdevices = (
        all(name == "" for name in single_subdevices)
        and len(self.AntennaTrls) == 0
    )
    if has_no_subdevices:
        self.logger.warning(
            "%s initialised with no subdevices. The device will "
            "report PowerState.ON and CommunicationStatus.ESTABLISHED",
            device_name,
        )
def _init_state_model(self: MccsStation) -> None:
    """Create the observation-state and health models for this device."""
    super()._init_state_model()

    self._obs_state_model = StationObsStateModel(
        self.logger, self._update_obs_state
    )

    # InitCommand.do() does this too late.
    self._health_state = HealthState.UNKNOWN

    antenna_thresholds = {
        "antenna_degraded": 0.05,
        "antenna_failed": 0.2,
    }
    self._health_model = StationHealthModel(
        self.FieldStationName,
        self.SpsStationTrl,
        self.AntennaTrls,
        self.health_changed,
        thresholds=antenna_thresholds,
    )

    self.set_change_event("healthState", True, False)
def create_component_manager(self: MccsStation) -> StationComponentManager:
    """
    Build the component manager from this device's properties.

    :return: a component manager for this device.
    """
    # One (x, y, z) row per antenna, built from the three coordinate lists.
    xyz_rows = list(zip(self.AntennaXs, self.AntennaYs, self.AntennaZs))
    antenna_station_locations = np.array(xyz_rows)

    component_manager = StationComponentManager(
        self.StationId,
        self.RefLatitude,
        self.RefLongitude,
        self.RefHeight,
        self.FieldStationName,
        self.AntennaTrls,
        antenna_station_locations,
        self.AntennaIDs,
        self.StationCalibratorTrl,
        self.SpsStationTrl,
        self.logger,
        self._communication_state_changed,
        self._component_state_callback,
    )
    return component_manager
# JSON schemas used to validate the arguments of the Scan,
# ConfigureSemiStatic and TrackObject commands. They are read once, when the
# class body is executed, via the ``importlib.resources.files`` API, which is
# the same mechanism GetPointingDelaysCommand uses for its schema.
_schema_scan: Final = json.loads(
    importlib.resources.files("ska_low_mccs.schemas.station")
    .joinpath("MccsStation_Scan_3_0.json")
    .read_text()
)
_schema_configure_semi_static: Final = json.loads(
    importlib.resources.files("ska_low_mccs.schemas.station")
    .joinpath("MccsStation_ConfigureSemiStatic_3_0.json")
    .read_text()
)
_schema_track_object: Final = json.loads(
    importlib.resources.files("ska_low_mccs.schemas.station")
    .joinpath("MccsStation_TrackObject_3_0.json")
    .read_text()
)
def init_command_objects(self: MccsStation) -> None:
    """Register a handler object for each command of this device."""
    super().init_command_objects()

    # Each row: (Tango command name, component manager method name,
    # JSON schema for the argument or None when no validation is applied).
    # LoadCalibrationCoefficients and ApplyCalibration are intentionally
    # absent: they are not registered by this device.
    slow_commands: list[tuple[str, str, Any]] = [
        ("ApplyConfiguration", "apply_configuration", None),
        (
            "ConfigureSemiStatic",
            "configure_semi_static",
            self._schema_configure_semi_static,
        ),
        ("ConfigureChannels", "configure_channels", None),
        ("DeallocateSubarray", "deallocate_subarray", None),
        ("ApplyPointingDelays", "apply_pointing_delays", None),
        ("LoadPointingDelays", "load_pointing_delays", None),
        ("TrackObject", "track_object", self._schema_track_object),
        ("Scan", "scan", self._schema_scan),
        ("EndScan", "end_scan", None),
        ("AcquireDataForCalibration", "acquire_data_for_calibration", None),
        ("StopTracking", "stop_tracking", None),
        ("StopTrackingAll", "stop_tracking_all", None),
        ("Initialise", "initialise", None),
    ]

    for name, manager_method, json_schema in slow_commands:
        if json_schema is None:
            arg_validator = None
        else:
            arg_validator = JsonValidator(name, json_schema, logger=self.logger)

        slow_command = SubmittedSlowCommand(
            name,
            self._command_tracker,
            self.component_manager,
            manager_method,
            callback=None,
            logger=self.logger,
            validator=arg_validator,
        )
        self.register_command_object(name, slow_command)

    # Commands that have their own dedicated handler classes.
    start_acquisition = self.StartAcquisitionCommand(
        self._command_tracker,
        self.component_manager,
        callback=None,
        logger=self.logger,
    )
    self.register_command_object("StartAcquisition", start_acquisition)

    get_pointing_delays = self.GetPointingDelaysCommand(
        self.component_manager,
        logger=self.logger,
    )
    self.register_command_object("GetPointingDelays", get_pointing_delays)
class InitCommand(SKAObsDevice.InitCommand):
    """
    A class for :py:class:`~.MccsStation`'s Init command.

    The :py:meth:`~.MccsStation.InitCommand.do` method below is called
    upon :py:class:`~.MccsStation`'s initialisation.
    """

    def do(
        self: MccsStation.InitCommand,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[ResultCode, str]:
        """
        Initialise the :py:class:`.MccsStation`.

        Resets the device's private attributes to their initial values,
        configures change/archive events for a fixed set of attributes and
        then runs the base class initialisation.

        :param args: positional args to the component manager method
        :param kwargs: keyword args to the component manager method

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        device = self._device

        # Initial values of the device's private state.
        device._subarray_id = 0
        device._refLatitude = 0.0
        device._refLongitude = 0.0
        device._refHeight = 0.0
        device._beam_trls = []
        device._transient_buffer_trl = ""
        device._delay_centre = []
        device._calibration_coefficients = []
        device._is_calibrated = False
        device._calibration_job_id = 0
        device._daq_job_id = 0
        device._data_directory = ""

        package = sys.modules["ska_low_mccs"]
        device._build_state = str(package.__version_info__)
        device._version_id = package.__version__

        # (attribute name, third argument given to set_*_event). Only
        # beamTrls passes True as the third argument.
        event_settings = (
            ("beamTrls", True),
            ("transientBufferTrl", False),
            ("outsideTemperature", False),
            ("dataReceivedResult", False),
        )
        for attribute_name, detect in event_settings:
            device.set_change_event(attribute_name, True, detect)
            device.set_archive_event(attribute_name, True, detect)

        super().do()
        return (ResultCode.OK, "Initialisation complete")
def is_On_allowed(self: MccsStation) -> bool:
    """
    Check if command `On` is allowed in the current device state.

    :return: ``True`` if the command is allowed
    """
    current_state = self.get_state()
    permitted_states = (
        tango.DevState.OFF,
        tango.DevState.STANDBY,
        tango.DevState.ON,
        tango.DevState.UNKNOWN,
        tango.DevState.FAULT,
    )
    return current_state in permitted_states
# ----------
# Callbacks
# ----------
def _communication_state_changed(
    self: MccsStation,
    communication_state: CommunicationStatus,
) -> None:
    """
    React to a change in communication status with the component.

    The component manager calls this hook whenever the status of its
    communication with the component changes. The new status is used to
    drive the op_state and is reported to the health model.

    :param communication_state: the status of communications between
        the component manager and its component.
    """
    op_state_actions = {
        CommunicationStatus.DISABLED: "component_disconnected",
        CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
        # Nothing to do yet: wait for a power mode update.
        CommunicationStatus.ESTABLISHED: None,
    }
    op_state_action = op_state_actions[communication_state]
    if op_state_action is not None:
        self.op_state_model.perform_action(op_state_action)

    is_established = communication_state == CommunicationStatus.ESTABLISHED
    self._health_model.update_state(communicating=is_established)
# pylint: disable=too-many-arguments, too-many-branches
def _component_state_callback(
    self: MccsStation,
    power: Optional[PowerState] = None,
    fault: Optional[bool] = None,
    health: Optional[HealthState] = None,
    trl: Optional[str] = None,
    is_configured: Optional[bool] = None,
    outside_temperature: Optional[float] = None,
    data_received_result: Optional[tuple[str, str]] = None,
) -> None:
    """
    Handle change in the state of the component.

    This is a callback hook, called by the component manager when
    the state of the component changes. Depending on ``trl`` the update
    concerns either the station itself (``trl`` is None) or one of its
    sub-devices, whose family is taken from the second ``/``-separated
    element of the TRL.

    For the station itself, ``fault`` (and ``power`` when given) are passed
    to the health model and ``power`` drives the op_state. For a
    sub-device, ``power`` and ``health`` are forwarded to the matching
    component manager / health model handler.

    :param power: An optional parameter with the new power state of the device.
    :param fault: An optional parameter if the device is entering or
        exiting a fault state.
    :param health: An optional parameter with the new health state of the
        device. It is only acted upon when ``trl`` identifies a
        sub-device; a health value reported without a TRL is logged and
        ignored, because the station's own health is evaluated by its
        health model.
    :param trl: TRL of the device whose state has changed.
        None if the device is a station.
    :param is_configured: An optional flag indicating whether the
        Station is configured.
    :param outside_temperature: the outside temperature reported
        by the field station.
    :param data_received_result: the dataReceivedResult reported
        by the SPS station.

    :raises ValueError: If TRL not found
    """
    # Stays None for the station itself: there is no per-device health
    # handler in that case (previously this name was left unbound).
    health_state_changed_callback: Optional[Callable[..., Any]] = None
    power_state_changed_callback: Callable[..., Any]

    if trl is None:
        power_state_changed_callback = self._component_power_state_changed
        if power is not None:
            self._health_model.update_state(fault=fault, power=power)
        else:
            self._health_model.update_state(fault=fault)
    else:
        trl_parts = trl.split("/")
        # A TRL without a family element falls through to the ValueError
        # below instead of failing with an IndexError.
        device_family = trl_parts[1] if len(trl_parts) > 1 else ""
        if device_family == "fieldstation":
            health_state_changed_callback = functools.partial(
                self._health_model.field_station_health_changed, trl
            )
            power_state_changed_callback = (
                self.component_manager._field_station_power_state_changed
            )
        elif device_family == "antenna":
            health_state_changed_callback = functools.partial(
                self._health_model.antenna_health_changed, trl
            )
            power_state_changed_callback = functools.partial(
                self.component_manager._antenna_power_state_changed, trl
            )
        elif device_family == "spsstation":
            health_state_changed_callback = functools.partial(
                self._health_model.sps_station_health_changed, trl
            )
            power_state_changed_callback = (
                self.component_manager._sps_station_power_state_changed
            )
        else:
            raise ValueError(
                f"Unknown TRL '{trl}', should be None or belong to antenna,"
                " tile, spsstation or fieldstation"
            )

    if power is not None:
        # Power updates are serialised through the component manager's lock.
        with self.component_manager.power_state_lock:
            power_state_changed_callback(power)

    if health is not None:
        if health_state_changed_callback is None:
            self.logger.warning(
                "Ignoring health %s reported without a TRL; station health "
                "is evaluated by the health model.",
                health,
            )
        else:
            health_state_changed_callback(health)

    if is_configured is not None:
        self._obs_state_model.is_configured_changed(is_configured)
    if outside_temperature is not None:
        self._outside_temperature_changed(outside_temperature)
    if data_received_result is not None:
        self._data_received_result_changed(data_received_result)
def _component_power_state_changed(
    self: MccsStation,
    power_state: PowerState,
) -> None:
    """
    React to a change in the power mode of the component.

    The component manager calls this hook when the power mode of the
    component changes; the new mode is translated into the matching
    op_state action.

    :param power_state: the power mode of the component.
    """
    op_state_actions = {
        PowerState.OFF: "component_off",
        PowerState.STANDBY: "component_standby",
        PowerState.ON: "component_on",
        PowerState.UNKNOWN: "component_unknown",
    }
    op_state_action = op_state_actions[power_state]
    self.op_state_model.perform_action(op_state_action)
def health_changed(self: MccsStation, health: HealthState) -> None:
    """
    React to a change in this device's evaluated health state.

    The health model calls this hook whenever its evaluated health
    changes. The hook keeps the Tango side up to date: it stores the new
    value and pushes a ``healthState`` change event. Nothing happens when
    the value equals the one already stored.

    :param health: the new health value
    """
    if self._health_state == health:
        return
    self._health_state = health
    self.push_change_event("healthState", health)
def _outside_temperature_changed(
    self: MccsStation,
    outside_temperature: float,
) -> None:
    """
    Publish a new outside temperature.

    The value originates from the field station and is pushed on as a
    change event of the ``outsideTemperature`` attribute.

    :param outside_temperature: the outside temperature
    """
    event_attribute = "outsideTemperature"
    self.push_change_event(event_attribute, outside_temperature)
def _data_received_result_changed(
    self: MccsStation,
    data_received_result: tuple[str, str],
) -> None:
    """
    Publish a new dataReceivedResult.

    The value originates from the SPS station and is pushed on as a
    change event of the ``dataReceivedResult`` attribute.

    :param data_received_result: the data received result
    """
    event_attribute = "dataReceivedResult"
    self.push_change_event(event_attribute, data_received_result)
# ----------
# Attributes
# ----------
@attribute(dtype="float", label="refLongitude")
def refLongitude(self: MccsStation) -> float:
    """
    Read the reference longitude from the component manager.

    :return: the WGS84 Longitude of the station reference position
    """
    longitude = self.component_manager.ref_longitude
    return longitude
@attribute(dtype="float", label="refLatitude")
def refLatitude(self: MccsStation) -> float:
    """
    Read the reference latitude from the component manager.

    :return: the WGS84 Latitude of the station reference position
    """
    latitude = self.component_manager.ref_latitude
    return latitude
@attribute(dtype="float", label="refHeight", unit="meters")
def refHeight(self: MccsStation) -> float:
    """
    Read the reference height from the component manager.

    :return: the ellipsoidal height of the station reference position
    """
    height = self.component_manager.ref_height
    return height
@attribute(dtype="DevString", format="%s")
def transientBufferTrl(self: MccsStation) -> str:
    """
    Return the TRL of the TANGO device that manages the transient buffer.

    :return: the TRL of the TANGO device that manages the
        transient buffer
    """
    buffer_trl = self._transient_buffer_trl
    return buffer_trl
@attribute(
    dtype="DevBoolean",
)
def isCalibrated(self: MccsStation) -> bool:
    """
    Report whether this station is currently calibrated.

    :return: a flag indicating whether this station is currently
        calibrated or not.
    """
    calibrated = self._is_calibrated
    return calibrated
@attribute(dtype="DevLong", format="%i")
def calibrationJobId(self: MccsStation) -> int:
    """
    Read the stored calibration job id.

    :return: the calibration job id
    """
    job_id = self._calibration_job_id
    return job_id
@attribute(dtype="DevLong", format="%i")
def daqJobId(self: MccsStation) -> int:
    """
    Read the stored DAQ job id.

    :return: the DAQ job id
    """
    job_id = self._daq_job_id
    return job_id
@attribute(dtype="DevString", format="%s")
def dataDirectory(self: MccsStation) -> str:
    """
    Read the stored data directory.

    This is the parent directory for all files generated by this station.

    :return: the data directory
    """
    directory = self._data_directory
    return directory
@attribute(dtype=("DevString",), max_dim_x=8, format="%s")
def beamTrls(self: MccsStation) -> list[str]:
    """
    Read the TRLs of the station beams associated with this station.

    :return: the TRLs of station beams associated with this station
    """
    trls = self._beam_trls
    return trls
@attribute(dtype=("DevFloat",), max_dim_x=2)
def delayCentre(self: MccsStation) -> list[float]:
    """
    Read the WGS84 position of the delay centre of the station.

    :todo: WGS84 is a datum. What is the coordinate system? Latitude
        and longitude? Or is it SUTM50 eastings and northings?
        Either way, do we need to allow for elevation too?

    :return: the WGS84 position of the delay centre of the station
    """
    centre = self._delay_centre
    return centre


@delayCentre.write  # type: ignore[no-redef]
def delayCentre(self: MccsStation, value: list[float]) -> None:
    """
    Store a new delay centre for the station.

    The value is stored as given; no validation is applied here.

    :param value: WGS84 position
    """
    new_centre = value
    self._delay_centre = new_centre
@attribute(dtype=("DevFloat",), max_dim_x=512)
def calibrationCoefficients(self: MccsStation) -> list[float]:
    """
    Read the stored calibration coefficients for the station.

    :todo: How big should this array be? 4 complex values (Jones
        matrix) per channel. This station can have
        up to 16 tiles of up to 16 antennas, so that
        is 8 x 16 x 16 = 2048 coefficients per channel. But how
        many channels? 384 channels, 786432 elements
        per station (402M for SKA Low)

    :return: the calibration coefficients
    """
    coefficients = self._calibration_coefficients
    return coefficients
@attribute(dtype="DevString", format="%s")
def healthModelParams(self: MccsStation) -> str:
    """
    Read the health model's parameters as a JSON string.

    :return: the health params
    """
    params = self._health_model.health_params
    return json.dumps(params)


@healthModelParams.write  # type: ignore[no-redef]
def healthModelParams(self: MccsStation, argin: str) -> None:
    """
    Replace the parameters used by the health transition rules.

    The JSON string is decoded, handed to the health model and the health
    is then re-evaluated.

    :param argin: JSON-string of dictionary of health states
    """
    new_params = json.loads(argin)
    self._health_model.health_params = new_params
    self._health_model.update_health()
@attribute(
    dtype="DevLong",
)
def numberOfChannels(self: MccsStation) -> int:
    """
    Read the total number of channels in the beamformer.

    :return: the total number of channels
    """
    channel_count = self.component_manager._number_of_channels
    return channel_count
@attribute(
    dtype="DevBoolean",
)
def isSynchronised(self: MccsStation) -> bool:
    """
    Report whether every tile in the SpsStation is synchronised.

    The check is case-insensitive and is vacuously true when the component
    manager reports no tile programming states.

    :return: true if all tiles in the SpsStation are synchronised.
    """
    for state in self.component_manager.tileprogrammingstate:
        if state.lower() != "synchronised":
            return False
    return True
@attribute(
    dtype="DevBoolean",
)
def isInitialised(self: MccsStation) -> bool:
    """
    Report whether every tile in the SpsStation is initialised.

    The check is case-insensitive and is vacuously true when the component
    manager reports no tile programming states.

    :return: true if all tiles in the SpsStation are initialised.
    """
    for state in self.component_manager.tileprogrammingstate:
        if state.lower() != "initialised":
            return False
    return True
@attribute(
    dtype="float",
    label="OutsideTemperature",
)
def outsideTemperature(self: MccsStation) -> Optional[float]:
    """
    Read the outside temperature held by the component manager.

    :return: the OutsideTemperature.
    """
    temperature = self.component_manager.outside_temperature
    return temperature
@attribute(
    dtype="DevString",
)
def healthReport(self: MccsStation) -> str:
    """
    Read the health report from the health model.

    :return: the health report.
    """
    report = self._health_model.health_report
    return report
# max_dim_x=2: always the last result (unique_id, JSON-encoded result)
@attribute(dtype=("str",), max_dim_x=2)
def dataReceivedResult(self: MccsStation) -> tuple[str, str] | None:
    """
    Read the result of the receiving of data.

    :return: A tuple containing the data mode of transmission and a json
        string with any additional data about the data such as the file
        name.
    """
    result = self.component_manager.data_received_result
    return result
@attribute(
    dtype=("float",),
    max_dim_x=513,
)
def lastPointingDelays(self: MccsStation) -> list[float]:
    """
    Read the last pointing delays applied to the tiles.

    Values are initialised to 0.0 if they haven't been set.
    These values are in antenna EEP order.

    :returns: last pointing delays applied to the tiles.
    """
    delays = self.component_manager.last_pointing_delays
    return delays
# --------
# Commands
# --------
@command(dtype_in="DevVarDoubleArray", dtype_out="DevVarLongStringArray")
def LoadPointingDelays(
    self: MccsStation,
    argin: np.ndarray,
) -> DevVarLongStringArrayType:
    """
    Set the pointing delay parameters of this Station's Tiles.

    :param argin: an array containing a beam index followed by antenna delays
        in antenna EEP order. 1 + 256*2 = 513 elements.

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("LoadPointingDelays", delay_list)
    """
    load_delays = self.get_command_object("LoadPointingDelays")
    return_code, message = load_delays(argin)
    return ([return_code], [message])
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def ApplyPointingDelays(self: MccsStation, argin: str) -> DevVarLongStringArrayType:
    """
    Apply the pointing delay parameters of this Station's Tiles.

    :param argin: switch time, in ISO formatted time. Default: now

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> time_string = switch time as ISO formatted time
    >>> dp.command_inout("ApplyPointingDelays", time_string)
    """
    apply_delays = self.get_command_object("ApplyPointingDelays")
    return_code, message = apply_delays(argin)
    return ([return_code], [message])
@command(dtype_in="DevLong", dtype_out="DevVarLongStringArray")
def DeallocateSubarray(
    self: MccsStation, subarray_id: int
) -> DevVarLongStringArrayType:
    """
    Deallocates entries relative to a subarray in aggregate tables.

    :param subarray_id: the ID of the subarray to deallocate

    :return: A tuple containing a return code and a string message
        indicating status. The message is for information
        purpose only.
    """
    # The component manager is called directly here rather than through
    # the "DeallocateSubarray" command object registered in
    # init_command_objects.
    result_code = self.component_manager.deallocate_subarray(subarray_id)
    return ([result_code], ["DeallocateSubarray completed"])
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def ApplyConfiguration(
    self: MccsStation,
    transaction_id: str,
) -> DevVarLongStringArrayType:
    """
    Apply the aggregated channel table to this Station's SpsStation.

    :param transaction_id: transaction id for the configuration

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("ApplyConfiguration", transaction_id)
    """
    apply_configuration = self.get_command_object("ApplyConfiguration")
    return_code, message = apply_configuration(transaction_id)
    return ([return_code], [message])
@command(
    dtype_in="DevString",
    dtype_out="DevVarLongStringArray",
)
def TrackObject(self: MccsStation, argin: str) -> DevVarLongStringArrayType:
    """
    Track an object through the sky.

    :param argin: Configuration parameters encoded in a json string

    :return: A tuple containing the result code of the submission and the
        id of the tracking thread, as a string.
    """
    # The id is read before the command is submitted; it is this value,
    # not the handler's own message, that is returned to the caller.
    id_before_submission = self.component_manager.tracking_id
    track_object = self.get_command_object("TrackObject")
    result_code, _unused_message = track_object(argin)
    return ([result_code], [str(id_before_submission)])
@command(
    dtype_in="DevLong",
    dtype_out="DevVarLongStringArray",
)
def StopTracking(self: MccsStation, track_id: int) -> DevVarLongStringArrayType:
    """
    Stop tracking an object.

    :param track_id: The ID of the thread you wish to stop tracking.

    :return: A tuple containing a return code and a string message
        indicating status. The message is for information
        purpose only.
    """
    stop_tracking = self.get_command_object("StopTracking")
    result_code, unique_id = stop_tracking(track_id)
    return ([result_code], [unique_id])
@command(
    dtype_out="DevVarLongStringArray",
)
def StopTrackingAll(self: MccsStation) -> DevVarLongStringArrayType:
    """
    Stop all tracking.

    :return: A tuple containing a return code and a string message
        indicating status. The message is for information
        purpose only.
    """
    stop_all = self.get_command_object("StopTrackingAll")
    result_code, unique_id = stop_all()
    return ([result_code], [unique_id])
class GetPointingDelaysCommand(FastCommand):
    """Class for handling the GetPointingDelays() command."""

    # Schema used to validate the JSON argument of the command.
    SCHEMA: Final = json.loads(
        importlib.resources.files("ska_low_mccs.schemas.station")
        .joinpath("MccsStation_GetPointingDelays_3_0.json")
        .read_text()
    )

    def __init__(
        self: MccsStation.GetPointingDelaysCommand,
        component_manager: StationComponentManager,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise a new instance.

        :param component_manager: the component manager that computes the
            pointing delays for this command.
        :param logger: a logger for this command to use.
        """
        self._component_manager = component_manager
        super().__init__(
            logger,
            JsonValidator("GetPointingDelays", self.SCHEMA, logger),
        )

    def do(
        self: MccsStation.GetPointingDelaysCommand,
        *args: Any,
        **kwargs: Any,
    ) -> np.ndarray:
        """
        Implement :py:meth:`MccsStation.GetPointingDelaysCommand` command.

        The recognised keyword arguments are ``pointing_type``, ``values``,
        ``time_step`` (10 when absent) and ``reference_time``; the others
        default to None.

        :param args: unspecified positional arguments. This should be empty and is
            provided for type hinting only
        :param kwargs: keyword arguments forwarded to the component
            manager's ``get_pointing_delays``

        :return: whatever the component manager's ``get_pointing_delays``
            returns
        """
        return self._component_manager.get_pointing_delays(
            pointing_type=kwargs.get("pointing_type", None),
            values=kwargs.get("values", None),
            time_step=kwargs.get("time_step", 10),
            reference_time=kwargs.get("reference_time", None),
        )
@command(dtype_in="DevString", dtype_out="DevVarDoubleArray")
def GetPointingDelays(self: MccsStation, argin: str) -> np.ndarray:
    """
    Get Pointing Coefficients.

    :param argin: stringified dict of args

    :return: The pointing delays as pairs of (delay, delay rate) in EEP order.
    """
    get_delays = self.get_command_object("GetPointingDelays")
    delays = get_delays(argin)
    return delays
@command(
    dtype_in="DevString",
    dtype_out="DevVarLongStringArray",
)
def Scan(self: MccsStation, argin: str) -> DevVarLongStringArrayType:
    """
    Start the scan associated with the station_beam.

    :param argin: Configuration parameters encoded in a json string

    :return: A tuple containing a return code and a string message
        indicating status. The message is for information
        purpose only.
    """
    scan = self.get_command_object("Scan")
    result_code, unique_id = scan(argin)
    return ([result_code], [unique_id])
@command(
    dtype_in="DevLong",
    dtype_out="DevVarLongStringArray",
)
def EndScan(self: MccsStation, subarray_id: int) -> DevVarLongStringArrayType:
    """
    Stop the current scan associated with the station_beam.

    :param subarray_id: the subarray for which the command applies

    :return: A tuple containing a return code and a string message
        indicating status. The message is for information
        purpose only.
    """
    end_scan = self.get_command_object("EndScan")
    result_code, unique_id = end_scan(subarray_id)
    return ([result_code], [unique_id])
@command(
    dtype_in="DevLong",
    dtype_out="DevVarLongStringArray",
)
def AcquireDataForCalibration(
    self: MccsStation,
    channel: int,
) -> DevVarLongStringArrayType:
    """
    Instruct the SpsStation to start acquiring calibration data from the tiles.

    :param channel: the frequency channel to calibrate for.

    :return: A tuple containing a return code and a string message
        indicating status. The message is for information
        purpose only.
    """
    acquire = self.get_command_object("AcquireDataForCalibration")
    result_code, unique_id = acquire(channel)
    return ([result_code], [unique_id])
class StartAcquisitionCommand(SubmittedSlowCommand):
    # pylint: disable=line-too-long
    """
    Class for handling the StartAcquisition() command.

    This command takes as input a JSON string that conforms to the
    following schema:

    .. literalinclude:: /../../src/ska_low_mccs/schemas/common/Mccs_StartAcquisition.json
       :language: json
    """  # noqa: E501

    # Schema used to validate the JSON argument of the command. Loaded via
    # ``importlib.resources.files``, the same API that
    # GetPointingDelaysCommand uses for its schema.
    SCHEMA: Final = json.loads(
        importlib.resources.files("ska_low_mccs.schemas.common")
        .joinpath("Mccs_StartAcquisition.json")
        .read_text()
    )

    def __init__(
        self: MccsStation.StartAcquisitionCommand,
        command_tracker: CommandTracker,
        component_manager: StationComponentManager,
        callback: Optional[Callable] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """
        Initialise a new instance.

        The command is submitted to the component manager's
        ``start_acquisition`` method, and its argument is validated
        against :py:attr:`SCHEMA`.

        :param command_tracker: the device's command tracker
        :param component_manager: the device's component manager
        :param callback: an optional callback to be called when this
            command starts and finishes.
        :param logger: a logger for this command to log with.
        """
        validator = JsonValidator("StartAcquisition", self.SCHEMA, logger)
        super().__init__(
            "StartAcquisition",
            command_tracker,
            component_manager,
            "start_acquisition",
            callback=callback,
            logger=logger,
            validator=validator,
        )
@command(
    dtype_in="DevString",
    dtype_out="DevVarLongStringArray",
)
def StartAcquisition(self: MccsStation, argin: str) -> DevVarLongStringArrayType:
    """
    Start data acquisition.

    :param argin: json dictionary with optional keywords:

        * start_time - (ISO UTC time) start time
        * delay - (int) delay start if StartTime is not specified, default 2s

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.
    """
    start_acquisition = self.get_command_object("StartAcquisition")
    return_code, unique_id = start_acquisition(argin)
    return ([return_code], [unique_id])
@command(
    dtype_out="DevVarLongStringArray",
)
def Initialise(self: MccsStation) -> DevVarLongStringArrayType:
    """
    Initialise this station's tiles.

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.
    """
    initialise = self.get_command_object("Initialise")
    return_code, unique_id = initialise()
    return ([return_code], [unique_id])
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Run the device server for :py:class:`MccsStation`.

    :param args: positional arguments
    :param kwargs: named arguments

    :return: exit code
    """
    # An empty argument tuple is passed on as None.
    server_args = args if args else None
    return MccsStation.run_server(args=server_args, **kwargs)


if __name__ == "__main__":
    main()