// Socket service event loop built on the libev C++ wrappers (ev::io, ev::timer,
// ev::sig, ev::dynamic_loop) plus project helpers (zio::*, zloop, worker, log_*).
//
// Types defined in this file:
//   client_ex      - extends `client` with two pure virtuals: on_notify() and
//                    close_impl().
//   io_context     - holds m_thread_clts, a table of clients indexed by
//                    clt->handle() (grown on demand in on_connect, slot reset in
//                    on_close), and forwards connect/close/timeout/message events
//                    to the service_callback passed to the constructor.
//                    boardcast() sends a copy of `msg` to every table entry;
//                    close_all() calls close_impl() on every non-null entry;
//                    on_async() calls on_notify() on each client it is given.
//   fd_io          - ev::io watcher bound to one fd. The constructor makes the fd
//                    non-blocking via zio::setblocking(fd,false) and starts the
//                    watcher; the destructor stops it and calls zio::close(m_fd).
//   stdin_io       - fd_io on fd 0; its handler body is compiled out with #if 0.
//   sock_client    - one accepted connection. Watches EV_READ|EV_WRITE, owns a
//                    malloc'ed receive buffer m_b (m_size starts at 1<<16) and an
//                    outbound queue m_olist that send() fills under m_mutex.
//                    send() and close() both call m_ic.async_request(); on_notify()
//                    then either runs close_impl() (when m_close_flag is set) or
//                    io_write(). calc_length() reads a 16-bit length from the
//                    first two bytes, high byte first; io_read() adds 2 to it and
//                    returns -1 when the result exceeds m_max_package_size.
//   sock_listen    - accepts with zio::accept(), sets 32 KiB io buffers via
//                    zio::setiobuf(), and hands a new client to io_context::on_connect().
//                    Timeout and size limit come from config keys
//                    "service.recv_timeout" (default 30) and "service.max_package"
//                    (default 2048).
//   signal_w       - ev::sig watcher; on signal it logs, calls
//                    worker::instance()->stop(), stops itself and stops the loop.
//   main_loop      - run(port): zio::listen_on(port), installs the listener and
//                    stdin watchers, blocks SIGPIPE with pthread_sigmask, watches
//                    SIGINT and SIGTERM, runs the loop, then stops the listener and
//                    calls close_all(). Returns -1 if listen_on fails, else 0.
//   service_handle::instance(sc) - returns a function-local static main_loop built
//                    from *sc, after calling sc->set_handle() with it.
//
// NOTE(review): this copy of the file looks damaged by text extraction and will
// not compile as-is -- restore from the original before building:
//   * every `#include <...>` target is empty;
//   * template arguments are missing (e.g. `std::vector>`, `std::shared_ptr clt`,
//     `std::atomic m_close_flag`, `zloop> ,service_handle`);
//   * grow_buf(), read_clt(), io_read() and io_write() are cut mid-expression
//     (`while(m_size-m_clen0)`, `if(m_clen _lock(m_mutex);`);
//   * `¬ify_clts` in on_async() is presumably `&notify_clts`;
//   * line breaks were collapsed, so each `//` comment now hides the code that
//     follows it on the same physical line.
// NOTE(review): fd_io::stop() has a stray `\` after `throw ()` -- confirm it is
// not intended as a line continuation.
// NOTE(review): after a successful read the handler calls m_recv_timer.set(5),
// while the constructor uses the configured recv_time_out -- confirm the constant.
// NOTE(review): io_context::on_close() indexes m_thread_clts with clt->handle()
// without a bounds check -- presumably safe because on_connect() resized the
// table for that handle; verify.
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_file.h" extern config_file config; struct client_ex:client { virtual void on_notify()=0; virtual void close_impl()=0; }; struct io_context: zloop> ,service_handle { private: service_callback&m_serv; public: std::vector> m_thread_clts; io_context(service_callback&serv) :m_serv(serv) { m_thread_clts.reserve(2048); } virtual ~io_context() { } void boardcast(const std::vector&msg) { for(auto i:m_thread_clts) { std::vector tmp(msg); i->send(std::move(tmp)); } } void on_connect(std::shared_ptr clt) { if(clt->handle()>=(int)m_thread_clts.size()) { m_thread_clts.resize(clt->handle()+1); } m_thread_clts[clt->handle()]=clt; m_serv.on_connect(clt); } void on_close(std::shared_ptr clt) { m_serv.on_close(clt); m_thread_clts[clt->handle()].reset(); } void on_send_timeout(std::shared_ptr clt) { m_serv.on_send_timeout(clt); } void on_recv_timeout(std::shared_ptr clt) { m_serv.on_recv_timeout(clt); } void on_message(std::shared_ptr clt,const char*data,size_t len) { m_serv.on_message(clt,data,len); } void close_all() { for(auto clt:m_thread_clts) { if(!clt) continue; ((client_ex*)clt.get())->close_impl(); } } void on_async(const std::list>¬ify_clts) { for(auto&clt:notify_clts) { ((client_ex*)clt.get())->on_notify(); } } void stop() { async_stop(); } }; struct fd_io:ev::io { io_context&m_ic; int m_fd; fd_io(io_context&ic,int fd,int ev_flag=EV_READ) :ev::io(ic) ,m_ic(ic) ,m_fd(fd) { zio::setblocking(fd,false); this->set(this); this->set(m_fd,ev_flag); this->start(); } void stop () throw () \ { ev::io::stop(); } io_context&context() { return m_ic; } virtual void operator()(ev::io &w, int)=0; virtual ~fd_io() { stop(); zio::close(m_fd); } }; struct stdin_io:fd_io { stdin_io(io_context&ic) :fd_io(ic,0) {} virtual void operator()(ev::io &w, int) { #if 0 char buf[256]; if(!gets(buf)) return; if(strchr(buf,'X')) { 
log_info("stdin input 'X', exiting..."); worker::instance()->stop(); stop(); m_ic.stop(); } #endif } }; struct sock_client:fd_io,client_ex { io_context&m_ic; std::string m_name; std::atomic m_close_flag{false}; int m_type=0;//site char *m_b{0}; int m_clen{0}; int m_size{1<<16}; int m_max_package_size{4096}; ev::timer m_recv_timer; // ev::timer m_send_timer; std::mutex m_mutex; std::list> m_olist; std::vector m_obuf; size_t m_opos=0; bool m_can_write{false}; sock_client(io_context&ic,const char*name,int fd,int recv_time_out,int max_package_size) :fd_io(ic,fd,EV_READ|EV_WRITE) ,m_ic(ic) ,m_name(name) { m_max_package_size=max_package_size; m_recv_timer.set(ic); m_recv_timer.set(recv_time_out,0); m_recv_timer.set(this); m_recv_timer.start(); // m_send_timer.set(ic); // m_send_timer.set(5,0); // m_send_timer.set(this); // m_send_timer.start(); m_b=(char*)malloc(m_size); } ~sock_client() { free(m_b); } int type() { return m_type; } void close() { m_close_flag.store(true); m_ic.async_request(shared_from_this()); } void send(std::vector&&b) { { std::unique_lock _lock(m_mutex); m_olist.push_back(std::move(b)); } m_ic.async_request(shared_from_this()); } void on_send_timeout() { m_ic.on_send_timeout(shared_from_this()); } void on_recv_timeout() { m_ic.on_recv_timeout(shared_from_this()); } std::string name() { return m_name; } int handle() { return m_fd; } void grow_buf(int len) { if(m_size-m_clen>=len) return; int size=m_size; while(m_size-m_clen0) { m_clen+=rc; continue; } if(rc==-2) return 0; if(rc==0) { log_info("socket %d(%s) close by remote",m_fd,m_name.c_str()); } else if(rc==-1) { log_errno("hava a error on socket %d(%s)",m_fd,m_name.c_str()); } return -1; } return 0; } void close_impl() { m_recv_timer.stop(); // m_send_timer.stop(); fd_io::stop(); m_ic.on_close(shared_from_this()); } size_t calc_length(uint8_t*b)const { return (b[0]<<8)|b[1]; } int io_read() { if(read_clt()<0) return -1; int msg_len; for(;m_clen>=2;) { msg_len=calc_length((uint8_t*)m_b)+2; 
if(msg_len>m_max_package_size) return -1; if(m_clen _lock(m_mutex); if(m_olist.empty()) break; m_obuf.swap(m_olist.front()); m_olist.pop_front(); m_opos=0; } int left=m_obuf.size()-m_opos; int rc=zio::write(m_fd,&*m_obuf.begin()+m_opos,left); if(rc>0) { m_opos+=rc; if(m_opos==m_obuf.size()) continue; } else if(rc==0||rc==-2)//buffer full { m_can_write=false; set(EV_READ|EV_WRITE); // m_send_timer.set(5); // m_send_timer.start(); break; } else { log_errno("zio::write(%d,ptr,%d)",m_fd,m_obuf.size()-m_opos); return -1; } } if(m_olist.empty() && m_can_write) { set(EV_READ); // m_send_timer.stop(); } return 0; } void operator()(ev::io &w, int flag) { if(flag & EV_WRITE) { log_debug("socket %d(%s) can write,flag=%d." ,m_fd,m_name.c_str(),flag); m_can_write=true; // m_send_timer.stop(); if(io_write()<0) { close_impl(); return; } } if(flag & EV_READ) { // log_debug("socket %d(%s) can read,flag=%d." ,m_fd,m_name.c_str(),flag); // zclock c; if(io_read()<0) { close_impl(); return; } // log_info("use time %d ms.",c.count_us()); m_recv_timer.set(5); } } bool check_crc(const char*b,size_t mlen) { return true; } void on_message(const char*m,int mlen) { m_ic.on_message(shared_from_this(),m,mlen); } void on_notify() { if(m_close_flag) { close_impl(); return; } io_write(); } }; struct sock_listen: fd_io { sock_listen(io_context&ic,int fd):fd_io(ic,fd){} int recv_time_out=config.get("service.recv_timeout",30); int max_package_size=config.get("service.max_package",2048); void operator()(ev::io &w, int) { char name[32]; int fd=zio::accept(m_fd,name); if(fd<0) return; zio::setiobuf(fd,32<<10,32<<10); m_ic.on_connect(std::make_shared(m_ic,name,fd,recv_time_out,max_package_size)); } }; struct signal_w:ev::sig { io_context&m_ic; signal_w(io_context&ic, int s) :m_ic(ic) { this->set(m_ic); this->set(this); this->set(s); this->start(); } void operator()(ev::sig &w, int s) { log_info("recved signal %d",s); worker::instance()->stop(); stop(); m_ic.stop(); } }; struct main_loop:io_context { 
main_loop(service_callback&sc) :io_context(sc) { } virtual int run(int port) { int fd=zio::listen_on(port); if(fd<0) { log_errno("try listen_on %d",port); return -1; } sock_listen _1(*this,fd); stdin_io _2(*this); block_sig(SIGPIPE); signal_w sint(*this,SIGINT),term(*this,SIGTERM); ev::dynamic_loop::run(0); _1.stop(); close_all(); return 0; } void block_sig(int sig) { sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, sig); if(pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) == -1) { log_errno("block signal:%d",sig); } } }; service_handle*service_handle::instance(service_callback*sc) { static main_loop _impl(*sc); sc->set_handle(&_impl); return &_impl; }