1
0

sio_socket.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. #include "sio_socket.h"
  2. #include "internal/sio_packet.h"
  3. #include "internal/sio_client_impl.h"
  4. #include <boost/asio/deadline_timer.hpp>
  5. #include <boost/system/error_code.hpp>
  6. #include <queue>
  7. #include <cstdarg>
  8. #if DEBUG || _DEBUG
  9. #define LOG(x) std::cout << x
  10. #else
  11. #define LOG(x)
  12. #endif
  13. #define NULL_GUARD(_x_) \
  14. if(_x_ == NULL) return
  15. namespace sio
  16. {
  17. class event_adapter
  18. {
  19. public:
  20. static void adapt_func(socket::event_listener_aux const& func, event& event)
  21. {
  22. func(event.get_name(),event.get_message(),event.need_ack(),event.get_ack_message_impl());
  23. }
  24. static inline socket::event_listener do_adapt(socket::event_listener_aux const& func)
  25. {
  26. return std::bind(&event_adapter::adapt_func, func,std::placeholders::_1);
  27. }
  28. static inline event create_event(std::string const& nsp,std::string const& name,message::list&& message,bool need_ack)
  29. {
  30. return event(nsp,name,message,need_ack);
  31. }
  32. };
  33. const std::string& event::get_nsp() const
  34. {
  35. return m_nsp;
  36. }
  37. const std::string& event::get_name() const
  38. {
  39. return m_name;
  40. }
  41. const message::ptr& event::get_message() const
  42. {
  43. if(m_messages.size()>0)
  44. return m_messages[0];
  45. else
  46. {
  47. static message::ptr null_ptr;
  48. return null_ptr;
  49. }
  50. }
  51. const message::list& event::get_messages() const
  52. {
  53. return m_messages;
  54. }
  55. bool event::need_ack() const
  56. {
  57. return m_need_ack;
  58. }
  59. void event::put_ack_message(message::list const& ack_message)
  60. {
  61. if(m_need_ack)
  62. m_ack_message = std::move(ack_message);
  63. }
  64. inline
  65. event::event(std::string const& nsp,std::string const& name,message::list&& messages,bool need_ack):
  66. m_nsp(nsp),
  67. m_name(name),
  68. m_messages(std::move(messages)),
  69. m_need_ack(need_ack)
  70. {
  71. }
  72. inline
  73. event::event(std::string const& nsp,std::string const& name,message::list const& messages,bool need_ack):
  74. m_nsp(nsp),
  75. m_name(name),
  76. m_messages(messages),
  77. m_need_ack(need_ack)
  78. {
  79. }
  80. message::list const& event::get_ack_message() const
  81. {
  82. return m_ack_message;
  83. }
  84. inline
  85. message::list& event::get_ack_message_impl()
  86. {
  87. return m_ack_message;
  88. }
  89. class socket::impl
  90. {
  91. public:
  92. impl(client_impl *,std::string const&);
  93. ~impl();
  94. void on(std::string const& event_name,event_listener_aux const& func);
  95. void on(std::string const& event_name,event_listener const& func);
  96. void off(std::string const& event_name);
  97. void off_all();
  98. #define SYNTHESIS_SETTER(__TYPE__,__FIELD__) \
  99. void set_##__FIELD__(__TYPE__ const& l) \
  100. { m_##__FIELD__ = l;}
  101. SYNTHESIS_SETTER(error_listener, error_listener) //socket io errors
  102. #undef SYNTHESIS_SETTER
  103. void on_error(error_listener const& l);
  104. void off_error();
  105. void close();
  106. void emit(std::string const& name, message::list const& msglist, std::function<void (message::list const&)> const& ack);
  107. std::string const& get_namespace() const {return m_nsp;}
  108. protected:
  109. void on_connected();
  110. void on_close();
  111. void on_open();
  112. void on_message_packet(packet const& packet);
  113. void on_disconnect();
  114. private:
  115. // Message Parsing callbacks.
  116. void on_socketio_event(const std::string& nsp, int msgId,const std::string& name, message::list&& message);
  117. void on_socketio_ack(int msgId, message::list const& message);
  118. void on_socketio_error(message::ptr const& err_message);
  119. event_listener get_bind_listener_locked(string const& event);
  120. void ack(int msgId,string const& name,message::list const& ack_message);
  121. void timeout_connection(const boost::system::error_code &ec);
  122. void send_connect();
  123. void send_packet(packet& p);
  124. static event_listener s_null_event_listener;
  125. static unsigned int s_global_event_id;
  126. sio::client_impl *m_client;
  127. bool m_connected;
  128. std::string m_nsp;
  129. std::map<unsigned int, std::function<void (message::list const&)> > m_acks;
  130. std::map<std::string, event_listener> m_event_binding;
  131. error_listener m_error_listener;
  132. std::unique_ptr<boost::asio::deadline_timer> m_connection_timer;
  133. std::queue<packet> m_packet_queue;
  134. std::mutex m_event_mutex;
  135. std::mutex m_packet_mutex;
  136. friend class socket;
  137. };
  138. void socket::impl::on(std::string const& event_name,event_listener_aux const& func)
  139. {
  140. this->on(event_name,event_adapter::do_adapt(func));
  141. }
  142. void socket::impl::on(std::string const& event_name,event_listener const& func)
  143. {
  144. std::lock_guard<std::mutex> guard(m_event_mutex);
  145. m_event_binding[event_name] = func;
  146. }
  147. void socket::impl::off(std::string const& event_name)
  148. {
  149. std::lock_guard<std::mutex> guard(m_event_mutex);
  150. auto it = m_event_binding.find(event_name);
  151. if(it!=m_event_binding.end())
  152. {
  153. m_event_binding.erase(it);
  154. }
  155. }
  156. void socket::impl::off_all()
  157. {
  158. std::lock_guard<std::mutex> guard(m_event_mutex);
  159. m_event_binding.clear();
  160. }
  161. void socket::impl::on_error(error_listener const& l)
  162. {
  163. m_error_listener = l;
  164. }
  165. void socket::impl::off_error()
  166. {
  167. m_error_listener = nullptr;
  168. }
  169. socket::impl::impl(client_impl *client,std::string const& nsp):
  170. m_client(client),
  171. m_connected(false),
  172. m_nsp(nsp)
  173. {
  174. NULL_GUARD(client);
  175. if(m_client->opened())
  176. {
  177. send_connect();
  178. }
  179. }
  180. socket::impl::~impl()
  181. {
  182. }
  183. unsigned int socket::impl::s_global_event_id = 1;
  184. void socket::impl::emit(std::string const& name, message::list const& msglist, std::function<void (message::list const&)> const& ack)
  185. {
  186. NULL_GUARD(m_client);
  187. message::ptr msg_ptr = msglist.to_array_message(name);
  188. int pack_id;
  189. if(ack)
  190. {
  191. pack_id = s_global_event_id++;
  192. std::lock_guard<std::mutex> guard(m_event_mutex);
  193. m_acks[pack_id] = ack;
  194. }
  195. else
  196. {
  197. pack_id = -1;
  198. }
  199. packet p(m_nsp, msg_ptr,pack_id);
  200. send_packet(p);
  201. }
  202. void socket::impl::send_connect()
  203. {
  204. NULL_GUARD(m_client);
  205. if(m_nsp == "/")
  206. {
  207. return;
  208. }
  209. packet p(packet::type_connect,m_nsp);
  210. m_client->send(p);
  211. m_connection_timer.reset(new boost::asio::deadline_timer(m_client->get_io_service()));
  212. boost::system::error_code ec;
  213. m_connection_timer->expires_from_now(boost::posix_time::milliseconds(20000), ec);
  214. m_connection_timer->async_wait(std::bind(&socket::impl::timeout_connection,this, std::placeholders::_1));
  215. }
  216. void socket::impl::close()
  217. {
  218. NULL_GUARD(m_client);
  219. if(m_connected)
  220. {
  221. packet p(packet::type_disconnect,m_nsp);
  222. send_packet(p);
  223. if(!m_connection_timer)
  224. {
  225. m_connection_timer.reset(new boost::asio::deadline_timer(m_client->get_io_service()));
  226. }
  227. boost::system::error_code ec;
  228. m_connection_timer->expires_from_now(boost::posix_time::milliseconds(3000), ec);
  229. m_connection_timer->async_wait(lib::bind(&socket::impl::on_close, this));
  230. }
  231. }
  232. void socket::impl::on_connected()
  233. {
  234. if(m_connection_timer)
  235. {
  236. m_connection_timer->cancel();
  237. m_connection_timer.reset();
  238. }
  239. if(!m_connected)
  240. {
  241. m_connected = true;
  242. m_client->on_socket_opened(m_nsp);
  243. while (true) {
  244. m_packet_mutex.lock();
  245. if(m_packet_queue.empty())
  246. {
  247. m_packet_mutex.unlock();
  248. return;
  249. }
  250. sio::packet front_pack = std::move(m_packet_queue.front());
  251. m_packet_queue.pop();
  252. m_packet_mutex.unlock();
  253. m_client->send(front_pack);
  254. }
  255. }
  256. }
  257. void socket::impl::on_close()
  258. {
  259. NULL_GUARD(m_client);
  260. sio::client_impl *client = m_client;
  261. m_client = NULL;
  262. if(m_connection_timer)
  263. {
  264. m_connection_timer->cancel();
  265. m_connection_timer.reset();
  266. }
  267. m_connected = false;
  268. {
  269. std::lock_guard<std::mutex> guard(m_packet_mutex);
  270. while (!m_packet_queue.empty()) {
  271. m_packet_queue.pop();
  272. }
  273. }
  274. client->on_socket_closed(m_nsp);
  275. client->remove_socket(m_nsp);
  276. }
  277. void socket::impl::on_open()
  278. {
  279. send_connect();
  280. }
  281. void socket::impl::on_disconnect()
  282. {
  283. NULL_GUARD(m_client);
  284. if(m_connected)
  285. {
  286. m_connected = false;
  287. std::lock_guard<std::mutex> guard(m_packet_mutex);
  288. while (!m_packet_queue.empty()) {
  289. m_packet_queue.pop();
  290. }
  291. }
  292. }
  293. void socket::impl::on_message_packet(packet const& p)
  294. {
  295. NULL_GUARD(m_client);
  296. if(p.get_nsp() == m_nsp)
  297. {
  298. switch (p.get_type())
  299. {
  300. // Connect open
  301. case packet::type_connect:
  302. {
  303. LOG("Received Message type (Connect)"<<std::endl);
  304. this->on_connected();
  305. break;
  306. }
  307. case packet::type_disconnect:
  308. {
  309. LOG("Received Message type (Disconnect)"<<std::endl);
  310. this->on_close();
  311. break;
  312. }
  313. case packet::type_event:
  314. case packet::type_binary_event:
  315. {
  316. LOG("Received Message type (Event)"<<std::endl);
  317. const message::ptr ptr = p.get_message();
  318. if(ptr->get_flag() == message::flag_array)
  319. {
  320. const array_message* array_ptr = static_cast<const array_message*>(ptr.get());
  321. if(array_ptr->get_vector().size() >= 1&&array_ptr->get_vector()[0]->get_flag() == message::flag_string)
  322. {
  323. const string_message* name_ptr = static_cast<const string_message*>(array_ptr->get_vector()[0].get());
  324. message::list mlist;
  325. for(size_t i = 1;i<array_ptr->get_vector().size();++i)
  326. {
  327. mlist.push(array_ptr->get_vector()[i]);
  328. }
  329. this->on_socketio_event(p.get_nsp(), p.get_pack_id(),name_ptr->get_string(), std::move(mlist));
  330. }
  331. }
  332. break;
  333. }
  334. // Ack
  335. case packet::type_ack:
  336. case packet::type_binary_ack:
  337. {
  338. LOG("Received Message type (ACK)"<<std::endl);
  339. const message::ptr ptr = p.get_message();
  340. if(ptr->get_flag() == message::flag_array)
  341. {
  342. message::list msglist(ptr->get_vector());
  343. this->on_socketio_ack(p.get_pack_id(),msglist);
  344. }
  345. else
  346. {
  347. this->on_socketio_ack(p.get_pack_id(),message::list(ptr));
  348. }
  349. break;
  350. }
  351. // Error
  352. case packet::type_error:
  353. {
  354. LOG("Received Message type (ERROR)"<<std::endl);
  355. this->on_socketio_error(p.get_message());
  356. break;
  357. }
  358. default:
  359. break;
  360. }
  361. }
  362. }
  363. void socket::impl::on_socketio_event(const std::string& nsp,int msgId,const std::string& name, message::list && message)
  364. {
  365. bool needAck = msgId >= 0;
  366. event ev = event_adapter::create_event(nsp,name, std::move(message),needAck);
  367. event_listener func = this->get_bind_listener_locked(name);
  368. if(func)func(ev);
  369. if(needAck)
  370. {
  371. this->ack(msgId, name, ev.get_ack_message());
  372. }
  373. }
  374. void socket::impl::ack(int msgId, const string &, const message::list &ack_message)
  375. {
  376. packet p(m_nsp, ack_message.to_array_message(),msgId,true);
  377. send_packet(p);
  378. }
  379. void socket::impl::on_socketio_ack(int msgId, message::list const& message)
  380. {
  381. std::function<void (message::list const&)> l;
  382. {
  383. std::lock_guard<std::mutex> guard(m_event_mutex);
  384. auto it = m_acks.find(msgId);
  385. if(it!=m_acks.end())
  386. {
  387. l = it->second;
  388. m_acks.erase(it);
  389. }
  390. }
  391. if(l)l(message);
  392. }
  393. void socket::impl::on_socketio_error(message::ptr const& err_message)
  394. {
  395. if(m_error_listener)m_error_listener(err_message);
  396. }
  397. void socket::impl::timeout_connection(const boost::system::error_code &ec)
  398. {
  399. NULL_GUARD(m_client);
  400. if(ec)
  401. {
  402. return;
  403. }
  404. m_connection_timer.reset();
  405. LOG("Connection timeout,close socket."<<std::endl);
  406. //Should close socket if no connected message arrive.Otherwise we'll never ask for open again.
  407. this->on_close();
  408. }
  409. void socket::impl::send_packet(sio::packet &p)
  410. {
  411. NULL_GUARD(m_client);
  412. if(m_connected)
  413. {
  414. while (true) {
  415. m_packet_mutex.lock();
  416. if(m_packet_queue.empty())
  417. {
  418. m_packet_mutex.unlock();
  419. break;
  420. }
  421. sio::packet front_pack = std::move(m_packet_queue.front());
  422. m_packet_queue.pop();
  423. m_packet_mutex.unlock();
  424. m_client->send(front_pack);
  425. }
  426. m_client->send(p);
  427. }
  428. else
  429. {
  430. std::lock_guard<std::mutex> guard(m_packet_mutex);
  431. m_packet_queue.push(p);
  432. }
  433. }
  434. socket::event_listener socket::impl::get_bind_listener_locked(const string &event)
  435. {
  436. std::lock_guard<std::mutex> guard(m_event_mutex);
  437. auto it = m_event_binding.find(event);
  438. if(it!=m_event_binding.end())
  439. {
  440. return it->second;
  441. }
  442. return socket::event_listener();
  443. }
  444. socket::socket(client_impl* client,std::string const& nsp):
  445. m_impl(new impl(client,nsp))
  446. {
  447. }
  448. socket::~socket()
  449. {
  450. delete m_impl;
  451. }
  452. void socket::on(std::string const& event_name,event_listener const& func)
  453. {
  454. m_impl->on(event_name, func);
  455. }
  456. void socket::on(std::string const& event_name,event_listener_aux const& func)
  457. {
  458. m_impl->on(event_name, func);
  459. }
  460. void socket::off(std::string const& event_name)
  461. {
  462. m_impl->off(event_name);
  463. }
  464. void socket::off_all()
  465. {
  466. m_impl->off_all();
  467. }
  468. void socket::close()
  469. {
  470. m_impl->close();
  471. }
  472. void socket::on_error(error_listener const& l)
  473. {
  474. m_impl->on_error(l);
  475. }
  476. void socket::off_error()
  477. {
  478. m_impl->off_error();
  479. }
  480. void socket::emit(std::string const& name, message::list const& msglist, std::function<void (message::list const&)> const& ack)
  481. {
  482. m_impl->emit(name, msglist,ack);
  483. }
  484. std::string const& socket::get_namespace() const
  485. {
  486. return m_impl->get_namespace();
  487. }
  488. void socket::on_connected()
  489. {
  490. m_impl->on_connected();
  491. }
  492. void socket::on_close()
  493. {
  494. m_impl->on_close();
  495. }
  496. void socket::on_open()
  497. {
  498. m_impl->on_open();
  499. }
  500. void socket::on_message_packet(packet const& p)
  501. {
  502. m_impl->on_message_packet(p);
  503. }
  504. void socket::on_disconnect()
  505. {
  506. m_impl->on_disconnect();
  507. }
  508. }