// worker.cpp
  1. #include <list>
  2. #include <vector>
  3. #include <mutex>
  4. #include <atomic>
  5. #include <memory>
  6. #include <thread>
  7. #include <ev++.h>
  8. #include "log.h"
  9. #include "worker.h"
  10. #include "message.h"
  11. #include "card_base.h"
  12. #include "card.h"
  13. #include "zloop.h"
  // Maps a card id onto one of the worker threads.
  //
  // The bucket is (card_id * 2003) % m_num_thread, evaluated in unsigned 32-bit
  // arithmetic, so the multiplication wraps around instead of overflowing.
  struct hash_thread
  {
      // Number of buckets. Kept >= 1 by set_num_thread().
      int m_num_thread=4;

      // Sets the number of buckets.
      // Values below 1 are clamped to 1: a zero count would make hash_code()
      // divide by zero, and a negative count would be converted to a huge
      // unsigned modulus and yield indices outside the intended range.
      void set_num_thread(int num_thread)
      {
          m_num_thread = num_thread<1 ? 1 : num_thread;
      }

      // Returns the bucket index for card_id, in [0, m_num_thread).
      int hash_code(uint32_t card_id) const
      {
          // m_num_thread is a public field, so guard again in case it was
          // assigned directly without going through set_num_thread().
          const uint32_t buckets = m_num_thread<1 ? 1u : static_cast<uint32_t>(m_num_thread);
          const uint32_t mixed = static_cast<uint32_t>(card_id*2003u);
          return static_cast<int>(mixed%buckets);
      }
  };
  // Process-wide hash shared by the dispatcher and the worker threads.
  hash_thread g_hash;
  27. struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
  28. {
  29. std::unique_ptr<std::thread> m_thread;
  30. int m_thread_id=0;
  31. int m_card_list_version=-1;
  32. std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
  33. worker_thread ()
  34. {
  35. m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
  36. m_local_card_list.reserve(128);
  37. }
  38. void set_thread_id(int id)
  39. {
  40. m_thread_id=id;
  41. }
  42. void run()
  43. {
  44. ev::timer card_timer_1s(*this);
  45. card_timer_1s.set<worker_thread,&worker_thread::on_timeout>(this);
  46. card_timer_1s.start(1,1);
  47. ev::dynamic_loop::run(0);
  48. log_info("worker_thread exit....");
  49. }
  50. virtual void on_async(const std::list<task*>&task_list)
  51. {
  52. for(task*t:task_list)
  53. {
  54. do_task(*t);
  55. free(t);
  56. }
  57. }
  58. void on_timeout()
  59. {
  60. int version=card_list::instance()->version();
  61. if(m_card_list_version!=version)
  62. {
  63. m_card_list_version=version;
  64. init_local_card_list();
  65. }
  66. for(auto&c:m_local_card_list)
  67. {
  68. c->on_timer();
  69. }
  70. }
  71. bool visit(std::shared_ptr<card_location_base> c)
  72. {
  73. if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
  74. {
  75. m_local_card_list.push_back(c);
  76. }
  77. return true;
  78. }
  79. void init_local_card_list()
  80. {
  81. m_local_card_list.clear();
  82. card_list::instance()->accept(*this);
  83. }
  84. void do_task(const task&t)
  85. {
  86. switch(t.m_cmd_code)
  87. {
  88. case 0x843b://tof
  89. case 0x863b://tdoa
  90. log_info("card loc message%04X",t.m_cmd_code);
  91. card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
  92. //card_message::on_loc_message(this,t.m_param1);
  93. break;
  94. case 0x853b://tof his
  95. case 0x873b://tdoa his
  96. log_info("site history message%04X",t.m_cmd_code);
  97. card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
  98. //site_message::on_sync(this,t.m_param1);
  99. break;
  100. case 0x804c://ctrl site message
  101. log_info("ctrl site message%04X",t.m_cmd_code);
  102. break;
  103. }
  104. }
  105. void join()
  106. {
  107. m_thread->join();
  108. }
  109. void stop()
  110. {
  111. async_stop();
  112. }
  113. };
  114. struct worker_impl:worker
  115. {
  116. std::vector<std::shared_ptr<worker_thread>> m_threads;
  117. std::atomic<int> m_init_flag{-2};
  118. virtual void stop()
  119. {
  120. int exp=0;
  121. if(!m_init_flag.compare_exchange_strong (exp,1))
  122. return;
  123. for(auto&thr:m_threads)
  124. thr->stop();
  125. for(auto&thr:m_threads)
  126. thr->join();
  127. }
  128. worker_thread& hash(uint32_t i)
  129. {
  130. return *m_threads[g_hash.hash_code(i)];
  131. }
  132. virtual void request(task*t)
  133. {
  134. hash(t->m_hash_id).async_request(t);
  135. }
  136. void init(int num_thread)
  137. {
  138. int exp=-2;
  139. if(m_init_flag.compare_exchange_strong (exp,-1))
  140. {
  141. m_threads.resize(num_thread);
  142. for(int i=0;i<num_thread;i++)
  143. {
  144. m_threads[i].reset(new worker_thread());
  145. m_threads[i]->set_thread_id(i);
  146. }
  147. m_init_flag.store(0);
  148. }
  149. while(0!=m_init_flag.load())
  150. std::this_thread::yield();
  151. }
  152. };
  153. worker_impl _worker_impl;
  154. worker*worker::instance()
  155. {
  156. int num_thread=std::thread::hardware_concurrency()*2;
  157. log_info("worker thread count=%d",num_thread);
  158. g_hash.set_num_thread(num_thread);
  159. _worker_impl.init(num_thread);
  160. return &_worker_impl;
  161. }