浏览代码

feat(XmppConnection): sendIQ2

A new method which not only has Promise interface,
but also allows to queue IQs regardless of the current
connection's status.

Ping and send ICE failed notification requests are changed
to use this method which makes it more reliable on XMPP
Websocket connections on mobile(which in contrary to BOSH
dies much faster there).
dev1
paweldomas 5 年前
父节点
当前提交
61dca1d7b9

+ 9
- 0
modules/util/TestUtils.js 查看文件

1
+/* global process */
2
+/**
3
+ * Returns a Promise resolved after {@code process.nextTick}.
4
+ *
5
+ * @returns {Promise<void>}
6
+ */
7
+export function nextTick() {
8
+    return new Promise(resolve => process.nextTick(resolve));
9
+}

+ 10
- 9
modules/xmpp/JingleSessionPC.js 查看文件

719
                     id: this._bridgeSessionId
719
                     id: this._bridgeSessionId
720
                 });
720
                 });
721
 
721
 
722
-        this.connection.sendIQ(
723
-            sessionInfo,
724
-            null,
725
-            this.newJingleErrorHandler(sessionInfo),
726
-            /*
727
-             * This message will be often sent when there are connectivity
728
-             * issues, so make it slightly longer than Prosody's default BOSH
729
-             * inactivity timeout of 60 seconds.
730
-             */ 65);
722
+        this.connection.sendIQ2(
723
+            sessionInfo, {
724
+                /*
725
+                 * This message will be often sent when there are connectivity
726
+                 * issues, so make it slightly longer than Prosody's default BOSH
727
+                 * inactivity timeout of 60 seconds.
728
+                 */
729
+                timeout: 65
730
+            })
731
+            .catch(this.newJingleErrorHandler(sessionInfo));
731
     }
732
     }
732
 
733
 
733
     /**
734
     /**

+ 88
- 2
modules/xmpp/XmppConnection.js 查看文件

66
 
66
 
67
         this._lastSuccessTracker = new LastSuccessTracker();
67
         this._lastSuccessTracker = new LastSuccessTracker();
68
         this._lastSuccessTracker.startTracking(this._stropheConn);
68
         this._lastSuccessTracker.startTracking(this._stropheConn);
69
+
70
+        /**
71
+         * @typedef DeferredSendIQ Object
72
+         * @property {Element} iq - The IQ to send.
73
+         * @property {function} resolve - The resolve method of the deferred Promise.
74
+         * @property {function} reject - The reject method of the deferred Promise.
75
+         * @property {number} timeout - The ID of the timeout task that needs to be cleared, before sending the IQ.
76
+         */
77
+        /**
78
+         * Deferred IQs to be sent upon reconnect.
79
+         * @type {Array<DeferredSendIQ>}
80
+         * @private
81
+         */
82
+        this._deferredIQs = [];
69
     }
83
     }
70
 
84
 
71
     /**
85
     /**
225
 
239
 
226
         let blockCallback = false;
240
         let blockCallback = false;
227
 
241
 
228
-        if (status === Strophe.Status.CONNECTED) {
242
+        if (status === Strophe.Status.CONNECTED || status === Strophe.Status.ATTACHED) {
229
             this._maybeEnableStreamResume();
243
             this._maybeEnableStreamResume();
230
             this._maybeStartWSKeepAlive();
244
             this._maybeStartWSKeepAlive();
231
-            this._resumeRetryN = 0;
245
+            this._processDeferredIQs();
232
         } else if (status === Strophe.Status.DISCONNECTED) {
246
         } else if (status === Strophe.Status.DISCONNECTED) {
233
             // FIXME add RECONNECTING state instead of blocking the DISCONNECTED update
247
             // FIXME add RECONNECTING state instead of blocking the DISCONNECTED update
234
             blockCallback = this._tryResumingConnection();
248
             blockCallback = this._tryResumingConnection();
243
         }
257
         }
244
     }
258
     }
245
 
259
 
260
+    /**
261
+     * Clears the list of IQs and rejects deferred Promises with an error.
262
+     *
263
+     * @private
264
+     */
265
+    _clearDeferredIQs() {
266
+        for (const deferred of this._deferredIQs) {
267
+            deferred.reject(new Error('disconnect'));
268
+        }
269
+        this._deferredIQs = [];
270
+    }
271
+
246
     /**
272
     /**
247
      * The method is meant to be used for testing. It's a shortcut for closing the WebSocket.
273
      * The method is meant to be used for testing. It's a shortcut for closing the WebSocket.
248
      *
274
      *
260
     disconnect(...args) {
286
     disconnect(...args) {
261
         clearTimeout(this._resumeTimeout);
287
         clearTimeout(this._resumeTimeout);
262
         clearTimeout(this._wsKeepAlive);
288
         clearTimeout(this._wsKeepAlive);
289
+        this._clearDeferredIQs();
263
         this._stropheConn.disconnect(...args);
290
         this._stropheConn.disconnect(...args);
264
     }
291
     }
265
 
292
 
336
         }
363
         }
337
     }
364
     }
338
 
365
 
366
+    /**
367
+     * Goes over the list of {@link DeferredSendIQ} tasks and sends them.
368
+     *
369
+     * @private
370
+     * @returns {void}
371
+     */
372
+    _processDeferredIQs() {
373
+        for (const deferred of this._deferredIQs) {
374
+            if (deferred.iq) {
375
+                clearTimeout(deferred.timeout);
376
+
377
+                const timeLeft = Date.now() - deferred.start;
378
+
379
+                this.sendIQ(
380
+                    deferred.iq,
381
+                    result => deferred.resolve(result),
382
+                    error => deferred.reject(error),
383
+                    timeLeft);
384
+            }
385
+        }
386
+
387
+        this._deferredIQs = [];
388
+    }
389
+
339
     /**
390
     /**
340
      * Send a stanza. This function is called to push data onto the send queue to go out over the wire.
391
      * Send a stanza. This function is called to push data onto the send queue to go out over the wire.
341
      *
392
      *
369
         return this._stropheConn.sendIQ(elem, callback, errback, timeout);
420
         return this._stropheConn.sendIQ(elem, callback, errback, timeout);
370
     }
421
     }
371
 
422
 
423
+    /**
424
+     * Sends an IQ immediately if connected or puts it on the send queue otherwise(in contrary to other send methods
425
+     * which would fail immediately if disconnected).
426
+     *
427
+     * @param {Element} iq - The IQ to send.
428
+     * @param {number} timeout - How long to wait for the response. The time when the connection is reconnecting is
429
+     * included, which means that the IQ may never be sent and still fail with a timeout.
430
+     */
431
+    sendIQ2(iq, { timeout }) {
432
+        return new Promise((resolve, reject) => {
433
+            if (this.connected) {
434
+                this.sendIQ(
435
+                    iq,
436
+                    result => resolve(result),
437
+                    error => reject(error));
438
+            } else {
439
+                const deferred = {
440
+                    iq,
441
+                    resolve,
442
+                    reject,
443
+                    start: Date.now(),
444
+                    timeout: setTimeout(() => {
445
+                        // clears the IQ on timeout and invalidates the deferred task
446
+                        deferred.iq = undefined;
447
+
448
+                        // Strophe calls with undefined on timeout
449
+                        reject(undefined);
450
+                    }, timeout)
451
+                };
452
+
453
+                this._deferredIQs.push(deferred);
454
+            }
455
+        });
456
+    }
457
+
372
     /**
458
     /**
373
      *  Helper function to send presence stanzas. The main benefit is for sending presence stanzas for which you expect
459
      *  Helper function to send presence stanzas. The main benefit is for sending presence stanzas for which you expect
374
      *  a responding presence stanza with the same id (for example when leaving a chat room).
460
      *  a responding presence stanza with the same id (for example when leaving a chat room).

+ 193
- 0
modules/xmpp/XmppConnection.spec.js 查看文件

1
+import { default as XmppConnection } from './XmppConnection';
2
+import { $iq, Strophe } from 'strophe.js';
3
+import { nextTick } from '../util/TestUtils';
4
+
5
+/**
6
+ * Mock Strophe connection.
7
+ */
8
+class MockStropheConnection {
9
+    /**
10
+     * XMPP service URL.
11
+     *
12
+     * @returns {string}
13
+     */
14
+    get service() {
15
+        return 'wss://localhost/xmpp-websocket';
16
+    }
17
+
18
+    /**
19
+     * {@see Strophe.Connection.connect}
20
+     */
21
+    connect(jid, pass, callback) {
22
+        this._connectCb = callback;
23
+    }
24
+
25
+    /**
26
+     * {@see Strophe.Connection.disconnect}
27
+     */
28
+    disconnect() {
29
+        this.simulateConnectionState(Strophe.Status.DISCONNECTING);
30
+        this.simulateConnectionState(Strophe.Status.DISCONNECTED);
31
+    }
32
+
33
+    /**
34
+     * Simulates transition to the new connection status.
35
+     *
36
+     * @param {Strophe.Status} newState - The new connection status to set.
37
+     * @returns {void}
38
+     */
39
+    simulateConnectionState(newState) {
40
+        this._connectCb(newState);
41
+    }
42
+
43
+    /**
44
+     * {@see Strophe.Connection.sendIQ}.
45
+     */
46
+    sendIQ(iq, resultCb) {
47
+        resultCb();
48
+    }
49
+}
50
+
51
+/**
52
+ * Creates any IQ.
53
+ * @returns {Element}
54
+ */
55
+function testIQ() {
56
+    return $iq({
57
+        to: 'remoteJid',
58
+        type: 'set'
59
+    })
60
+    .c('jingle', { xmlns: 'urn:xmpp:jingle:1',
61
+        action: 'session-info',
62
+        initiator: 'blabla',
63
+        sid: '1234' })
64
+    .up();
65
+}
66
+
67
+describe('XmppConnection', () => {
68
+    let connection;
69
+    let mockStropheConnection;
70
+    let sendIQSpy;
71
+
72
+    beforeEach(() => {
73
+        jasmine.clock().install();
74
+
75
+        spyOn(Strophe, 'Connection').and.callFake((...args) => {
76
+            mockStropheConnection = new MockStropheConnection(...args);
77
+
78
+            return mockStropheConnection;
79
+        });
80
+
81
+        connection = new XmppConnection({
82
+            serviceUrl: 'wss://localhost/xmpp-websocket'
83
+        });
84
+
85
+        sendIQSpy = spyOn(mockStropheConnection, 'sendIQ').and.callThrough();
86
+
87
+        // eslint-disable-next-line no-empty-function
88
+        connection.connect('jid', undefined, () => { });
89
+    });
90
+
91
+    afterEach(() => {
92
+        jasmine.clock().uninstall();
93
+    });
94
+    describe('sendIQ2', () => {
95
+        it('will send the IQ immediately if connected', () => {
96
+            mockStropheConnection.simulateConnectionState(Strophe.Status.CONNECTED);
97
+
98
+            return connection.sendIQ2(testIQ(), { timeout: 15000 })
99
+                .then(() => {
100
+                    expect(sendIQSpy).toHaveBeenCalled();
101
+                });
102
+        });
103
+        it('will send the IQ on reconnect', () => {
104
+            mockStropheConnection.simulateConnectionState(Strophe.Status.CONNECTING);
105
+
106
+            let resolved = false;
107
+
108
+            connection
109
+                .sendIQ2(testIQ(), { timeout: 15000 })
110
+                .then(() => {
111
+                    resolved = true;
112
+                });
113
+
114
+            jasmine.clock().tick(10000);
115
+
116
+            return nextTick()
117
+                .then(() => {
118
+                    expect(resolved).toBe(false);
119
+                    expect(sendIQSpy).not.toHaveBeenCalled();
120
+
121
+                    mockStropheConnection.simulateConnectionState(Strophe.Status.CONNECTED);
122
+
123
+                    return nextTick();
124
+                })
125
+                .then(() => {
126
+                    expect(resolved).toBe(true);
127
+                    expect(sendIQSpy).toHaveBeenCalled();
128
+                });
129
+        });
130
+        it('will timeout the operation if not connected in time', () => {
131
+            mockStropheConnection.simulateConnectionState(Strophe.Status.CONNECTING);
132
+
133
+            let rejected = false, resolved = false;
134
+
135
+            connection
136
+                .sendIQ2(testIQ(), { timeout: 15000 })
137
+                .then(() => {
138
+                    resolved = true;
139
+                }, () => {
140
+                    rejected = true;
141
+                });
142
+
143
+            jasmine.clock().tick(10000);
144
+
145
+            return nextTick()
146
+                .then(() => {
147
+                    expect(sendIQSpy).not.toHaveBeenCalled();
148
+                    expect(resolved).toBe(false);
149
+                    expect(rejected).toBe(false);
150
+
151
+                    jasmine.clock().tick(10000);
152
+
153
+                    return nextTick();
154
+                })
155
+                .then(() => {
156
+                    expect(sendIQSpy).not.toHaveBeenCalled();
157
+                    expect(resolved).toBe(false);
158
+                    expect(rejected).toBe(true);
159
+                });
160
+        });
161
+        it('will reject the promise on explicit disconnect', () => {
162
+            mockStropheConnection.simulateConnectionState(Strophe.Status.CONNECTING);
163
+
164
+            let rejected = false, resolved = false;
165
+
166
+            connection
167
+                .sendIQ2(testIQ(), { timeout: 15000 })
168
+                .then(() => {
169
+                    resolved = true;
170
+                }, error => {
171
+                    rejected = error;
172
+                });
173
+
174
+            jasmine.clock().tick(10000);
175
+
176
+            return nextTick()
177
+                .then(() => {
178
+                    expect(sendIQSpy).not.toHaveBeenCalled();
179
+                    expect(resolved).toBe(false);
180
+                    expect(rejected).toBe(false);
181
+
182
+                    connection.disconnect();
183
+
184
+                    return nextTick();
185
+                })
186
+                .then(() => {
187
+                    expect(sendIQSpy).not.toHaveBeenCalled();
188
+                    expect(resolved).toBe(false);
189
+                    expect(rejected).toEqual(new Error('disconnect'));
190
+                });
191
+        });
192
+    });
193
+});

+ 2
- 1
modules/xmpp/strophe.ping.js 查看文件

76
         });
76
         });
77
 
77
 
78
         iq.c('ping', { xmlns: Strophe.NS.PING });
78
         iq.c('ping', { xmlns: Strophe.NS.PING });
79
-        this.connection.sendIQ(iq, success, error, timeout);
79
+        this.connection.sendIQ2(iq, { timeout })
80
+            .then(success, error);
80
     }
81
     }
81
 
82
 
82
     /* eslint-enable max-params */
83
     /* eslint-enable max-params */

正在加载...
取消
保存