25 #include "cheetah/psrdada/DadaReadClient.h" 26 #include "panda/Error.h" 27 #include "panda/Log.h" 35 #define IPCBUF_VIEWER 1 36 #define IPCBUF_READER 5 37 #define IPCBUF_READING 6 38 #define IPCBUF_VIEWING 8 39 #define IPCBUF_VSTOP 9 45 char* ipcbuf_get_next_read_work(ipcbuf_t*
id, uint64_t* bytes,
int flag);
54 , _next_sequence_callback(next_sequence_callback)
55 , _current_block(nullptr)
56 , _current_reader(nullptr)
59 , _destructor_flag(
std::make_shared<bool>(false))
66 void DadaReadClient::start()
68 _engine.post(std::bind(&DadaReadClient::do_next_sequence<NextSequenceCallback>
69 ,
this, _destructor_flag, _next_sequence_callback));
72 DadaReadClient::~DadaReadClient()
74 PANDA_LOG_DEBUG <<
"Destructing the read client";
77 while(_destructor_flag.use_count() != 1 )
84 void DadaReadClient::stop()
86 if (!*_destructor_flag)
88 *_destructor_flag =
true;
89 PANDA_LOG_DEBUG <<
"Stop!";
101 std::unique_ptr<detail::RawBytesReader>& DadaReadClient::acquire_data_block()
103 release_data_block();
104 PANDA_LOG_DEBUG <<
id() <<
"Acquiring next data block";
105 uint64_t block_idx, nbytes = 0;
108 tmp = open_block_read(_hdu->data_block, &nbytes, &block_idx);
113 _current_block.reset(
nullptr);
114 PANDA_LOG <<
id() <<
"Reached EOD in data ring";
115 _current_reader.reset(
nullptr);
116 return _current_reader;
118 if(*_destructor_flag)
120 release_data_block();
121 _current_reader.reset(
nullptr);
122 return _current_reader;
127 if(*_destructor_flag)
129 release_data_block();
133 std::lock_guard<std::mutex> lock(_block_mutex);
134 PANDA_LOG_DEBUG <<
id() <<
"Acquired block " << block_idx;
136 PANDA_LOG_DEBUG <<
id() <<
"Data block used/total bytes = " 137 << _current_block->used_bytes() <<
"/"<<_current_block->total_bytes();
140 return _current_reader;
144 void DadaReadClient::release_data_block()
147 PANDA_LOG_DEBUG <<
id() <<
"Releasing data block";
148 std::lock_guard<std::mutex> lock(_block_mutex);
153 ipcio_close_block_read (_hdu->data_block, 0);
156 else if (ipcio_close_block_read (_hdu->data_block, _current_block->used_bytes()) < 0)
158 _log.
write(LOG_ERR,
"close_buffer: ipcio_close_block_read failed\n");
159 throw panda::Error(
"Could not close ipcio data block");
161 _current_reader.reset(
nullptr);
162 _current_block.reset(
nullptr);
163 PANDA_LOG_DEBUG <<
"Release done";
167 void DadaReadClient::flush()
169 PANDA_LOG_DEBUG <<
id() <<
"Reader flush called";
170 while (!eod() && !*_destructor_flag)
172 if(!acquire_data_block())
176 PANDA_LOG_DEBUG <<
"Return from acquire";
178 release_data_block();
183 void DadaReadClient::lock()
187 throw panda::Error(
"Lock requested on unconnected HDU");
189 if (dada_hdu_lock_read (_hdu) < 0)
191 _log.
write(LOG_ERR,
"open_hdu: could not lock read\n");
192 throw panda::Error(
"Error locking HDU");
197 void DadaReadClient::unlock()
201 throw panda::Error(
"Release requested on unlocked HDU");
203 if (dada_hdu_unlock_read (_hdu) < 0)
205 _log.
write(LOG_ERR,
"open_hdu: could not release read\n");
206 throw panda::Error(
"Error releasing HDU");
211 void DadaReadClient::reset()
213 if(!*_destructor_flag)
220 bool DadaReadClient::eod()
const 222 return (
bool) (ipcbuf_eod((ipcbuf_t *)(_hdu->data_block)));
228 char* DadaReadClient::open_block_read (ipcio_t* ipc, std::uint64_t *curbufsz, std::uint64_t *block_id)
232 PANDA_LOG_ERROR <<
"ipcio_open_block_read: ipc->bytes != 0";
238 PANDA_LOG_ERROR <<
"ipcio_open_block_read: ipc->curbuf != 0";
242 if (ipc -> rdwrt !=
'r' && ipc -> rdwrt !=
'R')
244 PANDA_LOG_ERROR <<
"ipcio_open_block_read: ipc -> rdwrt != [rR]";
249 if (ipcbuf_eod((ipcbuf_t*)ipc))
251 PANDA_LOG_ERROR <<
"ipcio_open_block_read: ipcbuf_eod true, returning null ptr";
254 ipc->curbuf = ipcbuf_get_next_read ((ipcbuf_t*)ipc, &(ipc->curbufsz));
262 *block_id = ipcbuf_get_read_index ((ipcbuf_t*)ipc);
263 *curbufsz = ipc->curbufsz;
266 if(!*_destructor_flag)
277 char* psrdada_ipcbuf_get_next_read_work (ipcbuf_t*
id, uint64_t* bytes,
int flag)
281 uint64_t start_byte = 0;
289 if (ipcbuf_is_reader (
id))
294 if (ipc_semop (id->semid_data[iread], IPCBUF_FULL, -1, flag) < 0)
299 if (id->state == IPCBUF_READER)
301 id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS;
303 id->state = IPCBUF_READING;
304 id->sync->r_states[iread] = IPCBUF_READING;
306 sync->r_bufs[iread] = sync->s_buf[
id->xfer];
307 start_byte = sync->s_byte[
id->xfer];
310 if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, 1, flag) < 0)
312 PANDA_LOG_ERROR <<
"ipcbuf_get_next_read: error increment SODACK\n";
318 bufnum = sync->r_bufs[iread];
326 if (id->state == IPCBUF_VIEWER)
329 id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS;
330 id->state = IPCBUF_VIEWING;
332 id->viewbuf = sync->s_buf[
id->xfer];
333 start_byte = sync->s_byte[
id->xfer];
335 if (sync->w_buf_curr > id->viewbuf + 1)
337 id->viewbuf = sync->w_buf_curr - 1;
343 while (sync->w_buf_curr <= id->viewbuf)
347 if (sync->eod[id->xfer] && sync->r_bufs[iread] && sync->r_bufs[iread] == sync->e_buf[id->xfer])
349 id->state = IPCBUF_VSTOP;
356 if (id->viewbuf + sync->nbufs < sync->w_buf_curr)
357 id->viewbuf = sync->w_buf_curr - sync->nbufs + 1;
359 bufnum =
id->viewbuf;
363 bufnum %= sync->nbufs;
367 if (sync->eod[id->xfer] && sync->r_bufs[iread] == sync->e_buf[id->xfer])
369 *bytes = sync->e_byte[
id->xfer] - start_byte;
372 *bytes = sync->bufsz - start_byte;
375 return id->buffer[bufnum] + start_byte;
379 char* DadaReadClient::ipcbuf_get_next_read (ipcbuf_t*
id, uint64_t* bytes)
381 return psrdada_ipcbuf_get_next_read_work (
id, bytes, IPC_NOWAIT);
std::size_t data_buffer_size() const
Get the size of each data block in the ring buffer.
DadaReadClient(key_t key, panda::Engine &engine, NextSequenceCallback const &next_sequence_callback)
Instantiate a new DadaReadClient.
Class for wrapping a raw pointer to a buffer of shared memory.
A general base class for DADA readers and writers.
Some limits and constants for FLDO.
void next_sequence()
Move to the next sequence in the ring buffer.
std::string const & id() const
Return a string identifier based on the buffer key and log name.
void write(int priority, const char *format, Args &&... args)
Write to the log.