Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
DadaReadClient.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 
25 #include "cheetah/psrdada/DadaReadClient.h"
26 #include "panda/Error.h"
27 #include "panda/Log.h"
28 #include "ipcbuf.h"
29 #include "ipcutil.h"
30 #include "tmutil.h"
31 #include <sys/ipc.h>
32 #include <streambuf>
33 #include <mutex>
34 
35 #define IPCBUF_VIEWER 1 /* connected */
36 #define IPCBUF_READER 5 /* one process that reads from the buffer */
37 #define IPCBUF_READING 6 /* start-of-data flag has been raised */
38 #define IPCBUF_VIEWING 8 /* currently viewing */
39 #define IPCBUF_VSTOP 9 /* end-of-data while viewer */
40 
41 
42 
43 extern "C"
44 {
45  char* ipcbuf_get_next_read_work(ipcbuf_t* id, uint64_t* bytes, int flag);
46 }
47 
48 namespace ska {
49 namespace cheetah {
50 namespace psrdada {
51 
52 DadaReadClient::DadaReadClient(key_t key,panda::Engine& engine, NextSequenceCallback const& next_sequence_callback)
53  : DadaClientBase(key, "read_client")
54  , _next_sequence_callback(next_sequence_callback)
55  , _current_block(nullptr)
56  , _current_reader(nullptr)
57  , _locked(false)
58  , _engine(engine)
59  , _destructor_flag(std::make_shared<bool>(false))
60 {
61  lock();
62  //_engine.post(std::bind(&DadaReadClient::do_next_sequence<NextSequenceCallback>
63  //, this, _destructor_flag, _next_sequence_callback));
64 }
65 
66 void DadaReadClient::start()
67 {
68  _engine.post(std::bind(&DadaReadClient::do_next_sequence<NextSequenceCallback>
69  , this, _destructor_flag, _next_sequence_callback));
70 }
71 
72 DadaReadClient::~DadaReadClient()
73 {
74  PANDA_LOG_DEBUG << "Destructing the read client";
75  stop();
76  unlock();
77  while(_destructor_flag.use_count() != 1 )
78  {
79  _engine.poll_one();
80  }
81 }
82 
83 
84 void DadaReadClient::stop()
85 {
86  if (!*_destructor_flag)
87  {
88  *_destructor_flag = true;
89  PANDA_LOG_DEBUG << "Stop!";
90  release_data_block();
91  }
92 }
93 
94 
96 {
97  next_sequence(_next_sequence_callback);
98 }
99 
100 
101 std::unique_ptr<detail::RawBytesReader>& DadaReadClient::acquire_data_block()
102 {
103  release_data_block();
104  PANDA_LOG_DEBUG << id() << "Acquiring next data block";
105  uint64_t block_idx, nbytes = 0;
106  char* tmp;
107  do {
108  tmp = open_block_read(_hdu->data_block, &nbytes, &block_idx);
109  if (!tmp)
110  {
111  if (eod())
112  {
113  _current_block.reset(nullptr);
114  PANDA_LOG << id() << "Reached EOD in data ring";
115  _current_reader.reset(nullptr);
116  return _current_reader;
117  }
118  if(*_destructor_flag)
119  {
120  release_data_block();
121  _current_reader.reset(nullptr);
122  return _current_reader;
123  }
124  }
125  } while(!tmp);
126 
127  if(*_destructor_flag)
128  {
129  release_data_block();
130  }
131  else
132  {
133  std::lock_guard<std::mutex> lock(_block_mutex);
134  PANDA_LOG_DEBUG << id() << "Acquired block " << block_idx;
135  _current_block.reset(new detail::RawBytes(tmp, data_buffer_size(), nbytes));
136  PANDA_LOG_DEBUG << id() << "Data block used/total bytes = "
137  << _current_block->used_bytes() <<"/"<<_current_block->total_bytes();
138  _current_reader.reset(new detail::RawBytesReader(*_current_block));
139  }
140  return _current_reader;
141 }
142 
143 
144 void DadaReadClient::release_data_block()
145 {
146 
147  PANDA_LOG_DEBUG << id() << "Releasing data block";
148  std::lock_guard<std::mutex> lock(_block_mutex);
149  if (!_current_block)
150  {
151  if(_hdu->data_block)
152  {
153  ipcio_close_block_read (_hdu->data_block, 0);
154  }
155  }
156  else if (ipcio_close_block_read (_hdu->data_block, _current_block->used_bytes()) < 0)
157  {
158  _log.write(LOG_ERR, "close_buffer: ipcio_close_block_read failed\n");
159  throw panda::Error("Could not close ipcio data block");
160  }
161  _current_reader.reset(nullptr);
162  _current_block.reset(nullptr);
163  PANDA_LOG_DEBUG << "Release done";
164 }
165 
166 
167 void DadaReadClient::flush()
168 {
169  PANDA_LOG_DEBUG << id() << "Reader flush called";
170  while (!eod() && !*_destructor_flag)
171  {
172  if(!acquire_data_block())
173  {
174  break;
175  }
176  PANDA_LOG_DEBUG << "Return from acquire";
177  }
178  release_data_block();
179  reset();
180 }
181 
182 
183 void DadaReadClient::lock()
184 {
185  if (!_connected)
186  {
187  throw panda::Error("Lock requested on unconnected HDU");
188  }
189  if (dada_hdu_lock_read (_hdu) < 0)
190  {
191  _log.write(LOG_ERR, "open_hdu: could not lock read\n");
192  throw panda::Error("Error locking HDU");
193  }
194  _locked = true;
195 }
196 
197 void DadaReadClient::unlock()
198 {
199  if (!_locked)
200  {
201  throw panda::Error("Release requested on unlocked HDU");
202  }
203  if (dada_hdu_unlock_read (_hdu) < 0)
204  {
205  _log.write(LOG_ERR, "open_hdu: could not release read\n");
206  throw panda::Error("Error releasing HDU");
207  }
208  _locked = false;
209 }
210 
211 void DadaReadClient::reset()
212 {
213  if(!*_destructor_flag)
214  {
215  unlock();
216  lock();
217  }
218 }
219 
220 bool DadaReadClient::eod() const
221 {
222  return (bool) (ipcbuf_eod((ipcbuf_t *)(_hdu->data_block)));
223 }
224 
225 
226 // Overloading the open_block method from psrdada for multi-threading purposes
227 
228 char* DadaReadClient::open_block_read (ipcio_t* ipc, std::uint64_t *curbufsz, std::uint64_t *block_id)
229 {
230  if (ipc->bytes != 0)
231  {
232  PANDA_LOG_ERROR << "ipcio_open_block_read: ipc->bytes != 0";
233  return 0;
234  }
235 
236  if (ipc->curbuf)
237  {
238  PANDA_LOG_ERROR << "ipcio_open_block_read: ipc->curbuf != 0";
239  return 0;
240  }
241 
242  if (ipc -> rdwrt != 'r' && ipc -> rdwrt != 'R')
243  {
244  PANDA_LOG_ERROR << "ipcio_open_block_read: ipc -> rdwrt != [rR]";
245  return 0;
246  }
247  // Test for eod
248 
249  if (ipcbuf_eod((ipcbuf_t*)ipc))
250  {
251  PANDA_LOG_ERROR << "ipcio_open_block_read: ipcbuf_eod true, returning null ptr";
252  return 0;
253  }
254  ipc->curbuf = ipcbuf_get_next_read ((ipcbuf_t*)ipc, &(ipc->curbufsz));
255 
256  if (!ipc->curbuf)
257  {
258  //PANDA_LOG_ERROR << "ipcio_open_block_read: could not get next block rdwrt=%c" << ipc -> rdwrt;
259  return 0;
260  }
261 
262  *block_id = ipcbuf_get_read_index ((ipcbuf_t*)ipc);
263  *curbufsz = ipc->curbufsz;
264  ipc->bytes = 0;
265 
266  if(!*_destructor_flag)
267  {
268  return ipc->curbuf;
269  }
270  else
271  {
272  return nullptr;
273  }
274 }
275 
276 static
277 char* psrdada_ipcbuf_get_next_read_work (ipcbuf_t* id, uint64_t* bytes, int flag)
278 {
279  int iread = -1;
280  uint64_t bufnum;
281  uint64_t start_byte = 0;
282  ipcsync_t* sync = 0;
283 
284  if (ipcbuf_eod (id))
285  return NULL;
286 
287  sync = id->sync;
288 
289  if (ipcbuf_is_reader (id))
290  {
291  iread = id->iread;
292 
293  /* decrement the buffers written semaphore */
294  if (ipc_semop (id->semid_data[iread], IPCBUF_FULL, -1, flag) < 0)
295  {
296  return NULL;
297  }
298 
299  if (id->state == IPCBUF_READER)
300  {
301  id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS;
302 
303  id->state = IPCBUF_READING;
304  id->sync->r_states[iread] = IPCBUF_READING;
305 
306  sync->r_bufs[iread] = sync->s_buf[id->xfer];
307  start_byte = sync->s_byte[id->xfer];
308 
309  /* increment the start-of-data acknowlegement semaphore */
310  if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, 1, flag) < 0)
311  {
312  PANDA_LOG_ERROR << "ipcbuf_get_next_read: error increment SODACK\n";
313  return NULL;
314  }
315 
316  }
317 
318  bufnum = sync->r_bufs[iread];
319 
320  }
321  else
322  {
323  // TODO - check if we should always just use id->iread = 0 for this??
324  iread = 0;
325 
326  if (id->state == IPCBUF_VIEWER)
327  {
328 
329  id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS;
330  id->state = IPCBUF_VIEWING;
331 
332  id->viewbuf = sync->s_buf[id->xfer];
333  start_byte = sync->s_byte[id->xfer];
334 
335  if (sync->w_buf_curr > id->viewbuf + 1)
336  {
337  id->viewbuf = sync->w_buf_curr - 1;
338  start_byte = 0;
339  }
340  }
341 
342  /* Viewers wait until w_buf_curr is incremented without semaphore operations */
343  while (sync->w_buf_curr <= id->viewbuf)
344  {
345 
346  // AJ added: sync->r_bufs[iread] to ensure that a buffer has been read by a reader
347  if (sync->eod[id->xfer] && sync->r_bufs[iread] && sync->r_bufs[iread] == sync->e_buf[id->xfer])
348  {
349  id->state = IPCBUF_VSTOP;
350  break;
351  }
352 
353  float_sleep (0.1);
354  }
355 
356  if (id->viewbuf + sync->nbufs < sync->w_buf_curr)
357  id->viewbuf = sync->w_buf_curr - sync->nbufs + 1;
358 
359  bufnum = id->viewbuf;
360  id->viewbuf ++;
361  }
362 
363  bufnum %= sync->nbufs;
364 
365  if (bytes)
366  {
367  if (sync->eod[id->xfer] && sync->r_bufs[iread] == sync->e_buf[id->xfer])
368  {
369  *bytes = sync->e_byte[id->xfer] - start_byte;
370  }
371  else
372  *bytes = sync->bufsz - start_byte;
373  }
374 
375  return id->buffer[bufnum] + start_byte;
376 }
377 
378 
379 char* DadaReadClient::ipcbuf_get_next_read (ipcbuf_t* id, uint64_t* bytes)
380 {
381  return psrdada_ipcbuf_get_next_read_work (id, bytes, IPC_NOWAIT);
382 }
383 
384 } // namespace psrdada
385 } // namespace cheetah
386 } // namespace ska
std::size_t data_buffer_size() const
Get the sizes of each data block in the ring buffer.
DadaReadClient(key_t key, panda::Engine &engine, NextSequenceCallback const &next_sequence_callback)
Instatiate new DadaReadClient.
Class for wrapping a raw pointer to a buffer of shared memory.
Definition: RawBytes.h:45
A general base class for DADA readers and writers.
Some limits and constants for FLDO.
Definition: Brdz.h:35
void next_sequence()
Move to the next sequence in the ring buffer.
std::string const & id() const
Return a string identifier based on the buffer key and log name.
void write(int priority, const char *format, Args &&... args)
Write to the log.
Definition: MultiLog.cpp:34