Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
Public Member Functions | List of all members
ska::cheetah::pipeline::BeamLauncher< StreamType, NumericalT > Class Template Reference
Collaboration diagram for ska::cheetah::pipeline::BeamLauncher< StreamType, NumericalT >:
Collaboration graph

Public Member Functions

template<typename StreamConfigFactory , typename PipelineFactory >
 BeamLauncher (MultiBeamConfig< NumericalT > const &mb_config, StreamConfigFactory const &config_factory, PipelineFactory const &pipeline_factory)
 
int exec ()
 launch the beam pipelines More...
 
void join ()
 Wait for threads to finish.
 
bool is_running () const
 return true if all activated beams are running
 
std::vector< std::unique_ptr< StreamType > > & streams ()
 return the vector of unique pointers to all the streams
 
template<typename ConfigFactory , typename PipelineFactory >
 BeamLauncher (MultiBeamConfig< NumericalT > const &multi_beams_config, ConfigFactory const &config_factory, PipelineFactory const &runtime_handler_factory)
 

Detailed Description

template<typename StreamType, typename NumericalT>
class ska::cheetah::pipeline::BeamLauncher< StreamType, NumericalT >

Definition at line 51 of file BeamLauncher.h.

Member Function Documentation

◆ exec()

template<typename InputDataStream , typename NumericalT >
int ska::cheetah::pipeline::BeamLauncher< InputDataStream, NumericalT >::exec ( )

launch the beam pipelines

Parameters
PipelineFactorya functor that will return a suitable runtime handler of the incoming data this function will block until join() is called, or the pipelines terminate in some other way

Definition at line 184 of file BeamLauncher.cpp.

185 {
186  if(_execution_count != 0) return 0; // don't try to start if its already running
187  std::atomic<std::size_t> execution_start;
188  execution_start = 0;
189  auto beam_thread = [&](detail::PipelineWrapperBase& pipeline, int& rv)
190  {
191  rv = 0;
192 
193  // run the thread
194  ++_execution_count;
195  ++execution_start;
196  if(!_exit)
197  {
198  try
199  {
200  // Increment in execution_count and execution_start has to be in this order to avoid race condition in wait
201  rv = pipeline();
202  }
203  catch(std::exception const& e) {
204  PANDA_LOG_ERROR << "exception thrown:" << e.what();
205  }
206  catch(...) {
207  PANDA_LOG_ERROR << "unknown exception thrown";
208  }
209  }
210  --_execution_count;
211  _wait_cv.notify_one();
212  };
213 
214  if(_streams.size() == 0 ) return 0;
215 
216  std::unique_lock<std::mutex> lock(_mutex);
217  std::vector<int> rv(_streams.size());
218  { // lock context
219  _exit = false;
220  // launch all except one beam in threads
221  for(std::size_t ii = 0; ii < _pipelines.size(); ++ii)
222  {
223  PANDA_LOG << "Starting Beam: " << _pipelines[ii]->id();
224  _threads.emplace_back(new panda::Thread(_thread_config[ii]->affinities(), [&, ii, this]() { beam_thread(*_pipelines[ii], rv[ii]); } ));
225  }
226  }
227  _wait_cv.wait(lock, [&, this]{
228  return (!is_running()) && (execution_start==_pipelines.size());
229  });
230 
231  for( auto & thread : _threads ) {
232  thread->join();
233  }
234  _threads.clear();
235 
236  return std::accumulate(rv.begin(), rv.end(), 0);
237 }
bool is_running() const
return true if all activated beams are running

The documentation for this class was generated from the following files: