#include "CDBConnPool.h"
#include <boost/bind.hpp>
#include <thread>
#include <log.h>

namespace YADB
{
//	boost::lockfree::queue<_ASYNC_SQL_*, boost::lockfree::capacity<32768>> __AsyncQueue;//异步执行无锁队列

	// Mutex taken by PushAsync, ExecAsyncSql, the async worker loop and
	// __StopAsyncThrd when they modify or drain g_sql_list.
	std::mutex g_mutex;
	// Queue of pending asynchronous SQL requests. Elements are heap-allocated by
	// PushAsync and deleted by ExecAsyncSql or __StopAsyncThrd. Being declared at
	// namespace scope, this single queue is shared by every CDBConnPool object
	// in the process.
	std::vector<_ASYNC_SQL_*> g_sql_list;

	/// Constructs an empty pool; no database connection is opened here.
	CDBConnPool::CDBConnPool()
	{
		// The dedicated async-worker connection does not exist yet.
		__pAsyncDBConn = nullptr;

		// Pre-size the shared async SQL queue for 32768 (1<<15) entries.
		const int kAsyncQueueReserve = 1 << 15;
		g_sql_list.reserve( kAsyncQueueReserve );
	}

	/// Destructor: tears the pool down by delegating to Close().
	CDBConnPool::~CDBConnPool()
	{
		this->Close();
	}

	/**
	 * Opens one new connection from the stored pool settings and appends it to
	 * the back of __IdleConnList.
	 *
	 * If the settings carry a non-empty stmtSQL, that statement is prepared on
	 * the new connection as well.
	 *
	 * @param ConnErr  receives the error text when connecting or preparing fails
	 * @param IsTemp   forwarded to the CDBConnect constructor
	 * @return the new connection (already stored in __IdleConnList), or 0 when
	 *         Connect() or Preparestmt() failed
	 *
	 * This function takes no lock itself; the callers visible in this file
	 * (Create, GetDBConnect) hold __mtx while calling it.
	 */
	CDBConnect * CDBConnPool::__CreateIdleConn( std::string& ConnErr, bool IsTemp )
	{
		_DB_CONN_SETTING_ connCfg = static_cast< _DB_CONN_SETTING_ >(__Setting);

		CDBConnect* pNewConn = new CDBConnect( IsTemp );

		// Step 1: connect. Step 2 (only if a prepared statement is configured): prepare it.
		bool ready = pNewConn->Connect( connCfg, ConnErr ) ? true : false;
		if ( ready && !connCfg.stmtSQL.empty() )
		{
			ready = pNewConn->Preparestmt( connCfg.stmtSQL.c_str(), ConnErr ) ? true : false;
		}

		// Either step failing discards the half-built connection.
		if ( !ready )
		{
			delete pNewConn;
			return 0;
		}

		__IdleConnList.push_back( pNewConn );
		return pNewConn;
	}

	bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting, std::string& szError )
	{
		return Create(Setting,true,szError);
	}

	/**
	 * Validates the settings, fills the idle list up to PoolSize connections and
	 * starts the asynchronous SQL worker.
	 *
	 * @param Setting  pool configuration; PoolSize must lie within
	 *                 [DCC_MIN_COUNT, DCC_MAX_COUNT]
	 * @param bAsync   see the NOTE(review) on the worker start condition below
	 * @param szError  receives the error text on failure
	 * @return true on success; false if PoolSize is out of range or a connection
	 *         could not be created (connections created before the failure stay
	 *         in the idle list)
	 *
	 * Holds __mtx for the whole call.
	 */
	bool CDBConnPool::Create( const _DB_POOL_SETTING_& Setting,bool bAsync, std::string& szError )
	{
		std::unique_lock<std::mutex> lock( __mtx );

		if ( Setting.PoolSize < DCC_MIN_COUNT )
		{
			szError = "PoolSize is too small!";
			return false;
		}

		if ( Setting.PoolSize > DCC_MAX_COUNT )
		{
			szError = "PoolSize is too big!";
			return false;
		}

		__Setting = Setting;

		// Create only the connections that are still missing. Starting the
		// counter at the current idle count keeps a repeated Create() (for
		// example a retry after a partial failure) from stacking a further
		// PoolSize connections on top of the ones that already exist.
		for ( int i = static_cast<int>( __IdleConnList.size() ); i < __Setting.PoolSize; ++i )
		{
			if ( !__CreateIdleConn( szError ) )
			{
				return false;
			}
		}

		// NOTE(review): with `||` the worker is (re)created whenever bAsync is
		// true, even if one is already running, and it is also created when
		// bAsync is false but no worker is running. Confirm whether
		// `bAsync && !__Running` was intended.
		if (bAsync || !__Running)
		{
			// Open the dedicated connection used by the async worker.
			__CreateAsyncThrdConn();

			// Launch the async worker thread.
			__StartAsyncThrd();
		}
		return true;
	}

	void CDBConnPool::Close()
	{
		std::unique_lock<std::mutex> lock( __mtx );

		//停止异步执行线程
		__StopAsyncThrd();

		//??????????步执行线程连接
		__DestroyAsyncThrdConn();

		//把所有列表中的连接对象都关闭删除并清除列表
		CDBConnect* pConn = 0;
		std::list<CDBConnect*>::iterator lit_conn;
		for ( lit_conn = __BusyConnList.begin(); lit_conn != __BusyConnList.end(); lit_conn++ )
		{
			pConn = *lit_conn;
			pConn->Close();
			delete pConn;
			pConn = 0;
		}
		__BusyConnList.clear();

		for ( lit_conn = __IdleConnList.begin(); lit_conn != __IdleConnList.end(); lit_conn++ )
		{
			pConn = *lit_conn;
			pConn->Close();
			delete pConn;
			pConn = 0;
		}
		__IdleConnList.clear();
	}

	/**
	 * Borrows a connection from the pool.
	 *
	 * Takes the first idle connection if there is one; otherwise creates a
	 * temporary connection. The chosen connection is moved to the busy list and
	 * tested; if the test fails it is closed and reconnected once.
	 *
	 * @param Error  receives the error text on failure
	 * @return a usable connection that must be returned with GiveBack(), or 0
	 *         when no connection could be created or the reconnect failed
	 */
	CDBConnect * CDBConnPool::GetDBConnect( std::string& Error )
	{
		std::unique_lock<std::mutex> lock( __mtx );

		CDBConnect* pConn = 0;

		if ( !__IdleConnList.empty() )
		{
			pConn = __IdleConnList.front();
			__IdleConnList.pop_front();
			__BusyConnList.push_back( pConn );
		}
		else
		{
			// NOTE(review): the idle list is known to be empty in this branch,
			// so this comparison cannot enforce DCC_MAX_COUNT as a total
			// connection limit. Kept as-is to avoid changing behaviour under
			// load; confirm whether the busy-list size should be checked.
			if ( __IdleConnList.size() < DCC_MAX_COUNT )
			{
				pConn = __CreateIdleConn( Error, true );
				if ( !pConn )
				{
					Error = "Error,failed connect to database!";
					return 0;
				}

				// __CreateIdleConn appended the new connection to the (empty)
				// idle list; move it over to the busy list.
				__IdleConnList.pop_front();
				__BusyConnList.push_back( pConn );
			}
			else
			{
				Error = "Error,db connect count beyond the max connect count!";
				return 0;
			}
		}

		// Verify that the connection is still alive.
		if ( pConn->ConnctionTest( Error ) != 0 )
		{
			// Try to reconnect once.
			_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
			pConn->Close();
			int nRet = pConn->Connect( ConnSetting, Error );
			// `<= 0` treats both a false/0 result and a negative result as
			// failure, which agrees with the truthiness test applied to
			// Connect() in __CreateIdleConn.
			if ( nRet <= 0 )
			{
				// GiveBack() locks __mtx itself. Calling it while this function
				// still owns the (non-recursive) mutex would deadlock, so
				// release the lock first.
				lock.unlock();
				GiveBack( pConn );
				Error = "Error,failed connect to database!";
				return 0;
			}
		}

		return pConn;
	}

	/**
	 * Returns a borrowed connection to the pool.
	 *
	 * The connection is removed from the busy list. A temporary connection is
	 * deleted; any other connection is appended to the idle list.
	 *
	 * @param pConn  connection to return; a null pointer is ignored
	 *
	 * Locks __mtx, so it must not be called by a thread that already holds it.
	 */
	void CDBConnPool::GiveBack( CDBConnect * pConn )
	{
		std::unique_lock<std::mutex> guard( __mtx );

		if ( !pConn )
			return;

		__BusyConnList.remove( pConn );

		if ( !pConn->IsTemp() )
		{
			// Regular pooled connection: make it available again.
			__IdleConnList.push_back( pConn );
			return;
		}

		// Temporary connections are not kept.
		delete pConn;
	}

	/**
	 * Executes one queued asynchronous SQL request on the async connection and
	 * takes ownership of pData.
	 *
	 * On success the request is deleted. On failure the error is logged, the
	 * connection is tested and reconnected if the test fails, and the request is
	 * re-queued on g_sql_list until its FailedCount reaches
	 * MAX_ASYNC_EXEC_FAILED_COUNT, after which it is deleted.
	 *
	 * @param pData  request to execute; a null pointer is ignored
	 */
	void CDBConnPool::ExecAsyncSql(_ASYNC_SQL_*pData)
	{
		if ( !pData )
			return;

		if ( !__pAsyncDBConn )
		{
			// Without a connection the request cannot be executed. Log it
			// instead of discarding it silently.
			log_error( "Error,async db connection is not available,drop sql[%s]\n", pData->SQL.c_str() );
			delete pData;
			return;
		}

		std::string Error;
		const my_ulonglong llRes = __pAsyncDBConn->ExecuteRealSql( pData->SQL.c_str(), Error );
		if ( static_cast<my_ulonglong>( -1 ) != llRes )
		{
			// Executed successfully.
			delete pData;
			return;
		}

		//Execute failed, write log...
		log_error( "Error,调用ExcuteRealSql失败,Err=%s,[%s]\n", Error.c_str(),pData->SQL.c_str());

		// The failure may be caused by a dropped connection; if the test fails,
		// try to reconnect once.
		if ( __pAsyncDBConn->ConnctionTest( Error ) != 0 )
		{
			_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
			__pAsyncDBConn->Close();
			int nRet = __pAsyncDBConn->Connect( ConnSetting, Error );
			// `<= 0` covers both a false/0 and a negative failure result,
			// matching the truthiness test applied to Connect() elsewhere in
			// this file.
			if ( nRet <= 0 )
			{
				// Log the error reported by Connect() itself (it is no longer
				// overwritten with a generic text before being printed).
				log_error( "Error,failed connect to database,Err=%s\n", Error.c_str() );
				// Back off briefly after a failed reconnect.
				boost::this_thread::sleep( boost::posix_time::milliseconds( 100 ) );
			}
		}

		// Count the failure; while below the limit, append the request to the
		// queue so it is retried on a later pass.
		pData->FailedCount++;
		if ( pData->FailedCount < MAX_ASYNC_EXEC_FAILED_COUNT )
		{
			std::unique_lock<std::mutex> lock(g_mutex);
			g_sql_list.push_back(pData);
			return;
		}

		// Retry budget exhausted.
		delete pData;
	}

	/**
	 * Body of the async worker thread.
	 *
	 * While pOwner->__Running is set, it repeatedly takes the whole content of
	 * g_sql_list under g_mutex, executes each request with ExecAsyncSql() and
	 * then sleeps for 1 ms. Sets __IsExited when the loop ends.
	 *
	 * @param pOwner  pool whose __Running flag controls the loop
	 */
	void CDBConnPool::_AsyncThreadFunc( CDBConnPool* pOwner )
	{
		// Local buffer that is swapped with the shared queue. It is reused
		// across iterations (clear() keeps the capacity), so no new 32768-slot
		// vector is allocated on every pass.
		std::vector<_ASYNC_SQL_*> batch;
		batch.reserve(1<<15);

		while( pOwner->__Running )
		{
			{
				// The shared queue is only touched while g_mutex is held; the
				// emptiness test is done afterwards on the private copy, so it
				// no longer races with PushAsync().
				std::unique_lock<std::mutex> lock(g_mutex);
				batch.swap(g_sql_list);
			}

			if(!batch.empty())
			{
				// Logged once per non-empty batch: a fixed 100 ms interval was
				// considered too optimistic, since at peak times a batch can
				// take far longer than that.
				// size() is size_t; cast so the argument matches %lld.
				logn_info(2,"sql_list size=%lld", static_cast<long long>(batch.size()));

				// A single connection executes the statements one by one, so a
				// backlog drains slowly. ExecAsyncSql() may append failed
				// requests to g_sql_list again; that does not affect `batch`.
				for(auto pData:batch)
					ExecAsyncSql(pData);

				batch.clear();
			}
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
		}

		// Signal that the thread has left its loop.
		__IsExited = true;
	}

	void CDBConnPool::__StopAsyncThrd()
	{
		if ( !__Running )
		{
			return;
		}

		//等待异步执行线程退出
		__Running = false;

		while ( !__IsExited )
		{
			boost::this_thread::sleep(boost::posix_time::millisec(1));
		}

		//把异步执行无锁队列中每个元素释放
		{
			std::unique_lock<std::mutex> lock( g_mutex );
			std::for_each(g_sql_list.begin(),g_sql_list.end(),[](_ASYNC_SQL_*psql){
				delete psql;
			});

			g_sql_list.clear();


			#if 0
			_ASYNC_SQL_* pData = 0;
			while ( __AsyncQueue.pop( pData ) )
			{
				if (pData)
				{
					delete pData;
					pData = 0;
				}
			}
			#endif
		}
	}

	void CDBConnPool::__StartAsyncThrd()
	{
		__Running = true;
		boost::thread thrd( boost::bind( &CDBConnPool::_AsyncThreadFunc, this, this ) );
		thrd.detach();
	}

	void CDBConnPool::__CreateAsyncThrdConn()
	{
		std::string ConnErr;
		//先断开之前的连接
		this->__DestroyAsyncThrdConn();

		_DB_CONN_SETTING_ ConnSetting = static_cast< _DB_CONN_SETTING_ >(__Setting);
		CDBConnect* pConn = new CDBConnect();
		if ( !pConn->Connect( ConnSetting, ConnErr ) )
		{
			delete pConn;
			pConn = 0;
			return;
		}

		__pAsyncDBConn = pConn;
	}

	/// Closes and deletes the async worker's connection, if there is one, and
	/// resets __pAsyncDBConn to null.
	void CDBConnPool::__DestroyAsyncThrdConn()
	{
		if ( !__pAsyncDBConn )
			return;

		__pAsyncDBConn->Close();
		delete __pAsyncDBConn;
		__pAsyncDBConn = nullptr;
	}

	bool CDBConnPool::PushAsync( std::string&& strSQL )
	{
		std::unique_lock<std::mutex> lock(g_mutex);
		g_sql_list.push_back(new _ASYNC_SQL_(std::move(strSQL)));
		return true;
	}

	bool CDBConnPool::Query( const char *szSql, CDBResultSet& DBRes,std::string& Error )
	{
		CDBConnect *pConn = GetDBConnect( Error );
		if ( 0 == pConn )
		{
			return false;
		}

		MYSQL_RES* pRes = pConn->Query( szSql, Error );
		GiveBack( pConn );
		return DBRes.Bind( pRes, Error );
	}

	/**
	 * Runs a query on a borrowed connection and returns the raw result.
	 *
	 * @param szSql  SQL text to execute
	 * @param Error  receives the error text on failure
	 * @return nullptr if no connection could be obtained; otherwise whatever
	 *         CDBConnect::Query() returned
	 */
	MYSQL_RES* CDBConnPool::Query( const char *szSql, std::string& Error)
	{
		CDBConnect* const pBorrowed = GetDBConnect( Error );
		if ( !pBorrowed )
			return nullptr;

		MYSQL_RES* pResult = pBorrowed->Query( szSql, Error );
		GiveBack( pBorrowed );

		return pResult;
	}
	/**
	 * Executes a statement on a borrowed connection.
	 *
	 * @param szSql  SQL text to execute
	 * @param Error  receives the error text on failure
	 * @return (my_ulonglong)-1 if no connection could be obtained; otherwise
	 *         the value returned by CDBConnect::ExecuteSql()
	 */
	my_ulonglong CDBConnPool::ExecuteSql( const char *szSql, std::string& Error )
	{
		CDBConnect* const pBorrowed = GetDBConnect( Error );
		if ( !pBorrowed )
			return static_cast<my_ulonglong>( -1 );

		const my_ulonglong result = pBorrowed->ExecuteSql( szSql, Error );
		GiveBack( pBorrowed );

		return result;
	}

	/**
	 * Executes a statement on a borrowed connection via
	 * CDBConnect::ExecuteSqlID().
	 *
	 * @param szSql  SQL text to execute
	 * @param Error  receives the error text on failure
	 * @return (my_ulonglong)-1 if no connection could be obtained; otherwise
	 *         the value returned by CDBConnect::ExecuteSqlID()
	 */
	my_ulonglong CDBConnPool::ExecuteSqlID( const char * szSql, std::string & Error )
	{
		CDBConnect* const pBorrowed = GetDBConnect( Error );
		if ( !pBorrowed )
			return static_cast<my_ulonglong>( -1 );

		const my_ulonglong result = pBorrowed->ExecuteSqlID( szSql, Error );
		GiveBack( pBorrowed );

		return result;
	}
}