You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

strophe.stream-management.js 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. /* eslint-disable */
  2. import { $build, Strophe } from 'strophe.js';
  3. /**
  4. * StropheJS - Stream Management XEP-0198
  5. *
  6. * This plugin implements stream management ACK capabilities of the specs XEP-0198.
  7. * Note: Resumption is not supported in this current implementation.
  8. *
  9. * Reference: http://xmpp.org/extensions/xep-0198.html
  10. *
  11. * @class streamManagement
  12. */
  13. Strophe.addConnectionPlugin('streamManagement', {
  14. /**
  15. * @property {Boolean} logging: Set to true to enable logging regarding out of sync stanzas.
  16. */
  17. logging: false,
  18. /**
  19. * @property {Boolean} autoSendCountOnEveryIncomingStanza: Set to true to send an 'a' response after every stanza.
  20. * @default false
  21. * @public
  22. */
  23. autoSendCountOnEveryIncomingStanza: false,
  24. /**
  25. * @property {Integer} requestResponseInterval: Set this value to send a request for counter on very interval
  26. * number of stanzas sent. Set to 0 to disable.
  27. * @default 5
  28. * @public
  29. */
  30. requestResponseInterval: 5,
  31. /**
  32. * @property {Pointer} _c: Strophe connection instance.
  33. * @private
  34. */
  35. _c: null,
  36. /**
  37. * @property {String} _NS XMPP Namespace.
  38. * @private
  39. */
  40. _NS: 'urn:xmpp:sm:3',
  41. /**
  42. * @property {Boolean} _isStreamManagementEnabled
  43. * @private
  44. */
  45. _isStreamManagementEnabled: false,
  46. /**
  47. * @property {Integer} _serverProcesssedStanzasCounter: Keeps count of stanzas confirmed processed by the server.
  48. * The server is the source of truth of this value. It is the 'h' attribute on the latest 'a' element received
  49. * from the server.
  50. * @private
  51. */
  52. _serverProcesssedStanzasCounter: null,
  53. /**
  54. * @property {Integer} _clientProcessedStanzasCounter: Counter of stanzas received by the client from the server.
  55. * Client is the source of truth of this value. It is the 'h' attribute in the 'a' sent from the client to
  56. * the server.
  57. * @private
  58. */
  59. _clientProcessedStanzasCounter: null,
  60. /**
  61. * @property {Integer} _clientSentStanzasCounter
  62. * @private
  63. */
  64. _clientSentStanzasCounter: null,
  65. /**
  66. * Stores a reference to Strophe connection xmlOutput function to wrap counting functionality.
  67. * @method _originalXMLOutput
  68. * @type {Handler}
  69. * @private
  70. */
  71. _originalXMLOutput: null,
  72. /**
  73. * @property {Handler} _requestHandler: Stores reference to handler that process count request from server.
  74. * @private
  75. */
  76. _requestHandler: null,
  77. /**
  78. * @property {Handler} _incomingHandler: Stores reference to handler that processes incoming stanzas count.
  79. * @private
  80. */
  81. _incomingHandler: null,
  82. /**
  83. * @property {Integer} _requestResponseIntervalCount: Counts sent stanzas since last response request.
  84. */
  85. _requestResponseIntervalCount: 0,
  86. /**
  87. * @property {boolean} _isSupported: indicates whether or not the server has advertised support for the stream
  88. * management namespace.
  89. */
  90. _isSupported: false,
  91. /**
  92. * @property {Queue} _unacknowledgedStanzas: Maintains a list of packet ids for stanzas which have yet to be acknowledged.
  93. */
  94. _unacknowledgedStanzas: [],
  95. /**
  96. * @property {Array} _acknowledgedStanzaListeners: Stores callbacks for each stanza acknowledged by the server.
  97. * Provides the packet id of the stanza as a parameter.
  98. * @private
  99. */
  100. _acknowledgedStanzaListeners: [],
  101. addAcknowledgedStanzaListener: function(listener) {
  102. this._acknowledgedStanzaListeners.push(listener);
  103. },
  104. enable: function(resume) {
  105. if (!this._isSupported) {
  106. throw new Error('The server doesn\'t support urn:xmpp:sm:3 namespace');
  107. } else if (this._connectionStatus !== Strophe.Status.CONNECTED) {
  108. throw new Error('enable() can only be called in the CONNECTED state');
  109. }
  110. this._c.send($build('enable', { xmlns: this._NS, resume }));
  111. this._c.flush();
  112. this._c.pause();
  113. },
  114. getResumeToken: function() {
  115. return this._resumeToken;
  116. },
  117. isSupported() {
  118. return this._isSupported;
  119. },
  120. resume: function() {
  121. if (!this.getResumeToken()) {
  122. throw new Error('No resume token');
  123. }
  124. if (this._connectionStatus !== Strophe.Status.DISCONNECTED) {
  125. throw new Error('resume() can only be called in the DISCONNECTED state');
  126. }
  127. this._c.options.explicitResourceBinding = true;
  128. this._resuming = true;
  129. this._originalConnect.apply(this._c, this._connectArgs);
  130. },
  131. requestAcknowledgement: function() {
  132. if (this._connectionStatus !== Strophe.Status.CONNECTED) {
  133. throw new Error('requestAcknowledgement() can only be called in the CONNECTED state');
  134. }
  135. this._requestResponseIntervalCount = 0;
  136. this._c.send($build('r', { xmlns: this._NS }));
  137. },
  138. getOutgoingCounter: function() {
  139. return this._clientSentStanzasCounter;
  140. },
  141. getIncomingCounter: function() {
  142. return this._clientProcessedStanzasCounter;
  143. },
  144. init: function(conn) {
  145. this._c = conn;
  146. Strophe.addNamespace('SM', this._NS);
  147. // Storing original xmlOutput function to use additional logic
  148. this._originalXMLOutput = this._c.xmlOutput;
  149. this._c.xmlOutput = this.xmlOutput.bind(this);
  150. this._originalConnect = this._c.connect;
  151. this._c.connect = this._interceptConnectArgs.bind(this);
  152. this._originalOnStreamFeaturesAfterSASL = this._c._onStreamFeaturesAfterSASL;
  153. this._c._onStreamFeaturesAfterSASL = this._onStreamFeaturesAfterSASL.bind(this);
  154. this._originalDoDisconnect = this._c._doDisconnect;
  155. this._c._doDisconnect = this._interceptDoDisconnect.bind(this);
  156. this._originalDisconnect = this._c.disconnect;
  157. this._c.disconnect = this._interceptDisconnect.bind(this);
  158. },
  159. _interceptDisconnect: function() {
  160. this._resumeToken = undefined;
  161. this._originalDisconnect.apply(this._c, arguments);
  162. },
  163. _interceptDoDisconnect: function() {
  164. if (this.getResumeToken()
  165. && !this._resuming
  166. && this._c.connected && !this._c.disconnecting) {
  167. this._resumeState = {
  168. handlers: this._c.handlers,
  169. timedHandlers: this._c.timedHandlers,
  170. removeTimeds: this._c.removeTimeds,
  171. removeHandlers: this._c.removeHandlers,
  172. addTimeds: this._c.addTimeds,
  173. addHandlers: this._c.addHandlers
  174. };
  175. this._storedJid = this._c.jid;
  176. this.logging && Strophe.debug('SM stored resume state, handler count: ' + this._resumeState.handlers.length);
  177. }
  178. // Remove any queued stanzas from the buffer that have failed to send while the socket was closed,
  179. // as they would interfere with the resume flow. They will be resent anyway.
  180. this._c._data = [];
  181. this._originalDoDisconnect.apply(this._c, arguments);
  182. },
  183. _interceptConnectArgs: function() {
  184. this._connectArgs = arguments;
  185. this._originalConnect.apply(this._c, arguments);
  186. },
  187. _onStreamFeaturesAfterSASL: function(elem) {
  188. this._isSupported = elem.getElementsByTagNameNS(this._NS, "sm").length > 0;
  189. return this._originalOnStreamFeaturesAfterSASL.apply(this._c, arguments);
  190. },
  191. statusChanged: function (status) {
  192. this._connectionStatus = status;
  193. if (!this.getResumeToken()
  194. && (status === Strophe.Status.CONNECTED || status === Strophe.Status.DISCONNECTED)) {
  195. this.logging && Strophe.debug('SM reset state');
  196. this._serverProcesssedStanzasCounter = 0;
  197. this._clientProcessedStanzasCounter = 0;
  198. this._clientSentStanzasCounter = 0;
  199. this._isStreamManagementEnabled = false;
  200. this._requestResponseIntervalCount = 0;
  201. // FIXME not described in JSDocs
  202. this._resuming = false;
  203. if (status === Strophe.Status.DISCONNECTED) {
  204. this._isSupported = false;
  205. }
  206. this._unacknowledgedStanzas = [];
  207. if (this._requestHandler) {
  208. this._c.deleteHandler(this._requestHandler);
  209. }
  210. if (this._incomingHandler) {
  211. this._c.deleteHandler(this._incomingHandler);
  212. }
  213. this._requestHandler = this._c.addHandler(this._handleServerRequestHandler.bind(this), this._NS, 'r');
  214. this._ackHandler = this._c.addHandler(this._handleServerAck.bind(this), this._NS, 'a');
  215. this._incomingHandler = this._c.addHandler(this._incomingStanzaHandler.bind(this));
  216. // FIXME handler instances stored, but never used
  217. this._enabledHandler = this._c._addSysHandler(this._handleEnabled.bind(this), this._NS, 'enabled');
  218. this._resumeFailedHandler = this._c._addSysHandler(this._handleResumeFailed.bind(this), this._NS, 'failed');
  219. this._resumedHandler = this._c._addSysHandler(this._handleResumed.bind(this), this._NS,'resumed');
  220. } else if (status === Strophe.Status.BINDREQUIRED) {
  221. this._c.jid = this._storedJid;
  222. // Restore Strophe handlers
  223. for (const h of (this._resumeState.handlers || [])
  224. .concat(this._resumeState.addHandlers || [])) {
  225. this._c._addSysHandler(h.handler, h.ns, h.name, h.type, h.id);
  226. }
  227. for (const h of (this._resumeState.timedHandlers || [])
  228. .concat(this._resumeState.addTimeds)) {
  229. this._c.addTimedHandler(h.period, h.handler);
  230. }
  231. for (const h of (this._resumeState.removeTimeds || [])
  232. .concat(this._resumeState.removeHandlers || [])) {
  233. this._c.deleteTimedHandler(h);
  234. }
  235. // FIXME check conditions if there's session ID and if enabled
  236. this._c.send($build('resume', {
  237. xmlns: this._NS,
  238. h: this._clientProcessedStanzasCounter,
  239. previd: this._resumeToken
  240. }));
  241. this._c.flush();
  242. } else if (status === Strophe.Status.ERROR) {
  243. this.logging && Strophe.debug('SM cleared resume token on error');
  244. this._resumeToken = undefined;
  245. }
  246. },
  247. /**
  248. * This method overrides the send method implemented by Strophe.Connection
  249. * to count outgoing stanzas
  250. *
  251. * @method Send
  252. * @public
  253. */
  254. xmlOutput: function(elem) {
  255. if (Strophe.isTagEqual(elem, 'iq') ||
  256. Strophe.isTagEqual(elem, 'presence') ||
  257. Strophe.isTagEqual(elem, 'message')) {
  258. this._increaseSentStanzasCounter(elem);
  259. }
  260. return this._originalXMLOutput.call(this._c, elem);
  261. },
  262. _handleEnabled: function(elem) {
  263. this._isStreamManagementEnabled = true;
  264. // FIXME fail if requested, but not enabled
  265. this._resumeToken = elem.getAttribute('resume') === 'true' && elem.getAttribute('id');
  266. this._c.resume();
  267. return true;
  268. },
  269. _handleResumeFailed: function(elem) {
  270. const error = elem && (
  271. (elem.firstElementChild && elem.firstElementChild.tagName)
  272. || (elem.firstChild && elem.firstChild.tagName));
  273. this._c._changeConnectStatus(Strophe.Status.ERROR, error, elem);
  274. this._c._doDisconnect();
  275. return true;
  276. },
  277. _handleResumed: function(elem) {
  278. // FIXME check if in the correct state
  279. var handledCount = parseInt(elem.getAttribute('h'));
  280. this._handleAcknowledgedStanzas(handledCount, this._serverProcesssedStanzasCounter);
  281. this._resuming = false;
  282. this._c.do_bind = false; // No need to bind our resource anymore
  283. this._c.authenticated = true;
  284. this._c.restored = true;
  285. if (this._unacknowledgedStanzas.length > 0) {
  286. this.logging && Strophe.debug('SM Sending unacknowledged stanzas', this._unacknowledgedStanzas);
  287. for(const stanza of this._unacknowledgedStanzas) {
  288. this._c.send(stanza);
  289. }
  290. } else {
  291. this.logging && Strophe.debug('SM No unacknowledged stanzas', this._unacknowledgedStanzas);
  292. }
  293. this._c._changeConnectStatus(Strophe.Status.CONNECTED, null);
  294. return true;
  295. },
  296. _incomingStanzaHandler: function(elem) {
  297. if (Strophe.isTagEqual(elem, 'iq') || Strophe.isTagEqual(elem, 'presence') || Strophe.isTagEqual(elem, 'message')) {
  298. this._increaseReceivedStanzasCounter();
  299. if (this.autoSendCountOnEveryIncomingStanza) {
  300. this._answerProcessedStanzas();
  301. }
  302. }
  303. return true;
  304. },
  305. _handleAcknowledgedStanzas: function(reportedHandledCount, lastKnownHandledCount) {
  306. var delta = reportedHandledCount - lastKnownHandledCount;
  307. if (delta < 0) {
  308. this._throwError('New reported stanza count lower than previous. New: ' + reportedHandledCount + ' - Previous: ' + lastKnownHandledCount);
  309. }
  310. if (delta > this._unacknowledgedStanzas.length) {
  311. this._throwError('Higher reported acknowledge count than unacknowledged stanzas. Reported Acknowledge Count: ' + delta + ' - Unacknowledge Stanza Count: ' + this._unacknowledgedStanzas.length + ' - New: ' + reportedHandledCount + ' - Previous: ' + lastKnownHandledCount);
  312. }
  313. for(var i = 0; i < delta; i++) {
  314. var stanza = this._unacknowledgedStanzas.shift();
  315. for (var j = 0; j < this._acknowledgedStanzaListeners.length; j++) {
  316. this._acknowledgedStanzaListeners[j](stanza);
  317. }
  318. }
  319. if (this.logging && this._unacknowledgedStanzas.length > 0) {
  320. Strophe.warn('SM Unacknowledged stanzas', this._unacknowledgedStanzas);
  321. }
  322. this._serverProcesssedStanzasCounter = reportedHandledCount;
  323. if (this.requestResponseInterval > 0) {
  324. this._requestResponseIntervalCount = 0;
  325. }
  326. },
  327. _handleServerRequestHandler: function() {
  328. this._answerProcessedStanzas();
  329. return true;
  330. },
  331. _handleServerAck: function(elem){
  332. var handledCount = parseInt(elem.getAttribute('h'));
  333. this._handleAcknowledgedStanzas(handledCount, this._serverProcesssedStanzasCounter);
  334. return true;
  335. },
  336. _answerProcessedStanzas: function() {
  337. if (this._isStreamManagementEnabled) {
  338. this._c.send($build('a', { xmlns: this._NS, h: this._clientProcessedStanzasCounter }));
  339. }
  340. },
  341. _increaseSentStanzasCounter: function(elem) {
  342. if (this._isStreamManagementEnabled) {
  343. if (this._unacknowledgedStanzas.indexOf(elem) !== -1) {
  344. return;
  345. }
  346. this._unacknowledgedStanzas.push(elem);
  347. this._clientSentStanzasCounter++;
  348. if (this.requestResponseInterval > 0) {
  349. this._requestResponseIntervalCount++;
  350. if (this._requestResponseIntervalCount === this.requestResponseInterval) {
  351. // FIXME Can not call send from onIdle.
  352. setTimeout(() => {
  353. if (this._connectionStatus === Strophe.Status.CONNECTED) {
  354. this.requestAcknowledgement();
  355. }
  356. }, 1);
  357. }
  358. }
  359. }
  360. },
  361. _increaseReceivedStanzasCounter: function() {
  362. if (this._isStreamManagementEnabled) {
  363. this._clientProcessedStanzasCounter++;
  364. }
  365. },
  366. _throwError: function(msg) {
  367. Strophe.error(msg);
  368. throw new Error(msg);
  369. }
  370. });