实现最大异步并发执行队列
题目说明
日常开发中遇到异步并发执行时我们通常会使用 Promise.All([])
,但是如果这个并发数量很大(如超过100)那么我们考虑到服务器的并发压力就需要设置一个最大并发数。
这也是一个初/中级的热门面试题,本文就详细介绍如何用各种姿势来实现 最大异步并发执行队列
。
/**
* 最大异步并发执行队列
* tasks 任务列表
* maxConcurrency 最大并发数
* @returns {Promise<void>}
*/
async function maxAsyncConcurrency(
tasks: Array<() => Promise<void>>,
maxConcurrency: number,
) {
// 实现这个函数
}
测试代码
const wait = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const waitLog = async (ms, text) => {
await wait(ms);
console.log(text);
};
const main = async () => {
await maxAsyncConcurrencyRecursion(
[
() => waitLog(1000, 1),
() => waitLog(1000, 2),
() => waitLog(1000, 3),
() => waitLog(1000, 4),
() => waitLog(1000, 5),
() => waitLog(1000, 6),
() => waitLog(1000, 7),
() => waitLog(1000, 8),
() => waitLog(1000, 9),
],
3,
);
}
main();
思路1:递归实现(最好理解)
通过递归方式实现,把每个并发当成一个运行管道,每个管道实现为一个运行任务的异步函数,函数中完成一个任务就从队列里取下一个任务继续执行,直到清空队列即可。
async function maxAsyncConcurrencyRecursion(tasks, maxConcurrency) {
const queue = [...tasks];
// 运行管道
const pipeRunFn = async (fn) => {
await fn();
if (queue.length > 0) {
const nextFn = queue.shift();
await pipeRunFn(nextFn);
}
};
// 最大运行管道
const pipes = queue.splice(0, maxConcurrency);
await Promise.all(pipes.map(pipeRunFn));
}
思路2:非递归实现
将思路1中的管道异步函数递归切换成
while
循环条件来实现。
async function maxAsyncConcurrency(fns, max) {
const queue = [...fns];
let active = 0;
while(queue.length) {
if (active >= max) {
await wait(100); // 如果并发已经达到最大,就等会再进入while循环继续轮询
continue;
}
const fn = queue.shift();
active++;
fn().finally(() => {
active--;
});
}
}
更加贴合实践的用法,面向对象象实现流式新增任务
题目
class RequestQueue {
private maxConcurrent: number; // 最大并发数量
private queue: Array<() => Promise<void>> = []; // 存储任务队列
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
/** 添加任务 */
public addTask(task: () => Promise<void>) {}
}
测试代码
const main = async () => {
const reqQueue = new RequestQueue(3);
reqQueue.addTask(() => waitLog(1000, 1))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 2))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 3))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 4))
await wait(2000);
reqQueue.addTask(() => waitLog(1000, 5))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 6))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 7))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 8))
await wait(100);
reqQueue.addTask(() => waitLog(1000, 9))
}
main();
递归实现
流式增加任务,而不是一开始就拿到全量的任务列表。新增任务时自动触发并发执行
class RequestQueueRecursion {
private maxConcurrent: number; // 最大并发数量
private queue: Array<() => Promise<void>> = []; // 存储任务队列
private active: number = 0; // 当前正在运行的任务计数
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
/** 添加一个任务到队列中 */
public addTask(task: () => Promise<void>) {
this.queue.push(task);
this.execute();
}
private async execute() {
while(this.active < this.maxConcurrent && this.query.length > 0) {
this.active ++;
const fn = this.query.shift();
fn().finally(() => {
this.active--;
this.execute();
});
}
}
}
非递归实现
class RequestQueue {
private maxConcurrent: number; // 最大并发数量
private queue: Array<() => Promise<any>> = []; // 存储任务队列
private active: number = 0; // 当前正在运行的任务计数
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
/** 添加一个任务到队列中 */
public addTask(task: () => Promise<any>) {
this.queue.push(task);
this.execute();
}
/** 运行管道 */
private async execute() {
const queue = this.queue;
while(queue.length > 0) {
if (this.active >= this.maxConcurrent) {
await wait(100);
continue;
}
this.active ++;
const fn = queue.shift();
fn().finally(() => {
this.active--;
});
}
}
}