// Socket service event loop.
//
// Listens on a port, wraps every accepted socket in a sock_client, and
// forwards connect / close / message / timeout events to a service_callback.
// Built on the ev:: watcher classes used below (ev::io, ev::timer, ev::sig,
// ev::dynamic_loop) and the project's zio:: socket helpers.
//
// NOTE(review): as stored, this text appears damaged. Header names after
// `#include` and the contents of template argument lists are missing (for
// example `std::vector>` and `std::atomic m_close_flag`), a parameter name
// in on_async is garbled, and code is missing inside sock_client between
// grow_buf/read_clt and between io_read/io_write. Restore the file from
// version control before building; the notes below describe only what is
// still visible.
//
// client_ex
//   Extends client with two pure virtual hooks: on_notify() and close_impl().
//
// io_context
//   Owns the periodic timer (started with start(0,1)) whose callback calls
//   service_callback::on_timer(), and the table m_thread_clts, indexed by
//   client handle().
//   - on_connect grows the table when handle() is past the end, stores the
//     client, then notifies the service.
//   - on_close notifies the service first, then resets the client's slot.
//   - boardcast gives every table entry its own copy of msg via send().
//     NOTE(review): unlike close_all, boardcast does not skip empty slots;
//     slots are emptied by on_close and created empty by resize, so this
//     looks like a possible null dereference -- confirm.
//   - close_all calls close_impl() on every non-empty slot.
//   - on_async calls on_notify() on each client in the list it is given.
//   - stop() calls async_stop().
//
// fd_io
//   ev::io watcher bound to one fd. The constructor makes the fd
//   non-blocking, registers the requested event flags and starts the
//   watcher; the destructor stops the watcher and closes the fd with
//   zio::close.
//
// stdin_io
//   Watcher on fd 0. Its handler body is compiled out with `#if 0`.
//
// sock_client
//   One accepted connection.
//   - The receive buffer m_b is malloc'ed with m_size (1<<16) bytes in the
//     constructor and freed in the destructor.
//   - m_recv_timer is first started with config "service.recv_timeout_first"
//     (default 10) and restarted with m_recv_time_out after each successful
//     read event; its expiry path (on_recv_timeout) notifies the context,
//     logs, and closes the socket.
//   - m_sync_timer is started only when config "site_sync" is non-zero,
//     repeating every "site_sync.freq" (default 10*60).
//     NOTE(review): close_impl stops m_recv_timer and the io watcher but
//     not m_sync_timer -- confirm this is intended.
//   - close() sets m_close_flag and posts an async request; send() appends
//     the buffer to m_olist under m_mutex and posts an async request.
//   - on_notify() closes when m_close_flag is set, otherwise calls
//     io_write(). NOTE(review): the io_write() result is ignored here,
//     whereas operator() closes the socket when io_write() returns < 0.
//   - on_sync_timeout() sends a 15-byte frame: {0,13}, 0x78, 0x3b,
//     milliseconds (low byte, high byte), tm_sec, tm_min, tm_hour, tm_mday,
//     tm_wday, tm_mon+1, tm_year-100, then do_crc over the 11 bytes starting
//     at offset 2, high byte first.
//   - calc_length() reads a 16-bit big-endian value from the first two
//     bytes; io_read() adds 2 to it and rejects the connection when the
//     result exceeds m_max_package_size or is <= 2.
//   - io_write(): a zio::write result of 0 or -2 clears m_can_write and
//     re-enables EV_WRITE; a positive result advances m_opos; anything else
//     is logged and returns -1. When m_olist is empty and m_can_write is
//     still set, the watcher goes back to EV_READ only.
//     NOTE(review): the error log passes m_obuf.size()-m_opos (unsigned
//     size type) to a "%d" conversion.
//   - check_crc() compares the last two bytes (high byte first) with do_crc
//     over bytes [2, mlen-2). NOTE(review): mlen-4 wraps when mlen < 4;
//     confirm callers guarantee mlen >= 4.
//   - sock_client declares its own m_ic reference in addition to the m_ic
//     inherited from fd_io; both are initialised from the same argument.
//
// sock_listen
//   Accept watcher. Reads "service.recv_timeout" (default 3) and
//   "service.max_package" (default 2048) once, at construction. On each
//   event it accepts a connection, sets both I/O buffer sizes to 32<<10 and
//   registers a new sock_client with the context.
//   NOTE(review): on accept failure the log line prints `name`, which has no
//   initialiser; whether zio::accept fills it on failure is not visible.
//
// signal_w
//   Signal watcher: logs the signal, stops worker::instance(), stops itself
//   and stops the io_context.
//
// main_loop
//   run(port) listens on the port (returns -1 on failure), blocks SIGPIPE
//   with pthread_sigmask, installs SIGINT and SIGTERM watchers, then calls
//   ev::dynamic_loop::run(0) until check_stop_flag() is true, logging and
//   swallowing any exception. Afterwards it stops the listener, closes all
//   clients and returns 0.
//
// service_handle::instance
//   Returns a function-local static main_loop constructed from *sc, and
//   calls sc->set_handle() on every call.
//   NOTE(review): sc is dereferenced without a null check.
//
// English translations of the non-English inline comments kept below:
//   - next to "site_sync": sub-station time synchronisation; off by default
//     because of dual-IP / dual-host deployments.
//   - next to "site_sync.freq": sub-station time synchronisation interval.
//   - in the disabled check_timestamp: second, minute, hour, day, weekday,
//     month, year (followed by the author's complaint about that layout).
//   - in on_sync_timeout: starting from the first byte: milliseconds
//     (2 bytes), second, minute, hour, day, month, year. (The code also
//     writes tm_wday between day and month.)
//   - in io_write, on the rc==0||rc==-2 branch: buffer full.
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_file.h" #include "crc.h" extern config_file config; struct client_ex:client { virtual void on_notify()=0; virtual void close_impl()=0; }; struct io_context: zloop> ,service_handle { private: service_callback&m_serv; ev::timer m_timer; public: std::vector> m_thread_clts; io_context(service_callback&serv) :m_serv(serv) ,m_timer(*this) { m_thread_clts.reserve(2048); m_timer.set(this); m_timer.start(0,1); } virtual ~io_context() { } void boardcast(const std::vector&msg) { for(const auto&i:m_thread_clts) { std::vector tmp(msg); i->send(std::move(tmp)); } } void on_timer() { // logn_info(1,"tick timer timeout."); m_serv.on_timer(); } void on_connect(const std::shared_ptr &clt) { if(clt->handle()>=(int)m_thread_clts.size()) { m_thread_clts.resize(clt->handle()+1); } m_thread_clts[clt->handle()]=clt; m_serv.on_connect(clt); } void on_close(const std::shared_ptr &clt) { m_serv.on_close(clt); m_thread_clts[clt->handle()].reset(); } void on_send_timeout(const std::shared_ptr &clt) { m_serv.on_send_timeout(clt); } void on_recv_timeout(const std::shared_ptr &clt) { m_serv.on_recv_timeout(clt); } void on_message(const std::shared_ptr& clt,const char*data,size_t len) { m_serv.on_message(clt,data,len); } void close_all() { for(const auto&clt:m_thread_clts) { if(!clt) continue; ((client_ex*)clt.get())->close_impl(); } } void on_async(const std::list>¬ify_clts) { for(const auto&clt:notify_clts) { ((client_ex*)clt.get())->on_notify(); } } void stop() { async_stop(); } }; struct fd_io:ev::io { io_context&m_ic; int m_fd; fd_io(io_context&ic,int fd,int ev_flag=EV_READ) :ev::io(ic) ,m_ic(ic) ,m_fd(fd) { zio::setblocking(fd,false); this->set(this); this->set(m_fd,ev_flag); this->start(); } void stop () throw () \ { ev::io::stop(); } io_context&context() { return m_ic; } virtual void operator()(ev::io &w, 
int)=0; virtual ~fd_io() { stop(); zio::close(m_fd); } }; struct stdin_io:fd_io { stdin_io(io_context&ic) :fd_io(ic,0) {} virtual void operator()(ev::io &w, int) { #if 0 char buf[256]; if(!gets(buf)) return; if(strchr(buf,'X')) { log_info("stdin input 'X', exiting..."); worker::instance()->stop(); stop(); m_ic.stop(); } #endif } }; struct sock_client:fd_io,client_ex { io_context&m_ic; std::string m_name; std::atomic m_close_flag{false}; int m_type=1;//site char *m_b{0}; int m_clen{0}; int m_size{1<<16}; int m_max_package_size{2048}; int m_recv_time_out; ev::timer m_recv_timer; ev::timer m_sync_timer; // ev::timer m_send_timer; std::mutex m_mutex; std::list> m_olist; std::vector m_obuf; size_t m_opos=0; bool m_can_write{false}; int m_site_id{-1}; // char m_timestamp[8]; sock_client(io_context&ic,const char*name,int fd,int recv_time_out,int max_package_size) :fd_io(ic,fd,EV_READ|EV_WRITE) ,m_ic(ic) ,m_name(name) ,m_recv_timer(ic) ,m_sync_timer(ic) { m_max_package_size=max_package_size; m_recv_time_out=recv_time_out; // m_recv_timer.set(ic); m_recv_timer.set(this); int recv_timeout_first=config.get("service.recv_timeout_first",10); m_recv_timer.start(recv_timeout_first,0); int site_sync=config.get("site_sync",0);//分站时间同步,考虑到双IP双机情况,缺省关闭 int site_sync_freq=config.get("site_sync.freq",10*60);//分站时间同步间隔 m_sync_timer.set(this); if(site_sync) { log_info("启动分站同步定时器:%s,%d",name,site_sync_freq); struct timeval tv; gettimeofday(&tv,0); m_sync_timer.start(2-tv.tv_usec/1000000.+0.01,site_sync_freq); } // m_send_timer.set(ic); // m_send_timer.set(5,0); // m_send_timer.set(this); // m_send_timer.start(); m_b=(char*)malloc(m_size); // m_timestamp[0]=0; } ~sock_client() { free(m_b); } #if 0 bool check_timestamp(const char*time) { //秒、分、时、天、周、月、年, 脑残的设计 char buf[6]; buf[0]=time[6]; buf[1]=time[5]; buf[2]=time[3]; buf[3]=time[2]; buf[4]=time[1]; buf[5]=time[0]; if(memcmp(m_timestamp,buf,6)<=0) { memcpy(m_timestamp,buf,6); return true; } return false; } #endif int type() { return 
m_type; } void set_conn_type(int t) { m_type=t; } void close() { m_close_flag.store(true); m_ic.async_request(shared_from_this()); } void send(std::vector&&b) { { std::unique_lock _lock(m_mutex); m_olist.push_back(std::move(b)); } m_ic.async_request(shared_from_this()); } void on_sync_timeout() { // 从第一个字节开始,分别表示毫秒(2字节)、秒、分、时、天、月、年 unsigned char buf[20]={0,13,0x78,0x3b}; struct timeval tv; gettimeofday(&tv,0); // tv.tv_sec+=365*86400; struct tm buff={0}; const struct tm*t=localtime_r(&tv.tv_sec,&buff); int p=4; buf[p++]=(tv.tv_usec/1000)&0xFF; buf[p++]=((tv.tv_usec/1000)>>8)&0xFF; buf[p++]=t->tm_sec; buf[p++]=t->tm_min; buf[p++]=t->tm_hour; buf[p++]=t->tm_mday; buf[p++]=t->tm_wday; buf[p++]=t->tm_mon+1; buf[p++]=t->tm_year-100; uint16_t ccrc=do_crc(buf+2,11); buf[p++]=ccrc>>8; buf[p++]=ccrc&0xff; std::vector tmp((char*)buf,(char*)buf+15); send(std::move(tmp)); logn_info(1,"分站同步:ip=%s,time=%d-%02d-%02d %02d:%02d:%02d.%03d",m_name.c_str(),buf[12]+2000,buf[11],buf[9],buf[8],buf[7],buf[6],buf[5]*256+buf[4]); } void on_send_timeout() { m_ic.on_send_timeout(shared_from_this()); } void on_recv_timeout() { m_ic.on_recv_timeout(shared_from_this()); logn_warn(1,"socket %s recv timeout.",m_name.c_str()); close_impl(); } std::string name() { return m_name; } int handle() { return m_fd; } void grow_buf(int len) { if(m_size-m_clen>=len) return; int size=m_size; while(m_size-m_clen0) { m_clen+=rc; continue; } if(rc==-2) return 0; if(rc==0) { logn_info(1,"socket %d(%s) close by remote",m_fd,m_name.c_str()); } else if(rc==-1) { logn_errno(1,"hava a error on socket %d(%s)",m_fd,m_name.c_str()); } return -1; } return 0; } void close_impl() { m_recv_timer.stop(); fd_io::stop(); logn_info(1,"socket %s closed.",m_name.c_str()); m_ic.on_close(shared_from_this()); } size_t calc_length(uint8_t*b)const { return (b[0]<<8)|b[1]; } int io_read() { if(read_clt()<0) return -1; try { int msg_len; for(;m_clen>=2;) { msg_len=calc_length((uint8_t*)m_b)+2; if(msg_len>m_max_package_size) { 
logn_error(1,"package too big:%d/%d,close socket. site=%s.",msg_len,m_max_package_size,m_name.c_str()); return -1; } if(msg_len<=2) { logn_error(1,"package too small:%d,close socket. site=%s.",msg_len,m_name.c_str()); return -1; } if(m_clen _lock(m_mutex); if(m_olist.empty()) break; m_obuf.swap(m_olist.front()); m_olist.pop_front(); m_opos=0; } int left=m_obuf.size()-m_opos; int rc=zio::write(m_fd,&*m_obuf.begin()+m_opos,left); if(rc>0) { m_opos+=rc; if(m_opos==m_obuf.size()) continue; } else if(rc==0||rc==-2)//缓冲区满 { m_can_write=false; set(EV_READ|EV_WRITE); // m_send_timer.set(5); // m_send_timer.start(); break; } else { logn_errno(1,"zio::write(%d,ptr,%d)",m_fd,m_obuf.size()-m_opos); return -1; } } if(m_olist.empty() && m_can_write) { set(EV_READ); // m_send_timer.stop(); } return 0; } void operator()(ev::io &w, int flag) { if(flag & EV_WRITE) { logn_debug(1,"socket %d(%s) can write,flag=%d." ,m_fd,m_name.c_str(),flag); m_can_write=true; // m_send_timer.stop(); if(io_write()<0) { close_impl(); return; } } if(flag & EV_READ) { // log_debug("socket %d(%s) can read,flag=%d." 
,m_fd,m_name.c_str(),flag); // zclock c; if(io_read()<0) { close_impl(); return; } // log_info("use time %d ms.",c.count_us()); m_recv_timer.start(m_recv_time_out); } } bool check_crc(void *b0,size_t mlen) { uint8_t*b=(uint8_t*)b0; uint16_t crc=b[mlen-2]; crc<<=8; crc|=b[mlen-1]; uint16_t ccrc=do_crc((unsigned char*)b+2,mlen-4); return ccrc==crc; } void on_message(const char*m,int mlen) { m_ic.on_message(shared_from_this(),m,mlen); } void on_notify() { if(m_close_flag) { close_impl(); return; } io_write(); } }; struct sock_listen: fd_io { sock_listen(io_context&ic,int fd):fd_io(ic,fd){} int recv_time_out=config.get("service.recv_timeout",3); int max_package_size=config.get("service.max_package",2048); void operator()(ev::io &w, int) { char name[32]; int fd=zio::accept(m_fd,name); if(fd<0) { logn_errno(1,"socket %s connect error.",name); return; } zio::setiobuf(fd,32<<10,32<<10); m_ic.on_connect(std::make_shared(m_ic,name,fd,recv_time_out,max_package_size)); logn_info(1,"socket %s connected.",name); } }; struct signal_w:ev::sig { io_context&m_ic; signal_w(io_context&ic, int s) :ev::sig(ic) ,m_ic(ic) { //this->set(m_ic); this->set(this); this->set(s); this->start(); } void operator()(ev::sig &w, int s) { log_info("recved signal %d",s); worker::instance()->stop(); stop(); m_ic.stop(); } }; struct main_loop:io_context { main_loop(service_callback&sc) :io_context(sc) { } virtual int run(int port) { int fd=zio::listen_on(port); if(fd<0) { log_errno("try listen_on %d",port); return -1; } sock_listen _1(*this,fd); // stdin_io _2(*this); block_sig(SIGPIPE); signal_w sint(*this,SIGINT),term(*this,SIGTERM); while(!check_stop_flag()) { try { ev::dynamic_loop::run(0); } catch(const std::exception&e) { log_error("捕获到异常:%s",e.what()); } catch(...) 
{ log_error("捕获到未知异常"); } } _1.stop(); close_all(); return 0; } void block_sig(int sig) { sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, sig); if(pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) == -1) { log_errno("block signal:%d",sig); } } }; service_handle*service_handle::instance(service_callback*sc) { static main_loop _impl(*sc); sc->set_handle(&_impl); return &_impl; }