#include "StdAfx.h"
#include "MysqlConnPool.h"
#include "./mylog/log_module.h"
#include "./system_basic_info/SystemAnalysis.h"

// ---------------------------------------------------------------------------
// File-local helpers
// ---------------------------------------------------------------------------

// Returns TRUE when hThread holds a handle obtained from CreateThread().
// CreateThread() reports failure with NULL, while this class uses
// INVALID_HANDLE_VALUE as its "no thread" marker, so both are rejected.
static BOOL PoolIsLiveThread(HANDLE hThread)
{
    return (NULL != hThread) && (INVALID_HANDLE_VALUE != hThread);
}

// Waits for a maintenance thread to exit, closes its handle and resets the
// slot to INVALID_HANDLE_VALUE so that a repeated call is a harmless no-op.
static void PoolJoinThread(HANDLE& hThread)
{
    if (PoolIsLiveThread(hThread))
    {
        WaitForSingleObject(hThread, INFINITE);
        CloseHandle(hThread);
    }
    hThread = INVALID_HANDLE_VALUE;
}

// Maps a NULL C string to "" so it can be assigned to a std::string safely.
static const char* PoolOrEmpty(const char* sz)
{
    return (NULL != sz) ? sz : "";
}

// ---------------------------------------------------------------------------
// CMysqlConnPool
// ---------------------------------------------------------------------------

// Sets the pool limits (3 initial connections, 30 maximum), clears the control
// flags, creates the wake-up event and initializes both list locks.
// No connection and no thread is created here; see InitAllConn().
CMysqlConnPool::CMysqlConnPool()
{
    this->m_nMinCount = 3;
    this->m_nMaxCount = 30;
    this->m_bNeedStop = FALSE;
    this->m_bNeedConnection = FALSE;
    this->m_needConnect = false;
    m_hMaintanceThread = INVALID_HANDLE_VALUE;
    m_hMaintanceThreadForConn = INVALID_HANDLE_VALUE;

    // Manual-reset event, initially non-signaled.
    // NOTE(review): the event is *named*, so every pool instance that can see
    // the same kernel namespace shares one event object - confirm that this
    // is intended; an unnamed event (NULL) would isolate each pool.
    m_hHaveData = CreateEvent(NULL, TRUE, FALSE, _T("DataConnPool"));

    // Set up the critical sections guarding the two connection lists.
    InitializeCriticalSectionAndSpinCount(&m_csIdleConnList, MAXCRITICALSECTIONSPINCOUNT);
    InitializeCriticalSectionAndSpinCount(&m_csBusyConnList, MAXCRITICALSECTIONSPINCOUNT);
}

// Stops and joins the maintenance threads, destroys every pooled connection
// and releases the event and the locks.
//
// The threads are joined *before* anything they use is torn down. The previous
// implementation overwrote the thread handles with INVALID_HANDLE_VALUE before
// closing them (leaking both handles) and released the event and the critical
// sections while the threads could still be running.
CMysqlConnPool::~CMysqlConnPool()
{
    StopThread();
    CloseAllConn();

    if (NULL != m_hHaveData)
    {
        CloseHandle(m_hHaveData);
        m_hHaveData = NULL;
    }

    DeleteCriticalSection(&m_csIdleConnList);
    DeleteCriticalSection(&m_csBusyConnList);
}

// Destroys all current connections, opens m_nMinCount new ones and makes sure
// both maintenance threads exist.
//
// Returns the idle-connection count reported by the last InitNewConn() call
// (0 when m_nMinCount is 0).
//
// NOTE(review): CloseAllConn() also deletes connections that are currently
// checked out; callers still holding such a pointer would be left with a
// dangling pointer - confirm how callers are expected to cope with that.
int CMysqlConnPool::InitAllConn()
{
    CloseAllConn();

    // Create the minimum number of connections.
    int nCount = 0;
    for (unsigned int i = 0; i < m_nMinCount; i++)
    {
        nCount = InitNewConn();
    }

    // Create the maintenance threads, unless a stop has already been
    // requested (a thread started now would just exit again, or race with
    // StopThread() which is joining the existing ones).
    if (!m_bNeedStop)
    {
        if (INVALID_HANDLE_VALUE == m_hMaintanceThread)
        {
            HANDLE hThread = CreateThread(NULL, 0, thread_run, (LPVOID)this, 0, NULL);
            if (NULL != hThread)    // NULL means failure: keep the slot "unused" so we retry later
            {
                m_hMaintanceThread = hThread;
            }
        }

        if (INVALID_HANDLE_VALUE == m_hMaintanceThreadForConn)
        {
            HANDLE hThread = CreateThread(NULL, 0, thread_runforConn, (LPVOID)this, 0, NULL);
            if (NULL != hThread)
            {
                m_hMaintanceThreadForConn = hThread;
            }
        }
    }

    return nCount;
}

// Closes and deletes every connection in the idle list and in the busy list.
// Each list is emptied while holding its own lock.
void CMysqlConnPool::CloseAllConn()
{
    // Idle connections.
    EnterCriticalSection(&m_csIdleConnList);
    while (!m_listIdleConnection.empty())
    {
        CMysqlConn* pConn = m_listIdleConnection.front();
        m_listIdleConnection.pop_front();
        if (NULL != pConn)
        {
            pConn->Close();
            delete pConn;
        }
    }
    LeaveCriticalSection(&m_csIdleConnList);

    // Connections currently in use.
    EnterCriticalSection(&m_csBusyConnList);
    while (!m_listBusyConnection.empty())
    {
        CMysqlConn* pConn = m_listBusyConnection.front();
        m_listBusyConnection.pop_front();
        if (NULL != pConn)
        {
            pConn->Close();
            delete pConn;
        }
    }
    LeaveCriticalSection(&m_csBusyConnList);
}

// Checks an idle connection out of the pool and moves it to the busy list.
//
// If no idle connection is available the call retries once per second, up to
// five times, before giving up. When the pool holds no connection at all on
// entry it gives up immediately.
//
// After a successful checkout, if at most one idle connection is left and the
// pool is still below m_nMaxCount, the maintenance thread is asked to open one
// more connection.
//
// Returns the connection, or NULL when none could be obtained.
CMysqlConn* CMysqlConnPool::GetNewConn()
{
    const int kMaxWaits = 5;    // number of one-second waits before giving up

    CMysqlConn* pDBEngine = NULL;
    DBConnectList::size_type nIdle = 0;
    DBConnectList::size_type nBusy = 0;

    for (int nTimes = 0; ; nTimes++)
    {
        // Lock order idle -> busy; no other code path nests these two locks
        // in the opposite order.
        EnterCriticalSection(&m_csIdleConnList);
        EnterCriticalSection(&m_csBusyConnList);

        const bool bPoolEmpty = m_listIdleConnection.empty() && m_listBusyConnection.empty();
        if (!m_listIdleConnection.empty())
        {
            // Take it from the idle queue and append it to the busy queue.
            pDBEngine = m_listIdleConnection.front();
            m_listIdleConnection.pop_front();
            m_listBusyConnection.push_back(pDBEngine);
        }
        nIdle = m_listIdleConnection.size();
        nBusy = m_listBusyConnection.size();

        LeaveCriticalSection(&m_csBusyConnList);
        LeaveCriticalSection(&m_csIdleConnList);

        if (NULL != pDBEngine)
        {
            break;
        }
        if ((0 == nTimes && bPoolEmpty) || nTimes >= kMaxWaits)
        {
            // Either there is nothing to wait for, or we waited long enough.
            // g_pSvrLog->AddRunLog(LL_ERROR, _T("Waiting for a connection for a long time, but failed."));
            return NULL;
        }
        Sleep(1000);
    }

    if (nIdle <= 1)
    {
        // Running low on idle connections: grow the pool if still allowed.
        if ((nIdle + nBusy) < m_nMaxCount)
        {
            // Publish the flag *before* signaling, otherwise the woken thread
            // may read the old value and skip the request.
            m_bNeedConnection = TRUE;
            SetEvent(m_hHaveData);
        }
        else
        {
            // Limit reached.
            // g_pSvrLog->AddRunLog(LL_ERROR, _T("Database connection reached max count."));
        }
    }

    return pDBEngine;
}

// Returns a connection to the pool: removes it from the busy list and appends
// it to the idle list. A NULL argument only queries the pool.
//
// Returns the number of idle connections after the call.
int CMysqlConnPool::RestoreConn( CMysqlConn* pDBEngine )
{
    if (NULL != pDBEngine)
    {
        // Take it out of the busy queue.
        EnterCriticalSection(&m_csBusyConnList);
        m_listBusyConnection.remove(pDBEngine);
        LeaveCriticalSection(&m_csBusyConnList);
    }

    EnterCriticalSection(&m_csIdleConnList);
    if (NULL != pDBEngine)
    {
        // Put it into the idle queue.
        m_listIdleConnection.push_back(pDBEngine);
    }
    const int nCount = static_cast<int>(m_listIdleConnection.size());
    LeaveCriticalSection(&m_csIdleConnList);

    return nCount;
}

// Stores the connection parameters used by InitNewConn() for every connection
// opened afterwards. Does not open any connection itself.
//
// NULL string arguments are stored as empty strings (assigning NULL to a
// std::string is undefined behavior).
void CMysqlConnPool::Init(const char* szOption, const char *szHost, const char *szUser, const char *szPwd, const char *szDbName, unsigned int nPort, const char * szUnixSocket, unsigned long nClientFlag )
{
    m_strOption     = PoolOrEmpty(szOption);
    m_strHost       = PoolOrEmpty(szHost);
    m_strUser       = PoolOrEmpty(szUser);
    m_strPwd        = PoolOrEmpty(szPwd);
    m_strDbName     = PoolOrEmpty(szDbName);
    m_nPort         = nPort;
    m_strUnixSocket = PoolOrEmpty(szUnixSocket);   // also clears a value left over from an earlier Init()
    m_nClientFlag   = nClientFlag;
}

// Opens one more connection with the stored parameters and, on success, adds
// it to the idle list and clears the "need connection" request.
//
// Returns the number of idle connections after the call; on failure the
// request flag stays set and the current idle count is returned.
int CMysqlConnPool::InitNewConn()
{
    bool bSuccess = false;
    CMysqlConn * pDBEngine = new CMysqlConn(m_strOption.c_str(), m_strHost.c_str(), m_strUser.c_str(), m_strPwd.c_str(),
        m_strDbName.c_str(), m_nPort, m_strUnixSocket.c_str(), m_nClientFlag, bSuccess);

    if (!bSuccess)
    {
        delete pDBEngine;
        return RestoreConn(NULL);   // just reports the idle count, under the lock
    }

    m_bNeedConnection = FALSE;
    pDBEngine->setPool(this);
    return RestoreConn(pDBEngine);
}

// Closes a connection and removes it from the idle list. The object itself is
// not deleted and the busy list is not touched. NULL is ignored.
void CMysqlConnPool::CloseConn( CMysqlConn* pDBEngine )
{
    if (NULL == pDBEngine)
    {
        return;
    }

    pDBEngine->Close();

    // Remove it from the idle queue.
    EnterCriticalSection(&m_csIdleConnList);
    m_listIdleConnection.remove(pDBEngine);
    LeaveCriticalSection(&m_csIdleConnList);
}

// Asks both maintenance threads to stop, waits for them to exit and closes
// their handles. Safe to call more than once. Once stopped, InitAllConn()
// no longer starts new maintenance threads.
//
// Must not be called from one of the maintenance threads (it would wait for
// itself). The reconnect thread polls the stop flag between short sleeps, so
// the call may block briefly, plus the duration of any InitAllConn() that
// thread is currently executing.
void CMysqlConnPool::StopThread()
{
    // thread_run waits on the event without timeout, so raise the stop flag
    // first and then signal the event to make it re-check the flag.
    m_bNeedStop = TRUE;
    SetEvent(m_hHaveData);

    // Wait for the threads to exit.
    PoolJoinThread(m_hMaintanceThread);
    PoolJoinThread(m_hMaintanceThreadForConn);
}

// Returns TRUE once a stop has been requested.
BOOL CMysqlConnPool::IsNeedStop()
{
    return m_bNeedStop;
}

// Returns TRUE while an additional connection has been requested and not yet
// opened successfully.
BOOL CMysqlConnPool::IsNeedConnection()
{
    return m_bNeedConnection;
}

// Thread entry point: reconnect watchdog.
//
// Roughly every four seconds it checks m_needConnect; when set, the whole pool
// is rebuilt with InitAllConn() and the flag is cleared once at least one idle
// connection exists. pdata is the owning CMysqlConnPool.
//
// The loop now honors the stop flag; it used to be `while(true)`, which made
// StopThread() wait on this thread forever.
DWORD WINAPI thread_runforConn( LPVOID pdata )
{
    CMysqlConnPool * pConPool = static_cast<CMysqlConnPool *>(pdata);

    const int   kSlices  = 40;      // 40 * 100 ms = the original 4 s period
    const DWORD kSliceMs = 100;

    while (!pConPool->IsNeedStop())
    {
        if (pConPool->m_needConnect)
        {
            const int count = pConPool->InitAllConn();
            if (count > 0)
            {
                pConPool->m_needConnect = false;
            }
        }

        // Keep it simple for now: plain polling, sliced so that a stop
        // request is noticed quickly.
        for (int i = 0; i < kSlices && !pConPool->IsNeedStop(); ++i)
        {
            Sleep(kSliceMs);
        }
    }
    return 0;
}

// Thread entry point: pool growth worker.
//
// Sleeps on m_hHaveData and, each time it is woken, opens one more connection
// if GetNewConn() requested it. Exits when the stop flag is set. pdata is the
// owning CMysqlConnPool.
DWORD WINAPI thread_run( LPVOID pdata )
{
    CMysqlConnPool * pConPool = static_cast<CMysqlConnPool *>(pdata);

    while (!pConPool->IsNeedStop())
    {
        // Wait without timeout, then clear the manual-reset event. Resetting
        // *after* waking (rather than before waiting) guarantees that a
        // signal raised just before this point is not thrown away; the flags
        // are always published before the event is set and are read below.
        WaitForSingleObject(pConPool->m_hHaveData, INFINITE);
        ResetEvent(pConPool->m_hHaveData);

        if (pConPool->IsNeedStop())
        {
            break;
        }
        if (pConPool->IsNeedConnection())
        {
            pConPool->InitNewConn();
        }
    }
    return 0;
}
CMysqlConn::CMysqlConn( const char *option, const char *host, const char *user, const char *password, const char* dbname, unsigned int port, const char *unix_socket, unsigned long clientflag, bool &state ) { m_option = option; m_host = host; m_user = user; m_pwd = password; m_db = dbname; m_port = port; m_unix_socket = unix_socket; m_clientflag = clientflag; m_bOpen = false; // m_pConn = mysql_init(NULL); if(NULL != m_pConn) { if( 0 == SetOption()) { if(0 == Open()) { state = true; }else{ state = false; } }else{ state = false; } } else { state = false; } // } CMysqlConn::~CMysqlConn() { Close(); } int CMysqlConn::SetOption() { if(mysql_options(m_pConn, MYSQL_SET_CHARSET_NAME, m_option)) return 1; return 0; } int CMysqlConn::Open() { HRESULT hr = ::CoInitialize(NULL); if(FAILED(hr)) return -1; //mysql_options(m_pConn, MYSQL_OPT_RECONNECT, m_option); m_clientflag |= CLIENT_MULTI_STATEMENTS; if(!mysql_real_connect(m_pConn, m_host, m_user, m_pwd, m_db, m_port, m_unix_socket, m_clientflag)) return 1; m_bOpen = true; return 0; } int CMysqlConn::Close() { if (m_bOpen) { mysql_close(m_pConn); ::CoUninitialize(); m_bOpen = false; m_pConn = NULL; } return 0; } bool CMysqlConn::IsOpen() const { return m_bOpen; } int CMysqlConn::Get_MysqlEror() { return mysql_errno(m_pConn); } MYSQL_RES * CMysqlConn::Execute( const char * strSQL, int &err) { if(strSQL == "") return NULL; MYSQL_RES * pRes; if (m_pConn == NULL) { return NULL; } int _error = mysql_query(m_pConn, strSQL); err = Get_MysqlEror(); switch (err) { case CR_COMMANDS_OUT_OF_SYNC: break; case CR_SERVER_GONE_ERROR: case CR_SERVER_LOST: { //首先要把这个链接注销掉,不然会在析构函数中放入队列。但是指针已经delete //增加一个合法性判断,标识走析构,不入队列 //pDbGuard.isVaild(); getPool()->IsNeedConn(); return NULL; } break; case CR_UNKNOWN_ERROR: default: break; } pRes = mysql_use_result(m_pConn); return pRes; } void CMysqlConn::MultiExecute( const char * strSQL, int &err) { LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_89); if(strSQL == "") { err = 1; return ; } MYSQL_RES * 
pRes; err = mysql_query(m_pConn, strSQL); err = Get_MysqlEror(); switch (err) { case CR_COMMANDS_OUT_OF_SYNC: break; case CR_SERVER_GONE_ERROR: case CR_SERVER_LOST: { //首先要把这个链接注销掉,不然会在析构函数中放入队列。但是指针已经delete //增加一个合法性判断,标识走析构,不入队列 //pDbGuard.isVaild(); getPool()->IsNeedConn(); return ; } break; case CR_UNKNOWN_ERROR: default: break; } //每条语句对应一个pRes,需要释放每一个结果集;避免数据错乱 //不然通过mysql_error()会得知是Commands out of sync; you can't run this command now错误 do { LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_90); pRes = mysql_use_result(m_pConn); mysql_free_result(pRes); } while (!mysql_next_result(m_pConn)); }