Browse Source

add service_position

zhuyf 5 years ago
parent
commit
9707f7d56c
5 changed files with 532 additions and 14 deletions
  1. 225 0
      config.h
  2. 7 9
      main.cpp
  3. 232 0
      service_position.cpp
  4. 68 0
      service_position.h
  5. 0 5
      znet.cpp

+ 225 - 0
config.h

@@ -0,0 +1,225 @@
+/* config.h.  Generated from config.h.in by configure.  */
+/* config.h.in.  Generated from configure.ac by autoheader.  */
+
+/* Define to 1 if you have the `alarm' function. */
+#define HAVE_ALARM 1
+
+/* Define to 1 if you have the <arpa/inet.h> header file. */
+#define HAVE_ARPA_INET_H 1
+
+/* Define to 1 if you have the `atexit' function. */
+#define HAVE_ATEXIT 1
+
+/* Define to 1 if you have the `clock_gettime' function. */
+#define HAVE_CLOCK_GETTIME 1
+
+/* Define to 1 if you have the <fcntl.h> header file. */
+#define HAVE_FCNTL_H 1
+
+/* Define to 1 if you have the <float.h> header file. */
+#define HAVE_FLOAT_H 1
+
+/* Define to 1 if you have the `floor' function. */
+/* #undef HAVE_FLOOR */
+
+/* Define to 1 if you have the `gettimeofday' function. */
+#define HAVE_GETTIMEOFDAY 1
+
+/* Define to 1 if you have the `inet_ntoa' function. */
+#define HAVE_INET_NTOA 1
+
+/* Define to 1 if you have the <inttypes.h> header file. */
+#define HAVE_INTTYPES_H 1
+
+/* Define to 1 if you have the `boost_chrono' library (-lboost_chrono). */
+#define HAVE_LIBBOOST_CHRONO 1
+
+/* Define to 1 if you have the `boost_system' library (-lboost_system). */
+#define HAVE_LIBBOOST_SYSTEM 1
+
+/* Define to 1 if you have the `boost_thread' library (-lboost_thread). */
+#define HAVE_LIBBOOST_THREAD 1
+
+/* Define to 1 if you have the `ev' library (-lev). */
+#define HAVE_LIBEV 1
+
+/* Define to 1 if you have the `mysqlclient' library (-lmysqlclient). */
+#define HAVE_LIBMYSQLCLIENT 1
+
+/* Define to 1 if you have the `rt' library (-lrt). */
+#define HAVE_LIBRT 1
+
+/* Define to 1 if you have the `three_rates' library (-lthree_rates). */
+/* #undef HAVE_LIBTHREE_RATES */
+
+/* Define to 1 if you have the `zlog' library (-lzlog). */
+#define HAVE_LIBZLOG 1
+
+/* Define to 1 if you have the <limits.h> header file. */
+#define HAVE_LIMITS_H 1
+
+/* Define to 1 if you have the `localtime_r' function. */
+#define HAVE_LOCALTIME_R 1
+
+/* Define to 1 if your system has a GNU libc compatible `malloc' function, and
+   to 0 otherwise. */
+#define HAVE_MALLOC 0
+
+/* Define to 1 if you have the `memmove' function. */
+#define HAVE_MEMMOVE 1
+
+/* Define to 1 if you have the <memory.h> header file. */
+#define HAVE_MEMORY_H 1
+
+/* Define to 1 if you have the `memset' function. */
+#define HAVE_MEMSET 1
+
+/* Define to 1 if you have the `mkdir' function. */
+#define HAVE_MKDIR 1
+
+/* Define to 1 if you have the <netinet/in.h> header file. */
+#define HAVE_NETINET_IN_H 1
+
+/* Define to 1 if you have the `pow' function. */
+/* #undef HAVE_POW */
+
+/* Define to 1 if your system has a GNU libc compatible `realloc' function,
+   and to 0 otherwise. */
+#define HAVE_REALLOC 0
+
+/* Define to 1 if you have the `socket' function. */
+#define HAVE_SOCKET 1
+
+/* Define to 1 if you have the `sqrt' function. */
+/* #undef HAVE_SQRT */
+
+/* Define to 1 if you have the <stdint.h> header file. */
+#define HAVE_STDINT_H 1
+
+/* Define to 1 if you have the <stdlib.h> header file. */
+#define HAVE_STDLIB_H 1
+
+/* Define to 1 if you have the `stime' function. */
+#define HAVE_STIME 1
+
+/* Define to 1 if you have the `strchr' function. */
+#define HAVE_STRCHR 1
+
+/* Define to 1 if you have the <strings.h> header file. */
+#define HAVE_STRINGS_H 1
+
+/* Define to 1 if you have the <string.h> header file. */
+#define HAVE_STRING_H 1
+
+/* Define to 1 if you have the `strstr' function. */
+#define HAVE_STRSTR 1
+
+/* Define to 1 if you have the <sys/socket.h> header file. */
+#define HAVE_SYS_SOCKET_H 1
+
+/* Define to 1 if you have the <sys/stat.h> header file. */
+#define HAVE_SYS_STAT_H 1
+
+/* Define to 1 if you have the <sys/time.h> header file. */
+#define HAVE_SYS_TIME_H 1
+
+/* Define to 1 if you have the <sys/types.h> header file. */
+#define HAVE_SYS_TYPES_H 1
+
+/* Define to 1 if you have the <unistd.h> header file. */
+#define HAVE_UNISTD_H 1
+
+/* Define to 1 if the system has the type `_Bool'. */
+#define HAVE__BOOL 1
+
+/* Name of package */
+#define PACKAGE "yals"
+
+/* Define to the address where bug reports for this package should be sent. */
+#define PACKAGE_BUGREPORT "BUG-REPORT-ADDRESS"
+
+/* Define to the full name of this package. */
+#define PACKAGE_NAME "yals"
+
+/* Define to the full name and version of this package. */
+#define PACKAGE_STRING "yals 1.0"
+
+/* Define to the one symbol short name of this package. */
+#define PACKAGE_TARNAME "yals"
+
+/* Define to the home page for this package. */
+#define PACKAGE_URL ""
+
+/* Define to the version of this package. */
+#define PACKAGE_VERSION "1.0"
+
+/* Define to 1 if you have the ANSI C header files. */
+/* #undef STDC_HEADERS */
+
+/* Define to 1 if you can safely include both <sys/time.h> and <time.h>. */
+#define TIME_WITH_SYS_TIME 1
+
+/* Version number of package */
+#define VERSION "1.0"
+
+/* Define for Solaris 2.5.1 so the uint32_t typedef from <sys/synch.h>,
+   <pthread.h>, or <semaphore.h> is not used. If the typedef were allowed, the
+   #define below would cause a syntax error. */
+/* #undef _UINT32_T */
+
+/* Define for Solaris 2.5.1 so the uint64_t typedef from <sys/synch.h>,
+   <pthread.h>, or <semaphore.h> is not used. If the typedef were allowed, the
+   #define below would cause a syntax error. */
+/* #undef _UINT64_T */
+
+/* Define for Solaris 2.5.1 so the uint8_t typedef from <sys/synch.h>,
+   <pthread.h>, or <semaphore.h> is not used. If the typedef were allowed, the
+   #define below would cause a syntax error. */
+/* #undef _UINT8_T */
+
+/* Define to `__inline__' or `__inline' if that's what the C compiler
+   calls it, or to nothing if 'inline' is not supported under any name.  */
+#ifndef __cplusplus
+/* #undef inline */
+#endif
+
+/* Define to the type of a signed integer type of width exactly 16 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef int16_t */
+
+/* Define to the type of a signed integer type of width exactly 32 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef int32_t */
+
+/* Define to the type of a signed integer type of width exactly 64 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef int64_t */
+
+/* Define to the type of a signed integer type of width exactly 8 bits if such
+   a type exists and the standard includes do not define it. */
+/* #undef int8_t */
+
+/* Define to rpl_malloc if the replacement function should be used. */
+#define malloc rpl_malloc
+
+/* Define to rpl_realloc if the replacement function should be used. */
+#define realloc rpl_realloc
+
+/* Define to `unsigned int' if <sys/types.h> does not define. */
+/* #undef size_t */
+
+/* Define to the type of an unsigned integer type of width exactly 16 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef uint16_t */
+
+/* Define to the type of an unsigned integer type of width exactly 32 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef uint32_t */
+
+/* Define to the type of an unsigned integer type of width exactly 64 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef uint64_t */
+
+/* Define to the type of an unsigned integer type of width exactly 8 bits if
+   such a type exists and the standard includes do not define it. */
+/* #undef uint8_t */

+ 7 - 9
main.cpp

@@ -14,7 +14,6 @@
 #include "mine.h"
 #include "event.h"
 #include"module_service/module_mgr.h"
-
 #include <config_file.h>
 #include "three_rates.h"
 #include "mine_business.h"
@@ -22,6 +21,7 @@
 #include "websocket/web_connect.h"
 #include "forbid_staff_down_mine.h"
 #include "bulletin_broad_show.h"
+#include "service_position.h"
 
 config_file config;
 void handlereader(uint32_t readerid,bool duration,uint32_t t)
@@ -104,9 +104,7 @@ struct Init_Setting
 		    init_three_rates(dp);
         }
 
-
         log_info("Init_Setting::init  Success. \n" );
-
     }
 
 	void init_three_rates(const db_para& dbs)
@@ -164,15 +162,13 @@ void usage(char ** argv)
     }
     else if(cmd == "--version" || cmd == "-v")
     {
-        printf("yals (采集程序) 2.0.0 build 11\n");
+        printf("dcserver (采集程序) 1.0.0 build 11\n");
     }
     else
     {
         printf("Unknown option:%s\n",cmd.c_str());
         printf( "usage:yals [--version][--help][-v][-H]\n");
     }
-
-
 }
 
 int main(int argc ,char * argv[])
@@ -197,6 +193,11 @@ int main(int argc ,char * argv[])
     net_service mh;
     int port=config.get("service.port",4000);
 
+    int interface_port = config.get("service.interface_port",7001);
+    m_service = service_position_ptr(new service_position_ptr::element_type());
+    m_service->set_port(interface_port);
+    m_service->start();
+
     log_info("service_handle::instance(&mh)->run(%d)",port);
     std_info("service_handle::instance(&mh)->run(%d)",port);
 
@@ -209,6 +210,3 @@ int main(int argc ,char * argv[])
 
     return 0;
 }
-
-
-

+ 232 - 0
service_position.cpp

@@ -0,0 +1,232 @@
+#include "service_position.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <time.h>
+
+void service_position::init_cache(info_send_ptr& p_info)
+{
+    // 最大缓存频率个数数据,对于小于1Hz的卡,固定为1个
+    p_info->count_max = static_cast<std::size_t>(p_info->feq * 0.3);
+    if(0 == p_info->count_max){
+        p_info->count_max = 1;
+    }
+    // 因为后面是延迟一秒发送,所以缓存的个数时间总和要大于延迟的时间,这里在计算的count_max增加20%
+    std::size_t d = p_info->count_max * 0.2;
+    if(3 > d){
+        d = 3;
+    }
+}
+
+void service_position::notify(const std::string& msg, const std::string& id, const double& feq)
+{
+    if(m_stop){
+        return;
+    }
+
+    if(0.001 > feq || 1000.0 < feq){
+        // 频率非法
+        return;
+    }
+
+    service_ptr _service = m_service;
+    if(_service){
+        boost::shared_ptr<std::string> p_buffer(new std::string(msg));
+        _service->dispatch(boost::bind(&service_position::handle_buffer, shared_from_this(), p_buffer, id, feq, _service));
+    }
+}
+
+void service_position::handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service)
+{
+    if(m_stop){
+        return;
+    }
+
+    info_send_ptr _info;
+    auto iter = m_deques.find(id);
+    if(m_deques.end() == iter){
+        _info = std::make_shared<info_send>();
+        _info->id = id;
+        _info->feq = feq;
+        _info->p_timer = std::make_shared<boost::asio::deadline_timer>(*p_service);
+        _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
+        // 为了避免回环引用问题,这里没有绑定std::shared_ptr<info_send>,而是使用std::weak_ptr,不过直接传std::shared_ptr也没事,service_position对象停止时会主动断开联系
+        _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
+        init_cache(_info);
+        m_deques.insert(std::make_pair(id, _info));
+    }else{
+        _info = iter->second;
+        if(0.001 < fabs(_info->feq - feq)){
+            _info->feq = feq;
+            _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
+            _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, std::weak_ptr<info_send>(_info)));
+            init_cache(_info);
+        }
+    }
+
+    info_data_ptr _data(new info_data());
+    _data->p_msg = p_buffer;
+    _data->time_receive = std::chrono::system_clock::now();
+    _info->data.push_back(_data);
+}
+
+std::string service_position::to_str(const std::chrono::system_clock::time_point& t)
+{
+    uint64_t mill = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count() - std::chrono::duration_cast<std::chrono::seconds>(t.time_since_epoch()).count()*1000;
+    time_t tt = std::chrono::system_clock::to_time_t(t);
+    struct tm local_time;
+    localtime_r(&tt, &local_time);
+    char _time[128];
+    snprintf(_time, 128, "%d-%02d-%02d %02d:%02d:%02d.%03d", local_time.tm_year + 1900, local_time.tm_mon + 1, local_time.tm_mday, local_time.tm_hour,local_time.tm_min, local_time.tm_sec, (int)mill);
+
+    return _time;
+}
+
+void service_position::handle_timer(const boost::system::error_code& ec, std::weak_ptr<info_send> p)
+{
+    if(ec){
+        return;
+    }
+    if(m_stop){
+        return;
+    }
+
+    auto _info = p.lock();
+    if(!_info){
+        return;
+    }
+
+    if(!_info->data.empty()){
+        if(_info->count_max < _info->data.size()){
+            auto iter = _info->data.begin();
+            for(std::size_t i = 0; i < _info->data.size() - _info->count_max; ++i){
+                handle_notify((*iter)->p_msg);
+                ++iter;
+            }
+
+            _info->data.erase(_info->data.begin(), iter);
+        }
+
+        auto _data = *(_info->data.begin());
+        _info->data.pop_front();
+        handle_notify(_data->p_msg);
+    }
+
+    _info->p_timer->expires_from_now(boost::posix_time::milliseconds(static_cast<long long>(1000.0 / _info->feq)));
+    _info->p_timer->async_wait(boost::bind(&service_position::handle_timer, shared_from_this(), _1, p));
+}
+
+void service_position::set_port(const int port)
+{
+    m_port = port;
+}
+
+bool service_position::start()
+{
+    try{
+        if(0 >= m_port || 65536 < m_port){
+            return false;
+        }
+
+        m_stop = false;
+        m_service = service_ptr(new service_ptr::element_type());
+        boost::asio::ip::tcp::endpoint point(boost::asio::ip::address::from_string("0.0.0.0"), m_port);
+        m_acceptor = acceptor_ptr(new acceptor_ptr::element_type(*m_service, point));
+        socket_ptr _socket = socket_ptr(new socket_ptr::element_type(*m_service));
+        m_acceptor->async_accept(*_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, _socket));
+        m_thread = boost::thread(boost::bind(&service_position::handle_thread, shared_from_this(), m_service));
+    }catch(std::exception& ){
+        return false;
+    }
+
+    return true;
+}
+
+void service_position::stop()
+{
+    m_stop = true;
+    service_ptr _service = m_service;
+    if(_service){
+        _service->post(boost::bind(&service_position::handle_stop, shared_from_this()));
+    }
+
+    m_thread.timed_join(boost::posix_time::seconds(1));
+}
+
+
+void service_position::handle_thread(service_ptr p_service)
+{
+    while(true){
+        try{
+            boost::asio::io_service::work w(*p_service);
+            p_service->run();
+        }catch(boost::thread_interrupted&){
+            break;
+        }catch(std::exception& e){
+        }
+        boost::this_thread::sleep(boost::posix_time::minutes(1));
+    }
+}
+
+void service_position::handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket)
+{
+    if(m_stop){
+        boost::system::error_code e;
+        if(p_socket){
+            p_socket->close(e);
+        }
+
+        return;
+    }
+
+    if(ec){
+        boost::system::error_code e;
+        if(p_socket){
+            p_socket->close(e);
+        }
+
+        return;
+    }
+
+    m_sockets.push_back(p_socket);
+
+    auto _service = m_service;
+    if(!_service){
+        return;
+    }
+
+    p_socket = socket_ptr(new socket_ptr::element_type(*m_service));
+    m_acceptor->async_accept(*p_socket, boost::bind(&service_position::handle_acceptor, shared_from_this(), _1, p_socket));
+}
+
+void service_position::handle_stop()
+{
+    boost::system::error_code ec;
+
+    service_ptr _service = m_service;
+    m_service.reset();
+    if(_service){
+        _service->stop();
+    }
+
+    acceptor_ptr _acceptor = m_acceptor;
+    m_acceptor.reset();
+    if(m_acceptor){
+        _acceptor->close(ec);
+    }
+
+    m_sockets.clear();
+}
+
+void service_position::handle_notify(buffer_ptr p_buffer)
+{
+    if(m_stop){
+        return;
+    }
+
+    for(auto iter = m_sockets.begin(); iter != m_sockets.end(); ++iter){
+        (*iter)->async_write_some(boost::asio::buffer(p_buffer->c_str(), p_buffer->size()), boost::bind(&service_position::handle_write, shared_from_this(), _1, _2, *iter, p_buffer));
+    }
+}
+
+void service_position::handle_write(const boost::system::error_code&, unsigned int, socket_ptr, buffer_ptr)
+{}

+ 68 - 0
service_position.h

@@ -0,0 +1,68 @@
+#ifndef __service_position_h__
+#define __service_position_h__
+
+#include <string>
+#include <list>
+#include <boost/asio.hpp>
+#include <boost/thread.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/atomic.hpp>
+#include <chrono>
+
+struct info_data{
+    boost::shared_ptr<std::string> p_msg;
+    std::chrono::system_clock::time_point time_receive;
+};
+
+typedef std::shared_ptr<info_data> info_data_ptr;
+
+class info_send{
+    public:
+        std::string id;
+        boost::posix_time::ptime time_last_send;
+        std::list<info_data_ptr> data;
+        std::shared_ptr<boost::asio::deadline_timer> p_timer;
+        double feq;
+        std::size_t count_max = 3;
+};
+
+typedef std::shared_ptr<info_send> info_send_ptr;
+
+class service_position: public boost::enable_shared_from_this<service_position>{
+    public:
+        typedef boost::shared_ptr<boost::asio::io_service> service_ptr;
+        typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr;
+        typedef boost::shared_ptr<boost::asio::ip::tcp::acceptor> acceptor_ptr;
+        typedef boost::shared_ptr<std::string> buffer_ptr;
+    public:
+        virtual void notify(const std::string& msg, const std::string& id, const double& feq);
+        virtual void set_port(const int port);
+        virtual bool start();
+        virtual void stop();
+    protected:
+        int m_port;
+        service_ptr m_service;
+        std::list<socket_ptr> m_sockets;
+        acceptor_ptr m_acceptor;
+        boost::thread m_thread;
+        boost::atomic_bool m_stop;
+
+        std::map<std::string, info_send_ptr> m_deques;
+        std::mutex m_mutex;
+        std::shared_ptr<boost::asio::deadline_timer> mp_timer;
+    protected:
+        virtual void handle_thread(service_ptr p_service);
+        virtual void handle_acceptor(const boost::system::error_code& ec, socket_ptr p_socket);
+        virtual void handle_stop();
+        virtual void handle_notify(buffer_ptr pBuffer);
+        virtual void handle_write(const boost::system::error_code& ec, unsigned int bytes_transferred, socket_ptr p_socket, buffer_ptr p_buffer);
+        virtual void handle_buffer(buffer_ptr p_buffer, std::string id, double feq, service_ptr p_service);
+        virtual void handle_timer(const boost::system::error_code& ec, std::weak_ptr<info_send> p_info);
+        virtual std::string to_str(const std::chrono::system_clock::time_point& time);
+        virtual void init_cache(info_send_ptr& p_info);
+    public:
+        static service_position_ptr instance();
+};
+
+typedef boost::shared_ptr<service_position> service_position_ptr;
+#endif

+ 0 - 5
znet.cpp

@@ -644,13 +644,10 @@ struct main_loop:io_context
 		}
 
 		sock_listen _1(*this,fd);
-
 //		stdin_io _2(*this);
-
 		block_sig(SIGPIPE);
 		signal_w sint(*this,SIGINT),term(*this,SIGTERM);
 
-
 		while(!check_stop_flag())
 		{
 			try
@@ -692,5 +689,3 @@ service_handle*service_handle::instance(service_callback*sc)
 	sc->set_handle(&_impl);
 	return &_impl;
 }
-
-