log_queue.cpp 4.1 KB


  1. #include <unistd.h>
  2. #include <errno.h>
  3. #include <sys/shm.h>
  4. #include <thread>
  5. #include <ztomic.h>
  6. #include <memory.h>
  7. #include <clock.h>
  8. #include <tools.h>
  9. #include <log.h>
  10. #include <sysv_shm.h>
  11. #include <log_queue.h>
  12. struct log_header
  13. {
  14. uint64_t _size;
  15. uint64_t _mask;
  16. uint64_t _pad1[6];
  17. uint64_t _gpos;//读取指针
  18. uint64_t _pad2[7];
  19. uint64_t _npos;//申请指针
  20. uint64_t _pad3[7];
  21. uint64_t _cpos;//提交指针
  22. uint64_t _pad4[7];
  23. pid_t _owner;
  24. uint64_t _tick;
  25. uint64_t _pad5[6];
  26. char p[0];
  27. inline void write(uint64_t pos,const char*s,int len)
  28. {
  29. int wp=index(pos);
  30. int ep=index(pos+len);
  31. if(wp<ep)
  32. {
  33. memcpy(&p[wp],s,len);
  34. }
  35. else
  36. {
  37. memcpy(&p[wp],s,_size-wp);
  38. memcpy(&p[0],s+_size-wp,ep);
  39. }
  40. }
  41. inline void read(uint64_t pos,char*b,int len)
  42. {
  43. int rp=index(pos);
  44. int ep=index(pos+len);
  45. if(rp<ep)
  46. {
  47. memcpy(b,&p[rp],len);
  48. }
  49. else
  50. {
  51. memcpy(b,&p[rp],_size-rp);
  52. memcpy(b+_size-rp,&p[0],ep);
  53. }
  54. }
  55. inline uint32_t yield(uint32_t n)
  56. {
  57. if(n<200)
  58. {
  59. }
  60. else if(n<1000)
  61. {
  62. std::this_thread::yield();
  63. }
  64. else if(n<2000)
  65. {
  66. usleep(1);
  67. }
  68. else
  69. {
  70. usleep(100);
  71. }
  72. return n+1;
  73. }
  74. inline int index(uint64_t pos)
  75. {
  76. return (int)(pos&_mask);
  77. }
  78. inline int mini(int i1,int i2)
  79. {
  80. return i1<i2?i1:i2;
  81. }
  82. void print(const char*s,int len)
  83. {
  84. uint64_t npos=ztomic::load(&_npos);
  85. uint32_t n=0;
  86. //申请空余的空间
  87. for(;;)
  88. {
  89. if(ztomic::cas(&_npos,&npos,npos+len))
  90. break;
  91. n=yield(n);
  92. }
  93. n=0;
  94. zclock c;
  95. uint64_t tm_all=0;
  96. //检查回绕,避免缓冲区重写
  97. for(uint32_t i=0;;i++)
  98. {
  99. uint64_t gpos=ztomic::load(&_gpos);
  100. if(gpos+_size>=npos+len)
  101. break;
  102. if((n=yield(n))>4000 && c.count_ms()>500)
  103. {
  104. tm_all+=c.pin_ms();
  105. std_error("log_queue::print已经阻塞了%ldms,请检查日志输出进程是否已经打开!\n",tm_all);
  106. }
  107. }
  108. write(npos,s,len);
  109. n=0;
  110. c.reset();
  111. for(uint32_t i=0;;i++)
  112. {
  113. uint64_t cpos=ztomic::load(&_cpos);
  114. if(ztomic::cas(&_cpos,&npos,npos+len))
  115. break;
  116. if(cpos>npos)
  117. break;
  118. if(cpos<npos && i>500 && c.count_ms()>200)//超时的话,直接设置提交指针
  119. {
  120. ztomic::store(&_cpos,npos+len);
  121. break;
  122. }
  123. n=yield(n);
  124. }
  125. }
  126. //-----------|-------------|-----------------|>>>>>>>>
  127. // g c n
  128. int get(char*s,int len)
  129. {
  130. uint64_t cpos=ztomic::load(&_cpos);
  131. if(_gpos<cpos)
  132. {
  133. int rc=mini(len,cpos-_gpos);
  134. read(_gpos,s,rc);
  135. ztomic::store(&_gpos,_gpos+rc);
  136. return rc;
  137. }
  138. return 0;
  139. }
  140. bool wait_owner()
  141. {
  142. pid_t owner=ztomic::load(&_owner);
  143. if(owner==0)
  144. {
  145. ztomic::store(&_owner,getpid());
  146. set_live();
  147. return true;
  148. }
  149. if(_owner==getpid())
  150. {
  151. set_live();
  152. return true;
  153. }
  154. uint64_t age0=age();
  155. for(int i=0;i<1000;i++)
  156. {
  157. if(age0>age())
  158. return false;
  159. usleep(1000);
  160. }
  161. ztomic::store(&_owner,getpid());
  162. set_live();
  163. return true;
  164. }
  165. //设置最后访问的时间戳
  166. void set_live()
  167. {
  168. struct timespec m_tmp;
  169. clock_gettime(CLOCK_MONOTONIC,&m_tmp);
  170. ztomic::store((uint64_t*)&_tick,*(uint64_t*)&m_tmp);
  171. }
  172. //当前时间与存储时间戳的差
  173. uint64_t age()const
  174. {
  175. struct timespec m_tmp;
  176. clock_gettime(CLOCK_MONOTONIC,&m_tmp);
  177. uint64_t start0=ztomic::load((uint64_t*)&_tick);
  178. struct timespec start=*(struct timespec*)&start0;
  179. long long ret=(m_tmp.tv_sec-start.tv_sec)*1000000;
  180. ret+=(m_tmp.tv_nsec-start.tv_nsec)/1000;
  181. return ret/1000;
  182. }
  183. };
  184. log_queue::log_queue()
  185. {
  186. _base=nullptr;
  187. }
  188. log_queue::~log_queue()
  189. {
  190. }
  191. int log_queue::open(const char*name,size_t queue_size)
  192. {
  193. size_t size=1<<20;
  194. while(size<queue_size)
  195. size<<=1;
  196. if(_shm.open(name,size+sizeof(log_header))<0)
  197. return -1;
  198. _base=(log_header*) _shm.ptr();
  199. _base->_size=size;
  200. _base->_mask=size-1;
  201. return 0;
  202. }
  203. //-----------|-------------|-----------------|>>>>>>>>
  204. // g c n
  205. void log_queue::put(const char*s,int len)
  206. {
  207. _base->print(s,len);
  208. }
  209. int log_queue::get(char*s,int len)
  210. {
  211. return _base->get(s,len);
  212. }
  213. void log_queue::keep_alive()
  214. {
  215. return _base->set_live();
  216. }
  217. bool log_queue::wait_owner()
  218. {
  219. return _base->wait_owner();
  220. }