tqueue.h 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #ifndef INCLUDED_QUEUE_H
  2. #define INCLUDED_QUEUE_H
  3. #include <mutex>
  4. #include <condition_variable>
  5. #include <vector>
  6. #include <iostream>
  7. template<class T>
  8. class BoundedQueue
  9. {
  10. public:
  11. BoundedQueue(int max)
  12. : m_begin(0)
  13. , m_end(0)
  14. , m_buffered(0)
  15. , m_circularBuf(max)
  16. {
  17. }
  18. BoundedQueue(const BoundedQueue & rhs)=delete;
  19. BoundedQueue& operator=(const BoundedQueue & rhs)=delete;
  20. void put(const T &m)
  21. {
  22. std::unique_lock<std::mutex> lk(m_monitor);
  23. m_notFull.wait(lk,[this]{return m_buffered<m_circularBuf.size();});
  24. purePut(m);
  25. lk.unlock();
  26. m_notEmpty.notify_one();
  27. }
  28. void putHead(const T& m)
  29. {
  30. std::unique_lock<std::mutex> lk(m_monitor);
  31. m_notFull.wait(lk,[this]{return m_buffered<m_circularBuf.size();});
  32. purePutHead(m);
  33. lk.unlock();
  34. m_notEmpty.notify_one();
  35. }
  36. bool tryPut(const T &m)
  37. {
  38. std::unique_lock<std::mutex> lk(m_monitor);
  39. if(m_buffered >= m_circularBuf.size())
  40. {
  41. return false;
  42. }
  43. else
  44. {
  45. purePut(m);
  46. lk.unlock();
  47. m_notEmpty.notify_one();
  48. return true;
  49. }
  50. }
  51. bool tryPutHead(const T &m)
  52. {
  53. std::unique_lock<std::mutex> lk(m_monitor);
  54. if(m_buffered >= m_circularBuf.size())
  55. {
  56. return false;
  57. }
  58. else
  59. {
  60. purePutHead(m);
  61. lk.unlock();
  62. m_notEmpty.notify_one();
  63. return true;
  64. }
  65. }
  66. void get(T &m)
  67. {
  68. std::unique_lock<std::mutex> lk(m_monitor);
  69. m_notEmpty.wait(lk,[this]{return m_buffered>0;});
  70. pureGet(m);
  71. lk.unlock();
  72. m_notFull.notify_all();
  73. }
  74. bool get(T &m, int wait)
  75. {
  76. bool signaled = false;
  77. std::unique_lock<std::mutex> lk(m_monitor);
  78. signaled = m_notEmpty.wait_for(lk,std::chrono::seconds(wait),[this]{return m_buffered>0;});
  79. if(!signaled)
  80. return false;
  81. pureGet(m);
  82. lk.unlock();
  83. m_notFull.notify_all();
  84. return true;
  85. }
  86. bool tryGet(T &m)
  87. {
  88. std::unique_lock<std::mutex> lk(m_monitor);
  89. if(m_buffered <= 0)
  90. {
  91. return false;
  92. }
  93. else
  94. {
  95. pureGet(m);
  96. lk.unlock();
  97. m_notFull.notify_all();
  98. return true;
  99. }
  100. }
  101. size_t size()
  102. {
  103. return m_buffered;
  104. }
  105. private:
  106. inline void purePut(const T& m)
  107. {
  108. m_circularBuf[m_end] = m;
  109. m_end = (m_end+1) % m_circularBuf.size();
  110. ++m_buffered;
  111. }
  112. inline void purePutHead(const T& m)
  113. {
  114. if(m_begin == 0)
  115. m_begin = m_circularBuf.size();
  116. m_begin--;
  117. m_circularBuf[m_begin] = m;
  118. ++m_buffered;
  119. }
  120. inline void pureGet(T& m)
  121. {
  122. m = m_circularBuf[m_begin];
  123. // clear the memory
  124. T t;
  125. std::swap(m_circularBuf[m_begin], t);
  126. m_begin = (m_begin+1) % m_circularBuf.size();
  127. --m_buffered;
  128. }
  129. private:
  130. size_t m_begin;
  131. size_t m_end;
  132. size_t m_buffered;
  133. std::vector<T> m_circularBuf;
  134. std::condition_variable m_notFull;
  135. std::condition_variable m_notEmpty;
  136. std::mutex m_monitor;
  137. };
  138. #endif // INCLUDED_QUEUE_H