123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577 |
- /**
- * Module dependencies.
- */
- var eio = require('engine.io-client');
- var Socket = require('./socket');
- var Emitter = require('component-emitter');
- var parser = require('socket.io-parser');
- var on = require('./on');
- var bind = require('component-bind');
- var debug = require('debug')('socket.io-client:manager');
- var indexOf = require('indexof');
- var Backoff = require('backo2');
- /**
- * IE6+ hasOwnProperty
- */
- var has = Object.prototype.hasOwnProperty;
- /**
- * Module exports
- */
- module.exports = Manager;
- /**
- * `Manager` constructor.
- *
- * @param {String} engine instance or engine uri/opts
- * @param {Object} options
- * @api public
- */
- function Manager (uri, opts) {
- if (!(this instanceof Manager)) return new Manager(uri, opts);
- if (uri && ('object' === typeof uri)) {
- opts = uri;
- uri = undefined;
- }
- opts = opts || {};
- opts.path = opts.path || '/socket.io';
- this.nsps = {};
- this.subs = [];
- this.opts = opts;
- this.reconnection(opts.reconnection !== false);
- this.reconnectionAttempts(opts.reconnectionAttempts || Infinity);
- this.reconnectionDelay(opts.reconnectionDelay || 1000);
- this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000);
- this.randomizationFactor(opts.randomizationFactor || 0.5);
- this.backoff = new Backoff({
- min: this.reconnectionDelay(),
- max: this.reconnectionDelayMax(),
- jitter: this.randomizationFactor()
- });
- this.timeout(null == opts.timeout ? 20000 : opts.timeout);
- this.readyState = 'closed';
- this.uri = uri;
- this.connecting = [];
- this.lastPing = null;
- this.encoding = false;
- this.packetBuffer = [];
- var _parser = opts.parser || parser;
- this.encoder = new _parser.Encoder();
- this.decoder = new _parser.Decoder();
- this.autoConnect = opts.autoConnect !== false;
- if (this.autoConnect) this.open();
- }
- /**
- * Propagate given event to sockets and emit on `this`
- *
- * @api private
- */
- Manager.prototype.emitAll = function () {
- this.emit.apply(this, arguments);
- for (var nsp in this.nsps) {
- if (has.call(this.nsps, nsp)) {
- this.nsps[nsp].emit.apply(this.nsps[nsp], arguments);
- }
- }
- };
- /**
- * Update `socket.id` of all sockets
- *
- * @api private
- */
- Manager.prototype.updateSocketIds = function () {
- for (var nsp in this.nsps) {
- if (has.call(this.nsps, nsp)) {
- this.nsps[nsp].id = this.generateId(nsp);
- }
- }
- };
- /**
- * generate `socket.id` for the given `nsp`
- *
- * @param {String} nsp
- * @return {String}
- * @api private
- */
- Manager.prototype.generateId = function (nsp) {
- return (nsp === '/' ? '' : (nsp + '#')) + this.engine.id;
- };
- /**
- * Mix in `Emitter`.
- */
- Emitter(Manager.prototype);
- /**
- * Sets the `reconnection` config.
- *
- * @param {Boolean} true/false if it should automatically reconnect
- * @return {Manager} self or value
- * @api public
- */
- Manager.prototype.reconnection = function (v) {
- if (!arguments.length) return this._reconnection;
- this._reconnection = !!v;
- return this;
- };
- /**
- * Sets the reconnection attempts config.
- *
- * @param {Number} max reconnection attempts before giving up
- * @return {Manager} self or value
- * @api public
- */
- Manager.prototype.reconnectionAttempts = function (v) {
- if (!arguments.length) return this._reconnectionAttempts;
- this._reconnectionAttempts = v;
- return this;
- };
- /**
- * Sets the delay between reconnections.
- *
- * @param {Number} delay
- * @return {Manager} self or value
- * @api public
- */
- Manager.prototype.reconnectionDelay = function (v) {
- if (!arguments.length) return this._reconnectionDelay;
- this._reconnectionDelay = v;
- this.backoff && this.backoff.setMin(v);
- return this;
- };
- Manager.prototype.randomizationFactor = function (v) {
- if (!arguments.length) return this._randomizationFactor;
- this._randomizationFactor = v;
- this.backoff && this.backoff.setJitter(v);
- return this;
- };
- /**
- * Sets the maximum delay between reconnections.
- *
- * @param {Number} delay
- * @return {Manager} self or value
- * @api public
- */
- Manager.prototype.reconnectionDelayMax = function (v) {
- if (!arguments.length) return this._reconnectionDelayMax;
- this._reconnectionDelayMax = v;
- this.backoff && this.backoff.setMax(v);
- return this;
- };
- /**
- * Sets the connection timeout. `false` to disable
- *
- * @return {Manager} self or value
- * @api public
- */
- Manager.prototype.timeout = function (v) {
- if (!arguments.length) return this._timeout;
- this._timeout = v;
- return this;
- };
- /**
- * Starts trying to reconnect if reconnection is enabled and we have not
- * started reconnecting yet
- *
- * @api private
- */
- Manager.prototype.maybeReconnectOnOpen = function () {
- // Only try to reconnect if it's the first time we're connecting
- if (!this.reconnecting && this._reconnection && this.backoff.attempts === 0) {
- // keeps reconnection from firing twice for the same reconnection loop
- this.reconnect();
- }
- };
- /**
- * Sets the current transport `socket`.
- *
- * @param {Function} optional, callback
- * @return {Manager} self
- * @api public
- */
- Manager.prototype.open =
- Manager.prototype.connect = function (fn, opts) {
- debug('readyState %s', this.readyState);
- if (~this.readyState.indexOf('open')) return this;
- debug('opening %s', this.uri);
- this.engine = eio(this.uri, this.opts);
- var socket = this.engine;
- var self = this;
- this.readyState = 'opening';
- this.skipReconnect = false;
- // emit `open`
- var openSub = on(socket, 'open', function () {
- self.onopen();
- fn && fn();
- });
- // emit `connect_error`
- var errorSub = on(socket, 'error', function (data) {
- debug('connect_error');
- self.cleanup();
- self.readyState = 'closed';
- self.emitAll('connect_error', data);
- if (fn) {
- var err = new Error('Connection error');
- err.data = data;
- fn(err);
- } else {
- // Only do this if there is no fn to handle the error
- self.maybeReconnectOnOpen();
- }
- });
- // emit `connect_timeout`
- if (false !== this._timeout) {
- var timeout = this._timeout;
- debug('connect attempt will timeout after %d', timeout);
- if (timeout === 0) {
- openSub.destroy(); // prevents a race condition with the 'open' event
- }
- // set timer
- var timer = setTimeout(function () {
- debug('connect attempt timed out after %d', timeout);
- openSub.destroy();
- socket.close();
- socket.emit('error', 'timeout');
- self.emitAll('connect_timeout', timeout);
- }, timeout);
- this.subs.push({
- destroy: function () {
- clearTimeout(timer);
- }
- });
- }
- this.subs.push(openSub);
- this.subs.push(errorSub);
- return this;
- };
- /**
- * Called upon transport open.
- *
- * @api private
- */
- Manager.prototype.onopen = function () {
- debug('open');
- // clear old subs
- this.cleanup();
- // mark as open
- this.readyState = 'open';
- this.emit('open');
- // add new subs
- var socket = this.engine;
- this.subs.push(on(socket, 'data', bind(this, 'ondata')));
- this.subs.push(on(socket, 'ping', bind(this, 'onping')));
- this.subs.push(on(socket, 'pong', bind(this, 'onpong')));
- this.subs.push(on(socket, 'error', bind(this, 'onerror')));
- this.subs.push(on(socket, 'close', bind(this, 'onclose')));
- this.subs.push(on(this.decoder, 'decoded', bind(this, 'ondecoded')));
- };
- /**
- * Called upon a ping.
- *
- * @api private
- */
- Manager.prototype.onping = function () {
- this.lastPing = new Date();
- this.emitAll('ping');
- };
- /**
- * Called upon a packet.
- *
- * @api private
- */
- Manager.prototype.onpong = function () {
- this.emitAll('pong', new Date() - this.lastPing);
- };
- /**
- * Called with data.
- *
- * @api private
- */
- Manager.prototype.ondata = function (data) {
- this.decoder.add(data);
- };
- /**
- * Called when parser fully decodes a packet.
- *
- * @api private
- */
- Manager.prototype.ondecoded = function (packet) {
- this.emit('packet', packet);
- };
- /**
- * Called upon socket error.
- *
- * @api private
- */
- Manager.prototype.onerror = function (err) {
- debug('error', err);
- this.emitAll('error', err);
- };
- /**
- * Creates a new socket for the given `nsp`.
- *
- * @return {Socket}
- * @api public
- */
- Manager.prototype.socket = function (nsp, opts) {
- var socket = this.nsps[nsp];
- if (!socket) {
- socket = new Socket(this, nsp, opts);
- this.nsps[nsp] = socket;
- var self = this;
- socket.on('connecting', onConnecting);
- socket.on('connect', function () {
- socket.id = self.generateId(nsp);
- });
- if (this.autoConnect) {
- // manually call here since connecting event is fired before listening
- onConnecting();
- }
- }
- function onConnecting () {
- if (!~indexOf(self.connecting, socket)) {
- self.connecting.push(socket);
- }
- }
- return socket;
- };
- /**
- * Called upon a socket close.
- *
- * @param {Socket} socket
- */
- Manager.prototype.destroy = function (socket) {
- var index = indexOf(this.connecting, socket);
- if (~index) this.connecting.splice(index, 1);
- if (this.connecting.length) return;
- this.close();
- };
- /**
- * Writes a packet.
- *
- * @param {Object} packet
- * @api private
- */
- Manager.prototype.packet = function (packet) {
- debug('writing packet %j', packet);
- var self = this;
- if (packet.query && packet.type === 0) packet.nsp += '?' + packet.query;
- if (!self.encoding) {
- // encode, then write to engine with result
- self.encoding = true;
- this.encoder.encode(packet, function (encodedPackets) {
- for (var i = 0; i < encodedPackets.length; i++) {
- self.engine.write(encodedPackets[i], packet.options);
- }
- self.encoding = false;
- self.processPacketQueue();
- });
- } else { // add packet to the queue
- self.packetBuffer.push(packet);
- }
- };
- /**
- * If packet buffer is non-empty, begins encoding the
- * next packet in line.
- *
- * @api private
- */
- Manager.prototype.processPacketQueue = function () {
- if (this.packetBuffer.length > 0 && !this.encoding) {
- var pack = this.packetBuffer.shift();
- this.packet(pack);
- }
- };
- /**
- * Clean up transport subscriptions and packet buffer.
- *
- * @api private
- */
- Manager.prototype.cleanup = function () {
- debug('cleanup');
- var subsLength = this.subs.length;
- for (var i = 0; i < subsLength; i++) {
- var sub = this.subs.shift();
- sub.destroy();
- }
- this.packetBuffer = [];
- this.encoding = false;
- this.lastPing = null;
- this.decoder.destroy();
- };
- /**
- * Close the current socket.
- *
- * @api private
- */
- Manager.prototype.close =
- Manager.prototype.disconnect = function () {
- debug('disconnect');
- this.skipReconnect = true;
- this.reconnecting = false;
- if ('opening' === this.readyState) {
- // `onclose` will not fire because
- // an open event never happened
- this.cleanup();
- }
- this.backoff.reset();
- this.readyState = 'closed';
- if (this.engine) this.engine.close();
- };
- /**
- * Called upon engine close.
- *
- * @api private
- */
- Manager.prototype.onclose = function (reason) {
- debug('onclose');
- this.cleanup();
- this.backoff.reset();
- this.readyState = 'closed';
- this.emit('close', reason);
- if (this._reconnection && !this.skipReconnect) {
- this.reconnect();
- }
- };
- /**
- * Attempt a reconnection.
- *
- * @api private
- */
- Manager.prototype.reconnect = function () {
- if (this.reconnecting || this.skipReconnect) return this;
- var self = this;
- if (this.backoff.attempts >= this._reconnectionAttempts) {
- debug('reconnect failed');
- this.backoff.reset();
- this.emitAll('reconnect_failed');
- this.reconnecting = false;
- } else {
- var delay = this.backoff.duration();
- debug('will wait %dms before reconnect attempt', delay);
- this.reconnecting = true;
- var timer = setTimeout(function () {
- if (self.skipReconnect) return;
- debug('attempting reconnect');
- self.emitAll('reconnect_attempt', self.backoff.attempts);
- self.emitAll('reconnecting', self.backoff.attempts);
- // check again for the case socket closed in above events
- if (self.skipReconnect) return;
- self.open(function (err) {
- if (err) {
- debug('reconnect attempt error');
- self.reconnecting = false;
- self.reconnect();
- self.emitAll('reconnect_error', err.data);
- } else {
- debug('reconnect success');
- self.onreconnect();
- }
- });
- }, delay);
- this.subs.push({
- destroy: function () {
- clearTimeout(timer);
- }
- });
- }
- };
- /**
- * Called upon successful reconnect.
- *
- * @api private
- */
- Manager.prototype.onreconnect = function () {
- var attempt = this.backoff.attempts;
- this.reconnecting = false;
- this.backoff.reset();
- this.updateSocketIds();
- this.emitAll('reconnect', attempt);
- };
|