123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- #include <log.h>
- #include "service_position.h"
- #include <boost/bind.hpp>
- #include <boost/format.hpp>
- #include <time.h>
- void service_position::init_cache(info_send_ptr& p_info)
- {
- // 最大缓存频率个数数据,对于小于1Hz的卡,固定为1个
- p_info->count_max = static_cast<std::size_t>(p_info->feq * 0.3);
- if(0 == p_info->count_max){
- p_info->count_max = 1;
- }
- // 因为后面是延迟一秒发送,所以缓存的个数时间总和要大于延迟的时间,这里在计算的count_max增加20%
- std::size_t d = p_info->count_max * 0.2;
- if(3 > d){
- d = 3;
- }
- }
- void service_position::notify(const std::string& msg, const std::string& id, const double& feq)
- {
- if(m_stop){
- log_info("[service-position] thread's running tag is true, m_stop=true");
- return;
- }
- if(0.001 > feq || 1000.0 < feq){
- // 频率非法
- log_info("[service-position] 频率非法,频率值:%.3f", feq);
- return;
- }
- log_info("[service-position] card_id=%s, freq=%.2f, json_pos: %s", id.c_str(), feq, msg.c_str());
- service_ptr _service = m_service;
- if(_service){
- boost::shared_ptr<std::string> p_buffer(new std::string(msg));
- _service->dispatch(boost::bind(&service_position::handle_buffer, shared_from_this(), p_buffer, id, feq, _service));
- }
- }
- void service_position::handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service)
- {
- if(m_stop){
- return;
- }
- info_send_ptr _info;
- auto iter = m_deques.find(id);
- if(m_deques.end() == iter){
- _info = std::make_shared<info_send>();
- _info->id = id;
- _info->feq = feq;
- _info->p_timer = std::make_shared<boost::asio::deadline_timer>(*p_service);
- _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
- // 为了避免回环引用问题,这里没有绑定std::shared_ptr<info_send>,而是使用std::weak_ptr,不过直接传std::shared_ptr也没事,service_position对象停止时会主动断开联系
- _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
- init_cache(_info);
- m_deques.insert(std::make_pair(id, _info));
- }else{
- _info = iter->second;
- if(0.001 < fabs(_info->feq - feq)){
- _info->feq = feq;
- _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
- _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
- init_cache(_info);
- }
- }
- info_data_ptr _data(new info_data());
- _data->p_msg = p_buffer;
- _data->time_receive = std::chrono::system_clock::now();
- _info->data.push_back(_data);
- }
- std::string service_position::to_str(const std::chrono::system_clock::time_point& t)
- {
- uint64_t mill = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count() - std::chrono::duration_cast<std::chrono::seconds>(t.time_since_epoch()).count()*1000;
- time_t tt = std::chrono::system_clock::to_time_t(t);
- struct tm local_time;
- localtime_r(&tt, &local_time);
- char _time[128];
- snprintf(_time, 128, "%d-%02d-%02d %02d:%02d:%02d.%03d", local_time.tm_year + 1900, local_time.tm_mon + 1, local_time.tm_mday, local_time.tm_hour,local_time.tm_min, local_time.tm_sec, (int)mill);
- return _time;
- }
- void service_position::handle_timer(const boost::system::error_code& ec, std::weak_ptr<info_send> p)
- {
- if(ec){
- return;
- }
- if(m_stop){
- return;
- }
- auto _info = p.lock();
- if(!_info){
- return;
- }
- if(!_info->data.empty()){
- if(_info->count_max < _info->data.size()){
- auto iter = _info->data.begin();
- for(std::size_t i = 0; i < _info->data.size() - _info->count_max; ++i){
- handle_notify((*iter)->p_msg);
- ++iter;
- }
- _info->data.erase(_info->data.begin(), iter);
- }
- auto _data = *(_info->data.begin());
- _info->data.pop_front();
- handle_notify(_data->p_msg);
- }
- _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
- _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, p));
- }
- void service_position::set_port(const int port)
- {
- m_port = port;
- }
- bool service_position::start()
- {
- try{
- if(0 >= m_port || 65536 < m_port){
- log_info("[service-position] port is invalid, port = %d", m_port);
- return false;
- }
- log_info("[service-position] ios service is running, listen port: %d", m_port);
- m_stop = false;
- m_service = service_ptr(new service_ptr::element_type());
- boost::asio::ip::tcp::endpoint point(boost::asio::ip::address::from_string("0.0.0.0"), m_port);
- m_acceptor = acceptor_ptr(new acceptor_ptr::element_type(*m_service, point));
- socket_ptr _socket = socket_ptr(new socket_ptr::element_type(*m_service));
- m_acceptor->async_accept(*_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, _socket));
- m_thread = boost::thread(boost::bind(&service_position::handle_thread, shared_from_this(), m_service));
- }catch(std::exception& ){
- return false;
- }
- return true;
- }
- void service_position::stop()
- {
- m_stop = true;
- service_ptr _service = m_service;
- if(_service){
- _service->post(boost::bind(&service_position::handle_stop, shared_from_this()));
- }
- m_thread.timed_join(boost::posix_time::seconds(1));
- }
- void service_position::handle_thread(service_ptr p_service)
- {
- while(true){
- try{
- boost::asio::io_service::work w(*p_service);
- p_service->run();
- }catch(boost::thread_interrupted&){
- break;
- }catch(std::exception& e){
- }
- boost::this_thread::sleep(boost::posix_time::minutes(1));
- }
- }
- void service_position::handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket)
- {
- if(m_stop){
- boost::system::error_code e;
- if(p_socket){
- p_socket->close(e);
- }
- return;
- }
- if(ec){
- boost::system::error_code e;
- if(p_socket){
- p_socket->close(e);
- }
- return;
- }
- m_sockets.push_back(p_socket);
- log_info("[service-position] client %s is connected.", p_socket->remote_endpoint().address().to_string().c_str());
- auto _service = m_service;
- if(!_service){
- return;
- }
- p_socket = socket_ptr(new socket_ptr::element_type(*m_service));
- m_acceptor->async_accept(*p_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, p_socket));
- }
- void service_position::handle_stop()
- {
- boost::system::error_code ec;
- service_ptr _service = m_service;
- m_service.reset();
- if(_service){
- _service->stop();
- }
- acceptor_ptr _acceptor = m_acceptor;
- m_acceptor.reset();
- if(m_acceptor){
- _acceptor->close(ec);
- }
- m_sockets.clear();
- }
- void service_position::handle_notify(buffer_ptr p_buffer)
- {
- if(m_stop){
- return;
- }
- //log_info("[service-position] send msg: %s", p_buffer->c_str());
- for(auto iter = m_sockets.begin(); iter != m_sockets.end(); ++iter){
- (*iter)->async_write_some(boost::asio::buffer(p_buffer->c_str(), p_buffer->size()), boost::bind(&service_position::handle_write, shared_from_this(), _1, _2, *iter, p_buffer));
- }
- }
- void service_position::handle_write(const boost::system::error_code&, unsigned int, socket_ptr, buffer_ptr)
- {}
|