Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
UdpStreamTest.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/rcpt_low/test/UdpStreamTest.h"
25 #include "cheetah/rcpt_low/UdpStream.h"
26 #include "cheetah/rcpt_low/Config.h"
27 #include "cheetah/rcpt_low/PacketGenerator.h"
28 #include "cheetah/rcpt_low/PacketGeneratorConfig.h"
29 #include "panda/DataManager.h"
30 #include "panda/IpAddress.h"
31 #include "panda/Log.h"
32 #include "panda/Engine.h"
33 #include <algorithm>
34 #include <deque>
35 #include <mutex>
36 #include <iostream>
37 #include <sstream>
38 #include <random>
39 
40 namespace ska {
41 namespace cheetah {
42 namespace rcpt_low {
43 namespace test {
44 
45 
46 UdpStreamTest::UdpStreamTest()
47  : ::testing::Test()
48 {
49 }
50 
51 UdpStreamTest::~UdpStreamTest()
52 {
53 }
54 
55 void UdpStreamTest::SetUp()
56 {
57 }
58 
59 void UdpStreamTest::TearDown()
60 {
61 }
62 
63 struct TestModel {
64  public:
66 
67 
68  DataType& next(DataType& data) {
69  //Generate random data
70  std::default_random_engine generator;
71  std::normal_distribution<float> distribution(0.0,25.0);
72  // fill data with values
73  std::unique_lock<std::mutex> lk(_mutex);
74 
75  std::generate(data.begin(), data.end(), [&]{ return ((typename DataType::DataType)distribution(generator)); } );
76  _data.push_back(data);
77  return data;
78  }
79 
80  DataType sent_data() {
81  std::lock_guard<std::mutex> lk(_mutex);
82  if(_data.empty()) throw panda::Error("no more sent data");
83  DataType d = _data.front();
84  _data.pop_front();
85  return d;
86  }
87 
88  bool has_sent_data() const {
89  std::lock_guard<std::mutex> lk(_mutex);
90  return !_data.empty();
91  }
92 
93  private:
94  mutable std::mutex _mutex;
95  std::deque<DataType> _data;
96 };
97 
98 
99 template<typename Packet>
100 void test_udp_packets_stream_data_consistency(std::size_t number_of_channels, std::size_t number_of_samples)
101 {
102  typedef ska::panda::Connection<ska::panda::ConnectionTraits<ska::panda::Udp>> ConnectionType;
104 
105  rcpt_low::PacketGeneratorConfig generator_config;
106  generator_config.interval(std::chrono::microseconds(2000));
107 
108  panda::IpAddress address(0, "127.0.0.1");
109  boost::asio::ip::udp::endpoint local_endpoint = address.end_point<boost::asio::ip::udp::endpoint>();
110 
111  rcpt_low::Config config;
112  config.spectra_per_chunk(number_of_samples); // this is the number of time samples
113  ASSERT_EQ(number_of_samples, config.spectra_per_chunk());
114 
115 
116  SCOPED_TRACE(number_of_channels);
117  SCOPED_TRACE("number_of_channels per time sample:");
118  PANDA_LOG << "number_of_channles per time sample=" << number_of_channels;
119  config.number_of_channels(number_of_channels);
120  config.remote_end_point(local_endpoint);
121  ASSERT_EQ(number_of_channels, config.number_of_channels());
122 
123  //Listening to the endpoint
124  rcpt_low::UdpStream stream(config);
125  //std::cout<<"Listening now..... \n";
126  ska::panda::DataManager<UdpStream> dm(stream);
127  auto endpoint = stream.local_end_point();
128  //std::cout<<"Data Size "<<dm.size()<<" \n";
129  // Start a UDP stream
130  TestModel model;
131  typedef PacketGenerator<TestModel> GeneratorType;
132  GeneratorType generator(model, generator_config);
133  panda::Engine& engine = config.engine();
134  ConnectionType connection(engine); // to send packets
135  connection.set_remote_end_point(endpoint);
136 
137  // construct a data set that matches the specified config to use as a reference
138 
139  DataType data_ref(data::DimensionSize<data::Time>(config.spectra_per_chunk())
140  , data::DimensionSize<data::Frequency>(config.number_of_channels()));
141 
143 
144  // test data throughput
145  unsigned delta = 0U;
146  unsigned received_count=0U;
147  engine.poll(); // ensure the DataManager has completed all init jobs before we start testing throughput
148 
149  // send enough packets to fill a single chunk
150  std::size_t packet_tot = delta;
151  auto chunk_size = data_ref.number_of_channels()*data_ref.number_of_spectra();
152  //size_t chunk_size = 995328U;
153  unsigned packet_count = 0U;
154  // We send more packet than we really need as some OS's will drop the first packet if there is no ARP entry in the
155  // cache (https://stackoverflow.com/questions/11812731/first-udp-message-to-a-specific-remote-ip-gets-lost) and will only
156  // queue one more until resolved
157  PANDA_LOG <<" chunk size "<<chunk_size;
158  while(packet_tot < 2 * chunk_size )
159  {
160  ++packet_count;
161  connection.send(generator.next(), [&](ska::panda::Error e){
162  ASSERT_FALSE(e);
163  ++received_count;
164  });
165  engine.poll_one();
166  packet_tot += 1152U;
167  }
168  delta = packet_tot - chunk_size;
169 
170 
171  unsigned max_attempts=1000;
172  while(received_count < packet_count) {
173  engine.poll_one();
174  if(--max_attempts == 0) FAIL() << "time out: expected packets not received";
175  }
176 
177  auto data_out_ptr = std::get<0>(dm.next()); // keep it in context
178 
179  auto const& data_out = *data_out_ptr;
180  auto it = data_out.cbegin();
181 
182  //metadata tests
183  boost::units::quantity<ska::cheetah::data::MegaHertz, double> bandwidth = static_cast<boost::units::quantity<ska::cheetah::data::MegaHertz, double>>(data_out.low_high_frequencies().second);
184  ASSERT_EQ(bandwidth.value(),350.0);
185  ASSERT_LT(data_out.start_time(),ska::cheetah::utils::ModifiedJulianClock::now());
186  ASSERT_GT(data_out.start_time()+std::chrono::seconds(10),ska::cheetah::utils::ModifiedJulianClock::now());
187 
188  auto model_data = model.sent_data();
189  auto sent_it = model_data.cbegin();
190  auto sent_it_end = model_data.cend();
191 
192  ASSERT_NE(nullptr, &*data_out.begin());
193 
194  for(int channel=0;channel<9;channel++)
195  {
196  for(int sample=0; sample<128; sample++)
197  {
198  if(sent_it_end>sent_it+(channel*512+sample*2+257))
199  {
200  int s1 = (int)(*(sent_it+(channel*256+sample)));
201  int s2 = (int)(*(sent_it+(channel*256+sample)));
202  int s3 = (int)(*(sent_it+(channel*256+sample+128)));
203  int s4 = (int)(*(sent_it+(channel*256+sample+128)));
204  uint8_t intensity = uint8_t((s1*s1 + s2*s2)/4096) + uint8_t((s3*s3 + s4*s4)/4096);
205  ASSERT_EQ(*(it+(channel*128+sample)) , intensity);
206  }
207  }
208  }
209 
210 
211 }
212 
213 TEST_F(UdpStreamTest, test_udp_packets_stream_data_consistency_channels_size_equal_packet_size)
214 {
215  typedef BeamFormerPacket<int8_t,128,9> Packet;
216  test_udp_packets_stream_data_consistency<Packet>(128, 7776);
217 }
218 
219 
220 
221 } // namespace test
222 } // namespace rcpt_low
223 } // namespace cheetah
224 } // namespace ska
Interface to packing/unpacking rcpt from the BeamFormer rcpt stream UDP packet.
configurable parameters for the rcpt
Definition: Config.h:42
const std::chrono::microseconds & interval() const
time sepearation between consequetive packets in micro seconds
unsigned spectra_per_chunk() const
return the number of time samples in a chunk
Definition: Config.cpp:73
Packs data into a UDP stream Packet Header format of the BeamFormer.
Some limits and constants for FLDO.
Definition: Brdz.h:35
NumericalT DataType
the underlying data storage type for the amplitude of the signal
Definition: TimeFrequency.h:96
std::size_t number_of_channels() const
Configuration for the packet generator to the time seperation of the packets.
Traits describing the BeamFormer Data Stream to the panda::PacketSream system.
std::size_t number_of_spectra() const