7 VariableBuffer::BufferBlock::BufferBlock():
14 VariableBuffer::BufferBlock::BufferBlock(
int dim,
int block_size):
21 int VariableBuffer::BufferBlock::get_size()
const 26 const Eigen::MatrixXd& VariableBuffer::BufferBlock::get_data()
const 31 int VariableBuffer::BufferBlock::get_valid_elements()
const 36 void VariableBuffer::BufferBlock::reset()
57 static const int NUM_BLOCKS = 20;
65 for(
int i = 0; i < NUM_BLOCKS; i++)
67 _block_pool.push_back(std::make_shared<BufferBlock>(elem_size, buffer_size));
71 _read_queue.reset(BufferBlock::Ptr());
72 _write_queue.reset(BufferBlock::Ptr());
84 _write_queue.consume_all(
85 [
this](BufferBlock::Ptr block)
87 _block_pool.push_back(block);
91 if(_block_pool.empty())
96 auto ret = _block_pool.back();
98 _block_pool.pop_back();
127 std::vector<BufferBlock::Ptr> _block_pool;
143 _queue(new
QueueImpl(dim_rows*dim_cols, block_size)),
144 _buffer_mode(
Mode::producer_consumer)
147 _current_block = _queue->get_new_block();
152 return std::make_pair(_rows, _cols);
157 _on_block_available = callback;
160 int VariableBuffer::BufferBlock::get_size_bytes()
const 162 return _buf.size() *
sizeof(_buf[0]);
169 throw std::logic_error(
"Cannot call read_block() when in circular_buffer mode!");
178 BufferBlock::Ptr block;
179 if(_queue->get_read_queue().pop(block))
182 data = block->get_data();
184 ret = block->get_valid_elements();
188 _queue->get_write_queue().push(block);
191 valid_elements = ret;
199 if(_current_block->get_valid_elements() == 0)
211 BufferBlock::Ptr new_block = _queue->get_new_block();
223 if(!_queue->get_read_queue().pop(new_block))
226 throw std::logic_error(
"Failed to pop a new block for variable '" + _name +
"'");
233 fprintf(stderr,
"Failed to get new block for variable '%s'\n", _name.c_str());
240 bool push_to_queue_success = _queue->get_read_queue().push(_current_block);
242 if(!push_to_queue_success)
245 throw std::logic_error(
"Failed to push current block for variable '" + _name +
"'");
249 _current_block = new_block;
252 if(_on_block_available)
254 _on_block_available(buf_info);
std::pair< int, int > get_dimension() const
bool flush_to_queue()
Writes current block to the queue.
void set_buffer_mode(VariableBuffer::Mode mode)
Set whether this buffer should be treated as a (possibly dual-threaded) producer-consumer queue or as a circular buffer.
Mode
Enum for specifying the type of buffer.
BufferBlock::Ptr get_new_block()
Get a new block from the pool, if available.
LockfreeQueue< BufferBlock::Ptr > & get_read_queue()
Handle to the read queue.
The spsc_queue class provides a single-writer/single-reader FIFO queue; pushing and popping are wait-free.
bool read_block(Eigen::MatrixXd &data, int &valid_elements)
Reads a whole block from the queue, if one is available.
VariableBuffer(std::string name, int dim_rows, int dim_cols, int block_size)
Constructor.
LockfreeQueue< BufferBlock::Ptr > & get_write_queue()
Handle to the write queue.
QueueImpl(int elem_size, int buffer_size)
double variable_free_space
const char * variable_name
std::function< void(BufferInfo)> CallbackType
void set_on_block_available(CallbackType callback)
Sets a callback that is used to notify that a new block has been pushed into the queue.
The QueueImpl class implements the buffering strategy for a single logged variable.