阅读视图

发现新文章,点击刷新页面。

实现最大异步并发执行队列

题目说明

日常开发中遇到异步并发执行时我们通常会使用 Promise.All([]) ,但是如果这个并发数量很大(如超过100)那么我们考虑到服务器的并发压力就需要设置一个最大并发数。

这也是一个初/中级的热门面试题,本文就详细介绍如何用各种姿势来实现 最大异步并发执行队列

/**
 * 最大异步并发执行队列
 * tasks 任务列表
 * maxConcurrency 最大并发数
 * @returns {Promise<void>}
 */
async function maxAsyncConcurrency(
  tasks: Array<() => Promise<void>>,
  maxConcurrency: number,
) {
  // 实现这个函数
  
}

测试代码


const wait = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const waitLog = async (ms, text) => {
  await wait(ms);
  console.log(text);
};
const main = async () => {
  await maxAsyncConcurrencyRecursion(
    [
      () => waitLog(1000, 1),
      () => waitLog(1000, 2),
      () => waitLog(1000, 3),
      () => waitLog(1000, 4),
      () => waitLog(1000, 5),
      () => waitLog(1000, 6),
      () => waitLog(1000, 7),
      () => waitLog(1000, 8),
      () => waitLog(1000, 9),
    ],
    3,
  );
}
main();

思路1:递归实现(最好理解)

通过递归方式实现,把每个并发当成一个运行管道,每个管道实现为一个运行任务的异步函数,函数中完成一个任务就从队列里取下一个任务继续执行,直到清空队列即可。

async function maxAsyncConcurrencyRecursion(tasks, maxConcurrency) {
  const queue = [...tasks];
  // 运行管道
  const pipeRunFn = async (fn) => {
    await fn();
    if (queue.length > 0) {
      const nextFn = queue.shift();
      await pipeRunFn(nextFn);
    }
  };
  // 最大运行管道
  const pipes = queue.splice(0, maxConcurrency);
  await Promise.all(pipes.map(pipeRunFn));
}

思路2:非递归实现

将思路1中的管道异步函数递归切换成 while 循环条件来实现。

async function maxAsyncConcurrency(fns, max) {
  const queue = [...fns];
  let active = 0;
  while(queue.length) {
    if (active >= max) {
      await wait(100); // 如果并发已经达到最大,就等会再进入while循环继续轮询
      continue;
    }
    const fn = queue.shift();
    active++;
    fn().finally(() => {
      active--;
    });
  }
}

更加贴合实践的用法,面向对象象实现流式新增任务

题目

class RequestQueue {
  private maxConcurrent: number; // 最大并发数量
  private queue: Array<() => Promise<void>> = []; // 存储任务队列
  
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /** 添加任务 */
  public addTask(task: () => Promise<void>) {}
}

测试代码


const main = async () => {
  const reqQueue = new RequestQueue(3);
  reqQueue.addTask(() => waitLog(1000, 1))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 2))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 3))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 4))
  await wait(2000);
  reqQueue.addTask(() => waitLog(1000, 5))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 6))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 7))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 8))
  await wait(100);
  reqQueue.addTask(() => waitLog(1000, 9))
}
main();

递归实现

流式增加任务,而不是一开始就拿到全量的任务列表。新增任务时自动触发并发执行

class RequestQueueRecursion {
  private maxConcurrent: number; // 最大并发数量
  private queue: Array<() => Promise<void>> = []; // 存储任务队列
  private active: number = 0; // 当前正在运行的任务计数

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /** 添加一个任务到队列中 */
  public addTask(task: () => Promise<void>) {
    this.queue.push(task);
    this.execute();
  }

  private async execute() {
    while(this.active < this.maxConcurrent && this.query.length > 0) {
      this.active ++;
      const fn = this.query.shift();
      fn().finally(() => {
        this.active--;
        this.execute();
      });
    }
  }
}

非递归实现

class RequestQueue {
  private maxConcurrent: number; // 最大并发数量
  private queue: Array<() => Promise<any>> = []; // 存储任务队列
  private active: number = 0; // 当前正在运行的任务计数

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /** 添加一个任务到队列中 */
  public addTask(task: () => Promise<any>) {
    this.queue.push(task);
    this.execute();
  }

  /** 运行管道 */
  private async execute() {
    const queue = this.queue;
    while(queue.length > 0) {
      if (this.active >= this.maxConcurrent) {
        await wait(100);
        continue;
      }
      this.active ++;
      const fn = queue.shift();
      fn().finally(() => {
        this.active--;
      });
    }
  }
}
❌