123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- #include <list>
- #include <vector>
- #include <mutex>
- #include <atomic>
- #include <memory>
- #include <thread>
- #include <ev++.h>
- #include "log.h"
- #include "worker.h"
- #include "message.h"
- #include "card.h"
- #include "zloop.h"
- struct worker_thread: zloop<task*>
- {
- std::unique_ptr<std::thread> m_thread;
- card_list*m_card_list;
- worker_thread ()
- {
- m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
- m_card_list=card_list::instance(*this);//*********
- }
- void run()
- {
- ev::dynamic_loop::run(0);
- log_info("worker_thread exit.");
- }
- virtual void on_async(const std::list<task*>&task_list)
- {
- for(task*t:task_list)
- {
- do_task(*t);
- free(t);
- }
- }
- void do_task(const task&t)
- {
- switch(t.m_cmd_code)
- {
- case 0x843b://tof
- case 0x863b://tdoa
- log_info("card loc message%04X",t.m_cmd_code);
- m_card_list->on_message(t.body<message_locinfo>(),false);
- //card_message::on_loc_message(this,t.m_param1);
- break;
- case 0x853b://tof his
- case 0x873b://tdoa his
- log_info("site history message%04X",t.m_cmd_code);
- m_card_list->on_message(t.body<message_locinfo>(),true);
- //site_message::on_sync(this,t.m_param1);
- break;
- case 0x804c://ctrl site message
- log_info("ctrl site message%04X",t.m_cmd_code);
- break;
- }
- }
- void join()
- {
- m_thread->join();
- }
- void stop()
- {
- async_stop();
- }
- };
- struct worker_impl:worker
- {
- std::vector<std::shared_ptr<worker_thread>> m_threads;
- std::atomic<int> m_init_flag{-2};
- virtual void stop()
- {
- for(auto&thr:m_threads)
- thr->stop();
- for(auto&thr:m_threads)
- thr->join();
- }
- worker_thread& hash(uint64_t i)
- {
- return *m_threads[i*2003%m_threads.size()].get();
- }
- virtual void request(task*t)
- {
- hash(t->m_hash_id).async_request(t);
- }
- void init(int num_thread)
- {
- int exp=-2;
- if(m_init_flag.compare_exchange_strong (exp,-1))
- {
- m_threads.resize(num_thread);
- for(int i=0;i<num_thread;i++)
- {
- m_threads[i].reset(new worker_thread());
- }
- m_init_flag.store(0);
- }
- while(0!=m_init_flag.load())
- std::this_thread::yield();
- }
- };
- worker_impl _worker_impl;
- worker*worker::instance()
- {
- _worker_impl.init(4);
- return &_worker_impl;
- }
|