123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- #include "CDBConnPool.h"
- #include <boost/bind.hpp>
- #include <thread>
- #include <log.h>
- namespace YADB
- {
- // boost::lockfree::queue<_ASYNC_SQL_*, boost::lockfree::capacity<32768>> __AsyncQueue;//异步执行无锁队列
- std::mutex g_mutex;
- std::vector<_ASYNC_SQL_*> g_sql_list;
- CDBConnPool::CDBConnPool()
- {
- g_sql_list.reserve(1<<15);
- __pAsyncDBConn = 0;
- }
- CDBConnPool::~CDBConnPool()
- {
- Close();
- }
- CDBConnect * CDBConnPool::__CreateIdleConn( std::string& ConnErr, bool IsTemp )
- {
- _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
- CDBConnect* pConn = new CDBConnect( IsTemp );
- if ( !pConn->Connect( ConnSetting, ConnErr ) )
- {
- delete pConn;
- pConn = 0;
- return 0;
- }
- //如果设置了预处理SQL要准备预处理
- if ( !ConnSetting.stmtSQL.empty() )
- {
- if ( !pConn->Preparestmt( ConnSetting.stmtSQL.c_str(), ConnErr ) )
- {
- delete pConn;
- pConn = 0;
- return 0;
- }
- }
- __IdleConnList.push_back( pConn );
- return pConn;
- }
- bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting, std::string& szError )
- {
- return Create(Setting,true,szError);
- }
- bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting,bool bAsync, std::string& szError )
- {
- std::unique_lock<std::mutex> lock( __mtx );
- if ( Setting.PoolSize < DCC_MIN_COUNT )
- {
- szError = "PoolSize is too small!";
- return false;
- }
- if ( Setting.PoolSize > DCC_MAX_COUNT )
- {
- szError = "PoolSize is too big!";
- return false;
- }
- __Setting = Setting;
- //检查连接池中是否已有连接数量
- if ((int)__IdleConnList.size() < __Setting.PoolSize)
- {
- for ( int i = 0; i < __Setting.PoolSize; i++ )
- {
- CDBConnect* pConn = __CreateIdleConn( szError );
- if ( !pConn )
- {
- return false;
- }
- }
- }
- //是否已创建异步线程
- if (bAsync || !__Running)
- {
- //创建异步执行线程
- __CreateAsyncThrdConn();
- //启动异步执行线程
- __StartAsyncThrd();
- }
- return true;
- }
- void CDBConnPool::Close()
- {
- std::unique_lock<std::mutex> lock( __mtx );
- //停止异步执行线程
- __StopAsyncThrd();
- //??????????步执行线程连接
- __DestroyAsyncThrdConn();
- //把所有列表中的连接对象都关闭删除并清除列表
- CDBConnect* pConn = 0;
- std::list<CDBConnect*>::iterator lit_conn;
- for ( lit_conn = __BusyConnList.begin(); lit_conn != __BusyConnList.end(); lit_conn++ )
- {
- pConn = *lit_conn;
- pConn->Close();
- delete pConn;
- pConn = 0;
- }
- __BusyConnList.clear();
- for ( lit_conn = __IdleConnList.begin(); lit_conn != __IdleConnList.end(); lit_conn++ )
- {
- pConn = *lit_conn;
- pConn->Close();
- delete pConn;
- pConn = 0;
- }
- __IdleConnList.clear();
- }
- CDBConnect * CDBConnPool::GetDBConnect( std::string& Error )
- {
- std::unique_lock<std::mutex> lock( __mtx );
- CDBConnect* pConn = 0;
- if ( __IdleConnList.size() > 0 )
- {
- pConn = *(__IdleConnList.begin());
- __IdleConnList.pop_front();
- __BusyConnList.push_back( pConn );
- }
- else
- {
- //如果已经没有空闲连接,只要当前连接池数量没有超过最大连接数就创建一个临时连接
- //这个判断无意义
- if ( __IdleConnList.size() < DCC_MAX_COUNT )
- {
- pConn = __CreateIdleConn( Error, true );
- if ( !pConn )
- {
- Error = "Error,failed connect to database!";
- return 0;
- }
- __IdleConnList.pop_front();
- __BusyConnList.push_back( pConn );
- }
- else
- {
- Error = "Error,db connect count beyond the max connect count!";
- return 0;
- }
- }
- //验证看数据库连接是否还有效
- if ( pConn )
- {
- if ( pConn->ConnctionTest( Error ) != 0 )
- {
- //重连一次
- _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
- pConn->Close();
- int nRet = pConn->Connect( ConnSetting, Error );
- if ( nRet < 0 )
- {
- GiveBack( pConn );
- Error = "Error,failed connect to database!";
- return 0;
- }
- }
- }
- return pConn;
- }
- void CDBConnPool::GiveBack( CDBConnect * pConn )
- {
- std::unique_lock<std::mutex> lock( __mtx );
- if ( 0 == pConn )
- {
- return;
- }
- __BusyConnList.remove( pConn );
- //如果是临时连接,直接删除不再放入到空闲连接列表中
- if ( pConn->IsTemp() )
- {
- delete pConn;
- pConn = 0;
- }
- else
- {
- __IdleConnList.push_back( pConn );
- }
- }
- void CDBConnPool::ExecAsyncSql(_ASYNC_SQL_*pData)
- {
- std::string Error;
- if ( !pData )
- return;
- if ( __pAsyncDBConn )
- {
- my_ulonglong llRes = 0;
- llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error );
- if ( (my_ulonglong)-1 == llRes )
- {
- //Execute failed, write log...
- log_error( "Error,调用ExcuteRealSql失败,Err=%s,[%s]\n", Error.c_str(),pData->SQL.c_str());
- //如果失败了看是不是数据库断开连接了,尝试重新连接一次
- if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 )
- {
- _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
- __pAsyncDBConn->Close();
- int nRet = __pAsyncDBConn->Connect( ConnSetting, Error );
- if ( nRet < 0 )
- {
- Error = "Error,failed connect to database!";
- //Connect failed, write log...
- log_error( "Error,failed connect to database,Err=%s\n", Error.c_str() );
- //如果连接失败了休息一下
- boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) );
- }
- }
- //如果执行失败,失败次数加一,失败次数小于最大失败次数放到队尾下次再执行
- pData->FailedCount++;
- if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT )
- {
- std::unique_lock<std::mutex> lock(g_mutex);
- g_sql_list.push_back(pData);
- return;
- }
- }
- }
- delete pData;
- pData = 0;
- }
- void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner )
- {
- //uint64_t cc=0;
- while( pOwner->__Running )
- {
- if(!g_sql_list.empty()){
- std::vector<_ASYNC_SQL_*> sql_list;
- sql_list.reserve(1<<15);
-
- {
- std::unique_lock<std::mutex> lock(g_mutex);
- sql_list.swap(g_sql_list);
- }
- //100ms太理想了。上下井高峰期这个有可能1h都没一条日志
- //if(++cc % 100 == 0)//100ms 输出一次
- logn_info(2,"sql_list size=%lld", sql_list.size());
- //1条链接处理sql。sql堆积。速度慢
- for(auto pData:sql_list)
- ExecAsyncSql(pData);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- //线程退出
- __IsExited = true;
- }
- void CDBConnPool::__StopAsyncThrd()
- {
- if ( !__Running )
- {
- return;
- }
- //等待异步执行线程退出
- __Running = false;
- while ( !__IsExited )
- {
- boost::this_thread::sleep(boost::posix_time::millisec(1));
- }
- //把异步执行无锁队列中每个元素释放
- {
- std::unique_lock<std::mutex> lock( g_mutex );
- std::for_each(g_sql_list.begin(),g_sql_list.end(),[](_ASYNC_SQL_*psql){
- delete psql;
- });
- g_sql_list.clear();
- #if 0
- _ASYNC_SQL_* pData = 0;
- while ( __AsyncQueue.pop( pData ) )
- {
- if (pData)
- {
- delete pData;
- pData = 0;
- }
- }
- #endif
- }
- }
- void CDBConnPool::__StartAsyncThrd()
- {
- __Running = true;
- boost::thread thrd( boost::bind( &CDBConnPool::_AsyncThreadFunc, this, this ) );
- thrd.detach();
- }
- void CDBConnPool::__CreateAsyncThrdConn()
- {
- std::string ConnErr;
- //先断开之前的连接
- this->__DestroyAsyncThrdConn();
- _DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
- CDBConnect* pConn = new CDBConnect();
- if ( !pConn->Connect( ConnSetting, ConnErr ) )
- {
- delete pConn;
- pConn = 0;
- return;
- }
- __pAsyncDBConn = pConn;
- }
- void CDBConnPool::__DestroyAsyncThrdConn()
- {
- if ( __pAsyncDBConn )
- {
- __pAsyncDBConn->Close();
- delete __pAsyncDBConn;
- __pAsyncDBConn = 0;
- }
- }
- bool CDBConnPool::PushAsync( std::string&& strSQL )
- {
- std::unique_lock<std::mutex> lock(g_mutex);
- g_sql_list.push_back(new _ASYNC_SQL_(std::move(strSQL)));
- return true;
- }
- bool CDBConnPool::Query( const char *szSql, CDBResultSet& DBRes,std::string& Error )
- {
- CDBConnect *pConn = GetDBConnect( Error );
- if ( 0 == pConn )
- {
- return false;
- }
- MYSQL_RES* pRes = pConn->Query( szSql, Error );
- GiveBack( pConn );
- return DBRes.Bind( pRes, Error );
- }
- MYSQL_RES* CDBConnPool::Query( const char *szSql, std::string& Error)
- {
- CDBConnect *pConn = GetDBConnect(Error);
- if( 0 == pConn){
- return nullptr;
- }
- MYSQL_RES* pRes = pConn->Query(szSql,Error);
- GiveBack(pConn);
- return pRes;
- }
- my_ulonglong CDBConnPool::ExecuteSql( const char *szSql, std::string& Error )
- {
- CDBConnect *pConn = GetDBConnect( Error );
- if ( 0 == pConn )
- {
- return -1;
- }
- my_ulonglong nRet = pConn->ExecuteSql( szSql, Error );
- GiveBack( pConn );
- return nRet;
- //return pConn->ExecuteSql( szSql, Error );
- }
- my_ulonglong CDBConnPool::ExecuteSqlID( const char * szSql, std::string & Error )
- {
- CDBConnect *pConn = GetDBConnect( Error );
- if ( 0 == pConn )
- {
- return -1;
- }
- my_ulonglong nRet = pConn->ExecuteSqlID( szSql, Error );
- GiveBack( pConn );
- return nRet;
- }
- }
|