#include "wsClient.h"
#include "constdef.h"
// NOTE(review): three further #include directives sat here, but their targets
// (and every template argument list in this file) were missing from the copy
// under review. The standard headers below cover what this file visibly uses;
// restore any project/third-party header that is still missing (e.g. the
// RapidJSON document header, if wsClient.h does not already provide it).
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <exception>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "log.h"
#include "common_tool.h"

namespace sys
{
    const int _LOGIN_SLEEP_TIME_ = 500;       // delay before sending the login request (milliseconds)
    const int _OPEN_SOCKET_WAIT_TIME_ = 100;  // time to wait for the socket to open (milliseconds)

    /// No-op handler registered under "beatheart" by wsClient::init().
    static void beatheart(int, std::string const&, sio::message::ptr const&, bool, sio::message::list&)
    {
    }

    /// Reads an integer field from an object message without inserting into,
    /// or dereferencing missing entries of, the message map.
    /// @return true and sets @p out when @p msg is an object holding an
    ///         integer under @p key; false otherwise (@p out is untouched).
    static bool read_int_field(const sio::message::ptr& msg, const std::string& key, int& out)
    {
        if (!msg || msg->get_flag() != sio::message::flag_object) {
            return false;
        }
        const auto& fields = msg->get_map();
        const auto it = fields.find(key);
        if (it == fields.end() || !it->second || it->second->get_flag() != sio::message::flag_integer) {
            return false;
        }
        out = static_cast<int>(it->second->get_int());
        return true;
    }

    /// Converts a non-container JSON value to the matching sio message.
    /// @return a null pointer for objects and arrays (the caller handles those).
    static sio::message::ptr scalar_to_sio(const rapidjson::Value& val)
    {
        if (val.IsString()) {
            return sio::string_message::create(val.GetString());
        }
        if (val.IsBool()) {
            return sio::bool_message::create(val.GetBool());
        }
        if (val.IsInt64()) {
            // Covers every integer that fits in 64 signed bits, not just int32.
            return sio::int_message::create(val.GetInt64());
        }
        if (val.IsNumber()) {
            // Doubles, and unsigned values above INT64_MAX.
            return sio::double_message::create(val.GetDouble());
        }
        if (val.IsNull()) {
            return sio::null_message::create();
        }
        return sio::message::ptr();
    }

    /// Runs one message handler, logging (instead of propagating) anything it throws.
    template <typename Handler>
    static void invoke_handler(const Handler& handler, int id, std::string const& name,
                               sio::message::ptr const& payload, bool need_ack,
                               sio::message::list& ack_resp)
    {
        try {
            handler(id, name, payload, need_ack, ack_resp);
        } catch (const std::exception& e) {
            log_error("捕获到异常:%s", e.what());
        } catch (...) {
            log_error("捕获到未知异常");
        }
    }

    wsClient::wsClient()
    {
        __ID = 0;
        _reset();
    }

    wsClient::~wsClient()
    {
        __wsclient.sync_close();
    }

    /// Clears the connected / socket-opened / logged-in flags.
    void wsClient::_reset()
    {
        __Connected = false;
        __SktOpened = false;
        __Logined = false;
    }

    /// Creates a new, unconnected client carrying this client's ID, URI and
    /// handler table.
    std::shared_ptr<wsClient> wsClient::clone()
    {
        std::shared_ptr<wsClient> ret = std::make_shared<wsClient>();
        ret->__ID = __ID;
        ret->__uri = __uri;
        ret->__MsgFuncList = __MsgFuncList;
        return ret;
    }

    /// Stores the client ID, server URI and handler table.
    /// A no-op "beatheart" handler is added unless the caller supplied one.
    /// NOTE(review): the map's template arguments were lost from the reviewed
    /// copy; decltype of the member is used so the signature still matches a
    /// declaration whose parameter has the member's type. Restore the explicit
    /// spelling from wsClient.h.
    void wsClient::init(int ID, const std::string& uri, const decltype(__MsgFuncList)& MsgFuncList)
    {
        __ID = ID;
        __uri = uri;
        __MsgFuncList = MsgFuncList;
        __MsgFuncList.insert(std::make_pair("beatheart", &beatheart));
    }

    std::string wsClient::get_uri()
    {
        return __uri;
    }

    /// Registers the client/socket listeners, starts the connection and blocks
    /// until the open listener fires or @p time_out seconds elapse.
    /// @return 0 when connected, -1 on empty URI or timeout (see GetLastError()).
    int wsClient::connect(int time_out)
    {
        if (__uri.empty()) {
            __LastError = "Error, uri is empty.";
            return -1;
        }

        {
            using std::placeholders::_1;
            using std::placeholders::_2;
            using std::placeholders::_3;
            using std::placeholders::_4;

            __wsclient.set_reconnect_attempts(0);
            sio::socket::ptr sock = __wsclient.socket();
            __recv_ping_time = 0;
            __connet_time = 0;

            __wsclient.set_open_listener(std::bind(&wsClient::_on_connected, this));
            __wsclient.set_close_listener(std::bind(&wsClient::_on_close, this, _1));
            __wsclient.set_reconnect_listener(std::bind(&wsClient::_on_reconnect, this, _1, _2));
            __wsclient.set_fail_listener(std::bind(&wsClient::_on_fail, this));
            __wsclient.set_socket_open_listener(std::bind(&wsClient::_on_socket_opened, this));

            sock->on(JSON_CMD_VALUE_CALL,
                     sio::socket::event_listener_aux(std::bind(&wsClient::_OnCallMessage, this, _1, _2, _3, _4)));
            sock->on("code",
                     sio::socket::event_listener_aux(std::bind(&wsClient::_OnLoginResponse, this, _1, _2, _3, _4)));
        }

        __wsclient.connect(__uri);

        std::lock_guard<decltype(__lock)> guard(__lock);
        // The predicate makes a spurious wakeup harmless: only the flag set by
        // _on_connected() counts as success.
        const bool connected = __cond.wait_for(__lock, std::chrono::seconds(time_out),
                                               [this] { return IsConnected(); });
        if (!connected) {
            __LastError = "Failed to connect server.";
            return -1;
        }

        // Stamp both times on every successful path, including the one where
        // the connection was already up before the wait started.
        const unsigned int cur_time = static_cast<unsigned int>(time(0));
        __connet_time = cur_time;
        __recv_ping_time = cur_time;
        return 0;
    }

    /// Sends the login request once the client is connected and the socket is
    /// open. The result arrives asynchronously: the ack callback sets the
    /// logged-in flag (and the last error on failure).
    void wsClient::login()
    {
        if (!IsConnected()) {
            __LastError = "please connect server before login!";
            return;
        }

        if (!IsSktOpened()) {
            __LastError = "socket is not opened before login!";
            return;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(_LOGIN_SLEEP_TIME_));

        sys::_JS_LOGIN_ Login;
        Login.user_name = JSON_VALUE_USERNAME;
        Login.user_password = JSON_VALUE_PASSWORD;

        std::string strLogin = __jsBuilder.BuildLogin(Login);
        log_info("send2web: cmd=%s, data=%s", JSON_CMD_VALUE_USER, strLogin.c_str());
        strLogin += "\n";

        sio::socket::ptr skt_ptr = __wsclient.socket();
        // NOTE(review): the callback captures `this`; the client must outlive
        // any pending acknowledgement.
        skt_ptr->emit(JSON_CMD_VALUE_USER, strLogin, [this](sio::message::list const& msglist) {
            int code = 0;
            if (msglist.size() == 0 || !read_int_field(msglist[0], "code", code)) {
                // An empty ack or one without an integer "code" is a failure,
                // not a null dereference.
                __Logined = false;
                __LastError = "Login failed,malformed response";
                return;
            }

            if (0 == code) {
                __Logined = true;
            } else {
                __Logined = false;
                __LastError = "Login failed,code=" + std::to_string(code);
            }
        });
    }

    /// Open listener: marks the client connected and wakes connect().
    void wsClient::_on_connected()
    {
        std::lock_guard<decltype(__lock)> guard(__lock);
        __Connected = true;
        __cond.notify_all();
    }

    /// Emits @p Cmd with payload @p Data; calls are serialised by __send_lock.
    void wsClient::send(const std::string& Cmd, const std::string& Data)
    {
        std::lock_guard<decltype(__send_lock)> lock(__send_lock);
        sio::socket::ptr skt_ptr = __wsclient.socket();
        skt_ptr->emit(Cmd, Data);
    }

    void wsClient::_on_close(sio::client::close_reason const& reason)
    {
        log_info("websocket %d close()", __ID);
        _reset();
        // Disabled: start a reconnect thread that retries every 10 s until the
        // connection succeeds.
        // reconnect();
    }

    void wsClient::_on_reconnect(unsigned p1, unsigned p2)
    {
        log_info("websocket %d reconnect()", __ID);
        _reset();
    }

    void wsClient::_on_fail()
    {
        _reset();
    }

    void wsClient::_on_socket_opened()
    {
        __SktOpened = true;
    }

    /// Event listener for JSON_CMD_VALUE_CALL: extracts the command name from
    /// the payload and dispatches to the handler registered for it.
    ///
    /// Two payload shapes are accepted:
    ///  - an object message, passed to the handler as received;
    ///  - a string message whose text, minus its first and last character,
    ///    is JSON; it is parsed and converted with to_sio_message() first.
    /// Exceptions thrown by a handler are logged and swallowed in both cases.
    void wsClient::_OnCallMessage(std::string const& name, sio::message::ptr const& data,
                                  bool need_ack, sio::message::list& ack_resp)
    {
        if (!data) {
            log_info("web-message: recv web send message is null!");
            return;
        }

        const std::string raw = data->to_string();
        if (raw.empty()) {
            log_info("web-message: recv web send message is null!");
            return;
        }
        log_info("web-message:%s", raw.c_str());

        if (data->get_flag() == sio::message::flag_object) {
            const auto& fields = data->get_map();
            const auto cmd_it = fields.find(JSON_ROOT_KEY_CMD);
            if (cmd_it == fields.end() || !cmd_it->second
                || cmd_it->second->get_flag() != sio::message::flag_string) {
                log_error("web-message: missing or non-string cmd field: %s", raw.c_str());
            } else {
                const std::string cmd = cmd_it->second->get_string();
                const auto mit_func = __MsgFuncList.find(cmd);
                if (mit_func != __MsgFuncList.end()) {
                    invoke_handler(mit_func->second, GetID(), name, data, need_ack, ack_resp);
                } else {
                    log_error("web-message没有找到对应的处理函数:%s", raw.c_str());
                }
            }
            // Only object payloads refresh the last-received time (unchanged).
            __recv_ping_time = time(0);
        } else if (data->get_flag() == sio::message::flag_string) {
            if (raw.size() < 2) {
                log_info("web-message: parse error!");
                return;
            }
            // Drop the first and last character (presumably the quotes that
            // to_string() puts around a string payload -- TODO confirm).
            const std::string str = raw.substr(1, raw.size() - 2);

            rapidjson::Document doc;
            doc.Parse(str.c_str());
            if (doc.HasParseError() || !doc.IsObject()) {
                log_info("web-message: parse error!");
                return;
            }

            const auto cmd_member = doc.FindMember(JSON_ROOT_KEY_CMD);
            if (cmd_member == doc.MemberEnd() || !cmd_member->value.IsString()) {
                log_error("web-message: missing or non-string cmd field: %s", raw.c_str());
                return;
            }

            const std::string cmd = cmd_member->value.GetString();
            const auto mit_func = __MsgFuncList.find(cmd);
            log_info("web-message: cmd=%s, type=%d, msg=%s", cmd.c_str(),
                     static_cast<int>(data->get_flag()), raw.c_str());

            if (mit_func != __MsgFuncList.end()) {
                sio::message::ptr msg = to_sio_message(str);
                invoke_handler(mit_func->second, GetID(), name, msg, need_ack, ack_resp);
            } else {
                log_info("web-message: not found cmd, MsgFuncList.size()=%d",
                         static_cast<int>(__MsgFuncList.size()));
                for (const auto& entry : __MsgFuncList) {
                    log_info("cmd: %s", entry.first.c_str());
                }
            }
        }
    }

    /// Event listener for "code": -1 marks the login failed (and records the
    /// error), 0 marks it succeeded, any other value is ignored.
    void wsClient::_OnLoginResponse(std::string const& name, sio::message::ptr const& data,
                                    bool need_ack, sio::message::list& ack_resp)
    {
        int res_code = 0;
        if (!read_int_field(data, "code", res_code)) {
            log_error("login response carries no integer code field");
            return;
        }

        switch (res_code) {
        case -1:
            __Logined = false;
            __LastError = "Login failed,code=" + std::to_string(res_code);
            break;
        case 0:
            __Logined = true;
            break;
        default:
            break;
        }
    }

    int wsClient::GetID()
    {
        return __ID;
    }

    bool wsClient::IsConnected()
    {
        return __Connected;
    }

    bool wsClient::IsSktOpened()
    {
        return __SktOpened;
    }

    bool wsClient::IsLogined()
    {
        return __Logined;
    }

    void wsClient::close()
    {
        __wsclient.sync_close();
        log_info("[wsclient] %d close, reason=%s", __ID, GetLastError().c_str());
    }

    /// Closes the current connection, then retries connect() + login() until
    /// the client reports connected. Blocks the calling thread throughout.
    void wsClient::reconnect()
    {
        __wsclient.sync_close();

        while (!IsConnected()) {
            try {
                if (connect() < 0) {
                    log_info("[wsclient] error=%s", GetLastError().c_str());
                    std::this_thread::sleep_for(std::chrono::seconds(3));
                    continue;
                }
                login();
                std::this_thread::sleep_for(std::chrono::seconds(10));
            } catch (...) {
                log_info("[wsclient] connect error");
                // Back off here as well so a persistently throwing connect()
                // does not spin the CPU.
                std::this_thread::sleep_for(std::chrono::seconds(3));
            }
        }

        log_info("[wsClient] %d reconnect successful.", __ID);
    }

    std::string wsClient::GetLastError()
    {
        return __LastError;
    }

    int wsClient::GetPingTime() const
    {
        return __recv_ping_time;
    }

    /// Parses @p val as JSON and converts the root object to an sio object
    /// message. Every member is converted, whatever its JSON type.
    /// @return an empty object message when @p val does not parse or its root
    ///         is not an object.
    sio::message::ptr wsClient::to_sio_message(const std::string& val)
    {
        log_info("[change sio message] val=%s", val.c_str());

        rapidjson::Document doc;
        doc.Parse(val.c_str());
        if (doc.HasParseError() || !doc.IsObject()) {
            log_info("[change sio message] rapidjson::Document pares error.");
            return sio::object_message::create();
        }
        return from_json(doc);
    }

    /// Same conversion as to_sio_message(), without logging the input text.
    sio::message::ptr wsClient::call_to_sio_message(const std::string& val)
    {
        rapidjson::Document doc;
        doc.Parse(val.c_str());
        if (doc.HasParseError() || !doc.IsObject()) {
            log_info("[change sio message] rapidjson::Document pares error.");
            return sio::object_message::create();
        }
        return from_json(doc);
    }

    /// Recursively converts a JSON object to an sio object message.
    /// @return an empty object message when @p val is not a JSON object.
    sio::message::ptr wsClient::from_json(const rapidjson::Value& val)
    {
        sio::message::ptr message = sio::object_message::create();
        log_info("[change sio message] from_json");
        if (!val.IsObject()) {
            return message;
        }

        auto& fields = message->get_map();
        for (auto it = val.MemberBegin(); it != val.MemberEnd(); ++it) {
            sio::message::ptr child;
            if (it->value.IsObject()) {
                child = from_json(it->value);
            } else if (it->value.IsArray()) {
                child = from_array(it->value);
            } else {
                child = scalar_to_sio(it->value);
            }
            if (child) {
                fields[it->name.GetString()] = child;
            }
        }
        return message;
    }

    /// Recursively converts a JSON array to an sio array message, keeping one
    /// output element per input element so indices line up.
    /// @return an empty array message when @p val is not a JSON array.
    sio::message::ptr wsClient::from_array(const rapidjson::Value& val)
    {
        sio::message::ptr message = sio::array_message::create();
        if (!val.IsArray()) {
            log_info("[change sio message] from_array: not an array");
            return message;
        }
        // GetString() is only valid on string values, so log the size instead.
        log_info("[change sio message] from_array: size=%u", static_cast<unsigned>(val.Size()));

        auto& items = message->get_vector();
        for (auto it = val.Begin(); it != val.End(); ++it) {
            sio::message::ptr child;
            if (it->IsObject()) {
                child = from_json(*it);
            } else if (it->IsArray()) {
                child = from_array(*it);
            } else {
                child = scalar_to_sio(*it);
            }
            if (child) {
                items.push_back(child);
            }
        }
        return message;
    }
}