You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

e2eping.ts 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. import { getLogger } from '@jitsi/logger';
  2. import JitsiConference from '../../JitsiConference';
  3. import * as JitsiConferenceEvents from '../../JitsiConferenceEvents';
  4. import JitsiParticipant from '../../JitsiParticipant';
  5. import * as JitsiE2EPingEvents from '../../service/e2eping/E2ePingEvents';
  // Logger shared by everything in this module, created by passing `__filename` to getLogger.
  6. const logger = getLogger(__filename);
  /** Value of a message's `type` field that marks it as an e2e ping request. */
  const E2E_PING_REQUEST = 'e2e-ping-request';

  /** Value of a message's `type` field that marks it as an e2e ping response. */
  const E2E_PING_RESPONSE = 'e2e-ping-response';

  /** Default for how many requests to wait for before an RTT value is emitted. */
  const DEFAULT_NUM_REQUESTS = 5;

  /**
   * Default ceiling on the message rate (messages per second) to aim for. The
   * budget covers the entire conference and assumes that all endpoints join at
   * the same time.
   */
  const DEFAULT_MAX_MESSAGES_PER_SECOND = 250;

  /** Default conference size above which e2e pings are disabled. */
  const DEFAULT_MAX_CONFERENCE_SIZE = 200;
  /**
   * Shape of the e2e ping messages exchanged between two endpoints.
   */
  export interface IPingMessage {
      /** Tells a ping request apart from a ping response. */
      type: string;

      /** Request identifier; a response carries the id of the request it answers. */
      id: number;
  }
  /**
   * Options accepted by the E2ePing constructor. Everything is optional; a
   * setting that is absent leaves the corresponding default in effect.
   */
  export interface IE2ePingOptions {
      e2eping?: {
          /** Number of requests to wait for before an RTT value is emitted. */
          numRequests?: number;

          /** Message rate (per second, conference-wide) to aim for. */
          maxMessagesPerSecond?: number;

          /** Conference size beyond which e2e pings are disabled. */
          maxConferenceSize?: number;
      };
  }
  /**
   * Bookkeeping entry for one ping request that has been sent.
   */
  export interface IRequest {
      /** Identifier carried by the request. */
      id: number;

      /** Timestamp, in milliseconds, taken when the request was sent. */
      timeSent: number;

      /** Round-trip time in milliseconds; unset until a response is recorded. */
      rtt?: number;
  }
  /**
   * Saves e2e ping related state for a single JitsiParticipant: the requests
   * sent so far, the RTT recorded for each answered request, and the handle of
   * the timer that will send the next request.
   */
  class ParticipantWrapper {
      /** The remote participant that is being pinged. */
      participant: JitsiParticipant;

      /** The owning E2ePing; supplies the configuration and the send function. */
      e2eping: E2ePing;

      /** Cached ID of the participant. */
      id: string;

      /** Requests sent so far, keyed by request ID. */
      requests: { [key: number]: IRequest; };

      /**
       * The ID the next request will use. It is incremented for each request
       * and starts at 1 so that every valid ID is truthy.
       */
      lastRequestId: number;

      /** Handle of the most recently scheduled timer, or null after stop(). */
      timeout: number | null;

      /**
       * Creates a ParticipantWrapper and schedules the first ping.
       *
       * @param participant - The remote participant that this object wraps.
       * @param e2eping - The E2ePing instance that owns this wrapper.
       */
      constructor(participant: JitsiParticipant, e2eping: E2ePing) {
          this.participant = participant;
          this.e2eping = e2eping;
          this.id = participant.getId();
          this.requests = {};
          this.lastRequestId = 1;

          this.sendRequest = this.sendRequest.bind(this);
          this.handleResponse = this.handleResponse.bind(this);
          this.maybeLogRttAndStop = this.maybeLogRttAndStop.bind(this);
          this.scheduleNext = this.scheduleNext.bind(this);
          this.stop = this.stop.bind(this);
          this.getDelay = this.getDelay.bind(this);

          this.timeout = this.scheduleNext();
      }

      /**
       * Schedule the next ping to be sent after getDelay() milliseconds.
       *
       * @returns The timer handle. The caller is responsible for storing it in
       * `timeout` so that it can be cancelled later.
       */
      scheduleNext(): number {
          return window.setTimeout(this.sendRequest, this.getDelay());
      }

      /**
       * Stop pinging this participant, canceling a scheduled ping, if any, and
       * unregister this wrapper from the owning E2ePing.
       */
      stop(): void {
          this.clearPendingTimeout();
          this.e2eping.removeParticipant(this.id);
      }

      /**
       * Get the delay until the next ping in milliseconds. The delay is sized
       * from the number of endpoint pairs and the configured message rate, never
       * goes below a 1000 ms base, and is randomized by a factor between .5 and
       * 1.5.
       *
       * @returns The delay in milliseconds; always a finite number.
       */
      getDelay(): number {
          const { numRequests, maxMessagesPerSecond } = this.e2eping;
          const conferenceSize = this.e2eping.conference.getParticipants().length;
          const endpointPairs = conferenceSize * (conferenceSize - 1) / 2;
          const totalMessages = endpointPairs * numRequests;
          const totalSeconds = totalMessages / maxMessagesPerSecond;
          const spacing = 1000 * (totalSeconds / numRequests);

          // A numRequests or maxMessagesPerSecond of 0 yields NaN/Infinity here.
          // Timers treat a non-finite delay as 0, which would send the ping
          // immediately, so fall back to the 1000 ms floor instead.
          const base = Number.isFinite(spacing) ? Math.max(spacing, 1000) : 1000;

          // Randomize between .5 and 1.5
          const r = 1.5 - Math.random();

          return r * base;
      }

      /**
       * Sends the next ping request and records the time at which it was sent.
       */
      sendRequest(): void {
          const requestId = this.lastRequestId++;
          const requestMessage = {
              type: E2E_PING_REQUEST,
              id: requestId
          };

          this.e2eping.sendMessage(requestMessage, this.id);
          this.requests[requestId] = {
              id: requestId,
              timeSent: window.performance.now()
          };
      }

      /**
       * Handles a response from this participant: records the RTT of the
       * matching request (if there is one) and then either finishes or
       * schedules the next request.
       *
       * @param response - The response; `id` names the request being answered.
       */
      handleResponse(response: { id: number; }): void {
          const request = this.requests[response.id];

          // Only the first response to a request is a valid measurement; a
          // duplicate arriving later must not inflate the recorded RTT.
          if (request && request.rtt === undefined) {
              request.rtt = window.performance.now() - request.timeSent;
          }
          this.maybeLogRttAndStop();
      }

      /**
       * Check if we've received the pre-configured number of responses, and if
       * so log the measured RTT (the minimum over all answered requests), stop
       * sending requests and emit E2E_RTT_CHANGED. Gives up when more than twice
       * the configured number of requests were sent without enough responses.
       * Otherwise schedules the next request.
       */
      maybeLogRttAndStop(): void {
          const sentRequests = Object.values(this.requests);

          // Compare against undefined rather than truthiness: an RTT of 0 is a
          // legitimate measurement when the clock resolution is coarse.
          const rtts = sentRequests
              .map(request => request.rtt)
              .filter((value): value is number => value !== undefined);
          const totalNumRequests = sentRequests.length;
          const numRequestsWithResponses = rtts.length;

          if (numRequestsWithResponses >= this.e2eping.numRequests) {
              // The RTT we'll report is the minimum RTT measured
              const rtt = Math.min(...rtts);

              logger.info(`Measured RTT=${rtt} ms to ${this.id} (in ${this.participant.getProperty('region')})`);
              this.stop();
              this.e2eping.conference.eventEmitter.emit(
                  JitsiE2EPingEvents.E2E_RTT_CHANGED, this.participant, rtt);

              return;
          } else if (totalNumRequests > 2 * this.e2eping.numRequests) {
              logger.info(`Stopping e2eping for ${this.id} because we sent ${totalNumRequests} with only `
                  + `${numRequestsWithResponses} responses.`);
              this.stop();

              return;
          }

          // A response can arrive while a timer is still pending. Cancel it
          // before rescheduling, otherwise its handle is lost and stop() can no
          // longer cancel it.
          this.clearPendingTimeout();
          this.timeout = this.scheduleNext();
      }

      /**
       * Cancels the timer referenced by `timeout`, if any, and forgets it.
       */
      private clearPendingTimeout(): void {
          if (this.timeout !== null) {
              window.clearTimeout(this.timeout);
              this.timeout = null;
          }
      }
  }
  /**
   * Implements end-to-end ping (from one conference participant to another) via
   * the jitsi-videobridge channel (either WebRTC data channel or web socket).
   *
   * TODO: use a broadcast message instead of individual pings to each remote
   * participant.
   *
   * This class:
   * 1. Once the conference is joined, creates a ParticipantWrapper (which sends
   * the ping requests) for remote participants. For every pair of endpoints
   * only the endpoint with the greater ID does the pinging.
   * 2. Responds to ping requests from other participants.
   * 3. Routes ping responses to the matching ParticipantWrapper, which emits
   * E2E_RTT_CHANGED once the configured number of responses has been received.
   */
  export default class E2ePing {
      /** The conference whose participants are pinged. */
      conference: JitsiConference;

      /** The conference's event emitter. */
      eventEmitter: any;

      /** Sends a ping message to the participant with the given ID. */
      sendMessage: (message: IPingMessage, participantId: string) => void;

      /** Maps a participant ID to its ParticipantWrapper. */
      participants: { [key: string]: ParticipantWrapper; };

      /** The number of responses to collect before an RTT value is emitted. */
      numRequests: number;

      /** The conference size beyond which no new pings are started. */
      maxConferenceSize: number;

      /** The message rate that the ping scheduling aims for. */
      maxMessagesPerSecond: number;

      /**
       * @param conference - The conference.
       * @param options - Optional overrides for the defaults, read from
       * `options.e2eping`. Only values of type number are applied.
       * @param sendMessage - The function to use to send a message.
       */
      constructor(
              conference: JitsiConference,
              options: IE2ePingOptions,
              sendMessage: (message: IPingMessage, participantId: string) => void) {
          this.conference = conference;
          this.eventEmitter = conference.eventEmitter;
          this.sendMessage = sendMessage;
          this.participants = {};

          this.numRequests = DEFAULT_NUM_REQUESTS;
          this.maxConferenceSize = DEFAULT_MAX_CONFERENCE_SIZE;
          this.maxMessagesPerSecond = DEFAULT_MAX_MESSAGES_PER_SECOND;

          const overrides = options?.e2eping;

          if (overrides) {
              if (typeof overrides.numRequests === 'number') {
                  this.numRequests = overrides.numRequests;
              }
              if (typeof overrides.maxConferenceSize === 'number') {
                  this.maxConferenceSize = overrides.maxConferenceSize;
              }
              if (typeof overrides.maxMessagesPerSecond === 'number') {
                  this.maxMessagesPerSecond = overrides.maxMessagesPerSecond;
              }
          }

          logger.info(
              `Initializing e2e ping with numRequests=${this.numRequests}, maxConferenceSize=${this.maxConferenceSize}, `
              + `maxMessagesPerSecond=${this.maxMessagesPerSecond}.`);

          // The handlers are bound once so that the very same function
          // references can be handed to conference.off() in stop().
          this.participantJoined = this.participantJoined.bind(this);
          this.participantLeft = this.participantLeft.bind(this);
          this.messageReceived = this.messageReceived.bind(this);
          this.conferenceJoined = this.conferenceJoined.bind(this);

          conference.on(JitsiConferenceEvents.USER_LEFT, this.participantLeft);
          conference.on(JitsiConferenceEvents.ENDPOINT_MESSAGE_RECEIVED, this.messageReceived);

          // USER_JOINED is only subscribed to from conferenceJoined().
          conference.on(JitsiConferenceEvents.CONFERENCE_JOINED, this.conferenceJoined);
      }

      /**
       * Delay processing USER_JOINED events until the MUC is fully joined,
       * otherwise the apparent conference size will be wrong. Participants that
       * are already in the conference are processed right away.
       */
      conferenceJoined(): void {
          this.conference.getParticipants().forEach(p => this.participantJoined(p.getId(), p));
          this.conference.on(JitsiConferenceEvents.USER_JOINED, this.participantJoined);
      }

      /**
       * Handles a message that was received. Only e2e ping requests and
       * responses are acted upon; anything else is ignored.
       *
       * @param participant - The message sender.
       * @param payload - The payload of the message.
       */
      messageReceived(participant: JitsiParticipant, payload: IPingMessage): void {
          // Optional chaining: a message without a payload is not a ping and
          // must not make this listener throw.
          const type = payload?.type;

          if (type === E2E_PING_REQUEST) {
              this.handleRequest(participant.getId(), payload);
          } else if (type === E2E_PING_RESPONSE) {
              this.handleResponse(participant.getId(), payload);
          }
      }

      /**
       * Handles a participant joining the conference. Starts to send ping
       * requests to the participant, unless the conference is too large or the
       * other endpoint of the pair is the one responsible for pinging.
       *
       * @param id - The ID of the participant.
       * @param participant - The participant that joined.
       */
      participantJoined(id: string, participant: JitsiParticipant): void {
          const existing = this.participants[id];

          if (existing) {
              logger.info(`Participant wrapper already exists for ${id}. Clearing.`);
              existing.stop();
          }

          if (this.conference.getParticipants().length > this.maxConferenceSize) {
              return;
          }

          // We don't need to send e2eping in both directions for a pair of
          // endpoints. Force only one direction with just string comparison of
          // the IDs.
          if (this.conference.myUserId() > id) {
              logger.info(`Starting e2eping for participant ${id}`);
              this.participants[id] = new ParticipantWrapper(participant, this);
          }
      }

      /**
       * Remove a participant without calling "stop".
       *
       * @param id - The ID of the participant to forget.
       */
      removeParticipant(id: string): void {
          if (this.participants[id]) {
              delete this.participants[id];
          }
      }

      /**
       * Handles a participant leaving the conference. Stops sending requests.
       *
       * @param id - The ID of the participant.
       */
      participantLeft(id: string): void {
          const wrapper = this.participants[id];

          if (wrapper) {
              wrapper.stop();
              delete this.participants[id];
          }
      }

      /**
       * Handles a ping request coming from another participant by sending back
       * a response that carries the same id.
       *
       * @param participantId - The ID of the participant who sent the request.
       * @param request - The request.
       */
      handleRequest(participantId: string, request: { id: number; }): void {
          // If it's a valid request, just send a response.
          if (request?.id) {
              const response = {
                  type: E2E_PING_RESPONSE,
                  id: request.id
              };

              this.sendMessage(response, participantId);
          } else {
              logger.info(`Received an invalid e2e ping request from ${participantId}.`);
          }
      }

      /**
       * Handles a ping response coming from another participant. Responses from
       * participants that are not being pinged are ignored.
       *
       * @param participantId - The ID of the participant who sent the response.
       * @param response - The response.
       */
      handleResponse(participantId: string, response: { id: number; }): void {
          const participantWrapper = this.participants[participantId];

          if (participantWrapper) {
              participantWrapper.handleResponse(response);
          }
      }

      /**
       * Stops this E2ePing (i.e. stop sending requests) and unregisters every
       * conference listener this instance has added.
       */
      stop(): void {
          logger.info('Stopping e2eping');

          // CONFERENCE_JOINED has to be removed as well: otherwise a later
          // CONFERENCE_JOINED would re-subscribe to USER_JOINED and restart
          // pinging on an instance that has been stopped.
          this.conference.off(JitsiConferenceEvents.CONFERENCE_JOINED, this.conferenceJoined);
          this.conference.off(JitsiConferenceEvents.USER_JOINED, this.participantJoined);
          this.conference.off(JitsiConferenceEvents.USER_LEFT, this.participantLeft);
          this.conference.off(JitsiConferenceEvents.ENDPOINT_MESSAGE_RECEIVED, this.messageReceived);

          // Iterate over a snapshot: stopping a wrapper removes it from
          // this.participants.
          Object.values(this.participants).forEach(wrapper => wrapper.stop());

          this.participants = {};
      }
  }