普通视图

发现新文章,点击刷新页面。
昨天以前首页

如何使用飞书机器人连接本地 AI Agent

作者 Cobyte
2026年4月7日 08:30

1. 前言

我们在上一篇文章中讲解了如何使用微信 ClawBot 连接本地 AI Agent,这一篇我们讲解如何使用飞书机器人连接本地 AI Agent。

首先我们需要创建一个飞书应用。

2. 创建飞书应用

2.1 打开飞书开放平台

访问飞书开发平台 open.feishu.cn/app 并登录。

2.2 创建应用

1.点击 创建企业自建应用

image.png

2.填写应用名称和描述

image.png

3.选择应用图标

2.3 复制凭证

在“凭证与基础信息”中复制:

  • App ID
  • App Secret

2.4 启用机器人能力

image.png

2.5 配置权限

权限管理中点击批量导入/导出权限按钮

image.png

然后复制粘贴以下权限:

{
  "scopes": {
    "tenant": [
      "aily:file:read",
      "aily:file:write",
      "application:application.app_message_stats.overview:readonly",
      "application:application:self_manage",
      "application:bot.menu:write",
      "cardkit:card:read",
      "cardkit:card:write",
      "contact:user.employee_id:readonly",
      "corehr:file:download",
      "event:ip_list",
      "im:chat.access_event.bot_p2p_chat:read",
      "im:chat.members:bot_access",
      "im:message",
      "im:message.group_at_msg:readonly",
      "im:message.p2p_msg:readonly",
      "im:message:readonly",
      "im:message:send_as_bot",
      "im:resource"
    ],
    "user": ["aily:file:read", "aily:file:write", "im:chat.access_event.bot_p2p_chat:read"]
  }
}

最后点击确定并申请开通。

2.6 配置事件订阅

事件与回调中进行事件配置,选择订阅方式为长连接,这样我们就可以在本地电脑也可以连接飞书机器人了(本质是WebSocket)。

image.png

接着添加接收消息事件:im.message.receive_v1

image.png

加密策略设置,这个可选项。

image.png

2.7 发布应用

接着我们在版本管理与发布中创建一个版本并发布。

image.png

一般我们在测试的时候,我们自己的飞书账号就是管理员,上述版本申请发布后会在手机飞书上收到一条审核消息,我们按提示进行操作审核即可。

这时我们就可以在手机飞书上搜索我们刚刚创建的“AI 机器人”,然后点击进去就可以对话了,但目前我们还没通过代码连接它,所以还对不了话。

01.jpg

值得注意的是:上述飞书机器人配置也是OpenClaw的官方飞书机器人配置方案。

3. 实现飞书连接本地 Agent

3.1 构建 API Client

根据飞书开发平台开发文档的 《Python SDK 指南》,我们在通过 SDK 调用飞书开放接口之前,需要先在代码中创建一个 API Client,用来指定当前使用的应用、日志级别、HTTP 请求超时时间等基本信息。

我们的实现如下:

from dataclasses import dataclass
import lark_oapi as lark

@dataclass
class FeishuConfig:
    """飞书渠道配置"""
    app_id: str = ""
    app_secret: str = ""
    encrypt_key: str = "" # 可选字段,空字符串表示不使用加密
    verification_token: str = "" # 可选字段,空字符串表示不验证消息来源

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        self.config = config
        # 构建 API Client
        self._client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .build()

首先我们实现一个配置类来统一管理上述我们说到的 App IDApp Secret 以及在配置事件订阅中讲到的加密策略设置的 Encrypt KeyVerification Token

后续我们就可以通过依赖注入模式调用 FeishuChannel 类了:

# 使用
config = FeishuConfig(app_id="...", app_secret="...")
channel = FeishuChannel(config=config)  # 配置通过构造函数注入

这样实现了配置与实现分离,将来支持热更新配置。

3.2 长连接飞书客户端

根据飞书开发平台开发文档的 《Python SDK 指南》的处理事件章节,我们的实现如下:

from loguru import logger
class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    def start(self) -> None:
        # 构建事件处理器
        builder = lark.EventDispatcherHandler.builder(
            self.config.encrypt_key, 
            self.config.verification_token
        )
        # 注册接收消息事件处理函数 im.message.receive_v1
        handler = builder.register_p2_im_message_receive_v1(self._on_message).build()
        # 初始化长连接客户端并传入事件处理器   
        ws_client = lark.ws.Client(
            self.config.app_id, 
            self.config.app_secret, 
            event_handler=handler
        )
        # start() 方法会阻塞主线程,持续运行,直到手动关闭
        ws_client.start()

        logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""

我们首先构建事件处理器,同时如果你在开发者后台的应用详情中,配置了 事件与回调 > 加密策略 页面内的加密信息(Encrypt Key 和 Verification Token)。则必须将加密信息的值传递到 EventDispatcherHandler.builder 方法的参数中。

接着我们注册接收消息事件处理函数。也就是我们在前面配置事件订阅小节所讲的添加的接收消息事件:im.message.receive_v1

最后通过 lark.ws.Client() 初始化长连接客户端,必填参数为应用的 APP_ID 和 APP_SECRET。我们在上面讲述创建飞书应用的时候已经阐述过 APP_ID 和 APP_SECRET 了。

值得注意的是:飞书的长连接客户端 (ws.Client) 只能用来"接收"事件,不能用来"发送"消息!如果要主动发消息或回复消息,必须使用普通的 Open API 客户端 (lark.Client)。

3.3 接收消息事件处理函数

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    def start(self) -> None:
        # 省略...

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""
        msg = data.event.message
        # 只处理用户发送的纯文本消息
        if data.event.sender.sender_type == "bot" or msg.message_type != "text":
            return

        content = json.loads(msg.content).get("text", "")
        if not content:
            return
        logger.info(f"收到{msg.chat_id}消息: {content}")
        # 发送消息
        self._process_and_reply(msg.chat_id, content)

我们这里先只处理用户发送的纯文本消息。

3.4 发送消息到 AI Agent 处理并进行回复

前面说了飞书的长连接客户端 (ws.Client) 只能用来"接收"事件,不能用来"发送"消息!如果要主动发消息或回复消息,必须使用普通的 Open API 客户端 (lark.Client)。

所以根据飞书开发平台开发文档的 《Python SDK 指南》的服务器API章节的《发送消息》,我们的实现如下:

import agent_loop as agent  # 导入本地 AI Agent 逻辑模块

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...
    def start(self) -> None:
        # 省略...
    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        # 省略...
    def _process_and_reply(self, chat_id: str, content: str) -> None:
        """调用本地 AI Agent 获取结果并调用飞书 API 发送"""
        try:
            history = [
                {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个 AI 助手")},
                {"role": "user", "content": content}
            ]
            
            # 1. 运行 AI 思考逻辑
            reply_text = agent.agent_loop(history)
            if not reply_text:
                return

            # 2. 发送回复
            receive_id_type = "chat_id" if chat_id.startswith("oc_") else "open_id"
            req = CreateMessageRequest.builder().receive_id_type(receive_id_type).request_body(
                CreateMessageRequestBody.builder()
                .receive_id(chat_id)
                .msg_type("text")
                .content(json.dumps({"text": reply_text}))
                .build()
            ).build()
            
            resp = self._client.im.v1.message.create(req)
            if resp.success():
                logger.info(f"➡️ 成功回复消息到: {chat_id}")
            else:
                logger.error(f"❌ 回复失败: {resp.msg}")
                
        except Exception as e:
            logger.error(f"❌ 处理异常: {e}")

_process_and_reply 方法中,我们首先构造与 AI Agent 的对话历史,调用 agent.agent_loop(history) 运行我们前面实现的 AI Agent 进行思考获取回复文本。这部分我们应该比较熟悉了。

根据飞书开发平台开发文档的 《发送消息》指南,发送消息时需要指定 receive_id_type 参数。飞书的 ID 有两种类型:

  • chat_id:以 oc_ 开头的群聊 ID
  • open_id:用户个人 Open ID

我们通过简单的字符串前缀判断 chat_id.startswith("oc_") 来区分这两种类型。

构建消息请求时:

  1. 首先创建 CreateMessageRequest 的建造者
  2. 设置 receive_id_type(接收者ID类型)
  3. 通过 request_body() 设置请求体,其中又使用 CreateMessageRequestBody 的建造者设置具体参数
  4. 调用 .build() 方法最终构建请求对象

这里需要注意的是,飞书消息的 content 字段必须是 JSON 字符串格式,所以我们需要使用 json.dumps({"text": reply_text}) 将回复文本包装成 JSON。

最后调用 self._client.im.v1.message.create(req) 发送消息到飞书服务器,并根据响应结果记录成功或失败日志。

现在我们已经完成了飞书渠道的核心实现,接下来编写启动脚本。

4. 启动飞书机器人

我们创建一个独立的启动文件 test_feishu.py

from loguru import logger
from feishu import FeishuChannel, FeishuConfig

def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxxx",         # 替换为真实的 App ID
        app_secret="xxxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
    
    # 2. 初始化频道并启动长连接
    channel = FeishuChannel(config=config)
    
    logger.info("正在启动飞书机器人长连接...")
    
    # 3. 启动并保持运行
    try:
        channel.start()
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    main()

启动脚本的实现非常简单直接:

第一步:配置飞书机器人凭证

我们需要创建 FeishuConfig 实例,填入在飞书开放平台创建应用时获取的凭证:

  • app_id:应用的唯一标识,以 cli_ 开头
  • app_secret:应用的密钥,用于 API 身份验证
  • encrypt_key:可选,如果在开发者后台配置了加密策略则需要填入
  • verification_token:可选,用于验证消息来源

第二步:初始化飞书频道

使用配置对象创建 FeishuChannel 实例。这里体现了依赖注入模式的优势——我们可以轻松更换不同的配置来源。

第三步:启动并保持运行

调用 channel.start() 启动飞书机器人的 WebSocket 长连接。这个方法会阻塞主线程,持续运行直到收到退出信号。

我们使用 try...except KeyboardInterrupt 捕获用户按 Ctrl+C 的中断信号,实现优雅退出。当用户想要停止机器人时,只需在终端中按 Ctrl+C,程序会记录退出日志然后正常结束。

运行机器人

在启动之前,先安装相关依赖。

requirements.txt 依赖如下:

python-dotenv==1.0.1
openai==2.24.0
loguru==0.7.3
# 飞书官方 Python SDK
lark-oapi>=1.2.0

执行 pip install -r requirements.txt 安装依赖。

依赖安装完毕后,在终端中执行:

python test_feishu.py

你会看到以下输出:

2026-04-05 00:59:23.456 | INFO     | 正在启动飞书机器人长连接...

此时机器人已经启动并开始监听飞书消息了。你可以在飞书中找到你的机器人应用,发送消息进行测试。

5. 启用异步事件循环线程

5.1 Node.js 和 Python 的主线程对比

我们知道 Node.js 默认的主线程就是事件循环线程,而 Python 的主线程默认是没有事件循环的。所以我们在上面也提到了调用 channel.start() 启动飞书机器人的 WebSocket 长连接会阻塞主线程。因为飞书 Python SDK 的 lark.ws.Client 采用同步阻塞设计,其 start() 方法会启动自己的事件循环并持续运行。主要体现在 FeishuChannel 类的 start 方法中以下代码:

ws_client.start()
logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

我们发现执行 ws_client.start() 代码后日志不打印了,因为主线程被阻塞了。

而在 Node.js 中以下的代码则不会被阻塞:

const WebSocket = require('ws');

function run_ws() {
  const ws = new WebSocket('ws://example.com/socket');

  ws.on('open', () => {
    console.log('[Worker] WebSocket connected');
  });

  ws.on('message', (data) => {
    console.log('[Worker] Received:', data.toString());
  });
}

// 同步执行
run_ws();
console.log('主线程没有被阻塞');

我们发现直接在 Node.js 的主线程中启动 WebSocket 连接服务是不会阻塞的,因为 Node.js 中的 WebSocket 本身是基于 Node.js 的事件循环来实现的。

5.2 通过 asyncio 启动异步事件循环

通过上一小节我们知道 Python 默认的主线程是没有事件循环的,需要显式创建运行。目前在 Python 3.7+ 推荐通过 asyncio 来启动异步事件循环。

实现很简单:

async def main():
    # 省略...
    try:
        await channel.start()
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

上述代码的转换过程等于:

主线程 (执行Python代码)
    ↓
调用 asyncio.run()
    ↓
主线程内部启动事件循环
    ↓
主线程 = 异步事件循环线程

但现在运行会报错,因为虽然主线程启动了异步事件循环线程,但飞书 SDK 的 lark.ws.Client 内部也使用了异步事件循环,当我们在已有的 asyncio 事件循环中调用它时,会产生冲突。

所以我们需要在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突。修改如下:

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    async def start(self) -> None:
        # 省略...
        def run_ws():
            # 为 WebSocket 客户端所在线程设置专属的事件循环,避免报错
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            import lark_oapi.ws.client
            lark_oapi.ws.client.loop = loop
            # 初始化长连接客户端
            ws_client = lark.ws.Client(
                self.config.app_id, 
                self.config.app_secret, 
                event_handler=handler
            )
            ws_client.start()
        # 在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突
        threading.Thread(target=run_ws, daemon=True).start()
        logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

        # 保持主协程存活
        while True:
            await asyncio.sleep(1)

主要修改了 FeishuChannel 类的 start 方法,通过 threading.Thread 在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突,并为飞书 WebSocket 客户端所在线程设置专属的事件循环,避免报错。

5.3 防止 AI 处理阻塞消息接收

无论是 Python 还是 Node.js,都需要防止耗时的 AI 处理阻塞消息接收。所以我们需要修改 _on_message 方法:

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    async def start(self) -> None:
        # 省略...

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        # 省略...

        # 启动独立线程处理 AI 逻辑和回复,防止阻塞 WebSocket 接收循环
        threading.Thread(
            target=self._process_and_reply, 
            args=(msg.chat_id, content)
        ).start()

我们看到在上述代码中启动一个独立线程处理 AI 逻辑和回复,防止阻塞 WebSocket 接收消息循环。

这时我们再次启动代码,我们可以看到 start 方法中的日志同步打印了:

2026-04-05 01:20:01.456 | INFO     | 正在启动飞书机器人长连接...
2026-04-05 01:20:01.457 | INFO     |  飞书极简版机器人已启动 (WebSocket)

6. 总结

至此,我们已经完整实现了通过飞书机器人连接本地 AI Agent。通过上述实践,我们不仅掌握了具体的实现步骤,更重要的是理解了背后的技术原理:

  • Node.js vs Python 事件循环:Node.js 主线程本身就是事件循环线程,天生异步;Python 则需要显式创建事件循环,主线程默认同步执行
  • Python 异步架构:深入理解了 asyncio 事件循环与多线程的协作模式,以及如何通过线程隔离解决 SDK 的冲突

记住:技术的最佳学习方式就是动手实践。现在就去飞书开放平台创建你的应用,启动这个机器人,感受 AI 与即时通讯结合的魅力吧!

我是程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

当 AI 已经做出判断,谁来按那个确认键?

作者 yuki_uix
2026年4月3日 21:17

上一篇我得出了一个结论:AI 在电商链路里真正有价值的地方,是认知负担最重和人工成本最高的两类场景。售后工单是后者的典型——规则明确、量大、重复,AI 来做意图识别再合适不过。

但写完那篇之后,我一直在想一个没展开的问题:

AI 做出判断之后,然后呢?

这个"然后",大多数产品设计都跳过了。技术团队花了很多精力让模型的意图识别更准,却很少认真想:当 AI 已经判断出"这个客户想退货",界面接下来应该发生什么?自动触发退货流程?等客服确认?还是让客户自己再按一次?

这条边界——什么时候 AI 该自己执行,什么时候必须等人——不是技术问题,是设计问题。

你有没有在这里停留,然后思考一下,下一步应该是什么样的?


从一个工单说起

想象这样一条售后消息:

"我上周买的外套,收到发现颜色和图片差太多了,能退吗?"

对一个训练充分的意图分类模型来说,这条消息的处理不难:退货意图,原因是色差,属于"货不对板"类目,符合平台退货政策。置信度 0.94。

好,模型判断完了。现在问题来了:

界面该做什么?

选项 A:直接自动发起退货申请,给客户发确认短信。
选项 B:在客服工作台标注"建议:退货申请(置信度 94%)",等客服点击确认。
选项 C:给客户发一条消息:"您是否需要申请退货?",等客户自己确认。
选项 D:根据置信度动态决定——高于某个阈值自动执行,低于阈值转人工。

这四个选项,背后是四种完全不同的产品逻辑。没有哪个天然正确,但选哪个会直接影响:客户体验、客服工作量、出错之后谁来担责、以及商家对这套系统的信任程度。


工单的完整旅程:AI 介入的三个阶段

在讨论"自动执行还是人工确认"之前,先把一条工单从进来到处理完的完整路径摊开来看。

image.png

这张图里有几个细节值得注意:

第一,置信度和风险是两个独立维度,不能只看置信度高低;

第二,无论哪条路径,操作记录都是必须的,不是可选项;

第三,人工处理的结果应该回流到模型,这个闭环在很多产品里是缺失的。

四类工单,四种处置

把意图清晰度和出错风险交叉,能得到四种典型工单,每一种的正确处置方式都不一样:

工单类型 示例消息 AI 置信度 出错风险 建议处置
意图明确 · 低风险 "这个能退货吗,我不喜欢" 高(~95%) 低(可撤销) 自动发起退货申请,显示撤销入口
意图明确 · 高风险 "我要投诉,这个产品质量有问题" 高(~90%) 高(涉及品牌声誉) 工作台标注意图 + 建议回复模板,客服确认后发送
意图模糊 · 可引导 "东西有点问题,怎么处理" 中(~65%) 列出 2-3 个意图候选,客服快速选择;或给客户发引导消息
意图不明 · 情绪激动 "太差劲了!!!退退退!!!" 低(~40%) 高(情绪化客户需要人工安抚) 直接转人工,标注"情绪风险",优先级提升

第四类是最容易被忽视的。"退退退"这个词在字面上是退货意图,模型可能识别出高置信度的退货分类——但这条消息需要的不是触发退货流程,而是先安抚情绪。纯文本意图识别不等于理解语境,这是 AI 介入工单处理时最容易翻车的地方。

「 做了一个可以玩的版本,→ 在线体验

出错了,界面怎么兜底

AI 判断出错不是概率问题,是必然会发生的事。问题不是"怎么避免出错",而是"出错之后系统怎么行动"。

常见的出错场景有三种:

场景一:自动执行了错误动作。 客户说"我想换个颜色",AI 识别为退货并自动发起了退货申请。客户收到退货确认短信,困惑,打电话进来。这时候客服看到的界面应该是:清晰标注"系统于 10 分钟前自动发起退货申请",一键撤销,同时自动生成一条道歉模板消息。如果这个撤销入口藏在三层菜单里,出错的代价就从"小麻烦"变成了"客户愤怒"。

场景二:置信度虚高,判断方向错了。 "我朋友说这个质量不好,我有点担心"——这条消息的关键词触发了模型的"质量投诉"分类,置信度 88%。但实际上客户只是在表达顾虑,还没有购买,根本没有工单可以处理。这类情况,界面的兜底方式是:在工作台显示判断依据("触发词:质量不好"),让客服能快速理解为什么 AI 这么判断,并在纠正之后把这条记录标注为"误判样本"。

场景三:正确意图,错误时机。 客户下单后两小时内发消息"我想取消订单",AI 正确识别为取消意图,自动触发取消流程——但这时候订单已经进入打包环节,取消会触发额外的仓储费用。AI 不知道订单状态,做了一个技术上正确但业务上错误的决定。这个场景说明:意图识别和动作执行之间,需要一层业务规则校验,不能让模型的输出直接触发操作。


我尝试建立一个判断框架

反复想这个问题之后,我觉得影响"自动执行 vs 人工确认"这条边界的,主要是三个变量:

1. 出错的代价有多高?

同样是退货场景,"误触发了一个客户不想退的退货申请",代价是:客户困惑、需要撤销、产生额外沟通。麻烦,但可以修复。

换一个场景:AI 判断某个账号存在异常交易,自动冻结——如果判断错了,代价是:正常用户被误封,投诉升级,信任崩塌。不可轻易修复。

出错代价越高,越需要人工确认作为缓冲。 这个逻辑不复杂,但容易被"模型准确率已经很高了"这个理由绕过。准确率 99% 听起来很高,但如果每天处理一万条工单,就有一百条出错——这一百条落在真实用户身上,每一条都是一个完整的糟糕体验。

2. 可逆性如何?

自动执行之后,这个动作能撤销吗?

退货申请发出了,可以撤销。优惠券发出去了,不好收回。退款打出去了,追回来很麻烦。物流揽件指令发出去了,基本不可逆。

可逆性越低,越需要在执行前确认。 这和出错代价是两个维度——有些事出错代价不高,但就是无法撤销;有些事代价很高,但可以事后补救。两者叠加才是完整的风险评估。

3. 这个判断需要上下文吗?

有些工单,AI 光看消息文本就能判断得很准。但有些情况,真正的意图藏在文本之外:

  • 客户历史上退过几次货了?
  • 这个订单是否处于促销期,退货会触发特殊规则?
  • 客服备注里有没有这个客户的特殊情况?

如果正确判断需要的上下文,模型当前不具备,那置信度数字本身就是虚高的——模型不知道自己不知道什么。这种情况,再高的置信度也不该触发自动执行。


用这个框架重新看那个工单

回到开头那条退货消息,套进三个变量:

  • 出错代价:中等(触发了不该触发的退货申请,可以撤销,但产生摩擦)
  • 可逆性:高(申请发出后客户可以主动取消)
  • 上下文依赖:低("颜色和图片差太多"意图明确,不需要额外信息)

这个组合,倾向于可以自动执行,但要给客户一个明确的撤销入口。客服不需要介入每一条,但系统要在操作后保留一个清晰的"撤销窗口"和操作记录。

换一条消息试试:

"我买的东西有点问题,你们怎么说?"

  • 出错代价:不确定(不知道"问题"是什么,处理方式差异很大)
  • 可逆性:取决于后续动作
  • 上下文依赖:极高("有点问题"几乎没有信息量)

这个组合,模型的置信度无论多高,都不该自动执行任何流程。正确的界面行为是:标注"意图不明确,建议人工介入",并把可能的意图选项列出来,让客服快速选择而不是从头处理。


界面设计的几个具体含义

这个框架落到界面上,会带来几个具体的设计要求,是我觉得目前大多数产品做得不够的地方:

置信度要可见,但不能只是一个数字。

"置信度 94%"对普通客服来说没有意义。更有用的呈现是:把这个数字翻译成行动建议——"建议直接处理"、"建议确认后处理"、"建议人工介入"。数字留给系统日志,界面上给人看的是判断,不是概率。

自动执行之后,操作记录必须显眼。

如果 AI 自动触发了某个流程,这个动作不能藏在日志里。它应该在工作台上有明显的呈现:"系统已自动发起退货申请 · 10分钟内可撤销"。人工覆盖的成本越低,越敢放权给 AI 自动执行。

人工覆盖不该是"报错",是正常流程的一部分。

很多系统设计里,人工覆盖 AI 判断是一个"异常路径"——操作步骤多、界面不顺畅、有时候还要填理由。这个设计隐含了一个假设:AI 是对的,人工推翻是例外。

但实际上,人工覆盖是正常的。模型不可能永远对,边缘案例永远存在。界面应该让"我不同意这个判断"这个操作和"我同意"一样顺畅——一个点击,不需要解释,不需要走审批流。


商家后台的同一个问题

这个框架不只适用于售后工单,商家后台里同样存在大量"AI 已判断,然后呢"的设计问题。

比如 AI 检测到某个 SKU 的库存即将断货,预测三天内售罄——界面该做什么?

自动触发补货申请?发一条通知让运营确认?还是只在数据看板上标注一个预警色,等运营自己发现?

套进同样的框架:

  • 出错代价:高(错误补货会导致积压或资金占用)
  • 可逆性:低(补货指令发出之后,供应链已经启动)
  • 上下文依赖:高(补货决策依赖当前促销计划、账期、仓库容量……这些数据模型不一定都有)

这个组合,答案很清晰:不该自动执行,应该是"高优先级提醒 + 一键确认" 。AI 做信息聚合和预测,人来做最终决策。界面的工作是把"确认"这个动作做得足够顺畅,减少决策摩擦,而不是代替决策。


还没想清楚的地方

置信度阈值该怎么定,谁来定?

我说的"高于某个阈值自动执行",这个阈值应该是固定的系统参数,还是让商家自己配置?不同规模的商家、不同品类的商品,对出错的容忍度差异很大。把这个权力交给商家配置,是更诚实的设计,但也带来了新的认知负担——商家未必知道 94% 和 87% 的置信度在实际操作里意味着什么。

当 AI 频繁被人工覆盖,系统该怎么反应?

如果某类工单的 AI 判断被客服推翻的比例很高,这是一个明确的信号:要么模型在这个类目表现差,要么界面的行动建议设计有问题,要么这类工单本来就不适合 AI 介入。这个反馈机制,应该是自动的,而不是靠数据团队定期去看日志才能发现。


与这个系列的关系

第一篇建立了一个框架:AI 在哪两类场景真正有价值。这篇往前走了一步:当 AI 真的介入之后,界面的责任不是消失,而是变了——从"帮用户完成操作",变成"在 AI 和人之间建立一个合理的权力分配机制"。

下一篇打算进入决策层,聚焦导购 Agent——那里的问题方向相反:不是"AI 判断了,人怎么接管",而是"用户说不清楚自己要什么,AI 怎么开始"。


这篇是观察和思考的笔记,框架还很粗糙。如果你在做类似的产品或界面设计,欢迎交流——特别是那个阈值配置的问题,我还没想到好的解法。

我写了个系统,每天上朝批奏折:把 Agent 做成「文武百官」是什么体验

作者 芋圆ai
2026年4月3日 16:47

我写了个系统,每天上朝批奏折:把 Agent 做成「文武百官」是什么体验

当小人被关进天牢的那一刻,就是朕决定掏钱的那一刻。

写在前面

09_副本.png

先说结论:当你用「拟圣旨」的方式给 Agent 下指令,看着丞相带着六部尚书在 2D 像素宫殿里跑来跑去给你办差——这玩意儿比你想象的更有成就感。

我叫它 Syntropy(太和),一个基于古代朝廷隐喻的可视化多智能体操作系统。但不是那种换个皮就完事的「国潮包装」,而是真的把 Agent 的执行过程「具象化」了:

  • Agent 不再是日志里的一串 tool_call_pending,而是坐在工位上的像素小人
  • 任务调度不再是抽象的 Orchestrator,而是丞相(Minister)在廷议上发号施令
  • 风险拦截不再是冷冰冰的 await human_approval(),而是把 Agent 关进天牢,等你御批

你可能会问:为什么要搞这么复杂?直接写代码不行吗?

因为现有的 Multi-Agent 框架(LangChain、AutoGen、CrewAI……)有一个共同的问题:它们是黑盒。你能看见输入和输出,但中间的思考、决策、调度、执行,全在一团混沌的日志里。你调了半天 prompt,还是不知道 Agent 卡在哪一步。

所以我的思路很简单:既然 Agent 的执行过程看不见,那就让它「看得见」。

08_副本.png

这篇文章会拆解这套系统的技术架构,但不会堆砌术语。咱们边「上朝」边聊:怎么把状态机映射成像素动画、怎么实现前后端实时同步、怎么设计「御批」机制,以及——为什么「当皇帝」这个隐喻,反而让 Agent 系统更好用了。


1. 问题:Agent 系统的「三大弊政」

在动手之前,我先总结了当前 Multi-Agent 系统的三个「弊政」:

1.1 黑盒化(Black Box)——「爱卿,你到底在想什么?」

Agent 的 Chain-of-Thought(思维链)是一串嵌套的 JSON,工具调用是一条条 log 记录。你很难从这些信息里快速判断:

  • Agent 现在在做什么?是在思考,还是在等你审批?
  • 它在等谁?丞相在等户部尚书查账,还是兵部尚书在调兵?
  • 它卡在哪一步了?是 LLM 没返回,还是工具调用超时?
  • 它的决策依据是什么?为什么它选择了这个方案而不是另一个?

01_副本.png

现状:你只能盯着终端滚屏的日志,祈祷 Agent 别卡死。

1.2 失控风险(Uncontrollable)——「爱卿,这奏折朕没批你就敢执行?」

Agent 自主调用工具是一件很危险的事。删除文件、转账、调用外部 API —— 这些操作一旦失控,后果不可逆。

现有的「Human-in-the-loop」方案要么是简单的 input() 阻塞(体验极差),要么需要入侵式地修改整条执行链路(开发成本高)。

现状:你要么相信 Agent 不会乱来,要么就得自己写一套审核机制。

1.3 记忆遗忘(Amnesia)——「爱卿,昨天说的事你怎么忘了?」

大多数 Agent 框架的记忆系统基于关键词匹配或纯向量检索。关键词匹配漏掉语义相关的信息,纯向量检索又容易在专有名词上翻车。长对话场景下,Agent 往往会「忘记」几轮之前说过的关键信息。

现状:你只能不断「提示」Agent,把上下文塞给它,直到 Token 爆掉。


2. 方案:把 Agent 变成「可观测的臣子」

Syntropy 的核心思路只有一句话:

所见即所思(What you see is what they think)。

具体拆解为三个「治国方略」:

2.1 可视化运行时(Visualized Runtime)——「爱卿们,都动起来」

Agent 的内部状态机(THINKINGACTINGWAITINGERROR)不只在日志里打印,而是实时映射为 2D 像素小人的行为动画

  • THINKING → 小人在原地踱步,头顶冒出气泡(正在思考)
  • ACTING → 小人移动到对应的「工位」(户部查账、兵部调兵、工部造器械……)
  • WAITING_FOR_HUMAN → 小人被「关进天牢」,等待你的御批
  • ERROR → 小人倒地,头顶冒叉(出错了,得查日志)

这套映射让 Agent 的执行状态变得一眼可见。你不再需要翻几百行日志,只需要看一眼沙盘,就知道哪个 Agent 卡住了、它在干什么、它在等谁。

2.2 内核级状态机(Kernel-Level State Machine)——「朝堂规矩,不可乱」

后端 Agent 的生命周期被标准化为一个有限状态机(FSM):

agent-state-machine.png 这个 FSM 是「内核级」的,意味着:

  • LLM 推理(Reasoning)和工具执行(Execution)完全解耦
  • 每个状态的进入和离开都会触发标准化的事件(可观测)
  • 风险拦截、记忆压缩、日志追踪都可以作为「状态钩子」无痕插入

简单说:Agent 不再是「自由发挥」,而是按「朝堂规矩」办事。

2.3 人机协同「御批」协议(Human-in-the-loop)——「这道奏折,朕要亲自批」

每个工具调用都有 riskLevel(low / medium / high)。当 Agent 试图执行高风险操作时:

  1. 内核挂起当前执行流,状态切换为 WAITING_FOR_HUMAN
  2. 前端收到 approval_request 事件,弹出「御批」弹窗
  3. 用户点击「准奏」或「驳回」,通过 Socket 发回指令
  4. 内核恢复执行流(或回滚)

这套机制让「人审」不再是事后补救,而是执行流程的有机组成部分


3. 核心功能:奏折阁与决策树

讲完架构,聊聊实际用起来是什么感觉。

3.1 奏折阁(Imperial Archives)——「每道圣旨,都有迹可循」

在 Syntropy 里,每一次用户指令都被封装为一份**「奏折」。奏折阁是系统的任务管理中心,完整记录了从拟旨 → 受理 → 分发 → 复命**的全过程。

奏折的核心特性

  • 折叠/展开:每份奏折默认折叠,仅展示摘要(如"查Q1税收");展开后可查看完整的对话链路与决策树
  • 多视角叙事:左侧展示百官的回复与思考过程,右侧展示皇帝(用户)的指令,清晰还原对话脉络
  • 状态追踪:每份奏折都有明确的状态标签(待处理 / 进行中 / 已完成 / 已驳回),方便追溯

3.2 决策树可视化——「丞相的思考,一目了然」

当丞相收到一道复杂指令(如「查一下上季度的营收,并对比去年同期」),它不会直接给出答案,而是会拆解任务、调度六部、汇总结果。整个过程形成一棵决策树

decision-tree.png

在奏折阁中,这棵决策树以可视化流程图的形式呈现。你可以清楚地看到:

  • 丞相调度了哪些 Agent
  • 每个 Agent 执行了什么操作
  • 每一步的输入和输出是什么
  • 如果某一步出错,具体卡在哪里

02_副本.png

这对调试和优化至关重要。你不再需要猜"Agent 为什么没按我的预期做事",而是可以直接看到它的决策路径,找出问题所在。

3.3 记忆库(Memory Vault)——「史官的起居注」

记忆库展示 Agent 主动保存的重要信息(个人偏好、项目决策、关键事实),支持:

  • 搜索与过滤:按关键词搜索,或按类别(personal / preference / project / decision)筛选
  • 在线编辑:直接在前端编辑或删除记忆条目,实时同步到后端
  • 语义分类:LLM 根据上下文自动选择记忆类别,无需人工标注

4. 技术实现:朝堂是如何运转的

4.1 前端:React + Phaser 的「双引擎」架构

前端是 Syntropy 最复杂的部分。我们需要同时处理两类需求:

  • UI 层:奏折面板、记忆库、官员状态 HUD、御批弹窗……这些是典型的 React 组件
  • 渲染层:2D 像素沙盘、角色动画、寻路、碰撞检测……这些是游戏引擎的领域

我们的方案是 React-Phaser Bridge

frontend-architecture.png

  • Zustand 作为单一数据源(Single Source of Truth),存储所有 Agent 的状态
  • React 组件订阅 Zustand Store,渲染 UI 面板
  • Phaser 3update() 循环中读取 Store,同步小人动画

这套架构的好处是:UI 和渲染完全解耦。React 不用关心像素坐标,Phaser 不用关心业务逻辑,两者通过 Zustand 的状态桥接。

关键代码片段:状态同步
// store/agentStore.ts
export const useAgentStore = create<AgentStore>((set) => ({
  agents: {},
  updateAgent: (id, updates) =>
    set((state) => ({
      agents: {
        ...state.agents,
        [id]: { ...state.agents[id], ...updates },
      },
    })),
}));
// game/MainScene.ts
export class MainScene extends Phaser.Scene {
  update() {
    const agents = useAgentStore.getState().agents;
    Object.values(agents).forEach((agent) => {
      const sprite = this.sprites[agent.id];
      if (sprite) {
        sprite.updateState(agent.status, agent.targetPosition);
      }
    });
  }
}

4.2 后端:自研 Agent 框架

Syntropy 的后端完全自研,不依赖任何现有的 Agent 框架(LangChain、AutoGen 等)。原因很简单:现有框架的状态机模型和我们需要的不完全匹配

后端整体架构

backend-architecture.png

后端核心模块:

模块 职责
Kernel Agent 生命周期管理,状态机调度
Agent 单个 Agent 的 LLM 调用、工具执行、状态流转
LLM Provider 统一的 LLM API 抽象(支持 OpenAI / DeepSeek)
MemoryManager 记忆存储与检索(FTS5 + Vector + RRF)
SocketGateway 前后端实时通信
Tracer 全链路追踪与结构化日志
Agent 核心状态机
// server/core/Agent.ts
class Agent {
  private state: AgentState = 'IDLE';

  async processMessage(message: string) {
    this.setState('THINKING');
    const response = await this.llm.chat(message);
    
    if (response.toolCalls) {
      for (const toolCall of response.toolCalls) {
        const risk = this.assessRisk(toolCall);
        if (risk === 'high') {
          this.setState('WAITING_FOR_HUMAN');
          await this.waitForApproval(toolCall);
        }
        await this.executeTool(toolCall);
      }
    }
    
    this.setState('IDLE');
    return response.content;
  }
}

4.3 记忆系统:RRF 混合检索引擎

Syntropy 的记忆系统不是纯向量检索,而是三位一体的混合架构:

  1. SQLite:结构化元数据(时间、类别、Agent ID)
  2. FTS5:全文倒排索引,精准匹配关键词
  3. Vector:语义向量,模糊语义检索

检索时,我们使用 Reciprocal Rank Fusion (RRF) 算法合并 FTS 和 Vector 的结果:

RRF_score = Σ (1 / (k + rank_i))

其中 k 是平滑常数(通常取 60),rank_i 是某条记忆在第 i 个检索引擎中的排名。

为什么需要 RRF?
  • 关键词场景:用户问「昨天的税收是多少」,FTS 能精准命中包含"税收"的记录,Vector 可能因为语义漂移而漏掉
  • 语义场景:用户问「最近有什么异常吗」,FTS 因为没有一个明确的关键词而失效,Vector 能理解"异常"的语义

RRF 让两者互补,召回率显著提升。

记忆压缩(Memory Compression)

当 Agent 进入 SLEEPING 状态时,系统自动调用 LLM 对当日未处理的对话进行摘要,生成 daily_summary 并持久化。这解决了长对话场景下的 Token 溢出问题。

// server/runtime/MemoryManager.ts
async compressMemories(agentId: string, conversations: Conversation[]) {
  const summary = await this.llm.chat(`
    请对以下对话进行摘要,提取关键决策和待办事项:
    ${conversations.map(c => c.content).join('\n')}
  `);
  
  await this.db.insert('memories', {
    agentId,
    type: 'daily_summary',
    content: summary,
    timestamp: Date.now(),
  });
}

4.4 全链路追踪(Tracer)

每个用户指令生成唯一的 traceId,贯穿 Agent 调度、工具调用、LLM 推理全流程。Tracer 记录 8 种诊断事件:

  • agent.turn:Agent 开始处理一轮对话
  • tool.call:工具调用
  • model.usage:LLM 调用及 Token 消耗
  • dispatch:任务分发
  • approval.wait:等待御批
  • approval.done:御批完成
  • agent.stuck:Agent 卡死检测(3 分钟无响应自动告警)
  • memory.save:记忆保存

所有事件自动脱敏(截断 API Key 等敏感信息),便于性能分析和故障排查。


5. 架构反思

5.1 为什么选 Phaser 而不是 Canvas / SVG?

Phaser 是一个专业的 2D 游戏引擎,提供了完整的场景管理、精灵动画、碰撞检测、寻路算法。如果用 Canvas 手写,这些都要从零实现;如果用 SVG,大量 DOM 元素会导致性能问题。

Phaser 的 WebGL 渲染让我们可以轻松实现:

  • 像素小人的平滑移动动画
  • 头顶气泡的动态效果
  • 大规模 Agent 同时活动时的性能保障

5.2 为什么不直接用 LangChain / AutoGen?

LangChain 和 AutoGen 的状态机模型是「扁平」的:Agent 顺序执行 Thought → Action → Observation。但我们需要的是内核级的状态机,能够:

  • 在任意时刻挂起执行流(御批拦截)
  • 在任意时刻注入新状态(外部干预)
  • 标准化所有状态转换事件(可观测性)

现有的框架很难在不破坏封装的前提下实现这些需求。

5.3 视觉隐喻的取舍

「古代朝廷」这套视觉系统是一把双刃剑:

优点

  • 降低了多 Agent 系统的认知门槛,非技术用户也能理解「丞相调度六部」的概念
  • 增强了产品的辨识度和传播性

缺点

  • 增加了设计和开发成本(像素素材、动画、场景布局)
  • 对部分技术用户来说可能显得「花哨」

我们的判断是:可视化的核心价值在于降低认知负担,视觉隐喻只是手段。如果换一套视觉系统(如太空站、工厂流水线),只要核心架构不变,价值依然存在。


6. 开源与未来

Syntropy 目前处于 Beta 阶段,代码已开源:GitHub - zabr1314/Syntropy

未来规划:

  • 技能市场:允许开发者为 Agent 开发自定义技能(类似 VSCode 插件)
  • 多场景模板:除了「朝廷」,提供「太空站」「工厂」等可选视觉主题
  • Agent 编排可视化:拖拽式构建多 Agent 协作链路
  • 分布式部署:支持将不同 Agent 部署在不同节点上

7. 结语:当皇帝的一天

Syntropy 本质上是一次将 AI 黑盒具象化的尝试。我们相信:

当 Agent 的执行过程变得可见、可交互、可干预,多智能体系统才能真正从实验室走向生产环境。

但除此之外,它还有一个不那么「技术」的价值:它让管理 Agent 变成了一件有趣的事

每天早上打开系统,看到丞相已经在廷议上等你,六部尚书各就各位。你拟定一道圣旨,看着小人们在宫殿里跑来跑去办差。有时候他们会卡住,有时候他们会犯错,有时候他们会把奏折递到你面前等你御批——这一刻,你不是在 debug,你是在当皇帝。

如果你对这套架构感兴趣,或者有自己的看法,欢迎在 GitHub 上交流。


相关资源

❌
❌