"""Tools for working with JSON compressed channel maps.
The SKA is meant to have a large number of channels, which means that
any type of per-channel configuration might become very cumbersome to
transfer and reason about. To prevent such issues we are using a
simple run-length encoding to "compress" the representation. The idea
is that if we write::
[ [0,0], [200,1], [400, 3] ]
We essentially mean the dictionary::
{ 0: 0, 1: 0, ..., 199:0, 200:1, ..., 399:1, 400: 3, ...}
Furthermore runs of numbers are supported, by adding an increment::
[ [0,0,1], [200,1] ]
Means::
{ 0: 0, 1: 1, 2:2, ..., 199:100, 200:1, ...}
"""
from collections import deque
from typing import Any, List
def _validate_channel_map(
channel_map: List[list], channel: int = 0, check_order: bool = True
):
"""
Check that map is non-empty, defined for the given channel, and sorted.
:param channel_map: Channel map to check
:param channel: Channel to ensure we have a definition for
:raises ValueError: If channel map is found to be invalid
"""
if len(channel_map) == 0:
raise ValueError("Channel map must have at least one entry!")
if channel_map[0][0] > channel:
raise ValueError(f"Channel map must be defined for channel {channel}!")
if check_order:
last_channel = int(channel_map[0][0])
for channel, *_ in channel_map[1:]:
if int(channel) <= last_channel:
raise ValueError(
"Channel map is not ordered: "
f"{last_channel} <= {int(channel)}!"
)
last_channel = int(channel)
[docs]def channel_map_at(
channel_map: List[list], channel: int, make_entry: bool = False
) -> Any:
"""
Query a value from a channel map
:param channel_map: Queried map
:param channel: Channel ID to query
:param make_entry: Return an channel map entry (including increment)
instead of just the value
:returns: Value from map
"""
_validate_channel_map(channel_map, channel, False)
# Locate (TODO: do a binary search...)
entry = channel_map[0]
for e in channel_map[1:]:
if e[0] > channel:
break
entry = e
# Apply incremement if necessary
val = entry[1]
if len(entry) > 2:
val += (channel - entry[0]) * entry[2]
if make_entry:
return [channel, val, *entry[2:]]
else:
return val
[docs]def shift_channel_map(
channel_map: List[list], channel_shift: int
) -> List[list]:
"""
Shift a channel map by some channel distance
:param channel_map: Channel map to use
:param channel_shift: Shift to apply
"""
return list([[ch + channel_shift] + vals for ch, *vals in channel_map])
[docs]def split_channel_map(
channel_map: List[list],
first_channel: int,
channel_group_steps: int,
rebase_groups: int = None,
minimum_groups: int = 0,
) -> List[List[list]]:
"""
Split a channel map using a constant channel step length
:param channel_map: Channel map to split. Each entry is expected to have
the start channel in the first field, and mapped data in the remaining
entries
:param first_channel: First channel to appear in the map
:param channel_group_steps: Chunks to split the channel map into
:param rebase_groups: Start every group at given channel index
(None: left as-is)
:param minimum_groups: Minimum number of groups to return
:returns: List of channel maps
"""
# Validate
_validate_channel_map(channel_map, first_channel)
# Make split points
last_group = max(
channel_map[-1][0] + 1,
first_channel + minimum_groups * channel_group_steps,
)
channel_groups = list(
range(
first_channel,
last_group + channel_group_steps,
channel_group_steps,
)
)
# Split channel map
return split_channel_map_at(channel_map, channel_groups, rebase_groups)
[docs]def split_channel_map_at(
channel_map: List[list],
channel_groups: List[int],
rebase_groups: int = None,
) -> List[List[list]]:
"""
Split a channel map at certain points
:param channel_map: Channel map to split. Each entry is expected to have
the start channel in the first field, and mapped data in the remaining
entries
:param channel_groups: Boundaries between channel groups.
The `n`-th returned channel map will cover channels
`channel_groups[n]..channel_groups[n+1]-1`. Needs to have at least
two entries.
:param rebase_groups: Start every group at given channel index
(None: left as-is)
:returns: List of channel maps
"""
if len(channel_groups) < 2:
return []
# Validate
_validate_channel_map(channel_map, channel_groups[0])
# Generate first entry
maps = []
current_map = [
channel_map_at(channel_map, channel_groups[0], make_entry=True)
]
group = 0
# Remove all entries in map before the first channel
work_map = deque(channel_map)
while len(work_map) > 0 and work_map[0][0] <= channel_groups[group]:
work_map.popleft()
# Add entries
while len(work_map) > 0:
if work_map[0][0] >= channel_groups[group + 1]:
# Last channel group to generate?
if group + 2 >= len(channel_groups):
break
# Add map
maps.append(current_map)
group += 1
# Set up next group
if work_map[0][0] > channel_groups[group]:
current_map = [
channel_map_at(
current_map, channel_groups[group], make_entry=True
)
]
else:
current_map = [work_map.popleft()]
continue
# Append map entry
current = work_map.popleft()
current_map.append(current)
# Add final map
maps.append(current_map)
group += 1
# Pad to requested number of groups
while group + 1 < len(channel_groups):
entry = channel_map_at(
maps[-1], channel_groups[group], make_entry=True
)
maps.append([entry])
group += 1
# Rebase if requested
if rebase_groups is not None:
maps = list(
[
shift_channel_map(chan_map, rebase_groups - first_chan)
for first_chan, chan_map in zip(channel_groups, maps)
]
)
return maps