import logging
import socket
import struct
from ipaddress import ip_address
[docs]
def convert_uint_to_string(data):
    """
    Convert list of 32-bit unsigned integers to string.

    Every integer is serialised big-endian and each of its four bytes
    becomes one character of the result.

    :param data: iterable of integers that fit in 32 unsigned bits
    :return: converted string representation (four characters per integer)
    """
    # Iterating over a bytes object yields the byte values as integers.
    packed_words = [struct.pack(">I", word) for word in data]
    return "".join(chr(byte) for packed in packed_words for byte in packed)
[docs]
def high_bits(datum, bits):
    """
    Return location of on bits in datum.

    :param datum: Datum
    :param bits: Number of bits in datum (only bit positions 0 .. bits-1 are inspected)
    :return: Array containing locations of high bits, lowest position first
    """
    # Mask with 1 so only bit i is tested; a wider mask would also require
    # the neighbouring higher bits to be zero.
    return [i for i in range(bits) if (datum >> i) & 1]
[docs]
def swap32(x):
    """
    Reverse the byte order of the lowest 32 bits of x.

    :param x: integer value; bits above bit 31 are discarded
    :return: value with bytes 0..3 of x placed in positions 3..0
    """
    lowest = (x & 0x000000FF) << 24
    second = (x & 0x0000FF00) << 8
    third = (x & 0x00FF0000) >> 8
    highest = (x & 0xFF000000) >> 24
    return lowest | second | third | highest
[docs]
def ip_str(ip):
    """
    Converts an IP address integer to a human-readable string.

    :param ip: IP address
    :type ip: int
    :return: IP address
    :rtype: str
    """
    address = ip_address(ip)
    return str(address)
[docs]
def get_octet_from_ip(ip, octet):
    """
    Extract an octet from an IP address string.

    Example: 10.135.0.12, Octet 1 is 10, Octet 4 is 12.

    :param ip: IP address
    :type ip: str
    :param octet: Octet to extract (1-based, 1 to 4)
    :type octet: int
    :return: Octet contents
    :rtype: int
    :raises ValueError: if octet is not an integer between 1 and 4
    """
    if not isinstance(octet, int) or not 1 <= octet <= 4:
        message = "Octet must be an integer between 1 and 4"
        logging.error(message)
        # Stop here: continuing would index from the end of the list for
        # 0 or negative values and silently return the wrong octet.
        raise ValueError(message)
    return int(ip.split(".")[octet - 1])
[docs]
def ip2long(ip):
    """Convert a dotted IPv4 address string to its integer value (network byte order)."""
    (value,) = struct.unpack("!L", socket.inet_aton(ip))
    return value
[docs]
def long2ip(ip):
    """Convert a 32-bit integer to a dotted IPv4 address string (network byte order)."""
    packed_ip = struct.pack("!I", ip)
    return socket.inet_ntoa(packed_ip)
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
""" Common definitions """
################################################################################
# System imports
import inspect
import itertools
import math
import operator
import time
################################################################################
# Constants
# Widths in bits
c_nibble_w = 4
c_byte_w = 8
c_halfword_w = 16
c_word_w = 32
c_longword_w = 64

# Sizes in bytes
c_byte_sz = 1
c_halfword_sz = 2
c_word_sz = 4
c_longword_sz = 8

# Modulus (number of distinct values) for each width
c_nibble_mod = 1 << c_nibble_w
c_byte_mod = 1 << c_byte_w
c_halfword_mod = 1 << c_halfword_w
c_word_mod = 1 << c_word_w
c_longword_mod = 1 << c_longword_w

# Value of the most significant (sign) bit
c_word_sign = 1 << (c_word_w - 1)
c_longword_sign = 1 << (c_longword_w - 1)

# Number of parts (real, imaginary) in a complex value
c_nof_complex = 2
################################################################################
# Functions
[docs]
def greatest_common_div(A, B):
    """
    Find the greatest common divisor of A and B.

    Uses the Euclidean algorithm: the pair (A, B) is replaced by
    (B, A % B) until the remainder is zero.
    """
    while B != 0:
        A, B = B, A % B
    return A
[docs]
def ceil_div(num, den):
    """
    Divide num by den and round the quotient up to the nearest integer.

    Input:
    - num = Numerator
    - den = Denominator
    Return:
    - Smallest integer >= num / den
    """
    if isinstance(num, int) and isinstance(den, int):
        # Exact integer arithmetic: float division loses precision once the
        # operands exceed 2**53.
        return -(-num // den)
    return int(math.ceil(num / float(den)))
[docs]
def ceil_log2(num):
    """
    Return the base-2 logarithm of int(num), rounded up to an integer.

    Input:
    - num = Value >= 1 (converted with int() first)
    Return:
    - Smallest integer k such that 2**k >= int(num)
    Raises ValueError when int(num) < 1, as math.log would.
    """
    value = int(num)
    if value < 1:
        raise ValueError("math domain error")
    # (value - 1).bit_length() is the exact ceil(log2(value)); floating
    # point math.log(value, 2) can overshoot on exact powers of two.
    return (value - 1).bit_length()
[docs]
def sel_a_b(sel, a, b):
    """
    Select between two values.

    Return:
    - a when sel compares equal to True (so True or 1), otherwise b
    """
    # The explicit comparison is kept on purpose: truthy values other than
    # True/1 (e.g. 2 or a non-empty string) must still select b.
    return a if sel == True else b
[docs]
def smallest(a, b):
    """Return a when a < b, otherwise b (so b is returned when they compare equal)."""
    return a if a < b else b
[docs]
def largest(a, b):
    """Return a when a > b, otherwise b (so b is returned when they compare equal)."""
    return a if a > b else b
[docs]
def signed32(v):
    """
    Map v onto the signed 32-bit range.

    Values below c_word_sign are returned unchanged; all other values
    have c_word_mod subtracted.
    """
    return v - c_word_mod if v >= c_word_sign else v
[docs]
def signed64(v):
    """
    Map v onto the signed 64-bit range.

    Values below c_longword_sign are returned unchanged; all other values
    have c_longword_mod subtracted.
    """
    return v - c_longword_mod if v >= c_longword_sign else v
[docs]
def int_clip(inp, w):
    """
    Purpose : Clip an integer value to w bits
    Input:
    - inp = Integer value
    - w = Output width in number of bits
    Description: Output range -2**(w-1) to +2**(w-1)-1; returns 0 when w <= 0
    Return:
    - outp = Clipped value
    """
    if w <= 0:
        return 0
    upper = 2 ** (w - 1) - 1
    lower = -(2 ** (w - 1))
    if inp > upper:
        return upper
    if inp < lower:
        return lower
    return inp
[docs]
def int_wrap(inp, w):
    """
    Purpose: Wrap an integer value to w bits
    Input:
    - inp = Integer value
    - w = Output width in number of bits
    Description: Remove MSbits, output range -2**(w-1) to +2**(w-1)-1;
                 returns 0 when w <= 0
    Return:
    - outp = Wrapped value
    """
    if w <= 0:
        return 0
    sign_bit = 2 ** (w - 1)
    magnitude = inp & (sign_bit - 1)
    # A set sign bit makes the w-bit two's complement value negative.
    return magnitude - sign_bit if inp & sign_bit else magnitude
[docs]
def int_round(inp, w, direction="HALF_AWAY"):
    """
    Purpose : Round the w LSbits of an integer value
    Input:
    - inp = Integer value
    - w = Number of LSbits to round
    - direction = "HALF_AWAY", "HALF_UP"
    Description:
    direction = "HALF_AWAY" --> Round half away from zero so +0.5 --> 1, -0.5 --> -1.
    direction = "HALF_UP" --> Round half to +infinity so +0.5 --> 1, -0.5 --> 0.
    The input is returned unchanged when w <= 0 or for any other direction.
    Return:
    - outp = Rounded value
    """
    if w <= 0:
        return inp
    divisor = 2**w
    half = 2 ** (w - 1)
    if direction == "HALF_UP":
        return (inp + half) // divisor
    if direction == "HALF_AWAY":
        # Floor division rounds towards -infinity, so negative inputs get a
        # one-smaller offset to land exact halves away from zero.
        offset = half if inp >= 0 else half - 1
        return (inp + offset) // divisor
    return inp
[docs]
def int_truncate(inp, w):
    """
    Purpose : Truncate the w LSbits of an integer value
    Input:
    - inp = Integer value
    - w = Number of LSbits to truncate
    Description: Remove LSBits. Negative inputs are truncated on their
                 magnitude, i.e. towards zero. Unchanged when w <= 0.
    Return:
    - outp = Truncated value
    """
    if w <= 0:
        return inp
    if inp < 0:
        return -((-inp) >> w)
    return inp >> w
[docs]
def int_requantize(
    inp, inp_w, outp_w, lsb_w=0, lsb_round=False, msb_clip=False, gain_w=0
):
    """
    Purpose : Requantize integer value similar as common_requantize.vhd
    Input:
    - inp = Integer value
    - inp_w = Input data width
    - outp_w = Output data width
    - lsb_w = Number of LSbits to truncate
    - lsb_round = when true round else truncate the input LSbits
    - msb_clip = when true clip else wrap the input MSbits
    - gain_w = Output gain in number of bits
    Description: Wrap the input to inp_w bits, round or truncate the LSbits,
                 clip or wrap the MSbits, then apply the optional output gain
                 and wrap the result to outp_w bits again.
    Return:
    - outp = Requantized value
    """
    # Pick the LSbit and MSbit treatment up front
    remove_lsbits = int_round if lsb_round else int_truncate
    limit_msbits = int_clip if msb_clip else int_wrap

    value = int_wrap(inp, inp_w)
    value = remove_lsbits(value, lsb_w)
    value = limit_msbits(value, outp_w)

    # Output gain may overflow outp_w again, so wrap once more
    return int_wrap(value << gain_w, outp_w)
[docs]
def flatten(x):
    """
    Flatten lists of lists of any depth. Preserves tuples.

    Strings and tuples (including tuple subclasses) are kept as single
    elements; every other element that has __iter__ is expanded recursively.
    """
    flat = []
    for item in x:
        is_leaf = (
            not hasattr(item, "__iter__")
            or isinstance(item, str)
            or issubclass(type(item), tuple)
        )
        if is_leaf:
            flat.append(item)
        else:
            flat += flatten(item)
    return flat
[docs]
def do_until(method, val, op=operator.eq, ms_retry=10, s_timeout=4, **kwargs):
    """
    Default: DO ``[method]`` UNTIL ``[method==val]``

    Example: ``do_until( self.read_status, 0)`` will execute ``self.read_status()`` until all
    elements in the returned (flattened) result equal 0.

    Use ``**kwargs`` to pass keyworded arguments to the input method;
    Use ``ms_retry`` to set the time in milliseconds before rerunning the method;
    Use ``s_timeout`` to set the time in seconds before timeout occurs (returns ``None``). Use
    ``s_timeout < 0`` to disable the time out.
    All standard Python operators (see ``operator`` module) are supported, the default
    is ``eq`` (equal to).

    On success the first element of the flattened result is returned.
    The timeout is checked before every call of ``method``.
    """
    started = time.time()
    while True:
        if s_timeout >= 0 and time.time() - started >= s_timeout:
            return None
        # Wrap the result so that scalars and (nested) lists are handled alike
        flat_data = flatten([method(**kwargs)])
        # Evaluate op for every element (no short-circuit) before deciding
        if all([op(element, val) for element in flat_data]):
            return flat_data[0]
        time.sleep(ms_retry / 1000)
[docs]
def do_until_lt(method, val, ms_retry=10, s_timeout=4, **kwargs):
    """Run do_until() with ``operator.lt``: repeat method until its result is < val."""
    return do_until(
        method, val, op=operator.lt, ms_retry=ms_retry, s_timeout=s_timeout, **kwargs
    )
[docs]
def do_until_le(method, val, ms_retry=10, s_timeout=4, **kwargs):
    """Run do_until() with ``operator.le``: repeat method until its result is <= val."""
    return do_until(
        method, val, op=operator.le, ms_retry=ms_retry, s_timeout=s_timeout, **kwargs
    )
[docs]
def do_until_eq(method, val, ms_retry=10, s_timeout=4, **kwargs):
    """Run do_until() with ``operator.eq``: repeat method until its result is == val."""
    return do_until(
        method, val, op=operator.eq, ms_retry=ms_retry, s_timeout=s_timeout, **kwargs
    )
[docs]
def do_until_ne(method, val, ms_retry=10, s_timeout=4, **kwargs):
    """Run do_until() with ``operator.ne``: repeat method until its result is != val."""
    return do_until(
        method, val, op=operator.ne, ms_retry=ms_retry, s_timeout=s_timeout, **kwargs
    )
[docs]
def do_until_ge(method, val, ms_retry=10, s_timeout=4, **kwargs):
    """Run do_until() with ``operator.ge``: repeat method until its result is >= val."""
    return do_until(
        method, val, op=operator.ge, ms_retry=ms_retry, s_timeout=s_timeout, **kwargs
    )
[docs]
def do_until_gt(method, val, ms_retry=10, s_timeout=4, **kwargs):
    """Run do_until() with ``operator.gt``: repeat method until its result is > val."""
    return do_until(
        method, val, op=operator.gt, ms_retry=ms_retry, s_timeout=s_timeout, **kwargs
    )
[docs]
def reverse_byte(byte):
    """
    Fast way to reverse a byte on 64-bit platforms.

    Multiply-mask-modulus trick: the multiplication makes shifted copies of
    the byte, the mask picks one bit from each copy and the modulus folds
    the picked bits together in reversed order.
    """
    spread = byte * 0x0202020202
    picked = spread & 0x010884422010
    return picked % 1023
[docs]
def reverse_word(word):
    """
    Fast way to reverse a word on 64-bit platforms.

    Each of the four bytes of the 32-bit word is bit-reversed with
    reverse_byte() and placed at the mirrored byte position.
    """
    reversed_word = 0
    for byte_index in range(4):
        byte = (word >> (8 * byte_index)) & 0xFF
        # Byte 0 ends up in the top byte, byte 3 in the bottom byte
        reversed_word |= reverse_byte(byte) << (24 - 8 * byte_index)
    return reversed_word
[docs]
def add_list(aList, bArg):
    """
    Element by element add list b to list a or add value b to each element in list a.

    Input:
    - aList = List of values
    - bArg = List with as many elements as aList, or a single value (or
             one-element list) that is added to every element of aList
    Return:
    - New list with the element-wise sums
    """
    # Same rule as listify(): anything that is not a list becomes a one-element list
    bList = bArg if isinstance(bArg, list) else [bArg]
    if len(bList) == 1:
        # Repeat the single value for every element of aList (list repetition,
        # not multiplication of the value itself)
        bList = bList * len(aList)
    return [a + bList[i] for i, a in enumerate(aList)]
[docs]
def subtract_list(aList, bArg):
    """
    Element by element subtract list b from list a or subtract value b from each element in list a.

    Input:
    - aList = List of values
    - bArg = List with as many elements as aList, or a single value (or
             one-element list) that is subtracted from every element of aList
    Return:
    - New list with the element-wise differences
    """
    # Same rule as listify(): anything that is not a list becomes a one-element list
    bList = bArg if isinstance(bArg, list) else [bArg]
    if len(bList) == 1:
        # Repeat the single value for every element of aList (list repetition,
        # not multiplication of the value itself)
        bList = bList * len(aList)
    return [a - bList[i] for i, a in enumerate(aList)]
[docs]
def split_list(source_list, split_size, sublist_items=None):
    """
    Splits a list based on split_size. Optionally, the indices passed in sublist_items
    are extracted from each sublist.

    Input:
    - source_list = List to split
    - split_size = Number of elements per sublist (the last sublist may be shorter)
    - sublist_items = None to return the complete sublists, else an index or
                      list of indices to pick from every sublist
    Return:
    - List of sublists, or list of lists with the picked items
    """
    sublists = [
        source_list[start : start + split_size]
        for start in range(0, len(source_list), split_size)
    ]
    # Identity test: None is a singleton and "==" can be overloaded
    if sublist_items is None or not sublists:
        return sublists

    wanted = listify(sublist_items)
    pick = operator.itemgetter(*wanted)
    if len(wanted) == 1:
        # With a single index itemgetter returns the bare item, so wrap it
        return [listify(pick(sublist)) for sublist in sublists]
    # With several indices itemgetter returns a tuple of items
    return [list(pick(sublist)) for sublist in sublists]
[docs]
def index_a_in_b(a, b, duplicates=False):
    """
    Find the elements of list a in list b and return their indices (relative to b).

    By default every index of b is reported at most once, in the order of b.
    With duplicates set, the indices are gathered per element of a (in the
    order of a), so an index is repeated when a holds the same value twice.
    """
    # Explicit comparison kept: only False/0 select the default mode
    if duplicates == False:
        return [pos for pos, candidate in enumerate(b) if candidate in a]
    return [
        pos
        for wanted in a
        for pos, candidate in enumerate(b)
        if candidate == wanted
    ]
[docs]
def index_a_in_multi_b(a, b):
    """
    Find a in multi-dimensional list b. Returns first hit only.

    Return:
    - List of indices (one per nesting level) leading to the first element
      equal to a, [] when b itself equals a, or None when a is not found.
    """
    if a == b:
        return []
    # Iterating a one-character string yields that same string again, so
    # descending into it would recurse forever; treat it as a leaf.
    if isinstance(b, str) and len(b) <= 1:
        return None
    try:
        for i, e in enumerate(b):
            r = index_a_in_multi_b(a, e)
            if r is not None:
                r.insert(0, i)
                return r
    except TypeError:
        # b is not iterable, so it is a leaf that did not match
        pass
    return None
[docs]
def unique(in_list):
    """
    Extract unique list elements (without changing the order like set() does).

    The first occurrence of every element is kept. Elements must be hashable.
    """
    # dict keys are unique and keep their insertion order
    return list(dict.fromkeys(in_list))
[docs]
def list_duplicates(in_list):
    """
    Find duplicate list elements.

    Return:
    - List with every element that occurs more than once in in_list (each
      reported once; the order comes from a set and is not defined).
    """
    seen_once = set()
    seen_twice = set()
    for item in in_list:
        if item in seen_once:
            seen_twice.add(item)
        else:
            seen_once.add(item)
    return list(seen_twice)
[docs]
def all_the_same(lst):
    """
    Returns True if all the list elements are identical.

    Compares the list without its first element against the list without
    its last element, i.e. every element against its neighbour.
    """
    without_first = lst[1:]
    without_last = lst[:-1]
    return without_first == without_last
[docs]
def all_equal_to(lst, value):
    """
    Returns True if all the list elements equal 'value'.

    The elements are first checked to be identical to each other (same
    neighbour comparison as all_the_same()), then the first element is
    compared with value. An empty list raises IndexError.
    """
    elements_identical = lst[1:] == lst[:-1]
    return bool(elements_identical and lst[0] == value)
[docs]
def rotate_list(in_list, n):
    """
    Rotates the list. Positive numbers rotate left. Negative numbers rotate right.

    Input:
    - in_list = List to rotate (not modified)
    - n = Number of positions to rotate; values beyond the list length wrap around
    Return:
    - New rotated list
    """
    length = len(in_list)
    if length:
        # Without the modulo a count beyond the list length slices the
        # whole list at once and leaves it unrotated.
        n %= length
    return in_list[n:] + in_list[:n]
[docs]
def to_unsigned(arg, width):
    """
    Interpret value[width-1:0] as unsigned.

    Input:
    - arg = Value or list of values (floats are converted with int())
    - width = Number of bits to keep
    Return:
    - Masked value, or list of masked values when more than one was passed
    """
    c_mask = 2**width - 1
    # keep only the lower [width-1:0] bits of every value
    masked = [int(value) & c_mask for value in listify(arg)]
    return unlistify(masked)
[docs]
def to_signed(arg, width):
    """
    Interpret arg value[width-1:0] or list of arg values as signed (two's complement).

    Input:
    - arg = Value or list of values (floats are converted with int())
    - width = Number of bits of the two's complement representation
    Return:
    - Signed value, or list of signed values when more than one was passed
    """
    c_wrap = 2**width
    c_sign = 2 ** (width - 1)
    c_mask = c_wrap - 1

    def _as_signed(value):
        # keep the lower [width-1:0] bits, then wrap when the sign bit is set
        masked = int(value) & c_mask
        return masked - c_wrap if masked & c_sign else masked

    return unlistify([_as_signed(value) for value in listify(arg)])
[docs]
def max_abs(data):
    """Return the larger of the maximum of data and the negated minimum of data."""
    highest = max(data)
    lowest = min(data)
    return max(highest, -lowest)
[docs]
def insert(orig, new, pos):
    """
    Inserts new (string, element) inside original string or list at pos.

    Return:
    - New sequence: orig up to pos, then new, then the remainder of orig
    """
    before, after = orig[:pos], orig[pos:]
    return before + new + after
[docs]
def deinterleave(input_stream, nof_out, block_size=1):
    """
    Deinterleave a stream (=flat list) into nof_out output streams based on block_size.

    >> deinterleave( [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16], 4, 2 )
    >> [ [1,2,9,10], [3,4,11,12], [5,6,13,14], [7,8,15,16] ]

    Note: len(input_stream)/nof_out/block_size should yield an integer; if it
          does not, an error is printed and None is returned.
    Note: This method behaves exactly like common_deinterleave.vhd.
    """
    # Check passed arguments first
    if float(len(input_stream)) / nof_out / block_size % 1 != 0:
        print(
            "deinterleave: Error: len(input_stream)/nof_out/block_size should yield an integer!"
        )
        return None

    # Group the stream into blocks of block_size elements
    blocks = split_list(input_stream, block_size) if block_size > 1 else input_stream

    # Rows of nof_out blocks; transposing gathers the blocks per output
    per_output = transpose(split_list(blocks, nof_out))

    # Flatten and cut the result into nof_out equally sized streams
    stream_len = len(input_stream) // nof_out
    return split_list(flatten(per_output), stream_len)
[docs]
def interleave(input_streams, block_size=1):
    """
    Interleave a list of multiple lists into one based on block size.

    Note: This method behaves exactly like common_interleave.vhd.
    """
    # Concatenate all streams, then let deinterleave() regroup the blocks:
    # with one output per block of a stream, flattening its result gives
    # the blocks of all streams in interleaved order.
    concatenated = flatten(input_streams)
    blocks_per_stream = len(input_streams[0]) // block_size
    regrouped = deinterleave(concatenated, blocks_per_stream, block_size)
    return flatten(regrouped)
[docs]
def reinterleave(input_streams, nof_out, block_size_in=1, block_size_out=1):
    """
    Re-interleave X input streams across nof_out output streams. The input streams are first
    deinterleaved with block_size_in, then nof_out interleaved streams are made with block_size_out.

    Note: This method behaves exactly like common_reinterleave.vhd.
    """
    # Array of deinterleavers: deinterleaved[input][output] = stream
    deinterleaved = [
        deinterleave(stream, nof_out, block_size_in) for stream in input_streams
    ]

    # Interconnect: every interleaver gets its own stream from each deinterleaver
    interleaver_inputs = [
        [per_input[out_no] for per_input in deinterleaved]
        for out_no in range(nof_out)
    ]

    # Array of interleavers: one interleaved stream per output
    return [interleave(streams, block_size_out) for streams in interleaver_inputs]
[docs]
def transpose(matrix):
    """
    Transpose by using zip().

    zip() yields tuples, and flatten() keeps tuples intact, so every
    column is converted to a list to keep the result flattenable.
    Rows longer than the shortest row are cut off by zip().
    """
    return [list(column) for column in zip(*matrix)]
[docs]
def straighten(matrix, padding=" "):
    """
    Straighten a crooked matrix by padding the shorter lists with the padding
    up to the same length as the longest list.

    Input:
    - matrix = List of lists (rows) of possibly different lengths
    - padding = Element used to fill up the shorter rows
    Return:
    - New list of rows, all as long as the longest row ([] for an empty matrix)
    """
    # The longest row must be found by length; max(matrix) on its own
    # compares the rows element by element.
    max_len = max((len(row) for row in matrix), default=0)
    # Same padding rule as pad(): append padding up to max_len
    return [row + [padding] * (max_len - len(row)) for row in matrix]
[docs]
def pad(lst, length, padding=" "):
    """
    Pad a list up to length with padding.

    A list that is already length or more elements long is returned as an
    unchanged copy (a negative repeat count gives an empty list).
    """
    missing = length - len(lst)
    filler = [padding] * missing
    return lst + filler
[docs]
def depth(x):
    """
    Returns the depth of x. Returns 0 if x is not iterable (not a list or tuple).

    The depth is the number of nested list/tuple levels; an empty list or
    tuple also gives 0.
    """
    containers = (list, tuple)
    if not isinstance(x, containers):
        return 0
    level = 0
    while x:
        # Go one level down: gather the children of all nested lists/tuples
        x = [
            child
            for element in x
            if isinstance(element, containers)
            for child in element
        ]
        level += 1
    return level
[docs]
def listify(x):
    """
    Can be used to force method input to a list.

    A list is returned as is (the same object); anything else is wrapped
    in a new one-element list.
    """
    return x if isinstance(x, list) else [x]
[docs]
def unlistify(x):
    """
    Converts 1-element list to x.

    Only a list holding exactly one element is unpacked; every other input
    (longer or empty lists, non-lists) is returned unchanged.
    """
    if isinstance(x, list) and len(x) == 1:
        return x[0]
    return x
[docs]
def tuplefy(x):
    """
    Similar to listify().

    This method enables user to iterate through tuples of inconsistent depth by
    always returning a non-flat tuple.
    Pushes a flat tuple (depth=1) e.g. (0,1,2) one level deeper: ( (0,1,2), ).
    Non-flat tuples are returned untouched.
    A non-tuple (depth=0) is also pushed into a tuple 2 levels deep.
    """
    level = depth(x)
    if level == 0:
        return ((x,),)
    if level == 1:
        return (x,)
    return x
[docs]
def method_name(caller_depth=0):
    """
    Returns the name of the caller method.

    Input:
    - caller_depth = 0 for the direct caller, 1 for the caller of the caller, etc.
    """
    # Entry 0 of the stack is this method itself, so the caller is one further.
    # Field 3 of a stack entry is the function name.
    caller_record = inspect.stack()[caller_depth + 1]
    return caller_record[3]
[docs]
def method_arg_names(method):
    """
    Returns the names of the arguments of passed method.

    Return:
    - List with the names of the positional (or keyword) parameters; *args,
      **kwargs and keyword-only parameters are not included.
    """
    # inspect.getargspec() was removed in Python 3.11; getfullargspec()
    # reports the same positional names in its "args" field.
    return inspect.getfullargspec(method).args
[docs]
def concat_complex(list_complex, width_in_bits):
    """
    Concatenates the real and imaginary part into one integer.

    The specified width counts for both the real and imaginary part.
    Real part is mapped on the LSB. Imaginary part is shifted to the MSB.
    Both parts are converted with int() and masked to width_in_bits bits.
    """
    part_mask = 2**width_in_bits - 1
    return [
        ((int(value.imag) & part_mask) << width_in_bits)
        + (int(value.real) & part_mask)
        for value in list_complex
    ]
[docs]
def split_complex(list_complex):
    """
    Returns the real and imaginary part in two separate lists.

    [list_re, list_im] = split_complex(list_complex)
    """
    real_parts = [value.real for value in list_complex]
    imag_parts = [value.imag for value in list_complex]
    return (real_parts, imag_parts)
[docs]
def mac_str(mac):
    """
    Converts MAC address integer to the hexadecimal string representation,
    separated by ':'.

    :param mac: MAC address
    :type mac: int
    :return: MAC address (six upper-case hexadecimal octets, most significant first)
    :rtype: str
    """
    # Shifts 40, 32, ..., 0 select the octets from most to least significant
    octets = [(mac >> shift) & 0xFF for shift in range(40, -8, -8)]
    return ":".join(format(octet, "02X") for octet in octets)
[docs]
def bit_list_to_mask(bit_list):
    """
    Converts a list of bit indexes to an integer.

    Input:
    - bit_list = Iterable of bit positions (0 = least significant bit)
    Return:
    - Integer with exactly the listed bits set (0 for an empty list)
    """
    mask = 0
    for bit_index in bit_list:
        # OR instead of add, so a repeated index cannot carry into the next bit
        mask |= 1 << bit_index
    return mask