#include "CDBConnPool.h"
#include <boost/bind.hpp>
#include <log.h>

namespace YADB
{
	// Lock-free, fixed-capacity queue of pending asynchronous SQL statements.
	// PushAsync() enqueues heap-allocated nodes; the async worker thread
	// (_AsyncThreadFunc) pops, executes and frees them. Defined at namespace
	// scope, so it is a single queue rather than one per CDBConnPool object.
	boost::lockfree::queue<_ASYNC_SQL_*, boost::lockfree::capacity<MAX_ASYNC_QUEQUE_CAPACITY>> __AsyncQueue;

	// Builds an empty pool: no async worker connection exists until Create() runs.
	CDBConnPool::CDBConnPool()
	{
		__pAsyncDBConn = nullptr;
	}

	// Tears the pool down by delegating to Close().
	CDBConnPool::~CDBConnPool()
	{
		this->Close();
	}

	// Creates one connection from the stored pool settings and appends it to
	// the idle list.
	//   ConnErr - receives the error text reported by Connect()/Preparestmt().
	//   IsTemp  - forwarded to the CDBConnect constructor.
	// Returns the new connection, or 0 when connecting or preparing the
	// configured statement fails (the half-built connection is deleted).
	// Does not lock __mtx itself.
	CDBConnect * CDBConnPool::__CreateIdleConn( std::string& ConnErr, bool IsTemp )
	{
		_DB_CONN_SETTING_ Cfg = static_cast< _DB_CONN_SETTING_ >(__Setting);
		CDBConnect* pFresh = new CDBConnect( IsTemp );

		bool bUsable = pFresh->Connect( Cfg, ConnErr ) ? true : false;

		// When a prepared-statement SQL is configured it must be prepared up front.
		if ( bUsable && !Cfg.stmtSQL.empty() )
		{
			bUsable = pFresh->Preparestmt( Cfg.stmtSQL.c_str(), ConnErr ) ? true : false;
		}

		if ( !bUsable )
		{
			delete pFresh;
			return 0;
		}

		__IdleConnList.push_back( pFresh );
		return pFresh;
	}

	bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting, std::string& szError )
	{
		return Create(Setting,true,szError);
	}

	// Creates (or tops up) the pool.
	//   Setting - pool configuration; PoolSize must lie in [DCC_MIN_COUNT, DCC_MAX_COUNT].
	//   bAsync  - when true, the async worker connection and thread are started
	//             unless a worker is already running.
	//   szError - receives a description of the failure.
	// Returns false when PoolSize is out of range or a connection cannot be
	// created. Connections created before a failure stay in the idle list and
	// are released by Close().
	bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting,bool bAsync, std::string& szError )
	{
		std::unique_lock<std::mutex> lock( __mtx );

		if ( Setting.PoolSize < DCC_MIN_COUNT )
		{
			szError = "PoolSize is too small!";
			return false;
		}

		if ( Setting.PoolSize > DCC_MAX_COUNT )
		{
			szError = "PoolSize is too big!";
			return false;
		}

		__Setting = Setting;

		// Only create the connections that are missing. Each successful
		// __CreateIdleConn() call appends one entry to __IdleConnList, so the
		// loop stops at exactly PoolSize idle connections instead of adding a
		// full PoolSize on top of whatever is already there.
		while ( static_cast<int>( __IdleConnList.size() ) < __Setting.PoolSize )
		{
			if ( !__CreateIdleConn( szError ) )
			{
				return false;
			}
		}

		// Start the async worker only when it was requested and is not running
		// yet. Starting it again while it runs would delete the connection the
		// running worker is using and leave two workers behind.
		if ( bAsync && !__Running )
		{
			// Create the connection dedicated to the async worker thread.
			__CreateAsyncThrdConn();

			// Launch the async worker thread.
			__StartAsyncThrd();
		}
		return true;
	}

	void CDBConnPool::Close()
	{
		std::unique_lock<std::mutex> lock( __mtx );

		//停止异步执行线程
		__StopAsyncThrd();

		//??????????步执行线程连接
		__DestroyAsyncThrdConn();

		//把所有列表中的连接对象都关闭删除并清除列表
		CDBConnect* pConn = 0;
		std::list<CDBConnect*>::iterator lit_conn;
		for ( lit_conn = __BusyConnList.begin(); lit_conn != __BusyConnList.end(); lit_conn++ )
		{
			pConn = *lit_conn;
			pConn->Close();
			delete pConn;
			pConn = 0;
		}
		__BusyConnList.clear();

		for ( lit_conn = __IdleConnList.begin(); lit_conn != __IdleConnList.end(); lit_conn++ )
		{
			pConn = *lit_conn;
			pConn->Close();
			delete pConn;
			pConn = 0;
		}
		__IdleConnList.clear();
	}

	// Checks a connection out of the pool and moves it to the busy list.
	//   Error - receives a description of the failure.
	// Takes the first idle connection when one exists, otherwise creates a
	// temporary one. The connection is tested before it is handed out and is
	// reconnected once if the test fails.
	// Returns the connection, or 0 on failure. Hand it back with GiveBack().
	CDBConnect * CDBConnPool::GetDBConnect( std::string& Error )
	{
		std::unique_lock<std::mutex> lock( __mtx );

		CDBConnect* pConn = 0;

		if ( !__IdleConnList.empty() )
		{
			pConn = __IdleConnList.front();
			__IdleConnList.pop_front();
			__BusyConnList.push_back( pConn );
		}
		else
		{
			// No idle connection left: create a temporary one.
			// NOTE(review): the idle list is empty in this branch, so this
			// comparison is always true and the "beyond the max" branch is
			// unreachable; the number of temporary connections is effectively
			// unbounded. Behaviour kept as-is - confirm whether the cap should
			// be enforced against the busy list instead.
			if ( __IdleConnList.size() < DCC_MAX_COUNT )
			{
				pConn = __CreateIdleConn( Error, true );
				if ( !pConn )
				{
					Error = "Error,failed connect to database!";
					return 0;
				}

				// __CreateIdleConn() appended the new connection to the idle
				// list; take that very entry back out and mark it busy.
				__IdleConnList.pop_back();
				__BusyConnList.push_back( pConn );
			}
			else
			{
				Error = "Error,db connect count beyond the max connect count!";
				return 0;
			}
		}

		// Verify that the database connection is still usable.
		if ( pConn->ConnctionTest( Error ) != 0 )
		{
			// Reconnect once.
			// NOTE(review): a configured prepared statement (stmtSQL) is not
			// prepared again after this reconnect - confirm whether it should be.
			_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
			pConn->Close();
			int nRet = pConn->Connect( ConnSetting, Error );

			// __CreateIdleConn() treats a falsy Connect() result as failure, so
			// zero must count as failure here too; negative values are still
			// rejected as before.
			if ( nRet <= 0 )
			{
				// GiveBack() locks __mtx itself and std::mutex is not
				// recursive, so the lock must be released first or this
				// thread would deadlock on its own mutex.
				lock.unlock();
				GiveBack( pConn );
				Error = "Error,failed connect to database!";
				return 0;
			}
		}

		return pConn;
	}

	// Returns a connection obtained from GetDBConnect() to the pool.
	// A null pointer is ignored. The connection is removed from the busy list;
	// temporary connections are deleted, all others go back to the idle list.
	void CDBConnPool::GiveBack( CDBConnect * pConn )
	{
		std::unique_lock<std::mutex> lock( __mtx );

		if ( pConn != 0 )
		{
			__BusyConnList.remove( pConn );

			if ( !pConn->IsTemp() )
			{
				__IdleConnList.push_back( pConn );
			}
			else
			{
				// Temporary connections are never recycled into the idle list.
				delete pConn;
			}
		}
	}

	// Body of the async worker thread.
	//   pOwner - pool whose __Running flag keeps the loop alive.
	// Drains __AsyncQueue and executes each statement on __pAsyncDBConn. A
	// failed statement is logged, the connection is tested and reconnected if
	// needed, and the statement is put back on the queue until it has failed
	// MAX_ASYNC_EXEC_FAILED_COUNT times. Sets __IsExited when the loop ends.
	void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner )
	{
		std::string Error;
		while( pOwner->__Running )
		{
			_ASYNC_SQL_* pData = 0;

			while ( __AsyncQueue.pop( pData ) )
			{
				if ( !pData )
				{
					continue;
				}

				// True once ownership of pData has gone back to the queue.
				bool bRequeued = false;

				if ( __pAsyncDBConn )
				{
					const my_ulonglong llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error );
					if ( static_cast<my_ulonglong>( -1 ) == llRes )
					{
						// Execute failed, write log...
						log_error( "Error,调用ExcuteRealSql失败,Err=%s,[%s]\n", Error.c_str(),pData->SQL.c_str());

						// The failure may be caused by a dropped connection: test it and reconnect once.
						if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 )
						{
							_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
							__pAsyncDBConn->Close();
							int nRet = __pAsyncDBConn->Connect( ConnSetting, Error );

							// Connect() is treated as truthy-on-success elsewhere in
							// this file, so zero counts as failure as well as negative.
							if ( nRet <= 0 )
							{
								// Log the error reported by Connect() itself rather
								// than overwriting it with a generic text first.
								log_error( "Error,failed connect to database,Err=%s\n", Error.c_str() );
								// Back off a little after a failed reconnect.
								boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) );
							}
						}

						// Count the failure; below the retry limit the statement goes
						// to the back of the queue to be executed again later.
						pData->FailedCount++;
						if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT )
						{
							// Re-queue the same node instead of allocating a copy.
							// push() can fail on the fixed-capacity queue; the node
							// is then freed below instead of being leaked.
							bRequeued = __AsyncQueue.push( pData );
							if ( !bRequeued )
							{
								log_error( "Error,async queue is full,drop SQL [%s]\n", pData->SQL.c_str() );
							}
						}
					}
				}

				if ( !bRequeued )
				{
					delete pData;
				}
				pData = 0;
			}

			boost::this_thread::sleep_for( boost::chrono::milliseconds( 1 ) );
		}

		// Signal that the thread has left its loop.
		__IsExited = true;
	}

	void CDBConnPool::__StopAsyncThrd()
	{
		if ( !__Running )
		{
			return;
		}

		//等待异步执行线程退出
		__Running = false;

		while ( !__IsExited )
		{
			boost::this_thread::sleep(boost::posix_time::millisec(1));
		}

		//把异步执行无锁队列中每个元素释放
		_ASYNC_SQL_* pData = 0;
		while ( __AsyncQueue.pop( pData ) )
		{
			if (pData)
			{
				delete pData;
				pData = 0;
			}
		}
	}

	void CDBConnPool::__StartAsyncThrd()
	{
		__Running = true;
		boost::thread thrd( boost::bind( &CDBConnPool::_AsyncThreadFunc, this, this ) );
		thrd.detach();
	}

	void CDBConnPool::__CreateAsyncThrdConn()
	{
		std::string ConnErr;
		//先断开之前的连接
		this->__DestroyAsyncThrdConn();

		_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
		CDBConnect* pConn = new CDBConnect();
		if ( !pConn->Connect( ConnSetting, ConnErr ) )
		{
			delete pConn;
			pConn = 0;
			return;
		}

		__pAsyncDBConn = pConn;
	}

	// Closes and deletes the async worker connection, if there is one, and
	// resets the pointer to null.
	void CDBConnPool::__DestroyAsyncThrdConn()
	{
		if ( !__pAsyncDBConn )
		{
			return;
		}

		__pAsyncDBConn->Close();
		delete __pAsyncDBConn;
		__pAsyncDBConn = 0;
	}

	bool CDBConnPool::PushAsync( const std::string& strSQL )
	{
		_ASYNC_SQL_* pData = new _ASYNC_SQL_;
		if ( !pData )
		{
            logn_error(2,"PushAsync new pData失败");
			return false;
		}

		pData->SQL = strSQL;
		return __AsyncQueue.push( pData );
	}

	bool CDBConnPool::Query( const char *szSql, CDBResultSet& DBRes,std::string& Error )
	{
		CDBConnect *pConn = GetDBConnect( Error );
		if ( 0 == pConn )
		{
			return false;
		}

		MYSQL_RES* pRes = pConn->Query( szSql, Error );
		GiveBack( pConn );
		return DBRes.Bind( pRes, Error );
	}

	// Runs a query on a pooled connection and returns the raw MYSQL_RES.
	//   szSql - SQL text passed to the connection's Query().
	//   Error - receives the error description.
	// Returns nullptr when no connection is available, otherwise whatever the
	// connection's Query() returned. The connection is given back before
	// returning.
	MYSQL_RES* CDBConnPool::Query( const char *szSql, std::string& Error)
	{
		CDBConnect* const pBorrowed = GetDBConnect( Error );
		if ( !pBorrowed )
		{
			return nullptr;
		}

		MYSQL_RES* const pRows = pBorrowed->Query( szSql, Error );
		GiveBack( pBorrowed );
		return pRows;
	}
	// Executes a statement on a pooled connection.
	//   szSql - SQL text passed to the connection's ExecuteSql().
	//   Error - receives the error description.
	// Returns (my_ulonglong)-1 when no connection is available, otherwise the
	// value returned by the connection's ExecuteSql(). The connection is given
	// back before returning.
	my_ulonglong CDBConnPool::ExecuteSql( const char *szSql, std::string& Error )
	{
		CDBConnect* const pBorrowed = GetDBConnect( Error );
		if ( !pBorrowed )
		{
			return static_cast<my_ulonglong>( -1 );
		}

		const my_ulonglong nResult = pBorrowed->ExecuteSql( szSql, Error );
		GiveBack( pBorrowed );
		return nResult;
	}

	// Executes a statement on a pooled connection through the connection's
	// ExecuteSqlID().
	//   szSql - SQL text passed to ExecuteSqlID().
	//   Error - receives the error description.
	// Returns (my_ulonglong)-1 when no connection is available, otherwise the
	// value returned by ExecuteSqlID(). The connection is given back before
	// returning.
	my_ulonglong CDBConnPool::ExecuteSqlID( const char * szSql, std::string & Error )
	{
		CDBConnect* const pBorrowed = GetDBConnect( Error );
		if ( !pBorrowed )
		{
			return static_cast<my_ulonglong>( -1 );
		}

		const my_ulonglong nResult = pBorrowed->ExecuteSqlID( szSql, Error );
		GiveBack( pBorrowed );
		return nResult;
	}
}