25 #include "cheetah/psrdada/DadaWriteClient.h" 26 #include "cheetah/sigproc/SigProcHeader.h" 27 #include "cheetah/generators/GaussianNoise.h" 28 #include "cheetah/generators/GaussianNoiseConfig.h" 29 #include "cheetah/psrdada/DadaReadClient.h" 30 #include "cheetah/psrdada/SigProcDadaStream.h" 31 #include "cheetah/psrdada/test/SigProcDadaStreamTest.h" 32 #include "cheetah/psrdada/test_utils/TestDadaDB.h" 33 #include "panda/DataManager.h" 34 #include "panda/test/gtest.h" 37 #include <condition_variable> 44 SigProcDadaStreamTest::SigProcDadaStreamTest()
49 SigProcDadaStreamTest::~SigProcDadaStreamTest()
53 void SigProcDadaStreamTest::SetUp()
57 void SigProcDadaStreamTest::TearDown()
71 template<
typename Arch,
typename NumericalT>
75 h.number_of_channels(tf.number_of_channels());
78 h.sample_interval(tf.sample_interval());
81 h.
start_time(_header.start_time() + std::chrono::duration<double>(_header.sample_interval().value() * (double)tf.number_of_spectra()));
85 _writer->new_sequence();
89 _writer.reset(
new DadaWriteClient(_key, [
this](std::ostream& out) {_header.write(out);}));
93 _writer->write(it,tf.end());
94 _header.
start_time(_header.start_time() + std::chrono::duration<double>(_header.sample_interval().value() * (double)tf.number_of_spectra()));
100 _writer->new_sequence();
105 _writer->write_eod();
111 std::unique_ptr<DadaWriteClient> _writer;
122 data_chunk.resize(data::DimensionSize<data::Time>(number_of_samples),data::DimensionSize<data::Frequency>(number_of_channels));
123 generator.next(data_chunk);
124 data::TimeFrequency<Cpu, uint8_t>::TimeType interval(1.0 * boost::units::si::milli * boost::units::si::seconds);
134 data_chunk.resize(data::DimensionSize<data::Time>(number_of_samples),data::DimensionSize<data::Frequency>(number_of_channels));
135 generator.next(data_chunk);
136 data::TimeFrequency<Cpu, uint8_t>::TimeType interval(1.0 * boost::units::si::milli * boost::units::si::seconds);
137 std::chrono::milliseconds off(offset);
138 data::TimeFrequency<Cpu, uint8_t>::TimePointType offstart(off);
151 config.number_of_samples(10239);
152 config.dada_key(test_db.
key());
153 SigProcDadaStream stream(config);
171 auto data = detail::test_data(20480, 2);
174 writer.new_sequence();
176 writer.new_sequence();
179 config.number_of_samples(10239);
180 config.dada_key(test_db.
key());
181 SigProcDadaStream stream(config);
182 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
183 ASSERT_FALSE(stream.process());
184 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
190 auto it = data.begin();
191 auto rit = read_data.begin();
193 for (
unsigned ii=0; ii<total_number_of_samples; ii++)
195 ASSERT_EQ(rit[ii], it[ii]);
199 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
204 rit = read_data1.begin();
205 for (
unsigned ii=0; ii<total_number_of_samples; ii++)
207 ASSERT_EQ(rit[ii], it[ii+total_number_of_samples]);
226 auto data = detail::test_data(20480,2);
228 writer.release_block();
231 config.number_of_samples(10239);
232 config.dada_key(test_db.
key());
233 SigProcDadaStream stream(config);
234 ASSERT_FALSE(stream.process());
235 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
236 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t >>> data_tuple1 = chunk_manager.next();
240 auto it = data.begin();
241 auto rit = read_data1.begin();
242 for (
unsigned ii=0; ii<20478; ii++)
244 ASSERT_EQ(rit[ii], it[ii]);
265 auto data = detail::test_data(10240, 2);
266 auto data2 = detail::test_data(10000, 4);
270 writer.new_sequence();
272 writer.new_sequence();
275 config.number_of_samples(10241);
276 config.dada_key(test_db.
key());
277 SigProcDadaStream stream(config);
278 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
279 ASSERT_FALSE(stream.process());
280 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
285 auto it = data.begin();
286 auto rit = read_data1.begin();
288 for (
unsigned ii=0; ii<20480; ++ii)
290 ASSERT_EQ((rit[ii]), (it[ii]));
293 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple2 = chunk_manager.next();
297 rit = read_data2.begin();
299 for (
unsigned ii=0; ii<20482; ++ii)
301 ASSERT_EQ(rit[ii], it[ii]);
320 std::size_t chunk_time_samples=10241;
321 auto test_data = detail::test_data(10240,2);
324 writer.new_sequence();
326 writer.new_sequence();
328 writer.new_sequence();
330 writer.new_sequence();
332 writer.new_sequence();
335 config.number_of_samples(chunk_time_samples);
336 config.dada_key(test_db.
key());
337 SigProcDadaStream stream(config);
338 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
339 ASSERT_FALSE(stream.process());
340 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
343 auto it = test_data.begin();
344 auto rit = read_data.begin();
345 for(std::size_t ii=0; ii<20480; ++ii)
347 ASSERT_EQ(rit[ii], it[ii]);
352 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
355 rit = read_data1.begin();
356 it = test_data.begin();
357 for(std::size_t ii=0; ii<20478; ++ii)
360 ASSERT_EQ(rit[ii], it[2+ii]);
365 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple2 = chunk_manager.next();
368 rit = read_data2.begin();
369 it = test_data.begin();
370 for(std::size_t ii=0; ii<20476; ++ii)
373 ASSERT_EQ(rit[ii], it[4+ii]);
389 std::string write_header(
"This is not SIGPROC format header!!!");
391 std::vector<unsigned char> input_data(1024);
392 auto it = input_data.begin();
393 writer.write(it,input_data.end());
394 writer.new_sequence();
395 config.number_of_samples(512);
396 config.dada_key(test_db.
key());
397 SigProcDadaStream stream(config);
398 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
399 ASSERT_ANY_THROW(chunk_manager.next());
410 std::size_t chunk_time_samples=10240;
411 auto test_data = detail::test_data(chunk_time_samples,2);
414 writer.new_sequence();
415 auto test_data2 = detail::test_data(chunk_time_samples,2);
416 writer << test_data2;
417 writer.new_sequence();
418 writer << test_data2;
419 writer.new_sequence();
424 config.number_of_samples(10240);
425 config.dada_key(test_db.
key());
426 SigProcDadaStream stream(config);
427 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
428 ASSERT_FALSE(stream.process());
429 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
431 auto it = test_data.begin();
432 auto rit = data.begin();
433 for(std::size_t ii=0; ii<10240U; ++ii)
435 ASSERT_EQ(rit[ii],it[ii]);
441 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
444 it = test_data2.begin();
446 for(std::size_t ii=0; ii<10240U; ++ii)
448 ASSERT_EQ(rit[ii],it[ii]);
452 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple2 = chunk_manager.next();
455 it = test_data2.begin();
457 for(std::size_t ii=0; ii<10240U; ++ii)
459 ASSERT_EQ(rit[ii],it[ii]);
478 std::size_t chunk_time_samples=40960;
479 auto test_data = detail::test_data(chunk_time_samples,2);
484 config.number_of_samples(40000);
485 config.dada_key(test_db.
key());
489 writer.new_sequence();
491 writer.new_sequence();
497 SigProcDadaStream stream(config);
498 panda::DataManager<SigProcDadaStream> chunk_manager(stream);
499 ASSERT_FALSE(stream.process());
500 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple = chunk_manager.next();
504 std::tuple<std::shared_ptr<data::TimeFrequency<Cpu, uint8_t>>> data_tuple1 = chunk_manager.next();
Class that provides means for writing to a DADA ring buffer.
SigProcDadaStream configuration parameters.
A mock class for implementing the interface of a DADA DB (Header/Data Unit)
TimePointType const & start_time() const
Some limits and constants for FLDO.
NumericalT DataType
the underlying data storage type for the amplitude of the signal
std::size_t number_of_channels() const
TimeType sample_interval() const
void create()
Create the data and header blocks in shared memory.
key_t key() const
Return the hexadecimal shared memory key.
std::size_t number_of_spectra() const