普通视图

发现新文章,点击刷新页面。
昨天 — 2025年12月13日首页

异步并行任务执行工具

作者 NuLL
2025年12月13日 16:30

📖 概述

runParallelTasks 是一个生产级的并行异步任务执行工具,它提供了一种优雅的方式来并行执行多个异步任务,同时支持丰富的功能如重试机制、超时控制、进度追踪和任务取消。

🎯 设计哲学

为什么这样设计?

传统异步并行处理(如 Promise.all())存在以下局限性:

  1. 错误处理粗糙:一个任务失败会导致整个批次失败
  2. 缺乏进度反馈:无法知道任务执行进度
  3. 无取消机制:无法中途停止任务执行
  4. 缺乏重试能力:网络波动时无法自动恢复
  5. 资源管理困难:无法清理超时任务和监听器

本工具的设计目标是解决这些问题,提供:

  • ✅ 细粒度错误处理:每个任务独立处理成功/失败
  • ✅ 实时进度追踪:精确掌握执行进度
  • ✅ 完善的取消机制:支持随时取消所有任务
  • ✅ 智能重试策略:自动重试失败任务
  • ✅ 资源自动管理:避免内存泄漏

🆚 与传统方案对比

特性 Promise.all() Promise.allSettled() runParallelTasks
错误处理 一个失败全部失败 收集所有结果,无后续处理 每个任务独立错误处理 + 全局兜底
进度追踪 ❌ 不支持 ❌ 不支持 ✅ 实时进度回调
取消机制 ❌ 不支持 ❌ 不支持 ✅ 支持取消所有任务
重试机制 ❌ 不支持 ❌ 不支持 ✅ 支持配置化重试
超时控制 ❌ 不支持 ❌ 不支持 ✅ 支持任务级超时
资源清理 ❌ 无 ❌ 无 ✅ 自动清理定时器/监听器
错误调试 简单错误信息 简单状态信息 ✅ 完整错误历史记录

🏗️ 架构设计

核心执行流程

// 执行流程:重试 → 超时 → 取消
const executeTask = () => withRetry(asyncTask, retryCount, retryDelay, signal, taskIndex, taskName);
const taskPromise = Promise.resolve()
  .then(() => withTimeout(executeTask, timeout, taskIndex, taskName))
  // 后续处理...

设计说明

  • 执行顺序:超时包裹重试,确保总超时包含所有重试尝试
  • 取消检查:每次重试前检查取消状态,避免无效执行
  • 错误传播:重试用尽后向上抛出最终错误

重试机制 (withRetry)

/**
 * 带重试的任务执行器
 * 设计特点:
 * 1. 迭代实现:避免递归导致的堆栈溢出
 * 2. 取消检查:每次重试前检查取消信号
 * 3. 错误记录:记录所有重试错误的历史记录
 * 4. 延迟响应:重试延迟期间可立即响应取消
 */
const withRetry = async (asyncTask, retryCount = 0, retryDelay = 0, signal, taskIndex, taskName) => {
  const retryErrors = []; // 记录所有重试错误
  let currentRetry = 0;

  while (currentRetry <= retryCount) {
    // 检查取消(第一道防线)
    if (signal?.aborted) {
      const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止重试`);
      abortError.name = 'AbortError';
      abortError.retryErrors = retryErrors;
      abortError.retryCount = currentRetry;
      abortError.totalRetry = retryCount;
      throw abortError;
    }

    try {
      const result = await asyncTask(signal);
      return {
        data: result,
        retryCount: currentRetry,
        totalRetry: retryCount,
        retryErrors
      };
    } catch (error) {
      // 记录错误历史
      retryErrors.push({
        retry: currentRetry,
        error: error.message,
        timestamp: new Date().toISOString()
      });

      // 重试用尽
      if (currentRetry >= retryCount) {
        error.retryErrors = retryErrors;
        error.retryCount = currentRetry;
        error.totalRetry = retryCount;
        throw error;
      }

      // 延迟重试(支持取消)
      await delayWithCancel(retryDelay, signal, taskIndex, taskName);
      currentRetry++;
    }
  }
};

延迟函数 (delayWithCancel)

/**
 * 带取消响应的延迟函数
 * 设计特点:
 * 1. 取消响应:延迟期间监听取消信号,立即中断
 * 2. 资源清理:自动清理定时器和事件监听器
 * 3. 原子操作:确保清理操作只执行一次
 */
const delayWithCancel = (delay, signal, taskIndex, taskName) => {
  return new Promise((resolve, reject) => {
    if (delay <= 0) return resolve();
    
    // 立即检查取消状态
    if (signal?.aborted) {
      const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止延迟`);
      abortError.name = 'AbortError';
      return reject(abortError);
    }

    let timeoutId;
    let abortHandler;
    
    // 统一的清理函数
    const cleanup = () => {
      clearTimeout(timeoutId);
      if (abortHandler) {
        signal?.removeEventListener('abort', abortHandler);
      }
    };

    // 延迟成功结束
    const onFinish = () => {
      cleanup();
      resolve();
    };

    // 取消处理函数
    abortHandler = () => {
      cleanup();
      const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})取消,中断重试延迟`);
      abortError.name = 'AbortError';
      reject(abortError);
    };

    // 设置延迟
    timeoutId = setTimeout(onFinish, delay);
    
    // 监听取消信号
    signal?.addEventListener('abort', abortHandler);
  });
};

超时控制 (withTimeout)

/**
 * 带超时的任务执行器
 * 设计特点:
 * 1. 总超时:超时时间包含所有重试尝试
 * 2. 竞态执行:任务执行与超时竞态,先完成者生效
 * 3. 自动清理:任务完成后自动清理超时定时器
 */
const withTimeout = (taskFn, timeout, taskIndex, taskName) => {
  if (!timeout || timeout <= 0) return taskFn();

  return new Promise((resolve, reject) => {
    let timeoutId;
    
    // 超时Promise
    const timeoutPromise = new Promise((_, reject) => {
      timeoutId = setTimeout(() => {
        const timeoutError = new Error(`任务[${taskIndex}](${taskName || '未知'})超时(${timeout}ms,含所有重试)`);
        timeoutError.name = 'TaskTimeoutError';
        timeoutError.taskIndex = taskIndex;
        timeoutError.taskName = taskName;
        reject(timeoutError);
      }, timeout);
    });

    // 竞态执行
    Promise.race([taskFn(), timeoutPromise])
      .then(resolve)
      .catch(reject)
      .finally(() => {
        clearTimeout(timeoutId); // 关键:清理超时定时器
      });
  });
};

结果聚合 (allDone)

/**
 * 聚合所有任务结果
 * 设计特点:
 * 1. 统一格式:将所有任务结果格式化为统一结构
 * 2. 错误兜底:处理意料之外的错误
 * 3. 完整信息:包含任务索引、名称、重试信息等
 */
const allDone = Promise.allSettled(taskPromises).then((settledResults) => {
  return settledResults.map((item) => {
    if (item.status === 'fulfilled') return item.value;
    
    // 兜底处理:理论上不会执行到这里(内部已catch所有错误)
    return {
      success: false,
      error: item.reason,
      taskIndex: -1,
      taskName: '未知任务',
      isAborted: false,
      reason: 'UNHANDLED_ERROR',
      retryCount: 0,
      totalRetry: 0,
      retryErrors: []
    };
  });
});

📚 使用方法

基本安装

// 1. 复制 runParallelTasks 函数到你的项目
// 2. 导入函数
import { runParallelTasks } from './utils/asyncTask';

// 或者作为独立模块使用
// import runParallelTasks from 'parallel-task-runner';

任务队列配置

每个任务可以配置以下属性:

const task = {
  // 必需:异步任务函数,可接收 AbortSignal
  asyncTask: (signal) => fetch('/api/data', { signal }).then(r => r.json()),
  
  // 可选:任务成功回调(支持异步)
  onSuccess: (data, index) => {
    console.log(`任务${index}成功:`, data);
    updateUI(data);
  },
  
  // 可选:任务失败回调(支持异步)
  onError: (error, index) => {
    console.error(`任务${index}失败:`, error);
    showError(error);
  },
  
  // 可选:任务名称(用于日志和调试)
  taskName: '获取用户数据',
  
  // 可选:总超时时间(毫秒,包含所有重试)
  timeout: 10000,
  
  // 可选:重试次数(默认0,不重试)
  retryCount: 3,
  
  // 可选:重试延迟(毫秒,默认0)
  retryDelay: 1000
};

执行配置

const options = {
  // 必需:任务队列数组
  taskQueue: [...],
  
  // 可选:全局进度回调
  onProgress: (completed, total, taskIndex, taskName) => {
    console.log(`进度: ${completed}/${total}`);
    updateProgressBar(completed / total);
  },
  
  // 可选:全局错误兜底
  onGlobalError: (error, taskIndex, taskName) => {
    console.error(`任务${taskIndex}(${taskName})未处理错误:`, error);
    sendToErrorTracking(error);
  },
  
  // 可选:是否启用取消功能(默认true)
  enableAbort: true
};

执行和结果处理

// 执行任务
const runner = runParallelTasks(options);

// 1. 使用 allDone 等待所有任务完成
runner.allDone.then(results => {
  const successCount = results.filter(r => r.success).length;
  const failedCount = results.filter(r => !r.success).length;
  
  console.log(`完成: ${successCount}成功, ${failedCount}失败`);
  
  // 处理成功结果
  results.filter(r => r.success).forEach(result => {
    console.log(`任务${result.taskIndex}结果:`, result.result);
  });
  
  // 处理失败结果
  results.filter(r => !r.success).forEach(result => {
    console.error(`任务${result.taskIndex}失败原因:`, result.error.message);
    if (result.retryCount > 0) {
      console.error(`已重试${result.retryCount}次`, result.retryErrors);
    }
  });
});

// 2. 随时取消任务(如页面卸载时)
// runner.abort();

// 3. 访问单个任务的Promise(高级用法)
// runner.promises[0].then(result => console.log('第一个任务结果:', result));

📋 使用案例

案例1:页面数据加载

/**
 * 场景:页面初始化时需要并行加载多个API数据
 * 需求:需要进度显示,支持取消,关键数据需要重试
 */
const loadPageData = () => {
  const taskQueue = [
    {
      taskName: '用户信息',
      asyncTask: (signal) => api.getUserInfo({ signal }),
      timeout: 5000,
      retryCount: 1,
      retryDelay: 1000,
      onSuccess: (data) => store.commit('SET_USER', data),
      onError: (error) => {
        console.error('用户信息加载失败');
        showFallbackUserInfo();
      }
    },
    {
      taskName: '配置信息',
      asyncTask: (signal) => api.getConfig({ signal }),
      timeout: 3000,
      onSuccess: (data) => store.commit('SET_CONFIG', data)
    },
    {
      taskName: '推荐内容',
      asyncTask: (signal) => api.getRecommendations({ signal }),
      timeout: 8000,
      onSuccess: (data) => store.commit('SET_RECOMMENDATIONS', data)
    }
  ];

  const runner = runParallelTasks({
    taskQueue,
    onProgress: (completed, total) => {
      showLoadingProgress(completed / total * 100);
    },
    onGlobalError: (error, index, name) => {
      logToMonitoring('页面数据加载失败', { taskIndex: index, taskName: name, error });
    }
  });

  // 返回runner,以便在组件卸载时取消
  return runner;
};

// 使用
const pageDataLoader = loadPageData();

// 等待所有数据加载完成
pageDataLoader.allDone.then(results => {
  const allSuccess = results.every(r => r.success);
  if (allSuccess) {
    showPageContent();
  } else {
    showPartialContent(results);
  }
});

// 页面卸载时取消未完成的任务
onBeforeUnmount(() => {
  pageDataLoader.abort();
});

案例2:批量文件上传

/**
 * 场景:批量上传多个文件
 * 需求:显示总进度,单个文件可重试,支持取消上传
 */
const uploadFiles = (files) => {
  const taskQueue = files.map((file, index) => ({
    taskName: `文件: ${file.name}`,
    asyncTask: async (signal) => {
      // 使用FormData上传
      const formData = new FormData();
      formData.append('file', file);
      
      const response = await fetch('/api/upload', {
        method: 'POST',
        body: formData,
        signal // 支持取消
      });
      
      if (!response.ok) {
        throw new Error(`上传失败: ${response.status}`);
      }
      
      return await response.json();
    },
    timeout: 30000, // 30秒超时
    retryCount: 2,  // 重试2次
    retryDelay: 2000, // 2秒后重试
    onSuccess: (result, index) => {
      updateFileStatus(index, 'success');
      console.log(`文件${file.name}上传成功:`, result);
    },
    onError: (error, index) => {
      updateFileStatus(index, 'error');
      console.error(`文件${file.name}上传失败:`, error);
      
      // 根据重试情况显示不同提示
      if (error.retryCount > 0) {
        showToast(`${file.name}上传失败,已重试${error.retryCount}次`);
      }
    }
  }));

  const runner = runParallelTasks({
    taskQueue,
    onProgress: (completed, total) => {
      updateTotalProgress(completed / total * 100);
    },
    onGlobalError: (error, index) => {
      console.error(`文件${files[index]?.name}上传异常:`, error);
    }
  });

  return runner;
};

// 使用
const files = [...]; // 文件列表
const uploadRunner = uploadFiles(files);

// 监控上传结果
uploadRunner.allDone.then(results => {
  const successCount = results.filter(r => r.success).length;
  showToast(`上传完成: ${successCount}/${files.length}个文件成功`);
  
  // 处理失败的文件
  results.filter(r => !r.success).forEach(result => {
    logUploadFailure(result);
  });
});

// 用户取消上传
cancelButton.onclick = () => {
  uploadRunner.abort();
  showToast('上传已取消');
};

案例3:健康检查监控

/**
 * 场景:监控多个微服务的健康状态
 * 需求:并行检查,快速失败,记录检查历史
 */
const checkServiceHealth = (services) => {
  const taskQueue = services.map((service, index) => ({
    taskName: service.name,
    asyncTask: async (signal) => {
      const response = await fetch(`${service.url}/health`, {
        signal,
        timeout: 3000
      });
      
      const data = await response.json();
      
      if (data.status !== 'healthy') {
        throw new Error(`服务状态异常: ${data.status}`);
      }
      
      return data;
    },
    timeout: 5000, // 5秒超时
    retryCount: 1, // 快速重试1次
    retryDelay: 1000,
    onSuccess: (data, index) => {
      markServiceHealthy(services[index].id);
      console.log(`${services[index].name}健康检查通过`);
    },
    onError: (error, index) => {
      const service = services[index];
      markServiceUnhealthy(service.id);
      
      // 记录详细的健康检查失败信息
      logHealthCheckFailure({
        service: service.name,
        error: error.message,
        retryCount: error.retryCount || 0,
        retryErrors: error.retryErrors || []
      });
    }
  }));

  const runner = runParallelTasks({
    taskQueue,
    onProgress: (completed, total) => {
      updateDashboardHealthStatus(completed, total);
    },
    onGlobalError: (error, index, name) => {
      // 发送到监控系统
      sendToMonitoringSystem({
        type: 'HEALTH_CHECK_ERROR',
        service: name,
        error: error.message
      });
    },
    enableAbort: false // 健康检查不需要取消
  });

  return runner;
};

// 定时执行健康检查
setInterval(() => {
  const services = [
    { id: 'auth', name: '认证服务', url: 'https://auth.example.com' },
    { id: 'payment', name: '支付服务', url: 'https://payment.example.com' },
    { id: 'notification', name: '通知服务', url: 'https://notification.example.com' }
  ];
  
  const healthChecker = checkServiceHealth(services);
  
  healthChecker.allDone.then(results => {
    const healthyCount = results.filter(r => r.success).length;
    updateSystemHealthIndicator(healthyCount / results.length * 100);
    
    // 如果有服务不健康,发送警报
    const unhealthy = results.filter(r => !r.success);
    if (unhealthy.length > 0) {
      sendAlert(`有${unhealthy.length}个服务不健康`);
    }
  });
}, 60000); // 每分钟检查一次

案例4:API请求合并优化

/**
 * 场景:页面需要多个API数据,传统方案是串行请求
 * 优化:使用并行请求减少总加载时间
 */
const fetchDashboardData = () => {
  const taskQueue = [
    {
      taskName: '用户统计',
      asyncTask: () => api.getUserStats(),
      timeout: 3000,
      onSuccess: (data) => store.commit('SET_USER_STATS', data)
    },
    {
      taskName: '销售数据',
      asyncTask: () => api.getSalesData(),
      timeout: 5000,
      retryCount: 1,
      onSuccess: (data) => store.commit('SET_SALES_DATA', data)
    },
    {
      taskName: '库存状态',
      asyncTask: () => api.getInventoryStatus(),
      timeout: 4000,
      onSuccess: (data) => store.commit('SET_INVENTORY', data)
    },
    {
      taskName: '活动列表',
      asyncTask: () => api.getActivities(),
      timeout: 6000,
      onSuccess: (data) => store.commit('SET_ACTIVITIES', data)
    }
  ];

  const runner = runParallelTasks({
    taskQueue,
    onProgress: (completed, total) => {
      // 显示加载进度
      const progress = Math.min(completed / total * 100, 99); // 最大99%,留1%给最终处理
      updateLoadingProgress(progress);
    },
    onGlobalError: (error, index, name) => {
      console.error(`仪表板数据加载失败: ${name}`, error);
    }
  });

  return runner;
};

// 使用 - 相比串行请求,时间从 sum(time) 减少到 max(time)
const dashboardLoader = fetchDashboardData();

// 传统串行方式大约需要 3+5+4+6 = 18秒
// 并行方式最多只需要 max(3,5,4,6) = 6秒

dashboardLoader.allDone.then(results => {
  const allLoaded = results.every(r => r.success);
  
  if (allLoaded) {
    showDashboard();
  } else {
    // 部分数据加载失败,显示降级内容
    showDegradedDashboard(results);
  }
});

🔧 高级配置

自定义重试策略

// 基于错误类型的重试策略
const createRetryConfig = (error) => {
  // 网络错误:重试3次
  if (error.name === 'NetworkError' || error.name === 'TypeError') {
    return { retryCount: 3, retryDelay: 1000 };
  }
  
  // 服务器5xx错误:重试2次
  if (error.status >= 500 && error.status < 600) {
    return { retryCount: 2, retryDelay: 2000 };
  }
  
  // 其他错误:不重试
  return { retryCount: 0 };
};

// 在任务配置中使用
const task = {
  asyncTask: async (signal) => {
    try {
      return await fetch('/api/data', { signal }).then(r => r.json());
    } catch (error) {
      // 根据错误类型动态决定重试策略
      const retryConfig = createRetryConfig(error);
      error.retryConfig = retryConfig;
      throw error;
    }
  },
  // 动态重试配置
  retryCount: (task) => task.error?.retryConfig?.retryCount || 0,
  retryDelay: (task) => task.error?.retryConfig?.retryDelay || 0
};

性能监控集成

// 添加性能监控
const monitoredRunParallelTasks = (options) => {
  const startTime = performance.now();
  const taskCount = options.taskQueue.length;
  
  const runner = runParallelTasks({
    ...options,
    onProgress: (completed, total, taskIndex, taskName) => {
      // 调用原始进度回调
      options.onProgress?.(completed, total, taskIndex, taskName);
      
      // 性能监控
      if (completed === total) {
        const endTime = performance.now();
        const duration = endTime - startTime;
        
        sendToAnalytics({
          event: 'PARALLEL_TASKS_COMPLETED',
          taskCount,
          duration,
          successRate: completed / total
        });
      }
    }
  });
  
  return runner;
};

📊 性能建议

最佳实践

  1. 合理设置超时时间
    • 关键任务:5-10秒
    • 非关键任务:3-5秒
    • 后台任务:10-30秒
  1. 重试策略建议
    • 网络请求:重试2-3次,延迟1-2秒
    • 支付操作:重试1-2次,延迟2-3秒
    • 文件上传:重试1次,延迟3秒
  1. 并发控制
    • 虽然工具支持无限并发,但建议根据实际情况控制任务数量
    • 大量任务(>50)建议分批执行
  1. 内存管理
    • 页面卸载时务必调用 abort() 取消未完成任务
    • 监控长时间运行的任务,避免内存泄漏

🐛 常见问题

Q1: 任务取消后,allDone 还会返回结果吗?

A: 会的。取消的任务会返回一个特殊的结果对象,其中 isAborted: truereason: 'USER_CANCELLED'allDone 会等待所有任务(包括被取消的)完成。

Q2: 重试期间超时如何计算?

A: 超时时间是从任务开始到结束的总时间,包含所有重试尝试。例如:设置 timeout: 10000,重试3次,那么从第一次尝试开始计时,10秒后如果还没成功则超时。

Q3: 任务函数必须接收 signal 参数吗?

A: 不需要。工具总是传递 signal 参数,但如果你的任务函数不需要取消功能,可以忽略这个参数。

Q4: 如何实现并发控制?

A: 当前版本不内置并发控制,因为设计目标是真正的并行执行。如果需要并发控制,建议在外部实现任务分批。

Q5: 错误对象中的 retryErrors 包含什么?

A: 包含所有重试尝试的错误记录数组,每个记录包含:

  • retry: 第几次重试(从0开始)
  • error: 错误信息
  • timestamp: 错误发生时间

📈 扩展建议

如果未来需要扩展功能,可以考虑:

  1. 优先级调度:为任务添加优先级,高优先级先执行
  2. 依赖关系:支持任务间的依赖关系
  3. 并发限制:限制同时执行的任务数量
  4. 断点续传:对于长时间任务支持暂停/恢复
  5. 更复杂的重试策略:指数退避、抖动等算法

📝 总结

runParallelTasks 是一个功能全面、设计优雅的并行任务执行工具,它解决了传统异步并行处理的诸多痛点,特别适合以下场景:

  • ✅ 复杂页面初始化:需要加载多个API
  • ✅ 批量操作:文件上传、数据导入导出
  • ✅ 监控检查:服务健康检查、心跳检测
  • ✅ 实时数据处理:并行处理多个数据流
  • ✅ 用户交互响应:多个后台任务并行执行

通过合理使用这个工具,可以显著提升应用的用户体验和代码的可维护性。


📄 完整代码

最后,这是完整的 runParallelTasks 函数代码:

/**
 * @file utils/asyncTask.js
 * @description 并行执行异步任务队列(重试机制终极优化版)
 * 核心特性:
 * 1. 重试延迟期间可立即响应取消(无需等待延迟结束)
 * 2. 所有定时器(重试延迟/超时)自动清理,无内存泄漏
 * 3. 每次重试前检查取消状态,避免无效重试
 * 4. 记录所有重试错误(保留最后一次错误为主,附带错误列表)
 * 5. 总超时包裹整个重试过程(符合需求),重试次数/延迟可配置
 * 6. 取消/超时/重试逻辑解耦,代码结构清晰
 */

export function runParallelTasks({
  taskQueue,
  onProgress,
  onGlobalError,
  enableAbort = true
}) {
  // 初始化取消控制器
  const controller = enableAbort ? new AbortController() : null;
  const { signal } = controller || {};
  const total = taskQueue.length;
  let completed = 0;
  const taskPromises = [];

  // 空队列兜底
  if (total === 0) {
    console.warn('runParallelTasks: 任务队列为空');
    return {
      promises: taskPromises,
      abort: () => {},
      allDone: Promise.resolve([])
    };
  }

  /**
   * 带取消响应的延迟函数(核心改进:延迟期间可取消,清理定时器)
   * @param {number} delay 延迟毫秒数
   * @param {AbortSignal} signal 取消信号
   * @param {number} taskIndex 任务索引
   * @param {string} taskName 任务名称
   * @returns {Promise<void>} 延迟Promise,取消时立即reject
   */
  const delayWithCancel = (delay, signal, taskIndex, taskName) => {
    return new Promise((resolve, reject) => {
      if (delay <= 0) return resolve();
      
      // 检查是否已取消
      if (signal?.aborted) {
        const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止延迟`);
        abortError.name = 'AbortError';
        return reject(abortError);
      }

      let timeoutId;
      let abortHandler;
      
      // 清理函数
      const cleanup = () => {
        clearTimeout(timeoutId);
        if (abortHandler) {
          signal?.removeEventListener('abort', abortHandler);
        }
      };

      // 延迟成功结束
      const onFinish = () => {
        cleanup();
        resolve();
      };

      // 取消处理
      abortHandler = () => {
        cleanup();
        const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})取消,中断重试延迟`);
        abortError.name = 'AbortError';
        reject(abortError);
      };

      // 设置延迟定时器
      timeoutId = setTimeout(onFinish, delay);
      
      // 监听取消信号
      signal?.addEventListener('abort', abortHandler);
    });
  };

  /**
   * 带重试的任务执行器(核心改进:延迟响应取消、记录所有错误、清理定时器)
   * @param {Function} asyncTask 异步任务函数
   * @param {number} retryCount 重试次数
   * @param {number} retryDelay 重试间隔
   * @param {AbortSignal} signal 取消信号
   * @param {number} taskIndex 任务索引
   * @param {string} taskName 任务名称
   * @returns {Promise<any>} 任务执行结果
   */
  const withRetry = async (asyncTask, retryCount = 0, retryDelay = 0, signal, taskIndex, taskName) => {
    const retryErrors = []; // 记录所有重试错误
    let currentRetry = 0;

    while (currentRetry <= retryCount) {
      // 每次重试前检查是否已取消(第一道防线)
      if (signal?.aborted) {
        const abortError = new Error(`任务[${taskIndex}](${taskName || '未知'})已取消,终止重试`);
        abortError.name = 'AbortError';
        abortError.retryErrors = retryErrors;
        abortError.retryCount = currentRetry;
        abortError.totalRetry = retryCount;
        throw abortError;
      }

      try {
        // 执行单次任务
        const result = await asyncTask(signal);
        // 成功则返回结果,附带重试信息
        return {
          data: result,
          retryCount: currentRetry,
          totalRetry: retryCount,
          retryErrors
        };
      } catch (error) {
        // 记录当前错误
        retryErrors.push({
          retry: currentRetry,
          error: error.message || String(error),
          timestamp: new Date().toISOString()
        });

        // 重试用尽,抛出最终错误(附带所有重试错误)
        if (currentRetry >= retryCount) {
          if (!error.retryErrors) error.retryErrors = retryErrors;
          if (!error.retryCount) error.retryCount = currentRetry;
          if (!error.totalRetry) error.totalRetry = retryCount;
          throw error;
        }

        console.log(`任务[${taskIndex}](${taskName || '未知'})执行失败,将在${retryDelay}ms后重试(第${currentRetry + 1}/${retryCount}次)`, error.message);
        
        try {
          // 重试延迟(支持取消)
          await delayWithCancel(retryDelay, signal, taskIndex, taskName);
        } catch (delayError) {
          // 延迟期间被取消,传播取消错误
          delayError.retryErrors = retryErrors;
          delayError.retryCount = currentRetry;
          delayError.totalRetry = retryCount;
          throw delayError;
        }
        
        currentRetry++;
      }
    }
  };

  /**
   * 带超时的任务执行器(总超时包裹整个重试过程)
   * @param {Function} taskFn 任务函数(含重试逻辑)
   * @param {number} timeout 总超时时间
   * @param {number} taskIndex 任务索引
   * @param {string} taskName 任务名称
   * @returns {Promise<any>} 任务执行结果
   */
  const withTimeout = (taskFn, timeout, taskIndex, taskName) => {
    if (!timeout || timeout <= 0) return taskFn();

    return new Promise((resolve, reject) => {
      let timeoutId;
      // 总超时Promise
      const timeoutPromise = new Promise((_, reject) => {
        timeoutId = setTimeout(() => {
          const timeoutError = new Error(`任务[${taskIndex}](${taskName || '未知'})超时(${timeout}ms,含所有重试)`);
          timeoutError.name = 'TaskTimeoutError';
          timeoutError.taskIndex = taskIndex;
          timeoutError.taskName = taskName;
          reject(timeoutError);
        }, timeout);
      });

      // 竞态执行:任务(含重试) vs 总超时
      Promise.race([taskFn(), timeoutPromise])
        .then(resolve)
        .catch(reject)
        .finally(() => {
          clearTimeout(timeoutId); // 清理超时定时器
        });
    });
  };

  // 遍历执行每个任务
  taskQueue.forEach((task, taskIndex) => {
    const {
      asyncTask,
      onSuccess,
      onError,
      taskName,
      timeout,
      retryCount = 0,
      retryDelay = 0
    } = task;

    // 执行任务:重试(带取消/延迟清理) → 总超时 → 取消
    const executeTask = () => withRetry(asyncTask, retryCount, retryDelay, signal, taskIndex, taskName);

    const taskPromise = Promise.resolve()
      .then(() => withTimeout(executeTask, timeout, taskIndex, taskName))
      // 成功处理
      .then((result) => {
        // 解构重试结果(兼容无重试的情况)
        const { data, retryCount: actualRetry, totalRetry, retryErrors } = result || {};
        return Promise.resolve(onSuccess?.(data, taskIndex))
          .then(() => ({
            success: true,
            result: data,
            taskIndex,
            taskName,
            isAborted: false,
            retryCount: actualRetry || 0,
            totalRetry: totalRetry || 0,
            retryErrors: retryErrors || []
          }));
      })
      // 失败处理
      .catch((error) => {
        // 处理主动取消(含重试延迟中取消)
        if (error.name === 'AbortError') {
          console.log(`runParallelTasks: 任务[${taskIndex}](${taskName || '未知'})已取消`, error.message);
          return {
            success: false,
            error,
            taskIndex,
            taskName,
            isAborted: true,
            reason: 'USER_CANCELLED',
            retryCount: error.retryCount || 0,
            totalRetry: error.totalRetry || 0,
            retryErrors: error.retryErrors || []
          };
        }

        // 处理超时/最终执行失败(重试用尽)
        return Promise.resolve()
          .then(() => {
            // 优先执行专属错误回调
            if (onError) {
              return onError(error, taskIndex);
            }
            // 全局错误处理(try-catch兜底)
            try {
              onGlobalError?.(error, taskIndex, taskName);
            } catch (globalErr) {
              console.error(`runParallelTasks: 全局错误处理函数执行失败`, globalErr);
            }
            console.error(
              `runParallelTasks: 任务[${taskIndex}](${taskName || '未知'})最终执行失败`,
              `已重试${error.retryCount || 0}/${error.totalRetry || 0}次`,
              `错误列表:${JSON.stringify(error.retryErrors || [])}`,
              error
            );
          })
          .then(() => ({
            success: false,
            error,
            taskIndex,
            taskName,
            isAborted: false,
            reason: error.name === 'TaskTimeoutError' ? 'TIMEOUT' : 'EXECUTION_FAILED',
            retryCount: error.retryCount || 0,
            totalRetry: error.totalRetry || 0,
            retryErrors: error.retryErrors || []
          }));
      })
      // 进度更新(原子操作)
      .finally(() => {
        const currentCompleted = ++completed;
        onProgress?.(currentCompleted, total, taskIndex, taskName);
      });

    taskPromises.push(taskPromise);
  });

  // 聚合Promise:格式化所有任务结果
  const allDone = Promise.allSettled(taskPromises).then((settledResults) => {
    return settledResults.map((item) => {
      if (item.status === 'fulfilled') return item.value;
      // 兜底处理
      return {
        success: false,
        error: item.reason,
        taskIndex: -1,
        taskName: '未知任务',
        isAborted: false,
        reason: 'UNHANDLED_ERROR',
        retryCount: 0,
        totalRetry: 0,
        retryErrors: []
      };
    });
  });

  // 取消方法
  const abort = () => {
    if (controller) {
      controller.abort();
      console.log('runParallelTasks: 已触发取消所有任务');
    } else {
      console.warn('runParallelTasks: 未开启取消功能(enableAbort=false)');
    }
  };

  return {
    promises: taskPromises,
    abort,
    allDone
  };
}

异步互斥锁

作者 NuLL
2025年12月13日 16:27

异步任务互斥锁工具 (Async Lock Manager)

📖 概述

LockManager 是一个生产级的异步任务互斥锁管理工具,专为现代 Web 应用中的并发控制设计。它通过互斥锁机制防止异步任务重复执行,提供队列管理、智能重试、超时控制和资源自动清理等功能。

🎯 设计哲学

为什么需要异步任务互斥锁?

传统的防抖节流方案存在以下局限性:

  1. 无法防止长时间异步操作:防抖节流只能控制函数调用频率,但无法防止 API 接口长时间未返回时的重复调用
  2. 缺乏队列管理:多个并发请求无法有序排队执行
  3. 缺少取消机制:无法中断已发起的异步任务
  4. 资源管理困难:无法自动清理过期锁和等待任务
  5. 缺乏智能重试:简单的重试策略无法适应复杂错误场景

本工具的设计目标是解决这些问题,提供:

  • ✅ 原子性操作:确保锁的获取和释放是原子操作
  • ✅ 智能队列管理:支持 FIFO 队列,可配置队列大小和超时
  • ✅ 可中断执行:支持任务取消和超时中断
  • ✅ 指数退避重试:支持自定义重试条件和退避策略
  • ✅ 资源自动管理:自动清理过期锁和队列项
  • ✅ 完整监控统计:提供执行统计和状态监控

🆚 与传统方案对比

特性 防抖 (Debounce) 节流 (Throttle) 简单互斥锁 LockManager
防止重复调用 ✅ 时间窗口内 ✅ 固定频率 ✅ 直到完成 ✅ 直到完成 + 队列
异步任务支持 ❌ 有限 ❌ 有限 ✅ 基础 ✅ 完整(重试、超时、取消)
队列管理 ❌ 不支持 ❌ 不支持 ❌ 不支持 ✅ 支持 FIFO 队列
取消机制 ❌ 不支持 ❌ 不支持 ❌ 不支持 ✅ 支持主动取消
重试策略 ❌ 不支持 ❌ 不支持 ❌ 不支持 ✅ 指数退避 + 自定义条件
超时控制 ❌ 不支持 ❌ 不支持 ❌ 不支持 ✅ 支持任务和队列超时
资源清理 ❌ 无 ❌ 无 ❌ 无 ✅ 自动清理过期锁
状态监控 ❌ 无 ❌ 无 ❌ 无 ✅ 完整统计信息

🏗️ 架构设计

核心执行流程

// 执行流程:检查锁 → 加入队列(可选) → 获取锁 → 执行任务 → 释放锁 → 处理队列
async execute(options) {
  // 1. 检查锁状态
  // 2. 如果已锁定且启用队列,加入队列等待
  // 3. 获取锁(原子操作)
  // 4. 执行任务(支持重试)
  // 5. 清理锁资源
  // 6. 处理队列中的下一个任务
}

锁管理机制 (_acquireLock)

/**
 * 原子性地获取锁
 * 设计特点:
 * 1. 三重检查:确保锁获取的原子性
 * 2. 唯一标识:为每个锁尝试生成唯一ID
 * 3. 资源预分配:提前创建取消控制器
 * 4. 验证机制:设置后验证确保原子性
 */
_acquireLock(name) {
  const attemptId = `${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
  
  // 第一重检查
  const existing = this._lockMap.get(name);
  if (existing?.locked) {
    return null;
  }
  
  // 创建锁对象
  const lockItem = {
    locked: true,
    abortController: new AbortController(),
    timeoutTimer: null,
    createdAt: Date.now(),
    taskId: `${name}_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`,
    attemptId: attemptId
  };
  
  // 第二重检查(原子性保障)
  const current = this._lockMap.get(name);
  if (current?.locked) {
    return null;
  }
  
  // 设置锁
  this._lockMap.set(name, lockItem);
  
  // 最终验证(确保原子性)
  const afterSet = this._lockMap.get(name);
  if (afterSet?.attemptId !== attemptId) {
    lockItem.abortController.abort();
    return null;
  }
  
  return lockItem;
}

队列管理机制 (_addToQueue_processNextInQueue)

/**
 * 将任务加入等待队列
 * 设计特点:
 * 1. 容量控制:可配置最大队列大小
 * 2. 超时管理:队列等待也有超时控制
 * 3. 有序执行:FIFO(先进先出)原则
 * 4. 资源清理:超时自动清理队列项
 */
_addToQueue(options) {
  const { name, maxQueueSize } = options;
  
  let queue = this._queueMap.get(name);
  if (!queue) {
    queue = [];
    this._queueMap.set(name, queue);
  }
  
  // 队列容量检查
  if (queue.length >= maxQueueSize) {
    const error = new Error(`任务队列【${name}】已满(最大${maxQueueSize})`);
    error.type = 'queue_full';
    error.code = 'QUEUE_FULL';
    return Promise.reject(error);
  }
  
  return new Promise((resolve, reject) => {
    const queueItem = {
      options,
      resolve,
      reject,
      enqueuedAt: Date.now()
    };
    
    queue.push(queueItem);
    
    // 队列等待超时
    if (options.timeout > 0) {
      queueItem.timeoutTimer = setTimeout(() => {
        const index = queue.indexOf(queueItem);
        if (index > -1) {
          queue.splice(index, 1);
          const error = new Error(`任务【${name}】在队列中等待超时`);
          error.type = 'queue_timeout';
          error.code = 'QUEUE_TIMEOUT';
          reject(error);
        }
      }, options.timeout);
    }
  });
}

/**
 * 处理队列中的下一个任务
 * 设计特点:
 * 1. 微任务调度:使用 Promise.resolve() 避免 setTimeout 延迟
 * 2. 递归处理:自动处理队列中的所有任务
 * 3. 资源清理:处理完成后清理空队列
 * 4. 错误传播:正确处理任务成功和失败
 */
async _processNextInQueue(name) {
  const queue = this._queueMap.get(name);
  if (!queue || queue.length === 0) {
    this._queueMap.delete(name);
    return;
  }
  
  // 使用微任务处理,避免 setTimeout 的延迟
  await Promise.resolve();
  
  const queueItem = queue.shift();
  
  // 清理队列项的超时定时器
  if (queueItem.timeoutTimer) {
    clearTimeout(queueItem.timeoutTimer);
  }
  
  try {
    const result = await this._executeTask(queueItem.options);
    queueItem.resolve(result);
  } catch (error) {
    queueItem.reject(error);
  } finally {
    // 递归处理下一个任务
    if (queue.length > 0) {
      Promise.resolve().then(() => this._processNextInQueue(name));
    } else {
      this._queueMap.delete(name);
    }
  }
}

智能重试机制 (_executeWithExponentialBackoff)

/**
 * 指数退避重试执行
 * 设计特点:
 * 1. 取消检查:每次重试前检查取消信号
 * 2. 退避算法:指数退避 + 随机抖动
 * 3. 自定义条件:支持根据错误类型决定是否重试
 * 4. 安全延迟:可中断的延时函数
 */
async _executeWithExponentialBackoff(fn, maxRetries, baseDelay, maxDelay, abortController, retryCondition) {
  let lastError;
  
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      // 检查是否已取消
      if (abortController.signal.aborted) {
        const cancelError = new Error('任务已被取消');
        cancelError.type = 'cancel';
        cancelError.code = 'CANCELLED';
        throw cancelError;
      }
      
      // 非首次尝试时延迟
      if (attempt > 0) {
        const delay = this._calculateExponentialBackoffDelay(attempt, baseDelay, maxDelay);
        await this._sleep(delay, abortController.signal);
      }
      
      return await fn();
      
    } catch (error) {
      lastError = error;
      
      // 判断是否应该重试
      if (!this._shouldRetry(error, retryCondition)) {
        throw error;
      }
      
      // 重试用尽
      if (attempt === maxRetries) {
        error.retryAttempts = attempt;
        throw error;
      }
    }
  }
  
  throw lastError;
}

/**
 * 判断是否应该重试(支持自定义重试条件)
 */
_shouldRetry(error, retryCondition) {
  // 不重试的错误类型
  const noRetryTypes = ['cancel', 'timeout', 'queue_full', 'queue_timeout', 'lock_failed'];
  if (noRetryTypes.includes(error.type)) {
    return false;
  }
  
  // 如果提供了自定义重试条件函数,使用它
  if (typeof retryCondition === 'function') {
    return retryCondition(error);
  }
  
  // 默认重试条件:非特定错误都重试
  return true;
}

资源自动清理 (_cleanupExpiredLocks)

/**
 * 清理过期锁和队列
 * 设计特点:
 * 1. 定期执行:每60秒自动清理一次
 * 2. 双重清理:同时清理过期锁和队列项
 * 3. 优雅终止:清理时发送取消信号
 * 4. 统计记录:记录清理操作便于监控
 */
_cleanupExpiredLocks() {
  const now = Date.now();
  const maxAge = this._defaults.maxLockAge;
  
  // 清理过期锁
  for (const [name, lockItem] of this._lockMap.entries()) {
    if (lockItem.locked && (now - lockItem.createdAt) > maxAge) {
      console.warn(`清理过期锁【${name}】,已锁定${now - lockItem.createdAt}ms`);
      
      const error = new Error('锁过期自动清理');
      error.type = 'timeout';
      error.code = 'LOCK_EXPIRED';
      
      if (lockItem.abortController) {
        lockItem.abortController.abort(error);
      }
      
      this._lockMap.delete(name);
    }
  }
  
  // 清理过期队列项
  for (const [name, queue] of this._queueMap.entries()) {
    // ... 清理逻辑
  }
}

📚 使用方法

基本安装

// 方式1:使用默认单例(无控制台警告)
import { asyncLock, releaseLock } from './asyncLock';

// 方式2:创建自定义实例
import { createLockManager } from './asyncLock';
const myLockManager = createLockManager({
  timeout: 10000,
  maxQueueSize: 10,
  tipHandler: (msg) => console.warn(msg)
});

// 方式3:使用带控制台警告的单例
import { verboseLockManager } from './asyncLock';

基础配置选项

const options = {
  // 必需:锁名称(用于标识任务类型)
  name: 'submitForm',
  
  // 必需:异步任务函数
  asyncFn: async (signal) => {
    // signal 是 AbortSignal,用于取消任务
    if (signal.aborted) throw new Error('任务已取消');
    return await fetch('/api/submit', { signal }).then(r => r.json());
  },
  
  // 可选:任务超时时间(毫秒)
  timeout: 8000,
  
  // 可选:重试次数(默认0)
  retryCount: 2,
  
  // 可选:基础重试延迟(毫秒)
  baseRetryDelay: 1000,
  
  // 可选:最大重试延迟(毫秒)
  maxRetryDelay: 10000,
  
  // 可选:自定义重试条件函数
  retryCondition: (error) => {
    // 只对网络错误重试
    return error.message.includes('Network') || error.message.includes('timeout');
  },
  
  // 可选:重复执行时的提示信息
  repeatTip: '操作中,请稍后...',
  
  // 可选:重复执行时是否抛出错误(默认true)
  throwRepeatError: true,
  
  // 可选:是否启用队列(默认false)
  enableQueue: true,
  
  // 可选:队列最大长度(默认100)
  maxQueueSize: 5,
  
  // 可选:成功回调
  onSuccess: (result) => {
    console.log('任务成功:', result);
  },
  
  // 可选:失败回调
  onFail: (error) => {
    console.error('任务失败:', error.message);
  },
  
  // 可选:提示处理器(用于显示重复提示)
  tipHandler: (message) => {
    Toast.warning(message);
  }
};

执行任务

// 使用默认单例
try {
  const result = await asyncLock(options);
  console.log('执行结果:', result);
} catch (error) {
  if (error.code === 'LOCKED') {
    // 重复执行被拒绝
    console.warn('请勿重复操作');
  } else if (error.code === 'QUEUE_FULL') {
    // 队列已满
    console.error('系统繁忙,请稍后重试');
  } else {
    // 其他错误
    console.error('执行失败:', error);
  }
}

// 使用自定义实例
try {
  const result = await myLockManager.execute(options);
  console.log('执行结果:', result);
} catch (error) {
  // 错误处理
}

锁管理操作

import { 
  asyncLock, 
  releaseLock, 
  releaseAllLocks, 
  cancelLockTask,
  getLockStatus,
  getStats,
  resetStats 
} from './asyncLock';

// 1. 手动释放指定锁
releaseLock('submitForm');

// 2. 释放所有锁
releaseAllLocks();

// 3. 取消正在执行的任务
const cancelled = cancelLockTask('submitForm', '用户主动取消');
if (cancelled) {
  console.log('任务已取消');
}

// 4. 获取锁状态
const status = getLockStatus('submitForm');
console.log('锁状态:', {
  是否锁定: status.locked,
  锁定时长: `${status.age}ms`,
  队列长度: status.queueLength
});

// 5. 获取统计信息
const stats = getStats();
console.log('执行统计:', {
  总执行次数: stats.totalExecutions,
  成功次数: stats.successCount,
  超时次数: stats.timeoutCount,
  当前活跃锁: stats.activeLocks.length
});

// 6. 重置统计
resetStats();

📋 使用案例

案例1:表单提交防重复

/**
 * 场景:表单提交按钮防止用户重复点击
 * 需求:提交期间禁用按钮,防止重复提交,支持取消
 */
class FormSubmitService {
  constructor() {
    this.isSubmitting = false;
  }
  
  async submitForm(formData) {
    if (this.isSubmitting) {
      Toast.warning('正在提交,请稍候...');
      return;
    }
    
    this.isSubmitting = true;
    
    try {
      const result = await asyncLock({
        name: 'formSubmit',
        asyncFn: async (signal) => {
          // 模拟API调用
          const response = await fetch('/api/submit', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(formData),
            signal
          });
          
          if (!response.ok) {
            throw new Error(`提交失败: ${response.status}`);
          }
          
          return await response.json();
        },
        timeout: 10000,
        retryCount: 1,
        baseRetryDelay: 2000,
        repeatTip: '正在提交中,请勿重复点击',
        tipHandler: (msg) => Toast.warning(msg),
        onSuccess: (result) => {
          Toast.success('提交成功!');
          console.log('提交结果:', result);
        },
        onFail: (error) => {
          if (error.code !== 'LOCKED') {
            Toast.error(`提交失败: ${error.message}`);
          }
        }
      });
      
      return result;
    } finally {
      this.isSubmitting = false;
    }
  }
  
  // 用户离开页面时取消提交
  cancelSubmit() {
    cancelLockTask('formSubmit', '用户离开页面');
  }
}

// 使用
const formService = new FormSubmitService();

// 提交表单
submitButton.addEventListener('click', async () => {
  const formData = collectFormData();
  await formService.submitForm(formData);
});

// 页面离开时取消
window.addEventListener('beforeunload', () => {
  formService.cancelSubmit();
});

案例2:支付订单防重复

/**
 * 场景:支付订单防止重复支付
 * 需求:支付期间锁定订单,防止重复支付,支持队列
 */
class PaymentService {
  constructor(orderId) {
    this.orderId = orderId;
    this.lockName = `payment_${orderId}`;
  }
  
  async processPayment(paymentData) {
    try {
      return await asyncLock({
        name: this.lockName,
        asyncFn: async (signal) => {
          // 调用支付接口
          const paymentResult = await this.callPaymentApi(paymentData, signal);
          
          // 更新订单状态
          await this.updateOrderStatus(paymentResult, signal);
          
          return paymentResult;
        },
        timeout: 30000, // 支付操作需要更长时间
        retryCount: 2,
        baseRetryDelay: 3000,
        maxRetryDelay: 15000,
        // 只对网络错误和服务器5xx错误重试
        retryCondition: (error) => {
          const isNetworkError = error.message.includes('Network') || 
                                 error.message.includes('fetch');
          const isServerError = error.message.includes('50') || 
                                error.message.includes('服务不可用');
          return isNetworkError || isServerError;
        },
        enableQueue: true,
        maxQueueSize: 1, // 同一订单只允许一个排队
        repeatTip: '订单支付处理中,请稍候...',
        tipHandler: (msg) => {
          showPaymentStatus(msg);
        },
        onSuccess: (result) => {
          showPaymentSuccess(result);
          trackPaymentEvent('success', this.orderId);
        },
        onFail: (error) => {
          if (error.code === 'LOCKED') {
            // 重复支付被阻止
            trackPaymentEvent('prevented_duplicate', this.orderId);
          } else if (error.code === 'QUEUE_FULL') {
            showPaymentError('订单正在处理,请勿重复操作');
          } else {
            showPaymentError(`支付失败: ${error.message}`);
            trackPaymentEvent('failed', this.orderId, error);
          }
        }
      });
    } catch (error) {
      console.error('支付处理异常:', error);
      throw error;
    }
  }
  
  async callPaymentApi(paymentData, signal) {
    // 模拟支付API调用
    const response = await fetch('/api/payment/process', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        orderId: this.orderId,
        ...paymentData
      }),
      signal
    });
    
    if (!response.ok) {
      throw new Error(`支付API错误: ${response.status}`);
    }
    
    return await response.json();
  }
  
  async updateOrderStatus(paymentResult, signal) {
    // 更新订单状态
    const response = await fetch(`/api/orders/${this.orderId}/status`, {
      method: 'PUT',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        status: 'paid',
        paymentId: paymentResult.paymentId,
        paidAt: new Date().toISOString()
      }),
      signal
    });
    
    if (!response.ok) {
      throw new Error(`订单状态更新失败: ${response.status}`);
    }
  }
  
  // 取消支付
  cancelPayment() {
    const cancelled = cancelLockTask(this.lockName, '用户取消支付');
    if (cancelled) {
      showPaymentStatus('支付已取消');
      trackPaymentEvent('cancelled', this.orderId);
    }
    return cancelled;
  }
}

// 使用
const paymentService = new PaymentService('ORDER_123456');

// 开始支付
paymentButton.addEventListener('click', async () => {
  const paymentData = {
    amount: 100.00,
    method: 'credit_card',
    cardToken: 'tok_123456'
  };
  
  try {
    await paymentService.processPayment(paymentData);
  } catch (error) {
    console.error('支付失败:', error);
  }
});

// 取消支付
cancelButton.addEventListener('click', () => {
  paymentService.cancelPayment();
});

案例3:文件上传队列管理

/**
 * 场景:批量文件上传,需要控制并发和防止重复上传
 * 需求:同一文件不能重复上传,上传任务需要排队
 */
class FileUploadManager {
  constructor() {
    this.uploadQueue = new Map(); // fileId -> upload promise
  }
  
  async uploadFile(file, options = {}) {
    const fileId = this.generateFileId(file);
    const lockName = `upload_${fileId}`;
    
    // 如果已经在队列中,返回已有的Promise
    if (this.uploadQueue.has(fileId)) {
      return this.uploadQueue.get(fileId);
    }
    
    const uploadPromise = asyncLock({
      name: lockName,
      asyncFn: async (signal) => {
        try {
          // 更新UI状态
          this.updateFileStatus(fileId, 'uploading');
          
          // 执行上传
          const result = await this.doUpload(file, signal, options);
          
          // 上传成功
          this.updateFileStatus(fileId, 'success');
          return result;
        } catch (error) {
          // 上传失败
          this.updateFileStatus(fileId, 'error');
          throw error;
        }
      },
      timeout: 5 * 60 * 1000, // 5分钟超时
      retryCount: 3,
      baseRetryDelay: 5000,
      maxRetryDelay: 60000,
      retryCondition: (error) => {
        // 只对网络错误重试
        return error.message.includes('network') || 
               error.message.includes('timeout') ||
               error.message.includes('Network');
      },
      enableQueue: true,
      maxQueueSize: 0, // 同一文件不上传队列
      repeatTip: '文件正在上传中...',
      tipHandler: (msg) => {
        console.log(`文件 ${file.name}: ${msg}`);
      },
      onSuccess: (result) => {
        console.log(`文件 ${file.name} 上传成功:`, result);
        this.uploadQueue.delete(fileId);
      },
      onFail: (error) => {
        console.error(`文件 ${file.name} 上传失败:`, error);
        this.uploadQueue.delete(fileId);
      },
      autoCleanup: false // 手动清理,避免上传完成前锁被清理
    });
    
    // 保存到队列
    this.uploadQueue.set(fileId, uploadPromise);
    
    return uploadPromise;
  }
  
  async doUpload(file, signal, options) {
    const formData = new FormData();
    formData.append('file', file);
    
    // 添加上传进度回调
    const xhr = new XMLHttpRequest();
    
    return new Promise((resolve, reject) => {
      // 监听取消信号
      if (signal.aborted) {
        reject(new Error('上传被取消'));
        return;
      }
      
      const onAbort = () => {
        xhr.abort();
        reject(new Error('上传被取消'));
      };
      
      signal.addEventListener('abort', onAbort);
      
      // 设置上传进度
      xhr.upload.addEventListener('progress', (event) => {
        if (event.lengthComputable) {
          const percent = Math.round((event.loaded / event.total) * 100);
          this.updateUploadProgress(fileId, percent);
        }
      });
      
      // 完成处理
      xhr.addEventListener('load', () => {
        signal.removeEventListener('abort', onAbort);
        
        if (xhr.status >= 200 && xhr.status < 300) {
          resolve(JSON.parse(xhr.responseText));
        } else {
          reject(new Error(`上传失败: ${xhr.status} ${xhr.statusText}`));
        }
      });
      
      xhr.addEventListener('error', () => {
        signal.removeEventListener('abort', onAbort);
        reject(new Error('网络错误,上传失败'));
      });
      
      xhr.addEventListener('abort', () => {
        signal.removeEventListener('abort', onAbort);
        reject(new Error('上传被取消'));
      });
      
      // 开始上传
      xhr.open('POST', '/api/upload');
      xhr.send(formData);
    });
  }
  
  generateFileId(file) {
    // 生成文件唯一ID(实际项目中可能需要更复杂的逻辑)
    return `${file.name}_${file.size}_${file.lastModified}`;
  }
  
  updateFileStatus(fileId, status) {
    // 更新UI显示
    console.log(`文件 ${fileId} 状态: ${status}`);
  }
  
  updateUploadProgress(fileId, percent) {
    // 更新上传进度
    console.log(`文件 ${fileId} 上传进度: ${percent}%`);
  }
  
  // 取消文件上传
  cancelUpload(file) {
    const fileId = this.generateFileId(file);
    const lockName = `upload_${fileId}`;
    
    const cancelled = cancelLockTask(lockName, '用户取消上传');
    if (cancelled) {
      this.uploadQueue.delete(fileId);
      this.updateFileStatus(fileId, 'cancelled');
      console.log(`文件 ${file.name} 上传已取消`);
    }
    
    return cancelled;
  }
  
  // 批量取消所有上传
  cancelAllUploads() {
    releaseAllLocks();
    this.uploadQueue.clear();
    console.log('所有文件上传已取消');
  }
}

// 使用
const uploadManager = new FileUploadManager();

// 上传文件
fileInput.addEventListener('change', async (event) => {
  const files = Array.from(event.target.files);
  
  for (const file of files) {
    try {
      await uploadManager.uploadFile(file);
    } catch (error) {
      console.error(`文件 ${file.name} 上传失败:`, error);
    }
  }
});

// 取消上传
cancelButton.addEventListener('click', () => {
  const file = getSelectedFile();
  uploadManager.cancelUpload(file);
});

案例4:全局配置管理

/**
 * 场景:应用全局配置需要防止并发修改
 * 需求:配置更新需要互斥,多个更新请求需要排队
 */
class ConfigManager {
  constructor() {
    this.config = {};
    this.lockManager = createLockManager({
      timeout: 15000,
      maxLockAge: 2 * 60 * 1000, // 2分钟
      maxQueueSize: 5,
      tipHandler: (msg) => console.log('[ConfigLock]', msg),
      enableStats: true
    });
  }
  
  async updateConfig(key, value, options = {}) {
    const lockName = `config_${key}`;
    
    try {
      const result = await this.lockManager.execute({
        name: lockName,
        asyncFn: async (signal) => {
          // 获取当前配置
          const currentConfig = await this.fetchConfig(key, signal);
          
          // 验证配置
          if (options.validate) {
            const isValid = await options.validate(value, currentConfig, signal);
            if (!isValid) {
              throw new Error('配置验证失败');
            }
          }
          
          // 更新配置
          const updateResult = await this.doUpdateConfig(key, value, signal);
          
          // 更新本地缓存
          this.config[key] = value;
          
          // 触发配置变更事件
          this.emitConfigChange(key, value, currentConfig);
          
          return updateResult;
        },
        timeout: options.timeout || 10000,
        retryCount: options.retryCount || 1,
        baseRetryDelay: 2000,
        retryCondition: (error) => {
          // 只对网络错误重试
          return error.message.includes('network') || 
                 error.message.includes('timeout') ||
                 error.name === 'TypeError'; // fetch错误
        },
        enableQueue: true,
        onSuccess: (result) => {
          console.log(`配置 ${key} 更新成功:`, result);
        },
        onFail: (error) => {
          if (error.code !== 'LOCKED') {
            console.error(`配置 ${key} 更新失败:`, error);
          }
        }
      });
      
      return result;
    } catch (error) {
      console.error(`配置 ${key} 更新异常:`, error);
      throw error;
    }
  }
  
  async fetchConfig(key, signal) {
    const response = await fetch(`/api/config/${key}`, { signal });
    if (!response.ok) {
      throw new Error(`获取配置失败: ${response.status}`);
    }
    return await response.json();
  }
  
  async doUpdateConfig(key, value, signal) {
    const response = await fetch(`/api/config/${key}`, {
      method: 'PUT',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ value }),
      signal
    });
    
    if (!response.ok) {
      throw new Error(`更新配置失败: ${response.status}`);
    }
    
    return await response.json();
  }
  
  emitConfigChange(key, newValue, oldValue) {
    // 触发配置变更事件
    const event = new CustomEvent('configChange', {
      detail: { key, newValue, oldValue }
    });
    window.dispatchEvent(event);
  }
  
  // 批量更新配置(多个配置项原子更新)
  async batchUpdateConfig(updates, options = {}) {
    const lockName = 'config_batch_update';
    
    return await this.lockManager.execute({
      name: lockName,
      asyncFn: async (signal) => {
        // 开始事务
        const transactionId = await this.beginTransaction(signal);
        
        try {
          const results = {};
          
          // 依次更新每个配置
          for (const [key, value] of Object.entries(updates)) {
            const result = await this.doUpdateConfig(key, value, signal);
            results[key] = result;
            this.config[key] = value;
          }
          
          // 提交事务
          await this.commitTransaction(transactionId, signal);
          
          // 触发批量变更事件
          this.emitBatchConfigChange(updates);
          
          return results;
        } catch (error) {
          // 回滚事务
          await this.rollbackTransaction(transactionId, signal);
          throw error;
        }
      },
      timeout: 30000, // 批量操作需要更长时间
      retryCount: 0, // 批量操作不重试
      enableQueue: true,
      maxQueueSize: 1 // 批量操作只允许一个排队
    });
  }
  
  async beginTransaction(signal) {
    const response = await fetch('/api/config/transaction/begin', { signal });
    if (!response.ok) {
      throw new Error('开始事务失败');
    }
    const data = await response.json();
    return data.transactionId;
  }
  
  async commitTransaction(transactionId, signal) {
    const response = await fetch('/api/config/transaction/commit', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ transactionId }),
      signal
    });
    
    if (!response.ok) {
      throw new Error('提交事务失败');
    }
  }
  
  async rollbackTransaction(transactionId, signal) {
    await fetch('/api/config/transaction/rollback', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ transactionId }),
      signal
    }).catch(() => {
      // 回滚失败也继续,不影响主流程
    });
  }
  
  emitBatchConfigChange(updates) {
    const event = new CustomEvent('configBatchChange', {
      detail: { updates }
    });
    window.dispatchEvent(event);
  }
  
  // 获取锁管理器统计信息(用于监控)
  getLockStats() {
    return this.lockManager.getStats();
  }
  
  // 清理所有配置锁
  cleanupConfigLocks() {
    this.lockManager.releaseAllLocks();
  }
}

// 使用
const configManager = new ConfigManager();

// 更新单个配置
async function updateTheme(theme) {
  try {
    await configManager.updateConfig('theme', theme, {
      validate: async (value, current) => {
        // 验证主题是否有效
        const validThemes = ['light', 'dark', 'auto'];
        return validThemes.includes(value);
      },
      retryCount: 2
    });
  } catch (error) {
    if (error.code === 'LOCKED') {
      console.log('配置正在更新中,请稍后');
    } else {
      console.error('更新主题失败:', error);
    }
  }
}

// 批量更新配置
async function updateUserPreferences(prefs) {
  try {
    const results = await configManager.batchUpdateConfig(prefs);
    console.log('偏好设置更新成功:', results);
  } catch (error) {
    console.error('批量更新失败:', error);
  }
}

// 监控锁状态
setInterval(() => {
  const stats = configManager.getLockStats();
  if (stats.activeLocks.length > 0) {
    console.log('活跃的配置锁:', stats.activeLocks);
  }
}, 60000);

🔧 高级配置

自定义重试策略

// 基于错误类型的智能重试策略
const smartRetryCondition = (error) => {
  // 网络错误:重试
  if (error.name === 'NetworkError' || 
      error.name === 'TypeError' || 
      error.message.includes('network')) {
    return true;
  }
  
  // 服务器5xx错误:重试
  if (error.status >= 500 && error.status < 600) {
    return true;
  }
  
  // 服务器4xx错误(除429外):不重试
  if (error.status >= 400 && error.status < 500 && error.status !== 429) {
    return false;
  }
  
  // 429 Too Many Requests:使用退避重试
  if (error.status === 429) {
    return true;
  }
  
  // 默认情况:不重试
  return false;
};

// 使用自定义重试条件
await asyncLock({
  name: 'apiCall',
  asyncFn: apiCallFunction,
  retryCount: 3,
  retryCondition: smartRetryCondition
});

性能监控集成

// 创建带监控的锁管理器
class MonitoredLockManager extends LockManager {
  constructor(options = {}) {
    super(options);
    this.metrics = {
      lockAcquisitionTime: [],
      taskExecutionTime: [],
      queueWaitTime: []
    };
  }
  
  async execute(options) {
    const startTime = performance.now();
    
    try {
      const result = await super.execute(options);
      
      // 记录执行时间
      const endTime = performance.now();
      const executionTime = endTime - startTime;
      this.metrics.taskExecutionTime.push(executionTime);
      
      // 发送性能指标
      this.sendMetrics({
        name: options.name,
        executionTime,
        success: true
      });
      
      return result;
    } catch (error) {
      const endTime = performance.now();
      const executionTime = endTime - startTime;
      
      // 发送错误指标
      this.sendMetrics({
        name: options.name,
        executionTime,
        success: false,
        errorType: error.type,
        errorCode: error.code
      });
      
      throw error;
    }
  }
  
  sendMetrics(metric) {
    // 发送到监控系统
    console.log('[LockMetrics]', metric);
    
    // 实际项目中可以发送到 APM 系统
    // sendToAPM('lock_execution', metric);
  }
  
  getPerformanceMetrics() {
    const calculateStats = (array) => {
      if (array.length === 0) return null;
      
      const sum = array.reduce((a, b) => a + b, 0);
      const avg = sum / array.length;
      const max = Math.max(...array);
      const min = Math.min(...array);
      
      return { count: array.length, avg, min, max, sum };
    };
    
    return {
      taskExecution: calculateStats(this.metrics.taskExecutionTime),
      lockAcquisition: calculateStats(this.metrics.lockAcquisitionTime),
      queueWait: calculateStats(this.metrics.queueWaitTime)
    };
  }
}

// 使用带监控的锁管理器
const monitoredManager = new MonitoredLockManager();

// 定期打印性能指标
setInterval(() => {
  const metrics = monitoredManager.getPerformanceMetrics();
  console.log('锁管理器性能指标:', metrics);
}, 60000);

📊 性能建议

最佳实践

  1. 合理设置超时时间
    • 快速操作:1-5秒
    • 普通操作:5-10秒
    • 长时间操作:10-30秒
    • 文件上传等:1-5分钟
  1. 队列配置建议
    • 关键操作:队列大小 1(确保严格顺序)
    • 普通操作:队列大小 3-5
    • 批量操作:队列大小 10-20
    • 注意:队列越大,内存占用越高
  1. 重试策略建议
    • 网络请求:重试2-3次,基础延迟1-3秒
    • 支付操作:重试1-2次,基础延迟2-5秒
    • 文件操作:重试0-1次,基础延迟5-10秒
  1. 内存管理
    • 定期检查锁状态,避免内存泄漏
    • 页面卸载时调用 destroy() 清理资源
    • 监控队列长度,避免无限增长
  1. 错误处理
    • 区分用户取消和系统错误
    • 对不同的错误类型采取不同的处理策略
    • 记录详细的错误日志以便排查

🐛 常见问题

Q1: 锁会自动释放吗?

A: 是的。锁会在以下情况下自动释放:

  • 任务执行完成(成功或失败)
  • 任务超时
  • 锁过期(超过 maxLockAge 配置)
  • 手动调用 releaseLock() 或 releaseAllLocks()

Q2: 队列中的任务会按顺序执行吗?

A: 是的。队列采用 FIFO(先进先出)原则,任务会按照加入队列的顺序依次执行。

Q3: 如何防止内存泄漏?

A: 锁管理器内置以下防护措施:

  1. 定期清理过期锁(默认60秒一次)
  2. 队列项超时自动清理
  3. 页面卸载时可以调用 destroy() 方法
  4. 所有定时器和事件监听器都有清理逻辑

Q4: 支持分布式环境吗?

A: 当前版本是单机内存锁,适用于单页面应用或单服务器环境。如果需要分布式锁,可以基于此模式扩展,使用 Redis 或其他分布式存储作为锁存储后端。

Q5: 如何监控锁管理器的状态?

A: 可以通过以下方式监控:

  1. 使用 getLockStatus(name) 获取特定锁状态
  2. 使用 getStats() 获取全局统计信息
  3. 继承 LockManager 类添加自定义监控
  4. 监听相关事件(需要自行扩展事件系统)

📈 扩展建议

如果未来需要扩展功能,可以考虑:

  1. 分布式锁支持:集成 Redis 或其他分布式存储
  2. 锁优先级:为队列中的任务添加优先级
  3. 锁续期机制:长时间任务自动续期
  4. 事件系统:锁状态变化时触发事件
  5. 浏览器存储持久化:页面刷新后恢复锁状态
  6. 更复杂的队列算法:支持优先级队列、延迟队列等

📝 总结

LockManager 是一个功能全面、设计优雅的异步任务互斥锁工具,它解决了传统防抖节流方案的诸多痛点,特别适合以下场景:

  • ✅ 表单提交:防止重复提交
  • ✅ 支付操作:防止重复支付
  • ✅ 文件上传:同一文件不上传多次
  • ✅ 配置更新:防止并发修改配置
  • ✅ 关键操作:需要严格顺序执行的操作
  • ✅ 资源竞争:多组件共享资源时的并发控制

通过合理使用这个工具,可以显著提升应用的数据一致性和用户体验,避免因并发操作导致的业务逻辑错误。


📄 完整代码

  1. 默认单例 (asyncLock):适合大多数场景
  2. 自定义实例 (createLockManager):需要不同配置时使用
  3. 类直接使用 (LockManager):需要继承扩展时使用

工具已经过精心设计和测试,可以直接在生产环境中使用。

/**
 * 异步任务互斥锁工具
 * 需求:防抖节流不能防止api接口长时间未返回。如果用户等待一小段时候后重新点击提交,会导致重新触发请求;
 * 解决思路:用互斥锁思路处理异步任务锁定,通过name进行异步任务锁定,防止重入。
 * 核心能力:防止异步任务未完成时重复执行、超时控制、任务取消、资源自动清理
 * 支持:队列机制、指数退避重试、原子操作、错误分类、性能监控
 */
class LockManager {
  constructor(options = {}) {
    // 存储所有锁状态
    this._lockMap = new Map();
    
    // 等待队列
    this._queueMap = new Map();
    
    // 默认配置
    this._defaults = {
      timeout: 10000,
      repeatTip: '操作中,请稍后...',
      throwRepeatError: true,
      autoCleanup: true,
      maxLockAge: 5 * 60 * 1000,
      maxQueueSize: 100,
      enableStats: true,
      tipHandler: () => {}, 
      ...options
    };
    
    // 统计信息
    this._stats = {
      totalExecutions: 0,
      successCount: 0,
      timeoutCount: 0,
      cancelCount: 0,
      repeatRejectCount: 0,
      queueFullCount: 0,
      retryCount: 0
    };
    
    // 定期清理过期锁和队列
    this._cleanupInterval = setInterval(() => this._cleanupExpiredLocks(), 60000);
    
    // 绑定方法,确保在回调中使用正确的this
    this._processNextInQueue = this._processNextInQueue.bind(this);
  }

  /**
   * 原子性地获取锁
   */
  _acquireLock(name) {
    const attemptId = `${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
    const now = Date.now();
    
    // 第一重检查
    const existing = this._lockMap.get(name);
    if (existing?.locked) {
      return null;
    }
    
    // 创建新的锁对象
    const lockItem = {
      locked: true,
      abortController: new AbortController(),
      timeoutTimer: null,
      createdAt: now,
      taskId: `${name}_${now}_${Math.random().toString(36).slice(2, 10)}`,
      attemptId: attemptId,
      waitingQueue: this._queueMap.get(name) || []
    };
    
    // 第二重检查(原子性保障)
    const current = this._lockMap.get(name);
    if (current?.locked) {
      return null;
    }
    
    // 设置锁(原子操作)
    this._lockMap.set(name, lockItem);
    
    // 最终验证
    const afterSet = this._lockMap.get(name);
    if (afterSet?.attemptId !== attemptId) {
      lockItem.abortController.abort();
      return null;
    }
    
    return lockItem;
  }

  /**
   * 将任务加入等待队列
   */
  _addToQueue(options) {
    const { name, maxQueueSize = this._defaults.maxQueueSize } = options;
    
    let queue = this._queueMap.get(name);
    if (!queue) {
      queue = [];
      this._queueMap.set(name, queue);
    }
    
    if (queue.length >= maxQueueSize) {
      this._stats.queueFullCount++;
      const error = new Error(`任务队列【${name}】已满(最大${maxQueueSize})`);
      error.type = 'queue_full';
      error.code = 'QUEUE_FULL';
      return Promise.reject(error);
    }
    
    return new Promise((resolve, reject) => {
      const queueItem = {
        options,
        resolve,
        reject,
        enqueuedAt: Date.now()
      };
      
      queue.push(queueItem);
      
      if (options.timeout > 0) {
        queueItem.timeoutTimer = setTimeout(() => {
          const index = queue.indexOf(queueItem);
          if (index > -1) {
            queue.splice(index, 1);
            const error = new Error(`任务【${name}】在队列中等待超时`);
            error.type = 'queue_timeout';
            error.code = 'QUEUE_TIMEOUT';
            reject(error);
            
            if (queue.length === 0) {
              this._queueMap.delete(name);
            }
          }
        }, options.timeout);
      }
    });
  }

  /**
   * 处理队列中的下一个任务(使用微任务)
   */
  async _processNextInQueue(name) {
    const queue = this._queueMap.get(name);
    if (!queue || queue.length === 0) {
      this._queueMap.delete(name);
      return;
    }
    
    // 使用微任务处理,避免 setTimeout 的延迟
    await Promise.resolve();
    
    const queueItem = queue.shift();
    
    if (queueItem.timeoutTimer) {
      clearTimeout(queueItem.timeoutTimer);
    }
    
    try {
      const result = await this._executeTask(queueItem.options);
      queueItem.resolve(result);
    } catch (error) {
      queueItem.reject(error);
    } finally {
      // 继续处理下一个(递归)
      if (queue.length > 0) {
        // 再次使用微任务
        Promise.resolve().then(() => this._processNextInQueue(name));
      } else {
        this._queueMap.delete(name);
      }
    }
  }

  /**
   * 执行任务核心逻辑
   */
  async _executeTask(options) {
    const {
      name,
      asyncFn,
      timeout = this._defaults.timeout,
      retryCount = 0,
      baseRetryDelay = 1000,
      maxRetryDelay = 30000,
      retryCondition = null // 自定义重试条件函数
    } = options;
    
    const lockItem = this._acquireLock(name);
    if (!lockItem) {
      const error = new Error(`无法获取锁【${name}】`);
      error.type = 'lock_failed';
      error.code = 'LOCK_FAILED';
      throw error;
    }
    
    let result;
    try {
      if (timeout > 0) {
        lockItem.timeoutTimer = setTimeout(() => {
          const timeoutError = new Error(`任务【${name}】超时(${timeout}ms)`);
          timeoutError.type = 'timeout';
          timeoutError.code = 'TIMEOUT';
          lockItem.abortController.abort(timeoutError);
        }, timeout);
      }
      
      result = await this._executeWithExponentialBackoff(
        () => asyncFn(lockItem.abortController.signal),
        retryCount,
        baseRetryDelay,
        maxRetryDelay,
        lockItem.abortController,
        retryCondition // 传递重试条件
      );
      
      return result;
      
    } catch (error) {
      error.lockName = name;
      error.taskId = lockItem.taskId;
      throw error;
      
    } finally {
      this._cleanupLock(name, lockItem, options.autoCleanup ?? this._defaults.autoCleanup);
      
      // 使用微任务处理下一个队列任务
      Promise.resolve().then(() => this._processNextInQueue(name));
    }
  }

  /**
   * 判断是否应该重试(支持自定义重试条件)
   */
  _shouldRetry(error, retryCondition) {
    // 不重试的错误类型
    const noRetryTypes = ['cancel', 'timeout', 'queue_full', 'queue_timeout', 'lock_failed'];
    if (noRetryTypes.includes(error.type)) {
      return false;
    }
    
    // 如果提供了自定义重试条件函数,使用它
    if (typeof retryCondition === 'function') {
      return retryCondition(error);
    }
    
    // 默认重试条件:非特定错误都重试
    return true;
  }

  /**
   * 指数退避重试执行(支持自定义重试条件)
   */
  async _executeWithExponentialBackoff(fn, maxRetries, baseDelay, maxDelay, abortController, retryCondition) {
    let lastError;
    
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        if (abortController.signal.aborted) {
          const cancelError = new Error('任务已被取消');
          cancelError.type = 'cancel';
          cancelError.code = 'CANCELLED';
          throw cancelError;
        }
        
        if (attempt > 0) {
          const delay = this._calculateExponentialBackoffDelay(
            attempt,
            baseDelay,
            maxDelay
          );
          
          this._stats.retryCount++;
          console.log(`任务重试第${attempt}次,延迟${delay}ms`);
          
          await this._sleep(delay, abortController.signal);
        }
        
        return await fn();
        
      } catch (error) {
        lastError = error;
        
        // 使用统一的判断逻辑决定是否重试
        if (!this._shouldRetry(error, retryCondition)) {
          throw error;
        }
        
        if (attempt === maxRetries) {
          error.retryAttempts = attempt;
          throw error;
        }
      }
    }
    
    throw lastError;
  }

  /**
   * 计算指数退避延迟
   */
  _calculateExponentialBackoffDelay(attempt, baseDelay, maxDelay) {
    const exponentialDelay = baseDelay * Math.pow(2, attempt - 1);
    const jitter = exponentialDelay * 0.1 * Math.random();
    return Math.min(exponentialDelay + jitter, maxDelay);
  }

  /**
   * 可中断的延时(安全的事件监听清理)
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal.aborted) {
        reject(new Error('等待被中断'));
        return;
      }
      
      const timer = setTimeout(() => {
        // 清理事件监听
        signal.removeEventListener('abort', abortHandler);
        resolve();
      }, ms);
      
      const abortHandler = () => {
        clearTimeout(timer);
        const error = new Error('等待被中断');
        error.type = 'cancel';
        error.code = 'SLEEP_CANCELLED';
        reject(error);
      };
      
      signal.addEventListener('abort', abortHandler);
      
      // 确保在Promise settled后清理
      const cleanup = () => {
        clearTimeout(timer);
        signal.removeEventListener('abort', abortHandler);
      };
      
      // 无论成功还是失败都执行清理
      this._safeFinally(() => {
        cleanup();
      }, resolve, reject);
    });
  }

  /**
   * 安全的finally执行,避免影响原始Promise
   */
  _safeFinally(cleanupFn, resolve, reject) {
    const wrappedResolve = (value) => {
      try {
        cleanupFn();
      } finally {
        resolve(value);
      }
    };
    
    const wrappedReject = (error) => {
      try {
        cleanupFn();
      } finally {
        reject(error);
      }
    };
    
    return { resolve: wrappedResolve, reject: wrappedReject };
  }

  /**
   * 清理锁资源
   */
  _cleanupLock(name, lockItem, autoCleanup) {
    if (lockItem.timeoutTimer) {
      clearTimeout(lockItem.timeoutTimer);
      lockItem.timeoutTimer = null;
    }
    
    if (lockItem.abortController) {
      lockItem.abortController = null;
    }
    
    if (autoCleanup) {
      this._lockMap.delete(name);
    } else {
      lockItem.locked = false;
      lockItem.abortController = null;
      lockItem.timeoutTimer = null;
    }
  }

  /**
   * 清理过期锁和队列
   */
  _cleanupExpiredLocks() {
    const now = Date.now();
    const maxAge = this._defaults.maxLockAge;
    
    // 清理过期锁
    for (const [name, lockItem] of this._lockMap.entries()) {
      if (lockItem.locked && (now - lockItem.createdAt) > maxAge) {
        console.warn(`清理过期锁【${name}】,已锁定${now - lockItem.createdAt}ms`);
        
        const error = new Error('锁过期自动清理');
        error.type = 'timeout';
        error.code = 'LOCK_EXPIRED';
        
        if (lockItem.abortController) {
          lockItem.abortController.abort(error);
        }
        
        this._lockMap.delete(name);
      }
    }
    
    // 清理过期队列项
    for (const [name, queue] of this._queueMap.entries()) {
      const expiredItems = [];
      
      for (let i = 0; i < queue.length; i++) {
        const item = queue[i];
        const queueAge = now - item.enqueuedAt;
        const timeout = item.options?.timeout || 30000;
        if (queueAge > timeout) {
          expiredItems.push(i);
        }
      }
      
      for (let i = expiredItems.length - 1; i >= 0; i--) {
        const index = expiredItems[i];
        const item = queue[index];
        
        if (item.timeoutTimer) {
          clearTimeout(item.timeoutTimer);
        }
        
        const error = new Error(`任务【${name}】在队列中过期`);
        error.type = 'queue_timeout';
        error.code = 'QUEUE_TIMEOUT';
        item.reject(error);
        
        queue.splice(index, 1);
      }
      
      if (queue.length === 0) {
        this._queueMap.delete(name);
      }
    }
  }

  /**
   * 执行带锁的异步任务
   */
  async execute(options) {
    const {
      name,
      asyncFn,
      onSuccess,
      onFail,
      repeatTip = this._defaults.repeatTip,
      timeout = this._defaults.timeout,
      throwRepeatError = this._defaults.throwRepeatError,
      tipHandler = this._defaults.tipHandler, // 使用配置的默认值
      enableQueue = false,
      maxQueueSize = this._defaults.maxQueueSize,
      retryCount = 0,
      baseRetryDelay = 1000,
      maxRetryDelay = 30000,
      retryCondition = null, // 自定义重试条件
      autoCleanup = this._defaults.autoCleanup
    } = options;

    this._stats.totalExecutions++;

    try {
      const existingLock = this._lockMap.get(name);
      if (existingLock?.locked) {
        this._stats.repeatRejectCount++;
        
        const repeatError = new Error(repeatTip);
        repeatError.type = 'repeat';
        repeatError.code = 'LOCKED';
        repeatError.lockName = name;
        
        tipHandler(repeatTip);
        
        if (enableQueue) {
          console.log(`任务【${name}】加入等待队列,当前队列长度:${this._queueMap.get(name)?.length || 0}`);
          
          const queueOptions = {
            ...options,
            enableQueue: false,
            maxQueueSize: undefined
          };
          
          const queueResult = await this._addToQueue({
            ...queueOptions,
            name,
            maxQueueSize
          });
          
          onSuccess?.(queueResult);
          return queueResult;
        } else {
          onFail?.(repeatError);
          if (throwRepeatError) throw repeatError;
          return Promise.reject(repeatError);
        }
      }

      const result = await this._executeTask({
        name,
        asyncFn,
        timeout,
        retryCount,
        baseRetryDelay,
        maxRetryDelay,
        retryCondition, // 传递重试条件
        autoCleanup
      });

      this._stats.successCount++;
      onSuccess?.(result);
      return result;
      
    } catch (error) {
      switch (error.type) {
        case 'timeout':
          this._stats.timeoutCount++;
          break;
        case 'cancel':
          this._stats.cancelCount++;
          break;
        case 'queue_full':
          this._stats.queueFullCount++;
          break;
      }
      
      onFail?.(error);
      throw error;
    }
  }

  /**
   * 手动释放指定锁
   */
  releaseLock(name) {
    const lockItem = this._lockMap.get(name);
    if (lockItem) {
      this._cleanupLock(name, lockItem, true);
    }
    
    const queue = this._queueMap.get(name);
    if (queue) {
      queue.forEach(item => {
        if (item.timeoutTimer) {
          clearTimeout(item.timeoutTimer);
        }
        const error = new Error('锁被手动释放,队列任务取消');
        error.type = 'cancel';
        error.code = 'MANUAL_RELEASE';
        item.reject(error);
      });
      this._queueMap.delete(name);
    }
  }

  /**
   * 批量释放所有锁
   */
  releaseAllLocks() {
    this._lockMap.forEach((lockItem, name) => {
      this._cleanupLock(name, lockItem, true);
    });
    this._lockMap.clear();
    
    this._queueMap.forEach((queue, name) => {
      queue.forEach(item => {
        if (item.timeoutTimer) {
          clearTimeout(item.timeoutTimer);
        }
        const error = new Error('所有锁被释放,队列任务取消');
        error.type = 'cancel';
        error.code = 'ALL_RELEASED';
        item.reject(error);
      });
    });
    this._queueMap.clear();
  }

  /**
   * 取消正在执行的任务
   */
  cancelLockTask(name, reason = "用户主动取消") {
    const lockItem = this._lockMap.get(name);
    if (lockItem?.locked && lockItem.abortController) {
      const error = new Error(reason);
      error.type = 'cancel';
      error.code = 'USER_CANCEL';
      lockItem.abortController.abort(error);
      this._cleanupLock(name, lockItem, true);
      return true;
    }
    return false;
  }

  /**
   * 获取指定任务的锁状态
   */
  getLockStatus(name) {
    const lockItem = this._lockMap.get(name);
    const queue = this._queueMap.get(name);
    
    return {
      locked: lockItem?.locked ?? false,
      taskId: lockItem?.taskId,
      createdAt: lockItem?.createdAt,
      age: lockItem ? Date.now() - lockItem.createdAt : 0,
      hasAbortController: !!lockItem?.abortController,
      queueLength: queue?.length || 0,
      queueWaitTimes: queue?.map(item => Date.now() - item.enqueuedAt) || []
    };
  }

  /**
   * 获取统计信息
   */
  getStats() {
    return {
      ...this._stats,
      activeLocks: Array.from(this._lockMap.entries())
        .filter(([_, lock]) => lock.locked)
        .map(([name, lock]) => ({
          name,
          age: Date.now() - lock.createdAt,
          taskId: lock.taskId
        })),
      waitingQueues: Array.from(this._queueMap.entries())
        .map(([name, queue]) => ({
          name,
          length: queue.length,
          oldestWait: queue.length > 0 ? Date.now() - queue[0].enqueuedAt : 0
        }))
    };
  }

  /**
   * 重置统计信息
   */
  resetStats() {
    this._stats = {
      totalExecutions: 0,
      successCount: 0,
      timeoutCount: 0,
      cancelCount: 0,
      repeatRejectCount: 0,
      queueFullCount: 0,
      retryCount: 0
    };
  }

  /**
   * 销毁实例
   */
  destroy() {
    clearInterval(this._cleanupInterval);
    this.releaseAllLocks();
    this._queueMap.clear();
    this._lockMap.clear();
  }
}

// 创建锁管理器的工厂函数
export const createLockManager = (options) => new LockManager(options);

// 默认单例(无默认控制台警告)
export const defaultLockManager = new LockManager({
  tipHandler: () => {} // 明确指定空函数
});

// 带控制台警告的单例(如果需要)
export const verboseLockManager = new LockManager({
  tipHandler: console.warn
});

// 核心方法导出(使用默认单例)
export const asyncLock = (options) => defaultLockManager.execute(options);
export const releaseLock = (name) => defaultLockManager.releaseLock(name);
export const releaseAllLocks = () => defaultLockManager.releaseAllLocks();
export const cancelLockTask = (name, reason) => defaultLockManager.cancelLockTask(name, reason);
export const getLockStatus = (name) => defaultLockManager.getLockStatus(name);
export const getStats = () => defaultLockManager.getStats();
export const resetStats = () => defaultLockManager.resetStats();
export const destroyLockManager = () => defaultLockManager.destroy();

// 导出类本身
export { LockManager };

/*********************************************************************
 * 使用示例
 *********************************************************************/

/*
// 示例1:基础使用(无控制台警告)
import { asyncLock } from './asyncLock';

const submitForm = async () => {
  try {
    const result = await asyncLock({
      name: 'formSubmit',
      asyncFn: async (signal) => {
        if (signal.aborted) throw new Error('任务已被取消');
        return await api.submit(data);
      },
      timeout: 8000,
      retryCount: 2,
      baseRetryDelay: 1000,
      maxRetryDelay: 10000,
      onSuccess: (res) => console.log('提交成功:', res),
      tipHandler: (msg) => console.warn(msg) // 需要时才传入
    });
  } catch (err) {
    console.error('捕获到错误:', err);
  }
};

// 示例2:自定义重试条件
const fetchWithRetry = async () => {
  try {
    const result = await asyncLock({
      name: 'fetchData',
      asyncFn: async (signal) => {
        const response = await fetch('/api/data', { signal });
        if (!response.ok) throw new Error(`HTTP ${response.status}`);
        return await response.json();
      },
      retryCount: 3,
      retryCondition: (error) => {
        // 只对网络错误和5xx错误重试
        return error.message.includes('Failed to fetch') || 
               error.message.includes('HTTP 5');
      },
      onFail: (error) => {
        if (!error.message.includes('HTTP 4')) {
          console.error('需要重试的错误:', error);
        }
      }
    });
  } catch (err) {
    console.error('最终失败:', err);
  }
};

// 示例3:队列处理
const processWithQueue = async () => {
  try {
    const result = await asyncLock({
      name: 'heavyProcess',
      asyncFn: async (signal) => {
        // 耗时处理
        return await heavyProcessing();
      },
      enableQueue: true,
      maxQueueSize: 10,
      timeout: 30000,
      onSuccess: (res) => {
        console.log('处理完成,结果:', res);
      }
    });
  } catch (err) {
    if (err.code === 'QUEUE_FULL') {
      alert('系统繁忙,请稍后重试');
    }
  }
};

// 示例4:使用verbose版本(需要控制台警告)
import { verboseLockManager } from './asyncLock';

const verboseTask = async () => {
  const result = await verboseLockManager.execute({
    name: 'verboseTask',
    asyncFn: async () => {
      // 任务逻辑
    },
    // 会自动输出控制台警告
  });
};

// 示例5:多个锁管理器实例(隔离环境)
import { createLockManager } from './asyncLock';

const userLockManager = createLockManager({
  maxQueueSize: 5,
  tipHandler: (msg) => Toast.warning(msg)
});

const systemLockManager = createLockManager({
  timeout: 30000,
  tipHandler: console.error
});

// 分别使用
const userTask = async () => {
  await userLockManager.execute({
    name: 'userAction',
    asyncFn: userAction
  });
};

const systemTask = async () => {
  await systemLockManager.execute({
    name: 'systemTask',
    asyncFn: systemTask
  });
};
*/
❌
❌