worker.cpp 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #include <list>
  2. #include <vector>
  3. #include <mutex>
  4. #include <atomic>
  5. #include <memory>
  6. #include <thread>
  7. #include <ev++.h>
  8. #include "log.h"
  9. #include "worker.h"
  10. #include "message.h"
  11. #include "card.h"
  12. #include "zloop.h"
  13. #include "site_message_handle.h"
  14. struct worker_thread: zloop<task*>
  15. {
  16. std::unique_ptr<std::thread> m_thread;
  17. worker_thread ()
  18. {
  19. m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
  20. }
  21. void run()
  22. {
  23. ev::dynamic_loop::run(0);
  24. log_info("worker_thread exit....");
  25. }
  26. virtual void on_async(const std::list<task*>&task_list)
  27. {
  28. for(task*t:task_list)
  29. {
  30. do_task(*t);
  31. free(t);
  32. }
  33. }
  34. void do_task(const task&t)
  35. {
  36. switch(t.m_cmd_code)
  37. {
  38. case 0x843b://tof
  39. case 0x863b://tdoa
  40. log_info("card loc message%04X",t.m_cmd_code);
  41. if (t.m_site_data == 1)
  42. site_message_handle::instance()->parse_data_locate_reader(t.m_cmd_code,t.body<message_siteinfo>(),false);
  43. else
  44. card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
  45. //card_message::on_loc_message(this,t.m_param1);
  46. break;
  47. case 0x853b://tof his
  48. case 0x873b://tdoa his
  49. log_info("site history message%04X",t.m_cmd_code);
  50. card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
  51. //site_message::on_sync(this,t.m_param1);
  52. break;
  53. case 0x804c://ctrl site message
  54. log_info("ctrl site message%04X",t.m_cmd_code);
  55. break;
  56. }
  57. }
  58. void join()
  59. {
  60. m_thread->join();
  61. }
  62. void stop()
  63. {
  64. async_stop();
  65. }
  66. };
  67. struct worker_impl:worker
  68. {
  69. std::vector<std::shared_ptr<worker_thread>> m_threads;
  70. std::atomic<int> m_init_flag{-2};
  71. virtual void stop()
  72. {
  73. int exp=0;
  74. if(!m_init_flag.compare_exchange_strong (exp,1))
  75. return;
  76. for(auto&thr:m_threads)
  77. thr->stop();
  78. for(auto&thr:m_threads)
  79. thr->join();
  80. }
  81. worker_thread& hash(uint64_t i)
  82. {
  83. return *m_threads[i*2003%m_threads.size()];
  84. }
  85. virtual void request(task*t)
  86. {
  87. hash(t->m_hash_id).async_request(t);
  88. }
  89. void init(int num_thread)
  90. {
  91. int exp=-2;
  92. if(m_init_flag.compare_exchange_strong (exp,-1))
  93. {
  94. m_threads.resize(num_thread);
  95. for(int i=0;i<num_thread;i++)
  96. {
  97. m_threads[i].reset(new worker_thread());
  98. }
  99. m_init_flag.store(0);
  100. }
  101. while(0!=m_init_flag.load())
  102. std::this_thread::yield();
  103. }
  104. };
  105. worker_impl _worker_impl;
  106. worker*worker::instance()
  107. {
  108. int num_thread=std::thread::hardware_concurrency();
  109. log_info("worker thread count=%d",num_thread*2);
  110. _worker_impl.init(num_thread<<1);
  111. return &_worker_impl;
  112. }