多 Agent 系统容错与恢复机制:OAuth 过期、Cron 级联失败的工程解法
多 Agent 系统容错与恢复机制:OAuth 过期、Cron 级联失败的工程解法
📖 踩坑实录系列,详细过程见公众号「Wesley AI 日记」,微信搜索关注。
标签:AI Agent、容错设计、Cron、OAuth、系统韧性、OpenClaw
前言:险些全军覆没的那一天
3 月 10 日,我的 OpenClaw Agent Team 几乎集体瘫痪。
时间线如下:
- 06:00 — 小红书热点追踪 Cron 触发,OAuth Token 过期,静默失败
- 09:00 — 小红书内容准备 Cron 触发,依赖热点追踪结果,读到空数据,生成无效内容
- 12:10 — 发布 Cron 触发,拿到无效内容,发布失败(参数格式错误)
- 12:20 — CEO Agent 发现失败,spawn 补发 Agent,因无发布互斥锁,触发竞态
- 13:08 — 并发的多个 Agent 各自「修复」问题,结果发布了 3 条重复内容(标题还被改了)
一个 OAuth Token 的过期,引发了整条 Agent 链路的级联崩溃。
这不是单个 Bug,这是多 Agent 系统在容错设计缺失时的系统性失效。
本文从工程角度,系统梳理多 Agent 系统的容错与恢复机制设计。
理解级联失败
多 Agent 系统中,Agent 之间存在依赖关系。一个上游 Agent 的失败,会以不可预期的方式传导给下游:
┌─────────────────┐
OAuth过期 ──▶ │ 热点追踪 Agent │ ──▶ 静默失败(无告警)
└────────┬────────┘
│ 输出:空数据
▼
┌─────────────────┐
│ 内容生成 Agent │ ──▶ 生成无效内容(基于空输入)
└────────┬────────┘
│ 输出:无效内容
▼
┌─────────────────┐
│ 发布 Agent │ ──▶ 发布失败
└────────┬────────┘
│ 触发
▼
┌─────────────────┐
│ CEO 补救 Agent ×3│ ──▶ 并发冲突,重复发布
└─────────────────┘
级联失败的关键特征:
- 失败的静默传播:上游失败没有立即终止下游,而是以「空数据」或「部分数据」的形式传递
- 错误放大:每一层都在「尽力完成任务」,却把问题越放越大
- 干预引发新问题:人工干预(spawn 补救 Agent)因缺乏协调机制,产生了新的竞态条件
容错机制设计:五个维度
1. OAuth Token 生命周期管理
OAuth Token 过期是多 Agent 系统中最常见的静默失败源。
错误的处理方式(我们之前的做法):
# 错误:调用 API 失败时,直接返回空结果
def fetch_hot_topics(token: str) -> list:
try:
resp = api.get_hot_topics(token)
return resp.data
except Exception:
return [] # 🚨 静默返回空,下游无感知
正确的 Token 生命周期管理:
class OAuthTokenManager:
"""OAuth Token 生命周期管理器"""
def __init__(self, token_store: TokenStore):
self.token_store = token_store
self.refresh_threshold_minutes = 30 # 过期前 30 分钟主动刷新
def get_valid_token(self, service: str) -> str:
"""获取有效 Token,必要时自动刷新"""
token = self.token_store.get(service)
if token is None:
raise TokenNotFoundError(f"服务 {service} 未配置 Token")
if self.is_expiring_soon(token):
token = self.refresh_token(service, token)
return token
def is_expiring_soon(self, token: OAuthToken) -> bool:
"""检查 Token 是否即将过期"""
remaining = token.expires_at - datetime.now()
return remaining < timedelta(minutes=self.refresh_threshold_minutes)
def refresh_token(self, service: str, old_token: OAuthToken) -> OAuthToken:
"""刷新 Token,失败时告警但不静默"""
try:
new_token = oauth_client.refresh(old_token.refresh_token)
self.token_store.save(service, new_token)
return new_token
except OAuthRefreshError as e:
# 刷新失败:告警 + 抛出异常,不返回空
alert_manager.send_alert(
level="P1",
message=f"服务 {service} OAuth Token 刷新失败,需要重新授权",
detail=str(e),
notify_channel="feishu"
)
raise TokenExpiredError(f"服务 {service} Token 已过期且无法自动刷新")
主动刷新 Cron:
# 每天凌晨 2 点检查所有 Token 状态
# 0 2 * * * /home/admin/.openclaw/scripts/token-health-check.sh
#!/bin/bash
# token-health-check.sh
SERVICES=("xhs_main" "xhs_account_b" "wechat_mp")
for service in "${SERVICES[@]}"; do
expiry=$(token-manager get-expiry $service)
remaining_hours=$(( (expiry - $(date +%s)) / 3600 ))
if [ $remaining_hours -lt 48 ]; then
echo "⚠️ $service Token 将在 ${remaining_hours}h 后过期,尝试刷新..."
token-manager refresh $service || \
notify-feishu "P1: $service Token 刷新失败,需要手动重新授权"
fi
done
2. Cron 链路断路器(Circuit Breaker)
当上游 Cron 任务失败时,下游任务应该感知并选择正确的策略,而不是盲目继续执行。
class CronCircuitBreaker:
"""Cron 链路断路器"""
def __init__(self, redis_client):
self.redis = redis_client
def check_upstream_health(self, upstream_task_id: str) -> UpstreamStatus:
"""检查上游任务的健康状态"""
status_key = f"cron:status:{upstream_task_id}"
status = self.redis.get(status_key)
if status is None:
return UpstreamStatus.UNKNOWN
status_data = json.loads(status)
# 上游任务是否在预期时间内成功完成
last_success = datetime.fromisoformat(status_data["last_success_at"])
expected_interval = timedelta(minutes=status_data["interval_minutes"])
if datetime.now() - last_success > expected_interval * 2:
return UpstreamStatus.STALE # 上游已过期未更新
if status_data["last_run_result"] == "failed":
return UpstreamStatus.FAILED
return UpstreamStatus.HEALTHY
def gate_downstream_task(
self,
task: CronTask,
upstream_task_id: str,
strategy: FailStrategy = FailStrategy.HALT
) -> GateResult:
"""
检查上游状态,决定下游任务是否应该执行。
strategy:
- HALT: 上游失败则停止(默认,安全优先)
- DEGRADE: 降级执行(使用缓存/默认值)
- PROCEED: 继续执行(用于不依赖上游输出的任务)
"""
upstream_status = self.check_upstream_health(upstream_task_id)
if upstream_status == UpstreamStatus.HEALTHY:
return GateResult.allow()
if strategy == FailStrategy.HALT:
alert_manager.send_alert(
level="P2",
message=f"任务 {task.name} 因上游 {upstream_task_id} 异常而暂停",
detail={
"upstream_status": upstream_status,
"upstream_task": upstream_task_id,
"downstream_task": task.name
}
)
return GateResult.halt(reason=f"上游任务 {upstream_task_id} 状态异常")
if strategy == FailStrategy.DEGRADE:
return GateResult.degrade(
fallback_data=self.get_cached_output(upstream_task_id)
)
return GateResult.allow()
使用示例:
# 内容生成 Cron 执行前,检查热点追踪上游
def content_generation_cron():
circuit_breaker = CronCircuitBreaker(redis)
gate = circuit_breaker.gate_downstream_task(
task=content_gen_task,
upstream_task_id="hot-topic-tracker",
strategy=FailStrategy.HALT # 上游失败则停止,不生成无效内容
)
if not gate.allowed:
logger.warning(f"内容生成任务暂停: {gate.reason}")
return
# 正常执行
topics = hot_topic_cache.get()
generate_content(topics)
3. 发布互斥锁与幂等性
发布操作是整个 Agent 链路中副作用最大的操作。必须保证:
- 同一内容只发布一次(幂等性)
- 同一时间只有一个 Agent 在发布同一内容(互斥性)
- 网络超时后重试不导致重复(幂等重试)
class PublishLockManager:
"""原子性发布锁管理"""
LOCK_TTL = 600 # 锁最多持有 10 分钟
def acquire_publish_lock(self, content_id: str) -> bool:
"""
原子性获取发布锁。
使用 Redis SET NX EX 确保原子性。
"""
lock_key = f"publish:lock:{content_id}"
lock_holder = f"agent:{os.getpid()}:{time.time()}"
acquired = self.redis.set(
lock_key,
lock_holder,
nx=True, # 只有不存在时才设置
ex=self.LOCK_TTL
)
return bool(acquired)
def check_already_published(self, content_id: str) -> Optional[str]:
"""
检查内容是否已经发布成功。
返回发布结果的帖子 ID,或 None(未发布)。
"""
result_key = f"publish:result:{content_id}"
result = self.redis.get(result_key)
if result:
data = json.loads(result)
if data["status"] == "published":
return data["post_id"]
return None
def record_publish_result(
self,
content_id: str,
post_id: str,
metadata: dict
):
"""记录发布结果,供幂等检查使用"""
result_key = f"publish:result:{content_id}"
self.redis.setex(
result_key,
86400 * 7, # 保留 7 天
json.dumps({
"status": "published",
"post_id": post_id,
"published_at": datetime.now().isoformat(),
**metadata
}, ensure_ascii=False)
)
# 使用
def publish_content_safely(content: Content):
lock_mgr = PublishLockManager(redis)
# 检查是否已发布(幂等检查)
existing_post_id = lock_mgr.check_already_published(content.id)
if existing_post_id:
logger.info(f"内容 {content.id} 已发布为 {existing_post_id},跳过")
return existing_post_id
# 获取发布锁(互斥)
if not lock_mgr.acquire_publish_lock(content.id):
raise PublishLockBusyError(
f"内容 {content.id} 正在被另一个 Agent 发布,请稍后查看结果"
)
try:
# 执行发布
post_id = platform_api.publish(content)
# 记录结果
lock_mgr.record_publish_result(content.id, post_id, {
"platform": content.platform,
"account_id": content.target_account
})
return post_id
finally:
# 无论成功失败,释放锁
lock_mgr.release_lock(content.id)
4. 多 Agent 任务状态协调
当多个 Agent 需要协作完成一项任务时,必须有显式的状态协调机制,防止并发冲突。
class AgentTaskCoordinator:
"""多 Agent 任务协调器"""
def claim_task(
self,
task_id: str,
agent_id: str
) -> bool:
"""
Agent 认领任务的原子操作。
确保同一任务只被一个 Agent 处理。
"""
claim_key = f"task:claim:{task_id}"
# 原子性认领
claimed = self.redis.set(
claim_key,
agent_id,
nx=True, # 只有未被认领时才成功
ex=1800 # 最多持有 30 分钟
)
if not claimed:
current_claimer = self.redis.get(claim_key)
logger.info(
f"任务 {task_id} 已被 {current_claimer} 认领,"
f"Agent {agent_id} 放弃"
)
return bool(claimed)
def update_task_status(
self,
task_id: str,
agent_id: str,
status: TaskStatus,
detail: dict = None
):
"""更新任务状态,供其他 Agent 和 CEO 查询"""
status_key = f"task:status:{task_id}"
self.redis.setex(
status_key,
3600, # 1 小时
json.dumps({
"task_id": task_id,
"claimer": agent_id,
"status": status.value,
"updated_at": datetime.now().isoformat(),
"detail": detail or {}
}, ensure_ascii=False)
)
# CEO Agent 在 spawn 补救 Agent 前检查任务状态
def ceo_handle_publish_failure(failed_task_id: str):
coordinator = AgentTaskCoordinator(redis)
# 检查是否已有其他 Agent 在处理
existing_status = coordinator.get_task_status(failed_task_id)
if existing_status and existing_status["status"] in ["claimed", "in_progress"]:
logger.info(
f"任务 {failed_task_id} 已被 {existing_status['claimer']} 处理中,"
f"CEO 等待结果"
)
return # 不重复 spawn
# 安全地 spawn 补救 Agent
coordinator.update_task_status(
failed_task_id, "ceo", TaskStatus.REMEDIATION_SPAWNED
)
sessions_spawn({
"agent": "xhs-main",
"task": f"补发任务 {failed_task_id},先检查是否已发布,再决定是否执行"
})
5. 系统韧性监控与自动告警
好的容错设计不是等问题发生再处理,而是在问题扩散前主动发现。
class SystemResilienceMonitor:
"""系统韧性监控器"""
def run_health_checks(self):
"""综合健康检查,由 SRE Agent 的 Cron 每 15 分钟运行"""
checks = [
self.check_cron_task_freshness(), # Cron 是否按时执行
self.check_oauth_token_validity(), # Token 是否即将过期
self.check_mcp_services_alive(), # MCP 服务是否存活
self.check_publish_lock_stale(), # 是否有卡住的发布锁
self.check_task_queue_depth(), # 任务队列是否积压
]
issues = [c for c in checks if not c.healthy]
if issues:
self.send_health_report(issues)
def check_cron_task_freshness(self) -> HealthCheck:
"""检查所有 Cron 任务是否在预期时间内执行"""
stale_tasks = []
for cron_id, expected_interval in CRON_REGISTRY.items():
last_run = self.get_last_successful_run(cron_id)
if last_run is None:
stale_tasks.append({
"cron_id": cron_id,
"reason": "从未成功执行"
})
continue
elapsed = datetime.now() - last_run
if elapsed > expected_interval * 1.5:
stale_tasks.append({
"cron_id": cron_id,
"last_run": last_run.isoformat(),
"elapsed_minutes": elapsed.total_seconds() / 60,
"expected_minutes": expected_interval.total_seconds() / 60
})
if stale_tasks:
return HealthCheck.unhealthy(
f"{len(stale_tasks)} 个 Cron 任务未按时执行",
detail=stale_tasks,
alert_level="P2"
)
return HealthCheck.healthy("所有 Cron 任务正常执行")
def check_publish_lock_stale(self) -> HealthCheck:
"""检查是否有超时未释放的发布锁"""
stale_locks = []
for lock_key in self.redis.scan_iter("publish:lock:*"):
ttl = self.redis.ttl(lock_key)
original_ttl = PublishLockManager.LOCK_TTL
# 如果锁持有时间超过一半,可能卡住了
if ttl < original_ttl / 2:
content_id = lock_key.split(":")[-1]
stale_locks.append({
"content_id": content_id,
"remaining_ttl": ttl,
"suspected_stuck": ttl < 60
})
if any(l["suspected_stuck"] for l in stale_locks):
return HealthCheck.unhealthy(
"存在疑似卡住的发布锁,可能导致发布阻塞",
detail=stale_locks,
alert_level="P1"
)
return HealthCheck.healthy("所有发布锁状态正常")
容错分级策略
不同失败场景需要不同的容错策略:
| 失败类型 | 响应策略 | 自动化程度 | 告警级别 |
|---|---|---|---|
| OAuth Token 过期 | 自动刷新;失败则告警 | 半自动 | P1 |
| MCP 服务未运行 | Fail Fast,告警 | 自动告警 | P1 |
| Cron 执行超时 | 中止本次,标记为失败 | 自动 | P2 |
| 上游 Cron 失败 | 断路器阻断下游 | 自动 | P2 |
| 发布验证失败 | 上报 CEO,不自动重试 | 人工决策 | P2 |
| 并发发布冲突 | 互斥锁阻止 | 自动 | P3 |
| 配置漂移 | 启动前检测,告警 | 自动 | P1 |
「韧性」与「可靠性」的区别
构建多 Agent 系统时,容易混淆两个概念:
- 可靠性(Reliability):系统在正常条件下无故障运行的能力
- 韧性(Resilience):系统在异常条件下自我恢复的能力
可靠性追求的是「不出错」,韧性追求的是「出了错能自愈」。
对于 AI Agent 系统,韧性比可靠性更重要——因为 Agent 的外部依赖(OAuth 服务、平台 API、MCP 工具)的不稳定性超出你的控制范围,「不出错」是不现实的。
韧性设计的三条原则:
- 隔离失败边界:一个 Agent 失败,不应该扩散到整个系统
- 快速失败优于静默失败:失败应该立即可见,而不是以「空数据」的形式传播
- 恢复是可观测的:人工介入时,系统状态必须透明可查
总结
多 Agent 系统的级联失败不是偶然事件,而是在容错设计缺失时的必然结果。
本文介绍的五个机制(OAuth 生命周期管理、Cron 断路器、发布互斥锁、任务状态协调、韧性监控)构成了一套完整的容错防护体系。每一条都来自真实的生产事故,每一条都在 OpenClaw 上得到了验证。
核心设计哲学只有一句话:
不要假设上游会成功,不要假设下游会感知失败,不要假设并发不会发生。
用显式的机制保证,而非隐式的假设。
📖 详细踩坑日记 → 公众号「Wesley AI 日记」,微信搜索关注,每周 AI Agent 实战经验分享。