OpenClaw Gateway RPC 运行时:一个 WebSocket 协议引擎的深度解剖
如果你用过 OpenClaw,你一定注意到它的跨平台能力——iOS、Android、macOS 客户端、命令行工具、Web 控制台……这些客户端分布在不同设备、不同网络,却都能实时与 Gateway 对话,收发消息、触发 AI 会话、管理 Cron 任务。
这一切的底层,是一套基于 WebSocket 的 RPC 协议运行时。今天我们把它从头到尾拆开来看。
一、为什么选 WebSocket,而不是 REST / gRPC
在正式看代码之前,先聊聊选型理由。
Gateway 需要同时满足两件事:双向通信(服务端主动推送事件)和请求-响应语义(客户端发方法调用,服务端回结果)。
REST 解决了第二点,却没有第一点。gRPC 两点都有,但它需要 HTTP/2,而桌面 menubar 和移动端 SDK 与 Gateway 之间的网络环境可能经过 Tailscale / 反向代理,HTTP/2 的兼容性反而是麻烦。
WebSocket 在这里是个刚好够用的选择:
- 单连接全双工,服务端随时可推;
- 基于 HTTP 升级,穿透代理友好;
- 纯文本 JSON 帧,调试可见,无额外序列化依赖;
- 浏览器原生支持,Control UI 和 Webchat 可以直连。
二、协议层:三种帧和一个版本号
所有帧定义在 src/gateway/protocol/schema/frames.ts,用 @sinclair/typebox 描述 schema。完整定义:
// RequestFrame:客户端调用某个 RPC 方法
export const RequestFrameSchema = Type.Object(
{
type: Type.Literal("req"),
id: NonEmptyString, // 调用 ID,对应响应时 echo 回来
method: NonEmptyString, // 如 "chat.send" / "sessions.list"
params: Type.Optional(Type.Unknown()),
},
{ additionalProperties: false },
);
// ResponseFrame:服务端对某次 req 的回应
export const ResponseFrameSchema = Type.Object(
{
type: Type.Literal("res"),
id: NonEmptyString, // 与 req.id 对应
ok: Type.Boolean(),
payload: Type.Optional(Type.Unknown()),
error: Type.Optional(ErrorShapeSchema),
},
{ additionalProperties: false },
);
// EventFrame:服务端主动推送的事件,无需 req 触发
export const EventFrameSchema = Type.Object(
{
type: Type.Literal("event"),
event: NonEmptyString, // 如 "agent" / "tick" / "chat"
payload: Type.Optional(Type.Unknown()),
seq: Type.Optional(Type.Integer({ minimum: 0 })),
stateVersion: Type.Optional(StateVersionSchema),
},
{ additionalProperties: false },
);
// 顶层判别联合
export const GatewayFrameSchema = Type.Union(
[RequestFrameSchema, ResponseFrameSchema, EventFrameSchema],
{ discriminator: "type" },
);
seq 字段是广播事件的全局单调递增序号。客户端可以用 seq 检测是否有事件被跳过(网络抖动时丢帧),按需请求重放。定向推送(broadcastToConnIds)不带 seq,因为它不是全局序列。
stateVersion 则是双整数的版本向量 { presence: number, health: number },让客户端知道它现在看到的状态快照是否是最新的。
三、服务端常量:帧大小与心跳节奏
src/gateway/server-constants.ts:
// 与客户端保持同步,canvas 快照可以很大
export const MAX_PAYLOAD_BYTES = 25 * 1024 * 1024;
// 单连接发送缓冲区上限,超过则视为慢消费者
export const MAX_BUFFERED_BYTES = 50 * 1024 * 1024;
// 握手阶段(未认证)的帧大小限制,远小于认证后
export const MAX_PREAUTH_PAYLOAD_BYTES = 64 * 1024;
// 心跳 tick 事件间隔:30 秒
export const TICK_INTERVAL_MS = 30_000;
// 健康快照刷新间隔:60 秒
export const HEALTH_REFRESH_INTERVAL_MS = 60_000;
// 握手超时:3 秒(连接后必须完成认证)
export const DEFAULT_HANDSHAKE_TIMEOUT_MS = 3_000;
// 去重 TTL 和最大条目数
export const DEDUPE_TTL_MS = 5 * 60_000;
export const DEDUPE_MAX = 1000;
注意 MAX_PREAUTH_PAYLOAD_BYTES = 64KB——这是握手阶段(尚未认证)允许的最大帧大小。任何未认证连接发来超过 64KB 的帧,立刻被关闭,原因记录为 preauth-payload-too-large。这杜绝了攻击者在握手阶段发送巨型帧耗尽内存。
MAX_BUFFERED_BYTES 是另一个关键数字。广播函数发帧之前,会检查 socket.bufferedAmount,如果超过这个阈值,该客户端会被踢下线(slow consumer),防止一个慢消费者拖垮整个广播队列。
四、连接生命周期:从 TCP 到 hello-ok
整个 WebSocket 连接的生命周期由两个文件分工:
-
src/gateway/server/ws-connection.ts:连接级别的状态机,处理打开/关闭事件; -
src/gateway/server/ws-connection/message-handler.ts:消息级别的处理,1158 行,是协议引擎的核心。
4.1 连接打开:发 Challenge
一个新的 WebSocket 连接进来,attachGatewayWsConnectionHandler 立刻做这几件事:
wss.on("connection", (socket, upgradeReq) => {
const connId = randomUUID(); // 连接唯一 ID
let handshakeState: "pending" | "connected" | "failed" = "pending";
const openedAt = Date.now();
// 立刻发 challenge 事件,里面有 nonce
const connectNonce = randomUUID();
send({
type: "event",
event: "connect.challenge",
payload: { nonce: connectNonce, ts: Date.now() },
});
// 握手超时计时器,3 秒内未完成则强制关闭
const handshakeTimer = setTimeout(() => {
if (!client) {
handshakeState = "failed";
setCloseCause("handshake-timeout", { handshakeMs: Date.now() - openedAt });
close();
}
}, handshakeTimeoutMs);
// ...
为什么要先发 nonce?
这是防重放攻击的关键机制。客户端在发送 connect 请求时,需要用自己的设备私钥对 {nonce, role, scopes, signedAt} 进行签名。nonce 是服务端生成的一次性随机数,且有 2 分钟时效窗口(DEVICE_SIGNATURE_SKEW_MS = 2 * 60 * 1000)。攻击者即使截获了一次合法的签名,也无法在另一个连接里复用——因为 nonce 不同。
4.2 连接关闭:追踪诊断信息
关闭时,代码把所有诊断信息一起打进日志:
socket.once("close", (code, reason) => {
const durationMs = Date.now() - openedAt;
// 如果是 node 角色断开,注销 nodeRegistry
if (client?.connect?.role === "node") {
const context = buildRequestContext();
const nodeId = context.nodeRegistry.unregister(connId);
if (nodeId) {
removeRemoteNodeInfo(nodeId);
context.nodeUnsubscribeAll(nodeId);
}
}
// 更新 presence 快照
if (client?.presenceKey) {
upsertPresence(client.presenceKey, { reason: "disconnect" });
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
}
// 记录 closeCause、握手状态、最后一帧的 type/method/id
logWs("out", "close", {
connId, code, reason: logReason,
durationMs, cause: closeCause,
handshake: handshakeState,
lastFrameType, lastFrameMethod, lastFrameId,
});
});
lastFrameType / lastFrameMethod / lastFrameId 是每次处理消息时都会更新的三个字段,专门用于断连后的事后诊断——看连接最后在干什么、卡在哪个方法上。
五、握手认证:一条六关卡流水线
握手认证是 message-handler.ts 最复杂的部分,大约占整个文件的 80%。它是一条线性流水线,任何一关卡失败都直接关闭连接。
第一关:格式验证
第一条消息必须是合法的 RequestFrame,且 method === "connect",params 满足 ConnectParamsSchema。否则立刻关闭,原因 invalid-handshake。
const isRequestFrame = validateRequestFrame(parsed);
if (!isRequestFrame || parsed.method !== "connect" || !validateConnectParams(parsed.params)) {
const handshakeError = isRequestFrame
? parsed.method === "connect"
? `invalid connect params: ${formatValidationErrors(validateConnectParams.errors)}`
: "invalid handshake: first request must be connect"
: "invalid request frame";
// 关闭连接...
}
第二关:协议版本协商
ConnectParams 里有 minProtocol 和 maxProtocol 两个字段,形成客户端声明的协议版本区间。服务端的 PROTOCOL_VERSION 必须落在这个区间内,否则报 protocol-mismatch:
const { minProtocol, maxProtocol } = connectParams;
if (maxProtocol < PROTOCOL_VERSION || minProtocol > PROTOCOL_VERSION) {
markHandshakeFailure("protocol-mismatch", {
minProtocol, maxProtocol, expectedProtocol: PROTOCOL_VERSION,
});
sendHandshakeErrorResponse(ErrorCodes.INVALID_REQUEST, "protocol mismatch", {
details: { expectedProtocol: PROTOCOL_VERSION },
});
close(1002, "protocol mismatch");
return;
}
这个双边区间设计让协议升级平滑——客户端可以声明 minProtocol: 3, maxProtocol: 5,服务端可以接受任何版本在此范围内的连接,而不需要精确匹配。
第三关:Origin 检查
对于 Control UI 和 Webchat 类型的客户端,以及带 Origin 头的浏览器请求,必须通过 Origin 合法性检查。这里有一个特殊的告警路径:
if (originCheck.matchedBy === "host-header-fallback") {
originCheckMetrics.hostHeaderFallbackAccepted += 1;
logWsControl.warn(
`security warning: websocket origin accepted via Host-header fallback...`
);
}
当 Origin 头缺失时,服务端可以配置 dangerouslyAllowHostHeaderOriginFallback 回退到用 Host 头做匹配,但这是危险选项,每次接受都会写警告日志并计数。
第四关:角色和共享认证
ConnectParams 里的 role 只有两个值——"operator" 或 "node":
export const GATEWAY_ROLES = ["operator", "node"] as const;
export type GatewayRole = (typeof GATEWAY_ROLES)[number];
-
operator:人类或自动化工具发来的命令,绝大多数 RPC 方法只有 operator 能调; -
node:移动端设备(iOS/Android)注册为远程执行节点,只能使用node.*方法。
共享认证(auth.token / auth.password)在这一关校验,同时也对 loopback 直连请求做 Tailscale/trusted-proxy 判断。
第五关:设备身份验证
如果客户端提供了 device 字段({ id, publicKey, signature, signedAt, nonce }),就进入设备签名验证流程:
const derivedId = deriveDeviceIdFromPublicKey(device.publicKey);
if (!derivedId || derivedId !== device.id) {
rejectDeviceAuthInvalid("device-id-mismatch", "device identity mismatch");
return;
}
const signedAt = device.signedAt;
if (typeof signedAt !== "number" || Math.abs(Date.now() - signedAt) > DEVICE_SIGNATURE_SKEW_MS) {
rejectDeviceAuthInvalid("device-signature-stale", "device signature expired");
return;
}
// nonce 必须和服务端发出的 connectNonce 完全一致
if (providedNonce !== connectNonce) {
rejectDeviceAuthInvalid("device-nonce-mismatch", "device nonce mismatch");
return;
}
三重验证:设备 ID 必须能从公钥推导出来(防止伪造 ID);签名时间戳必须在 ±2 分钟窗口内(防止重放);nonce 必须和本次连接发出的挑战值一致(防止跨连接重用)。
认证决策由 resolveConnectAuthState + resolveConnectAuthDecision 两步完成,支持多种凭据类型:
export type ConnectAuthState = {
authResult: GatewayAuthResult;
authOk: boolean;
authMethod: GatewayAuthResult["method"];
sharedAuthOk: boolean; // token/password 认证结果
sharedAuthProvided: boolean;
bootstrapTokenCandidate?: string; // 初次配对 bootstrap token
deviceTokenCandidate?: string; // 已配对设备的持久 token
deviceTokenCandidateSource?: DeviceTokenCandidateSource;
};
第六关:设备配对验证
即使认证通过,设备还需要验证是否已配对(getPairedDevice),且配对记录中的公钥与当前连接提供的公钥完全一致。
如果未配对,触发配对流程 requestDevicePairing:
const requirePairing = async (
reason: "not-paired" | "role-upgrade" | "scope-upgrade" | "metadata-upgrade",
) => {
const allowSilentLocalPairing = shouldAllowSilentLocalPairing({
isLocalClient, hasBrowserOriginHeader, isControlUi, isWebchat, reason,
});
const pairing = await requestDevicePairing({
deviceId: device.id,
publicKey: devicePublicKey,
...clientPairingMetadata,
silent: allowSilentLocalPairing,
});
if (pairing.request.silent === true) {
// 本地客户端静默自动批准
const approved = await approveDevicePairing(pairing.request.requestId);
context.broadcast("device.pair.resolved", { requestId, deviceId, decision: "approved", ts }, { dropIfSlow: true });
} else if (pairing.created) {
// 远程客户端:广播配对请求,等待人工审批
context.broadcast("device.pair.requested", pairing.request, { dropIfSlow: true });
// 关闭连接,让客户端等待审批后重连
close(1008, "pairing required");
return false;
}
};
Scope 升级(客户端请求比配对记录更高的权限)、Role 升级(客户端请求不同角色)、设备元数据变更(平台/设备家族不符合已记录的 pinned 值),都会触发重新配对流程,每次都记安全审计日志:
logGateway.warn(
`security audit: device access upgrade requested reason=role-upgrade device=${device.id} ip=... auth=${authMethod} roleFrom=... roleTo=${role} scopesFrom=... scopesTo=...`,
);
六、hello-ok:握手成功的状态快照
六关全部通过后,服务端发送 hello-ok 响应,里面包含一个完整的状态快照:
const helloOk = {
type: "hello-ok",
protocol: PROTOCOL_VERSION,
server: {
version: resolveRuntimeServiceVersion(process.env),
connId, // 本次连接的唯一 ID,客户端日志定位用
},
features: {
methods: gatewayMethods, // 所有可用方法名列表
events, // 所有可能推送的事件名列表
},
snapshot, // 当前 presence、health、配置路径等完整状态
canvasHostUrl, // Canvas 宿主 URL(node 角色专有)
auth: deviceToken ? {
deviceToken: deviceToken.token,
role: deviceToken.role,
scopes: deviceToken.scopes,
issuedAtMs: deviceToken.rotatedAtMs ?? deviceToken.createdAtMs,
} : undefined,
policy: {
maxPayload: MAX_PAYLOAD_BYTES, // 25MB
maxBufferedBytes: MAX_BUFFERED_BYTES, // 50MB
tickIntervalMs: TICK_INTERVAL_MS, // 30000ms
},
};
features.methods 是方法白名单——客户端收到之后才知道这个服务端版本支持哪些方法,可以根据此做功能降级。Snapshot 的完整 schema:
export const SnapshotSchema = Type.Object({
presence: Type.Array(PresenceEntrySchema), // 所有在线客户端列表
health: HealthSnapshotSchema, // 渠道健康状态
stateVersion: StateVersionSchema, // { presence: number, health: number }
uptimeMs: Type.Integer({ minimum: 0 }),
configPath: Type.Optional(NonEmptyString),
stateDir: Type.Optional(NonEmptyString),
sessionDefaults: Type.Optional(SessionDefaultsSchema),
authMode: Type.Optional(Type.Union([
Type.Literal("none"), Type.Literal("token"),
Type.Literal("password"), Type.Literal("trusted-proxy"),
])),
updateAvailable: Type.Optional(Type.Object({
currentVersion: NonEmptyString,
latestVersion: NonEmptyString,
channel: NonEmptyString,
})),
}, { additionalProperties: false });
这个 snapshot 让客户端在连接成功的瞬间就拿到足够的上下文,不需要再单独发 health 或 status 请求——减少了一个 RTT。
七、认证完成后:请求路由和方法调度
握手成功后,后续帧必须全是 RequestFrame(type: "req")。非 req 帧会收到错误响应,但连接不会立刻关闭。
每个请求经过 handleGatewayRequest 完成三层过滤:
export async function handleGatewayRequest(
opts: GatewayRequestOptions & { extraHandlers?: GatewayRequestHandlers },
): Promise<void> {
const { req, respond, client, isWebchatConnect, context } = opts;
// 第一层:角色授权
const authError = authorizeGatewayMethod(req.method, client);
if (authError) { respond(false, undefined, authError); return; }
// 第二层:控制平面写操作限流(3次/60秒)
if (CONTROL_PLANE_WRITE_METHODS.has(req.method)) {
const budget = consumeControlPlaneWriteBudget({ client });
if (!budget.allowed) {
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, `rate limit exceeded...`, {
retryable: true,
retryAfterMs: budget.retryAfterMs,
details: { method: req.method, limit: "3 per 60s" },
}));
return;
}
}
// 第三层:查找 handler 并执行
const handler = opts.extraHandlers?.[req.method] ?? coreGatewayHandlers[req.method];
if (!handler) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, `unknown method: ${req.method}`));
return;
}
// 包裹在插件 request scope 中执行
await withPluginRuntimeGatewayRequestScope({ context, client, isWebchatConnect }, invokeHandler);
}
CONTROL_PLANE_WRITE_METHODS 目前是 config.apply、config.patch、update.run 三个——修改配置和触发更新,每 60 秒只能调 3 次,防止自动化脚本频繁 hammer。
withPluginRuntimeGatewayRequestScope 把 handler 包裹在一个插件运行时请求 scope 里,允许子 Agent 在工具执行过程中回调到 Gateway 方法——这是 Pi 嵌入式运行时和上下文引擎工具调用时的关键路径。
八、方法注册:核心处理器表
所有方法处理器通过展开合并聚合到一张哈希表里:
export const coreGatewayHandlers: GatewayRequestHandlers = {
...connectHandlers,
...logsHandlers,
...voicewakeHandlers,
...healthHandlers,
...channelsHandlers,
...chatHandlers,
...cronHandlers,
...deviceHandlers,
...doctorHandlers,
...execApprovalsHandlers,
...webHandlers,
...modelsHandlers,
...configHandlers,
...wizardHandlers,
...talkHandlers,
...toolsCatalogHandlers,
...ttsHandlers,
...skillsHandlers,
...sessionsHandlers,
...systemHandlers,
...updateHandlers,
...nodeHandlers,
...nodePendingHandlers,
...pushHandlers,
...sendHandlers,
...usageHandlers,
...agentHandlers,
...agentsHandlers,
...browserHandlers,
};
每个 handler 的类型签名简洁统一:
export type GatewayRequestHandler = (opts: GatewayRequestHandlerOptions) => Promise<void> | void;
export type GatewayRequestHandlerOptions = {
req: RequestFrame;
params: Record<string, unknown>;
client: GatewayClient | null;
isWebchatConnect: (params: ConnectParams | null | undefined) => boolean;
respond: RespondFn;
context: GatewayRequestContext;
};
respond 是 handler 专属的响应回调,已经绑定了 req.id,handler 调用 respond(true, payload) 时,框架自动拼出 { type: "res", id: req.id, ok: true, payload }。handler 不需要关心 WebSocket 帧格式。
九、Scope 权限体系:最小特权原则落地
OpenClaw 实现了细粒度的 Scope 系统,比 RBAC 更精细:
// src/gateway/method-scopes.ts
export const ADMIN_SCOPE = "operator.admin";
export const READ_SCOPE = "operator.read";
export const WRITE_SCOPE = "operator.write";
export const APPROVALS_SCOPE = "operator.approvals";
export const PAIRING_SCOPE = "operator.pairing";
每个方法都有明确的 scope 归属:
-
operator.read:health、channels.status、sessions.list、logs.tail……只读操作; -
operator.write:send、agent、chat.send、node.invoke……发消息和触发 AI; -
operator.approvals:exec.approval.*……仅管理执行许可; -
operator.pairing:node.pair.*、device.pair.*……仅管理设备配对; -
operator.admin:config.*、wizard.*、agents.create……管理员操作,覆盖所有;
读操作有一个特殊规则——operator.write 隐含了 operator.read 的权限:
if (requiredScope === READ_SCOPE) {
if (scopes.includes(READ_SCOPE) || scopes.includes(WRITE_SCOPE)) {
return { allowed: true };
}
return { allowed: false, missingScope: READ_SCOPE };
}
这让"能写的客户端也能读"这个直觉得以实现,而无需给每个有写权限的客户端都显式加上 read scope。
Node 角色的方法隔离
node 角色只能使用 NODE_ROLE_METHODS 里列出的方法:
const NODE_ROLE_METHODS = new Set([
"node.invoke.result", // 返回工具调用结果
"node.event", // 推送节点事件
"node.pending.drain", // 清空待执行队列
"node.canvas.capability.refresh",
"node.pending.pull",
"node.pending.ack",
"skills.bins", // 汇报已安装的 skill 二进制
]);
移动端设备作为 node 接入后,只能报告执行结果和拉取任务,完全无法调用 chat.send、config.set 这类方法——即使它的 token 泄露,攻击面也被大幅压缩。
十、广播引擎:事件推送的作用域守卫
服务端主动推事件通过 src/gateway/server-broadcast.ts 实现。核心是 createGatewayBroadcaster:
export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient> }) {
let seq = 0;
const broadcastInternal = (
event: string,
payload: unknown,
opts?: GatewayBroadcastOpts,
targetConnIds?: ReadonlySet<string>,
) => {
if (params.clients.size === 0) { return; }
const isTargeted = Boolean(targetConnIds);
const eventSeq = isTargeted ? undefined : ++seq; // 定向推送不带全局 seq
const frame = JSON.stringify({
type: "event", event, payload,
seq: eventSeq,
stateVersion: opts?.stateVersion,
});
for (const c of params.clients) {
if (targetConnIds && !targetConnIds.has(c.connId)) { continue; }
// Scope 守卫:特权事件只推给有对应 scope 的连接
if (!hasEventScope(c, event)) { continue; }
// 慢消费者处理
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES;
if (slow && opts?.dropIfSlow) { continue; } // 允许丢弃:跳过
if (slow) {
c.socket.close(1008, "slow consumer"); // 不允许丢弃:踢出
continue;
}
c.socket.send(frame);
}
};
// ...
}
事件的 Scope 守卫是独立的:
const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
"exec.approval.requested": [APPROVALS_SCOPE],
"exec.approval.resolved": [APPROVALS_SCOPE],
"device.pair.requested": [PAIRING_SCOPE],
"device.pair.resolved": [PAIRING_SCOPE],
"node.pair.requested": [PAIRING_SCOPE],
"node.pair.resolved": [PAIRING_SCOPE],
};
function hasEventScope(client: GatewayWsClient, event: string): boolean {
const required = EVENT_SCOPE_GUARDS[event];
if (!required) { return true; } // 无守卫事件:全部客户端可接收
const role = client.connect.role ?? "operator";
if (role !== "operator") { return false; }
const scopes = Array.isArray(client.connect.scopes) ? client.connect.scopes : [];
if (scopes.includes(ADMIN_SCOPE)) { return true; }
return required.some((scope) => scopes.includes(scope));
}
执行审批请求和设备配对请求这类事件,只会推送给有对应 scope 的连接。一个只有 operator.read scope 的只读监控客户端,不会收到安全敏感事件。
dropIfSlow 选项让广播变成"尽力投递"语义——状态快照和 tick 心跳等无关紧要的事件,慢消费者直接跳过,不会导致其被踢下线。而 agent 流式回复这种必须保序的事件,则让慢消费者付出被踢下线的代价。
十一、UnauthorizedFloodGuard:防角色探测攻击
已认证的连接,如果反复调用权限不足的方法,会触发 UnauthorizedFloodGuard:
const DEFAULT_CLOSE_AFTER = 10; // 超过 10 次未授权调用,关闭连接
const DEFAULT_LOG_EVERY = 100; // 每 100 次记一条日志(防日志爆炸)
export class UnauthorizedFloodGuard {
private count = 0;
private suppressedSinceLastLog = 0;
registerUnauthorized(): UnauthorizedFloodDecision {
this.count += 1;
const shouldClose = this.count > this.closeAfter; // > 10 次则关闭
const shouldLog = this.count === 1 // 第一次必记
|| this.count % this.logEvery === 0 // 每 100 次记一次
|| shouldClose; // 触发关闭时必记
// ...
return { shouldClose, shouldLog, count, suppressedSinceLastLog };
}
reset(): void { // 只要有一次成功的授权调用,计数归零
this.count = 0;
this.suppressedSinceLastLog = 0;
}
}
这个设计非常合理:每个连接独立一个 guard 实例;只要穿插了合法调用,计数就会重置(不会因为偶尔调用一个没权限的方法就被踢);但如果连续探测超过 10 个无权方法,就会被关闭,且只有在第 1 次、第 100、200……次或关闭时才写日志,防止日志被 flood。
十二、GatewayRequestContext:请求上下文的依赖注入
每个 handler 收到的 context 对象是整个 Gateway 的服务注入点,类型定义超过 70 行:
export type GatewayRequestContext = {
deps: ReturnType<typeof createDefaultDeps>;
cron: CronService;
cronStorePath: string;
execApprovalManager?: ExecApprovalManager;
loadGatewayModelCatalog: () => Promise<ModelCatalogEntry[]>;
getHealthCache: () => HealthSummary | null;
refreshHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
logGateway: SubsystemLogger;
broadcast: GatewayBroadcastFn;
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
nodeSendToAllSubscribed: (event: string, payload: unknown) => void;
nodeSubscribe: (nodeId: string, sessionKey: string) => void;
nodeUnsubscribe: (nodeId: string, sessionKey: string) => void;
nodeRegistry: NodeRegistry;
agentRunSeq: Map<string, number>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatAbortedRuns: Map<string, number>;
chatRunBuffers: Map<string, string>;
addChatRun: (sessionId: string, entry: { sessionKey: string; clientRunId: string }) => void;
removeChatRun: (...) => { ... } | undefined;
registerToolEventRecipient: (runId: string, connId: string) => void;
dedupe: Map<string, DedupeEntry>;
wizardSessions: Map<string, WizardSession>;
getRuntimeSnapshot: () => ChannelRuntimeSnapshot;
startChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
stopChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
// ...
};
这个对象由 buildRequestContext() 在每次消息处理时构建。它是闭包驱动的——broadcast、nodeRegistry、chatAbortControllers 这些状态都来自 server 启动时初始化的共享引用,buildRequestContext 只是把它们打包成统一接口传进 handler。
dedupe 是去重 map,防止网络抖动导致同一请求被客户端重试多次(TTL = 5 分钟,最多 1000 条)。agentRunSeq 确保 agent 流式事件的序号单调递增,不会因为并发推送乱序。
十三、健康快照:版本化的惰性刷新
health-state.ts 实现了一个简洁的健康快照管理模式:
let presenceVersion = 1;
let healthVersion = 1;
let healthCache: HealthSummary | null = null;
let healthRefresh: Promise<HealthSummary> | null = null; // 防并发重入
export async function refreshGatewayHealthSnapshot(opts?: { probe?: boolean }) {
if (!healthRefresh) {
healthRefresh = (async () => {
const snap = await getHealthSnapshot({ probe: opts?.probe });
healthCache = snap;
healthVersion += 1; // 健康版本 +1,触发客户端更新
if (broadcastHealthUpdate) {
broadcastHealthUpdate(snap); // 推送健康事件给所有客户端
}
return snap;
})().finally(() => {
healthRefresh = null; // 无论成功失败,清除 in-flight 标记
});
}
return healthRefresh; // 并发调用共享同一个 Promise
}
healthRefresh 是一个 Promise 去重锁:如果已经有一个 health 刷新在进行中,新的调用直接返回同一个 Promise,而不会发起新的健康检查。这对于 hello-ok 之后立刻触发 refreshGatewayHealthSnapshot({ probe: true }) 的场景很重要——多个客户端几乎同时连接时,只会有一次真实的健康探测。
presenceVersion 和 healthVersion 是两个独立的单调整数。stateVersion: { presence, health } 随每个广播事件一起发出,客户端可以判断自己是否持有最新快照,按需主动刷新(而不是每次连接都重新拉一遍)。
十四、启动侧车:Gateway 开机时的并行任务
src/gateway/server-startup.ts 的 startGatewaySidecars 负责在 Gateway HTTP 服务器就绪后,启动一系列后台服务:
export async function startGatewaySidecars(params) {
// 1. 清理过期 session 锁文件(防止崩溃后残留锁)
await cleanStaleLockFiles({ sessionsDir, staleMs: 30 * 60 * 1000, removeStale: true });
// 2. 启动浏览器控制服务器(如果配置启用)
browserControl = await startBrowserControlServerIfEnabled();
// 3. 启动 Gmail 监听器(如果配置了 hooks.gmail.account)
await startGmailWatcherWithLogs({ cfg, log: logHooks });
// 4. 加载内部 Hook 处理器
clearInternalHooks();
const loadedCount = await loadInternalHooks(cfg, defaultWorkspaceDir);
// 5. 启动所有渠道(Telegram、Discord、Signal……)
await params.startChannels();
// 6. 触发 gateway:startup 内部 hook(延迟 250ms,等渠道就绪)
setTimeout(() => {
void triggerInternalHook(createInternalHookEvent("gateway", "startup", "gateway:startup", ...));
}, 250);
// 7. 启动插件服务(memory-lancedb 等 plugin services)
pluginServices = await startPluginServices({ registry, config, workspaceDir });
// 8. 启动内存后端(QMD memory)
void startGatewayMemoryBackend({ cfg, log });
// 9. 处理 restart sentinel(服务重启后自动唤醒之前的 agent)
if (shouldWakeFromRestartSentinel()) {
setTimeout(() => void scheduleRestartSentinelWake({ deps }), 750);
}
return { browserControl, pluginServices };
}
这里有一个细节:gateway:startup hook 是 250ms 后触发的,而 restart sentinel wake 是 750ms 后触发的。这个时序保证 hook 处理器和渠道连接在 agent 唤醒前已经就绪。
十五、Schema 验证:AJV + TypeBox 的双层体系
所有 RPC 参数的 schema 用 @sinclair/typebox 声明,运行时验证用 AJV 编译。protocol/index.ts 在模块加载时把所有 schema 预编译为验证函数:
const ajv = new (AjvPkg as unknown as new (opts?: object) => import("ajv").default)({
allErrors: true, // 收集所有错误而非第一个就停
strict: false,
removeAdditional: false, // 不移除额外字段(由 TypeBox additionalProperties: false 处理)
});
export const validateConnectParams = ajv.compile<ConnectParams>(ConnectParamsSchema);
export const validateRequestFrame = ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateResponseFrame = ajv.compile<ResponseFrame>(ResponseFrameSchema);
// ... 共 50+ 个预编译验证器
所有 schema 都带 additionalProperties: false,确保客户端不能传入任何未声明字段。这既防止了字段注入,也让接口保持严格的向前兼容性。
当验证失败时,formatValidationErrors 把 AJV 的错误对象转换为可读的错误消息:
export function formatValidationErrors(errors: ErrorObject[] | null | undefined) {
// 特殊处理 additionalProperties 错误,给出具体是哪个多余字段
for (const err of errors) {
if (keyword === "additionalProperties") {
const additionalProperty = params?.additionalProperty;
parts.push(`${where}: unexpected property '${additionalProperty}'`);
continue;
}
// 通用错误格式化
const where = instancePath ? `at ${instancePath}: ` : "";
parts.push(`${where}${message}`);
}
// 去重后合并
const unique = Array.from(new Set(parts.filter(...)));
return unique.join("; ");
}
十六、可观测性:结构化的 WS 日志体系
Gateway 的 WebSocket 日志不是简单的字符串拼接,而是有独立的结构化系统(ws-log.ts,439 行)。
每条日志都有固定格式:
[gateway/ws] ← in req chat.send id=a1b2…c3d4 session=xxx… delta=12ms
[gateway/ws] → out res chat.send ✓ durationMs=142 errorCode=n/a
[gateway/ws] → out event agent seq=1024 clients=3 presenceVersion=7
UUID 被缩写为 前8位…后4位 的形式(shortId),既可读又节省空间。敏感字段(API key、token)通过 redactSensitiveText 脱敏后才进日志。
这套日志体系让运维人员在不需要断点调试的情况下,就能从日志里还原出一次完整的请求往来过程。
总结
回顾整个 Gateway RPC 运行时,它的设计体现了几个核心原则:
协议的严格性:每个帧都经过 Schema 验证,每个握手步骤都是独立的关卡。任何格式错误或安全异常,都会立刻终止连接并记录原因。协议版本双边区间让客户端可以声明自己能接受的范围,而不是要求精确匹配。
认证的纵深防御:六关卡握手流水线——格式验证、版本协商、Origin 检查、共享认证、设备签名、配对验证——每关都是独立的防线。nonce-based 挑战-响应防重放,设备公钥推导设备 ID 防伪造,平台/家族 pinning 防元数据冒用。
权限的最小原则:Role + Scope 双维度,每个 RPC 方法和每个广播事件都有精确的权限要求。node 角色只能使用 node.* 方法,读操作和写操作分开授权,安全敏感事件只推给有对应 scope 的连接。
可靠性的精细运营:慢消费者踢出、去重 TTL、健康快照惰性刷新与 Promise 去重锁、unauthorized flood guard——每一个都是针对具体运维场景的精确补丁。
涉及源文件
-
src/gateway/protocol/schema/frames.ts— 帧 Schema 定义 -
src/gateway/protocol/schema/snapshot.ts— Snapshot / Presence / StateVersion Schema -
src/gateway/protocol/index.ts— AJV 预编译验证器 + formatValidationErrors -
src/gateway/server-constants.ts— 帧大小、心跳、超时常量 -
src/gateway/server/ws-connection.ts— 连接生命周期状态机 -
src/gateway/server/ws-connection/message-handler.ts— 消息处理核心(1158 行) -
src/gateway/server/ws-connection/auth-context.ts— 认证状态解析与决策 -
src/gateway/server/ws-connection/unauthorized-flood-guard.ts— 未授权洪水防护 -
src/gateway/server/health-state.ts— 健康快照版本管理 -
src/gateway/server-methods.ts— 方法注册表 + handleGatewayRequest -
src/gateway/server-methods-list.ts— 全量方法和事件列表 -
src/gateway/method-scopes.ts— Scope 体系与方法授权 -
src/gateway/role-policy.ts— Role 体系 -
src/gateway/server-broadcast.ts— 广播引擎与事件 Scope 守卫 -
src/gateway/server-startup.ts— Gateway 启动侧车服务 -
src/gateway/server-methods/types.ts— GatewayRequestContext 类型定义 -
src/gateway/ws-log.ts— 结构化 WS 日志体系