#include #include #include #include #include #include #include #include #include "log.h" #include "worker.h" #include "message.h" #include "card_base.h" #include "card.h" #include "zloop.h" #include "area.h" #include "clock.h" #include "mine_business.h" #include "bulletin_broad_show.h" #include "event.h" #include "protocol.h" loop_thread::loop_thread () { m_thread.reset(new std::thread(std::bind(&loop_thread::run,this))); } void loop_thread::run() { loop_init(); log_info("worker_thread start...."); while(!check_stop_flag()) { try { ev::dynamic_loop::run(0); } catch(const std::exception&e) { log_error("捕获到异常:%s",e.what()); } catch(...) { log_error("捕获到未知异常"); } } log_info("worker_thread exit...."); } void loop_thread::on_async(const std::list&task_list) { for(task*t:task_list) { do_task(*t); } } void loop_thread::loop_init() { } void loop_thread::join() { m_thread->join(); } void loop_thread::stop() { async_stop(); } loop_thread::~loop_thread() { } struct hash_thread { int m_num_thread=4; void set_num_thread(int num_thread) { m_num_thread=num_thread; } int hash_code(uint32_t card_id) { return card_id*2003%m_num_thread; } }; struct timer_worker_thread: loop_thread { void do_task_0001() { static int version = -1; zclock clock; int v = card_list::instance()->version(); card_list_visit clv; if(v != version){ version = v; clv._flag = true; mine_business::inst()->clear_vehicle(); } card_list::instance()->accept(clv); //bulletin_broad_show::inst()->run_bulletin_board(); mine_business::inst()->run_business(); //event_list::instance()->load_his_data_from_db(false); //log_info("timer_worker_thread use time:%ldus", clock.count_us()); } void on_timeout() { do_task_0001(); } std::unique_ptr card_timer_1s; void loop_init() { card_timer_1s.reset(new ev::timer(*this)); card_timer_1s->set(this); card_timer_1s->start(1,1); } void do_task(task&t) { t.destroy(); } }; std::unique_ptr m_timer_worker; hash_thread g_hash; struct worker_thread: loop_thread ,visitor> { int m_thread_id=0; int m_card_list_version=-1; std::vector> m_local_card_list; worker_thread () { m_local_card_list.reserve(128); } void set_thread_id(int id) { m_thread_id=id; } std::unique_ptr card_timer_1s; void loop_init() { card_timer_1s.reset(new ev::timer(*this)); card_timer_1s->set(this); card_timer_1s->start(1,1); } void update_local_cards() { int version=card_list::instance()->version(); if(m_card_list_version!=version) { m_card_list_version=version; init_local_card_list(); } } void on_timeout() { update_local_cards(); for(auto &c : m_local_card_list) { c->on_timer(); } } bool visit(std::shared_ptr c) { if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以 { m_local_card_list.push_back(c); } return true; } void init_local_card_list() { m_local_card_list.clear(); card_list::instance()->accept(*this); log_info("update local cards,count=%d",m_local_card_list.size()); } /* * 任务队列处理消息队列,调用定位模块进行定位处理 * * */ void do_task(task&t) { switch(t.m_cmd_code) { case 0x843b://tof //log_info("card loc message: 0x%04X", t.m_cmd_code); card_list::instance()->on_message(this,t.body(),false); t.destroy(); break; case CHAR_LOCATEDATA_TOF_INS: log_info("card loc message: 0x%04X", t.m_cmd_code); card_list::instance()->on_message(this, t.body(), false); t.destroy(); break; case CHAR_LOCATEDATA_PDOA: // pdoa实时定位数据 card_list::instance()->on_message(this, t.body(), false); t.destroy(); break; case CHAR_LOCATEDATA_TDOA: // tdoa实时定位数据 case CHAR_LOCATEDATA_TDOA_EXTEND: // 扩展tdoa实时定位数据 case CHAR_LOCATEDATA_TDOA_EXTEND_INS: // 扩展tdoa实时定位数据,带惯导数据 log_info("card loc message: 0x%04X", t.m_cmd_code); card_list::instance()->on_message(this, t.body(), false); break; //card_message::on_loc_message(this,t.m_param1); break; case 0x853b://tof his case 0x873b://tdoa his log_info("site history message: 0x%04X",t.m_cmd_code); card_list::instance()->on_message(this,t.body(),true); //site_message::on_sync(this,t.m_param1); t.destroy(); break; case CHAR_NET_CALI_TIME: case CHAR_CAN_CALI_TIME: log_info("site_cali_time: 0x%04X", t.m_cmd_code); break; case 0x804c://ctrl site message log_info("ctrl site message%04X",t.m_cmd_code); t.destroy(); break; case CHAR_LIGHT_STATUS: log_info("light message: 0x%04X", t.m_cmd_code); break; case CHAR_LIGHT_HEART: log_info("light message: 0x%04X", t.m_cmd_code); break; case 0x10001://区域业务类型修改 { update_local_cards(); for(auto&c:m_local_card_list) { c->get_area_tool()->on_change_business(c,t); } auto&mcb=t.body(); std::shared_ptr a=area_list::instance()->get(mcb.area_id); if(a && a->sub_frozen_count()==2) { a->set_business_list(std::move(mcb.new_list)); a->sub_frozen_count(); } if(mcb.ref_count.fetch_sub(1)==1) { t.destroy(); } break; } case 0x10002://区域业务类型修改 { auto&mcb=t.body(); auto c = card_list::instance()->get(mcb.m_card_id); c->m_display=mcb.m_display; t.destroy(); break; } } } }; struct worker_impl:worker { std::vector> m_threads; std::atomic m_init_flag{-2}; virtual void stop() { if(m_timer_worker) { m_timer_worker->async_stop(); m_timer_worker->join(); m_timer_worker.reset(nullptr); } int exp=0; if(!m_init_flag.compare_exchange_strong (exp,1)) return; for(auto&thr:m_threads) thr->stop(); for(auto&thr:m_threads) thr->join(); } worker_thread& hash(uint32_t i) { return *m_threads[g_hash.hash_code(i)]; } virtual int num_thread() { return std::thread::hardware_concurrency(); } bool running() { return m_init_flag.load()==0; } virtual void request(task*t) { hash(t->m_hash_id).async_request(t); } virtual void broadcast(task*tk) { for(auto&thr:m_threads) thr->async_request(tk); } void init(int num_thread) { int exp=-2; if(m_init_flag.compare_exchange_strong (exp,-1)) { m_threads.resize(num_thread); for(int i=0;iset_thread_id(i); } m_init_flag.store(0); } while(0!=m_init_flag.load()) std::this_thread::yield(); } }; worker_impl _worker_impl; worker*worker::instance() { int num_thread=_worker_impl.num_thread(); if(!_worker_impl.running()) { log_info("worker thread count=%d",num_thread); g_hash.set_num_thread(num_thread); _worker_impl.init(num_thread); m_timer_worker.reset(new timer_worker_thread); } return &_worker_impl; }