普通视图

发现新文章,点击刷新页面。
今天 — 2026年4月7日首页

如何使用飞书机器人连接本地 AI Agent

作者 Cobyte
2026年4月7日 08:30

1. 前言

我们在上一篇文章中讲解了如何使用微信 ClawBot 连接本地 AI Agent,这一篇我们讲解如何使用飞书机器人连接本地 AI Agent。

首先我们需要创建一个飞书应用。

2. 创建飞书应用

2.1 打开飞书开放平台

访问飞书开发平台 open.feishu.cn/app 并登录。

2.2 创建应用

1.点击 创建企业自建应用

image.png

2.填写应用名称和描述

image.png

3.选择应用图标

2.3 复制凭证

在“凭证与基础信息”中复制:

  • App ID
  • App Secret

2.4 启用机器人能力

image.png

2.5 配置权限

权限管理中点击批量导入/导出权限按钮

image.png

然后复制粘贴以下权限:

{
  "scopes": {
    "tenant": [
      "aily:file:read",
      "aily:file:write",
      "application:application.app_message_stats.overview:readonly",
      "application:application:self_manage",
      "application:bot.menu:write",
      "cardkit:card:read",
      "cardkit:card:write",
      "contact:user.employee_id:readonly",
      "corehr:file:download",
      "event:ip_list",
      "im:chat.access_event.bot_p2p_chat:read",
      "im:chat.members:bot_access",
      "im:message",
      "im:message.group_at_msg:readonly",
      "im:message.p2p_msg:readonly",
      "im:message:readonly",
      "im:message:send_as_bot",
      "im:resource"
    ],
    "user": ["aily:file:read", "aily:file:write", "im:chat.access_event.bot_p2p_chat:read"]
  }
}

最后点击确定并申请开通。

2.6 配置事件订阅

事件与回调中进行事件配置,选择订阅方式为长连接,这样我们就可以在本地电脑也可以连接飞书机器人了(本质是WebSocket)。

image.png

接着添加接收消息事件:im.message.receive_v1

image.png

加密策略设置,这个可选项。

image.png

2.7 发布应用

接着我们在版本管理与发布中创建一个版本并发布。

image.png

一般我们在测试的时候,我们自己的飞书账号就是管理员,上述版本申请发布后会在手机飞书上收到一条审核消息,我们按提示进行操作审核即可。

这时我们就可以在手机飞书上搜索我们刚刚创建的“AI 机器人”,然后点击进去就可以对话了,但目前我们还没通过代码连接它,所以还对不了话。

01.jpg

值得注意的是:上述飞书机器人配置也是OpenClaw的官方飞书机器人配置方案。

3. 实现飞书连接本地 Agent

3.1 构建 API Client

根据飞书开发平台开发文档的 《Python SDK 指南》,我们在通过 SDK 调用飞书开放接口之前,需要先在代码中创建一个 API Client,用来指定当前使用的应用、日志级别、HTTP 请求超时时间等基本信息。

我们的实现如下:

from dataclasses import dataclass
import lark_oapi as lark

@dataclass
class FeishuConfig:
    """飞书渠道配置"""
    app_id: str = ""
    app_secret: str = ""
    encrypt_key: str = "" # 可选字段,空字符串表示不使用加密
    verification_token: str = "" # 可选字段,空字符串表示不验证消息来源

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        self.config = config
        # 构建 API Client
        self._client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .build()

首先我们实现一个配置类来统一管理上述我们说到的 App IDApp Secret 以及在配置事件订阅中讲到的加密策略设置的 Encrypt KeyVerification Token

后续我们就可以通过依赖注入模式调用 FeishuChannel 类了:

# 使用
config = FeishuConfig(app_id="...", app_secret="...")
channel = FeishuChannel(config=config)  # 配置通过构造函数注入

这样实现了配置与实现分离,将来支持热更新配置。

3.2 长连接飞书客户端

根据飞书开发平台开发文档的 《Python SDK 指南》的处理事件章节,我们的实现如下:

from loguru import logger
class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    def start(self) -> None:
        # 构建事件处理器
        builder = lark.EventDispatcherHandler.builder(
            self.config.encrypt_key, 
            self.config.verification_token
        )
        # 注册接收消息事件处理函数 im.message.receive_v1
        handler = builder.register_p2_im_message_receive_v1(self._on_message).build()
        # 初始化长连接客户端并传入事件处理器   
        ws_client = lark.ws.Client(
            self.config.app_id, 
            self.config.app_secret, 
            event_handler=handler
        )
        # start() 方法会阻塞主线程,持续运行,直到手动关闭
        ws_client.start()

        logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""

我们首先构建事件处理器,同时如果你在开发者后台的应用详情中,配置了 事件与回调 > 加密策略 页面内的加密信息(Encrypt Key 和 Verification Token)。则必须将加密信息的值传递到 EventDispatcherHandler.builder 方法的参数中。

接着我们注册接收消息事件处理函数。也就是我们在前面配置事件订阅小节所讲的添加的接收消息事件:im.message.receive_v1

最后通过 lark.ws.Client() 初始化长连接客户端,必填参数为应用的 APP_ID 和 APP_SECRET。我们在上面讲述创建飞书应用的时候已经阐述过 APP_ID 和 APP_SECRET 了。

值得注意的是:飞书的长连接客户端 (ws.Client) 只能用来"接收"事件,不能用来"发送"消息!如果要主动发消息或回复消息,必须使用普通的 Open API 客户端 (lark.Client)。

3.3 接收消息事件处理函数

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    def start(self) -> None:
        # 省略...

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""
        msg = data.event.message
        # 只处理用户发送的纯文本消息
        if data.event.sender.sender_type == "bot" or msg.message_type != "text":
            return

        content = json.loads(msg.content).get("text", "")
        if not content:
            return
        logger.info(f"收到{msg.chat_id}消息: {content}")
        # 发送消息
        self._process_and_reply(msg.chat_id, content)

我们这里先只处理用户发送的纯文本消息。

3.4 发送消息到 AI Agent 处理并进行回复

前面说了飞书的长连接客户端 (ws.Client) 只能用来"接收"事件,不能用来"发送"消息!如果要主动发消息或回复消息,必须使用普通的 Open API 客户端 (lark.Client)。

所以根据飞书开发平台开发文档的 《Python SDK 指南》的服务器API章节的《发送消息》,我们的实现如下:

import agent_loop as agent  # 导入本地 AI Agent 逻辑模块

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...
    def start(self) -> None:
        # 省略...
    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        # 省略...
    def _process_and_reply(self, chat_id: str, content: str) -> None:
        """调用本地 AI Agent 获取结果并调用飞书 API 发送"""
        try:
            history = [
                {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个 AI 助手")},
                {"role": "user", "content": content}
            ]
            
            # 1. 运行 AI 思考逻辑
            reply_text = agent.agent_loop(history)
            if not reply_text:
                return

            # 2. 发送回复
            receive_id_type = "chat_id" if chat_id.startswith("oc_") else "open_id"
            req = CreateMessageRequest.builder().receive_id_type(receive_id_type).request_body(
                CreateMessageRequestBody.builder()
                .receive_id(chat_id)
                .msg_type("text")
                .content(json.dumps({"text": reply_text}))
                .build()
            ).build()
            
            resp = self._client.im.v1.message.create(req)
            if resp.success():
                logger.info(f"➡️ 成功回复消息到: {chat_id}")
            else:
                logger.error(f"❌ 回复失败: {resp.msg}")
                
        except Exception as e:
            logger.error(f"❌ 处理异常: {e}")

_process_and_reply 方法中,我们首先构造与 AI Agent 的对话历史,调用 agent.agent_loop(history) 运行我们前面实现的 AI Agent 进行思考获取回复文本。这部分我们应该比较熟悉了。

根据飞书开发平台开发文档的 《发送消息》指南,发送消息时需要指定 receive_id_type 参数。飞书的 ID 有两种类型:

  • chat_id:以 oc_ 开头的群聊 ID
  • open_id:用户个人 Open ID

我们通过简单的字符串前缀判断 chat_id.startswith("oc_") 来区分这两种类型。

构建消息请求时:

  1. 首先创建 CreateMessageRequest 的建造者
  2. 设置 receive_id_type(接收者ID类型)
  3. 通过 request_body() 设置请求体,其中又使用 CreateMessageRequestBody 的建造者设置具体参数
  4. 调用 .build() 方法最终构建请求对象

这里需要注意的是,飞书消息的 content 字段必须是 JSON 字符串格式,所以我们需要使用 json.dumps({"text": reply_text}) 将回复文本包装成 JSON。

最后调用 self._client.im.v1.message.create(req) 发送消息到飞书服务器,并根据响应结果记录成功或失败日志。

现在我们已经完成了飞书渠道的核心实现,接下来编写启动脚本。

4. 启动飞书机器人

我们创建一个独立的启动文件 test_feishu.py

from loguru import logger
from feishu import FeishuChannel, FeishuConfig

def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxxx",         # 替换为真实的 App ID
        app_secret="xxxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
    
    # 2. 初始化频道并启动长连接
    channel = FeishuChannel(config=config)
    
    logger.info("正在启动飞书机器人长连接...")
    
    # 3. 启动并保持运行
    try:
        channel.start()
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    main()

启动脚本的实现非常简单直接:

第一步:配置飞书机器人凭证

我们需要创建 FeishuConfig 实例,填入在飞书开放平台创建应用时获取的凭证:

  • app_id:应用的唯一标识,以 cli_ 开头
  • app_secret:应用的密钥,用于 API 身份验证
  • encrypt_key:可选,如果在开发者后台配置了加密策略则需要填入
  • verification_token:可选,用于验证消息来源

第二步:初始化飞书频道

使用配置对象创建 FeishuChannel 实例。这里体现了依赖注入模式的优势——我们可以轻松更换不同的配置来源。

第三步:启动并保持运行

调用 channel.start() 启动飞书机器人的 WebSocket 长连接。这个方法会阻塞主线程,持续运行直到收到退出信号。

我们使用 try...except KeyboardInterrupt 捕获用户按 Ctrl+C 的中断信号,实现优雅退出。当用户想要停止机器人时,只需在终端中按 Ctrl+C,程序会记录退出日志然后正常结束。

运行机器人

在启动之前,先安装相关依赖。

requirements.txt 依赖如下:

python-dotenv==1.0.1
openai==2.24.0
loguru==0.7.3
# 飞书官方 Python SDK
lark-oapi>=1.2.0

执行 pip install -r requirements.txt 安装依赖。

依赖安装完毕后,在终端中执行:

python test_feishu.py

你会看到以下输出:

2026-04-05 00:59:23.456 | INFO     | 正在启动飞书机器人长连接...

此时机器人已经启动并开始监听飞书消息了。你可以在飞书中找到你的机器人应用,发送消息进行测试。

5. 启用异步事件循环线程

5.1 Node.js 和 Python 的主线程对比

我们知道 Node.js 默认的主线程就是事件循环线程,而 Python 的主线程默认是没有事件循环的。所以我们在上面也提到了调用 channel.start() 启动飞书机器人的 WebSocket 长连接会阻塞主线程。因为飞书 Python SDK 的 lark.ws.Client 采用同步阻塞设计,其 start() 方法会启动自己的事件循环并持续运行。主要体现在 FeishuChannel 类的 start 方法中以下代码:

ws_client.start()
logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

我们发现执行 ws_client.start() 代码后日志不打印了,因为主线程被阻塞了。

而在 Node.js 中以下的代码则不会被阻塞:

const WebSocket = require('ws');

function run_ws() {
  const ws = new WebSocket('ws://example.com/socket');

  ws.on('open', () => {
    console.log('[Worker] WebSocket connected');
  });

  ws.on('message', (data) => {
    console.log('[Worker] Received:', data.toString());
  });
}

// 同步执行
run_ws();
console.log('主线程没有被阻塞');

我们发现直接在 Node.js 的主线程中启动 WebSocket 连接服务是不会阻塞的,因为 Node.js 中的 WebSocket 本身是基于 Node.js 的事件循环来实现的。

5.2 通过 asyncio 启动异步事件循环

通过上一小节我们知道 Python 默认的主线程是没有事件循环的,需要显式创建运行。目前在 Python 3.7+ 推荐通过 asyncio 来启动异步事件循环。

实现很简单:

async def main():
    # 省略...
    try:
        await channel.start()
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

上述代码的转换过程等于:

主线程 (执行Python代码)
    ↓
调用 asyncio.run()
    ↓
主线程内部启动事件循环
    ↓
主线程 = 异步事件循环线程

但现在运行会报错,因为虽然主线程启动了异步事件循环线程,但飞书 SDK 的 lark.ws.Client 内部也使用了异步事件循环,当我们在已有的 asyncio 事件循环中调用它时,会产生冲突。

所以我们需要在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突。修改如下:

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    async def start(self) -> None:
        # 省略...
        def run_ws():
            # 为 WebSocket 客户端所在线程设置专属的事件循环,避免报错
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            import lark_oapi.ws.client
            lark_oapi.ws.client.loop = loop
            # 初始化长连接客户端
            ws_client = lark.ws.Client(
                self.config.app_id, 
                self.config.app_secret, 
                event_handler=handler
            )
            ws_client.start()
        # 在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突
        threading.Thread(target=run_ws, daemon=True).start()
        logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

        # 保持主协程存活
        while True:
            await asyncio.sleep(1)

主要修改了 FeishuChannel 类的 start 方法,通过 threading.Thread 在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突,并为飞书 WebSocket 客户端所在线程设置专属的事件循环,避免报错。

5.3 防止 AI 处理阻塞消息接收

无论是 Python 还是 Node.js,都需要防止耗时的 AI 处理阻塞消息接收。所以我们需要修改 _on_message 方法:

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    async def start(self) -> None:
        # 省略...

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        # 省略...

        # 启动独立线程处理 AI 逻辑和回复,防止阻塞 WebSocket 接收循环
        threading.Thread(
            target=self._process_and_reply, 
            args=(msg.chat_id, content)
        ).start()

我们看到在上述代码中启动一个独立线程处理 AI 逻辑和回复,防止阻塞 WebSocket 接收消息循环。

这时我们再次启动代码,我们可以看到 start 方法中的日志同步打印了:

2026-04-05 01:20:01.456 | INFO     | 正在启动飞书机器人长连接...
2026-04-05 01:20:01.457 | INFO     |  飞书极简版机器人已启动 (WebSocket)

6. 总结

至此,我们已经完整实现了通过飞书机器人连接本地 AI Agent。通过上述实践,我们不仅掌握了具体的实现步骤,更重要的是理解了背后的技术原理:

  • Node.js vs Python 事件循环:Node.js 主线程本身就是事件循环线程,天生异步;Python 则需要显式创建事件循环,主线程默认同步执行
  • Python 异步架构:深入理解了 asyncio 事件循环与多线程的协作模式,以及如何通过线程隔离解决 SDK 的冲突

记住:技术的最佳学习方式就是动手实践。现在就去飞书开放平台创建你的应用,启动这个机器人,感受 AI 与即时通讯结合的魅力吧!

我是程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

昨天以前首页

1.基于依赖追踪和触发的响应式系统的本质

作者 Cobyte
2026年4月1日 19:57

前言

Vue1、Vue3、SolidJS、Mobx 它们的数据响应式基本原理都是一样的,具体区别只是设计理念和实现方式不一样。按以前读书时代考试做题一样,它们是同一类型的题目,都是基于依赖收集和触发的运行时的数据响应式,如果说你只会解答其中一道题,其他的题却不会解答,则说明你并没有真正彻底掌握这一类题。

为什么标题说“基于依赖追踪的响应式系统”,因为在前端响应式的框架有很多,但他们的实现原理却各有不同。 在前端一般谈到响应式框架,可能大家都会不约而同地联想到 Vue,除了 Vue 之外,也许还有人会想到 React 以及 Svelte、SolidJS。他们都有一个共同点,都是通过数据驱动视图。他们在实现方式上又互相有一些相似之处,其中 Vue 和 React 都采用了虚拟DOM技术,Vue 和 SolidJS 的数据响应式实现则都是采用了依赖追踪的方式,所以在数据响应式的实现方面 Vue 和 SolidJS 最相似,而 Svelte 的实现方式则跟 Vue 和 React 都不一样,Svelte 是基于编译响应式。当然 Vue 和 React 的具体实现技术也是不一样的,但它们在宏观层面则是一样,都是通过数据驱动视图,当数据发生变化时,视图会重新渲染,这种机制使得开发者只需要关注数据的变化,而不需要手动操作 DOM。Vue 通过数据劫持,使得操作数据需要额外的 API ,系统变能感知数据的变化,而 React 和 SolidJS 则需要手动调用 API 去触发数据变化。

手动操作 DOM 的上古时代

例如我们现在有一个这样的需求,有一个按钮 <button>0</button>,当我们点击按钮的时候,按钮中的文本就进行加 1。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title>手动操作DOM</title>
  </head>
  <body>
    <button id="btn"></button>
    <script>
        let count = 0
        const btnEl = document.getElementById('btn')
        btnEl.textContent = count
        btnEl.addEventListener('click', function () {
            count++
            btnEl.textContent = count
        })
    </script>
  <body>
</html>

上述的 button 按钮是通过 HTML 进行渲染的,我们还可以通过 JavaScript API 进行创建。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title>手动操作DOM</title>
  </head>
  <body>
-    <button id="btn"></button>
    <script>
        let count = 0
-        const btnEl = document.getElementById('button')
+        const btnEl = document.createElement('button')
+        const textNode = document.createTextNode(count)
+        btnEl.appendChild(textNode)
        btnEl.addEventListener('click', function () {
            count++
            btnEl.textContent = count
        })
+        document.body.appendChild(btnEl)
    </script>
  <body>
</html>

上述这种方式,在项目应用非常庞大的时候,开发效率是非常低下的,同时维护成本却又非常高的,所以就出现了像 React、Vue 这种通过数据进行驱动视图的前端框架。

通过数据驱动视图

虽然 Vue 和 React 在具体的实现技术方案上差异是非常大的,但在宏观层面它们则是一样的,都是通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上,从而提高性能。

不管是 Vue 还是 React 的虚拟DOM 本质上都是一个对象,上面记录着一些真实 DOM 的信息,比如 type、props、children,我们这里简单模拟一下,并通过虚拟DOM 和 Diff 算法来改写上面的例子。

首先我们通过一个 createElement 的函数来创建一个节点的虚拟DOM对象,这里跟 React 的对齐,children 节点在 props 中,Vue 的 children 是跟 props 同级的,这些差别对我们进行宏观研究不重要。

function createElement(type, props) {
    return {
        type,
        props
    }
}

接着我们创建一个函数组件 App。

count = 0
function App (){
    return createElement('button', {
        onClick: () => {
            count ++
            setCount(count)
        },
        children: count
    })
}

接着我们通过以下方式把创建的虚拟DOM 挂载到根节点上。

render(App(), document.getElementById('app'))

接着我们实现 render 方法

let oldVnode
function render(vNode, container) {
    if (!oldVnode) {
        oldVnode = vNode
        const el = document.createElement(vNode.type)
        // 保存真实DOM 到虚拟DOM 的 el 属性上,将来更新的时候,不用重新创建,从而达到提高性能效果
        oldVnode.el = el
        const textNode =  document.createTextNode(vNode.props.children)
        el.appendChild(textNode)
        // 绑定虚拟DOM 上的事件
        el.addEventListener('click', vNode.props.onClick)
        container.appendChild(el)
    } else if(oldVnode.props.children !== vNode.props.children) {
        oldVnode.el.textContent = vNode.props.children
    }
}

我们在这里非常简单且宏观的实现了新老虚拟DOM 的对比,当不存在旧虚拟DOM 则是挂载阶段,创建真实DOM,并保存到虚拟DOM 的 el 属性上,将来更新的时候,不用重新创建,从而达到提高性能效果。在进行新老虚拟DOM 的时候,我们这里只比较 children 一个属性,如果新老 children 不一样就把新的虚拟DOM 上的 children 的值更新到对应的真实DOM 上。

我们在上面的 App 函数中的 props 中的点击事件函数中有一个 setCount 的方法还没实现,它的实现可以抽象成如下:

function setCount(val) {
    count = val
    render(App(), document.getElementById('app'))
}

从上述代码可以看到 setCount 的实现很简单,这个方法就是在点击之后进行更新数据 count 的,并且在更新数据 count 的同时重新渲染视图,把新的 count 值显示到页面上。

以下是测试效果:

00.gif

我们在上述例子中通过极简的代码,从宏观层面阐明了 React 的响应式原理,通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上。

其实上述这段 React 的响应式原理放在 Vue 的响应式原理也是成立的,最大的不同则是 React 更新数据需要通过 setCount 函数,也就是所谓手动触发,而 Vue 则是自动触发的。那么下面我们来看看 Vue 的响应式是怎么实现的。

基于依赖追踪的响应式

我们知道 Vue1 的响应式数据是通过 Object.defineProperty 来实现的。基于 Object.defineProperty 来实现需要初始化对对象的每一个熟悉进行劫持监听。

例如我们有这么这个对象 const data = { count:0 },那么我们需要进行以下操作:

const data = { count: 0 }
Object.keys(data).forEach(key => {
   Object.defineProperty(data, key, {
    get() {
        return data[key]
    },
    set(val) {
        data[key] = val
    }
   }) 
})

上述这个写法会造成内存栈溢出,主要是因为在 Object.defineProperty 的 getter 中读取 data[key] 会触发 getter 循环读取,从而造成死循环。

00.png

我们可以把 getter 中 data[key] 的取值放在进行 Object.defineProperty 监听之前。

const data = { count: 0 }
Object.keys(data).forEach(key => {
+   const val = data[key]
   Object.defineProperty(data, key, {
    get() {
+        return val
    },
    set(val) {
        data[key] = val
    }
   }) 
})

但这样 val 会被循环取值进行了覆盖,没办法正确读取每个 key 的值,为了可以读取每个 key 的值,我们可以通过闭包的形式把每个 key 的值缓存下来。

const data = { count: 0 }
Object.keys(data).forEach(key => {
   defineReactive(data, key, data[key]) 
})

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            return val
        },
        set(newVal) {
            val = newVal
        }
    })
}

接下来我们要做的就是在 getter 中进行依赖收集,然后在 setter 中进行依赖触发,这本质上就是一个订阅发布模式。

const data = { count: 0 }
+ // 声明一个依赖存储中心
+ const subscribers = new Set()
+ // 需要收集的依赖,在 Vue1 叫 wachter,Vue3 中叫 effect,本质上就是一个订阅者,关于发布订阅模式,我们后续再详细介绍
+ let activeEffect
Object.keys(data).forEach(key => {
   defineReactive(data, key, data[key]) 
})

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
+            // 存在依赖就把依赖收集到依赖存储中心
+            if(activeEffect) subscribers.add(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
+            // 值更新了,就需要去把依赖存储中心中的订阅者全部重新执行一遍
+            subscribers.forEach(sub => sub())
        }
    })
}

我们在上述代码中通过一个全局变量 subscribers 在响应式数据的 getter 把依赖收集到 subscribers 中, 在 setter 中则把 subscribers 中收集到的依赖进行循环遍历重新执行一遍,从而实现了依赖追踪和触发。

那么现在我们有了响应式数据 data 之后,我们就可以对我们前面的例子中的 App 函数中的 count 数据进行更改了,我们之前实现的是 React 的方式,我们现在要把它改成 Vue 的方式。

- count = 0
function App (){
    return createElement('button', {
        onClick: () => {
-            count ++
-            setCount(count)
+            data.count ++
        },
-        children: count
+        children: data.count
    })
}

此外渲染函数的执行方式也需要改成一个副作用函数,通过副作用函数进行调用执行。

    activeEffect = () => {
        render(App(), document.getElementById('app'))
    }

    activeEffect()

    activeEffect = null

我们把一个副作用函数赋值给了变量 activeEffect,然后再执行 activeEffect,那么在执行 activeEffect 函数的时候就会去执行 rander 函数,并通过 App 函数生成虚拟DOM,在 App 函数中对虚拟DOM 的 children 属性赋值的时候是通过读取响应式数据 data 中的 count 值,那么这时就会触发 count 属性的 getter,然后就会在 getter 中进行依赖收集,在 getter 中很明显这个时候 activeEffect 是有值的,所以会进行依赖收集。当点击的时候,就会触发 data.count ++ 的执行,这时就会触发 count 属性 setter,然后就会在 setter 中进行依赖触发。

以下是测试效果:

00.gif

至此我们把 Vue 的数据响应式也通过最少的代码量阐明了,以上的 Vue 的响应式原理估计很多同学都非常清楚,因为这是面试被问几率非常高的题目。我在这里重复讲解,是为了对比 React 和 Vue 响应式原理的差别,总的来说,React 和 Vue 的响应式原理在宏观层面是有非常大的相同之处的,都是通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上。在宏观层面 React 和 Vue 响应式原理最大的不同则是数据触发方式的不同,React 是数据变更后需要开发者通过手动调用 React 提供的 API 进行触发视图的更新,而 Vue 则是自动触发的,因为 Vue 的状态数据是响应式的,而 React 的状态数据不是响应式的。

我们一般在很多的文章中都只讲了 Vue1 的数据响应式原理是通过 Object.defineProperty 来实现的,那么是否只能通过 Object.defineProperty 来实现呢?很明显不是,我们上述例子中的 data,我们现在如果想给它新增属性 data.name,那么 Object.defineProperty 是无法进行监听追踪的,所以我们通过一个工具来对 data 也进行监听。

function observe (data) {
    // 给对象 data 添加一个属于 data 对象的依赖存储中心
    data.__ob__ = new Set()
    Object.keys(data).forEach(key => {
        const value = data[key]
        defineReactive(data, key, value) 
    })
}

那么在 getter 中要对对象的依赖也进行收集

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            // 存在依赖就把依赖收集到依赖存储中心
            if(activeEffect) {
                subscribers.add(activeEffect)
                // 如果读取的值是对象,那么还要给这个对象进行依赖收集,并且新的对象也要通过 observe 进行监听
                if(Object.prototype.toString.call(val) === '[object Object]') {
                    observe(val)
                    val.__ob__.add(activeEffect)
                }
            } 
            return val
        },
        set(newVal) {
            val = newVal
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
        }
    })
}

然后我们要专门通过一个单独的 API 来给响应式对象添加属性,并且在添加属性之后进行依赖触发。

function set(target, key, val) {
    target[key] = val
    // 新添加的属性也需要通过 Object.defineProperty 进行监听
    defineReactive(target, key, val)
    const ob = target.__ob__
    // 进行对象的依赖触发
    ob.forEach(sub => sub())
}

接着我们把 App 函数进行以下修改

// 修改数据结构
const data = { data: { count: 0 } }
// 通过 observe 处理
observe(data)
function App (){
    return createElement('button', {
        onClick: () => {
            set(data.data, 'count1', 2)
        },
        children: JSON.stringify(data.data)
    })
}

然后我们重新运行代码结果如下:

01.gif

我们看到可以正常运行,但对象中的私有属性 __ob__ 也显示出来了,我们希望它不要被枚举出来,我们可以通过 Object.defineProperty 对它进行以下设置。

function observe (data) {
    // 给对象 data 添加一个属于 data 对象的订阅者中心
-    data.__ob__ = new Set()
+    Object.defineProperty(data, '__ob__', {
+        value: new Set(), // 属性的值,默认为 undefined
+        enumerable: false, // 属性是否可枚举,默认为 false
+        writable: true, // 值是否可写,默认为 false
+        configurable: true // 属性是否可配置,默认为 false
+    })
    Object.keys(data).forEach(key => {
        const value = data[key]
        defineReactive(data, key, value) 
    })
}

修改后再进行测试,我们可以看到 __ob__ 属性不再出现了。

02.gif

可以通过 Object.defineProperty 对数组进行监听,但监听不了 push、pop、shift 等对数组进行操作的方法,所以我们需要对数组的操作方法进行重写,重写的方法就是覆盖数组数据上的原型对象 __proto__

function observe (data) {
// 省略 ...
+   if (Array.isArray(value)) {
+      // 如果是数组则重新数组上的原型
+      value.__proto__ = {
+          join(val) {
+             // 通过原生数组上方法进行调用
+             return Array.prototype.join.call(value, val)
+          },
+          push(val) {
+             // 通过原生数组上的方法进行调用
+             Array.prototype.push.call(value, val)
+             subscribers.forEach(sub => sub())
+          }
+      }
+   } else {
        Object.keys(data).forEach(key => {
            const value = data[key]
            defineReactive(data, key, value) 
        })
+    }
}

我们这里只测试 join 和 push 方法,而 join 方法没有更改到数据,所以是不用进行依赖触发的。

然后我们对 App 应用也进行修改一下,以便测试数组响应式数据

// 数据
const data = ['cobyte']
// 通过 bserver 处理
observe(data)
function App (){
    return createElement('button', {
        onclick: () => {
            data.push('=')
        },
        children: data.join('-')
    })
}

测试结果如下:

03.gif

小结

至此,我们通过手写已经基本实现了 Vue1 的数据响应式原理,我们可以通过对 Vue2 数据响应式原理的分析进行一个宏观总结。我们需要在实践中总结规律,然后又通过规律更好地指导实践

首先我们都知道 Vue1 的数据响应式原理是通过 Object.defineProperty 实现的,通过 Object.defineProperty 可以监听一个对象的属性的读取(getter)和修改(setter),这样就可以在 getter 的时候进行依赖收集,在 setter 的时候进行依赖触发。但 Vue2 不单单只是通过 Object.defineProperty 实现数据响应式的,因为只有被 Object.defineProperty 初始化了的属性才可以进行监听,而当一个对象新增一个属性时,则监听不了。这时我们需要通过额外的手段来实现对象新增属性时的监听,具体方案就是通过给对象新增一个私有的属性 __ob__,去记录属于该对象的依赖,当该对象新增属性时则触发该对象的依赖重新执行。同时 Object.defineProperty 也监听不了数组的原生方法,例如:push、pop、shift、unshift、splice、sort、reverse,我们观察一下这些数组方法发现都有一个共同特点,就是他们都会修改数组,使数组数据发生变化,那么根据数据响应式的原理,数据发生了改变就需要进行依赖触发,那么我们需要对响应式数据类型为数组的数据进行重写它们的原型,这样我们就可以在响应式数组通过 push、pop、shift、unshift、splice、sort、reverse 方法修改数组的时候进行依赖触发了。

我们可以总结出,不管是通过 Object.defineProperty 进行监听对象属性还是通过给对象添加私有属性 __ob__,去记录该对象的依赖,还是重写数组的原型方法,目的都只有一个:进行数据的依赖追踪和触发。

我们还可以进一步进行总结规律:这种基于依赖追踪的响应式系统,并不是某一种技术,而是一种模式。核心只有一个,就是在数据读取的时候进行依赖收集,在数据更改的时候进行依赖触发。

基于这种指导思想,我们就可以很好去实践 Vue2 的数据响应式原理了。

Vue3 只是通过 Proxy 实现数据响应式吗

Vue3 是通过 Proxy 对数据实现 getter/setter 代理,从而实现响应式数据,然后在副作用函数中读取响应式数据的时候,就会触发 Proxy 的 getter,在 getter 里面把对当前的副作用函数保存起来,将来对应响应式数据发生更改的话,则把之前保存起来的副作用函数取出来执行。这个过程跟 Vue2 是一样的,只是实现细节不一样。

实现起来也非常简单:

function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && subscribers.add(activeEffect)
            return Reflect.get(target, key) 
        },
        set(target, key, val) {
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
            return result
        }
    })
}

我们再把 App 函数进行修改:

const data = reactive({count: 0})
function App (){
    return createElement('button', {
        onClick: () => {
            data.count ++
        },
        children: data.count
    })
}

测试结果如下:

00.gif

我们这里不是为了深入探讨 Vue2 的数据响应式原理的,而是为了验证上面实现 Vue2 的数据响应式原理总结的规律。也就是:这种基于依赖追踪的响应式系统,并不是某一种技术,而是一种模式。核心只有一个,就是在数据读取的时候进行依赖收集,在数据更改的时候进行依赖触发。后续我们基于数据响应式原理的规律便可以很好去理解其他数据响应式系统了,例如 React 的状态管理库——Mobx、SolidJS,我们在后续也将探讨这些库的数据响应式原理的实现。

Vue1 是通过 Object.defineProperty 实现对数据的读写监听,但由于 Object.defineProperty 的局限性,Vue2 并不只是通过 Object.defineProperty 实现数据响应式的,但都为了实现在数据读取时进行依赖收集,在数据更改时进行依赖触发。Vue3 则通过新的 API:Proxy 可以实现对数据的读写监听,但核心也是为了实现在数据读取时进行依赖收集,在数据更改时进行依赖触发。

那么问题来了,Vue1 并不只是通过 Object.defineProperty 实现数据响应式的,那么 Vue3 只是通过 Proxy 实现了数据响应式吗?

其实这个问题可以转化得更具体一些,Vue2 的 reactive 和 ref 的底层实现原理是一样的吗?有人认为 ref 和 reactive 的底层实现原理都是一样的,也就是 ref 也是通过 reactive 实现的,也就是 ref 也是通过 Proxy 实现的。如果说 ref 和 reactive 的底层实现原理不一样的话,也就是说 Vue3 可以不通过 Proxy 实现数据的响应式。

很明显 Vue3 可以不通过 Proxy 实现数据的响应式的,也就是 ref 和 reactive 的底层实现原理是不一样的。那么根据我们上面总结的实践规律,我们只需要可以实现在数据读取的时候进行依赖收集,然后在数据更改的时候进行依赖触发就可以了。那么明显可以使用 Vue2 中的 Object.defineProperty 中的 getter/setter,这种方式也叫属性访问器。根据上面 Vue2 的数据响应式原理我们可以知道如果通过 Object.defineProperty 实现对数据的监听,还要通过闭包的方式,就显得不够简洁。那么属性访问器除了使用 Object.defineProperty 进行显式声明之外,还可以通过字面量的方式,本质还是属性访问器

例如:

function ref(value) {
    return {
        _value: value,
        get value() {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && subscribers.add(activeEffect)
            return this._value
        },
        set value(val) {
            this._value = val
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
        }
    }
}

然后我们通过 ref 函数来创建一个响应式数据,再修改 App 函数。

const count = ref(0)

function App (){
    return createElement('button', {
        onClick: () => {
            count.value ++
        },
        children: count.value
    })
}

测试运行结果:

00.gif

这也就是 Vue2 的 ref API 的实现原理,当然在 Vue3 源码中如果 ref 传进来的值是一个引用对象的话,还是通过 reactive 进行实现。此外在 Vue3 的源码中 ref API 是通过一个 class 类来实现的,但原理是一样的。

我们下面也可以简单实现一下:

class RefImpl {
    _value
    constructor(value) {
        this._value = value
    }
    get value() {
       // 存在依赖就把依赖收集到依赖存储中心
       activeEffect && subscribers.add(activeEffect)
       return this._value 
    }
    set value(val) {
        this._value = val
        // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
        subscribers.forEach(sub => sub())
    }
}

function ref(value) {
    return new RefImpl(value)
}

修改好的测试结果还是一样的。

00.gif

通过沙箱模式实现依赖追踪的数据响应式

通过上面对 Vue1 和 Vue3 的数据响应式原理的实现与分析,我们知道都借助了 JavaScript 的原生 API(Object.defineProperty 和 Proxy) 来实现依赖追踪的响应式系统,那么不借助 JavaScript 原生 API 还可以实现依赖追踪的响应式系统吗? 我们上面总结出的结论是,基于依赖追踪的响应式系统的本质是在读取数据的时候收集依赖,在更新数据的时候触发依赖。那么基于此原理,我们只需要把读写进行分离那么可以实现了。

我们把上面第一版 ref 的实现通过闭包的形式改造一下:

function ref(value) {
    const s = {
        value
    }

    function getState() {
        return s.value
    }

    function setState(val) {
        s.value = val
    }
    return [getState, setState]
}

const [getState, setState] = ref(0)

console.log('初始值:', getState())
// 修改
setState(1)
console.log('修改后:', getState())

我们可以看到通过闭包的我们实现了读写分离,这种模式有一个专业的术语叫:沙箱模式,这样我们就可以在读取数据的时候收集依赖,在修改数据的时候触发依赖了。

function ref(value) {
    const s = {
        value
    }

    function getState() {
        // 存在依赖就把依赖收集到依赖存储中心
        activeEffect && subscribers.add(activeEffect)
        return s.value
    }

    function setState(val) {
        s.value = val
        // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
        subscribers.forEach(sub => sub())
    }
    return [getState, setState]
}

接着我们把 App 函数也进行修改一下:

const [count, setCount] = ref(0)

function App (){
    return createElement('button', {
        onClick: () => {
            setCount(count() + 0)
        },
        children: count()
    })
}

其实上述这种实现依赖追踪的响应式系统的方式就是 SolidJS 的响应式原理,长得像 React,实际上是 Vue。所以我们只要把核心原理搞清楚,就可以举一反三了,像读书时候一样,以后同类型的题目,你都回作答了。当然 SolidJS 的响应式原理远不止这些,我们将在后续章节继续进行深入探讨,搞明白了 SolidJS, Vue Vapor 的原理也非常容易理解了。

总结

上述所有例子中的依赖收集和触发的过程,本质就是一个发布订阅模式,而关于发布订阅模式,我们将在下一篇文章中进行详细介绍。当我们掌握了发布订阅模式后,我们再去理解这些通过依赖收集和触发实现的数据响应式系统,就会如鱼得水。

上述文章写于:2023 年,由于个人原因今年 2026 年发布。

我是程序员Cobyte,现在已转向研究 AI Agent,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

微信 ClawBot 接入本地 AI Agent 的实现原理

作者 Cobyte
2026年3月31日 09:09

1. 前言

我们知道微信最近推出了微信 ClawBot,用于在微信中与 OpenClaw 收发消息。掘金签约作者群里的一位大佬说:

01.jpg

受到启发,我就去研究了一下微信半公开的官方文档后发现,我们确实也可以通过用微信 ClawBot 接入任何 AI Agent

至于为什么说官方文档是半公开呢,因为官方暂时还没有公开的文档地址,但又可以通过某些渠道看到(怎么可以看到,本文最后揭晓)。

本文就将带你一步步实现如何通过微信 ClawBot 接入自己开发的 AI Agent。其实我们只需要做三件事:

  1. 扫码登录,拿到微信 ClawBot 的身份凭证;
  2. 长轮询等待消息,一有消息立刻获取;
  3. 把消息交给本地 Agent 处理,再把回复发回微信。

第一步,扫码登录。

2. 扫码登录

根据微信 ClawBot 文档的要求,我们先要获取一个二维码,等用户用微信扫描并确认后,服务端就会返回一个 bot_token 的通信凭证,后续所有请求都必须带着这个 token。这个跟我们平时的开发是一样,我们登录之后才能进行操作。

2.1 拉取二维码

首先微信 ClawBot 的接口地址是:

BASE_URL = "https://ilinkai.weixin.qq.com"

其次,登录的第一步是向服务端请求二维码。我们根据微信 ClawBot 的文档可以知道请求的接口是:

ilink/bot/get_bot_qrcode?bot_type=3

请求的 HTTP 方式是 GET。值得注意的是参数 bot_type 在微信 ClawBot 的文档中只出现了在获取二维码的时候,值是 3,而其他枚举值的情况,文档中并没有说明。

因为要用到 HTTP 的 GET 请求,所以我们需要封装一个 GET 请求的方法:

import json
import urllib.request
import urllib.error

# 省略...

def _get(url: str, headers: dict = {}, timeout: int = 35) -> dict:
    """发送 GET 请求"""
    req = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} GET {url}: {e.read().decode(errors='replace')}") from e

接着我们封装拉取二维码的请求函数:

def fetchQRCode():
    base = BASE_URL.rstrip("/") + "/"
    url = base + "ilink/bot/get_bot_qrcode?bot_type=3"
    resp = _get(url)                      # GET 请求
    qrcode_raw = resp.get("qrcode")       # 服务端用于轮询的标识
    qrcode_url = resp.get("qrcode_img_content")   # 可扫描的二维码链接
    return qrcode_raw, qrcode_url

微信 ClawBot 服务端返回的 qrcode_img_content 是一个可以直接扫码的链接。我们在终端里可以通过安装 qrcode 库把它打印成 ASCII 二维码或者直接打印链接让用户打开链接,通过手机微信进行扫码。字段 qrcode 则是服务端的二维码标识,用于后续轮询二维码的状态,是否已经被扫码等。

我们测试一下上述代码:

print(fetchQRCode())

打印结果如下:

('50189c0db1817eb74a2bfc11e4ccdb35', 'https://liteapp.weixin.qq.com/q/7GiQu1?qrcode=50189c0db1817eb74a2bfc11e4ccdb35&bot_type=3')

我们打开上述链接在浏览器打开是一个微信二维码。

image.png

2.2 轮询扫码状态

二维码生成后,我们每隔一秒向微信 ClawBot 提供的二维码状态查询接口进行请求,直到用户完成确认。接着我们封装一个轮询扫码状态的请求接口函数:

def pollQRStatus(qrcode_raw):
    base = BASE_URL.rstrip("/") + "/"
    poll_url = base + f"ilink/bot/get_qrcode_status?qrcode={urllib.parse.quote(qrcode_raw)}"
    deadline = time.time() + 480    # 最多等 8 分钟
    # 这个是微信 ClawBot 规定的,没得解析
    headers = { "iLink-App-ClientVersion": "1" }

    while time.time() < deadline:
        try:
            s = _get(poll_url, headers)
        except Exception as e:
            print(f"  [轮询错误] {e}", flush=True)
            time.sleep(2)
            continue

        status = s.get("status", "wait")

        if status == "wait":
            # 还没扫,继续等,打一个点表示进度
            sys.stdout.write(".")
            sys.stdout.flush()

        elif status == "scaned":
            # 已经扫了,等用户在微信里点确认
            print("\n👀 已扫码,请在微信中点击确认...", flush=True)

        elif status == "confirmed":
            # ✅ 用户点了确认,登录成功!
            token      = s.get("bot_token", "")
            account_id = s.get("ilink_bot_id", "")
            # 账号 ID 规范化:把 @ 和 . 换成 -,例如 abc@im.wechat → abc-im-wechat
            account_id = account_id.replace("@", "-").replace(".", "-")
            real_base  = s.get("baseurl") or BASE_URL
            print(f"\n✅ 登录成功!account_id={account_id}", flush=True)
            return {"token": token, "account_id": account_id, "base_url": real_base}

        elif status == "expired":
            raise RuntimeError("二维码已过期,请重新运行程序。")
        # 每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力
        time.sleep(1)
    raise RuntimeError("登录超时(8分钟),请重试。")

上述函数主要是实现了根据状态码处理不同情况。服务端返回的 JSON 中包含 status 字段,表示当前二维码的状态。我们根据其值进行分支处理:

status == "wait"

  • 表示二维码尚未被扫描。
  • 在终端打印一个点 .(不换行),表示程序仍在等待,给用户视觉反馈。

status == "scaned"

  • 表示用户已经扫描了二维码,但尚未在微信中点击“确认”。
  • 打印提示信息 👀 已扫码,请在微信中点击确认...,告知用户当前进度。

status == "confirmed"

  • 成功状态:用户已确认,登录成功。
  • 从响应中提取 bot_tokenilink_bot_id(机器人唯一标识)、baseurl(可选的后端地址)。
  • 并且对 account_id 进行规范化处理,将 @ 和 . 替换为 -,例如 abc@im.wechat 变为 abc-im-wechat
  • 打印成功信息,并返回一个包含 tokenaccount_idbase_url 的字典,供上层保存和后续请求使用。

status == "expired"

  • 二维码已过期(我们这里设置 8 分钟未扫描或确认即为过期)。
  • 抛出 RuntimeError,提示用户重新运行程序获取新二维码。

最后,每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力。

2.3 实现登录并保存 token 到本地

我们在上面实现了拉取二维码的函数 fetchQRCode 和轮询等待扫码确认的函数 pollQRStatus,我们就可以实现一个登录函数 login 将整个流程串联起来了。实现如下:

def login() -> dict:
    """
    扫码登录,返回 {"token": "...", "account_id": "...", "base_url": "..."}
    """
    # ── 第 1 步:拉取二维码 ──
    [qrcode_raw, qrcode_url] = fetchQRCode() 

    if not qrcode_raw:
        raise RuntimeError(f"获取二维码失败")

    # ── 第 2 步:在终端打印二维码 ──
    print("\n请用微信扫描下方二维码:\n", flush=True)
    try:
        import qrcode                          # pip install qrcode[pil]
        qr = qrcode.QRCode(version=1, border=1)
        qr.add_data(qrcode_url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)            # 用 ASCII 字符在终端渲染,尺寸最小
    except ImportError:
        # 没有安装 qrcode 库时,直接打印链接,用浏览器打开也能扫
        print(f"  {qrcode_url}\n", flush=True)

    # ── 第 3 步:轮询等待扫码 ──
    print("等待扫码...", flush=True)
    return pollQRStatus(qrcode_raw) 

上述登录函数最后返回的数据结构如下:

{
'token': 'd5b3973bb743@im.bot:060000215ac9e1ce7116aeb48b3d998c5b2e4e', 
'account_id': 'd5b3973bb743-im-bot',
'base_url': 'https://ilinkai.weixin.qq.com
'}

为了下次启动不用重新扫码,我们把 token 和 account_id 保存到文件 .weixin_token.json 中:

# token 的本地文件路径
TOKEN_FILE  = Path(__file__).parent / ".weixin_token.json"   # 保存登录后的 token

# 保存 token 和账号
def save_token(data: dict) -> None:
    """把 token 信息保存到本地文件,下次启动不用重新扫码。"""
    TOKEN_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), "utf-8")
    TOKEN_FILE.chmod(0o600)    # 仅当前用户可读,保护 token 安全

这样,下次运行程序时,如果文件存在就直接加载,跳过登录流程。所以我们还需要有一个从本地文件读取上次保存的 token 和账号的函数。实现如下:

def load_token() -> Optional[dict]:
    """从本地文件读取上次保存的 token(如果有的话)。"""
    if TOKEN_FILE.exists():
        try:
            return json.loads(TOKEN_FILE.read_text("utf-8"))
        except Exception:
            pass
    return None

所以整个主流程就是:

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()

    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    # todo run_monitor()

登录完成之后,接下来就是进入消息监听循环了。

3. 长轮询监听循环

拿到 token 后我们就需要发起一个长轮询的 HTTP 接口请求,微信 ClawBot 服务端会“憋着”不返回,直到有新消息或超时(约 35 秒)才返回。这个接口就是:

ilink/bot/getupdates

它采用 POST 方式请求,所以我们需要封装一个 POST 请求的方法,并且微信 ClawBot 的所有 POST 的请求都需要一个通用请求头,通用请求头的要求如下:

Header 说明
Content-Type application/json
AuthorizationType 固定值 ilink_bot_token
Authorization Bearer <token>(登录后获取)
X-WECHAT-UIN 随机 uint32 的 base64 编码

所以我们先封装一个每次请求都需要带的 HTTP 请求头的函数:

def _headers(token: Optional[str] = None) -> dict:
    """
    构造每次请求都需要带的 HTTP 请求头。
    - AuthorizationType: 固定值,告诉服务端这是 bot token 认证
    - Authorization: 登录拿到的 token,未登录时不带
    - X-WECHAT-UIN: 随机数的 base64,模拟微信客户端标识
    """
    h = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        # 随机 uint32 → 十进制字符串 → base64,与原版协议一致
        "X-WECHAT-UIN": base64.b64encode(
            str(struct.unpack(">I", os.urandom(4))[0]).encode()
        ).decode(),
    }
    if token:
        h["Authorization"] = f"Bearer {token}"
    return h

接着我们封装一个通用 POST 请求函数:

def _url(path: str) -> str:
    """拼接完整 URL,确保 BASE_URL 末尾有斜杠。"""
    base = BASE_URL.rstrip("/") + "/"
    return base + path
    
def _post(path: str, body: dict, token: Optional[str] = None, timeout: int = 15) -> dict:
    """
    发送 POST JSON 请求,返回解析后的响应字典。
    所有与微信后端的通信都走这个函数。
    """
    data = json.dumps(body).encode("utf-8")
    req  = urllib.request.Request(
        _url(path),
        data    = data,
        headers = {**_headers(token), "Content-Length": str(len(data))},
        method  = "POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} /{path}: {e.read().decode(errors='replace')}") from e

所有与微信 ClawBot 后端的通信都走这个 _POST 函数。

接着我们封装长轮询接收消息函数:

def getUpdates(token: str, buf: str = "", timeout: int = 35) -> dict:
    """
    长轮询接口:向服务端发请求,服务端"憋着"不回,直到有新消息或超时才返回。

    参数:
      buf     - 上次返回的游标,传给服务端表示"从这里继续",首次传空字符串
      timeout - 等待秒数,服务端通常在 35 秒内有消息就返回,无消息就返回空

    返回值里的重要字段:
      msgs            - 新消息列表(可能为空)
      get_updates_buf - 新游标,下次请求要带上它
    """
    try:
        return _post(
            "ilink/bot/getupdates",
            body    = {"get_updates_buf": buf, "base_info": {"channel_version": "mini-bridge-1.0"}},
            token   = token,
            timeout = timeout + 5,    # 客户端超时比服务端多 5 秒,避免误判
        )
    except (TimeoutError, OSError) as e:
        if "timed out" in str(e).lower():
            # 超时是正常现象,不是错误,直接返回空结果,继续下一轮
            return {"ret": 0, "msgs": [], "get_updates_buf": buf}
        raise

上述 getUpdates 函数返回两个重要的字段 msgs(消息列表)和 get_updates_buf(游标),消息列表我们很好理解,但游标我们需要了解一下。

什么是游标?

游标就是一个字符串,每次 getUpdates 返回时都会同时返回一个新的游标,表示“下一次请求从这里开始”。我们可以把游标持久化到文件 .weixin_buf.txt 中,这样即使程序重启,也能接着之前的位置继续收消息,不会漏掉中间的消息。

游标存储的本地文件路径设置如下:

# 游标存储的本地文件路径
BUF_FILE    = Path(__file__).parent / ".weixin_buf.txt"      # 保存消息游标(断点续传)

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的字段如下:

字段 类型 说明
ret number 返回码,0 = 成功
errcode number? 错误码(如 -14 = 会话超时)
errmsg string? 错误描述
msgs WeixinMessage[] 消息列表(结构见下方)
get_updates_buf string 新的同步游标,下次请求时回传
longpolling_timeout_ms number? 服务端建议的下次长轮询超时(ms)

根据上述的资料我们就可以实现对 getUpdates 接口的长轮询监听循环了,实现如下:

def run_monitor(token: str) -> None:
    """
    长轮询监听循环
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

上述实现 run_monitor 函数目前能长轮询监听微信 ClawBot 服务器消息的接收。主要功能如下:

  • 首先加载之前保存的消息游标(buf)实现断点续传。

  • 进入监听循环:

    1. 调用 getUpdates(token, buf) 获取消息(可能阻塞直到有消息或超时)。
    2. 如果调用失败(异常),则增加失败计数,连续失败 3 次后休眠 30 秒再重试;否则休眠 2 秒后继续。
    3. 如果返回的业务错误码非 0,则打印错误并休眠 2 秒后继续。
    4. 成功获取响应后,提取 get_updates_buf 新游标,更新 buf 并持久化到文件,以便下次重启恢复

我们知道上述 getUpdates 接口还返回了 msgs 消息列表,我们需要遍历返回的消息列表,提取文本,交给本地 AI Agent 进行处理。

4. 处理返回的消息

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的 msgs 消息列表字段结构如下:

字段 类型 说明
seq number? 消息序列号
message_id number? 消息唯一 ID
from_user_id string? 发送者 ID
to_user_id string? 接收者 ID
create_time_ms number? 创建时间戳(ms)
session_id string? 会话 ID
message_type number? 1 = USER, 2 = BOT
message_state number? 0 = NEW, 1 = GENERATING, 2 = FINISH
item_list MessageItem[]? 消息内容列表
context_token string? 会话上下文令牌,回复时需回传

然后字段 item_list(消息内容列表)的字段结构又如下:

字段 类型 说明
type number 1 TEXT, 2 IMAGE, 3 VOICE, 4 FILE, 5 VIDEO
text_item { text: string }? 文本内容
image_item ImageItem? 图片(含 CDN 引用和 AES 密钥)
voice_item VoiceItem? 语音(SILK 编码)
file_item FileItem? 文件附件
video_item VideoItem? 视频
ref_msg RefMessage? 引用消息

根据上述资料我们就可以处理微信 ClawBot 服务器返回的消息了。处理如下:

def run_monitor(token: str) -> None:

    # 省略...

    while True:

        # 省略...

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

实现也很简单,遍历 msgs 消息列表,然后再从消息的 item_list 里找 type=1(文本)的那一项。而非文本消息(图片、语音等)我们暂不处理,先跑通主流程再说。

经过上述处理后我们就拿到了微信 ClawBot 服务器返回的文本消息了,我们接着就把它交给本地 Agent 进行处理。

5. 接入本地 AI Agent

前面的步骤已经实现了本地接收到微信 ClawBot 发来的信息了,现在就需要接入一个本地 AI Agent 来处理微信用户发来的信息了。接入本地 AI Agent 也很简单,我们前面的文章已经实现了一个 Agent Loop,我们直接使用就可以了。

我们定义一个函数 askAgent,用它来管理每个用户的对话历史,并将用户的新消息交给 Agent 处理:

# ── 导入本地 Agent ──
from agent import agent_loop, SYSTEM as AGENT_SYSTEM

# 每个微信用户维护一份独立的对话历史,key 是用户 ID
_sessions: dict[str, list] = {}

def askAgent(user_id: str, user_text: str) -> str:
    """
    把用户的消息交给 Agent 处理,返回 Agent 的回复文本。

    - 每个用户有自己独立的对话历史(_sessions),实现多用户隔离
    - agent_loop 会循环调用大模型直到得到最终回复
    """
    # 第一次对话时,初始化这个用户的历史,带上系统提示词
    if user_id not in _sessions:
        _sessions[user_id] = [{"role": "system", "content": AGENT_SYSTEM}]

    # 把用户这条消息追加到历史
    _sessions[user_id].append({"role": "user", "content": user_text})

    # 交给 Agent 处理,agent_loop 会直接修改传入的列表(追加 assistant 回复)
    try:
        reply = agent_loop(_sessions[user_id])
        return reply or "(无回复)"
    except Exception as e:
        return f"[Agent 出错] {e}"

我们上述函数 ask_agent 实现了把用户的消息交给 Agent 处理,然后返回 Agent 的回复文本,并且还实现每个用户有自己独立的对话历史,实现了多用户隔离。

接下来就是把 Agent 的回复发回微信。

6. 把 Agent 的回复发回微信

回复消息的接口是 ilink/bot/sendmessage,它最重要的参数是 context_token,这个 token 是从收到的消息里原样取出的,服务端依靠它来将回复与对话关联起来(类似于会话 ID)。我们来实现一个 sendMessage 函数进行发送信息:

def sendMessage(token: str, to_user_id: str, text: str, context_token: str) -> None:
    """
    向微信用户发送一条文本消息。

    重要:context_token 必须原样从收到的消息里取出并回传,
    服务端靠它把回复和对话关联起来。没有它,消息发不出去。
    """
    _post(
        "ilink/bot/sendmessage",
        token = token,
        body  = {
            "msg": {
                "from_user_id" : "",                            # bot 发送,留空
                "to_user_id"   : to_user_id,                   # 发给谁
                "client_id"    : f"mini-{secrets.token_hex(8)}",  # 本次消息的唯一ID,防重复
                "message_type" : 2,                            # 2 = BOT 消息
                "message_state": 2,                            # 2 = 消息已完成(非流式)
                "item_list"    : [{"type": 1, "text_item": {"text": text}}],  # type=1 是文本
                "context_token": context_token,                # ← 关键!必须带上
            },
            "base_info": {"channel_version": "mini-bridge-1.0"},
        },
    )

这里我们固定使用 message_type=2(机器人消息)、message_state=2(已完成,非流式)。client_id 是消息的唯一标识,用于去重,这里随机生成即可。

7. 整合运行

现在我们就可以把登录、收消息、处理消息、发消息串起来了,整合运行。

def run_monitor(token: str) -> None:
    """
    长轮询监听循环:持续等待微信消息,收到后交给 Agent 处理并回复。

    整个循环做三件事:
      1. 调 getUpdates() 等消息(服务端"憋着",有消息才返回)
      2. 遍历返回的消息列表,提取文本,交给 ask_agent() 得到回复
      3. 调 sendMessage() 把回复发回给用户

    参数:
      token - 登录后拿到的 bot_token,每次请求都要带上
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

            # 第二件事:把文本交给 Agent,得到回复
            reply = askAgent(from_user, text)
            print(f"[回复] {reply[:60]}", flush=True)

            # 第三件事:把 Agent 的回复发回微信
            try:
                send_message(token, from_user, reply, ctx_token)
                print("[✓] 已发送", flush=True)
            except Exception as e:
                print(f"[✗] 发送失败: {e}", flush=True)

最后,在主函数中,我们读取或登录获取 token,然后启动 run_monitor

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()
    # 不存在 token 就扫码登录
    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    run_monitor(token)

在运行前我们需要安装一下相关依赖。

requirements.txt 内容如下:

openai==2.24.0
itchat-uos>=1.3.10
qrcode_terminal == 0.8.0

然后执行:

pip install -r requirements.txt

接着我们运行上述代码结果显示如下:

image.png

接着我们使用微信扫码结果显示如下:

image.png

我们点击按钮继续,这时可以看到终端显示如下:

image.png

微信端显示如下:

image.png

聊天栏显示:

c4a8f799d97ee7c9a29b75fa359f5263.jpg

这时我们就可以通过微信 ClawBot 和我们本地自己写的 Agent 进行通讯了。比如我们之前实现的一个可以读取本地文件的 AI Agent,我们创建一个测试文件 test.txt,写上以下内容:

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人

然后在微信 ClawBot 中输入:帮我读取 test.txt 的文件内容,显示如下

image.png

终端内容显示如下:

image.png

8. 总结与扩展

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人,它能够:

  • 通过扫码登录
  • 长轮询接收消息
  • 调用任意本地 AI Agent 处理消息
  • 将回复发回给微信用户

整个程序的核心代码不到 200 行,却涵盖了微信 ClawBot 协议的关键点。你可以在此基础上轻松扩展:

  • 支持多轮对话:通过会话历史管理,我们已经实现了多轮对话的基础。
  • 支持图片、语音:解析消息中的 item_list,识别图片或语音,调用相应的 AI 模型(如图像识别、语音转文字)。
  • 支持命令识别:在文本中检测特定前缀(如 /help),触发不同功能。
  • 接入更强大的 Agent:例如集成 LangChain 实现复杂工作流、接入 Ollama 或 vLLM 等本地推理框架运行开源大模型、或增加联网搜索、RAG(检索增强生成)等能力。

最重要的是,这套方法不依赖任何第三方中间件,完全基于微信官方 ClawBot 协议,相对稳定可靠。你只需要一个微信账号,就能让你的 AI 助手 7×24 小时在线。

希望本文能帮你打开一扇窗,让你在微信这个庞大的社交平台上,用自己的 AI 能力创造更多有趣的应用。动手试一试吧,你会发现过程比想象中简单许多!

我是 程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

最后怎么查看微信 ClawBot 的官方文档,可以通过 npm 安装 @tencent-weixin/openclaw-weixin-cli@tencent-weixin/openclaw-weixin 包,然后在 node_modules 目录中找对应的包里面有源码和文档。当然微信团队不公开可能后续会随时改变策略,所以须谨慎评估风险。

❌
❌