Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
BeamLauncher.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/pipeline/BeamLauncher.h"
25 #include "cheetah/pipeline/Pipeline.h"
26 #include "cheetah/pipeline/PipelineHandlerFactory.h"
27 #include "panda/Thread.h"
28 #include <algorithm>
29 #include <type_traits>
30 
31 namespace ska {
32 namespace cheetah {
33 namespace pipeline {
34 
35 namespace detail {
37  PipelineWrapperBase(std::string const& id);
38  virtual ~PipelineWrapperBase() {}
39  virtual int operator()() = 0;
40  virtual void stop() = 0;
41 
42  std::string const& id() const;
43 
44  private:
45  std::string _id;
46  };
47 
48  PipelineWrapperBase::PipelineWrapperBase(std::string const& id)
49  : _id(id)
50  {
51  }
52 
53  std::string const& PipelineWrapperBase::id() const {
54  return _id;
55  }
56 
57 
58  template<typename P>
59  struct PipelineWrapper final : public PipelineWrapperBase
60  {
61  PipelineWrapper(P&& p, std::string const& id)
63  , _pipeline(std::move(p))
64  , _stop(false)
65  {}
66 
67  int operator()() override {
68  try
69  {
70  std::lock_guard<std::mutex> lock(_mutex);
71  if(! _stop)
72  {
73  return _pipeline->exec();
74  }
75  }
76  catch(std::exception const& e)
77  {
78  PANDA_LOG_ERROR << "exception caught:" << e.what();
79  stop();
80  }
81  catch(...)
82  {
83  PANDA_LOG_DEBUG << "unknown exception caught!";
84  stop();
85  }
86  return 0;
87  }
88 
89  void stop() override {
90  PANDA_LOG << "Stopping Beam: " << id();
91  std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
92  if(lock.try_lock())
93  {
94  // indicate we don't want to start up after all
95  _stop = true;
96  }
97  else {
98  // we will only fail to get the lock if the pipeline is running
99  while(!_pipeline->is_running()) {
100  // we may have to wait for a pipeline to fully start up before we can stop it
101  if(lock.try_lock()) {
102  // the pipeline has been stopped by some other process
103  _stop=false;
104  return;
105  }
106  }
107  _pipeline->stop();
108  lock.lock();
109  _stop=false;
110  }
111  }
112 
113  private:
114  P _pipeline;
115  std::mutex _mutex;
116  bool _stop; // workaround for out of order start/stop calls
117  };
118 
121  virtual ~PipelineHandlerWrapperBase() {}
122  };
123 
124  template<typename P>
126  {
127  PipelineHandlerWrapper(P* pipeline)
128  : _pipeline(pipeline) {}
129 
130  PipelineHandlerWrapper(PipelineHandlerWrapper&& p)
131  : _pipeline(std::move(p._pipeline)) {}
132 
133  P& operator*() { return *_pipeline; }
134 
135  private:
136  std::unique_ptr<P> _pipeline;
137  };
138 } // namespace detail
139 
140 template<typename InputDataStream, typename NumericalT>
141 template<typename ConfigFactory, typename PipelineFactory>
142 BeamLauncher<InputDataStream, NumericalT>::BeamLauncher(MultiBeamConfig<NumericalT> const& multi_beams_config, ConfigFactory const& config_factory, PipelineFactory const& runtime_handler_factory)
143  : _multi_beam_config(multi_beams_config)
144  , _exit(false)
145  , _execution_count(0)
146 {
147  auto it=multi_beams_config.beams();
148  PANDA_LOG << "Creating Beams....";
149  while(it != multi_beams_config.beams_end())
150  {
151  auto const& beam_config = *it;
152  if(beam_config.active())
153  {
154  // create the stream
155  _streams.emplace_back(std::move(std::unique_ptr<InputDataStream>(new InputDataStream(config_factory(beam_config)))));
156 
157  // create the compute section of the pipeline
158  typedef decltype(runtime_handler_factory(beam_config)) RuntimeHandlerPtrType;
159  typedef typename std::remove_pointer<RuntimeHandlerPtrType>::type RuntimeHandlerType;
160  typedef detail::PipelineHandlerWrapper<RuntimeHandlerType> WrapperHandlerType;;
161  _runtime_handlers.emplace_back(new WrapperHandlerType(runtime_handler_factory(beam_config)));
162 
163  // now init a suitable pipeline
164  auto pipeline = create_pipeline<InputDataStream>(*_streams.back(), *static_cast<WrapperHandlerType&>(*_runtime_handlers.back()));
165  _pipelines.emplace_back(new detail::PipelineWrapper<decltype(pipeline)>(std::move(pipeline), beam_config.id()));
166 
167  _thread_config.push_back(&beam_config.thread_config());
168  }
169  ++it;
170  }
171  PANDA_LOG << "Finished creating pipelines";
172  if(_streams.size() == 0 ) {
173  PANDA_LOG_WARN << "No beams have been defined";
174  }
175 }
176 
177 template<typename InputDataStream, typename NumericalT>
179 {
180  join();
181 }
182 
183 template<typename InputDataStream, typename NumericalT>
185 {
186  if(_execution_count != 0) return 0; // don't try to start if its already running
187  std::atomic<std::size_t> execution_start;
188  execution_start = 0;
189  auto beam_thread = [&](detail::PipelineWrapperBase& pipeline, int& rv)
190  {
191  rv = 0;
192 
193  // run the thread
194  ++_execution_count;
195  ++execution_start;
196  if(!_exit)
197  {
198  try
199  {
200  // Increment in execution_count and execution_start has to be in this order to avoid race condition in wait
201  rv = pipeline();
202  }
203  catch(std::exception const& e) {
204  PANDA_LOG_ERROR << "exception thrown:" << e.what();
205  }
206  catch(...) {
207  PANDA_LOG_ERROR << "unknown exception thrown";
208  }
209  }
210  --_execution_count;
211  _wait_cv.notify_one();
212  };
213 
214  if(_streams.size() == 0 ) return 0;
215 
216  std::unique_lock<std::mutex> lock(_mutex);
217  std::vector<int> rv(_streams.size());
218  { // lock context
219  _exit = false;
220  // launch all except one beam in threads
221  for(std::size_t ii = 0; ii < _pipelines.size(); ++ii)
222  {
223  PANDA_LOG << "Starting Beam: " << _pipelines[ii]->id();
224  _threads.emplace_back(new panda::Thread(_thread_config[ii]->affinities(), [&, ii, this]() { beam_thread(*_pipelines[ii], rv[ii]); } ));
225  }
226  }
227  _wait_cv.wait(lock, [&, this]{
228  return (!is_running()) && (execution_start==_pipelines.size());
229  });
230 
231  for( auto & thread : _threads ) {
232  thread->join();
233  }
234  _threads.clear();
235 
236  return std::accumulate(rv.begin(), rv.end(), 0);
237 }
238 
239 template<typename InputDataStream, typename NumericalT>
241 {
242  return _pipelines.size() == _execution_count;
243 }
244 
245 template<typename InputDataStream, typename NumericalT>
247 {
248  { // lock context
249  std::lock_guard<std::mutex> lock(_mutex);
250  if(!_exit) {
251  // stop each pipeline
252  _exit = true;
253  for( auto const& pipeline : _pipelines )
254  {
255  pipeline->stop();
256  }
257 
258  // wait for the threads to finish
259  for( auto & thread : _threads )
260  {
261  thread->join();
262  }
263  _threads.clear();
264  PANDA_LOG << "threads joined";
265  }
266  } // end lock context
267  _wait_cv.notify_one();
268 }
269 
270 template<typename InputDataStream, typename NumericalT>
271 std::vector<std::unique_ptr<InputDataStream>>& BeamLauncher<InputDataStream, NumericalT>::streams()
272 {
273  return _streams;
274 }
275 
276 } // namespace pipeline
277 } // namespace cheetah
278 } // namespace ska
int exec()
launch the beam pipelines
Node to access multiple beam configurations.
void join()
Wait for threads to finish.
std::vector< std::unique_ptr< StreamType > > & streams()
return the vector of unique pointers to all the streams
Some limits and constants for FLDO.
Definition: Brdz.h:35
ConstIterator beams() const
return the iterator over multiple beam subsections
bool is_running() const
return true if all activated beams are running
ConstIterator beams_end() const
return the end iterator