ProcessRemodule.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. #include "stdafx.h"
  2. #include "ProcessRemodule.h"
  3. #include <Windows.h>
  4. #include <iostream>
  5. #include <exception>
  6. #include "log_process_module.h"
  7. #include "./system_basic_info/SystemAnalysis.h"
  8. using namespace std;
  9. //#include <thread>
  10. //#include <mutex>
  11. //#include <chrono>
  12. #define BUF_SIZE 5001
  13. #define REAL_BUF BUF_SIZE-1
  14. ParseData* gBuf = NULL;
  15. unsigned int gBufFlag[BUF_SIZE] = {0};
  16. HANDLE gBufMutex;
  17. HANDLE hThread[MSG_PROCESS_THREAD_NUM];
  18. unsigned nThreadID[MSG_PROCESS_THREAD_NUM];
  19. HANDLE hSqlThread[SQL_PROCESS_THREAD_NUM];
  20. unsigned nSqlThreadID[SQL_PROCESS_THREAD_NUM];
  21. HANDLE hSysLogThread[SYS_LOG_PROCESS_THREAD_NUM];
  22. unsigned nSysLogThreadID[SYS_LOG_PROCESS_THREAD_NUM];
  23. BufferUtility* BufferUtility::instance = NULL;
  24. BufferUtility* BufferUtility::getInstance()
  25. {
  26. if (instance == NULL)
  27. {
  28. instance = new BufferUtility;
  29. }
  30. return instance;
  31. }
  32. int BufferUtility::init()
  33. {
  34. gBuf = (ParseData *)malloc(sizeof(ParseData) * BUF_SIZE);
  35. memset(gBuf, 0, sizeof(ParseData) * BUF_SIZE);
  36. memset(gBufFlag, 0, sizeof(gBufFlag));
  37. gBufMutex =CreateMutex(NULL, FALSE, NULL);
  38. return 0;
  39. }
  40. ParseData* BufferUtility::getItem()
  41. {
  42. int i = 0;
  43. WaitForSingleObject(gBufMutex,INFINITE);
  44. for(i=0; i<REAL_BUF; i++)
  45. {
  46. if(0 == gBufFlag[i])
  47. {
  48. //debug_print_syslog(0,"[getItem for:%d] for circle.",i);
  49. break;
  50. }
  51. }
  52. if(i == REAL_BUF)
  53. {
  54. //debug_print_syslog(0,"[getItem] why here");
  55. ReleaseMutex(gBufMutex);
  56. Sleep(10);
  57. return NULL;
  58. }
  59. //debug_print_syslog(0,"[getItem for:%d] for circle.",i);
  60. gBufFlag[i] = 1;
  61. ReleaseMutex(gBufMutex);
  62. return &(gBuf[i]);
  63. }
  64. int BufferUtility::releaseItem(int i)
  65. {
  66. WaitForSingleObject(gBufMutex,INFINITE);
  67. //debug_print_syslog(0,"[releaseItem--i]:%d",i);
  68. gBufFlag[i] = 0;
  69. memset(&(gBuf[i]), 0, sizeof(ParseData));
  70. ReleaseMutex(gBufMutex);
  71. return 0;
  72. }
  73. int BufferUtility::releaseItem(ParseData* itemPtr)
  74. {
  75. int i = 0;
  76. WaitForSingleObject(gBufMutex,INFINITE);
  77. for(i=0; i<REAL_BUF; i++)
  78. {
  79. if(itemPtr == (ParseData*)(&(gBuf[i])))
  80. {
  81. break;
  82. }
  83. }
  84. gBufFlag[i] = 0;
  85. memset(&(gBuf[i]), 0, sizeof(ParseData));
  86. //debug_print_syslog(0,"[releaseItem--item]:%d",i);
  87. ReleaseMutex(gBufMutex);
  88. return 0;
  89. }
  90. int message_process_thread_init()
  91. {
  92. //登录时初始化缓冲区
  93. BufferUtility* bufferInstance = BufferUtility::getInstance();
  94. bufferInstance->init();
  95. //初始化消息处理线程
  96. int i = 0;
  97. for(i=0; i<MSG_PROCESS_THREAD_NUM; i++)
  98. {
  99. hThread[i] = (HANDLE)_beginthreadex( NULL, 0, &message_process_entry, NULL, 0, &(nThreadID[i]) );
  100. //printf("thread id: %d.\n",hThread[i]);
  101. if(hThread[i] == 0)
  102. {
  103. printf("start thread failed,errno:%d\n",::GetLastError());
  104. return 1;
  105. }
  106. }
  107. return 0;
  108. }
  109. unsigned __stdcall message_process_entry(void *param)
  110. {
  111. MSG msg;
  112. //check for msg quene isExist or not
  113. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  114. while(true)
  115. {
  116. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  117. {
  118. switch(msg.message)
  119. {
  120. case TCP_MSG:
  121. //add message process
  122. ParseData* data = (ParseData*)(msg.wParam);
  123. #ifndef UT_TEST
  124. CYAServerDlg* dlg = reinterpret_cast<CYAServerDlg*>(data->handle);
  125. //#ifdef TRACE_MEMORY_PARSE_PACKAGE
  126. try
  127. {
  128. //CYAServerDlg* handle;
  129. //lemon 2017/10/14
  130. int len = data->len;
  131. CONNID dwConnID = data->dwConnID;
  132. BYTE buf[LENGTH_MSG_4K];
  133. memcpy(buf, data->buf, len);
  134. BufferUtility::getInstance()->releaseItem(data);
  135. //dlg->parse_package_data(data->buf,data->len,data->dwConnID);
  136. dlg->parse_package_data(buf, len, dwConnID);
  137. }
  138. catch(exception e)
  139. {
  140. debug_print_syslog(0,"[parse_package_data]exception!!~");
  141. }
  142. //#endif // TRACE_MEMORY_PARSE_PACKAGE
  143. //BufferUtility::getInstance()->releaseItem(data);
  144. break;
  145. #else
  146. //printf("current thread id: %d, receive tcp msg:%d\n", GetCurrentThreadId(), data->a);
  147. BufferUtility::getInstance()->releaseItem(data);
  148. #endif
  149. }
  150. }
  151. }
  152. return 0;
  153. }
  154. int send_tcp_thread_message(ParseData* data)
  155. {
  156. //send thread message
  157. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_91);
  158. unsigned thread_id = rand() % MSG_PROCESS_THREAD_NUM;
  159. if(!PostThreadMessage(nThreadID[thread_id], TCP_MSG, (WPARAM)data, 0))//post thread msg
  160. {
  161. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_31);
  162. //debug_print_syslog(0, "post tcp message failed,errno:%d",::GetLastError());
  163. BufferUtility::getInstance()->releaseItem(data);
  164. return 1;
  165. }
  166. return 0;
  167. }
  168. int sql_process_thread_init()
  169. {
  170. int i = 0;
  171. for(i=0; i<SQL_PROCESS_THREAD_NUM; i++)
  172. {
  173. hSqlThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sql_process_entry, NULL, 0, &(nSqlThreadID[i]) );
  174. if(hSqlThread[i] == 0)
  175. {
  176. printf("start thread failed,errno:%d\n",::GetLastError());
  177. return 1;
  178. }
  179. }
  180. return 0;
  181. }
  182. // thread function
  183. unsigned __stdcall sql_process_entry(void *param)
  184. {
  185. //printf("thread sql_process_entry start\n");
  186. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_99);
  187. MSG msg;
  188. //check for msg quene isExist or not
  189. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  190. while(true)
  191. {
  192. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  193. {
  194. switch(msg.message)
  195. {
  196. case SQL_MSG:
  197. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_84);
  198. char * pInfo = (char *)msg.wParam;
  199. //printf("current thread id: %d,recv %s\n", GetCurrentThreadId(), pInfo);
  200. //process sql command
  201. #ifndef UT_TEST
  202. _exec_sql(pInfo);
  203. delete[] pInfo;
  204. #else
  205. delete[] pInfo;
  206. #endif
  207. break;
  208. }
  209. }
  210. };
  211. return 0;
  212. }
  213. int send_sql_message(char* pchr)
  214. {
  215. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_102);
  216. //send thread message
  217. unsigned thread_id = rand() % SQL_PROCESS_THREAD_NUM;
  218. if(!PostThreadMessage(nSqlThreadID[thread_id], SQL_MSG, (WPARAM)pchr, 0))//post thread msg
  219. {
  220. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_32);
  221. debug_print_syslog(0, "post sql message failed,errno:%d", ::GetLastError());
  222. delete[] pchr;
  223. return 1;
  224. }
  225. return 0;
  226. }
  227. int sys_log_process_thread_init()
  228. {
  229. int i = 0;
  230. log_module_init();
  231. for(i=0; i<SYS_LOG_PROCESS_THREAD_NUM; i++)
  232. {
  233. hSysLogThread[i] = (HANDLE)_beginthreadex( NULL, 0, &sys_log_process_entry, NULL, 0, &(nSysLogThreadID[i]) );
  234. if(hSysLogThread[i] == 0)
  235. {
  236. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_301);
  237. return 1;
  238. }
  239. }
  240. return 0;
  241. }
  242. // thread function
  243. unsigned __stdcall sys_log_process_entry(void *param)
  244. {
  245. MSG msg;
  246. //check for msg quene isExist or not
  247. PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  248. while(true)
  249. {
  250. if(GetMessage(&msg,0,0,0)) //get msg from message queue
  251. {
  252. switch(msg.message)
  253. {
  254. case SYS_LOG_MSG:
  255. char * pInfo = (char *)msg.wParam;
  256. #ifndef UT_TEST
  257. debug_print(0, pInfo);
  258. delete[] pInfo;
  259. #else
  260. delete[] pInfo;
  261. #endif
  262. break;
  263. }
  264. }
  265. };
  266. return 0;
  267. }
  268. int service_task_init()
  269. {
  270. BufferUtility* bufPtr = BufferUtility::getInstance();
  271. bufPtr->init();
  272. message_process_thread_init();
  273. sql_process_thread_init();
  274. sys_log_process_thread_init();
  275. ::Sleep(1000);
  276. return 0;
  277. }
  278. int send_sys_log_message(char* pchr)
  279. {
  280. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_303);
  281. //send thread message
  282. if(!m_log_sys_status)
  283. {
  284. return 0;
  285. }
  286. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_304);
  287. unsigned thread_id = rand() % SYS_LOG_PROCESS_THREAD_NUM;
  288. if(!PostThreadMessage(nSysLogThreadID[thread_id], SYS_LOG_MSG, (WPARAM)pchr, 0))//post thread msg
  289. {
  290. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_34);
  291. delete[] pchr;
  292. return 1;
  293. }
  294. return 0;
  295. }
  296. int debug_print_syslog(UINT debugLevel, const char* format, ...)
  297. {
  298. LOCATION_SYSTEM_BRANCH(LOCATION_SYSTEM_BRANCH_302);
  299. char *buffer = new char[LOG_BUF_SIZE];
  300. memset(buffer, 0, LOG_BUF_SIZE);
  301. va_list args;
  302. va_start (args, format);
  303. vsnprintf_s(buffer, LOG_BUF_SIZE, LOG_BUF_SIZE-1, format, args);
  304. va_end (args);
  305. send_sys_log_message(buffer);
  306. return 0;
  307. }