Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

AsyncQueue.js 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import { getLogger } from '@jitsi/logger';
  2. import { queue } from 'async-es';
  3. const logger = getLogger(__filename);
  4. /**
  5. * Error to be passed to a callback of a queued task when the queue is cleared.
  6. */
  7. export class ClearedQueueError extends Error {
  8. /**
  9. * Creates new instance.
  10. */
  11. constructor(message) {
  12. super(message);
  13. this.name = 'ClearedQueueError';
  14. }
  15. }
  16. /**
  17. * A queue for async task execution.
  18. */
  19. export default class AsyncQueue {
  20. /**
  21. * Creates new instance.
  22. */
  23. constructor() {
  24. this._queue = queue(this._processQueueTasks.bind(this), 1);
  25. this._stopped = false;
  26. this._taskCallbacks = new Map();
  27. }
  28. /**
  29. * Removes any pending tasks from the queue.
  30. */
  31. clear() {
  32. for (const finishedCallback of this._taskCallbacks.values()) {
  33. try {
  34. finishedCallback?.(new ClearedQueueError('The queue has been cleared'));
  35. } catch (error) {
  36. logger.error('Error in callback while clearing the queue:', error);
  37. }
  38. }
  39. this._queue.kill();
  40. }
  41. /**
  42. * Internal task processing implementation which makes things work.
  43. */
  44. _processQueueTasks(task, finishedCallback) {
  45. try {
  46. task(finishedCallback);
  47. } catch (error) {
  48. logger.error(`Task failed: ${error?.stack}`);
  49. finishedCallback(error);
  50. } finally {
  51. this._taskCallbacks.delete(task);
  52. }
  53. }
  54. /**
  55. * Pauses the execution of the tasks on the queue.
  56. */
  57. pause() {
  58. this._queue.pause();
  59. }
  60. /**
  61. * The 'task' function will be given a callback it MUST call with either:
  62. * 1) No arguments if it was successful or
  63. * 2) An error argument if there was an error
  64. * If the task wants to process the success or failure of the task, it
  65. * should pass the {@code callback} to the push function, e.g.:
  66. * queue.push(task, (err) => {
  67. * if (err) {
  68. * // error handling
  69. * } else {
  70. * // success handling
  71. * }
  72. * });
  73. *
  74. * @param {function} task - The task to be executed. See the description above.
  75. * @param {function} [callback] - Optional callback to be called after the task has been executed.
  76. */
  77. push(task, callback) {
  78. if (this._stopped) {
  79. callback && callback(new Error('The queue has been stopped'));
  80. return;
  81. }
  82. this._taskCallbacks.set(task, callback);
  83. this._queue.push(task, callback);
  84. }
  85. /**
  86. * Resumes the execution of the tasks on the queue.
  87. */
  88. resume() {
  89. this._queue.resume();
  90. }
  91. /**
  92. * Shutdowns the queue. All already queued tasks will execute, but no future tasks can be added. If a task is added
  93. * after the queue has been shutdown then the callback will be called with an error.
  94. */
  95. shutdown() {
  96. this._stopped = true;
  97. }
  98. }