Przeglądaj źródła

feat: Detects shard changed when using websockets. (#1462)

* feat: Detects shard changed when using websockets.

* squash: Simplifies code.

* squash: Rename method.

* squash: Skips checking for shard changed on the initial connection.
dev1
Дамян Минков 4 lat temu
rodzic
commit
cb484cf48c
No account linked to committer's email address
2 zmienionych plików z 77 dodań i 14 usunięć
  1. 46
    11
      modules/xmpp/XmppConnection.js
  2. 31
    3
      modules/xmpp/xmpp.js

+ 46
- 11
modules/xmpp/XmppConnection.js Wyświetl plik

@@ -21,7 +21,8 @@ export default class XmppConnection extends Listenable {
21 21
      */
22 22
     static get Events() {
23 23
         return {
24
-            CONN_STATUS_CHANGED: 'CONN_STATUS_CHANGED'
24
+            CONN_STATUS_CHANGED: 'CONN_STATUS_CHANGED',
25
+            CONN_SHARD_CHANGED: 'CONN_SHARD_CHANGED'
25 26
         };
26 27
     }
27 28
 
@@ -39,6 +40,8 @@ export default class XmppConnection extends Listenable {
39 40
      *
40 41
      * @param {Object} options
41 42
      * @param {String} options.serviceUrl - The BOSH or WebSocket service URL.
43
+     * @param {String} options.shard - The BOSH or WebSocket is connecting to this shard.
44
+     * Useful for detecting when shard changes.
42 45
      * @param {String} [options.enableWebsocketResume=true] - True/false to control the stream resumption functionality.
43 46
      * It will enable automatically by default if supported by the XMPP server.
44 47
      * @param {Number} [options.websocketKeepAlive=240000] - The websocket keep alive interval. It's 4 minutes by
@@ -47,11 +50,12 @@ export default class XmppConnection extends Listenable {
47 50
      * The keep alive is HTTP GET request to the {@link options.serviceUrl}.
48 51
      * @param {Object} [options.xmppPing] - The xmpp ping settings.
49 52
      */
50
-    constructor({ enableWebsocketResume, websocketKeepAlive, serviceUrl, xmppPing }) {
53
+    constructor({ enableWebsocketResume, websocketKeepAlive, serviceUrl, shard, xmppPing }) {
51 54
         super();
52 55
         this._options = {
53 56
             enableWebsocketResume: typeof enableWebsocketResume === 'undefined' ? true : enableWebsocketResume,
54 57
             pingOptions: xmppPing,
58
+            shard,
55 59
             websocketKeepAlive: typeof websocketKeepAlive === 'undefined' ? 4 * 60 * 1000 : Number(websocketKeepAlive)
56 60
         };
57 61
 
@@ -89,6 +93,9 @@ export default class XmppConnection extends Listenable {
89 93
                 onPingThresholdExceeded: () => this._onPingErrorThresholdExceeded(),
90 94
                 pingOptions: xmppPing
91 95
             }));
96
+
97
+        // tracks whether this is the initial connection or a reconnect
98
+        this._oneSuccessfulConnect = false;
92 99
     }
93 100
 
94 101
     /**
@@ -260,6 +267,14 @@ export default class XmppConnection extends Listenable {
260 267
 
261 268
         if (status === Strophe.Status.CONNECTED || status === Strophe.Status.ATTACHED) {
262 269
             this._maybeEnableStreamResume();
270
+
271
+            // after connecting - immediately check whether shard changed,
272
+            // we need this only when using websockets as bosh checks headers from every response
273
+            if (this._usesWebsocket && this._oneSuccessfulConnect) {
274
+                this._keepAliveAndCheckShard();
275
+            }
276
+            this._oneSuccessfulConnect = true;
277
+
263 278
             this._maybeStartWSKeepAlive();
264 279
             this._processDeferredIQs();
265 280
             this._resumeTask.cancel();
@@ -377,18 +392,38 @@ export default class XmppConnection extends Listenable {
377 392
 
378 393
             logger.debug(`Scheduling next WebSocket keep-alive in ${intervalWithJitter}ms`);
379 394
 
380
-            this._wsKeepAlive = setTimeout(() => {
381
-                const url = this.service.replace('wss://', 'https://').replace('ws://', 'http://');
382
-
383
-                fetch(url).catch(
384
-                    error => {
385
-                        logger.error(`Websocket Keep alive failed for url: ${url}`, { error });
386
-                    })
387
-                    .then(() => this._maybeStartWSKeepAlive());
388
-            }, intervalWithJitter);
395
+            this._wsKeepAlive = setTimeout(
396
+                () => this._keepAliveAndCheckShard()
397
+                    .then(() => this._maybeStartWSKeepAlive()),
398
+                intervalWithJitter);
389 399
         }
390 400
     }
391 401
 
402
+    /**
403
+     * Do a http GET to the shard and if shard change will throw an event.
404
+     *
405
+     * @private
406
+     * @returns {Promise}
407
+     */
408
+    _keepAliveAndCheckShard() {
409
+        const { shard } = this._options;
410
+        const url = this.service.replace('wss://', 'https://').replace('ws://', 'http://');
411
+
412
+        return fetch(url)
413
+            .then(response => {
414
+                const responseShard = response.headers.get('x-jitsi-shard');
415
+
416
+                if (responseShard !== shard) {
417
+                    logger.error(
418
+                        `Detected that shard changed from ${shard} to ${responseShard}`);
419
+                    this.eventEmitter.emit(XmppConnection.Events.CONN_SHARD_CHANGED);
420
+                }
421
+            })
422
+            .catch(error => {
423
+                logger.error(`Websocket Keep alive failed for url: ${url}`, { error });
424
+            });
425
+    }
426
+
392 427
     /**
393 428
      * Goes over the list of {@link DeferredSendIQ} tasks and sends them.
394 429
      *

+ 31
- 3
modules/xmpp/xmpp.js Wyświetl plik

@@ -30,12 +30,20 @@ const logger = getLogger(__filename);
30 30
  * @param {string} [options.token] - JWT token used for authentication(JWT authentication module must be enabled in
31 31
  * Prosody).
32 32
  * @param {string} options.serviceUrl - The service URL for XMPP connection.
33
+ * @param {string} options.shard - The shard where XMPP connection initially landed.
33 34
  * @param {string} options.enableWebsocketResume - True to enable stream resumption.
34 35
  * @param {number} [options.websocketKeepAlive] - See {@link XmppConnection} constructor.
35 36
  * @param {Object} [options.xmppPing] - See {@link XmppConnection} constructor.
36 37
  * @returns {XmppConnection}
37 38
  */
38
-function createConnection({ enableWebsocketResume, serviceUrl = '/http-bind', token, websocketKeepAlive, xmppPing }) {
39
+function createConnection({
40
+    enableWebsocketResume,
41
+    serviceUrl = '/http-bind',
42
+    shard,
43
+    token,
44
+    websocketKeepAlive,
45
+    xmppPing }) {
46
+
39 47
     // Append token as URL param
40 48
     if (token) {
41 49
         // eslint-disable-next-line no-param-reassign
@@ -46,7 +54,8 @@ function createConnection({ enableWebsocketResume, serviceUrl = '/http-bind', to
46 54
         enableWebsocketResume,
47 55
         serviceUrl,
48 56
         websocketKeepAlive,
49
-        xmppPing
57
+        xmppPing,
58
+        shard
50 59
     });
51 60
 }
52 61
 
@@ -131,7 +140,26 @@ export default class XMPP extends Listenable {
131 140
             serviceUrl: options.serviceUrl || options.bosh,
132 141
             token,
133 142
             websocketKeepAlive: options.websocketKeepAlive,
134
-            xmppPing
143
+            xmppPing,
144
+            shard: options.deploymentInfo?.shard
145
+        });
146
+
147
+        // forwards the shard changed event
148
+        this.connection.on(XmppConnection.Events.CONN_SHARD_CHANGED, () => {
149
+            /* eslint-disable camelcase */
150
+            const details = {
151
+                shard_changed: true,
152
+                suspend_time: this.connection.ping.getPingSuspendTime(),
153
+                time_since_last_success: this.connection.getTimeSinceLastSuccess()
154
+            };
155
+            /* eslint-enable camelcase */
156
+
157
+            this.eventEmitter.emit(
158
+                JitsiConnectionEvents.CONNECTION_FAILED,
159
+                JitsiConnectionErrors.OTHER_ERROR,
160
+                undefined,
161
+                undefined,
162
+                details);
135 163
         });
136 164
 
137 165
         this._initStrophePlugins();

Ładowanie…
Anuluj
Zapisz