#include "stdafx.h"
#include "ProcessRemodule.h"
#include <Windows.h>
#include <iostream>
#include <exception>
#include "log_process_module.h"
#include "./system_basic_info/SystemAnalysis.h"
using namespace std;
//#include <thread>
//#include <mutex>
//#include <chrono>

// Number of ParseData slots allocated for the pool.
#define BUF_SIZE 5001
// Number of slots getItem() actually hands out; the last allocated slot is
// never returned because every scan below stops at REAL_BUF.
#define REAL_BUF (BUF_SIZE - 1)

// Pool storage, allocated by BufferUtility::init().
ParseData* gBuf = NULL;
// Per-slot "in use" flag: 0 = free, 1 = handed out by getItem().
unsigned int gBufFlag[BUF_SIZE] = {0};
// Mutex guarding gBuf / gBufFlag.
HANDLE gBufMutex;

// Handles and thread ids of the TCP message worker threads.
HANDLE hThread[MSG_PROCESS_THREAD_NUM];
unsigned nThreadID[MSG_PROCESS_THREAD_NUM];

// Handles and thread ids of the SQL worker threads.
HANDLE hSqlThread[SQL_PROCESS_THREAD_NUM];
unsigned nSqlThreadID[SQL_PROCESS_THREAD_NUM];

// Handles and thread ids of the system-log worker threads.
HANDLE hSysLogThread[SYS_LOG_PROCESS_THREAD_NUM];
unsigned nSysLogThreadID[SYS_LOG_PROCESS_THREAD_NUM];

BufferUtility* BufferUtility::instance = NULL;

// Returns the process-wide BufferUtility, creating it on first use.
// NOTE(review): creation is not synchronized; the visible callers reach it
// first from service_task_init() before any worker thread is started.
BufferUtility* BufferUtility::getInstance()
{
    if (instance == NULL)
    {
        instance = new BufferUtility;
    }
    return instance;
}

// Allocates and zeroes the ParseData pool and creates the mutex guarding it.
//
// The call is idempotent: service_task_init() and
// message_process_thread_init() both call it, and re-allocating on the second
// call used to leak the first pool and the first mutex handle.
//
// Returns 0 on success (or if already initialized), 1 if the pool or the
// mutex could not be created.
int BufferUtility::init()
{
    if (gBuf != NULL)
    {
        return 0;
    }

    ParseData* pool = (ParseData*)malloc(sizeof(ParseData) * BUF_SIZE);
    if (pool == NULL)
    {
        return 1;
    }

    HANDLE poolMutex = CreateMutex(NULL, FALSE, NULL);
    if (poolMutex == NULL)
    {
        free(pool);
        return 1;
    }

    memset(pool, 0, sizeof(ParseData) * BUF_SIZE);
    memset(gBufFlag, 0, sizeof(gBufFlag));

    // Publish the mutex before the pool so that gBuf != NULL implies the
    // mutex handle is valid.
    gBufMutex = poolMutex;
    gBuf = pool;
    return 0;
}

// Reserves the first free slot of the pool.
//
// Returns a pointer to the reserved slot, or NULL when the pool has not been
// initialized or every slot is in use. In the "pool full" case the call
// sleeps 10 ms (outside the lock) before returning NULL.
ParseData* BufferUtility::getItem()
{
    if (gBuf == NULL)
    {
        return NULL;
    }

    int i = 0;
    WaitForSingleObject(gBufMutex, INFINITE);
    for (i = 0; i < REAL_BUF; i++)
    {
        if (0 == gBufFlag[i])
        {
            //debug_print_syslog(0,"[getItem for:%d] for circle.",i);
            break;
        }
    }

    if (i == REAL_BUF)
    {
        //debug_print_syslog(0,"[getItem] why here");
        ReleaseMutex(gBufMutex);
        Sleep(10);
        return NULL;
    }

    //debug_print_syslog(0,"[getItem for:%d] for circle.",i);
    gBufFlag[i] = 1;
    ReleaseMutex(gBufMutex);
    return &(gBuf[i]);
}

// Marks slot `i` as free and zeroes its contents.
//
// Returns 0 on success, 1 if the pool is not initialized or `i` is not the
// index of a slot that getItem() can hand out.
int BufferUtility::releaseItem(int i)
{
    if (gBuf == NULL || i < 0 || i >= REAL_BUF)
    {
        return 1;
    }

    WaitForSingleObject(gBufMutex, INFINITE);
    //debug_print_syslog(0,"[releaseItem--i]:%d",i);
    gBufFlag[i] = 0;
    memset(&(gBuf[i]), 0, sizeof(ParseData));
    ReleaseMutex(gBufMutex);
    return 0;
}

// Marks the slot `itemPtr` points at as free and zeroes its contents.
//
// Returns 0 on success, 1 if the pool is not initialized or `itemPtr` is NULL
// or does not point at a slot of the pool (previously such a pointer silently
// cleared the spare slot at index REAL_BUF).
int BufferUtility::releaseItem(ParseData* itemPtr)
{
    if (gBuf == NULL || itemPtr == NULL)
    {
        return 1;
    }

    int i = 0;
    WaitForSingleObject(gBufMutex, INFINITE);
    for (i = 0; i < REAL_BUF; i++)
    {
        if (itemPtr == (ParseData*)(&(gBuf[i])))
        {
            break;
        }
    }

    if (i == REAL_BUF)
    {
        // Not one of our slots: nothing to release.
        ReleaseMutex(gBufMutex);
        return 1;
    }

    gBufFlag[i] = 0;
    memset(&(gBuf[i]), 0, sizeof(ParseData));
    //debug_print_syslog(0,"[releaseItem--item]:%d",i);
    ReleaseMutex(gBufMutex);
    return 0;
}

// Initializes the buffer pool and starts MSG_PROCESS_THREAD_NUM worker
// threads running message_process_entry().
//
// Returns 0 on success, 1 if the pool could not be initialized or a thread
// could not be started.
int message_process_thread_init()
{
    // Make sure the buffer pool exists before any worker can receive an item.
    BufferUtility* bufferInstance = BufferUtility::getInstance();
    if (bufferInstance->init() != 0)
    {
        return 1;
    }

    // Start the message-processing threads.
    int i = 0;
    for (i = 0; i < MSG_PROCESS_THREAD_NUM; i++)
    {
        hThread[i] = (HANDLE)_beginthreadex(NULL, 0, &message_process_entry, NULL, 0, &(nThreadID[i]));
        //printf("thread id: %d.\n",hThread[i]);
        if (hThread[i] == 0)
        {
            printf("start thread failed,errno:%d\n", ::GetLastError());
            return 1;
        }
    }
    return 0;
}

// Thread function of a TCP message worker.
//
// Loops forever on the thread's message queue. For each TCP_MSG, wParam is
// taken as a ParseData* from the BufferUtility pool: its payload is copied to
// a local buffer, the pool slot is released, and the copy is passed to
// CYAServerDlg::parse_package_data(). With UT_TEST defined the slot is only
// released.
//
// `param` is unused. The function does not return in practice.
unsigned __stdcall message_process_entry(void *param)
{
    MSG msg;
    // Per the Win32 PostThreadMessage documentation, calling PeekMessage
    // forces the system to create this thread's message queue.
    PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);

    while (true)
    {
        // GetMessage returns -1 on error; `msg` is then not a fresh message
        // and must not be processed (doing so could release a slot twice).
        if (GetMessage(&msg, 0, 0, 0) > 0) //get msg from message queue
        {
            switch (msg.message)
            {
            case TCP_MSG:
            {
                //add message process
                ParseData* data = (ParseData*)(msg.wParam);
                if (data == NULL)
                {
                    break;
                }
#ifndef UT_TEST
                CYAServerDlg* dlg = reinterpret_cast<CYAServerDlg*>(data->handle);
                //#ifdef TRACE_MEMORY_PARSE_PACKAGE
                try
                {
                    //CYAServerDlg* handle;
                    //lemon 2017/10/14
                    int len = data->len;
                    CONNID dwConnID = data->dwConnID;
                    BYTE buf[LENGTH_MSG_4K];

                    // Never copy more than the local buffer can hold.
                    if (len < 0 || (size_t)len > sizeof(buf))
                    {
                        BufferUtility::getInstance()->releaseItem(data);
                        debug_print_syslog(0, "[message_process_entry]invalid data length:%d", len);
                        break;
                    }

                    // Copy the payload out so the pool slot can be reused
                    // while the package is being parsed.
                    memcpy(buf, data->buf, len);
                    BufferUtility::getInstance()->releaseItem(data);
                    //dlg->parse_package_data(data->buf,data->len,data->dwConnID);
                    dlg->parse_package_data(buf, len, dwConnID);
                }
                catch (const std::exception&)
                {
                    debug_print_syslog(0,"[parse_package_data]exception!!~");
                }
                //#endif // TRACE_MEMORY_PARSE_PACKAGE
                //BufferUtility::getInstance()->releaseItem(data);
#else
                //printf("current thread id: %d, receive tcp msg:%d\n", GetCurrentThreadId(), data->a);
                BufferUtility::getInstance()->releaseItem(data);
#endif
                break;
            }
            }
        }
    }
    return 0;
}

// Posts `data` as a TCP_MSG to a randomly chosen TCP message worker.
//
// On success the receiving worker releases the pool slot. If posting fails
// the slot is released here.
//
// Returns 0 on success, 1 if the message could not be posted.
int send_tcp_thread_message(ParseData* data)
{
    //send thread message
    LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_91);
    unsigned thread_id = rand() % MSG_PROCESS_THREAD_NUM;
    if (!PostThreadMessage(nThreadID[thread_id], TCP_MSG, (WPARAM)data, 0)) //post thread msg
    {
        LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_31);
        //debug_print_syslog(0, "post tcp message failed,errno:%d",::GetLastError());
        BufferUtility::getInstance()->releaseItem(data);
        return 1;
    }
    return 0;
}

// Starts SQL_PROCESS_THREAD_NUM worker threads running sql_process_entry().
//
// Returns 0 on success, 1 if a thread could not be started.
int sql_process_thread_init()
{
    int i = 0;
    for (i = 0; i < SQL_PROCESS_THREAD_NUM; i++)
    {
        hSqlThread[i] = (HANDLE)_beginthreadex(NULL, 0, &sql_process_entry, NULL, 0, &(nSqlThreadID[i]));
        if (hSqlThread[i] == 0)
        {
            printf("start thread failed,errno:%d\n", ::GetLastError());
            return 1;
        }
    }
    return 0;
}

// Thread function of a SQL worker.
//
// Loops forever on the thread's message queue. For each SQL_MSG, wParam is
// taken as a char buffer allocated with new[]; it is passed to _exec_sql()
// (skipped when UT_TEST is defined) and then freed with delete[].
//
// `param` is unused. The function does not return in practice.
unsigned __stdcall sql_process_entry(void *param)
{
    //printf("thread sql_process_entry start\n");
    LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_99);
    MSG msg;
    // Per the Win32 PostThreadMessage documentation, calling PeekMessage
    // forces the system to create this thread's message queue.
    PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);

    while (true)
    {
        // Skip the -1 error return: `msg` would be stale and its buffer could
        // be freed a second time.
        if (GetMessage(&msg, 0, 0, 0) > 0) //get msg from message queue
        {
            switch (msg.message)
            {
            case SQL_MSG:
            {
                LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_84);
                char * pInfo = (char *)msg.wParam;
                //printf("current thread id: %d,recv %s\n", GetCurrentThreadId(), pInfo);
                //process sql command
#ifndef UT_TEST
                if (pInfo != NULL)
                {
                    _exec_sql(pInfo);
                }
                delete[] pInfo;
#else
                delete[] pInfo;
#endif
                break;
            }
            }
        }
    }
    return 0;
}

// Posts `pchr` as a SQL_MSG to a randomly chosen SQL worker.
//
// Ownership of `pchr` (a new[]-allocated buffer) passes to this function: the
// receiving worker frees it, or it is freed here if posting fails.
//
// Returns 0 on success, 1 if the message could not be posted.
int send_sql_message(char* pchr)
{
    LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_102);
    //send thread message
    unsigned thread_id = rand() % SQL_PROCESS_THREAD_NUM;
    if (!PostThreadMessage(nSqlThreadID[thread_id], SQL_MSG, (WPARAM)pchr, 0)) //post thread msg
    {
        LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_32);
        debug_print_syslog(0, "post sql message failed,errno:%d", ::GetLastError());
        delete[] pchr;
        return 1;
    }
    return 0;
}

// Calls log_module_init() and starts SYS_LOG_PROCESS_THREAD_NUM worker
// threads running sys_log_process_entry().
//
// Returns 0 on success, 1 if a thread could not be started.
int sys_log_process_thread_init()
{
    int i = 0;
    log_module_init();
    for (i = 0; i < SYS_LOG_PROCESS_THREAD_NUM; i++)
    {
        hSysLogThread[i] = (HANDLE)_beginthreadex(NULL, 0, &sys_log_process_entry, NULL, 0, &(nSysLogThreadID[i]));
        if (hSysLogThread[i] == 0)
        {
            LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_301);
            return 1;
        }
    }
    return 0;
}

// Thread function of a system-log worker.
//
// Loops forever on the thread's message queue. For each SYS_LOG_MSG, wParam
// is taken as a char buffer allocated with new[]; it is passed to
// debug_print() (skipped when UT_TEST is defined) and then freed with
// delete[].
//
// `param` is unused. The function does not return in practice.
unsigned __stdcall sys_log_process_entry(void *param)
{
    MSG msg;
    // Per the Win32 PostThreadMessage documentation, calling PeekMessage
    // forces the system to create this thread's message queue.
    PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);

    while (true)
    {
        // Skip the -1 error return: `msg` would be stale and its buffer could
        // be freed a second time.
        if (GetMessage(&msg, 0, 0, 0) > 0) //get msg from message queue
        {
            switch (msg.message)
            {
            case SYS_LOG_MSG:
            {
                char * pInfo = (char *)msg.wParam;
#ifndef UT_TEST
                if (pInfo != NULL)
                {
                    // NOTE(review): if debug_print() treats its second
                    // argument as a printf-style format, an already formatted
                    // message containing '%' would be misinterpreted here --
                    // confirm against its declaration.
                    debug_print(0, pInfo);
                }
                delete[] pInfo;
#else
                delete[] pInfo;
#endif
                break;
            }
            }
        }
    }
    return 0;
}

// Initializes the buffer pool and starts the TCP, SQL and system-log worker
// threads, then sleeps for one second.
// NOTE(review): the sleep presumably gives the workers time to create their
// message queues before anything is posted to them -- confirm.
//
// Always returns 0; the results of the individual init calls are not checked.
int service_task_init()
{
    BufferUtility* bufPtr = BufferUtility::getInstance();
    bufPtr->init();
    message_process_thread_init();
    sql_process_thread_init();
    sys_log_process_thread_init();
    ::Sleep(1000);
    return 0;
}

// Posts `pchr` as a SYS_LOG_MSG to a randomly chosen system-log worker.
//
// Ownership of `pchr` (a new[]-allocated buffer) passes to this function on
// every path: the receiving worker frees it, and it is freed here when
// logging is disabled or posting fails. Callers must not touch `pchr` after
// the call.
//
// Returns 0 when the message was posted or logging is disabled, 1 if the
// message could not be posted.
int send_sys_log_message(char* pchr)
{
    LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_303);
    //send thread message
    if (!m_log_sys_status)
    {
        // Logging is off: drop the message, but still free the buffer. The
        // caller cannot tell this case from a successful post, so leaving the
        // buffer alone leaked it on every log call.
        delete[] pchr;
        return 0;
    }
    LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_304);
    unsigned thread_id = rand() % SYS_LOG_PROCESS_THREAD_NUM;
    if (!PostThreadMessage(nSysLogThreadID[thread_id], SYS_LOG_MSG, (WPARAM)pchr, 0)) //post thread msg
    {
        LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_34);
        delete[] pchr;
        return 1;
    }
    return 0;
}

// Formats a printf-style message into a freshly allocated buffer of
// LOG_BUF_SIZE bytes and hands it to send_sys_log_message(), which takes
// ownership of the buffer.
//
// `debugLevel` is not used by this function. Always returns 0.
int debug_print_syslog(UINT debugLevel, const char* format, ...)
{
    LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_302);
    char *buffer = new char[LOG_BUF_SIZE];
    memset(buffer, 0, LOG_BUF_SIZE);

    va_list args;
    va_start (args, format);
    vsnprintf_s(buffer, LOG_BUF_SIZE, LOG_BUF_SIZE-1, format, args);
    va_end (args);

    send_sys_log_message(buffer);
    return 0;
}