24 #include "cheetah/exporters/SpCclSpeadStreamer.h" 25 #include "panda/Log.h" 26 #include <spead2/send_udp.h> 35 template<
typename T,
typename Enabled =
void>
// Partial specialisation of TypeHelper that is selected when
// std::is_integral<T>::value is true.
// NOTE(review): std::is_integral is also true for unsigned integer types,
// bool and the character types; every one of them is described with the
// code 'i' below - confirm that this is the intended description for
// unsigned sample types.
40 struct TypeHelper<T, typename
std::enable_if<std::is_integral<T>::value>::type>
// Returns the pair ('i', sizeof(T)*8), i.e. a type code character together
// with the size of T expressed in bits (8 bits per byte is assumed).
// operator<< in this file appends this pair to _tf_data_desc.format.
42 static std::pair<char, std::size_t> format() {
return std::make_pair(
'i',
sizeof(T)*8); }
// Returns the string "i" followed by sizeof(T) in decimal, e.g. "i4" for a
// 4-byte T.
43 static std::string numpy() {
return "i" + std::to_string(
sizeof(T)); }
// Partial specialisation of TypeHelper that is selected when
// std::is_floating_point<T>::value is true.
47 struct TypeHelper<T, typename
std::enable_if<std::is_floating_point<T>::value>::type>
// Returns the pair ('f', sizeof(T)*8), i.e. a type code character together
// with the size of T expressed in bits (8 bits per byte is assumed).
49 static std::pair<char, std::size_t> format() {
return std::make_pair(
'f',
sizeof(T)*8); }
// KeepAlive bundles a spead2 heap with the storage that the heap's items
// point into. operator<< in this file creates one instance per heap and
// copies the owning shared_ptr into the async_send_heap completion callback.
//
// Constructor: takes the spead2 flavour and the candidate list. It stores the
// result of cand_list.shared_from_this() in _data_ptr, so this object shares
// ownership of the candidate list.
// NOTE(review): presumably T derives from std::enable_shared_from_this, in
// which case cand_list must already be owned by a std::shared_ptr when this
// constructor runs - confirm against the definition of T.
54 KeepAlive(spead2::flavour& fl, T
const& cand_list)
56 , _data_ptr(cand_list.shared_from_this())
// The heap that is handed to the spead2 stream.
61 spead2::send::heap heap;
// Backing storage for the start-time values; heap items are given the
// addresses of individual elements of this vector.
62 std::vector<double> tstart_memory;
// Backing storage for the number-of-channels heap item.
63 std::size_t number_of_channels;
// Backing storage for the sample-interval heap item.
64 boost::units::quantity<boost::units::si::time, double> sample_interval;
// Shared ownership of the candidate list passed to the constructor.
67 std::shared_ptr<const T> _data_ptr;
// Streams the contents of a candidate list through _sp_stream.
//
// Returns *this immediately when cand_list is empty. Otherwise it iterates
// from cand_list.data_begin(window) to cand_list.data_end() and, for each
// position, fills a heap owned by a freshly created KeepAlive object and
// passes it to _sp_stream.async_send_heap().
//
// The heap items are registered with raw pointers into the candidate list and
// into the KeepAlive members, so those objects have to stay valid until the
// send has finished; the KeepAlive shared_ptr captured by the completion
// callback is what holds them.
//
// NOTE(review): several lines of the function (braces, the iterator
// increments, the check on the error code) are not visible in this excerpt.
72 template<
typename NumericalRep>
73 SpCclSpeadStreamer& SpCclSpeadStreamer::operator<<(data::SpCcl<NumericalRep>
const& cand_list)
75 typedef typename data::SpCcl<NumericalRep>::SpCandidateType CandidateType;
// Nothing to send for an empty candidate list.
77 if(cand_list.empty())
return *
this;
// The window taken from the configuration is passed to data_begin() below.
79 data::CandidateWindowConfig
const& window_config = _config.window_config();
80 data::CandidateWindow
const& window = window_config.window();
81 typename data::SpCcl<NumericalRep>::ConstDataIterator data_it = cand_list.data_begin(window);
82 typename data::SpCcl<NumericalRep>::ConstDataIterator data_end = cand_list.data_end();
// Reset the format of the time-frequency data descriptor so that it holds a
// single entry describing NumericalRep.
85 _tf_data_desc.format.clear();
86 _tf_data_desc.format.push_back(TypeHelper<NumericalRep>::format());
// One heap is built per iteration of this loop.
88 while(data_it != data_end)
// keep_alive owns the heap and the scratch storage referenced by its items.
92 typedef KeepAlive<data::SpCcl<NumericalRep>> KeepAliveType;
93 std::shared_ptr<KeepAliveType> keep_alive = std::make_shared<KeepAliveType>(_sp_flavour, cand_list);
95 spead2::send::heap& heap = keep_alive->heap;
96 std::vector<double>& tstart_memory = keep_alive->tstart_memory;
// Every heap carries the full set of item descriptors.
98 heap.add_descriptor(_tf_data_desc);
99 heap.add_descriptor(_data_start_time_desc);
100 heap.add_descriptor(_data_number_of_channels_desc);
101 heap.add_descriptor(_data_channel_1);
102 heap.add_descriptor(_data_channel_width);
// NOTE(review): "sammple" is the spelling used for this member here; it has to
// match the declaration, which is not visible in this excerpt.
103 heap.add_descriptor(_data_sammple_interval);
104 heap.add_descriptor(_candidate_start_time_desc);
105 heap.add_descriptor(_candidate_dm_desc);
106 heap.add_descriptor(_candidate_width_desc);
107 heap.add_descriptor(_candidate_sigma_desc);
108 heap.add_descriptor(_candidate_duration_desc);
// Channel count and sample interval are read from the first entry of
// cand_list.tf_blocks() and copied into keep_alive.
// NOTE(review): the iterator is dereferenced without a visible check that
// tf_blocks() is non-empty - confirm that this is guaranteed here.
111 auto tf_start_it = cand_list.tf_blocks().begin();
112 keep_alive->number_of_channels = (*tf_start_it)->number_of_channels();
113 keep_alive->sample_interval = (*tf_start_it)->sample_interval();
116 auto cand_block_it = data_it.candidate_begin();
117 auto const& cand_block_end = data_it.candidate_end();
// Reserve the storage up front: the heap items below keep the addresses of
// elements of tstart_memory, and a reallocation caused by emplace_back would
// invalidate those addresses. The reserved size is the number of candidates in
// this block plus the number of tf blocks.
118 std::size_t tstart_size=std::distance(cand_block_it, cand_block_end) + cand_list.tf_blocks().size();
119 tstart_memory.reserve(tstart_size);
// Add the per-candidate items.
// NOTE(review): the increment of cand_block_it is not visible in this excerpt.
120 while(cand_block_it != cand_block_end) {
// candidate is a pointer to the current element (the const reference binds to
// the temporary pointer produced by &*cand_block_it).
121 auto const& candidate = &*cand_block_it;
// The start time is converted to a double held in tstart_memory so that the
// heap item has an address to refer to.
122 tstart_memory.emplace_back(cand_list.start_time(*candidate).time_since_epoch().count());
123 auto const& start_mjd=tstart_memory.back();
// NOTE(review): the trailing `false` argument of add_item is presumably
// spead2's allow_immediate flag - confirm against the spead2 API.
124 heap.add_item(candidate_start_time_id, (
void*)&(start_mjd),
sizeof(start_mjd),
false);
// The remaining candidate items point directly at the values returned by the
// candidate's accessors.
// NOTE(review): this relies on dm().value(), width(), sigma() and tend()
// returning references to storage inside the candidate - confirm.
125 heap.add_item(candidate_dm_id, (
void*)&candidate->dm().value(),
sizeof(candidate->dm().value()),
false);
126 heap.add_item(candidate_width_id, (
void*)&candidate->width(),
sizeof(candidate->width()),
false);
127 heap.add_item(candidate_sigma_id, (
void*)&candidate->sigma(),
sizeof(
typename CandidateType::SigmaType),
false);
// NOTE(review): the item id is named "duration" but the value supplied is
// candidate->tend() - confirm that this is the intended quantity.
128 heap.add_item(candidate_duration_id, (
void*)&candidate->tend(),
sizeof(candidate->tend()),
false);
// Items describing the data block; the values live in keep_alive.
135 heap.add_item(data_number_of_channels_id, &keep_alive->number_of_channels,
sizeof(std::size_t),
false);
// NOTE(review): the reinterpret_cast assumes that the boost quantity object
// can be read as a single double - confirm.
136 heap.add_item(data_sample_interval_id, &reinterpret_cast<double const&>(keep_alive->sample_interval),
sizeof(
double),
false);
// Start time of the data at the current iterator position, again stored in
// tstart_memory so that the item can reference it.
139 tstart_memory.emplace_back(data_it.start_time().time_since_epoch().count());
140 auto const& start_time=tstart_memory.back();
// sizeof applied to the reference type given by decltype(start_time) yields
// the size of the referenced type, i.e. sizeof(double).
141 heap.add_item(tf_data_start_time_id, (
void*)&start_time,
sizeof(decltype(start_time)),
false);
// The time-frequency samples are referenced in place: the address of the first
// element of *data_it, with a length of data_size() * sizeof(NumericalRep)
// bytes.
142 heap.add_item(tf_data_id, (
void*)&*(*data_it).begin(), (*data_it).data_size()*
sizeof(NumericalRep),
false);
// NOTE(review): the declaration of end_block is not visible in this excerpt.
144 end_block = data_it.end_block();
// Sanity check that no more values were stored than were reserved above.
148 assert(tstart_memory.size() <= tstart_size);
// Hand the heap to the stream. The lambda holds a copy of the keep_alive
// shared_ptr, so the KeepAlive object (and through _data_ptr the candidate
// list) stays alive for as long as the callback object exists.
151 _sp_stream.async_send_heap(heap, [ keep_alive ] (
const boost::system::error_code &ec, spead2::item_pointer_t)
// NOTE(review): the condition guarding this log statement is not visible in
// this excerpt; presumably it is only reached when ec reports an error.
154 PANDA_LOG_ERROR <<
"spead2 output stream failure: " << ec.message();
Some limits and constants for FLDO.