# Source code for ska_telmodel.channel_map

"""Tools for working with JSON compressed channel maps.

The SKA is meant to have a large number of channels, which means that
any type of per-channel configuration might become very cumbersome to
transfer and reason about. To prevent such issues we are using a
simple run-length encoding to "compress" the representation. The idea
is that if we write::

    [ [0,0], [200,1], [400, 3] ]

We essentially mean the dictionary::

    { 0: 0, 1: 0, ..., 199:0, 200:1, ..., 399:1, 400: 3, ...}

Furthermore runs of numbers are supported, by adding an increment::

    [ [0,0,1], [200,1] ]

Means::

    { 0: 0, 1: 1, 2: 2, ..., 199: 199, 200: 1, ...}
"""

from collections import deque
from typing import Any, List


def _validate_channel_map(
    channel_map: List[list], channel: int = 0, check_order: bool = True
):
    """
    Check that map is non-empty, defined for the given channel, and sorted.

    :param channel_map: Channel map to check
    :param channel: Channel to ensure we have a definition for
    :param check_order: Also verify that the start channels of the
       entries are strictly increasing
    :raises ValueError: If channel map is found to be invalid
    """

    if len(channel_map) == 0:
        raise ValueError("Channel map must have at least one entry!")
    if channel_map[0][0] > channel:
        raise ValueError(f"Channel map must be defined for channel {channel}!")

    if check_order:
        # Walk the entries with a dedicated loop variable so that the
        # ``channel`` parameter is not rebound while checking.
        last_channel = int(channel_map[0][0])
        for start_channel, *_ in channel_map[1:]:
            start_channel = int(start_channel)
            if start_channel <= last_channel:
                # Report the offending pair in the order it appears in
                # the map: the earlier entry is not below the later one.
                raise ValueError(
                    "Channel map is not ordered: "
                    f"{last_channel} >= {start_channel}!"
                )
            last_channel = start_channel


def channel_map_at(
    channel_map: List[list], channel: int, make_entry: bool = False
) -> Any:
    """
    Query a value from a channel map

    The entry used is the last one whose start channel is not greater
    than ``channel``. The map is scanned front to back and its ordering
    is not checked here, so it is expected to be sorted already.

    :param channel_map: Queried map
    :param channel: Channel ID to query
    :param make_entry: Return a channel map entry (including
       increment) instead of just the value
    :returns: Value from map
    :raises ValueError: If the map is empty or not defined for the
       given channel
    """

    _validate_channel_map(channel_map, channel, False)

    # Locate (TODO: do a binary search...)
    entry = channel_map[0]
    for e in channel_map[1:]:
        if e[0] > channel:
            break
        entry = e

    # Apply increment if necessary
    val = entry[1]
    if len(entry) > 2:
        val += (channel - entry[0]) * entry[2]
    if make_entry:
        # Re-anchor the entry at the queried channel, keeping the
        # increment (and any further fields) unchanged
        return [channel, val, *entry[2:]]
    else:
        return val


def shift_channel_map(
    channel_map: List[list], channel_shift: int
) -> List[list]:
    """
    Shift a channel map by some channel distance

    :param channel_map: Channel map to use
    :param channel_shift: Shift to apply
    :returns: New channel map in which the start channel of every
       entry is offset by ``channel_shift``; the remaining fields of
       each entry are carried over unchanged
    """

    return [[ch + channel_shift] + vals for ch, *vals in channel_map]


def split_channel_map(
    channel_map: List[list],
    first_channel: int,
    channel_group_steps: int,
    rebase_groups: int = None,
    minimum_groups: int = 0,
) -> List[List[list]]:
    """
    Split a channel map using a constant channel step length

    :param channel_map: Channel map to split. Each entry is expected
       to have the start channel in the first field, and mapped data
       in the remaining entries
    :param first_channel: First channel to appear in the map
    :param channel_group_steps: Chunks to split the channel map into
    :param rebase_groups: Start every group at given channel index
       (None: left as-is)
    :param minimum_groups: Minimum number of groups to return
    :returns: List of channel maps
    :raises ValueError: If the channel map is empty, unordered or not
       defined for ``first_channel``
    """

    # Validate
    _validate_channel_map(channel_map, first_channel)

    # Make split points: go far enough to include the last map entry
    # and to produce at least the requested number of groups
    last_group = max(
        channel_map[-1][0] + 1,
        first_channel + minimum_groups * channel_group_steps,
    )
    channel_groups = list(
        range(
            first_channel,
            last_group + channel_group_steps,
            channel_group_steps,
        )
    )

    # Split channel map
    return split_channel_map_at(channel_map, channel_groups, rebase_groups)


def split_channel_map_at(
    channel_map: List[list],
    channel_groups: List[int],
    rebase_groups: int = None,
) -> List[List[list]]:
    """
    Split a channel map at certain points

    :param channel_map: Channel map to split. Each entry is expected
       to have the start channel in the first field, and mapped data
       in the remaining entries
    :param channel_groups: Boundaries between channel groups. The
       `n`-th returned channel map will cover channels
       `channel_groups[n]..channel_groups[n+1]-1`. Needs to have at
       least two entries.
    :param rebase_groups: Start every group at given channel index
       (None: left as-is)
    :returns: List of channel maps (empty if fewer than two
       boundaries are given)
    :raises ValueError: If the channel map is empty, unordered or not
       defined for the first boundary
    """

    if len(channel_groups) < 2:
        return []

    # Validate
    _validate_channel_map(channel_map, channel_groups[0])

    # Generate first entry
    maps = []
    current_map = [
        channel_map_at(channel_map, channel_groups[0], make_entry=True)
    ]
    group = 0

    # Remove all entries in map before the first channel
    work_map = deque(channel_map)
    while work_map and work_map[0][0] <= channel_groups[group]:
        work_map.popleft()

    # Add entries
    while work_map:
        if work_map[0][0] >= channel_groups[group + 1]:

            # Last channel group to generate?
            if group + 2 >= len(channel_groups):
                break

            # Add map
            maps.append(current_map)
            group += 1

            # Set up next group: either carry the value of the
            # finished group over to the new boundary, or start
            # directly with the entry sitting on the boundary
            if work_map[0][0] > channel_groups[group]:
                current_map = [
                    channel_map_at(
                        current_map, channel_groups[group], make_entry=True
                    )
                ]
            else:
                current_map = [work_map.popleft()]
            continue

        # Append map entry
        current = work_map.popleft()
        current_map.append(current)

    # Add final map
    maps.append(current_map)
    group += 1

    # Pad to requested number of groups
    while group + 1 < len(channel_groups):
        entry = channel_map_at(
            maps[-1], channel_groups[group], make_entry=True
        )
        maps.append([entry])
        group += 1

    # Rebase if requested
    if rebase_groups is not None:
        maps = [
            shift_channel_map(chan_map, rebase_groups - first_chan)
            for first_chan, chan_map in zip(channel_groups, maps)
        ]

    return maps