MatLogger2  1.0.0
An RT-safe, multithreaded library for logging numeric data to HDF5 MAT-files.
var_buffer.cpp
Go to the documentation of this file.
2 
4 
5 using namespace XBot;
6 
7 VariableBuffer::BufferBlock::BufferBlock():
8  _write_idx(0),
9  _buf(0,0)
10 {
11 
12 }
13 
14 VariableBuffer::BufferBlock::BufferBlock(int dim, int block_size):
15  _write_idx(0),
16  _buf(dim, block_size)
17 {
18 
19 }
20 
21 int VariableBuffer::BufferBlock::get_size() const
22 {
23  return _buf.cols();
24 }
25 
26 const Eigen::MatrixXd& VariableBuffer::BufferBlock::get_data() const
27 {
28  return _buf;
29 }
30 
31 int VariableBuffer::BufferBlock::get_valid_elements() const
32 {
33  return _write_idx;
34 }
35 
36 void VariableBuffer::BufferBlock::reset()
37 {
38  _write_idx = 0;
39 }
40 
41 namespace lf = boost::lockfree;
42 
53 {
54 public:
55 
56  // fixed number of blocks
57  static const int NUM_BLOCKS = 20;
58 
59  template <typename T>
61 
62  QueueImpl(int elem_size, int buffer_size)
63  {
64  // allocate all blocks and push them into the pool
65  for(int i = 0; i < NUM_BLOCKS; i++)
66  {
67  _block_pool.push_back(std::make_shared<BufferBlock>(elem_size, buffer_size));
68  }
69 
70  // pre allocate queues
71  _read_queue.reset(BufferBlock::Ptr());
72  _write_queue.reset(BufferBlock::Ptr());
73  }
74 
75 
81  BufferBlock::Ptr get_new_block()
82  {
83  // update pool with elements from write queue
84  _write_queue.consume_all(
85  [this](BufferBlock::Ptr block)
86  {
87  _block_pool.push_back(block);
88  }
89  );
90 
91  if(_block_pool.empty())
92  {
93  return nullptr;
94  }
95 
96  auto ret = _block_pool.back();
97  ret->reset();
98  _block_pool.pop_back();
99 
100  return ret;
101  }
102 
107  {
108  return _read_queue;
109  }
110 
115  {
116  return _write_queue;
117  }
118 
119  static int Size()
120  {
121  return NUM_BLOCKS;
122  }
123 
124 private:
125 
126  // pool of available blocks
127  std::vector<BufferBlock::Ptr> _block_pool;
128 
129  // queue for blocks that are ready to be flushed by consumer
131 
132  // queue for blocks that are ready to be filled by producer
133  LockfreeQueue<BufferBlock::Ptr> _write_queue;
134 };
135 
137  int dim_rows,
138  int dim_cols,
139  int block_size):
140  _name(name),
141  _rows(dim_rows),
142  _cols(dim_cols),
143  _queue(new QueueImpl(dim_rows*dim_cols, block_size)),
144  _buffer_mode(Mode::producer_consumer)
145 {
146  // intialize current block
147  _current_block = _queue->get_new_block();
148 }
149 
150 std::pair< int, int > VariableBuffer::get_dimension() const
151 {
152  return std::make_pair(_rows, _cols);
153 }
154 
156 {
157  _on_block_available = callback;
158 }
159 
160 int VariableBuffer::BufferBlock::get_size_bytes() const
161 {
162  return _buf.size() * sizeof(_buf[0]);
163 }
164 
165 bool XBot::VariableBuffer::read_block(Eigen::MatrixXd& data, int& valid_elements)
166 {
167  if(_buffer_mode == Mode::circular_buffer)
168  {
169  throw std::logic_error("Cannot call read_block() when in circular_buffer mode!");
170  }
171 
172  // this function is not allowed to use class members,
173  // except consuming elements from read queue
174  // and pushing elements into write queue
175 
176  int ret = 0;
177 
178  BufferBlock::Ptr block;
179  if(_queue->get_read_queue().pop(block))
180  {
181  // copy data from block to output buffer
182  data = block->get_data();
183 
184  ret = block->get_valid_elements();
185 
186  // reset block and send it back to producer thread
187  block->reset();
188  _queue->get_write_queue().push(block);
189  }
190 
191  valid_elements = ret;
192 
193  return ret > 0;
194 }
195 
197 {
198  // no valid elements in the current block, we just return true
199  if(_current_block->get_valid_elements() == 0)
200  {
201  return true;
202  }
203 
204  // fill buffer info struct
205  BufferInfo buf_info;
206  buf_info.new_available_bytes = _current_block->get_size_bytes();
207  buf_info.variable_free_space = _queue->get_read_queue().write_available() / (double)VariableBuffer::NumBlocks();
208  buf_info.variable_name = _name.c_str();
209 
210  // try to push current block into the queue & obtain a new block
211  BufferBlock::Ptr new_block = _queue->get_new_block();
212 
213  // handle failure to obtain new block
214  if(!new_block)
215  {
216  // if we are in circular buffer mode..
217  if(_buffer_mode == Mode::circular_buffer)
218  {
219  // read oldest block from read queue,
220  // and set it as new block
221  // NOTE: pop() is safe because read_block() cannot be called
222  // from circular_buffer mode
223  if(!_queue->get_read_queue().pop(new_block))
224  {
225  // should never be called
226  throw std::logic_error("Failed to pop a new block for variable '" + _name + "'");
227  }
228 
229  }
230  else // producer-consumer mode
231  {
232  // in this case we can't do anything but keep writing on the current block
233  fprintf(stderr, "Failed to get new block for variable '%s'\n", _name.c_str());
234  return false;
235  }
236  }
237 
238  // we managed to get a new block, try to push the current into the queue
239  // this should never fail
240  bool push_to_queue_success = _queue->get_read_queue().push(_current_block);
241 
242  if(!push_to_queue_success)
243  {
244  // should never be called
245  throw std::logic_error("Failed to push current block for variable '" + _name + "'");
246  }
247 
248  // we managed to push a block into the queue
249  _current_block = new_block;
250 
251  // if a callback was registered, we call it
252  if(_on_block_available)
253  {
254  _on_block_available(buf_info);
255  }
256 
257  return true;
258 
259 }
260 
262 {
263  return QueueImpl::Size();
264 }
265 
267 {
268  _buffer_mode = mode;
269 }
270 
271 
273 {
274  // this is needed to avoid the compiler generate a default destructor,
275  // which is incompatible with the forward-declaration of QueueImpl
276 }
277 
278 
std::pair< int, int > get_dimension() const
Definition: var_buffer.cpp:150
bool flush_to_queue()
Writes current block to the queue.
Definition: var_buffer.cpp:196
void set_buffer_mode(VariableBuffer::Mode mode)
Set whether this buffer should be treated as a (possibly dual threaded) producer-consumer queue...
Definition: var_buffer.cpp:266
Mode
Enum for specifying the type of buffer.
Definition: var_buffer.h:40
BufferBlock::Ptr get_new_block()
Get a new block from the pool, if available.
Definition: var_buffer.cpp:81
LockfreeQueue< BufferBlock::Ptr > & get_read_queue()
Handle to the read queue.
Definition: var_buffer.cpp:106
The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
bool read_block(Eigen::MatrixXd &data, int &valid_elements)
Reads a whole block from the queue, if one is available.
Definition: var_buffer.cpp:165
VariableBuffer(std::string name, int dim_rows, int dim_cols, int block_size)
Constructor.
Definition: var_buffer.cpp:136
LockfreeQueue< BufferBlock::Ptr > & get_write_queue()
Handle to the write queue.
Definition: var_buffer.cpp:114
static int NumBlocks()
Definition: var_buffer.cpp:261
QueueImpl(int elem_size, int buffer_size)
Definition: var_buffer.cpp:62
std::function< void(BufferInfo)> CallbackType
Definition: var_buffer.h:63
void set_on_block_available(CallbackType callback)
Sets a callback that is used to notify that a new block has been pushed into the queue.
Definition: var_buffer.cpp:155
The QueueImpl class implements the buffering strategy for a single logged variable.
Definition: var_buffer.cpp:52