10 #ifndef __PION_PIONLOCKFREEQUEUE_HEADER__
11 #define __PION_PIONLOCKFREEQUEUE_HEADER__
13 #ifndef PION_HAVE_LOCKFREE
14 #error "PionLockFreeQueue requires the boost::lockfree library!"
19 #pragma warning(disable: 4800) // forcing value to bool 'true' or 'false' (performance warning)
21 #include <boost/lockfree/detail/tagged_ptr.hpp>
25 #include <boost/lockfree/detail/cas.hpp>
26 #include <boost/lockfree/detail/freelist.hpp>
27 #include <boost/lockfree/detail/branch_hints.hpp>
28 #include <boost/detail/atomic_count.hpp>
29 #include <boost/noncopyable.hpp>
30 #include <boost/thread/thread.hpp>
31 #include <pion/PionConfig.hpp>
52 private boost::noncopyable
76 QueueNode *node_ptr = m_free_list.allocate();
83 node_ptr->~QueueNode();
84 m_free_list.deallocate(node_ptr);
95 m_head_ptr.set_ptr(dummy_ptr);
96 m_tail_ptr.set_ptr(dummy_ptr);
107 return (m_head_ptr.
get_ptr() == m_tail_ptr.get_ptr());
111 inline std::size_t
size(
void)
const {
120 m_head_ptr = m_head_ptr->
next;
143 boost::lockfree::memory_barrier();
146 if (boost::lockfree::likely(tail_ptr == m_tail_ptr)) {
148 if (next_ptr.
get_ptr() == NULL) {
150 if (tail_ptr->
next.
cas(next_ptr, node_ptr)) {
152 m_tail_ptr.cas(tail_ptr, node_ptr);
157 m_tail_ptr.cas(tail_ptr, next_ptr.
get_ptr());
182 boost::lockfree::memory_barrier();
185 if (boost::lockfree::likely(head_ptr == m_head_ptr)) {
190 if (next_ptr == NULL)
194 m_tail_ptr.cas(tail_ptr, next_ptr);
203 if (m_head_ptr.
cas(head_ptr, next_ptr)) {
227 boost::detail::atomic_count m_size;
230 NodeFreeList m_free_list;
248 template <
typename T,
253 boost::uint16_t MaxSize = 50000,
254 boost::uint32_t SleepMilliSec = 10 >
255 class PionLockFreeQueue :
256 private boost::noncopyable
261 BOOST_STATIC_ASSERT(
sizeof(boost::uint32_t) >= (
sizeof(boost::uint16_t) * 2));
268 boost::uint16_t index;
270 boost::uint16_t counter;
273 boost::int32_t value;
294 return m_nodes[node_ptr.data.index];
307 boost::uint16_t new_index)
310 new_ptr.data.index = new_index;
311 new_ptr.data.counter = old_ptr.data.counter + 1;
312 return boost::lockfree::cas(&(cur_ptr.value), old_ptr.value, new_ptr.value);
/// Takes one node index off the free-node stack (m_free_nodes / m_free_ptr).
/// Only part of this function is visible in this listing; the enclosing
/// loop(s), the first half of the final condition and the return statement
/// are elided. NOTE(review): presumably returns avail_index -- confirm.
316 inline boost::uint16_t acquireNode(
void) {
318 boost::uint16_t new_free_index;
319 boost::uint16_t avail_index;
// take a snapshot of the free pointer (index + counter packed in one value)
324 current_free_ptr.value = m_free_ptr.value;
// a free index above zero means at least one node is available
// (the statement guarded by this test is elided from the listing)
326 if (current_free_ptr.data.index > 0)
// otherwise sleep for SleepMilliSec milliseconds before looking again
329 boost::system_time wakeup_time = boost::get_system_time()
330 + boost::posix_time::millisec(SleepMilliSec);
331 boost::thread::sleep(wakeup_time);
// the slot to take is the one just below the current free index
335 new_free_index = current_free_ptr.data.index - 1;
// read the node index stored in that slot before trying to claim it
338 avail_index = m_free_nodes[new_free_index];
// second half of a condition whose first operand is elided: claim the slot
// by swapping m_free_ptr from the snapshot to the decremented index
342 && cas(m_free_ptr, current_free_ptr, new_free_index))
// zero the claimed slot; releaseNode() only stores into a slot that reads 0
344 m_free_nodes[new_free_index] = 0;
/// Puts a node index back onto the free-node stack (m_free_nodes / m_free_ptr).
/// Only part of this function is visible in this listing; the declaration of
/// current_free_ptr and any retry loop around the steps below are elided.
///
/// @param node_index index of the node being handed back
353 inline void releaseNode(
const boost::uint16_t node_index) {
355 boost::uint16_t new_free_index;
// take a snapshot of the free pointer (index + counter packed in one value)
359 current_free_ptr.value = m_free_ptr.value;
// after a successful release the free index is one higher
362 new_free_index = current_free_ptr.data.index + 1;
// only proceed when the target slot reads 0 (acquireNode() zeroes a slot
// after taking it) and the free pointer still matches the snapshot
365 if (m_free_nodes[current_free_ptr.data.index] == 0
366 && cas(m_free_ptr, current_free_ptr, new_free_index))
// the slot is written after the free index was already advanced by the cas
369 m_free_nodes[current_free_ptr.data.index] = node_index;
387 m_head_ptr.data.index = m_tail_ptr.data.index = 1;
388 m_head_ptr.data.counter = m_tail_ptr.data.counter = 0;
390 m_free_ptr.value = 0;
392 for (boost::uint16_t n = 0; n < MaxSize; ++n)
395 for (boost::uint16_t n = 0; n < MaxSize+2; ++n)
396 m_nodes[n].m_next.value = 0;
398 for (boost::uint16_t n = 2; n < MaxSize+2; ++n)
403 inline bool empty(
void)
const {
return m_free_ptr.data.index == 0; }
406 inline boost::uint16_t
size(
void)
const {
return m_free_ptr.data.index; }
/// Appends an item to the tail of the queue.
/// Only part of this function is visible in this listing; the copy of t into
/// the node, the declarations of tail_ptr/next_ptr, the retry loop and its
/// exit are elided.
///
/// @param t the item to add
413 inline void push(
const T& t) {
// obtain a node index from the free-node stack
415 const boost::uint16_t node_index(acquireNode());
418 QueueNode& node_ref = m_nodes[node_index];
// the new node has no successor; index 0 is the "no next node" marker
420 node_ref.m_next.data.index = 0;
// snapshot the tail pointer and the tail node's next pointer
426 tail_ptr.value = m_tail_ptr.value;
427 next_ptr.value = getQueueNode(tail_ptr).m_next.value;
// continue only if the tail did not change since the snapshot
429 if (tail_ptr.value == m_tail_ptr.value) {
// no successor: the snapshot really is the last node
431 if (next_ptr.data.index == 0) {
// try to link the new node after the current last node
433 if (cas(getQueueNode(tail_ptr).m_next, next_ptr, node_index))
// tail had a successor: try to move m_tail_ptr forward to it
437 cas(m_tail_ptr, tail_ptr, next_ptr.data.index);
// try to point m_tail_ptr at the new node; the result is not checked
443 cas(m_tail_ptr, tail_ptr, node_index);
/// Removes the item at the head of the queue and copies it into t.
/// Only part of this function is visible in this listing; the declarations
/// of head_ptr/tail_ptr/next_ptr, the retry loop and the return statements
/// are elided. NOTE(review): presumably returns false when the queue is
/// empty and true once an item was copied into t -- confirm.
///
/// @param t receives a copy of the popped item
453 inline bool pop(T& t) {
// snapshot head, tail and the head node's next pointer
460 head_ptr.value = m_head_ptr.value;
461 tail_ptr.value = m_tail_ptr.value;
462 next_ptr.value = getQueueNode(head_ptr).m_next.value;
// continue only if the head did not change since the snapshot
464 if (head_ptr.value == m_head_ptr.value) {
// head and tail refer to the same node
466 if (head_ptr.data.index == tail_ptr.data.index) {
// ...and that node has no successor (the guarded statement is elided)
468 if (next_ptr.data.index == 0)
// a successor exists: try to move m_tail_ptr forward to it
471 cas(m_tail_ptr, tail_ptr, next_ptr.data.index);
// copy the data out of the successor node before attempting the head swap
476 t = getQueueNode(next_ptr).m_data;
// try to advance m_head_ptr to the successor
478 if (cas(m_head_ptr, head_ptr, next_ptr.data.index))
// hand the old head node's index back to the free-node stack
485 releaseNode(const_cast<boost::uint16_t&>(head_ptr.data.index));
493 boost::array<QueueNode, MaxSize+2> m_nodes;
496 boost::array<volatile boost::uint16_t, MaxSize> m_free_nodes;
data structure used to wrap each item in the queue
bool cas(tagged_ptr const &oldval, T *newptr)
virtual ~PionLockFreeQueue()
virtual destructor
QueueNode(void)
default constructor
PionLockFreeQueue(void)
constructs a new PionLockFreeQueue
bool empty(void) const
returns true if the queue is empty; false if it is not
boost::lockfree::tagged_ptr< QueueNode > next
points to the next node in the queue
QueueNode * createNode(void)
returns a new queue node item for use in the queue
volatile void clear(void)
boost::lockfree::tagged_ptr< QueueNode > QueueNodePtr
data type for an atomic QueueNode pointer
std::size_t size(void) const
returns the number of items that are currently in the queue
T data
data wrapped by the node item
void destroyNode(QueueNode *node_ptr)
frees memory for an existing queue node item
QueueNode(const T &d)
constructs QueueNode with a data value