import array
import logging
import random
import socket
import struct
import time
from enum import Enum
from math import ceil
from ska_low_sps_tpm_api import BoardMake, LibraryError
from ska_low_sps_tpm_api.base.protocol import Protocol
[docs]
class UCP(Protocol):
"""Class implementing the UCP communication protocol"""
# UCP OPCODE definitions
[docs]
class OPCODE(Enum):
"""Device enumeration"""
READ = 0x01
WRITE = 0x02
BITWISE_AND = 0x03
BITWISE_OR = 0x04
FLASH_WRITE = 0x06
FLASH_READ = 0x07
FLASH_ERASE = 0x08
FIFO_READ = 0x09
FIFO_WRITE = 0x0A
BIT_WRITE = 0x0B
RESET_BOARD = 0x11
PERIODIC_UPDATE = 0x12
ASYNC_UPDATE = 0x13
CANCEL_UPDATE = 0x14
OVERLAPPED_WRITE = 40
WAIT_FOR_PPS = 0xFFFFFFFF
[docs]
def __init__(self, board, logger=None):
"""UCP class constructor"""
super(UCP, self).__init__(board)
# Initialise sequence number
self._sequence_number = int(random.random() * 100000)
# Maximum payload size
self._max_values_in_payload = 256
# Define socket properties
self._socket_timeout = 1
self._socket_buffer_size = 1024 * 1024
self._max_retries = 3
if logger is None:
self.logger = logging.getLogger("")
else:
self.logger = logger
[docs]
def create_connection(self, ip, port, src_ip=None):
"""
Create connection.
:param ip: Remote IP
:param port: Remote port
"""
# Ensure any previous socket is released before creating a new one.
# This avoids descriptor growth when connect() is retried repeatedly.
self.close_connection()
# Create and configure socket
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if src_ip is not None:
self.logger.warning("UCP: bind socket: {}".format(src_ip))
self._sock.bind((src_ip, 0))
self._sock.settimeout(self._socket_timeout)
self._sock.setsockopt(
socket.SOL_SOCKET, socket.SO_RCVBUF, self._socket_buffer_size
)
if ip == "255.255.255.255":
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
except Exception:
# Best effort cleanup if socket init/configuration fails midway.
self.close_connection()
self.logger.debug("UCP: Could not connect to board with IP {}".format(ip))
return False
# Save connection information
self._ip = ip
self._port = port
return True
[docs]
def close_connection(self):
"""Close socket connection"""
sock = getattr(self, "_sock", None)
if sock is None:
return True
try:
sock.close()
except Exception:
return False
self._sock = None
return True
[docs]
def read_register(self, address, n, offset):
"""
Read register value from board.
:param address: Address to read from
:param n: Number of values to read
:param offset: Offset to read from
"""
# Check if we need to split this request into multiple packets
#
num_packets = ((n - 1) // self._max_values_in_payload) + 1
# List holding values
to_return = []
# Issue requests one by one
for i in range(num_packets):
# Increment sequence number
self._sequence_number += 1
# Number of words for current request
num_values = (
n - i * self._max_values_in_payload
if (i == num_packets - 1)
else self._max_values_in_payload
)
# New address
current_address = address + (offset + i * self._max_values_in_payload) * 4
# Generate request
pkt = self._create_request(
self.OPCODE.READ.value, num_values, current_address
)
# Send packet
data, retries, measure = None, 0, None
while retries < self._max_retries:
# Send request
start_time = time.time()
if self._send_packet(pkt):
# Get reply
data = self._receive_packet(10240)
measure = time.time() - start_time
if data:
# Sent and received packet, break from loop
break
else:
self.logger.warning(
f"UCP::read. Failed to receive response. "
f"Address: {hex(current_address)}. "
f"Retrying {retries + 1}"
)
retries += 1
else:
self.logger.warning(
f"UCP::read. Failed to send request. "
f"Address: {hex(current_address)}. "
f"Retrying {retries + 1}"
)
retries += 1
# Check if operation was successful
if retries == self._max_retries:
# Failure, return
logging.error(
f"UCP::read::TIMEOUT. Failed operation "
f"to read address: {hex(current_address)}"
)
raise LibraryError(
f"UCP::read::TIMEOUT. Failed operation "
f"to read address: {hex(current_address)}"
)
# Successfully received reply, add values to list
to_return += self._check_reply(
data,
address=current_address,
num_values=num_values,
is_read=True,
measure=measure,
)
# All done, return data
return to_return
[docs]
def write_register(self, address, values, offset=0, retry=True):
"""
Write values to register.
:param address: Address to write to
:param values: Values to write
:param offset: Offset to write to
:param retry: Enable retries of write operation
"""
# Check if we need to split this request into multiple packets
n = len(values)
num_packets = ((n - 1) // self._max_values_in_payload) + 1
# Issue requests one by one
for i in range(num_packets):
# Increment sequence number
self._sequence_number += 1
# Number of words for current request
num_values = (
n - i * self._max_values_in_payload
if (i == num_packets - 1)
else self._max_values_in_payload
)
# New address
current_address = address + (offset + i * self._max_values_in_payload) * 4
# Generate request
pkt = self._create_request(
self.OPCODE.WRITE.value,
num_values,
current_address,
values[
i * self._max_values_in_payload : i * self._max_values_in_payload
+ num_values
],
)
# Send packet
data, retries, measure = None, 0, None
while retries < self._max_retries:
# Send request
start_time = time.time()
if self._send_packet(pkt):
# Get reply
data = self._receive_packet(10240)
measure = time.time() - start_time
if retry == False:
return
if data:
break
else:
# Something went wrong. If we are dealing with a TPM, the check whether
# request was processing but we lost the reply
if self._board == BoardMake.TpmBoard:
if self._last_processed_psn(
self._sequence_number, current_address
):
# Request was processed by the board, all good
break
# Otherwise, retry
self.logger.warning(
f"UCP::write. Failed to receive ack. "
f"Address: {hex(current_address)}"
f"Retrying {retries + 1}"
)
retries += 1
else:
self.logger.warning(
f"UCP::write. Failed to send request. "
f"Address: {hex(current_address)}"
f"Retrying {retries + 1}"
)
retries += 1
# Check if operation was successful
if retries == self._max_retries:
# Failure, return
self.logger.error(
f"UCP::write. Failed operation. "
f"Address: {hex(current_address)}"
f"Attempted {self._max_retries} retries."
)
raise LibraryError(
f"UCP::write. Failed operation. "
f"Address: {hex(current_address)}"
f"Attempted {self._max_retries} retries."
)
# Process returned data
self._check_reply(
data, address=current_address, is_read=False, measure=measure
)
[docs]
def select_map_program(
self,
bitfile_data,
fifo_addr=None,
ucp_smap_write=True,
lock_obj=None,
):
"""
Program the board using select map.
:param values: Values to write
"""
if ucp_smap_write:
ucp_opcode = self.OPCODE.OVERLAPPED_WRITE.value
else:
ucp_opcode = self.OPCODE.WRITE.value
# Check if we need to split this request into multiple packets
n = len(bitfile_data) // 4
num_packets = int(ceil(float(n) / float(self._max_values_in_payload)))
# Send the first packet
num_values = (
n if n < self._max_values_in_payload else self._max_values_in_payload
)
values = list(
struct.unpack_from("I" * num_values, bitfile_data[: num_values * 4])
)
pkt = self._create_request(
ucp_opcode, num_values, address=fifo_addr, values=values
)
# Temporary FIX, avoid overlap
# Send packet
# data, retries = None, 0
# while retries < self._max_retries:
# # Send request
# if self._send_packet(pkt):
# break
# else:
# retries += 1
data, retries = None, 0
while retries < self._max_retries:
# Send request
if self._send_packet(pkt):
# Get reply
data = self._receive_packet(10240)
if data:
break
else:
retries += 1
else:
retries += 1
# Temporary FIX, avoid overlap
self._check_reply(data, sequence_number=self._sequence_number)
start = time.time()
# Issue next requests one by one
for i in range(1, num_packets):
# reload lock reg if request
if lock_obj is not None:
end = time.time()
if end - start > lock_obj.suggested_relock_time:
lock_obj.refresh()
start = end
# Increment sequence number
self._sequence_number += 1
# Number of words for current request
num_values = (
n - i * self._max_values_in_payload
if (i == num_packets - 1)
else self._max_values_in_payload
)
values = list(
struct.unpack_from(
"I" * num_values,
bitfile_data[
i
* self._max_values_in_payload
* 4 : i
* self._max_values_in_payload
* 4
+ num_values * 4
],
)
)
# Generate request
pkt = self._create_request(
ucp_opcode, num_values, address=fifo_addr, values=values
)
# Send packet
data, retries = None, 0
while retries < self._max_retries:
# Send request
if self._send_packet(pkt):
# Get reply
data = self._receive_packet(10240)
if data:
break
else:
retries += 1
else:
retries += 1
# Check if operation was successful
if retries == self._max_retries:
# Failure, return
raise LibraryError("UCP::select_map_write. Failed to send request")
# Process returned data
# self._check_reply(data, sequence_number=self._sequence_number-1)
# Temporary FIX, avoid ovelap
self._check_reply(data, sequence_number=self._sequence_number)
# Receive last ack
# data, retries = None, 0
# while retries < self._max_retries:
# data = self._receive_packet(10240)
# if data:
# break
# else:
# retries += 1
# Check if operation was successful
# if retries == self._max_retries:
# # Failure, return
# raise LibraryError("UCP::select_map_write. Failed to send request")
# Process returned data
# self._check_reply(data)
def _create_request(self, opcode, n, address=None, values=None):
"""Creates UCP request
:param opcode: UCP OPCODE
:param n: Numbre of operands
:param address: Address where operation will be performed
:param values: Values to write, if any"""
pkt = array.array("I")
pkt.append(self._sequence_number)
pkt.append(opcode)
pkt.append(n)
if address is not None:
pkt.append(address)
if values is not None:
pkt.extend(values)
# Return packed request
return bytes(pkt)
def _check_reply(
self,
data,
address=None,
sequence_number=None,
num_values=None,
is_read=False,
measure=None,
):
"""Check whether packet was processed correctly
:param data: Data to check
:param psn: Packet sequence number
:param address: Address"""
if sequence_number is None:
sequence_number = self._sequence_number
operation = "read" if is_read == True else "write"
# Process returned data
data = bytes(data)
psn = struct.unpack("I", data[0:4])[0]
# Check if request was successful on board
if psn != sequence_number:
raise LibraryError(
f"UCP::{operation}::PSN_ERR_in_{measure:.3f}s. "
f"Command failed on board on address {hex(address)} "
f"- PSN error: {psn}, {sequence_number}"
)
if address is not None:
received_address = struct.unpack("I", data[4:8])[0]
nack_address = ~(address) & 0xFFFFFFFF
if received_address == nack_address:
raise LibraryError(
f"UCP::{operation}::NACK_in_{measure:.3f}s. "
f"Command failed on board. Requested address {hex(address)} "
f"received address {hex(received_address)}"
)
elif received_address != address:
raise LibraryError(
f"UCP::{operation}::ADDR_ERR_in_{measure:.3f}s. "
f"Command failed on board. Requested address {hex(address)} "
f"received address {hex(received_address)}"
)
# Check number of bytes read
if num_values is not None and len(data) / 4 < num_values:
raise LibraryError(
f"UCP::{operation}::DATA_ERR_in_{measure:.3f}s. "
f"Failed to receive reply. Expecting {num_values * 4} bytes, "
f"received {len(data)} bytes"
)
if num_values is not None:
return struct.unpack("I" * num_values, data[8:])
def _receive_packet(self, max_length):
"""Receive a packet of up to max_length"""
try:
data, _ = self._sock.recvfrom(max_length)
except:
return None
return data
def _send_packet(self, message):
"""Send packet"""
ret = False
try:
if self._sock.sendto(message, (self._ip, self._port)) != -1:
ret = True
except:
pass
return ret
def _last_processed_psn(self, psn, address_sent=None):
"""Check if a request was processed by reading the last processed PSN"""
if address_sent:
self.logger.info(
f"UCP::last_process_PSN. Trying to recover command. "
f"Address: {hex(address_sent)}"
)
else:
self.logger.info("UCP::last_process_PSN. Trying to recover command.")
# Sequence number for checking
seqno = 0xBEEF
# Create request
pkt = array.array("I")
pkt.append(seqno) # PSN
pkt.append(self.OPCODE.READ.value) # OPCODE
pkt.append(1) # Number of operands
pkt.append(0x30000004) # Address
pkt = bytes(pkt)
# Send packet
data = None
retries = 0
while retries < self._max_retries:
# Send request
if self._send_packet(pkt):
data = self._receive_packet(10240)
if data:
# Sent and received packet, break from loop
break
else:
retries += 1
else:
retries += 1
# Check if operation was successful
if retries == self._max_retries:
# Failure, return
if address_sent:
self.logger.error(
f"UCP::last_process_PSN. Failed to send request. "
f"Address: {hex(address_sent)}"
)
else:
self.logger.error("UCP::last_process_PSN. Failed to send request")
return False
# Process returned data
data = bytes(data)
local_psn = struct.unpack("I", data[0:4])[0]
address = struct.unpack("I", data[4:8])[0]
# Check if request was successful on board
if local_psn != seqno or address != 0x30000004:
if address_sent:
self.logger.error(
f"UCP::last_process_PSN: Command failed on board,"
f"Address: {address_sent}"
)
else:
self.logger.error("UCP::last_process_PSN: Command failed on board")
# Check whether PSNs match
if local_psn == psn:
return True
else:
return False
if __name__ == "__main__":
ucp = UCP(BoardMake.TpmBoard)
ucp.create_connection("10.0.10.3", 10000)
print(ucp.read_register(0x0, 1, 0))