You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

AsyncQueue.ts 3.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import { getLogger } from '@jitsi/logger';
  2. import { AsyncQueue as AsyncQueueType, queue } from 'async-es';
  3. const logger = getLogger('modules/util/AsyncQueue');
  /**
   * Error passed to the callback of a queued task when the queue is cleared
   * before that task had a chance to run.
   */
  export class ClearedQueueError extends Error {
      /**
       * Creates new instance.
       *
       * @param message - Human readable description of why the task was dropped.
       */
      constructor(message: string) {
          super(message);

          // When the build target is ES5, calling super() on a built-in such as
          // Error resets the prototype chain, which breaks `instanceof` checks
          // against this subclass. Restoring it explicitly is a no-op on modern
          // targets and keeps the check reliable on older ones.
          Object.setPrototypeOf(this, new.target.prototype);

          this.name = 'ClearedQueueError';
      }
  }
  /**
   * Completion callback: invoked with no argument on success, or with the error
   * that made the task fail.
   */
  export type TaskCallback = (err?: Error) => void;

  /**
   * A unit of work. It receives a completion callback and reports its outcome
   * through it.
   */
  export type Task = (callback: TaskCallback) => void;
  /**
   * A queue for async task execution. Tasks are handed to an underlying
   * `async-es` queue created with a concurrency of 1.
   */
  export default class AsyncQueue {
      /** The underlying queue that schedules the tasks. */
      private _queue: AsyncQueueType<Task>;

      /** Set once {@link shutdown} has been called; new tasks are rejected afterwards. */
      private _stopped: boolean;

      /**
       * Completion callbacks of the tasks that were pushed but have not been
       * started yet, so that {@link clear} can notify them.
       */
      private _taskCallbacks: Map<Task, TaskCallback | undefined>;

      /**
       * Creates new instance.
       */
      constructor() {
          this._queue = queue(this._processQueueTasks.bind(this), 1);
          this._stopped = false;
          this._taskCallbacks = new Map();
      }

      /**
       * Worker invoked by the underlying queue for each task. Runs the task and
       * converts a synchronous throw into a failure reported through
       * {@code finishedCallback}.
       *
       * @param task - The task to run.
       * @param finishedCallback - Callback the task (or this method, on a
       * synchronous throw) uses to report completion.
       */
      private _processQueueTasks(task: Task, finishedCallback: TaskCallback): void {
          try {
              task(finishedCallback);
          } catch (error) {
              // Anything can be thrown in JavaScript; normalise to an Error so the
              // callback always receives the type it declares and the log line
              // always has something meaningful to print.
              const failure = error instanceof Error ? error : new Error(String(error));

              logger.error(`Task failed: ${failure.stack}`);
              finishedCallback(failure);
          } finally {
              // The task has been started, hence it is no longer pending and must
              // not be notified by clear().
              this._taskCallbacks.delete(task);
          }
      }

      /**
       * Removes any pending tasks from the queue. The callback of every pending
       * task is invoked with a {@link ClearedQueueError}.
       */
      clear(): void {
          for (const finishedCallback of this._taskCallbacks.values()) {
              try {
                  finishedCallback?.(new ClearedQueueError('The queue has been cleared'));
              } catch (error) {
                  logger.error('Error in callback while clearing the queue:', error);
              }
          }
          this._queue.kill();

          // The killed tasks will never reach _processQueueTasks, so nothing else
          // would remove their entries. Drop them here, otherwise a later clear()
          // would notify the same callbacks a second time.
          this._taskCallbacks.clear();
      }

      /**
       * Pauses the execution of the tasks on the queue.
       */
      pause(): void {
          this._queue.pause();
      }

      /**
       * The 'task' function will be given a callback it MUST call with either:
       * 1) No arguments if it was successful or
       * 2) An error argument if there was an error
       * If the task wants to process the success or failure of the task, it
       * should pass the {@code callback} to the push function, e.g.:
       * queue.push(task, (err) => {
       *     if (err) {
       *         // error handling
       *     } else {
       *         // success handling
       *     }
       * });
       *
       * If the queue has been shut down the task is not queued and
       * {@code callback}, when given, is invoked immediately with an error.
       *
       * @param {Task} task - The task to be executed. See the description above.
       * @param {TaskCallback} [callback] - Optional callback to be called after the task has been executed.
       */
      push(task: Task, callback?: TaskCallback): void {
          if (this._stopped) {
              callback?.(new Error('The queue has been stopped'));

              return;
          }
          this._taskCallbacks.set(task, callback);
          this._queue.push(task, callback);
      }

      /**
       * Resumes the execution of the tasks on the queue.
       */
      resume(): void {
          this._queue.resume();
      }

      /**
       * Shuts the queue down. Tasks that are already queued are left untouched,
       * but no future tasks can be added. If a task is added after the queue has
       * been shut down then its callback will be called with an error.
       */
      shutdown(): void {
          this._stopped = true;
      }
  }