Source code for ska_mid_dish_simulators.utils

"""Shared methods in package."""

import logging
import math
import random
import time
from datetime import datetime, timezone
from functools import wraps
from itertools import takewhile

from astropy.time import Time

SKA_EPOCH = "1999-12-31T23:59:28Z"


[docs]def random_delay_execution(func): """Delay a command a bit.""" @wraps(func) def inner(*args, **kwargs): time.sleep(round(random.uniform(1.5, 2.5), 2)) return func(*args, **kwargs) return inner
_PYTHON_TO_TANGO_LOGGING_LEVEL = { logging.CRITICAL: 200, logging.ERROR: 300, logging.WARNING: 400, logging.INFO: 500, logging.DEBUG: 600, }
[docs]def raise_exception_in_cmd(func): """Raise an exception in the command.""" @wraps(func) def inner(*args, **kwargs): # args[0] is self raise_cmd_ex = args[0]._raise_cmd_exception if raise_cmd_ex: raise Exception(f"{func.__name__} raised an exception") return func(*args, **kwargs) return inner
[docs]def get_tai_timestamp_from_unix_s(unix_s: float) -> float: """ Calculate atomic time from SKA epoch (in seconds) from unix time (in seconds). :param unix_s: Unix time in seconds :return: atomic time (tai) in seconds """ unix_time = Time(unix_s, format="unix") ska_epoch_tai = Time(SKA_EPOCH, scale="utc").unix_tai return unix_time.unix_tai - ska_epoch_tai
[docs]def get_unix_s_from_tai_timestamp(tai: float) -> float: """ Calculate unix time (in seconds) from atomic time SKA epoch (in seconds). :param tai: atomic time (tai) in seconds :return: Unix time in seconds """ ska_epoch_tai = Time(SKA_EPOCH, scale="utc").unix_tai unix_tai = tai + ska_epoch_tai return Time(unix_tai, format="unix_tai").unix
[docs]def get_datetime_from_tai(tai: float) -> datetime: """ Get the datetime object of tai representation. :param tai: atomic time (tai) in seconds :return: datetime object with timezone set as utc """ unix_s = get_unix_s_from_tai_timestamp(tai) return datetime.fromtimestamp(unix_s, tz=timezone.utc)
[docs]def get_current_tai_timestamp() -> float: """Get the current time as a TAI timestamp.""" return get_tai_timestamp_from_unix_s(time.time())
[docs]def arcsec_to_degrees(val_arcsec: float) -> float: """ Convert angle from arcsec to degrees. :param val_arcsec: Angle in arcsecs :return: Angle in degrees """ return val_arcsec / 3600
[docs]def degrees_to_arcsec(val_deg: float) -> float: """ Convert angle from degrees to arcsec. :param val_deg: Angle in degrees :return: Angle in arcsec """ return val_deg * 3600
[docs]def radians_to_arcsec(radians: float) -> float: """ Convert an angle from radians to arcseconds. :param radians: Angle in radians :return: Angle in arcseconds """ return radians * (180 / math.pi) * 3600
[docs]def linearly_interpolate_points(point1, point2, segments): """ Linearly interpolate between two values. :param point1: Float value for point1. :param point2: Float value for point2. :return: List of <segments> points linearly interpolated between point1 and point2. """ def spline(x_coord): return point1 + (point2 - point1) * x_coord points = [] for i in range(segments): x_coord = i / segments points.append(spline(x_coord)) return points
[docs]def reformat_track_load_table_log_msg_args( load_mode: str, sequence_length: int, timestamp_data: list[float], az_data: list[float], el_data: list[float], max_entries_to_log: int = 10, ) -> str: """ Truncate TrackLoadTable command args sequences. This is done by removing the padded zeroes in the supplied tracktable to be shown in the log message. :param load_mode: The tracktable load mode value. :type load_mode: str :param sequence_length: The number of valid data entries in the seqeunces. :type sequence_length: int :param timestamp_data: List of tracktable timestamp values. :type timestamp_data: list[float] :param az_data: List of tracktable azimuth values. :type az_data: list[float] :param el_data: List of tracktable elevation values. :type el_data: list[float] :return: Formatted string with truncated arguments and zero padding count. :rtype: str """ # get the padded zero's - loop backwards until the first non-zero is found zero_padding_count = len(list(takewhile(lambda x: str(x) == "0.0", reversed(timestamp_data)))) end_idx = len(timestamp_data) - zero_padding_count truncated_ts_data = timestamp_data[:end_idx] truncated_az_data = az_data[:end_idx] truncated_el_data = el_data[:end_idx] def _summarize(data: list[float]) -> str: """ Summarize long sequences for logging. Example: [1, 2, 3, ..., 998, 999] (1000 entries) """ if len(data) <= max_entries_to_log: return str(data) head_count = max_entries_to_log // 2 tail_count = max_entries_to_log - head_count head = data[:head_count] tail = data[-tail_count:] return f"{head} ... {tail} ({len(data)} entries)" formatted_args = ( f"[{load_mode}, {sequence_length}, " f"{_summarize(truncated_ts_data)}, " f"{_summarize(truncated_az_data)}, " f"{_summarize(truncated_el_data)}]. " f"Track table data padded with " f"[{zero_padding_count}] zeroes" ) return formatted_args