Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
UdpStreamTest.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/rcpt/test/UdpStreamTest.h"
25 #include "cheetah/rcpt/UdpStream.h"
26 #include "cheetah/rcpt/Config.h"
27 #include "cheetah/rcpt/PacketGenerator.h"
28 #include "panda/DataManager.h"
29 #include "panda/IpAddress.h"
30 #include "panda/Log.h"
31 #include "panda/Engine.h"
32 #include <algorithm>
33 #include <deque>
34 #include <mutex>
35 #include <iostream>
36 #include <sstream>
37 
38 
39 namespace ska {
40 namespace cheetah {
41 namespace rcpt {
42 namespace test {
43 
44 
45 UdpStreamTest::UdpStreamTest()
46  : ::testing::Test()
47 {
48 }
49 
50 UdpStreamTest::~UdpStreamTest()
51 {
52 }
53 
54 void UdpStreamTest::SetUp()
55 {
56 }
57 
58 void UdpStreamTest::TearDown()
59 {
60 }
61 
62 struct TestModel {
63  public:
65 
66  DataType& next(DataType& data) {
67  // fill data with values
68  std::unique_lock<std::mutex> lk(_mutex);
69  typename DataType::DataType val = 0;
70  std::generate(data.begin(), data.end(), [&]{ return ++val; } );
71  _data.push_back(data);
72  return data;
73  }
74 
75  DataType sent_data() {
76  std::lock_guard<std::mutex> lk(_mutex);
77  if(_data.empty()) throw panda::Error("no more sent data");
78  DataType d = _data.front();
79  _data.pop_front();
80  return d;
81  }
82 
83  bool has_sent_data() const {
84  std::lock_guard<std::mutex> lk(_mutex);
85  return !_data.empty();
86  }
87 
88  private:
89  mutable std::mutex _mutex;
90  std::deque<DataType> _data;
91 };
92 
93 static
94 std::string write_data(BeamFormerDataTraits::DataType const& data)
95 {
96  std::stringstream ss;
97  ss << "(";
98  for(auto d : data) {
99  ss << (unsigned)d << ",";
100  }
101  ss << ")";
102  return ss.str();
103 }
104 
105 template<typename Packet>
106 void test_udp_packets_stream_data_consistency(std::size_t number_of_channels)
107 {
108  typedef ska::panda::Connection<ska::panda::ConnectionTraits<ska::panda::Udp>> ConnectionType;
109 
110  panda::IpAddress address(0, "127.0.0.1");
111  boost::asio::ip::udp::endpoint local_endpoint = address.end_point<boost::asio::ip::udp::endpoint>();
112 
113  rcpt::Config config;
114  config.samples_per_chunk(1); // this is the number of time samples
115  ASSERT_EQ(1U, config.samples_per_chunk());
116 
117 
118  SCOPED_TRACE(number_of_channels);
119  SCOPED_TRACE("number_of_channels per time sample:");
120  PANDA_LOG_DEBUG << "number_of_channles per time sample=" << number_of_channels;
121  config.number_of_channels(number_of_channels);
122  config.remote_end_point(local_endpoint);
123  ASSERT_EQ(number_of_channels, config.number_of_channels());
124 
125  rcpt::UdpStream stream(config);
126  ska::panda::DataManager<UdpStream> dm(stream);
127  auto endpoint = stream.local_end_point();
128 
129  // Start a UDP stream
130  TestModel model;
131  typedef PacketGenerator<TestModel> GeneratorType;
132  GeneratorType generator(model, data::DimensionSize<data::Frequency>(config.number_of_channels()), data::DimensionSize<data::Time>(config.samples_per_chunk()));
133  panda::Engine& engine = config.engine();
134  ConnectionType connection(engine); // to send packets
135  connection.set_remote_end_point(endpoint);
136 
137  // construct a data set that matches the specified config to use as a reference
138  BeamFormerDataTraits::DataType data_ref(data::DimensionSize<data::Time>(config.samples_per_chunk())
139  , data::DimensionSize<data::Frequency>(config.number_of_channels()));
140  BeamFormerDataTraits traits;
141 
142  // test data throughput
143  unsigned delta = 0U;
144  unsigned received_count=0U;
145  engine.poll(); // ensure the DataManager has completed all init jobs before we start testing throughput
146 
147  // send enough packets to fill a single chunk
148  std::size_t packet_tot = delta;
149  auto chunk_size = traits.chunk_size(data_ref);
150  unsigned packet_count = 0U;
151  // We send more packet than we really need as some OS's will drop the first packet if there is no ARP entry in the
152  // cache (https://stackoverflow.com/questions/11812731/first-udp-message-to-a-specific-remote-ip-gets-lost) and will only
153  // queue one more until resolved
154  while(packet_tot < 2 * chunk_size )
155  {
156  ++packet_count;
157  connection.send(generator.next(), [&](ska::panda::Error e){
158  ASSERT_FALSE(e);
159  ++received_count;
160  });
161  engine.poll_one();
162  packet_tot += traits.packet_size();
163  }
164  delta = packet_tot - chunk_size;
165  PANDA_LOG << "sent " << packet_count << " packets";
166 
167  unsigned max_attempts=1000;
168  while(received_count < packet_count) {
169  engine.poll_one();
170  if(--max_attempts == 0) FAIL() << "time out: expected packets not received";
171  }
172 
173  auto data_out_ptr = std::get<0>(dm.next()); // keep it in context
174  auto const& data_out = *data_out_ptr;
175  ASSERT_NE(nullptr, &*data_out.begin());
176 
177  // find where the stream has started and align
178  auto model_data = model.sent_data();
179  auto sent_it = model_data.cbegin();
180  auto it = data_out.cbegin();
181 
182  // we don't know which packet would of been realigned toaso iterate through until we find one that matches
183  unsigned count=0;
184  unsigned packet_num=0;
185  do {
186  if(*sent_it != *it) {
187  // TODO should make info easier to interpret by writing from the packe size offset in the model data
188  // rather than the whole data chunk
189  PANDA_LOG_ERROR << "did not match packet " << packet_num << "\n"
190  << "got " << (int)*it << " sent:" << (int)*sent_it << " at position " << count
191  << "\n\tpacket samples (" << Packet::number_of_samples() << ")\n"
192  << "\tmodel data size(" << model_data.data_size() << ")\n"
193  << write_data(model_data) << "\n"
194  << "\tactual size=" << data_out.data_size()<< "( " << &data_out << ")\n"
195  << write_data(data_out);
196 
197  // increase by a packets worth of data
198  for(unsigned i=count; i < Packet::number_of_samples(); ++i)
199  {
200  if(++sent_it == model_data.end())
201  {
202  ASSERT_TRUE(model.has_sent_data()) << "unable to match data to any packets sent after checking " << packet_num << " packets." << " i=" << i;
203  model_data=model.sent_data();
204  sent_it=model_data.cbegin();
205  }
206  }
207  ++packet_num;
208  it = data_out.cbegin();
209  count = 0;
210  }
211  else {
212  if(++it == data_out.cend()) break;
213  ++count;
214  if(++sent_it == model_data.end())
215  {
216  ASSERT_TRUE(model.has_sent_data()) << "unable to match data to any packets sent after checking " << packet_num << " packets.";
217  model_data=model.sent_data();
218  sent_it=model_data.cbegin();
219  }
220  }
221  } while(true);
222 }
223 
224 TEST_F(UdpStreamTest, test_udp_packets_stream_data_consistency_channels_size_equal_packet_size)
225 {
226  typedef BeamFormerPacketMid Packet;
227  test_udp_packets_stream_data_consistency<Packet>(Packet::number_of_samples());
228 }
229 
230 TEST_F(UdpStreamTest, test_udp_packets_stream_data_consistency_channels_size_greater_packet_size)
231 {
232  typedef BeamFormerPacketMid Packet;
233  test_udp_packets_stream_data_consistency<Packet>(Packet::number_of_samples() + 1);
234 }
235 
236 TEST_F(UdpStreamTest, test_udp_packets_stream_data_consistency_channels_size_less_packet_size)
237 {
238  typedef BeamFormerPacketMid Packet;
239  test_udp_packets_stream_data_consistency<Packet>(Packet::number_of_samples() - 1);
240 }
241 
242 
243 } // namespace test
244 } // namespace rcpt
245 } // namespace cheetah
246 } // namespace ska
Packs data into a UDP stream Packet Header format of the BeamFormer.
static std::size_t chunk_size(DataType const &data)
return the total number of samples (time_samples * channels) in the data
unsigned samples_per_chunk() const
return the number of time samples in a chunk
Definition: Config.cpp:73
Traits describing the BeamFormer Data Stream to the panda::PacketStream system.
static std::size_t packet_size()
return the number of signal samples in a packet
Some limits and constants for FLDO.
Definition: Brdz.h:35
uint8_t DataType
the underlying data storage type for the amplitude of the signal
Definition: TimeFrequency.h:96
configurable parameters for the rcpt
Definition: Config.h:42