#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_file.h" #include "crc.h" #include "module_service/module_device_net.h" extern config_file config; struct client_ex:client { virtual void on_notify()=0; virtual void close_impl()=0; }; struct io_context: zloop> ,service_handle { private: service_callback&m_serv; ev::timer m_timer; public: std::vector> m_thread_clts; io_context(service_callback&serv) :m_serv(serv) ,m_timer(*this) { m_thread_clts.reserve(2048); m_timer.set(this); m_timer.start(0,1); } virtual ~io_context() { } void boardcast(const std::vector&msg) { for(const auto&i:m_thread_clts) { std::vector tmp(msg); i->send(std::move(tmp)); } } void on_timer() { m_serv.on_timer(); } void on_connect(const std::shared_ptr &clt) { if(clt->handle()>=(int)m_thread_clts.size()) { m_thread_clts.resize(clt->handle()+1); } m_thread_clts[clt->handle()]=clt; m_serv.on_connect(clt); } void on_close(const std::shared_ptr &clt) { m_serv.on_close(clt); m_thread_clts[clt->handle()].reset(); } void on_send_timeout(const std::shared_ptr &clt) { m_serv.on_send_timeout(clt); } void on_recv_timeout(const std::shared_ptr &clt) { m_serv.on_recv_timeout(clt); } void on_message(const std::shared_ptr& clt,const char*data,size_t len) { m_serv.on_message(clt,data,len); } void close_all() { for(const auto&clt:m_thread_clts) { if(!clt) continue; ((client_ex*)clt.get())->close_impl(); } } void on_async(const std::list>¬ify_clts) { for(const auto&clt:notify_clts) { ((client_ex*)clt.get())->on_notify(); } } void stop() { async_stop(); } }; struct fd_io:ev::io { io_context&m_ic; int m_fd; fd_io(io_context&ic,int fd,int ev_flag=EV_READ) :ev::io(ic) ,m_ic(ic) ,m_fd(fd) { zio::setblocking(fd,false); this->set(this); this->set(m_fd,ev_flag); this->start(); } void stop () throw () \ { ev::io::stop(); } io_context&context() { return m_ic; } virtual void operator()(ev::io &w, int)=0; virtual ~fd_io() { stop(); zio::close(m_fd); } }; struct stdin_io:fd_io { stdin_io(io_context&ic) :fd_io(ic,0) {} virtual void operator()(ev::io &w, int) { #if 0 char buf[256]; if(!gets(buf)) return; if(strchr(buf,'X')) { log_info("stdin input 'X', exiting..."); worker::instance()->stop(); stop(); m_ic.stop(); } #endif } }; struct sock_client:fd_io,client_ex { io_context&m_ic; std::string m_name; std::atomic m_close_flag{false}; int m_type=1;//site char *m_b{0}; int m_clen{0}; int m_size{1<<16}; int m_max_package_size{2048}; int m_recv_time_out; ev::timer m_recv_timer; ev::timer m_sync_timer; // ev::timer m_send_timer; std::mutex m_mutex; std::list> m_olist; std::vector m_obuf; size_t m_opos=0; bool m_can_write{false}; int m_site_id{-1}; sock_client(io_context&ic,const char*name,int fd,int recv_time_out,int max_package_size) :fd_io(ic,fd,EV_READ|EV_WRITE) ,m_ic(ic) ,m_name(name) ,m_recv_timer(ic) ,m_sync_timer(ic) { m_max_package_size=max_package_size; m_recv_time_out=recv_time_out; m_recv_timer.set(this); int recv_timeout_first=config.get("service.recv_timeout_first",10); m_recv_timer.start(recv_timeout_first,0); int site_sync=config.get("site_sync",0);//分站时间同步,考虑到双IP双机情况,缺省关闭 int site_sync_freq=config.get("site_sync.freq",10*60);//分站时间同步间隔 m_sync_timer.set(this); if(site_sync) { log_info("启动分站同步定时器:%s,%d",name,site_sync_freq); struct timeval tv; gettimeofday(&tv,0); m_sync_timer.start(2-tv.tv_usec/1000000.+0.01,site_sync_freq); } // m_send_timer.set(ic); // m_send_timer.set(5,0); // m_send_timer.set(this); // m_send_timer.start(); m_b=(char*)malloc(m_size); // m_timestamp[0]=0; } ~sock_client() { free(m_b); } #if 0 bool check_timestamp(const char*time) { //秒、分、时、天、周、月、年, 脑残的设计 char buf[6]; buf[0]=time[6]; buf[1]=time[5]; buf[2]=time[3]; buf[3]=time[2]; buf[4]=time[1]; buf[5]=time[0]; if(memcmp(m_timestamp,buf,6)<=0) { memcpy(m_timestamp,buf,6); return true; } return false; } #endif int type() { return m_type; } void set_conn_type(int t) { m_type=t; } void close() { m_close_flag.store(true); m_ic.async_request(shared_from_this()); } void send(std::vector&&b) { { std::unique_lock _lock(m_mutex); m_olist.push_back(std::move(b)); } m_ic.async_request(shared_from_this()); } void on_sync_timeout() { // 从第一个字节开始,分别表示毫秒(2字节)、秒、分、时、天、月、年 unsigned char buf[20]={0,13,0x78,0x3b}; struct timeval tv; gettimeofday(&tv,0); struct tm buff={0}; const struct tm*t=localtime_r(&tv.tv_sec,&buff); int p=4; buf[p++]=(tv.tv_usec/1000)&0xFF; buf[p++]=((tv.tv_usec/1000)>>8)&0xFF; buf[p++]=t->tm_sec; buf[p++]=t->tm_min; buf[p++]=t->tm_hour; buf[p++]=t->tm_mday; buf[p++]=t->tm_wday; buf[p++]=t->tm_mon+1; buf[p++]=t->tm_year-100; uint16_t ccrc=do_crc(buf+2,11); buf[p++]=ccrc>>8; buf[p++]=ccrc&0xff; std::vector tmp((char*)buf,(char*)buf+15); send(std::move(tmp)); logn_info(1,"分站同步:ip=%s,time=%d-%02d-%02d %02d:%02d:%02d.%03d",m_name.c_str(),buf[12]+2000,buf[11],buf[9],buf[8],buf[7],buf[6],buf[5]*256+buf[4]); } void on_send_timeout() { m_ic.on_send_timeout(shared_from_this()); } void on_recv_timeout() { m_ic.on_recv_timeout(shared_from_this()); logn_warn(1,"socket %s recv timeout.",m_name.c_str()); close_impl(); } std::string name() { return m_name; } int handle() { return m_fd; } void grow_buf(int len) { if(m_size-m_clen>=len) return; int size=m_size; while(m_size-m_clen0) { m_clen+=rc; continue; } if(rc==-2) return 0; if(rc==0) { logn_info(1,"socket %d(%s) close by remote",m_fd,m_name.c_str()); } else if(rc==-1) { logn_errno(1,"hava a error on socket %d(%s)",m_fd,m_name.c_str()); } return -1; } return 0; } int read_clt_cp_buf(char* buf, int nLengthBeforeDecode, int nBufLength) { //先清空数据 memset(m_b, 0, m_size); memcpy(m_b, buf, nBufLength); free(buf); m_clen -= nLengthBeforeDecode; m_clen += nBufLength; return 0; } void close_impl() { m_recv_timer.stop(); fd_io::stop(); // 清除网络下的所有设备 module_device_net::instance()->clear(m_name); logn_info(1,"socket %s closed.",m_name.c_str()); m_ic.on_close(shared_from_this()); } size_t calc_length(uint8_t*b)const { return (b[0]<<8)|b[1]; } int indexOfCode(const char c) { const char * code = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; for (unsigned int i = 0; i < 64; i++) { if (code[i] == c) return i; } return 0; } std::string decodeBase64(std::string input,int& nBufLength) { nBufLength = 0; unsigned char input_char[4]; unsigned char output_char[4]; int output_num = 0; int k = 0; std::string output_str = ""; for (unsigned int i = 0; i < input.size(); i++) { input_char[k] = indexOfCode(input[i]); k++; if (k == 4) { output_num = ((int)input_char[0] << 18) + ((int)input_char[1] << 12) + ((int)input_char[2] << 6) + ((int)input_char[3]); output_char[0] = (unsigned char)((output_num & 0x00FF0000) >> 16); output_char[1] = (unsigned char)((output_num & 0x0000FF00) >> 8); output_char[2] = (unsigned char)(output_num & 0x000000FF); output_char[3] = '\0'; output_str.append((char *)output_char); nBufLength++; k = 0; } } return output_str; } char *base64_decode(char *code,long& ref_length) { //根据base64表,以字符找到对应的十进制数据 int table[] = { 0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,62,0,0,0, 63,52,53,54,55,56,57,58, 59,60,61,0,0,0,0,0,0,0,0, 1,2,3,4,5,6,7,8,9,10,11,12, 13,14,15,16,17,18,19,20,21, 22,23,24,25,0,0,0,0,0,0,26, 27,28,29,30,31,32,33,34,35, 36,37,38,39,40,41,42,43,44, 45,46,47,48,49,50,51 }; long len; long str_len; char *res; int i, j; //计算解码后的字符串长度 len = strlen(code); //判断编码后的字符串后是否有= if (strstr(code, "==")) str_len = len / 4 * 3 - 2; else if (strstr(code, "=")) str_len = len / 4 * 3 - 1; else str_len = len / 4 * 3; res = (char*)malloc(sizeof(char)*str_len + 1); res[str_len] = '\0'; //以4个字符为一位进行解码 for (i = 0, j = 0; i < len - 2; j += 3, i += 4) { res[j] = ((unsigned char)table[code[i]]) << 2 | (((unsigned char)table[code[i + 1]]) >> 4); //取出第一个字符对应base64表的十进制数的前6位与第二个字符对应base64表的十进制数的后2位进行组合 res[j + 1] = (((unsigned char)table[code[i + 1]]) << 4) | (((unsigned char)table[code[i + 2]]) >> 2); //取出第二个字符对应base64表的十进制数的后4位与第三个字符对应bas464表的十进制数的后4位进行组合 res[j + 2] = (((unsigned char)table[code[i + 2]]) << 6) | ((unsigned char)table[code[i + 3]]); //取出第三个字符对应base64表的十进制数的后2位与第4个字符进行组合 } ref_length = str_len; return res; } int io_read() { if(read_clt()<0) return -1; try { bool bLoRa = false; // 若以POST开头,则是LoRa的数据包 std::string txtPOST = "POST"; if (0 == strncmp(m_b, txtPOST.c_str(), txtPOST.length())) { log_info("LoRa:%s", m_b); std::string LoRaMsg = m_b; //为LoRa基站推送数据 std::string txtType = "\"type\":7001"; if (LoRaMsg.find(txtType.c_str()) != std::string::npos) { log_info("LoRaType"); std::string txtData = "\"data\":\""; int nPos = LoRaMsg.find(txtData.c_str()); if (nPos != std::string::npos) { std::string strDataTmp = LoRaMsg.substr(nPos + txtData.length()); std::string strData = strDataTmp.substr(0, strDataTmp.find("\"")); log_info("LoRaData:%s",strData.c_str()); long ref_length = 0; char * szBase64Decode = base64_decode((char*)strData.c_str(), ref_length); log_info("LoRaDataDecode:%s,Length=%d", szBase64Decode, ref_length); log_info("LoRaDataDecode 1=%d,2=%d", (unsigned char)(szBase64Decode[2]), (unsigned char)(szBase64Decode[3])); if ((unsigned char)(szBase64Decode[2]) == 0x90 && (unsigned char)(szBase64Decode[3]) == 0x1c) { bLoRa = true; log_info("LoRaDataDecode OK"); } if (read_clt_cp_buf(szBase64Decode, LoRaMsg.length(), ref_length) < 0) { return -1; } if(bLoRa == true) { } else { return -1; } } } } int msg_len; for(;m_clen>=2;) { msg_len=calc_length((uint8_t*)m_b)+2; /*if (bLoRa) { logn_bin(1, name().c_str(), m_b, msg_len);//输出二进制日志 }*/ if(msg_len>m_max_package_size) { log_info("zmg516"); logn_error(1,"package too big:%d/%d,close socket. site=%s.",msg_len,m_max_package_size,m_name.c_str()); return -1; } if(msg_len<=2) { log_info("zmg515"); logn_error(1,"package too small:%d,close socket. site=%s.",msg_len,m_name.c_str()); return -1; } if (m_clen < msg_len) { log_info("LoRaDataDecode:msg_len=%d,m_clen=%d", msg_len, m_clen); log_info("zmg511"); break; } if(check_crc(m_b,msg_len)) { log_info("zmg512"); on_message(m_b,msg_len); logn_bin(1,name().c_str(),m_b,msg_len);//输出二进制日志 } else { log_info("zmg513"); logn_bin(1,name().c_str(),m_b,msg_len);//输出二进制日志 logn_error(1,"check_crc_error,close socket. site=%s.",m_name.c_str()); return -1; } log_info("zmg514"); memmove(m_b,&m_b[msg_len],m_clen-msg_len); m_clen-=msg_len; } } catch(const std::exception&e) { logn_error(1,"package error,close socket. site=%s,err_info=%s",m_name.c_str(),e.what()); return -1; } return 0; } int io_write() { if(! m_can_write) { set(EV_READ|EV_WRITE); return 0; } for(;;) { if(m_obuf.size()==m_opos) { std::unique_lock _lock(m_mutex); if(m_olist.empty()) break; m_obuf.swap(m_olist.front()); m_olist.pop_front(); m_opos=0; } int left=m_obuf.size()-m_opos; int rc=zio::write(m_fd,&*m_obuf.begin()+m_opos,left); if(rc>0) { m_opos+=rc; if(m_opos==m_obuf.size()) continue; } else if(rc==0||rc==-2)//缓冲区满 { m_can_write=false; set(EV_READ|EV_WRITE); break; } else { logn_errno(1,"zio::write(%d,ptr,%d)",m_fd,m_obuf.size()-m_opos); return -1; } } if(m_olist.empty() && m_can_write) { set(EV_READ); } return 0; } void operator()(ev::io &w, int flag) { if(flag & EV_WRITE) { logn_debug(1,"socket %d(%s) can write,flag=%d." ,m_fd,m_name.c_str(),flag); m_can_write=true; if(io_write()<0) { close_impl(); return; } } if(flag & EV_READ) { if(io_read()<0) { close_impl(); return; } m_recv_timer.start(m_recv_time_out); } } bool check_crc(void *b0,size_t mlen) { uint8_t*b=(uint8_t*)b0; uint16_t crc=b[mlen-2]; crc<<=8; crc|=b[mlen-1]; uint16_t ccrc=do_crc((unsigned char*)b+2,mlen-4); return ccrc==crc; } void on_message(const char*m,int mlen) { m_ic.on_message(shared_from_this(),m,mlen); } void on_notify() { if(m_close_flag) { close_impl(); return; } io_write(); } }; struct sock_listen: fd_io { sock_listen(io_context&ic,int fd):fd_io(ic,fd){} int recv_time_out=config.get("service.recv_timeout",3); int max_package_size=config.get("service.max_package",2048); void operator()(ev::io &w, int) { char name[32]; int fd=zio::accept(m_fd,name); if(fd<0) { logn_errno(1,"socket %s connect error.",name); return; } zio::setiobuf(fd,32<<10,32<<10); m_ic.on_connect(std::make_shared(m_ic,name,fd,recv_time_out,max_package_size)); logn_info(1,"socket %s connected.",name); } }; struct signal_w:ev::sig { io_context&m_ic; signal_w(io_context&ic, int s) :ev::sig(ic) ,m_ic(ic) { this->set(this); this->set(s); this->start(); } void operator()(ev::sig &w, int s) { log_info("recved signal %d",s); worker::instance()->stop(); stop(); m_ic.stop(); } }; struct main_loop:io_context { main_loop(service_callback&sc) :io_context(sc) { } virtual int run(int port) { int fd=zio::listen_on(port); if(fd<0) { log_errno("try listen_on %d", port); std_errno("listen %d failed", port); return -1; } sock_listen _1(*this,fd); block_sig(SIGPIPE); signal_w sint(*this,SIGINT),term(*this,SIGTERM); while(!check_stop_flag()) { try { ev::dynamic_loop::run(0); } catch(const std::exception&e) { log_error("捕获到异常:%s",e.what()); } catch(...) { log_error("捕获到未知异常"); } } _1.stop(); close_all(); return 0; } void block_sig(int sig) { sigset_t signal_mask; sigemptyset(&signal_mask); sigaddset(&signal_mask, sig); if(pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) == -1) { log_errno("block signal:%d",sig); } } }; service_handle*service_handle::instance(service_callback*sc) { static main_loop _impl(*sc); sc->set_handle(&_impl); return &_impl; }