#include "CDBConnPool.h" #include namespace YADB { boost::lockfree::queue<_ASYNC_SQL_*, boost::lockfree::capacity> __AsyncQueue;//异步执行无锁队列 CDBConnPool::CDBConnPool() { __pAsyncDBConn = 0; } CDBConnPool::~CDBConnPool() { Close(); } CDBConnect * CDBConnPool::__CreateIdleConn( std::string& ConnErr, bool IsTemp ) { //std::string ConnErr; _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting); CDBConnect* pConn = new CDBConnect( IsTemp ); if ( !pConn->Connect( ConnSetting, ConnErr ) ) { delete pConn; pConn = 0; return 0; } //如果设置了预处理SQL要准备预处理 if ( !ConnSetting.stmtSQL.empty() ) { if ( !pConn->Preparestmt( ConnSetting.stmtSQL.c_str(), ConnErr ) ) { delete pConn; pConn = 0; return 0; } } __IdleConnList.push_back( pConn ); return pConn; } bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting, std::string& Error ) { std::unique_lock lock( __mtx ); if ( Setting.PoolSize < DCC_MIN_COUNT ) { Error = "PoolSize is too small!"; return false; } if ( Setting.PoolSize > DCC_MAX_COUNT ) { Error = "PoolSize is too big!"; return false; } __Setting = Setting; std::string ConnErr; for ( int i = 0; i < __Setting.PoolSize; i++ ) { CDBConnect* pConn = __CreateIdleConn( Error ); if ( !pConn ) { return false; } } //创建异步执行线程 __CreateAsyncThrdConn(); //启动异步执行线程 __StartAsyncThrd(); return true; } void CDBConnPool::Close() { std::unique_lock lock( __mtx ); //停止异步执行线程 __StopAsyncThrd(); //刪除异步执行线程连接 __DestroyAsyncThrdConn(); //把所有列表中的连接对象都关闭删除并清除列表 CDBConnect* pConn = 0; std::list::iterator lit_conn; for ( lit_conn = __BusyConnList.begin(); lit_conn != __BusyConnList.end(); lit_conn++ ) { pConn = *lit_conn; pConn->Close(); delete pConn; pConn = 0; } __BusyConnList.clear(); for ( lit_conn = __IdleConnList.begin(); lit_conn != __IdleConnList.end(); lit_conn++ ) { pConn = *lit_conn; pConn->Close(); delete pConn; pConn = 0; } __IdleConnList.clear(); } CDBConnect * CDBConnPool::GetDBConnect( std::string& Error ) { std::unique_lock lock( __mtx ); CDBConnect* pConn = 0; if ( __IdleConnList.size() > 0 ) { pConn = *(__IdleConnList.begin()); __IdleConnList.pop_front(); __BusyConnList.push_back( pConn ); } else { //如果已经没有空闲连接,只要当前连接池数量没有超过最大连接数就创建一个临时连接 if ( __IdleConnList.size() < DCC_MAX_COUNT ) { pConn = __CreateIdleConn( Error, true ); if ( !pConn ) { Error = "Error,failed connect to database!"; return 0; } __IdleConnList.pop_front(); __BusyConnList.push_back( pConn ); } else { Error = "Error,db connect count beyond the max connect count!"; return 0; } } //验证看数据库连接是否还有效 if ( pConn ) { if ( pConn->ConnctionTest( Error ) != 0 ) { //重连一次 _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting); pConn->Close(); int nRet = pConn->Connect( ConnSetting, Error ); if ( nRet < 0 ) { GiveBack( pConn ); Error = "Error,failed connect to database!"; return 0; } } } return pConn; } void CDBConnPool::GiveBack( CDBConnect * pConn ) { std::unique_lock lock( __mtx ); if ( 0 == pConn ) { return; } __BusyConnList.remove( pConn ); //如果是临时连接,直接删除不再放入到空闲连接列表中 if ( pConn->IsTemp() ) { delete pConn; pConn = 0; } else { __IdleConnList.push_back( pConn ); } } void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner ) { std::string Error; while( pOwner->__Running ) { _ASYNC_SQL_* pData = 0; while ( __AsyncQueue.pop( pData ) ) { if ( pData ) { if ( __pAsyncDBConn ) { my_ulonglong llRes = 0; llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error ); if ( (my_ulonglong)-1 == llRes ) { //Execute failed, write log... printf( "Error,调用ExcuteRealSql失败,Err=%s\n", Error.c_str() ); //如果失败了看是不是数据库断开连接了,尝试重新连接一次 if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 ) { _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting); __pAsyncDBConn->Close(); int nRet = __pAsyncDBConn->Connect( ConnSetting, Error ); if ( nRet < 0 ) { Error = "Error,failed connect to database!"; //Connect failed, write log... printf( "Error,failed connect to database,Err=%s\n", Error.c_str() ); //如果连接失败了休息一下 boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) ); } } //如果执行失败,失败次数加一,失败次数小于最大失败次数放到队尾下次再执行 pData->FailedCount++; if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT ) { _ASYNC_SQL_* pNewData = new _ASYNC_SQL_(); pNewData->FailedCount = pData->FailedCount; pNewData->SQL = pData->SQL; __AsyncQueue.push( pNewData ); } } } delete pData; pData = 0; } } boost::this_thread::sleep( boost::posix_time::microseconds( 10 ) ); } //线程退出 __IsExited = true; } void CDBConnPool::__StopAsyncThrd() { if ( !__Running ) { return; } //等待异步执行线程退出 __Running = false; while ( !__IsExited ) { boost::this_thread::sleep(boost::posix_time::millisec(1)); } //把异步执行无锁队列中每个元素释放 _ASYNC_SQL_* pData = 0; while ( __AsyncQueue.pop( pData ) ) { if (pData) { delete pData; pData = 0; } } } void CDBConnPool::__StartAsyncThrd() { boost::thread thrd( boost::bind( &CDBConnPool::_AsyncThreadFunc, this, this ) ); thrd.detach(); } void CDBConnPool::__CreateAsyncThrdConn() { std::string ConnErr; _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting); CDBConnect* pConn = new CDBConnect(); if ( !pConn->Connect( ConnSetting, ConnErr ) ) { delete pConn; pConn = 0; return; } __pAsyncDBConn = pConn; } void CDBConnPool::__DestroyAsyncThrdConn() { if ( __pAsyncDBConn ) { __pAsyncDBConn->Close(); delete __pAsyncDBConn; __pAsyncDBConn = 0; } } bool CDBConnPool::PushAsync( const std::string& strSQL ) { _ASYNC_SQL_* pData = new _ASYNC_SQL_; if ( !pData ) { return false; } pData->SQL = strSQL; return __AsyncQueue.push( pData ); } bool CDBConnPool::Query( const char *szSql, CDBResultSet& DBRes,std::string& Error ) { CDBConnect *pConn = GetDBConnect( Error ); if ( 0 == pConn ) { return false; } MYSQL_RES* pRes = pConn->Query( szSql, Error ); return DBRes.Bind( pRes, Error ); } my_ulonglong CDBConnPool::ExecuteSql( const char *szSql, std::string& Error ) { CDBConnect *pConn = GetDBConnect( Error ); if ( 0 == pConn ) { return -1; } return pConn->ExecuteSql( szSql, Error ); } }