# Source code for ptp

# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
PTP (Precision Time Protocol) Peripheral ICL (Instrument Control Layer).
Abstracts the FPGA registers into a more user-friendly interface, including a
wrapper for configuration registers inside the IP core.
"""
from enum import IntEnum

from ska_low_cbf_fpga import DISCOVER_PROPERTIES, FpgaPeripheral, IclField


class PtpCommand(IntEnum):
    """
    Command codes used by the PTP core.

    Each member's integer value is the code written to the core's ``cmd``
    configuration word when the command is issued.
    """

    # profile handling
    RELOAD_PROFILE = 0

    # core on/off
    ENABLE = 1
    DISABLE = 2

    # servo control
    SERVO_STOP = 3
    SERVO_RESUME = 4

    # operating mode selection
    PPS_MODE = 5
    PTP_MODE = 6
class Ptp(FpgaPeripheral):
    """
    ICL for Base PTP Peripheral Configuration

    The configuration words of the PTP core are reached through this
    peripheral's ``data`` buffer. Every name listed in ``_cfg_properties`` is
    exposed as a virtual attribute: reading it returns an :class:`IclField`
    snapshot of the word at that offset, and assigning to it writes the value
    to the same offset.
    """

    _cfg_properties = {
        "seconds_lo": 0x8040,
        "seconds_hi": 0x8041,
        "nsec": 0x8042,
        "valid_time": 0x8043,
        "profile_announce_intvl": 0x8044,
        "profile_delay_intvl": 0x8045,
        "profile_announce_rcpt_timeout": 0x8046,
        "profile_domain_num": 0x8047,
        "profile_delay_mechanism": 0x8048,
        "profile_report_period": 0x8049,
        "profile_mac_hi": 0x804A,  # HH------
        "profile_mac_lo": 0x804B,  # ----LLXX
        "profile_this_ip_addr": 0x804C,
        "profile_transport_proto": 0x804D,
        "cmd": 0x804E,
        "cmd_seq": 0x804F,
        "seq_id": 0x8100,
        "blk1_pathdly_lo": 0x8114,
        "blk1_pathdly_hi": 0x8115,
        "blk1_unk1": 0x8116,
        "blk1_seq_id": 0x8117,
        "blk1_time_secs": 0x8118,
        "blk1_time_frac_secs": 0x8119,
        "blk1_last_delta": 0x811A,
        "blk1_last_phaseinc_lo": 0x811B,
        "blk1_last_phaseinc_hi": 0x811C,
        "blk1_seq_errs": 0x811D,
        "blk1_packet_drop": 0x811E,
        "blk1_t1_sec": 0x811F,
        "blk1_t1_nano": 0x8120,
        "blk1_t2_sec": 0x8121,
        "blk1_t2_nano": 0x8122,
        "blk1_t3_sec": 0x8123,
        "blk1_t3_nano": 0x8124,
        "blk1_t4_sec": 0x8125,
        "blk1_t4_nano": 0x8126,
        "blk1_t4_ref_clk_per_pps": 0x8127,
    }
    """key: name, value: offset"""

    _signed_cfg_properties = {
        "blk1_last_delta",
        "blk1_t1_sec",
        "blk1_t1_nano",
        "blk1_t2_sec",
        "blk1_t2_nano",
        "blk1_t3_sec",
        "blk1_t3_nano",
        "blk1_t4_sec",
        "blk1_t4_nano",
    }
    """names of cfg_properties that should be interpreted as signed"""

    # the dynamic config attrs are not automatically discovered,
    # so we list here any that we want shown to the control system
    _user_attributes = {DISCOVER_PROPERTIES, "blk1_last_delta"}

    def __getattr__(self, item) -> IclField:
        """
        Get config param from ram buffer

        :param item: attribute name being looked up.
        :return: for names in ``_cfg_properties``, a read-only
            :class:`IclField` holding the current word (sign-converted for
            names in ``_signed_cfg_properties``); otherwise whatever the
            parent class resolves.
        """
        if item not in self._cfg_properties:
            return super().__getattr__(item)

        offset = self._cfg_properties[item]
        value = self["data"][offset]
        if item in self._signed_cfg_properties:
            # Convert 32-bit 2s complement to signed int
            value = (value ^ 0x80000000) - 0x80000000
        return IclField(
            address=self["data"].address + offset,
            description=item,
            value=value,
            type_=int,
            user_write=False,
        )

    def __setattr__(self, key, value):
        """
        Set config param in ram buffer

        Names in ``_cfg_properties`` are written to the ``data`` buffer at
        their offset; every other name is handled by the parent class.

        :param key: attribute name being assigned.
        :param value: value to write.
        """
        if key in self._cfg_properties:
            self["data"][self._cfg_properties[key]] = value
            return
        super().__setattr__(key, value)

    def __dir__(self):
        """Add our config params to the directory"""
        return list(super().__dir__()) + list(self._cfg_properties)

    @property
    def user_mac_address(self) -> IclField[int]:
        """
        Get the user-configurable portion of the MAC address (lower 3 bytes)

        :return: field whose value packs the three bytes as ``0xAABBCC``,
            where ``AA`` is the top byte of ``profile_mac_hi`` and ``BB``,
            ``CC`` are the two low bytes of ``profile_mac_lo`` (byte-swapped).
        """
        mac_hi = self.profile_mac_hi.value
        # read once so both bytes come from the same snapshot of the word
        mac_lo = self.profile_mac_lo.value

        high_byte = (mac_hi & 0xFF000000) >> 24
        mid_byte = mac_lo & 0xFF
        low_byte = (mac_lo & 0xFF00) >> 8
        return IclField(
            description="Low 3 bytes of MAC address",
            value=(high_byte << 16) | (mid_byte << 8) | low_byte,
            type_=int,
        )

    @user_mac_address.setter
    def user_mac_address(self, mac_address) -> None:
        """
        Set the user-configurable portion of the MAC address (lower 3 bytes)

        The other bits of ``profile_mac_hi`` and ``profile_mac_lo`` are
        preserved.

        :param mac_address: MAC address, only the low 3 bytes are used.
        """
        original_hi = self.profile_mac_hi.value
        original_lo = self.profile_mac_lo.value

        # shift by 8 is equivalent to shifting down 16 then up 24
        self.profile_mac_hi = (original_hi & 0x00FFFFFF) | (
            (mac_address & 0xFF0000) << 8
        )
        # swap lower two bytes
        swapped_low = ((mac_address & 0xFF) << 8) | ((mac_address & 0xFF00) >> 8)
        self.profile_mac_lo = (original_lo & 0xFFFF0000) | swapped_low

    @property
    def mac_address(self) -> IclField[str]:
        """
        Get MAC address

        :return: field whose value is the full address as upper-case,
            colon-separated hex, e.g. ``"DC:3C:F6:01:02:03"``.
        """
        user_hex = f"{self.user_mac_address.value:06x}"
        user_octets = (user_hex[pos : pos + 2] for pos in range(0, 6, 2))
        return IclField(
            value="DC:3C:F6:"  # top 3 bytes are hard coded in PTP core
            + ":".join(user_octets).upper(),
            description="Full MAC address",
            type_=str,
        )

    def command(self, cmd: PtpCommand) -> None:
        """
        Execute a PTP command

        Writes the command code to ``cmd`` and then increments ``cmd_seq``.

        :param cmd: command to issue (a :class:`PtpCommand` or its integer
            code).
        """
        self.cmd = int(cmd)
        # NOTE(review): presumably the core acts on a change of cmd_seq -
        # confirm against the PTP core documentation.
        # Work on the plain integer and wrap at 32 bits (the word width that
        # the signed conversion in __getattr__ assumes), so the value written
        # back always fits in one word.
        self.cmd_seq = (self.cmd_seq.value + 1) & 0xFFFFFFFF

    def startup(self, mac_address: int = 0x010203, domain: int = 1) -> None:
        """
        Start PTP

        If the requested domain and MAC address are already configured,
        nothing is written and the profile is not reloaded.

        :param int mac_address: MAC address, only the low 3 bytes are used.
        :param int domain: PTP domain
        """
        # Compare plain integers, and only the 3 bytes that are actually
        # stored, so a caller passing a longer address still matches.
        wanted_mac = mac_address & 0xFFFFFF
        if (
            self.profile_domain_num.value == domain
            and self.user_mac_address.value == wanted_mac
        ):
            self._logger.debug("PTP already configured, will not reload profile")
            return

        self.profile_domain_num = domain
        self.user_mac_address = mac_address
        self.command(PtpCommand.RELOAD_PROFILE)