Source code for talondx

#!/usr/bin/env python3
import argparse
import copy
import getpass
import logging
import os
import re
import subprocess
import time
from enum import Enum

import requests
import tango
from bite_device_client.bite_client import BiteClient
from lmc_interface import LmcInterface
from requests.structures import CaseInsensitiveDict
from talondx_config.talondx_config import TalonDxConfig
from tango import DeviceProxy
from tqdm import tqdm

LOG_FORMAT = "[talondx.py: line %(lineno)s]%(levelname)s: %(message)s"


class bcolors:
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    OK = "\x1b[6;30;42m"
    FAIL = "\x1b[0;30;41m"
    ENDC = "\x1b[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"


[docs]class Target(Enum): TALON_1 = "talon1" TALON_2 = "talon2" TALON_3 = "talon3" TALON_4 = "talon4" TALON_5 = "talon5" TALON_6 = "talon6" TALON_7 = "talon7" TALON_8 = "talon8" DELL = "dell"
[docs]class Version: """ Class to facilitate extracting and comparing version numbers in filenames. :param filename: string containing a version substring in the x.y.z format, where x,y,z are numbers. """ def __init__(self, filename): [ver_x, ver_y, ver_z] = re.findall("[0-9]+", filename) self.X = int(ver_x) self.Y = int(ver_y) self.Z = int(ver_z)
[docs] def match(self, ver): """ Compare two Version object and return true if the versions match. :param ver: Version object being compared to this one. """ return self.X == ver.X and self.Y == ver.Y and self.Z == ver.Z
POWER_SWITCH_USER = os.environ.get("POWER_SWITCH_USER") POWER_SWITCH_PASS = os.environ.get("POWER_SWITCH_PASS") PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) ARTIFACTS_DIR = os.path.join(PROJECT_DIR, "artifacts") TALONDX_CONFIG_FILE = os.path.join(ARTIFACTS_DIR, "talondx-config.json") DOWNLOAD_CHUNK_BYTES = 1024 TALONDX_STATUS_OUTPUT_DIR = os.environ.get("TALONDX_STATUS_OUTPUT_DIR") TALON_UNDER_TEST = os.environ.get("TALON_UNDER_TEST") GITLAB_PROJECTS_URL = "https://gitlab.drao.nrc.ca/api/v4/projects/" GITLAB_API_HEADER = { "PRIVATE-TOKEN": f'{os.environ.get("GIT_ARTIFACTS_TOKEN")}' } NEXUS_API_URL = "https://artefact.skatelescope.org/service/rest/v1/" RAW_REPO_USER = os.environ.get("RAW_USER_ACCOUNT") RAW_REPO_PASS = os.environ.get("RAW_USER_PASS")
[docs]class PowerSwitchState(Enum): ON = "ON" OFF = "OFF" UNKNOWN = "???"
[docs]class PowerSwitch: """ Class to manage the network-controlled power switch. """ def __init__(self): self._base_url = "http://192.168.0.100/restapi/relay/outlets/" # self.state = PowerSwitchState(PowerSwitchState.UNKNOWN) self.outlets = "0" # Only one Talon LRU currently supported
[docs] def state(self): """ Queries the power switch state and returns the result as PowerSwitchState enum """ api_url = f"{self._base_url}={self.outlets}/state/" header = CaseInsensitiveDict() header["Accept"] = "application/json" response = requests.get( url=api_url, headers=header, auth=(POWER_SWITCH_USER, POWER_SWITCH_PASS), ) if response.status_code in [ requests.codes.ok, # pylint: disable=no-member requests.codes.multi_status, # pylint: disable=no-member ]: return self.convert_reponse_to_state(response.text) else: logger_.info( f"Error: unrecognized or failed power switch response: {response}" ) return PowerSwitchState.UNKNOWN
@staticmethod def convert_reponse_to_state(response): if "true" in response and "false" not in response: return PowerSwitchState.ON elif "false" in response and "true" not in response: return PowerSwitchState.OFF else: return PowerSwitchState.UNKNOWN
[docs] def off(self): """ Check the switch state, then powers it off if the switch state is not already off; otherwise no action. NOTE: it is strongly recommended to shut down the Talon boards prior to powering off. """ pwr_state = self.state() if pwr_state != PowerSwitchState.OFF: logger_.info("Powering off...") header = CaseInsensitiveDict() header["Accept"] = "application/json" header["X-CSRF"] = "x" header["Content-Type"] = "application/x-www-form-urlencoded" data = "value=false" response = requests.put( url=f"{self._base_url}={self.outlets}/state/", data=data, headers=header, auth=(POWER_SWITCH_USER, POWER_SWITCH_PASS), ) if response.status_code in [ requests.codes.ok, # pylint: disable=no-member requests.codes.multi_status, # pylint: disable=no-member ]: countdown_message( message="Waiting to ensure power off ...", count=5 ) # check state after power off logger_.info( f"Power Switch (outlets: {self.outlets}): {self.state().value}" ) else: logger_.info( f"Power off request failed - response: {response.status_code}, {response.text}" ) else: logger_.info( f"No action - power switch (outlets: {self.outlets}): {pwr_state.value}" )
def countdown_message(message, count, delay_step=1): countdown = tqdm(range(count)) for c in countdown: countdown.set_description(f"{message} [{(count - c):>2}]") time.sleep(1) logger_.info("") def make_dir(target, dir): logger_.info(f"Creating directory {dir} on {target.value} ...") try: proc = subprocess.Popen( ["ssh", f"root@{target.value}", f"mkdir -p {dir}"] ) count = 0 while proc.poll() is None: count += 1 logger_.info("." * count, end="\r") time.sleep(0.2) except Exception as e: logger_.info(f"Directory creation failed (Exception: {e}).")
[docs]def get_device_version_info(config_commands): """ Reads and displays the `dsVersionId`, `dsBuildDateTime`, and `dsGitCommitHash` attributes of each HPS Tango device running on the Talon DX boards, as specified in the configuration commands -- ref `"config_commands"` in the talondx-config JSON file. :param config_commands: JSON array of configure commands :type config_commands: str """ for config_cmd in config_commands: logger_.info("================") logger_.info(f"Target: {config_cmd['target']}") logger_.info("================") devices = config_cmd["devices"] devices.insert(0, "dshpsmaster") for dev_name in get_device_fqdns( devices, config_cmd["server_instance"] ): try: dev_proxy = DeviceProxy(dev_name) if dev_proxy.import_info().exported: logger_.info(f"{dev_proxy.info().dev_class:<20}{dev_name}") attr_names = [ "dsVersionId", "dsBuildDateTime", "dsGitCommitHash", ] for attr_name in attr_names: try: attr_value = dev_proxy.read_attribute(attr_name) logger_.info( f" {attr_value.name:<20}: {attr_value.value}" ) except Exception as attr_except: logger_.info( f"Error reading attribute: {attr_except}" ) else: logger_.info(f"{dev_name} DEVICE NOT EXPORTED!") except Exception as proxy_except: logger_.info( f"Error on DeviceProxy ({dev_name}): {proxy_except}" )
[docs]def get_device_fqdn_list(): """ Get full list of fully-qualified device names (FQDNs) from the Tango database, excluding "sys" and "dserver" names. Ref: https://tango-controls.readthedocs.io/en/latest/tutorials-and-howtos/how-tos/how-to-pytango.html :returns: alphabetically-sorted list of FQDNs (str) """ try: db = tango.Database() except Exception as db_except: logger_.info(f"Database error: {db_except}") exit() instances = [] for server in db.get_server_list(): # Filter out the unwanted items from the list to get just the FQDNs of our devices... # the full list from get_device_class_list() has the structure: # [device name, class name, device name, class name, ...] # and also includes the admin server (dserver/exec_name/instance) instances += [ dev for dev in db.get_device_class_list(server) if "/" in dev and not dev.startswith("dserver") and not dev.startswith("sys") ] return sorted(instances)
[docs]def get_device_fqdns(devices, server_inst): """ Generate list of fully-qualified device names (FQDNs) from Tango database for the given list of devices and server instance. Ref: https://tango-controls.readthedocs.io/en/latest/tutorials-and-howtos/how-tos/how-to-pytango.html :param devices: device names :type devices: list of str :param server_inst: server instance :type server_inst: string :returns: list of FQDNs (str) """ try: db = tango.Database() except Exception as db_except: logger_.info(f"Database error: {db_except}") exit() dev_names = [] server_list = db.get_server_list() for ds in devices: for server in server_list: if server_inst in server and ds in server: # Filter out the unwanted items from the list to get just the FQDNs of our devices... # the full list from get_device_class_list() has the structure: # [device name, class name, device name, class name, ...] # and also includes the admin server (dserver/exec_name/instance) dev_names += [ dev for dev in db.get_device_class_list(server) if "/" in dev and not dev.startswith("dserver") ] return dev_names
[docs]def get_device_status(config_commands): """ Reads and displays the state and status of each HPS Tango device running on the Talon DX boards, as specified in the configuration commands -- ref `"config_commands"` in the talondx-config JSON file. :param config_commands: JSON array of configure commands :type config_commands: str """ for config_cmd in config_commands: logger_.info("================") logger_.info(f"Target: {config_cmd['target']}") logger_.info("================") devices = copy.deepcopy(config_cmd["devices"]) devices.insert(0, "dshpsmaster") for dev_name in get_device_fqdns( devices, config_cmd["server_instance"] ): try: dev_proxy = DeviceProxy(dev_name) except Exception as proxy_except: logger_.info( f"Error on DeviceProxy {dev_name}: {proxy_except}" ) break if dev_proxy.import_info().exported: try: ds_state = str(dev_proxy.state()) ds_status = dev_proxy.status() logger_.info( f"{dev_name:<50}: state {ds_state:<8} status={ds_status}" ) except Exception as status_except: logger_.info( f"Error reading state or status of {dev_name}: {status_except}" ) else: logger_.info(f"{dev_name} DEVICE NOT EXPORTED!")
if __name__ == "__main__": logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) logger_ = logging.getLogger("talondx.py") logger_.info(f"User: {getpass.getuser()}") parser = argparse.ArgumentParser(description="Talon DX Utility") parser.add_argument( "-v", "--verbose", help="increase output verbosity", action="store_true", ) parser.add_argument( "--db-list", help="list the FQDNs (fully qualified domain names) of the HPS and MCS devices in the Tango database", action="store_true", ) parser.add_argument( "--dish-packet-capture", help="Start the Dish Packet Capture", action="store_true", ) parser.add_argument( "--talon-version", help="get the version information of the Tango devices running on the Talon DX boards", action="store_true", ) parser.add_argument( "--talon-status", help="get the status information of the Tango devices running on the Talon DX boards", action="store_true", ) parser.add_argument( "--talon-power-status", help="get the status of the Talon LRU power supply", action="store_true", ) parser.add_argument( "--mcs-off", help="run the MCS Off command sequence", action="store_true", ) parser.add_argument( "--mcs-on", help="run the MCS On command sequence", action="store_true" ) parser.add_argument( "--scan", help="run the scan subarray commands", action="store_true" ) parser.add_argument( "--add-receptors", help="Add the receptors to subarray", action="store_true", ) parser.add_argument( "--configure-scan", help="run the subarray ConfigureScan command", action="store_true", ) parser.add_argument( "--update-delay-model", help="Update delay model to subarray", action="store_true", ) parser.add_argument( "--end-scan", help="ends the scan subarray commands", action="store_true", ) parser.add_argument( "--write-talon-status", help="write talon board status to file", action="store_true", ) parser.add_argument("--board", type=str) parser.add_argument("--vis_ip_address", type=str) parser.add_argument("--sim_mode", type=str) args = parser.parse_args() if args.db_list: logger_.info("DB List") for inst in get_device_fqdn_list(): logger_.info(inst) elif args.dish_packet_capture: logger_.info("Dish Packet Capture") subprocess.run( "python3 ./mellanox_dish_packet_capture/src/PlotSampleData.py ./mellanox_dish_packet_capture/src/default_inputs.json ./mellanox_dish_packet_capture/src/default_inputs.json", shell=True, ) elif args.talon_version: logger_.info("Talon Version Information") config = TalonDxConfig(config_file=TALONDX_CONFIG_FILE) get_device_version_info(config.config_commands()) elif args.talon_status: logger_.info("Talon Status Information") config = TalonDxConfig(config_file=TALONDX_CONFIG_FILE) while True: os.system("clear") get_device_status(config.config_commands()) time.sleep(2) elif args.talon_power_status: pwr = PowerSwitch() logger_.info( f"Power Switch (outlets: {pwr.outlets}): {pwr.state().value}" ) elif args.mcs_off: lmc_interface = LmcInterface() if args.sim_mode == "1": lmc_interface.simulation_mode = True elif args.sim_mode == "0": lmc_interface.simulation_mode = False lmc_interface.off_command() elif args.mcs_on: lmc_interface = LmcInterface() if args.sim_mode == "1": lmc_interface.simulation_mode = True elif args.sim_mode == "0": lmc_interface.simulation_mode = False lmc_interface.on_command() elif args.add_receptors: lmc_interface = LmcInterface() lmc_interface.add_receptors(int(args.board)) elif args.configure_scan: lmc_interface = LmcInterface() lmc_interface.configure_scan(int(args.board), args.vis_ip_address) elif args.update_delay_model: lmc_interface = LmcInterface() lmc_interface.update_delay_model(int(args.board)) elif args.scan: lmc_interface = LmcInterface() lmc_interface.scan() elif args.end_scan: lmc_interface = LmcInterface() lmc_interface.end_scan() elif args.write_talon_status: logger_.info("Print Talon Status") config = TalonDxConfig(config_file=TALONDX_CONFIG_FILE) for command in config.config_commands(): if "ska-talondx-status-ds" in command["devices"]: bite = BiteClient(command["server_instance"], False) bite.init_devices( "bite_device_client/json/device_server_list.json" ) bite.write_talon_status( "bite_device_client/json/status_attr_list.json", TALONDX_STATUS_OUTPUT_DIR, ) else: logger_.info("Hello from Mid CBF Engineering Console!")