#include "stdafx.h"
#include "ProcessRemodule.h"

#include <Windows.h>
#include <iostream>
#include <exception> 
#include "log_process_module.h"

#include "./system_basic_info/SystemAnalysis.h"

using namespace std;
//#include <thread>
//#include <mutex>
//#include <chrono>

#define BUF_SIZE    5001
#define REAL_BUF    BUF_SIZE-1
ParseData* gBuf = NULL;
unsigned int gBufFlag[BUF_SIZE] = {0};
HANDLE gBufMutex;

HANDLE hThread[MSG_PROCESS_THREAD_NUM];
unsigned nThreadID[MSG_PROCESS_THREAD_NUM];

HANDLE hSqlThread[SQL_PROCESS_THREAD_NUM];
unsigned nSqlThreadID[SQL_PROCESS_THREAD_NUM];

HANDLE hSysLogThread[SYS_LOG_PROCESS_THREAD_NUM];
unsigned nSysLogThreadID[SYS_LOG_PROCESS_THREAD_NUM];

BufferUtility* BufferUtility::instance = NULL;
BufferUtility* BufferUtility::getInstance()
{
	if (instance == NULL)
	{
		instance = new BufferUtility;
	}
	return instance;
}

int BufferUtility::init()
{
	gBuf = (ParseData *)malloc(sizeof(ParseData) * BUF_SIZE);
	memset(gBuf, 0, sizeof(ParseData) * BUF_SIZE);
	memset(gBufFlag, 0, sizeof(gBufFlag));
	gBufMutex  =CreateMutex(NULL, FALSE, NULL);
	return 0;
}

ParseData* BufferUtility::getItem()
{
	int i = 0;

	WaitForSingleObject(gBufMutex,INFINITE);
	
	for(i=0; i<REAL_BUF; i++)
	{
		if(0 == gBufFlag[i])
		{
			//debug_print_syslog(0,"[getItem for:%d] for circle.",i);
			break;
		}
	}
	if(i == REAL_BUF)
	{
		//debug_print_syslog(0,"[getItem] why here");
		ReleaseMutex(gBufMutex);
		Sleep(10);
		return NULL;
	}
	//debug_print_syslog(0,"[getItem for:%d] for circle.",i);
	gBufFlag[i] = 1;
	ReleaseMutex(gBufMutex);
	return &(gBuf[i]);
}

int BufferUtility::releaseItem(int i)
{
	WaitForSingleObject(gBufMutex,INFINITE);
	//debug_print_syslog(0,"[releaseItem--i]:%d",i);
	gBufFlag[i] = 0;
	memset(&(gBuf[i]), 0, sizeof(ParseData));
	ReleaseMutex(gBufMutex);
	return 0;
}

int BufferUtility::releaseItem(ParseData* itemPtr)
{
	int i = 0;
	WaitForSingleObject(gBufMutex,INFINITE);
	for(i=0; i<REAL_BUF; i++)
	{
		if(itemPtr == (ParseData*)(&(gBuf[i])))
		{
			break;
		}
	}
	gBufFlag[i] = 0;
	memset(&(gBuf[i]), 0, sizeof(ParseData));
	//debug_print_syslog(0,"[releaseItem--item]:%d",i);
	ReleaseMutex(gBufMutex);
	return 0;
}


int message_process_thread_init()
{
	//��¼ʱ��ʼ��������
	BufferUtility* bufferInstance = BufferUtility::getInstance();
	bufferInstance->init();
	//��ʼ����Ϣ�����߳�

	int i = 0;
	for(i=0; i<MSG_PROCESS_THREAD_NUM; i++)
	{
		hThread[i] = (HANDLE)_beginthreadex( NULL, 0, &message_process_entry, NULL, 0, &(nThreadID[i]) );
		//printf("thread id: %d.\n",hThread[i]);
		if(hThread[i] == 0)
		{
			printf("start thread failed,errno:%d\n",::GetLastError());
			return 1;
		}
	}

	return 0;
}


unsigned __stdcall message_process_entry(void *param)
{
	MSG msg;
	//check for msg quene isExist or not
	PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);

	while(true)
	{
		if(GetMessage(&msg,0,0,0)) //get msg from message queue
		{
			switch(msg.message)
			{
			case TCP_MSG:
				//add message process
				ParseData* data =  (ParseData*)(msg.wParam);
#ifndef  UT_TEST
				CYAServerDlg* dlg = reinterpret_cast<CYAServerDlg*>(data->handle);
				//#ifdef TRACE_MEMORY_PARSE_PACKAGE
				try
				{
					//CYAServerDlg* handle;
					//lemon 2017/10/14 
					int len = data->len;
					CONNID dwConnID = data->dwConnID;
					BYTE buf[LENGTH_MSG_4K];
					memcpy(buf, data->buf, len);
					BufferUtility::getInstance()->releaseItem(data);
					//dlg->parse_package_data(data->buf,data->len,data->dwConnID);
					dlg->parse_package_data(buf, len, dwConnID);
				}
				catch(exception e)
				{
					debug_print_syslog(0,"[parse_package_data]exception!!~");
				}
				//#endif // TRACE_MEMORY_PARSE_PACKAGE
				//BufferUtility::getInstance()->releaseItem(data);
				break;
#else
				//printf("current thread id: %d, receive tcp msg:%d\n", GetCurrentThreadId(), data->a);
				BufferUtility::getInstance()->releaseItem(data);
#endif

			}
		}
	}
	return 0;
}

int send_tcp_thread_message(ParseData* data)
{
	//send thread message
	LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_91);
	unsigned thread_id = rand() % MSG_PROCESS_THREAD_NUM;
	if(!PostThreadMessage(nThreadID[thread_id], TCP_MSG, (WPARAM)data, 0))//post thread msg
	{
		LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_31);
		//debug_print_syslog(0, "post tcp message failed,errno:%d",::GetLastError());
		BufferUtility::getInstance()->releaseItem(data);
		return 1;
	}
	return 0;
}

int sql_process_thread_init()
{
	int i = 0;
	for(i=0; i<SQL_PROCESS_THREAD_NUM; i++)
	{
		hSqlThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sql_process_entry, NULL, 0, &(nSqlThreadID[i]) );
		if(hSqlThread[i] == 0)
		{
			printf("start thread failed,errno:%d\n",::GetLastError());
			return 1;
		}
	}
	return 0;
}

// thread function
unsigned __stdcall sql_process_entry(void *param)
{
	//printf("thread sql_process_entry start\n");
	LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_99);

	MSG msg;
	//check for msg quene isExist or not
	PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
	while(true)
	{
		if(GetMessage(&msg,0,0,0)) //get msg from message queue
		{
			switch(msg.message)
			{
			case SQL_MSG:
				LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_84);
				char * pInfo = (char *)msg.wParam;
				//printf("current thread id: %d,recv %s\n", GetCurrentThreadId(), pInfo);
				//process sql command

#ifndef  UT_TEST
				_exec_sql(pInfo);
				delete[] pInfo;
#else
				delete[] pInfo;
#endif
				break;
			}
		}
	};
	return 0;
}
int send_sql_message(char* pchr)
{
	LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_102);
	//send thread message
	unsigned thread_id = rand() % SQL_PROCESS_THREAD_NUM;

	if(!PostThreadMessage(nSqlThreadID[thread_id], SQL_MSG, (WPARAM)pchr, 0))//post thread msg
	{
		LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_32);
		debug_print_syslog(0, "post sql message failed,errno:%d", ::GetLastError());
		delete[] pchr;
		return 1;
	}
	return 0;
}

int sys_log_process_thread_init()
{
	int i = 0;
	log_module_init();
	for(i=0; i<SYS_LOG_PROCESS_THREAD_NUM; i++)
	{
		hSysLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sys_log_process_entry, NULL, 0, &(nSysLogThreadID[i]) );
		if(hSysLogThread[i] == 0)
		{
			LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_301);
			return 1;
		}
	}
	return 0;
}

// thread function
unsigned __stdcall sys_log_process_entry(void *param)
{
	MSG msg;
	//check for msg quene isExist or not
	PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
	while(true)
	{
		if(GetMessage(&msg,0,0,0)) //get msg from message queue
		{
			switch(msg.message)
			{
			case SYS_LOG_MSG:
				char * pInfo = (char *)msg.wParam;
#ifndef  UT_TEST
				debug_print(0, pInfo);
				delete[] pInfo;
#else
				delete[] pInfo;
#endif
				break;
			}
		}
	};
	return 0;
}

int service_task_init()
{
	BufferUtility* bufPtr = BufferUtility::getInstance();
	bufPtr->init();
	message_process_thread_init();
	sql_process_thread_init();
	sys_log_process_thread_init();
	::Sleep(1000);
	return 0;
}

int send_sys_log_message(char* pchr)
{
	LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_303);
	//send thread message
	if(!m_log_sys_status)
	{
		return 0;
	}
	LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_304);
	unsigned thread_id = rand() % SYS_LOG_PROCESS_THREAD_NUM;
	if(!PostThreadMessage(nSysLogThreadID[thread_id], SYS_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
	{
		LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_34);
		delete[] pchr;
		return 1;
	}
	return 0;
}

int debug_print_syslog(UINT debugLevel, const char* format, ...)
{
	LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_302);
	char *buffer = new char[LOG_BUF_SIZE];
	memset(buffer, 0, LOG_BUF_SIZE);
	va_list args;
	va_start (args, format);
	vsnprintf_s(buffer, LOG_BUF_SIZE, LOG_BUF_SIZE-1, format, args);
	va_end (args);

	send_sys_log_message(buffer);
	return 0;
}