Browse Source

在worker线程中,添加定位卡每秒的定时调用

zzj 6 years ago
parent
commit
6ca4cee23d
3 changed files with 97 additions and 10 deletions
  1. 9 3
      area_business.cpp
  2. 72 7
      worker.cpp
  3. 16 0
      write-copy.h

+ 9 - 3
area_business.cpp

@@ -63,7 +63,6 @@ struct area_business_post_area:area_business
 	//将推送区域信息加入人员数据
 	virtual void on_enter(std::shared_ptr<area_hover>&a,std::shared_ptr<card_location_base>&c,std::shared_ptr<business_data> ptr)
 	{
-	
 	}
 
 	//从人员数据中清除区域信息
@@ -79,6 +78,11 @@ struct area_business_post_area:area_business
 */
 struct area_business_speed_checker:area_business
 {
+	struct speed_checker_data:business_data
+	{
+		std::vector<double> speed;
+	};
+
 	virtual int area_business_type()
 	{
 		return 4;
@@ -88,13 +92,13 @@ struct area_business_speed_checker:area_business
 	//在ptr对象中初始化超速检测所需的对象
 	virtual void on_enter(std::shared_ptr<area_hover>&a,std::shared_ptr<card_location_base>&c,std::shared_ptr<business_data> ptr)
 	{
-	
 	}
 
 	//根据超速检测的策略,进行超速判断,超速时进行告警
 	//建议使用最近M秒内N秒超时进行判断,M=20,N=15,策略数据记录在ptr中
 	virtual void on_hover(std::shared_ptr<area_hover>&a,std::shared_ptr<card_location_base>&c,std::shared_ptr<business_data> ptr)
 	{
+
 	
 	}
 
@@ -255,6 +259,7 @@ struct area_business_person_attendance:area_business
 */
 struct area_business_count_checker:area_business
 {
+
 	virtual int area_business_type()
 	{
 		return 3;
@@ -263,12 +268,13 @@ struct area_business_count_checker:area_business
 	//增加计数,并进行判断
 	virtual void on_enter(std::shared_ptr<area_hover>&a,std::shared_ptr<card_location_base>&c,std::shared_ptr<business_data> ptr)
 	{
-	
+
 	}
 
 	//减少计数
 	virtual void on_leave(std::shared_ptr<area_hover>&a,std::shared_ptr<card_location_base>&c,std::shared_ptr<business_data> ptr)
 	{
+		
 	
 	}
 };

+ 72 - 7
worker.cpp

@@ -10,20 +10,52 @@
 #include "log.h"
 #include "worker.h"
 #include "message.h"
+#include "card_base.h"
 #include "card.h"
 #include "zloop.h"
 
-struct worker_thread: zloop<task*>
+struct hash_thread
 {
-	std::unique_ptr<std::thread> m_thread;
+	int m_num_thread=4;
+
+	void set_num_thread(int num_thread)
+	{
+		m_num_thread=num_thread;
+	}
+
+	int hash_code(uint32_t card_id)
+	{
+		return card_id*2003%m_num_thread;
+	}
+};
+
+hash_thread g_hash;
 
+struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
+{
+	std::unique_ptr<std::thread> m_thread;
+	int  m_thread_id=0;
+	int  m_card_list_version=-1;
+	std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
 	worker_thread ()
 	{
 		m_thread.reset(new std::thread(std::bind(&worker_thread::run,this)));
+		m_local_card_list.reserve(128);
+	}
+
+	void set_thread_id(int id)
+	{
+		m_thread_id=id;
 	}
 
 	void run()
 	{
+		ev::timer card_timer_1s(*this);
+
+		card_timer_1s.set(1,1);
+		card_timer_1s.set<worker_thread,&worker_thread::on_timeout>(this);
+		card_timer_1s.start();
+
 		ev::dynamic_loop::run(0);
 		log_info("worker_thread exit....");
 	}
@@ -37,6 +69,37 @@ struct worker_thread: zloop<task*>
 		}
 	}
 
+	void on_timeout()
+	{
+		int version=card_list::instance()->version();
+		if(m_card_list_version!=version)
+		{
+			m_card_list_version=version;
+			init_local_card_list();
+		}
+
+		for(auto&c:m_local_card_list)
+		{
+			c->on_timer();
+		}
+	}
+
+	bool visit(std::shared_ptr<card_location_base> c)
+	{
+		if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
+		{
+			m_local_card_list.push_back(c);
+		}
+
+		return true;
+	}
+
+	void init_local_card_list()
+	{
+		m_local_card_list.clear();
+		card_list::instance()->accept(*this);
+	}
+
 	void do_task(const task&t)
 	{
 		switch(t.m_cmd_code)
@@ -89,9 +152,9 @@ struct worker_impl:worker
 			thr->join();
 	}
 
-	worker_thread& hash(uint64_t i)
+	worker_thread& hash(uint32_t i)
 	{
-		return *m_threads[i*2003%m_threads.size()];
+		return *m_threads[g_hash.hash_code(i)];
 	}
 
 	virtual void request(task*t)
@@ -108,6 +171,7 @@ struct worker_impl:worker
 			for(int i=0;i<num_thread;i++)
 			{
 				m_threads[i].reset(new worker_thread());
+				m_threads[i]->set_thread_id(i);
 			}
 			m_init_flag.store(0);
 		}
@@ -120,10 +184,11 @@ struct worker_impl:worker
 worker_impl _worker_impl;
 worker*worker::instance()
 {
-	int num_thread=std::thread::hardware_concurrency();
+	int num_thread=std::thread::hardware_concurrency()*2;
 
-	log_info("worker thread count=%d",num_thread*2);
-	_worker_impl.init(num_thread<<1);
+	log_info("worker thread count=%d",num_thread);
+	g_hash.set_num_thread(num_thread);
+	_worker_impl.init(num_thread);
 	return &_worker_impl;
 }
 

+ 16 - 0
write-copy.h

@@ -5,6 +5,7 @@
 #include <algorithm>
 #include <memory>
 #include <mutex>
+#include <atomic>
 #include "visit.h"
 
 template<typename T,typename K,typename V> 
@@ -19,9 +20,21 @@ struct write_copy_base:acceptor<V>
 	}
 
 	std::unordered_map<K,V> m_map;
+	int m_version=0;
 	write_copy_base()
 	{}
 
+	int version()const
+	{
+		return m_version;
+	}
+
+	int set_version(int v)
+	{
+		return m_version=++v;
+	}
+
+
 	V get(K k)const
 	{
 		return m_map[k];
@@ -40,6 +53,8 @@ struct write_copy_base:acceptor<V>
 	{
 		std::shared_ptr<T> ret=std::make_shared<T>();
 		ret->m_map.insert(m_map.begin(),m_map.end());
+		ret->set_version(m_version);
+
 		return ret;
 	}
 
@@ -111,6 +126,7 @@ struct single_base:write_copy_base<T,K,V>
 	}
 
 	std::mutex m_mutex;
+
 	void add(K k,V c)
 	{
 		std::lock_guard<std::mutex> lock(m_mutex);