Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SpCclSpeadReader.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 
#include "SpeadLoggingAdapter.h"
#include <panda/Log.h>

#include <atomic>
27 
28 namespace ska {
29 namespace cheetah {
30 namespace exporters {
31 
32 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
33 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::Stream(SpCclSpeadReader& reader, boost::asio::io_service& engine
34  , DataFactory const& factory
35  , Callback callback)
36 
37  : BaseT(engine, spead2::recv::stream_config().set_max_heaps(reader._max_heaps).set_allow_out_of_order(true))
38  , _engine(engine)
39  , _reader(reader)
40  , _data_factory(factory)
41  , _callback(callback)
42 {
43 }
44 
45 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
46 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::Stream(Stream&& stream)
47  : BaseT(stream._engine, spead2::recv::stream_config().set_max_heaps(stream._reader._max_heaps).set_allow_out_of_order(true))
48  , _engine(stream._engine)
49  , _reader(stream._reader)
50  , _data_factory(std::move(stream._data_factory))
51  , _callback(std::move(stream._callback))
52 {
53 }
54 
55 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
56 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::~Stream()
57 {
58  bool stopped = false;
59  std::thread th([&, this]() { this->stop(); stopped=true; });
60  while(!stopped) {
61  _engine.poll_one();
62  }
63  th.join();
64 }
65 
66 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
67 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::heap_ready(spead2::recv::live_heap&& heap)
68 {
69  if (heap.is_complete())
70  {
71  spead2::recv::heap hp(std::move(heap));
72  if(hp.get_descriptors().size() == hp.get_items().size()) return;
73  std::shared_ptr<SpCclType> data(_data_factory());
74  SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::fill(*data, hp); // heap is destroyed in the recv::heap constructor
75  _callback(data);
76  }
77  else {
78  PANDA_LOG_WARN << "discarding incomplete/currupted SpCcl data heap";
79  }
80 }
81 
82 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
83 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::stop_received()
84 {
85  BaseT::stop_received();
86  get_io_service().post([this]() {
87  _reader.stream_reset();
88  });
89 }
90 
91 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
92 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::process()
93 {
94  _engine.poll_one();
95 }
96 
97 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
98 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::SpCclSpeadReader(SpCclSpeadReaderConfig const& config, Callback callback, DataFactory const& factory)
99  : SpCclSpeadStreamTraits()
100  , _destructor(false)
101  , _max_heaps(spead2::recv::stream_config::default_max_heaps * 4)
102  , _spead_stream(new Stream(*this, static_cast<boost::asio::io_service&>(config.engine()), factory, callback))
103 {
104  // Defined endpoint here and moved emplace reader to start.
105  _spead_stream_endpoint = config.endpoint();
106 }
107 
108 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
109 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::~SpCclSpeadReader()
110 {
111  std::lock_guard<std::mutex> lk(_stop_mutex);
112  _destructor = true;
113  stop();
114 }
115 
116 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
117 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::stream_reset()
118 {
119  std::unique_lock<std::mutex> lk(_stop_mutex, std::defer_lock_t());
120  if(lk.try_lock() && !_destructor) {
121  Stream* stream_ptr = nullptr;
122  try {
123  stream_ptr = new Stream(std::move(*_spead_stream));
124  _spead_stream.reset(stream_ptr);
125  visit_stop_listeners();
126  PANDA_LOG << "Received end of SpCclSpead Stream - resetting";
127  } catch(...) {
128  delete stream_ptr;
129  throw;
130  }
131  do_start();
132  }
133 }
134 
135 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
136 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::do_start()
137 {
138  _spead_stream->template emplace_reader<spead2::recv::udp_reader>( _spead_stream_endpoint
139  , spead2::recv::udp_reader::default_max_size
140  //, 1024 * 1024 // socket buffer size
141  );
142  visit_start_listeners();
143 }
144 
145 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
146 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::start()
147 {
148  std::lock_guard<std::mutex> lk(_stop_mutex);
149  do_start();
150 }
151 
152 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
153 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::add_stop_callback(std::function<void()> const& fn)
154 {
155  _stop_listeners.push_back(fn);
156 }
157 
158 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
159 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::add_start_callback(std::function<void()> const& fn)
160 {
161  _start_listeners.push_back(fn);
162 }
163 
164 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
165 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::visit_stop_listeners()
166 {
167  for(auto const& fn: _stop_listeners) {
168  fn();
169  }
170 }
171 
172 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
173 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::visit_start_listeners()
174 {
175  for(auto const& fn: _start_listeners) {
176  fn();
177  }
178 }
179 
180 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
181 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::stop()
182 {
183  if(_spead_stream) _spead_stream->stop();
184  visit_stop_listeners();
185 }
186 
187 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
188 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::process()
189 {
190  _spead_stream->process();
191 }
192 
193 template<typename TimeFrequencyT, typename Callback, typename DataFactory>
194 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::fill(SpCclType& data, spead2::recv::heap const& heap)
195 {
196  std::vector<spead2::recv::item> const& items = heap.get_items();
197  if(items.size() == 0 ) return;
198 
199  //std::vector<spead2::descriptor> const& descriptors = heap.get_descriptors();
200  FrequencyType fch1;
201  FrequencyType foff;
202  TimeType tsamp;
203  data::DimensionSize<data::Frequency> number_of_channels(0);
204 
205  std::vector<spead2::recv::item const*> tf_items;
206  std::vector<utils::ModifiedJulianClock::time_point> tf_start_time;
207  std::vector<spead2::recv::item const*> ft_items;
208  std::vector<utils::ModifiedJulianClock::time_point> ft_start_time;
209 
210  using pss::astrotypes::units::julian_day;
211  typedef typename utils::ModifiedJulianClock::time_point time_point;
212  typedef typename SpCclType::CandidateType::Dm Dm;
213  typedef typename SpCclType::CandidateType::Width Width;
214  typedef typename SpCclType::CandidateType::Sigma SigmaType;
215  typedef typename SpCclType::CandidateType::MsecTimeType DurationType;
216  std::vector<time_point> candidate_start_time;
217  std::vector<Dm> candidate_dm;
218  std::vector<Width> candidate_width;
219  std::vector<SigmaType> candidate_sigma;
220  std::vector<DurationType> candidate_duration;
221 
222  for (auto const& item : items)
223  {
224  switch(item.id)
225  {
226  case spead2::DESCRIPTOR_ID:
227  break;
228  case tf_data_id:
229  {
230  tf_items.push_back(&item);
231  break;
232  }
233  case ft_data_id:
234  {
235  ft_items.push_back(&item);
236  break;
237  }
238  case data_channel_1_id:
239  fch1 = *(reinterpret_cast<double*>(item.ptr)) * boost::units::si::mega * boost::units::si::hertz;
240  break;
241  case data_channel_width_id:
242  foff = *(reinterpret_cast<double*>(item.ptr)) * boost::units::si::mega * boost::units::si::hertz;
243  break;
244  case data_sample_interval_id:
245  tsamp = *(reinterpret_cast<typename TimeType::value_type*>(item.ptr)) * boost::units::si::second;
246  break;
247  case data_number_of_channels_id:
248  number_of_channels = data::DimensionSize<data::Frequency>(*(reinterpret_cast<std::size_t*>(item.ptr)));
249  break;
250  case tf_data_start_time_id:
251  tf_start_time.push_back(time_point(julian_day(*reinterpret_cast<double*>(item.ptr))));
252  break;
253  case ft_data_start_time_id:
254  ft_start_time.push_back(time_point(julian_day(*reinterpret_cast<double*>(item.ptr))));
255  break;
256  case candidate_start_time_id:
257  candidate_start_time.emplace_back(time_point(julian_day(*reinterpret_cast<double*>(item.ptr))));
258  break;
259  case candidate_dm_id:
260  candidate_dm.emplace_back(*reinterpret_cast<typename Dm::value_type*>(item.ptr) * data::parsecs_per_cube_cm);
261  break;
262  case candidate_width_id:
263  candidate_width.emplace_back(*reinterpret_cast<typename Width::value_type*>(item.ptr) * boost::units::si::milli * boost::units::si::second );
264  break;
265  case candidate_sigma_id:
266  candidate_sigma.emplace_back(*reinterpret_cast<SigmaType*>(item.ptr));
267  break;
268  case candidate_duration_id:
269  candidate_duration.emplace_back(*reinterpret_cast<typename DurationType::value_type*>(item.ptr) * boost::units::si::milli * boost::units::si::second);
270  break;
271 
272  default:
273  PANDA_LOG_WARN << "unknown id in spead packet detectedi: " << item.id;
274  }
275  }
276 
277  // load in tf data
278  if(tf_items.size() != tf_start_time.size()) {
279  PANDA_LOG_ERROR << "TimeFrequency data corrupted" << tf_items.size() << " vs " << tf_start_time.size();
280  }
281 
282  data::DimensionSize<data::Time> number_of_spectra(0);
283  const std::size_t sample_size=sizeof(NumericalRep);
284  for( std::size_t i=0; i < tf_items.size(); ++i) {
285  auto const& item = *tf_items[i];
286  number_of_spectra += data::DimensionSize<data::Time>(item.length/sample_size/(std::size_t)number_of_channels);
287  }
288  std::shared_ptr<TimeFrequencyType> tf_block = std::make_shared<TimeFrequencyType>(number_of_spectra, number_of_channels);
289  TimeFrequencyType& dat = *tf_block;
290  dat.start_time(tf_start_time[0]);
291  dat.sample_interval(tsamp);
292  dat.set_channel_frequencies_const_width(fch1, foff);
293  auto dat_it=dat.begin();
294 
295  // copy data into tf_block
296  for( std::size_t i=0; i < tf_items.size(); ++i) {
297  auto const& item = *tf_items[i];
298  NumericalRep* start=reinterpret_cast<NumericalRep*>(item.ptr);
299  std::size_t length = item.length/sizeof(NumericalRep);
300  std::copy(start, start + length, dat_it);
301  dat_it += length;
302  }
303 
304  // load in ft data
305  if(ft_items.size() !=0) {
306  PANDA_LOG_WARN << "FrequencyTime data not yet supported";
307  /*
308  if(ft_items.size() != ft_start_time.size()) {
309  PANDA_LOG_ERROR << "FrequencyTime data corrupted";
310  }
311  for( std::size_t i=0; i < ft_items.size(); ++i) {
312  auto const& item = ft_items[i];
313  data::DimensionSize<data::Frequency> number_of_channels = item.length/sample_size/(std::size_t)number_of_spectra;
314  ft_blocks.emplace_back(new FrequencyTimeType(number_of_channels, number_of_spectra));
315  FrequencyTimeType& dat = *ft_blocks.back();
316  dat.start_time(ft_start_time[i]);
317  }
318  // convert to tf blocks and add to tf_blocks (sorted by timestamp)
319  */
320  }
321 
322  // load in candidates
323  data::SpCandidateData<TimeFrequencyType> dt(tf_block);
324  std::size_t total_candidates=candidate_dm.size();
325 
326  // sanity check
327  if( total_candidates != candidate_width.size() ||
328  total_candidates != candidate_start_time.size() ||
329  total_candidates != candidate_sigma.size() ||
330  total_candidates != candidate_duration.size() )
331  {
332  PANDA_LOG_ERROR << "candidate data corrupted";
333  return;
334  }
335 
336  for(std::size_t i=0; i< total_candidates; ++i)
337  {
338  typedef typename std::remove_reference<decltype(data)>::type::CandidateType SpCandidateType;
339  dt.add(SpCandidateType(candidate_dm[i]
340  , candidate_start_time[i]
341  , candidate_width[i]
342  , candidate_duration[i]
343  , candidate_sigma[i]
344  ));
345  }
346 
347  // transfer the data to the passed object
348  data = std::move(dt);
349 }
350 
351 } // namespace exporters
352 } // namespace cheetah
353 } // namespace ska
Definition: Units.h:112
Some limits and constants for FLDO.
Definition: Brdz.h:35