概述
在 OpenClaw 微信插件的开发过程中,一个核心挑战是如何实现可靠的出站消息发送(Outbound Messaging)。微信后端 API 要求每条出站消息都必须携带一个 context_token,这个令牌是通过 getupdates 接口在接收消息时返回的。原始实现将 contextToken 仅存储在内存中,导致每次网关重启或使用 CLI 命令时,出站消息发送都会失败。
本文将详细介绍如何通过引入持久化的 Context Token 存储机制,解决这一问题,从而实现稳定可靠的自定义消息发送能力。
问题背景
微信 API 的 Context Token 机制
微信的消息协议设计了一个重要的安全机制:context_token。这个令牌具有以下特点:
-
按消息发放:每次调用
getupdates 接口获取新消息时,服务器会为该对话返回一个 context_token
-
发送时必须携带:调用
sendmessage 接口发送消息时,必须将收到的 context_token 原样回传
-
用于会话关联:微信后端通过
context_token 来关联对话上下文,确保消息发送的合法性
原始实现的局限性
在改造之前,contextToken 仅以简单的内存 Map 形式存储:
// 原始实现 - 仅内存存储
const contextTokenStore = new Map<string, string>();
export function setContextToken(accountId: string, userId: string, token: string): void {
const k = `${accountId}:${userId}`;
contextTokenStore.set(k, token); // 仅内存存储,进程结束即丢失
}
export function getContextToken(accountId: string, userId: string): string | undefined {
return contextTokenStore.get(`${accountId}:${userId}`);
}
这种实现方式导致了以下问题:
| 场景 |
问题描述 |
| 网关重启 |
插件进程重启后,内存中的 contextToken 全部丢失,无法发送消息 |
| CLI 命令 |
openclaw message send 命令会重新加载插件,无法访问之前的内存状态 |
| 首次出站 |
如果没有收到过该用户的消息,就没有 contextToken,无法主动发送消息 |
错误示例
当尝试在没有 contextToken 的情况下发送消息时,系统会抛出错误:
Error: sendWeixinOutbound: contextToken is required
或者:
Error: sendMessageWeixin: contextToken is required
解决方案:持久化 Context Token 存储
架构设计
为了解决上述问题,我们设计了一个双层存储架构:
┌─────────────────────────────────────────────────────────────┐
│ Context Token 存储架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────────────┐ │
│ │ In-Memory Cache │ │ Persistent Storage │ │
│ │ (Map) │◄────►│ (FileSystem) │ │
│ │ │ │ │ │
│ │ - 快速访问 │ │ - 进程间共享 │ │
│ │ - 运行时缓存 │ │ - 重启后恢复 │ │
│ │ - 毫秒级读取 │ │ - CLI 可访问 │ │
│ └──────────────────┘ └──────────────────────────┘ │
│ ▲ ▲ │
│ │ │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ Token Store │ │
│ │ Manager │ │
│ └───────────────┘ │
│ │ │
│ ┌──────────┼──────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ set() │ │ get() │ │clear() │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
存储路径设计
持久化文件存储在用户主目录下的 OpenClaw 配置目录中:
~/.openclaw/openclaw-weixin/context-tokens/
├── {accountId-1}/
│ ├── user1_im_wechat.json
│ ├── user2_im_wechat.json
│ └── ...
├── {accountId-2}/
│ ├── user3_im_wechat.json
│ └── ...
└── ...
每个文件对应一个 (accountId, userId) 组合,存储该对话的最新 contextToken。
核心代码实现
1. 持久化存储模块:context-token-store.ts
这是整个持久化机制的基础模块,负责与文件系统交互。
import fs from "node:fs";
import path from "node:path";
import { resolveStateDir } from "./state-dir.js";
import { logger } from "../util/logger.js";
// ---------------------------------------------------------------------------
// Persistent Context Token Store
// ---------------------------------------------------------------------------
/**
* Context token persistence for outbound messaging.
*
* The Weixin API requires a context_token for every outbound message, which is
* issued per-message by the getupdates API. This store persists the latest
* contextToken to disk so that outbound messages can be sent even after the
* gateway restarts or when using CLI commands.
*
* Storage path: ~/.openclaw/openclaw-weixin/context-tokens/{accountId}/{userId}.json
*/
interface ContextTokenData {
token: string;
updatedAt: string;
}
function resolveContextTokensDir(): string {
return path.join(resolveStateDir(), "openclaw-weixin", "context-tokens");
}
function resolveContextTokenPath(accountId: string, userId: string): string {
// Sanitize userId for filesystem safety (replace @ and other special chars)
const safeUserId = userId.replace(/[^a-zA-Z0-9_-]/g, "_");
return path.join(resolveContextTokensDir(), accountId, `${safeUserId}.json`);
}
1.1 保存 Token:persistContextToken
/**
* Persist a context token to disk.
* Called when an inbound message is received with a new context_token.
*/
export function persistContextToken(accountId: string, userId: string, token: string): void {
try {
const filePath = resolveContextTokenPath(accountId, userId);
const dir = path.dirname(filePath);
// 确保目录存在(递归创建)
fs.mkdirSync(dir, { recursive: true });
const data: ContextTokenData = {
token,
updatedAt: new Date().toISOString(),
};
// 写入 JSON 文件,格式化便于调试
fs.writeFileSync(filePath, JSON.stringify(data, null, 2), "utf-8");
// 设置文件权限为仅所有者可读写(安全考虑)
try {
fs.chmodSync(filePath, 0o600);
} catch {
// best-effort
}
logger.debug(`persistContextToken: saved token for ${accountId}:${userId}`);
} catch (err) {
logger.error(`persistContextToken: failed to save token: ${String(err)}`);
}
}
关键点说明:
-
路径安全处理:
userId 可能包含特殊字符(如 @),通过正则替换为下划线确保文件系统安全
-
递归目录创建:使用
fs.mkdirSync(dir, { recursive: true }) 确保多级目录自动创建
-
权限控制:设置
0o600 权限,仅允许文件所有者可读写,保护敏感 token 数据
-
错误处理:采用 "best-effort" 策略,即使持久化失败也不影响主流程
1.2 加载 Token:loadPersistedContextToken
/**
* Load a persisted context token from disk.
* Returns undefined if no token exists or loading fails.
*/
export function loadPersistedContextToken(accountId: string, userId: string): string | undefined {
try {
const filePath = resolveContextTokenPath(accountId, userId);
if (!fs.existsSync(filePath)) {
return undefined;
}
const raw = fs.readFileSync(filePath, "utf-8");
const data = JSON.parse(raw) as ContextTokenData;
// 验证 token 格式
if (typeof data.token === "string" && data.token.trim()) {
logger.debug(`loadPersistedContextToken: loaded token for ${accountId}:${userId}`);
return data.token;
}
return undefined;
} catch (err) {
logger.debug(`loadPersistedContextToken: failed to load token: ${String(err)}`);
return undefined;
}
}
关键点说明:
-
防御性编程:文件不存在、JSON 解析失败、token 格式不正确时都返回
undefined
-
格式验证:确保加载的 token 是非空字符串
1.3 清除 Token:clearPersistedContextToken
/**
* Clear a persisted context token (e.g., on session timeout or logout).
*/
export function clearPersistedContextToken(accountId: string, userId: string): void {
try {
const filePath = resolveContextTokenPath(accountId, userId);
if (fs.existsSync(filePath)) {
fs.unlinkSync(filePath);
logger.debug(`clearPersistedContextToken: cleared token for ${accountId}:${userId}`);
}
} catch (err) {
logger.error(`clearPersistedContextToken: failed to clear token: ${String(err)}`);
}
}
1.4 批量加载:loadAllPersistedContextTokens
/**
* Load all persisted context tokens for an account.
* Returns a map of userId -> token.
*/
export function loadAllPersistedContextTokens(accountId: string): Map<string, string> {
const result = new Map<string, string>();
try {
const accountDir = path.join(resolveContextTokensDir(), accountId);
if (!fs.existsSync(accountDir)) {
return result;
}
const files = fs.readdirSync(accountDir);
for (const file of files) {
if (!file.endsWith(".json")) continue;
// Convert filename back to userId (approximate, since we sanitized it)
const safeUserId = file.slice(0, -5); // remove .json
const filePath = path.join(accountDir, file);
try {
const raw = fs.readFileSync(filePath, "utf-8");
const data = JSON.parse(raw) as ContextTokenData;
if (typeof data.token === "string" && data.token.trim()) {
// Store with the safe userId - the actual lookup will use the same sanitization
result.set(safeUserId, data.token);
}
} catch {
// Skip invalid files
}
}
logger.debug(`loadAllPersistedContextTokens: loaded ${result.size} tokens for ${accountId}`);
} catch (err) {
logger.debug(`loadAllPersistedContextTokens: failed to load tokens: ${String(err)}`);
}
return result;
}
2. 双层存储管理:inbound.ts
在持久化存储之上,我们构建了一个双层存储管理器,协调内存缓存和持久化存储的交互。
import { logger } from "../util/logger.js";
import { generateId } from "../util/random.js";
import type { WeixinMessage, MessageItem } from "../api/types.js";
import { MessageItemType } from "../api/types.js";
import {
persistContextToken,
loadPersistedContextToken,
loadAllPersistedContextTokens,
} from "../storage/context-token-store.js";
// ---------------------------------------------------------------------------
// Context token store (in-process cache + persistent storage)
// ---------------------------------------------------------------------------
/**
* contextToken is issued per-message by the Weixin getupdates API and must
* be echoed verbatim in every outbound send.
*
* This store uses both in-memory cache and persistent storage:
* - In-memory: fast access during gateway runtime
* - Persistent: allows outbound messaging after gateway restart or via CLI
*
* Storage path: ~/.openclaw/openclaw-weixin/context-tokens/{accountId}/{userId}.json
*/
const contextTokenStore = new Map<string, string>();
function contextTokenKey(accountId: string, userId: string): string {
return `${accountId}:${userId}`;
}
2.1 存储 Token:setContextToken
/**
* Store a context token for a given account+user pair.
* Persists to disk for CLI/outbound access after restart.
*/
export function setContextToken(accountId: string, userId: string, token: string): void {
const k = contextTokenKey(accountId, userId);
logger.debug(`setContextToken: key=${k}`);
// 1. 写入内存缓存(快速访问)
contextTokenStore.set(k, token);
// 2. 同时持久化到磁盘(跨进程共享)
persistContextToken(accountId, userId, token);
}
设计要点:
-
双写策略:每次设置 token 时,同时更新内存和磁盘
-
以内存为准:内存缓存是运行时权威数据源
-
磁盘为备份:磁盘存储用于进程重启后的恢复
2.2 获取 Token:getContextToken
/**
* Retrieve the cached context token for a given account+user pair.
* Falls back to persisted storage if not in memory.
*/
export function getContextToken(accountId: string, userId: string): string | undefined {
const k = contextTokenKey(accountId, userId);
// 1. 首先检查内存缓存
const cached = contextTokenStore.get(k);
if (cached !== undefined) {
logger.debug(`getContextToken: key=${k} found=in-memory storeSize=${contextTokenStore.size}`);
return cached;
}
// 2. 内存未命中,尝试从持久化存储加载
const persisted = loadPersistedContextToken(accountId, userId);
if (persisted !== undefined) {
// 回填到内存缓存,加速后续访问
contextTokenStore.set(k, persisted);
logger.debug(`getContextToken: key=${k} found=persisted storeSize=${contextTokenStore.size}`);
return persisted;
}
logger.debug(`getContextToken: key=${k} found=false storeSize=${contextTokenStore.size}`);
return undefined;
}
缓存策略:
-
L1 缓存(内存):纳秒级访问速度
-
L2 缓存(磁盘):毫秒级访问速度
-
回填机制:从磁盘加载后自动回填到内存,形成缓存预热
2.3 预加载 Token:preloadContextTokens
/**
* Pre-load all persisted context tokens for an account into memory.
* Called when an account starts to enable immediate outbound messaging.
*/
export function preloadContextTokens(accountId: string): void {
const tokens = loadAllPersistedContextTokens(accountId);
for (const [safeUserId, token] of tokens) {
// Use the safe userId as the key (it was sanitized for filesystem)
const k = `${accountId}:${safeUserId}`;
contextTokenStore.set(k, token);
}
logger.info(`preloadContextTokens: loaded ${tokens.size} tokens for ${accountId}`);
}
使用场景:
- 网关启动时预加载所有历史 token
- 账号重新连接时恢复会话状态
- 确保重启后立即具备消息发送能力
3. 网关集成:channel.ts
持久化存储机制需要在网关生命周期中正确集成,才能发挥作用。
3.1 导入依赖
import path from "node:path";
import type { ChannelPlugin, OpenClawConfig } from "openclaw/plugin-sdk";
import { normalizeAccountId } from "openclaw/plugin-sdk";
import {
registerWeixinAccountId,
loadWeixinAccount,
saveWeixinAccount,
listWeixinAccountIds,
resolveWeixinAccount,
triggerWeixinChannelReload,
DEFAULT_BASE_URL,
} from "./auth/accounts.js";
import type { ResolvedWeixinAccount } from "./auth/accounts.js";
import { assertSessionActive } from "./api/session-guard.js";
import { getContextToken, preloadContextTokens } from "./messaging/inbound.js";
import { logger } from "./util/logger.js";
// ... 其他导入
3.2 出站消息发送
async function sendWeixinOutbound(params: {
cfg: OpenClawConfig;
to: string;
text: string;
accountId?: string | null;
contextToken?: string;
mediaUrl?: string;
}): Promise<{ channel: string; messageId: string }> {
const account = resolveWeixinAccount(params.cfg, params.accountId);
const aLog = logger.withAccount(account.accountId);
// 验证会话状态
assertSessionActive(account.accountId);
if (!account.configured) {
aLog.error(`sendWeixinOutbound: account not configured`);
throw new Error("weixin not configured: please run `openclaw channels login --channel openclaw-weixin`");
}
// 关键验证:必须有 contextToken
if (!params.contextToken) {
aLog.error(`sendWeixinOutbound: contextToken missing, refusing to send to=${params.to}`);
throw new Error("sendWeixinOutbound: contextToken is required");
}
const result = await sendMessageWeixin({
to: params.to,
text: params.text,
opts: {
baseUrl: account.baseUrl,
token: account.token,
contextToken: params.contextToken,
}
});
return { channel: "openclaw-weixin", messageId: result.messageId };
}
3.3 出站消息配置
export const weixinPlugin: ChannelPlugin<ResolvedWeixinAccount> = {
// ... 其他配置
outbound: {
deliveryMode: "direct",
textChunkLimit: 4000,
// 发送文本消息
sendText: async (ctx) => {
const result = await sendWeixinOutbound({
cfg: ctx.cfg,
to: ctx.to,
text: ctx.text,
accountId: ctx.accountId,
// 从存储中获取 contextToken
contextToken: getContextToken(ctx.accountId!, ctx.to),
});
return result;
},
// 发送媒体消息
sendMedia: async (ctx) => {
const account = resolveWeixinAccount(ctx.cfg, ctx.accountId);
const aLog = logger.withAccount(account.accountId);
assertSessionActive(account.accountId);
if (!account.configured) {
aLog.error(`sendMedia: account not configured`);
throw new Error(
"weixin not configured: please run `openclaw channels login --channel openclaw-weixin`",
);
}
const mediaUrl = ctx.mediaUrl;
if (mediaUrl && (isLocalFilePath(mediaUrl) || isRemoteUrl(mediaUrl))) {
let filePath: string;
if (isLocalFilePath(mediaUrl)) {
filePath = resolveLocalPath(mediaUrl);
aLog.debug(`sendMedia: uploading local file ${filePath}`);
} else {
aLog.debug(`sendMedia: downloading remote mediaUrl=${mediaUrl.slice(0, 80)}...`);
filePath = await downloadRemoteImageToTemp(mediaUrl, MEDIA_OUTBOUND_TEMP_DIR);
aLog.debug(`sendMedia: remote image downloaded to ${filePath}`);
}
// 获取 contextToken 用于媒体发送
const contextToken = getContextToken(account.accountId, ctx.to);
const result = await sendWeixinMediaFile({
filePath,
to: ctx.to,
text: ctx.text ?? "",
opts: { baseUrl: account.baseUrl, token: account.token, contextToken },
cdnBaseUrl: account.cdnBaseUrl,
});
return { channel: "openclaw-weixin", messageId: result.messageId };
}
// 回退到纯文本发送
const result = await sendWeixinOutbound({
cfg: ctx.cfg,
to: ctx.to,
text: ctx.text ?? "",
accountId: ctx.accountId,
contextToken: getContextToken(ctx.accountId!, ctx.to),
});
return result;
},
},
// ... 其他配置
};
3.4 网关启动时预加载
gateway: {
startAccount: async (ctx) => {
logger.debug(`startAccount entry`);
if (!ctx) {
logger.warn(`gateway.startAccount: called with undefined ctx, skipping`);
return;
}
const account = ctx.account;
const aLog = logger.withAccount(account.accountId);
aLog.debug(`about to call monitorWeixinProvider`);
aLog.info(`starting weixin webhook`);
ctx.setStatus?.({
accountId: account.accountId,
running: true,
lastStartAt: Date.now(),
lastEventAt: Date.now(),
});
if (!account.configured) {
aLog.error(`account not configured`);
ctx.log?.error?.(
`[${account.accountId}] weixin not logged in — run: openclaw channels login --channel openclaw-weixin`,
);
ctx.setStatus?.({ accountId: account.accountId, running: false });
throw new Error("weixin not configured: missing token");
}
ctx.log?.info?.(`[${account.accountId}] starting weixin provider (${DEFAULT_BASE_URL})`);
const logPath = aLog.getLogFilePath();
ctx.log?.info?.(`[${account.accountId}] weixin logs: ${logPath}`);
// ═══════════════════════════════════════════════════════════════════
// 关键:启动时预加载持久化的 context tokens
// 这使得网关重启后立即具备出站消息发送能力
// ═══════════════════════════════════════════════════════════════════
preloadContextTokens(account.accountId);
return monitorWeixinProvider({
baseUrl: account.baseUrl,
cdnBaseUrl: account.cdnBaseUrl,
token: account.token,
accountId: account.accountId,
config: ctx.cfg,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
setStatus: ctx.setStatus,
});
},
// ... 其他网关方法
}
4. 消息发送实现:send.ts
最后,我们来看实际的消息发送实现,它依赖于前面构建的 contextToken 机制。
import type { ReplyPayload } from "openclaw/plugin-sdk";
import { stripMarkdown } from "openclaw/plugin-sdk";
import { sendMessage as sendMessageApi } from "../api/api.js";
import type { WeixinApiOptions } from "../api/api.js";
import { logger } from "../util/logger.js";
import { generateId } from "../util/random.js";
import type { MessageItem, SendMessageReq } from "../api/types.js";
import { MessageItemType, MessageState, MessageType } from "../api/types.js";
import type { UploadedFileInfo } from "../cdn/upload.js";
function generateClientId(): string {
return generateId("openclaw-weixin");
}
/**
* Convert markdown-formatted model reply to plain text for Weixin delivery.
* Preserves newlines; strips markdown syntax.
*/
export function markdownToPlainText(text: string): string {
let result = text;
// Code blocks: strip fences, keep code content
result = result.replace(/```[^\n]*\n?([\s\S]*?)```/g, (_, code: string) => code.trim());
// Images: remove entirely
result = result.replace(/!\[[^\]]*\]\([^)]*\)/g, "");
// Links: keep display text only
result = result.replace(/\[([^\]]+)\]\([^)]*\)/g, "$1");
// Tables: remove separator rows, then strip leading/trailing pipes and convert inner pipes to spaces
result = result.replace(/^\|[\s:|-]+\|$/gm, "");
result = result.replace(/^\|(.+)\|$/gm, (_, inner: string) =>
inner.split("|").map((cell) => cell.trim()).join(" "),
);
result = stripMarkdown(result);
return result;
}
4.1 构建消息请求
/** Build a SendMessageReq containing a single text message. */
function buildTextMessageReq(params: {
to: string;
text: string;
contextToken?: string;
clientId: string;
}): SendMessageReq {
const { to, text, contextToken, clientId } = params;
const item_list: MessageItem[] = text
? [{ type: MessageItemType.TEXT, text_item: { text } }]
: [];
return {
msg: {
from_user_id: "",
to_user_id: to,
client_id: clientId,
message_type: MessageType.BOT,
message_state: MessageState.FINISH,
item_list: item_list.length ? item_list : undefined,
context_token: contextToken ?? undefined, // 关键:传递 context_token
},
};
}
/** Build a SendMessageReq from a reply payload (text only; image send uses sendImageMessageWeixin). */
function buildSendMessageReq(params: {
to: string;
contextToken?: string;
payload: ReplyPayload;
clientId: string;
}): SendMessageReq {
const { to, contextToken, payload, clientId } = params;
return buildTextMessageReq({
to,
text: payload.text ?? "",
contextToken,
clientId,
});
}
4.2 发送文本消息
/**
* Send a plain text message downstream.
* contextToken is required for all reply sends; missing it breaks conversation association.
*/
export async function sendMessageWeixin(params: {
to: string;
text: string;
opts: WeixinApiOptions & { contextToken?: string };
}): Promise<{ messageId: string }> {
const { to, text, opts } = params;
// 严格检查:没有 contextToken 拒绝发送
if (!opts.contextToken) {
logger.error(`sendMessageWeixin: contextToken missing, refusing to send to=${to}`);
throw new Error("sendMessageWeixin: contextToken is required");
}
const clientId = generateClientId();
const req = buildSendMessageReq({
to,
contextToken: opts.contextToken,
payload: { text },
clientId,
});
try {
await sendMessageApi({
baseUrl: opts.baseUrl,
token: opts.token,
timeoutMs: opts.timeoutMs,
body: req,
});
} catch (err) {
logger.error(`sendMessageWeixin: failed to=${to} clientId=${clientId} err=${String(err)}`);
throw err;
}
return { messageId: clientId };
}
4.3 发送媒体消息
/**
* Send one or more MessageItems (optionally preceded by a text caption) downstream.
* Each item is sent as its own request so that item_list always has exactly one entry.
*/
async function sendMediaItems(params: {
to: string;
text: string;
mediaItem: MessageItem;
opts: WeixinApiOptions & { contextToken?: string };
label: string;
}): Promise<{ messageId: string }> {
const { to, text, mediaItem, opts, label } = params;
const items: MessageItem[] = [];
if (text) {
items.push({ type: MessageItemType.TEXT, text_item: { text } });
}
items.push(mediaItem);
let lastClientId = "";
for (const item of items) {
lastClientId = generateClientId();
const req: SendMessageReq = {
msg: {
from_user_id: "",
to_user_id: to,
client_id: lastClientId,
message_type: MessageType.BOT,
message_state: MessageState.FINISH,
item_list: [item],
context_token: opts.contextToken ?? undefined, // 传递 context_token
},
};
try {
await sendMessageApi({
baseUrl: opts.baseUrl,
token: opts.token,
timeoutMs: opts.timeoutMs,
body: req,
});
} catch (err) {
logger.error(
`${label}: failed to=${to} clientId=${lastClientId} err=${String(err)}`,
);
throw err;
}
}
logger.debug(`${label}: success to=${to} clientId=${lastClientId}`);
return { messageId: lastClientId };
}
4.4 发送图片消息
/**
* Send an image message downstream using a previously uploaded file.
* Optionally include a text caption as a separate TEXT item before the image.
*
* ImageItem fields:
* - media.encrypt_query_param: CDN download param
* - media.aes_key: AES key, base64-encoded
* - mid_size: original ciphertext file size
*/
export async function sendImageMessageWeixin(params: {
to: string;
text: string;
uploaded: UploadedFileInfo;
opts: WeixinApiOptions & { contextToken?: string };
}): Promise<{ messageId: string }> {
const { to, text, uploaded, opts } = params;
// 同样需要 contextToken
if (!opts.contextToken) {
logger.error(`sendImageMessageWeixin: contextToken missing, refusing to send to=${to}`);
throw new Error("sendImageMessageWeixin: contextToken is required");
}
logger.debug(
`sendImageMessageWeixin: to=${to} filekey=${uploaded.filekey} fileSize=${uploaded.fileSize} aeskey=present`,
);
const imageItem: MessageItem = {
type: MessageItemType.IMAGE,
image_item: {
media: {
encrypt_query_param: uploaded.downloadEncryptedQueryParam,
aes_key: Buffer.from(uploaded.aeskey).toString("base64"),
encrypt_type: 1,
},
mid_size: uploaded.fileSizeCiphertext,
},
};
return sendMediaItems({ to, text, mediaItem: imageItem, opts, label: "sendImageMessageWeixin" });
}
4.5 发送视频和文件消息
/**
* Send a video message downstream using a previously uploaded file.
* VideoItem: media (CDN ref), video_size (ciphertext bytes).
* Includes an optional text caption sent as a separate TEXT item first.
*/
export async function sendVideoMessageWeixin(params: {
to: string;
text: string;
uploaded: UploadedFileInfo;
opts: WeixinApiOptions & { contextToken?: string };
}): Promise<{ messageId: string }> {
const { to, text, uploaded, opts } = params;
if (!opts.contextToken) {
logger.error(`sendVideoMessageWeixin: contextToken missing, refusing to send to=${to}`);
throw new Error("sendVideoMessageWeixin: contextToken is required");
}
const videoItem: MessageItem = {
type: MessageItemType.VIDEO,
video_item: {
media: {
encrypt_query_param: uploaded.downloadEncryptedQueryParam,
aes_key: Buffer.from(uploaded.aeskey).toString("base64"),
encrypt_type: 1,
},
video_size: uploaded.fileSizeCiphertext,
},
};
return sendMediaItems({ to, text, mediaItem: videoItem, opts, label: "sendVideoMessageWeixin" });
}
/**
* Send a file attachment downstream using a previously uploaded file.
* FileItem: media (CDN ref), file_name, len (plaintext bytes as string).
* Includes an optional text caption sent as a separate TEXT item first.
*/
export async function sendFileMessageWeixin(params: {
to: string;
text: string;
fileName: string;
uploaded: UploadedFileInfo;
opts: WeixinApiOptions & { contextToken?: string };
}): Promise<{ messageId: string }> {
const { to, text, fileName, uploaded, opts } = params;
if (!opts.contextToken) {
logger.error(`sendFileMessageWeixin: contextToken missing, refusing to send to=${to}`);
throw new Error("sendFileMessageWeixin: contextToken is required");
}
const fileItem: MessageItem = {
type: MessageItemType.FILE,
file_item: {
media: {
encrypt_query_param: uploaded.downloadEncryptedQueryParam,
aes_key: Buffer.from(uploaded.aeskey).toString("base64"),
encrypt_type: 1,
},
file_name: fileName,
len: String(uploaded.fileSize),
},
};
return sendMediaItems({ to, text, mediaItem: fileItem, opts, label: "sendFileMessageWeixin" });
}
数据流全景图
以下是改造后的完整数据流:
┌─────────────────────────────────────────────────────────────────────────────┐
│ 入站消息流程 (Inbound) │
└─────────────────────────────────────────────────────────────────────────────┘
┌──────────────┐
│ Weixin API │ getupdates 返回消息 + context_token
└──────┬───────┘
│
▼
┌──────────────┐
│ monitor.ts │ 解析消息,提取 context_token
└──────┬───────┘
│
▼
┌──────────────┐ ┌─────────────────┐
│ inbound.ts │────►│ 内存 Map 缓存 │
│ setContextToken └─────────────────┘
└──────┬───────┘ │
│ │
│ ┌────────▼────────┐
│ │ 持久化存储 │
└─────────────►│ (JSON 文件) │
└─────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ 出站消息流程 (Outbound) │
└─────────────────────────────────────────────────────────────────────────────┘
┌──────────────────┐
│ 发送请求来源 │
│ - AI 自动回复 │
│ - CLI 命令 │
│ - 定时任务 │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ channel.ts │ getContextToken(accountId, userId)
│ sendText() │
└────────┬─────────┘
│
▼
┌──────────────────┐ 命中? ┌─────────────────┐
│ inbound.ts │─────────────►│ 返回内存缓存 │
│ getContextToken │ └─────────────────┘
└────────┬─────────┘
│ 未命中
▼
┌──────────────────┐ 存在? ┌─────────────────┐
│ context-token- │─────────────►│ 加载 + 回填缓存 │
│ store.ts │ │ 返回 token │
│ loadPersisted... │ └─────────────────┘
└────────┬─────────┘
│ 不存在
▼
┌──────────────────┐
│ 抛出错误 │
│ "contextToken │
│ is required" │
└──────────────────┘
│
▼ (token 存在)
┌──────────────────┐
│ send.ts │ 构建请求 + context_token
│ sendMessageWeixin│
└────────┬─────────┘
│
▼
┌──────────────────┐
│ api.ts │ HTTP POST sendmessage
│ sendMessageApi │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Weixin API │ 消息发送成功
└──────────────────┘
使用场景示例
场景 1:AI 自动回复
当用户发送消息触发 AI 响应时,流程如下:
// 1. 用户发送消息
const inboundMsg = await getUpdates(); // 返回 { ..., context_token: "abc123" }
// 2. 存储 contextToken
setContextToken("account-1", "user@im.wechat", "abc123");
// 同时写入:
// - 内存: contextTokenStore.set("account-1:user@im.wechat", "abc123")
// - 磁盘: ~/.openclaw/.../account-1/user_im_wechat.json
// 3. AI 生成回复
const reply = await ai.generateResponse(inboundMsg.body);
// 4. 发送回复(自动获取 contextToken)
await sendText({
to: "user@im.wechat",
text: reply,
accountId: "account-1",
// getContextToken("account-1", "user@im.wechat") 自动返回 "abc123"
});
场景 2:网关重启后恢复
// 网关启动
async function startAccount(ctx) {
// 预加载所有持久化的 token
preloadContextTokens("account-1");
// 从磁盘加载 ~/.openclaw/.../account-1/*.json
// 恢复到内存缓存
// 现在可以立即发送消息,即使没有收到新消息
await sendText({
to: "user@im.wechat",
text: "网关已重启,服务恢复正常",
accountId: "account-1",
// getContextToken 会从内存缓存返回之前持久化的 token
});
}
场景 3:CLI 命令发送消息
# 使用 openclaw agent 命令(在网关会话中)
openclaw agent --session-id <session-id> --message "Hello" --deliver
# 由于 contextToken 已持久化,即使 CLI 重新加载插件也能获取到 token
总结
通过引入持久化的 Context Token 存储机制,我们解决了微信插件在出站消息发送方面的核心限制:
| 改进点 |
改造前 |
改造后 |
| 存储位置 |
仅内存 |
内存 + 磁盘 |
| 网关重启 |
Token 丢失,无法发送 |
从磁盘恢复,立即可用 |
| CLI 命令 |
无法获取 Token |
从磁盘读取 Token |
| 首次出站 |
必须等待入站消息 |
使用历史 Token 即可发送 |
| 可靠性 |
低 |
高 |
核心设计原则
-
双写策略:内存和磁盘同时更新,确保数据一致性
-
分层缓存:内存优先,磁盘兜底,兼顾速度和可靠性
-
预加载机制:启动时批量恢复,避免冷启动延迟
-
防御性编程:所有文件操作都有错误处理,单点故障不影响整体
-
安全第一:敏感数据设置严格的文件权限(0o600)
代码文件清单
| 文件路径 |
职责 |
src/storage/context-token-store.ts |
持久化存储实现 |
src/messaging/inbound.ts |
双层存储管理器 |
src/channel.ts |
网关集成和出站发送 |
src/messaging/send.ts |
消息发送实现 |
这套机制确保了 OpenClaw 微信插件在各种场景下都能稳定可靠地发送消息,为 AI 助手与微信用户的交互提供了坚实的基础。