普通视图

发现新文章,点击刷新页面。
今天 — 2026年4月10日首页

3.响应式系统基础:从发布订阅模式的角度理解 Vue2 的数据响应式原理

作者 Cobyte
2026年4月10日 09:04

前言

关于 Vue2 和 Vue3 的数据响应式原理,相信 Vue 技术栈的同学或多或少都了解过,甚至在简历上写很熟悉,而且我们在第一篇中也已经基本实现过了,但大家是否真的彻底掌握了呢?

正如我们前面所说的那样,Vue2、Vue3、SolidJS、Mobx 它们的数据响应式基本原理都是一样的,具体区别只是设计理念和实现方式不一样。按以前读书时代考试做题一样,它们是同一类型的题目,也就是基于依赖收集和触发的运行时的数据响应式,如果说你只会解答其中一道题,其他的题却不会解答,则说明你并没有真正彻底掌握这一类题。

同样地基于依赖追踪和触发的响应式系统都是通过发布订阅模式进行实现的,那么你知道 Vue 的数据响应式原理中是如何运用发布订阅模式的吗?

所以我们在本篇当中将从发布订阅模式的角度来理解 Vue 的数据响应式原理,彻底掌握数据响应式的基本原理,同时也巩固我们在上一篇中所说的发布订阅模式。我们在上一篇中学习了发布订阅模式,我们都是基于一些 demo 的例子去理解,本篇则真正的把发布订阅模式在实际项目的进行运用。

温馨提示,阅读本本之前最好先阅读前一篇文章,对发布订阅模式有一定的理解

发布订阅模式原理回顾

我们在上一篇中最后是通过 Object.defineProperty 方法对公众号对象 weChatOfficialAccountarticle 属性进行劫持监听,然后在 getter 的时候进行订阅,在 setter 的时候进行发布。

代码如下:

// 定义公众号
const weChatOfficialAccount = {
    // 订阅公众号的人的记录列表
    subscribers: [],
    // 文章内容
    article: '',
    // 添加订阅者
    addDep(fn) {
        // 把订阅者添加进记录列表
        this.subscribers.push(fn) 
    },
    // 广播信息
    notify(title) {
        // 发布信息时就是把记录列表中的订阅者全部通知一次
        this.subscribers.forEach(fn => fn(this.article))
    },
    // 取消订阅
    remove(fn) { 
        // 找到需要删除的订阅者
        const index = this.subscribers.indexOf(fn)
        // 删除订阅者
        this.subscribers.splice(index, 1 )
    }
}
defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者就进行订阅操作
            if (subscriber) weChatOfficialAccount.addDep(subscriber)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            weChatOfficialAccount.notify()
        }
    })
}

我们可以看到 weChatOfficialAccount 对象上有很多属于发布订阅模式中的功能,如果说还有其他对象也需要实现这样的功能,那么也要实现一遍这些功能,很明显这样是不可接受的,我们可以通过上一篇中实现的消息代理来代替 weChatOfficialAccount 对象中的发布订阅模式的功能。

首先我们对消息代理的实现做如下修改:

class Dep {
  constructor() {
    // 订阅者存储中心
    this.subs = []
  }
  // 添加订阅者
  addSub(sub) {
    this.subs.push(sub)
  }
  // 通知订阅者
  notify() {
    this.subs.forEach(sub => sub())
  }
}

如果熟悉 Vue2 数据响应式原理的同学对上面的代码肯定很熟悉,这个就是 Vue2 源码中的 Dep 类的简易实现,所以 Vue2 源码中的 Dep 其实就是一个事件总线或者叫消息代理,但它又不仅仅是消息代理,在某些时刻它同时又是一个订阅者,这个情况就是我们在上篇当中所说的一个对象既可以是发布者也可以是订阅者,而在 Vue2 源码中 Dep 类既是消息代理中心又是订阅者,具体情况我们将下文中进行详细讲解。

接下来我们继续做如下修改:

const weChatOfficialAccount = {
    // 消息代理对象
    __ob__: new Dep(),
    // 文章内容
    article: '', 
}
defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
    // 获取消息代理对象
    const dep = weChatOfficialAccount.__ob__
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者就进行订阅操作
            if (subscriber) dep.addSub(subscriber)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

我们在公众号对象中通过 __ob__ 属性来存储消息代理对象,这样公众号对象原本属于发布订阅的功能就通过 Dep 类来实现了,这样代码的功能职责就梳理得十分清晰了,也符合代码整洁之道。

我们对修改后的代码进行测试:

// 订阅者小明
let subscriber = () => {
    console.log(`收到的公众号文章:${weChatOfficialAccount.article}`)
}
// 读取一次,触发 getter 进行订阅
weChatOfficialAccount.article
// 设置为 null 防止重复订阅
subscriber = null
// 公众号发布文章则直接通过给属性赋值的方式
weChatOfficialAccount.article = '通过 Object.defineProperty 方法实现订阅发布模式'

上述测试代码的测试结果将会打印:

收到的公众号文章:通过 Object.defineProperty 方法实现订阅发布模式

我们通过 Object.defineProperty 方法实现对 weChatOfficialAccount 对象的属性 article 进行监听实现发布订阅功能,然后订阅的时候需要读取一下,触发 getter 进行订阅,这个行为在上述例子中比较奇怪,我们把它改成我们容易理解的例子。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title></title>
  </head>
  <body>
    <div id='content'></div>
    <script>
        const weChatOfficialAccount = {
            // 消息代理对象
            __ob__: new Dep(),
            // 文章内容
            article: '文章内容', 
        }
        // 省略 ...
        // 订阅者小明
        let subscriber = () => {
            const el = document.getElementById('content')
            el.textContent = `小明收到的公众号文章:${weChatOfficialAccount.article}`
        }
        // 初始化
        subscriber()
        subscriber = null
    </script>
  <body>
</html>

我们改成我们 web 应用程序中的例子就比较容易理解了。所谓订阅者小明,就是一个 HTML 更新函数,在初始化执行 subscriber() 函数的时候,会读取公众号对象 weChatOfficialAccountarticle 属性的值,这样就会触发 getter 函数进行订阅,在后续当公众号对象 weChatOfficialAccountarticle 属性值发生变化的时候,就会触发 setter 进行发布,也就是重新执行订阅者函数,然后网页内容发生变化。这个也是 Vue2 中的数据响应式的基本原理。

在上述例子中我们只对其中一个属性进行监听,但实际情况很有可能有其他订阅者对其他属性的进行引用。

// 订阅者郭靖
let guojingSubscriber = () => {
    console.log(`郭靖收到的公众号文章作者:${weChatOfficialAccount.author}`)
} 
// 订阅者杨过
let yangguoSubscriber = () => {
    console.log(`杨过收到的公众号发布时间:${weChatOfficialAccount.date}`)
}

然后我们需要对公众号对象进行修改:

const weChatOfficialAccount = {
    // 事件总线对象
    __ob__: new Dep(),
    // 文章内容
    article: '文章内容',
+    author: '作者',
+    date: '日期'
}
defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
+ defineReactive(weChatOfficialAccount, 'author', weChatOfficialAccount.author)
+ defineReactive(weChatOfficialAccount, 'date', weChatOfficialAccount.date)

同时需要对 getter 中的添加订阅者部分进行修改,为了精准添加对应的订阅者,我们需要判断对应的属性:

function defineReactive(data, key, val) {
    // 获取消息代理对象
    const dep = weChatOfficialAccount.__ob__
    Object.defineProperty(data, key, {
        get() {
+            // 为了精准添加对应属性的订阅者,我们需要判断对应的属性
+            if (subscriber && key === 'article') dep.addSub(subscriber)
+            if (guojingSubscriber && key === 'author') dep.addSub(guojingSubscriber)
+            if (yangguoSubscriber && key === 'date') dep.addSub(yangguoSubscriber)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

我们上述代码中为了精准添加对应属性的订阅者,我们需要在 getter 中判断对应的属性,在功能简单的情况下可以,如果功能复杂,对象的属性庞大的情况下,这样肯定是不能接受的。

又因为在触发 getter 的时候,只会是在某个订阅者函数在执行的时候,也就是说在 getter 被触发的时候,这个时候的订阅者是确定的,所以我们可以采用 中间变量 形式来解决这个问题。我们设置一个全局变量 activeEffect,也就是所谓中间变量,然后在初始化执行订阅者函数之前把需要执行的订阅者函数赋值给 activeEffect,然后在 getter 里面就可以把中间变量 activeEffect 通过消息代理对象添加到订阅者记录里面了,然后在执行完该订阅者函数之后则需要把中间变量 activeEffect 设置为 null,防止重复添加。

代码修改如下:

+ // 订阅者中间变量
+ let activeEffect
function defineReactive(data, key, val) {
    // 获取消息代理对象
    const dep = weChatOfficialAccount.__ob__
    Object.defineProperty(data, key, {
        get() {
+            // 存在订阅者中间变量就进行订阅者添加
+            if (activeEffect) dep.addSub(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}
+ // 初始化订阅者
+ activeEffect = subscriber
+ subscriber()
+ activeEffect = null
+ activeEffect = guojingSubscriber
+ guojingSubscriber()
+ activeEffect = null
+ activeEffect = yangguoSubscriber
+ yangguoSubscriber()
+ activeEffect = null

接着我们进行测试:

// 公众号发布文章
weChatOfficialAccount.article = '通过 Object.defineProperty 方法实现订阅发布模式'

打印结果如下:

C01.png

我们就发现我们虽然只对 article 属性进行赋值,但也触发了其他属性的订阅者的执行。那么我们就需要对属性与订阅者之间进行准确关联。那么如何进行准确关联呢?我们通过第一篇文章可以知道,在通过 Object.defineProperty 对每一个属性进行劫持监听的时候,通过闭包的形式把属性值缓存下来的,所以每一个属性的消息代理也放在闭包函数 defineReactive 中。

// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
+    // 通过闭包把每一个属性的消息代理进行缓存
+    const dep = new Dep()
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (activeEffect) dep.addSub(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

这时我们再进行测试的时候,就可以精准触发订阅者了。

我们上面是通过手动调用 defineReactive 函数进行对象的属性劫持的,我们可以通过获取所有对象的属性然后遍历调用 defineReactive 函数进行对象的属性劫持,同时把这个功能封装成一个工具函数 observe。我们在第一篇中也实现过的了,下面我们来重新实现一下:

function observe (data) {
    Object.keys(data).forEach(key => {
        defineReactive(data, key, data[key]) 
    })
}

这样我们就可以通过 observe 函数来定义一个响应式对象了。

const weChatOfficialAccount = {
    // 文章内容
    article: '文章内容',
    author: '作者',
    date: '日期'
}
- defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
- defineReactive(weChatOfficialAccount, 'author', weChatOfficialAccount.author)
- defineReactive(weChatOfficialAccount, 'date', weChatOfficialAccount.date)
+ observe(weChatOfficialAccount)

至此我们便通过发布订阅模式初步实现了 Vue2 的响应式原理。创建一个对象,通过 observe 工具函数遍历此对象所有的 property,并使用 Object.defineProperty 把这些 property 全部转为 getter/setter,然后在 getter 的时候进行依赖收集,在 setter 的时候进行依赖触发。

从发布订阅模式的角度来说就是每一个对象的 property 都是发布者,然后它的消息代理则通过闭包的形式跟每一个 property 的值一起缓存在 defineReactive 闭包函数创建独立空间中,它们是多对多的关系。

小结

在我们一般的发布订阅模式(也叫观察者模式)中,发布者或者被观察者是很明确的,是一个具体的对象,但正如我们前一篇文章所说的那样,发布订阅模式是没有标准范式的,设计模式也是,辨别一种模式不能通过代码结构,而是代码意图。而在我们上述实现的 Vue2 响应式原理的过程中,我们发现其实每一个对象属性(property)都是一个发布者或者叫被观察者,它的发布者功能则通过消息代理中介进行实现,而这个消息代理对象则通过闭包的形式跟每个属性值一起缓存在闭包当中。我们又可以发现所谓发布订阅模式的触发条件也不是唯一的,我们一般的描述定义是,当一个对象发现变化的时候才去触发所有依赖它的订阅者,其实不然,发布订阅模式的触发条件可以是状态的变化、某个操作的变化、甚至是发布订阅者的通知也可以触发另外一个发布者进行发布操作。如果有在 Vue 中使用过事件总线的同学会很清楚,我们在组件中触发通知(emit)订阅者操作的时候并不一定是组件属性发生了变化,而有可能是某个方法触发了通知(emit)订阅者操作。

对数组进行响应式的处理

可以通过 Object.defineProperty 对数组进行监听,但监听不了数组自身的原型链方法,而 pushpopshiftunshiftsplicesortreverse 对数组进行操作是会改变数组的数据结构的,从发布订阅模式的角度来说数据发生变化后我们需要通知该数组对象的所有订阅者。为了实现这需求我们需要劫持数组的操作方法,即在对数组进行 push 等操作的时候我们能监听到。实现方案就是对数组的原型进行重写,重写的方法就是覆盖数组数组对象上的原型对象 __proto__。我们在第一篇当中是通过粗暴的直接覆盖的方式,但那样会把原来的一些数组方法也覆盖掉了,那样是不可取。

我们可以通过获取数组原型上的对象,然后只修改需要修改的方法即可。我们对 observe 方法修改如下:

function observe (data) {
+    // 如果是数组则重写数组上的原型
+    if (Array.isArray(data)) {
+        // 获取数组原型
+        const arrayProto = Array.prototype
+        // 通过 Object.create 创建一个原型为 arrayProto 的空对象
+        const arrayMethods = Object.create(arrayProto)
+        // 修改 push 方法
+        arrayMethods['push'] = function (...args) {
+            // 获取原始方法
+            const original = arrayProto['push']
+            // 执行原始方法
+            const result = original.apply(this, args)
+            return result
+        }
+        // 覆盖原型对象
+        data.__proto__ = arrayMethods
+    } else {
        Object.keys(data).forEach(key => {
          // 如果属性不是 __ob__ 则进行监听
          if (key !== '__ob__')  defineReactive(data, key) 
        })
+    }
}

上述代码我们通过 Object.create 创建一个原型为 arrayProto 的空对象:arrayMethods。然后给空对象设置 push 属性值为一个函数,最终把 arrayMethods 赋值给 data__proto__。这里就涉及到了一个 JavaScript 原型链的基础知识,当我们获取一个对象的属性值的时候,我们优先从该对象的自身属性上去获取,如果找不到则沿着该对象的 __proto__ 属性上的对象上的属性去查找,如果还找不到,则继续沿着 __proto__ 上的对象去查找。

我们经过上面的代码设置之后,我们通过 observe 设置一个数组,那么这个数组的原型对象则变为了arrayMethods,当执行该数组的 push 方法,根据原型链的规则,它会先执行 arrayMethods 对象上的 push 方法,这样我们就可以对该数组的 push 方法进行了监听,我们最终还是通过原本数组上 push 方法进行操作,但我们可以捕捉到了 push 的动作,这样我们就可以在 push 操作之后,进行通知所有该数组上的订阅者了。

我们从前面的发布订阅模式的知识可以知道,一个发布者对象上需要有一个消息代理对象,所以我们需要继续迭代我们的代码:

function observe (data) {
+    // 不存在消息代理则设置消息代理对象
+    if (!data.__ob__) data.__ob__ = new Dep()
    // 如果是数组则重写数组上的原型
    if (Array.isArray(data)) {
        // 获取数组原型
        const arrayProto = Array.prototype
        // 通过 Object.create 创建一个原型为 arrayProto 的空对象
        const arrayMethods = Object.create(arrayProto)
        // 修改 push 方法
        arrayMethods['push'] = function (...args) {
            // 获取原始方法
            const original = arrayProto['push']
            // 执行原始方法
            const result = original.apply(this, args)
+            // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
+            data.__ob__.notify()
             // 同时对新添加的数据也进行响应式化
+             for (let i = 0, l = args.length; i < l; i++) {
+              observe(args[i])
+             }
        }
        // 覆盖原型对象
        data.__proto__ = arrayMethods
    } else {
        Object.keys(data).forEach(key => {
          // 如果属性不是 __ob__ 则进行监听
          if (key !== '__ob__')  defineReactive(data, key) 
        })
    }
}

那么我们这个发布者对象的订阅者在哪里进行添加呢,从数据响应式的角度就是这个响应式对象的依赖在哪里收集呢?

其实不管是对象还是数组的订阅者都是在 getter 中进行添加的。
例如:{ list: [1,2,3,4] }
你要获取到 list 数组的内容,首先是通过 list 这个 property 进行获取的,所以当通过 list 这个 property 进行获取数组内容的时候,就触发了 list 这个 property 的 getter。

所以我们需要对 defineReactive 函数进行修改:

function defineReactive(data, key) {
    let val = data[key]
    // 获取消息代理对象
    const dep = new Dep()
+    // 对获取到的属性值进行递归 observe 监听
+    const childOb = observe(val)
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (activeEffect){
                dep.addSub(activeEffect)
+                // 如果存在 childOb 则说明属性值是一个对象,则也需要对该对象进行订阅者收集,从发布订阅模式的角度看,这是一个订阅操作
+                if (childOb) childOb.addSub(activeEffect) 
            }
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

在 getter 中会进行 property 的订阅者添加,收集到的订阅者保存在对应 property 的消息代理对象中,同时也会判断,property 的值如果是一个对象,还会对这个对象进行订阅者添加,收集到的订阅者还会保存到这个对象的消息代理对象上。

所以我们还需要对 observe 函数进行修改:

+ // 判断是否是对象
+ function isObject(obj) {
+     return obj !== null && typeof obj === 'object'
+ }
function observe (data) {
+    // 不是对象则直接返回
+    if (!isObject(data)) return 
    
    // 省略 ...
    
+    // 返回消息代理对象
+    return data.__ob__
}

至此我们对数组的响应式也实现了,接下来就是进行测试:

// 定义公众号
const weChatOfficialAccount = {
    // 文章内容
    article: '',
    author: '',
    date: '',
    arr: ['掘金']
}

observe(weChatOfficialAccount)

// 订阅者小明
let subscriber = () => {
    console.log(`收到的公众号文章:${weChatOfficialAccount.arr.join('====')}`)
}

// 初始化订阅者
activeEffect = subscriber
subscriber()
activeEffect = null
// 公众号发布文章则直接通过给属性赋值的方式
weChatOfficialAccount.arr.push('通过 Object.defineProperty 方法实现订阅发布模式')

打印结果如下:

C03.png

我们可以看到正确打印了结果,也就是我们也实现了对数组的响应式。

通过重构实现 Observer 类

我们上述函数 observe 的实现其实职责是很不清晰的,也不利于后续的维护,所以我们需要对它进行重构。软件应该是“自描述”的,代码除了给机器看之外,也要给人看。我们希望写的代码更易读,让代码可以更好地表达自己的意图。

这里我们涉及到一些开发技巧,我们可以先实现具体的功能,然后再重构,在重构的时候通过封装成抽象的类或者其他函数,让代码可以更好地表达自己的意图。那么 observe 函数中可以将对数组响应式的处理,还有对对象属性循环劫持分别封装成不同的函数,然后通过函数名称可以让我们的代码意图更明显。

我们对 observe 函数进行重构,代码如下:

function observe (data) {
    // 不是对象则直接返回
    if (!isObject(data)) return 
    // 不存在消息代理则设置消息代理对象
    if (!data.__ob__) data.__ob__ = new Dep()
    // 如果是数组则重写数组上的原型
    if (Array.isArray(data)) {
+        protoAugment(data)
    } else {
+        walk(data)
    }
    // 返回消息代理对象
    return data.__ob__
}
+ // 对数组进行响应式处理
+ function protoAugment(target) {
+    // 获取数组原型
+    const arrayProto = Array.prototype
+    // 通过 Object.create 创建一个原型为 arrayProto 的空对象
+    const arrayMethods = Object.create(arrayProto)
+    // 修改 push 方法
+    arrayMethods['push'] = function (...args) {
+        // 获取原始方法
+        const original = arrayProto['push']
+        // 执行原始方法
+        const result = original.apply(this, args)
+        // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
+        target.__ob__.notify()
        // 同时对新添加的数据也进行响应式化
        observeArray(args)
+    }
+    // 覆盖原型对象
+    target.__proto__ = arrayMethods
+ }
+ // 对对象进行响应式处理
+ function walk(obj) {
+    const keys = Object.keys(obj)
+    keys.forEach(key => {
+        // 如果属性不是 __ob__ 则进行监听
+        if (key !== '__ob__')  defineReactive(obj, key) 
+    })
+ }
 // 对数组的每一项元素都进行响应式处理
 function observeArray (items) {
    for (let i = 0, l = items.length; i < l; i++) {
      observe(items[i])
    }
 }

经过我们上面对不同的功能代码进行重构后,我们就可以通过函数名称很容易理解代码的意图了。但我们上面的功能函数还是十分的分散,而它们都是同一种功能类型的函数,都是实现对象响应式的功能函数,所以我们可以通过 OOP 的思想把响应式数据和操作封装到一个类里面,这个类我们把它命名为 Observer

Observer 类的代码实现如下:

class Observer {
    constructor(value) {
        this.value = value
        this.dep = new Dep()
        this.value.__ob__ = this
        if (Array.isArray(value)) {
            // 对数组进行响应式处理
            this.protoAugment(value)
            // 对数组的每一项都进行响应式处理
            this.observeArray(value)
        } else {
            // 对对象进行响应式处理
            this.walk(value)
        }
    }
    // 进行原型重写
    protoAugment(target) {
        // 获取数组原型
        const arrayProto = Array.prototype
        // 通过 Object.create 创建一个原型为 arrayProto 的空对象
        const arrayMethods = Object.create(arrayProto)
        // 修改 push 方法
        arrayMethods['push'] = function (...args) {
            const ob = this.__ob__
            // 获取原始方法
            const original = arrayProto['push']
            // 执行原始方法
            const result = original.apply(this, args)
            // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
            ob.dep.notify()
            // 同时对新添加的数据也进行响应式化
            ob.observeArray(args)
        }
        // 覆盖原型对象
        target.__proto__ = arrayMethods
    }

    // 对对象进行响应式处理
    walk(obj) {
        const keys = Object.keys(obj)
        keys.forEach(key => {
            // 如果属性不是 __ob__ 则进行监听
            if (key !== '__ob__')  defineReactive(obj, key) 
        })
    }
    // 对数组的每一项元素都进行响应式处理
    observeArray (items) {
        for (let i = 0, l = items.length; i < l; i++) {
            observe(items[i])
        }
    }
}

function observe (data) {
    // 不是对象则直接返回
    if (!isObject(data)) return
    const ob = new Observer(data)
    // 返回 Observer 实例对象
    return ob
}

defineReactive 函数也需要进行以下修改:

function defineReactive(data, key) {
// ...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (activeEffect){
                dep.addSub(activeEffect)
                // 如果存在 childOb 则说明属性值是一个对象,则也需要对该对象进行订阅者收集,从发布订阅模式的角度看,这是一个订阅操作
-                if (childOb) childOb.addSub(activeEffect) 
+                if (childOb) childOb.dep.addSub(activeEffect) 
            }
            return val
        },
        set(newVal) {
// ...
        }
    })
}

我们修改后进行重新测试也正常打印了结果:

C03.png

我们还可以进行一下性能优化,我们上述代码 protoAugment 函数部分,我们创建了数组原型对象的变量,而这些变量其实是不会变化,我们可以把它们的声明移到 protoAugment 函数外面,这样每一次调用 protoAugment 函数就不会重复重新创建这些变量了。

+ // 获取数组原型
+ const arrayProto = Array.prototype
+ // 通过 Object.create 创建一个原型为 arrayProto 的空对象
+ const arrayMethods = Object.create(arrayProto)
+ // 修改 push 方法
+ arrayMethods['push'] = function (...args) {
+    const ob = this.__ob__
+    // 获取原始方法
+    const original = arrayProto['push']
+    // 执行原始方法
+    const result = original.apply(this, args)
+    // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
+    ob.dep.notify()
+    // 同时对新添加的数据也进行响应式化
+    ob.observeArray(args)
+ }

class Observer {
    constructor(value) {
        // 省略 ...
        if (Array.isArray(value)) {
            // 对数组进行响应式处理
+            this.protoAugment(value, arrayMethods)
            // 对数组的每一项都进行响应式处理
            this.observeArray(value)
        } else {

        }
    }

+    protoAugment(target, src) {
+        // 覆盖原型对象
+        target.__proto__ = src
+    }
    // 省略 ...
}

至此我们通过重构就实现了 Observer 类,这一节没有涉及到发布订阅模式和数据响应式相关的内容,只是一下编程技巧的内容,而之所以有这一节是为了我们的代码结构更贴近 Vue2 源码的实现。通过这一节的实现,我们也可以知道发布订阅模式是如何在 Vue2 数据响应式中实现的。

那么从发布订阅模式的角度来看所谓 Observer 类,其实是一个发布者或者叫被观察者,虽然它的类命叫 Observer 翻译过叫观察者,但从观察者模式的角度来看,它不能叫观察者,因为它并没有向哪个被观察者进行订阅操作。但它又不是一个纯粹的发布者,它主要作用是将数据对象转换为响应式对象,使得当数据发生变化时能够触发相应的更新操作,它同时通过递归遍历数据对象中的所有属性,为每个属性设置 gettersetter 来实现数据的劫持和监听,从功能上来看它是在观察自己的属性。

从代码结构上来看,它的发布订阅模式的实现跟传统标准的发布订阅模式的结构还是存在很大差别的,但正如我们上篇文章中所说的那样,我们并不能从代码结构上去判断是否属于什么模式,而是从代码意图去判断。

订阅者中介实现

我们知道 Vue2 中的订阅者是通过 Watcher 类来实现的,也就是我们上一篇文章中所讲的订阅者中介

我们先实现一个订阅者中介:

class Watcher {
    constructor(fn) {
        // 让每个订阅者所需要做的事情通过参数的形式传进来,这样更灵活,拓展性更强
        this.getter = fn
        // 初始化的时候直接读取触发订阅者收集,因为这样设计符合 web 应用的特性
        this.get()
    }
    get() {
        // 通过 Dep.target 来设置当前的订阅者是谁
        Dep.target = this
        this.getter()
        Dep.target = null
    }
    // 接受发布者通知的更新方法
    update() {
        this.getter()
    }
}

我们这里设计的订阅者中介类的实现跟我们上一篇中的订阅者中介类的实现,最大的不同就是,这里的设计需要在初始化的时候就要去执行一次订阅者所传的参数函数,因为在 web 应用应用中,应用需要初始化。

我们在实例化订阅者的时候,就把该订阅者需要做的事情当成参数传进去:

new Subscriber(() => {
    console.log(`郭靖收到的公众号文章:${weChatOfficialAccount.article}`)
}) 

同时 Dep 类的 notify 方法也需要修改一下:

class Dep {
  // 通知订阅者
  notify() {
-    this.subs.forEach(sub => sub())
+    this.subs.forEach(sub => sub.update())
  }
}

defineReactive 函数也需要修改:

- // 订阅者中间变量
- let activeEffect
// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
    // 省略 ...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
-            if (activeEffect){
+            if (Dep.target){
-                dep.addSub(activeEffect)
+                dep.addSub(Dep.target)
                // 如果存在 childOb 则说明属性值是一个对象,则也需要对该对象进行订阅者收集,从发布订阅模式的角度看,这是一个订阅操作
-                if (childOb) childOb.dep.addSub(activeEffect) 
+                if (childOb) childOb.dep.addSub(Dep.target) 
            }
            return val
        },
        set(newVal) {
            // 省略 ...
        }
    })
}

这样我们就可以很方便通过一下进行测试了:

// 定义公众号
const weChatOfficialAccount = {
    // 文章内容
    article: '',
    author: '',
    date: '',
    arr: ['掘金']
}
observe(weChatOfficialAccount)
// 初始化订阅者
new Watcher(() => {
    console.log(`收到的公众号文章:${weChatOfficialAccount.arr.join('====')}`)
})

// 公众号发布文章则直接通过给属性赋值的方式
weChatOfficialAccount.arr.push('通过 Object.defineProperty 方法实现订阅发布模式')

打印结果如下:

C03.png

小结

Dep 和 Watcher 互为订阅者

我们通过上文知道 Dep 类其实是一个消息代理或者叫事件总线,而 Watcher 则是一个订阅者,但我们前面也留了一个引子,说它们还有一层关系,就是互为订阅者。那么既然 DepWatcher 互为订阅者,也就是说它们其实也是一个发布者的角色。所以现实系统中的应用远远要比我们所学的所谓标准模式要复杂得多。

我们知道在 Vue2 中可以通过 Options 选项设置 watcher 来实现对响应式数据的监听,其实还可以通过 this.$watcher() 来实现对响应式数据的监听,使用方法都是一样的,唯一的不同就是 this.$watcher() 会返回一个函数,这个函数的作用就是停止对响应式数据的监听。

那么要实现停止对响应式数据的监听则需要知道那些 Dep 记录了当前的 Watcher,我们就需要通知那些 Dep 取消订阅当前的 Watcher。那么要实现这个功能,就需要 Watcher 也进行记录自己订阅了哪些 Dep,当取消对响应式数据的监听的时候,就从当前 Watcher 的订阅记录里去通知那些 Dep 取消自己的订阅。

比如说我们在 Vue2 当中有这么一个功能:

const unwatch = this.$watch(function(){
      return this.name + this.age + this.sex
    }, function(newValue, oldValue){
    console.log('新值为:', newValue)
    console.log('旧值为:', oldValue)
    if (newValue === 'cobyte') {
      // 停止监听
      unwatch()
    }
})

我们接下来去实现这个功能,也就是实现 Vue2 的 $watcher() 功能。以下是 Vue2 官网对 $watcher API 的一些参数和功能的介绍。

  • [vm.$watch( expOrFn, callback, options )]

  • 参数

    • {string | Function} expOrFn

    • {Function | Object} callback

    • {Object} [options]
      
      • {boolean} deep
      • {boolean} immediate
  • 返回值{Function} unwatch

  • 用法

    观察 Vue 实例上的一个表达式或者一个函数计算结果的变化。回调函数得到的参数为新值和旧值。表达式只接受简单的键路径。对于更复杂的表达式,用一个函数取代。

这个 $watcher 的功能从发布订阅模式的角度可以看成是,第一个参数是订阅者要做的事情,第二个参数是在做完事情后拿到结果再通过第二参数输出结果,而且是每次所依赖的响应式数据发生变化后都需要执行第二个参数函数,输出新的结果。

那么我们先实现下面的功能:

new Watcher(function() {
   return `要做的事情,获取文章:${weChatOfficialAccount.article}`
}, function(newValue, oldValue) {
   console.log(`新结果是:${newValue},旧结果是:${oldValue}`)
})

我们对 Watcher 类做以下修改:

class Watcher {
-    constructor(fn) {
+    constructor(fn, cb) {
+        this.cb = cb
       // 让每个订阅者所需要做的事情通过参数的形式传进来,这样更灵活,拓展性更强
       this.getter = fn
       // 初始化的时候直接读取触发订阅者收集,因为这样设计符合 web 应用的特性
-        this.get()
+        this.value = this.get()
   }
   get() {
+        let value
       // 通过 Dep.target 来设置当前的订阅者是谁
       Dep.target = this
+        value = this.getter()
       Dep.target = null
+        return value
   }
   // 接受发布者通知的更新方法
   update() {
     // 获取新值
-      this.getter()
+      const value = this.getter()
+      // 设置旧值
+      const oldValue = this.value
+      // 更新值
+      this.value = value
+      if (this.cb) {
+        // 因为是用户写的,有可能存在错误
+        try {
+          this.cb(value, oldValue)
+        } catch(err) {
+          throw err
+        }
+      }
   }
}

我们进行测试:

weChatOfficialAccount.article = '第一次更新'
weChatOfficialAccount.article = '第二次更新'

测试结果如下:

C04.png

我们看到正确打印了结果。

有了以上的基础功能,接下来我们就很容易实现 $watcher API,代码如下:

function $watcher(expOrFn, cb) {
    const watcher = new Watcher(expOrFn, cb)
}

我们知道 $watcher API 是有很多配置选项的,也就是第三个参数,比如立即执行回调就是通过第三个参数配置 immediatetrue 来实现的,下面我们也来实现它:

function $watcher(expOrFn, cb, options) {
    // 因为 options 有可能不存在,要做兼容处理
    options = options || {}
    const watcher = new Watcher(expOrFn, cb, options)
    // 如果 immediate 为 true 则立即执行回调函数 
    if (options.immediate) {
      try {
        cb(watcher.value)
      } catch (error) {
        throw new Error(error)
      }
    }
}

在 Vue2 中 Watcher 实例是分为系统的 Watcher 和用户的 Watcher 的,像我们在组件里面通过配置 watcher,就是用户的 Watcher,怎么体现区分呢?我们下面来设置,其实也很简单:

function $watcher(expOrFn, cb, options) {
    // 因为 options 有可能不存在,要做兼容处理
    options = options || {}
+    // 设置用户级的 Watcher
+    options.user = true
    const watcher = new Watcher(expOrFn, cb, options)
    // 如果 immediate 为 true 则立即执行回调函数 
    if (options.immediate) {
      try {
        cb(watcher.value)
      } catch (error) {
        throw new Error(error)
      }
    }
}

接着修改 Watcher 类:

class Watcher {
-    constructor(fn, cb){
+    constructor(fn, cb, options) {
+      if (options) {
+          this.user = !!options.user
+      } else {
+          this.user = false
+      }
      // 省略...
    }
    update() {
      // 省略...
-      if (this.cb) {
+      if (this.user) {
        // 省略...
      }
    }
}

修改也很简单,从上面的修改可以看得出,只有用户级的 Watcher 才会在更新的时候执行回调函数。

接下来我们测试立即回调功能:

$watcher(function() {
    return `要做的事情,获取文章:${weChatOfficialAccount.article}`
}, function(newValue, oldValue) {
    console.log(`新结果是:${newValue},旧结果是:${oldValue}`)
}, { immediate: true })

我们可以看到初始化的时候,就立即执行回调函数了。

C05.png

立即执行,旧值为 undefined,符合如期。

实现了上面的基础部分的功能,我们就可以实现重要的功能了,取消订阅。

function $watcher(expOrFn, cb, options) {
     // 省略...
+    // 返回一个可以取消订阅的函数
+    return function unwatchFn () {
+      watcher.teardown()
+    }
}

我们这里通过 Watcher 实例对象的 treardown 方法去取消订阅,其实是要去通知那些记录了该 Watcher 的 Dep 去删除其记录中的该 Watcher。那么我们怎么知道哪些 Dep 记录该 Watcher 呢?所以我们就需要在 Watcher 中记录其订阅了的 Dep。从发布订阅模式的角度来说就是 Dep 要对 Watcher 进行订阅,Dep 是订阅者,Watcher 是发布者,而我们之前是 Watcher 对相关的属性的 Dep 进行订阅,Watcher 是订阅者,相关属性的 Dep 是发布者。

我们首先对 Watcher 实现发布订阅的功能,代码迭代如下:

class Watcher {
    constructor(fn, cb, options) {
        // 省略...
+        this.deps = []
        // 省略...
    }
+    addDep(dep) {
+        this.deps.push(dep)
+    }
+    // 取消订阅
+    teardown() {
+        let i = this.deps.length
+        while (i--) {
+            this.deps[i].removeSub(this)
+        }
+    }
}

我们这里的取消订阅是通过 Watcher 所记录的 Dep 实例对象去执行 Dep 上的 removeSub 方法去把自己删除,这样将来 Dep 触发更新的时候,就通知不了自己了,也就执行不了 update 方法了。

接下来我们实现 Dep 类上的 removeSub 方法,迭代代码如下:

class Dep {
  // 省略..

+  // 取消订阅
+  removeSub (sub) {
+    // 找到需要取消的订阅者
+    const index = this.subs.indexOf(sub)
+    if (index > -1) {
+        // 删除订阅者
+        this.subs.splice(index, 1)
+    }
+  }
  // 省略...
}

我们可以看到 Dep 类中的取消订阅功能,跟普通发布订阅中的取消订阅功能是一样的。

我们前面已经实现了 DepWatcher 的订阅,那么接下来就是 Watcher 怎么对 Dep 进行订阅了。我们知道不管是什么数据类型都是在 getter 中进行依赖收集的,所以要实现 WatcherDep 的订阅,也要从 getter 开始。我们在 getter 里面可以通过 Dep.target 获取到当前的 Watcher,也可以获取到当前属性对应的 Dep 实例对象,那么就可以互相添加订阅者了。

代码迭代如下:

function defineReactive(data, key) {
    // 省略...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (Dep.target){
                dep.addSub(Dep.target)
+                // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
+                Dep.target.addDep(dep)
                if (childOb){
                    childOb.dep.addSub(Dep.target)
+                    // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
+                    Dep.target.addDep(childOb.dep)
                }
            }
            return val
        },
        // 省略...
    })
}

接下来我们就可以进行测试了:

const unwatch = $watcher(function(){
    return weChatOfficialAccount.article + weChatOfficialAccount.author
}, function(newValue, oldValue) {
    console.log('新值为:', newValue)
    console.log('旧值为:', oldValue)
    if (newValue === 'cobyte') {
      // 停止监听
      unwatch()
    }
})

console.log('会打印新值旧值')
weChatOfficialAccount.article = 'co'
console.log('会打印新值旧值')
weChatOfficialAccount.author = 'byte'
console.log('不会打印新值旧值')
weChatOfficialAccount.article = 'cobyte'

我们发现如期打印了我们期待的结果:

C06.png

我们上面在 getter 中对 Dep 和 Watcher 进行相互订阅的操作,还可以进行优化一下,让代码更优雅。

class Dep {
    // 省略...
+  // 通过 depend 方法进行依赖收集
+  depend() {
+    if (Dep.target) {
+      // 在 Dep 中进行 Watcher 
+      Dep.target.addDep(this)
+    }
+  }
  // 省略...
}

接着在 Watcher 中调用 Dep 的方法添加自己

class Watcher {
    // 省略...
    addDep(dep) {
        this.deps.push(dep)
+        // 调用 Dep 实例的添加订阅方法添加自己
+        dep.addSub(this)
    }
    // 省略...
}

接着我们修改 getter 中代码:

function defineReactive(data, key) {
    // 省略...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (Dep.target){
-                dep.addSub(Dep.target)
-                // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
-                Dep.target.addDep(dep)
+                dep.depend()
                if (childOb){
-                    childOb.dep.addSub(Dep.target)
-                    // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
-                    Dep.target.addDep(childOb.dep)
+                    childOb.dep.depend()
                }
            }
            return val
        },
        // 省略...
    })
}

经过我们的重构,getter 中的依赖收集相关代码变得清晰多了。

总结

从发布订阅模式的角度来理解 Vue 的数据响应式原理,就是发布订阅模式的具体运用的过程。

上述文章写于:2023 年,由于个人原因今年 2026 年发布。

我是程序员Cobyte,现在已转向研究 AI Agent,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

昨天以前首页

如何使用飞书机器人连接本地 AI Agent

作者 Cobyte
2026年4月7日 08:30

1. 前言

我们在上一篇文章中讲解了如何使用微信 ClawBot 连接本地 AI Agent,这一篇我们讲解如何使用飞书机器人连接本地 AI Agent。

首先我们需要创建一个飞书应用。

2. 创建飞书应用

2.1 打开飞书开放平台

访问飞书开发平台 open.feishu.cn/app 并登录。

2.2 创建应用

1.点击 创建企业自建应用

image.png

2.填写应用名称和描述

image.png

3.选择应用图标

2.3 复制凭证

在“凭证与基础信息”中复制:

  • App ID
  • App Secret

2.4 启用机器人能力

image.png

2.5 配置权限

权限管理中点击批量导入/导出权限按钮

image.png

然后复制粘贴以下权限:

{
  "scopes": {
    "tenant": [
      "aily:file:read",
      "aily:file:write",
      "application:application.app_message_stats.overview:readonly",
      "application:application:self_manage",
      "application:bot.menu:write",
      "cardkit:card:read",
      "cardkit:card:write",
      "contact:user.employee_id:readonly",
      "corehr:file:download",
      "event:ip_list",
      "im:chat.access_event.bot_p2p_chat:read",
      "im:chat.members:bot_access",
      "im:message",
      "im:message.group_at_msg:readonly",
      "im:message.p2p_msg:readonly",
      "im:message:readonly",
      "im:message:send_as_bot",
      "im:resource"
    ],
    "user": ["aily:file:read", "aily:file:write", "im:chat.access_event.bot_p2p_chat:read"]
  }
}

最后点击确定并申请开通。

2.6 配置事件订阅

事件与回调中进行事件配置,选择订阅方式为长连接,这样我们就可以在本地电脑也可以连接飞书机器人了(本质是WebSocket)。

image.png

接着添加接收消息事件:im.message.receive_v1

image.png

加密策略设置,这个可选项。

image.png

2.7 发布应用

接着我们在版本管理与发布中创建一个版本并发布。

image.png

一般我们在测试的时候,我们自己的飞书账号就是管理员,上述版本申请发布后会在手机飞书上收到一条审核消息,我们按提示进行操作审核即可。

这时我们就可以在手机飞书上搜索我们刚刚创建的“AI 机器人”,然后点击进去就可以对话了,但目前我们还没通过代码连接它,所以还对不了话。

01.jpg

值得注意的是:上述飞书机器人配置也是OpenClaw的官方飞书机器人配置方案。

3. 实现飞书连接本地 Agent

3.1 构建 API Client

根据飞书开发平台开发文档的 《Python SDK 指南》,我们在通过 SDK 调用飞书开放接口之前,需要先在代码中创建一个 API Client,用来指定当前使用的应用、日志级别、HTTP 请求超时时间等基本信息。

我们的实现如下:

from dataclasses import dataclass
import lark_oapi as lark

@dataclass
class FeishuConfig:
    """飞书渠道配置"""
    app_id: str = ""
    app_secret: str = ""
    encrypt_key: str = "" # 可选字段,空字符串表示不使用加密
    verification_token: str = "" # 可选字段,空字符串表示不验证消息来源

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        self.config = config
        # 构建 API Client
        self._client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .build()

首先我们实现一个配置类来统一管理上述我们说到的 App IDApp Secret 以及在配置事件订阅中讲到的加密策略设置的 Encrypt KeyVerification Token

后续我们就可以通过依赖注入模式调用 FeishuChannel 类了:

# 使用
config = FeishuConfig(app_id="...", app_secret="...")
channel = FeishuChannel(config=config)  # 配置通过构造函数注入

这样实现了配置与实现分离,将来支持热更新配置。

3.2 长连接飞书客户端

根据飞书开发平台开发文档的 《Python SDK 指南》的处理事件章节,我们的实现如下:

from loguru import logger
class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    def start(self) -> None:
        # 构建事件处理器
        builder = lark.EventDispatcherHandler.builder(
            self.config.encrypt_key, 
            self.config.verification_token
        )
        # 注册接收消息事件处理函数 im.message.receive_v1
        handler = builder.register_p2_im_message_receive_v1(self._on_message).build()
        # 初始化长连接客户端并传入事件处理器   
        ws_client = lark.ws.Client(
            self.config.app_id, 
            self.config.app_secret, 
            event_handler=handler
        )
        # start() 方法会阻塞主线程,持续运行,直到手动关闭
        ws_client.start()

        logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""

我们首先构建事件处理器,同时如果你在开发者后台的应用详情中,配置了 事件与回调 > 加密策略 页面内的加密信息(Encrypt Key 和 Verification Token)。则必须将加密信息的值传递到 EventDispatcherHandler.builder 方法的参数中。

接着我们注册接收消息事件处理函数。也就是我们在前面配置事件订阅小节所讲的添加的接收消息事件:im.message.receive_v1

最后通过 lark.ws.Client() 初始化长连接客户端,必填参数为应用的 APP_ID 和 APP_SECRET。我们在上面讲述创建飞书应用的时候已经阐述过 APP_ID 和 APP_SECRET 了。

值得注意的是:飞书的长连接客户端 (ws.Client) 只能用来"接收"事件,不能用来"发送"消息!如果要主动发消息或回复消息,必须使用普通的 Open API 客户端 (lark.Client)。

3.3 接收消息事件处理函数

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    def start(self) -> None:
        # 省略...

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""
        msg = data.event.message
        # 只处理用户发送的纯文本消息
        if data.event.sender.sender_type == "bot" or msg.message_type != "text":
            return

        content = json.loads(msg.content).get("text", "")
        if not content:
            return
        logger.info(f"收到{msg.chat_id}消息: {content}")
        # 发送消息
        self._process_and_reply(msg.chat_id, content)

我们这里先只处理用户发送的纯文本消息。

3.4 发送消息到 AI Agent 处理并进行回复

前面说了飞书的长连接客户端 (ws.Client) 只能用来"接收"事件,不能用来"发送"消息!如果要主动发消息或回复消息,必须使用普通的 Open API 客户端 (lark.Client)。

所以根据飞书开发平台开发文档的 《Python SDK 指南》的服务器API章节的《发送消息》,我们的实现如下:

import agent_loop as agent  # 导入本地 AI Agent 逻辑模块

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...
    def start(self) -> None:
        # 省略...
    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        # 省略...
    def _process_and_reply(self, chat_id: str, content: str) -> None:
        """调用本地 AI Agent 获取结果并调用飞书 API 发送"""
        try:
            history = [
                {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个 AI 助手")},
                {"role": "user", "content": content}
            ]
            
            # 1. 运行 AI 思考逻辑
            reply_text = agent.agent_loop(history)
            if not reply_text:
                return

            # 2. 发送回复
            receive_id_type = "chat_id" if chat_id.startswith("oc_") else "open_id"
            req = CreateMessageRequest.builder().receive_id_type(receive_id_type).request_body(
                CreateMessageRequestBody.builder()
                .receive_id(chat_id)
                .msg_type("text")
                .content(json.dumps({"text": reply_text}))
                .build()
            ).build()
            
            resp = self._client.im.v1.message.create(req)
            if resp.success():
                logger.info(f"➡️ 成功回复消息到: {chat_id}")
            else:
                logger.error(f"❌ 回复失败: {resp.msg}")
                
        except Exception as e:
            logger.error(f"❌ 处理异常: {e}")

_process_and_reply 方法中,我们首先构造与 AI Agent 的对话历史,调用 agent.agent_loop(history) 运行我们前面实现的 AI Agent 进行思考获取回复文本。这部分我们应该比较熟悉了。

根据飞书开发平台开发文档的 《发送消息》指南,发送消息时需要指定 receive_id_type 参数。飞书的 ID 有两种类型:

  • chat_id:以 oc_ 开头的群聊 ID
  • open_id:用户个人 Open ID

我们通过简单的字符串前缀判断 chat_id.startswith("oc_") 来区分这两种类型。

构建消息请求时:

  1. 首先创建 CreateMessageRequest 的建造者
  2. 设置 receive_id_type(接收者ID类型)
  3. 通过 request_body() 设置请求体,其中又使用 CreateMessageRequestBody 的建造者设置具体参数
  4. 调用 .build() 方法最终构建请求对象

这里需要注意的是,飞书消息的 content 字段必须是 JSON 字符串格式,所以我们需要使用 json.dumps({"text": reply_text}) 将回复文本包装成 JSON。

最后调用 self._client.im.v1.message.create(req) 发送消息到飞书服务器,并根据响应结果记录成功或失败日志。

现在我们已经完成了飞书渠道的核心实现,接下来编写启动脚本。

4. 启动飞书机器人

我们创建一个独立的启动文件 test_feishu.py

from loguru import logger
from feishu import FeishuChannel, FeishuConfig

def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxxx",         # 替换为真实的 App ID
        app_secret="xxxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
    
    # 2. 初始化频道并启动长连接
    channel = FeishuChannel(config=config)
    
    logger.info("正在启动飞书机器人长连接...")
    
    # 3. 启动并保持运行
    try:
        channel.start()
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    main()

启动脚本的实现非常简单直接:

第一步:配置飞书机器人凭证

我们需要创建 FeishuConfig 实例,填入在飞书开放平台创建应用时获取的凭证:

  • app_id:应用的唯一标识,以 cli_ 开头
  • app_secret:应用的密钥,用于 API 身份验证
  • encrypt_key:可选,如果在开发者后台配置了加密策略则需要填入
  • verification_token:可选,用于验证消息来源

第二步:初始化飞书频道

使用配置对象创建 FeishuChannel 实例。这里体现了依赖注入模式的优势——我们可以轻松更换不同的配置来源。

第三步:启动并保持运行

调用 channel.start() 启动飞书机器人的 WebSocket 长连接。这个方法会阻塞主线程,持续运行直到收到退出信号。

我们使用 try...except KeyboardInterrupt 捕获用户按 Ctrl+C 的中断信号,实现优雅退出。当用户想要停止机器人时,只需在终端中按 Ctrl+C,程序会记录退出日志然后正常结束。

运行机器人

在启动之前,先安装相关依赖。

requirements.txt 依赖如下:

python-dotenv==1.0.1
openai==2.24.0
loguru==0.7.3
# 飞书官方 Python SDK
lark-oapi>=1.2.0

执行 pip install -r requirements.txt 安装依赖。

依赖安装完毕后,在终端中执行:

python test_feishu.py

你会看到以下输出:

2026-04-05 00:59:23.456 | INFO     | 正在启动飞书机器人长连接...

此时机器人已经启动并开始监听飞书消息了。你可以在飞书中找到你的机器人应用,发送消息进行测试。

5. 启用异步事件循环线程

5.1 Node.js 和 Python 的主线程对比

我们知道 Node.js 默认的主线程就是事件循环线程,而 Python 的主线程默认是没有事件循环的。所以我们在上面也提到了调用 channel.start() 启动飞书机器人的 WebSocket 长连接会阻塞主线程。因为飞书 Python SDK 的 lark.ws.Client 采用同步阻塞设计,其 start() 方法会启动自己的事件循环并持续运行。主要体现在 FeishuChannel 类的 start 方法中以下代码:

ws_client.start()
logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

我们发现执行 ws_client.start() 代码后日志不打印了,因为主线程被阻塞了。

而在 Node.js 中以下的代码则不会被阻塞:

const WebSocket = require('ws');

function run_ws() {
  const ws = new WebSocket('ws://example.com/socket');

  ws.on('open', () => {
    console.log('[Worker] WebSocket connected');
  });

  ws.on('message', (data) => {
    console.log('[Worker] Received:', data.toString());
  });
}

// 同步执行
run_ws();
console.log('主线程没有被阻塞');

我们发现直接在 Node.js 的主线程中启动 WebSocket 连接服务是不会阻塞的,因为 Node.js 中的 WebSocket 本身是基于 Node.js 的事件循环来实现的。

5.2 通过 asyncio 启动异步事件循环

通过上一小节我们知道 Python 默认的主线程是没有事件循环的,需要显式创建运行。目前在 Python 3.7+ 推荐通过 asyncio 来启动异步事件循环。

实现很简单:

async def main():
    # 省略...
    try:
        await channel.start()
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

上述代码的转换过程等于:

主线程 (执行Python代码)
    ↓
调用 asyncio.run()
    ↓
主线程内部启动事件循环
    ↓
主线程 = 异步事件循环线程

但现在运行会报错,因为虽然主线程启动了异步事件循环线程,但飞书 SDK 的 lark.ws.Client 内部也使用了异步事件循环,当我们在已有的 asyncio 事件循环中调用它时,会产生冲突。

所以我们需要在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突。修改如下:

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    async def start(self) -> None:
        # 省略...
        def run_ws():
            # 为 WebSocket 客户端所在线程设置专属的事件循环,避免报错
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            import lark_oapi.ws.client
            lark_oapi.ws.client.loop = loop
            # 初始化长连接客户端
            ws_client = lark.ws.Client(
                self.config.app_id, 
                self.config.app_secret, 
                event_handler=handler
            )
            ws_client.start()
        # 在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突
        threading.Thread(target=run_ws, daemon=True).start()
        logger.info("✅ 飞书极简版机器人已启动 (WebSocket)")

        # 保持主协程存活
        while True:
            await asyncio.sleep(1)

主要修改了 FeishuChannel 类的 start 方法,通过 threading.Thread 在独立线程中运行飞书的 WebSocket 客户端,避免与主线程的事件循环冲突,并为飞书 WebSocket 客户端所在线程设置专属的事件循环,避免报错。

5.3 防止 AI 处理阻塞消息接收

无论是 Python 还是 Node.js,都需要防止耗时的 AI 处理阻塞消息接收。所以我们需要修改 _on_message 方法:

class FeishuChannel:
    def __init__(self, config: FeishuConfig):
        # 省略...

    async def start(self) -> None:
        # 省略...

    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        # 省略...

        # 启动独立线程处理 AI 逻辑和回复,防止阻塞 WebSocket 接收循环
        threading.Thread(
            target=self._process_and_reply, 
            args=(msg.chat_id, content)
        ).start()

我们看到在上述代码中启动一个独立线程处理 AI 逻辑和回复,防止阻塞 WebSocket 接收消息循环。

这时我们再次启动代码,我们可以看到 start 方法中的日志同步打印了:

2026-04-05 01:20:01.456 | INFO     | 正在启动飞书机器人长连接...
2026-04-05 01:20:01.457 | INFO     |  飞书极简版机器人已启动 (WebSocket)

6. 总结

至此,我们已经完整实现了通过飞书机器人连接本地 AI Agent。通过上述实践,我们不仅掌握了具体的实现步骤,更重要的是理解了背后的技术原理:

  • Node.js vs Python 事件循环:Node.js 主线程本身就是事件循环线程,天生异步;Python 则需要显式创建事件循环,主线程默认同步执行
  • Python 异步架构:深入理解了 asyncio 事件循环与多线程的协作模式,以及如何通过线程隔离解决 SDK 的冲突

记住:技术的最佳学习方式就是动手实践。现在就去飞书开放平台创建你的应用,启动这个机器人,感受 AI 与即时通讯结合的魅力吧!

我是程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

1.基于依赖追踪和触发的响应式系统的本质

作者 Cobyte
2026年4月1日 19:57

前言

Vue1、Vue3、SolidJS、Mobx 它们的数据响应式基本原理都是一样的,具体区别只是设计理念和实现方式不一样。按以前读书时代考试做题一样,它们是同一类型的题目,都是基于依赖收集和触发的运行时的数据响应式,如果说你只会解答其中一道题,其他的题却不会解答,则说明你并没有真正彻底掌握这一类题。

为什么标题说“基于依赖追踪的响应式系统”,因为在前端响应式的框架有很多,但他们的实现原理却各有不同。 在前端一般谈到响应式框架,可能大家都会不约而同地联想到 Vue,除了 Vue 之外,也许还有人会想到 React 以及 Svelte、SolidJS。他们都有一个共同点,都是通过数据驱动视图。他们在实现方式上又互相有一些相似之处,其中 Vue 和 React 都采用了虚拟DOM技术,Vue 和 SolidJS 的数据响应式实现则都是采用了依赖追踪的方式,所以在数据响应式的实现方面 Vue 和 SolidJS 最相似,而 Svelte 的实现方式则跟 Vue 和 React 都不一样,Svelte 是基于编译响应式。当然 Vue 和 React 的具体实现技术也是不一样的,但它们在宏观层面则是一样,都是通过数据驱动视图,当数据发生变化时,视图会重新渲染,这种机制使得开发者只需要关注数据的变化,而不需要手动操作 DOM。Vue 通过数据劫持,使得操作数据需要额外的 API ,系统变能感知数据的变化,而 React 和 SolidJS 则需要手动调用 API 去触发数据变化。

手动操作 DOM 的上古时代

例如我们现在有一个这样的需求,有一个按钮 <button>0</button>,当我们点击按钮的时候,按钮中的文本就进行加 1。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title>手动操作DOM</title>
  </head>
  <body>
    <button id="btn"></button>
    <script>
        let count = 0
        const btnEl = document.getElementById('btn')
        btnEl.textContent = count
        btnEl.addEventListener('click', function () {
            count++
            btnEl.textContent = count
        })
    </script>
  <body>
</html>

上述的 button 按钮是通过 HTML 进行渲染的,我们还可以通过 JavaScript API 进行创建。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title>手动操作DOM</title>
  </head>
  <body>
-    <button id="btn"></button>
    <script>
        let count = 0
-        const btnEl = document.getElementById('button')
+        const btnEl = document.createElement('button')
+        const textNode = document.createTextNode(count)
+        btnEl.appendChild(textNode)
        btnEl.addEventListener('click', function () {
            count++
            btnEl.textContent = count
        })
+        document.body.appendChild(btnEl)
    </script>
  <body>
</html>

上述这种方式,在项目应用非常庞大的时候,开发效率是非常低下的,同时维护成本却又非常高的,所以就出现了像 React、Vue 这种通过数据进行驱动视图的前端框架。

通过数据驱动视图

虽然 Vue 和 React 在具体的实现技术方案上差异是非常大的,但在宏观层面它们则是一样的,都是通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上,从而提高性能。

不管是 Vue 还是 React 的虚拟DOM 本质上都是一个对象,上面记录着一些真实 DOM 的信息,比如 type、props、children,我们这里简单模拟一下,并通过虚拟DOM 和 Diff 算法来改写上面的例子。

首先我们通过一个 createElement 的函数来创建一个节点的虚拟DOM对象,这里跟 React 的对齐,children 节点在 props 中,Vue 的 children 是跟 props 同级的,这些差别对我们进行宏观研究不重要。

function createElement(type, props) {
    return {
        type,
        props
    }
}

接着我们创建一个函数组件 App。

count = 0
function App (){
    return createElement('button', {
        onClick: () => {
            count ++
            setCount(count)
        },
        children: count
    })
}

接着我们通过以下方式把创建的虚拟DOM 挂载到根节点上。

render(App(), document.getElementById('app'))

接着我们实现 render 方法

let oldVnode
function render(vNode, container) {
    if (!oldVnode) {
        oldVnode = vNode
        const el = document.createElement(vNode.type)
        // 保存真实DOM 到虚拟DOM 的 el 属性上,将来更新的时候,不用重新创建,从而达到提高性能效果
        oldVnode.el = el
        const textNode =  document.createTextNode(vNode.props.children)
        el.appendChild(textNode)
        // 绑定虚拟DOM 上的事件
        el.addEventListener('click', vNode.props.onClick)
        container.appendChild(el)
    } else if(oldVnode.props.children !== vNode.props.children) {
        oldVnode.el.textContent = vNode.props.children
    }
}

我们在这里非常简单且宏观的实现了新老虚拟DOM 的对比,当不存在旧虚拟DOM 则是挂载阶段,创建真实DOM,并保存到虚拟DOM 的 el 属性上,将来更新的时候,不用重新创建,从而达到提高性能效果。在进行新老虚拟DOM 的时候,我们这里只比较 children 一个属性,如果新老 children 不一样就把新的虚拟DOM 上的 children 的值更新到对应的真实DOM 上。

我们在上面的 App 函数中的 props 中的点击事件函数中有一个 setCount 的方法还没实现,它的实现可以抽象成如下:

function setCount(val) {
    count = val
    render(App(), document.getElementById('app'))
}

从上述代码可以看到 setCount 的实现很简单,这个方法就是在点击之后进行更新数据 count 的,并且在更新数据 count 的同时重新渲染视图,把新的 count 值显示到页面上。

以下是测试效果:

00.gif

我们在上述例子中通过极简的代码,从宏观层面阐明了 React 的响应式原理,通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上。

其实上述这段 React 的响应式原理放在 Vue 的响应式原理也是成立的,最大的不同则是 React 更新数据需要通过 setCount 函数,也就是所谓手动触发,而 Vue 则是自动触发的。那么下面我们来看看 Vue 的响应式是怎么实现的。

基于依赖追踪的响应式

我们知道 Vue1 的响应式数据是通过 Object.defineProperty 来实现的。基于 Object.defineProperty 来实现需要初始化对对象的每一个熟悉进行劫持监听。

例如我们有这么这个对象 const data = { count:0 },那么我们需要进行以下操作:

const data = { count: 0 }
Object.keys(data).forEach(key => {
   Object.defineProperty(data, key, {
    get() {
        return data[key]
    },
    set(val) {
        data[key] = val
    }
   }) 
})

上述这个写法会造成内存栈溢出,主要是因为在 Object.defineProperty 的 getter 中读取 data[key] 会触发 getter 循环读取,从而造成死循环。

00.png

我们可以把 getter 中 data[key] 的取值放在进行 Object.defineProperty 监听之前。

const data = { count: 0 }
Object.keys(data).forEach(key => {
+   const val = data[key]
   Object.defineProperty(data, key, {
    get() {
+        return val
    },
    set(val) {
        data[key] = val
    }
   }) 
})

但这样 val 会被循环取值进行了覆盖,没办法正确读取每个 key 的值,为了可以读取每个 key 的值,我们可以通过闭包的形式把每个 key 的值缓存下来。

const data = { count: 0 }
Object.keys(data).forEach(key => {
   defineReactive(data, key, data[key]) 
})

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            return val
        },
        set(newVal) {
            val = newVal
        }
    })
}

接下来我们要做的就是在 getter 中进行依赖收集,然后在 setter 中进行依赖触发,这本质上就是一个订阅发布模式。

const data = { count: 0 }
+ // 声明一个依赖存储中心
+ const subscribers = new Set()
+ // 需要收集的依赖,在 Vue1 叫 wachter,Vue3 中叫 effect,本质上就是一个订阅者,关于发布订阅模式,我们后续再详细介绍
+ let activeEffect
Object.keys(data).forEach(key => {
   defineReactive(data, key, data[key]) 
})

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
+            // 存在依赖就把依赖收集到依赖存储中心
+            if(activeEffect) subscribers.add(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
+            // 值更新了,就需要去把依赖存储中心中的订阅者全部重新执行一遍
+            subscribers.forEach(sub => sub())
        }
    })
}

我们在上述代码中通过一个全局变量 subscribers 在响应式数据的 getter 把依赖收集到 subscribers 中, 在 setter 中则把 subscribers 中收集到的依赖进行循环遍历重新执行一遍,从而实现了依赖追踪和触发。

那么现在我们有了响应式数据 data 之后,我们就可以对我们前面的例子中的 App 函数中的 count 数据进行更改了,我们之前实现的是 React 的方式,我们现在要把它改成 Vue 的方式。

- count = 0
function App (){
    return createElement('button', {
        onClick: () => {
-            count ++
-            setCount(count)
+            data.count ++
        },
-        children: count
+        children: data.count
    })
}

此外渲染函数的执行方式也需要改成一个副作用函数,通过副作用函数进行调用执行。

    activeEffect = () => {
        render(App(), document.getElementById('app'))
    }

    activeEffect()

    activeEffect = null

我们把一个副作用函数赋值给了变量 activeEffect,然后再执行 activeEffect,那么在执行 activeEffect 函数的时候就会去执行 rander 函数,并通过 App 函数生成虚拟DOM,在 App 函数中对虚拟DOM 的 children 属性赋值的时候是通过读取响应式数据 data 中的 count 值,那么这时就会触发 count 属性的 getter,然后就会在 getter 中进行依赖收集,在 getter 中很明显这个时候 activeEffect 是有值的,所以会进行依赖收集。当点击的时候,就会触发 data.count ++ 的执行,这时就会触发 count 属性 setter,然后就会在 setter 中进行依赖触发。

以下是测试效果:

00.gif

至此我们把 Vue 的数据响应式也通过最少的代码量阐明了,以上的 Vue 的响应式原理估计很多同学都非常清楚,因为这是面试被问几率非常高的题目。我在这里重复讲解,是为了对比 React 和 Vue 响应式原理的差别,总的来说,React 和 Vue 的响应式原理在宏观层面是有非常大的相同之处的,都是通过操作数据来驱动视图,开发者只需要关注数据的变化,而不需要手动操作 DOM,同时它们都是通过虚拟DOM 和 Diff 算法进行实现响应式的,当组件的状态发生改变时,Vue 和 React 都会重新生成新的虚拟 DOM,并通过 Diff 算法比较新老虚拟 DOM 的差异,然后只更新需要更新的部分到真实 DOM 上。在宏观层面 React 和 Vue 响应式原理最大的不同则是数据触发方式的不同,React 是数据变更后需要开发者通过手动调用 React 提供的 API 进行触发视图的更新,而 Vue 则是自动触发的,因为 Vue 的状态数据是响应式的,而 React 的状态数据不是响应式的。

我们一般在很多的文章中都只讲了 Vue1 的数据响应式原理是通过 Object.defineProperty 来实现的,那么是否只能通过 Object.defineProperty 来实现呢?很明显不是,我们上述例子中的 data,我们现在如果想给它新增属性 data.name,那么 Object.defineProperty 是无法进行监听追踪的,所以我们通过一个工具来对 data 也进行监听。

function observe (data) {
    // 给对象 data 添加一个属于 data 对象的依赖存储中心
    data.__ob__ = new Set()
    Object.keys(data).forEach(key => {
        const value = data[key]
        defineReactive(data, key, value) 
    })
}

那么在 getter 中要对对象的依赖也进行收集

function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            // 存在依赖就把依赖收集到依赖存储中心
            if(activeEffect) {
                subscribers.add(activeEffect)
                // 如果读取的值是对象,那么还要给这个对象进行依赖收集,并且新的对象也要通过 observe 进行监听
                if(Object.prototype.toString.call(val) === '[object Object]') {
                    observe(val)
                    val.__ob__.add(activeEffect)
                }
            } 
            return val
        },
        set(newVal) {
            val = newVal
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
        }
    })
}

然后我们要专门通过一个单独的 API 来给响应式对象添加属性,并且在添加属性之后进行依赖触发。

function set(target, key, val) {
    target[key] = val
    // 新添加的属性也需要通过 Object.defineProperty 进行监听
    defineReactive(target, key, val)
    const ob = target.__ob__
    // 进行对象的依赖触发
    ob.forEach(sub => sub())
}

接着我们把 App 函数进行以下修改

// 修改数据结构
const data = { data: { count: 0 } }
// 通过 observe 处理
observe(data)
function App (){
    return createElement('button', {
        onClick: () => {
            set(data.data, 'count1', 2)
        },
        children: JSON.stringify(data.data)
    })
}

然后我们重新运行代码结果如下:

01.gif

我们看到可以正常运行,但对象中的私有属性 __ob__ 也显示出来了,我们希望它不要被枚举出来,我们可以通过 Object.defineProperty 对它进行以下设置。

function observe (data) {
    // 给对象 data 添加一个属于 data 对象的订阅者中心
-    data.__ob__ = new Set()
+    Object.defineProperty(data, '__ob__', {
+        value: new Set(), // 属性的值,默认为 undefined
+        enumerable: false, // 属性是否可枚举,默认为 false
+        writable: true, // 值是否可写,默认为 false
+        configurable: true // 属性是否可配置,默认为 false
+    })
    Object.keys(data).forEach(key => {
        const value = data[key]
        defineReactive(data, key, value) 
    })
}

修改后再进行测试,我们可以看到 __ob__ 属性不再出现了。

02.gif

可以通过 Object.defineProperty 对数组进行监听,但监听不了 push、pop、shift 等对数组进行操作的方法,所以我们需要对数组的操作方法进行重写,重写的方法就是覆盖数组数据上的原型对象 __proto__

function observe (data) {
// 省略 ...
+   if (Array.isArray(value)) {
+      // 如果是数组则重新数组上的原型
+      value.__proto__ = {
+          join(val) {
+             // 通过原生数组上方法进行调用
+             return Array.prototype.join.call(value, val)
+          },
+          push(val) {
+             // 通过原生数组上的方法进行调用
+             Array.prototype.push.call(value, val)
+             subscribers.forEach(sub => sub())
+          }
+      }
+   } else {
        Object.keys(data).forEach(key => {
            const value = data[key]
            defineReactive(data, key, value) 
        })
+    }
}

我们这里只测试 join 和 push 方法,而 join 方法没有更改到数据,所以是不用进行依赖触发的。

然后我们对 App 应用也进行修改一下,以便测试数组响应式数据

// 数据
const data = ['cobyte']
// 通过 bserver 处理
observe(data)
function App (){
    return createElement('button', {
        onclick: () => {
            data.push('=')
        },
        children: data.join('-')
    })
}

测试结果如下:

03.gif

小结

至此,我们通过手写已经基本实现了 Vue1 的数据响应式原理,我们可以通过对 Vue2 数据响应式原理的分析进行一个宏观总结。我们需要在实践中总结规律,然后又通过规律更好地指导实践

首先我们都知道 Vue1 的数据响应式原理是通过 Object.defineProperty 实现的,通过 Object.defineProperty 可以监听一个对象的属性的读取(getter)和修改(setter),这样就可以在 getter 的时候进行依赖收集,在 setter 的时候进行依赖触发。但 Vue2 不单单只是通过 Object.defineProperty 实现数据响应式的,因为只有被 Object.defineProperty 初始化了的属性才可以进行监听,而当一个对象新增一个属性时,则监听不了。这时我们需要通过额外的手段来实现对象新增属性时的监听,具体方案就是通过给对象新增一个私有的属性 __ob__,去记录属于该对象的依赖,当该对象新增属性时则触发该对象的依赖重新执行。同时 Object.defineProperty 也监听不了数组的原生方法,例如:push、pop、shift、unshift、splice、sort、reverse,我们观察一下这些数组方法发现都有一个共同特点,就是他们都会修改数组,使数组数据发生变化,那么根据数据响应式的原理,数据发生了改变就需要进行依赖触发,那么我们需要对响应式数据类型为数组的数据进行重写它们的原型,这样我们就可以在响应式数组通过 push、pop、shift、unshift、splice、sort、reverse 方法修改数组的时候进行依赖触发了。

我们可以总结出,不管是通过 Object.defineProperty 进行监听对象属性还是通过给对象添加私有属性 __ob__,去记录该对象的依赖,还是重写数组的原型方法,目的都只有一个:进行数据的依赖追踪和触发。

我们还可以进一步进行总结规律:这种基于依赖追踪的响应式系统,并不是某一种技术,而是一种模式。核心只有一个,就是在数据读取的时候进行依赖收集,在数据更改的时候进行依赖触发。

基于这种指导思想,我们就可以很好去实践 Vue2 的数据响应式原理了。

Vue3 只是通过 Proxy 实现数据响应式吗

Vue3 是通过 Proxy 对数据实现 getter/setter 代理,从而实现响应式数据,然后在副作用函数中读取响应式数据的时候,就会触发 Proxy 的 getter,在 getter 里面把对当前的副作用函数保存起来,将来对应响应式数据发生更改的话,则把之前保存起来的副作用函数取出来执行。这个过程跟 Vue2 是一样的,只是实现细节不一样。

实现起来也非常简单:

function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && subscribers.add(activeEffect)
            return Reflect.get(target, key) 
        },
        set(target, key, val) {
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
            return result
        }
    })
}

我们再把 App 函数进行修改:

const data = reactive({count: 0})
function App (){
    return createElement('button', {
        onClick: () => {
            data.count ++
        },
        children: data.count
    })
}

测试结果如下:

00.gif

我们这里不是为了深入探讨 Vue2 的数据响应式原理的,而是为了验证上面实现 Vue2 的数据响应式原理总结的规律。也就是:这种基于依赖追踪的响应式系统,并不是某一种技术,而是一种模式。核心只有一个,就是在数据读取的时候进行依赖收集,在数据更改的时候进行依赖触发。后续我们基于数据响应式原理的规律便可以很好去理解其他数据响应式系统了,例如 React 的状态管理库——Mobx、SolidJS,我们在后续也将探讨这些库的数据响应式原理的实现。

Vue1 是通过 Object.defineProperty 实现对数据的读写监听,但由于 Object.defineProperty 的局限性,Vue2 并不只是通过 Object.defineProperty 实现数据响应式的,但都为了实现在数据读取时进行依赖收集,在数据更改时进行依赖触发。Vue3 则通过新的 API:Proxy 可以实现对数据的读写监听,但核心也是为了实现在数据读取时进行依赖收集,在数据更改时进行依赖触发。

那么问题来了,Vue1 并不只是通过 Object.defineProperty 实现数据响应式的,那么 Vue3 只是通过 Proxy 实现了数据响应式吗?

其实这个问题可以转化得更具体一些,Vue2 的 reactive 和 ref 的底层实现原理是一样的吗?有人认为 ref 和 reactive 的底层实现原理都是一样的,也就是 ref 也是通过 reactive 实现的,也就是 ref 也是通过 Proxy 实现的。如果说 ref 和 reactive 的底层实现原理不一样的话,也就是说 Vue3 可以不通过 Proxy 实现数据的响应式。

很明显 Vue3 可以不通过 Proxy 实现数据的响应式的,也就是 ref 和 reactive 的底层实现原理是不一样的。那么根据我们上面总结的实践规律,我们只需要可以实现在数据读取的时候进行依赖收集,然后在数据更改的时候进行依赖触发就可以了。那么明显可以使用 Vue2 中的 Object.defineProperty 中的 getter/setter,这种方式也叫属性访问器。根据上面 Vue2 的数据响应式原理我们可以知道如果通过 Object.defineProperty 实现对数据的监听,还要通过闭包的方式,就显得不够简洁。那么属性访问器除了使用 Object.defineProperty 进行显式声明之外,还可以通过字面量的方式,本质还是属性访问器

例如:

function ref(value) {
    return {
        _value: value,
        get value() {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && subscribers.add(activeEffect)
            return this._value
        },
        set value(val) {
            this._value = val
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            subscribers.forEach(sub => sub())
        }
    }
}

然后我们通过 ref 函数来创建一个响应式数据,再修改 App 函数。

const count = ref(0)

function App (){
    return createElement('button', {
        onClick: () => {
            count.value ++
        },
        children: count.value
    })
}

测试运行结果:

00.gif

这也就是 Vue2 的 ref API 的实现原理,当然在 Vue3 源码中如果 ref 传进来的值是一个引用对象的话,还是通过 reactive 进行实现。此外在 Vue3 的源码中 ref API 是通过一个 class 类来实现的,但原理是一样的。

我们下面也可以简单实现一下:

class RefImpl {
    _value
    constructor(value) {
        this._value = value
    }
    get value() {
       // 存在依赖就把依赖收集到依赖存储中心
       activeEffect && subscribers.add(activeEffect)
       return this._value 
    }
    set value(val) {
        this._value = val
        // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
        subscribers.forEach(sub => sub())
    }
}

function ref(value) {
    return new RefImpl(value)
}

修改好的测试结果还是一样的。

00.gif

通过沙箱模式实现依赖追踪的数据响应式

通过上面对 Vue1 和 Vue3 的数据响应式原理的实现与分析,我们知道都借助了 JavaScript 的原生 API(Object.defineProperty 和 Proxy) 来实现依赖追踪的响应式系统,那么不借助 JavaScript 原生 API 还可以实现依赖追踪的响应式系统吗? 我们上面总结出的结论是,基于依赖追踪的响应式系统的本质是在读取数据的时候收集依赖,在更新数据的时候触发依赖。那么基于此原理,我们只需要把读写进行分离那么可以实现了。

我们把上面第一版 ref 的实现通过闭包的形式改造一下:

function ref(value) {
    const s = {
        value
    }

    function getState() {
        return s.value
    }

    function setState(val) {
        s.value = val
    }
    return [getState, setState]
}

const [getState, setState] = ref(0)

console.log('初始值:', getState())
// 修改
setState(1)
console.log('修改后:', getState())

我们可以看到通过闭包的我们实现了读写分离,这种模式有一个专业的术语叫:沙箱模式,这样我们就可以在读取数据的时候收集依赖,在修改数据的时候触发依赖了。

function ref(value) {
    const s = {
        value
    }

    function getState() {
        // 存在依赖就把依赖收集到依赖存储中心
        activeEffect && subscribers.add(activeEffect)
        return s.value
    }

    function setState(val) {
        s.value = val
        // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
        subscribers.forEach(sub => sub())
    }
    return [getState, setState]
}

接着我们把 App 函数也进行修改一下:

const [count, setCount] = ref(0)

function App (){
    return createElement('button', {
        onClick: () => {
            setCount(count() + 0)
        },
        children: count()
    })
}

其实上述这种实现依赖追踪的响应式系统的方式就是 SolidJS 的响应式原理,长得像 React,实际上是 Vue。所以我们只要把核心原理搞清楚,就可以举一反三了,像读书时候一样,以后同类型的题目,你都回作答了。当然 SolidJS 的响应式原理远不止这些,我们将在后续章节继续进行深入探讨,搞明白了 SolidJS, Vue Vapor 的原理也非常容易理解了。

总结

上述所有例子中的依赖收集和触发的过程,本质就是一个发布订阅模式,而关于发布订阅模式,我们将在下一篇文章中进行详细介绍。当我们掌握了发布订阅模式后,我们再去理解这些通过依赖收集和触发实现的数据响应式系统,就会如鱼得水。

上述文章写于:2023 年,由于个人原因今年 2026 年发布。

我是程序员Cobyte,现在已转向研究 AI Agent,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

微信 ClawBot 接入本地 AI Agent 的实现原理

作者 Cobyte
2026年3月31日 09:09

1. 前言

我们知道微信最近推出了微信 ClawBot,用于在微信中与 OpenClaw 收发消息。掘金签约作者群里的一位大佬说:

01.jpg

受到启发,我就去研究了一下微信半公开的官方文档后发现,我们确实也可以通过用微信 ClawBot 接入任何 AI Agent

至于为什么说官方文档是半公开呢,因为官方暂时还没有公开的文档地址,但又可以通过某些渠道看到(怎么可以看到,本文最后揭晓)。

本文就将带你一步步实现如何通过微信 ClawBot 接入自己开发的 AI Agent。其实我们只需要做三件事:

  1. 扫码登录,拿到微信 ClawBot 的身份凭证;
  2. 长轮询等待消息,一有消息立刻获取;
  3. 把消息交给本地 Agent 处理,再把回复发回微信。

第一步,扫码登录。

2. 扫码登录

根据微信 ClawBot 文档的要求,我们先要获取一个二维码,等用户用微信扫描并确认后,服务端就会返回一个 bot_token 的通信凭证,后续所有请求都必须带着这个 token。这个跟我们平时的开发是一样,我们登录之后才能进行操作。

2.1 拉取二维码

首先微信 ClawBot 的接口地址是:

BASE_URL = "https://ilinkai.weixin.qq.com"

其次,登录的第一步是向服务端请求二维码。我们根据微信 ClawBot 的文档可以知道请求的接口是:

ilink/bot/get_bot_qrcode?bot_type=3

请求的 HTTP 方式是 GET。值得注意的是参数 bot_type 在微信 ClawBot 的文档中只出现了在获取二维码的时候,值是 3,而其他枚举值的情况,文档中并没有说明。

因为要用到 HTTP 的 GET 请求,所以我们需要封装一个 GET 请求的方法:

import json
import urllib.request
import urllib.error

# 省略...

def _get(url: str, headers: dict = {}, timeout: int = 35) -> dict:
    """发送 GET 请求"""
    req = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} GET {url}: {e.read().decode(errors='replace')}") from e

接着我们封装拉取二维码的请求函数:

def fetchQRCode():
    base = BASE_URL.rstrip("/") + "/"
    url = base + "ilink/bot/get_bot_qrcode?bot_type=3"
    resp = _get(url)                      # GET 请求
    qrcode_raw = resp.get("qrcode")       # 服务端用于轮询的标识
    qrcode_url = resp.get("qrcode_img_content")   # 可扫描的二维码链接
    return qrcode_raw, qrcode_url

微信 ClawBot 服务端返回的 qrcode_img_content 是一个可以直接扫码的链接。我们在终端里可以通过安装 qrcode 库把它打印成 ASCII 二维码或者直接打印链接让用户打开链接,通过手机微信进行扫码。字段 qrcode 则是服务端的二维码标识,用于后续轮询二维码的状态,是否已经被扫码等。

我们测试一下上述代码:

print(fetchQRCode())

打印结果如下:

('50189c0db1817eb74a2bfc11e4ccdb35', 'https://liteapp.weixin.qq.com/q/7GiQu1?qrcode=50189c0db1817eb74a2bfc11e4ccdb35&bot_type=3')

我们打开上述链接在浏览器打开是一个微信二维码。

image.png

2.2 轮询扫码状态

二维码生成后,我们每隔一秒向微信 ClawBot 提供的二维码状态查询接口进行请求,直到用户完成确认。接着我们封装一个轮询扫码状态的请求接口函数:

def pollQRStatus(qrcode_raw):
    base = BASE_URL.rstrip("/") + "/"
    poll_url = base + f"ilink/bot/get_qrcode_status?qrcode={urllib.parse.quote(qrcode_raw)}"
    deadline = time.time() + 480    # 最多等 8 分钟
    # 这个是微信 ClawBot 规定的,没得解析
    headers = { "iLink-App-ClientVersion": "1" }

    while time.time() < deadline:
        try:
            s = _get(poll_url, headers)
        except Exception as e:
            print(f"  [轮询错误] {e}", flush=True)
            time.sleep(2)
            continue

        status = s.get("status", "wait")

        if status == "wait":
            # 还没扫,继续等,打一个点表示进度
            sys.stdout.write(".")
            sys.stdout.flush()

        elif status == "scaned":
            # 已经扫了,等用户在微信里点确认
            print("\n👀 已扫码,请在微信中点击确认...", flush=True)

        elif status == "confirmed":
            # ✅ 用户点了确认,登录成功!
            token      = s.get("bot_token", "")
            account_id = s.get("ilink_bot_id", "")
            # 账号 ID 规范化:把 @ 和 . 换成 -,例如 abc@im.wechat → abc-im-wechat
            account_id = account_id.replace("@", "-").replace(".", "-")
            real_base  = s.get("baseurl") or BASE_URL
            print(f"\n✅ 登录成功!account_id={account_id}", flush=True)
            return {"token": token, "account_id": account_id, "base_url": real_base}

        elif status == "expired":
            raise RuntimeError("二维码已过期,请重新运行程序。")
        # 每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力
        time.sleep(1)
    raise RuntimeError("登录超时(8分钟),请重试。")

上述函数主要是实现了根据状态码处理不同情况。服务端返回的 JSON 中包含 status 字段,表示当前二维码的状态。我们根据其值进行分支处理:

status == "wait"

  • 表示二维码尚未被扫描。
  • 在终端打印一个点 .(不换行),表示程序仍在等待,给用户视觉反馈。

status == "scaned"

  • 表示用户已经扫描了二维码,但尚未在微信中点击“确认”。
  • 打印提示信息 👀 已扫码,请在微信中点击确认...,告知用户当前进度。

status == "confirmed"

  • 成功状态:用户已确认,登录成功。
  • 从响应中提取 bot_tokenilink_bot_id(机器人唯一标识)、baseurl(可选的后端地址)。
  • 并且对 account_id 进行规范化处理,将 @ 和 . 替换为 -,例如 abc@im.wechat 变为 abc-im-wechat
  • 打印成功信息,并返回一个包含 tokenaccount_idbase_url 的字典,供上层保存和后续请求使用。

status == "expired"

  • 二维码已过期(我们这里设置 8 分钟未扫描或确认即为过期)。
  • 抛出 RuntimeError,提示用户重新运行程序获取新二维码。

最后,每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力。

2.3 实现登录并保存 token 到本地

我们在上面实现了拉取二维码的函数 fetchQRCode 和轮询等待扫码确认的函数 pollQRStatus,我们就可以实现一个登录函数 login 将整个流程串联起来了。实现如下:

def login() -> dict:
    """
    扫码登录,返回 {"token": "...", "account_id": "...", "base_url": "..."}
    """
    # ── 第 1 步:拉取二维码 ──
    [qrcode_raw, qrcode_url] = fetchQRCode() 

    if not qrcode_raw:
        raise RuntimeError(f"获取二维码失败")

    # ── 第 2 步:在终端打印二维码 ──
    print("\n请用微信扫描下方二维码:\n", flush=True)
    try:
        import qrcode                          # pip install qrcode[pil]
        qr = qrcode.QRCode(version=1, border=1)
        qr.add_data(qrcode_url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)            # 用 ASCII 字符在终端渲染,尺寸最小
    except ImportError:
        # 没有安装 qrcode 库时,直接打印链接,用浏览器打开也能扫
        print(f"  {qrcode_url}\n", flush=True)

    # ── 第 3 步:轮询等待扫码 ──
    print("等待扫码...", flush=True)
    return pollQRStatus(qrcode_raw) 

上述登录函数最后返回的数据结构如下:

{
'token': 'd5b3973bb743@im.bot:060000215ac9e1ce7116aeb48b3d998c5b2e4e', 
'account_id': 'd5b3973bb743-im-bot',
'base_url': 'https://ilinkai.weixin.qq.com
'}

为了下次启动不用重新扫码,我们把 token 和 account_id 保存到文件 .weixin_token.json 中:

# token 的本地文件路径
TOKEN_FILE  = Path(__file__).parent / ".weixin_token.json"   # 保存登录后的 token

# 保存 token 和账号
def save_token(data: dict) -> None:
    """把 token 信息保存到本地文件,下次启动不用重新扫码。"""
    TOKEN_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), "utf-8")
    TOKEN_FILE.chmod(0o600)    # 仅当前用户可读,保护 token 安全

这样,下次运行程序时,如果文件存在就直接加载,跳过登录流程。所以我们还需要有一个从本地文件读取上次保存的 token 和账号的函数。实现如下:

def load_token() -> Optional[dict]:
    """从本地文件读取上次保存的 token(如果有的话)。"""
    if TOKEN_FILE.exists():
        try:
            return json.loads(TOKEN_FILE.read_text("utf-8"))
        except Exception:
            pass
    return None

所以整个主流程就是:

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()

    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    # todo run_monitor()

登录完成之后,接下来就是进入消息监听循环了。

3. 长轮询监听循环

拿到 token 后我们就需要发起一个长轮询的 HTTP 接口请求,微信 ClawBot 服务端会“憋着”不返回,直到有新消息或超时(约 35 秒)才返回。这个接口就是:

ilink/bot/getupdates

它采用 POST 方式请求,所以我们需要封装一个 POST 请求的方法,并且微信 ClawBot 的所有 POST 的请求都需要一个通用请求头,通用请求头的要求如下:

Header 说明
Content-Type application/json
AuthorizationType 固定值 ilink_bot_token
Authorization Bearer <token>(登录后获取)
X-WECHAT-UIN 随机 uint32 的 base64 编码

所以我们先封装一个每次请求都需要带的 HTTP 请求头的函数:

def _headers(token: Optional[str] = None) -> dict:
    """
    构造每次请求都需要带的 HTTP 请求头。
    - AuthorizationType: 固定值,告诉服务端这是 bot token 认证
    - Authorization: 登录拿到的 token,未登录时不带
    - X-WECHAT-UIN: 随机数的 base64,模拟微信客户端标识
    """
    h = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        # 随机 uint32 → 十进制字符串 → base64,与原版协议一致
        "X-WECHAT-UIN": base64.b64encode(
            str(struct.unpack(">I", os.urandom(4))[0]).encode()
        ).decode(),
    }
    if token:
        h["Authorization"] = f"Bearer {token}"
    return h

接着我们封装一个通用 POST 请求函数:

def _url(path: str) -> str:
    """拼接完整 URL,确保 BASE_URL 末尾有斜杠。"""
    base = BASE_URL.rstrip("/") + "/"
    return base + path
    
def _post(path: str, body: dict, token: Optional[str] = None, timeout: int = 15) -> dict:
    """
    发送 POST JSON 请求,返回解析后的响应字典。
    所有与微信后端的通信都走这个函数。
    """
    data = json.dumps(body).encode("utf-8")
    req  = urllib.request.Request(
        _url(path),
        data    = data,
        headers = {**_headers(token), "Content-Length": str(len(data))},
        method  = "POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} /{path}: {e.read().decode(errors='replace')}") from e

所有与微信 ClawBot 后端的通信都走这个 _POST 函数。

接着我们封装长轮询接收消息函数:

def getUpdates(token: str, buf: str = "", timeout: int = 35) -> dict:
    """
    长轮询接口:向服务端发请求,服务端"憋着"不回,直到有新消息或超时才返回。

    参数:
      buf     - 上次返回的游标,传给服务端表示"从这里继续",首次传空字符串
      timeout - 等待秒数,服务端通常在 35 秒内有消息就返回,无消息就返回空

    返回值里的重要字段:
      msgs            - 新消息列表(可能为空)
      get_updates_buf - 新游标,下次请求要带上它
    """
    try:
        return _post(
            "ilink/bot/getupdates",
            body    = {"get_updates_buf": buf, "base_info": {"channel_version": "mini-bridge-1.0"}},
            token   = token,
            timeout = timeout + 5,    # 客户端超时比服务端多 5 秒,避免误判
        )
    except (TimeoutError, OSError) as e:
        if "timed out" in str(e).lower():
            # 超时是正常现象,不是错误,直接返回空结果,继续下一轮
            return {"ret": 0, "msgs": [], "get_updates_buf": buf}
        raise

上述 getUpdates 函数返回两个重要的字段 msgs(消息列表)和 get_updates_buf(游标),消息列表我们很好理解,但游标我们需要了解一下。

什么是游标?

游标就是一个字符串,每次 getUpdates 返回时都会同时返回一个新的游标,表示“下一次请求从这里开始”。我们可以把游标持久化到文件 .weixin_buf.txt 中,这样即使程序重启,也能接着之前的位置继续收消息,不会漏掉中间的消息。

游标存储的本地文件路径设置如下:

# 游标存储的本地文件路径
BUF_FILE    = Path(__file__).parent / ".weixin_buf.txt"      # 保存消息游标(断点续传)

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的字段如下:

字段 类型 说明
ret number 返回码,0 = 成功
errcode number? 错误码(如 -14 = 会话超时)
errmsg string? 错误描述
msgs WeixinMessage[] 消息列表(结构见下方)
get_updates_buf string 新的同步游标,下次请求时回传
longpolling_timeout_ms number? 服务端建议的下次长轮询超时(ms)

根据上述的资料我们就可以实现对 getUpdates 接口的长轮询监听循环了,实现如下:

def run_monitor(token: str) -> None:
    """
    长轮询监听循环
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

上述实现 run_monitor 函数目前能长轮询监听微信 ClawBot 服务器消息的接收。主要功能如下:

  • 首先加载之前保存的消息游标(buf)实现断点续传。

  • 进入监听循环:

    1. 调用 getUpdates(token, buf) 获取消息(可能阻塞直到有消息或超时)。
    2. 如果调用失败(异常),则增加失败计数,连续失败 3 次后休眠 30 秒再重试;否则休眠 2 秒后继续。
    3. 如果返回的业务错误码非 0,则打印错误并休眠 2 秒后继续。
    4. 成功获取响应后,提取 get_updates_buf 新游标,更新 buf 并持久化到文件,以便下次重启恢复

我们知道上述 getUpdates 接口还返回了 msgs 消息列表,我们需要遍历返回的消息列表,提取文本,交给本地 AI Agent 进行处理。

4. 处理返回的消息

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的 msgs 消息列表字段结构如下:

字段 类型 说明
seq number? 消息序列号
message_id number? 消息唯一 ID
from_user_id string? 发送者 ID
to_user_id string? 接收者 ID
create_time_ms number? 创建时间戳(ms)
session_id string? 会话 ID
message_type number? 1 = USER, 2 = BOT
message_state number? 0 = NEW, 1 = GENERATING, 2 = FINISH
item_list MessageItem[]? 消息内容列表
context_token string? 会话上下文令牌,回复时需回传

然后字段 item_list(消息内容列表)的字段结构又如下:

字段 类型 说明
type number 1 TEXT, 2 IMAGE, 3 VOICE, 4 FILE, 5 VIDEO
text_item { text: string }? 文本内容
image_item ImageItem? 图片(含 CDN 引用和 AES 密钥)
voice_item VoiceItem? 语音(SILK 编码)
file_item FileItem? 文件附件
video_item VideoItem? 视频
ref_msg RefMessage? 引用消息

根据上述资料我们就可以处理微信 ClawBot 服务器返回的消息了。处理如下:

def run_monitor(token: str) -> None:

    # 省略...

    while True:

        # 省略...

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

实现也很简单,遍历 msgs 消息列表,然后再从消息的 item_list 里找 type=1(文本)的那一项。而非文本消息(图片、语音等)我们暂不处理,先跑通主流程再说。

经过上述处理后我们就拿到了微信 ClawBot 服务器返回的文本消息了,我们接着就把它交给本地 Agent 进行处理。

5. 接入本地 AI Agent

前面的步骤已经实现了本地接收到微信 ClawBot 发来的信息了,现在就需要接入一个本地 AI Agent 来处理微信用户发来的信息了。接入本地 AI Agent 也很简单,我们前面的文章已经实现了一个 Agent Loop,我们直接使用就可以了。

我们定义一个函数 askAgent,用它来管理每个用户的对话历史,并将用户的新消息交给 Agent 处理:

# ── 导入本地 Agent ──
from agent import agent_loop, SYSTEM as AGENT_SYSTEM

# 每个微信用户维护一份独立的对话历史,key 是用户 ID
_sessions: dict[str, list] = {}

def askAgent(user_id: str, user_text: str) -> str:
    """
    把用户的消息交给 Agent 处理,返回 Agent 的回复文本。

    - 每个用户有自己独立的对话历史(_sessions),实现多用户隔离
    - agent_loop 会循环调用大模型直到得到最终回复
    """
    # 第一次对话时,初始化这个用户的历史,带上系统提示词
    if user_id not in _sessions:
        _sessions[user_id] = [{"role": "system", "content": AGENT_SYSTEM}]

    # 把用户这条消息追加到历史
    _sessions[user_id].append({"role": "user", "content": user_text})

    # 交给 Agent 处理,agent_loop 会直接修改传入的列表(追加 assistant 回复)
    try:
        reply = agent_loop(_sessions[user_id])
        return reply or "(无回复)"
    except Exception as e:
        return f"[Agent 出错] {e}"

我们上述函数 ask_agent 实现了把用户的消息交给 Agent 处理,然后返回 Agent 的回复文本,并且还实现每个用户有自己独立的对话历史,实现了多用户隔离。

接下来就是把 Agent 的回复发回微信。

6. 把 Agent 的回复发回微信

回复消息的接口是 ilink/bot/sendmessage,它最重要的参数是 context_token,这个 token 是从收到的消息里原样取出的,服务端依靠它来将回复与对话关联起来(类似于会话 ID)。我们来实现一个 sendMessage 函数进行发送信息:

def sendMessage(token: str, to_user_id: str, text: str, context_token: str) -> None:
    """
    向微信用户发送一条文本消息。

    重要:context_token 必须原样从收到的消息里取出并回传,
    服务端靠它把回复和对话关联起来。没有它,消息发不出去。
    """
    _post(
        "ilink/bot/sendmessage",
        token = token,
        body  = {
            "msg": {
                "from_user_id" : "",                            # bot 发送,留空
                "to_user_id"   : to_user_id,                   # 发给谁
                "client_id"    : f"mini-{secrets.token_hex(8)}",  # 本次消息的唯一ID,防重复
                "message_type" : 2,                            # 2 = BOT 消息
                "message_state": 2,                            # 2 = 消息已完成(非流式)
                "item_list"    : [{"type": 1, "text_item": {"text": text}}],  # type=1 是文本
                "context_token": context_token,                # ← 关键!必须带上
            },
            "base_info": {"channel_version": "mini-bridge-1.0"},
        },
    )

这里我们固定使用 message_type=2(机器人消息)、message_state=2(已完成,非流式)。client_id 是消息的唯一标识,用于去重,这里随机生成即可。

7. 整合运行

现在我们就可以把登录、收消息、处理消息、发消息串起来了,整合运行。

def run_monitor(token: str) -> None:
    """
    长轮询监听循环:持续等待微信消息,收到后交给 Agent 处理并回复。

    整个循环做三件事:
      1. 调 getUpdates() 等消息(服务端"憋着",有消息才返回)
      2. 遍历返回的消息列表,提取文本,交给 ask_agent() 得到回复
      3. 调 sendMessage() 把回复发回给用户

    参数:
      token - 登录后拿到的 bot_token,每次请求都要带上
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

            # 第二件事:把文本交给 Agent,得到回复
            reply = askAgent(from_user, text)
            print(f"[回复] {reply[:60]}", flush=True)

            # 第三件事:把 Agent 的回复发回微信
            try:
                send_message(token, from_user, reply, ctx_token)
                print("[✓] 已发送", flush=True)
            except Exception as e:
                print(f"[✗] 发送失败: {e}", flush=True)

最后,在主函数中,我们读取或登录获取 token,然后启动 run_monitor

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()
    # 不存在 token 就扫码登录
    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    run_monitor(token)

在运行前我们需要安装一下相关依赖。

requirements.txt 内容如下:

openai==2.24.0
itchat-uos>=1.3.10
qrcode_terminal == 0.8.0

然后执行:

pip install -r requirements.txt

接着我们运行上述代码结果显示如下:

image.png

接着我们使用微信扫码结果显示如下:

image.png

我们点击按钮继续,这时可以看到终端显示如下:

image.png

微信端显示如下:

image.png

聊天栏显示:

c4a8f799d97ee7c9a29b75fa359f5263.jpg

这时我们就可以通过微信 ClawBot 和我们本地自己写的 Agent 进行通讯了。比如我们之前实现的一个可以读取本地文件的 AI Agent,我们创建一个测试文件 test.txt,写上以下内容:

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人

然后在微信 ClawBot 中输入:帮我读取 test.txt 的文件内容,显示如下

image.png

终端内容显示如下:

image.png

8. 总结与扩展

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人,它能够:

  • 通过扫码登录
  • 长轮询接收消息
  • 调用任意本地 AI Agent 处理消息
  • 将回复发回给微信用户

整个程序的核心代码不到 200 行,却涵盖了微信 ClawBot 协议的关键点。你可以在此基础上轻松扩展:

  • 支持多轮对话:通过会话历史管理,我们已经实现了多轮对话的基础。
  • 支持图片、语音:解析消息中的 item_list,识别图片或语音,调用相应的 AI 模型(如图像识别、语音转文字)。
  • 支持命令识别:在文本中检测特定前缀(如 /help),触发不同功能。
  • 接入更强大的 Agent:例如集成 LangChain 实现复杂工作流、接入 Ollama 或 vLLM 等本地推理框架运行开源大模型、或增加联网搜索、RAG(检索增强生成)等能力。

最重要的是,这套方法不依赖任何第三方中间件,完全基于微信官方 ClawBot 协议,相对稳定可靠。你只需要一个微信账号,就能让你的 AI 助手 7×24 小时在线。

希望本文能帮你打开一扇窗,让你在微信这个庞大的社交平台上,用自己的 AI 能力创造更多有趣的应用。动手试一试吧,你会发现过程比想象中简单许多!

我是 程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

最后怎么查看微信 ClawBot 的官方文档,可以通过 npm 安装 @tencent-weixin/openclaw-weixin-cli@tencent-weixin/openclaw-weixin 包,然后在 node_modules 目录中找对应的包里面有源码和文档。当然微信团队不公开可能后续会随时改变策略,所以须谨慎评估风险。

❌
❌