Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
SigProcDadaStream.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 #include "cheetah/psrdada/SigProcDadaStream.h"
25 #include "cheetah/sigproc/SigProcHeader.h"
26 #include "cheetah/data/TimeFrequency.h"
27 #include "cheetah/psrdada/Config.h"
28 #include "cheetah/utils/ModifiedJulianClock.h"
29 
30 #include "panda/Error.h"
31 #include <type_traits>
32 #include <iostream>
33 #include <new>
34 #include <cstdlib>
35 #include <cmath>
36 
37 namespace ska {
38 namespace cheetah {
39 namespace psrdada {
40 
41 
45 #ifdef ENABLE_PSRDADA
46 SigProcDadaStream::SigProcDadaStream(Config const& config)
47  : _config(config)
48  , _engine(config.engine())
49  , _client(_config.dada_key()
50  , _engine
51  , [&](std::istream& in, std::exception_ptr eptr)
52  {
53  handle_new_sequence(in, eptr);
54  })
55  , _start_time(std::chrono::milliseconds(0))
56  , _error(false)
57 {
58 }
59 
60 void SigProcDadaStream::init()
61 {
62  _client.start();
63 }
64 
65 SigProcDadaStream::~SigProcDadaStream()
66 {
67 }
68 
69 
70 void SigProcDadaStream::handle_new_sequence(std::istream& in, std::exception_ptr eptr)
71 {
72  if(eptr)
73  {
74  PANDA_LOG_ERROR << "Error with reading header\n";
75  _error = true;
76  }
77  else
78  {
79  std::shared_ptr<bool> stopped = _client.stopped();
80  PANDA_LOG_DEBUG << "Reading first header";
81  _header.read(in);
82  _start_time = _header.start_time();
83  _engine.post([this,stopped]() { new_chunk_process(stopped); });
84  }
85 }
86 
87 void SigProcDadaStream::handle_new_sequence(std::istream& in, std::shared_ptr<ChunkType> current_chunk, ChunkType::Iterator it, std::exception_ptr eptr)
88 {
89  try
90  {
91  PANDA_LOG_DEBUG << "Reading new header";
92  _header.read(in);
93  std::shared_ptr<bool> stopped = _client.stopped();
94  if( _header.start_time() > _start_time || (unsigned) _header.number_of_channels() != current_chunk->number_of_channels() )
95  {
96  _start_time = _header.start_time();
97  auto start = current_chunk->begin();
98  std::size_t new_size = std::distance(start,it)/current_chunk->number_of_channels();
99  current_chunk->resize(data::DimensionSize<data::Time>(new_size));
100  _engine.post([this, stopped](){ new_chunk_process(stopped); });
101  }
102  else if( _header.start_time() < _start_time && _start_time - _header.start_time() >= _header.sample_interval())
103  {
104  throw panda::Error("Incorrect timestamp. Data stream broken!!\n");
105  return;
106  }
107  else
108  {
109  PANDA_LOG_DEBUG << "Time consistent!";
110  _engine.post([&,stopped,current_chunk,it](){do_process(stopped,current_chunk,it);});
111  }
112  }
113  catch (std::exception& e)
114  {
115  eptr = std::current_exception();
116  PANDA_LOG_WARN << "Exception caught: " << e.what();
117  _error=true;
118  }
119  catch (...)
120  {
121  PANDA_LOG_WARN << "Unknown exception caught";
122  _error=true;
123  }
124 }
125 
126 void SigProcDadaStream::transfer_header_info(ChunkType& chunk)
127 {
128  if (_config.number_of_samples() > 0)
129  {
130  chunk.resize( data::DimensionSize<data::Time>(_config.number_of_samples()),data::DimensionSize<data::Frequency>(_header.number_of_channels()));
131  }
132  else
133  {
134  throw panda::Error("unable to determine number of sigproc samples required");
135  }
136 
137  chunk.sample_interval(_header.sample_interval());
138  if(chunk.channel_frequencies().size() > 1)
139  {
140  chunk.set_channel_frequencies( _header.frequency_channels().begin()
141  , _header.frequency_channels().end());
142  }
143  else
144  {
145  chunk.set_channel_frequencies_const_width(static_cast<typename data::TimeFrequency<Cpu, uint8_t>::FrequencyType>(*_header.fch1())
146  ,static_cast<typename data::TimeFrequency<Cpu, uint8_t>::FrequencyType>(*_header.foff()));
147  }
148 }
149 
150 
151 
152 bool SigProcDadaStream::process()
153 {
154  if(_error) return _error;
155 
156  if(_config.number_of_threads() == 0)
157  {
158  _engine.poll_one();
159  }
160  return false;
161 
162 }
163 
164 
165 void SigProcDadaStream::new_chunk_process(std::shared_ptr<bool> stopped)
166 {
167  try
168  {
169  auto current_chunk = get_chunk<ChunkType>(data::DimensionSize<data::Time>(_config.number_of_samples()), data::DimensionSize<data::Frequency>(0));
170  assert(current_chunk);
171  transfer_header_info(*current_chunk);
172  current_chunk->start_time(_start_time);
173  auto it = current_chunk->begin();
174  do_process(stopped, current_chunk, it);
175  }
176  catch (std::exception& e)
177  {
178  PANDA_LOG_ERROR << "DadaStream: exception caught." << e.what();
179  _error=true;
180  }
181  catch (...)
182  {
183  PANDA_LOG_ERROR << "exception caught!";
184  _error=true;
185  }
186 }
187 
188 bool SigProcDadaStream::do_process( std::shared_ptr<bool> stopped
189  , std::shared_ptr<ChunkType> current_chunk
190  , ChunkType::Iterator it)
191 {
192  static constexpr unsigned number_of_bits = sizeof(ChunkType::value_type) * 8;
193  try
194  {
196  auto begin = it;
197  // Raise error if nbits not equal to 8
198  if(_header.number_of_bits() != number_of_bits)
199  {
200  panda::Error e("Expecting ");
201  e << number_of_bits << " bits in header. Got " << _header.number_of_bits();
202  throw e;
203  }
204 
205  // Checks for a partial read (due to eod) and resizes the chunk accordingly and stops reading data.
206  if(_client.read(it,current_chunk->end()) != current_chunk->end())
207  {
208  PANDA_LOG_DEBUG << "Partial read of stream, resizing and invoking next_sequence";
209  std::size_t elements_read = std::distance(begin,it);
210  std::lldiv_t dv{0,0};
211  dv = std::div(elements_read,static_cast<long long>(_header.number_of_channels()));
212  if (dv.rem !=0)
213  {
214  int const new_size = dv.quot;
215  PANDA_LOG_ERROR << "Partial read not a multiple of TF chunk. Incomplete spectra read";
216  throw panda::Error("Incorrect Data. Data have incomplete spectrum");
217  current_chunk->resize(data::DimensionSize<data::Time>(new_size - 1),data::DimensionSize<data::Frequency>(_header.number_of_channels()));
218  return true;
219  }
220  _start_time += std::chrono::duration<double>((int)(elements_read/_header.number_of_channels()) * _header.sample_interval().value());
221  _client.next_sequence([&,current_chunk, it](std::istream& in, std::exception_ptr eptr) { handle_new_sequence(in, current_chunk, it, eptr); } );
222  return false;
223  }
224  else
225  {
226  std::size_t elements_read = std::distance(begin,current_chunk->end());
227  _start_time += std::chrono::duration<double>((int)(elements_read/_header.number_of_channels()) * _header.sample_interval().value());
228  PANDA_LOG_DEBUG << "Successful read of the buffer ";
229  }
230  _engine.post([this,stopped]() { new_chunk_process(stopped); } );
231  return false;
232  }
233  catch (...)
234  {
235  PANDA_LOG_DEBUG << "Exception caught!";
236  _error = true;
237  return false;
238  }
239 
240 }
241 
242 void SigProcDadaStream::stop()
243 {
244  _client.stop();
245 }
246 
247 #else // ENABLE_PSRDADA
248 
249 SigProcDadaStream::SigProcDadaStream(Config const& config)
250  : _config(config)
251 {
252 }
253 
254 SigProcDadaStream::~SigprocDadaStream()
255 {
256 }
257 
258 #endif //ENABLE_PSRDADA
259 
260 } // namespace psrdada
261 } // namespace cheetah
262 } // namespace ska
Some limits and constants for FLDO.
Definition: Brdz.h:35
uint8_t DataType
the underlying data storage type for the amplitude of the signal
Definition: TimeFrequency.h:96