Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SpCclSpeadStreamer.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
#include "cheetah/exporters/SpCclSpeadStreamer.h"

#include <atomic>
25 
26 namespace ska {
27 namespace cheetah {
28 namespace exporters {
29 
30 SpCclSpeadStreamer::SpCclSpeadStreamer(SpCclSpeadStreamerConfig const& config, panda::Engine& engine)
31  : _config(config)
32  , _sp_flavour(spead2::maximum_version, 64, 48, spead2::BUG_COMPAT_PYSPEAD_0_5_2)
33  , _sp_end(_sp_flavour)
34  , _sp_stream(engine, config.send_address().end_point<boost::asio::ip::udp::endpoint>()
35  , spead2::send::stream_config().set_max_packet_size(config.packet_size())
36  .set_rate(config.send_rate_limit())
37  .set_burst_size(spead2::send::stream_config::default_burst_size)
38  .set_max_heaps(spead2::send::stream_config::default_max_heaps * 8)
39  )
40 {
41  std::string limit_msg;
42  if(config.send_rate_limit()!=0.0) {
43  limit_msg = " (limited to " + std::to_string(config.send_rate_limit()) + " bytes/sec)";
44  };
45  PANDA_LOG << "Spead UDP output stream on " << config.send_address().address().to_string()
46  << ":" << config.send_address().port()
47  << limit_msg;
48  _sp_end.add_end();
49  _tf_data_desc.id = tf_data_id;
50  _tf_data_desc.name = "TimeFrequency";
51  _tf_data_desc.description = "TimeFrequency Data In Spectra order";
52 
53  _ft_data_desc.id = tf_data_id;
54  _ft_data_desc.name = "FrequencyTime";
55  _ft_data_desc.description = "Frequency TimeSeries Data";
56 
57  _data_start_time_desc.id = candidate_start_time_id;
58  _data_start_time_desc.name = "CandidateStartTime";
59  _data_start_time_desc.description= "MJD of candidate pulse start";
60  _data_start_time_desc.format.emplace_back('f', sizeof(double) * 8);
61 
62  _data_number_of_channels_desc.id = data_number_of_channels_id;
63  _data_number_of_channels_desc.name = "channels";
64  _data_number_of_channels_desc.description = "number of channels in the data block";
65  _data_number_of_channels_desc.format.emplace_back('i', sizeof(std::size_t) * 8);
66 
67  _data_channel_1.id = data_channel_1_id;
68  _data_channel_1.name = "top frequency";
69 
70  _data_channel_width.id = data_channel_width_id;
71  _data_channel_width.name = "channel_width";
72 
73  _data_sammple_interval.id = data_sample_interval_id;
74  _data_sammple_interval.name = "sample_interval";
75 
76  _candidate_start_time_desc.id = candidate_start_time_id;
77  _candidate_start_time_desc.name = "CandidateStartTime";
78  _candidate_start_time_desc.description= "MJD of candidate pulse start";
79  _candidate_start_time_desc.format.emplace_back('f', sizeof(double) * 8);
80 
81  _candidate_dm_desc.id = candidate_dm_id;
82  _candidate_dm_desc.name = "dm";
83  _candidate_dm_desc.description = "DispersionMeasure";
84 
85  _candidate_width_desc.id = candidate_width_id;
86  _candidate_width_desc.name = "width";
87  _candidate_width_desc.description = "Pulse Width";
88 
89  _candidate_sigma_desc.id = candidate_sigma_id;
90  _candidate_sigma_desc.name = "sigma";
91  _candidate_sigma_desc.description = "Pulse Sigma";
92 
93  _candidate_duration_desc.id = candidate_duration_id;
94  _candidate_duration_desc.name = "duration";
95  _candidate_duration_desc.description = "Pulse Duration";
96 
97  typedef typename data::SpCcl<uint8_t>::SpCandidateType CandidateType;
98  _candidate_dm_desc.format.push_back(TypeHelper<typename CandidateType::Dm::value_type>::format());
99  _candidate_width_desc.format.push_back(TypeHelper<typename CandidateType::MsecTimeType::value_type>::format());
100  _candidate_sigma_desc.format.push_back(TypeHelper<typename CandidateType::SigmaType>::format());
101  _candidate_duration_desc.format.push_back(TypeHelper<typename CandidateType::MsecTimeType::value_type>::format());
102 
103 }
104 
105 SpCclSpeadStreamer::~SpCclSpeadStreamer()
106 {
107  _sp_stream.async_send_heap(_sp_end, [] (const boost::system::error_code&, spead2::item_pointer_t /*bytes_transferred*/){});
108  bool stopped = false;
109  std::thread th([&]() { _sp_stream.flush(); stopped=true; });
110  while(!stopped) {
111  _sp_stream.get_io_service().poll_one();
112  }
113  th.join();
114 }
115 
116 
117 } // namespace exporters
118 } // namespace cheetah
119 } // namespace ska
Definition: Units.h:112
Some limits and constants for FLDO.
Definition: Brdz.h:35