Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SpCclSpeadStreamer.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/exporters/SpCclSpeadStreamer.h"
25 #include "panda/Log.h"
26 #include <spead2/send_udp.h>
27 
28 
29 namespace ska {
30 namespace cheetah {
31 namespace exporters {
32 
33 namespace {
34 
/**
 * @brief Compile-time mapping from an arithmetic C++ type to the type codes used
 *        to describe that type in an item descriptor.
 * @details The primary template is deliberately empty: instantiating it with a type
 *          that is neither integral nor floating point provides no format() or
 *          numpy() member, so any use of them fails to compile.
 */
template<typename T, typename Enabled = void>
struct TypeHelper
{};

/**
 * @brief Specialisation for integral types.
 * @details Signed and unsigned types are reported with different codes
 *          ('i' and 'u' respectively) so that a receiver decoding the data from
 *          the descriptor does not misread large unsigned values as negative.
 *          Note that bool counts as an unsigned integral type here.
 */
template<typename T>
struct TypeHelper<T, typename std::enable_if<std::is_integral<T>::value>::type>
{
    /// @return the (type code, width in bits) pair for T
    static std::pair<char, std::size_t> format()
    {
        return std::make_pair(std::is_unsigned<T>::value ? 'u' : 'i', sizeof(T)*8);
    }

    /// @return the type code followed by the width of T in bytes (e.g. "u1", "i4")
    static std::string numpy()
    {
        return (std::is_unsigned<T>::value ? "u" : "i") + std::to_string(sizeof(T));
    }
};

/**
 * @brief Specialisation for floating point types.
 */
template<typename T>
struct TypeHelper<T, typename std::enable_if<std::is_floating_point<T>::value>::type>
{
    /// @return the (type code, width in bits) pair for T
    static std::pair<char, std::size_t> format() { return std::make_pair('f', sizeof(T)*8); }

    /// @return the type code followed by the width of T in bytes (e.g. "f4", "f8")
    static std::string numpy() { return "f" + std::to_string(sizeof(T)); }
};
51 
52 template<typename T>
53 struct KeepAlive {
54  KeepAlive(spead2::flavour& fl, T const& cand_list)
55  : heap(fl)
56  , _data_ptr(cand_list.shared_from_this())
57  {
58  }
59 
60  public:
61  spead2::send::heap heap;
62  std::vector<double> tstart_memory;
63  std::size_t number_of_channels;
64  boost::units::quantity<boost::units::si::time, double> sample_interval;
65 
66  private:
67  std::shared_ptr<const T> _data_ptr;
68 };
69 
70 } // namespace
71 
72 template<typename NumericalRep>
73 SpCclSpeadStreamer& SpCclSpeadStreamer::operator<<(data::SpCcl<NumericalRep> const& cand_list)
74 {
75  typedef typename data::SpCcl<NumericalRep>::SpCandidateType CandidateType;
76 
77  if(cand_list.empty()) return *this;
78 
79  data::CandidateWindowConfig const& window_config = _config.window_config();
80  data::CandidateWindow const& window = window_config.window();
81  typename data::SpCcl<NumericalRep>::ConstDataIterator data_it = cand_list.data_begin(window);
82  typename data::SpCcl<NumericalRep>::ConstDataIterator data_end = cand_list.data_end();
83 
84  // descriptors
85  _tf_data_desc.format.clear();
86  _tf_data_desc.format.push_back(TypeHelper<NumericalRep>::format());
87 
88  while(data_it != data_end)
89  {
90  // Any local variables here will not be in scope when the async call to write a heap
91  // is made. So any variables we want to write out we store in the KeepAlive block
92  typedef KeepAlive<data::SpCcl<NumericalRep>> KeepAliveType;
93  std::shared_ptr<KeepAliveType> keep_alive = std::make_shared<KeepAliveType>(_sp_flavour, cand_list);
94 
95  spead2::send::heap& heap = keep_alive->heap;
96  std::vector<double>& tstart_memory = keep_alive->tstart_memory;
97 
98  heap.add_descriptor(_tf_data_desc);
99  heap.add_descriptor(_data_start_time_desc);
100  heap.add_descriptor(_data_number_of_channels_desc);
101  heap.add_descriptor(_data_channel_1);
102  heap.add_descriptor(_data_channel_width);
103  heap.add_descriptor(_data_sammple_interval);
104  heap.add_descriptor(_candidate_start_time_desc);
105  heap.add_descriptor(_candidate_dm_desc);
106  heap.add_descriptor(_candidate_width_desc);
107  heap.add_descriptor(_candidate_sigma_desc);
108  heap.add_descriptor(_candidate_duration_desc);
109 
110  // tf meta data
111  auto tf_start_it = cand_list.tf_blocks().begin();
112  keep_alive->number_of_channels = (*tf_start_it)->number_of_channels();
113  keep_alive->sample_interval = (*tf_start_it)->sample_interval();
114 
115  // grouped candidates
116  auto cand_block_it = data_it.candidate_begin();
117  auto const& cand_block_end = data_it.candidate_end();
118  std::size_t tstart_size=std::distance(cand_block_it, cand_block_end) + cand_list.tf_blocks().size();
119  tstart_memory.reserve(tstart_size);
120  while(cand_block_it != cand_block_end) {
121  auto const& candidate = &*cand_block_it;
122  tstart_memory.emplace_back(cand_list.start_time(*candidate).time_since_epoch().count());
123  auto const& start_mjd=tstart_memory.back();
124  heap.add_item(candidate_start_time_id, (void*)&(start_mjd), sizeof(start_mjd), false);
125  heap.add_item(candidate_dm_id, (void*)&candidate->dm().value(), sizeof(candidate->dm().value()), false);
126  heap.add_item(candidate_width_id, (void*)&candidate->width(), sizeof(candidate->width()), false);
127  heap.add_item(candidate_sigma_id, (void*)&candidate->sigma(), sizeof(typename CandidateType::SigmaType), false);
128  heap.add_item(candidate_duration_id, (void*)&candidate->tend(), sizeof(candidate->tend()), false);
129  ++cand_block_it;
130  }
131 
132  // TF data associated with this candidate block
133  bool end_block;
134  do {
135  heap.add_item(data_number_of_channels_id, &keep_alive->number_of_channels, sizeof(std::size_t), false);
136  heap.add_item(data_sample_interval_id, &reinterpret_cast<double const&>(keep_alive->sample_interval), sizeof(double), false);
137 
138  // candidate tf data
139  tstart_memory.emplace_back(data_it.start_time().time_since_epoch().count());
140  auto const& start_time=tstart_memory.back();
141  heap.add_item(tf_data_start_time_id, (void*)&start_time, sizeof(decltype(start_time)), false);
142  heap.add_item(tf_data_id, (void*)&*(*data_it).begin(), (*data_it).data_size()*sizeof(NumericalRep), false);
143 
144  end_block = data_it.end_block();
145  ++data_it;
146  }
147  while(!end_block);
148  assert(tstart_memory.size() <= tstart_size);
149 
150  // send the heap
151  _sp_stream.async_send_heap(heap, [ keep_alive ] (const boost::system::error_code &ec, spead2::item_pointer_t)
152  {
153  if (ec) {
154  PANDA_LOG_ERROR << "spead2 output stream failure: " << ec.message();
155  }
156  });
157 
158  }
159 
160  return *this;
161 }
162 
163 } // namespace exporters
164 } // namespace cheetah
165 } // namespace ska
Some limits and constants for FLDO.
Definition: Brdz.h:35