#include #include #include #include #include #include #include #include #include "log.h" #include "worker.h" #include "message.h" #include "card_base.h" #include "card.h" #include "zloop.h" #include "area.h" struct hash_thread { int m_num_thread=4; void set_num_thread(int num_thread) { m_num_thread=num_thread; } int hash_code(uint32_t card_id) { return card_id*2003%m_num_thread; } }; hash_thread g_hash; struct worker_thread: zloop ,visitor> { std::unique_ptr m_thread; int m_thread_id=0; int m_card_list_version=-1; std::vector> m_local_card_list; worker_thread () { m_thread.reset(new std::thread(std::bind(&worker_thread::run,this))); m_local_card_list.reserve(128); } void set_thread_id(int id) { m_thread_id=id; } void run() { ev::timer card_timer_1s(*this); card_timer_1s.set(this); card_timer_1s.start(1,1); ev::dynamic_loop::run(0); log_info("worker_thread exit...."); } virtual void on_async(const std::list&task_list) { for(task*t:task_list) { do_task(*t); } } void update_local_cards() { int version=card_list::instance()->version(); if(m_card_list_version!=version) { m_card_list_version=version; init_local_card_list(); } } void on_timeout() { update_local_cards(); for(auto&c:m_local_card_list) { c->on_timer(); } } bool visit(std::shared_ptr c) { if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以 { m_local_card_list.push_back(c); } return true; } void init_local_card_list() { m_local_card_list.clear(); card_list::instance()->accept(*this); } void do_task(task&t) { switch(t.m_cmd_code) { case 0x843b://tof case 0x863b://tdoa log_info("card loc message%04X",t.m_cmd_code); card_list::instance()->on_message(this,t.body(),false); free(&t); //card_message::on_loc_message(this,t.m_param1); break; case 0x853b://tof his case 0x873b://tdoa his log_info("site history message%04X",t.m_cmd_code); card_list::instance()->on_message(this,t.body(),true); //site_message::on_sync(this,t.m_param1); free(&t); break; case 0x804c://ctrl site message log_info("ctrl site message%04X",t.m_cmd_code); free(&t); break; case 0x10001://区域业务类型修改 { update_local_cards(); for(auto&c:m_local_card_list) { c->get_area_tool()->on_change_business(c,t); } auto&mcb=t.body(); std::shared_ptr a=area_list::instance()->get(mcb.area_id); if(a && a->sub_frozen_count()==2) { a->set_business_list(std::move(mcb.new_list)); a->sub_frozen_count(); } if(mcb.ref_count.fetch_sub(1)==1) { free(&t); } } } } void join() { m_thread->join(); } void stop() { async_stop(); } }; struct worker_impl:worker { std::vector> m_threads; std::atomic m_init_flag{-2}; virtual void stop() { int exp=0; if(!m_init_flag.compare_exchange_strong (exp,1)) return; for(auto&thr:m_threads) thr->stop(); for(auto&thr:m_threads) thr->join(); } worker_thread& hash(uint32_t i) { return *m_threads[g_hash.hash_code(i)]; } virtual int num_thread() { return std::thread::hardware_concurrency(); } bool running() { return m_init_flag.load()==0; } virtual void request(task*t) { hash(t->m_hash_id).async_request(t); } virtual void broadcast(task*tk) { for(auto&thr:m_threads) thr->async_request(tk); } void init(int num_thread) { int exp=-2; if(m_init_flag.compare_exchange_strong (exp,-1)) { m_threads.resize(num_thread); for(int i=0;iset_thread_id(i); } m_init_flag.store(0); } while(0!=m_init_flag.load()) std::this_thread::yield(); } }; worker_impl _worker_impl; worker*worker::instance() { int num_thread=_worker_impl.num_thread(); if(!_worker_impl.running()) { log_info("worker thread count=%d",num_thread); g_hash.set_num_thread(num_thread); _worker_impl.init(num_thread); } return &_worker_impl; }