from __future__ import annotations
import json
import logging
import os
from collections import namedtuple
from ssl import SSLEOFError
from urllib.error import URLError
import backoff
import toolz
from ska_telmodel_client import TMData
from tango import Database, DeviceProxy
from ska_oso_scripting.engineering.low.utils.tango_utils import get_device
LOG = logging.getLogger(__name__)
PASD_FIELDS = "station pasdbus fndh smartboxes"
PASDDevices = namedtuple("PASDDevices", PASD_FIELDS)
MCCS_FIELDS = "controller stations subarrays subarray_beams station_beams"
MCCSDevices = namedtuple("MCCSDevices", MCCS_FIELDS)
CSP_FIELDS = "controller subarrays"
CSPDevices = namedtuple("CSPDevices", CSP_FIELDS)
SPS_FIELDS = "station subracks tpms daqs"
SPSDevices = namedtuple("SPSDevices", SPS_FIELDS)
CBF_FIELDS = "controller allocator connector processors delaypoly cnics subarrays"
CBFDevices = namedtuple("CBFDevices", CBF_FIELDS)
SDP_FIELDS = "controller queue_connectors subarrays"
SDPDevices = namedtuple("SDPDevices", SDP_FIELDS)
TMC_FIELDS = (
"central_node "
+ "csp_master_leafnode csp_subarray_leafnodes "
+ "mccs_master_leafnode mccs_subarray_leafnodes "
+ "sdp_master_leafnode sdp_subarray_leafnodes "
+ "tmc_subarray_nodes"
)
TMCDevices = namedtuple("TMCDevices", TMC_FIELDS)
[docs]
def get_tmc_devices() -> TMCDevices:
"""
Returns TMC Tango devices.
Of which the fields are:
central_node
csp_master_leafnode csp_subarray_leafnodes
mccs_master_leafnode mccs_subarray_leafnodes
sdp_master_leafnode sdp_subarray_leafnodes
tmc_subarray_nodes
These devices are derived by following Tango properties from the TMC
central node. This device has properties containing the TRLs of each
of the subsystem controller leaf nodes, and to the TMC subarray nodes.
The TMC subarray nodes have a property containing the TRL of the
corresponding subarray leaf node device for each subsystem.
Therefore, it is possible for TMC devices to exist that are not returned
by this function, if they are not referenced directly or transitively
via the Tango properties of the TMC central node.
:return: TMCDevice namedtuple containing central node, controller leaf
node, subarray node and subarray leaf node devices.
"""
database: Database = Database()
[cn_dev_name] = database.get_device_exported_for_class("LowTmcCentralNode")
central_node = get_device(cn_dev_name)
cn_props = central_node.get_property(central_node.get_property_list("*"))
csp_master_leafnode = get_device(cn_props["CspMasterLeafNodeFQDN"][0])
sdp_master_leafnode = get_device(cn_props["SdpMasterLeafNodeFQDN"][0])
mccs_master_leafnode = get_device(cn_props["MccsMasterLeafNodeFQDN"][0])
tmc_subarray_nodes = [get_device(trl) for trl in cn_props["TMCSubarrayNodes"]]
sdp_subarray_leafnodes = []
csp_subarray_leafnodes = []
mccs_subarray_leafnodes = []
for subarray_node in tmc_subarray_nodes:
sa_props = subarray_node.get_property(subarray_node.get_property_list("*"))
sdp_subarray_leafnodes.append(get_device(sa_props["SdpSubarrayLNFQDN"][0]))
csp_subarray_leafnodes.append(get_device(sa_props["CspSubarrayLNFQDN"][0]))
mccs_subarray_leafnodes.append(get_device(sa_props["MccsSubarrayLNFQDN"][0]))
return TMCDevices(
central_node,
csp_master_leafnode,
csp_subarray_leafnodes,
mccs_master_leafnode,
mccs_subarray_leafnodes,
sdp_master_leafnode,
sdp_subarray_leafnodes,
tmc_subarray_nodes,
)
[docs]
def get_mccs_devices() -> MCCSDevices:
"""
Return stations, subarray beams, station beams, and subarrays.
"""
database = get_mccs_tangodb()
controller = get_mccs_device(
database.get_device_exported_for_class("MccsController")[0]
)
props = controller.get_property(controller.get_property_list("*"))
def get_subdevices(prop):
return [get_mccs_device(trl) for trl in props[prop]]
stations = get_subdevices("MccsStations")
subarray_beams = get_subdevices("MccsSubarrayBeams")
station_beams = get_subdevices("MccsStationBeams")
subarrays = get_subdevices("MccsSubarrays")
return MCCSDevices(controller, stations, subarrays, subarray_beams, station_beams)
[docs]
def get_cbf_devices() -> CBFDevices:
"""
CBF Devices.
:return: CBFDevices named tuple
"""
database: Database = Database()
controller = get_device("low-cbf/control/0")
allocator = get_device("low-cbf/allocator/0")
connector = get_device("low-cbf/connector/0")
delaypoly = get_device("low-cbf/delaypoly/0")
cnics = [
get_device(trl) for trl in database.get_device_exported_for_class("CnicDevice")
]
subarrays = [
get_device(trl)
for trl in database.get_device_exported_for_class("LowCbfSubarray")
]
processors = [
get_device(trl)
for trl in database.get_device_exported_for_class("LowCbfProcessor")
]
return CBFDevices(
controller, allocator, connector, processors, delaypoly, cnics, subarrays
)
[docs]
def get_sdp_devices() -> SDPDevices:
"""
SDP Devices.
:return: A named tuple instance holding SDP devices.
"""
database: Database = Database()
controller = get_device("low-sdp/control/0")
queue_connectors = [
get_device(trl)
for trl in database.get_device_exported_for_class("SDPQueueConnector")
]
subarrays = [
get_device(trl) for trl in database.get_device_exported_for_class("SDPSubarray")
]
return SDPDevices(controller, queue_connectors, subarrays)
[docs]
def get_mccs_tangodb() -> Database:
"""
Get the Database object for the Tango DB containing MCCS devices.
:return: a tango.Database object representing the MCCS Tango database
"""
mccs_host = os.getenv("TANGO_HOST_MCCS")
return Database(*split_host_port(mccs_host, 10000) if mccs_host else [])
[docs]
def get_mccs_device(trl: str, timeout_ms=10000) -> DeviceProxy:
"""
Grab the Tango device using a TRL, using TANGO_HOST_MCCS.
:params trl: the fully qualified domain name
:return device: The Tango device
"""
return get_device(trl, os.getenv("TANGO_HOST_MCCS"), timeout_ms=timeout_ms)
[docs]
def get_stations():
"""
Returns a list of MccsStation devices.
:param host: the Tango database host
:return: A list of MccsStation devices
"""
database = get_mccs_tangodb()
return [
get_mccs_device(trl)
for trl in database.get_device_exported_for_class("MccsStation")
]
[docs]
def split_host_port(endpoint, default_port=4660):
"""Split a host:port string into a tuple of (host, port)."""
host, port, *_ = endpoint.split(":") + [default_port]
return host, int(port)
@backoff.on_exception(backoff.expo, (SSLEOFError, URLError), max_tries=5)
def load_tmdata(*args, **kwargs) -> TMData:
"""Wrap TMData with backoff to handle flaky networks.
Arguments are as for TMData.__init__().
:returns: A TMData object.
"""
return TMData(*args, **kwargs)
@toolz.memoize
def get_telescope_name() -> str:
"""
Returns the "telescope" field from the low-layout.json file.
"""
tmdata = load_tmdata()
tmdata_file = "instrument/ska1_low/layout/low-layout.json"
json_string = tmdata[tmdata_file].get().decode()
return json.loads(json_string)["telescope"]
@toolz.memoize
def _station_id_label_mappings() -> tuple[dict[int, str], dict[str, int]]:
"""
Returns mapping from station ID to label and vice versa.
"""
tmdata = load_tmdata()
tmdata_file = "instrument/ska1_low/layout/low-layout.json"
json_string = tmdata[tmdata_file].get().decode()
stations = json.loads(json_string)["receptors"]
ids_labels = [(stn["station_id"], stn["station_label"]) for stn in stations]
return (
dict(ids_labels),
dict(map(reversed, ids_labels)),
)
[docs]
def station_label_from_id(station_id: int) -> str:
"""
Returns the station label for a given station ID.
"""
ids_to_labels, _ = _station_id_label_mappings()
return ids_to_labels[station_id]