service_position.cpp 7.5 KB


  1. #include <log.h>
  2. #include "service_position.h"
  3. #include <boost/bind.hpp>
  4. #include <boost/format.hpp>
  5. #include <time.h>
  6. void service_position::init_cache(info_send_ptr& p_info)
  7. {
  8. // 最大缓存频率个数数据,对于小于1Hz的卡,固定为1个
  9. p_info->count_max = static_cast<std::size_t>(p_info->feq * 0.3);
  10. if(0 == p_info->count_max){
  11. p_info->count_max = 1;
  12. }
  13. // 因为后面是延迟一秒发送,所以缓存的个数时间总和要大于延迟的时间,这里在计算的count_max增加20%
  14. std::size_t d = p_info->count_max * 0.2;
  15. if(3 > d){
  16. d = 3;
  17. }
  18. }
  19. void service_position::notify(const std::string& msg, const std::string& id, const double& feq)
  20. {
  21. if(m_stop){
  22. return;
  23. }
  24. if(0.001 > feq || 1000.0 < feq){
  25. // 频率非法
  26. log_info("[service-position] 频率非法,频率值:%.3f", feq);
  27. return;
  28. }
  29. service_ptr _service = m_service;
  30. if(_service){
  31. boost::shared_ptr<std::string> p_buffer(new std::string(msg));
  32. _service->dispatch(boost::bind(&service_position::handle_buffer, shared_from_this(), p_buffer, id, feq, _service));
  33. }
  34. }
  35. void service_position::handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service)
  36. {
  37. if(m_stop){
  38. return;
  39. }
  40. info_send_ptr _info;
  41. auto iter = m_deques.find(id);
  42. if(m_deques.end() == iter){
  43. _info = std::make_shared<info_send>();
  44. _info->id = id;
  45. _info->feq = feq;
  46. _info->p_timer = std::make_shared<boost::asio::deadline_timer>(*p_service);
  47. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  48. // 为了避免回环引用问题,这里没有绑定std::shared_ptr<info_send>,而是使用std::weak_ptr,不过直接传std::shared_ptr也没事,service_position对象停止时会主动断开联系
  49. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
  50. init_cache(_info);
  51. m_deques.insert(std::make_pair(id, _info));
  52. }else{
  53. _info = iter->second;
  54. if(0.001 < fabs(_info->feq - feq)){
  55. _info->feq = feq;
  56. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  57. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
  58. init_cache(_info);
  59. }
  60. }
  61. info_data_ptr _data(new info_data());
  62. _data->p_msg = p_buffer;
  63. _data->time_receive = std::chrono::system_clock::now();
  64. _info->data.push_back(_data);
  65. }
  66. std::string service_position::to_str(const std::chrono::system_clock::time_point& t)
  67. {
  68. uint64_t mill = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count() - std::chrono::duration_cast<std::chrono::seconds>(t.time_since_epoch()).count()*1000;
  69. time_t tt = std::chrono::system_clock::to_time_t(t);
  70. struct tm local_time;
  71. localtime_r(&tt, &local_time);
  72. char _time[128];
  73. snprintf(_time, 128, "%d-%02d-%02d %02d:%02d:%02d.%03d", local_time.tm_year + 1900, local_time.tm_mon + 1, local_time.tm_mday, local_time.tm_hour,local_time.tm_min, local_time.tm_sec, (int)mill);
  74. return _time;
  75. }
  76. void service_position::handle_timer(const boost::system::error_code& ec, std::weak_ptr<info_send> p)
  77. {
  78. if(ec){
  79. return;
  80. }
  81. if(m_stop){
  82. return;
  83. }
  84. auto _info = p.lock();
  85. if(!_info){
  86. return;
  87. }
  88. if(!_info->data.empty()){
  89. if(_info->count_max < _info->data.size()){
  90. auto iter = _info->data.begin();
  91. for(std::size_t i = 0; i < _info->data.size() - _info->count_max; ++i){
  92. handle_notify((*iter)->p_msg);
  93. ++iter;
  94. }
  95. _info->data.erase(_info->data.begin(), iter);
  96. }
  97. auto _data = *(_info->data.begin());
  98. _info->data.pop_front();
  99. handle_notify(_data->p_msg);
  100. }
  101. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  102. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, p));
  103. }
  104. void service_position::set_port(const int port)
  105. {
  106. m_port = port;
  107. }
  108. bool service_position::start()
  109. {
  110. try{
  111. if(0 >= m_port || 65536 < m_port){
  112. log_info("[service-position] port is invalid, port = %d", m_port);
  113. return false;
  114. }
  115. log_info("[service-position] ios service is running, listen port: %d", m_port);
  116. m_stop = false;
  117. m_service = service_ptr(new service_ptr::element_type());
  118. boost::asio::ip::tcp::endpoint point(boost::asio::ip::address::from_string("0.0.0.0"), m_port);
  119. m_acceptor = acceptor_ptr(new acceptor_ptr::element_type(*m_service, point));
  120. socket_ptr _socket = socket_ptr(new socket_ptr::element_type(*m_service));
  121. m_acceptor->async_accept(*_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, _socket));
  122. m_thread = boost::thread(boost::bind(&service_position::handle_thread, shared_from_this(), m_service));
  123. }catch(std::exception& ){
  124. return false;
  125. }
  126. return true;
  127. }
  128. void service_position::stop()
  129. {
  130. m_stop = true;
  131. service_ptr _service = m_service;
  132. if(_service){
  133. _service->post(boost::bind(&service_position::handle_stop, shared_from_this()));
  134. }
  135. m_thread.timed_join(boost::posix_time::seconds(1));
  136. }
  137. void service_position::handle_thread(service_ptr p_service)
  138. {
  139. while(true){
  140. try{
  141. boost::asio::io_service::work w(*p_service);
  142. p_service->run();
  143. }catch(boost::thread_interrupted&){
  144. break;
  145. }catch(std::exception& e){
  146. }
  147. boost::this_thread::sleep(boost::posix_time::minutes(1));
  148. }
  149. }
  150. void service_position::handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket)
  151. {
  152. if(m_stop){
  153. boost::system::error_code e;
  154. if(p_socket){
  155. p_socket->close(e);
  156. }
  157. return;
  158. }
  159. if(ec){
  160. boost::system::error_code e;
  161. if(p_socket){
  162. p_socket->close(e);
  163. }
  164. return;
  165. }
  166. m_sockets.push_back(p_socket);
  167. log_info("[service-position] client %s is connected.", p_socket->remote_endpoint().address().to_string().c_str());
  168. auto _service = m_service;
  169. if(!_service){
  170. return;
  171. }
  172. p_socket = socket_ptr(new socket_ptr::element_type(*m_service));
  173. m_acceptor->async_accept(*p_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, p_socket));
  174. }
  175. void service_position::handle_stop()
  176. {
  177. boost::system::error_code ec;
  178. service_ptr _service = m_service;
  179. m_service.reset();
  180. if(_service){
  181. _service->stop();
  182. }
  183. acceptor_ptr _acceptor = m_acceptor;
  184. m_acceptor.reset();
  185. if(m_acceptor){
  186. _acceptor->close(ec);
  187. }
  188. m_sockets.clear();
  189. }
  190. void service_position::handle_notify(buffer_ptr p_buffer)
  191. {
  192. if(m_stop){
  193. return;
  194. }
  195. log_info("[service-position] send msg: %s", p_buffer->c_str());
  196. for(auto iter = m_sockets.begin(); iter != m_sockets.end(); ++iter){
  197. (*iter)->async_write_some(boost::asio::buffer(p_buffer->c_str(), p_buffer->size()), boost::bind(&service_position::handle_write, shared_from_this(), _1, _2, *iter, p_buffer));
  198. }
  199. }
  200. void service_position::handle_write(const boost::system::error_code&, unsigned int, socket_ptr, buffer_ptr)
  201. {}