Przeglądaj źródła

fix(JingleSession) start modification queue after session is established.

Pull initial offer/answer tasks out of the modification queue and execute them right away. Only track and codec related operations that necessitate a renegotiation cycle need to be pushed to the modification queue. The queue execution is paused until the session is established. This avoids track operations being executed before the session is established. This fixes an issue seen in p2p connections where sources of the initiator are not signaled to the remote since the tracks are added while the initiator is waiting for a session-accept from the peer.
master
Jaya Allamsetty 2 lat temu
rodzic
commit
f0dd403951
2 zmienionych plików z 163 dodań i 139 usunięć
  1. 14
    0
      modules/util/AsyncQueue.js
  2. 149
    139
      modules/xmpp/JingleSessionPC.js

+ 14
- 0
modules/util/AsyncQueue.js Wyświetl plik

@@ -34,6 +34,13 @@ export default class AsyncQueue {
34 34
         }
35 35
     }
36 36
 
37
+    /**
38
+     * Pauses the execution of the tasks on the queue.
39
+     */
40
+    pause() {
41
+        this._queue.pause();
42
+    }
43
+
37 44
     /**
38 45
      * The 'task' function will be given a callback it MUST call with either:
39 46
      *  1) No arguments if it was successful or
@@ -60,6 +67,13 @@ export default class AsyncQueue {
60 67
         this._queue.push(task, callback);
61 68
     }
62 69
 
70
+    /**
71
+     * Resumes the execution of the tasks on the queue.
72
+     */
73
+    resume() {
74
+        this._queue.resume();
75
+    }
76
+
63 77
     /**
64 78
      * Shutdowns the queue. All already queued tasks will execute, but no future tasks can be added. If a task is added
65 79
      * after the queue has been shutdown then the callback will be called with an error.

+ 149
- 139
modules/xmpp/JingleSessionPC.js Wyświetl plik

@@ -326,11 +326,15 @@ export default class JingleSessionPC extends JingleSession {
326 326
         this.remoteSourceMaxFrameHeights = undefined;
327 327
 
328 328
         /**
329
-         * The queue used to serialize operations done on the peerconnection.
329
+         * The queue used to serialize operations done on the peerconnection after the session is established.
330
+         * The queue is paused until the first offer/answer cycle is complete. Only track or codec related
331
+         * operations which necessitate a renegotiation cycle need to be pushed to the modification queue.
332
+         * These tasks will be executed after the session has been established.
330 333
          *
331 334
          * @type {AsyncQueue}
332 335
          */
333 336
         this.modificationQueue = new AsyncQueue();
337
+        this.modificationQueue.pause();
334 338
 
335 339
         /**
336 340
          * Flag used to guarantee that the connection established event is
@@ -999,6 +1003,10 @@ export default class JingleSessionPC extends JingleSession {
999 1003
                 // then we should either call 'success' here immediately or
1000 1004
                 // modify sendSessionAccept method to do that
1001 1005
                 this.sendSessionAccept(() => {
1006
+                    // Start processing tasks on the modification queue.
1007
+                    logger.debug('Resuming the modification queue after session is established!');
1008
+                    this.modificationQueue.resume();
1009
+
1002 1010
                     success();
1003 1011
                     this.room.eventEmitter.emit(XMPPEvents.SESSION_ACCEPT, this);
1004 1012
 
@@ -1033,35 +1041,29 @@ export default class JingleSessionPC extends JingleSession {
1033 1041
         if (!this.isInitiator) {
1034 1042
             throw new Error('Trying to invite from the responder session');
1035 1043
         }
1036
-        const workFunction = finishedCallback => {
1037
-            const addTracks = [];
1044
+        logger.debug(`${this} Executing invite task`);
1038 1045
 
1039
-            for (const track of localTracks) {
1040
-                addTracks.push(this.peerconnection.addTrack(track, this.isInitiator));
1041
-            }
1046
+        const addTracks = [];
1042 1047
 
1043
-            Promise.all(addTracks)
1044
-                .then(() => this.peerconnection.createOffer(this.mediaConstraints))
1045
-                .then(offerSdp => this.peerconnection.setLocalDescription(offerSdp))
1046
-                .then(() => {
1047
-                    this.peerconnection.processLocalSdpForTransceiverInfo(localTracks);
1048
+        for (const track of localTracks) {
1049
+            addTracks.push(this.peerconnection.addTrack(track, this.isInitiator));
1050
+        }
1048 1051
 
1049
-                    // NOTE that the offer is obtained from the localDescription getter as it needs to go though
1050
-                    // the transformation chain.
1051
-                    this.sendSessionInitiate(this.peerconnection.localDescription.sdp);
1052
-                })
1053
-                .then(() => finishedCallback(), error => finishedCallback(error));
1054
-        };
1052
+        Promise.all(addTracks)
1053
+            .then(() => this.peerconnection.createOffer(this.mediaConstraints))
1054
+            .then(offerSdp => this.peerconnection.setLocalDescription(offerSdp))
1055
+            .then(() => {
1056
+                this.peerconnection.processLocalSdpForTransceiverInfo(localTracks);
1055 1057
 
1056
-        logger.debug(`${this} Queued invite task`);
1057
-        this.modificationQueue.push(
1058
-            workFunction,
1059
-            error => {
1060
-                if (error) {
1061
-                    logger.error(`${this} invite error`, error);
1062
-                } else {
1063
-                    logger.debug(`${this} invite executed - OK`);
1064
-                }
1058
+                // NOTE that the offer is obtained from the localDescription getter as it needs to go though
1059
+                // the transformation chain.
1060
+                this.sendSessionInitiate(this.peerconnection.localDescription.sdp);
1061
+            })
1062
+            .then(() => {
1063
+                logger.debug(`${this} invite executed - OK`);
1064
+            })
1065
+            .catch(error => {
1066
+                logger.error(`${this} invite error`, error);
1065 1067
             });
1066 1068
     }
1067 1069
 
@@ -1110,37 +1112,34 @@ export default class JingleSessionPC extends JingleSession {
1110 1112
         if (!this.isInitiator) {
1111 1113
             throw new Error('Trying to set an answer on the responder session');
1112 1114
         }
1115
+        logger.debug(`${this} Executing setAnswer task`);
1113 1116
 
1114
-        const workFunction = finishedCallback => {
1115
-            const newRemoteSdp = this._processNewJingleOfferIq(jingleAnswer);
1116
-            const oldLocalSdp = new SDP(this.peerconnection.localDescription.sdp);
1117
-            const remoteDescription = new RTCSessionDescription({
1118
-                type: 'answer',
1119
-                sdp: newRemoteSdp.raw
1120
-            });
1117
+        const newRemoteSdp = this._processNewJingleOfferIq(jingleAnswer);
1118
+        const oldLocalSdp = new SDP(this.peerconnection.localDescription.sdp);
1119
+        const remoteDescription = new RTCSessionDescription({
1120
+            type: 'answer',
1121
+            sdp: newRemoteSdp.raw
1122
+        });
1121 1123
 
1122
-            this.peerconnection.setRemoteDescription(remoteDescription)
1123
-                .then(() => {
1124
-                    if (this.state === JingleSessionState.PENDING) {
1125
-                        this.state = JingleSessionState.ACTIVE;
1126
-                        const newLocalSdp = new SDP(this.peerconnection.localDescription.sdp);
1124
+        this.peerconnection.setRemoteDescription(remoteDescription)
1125
+            .then(() => {
1126
+                if (this.state === JingleSessionState.PENDING) {
1127
+                    this.state = JingleSessionState.ACTIVE;
1127 1128
 
1128
-                        this.sendContentModify();
1129
-                        this.notifyMySSRCUpdate(oldLocalSdp, newLocalSdp);
1130
-                    }
1131
-                })
1132
-                .then(() => finishedCallback(), error => finishedCallback(error));
1133
-        };
1129
+                    // Start processing tasks on the modification queue.
1130
+                    logger.debug('Resuming the modification queue after session is established!');
1131
+                    this.modificationQueue.resume();
1132
+                    const newLocalSdp = new SDP(this.peerconnection.localDescription.sdp);
1134 1133
 
1135
-        logger.debug(`${this} Queued setAnswer task`);
1136
-        this.modificationQueue.push(
1137
-            workFunction,
1138
-            error => {
1139
-                if (error) {
1140
-                    logger.error(`${this} setAnswer task failed: ${error}`);
1141
-                } else {
1142
-                    logger.debug(`${this} setAnswer task done`);
1134
+                    this.sendContentModify();
1135
+                    this.notifyMySSRCUpdate(oldLocalSdp, newLocalSdp);
1143 1136
                 }
1137
+            })
1138
+            .then(() => {
1139
+                logger.debug(`${this} setAnswer task done`);
1140
+            })
1141
+            .catch(error => {
1142
+                logger.error(`${this} setAnswer task failed: ${error}`);
1144 1143
             });
1145 1144
     }
1146 1145
 
@@ -1157,78 +1156,71 @@ export default class JingleSessionPC extends JingleSession {
1157 1156
      * offer/answer).
1158 1157
      */
1159 1158
     setOfferAnswerCycle(jingleOfferAnswerIq, success, failure, localTracks = []) {
1160
-        const workFunction = finishedCallback => {
1161
-            const addTracks = [];
1162
-            const audioTracks = localTracks.filter(track => track.getType() === MediaType.AUDIO);
1163
-            const videoTracks = localTracks.filter(track => track.getType() === MediaType.VIDEO);
1164
-            let tracks = localTracks;
1165
-
1166
-            // Add only 1 video track at a time. Adding 2 or more video tracks to the peerconnection at the same time
1167
-            // makes the browser go into a renegotiation loop by firing 'negotiationneeded' event after every
1168
-            // renegotiation.
1169
-            if (FeatureFlags.isMultiStreamSendSupportEnabled() && videoTracks.length > 1) {
1170
-                tracks = [ ...audioTracks, videoTracks[0] ];
1171
-            }
1172
-            for (const track of tracks) {
1173
-                addTracks.push(this.peerconnection.addTrack(track, this.isInitiator));
1174
-            }
1175
-            const newRemoteSdp = this._processNewJingleOfferIq(jingleOfferAnswerIq);
1176
-            const oldLocalSdp = this.peerconnection.localDescription.sdp;
1159
+        logger.debug(`${this} Executing setOfferAnswerCycle task`);
1160
+
1161
+        const addTracks = [];
1162
+        const audioTracks = localTracks.filter(track => track.getType() === MediaType.AUDIO);
1163
+        const videoTracks = localTracks.filter(track => track.getType() === MediaType.VIDEO);
1164
+        let tracks = localTracks;
1165
+
1166
+        // Add only 1 video track at a time. Adding 2 or more video tracks to the peerconnection at the same time
1167
+        // makes the browser go into a renegotiation loop by firing 'negotiationneeded' event after every
1168
+        // renegotiation.
1169
+        if (FeatureFlags.isMultiStreamSendSupportEnabled() && videoTracks.length > 1) {
1170
+            tracks = [ ...audioTracks, videoTracks[0] ];
1171
+        }
1172
+        for (const track of tracks) {
1173
+            addTracks.push(this.peerconnection.addTrack(track, this.isInitiator));
1174
+        }
1175
+        const newRemoteSdp = this._processNewJingleOfferIq(jingleOfferAnswerIq);
1176
+        const oldLocalSdp = this.peerconnection.localDescription.sdp;
1177 1177
 
1178
-            const bridgeSession = $(jingleOfferAnswerIq)
1179
-                .find('>bridge-session[xmlns="http://jitsi.org/protocol/focus"]');
1180
-            const bridgeSessionId = bridgeSession.attr('id');
1178
+        const bridgeSession = $(jingleOfferAnswerIq).find('>bridge-session[xmlns="http://jitsi.org/protocol/focus"]');
1179
+        const bridgeSessionId = bridgeSession.attr('id');
1181 1180
 
1182
-            if (bridgeSessionId !== this._bridgeSessionId) {
1183
-                this._bridgeSessionId = bridgeSessionId;
1184
-            }
1185
-            const remoteDescription = new RTCSessionDescription({
1186
-                type: 'offer',
1187
-                sdp: newRemoteSdp.raw
1188
-            });
1181
+        if (bridgeSessionId !== this._bridgeSessionId) {
1182
+            this._bridgeSessionId = bridgeSessionId;
1183
+        }
1184
+        const remoteDescription = new RTCSessionDescription({
1185
+            type: 'offer',
1186
+            sdp: newRemoteSdp.raw
1187
+        });
1189 1188
 
1190
-            Promise.all(addTracks)
1191
-                .then(() => this._responderRenegotiate(remoteDescription))
1192
-                .then(() => {
1193
-                    this.peerconnection.processLocalSdpForTransceiverInfo(tracks);
1194
-                    if (this.state === JingleSessionState.PENDING) {
1195
-                        this.state = JingleSessionState.ACTIVE;
1196
-
1197
-                        // #1 Sync up video transfer active/inactive only after the initial O/A cycle. We want to
1198
-                        // adjust the video media direction only in the local SDP and the Jingle contents direction
1199
-                        // included in the initial offer/answer is mapped to the remote SDP. Jingle 'content-modify'
1200
-                        // IQ is processed in a way that it will only modify local SDP when remote peer is no longer
1201
-                        // interested in receiving video content. Changing media direction in the remote SDP will mess
1202
-                        // up our SDP translation chain (simulcast, video mute, RTX etc.)
1203
-                        // #2 Sends the max frame height if it was set, before the session-initiate/accept
1204
-                        if (this.isP2P
1205
-                            && (!this._localVideoActive || this._sourceReceiverConstraints)) {
1206
-                            this.sendContentModify();
1207
-                        }
1189
+        Promise.all(addTracks)
1190
+            .then(() => this._responderRenegotiate(remoteDescription))
1191
+            .then(() => {
1192
+                this.peerconnection.processLocalSdpForTransceiverInfo(tracks);
1193
+                if (this.state === JingleSessionState.PENDING) {
1194
+                    this.state = JingleSessionState.ACTIVE;
1195
+
1196
+                    // #1 Sync up video transfer active/inactive only after the initial O/A cycle. We want to
1197
+                    // adjust the video media direction only in the local SDP and the Jingle contents direction
1198
+                    // included in the initial offer/answer is mapped to the remote SDP. Jingle 'content-modify'
1199
+                    // IQ is processed in a way that it will only modify local SDP when remote peer is no longer
1200
+                    // interested in receiving video content. Changing media direction in the remote SDP will mess
1201
+                    // up our SDP translation chain (simulcast, video mute, RTX etc.)
1202
+                    // #2 Sends the max frame height if it was set, before the session-initiate/accept
1203
+                    if (this.isP2P
1204
+                        && (!this._localVideoActive || this._sourceReceiverConstraints)) {
1205
+                        this.sendContentModify();
1208 1206
                     }
1207
+                }
1209 1208
 
1210
-                    // Old local SDP will be available when we're setting answer for the first time, but not when offer
1211
-                    // and it's fine since we're generating an answer now it will contain all our SSRCs.
1212
-                    if (oldLocalSdp) {
1213
-                        const newLocalSdp = new SDP(this.peerconnection.localDescription.sdp);
1209
+                // Old local SDP will be available when we're setting answer for the first time, but not when offer
1210
+                // and it's fine since we're generating an answer now it will contain all our SSRCs.
1211
+                if (oldLocalSdp) {
1212
+                    const newLocalSdp = new SDP(this.peerconnection.localDescription.sdp);
1214 1213
 
1215
-                        this.notifyMySSRCUpdate(new SDP(oldLocalSdp), newLocalSdp);
1216
-                    }
1217
-                })
1218
-                .then(() => finishedCallback(), error => finishedCallback(error));
1219
-        };
1220
-
1221
-        logger.debug(`${this} Queued setOfferAnswerCycle task`);
1222
-        this.modificationQueue.push(
1223
-            workFunction,
1224
-            error => {
1225
-                if (error) {
1226
-                    logger.error(`${this} setOfferAnswerCycle task failed: ${error}`);
1227
-                    failure(error);
1228
-                } else {
1229
-                    logger.debug(`${this} setOfferAnswerCycle task done`);
1230
-                    success();
1214
+                    this.notifyMySSRCUpdate(new SDP(oldLocalSdp), newLocalSdp);
1231 1215
                 }
1216
+            })
1217
+            .then(() => {
1218
+                logger.debug(`${this} setOfferAnswerCycle task done`);
1219
+                success();
1220
+            })
1221
+            .catch(error => {
1222
+                logger.error(`${this} setOfferAnswerCycle task failed: ${error}`);
1223
+                failure(error);
1232 1224
             });
1233 1225
     }
1234 1226
 
@@ -1318,31 +1310,49 @@ export default class JingleSessionPC extends JingleSession {
1318 1310
         newFingerprint.attr('hash', 'sha-1');
1319 1311
         newFingerprint.text('00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00');
1320 1312
 
1321
-        // First set an offer with a rejected 'data' section
1322
-        this.setOfferAnswerCycle(
1323
-            jingleOfferElem,
1324
-            () => {
1325
-                // Now set the original offer(with the 'data' section)
1326
-                this.setOfferAnswerCycle(
1327
-                    originalOffer,
1328
-                    () => {
1329
-                        const localSDP = new SDP(this.peerconnection.localDescription.sdp);
1313
+        const workFunction = finishedCallback => {
1314
+            // First set an offer with a rejected 'data' section
1315
+            this.setOfferAnswerCycle(
1316
+                jingleOfferElem,
1317
+                () => {
1318
+                    // Now set the original offer(with the 'data' section)
1319
+                    this.setOfferAnswerCycle(
1320
+                        originalOffer,
1321
+                        () => {
1322
+                            const localSDP = new SDP(this.peerconnection.localDescription.sdp);
1323
+
1324
+                            if (typeof this.options.channelLastN === 'number' && this.options.channelLastN >= 0) {
1325
+                                localSDP.initialLastN = this.options.channelLastN;
1326
+                            }
1330 1327
 
1331
-                        if (typeof this.options.channelLastN === 'number' && this.options.channelLastN >= 0) {
1332
-                            localSDP.initialLastN = this.options.channelLastN;
1333
-                        }
1328
+                            this.sendTransportAccept(localSDP, success, failure);
1334 1329
 
1335
-                        this.sendTransportAccept(localSDP, success, failure);
1330
+                            this.room.eventEmitter.emit(
1331
+                                XMPPEvents.ICE_RESTART_SUCCESS,
1332
+                                this,
1333
+                                originalOffer);
1336 1334
 
1337
-                        this.room.eventEmitter.emit(
1338
-                            XMPPEvents.ICE_RESTART_SUCCESS,
1339
-                            this,
1340
-                            originalOffer);
1341
-                    },
1342
-                    failure);
1343
-            },
1344
-            failure
1345
-        );
1335
+                            finishedCallback();
1336
+                        }, error => finishedCallback(error)
1337
+                    );
1338
+                }, error => finishedCallback(error)
1339
+            );
1340
+        };
1341
+
1342
+        logger.debug(`${this} Queued ICE restart task`);
1343
+
1344
+        // Queue and execute
1345
+        this.modificationQueue.push(
1346
+            workFunction,
1347
+            error => {
1348
+                if (error) {
1349
+                    logger.error(`${this} ICE restart task failed: ${error}`);
1350
+                    failure(error);
1351
+                } else {
1352
+                    logger.debug(`${this} ICE restart task done`);
1353
+                    success();
1354
+                }
1355
+            });
1346 1356
     }
1347 1357
 
1348 1358
     /**

Ładowanie…
Anuluj
Zapisz