10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED 14 #define old_likely likely 15 #define old_unlikely unlikely 16 #undef likely // is_likely 17 #undef unlikely // is_unlikely 23 #include <boost/aligned_storage.hpp> 24 #include <boost/assert.hpp> 25 #include <boost/static_assert.hpp> 26 #include <boost/utility.hpp> 27 #include <boost/utility/enable_if.hpp> 29 #include <boost/type_traits/has_trivial_destructor.hpp> 30 #include <boost/type_traits/is_convertible.hpp> 32 #include <boost/lockfree/detail/atomic.hpp> 33 #include <boost/lockfree/detail/copy_payload.hpp> 34 #include <boost/lockfree/detail/parameter.hpp> 35 #include <boost/lockfree/detail/prefix.hpp> 37 #ifdef BOOST_HAS_PRAGMA_ONCE 47 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
48 boost::parameter::optional<tag::allocator>
54 #ifndef BOOST_DOXYGEN_INVOKED 67 write_index_(0), read_index_(0)
73 while ((ret >= max_size))
78 static size_t read_available(
size_t write_index,
size_t read_index,
size_t max_size)
80 if (write_index >= read_index)
81 return write_index - read_index;
83 const size_t ret = write_index + max_size - read_index;
87 static size_t write_available(
size_t write_index,
size_t read_index,
size_t max_size)
89 size_t ret = read_index - write_index - 1;
90 if (write_index >= read_index)
97 size_t write_index = write_index_.load(memory_order_relaxed);
98 const size_t read_index = read_index_.load(memory_order_relaxed);
104 size_t write_index = write_index_.load(memory_order_relaxed);
105 const size_t read_index = read_index_.load(memory_order_relaxed);
109 bool push(T
const & t, T * buffer,
size_t max_size)
111 const size_t write_index = write_index_.load(memory_order_relaxed);
112 const size_t next =
next_index(write_index, max_size);
115 if (next == read_index_.load(memory_order_acquire))
119 *(buffer + write_index) = t;
121 write_index_.store(next, memory_order_release);
126 size_t push(
const T * input_buffer,
size_t input_count, T * internal_buffer,
size_t max_size)
128 return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
131 template <
typename ConstIterator>
132 ConstIterator
push(ConstIterator begin, ConstIterator end, T * internal_buffer,
size_t max_size)
136 const size_t write_index = write_index_.load(memory_order_relaxed);
137 const size_t read_index = read_index_.load(memory_order_acquire);
138 const size_t avail =
write_available(write_index, read_index, max_size);
143 size_t input_count = std::distance(begin, end);
144 input_count = (std::min)(input_count, avail);
146 size_t new_write_index = write_index + input_count;
148 const ConstIterator last = boost::next(begin, input_count);
150 if (write_index + input_count > max_size) {
152 const size_t count0 = max_size - write_index;
153 const ConstIterator midpoint = boost::next(begin, count0);
155 std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
156 std::uninitialized_copy(midpoint, last, internal_buffer);
157 new_write_index -= max_size;
159 std::uninitialized_copy(begin, last, internal_buffer + write_index);
161 if (new_write_index == max_size)
165 write_index_.store(new_write_index, memory_order_release);
169 template <
typename Functor>
172 const size_t write_index = write_index_.load(memory_order_acquire);
173 const size_t read_index = read_index_.load(memory_order_relaxed);
174 if (
empty(write_index, read_index) )
177 T & object_to_consume = buffer[read_index];
178 functor( object_to_consume );
181 size_t next =
next_index(read_index, max_size);
182 read_index_.store(next, memory_order_release);
186 template <
typename Functor>
187 bool consume_one(Functor
const & functor, T * buffer,
size_t max_size)
189 const size_t write_index = write_index_.load(memory_order_acquire);
190 const size_t read_index = read_index_.load(memory_order_relaxed);
191 if (
empty(write_index, read_index) )
194 T & object_to_consume = buffer[read_index];
195 functor( object_to_consume );
198 size_t next =
next_index(read_index, max_size);
199 read_index_.store(next, memory_order_release);
203 template <
typename Functor>
204 size_t consume_all (Functor
const & functor, T * internal_buffer,
size_t max_size)
206 const size_t write_index = write_index_.load(memory_order_acquire);
207 const size_t read_index = read_index_.load(memory_order_relaxed);
209 const size_t avail =
read_available(write_index, read_index, max_size);
214 const size_t output_count = avail;
216 size_t new_read_index = read_index + output_count;
218 if (read_index + output_count > max_size) {
220 const size_t count0 = max_size - read_index;
221 const size_t count1 = output_count - count0;
223 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
224 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
226 new_read_index -= max_size;
228 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
230 if (new_read_index == max_size)
234 read_index_.store(new_read_index, memory_order_release);
238 template <
typename Functor>
239 size_t consume_all (Functor & functor, T * internal_buffer,
size_t max_size)
241 const size_t write_index = write_index_.load(memory_order_acquire);
242 const size_t read_index = read_index_.load(memory_order_relaxed);
244 const size_t avail =
read_available(write_index, read_index, max_size);
249 const size_t output_count = avail;
251 size_t new_read_index = read_index + output_count;
253 if (read_index + output_count > max_size) {
255 const size_t count0 = max_size - read_index;
256 const size_t count1 = output_count - count0;
258 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
259 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
261 new_read_index -= max_size;
263 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
265 if (new_read_index == max_size)
269 read_index_.store(new_read_index, memory_order_release);
273 size_t pop (T * output_buffer,
size_t output_count, T * internal_buffer,
size_t max_size)
275 const size_t write_index = write_index_.load(memory_order_acquire);
276 const size_t read_index = read_index_.load(memory_order_relaxed);
278 const size_t avail =
read_available(write_index, read_index, max_size);
283 output_count = (std::min)(output_count, avail);
285 size_t new_read_index = read_index + output_count;
287 if (read_index + output_count > max_size) {
289 const size_t count0 = max_size - read_index;
290 const size_t count1 = output_count - count0;
292 copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
293 copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
295 new_read_index -= max_size;
297 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
298 if (new_read_index == max_size)
302 read_index_.store(new_read_index, memory_order_release);
306 template <
typename OutputIterator>
309 const size_t write_index = write_index_.load(memory_order_acquire);
310 const size_t read_index = read_index_.load(memory_order_relaxed);
312 const size_t avail =
read_available(write_index, read_index, max_size);
316 size_t new_read_index = read_index + avail;
318 if (read_index + avail > max_size) {
320 const size_t count0 = max_size - read_index;
321 const size_t count1 = avail - count0;
323 it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
324 copy_and_delete(internal_buffer, internal_buffer + count1, it);
326 new_read_index -= max_size;
328 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
329 if (new_read_index == max_size)
333 read_index_.store(new_read_index, memory_order_release);
337 const T&
front(
const T * internal_buffer)
const 339 const size_t read_index = read_index_.load(memory_order_relaxed);
340 return *(internal_buffer + read_index);
345 const size_t read_index = read_index_.load(memory_order_relaxed);
346 return *(internal_buffer + read_index);
358 if ( !boost::has_trivial_destructor<T>::value ) {
362 while (
pop(dummy_element))
365 write_index_.store(0, memory_order_relaxed);
366 read_index_.store(0, memory_order_release);
377 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
386 return write_index_.is_lock_free() && read_index_.is_lock_free();
390 bool empty(
size_t write_index,
size_t read_index)
392 return write_index == read_index;
395 template<
class OutputIterator >
396 OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
398 if (boost::has_trivial_destructor<T>::value) {
399 return std::copy(first, last, out);
401 for (; first != last; ++first, ++out) {
409 template<
class Functor >
410 void run_functor_and_delete( T * first, T * last, Functor & functor )
412 for (; first != last; ++first) {
418 template<
class Functor >
419 void run_functor_and_delete( T * first, T * last, Functor
const & functor )
421 for (; first != last; ++first) {
428 template <
typename T, std::
size_t MaxSize>
432 typedef std::size_t size_type;
433 static const std::size_t max_size = MaxSize + 1;
435 typedef typename boost::aligned_storage<max_size *
sizeof(T),
436 boost::alignment_of<T>::value
437 >::type storage_type;
439 storage_type storage_;
443 return static_cast<T*
>(storage_.address());
446 const T * data()
const 448 return static_cast<const T*
>(storage_.address());
462 for(
int i = 0; i < max_size; i++)
473 template <
typename Functor>
479 template <
typename Functor>
485 template <
typename Functor>
491 template <
typename Functor>
497 size_type
push(T
const * t, size_type size)
502 template <
size_type size>
503 size_type
push(T
const (&t)[size])
505 return push(t, size);
508 template <
typename ConstIterator>
509 ConstIterator
push(ConstIterator begin, ConstIterator end)
514 size_type
pop(T * ret, size_type size)
519 template <
typename OutputIterator>
537 for(
int i = 0; i < max_size; i++)
544 template <
typename T,
typename Alloc>
549 typedef std::size_t size_type;
550 size_type max_elements_;
551 typedef typename Alloc::pointer pointer;
557 return max_elements_;
562 max_elements_(max_elements + 1)
564 array_ = Alloc::allocate(max_elements_);
567 template <
typename U>
569 Alloc(alloc), max_elements_(max_elements + 1)
571 array_ = Alloc::allocate(max_elements_);
575 Alloc(alloc), max_elements_(max_elements + 1)
577 array_ = Alloc::allocate(max_elements_);
584 while (
pop(&out, 1)) {}
586 Alloc::deallocate(array_, max_elements_);
594 template <
typename Functor>
600 template <
typename Functor>
606 template <
typename Functor>
612 template <
typename Functor>
618 size_type
push(T
const * t, size_type size)
623 template <
size_type size>
624 size_type
push(T
const (&t)[size])
626 return push(t, size);
629 template <
typename ConstIterator>
630 ConstIterator
push(ConstIterator begin, ConstIterator end)
635 size_type
pop(T * ret, size_type size)
640 template <
typename OutputIterator>
657 template <
typename T,
typename A0,
typename A1>
660 typedef typename ringbuffer_signature::bind<A0, A1>::type
bound_args;
664 static const bool runtime_sized = !extract_capacity_t::has_capacity;
665 static const size_t capacity = extract_capacity_t::capacity;
671 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
672 mpl::bool_<!extract_allocator_t::has_allocator>,
676 typedef typename mpl::if_c<runtime_sized,
700 #ifndef BOOST_DOXYGEN_INVOKED 701 template <
typename T,
702 class A0 = boost::parameter::void_,
703 class A1 = boost::parameter::void_>
705 template <
typename T, ...Options>
712 #ifndef BOOST_DOXYGEN_INVOKED 717 struct implementation_defined
720 typedef std::size_t size_type;
726 typedef typename implementation_defined::allocator
allocator;
727 typedef typename implementation_defined::size_type
size_type;
737 BOOST_ASSERT(!runtime_sized);
746 BOOST_ASSERT(!runtime_sized);
752 template <
typename U>
753 explicit spsc_queue(
typename allocator::template rebind<U>::other
const & alloc)
756 BOOST_STATIC_ASSERT(!runtime_sized);
762 BOOST_ASSERT(!runtime_sized);
773 base_type(element_count)
775 BOOST_ASSERT(runtime_sized);
778 template <
typename U>
779 spsc_queue(size_type element_count,
typename allocator::template rebind<U>::other
const & alloc):
780 base_type(alloc, element_count)
782 BOOST_STATIC_ASSERT(runtime_sized);
785 spsc_queue(size_type element_count, allocator_arg
const & alloc):
786 base_type(alloc, element_count)
788 BOOST_ASSERT(runtime_sized);
802 return base_type::push(t);
815 detail::consume_noop consume_functor;
827 template <
typename U>
828 typename boost::enable_if<typename is_convertible<T, U>::type,
bool>::type
831 detail::consume_via_copy<U> consume_functor(ret);
842 size_type
push(T
const * t, size_type size)
844 return base_type::push(t, size);
854 template <
size_type size>
855 size_type
push(T
const (&t)[size])
857 return push(t, size);
867 template <
typename ConstIterator>
868 ConstIterator
push(ConstIterator begin, ConstIterator end)
870 return base_type::push(begin, end);
880 size_type
pop(T * ret, size_type size)
882 return base_type::pop(ret, size);
892 template <
size_type size>
893 size_type
pop(T (&ret)[size])
895 return pop(ret, size);
905 template <
typename OutputIterator>
906 typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
909 return base_type::pop_to_output_iterator(it);
920 template <
typename Functor>
923 return base_type::consume_one(f);
927 template <
typename Functor>
930 return base_type::consume_one(f);
941 template <
typename Functor>
944 return base_type::consume_all(f);
948 template <
typename Functor>
951 return base_type::consume_all(f);
962 return base_type::read_available(base_type::max_number_of_elements());
973 return base_type::write_available(base_type::max_number_of_elements());
989 return base_type::front();
996 return base_type::front();
1005 if ( !boost::has_trivial_destructor<T>::value ) {
1009 while (
pop(dummy_element))
1012 base_type::write_index_.store(0, memory_order_relaxed);
1013 base_type::read_index_.store(0, memory_order_release);
1023 if ( !boost::has_trivial_destructor<T>::value ) {
1028 base_type::write_index_.store(0, memory_order_relaxed);
1029 base_type::read_index_.store(0, memory_order_release);
1039 #define likely old_likely 1040 #define unlikely old_unlikely ~runtime_sized_ringbuffer(void)
size_type pop_to_output_iterator(OutputIterator it)
extract_capacity< bound_args > extract_capacity_t
boost::enable_if< typename is_convertible< T, U >::type, bool >::type pop(U &ret)
Pops one object from ringbuffer.
const T & front() const
get reference to element in the front of the queue
spsc_queue(typename allocator::template rebind< U >::other const &alloc)
mpl::if_c< runtime_sized, runtime_sized_ringbuffer< T, allocator >, compile_time_sized_ringbuffer< T, capacity > >::type ringbuffer_type
size_type consume_all(Functor &f)
ConstIterator push(ConstIterator begin, ConstIterator end, T *internal_buffer, size_t max_size)
bool consume_one(Functor const &f)
size_type push(T const *t, size_type size)
size_t pop(T *output_buffer, size_t output_count, T *internal_buffer, size_t max_size)
size_type pop(T *ret, size_type size)
size_type write_available() const
get write space to write elements
size_type consume_all(Functor &f)
ringbuffer_signature::bind< A0, A1 >::type bound_args
size_t consume_all(Functor &functor, T *internal_buffer, size_t max_size)
static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
void reset(const T &t)
reset the ringbuffer
compile_time_sized_ringbuffer()
size_type push(T const *t, size_type size)
Pushes as many objects from the array t as there is space.
ConstIterator push(ConstIterator begin, ConstIterator end)
size_type consume_all(Functor const &f)
bool is_lock_free(void) const
bool consume_one(Functor const &functor, T *buffer, size_t max_size)
bool consume_one(Functor &f)
consumes one element via a functor
boost::disable_if< typename is_convertible< T, OutputIterator >::type, size_type >::type pop(OutputIterator it)
Pops objects to the output iterator it.
size_t read_available(size_t max_size) const
bool pop()
Pops one object from ringbuffer.
size_t consume_all(Functor const &functor, T *internal_buffer, size_t max_size)
bool consume_one(Functor const &f)
void reset(void)
reset the ringbuffer
size_type pop(T(&ret)[size])
Pops a maximum of size objects from spsc_queue.
ConstIterator push(ConstIterator begin, ConstIterator end)
Pushes as many objects from the range [begin, end) as there is space .
size_type push(T const *t, size_type size)
bool consume_one(Functor &functor, T *buffer, size_t max_size)
size_type push(T const (&t)[size])
char padding1[padding_size]
The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-f...
size_t push(const T *input_buffer, size_t input_count, T *internal_buffer, size_t max_size)
runtime_sized_ringbuffer(typename Alloc::template rebind< U >::other const &alloc, size_type max_elements)
runtime_sized_ringbuffer(Alloc const &alloc, size_type max_elements)
size_type pop(T *ret, size_type size)
static const int padding_size
size_type pop_to_output_iterator(OutputIterator it)
size_type max_number_of_elements() const
ConstIterator push(ConstIterator begin, ConstIterator end)
spsc_queue(size_type element_count, allocator_arg const &alloc)
~compile_time_sized_ringbuffer()
spsc_queue(size_type element_count, typename allocator::template rebind< U >::other const &alloc)
const T & front(void) const
parameter::parameters< boost::parameter::optional< tag::capacity >, boost::parameter::optional< tag::allocator > > ringbuffer_signature
spsc_queue(allocator const &alloc)
bool empty(void)
Check if the ringbuffer is empty.
bool consume_one(Functor const &f)
consumes one element via a functor
implementation_defined::allocator allocator
bool push(T const &t)
Pushes object t to the ringbuffer.
bool consume_one(Functor &f)
atomic< size_t > write_index_
size_type push(T const (&t)[size])
Pushes as many objects from the array t as there is space available.
T & front()
get reference to element in the front of the queue
void reset(void)
reset the ringbuffer
size_type pop(T *ret, size_type size)
Pops a maximum of size objects from ringbuffer.
runtime_sized_ringbuffer(size_type max_elements)
bool push(T const &t, T *buffer, size_t max_size)
atomic< size_t > read_index_
size_t pop_to_output_iterator(OutputIterator it, T *internal_buffer, size_t max_size)
T & front(T *internal_buffer)
size_t write_available(size_t max_size) const
size_type consume_all(Functor const &f)
bool consume_one(Functor &f)
size_type consume_all(Functor const &f)
consumes all elements via a functor
const T & front(const T *internal_buffer) const
static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
size_type read_available() const
get number of elements that are available for read
size_type consume_all(Functor &f)
consumes all elements via a functor
static size_t next_index(size_t arg, size_t max_size)
extract_allocator_t::type allocator
size_type max_number_of_elements() const
implementation_defined::size_type size_type
spsc_queue(void)
Constructs a spsc_queue.
const T & front(void) const
size_type push(T const (&t)[size])
spsc_queue(size_type element_count)
Constructs a spsc_queue for element_count elements.
spsc_queue(const T &t)
Constructs a spsc_queue.
extract_allocator< bound_args, T > extract_allocator_t