service_position.cpp 7.7 KB


  1. #include <log.h>
  2. #include "service_position.h"
  3. #include <boost/bind.hpp>
  4. #include <boost/format.hpp>
  5. #include <time.h>
  6. void service_position::init_cache(info_send_ptr& p_info)
  7. {
  8. // 最大缓存频率个数数据,对于小于1Hz的卡,固定为1个
  9. p_info->count_max = static_cast<std::size_t>(p_info->feq * 0.3);
  10. if(0 == p_info->count_max){
  11. p_info->count_max = 1;
  12. }
  13. // 因为后面是延迟一秒发送,所以缓存的个数时间总和要大于延迟的时间,这里在计算的count_max增加20%
  14. std::size_t d = p_info->count_max * 0.2;
  15. if(3 > d){
  16. d = 3;
  17. }
  18. }
  19. void service_position::notify(const std::string& msg, const std::string& id, const double& feq)
  20. {
  21. if(m_stop){
  22. log_info("[service-position] thread's running tag is true, m_stop=true");
  23. return;
  24. }
  25. if(0.001 > feq || 1000.0 < feq){
  26. // 频率非法
  27. log_info("[service-position] 频率非法,频率值:%.3f", feq);
  28. return;
  29. }
  30. log_info("[service-position] card_id=%s, freq=%.2f, json_pos: %s", id.c_str(), feq, msg.c_str());
  31. service_ptr _service = m_service;
  32. if(_service){
  33. boost::shared_ptr<std::string> p_buffer(new std::string(msg));
  34. _service->dispatch(boost::bind(&service_position::handle_buffer, shared_from_this(), p_buffer, id, feq, _service));
  35. }
  36. }
  37. void service_position::handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service)
  38. {
  39. if(m_stop){
  40. return;
  41. }
  42. info_send_ptr _info;
  43. auto iter = m_deques.find(id);
  44. if(m_deques.end() == iter){
  45. _info = std::make_shared<info_send>();
  46. _info->id = id;
  47. _info->feq = feq;
  48. _info->p_timer = std::make_shared<boost::asio::deadline_timer>(*p_service);
  49. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  50. // 为了避免回环引用问题,这里没有绑定std::shared_ptr<info_send>,而是使用std::weak_ptr,不过直接传std::shared_ptr也没事,service_position对象停止时会主动断开联系
  51. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
  52. init_cache(_info);
  53. m_deques.insert(std::make_pair(id, _info));
  54. }else{
  55. _info = iter->second;
  56. if(0.001 < fabs(_info->feq - feq)){
  57. _info->feq = feq;
  58. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  59. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
  60. init_cache(_info);
  61. }
  62. }
  63. info_data_ptr _data(new info_data());
  64. _data->p_msg = p_buffer;
  65. _data->time_receive = std::chrono::system_clock::now();
  66. _info->data.push_back(_data);
  67. }
  68. std::string service_position::to_str(const std::chrono::system_clock::time_point& t)
  69. {
  70. uint64_t mill = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count() - std::chrono::duration_cast<std::chrono::seconds>(t.time_since_epoch()).count()*1000;
  71. time_t tt = std::chrono::system_clock::to_time_t(t);
  72. struct tm local_time;
  73. localtime_r(&tt, &local_time);
  74. char _time[128];
  75. snprintf(_time, 128, "%d-%02d-%02d %02d:%02d:%02d.%03d", local_time.tm_year + 1900, local_time.tm_mon + 1, local_time.tm_mday, local_time.tm_hour,local_time.tm_min, local_time.tm_sec, (int)mill);
  76. return _time;
  77. }
  78. void service_position::handle_timer(const boost::system::error_code& ec, std::weak_ptr<info_send> p)
  79. {
  80. if(ec){
  81. return;
  82. }
  83. if(m_stop){
  84. return;
  85. }
  86. auto _info = p.lock();
  87. if(!_info){
  88. return;
  89. }
  90. if(!_info->data.empty()){
  91. if(_info->count_max < _info->data.size()){
  92. auto iter = _info->data.begin();
  93. for(std::size_t i = 0; i < _info->data.size() - _info->count_max; ++i){
  94. handle_notify((*iter)->p_msg);
  95. ++iter;
  96. }
  97. _info->data.erase(_info->data.begin(), iter);
  98. }
  99. auto _data = *(_info->data.begin());
  100. _info->data.pop_front();
  101. handle_notify(_data->p_msg);
  102. }
  103. _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
  104. _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, p));
  105. }
  106. void service_position::set_port(const int port)
  107. {
  108. m_port = port;
  109. }
  110. bool service_position::start()
  111. {
  112. try{
  113. if(0 >= m_port || 65536 < m_port){
  114. log_info("[service-position] port is invalid, port = %d", m_port);
  115. return false;
  116. }
  117. log_info("[service-position] ios service is running, listen port: %d", m_port);
  118. m_stop = false;
  119. m_service = service_ptr(new service_ptr::element_type());
  120. boost::asio::ip::tcp::endpoint point(boost::asio::ip::address::from_string("0.0.0.0"), m_port);
  121. m_acceptor = acceptor_ptr(new acceptor_ptr::element_type(*m_service, point));
  122. socket_ptr _socket = socket_ptr(new socket_ptr::element_type(*m_service));
  123. m_acceptor->async_accept(*_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, _socket));
  124. m_thread = boost::thread(boost::bind(&service_position::handle_thread, shared_from_this(), m_service));
  125. }catch(std::exception& ){
  126. return false;
  127. }
  128. return true;
  129. }
  130. void service_position::stop()
  131. {
  132. m_stop = true;
  133. service_ptr _service = m_service;
  134. if(_service){
  135. _service->post(boost::bind(&service_position::handle_stop, shared_from_this()));
  136. }
  137. m_thread.timed_join(boost::posix_time::seconds(1));
  138. }
  139. void service_position::handle_thread(service_ptr p_service)
  140. {
  141. while(true){
  142. try{
  143. boost::asio::io_service::work w(*p_service);
  144. p_service->run();
  145. }catch(boost::thread_interrupted&){
  146. break;
  147. }catch(std::exception& e){
  148. }
  149. boost::this_thread::sleep(boost::posix_time::minutes(1));
  150. }
  151. }
  152. void service_position::handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket)
  153. {
  154. if(m_stop){
  155. boost::system::error_code e;
  156. if(p_socket){
  157. p_socket->close(e);
  158. }
  159. return;
  160. }
  161. if(ec){
  162. boost::system::error_code e;
  163. if(p_socket){
  164. p_socket->close(e);
  165. }
  166. return;
  167. }
  168. m_sockets.push_back(p_socket);
  169. log_info("[service-position] client %s is connected.", p_socket->remote_endpoint().address().to_string().c_str());
  170. auto _service = m_service;
  171. if(!_service){
  172. return;
  173. }
  174. p_socket = socket_ptr(new socket_ptr::element_type(*m_service));
  175. m_acceptor->async_accept(*p_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, p_socket));
  176. }
  177. void service_position::handle_stop()
  178. {
  179. boost::system::error_code ec;
  180. service_ptr _service = m_service;
  181. m_service.reset();
  182. if(_service){
  183. _service->stop();
  184. }
  185. acceptor_ptr _acceptor = m_acceptor;
  186. m_acceptor.reset();
  187. if(m_acceptor){
  188. _acceptor->close(ec);
  189. }
  190. m_sockets.clear();
  191. }
  192. void service_position::handle_notify(buffer_ptr p_buffer)
  193. {
  194. if(m_stop){
  195. return;
  196. }
  197. //log_info("[service-position] send msg: %s", p_buffer->c_str());
  198. for(auto iter = m_sockets.begin(); iter != m_sockets.end(); ++iter){
  199. (*iter)->async_write_some(boost::asio::buffer(p_buffer->c_str(), p_buffer->size()), boost::bind(&service_position::handle_write, shared_from_this(), _1, _2, *iter, p_buffer));
  200. }
  201. }
  202. void service_position::handle_write(const boost::system::error_code&, unsigned int, socket_ptr, buffer_ptr)
  203. {}