MatLogger2  1.0.0
An RT-safe, multithreaded library for logging numeric data to HDF5 MAT-files.
spsc_queue_logger.hpp
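This header is MatLogger2's adaptation of boost::lockfree::spsc_queue (see the [XBOT] notes in the code): elements are default-constructed up front and never destroyed on pop, so a push assigns into pre-built storage instead of allocating. A minimal usage sketch follows; the producer/consumer functions are hypothetical, while the queue type and member names are the API defined below:

    // single producer thread (e.g. a real-time control loop)
    boost::lockfree::spsc_queue<double, boost::lockfree::capacity<1024>> queue;

    void on_sample(double sample)   // hypothetical RT-side callback
    {
        queue.push(sample);         // wait-free; returns false (drops) if full
    }

    // single consumer thread (e.g. the MAT-file writer)
    void drain()                    // hypothetical logger-side function
    {
        queue.consume_all([](double d) {
            /* append d to the MAT-file */
        });
    }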
// lock-free single-producer/single-consumer ringbuffer
// this algorithm is implemented in various projects (linux kernel)
//
// Copyright (C) 2009-2013 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
#define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED

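// NOTE: some upstream headers define kernel-style function-like macros named
// "likely"/"unlikely" (wrappers around __builtin_expect); left defined, they
// would mangle identifiers used in this header, so the names are stashed and
// undefined here, then re-pointed at the end of the file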
#define old_likely likely
#define old_unlikely unlikely
#undef likely // is_likely
#undef unlikely // is_unlikely

#include <algorithm>
#include <memory>

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif

// namespace ci_internal {

namespace boost {
namespace lockfree {
namespace detail {

typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
                              boost::parameter::optional<tag::allocator>
                             > ringbuffer_signature;

template <typename T>
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t size_t;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

    BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
    BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}

    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (ret >= max_size)
            ret -= max_size;
        return ret;
    }
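    // next_index uses a conditional subtraction rather than the more obvious
    // "(arg + 1) % max_size"; for a runtime max_size this avoids an integer
    // division on every push/pop (a single predictable branch is cheaper)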

    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        const size_t ret = write_index + max_size - read_index;
        return ret;
    }

    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }
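    // Worked example: with max_size == 4 and write_index == read_index (empty
    // buffer), write_available returns 3, not 4 -- one slot is always kept
    // free so that "full" (next_index(write) == read) and "empty"
    // (write == read) remain distinguishable states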

    size_t read_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_relaxed);
        const size_t read_index = read_index_.load(memory_order_relaxed);
        return read_available(write_index, read_index, max_size);
    }

    size_t write_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_relaxed);
        const size_t read_index = read_index_.load(memory_order_relaxed);
        return write_available(write_index, read_index, max_size);
    }

    bool push(T const & t, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
        const size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        // NOTE: upstream's "placement new + copy constructor" was replaced by
        // "pre-construction + copy assignment": every slot is constructed once
        // up front, so a push only assigns into an existing element
        *(buffer + write_index) = t;

        write_index_.store(next, memory_order_release);

        return true;
    }

    size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
        const size_t read_index = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = (std::min)(input_count, avail);

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = boost::next(begin, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - write_index;
            const ConstIterator midpoint = boost::next(begin, count0);

            std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
            std::uninitialized_copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return last;
    }
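    // When the free region wraps past the end of the storage, the input is
    // copied in two sections, e.g. with max_size == 8, write_index == 6 and
    // four input items a,b,c,d:
    //
    //   storage: [ c d . . . . a b ]   (a,b fill slots 6,7; c,d wrap to 0,1)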

    template <typename Functor>
    bool consume_one(Functor & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        // object_to_consume.~T(); // NOTE don't destroy

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }
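    // Memory-ordering note (applies to this and the const-Functor overload
    // below): the acquire load of write_index_ pairs with the release store
    // in push(), so the consumer is guaranteed to see the element contents
    // written before write_index_ was advanced, and vice versa for read_index_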

    template <typename Functor>
    bool consume_one(Functor const & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        // object_to_consume.~T(); // NOTE don't destroy

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    template <typename Functor>
    size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    template <typename Functor>
    size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = (std::min)(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    template <typename OutputIterator>
    size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);
        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
            copy_and_delete(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }

    const T& front(const T * internal_buffer) const
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }

    T& front(T * internal_buffer)
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }
#endif

public:
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            write_index_.store(0, memory_order_relaxed);
            read_index_.store(0, memory_order_release);
        }
    }
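    // NOTE: as documented in upstream Boost, reset() is not thread-safe; it
    // should only be called while neither producer nor consumer is active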

    bool empty(void)
    {
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }

    template< class OutputIterator >
    OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
    {
        if (boost::has_trivial_destructor<T>::value) {
            return std::copy(first, last, out); // will use memcpy if possible
        } else {
            for (; first != last; ++first, ++out) {
                *out = *first;
                // first->~T(); // NOTE don't destroy
            }
            return out;
        }
    }

    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            // first->~T(); // NOTE don't destroy
        }
    }

    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor const & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            // first->~T(); // NOTE don't destroy
        }
    }
};

template <typename T, std::size_t MaxSize>
class compile_time_sized_ringbuffer:
    public ringbuffer_base<T>
{
    typedef std::size_t size_type;
    static const std::size_t max_size = MaxSize + 1;

    typedef typename boost::aligned_storage<max_size * sizeof(T),
                                            boost::alignment_of<T>::value
                                           >::type storage_type;

    storage_type storage_;

    T * data()
    {
        return static_cast<T*>(storage_.address());
    }

    const T * data() const
    {
        return static_cast<const T*>(storage_.address());
    }

protected:
    size_type max_number_of_elements() const
    {
        return max_size;
    }

public:

    // NOTE: default constructor to initialize buffer with newly-constructed elements [XBOT]
    compile_time_sized_ringbuffer()
    {
        for(int i = 0; i < max_size; i++)
        {
            new (data() + i) T;
        }
    }
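    // Together with the assignment-based push() and the "don't destroy" pops
    // in ringbuffer_base, this up-front construction means every element is
    // built exactly once and reused for the buffer's lifetime -- presumably
    // the point of the [XBOT] changes: pushes stay allocation-free even when
    // T owns heap memory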

    bool push(T const & t)
    {
        return ringbuffer_base<T>::push(t, data(), max_size);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, data(), max_size);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, data(), max_size);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, data(), max_size);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, data(), max_size);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, data(), max_size);
    }

    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, data(), max_size);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, data(), max_size);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(data());
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(data());
    }

    ~compile_time_sized_ringbuffer()
    {
        for(int i = 0; i < max_size; i++)
        {
            (data() + i)->~T();
        }
    }
};

template <typename T, typename Alloc>
class runtime_sized_ringbuffer:
    public ringbuffer_base<T>,
    private Alloc
{
    typedef std::size_t size_type;
    size_type max_elements_;
    typedef typename Alloc::pointer pointer;
    pointer array_;

protected:
    size_type max_number_of_elements() const
    {
        return max_elements_;
    }

public:
    explicit runtime_sized_ringbuffer(size_type max_elements):
        max_elements_(max_elements + 1)
    {
        array_ = Alloc::allocate(max_elements_);
    }

    template <typename U>
    runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
        array_ = Alloc::allocate(max_elements_);
    }

    runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
        array_ = Alloc::allocate(max_elements_);
    }

    ~runtime_sized_ringbuffer(void)
    {
        // destroy all remaining items
        T out;
        while (pop(&out, 1)) {}

        Alloc::deallocate(array_, max_elements_);
    }

    bool push(T const & t)
    {
        return ringbuffer_base<T>::push(t, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
    }

    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, array_, max_elements_);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, array_, max_elements_);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, array_, max_elements_);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(array_);
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(array_);
    }
};

template <typename T, typename A0, typename A1>
struct make_ringbuffer
{
    typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;

    typedef extract_capacity<bound_args> extract_capacity_t;

    static const bool runtime_sized = !extract_capacity_t::has_capacity;
    static const size_t capacity = extract_capacity_t::capacity;

    typedef extract_allocator<bound_args, T> extract_allocator_t;
    typedef typename extract_allocator_t::type allocator;

    // allocator argument is only sane for run-time sized ringbuffers
    BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
                                  mpl::bool_<!extract_allocator_t::has_allocator>,
                                  mpl::true_
                                 >::type::value));

    typedef typename mpl::if_c<runtime_sized,
                               runtime_sized_ringbuffer<T, allocator>,
                               compile_time_sized_ringbuffer<T, capacity>
                              >::type ringbuffer_type;
};
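
// Illustrative instantiations (template-argument spellings assumed from the
// Boost.Parameter machinery above):
//   make_ringbuffer<int, capacity<1024>, boost::parameter::void_>
//     -> ringbuffer_type is compile_time_sized_ringbuffer<int, 1024>
//   make_ringbuffer<int, boost::parameter::void_, boost::parameter::void_>
//     -> ringbuffer_type is runtime_sized_ringbuffer<int, std::allocator<int>>
//        (element count supplied to the constructor at run time)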

} /* namespace detail */

#ifndef BOOST_DOXYGEN_INVOKED
template <typename T,
          class A0 = boost::parameter::void_,
          class A1 = boost::parameter::void_>
#else
template <typename T, ...Options>
#endif
class spsc_queue:
    public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
{
private:

#ifndef BOOST_DOXYGEN_INVOKED
    typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
    static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
    typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;

    struct implementation_defined
    {
        typedef allocator_arg allocator;
        typedef std::size_t size_type;
    };
#endif

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    // @{
    spsc_queue(void)
    {
        BOOST_ASSERT(!runtime_sized);
    }

    explicit spsc_queue(const T& t)
    {
        BOOST_ASSERT(!runtime_sized);
        while(push(t)){}
        while(pop()){}
    }

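    // Rationale (assumed from the [XBOT] notes in this file): the overload
    // above pushes t into every slot and pops everything back out, leaving
    // each pre-constructed element shaped like t, so later pushes of
    // equally-shaped values (e.g. dynamically-sized matrices) assign without
    // allocating
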
    template <typename U>
    explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_STATIC_ASSERT(!runtime_sized);
    }

    explicit spsc_queue(allocator const & alloc)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_ASSERT(!runtime_sized);
    }
    // @}

    // @{
    explicit spsc_queue(size_type element_count):
        base_type(element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }

    template <typename U>
    spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_STATIC_ASSERT(runtime_sized);
    }

    spsc_queue(size_type element_count, allocator_arg const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }
    // @}

    bool push(T const & t)
    {
        return base_type::push(t);
    }

    bool pop ()
    {
        detail::consume_noop consume_functor;
        return consume_one( consume_functor );
    }

    template <typename U>
    typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
    pop (U & ret)
    {
        detail::consume_via_copy<U> consume_functor(ret);
        return consume_one( consume_functor );
    }

    size_type push(T const * t, size_type size)
    {
        return base_type::push(t, size);
    }

    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return base_type::push(begin, end);
    }

    size_type pop(T * ret, size_type size)
    {
        return base_type::pop(ret, size);
    }

    template <size_type size>
    size_type pop(T (&ret)[size])
    {
        return pop(ret, size);
    }

    template <typename OutputIterator>
    typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
    pop(OutputIterator it)
    {
        return base_type::pop_to_output_iterator(it);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return base_type::consume_one(f);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return base_type::consume_one(f);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return base_type::consume_all(f);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return base_type::consume_all(f);
    }

    size_type read_available() const
    {
        return base_type::read_available(base_type::max_number_of_elements());
    }

    size_type write_available() const
    {
        return base_type::write_available(base_type::max_number_of_elements());
    }

    const T& front() const
    {
        BOOST_ASSERT(read_available() > 0);
        return base_type::front();
    }

    T& front()
    {
        BOOST_ASSERT(read_available() > 0);
        return base_type::front();
    }

    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            base_type::write_index_.store(0, memory_order_relaxed);
            base_type::read_index_.store(0, memory_order_release);
        }
    }

    void reset(const T& t)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            while(pop()){}
            while(push(t)){}
            while(pop()){}
        } else {
            base_type::write_index_.store(0, memory_order_relaxed);
            base_type::read_index_.store(0, memory_order_release);
        }
    }
};

} /* namespace lockfree */
} /* namespace boost */

// } /* namespace ci_internal */

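// NOTE: strictly speaking, the re-defines below do not restore a previous
// function-like macro definition; they only re-introduce "likely"/"unlikely"
// as plain aliases of the stashed spellings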
#define likely old_likely
#define unlikely old_unlikely

#endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */