#include #include #include #include #include #include #include #include "log.h" #include "worker.h" #include "message.h" #include "card.h" #include "zloop.h" struct worker_thread: zloop { std::unique_ptr m_thread; worker_thread () { m_thread.reset(new std::thread(std::bind(&worker_thread::run,this))); } void run() { ev::dynamic_loop::run(0); log_info("worker_thread exit...."); } virtual void on_async(const std::list&task_list) { for(task*t:task_list) { do_task(*t); free(t); } } void do_task(const task&t) { switch(t.m_cmd_code) { case 0x843b://tof case 0x863b://tdoa log_info("card loc message%04X",t.m_cmd_code); card_list::instance()->on_message(this,t.body(),false); //card_message::on_loc_message(this,t.m_param1); break; case 0x853b://tof his case 0x873b://tdoa his log_info("site history message%04X",t.m_cmd_code); card_list::instance()->on_message(this,t.body(),true); //site_message::on_sync(this,t.m_param1); break; case 0x804c://ctrl site message log_info("ctrl site message%04X",t.m_cmd_code); break; } } void join() { m_thread->join(); } void stop() { async_stop(); } }; struct worker_impl:worker { std::vector> m_threads; std::atomic m_init_flag{-2}; virtual void stop() { int exp=0; if(!m_init_flag.compare_exchange_strong (exp,1)) return; for(auto&thr:m_threads) thr->stop(); for(auto&thr:m_threads) thr->join(); } worker_thread& hash(uint64_t i) { return *m_threads[i*2003%m_threads.size()].get(); } virtual void request(task*t) { hash(t->m_hash_id).async_request(t); } void init(int num_thread) { int exp=-2; if(m_init_flag.compare_exchange_strong (exp,-1)) { m_threads.resize(num_thread); for(int i=0;i