Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
BeamLauncherTest.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/pipeline/test/BeamLauncherTest.h"
25 #include "cheetah/pipeline/BeamLauncher.h"
26 #include "cheetah/data/TimeFrequency.h"
27 #include "panda/test/TestProducer.h"
28 #include "cheetah/pipeline/Empty.h"
29 #include "cheetah/sigproc/Config.h"
30 #include "cheetah/sigproc/SigProcFileStream.h"
31 
32 namespace ska {
33 namespace cheetah {
34 namespace pipeline {
35 namespace test {
36 
37 
38 BeamLauncherTest::BeamLauncherTest()
39  : ::testing::Test()
40 {
41 }
42 
43 BeamLauncherTest::~BeamLauncherTest()
44 {
45 }
46 
47 void BeamLauncherTest::SetUp()
48 {
49 }
50 
51 void BeamLauncherTest::TearDown()
52 {
53 }
54 
55 class TestConfigModule : public panda::ConfigModule
56 {
57  public:
58  TestConfigModule() : panda::ConfigModule("test") {}
59 
60  protected:
61  void add_options(OptionsDescriptionEasyInit&) {};
62 };
63 
64 typedef panda::test::TestProducer<data::TimeFrequency<Cpu, uint8_t>> TestStream;
65 
66 
67 class TestPipeline : public pipeline::PipelineHandler<uint8_t>
68 {
70 
71  protected:
72  volatile unsigned _operator_called;
74 
75  public:
76  TestPipeline(BeamConfig<uint8_t> const& beam_config)
77  : BaseT(_cheetah_config, beam_config)
78  , _operator_called(0) {}
79 
80  void operator()(TimeFrequencyType&) override
81  {
82  ++_operator_called;
83  PANDA_LOG_DEBUG << "Operator called";
84  }
85 
86  volatile unsigned& operator_called() {
87  return _operator_called;
88  }
89 
90  private:
91  CheetahConfig<uint8_t> _cheetah_config;
92 };
93 
95 {
96  private:
97  unsigned _throw;
98 
99  public:
100  ExceptionPipeline(BeamConfig<uint8_t> const& beam_config)
101  : TestPipeline(beam_config)
102  , _throw(0)
103  {}
104 
106  {
107  ++_operator_called;
108  if(_throw==0) { // throw the first time only
109  ++_throw;
110  throw std::runtime_error("oh oh");
111  }
112  }
113 
114  bool has_thrown() const {
115  return _throw > 0;
116  }
117 };
118 
119 TEST_F(BeamLauncherTest, test_no_beam)
120 {
121  MultiBeamConfig<uint8_t> mb_config;
122  TestConfigModule tcm;
123 
124  unsigned exec_factory_called = 0;
125  auto factory = [&](BeamConfig<uint8_t> const& beam_config) { ++exec_factory_called; return new TestPipeline(beam_config); };
126 
127  unsigned config_factory_called = 0;
128  BeamLauncher<TestStream, uint8_t> launcher(mb_config, [&](BeamConfig<uint8_t> const&) -> TestConfigModule const& { ++config_factory_called; return tcm; }, factory );
129  ASSERT_EQ(config_factory_called, 0U);
130 
131  ASSERT_EQ(0, launcher.exec());
132  ASSERT_EQ(exec_factory_called, 0U);
133 }
134 
135 TEST_F(BeamLauncherTest, test_single_beam_inactive)
136 {
137  BeamConfig<uint8_t> beam1;
138  beam1.active(false);
139  MultiBeamConfig<uint8_t> mb_config;
140  mb_config.add(beam1);
141  TestConfigModule tcm;
143 
144  unsigned exec_factory_called = 0;
145  auto factory = [&](BeamConfig<uint8_t> const& beam_config) { ++exec_factory_called; return new TestPipeline(beam_config); };
146 
147  unsigned config_factory_called = 0;
148  BeamLauncher<TestStream, uint8_t> launcher(mb_config, [&](BeamConfig<uint8_t> const&) -> TestConfigModule const& { ++config_factory_called; return tcm; }, factory );
149 
150 // Sending data to the stream
151  for (std::uint32_t ii =0 ; ii < launcher.streams().size(); ++ii)
152  {
153  *(launcher.streams()[ii]) << test_data;
154  }
155 
156  ASSERT_EQ(config_factory_called, 0U);
157 
158  ASSERT_EQ(0, launcher.exec());
159  ASSERT_EQ(exec_factory_called, 0U);
160 }
161 
162 TEST_F(BeamLauncherTest, test_single_beam_active)
163 {
164  BeamConfig<uint8_t> beam1;
165  beam1.active(true);
166  MultiBeamConfig<uint8_t> mb_config;
167  mb_config.add(beam1);
168  TestConfigModule tcm;
169 
170  std::shared_ptr<data::TimeFrequency<Cpu,uint8_t>> test_data = std::make_shared<data::TimeFrequency<Cpu, uint8_t>>();
171 
172  unsigned exec_factory_called = 0;
173  TestPipeline* test_pipeline=nullptr;
174  auto factory = [&](BeamConfig<uint8_t> const& beam_config) { ++exec_factory_called; test_pipeline = new TestPipeline(beam_config); return test_pipeline; };
175 
176  unsigned config_factory_called = 0;
177  BeamLauncher<TestStream, uint8_t> launcher(mb_config, [&](BeamConfig<uint8_t> const&) -> TestConfigModule const& { ++config_factory_called; return tcm; }, factory );
178 
179 
180  ASSERT_EQ(config_factory_called, 1U);
181  ASSERT_EQ(exec_factory_called, 1U);
182 
183 
184  std::thread th1([&]() {
185  launcher.exec();
186  });
187 
188  volatile bool is_running=launcher.is_running();
189  while(!is_running) {
190  is_running=launcher.is_running();
191  }
192 
193  // Sending data to the stream
194  for (std::uint32_t ii = 0 ; ii < launcher.streams().size(); ++ii)
195  {
196  *(launcher.streams()[ii]) << *test_data;
197  }
198 
199  ASSERT_NE(nullptr, test_pipeline);
200  try
201  {
202  volatile unsigned op_called;
203  do {
204  op_called=test_pipeline->operator_called();
205  } while(op_called == 0);
206  EXPECT_EQ(test_pipeline->operator_called(), 1U);
207  launcher.join();
208  }
209  catch(...)
210  {
211  PANDA_LOG_DEBUG <<"exception caught";
212  launcher.join();
213  th1.join();
214  throw;
215  }
216  th1.join();
217 }
218 
219 TEST_F(BeamLauncherTest, test_two_beam_active)
220 {
221  BeamConfig<uint8_t> beam1;
222  beam1.active(true);
223  BeamConfig<uint8_t> beam2;
224  beam2.active(true);
225  MultiBeamConfig<uint8_t> mb_config;
226  mb_config.add(beam1);
227  mb_config.add(beam2);
228  TestConfigModule tcm;
229 
231 
232  unsigned exec_factory_called = 0;
233  TestPipeline* test_pipeline=nullptr;
234  auto factory = [&](BeamConfig<uint8_t> const& beam_config) { ++exec_factory_called; test_pipeline = new TestPipeline(beam_config); return test_pipeline; };
235 
236  unsigned config_factory_called = 0;
237  BeamLauncher<TestStream, uint8_t> launcher(mb_config, [&](BeamConfig<uint8_t> const&) -> TestConfigModule const& { ++config_factory_called; return tcm; }, factory );
238  ASSERT_EQ(config_factory_called, 2U);
239  ASSERT_EQ(exec_factory_called, 2U);
240 
241  // Sending data to the stream
242  for (std::uint32_t ii =0 ; ii < launcher.streams().size(); ++ii)
243  {
244  *(launcher.streams()[ii]) << test_data;
245  }
246 
247  std::thread th1([&]() { launcher.exec(); });
248  volatile bool is_running=launcher.is_running();
249  while(!is_running) {
250  is_running=launcher.is_running();
251  }
252 
253  try
254  {
255  volatile unsigned op_called=test_pipeline->operator_called();
256  while(op_called == 0)
257  {
258  op_called=test_pipeline->operator_called();
259  }
260  ASSERT_EQ(test_pipeline->operator_called(), 1U);
261  //launcher.join();
262  }
263  catch(...)
264  {
265  launcher.join();
266  th1.join();
267  throw;
268  }
269  launcher.join();
270  th1.join();
271 }
272 
273 TEST_F(BeamLauncherTest, test_one_beam_active_one_inactive)
274 {
275  BeamConfig<uint8_t> beam1;
276  beam1.active(true);
277  BeamConfig<uint8_t> beam2;
278  beam2.active(false);
279  MultiBeamConfig<uint8_t> mb_config;
280  mb_config.add(beam1);
281  mb_config.add(beam2);
282  TestConfigModule tcm;
284 
285  unsigned exec_factory_called = 0;
286  TestPipeline* test_pipeline=nullptr;
287  auto factory = [&](BeamConfig<uint8_t> const& beam_config) { ++exec_factory_called; test_pipeline = new TestPipeline(beam_config); return test_pipeline; };
288 
289  unsigned config_factory_called = 0;
290  BeamLauncher<TestStream, uint8_t> launcher(mb_config, [&](BeamConfig<uint8_t> const&) -> TestConfigModule const& { ++config_factory_called; return tcm; }, factory );
291  ASSERT_EQ(config_factory_called, 1U);
292  ASSERT_EQ(exec_factory_called, 1U);
293 
294 // Sending data to the stream
295  for (std::uint32_t ii =0 ; ii < launcher.streams().size(); ++ii)
296  {
297  *(launcher.streams()[ii]) << test_data;
298  }
299 
300  std::thread th1([&]() { launcher.exec(); });
301  volatile bool is_running=launcher.is_running();
302  while(!is_running) {
303  is_running=launcher.is_running();
304  }
305 
306  try
307  {
308  volatile unsigned op_called;
309  do {
310  op_called=test_pipeline->operator_called();
311  } while(op_called == 0);
312  ASSERT_EQ(test_pipeline->operator_called(), 1U);
313  launcher.join();
314  }
315  catch(...)
316  {
317  launcher.join();
318  th1.join();
319  throw;
320  }
321  th1.join();
322 }
323 
324 TEST_F(BeamLauncherTest, test_single_beam_pipeline_exception)
325 {
326  BeamConfig<uint8_t> beam1;
327  beam1.active(true);
328  MultiBeamConfig<uint8_t> mb_config;
329  mb_config.add(beam1);
330  TestConfigModule tcm;
331 
332  std::shared_ptr<data::TimeFrequency<Cpu,uint8_t>> test_data = std::make_shared<data::TimeFrequency<Cpu, uint8_t>>();
333 
334  ExceptionPipeline* exception_pipeline=nullptr; // will throw an exception on first call to operatort()
335  auto factory = [&](BeamConfig<uint8_t> const& beam_config) { exception_pipeline = new ExceptionPipeline(beam_config); return exception_pipeline; };
336 
337  BeamLauncher<TestStream, uint8_t> launcher(mb_config, [&](BeamConfig<uint8_t> const&) -> TestConfigModule const& { return tcm; }, factory );
338 
339  std::thread th1([&]() {
340  launcher.exec();
341  });
342 
343  ASSERT_NE(nullptr, exception_pipeline); // sanity check
344 
345  // Sending data to the stream
346  for (std::uint32_t ii = 0 ; ii < launcher.streams().size(); ++ii)
347  {
348  *(launcher.streams()[ii]) << *test_data;
349  }
350 
351  try
352  {
353  volatile unsigned ex_op;
354  do {
355  ex_op = exception_pipeline->operator_called();
356  } while(ex_op == 0);
357  ASSERT_EQ(exception_pipeline->operator_called(), 1U);
358  ASSERT_TRUE(exception_pipeline->has_thrown());
359  launcher.join();
360  }
361  catch(...)
362  {
363  PANDA_LOG_DEBUG << "unexpected exception thrown - cleaning up test";
364  launcher.join();
365  th1.join();
366  throw;
367  }
368  th1.join();
369 }
370 
371 
372 } // namespace test
373 } // namespace pipeline
374 } // namespace cheetah
375 } // namespace ska
void active(bool status)
set the enabled status of the beam
Definition: BeamConfig.cpp:108
void join()
Wait for threads to finish.
Some limits and constants for FLDO.
Definition: Brdz.h:35
void operator()(TimeFrequencyType &) override
called each time data becomes available
Parse configuration parameters for a single beam in the pipeline instance of cheetah.
Definition: BeamConfig.h:48
Base class for Cheetah Pipeline Handlers.
Definition: Pipeline.h:37
void operator()(TimeFrequencyType &) override
called each time data becomes available