ProcessRemodule.cpp 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. #include "stdafx.h"
  2. #include "ProcessRemodule.h"
  3. #include <Windows.h>
  4. #include <iostream>
  5. #include <exception>
  6. #include "log_process_module.h"
  7. #include "./system_basic_info/SystemAnalysis.h"
  8. using namespace std;
  9. //#include <thread>
  10. //#include <mutex>
  11. //#include <chrono>
  // Number of ParseData slots allocated for the shared pool.
  #define BUF_SIZE 5001
  // Number of slots that are actually handed out; the final slot is never issued.
  // Parenthesised so the macro keeps its value inside larger expressions
  // (e.g. REAL_BUF * 2 or x - REAL_BUF).
  #define REAL_BUF (BUF_SIZE - 1)
  14. ParseData* gBuf = NULL;
  15. unsigned int gBufFlag[BUF_SIZE] = {0};
  16. HANDLE gBufMutex;
  17. HANDLE hThread[MSG_PROCESS_THREAD_NUM];
  18. unsigned nThreadID[MSG_PROCESS_THREAD_NUM];
  19. HANDLE hSqlThread[SQL_PROCESS_THREAD_NUM];
  20. unsigned nSqlThreadID[SQL_PROCESS_THREAD_NUM];
  21. HANDLE hErrLogThread[ERR_LOG_PROCESS_THREAD_NUM];
  22. unsigned nErrLogThreadID[ERR_LOG_PROCESS_THREAD_NUM];
  23. HANDLE hSysLogThread[SYS_LOG_PROCESS_THREAD_NUM];
  24. unsigned nSysLogThreadID[SYS_LOG_PROCESS_THREAD_NUM];
  25. BufferUtility* BufferUtility::instance = NULL;
  26. BufferUtility* BufferUtility::getInstance()
  27. {
  28. if (instance == NULL)
  29. {
  30. if (instance == NULL)
  31. {
  32. instance = new BufferUtility;
  33. }
  34. }
  35. return instance;
  36. }
  37. int BufferUtility::init()
  38. {
  39. gBuf = (ParseData *)malloc(sizeof(ParseData) * BUF_SIZE);
  40. memset(gBuf, 0, sizeof(ParseData) * BUF_SIZE);
  41. memset(gBufFlag, 0, sizeof(gBufFlag));
  42. gBufMutex =CreateMutex(NULL, FALSE, NULL);
  43. return 0;
  44. }
  45. ParseData* BufferUtility::getItem()
  46. {
  47. int i = 0;
  48. WaitForSingleObject(gBufMutex,INFINITE);
  49. for(i=0; i<REAL_BUF; i++)
  50. {
  51. if(0 == gBufFlag[i])
  52. {
  53. break;
  54. }
  55. }
  56. if(i == REAL_BUF)
  57. {
  58. return NULL;
  59. }
  60. gBufFlag[i] = 1;
  61. ReleaseMutex(gBufMutex);
  62. return &(gBuf[i]);
  63. }
  64. int BufferUtility::releaseItem(int i)
  65. {
  66. WaitForSingleObject(gBufMutex,INFINITE);
  67. gBufFlag[i] = 0;
  68. memset(&(gBuf[i]), 0, sizeof(ParseData));
  69. ReleaseMutex(gBufMutex);
  70. return 0;
  71. }
  72. int BufferUtility::releaseItem(ParseData* itemPtr)
  73. {
  74. int i = 0;
  75. WaitForSingleObject(gBufMutex,INFINITE);
  76. for(i=0; i<REAL_BUF; i++)
  77. {
  78. if(itemPtr == (ParseData*)(&(gBuf[i])))
  79. {
  80. break;
  81. }
  82. }
  83. gBufFlag[i] = 0;
  84. memset(&(gBuf[i]), 0, sizeof(ParseData));
  85. ReleaseMutex(gBufMutex);
  86. return 0;
  87. }
  88. int message_process_thread_init()
  89. {
  90. //登录时初始化缓冲区
  91. BufferUtility* bufferInstance = BufferUtility::getInstance();
  92. bufferInstance->init();
  93. //初始化消息处理线程
  94. int i = 0;
  95. for(i=0; i<MSG_PROCESS_THREAD_NUM; i++)
  96. {
  97. hThread[i] = (HANDLE)_beginthreadex( NULL, 0, &message_process_entry, NULL, 0, &(nThreadID[i]) );
  98. if(hThread[i] == 0)
  99. {
  100. printf("start thread failed,errno:%d\n",::GetLastError());
  101. return 1;
  102. }
  103. }
  104. return 0;
  105. }
  106. unsigned __stdcall message_process_entry(void *param)
  107. {
  108. MSG msg;
  109. //check for msg quene isExist or not
  110. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  111. while(true)
  112. {
  113. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  114. {
  115. switch(msg.message)
  116. {
  117. case TCP_MSG:
  118. //add message process
  119. ParseData* data = (ParseData*)(msg.wParam);
  120. #ifndef UT_TEST
  121. CYAServerDlg* dlg = reinterpret_cast<CYAServerDlg*>(data->handle);
  122. //#ifdef TRACE_MEMORY_PARSE_PACKAGE
  123. try
  124. {
  125. dlg->parse_package_data(data->buf, data->len, data->dwConnID);
  126. }
  127. catch(exception e)
  128. {
  129. }
  130. //#endif // TRACE_MEMORY_PARSE_PACKAGE
  131. BufferUtility::getInstance()->releaseItem(data);
  132. break;
  133. #else
  134. //printf("current thread id: %d, receive tcp msg:%d\n", GetCurrentThreadId(), data->a);
  135. BufferUtility::getInstance()->releaseItem(data);
  136. #endif
  137. }
  138. }
  139. }
  140. return 0;
  141. }
  142. int send_tcp_thread_message(ParseData* data)
  143. {
  144. //send thread message
  145. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_91);
  146. unsigned thread_id = rand() % MSG_PROCESS_THREAD_NUM;
  147. if(!PostThreadMessage(nThreadID[thread_id], TCP_MSG, (WPARAM)data, 0))//post thread msg
  148. {
  149. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_31);
  150. debug_print_syslog(0, "post tcp message failed,errno:%d\n",::GetLastError());
  151. BufferUtility::getInstance()->releaseItem(data);
  152. return 1;
  153. }
  154. return 0;
  155. }
  156. int sql_process_thread_init()
  157. {
  158. int i = 0;
  159. for(i=0; i<SQL_PROCESS_THREAD_NUM; i++)
  160. {
  161. hSqlThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sql_process_entry, NULL, 0, &(nSqlThreadID[i]) );
  162. if(hSqlThread[i] == 0)
  163. {
  164. printf("start thread failed,errno:%d\n",::GetLastError());
  165. return 1;
  166. }
  167. }
  168. return 0;
  169. }
  170. // thread function
  171. unsigned __stdcall sql_process_entry(void *param)
  172. {
  173. //printf("thread sql_process_entry start\n");
  174. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_99);
  175. MSG msg;
  176. //check for msg quene isExist or not
  177. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  178. while(true)
  179. {
  180. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  181. {
  182. switch(msg.message)
  183. {
  184. case SQL_MSG:
  185. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_84);
  186. char * pInfo = (char *)msg.wParam;
  187. //printf("current thread id: %d,recv %s\n", GetCurrentThreadId(), pInfo);
  188. //process sql command
  189. #ifndef UT_TEST
  190. _exec_sql(pInfo);
  191. delete[] pInfo;
  192. #else
  193. delete[] pInfo;
  194. #endif
  195. break;
  196. }
  197. }
  198. };
  199. return 0;
  200. }
  201. int send_sql_message(char* pchr)
  202. {
  203. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_102);
  204. //send thread message
  205. unsigned thread_id = rand() % SQL_PROCESS_THREAD_NUM;
  206. if(!PostThreadMessage(nSqlThreadID[thread_id], SQL_MSG, (WPARAM)pchr, 0))//post thread msg
  207. {
  208. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_32);
  209. debug_print_syslog(0, "post sql message failed,errno:%d\n", ::GetLastError());
  210. delete[] pchr;
  211. return 1;
  212. }
  213. return 0;
  214. }
  215. int err_log_process_thread_init()
  216. {
  217. int i = 0;
  218. for(i=0; i<ERR_LOG_PROCESS_THREAD_NUM; i++)
  219. {
  220. hErrLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &err_log_process_entry, NULL, 0, &(nErrLogThreadID[i]) );
  221. if(hErrLogThread[i] == 0)
  222. {
  223. printf("start thread failed,errno:%d\n",::GetLastError());
  224. return 1;
  225. }
  226. }
  227. return 0;
  228. }
  229. // thread function
  230. unsigned __stdcall err_log_process_entry(void *param)
  231. {
  232. //printf("thread err log_process_entry start\n");
  233. MSG msg;
  234. //check for msg quene isExist or not
  235. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  236. while(true)
  237. {
  238. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  239. {
  240. switch(msg.message)
  241. {
  242. case ERR_LOG_MSG:
  243. char * pInfo = (char *)msg.wParam;
  244. #ifndef UT_TEST
  245. _write_error_log(pInfo);
  246. #else
  247. delete[] pInfo;
  248. #endif
  249. break;
  250. }
  251. }
  252. };
  253. return 0;
  254. }
  255. int send_err_log_message(char* pchr)
  256. {
  257. //send thread message
  258. unsigned thread_id = rand() % ERR_LOG_PROCESS_THREAD_NUM;
  259. if(!PostThreadMessage(nErrLogThreadID[thread_id], ERR_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
  260. {
  261. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_33);
  262. debug_print_syslog(0, "post err log message failed,errno:%d\n",::GetLastError());
  263. delete[] pchr;
  264. return 1;
  265. }
  266. return 0;
  267. }
  268. int sys_log_process_thread_init()
  269. {
  270. int i = 0;
  271. log_module_init();
  272. for(i=0; i<SYS_LOG_PROCESS_THREAD_NUM; i++)
  273. {
  274. hSysLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sys_log_process_entry, NULL, 0, &(nSysLogThreadID[i]) );
  275. if(hSysLogThread[i] == 0)
  276. {
  277. printf("start thread failed,errno:%d\n",::GetLastError());
  278. return 1;
  279. }
  280. }
  281. return 0;
  282. }
  283. // thread function
  284. unsigned __stdcall sys_log_process_entry(void *param)
  285. {
  286. //printf("thread sys log_process_entry start\n");
  287. MSG msg;
  288. //check for msg quene isExist or not
  289. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  290. while(true)
  291. {
  292. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  293. {
  294. switch(msg.message)
  295. {
  296. case SYS_LOG_MSG:
  297. char * pInfo = (char *)msg.wParam;
  298. #ifndef UT_TEST
  299. debug_print(0, pInfo);
  300. delete[] pInfo;
  301. #else
  302. delete[] pInfo;
  303. #endif
  304. break;
  305. }
  306. }
  307. };
  308. return 0;
  309. }
  310. int service_task_init()
  311. {
  312. BufferUtility* bufPtr = BufferUtility::getInstance();
  313. bufPtr->init();
  314. message_process_thread_init();
  315. sql_process_thread_init();
  316. err_log_process_thread_init();
  317. sys_log_process_thread_init();
  318. ::Sleep(1000);
  319. return 0;
  320. }
  321. int send_sys_log_message(char* pchr)
  322. {
  323. //send thread message
  324. if(!m_log_sys_status)
  325. {
  326. return 0;
  327. }
  328. unsigned thread_id = rand() % SYS_LOG_PROCESS_THREAD_NUM;
  329. if(!PostThreadMessage(nSysLogThreadID[thread_id], SYS_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
  330. {
  331. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_34);
  332. delete[] pchr;
  333. return 1;
  334. }
  335. return 0;
  336. }
  337. int debug_print_syslog(UINT debugLevel, const char* format, ...)
  338. {
  339. char *buffer = new char[LOG_BUF_SIZE];
  340. memset(buffer, 0, LOG_BUF_SIZE);
  341. va_list args;
  342. va_start (args, format);
  343. vsnprintf_s(buffer, LOG_BUF_SIZE, LOG_BUF_SIZE-1, format, args);
  344. va_end (args);
  345. send_sys_log_message(buffer);
  346. return 0;
  347. }