Source code for ptp_scheduler

# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
PTP (Precision Time Protocol) Peripheral ICL (Instrument Control Layer)
with Scheduling.
Abstracts the FPGA registers into a more user-friendly interface for reading
the time and setting transmit/receive start/stop times, and provides utility
functions for format conversion.
"""
from datetime import datetime
from decimal import Decimal

from ska_low_cbf_fpga import IclField

from ska_low_cbf_sw_cnic.icl.ptp import Ptp

TIME_STR_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

TIMESTAMP_BITS = 80
"""48 bits (integer) seconds, 32 bits of nanoseconds"""
TIMESTAMP_NS_BITS = 32


[docs]def combine_ptp_registers(upper: IclField, lower: IclField, sub: IclField) -> int: """ Combine 3x PTP registers into an 80 bit PTP timestamp :param upper: Upper (most significant) seconds register :param lower: Lower seconds register :param sub: Sub-seconds (nanoseconds) register """ return (upper.value << 64) | (lower.value << 32) | sub.value
[docs]def datetime_from_str(time_str: str) -> datetime: """ Convert user-supplied string to datetime object :param time_str: "%Y-%m-%d %H:%M:%S[.%f]" (microseconds is optional) """ try: return datetime.strptime(time_str, TIME_STR_FORMAT) except ValueError: return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
[docs]def ptp_ts_from_float(ts: float) -> int: """ Convert a UNIX timestamp to PTP 80 bit value :param ts: UNIX timestamp :return: 80 bit PTP value (48 bits of seconds and 32 bits of nanoseconds) """ seconds, fractional_seconds = divmod(ts, 1) seconds = int(seconds) & 0xFFFF_FFFF_FFFF nanoseconds = int(fractional_seconds * 1_000_000_000) return (seconds << TIMESTAMP_NS_BITS) | nanoseconds
[docs]def split_datetime(t: datetime) -> (int, int, int): """ Split a datetime into 3 register values :param t: target time to be decoded :return: seconds upper 32 bits, seconds lower 32 bits, sub seconds (nanoseconds) """ ptp_ts = ptp_ts_from_float(t.timestamp()) upper = (ptp_ts & 0xFFFF_0000_0000_0000_0000) >> 64 lower = (ptp_ts & 0x0000_FFFF_FFFF_0000_0000) >> 32 subsec = ptp_ts & 0x0000_0000_0000_FFFF_FFFF return upper, lower, subsec
[docs]def time_str_from_registers(upper: IclField, lower: IclField, sub: IclField) -> str: """ Combine 3 PTP time registers and render as string :param upper: Upper (most significant) seconds register :param lower: Lower seconds register :param sub: Sub-seconds (nanoseconds) register """ timestamp = unix_ts_from_ptp(combine_ptp_registers(upper, lower, sub)) dt = datetime.fromtimestamp(float(timestamp)) return dt.strftime(TIME_STR_FORMAT)
[docs]def unix_ts_from_ptp(ptp_timestamp: int) -> Decimal: """Get UNIX timestamp from 80 bit PTP value""" ns_mask = (1 << TIMESTAMP_NS_BITS) - 1 sub_seconds = Decimal(ptp_timestamp & ns_mask) / Decimal(1e9) seconds = ptp_timestamp >> TIMESTAMP_NS_BITS return seconds + sub_seconds
[docs]class PtpScheduler(Ptp): """ ICL for PTP with Scheduling """ @property def unix_timestamp(self) -> IclField[float]: """Get current time (UNIX ts)""" return IclField( value=float( unix_ts_from_ptp( combine_ptp_registers( self.current_ptp_seconds_upper, self.current_ptp_seconds_lower, self.current_ptp_sub_seconds, ) ) ), description="Current UNIX time", type_=float, ) @property def time(self) -> IclField[str]: """Get current time""" return IclField( value=( time_str_from_registers( self.current_ptp_seconds_upper, self.current_ptp_seconds_lower, self.current_ptp_sub_seconds, ) ), description="Current time", type_=str, ) @property def tx_start_time(self) -> IclField[str]: """Read the scheduled transmission start time""" return IclField( description="Transmit Start Time", type_=str, value=time_str_from_registers( self.tx_start_ptp_seconds_upper, self.tx_start_ptp_seconds_lower, self.tx_start_ptp_sub_seconds, ), ) @tx_start_time.setter def tx_start_time(self, start_time: str) -> None: """ Schedule a transmission start time :param start_time: time to start at, see datetime_from_str for string format. Use empty string or None to disable """ if start_time: ( self.tx_start_ptp_seconds_upper, self.tx_start_ptp_seconds_lower, self.tx_start_ptp_sub_seconds, ) = split_datetime(datetime_from_str(start_time)) self.schedule_control_tx_start_time = bool(start_time) @property def tx_stop_time(self) -> IclField[str]: """Read the scheduled transmission stop time""" return IclField( description="Transmit Stop Time", type_=str, value=time_str_from_registers( self.tx_stop_ptp_seconds_upper, self.tx_stop_ptp_seconds_lower, self.tx_stop_ptp_sub_seconds, ), ) @tx_stop_time.setter def tx_stop_time(self, stop_time: str) -> None: """ Schedule a transmission stop time :param stop_time: time to stop at, see datetime_from_str for string format. Use empty string or None to disable """ if stop_time: ( self.tx_stop_ptp_seconds_upper, self.tx_stop_ptp_seconds_lower, self.tx_stop_ptp_sub_seconds, ) = split_datetime(datetime_from_str(stop_time)) self.schedule_control_tx_stop_time = bool(stop_time) @property def rx_start_time(self) -> IclField[str]: """Read the scheduled reception start time""" return IclField( description="Receive Start Time", type_=str, value=time_str_from_registers( self.rx_start_ptp_seconds_upper, self.rx_start_ptp_seconds_lower, self.rx_start_ptp_sub_seconds, ), ) @rx_start_time.setter def rx_start_time(self, start_time: str) -> None: """ Schedule a reception start time :param start_time: time to start at, see datetime_from_str for string format. Use empty string or None to disable """ if start_time: ( self.rx_start_ptp_seconds_upper, self.rx_start_ptp_seconds_lower, self.rx_start_ptp_sub_seconds, ) = split_datetime(datetime_from_str(start_time)) self.schedule_control_rx_start_time = bool(start_time) @property def rx_stop_time(self) -> IclField[str]: """Read the scheduled reception stop time""" return IclField( description="Receive Stop Time", type_=str, value=time_str_from_registers( self.rx_stop_ptp_seconds_upper, self.rx_stop_ptp_seconds_lower, self.rx_stop_ptp_sub_seconds, ), ) @rx_stop_time.setter def rx_stop_time(self, stop_time: str) -> None: """ Schedule a reception stop time :param stop_time: time to stop at, see datetime_from_str for string format. Use empty string or None to disable """ if stop_time: ( self.rx_stop_ptp_seconds_upper, self.rx_stop_ptp_seconds_lower, self.rx_stop_ptp_sub_seconds, ) = split_datetime(datetime_from_str(stop_time)) self.schedule_control_rx_stop_time = bool(stop_time)
[docs] def reset(self) -> None: self.schedule_control_reset = True self.schedule_control_tx_start_time = False self.schedule_control_tx_stop_time = False self.schedule_control_rx_start_time = False self.schedule_control_rx_stop_time = False self.schedule_control_reset = False