25 #include "SpeadLoggingAdapter.h" 26 #include <panda/Log.h> 32 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
33 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::Stream(SpCclSpeadReader& reader, boost::asio::io_service& engine
34 , DataFactory
const& factory
37 : BaseT(engine, spead2::recv::stream_config().set_max_heaps(reader._max_heaps).set_allow_out_of_order(true))
40 , _data_factory(factory)
45 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
46 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::Stream(Stream&& stream)
47 : BaseT(stream._engine, spead2::recv::stream_config().set_max_heaps(stream._reader._max_heaps).set_allow_out_of_order(true))
48 , _engine(stream._engine)
49 , _reader(stream._reader)
50 , _data_factory(
std::move(stream._data_factory))
51 , _callback(
std::move(stream._callback))
55 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
56 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::~Stream()
59 std::thread th([&,
this]() { this->stop(); stopped=
true; });
66 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
67 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::heap_ready(spead2::recv::live_heap&& heap)
69 if (heap.is_complete())
71 spead2::recv::heap hp(std::move(heap));
72 if(hp.get_descriptors().size() == hp.get_items().size())
return;
73 std::shared_ptr<SpCclType> data(_data_factory());
74 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::fill(*data, hp);
78 PANDA_LOG_WARN <<
"discarding incomplete/currupted SpCcl data heap";
82 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
83 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::stop_received()
85 BaseT::stop_received();
86 get_io_service().post([
this]() {
87 _reader.stream_reset();
91 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
92 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::Stream::process()
97 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
98 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::SpCclSpeadReader(SpCclSpeadReaderConfig
const& config, Callback callback, DataFactory
const& factory)
99 : SpCclSpeadStreamTraits()
101 , _max_heaps(spead2::recv::stream_config::default_max_heaps * 4)
102 , _spead_stream(new Stream(*this, static_cast<
boost::asio::io_service&>(config.engine()), factory, callback))
105 _spead_stream_endpoint = config.endpoint();
108 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
109 SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::~SpCclSpeadReader()
111 std::lock_guard<std::mutex> lk(_stop_mutex);
116 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
117 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::stream_reset()
119 std::unique_lock<std::mutex> lk(_stop_mutex, std::defer_lock_t());
120 if(lk.try_lock() && !_destructor) {
121 Stream* stream_ptr =
nullptr;
123 stream_ptr =
new Stream(std::move(*_spead_stream));
124 _spead_stream.reset(stream_ptr);
125 visit_stop_listeners();
126 PANDA_LOG <<
"Received end of SpCclSpead Stream - resetting";
135 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
136 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::do_start()
138 _spead_stream->template emplace_reader<spead2::recv::udp_reader>( _spead_stream_endpoint
139 , spead2::recv::udp_reader::default_max_size
142 visit_start_listeners();
145 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
146 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::start()
148 std::lock_guard<std::mutex> lk(_stop_mutex);
152 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
153 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::add_stop_callback(std::function<
void()>
const& fn)
155 _stop_listeners.push_back(fn);
158 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
159 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::add_start_callback(std::function<
void()>
const& fn)
161 _start_listeners.push_back(fn);
164 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
165 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::visit_stop_listeners()
167 for(
auto const& fn: _stop_listeners) {
172 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
173 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::visit_start_listeners()
175 for(
auto const& fn: _start_listeners) {
180 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
181 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::stop()
183 if(_spead_stream) _spead_stream->stop();
184 visit_stop_listeners();
187 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
188 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::process()
190 _spead_stream->process();
193 template<
typename TimeFrequencyT,
typename Callback,
typename DataFactory>
194 void SpCclSpeadReader<TimeFrequencyT, Callback, DataFactory>::fill(SpCclType& data, spead2::recv::heap
const& heap)
196 std::vector<spead2::recv::item>
const& items = heap.get_items();
197 if(items.size() == 0 )
return;
203 data::DimensionSize<data::Frequency> number_of_channels(0);
205 std::vector<spead2::recv::item const*> tf_items;
206 std::vector<utils::ModifiedJulianClock::time_point> tf_start_time;
207 std::vector<spead2::recv::item const*> ft_items;
208 std::vector<utils::ModifiedJulianClock::time_point> ft_start_time;
210 using pss::astrotypes::units::julian_day;
211 typedef typename utils::ModifiedJulianClock::time_point time_point;
212 typedef typename SpCclType::CandidateType::Dm Dm;
213 typedef typename SpCclType::CandidateType::Width Width;
214 typedef typename SpCclType::CandidateType::Sigma SigmaType;
215 typedef typename SpCclType::CandidateType::MsecTimeType DurationType;
216 std::vector<time_point> candidate_start_time;
217 std::vector<Dm> candidate_dm;
218 std::vector<Width> candidate_width;
219 std::vector<SigmaType> candidate_sigma;
220 std::vector<DurationType> candidate_duration;
222 for (
auto const& item : items)
226 case spead2::DESCRIPTOR_ID:
230 tf_items.push_back(&item);
235 ft_items.push_back(&item);
238 case data_channel_1_id:
239 fch1 = *(
reinterpret_cast<double*
>(item.ptr)) * boost::units::si::mega * boost::units::si::hertz;
241 case data_channel_width_id:
242 foff = *(
reinterpret_cast<double*
>(item.ptr)) * boost::units::si::mega * boost::units::si::hertz;
244 case data_sample_interval_id:
245 tsamp = *(
reinterpret_cast<typename TimeType::value_type*
>(item.ptr)) * boost::units::si::second;
247 case data_number_of_channels_id:
248 number_of_channels = data::DimensionSize<data::Frequency>(*(
reinterpret_cast<std::size_t*
>(item.ptr)));
250 case tf_data_start_time_id:
251 tf_start_time.push_back(time_point(julian_day(*reinterpret_cast<double*>(item.ptr))));
253 case ft_data_start_time_id:
254 ft_start_time.push_back(time_point(julian_day(*reinterpret_cast<double*>(item.ptr))));
256 case candidate_start_time_id:
257 candidate_start_time.emplace_back(time_point(julian_day(*reinterpret_cast<double*>(item.ptr))));
259 case candidate_dm_id:
260 candidate_dm.emplace_back(*reinterpret_cast<typename Dm::value_type*>(item.ptr) * data::parsecs_per_cube_cm);
262 case candidate_width_id:
263 candidate_width.emplace_back(*reinterpret_cast<typename Width::value_type*>(item.ptr) * boost::units::si::milli * boost::units::si::second );
265 case candidate_sigma_id:
266 candidate_sigma.emplace_back(*reinterpret_cast<SigmaType*>(item.ptr));
268 case candidate_duration_id:
269 candidate_duration.emplace_back(*reinterpret_cast<typename DurationType::value_type*>(item.ptr) * boost::units::si::milli * boost::units::si::second);
273 PANDA_LOG_WARN <<
"unknown id in spead packet detectedi: " << item.id;
278 if(tf_items.size() != tf_start_time.size()) {
279 PANDA_LOG_ERROR <<
"TimeFrequency data corrupted" << tf_items.size() <<
" vs " << tf_start_time.size();
282 data::DimensionSize<data::Time> number_of_spectra(0);
283 const std::size_t sample_size=
sizeof(NumericalRep);
284 for( std::size_t i=0; i < tf_items.size(); ++i) {
285 auto const& item = *tf_items[i];
286 number_of_spectra += data::DimensionSize<data::Time>(item.length/sample_size/(std::size_t)number_of_channels);
288 std::shared_ptr<TimeFrequencyType> tf_block = std::make_shared<TimeFrequencyType>(number_of_spectra, number_of_channels);
289 TimeFrequencyType& dat = *tf_block;
290 dat.start_time(tf_start_time[0]);
291 dat.sample_interval(tsamp);
292 dat.set_channel_frequencies_const_width(fch1, foff);
293 auto dat_it=dat.begin();
296 for( std::size_t i=0; i < tf_items.size(); ++i) {
297 auto const& item = *tf_items[i];
298 NumericalRep* start=
reinterpret_cast<NumericalRep*
>(item.ptr);
299 std::size_t length = item.length/
sizeof(NumericalRep);
300 std::copy(start, start + length, dat_it);
305 if(ft_items.size() !=0) {
306 PANDA_LOG_WARN <<
"FrequencyTime data not yet supported";
323 data::SpCandidateData<TimeFrequencyType> dt(tf_block);
324 std::size_t total_candidates=candidate_dm.size();
327 if( total_candidates != candidate_width.size() ||
328 total_candidates != candidate_start_time.size() ||
329 total_candidates != candidate_sigma.size() ||
330 total_candidates != candidate_duration.size() )
332 PANDA_LOG_ERROR <<
"candidate data corrupted";
336 for(std::size_t i=0; i< total_candidates; ++i)
338 typedef typename std::remove_reference<decltype(data)>::type::CandidateType SpCandidateType;
339 dt.add(SpCandidateType(candidate_dm[i]
340 , candidate_start_time[i]
342 , candidate_duration[i]
348 data = std::move(dt);
Some limits and constants for FLDO.