Browse Source

修改无锁队列为无界vector,避免数据库备份时产生的阻塞导致SQL丢失

zzj 5 years ago
parent
commit
ead4ea4a5c
2 changed files with 81 additions and 59 deletions
  1. 3 2
      db/db_api/CDBCommon.h
  2. 78 57
      db/db_api/CDBConnPool.cpp

+ 3 - 2
db/db_api/CDBCommon.h

@@ -39,7 +39,7 @@ namespace YADB
 	//                                 其它定义
 	//----------------------------------------------------------------------------
 	const int MAX_ASYNC_EXEC_FAILED_COUNT   = 3;//最大异步执行失败次数
-	const int MAX_ASYNC_QUEQUE_CAPACITY     = 32 * 1024;//异步执行队列最大容量
+	const int MAX_ASYNC_QUEQUE_CAPACITY     = 1<<20;//异步执行队列最大容量
 
 	/**
 	* @brief
@@ -93,7 +93,8 @@ namespace YADB
 	{
 		int FailedCount;//执行失败次数
 		std::string SQL; //SQ语句
-		_ASYNC_SQL_()
+		_ASYNC_SQL_(std::string&&sql)
+			:SQL(sql)
 		{
 			FailedCount = 0;
 		}

+ 78 - 57
db/db_api/CDBConnPool.cpp

@@ -4,10 +4,14 @@
 
 namespace YADB
 {
-	boost::lockfree::queue<_ASYNC_SQL_*, boost::lockfree::capacity<MAX_ASYNC_QUEQUE_CAPACITY>> __AsyncQueue;//异步执行无锁队列
+//	boost::lockfree::queue<_ASYNC_SQL_*, boost::lockfree::capacity<32768>> __AsyncQueue;//异步执行无锁队列
+
+	std::mutex g_mutex;
+	std::vector<_ASYNC_SQL_*> g_sql_list;
 
 	CDBConnPool::CDBConnPool()
 	{
+		g_sql_list.reserve(1<<15);
 		__pAsyncDBConn = 0;
 	}
 
@@ -201,57 +205,68 @@ namespace YADB
 		}
 	}
 
-	void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner )
+	void CDBConnPool::ExecAsyncSql(_ASYNC_SQL_*pData)
 	{
 		std::string Error;
-		while( pOwner->__Running )
-		{
-			_ASYNC_SQL_* pData = 0;
+		if ( !pData )
+			return;
 
-			while ( __AsyncQueue.pop( pData ) )
+		if ( __pAsyncDBConn )
+		{
+			my_ulonglong llRes = 0;
+			llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error ); 
+			if ( (my_ulonglong)-1 == llRes )
 			{
-				if ( pData )
+				//Execute failed, write log...
+				log_error( "Error,调用ExcuteRealSql失败,Err=%s,[%s]\n", Error.c_str(),pData->SQL.c_str());
+				//如果失败了看是不是数据库断开连接了,尝试重新连接一次
+				if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 )
 				{
-					if ( __pAsyncDBConn )
+					_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
+					__pAsyncDBConn->Close();
+					int nRet = __pAsyncDBConn->Connect( ConnSetting, Error );
+					if ( nRet < 0 )
 					{
-						my_ulonglong llRes = 0;
-						llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error ); 
-						if ( (my_ulonglong)-1 == llRes )
-						{
-							//Execute failed, write log...
-							log_error( "Error,调用ExcuteRealSql失败,Err=%s,[%s]\n", Error.c_str(),pData->SQL.c_str());
-							//如果失败了看是不是数据库断开连接了,尝试重新连接一次
-							if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 )
-							{
-								_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
-								__pAsyncDBConn->Close();
-								int nRet = __pAsyncDBConn->Connect( ConnSetting, Error );
-								if ( nRet < 0 )
-								{
-									Error = "Error,failed connect to database!";
-									//Connect failed, write log...
-									log_error( "Error,failed connect to database,Err=%s\n", Error.c_str() );
-									//如果连接失败了休息一下
-									boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) );
-								}
-							}
-
-							//如果执行失败,失败次数加一,失败次数小于最大失败次数放到队尾下次再执行
-							pData->FailedCount++;
-							if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT )
-							{
-								_ASYNC_SQL_* pNewData = new _ASYNC_SQL_();
-								pNewData->FailedCount = pData->FailedCount;
-								pNewData->SQL         = pData->SQL;
-								__AsyncQueue.push( pNewData );
-							}
-						}
+						Error = "Error,failed connect to database!";
+						//Connect failed, write log...
+						log_error( "Error,failed connect to database,Err=%s\n", Error.c_str() );
+						//如果连接失败了休息一下
+						boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) );
 					}
+				}
 
-					delete pData;
-					pData = 0;
+				//如果执行失败,失败次数加一,失败次数小于最大失败次数放到队尾下次再执行
+				pData->FailedCount++;
+				if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT )
+				{
+					std::unique_lock<std::mutex> lock(g_mutex);
+					g_sql_list.push_back(pData);
+					return;
 				}
 			}
+		}
+
+		delete pData;
+		pData = 0;
+
+	}
+
+	void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner )
+	{
+		while( pOwner->__Running )
+		{
+			std::vector<_ASYNC_SQL_*> sql_list;
+			sql_list.reserve(1<<15);
+
+			{
+				std::unique_lock<std::mutex> lock(g_mutex);
+				sql_list.swap(g_sql_list);
+			}
+
+			for(auto pData:sql_list)
+			{
+				ExecAsyncSql(pData);
+			}
 
 			boost::this_thread::sleep_for( boost::chrono::milliseconds( 1 ) );
 		}
@@ -276,14 +291,26 @@ namespace YADB
 		}
 
 		//把异步执行无锁队列中每个元素释放
-		_ASYNC_SQL_* pData = 0;
-		while ( __AsyncQueue.pop( pData ) )
 		{
-			if (pData)
+			std::unique_lock<std::mutex> lock( g_mutex );
+			std::for_each(g_sql_list.begin(),g_sql_list.end(),[](_ASYNC_SQL_*psql){
+				delete psql;
+			});
+
+			g_sql_list.clear();
+
+
+			#if 0
+			_ASYNC_SQL_* pData = 0;
+			while ( __AsyncQueue.pop( pData ) )
 			{
-				delete pData;
-				pData = 0;
+				if (pData)
+				{
+					delete pData;
+					pData = 0;
+				}
 			}
+			#endif
 		}
 	}
 
@@ -322,17 +349,11 @@ namespace YADB
 		}
 	}
 
-	bool CDBConnPool::PushAsync( const std::string& strSQL )
+	bool CDBConnPool::PushAsync( std::string&& strSQL )
 	{
-		_ASYNC_SQL_* pData = new _ASYNC_SQL_;
-		if ( !pData )
-		{
-            logn_error(2,"PushAsync new pData失败");
-			return false;
-		}
-
-		pData->SQL = strSQL;
-		return __AsyncQueue.push( pData );
+		std::unique_lock<std::mutex> lock(g_mutex);
+		g_sql_list.push_back(new _ASYNC_SQL_(std::move(strSQL)));
+		return true;
 	}
 
 	bool CDBConnPool::Query( const char *szSql, CDBResultSet& DBRes,std::string& Error )