阅读视图

发现新文章,点击刷新页面。

多 Agent 系统容错与恢复机制:OAuth 过期、Cron 级联失败的工程解法

多 Agent 系统容错与恢复机制:OAuth 过期、Cron 级联失败的工程解法

📖 踩坑实录系列,详细过程见公众号「Wesley AI 日记」,微信搜索关注。

标签:AI Agent、容错设计、Cron、OAuth、系统韧性、OpenClaw


前言:险些全军覆没的那一天

3 月 10 日,我的 OpenClaw Agent Team 几乎集体瘫痪。

时间线如下

  • 06:00 — 小红书热点追踪 Cron 触发,OAuth Token 过期,静默失败
  • 09:00 — 小红书内容准备 Cron 触发,依赖热点追踪结果,读到空数据,生成无效内容
  • 12:10 — 发布 Cron 触发,拿到无效内容,发布失败(参数格式错误)
  • 12:20 — CEO Agent 发现失败,spawn 补发 Agent,因无发布互斥锁,触发竞态
  • 13:08 — 并发的多个 Agent 各自「修复」问题,结果发布了 3 条重复内容(标题还被改了)

一个 OAuth Token 的过期,引发了整条 Agent 链路的级联崩溃。

这不是单个 Bug,这是多 Agent 系统在容错设计缺失时的系统性失效。

本文从工程角度,系统梳理多 Agent 系统的容错与恢复机制设计。


理解级联失败

多 Agent 系统中,Agent 之间存在依赖关系。一个上游 Agent 的失败,会以不可预期的方式传导给下游:

              ┌─────────────────┐
OAuth过期 ──▶ │ 热点追踪 Agent   │ ──▶ 静默失败(无告警)
              └────────┬────────┘
                       │ 输出:空数据
                       ▼
              ┌─────────────────┐
              │ 内容生成 Agent   │ ──▶ 生成无效内容(基于空输入)
              └────────┬────────┘
                       │ 输出:无效内容
                       ▼
              ┌─────────────────┐
              │ 发布 Agent       │ ──▶ 发布失败
              └────────┬────────┘
                       │ 触发
                       ▼
              ┌─────────────────┐
              │ CEO 补救 Agent ×3│ ──▶ 并发冲突,重复发布
              └─────────────────┘

级联失败的关键特征

  1. 失败的静默传播:上游失败没有立即终止下游,而是以「空数据」或「部分数据」的形式传递
  2. 错误放大:每一层都在「尽力完成任务」,却把问题越放越大
  3. 干预引发新问题:人工干预(spawn 补救 Agent)因缺乏协调机制,产生了新的竞态条件

容错机制设计:五个维度

1. OAuth Token 生命周期管理

OAuth Token 过期是多 Agent 系统中最常见的静默失败源。

错误的处理方式(我们之前的做法)

# 错误:调用 API 失败时,直接返回空结果
def fetch_hot_topics(token: str) -> list:
    try:
        resp = api.get_hot_topics(token)
        return resp.data
    except Exception:
        return []  # 🚨 静默返回空,下游无感知

正确的 Token 生命周期管理

class OAuthTokenManager:
    """OAuth Token 生命周期管理器"""
    
    def __init__(self, token_store: TokenStore):
        self.token_store = token_store
        self.refresh_threshold_minutes = 30  # 过期前 30 分钟主动刷新
    
    def get_valid_token(self, service: str) -> str:
        """获取有效 Token,必要时自动刷新"""
        token = self.token_store.get(service)
        
        if token is None:
            raise TokenNotFoundError(f"服务 {service} 未配置 Token")
        
        if self.is_expiring_soon(token):
            token = self.refresh_token(service, token)
        
        return token
    
    def is_expiring_soon(self, token: OAuthToken) -> bool:
        """检查 Token 是否即将过期"""
        remaining = token.expires_at - datetime.now()
        return remaining < timedelta(minutes=self.refresh_threshold_minutes)
    
    def refresh_token(self, service: str, old_token: OAuthToken) -> OAuthToken:
        """刷新 Token,失败时告警但不静默"""
        try:
            new_token = oauth_client.refresh(old_token.refresh_token)
            self.token_store.save(service, new_token)
            return new_token
        except OAuthRefreshError as e:
            # 刷新失败:告警 + 抛出异常,不返回空
            alert_manager.send_alert(
                level="P1",
                message=f"服务 {service} OAuth Token 刷新失败,需要重新授权",
                detail=str(e),
                notify_channel="feishu"
            )
            raise TokenExpiredError(f"服务 {service} Token 已过期且无法自动刷新")

主动刷新 Cron

# 每天凌晨 2 点检查所有 Token 状态
# 0 2 * * * /home/admin/.openclaw/scripts/token-health-check.sh

#!/bin/bash
# token-health-check.sh

SERVICES=("xhs_main" "xhs_account_b" "wechat_mp")

for service in "${SERVICES[@]}"; do
    expiry=$(token-manager get-expiry $service)
    remaining_hours=$(( (expiry - $(date +%s)) / 3600 ))
    
    if [ $remaining_hours -lt 48 ]; then
        echo "⚠️ $service Token 将在 ${remaining_hours}h 后过期,尝试刷新..."
        token-manager refresh $service || \
            notify-feishu "P1: $service Token 刷新失败,需要手动重新授权"
    fi
done

2. Cron 链路断路器(Circuit Breaker)

当上游 Cron 任务失败时,下游任务应该感知并选择正确的策略,而不是盲目继续执行。

class CronCircuitBreaker:
    """Cron 链路断路器"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def check_upstream_health(self, upstream_task_id: str) -> UpstreamStatus:
        """检查上游任务的健康状态"""
        status_key = f"cron:status:{upstream_task_id}"
        status = self.redis.get(status_key)
        
        if status is None:
            return UpstreamStatus.UNKNOWN
        
        status_data = json.loads(status)
        
        # 上游任务是否在预期时间内成功完成
        last_success = datetime.fromisoformat(status_data["last_success_at"])
        expected_interval = timedelta(minutes=status_data["interval_minutes"])
        
        if datetime.now() - last_success > expected_interval * 2:
            return UpstreamStatus.STALE  # 上游已过期未更新
        
        if status_data["last_run_result"] == "failed":
            return UpstreamStatus.FAILED
        
        return UpstreamStatus.HEALTHY
    
    def gate_downstream_task(
        self, 
        task: CronTask,
        upstream_task_id: str,
        strategy: FailStrategy = FailStrategy.HALT
    ) -> GateResult:
        """
        检查上游状态,决定下游任务是否应该执行。
        
        strategy:
          - HALT: 上游失败则停止(默认,安全优先)
          - DEGRADE: 降级执行(使用缓存/默认值)
          - PROCEED: 继续执行(用于不依赖上游输出的任务)
        """
        upstream_status = self.check_upstream_health(upstream_task_id)
        
        if upstream_status == UpstreamStatus.HEALTHY:
            return GateResult.allow()
        
        if strategy == FailStrategy.HALT:
            alert_manager.send_alert(
                level="P2",
                message=f"任务 {task.name} 因上游 {upstream_task_id} 异常而暂停",
                detail={
                    "upstream_status": upstream_status,
                    "upstream_task": upstream_task_id,
                    "downstream_task": task.name
                }
            )
            return GateResult.halt(reason=f"上游任务 {upstream_task_id} 状态异常")
        
        if strategy == FailStrategy.DEGRADE:
            return GateResult.degrade(
                fallback_data=self.get_cached_output(upstream_task_id)
            )
        
        return GateResult.allow()

使用示例

# 内容生成 Cron 执行前,检查热点追踪上游
def content_generation_cron():
    circuit_breaker = CronCircuitBreaker(redis)
    
    gate = circuit_breaker.gate_downstream_task(
        task=content_gen_task,
        upstream_task_id="hot-topic-tracker",
        strategy=FailStrategy.HALT  # 上游失败则停止,不生成无效内容
    )
    
    if not gate.allowed:
        logger.warning(f"内容生成任务暂停: {gate.reason}")
        return
    
    # 正常执行
    topics = hot_topic_cache.get()
    generate_content(topics)

3. 发布互斥锁与幂等性

发布操作是整个 Agent 链路中副作用最大的操作。必须保证:

  1. 同一内容只发布一次(幂等性)
  2. 同一时间只有一个 Agent 在发布同一内容(互斥性)
  3. 网络超时后重试不导致重复(幂等重试)
class PublishLockManager:
    """原子性发布锁管理"""
    
    LOCK_TTL = 600  # 锁最多持有 10 分钟
    
    def acquire_publish_lock(self, content_id: str) -> bool:
        """
        原子性获取发布锁。
        使用 Redis SET NX EX 确保原子性。
        """
        lock_key = f"publish:lock:{content_id}"
        lock_holder = f"agent:{os.getpid()}:{time.time()}"
        
        acquired = self.redis.set(
            lock_key, 
            lock_holder,
            nx=True,    # 只有不存在时才设置
            ex=self.LOCK_TTL
        )
        
        return bool(acquired)
    
    def check_already_published(self, content_id: str) -> Optional[str]:
        """
        检查内容是否已经发布成功。
        返回发布结果的帖子 ID,或 None(未发布)。
        """
        result_key = f"publish:result:{content_id}"
        result = self.redis.get(result_key)
        
        if result:
            data = json.loads(result)
            if data["status"] == "published":
                return data["post_id"]
        
        return None
    
    def record_publish_result(
        self, 
        content_id: str, 
        post_id: str,
        metadata: dict
    ):
        """记录发布结果,供幂等检查使用"""
        result_key = f"publish:result:{content_id}"
        self.redis.setex(
            result_key,
            86400 * 7,  # 保留 7 天
            json.dumps({
                "status": "published",
                "post_id": post_id,
                "published_at": datetime.now().isoformat(),
                **metadata
            }, ensure_ascii=False)
        )

# 使用
def publish_content_safely(content: Content):
    lock_mgr = PublishLockManager(redis)
    
    # 检查是否已发布(幂等检查)
    existing_post_id = lock_mgr.check_already_published(content.id)
    if existing_post_id:
        logger.info(f"内容 {content.id} 已发布为 {existing_post_id},跳过")
        return existing_post_id
    
    # 获取发布锁(互斥)
    if not lock_mgr.acquire_publish_lock(content.id):
        raise PublishLockBusyError(
            f"内容 {content.id} 正在被另一个 Agent 发布,请稍后查看结果"
        )
    
    try:
        # 执行发布
        post_id = platform_api.publish(content)
        
        # 记录结果
        lock_mgr.record_publish_result(content.id, post_id, {
            "platform": content.platform,
            "account_id": content.target_account
        })
        
        return post_id
    finally:
        # 无论成功失败,释放锁
        lock_mgr.release_lock(content.id)

4. 多 Agent 任务状态协调

当多个 Agent 需要协作完成一项任务时,必须有显式的状态协调机制,防止并发冲突。

class AgentTaskCoordinator:
    """多 Agent 任务协调器"""
    
    def claim_task(
        self, 
        task_id: str, 
        agent_id: str
    ) -> bool:
        """
        Agent 认领任务的原子操作。
        确保同一任务只被一个 Agent 处理。
        """
        claim_key = f"task:claim:{task_id}"
        
        # 原子性认领
        claimed = self.redis.set(
            claim_key,
            agent_id,
            nx=True,  # 只有未被认领时才成功
            ex=1800   # 最多持有 30 分钟
        )
        
        if not claimed:
            current_claimer = self.redis.get(claim_key)
            logger.info(
                f"任务 {task_id} 已被 {current_claimer} 认领,"
                f"Agent {agent_id} 放弃"
            )
        
        return bool(claimed)
    
    def update_task_status(
        self,
        task_id: str,
        agent_id: str,
        status: TaskStatus,
        detail: dict = None
    ):
        """更新任务状态,供其他 Agent 和 CEO 查询"""
        status_key = f"task:status:{task_id}"
        self.redis.setex(
            status_key,
            3600,  # 1 小时
            json.dumps({
                "task_id": task_id,
                "claimer": agent_id,
                "status": status.value,
                "updated_at": datetime.now().isoformat(),
                "detail": detail or {}
            }, ensure_ascii=False)
        )

# CEO Agent 在 spawn 补救 Agent 前检查任务状态
def ceo_handle_publish_failure(failed_task_id: str):
    coordinator = AgentTaskCoordinator(redis)
    
    # 检查是否已有其他 Agent 在处理
    existing_status = coordinator.get_task_status(failed_task_id)
    
    if existing_status and existing_status["status"] in ["claimed", "in_progress"]:
        logger.info(
            f"任务 {failed_task_id} 已被 {existing_status['claimer']} 处理中,"
            f"CEO 等待结果"
        )
        return  # 不重复 spawn
    
    # 安全地 spawn 补救 Agent
    coordinator.update_task_status(
        failed_task_id, "ceo", TaskStatus.REMEDIATION_SPAWNED
    )
    
    sessions_spawn({
        "agent": "xhs-main",
        "task": f"补发任务 {failed_task_id},先检查是否已发布,再决定是否执行"
    })

5. 系统韧性监控与自动告警

好的容错设计不是等问题发生再处理,而是在问题扩散前主动发现。

class SystemResilienceMonitor:
    """系统韧性监控器"""
    
    def run_health_checks(self):
        """综合健康检查,由 SRE Agent 的 Cron 每 15 分钟运行"""
        
        checks = [
            self.check_cron_task_freshness(),    # Cron 是否按时执行
            self.check_oauth_token_validity(),    # Token 是否即将过期
            self.check_mcp_services_alive(),      # MCP 服务是否存活
            self.check_publish_lock_stale(),      # 是否有卡住的发布锁
            self.check_task_queue_depth(),        # 任务队列是否积压
        ]
        
        issues = [c for c in checks if not c.healthy]
        
        if issues:
            self.send_health_report(issues)
    
    def check_cron_task_freshness(self) -> HealthCheck:
        """检查所有 Cron 任务是否在预期时间内执行"""
        stale_tasks = []
        
        for cron_id, expected_interval in CRON_REGISTRY.items():
            last_run = self.get_last_successful_run(cron_id)
            
            if last_run is None:
                stale_tasks.append({
                    "cron_id": cron_id,
                    "reason": "从未成功执行"
                })
                continue
            
            elapsed = datetime.now() - last_run
            if elapsed > expected_interval * 1.5:
                stale_tasks.append({
                    "cron_id": cron_id,
                    "last_run": last_run.isoformat(),
                    "elapsed_minutes": elapsed.total_seconds() / 60,
                    "expected_minutes": expected_interval.total_seconds() / 60
                })
        
        if stale_tasks:
            return HealthCheck.unhealthy(
                f"{len(stale_tasks)} 个 Cron 任务未按时执行",
                detail=stale_tasks,
                alert_level="P2"
            )
        
        return HealthCheck.healthy("所有 Cron 任务正常执行")
    
    def check_publish_lock_stale(self) -> HealthCheck:
        """检查是否有超时未释放的发布锁"""
        stale_locks = []
        
        for lock_key in self.redis.scan_iter("publish:lock:*"):
            ttl = self.redis.ttl(lock_key)
            original_ttl = PublishLockManager.LOCK_TTL
            
            # 如果锁持有时间超过一半,可能卡住了
            if ttl < original_ttl / 2:
                content_id = lock_key.split(":")[-1]
                stale_locks.append({
                    "content_id": content_id,
                    "remaining_ttl": ttl,
                    "suspected_stuck": ttl < 60
                })
        
        if any(l["suspected_stuck"] for l in stale_locks):
            return HealthCheck.unhealthy(
                "存在疑似卡住的发布锁,可能导致发布阻塞",
                detail=stale_locks,
                alert_level="P1"
            )
        
        return HealthCheck.healthy("所有发布锁状态正常")

容错分级策略

不同失败场景需要不同的容错策略:

失败类型 响应策略 自动化程度 告警级别
OAuth Token 过期 自动刷新;失败则告警 半自动 P1
MCP 服务未运行 Fail Fast,告警 自动告警 P1
Cron 执行超时 中止本次,标记为失败 自动 P2
上游 Cron 失败 断路器阻断下游 自动 P2
发布验证失败 上报 CEO,不自动重试 人工决策 P2
并发发布冲突 互斥锁阻止 自动 P3
配置漂移 启动前检测,告警 自动 P1

「韧性」与「可靠性」的区别

构建多 Agent 系统时,容易混淆两个概念:

  • 可靠性(Reliability):系统在正常条件下无故障运行的能力
  • 韧性(Resilience):系统在异常条件下自我恢复的能力

可靠性追求的是「不出错」,韧性追求的是「出了错能自愈」。

对于 AI Agent 系统,韧性比可靠性更重要——因为 Agent 的外部依赖(OAuth 服务、平台 API、MCP 工具)的不稳定性超出你的控制范围,「不出错」是不现实的。

韧性设计的三条原则

  1. 隔离失败边界:一个 Agent 失败,不应该扩散到整个系统
  2. 快速失败优于静默失败:失败应该立即可见,而不是以「空数据」的形式传播
  3. 恢复是可观测的:人工介入时,系统状态必须透明可查

总结

多 Agent 系统的级联失败不是偶然事件,而是在容错设计缺失时的必然结果。

本文介绍的五个机制(OAuth 生命周期管理、Cron 断路器、发布互斥锁、任务状态协调、韧性监控)构成了一套完整的容错防护体系。每一条都来自真实的生产事故,每一条都在 OpenClaw 上得到了验证。

核心设计哲学只有一句话:

不要假设上游会成功,不要假设下游会感知失败,不要假设并发不会发生。

用显式的机制保证,而非隐式的假设。


📖 详细踩坑日记 → 公众号「Wesley AI 日记」,微信搜索关注,每周 AI Agent 实战经验分享。

❌