worker.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #include <functional>
  2. #include <list>
  3. #include <vector>
  4. #include <mutex>
  5. #include <atomic>
  6. #include <memory>
  7. #include <thread>
  8. #include <ev++.h>
  9. #include "log.h"
  10. #include "worker.h"
  11. #include "message.h"
  12. #include "card_base.h"
  13. #include "card.h"
  14. #include "zloop.h"
  15. #include "area.h"
  /// Maps a card id onto a worker-thread index in [0, m_num_thread).
  struct hash_thread
  {
      /// Number of buckets (worker threads). Expected to be >= 1.
      int m_num_thread=4;

      /// Sets the number of buckets.
      /// Values below 1 are clamped to 1: std::thread::hardware_concurrency()
      /// is allowed to return 0, and a zero bucket count would make
      /// hash_code() divide by zero.
      void set_num_thread(int num_thread)
      {
          m_num_thread = num_thread<1 ? 1 : num_thread;
      }

      /// Returns the bucket index for card_id.
      /// The multiplication is done in unsigned 32-bit arithmetic (wrapping),
      /// exactly as before, so existing card-to-thread assignments are unchanged
      /// for every valid bucket count.
      int hash_code(uint32_t card_id)
      {
          // m_num_thread is a public field and may have been written directly,
          // so guard here as well instead of relying on set_num_thread().
          const uint32_t buckets = m_num_thread<1 ? 1u : static_cast<uint32_t>(m_num_thread);
          return static_cast<int>(card_id*2003u%buckets);
      }
  };

  /// Process-wide hash used to route cards/tasks to worker threads.
  hash_thread g_hash;
  29. struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
  30. {
  31. std::unique_ptr<std::thread> m_thread;
  32. int m_thread_id=0;
  33. int m_card_list_version=-1;
  34. std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
  35. worker_thread ()
  36. {
  37. m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
  38. m_local_card_list.reserve(128);
  39. }
  40. void set_thread_id(int id)
  41. {
  42. m_thread_id=id;
  43. }
  44. void run()
  45. {
  46. ev::timer card_timer_1s(*this);
  47. card_timer_1s.set<worker_thread,&worker_thread::on_timeout>(this);
  48. card_timer_1s.start(1,1);
  49. while(!check_stop_flag())
  50. {
  51. try
  52. {
  53. ev::dynamic_loop::run(0);
  54. }
  55. catch(const std::exception&e)
  56. {
  57. log_error("捕获到异常:%s",e.what());
  58. }
  59. catch(...)
  60. {
  61. log_error("捕获到未知异常");
  62. }
  63. }
  64. log_info("worker_thread exit....");
  65. }
  66. virtual void on_async(const std::list<task*>&task_list)
  67. {
  68. for(task*t:task_list)
  69. {
  70. do_task(*t);
  71. }
  72. }
  73. void update_local_cards()
  74. {
  75. int version=card_list::instance()->version();
  76. if(m_card_list_version!=version)
  77. {
  78. m_card_list_version=version;
  79. init_local_card_list();
  80. }
  81. }
  82. void on_timeout()
  83. {
  84. update_local_cards();
  85. for(auto&c:m_local_card_list)
  86. {
  87. c->on_timer();
  88. }
  89. }
  90. bool visit(std::shared_ptr<card_location_base> c)
  91. {
  92. if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
  93. {
  94. m_local_card_list.push_back(c);
  95. }
  96. return true;
  97. }
  98. void init_local_card_list()
  99. {
  100. m_local_card_list.clear();
  101. card_list::instance()->accept(*this);
  102. log_info("update local cards,count=%d",m_local_card_list.size());
  103. }
  104. void do_task(task&t)
  105. {
  106. switch(t.m_cmd_code)
  107. {
  108. case 0x843b://tof
  109. case 0x863b://tdoa
  110. log_info("card loc message%04X",t.m_cmd_code);
  111. card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
  112. t.destroy();
  113. //card_message::on_loc_message(this,t.m_param1);
  114. break;
  115. case 0x853b://tof his
  116. case 0x873b://tdoa his
  117. log_info("site history message%04X",t.m_cmd_code);
  118. card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
  119. //site_message::on_sync(this,t.m_param1);
  120. t.destroy();
  121. break;
  122. case 0x804c://ctrl site message
  123. log_info("ctrl site message%04X",t.m_cmd_code);
  124. t.destroy();
  125. break;
  126. case 0x10001://区域业务类型修改
  127. {
  128. update_local_cards();
  129. for(auto&c:m_local_card_list)
  130. {
  131. c->get_area_tool()->on_change_business(c,t);
  132. }
  133. auto&mcb=t.body<message_change_business>();
  134. std::shared_ptr<area> a=area_list::instance()->get(mcb.area_id);
  135. if(a && a->sub_frozen_count()==2)
  136. {
  137. a->set_business_list(std::move(mcb.new_list));
  138. a->sub_frozen_count();
  139. }
  140. if(mcb.ref_count.fetch_sub(1)==1)
  141. {
  142. t.destroy();
  143. }
  144. }
  145. }
  146. }
  147. void join()
  148. {
  149. m_thread->join();
  150. }
  151. void stop()
  152. {
  153. async_stop();
  154. }
  155. };
  156. struct worker_impl:worker
  157. {
  158. std::vector<std::shared_ptr<worker_thread>> m_threads;
  159. std::atomic<int> m_init_flag{-2};
  160. virtual void stop()
  161. {
  162. int exp=0;
  163. if(!m_init_flag.compare_exchange_strong (exp,1))
  164. return;
  165. for(auto&thr:m_threads)
  166. thr->stop();
  167. for(auto&thr:m_threads)
  168. thr->join();
  169. }
  170. worker_thread& hash(uint32_t i)
  171. {
  172. return *m_threads[g_hash.hash_code(i)];
  173. }
  174. virtual int num_thread()
  175. {
  176. return std::thread::hardware_concurrency();
  177. }
  178. bool running()
  179. {
  180. return m_init_flag.load()==0;
  181. }
  182. virtual void request(task*t)
  183. {
  184. hash(t->m_hash_id).async_request(t);
  185. }
  186. virtual void broadcast(task*tk)
  187. {
  188. for(auto&thr:m_threads)
  189. thr->async_request(tk);
  190. }
  191. void init(int num_thread)
  192. {
  193. int exp=-2;
  194. if(m_init_flag.compare_exchange_strong (exp,-1))
  195. {
  196. m_threads.resize(num_thread);
  197. for(int i=0;i<num_thread;i++)
  198. {
  199. m_threads[i].reset(new worker_thread());
  200. m_threads[i]->set_thread_id(i);
  201. }
  202. m_init_flag.store(0);
  203. }
  204. while(0!=m_init_flag.load())
  205. std::this_thread::yield();
  206. }
  207. };
  208. worker_impl _worker_impl;
  209. worker*worker::instance()
  210. {
  211. int num_thread=_worker_impl.num_thread();
  212. if(!_worker_impl.running())
  213. {
  214. log_info("worker thread count=%d",num_thread);
  215. g_hash.set_num_thread(num_thread);
  216. _worker_impl.init(num_thread);
  217. }
  218. return &_worker_impl;
  219. }