#include #include "service_position.h" #include #include #include void service_position::init_cache(info_send_ptr& p_info) { // 最大缓存频率个数数据,对于小于1Hz的卡,固定为1个 p_info->count_max = static_cast(p_info->feq * 0.3); if(0 == p_info->count_max){ p_info->count_max = 1; } // 因为后面是延迟一秒发送,所以缓存的个数时间总和要大于延迟的时间,这里在计算的count_max增加20% std::size_t d = p_info->count_max * 0.2; if(3 > d){ d = 3; } } void service_position::notify(const std::string& msg, const std::string& id, const double& feq) { if(m_stop){ log_info("[service-position] thread's running tag is true, m_stop=true"); return; } if(0.001 > feq || 1000.0 < feq){ // 频率非法 log_info("[service-position] 频率非法,频率值:%.3f", feq); return; } log_info("[service-position] card_id=%s, freq=%.2f, json_pos: %s", id.c_str(), feq, msg.c_str()); service_ptr _service = m_service; if(_service){ boost::shared_ptr p_buffer(new std::string(msg)); _service->dispatch(boost::bind(&service_position::handle_buffer, shared_from_this(), p_buffer, id, feq, _service)); } } void service_position::handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service) { if(m_stop){ return; } info_send_ptr _info; auto iter = m_deques.find(id); if(m_deques.end() == iter){ _info = std::make_shared(); _info->id = id; _info->feq = feq; _info->p_timer = std::make_shared(*p_service); _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast(1000.0 / _info->feq))); // 为了避免回环引用问题,这里没有绑定std::shared_ptr,而是使用std::weak_ptr,不过直接传std::shared_ptr也没事,service_position对象停止时会主动断开联系 _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr(_info))); init_cache(_info); m_deques.insert(std::make_pair(id, _info)); }else{ _info = iter->second; if(0.001 < fabs(_info->feq - feq)){ _info->feq = feq; _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast(1000.0 / _info->feq))); _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr(_info))); init_cache(_info); } } info_data_ptr _data(new info_data()); _data->p_msg = p_buffer; _data->time_receive = std::chrono::system_clock::now(); _info->data.push_back(_data); } std::string service_position::to_str(const std::chrono::system_clock::time_point& t) { uint64_t mill = std::chrono::duration_cast(t.time_since_epoch()).count() - std::chrono::duration_cast(t.time_since_epoch()).count()*1000; time_t tt = std::chrono::system_clock::to_time_t(t); struct tm local_time; localtime_r(&tt, &local_time); char _time[128]; snprintf(_time, 128, "%d-%02d-%02d %02d:%02d:%02d.%03d", local_time.tm_year + 1900, local_time.tm_mon + 1, local_time.tm_mday, local_time.tm_hour,local_time.tm_min, local_time.tm_sec, (int)mill); return _time; } void service_position::handle_timer(const boost::system::error_code& ec, std::weak_ptr p) { if(ec){ return; } if(m_stop){ return; } auto _info = p.lock(); if(!_info){ return; } if(!_info->data.empty()){ if(_info->count_max < _info->data.size()){ auto iter = _info->data.begin(); for(std::size_t i = 0; i < _info->data.size() - _info->count_max; ++i){ handle_notify((*iter)->p_msg); ++iter; } _info->data.erase(_info->data.begin(), iter); } auto _data = *(_info->data.begin()); _info->data.pop_front(); handle_notify(_data->p_msg); } _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast(1000.0 / _info->feq))); _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, p)); } void service_position::set_port(const int port) { m_port = port; } bool service_position::start() { try{ if(0 >= m_port || 65536 < m_port){ log_info("[service-position] port is invalid, port = %d", m_port); return false; } log_info("[service-position] ios service is running, listen port: %d", m_port); m_stop = false; m_service = service_ptr(new service_ptr::element_type()); boost::asio::ip::tcp::endpoint point(boost::asio::ip::address::from_string("0.0.0.0"), m_port); m_acceptor = acceptor_ptr(new acceptor_ptr::element_type(*m_service, point)); socket_ptr _socket = socket_ptr(new socket_ptr::element_type(*m_service)); m_acceptor->async_accept(*_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, _socket)); m_thread = boost::thread(boost::bind(&service_position::handle_thread, shared_from_this(), m_service)); }catch(std::exception& ){ return false; } return true; } void service_position::stop() { m_stop = true; service_ptr _service = m_service; if(_service){ _service->post(boost::bind(&service_position::handle_stop, shared_from_this())); } m_thread.timed_join(boost::posix_time::seconds(1)); } void service_position::handle_thread(service_ptr p_service) { while(true){ try{ boost::asio::io_service::work w(*p_service); p_service->run(); }catch(boost::thread_interrupted&){ break; }catch(std::exception& e){ } boost::this_thread::sleep(boost::posix_time::minutes(1)); } } void service_position::handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket) { if(m_stop){ boost::system::error_code e; if(p_socket){ p_socket->close(e); } return; } if(ec){ boost::system::error_code e; if(p_socket){ p_socket->close(e); } return; } m_sockets.push_back(p_socket); log_info("[service-position] client %s is connected.", p_socket->remote_endpoint().address().to_string().c_str()); auto _service = m_service; if(!_service){ return; } p_socket = socket_ptr(new socket_ptr::element_type(*m_service)); m_acceptor->async_accept(*p_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, p_socket)); } void service_position::handle_stop() { boost::system::error_code ec; service_ptr _service = m_service; m_service.reset(); if(_service){ _service->stop(); } acceptor_ptr _acceptor = m_acceptor; m_acceptor.reset(); if(m_acceptor){ _acceptor->close(ec); } m_sockets.clear(); } void service_position::handle_notify(buffer_ptr p_buffer) { if(m_stop){ return; } //log_info("[service-position] send msg: %s", p_buffer->c_str()); for(auto iter = m_sockets.begin(); iter != m_sockets.end(); ++iter){ (*iter)->async_write_some(boost::asio::buffer(p_buffer->c_str(), p_buffer->size()), boost::bind(&service_position::handle_write, shared_from_this(), _1, _2, *iter, p_buffer)); } } void service_position::handle_write(const boost::system::error_code&, unsigned int, socket_ptr, buffer_ptr) {}