123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- #include "stdafx.h"
- #include "ProcessRemodule.h"
- #include <Windows.h>
- #include <iostream>
- #include <exception>
- #include "log_process_module.h"
- #include "./system_basic_info/SystemAnalysis.h"
- using namespace std;
- //#include <thread>
- //#include <mutex>
- //#include <chrono>
- #define BUF_SIZE 5001
- #define REAL_BUF BUF_SIZE-1
- ParseData* gBuf = NULL;
- unsigned int gBufFlag[BUF_SIZE] = {0};
- HANDLE gBufMutex;
- HANDLE hThread[MSG_PROCESS_THREAD_NUM];
- unsigned nThreadID[MSG_PROCESS_THREAD_NUM];
- HANDLE hSqlThread[SQL_PROCESS_THREAD_NUM];
- unsigned nSqlThreadID[SQL_PROCESS_THREAD_NUM];
- HANDLE hSysLogThread[SYS_LOG_PROCESS_THREAD_NUM];
- unsigned nSysLogThreadID[SYS_LOG_PROCESS_THREAD_NUM];
- BufferUtility* BufferUtility::instance = NULL;
- BufferUtility* BufferUtility::getInstance()
- {
- if (instance == NULL)
- {
- instance = new BufferUtility;
- }
- return instance;
- }
- int BufferUtility::init()
- {
- gBuf = (ParseData *)malloc(sizeof(ParseData) * BUF_SIZE);
- memset(gBuf, 0, sizeof(ParseData) * BUF_SIZE);
- memset(gBufFlag, 0, sizeof(gBufFlag));
- gBufMutex =CreateMutex(NULL, FALSE, NULL);
- return 0;
- }
- ParseData* BufferUtility::getItem()
- {
- int i = 0;
- WaitForSingleObject(gBufMutex,INFINITE);
-
- for(i=0; i<REAL_BUF; i++)
- {
- if(0 == gBufFlag[i])
- {
- //debug_print_syslog(0,"[getItem for:%d] for circle.",i);
- break;
- }
- }
- if(i == REAL_BUF)
- {
- //debug_print_syslog(0,"[getItem] why here");
- ReleaseMutex(gBufMutex);
- Sleep(10);
- return NULL;
- }
- //debug_print_syslog(0,"[getItem for:%d] for circle.",i);
- gBufFlag[i] = 1;
- ReleaseMutex(gBufMutex);
- return &(gBuf[i]);
- }
- int BufferUtility::releaseItem(int i)
- {
- WaitForSingleObject(gBufMutex,INFINITE);
- //debug_print_syslog(0,"[releaseItem--i]:%d",i);
- gBufFlag[i] = 0;
- memset(&(gBuf[i]), 0, sizeof(ParseData));
- ReleaseMutex(gBufMutex);
- return 0;
- }
- int BufferUtility::releaseItem(ParseData* itemPtr)
- {
- int i = 0;
- WaitForSingleObject(gBufMutex,INFINITE);
- for(i=0; i<REAL_BUF; i++)
- {
- if(itemPtr == (ParseData*)(&(gBuf[i])))
- {
- break;
- }
- }
- gBufFlag[i] = 0;
- memset(&(gBuf[i]), 0, sizeof(ParseData));
- //debug_print_syslog(0,"[releaseItem--item]:%d",i);
- ReleaseMutex(gBufMutex);
- return 0;
- }
- int message_process_thread_init()
- {
- //登录时初始化缓冲区
- BufferUtility* bufferInstance = BufferUtility::getInstance();
- bufferInstance->init();
- //初始化消息处理线程
- int i = 0;
- for(i=0; i<MSG_PROCESS_THREAD_NUM; i++)
- {
- hThread[i] = (HANDLE)_beginthreadex( NULL, 0, &message_process_entry, NULL, 0, &(nThreadID[i]) );
- //printf("thread id: %d.\n",hThread[i]);
- if(hThread[i] == 0)
- {
- printf("start thread failed,errno:%d\n",::GetLastError());
- return 1;
- }
- }
- return 0;
- }
- unsigned __stdcall message_process_entry(void *param)
- {
- MSG msg;
- //check for msg quene isExist or not
- PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
- while(true)
- {
- if(GetMessage(&msg,0,0,0)) //get msg from message queue
- {
- switch(msg.message)
- {
- case TCP_MSG:
- //add message process
- ParseData* data = (ParseData*)(msg.wParam);
- #ifndef UT_TEST
- CYAServerDlg* dlg = reinterpret_cast<CYAServerDlg*>(data->handle);
- //#ifdef TRACE_MEMORY_PARSE_PACKAGE
- try
- {
- //CYAServerDlg* handle;
- //lemon 2017/10/14
- int len = data->len;
- CONNID dwConnID = data->dwConnID;
- BYTE buf[LENGTH_MSG_4K];
- memcpy(buf, data->buf, len);
- BufferUtility::getInstance()->releaseItem(data);
- //dlg->parse_package_data(data->buf,data->len,data->dwConnID);
- dlg->parse_package_data(buf, len, dwConnID);
- }
- catch(exception e)
- {
- debug_print_syslog(0,"[parse_package_data]exception!!~");
- }
- //#endif // TRACE_MEMORY_PARSE_PACKAGE
- //BufferUtility::getInstance()->releaseItem(data);
- break;
- #else
- //printf("current thread id: %d, receive tcp msg:%d\n", GetCurrentThreadId(), data->a);
- BufferUtility::getInstance()->releaseItem(data);
- #endif
- }
- }
- }
- return 0;
- }
- int send_tcp_thread_message(ParseData* data)
- {
- //send thread message
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_91);
- unsigned thread_id = rand() % MSG_PROCESS_THREAD_NUM;
- if(!PostThreadMessage(nThreadID[thread_id], TCP_MSG, (WPARAM)data, 0))//post thread msg
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_31);
- //debug_print_syslog(0, "post tcp message failed,errno:%d",::GetLastError());
- BufferUtility::getInstance()->releaseItem(data);
- return 1;
- }
- return 0;
- }
- int sql_process_thread_init()
- {
- int i = 0;
- for(i=0; i<SQL_PROCESS_THREAD_NUM; i++)
- {
- hSqlThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sql_process_entry, NULL, 0, &(nSqlThreadID[i]) );
- if(hSqlThread[i] == 0)
- {
- printf("start thread failed,errno:%d\n",::GetLastError());
- return 1;
- }
- }
- return 0;
- }
- // thread function
- unsigned __stdcall sql_process_entry(void *param)
- {
- //printf("thread sql_process_entry start\n");
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_99);
- MSG msg;
- //check for msg quene isExist or not
- PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
- while(true)
- {
- if(GetMessage(&msg,0,0,0)) //get msg from message queue
- {
- switch(msg.message)
- {
- case SQL_MSG:
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_84);
- char * pInfo = (char *)msg.wParam;
- //printf("current thread id: %d,recv %s\n", GetCurrentThreadId(), pInfo);
- //process sql command
- #ifndef UT_TEST
- _exec_sql(pInfo);
- delete[] pInfo;
- #else
- delete[] pInfo;
- #endif
- break;
- }
- }
- };
- return 0;
- }
- int send_sql_message(char* pchr)
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_102);
- //send thread message
- unsigned thread_id = rand() % SQL_PROCESS_THREAD_NUM;
- if(!PostThreadMessage(nSqlThreadID[thread_id], SQL_MSG, (WPARAM)pchr, 0))//post thread msg
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_32);
- debug_print_syslog(0, "post sql message failed,errno:%d", ::GetLastError());
- delete[] pchr;
- return 1;
- }
- return 0;
- }
- int sys_log_process_thread_init()
- {
- int i = 0;
- log_module_init();
- for(i=0; i<SYS_LOG_PROCESS_THREAD_NUM; i++)
- {
- hSysLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sys_log_process_entry, NULL, 0, &(nSysLogThreadID[i]) );
- if(hSysLogThread[i] == 0)
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_301);
- return 1;
- }
- }
- return 0;
- }
- // thread function
- unsigned __stdcall sys_log_process_entry(void *param)
- {
- MSG msg;
- //check for msg quene isExist or not
- PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
- while(true)
- {
- if(GetMessage(&msg,0,0,0)) //get msg from message queue
- {
- switch(msg.message)
- {
- case SYS_LOG_MSG:
- char * pInfo = (char *)msg.wParam;
- #ifndef UT_TEST
- debug_print(0, pInfo);
- delete[] pInfo;
- #else
- delete[] pInfo;
- #endif
- break;
- }
- }
- };
- return 0;
- }
- int service_task_init()
- {
- BufferUtility* bufPtr = BufferUtility::getInstance();
- bufPtr->init();
- message_process_thread_init();
- sql_process_thread_init();
- sys_log_process_thread_init();
- ::Sleep(1000);
- return 0;
- }
- int send_sys_log_message(char* pchr)
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_303);
- //send thread message
- if(!m_log_sys_status)
- {
- return 0;
- }
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_304);
- unsigned thread_id = rand() % SYS_LOG_PROCESS_THREAD_NUM;
- if(!PostThreadMessage(nSysLogThreadID[thread_id], SYS_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_34);
- delete[] pchr;
- return 1;
- }
- return 0;
- }
- int debug_print_syslog(UINT debugLevel, const char* format, ...)
- {
- LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_302);
- char *buffer = new char[LOG_BUF_SIZE];
- memset(buffer, 0, LOG_BUF_SIZE);
- va_list args;
- va_start (args, format);
- vsnprintf_s(buffer, LOG_BUF_SIZE, LOG_BUF_SIZE-1, format, args);
- va_end (args);
- send_sys_log_message(buffer);
- return 0;
- }
|