Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SigProcDadaStreamTest.cpp
1 /*
2  ** The MIT License (MIT)
3  **
4  ** Copyright (c) 2016 The SKA organisation
5  **
6  ** Permission is hereby granted, free of charge, to any person obtaining a copy
7  ** of this software and associated documentation files (the "Software"), to deal
8  ** in the Software without restriction, including without limitation the rights
9  ** to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  ** copies of the Software, and to permit persons to whom the Software is
11  ** furnished to do so, subject to the following conditions:
12  **
13  ** The above copyright notice and this permission notice shall be included in all
14  ** copies or substantial portions of the Software.
15  **
16  ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  ** IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  ** FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  ** AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  ** LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  ** OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  ** SOFTWARE.
23  **/
24 
25 #include "cheetah/psrdada/DadaWriteClient.h"
26 #include "cheetah/sigproc/SigProcHeader.h"
27 #include "cheetah/generators/GaussianNoise.h"
28 #include "cheetah/generators/GaussianNoiseConfig.h"
29 #include "cheetah/psrdada/DadaReadClient.h"
30 #include "cheetah/psrdada/SigProcDadaStream.h"
31 #include "cheetah/psrdada/test/SigProcDadaStreamTest.h"
32 #include "cheetah/psrdada/test_utils/TestDadaDB.h"
33 #include "panda/DataManager.h"
34 #include "panda/test/gtest.h"
35 #include <sstream>
36 #include <fstream>
37 #include <condition_variable>
38 
39 namespace ska {
40 namespace cheetah {
41 namespace psrdada {
42 namespace test {
43 
44 SigProcDadaStreamTest::SigProcDadaStreamTest()
45  : ::testing::Test()
46 {
47 }
48 
49 SigProcDadaStreamTest::~SigProcDadaStreamTest()
50 {
51 }
52 
53 void SigProcDadaStreamTest::SetUp()
54 {
55 }
56 
57 void SigProcDadaStreamTest::TearDown()
58 {
59 }
60 
61 namespace detail {
62 
63 
65  public:
66  SigProcDadaWriter(key_t key)
67  : _key(key)
68  {
69  }
70 
71  template<typename Arch, typename NumericalT>
72  SigProcDadaWriter& operator<<(data::TimeFrequency<Arch, NumericalT> const& tf)
73  {
75  h.number_of_channels(tf.number_of_channels());
76  h.number_of_bits(sizeof(typename data::TimeFrequency<Arch, NumericalT>::DataType)*8);
77  h.start_time(tf.start_time());
78  h.sample_interval(tf.sample_interval());
79  if( _header != h )
80  {
81  h.start_time(_header.start_time() + std::chrono::duration<double>(_header.sample_interval().value() * (double)tf.number_of_spectra()));
82  _header = h;
83  if(_writer)
84  {
85  _writer->new_sequence();
86  }
87  else
88  {
89  _writer.reset(new DadaWriteClient(_key, [this](std::ostream& out) {_header.write(out);}));
90  }
91  }
92  auto it = tf.begin();
93  _writer->write(it,tf.end());
94  _header.start_time(_header.start_time() + std::chrono::duration<double>(_header.sample_interval().value() * (double)tf.number_of_spectra()));
95  return *this;
96  }
97 
98  void new_sequence()
99  {
100  _writer->new_sequence();
101  }
102 
103  void release_block()
104  {
105  _writer->write_eod();
106  }
107 
108  private:
109  key_t _key;
110  sigproc::SigProcHeader _header;
111  std::unique_ptr<DadaWriteClient> _writer;
112 };
113 
114 
115 // Generating some test data
116 
117 data::TimeFrequency<Cpu, uint8_t> test_data(std::size_t number_of_samples, std::uint32_t number_of_channels)
118 {
120  static generators::GaussianNoiseConfig config;
121  static generators::GaussianNoise<uint8_t> generator(config);
122  data_chunk.resize(data::DimensionSize<data::Time>(number_of_samples),data::DimensionSize<data::Frequency>(number_of_channels));
123  generator.next(data_chunk);
124  data::TimeFrequency<Cpu, uint8_t>::TimeType interval(1.0 * boost::units::si::milli * boost::units::si::seconds);
125  data_chunk.sample_interval(interval);
126  return data_chunk;
127 }
128 
129 data::TimeFrequency<Cpu, uint8_t> test_data(std::size_t number_of_samples, std::uint32_t number_of_channels, int offset)
130 {
132  static generators::GaussianNoiseConfig config;
133  static generators::GaussianNoise<uint8_t> generator(config);
134  data_chunk.resize(data::DimensionSize<data::Time>(number_of_samples),data::DimensionSize<data::Frequency>(number_of_channels));
135  generator.next(data_chunk);
136  data::TimeFrequency<Cpu, uint8_t>::TimeType interval(1.0 * boost::units::si::milli * boost::units::si::seconds);
137  std::chrono::milliseconds off(offset);
138  data::TimeFrequency<Cpu, uint8_t>::TimePointType offstart(off);
139  data_chunk.start_time(offstart);
140  data_chunk.sample_interval(interval);
141  return data_chunk;
142 }
143 } // namespace detail
144 
145 TEST_F(SigProcDadaStreamTest, test_construction_ordering)
146 { // Construct a reader before a writer to make sure that it does not block the main thread
147 
148  Config config;
149  test_utils::TestDadaDB test_db( 5, 409600, 10, 4096);
150  test_db.create();
151  config.number_of_samples(10239);
152  config.dada_key(test_db.key());
153  SigProcDadaStream stream(config);
154  detail::SigProcDadaWriter writer(test_db.key());
155 
156 }
157 
158 TEST_F(SigProcDadaStreamTest, test_chunk_smaller_than_dada_stream)
159 {
160  // Read in from a stream where the chunk size < the seqeunce size
161  // expect the next call will be filled from where the previous chunk left off
162  // with correct timestamps set etc.
163 
164  Config config;
165  test_utils::TestDadaDB test_db( 5, 409600, 10, 4096);
166  test_db.create();
167 
168 
169  // Write data to the DADA buffer
170  //detail::SigProcDadaWriter writer(test_db.key());
171  auto data = detail::test_data(20480, 2);
172  detail::SigProcDadaWriter writer(test_db.key());
173  writer << data;
174  writer.new_sequence();
175  writer << data;
176  writer.new_sequence();
177 
178  // Read the data
179  config.number_of_samples(10239);
180  config.dada_key(test_db.key());
181  SigProcDadaStream stream(config);
182  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
183  ASSERT_FALSE(stream.process());
184  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
185  data::TimeFrequency<Cpu, uint8_t>& read_data = *(std::get<0>(data_tuple));
186 
187  // first Data verification
188  ASSERT_EQ(10239U, read_data.number_of_spectra());
189  ASSERT_EQ(std::size_t(2), read_data.number_of_channels());
190  auto it = data.begin();
191  auto rit = read_data.begin();
192  std::size_t total_number_of_samples = read_data.number_of_channels() * read_data.number_of_spectra();
193  for (unsigned ii=0; ii<total_number_of_samples; ii++)
194  {
195  ASSERT_EQ(rit[ii], it[ii]);
196  }
197 
198  // Second chunk verification
199  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
200  data::TimeFrequency<Cpu, uint8_t>& read_data1 = *(std::get<0>(data_tuple1));
201  ASSERT_EQ(10239U, read_data1.number_of_spectra());
202  ASSERT_EQ(std::size_t(2),read_data1.number_of_channels());
203  it = data.begin();
204  rit = read_data1.begin();
205  for (unsigned ii=0; ii<total_number_of_samples; ii++)
206  {
207  ASSERT_EQ(rit[ii], it[ii+total_number_of_samples]);
208  }
209 
210 }
211 
212 
213 TEST_F(SigProcDadaStreamTest, test_chunk_with_eod_no_header)
214 {
215  // Read in from a stream where the chunk size < the seqeunce size
216  // expect the next call will be filled from where the previous chunk left off
217  // expect the stream to exit cleanly when no header is written
218  // to the new sequence after the eod
219 
220  Config config;
221  test_utils::TestDadaDB test_db( 5, 409600, 10, 4096);
222  test_db.create();
223 
224  // Write data to the DADA buffer
225  detail::SigProcDadaWriter writer(test_db.key());
226  auto data = detail::test_data(20480,2);
227  writer << data;
228  writer.release_block();
229 
230  // Read the data
231  config.number_of_samples(10239);
232  config.dada_key(test_db.key());
233  SigProcDadaStream stream(config);
234  ASSERT_FALSE(stream.process());
235  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
236  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t >>> data_tuple1 = chunk_manager.next();
237  data::TimeFrequency<Cpu, uint8_t>& read_data1 = *(std::get<0>(data_tuple1));
238  ASSERT_EQ(10239U, read_data1.number_of_spectra());
239  ASSERT_EQ(std::size_t(2),read_data1.number_of_channels());
240  auto it = data.begin();
241  auto rit = read_data1.begin();
242  for (unsigned ii=0; ii<20478; ii++)
243  {
244  ASSERT_EQ(rit[ii], it[ii]);
245  }
246 
247 }
248 
249 TEST_F(SigProcDadaStreamTest, test_multisequence_dada_stream_inconsistent_headers)
250 {
251  // Read in from a stream where the data available is less than
252  // the expected number of samples in the data block to be filled.
253  // Expect:
254  // At the exhaustion of the data stream, to open up a new sequence and continue
255  // reading from there.
256  // The test needs to verify correct behaviour when the new sequence header
257  // data is inconsistent with the current sequence header data
258  // e.g different nbits, number of channels
259 
260  Config config;
261  test_utils::TestDadaDB test_db( 10, 409600, 10, 4096);
262  test_db.create();
263 
264  // Generate test data and write to the dada buffer
265  auto data = detail::test_data(10240, 2);
266  auto data2 = detail::test_data(10000, 4);
267  detail::SigProcDadaWriter writer(test_db.key());
268  writer << data;
269  writer << data2;
270  writer.new_sequence();
271  writer << data2;
272  writer.new_sequence();
273 
274  // Setup the sigprocdada stream
275  config.number_of_samples(10241);
276  config.dada_key(test_db.key());
277  SigProcDadaStream stream(config);
278  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
279  ASSERT_FALSE(stream.process());
280  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
281  data::TimeFrequency<Cpu, uint8_t>& read_data1 = *(std::get<0>(data_tuple));
282  ASSERT_EQ(10240U, read_data1.number_of_spectra());
283  ASSERT_EQ(std::size_t(2), read_data1.number_of_channels());
284 
285  auto it = data.begin();
286  auto rit = read_data1.begin();
287  // check first data chunk received is as sent
288  for (unsigned ii=0; ii<20480; ++ii)
289  {
290  ASSERT_EQ((rit[ii]), (it[ii]));
291  }
292  // Parsing data with inconsistent header
293  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple2 = chunk_manager.next();
294  data::TimeFrequency<Cpu, uint8_t>& read_data2 = *(std::get<0>(data_tuple2));
295  ASSERT_EQ(10241U, read_data2.number_of_spectra());
296  ASSERT_EQ(std::size_t(4), read_data2.number_of_channels());
297  rit = read_data2.begin();
298  it = data2.begin();
299  for (unsigned ii=0; ii<20482; ++ii)
300  {
301  ASSERT_EQ(rit[ii], it[ii]);
302  }
303 }
304 
305 TEST_F(SigProcDadaStreamTest, test_multisequence_dada_stream_consistent_headers)
306 {
307  // Read in from a stream where the data available is less than
308  // the expected number of samples in in the data block to be filled.
309  // Expect:
310  // At the exhaustion of the data stream, to open up a new sequence and continue
311  // reading from there.
312  // The test needs to verify correct behaviour when
313  // the new sequence header data is consistent with the current data
314 
315  test_utils::TestDadaDB test_db( 10, 409600, 10, 4096);
316  test_db.create();
317 
318  Config config;
319 
320  std::size_t chunk_time_samples=10241;
321  auto test_data = detail::test_data(10240,2);
322  detail::SigProcDadaWriter writer(test_db.key());
323  writer << test_data;
324  writer.new_sequence();
325  writer << test_data;
326  writer.new_sequence();
327  writer << test_data;
328  writer.new_sequence();
329  writer << test_data;
330  writer.new_sequence();
331  writer << test_data;
332  writer.new_sequence();
333 
334  // read the dada buffer data as a stream
335  config.number_of_samples(chunk_time_samples);
336  config.dada_key(test_db.key());
337  SigProcDadaStream stream(config);
338  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
339  ASSERT_FALSE(stream.process());
340  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
341  data::TimeFrequency<Cpu, uint8_t>& read_data = *(std::get<0>(data_tuple));
342  ASSERT_EQ(10241U, read_data.number_of_spectra());
343  auto it = test_data.begin();
344  auto rit = read_data.begin();
345  for(std::size_t ii=0; ii<20480; ++ii)
346  {
347  ASSERT_EQ(rit[ii], it[ii]);
348  }
349  ASSERT_EQ(std::size_t(2), read_data.number_of_channels());
350 
351  // Next chunk
352  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
353  data::TimeFrequency<Cpu, uint8_t>& read_data1 = *(std::get<0>(data_tuple1));
354  ASSERT_EQ(10241U, read_data1.number_of_spectra());
355  rit = read_data1.begin();
356  it = test_data.begin();
357  for(std::size_t ii=0; ii<20478; ++ii)
358  {
359  // [2+ii] because we are offset by 2 samples
360  ASSERT_EQ(rit[ii], it[2+ii]);
361  }
362  ASSERT_EQ(std::size_t(2), read_data1.number_of_channels());
363 
364  // Next chunk
365  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple2 = chunk_manager.next();
366  data::TimeFrequency<Cpu, uint8_t>& read_data2 = *(std::get<0>(data_tuple2));
367  ASSERT_EQ(10241U, read_data2.number_of_spectra());
368  rit = read_data2.begin();
369  it = test_data.begin();
370  for(std::size_t ii=0; ii<20476; ++ii)
371  {
372  // [4+ii] because we are offset by 4 samples
373  ASSERT_EQ(rit[ii], it[4+ii]);
374  }
375  ASSERT_EQ(std::size_t(2), read_data2.number_of_channels());
376 
377 
378 }
379 
380 TEST_F(SigProcDadaStreamTest, test_dada_stream_corrupted_header)
381 {
382  // This test verifies correct behaviour occurs when the
383  // header information is not in the expected sigproc format
384  test_utils::TestDadaDB test_db( 10, 409600, 10, 4096);
385  test_db.create();
386 
387  //Write Data
388  Config config;
389  std::string write_header("This is not SIGPROC format header!!!");
390  DadaWriteClient writer(test_db.key(),[&](std::ostream& out){out << write_header;});
391  std::vector<unsigned char> input_data(1024);
392  auto it = input_data.begin();
393  writer.write(it,input_data.end());
394  writer.new_sequence();
395  config.number_of_samples(512);
396  config.dada_key(test_db.key());
397  SigProcDadaStream stream(config);
398  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
399  ASSERT_ANY_THROW(chunk_manager.next());
400 }
401 
402 TEST_F(SigProcDadaStreamTest, test_matched_size_dada_stream)
403 {
404  // The chunk to fill and the dada stream sequence are of the same size
405  //
406  test_utils::TestDadaDB test_db( 10, 409600, 10, 4096);
407  test_db.create();
408  Config config;
409 
410  std::size_t chunk_time_samples=10240;
411  auto test_data = detail::test_data(chunk_time_samples,2);
412  detail::SigProcDadaWriter writer(test_db.key());
413  writer << test_data;
414  writer.new_sequence();
415  auto test_data2 = detail::test_data(chunk_time_samples,2);
416  writer << test_data2;
417  writer.new_sequence();
418  writer << test_data2;
419  writer.new_sequence();
420  //writer << test_data2;
421  //writer.new_sequence();
422 
423  // read the dada buffer data as a stream
424  config.number_of_samples(10240);
425  config.dada_key(test_db.key());
426  SigProcDadaStream stream(config);
427  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
428  ASSERT_FALSE(stream.process());
429  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
430  data::TimeFrequency<Cpu, uint8_t>& data = *(std::get<0>(data_tuple));
431  auto it = test_data.begin();
432  auto rit = data.begin();
433  for(std::size_t ii=0; ii<10240U; ++ii)
434  {
435  ASSERT_EQ(rit[ii],it[ii]);
436  }
437  ASSERT_EQ(chunk_time_samples, data.number_of_spectra());
438  ASSERT_EQ(test_data.number_of_channels(), data.number_of_channels());
439 
440  // ensure we can still get a new tuple (i.e no lock up)
441  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
442  data::TimeFrequency<Cpu, uint8_t>& data1 = *(std::get<0>(data_tuple1));
443  ASSERT_EQ(chunk_time_samples, data1.number_of_spectra());
444  it = test_data2.begin();
445  rit = data1.begin();
446  for(std::size_t ii=0; ii<10240U; ++ii)
447  {
448  ASSERT_EQ(rit[ii],it[ii]);
449  }
450 
451  // ensure we can still get a new tuple (i.e no lock up)
452  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple2 = chunk_manager.next();
453  data::TimeFrequency<Cpu, uint8_t>& data2 = *(std::get<0>(data_tuple2));
454  ASSERT_EQ(chunk_time_samples, data2.number_of_spectra());
455  it = test_data2.begin();
456  rit = data2.begin();
457  for(std::size_t ii=0; ii<10240U; ++ii)
458  {
459  ASSERT_EQ(rit[ii],it[ii]);
460  }
461 
462 }
463 
464 
465 TEST_F(SigProcDadaStreamTest, test_multithreaded_dada_stream)
466 {
467  // Test to make sure that the stream keeps runnning even for smaller dada buffer.
468  // The test verifies that as long as there are two threads that are reading and
469  // writing data blocks, the stream continues to run until the end of data stream.
470  //
471 
472  test_utils::TestDadaDB test_db( 3, 20480, 10, 4096);
473  test_db.create();
474 
475  Config config;
476 
477  //Generating data
478  std::size_t chunk_time_samples=40960;
479  auto test_data = detail::test_data(chunk_time_samples,2);
480  detail::SigProcDadaWriter writer(test_db.key());
481 
482  // Reading DADA buffer as a stream
483 
484  config.number_of_samples(40000);
485  config.dada_key(test_db.key());
486  std::thread t1([&]()
487  {
488  writer << test_data;
489  writer.new_sequence();
490  writer << test_data;
491  writer.new_sequence();
492 
493  });
494 
495  std::thread t2([&]()
496  {
497  SigProcDadaStream stream(config);
498  panda::DataManager<SigProcDadaStream> chunk_manager(stream);
499  ASSERT_FALSE(stream.process());
500  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
501  data::TimeFrequency<Cpu, uint8_t>& data = *(std::get<0>(data_tuple));
502  ASSERT_EQ(40000U, data.number_of_spectra());
503  ASSERT_EQ(std::size_t(2), data.number_of_channels());
504  std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
505  data::TimeFrequency<Cpu, uint8_t>& data1 = *(std::get<0>(data_tuple1));
506  ASSERT_EQ(std::size_t(2), data1.number_of_channels());
507  });
508 
509  // Running thread
510  t1.join();
511  t2.join();
512 }
513 
514 
515 } // namespace test
516 } // namespace psrdada
517 } // namespace cheetah
518 } // namespace ska
Class that provides means for writing to a DADA ring buffer.
SigProcDadaStream configuration parameters.
Definition: Config.h:90
A mock class for implementing the interface of a DADA DB (Header/Data Unit)
Definition: TestDadaDB.h:46
TimePointType const & start_time() const
Some limits and constants for FLDO.
Definition: Brdz.h:35
NumericalT DataType
the underlying data storage type for the amplitude of the signal
Definition: TimeFrequency.h:96
std::size_t number_of_channels() const
void create()
Create the data and header blocks in shared memory.
Definition: TestDadaDB.cpp:88
void start_time(typename utils::ModifiedJulianClock::time_point const &)
set the start_time (corresponding to the time of the first sample)
key_t key() const
Return the hexadecimal shared memory key.
Definition: TestDadaDB.cpp:158
std::size_t number_of_spectra() const
struct for the header information of a SigProc file
Definition: SigProcHeader.h:43