Source code for ska_low_sps_tpm_api.protocols.ucp

import array
import logging
import random
import socket
import struct
import time
from enum import Enum
from math import ceil

from ska_low_sps_tpm_api import BoardMake, LibraryError
from ska_low_sps_tpm_api.base.protocol import Protocol


class UCP(Protocol):
    """Class implementing the UCP communication protocol.

    Requests are UDP datagrams built from ``array.array("I")`` words (native
    unsigned ints): packet sequence number (PSN), opcode, number of operands,
    then an optional address and optional payload values.  Replies start with
    the PSN word followed by an address word and, for reads, the values.
    """

    # UCP OPCODE definitions
    class OPCODE(Enum):
        """UCP operation codes (second word of every request)."""

        READ = 0x01
        WRITE = 0x02
        BITWISE_AND = 0x03
        BITWISE_OR = 0x04
        FLASH_WRITE = 0x06
        FLASH_READ = 0x07
        FLASH_ERASE = 0x08
        FIFO_READ = 0x09
        FIFO_WRITE = 0x0A
        BIT_WRITE = 0x0B
        RESET_BOARD = 0x11
        PERIODIC_UPDATE = 0x12
        ASYNC_UPDATE = 0x13
        CANCEL_UPDATE = 0x14
        OVERLAPPED_WRITE = 40
        WAIT_FOR_PPS = 0xFFFFFFFF

    def __init__(self, board, logger=None):
        """UCP class constructor.

        :param board: Board identifier, forwarded to the ``Protocol`` constructor
        :param logger: Logger to use; the root logger is used when ``None``
        """
        super(UCP, self).__init__(board)

        # Initialise sequence number
        self._sequence_number = int(random.random() * 100000)

        # Maximum payload size (number of values per request)
        self._max_values_in_payload = 256

        # Define socket properties
        self._socket_timeout = 1
        self._socket_buffer_size = 1024 * 1024
        self._max_retries = 3

        self.logger = logging.getLogger("") if logger is None else logger

    def create_connection(self, ip, port, src_ip=None):
        """
        Create connection.

        :param ip: Remote IP
        :param port: Remote port
        :param src_ip: Optional local IP to bind the socket to
        :return: True if the socket was created and configured, False otherwise
        """
        # Ensure any previous socket is released before creating a new one.
        # This avoids descriptor growth when connect() is retried repeatedly.
        self.close_connection()

        # Create and configure socket
        try:
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            if src_ip is not None:
                self.logger.warning("UCP: bind socket: {}".format(src_ip))
                self._sock.bind((src_ip, 0))
            self._sock.settimeout(self._socket_timeout)
            self._sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_RCVBUF, self._socket_buffer_size
            )
            if ip == "255.255.255.255":
                self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        except Exception:
            # Best effort cleanup if socket init/configuration fails midway.
            self.close_connection()
            self.logger.debug("UCP: Could not connect to board with IP {}".format(ip))
            return False

        # Save connection information
        self._ip = ip
        self._port = port
        return True

    def close_connection(self):
        """Close socket connection.

        :return: True if there was no socket or it was closed, False if
            closing raised an exception (the socket reference is then kept)
        """
        sock = getattr(self, "_sock", None)
        if sock is None:
            return True
        try:
            sock.close()
        except Exception:
            return False
        self._sock = None
        return True

    def read_register(self, address, n, offset):
        """
        Read register value from board.

        The request is split into packets of at most
        ``self._max_values_in_payload`` values each.

        :param address: Address to read from
        :param n: Number of values to read
        :param offset: Offset to read from, in values (multiplied by 4 to
            obtain the address increment)
        :return: List with the values read
        :raises LibraryError: if a request gets no reply after all retries or
            the reply fails validation
        """
        chunk = self._max_values_in_payload

        # Check if we need to split this request into multiple packets
        num_packets = ((n - 1) // chunk) + 1

        # List holding values
        to_return = []

        # Issue requests one by one
        for i in range(num_packets):
            # Increment sequence number
            self._sequence_number += 1

            # Number of words for current request
            num_values = n - i * chunk if i == num_packets - 1 else chunk

            # New address
            current_address = address + (offset + i * chunk) * 4

            # Generate request
            pkt = self._create_request(
                self.OPCODE.READ.value, num_values, current_address
            )

            # Send packet
            data, retries, measure = None, 0, None
            while retries < self._max_retries:
                start_time = time.time()
                if self._send_packet(pkt):
                    # Get reply
                    data = self._receive_packet(10240)
                    measure = time.time() - start_time
                    if data:
                        # Sent and received packet, break from loop
                        break
                    self.logger.warning(
                        f"UCP::read. Failed to receive response. "
                        f"Address: {hex(current_address)}. "
                        f"Retrying {retries + 1}"
                    )
                else:
                    self.logger.warning(
                        f"UCP::read. Failed to send request. "
                        f"Address: {hex(current_address)}. "
                        f"Retrying {retries + 1}"
                    )
                retries += 1

            # Check if operation was successful
            if retries == self._max_retries:
                message = (
                    f"UCP::read::TIMEOUT. Failed operation "
                    f"to read address: {hex(current_address)}"
                )
                # Use the instance logger so the injected logger sees the error
                self.logger.error(message)
                raise LibraryError(message)

            # Successfully received reply, add values to list
            to_return.extend(
                self._check_reply(
                    data,
                    address=current_address,
                    num_values=num_values,
                    is_read=True,
                    measure=measure,
                )
            )

        # All done, return data
        return to_return

    def write_register(self, address, values, offset=0, retry=True):
        """
        Write values to register.

        The request is split into packets of at most
        ``self._max_values_in_payload`` values each.

        :param address: Address to write to
        :param values: Values to write
        :param offset: Offset to write to, in values (multiplied by 4 to
            obtain the address increment)
        :param retry: Enable retries of write operation. When disabled, each
            packet is sent once, one receive is attempted, and the reply is
            not validated.
        :raises LibraryError: if a packet could not be written after all
            retries or an acknowledgement fails validation
        """
        chunk = self._max_values_in_payload

        # Check if we need to split this request into multiple packets
        n = len(values)
        num_packets = ((n - 1) // chunk) + 1

        # Issue requests one by one
        for i in range(num_packets):
            # Increment sequence number
            self._sequence_number += 1

            # Number of words for current request
            first = i * chunk
            num_values = n - first if i == num_packets - 1 else chunk

            # New address
            current_address = address + (offset + first) * 4

            # Generate request
            pkt = self._create_request(
                self.OPCODE.WRITE.value,
                num_values,
                current_address,
                values[first : first + num_values],
            )

            # Send packet
            data, retries, measure = None, 0, None
            # Cleared when there is no acknowledgement to validate
            verify = True
            while retries < self._max_retries:
                start_time = time.time()
                if self._send_packet(pkt):
                    # Get reply
                    data = self._receive_packet(10240)
                    measure = time.time() - start_time
                    if not retry:
                        # Single attempt requested: do not retry or validate,
                        # but keep going so remaining chunks are still written
                        verify = False
                        break
                    if data:
                        break
                    # Something went wrong. If we are dealing with a TPM, check
                    # whether the request was processed but we lost the reply
                    if self._board == BoardMake.TpmBoard and self._last_processed_psn(
                        self._sequence_number, current_address
                    ):
                        # Request was processed by the board; there is no
                        # reply packet to validate
                        verify = False
                        break
                    # Otherwise, retry
                    self.logger.warning(
                        f"UCP::write. Failed to receive ack. "
                        f"Address: {hex(current_address)}. "
                        f"Retrying {retries + 1}"
                    )
                else:
                    self.logger.warning(
                        f"UCP::write. Failed to send request. "
                        f"Address: {hex(current_address)}. "
                        f"Retrying {retries + 1}"
                    )
                retries += 1

            # Check if operation was successful
            if retries == self._max_retries:
                message = (
                    f"UCP::write. Failed operation. "
                    f"Address: {hex(current_address)}. "
                    f"Attempted {self._max_retries} retries."
                )
                self.logger.error(message)
                raise LibraryError(message)

            # Process returned data
            if verify:
                self._check_reply(
                    data, address=current_address, is_read=False, measure=measure
                )

    def select_map_program(
        self,
        bitfile_data,
        fifo_addr=None,
        ucp_smap_write=True,
        lock_obj=None,
    ):
        """
        Program the board using select map.

        The bitfile is sent as packets of at most
        ``self._max_values_in_payload`` words.  Each packet is acknowledged
        before the next one is sent (the original code marks this as a
        temporary fix to avoid overlap).

        :param bitfile_data: Bytes-like bitfile content; only the first
            ``len(bitfile_data) // 4`` whole words are sent
        :param fifo_addr: Address placed in every request
        :param ucp_smap_write: Use the OVERLAPPED_WRITE opcode when True,
            the plain WRITE opcode otherwise
        :param lock_obj: Optional object providing ``suggested_relock_time``
            and ``refresh()``; refreshed between packets once that time has
            elapsed
        :raises LibraryError: if a packet gets no acknowledgement after all
            retries or the acknowledgement fails validation
        """
        if ucp_smap_write:
            ucp_opcode = self.OPCODE.OVERLAPPED_WRITE.value
        else:
            ucp_opcode = self.OPCODE.WRITE.value

        chunk = self._max_values_in_payload

        # Check if we need to split this request into multiple packets
        n = len(bitfile_data) // 4
        num_packets = int(ceil(float(n) / float(chunk)))

        # Send the first packet (uses the current sequence number as-is)
        self._send_smap_chunk(ucp_opcode, fifo_addr, bitfile_data, 0, min(n, chunk))

        start = time.time()

        # Issue next requests one by one
        for i in range(1, num_packets):
            # Reload lock register if requested
            if lock_obj is not None:
                end = time.time()
                if end - start > lock_obj.suggested_relock_time:
                    lock_obj.refresh()
                    start = end

            # Increment sequence number
            self._sequence_number += 1

            # Number of words for current request
            first = i * chunk
            num_values = n - first if i == num_packets - 1 else chunk

            self._send_smap_chunk(
                ucp_opcode, fifo_addr, bitfile_data, first, num_values
            )

    def _send_smap_chunk(
        self, ucp_opcode, fifo_addr, bitfile_data, first_word, num_values
    ):
        """Send one select-map packet and validate its acknowledgement.

        :param ucp_opcode: Opcode value to use
        :param fifo_addr: Address placed in the request
        :param bitfile_data: Bytes-like bitfile content
        :param first_word: Index of the first word of this packet
        :param num_values: Number of words in this packet
        :raises LibraryError: if no acknowledgement is received after all
            retries or the acknowledgement PSN does not match
        """
        values = list(
            struct.unpack_from(f"{num_values}I", bitfile_data, first_word * 4)
        )
        pkt = self._create_request(
            ucp_opcode, num_values, address=fifo_addr, values=values
        )
        data = self._exchange_with_retries(pkt)
        if data is None:
            # Failure, return
            raise LibraryError("UCP::select_map_write. Failed to send request")
        self._check_reply(data, sequence_number=self._sequence_number)

    def _exchange_with_retries(self, pkt):
        """Send a packet and wait for a reply, up to ``self._max_retries`` times.

        :param pkt: Request bytes
        :return: Reply data, or None if every attempt failed
        """
        for _ in range(self._max_retries):
            if self._send_packet(pkt):
                data = self._receive_packet(10240)
                if data:
                    return data
        return None

    def _create_request(self, opcode, n, address=None, values=None):
        """Create a UCP request.

        :param opcode: UCP OPCODE
        :param n: Number of operands
        :param address: Address where operation will be performed
        :param values: Values to write, if any
        :return: Packed request as bytes
        """
        pkt = array.array("I")
        pkt.append(self._sequence_number)
        pkt.append(opcode)
        pkt.append(n)
        if address is not None:
            pkt.append(address)
        if values is not None:
            pkt.extend(values)

        # Return packed request
        return bytes(pkt)

    def _check_reply(
        self,
        data,
        address=None,
        sequence_number=None,
        num_values=None,
        is_read=False,
        measure=None,
    ):
        """Check whether packet was processed correctly.

        :param data: Reply data to check
        :param address: Requested address; the reply address is checked
            against it when given
        :param sequence_number: Expected packet sequence number, defaults to
            the current sequence number
        :param num_values: Number of values expected in the reply payload
        :param is_read: Whether the request was a read (used in messages)
        :param measure: Elapsed time of the exchange in seconds (used in
            messages), may be None
        :return: Tuple with ``num_values`` values when ``num_values`` is
            given, None otherwise
        :raises LibraryError: on PSN mismatch, NACK, address mismatch or a
            reply that is too short
        """
        if sequence_number is None:
            sequence_number = self._sequence_number

        operation = "read" if is_read else "write"
        # measure and address are optional; never let message formatting
        # raise a TypeError that would hide the real protocol error
        elapsed = f"{measure:.3f}s" if measure is not None else "unknown"
        requested = hex(address) if address is not None else "unknown"

        # Process returned data
        data = bytes(data) if data is not None else b""

        # The PSN word is always needed; the address word only when checked
        header_len = 4 if address is None and num_values is None else 8
        if len(data) < header_len:
            raise LibraryError(
                f"UCP::{operation}::DATA_ERR_in_{elapsed}. "
                f"Failed to receive reply. Expecting {header_len} bytes, "
                f"received {len(data)} bytes"
            )

        psn = struct.unpack_from("I", data, 0)[0]

        # Check if request was successful on board
        if psn != sequence_number:
            raise LibraryError(
                f"UCP::{operation}::PSN_ERR_in_{elapsed}. "
                f"Command failed on board on address {requested} "
                f"- PSN error: {psn}, {sequence_number}"
            )

        if address is not None:
            received_address = struct.unpack_from("I", data, 4)[0]
            # A reply carrying the bitwise-inverted address is a NACK
            nack_address = ~(address) & 0xFFFFFFFF
            if received_address == nack_address:
                raise LibraryError(
                    f"UCP::{operation}::NACK_in_{elapsed}. "
                    f"Command failed on board. Requested address {hex(address)} "
                    f"received address {hex(received_address)}"
                )
            elif received_address != address:
                raise LibraryError(
                    f"UCP::{operation}::ADDR_ERR_in_{elapsed}. "
                    f"Command failed on board. Requested address {hex(address)} "
                    f"received address {hex(received_address)}"
                )

        if num_values is None:
            return None

        # Check number of bytes read: 8 header bytes precede the payload
        expected_len = 8 + num_values * 4
        if len(data) < expected_len:
            raise LibraryError(
                f"UCP::{operation}::DATA_ERR_in_{elapsed}. "
                f"Failed to receive reply. Expecting {expected_len} bytes, "
                f"received {len(data)} bytes"
            )

        return struct.unpack_from(f"{num_values}I", data, 8)

    def _receive_packet(self, max_length):
        """Receive a packet of up to max_length.

        :param max_length: Maximum number of bytes to receive
        :return: Received data, or None if receiving failed (e.g. timeout)
        """
        try:
            data, _ = self._sock.recvfrom(max_length)
        except Exception:
            # Covers timeouts, socket errors and a missing/closed socket
            # without swallowing KeyboardInterrupt/SystemExit
            return None
        return data

    def _send_packet(self, message):
        """Send packet.

        :param message: Request bytes
        :return: True if the packet was handed to the socket, False otherwise
        """
        try:
            self._sock.sendto(message, (self._ip, self._port))
        except Exception:
            # Covers socket errors and a missing/closed socket without
            # swallowing KeyboardInterrupt/SystemExit
            return False
        return True

    def _last_processed_psn(self, psn, address_sent=None):
        """Check if a request was processed by reading the last processed PSN.

        :param psn: Sequence number of the request to look for
        :param address_sent: Address of that request, used only in log messages
        :return: True if the PSN unpacked from the reply equals ``psn``
        """
        if address_sent is not None:
            self.logger.info(
                f"UCP::last_process_PSN. Trying to recover command. "
                f"Address: {hex(address_sent)}"
            )
        else:
            self.logger.info("UCP::last_process_PSN. Trying to recover command.")

        # Sequence number for checking
        seqno = 0xBEEF
        # Address read by the query
        query_address = 0x30000004

        # Create request: PSN, OPCODE, number of operands, address
        pkt = bytes(
            array.array("I", [seqno, self.OPCODE.READ.value, 1, query_address])
        )

        # Send packet
        data = self._exchange_with_retries(pkt)

        # Check if operation was successful
        if data is None:
            # Failure, return
            if address_sent is not None:
                self.logger.error(
                    f"UCP::last_process_PSN. Failed to send request. "
                    f"Address: {hex(address_sent)}"
                )
            else:
                self.logger.error("UCP::last_process_PSN. Failed to send request")
            return False

        # Process returned data
        data = bytes(data)
        if len(data) < 8:
            # Reply too short to hold the PSN and address words
            self.logger.error("UCP::last_process_PSN: Command failed on board")
            return False
        local_psn, address = struct.unpack_from("2I", data, 0)

        # Check if request was successful on board
        if local_psn != seqno or address != query_address:
            if address_sent is not None:
                self.logger.error(
                    f"UCP::last_process_PSN: Command failed on board, "
                    f"Address: {address_sent}"
                )
            else:
                self.logger.error("UCP::last_process_PSN: Command failed on board")

        # Check whether PSNs match
        # NOTE(review): this compares the first word of the reply (which is
        # also checked against seqno above) with the lost request's PSN, not a
        # payload word - confirm against the board's reply format.
        return local_psn == psn
if __name__ == "__main__":
    # Manual smoke test: read one value from address 0x0 of a TPM board.
    ucp = UCP(BoardMake.TpmBoard)
    # create_connection reports failure through its return value
    if not ucp.create_connection("10.0.10.3", 10000):
        raise SystemExit("UCP: could not create connection to 10.0.10.3:10000")
    try:
        print(ucp.read_register(0x0, 1, 0))
    finally:
        # Always release the socket, even if the read raises
        ucp.close_connection()