ProcessRemodule.cpp 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. #include "stdafx.h"
  2. #include "ProcessRemodule.h"
  3. #include <Windows.h>
  4. #include <iostream>
  5. #include <exception>
  6. #include "log_process_module.h"
  7. #include "./system_basic_info/SystemAnalysis.h"
  8. using namespace std;
  9. //#include <thread>
  10. //#include <mutex>
  11. //#include <chrono>
  // Total number of ParseData slots (and usage flags) allocated for the pool.
  #define BUF_SIZE 5001
  // Number of slots that are actually searched/handed out; the final slot is
  // kept as a spare. Parenthesized so the macro expands correctly inside larger
  // expressions (e.g. `2 * REAL_BUF`).
  #define REAL_BUF (BUF_SIZE - 1)
  14. ParseData* gBuf = NULL;
  15. unsigned int gBufFlag[BUF_SIZE] = {0};
  16. HANDLE gBufMutex;
  17. HANDLE hThread[MSG_PROCESS_THREAD_NUM];
  18. unsigned nThreadID[MSG_PROCESS_THREAD_NUM];
  19. HANDLE hSqlThread[SQL_PROCESS_THREAD_NUM];
  20. unsigned nSqlThreadID[SQL_PROCESS_THREAD_NUM];
  21. HANDLE hErrLogThread[ERR_LOG_PROCESS_THREAD_NUM];
  22. unsigned nErrLogThreadID[ERR_LOG_PROCESS_THREAD_NUM];
  23. HANDLE hSysLogThread[SYS_LOG_PROCESS_THREAD_NUM];
  24. unsigned nSysLogThreadID[SYS_LOG_PROCESS_THREAD_NUM];
  25. BufferUtility* BufferUtility::instance = NULL;
  26. BufferUtility* BufferUtility::getInstance()
  27. {
  28. if (instance == NULL)
  29. {
  30. if (instance == NULL)
  31. {
  32. instance = new BufferUtility;
  33. }
  34. }
  35. return instance;
  36. }
  37. int BufferUtility::init()
  38. {
  39. gBuf = (ParseData *)malloc(sizeof(ParseData) * BUF_SIZE);
  40. memset(gBuf, 0, sizeof(ParseData) * BUF_SIZE);
  41. memset(gBufFlag, 0, sizeof(gBufFlag));
  42. gBufMutex =CreateMutex(NULL, FALSE, NULL);
  43. return 0;
  44. }
  45. ParseData* BufferUtility::getItem()
  46. {
  47. int i = 0;
  48. WaitForSingleObject(gBufMutex,INFINITE);
  49. for(i=0; i<REAL_BUF; i++)
  50. {
  51. if(0 == gBufFlag[i])
  52. {
  53. break;
  54. }
  55. }
  56. if(i == REAL_BUF)
  57. {
  58. return NULL;
  59. }
  60. gBufFlag[i] = 1;
  61. ReleaseMutex(gBufMutex);
  62. return &(gBuf[i]);
  63. }
  64. int BufferUtility::releaseItem(int i)
  65. {
  66. WaitForSingleObject(gBufMutex,INFINITE);
  67. gBufFlag[i] = 0;
  68. memset(&(gBuf[i]), 0, sizeof(ParseData));
  69. ReleaseMutex(gBufMutex);
  70. return 0;
  71. }
  72. int BufferUtility::releaseItem(ParseData* itemPtr)
  73. {
  74. int i = 0;
  75. WaitForSingleObject(gBufMutex,INFINITE);
  76. for(i=0; i<REAL_BUF; i++)
  77. {
  78. if(itemPtr == (ParseData*)(&(gBuf[i])))
  79. {
  80. break;
  81. }
  82. }
  83. gBufFlag[i] = 0;
  84. memset(&(gBuf[i]), 0, sizeof(ParseData));
  85. ReleaseMutex(gBufMutex);
  86. return 0;
  87. }
  88. int message_process_thread_init()
  89. {
  90. //登录时初始化缓冲区
  91. BufferUtility* bufferInstance = BufferUtility::getInstance();
  92. bufferInstance->init();
  93. //初始化消息处理线程
  94. int i = 0;
  95. for(i=0; i<MSG_PROCESS_THREAD_NUM; i++)
  96. {
  97. hThread[i] = (HANDLE)_beginthreadex( NULL, 0, &message_process_entry, NULL, 0, &(nThreadID[i]) );
  98. if(hThread[i] == 0)
  99. {
  100. printf("start thread failed,errno:%d\n",::GetLastError());
  101. return 1;
  102. }
  103. }
  104. return 0;
  105. }
  106. unsigned __stdcall message_process_entry(void *param)
  107. {
  108. MSG msg;
  109. //check for msg quene isExist or not
  110. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  111. while(true)
  112. {
  113. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  114. {
  115. switch(msg.message)
  116. {
  117. case TCP_MSG:
  118. //add message process
  119. ParseData* data = (ParseData*)(msg.wParam);
  120. #ifndef UT_TEST
  121. CYAServerDlg* dlg = reinterpret_cast<CYAServerDlg*>(data->handle);
  122. //#ifdef TRACE_MEMORY_PARSE_PACKAGE
  123. try
  124. {
  125. dlg->parse_package_data(data->buf, data->len, data->dwConnID);
  126. }
  127. catch(exception e)
  128. {
  129. ;
  130. }
  131. //#endif // TRACE_MEMORY_PARSE_PACKAGE
  132. BufferUtility::getInstance()->releaseItem(data);
  133. break;
  134. #else
  135. //printf("current thread id: %d, receive tcp msg:%d\n", GetCurrentThreadId(), data->a);
  136. BufferUtility::getInstance()->releaseItem(data);
  137. #endif
  138. }
  139. }
  140. }
  141. return 0;
  142. }
  143. int send_tcp_thread_message(ParseData* data)
  144. {
  145. //send thread message
  146. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_91);
  147. unsigned thread_id = rand() % MSG_PROCESS_THREAD_NUM;
  148. if(!PostThreadMessage(nThreadID[thread_id], TCP_MSG, (WPARAM)data, 0))//post thread msg
  149. {
  150. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_31);
  151. debug_print_syslog(0, "post tcp message failed,errno:%d\n",::GetLastError());
  152. BufferUtility::getInstance()->releaseItem(data);
  153. return 1;
  154. }
  155. return 0;
  156. }
  157. int sql_process_thread_init()
  158. {
  159. int i = 0;
  160. for(i=0; i<SQL_PROCESS_THREAD_NUM; i++)
  161. {
  162. hSqlThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sql_process_entry, NULL, 0, &(nSqlThreadID[i]) );
  163. if(hSqlThread[i] == 0)
  164. {
  165. printf("start thread failed,errno:%d\n",::GetLastError());
  166. return 1;
  167. }
  168. }
  169. return 0;
  170. }
  171. // thread function
  172. unsigned __stdcall sql_process_entry(void *param)
  173. {
  174. //printf("thread sql_process_entry start\n");
  175. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_99);
  176. MSG msg;
  177. //check for msg quene isExist or not
  178. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  179. while(true)
  180. {
  181. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  182. {
  183. switch(msg.message)
  184. {
  185. case SQL_MSG:
  186. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_84);
  187. char * pInfo = (char *)msg.wParam;
  188. //printf("current thread id: %d,recv %s\n", GetCurrentThreadId(), pInfo);
  189. //process sql command
  190. #ifndef UT_TEST
  191. _exec_sql(pInfo);
  192. delete[] pInfo;
  193. #else
  194. delete[] pInfo;
  195. #endif
  196. break;
  197. }
  198. }
  199. };
  200. return 0;
  201. }
  202. int send_sql_message(char* pchr)
  203. {
  204. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_102);
  205. //send thread message
  206. unsigned thread_id = rand() % SQL_PROCESS_THREAD_NUM;
  207. if(!PostThreadMessage(nSqlThreadID[thread_id], SQL_MSG, (WPARAM)pchr, 0))//post thread msg
  208. {
  209. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_32);
  210. debug_print_syslog(0, "post sql message failed,errno:%d\n", ::GetLastError());
  211. delete[] pchr;
  212. return 1;
  213. }
  214. return 0;
  215. }
  216. int err_log_process_thread_init()
  217. {
  218. int i = 0;
  219. for(i=0; i<ERR_LOG_PROCESS_THREAD_NUM; i++)
  220. {
  221. hErrLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &err_log_process_entry, NULL, 0, &(nErrLogThreadID[i]) );
  222. if(hErrLogThread[i] == 0)
  223. {
  224. printf("start thread failed,errno:%d\n",::GetLastError());
  225. return 1;
  226. }
  227. }
  228. return 0;
  229. }
  230. // thread function
  231. unsigned __stdcall err_log_process_entry(void *param)
  232. {
  233. //printf("thread err log_process_entry start\n");
  234. MSG msg;
  235. //check for msg quene isExist or not
  236. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  237. while(true)
  238. {
  239. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  240. {
  241. switch(msg.message)
  242. {
  243. case ERR_LOG_MSG:
  244. char * pInfo = (char *)msg.wParam;
  245. #ifndef UT_TEST
  246. _write_error_log(pInfo);
  247. #else
  248. delete[] pInfo;
  249. #endif
  250. break;
  251. }
  252. }
  253. };
  254. return 0;
  255. }
  256. int send_err_log_message(char* pchr)
  257. {
  258. //send thread message
  259. unsigned thread_id = rand() % ERR_LOG_PROCESS_THREAD_NUM;
  260. if(!PostThreadMessage(nErrLogThreadID[thread_id], ERR_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
  261. {
  262. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_33);
  263. debug_print_syslog(0, "post err log message failed,errno:%d\n",::GetLastError());
  264. delete[] pchr;
  265. return 1;
  266. }
  267. return 0;
  268. }
  269. int sys_log_process_thread_init()
  270. {
  271. int i = 0;
  272. log_module_init();
  273. for(i=0; i<SYS_LOG_PROCESS_THREAD_NUM; i++)
  274. {
  275. hSysLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sys_log_process_entry, NULL, 0, &(nSysLogThreadID[i]) );
  276. if(hSysLogThread[i] == 0)
  277. {
  278. printf("start thread failed,errno:%d\n",::GetLastError());
  279. return 1;
  280. }
  281. }
  282. return 0;
  283. }
  284. // thread function
  285. unsigned __stdcall sys_log_process_entry(void *param)
  286. {
  287. //printf("thread sys log_process_entry start\n");
  288. MSG msg;
  289. //check for msg quene isExist or not
  290. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  291. while(true)
  292. {
  293. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  294. {
  295. switch(msg.message)
  296. {
  297. case SYS_LOG_MSG:
  298. char * pInfo = (char *)msg.wParam;
  299. #ifndef UT_TEST
  300. debug_print(0, pInfo);
  301. delete[] pInfo;
  302. #else
  303. delete[] pInfo;
  304. #endif
  305. break;
  306. }
  307. }
  308. };
  309. return 0;
  310. }
  311. int service_task_init()
  312. {
  313. BufferUtility* bufPtr = BufferUtility::getInstance();
  314. bufPtr->init();
  315. message_process_thread_init();
  316. sql_process_thread_init();
  317. err_log_process_thread_init();
  318. sys_log_process_thread_init();
  319. ::Sleep(1000);
  320. return 0;
  321. }
  322. int send_sys_log_message(char* pchr)
  323. {
  324. //send thread message
  325. if(!m_log_sys_status)
  326. {
  327. return 0;
  328. }
  329. unsigned thread_id = rand() % SYS_LOG_PROCESS_THREAD_NUM;
  330. if(!PostThreadMessage(nSysLogThreadID[thread_id], SYS_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
  331. {
  332. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_34);
  333. delete[] pchr;
  334. return 1;
  335. }
  336. return 0;
  337. }
  338. int debug_print_syslog(UINT debugLevel, const char* format, ...)
  339. {
  340. char *buffer = new char[LOG_BUF_SIZE];
  341. memset(buffer, 0, LOG_BUF_SIZE);
  342. va_list args;
  343. va_start (args, format);
  344. vsnprintf_s(buffer, LOG_BUF_SIZE, LOG_BUF_SIZE-1, format, args);
  345. va_end (args);
  346. send_sys_log_message(buffer);
  347. return 0;
  348. }