25 #include "cheetah/psrdada/DadaWriteClient.h" 26 #include "panda/Log.h" 27 #include "panda/Error.h" 36 , _new_sequence_callback(new_sequence_callback)
37 , _current_block(nullptr)
38 , _current_writer(nullptr)
45 DadaWriteClient::~DadaWriteClient()
57 void DadaWriteClient::new_sequence_impl()
59 PANDA_LOG_DEBUG <<
id() <<
"Acquiring next header block";
60 char* tmp = ipcbuf_get_next_write(_hdu->header_block);
63 _log.
write(LOG_ERR,
"Could not acquire next header block\n");
64 throw panda::Error(
"Could not acquire next header block");
67 auto deleter = [&](std::ostream* i)
70 std::size_t used_bytes = std::distance(tmp, sbuf.writer_position());
71 PANDA_LOG_DEBUG <<
id() <<
"Releasing current header block";
72 PANDA_LOG_DEBUG <<
id() <<
"Header bytes used " << used_bytes;
73 if (ipcbuf_mark_filled(_hdu->header_block, used_bytes) < 0)
75 _log.
write(LOG_ERR,
"Could not mark filled header block\n");
76 throw panda::Error(
"Could not mark filled header block");
79 std::unique_ptr<std::ostream, decltype(deleter)> header(
new std::ostream(&sbuf), deleter);
80 _new_sequence_callback(*header);
83 throw panda::Error(
"Failbit set on header stream after writing");
87 std::unique_ptr<detail::RawBytesWriter>& DadaWriteClient::acquire_data_block()
90 PANDA_LOG_DEBUG <<
id() <<
"Acquiring next data block";
92 char* tmp = ipcio_open_block_write(_hdu->data_block, &block_idx);
95 _log.
write(LOG_ERR,
"Could not get data block\n");
96 throw panda::Error(
"Could not open block to write");
99 PANDA_LOG_DEBUG <<
id() <<
"Acquired data block " << block_idx;
101 return _current_writer;
104 void DadaWriteClient::release_data_block()
106 if (!_current_block)
return;
107 PANDA_LOG_DEBUG <<
id() <<
"Releasing data block";
108 PANDA_LOG_DEBUG <<
id() << _current_block->used_bytes() <<
" bytes written";
109 if (ipcio_close_block_write (_hdu->data_block, _current_block->used_bytes()) < 0)
111 _log.
write(LOG_ERR,
"close_buffer: ipcio_close_block_write failed\n");
112 throw panda::Error(
"Could not close ipcio data block");
114 _current_writer.reset(
nullptr);
115 _current_block.reset(
nullptr);
118 void DadaWriteClient::lock()
122 throw panda::Error(
"Lock requested on unconnected HDU");
124 PANDA_LOG_DEBUG << this->
id() <<
"Acquiring writing lock on dada buffer";
125 if (dada_hdu_lock_write (_hdu) < 0)
127 _log.
write(LOG_ERR,
"open_hdu: could not lock write\n");
128 throw panda::Error(
"Error locking HDU");
133 void DadaWriteClient::unlock()
137 throw panda::Error(
"Release requested on unlocked HDU\n");
139 PANDA_LOG_DEBUG << this->
id() <<
"Releasing writing lock on dada buffer";
140 if (dada_hdu_unlock_write (_hdu) < 0)
142 _log.
write(LOG_ERR,
"open_hdu: could not release write\n");
143 throw panda::Error(
"Error releasing HDU");
148 void DadaWriteClient::reset()
150 release_data_block();
157 release_data_block();
std::size_t data_buffer_size() const
Get the sizes of each data block in the ring buffer.
Class for wrapping a raw pointer to a buffer of shared memory.
void new_sequence()
Start a new writing sequence in the buffer.
A general base class for DADA readers and writers.
Some limits and constants for FLDO.
DadaWriteClient(key_t key, NewSequenceCallback const &new_sequence_callback)
Instatiate new DadaWriteClient.
void write_eod()
: Write an EOD marker without writing a next header
std::size_t header_buffer_size() const
Get the sizes of each header block in the ring buffer.
std::string const & id() const
Return a string identifier based on the buffer key and log name.
void write(int priority, const char *format, Args &&... args)
Write to the log.