sio_client_impl.cpp 19 KB


  1. //
  2. // sio_client_impl.cpp
  3. // SioChatDemo
  4. //
  5. // Created by Melo Yao on 4/3/15.
  6. // Copyright (c) 2015 Melo Yao. All rights reserved.
  7. //
  8. #include "sio_client_impl.h"
  9. #include <sstream>
  10. #include <boost/date_time/posix_time/posix_time.hpp>
  11. #include <mutex>
  12. #include <cmath>
  13. // Comment this out to disable handshake logging to stdout
  14. #if DEBUG || _DEBUG
  15. #define LOG(x) std::cout << x
  16. #else
  17. #define LOG(x)
  18. #endif
  19. using boost::posix_time::milliseconds;
  20. using namespace std;
  21. namespace sio
  22. {
  23. /*************************public:*************************/
  24. client_impl::client_impl() :
  25. m_ping_interval(0),
  26. m_ping_timeout(0),
  27. m_network_thread(),
  28. m_con_state(con_closed),
  29. m_reconn_delay(5000),
  30. m_reconn_delay_max(25000),
  31. m_reconn_attempts(0xFFFFFFFF),
  32. m_reconn_made(0)
  33. {
  34. using websocketpp::log::alevel;
  35. #ifndef DEBUG
  36. m_client.clear_access_channels(alevel::all);
  37. m_client.set_access_channels(alevel::connect|alevel::disconnect|alevel::app);
  38. #endif
  39. // Initialize the Asio transport policy
  40. m_client.init_asio();
  41. // Bind the clients we are using
  42. using websocketpp::lib::placeholders::_1;
  43. using websocketpp::lib::placeholders::_2;
  44. m_client.set_open_handler(lib::bind(&client_impl::on_open,this,_1));
  45. m_client.set_close_handler(lib::bind(&client_impl::on_close,this,_1));
  46. m_client.set_fail_handler(lib::bind(&client_impl::on_fail,this,_1));
  47. m_client.set_message_handler(lib::bind(&client_impl::on_message,this,_1,_2));
  48. #if SIO_TLS
  49. m_client.set_tls_init_handler(lib::bind(&client_impl::on_tls_init,this,_1));
  50. #endif
  51. m_packet_mgr.set_decode_callback(lib::bind(&client_impl::on_decode,this,_1));
  52. m_packet_mgr.set_encode_callback(lib::bind(&client_impl::on_encode,this,_1,_2));
  53. }
  54. client_impl::~client_impl()
  55. {
  56. this->sockets_invoke_void(&sio::socket::on_close);
  57. sync_close();
  58. }
  59. void client_impl::connect(const string& uri, const map<string,string>& query, const map<string, string>& headers)
  60. {
  61. if(m_reconn_timer)
  62. {
  63. m_reconn_timer->cancel();
  64. m_reconn_timer.reset();
  65. }
  66. if(m_network_thread)
  67. {
  68. if(m_con_state == con_closing||m_con_state == con_closed)
  69. {
  70. //if client is closing, join to wait.
  71. //if client is closed, still need to join,
  72. //but in closed case,join will return immediately.
  73. m_network_thread->join();
  74. m_network_thread.reset();//defensive
  75. }
  76. else
  77. {
  78. //if we are connected, do nothing.
  79. return;
  80. }
  81. }
  82. m_con_state = con_opening;
  83. m_base_url = uri;
  84. m_reconn_made = 0;
  85. string query_str;
  86. for(map<string,string>::const_iterator it=query.begin();it!=query.end();++it){
  87. query_str.append("&");
  88. query_str.append(it->first);
  89. query_str.append("=");
  90. string query_str_value=encode_query_string(it->second);
  91. query_str.append(query_str_value);
  92. }
  93. m_query_string=move(query_str);
  94. m_http_headers = headers;
  95. this->reset_states();
  96. m_client.get_io_service().dispatch(lib::bind(&client_impl::connect_impl,this,uri,m_query_string));
  97. m_network_thread.reset(new thread(lib::bind(&client_impl::run_loop,this)));//uri lifecycle?
  98. }
  99. socket::ptr const& client_impl::socket(string const& nsp)
  100. {
  101. lock_guard<mutex> guard(m_socket_mutex);
  102. string aux;
  103. if(nsp == "")
  104. {
  105. aux = "/";
  106. }
  107. else if( nsp[0] != '/')
  108. {
  109. aux.append("/",1);
  110. aux.append(nsp);
  111. }
  112. else
  113. {
  114. aux = nsp;
  115. }
  116. auto it = m_sockets.find(aux);
  117. if(it!= m_sockets.end())
  118. {
  119. return it->second;
  120. }
  121. else
  122. {
  123. pair<const string, socket::ptr> p(aux,shared_ptr<sio::socket>(new sio::socket(this,aux)));
  124. return (m_sockets.insert(p).first)->second;
  125. }
  126. }
  127. void client_impl::close()
  128. {
  129. m_con_state = con_closing;
  130. this->sockets_invoke_void(&sio::socket::close);
  131. m_client.get_io_service().dispatch(lib::bind(&client_impl::close_impl, this,close::status::normal,"End by user"));
  132. }
  133. void client_impl::sync_close()
  134. {
  135. m_con_state = con_closing;
  136. this->sockets_invoke_void(&sio::socket::close);
  137. m_client.get_io_service().dispatch(lib::bind(&client_impl::close_impl, this,close::status::normal,"End by user"));
  138. if(m_network_thread)
  139. {
  140. m_network_thread->join();
  141. m_network_thread.reset();
  142. }
  143. }
  144. /*************************protected:*************************/
  145. void client_impl::send(packet& p)
  146. {
  147. m_packet_mgr.encode(p);
  148. }
  149. void client_impl::remove_socket(string const& nsp)
  150. {
  151. lock_guard<mutex> guard(m_socket_mutex);
  152. auto it = m_sockets.find(nsp);
  153. if(it!= m_sockets.end())
  154. {
  155. m_sockets.erase(it);
  156. }
  157. }
  158. boost::asio::io_service& client_impl::get_io_service()
  159. {
  160. return m_client.get_io_service();
  161. }
  162. void client_impl::on_socket_closed(string const& nsp)
  163. {
  164. if(m_socket_close_listener)m_socket_close_listener(nsp);
  165. }
  166. void client_impl::on_socket_opened(string const& nsp)
  167. {
  168. if(m_socket_open_listener)m_socket_open_listener(nsp);
  169. }
  170. /*************************private:*************************/
  171. void client_impl::run_loop()
  172. {
  173. m_client.run();
  174. m_client.reset();
  175. m_client.get_alog().write(websocketpp::log::alevel::devel,
  176. "run loop end");
  177. }
  178. void client_impl::connect_impl(const string& uri, const string& queryString)
  179. {
  180. do{
  181. websocketpp::uri uo(uri);
  182. ostringstream ss;
  183. #if SIO_TLS
  184. ss<<"wss://";
  185. #else
  186. ss<<"ws://";
  187. #endif
  188. const std::string host(uo.get_host());
  189. // As per RFC2732, literal IPv6 address should be enclosed in "[" and "]".
  190. if(host.find(':')!=std::string::npos){
  191. ss<<"["<<uo.get_host()<<"]";
  192. } else {
  193. ss<<uo.get_host();
  194. }
  195. ss<<":"<<uo.get_port()<<"/socket.io/?EIO=4&transport=websocket";
  196. if(m_sid.size()>0){
  197. ss<<"&sid="<<m_sid;
  198. }
  199. ss<<"&t="<<time(NULL)<<queryString;
  200. lib::error_code ec;
  201. client_type::connection_ptr con = m_client.get_connection(ss.str(), ec);
  202. if (ec) {
  203. m_client.get_alog().write(websocketpp::log::alevel::app,
  204. "Get Connection Error: "+ec.message());
  205. break;
  206. }
  207. for( auto&& header: m_http_headers ) {
  208. con->replace_header(header.first, header.second);
  209. }
  210. m_client.connect(con);
  211. return;
  212. }
  213. while(0);
  214. if(m_fail_listener)
  215. {
  216. m_fail_listener();
  217. }
  218. }
  219. void client_impl::close_impl(close::status::value const& code,string const& reason)
  220. {
  221. LOG("Close by reason:"<<reason << endl);
  222. if(m_reconn_timer)
  223. {
  224. m_reconn_timer->cancel();
  225. m_reconn_timer.reset();
  226. }
  227. if (m_con.expired())
  228. {
  229. cerr << "Error: No active session" << endl;
  230. }
  231. else
  232. {
  233. lib::error_code ec;
  234. m_client.close(m_con, code, reason, ec);
  235. }
  236. }
  237. void client_impl::send_impl(shared_ptr<const string> const& payload_ptr,frame::opcode::value opcode)
  238. {
  239. if(m_con_state == con_opened)
  240. {
  241. lib::error_code ec;
  242. m_client.send(m_con,*payload_ptr,opcode,ec);
  243. if(ec)
  244. {
  245. cerr<<"Send failed,reason:"<< ec.message()<<endl;
  246. }
  247. }
  248. }
  249. void client_impl::ping(const boost::system::error_code& ec)
  250. {
  251. if(ec || m_con.expired())
  252. {
  253. if ( ec != boost::asio::error::operation_aborted )
  254. //LOG("ping exit,con is expired?"<<m_con.expired()<<",ec:"<<ec.message()<<endl){};
  255. LOG( "ping exit,con is expired?" << m_con.expired() << ",ec:" << ec.message() << endl );
  256. return;
  257. }
  258. packet p(packet::frame_ping);
  259. m_packet_mgr.encode(p, [&](bool /*isBin*/,shared_ptr<const string> payload)
  260. {
  261. lib::error_code ec;
  262. this->m_client.send(this->m_con, *payload, frame::opcode::text, ec);
  263. });
  264. if(m_ping_timer)
  265. {
  266. boost::system::error_code e_code;
  267. m_ping_timer->expires_from_now(milliseconds(m_ping_interval), e_code);
  268. m_ping_timer->async_wait(lib::bind(&client_impl::ping,this,lib::placeholders::_1));
  269. }
  270. if(!m_ping_timeout_timer)
  271. {
  272. m_ping_timeout_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service()));
  273. boost::system::error_code timeout_ec;
  274. m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout), timeout_ec);
  275. m_ping_timeout_timer->async_wait(lib::bind(&client_impl::timeout_pong, this,lib::placeholders::_1));
  276. }
  277. }
  278. void client_impl::timeout_pong(const boost::system::error_code &ec)
  279. {
  280. if(ec)
  281. {
  282. return;
  283. }
  284. LOG("Pong timeout"<<endl);
  285. m_client.get_io_service().dispatch(lib::bind(&client_impl::close_impl, this,close::status::policy_violation,"Pong timeout"));
  286. }
  287. void client_impl::timeout_reconnect(boost::system::error_code const& ec)
  288. {
  289. if(ec)
  290. {
  291. return;
  292. }
  293. if(m_con_state == con_closed)
  294. {
  295. m_con_state = con_opening;
  296. m_reconn_made++;
  297. this->reset_states();
  298. LOG("Reconnecting..."<<endl);
  299. if(m_reconnecting_listener) m_reconnecting_listener();
  300. m_client.get_io_service().dispatch(lib::bind(&client_impl::connect_impl,this,m_base_url,m_query_string));
  301. }
  302. }
  303. unsigned client_impl::next_delay() const
  304. {
  305. //no jitter, fixed power root.
  306. unsigned reconn_made = min<unsigned>(m_reconn_made,32);//protect the pow result to be too big.
  307. return static_cast<unsigned>(min<double>(m_reconn_delay * pow(1.5,reconn_made),m_reconn_delay_max));
  308. }
  309. socket::ptr client_impl::get_socket_locked(string const& nsp)
  310. {
  311. lock_guard<mutex> guard(m_socket_mutex);
  312. auto it = m_sockets.find(nsp);
  313. if(it != m_sockets.end())
  314. {
  315. return it->second;
  316. }
  317. else
  318. {
  319. return socket::ptr();
  320. }
  321. }
  322. void client_impl::sockets_invoke_void(void (sio::socket::*fn)(void))
  323. {
  324. map<const string,socket::ptr> socks;
  325. {
  326. lock_guard<mutex> guard(m_socket_mutex);
  327. socks.insert(m_sockets.begin(),m_sockets.end());
  328. }
  329. for (auto it = socks.begin(); it!=socks.end(); ++it) {
  330. ((*(it->second)).*fn)();
  331. }
  332. }
  333. void client_impl::on_fail(connection_hdl)
  334. {
  335. m_con.reset();
  336. m_con_state = con_closed;
  337. this->sockets_invoke_void(&sio::socket::on_disconnect);
  338. LOG("Connection failed." << endl);
  339. if(m_reconn_made<m_reconn_attempts)
  340. {
  341. LOG("Reconnect for attempt:"<<m_reconn_made<<endl);
  342. unsigned delay = this->next_delay();
  343. if(m_reconnect_listener) m_reconnect_listener(m_reconn_made,delay);
  344. m_reconn_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service()));
  345. boost::system::error_code ec;
  346. m_reconn_timer->expires_from_now(milliseconds(delay), ec);
  347. m_reconn_timer->async_wait(lib::bind(&client_impl::timeout_reconnect,this,lib::placeholders::_1));
  348. }
  349. else
  350. {
  351. if(m_fail_listener)m_fail_listener();
  352. }
  353. }
  354. void client_impl::on_open(connection_hdl con)
  355. {
  356. LOG("Connected." << endl);
  357. m_con_state = con_opened;
  358. m_con = con;
  359. m_reconn_made = 0;
  360. this->sockets_invoke_void(&sio::socket::on_open);
  361. this->socket("");
  362. if(m_open_listener)m_open_listener();
  363. }
  364. void client_impl::on_close(connection_hdl con)
  365. {
  366. LOG("Client Disconnected." << endl);
  367. con_state m_con_state_was = m_con_state;
  368. m_con_state = con_closed;
  369. lib::error_code ec;
  370. close::status::value code = close::status::normal;
  371. client_type::connection_ptr conn_ptr = m_client.get_con_from_hdl(con, ec);
  372. if (ec) {
  373. LOG("OnClose get conn failed"<<ec<<endl);
  374. }
  375. else
  376. {
  377. code = conn_ptr->get_local_close_code();
  378. }
  379. m_con.reset();
  380. this->clear_timers();
  381. client::close_reason reason;
  382. // If we initiated the close, no matter what the close status was,
  383. // we'll consider it a normal close. (When using TLS, we can
  384. // sometimes get a TLS Short Read error when closing.)
  385. if(code == close::status::normal || m_con_state_was == con_closing)
  386. {
  387. this->sockets_invoke_void(&sio::socket::on_disconnect);
  388. reason = client::close_reason_normal;
  389. }
  390. else
  391. {
  392. this->sockets_invoke_void(&sio::socket::on_disconnect);
  393. if(m_reconn_made<m_reconn_attempts)
  394. {
  395. LOG("Reconnect for attempt:"<<m_reconn_made<<endl);
  396. unsigned delay = this->next_delay();
  397. if(m_reconnect_listener) m_reconnect_listener(m_reconn_made,delay);
  398. m_reconn_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service()));
  399. boost::system::error_code ec;
  400. m_reconn_timer->expires_from_now(milliseconds(delay), ec);
  401. m_reconn_timer->async_wait(lib::bind(&client_impl::timeout_reconnect,this,lib::placeholders::_1));
  402. return;
  403. }
  404. reason = client::close_reason_drop;
  405. }
  406. if(m_close_listener)
  407. {
  408. m_close_listener(reason);
  409. }
  410. }
  411. void client_impl::on_message(connection_hdl, client_type::message_ptr msg)
  412. {
  413. if (m_ping_timeout_timer) {
  414. boost::system::error_code ec;
  415. m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout),ec);
  416. m_ping_timeout_timer->async_wait(lib::bind(&client_impl::timeout_pong, this,lib::placeholders::_1));
  417. }
  418. // Parse the incoming message according to socket.IO rules
  419. m_packet_mgr.put_payload(msg->get_payload());
  420. }
  421. void client_impl::on_handshake(message::ptr const& message)
  422. {
  423. if(message && message->get_flag() == message::flag_object)
  424. {
  425. const object_message* obj_ptr =static_cast<object_message*>(message.get());
  426. const map<string,message::ptr>* values = &(obj_ptr->get_map());
  427. auto it = values->find("sid");
  428. if (it!= values->end()) {
  429. m_sid = static_pointer_cast<string_message>(it->second)->get_string();
  430. }
  431. else
  432. {
  433. goto failed;
  434. }
  435. it = values->find("pingInterval");
  436. if (it!= values->end()&&it->second->get_flag() == message::flag_integer) {
  437. m_ping_interval = (unsigned)static_pointer_cast<int_message>(it->second)->get_int();
  438. }
  439. else
  440. {
  441. m_ping_interval = 25000;
  442. }
  443. it = values->find("pingTimeout");
  444. if (it!=values->end()&&it->second->get_flag() == message::flag_integer) {
  445. m_ping_timeout = (unsigned) static_pointer_cast<int_message>(it->second)->get_int();
  446. }
  447. else
  448. {
  449. m_ping_timeout = 60000;
  450. }
  451. m_ping_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service()));
  452. boost::system::error_code ec;
  453. m_ping_timer->expires_from_now(milliseconds(m_ping_interval), ec);
  454. //if(ec)LOG("ec:"<<ec.message()<<endl){};
  455. if(ec)LOG("ec:"<<ec.message()<<endl);
  456. m_ping_timer->async_wait(lib::bind(&client_impl::ping,this,lib::placeholders::_1));
  457. LOG("On handshake,sid:"<<m_sid<<",ping interval:"<<m_ping_interval<<",ping timeout"<<m_ping_timeout<<endl);
  458. return;
  459. }
  460. failed:
  461. //just close it.
  462. m_client.get_io_service().dispatch(lib::bind(&client_impl::close_impl, this,close::status::policy_violation,"Handshake error"));
  463. }
  464. void client_impl::on_pong()
  465. {
  466. if(m_ping_timeout_timer)
  467. {
  468. m_ping_timeout_timer->cancel();
  469. m_ping_timeout_timer.reset();
  470. }
  471. }
  472. void client_impl::on_decode(packet const& p)
  473. {
  474. switch(p.get_frame())
  475. {
  476. case packet::frame_message:
  477. {
  478. socket::ptr so_ptr = get_socket_locked(p.get_nsp());
  479. if(so_ptr)so_ptr->on_message_packet(p);
  480. break;
  481. }
  482. case packet::frame_open:
  483. this->on_handshake(p.get_message());
  484. break;
  485. case packet::frame_close:
  486. //FIXME how to deal?
  487. this->close_impl(close::status::abnormal_close, "End by server");
  488. break;
  489. case packet::frame_pong:
  490. this->on_pong();
  491. break;
  492. default:
  493. break;
  494. }
  495. }
  496. void client_impl::on_encode(bool isBinary,shared_ptr<const string> const& payload)
  497. {
  498. LOG("encoded payload length:"<<payload->length()<<endl);
  499. m_client.get_io_service().dispatch(lib::bind(&client_impl::send_impl,this,payload,isBinary?frame::opcode::binary:frame::opcode::text));
  500. }
  501. void client_impl::clear_timers()
  502. {
  503. LOG("clear timers"<<endl);
  504. boost::system::error_code ec;
  505. if(m_ping_timeout_timer)
  506. {
  507. m_ping_timeout_timer->cancel(ec);
  508. m_ping_timeout_timer.reset();
  509. }
  510. if(m_ping_timer)
  511. {
  512. m_ping_timer->cancel(ec);
  513. m_ping_timer.reset();
  514. }
  515. }
  516. void client_impl::reset_states()
  517. {
  518. m_client.reset();
  519. m_sid.clear();
  520. m_packet_mgr.reset();
  521. }
  522. #if SIO_TLS
  523. client_impl::context_ptr client_impl::on_tls_init(connection_hdl conn)
  524. {
  525. context_ptr ctx = context_ptr(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
  526. boost::system::error_code ec;
  527. ctx->set_options(boost::asio::ssl::context::default_workarounds |
  528. boost::asio::ssl::context::no_sslv2 |
  529. boost::asio::ssl::context::single_dh_use,ec);
  530. if(ec)
  531. {
  532. cerr<<"Init tls failed,reason:"<< ec.message()<<endl;
  533. }
  534. return ctx;
  535. }
  536. #endif
  537. std::string client_impl::encode_query_string(const std::string &query){
  538. ostringstream ss;
  539. ss << std::hex;
  540. // Percent-encode (RFC3986) non-alphanumeric characters.
  541. for(const char c : query){
  542. if((c >= 'a' && c <= 'z') || (c>= 'A' && c<= 'Z') || (c >= '0' && c<= '9')){
  543. ss << c;
  544. } else {
  545. ss << '%' << std::uppercase << std::setw(2) << int((unsigned char) c) << std::nouppercase;
  546. }
  547. }
  548. ss << std::dec;
  549. return ss.str();
  550. }
  551. }