# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing utility methods of SMRB."""
from __future__ import annotations
from typing import Any, Dict
from ska_pst.common.constants import BITS_PER_BYTE, SIZE_OF_FLOAT32_IN_BYTES
from ska_pst.common import CbfPstConfig
# Public API of this module: only these names are exported by a star-import.
__all__ = [
    "generate_data_key",
    "generate_weights_key",
    "calculate_smrb_subband_resources",
]
DATA_KEY_SUFFIX: int = 0
"""Suffix appended to a ring buffer key to denote that it is an input data ring buffer."""

WEIGHTS_KEY_SUFFIX: int = 2
"""Suffix appended to a ring buffer key to denote that it is an input weights ring buffer."""

HEADER_BUFFER_NBUFS: int = 8
"""Number of buffers used for the header ring buffers."""

HEADER_BUFFER_BUFSZ: int = 16 * 1024
"""Size of the header ring buffers, in bytes (16384)."""
def generate_data_key(beam_id: int, subband_id: int) -> str:
    """
    Build the ring buffer key for the data of a beam and subband.

    The key is the concatenation of three parts, in this order:

    * ``beam_id`` in lowercase hexadecimal, left padded with zeros to at least 2 digits;
    * ``subband_id`` in decimal; and
    * ``DATA_KEY_SUFFIX`` (``0``).

    The result is 4 characters long when ``beam_id`` is between 0 and 255 and
    ``subband_id`` is a single digit. No range checking is performed, so values
    outside of those ranges produce a longer key.

    :param beam_id: the beam_id that this LMC component is for.
    :type beam_id: int
    :param subband_id: the id of the subband to generate the key for.
    :type subband_id: int
    :returns: the encoded key to be used for the data ring buffer.
    :rtype: str
    """
    return f"{beam_id:02x}{subband_id}{DATA_KEY_SUFFIX}"
def generate_weights_key(beam_id: int, subband_id: int) -> str:
    """
    Build the ring buffer key for the weights of a beam and subband.

    The key is the concatenation of three parts, in this order:

    * ``beam_id`` in lowercase hexadecimal, left padded with zeros to at least 2 digits;
    * ``subband_id`` in decimal; and
    * ``WEIGHTS_KEY_SUFFIX`` (``2``).

    The result is 4 characters long when ``beam_id`` is between 0 and 255 and
    ``subband_id`` is a single digit. No range checking is performed, so values
    outside of those ranges produce a longer key.

    NOTE: the weights key is 2 more than the data key, needed because
    SMRB.CORE has 2 keys for each ring buffer.

    :param beam_id: the beam_id that this LMC component is for.
    :type beam_id: int
    :param subband_id: the id of the subband to generate the key for.
    :type subband_id: int
    :returns: the encoded key to be used for the weights ring buffer.
    :rtype: str
    """
    return f"{beam_id:02x}{subband_id}{WEIGHTS_KEY_SUFFIX}"
def calculate_smrb_subband_resources(
    beam_id: int,
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> Dict[int, dict]:
    """
    Calculate the ring buffer (RB) resources from request.

    This is a common method used to calculate the keys, number of buffers, and
    the size of buffers for each subband required for a scan. The returned dict
    currently holds a single entry, for subband 1.

    :param beam_id: the numerical id of the beam that this RB is for.
    :type beam_id: int
    :param cbf_pst_config: the CBF/PST configuration for the current request
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: extra keyword arguments, passed unchanged to
        ``cbf_pst_config.nchan_for_bandwidth`` to obtain the number of channels.
    :returns: a dict of dicts, with the top level key being the subband id, while
        the second level is the specific parameters. For ``beam_id=160`` (``0xa0``)
        the response has the following form, where the ``db_*`` and ``wb_*`` values
        are only illustrative as they depend on the configuration::

            {
                1: {
                    'data_key': "a010",
                    'weights_key': "a012",
                    'hb_nbufs': 8,
                    'hb_bufsz': 16384,
                    'db_nbufs': 8,
                    'db_bufsz': 1048576,
                    'wb_nbufs': 8,
                    'wb_bufsz': 8192,
                }
            }
    :rtype: Dict[int, dict]
    """
    # only one subband, with id 1, is generated
    subband_id = 1

    ring_buffer_config = cbf_pst_config.ring_buffer_config
    n_buffers = ring_buffer_config.num_buffers
    n_packets = ring_buffer_config.packets_per_buffer

    nchan = cbf_pst_config.nchan_for_bandwidth(**kwargs)
    udp_nsamp = cbf_pst_config.udp_nsamp

    # this should be 1 as udp_nsamp should equal wt_nsamp
    weights_per_packet = udp_nsamp // cbf_pst_config.wt_nsamp

    # The bit count is converted to whole bytes (integer division) before it is
    # scaled by the number of samples; keep this order as it affects rounding.
    data_bits = nchan * cbf_pst_config.npol * cbf_pst_config.nbit * cbf_pst_config.ndim
    data_resolution = data_bits // BITS_PER_BYTE * udp_nsamp

    # one float32 for every group of udp_nchan channels
    float32_bytes = nchan // cbf_pst_config.udp_nchan * SIZE_OF_FLOAT32_IN_BYTES
    # this is 2 * nchan when wt_nbit is 16 and weights_per_packet is 1
    packed_weights_bytes = nchan * cbf_pst_config.wt_nbit // BITS_PER_BYTE * weights_per_packet
    weights_resolution = float32_bytes + packed_weights_bytes

    subband_resources = {
        "data_key": generate_data_key(beam_id=beam_id, subband_id=subband_id),
        "weights_key": generate_weights_key(beam_id=beam_id, subband_id=subband_id),
        "hb_nbufs": HEADER_BUFFER_NBUFS,
        "hb_bufsz": HEADER_BUFFER_BUFSZ,
        "db_nbufs": n_buffers,
        "db_bufsz": n_packets * data_resolution,
        "wb_nbufs": n_buffers,
        "wb_bufsz": n_packets * weights_resolution,
    }
    return {subband_id: subband_resources}