Browse Source

1、区域修改业务类型,
2、屏蔽分站发送历史时刻数据
3、优化单CT处理逻辑

zzj 6 years ago
parent
commit
6466e775ff
11 changed files with 380 additions and 79 deletions
  1. 118 11
      area.cpp
  2. 26 0
      area.h
  3. 1 0
      card_base.cpp
  4. 45 37
      card_message_handle.cpp
  5. 7 0
      card_message_handle.h
  6. 17 5
      net-service.cpp
  7. 16 1
      test.cpp
  8. 56 7
      worker.cpp
  9. 16 0
      worker.h
  10. 76 18
      znet.cpp
  11. 2 0
      znet.h

+ 118 - 11
area.cpp

@@ -4,6 +4,7 @@
 #include "log.h"
 
 #include <area.h>
+#include "worker.h"
 #include "point.h"
 #include "tool_time.h"
 #include "common_tool.h"
@@ -81,9 +82,35 @@ area::area(int id,int limit_count_person, int limit_time_person,double scale,int
 		m_event_vehicle_count = false;
 		m_event_person_show_count = false;
 		m_event_vehicle_show_count = false;
+
+		m_frozen_count.store(0);
     }
 
 
+void area::change_business(uint32_t new_bits)
+{
+	worker*w=worker::instance();
+
+	uint32_t del=((m_area_type^new_bits)|m_area_type)^m_area_type;
+	uint32_t add=((m_area_type^new_bits)|new_bits)^new_bits;
+
+	task*t =task::alloc<message_change_business>();
+
+	t->m_cmd_code=0x10001; //区域业务类型修改的编码
+	t->m_hash_id=0;        //这个不用
+
+	auto&mcb=t->body<message_change_business>();
+	mcb.area_id=m_id;
+	mcb.del_list=area_business::get_instance_list(del,m_id);
+	mcb.add_list=area_business::get_instance_list(add,m_id);
+	mcb.new_list=area_business::get_instance_list(new_bits,m_id);
+	mcb.ref_count.store(w->num_thread());
+
+	add_frozen_count(w->num_thread()+1);
+
+	w->broadcast(t);
+}
+
 void area::on_hover(const std::shared_ptr<area_hover>&a,const std::shared_ptr<card_location_base>&c)
 {
 	a->m_last_time=tool_time::now_to_ms();
@@ -389,15 +416,19 @@ void area_list::init_from_db(int id/*=-1*/)
         else
         {
             auto tmp_ptr = area_list::instance()->get(id);
+			bool newobj=false;
+			
             if(!tmp_ptr)
             {
 				tmp_ptr = create(area_type_id,area_id,over_count_person,over_time_person, scale,map_id,b_type);
-                area_list::instance()->add(id, tmp_ptr);
+				newobj=true;
             }
+
             tmp_ptr->update(over_count_person, over_time_person,scale,map_id,area_type_id, over_count_vehicle,over_time_vehicle);
             tmp_ptr->m_speed=std::move(map_);
             tmp_ptr->m_bound=init_path(path,area_id);
             tmp_ptr->m_is_work_area = is_work_area;
+
             for(const auto &p : tmp_ptr->m_bound)
                 log_info("point:area_id:%d--x:%.2f,y:%.2f",area_id,p.x,p.y);
 
@@ -405,6 +436,15 @@ void area_list::init_from_db(int id/*=-1*/)
                      ,area_type_id:%d,over_count_vehicle:%d,over_time_vehicle:%d",
                      id,over_count_person, over_time_person,scale,map_id,area_type_id,
                      over_count_vehicle,over_time_vehicle);
+
+			if(newobj)
+			{
+				area_list::instance()->add(id, tmp_ptr);
+			}
+			else
+			{
+				tmp_ptr->change_business(b_type);
+			}
         }
     }
 
@@ -481,9 +521,11 @@ void area_tool::on_point(const std::shared_ptr<card_location_base>& c,const poin
 		areas.push_back(area_);
 	}
 	else
+	{
 		std::sort(areas.begin(),areas.end(),[](const std::shared_ptr<area>&l,const std::shared_ptr<area>&r){
 				return l->id()<r->id();
 				});
+	}
 
 	auto c1=m_hover_list.begin(),ce=m_hover_list.end();
 	auto a1=areas.begin() ,ae=areas.end();
@@ -492,35 +534,72 @@ void area_tool::on_point(const std::shared_ptr<card_location_base>& c,const poin
 
 	while (c1!=ce && a1!=ae)
 	{
-		if ((*c1)->id()<(*a1)->id()) 
+		if ((*c1)->id()<(*a1)->id()) //离开区域
 		{ 
-			(*c1)->m_area->on_leave(*c1, c);
+			if((*c1)->m_area->get_frozen_count()==0)//如果该区域未在修改中,调用on_leave
+			{
+				(*c1)->m_area->on_leave(*c1, c);
+			}
+			else//否则,该区域持续保存在卡的区域列表,等待修改完成
+			{
+				nlist.push_back(*c1);
+				log_warn("丢弃离开区域事件,cardid=%d,area_id=%d",c->m_id,(*c1)->id());
+			}
 			++c1;
 		}
-		else if ((*a1)->id()<(*c1)->id()) 
+		else if ((*a1)->id()<(*c1)->id()) //进入新区域
 		{
-			nlist.push_back(std::make_shared<area_hover>(*a1,pt));
-			(*a1)->on_enter(nlist.back(),c);
+			if((*a1)->get_frozen_count()==0)////如果该区域未在修改中,执行正常逻辑
+			{
+				nlist.push_back(std::make_shared<area_hover>(*a1,pt));
+				(*a1)->on_enter(nlist.back(),c);
+			}
+			else//丢弃本次进入的点,等待修改完成
+			{
+				log_warn("丢弃进入区域事件,cardid=%d,area_id=%d",c->m_id,(*a1)->id());
+			}
 			++a1;
 		}
-		else 
+		else //需要持续的区域
 		{ 
 			nlist.push_back(*c1);
-			(*c1)->m_area->on_hover(*c1,c);
+			if((*c1)->m_area->get_frozen_count()==0)//正常情况
+			{
+				(*c1)->m_area->on_hover(*c1,c);
+			}
+			else//丢弃本次on_hover时间
+			{
+				log_warn("丢弃区域行走事件,cardid=%d,area_id=%d",c->m_id,(*c1)->id());
+			}
 			++c1,++a1;
 		}
 	}
 
 	while(c1!=ce)
 	{
-		(*c1)->m_area->on_leave(*c1, c);
+		if((*c1)->m_area->get_frozen_count()==0)
+		{
+			(*c1)->m_area->on_leave(*c1, c);
+		}
+		else
+		{
+			log_warn("丢弃离开区域事件,cardid=%d,area_id=%d",c->m_id,(*c1)->id());
+		}
 		++c1;
 	}
 
 	while(a1!=ae)
 	{
-		nlist.push_back(std::make_shared<area_hover>(*a1,pt));
-		(*a1)->on_enter(nlist.back(),c);
+		if((*a1)->get_frozen_count()==0)
+		{
+			nlist.push_back(std::make_shared<area_hover>(*a1,pt));
+			(*a1)->on_enter(nlist.back(),c);
+		}
+		else
+		{
+			log_warn("丢弃进入区域事件,cardid=%d,area_id=%d",c->m_id,(*a1)->id());
+		}
+
 		++a1;
 	}
 
@@ -577,6 +656,7 @@ void area_tool::set_area_info(int mapid,double scale,int areaid,const point &pt,
 		log_info("wrong type..");
 	}
 }
+
 void area_tool::set_area_info(int mapid,int areaid,const point &pt,uint64_t t)
 {
 	auto lm = Landmark_list::instance()->get(mapid,areaid,pt);
@@ -588,3 +668,30 @@ void area_tool::set_area_info(int mapid,int areaid,const point &pt,uint64_t t)
 	m_area_info[areaid].swap(tinfo);
 }
 
+
+void area_tool::on_change_business(const std::shared_ptr<card_location_base>& c, const task&t)
+{
+	auto&mcb=t.body<message_change_business>();
+
+	auto i=std::lower_bound(m_hover_list.begin(),m_hover_list.end(),mcb.area_id ,[&mcb](std::shared_ptr<area_hover>&ah,int id){
+		return ah->m_area->m_id<id;
+	});
+
+	if(i==m_hover_list.end())
+		return ;
+	
+	if((*i)->m_area->m_id!=mcb.area_id)
+		return;
+	
+	log_info("调用调整的区域业务,card_id=%d,area_id=%d", c->m_id, mcb.area_id);
+	for(auto&biz:mcb.add_list)
+	{
+		biz->on_enter(*i,c,(*i)->get_business_data(biz->area_business_type()));
+	}
+
+	for(auto&biz:mcb.del_list)
+	{
+		biz->on_leave(*i,c,(*i)->get_business_data(biz->area_business_type()));
+	}
+}
+

+ 26 - 0
area.h

@@ -63,6 +63,29 @@ struct area
     }
 public:
 	std::vector<area_business*> m_area_business_list;
+	std::atomic<int> m_frozen_count;
+
+	void set_business_list(std::vector<area_business*>&&business_list)
+	{
+		m_area_business_list=business_list;	
+	}
+
+	int  get_frozen_count()
+	{
+		return m_frozen_count.load(std::memory_order_acquire);
+	}
+
+	int add_frozen_count(int deta=1)
+	{
+		return m_frozen_count.fetch_add(deta,std::memory_order_release);
+	}
+
+	int sub_frozen_count(int deta=1)
+	{
+		return add_frozen_count(-deta);
+	}
+
+	void change_business(uint32_t new_bits);
 public:
     std::vector<point> m_bound;
 	//数据库唯一ID
@@ -182,6 +205,7 @@ struct area_hover
 //每张卡包含一个对象
 //在解析出数据点时,调用on_point
 struct site;
+struct task;
 struct area_tool
 {
 	//卡所在的所有area的列表,以id排序小->大
@@ -192,11 +216,13 @@ struct area_tool
 	int m_mapid=-1;
 	double m_scale=2.0;
     std::shared_ptr<site> m_site=nullptr;
+
     void set(const std::shared_ptr<site>& s)
     {
         if(m_site != s)
           m_site=s;
     }
+	void on_change_business(const std::shared_ptr<card_location_base>& c, const task&t);
 	void on_point(const std::shared_ptr<card_location_base>& c,const point&pt);
     void on_leave(const std::shared_ptr<card_location_base>& c);
 

+ 1 - 0
card_base.cpp

@@ -59,6 +59,7 @@ void card_location_base::on_message(zloop<task*> * loop,const message_locinfo&lo
 
 	if(!site_ptr)
 	{
+		log_warn("接收到分站%d的数据,CARD=%d, CT=%d,但是分站未定义",loc.m_site_id,m_id,loc.m_card_ct);
 		return;
 	}
 

+ 45 - 37
card_message_handle.cpp

@@ -13,13 +13,12 @@
 struct one_ct_message_handle
 {
 	static loc_tool_main m_loc_tool;
-	ev::timer m_min_timer,m_max_timer;
+	ev::timer *m_min_timer=nullptr,*m_max_timer=nullptr;
 	//loc_message.
 	std::vector<loc_message> m_msg_list;
 	card_location_base*m_card;
 	const algo_config*m_ac=nullptr;
 	int  m_ct;
-	bool m_min_timeout=false;
 	ev::dynamic_loop * m_loop = nullptr;
 	one_ct_message_handle(card_location_base*card)
 	{
@@ -27,52 +26,51 @@ struct one_ct_message_handle
 		m_ct=-1;
 	}
 
-	void reset()
+	~one_ct_message_handle()
 	{
-		m_ct=-1;
-		m_min_timeout=false;
-		m_msg_list.clear();
+		delete m_min_timer;
+		delete m_max_timer;
 	}
 
 	void on_min_timer()
 	{
-		m_min_timer.stop();
-
 		if((int)m_msg_list.size()>=m_ac->best_msg_cnt)
 		{
-			m_max_timer.stop();
+			m_max_timer->stop();
 			calc_location();
-			return;
 		}
-		m_min_timeout=true;
 	}
 
 	void on_max_timer()
 	{
-		m_max_timer.stop();
 		calc_location();
 	}
 
 	void set(ev::dynamic_loop * loop)
 	{
-		m_loop = loop;
-
-		m_min_timer.set(*m_loop);
-		m_min_timer.set<one_ct_message_handle,&one_ct_message_handle::on_min_timer>(this);
-		m_max_timer.set(*m_loop);
-		m_max_timer.set<one_ct_message_handle,&one_ct_message_handle::on_max_timer>(this);
+		m_loop=loop;
+		m_min_timer=new ev::timer(*loop);
+		m_min_timer->set<one_ct_message_handle,&one_ct_message_handle::on_min_timer>(this);
+		m_max_timer=new ev::timer(*loop);
+		m_max_timer->set<one_ct_message_handle,&one_ct_message_handle::on_max_timer>(this);
 	}
 
 	void on_message(ev::dynamic_loop *loop,const message_locinfo&loc)
 	{
-		if(m_loop == nullptr && loop!=nullptr)
+		if(loop==nullptr)
+			return;
+
+		if(m_loop == nullptr)
 			set(loop);
-		else if(loop == nullptr)
+
+		if(m_ct==loc.m_card_ct && m_max_timer->remaining()<0) //超过max_timer的点,抛弃
 			return;
-		if(!m_msg_list.empty()&& m_ct!=loc.m_card_ct)
+
+		if(!m_msg_list.empty() && m_ct!=loc.m_card_ct) //ct已经回绕,重置
 		{
 			m_msg_list.clear();
 		}
+
 		auto sitPtr = sit_list::instance()->get(loc.m_site_id);
 		if(sitPtr==nullptr)
 		{
@@ -80,36 +78,32 @@ struct one_ct_message_handle
 			return;
 		}
 		auto s=sit_list::instance()->get(loc.m_site_id);
-		if(m_msg_list.empty())
+		//这里构造loc_message 保存数据
+		m_msg_list.push_back(loc_message(s,loc.m_tof,loc.m_time_stamp,loc.m_card_id,
+					loc.m_card_ct,loc.m_card_type,loc.m_ant_id,loc.m_rav,loc.m_acc,
+					loc.m_sync_ct,loc.m_rssi));
+
+		if(m_msg_list.size()==1)//第一个点
 		{
 			m_ct=loc.m_card_ct;
 			m_ac=&s->config();
-			m_min_timeout=false;
-			//这里构造loc_message 保存数据
-			m_msg_list.push_back(loc_message(s,loc.m_tof,loc.m_time_stamp,loc.m_card_id,
-						loc.m_card_ct,loc.m_card_type,loc.m_ant_id,loc.m_rav,loc.m_acc,
-						loc.m_sync_ct,loc.m_rssi));
 
 			//启动本CT的最小、最大两个定时器
-			m_min_timer.start(m_ac->min_wait_time);
-			m_max_timer.start(m_ac->max_wait_time);
+			m_min_timer->start(m_ac->min_wait_time);
+			m_max_timer->start(m_ac->max_wait_time);
 			return;
 		}
 
-		m_msg_list.push_back(loc_message(s,loc.m_tof,loc.m_time_stamp,loc.m_card_id,
-					loc.m_card_ct,loc.m_card_type,loc.m_ant_id,loc.m_rav,loc.m_acc,
-					loc.m_sync_ct,loc.m_rssi));
-
-		if(m_min_timeout && (int)m_msg_list.size()>=m_ac->best_msg_cnt)
+		if(m_min_timer->remaining()<0 && (int)m_msg_list.size()>=m_ac->best_msg_cnt)
 		{
 			calc_location();
-			m_max_timer.stop();
+			m_max_timer->stop();
 		}
 	}
 
 	void calc_location()
 	{
-		auto v = m_msg_list;
+		auto&v = m_msg_list;
 		if(v.empty())
 		{
 			return;
@@ -122,7 +116,7 @@ struct one_ct_message_handle
 
 		if(!rc.empty()) m_card->on_location(std::move(rc),v);
 
-		reset();
+		m_msg_list.clear();
 		log_info("calc_location_end:card_id=%d",m_card->m_id);
 	}
 };
@@ -149,6 +143,20 @@ card_message_handle::~card_message_handle()
 
 void card_message_handle::on_message(zloop<task*> * loop,const message_locinfo&loc,bool is_history)
 {
+#ifndef _RELEASE_
+	if(m_first_call)
+	{
+		m_first_call=false;
+		m_first_call_thread=std::this_thread::get_id();
+	}
+
+	if(m_first_call_thread!=std::this_thread::get_id())
+	{
+		log_error("%s\n","card_message_handle::on_message 无法支持多线程调用");
+		assert(false);
+	}
+#endif 
+
 	if(is_history)
 	{
 		log_warn("%s","当前代码没有处理历史消息记录。");

+ 7 - 0
card_message_handle.h

@@ -4,6 +4,8 @@
 
 //一张卡一个ct的所有不同天线的信息
 
+#include <thread>
+
 struct card_location_base;
 struct one_ct_message_handle;
 struct message_locinfo;
@@ -17,6 +19,11 @@ struct card_message_handle
 
 	card_message_handle(card_location_base*card);
 	~card_message_handle();
+
+#ifndef _RELEASE_
+	bool m_first_call=true;
+	std::thread::id m_first_call_thread;
+#endif 
 	
 	void on_message(zloop<task*> * loop,const message_locinfo&loc,bool is_history);
 };

+ 17 - 5
net-service.cpp

@@ -48,6 +48,7 @@ void net_service::on_timer()
 
 void net_service::on_message(std::shared_ptr<client> clt,const char*data,size_t len)
 {
+	bool message_handled=true;
 	try
 	{
 		zistream is(data,len-2);
@@ -57,13 +58,19 @@ void net_service::on_message(std::shared_ptr<client> clt,const char*data,size_t
 		{
 			case CHAR_LOCATEDATA_TOF_EXTEND://tof-扩展
 				{
+					if(!clt->check_timestamp(data+10))
+					{
+						logn_error(1,"分站数据时间戳错误:%s",clt->name().c_str());
+						break;
+					}
+
 					uint32_t site_id;
 					uint8_t  power;
 					is>>site_id>>skip(11)>>power;
 					auto site_ptr = sit_list::instance()->get(static_cast<int32_t>(site_id));
 					if(!site_ptr)
 					{
-						log_error("在全局分站列表中找不到分站:分站id=%d", site_id);
+						logn_error(1,"在全局分站列表中找不到分站:%d", site_id);
 						break;
 					}
 					site_ptr->set_client(clt);
@@ -119,17 +126,22 @@ void net_service::on_message(std::shared_ptr<client> clt,const char*data,size_t
 				break;
 			case CHAR_LOCATEDATAHIS_TOF_EXTEND://tof his
 			case CHAR_LOCATEDATAHIS_TDOA_EXTEND://tdoa his
-				break;
 			case CHAR_CTRL_READER_CMD://ctrl site message
-				break;
 			case CHAR_ADHOC://自组网数据
-				break;
+			default:
+				message_handled=false;
 		}
+
 	}
 	catch(const std::exception&e)
 	{
-		log_error("parse site message error,will close the connection:%s",clt->name().c_str());
+		logn_error(1,"分站数据处理失败,将关闭分站连接:%s",clt->name().c_str());
 		clt->close();
 	}
+
+	if(!message_handled)
+	{
+		logn_error(1,"分站数据未被处理,site=%s",clt->name().c_str());
+	}
 }
 

+ 16 - 1
test.cpp

@@ -227,6 +227,19 @@ struct web_client_http:ev::io
 		return -1;
 	}
 
+	int request(const char*what)
+	{
+		int len=strlen(what);
+
+        if(len!=zio::writev(m_fd, what, len))
+		{
+			close();
+			return -1;
+		}
+
+		return 0;
+	}
+
 	int read_until(const char*what,int timeout=10*1000)
 	{
 		int rc=0;
@@ -332,18 +345,20 @@ int main()
 {
 	unsigned char buf[32];
 	const char*s64="puVOuWb7rel6z2AVZBKnfw==";
+	const char*login="{\"cmd\":\"login\",\"data\":{\"user_name\":\"zzj\",\"user_pass\":\"111111\"}}\n";
 
 	int blen=32;
 	base64::decode((char*)s64,strlen(s64),buf,blen);
 
 	web_client_http client;
-	client.connect_ws("60.220.238.150",8086);
+	client.connect_ws("127.0.0.1",9001);
 
 	for(;;)
 	{
 		while(client.ws_parse()==0)
 			client.ws_reset();
 
+		client.request(login);
 		client.read_until("\n");
 	}
 

+ 56 - 7
worker.cpp

@@ -14,6 +14,7 @@
 #include "card_base.h"
 #include "card.h"
 #include "zloop.h"
+#include "area.h"
 
 struct hash_thread
 {
@@ -65,11 +66,10 @@ struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
 		for(task*t:task_list)
 		{
 			do_task(*t);
-			free(t);
 		}
 	}
 
-	void on_timeout()
+	void update_local_cards()
 	{
 		int version=card_list::instance()->version();
 		if(m_card_list_version!=version)
@@ -77,7 +77,11 @@ struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
 			m_card_list_version=version;
 			init_local_card_list();
 		}
+	}
 
+	void on_timeout()
+	{
+		update_local_cards();
 		for(auto&c:m_local_card_list)
 		{
 			c->on_timer();
@@ -100,7 +104,7 @@ struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
 		card_list::instance()->accept(*this);
 	}
 
-	void do_task(const task&t)
+	void do_task(task&t)
 	{
 		switch(t.m_cmd_code)
 		{
@@ -108,6 +112,7 @@ struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
 			case 0x863b://tdoa
 				log_info("card loc message%04X",t.m_cmd_code);
 				card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
+				free(&t);
 
 				//card_message::on_loc_message(this,t.m_param1);
 			break;
@@ -116,11 +121,36 @@ struct worker_thread: zloop<task*> ,visitor<std::shared_ptr<card_location_base>>
 				log_info("site history message%04X",t.m_cmd_code);
 					card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
 				//site_message::on_sync(this,t.m_param1);
+				free(&t);
 			break;
 
 			case 0x804c://ctrl site message
 				log_info("ctrl site message%04X",t.m_cmd_code);
+				free(&t);
 			break;
+
+			case 0x10001://区域业务类型修改
+			{
+				update_local_cards();
+				for(auto&c:m_local_card_list)
+				{
+					c->get_area_tool()->on_change_business(c,t);
+				}
+
+				auto&mcb=t.body<message_change_business>();
+				std::shared_ptr<area> a=area_list::instance()->get(mcb.area_id);
+
+				if(a && a->sub_frozen_count()==2)
+				{
+					a->set_business_list(std::move(mcb.new_list));
+					a->sub_frozen_count();
+				}
+
+				if(mcb.ref_count.fetch_sub(1)==1)
+				{
+					free(&t);
+				}
+			}
 		}
 	}
 
@@ -157,11 +187,27 @@ struct worker_impl:worker
 		return *m_threads[g_hash.hash_code(i)];
 	}
 
+	virtual int  num_thread()
+	{
+		return std::thread::hardware_concurrency();
+	}
+
+	bool running()
+	{
+		return m_init_flag.load()==0;
+	}
+
 	virtual void request(task*t)
 	{
 		hash(t->m_hash_id).async_request(t);
 	}
 
+	virtual void broadcast(task*tk) 
+	{
+		for(auto&thr:m_threads)
+			thr->async_request(tk);
+	}
+
 	void init(int num_thread)
 	{
 		int exp=-2;
@@ -184,11 +230,14 @@ struct worker_impl:worker
 worker_impl _worker_impl;
 worker*worker::instance()
 {
-	int num_thread=std::thread::hardware_concurrency()*2;
+	int num_thread=_worker_impl.num_thread();
+	if(!_worker_impl.running())
+	{
+		log_info("worker thread count=%d",num_thread);
+		g_hash.set_num_thread(num_thread);
+		_worker_impl.init(num_thread);
+	}
 
-	log_info("worker thread count=%d",num_thread);
-	g_hash.set_num_thread(num_thread);
-	_worker_impl.init(num_thread);
 	return &_worker_impl;
 }
 

+ 16 - 0
worker.h

@@ -36,10 +36,26 @@ struct task
 	}
 };
 
+struct area_business;
+struct message_change_business
+{
+	int area_id;
+	std::atomic<int> ref_count;
+	std::vector<area_business*> del_list,add_list,new_list;
+};
+
 struct worker
 {
 	virtual void stop()=0;
 	virtual void request(task*tk)=0;
+	virtual void broadcast(task*tk) 
+	{
+	}
+
+	virtual int  num_thread()
+	{
+		return 1;
+	}
 
 	static worker*instance();
 };

+ 76 - 18
znet.cpp

@@ -1,6 +1,7 @@
 #include <log.h>
 #include <unistd.h>
 #include <signal.h>
+#include <sys/time.h>
 #include <stdio.h>
 #include <list>
 #include <vector>
@@ -10,6 +11,7 @@
 #include <atomic>
 #include <algorithm>
 #include <fstream>
+#include <time.h>
 
 #include <zio.h>
 #include <znet.h>
@@ -21,12 +23,15 @@
 #include "crc.h"
 
 extern config_file config;
+int site_sync=config.get("site_sync",0);//分站时间同步,考虑到双IP双机情况,缺省关闭
+int site_sync_freq=config.get("site_sync.freq",60);//分站时间同步间隔
 struct client_ex:client
 {
 	virtual void on_notify()=0;
 	virtual void close_impl()=0;
 };
 
+
 struct io_context: zloop<std::shared_ptr<client>> ,service_handle
 {
 private:
@@ -41,7 +46,7 @@ public:
 	{
 		m_thread_clts.reserve(2048);
 		m_timer.set<io_context,&io_context::on_timer>(this);
-		m_timer.start(20,1);
+		m_timer.start(0,1);
 	}
 
 	virtual ~io_context()
@@ -188,6 +193,7 @@ struct sock_client:fd_io,client_ex
 	int   m_max_package_size{4096};
 
 	ev::timer  m_recv_timer;
+	ev::timer  m_sync_timer;
 //	ev::timer  m_send_timer;
 	
 	std::mutex m_mutex;
@@ -196,18 +202,26 @@ struct sock_client:fd_io,client_ex
 	size_t m_opos=0;
 	bool   m_can_write{false};
 
+	char m_timestamp[8];
+
 	sock_client(io_context&ic,const char*name,int fd,int recv_time_out,int max_package_size)
 		:fd_io(ic,fd,EV_READ|EV_WRITE)
 		,m_ic(ic)
 		,m_name(name)
 		,m_recv_timer(ic)
+		,m_sync_timer(ic)
 	{ 
 		m_max_package_size=max_package_size;
 
 //		m_recv_timer.set(ic);
-		m_recv_timer.set(recv_time_out,0);
 		m_recv_timer.set<sock_client,&sock_client::on_recv_timeout>(this);
-		m_recv_timer.start();
+		m_recv_timer.start(recv_time_out,0);
+
+		m_sync_timer.set<sock_client,&sock_client::on_sync_timeout>(this);
+		if(site_sync)
+		{
+			m_sync_timer.start(0,site_sync_freq);
+		}
 
 //		m_send_timer.set(ic);
 //		m_send_timer.set(5,0);
@@ -215,6 +229,7 @@ struct sock_client:fd_io,client_ex
 //		m_send_timer.start();
 
 		m_b=(char*)malloc(m_size);
+		m_timestamp[0]=0;
 	}
 
 	~sock_client()
@@ -222,6 +237,27 @@ struct sock_client:fd_io,client_ex
 		free(m_b);
 	}
 
+
+	bool check_timestamp(const char*time)
+	{
+		//秒、分、时、天、周、月、年, 脑残的设计
+		
+		char buf[6];
+		buf[0]=time[6];
+		buf[1]=time[5];
+		buf[2]=time[3];
+		buf[3]=time[2];
+		buf[4]=time[1];
+		buf[5]=time[0];
+
+		if(memcmp(m_timestamp,buf,6)<=0)
+		{
+			memcpy(m_timestamp,buf,6);
+			return true;
+		}
+		return false;
+	}
+
 	int type()
 	{
 		return m_type;
@@ -242,6 +278,38 @@ struct sock_client:fd_io,client_ex
 		m_ic.async_request(shared_from_this());
 	}
 
+	void on_sync_timeout()
+	{
+//	从第一个字节开始,分别表示毫秒(2字节)、秒、分、时、天、月、年
+		char buf[14]={0,12,0x78,0x3b};
+
+		struct timeval tv;
+		gettimeofday(&tv,0);
+
+		struct tm buff={0};
+		const struct tm*t=localtime_r(&tv.tv_sec,&buff);
+
+		int p=4;
+		buf[p++]=(tv.tv_usec/1000)>>8;
+		buf[p++]=(tv.tv_usec/1000)&0xff;
+
+		//为防止分站重启时发送上来过大的时间
+		m_timestamp[5]=buf[p++]=t->tm_sec;
+		m_timestamp[4]=buf[p++]=t->tm_min;
+		m_timestamp[3]=buf[p++]=t->tm_hour;
+		m_timestamp[2]=buf[p++]=t->tm_mday;
+		m_timestamp[1]=buf[p++]=t->tm_mon;
+		m_timestamp[0]=buf[p++]=t->tm_year;
+
+		uint16_t ccrc=do_crc((unsigned char*)buf+2,10);
+
+		buf[p++]=ccrc>>8;
+		buf[p++]=ccrc&0xff;
+
+		std::vector<char> tmp(buf,buf+14);
+		send(std::move(tmp));
+	}
+
 	void on_send_timeout()
 	{
 		m_ic.on_send_timeout(shared_from_this());
@@ -293,11 +361,11 @@ struct sock_client:fd_io,client_ex
 
 			if(rc==0)
 			{
-				log_info("socket %d(%s) close by remote",m_fd,m_name.c_str());
+				logn_info(1,"socket %d(%s) close by remote",m_fd,m_name.c_str());
 			}
 			else if(rc==-1)
 			{
-				log_errno("hava a error on socket %d(%s)",m_fd,m_name.c_str());
+				logn_errno(1,"hava a error on socket %d(%s)",m_fd,m_name.c_str());
 			}
 			return -1;
 		}
@@ -341,7 +409,7 @@ struct sock_client:fd_io,client_ex
 			}
 			else
 			{
-				log_errno("check_crc_error,socket close... site=%s.",m_name.c_str());
+				logn_error(1,"check_crc_error,close socket. site=%s.",m_name.c_str());
 				return -1;
 			}
 
@@ -394,7 +462,7 @@ struct sock_client:fd_io,client_ex
 			}
 			else
 			{
-				log_errno("zio::write(%d,ptr,%d)",m_fd,m_obuf.size()-m_opos);
+				logn_errno(1,"zio::write(%d,ptr,%d)",m_fd,m_obuf.size()-m_opos);
 				return -1;
 			}
 		}
@@ -412,7 +480,7 @@ struct sock_client:fd_io,client_ex
 	{
 		if(flag & EV_WRITE)
 		{
-			log_debug("socket %d(%s) can write,flag=%d." ,m_fd,m_name.c_str(),flag);
+			logn_debug(1,"socket %d(%s) can write,flag=%d." ,m_fd,m_name.c_str(),flag);
 			m_can_write=true;
 //			m_send_timer.stop();
 			if(io_write()<0)
@@ -559,13 +627,3 @@ service_handle*service_handle::instance(service_callback*sc)
 }
 
 
-
-
-
-
-
-
-
-
-
-

+ 2 - 0
znet.h

@@ -14,6 +14,8 @@ struct client:std::enable_shared_from_this<client>
 	virtual void close()=0;
 	virtual void send(std::vector<char>&&b)=0;
 
+	virtual bool check_timestamp(const char*)=0;
+
 	virtual ~client(){}
 };