24 #include "cheetah/psrdada/SigProcDadaStream.h" 25 #include "cheetah/sigproc/SigProcHeader.h" 26 #include "cheetah/data/TimeFrequency.h" 27 #include "cheetah/psrdada/Config.h" 28 #include "cheetah/utils/ModifiedJulianClock.h" 30 #include "panda/Error.h" 31 #include <type_traits> 46 SigProcDadaStream::SigProcDadaStream(Config
const& config)
// Constructor (PSRDADA-enabled build). The initialisers visible in this
// excerpt take the engine from the config, build the DADA client from the
// configured key plus a new-sequence callback, and start the stream clock
// at zero milliseconds.
48 , _engine(config.engine())
49 , _client(_config.dada_key()
// The callback is handed to _client and presumably invoked after this
// constructor has returned, so it captures only `this` explicitly rather
// than using a blanket by-reference default capture.
51 , [this](
std::istream& in,
std::exception_ptr eptr)
53 handle_new_sequence(in, eptr);
55 , _start_time(std::chrono::milliseconds(0))
// init(): only the declarator is present in this excerpt; the body
// is not shown here.
60 void SigProcDadaStream::init()
// Destructor (PSRDADA-enabled build): only the declarator is present in this
// excerpt; the body is not shown here.
65 SigProcDadaStream::~SigProcDadaStream()
// First-sequence callback (the overload passed to _client in the constructor).
//   in   - stream positioned at the start of the new sequence
//   eptr - exception reported by the client, if any
// In the visible lines it logs a header-read error, records the header start
// time as the stream start time, and posts new_chunk_process() to the engine.
// NOTE(review): the condition guarding the error log and the header read itself
// are on lines not present in this excerpt.
70 void SigProcDadaStream::handle_new_sequence(std::istream& in, std::exception_ptr eptr)
74 PANDA_LOG_ERROR <<
"Error with reading header\n";
// Client stop flag, copied into the posted task below.
79 std::shared_ptr<bool> stopped = _client.stopped();
80 PANDA_LOG_DEBUG <<
"Reading first header";
// The first header defines the timestamp that new chunks are stamped with.
82 _start_time = _header.start_time();
// Begin filling a fresh chunk on the engine rather than inside this callback.
83 _engine.post([
this,stopped]() { new_chunk_process(stopped); });
// Continuation callback: do_process() registers this overload with
// _client.next_sequence() after a partial read.
//   in            - stream positioned at the start of the next sequence
//   current_chunk - the chunk that was being filled when the sequence ended
//   it            - write position inside current_chunk
//   eptr          - exception reported by the client, if any
// The new header is compared with the running _start_time to decide whether to
// close the current chunk, reject the stream, or keep filling the same chunk.
// NOTE(review): the header read and the try block that the handlers below
// belong to are on lines not present in this excerpt.
87 void SigProcDadaStream::handle_new_sequence(std::istream& in, std::shared_ptr<ChunkType> current_chunk, ChunkType::Iterator it, std::exception_ptr eptr)
91 PANDA_LOG_DEBUG <<
"Reading new header";
93 std::shared_ptr<bool> stopped = _client.stopped();
// Header starts later than expected, or the channel count changed: the data
// cannot continue in the same chunk.
94 if( _header.start_time() > _start_time || static_cast<unsigned>(_header.number_of_channels()) != current_chunk->number_of_channels() )
96 _start_time = _header.start_time();
97 auto start = current_chunk->begin();
// Shrink the chunk to the number of whole spectra that lie before `it`.
98 std::size_t new_size = std::distance(start,it)/current_chunk->number_of_channels();
99 current_chunk->resize(data::DimensionSize<data::Time>(new_size));
100 _engine.post([
this, stopped](){ new_chunk_process(stopped); });
// Header starts earlier than expected by at least one sample interval.
102 else if( _header.start_time() < _start_time && _start_time - _header.start_time() >= _header.sample_interval())
104 throw panda::Error(
"Incorrect timestamp. Data stream broken!!\n");
109 PANDA_LOG_DEBUG <<
"Time consistent!";
// Timestamps agree: resume filling the same chunk from `it`. The task runs
// later on the engine, so everything it needs is captured by value and only
// `this` is captured for the member call.
110 _engine.post([this,stopped,current_chunk,it](){do_process(stopped,current_chunk,it);});
// Caught by const reference: the handler only reads e.what().
113 catch (std::exception const& e)
115 eptr = std::current_exception();
116 PANDA_LOG_WARN <<
"Exception caught: " << e.what();
121 PANDA_LOG_WARN <<
"Unknown exception caught";
// Copy the layout described by _header into `chunk`: its dimensions, sample
// interval and channel frequencies.
//   chunk - the TimeFrequency block to configure (modified in place)
// Throws panda::Error with "unable to determine number of sigproc samples
// required" (presumably when the configured sample count is not positive; the
// branch structure is on lines not present in this excerpt).
126 void SigProcDadaStream::transfer_header_info(ChunkType& chunk)
128 if (_config.number_of_samples() > 0)
// Time extent comes from the configuration, channel count from the header.
130 chunk.resize( data::DimensionSize<data::Time>(_config.number_of_samples()),data::DimensionSize<data::Frequency>(_header.number_of_channels()));
134 throw panda::Error(
"unable to determine number of sigproc samples required");
137 chunk.sample_interval(_header.sample_interval());
// NOTE(review): this tests the size of the chunk's frequency list, while the
// values copied below come from the header's list - confirm this is intended.
138 if(chunk.channel_frequencies().size() > 1)
140 chunk.set_channel_frequencies( _header.frequency_channels().begin()
141 , _header.frequency_channels().end());
// Otherwise build the channel list from the header's fch1 and foff values.
// NOTE(review): fch1() and foff() are dereferenced with no emptiness check
// visible in this excerpt - confirm they are always set on this path.
145 chunk.set_channel_frequencies_const_width(
static_cast<typename data::TimeFrequency<Cpu, uint8_t>::FrequencyType
>(*_header.fch1())
146 ,
static_cast<typename data::TimeFrequency<Cpu, uint8_t>::FrequencyType
>(*_header.foff()));
// process(): returns _error straight away when it is set, then tests whether
// the configured number of threads is zero. What happens in that case is on
// lines not present in this excerpt.
152 bool SigProcDadaStream::process()
154 if(_error)
return _error;
156 if(_config.number_of_threads() == 0)
// Obtain a new chunk, configure it from the current header, stamp it with the
// running start time and begin filling it from its first element.
//   stopped - stop flag obtained from the client, forwarded to do_process()
// NOTE(review): the try block that the handlers below belong to is on lines
// not present in this excerpt.
165 void SigProcDadaStream::new_chunk_process(std::shared_ptr<bool> stopped)
// Requested with zero frequency channels; transfer_header_info() resizes the
// chunk to the header's channel count.
169 auto current_chunk = get_chunk<ChunkType>(data::DimensionSize<data::Time>(_config.number_of_samples()), data::DimensionSize<data::Frequency>(0));
170 assert(current_chunk);
171 transfer_header_info(*current_chunk);
172 current_chunk->start_time(_start_time);
173 auto it = current_chunk->begin();
174 do_process(stopped, current_chunk, it);
// Caught by const reference: the handler only reads e.what().
176 catch (std::exception const& e)
178 PANDA_LOG_ERROR <<
"DadaStream: exception caught." << e.what();
183 PANDA_LOG_ERROR <<
"exception caught!";
// Read from the DADA client into `current_chunk`, starting at `it`.
//   stopped       - stop flag obtained from the client, forwarded to follow-up tasks
//   current_chunk - the chunk being filled
//   it            - position in current_chunk at which reading starts
// A complete read advances _start_time and posts the next new_chunk_process().
// A partial read advances _start_time by the spectra read so far and asks the
// client for the next sequence, continuing in handle_new_sequence().
// NOTE(review): `begin`, the branch structure, the try/catch scaffolding and
// the return statements are on lines not present in this excerpt.
188 bool SigProcDadaStream::do_process( std::shared_ptr<bool> stopped
189 , std::shared_ptr<ChunkType> current_chunk
190 , ChunkType::Iterator it)
// Width in bits of one stored sample; the header must declare the same width.
192 static constexpr
unsigned number_of_bits =
sizeof(ChunkType::value_type) * 8;
198 if(_header.number_of_bits() != number_of_bits)
200 panda::Error e(
"Expecting ");
201 e << number_of_bits <<
" bits in header. Got " << _header.number_of_bits();
// The read stopped before the end of the chunk.
206 if(_client.read(it,current_chunk->end()) != current_chunk->end())
208 PANDA_LOG_DEBUG <<
"Partial read of stream, resizing and invoking next_sequence";
209 std::size_t elements_read = std::distance(begin,it);
// Split the element count into whole spectra (quot) and leftover channels (rem).
210 std::lldiv_t dv{0,0};
211 dv = std::div(elements_read,static_cast<long long>(_header.number_of_channels()));
214 int const new_size = dv.quot;
215 PANDA_LOG_ERROR <<
"Partial read not a multiple of TF chunk. Incomplete spectra read";
216 throw panda::Error(
"Incorrect Data. Data have incomplete spectrum");
// NOTE(review): if this resize shares a scope with the throw above it can
// never execute - confirm against the full source.
217 current_chunk->resize(data::DimensionSize<data::Time>(new_size - 1),data::DimensionSize<data::Frequency>(_header.number_of_channels()));
// Advance the running timestamp by the number of whole spectra read.
220 _start_time += std::chrono::duration<double>(static_cast<int>(elements_read/_header.number_of_channels()) * _header.sample_interval().value());
// The callback is invoked later by the client, so the chunk and iterator are
// captured by value and only `this` is captured for the member call.
221 _client.next_sequence([this,current_chunk, it](std::istream& in, std::exception_ptr eptr) { handle_new_sequence(in, current_chunk, it, eptr); } );
// Complete read: the whole chunk was filled.
226 std::size_t elements_read = std::distance(begin,current_chunk->end());
227 _start_time += std::chrono::duration<double>(static_cast<int>(elements_read/_header.number_of_channels()) * _header.sample_interval().value());
228 PANDA_LOG_DEBUG <<
"Successful read of the buffer ";
230 _engine.post([
this,stopped]() { new_chunk_process(stopped); } );
235 PANDA_LOG_DEBUG <<
"Exception caught!";
// stop(): only the declarator is present in this excerpt; the body
// is not shown here.
242 void SigProcDadaStream::stop()
// Fallback definitions compiled when ENABLE_PSRDADA is not defined.
// Constructor taking the same Config argument; its body is not shown in this
// excerpt.
247 #else // ENABLE_PSRDADA 249 SigProcDadaStream::SigProcDadaStream(Config
const& config)
// Destructor for the build without ENABLE_PSRDADA. The name must match the
// class exactly (capital P in "Proc"), otherwise this is not a destructor of
// SigProcDadaStream and the translation unit fails to compile.
254 SigProcDadaStream::~SigProcDadaStream()
258 #endif //ENABLE_PSRDADA
Some limits and constants for FLDO.
uint8_t DataType
the underlying data storage type for the amplitude of the signal