123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #include <functional>
- #include <list>
- #include <vector>
- #include <mutex>
- #include <atomic>
- #include <memory>
- #include <thread>
- #include <ev++.h>
- #include "log.h"
- #include "worker.h"
- #include "message.h"
- #include "card_base.h"
- #include "card.h"
- #include "zloop.h"
- struct hash_thread
- {
- int m_num_thread=4;
- void set_num_thread(int num_thread)
- {
- m_num_thread=num_thread;
- }
- int hash_code(uint32_t card_id)
- {
- return card_id*2003%m_num_thread;
- }
- };
- hash_thread g_hash;
- struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
- {
- std::unique_ptr<std::thread> m_thread;
- int m_thread_id=0;
- int m_card_list_version=-1;
- std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
- worker_thread ()
- {
- m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
- m_local_card_list.reserve(128);
- }
- void set_thread_id(int id)
- {
- m_thread_id=id;
- }
- void run()
- {
- ev::timer card_timer_1s(*this);
- card_timer_1s.set<worker_thread,&worker_thread::on_timeout>(this);
- card_timer_1s.start(1,1);
- ev::dynamic_loop::run(0);
- log_info("worker_thread exit....");
- }
- virtual void on_async(const std::list<task*>&task_list)
- {
- for(task*t:task_list)
- {
- do_task(*t);
- free(t);
- }
- }
- void on_timeout()
- {
- int version=card_list::instance()->version();
- if(m_card_list_version!=version)
- {
- m_card_list_version=version;
- init_local_card_list();
- }
- for(auto&c:m_local_card_list)
- {
- c->on_timer();
- }
- }
- bool visit(std::shared_ptr<card_location_base> c)
- {
- if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
- {
- m_local_card_list.push_back(c);
- }
- return true;
- }
- void init_local_card_list()
- {
- m_local_card_list.clear();
- card_list::instance()->accept(*this);
- }
- void do_task(const task&t)
- {
- switch(t.m_cmd_code)
- {
- case 0x843b://tof
- case 0x863b://tdoa
- log_info("card loc message%04X",t.m_cmd_code);
- card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
- //card_message::on_loc_message(this,t.m_param1);
- break;
- case 0x853b://tof his
- case 0x873b://tdoa his
- log_info("site history message%04X",t.m_cmd_code);
- card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
- //site_message::on_sync(this,t.m_param1);
- break;
- case 0x804c://ctrl site message
- log_info("ctrl site message%04X",t.m_cmd_code);
- break;
- }
- }
- void join()
- {
- m_thread->join();
- }
- void stop()
- {
- async_stop();
- }
- };
- struct worker_impl:worker
- {
- std::vector<std::shared_ptr<worker_thread>> m_threads;
- std::atomic<int> m_init_flag{-2};
- virtual void stop()
- {
- int exp=0;
- if(!m_init_flag.compare_exchange_strong (exp,1))
- return;
- for(auto&thr:m_threads)
- thr->stop();
- for(auto&thr:m_threads)
- thr->join();
- }
- worker_thread& hash(uint32_t i)
- {
- return *m_threads[g_hash.hash_code(i)];
- }
- virtual void request(task*t)
- {
- hash(t->m_hash_id).async_request(t);
- }
- void init(int num_thread)
- {
- int exp=-2;
- if(m_init_flag.compare_exchange_strong (exp,-1))
- {
- m_threads.resize(num_thread);
- for(int i=0;i<num_thread;i++)
- {
- m_threads[i].reset(new worker_thread());
- m_threads[i]->set_thread_id(i);
- }
- m_init_flag.store(0);
- }
- while(0!=m_init_flag.load())
- std::this_thread::yield();
- }
- };
- worker_impl _worker_impl;
- worker*worker::instance()
- {
- int num_thread=std::thread::hardware_concurrency()*2;
- log_info("worker thread count=%d",num_thread);
- g_hash.set_num_thread(num_thread);
- _worker_impl.init(num_thread);
- return &_worker_impl;
- }
|