log_queue.cpp 4.3 KB


  1. #include <unistd.h>
  2. #include <errno.h>
  3. #include <sys/shm.h>
  4. #include <thread>
  5. #include <ztomic.h>
  6. #include <memory.h>
  7. #include <clock.h>
  8. #include <tools.h>
  9. #include <log.h>
  10. #include <sysv_shm.h>
  11. #include <log_queue.h>
  12. struct log_header
  13. {
  14. uint64_t _size;
  15. uint64_t _mask;
  16. uint64_t _pad1[6];
  17. uint64_t _gpos;//读取指针
  18. uint64_t _pad2[7];
  19. uint64_t _npos;//申请指针
  20. uint64_t _pad3[7];
  21. uint64_t _cpos;//提交指针
  22. uint64_t _pad4[7];
  23. pid_t _owner;
  24. uint64_t _tick;
  25. uint64_t _pad5[6];
  26. char p[0];
  27. inline void write(uint64_t pos,const char*s,int len)
  28. {
  29. int wp=index(pos);
  30. int ep=index(pos+len);
  31. if(wp<ep)
  32. {
  33. memcpy(&p[wp],s,len);
  34. }
  35. else
  36. {
  37. memcpy(&p[wp],s,_size-wp);
  38. memcpy(&p[0],s+_size-wp,ep);
  39. }
  40. }
  41. inline void read(uint64_t pos,char*b,int len)
  42. {
  43. int rp=index(pos);
  44. int ep=index(pos+len);
  45. if(rp<ep)
  46. {
  47. memcpy(b,&p[rp],len);
  48. }
  49. else
  50. {
  51. memcpy(b,&p[rp],_size-rp);
  52. memcpy(b+_size-rp,&p[0],ep);
  53. }
  54. }
  55. inline uint32_t yield(uint32_t n)
  56. {
  57. if(n<200)
  58. {
  59. }
  60. else if(n<1000)
  61. {
  62. std::this_thread::yield();
  63. }
  64. else if(n<2000)
  65. {
  66. usleep(1);
  67. }
  68. else
  69. {
  70. usleep(100);
  71. }
  72. return n+1;
  73. }
  74. inline int index(uint64_t pos)
  75. {
  76. return (int)(pos&_mask);
  77. }
  78. inline int mini(int i1,int i2)
  79. {
  80. return i1<i2?i1:i2;
  81. }
  82. //-----------|-------------|-----------------|>>>>>>>>
  83. // g c n
  84. void print(const char*s,int len)
  85. {
  86. uint64_t npos=ztomic::load(&_npos);
  87. uint32_t n=0;
  88. //申请空余的空间
  89. for(;;)
  90. {
  91. if(ztomic::cas(&_npos,&npos,npos+len))
  92. break;
  93. n=yield(n);
  94. }
  95. n=0;
  96. zclock c;
  97. uint64_t tm_all=0;
  98. //检查回绕,避免缓冲区重写
  99. for(uint32_t i=0;;i++)
  100. {
  101. uint64_t gpos=ztomic::load(&_gpos);
  102. if(gpos+_size>=npos+len)
  103. break;
  104. if((n=yield(n))>4000 && c.count_ms()>500)
  105. {
  106. tm_all+=c.pin_ms();
  107. std_error("log_queue::print已经阻塞了%ldms,请检查日志输出进程是否已经打开!\n",tm_all);
  108. }
  109. }
  110. write(npos,s,len);
  111. n=0;
  112. c.reset();
  113. for(uint32_t i=0;;i++)
  114. {
  115. // uint64_t cpos=ztomic::load(&_cpos);
  116. uint64_t n=npos;
  117. if(ztomic::cas(&_cpos,&n,n+len))
  118. break;
  119. if(n>=npos)
  120. break;
  121. if(n<npos && i>500 && c.count_ms()>200)//超时的话,直接设置提交指针
  122. {
  123. ztomic::store(&_cpos,npos+len);
  124. break;
  125. }
  126. n=yield(n);
  127. }
  128. }
  129. //-----------|-------------|-----------------|>>>>>>>>
  130. // g c n
  131. int get(char*s,int len)
  132. {
  133. uint64_t cpos=ztomic::load(&_cpos);
  134. if(_gpos<cpos)
  135. {
  136. int rc=mini(len,cpos-_gpos);
  137. read(_gpos,s,rc);
  138. ztomic::store(&_gpos,_gpos+rc);
  139. return rc;
  140. }
  141. return 0;
  142. }
  143. bool wait_owner()
  144. {
  145. pid_t owner=ztomic::load(&_owner);
  146. if(owner==0)
  147. {
  148. ztomic::store(&_owner,getpid());
  149. set_live();
  150. return true;
  151. }
  152. if(_owner==getpid())
  153. {
  154. set_live();
  155. return true;
  156. }
  157. uint64_t age0=age();
  158. for(int i=0;i<1000;i++)
  159. {
  160. if(age0>age())
  161. return false;
  162. usleep(1000);
  163. }
  164. ztomic::store(&_owner,getpid());
  165. set_live();
  166. return true;
  167. }
  168. //设置最后访问的时间戳
  169. void set_live()
  170. {
  171. struct timespec m_tmp;
  172. clock_gettime(CLOCK_MONOTONIC,&m_tmp);
  173. ztomic::store((uint64_t*)&_tick,*(uint64_t*)&m_tmp);
  174. }
  175. //当前时间与存储时间戳的差
  176. uint64_t age()const
  177. {
  178. struct timespec m_tmp;
  179. clock_gettime(CLOCK_MONOTONIC,&m_tmp);
  180. uint64_t start0=ztomic::load((uint64_t*)&_tick);
  181. struct timespec start=*(struct timespec*)&start0;
  182. long long ret=(m_tmp.tv_sec-start.tv_sec)*1000000;
  183. ret+=(m_tmp.tv_nsec-start.tv_nsec)/1000;
  184. return ret/1000;
  185. }
  186. };
  187. log_queue::log_queue()
  188. {
  189. _base=nullptr;
  190. }
  191. log_queue::~log_queue()
  192. {
  193. }
  194. int log_queue::open(const char*name,size_t queue_size)
  195. {
  196. size_t size=1<<20;
  197. while(size<queue_size)
  198. size<<=1;
  199. std_info("share memory fname:%s",name);
  200. if(_shm.open(name,size+sizeof(log_header))<0)
  201. return -1;
  202. _base=(log_header*) _shm.ptr();
  203. _base->_size=size;
  204. _base->_mask=size-1;
  205. return 0;
  206. }
  207. //-----------|-------------|-----------------|>>>>>>>>
  208. // g c n
  209. void log_queue::put(const char*s,int len)
  210. {
  211. _base->print(s,len);
  212. }
  213. int log_queue::get(char*s,int len)
  214. {
  215. return _base->get(s,len);
  216. }
  217. void log_queue::keep_alive()
  218. {
  219. return _base->set_live();
  220. }
  221. bool log_queue::wait_owner()
  222. {
  223. return _base->wait_owner();
  224. }