24 #include "cheetah/pipeline/BeamLauncher.h" 25 #include "cheetah/pipeline/Pipeline.h" 26 #include "cheetah/pipeline/PipelineHandlerFactory.h" 27 #include "panda/Thread.h" 29 #include <type_traits> 39 virtual int operator()() = 0;
40 virtual void stop() = 0;
42 std::string
const& id()
const;
48 PipelineWrapperBase::PipelineWrapperBase(std::string
const&
id)
53 std::string
const& PipelineWrapperBase::id()
const {
63 , _pipeline(std::move(p))
67 int operator()()
override {
70 std::lock_guard<std::mutex> lock(_mutex);
73 return _pipeline->exec();
76 catch(std::exception
const& e)
78 PANDA_LOG_ERROR <<
"exception caught:" << e.what();
83 PANDA_LOG_DEBUG <<
"unknown exception caught!";
89 void stop()
override {
90 PANDA_LOG <<
"Stopping Beam: " << id();
91 std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
99 while(!_pipeline->is_running()) {
101 if(lock.try_lock()) {
128 : _pipeline(pipeline) {}
130 PipelineHandlerWrapper(PipelineHandlerWrapper&& p)
131 : _pipeline(std::move(p._pipeline)) {}
133 P& operator*() {
return *_pipeline; }
136 std::unique_ptr<P> _pipeline;
140 template<
typename InputDataStream,
typename NumericalT>
141 template<
typename ConfigFactory,
typename PipelineFactory>
143 : _multi_beam_config(multi_beams_config)
145 , _execution_count(0)
147 auto it=multi_beams_config.
beams();
148 PANDA_LOG <<
"Creating Beams....";
149 while(it != multi_beams_config.
beams_end())
151 auto const& beam_config = *it;
152 if(beam_config.active())
155 _streams.emplace_back(std::move(std::unique_ptr<InputDataStream>(
new InputDataStream(config_factory(beam_config)))));
158 typedef decltype(runtime_handler_factory(beam_config)) RuntimeHandlerPtrType;
159 typedef typename std::remove_pointer<RuntimeHandlerPtrType>::type RuntimeHandlerType;
161 _runtime_handlers.emplace_back(
new WrapperHandlerType(runtime_handler_factory(beam_config)));
164 auto pipeline = create_pipeline<InputDataStream>(*_streams.back(), *
static_cast<WrapperHandlerType&
>(*_runtime_handlers.back()));
165 _pipelines.emplace_back(
new detail::PipelineWrapper<decltype(pipeline)>(std::move(pipeline), beam_config.id()));
167 _thread_config.push_back(&beam_config.thread_config());
171 PANDA_LOG <<
"Finished creating pipelines";
172 if(_streams.size() == 0 ) {
173 PANDA_LOG_WARN <<
"No beams have been defined";
177 template<
typename InputDataStream,
typename NumericalT>
183 template<
typename InputDataStream,
typename NumericalT>
186 if(_execution_count != 0)
return 0;
187 std::atomic<std::size_t> execution_start;
203 catch(std::exception
const& e) {
204 PANDA_LOG_ERROR <<
"exception thrown:" << e.what();
207 PANDA_LOG_ERROR <<
"unknown exception thrown";
211 _wait_cv.notify_one();
214 if(_streams.size() == 0 )
return 0;
216 std::unique_lock<std::mutex> lock(_mutex);
217 std::vector<int> rv(_streams.size());
221 for(std::size_t ii = 0; ii < _pipelines.size(); ++ii)
223 PANDA_LOG <<
"Starting Beam: " << _pipelines[ii]->id();
224 _threads.emplace_back(
new panda::Thread(_thread_config[ii]->affinities(), [&, ii,
this]() { beam_thread(*_pipelines[ii], rv[ii]); } ));
227 _wait_cv.wait(lock, [&,
this]{
228 return (!is_running()) && (execution_start==_pipelines.size());
231 for(
auto & thread : _threads ) {
236 return std::accumulate(rv.begin(), rv.end(), 0);
239 template<
typename InputDataStream,
typename NumericalT>
242 return _pipelines.size() == _execution_count;
245 template<
typename InputDataStream,
typename NumericalT>
249 std::lock_guard<std::mutex> lock(_mutex);
253 for(
auto const& pipeline : _pipelines )
259 for(
auto & thread : _threads )
264 PANDA_LOG <<
"threads joined";
267 _wait_cv.notify_one();
270 template<
typename InputDataStream,
typename NumericalT>
int exec()
launch the beam pipelines
Node to access multiple beam configurations.
void join()
Wait for threads to finish.
std::vector< std::unique_ptr< StreamType > > & streams()
return the vector of unique pointers to all the streams
Some limits and constants for FLDO.
ConstIterator beams() const
return the iterator over multiple beam subsections
bool is_running() const
return true if all activated beams are running
ConstIterator beams_end() const
return the end iterator