service_position.cpp 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #include "service_position.h"
  2. #include <boost/bind.hpp>
  3. #include <boost/format.hpp>
  4. #include <time.h>
  5. void service_position::init_cache(info_send_ptr& p_info)
  6. {
  7. // 最大缓存频率个数数据,对于小于1Hz的卡,固定为1个
  8. p_info->count_max = static_cast<std::size_t>(p_info->feq * 0.3);
  9. if(0 == p_info->count_max){
  10. p_info->count_max = 1;
  11. }
  12. // 因为后面是延迟一秒发送,所以缓存的个数时间总和要大于延迟的时间,这里在计算的count_max增加20%
  13. std::size_t d = p_info->count_max * 0.2;
  14. if(3 > d){
  15. d = 3;
  16. }
  17. }
  18. void service_position::notify(const std::string& msg, const std::string& id, const double& feq)
  19. {
  20. if(m_stop){
  21. return;
  22. }
  23. if(0.001 > feq || 1000.0 < feq){
  24. // 频率非法
  25. return;
  26. }
  27. service_ptr _service = m_service;
  28. if(_service){
  29. boost::shared_ptr<std::string> p_buffer(new std::string(msg));
  30. _service->dispatch(boost::bind(&service_position::handle_buffer, shared_from_this(), p_buffer, id, feq, _service));
  31. }
  32. }
  33. void service_position::handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service)
  34. {
  35. if(m_stop){
  36. return;
  37. }
  38. info_send_ptr _info;
  39. auto iter = m_deques.find(id);
  40. if(m_deques.end() == iter){
  41. _info = std::make_shared<info_send>();
  42. _info->id = id;
  43. _info->feq = feq;
  44. _info->p_timer = std::make_shared<boost::asio::deadline_timer>(*p_service);
  45. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  46. // 为了避免回环引用问题,这里没有绑定std::shared_ptr<info_send>,而是使用std::weak_ptr,不过直接传std::shared_ptr也没事,service_position对象停止时会主动断开联系
  47. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
  48. init_cache(_info);
  49. m_deques.insert(std::make_pair(id, _info));
  50. }else{
  51. _info = iter->second;
  52. if(0.001 < fabs(_info->feq - feq)){
  53. _info->feq = feq;
  54. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  55. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
  56. init_cache(_info);
  57. }
  58. }
  59. info_data_ptr _data(new info_data());
  60. _data->p_msg = p_buffer;
  61. _data->time_receive = std::chrono::system_clock::now();
  62. _info->data.push_back(_data);
  63. }
  64. std::string service_position::to_str(const std::chrono::system_clock::time_point& t)
  65. {
  66. uint64_t mill = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count() - std::chrono::duration_cast<std::chrono::seconds>(t.time_since_epoch()).count()*1000;
  67. time_t tt = std::chrono::system_clock::to_time_t(t);
  68. struct tm local_time;
  69. localtime_r(&tt, &local_time);
  70. char _time[128];
  71. snprintf(_time, 128, "%d-%02d-%02d %02d:%02d:%02d.%03d", local_time.tm_year + 1900, local_time.tm_mon + 1, local_time.tm_mday, local_time.tm_hour,local_time.tm_min, local_time.tm_sec, (int)mill);
  72. return _time;
  73. }
  74. void service_position::handle_timer(const boost::system::error_code& ec, std::weak_ptr<info_send> p)
  75. {
  76. if(ec){
  77. return;
  78. }
  79. if(m_stop){
  80. return;
  81. }
  82. auto _info = p.lock();
  83. if(!_info){
  84. return;
  85. }
  86. if(!_info->data.empty()){
  87. if(_info->count_max < _info->data.size()){
  88. auto iter = _info->data.begin();
  89. for(std::size_t i = 0; i < _info->data.size() - _info->count_max; ++i){
  90. handle_notify((*iter)->p_msg);
  91. ++iter;
  92. }
  93. _info->data.erase(_info->data.begin(), iter);
  94. }
  95. auto _data = *(_info->data.begin());
  96. _info->data.pop_front();
  97. handle_notify(_data->p_msg);
  98. }
  99. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  100. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, p));
  101. }
  102. void service_position::set_port(const int port)
  103. {
  104. m_port = port;
  105. }
  106. bool service_position::start()
  107. {
  108. try{
  109. if(0 >= m_port || 65536 < m_port){
  110. return false;
  111. }
  112. m_stop = false;
  113. m_service = service_ptr(new service_ptr::element_type());
  114. boost::asio::ip::tcp::endpoint point(boost::asio::ip::address::from_string("0.0.0.0"), m_port);
  115. m_acceptor = acceptor_ptr(new acceptor_ptr::element_type(*m_service, point));
  116. socket_ptr _socket = socket_ptr(new socket_ptr::element_type(*m_service));
  117. m_acceptor->async_accept(*_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, _socket));
  118. m_thread = boost::thread(boost::bind(&service_position::handle_thread, shared_from_this(), m_service));
  119. }catch(std::exception& ){
  120. return false;
  121. }
  122. return true;
  123. }
  124. void service_position::stop()
  125. {
  126. m_stop = true;
  127. service_ptr _service = m_service;
  128. if(_service){
  129. _service->post(boost::bind(&service_position::handle_stop, shared_from_this()));
  130. }
  131. m_thread.timed_join(boost::posix_time::seconds(1));
  132. }
  133. void service_position::handle_thread(service_ptr p_service)
  134. {
  135. while(true){
  136. try{
  137. boost::asio::io_service::work w(*p_service);
  138. p_service->run();
  139. }catch(boost::thread_interrupted&){
  140. break;
  141. }catch(std::exception& e){
  142. }
  143. boost::this_thread::sleep(boost::posix_time::minutes(1));
  144. }
  145. }
  146. void service_position::handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket)
  147. {
  148. if(m_stop){
  149. boost::system::error_code e;
  150. if(p_socket){
  151. p_socket->close(e);
  152. }
  153. return;
  154. }
  155. if(ec){
  156. boost::system::error_code e;
  157. if(p_socket){
  158. p_socket->close(e);
  159. }
  160. return;
  161. }
  162. m_sockets.push_back(p_socket);
  163. auto _service = m_service;
  164. if(!_service){
  165. return;
  166. }
  167. p_socket = socket_ptr(new socket_ptr::element_type(*m_service));
  168. m_acceptor->async_accept(*p_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, p_socket));
  169. }
  170. void service_position::handle_stop()
  171. {
  172. boost::system::error_code ec;
  173. service_ptr _service = m_service;
  174. m_service.reset();
  175. if(_service){
  176. _service->stop();
  177. }
  178. acceptor_ptr _acceptor = m_acceptor;
  179. m_acceptor.reset();
  180. if(m_acceptor){
  181. _acceptor->close(ec);
  182. }
  183. m_sockets.clear();
  184. }
  185. void service_position::handle_notify(buffer_ptr p_buffer)
  186. {
  187. if(m_stop){
  188. return;
  189. }
  190. for(auto iter = m_sockets.begin(); iter != m_sockets.end(); ++iter){
  191. (*iter)->async_write_some(boost::asio::buffer(p_buffer->c_str(), p_buffer->size()), boost::bind(&service_position::handle_write, shared_from_this(), _1, _2, *iter, p_buffer));
  192. }
  193. }
  194. void service_position::handle_write(const boost::system::error_code&, unsigned int, socket_ptr, buffer_ptr)
  195. {}
  196. service_position_ptr service_position::instance(int port)
  197. {
  198. service_position_ptr _service = service_position_ptr(new service_position_ptr::element_type());
  199. _service->set_port(port);
  200. _service->start();
  201. return _service;
  202. }