Cheetah - SKA - PSS - Prototype Time Domain Search Pipeline
DadaWriteClient.cpp
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2016 The SKA organisation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  */
24 
25 #include "cheetah/psrdada/DadaWriteClient.h"
26 #include "panda/Log.h"
27 #include "panda/Error.h"
28 #include <cstddef>
29 
30 namespace ska {
31 namespace cheetah {
32 namespace psrdada {
33 
34 DadaWriteClient::DadaWriteClient(key_t key, NewSequenceCallback const& new_sequence_callback)
35  : DadaClientBase(key, "write_client")
36  , _new_sequence_callback(new_sequence_callback)
37  , _current_block(nullptr)
38  , _current_writer(nullptr)
39  , _locked(false)
40 {
41  lock();
42  new_sequence_impl();
43 }
44 
45 DadaWriteClient::~DadaWriteClient()
46 {
47  release_data_block();
48  unlock();
49 }
50 
52 {
53  reset();
54  new_sequence_impl();
55 }
56 
57 void DadaWriteClient::new_sequence_impl()
58 {
59  PANDA_LOG_DEBUG << id() << "Acquiring next header block";
60  char* tmp = ipcbuf_get_next_write(_hdu->header_block);
61  if (!tmp)
62  {
63  _log.write(LOG_ERR, "Could not acquire next header block\n");
64  throw panda::Error("Could not acquire next header block");
65  }
66  detail::membuf sbuf(tmp, tmp + header_buffer_size());
67  auto deleter = [&](std::ostream* i)
68  {
69  delete i;
70  std::size_t used_bytes = std::distance(tmp, sbuf.writer_position());
71  PANDA_LOG_DEBUG << id() << "Releasing current header block";
72  PANDA_LOG_DEBUG << id() << "Header bytes used " << used_bytes;
73  if (ipcbuf_mark_filled(_hdu->header_block, used_bytes) < 0)
74  {
75  _log.write(LOG_ERR, "Could not mark filled header block\n");
76  throw panda::Error("Could not mark filled header block");
77  }
78  };
79  std::unique_ptr<std::ostream, decltype(deleter)> header(new std::ostream(&sbuf), deleter);
80  _new_sequence_callback(*header);
81  if (header->fail())
82  {
83  throw panda::Error("Failbit set on header stream after writing");
84  }
85 }
86 
87 std::unique_ptr<detail::RawBytesWriter>& DadaWriteClient::acquire_data_block()
88 {
89  release_data_block();
90  PANDA_LOG_DEBUG << id() << "Acquiring next data block";
91  uint64_t block_idx;
92  char* tmp = ipcio_open_block_write(_hdu->data_block, &block_idx);
93  if (!tmp)
94  {
95  _log.write(LOG_ERR, "Could not get data block\n");
96  throw panda::Error("Could not open block to write");
97  }
98  _current_block.reset(new detail::RawBytes(tmp, data_buffer_size()));
99  PANDA_LOG_DEBUG << id() << "Acquired data block " << block_idx;
100  _current_writer.reset(new detail::RawBytesWriter(*_current_block));
101  return _current_writer;
102 }
103 
104 void DadaWriteClient::release_data_block()
105 {
106  if (!_current_block) return;
107  PANDA_LOG_DEBUG << id() << "Releasing data block";
108  PANDA_LOG_DEBUG << id() << _current_block->used_bytes() << " bytes written";
109  if (ipcio_close_block_write (_hdu->data_block, _current_block->used_bytes()) < 0)
110  {
111  _log.write(LOG_ERR, "close_buffer: ipcio_close_block_write failed\n");
112  throw panda::Error("Could not close ipcio data block");
113  }
114  _current_writer.reset(nullptr);
115  _current_block.reset(nullptr);
116 }
117 
118 void DadaWriteClient::lock()
119 {
120  if (!_connected)
121  {
122  throw panda::Error("Lock requested on unconnected HDU");
123  }
124  PANDA_LOG_DEBUG << this->id() << "Acquiring writing lock on dada buffer";
125  if (dada_hdu_lock_write (_hdu) < 0)
126  {
127  _log.write(LOG_ERR, "open_hdu: could not lock write\n");
128  throw panda::Error("Error locking HDU");
129  }
130  _locked = true;
131 }
132 
133 void DadaWriteClient::unlock()
134 {
135  if (!_locked)
136  {
137  throw panda::Error("Release requested on unlocked HDU\n");
138  }
139  PANDA_LOG_DEBUG << this->id() << "Releasing writing lock on dada buffer";
140  if (dada_hdu_unlock_write (_hdu) < 0)
141  {
142  _log.write(LOG_ERR, "open_hdu: could not release write\n");
143  throw panda::Error("Error releasing HDU");
144  }
145  _locked = false;
146 }
147 
148 void DadaWriteClient::reset()
149 {
150  release_data_block();
151  unlock();
152  lock();
153 }
154 
156 {
157  release_data_block();
158 }
159 
160 } // namespace psrdada
161 } // namespace cheetah
162 } // namespace ska
std::size_t data_buffer_size() const
Get the sizes of each data block in the ring buffer.
Class for wrapping a raw pointer to a buffer of shared memory.
Definition: RawBytes.h:45
void new_sequence()
Start a new writing sequence in the buffer.
A general base class for DADA readers and writers.
Some limits and constants for FLDO.
Definition: Brdz.h:35
DadaWriteClient(key_t key, NewSequenceCallback const &new_sequence_callback)
Instantiate new DadaWriteClient.
void write_eod()
Write an EOD marker without writing a next header.
std::size_t header_buffer_size() const
Get the sizes of each header block in the ring buffer.
std::string const & id() const
Return a string identifier based on the buffer key and log name.
void write(int priority, const char *format, Args &&... args)
Write to the log.
Definition: MultiLog.cpp:34