异步并行任务执行工具
📖 概述
runParallelTasks 是一个生产级的并行异步任务执行工具,它提供了一种优雅的方式来并行执行多个异步任务,同时支持丰富的功能如重试机制、超时控制、进度追踪和任务取消。
🎯 设计哲学
为什么这样设计?
传统异步并行处理(如 Promise.all())存在以下局限性:
- 错误处理粗糙:一个任务失败会导致整个批次失败
- 缺乏进度反馈:无法知道任务执行进度
- 无取消机制:无法中途停止任务执行
- 缺乏重试能力:网络波动时无法自动恢复
- 资源管理困难:无法清理超时任务和监听器
本工具的设计目标是解决这些问题,提供:
- ✅ 细粒度错误处理:每个任务独立处理成功/失败
- ✅ 实时进度追踪:精确掌握执行进度
- ✅ 完善的取消机制:支持随时取消所有任务
- ✅ 智能重试策略:自动重试失败任务
- ✅ 资源自动管理:避免内存泄漏
🆚 与传统方案对比
| 特性 | Promise.all() |
Promise.allSettled() |
runParallelTasks |
|---|---|---|---|
| 错误处理 | 一个失败全部失败 | 收集所有结果,无后续处理 | 每个任务独立错误处理 + 全局兜底 |
| 进度追踪 | ❌ 不支持 | ❌ 不支持 | ✅ 实时进度回调 |
| 取消机制 | ❌ 不支持 | ❌ 不支持 | ✅ 支持取消所有任务 |
| 重试机制 | ❌ 不支持 | ❌ 不支持 | ✅ 支持配置化重试 |
| 超时控制 | ❌ 不支持 | ❌ 不支持 | ✅ 支持任务级超时 |
| 资源清理 | ❌ 无 | ❌ 无 | ✅ 自动清理定时器/监听器 |
| 错误调试 | 简单错误信息 | 简单状态信息 | ✅ 完整错误历史记录 |
🏗️ 架构设计
核心执行流程
// 执行流程:重试 → 超时 → 取消
const executeTask = () => withRetry(asyncTask, retryCount, retryDelay, signal, taskIndex, taskName);
const taskPromise = Promise.resolve()
.then(() => withTimeout(executeTask, timeout, taskIndex, taskName))
// 后续处理...
设计说明:
- 执行顺序:超时包裹重试,确保总超时包含所有重试尝试
- 取消检查:每次重试前检查取消状态,避免无效执行
- 错误传播:重试用尽后向上抛出最终错误
重试机制 (withRetry)
/**
* 带重试的任务执行器
* 设计特点:
* 1. 迭代实现:避免递归导致的堆栈溢出
* 2. 取消检查:每次重试前检查取消信号
* 3. 错误记录:记录所有重试错误的历史记录
* 4. 延迟响应:重试延迟期间可立即响应取消
*/
const withRetry = async (asyncTask, retryCount = 0, retryDelay = 0, signal, taskIndex, taskName) => {
const retryErrors = []; // 记录所有重试错误
let currentRetry = 0;
while (currentRetry <= retryCount) {
// 检查取消(第一道防线)
if (signal?.aborted) {
const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止重试`);
abortError.name = 'AbortError';
abortError.retryErrors = retryErrors;
abortError.retryCount = currentRetry;
abortError.totalRetry = retryCount;
throw abortError;
}
try {
const result = await asyncTask(signal);
return {
data: result,
retryCount: currentRetry,
totalRetry: retryCount,
retryErrors
};
} catch (error) {
// 记录错误历史
retryErrors.push({
retry: currentRetry,
error: error.message,
timestamp: new Date().toISOString()
});
// 重试用尽
if (currentRetry >= retryCount) {
error.retryErrors = retryErrors;
error.retryCount = currentRetry;
error.totalRetry = retryCount;
throw error;
}
// 延迟重试(支持取消)
await delayWithCancel(retryDelay, signal, taskIndex, taskName);
currentRetry++;
}
}
};
延迟函数 (delayWithCancel)
/**
* 带取消响应的延迟函数
* 设计特点:
* 1. 取消响应:延迟期间监听取消信号,立即中断
* 2. 资源清理:自动清理定时器和事件监听器
* 3. 原子操作:确保清理操作只执行一次
*/
const delayWithCancel = (delay, signal, taskIndex, taskName) => {
return new Promise((resolve, reject) => {
if (delay <= 0) return resolve();
// 立即检查取消状态
if (signal?.aborted) {
const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止延迟`);
abortError.name = 'AbortError';
return reject(abortError);
}
let timeoutId;
let abortHandler;
// 统一的清理函数
const cleanup = () => {
clearTimeout(timeoutId);
if (abortHandler) {
signal?.removeEventListener('abort', abortHandler);
}
};
// 延迟成功结束
const onFinish = () => {
cleanup();
resolve();
};
// 取消处理函数
abortHandler = () => {
cleanup();
const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})取消,中断重试延迟`);
abortError.name = 'AbortError';
reject(abortError);
};
// 设置延迟
timeoutId = setTimeout(onFinish, delay);
// 监听取消信号
signal?.addEventListener('abort', abortHandler);
});
};
超时控制 (withTimeout)
/**
* 带超时的任务执行器
* 设计特点:
* 1. 总超时:超时时间包含所有重试尝试
* 2. 竞态执行:任务执行与超时竞态,先完成者生效
* 3. 自动清理:任务完成后自动清理超时定时器
*/
const withTimeout = (taskFn, timeout, taskIndex, taskName) => {
if (!timeout || timeout <= 0) return taskFn();
return new Promise((resolve, reject) => {
let timeoutId;
// 超时Promise
const timeoutPromise = new Promise((_, reject) => {
timeoutId = setTimeout(() => {
const timeoutError = new Error(`任务[${taskIndex}](${taskName || '未知'})超时(${timeout}ms,含所有重试)`);
timeoutError.name = 'TaskTimeoutError';
timeoutError.taskIndex = taskIndex;
timeoutError.taskName = taskName;
reject(timeoutError);
}, timeout);
});
// 竞态执行
Promise.race([taskFn(), timeoutPromise])
.then(resolve)
.catch(reject)
.finally(() => {
clearTimeout(timeoutId); // 关键:清理超时定时器
});
});
};
结果聚合 (allDone)
/**
* 聚合所有任务结果
* 设计特点:
* 1. 统一格式:将所有任务结果格式化为统一结构
* 2. 错误兜底:处理意料之外的错误
* 3. 完整信息:包含任务索引、名称、重试信息等
*/
const allDone = Promise.allSettled(taskPromises).then((settledResults) => {
return settledResults.map((item) => {
if (item.status === 'fulfilled') return item.value;
// 兜底处理:理论上不会执行到这里(内部已catch所有错误)
return {
success: false,
error: item.reason,
taskIndex: -1,
taskName: '未知任务',
isAborted: false,
reason: 'UNHANDLED_ERROR',
retryCount: 0,
totalRetry: 0,
retryErrors: []
};
});
});
📚 使用方法
基本安装
// 1. 复制 runParallelTasks 函数到你的项目
// 2. 导入函数
import { runParallelTasks } from './utils/asyncTask';
// 或者作为独立模块使用
// import runParallelTasks from 'parallel-task-runner';
任务队列配置
每个任务可以配置以下属性:
const task = {
// 必需:异步任务函数,可接收 AbortSignal
asyncTask: (signal) => fetch('/api/data', { signal }).then(r => r.json()),
// 可选:任务成功回调(支持异步)
onSuccess: (data, index) => {
console.log(`任务${index}成功:`, data);
updateUI(data);
},
// 可选:任务失败回调(支持异步)
onError: (error, index) => {
console.error(`任务${index}失败:`, error);
showError(error);
},
// 可选:任务名称(用于日志和调试)
taskName: '获取用户数据',
// 可选:总超时时间(毫秒,包含所有重试)
timeout: 10000,
// 可选:重试次数(默认0,不重试)
retryCount: 3,
// 可选:重试延迟(毫秒,默认0)
retryDelay: 1000
};
执行配置
const options = {
// 必需:任务队列数组
taskQueue: [...],
// 可选:全局进度回调
onProgress: (completed, total, taskIndex, taskName) => {
console.log(`进度: ${completed}/${total}`);
updateProgressBar(completed / total);
},
// 可选:全局错误兜底
onGlobalError: (error, taskIndex, taskName) => {
console.error(`任务${taskIndex}(${taskName})未处理错误:`, error);
sendToErrorTracking(error);
},
// 可选:是否启用取消功能(默认true)
enableAbort: true
};
执行和结果处理
// 执行任务
const runner = runParallelTasks(options);
// 1. 使用 allDone 等待所有任务完成
runner.allDone.then(results => {
const successCount = results.filter(r => r.success).length;
const failedCount = results.filter(r => !r.success).length;
console.log(`完成: ${successCount}成功, ${failedCount}失败`);
// 处理成功结果
results.filter(r => r.success).forEach(result => {
console.log(`任务${result.taskIndex}结果:`, result.result);
});
// 处理失败结果
results.filter(r => !r.success).forEach(result => {
console.error(`任务${result.taskIndex}失败原因:`, result.error.message);
if (result.retryCount > 0) {
console.error(`已重试${result.retryCount}次`, result.retryErrors);
}
});
});
// 2. 随时取消任务(如页面卸载时)
// runner.abort();
// 3. 访问单个任务的Promise(高级用法)
// runner.promises[0].then(result => console.log('第一个任务结果:', result));
📋 使用案例
案例1:页面数据加载
/**
* 场景:页面初始化时需要并行加载多个API数据
* 需求:需要进度显示,支持取消,关键数据需要重试
*/
const loadPageData = () => {
const taskQueue = [
{
taskName: '用户信息',
asyncTask: (signal) => api.getUserInfo({ signal }),
timeout: 5000,
retryCount: 1,
retryDelay: 1000,
onSuccess: (data) => store.commit('SET_USER', data),
onError: (error) => {
console.error('用户信息加载失败');
showFallbackUserInfo();
}
},
{
taskName: '配置信息',
asyncTask: (signal) => api.getConfig({ signal }),
timeout: 3000,
onSuccess: (data) => store.commit('SET_CONFIG', data)
},
{
taskName: '推荐内容',
asyncTask: (signal) => api.getRecommendations({ signal }),
timeout: 8000,
onSuccess: (data) => store.commit('SET_RECOMMENDATIONS', data)
}
];
const runner = runParallelTasks({
taskQueue,
onProgress: (completed, total) => {
showLoadingProgress(completed / total * 100);
},
onGlobalError: (error, index, name) => {
logToMonitoring('页面数据加载失败', { taskIndex: index, taskName: name, error });
}
});
// 返回runner,以便在组件卸载时取消
return runner;
};
// 使用
const pageDataLoader = loadPageData();
// 等待所有数据加载完成
pageDataLoader.allDone.then(results => {
const allSuccess = results.every(r => r.success);
if (allSuccess) {
showPageContent();
} else {
showPartialContent(results);
}
});
// 页面卸载时取消未完成的任务
onBeforeUnmount(() => {
pageDataLoader.abort();
});
案例2:批量文件上传
/**
* 场景:批量上传多个文件
* 需求:显示总进度,单个文件可重试,支持取消上传
*/
const uploadFiles = (files) => {
const taskQueue = files.map((file, index) => ({
taskName: `文件: ${file.name}`,
asyncTask: async (signal) => {
// 使用FormData上传
const formData = new FormData();
formData.append('file', file);
const response = await fetch('/api/upload', {
method: 'POST',
body: formData,
signal // 支持取消
});
if (!response.ok) {
throw new Error(`上传失败: ${response.status}`);
}
return await response.json();
},
timeout: 30000, // 30秒超时
retryCount: 2, // 重试2次
retryDelay: 2000, // 2秒后重试
onSuccess: (result, index) => {
updateFileStatus(index, 'success');
console.log(`文件${file.name}上传成功:`, result);
},
onError: (error, index) => {
updateFileStatus(index, 'error');
console.error(`文件${file.name}上传失败:`, error);
// 根据重试情况显示不同提示
if (error.retryCount > 0) {
showToast(`${file.name}上传失败,已重试${error.retryCount}次`);
}
}
}));
const runner = runParallelTasks({
taskQueue,
onProgress: (completed, total) => {
updateTotalProgress(completed / total * 100);
},
onGlobalError: (error, index) => {
console.error(`文件${files[index]?.name}上传异常:`, error);
}
});
return runner;
};
// 使用
const files = [...]; // 文件列表
const uploadRunner = uploadFiles(files);
// 监控上传结果
uploadRunner.allDone.then(results => {
const successCount = results.filter(r => r.success).length;
showToast(`上传完成: ${successCount}/${files.length}个文件成功`);
// 处理失败的文件
results.filter(r => !r.success).forEach(result => {
logUploadFailure(result);
});
});
// 用户取消上传
cancelButton.onclick = () => {
uploadRunner.abort();
showToast('上传已取消');
};
案例3:健康检查监控
/**
* 场景:监控多个微服务的健康状态
* 需求:并行检查,快速失败,记录检查历史
*/
const checkServiceHealth = (services) => {
const taskQueue = services.map((service, index) => ({
taskName: service.name,
asyncTask: async (signal) => {
const response = await fetch(`${service.url}/health`, {
signal,
timeout: 3000
});
const data = await response.json();
if (data.status !== 'healthy') {
throw new Error(`服务状态异常: ${data.status}`);
}
return data;
},
timeout: 5000, // 5秒超时
retryCount: 1, // 快速重试1次
retryDelay: 1000,
onSuccess: (data, index) => {
markServiceHealthy(services[index].id);
console.log(`${services[index].name}健康检查通过`);
},
onError: (error, index) => {
const service = services[index];
markServiceUnhealthy(service.id);
// 记录详细的健康检查失败信息
logHealthCheckFailure({
service: service.name,
error: error.message,
retryCount: error.retryCount || 0,
retryErrors: error.retryErrors || []
});
}
}));
const runner = runParallelTasks({
taskQueue,
onProgress: (completed, total) => {
updateDashboardHealthStatus(completed, total);
},
onGlobalError: (error, index, name) => {
// 发送到监控系统
sendToMonitoringSystem({
type: 'HEALTH_CHECK_ERROR',
service: name,
error: error.message
});
},
enableAbort: false // 健康检查不需要取消
});
return runner;
};
// 定时执行健康检查
setInterval(() => {
const services = [
{ id: 'auth', name: '认证服务', url: 'https://auth.example.com' },
{ id: 'payment', name: '支付服务', url: 'https://payment.example.com' },
{ id: 'notification', name: '通知服务', url: 'https://notification.example.com' }
];
const healthChecker = checkServiceHealth(services);
healthChecker.allDone.then(results => {
const healthyCount = results.filter(r => r.success).length;
updateSystemHealthIndicator(healthyCount / results.length * 100);
// 如果有服务不健康,发送警报
const unhealthy = results.filter(r => !r.success);
if (unhealthy.length > 0) {
sendAlert(`有${unhealthy.length}个服务不健康`);
}
});
}, 60000); // 每分钟检查一次
案例4:API请求合并优化
/**
* 场景:页面需要多个API数据,传统方案是串行请求
* 优化:使用并行请求减少总加载时间
*/
const fetchDashboardData = () => {
const taskQueue = [
{
taskName: '用户统计',
asyncTask: () => api.getUserStats(),
timeout: 3000,
onSuccess: (data) => store.commit('SET_USER_STATS', data)
},
{
taskName: '销售数据',
asyncTask: () => api.getSalesData(),
timeout: 5000,
retryCount: 1,
onSuccess: (data) => store.commit('SET_SALES_DATA', data)
},
{
taskName: '库存状态',
asyncTask: () => api.getInventoryStatus(),
timeout: 4000,
onSuccess: (data) => store.commit('SET_INVENTORY', data)
},
{
taskName: '活动列表',
asyncTask: () => api.getActivities(),
timeout: 6000,
onSuccess: (data) => store.commit('SET_ACTIVITIES', data)
}
];
const runner = runParallelTasks({
taskQueue,
onProgress: (completed, total) => {
// 显示加载进度
const progress = Math.min(completed / total * 100, 99); // 最大99%,留1%给最终处理
updateLoadingProgress(progress);
},
onGlobalError: (error, index, name) => {
console.error(`仪表板数据加载失败: ${name}`, error);
}
});
return runner;
};
// 使用 - 相比串行请求,时间从 sum(time) 减少到 max(time)
const dashboardLoader = fetchDashboardData();
// 传统串行方式大约需要 3+5+4+6 = 18秒
// 并行方式最多只需要 max(3,5,4,6) = 6秒
dashboardLoader.allDone.then(results => {
const allLoaded = results.every(r => r.success);
if (allLoaded) {
showDashboard();
} else {
// 部分数据加载失败,显示降级内容
showDegradedDashboard(results);
}
});
🔧 高级配置
自定义重试策略
// 基于错误类型的重试策略
const createRetryConfig = (error) => {
// 网络错误:重试3次
if (error.name === 'NetworkError' || error.name === 'TypeError') {
return { retryCount: 3, retryDelay: 1000 };
}
// 服务器5xx错误:重试2次
if (error.status >= 500 && error.status < 600) {
return { retryCount: 2, retryDelay: 2000 };
}
// 其他错误:不重试
return { retryCount: 0 };
};
// 在任务配置中使用
const task = {
asyncTask: async (signal) => {
try {
return await fetch('/api/data', { signal }).then(r => r.json());
} catch (error) {
// 根据错误类型动态决定重试策略
const retryConfig = createRetryConfig(error);
error.retryConfig = retryConfig;
throw error;
}
},
// 动态重试配置
retryCount: (task) => task.error?.retryConfig?.retryCount || 0,
retryDelay: (task) => task.error?.retryConfig?.retryDelay || 0
};
性能监控集成
// 添加性能监控
const monitoredRunParallelTasks = (options) => {
const startTime = performance.now();
const taskCount = options.taskQueue.length;
const runner = runParallelTasks({
...options,
onProgress: (completed, total, taskIndex, taskName) => {
// 调用原始进度回调
options.onProgress?.(completed, total, taskIndex, taskName);
// 性能监控
if (completed === total) {
const endTime = performance.now();
const duration = endTime - startTime;
sendToAnalytics({
event: 'PARALLEL_TASKS_COMPLETED',
taskCount,
duration,
successRate: completed / total
});
}
}
});
return runner;
};
📊 性能建议
最佳实践
- 合理设置超时时间
-
- 关键任务:5-10秒
- 非关键任务:3-5秒
- 后台任务:10-30秒
- 重试策略建议
-
- 网络请求:重试2-3次,延迟1-2秒
- 支付操作:重试1-2次,延迟2-3秒
- 文件上传:重试1次,延迟3秒
- 并发控制
-
- 虽然工具支持无限并发,但建议根据实际情况控制任务数量
- 大量任务(>50)建议分批执行
- 内存管理
-
- 页面卸载时务必调用
abort()取消未完成任务 - 监控长时间运行的任务,避免内存泄漏
- 页面卸载时务必调用
🐛 常见问题
Q1: 任务取消后,allDone 还会返回结果吗?
A: 会的。取消的任务会返回一个特殊的结果对象,其中 isAborted: true,reason: 'USER_CANCELLED'。allDone 会等待所有任务(包括被取消的)完成。
Q2: 重试期间超时如何计算?
A: 超时时间是从任务开始到结束的总时间,包含所有重试尝试。例如:设置 timeout: 10000,重试3次,那么从第一次尝试开始计时,10秒后如果还没成功则超时。
Q3: 任务函数必须接收 signal 参数吗?
A: 不需要。工具总是传递 signal 参数,但如果你的任务函数不需要取消功能,可以忽略这个参数。
Q4: 如何实现并发控制?
A: 当前版本不内置并发控制,因为设计目标是真正的并行执行。如果需要并发控制,建议在外部实现任务分批。
Q5: 错误对象中的 retryErrors 包含什么?
A: 包含所有重试尝试的错误记录数组,每个记录包含:
-
retry: 第几次重试(从0开始) -
error: 错误信息 -
timestamp: 错误发生时间
📈 扩展建议
如果未来需要扩展功能,可以考虑:
- 优先级调度:为任务添加优先级,高优先级先执行
- 依赖关系:支持任务间的依赖关系
- 并发限制:限制同时执行的任务数量
- 断点续传:对于长时间任务支持暂停/恢复
- 更复杂的重试策略:指数退避、抖动等算法
📝 总结
runParallelTasks 是一个功能全面、设计优雅的并行任务执行工具,它解决了传统异步并行处理的诸多痛点,特别适合以下场景:
- ✅ 复杂页面初始化:需要加载多个API
- ✅ 批量操作:文件上传、数据导入导出
- ✅ 监控检查:服务健康检查、心跳检测
- ✅ 实时数据处理:并行处理多个数据流
- ✅ 用户交互响应:多个后台任务并行执行
通过合理使用这个工具,可以显著提升应用的用户体验和代码的可维护性。
📄 完整代码
最后,这是完整的 runParallelTasks 函数代码:
/**
* @file utils/asyncTask.js
* @description 并行执行异步任务队列(重试机制终极优化版)
* 核心特性:
* 1. 重试延迟期间可立即响应取消(无需等待延迟结束)
* 2. 所有定时器(重试延迟/超时)自动清理,无内存泄漏
* 3. 每次重试前检查取消状态,避免无效重试
* 4. 记录所有重试错误(保留最后一次错误为主,附带错误列表)
* 5. 总超时包裹整个重试过程(符合需求),重试次数/延迟可配置
* 6. 取消/超时/重试逻辑解耦,代码结构清晰
*/
export function runParallelTasks({
taskQueue,
onProgress,
onGlobalError,
enableAbort = true
}) {
// 初始化取消控制器
const controller = enableAbort ? new AbortController() : null;
const { signal } = controller || {};
const total = taskQueue.length;
let completed = 0;
const taskPromises = [];
// 空队列兜底
if (total === 0) {
console.warn('runParallelTasks: 任务队列为空');
return {
promises: taskPromises,
abort: () => {},
allDone: Promise.resolve([])
};
}
/**
* 带取消响应的延迟函数(核心改进:延迟期间可取消,清理定时器)
* @param {number} delay 延迟毫秒数
* @param {AbortSignal} signal 取消信号
* @param {number} taskIndex 任务索引
* @param {string} taskName 任务名称
* @returns {Promise<void>} 延迟Promise,取消时立即reject
*/
const delayWithCancel = (delay, signal, taskIndex, taskName) => {
return new Promise((resolve, reject) => {
if (delay <= 0) return resolve();
// 检查是否已取消
if (signal?.aborted) {
const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止延迟`);
abortError.name = 'AbortError';
return reject(abortError);
}
let timeoutId;
let abortHandler;
// 清理函数
const cleanup = () => {
clearTimeout(timeoutId);
if (abortHandler) {
signal?.removeEventListener('abort', abortHandler);
}
};
// 延迟成功结束
const onFinish = () => {
cleanup();
resolve();
};
// 取消处理
abortHandler = () => {
cleanup();
const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})取消,中断重试延迟`);
abortError.name = 'AbortError';
reject(abortError);
};
// 设置延迟定时器
timeoutId = setTimeout(onFinish, delay);
// 监听取消信号
signal?.addEventListener('abort', abortHandler);
});
};
/**
* 带重试的任务执行器(核心改进:延迟响应取消、记录所有错误、清理定时器)
* @param {Function} asyncTask 异步任务函数
* @param {number} retryCount 重试次数
* @param {number} retryDelay 重试间隔
* @param {AbortSignal} signal 取消信号
* @param {number} taskIndex 任务索引
* @param {string} taskName 任务名称
* @returns {Promise<any>} 任务执行结果
*/
const withRetry = async (asyncTask, retryCount = 0, retryDelay = 0, signal, taskIndex, taskName) => {
const retryErrors = []; // 记录所有重试错误
let currentRetry = 0;
while (currentRetry <= retryCount) {
// 每次重试前检查是否已取消(第一道防线)
if (signal?.aborted) {
const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止重试`);
abortError.name = 'AbortError';
abortError.retryErrors = retryErrors;
abortError.retryCount = currentRetry;
abortError.totalRetry = retryCount;
throw abortError;
}
try {
// 执行单次任务
const result = await asyncTask(signal);
// 成功则返回结果,附带重试信息
return {
data: result,
retryCount: currentRetry,
totalRetry: retryCount,
retryErrors
};
} catch (error) {
// 记录当前错误
retryErrors.push({
retry: currentRetry,
error: error.message || String(error),
timestamp: new Date().toISOString()
});
// 重试用尽,抛出最终错误(附带所有重试错误)
if (currentRetry >= retryCount) {
if (!error.retryErrors) error.retryErrors = retryErrors;
if (!error.retryCount) error.retryCount = currentRetry;
if (!error.totalRetry) error.totalRetry = retryCount;
throw error;
}
console.log(`任务[${taskIndex}](${taskName || '未知'})执行失败,将在${retryDelay}ms后重试(第${currentRetry + 1}/${retryCount}次)`, error.message);
try {
// 重试延迟(支持取消)
await delayWithCancel(retryDelay, signal, taskIndex, taskName);
} catch (delayError) {
// 延迟期间被取消,传播取消错误
delayError.retryErrors = retryErrors;
delayError.retryCount = currentRetry;
delayError.totalRetry = retryCount;
throw delayError;
}
currentRetry++;
}
}
};
/**
* 带超时的任务执行器(总超时包裹整个重试过程)
* @param {Function} taskFn 任务函数(含重试逻辑)
* @param {number} timeout 总超时时间
* @param {number} taskIndex 任务索引
* @param {string} taskName 任务名称
* @returns {Promise<any>} 任务执行结果
*/
const withTimeout = (taskFn, timeout, taskIndex, taskName) => {
if (!timeout || timeout <= 0) return taskFn();
return new Promise((resolve, reject) => {
let timeoutId;
// 总超时Promise
const timeoutPromise = new Promise((_, reject) => {
timeoutId = setTimeout(() => {
const timeoutError = new Error(`任务[${taskIndex}](${taskName || '未知'})超时(${timeout}ms,含所有重试)`);
timeoutError.name = 'TaskTimeoutError';
timeoutError.taskIndex = taskIndex;
timeoutError.taskName = taskName;
reject(timeoutError);
}, timeout);
});
// 竞态执行:任务(含重试) vs 总超时
Promise.race([taskFn(), timeoutPromise])
.then(resolve)
.catch(reject)
.finally(() => {
clearTimeout(timeoutId); // 清理超时定时器
});
});
};
// 遍历执行每个任务
taskQueue.forEach((task, taskIndex) => {
const {
asyncTask,
onSuccess,
onError,
taskName,
timeout,
retryCount = 0,
retryDelay = 0
} = task;
// 执行任务:重试(带取消/延迟清理) → 总超时 → 取消
const executeTask = () => withRetry(asyncTask, retryCount, retryDelay, signal, taskIndex, taskName);
const taskPromise = Promise.resolve()
.then(() => withTimeout(executeTask, timeout, taskIndex, taskName))
// 成功处理
.then((result) => {
// 解构重试结果(兼容无重试的情况)
const { data, retryCount: actualRetry, totalRetry, retryErrors } = result || {};
return Promise.resolve(onSuccess?.(data, taskIndex))
.then(() => ({
success: true,
result: data,
taskIndex,
taskName,
isAborted: false,
retryCount: actualRetry || 0,
totalRetry: totalRetry || 0,
retryErrors: retryErrors || []
}));
})
// 失败处理
.catch((error) => {
// 处理主动取消(含重试延迟中取消)
if (error.name === 'AbortError') {
console.log(`runParallelTasks: 任务[${taskIndex}](${taskName || '未知'})已取消`, error.message);
return {
success: false,
error,
taskIndex,
taskName,
isAborted: true,
reason: 'USER_CANCELLED',
retryCount: error.retryCount || 0,
totalRetry: error.totalRetry || 0,
retryErrors: error.retryErrors || []
};
}
// 处理超时/最终执行失败(重试用尽)
return Promise.resolve()
.then(() => {
// 优先执行专属错误回调
if (onError) {
return onError(error, taskIndex);
}
// 全局错误处理(try-catch兜底)
try {
onGlobalError?.(error, taskIndex, taskName);
} catch (globalErr) {
console.error(`runParallelTasks: 全局错误处理函数执行失败`, globalErr);
}
console.error(
`runParallelTasks: 任务[${taskIndex}](${taskName || '未知'})最终执行失败`,
`已重试${error.retryCount || 0}/${error.totalRetry || 0}次`,
`错误列表:${JSON.stringify(error.retryErrors || [])}`,
error
);
})
.then(() => ({
success: false,
error,
taskIndex,
taskName,
isAborted: false,
reason: error.name === 'TaskTimeoutError' ? 'TIMEOUT' : 'EXECUTION_FAILED',
retryCount: error.retryCount || 0,
totalRetry: error.totalRetry || 0,
retryErrors: error.retryErrors || []
}));
})
// 进度更新(原子操作)
.finally(() => {
const currentCompleted = ++completed;
onProgress?.(currentCompleted, total, taskIndex, taskName);
});
taskPromises.push(taskPromise);
});
// 聚合Promise:格式化所有任务结果
const allDone = Promise.allSettled(taskPromises).then((settledResults) => {
return settledResults.map((item) => {
if (item.status === 'fulfilled') return item.value;
// 兜底处理
return {
success: false,
error: item.reason,
taskIndex: -1,
taskName: '未知任务',
isAborted: false,
reason: 'UNHANDLED_ERROR',
retryCount: 0,
totalRetry: 0,
retryErrors: []
};
});
});
// 取消方法
const abort = () => {
if (controller) {
controller.abort();
console.log('runParallelTasks: 已触发取消所有任务');
} else {
console.warn('runParallelTasks: 未开启取消功能(enableAbort=false)');
}
};
return {
promises: taskPromises,
abort,
allDone
};
}