阅读视图

发现新文章,点击刷新页面。

从网关的角度理解并实现一个 Mini OpenClaw

1. 前言

OpenClaw 与其他 AI Agent 最本质的区别是什么?首先,OpenClaw 本身也是一个 AI Agent,但关键在于它能连接多种 IM 渠道,并利用这些 IM 工具提供的开发能力来调用自身的 Agent——这种能力被称为“网关”。因此,有后端的技术大咖将 OpenClaw 总结为:OpenClaw = 高权限 AI Agent + 网关

所以只有理解了 OpenClaw 的本质之后,我们才可以实现一个 Mini OpenClaw。

首先我们要实现一个网关,那么网关是什么呢?

网关对于后端的同学来说,肯定不陌生。在 Spring Boot 微服务架构中,API 网关已成为标准的基础设施组件,其核心作用与 OpenClaw 中的“网关”如出一辙:对外隐藏后端的实现细节(服务地址、版本、熔断等),对内统一通信协议,并提供横切能力(如鉴权、限流、日志等) 。两者的区别仅在于作用对象不同——OpenClaw 的网关面向 IM 渠道(消息协议适配),而后端网关面向 HTTP/RPC 调用(协议转换与流量管理)。

所以 OpenClaw 的所谓网关就是一个消息协议适配器。

所以我们先要实现网关最核心的功能:协议适配。这是网关最本质的能力——对外讲 IM 的方言,对内统一说普通话。

2. 网关核心功能:协议适配

不同 IM(飞书、微信 等)的消息格式千差万别:有的用 user_id,有的用 from 字段,有的消息正文可能嵌套在 text 或 message 对象中。我们可以通过设计一个消息协议将这些差异全部“抹平”,这样本地 AI Agent 就只依赖这标准消息协议,无需关心消息来自哪个渠道。

设计一个入站的消息对象 InboundMessage:

# events.py
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class InboundMessage:
    """从聊天频道接收到的消息"""
    channel: str  # 用于区分来源,后续发送回复时需要知道应该调用哪个 IM 的 API(feishu、wechat)
    sender_id: str  # 用户标识符
    chat_id: str  # 聊天/频道标识符
    content: str  # 消息文本
    timestamp: datetime = field(default_factory=datetime.now)  # 消息时间

这样新增一个 IM 渠道时,只需要写一个适配器将私有消息转换成 InboundMessage 即可,其余代码零改动。

简而言之:设计 InboundMessage 就是为了让网关“对外讲方言,对内讲普通话”,所有渠道的消息到达网关后立刻被标准化,Agent 只需处理这一种标准格式。

同样地不同 IM 的发送接口千差万别:飞书需要 receive_id,微信需要 touser,Telegram 需要 chat_id。通过设计一个 OutboundMessage 消息对象,这样 Agent 只需要产出 channelchat_idcontent 三个核心字段,网关再根据 channel 值调用对应的 IM 适配器,由适配器负责转换成目标 IM 的私有请求格式即可。

OutboundMessage 消息对象的字段设计如下:

# events.py
@dataclass
class OutboundMessage:
    """要发送到聊天频道的消息"""
    
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None # 支持引用回复,用于指明当前回复的是哪一条历史消息

网关的输入是 InboundMessage,输出是 OutboundMessage,这样本地 AI Agent 核心只处理这两种标准格式信息,完全不依赖任何 IM 私有 API。这使得添加新 IM 渠道变得非常简单:只需要写一个适配器,将 InboundMessage 解析出来,并将 OutboundMessage 转换成该 IM 的发送请求即可。因为本地 AI Agent 完全不知道自己在和谁在交互,它只看到 InboundMessage/OutboundMessage,这正是网关隐藏后端实现细节的精髓,也是网关本质的体现

3. 网关内部路由:统一通信总线

根据前面的设计,我们已经将各个 IM 渠道的消息统一成了 InboundMessage,并将 Agent 的回复统一成了 OutboundMessage。但仅仅统一格式还不够,还需要解决一个核心问题:多个渠道的消息并发涌入,而 Agent 的处理可能是同步/半异步的,如何让它们有序、可靠、不互相阻塞?

这就需要一个统一通信总线——本质上是一个轻量级的内部消息路由。而最经典、最可靠的实现方式就是双队列解耦

入站异步队列: 渠道 → Agent
出站异步队列: Agent → 渠道

通过双队列把网关内部的“消息流动”标准化为两个 FIFO 管道:

  • 入站异步队列:所有 IM 渠道的消息汇聚点,Agent 从这头取“原材料”。
  • 出站异步队列:所有回复的汇聚点,分发器从这头取“成品”并发送。

为什么需要这样设计?

每个 IM 渠道(飞书、微信等)都有自己的 Webhook 或长连接,当瞬间收到大量消息(例如群聊刷屏)时,如果直接在回调中同步调用 Agent,Agent 处理耗时较长,会导致 Webhook 超时、连接堆积,甚至被 IM 服务器屏蔽。

我们让每个渠道适配器只做最轻量的事情,每当接收到消息时,就只需要解析消息、封装成上述设计的 InboundMessage,然后立即推送到入站异步队列中,马上返回返回即可。而 Agent 的处理则由一个独立的后台协程从入站异步队列中拉取,这样生产者和消费者的速度完全解耦。即使 Agent 处理得慢,队列也能起到“缓冲”作用,不会丢消息。

同时 Agent 只产出上述设计的 OutboundMessage 的数据并推送到出站异步队列中。另一个独立的分发器协程从出站异步队列中取出消息,找到对应的渠道适配器,调用该适配器的发送方法进行发送消息。这样一来,Agent 完全不需要知道消息要发往哪里、怎么发,路由逻辑全封装在网关内部。

统一通信总线代码实现如下:

# message_bus.py
"""用于解耦频道与智能体通信的异步消息队列"""
import asyncio
from loguru import logger
from events import InboundMessage, OutboundMessage

class MessageBus:
    """
    异步消息总线,用于将聊天频道与智能体核心解耦。
    频道将消息推送到入站队列,智能体处理它们并将响应推送到出站队列。
    """
    def __init__(self):
        # 入站异步队列
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        # 出站异步队列
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
    
    async def publish_inbound(self, msg: InboundMessage) -> None:
        """将来自频道的消息发布给智能体"""
        await self.inbound.put(msg)
    
    async def consume_inbound(self) -> InboundMessage:
        """消费下一条入站消息(阻塞直到有消息可用)"""
        return await self.inbound.get()
    
    async def publish_outbound(self, msg: OutboundMessage) -> None:
        """将智能体的响应发布给频道"""
        await self.outbound.put(msg)
    
    async def consume_outbound(self) -> OutboundMessage:
        """消费下一条出站消息(阻塞直到有消息可用)"""
        return await self.outbound.get()

同时入站异步队列和出站异步队列通过 asyncio.Queue 提供。asyncio.Queue 是异步编程中实现生产者-消费者模式的标准工具,它让不同协程之间可以安全、非阻塞地交换数据。在我们上述网关的设计中,正是依赖它实现了入站/出站双队列解耦,从而让多个 IM 渠道可以并发接收消息,同时 Agent 通过并发处理消息,实现效率提高。没有它,你就得自己用锁和条件变量实现类似功能,既复杂又容易出错。

接着我们修改上一篇文章《如何使用飞书机器人连接本地 AI Agent》中实现的飞书连接本地 AI Agent 的飞书频道,实现将来自飞书的消息转发到通信总线。

# feishu.py
+ from events import InboundMessage
+ from message_bus import MessageBus

class FeishuChannel:
    """极简版飞书 WebSocket 长连接机器人"""
+    name = "feishu"
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        self.config = config
        self.bus = bus
        # 省略...

    async def start(self) -> None:
        # 省略...
-    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
+    async def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""
        msg = data.event.message
+        sender = data.event.sender
        # 只处理用户发送的纯文本消息
        if data.event.sender.sender_type == "bot" or msg.message_type != "text":
            return

        content = json.loads(msg.content).get("text", "")
        if not content:
            return
        
+        # 提取发送者信息
+        sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
+        # 获取用于回复的 chat_id
+        chat_id = msg.chat_id
+        chat_type = msg.chat_type  # "p2p" 或 "group"
+        reply_to = chat_id if chat_type == "group" else sender_id
+        # 将消息转发到总线
+        await self._handle_message(
+            sender_id=sender_id,
+            chat_id=reply_to,
+            content=content,
+        )
-        # 启动独立线程处理 AI 逻辑并回复,防止阻塞 WebSocket 接收循环
-        # threading.Thread(
-        #     target=self._process_and_reply, 
-        #     args=(msg.chat_id, content)
-        # ).start()

+    async def _handle_message(
+        self,
+        sender_id: str,
+        chat_id: str,
+        content: str,
+    ) -> None:
+        """
+        处理来自聊天平台的传入消息。
+        此方法将消息转发到总线。
        
+        参数:
+            sender_id: 发送者的标识符。
+            chat_id: 聊天/通道的标识符。
+            content: 消息文本内容。
+        """
        
+        msg = InboundMessage(
+            channel=self.name,
+            sender_id=str(sender_id),
+            chat_id=str(chat_id),
+            content=content
+        )
        
+        await self.bus.publish_inbound(msg)

现在我们已经将飞书发过来的消息推送到通信总线中了,接着我们需要在 Agent 异步处理协程中循环读取总线中的消息进行处理了。

4. 实现并发 Agent Loop

我们上文讲到了通过 asyncio.Queue 实现了入站/出站双队列解耦,从而让多个 IM 渠道可以并发接收消息,同时 Agent 通过并发处理消息,实现效率提高。

但我们前面实现的 Agent Loop 的同步处理数据,所以我们需要重新设计并实现我们的 Agent Loop。

首先我们这个 Agent Loop 需要具备以下功能点:

  1. 持续运行:只要网关没有关闭,Agent Loop 就要一直工作,不能退出。
  2. 响应及时:当有新消息到达时,应尽快开始处理,避免不必要的延迟。
  3. 可优雅停止:外部可以调用 stop() 方法,让循环在安全时机退出,而不是强制杀死协程。
  4. 容错性:单条消息处理失败不应导致整个循环崩溃,并且要能告知用户出错。

那么第一个功能点持续运行,我们可以通过使用一个布尔标志控制循环是否继续。

self._running = True
while self._running:
    # 只要 self._running = True 就一直循环读取通讯总线中的消息进行处理

这样只要 self._running = True 就一直循环读取通讯总线中的消息进行处理。同时我们设计一个 stop() 方法设置 self._running = False,这样外部协程就可以调用 stop() 使得循环将在下一次条件判断时退出。

在读取通讯总线中的消息时,我们需要通过 asyncio.wait_for 实现可中断阻塞读取。即如下实现:

self._running = True
while self._running:
    # 只要 self._running = True 就一直循环读取通讯总线中的消息进行处理
    msg = await asyncio.wait_for(
        self.bus.consume_inbound(), # 本质是 await inbound_queue.get()
        timeout=1.0,
    )

如果不使用 asyncio.wait_for 而是直接使用 await self.bus.consume_inbound() 的话,没有消息就一直等着,那么循环永远不会走到 while self._running 的条件判断。此时调用 stop() 设置 self._running = False 是无效的,因为协程卡在 get() 上,永远没有机会检查 self._running 标志。

而使用 asyncio.wait_for 并设置超时为 1 秒,也就是如果 1 秒内返回了消息,就正常得到 msg。如果 1 秒后队列仍为空,wait_for 会抛出 asyncio.TimeoutError。这样,协程最多阻塞 1 秒就会醒来一次,重新检查 while self._running。因此,即使没有消息,循环也能每秒检查一次退出标志,实现可中断的阻塞读取

根据上述设计我们初步实现 Agent Loop 如下:

import asyncio
import json
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from openai import AsyncOpenAI

from events import InboundMessage, OutboundMessage
from message_bus import MessageBus

load_dotenv()

class AgentLoop:
    def __init__(
        self,
        bus: MessageBus,
        max_iterations: int = 200,
        api_key: str | None = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-chat",
    ):
        self.bus = bus
        # 最大工具调用轮次,防止死循环
        self.max_iterations = max_iterations
        self.model = model
        self._running = False
        # 初始化 OpenAI异步客户端 兼容客户端(如 DeepSeek)
        self.client = AsyncOpenAI(
            api_key=api_key or os.getenv("DEEPSEEK_API_KEY"),
            base_url=base_url,
        )

    # ------------------------------------------------------------------
    # 主循环:持续消费 入站异步队列
    # ------------------------------------------------------------------

    async def run(self) -> None:
        """运行智能体循环,处理来自总线的消息。"""
        self._running = True
        logger.info("Agent loop started")

        while self._running:
            try:
                # 从入站队列消费下一条消息,设置超时以便能定期检查 _running 标志
                msg = await asyncio.wait_for(
                    self.bus.consume_inbound(),
                    timeout=1.0,
                )
                try:
                    # 处理消息并获取响应
                    response = await self._process_message(msg)
                    if response:
                        # 将响应发布到出站队列
                        await self.bus.publish_outbound(response)
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    await self.bus.publish_outbound(
                        OutboundMessage(
                            channel=msg.channel,
                            chat_id=msg.chat_id,
                            content=f"抱歉,处理消息时出错:{e}",
                        )
                    )
            except asyncio.TimeoutError:
                continue

    def stop(self) -> None:
        """停止智能体循环。"""
        self._running = False
        logger.info("Agent loop stopping")

上述的 run 方法需要在一开始就启动,这样才可以实现一有消息就马上处理,而不会漏消息。我们把上一篇讲解实现飞书接入本地 AI Agent 的启动文件 test_feishu.py 重命名为 gateway.py,也就是网关的意思,并且修改其中的启动代码:

+ from message_bus import MessageBus
+ from loop import AgentLoop
async def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxx",         # 替换为真实的 App ID
        app_secret="xxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
+    deepseek_key = os.getenv("DEEPSEEK_API_KEY", "")
+    bus = MessageBus()
+    agent = AgentLoop(
+        bus=bus,
+        api_key=deepseek_key,
+        base_url="https://api.deepseek.com",
+        model="deepseek-chat",
+        max_iterations=20,
+    )
    
    # 2. 初始化频道并启动长连接
-    channel = FeishuChannel(config=config)
+channel = FeishuChannel(config=config, bus=bus)
    
    logger.info("正在启动飞书机器人长连接...")
    
-    # 3. 启动并保持运行
+    # 3. 并发运行
    try:
-        await channel.start()
+        await asyncio.gather(
+            agent.run(),          # 持续消费 inbound 队列,调用 LLM
+            channel.start(),      # 飞书启动
+        )
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

通过上述修改我们就实现了 Agent 和飞书频道在初始化的时候并发运行,从而实现了一开始就监听入站异步队列的消息。

上述 Agent Loop 的 self._process_message 方法是还没实现的,所以我们继续实现 Agent 对消息的处理。本质就是实现大模型的工具调用循环。

在实现 Agent 对消息的处理之前,我们先要重新设计一下会话历史。

5. 会话历史设计

在前面的文章中我们的会话历史就是一个数组,结构如下:

history = [
    {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个助手")},
    {"role": "user", "content": content}
]

后续如果继续有消息就根据角色往数组 history 中追加用户消息和助手消息即可。

但在 OpenClaw 中需要保证不同渠道、不同群、不同用户的历史会话完全隔离。我们可以使用 dict[str, list[dict]] 作为存储结构,相当于在 JavaScript 中设置一个对象,然后通过 key 作为唯一标识进行会话隔离。

key 设计:

这个 key 我们可以设置由 channel + chat_id 组合而成,例如 "feishu:oc_xxx"。然后我们在之前设计的 InboundMessage 对象中设置一个 session_key 方法用于返回会话唯一标识。设置如下:

@dataclass
class InboundMessage:
    # 省略...
    
+    @property
+    def session_key(self) -> str:
+        """用于会话标识的唯一键"""
+        return f"{self.channel}:{self.chat_id}"

value 设计:

value 其实就是上述的历史会话数组,即:

[
    {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个助手")},
    {"role": "user", "content": content}
]

同时我们设计一个 _get_history 的函数来实现对会话历史的懒加载,如果 session_key 不存在,自动创建新列表并插入 system prompt,如果 session_key 存在则返回内部列表的直接引用,调用方可以修改它,即追加消息。这样设计可以避免拷贝带来的性能开销。

实现如下:

# ---------- 会话历史管理(按 session_key 隔离) ----------
# 全局字典:存储所有会话的对话历史
# - Key: session_key,用于唯一标识一个会话(例如 "feishu:chat_id")
# - Value: 消息列表,每个元素是 OpenAI API 兼容的消息字典(包含 role, content 等字段)
_sessions: dict[str, list[dict]] = {}

# 系统提示词:定义 AI 助手的角色、能力和行为准则
SYSTEM_PROMPT = (
    "你是一个智能助手,可以通过工具帮助用户完成任务。"
    "请简洁、准确地回答用户问题。"
)
# 获取会话历史
def _get_history(session_key: str) -> list[dict]:
    # 若为新会话,自动初始化一条包含 system prompt 的消息
    if session_key not in _sessions:
        _sessions[session_key] = [{"role": "system", "content": SYSTEM_PROMPT}]
    # 返回该会话的历史列表(引用,允许外部修改)
    return _sessions[session_key]

6. Agent Loop 的核心:消息处理

在完成了会话历史管理和主循环的可中断阻塞读取之后,Agent Loop 最核心的部分就是 单条消息的处理逻辑——即 _process_message 方法。该方法实现了 ReAct(推理+行动)模式:调用 LLM → 若需要工具则执行工具 → 将结果返回 LLM → 重复直到得到最终答案。下面详细解析其实现:

class AgentLoop:
    # 省略...

    # ------------------------------------------------------------------
    # 单条消息处理:tool-call 循环
    # ------------------------------------------------------------------
    async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
        # 1. 获取当前会话的历史,并追加用户消息
        messages = _get_history(msg.session_key)
        messages.append({"role": "user", "content": msg.content})

        final_content: str | None = None
        # 2. 进入工具调用循环(最多 max_iterations 次)
        for iteration in range(self.max_iterations):
            # 3. 调用 LLM(异步非阻塞)
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages, 
                tools=TOOLS,
                tool_choice="auto",
            )
            assistant_msg = response.choices[0].message

            # 将助手消息追加到历史
            messages.append(assistant_msg)

            # 4. 如果没有 tool_calls,说明任务完成
            if not assistant_msg.tool_calls:
                final_content = assistant_msg.content or ""
                break

            # 5. 执行所有工具调用,并将结果以 role=tool 追加到历史记录
            for tool_call in assistant_msg.tool_calls:
                name = tool_call.function.name
                args = json.loads(tool_call.function.arguments)
                logger.debug(f"Executing tool: {name}, args: {args}")

                result = _execute_tool(name, args)
                logger.debug(f"Tool result: {result[:100]}")

                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "name": name,
                        "content": result,
                    }
                )
        else:
            # 达到最大迭代次数
            final_content = "已达到最大处理轮次,无法给出最终答案。"

        if final_content is None:
            final_content = "处理完成,但没有内容返回。"
        # 6. 构造出站消息返回给用户
        return OutboundMessage(
            channel=msg.channel,
            chat_id=msg.chat_id,
            content=final_content,
        )

上述代码的实现跟我们前面文章实现 Agent Loop 是一样的,所以大家还有不懂的话,可以回看前面文章的详细解析。最最重要的就是最后返回了构造了 OutboundMessage 格式的出站消息,然后在 run 方法中通过 self.bus.publish_outbound(response) 将消息发布到出站队列。

其中工具定义实现如下:

# ---------- 内置工具定义 ----------
TOOLS: list[dict] = [
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "读取本地文本文件内容。",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "文件路径"},
                    "encoding": {
                        "type": "string",
                        "enum": ["utf-8", "gbk"],
                        "description": "文件编码,默认 utf-8",
                    },
                },
                "required": ["path"],
            },
        },
    }
]

def _execute_tool(name: str, arguments: dict) -> str:
    """同步执行内置工具,返回字符串结果。"""
    if name == "read_file":
        from pathlib import Path

        path = arguments.get("path", "")
        encoding = arguments.get("encoding", "utf-8")
        try:
            p = Path(path).expanduser()
            if not p.exists():
                return f"❌ 文件不存在: {path}"
            return p.read_text(encoding=encoding)
        except Exception as e:
            return f"❌ 读取失败: {e}"
    return f"❌ 未知工具: {name}"

我们这里先只实现一个读取文件内容的工具,后续再实现更多的工具。

7. 构建网关的渠道层

7.1 为什么需要渠道层?

在上一小节中,我们实现在 Agent 中构造了 OutboundMessage 格式的出站消息,然后将消息发布到出站队列中。但还缺少关键的一环:出站异步队列中的消息由谁来消费?如何将 Agent 的回复正确地发送回原来的聊天频道?

我们知道每个即时通讯平台都有自己独特的 API 协议,如果让 Agent 直接处理这些差异,会导致 Agent 逻辑中混杂大量渠道特定代码,每增加一个渠道就要修改 Agent 核心逻辑,这会造成维护噩耗。

所以我们需要构建一个 渠道管理器(ChannelManager),作为网关的出站交通枢纽,负责管理所有 IM 适配器的生命周期,并将出站消息路由到正确的渠道。具体需要实现以下功能:

  1. 注册与管理渠道实例

    • 运行时动态注册各个渠道
    • 维护渠道状态信息
    • 提供统一的渠道访问接口
  2. 协调启动与停止流程

    • 控制渠道启动顺序,避免竞态条件
    • 实现优雅停止,防止消息丢失
    • 处理异常情况下的资源清理
  3. 消息路由与派发

    • 根据消息的 channel 字段路由到正确渠道
    • 调用渠道的发送方法
    • 实现错误隔离和重试机制

7.2 渠道层的设计与实现

如果把整个网关系统比作一个繁忙的交通枢纽,那么渠道层就是站在十字路口中央的交警。它不亲自运送货物,但指挥着所有运输车辆有序通行。

具体来说,渠道层连接着:

  • 上游:内部消息总线(MessageBus),接收标准化的出站消息
  • 下游:各个 IM 渠道适配器(FeishuChannel、WechatChannel 等)

我们先实现一个 ChannelManager 类,并实现数据结构与初始化。代码如下:

import asyncio
from loguru import logger
from message_bus import MessageBus
from feishu import FeishuChannel


class ChannelManager:
    def __init__(self, bus: MessageBus):
        self.bus = bus
        # 存储已注册的渠道适配器,key 为渠道名称(如 "feishu")
        self.channels: dict[str, FeishuChannel] = {}
        # 出站分发器的任务句柄,用于优雅停止
        self._dispatch_task: asyncio.Task | None = None

ChannelManager 的核心数据结构 channels 是一个字典: channel_name → 适配器实例

  • Key = 渠道名称(如 "feishu"、"wechat")
  • Value = 渠道实例对象

这个设计实现了运行时动态注册,可以在不重启服务的情况下添加新渠道。

接着我们来实现注册渠道功能:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        """注册一个渠道适配器。要求该适配器必须有 name 属性和 send 方法。"""
        self.channels[channel.name] = channel
        logger.info(f"Channel registered: {channel.name}")

上述注册渠道的代码实现看起很简单,其实背后的设计原理一点也不简单。它应用了工厂模式 + 依赖注入的设计模式。

  1. 工厂模式体现在:渠道的创建由外部完成,ChannelManager 只负责使用
  2. 依赖注入体现在:渠道实例通过 register() 方法注入,而非在 ChannelManager 内部创建

我们已经实现了一个飞书渠道 FeishuChannel,所以现在需要通过以下方式进行注册飞书渠道:

manager.register(FeishuChannel(...))

同时将来如果我们想新增一个微信渠道,就可以这样实现了,先实现一个 WechatChannel,然后:

manager.register(WechatChannel(...))

这样网关核心代码零改动,真正实现了"开闭原则":对扩展开放,对修改关闭。

接着实现启动所有已注册的频道以及出站分发器。

代码实现如下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        # 省略...
    async def start_all(self) -> None:
        """启动所有已注册的频道以及出站分发器。"""
        if not self.channels:
            logger.warning("No channels registered")
            return

        # 先启动出站分发器协程(确保一有出站消息就能被处理)
        self._dispatch_task = asyncio.create_task(self._dispatch_outbound())

        # 并发启动所有渠道(每个渠道的 start 方法负责建立长连接或监听 Webhook)
        tasks = []
        for name, channel in self.channels.items():
            logger.info(f"Starting {name} channel...")
            tasks.append(asyncio.create_task(channel.start()))

        # 注意:通常渠道的 start 会永久阻塞(如 WebSocket 循环),因此 gather 不会返回
        await asyncio.gather(*tasks, return_exceptions=True)

我们上述的代码实现了一个看似简单却至关重要的设计决策,就是先启动分发器再启动渠道。那么为什么先启动分发器再启动渠道呢?

主要是为了防止消息丢失与响应延迟。让我们分析两种启动顺序的后果:

场景 A:先启动渠道,后启动分发器 时间线:

  1. 飞书渠道启动成功 ✓
  2. 用户立即发送消息:"你好"
  3. Agent 快速处理,生成回复:"你好!我是AI助手"
  4. 回复进入出站队列...
  5. 但是!分发器还没启动 ❌
  6. 回复消息在队列中堆积
  7. 用户等待...等待...(用户体验差)

场景 B:先启动分发器,后启动渠道(我们采用的方式) 时间线:

  1. 分发器启动,开始监听出站队列 ✓
  2. 飞书渠道启动成功 ✓
  3. 用户发送消息:"你好"
  4. Agent 处理,生成回复:"你好!我是AI助手"
  5. 回复进入出站队列
  6. 分发器立即发现新消息 ✓
  7. 路由到飞书渠道,立即发送 ✓
  8. 用户秒级收到回复(体验流畅)

在实际的生产环境经验中,"空转等待"比"忙中丢消息"要好得多。分发器提前就位,就像快递员提前在仓库门口等待,包裹一出来就能立即配送。

接着我们实现出站消息分发器

代码实现如下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        # 省略...
    async def start_all(self) -> None:
        # 省略...
    async def _dispatch_outbound(self) -> None:
        """
        出站分发器:持续消费 outbound 队列,将消息发送到对应的渠道。
        这是一个后台协程,在 start_all 时启动。
        """
        logger.info("Outbound dispatcher started")

        while True:
            try:
                # 可中断阻塞读取,每隔1秒检查一次取消信号
                msg = await asyncio.wait_for(
                    self.bus.consume_outbound(),
                    timeout=1.0,
                )
                # 根据消息中的 channel 字段找到对应的适配器
                channel = self.channels.get(msg.channel)
                if channel:
                    try:
                        # 调用适配器的 send 方法(各渠道自己实现转换和发送逻辑)
                        await channel.send(msg)
                    except Exception as e:
                        logger.error(f"Error sending to {msg.channel}: {e}")
                else:
                    logger.warning(f"Unknown channel: {msg.channel}")

            except asyncio.TimeoutError:
                # 超时不是错误,只是没有消息,继续循环
                continue
            except asyncio.CancelledError:
                break

我们上一小节中所说的先启动分发器,本质就是通过 while True 不断循环使用 asyncio.wait_for 消费 outbound 队列,然后根据 msg.channel 路由并调用 send 方法。

设计亮点:

  1. 拉模式(Pull)而非推模式(Push)

    • 主动从消息队列拉取消息,控制权在自己手中
    • 相比回调式的推模式,更容易控制消费速率和错误处理
  2. 可中断的事件循环

    • timeout=1.0 让循环能定期"抬头看路",检查是否有停止信号
    • 没有这个超时,任务会一直阻塞在 consume_outbound() 上,难以优雅停止

接着我们继续实现渠道的发送方法,这是协议翻译的最后一步。

为了让 ChannelManager 能够统一管理,每个 IM 适配器必须实现以下两个成员:

  1. name: str:渠道唯一标识(如 "feishu")。
  2. async send(msg: OutboundMessage) -> None:发送回复的方法。

以飞书适配器为例,我们之前已经定义了 name = "feishu",现在补充 send 方法的实现:

class FeishuChannel:
    # 省略...
    async def send(self, msg: OutboundMessage) -> None:
        """通过飞书发送消息。"""
        if not self._client:
            logger.warning("飞书客户端未初始化")
            return

        try:
            # 根据 chat_id 格式确定 receive_id_type
            # open_id 以 "ou_" 开头,chat_id 以 "oc_" 开头
            if msg.chat_id.startswith("oc_"):
                receive_id_type = "chat_id"
            else:
                receive_id_type = "open_id"

            # 构建文本消息内容
            content = json.dumps({"text": msg.content})

            request = CreateMessageRequest.builder() \
                .receive_id_type(receive_id_type) \
                .request_body(
                    CreateMessageRequestBody.builder()
                    .receive_id(msg.chat_id)
                    .msg_type("text")
                    .content(content)
                    .build()
                ).build()

            # OpenAPI 调用是同步的,在线程中运行以避免阻塞
            response = await asyncio.to_thread(
                self._client.im.v1.message.create, request
            )

            if not response.success():
                logger.error(
                    f"发送飞书消息失败:code={response.code}, "
                    f"msg={response.msg}, log_id={response.get_log_id()}"
                )
            else:
                logger.debug(f"飞书消息已发送至 {msg.chat_id}")

        except Exception as e:
            logger.error(f"发送飞书消息时出错:{e}")

本质是就是将我们上一篇文章中的 FeishuChannel 类中 _process_and_reply 方法改成 send 方法即可。这样,ChannelManager 就可以统一调用 await channel.send(msg),完全不需要关心飞书 API 的具体细节。

8. 集成到网关启动入口

现在,我们将 MessageBus、AgentLoop、FeishuChannel 和 ChannelManager 全部串联起来。实现如下:

# gateway.py
import os
from loguru import logger
from feishu import FeishuChannel, FeishuConfig
from message_bus import MessageBus
from loop import AgentLoop
from manager import ChannelManager

async def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxx",         # 替换为真实的 App ID
        app_secret="xxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
    deepseek_key = os.getenv("DEEPSEEK_API_KEY", "")
    # 2. 创建总线
    bus = MessageBus()
    # 3. 创建 Agent 循环
    agent = AgentLoop(
        bus=bus,
        api_key=deepseek_key,
        base_url="https://api.deepseek.com",
        model="deepseek-chat",
        max_iterations=20,
    )
    
    # 4. 创建飞书渠道(传入总线,以便它 publish_inbound)
    feishu_channel = FeishuChannel(config=config, bus=bus)
    # 5. 创建渠道管理器,并注册飞书渠道
    channels = ChannelManager(bus=bus)
    channels.register(feishu_channel)
    
    logger.info("正在启动 Mini OpenClaw 网关...")
    
    # 6. 并发运行
    try:
        await asyncio.gather(
            agent.run(),          # 持续消费 inbound 队列,调用 LLM
            channels.start_all(), # 飞书长连接 + 出向派发器
        )
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("收到退出信号,正在关闭...")
        agent.stop()
        await channels.stop_all()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

至此整个网关的运行流程如下:

1. 网关“通电”

  • 我们启动 manager.start_all(),它立刻做了两件事:
    • 先派一个“快递员”(_dispatch_outbound 后台任务)守在 发件箱(outbound 队列) 旁边,随时准备把回复送出去。
    • 然后接通 飞书这个“电话线”feishu_channel.start()),开始等待用户发消息。

2. 用户发来消息

  • 用户在飞书群里说了一句“帮我读一下 /tmp/note.txt”。
  • 飞书适配器收到这条“方言消息”,立即翻译成网关内部的 普通话(InboundMessage),然后丢进 收件箱(inbound 队列)

3. Agent 大脑开始思考

  • agent.run() 一直在盯着 收件箱,一看到有新消息就取出来。
  • 它调用大模型并可能执行工具(比如读取文件),最终生成一段回复文本。
  • 然后把回复包装成 标准包裹(OutboundMessage),扔进 发件箱(outbound 队列)

4. 快递员送货

  • 守在 发件箱 旁边的快递员(_dispatch_outbound)发现新包裹,看看上面写的“收件渠道”是 feishu
  • 他马上找到飞书适配器,把包裹交给它:“请发到这个 chat_id 的群里”。
  • 飞书适配器又把回复从 普通话 翻译回 飞书的方言,调用飞书 API 发回群里。

5. 用户看到回复

  • 用户收到助手返回的文件内容,整个流程结束。

我们上述的 channels.start_all() 方法是还没实现的,我们实现一下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    async def start_all(self) -> None:
        # 省略...
    async def stop_all(self) -> None:
        """优雅停止所有渠道和出站分发器。"""
        logger.info("Stopping all channels...")

        # 第一阶段:取消出站分发器任务
        if self._dispatch_task:
            self._dispatch_task.cancel()
            try:
                await self._dispatch_task
            except asyncio.CancelledError:
                pass

        # 第二阶段:逐个停止渠道(每个渠道的 stop 方法应关闭连接、释放资源)
        for name, channel in self.channels.items():
            try:
                await channel.stop()
                logger.info(f"Stopped {name} channel")
            except Exception as e:
                logger.error(f"Error stopping {name}: {e}")

实现也很简单,首先停止出站分发器的任务,再逐个停止渠道的连接,释放资源。

接着我们启动网关:

python gateway.py

启动结果如下:

01.png

然后我们接着在上一篇文章中设置了的飞书机器人中进行发消息。

然后我们发现报错了:

image.png

报错原因是因为飞书 SDK 的 register_p2_im_message_receive_v1 要求注册一个同步回调函数(不能是 async def),但消息处理逻辑(如解析内容、发布到 MessageBus)是异步的。因此,我们需要实现一个跨线程调度适配器,用于将飞书 WebSocket 线程中的同步回调安全地桥接到 asyncio 主事件循环。

9. 跨线程调度适配器

首先我们需要保存主事件循环对象,我们是在网关启动文件 gateway.py 中通过 asyncio.run(main()) 启动的主循环。因为飞书 WebSocket 客户端运行在一个独立的后台线程中(见 threading.Thread(target=run_ws, daemon=True).start()),它的回调需要一个同步函数,但真正的消息处理逻辑 _on_message 是一个异步协程,需要被提交到主事件循环中执行,因为 MessageBus 等组件是绑定到主循环的。为了从另一个线程安全地将协程投递到主事件循环,就需要持有主事件循环的引用

先保存主事件循环对象:

class FeishuChannel:
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        self.config = config
        self.bus = bus
+        self._loop = None
        self._client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .build()

    async def start(self) -> None:
        # 省略...
+        # 保存主事件循环对象
+        self._loop = asyncio.get_running_loop()
        def run_ws():
            # 省略...

接着我们创建了一个同步函数 _on_message_sync 作为 register_p2_im_message_receive_v1 的实际回调,然后在 _on_message_sync 中将真正异步的处理函数 _on_message 调度到主事件循环中执行。实现如下:

def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None:
    try:
        if self._loop and self._loop.is_running():
            # 将异步处理函数调度到主事件循环
            asyncio.run_coroutine_threadsafe(
                self._on_message(data),
                self._loop
            )
        else:
            # 备用方案:在新事件循环中运行
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._on_message(data))
            finally:
                loop.close()
    except Exception as e: logger.error(f"处理飞书消息时出错:{e}")

接着我们修改 register_p2_im_message_receive_v1 的实际回调函数为上述我们实现的 _on_message_sync

class FeishuChannel:
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        # 省略...
    async def start(self) -> None:
        # 省略...
        # 注册接收消息事件处理函数 im.message.receive_v1
-        handler = builder.register_p2_im_message_receive_v1(self._on_message).build()
+        handler = builder.register_p2_im_message_receive_v1(self._on_message_sync).build()
        # 保存主事件循环对象
        self._loop = asyncio.get_running_loop()

总的来说就是在主事件循环中“记住”主循环对象,供后续其他线程通过 asyncio.run_coroutine_threadsafe 将协程调度回主循环执行,是实现跨线程异步任务调度

同时当主事件循环不存在时创建一个全新的临时事件循环,在当前线程(WebSocket 线程)中同步运行 self._on_message(data),执行完毕后关闭循环。

经过上述迭代后,我们再次启动我们的程序:python gateway.py

然我们再在飞书设置的 AI 机器人上跟我们的 Mini OpenClaw 进行对话,结果如下:

1cbfafacd6d84ef03bd64151f081c17a.jpg

然后我们再根目录下创建一个 test.txt 文件,内容为:“从网关的角度理解并实现一个 Mini OpenClaw”,然后在飞书设置的 AI 机器人输入:“帮我读取 test.txt 文件”,结果如下:

e85ee7fd4d5df8c7fa605994b44a19e4.jpg

至此我们的 Mini OpenClaw 就实现了。

10. 总结

经过上述文章我们可以更加透彻地理解为什么说 OpenClaw 可以简单总结为“高级 Agent + 网关”了。它把飞书、微信这些聊天软件的“方言消息”统一通过一个网关转成内部能听懂的“普通话”(InboundMessage),Agent 只处理这种标准消息。

为了防止消息太多堵死系统,用了两个队列(入站异步队列出站异步队列,相当于收信箱和发件箱)把接收和回复解耦开,像流水线一样互不干扰。Agent 处理完后把回复扔进发件箱,再由分发器根据渠道标签(feishu、wechat)转回对应平台的格式发回去。

这样一来,添加新平台就像加个翻译插件,核心代码完全不用动。最后用跨线程调度解决了飞书回调异步的问题。整个网关跑起来就是:用户发消息 → 标准化 → 入站队列 → Agent 思考(可调用工具)→ 出站队列 → 翻译回原平台 → 用户收到回复

上述实现也是港大开源的 Nanobot 的核心实现,Nanobot 可以说是 Python 版的 OpenClaw,是学习研究场景的轻量选择。

我是程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

4.响应式系统基础:从发布订阅模式的角度理解 Vue3 的数据响应式原理

前言

我们从前面的文章中知道的所谓发布订阅模式的本质是不管代码结构如何变化,它的核心都是管理对象间的依赖关系,或者说是事件间的依赖关系,一方变化了,所有跟其建立依赖关系的依赖都将得到通知。同时发布者对象既可以是发布者也可以是订阅者,所以我们不能只从代码组织结构去分辨模式,而是从意图去分辨。

Vue2 的数据响应式的实现,在代码结构层面多少是看得出有经典发布订阅模式的架构影子,所以社区里也有人从发布订阅模式角度去分析过,但 Vue3 的数据响应式的实现从代码结构上来看跟所谓标准的发布订阅模式的代码架构差别是很大的。一般社区作者也不从发布订阅模式的角度去分析它的实现原理,那么今天就让我们从发布订阅模式的角度去理解 Vue3 的数据响应式原理吧。

发布订阅模式原理回顾

我们经过前面的学习,我们很容易通过发布订阅模式初步实现 Vue3 的 reactive API,代码如下:

class Dep {
  constructor() {
    // 订阅者存储中心
    this.subs = []
  }
  // 添加订阅者
  addSub(sub) {
    this.subs.push(sub)
  }
  // 通知订阅者
  notify() {
    this.subs.forEach(sub => sub())
  }
}
const dep = new Dep()
let activeEffect
// reactive
function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
            activeEffect && dep.addSub(activeEffect)
            return Reflect.get(target, key) 
        },
        set(target, key, val) {
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
            dep.notify()
            return result
        }
    })
}

我们就可以进行以下测试了:

const proxy = reactive({ author: 'Cobyte' })

// 订阅者
const subscriber = () => {
    console.log(`我是:${proxy.author}`)
}

activeEffect = subscriber
subscriber()
activeEffect = null

// 修改
proxy.author = 'coboy'

根据上一篇 Vue2 的数据响应式原理的实践,我们可以做小小的优化:

class Dep {
  // 省略...
-  addSub(sub) {
+  addSub() {
    if (activeEffect) {
-        this.subs.push(sub)
+        this.subs.push(activeEffect)
    }
  }
  // 省略...
}
// 省略...
function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
-            activeEffect && dep.addSub(activeEffect)
+            dep.addSub()
            return Reflect.get(target, key) 
        },
        // 省略... 
    })
}

我们上面 reactive 的实现,每个订阅者还不能进行跟每个对象的属性进行隔离的。什么意思呢?看以下测试代码:

const proxy = reactive({ author: 'Cobyte', date: '2024-03-05' })

// 订阅者
const subscriber = () => {
    console.log(`我是:${proxy.author}`)
}
// 订阅者2
const subscriber2 = () => {
    console.log(`日期是:${proxy.date}`)
}
activeEffect = subscriber
subscriber()
activeEffect = subscriber2
subscriber2()
activeEffect = null

// 修改
proxy.author = 'coboy'

测试结果如下:

D01.png

我们可以看到最后修改 author 属性值的时候,两个订阅者函数都执行了。是因为我们在 getter 进行订阅的时候,把不同属性的订阅者都存储在同一个全局变量中了,而在 Vue2 中把每一个属性的消息代理都通过闭包进行了隔离,也就是每一个属性都拥有属于自己的消息代理,相当于每一个属性都是一个发布者。

而 Vue3 中的 Proxy API 很明显不能通过闭包来进行隔离每个属性的消息代理。那么我们根据前面的发布订阅模式的实践理解,还可以通过给消息代理对象通过添加 key 的方式来让订阅者只订阅自己感兴趣的内容。

那么相关代码修改如下:

class Dep {
  constructor() {
    // 订阅者存储中心
-    this.subs = []
+    this.subs = {}
  }
  // 添加订阅者
-  addSub() {
+  addSub(key) {
+    if (!this.subs[key]) {
+        this.subs[key] = []
+    }
    if (activeEffect) {
-    this.subs.push(sub)
+    this.subs[key].push(activeEffect)
    }
  }
  // 通知订阅者
-  notify() {
+  notify(key)
-    this.subs.forEach(sub => sub())
+    this.subs[key].forEach(sub => sub())
  }
}
const dep = new Dep()
let activeEffect
// reactive
function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
-            dep.addSub()
+            dep.addSub(key)
            return Reflect.get(target, key) 
        },
        set(target, key, val) { 
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
-            dep.notify()
+            dep.notify(key)
            return result
        }
    })
}

我们经过上面的修改再进行测试,我们发现已经可以正确打印我们期待的结果了。

D02.png

我们上面实现的 reactive 函数还存在一个问题,我们现在可以通过 key 来把不同订阅者进行分类,但不同的对象中可能会存在相同的 key,例子如下:

const proxy = reactive({ author: 'Cobyte', date: '2024-03-05' })

const proxy2 = reactive({ author: 'Cobyte2' })
// 订阅者
const subscriber = () => {
    console.log(`我是:${proxy.author}`)
}
// 订阅者2
const subscriber2 = () => {
    console.log(`我是:${proxy2.author}`)
}
activeEffect = subscriber
subscriber()
activeEffect = subscriber2
subscriber2()
activeEffect = null

// 修改
proxy.author = 'coboy'

测试结果如下:

D03.png

我们发现我们只修改了 proxy.author 的值,但订阅者2 subscriber2 也执行了,这不是我们期待的结果,所以我们还要迭代我们的功能。

我们既然可以添加 key 来让订阅者订阅自己喜欢的内容,那么是否还可以进行增加 key, 来区分不同的对象呢?我们把对象也当成一个 key,也就是在 getter 添加依赖的时候这样操作:dep.addSub(target, key, activeEffect),那么在 setter 的时候这样操作:dep.notify(target, key)。很明显我们可以通过 Map 来把一个对象作为一个 key。

所以我们对消息代理中心做以下修改:

class Dep {
  constructor() {
    // 订阅者存储中心
-    this.subs = {}
+    this.subs = new Map()
  }
  // 添加订阅者
-  addSub(key) {
+  addSub(target, key) {
+    let depsMap = this.subs.get(target)
+    if (!depsMap) {
+        depsMap = {}
+        this.subs.set(target, depsMap)
+    }
    
-    if (!this.subs[key]) {
-      this.subs[key] = []
-    }
+    if (!depsMap[key]) {
+        depsMap[key] = []
+    }
     if (activeEffect) {
-    this.subs[key].push(activeEffect)
+    depsMap[key].push(activeEffect)
     }
  }
  // 通知订阅者
-  notify(key) {
+  notify(target, key) {
-    this.subs[key].forEach(sub => sub())
+    const depsMap = this.subs.get(target)
+    if (!depsMap) return
+    const deps = depsMap[key] 
+    deps && deps.forEach(sub => sub())
  }
}

接着我们也去修改 reactive 中相关的地方:

function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
-            dep.addSub(key)
+            dep.addSub(target, key)
            return Reflect.get(target, key) 
        },
        set(target, key, val) {
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
-            dep.notify(key)
+            dep.notify(target, key)
            return result
        }
    })
}

我们重新测试,我们发现打印了如期的结果:

D04.png

桶的数据结构设计?

我们看到通过发布订阅模式去理解 Vue3 的数据响应式原理,理解起所谓依赖数据结构 ,是非常好理解的。我们通过由浅入深的地讲解所谓 数据结构的形成,它的形成是自然而然的形成的,而不是一开始就经过特别精心设计的,它没有那么的神秘,它是由最简单的功能一步步迭代形成的,是非常符合我们的日常开发规律的,因为我们日常的应用也是由最简单的功能开始慢慢迭代成非常复杂的功能。一开始所谓 ,只是一个 Array (可以简单理解为:[]) 的结构,后来我们通过增加 key 来区分不同的订阅者,这行为在发布订阅模式中就是通过 key 来让订阅者只订阅自己感兴趣的内容;增加 key 后, 的数据结构变为 Object -> Array (可以简单理解为:{ key: [] }),再后来我们继续增加响应式对象作为 key,来区分不同的属性,避免不同响应式数据中可能存在相同属性的情况。最后我们的 的数据结构变为 Map -> Object -> Array (可以简单理解为:{ target: { key: [] } })。

我们熟悉 Vue3 源码的同学会知道,所谓 的数据结构跟我们上面还是区别的,那其实都是性能优化迭代的结果,我们也可以继续迭代我们的功能。首先是我们的订阅者是通过 Array 的方式存储的,为了防止重复添加订阅者,我们需要在执行完订阅者函数之后把 activeEffect 变量设置为 null,同时也是为了确保只在副作用函数中读取响应式变量才进行依赖收集。我们可以把订阅者的存储方法改成 Set 的数据结构,因为 Set 具有自动去除重复的功能。

相关代码修改如下:

class Dep {
  // 省略...
  addSub(target, key, sub) {
    // 省略...
    if (!depsMap[key]) {
-        depsMap[key] = []
+        depsMap[key] = new Set()
    }
    if (activeEffect) {
-    depsMap[key].push(activeEffect)
+    depsMap[key].add(activeEffect)
    }
  }
  // 省略...
}

经过上面修改,我们的 结构变成了 Map -> Object -> Set。我们还可以继续优化,我们可以把中间的 Object 改成 Map,因为在频繁增删键值对和存储大量数据的场景下 Map 的性能要比 Ojbect 更好。

class Dep {
  // 省略...
  addSub(target, key, sub) {
    let depsMap = this.subs.get(target)
    if (!depsMap) {
-        depsMap = {}
        depsMap = new Map()
        this.subs.set(target, depsMap)
    }
+    let dep = depsMap.get(key)
-    if (!depsMap[key]) {
+    if (!dep) {
-        depsMap[key] = new Set()
+        dep = new Set()
+        depsMap.set(key, dep)
+    }
    if (activeEffect) {
-    depsMap[key].add(activeEffect)
+    dep.add(activeEffect)
    }
  }
  // 通知订阅者
  notify(target, key) {
    const depsMap = this.subs.get(target)
    if (!depsMap) return
-    const deps = depsMap[key] 
+    const deps = depsMap.get(key) 
    deps && deps.forEach(sub => sub())
  }
}

最后我们还可以继续优化的地方就是把存储订阅者的变量 this.subsMap 类型改成 WeakMap 类型。

class Dep {
  constructor() {
    // 订阅者存储中心
-    this.subs = new Map()
+    this.subs = new WeakMap()
  }
}

为什么不采用 WeakMap 而不采用 Map 呢?我们通过下面的一个例子来说明:

const map = new Map()
const weakMap = new WeakMap()

function test() {
    const mapObj = { test: 'mapObj' }
    const weakMapObj = { test: 'weakMapObj' }
    map.set(mapObj, true)
    weakMap.set(weakMapObj, true)
}

test()

console.log('map', map)
console.log('weakMap', weakMap)

我们从打印的结果中可以一目了然地看出两者的区别,WeakMap 对 key 是弱引用的,所谓弱引用就是一旦上下文执行完毕,WeakMap 中 key 对象没有被其他代码引用的时候,垃圾回收器就会把该对象从内存移除。Map 则不会把 key 对象进行移除,这样就会容易导致内存溢出,就算不内存溢出,当数据大的时候,操作性能也会下降,所以 Vue3 源码中就采用了 WeakMap。

最后小结 Vue3 底层源码是使用 WeakMap 和 Map 来构建依赖关系图,具体来说是:

  • targetMap 是一个WeakMap,键是响应式对象(target),值是一个Map(depsMap)。
  • depsMap 的键是对象的属性(key),值是一个Dep(即一个Set),存储了所有依赖该属性的副作用函数。

订阅者中介的实现

我们通过前面的文章对发布订阅模式的学习,可以知道发布者可以抽离一些公共功能统一放到一个中介类中,也就是所谓的事件总线或者消息代理,而订阅者同样也可以进行中介化,从而实现订阅者的多态化。所谓多态就是当不同的对象去执行同一个方法时会产生出不同的状态。我们通过上一篇文章可以知道 Vue2 中所谓的 Watcher 类其实就是订阅者中介,在项目中不同的组件其实底层都是通过 Watcher 类来执行的,而所谓依赖收集,其中收集的是 Watcher,那些响应式数据发生变化后去通知的也是 Watcher,然后再通过 Watcher 去执行具体的组件渲染。

那么 Vue3 的数据响应式也是通过发布订阅模式实现的,那么很自然的也存在订阅者中介。在 Vue3 源码中 ReactiveEffect 类从发布订阅模式的角度理解就是订阅者中介的角色,所以从发布订阅模式的角度理解 Vue3 的数据响应式原理,就非常容易理解为什么要有一个 ReactiveEffect 类了,甚至不用去看具体的实现细节,我们都可以知道 ReactiveEffect 所实现的功能是什么了。

我们知道 Vue2 中的 Watcher 有一个 update 方法,就是在发布者去通知所有订阅者的时候,订阅者统一执行的方法就是 update,那么很明显 ReactiveEffect 也同样需要这样的一个方法,在 Vue3 源码中这个方法叫 run,同样初始化的时候需要接收一个函数作为参数也就是具体订阅者需要做的事情。

ReactiveEffect 的初步实现:

class ReactiveEffect {
    constructor(fn) {
        this._fn = fn
    }
    run () {
        // 根据 Vue2 的数据响应式原理,我们知道在执行具体订阅者函数之前需要把当前订阅者赋值给一个中间变量。
        activeEffect = this
        this._fn()
        // 确保只在副作用函数中读取响应式变量才进行依赖收集
        activeEffect = null
    }
}

然后我们进行测试:

const proxy = reactive({ author: 'Cobyte', date: '2024-03-05' })
const _effect = new ReactiveEffect(() => {
    console.log(`我是:${proxy.author}`)
})
_effect.run()
proxy.author = 'coboy' 

我们可以看到正确打印了结果:

D06.png

同时我们发现 ReactiveEffect 的订阅者函数参数初始化在外部手动执行的,而 Vue2 的 Watcher 中的订阅者函数初始化在 Watcher 内部实例化的时候自动执行的,这个只是设计上区别。

我们把上述实现订阅的过程进行封装一下,那么就是 effect API 了,代码如下:

function effect(fn) {
    const _effect = new ReactiveEffect(fn)
    _effect.run()
}

从发布订阅模式的角度来看本质上 Vue3 的数据响应式实现原理跟 Vue2 的数据响应式原理的实现是一脉相承的。

互为订阅者

我们通过前面文章的学习,我们知道在 Vue2 中会存在发布者中介类 Dep 和订阅者类 Watcher 互为订阅者的情况,场景就是可能会取消某一个副作用函数的中的响应式数据的追踪,比如组件卸载了,那么我们就需要停止组件的依赖追踪。在 Vue3 中自然也存在这种场景,那么也就说在 Vue3 中也存在互为订阅者的情况。但在 Vue3 中的情况又会跟 Vue2 不一样,Vue2 是订阅者 Watcher 类直接订阅发布者中介类 Dep,因为在 Vue2 中每一个 Dep 实例都和一个发布者关联,也就是和每一个属性或者对象进行关联。而在 Vue3 中因为是通过 Proxy API 实现的数据响应式,每一个 Dep 的实例并不对应着具体的属性,所以我们要找到对应具体的属性的记录的变量,其实就是对应 key 的记录变量。

我们再看看 Dep 中关于对应 key 部分的订阅者记录变量部分代码:

class Dep {
  // 省略...
  addSub(target, key) {
    let depsMap = this.subs.get(target)
    if (!depsMap) {
        depsMap = new Map()
        this.subs.set(target, depsMap)
    } 
    let dep = depsMap.get(key)
    if (!dep) {
      dep = new Set()
      depsMap.set(key, deps)
    }
    if (activeEffect) {
      dep.add(activeEffect)
    }
  }
  // 省略...
}

我们可以看到对应每一个 key 的订阅者记录变量是 deps,所以我们只需要把对应的 deps 记录到 ReactiveEffect 中即可。

首先我们修改 ReactiveEffect 类,添加记录变量 deps

class ReactiveEffect {
+    // 记录哪些变量记录了该订阅者,在 Vue2 中则是记录哪些 Dep 记录了该 Watcher
+    deps = []
    // 省略...
}

接着我们在记录响应式数据对象的 key 的消息代理对象的地方把对应的 key 的消息代理对象添加到订阅者 ReactiveEffectdeps 变量中,代码如下:

class Dep {
  // 省略...
  addSub(target, key) {
    // 省略...
    if (activeEffect) {
      deps.add(activeEffect)
+      activeEffect.deps.push(deps)
    }
  }
  // 省略...
}

这样我们就完成了对应 key 的变量对 ReactiveEffect 的订阅,那么有订阅,也就有取消订阅。

取消订阅功能如下:

class ReactiveEffect {
    // 省略...
    // 取消订阅
+    stop () {
+      this.deps.forEach(dep => dep.delete(this))
+    }
}

接着我们再修改 effect API:

function effect(fn) {
    const _effect = new ReactiveEffect(fn)
    _effect.run()
+    return _effect
}

这样我们就可以进行以下测试了:

const proxy = reactive({ author: 'Cobyte', date: '2024-03-05' })

const _effect = effect(() => {
  console.log(`我是:${proxy.author}`)
})
proxy.author = 'Coboy'
// 取消订阅,也就是取消依赖追踪
_effect.stop()
proxy.author = '掘金签约作者'

打印结果如下:

D06.png

我们看到取消依赖追踪后,我们再去修改响应式数据,我们之前设置的订阅者函数就不再执行了,也就是得不到通知了。

那么停止依赖追踪之后,我又想它继续进行依赖追踪呢?这样我们就需要把 ReactiveEffect 中的 run 方法也返回出来。

我们继续进行 effect API 的功能迭代,新的修改如下:

function effect(fn) {
    const _effect = new ReactiveEffect(fn)
    _effect.run()
+    const runner = _effect.run.bind(_effect)
+    runner.effect = _effect
+    return runner 
}

这样我们就可以在取消依赖追踪后,还可以在某个时机中又恢复依赖追踪了,测试代码如下:

const proxy = reactive({ author: 'Cobyte', date: '2024-03-05' })

const runner = effect(() => {
  console.log(`我是:${proxy.author}`)
})
proxy.author = 'Coboy'
// 取消订阅,也就是取消依赖追踪
runner.effect.stop()
proxy.author = '掘金签约作者'
// 恢复依赖追踪
runner()
proxy.author = '恢复依赖追踪了'

我们可以看到如期打印了我们期待的结果:

D07.png

为什么 Vue3 的发布订阅模式不采用传统代码结构?

我们上面实现 Vue2 的数据响应式原理是很明显采用了发布订阅模式的,因为我们存在一个发布者中介类 Dep,这个代码结构跟传统教学中的发布订阅模式中的代码结构是很相似的。但实际上 Vue3 源码中是不存在发布者中介类的,也就是跟传统发布订阅模式的代码结构是不相同的,那么是否意味着 Vue3 并没有采用发布订阅模式呢?答案是否定的,正如我们前面文章中所说的那样,判断模式不能从代码结构上进行判断,而应该从代码意图。

class Dep {
  constructor() {
    // 订阅者存储中心
    this.subs = new WeakMap()
  }
  // 添加订阅者
-  addSub(target, key) {
+  track(target, key) {
    let depsMap = this.subs.get(target)
    if (!depsMap) {
        depsMap = new Map()
        this.subs.set(target, depsMap)
    }
    let dep = depsMap.get(key)
    if (!dep) {
      dep = new Set()
      depsMap.set(key, dep)
    }
    if (activeEffect) {
      dep.add(activeEffect)
      activeEffect.deps.push(dep)
    }
  }
  // 通知订阅者
-  notify(target, key) {
+  trigger(target, key) {
    const depsMap = this.subs.get(target)
    if (!depsMap) return
    const deps = depsMap.get(key)
    deps && deps.forEach(effect => effect.run())
  }
}

上面我们经过对方法名称的修改,我们的代码结构从命名上跟 Vue3 源码有些类似了,我们接着把 Dep 类也去掉:

  // 全局订阅者记录变量
  const targetMap = new WeakMap()
  // 添加订阅者
  function track(target, key) {
    let depsMap = targetMap.get(target)
    if (!depsMap) {
        depsMap = new Map()
        targetMap.set(target, depsMap)
    }
    let dep = depsMap.get(key)
    if (!dep) {
      dep = new Set()
      depsMap.set(key, dep)
    }
    if (activeEffect) {
      dep.add(activeEffect)
      activeEffect.deps.push(dep)
    }
  }
  // 通知订阅者
  function trigger(target, key){
    const depsMap = targetMap.get(target)
    if (!depsMap) return
    const deps = depsMap.get(key)
    deps && deps.forEach(effect => effect.run())
  }

接着我们也要把 reactive 中的相关代码也进行修改:

function reactive(data) {
    return new Proxy(data, {
        get(target, key) {
            // 存在依赖就把依赖收集到依赖存储中心
-            dep.addSub(target, key)
+            track(target, key)
            return Reflect.get(target, key) 
        },
        set(target, key, val) {
            const result = Reflect.set(target, key, val)
            // 值更新了,就需要去把依赖存储中心中的依赖全部重新执行一遍
-            dep.notify(target, key)
+            trigger(target, key)
            return result
        }
    })
}

我们可以看到经过上述修改之后,我们的代码结构跟 Vue3 源码是一模一样的了,但并不是说代码结构变了,模式也变了,上述代码结构依然是发布订阅模式。那么 Vue3 为什么要把依赖收集和依赖触发的函数进行分开呢?主要是因为分开之后依赖收集和依赖触发的函数就可以分别独立导出了,给其他功能 API 比如 ref、computed 使用了,代码可以达到最极致的抽象及复用。

确保只在副作用函数中读取响应式变量才进行依赖收集

不采用 Proxy API 实现数据响应式

因为 Proxy 无法提供对原始值的代理,所以我们需要对原始值的响应式进行特别处理,我们可以使用一层对象作为包裹,间接实现原始值的响应式方案。

当我们不通过 Proxy 实现代理的时候,除了使用 Vue2 中使用的 Object.defineProperty以外,我们还可以根据前面总结的实践规律,我们只需要可以实现在数据读取的时候进行依赖收集,然后在数据更改的时候进行依赖触发就可以了。那么明显我们可以使用在发布订阅模式那篇中讲到的公众号的例子。

// 定义发布者公众号
const weChatOfficialAccount = {
    // 订阅公众号的人的记录列表
    subscribers: [],
    // 文章内容
    article: '原始值内容',
    // 发布文章
    setArticle(value) {
        this.article = value
        // 更新文章的时候通知所有的订阅者
        this.notify()
    },
    // 添加订阅者
    addDep(fn) {
        // 把订阅者添加进记录列表
        this.subscribers.push(fn) 
    },
    // 广播信息
    notify(title) {
        // 发布信息时就是把记录列表中的订阅者全部通知一次
        this.subscribers.forEach(fn => fn(title));
    }
}

上述代码就是前面我们实现公众号讲解发布订阅模式的例子。在上述例子中,我们实现了在数据更新的时候触发依赖,也就是 setArticle 函数。那么我们再实现在数据读取的时候进行依赖收集即可,为了现在这个功能,我们把读取 article 属性值的行为也封装成一个函数。

代码如下:

// 定义发布者公众号
const weChatOfficialAccount = {
    // 订阅公众号的人的记录列表
    subscribers: [],
    // 文章内容
    article: '',
+    getArticle() {
+        return this.article
+    },
    // 省略...
}

这样我们就可以通过以下的方式获取文章内容了:

effect(() => {
    console.log(`原始值内容:${weChatOfficialAccount.getArticle()}`)
})
// 更改内容
weChatOfficialAccount.setArticle(520)

同时我们的发布者的通知函数也需要进行修改:

// 定义发布者公众号
const weChatOfficialAccount = {
    // 省略...
    // 广播信息
-    notify(title) {
+    notify() {
        // 发布信息时就是把记录列表中的订阅者全部通知一次
-        this.subscribers.forEach(fn => fn(title))
+        this.subscribers.forEach(dep => dep.run())
    }
}

那么我们就可以在 getArticle 函数中进行依赖收集了:

// 定义发布者公众号
const weChatOfficialAccount = {
    // 订阅公众号的人的记录列表
    subscribers: [],
    // 文章内容
    article: '',
    getArticle() {
+        // 进行依赖收集,也就是进行订阅
+        if (activeEffect) this.addDep(activeEffect)
        return this.article
    },
    // 省略...
}

这样我们的测试结果如下:

D08.png

我们上述方式是通过一个典型的发布订阅模式来实现对一个对象的观察,当这个对象发生改变之后,所有依赖该对象的订阅者都将得到通知。

我们通过一个工厂函数上面的公众号对象进行进行封装,代码如下:

// ref 工厂函数
function ref(value) {
    return {
        // 订阅公众号的人的记录列表
        subscribers: [],
        // 文章内容
        _value: value,
        getArticle() {
            if (activeEffect) this.addDep(activeEffect)
            return this._value
        },
        // 发布文章
        setArticle(value) {
            this._value = value
            // 更新文章的时候通知所有的订阅者
            this.notify()
        },
        // 添加订阅者
        addDep(fn) {
            // 把订阅者添加进记录列表
            this.subscribers.push(fn) 
        },
        // 广播信息
        notify() {
            // 发布信息时就是把记录列表中的订阅者全部通知一次
            this.subscribers.forEach(dep => dep.run());
        }
    }
}

我们可以看到经过上述的代码封装之后,我们实现了对原始值的响应式。那么接下来我们希望通过普通的方式获取和设置对象的值:

const weChatOfficialAccount = ref('初始值')
effect(() => {
    console.log(`原始值内容:${weChatOfficialAccount.article}`)
})
// 更改内容
weChatOfficialAccount.article = 520

通过前面的学习我们知道除了使用 Object.defineProperty 进行显式声明属性访问器之外,还可以通过字面量的方式,本质还是属性访问器

修改如下:

function ref(value) {
    return {
        // 订阅公众号的人的记录列表
        subscribers: [],
        // 文章内容
        _value: value,
-        getArticle() {
+        get article() {
            if (activeEffect) this.addDep(activeEffect)
            return this._value
        },
        // 发布文章
-        setArticle(value) {
+        set article(value) {
            this._value = value
            // 更新文章的时候通知所有的订阅者
            this.notify()
        },
        // 省略...
}

经过上述修改之后,我们就可以通过属性访问器像普通方式那样访问和设置对象的属性值了。

那么为了跟 Vue3 的 ref API 设计一致,我们把 article 属性改成 value

function ref(value) {
    return {
        // 订阅公众号的人的记录列表
        subscribers: [],
        // 文章内容
        _value: value,
-        get article() {
+        get value() {
            if (activeEffect) this.addDep(activeEffect)
            return this._value
        },
        // 发布文章
-        set article(value) {
+        set value(value) {
            this._value = value
            // 更新文章的时候通知所有的订阅者
            this.notify()
        },
        // 省略...
}

那么改了之后我们的 ref 就跟 Vue3 的一样用法了:

const weChatOfficialAccount = ref('初始值')
effect(() => {
    console.log(`原始值内容:${weChatOfficialAccount.value}`)
})
// 更改内容
weChatOfficialAccount.value = 520

接着我们对依赖收集函数 track 和依赖触发函数 trigger 进行修改让我们的代码尽可能地复用。修改如下:

// 添加订阅者
function track(target, key) {
    // 省略...
    if (activeEffect) {
-        dep.add(activeEffect)
-        activeEffect.deps.push(dep)
+        trackEffect(dep)
    }
}
+ function trackEffect(dep) {
+     dep.add(activeEffect)
+     activeEffect.deps.push(dep)
+ }
// 通知订阅者
function trigger(target, key) {
    // 省略...
-    deps && deps.forEach(effect => effect.run())
+    triggerEffect(deps)
}
+ function triggerEffect(deps) {
+     if(deps) {
+         deps.forEach(effect => effect.run());
+     }
}

接着我们进行重构 ref 函数:

function ref(value) {
    return {
        // 订阅公众号的人的记录列表
-        subscribers: [],
+        dep: new Set()
        // 文章内容
        _value: value,
        get value() {
-            if (activeEffect) this.addDep(activeEffect)
+            if (activeEffect) trackEffect(this.dep)
            return this._value
        },
        // 发布文章
        set value(value) {
            this._value = value
            // 更新文章的时候通知所有的订阅者
-            this.notify()
+            triggerEffect(this.dep)
        },
-        // 添加订阅者
-        addDep(fn) {
-            // 把订阅者添加进记录列表
-            this.subscribers.push(fn) 
-        },
-        // 广播信息
-        notify() {
-            // 发布信息时就是把记录列表中的订阅者全部通知一次
-            this.subscribers.forEach(dep => dep.run());
-        }
-    }
}

我们可以看到经过重构之后,我们的 ref 函数就变得比较整洁了,我们 ref 中的部分发布订阅的功能就和前面 reative 的发布订阅已经实现的功能代码进行了复用。

我们通过前面文章的学习,我们知道 Vue3 的 ref 底层是通过 OOP 的方式进行实现的,但本质还是跟我们上面一样的,那么我们也通过 OOP 的方式实现一遍吧。

实现代码如下:

class RefImpl {
    _value
    dep = new Set()
    constructor(value) {
        // 如果传进来的是对象那么最终还是通过 reactive API 实现数据响应式
        this._value = isObject(value) ? reactive(value) : value
    }
    get value() {
       // 存在依赖就把依赖收集到依赖存储中心
       if (activeEffect) trackEffect(this.dep)
       return this._value 
    }
    set value(val) {
        this._value = val
        // 更新文章的时候通知所有的订阅者
        triggerEffect(this.dep)
    }
}

function ref(value) {
    return new RefImpl(value)
}

最终我们的测试结果还是一样的,这里唯一值得注意的是,如果传进来的是对象那么最终还是通过 reactive API 实现数据响应式。

API 的设计技巧及知识的串联

我们上文中实现的数据响应式代码中,有一个函数的名称叫:observe,还有一个类叫:Observer,在 Vue2 源码中也是这么起名的。那么为什么要这么起名称呢?这么起名称有什么特殊的含义吗?

我们上面这个所谓数据响应式的原理,其实是在观察数据的变化,跟我们在 web 开发中观察 DOM 对象的变化的行为是很像的,甚至可以说本质是一样的。

MutationObserver 与 Vue2 数据响应式的联系

我们如果要观察一个 DOM 对象发生改变了就进行某些操作的话,可以通过 MutationObserver API来实现。例子如下:

// 获取 DOM 对象
const targetNode = document.querySelector('#some-id');

// 观察者回调函数
const subscriber = (mutations) => {
  mutations.forEach((mutation) => {
    if (mutation.type === 'childList') {
      mutation.addedNodes.forEach((addedNode) => {
        console.log(`添加了子元素:${addedNode.nodeName}`);
        // 执行相应的处理逻辑
      });
      mutation.removedNodes.forEach((removedNode) => {
        console.log(`移除了子元素:${removedNode.nodeName}`);
        // 执行相应的处理逻辑
      });
    }
  });
} 

// 创建一个观察器实例并传入回调函数,当观察到变动时便执行回调函数
const observer = new MutationObserver(subscriber);
// 配置需要观察的选项
const config = {
  childList: true, // 观察子元素是否发生变化
};
// 观察 DOM 对象是否发生变化
observer.observe(targetNode, config);

我们从上面的代码可以看出 MutationObserver 所做的事情,跟我们 Vue2 中对响应式数据的监听是一样的。DOM 对象就是我们 Vue2 中的响应式数据,当它发生变化之后就会去触发回调函数执行,相当于 Vu2 中的响应式数据发生改变后会触发 Watcher 一样。所以 MutationObserver 本质也是一个发布订阅模式,但它使用方式跟我们所谓传统的发布订阅模式是不一样的,但正如我们前面说的理解一种模式不应该从代码组织结构去进行分辨,而是意图。

所以我们从 Vue2 的数据响应式实现原理,就可以联系到 MutationObserver,然后联系它们的相同点,从而加深我们对知识的理解。当然尤雨溪当初给 Vue2 对一个对象实现数据响应式的处理函数和类命名为 observeObserver,是否参考了 MutationObserver 的 API 命名规则我们无从考证,但它们的工作方式值得我们联系,从而加深我们的知识理解。

总结

本文从发布订阅模式的核心思想出发,深入剖析了 Vue3 响应式系统的设计本质。发布订阅模式的关键在于管理对象间的依赖关系——一方变化时,所有依赖方都能得到通知,而非拘泥于特定的代码结构。Vue3 虽然不再像 Vue2 那样拥有显式的 Dep 类,但其底层依然遵循这一模式。

通过逐步迭代,我们自然形成了 Vue3 中著名的“桶”数据结构:最初用一个数组存储订阅者,然后按属性 key 分类,再按响应式对象 target 隔离,最终演变为 WeakMap(target) → Map(key) → Set(effect) 的依赖图。这种结构并非凭空设计,而是功能迭代的自然产物,体现了发布订阅模式在 Proxy 场景下的灵活应用。

Vue3 中的 ReactiveEffect 类扮演了订阅者中介的角色,类似于 Vue2 的 Watcher,负责管理具体副作用函数的执行与依赖追踪。通过 effect 函数封装,我们可以轻松创建响应式副作用,并借助 stop 机制实现取消订阅,这体现了订阅者与发布者之间“互为订阅”的关系。

值得注意的是,Vue3 将依赖收集(track)和依赖触发(trigger)拆分为独立函数,而非保留传统的 Dep 类结构。这一设计变化并非模式的改变,而是为了提升代码复用性,让 refcomputed 等 API 也能共享同一套响应式核心。

此外,对原始值的响应式实现(ref)同样基于发布订阅模式——通过属性访问器(getter/setter)在读取时收集依赖,在修改时触发更新。当 ref 包裹对象时,内部会回退到 reactive 处理,保证了逻辑的一致性。

最后,从 API 命名(如 observe / Observer)到与浏览器原生 MutationObserver 的类比,都能看出响应式系统与观察者模式之间的深刻联系。理解这些设计背后的模式思想,远比记忆具体代码实现更有价值。

上述文章写于:2023 年,由于个人原因今年 2026 年发布。

我是程序员Cobyte,现在已转向研究 AI Agent,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

3.响应式系统基础:从发布订阅模式的角度理解 Vue2 的数据响应式原理

前言

关于 Vue2 和 Vue3 的数据响应式原理,相信 Vue 技术栈的同学或多或少都了解过,甚至在简历上写很熟悉,而且我们在第一篇中也已经基本实现过了,但大家是否真的彻底掌握了呢?

正如我们前面所说的那样,Vue2、Vue3、SolidJS、Mobx 它们的数据响应式基本原理都是一样的,具体区别只是设计理念和实现方式不一样。按以前读书时代考试做题一样,它们是同一类型的题目,也就是基于依赖收集和触发的运行时的数据响应式,如果说你只会解答其中一道题,其他的题却不会解答,则说明你并没有真正彻底掌握这一类题。

同样地基于依赖追踪和触发的响应式系统都是通过发布订阅模式进行实现的,那么你知道 Vue 的数据响应式原理中是如何运用发布订阅模式的吗?

所以我们在本篇当中将从发布订阅模式的角度来理解 Vue 的数据响应式原理,彻底掌握数据响应式的基本原理,同时也巩固我们在上一篇中所说的发布订阅模式。我们在上一篇中学习了发布订阅模式,我们都是基于一些 demo 的例子去理解,本篇则真正的把发布订阅模式在实际项目的进行运用。

温馨提示,阅读本本之前最好先阅读前一篇文章,对发布订阅模式有一定的理解

发布订阅模式原理回顾

我们在上一篇中最后是通过 Object.defineProperty 方法对公众号对象 weChatOfficialAccountarticle 属性进行劫持监听,然后在 getter 的时候进行订阅,在 setter 的时候进行发布。

代码如下:

// 定义公众号
const weChatOfficialAccount = {
    // 订阅公众号的人的记录列表
    subscribers: [],
    // 文章内容
    article: '',
    // 添加订阅者
    addDep(fn) {
        // 把订阅者添加进记录列表
        this.subscribers.push(fn) 
    },
    // 广播信息
    notify(title) {
        // 发布信息时就是把记录列表中的订阅者全部通知一次
        this.subscribers.forEach(fn => fn(this.article))
    },
    // 取消订阅
    remove(fn) { 
        // 找到需要删除的订阅者
        const index = this.subscribers.indexOf(fn)
        // 删除订阅者
        this.subscribers.splice(index, 1 )
    }
}
defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者就进行订阅操作
            if (subscriber) weChatOfficialAccount.addDep(subscriber)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            weChatOfficialAccount.notify()
        }
    })
}

我们可以看到 weChatOfficialAccount 对象上有很多属于发布订阅模式中的功能,如果说还有其他对象也需要实现这样的功能,那么也要实现一遍这些功能,很明显这样是不可接受的,我们可以通过上一篇中实现的消息代理来代替 weChatOfficialAccount 对象中的发布订阅模式的功能。

首先我们对消息代理的实现做如下修改:

class Dep {
  constructor() {
    // 订阅者存储中心
    this.subs = []
  }
  // 添加订阅者
  addSub(sub) {
    this.subs.push(sub)
  }
  // 通知订阅者
  notify() {
    this.subs.forEach(sub => sub())
  }
}

如果熟悉 Vue2 数据响应式原理的同学对上面的代码肯定很熟悉,这个就是 Vue2 源码中的 Dep 类的简易实现,所以 Vue2 源码中的 Dep 其实就是一个事件总线或者叫消息代理,但它又不仅仅是消息代理,在某些时刻它同时又是一个订阅者,这个情况就是我们在上篇当中所说的一个对象既可以是发布者也可以是订阅者,而在 Vue2 源码中 Dep 类既是消息代理中心又是订阅者,具体情况我们将下文中进行详细讲解。

接下来我们继续做如下修改:

const weChatOfficialAccount = {
    // 消息代理对象
    __ob__: new Dep(),
    // 文章内容
    article: '', 
}
defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
    // 获取消息代理对象
    const dep = weChatOfficialAccount.__ob__
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者就进行订阅操作
            if (subscriber) dep.addSub(subscriber)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

我们在公众号对象中通过 __ob__ 属性来存储消息代理对象,这样公众号对象原本属于发布订阅的功能就通过 Dep 类来实现了,这样代码的功能职责就梳理得十分清晰了,也符合代码整洁之道。

我们对修改后的代码进行测试:

// 订阅者小明
let subscriber = () => {
    console.log(`收到的公众号文章:${weChatOfficialAccount.article}`)
}
// 读取一次,触发 getter 进行订阅
weChatOfficialAccount.article
// 设置为 null 防止重复订阅
subscriber = null
// 公众号发布文章则直接通过给属性赋值的方式
weChatOfficialAccount.article = '通过 Object.defineProperty 方法实现订阅发布模式'

上述测试代码的测试结果将会打印:

收到的公众号文章:通过 Object.defineProperty 方法实现订阅发布模式

我们通过 Object.defineProperty 方法实现对 weChatOfficialAccount 对象的属性 article 进行监听实现发布订阅功能,然后订阅的时候需要读取一下,触发 getter 进行订阅,这个行为在上述例子中比较奇怪,我们把它改成我们容易理解的例子。

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-9" />
    <title></title>
  </head>
  <body>
    <div id='content'></div>
    <script>
        const weChatOfficialAccount = {
            // 消息代理对象
            __ob__: new Dep(),
            // 文章内容
            article: '文章内容', 
        }
        // 省略 ...
        // 订阅者小明
        let subscriber = () => {
            const el = document.getElementById('content')
            el.textContent = `小明收到的公众号文章:${weChatOfficialAccount.article}`
        }
        // 初始化
        subscriber()
        subscriber = null
    </script>
  <body>
</html>

我们改成我们 web 应用程序中的例子就比较容易理解了。所谓订阅者小明,就是一个 HTML 更新函数,在初始化执行 subscriber() 函数的时候,会读取公众号对象 weChatOfficialAccountarticle 属性的值,这样就会触发 getter 函数进行订阅,在后续当公众号对象 weChatOfficialAccountarticle 属性值发生变化的时候,就会触发 setter 进行发布,也就是重新执行订阅者函数,然后网页内容发生变化。这个也是 Vue2 中的数据响应式的基本原理。

在上述例子中我们只对其中一个属性进行监听,但实际情况很有可能有其他订阅者对其他属性的进行引用。

// 订阅者郭靖
let guojingSubscriber = () => {
    console.log(`郭靖收到的公众号文章作者:${weChatOfficialAccount.author}`)
} 
// 订阅者杨过
let yangguoSubscriber = () => {
    console.log(`杨过收到的公众号发布时间:${weChatOfficialAccount.date}`)
}

然后我们需要对公众号对象进行修改:

const weChatOfficialAccount = {
    // 事件总线对象
    __ob__: new Dep(),
    // 文章内容
    article: '文章内容',
+    author: '作者',
+    date: '日期'
}
defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
+ defineReactive(weChatOfficialAccount, 'author', weChatOfficialAccount.author)
+ defineReactive(weChatOfficialAccount, 'date', weChatOfficialAccount.date)

同时需要对 getter 中的添加订阅者部分进行修改,为了精准添加对应的订阅者,我们需要判断对应的属性:

function defineReactive(data, key, val) {
    // 获取消息代理对象
    const dep = weChatOfficialAccount.__ob__
    Object.defineProperty(data, key, {
        get() {
+            // 为了精准添加对应属性的订阅者,我们需要判断对应的属性
+            if (subscriber && key === 'article') dep.addSub(subscriber)
+            if (guojingSubscriber && key === 'author') dep.addSub(guojingSubscriber)
+            if (yangguoSubscriber && key === 'date') dep.addSub(yangguoSubscriber)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

我们上述代码中为了精准添加对应属性的订阅者,我们需要在 getter 中判断对应的属性,在功能简单的情况下可以,如果功能复杂,对象的属性庞大的情况下,这样肯定是不能接受的。

又因为在触发 getter 的时候,只会是在某个订阅者函数在执行的时候,也就是说在 getter 被触发的时候,这个时候的订阅者是确定的,所以我们可以采用 中间变量 形式来解决这个问题。我们设置一个全局变量 activeEffect,也就是所谓中间变量,然后在初始化执行订阅者函数之前把需要执行的订阅者函数赋值给 activeEffect,然后在 getter 里面就可以把中间变量 activeEffect 通过消息代理对象添加到订阅者记录里面了,然后在执行完该订阅者函数之后则需要把中间变量 activeEffect 设置为 null,防止重复添加。

代码修改如下:

+ // 订阅者中间变量
+ let activeEffect
function defineReactive(data, key, val) {
    // 获取消息代理对象
    const dep = weChatOfficialAccount.__ob__
    Object.defineProperty(data, key, {
        get() {
+            // 存在订阅者中间变量就进行订阅者添加
+            if (activeEffect) dep.addSub(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}
+ // 初始化订阅者
+ activeEffect = subscriber
+ subscriber()
+ activeEffect = null
+ activeEffect = guojingSubscriber
+ guojingSubscriber()
+ activeEffect = null
+ activeEffect = yangguoSubscriber
+ yangguoSubscriber()
+ activeEffect = null

接着我们进行测试:

// 公众号发布文章
weChatOfficialAccount.article = '通过 Object.defineProperty 方法实现订阅发布模式'

打印结果如下:

C01.png

我们就发现我们虽然只对 article 属性进行赋值,但也触发了其他属性的订阅者的执行。那么我们就需要对属性与订阅者之间进行准确关联。那么如何进行准确关联呢?我们通过第一篇文章可以知道,在通过 Object.defineProperty 对每一个属性进行劫持监听的时候,通过闭包的形式把属性值缓存下来的,所以每一个属性的消息代理也放在闭包函数 defineReactive 中。

// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
+    // 通过闭包把每一个属性的消息代理进行缓存
+    const dep = new Dep()
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (activeEffect) dep.addSub(activeEffect)
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

这时我们再进行测试的时候,就可以精准触发订阅者了。

我们上面是通过手动调用 defineReactive 函数进行对象的属性劫持的,我们可以通过获取所有对象的属性然后遍历调用 defineReactive 函数进行对象的属性劫持,同时把这个功能封装成一个工具函数 observe。我们在第一篇中也实现过的了,下面我们来重新实现一下:

function observe (data) {
    Object.keys(data).forEach(key => {
        defineReactive(data, key, data[key]) 
    })
}

这样我们就可以通过 observe 函数来定义一个响应式对象了。

const weChatOfficialAccount = {
    // 文章内容
    article: '文章内容',
    author: '作者',
    date: '日期'
}
- defineReactive(weChatOfficialAccount, 'article', weChatOfficialAccount.article)
- defineReactive(weChatOfficialAccount, 'author', weChatOfficialAccount.author)
- defineReactive(weChatOfficialAccount, 'date', weChatOfficialAccount.date)
+ observe(weChatOfficialAccount)

至此我们便通过发布订阅模式初步实现了 Vue2 的响应式原理。创建一个对象,通过 observe 工具函数遍历此对象所有的 property,并使用 Object.defineProperty 把这些 property 全部转为 getter/setter,然后在 getter 的时候进行依赖收集,在 setter 的时候进行依赖触发。

从发布订阅模式的角度来说就是每一个对象的 property 都是发布者,然后它的消息代理则通过闭包的形式跟每一个 property 的值一起缓存在 defineReactive 闭包函数创建独立空间中,它们是多对多的关系。

小结

在我们一般的发布订阅模式(也叫观察者模式)中,发布者或者被观察者是很明确的,是一个具体的对象,但正如我们前一篇文章所说的那样,发布订阅模式是没有标准范式的,设计模式也是,辨别一种模式不能通过代码结构,而是代码意图。而在我们上述实现的 Vue2 响应式原理的过程中,我们发现其实每一个对象属性(property)都是一个发布者或者叫被观察者,它的发布者功能则通过消息代理中介进行实现,而这个消息代理对象则通过闭包的形式跟每个属性值一起缓存在闭包当中。我们又可以发现所谓发布订阅模式的触发条件也不是唯一的,我们一般的描述定义是,当一个对象发现变化的时候才去触发所有依赖它的订阅者,其实不然,发布订阅模式的触发条件可以是状态的变化、某个操作的变化、甚至是发布订阅者的通知也可以触发另外一个发布者进行发布操作。如果有在 Vue 中使用过事件总线的同学会很清楚,我们在组件中触发通知(emit)订阅者操作的时候并不一定是组件属性发生了变化,而有可能是某个方法触发了通知(emit)订阅者操作。

对数组进行响应式的处理

可以通过 Object.defineProperty 对数组进行监听,但监听不了数组自身的原型链方法,而 pushpopshiftunshiftsplicesortreverse 对数组进行操作是会改变数组的数据结构的,从发布订阅模式的角度来说数据发生变化后我们需要通知该数组对象的所有订阅者。为了实现这需求我们需要劫持数组的操作方法,即在对数组进行 push 等操作的时候我们能监听到。实现方案就是对数组的原型进行重写,重写的方法就是覆盖数组数组对象上的原型对象 __proto__。我们在第一篇当中是通过粗暴的直接覆盖的方式,但那样会把原来的一些数组方法也覆盖掉了,那样是不可取。

我们可以通过获取数组原型上的对象,然后只修改需要修改的方法即可。我们对 observe 方法修改如下:

function observe (data) {
+    // 如果是数组则重写数组上的原型
+    if (Array.isArray(data)) {
+        // 获取数组原型
+        const arrayProto = Array.prototype
+        // 通过 Object.create 创建一个原型为 arrayProto 的空对象
+        const arrayMethods = Object.create(arrayProto)
+        // 修改 push 方法
+        arrayMethods['push'] = function (...args) {
+            // 获取原始方法
+            const original = arrayProto['push']
+            // 执行原始方法
+            const result = original.apply(this, args)
+            return result
+        }
+        // 覆盖原型对象
+        data.__proto__ = arrayMethods
+    } else {
        Object.keys(data).forEach(key => {
          // 如果属性不是 __ob__ 则进行监听
          if (key !== '__ob__')  defineReactive(data, key) 
        })
+    }
}

上述代码我们通过 Object.create 创建一个原型为 arrayProto 的空对象:arrayMethods。然后给空对象设置 push 属性值为一个函数,最终把 arrayMethods 赋值给 data__proto__。这里就涉及到了一个 JavaScript 原型链的基础知识,当我们获取一个对象的属性值的时候,我们优先从该对象的自身属性上去获取,如果找不到则沿着该对象的 __proto__ 属性上的对象上的属性去查找,如果还找不到,则继续沿着 __proto__ 上的对象去查找。

我们经过上面的代码设置之后,我们通过 observe 设置一个数组,那么这个数组的原型对象则变为了arrayMethods,当执行该数组的 push 方法,根据原型链的规则,它会先执行 arrayMethods 对象上的 push 方法,这样我们就可以对该数组的 push 方法进行了监听,我们最终还是通过原本数组上 push 方法进行操作,但我们可以捕捉到了 push 的动作,这样我们就可以在 push 操作之后,进行通知所有该数组上的订阅者了。

我们从前面的发布订阅模式的知识可以知道,一个发布者对象上需要有一个消息代理对象,所以我们需要继续迭代我们的代码:

function observe (data) {
+    // 不存在消息代理则设置消息代理对象
+    if (!data.__ob__) data.__ob__ = new Dep()
    // 如果是数组则重写数组上的原型
    if (Array.isArray(data)) {
        // 获取数组原型
        const arrayProto = Array.prototype
        // 通过 Object.create 创建一个原型为 arrayProto 的空对象
        const arrayMethods = Object.create(arrayProto)
        // 修改 push 方法
        arrayMethods['push'] = function (...args) {
            // 获取原始方法
            const original = arrayProto['push']
            // 执行原始方法
            const result = original.apply(this, args)
+            // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
+            data.__ob__.notify()
             // 同时对新添加的数据也进行响应式化
+             for (let i = 0, l = args.length; i < l; i++) {
+              observe(args[i])
+             }
        }
        // 覆盖原型对象
        data.__proto__ = arrayMethods
    } else {
        Object.keys(data).forEach(key => {
          // 如果属性不是 __ob__ 则进行监听
          if (key !== '__ob__')  defineReactive(data, key) 
        })
    }
}

那么我们这个发布者对象的订阅者在哪里进行添加呢,从数据响应式的角度就是这个响应式对象的依赖在哪里收集呢?

其实不管是对象还是数组的订阅者都是在 getter 中进行添加的。
例如:{ list: [1,2,3,4] }
你要获取到 list 数组的内容,首先是通过 list 这个 property 进行获取的,所以当通过 list 这个 property 进行获取数组内容的时候,就触发了 list 这个 property 的 getter。

所以我们需要对 defineReactive 函数进行修改:

function defineReactive(data, key) {
    let val = data[key]
    // 获取消息代理对象
    const dep = new Dep()
+    // 对获取到的属性值进行递归 observe 监听
+    const childOb = observe(val)
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (activeEffect){
                dep.addSub(activeEffect)
+                // 如果存在 childOb 则说明属性值是一个对象,则也需要对该对象进行订阅者收集,从发布订阅模式的角度看,这是一个订阅操作
+                if (childOb) childOb.addSub(activeEffect) 
            }
            return val
        },
        set(newVal) {
            val = newVal
            // 通知所有的订阅者
            dep.notify()
        }
    })
}

在 getter 中会进行 property 的订阅者添加,收集到的订阅者保存在对应 property 的消息代理对象中,同时也会判断,property 的值如果是一个对象,还会对这个对象进行订阅者添加,收集到的订阅者还会保存到这个对象的消息代理对象上。

所以我们还需要对 observe 函数进行修改:

+ // 判断是否是对象
+ function isObject(obj) {
+     return obj !== null && typeof obj === 'object'
+ }
function observe (data) {
+    // 不是对象则直接返回
+    if (!isObject(data)) return 
    
    // 省略 ...
    
+    // 返回消息代理对象
+    return data.__ob__
}

至此我们对数组的响应式也实现了,接下来就是进行测试:

// 定义公众号
const weChatOfficialAccount = {
    // 文章内容
    article: '',
    author: '',
    date: '',
    arr: ['掘金']
}

observe(weChatOfficialAccount)

// 订阅者小明
let subscriber = () => {
    console.log(`收到的公众号文章:${weChatOfficialAccount.arr.join('====')}`)
}

// 初始化订阅者
activeEffect = subscriber
subscriber()
activeEffect = null
// 公众号发布文章则直接通过给属性赋值的方式
weChatOfficialAccount.arr.push('通过 Object.defineProperty 方法实现订阅发布模式')

打印结果如下:

C03.png

我们可以看到正确打印了结果,也就是我们也实现了对数组的响应式。

通过重构实现 Observer 类

我们上述函数 observe 的实现其实职责是很不清晰的,也不利于后续的维护,所以我们需要对它进行重构。软件应该是“自描述”的,代码除了给机器看之外,也要给人看。我们希望写的代码更易读,让代码可以更好地表达自己的意图。

这里我们涉及到一些开发技巧,我们可以先实现具体的功能,然后再重构,在重构的时候通过封装成抽象的类或者其他函数,让代码可以更好地表达自己的意图。那么 observe 函数中可以将对数组响应式的处理,还有对对象属性循环劫持分别封装成不同的函数,然后通过函数名称可以让我们的代码意图更明显。

我们对 observe 函数进行重构,代码如下:

function observe (data) {
    // 不是对象则直接返回
    if (!isObject(data)) return 
    // 不存在消息代理则设置消息代理对象
    if (!data.__ob__) data.__ob__ = new Dep()
    // 如果是数组则重写数组上的原型
    if (Array.isArray(data)) {
+        protoAugment(data)
    } else {
+        walk(data)
    }
    // 返回消息代理对象
    return data.__ob__
}
+ // 对数组进行响应式处理
+ function protoAugment(target) {
+    // 获取数组原型
+    const arrayProto = Array.prototype
+    // 通过 Object.create 创建一个原型为 arrayProto 的空对象
+    const arrayMethods = Object.create(arrayProto)
+    // 修改 push 方法
+    arrayMethods['push'] = function (...args) {
+        // 获取原始方法
+        const original = arrayProto['push']
+        // 执行原始方法
+        const result = original.apply(this, args)
+        // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
+        target.__ob__.notify()
        // 同时对新添加的数据也进行响应式化
        observeArray(args)
+    }
+    // 覆盖原型对象
+    target.__proto__ = arrayMethods
+ }
+ // 对对象进行响应式处理
+ function walk(obj) {
+    const keys = Object.keys(obj)
+    keys.forEach(key => {
+        // 如果属性不是 __ob__ 则进行监听
+        if (key !== '__ob__')  defineReactive(obj, key) 
+    })
+ }
 // 对数组的每一项元素都进行响应式处理
 function observeArray (items) {
    for (let i = 0, l = items.length; i < l; i++) {
      observe(items[i])
    }
 }

经过我们上面对不同的功能代码进行重构后,我们就可以通过函数名称很容易理解代码的意图了。但我们上面的功能函数还是十分的分散,而它们都是同一种功能类型的函数,都是实现对象响应式的功能函数,所以我们可以通过 OOP 的思想把响应式数据和操作封装到一个类里面,这个类我们把它命名为 Observer

Observer 类的代码实现如下:

class Observer {
    constructor(value) {
        this.value = value
        this.dep = new Dep()
        this.value.__ob__ = this
        if (Array.isArray(value)) {
            // 对数组进行响应式处理
            this.protoAugment(value)
            // 对数组的每一项都进行响应式处理
            this.observeArray(value)
        } else {
            // 对对象进行响应式处理
            this.walk(value)
        }
    }
    // 进行原型重写
    protoAugment(target) {
        // 获取数组原型
        const arrayProto = Array.prototype
        // 通过 Object.create 创建一个原型为 arrayProto 的空对象
        const arrayMethods = Object.create(arrayProto)
        // 修改 push 方法
        arrayMethods['push'] = function (...args) {
            const ob = this.__ob__
            // 获取原始方法
            const original = arrayProto['push']
            // 执行原始方法
            const result = original.apply(this, args)
            // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
            ob.dep.notify()
            // 同时对新添加的数据也进行响应式化
            ob.observeArray(args)
        }
        // 覆盖原型对象
        target.__proto__ = arrayMethods
    }

    // 对对象进行响应式处理
    walk(obj) {
        const keys = Object.keys(obj)
        keys.forEach(key => {
            // 如果属性不是 __ob__ 则进行监听
            if (key !== '__ob__')  defineReactive(obj, key) 
        })
    }
    // 对数组的每一项元素都进行响应式处理
    observeArray (items) {
        for (let i = 0, l = items.length; i < l; i++) {
            observe(items[i])
        }
    }
}

function observe (data) {
    // 不是对象则直接返回
    if (!isObject(data)) return
    const ob = new Observer(data)
    // 返回 Observer 实例对象
    return ob
}

defineReactive 函数也需要进行以下修改:

function defineReactive(data, key) {
// ...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (activeEffect){
                dep.addSub(activeEffect)
                // 如果存在 childOb 则说明属性值是一个对象,则也需要对该对象进行订阅者收集,从发布订阅模式的角度看,这是一个订阅操作
-                if (childOb) childOb.addSub(activeEffect) 
+                if (childOb) childOb.dep.addSub(activeEffect) 
            }
            return val
        },
        set(newVal) {
// ...
        }
    })
}

我们修改后进行重新测试也正常打印了结果:

C03.png

我们还可以进行一下性能优化,我们上述代码 protoAugment 函数部分,我们创建了数组原型对象的变量,而这些变量其实是不会变化,我们可以把它们的声明移到 protoAugment 函数外面,这样每一次调用 protoAugment 函数就不会重复重新创建这些变量了。

+ // 获取数组原型
+ const arrayProto = Array.prototype
+ // 通过 Object.create 创建一个原型为 arrayProto 的空对象
+ const arrayMethods = Object.create(arrayProto)
+ // 修改 push 方法
+ arrayMethods['push'] = function (...args) {
+    const ob = this.__ob__
+    // 获取原始方法
+    const original = arrayProto['push']
+    // 执行原始方法
+    const result = original.apply(this, args)
+    // 因为执行 push 方法,数组数据有变化所以需要通知订阅者
+    ob.dep.notify()
+    // 同时对新添加的数据也进行响应式化
+    ob.observeArray(args)
+ }

class Observer {
    constructor(value) {
        // 省略 ...
        if (Array.isArray(value)) {
            // 对数组进行响应式处理
+            this.protoAugment(value, arrayMethods)
            // 对数组的每一项都进行响应式处理
            this.observeArray(value)
        } else {

        }
    }

+    protoAugment(target, src) {
+        // 覆盖原型对象
+        target.__proto__ = src
+    }
    // 省略 ...
}

至此我们通过重构就实现了 Observer 类,这一节没有涉及到发布订阅模式和数据响应式相关的内容,只是一下编程技巧的内容,而之所以有这一节是为了我们的代码结构更贴近 Vue2 源码的实现。通过这一节的实现,我们也可以知道发布订阅模式是如何在 Vue2 数据响应式中实现的。

那么从发布订阅模式的角度来看所谓 Observer 类,其实是一个发布者或者叫被观察者,虽然它的类命叫 Observer 翻译过叫观察者,但从观察者模式的角度来看,它不能叫观察者,因为它并没有向哪个被观察者进行订阅操作。但它又不是一个纯粹的发布者,它主要作用是将数据对象转换为响应式对象,使得当数据发生变化时能够触发相应的更新操作,它同时通过递归遍历数据对象中的所有属性,为每个属性设置 gettersetter 来实现数据的劫持和监听,从功能上来看它是在观察自己的属性。

从代码结构上来看,它的发布订阅模式的实现跟传统标准的发布订阅模式的结构还是存在很大差别的,但正如我们上篇文章中所说的那样,我们并不能从代码结构上去判断是否属于什么模式,而是从代码意图去判断。

订阅者中介实现

我们知道 Vue2 中的订阅者是通过 Watcher 类来实现的,也就是我们上一篇文章中所讲的订阅者中介

我们先实现一个订阅者中介:

class Watcher {
    constructor(fn) {
        // 让每个订阅者所需要做的事情通过参数的形式传进来,这样更灵活,拓展性更强
        this.getter = fn
        // 初始化的时候直接读取触发订阅者收集,因为这样设计符合 web 应用的特性
        this.get()
    }
    get() {
        // 通过 Dep.target 来设置当前的订阅者是谁
        Dep.target = this
        this.getter()
        Dep.target = null
    }
    // 接受发布者通知的更新方法
    update() {
        this.getter()
    }
}

我们这里设计的订阅者中介类的实现跟我们上一篇中的订阅者中介类的实现,最大的不同就是,这里的设计需要在初始化的时候就要去执行一次订阅者所传的参数函数,因为在 web 应用应用中,应用需要初始化。

我们在实例化订阅者的时候,就把该订阅者需要做的事情当成参数传进去:

new Subscriber(() => {
    console.log(`郭靖收到的公众号文章:${weChatOfficialAccount.article}`)
}) 

同时 Dep 类的 notify 方法也需要修改一下:

class Dep {
  // 通知订阅者
  notify() {
-    this.subs.forEach(sub => sub())
+    this.subs.forEach(sub => sub.update())
  }
}

defineReactive 函数也需要修改:

- // 订阅者中间变量
- let activeEffect
// 通过闭包把劫持的属性值进行缓存
function defineReactive(data, key, val) {
    // 省略 ...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
-            if (activeEffect){
+            if (Dep.target){
-                dep.addSub(activeEffect)
+                dep.addSub(Dep.target)
                // 如果存在 childOb 则说明属性值是一个对象,则也需要对该对象进行订阅者收集,从发布订阅模式的角度看,这是一个订阅操作
-                if (childOb) childOb.dep.addSub(activeEffect) 
+                if (childOb) childOb.dep.addSub(Dep.target) 
            }
            return val
        },
        set(newVal) {
            // 省略 ...
        }
    })
}

这样我们就可以很方便通过一下进行测试了:

// 定义公众号
const weChatOfficialAccount = {
    // 文章内容
    article: '',
    author: '',
    date: '',
    arr: ['掘金']
}
observe(weChatOfficialAccount)
// 初始化订阅者
new Watcher(() => {
    console.log(`收到的公众号文章:${weChatOfficialAccount.arr.join('====')}`)
})

// 公众号发布文章则直接通过给属性赋值的方式
weChatOfficialAccount.arr.push('通过 Object.defineProperty 方法实现订阅发布模式')

打印结果如下:

C03.png

小结

Dep 和 Watcher 互为订阅者

我们通过上文知道 Dep 类其实是一个消息代理或者叫事件总线,而 Watcher 则是一个订阅者,但我们前面也留了一个引子,说它们还有一层关系,就是互为订阅者。那么既然 DepWatcher 互为订阅者,也就是说它们其实也是一个发布者的角色。所以现实系统中的应用远远要比我们所学的所谓标准模式要复杂得多。

我们知道在 Vue2 中可以通过 Options 选项设置 watcher 来实现对响应式数据的监听,其实还可以通过 this.$watcher() 来实现对响应式数据的监听,使用方法都是一样的,唯一的不同就是 this.$watcher() 会返回一个函数,这个函数的作用就是停止对响应式数据的监听。

那么要实现停止对响应式数据的监听则需要知道那些 Dep 记录了当前的 Watcher,我们就需要通知那些 Dep 取消订阅当前的 Watcher。那么要实现这个功能,就需要 Watcher 也进行记录自己订阅了哪些 Dep,当取消对响应式数据的监听的时候,就从当前 Watcher 的订阅记录里去通知那些 Dep 取消自己的订阅。

比如说我们在 Vue2 当中有这么一个功能:

const unwatch = this.$watch(function(){
      return this.name + this.age + this.sex
    }, function(newValue, oldValue){
    console.log('新值为:', newValue)
    console.log('旧值为:', oldValue)
    if (newValue === 'cobyte') {
      // 停止监听
      unwatch()
    }
})

我们接下来去实现这个功能,也就是实现 Vue2 的 $watcher() 功能。以下是 Vue2 官网对 $watcher API 的一些参数和功能的介绍。

  • [vm.$watch( expOrFn, callback, options )]

  • 参数

    • {string | Function} expOrFn

    • {Function | Object} callback

    • {Object} [options]
      
      • {boolean} deep
      • {boolean} immediate
  • 返回值{Function} unwatch

  • 用法

    观察 Vue 实例上的一个表达式或者一个函数计算结果的变化。回调函数得到的参数为新值和旧值。表达式只接受简单的键路径。对于更复杂的表达式,用一个函数取代。

这个 $watcher 的功能从发布订阅模式的角度可以看成是,第一个参数是订阅者要做的事情,第二个参数是在做完事情后拿到结果再通过第二参数输出结果,而且是每次所依赖的响应式数据发生变化后都需要执行第二个参数函数,输出新的结果。

那么我们先实现下面的功能:

new Watcher(function() {
   return `要做的事情,获取文章:${weChatOfficialAccount.article}`
}, function(newValue, oldValue) {
   console.log(`新结果是:${newValue},旧结果是:${oldValue}`)
})

我们对 Watcher 类做以下修改:

class Watcher {
-    constructor(fn) {
+    constructor(fn, cb) {
+        this.cb = cb
       // 让每个订阅者所需要做的事情通过参数的形式传进来,这样更灵活,拓展性更强
       this.getter = fn
       // 初始化的时候直接读取触发订阅者收集,因为这样设计符合 web 应用的特性
-        this.get()
+        this.value = this.get()
   }
   get() {
+        let value
       // 通过 Dep.target 来设置当前的订阅者是谁
       Dep.target = this
+        value = this.getter()
       Dep.target = null
+        return value
   }
   // 接受发布者通知的更新方法
   update() {
     // 获取新值
-      this.getter()
+      const value = this.getter()
+      // 设置旧值
+      const oldValue = this.value
+      // 更新值
+      this.value = value
+      if (this.cb) {
+        // 因为是用户写的,有可能存在错误
+        try {
+          this.cb(value, oldValue)
+        } catch(err) {
+          throw err
+        }
+      }
   }
}

我们进行测试:

weChatOfficialAccount.article = '第一次更新'
weChatOfficialAccount.article = '第二次更新'

测试结果如下:

C04.png

我们看到正确打印了结果。

有了以上的基础功能,接下来我们就很容易实现 $watcher API,代码如下:

function $watcher(expOrFn, cb) {
    const watcher = new Watcher(expOrFn, cb)
}

我们知道 $watcher API 是有很多配置选项的,也就是第三个参数,比如立即执行回调就是通过第三个参数配置 immediatetrue 来实现的,下面我们也来实现它:

function $watcher(expOrFn, cb, options) {
    // 因为 options 有可能不存在,要做兼容处理
    options = options || {}
    const watcher = new Watcher(expOrFn, cb, options)
    // 如果 immediate 为 true 则立即执行回调函数 
    if (options.immediate) {
      try {
        cb(watcher.value)
      } catch (error) {
        throw new Error(error)
      }
    }
}

在 Vue2 中 Watcher 实例是分为系统的 Watcher 和用户的 Watcher 的,像我们在组件里面通过配置 watcher,就是用户的 Watcher,怎么体现区分呢?我们下面来设置,其实也很简单:

function $watcher(expOrFn, cb, options) {
    // 因为 options 有可能不存在,要做兼容处理
    options = options || {}
+    // 设置用户级的 Watcher
+    options.user = true
    const watcher = new Watcher(expOrFn, cb, options)
    // 如果 immediate 为 true 则立即执行回调函数 
    if (options.immediate) {
      try {
        cb(watcher.value)
      } catch (error) {
        throw new Error(error)
      }
    }
}

接着修改 Watcher 类:

class Watcher {
-    constructor(fn, cb){
+    constructor(fn, cb, options) {
+      if (options) {
+          this.user = !!options.user
+      } else {
+          this.user = false
+      }
      // 省略...
    }
    update() {
      // 省略...
-      if (this.cb) {
+      if (this.user) {
        // 省略...
      }
    }
}

修改也很简单,从上面的修改可以看得出,只有用户级的 Watcher 才会在更新的时候执行回调函数。

接下来我们测试立即回调功能:

$watcher(function() {
    return `要做的事情,获取文章:${weChatOfficialAccount.article}`
}, function(newValue, oldValue) {
    console.log(`新结果是:${newValue},旧结果是:${oldValue}`)
}, { immediate: true })

我们可以看到初始化的时候,就立即执行回调函数了。

C05.png

立即执行,旧值为 undefined,符合如期。

实现了上面的基础部分的功能,我们就可以实现重要的功能了,取消订阅。

function $watcher(expOrFn, cb, options) {
     // 省略...
+    // 返回一个可以取消订阅的函数
+    return function unwatchFn () {
+      watcher.teardown()
+    }
}

我们这里通过 Watcher 实例对象的 treardown 方法去取消订阅,其实是要去通知那些记录了该 Watcher 的 Dep 去删除其记录中的该 Watcher。那么我们怎么知道哪些 Dep 记录该 Watcher 呢?所以我们就需要在 Watcher 中记录其订阅了的 Dep。从发布订阅模式的角度来说就是 Dep 要对 Watcher 进行订阅,Dep 是订阅者,Watcher 是发布者,而我们之前是 Watcher 对相关的属性的 Dep 进行订阅,Watcher 是订阅者,相关属性的 Dep 是发布者。

我们首先对 Watcher 实现发布订阅的功能,代码迭代如下:

class Watcher {
    constructor(fn, cb, options) {
        // 省略...
+        this.deps = []
        // 省略...
    }
+    addDep(dep) {
+        this.deps.push(dep)
+    }
+    // 取消订阅
+    teardown() {
+        let i = this.deps.length
+        while (i--) {
+            this.deps[i].removeSub(this)
+        }
+    }
}

我们这里的取消订阅是通过 Watcher 所记录的 Dep 实例对象去执行 Dep 上的 removeSub 方法去把自己删除,这样将来 Dep 触发更新的时候,就通知不了自己了,也就执行不了 update 方法了。

接下来我们实现 Dep 类上的 removeSub 方法,迭代代码如下:

class Dep {
  // 省略..

+  // 取消订阅
+  removeSub (sub) {
+    // 找到需要取消的订阅者
+    const index = this.subs.indexOf(sub)
+    if (index > -1) {
+        // 删除订阅者
+        this.subs.splice(index, 1)
+    }
+  }
  // 省略...
}

我们可以看到 Dep 类中的取消订阅功能,跟普通发布订阅中的取消订阅功能是一样的。

我们前面已经实现了 DepWatcher 的订阅,那么接下来就是 Watcher 怎么对 Dep 进行订阅了。我们知道不管是什么数据类型都是在 getter 中进行依赖收集的,所以要实现 WatcherDep 的订阅,也要从 getter 开始。我们在 getter 里面可以通过 Dep.target 获取到当前的 Watcher,也可以获取到当前属性对应的 Dep 实例对象,那么就可以互相添加订阅者了。

代码迭代如下:

function defineReactive(data, key) {
    // 省略...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (Dep.target){
                dep.addSub(Dep.target)
+                // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
+                Dep.target.addDep(dep)
                if (childOb){
                    childOb.dep.addSub(Dep.target)
+                    // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
+                    Dep.target.addDep(childOb.dep)
                }
            }
            return val
        },
        // 省略...
    })
}

接下来我们就可以进行测试了:

const unwatch = $watcher(function(){
    return weChatOfficialAccount.article + weChatOfficialAccount.author
}, function(newValue, oldValue) {
    console.log('新值为:', newValue)
    console.log('旧值为:', oldValue)
    if (newValue === 'cobyte') {
      // 停止监听
      unwatch()
    }
})

console.log('会打印新值旧值')
weChatOfficialAccount.article = 'co'
console.log('会打印新值旧值')
weChatOfficialAccount.author = 'byte'
console.log('不会打印新值旧值')
weChatOfficialAccount.article = 'cobyte'

我们发现如期打印了我们期待的结果:

C06.png

我们上面在 getter 中对 Dep 和 Watcher 进行相互订阅的操作,还可以进行优化一下,让代码更优雅。

class Dep {
    // 省略...
+  // 通过 depend 方法进行依赖收集
+  depend() {
+    if (Dep.target) {
+      // 在 Dep 中进行 Watcher 
+      Dep.target.addDep(this)
+    }
+  }
  // 省略...
}

接着在 Watcher 中调用 Dep 的方法添加自己

class Watcher {
    // 省略...
    addDep(dep) {
        this.deps.push(dep)
+        // 调用 Dep 实例的添加订阅方法添加自己
+        dep.addSub(this)
    }
    // 省略...
}

接着我们修改 getter 中代码:

function defineReactive(data, key) {
    // 省略...
    Object.defineProperty(data, key, {
        get() {
            // 存在订阅者中间变量就进行订阅者添加
            if (Dep.target){
-                dep.addSub(Dep.target)
-                // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
-                Dep.target.addDep(dep)
+                dep.depend()
                if (childOb){
-                    childOb.dep.addSub(Dep.target)
-                    // 通过 Dep.target 就可以添加 Watcher 对应的 Dep 了
-                    Dep.target.addDep(childOb.dep)
+                    childOb.dep.depend()
                }
            }
            return val
        },
        // 省略...
    })
}

经过我们的重构,getter 中的依赖收集相关代码变得清晰多了。

总结

从发布订阅模式的角度来理解 Vue 的数据响应式原理,就是发布订阅模式的具体运用的过程。

上述文章写于:2023 年,由于个人原因今年 2026 年发布。

我是程序员Cobyte,现在已转向研究 AI Agent,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

❌