zzj 5 years ago
parent
commit
d9d580b051
9 changed files with 41 additions and 22 deletions
  1. 1 1
      Makefile.am
  2. 1 1
      event.cpp
  3. 1 1
      event.h
  4. 1 2
      net-service.cpp
  5. 11 0
      websocket/wsClient.cpp
  6. 3 0
      websocket/wsClient.h
  7. 9 14
      websocket/wsClientMgr.cpp
  8. 10 0
      worker.cpp
  9. 4 3
      znet.cpp

+ 1 - 1
Makefile.am

@@ -38,7 +38,7 @@ yals_LDADD=db/libyadb.a websocket/libwebsocket.a
 
 async_SOURCES=async.cpp
 async_CPPFLAGS=${AM_CPPFLAGS}
-async_LDFLAGS=${AM_LDFLAGS}  -L. -lzlog -lrt
+async_LDFLAGS=${AM_LDFLAGS}  -L. -lzlog -lrt -lev
 
 client_SOURCES=client.cpp message_file.cpp
 client_CPPFLAGS=${AM_CPPFLAGS}

+ 1 - 1
event.cpp

@@ -59,9 +59,9 @@ struct card_event:Event
     virtual std::shared_ptr<ya_event> on_message(EVENT_TYPE et,uint64_t id,bool f);
 };
 
+static event_tool et;
 event_tool * event_tool::instance()
 {
-    static event_tool et;
     return &et;
 }
 void event_tool::make_event_object()

+ 1 - 1
event.h

@@ -86,11 +86,11 @@ struct event_tool
 	}
 	void handle_event(OBJECT_TYPE ot,EVENT_TYPE et,uint64_t id,double limit_value,double cur_value,bool f,EVENT_DIS_TYPE edt=DT_COMMON,const std::string &desc="");
 	static event_tool * instance();
-private:
     event_tool()
     {
         make_event_object();
     }
+private:
     void make_event_object();
     std::map<OBJECT_TYPE,std::shared_ptr<Event>>  m_map;
 };

+ 1 - 2
net-service.cpp

@@ -1,4 +1,3 @@
-
 #include <thread>
 #include <mutex>
 #include <atomic>
@@ -80,7 +79,7 @@ void net_service::on_message(const std::shared_ptr<client> &clt,const char*data,
 						char timebuf[64]{0};
 						strftime(timebuf,64,"%F %T",&site_tm);
 						logn_info(1,"分站数据信息:net=%s,sid=%d,tm=%s,sct=%d,power=%s", clt->name().c_str(), site_id,timebuf,
-										site_ct, site_ptr->m_power_check_enable?((power&1)?"ac":"dc"):"unkown");
+										site_ct, site_ptr->m_power_check_enable?((power&1)?"ac":"dc"):"??");
 
 						time_t site_time=mktime(&site_tm);
 						double diff=difftime(site_time, site_ptr->last_site_time());

+ 11 - 0
websocket/wsClient.cpp

@@ -36,6 +36,17 @@ namespace YA
 	
 	}
 
+	std::shared_ptr<wsClient> wsClient::clone()
+	{
+		std::shared_ptr<wsClient> ret=std::make_shared<wsClient>();
+
+		ret->__ID=__ID;
+		ret->__uri=__uri;
+		ret->__MsgFuncList=__MsgFuncList;
+
+		return ret;
+	}
+
 	void wsClient::init( int ID, const std::string & uri, const std::map<std::string, MSG_HANDLE_FUNC_TYPE>& MsgFuncList )
 	{
 		__ID          = ID;

+ 3 - 0
websocket/wsClient.h

@@ -116,6 +116,9 @@ namespace YA
 	public:
 		wsClient();
 		~wsClient();
+
+		std::shared_ptr<wsClient> clone();
+
 		/**
 		* @brief	初始化函数。
 		* @param  [in] int ID  当前客户端的唯一标识\n

+ 9 - 14
websocket/wsClientMgr.cpp

@@ -166,22 +166,17 @@ namespace YA
 
     void wsClientMgr::Reconnect(std::shared_ptr<wsClient> & ws)
     {
-		if (nullptr != ws)
-		{
-//			std_info("wsClinet Reconnect : id[%d],url[%s]", ws->GetID(), ws->get_uri().c_str());
-			log_info("wsClinet Reconnect : id[%d],url[%s]", ws->GetID(), ws->get_uri().c_str());
-
-			std::shared_ptr<wsClient> pClient = std::make_shared<wsClient>();
-			pClient->init( ws->GetID(), ws->get_uri(), ws->__MsgFuncList );
+		if (nullptr == ws)
+			return;
 
-			ws=pClient;
+		ws=ws->clone();
+		log_info("wsClinet Reconnect : id[%d],url[%s]", ws->GetID(), ws->get_uri().c_str());
 
-			if (ws->connect() == 0)
-			{
-				ws->login();
-				log_info("wsClinet Reconnect : id[%d],url[%s] Success.", ws->GetID(), ws->get_uri().c_str());
-//				std_info("wsClinet Reconnect : id[%d],url[%s] Success.", ws->GetID(), ws->get_uri().c_str());
-			}
+		if (ws->connect() == 0)
+		{
+			ws->login();
+			log_info("wsClinet Reconnect : id[%d],url[%s] Success.", ws->GetID(), ws->get_uri().c_str());
+//			std_info("wsClinet Reconnect : id[%d],url[%s] Success.", ws->GetID(), ws->get_uri().c_str());
 		}
     }
 

+ 10 - 0
worker.cpp

@@ -52,6 +52,7 @@ void loop_thread::on_async(const std::list<task*>&task_list)
 		do_task(*t);
 	}
 }
+
 void loop_thread::loop_init()
 {
 
@@ -258,6 +259,13 @@ struct worker_impl:worker
 	std::atomic<int> m_init_flag{-2};
 	virtual void stop()
 	{
+
+		if(m_timer_worker)
+		{
+			m_timer_worker->async_stop();
+			m_timer_worker->join();
+		}
+
 		int exp=0;
 		if(!m_init_flag.compare_exchange_strong (exp,1))
 			return;
@@ -267,6 +275,8 @@ struct worker_impl:worker
 
 		for(auto&thr:m_threads)
 			thr->join();
+
+
 	}
 
 	worker_thread& hash(uint32_t i)

+ 4 - 3
znet.cpp

@@ -44,7 +44,7 @@ public:
 	{
 		m_thread_clts.reserve(2048);
 		m_timer.set<io_context,&io_context::on_timer>(this);
-		m_timer.start(0,1);
+		m_timer.start(0,0.1);
 	}
 
 	virtual ~io_context()
@@ -61,6 +61,7 @@ public:
 	}
 	void on_timer()
 	{
+		logn_info(1,"tick timer timeout.");
 		m_serv.on_timer();	
 	}
 	void on_connect(const std::shared_ptr<client> &clt)
@@ -430,7 +431,7 @@ struct sock_client:fd_io,client_ex
 					return -1;
 				}
 
-				if(msg_len<=1)
+				if(msg_len<=2)
 				{
 					logn_error(1,"package too small:%d,close socket. site=%s.",msg_len,m_name.c_str());
 					return -1;
@@ -653,7 +654,7 @@ struct main_loop:io_context
 		{
 			try
 			{
-			ev::dynamic_loop::run(0);
+				ev::dynamic_loop::run(0);
 			}
 			catch(const std::exception&e)
 			{