普通视图

发现新文章,点击刷新页面。
昨天 — 2026年4月1日首页

1.基于依赖追踪和触发的响应式系统的本质

作者 Cobyte
2026年4月1日 19:57

前言

Vue1、Vue3、SolidJS、Mobx 它们的数据响应式基本原理都是一样的,具体区别只是设计理念和实现方式不一样。按以前读书时代考试做题一样,它们是同一类型的题目,都是基于依赖收集和触发的运行时的数据响应式,如果说你只会解答其中一道题,其他的题却不会解答,则说明你并没有真正彻底掌握这一类题。

为什么标题说“基于依赖追踪的响应式系统”,因为在前端响应式的框架有很多,但他们的实现原理却各有不同。 在前端一般谈到响应式框架,可能大家都会不约而同地联想到 Vue,除了 Vue 之外,也许还有人会想到 React 以及 Svelte、SolidJS。他们都有一个共同点,都是通过数据驱动视图。他们在实现方式上又互相有一些相似之处,其中 Vue 和 React 都采用了虚拟DOM技术,Vue 和 SolidJS 的数据响应式实现则都是采用了依赖追踪的方式,所以在数据响应式的实现方面 Vue 和 SolidJS 最相似,而 Svelte 的实现方式则跟 Vue 和 React 都不一样,Svelte 是基于编译响应式。当然 Vue 和 React 的具体实现技术也是不一样的,但它们在宏观层面则是一样,都是通过数据驱动视图,当数据发生变化时,视图会重新渲染,这种机制使得开发者只需要关注数据的变化,而不需要手动操作 DOM。Vue 通过数据劫持,使得操作数据需要额外的 API ,系统变能感知数据的变化,而 React 和 SolidJS 则需要手动调用 API 去触发数据变化。

手动操作 DOM 的上古时代

例如我们现在有一个这样的需求,有一个按钮 <button>0</button>,当我们点击按钮的时候,按钮中的文本就进行加 1。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title>手动操作DOM</title>
  </head>
  <body>
    <button id="btn"></button>
    <script>
        let count = 0
        const btnEl = document.getElementById('btn')
        btnEl.textContent = count
        btnEl.addEventListener('click', function () {
            count++
            btnEl.textContent = count
        })
    </script>
  <body>
</html>

上述的 button 按钮是通过 HTML 进行渲染的,我们还可以通过 JavaScript API 进行创建。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title>手动操作DOM</title>
  </head>
  <body>
-    <button id="btn"></button>
    <script>
        let count = 0
-        const btnEl = document.getElementById('button')
+        const btnEl = document.createElement('button')
+        const textNode = document.createTextNode(count)
+        btnEl.appendChild(textNode)
        btnEl.addEventListener('click', function () {
            count++
            btnEl.textContent = count
        })
+        document.body.appendChild(btnEl)
    </script>
  <body>
</html>

上述这种方式,在项目应用非常庞大的时候,开发效率是非常低下的,同时维护成本却又非常高的,所以就出现了像 React、Vue 这种通过数据进行驱动视图的前端框架。

通过数据驱动视图

虽然 Vue 和 React 在具体的实现技术方案上差异是非常大的,但在宏观层面它们则是一样的,都是通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上,从而提高性能。

不管是 Vue 还是 React 的虚拟DOM 本质上都是一个对象,上面记录着一些真实 DOM 的信息,比如 type、props、children,我们这里简单模拟一下,并通过虚拟DOM 和 Diff 算法来改写上面的例子。

首先我们通过一个 createElement 的函数来创建一个节点的虚拟DOM对象,这里跟 React 的对齐,children 节点在 props 中,Vue 的 children 是跟 props 同级的,这些差别对我们进行宏观研究不重要。

function createElement(type, props) {
    return {
        type,
        props
    }
}

接着我们创建一个函数组件 App。

count = 0
function App (){
    return createElement('button', {
        onClick: () => {
            count ++
            setCount(count)
        },
        children: count
    })
}

接着我们通过以下方式把创建的虚拟DOM 挂载到根节点上。

render(App(), document.getElementById('app'))

接着我们实现 render 方法

let oldVnode
function render(vNode, container) {
    if (!oldVnode) {
        oldVnode = vNode
        const el = document.createElement(vNode.type)
        // 保存真实DOM 到虚拟DOM 的 el 属性上,将来更新的时候,不用重新创建,从而达到提高性能效果
        oldVnode.el = el
        const textNode =  document.createTextNode(vNode.props.children)
        el.appendChild(textNode)
        // 绑定虚拟DOM 上的事件
        el.addEventListener('click', vNode.props.onClick)
        container.appendChild(el)
    } else if(oldVnode.props.children !== vNode.props.children) {
        oldVnode.el.textContent = vNode.props.children
    }
}

我们在这里非常简单且宏观的实现了新老虚拟DOM 的对比,当不存在旧虚拟DOM 则是挂载阶段,创建真实DOM,并保存到虚拟DOM 的 el 属性上,将来更新的时候,不用重新创建,从而达到提高性能效果。在进行新老虚拟DOM 的时候,我们这里只比较 children 一个属性,如果新老 children 不一样就把新的虚拟DOM 上的 children 的值更新到对应的真实DOM 上。

我们在上面的 App 函数中的 props 中的点击事件函数中有一个 setCount 的方法还没实现,它的实现可以抽象成如下:

function setCount(val) {
    count = val
    render(App(), document.getElementById('app'))
}

从上述代码可以看到 setCount 的实现很简单,这个方法就是在点击之后进行更新数据 count 的,并且在更新数据 count 的同时重新渲染视图,把新的 count 值显示到页面上。

以下是测试效果:

00.gif

我们在上述例子中通过极简的代码,从宏观层面阐明了 React 的响应式原理,通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上。

其实上述这段 React 的响应式原理放在 Vue 的响应式原理也是成立的,最大的不同则是 React 更新数据需要通过 setCount 函数,也就是所谓手动触发,而 Vue 则是自动触发的。那么下面我们来看看 Vue 的响应式是怎么实现的。

基于依赖追踪的响应式

我们知道 Vue1 的响应式数据是通过 Object.defineProperty 来实现的。基于 Object.defineProperty 来实现需要初始化对对象的每一个熟悉进行劫持监听。

例如我们有这么这个对象 const data = { count:0 },那么我们需要进行以下操作:

const data = { count: 0 }
Object.keys(data).forEach(key => {
   Object.defineProperty(data, key, {
    get() {
        return data[key]
    },
    set(val) {
        data[key] = val
    }
   }) 
})

上述这个写法会造成内存栈溢出,主要是因为在 Object.defineProperty 的 getter 中读取 data[key] 会触发 getter 循环读取,从而造成死循环。

00.png

我们可以把 getter 中 data[key] 的取值放在进行 Object.defineProperty 监听之前。

const data = { count: 0 }
Object.keys(data).forEach(key => {
+   const val = data[key]
   Object.defineProperty(data, key, {
    get() {
+        return val
    },
    set(val) {
        data[key] = val
    }
   }) 
})

但这样 val 会被循环取值进行了覆盖,没办法正确读取每个 key 的值,为了可以读取每个 key 的值,我们可以通过闭包的形式把每个 key 的值缓存下来。

const data = { count: 0 }
Object.keys(data).forEach(key => {
   defineReactive(data, key, data[key]) 
})

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            return val
        },
        set(newVal) {
            val = newVal
        }
    })
}

接下来我们要做的就是在 getter 中进行依赖收集,然后在 setter 中进行依赖触发,这本质上就是一个订阅发布模式。

const data = { count: 0 }
+ // 声明一个依赖存储中心
+ const subscribers = new Set()
+ // 需要收集的依赖,在 Vue1 叫 wachter,Vue3 中叫 effect,本质上就是一个订阅者,关于发布订阅模式,我们后续再详细介绍
+ let activeEffect
Object.keys(data).forEach(key => {
   defineReactive(data, key, data[key]) 
})

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
+            // 存在依赖就把依赖收集到依赖存储中心
+            if(activeEffect) subscribers.add(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
+            // 值更新了,就需要去把依赖存储中心中的订阅者全部重新执行一遍
+            subscribers.forEach(sub => sub())
        }
    })
}

我们在上述代码中通过一个全局变量 subscribers 在响应式数据的 getter 把依赖收集到 subscribers 中, 在 setter 中则把 subscribers 中收集到的依赖进行循环遍历重新执行一遍,从而实现了依赖追踪和触发。

那么现在我们有了响应式数据 data 之后,我们就可以对我们前面的例子中的 App 函数中的 count 数据进行更改了,我们之前实现的是 React 的方式,我们现在要把它改成 Vue 的方式。

- count = 0
function App (){
    return createElement('button', {
        onClick: () => {
-            count ++
-            setCount(count)
+            data.count ++
        },
-        children: count
+        children: data.count
    })
}

此外渲染函数的执行方式也需要改成一个副作用函数,通过副作用函数进行调用执行。

    activeEffect = () => {
        render(App(), document.getElementById('app'))
    }

    activeEffect()

    activeEffect = null

我们把一个副作用函数赋值给了变量 activeEffect,然后再执行 activeEffect,那么在执行 activeEffect 函数的时候就会去执行 rander 函数,并通过 App 函数生成虚拟DOM,在 App 函数中对虚拟DOM 的 children 属性赋值的时候是通过读取响应式数据 data 中的 count 值,那么这时就会触发 count 属性的 getter,然后就会在 getter 中进行依赖收集,在 getter 中很明显这个时候 activeEffect 是有值的,所以会进行依赖收集。当点击的时候,就会触发 data.count ++ 的执行,这时就会触发 count 属性 setter,然后就会在 setter 中进行依赖触发。

以下是测试效果:

00.gif

至此我们把 Vue 的数据响应式也通过最少的代码量阐明了,以上的 Vue 的响应式原理估计很多同学都非常清楚,因为这是面试被问几率非常高的题目。我在这里重复讲解,是为了对比 React 和 Vue 响应式原理的差别,总的来说,React 和 Vue 的响应式原理在宏观层面是有非常大的相同之处的,都是通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上。在宏观层面 React 和 Vue 响应式原理最大的不同则是数据触发方式的不同,React 是数据变更后需要开发者通过手动调用 React 提供的 API 进行触发视图的更新,而 Vue 则是自动触发的,因为 Vue 的状态数据是响应式的,而 React 的状态数据不是响应式的。

我们一般在很多的文章中都只讲了 Vue1 的数据响应式原理是通过 Object.defineProperty 来实现的,那么是否只能通过 Object.defineProperty 来实现呢?很明显不是,我们上述例子中的 data,我们现在如果想给它新增属性 data.name,那么 Object.defineProperty 是无法进行监听追踪的,所以我们通过一个工具来对 data 也进行监听。

function observe (data) {
    // 给对象 data 添加一个属于 data 对象的依赖存储中心
    data.__ob__ = new Set()
    Object.keys(data).forEach(key => {
        const value = data[key]
        defineReactive(data, key, value) 
    })
}

那么在 getter 中要对对象的依赖也进行收集

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            // 存在依赖就把依赖收集到依赖存储中心
            if(activeEffect) {
                subscribers.add(activeEffect)
                // 如果读取的值是对象,那么还要给这个对象进行依赖收集,并且新的对象也要通过 observe 进行监听
                if(Object.prototype.toString.call(val) === '[object Object]') {
                    observe(val)
                    val.__ob__.add(activeEffect)
                }
            } 
            return val
        },
        set(newVal) {
            val = newVal
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
        }
    })
}

然后我们要专门通过一个单独的 API 来给响应式对象添加属性,并且在添加属性之后进行依赖触发。

function set(target, key, val) {
    target[key] = val
    // 新添加的属性也需要通过 Object.defineProperty 进行监听
    defineReactive(target, key, val)
    const ob = target.__ob__
    // 进行对象的依赖触发
    ob.forEach(sub => sub())
}

接着我们把 App 函数进行以下修改

// 修改数据结构
const data = { data: { count: 0 } }
// 通过 observe 处理
observe(data)
function App (){
    return createElement('button', {
        onClick: () => {
            set(data.data, 'count1', 2)
        },
        children: JSON.stringify(data.data)
    })
}

然后我们重新运行代码结果如下:

01.gif

我们看到可以正常运行,但对象中的私有属性 __ob__ 也显示出来了,我们希望它不要被枚举出来,我们可以通过 Object.defineProperty 对它进行以下设置。

function observe (data) {
    // 给对象 data 添加一个属于 data 对象的订阅者中心
-    data.__ob__ = new Set()
+    Object.defineProperty(data, '__ob__', {
+        value: new Set(), // 属性的值,默认为 undefined
+        enumerable: false, // 属性是否可枚举,默认为 false
+        writable: true, // 值是否可写,默认为 false
+        configurable: true // 属性是否可配置,默认为 false
+    })
    Object.keys(data).forEach(key => {
        const value = data[key]
        defineReactive(data, key, value) 
    })
}

修改后再进行测试,我们可以看到 __ob__ 属性不再出现了。

02.gif

可以通过 Object.defineProperty 对数组进行监听,但监听不了 push、pop、shift 等对数组进行操作的方法,所以我们需要对数组的操作方法进行重写,重写的方法就是覆盖数组数据上的原型对象 __proto__

function observe (data) {
// 省略 ...
+   if (Array.isArray(value)) {
+      // 如果是数组则重新数组上的原型
+      value.__proto__ = {
+          join(val) {
+             // 通过原生数组上方法进行调用
+             return Array.prototype.join.call(value, val)
+          },
+          push(val) {
+             // 通过原生数组上的方法进行调用
+             Array.prototype.push.call(value, val)
+             subscribers.forEach(sub => sub())
+          }
+      }
+   } else {
        Object.keys(data).forEach(key => {
            const value = data[key]
            defineReactive(data, key, value) 
        })
+    }
}

我们这里只测试 join 和 push 方法,而 join 方法没有更改到数据,所以是不用进行依赖触发的。

然后我们对 App 应用也进行修改一下,以便测试数组响应式数据

// 数据
const data = ['cobyte']
// 通过 bserver 处理
observe(data)
function App (){
    return createElement('button', {
        onclick: () => {
            data.push('=')
        },
        children: data.join('-')
    })
}

测试结果如下:

03.gif

小结

至此,我们通过手写已经基本实现了 Vue1 的数据响应式原理,我们可以通过对 Vue2 数据响应式原理的分析进行一个宏观总结。我们需要在实践中总结规律,然后又通过规律更好地指导实践

首先我们都知道 Vue1 的数据响应式原理是通过 Object.defineProperty 实现的,通过 Object.defineProperty 可以监听一个对象的属性的读取(getter)和修改(setter),这样就可以在 getter 的时候进行依赖收集,在 setter 的时候进行依赖触发。但 Vue2 不单单只是通过 Object.defineProperty 实现数据响应式的,因为只有被 Object.defineProperty 初始化了的属性才可以进行监听,而当一个对象新增一个属性时,则监听不了。这时我们需要通过额外的手段来实现对象新增属性时的监听,具体方案就是通过给对象新增一个私有的属性 __ob__,去记录属于该对象的依赖,当该对象新增属性时则触发该对象的依赖重新执行。同时 Object.defineProperty 也监听不了数组的原生方法,例如:push、pop、shift、unshift、splice、sort、reverse,我们观察一下这些数组方法发现都有一个共同特点,就是他们都会修改数组,使数组数据发生变化,那么根据数据响应式的原理,数据发生了改变就需要进行依赖触发,那么我们需要对响应式数据类型为数组的数据进行重写它们的原型,这样我们就可以在响应式数组通过 push、pop、shift、unshift、splice、sort、reverse 方法修改数组的时候进行依赖触发了。

我们可以总结出,不管是通过 Object.defineProperty 进行监听对象属性还是通过给对象添加私有属性 __ob__,去记录该对象的依赖,还是重写数组的原型方法,目的都只有一个:进行数据的依赖追踪和触发。

我们还可以进一步进行总结规律:这种基于依赖追踪的响应式系统,并不是某一种技术,而是一种模式。核心只有一个,就是在数据读取的时候进行依赖收集,在数据更改的时候进行依赖触发。

基于这种指导思想,我们就可以很好去实践 Vue2 的数据响应式原理了。

Vue3 只是通过 Proxy 实现数据响应式吗

Vue3 是通过 Proxy 对数据实现 getter/setter 代理,从而实现响应式数据,然后在副作用函数中读取响应式数据的时候,就会触发 Proxy 的 getter,在 getter 里面把对当前的副作用函数保存起来,将来对应响应式数据发生更改的话,则把之前保存起来的副作用函数取出来执行。这个过程跟 Vue2 是一样的,只是实现细节不一样。

实现起来也非常简单:

function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && subscribers.add(activeEffect)
            return Reflect.get(target, key) 
        },
        set(target, key, val) {
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
            return result
        }
    })
}

我们再把 App 函数进行修改:

const data = reactive({count: 0})
function App (){
    return createElement('button', {
        onClick: () => {
            data.count ++
        },
        children: data.count
    })
}

测试结果如下:

00.gif

我们这里不是为了深入探讨 Vue2 的数据响应式原理的,而是为了验证上面实现 Vue2 的数据响应式原理总结的规律。也就是:这种基于依赖追踪的响应式系统,并不是某一种技术,而是一种模式。核心只有一个,就是在数据读取的时候进行依赖收集,在数据更改的时候进行依赖触发。后续我们基于数据响应式原理的规律便可以很好去理解其他数据响应式系统了,例如 React 的状态管理库——Mobx、SolidJS,我们在后续也将探讨这些库的数据响应式原理的实现。

Vue1 是通过 Object.defineProperty 实现对数据的读写监听,但由于 Object.defineProperty 的局限性,Vue2 并不只是通过 Object.defineProperty 实现数据响应式的,但都为了实现在数据读取时进行依赖收集,在数据更改时进行依赖触发。Vue3 则通过新的 API:Proxy 可以实现对数据的读写监听,但核心也是为了实现在数据读取时进行依赖收集,在数据更改时进行依赖触发。

那么问题来了,Vue1 并不只是通过 Object.defineProperty 实现数据响应式的,那么 Vue3 只是通过 Proxy 实现了数据响应式吗?

其实这个问题可以转化得更具体一些,Vue2 的 reactive 和 ref 的底层实现原理是一样的吗?有人认为 ref 和 reactive 的底层实现原理都是一样的,也就是 ref 也是通过 reactive 实现的,也就是 ref 也是通过 Proxy 实现的。如果说 ref 和 reactive 的底层实现原理不一样的话,也就是说 Vue3 可以不通过 Proxy 实现数据的响应式。

很明显 Vue3 可以不通过 Proxy 实现数据的响应式的,也就是 ref 和 reactive 的底层实现原理是不一样的。那么根据我们上面总结的实践规律,我们只需要可以实现在数据读取的时候进行依赖收集,然后在数据更改的时候进行依赖触发就可以了。那么明显可以使用 Vue2 中的 Object.defineProperty 中的 getter/setter,这种方式也叫属性访问器。根据上面 Vue2 的数据响应式原理我们可以知道如果通过 Object.defineProperty 实现对数据的监听,还要通过闭包的方式,就显得不够简洁。那么属性访问器除了使用 Object.defineProperty 进行显式声明之外,还可以通过字面量的方式,本质还是属性访问器

例如:

function ref(value) {
    return {
        _value: value,
        get value() {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && subscribers.add(activeEffect)
            return this._value
        },
        set value(val) {
            this._value = val
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
        }
    }
}

然后我们通过 ref 函数来创建一个响应式数据,再修改 App 函数。

const count = ref(0)

function App (){
    return createElement('button', {
        onClick: () => {
            count.value ++
        },
        children: count.value
    })
}

测试运行结果:

00.gif

这也就是 Vue2 的 ref API 的实现原理,当然在 Vue3 源码中如果 ref 传进来的值是一个引用对象的话,还是通过 reactive 进行实现。此外在 Vue3 的源码中 ref API 是通过一个 class 类来实现的,但原理是一样的。

我们下面也可以简单实现一下:

class RefImpl {
    _value
    constructor(value) {
        this._value = value
    }
    get value() {
       // 存在依赖就把依赖收集到依赖存储中心
       activeEffect && subscribers.add(activeEffect)
       return this._value 
    }
    set value(val) {
        this._value = val
        // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
        subscribers.forEach(sub => sub())
    }
}

function ref(value) {
    return new RefImpl(value)
}

修改好的测试结果还是一样的。

00.gif

通过沙箱模式实现依赖追踪的数据响应式

通过上面对 Vue1 和 Vue3 的数据响应式原理的实现与分析,我们知道都借助了 JavaScript 的原生 API(Object.defineProperty 和 Proxy) 来实现依赖追踪的响应式系统,那么不借助 JavaScript 原生 API 还可以实现依赖追踪的响应式系统吗? 我们上面总结出的结论是,基于依赖追踪的响应式系统的本质是在读取数据的时候收集依赖,在更新数据的时候触发依赖。那么基于此原理,我们只需要把读写进行分离那么可以实现了。

我们把上面第一版 ref 的实现通过闭包的形式改造一下:

function ref(value) {
    const s = {
        value
    }

    function getState() {
        return s.value
    }

    function setState(val) {
        s.value = val
    }
    return [getState, setState]
}

const [getState, setState] = ref(0)

console.log('初始值:', getState())
// 修改
setState(1)
console.log('修改后:', getState())

我们可以看到通过闭包的我们实现了读写分离,这种模式有一个专业的术语叫:沙箱模式,这样我们就可以在读取数据的时候收集依赖,在修改数据的时候触发依赖了。

function ref(value) {
    const s = {
        value
    }

    function getState() {
        // 存在依赖就把依赖收集到依赖存储中心
        activeEffect && subscribers.add(activeEffect)
        return s.value
    }

    function setState(val) {
        s.value = val
        // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
        subscribers.forEach(sub => sub())
    }
    return [getState, setState]
}

接着我们把 App 函数也进行修改一下:

const [count, setCount] = ref(0)

function App (){
    return createElement('button', {
        onClick: () => {
            setCount(count() + 0)
        },
        children: count()
    })
}

其实上述这种实现依赖追踪的响应式系统的方式就是 SolidJS 的响应式原理,长得像 React,实际上是 Vue。所以我们只要把核心原理搞清楚,就可以举一反三了,像读书时候一样,以后同类型的题目,你都回作答了。当然 SolidJS 的响应式原理远不止这些,我们将在后续章节继续进行深入探讨,搞明白了 SolidJS, Vue Vapor 的原理也非常容易理解了。

总结

上述所有例子中的依赖收集和触发的过程,本质就是一个发布订阅模式,而关于发布订阅模式,我们将在下一篇文章中进行详细介绍。当我们掌握了发布订阅模式后,我们再去理解这些通过依赖收集和触发实现的数据响应式系统,就会如鱼得水。

上述文章写于:2023 年,由于个人原因今年 2026 年发布。

我是程序员Cobyte,现在已转向研究 AI Agent,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

昨天以前首页

微信 ClawBot 接入本地 AI Agent 的实现原理

作者 Cobyte
2026年3月31日 09:09

1. 前言

我们知道微信最近推出了微信 ClawBot,用于在微信中与 OpenClaw 收发消息。掘金签约作者群里的一位大佬说:

01.jpg

受到启发,我就去研究了一下微信半公开的官方文档后发现,我们确实也可以通过用微信 ClawBot 接入任何 AI Agent

至于为什么说官方文档是半公开呢,因为官方暂时还没有公开的文档地址,但又可以通过某些渠道看到(怎么可以看到,本文最后揭晓)。

本文就将带你一步步实现如何通过微信 ClawBot 接入自己开发的 AI Agent。其实我们只需要做三件事:

  1. 扫码登录,拿到微信 ClawBot 的身份凭证;
  2. 长轮询等待消息,一有消息立刻获取;
  3. 把消息交给本地 Agent 处理,再把回复发回微信。

第一步,扫码登录。

2. 扫码登录

根据微信 ClawBot 文档的要求,我们先要获取一个二维码,等用户用微信扫描并确认后,服务端就会返回一个 bot_token 的通信凭证,后续所有请求都必须带着这个 token。这个跟我们平时的开发是一样,我们登录之后才能进行操作。

2.1 拉取二维码

首先微信 ClawBot 的接口地址是:

BASE_URL = "https://ilinkai.weixin.qq.com"

其次,登录的第一步是向服务端请求二维码。我们根据微信 ClawBot 的文档可以知道请求的接口是:

ilink/bot/get_bot_qrcode?bot_type=3

请求的 HTTP 方式是 GET。值得注意的是参数 bot_type 在微信 ClawBot 的文档中只出现了在获取二维码的时候,值是 3,而其他枚举值的情况,文档中并没有说明。

因为要用到 HTTP 的 GET 请求,所以我们需要封装一个 GET 请求的方法:

import json
import urllib.request
import urllib.error

# 省略...

def _get(url: str, headers: dict = {}, timeout: int = 35) -> dict:
    """发送 GET 请求"""
    req = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} GET {url}: {e.read().decode(errors='replace')}") from e

接着我们封装拉取二维码的请求函数:

def fetchQRCode():
    base = BASE_URL.rstrip("/") + "/"
    url = base + "ilink/bot/get_bot_qrcode?bot_type=3"
    resp = _get(url)                      # GET 请求
    qrcode_raw = resp.get("qrcode")       # 服务端用于轮询的标识
    qrcode_url = resp.get("qrcode_img_content")   # 可扫描的二维码链接
    return qrcode_raw, qrcode_url

微信 ClawBot 服务端返回的 qrcode_img_content 是一个可以直接扫码的链接。我们在终端里可以通过安装 qrcode 库把它打印成 ASCII 二维码或者直接打印链接让用户打开链接,通过手机微信进行扫码。字段 qrcode 则是服务端的二维码标识,用于后续轮询二维码的状态,是否已经被扫码等。

我们测试一下上述代码:

print(fetchQRCode())

打印结果如下:

('50189c0db1817eb74a2bfc11e4ccdb35', 'https://liteapp.weixin.qq.com/q/7GiQu1?qrcode=50189c0db1817eb74a2bfc11e4ccdb35&bot_type=3')

我们打开上述链接在浏览器打开是一个微信二维码。

image.png

2.2 轮询扫码状态

二维码生成后,我们每隔一秒向微信 ClawBot 提供的二维码状态查询接口进行请求,直到用户完成确认。接着我们封装一个轮询扫码状态的请求接口函数:

def pollQRStatus(qrcode_raw):
    base = BASE_URL.rstrip("/") + "/"
    poll_url = base + f"ilink/bot/get_qrcode_status?qrcode={urllib.parse.quote(qrcode_raw)}"
    deadline = time.time() + 480    # 最多等 8 分钟
    # 这个是微信 ClawBot 规定的,没得解析
    headers = { "iLink-App-ClientVersion": "1" }

    while time.time() < deadline:
        try:
            s = _get(poll_url, headers)
        except Exception as e:
            print(f"  [轮询错误] {e}", flush=True)
            time.sleep(2)
            continue

        status = s.get("status", "wait")

        if status == "wait":
            # 还没扫,继续等,打一个点表示进度
            sys.stdout.write(".")
            sys.stdout.flush()

        elif status == "scaned":
            # 已经扫了,等用户在微信里点确认
            print("\n👀 已扫码,请在微信中点击确认...", flush=True)

        elif status == "confirmed":
            # ✅ 用户点了确认,登录成功!
            token      = s.get("bot_token", "")
            account_id = s.get("ilink_bot_id", "")
            # 账号 ID 规范化:把 @ 和 . 换成 -,例如 abc@im.wechat → abc-im-wechat
            account_id = account_id.replace("@", "-").replace(".", "-")
            real_base  = s.get("baseurl") or BASE_URL
            print(f"\n✅ 登录成功!account_id={account_id}", flush=True)
            return {"token": token, "account_id": account_id, "base_url": real_base}

        elif status == "expired":
            raise RuntimeError("二维码已过期,请重新运行程序。")
        # 每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力
        time.sleep(1)
    raise RuntimeError("登录超时(8分钟),请重试。")

上述函数主要是实现了根据状态码处理不同情况。服务端返回的 JSON 中包含 status 字段,表示当前二维码的状态。我们根据其值进行分支处理:

status == "wait"

  • 表示二维码尚未被扫描。
  • 在终端打印一个点 .(不换行),表示程序仍在等待,给用户视觉反馈。

status == "scaned"

  • 表示用户已经扫描了二维码,但尚未在微信中点击“确认”。
  • 打印提示信息 👀 已扫码,请在微信中点击确认...,告知用户当前进度。

status == "confirmed"

  • 成功状态:用户已确认,登录成功。
  • 从响应中提取 bot_tokenilink_bot_id(机器人唯一标识)、baseurl(可选的后端地址)。
  • 并且对 account_id 进行规范化处理,将 @ 和 . 替换为 -,例如 abc@im.wechat 变为 abc-im-wechat
  • 打印成功信息,并返回一个包含 tokenaccount_idbase_url 的字典,供上层保存和后续请求使用。

status == "expired"

  • 二维码已过期(我们这里设置 8 分钟未扫描或确认即为过期)。
  • 抛出 RuntimeError,提示用户重新运行程序获取新二维码。

最后,每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力。

2.3 实现登录并保存 token 到本地

我们在上面实现了拉取二维码的函数 fetchQRCode 和轮询等待扫码确认的函数 pollQRStatus,我们就可以实现一个登录函数 login 将整个流程串联起来了。实现如下:

def login() -> dict:
    """
    扫码登录,返回 {"token": "...", "account_id": "...", "base_url": "..."}
    """
    # ── 第 1 步:拉取二维码 ──
    [qrcode_raw, qrcode_url] = fetchQRCode() 

    if not qrcode_raw:
        raise RuntimeError(f"获取二维码失败")

    # ── 第 2 步:在终端打印二维码 ──
    print("\n请用微信扫描下方二维码:\n", flush=True)
    try:
        import qrcode                          # pip install qrcode[pil]
        qr = qrcode.QRCode(version=1, border=1)
        qr.add_data(qrcode_url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)            # 用 ASCII 字符在终端渲染,尺寸最小
    except ImportError:
        # 没有安装 qrcode 库时,直接打印链接,用浏览器打开也能扫
        print(f"  {qrcode_url}\n", flush=True)

    # ── 第 3 步:轮询等待扫码 ──
    print("等待扫码...", flush=True)
    return pollQRStatus(qrcode_raw) 

上述登录函数最后返回的数据结构如下:

{
'token': 'd5b3973bb743@im.bot:060000215ac9e1ce7116aeb48b3d998c5b2e4e', 
'account_id': 'd5b3973bb743-im-bot',
'base_url': 'https://ilinkai.weixin.qq.com
'}

为了下次启动不用重新扫码,我们把 token 和 account_id 保存到文件 .weixin_token.json 中:

# token 的本地文件路径
TOKEN_FILE  = Path(__file__).parent / ".weixin_token.json"   # 保存登录后的 token

# 保存 token 和账号
def save_token(data: dict) -> None:
    """把 token 信息保存到本地文件,下次启动不用重新扫码。"""
    TOKEN_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), "utf-8")
    TOKEN_FILE.chmod(0o600)    # 仅当前用户可读,保护 token 安全

这样,下次运行程序时,如果文件存在就直接加载,跳过登录流程。所以我们还需要有一个从本地文件读取上次保存的 token 和账号的函数。实现如下:

def load_token() -> Optional[dict]:
    """从本地文件读取上次保存的 token(如果有的话)。"""
    if TOKEN_FILE.exists():
        try:
            return json.loads(TOKEN_FILE.read_text("utf-8"))
        except Exception:
            pass
    return None

所以整个主流程就是:

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()

    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    # todo run_monitor()

登录完成之后,接下来就是进入消息监听循环了。

3. 长轮询监听循环

拿到 token 后我们就需要发起一个长轮询的 HTTP 接口请求,微信 ClawBot 服务端会“憋着”不返回,直到有新消息或超时(约 35 秒)才返回。这个接口就是:

ilink/bot/getupdates

它采用 POST 方式请求,所以我们需要封装一个 POST 请求的方法,并且微信 ClawBot 的所有 POST 的请求都需要一个通用请求头,通用请求头的要求如下:

Header 说明
Content-Type application/json
AuthorizationType 固定值 ilink_bot_token
Authorization Bearer <token>(登录后获取)
X-WECHAT-UIN 随机 uint32 的 base64 编码

所以我们先封装一个每次请求都需要带的 HTTP 请求头的函数:

def _headers(token: Optional[str] = None) -> dict:
    """
    构造每次请求都需要带的 HTTP 请求头。
    - AuthorizationType: 固定值,告诉服务端这是 bot token 认证
    - Authorization: 登录拿到的 token,未登录时不带
    - X-WECHAT-UIN: 随机数的 base64,模拟微信客户端标识
    """
    h = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        # 随机 uint32 → 十进制字符串 → base64,与原版协议一致
        "X-WECHAT-UIN": base64.b64encode(
            str(struct.unpack(">I", os.urandom(4))[0]).encode()
        ).decode(),
    }
    if token:
        h["Authorization"] = f"Bearer {token}"
    return h

接着我们封装一个通用 POST 请求函数:

def _url(path: str) -> str:
    """拼接完整 URL,确保 BASE_URL 末尾有斜杠。"""
    base = BASE_URL.rstrip("/") + "/"
    return base + path
    
def _post(path: str, body: dict, token: Optional[str] = None, timeout: int = 15) -> dict:
    """
    发送 POST JSON 请求,返回解析后的响应字典。
    所有与微信后端的通信都走这个函数。
    """
    data = json.dumps(body).encode("utf-8")
    req  = urllib.request.Request(
        _url(path),
        data    = data,
        headers = {**_headers(token), "Content-Length": str(len(data))},
        method  = "POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} /{path}: {e.read().decode(errors='replace')}") from e

所有与微信 ClawBot 后端的通信都走这个 _POST 函数。

接着我们封装长轮询接收消息函数:

def getUpdates(token: str, buf: str = "", timeout: int = 35) -> dict:
    """
    长轮询接口:向服务端发请求,服务端"憋着"不回,直到有新消息或超时才返回。

    参数:
      buf     - 上次返回的游标,传给服务端表示"从这里继续",首次传空字符串
      timeout - 等待秒数,服务端通常在 35 秒内有消息就返回,无消息就返回空

    返回值里的重要字段:
      msgs            - 新消息列表(可能为空)
      get_updates_buf - 新游标,下次请求要带上它
    """
    try:
        return _post(
            "ilink/bot/getupdates",
            body    = {"get_updates_buf": buf, "base_info": {"channel_version": "mini-bridge-1.0"}},
            token   = token,
            timeout = timeout + 5,    # 客户端超时比服务端多 5 秒,避免误判
        )
    except (TimeoutError, OSError) as e:
        if "timed out" in str(e).lower():
            # 超时是正常现象,不是错误,直接返回空结果,继续下一轮
            return {"ret": 0, "msgs": [], "get_updates_buf": buf}
        raise

上述 getUpdates 函数返回两个重要的字段 msgs(消息列表)和 get_updates_buf(游标),消息列表我们很好理解,但游标我们需要了解一下。

什么是游标?

游标就是一个字符串,每次 getUpdates 返回时都会同时返回一个新的游标,表示“下一次请求从这里开始”。我们可以把游标持久化到文件 .weixin_buf.txt 中,这样即使程序重启,也能接着之前的位置继续收消息,不会漏掉中间的消息。

游标存储的本地文件路径设置如下:

# 游标存储的本地文件路径
BUF_FILE    = Path(__file__).parent / ".weixin_buf.txt"      # 保存消息游标(断点续传)

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的字段如下:

字段 类型 说明
ret number 返回码,0 = 成功
errcode number? 错误码(如 -14 = 会话超时)
errmsg string? 错误描述
msgs WeixinMessage[] 消息列表(结构见下方)
get_updates_buf string 新的同步游标,下次请求时回传
longpolling_timeout_ms number? 服务端建议的下次长轮询超时(ms)

根据上述的资料我们就可以实现对 getUpdates 接口的长轮询监听循环了,实现如下:

def run_monitor(token: str) -> None:
    """
    长轮询监听循环
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

上述实现 run_monitor 函数目前能长轮询监听微信 ClawBot 服务器消息的接收。主要功能如下:

  • 首先加载之前保存的消息游标(buf)实现断点续传。

  • 进入监听循环:

    1. 调用 getUpdates(token, buf) 获取消息(可能阻塞直到有消息或超时)。
    2. 如果调用失败(异常),则增加失败计数,连续失败 3 次后休眠 30 秒再重试;否则休眠 2 秒后继续。
    3. 如果返回的业务错误码非 0,则打印错误并休眠 2 秒后继续。
    4. 成功获取响应后,提取 get_updates_buf 新游标,更新 buf 并持久化到文件,以便下次重启恢复

我们知道上述 getUpdates 接口还返回了 msgs 消息列表,我们需要遍历返回的消息列表,提取文本,交给本地 AI Agent 进行处理。

4. 处理返回的消息

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的 msgs 消息列表字段结构如下:

字段 类型 说明
seq number? 消息序列号
message_id number? 消息唯一 ID
from_user_id string? 发送者 ID
to_user_id string? 接收者 ID
create_time_ms number? 创建时间戳(ms)
session_id string? 会话 ID
message_type number? 1 = USER, 2 = BOT
message_state number? 0 = NEW, 1 = GENERATING, 2 = FINISH
item_list MessageItem[]? 消息内容列表
context_token string? 会话上下文令牌,回复时需回传

然后字段 item_list(消息内容列表)的字段结构又如下:

字段 类型 说明
type number 1 TEXT, 2 IMAGE, 3 VOICE, 4 FILE, 5 VIDEO
text_item { text: string }? 文本内容
image_item ImageItem? 图片(含 CDN 引用和 AES 密钥)
voice_item VoiceItem? 语音(SILK 编码)
file_item FileItem? 文件附件
video_item VideoItem? 视频
ref_msg RefMessage? 引用消息

根据上述资料我们就可以处理微信 ClawBot 服务器返回的消息了。处理如下:

def run_monitor(token: str) -> None:

    # 省略...

    while True:

        # 省略...

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

实现也很简单,遍历 msgs 消息列表,然后再从消息的 item_list 里找 type=1(文本)的那一项。而非文本消息(图片、语音等)我们暂不处理,先跑通主流程再说。

经过上述处理后我们就拿到了微信 ClawBot 服务器返回的文本消息了,我们接着就把它交给本地 Agent 进行处理。

5. 接入本地 AI Agent

前面的步骤已经实现了本地接收到微信 ClawBot 发来的信息了,现在就需要接入一个本地 AI Agent 来处理微信用户发来的信息了。接入本地 AI Agent 也很简单,我们前面的文章已经实现了一个 Agent Loop,我们直接使用就可以了。

我们定义一个函数 askAgent,用它来管理每个用户的对话历史,并将用户的新消息交给 Agent 处理:

# ── 导入本地 Agent ──
from agent import agent_loop, SYSTEM as AGENT_SYSTEM

# 每个微信用户维护一份独立的对话历史,key 是用户 ID
_sessions: dict[str, list] = {}

def askAgent(user_id: str, user_text: str) -> str:
    """
    把用户的消息交给 Agent 处理,返回 Agent 的回复文本。

    - 每个用户有自己独立的对话历史(_sessions),实现多用户隔离
    - agent_loop 会循环调用大模型直到得到最终回复
    """
    # 第一次对话时,初始化这个用户的历史,带上系统提示词
    if user_id not in _sessions:
        _sessions[user_id] = [{"role": "system", "content": AGENT_SYSTEM}]

    # 把用户这条消息追加到历史
    _sessions[user_id].append({"role": "user", "content": user_text})

    # 交给 Agent 处理,agent_loop 会直接修改传入的列表(追加 assistant 回复)
    try:
        reply = agent_loop(_sessions[user_id])
        return reply or "(无回复)"
    except Exception as e:
        return f"[Agent 出错] {e}"

我们上述函数 ask_agent 实现了把用户的消息交给 Agent 处理,然后返回 Agent 的回复文本,并且还实现每个用户有自己独立的对话历史,实现了多用户隔离。

接下来就是把 Agent 的回复发回微信。

6. 把 Agent 的回复发回微信

回复消息的接口是 ilink/bot/sendmessage,它最重要的参数是 context_token,这个 token 是从收到的消息里原样取出的,服务端依靠它来将回复与对话关联起来(类似于会话 ID)。我们来实现一个 sendMessage 函数进行发送信息:

def sendMessage(token: str, to_user_id: str, text: str, context_token: str) -> None:
    """
    向微信用户发送一条文本消息。

    重要:context_token 必须原样从收到的消息里取出并回传,
    服务端靠它把回复和对话关联起来。没有它,消息发不出去。
    """
    _post(
        "ilink/bot/sendmessage",
        token = token,
        body  = {
            "msg": {
                "from_user_id" : "",                            # bot 发送,留空
                "to_user_id"   : to_user_id,                   # 发给谁
                "client_id"    : f"mini-{secrets.token_hex(8)}",  # 本次消息的唯一ID,防重复
                "message_type" : 2,                            # 2 = BOT 消息
                "message_state": 2,                            # 2 = 消息已完成(非流式)
                "item_list"    : [{"type": 1, "text_item": {"text": text}}],  # type=1 是文本
                "context_token": context_token,                # ← 关键!必须带上
            },
            "base_info": {"channel_version": "mini-bridge-1.0"},
        },
    )

这里我们固定使用 message_type=2(机器人消息)、message_state=2(已完成,非流式)。client_id 是消息的唯一标识,用于去重,这里随机生成即可。

7. 整合运行

现在我们就可以把登录、收消息、处理消息、发消息串起来了,整合运行。

def run_monitor(token: str) -> None:
    """
    长轮询监听循环:持续等待微信消息,收到后交给 Agent 处理并回复。

    整个循环做三件事:
      1. 调 getUpdates() 等消息(服务端"憋着",有消息才返回)
      2. 遍历返回的消息列表,提取文本,交给 ask_agent() 得到回复
      3. 调 sendMessage() 把回复发回给用户

    参数:
      token - 登录后拿到的 bot_token,每次请求都要带上
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

            # 第二件事:把文本交给 Agent,得到回复
            reply = askAgent(from_user, text)
            print(f"[回复] {reply[:60]}", flush=True)

            # 第三件事:把 Agent 的回复发回微信
            try:
                send_message(token, from_user, reply, ctx_token)
                print("[✓] 已发送", flush=True)
            except Exception as e:
                print(f"[✗] 发送失败: {e}", flush=True)

最后,在主函数中,我们读取或登录获取 token,然后启动 run_monitor

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()
    # 不存在 token 就扫码登录
    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    run_monitor(token)

在运行前我们需要安装一下相关依赖。

requirements.txt 内容如下:

openai==2.24.0
itchat-uos>=1.3.10
qrcode_terminal == 0.8.0

然后执行:

pip install -r requirements.txt

接着我们运行上述代码结果显示如下:

image.png

接着我们使用微信扫码结果显示如下:

image.png

我们点击按钮继续,这时可以看到终端显示如下:

image.png

微信端显示如下:

image.png

聊天栏显示:

c4a8f799d97ee7c9a29b75fa359f5263.jpg

这时我们就可以通过微信 ClawBot 和我们本地自己写的 Agent 进行通讯了。比如我们之前实现的一个可以读取本地文件的 AI Agent,我们创建一个测试文件 test.txt,写上以下内容:

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人

然后在微信 ClawBot 中输入:帮我读取 test.txt 的文件内容,显示如下

image.png

终端内容显示如下:

image.png

8. 总结与扩展

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人,它能够:

  • 通过扫码登录
  • 长轮询接收消息
  • 调用任意本地 AI Agent 处理消息
  • 将回复发回给微信用户

整个程序的核心代码不到 200 行,却涵盖了微信 ClawBot 协议的关键点。你可以在此基础上轻松扩展:

  • 支持多轮对话:通过会话历史管理,我们已经实现了多轮对话的基础。
  • 支持图片、语音:解析消息中的 item_list,识别图片或语音,调用相应的 AI 模型(如图像识别、语音转文字)。
  • 支持命令识别:在文本中检测特定前缀(如 /help),触发不同功能。
  • 接入更强大的 Agent:例如集成 LangChain 实现复杂工作流、接入 Ollama 或 vLLM 等本地推理框架运行开源大模型、或增加联网搜索、RAG(检索增强生成)等能力。

最重要的是,这套方法不依赖任何第三方中间件,完全基于微信官方 ClawBot 协议,相对稳定可靠。你只需要一个微信账号,就能让你的 AI 助手 7×24 小时在线。

希望本文能帮你打开一扇窗,让你在微信这个庞大的社交平台上,用自己的 AI 能力创造更多有趣的应用。动手试一试吧,你会发现过程比想象中简单许多!

我是 程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

最后怎么查看微信 ClawBot 的官方文档,可以通过 npm 安装 @tencent-weixin/openclaw-weixin-cli@tencent-weixin/openclaw-weixin 包,然后在 node_modules 目录中找对应的包里面有源码和文档。当然微信团队不公开可能后续会随时改变策略,所以须谨慎评估风险。

❌
❌