worker.cpp 6.2 KB


  1. #include <functional>
  2. #include <list>
  3. #include <vector>
  4. #include <mutex>
  5. #include <atomic>
  6. #include <memory>
  7. #include <thread>
  8. #include <ev++.h>
  9. #include "log.h"
  10. #include "worker.h"
  11. #include "message.h"
  12. #include "card_base.h"
  13. #include "card.h"
  14. #include "zloop.h"
  15. #include "area.h"
  16. #include "clock.h"
  17. #include "mine_business.h"
  18. #include "bulletin_broad_show.h"
  19. #include "event.h"
  20. loop_thread::loop_thread ()
  21. {
  22. m_thread.reset(new std::thread(std::bind(&loop_thread::run,this)));
  23. }
  24. void loop_thread::run()
  25. {
  26. loop_init();
  27. log_info("worker_thread start....");
  28. while(!check_stop_flag())
  29. {
  30. try
  31. {
  32. ev::dynamic_loop::run(0);
  33. }
  34. catch(const std::exception&e)
  35. {
  36. log_error("捕获到异常:%s",e.what());
  37. }
  38. catch(...)
  39. {
  40. log_error("捕获到未知异常");
  41. }
  42. }
  43. log_info("worker_thread exit....");
  44. }
  45. void loop_thread::on_async(const std::list<task*>&task_list)
  46. {
  47. for(task*t:task_list)
  48. {
  49. do_task(*t);
  50. }
  51. }
  52. void loop_thread::loop_init()
  53. {
  54. }
  55. void loop_thread::join()
  56. {
  57. m_thread->join();
  58. }
  59. void loop_thread::stop()
  60. {
  61. async_stop();
  62. }
  63. loop_thread::~loop_thread()
  64. {
  65. }
  // Maps a card id onto a worker-thread index in [0, m_num_thread).
  struct hash_thread
  {
      // Number of buckets (worker threads). Expected to be >= 1.
      int m_num_thread=4;

      // Sets the bucket count. Values below 1 are clamped to 1 so that
      // hash_code() never divides by zero or yields an out-of-range index.
      void set_num_thread(int num_thread)
      {
          m_num_thread = num_thread>0 ? num_thread : 1;
      }

      // Returns the bucket for card_id. The multiplication is done in unsigned
      // 32-bit arithmetic (wrapping on overflow), exactly as before.
      int hash_code(uint32_t card_id)
      {
          // m_num_thread is public and may have been written directly; guard
          // against a non-positive value here as well.
          const uint32_t buckets = m_num_thread>0 ? static_cast<uint32_t>(m_num_thread) : 1u;
          return static_cast<int>(card_id*2003u%buckets);
      }
  };
  78. struct timer_worker_thread: loop_thread
  79. {
  80. void do_task_0001()
  81. {
  82. static int version = -1;
  83. zclock clock;
  84. int v = card_list::instance()->version();
  85. card_list_visit clv;
  86. if(v != version){
  87. version=v;
  88. clv._flag=true;
  89. mine_business::inst()->clear_vehicle();
  90. }
  91. card_list::instance()->accept(clv);
  92. bulletin_broad_show::inst()->run_bulletin_board();
  93. mine_business::inst()->run_business();
  94. event_list::instance()->load_his_data_from_db(false);
  95. log_info("timer_worker_thread use time:%ldus", clock.count_us());
  96. }
  97. void on_timeout()
  98. {
  99. do_task_0001();
  100. }
  101. std::unique_ptr<ev::timer> card_timer_1s;
  102. void loop_init()
  103. {
  104. card_timer_1s.reset(new ev::timer(*this));
  105. card_timer_1s->set<timer_worker_thread,&timer_worker_thread::on_timeout>(this);
  106. card_timer_1s->start(1,1);
  107. }
  108. void do_task(task&t)
  109. {
  110. t.destroy();
  111. }
  112. };
  113. std::unique_ptr<timer_worker_thread> m_timer_worker;
  114. hash_thread g_hash;
  115. struct worker_thread: loop_thread ,visitor<std::shared_ptr<card_location_base>>
  116. {
  117. int m_thread_id=0;
  118. int m_card_list_version=-1;
  119. std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
  120. worker_thread ()
  121. {
  122. m_local_card_list.reserve(128);
  123. }
  124. void set_thread_id(int id)
  125. {
  126. m_thread_id=id;
  127. }
  128. std::unique_ptr<ev::timer> card_timer_1s;
  129. void loop_init()
  130. {
  131. card_timer_1s.reset(new ev::timer(*this));
  132. card_timer_1s->set<worker_thread,&worker_thread::on_timeout>(this);
  133. card_timer_1s->start(1,1);
  134. }
  135. void update_local_cards()
  136. {
  137. int version=card_list::instance()->version();
  138. if(m_card_list_version!=version)
  139. {
  140. m_card_list_version=version;
  141. init_local_card_list();
  142. }
  143. }
  144. void on_timeout()
  145. {
  146. update_local_cards();
  147. for(auto&c:m_local_card_list)
  148. {
  149. c->on_timer();
  150. }
  151. }
  152. bool visit(std::shared_ptr<card_location_base> c)
  153. {
  154. if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
  155. {
  156. m_local_card_list.push_back(c);
  157. }
  158. return true;
  159. }
  160. void init_local_card_list()
  161. {
  162. m_local_card_list.clear();
  163. card_list::instance()->accept(*this);
  164. log_info("update local cards,count=%d",m_local_card_list.size());
  165. }
  166. void do_task(task&t)
  167. {
  168. switch(t.m_cmd_code)
  169. {
  170. case 0x843b://tof
  171. case 0x863b://tdoa
  172. log_info("card loc message%04X",t.m_cmd_code);
  173. card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
  174. t.destroy();
  175. //card_message::on_loc_message(this,t.m_param1);
  176. break;
  177. case 0x853b://tof his
  178. case 0x873b://tdoa his
  179. log_info("site history message%04X",t.m_cmd_code);
  180. card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
  181. //site_message::on_sync(this,t.m_param1);
  182. t.destroy();
  183. break;
  184. case 0x804c://ctrl site message
  185. log_info("ctrl site message%04X",t.m_cmd_code);
  186. t.destroy();
  187. break;
  188. case 0x10001://区域业务类型修改
  189. {
  190. update_local_cards();
  191. for(auto&c:m_local_card_list)
  192. {
  193. c->get_area_tool()->on_change_business(c,t);
  194. }
  195. auto&mcb=t.body<message_change_business>();
  196. std::shared_ptr<area> a=area_list::instance()->get(mcb.area_id);
  197. if(a && a->sub_frozen_count()==2)
  198. {
  199. a->set_business_list(std::move(mcb.new_list));
  200. a->sub_frozen_count();
  201. }
  202. if(mcb.ref_count.fetch_sub(1)==1)
  203. {
  204. t.destroy();
  205. }
  206. break;
  207. }
  208. case 0x10002://区域业务类型修改
  209. {
  210. auto&mcb=t.body<message_change_card_display>();
  211. auto c = card_list::instance()->get(mcb.m_card_id);
  212. c->m_display=mcb.m_display;
  213. t.destroy();
  214. break;
  215. }
  216. }
  217. }
  218. };
  219. struct worker_impl:worker
  220. {
  221. std::vector<std::shared_ptr<worker_thread>> m_threads;
  222. std::atomic<int> m_init_flag{-2};
  223. virtual void stop()
  224. {
  225. if(m_timer_worker)
  226. {
  227. m_timer_worker->async_stop();
  228. m_timer_worker->join();
  229. m_timer_worker.reset(nullptr);
  230. }
  231. int exp=0;
  232. if(!m_init_flag.compare_exchange_strong (exp,1))
  233. return;
  234. for(auto&thr:m_threads)
  235. thr->stop();
  236. for(auto&thr:m_threads)
  237. thr->join();
  238. }
  239. worker_thread& hash(uint32_t i)
  240. {
  241. return *m_threads[g_hash.hash_code(i)];
  242. }
  243. virtual int num_thread()
  244. {
  245. return std::thread::hardware_concurrency();
  246. }
  247. bool running()
  248. {
  249. return m_init_flag.load()==0;
  250. }
  251. virtual void request(task*t)
  252. {
  253. hash(t->m_hash_id).async_request(t);
  254. }
  255. virtual void broadcast(task*tk)
  256. {
  257. for(auto&thr:m_threads)
  258. thr->async_request(tk);
  259. }
  260. void init(int num_thread)
  261. {
  262. int exp=-2;
  263. if(m_init_flag.compare_exchange_strong (exp,-1))
  264. {
  265. m_threads.resize(num_thread);
  266. for(int i=0;i<num_thread;i++)
  267. {
  268. m_threads[i].reset(new worker_thread());
  269. m_threads[i]->set_thread_id(i);
  270. }
  271. m_init_flag.store(0);
  272. }
  273. while(0!=m_init_flag.load())
  274. std::this_thread::yield();
  275. }
  276. };
  277. worker_impl _worker_impl;
  278. worker*worker::instance()
  279. {
  280. int num_thread=_worker_impl.num_thread();
  281. if(!_worker_impl.running())
  282. {
  283. log_info("worker thread count=%d",num_thread);
  284. g_hash.set_num_thread(num_thread);
  285. _worker_impl.init(num_thread);
  286. m_timer_worker.reset(new timer_worker_thread);
  287. }
  288. return &_worker_impl;
  289. }