#include "CDBConnPool.h"

// NOTE(review): the three angle-bracket header names that followed the line
// above could not be recovered from the text this revision was made from.
// The headers below are the ones this translation unit visibly needs. Please
// confirm them against version control, and add the project's logging header
// (the one declaring log_error / logn_info) if CDBConnPool.h does not already
// pull it in.
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/thread.hpp>

namespace YADB
{
	// A boost::lockfree::queue<_ASYNC_SQL_*, capacity<32768>> was used here
	// previously; it has been replaced by the mutex-protected vector below.

	// Guards g_sql_list.
	std::mutex g_mutex;

	// Pending asynchronous SQL statements. Producers append through
	// PushAsync(); the worker thread swaps the whole vector out and runs it.
	// NOTE: this queue is file-global, so it is shared by every CDBConnPool
	// instance in the process.
	std::vector<_ASYNC_SQL_*> g_sql_list;

	// Builds an empty pool. No connection is opened and no thread is started
	// until Create() is called.
	CDBConnPool::CDBConnPool()
	{
		{
			std::lock_guard<std::mutex> lock( g_mutex );
			g_sql_list.reserve( 1 << 15 );
		}

		__pAsyncDBConn = nullptr;
		// Close() (run by the destructor) reads these flags, so give them a
		// defined value even when Create() is never called.
		__Running = false;
		__IsExited = true;
	}

	// Stops the worker thread and releases every connection.
	CDBConnPool::~CDBConnPool()
	{
		Close();
	}

	// Opens one connection using the stored pool settings and appends it to
	// the idle list.
	//
	// ConnErr receives the error text when connecting or preparing fails.
	// IsTemp  is forwarded to the CDBConnect constructor; GiveBack() deletes
	//         connections whose IsTemp() is true instead of pooling them.
	//
	// Returns the new connection, or nullptr on failure. The caller must hold
	// __mtx because the idle list is modified.
	CDBConnect * CDBConnPool::__CreateIdleConn( std::string& ConnErr, bool IsTemp )
	{
		_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);

		CDBConnect* pConn = new CDBConnect( IsTemp );
		if ( !pConn->Connect( ConnSetting, ConnErr ) )
		{
			delete pConn;
			return nullptr;
		}

		// If a statement to prepare was configured, prepare it now.
		if ( !ConnSetting.stmtSQL.empty() )
		{
			if ( !pConn->Preparestmt( ConnSetting.stmtSQL.c_str(), ConnErr ) )
			{
				delete pConn;
				return nullptr;
			}
		}

		__IdleConnList.push_back( pConn );
		return pConn;
	}

	// Convenience overload: creates the pool with the asynchronous worker
	// enabled.
	bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting, std::string& szError )
	{
		return Create( Setting, true, szError );
	}

	// Validates the settings, fills the pool up to Setting.PoolSize
	// connections and, when requested, starts the asynchronous worker.
	//
	// Setting  pool configuration; PoolSize must lie within
	//          [DCC_MIN_COUNT, DCC_MAX_COUNT].
	// bAsync   start the asynchronous SQL worker if it is not running yet.
	// szError  receives the reason when false is returned.
	//
	// Returns true on success, false when the size is out of range or a
	// connection could not be opened.
	bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting, bool bAsync, std::string& szError )
	{
		std::unique_lock<decltype(__mtx)> lock( __mtx );

		if ( Setting.PoolSize < DCC_MIN_COUNT )
		{
			szError = "PoolSize is too small!";
			return false;
		}

		if ( Setting.PoolSize > DCC_MAX_COUNT )
		{
			szError = "PoolSize is too big!";
			return false;
		}

		__Setting = Setting;

		// Only open the connections that are still missing. The previous code
		// opened PoolSize additional connections whenever the idle list was
		// short, so a repeated Create() overshot the configured size.
		const int nExisting = static_cast<int>( __IdleConnList.size() + __BusyConnList.size() );
		for ( int i = nExisting; i < __Setting.PoolSize; ++i )
		{
			if ( !__CreateIdleConn( szError ) )
			{
				return false;
			}
		}

		// Start the worker only when it was asked for and is not already
		// running. The previous condition (bAsync || !__Running) started it
		// even when bAsync was false and, on a repeated call, destroyed the
		// connection the running worker was using and spawned a second worker.
		// NOTE(review): callers that passed bAsync=false but still rely on
		// PushAsync() being drained need to pass true now.
		if ( bAsync && !__Running )
		{
			// Open the worker's dedicated connection, then start the thread.
			__CreateAsyncThrdConn();
			__StartAsyncThrd();
		}

		return true;
	}

	// Stops the worker thread, discards queued asynchronous statements and
	// closes and deletes every pooled connection, busy or idle. Connections
	// still held by callers become dangling pointers.
	void CDBConnPool::Close()
	{
		std::unique_lock<decltype(__mtx)> lock( __mtx );

		// Stop the asynchronous worker thread.
		__StopAsyncThrd();

		// Destroy the worker thread's connection.
		__DestroyAsyncThrdConn();

		// Close and delete every connection in both lists, then empty them.
		for ( CDBConnect* pBusy : __BusyConnList )
		{
			pBusy->Close();
			delete pBusy;
		}
		__BusyConnList.clear();

		for ( CDBConnect* pIdle : __IdleConnList )
		{
			pIdle->Close();
			delete pIdle;
		}
		__IdleConnList.clear();
	}

	// Hands out a connection and moves it to the busy list. When no idle
	// connection is left a temporary one is opened, as long as fewer than
	// DCC_MAX_COUNT connections are currently handed out.
	//
	// Error receives the reason when nullptr is returned.
	//
	// Returns a connection that must be returned with GiveBack(), or nullptr.
	CDBConnect * CDBConnPool::GetDBConnect( std::string& Error )
	{
		std::unique_lock<decltype(__mtx)> lock( __mtx );

		CDBConnect* pConn = nullptr;
		if ( !__IdleConnList.empty() )
		{
			pConn = __IdleConnList.front();
			__IdleConnList.pop_front();
		}
		else
		{
			// The idle list is empty here, so every live connection is in the
			// busy list. The previous code compared the (empty) idle list
			// with DCC_MAX_COUNT, which was always true and left this limit
			// unenforced.
			if ( static_cast<int>( __BusyConnList.size() ) >= DCC_MAX_COUNT )
			{
				Error = "Error,db connect count beyond the max connect count!";
				return nullptr;
			}

			pConn = __CreateIdleConn( Error, true );
			if ( !pConn )
			{
				Error = "Error,failed connect to database!";
				return nullptr;
			}

			// __CreateIdleConn appended the new connection to the idle list;
			// take it out again because it is handed out immediately.
			__IdleConnList.pop_back();
		}
		__BusyConnList.push_back( pConn );

		// Check that the connection is still alive; reconnect once if not.
		if ( pConn->ConnctionTest( Error ) != 0 )
		{
			_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
			pConn->Close();

			// __CreateIdleConn treats the result of Connect() as a truth
			// value, so a failure is reported as 0/false. The previous
			// "< 0" test could never see that; "<= 0" covers both a boolean
			// and a negative-error convention.
			const int nRet = pConn->Connect( ConnSetting, Error );
			if ( nRet <= 0 )
			{
				// GiveBack() locks __mtx itself; release it first so a
				// non-recursive mutex does not deadlock.
				lock.unlock();
				GiveBack( pConn );
				Error = "Error,failed connect to database!";
				return nullptr;
			}
		}

		return pConn;
	}

	// Returns a connection obtained from GetDBConnect(). Temporary
	// connections are deleted; regular ones go back to the idle list.
	// A null pointer is ignored.
	void CDBConnPool::GiveBack( CDBConnect * pConn )
	{
		std::unique_lock<decltype(__mtx)> lock( __mtx );

		if ( nullptr == pConn )
		{
			return;
		}

		__BusyConnList.remove( pConn );

		// A temporary connection is deleted instead of being pooled.
		if ( pConn->IsTemp() )
		{
			delete pConn;
		}
		else
		{
			__IdleConnList.push_back( pConn );
		}
	}

	// Runs one queued statement on the worker's dedicated connection and
	// takes ownership of pData.
	//
	// On failure the error is logged, the connection is re-established if it
	// turns out to be dead, and the statement is queued again until it has
	// failed MAX_ASYNC_EXEC_FAILED_COUNT times. In every other case, including
	// when there is no worker connection at all, pData is deleted.
	void CDBConnPool::ExecAsyncSql( _ASYNC_SQL_* pData )
	{
		if ( !pData )
		{
			return;
		}

		if ( __pAsyncDBConn )
		{
			std::string Error;
			const my_ulonglong llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error );
			if ( static_cast<my_ulonglong>( -1 ) == llRes )
			{
				// Execution failed: log it.
				log_error( "Error,调用ExcuteRealSql失败,Err=%s,[%s]\n", Error.c_str(), pData->SQL.c_str() );

				// See whether the connection was lost and try to reconnect once.
				if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 )
				{
					_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
					__pAsyncDBConn->Close();

					// Same convention as in GetDBConnect(): 0/false (or a
					// negative value) means the connect attempt failed.
					const int nRet = __pAsyncDBConn->Connect( ConnSetting, Error );
					if ( nRet <= 0 )
					{
						// Log the error reported by Connect(); it used to be
						// overwritten with a generic text before logging.
						log_error( "Error,failed connect to database,Err=%s\n", Error.c_str() );

						// Back off a little before the next attempt.
						boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) );
					}
				}

				// Count the failure; while below the limit, queue the
				// statement again so it is retried later.
				pData->FailedCount++;
				if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT )
				{
					std::lock_guard<std::mutex> lock( g_mutex );
					g_sql_list.push_back( pData );
					return;
				}
			}
		}

		delete pData;
	}

	// Worker loop: roughly once per millisecond, takes everything that is
	// queued and executes it in order on the single worker connection.
	// Runs until pOwner->__Running becomes false, then sets __IsExited.
	//
	// NOTE(review): __Running / __IsExited are shared with other threads; if
	// they are declared as plain bool in the header they should become
	// std::atomic<bool>.
	void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner )
	{
		// Reused for every batch. Swapping it with g_sql_list hands the
		// producers an empty vector that already has capacity.
		std::vector<_ASYNC_SQL_*> batch;
		batch.reserve( 1 << 15 );

		while ( pOwner->__Running )
		{
			{
				// The emptiness check must happen under the lock as well;
				// it used to read g_sql_list unsynchronised.
				std::lock_guard<std::mutex> lock( g_mutex );
				if ( !g_sql_list.empty() )
				{
					batch.swap( g_sql_list );
				}
			}

			if ( !batch.empty() )
			{
				// Logged once per batch; a fixed "every 100 iterations" rule
				// was dropped because it could stay silent for an hour during
				// shift-change peaks.
				logn_info( 2, "sql_list size=%lld", static_cast<long long>( batch.size() ) );

				// A single connection executes everything, so statements can
				// pile up under load.
				for ( _ASYNC_SQL_* pData : batch )
				{
					ExecAsyncSql( pData );
				}
				batch.clear();
			}

			std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
		}

		// Tell __StopAsyncThrd() that the thread is done.
		__IsExited = true;
	}

	// Asks the worker to stop, waits until it has exited and deletes every
	// statement still queued. Does nothing when the worker is not running.
	void CDBConnPool::__StopAsyncThrd()
	{
		if ( !__Running )
		{
			return;
		}

		// Signal the worker and wait for it to leave its loop.
		__Running = false;
		while ( !__IsExited )
		{
			boost::this_thread::sleep( boost::posix_time::millisec( 1 ) );
		}

		// Release everything that is still queued.
		std::lock_guard<std::mutex> lock( g_mutex );
		for ( _ASYNC_SQL_* pPending : g_sql_list )
		{
			delete pPending;
		}
		g_sql_list.clear();
	}

	// Starts the detached worker thread.
	void CDBConnPool::__StartAsyncThrd()
	{
		// Reset the exit flag before the thread starts. Without this, a pool
		// that is closed and created again would make the next
		// __StopAsyncThrd() return while the new worker is still running.
		__IsExited = false;
		__Running = true;

		boost::thread thrd( boost::bind( &CDBConnPool::_AsyncThreadFunc, this, this ) );
		thrd.detach();
	}

	// (Re)opens the connection used only by the worker thread. On failure
	// __pAsyncDBConn stays null and the error text is discarded.
	void CDBConnPool::__CreateAsyncThrdConn()
	{
		// Drop the previous connection first.
		this->__DestroyAsyncThrdConn();

		_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);

		std::string ConnErr;
		CDBConnect* pConn = new CDBConnect();
		if ( !pConn->Connect( ConnSetting, ConnErr ) )
		{
			delete pConn;
			return;
		}

		__pAsyncDBConn = pConn;
	}

	// Closes and deletes the worker thread's connection, if there is one.
	void CDBConnPool::__DestroyAsyncThrdConn()
	{
		if ( __pAsyncDBConn )
		{
			__pAsyncDBConn->Close();
			delete __pAsyncDBConn;
			__pAsyncDBConn = nullptr;
		}
	}

	// Queues a statement for asynchronous execution; strSQL is moved from.
	// Always returns true. The statement is only executed while a worker
	// thread is running.
	bool CDBConnPool::PushAsync( std::string&& strSQL )
	{
		std::lock_guard<std::mutex> lock( g_mutex );
		g_sql_list.push_back( new _ASYNC_SQL_( std::move( strSQL ) ) );
		return true;
	}

	// Runs a query on a pooled connection and binds the result to DBRes.
	// Returns false when no connection is available; otherwise returns
	// whatever DBRes.Bind() reports. Error receives the failure text.
	bool CDBConnPool::Query( const char *szSql, CDBResultSet& DBRes, std::string& Error )
	{
		CDBConnect *pConn = GetDBConnect( Error );
		if ( nullptr == pConn )
		{
			return false;
		}

		MYSQL_RES* pRes = pConn->Query( szSql, Error );
		GiveBack( pConn );

		return DBRes.Bind( pRes, Error );
	}

	// Runs a query on a pooled connection and returns the raw result handle
	// from CDBConnect::Query(), or nullptr when no connection is available.
	MYSQL_RES* CDBConnPool::Query( const char *szSql, std::string& Error )
	{
		CDBConnect *pConn = GetDBConnect( Error );
		if ( nullptr == pConn )
		{
			return nullptr;
		}

		MYSQL_RES* pRes = pConn->Query( szSql, Error );
		GiveBack( pConn );

		return pRes;
	}

	// Executes a statement on a pooled connection and returns the value of
	// CDBConnect::ExecuteSql(), or (my_ulonglong)-1 when no connection is
	// available.
	my_ulonglong CDBConnPool::ExecuteSql( const char *szSql, std::string& Error )
	{
		CDBConnect *pConn = GetDBConnect( Error );
		if ( nullptr == pConn )
		{
			return static_cast<my_ulonglong>( -1 );
		}

		// Keep the result so the connection is returned before leaving.
		const my_ulonglong nRet = pConn->ExecuteSql( szSql, Error );
		GiveBack( pConn );

		return nRet;
	}

	// Executes a statement on a pooled connection and returns the value of
	// CDBConnect::ExecuteSqlID(), or (my_ulonglong)-1 when no connection is
	// available.
	my_ulonglong CDBConnPool::ExecuteSqlID( const char * szSql, std::string & Error )
	{
		CDBConnect *pConn = GetDBConnect( Error );
		if ( nullptr == pConn )
		{
			return static_cast<my_ulonglong>( -1 );
		}

		const my_ulonglong nRet = pConn->ExecuteSqlID( szSql, Error );
		GiveBack( pConn );

		return nRet;
	}
}