阅读视图

发现新文章,点击刷新页面。

微信 ClawBot 接入本地 AI Agent 的实现原理

1. 前言

我们知道微信最近推出了微信 ClawBot,用于在微信中与 OpenClaw 收发消息。掘金签约作者群里的一位大佬说:

01.jpg

受到启发,我就去研究了一下微信半公开的官方文档后发现,我们确实也可以通过用微信 ClawBot 接入任何 AI Agent

至于为什么说官方文档是半公开呢,因为官方暂时还没有公开的文档地址,但又可以通过某些渠道看到(怎么可以看到,本文最后揭晓)。

本文就将带你一步步实现如何通过微信 ClawBot 接入自己开发的 AI Agent。其实我们只需要做三件事:

  1. 扫码登录,拿到微信 ClawBot 的身份凭证;
  2. 长轮询等待消息,一有消息立刻获取;
  3. 把消息交给本地 Agent 处理,再把回复发回微信。

第一步,扫码登录。

2. 扫码登录

根据微信 ClawBot 文档的要求,我们先要获取一个二维码,等用户用微信扫描并确认后,服务端就会返回一个 bot_token 的通信凭证,后续所有请求都必须带着这个 token。这个跟我们平时的开发是一样,我们登录之后才能进行操作。

2.1 拉取二维码

首先微信 ClawBot 的接口地址是:

BASE_URL = "https://ilinkai.weixin.qq.com"

其次,登录的第一步是向服务端请求二维码。我们根据微信 ClawBot 的文档可以知道请求的接口是:

ilink/bot/get_bot_qrcode?bot_type=3

请求的 HTTP 方式是 GET。值得注意的是参数 bot_type 在微信 ClawBot 的文档中只出现了在获取二维码的时候,值是 3,而其他枚举值的情况,文档中并没有说明。

因为要用到 HTTP 的 GET 请求,所以我们需要封装一个 GET 请求的方法:

import json
import urllib.request
import urllib.error

# 省略...

def _get(url: str, headers: dict = {}, timeout: int = 35) -> dict:
    """发送 GET 请求"""
    req = urllib.request.Request(url, headers=headers, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} GET {url}: {e.read().decode(errors='replace')}") from e

接着我们封装拉取二维码的请求函数:

def fetchQRCode():
    base = BASE_URL.rstrip("/") + "/"
    url = base + "ilink/bot/get_bot_qrcode?bot_type=3"
    resp = _get(url)                      # GET 请求
    qrcode_raw = resp.get("qrcode")       # 服务端用于轮询的标识
    qrcode_url = resp.get("qrcode_img_content")   # 可扫描的二维码链接
    return qrcode_raw, qrcode_url

微信 ClawBot 服务端返回的 qrcode_img_content 是一个可以直接扫码的链接。我们在终端里可以通过安装 qrcode 库把它打印成 ASCII 二维码或者直接打印链接让用户打开链接,通过手机微信进行扫码。字段 qrcode 则是服务端的二维码标识,用于后续轮询二维码的状态,是否已经被扫码等。

我们测试一下上述代码:

print(fetchQRCode())

打印结果如下:

('50189c0db1817eb74a2bfc11e4ccdb35', 'https://liteapp.weixin.qq.com/q/7GiQu1?qrcode=50189c0db1817eb74a2bfc11e4ccdb35&bot_type=3')

我们打开上述链接在浏览器打开是一个微信二维码。

image.png

2.2 轮询扫码状态

二维码生成后,我们每隔一秒向微信 ClawBot 提供的二维码状态查询接口进行请求,直到用户完成确认。接着我们封装一个轮询扫码状态的请求接口函数:

def pollQRStatus(qrcode_raw):
    base = BASE_URL.rstrip("/") + "/"
    poll_url = base + f"ilink/bot/get_qrcode_status?qrcode={urllib.parse.quote(qrcode_raw)}"
    deadline = time.time() + 480    # 最多等 8 分钟
    # 这个是微信 ClawBot 规定的,没得解析
    headers = { "iLink-App-ClientVersion": "1" }

    while time.time() < deadline:
        try:
            s = _get(poll_url, headers)
        except Exception as e:
            print(f"  [轮询错误] {e}", flush=True)
            time.sleep(2)
            continue

        status = s.get("status", "wait")

        if status == "wait":
            # 还没扫,继续等,打一个点表示进度
            sys.stdout.write(".")
            sys.stdout.flush()

        elif status == "scaned":
            # 已经扫了,等用户在微信里点确认
            print("\n👀 已扫码,请在微信中点击确认...", flush=True)

        elif status == "confirmed":
            # ✅ 用户点了确认,登录成功!
            token      = s.get("bot_token", "")
            account_id = s.get("ilink_bot_id", "")
            # 账号 ID 规范化:把 @ 和 . 换成 -,例如 abc@im.wechat → abc-im-wechat
            account_id = account_id.replace("@", "-").replace(".", "-")
            real_base  = s.get("baseurl") or BASE_URL
            print(f"\n✅ 登录成功!account_id={account_id}", flush=True)
            return {"token": token, "account_id": account_id, "base_url": real_base}

        elif status == "expired":
            raise RuntimeError("二维码已过期,请重新运行程序。")
        # 每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力
        time.sleep(1)
    raise RuntimeError("登录超时(8分钟),请重试。")

上述函数主要是实现了根据状态码处理不同情况。服务端返回的 JSON 中包含 status 字段,表示当前二维码的状态。我们根据其值进行分支处理:

status == "wait"

  • 表示二维码尚未被扫描。
  • 在终端打印一个点 .(不换行),表示程序仍在等待,给用户视觉反馈。

status == "scaned"

  • 表示用户已经扫描了二维码,但尚未在微信中点击“确认”。
  • 打印提示信息 👀 已扫码,请在微信中点击确认...,告知用户当前进度。

status == "confirmed"

  • 成功状态:用户已确认,登录成功。
  • 从响应中提取 bot_tokenilink_bot_id(机器人唯一标识)、baseurl(可选的后端地址)。
  • 并且对 account_id 进行规范化处理,将 @ 和 . 替换为 -,例如 abc@im.wechat 变为 abc-im-wechat
  • 打印成功信息,并返回一个包含 tokenaccount_idbase_url 的字典,供上层保存和后续请求使用。

status == "expired"

  • 二维码已过期(我们这里设置 8 分钟未扫描或确认即为过期)。
  • 抛出 RuntimeError,提示用户重新运行程序获取新二维码。

最后,每次轮询后(除非已返回成功)都会休眠 1 秒,避免高频请求对服务端造成压力。

2.3 实现登录并保存 token 到本地

我们在上面实现了拉取二维码的函数 fetchQRCode 和轮询等待扫码确认的函数 pollQRStatus,我们就可以实现一个登录函数 login 将整个流程串联起来了。实现如下:

def login() -> dict:
    """
    扫码登录,返回 {"token": "...", "account_id": "...", "base_url": "..."}
    """
    # ── 第 1 步:拉取二维码 ──
    [qrcode_raw, qrcode_url] = fetchQRCode() 

    if not qrcode_raw:
        raise RuntimeError(f"获取二维码失败")

    # ── 第 2 步:在终端打印二维码 ──
    print("\n请用微信扫描下方二维码:\n", flush=True)
    try:
        import qrcode                          # pip install qrcode[pil]
        qr = qrcode.QRCode(version=1, border=1)
        qr.add_data(qrcode_url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)            # 用 ASCII 字符在终端渲染,尺寸最小
    except ImportError:
        # 没有安装 qrcode 库时,直接打印链接,用浏览器打开也能扫
        print(f"  {qrcode_url}\n", flush=True)

    # ── 第 3 步:轮询等待扫码 ──
    print("等待扫码...", flush=True)
    return pollQRStatus(qrcode_raw) 

上述登录函数最后返回的数据结构如下:

{
'token': 'd5b3973bb743@im.bot:060000215ac9e1ce7116aeb48b3d998c5b2e4e', 
'account_id': 'd5b3973bb743-im-bot',
'base_url': 'https://ilinkai.weixin.qq.com
'}

为了下次启动不用重新扫码,我们把 token 和 account_id 保存到文件 .weixin_token.json 中:

# token 的本地文件路径
TOKEN_FILE  = Path(__file__).parent / ".weixin_token.json"   # 保存登录后的 token

# 保存 token 和账号
def save_token(data: dict) -> None:
    """把 token 信息保存到本地文件,下次启动不用重新扫码。"""
    TOKEN_FILE.write_text(json.dumps(data, indent=2, ensure_ascii=False), "utf-8")
    TOKEN_FILE.chmod(0o600)    # 仅当前用户可读,保护 token 安全

这样,下次运行程序时,如果文件存在就直接加载,跳过登录流程。所以我们还需要有一个从本地文件读取上次保存的 token 和账号的函数。实现如下:

def load_token() -> Optional[dict]:
    """从本地文件读取上次保存的 token(如果有的话)。"""
    if TOKEN_FILE.exists():
        try:
            return json.loads(TOKEN_FILE.read_text("utf-8"))
        except Exception:
            pass
    return None

所以整个主流程就是:

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()

    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    # todo run_monitor()

登录完成之后,接下来就是进入消息监听循环了。

3. 长轮询监听循环

拿到 token 后我们就需要发起一个长轮询的 HTTP 接口请求,微信 ClawBot 服务端会“憋着”不返回,直到有新消息或超时(约 35 秒)才返回。这个接口就是:

ilink/bot/getupdates

它采用 POST 方式请求,所以我们需要封装一个 POST 请求的方法,并且微信 ClawBot 的所有 POST 的请求都需要一个通用请求头,通用请求头的要求如下:

Header 说明
Content-Type application/json
AuthorizationType 固定值 ilink_bot_token
Authorization Bearer <token>(登录后获取)
X-WECHAT-UIN 随机 uint32 的 base64 编码

所以我们先封装一个每次请求都需要带的 HTTP 请求头的函数:

def _headers(token: Optional[str] = None) -> dict:
    """
    构造每次请求都需要带的 HTTP 请求头。
    - AuthorizationType: 固定值,告诉服务端这是 bot token 认证
    - Authorization: 登录拿到的 token,未登录时不带
    - X-WECHAT-UIN: 随机数的 base64,模拟微信客户端标识
    """
    h = {
        "Content-Type": "application/json",
        "AuthorizationType": "ilink_bot_token",
        # 随机 uint32 → 十进制字符串 → base64,与原版协议一致
        "X-WECHAT-UIN": base64.b64encode(
            str(struct.unpack(">I", os.urandom(4))[0]).encode()
        ).decode(),
    }
    if token:
        h["Authorization"] = f"Bearer {token}"
    return h

接着我们封装一个通用 POST 请求函数:

def _url(path: str) -> str:
    """拼接完整 URL,确保 BASE_URL 末尾有斜杠。"""
    base = BASE_URL.rstrip("/") + "/"
    return base + path
    
def _post(path: str, body: dict, token: Optional[str] = None, timeout: int = 15) -> dict:
    """
    发送 POST JSON 请求,返回解析后的响应字典。
    所有与微信后端的通信都走这个函数。
    """
    data = json.dumps(body).encode("utf-8")
    req  = urllib.request.Request(
        _url(path),
        data    = data,
        headers = {**_headers(token), "Content-Length": str(len(data))},
        method  = "POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"HTTP {e.code} /{path}: {e.read().decode(errors='replace')}") from e

所有与微信 ClawBot 后端的通信都走这个 _POST 函数。

接着我们封装长轮询接收消息函数:

def getUpdates(token: str, buf: str = "", timeout: int = 35) -> dict:
    """
    长轮询接口:向服务端发请求,服务端"憋着"不回,直到有新消息或超时才返回。

    参数:
      buf     - 上次返回的游标,传给服务端表示"从这里继续",首次传空字符串
      timeout - 等待秒数,服务端通常在 35 秒内有消息就返回,无消息就返回空

    返回值里的重要字段:
      msgs            - 新消息列表(可能为空)
      get_updates_buf - 新游标,下次请求要带上它
    """
    try:
        return _post(
            "ilink/bot/getupdates",
            body    = {"get_updates_buf": buf, "base_info": {"channel_version": "mini-bridge-1.0"}},
            token   = token,
            timeout = timeout + 5,    # 客户端超时比服务端多 5 秒,避免误判
        )
    except (TimeoutError, OSError) as e:
        if "timed out" in str(e).lower():
            # 超时是正常现象,不是错误,直接返回空结果,继续下一轮
            return {"ret": 0, "msgs": [], "get_updates_buf": buf}
        raise

上述 getUpdates 函数返回两个重要的字段 msgs(消息列表)和 get_updates_buf(游标),消息列表我们很好理解,但游标我们需要了解一下。

什么是游标?

游标就是一个字符串,每次 getUpdates 返回时都会同时返回一个新的游标,表示“下一次请求从这里开始”。我们可以把游标持久化到文件 .weixin_buf.txt 中,这样即使程序重启,也能接着之前的位置继续收消息,不会漏掉中间的消息。

游标存储的本地文件路径设置如下:

# 游标存储的本地文件路径
BUF_FILE    = Path(__file__).parent / ".weixin_buf.txt"      # 保存消息游标(断点续传)

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的字段如下:

字段 类型 说明
ret number 返回码,0 = 成功
errcode number? 错误码(如 -14 = 会话超时)
errmsg string? 错误描述
msgs WeixinMessage[] 消息列表(结构见下方)
get_updates_buf string 新的同步游标,下次请求时回传
longpolling_timeout_ms number? 服务端建议的下次长轮询超时(ms)

根据上述的资料我们就可以实现对 getUpdates 接口的长轮询监听循环了,实现如下:

def run_monitor(token: str) -> None:
    """
    长轮询监听循环
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

上述实现 run_monitor 函数目前能长轮询监听微信 ClawBot 服务器消息的接收。主要功能如下:

  • 首先加载之前保存的消息游标(buf)实现断点续传。

  • 进入监听循环:

    1. 调用 getUpdates(token, buf) 获取消息(可能阻塞直到有消息或超时)。
    2. 如果调用失败(异常),则增加失败计数,连续失败 3 次后休眠 30 秒再重试;否则休眠 2 秒后继续。
    3. 如果返回的业务错误码非 0,则打印错误并休眠 2 秒后继续。
    4. 成功获取响应后,提取 get_updates_buf 新游标,更新 buf 并持久化到文件,以便下次重启恢复

我们知道上述 getUpdates 接口还返回了 msgs 消息列表,我们需要遍历返回的消息列表,提取文本,交给本地 AI Agent 进行处理。

4. 处理返回的消息

根据微信 ClawBot 的官方文档我们可以知道 getUpdates 接口返回的 msgs 消息列表字段结构如下:

字段 类型 说明
seq number? 消息序列号
message_id number? 消息唯一 ID
from_user_id string? 发送者 ID
to_user_id string? 接收者 ID
create_time_ms number? 创建时间戳(ms)
session_id string? 会话 ID
message_type number? 1 = USER, 2 = BOT
message_state number? 0 = NEW, 1 = GENERATING, 2 = FINISH
item_list MessageItem[]? 消息内容列表
context_token string? 会话上下文令牌,回复时需回传

然后字段 item_list(消息内容列表)的字段结构又如下:

字段 类型 说明
type number 1 TEXT, 2 IMAGE, 3 VOICE, 4 FILE, 5 VIDEO
text_item { text: string }? 文本内容
image_item ImageItem? 图片(含 CDN 引用和 AES 密钥)
voice_item VoiceItem? 语音(SILK 编码)
file_item FileItem? 文件附件
video_item VideoItem? 视频
ref_msg RefMessage? 引用消息

根据上述资料我们就可以处理微信 ClawBot 服务器返回的消息了。处理如下:

def run_monitor(token: str) -> None:

    # 省略...

    while True:

        # 省略...

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

实现也很简单,遍历 msgs 消息列表,然后再从消息的 item_list 里找 type=1(文本)的那一项。而非文本消息(图片、语音等)我们暂不处理,先跑通主流程再说。

经过上述处理后我们就拿到了微信 ClawBot 服务器返回的文本消息了,我们接着就把它交给本地 Agent 进行处理。

5. 接入本地 AI Agent

前面的步骤已经实现了本地接收到微信 ClawBot 发来的信息了,现在就需要接入一个本地 AI Agent 来处理微信用户发来的信息了。接入本地 AI Agent 也很简单,我们前面的文章已经实现了一个 Agent Loop,我们直接使用就可以了。

我们定义一个函数 askAgent,用它来管理每个用户的对话历史,并将用户的新消息交给 Agent 处理:

# ── 导入本地 Agent ──
from agent import agent_loop, SYSTEM as AGENT_SYSTEM

# 每个微信用户维护一份独立的对话历史,key 是用户 ID
_sessions: dict[str, list] = {}

def askAgent(user_id: str, user_text: str) -> str:
    """
    把用户的消息交给 Agent 处理,返回 Agent 的回复文本。

    - 每个用户有自己独立的对话历史(_sessions),实现多用户隔离
    - agent_loop 会循环调用大模型直到得到最终回复
    """
    # 第一次对话时,初始化这个用户的历史,带上系统提示词
    if user_id not in _sessions:
        _sessions[user_id] = [{"role": "system", "content": AGENT_SYSTEM}]

    # 把用户这条消息追加到历史
    _sessions[user_id].append({"role": "user", "content": user_text})

    # 交给 Agent 处理,agent_loop 会直接修改传入的列表(追加 assistant 回复)
    try:
        reply = agent_loop(_sessions[user_id])
        return reply or "(无回复)"
    except Exception as e:
        return f"[Agent 出错] {e}"

我们上述函数 ask_agent 实现了把用户的消息交给 Agent 处理,然后返回 Agent 的回复文本,并且还实现每个用户有自己独立的对话历史,实现了多用户隔离。

接下来就是把 Agent 的回复发回微信。

6. 把 Agent 的回复发回微信

回复消息的接口是 ilink/bot/sendmessage,它最重要的参数是 context_token,这个 token 是从收到的消息里原样取出的,服务端依靠它来将回复与对话关联起来(类似于会话 ID)。我们来实现一个 sendMessage 函数进行发送信息:

def sendMessage(token: str, to_user_id: str, text: str, context_token: str) -> None:
    """
    向微信用户发送一条文本消息。

    重要:context_token 必须原样从收到的消息里取出并回传,
    服务端靠它把回复和对话关联起来。没有它,消息发不出去。
    """
    _post(
        "ilink/bot/sendmessage",
        token = token,
        body  = {
            "msg": {
                "from_user_id" : "",                            # bot 发送,留空
                "to_user_id"   : to_user_id,                   # 发给谁
                "client_id"    : f"mini-{secrets.token_hex(8)}",  # 本次消息的唯一ID,防重复
                "message_type" : 2,                            # 2 = BOT 消息
                "message_state": 2,                            # 2 = 消息已完成(非流式)
                "item_list"    : [{"type": 1, "text_item": {"text": text}}],  # type=1 是文本
                "context_token": context_token,                # ← 关键!必须带上
            },
            "base_info": {"channel_version": "mini-bridge-1.0"},
        },
    )

这里我们固定使用 message_type=2(机器人消息)、message_state=2(已完成,非流式)。client_id 是消息的唯一标识,用于去重,这里随机生成即可。

7. 整合运行

现在我们就可以把登录、收消息、处理消息、发消息串起来了,整合运行。

def run_monitor(token: str) -> None:
    """
    长轮询监听循环:持续等待微信消息,收到后交给 Agent 处理并回复。

    整个循环做三件事:
      1. 调 getUpdates() 等消息(服务端"憋着",有消息才返回)
      2. 遍历返回的消息列表,提取文本,交给 ask_agent() 得到回复
      3. 调 sendMessage() 把回复发回给用户

    参数:
      token - 登录后拿到的 bot_token,每次请求都要带上
    """

    # ── 加载上次的消息游标(断点续传)──
    # 游标让服务端知道"从哪条消息开始",程序重启后不会漏掉中途的消息
    buf = BUF_FILE.read_text("utf-8").strip() if BUF_FILE.exists() else ""
    if buf:
        print("[✓] 从上次游标恢复", flush=True)
    print("[监听中] 等待微信消息...\n", flush=True)

    fail_count = 0    # 连续失败计数,失败太多就暂停一会儿

    while True:

        # ── 第一件事:等消息 ──
        try:
            resp = getUpdates(token, buf=buf)
        except Exception as e:
            # 网络抖动或服务端异常,失败超过 3 次才真正暂停
            fail_count += 1
            print(f"[错误] getUpdates 失败 ({fail_count}/3): {e}", flush=True)
            if fail_count >= 3:
                print("[退避] 连续失败 3 次,等待 30 秒后重试...", flush=True)
                fail_count = 0
                time.sleep(30)
            else:
                time.sleep(2)
            continue

        fail_count = 0

        # 服务端返回了业务错误码,打印后稍等再重试
        if resp.get("ret", 0) != 0 or resp.get("errcode", 0) != 0:
            print(f"[服务端错误] {resp}", flush=True)
            time.sleep(2)
            continue

        # 更新并持久化游标(下次重启可以从这里接着取消息)
        new_buf = resp.get("get_updates_buf", "")
        if new_buf:
            buf = new_buf
            BUF_FILE.write_text(buf, "utf-8")

        # ── 第二件事 + 第三件事:处理每条消息,回复用户 ──
        for msg in resp.get("msgs") or []:

            # 只处理用户发来的消息(message_type=1),忽略 bot 自己发的(=2)
            if msg.get("message_type") != 1:
                continue

            from_user = msg.get("from_user_id", "")
            ctx_token = msg.get("context_token", "")  # ← 必须原样回传给 send_message

            # 从消息的 item_list 里找 type=1(文本)的那一项
            text = ""
            for item in msg.get("item_list") or []:
                if item.get("type") == 1:               # type=1 是文本消息
                    text = (item.get("text_item") or {}).get("text", "")
                    break

            if not text.strip():
                continue    # 非文本消息(图片、语音等)暂不处理

            print(f"[收到] {from_user}: {text[:60]}", flush=True)

            # 第二件事:把文本交给 Agent,得到回复
            reply = askAgent(from_user, text)
            print(f"[回复] {reply[:60]}", flush=True)

            # 第三件事:把 Agent 的回复发回微信
            try:
                send_message(token, from_user, reply, ctx_token)
                print("[✓] 已发送", flush=True)
            except Exception as e:
                print(f"[✗] 发送失败: {e}", flush=True)

最后,在主函数中,我们读取或登录获取 token,然后启动 run_monitor

def main():
    """
    程序入口,只做两件事:
      1. 登录(拿 token)
      2. 调 run_monitor() 开始监听
    """

    # 优先读取上次保存的 token,有就跳过扫码
    creds = load_token()
    # 不存在 token 就扫码登录
    if not creds:
        print("=== 微信扫码登录 ===", flush=True)
        creds = login()
        save_token(creds)
        print(f"[✓] token 已保存到 {TOKEN_FILE}", flush=True)

    token      = creds["token"]
    account_id = creds["account_id"]
    base_url   = creds.get("base_url", BASE_URL)
    print(f"\n[启动] account={account_id}  base={base_url}", flush=True)

    # 登录完成,进入消息监听循环
    run_monitor(token)

在运行前我们需要安装一下相关依赖。

requirements.txt 内容如下:

openai==2.24.0
itchat-uos>=1.3.10
qrcode_terminal == 0.8.0

然后执行:

pip install -r requirements.txt

接着我们运行上述代码结果显示如下:

image.png

接着我们使用微信扫码结果显示如下:

image.png

我们点击按钮继续,这时可以看到终端显示如下:

image.png

微信端显示如下:

image.png

聊天栏显示:

c4a8f799d97ee7c9a29b75fa359f5263.jpg

这时我们就可以通过微信 ClawBot 和我们本地自己写的 Agent 进行通讯了。比如我们之前实现的一个可以读取本地文件的 AI Agent,我们创建一个测试文件 test.txt,写上以下内容:

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人

然后在微信 ClawBot 中输入:帮我读取 test.txt 的文件内容,显示如下

image.png

终端内容显示如下:

image.png

8. 总结与扩展

通过本文,我们完整实现了一个基于微信 ClawBot 协议的机器人,它能够:

  • 通过扫码登录
  • 长轮询接收消息
  • 调用任意本地 AI Agent 处理消息
  • 将回复发回给微信用户

整个程序的核心代码不到 200 行,却涵盖了微信 ClawBot 协议的关键点。你可以在此基础上轻松扩展:

  • 支持多轮对话:通过会话历史管理,我们已经实现了多轮对话的基础。
  • 支持图片、语音:解析消息中的 item_list,识别图片或语音,调用相应的 AI 模型(如图像识别、语音转文字)。
  • 支持命令识别:在文本中检测特定前缀(如 /help),触发不同功能。
  • 接入更强大的 Agent:例如集成 LangChain 实现复杂工作流、接入 Ollama 或 vLLM 等本地推理框架运行开源大模型、或增加联网搜索、RAG(检索增强生成)等能力。

最重要的是,这套方法不依赖任何第三方中间件,完全基于微信官方 ClawBot 协议,相对稳定可靠。你只需要一个微信账号,就能让你的 AI 助手 7×24 小时在线。

希望本文能帮你打开一扇窗,让你在微信这个庞大的社交平台上,用自己的 AI 能力创造更多有趣的应用。动手试一试吧,你会发现过程比想象中简单许多!

我是 程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI 全栈。

最后怎么查看微信 ClawBot 的官方文档,可以通过 npm 安装 @tencent-weixin/openclaw-weixin-cli@tencent-weixin/openclaw-weixin 包,然后在 node_modules 目录中找对应的包里面有源码和文档。当然微信团队不公开可能后续会随时改变策略,所以须谨慎评估风险。

❌