worker.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #include <list>
  2. #include <vector>
  3. #include <mutex>
  4. #include <atomic>
  5. #include <memory>
  6. #include <thread>
  7. #include <ev++.h>
  8. #include "log.h"
  9. #include "worker.h"
  10. #include "message.h"
  11. #include "card_base.h"
  12. #include "card.h"
  13. #include "zloop.h"
  14. #include "site_message_handle.h"
  15. struct hash_thread
  16. {
  17. int m_num_thread=4;
  18. void set_num_thread(int num_thread)
  19. {
  20. m_num_thread=num_thread;
  21. }
  22. int hash_code(uint32_t card_id)
  23. {
  24. return card_id*2003%m_num_thread;
  25. }
  26. };
  27. hash_thread g_hash;
  28. struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
  29. {
  30. std::unique_ptr<std::thread> m_thread;
  31. int m_thread_id=0;
  32. int m_card_list_version=-1;
  33. std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
  34. worker_thread ()
  35. {
  36. m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
  37. m_local_card_list.reserve(128);
  38. }
  39. void set_thread_id(int id)
  40. {
  41. m_thread_id=id;
  42. }
  43. void run()
  44. {
  45. ev::timer card_timer_1s(*this);
  46. card_timer_1s.set<worker_thread,&worker_thread::on_timeout>(this);
  47. card_timer_1s.start(1,1);
  48. ev::dynamic_loop::run(0);
  49. log_info("worker_thread exit....");
  50. }
  51. virtual void on_async(const std::list<task*>&task_list)
  52. {
  53. for(task*t:task_list)
  54. {
  55. do_task(*t);
  56. free(t);
  57. }
  58. }
  59. void on_timeout()
  60. {
  61. int version=card_list::instance()->version();
  62. if(m_card_list_version!=version)
  63. {
  64. m_card_list_version=version;
  65. init_local_card_list();
  66. }
  67. for(auto&c:m_local_card_list)
  68. {
  69. c->on_timer();
  70. }
  71. }
  72. bool visit(std::shared_ptr<card_location_base> c)
  73. {
  74. if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
  75. {
  76. m_local_card_list.push_back(c);
  77. }
  78. return true;
  79. }
  80. void init_local_card_list()
  81. {
  82. m_local_card_list.clear();
  83. card_list::instance()->accept(*this);
  84. }
  85. void do_task(const task&t)
  86. {
  87. switch(t.m_cmd_code)
  88. {
  89. case 0x843b://tof
  90. case 0x863b://tdoa
  91. log_info("card loc message%04X",t.m_cmd_code);
  92. card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
  93. //card_message::on_loc_message(this,t.m_param1);
  94. break;
  95. case 0x853b://tof his
  96. case 0x873b://tdoa his
  97. log_info("site history message%04X",t.m_cmd_code);
  98. card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
  99. //site_message::on_sync(this,t.m_param1);
  100. break;
  101. case 0x804c://ctrl site message
  102. log_info("ctrl site message%04X",t.m_cmd_code);
  103. break;
  104. }
  105. }
  106. void join()
  107. {
  108. m_thread->join();
  109. }
  110. void stop()
  111. {
  112. async_stop();
  113. }
  114. };
  115. struct worker_impl:worker
  116. {
  117. std::vector<std::shared_ptr<worker_thread>> m_threads;
  118. std::atomic<int> m_init_flag{-2};
  119. virtual void stop()
  120. {
  121. int exp=0;
  122. if(!m_init_flag.compare_exchange_strong (exp,1))
  123. return;
  124. for(auto&thr:m_threads)
  125. thr->stop();
  126. for(auto&thr:m_threads)
  127. thr->join();
  128. }
  129. worker_thread& hash(uint32_t i)
  130. {
  131. return *m_threads[g_hash.hash_code(i)];
  132. }
  133. virtual void request(task*t)
  134. {
  135. hash(t->m_hash_id).async_request(t);
  136. }
  137. void init(int num_thread)
  138. {
  139. int exp=-2;
  140. if(m_init_flag.compare_exchange_strong (exp,-1))
  141. {
  142. m_threads.resize(num_thread);
  143. for(int i=0;i<num_thread;i++)
  144. {
  145. m_threads[i].reset(new worker_thread());
  146. m_threads[i]->set_thread_id(i);
  147. }
  148. m_init_flag.store(0);
  149. }
  150. while(0!=m_init_flag.load())
  151. std::this_thread::yield();
  152. }
  153. };
  154. worker_impl _worker_impl;
  155. worker*worker::instance()
  156. {
  157. int num_thread=std::thread::hardware_concurrency()*2;
  158. log_info("worker thread count=%d",num_thread);
  159. g_hash.set_num_thread(num_thread);
  160. _worker_impl.init(num_thread);
  161. return &_worker_impl;
  162. }