阅读视图

发现新文章,点击刷新页面。

邪修!让显示器支持AI、远程、手势三种控制方式

大家好,我是石小石~


解锁明基RD270Q的新玩法

前不久,明基发布了最新款式的编程系列显示器 RD270Q,很荣幸我获得了优先体验资格。刚开箱,我就被它出众的颜值所吸引。

这款显示器保留了RD系列最核心也是我最喜欢的「编程模式」,而且它还升级到144Hz 高刷 并增加了彩纸模式。这使得在长时间编码下,它能极大缓解眼部疲劳,体验感非常舒适。

接下来,我会分享借助RD270Q配套的DisplayPilot2软件,结合AI与编码,如何玩转显示器的特色功能:

  • 用 Claude code 切换显示器编程模式

  • 用手机远程操控显示器锁屏

  • 用手势实现显示屏亮度调节 (动图帧率问题,图片效果不是很明显)

同时,我会结合长时间的编码体验,验证它是否能成为程序员必备的专业显示器。

显示器控制的核心——Display Pilot 2

无论是通过 AI、手机远程还是手势来控制显示器,核心本质都是依靠电脑上运行的 “脚本” 去操控显示器硬件。借助一些键鼠模拟脚本(如 Node 的robotjs、nut-js,或Python的keyboard),我们可以通过模拟鼠标事件来间接操控软件实现功能,如通过 Node.js 脚本实现自动移动鼠标,并双击启动软件的自动化操作:

对应核心代码如下:

const { mouse,straightTo,Point,Button} = require("@nut-tree-fork/nut-js");
(async () => {
  // 移动鼠标到指定位置
  await mouse.move(straightTo(new Point(10, 10)));
  console.log("鼠标移动完成!");
  // 点击鼠标
  await mouse.doubleClick(Button.LEFT);
  console.log("执行完成!");
})();

可以看出,一些复杂的软件操作,通过模拟鼠标实现还是非常麻烦的,最重要的是脚本几乎无法控制硬件。

幸运的是,明基 RD270Q 自带了配套软件 Display Pilot 2,它可以直接通过软件快速调用显示器的硬件级操作能力,以满足我们编程中的个性化控制需求。参考软件截图,它拥有非常多的显示屏操作功能,且基本都支持通过快捷键操作。

思路到这里就很清晰了:我们完全可以编写脚本,模拟键盘事件触发 Display Pilot 2 的快捷操作,从而间接实现对显示器的控制。

使用Clade code+skills控制显示屏

编程模式切换效果演示

编程模式是明基 RD 系列显示器的特色功能,在深色模式下,显示器会通过硬件级算法强化语法高亮效果,以提升长期编程的舒适度;RD270Q新增的彩纸模式,则能让界面产生类纸感的细腻色彩,满足深度护眼需求。如下图,在黑暗模式下,明基对代码的显示优化非常明显,代码对比更加鲜明,不刺眼。

它还搭载了莱茵认证的抗反射抗面板,即便在强光环境下使用,屏幕也不会刺眼、不产生明显眩光,长时间观看依旧舒适。

在配套软件的基础上,我们能否借助 AI 实现这些显示模式的一键自动切换呢?答案是完全可以。 比如,直接通过 AI 对话下达指令,让显示器自动切换至电子书模式

或是通过指令让 AI 精准调节屏幕亮度、音频大小等参数

原理分析——RD270Q-Opera-skills

Claude Code 为例,我们来实现这一效果。需要明确的是:AI 本身并不能直接操控显示器硬件,即便它能生成脚本,也不知道如何与显示器交互。因此,我们可以通过自定义技能(Skills) —— 比如创建一个 RD270Q-operation-skills,来为 AI 扩展控制显示器的能力。

如果你不了解 Skills,请自行百度。

该技能的项目结构如下:

RD270Q-operation-skills/
├── SKILL.md              # 元数据与指令定义
├── index.js              # 主入口:命令解析与分发
├── package.json          # 项目依赖配置
├── test.js               # 功能测试脚本
├── scripts/              # 底层操作模块
│   ├── keyboard.js       # 键盘快捷键封装
│   └── mouse.js          # 鼠标操作封装
└── references/           # 参考文档
    └── 快捷键表.md        # Display Pilot 2 完整快捷键

整个技能的核心逻辑非常简单: 将 Display Pilot 2 的快捷键功能在代码中做映射,让 AI 可以通过函数调用触发。

示例核心代码(scripts/keyboard.js):

// 键盘快捷键模块 - 封装 Display Pilot 2 所有控制功能
const { keyboard, Key } = require("@computer-use/nut-js");

// 执行快捷键组合
async function executeShortcut(...keys) {
  await keyboard.pressKey(...keys);
  await new Promise(resolve => setTimeout(resolve, 100));
  await keyboard.releaseKey(...keys);
}

// ==================== 色彩模式 ====================
// 循环切换色彩模式 Ctrl+Alt+C
async function cycleColorModes() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.C);
}
// 编程亮模式 Ctrl+Alt+1
async function setCodingLight() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.Num1);
}
// 编程暗模式 Ctrl+Alt+2
async function setCodingDark() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.Num2);
}
// 编程纸张模式 Ctrl+Alt+0
async function setCodingPaper() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.Num0);
}
// ..... 其他快捷操作


// 导出所有方法
module.exports = {
  executeShortcut,
  cycleColorModes,
  setCodingLight,
  setCodingDark,
  setMBook,
  // ...
};

我们只需要在 SKILL.md 中规范好 AI 的调用方式与指令规则,完成整套技能开发后,Claude Code 就拥有了直接操控显示器模式的能力,使用体验直接拉满。

除了编程模式的切换,凡是 Display Pilot 2 能通过快捷键实现的显示器操控功能,这个skills都能完美胜任,甚至像Display Pilot 2屏幕分区这样的高级功能,也能通过控制鼠标来模拟实现。

使用手机远程控制显示屏

很多时候,我们可能临时有事需要离开工位,如果我们突然想锁屏或者想远程控制一下鼠标执行某个简单操作就必须立刻回到工位才行。基于这中场景,实现手机远程控制显示器就非常有意义。

如下图,就是根据明基RD270Q支持的快捷键开发的一个移动端操作界面,并增加了鼠标触摸移动控制功能。

远程锁屏、鼠标控制演示

如果外出忘记锁屏,通过手机实现这个功能非常方便实用。

此外,通过移动端界面的触控区域,我们还能远程操控鼠标移动、直接打开 VSCode 等软件。是不是有点Todesk青春版的感觉?

除此之外,其他快捷操作,如编程模式、亮度调节、夜间保护调节等功能都是支持的,这里也就不一一展示了。

原理分析——websoket+node控制快捷键

远程控制的方案其实非常简单:核心就是跑在本地的一个 Node 脚本,用来模拟键盘、鼠标操作,间接通过 Display Pilot 2 控制显示器。同时启动一个 Web 服务提供移动端操作界面,借助 WebSocket 实现手机与 Node 服务实时通信,最终完成远程控制。简单涞水,就是Web 端通过WebSocket 控制本地端Node服务模拟系统快捷键操作

前端就是一个普通的 Vue 项目 , 页面上放几个控制按钮,点击时通过 WebSocket 向 Node 服务发送对应指令:

function createWebSocketServer(server) {
  const wss = new WebSocket.Server({ server, path: "/ws" });
  wss.on("connection", (ws) => {
    console.log("移动端已连接");
    ws.on("message", async (msg) => {
        const { type, action, params } = JSON.parse(msg);
        // 鼠标操作
        if (type === "mouse") {
          if (action === "move") {
            // 鼠标移动
            await mouse.move(params.x, params.y);
          } else if (action === "click") {
            // 鼠标点击
            await mouse.click(params.button);
          }
        }
        // 键盘操作
        if (type === "keyboard"){
          
        }
    });
  });
}

Node 端主要搭建 WebSocket 服务,接收移动端指令并执行系统操作。

const app = express();
const server = http.createServer(app);

// 初始化 WebSocket 服务
createWebSocketServer(server);

server.listen(PORT, () => {
  console.log(`WS 服务已启动:ws://localhost:${PORT}/ws`);
});;

具体的鼠标移动、键盘快捷键等逻辑,统一封装在 mouse.jskeyboard.js 中,底层依赖node第三方库nut-js实现鼠标和快捷键控制。

使用手势控制显示屏

RD270Q 还有个我觉得特别实用的功能 ——Visual Optimizer 视觉优化。它通过内置光传感器,能根据环境光智能同步调节屏幕亮度与色温,降低屏幕与环境的明暗反差,配合编码深色模式,长时间看代码也更柔和护眼。

不仅如此,我们还可以通过Display Pilot 2进一步调整屏幕亮度,实现个性化需求。基于Display Pilot 2,我们还能实现通过手势控制实现显示器的隔空操作,作为技术创意尝鲜、趣味交互玩具,还是得研究和尝试的。

桌面版的手势识别存在一定技术难度,恰好之前我有写过类似的技术文章:油猴+手势识别:我实现了任意网页隔空控制!索性偷个懒,在网页上实现手势识别用来控制显示器。先看看Demo效果:

  • 左手张开 + 右手滑动,即可调低屏幕亮度(左手握拳 + 右手滑动,即可调高屏幕亮度)

  • 右手握拳,可以实现一键锁屏功能

它的核心实现是基于MediaPipe,这是一个是谷歌开源的跨平台、实时轻量级多媒体机器学习框架,支持 Python、JS 等多种编程语言,借助它能轻松实现桌面级的手势识别功能。

如果你对相关技术感兴趣,可以看看这个实现

Demo:油猴+手势识别:我实现了任意网页隔空控制!

代码:《有趣的手势识别、人脸识别脚本》

Flow 智能工作流

本来我还在琢磨,能不能通过 AI 指令或远程控制,自己搭一套编码时的专属显示方案,比如打开 VS Code 就自动切换到我习惯的亮度、护眼参数等。结果发现 RD270Q 早已自带了 Flow 智能工作流,在 Display Pilot 2 里提前预设好编程、文档、设计等场景后,打开对应软件就能自动切换显示参数,省去反复调节的麻烦,真正实现了 “打开即用” 的智能个性化体验。

结语

从借助 AI 指令、移动端远程控制显示器,到创意十足的手势隔空控制,这篇文章我通过三种个性化玩法,把RD270Q显示器的自定义操控能力发挥到了极致。这些功能实现的核心,离不开Display Pilot 2对显示器本身的 稳定操控能力。

当然,即便不借助这款软件,文中的思路也可以延伸到电脑本身的快捷操作、系统级功能调用上,大家不妨顺着这个方向自行尝试拓展。

写完这篇文章已是凌晨,144Hz 高刷屏搭配显示器的深色编码模式,长时间使用眼部依然舒适,没有出现干涩、疲劳感。实际体验下来,RD270Q 的护眼技术确实做得不错,整体感受很好。

总而言之,新款 RD270Q 不仅保留了核心优势,价格也很有诚意,三千出头,上市期间会更优惠!兄弟们,不用犹豫,这次可以放心冲了。当然,要是追求极致编程体验 RD280URD280UGRD320U也也都是非常不错的选择。

最后, 附上一张深夜codding的图,希望这篇分享能为大家带来一些实用参考。

Vibe Coding 测试体系:API 测试、单元测试与 e2e 测试实战指南

前言:测试模块是 vibe coding 过程中不可忽视的一环,能够显著提升代码的稳定性与可维护性。本文系统介绍三种主流测试方式——API 测试单元测试e2e 测试,并结合 MuseMVP 项目的真实集成方式逐一演示,帮助你在快速迭代的同时建立可靠的质量防线。

项目集成

  • 安装@playwright/testvitest@vitest/ui三个依赖
  • vitest配置
import { fileURLToPath } from "node:url";
import { defineConfig } from "vitest/config";

export default defineConfig({
  resolve: {
    alias: {
      "@": fileURLToPath(new URL("./src", import.meta.url)),
    },
  },
  test: {
    environment: "node",
    globals: true,
    include: ["tests/unit/**/*.test.ts"],
  },
});
  • playwright配置
import { defineConfig, devices } from "@playwright/test";

const baseURL =
  process.env.E2E_BASE_URL ||
  process.env.NEXT_PUBLIC_SITE_URL ||
  "http://localhost:3000";

export default defineConfig({
  testDir: "./journeys",
  fullyParallel: false,
  forbidOnly: true,
  retries: 0,
  workers: 1,
  reporter: [
    ["list"],
    [
      "html",
      { open: "never", outputFolder: "../../.test-artifacts/browser-report" },
    ],
  ],
  outputDir: "../../.test-artifacts/browser-output",
  globalTeardown: "./cleanup.ts",
  use: {
    baseURL,
    trace: "on-first-retry",
    screenshot: "only-on-failure",
    video: "retain-on-failure",
  },
  projects: [
    {
      name: "desktop-chromium",
      use: {
        ...devices["Desktop Chrome"],
      },
    },
  ],
});

单元测试

用于验证纯逻辑层的行为,例如促销码的格式处理。以下示例测试了账单模块中 schema 对输入的修剪与空值转换逻辑,无需启动完整应用即可快速执行。

import { describe, expect, test } from "vitest";
import {
  createCustomerHubSchema,
  createLaunchSchema,
  museBillingGatewayParamSchema,
} from "@/backend/api/routes/muse-billing/types";

describe("muse-billing route schemas", () => {
  test("trims discount codes and converts blanks to undefined", () => {
    const parsed = createLaunchSchema.parse({
      planProductId: "plan_pro_monthly",
      discountCode: " SAVE20 ",
    });
    const blankParsed = createLaunchSchema.parse({
      planProductId: "plan_pro_monthly",
      discountCode: "   ",
    });

    expect(parsed.discountCode).toBe("SAVE20");
    expect(blankParsed.discountCode).toBeUndefined();
  });
});

e2e测试

使用Playwright框架测试真实浏览器环境下的公共页、认证页和受保护页面

playwright添加--ui参数进行可视化验证

{
  "scripts": {
    "test:e2e": "playwright test --config=tests/e2e/playwright.muse.config.ts",
    "test:e2e:ui": "playwright test --config=tests/e2e/playwright.muse.config.ts --ui",
  }
}

落地页可见性

对公开页面进行基础冒烟测试,验证页面能够正常打开、响应状态正常、DOM 可见。

import { expect, test } from "@playwright/test";
import { ROUTE_BOOK } from "../utils/routes";

test.describe("guest-facing route smoke", () => {
  test("home page renders", async ({ page }) => {
    const response = await page.goto(ROUTE_BOOK.home);

    expect(response?.ok()).toBeTruthy();
    await expect(page.locator("body")).toBeVisible();
  });
});

埋点测试

对于需要精确验证渲染结果的页面(例如定价页),可在前端组件中添加data-testid属性作为测试锚点,无需侵入业务逻辑,即可实现精准断言。

import { expect, test } from "@playwright/test";
import { ROUTE_BOOK } from "../utils/routes";

test.describe("guest-facing route smoke", () => {
  test("pricing page renders critical pricing UI", async ({ page }) => {
    const response = await page.goto(ROUTE_BOOK.pricing);

    expect(response?.ok()).toBeTruthy();
    await expect(page.getByTestId("pricing-section")).toBeVisible();
    await expect(
      page.locator('[data-testid^="pricing-plan-cta-"]').first(),
    ).toBeVisible();
  });
});

export function PricingSection() {

 // ··· 

  return (
    <section
      id="pricing"
      className={cn("bg-muted/40", className)}
      data-testid="pricing-section"
    >
      {/* ··· */}
    section>
  );
}

后台页面

对于需要身份验证的后台页面,测试流程分两步:首先通过辅助工具创建测试用户并完成登录,再像操作真实浏览器一样验证页面内容。后续断言逻辑与公共落地页测试保持一致,复用性强。

import { expect, test } from "@playwright/test";
import { ROUTE_BOOK } from "../utils/routes";
import { establishSession, makePasswordUser } from "../utils/session";

test.describe("entry and account shell smoke", () => {
  test("unauthenticated users are redirected to login", async ({ page }) => {
    await page.goto(ROUTE_BOOK.app);

    await expect(page).toHaveURL(/\/auth\/login/);
  });

  test("password login reaches the app home page", async ({ page }) => {
    const user = await makePasswordUser({
      prefix: "e2e-login-ui",
      role: "user",
    });

    await page.goto(ROUTE_BOOK.login);
    await page.getByTestId("login-email").fill(user.email);
    await page.getByTestId("login-password").fill(user.password);
    await page.getByTestId("login-submit").click();

    await expect(page).toHaveURL(/\/app(?:\?.*)?$/);
    await expect(page.getByTestId("app-home-ready")).toBeVisible();
  });

  test("authenticated users can open general settings", async ({ page }) => {
    const user = await makePasswordUser({
      prefix: "e2e-settings",
      role: "user",
    });

    await establishSession(page, user);
    await page.goto(ROUTE_BOOK.settingsGeneral);

    await expect(page).toHaveURL(/\/app\/settings\/general(?:\?.*)?$/);
    await expect(page.getByTestId("settings-general-page")).toBeVisible();
  });
});

以上均为带 UI 的交互式测试。去掉--ui参数后,测试将在headless模式下静默运行,适合在CI环境中使用,结果如下:

PS D:\project\___test> pnpm test:e2e
[dotenv@17.2.4] injecting env (20) from .env -- tip: 📡 add observability to secrets: https://dotenvx.com/ops

[browser cleanup] Removed 3 browser scenario user(s).

> musemvp@0.0.0 test:e2e D:\project\___test
> playwright test --config=tests/e2e/playwright.muse.config.ts

[dotenv@17.2.4] injecting env (20) from .env -- tip: 🔑 add access controls to secrets: https://dotenvx.com/ops

Running 7 tests using 1 worker

[dotenv@17.2.4] injecting env (0) from .env -- tip: ✅ audit secrets and track compliance: https://dotenvx.com/ops1 …nt-entry.smoke.spec.ts:6:7 › entry and account shell smoke › unauthenticated users are redirected to login (772ms)  ✓  2 …account-entry.smoke.spec.ts:12:7 › entry and account shell smoke › password login reaches the app home page (1.8s)ℹ [DB] runtime=node source=database_url strategy=database_url_first NODE_ENV=unknown POOL_MAX=53 …t-entry.smoke.spec.ts:27:7 › entry and account shell smoke › authenticated users can open general settings (949ms)  ✓  4 …um] › tests\e2e\journeys\admin-console.smoke.spec.ts:5:5 › admin users can access the users management page (1.2s)  ✓  5 …omium] › tests\e2e\journeys\public-routes.smoke.spec.ts:5:7 › guest-facing route smoke › home page renders (732ms)  ✓  6 …eys\public-routes.smoke.spec.ts:12:7 › guest-facing route smoke › pricing page renders critical pricing UI (789ms)  ✓  7 …] › tests\e2e\journeys\public-routes.smoke.spec.ts:22:7 › guest-facing route smoke › features page renders (694ms)

[browser cleanup] Removed 3 browser scenario user(s).

  7 passed (15.1s)

To open last HTML report run:

  pnpm exec playwright show-report test-results\browser-report

API测试

用于验证 AI chat 对话接口的权限隔离行为。以下示例测试了多用户场景下的会话归属:确保用户 A 创建的会话对用户 B 不可见,同时验证所有者自身的读取权限正常工作。

import { afterAll, beforeAll, describe, expect, test } from "vitest";
import { purgeScenarioUsers } from "../utils/accounts";
import {
  ensureLocalAppReady,
  issueSignedInSession,
  sendContractRequest,
} from "./request-utils";

describe("conversation ownership stays isolated across member sessions", () => {
  beforeAll(async () => {
    await ensureLocalAppReady();
  });

  afterAll(async () => {
    await purgeScenarioUsers();
  });

  test("one user cannot fetch or delete another user's conversation", async () => {
    const userA = await issueSignedInSession({
      prefix: "ownership-a",
    });
    const userB = await issueSignedInSession({
      prefix: "ownership-b",
    });

    const createResponse = await sendContractRequest(
      "POST",
      "/api/aichat/conversations",
      {
        cookies: userA.cookies,
      },
    );

    expect(createResponse.status).toBe(200);

    const createdPayload = (await createResponse.json()) as {
      conversation: { id: string };
    };
    const conversationId = createdPayload.conversation.id;

    const foreignRead = await sendContractRequest(
      "GET",
      `/api/aichat/conversations/${conversationId}`,
      {
        cookies: userB.cookies,
      },
    );
    expect(foreignRead.status).toBe(404);

    const foreignDelete = await sendContractRequest(
      "DELETE",
      `/api/aichat/conversations/${conversationId}`,
      {
        cookies: userB.cookies,
      },
    );
    expect(foreignDelete.status).toBe(404);

    const ownerRead = await sendContractRequest(
      "GET",
      `/api/aichat/conversations/${conversationId}`,
      {
        cookies: userA.cookies,
      },
    );
    expect(ownerRead.status).toBe(200);
  });
});

测试是 vibe coding 稳定运行的最后一道保障。

API 测试、单元测试、e2e 测试三层覆盖,缺一不可。希望本文的实战示例能直接用进你的项目中。

MuseMVP SaaS 模板已将上述三种测试模块完整集成,开箱即用,无需从零搭建。

公众号:尼采般地抒情

别再手搓 Skill 了,用这个工具 5 分钟搞定

大家好,我是凌览。

如果本文能给你提供启发或帮助,欢迎动动小手指,一键三连(点赞评论转发),给我一些支持和鼓励谢谢。


说实话,第一次看到 "Skill" 这个词,我也有点懵。是不是又要写很多代码?是不是只有程序员能玩?

后来自己上手做了几个才发现——完全不是那么回事。Skill 更像是:把一件你本来就会做的事,写成一套能反复执行、不会乱跑的步骤

就说做饭吧。第一次做某道菜,你得一边看菜谱一边试探;做多了流程就固定了——先备料,再处理,最后出锅。这时候让别人帮你做,你大概会说"就按这个步骤来,别自己发挥"。Skill 干的就是这事,只是对象从「人」变成了「模型」——让大模型也照着这个步骤来,别自己发挥。

一个 Skill 到底长啥样?

先记住一句话:一个 Skill,本质上就是一个文件夹。

最简单的结构:

my-skill/
├── SKILL.md        # 说明:什么时候用、输入输出是什么
├── scripts/        # 执行:真正跑的逻辑
└── references/     # 参考:示例输入输出

SKILL.md 是最核心的——告诉模型"这件事干嘛用的、什么时候用、输出长啥样"。

你可以把它当成一份「使用说明书 + 注意事项」。

Skill 能解决什么问题?

举个例子,之前我开放了一个去水印下载鸭工具,同时写了份接口文档。但说实话,调用接口的步骤是固定的——传参数、发请求、解析返回。就这几步,每次都要重复。

调用步骤:

  1. 传参:token、url
  2. 发请求:
curl -X GET "https://nologo.code24.top/api/open/parse?url=https%3A%2F%2Fv.douyin.com%2Fxxxxx" \
  -H "Authorization: your-token"
  1. 解析返回数据

就这三步,每次都要重复一遍,挺烦的。 图片1.png

写成 Skill 之后,这些步骤直接固化下来,AI 碰到要调去水印接口的场景,自己就跑完了,不用你再手动复制粘贴。

其中 token、url 怎么获取,直接在 SKILL.md 里写清楚,或者丢个文档链接就行。

image.png

这个去水印解析的 Skill 已经开源了:github.com/CatsAndMice…,名字叫 nologo-open-api

效果测试成功:

image 1.png

但说实话,手搓还是有点麻烦

概念不难理解,但真要自己从头写一个 Skill,还是得折腾——要想触发条件,要写 SKILL.md,要调试……

有没有更省事的方法?

有。用 skill-creator。

skill-creator 是什么?

Skill-Creator是一款专为开发者设计的Skill创建向导,旨在简化开发流程。其便捷性和实用性已得到广泛验证,在SkillHub上的下载量已突破7.5万。

地址:www.skillhub.cn/skills/skil…

image 2.png

说到 SkillHub,这事儿还有点意思:

安装 Skill-Creator 特别简单——直接跟它聊天就行,它会一步步指导 AI 帮你创建技能

举个例子:

你:"用 skill-creator 帮我创建一个提取小红书链接的 Skill"

它:"好的,我来帮你创建。先告诉我:这个 Skill 要处理什么类型的链接?"

你:"抖音、小红书都行,主要提取无水印的视频地址"

它:"明白了。输出格式要不要加上 metadata?我给你两个选项……"

就这么一来一回,你描述需求,它帮你搞定剩下的——写 SKILL.md、搭目录结构、甚至调试代码。

说白了就是:你出想法,它帮你落地。

总结

Skill 的核心是把一件事的"固定流程"写成可反复执行的步骤,让模型按规程稳定产出,避免每次都从零复制粘贴、手动操作。像调用去水印接口这类标准化流程,尤其适合沉淀成 Skill 直接复用;而如果你不想从触发条件、SKILL.md 到目录结构都自己手搓,skill-creator 这种对话式向导能把创建与落地成本降到最低。

iOS逆向工程:详细解析ptrace反调试机制的破解方法与实战步骤

Ptrace 提供了一种父进程可以控制子进程运行的机制,并可以检查和改变它的核心image。

  • 它主要用于实现断点调试。

1、一个被跟踪的进程运行中,直到发生一个信号,则进程被中止,并且通知其父进程。 2、在进程中止的状态下,进程的内存空间可以被读写。父进程还可以使子进程继续执行,并选择是否是否忽略引起中止的信号。

  • 本文采用tweak 的方式进行MSHookFunction

在iOS应用安全中,除了反调试机制,代码混淆也是一种重要的保护手段。IpaGuard是一款强大的iOS IPA文件混淆工具,无需源码即可对代码和资源进行混淆加密,支持多种开发平台,有效增加反编译难度。它提供代码混淆、资源文件混淆、调试信息清理等功能,可以帮助开发者保护应用免受逆向工程攻击。

软件环境:Xcode 硬件环境:iPhone5越狱手机、Mac 开发工具:Cycript、LLDB、logos Tweak、hopper、MonkeyDev、AFLEXLoader、dumpdecrypted、debugserver、ssh、class_dump、hook

  • 破解方案

1、运行时期,断点ptrace,直接返回 2、分析如何调用的ptrace,hook ptrace 3、通过tweak,替换disable_gdb函数 4、修改 PT_DENY_ATTACH

I、运行时期,断点ptrace,直接返回

初始化应用程序,而不是运行中附着

iPhone:~ root#  debugserver -x posix *:12345  /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet

初始化程序,目的是从程序入口就开始进行附着,这样我们就可以在一些安全防护代码执行之前,进行破解。

跳过ptrace:过命令thread return直接返回,以跳过函数的逻辑。

 (lldb) br set -n ptrace
 Breakpoint 2: where = libsystem_kernel.dylib`__ptrace, address = 0x00000001966af2d4
 (lldb) br command add 2
 Enter your debugger command(s).  Type 'DONE' to end.
 > thread return
 > c
 > DONE

II、分析如何调用的ptrace,hook ptrace

去掉ptrace的思路

  • 当程序运行后,使用 debugserver *:1234 -a BinaryName 附加进程出现 segmentfault 11 时,一般说明程序内部调用了ptrace 。
  • 为验证是否调用了ptrace 可以 debugserver -x backboard *:1234 /BinaryPath(这里是完整路径),然后下符号断点 b ptrace,c 之后看ptrace第一行代码的位置,然后 p $lr 找到函数返回地址,再根据 image list -o -f 的ASLR偏移,计算出原始地址。最后在 IDA 中找到调用ptrace的代码,分析如何调用的ptrace。
  • 开始hook ptrace。

2.0 准备工作:砸壳

  • 砸壳

blog.csdn.net/z929118967/…

  • 查找app路径
iPhone:~ root# ps -e |grep AlipayWallet
  714 ??         0:26.44 /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
  736 ttys000    0:00.01 grep AlipayWallet

  • 获取NSDocumentDirectory
iPhone:~ root# cycript -p AlipayWallet
cy# [[NSFileManager defaultManager] URLsForDirectory:NSDocumentDirectory inDomains:NSUserDomainMask][0]
#"file:///var/mobile/Containers/Data/Application/89313E1C-76C2-41E3-8ECD-F4BDC1A78524/Documents/"

  • scp 拷贝文件
devzkndeMacBook-Pro:decrypted devzkn$ scp /Users/devzkn/Downloads/kevin-software/ios-Reverse_Engineering/dumpdecrypted-master/dumpdecrypted.dylib iphone150:/var/mobile/Containers/Data/Application/89313E1C-76C2-41E3-8ECD-F4BDC1A78524/Documents/

devzkndeMacBook-Pro:decrypted devzkn$ scp iphone150:/var/mobile/Containers/Data/Application/89313E1C-76C2-41E3-8ECD-F4BDC1A78524/Documents/AlipayWallet.decrypted /Users/devzkn/decrypted/AlipayWallet

  • class-dump
devzkndeMacBook-Pro:bin devzkn$ class-dump --arch armv7 /Users/devzkn/decrypted/AlipayWallet10.1.8/AlipayWallet.decrypted -H -o /Users/devzkn/decrypted/AlipayWallet10.1.8/head

-查看 bundleIdentifier

iPhone:~ root# cycript -p AlipayWallet
cy# [[NSBundle mainBundle] bundleIdentifier]
@"com.alipay.iphoneclient"

2.1 编写 tweak 分析

%hook DFClientDelegate
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {

    %log();

    // 打印某个类的所有方法的,查看所有方法的执行顺序

     [KNHook hookClass:@"H5WebViewController"];//aluLoginViewController
     [KNHook hookClass:@"TBSDKServer"];//getUaPageName    aluMTopService _tokenLoginInvoker

     [KNHook hookClass:@"TBSDKMTOPServer"];//getUaPageName    aluMTopService _tokenLoginInvoker

    return %orig;
}
%end

2.2 具体步骤

2.2.1 debugserver

iPhone:~ root#  debugserver *:12345 -a AlipayWallet
debugserver-@(#)PROGRAM:debugserver  PROJECT:debugserver-320.2.89
 for armv7.
Attaching to process AlipayWallet...
Segmentation fault: 11

当程序运行后,使用 debugserver *:1234 -a BinaryName 附加进程出现 segmentfault 11 时,一般说明程序内部调用了ptrace 。

iPhone:~ root#  debugserver *:12345 -x backboard /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
debugserver-@(#)PROGRAM:debugserver  PROJECT:debugserver-320.2.89
 for armv7.
Segmentation fault: 11

2.2.2 分析如何调用的ptrace

  • debugserver -x
iPhone:~ root# debugserver -x *:12345 /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
error: invalid TYPE for the --launch=TYPE (-x TYPE) option: '*:12345'
Valid values TYPE are:
  auto       Auto-detect the best launch method to use.
  posix      Launch the executable using posix_spawn.
  fork       Launch the executable using fork and exec.
  backboard  Launch the executable through BackBoard Services.

总共有四种类型

debugserver -x backboard *:1234 /var/mobile/...... 把这个backboard改成posix试试

iPhone:~ root#  debugserver -x posix *:12345  /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
debugserver-@(#)PROGRAM:debugserver  PROJECT:debugserver-320.2.89
 for armv7.
Listening to port 12345 for a connection from *...

  • 在ptrace上下断点,找到调用ptrace的地方
(lldb)  b ptrace
Breakpoint 1: no locations (pending).
WARNING:  Unable to resolve breakpoint to any actual locations.

(lldb) c
Process 1657 resuming

  • 关闭Target,重新启动Target
1 location added to breakpoint 1
Process 1657 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
    frame #0: 0x37a13e64 libsystem_kernel.dylib`__ptrace
libsystem_kernel.dylib`__ptrace:
->  0x37a13e64 <+0>:  ldr    r12, [pc, #0x4]           ; <+12>
    0x37a13e68 <+4>:  ldr    r12, [pc, r12]
    0x37a13e6c <+8>:  b      0x37a13e74                ; <+16>
    0x37a13e70 <+12>: rsbeq  r9, r11, #192, #2
Target 0: (AlipayWallet) stopped.
(lldb)  p/x $lr
(unsigned int) $0 = 0x0000bfbb

由此可见ptrace函数在libsystem_kernel.dylib这个动态库中,使用时才进行加载,不是静态放在本地的,所以我们不能简单地去tweak ptrace函数。

(lldb) image list -o -f |grep AlipayWallet
[  0] 0x00000000 /private/var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet(0x0000000000004000)

所以ptrace的调用者位于0x0000bfbb - 0x00000000 = 0x0000bfbb处,如图所示:

  • 在hopper使用go to add ,快捷键G

大文件上传-spark-md5

概述:后端服务(Node + Express)、前端(vue+spark-md5)

一、后端服务

1、创建后端项目

mkdir upload-server
cd upload-server
npm init -y
npm install express cors multer fs-extra

2、 后端完整代码 server.js

const express = require('express');
const cors = require('cors');
const multer = require('multer');
const fse = require('fs-extra');
const path = require('path');

const app = express();
// 解决跨域 + 大文件请求体限制
app.use(cors());
app.use(express.json({ limit: '100mb' }));
app.use(express.urlencoded({ extended: true, limit: '100mb' }));

// 配置(和前端完全一致)
const UPLOAD_DIR = path.resolve(__dirname, 'upload'); // 分片临时目录
const MERGE_DIR = path.resolve(__dirname, 'merged');   // 合并后文件目录
const CHUNK_SIZE = 2 * 1024 * 1024; // 2MB 分片

// 确保目录存在(启动时就创建,避免运行时创建失败)
fse.ensureDirSync(UPLOAD_DIR);
fse.ensureDirSync(MERGE_DIR);

// ✅ 修复1:multer 配置,不再从 req.body 取参数,改用动态存储
const storage = multer.memoryStorage(); // 改用内存存储,避免目录创建时序问题
const upload = multer({ 
  storage,
  limits: { fileSize: 5 * 1024 * 1024 } // 限制分片大小,和前端一致
});

/**
 * 1. 查询已上传的分片(断点续传/秒传核心)
 */
app.post('/checkfile', async (req, res) => {
  try {
    const { fileHash, fileName } = req.body;
    if (!fileHash || !fileName) {
      return res.status(400).json({ code: -1, msg: '参数缺失' });
    }

    const ext = path.extname(fileName);
    const filePath = path.resolve(MERGE_DIR, `${fileHash}${ext}`);
    
    // 秒传:文件已存在
    if (fse.existsSync(filePath)) {
      return res.json({ code: 0, uploadedChunks: [], shouldUpload: false });
    }

    // 读取已上传分片
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    let uploadedChunks = [];
    if (fse.existsSync(chunkDir)) {
      uploadedChunks = await fse.readdir(chunkDir);
    }
    res.json({ code: 0, uploadedChunks, shouldUpload: true });
  } catch (error) {
    console.error('checkfile 错误:', error);
    res.status(500).json({ code: -1, msg: '服务器错误' });
  }
});

/**
 * 2. 上传分片(修复核心:手动处理存储,避免 multer 时序问题)
 */
app.post('/uploadchunk', upload.single('chunk'), async (req, res) => {
  try {
    const { fileHash, chunkIndex } = req.body;
    const chunk = req.file; // multer 解析后的文件 buffer

    if (!fileHash || chunkIndex === undefined || !chunk) {
      return res.status(400).json({ code: -1, msg: '参数缺失' });
    }

    // 手动创建分片目录(确保存在)
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    await fse.ensureDir(chunkDir);

    // 手动写入分片文件
    const chunkPath = path.resolve(chunkDir, chunkIndex.toString());
    await fse.writeFile(chunkPath, chunk.buffer);

    res.json({ code: 0, msg: '分片上传成功' });
  } catch (error) {
    console.error('uploadchunk 错误:', error);
    res.status(500).json({ code: -1, msg: '分片上传失败', error: error.message });
  }
});

/**
 * 3. 合并所有分片
 */
app.post('/mergefile', async (req, res) => {
  try {
    const { fileHash, fileName, chunkCount } = req.body;
    if (!fileHash || !fileName || !chunkCount) {
      return res.status(400).json({ code: -1, msg: '参数缺失' });
    }

    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    const ext = path.extname(fileName);
    const filePath = path.resolve(MERGE_DIR, `${fileHash}${ext}`);

    // 检查分片目录是否存在
    if (!fse.existsSync(chunkDir)) {
      return res.status(400).json({ code: -1, msg: '分片目录不存在' });
    }

    // 按顺序合并分片
    const writeStream = fse.createWriteStream(filePath);
    for (let i = 0; i < chunkCount; i++) {
      const chunkPath = path.resolve(chunkDir, i.toString());
      // 检查分片是否存在
      if (!fse.existsSync(chunkPath)) {
        return res.status(400).json({ code: -1, msg: `分片 ${i} 缺失` });
      }
      const readStream = fse.createReadStream(chunkPath);
      await new Promise((resolve, reject) => {
        readStream.pipe(writeStream, { end: false });
        readStream.on('end', resolve);
        readStream.on('error', reject);
      });
    }

    // 关闭写入流
    writeStream.end();

    // 合并完成删除分片目录
    await fse.remove(chunkDir);
    res.json({ code: 0, msg: '文件合并成功', url: `/merged/${fileHash}${ext}` });
  } catch (error) {
    console.error('mergefile 错误:', error);
    res.status(500).json({ code: -1, msg: '合并失败', error: error.message });
  }
});

// 静态资源访问合并后的文件
app.use('/merged', express.static(MERGE_DIR));

const PORT = 3000;
app.listen(PORT, () => {
  console.log(`后端服务启动成功:http://localhost:${PORT}`);
  console.log(`分片存储目录:${UPLOAD_DIR}`);
  console.log(`合并后文件目录:${MERGE_DIR}`);
});

3、启动后端:

node server.js

二、前端服务

1、.vue文件

<template>
  <div id="app" style="max-width: 800px;margin: 50px auto;">
    <h2>Vue2 大文件分片上传(断点续传)</h2>
    <input type="file" @change="handleFileChange">
    <button :disabled="!file || uploading" style="margin-left: 10px;" @click="handleUpload">
      {{ uploading ? '上传中...' : '开始上传' }}
    </button>

    <!-- 进度条(已修复动态样式) -->
    <div v-if="totalProgress > 0" style="margin-top: 20px;">
      <div>总进度:{{ totalProgress.toFixed(2) }}%</div>
      <div style="height:5px;background:#eee;border-radius:3px;">
        <div
          :style="{
            height: '100%',
            background: '#42b983',
            width: totalProgress + '%',
            transition: '0.3s'
          }"
        />
      </div>
    </div>

    <div style="margin-top: 20px;color: #333;">
      <p v-if="uploadedChunkList.length">已上传分片:{{ uploadedChunkList.join(',') }}</p>
      <p v-if="msg" :style="{color: msg.includes('成功') ? 'green' : 'red'}">{{ msg }}</p>
    </div>
  </div>
</template>

<script>
import SparkMD5 from 'spark-md5'
import axios from 'axios'

export default {
  name: 'App',
  data() {
    return {
      file: null,
      fileHash: '',
      CHUNK_SIZE: 2 * 1024 * 1024, // 2MB 分片
      chunkList: [],
      uploadedChunkList: [],
      uploading: false,
      totalProgress: 0,
      msg: '',
      MAX_CONCURRENT: 3 // ✅ 新增:最大并发数,控制同时上传的分片数量
    }
  },
  methods: {
    // 1. 选择文件
    async handleFileChange(e) {
      const file = e.target.files[0]
      if (!file) return
      this.file = file
      this.msg = '正在计算文件指纹...'
      this.fileHash = await this.getFileHash(file)
      this.msg = `文件:${file.name},hash:${this.fileHash.slice(0, 10)}...`
    },

    // 2. 计算文件 MD5(优化:大文件完整计算,避免 hash 冲突)
    getFileHash(file) {
      return new Promise((resolve, reject) => {
        const spark = new SparkMD5.ArrayBuffer()
        const fileReader = new FileReader()
        const chunkSize = 2 * 1024 * 1024
        let offset = 0

        const loadNext = () => {
          const slice = file.slice(offset, offset + chunkSize)
          fileReader.readAsArrayBuffer(slice)
        }

        fileReader.onload = (e) => {
          spark.append(e.target.result)
          offset += e.target.result.byteLength
          if (offset < file.size) {
            loadNext()
          } else {
            resolve(spark.end())
          }
        }

        fileReader.onerror = reject
        loadNext()
      })
    },

    // 3. 开始上传(主流程,修复并发+进度)
    async handleUpload() {
      if (!this.file) return alert('请选择文件')
      this.uploading = true
      this.totalProgress = 0
      this.msg = ''

      try {
        // 1)查询已上传分片
        const { data } = await axios.post('http://localhost:3000/checkfile', {
          fileHash: this.fileHash,
          fileName: this.file.name
        })

        // 秒传
        if (!data.shouldUpload) {
          this.msg = '✅ 秒传成功:文件已存在'
          this.uploading = false
          this.totalProgress = 100
          return
        }
        this.uploadedChunkList = data.uploadedChunks.map(String) // 统一转字符串,避免类型不匹配

        // 2)切分文件
        this.chunkList = this.createChunks(this.file)
        const total = this.chunkList.length
        let uploadedCount = this.uploadedChunkList.length // 已上传分片数
        this.totalProgress = (uploadedCount / total) * 100 // 初始化进度

        // 3)过滤出需要上传的分片
        const needUploadChunks = this.chunkList
          .map((chunk, index) => ({ chunk, index }))
          .filter(item => !this.uploadedChunkList.includes(item.index.toString()))

        // ✅ 4)并发控制上传(核心优化,避免后端 500)
        await this.concurrentUpload(needUploadChunks, total, (count) => {
          uploadedCount += count
          this.totalProgress = (uploadedCount / total) * 100
        })

        // 5)通知后端合并文件
        const mergeRes = await axios.post('http://localhost:3000/mergefile', {
          fileHash: this.fileHash,
          fileName: this.file.name,
          chunkCount: this.chunkList.length
        })

        if (mergeRes.data.code === 0) {
          this.msg = '✅ 上传 + 合并完成!'
          this.totalProgress = 100
        } else {
          this.msg = `❌ 合并失败:${mergeRes.data.msg || '未知错误'}`
        }
      } catch (error) {
        console.error('上传错误:', error)
        this.msg = `❌ 上传失败:${error.message || '未知错误'}`
      } finally {
        this.uploading = false
      }
    },

    // ✅ 并发控制上传方法
    async concurrentUpload(chunks, total, onProgress) {
      console.log('999999 分片数量', chunks)
      const results = []
      // 分批上传,每批 MAX_CONCURRENT 个
      for (let i = 0; i < chunks.length; i += this.MAX_CONCURRENT) {
        const batch = chunks.slice(i, i + this.MAX_CONCURRENT)
        const batchPromises = batch.map(async({ chunk, index }) => {
          const formData = new FormData()
          formData.append('chunk', chunk)
          formData.append('fileHash', this.fileHash)
          formData.append('chunkIndex', index)

          // 重试逻辑:失败自动重试 2 次
          for (let retry = 0; retry < 2; retry++) {
            try {
              await axios.post('http://localhost:3000/uploadchunk', formData, {
                headers: { 'Content-Type': 'multipart/form-data' }
              })
              return { success: true, index }
            } catch (e) {
              console.warn(`分片 ${index} 上传失败,重试 ${retry + 1}`)
              if (retry === 1) throw e
              await new Promise(resolve => setTimeout(resolve, 1000)) // 重试间隔 1s
            }
          }
        })

        const batchResults = await Promise.allSettled(batchPromises)
        results.push(...batchResults)
        // 更新进度:每批完成后计算
        const successCount = batchResults.filter(r => r.status === 'fulfilled' && r.value.success).length
        onProgress(successCount)
      }

      // 检查是否有失败的分片
      const failed = results.filter(r => r.status === 'rejected' || !r.value?.success)
      if (failed.length > 0) {
        throw new Error(`有 ${failed.length} 个分片上传失败`)
      }
    },

    // 切分文件
    createChunks(file) {
      const chunks = []
      let start = 0
      while (start < file.size) {
        const end = Math.min(start + this.CHUNK_SIZE, file.size)
        chunks.push(file.slice(start, end))
        start = end
      }
      return chunks
    }
  }
}
</script>

2、安装依赖

npm install axios spark-md5 --save

3、启动服务

npm run serve

总结:

  1. 查询列表(文件是否已上传/已传切片index)-文件hash唯一性(spark-md5获取)
  2. 开始上传,过滤已上传的切片数
  3. 剩余切片分批次并行上传(第一次失败,自动重试 1 次,第二次失败,抛错)
  4. 全部切片上传成功后,调用合并接口(通知后端可以合并切片数了)

秒传:文件完整存在 → 直接跳过上传

断点续传:只传缺失分片 → 断网 / 刷新可恢复(hash查询,再过滤)

并发控制:分批次,防止同时发大量请求导致崩溃

备注:按以上步骤,直接可以实践操作

从网关的角度理解并实现一个 Mini OpenClaw

1. 前言

OpenClaw 与其他 AI Agent 最本质的区别是什么?首先,OpenClaw 本身也是一个 AI Agent,但关键在于它能连接多种 IM 渠道,并利用这些 IM 工具提供的开发能力来调用自身的 Agent——这种能力被称为“网关”。因此,有后端的技术大咖将 OpenClaw 总结为:OpenClaw = 高权限 AI Agent + 网关

所以只有理解了 OpenClaw 的本质之后,我们才可以实现一个 Mini OpenClaw。

首先我们要实现一个网关,那么网关是什么呢?

网关对于后端的同学来说,肯定不陌生。在 Spring Boot 微服务架构中,API 网关已成为标准的基础设施组件,其核心作用与 OpenClaw 中的“网关”如出一辙:对外隐藏后端的实现细节(服务地址、版本、熔断等),对内统一通信协议,并提供横切能力(如鉴权、限流、日志等) 。两者的区别仅在于作用对象不同——OpenClaw 的网关面向 IM 渠道(消息协议适配),而后端网关面向 HTTP/RPC 调用(协议转换与流量管理)。

所以 OpenClaw 的所谓网关就是一个消息协议适配器。

所以我们先要实现网关最核心的功能:协议适配。这是网关最本质的能力——对外讲 IM 的方言,对内统一说普通话。

2. 网关核心功能:协议适配

不同 IM(飞书、微信 等)的消息格式千差万别:有的用 user_id,有的用 from 字段,有的消息正文可能嵌套在 text 或 message 对象中。我们可以通过设计一个消息协议将这些差异全部“抹平”,这样本地 AI Agent 就只依赖这标准消息协议,无需关心消息来自哪个渠道。

设计一个入站的消息对象 InboundMessage:

# events.py
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class InboundMessage:
    """从聊天频道接收到的消息"""
    channel: str  # 用于区分来源,后续发送回复时需要知道应该调用哪个 IM 的 API(feishu、wechat)
    sender_id: str  # 用户标识符
    chat_id: str  # 聊天/频道标识符
    content: str  # 消息文本
    timestamp: datetime = field(default_factory=datetime.now)  # 消息时间

这样新增一个 IM 渠道时,只需要写一个适配器将私有消息转换成 InboundMessage 即可,其余代码零改动。

简而言之:设计 InboundMessage 就是为了让网关“对外讲方言,对内讲普通话”,所有渠道的消息到达网关后立刻被标准化,Agent 只需处理这一种标准格式。

同样地不同 IM 的发送接口千差万别:飞书需要 receive_id,微信需要 touser,Telegram 需要 chat_id。通过设计一个 OutboundMessage 消息对象,这样 Agent 只需要产出 channelchat_idcontent 三个核心字段,网关再根据 channel 值调用对应的 IM 适配器,由适配器负责转换成目标 IM 的私有请求格式即可。

OutboundMessage 消息对象的字段设计如下:

# events.py
@dataclass
class OutboundMessage:
    """要发送到聊天频道的消息"""
    
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None # 支持引用回复,用于指明当前回复的是哪一条历史消息

网关的输入是 InboundMessage,输出是 OutboundMessage,这样本地 AI Agent 核心只处理这两种标准格式信息,完全不依赖任何 IM 私有 API。这使得添加新 IM 渠道变得非常简单:只需要写一个适配器,将 InboundMessage 解析出来,并将 OutboundMessage 转换成该 IM 的发送请求即可。因为本地 AI Agent 完全不知道自己在和谁在交互,它只看到 InboundMessage/OutboundMessage,这正是网关隐藏后端实现细节的精髓,也是网关本质的体现

3. 网关内部路由:统一通信总线

根据前面的设计,我们已经将各个 IM 渠道的消息统一成了 InboundMessage,并将 Agent 的回复统一成了 OutboundMessage。但仅仅统一格式还不够,还需要解决一个核心问题:多个渠道的消息并发涌入,而 Agent 的处理可能是同步/半异步的,如何让它们有序、可靠、不互相阻塞?

这就需要一个统一通信总线——本质上是一个轻量级的内部消息路由。而最经典、最可靠的实现方式就是双队列解耦

入站异步队列: 渠道 → Agent
出站异步队列: Agent → 渠道

通过双队列把网关内部的“消息流动”标准化为两个 FIFO 管道:

  • 入站异步队列:所有 IM 渠道的消息汇聚点,Agent 从这头取“原材料”。
  • 出站异步队列:所有回复的汇聚点,分发器从这头取“成品”并发送。

为什么需要这样设计?

每个 IM 渠道(飞书、微信等)都有自己的 Webhook 或长连接,当瞬间收到大量消息(例如群聊刷屏)时,如果直接在回调中同步调用 Agent,Agent 处理耗时较长,会导致 Webhook 超时、连接堆积,甚至被 IM 服务器屏蔽。

我们让每个渠道适配器只做最轻量的事情,每当接收到消息时,就只需要解析消息、封装成上述设计的 InboundMessage,然后立即推送到入站异步队列中,马上返回返回即可。而 Agent 的处理则由一个独立的后台协程从入站异步队列中拉取,这样生产者和消费者的速度完全解耦。即使 Agent 处理得慢,队列也能起到“缓冲”作用,不会丢消息。

同时 Agent 只产出上述设计的 OutboundMessage 的数据并推送到出站异步队列中。另一个独立的分发器协程从出站异步队列中取出消息,找到对应的渠道适配器,调用该适配器的发送方法进行发送消息。这样一来,Agent 完全不需要知道消息要发往哪里、怎么发,路由逻辑全封装在网关内部。

统一通信总线代码实现如下:

# message_bus.py
"""用于解耦频道与智能体通信的异步消息队列"""
import asyncio
from loguru import logger
from events import InboundMessage, OutboundMessage

class MessageBus:
    """
    异步消息总线,用于将聊天频道与智能体核心解耦。
    频道将消息推送到入站队列,智能体处理它们并将响应推送到出站队列。
    """
    def __init__(self):
        # 入站异步队列
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        # 出站异步队列
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
    
    async def publish_inbound(self, msg: InboundMessage) -> None:
        """将来自频道的消息发布给智能体"""
        await self.inbound.put(msg)
    
    async def consume_inbound(self) -> InboundMessage:
        """消费下一条入站消息(阻塞直到有消息可用)"""
        return await self.inbound.get()
    
    async def publish_outbound(self, msg: OutboundMessage) -> None:
        """将智能体的响应发布给频道"""
        await self.outbound.put(msg)
    
    async def consume_outbound(self) -> OutboundMessage:
        """消费下一条出站消息(阻塞直到有消息可用)"""
        return await self.outbound.get()

同时入站异步队列和出站异步队列通过 asyncio.Queue 提供。asyncio.Queue 是异步编程中实现生产者-消费者模式的标准工具,它让不同协程之间可以安全、非阻塞地交换数据。在我们上述网关的设计中,正是依赖它实现了入站/出站双队列解耦,从而让多个 IM 渠道可以并发接收消息,同时 Agent 通过并发处理消息,实现效率提高。没有它,你就得自己用锁和条件变量实现类似功能,既复杂又容易出错。

接着我们修改上一篇文章《如何使用飞书机器人连接本地 AI Agent》中实现的飞书连接本地 AI Agent 的飞书频道,实现将来自飞书的消息转发到通信总线。

# feishu.py
+ from events import InboundMessage
+ from message_bus import MessageBus

class FeishuChannel:
    """极简版飞书 WebSocket 长连接机器人"""
+    name = "feishu"
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        self.config = config
        self.bus = bus
        # 省略...

    async def start(self) -> None:
        # 省略...
-    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
+    async def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""
        msg = data.event.message
+        sender = data.event.sender
        # 只处理用户发送的纯文本消息
        if data.event.sender.sender_type == "bot" or msg.message_type != "text":
            return

        content = json.loads(msg.content).get("text", "")
        if not content:
            return
        
+        # 提取发送者信息
+        sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
+        # 获取用于回复的 chat_id
+        chat_id = msg.chat_id
+        chat_type = msg.chat_type  # "p2p" 或 "group"
+        reply_to = chat_id if chat_type == "group" else sender_id
+        # 将消息转发到总线
+        await self._handle_message(
+            sender_id=sender_id,
+            chat_id=reply_to,
+            content=content,
+        )
-        # 启动独立线程处理 AI 逻辑并回复,防止阻塞 WebSocket 接收循环
-        # threading.Thread(
-        #     target=self._process_and_reply, 
-        #     args=(msg.chat_id, content)
-        # ).start()

+    async def _handle_message(
+        self,
+        sender_id: str,
+        chat_id: str,
+        content: str,
+    ) -> None:
+        """
+        处理来自聊天平台的传入消息。
+        此方法将消息转发到总线。
        
+        参数:
+            sender_id: 发送者的标识符。
+            chat_id: 聊天/通道的标识符。
+            content: 消息文本内容。
+        """
        
+        msg = InboundMessage(
+            channel=self.name,
+            sender_id=str(sender_id),
+            chat_id=str(chat_id),
+            content=content
+        )
        
+        await self.bus.publish_inbound(msg)

现在我们已经将飞书发过来的消息推送到通信总线中了,接着我们需要在 Agent 异步处理协程中循环读取总线中的消息进行处理了。

4. 实现并发 Agent Loop

我们上文讲到了通过 asyncio.Queue 实现了入站/出站双队列解耦,从而让多个 IM 渠道可以并发接收消息,同时 Agent 通过并发处理消息,实现效率提高。

但我们前面实现的 Agent Loop 的同步处理数据,所以我们需要重新设计并实现我们的 Agent Loop。

首先我们这个 Agent Loop 需要具备以下功能点:

  1. 持续运行:只要网关没有关闭,Agent Loop 就要一直工作,不能退出。
  2. 响应及时:当有新消息到达时,应尽快开始处理,避免不必要的延迟。
  3. 可优雅停止:外部可以调用 stop() 方法,让循环在安全时机退出,而不是强制杀死协程。
  4. 容错性:单条消息处理失败不应导致整个循环崩溃,并且要能告知用户出错。

那么第一个功能点持续运行,我们可以通过使用一个布尔标志控制循环是否继续。

self._running = True
while self._running:
    # 只要 self._running = True 就一直循环读取通讯总线中的消息进行处理

这样只要 self._running = True 就一直循环读取通讯总线中的消息进行处理。同时我们设计一个 stop() 方法设置 self._running = False,这样外部协程就可以调用 stop() 使得循环将在下一次条件判断时退出。

在读取通讯总线中的消息时,我们需要通过 asyncio.wait_for 实现可中断阻塞读取。即如下实现:

self._running = True
while self._running:
    # 只要 self._running = True 就一直循环读取通讯总线中的消息进行处理
    msg = await asyncio.wait_for(
        self.bus.consume_inbound(), # 本质是 await inbound_queue.get()
        timeout=1.0,
    )

如果不使用 asyncio.wait_for 而是直接使用 await self.bus.consume_inbound() 的话,没有消息就一直等着,那么循环永远不会走到 while self._running 的条件判断。此时调用 stop() 设置 self._running = False 是无效的,因为协程卡在 get() 上,永远没有机会检查 self._running 标志。

而使用 asyncio.wait_for 并设置超时为 1 秒,也就是如果 1 秒内返回了消息,就正常得到 msg。如果 1 秒后队列仍为空,wait_for 会抛出 asyncio.TimeoutError。这样,协程最多阻塞 1 秒就会醒来一次,重新检查 while self._running。因此,即使没有消息,循环也能每秒检查一次退出标志,实现可中断的阻塞读取

根据上述设计我们初步实现 Agent Loop 如下:

import asyncio
import json
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from openai import AsyncOpenAI

from events import InboundMessage, OutboundMessage
from message_bus import MessageBus

load_dotenv()

class AgentLoop:
    def __init__(
        self,
        bus: MessageBus,
        max_iterations: int = 200,
        api_key: str | None = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-chat",
    ):
        self.bus = bus
        # 最大工具调用轮次,防止死循环
        self.max_iterations = max_iterations
        self.model = model
        self._running = False
        # 初始化 OpenAI异步客户端 兼容客户端(如 DeepSeek)
        self.client = AsyncOpenAI(
            api_key=api_key or os.getenv("DEEPSEEK_API_KEY"),
            base_url=base_url,
        )

    # ------------------------------------------------------------------
    # 主循环:持续消费 入站异步队列
    # ------------------------------------------------------------------

    async def run(self) -> None:
        """运行智能体循环,处理来自总线的消息。"""
        self._running = True
        logger.info("Agent loop started")

        while self._running:
            try:
                # 从入站队列消费下一条消息,设置超时以便能定期检查 _running 标志
                msg = await asyncio.wait_for(
                    self.bus.consume_inbound(),
                    timeout=1.0,
                )
                try:
                    # 处理消息并获取响应
                    response = await self._process_message(msg)
                    if response:
                        # 将响应发布到出站队列
                        await self.bus.publish_outbound(response)
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    await self.bus.publish_outbound(
                        OutboundMessage(
                            channel=msg.channel,
                            chat_id=msg.chat_id,
                            content=f"抱歉,处理消息时出错:{e}",
                        )
                    )
            except asyncio.TimeoutError:
                continue

    def stop(self) -> None:
        """停止智能体循环。"""
        self._running = False
        logger.info("Agent loop stopping")

上述的 run 方法需要在一开始就启动,这样才可以实现一有消息就马上处理,而不会漏消息。我们把上一篇讲解实现飞书接入本地 AI Agent 的启动文件 test_feishu.py 重命名为 gateway.py,也就是网关的意思,并且修改其中的启动代码:

+ from message_bus import MessageBus
+ from loop import AgentLoop
async def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxx",         # 替换为真实的 App ID
        app_secret="xxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
+    deepseek_key = os.getenv("DEEPSEEK_API_KEY", "")
+    bus = MessageBus()
+    agent = AgentLoop(
+        bus=bus,
+        api_key=deepseek_key,
+        base_url="https://api.deepseek.com",
+        model="deepseek-chat",
+        max_iterations=20,
+    )
    
    # 2. 初始化频道并启动长连接
-    channel = FeishuChannel(config=config)
+channel = FeishuChannel(config=config, bus=bus)
    
    logger.info("正在启动飞书机器人长连接...")
    
-    # 3. 启动并保持运行
+    # 3. 并发运行
    try:
-        await channel.start()
+        await asyncio.gather(
+            agent.run(),          # 持续消费 inbound 队列,调用 LLM
+            channel.start(),      # 飞书启动
+        )
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

通过上述修改我们就实现了 Agent 和飞书频道在初始化的时候并发运行,从而实现了一开始就监听入站异步队列的消息。

上述 Agent Loop 的 self._process_message 方法是还没实现的,所以我们继续实现 Agent 对消息的处理。本质就是实现大模型的工具调用循环。

在实现 Agent 对消息的处理之前,我们先要重新设计一下会话历史。

5. 会话历史设计

在前面的文章中我们的会话历史就是一个数组,结构如下:

history = [
    {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个助手")},
    {"role": "user", "content": content}
]

后续如果继续有消息就根据角色往数组 history 中追加用户消息和助手消息即可。

但在 OpenClaw 中需要保证不同渠道、不同群、不同用户的历史会话完全隔离。我们可以使用 dict[str, list[dict]] 作为存储结构,相当于在 JavaScript 中设置一个对象,然后通过 key 作为唯一标识进行会话隔离。

key 设计:

这个 key 我们可以设置由 channel + chat_id 组合而成,例如 "feishu:oc_xxx"。然后我们在之前设计的 InboundMessage 对象中设置一个 session_key 方法用于返回会话唯一标识。设置如下:

@dataclass
class InboundMessage:
    # 省略...
    
+    @property
+    def session_key(self) -> str:
+        """用于会话标识的唯一键"""
+        return f"{self.channel}:{self.chat_id}"

value 设计:

value 其实就是上述的历史会话数组,即:

[
    {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个助手")},
    {"role": "user", "content": content}
]

同时我们设计一个 _get_history 的函数来实现对会话历史的懒加载,如果 session_key 不存在,自动创建新列表并插入 system prompt,如果 session_key 存在则返回内部列表的直接引用,调用方可以修改它,即追加消息。这样设计可以避免拷贝带来的性能开销。

实现如下:

# ---------- 会话历史管理(按 session_key 隔离) ----------
# 全局字典:存储所有会话的对话历史
# - Key: session_key,用于唯一标识一个会话(例如 "feishu:chat_id")
# - Value: 消息列表,每个元素是 OpenAI API 兼容的消息字典(包含 role, content 等字段)
_sessions: dict[str, list[dict]] = {}

# 系统提示词:定义 AI 助手的角色、能力和行为准则
SYSTEM_PROMPT = (
    "你是一个智能助手,可以通过工具帮助用户完成任务。"
    "请简洁、准确地回答用户问题。"
)
# 获取会话历史
def _get_history(session_key: str) -> list[dict]:
    # 若为新会话,自动初始化一条包含 system prompt 的消息
    if session_key not in _sessions:
        _sessions[session_key] = [{"role": "system", "content": SYSTEM_PROMPT}]
    # 返回该会话的历史列表(引用,允许外部修改)
    return _sessions[session_key]

6. Agent Loop 的核心:消息处理

在完成了会话历史管理和主循环的可中断阻塞读取之后,Agent Loop 最核心的部分就是 单条消息的处理逻辑——即 _process_message 方法。该方法实现了 ReAct(推理+行动)模式:调用 LLM → 若需要工具则执行工具 → 将结果返回 LLM → 重复直到得到最终答案。下面详细解析其实现:

class AgentLoop:
    # 省略...

    # ------------------------------------------------------------------
    # 单条消息处理:tool-call 循环
    # ------------------------------------------------------------------
    async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
        # 1. 获取当前会话的历史,并追加用户消息
        messages = _get_history(msg.session_key)
        messages.append({"role": "user", "content": msg.content})

        final_content: str | None = None
        # 2. 进入工具调用循环(最多 max_iterations 次)
        for iteration in range(self.max_iterations):
            # 3. 调用 LLM(异步非阻塞)
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages, 
                tools=TOOLS,
                tool_choice="auto",
            )
            assistant_msg = response.choices[0].message

            # 将助手消息追加到历史
            messages.append(assistant_msg)

            # 4. 如果没有 tool_calls,说明任务完成
            if not assistant_msg.tool_calls:
                final_content = assistant_msg.content or ""
                break

            # 5. 执行所有工具调用,并将结果以 role=tool 追加到历史记录
            for tool_call in assistant_msg.tool_calls:
                name = tool_call.function.name
                args = json.loads(tool_call.function.arguments)
                logger.debug(f"Executing tool: {name}, args: {args}")

                result = _execute_tool(name, args)
                logger.debug(f"Tool result: {result[:100]}")

                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "name": name,
                        "content": result,
                    }
                )
        else:
            # 达到最大迭代次数
            final_content = "已达到最大处理轮次,无法给出最终答案。"

        if final_content is None:
            final_content = "处理完成,但没有内容返回。"
        # 6. 构造出站消息返回给用户
        return OutboundMessage(
            channel=msg.channel,
            chat_id=msg.chat_id,
            content=final_content,
        )

上述代码的实现跟我们前面文章实现 Agent Loop 是一样的,所以大家还有不懂的话,可以回看前面文章的详细解析。最最重要的就是最后返回了构造了 OutboundMessage 格式的出站消息,然后在 run 方法中通过 self.bus.publish_outbound(response) 将消息发布到出站队列。

其中工具定义实现如下:

# ---------- 内置工具定义 ----------
TOOLS: list[dict] = [
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "读取本地文本文件内容。",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "文件路径"},
                    "encoding": {
                        "type": "string",
                        "enum": ["utf-8", "gbk"],
                        "description": "文件编码,默认 utf-8",
                    },
                },
                "required": ["path"],
            },
        },
    }
]

def _execute_tool(name: str, arguments: dict) -> str:
    """同步执行内置工具,返回字符串结果。"""
    if name == "read_file":
        from pathlib import Path

        path = arguments.get("path", "")
        encoding = arguments.get("encoding", "utf-8")
        try:
            p = Path(path).expanduser()
            if not p.exists():
                return f"❌ 文件不存在: {path}"
            return p.read_text(encoding=encoding)
        except Exception as e:
            return f"❌ 读取失败: {e}"
    return f"❌ 未知工具: {name}"

我们这里先只实现一个读取文件内容的工具,后续再实现更多的工具。

7. 构建网关的渠道层

7.1 为什么需要渠道层?

在上一小节中,我们实现在 Agent 中构造了 OutboundMessage 格式的出站消息,然后将消息发布到出站队列中。但还缺少关键的一环:出站异步队列中的消息由谁来消费?如何将 Agent 的回复正确地发送回原来的聊天频道?

我们知道每个即时通讯平台都有自己独特的 API 协议,如果让 Agent 直接处理这些差异,会导致 Agent 逻辑中混杂大量渠道特定代码,每增加一个渠道就要修改 Agent 核心逻辑,这会造成维护噩耗。

所以我们需要构建一个 渠道管理器(ChannelManager),作为网关的出站交通枢纽,负责管理所有 IM 适配器的生命周期,并将出站消息路由到正确的渠道。具体需要实现以下功能:

  1. 注册与管理渠道实例

    • 运行时动态注册各个渠道
    • 维护渠道状态信息
    • 提供统一的渠道访问接口
  2. 协调启动与停止流程

    • 控制渠道启动顺序,避免竞态条件
    • 实现优雅停止,防止消息丢失
    • 处理异常情况下的资源清理
  3. 消息路由与派发

    • 根据消息的 channel 字段路由到正确渠道
    • 调用渠道的发送方法
    • 实现错误隔离和重试机制

7.2 渠道层的设计与实现

如果把整个网关系统比作一个繁忙的交通枢纽,那么渠道层就是站在十字路口中央的交警。它不亲自运送货物,但指挥着所有运输车辆有序通行。

具体来说,渠道层连接着:

  • 上游:内部消息总线(MessageBus),接收标准化的出站消息
  • 下游:各个 IM 渠道适配器(FeishuChannel、WechatChannel 等)

我们先实现一个 ChannelManager 类,并实现数据结构与初始化。代码如下:

import asyncio
from loguru import logger
from message_bus import MessageBus
from feishu import FeishuChannel


class ChannelManager:
    def __init__(self, bus: MessageBus):
        self.bus = bus
        # 存储已注册的渠道适配器,key 为渠道名称(如 "feishu")
        self.channels: dict[str, FeishuChannel] = {}
        # 出站分发器的任务句柄,用于优雅停止
        self._dispatch_task: asyncio.Task | None = None

ChannelManager 的核心数据结构 channels 是一个字典: channel_name → 适配器实例

  • Key = 渠道名称(如 "feishu"、"wechat")
  • Value = 渠道实例对象

这个设计实现了运行时动态注册,可以在不重启服务的情况下添加新渠道。

接着我们来实现注册渠道功能:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        """注册一个渠道适配器。要求该适配器必须有 name 属性和 send 方法。"""
        self.channels[channel.name] = channel
        logger.info(f"Channel registered: {channel.name}")

上述注册渠道的代码实现看起很简单,其实背后的设计原理一点也不简单。它应用了工厂模式 + 依赖注入的设计模式。

  1. 工厂模式体现在:渠道的创建由外部完成,ChannelManager 只负责使用
  2. 依赖注入体现在:渠道实例通过 register() 方法注入,而非在 ChannelManager 内部创建

我们已经实现了一个飞书渠道 FeishuChannel,所以现在需要通过以下方式进行注册飞书渠道:

manager.register(FeishuChannel(...))

同时将来如果我们想新增一个微信渠道,就可以这样实现了,先实现一个 WechatChannel,然后:

manager.register(WechatChannel(...))

这样网关核心代码零改动,真正实现了"开闭原则":对扩展开放,对修改关闭。

接着实现启动所有已注册的频道以及出站分发器。

代码实现如下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        # 省略...
    async def start_all(self) -> None:
        """启动所有已注册的频道以及出站分发器。"""
        if not self.channels:
            logger.warning("No channels registered")
            return

        # 先启动出站分发器协程(确保一有出站消息就能被处理)
        self._dispatch_task = asyncio.create_task(self._dispatch_outbound())

        # 并发启动所有渠道(每个渠道的 start 方法负责建立长连接或监听 Webhook)
        tasks = []
        for name, channel in self.channels.items():
            logger.info(f"Starting {name} channel...")
            tasks.append(asyncio.create_task(channel.start()))

        # 注意:通常渠道的 start 会永久阻塞(如 WebSocket 循环),因此 gather 不会返回
        await asyncio.gather(*tasks, return_exceptions=True)

我们上述的代码实现了一个看似简单却至关重要的设计决策,就是先启动分发器再启动渠道。那么为什么先启动分发器再启动渠道呢?

主要是为了防止消息丢失与响应延迟。让我们分析两种启动顺序的后果:

场景 A:先启动渠道,后启动分发器 时间线:

  1. 飞书渠道启动成功 ✓
  2. 用户立即发送消息:"你好"
  3. Agent 快速处理,生成回复:"你好!我是AI助手"
  4. 回复进入出站队列...
  5. 但是!分发器还没启动 ❌
  6. 回复消息在队列中堆积
  7. 用户等待...等待...(用户体验差)

场景 B:先启动分发器,后启动渠道(我们采用的方式) 时间线:

  1. 分发器启动,开始监听出站队列 ✓
  2. 飞书渠道启动成功 ✓
  3. 用户发送消息:"你好"
  4. Agent 处理,生成回复:"你好!我是AI助手"
  5. 回复进入出站队列
  6. 分发器立即发现新消息 ✓
  7. 路由到飞书渠道,立即发送 ✓
  8. 用户秒级收到回复(体验流畅)

在实际的生产环境经验中,"空转等待"比"忙中丢消息"要好得多。分发器提前就位,就像快递员提前在仓库门口等待,包裹一出来就能立即配送。

接着我们实现出站消息分发器

代码实现如下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        # 省略...
    async def start_all(self) -> None:
        # 省略...
    async def _dispatch_outbound(self) -> None:
        """
        出站分发器:持续消费 outbound 队列,将消息发送到对应的渠道。
        这是一个后台协程,在 start_all 时启动。
        """
        logger.info("Outbound dispatcher started")

        while True:
            try:
                # 可中断阻塞读取,每隔1秒检查一次取消信号
                msg = await asyncio.wait_for(
                    self.bus.consume_outbound(),
                    timeout=1.0,
                )
                # 根据消息中的 channel 字段找到对应的适配器
                channel = self.channels.get(msg.channel)
                if channel:
                    try:
                        # 调用适配器的 send 方法(各渠道自己实现转换和发送逻辑)
                        await channel.send(msg)
                    except Exception as e:
                        logger.error(f"Error sending to {msg.channel}: {e}")
                else:
                    logger.warning(f"Unknown channel: {msg.channel}")

            except asyncio.TimeoutError:
                # 超时不是错误,只是没有消息,继续循环
                continue
            except asyncio.CancelledError:
                break

我们上一小节中所说的先启动分发器,本质就是通过 while True 不断循环使用 asyncio.wait_for 消费 outbound 队列,然后根据 msg.channel 路由并调用 send 方法。

设计亮点:

  1. 拉模式(Pull)而非推模式(Push)

    • 主动从消息队列拉取消息,控制权在自己手中
    • 相比回调式的推模式,更容易控制消费速率和错误处理
  2. 可中断的事件循环

    • timeout=1.0 让循环能定期"抬头看路",检查是否有停止信号
    • 没有这个超时,任务会一直阻塞在 consume_outbound() 上,难以优雅停止

接着我们继续实现渠道的发送方法,这是协议翻译的最后一步。

为了让 ChannelManager 能够统一管理,每个 IM 适配器必须实现以下两个成员:

  1. name: str:渠道唯一标识(如 "feishu")。
  2. async send(msg: OutboundMessage) -> None:发送回复的方法。

以飞书适配器为例,我们之前已经定义了 name = "feishu",现在补充 send 方法的实现:

class FeishuChannel:
    # 省略...
    async def send(self, msg: OutboundMessage) -> None:
        """通过飞书发送消息。"""
        if not self._client:
            logger.warning("飞书客户端未初始化")
            return

        try:
            # 根据 chat_id 格式确定 receive_id_type
            # open_id 以 "ou_" 开头,chat_id 以 "oc_" 开头
            if msg.chat_id.startswith("oc_"):
                receive_id_type = "chat_id"
            else:
                receive_id_type = "open_id"

            # 构建文本消息内容
            content = json.dumps({"text": msg.content})

            request = CreateMessageRequest.builder() \
                .receive_id_type(receive_id_type) \
                .request_body(
                    CreateMessageRequestBody.builder()
                    .receive_id(msg.chat_id)
                    .msg_type("text")
                    .content(content)
                    .build()
                ).build()

            # OpenAPI 调用是同步的,在线程中运行以避免阻塞
            response = await asyncio.to_thread(
                self._client.im.v1.message.create, request
            )

            if not response.success():
                logger.error(
                    f"发送飞书消息失败:code={response.code}, "
                    f"msg={response.msg}, log_id={response.get_log_id()}"
                )
            else:
                logger.debug(f"飞书消息已发送至 {msg.chat_id}")

        except Exception as e:
            logger.error(f"发送飞书消息时出错:{e}")

本质是就是将我们上一篇文章中的 FeishuChannel 类中 _process_and_reply 方法改成 send 方法即可。这样,ChannelManager 就可以统一调用 await channel.send(msg),完全不需要关心飞书 API 的具体细节。

8. 集成到网关启动入口

现在,我们将 MessageBus、AgentLoop、FeishuChannel 和 ChannelManager 全部串联起来。实现如下:

# gateway.py
import os
from loguru import logger
from feishu import FeishuChannel, FeishuConfig
from message_bus import MessageBus
from loop import AgentLoop
from manager import ChannelManager

async def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxx",         # 替换为真实的 App ID
        app_secret="xxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
    deepseek_key = os.getenv("DEEPSEEK_API_KEY", "")
    # 2. 创建总线
    bus = MessageBus()
    # 3. 创建 Agent 循环
    agent = AgentLoop(
        bus=bus,
        api_key=deepseek_key,
        base_url="https://api.deepseek.com",
        model="deepseek-chat",
        max_iterations=20,
    )
    
    # 4. 创建飞书渠道(传入总线,以便它 publish_inbound)
    feishu_channel = FeishuChannel(config=config, bus=bus)
    # 5. 创建渠道管理器,并注册飞书渠道
    channels = ChannelManager(bus=bus)
    channels.register(feishu_channel)
    
    logger.info("正在启动 Mini OpenClaw 网关...")
    
    # 6. 并发运行
    try:
        await asyncio.gather(
            agent.run(),          # 持续消费 inbound 队列,调用 LLM
            channels.start_all(), # 飞书长连接 + 出向派发器
        )
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("收到退出信号,正在关闭...")
        agent.stop()
        await channels.stop_all()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

至此整个网关的运行流程如下:

1. 网关“通电”

  • 我们启动 manager.start_all(),它立刻做了两件事:
    • 先派一个“快递员”(_dispatch_outbound 后台任务)守在 发件箱(outbound 队列) 旁边,随时准备把回复送出去。
    • 然后接通 飞书这个“电话线”feishu_channel.start()),开始等待用户发消息。

2. 用户发来消息

  • 用户在飞书群里说了一句“帮我读一下 /tmp/note.txt”。
  • 飞书适配器收到这条“方言消息”,立即翻译成网关内部的 普通话(InboundMessage),然后丢进 收件箱(inbound 队列)

3. Agent 大脑开始思考

  • agent.run() 一直在盯着 收件箱,一看到有新消息就取出来。
  • 它调用大模型并可能执行工具(比如读取文件),最终生成一段回复文本。
  • 然后把回复包装成 标准包裹(OutboundMessage),扔进 发件箱(outbound 队列)

4. 快递员送货

  • 守在 发件箱 旁边的快递员(_dispatch_outbound)发现新包裹,看看上面写的“收件渠道”是 feishu
  • 他马上找到飞书适配器,把包裹交给它:“请发到这个 chat_id 的群里”。
  • 飞书适配器又把回复从 普通话 翻译回 飞书的方言,调用飞书 API 发回群里。

5. 用户看到回复

  • 用户收到助手返回的文件内容,整个流程结束。

我们上述的 channels.start_all() 方法是还没实现的,我们实现一下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    async def start_all(self) -> None:
        # 省略...
    async def stop_all(self) -> None:
        """优雅停止所有渠道和出站分发器。"""
        logger.info("Stopping all channels...")

        # 第一阶段:取消出站分发器任务
        if self._dispatch_task:
            self._dispatch_task.cancel()
            try:
                await self._dispatch_task
            except asyncio.CancelledError:
                pass

        # 第二阶段:逐个停止渠道(每个渠道的 stop 方法应关闭连接、释放资源)
        for name, channel in self.channels.items():
            try:
                await channel.stop()
                logger.info(f"Stopped {name} channel")
            except Exception as e:
                logger.error(f"Error stopping {name}: {e}")

实现也很简单,首先停止出站分发器的任务,再逐个停止渠道的连接,释放资源。

接着我们启动网关:

python gateway.py

启动结果如下:

01.png

然后我们接着在上一篇文章中设置了的飞书机器人中进行发消息。

然后我们发现报错了:

image.png

报错原因是因为飞书 SDK 的 register_p2_im_message_receive_v1 要求注册一个同步回调函数(不能是 async def),但消息处理逻辑(如解析内容、发布到 MessageBus)是异步的。因此,我们需要实现一个跨线程调度适配器,用于将飞书 WebSocket 线程中的同步回调安全地桥接到 asyncio 主事件循环。

9. 跨线程调度适配器

首先我们需要保存主事件循环对象,我们是在网关启动文件 gateway.py 中通过 asyncio.run(main()) 启动的主循环。因为飞书 WebSocket 客户端运行在一个独立的后台线程中(见 threading.Thread(target=run_ws, daemon=True).start()),它的回调需要一个同步函数,但真正的消息处理逻辑 _on_message 是一个异步协程,需要被提交到主事件循环中执行,因为 MessageBus 等组件是绑定到主循环的。为了从另一个线程安全地将协程投递到主事件循环,就需要持有主事件循环的引用

先保存主事件循环对象:

class FeishuChannel:
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        self.config = config
        self.bus = bus
+        self._loop = None
        self._client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .build()

    async def start(self) -> None:
        # 省略...
+        # 保存主事件循环对象
+        self._loop = asyncio.get_running_loop()
        def run_ws():
            # 省略...

接着我们创建了一个同步函数 _on_message_sync 作为 register_p2_im_message_receive_v1 的实际回调,然后在 _on_message_sync 中将真正异步的处理函数 _on_message 调度到主事件循环中执行。实现如下:

def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None:
    try:
        if self._loop and self._loop.is_running():
            # 将异步处理函数调度到主事件循环
            asyncio.run_coroutine_threadsafe(
                self._on_message(data),
                self._loop
            )
        else:
            # 备用方案:在新事件循环中运行
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._on_message(data))
            finally:
                loop.close()
    except Exception as e: logger.error(f"处理飞书消息时出错:{e}")

接着我们修改 register_p2_im_message_receive_v1 的实际回调函数为上述我们实现的 _on_message_sync

class FeishuChannel:
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        # 省略...
    async def start(self) -> None:
        # 省略...
        # 注册接收消息事件处理函数 im.message.receive_v1
-        handler = builder.register_p2_im_message_receive_v1(self._on_message).build()
+        handler = builder.register_p2_im_message_receive_v1(self._on_message_sync).build()
        # 保存主事件循环对象
        self._loop = asyncio.get_running_loop()

总的来说就是在主事件循环中“记住”主循环对象,供后续其他线程通过 asyncio.run_coroutine_threadsafe 将协程调度回主循环执行,是实现跨线程异步任务调度

同时当主事件循环不存在时创建一个全新的临时事件循环,在当前线程(WebSocket 线程)中同步运行 self._on_message(data),执行完毕后关闭循环。

经过上述迭代后,我们再次启动我们的程序:python gateway.py

然我们再在飞书设置的 AI 机器人上跟我们的 Mini OpenClaw 进行对话,结果如下:

1cbfafacd6d84ef03bd64151f081c17a.jpg

然后我们再根目录下创建一个 test.txt 文件,内容为:“从网关的角度理解并实现一个 Mini OpenClaw”,然后在飞书设置的 AI 机器人输入:“帮我读取 test.txt 文件”,结果如下:

e85ee7fd4d5df8c7fa605994b44a19e4.jpg

至此我们的 Mini OpenClaw 就实现了。

10. 总结

经过上述文章我们可以更加透彻地理解为什么说 OpenClaw 可以简单总结为“高级 Agent + 网关”了。它把飞书、微信这些聊天软件的“方言消息”统一通过一个网关转成内部能听懂的“普通话”(InboundMessage),Agent 只处理这种标准消息。

为了防止消息太多堵死系统,用了两个队列(入站异步队列出站异步队列,相当于收信箱和发件箱)把接收和回复解耦开,像流水线一样互不干扰。Agent 处理完后把回复扔进发件箱,再由分发器根据渠道标签(feishu、wechat)转回对应平台的格式发回去。

这样一来,添加新平台就像加个翻译插件,核心代码完全不用动。最后用跨线程调度解决了飞书回调异步的问题。整个网关跑起来就是:用户发消息 → 标准化 → 入站队列 → Agent 思考(可调用工具)→ 出站队列 → 翻译回原平台 → 用户收到回复

上述实现也是港大开源的 Nanobot 的核心实现,Nanobot 可以说是 Python 版的 OpenClaw,是学习研究场景的轻量选择。

我是程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

CSS mask 完全指南:从渐变裁切到弹幕遮挡

CSS 属性里,mask 大概是被低估最严重的那一个。很多人知道它能"遮住一些东西",但真正上手时又觉得无从下手。其实 mask 的语法和 background 几乎一模一样——如果你已经玩转了渐变背景,那 mask 对你来说就是换个属性名的事。

本文会从语法开始,一路讲到弹幕遮挡、转场动画这些实战场景。每个案例都附带可运行的代码。


1. mask 到底是什么?

一句话:mask 决定元素的哪些部分可见、哪些部分透明

它接受的值和 background 一样——渐变、图片、SVG 都行。工作原理也简单:

  • mask 中有颜色的区域(不管什么颜色),对应元素内容可见
  • mask 中透明的区域,对应元素内容不可见

来看最基础的例子:

.demo {
  background: url(photo.jpg);
  -webkit-mask: linear-gradient(90deg, transparent, #000);
  mask: linear-gradient(90deg, transparent, #000);
}

效果是图片从左侧完全透明,到右侧完全可见——一个从无到有的渐隐效果。

这里 #000 换成 redblue 或任何颜色,效果完全一样。mask 只关心透明度,不关心色相。


2. mask 语法详解

根据 MDN CSS mask 文档:

The mask shorthand CSS property hides an element (partially or fully) by masking or clipping the image at specific points. It is a shorthand for mask-image, mask-mode, mask-repeat, mask-position, mask-clip, mask-origin, mask-size, and mask-composite.

mask 是一个简写属性,包含以下子属性:

子属性 作用 对应的 background 属性
mask-image 遮罩图像(渐变/图片/SVG) background-image
mask-size 遮罩尺寸 background-size
mask-repeat 是否平铺 background-repeat
mask-position 遮罩定位 background-position
mask-origin 定位参考框 background-origin
mask-clip 裁切参考框 background-clip
mask-composite 多个遮罩的合成方式 无对应属性

看到没有?除了 mask-composite,其他属性和 background 完全对应。如果你已经熟悉了 background-sizebackground-position 这些属性,mask 的学习成本几乎为零。

兼容性前缀

目前(2026 年)在 Chrome、Edge 等 Blink 内核浏览器中,mask 仍需 -webkit- 前缀。实际写代码时建议这样写:

.el {
  -webkit-mask: linear-gradient(#000, transparent);
  mask: linear-gradient(#000, transparent);
}

或者直接在构建工具中配置 autoprefixer,让它帮你加前缀。


3. 基础用法:渐变遮罩裁切

3.1 案例:图片切角效果

多层线性渐变可以拼出切角图形,这个技巧在 background 上就能用。把同样的渐变写到 mask 里,就能把任意元素裁成切角造型——不管元素里面是图片、文字还是渐变背景。

.notch-image {
  width: 300px;
  height: 200px;
  background: url(https://picsum.photos/300/200) no-repeat center/cover;
  -webkit-mask:
    linear-gradient(135deg, transparent 15px, #fff 0) top left,
    linear-gradient(-135deg, transparent 15px, #fff 0) top right,
    linear-gradient(-45deg, transparent 15px, #fff 0) bottom right,
    linear-gradient(45deg, transparent 15px, #fff 0) bottom left;
  -webkit-mask-size: 50% 50%;
  -webkit-mask-repeat: no-repeat;
  mask:
    linear-gradient(135deg, transparent 15px, #fff 0) top left,
    linear-gradient(-135deg, transparent 15px, #fff 0) top right,
    linear-gradient(-45deg, transparent 15px, #fff 0) bottom right,
    linear-gradient(45deg, transparent 15px, #fff 0) bottom left;
  mask-size: 50% 50%;
  mask-repeat: no-repeat;
}

四个方向的渐变各占 50% 50%,拼在一起刚好覆盖整个元素。每个渐变在角落处用 transparent 挖掉一个三角形,组合起来就是四角切角。

这里的 #fff 0 用了渐变简写技巧:0 会被浏览器修正为前一个色标的位置 15px,形成硬边界。


3.2 案例:内切圆角按钮

普通的内切圆角用 radial-gradient 就能画出来。但问题在于:如果按钮背景是渐变色而不是纯色,直接用 background 画内切圆角基本无解——你没法让两层渐变"叠加"出一个圆角效果。

mask 能解决这个问题:把内切圆角的形状写成 mask,background 想用什么渐变都行

.inset-btn {
  padding: 16px 48px;
  font-size: 16px;
  color: #fff;
  border: none;
  background: linear-gradient(45deg, #2179f5, #e91e63);
  -webkit-mask:
    radial-gradient(
        circle at 100% 100%,
        transparent 0,
        transparent 12px,
        #fff 13px
      )
      right bottom,
    radial-gradient(circle at 0 0, transparent 0, transparent 12px, #fff 13px)
      left top,
    radial-gradient(
        circle at 100% 0,
        transparent 0,
        transparent 12px,
        #fff 13px
      )
      right top,
    radial-gradient(
        circle at 0 100%,
        transparent 0,
        transparent 12px,
        #fff 13px
      )
      left bottom;
  -webkit-mask-size: 50% 50%;
  -webkit-mask-repeat: no-repeat;
  mask:
    radial-gradient(
        circle at 100% 100%,
        transparent 0,
        transparent 12px,
        #fff 13px
      )
      right bottom,
    radial-gradient(circle at 0 0, transparent 0, transparent 12px, #fff 13px)
      left top,
    radial-gradient(
        circle at 100% 0,
        transparent 0,
        transparent 12px,
        #fff 13px
      )
      right top,
    radial-gradient(
        circle at 0 100%,
        transparent 0,
        transparent 12px,
        #fff 13px
      )
      left bottom;
  mask-size: 50% 50%;
  mask-repeat: no-repeat;
}

原理:四个 radial-gradient 分别处理四个角,每个径向渐变的圆心在对应角落,0~12px 的范围是透明的(挖出圆弧),13px 往外是白色(保留内容)。

改变 12px 的值可以调整圆弧大小。这种方案的好处是 background 完全自由——纯色、渐变、图片都没问题。


4. 进阶用法:渐变消失与融合

4.1 案例:横向滚动列表的渐变消失

在很多产品里都能看到这种效果:一个横向可滚动的列表,右侧内容渐渐消失,暗示用户"还有更多内容"。

不用 mask 的话你可能会想到覆盖一个半透明遮罩层。但这有个麻烦:遮罩层会挡住点击事件,还需要设置 pointer-events: none

用 mask 就一行代码:

.scroll-list {
  display: flex;
  overflow-x: auto;
  gap: 12px;
  -webkit-mask: linear-gradient(90deg, #000 70%, transparent);
  mask: linear-gradient(90deg, #000 70%, transparent);
}

linear-gradient(90deg, #000 70%, transparent) 的意思是:从左到右,前 70% 完全可见,剩下 30% 逐渐透明。就这么简单。

要注意一点:mask 作用于整个元素及其内容,包括文字、子元素、甚至滚动条。这正是 mask 和 "覆盖一层遮罩" 的本质区别——mask 是从元素自身出发做裁切,而不是在上面盖东西。


4.2 案例:两张图片融合

mask 做图片融合非常直观:两张图片叠在一起,上层图片加一个 mask,mask 的透明区域会露出下层图片。

.blend {
  position: relative;
  width: 400px;
  height: 300px;
  background: url(https://picsum.photos/400/300?random=1) no-repeat center/cover;
}

.blend::before {
  content: '';
  position: absolute;
  inset: 0;
  background: url(https://picsum.photos/400/300?random=2) no-repeat center/cover;
  -webkit-mask: linear-gradient(45deg, #000 40%, transparent 60%);
  mask: linear-gradient(45deg, #000 40%, transparent 60%);
}

linear-gradient(45deg, #000 40%, transparent 60%) 中,40% 到 60% 这段是过渡区——两张图片在这里平滑融合。如果你把它改成 #000 50%, transparent 50%,那就是硬切割,没有过渡。

除了 linear-gradient 做线性方向的融合,radial-gradient 可以做径向区域的融合——在画面中某个位置开一个"窗口",露出下层的内容:

.radial-blend {
  position: relative;
  width: 520px;
  height: 320px;
  overflow: hidden;
}

.radial-blend .layer-cold {
  position: absolute;
  inset: 0;
  background: url(scene-cold.jpg) center / cover no-repeat;
}

.radial-blend .layer-warm {
  position: absolute;
  inset: 0;
  background: url(scene-warm.jpg) center / cover no-repeat;
  -webkit-mask: radial-gradient(circle at 35% 45%, #000 20%, transparent 60%);
  mask: radial-gradient(circle at 35% 45%, #000 20%, transparent 60%);
}

上层暖色调图片通过 radial-gradient 只在左侧偏上的位置可见,向外逐渐透明,露出底层冷色调图片。两张风格不同的照片在圆形过渡区自然融合。


5. mask-composite:组合遮罩

当一个元素有多个 mask 时,mask-composite 决定它们之间怎么合成。

根据 MDN mask-composite 文档:

The mask-composite CSS property represents a compositing operation used on the current mask layer with the mask layers below it.

标准语法支持四个关键字:

mask-composite: add; /* 叠加(默认)*/
mask-composite: subtract; /* 减去 */
mask-composite: intersect; /* 取交集 */
mask-composite: exclude; /* 排除重叠 */

但 WebKit 浏览器用的是另一套语法(-webkit-mask-composite),常用的值有:

-webkit-mask-composite: source-over; /* 对应 add */
-webkit-mask-composite: source-in; /* 对应 intersect */
-webkit-mask-composite: source-out; /* 只显示上层独有部分 */
-webkit-mask-composite: destination-out; /* 只显示下层独有部分 */
-webkit-mask-composite: xor; /* 对应 exclude */

案例:两个圆弧取交集

假设你想裁出一个"两个圆弧重叠"的形状:

.composite-demo {
  width: 300px;
  height: 200px;
  background: linear-gradient(135deg, #667eea, #764ba2);
  -webkit-mask:
    radial-gradient(circle at 100% 0, #000, #000 200px, transparent 200px),
    radial-gradient(circle at 0 0, #000, #000 200px, transparent 200px);
  -webkit-mask-composite: source-in;
  mask:
    radial-gradient(circle at 100% 0, #000, #000 200px, transparent 200px),
    radial-gradient(circle at 0 0, #000, #000 200px, transparent 200px);
  mask-composite: intersect;
}

如果不加 mask-composite,两个 mask 默认是 add(叠加),你看到的是两个圆弧的并集。加上 intersect(或 -webkit-mask-composite: source-in),就只保留两个圆弧重叠的部分

这个能力在做异形裁切时很有用:单个渐变很难画出的形状,可以通过多个简单渐变组合得到。


6. 高阶动画:mask 驱动的转场

mask 不只是静态裁切。通过动态改变 mask 的值,可以实现各种转场和切换效果。

6.1 渐变不能直接做动画——怎么办?

CSS 渐变本身不支持 transitionanimation。也就是说你写 transition: mask 0.3s 是没用的,linear-gradient 内部的参数变化不会有平滑过渡。

两种绕过方案:

  1. 逐帧动画:用 SASS 循环生成 0% 到 100% 共 101 帧的 @keyframes,每一帧写死 mask 的值
  2. CSS @property:注册一个自定义属性,让浏览器知道这个变量是 <percentage> 类型,这样它就能被动画插值

第一种方案的代码经过 SASS 编译后非常臃肿(101 帧)。推荐用第二种。

6.2 案例:conic-gradient 扇形转场(CSS @property 方案)

这是一个经典的转场效果:上层图片像扇形展开一样逐渐覆盖下层图片。hover 时触发动画。

@property --conic-p {
  syntax: '<percentage>';
  inherits: false;
  initial-value: -10%;
}

.transition-box {
  position: relative;
  width: 400px;
  height: 400px;
  background: url(https://picsum.photos/400/400?random=5) no-repeat center/cover;
  cursor: pointer;
}

.transition-box::before {
  content: '';
  position: absolute;
  inset: 0;
  background: url(https://picsum.photos/400/400?random=100) no-repeat
    center/cover;
  pointer-events: none;
  -webkit-mask: conic-gradient(
    #000 0,
    #000 var(--conic-p),
    transparent calc(var(--conic-p) + 10%),
    transparent
  );
  mask: conic-gradient(
    #000 0,
    #000 var(--conic-p),
    transparent calc(var(--conic-p) + 10%),
    transparent
  );
}

.transition-box:hover::before {
  animation: conicSweep 1.5s ease-in-out forwards;
}

@keyframes conicSweep {
  from {
    --conic-p: -10%;
  }
  to {
    --conic-p: 100%;
  }
}

这里有几个关键点:

  • @property --conic-p:注册之后,浏览器知道 --conic-p 是百分比类型,可以在动画中平滑插值。mask 里的 conic-gradient 会随着 --conic-p 从 -10% 变化到 100%,像时钟指针一样扫过整个圆。
  • pointer-events: none:伪元素覆盖在容器上层,如果不加这个属性,鼠标事件会被伪元素拦截,导致容器的 :hover 状态无法触发。
  • calc(var(--conic-p) + 10%) 多出的 10% 是过渡区,让边缘不那么生硬。如果你想要硬边界,把 +10% 去掉就行。

同样的思路,换成 linear-gradient 就是一个从左到右的滑动转场:

@property --slide-p {
  syntax: '<percentage>';
  inherits: false;
  initial-value: 0%;
}

.slide-box {
  position: relative;
  width: 400px;
  height: 400px;
  background: url(https://picsum.photos/400/400?random=10) no-repeat
    center/cover;
  cursor: pointer;
}

.slide-box::before {
  content: '';
  position: absolute;
  inset: 0;
  background: url(https://picsum.photos/400/400?random=200) no-repeat
    center/cover;
  pointer-events: none;
  -webkit-mask: linear-gradient(
    90deg,
    #000 var(--slide-p),
    transparent calc(var(--slide-p) + 5%)
  );
  mask: linear-gradient(
    90deg,
    #000 var(--slide-p),
    transparent calc(var(--slide-p) + 5%)
  );
}

.slide-box:hover::before {
  animation: slideReveal 1.2s ease-in-out forwards;
}

@keyframes slideReveal {
  from {
    --slide-p: 0%;
  }
  to {
    --slide-p: 100%;
  }
}

和扇形转场的原理完全一样,只是把 conic-gradient 换成了 linear-gradient--slide-p 从 0% 变化到 100%,实色区域从左往右推进,形成滑动揭示的效果。

如果你的目标浏览器不支持 @property(比如旧版 Firefox),也可以用 SASS 逐帧方案替代:

@keyframes maskSlide {
  @for $i from 0 through 100 {
    #{$i}% {
      mask: linear-gradient(
        90deg,
        #000 #{$i + '%'},
        transparent #{$i + 5 + '%'}
      );
    }
  }
}

编译后会生成 101 帧的 @keyframes,每一帧写死 mask 的值,代码量大但兼容性最好。


7. 实战:弹幕人物遮挡效果

在 BiliBili 或虎牙直播中,弹幕经过人物区域时会自动"绕道"——弹幕看起来在人物的后面。这个效果的实现原理就是 mask。

原理

  1. 视频画面和弹幕容器是两层叠加结构,弹幕在上层
  2. 后端通过图像识别算法,实时计算出人物的轮廓区域
  3. 生成一张 SVG/PNG 图片:人物轮廓区域是透明的,其他区域是白色/实色的
  4. 把这张图片设为弹幕容器的 mask-image
  5. 根据 mask 的工作原理——透明区域对应的弹幕内容不可见——弹幕就"消失"在人物背后了
  6. 随着视频播放,后端不断更新 mask 图片,实现实时遮挡

简化模拟

后端的实时图像识别我们没法在前端模拟,但原理可以用 radial-gradient 来演示:

.barrage-container {
  position: absolute;
  inset: 0;
  -webkit-mask: radial-gradient(
    circle at 100px 100px,
    transparent 60px,
    #fff 80px,
    #fff 100%
  );
  mask: radial-gradient(
    circle at 100px 100px,
    transparent 60px,
    #fff 80px,
    #fff 100%
  );
  animation: maskFollow 6s infinite alternate linear;
}

@keyframes maskFollow {
  to {
    -webkit-mask-position: 80vw 0;
    mask-position: 80vw 0;
  }
}

radial-gradient(100px, 100px) 位置挖了一个半径 60px 的圆形透明区域,60px 到 80px 是过渡,80px 以外完全可见。通过动画移动 mask-position,这个"挖洞"就会跟着移动。

真实场景中,这个 "挖洞" 的形状不是简单的圆形,而是从后端返回的人物轮廓 SVG。但 mask 的使用方式完全相同。

要搞清楚一点:mask 遮挡的是弹幕容器,不是人物。mask 的透明区域让弹幕不可见,从而"露出"弹幕下方的人物画面。


9. 兼容性

mask 属性的浏览器支持已经相当好了:

浏览器 支持情况
Chrome / Edge 支持(需 -webkit- 前缀)
Firefox 完全支持(无需前缀)
Safari 支持(需 -webkit- 前缀)
IE 不支持

如果你不需要兼容 IE,mask 可以放心用。前缀问题交给 autoprefixer 处理:

// postcss.config.js
module.exports = {
  plugins: [require('autoprefixer')],
};

mask-composite 的兼容性稍差一些,使用前建议在 Can I Use 上确认目标浏览器的支持情况。


10. 总结

核心原则

mask 中有颜色 → 内容可见透明 → 内容不可见。记住这一条就够了。

技巧速查表

技巧 实现方式 典型场景
渐变遮罩 mask: linear-gradient(...) 内容淡出、列表渐隐
切角/异形裁切 多重 linear-gradient + mask-size: 50% 50% 图片切角、优惠券造型
内切圆角 多重 radial-gradient 不规则按钮、卡片
图片融合 伪元素叠加 + mask 两图过渡、径向区域融合
组合遮罩 mask-composite: intersect 多 mask 取交集/差集
渐变动画转场 @property + conic-gradient 扇形展开、滑动切换
图表重绘 @property + conic-gradient + :hover 数据可视化 hover 效果
弹幕遮挡 radial-gradient / 实时图片 视频直播弹幕
雪碧图转场 mask: url(sprite.png) + steps() 精致页面转场

和 background 的关系

mask 的语法和 background 几乎一一对应——多层叠加、repeat、position、size 这些在 background 上能做的事,mask 上全能做。多出来的 mask-composite 让多个 mask 之间的布尔运算成为可能,这是 background 没有的能力。


延伸阅读

Flutter应用代码混淆完整指南:Android与iOS平台配置详解

Flutter中的代码混淆

代码混淆可以隐藏你的Dart代码中的函数和类名,让 反编译 App变得困难。对于更全面的混淆需求,特别是针对iOS IPA文件,可以使用专业工具如IpaGuard,它支持无需源码的代码和资源混淆,兼容Flutter等多种开发平台,有效增加反编译难度。

注:Dart的混淆还没有经过完全的测试,如果发现问题请到GitHub上提 issue 。关于混淆的问题,还可以参考 Stack Overflow 上的这个问题。

Flutter中的混淆配置其实是在Android和iOS端分别配置的。

Android

<ProjectRoot>/android/gradle.properties 文件中添加如下代码:

extra-gen-snapshot-options=--obfuscate

默认情况下,Flutter不会混淆或者缩减Android host,如果你使用了第三方的Java或者Android库,那么你可能需要减小APK体积,或者防止你的App被反编译。

  • Step 1:配置Proguard文件

新建 /android/app/proguard-rules.pro 文件,然后添加如下配置:


#Flutter Wrapper
-keep class io.flutter.app.** { *; }
-keep class io.flutter.plugin.**  { *; }
-keep class io.flutter.util.**  { *; }
-keep class io.flutter.view.**  { *; }
-keep class io.flutter.**  { *; }
-keep class io.flutter.plugins.**  { *; }

上面的配置只保护Flutter库,其他额外的库(比如Firebase)需要你自己添加配置。

  • Step 2:

打开 /android/app/build.gradle 文件,定位到 buildTypes 处,在 release 配置中将 minifiyEnableduseProguard 标志设为true,同时还需要指向Step1中创建的ProGuard文件:


android {
    ...
    buildTypes {
        release {
            signingConfig signingConfigs.debug
            minifyEnabled true
            useProguard true
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

注意混淆和缩减无用代码会加长App的编译时间。

iOS

  • Step 1:修改 "build aot"

<ProjectRoot>/packages/flutter_tools/bin/xcode_backend.sh 文件中添加 build aot flag:

${extra_gen_snapshot_options_or_none}

然后定义这个flag:


local extra_gen_snapshot_options_or_none=""
if [[ -n "$EXTRA_GEN_SNAPSHOT_OPTIONS" ]]; then
  extra_gen_snapshot_options_or_none="--extra-gen-snapshot-options=$EXTRA_GEN_SNAPSHOT_OPTIONS"
fi
  • Step 2:应用你的修改

在你的App的根目录下运行以下两条命令:


git commit -am "Enable obfuscation on iOS"
flutter
  • Step 3:更改release配置

<ProjectRoot>/ios/Flutter/Release.xcconfig 中添加下面这行:

EXTRA_GEN_SNAPSHOT_OPTIONS=--obfuscate

对于iOS平台,如果需要更强大的混淆保护,可以考虑使用IpaGuard这样的工具,它可以直接对IPA文件进行混淆加密,支持代码和资源文件的全面混淆,无需源码即可操作,并兼容Flutter应用,提供即时测试功能。

nestjs实战-登录、鉴权(二)

nestjs实战-登录、鉴权(二)

上一章中介绍了登录鉴权分两步:

  • 用户登录过程
  • 登录成功后,带token请求业务接口的过程

用户登录过程已经介绍,接下来介绍一下业务流程中的认证过程

一、业务接口token验证过程

业务接口验证流程:

  • 登录成功后,用户将token存储到本地缓存,每次发送请求时,需要在header,Authentication:[token] 将token带给后端

  • 后端操作:

    • 全局守卫 jwt.guard.ts ,是否进入jwt策略、白名单校验等
    • 触发 JWT 策略:获取jwt、解析、验证等操作
    • 成功后进入 控制器、服务、最终返回数据

二、先看代码实现:

先看一下目录结构:

auth-dir.png

首先需要创建两个文件 jwt.guard.tsjwt.strategy.ts,后面再介绍文件内的实现;

我们希望所有业务代码都需要进行token验证(提供不走验证逻辑的配置),所以 jwt.guard.ts守卫需要全局注册:

App.module.ts

import { APP_GUARD } from '@nestjs/core';
import { JwtGuard } from './modules/auth/guards/jwt.guard';

@Module({
  // ...
  providers: [
    // 全局注册 jwt 守卫,所有业务接口都会走这个守卫
    { provide: APP_GUARD, useClass: JwtGuard },
  ],
})
export class AppModule {}

auth.module.ts

import { JwtStrategy } from './strategies/jwt.strategy';

@Module({
  // ...
  providers: [
    // ...
    JwtStrategy,
  ],
})

在哪里使用 JwtStrategy 呢?

在代码中只看到 jwt.strategy.ts 作为一个提供者,在 auth.module.ts 中被注册;

提供者正常分三步:定义、注册、[注入|使用](类的构造函数constructor中注入),但是 jwt.strategy.ts 只有定义、注册 两部,因为:

之所以在代码里找不到 JwtStrategyconstructor 注入的地方,是因为它的工作方式比较特殊:它是被 NestJS 框架“自动”消费的,而不是被你的业务代码显式注入的

逻辑梳理

我们的登录、注册接口肯定是不需要进行 token 验证的,所以我们这个 jwt 全局守卫还需要一个开关来控制;

还记得我们在之前的章节介绍 拦截器-统一响应数据格式,也有类似的开关控制bypass.decorator.ts 内部逻辑,为类或方法 设置元数据 SetMetadata

此处也是类似的逻辑:

public.decorator.ts

import { SetMetadata } from '@nestjs/common';

export const IS_PUBLIC_KEY = 'isPublic';
export const Public = () => SetMetadata(IS_PUBLIC_KEY, true);

auth.controller.ts

为类 设置上 特定的元数据,后续在 守卫中获取对应的值,作为判断逻辑;

// ...
import { Public } from '~/common/decorators/public.decorator';

@Controller('auth')
@Public() // 类下面所有的路由都不需要检验token
export class AuthController {
  constructor(
    private readonly authService: AuthService,
    private readonly usersService: UsersService,
  ) {}

  // 注册
  @Post()
  register(@Body() createUserDto: CreateUserDto) {
    return this.usersService.create(createUserDto);
  }

  // 登录
  @Post('login')
  login(@Body() loginDto: LoginDto) {
    return this.authService.login(loginDto);
  }
}

核心逻辑

Jwt.guard.ts
import { ExecutionContext, Injectable } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import { Observable } from 'rxjs';

import { Reflector } from '@nestjs/core';

import { AuthStrategy } from '../auth.constant';
import { IS_PUBLIC_KEY } from '~/common/decorators/public.decorator';

@Injectable()
export class JwtGuard extends AuthGuard(AuthStrategy.JWT) {

  constructor(private readonly reflector: Reflector) {
    super();
  }

  canActivate(context: ExecutionContext): boolean | Promise<boolean> | Observable<boolean> {
    // 获取 类、方法上的 元数据
    const isPublic = this.reflector.getAllAndOverride<boolean>(IS_PUBLIC_KEY, [
      context.getHandler(),
      context.getClass(),
    ]);
    
    // 不需要校验token,接口可以直接访问,例如:登录、注册、获取验证码 等接口
    if (isPublic) {
      return true;
    }

    /**
     * super.canActivate(context) 的作用是:调用 @nestjs/passport 里已经实现好的 JWT 认证流程,包括:
     * 1. 从请求中提取 token(通常是 Bearer)
     * 2. 调用对应 JwtStrategy
     * 3. 验证签名、过期时间等
     * 4. 验证通过后把结果挂到 request.user
     * 5. 最后返回 true(通过)或抛异常(401)
     */
    return super.canActivate(context);
  }
}
Jwt.strategy.ts

在守卫触发后,通过守卫内部的 super.canActivate(context) 触发执行此策略,注意策略名称需一致;

import { Injectable } from '@nestjs/common';
import { PassportStrategy } from '@nestjs/passport';
import { ExtractJwt, Strategy } from 'passport-jwt';
import { AuthStrategy } from '../auth.constant';
import { securityRegToken, ISecurityConfig } from '~/config';
import { ConfigService } from '@nestjs/config';


@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy, AuthStrategy.JWT) {
  constructor(
    private readonly configService: ConfigService,
  ) {
    const securityConfig = configService.get<ISecurityConfig>(securityRegToken)

    super({
      jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
      ignoreExpiration: false,
      secretOrKey: securityConfig.jwtSecret,
    });
  }

  validate(payload: any) {
    console.log('payload', payload);
    return payload;
  }
}

代码部分说明:

  • PassportStrategy:Nest 里把 passport-jwtStrategy 包装成可注入的类。
  • ExtractJwtStrategy:来自 passport-jwt,负责「从请求里拿 JWT」和「验签逻辑」。

  •   export class JwtStrategy extends PassportStrategy(Strategy, AuthStrategy.JWT)
    
    • 第二个参数 AuthStrategy.JWT 是 策略名(一般是 'jwt'),要和 AuthGuard('jwt') / AuthGuard(AuthStrategy.JWT) 一致。
    • Nest 会在需要 JWT 认证时,走这个策略。
  • 构造函数里的 super({...})

    • jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken() 只从 Authorization: Bearer <token> 里取 JWT;没有 Bearer 或格式不对,认证会失败。
    • ignoreExpiration: false 过期 token 会被拒绝;若为 true 则仍可能验签通过(一般不这么配在生产鉴权上)。
    • secretOrKey: securityConfig.jwtSecret 和签发 token 时用的密钥必须一致,否则验签失败。
  •   validate(payload)
    
    • JWT 验签、过期检查通过后,passport-jwt 会把解码后的 payload(一般是 { sub, name, ... })传给 validate
    • 你这里 return payload,表示 request.user 就是整个 payload。
    • 若你希望 req.user 是数据库里的用户对象,通常会在这里根据 payload.sub 查库,再 return user

和请求流程的关系

Guard 触发 JWT 策略 → 从 Header 取 token → 用 jwtSecret 验签 → 调用 validate(payload) → 返回值赋给 request.user → 再进控制器。

三、整个过程的生命周期

当我访问一个业务接口时的执行顺序,例如直接访问 /users 接口:

  1. 全局 Guard 先执行:JwtGuard.canActivate()

  2. JwtGuard 内部调用 super.canActivate(context)AuthGuard('jwt')

  3. 触发 JWT Strategy:JwtStrategy

    • Authorization: Bearer xxx 提取 token
    • 验签、校验过期
    • 调用 JwtStrategy.validate(payload),并把返回值挂到 request.user
    • 以上操作都是库自动帮我们执行的
  4. Guard 通过后,进入 Controller、Service ,最终响应给前端

四、总结

以上就完成熟悉了整个 登录鉴权的过程;

  • 熟悉了守卫的实战场景
  • 熟悉 鉴权相关的逻辑流程,不管是nestjs 还是其他后端语言,这块逻辑是不变的
  • 基于nestjs,熟悉了它的实现流程,各个npm包的作用

Python高级特性:Map和Reduce函数完全指南

以下是根据您提供的链接内容,完整改写为CSDN博客风格的文章,并按照您的要求,在前言后面添加了原文链接,在尾部添加了推广内容。


Python高级特性:Map和Reduce函数完全指南

函数式编程的数据处理利器

前言

map()reduce()是Python中两个重要的高阶函数,它们源自函数式编程范式,与Google著名的MapReduce分布式计算模型有着相似的思想。掌握这两个函数,可以让你以更声明式、更简洁的方式处理数据集合。

本文将系统讲解map()reduce()的语法、工作原理、实际应用案例以及与现代Python特性的对比,帮助你提升数据处理能力。

📚 本文内容基于道满PythonAI - Map和Reduce函数教程


一、Map函数

map()函数将一个函数应用于一个可迭代对象(iterable)的每个元素,并返回一个迭代器(iterator)。

1.1 基本语法

map(function, iterable, ...)
参数 说明
function 应用到每个元素的函数
iterable 一个或多个可迭代对象
返回值 迭代器(惰性求值)

1.2 基本示例

# 定义平方函数
def square(x):
    return x * x

# 应用map
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
squared = map(square, numbers)

# map返回迭代器,需要转换为列表查看结果
print(squared)           # <map object at 0x...>
print(list(squared))     # [1, 4, 9, 16, 25, 36, 49, 64, 81]

# 注意:迭代器只能使用一次
print(list(squared))     # [] - 已经耗尽!

1.3 使用lambda表达式

# 使用lambda简化代码
squared = map(lambda x: x**2, [1, 2, 3, 4, 5])
print(list(squared))  # [1, 4, 9, 16, 25]

# 求立方
cubed = map(lambda x: x**3, [1, 2, 3, 4, 5])
print(list(cubed))    # [1, 8, 27, 64, 125]

# 转字符串
str_nums = map(str, [1, 2, 3, 4, 5])
print(list(str_nums)) # ['1', '2', '3', '4', '5']

1.4 多参数map

map()可以接收多个可迭代对象,函数应接受相应数量的参数:

# 两个列表对应元素相加
list1 = [1, 2, 3]
list2 = [4, 5, 6]
result = map(lambda x, y: x + y, list1, list2)
print(list(result))  # [5, 7, 9]

# 三个列表对应元素相乘
a = [1, 2, 3]
b = [4, 5, 6]
c = [7, 8, 9]
product = map(lambda x, y, z: x * y * z, a, b, c)
print(list(product))  # [28, 80, 162]

# 不同长度的列表:以最短的为准
list1 = [1, 2, 3, 4]
list2 = [10, 20]
result = map(lambda x, y: x + y, list1, list2)
print(list(result))  # [11, 22] - 只处理到最短列表结束

二、Reduce函数

reduce()函数对一个序列中的元素进行累积计算。从Python 3开始,reduce()被移到了functools模块。

2.1 基本语法

from functools import reduce

reduce(function, sequence[, initial])
参数 说明
function 接收两个参数的累积函数
sequence 可迭代序列
initial 可选,初始值
返回值 单个累积结果

2.2 工作原理

reduce(f, [x1, x2, x3, x4])的计算过程等价于:

f(f(f(x1, x2), x3), x4)

2.3 基本示例

from functools import reduce

# 累加求和
def add(x, y):
    return x + y

total = reduce(add, [1, 3, 5, 7, 9])
print(total)  # 25 (1+3+5+7+9)

# 使用lambda简化
total = reduce(lambda x, y: x + y, [1, 3, 5, 7, 9])
print(total)  # 25

# 求最大值
numbers = [5, 2, 8, 1, 9, 3]
max_num = reduce(lambda x, y: x if x > y else y, numbers)
print(max_num)  # 9

# 使用初始值
total = reduce(lambda x, y: x + y, [1, 2, 3], 10)
print(total)  # 16 (10+1+2+3)

# 空序列必须提供初始值
total = reduce(lambda x, y: x + y, [], 0)
print(total)  # 0

2.4 更复杂的例子:数字列表转整数

from functools import reduce

def digits_to_num(digits):
    """将数字列表转换为整数"""
    return reduce(lambda x, y: x * 10 + y, digits)

print(digits_to_num([1, 3, 5, 7, 9]))  # 13579
print(digits_to_num([0, 1, 2, 3]))     # 123

三、实用案例

3.1 字符串规范化(使用map)

def normalize(name):
    """将名字规范化为首字母大写,其余小写"""
    return name.capitalize()

names = ['adam', 'LISA', 'barT']
normalized_names = list(map(normalize, names))
print(normalized_names)  # ['Adam', 'Lisa', 'Bart']

# 使用lambda一行搞定
normalized = list(map(lambda s: s.capitalize(), ['adam', 'LISA', 'barT']))
print(normalized)  # ['Adam', 'Lisa', 'Bart']

3.2 列表乘积计算(使用reduce)

from functools import reduce

def prod(L):
    """计算列表中所有元素的乘积"""
    return reduce(lambda x, y: x * y, L, 1)

print(prod([3, 5, 7, 9]))   # 945
print(prod([2, 3, 4]))      # 24
print(prod([]))             # 1 (空列表返回初始值1)

3.3 字符串转浮点数(map + reduce组合)

from functools import reduce

def str2float(s):
    """将数字字符串转换为浮点数"""
    # 字符转数字的映射表
    def char2num(c):
        return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4,
                '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[c]
    
    # 处理小数
    if '.' in s:
        integer_part, decimal_part = s.split('.')
        # 整数部分:累加
        integer = reduce(lambda x, y: x * 10 + y, map(char2num, integer_part))
        # 小数部分:累加后除以10的幂
        decimal = reduce(lambda x, y: x * 10 + y, map(char2num, decimal_part)) / (10 ** len(decimal_part))
        return integer + decimal
    else:
        # 纯整数
        return reduce(lambda x, y: x * 10 + y, map(char2num, s))

# 测试
print(str2float('123.456'))   # 123.456
print(str2float('0.5'))       # 0.5
print(str2float('789'))       # 789.0

# 使用内置float验证
assert str2float('123.456') == 123.456
print("测试通过!")

3.4 数据统计示例

from functools import reduce

# 学生成绩数据
scores = [85, 92, 78, 95, 88, 76, 93]

# 1. 计算总分
total = reduce(lambda x, y: x + y, scores)
print(f"总分: {total}")  # 607

# 2. 计算平均分
average = total / len(scores)
print(f"平均分: {average:.2f}")  # 86.71

# 3. 计算最高分
max_score = reduce(lambda x, y: x if x > y else y, scores)
print(f"最高分: {max_score}")  # 95

# 4. 计算最低分
min_score = reduce(lambda x, y: x if x < y else y, scores)
print(f"最低分: {min_score}")  # 76

# 5. 统计及格人数(分数>=60)
passing = list(filter(lambda x: x >= 60, scores))
print(f"及格人数: {len(passing)}")  # 7

四、现代Python中的替代方案

虽然map()reduce()仍然有用,但现代Python中通常有更Pythonic的替代方案。

4.1 列表推导式替代map()

numbers = [1, 2, 3, 4, 5]

# map方式
squared_map = list(map(lambda x: x**2, numbers))

# 列表推导式方式(更Pythonic)
squared_comp = [x**2 for x in numbers]

print(squared_map)   # [1, 4, 9, 16, 25]
print(squared_comp)  # [1, 4, 9, 16, 25]

4.2 内置函数替代简单reduce()

numbers = [1, 2, 3, 4, 5]

# reduce方式求和
total_reduce = reduce(lambda x, y: x + y, numbers)

# 内置sum函数(更简洁)
total_sum = sum(numbers)

print(total_reduce)  # 15
print(total_sum)     # 15

# 其他内置函数:max(), min(), any(), all()
print(max(numbers))  # 5
print(min(numbers))  # 1

4.3 生成器表达式处理大数据

# map返回迭代器(惰性求值)
squared_map = map(lambda x: x**2, range(1000000))

# 生成器表达式(同样惰性求值)
squared_gen = (x**2 for x in range(1000000))

# 两者内存效率相同,但生成器表达式更Pythonic

五、性能考虑

方法 内存效率 执行速度 适用场景
map() + 迭代器 较快 大数据集、函数复用
列表推导式 最快 中小数据集、简单操作
生成器表达式 较快 大数据集、简单操作
reduce() 累积计算
内置函数(sum()等) 最快 简单聚合
import timeit

# 性能测试(在实际环境中运行)
numbers = list(range(10000))

# map方式
def test_map():
    return list(map(lambda x: x**2, numbers))

# 列表推导式
def test_comp():
    return [x**2 for x in numbers]

# 通常列表推导式略快于map
# print(timeit.timeit(test_map, number=1000))
# print(timeit.timeit(test_comp, number=1000))

六、总结

函数 作用 返回类型 典型用途 Pythonic替代
map() 对序列每个元素应用函数 迭代器 数据转换 列表推导式
reduce() 累积计算序列元素 单个值 聚合计算 sum(), max()

核心要点

  1. map()将函数应用到每个元素,返回迭代器(惰性求值)
  2. reduce()对序列进行累积计算,返回单个值
  3. map()可以接收多个可迭代对象,函数应有对应数量的参数
  4. reduce()需要从functools导入(Python 3+)
  5. ✅ 简单操作优先使用列表推导式内置函数
  6. ✅ 处理大数据集时,map()返回的迭代器节省内存

选择建议

  • 简单转换 → 列表推导式(更Pythonic)
  • 简单聚合 → 内置函数(如sum(), max()
  • 复杂转换且函数已定义 → map()
  • 复杂累积计算 → reduce()
  • 大数据集 → map()或生成器表达式(节省内存)

掌握map()reduce()可以帮助你写出更简洁、更函数式的Python代码,特别是在数据处理和转换场景中。


📚 相关推荐阅读


💡 Python 学习不走弯路!

体系化实战路线:基础语法 · 异步Web开发 · 数据采集 · 计算机视觉 · NLP · 大模型RAG实战
—— 全在 「道满PythonAI」


如果这篇文章对你有帮助,欢迎点赞、评论、收藏,你的支持是我持续分享的动力!🎉

AI全栈入门指南:NestJs 中的 DTO 和数据校验

大家好 👋,我是 Moment,目前正在使用 Next.js、NestJS、LangChain 开发 DocFlow。这是一个面向 AI 场景的协同文档平台,集成了基于 Tiptap 的富文本编辑、NestJS 后端服务、实时协作与智能化工作流等核心模块。

在这个项目的持续打磨过程中,我积累了不少实战经验,不只是 Tiptap 的深度定制、编辑器性能优化和协同方案设计,也包括前端工程化建设、React 源码理解以及复杂项目架构实践。

如果你对 AI 全栈开发、Agent、长期记忆、文档编辑器、前端工程化或者 React 源码相关内容感兴趣,欢迎添加我的微信 yunmz777 一起交流。觉得项目还不错的话,也欢迎给 DocFlow 点个 star ⭐

前面几篇里,控制器、服务、模块的关系已经铺开了。接下来是一个很现实的问题:参数一进控制器,能不能直接往服务层传。

技术上可以。@Body()@Query()@Param() 拿到的都是未经你类声明约束的原始形态,类型上也往往是宽松的。

真做项目时,这种写法会很快变成隐患。请求来自外部,外部输入不能默认可信:字段可能缺失、类型可能串了、字符串里可能塞了根本转不成数字的内容,甚至还可能多带几个你从未在文档里写过的键。

这就是 DTO 要解决的问题。

DTOData Transfer Object 的缩写。先把它想成"接口层的数据契约"。它不承载业务过程,只回答这几件事:

  • 这次请求允许出现哪些字段
  • 每个字段期望的类型是什么
  • 哪些是必填
  • 除类型以外还要满足哪些约束

拿"创建用户"来说,若没有契约,你很容易遇到:

  • name 是空字符串
  • email 根本不像邮箱
  • age 传成了 "abc"
  • 客户端悄悄带上 role: "admin"

脏数据一旦进了服务层或持久层,再排查就要沿着整条调用链往回找,成本很高。

所以 DTO 的价值不只是给参数"加个类型标注",而是把接口边界写死,让不合法的东西尽量在进门时被拦下。

下面是一个最基础的入参契约,字段上的装饰器来自 class-validator,后面接上 ValidationPipe 后才会真正生效:

import { IsEmail, IsInt, IsString, Min, MinLength } from "class-validator";

/** 创建用户接口允许的请求体形状 */
export class CreateUserDto {
  @IsString()
  @MinLength(2)
  name: string;

  @IsEmail()
  email: string;

  @IsInt()
  @Min(0)
  age: number;
}

这个类既不是表结构,也不是领域实体,它只是说:创建用户这条接口,合法请求体至少长这样。

class-validatorclass-transformer

NestJS 里,DTO 通常和两个库成对出现:

  • class-validator 管规则,字段对不对、满不满足约束
  • class-transformer 管形态,把普通对象转成类实例,并在需要时做类型转换

一句话分工:class-validator 问"对不对",class-transformer 问"怎么变成声明里的那种形状"。

查询字符串里的数字、嵌套对象里的子对象,往往都要靠转换配合校验,否则你会一直在和业务代码里多余的 Number()parseInt 打交道。

下面这个查询 DTO 同时用到了两边:@Type(() => Number) 先把 page 尽量变成数字,再用 @IsInt()@Min(1) 收紧范围。

import { Type } from "class-transformer";
import { IsEmail, IsInt, IsOptional, IsString, Min } from "class-validator";

/** 用户列表查询:关键词可选,页码可选且至少为 1 */
export class QueryUsersDto {
  @IsOptional()
  @IsString()
  keyword?: string;

  @IsOptional()
  @Type(() => Number)
  @IsInt()
  @Min(1)
  page?: number;
}

为什么查询参数特别需要 @Type。因为从 HTTP 进应用时,查询串几乎都是字符串。?page=2 在多数时候先是 "2",不转一把,@IsInt() 很容易和你的直觉拧着。

全局开启 ValidationPipe 且设置 transform: true 时,还可以再配合 transformOptions.enableImplicitConversion,对部分简单类型做隐式转换。嵌套结构、联合形态仍然更推荐显式写 @Type,可读性更好,也少踩坑。

依赖若尚未安装,在项目根目录执行:

pnpm add class-validator class-transformer

装好后,DTO 上的装饰器才有运行时意义。

ValidationPipe 的用法

光定义 DTO 类,请求进来并不会自动校验。真正把契约接进管道的是 ValidationPipe

把它想成控制器前的一道闸:参数先按 DTO 规则过一遍,过了才进方法体,不过则直接短路成错误响应。

默认情况下,校验失败会抛出 BadRequestException,HTTP 状态码一般是 400。响应体里常见 message 字段,内容多为字符串数组,逐项列出哪条规则没通过,便于联调。

最常见的做法是在 main.ts 里全局挂上管道:

import { ValidationPipe } from "@nestjs/common";
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);

  app.useGlobalPipes(
    new ValidationPipe({
      whitelist: true,
      transform: true,
    }),
  );

  await app.listen(3000);
}

void bootstrap();

全局启用之后,只要在参数位置写了具体的 DTO 类型(而不是泛泛的 object),Nest 就会尝试按类做转换和校验。

import { Body, Controller, Post } from "@nestjs/common";
import { CreateUserDto } from "./dto/create-user.dto";

@Controller("users")
export class UsersController {
  @Post()
  create(@Body() body: CreateUserDto): CreateUserDto {
    // 能执行到这里时,body 已通过校验并按 DTO 做过转换
    return body;
  }
}

不满足 CreateUserDto 时,create() 不会执行,客户端会先收到校验错误。服务层就可以少写一层重复的"字段是不是 string"式的防御代码。

如果某个路由要临时关掉转换或换一套规则,可以用控制器级或方法级管道覆盖默认行为,不必动全局配置:

import { Body, Controller, Post, UsePipes, ValidationPipe } from "@nestjs/common";
import { CreateUserDto } from "./dto/create-user.dto";

@Controller("users")
export class UsersController {
  @Post("draft")
  @UsePipes(
    new ValidationPipe({
      whitelist: true,
      transform: true,
      forbidNonWhitelisted: false,
    }),
  )
  saveDraft(@Body() body: CreateUserDto): CreateUserDto {
    return body;
  }
}

对多数业务项目,全局一套偏严格的默认值,再在少数路径上放宽,往往比完全不用全局管道省心。

白名单、转换与多余字段

ValidationPipe 的价值不止于报错。whitelistforbidNonWhitelistedtransform 三个开关配合起来,可以把入口擦得很干净。

app.useGlobalPipes(
  new ValidationPipe({
    whitelist: true,
    forbidNonWhitelisted: true,
    transform: true,
  }),
);

whitelist

whitelist: true 时,只有 DTO 上声明过的属性会留在对象上。多出来的键会被剥掉。

DTO 只有 nameemail,客户端却带了 roleisAdmin,这些多余字段不会跟着进控制器方法。很多风险来自"多传了不该收的字段",而不只是字段值写错。

forbidNonWhitelisted

forbidNonWhitelisted: true 再收紧一档:只要出现未声明字段,直接判失败,而不是悄悄删掉。

公开 API、对接第三方、强契约场景更适合打开它。

transform

transform: true 会启用 class-transformer,把原始负载转成类实例,并按装饰器做类型转换。

例如查询串里的 page=2 可以变成数字 2,避免整份业务代码里到处是手动的 Number()

实际顺序可以粗略理解成:先尽量转成 DTO 实例并做类型转换,再跑 class-validator,最后按白名单剥掉多余属性。校验失败会在进入控制器之前返回,不会混进半合法对象。

20260328102554

参数并不是原样流进控制器,而是先被整理成契约允许的形状。收益不只是少报错,而是入口这一圈边界可控、可测、可讲清楚。

嵌套对象与数组

请求体里常有嵌套结构,例如地址、标签列表。外层 DTO 校验到了,内层仍是普通对象,规则不会自动往下传。

常见写法是对嵌套属性再声明一个 DTO 类,在外层加上 @ValidateNested(),并用 @Type(() => InnerDto) 指明怎么实例化内层。数组则配合 @IsArray()@ArrayMinSize() 等与集合相关的装饰器。

import { Type } from "class-transformer";
import {
  IsArray,
  IsString,
  MinLength,
  ValidateNested,
} from "class-validator";

export class AddressDto {
  @IsString()
  @MinLength(1)
  city: string;
}

export class CreateOrderDto {
  @ValidateNested()
  @Type(() => AddressDto)
  address: AddressDto;

  @IsArray()
  @IsString({ each: true })
  tags: string[];
}

嵌套越深,越要在类型和装饰器上写清楚,否则很容易出现"外层过了、内层仍是任意 JSON"的假象。

从已有 DTO 派生

更新接口常常和创建接口只差"全部可选"。手写两份几乎相同的类容易漂移,可以用 @nestjs/mapped-types 里的 PartialType 从创建 DTO 派生更新 DTO,装饰器会一并变成可选校验。

import { PartialType } from "@nestjs/mapped-types";
import { CreateUserDto } from "./create-user.dto";

/** 更新用户:字段与创建一致,但均可选 */
export class UpdateUserDto extends PartialType(CreateUserDto) {}

安装依赖:

pnpm add @nestjs/mapped-types

还有 PickTypeOmitType 等,用在"只要子集字段"的场景,思路相同:一份源契约,多份视图,而不是复制粘贴改几个字母。

DTOEntityVO 不要混用

后期常见的大坑,是把长得差不多的类来回复用。数据库实体直接当入参 DTO 用,或把带密码哈希的实体原样返回给前端,短期省事,长期边界全糊。

DTOEntityVO 都可以是一组字段,但站位不同:

  • DTO 对准接口进出的契约
  • Entity 对准持久化与领域状态
  • VO 对准对外展示或某次响应的裁剪结果

同一张用户表在三层里的切片往往不一样。

UserEntity 里可能有 idnameemailpasswordHashcreatedAtupdatedAt。创建用户的 CreateUserDto 只要 nameemailpassword。返回前端的 UserProfileVo 可能只给 idnameemail。看起来都在描述用户,语义并不相同。

混用会带来:入参与存储绑死、内部字段意外暴露、一个类为了兼容多种场景不断长歪、改一处字段牵动所有层。

/** 创建接口入参 */
export class CreateUserDto {
  name: string;
  email: string;
  password: string;
}

/** 与数据库表或 ORM 实体对齐 */
export class UserEntity {
  id: string;
  name: string;
  email: string;
  passwordHash: string;
  createdAt: Date;
}

/** 返回给前端的公开资料,不含密码类敏感字段 */
export class UserProfileVo {
  id: string;
  name: string;
  email: string;
}

即便字段重叠,也不要因为"看着像"就合成一个类。习惯上可以记:DTO 站在门口,Entity 站在存储与领域内部,VO 站在对外可见的应答形状。

小结

这一篇想建立的,不局限于"会贴几个校验装饰器",而是这条判断:

接口参数不能默认可信。

DTO 把边界写清楚,class-validator 写规则,class-transformer 做实例化与转换,ValidationPipe 把它们嵌进请求生命周期。白名单和严格拒绝多余字段,则是在契约之上再加一层安全习惯。

若下面这些已经变成你的默认思路,这一章就到位了:

  • 控制器拿到的外部数据不要裸用
  • 入参用 DTO 声明,并配合管道校验与转换
  • 嵌套与数组要有对应的嵌套 DTO 与集合装饰器
  • 需要时用 PartialType 等工具派生,避免复制粘贴
  • DTOEntityVO 各司其职,不因字段相似就混成一类

下一节会看配置与环境变量。除了 HTTP 负载,运行时的开关和密钥同样需要被约束和管理。

AI 全栈指南:NestJs 中的 Service Provider 和 Module

大家好 👋,我是 Moment,目前正在使用 Next.js、NestJS、LangChain 开发 DocFlow。这是一个面向 AI 场景的协同文档平台,集成了基于 Tiptap 的富文本编辑、NestJS 后端服务、实时协作与智能化工作流等核心模块。

在这个项目的持续打磨过程中,我积累了不少实战经验,不只是 Tiptap 的深度定制、编辑器性能优化和协同方案设计,也包括前端工程化建设、React 源码理解以及复杂项目架构实践。

如果你对 AI 全栈开发、Agent、长期记忆、文档编辑器、前端工程化或者 React 源码相关内容感兴趣,欢迎添加我的微信 yunmz777 一起交流。觉得项目还不错的话,也欢迎给 DocFlow 点个 star ⭐

上一节里,Controller 负责接请求、取参数、返回结果。真正撑起接口价值的,多半不是"把请求接进来",而是背后的业务逻辑。

这段逻辑默认放在 Service 里。

先把 Service 想成"业务处理层"。它不太关心路由怎么对齐,也不太关心这次是 GET 还是 POST,更常琢磨的是下面这些:

  • 数据怎么查、怎么写
  • 规则怎么判定
  • 结果怎么拼装
  • 同一套逻辑别处还要不要复用

拿创建用户来说,麻烦往往不在收参数,而在查重、密码策略、默认状态、要不要发欢迎邮件。这些都更适合收紧 Service,而不是摊在控制器里。

下面的 UsersService 只在内存里摆个数组示意,重点看职责怎么收拢:

import { Injectable } from "@nestjs/common";

/** 内存里的用户结构,仅作示意 */
interface User {
  id: string;
  name: string;
}

@Injectable()
export class UsersService {
  private readonly users: User[] = [
    { id: "1", name: "汤姆" },
    { id: "2", name: "杰瑞" },
  ];

  /** 返回全部用户 */
  findAll(): User[] {
    return this.users;
  }

  /** 按主键查找,没有则 undefined */
  findById(id: string): User | undefined {
    return this.users.find((user) => user.id === id);
  }
}

数组只是替身,要紧的是"查全部"、"按 id 查"已经归进 UsersService。控制器只管调方法,不必过问细节。

Service 带来的直接好处主要是两条:

  • 控制器变薄,一层里不塞满所有事
  • 业务逻辑方便复用、写测试、以后改实现

习惯可以记得很短:控制器对齐请求,Service 扛起业务。

Provider 的本质

不少人初学时会把 ProviderService 混着说,其实分清也不难:Service 是很常见的一种 ProviderProvider 这个词包住的是所有"可注入实现"。

凡是能交给 NestJS 容器创建、保管,再注入给别的类的,都归在这一类里。常见例子包括:

  • 业务服务,例如 UsersService
  • 仓储或数据访问类,例如 UsersRepository
  • 横切能力,例如 MailService
  • 配置对象、工厂返回值、自定义 token 绑定的实例,也都算

框架把它们统称 Provider,并不是纠结类名该叫 Service 还是 Repository,而是在管三件事:

  • 要不要由容器负责实例化
  • 能不能被别人注入
  • 生命周期怎么配合作用域

写进模块的 providers 数组,就是在向容器挂号。只有挂上的实现才会按作用域被实例化,并有机会出现在别人的构造函数里。类名是服务还是仓储,只影响阅读,不影响这条规则。

下面两个类分工不同,在容器眼里却一视同仁,都是 Provider

import { Injectable } from "@nestjs/common";

@Injectable()
export class UsersService {
  findAll(): string[] {
    return ["汤姆", "杰瑞"];
  }
}

@Injectable()
export class MailService {
  sendWelcomeMail(email: string): string {
    return `已向 ${email} 发送欢迎邮件(示意)`;
  }
}

命名上你仍可以一个叫用户服务、一个叫邮件服务,登记方式没有区别。

记关系时只要两句就够:Provider 是框架侧的通用身份,Service 是业务里最常见的实现形态。以后遇到 Repository、工厂型 Provider 或自定义 token,仍然在同一个注入体系里处理。

Module 是什么

Service 扛业务,Provider 被容器托管,Module 则要再往上管一层:划清功能边界,把同一领域的控制器、Provider、对外约定装进一个盒子里。

NestJS 里,模块不是摆设,而是结构的基本单元,应用多半就是许多模块拼起来的东西。

用户、订单、认证可以各自落在 UsersModuleOrdersModuleAuthModule 上,每个模块维护自己的控制器、内部 Provider、以及愿意被别人用到的出口。

最小模块长这样:

import { Module } from "@nestjs/common";
import { UsersController } from "./users.controller";
import { UsersService } from "./users.service";

/** 用户领域:对外入口 + 可注入服务 */
@Module({
  controllers: [UsersController],
  providers: [UsersService],
})
export class UsersModule {}

行数不多,信息量不小:这几个人同属于一块业务边界;控制器对外接请求,UsersService 在本模块内可注入,再往下还可以继续挂别的 Provider

从结构上看,可以先扫一眼下面这张图。

20260328102242

节点不是漂在全局,而是先归进各自模块,再由 AppModule 一类根模块把业务模块接起来。

别把 Module 当成应付编译器的样板,它就是在替你划"这块功能从哪开始、到哪结束"。

imports 等四个字段各管什么

第一次看 @Module() 里的配置,最容易缠在一起的是 importsproviderscontrollersexports。拆开看就顺了。

下面在有用户模块的基础上多接了一个 DatabaseModule,并把 UsersService 对外导出,方便别的模块注入:

import { Module } from "@nestjs/common";
import { DatabaseModule } from "../database/database.module";
import { UsersController } from "./users.controller";
import { UsersService } from "./users.service";

/** 依赖数据库模块,并把用户服务暴露给 import 本模块的一方 */
@Module({
  imports: [DatabaseModule],
  controllers: [UsersController],
  providers: [UsersService],
  exports: [UsersService],
})
export class UsersModule {}

四个键可以先记成功能分工:

  • imports 本模块依赖哪些别的模块已经 exports 出来的能力
  • providers 本模块自己要注册、仅供内部(默认可注入范围)使用的 Provider
  • controllers 本模块声明哪些 HTTP 入口
  • exports 本模块对外放行哪些 Provider,供在别处 imports 了本模块的代码继续注入

最常绊脚的一对是 providersexports

  • providers 是"家里有哪些实现"
  • exports 是"门口挂牌、准许邻居借用的有哪些"

留在 providers 里但没进 exports 的,别模块默认看不见。只有当别人也要注入这份实现,才需要把它写进 exports

这有点像团队分工:内部实现可以多,对外接口要收束;别人要用,只能走你声明过的模块边界。

分文件夹只是把文件挪个地方,模块是在声明"谁允许依赖谁、谁对外可见"。

为什么业务逻辑不能全写在 Controller

新手很容易图省事,把业务全堆进 Controller:参数在手,就地校验、拼装、返回,看起来一气呵成。

项目一大,这样最容易长胖的是控制器。

下面这个例子能跑,但已经在兼职干 Service 的活:

import { Body, Controller, Post } from "@nestjs/common";

/** 创建用户时客户端传入的字段 */
interface CreateUserDto {
  name: string;
  email: string;
}

@Controller("users")
export class UsersController {
  @Post()
  create(@Body() body: CreateUserDto): { message: string } {
    const exists = body.email === "tom@example.com";

    if (exists) {
      return { message: "该邮箱已存在" };
    }

    const user = {
      id: Date.now().toString(),
      name: body.name,
      email: body.email,
      status: "正常",
    };

    return { message: `已创建用户:${user.name}` };
  }
}

收参、判重、造对象、定响应格式挤在同一层,后面要复用、单测、接库、发信、上事务,只能继续往控制器里糊。

把规则挪进 Service,控制器只做转发,形态会干净很多:

import { Body, Controller, Post } from "@nestjs/common";
import { UsersService } from "./users.service";

interface CreateUserDto {
  name: string;
  email: string;
}

@Controller("users")
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @Post()
  create(@Body() body: CreateUserDto): { message: string } {
    // 业务规则交给服务层
    return this.usersService.create(body);
  }
}

改完后的控制器基本只做四件事:接请求、拿参数、喊 Service、把结果交出去。

收益不只是顺眼,而是业务落在更容易复用和测试的一层,项目越复杂越省劲。

为什么 ModuleNestJS 里最核心的那一层边界

Controller 管入口,Service 管业务落地,Module 管的是再底下那层:系统边界哪里画、依赖往哪收敛。

维护噩梦常常不是少写了类,而是边界糊掉:模块互相穿透实现细节,调用网越织越密。

NestJSModule 摆得这么重,是要你把应用想成"多模块协作",而不是"一大撮控制器加一大撮服务"。

边界划清楚以后,好处很实在:

  • 用户、订单、支付、认证各自有落脚模块
  • 依赖不容易随便渗透到别的模块内部
  • 拆分、复用、补测试都更顺手
  • 新人找功能时有目录感
  • 大重构可以按模块切块推进

反过来,模块若只是分文件夹,ServiceController 再多也可能是一盘散沙。

所以 Module 不只是凑齐装饰器清单,而是在体积涨上去之前,逼你先想清楚谁能见谁、谁能用谁。

顺口溜可以记成 "Controller 开门口,Service 做生意,Module 砌围墙"。

这三层站稳以后,依赖注入、模块导入导出、动态模块、可插拔架构都会沿同一套边界往下长。

小结

这篇的重点不是多记几个词,而是把三条线拧到一根绳上:

  • Service 承接大部分业务
  • Provider 是容器能注入的那类东西的统称
  • Module 划边界、装箱、再决定对外露什么

判断习惯可以压成四句:入口给控制器,规则给服务,可注入项进 Provider 列表,单元边界交给模块。

下一节讲 DTO 和校验。你会看到,光靠"拿到字段就用"在真实项目里往往不够。

Harness Engineering:为什么你用 AI 越用越累?

Harness Engineering:驾驭 AI Agent 的工程学

Harness Engineering 封面图

"任何时候当你发现一个 agent 犯了一个错误,你就花时间工程化地解决它,使得这个 agent 再也不会犯那个错误。" — Mitchell Hashimoto(Terraform / Ghostty 作者,Harness Engineering 早期推广者之一)


换了更好的模型,只提升了 0.7%

LangChain 用一次实验把一件事说清楚了。

他们拿同一个模型参加 Terminal Bench 2.0 基准测试:默认设置跑出 52.8 分,排第 30 名;什么模型参数都没改,只调整了 agent 的运行环境——文档结构、验证回路、追踪系统——分数跳到 66.5,排名升到第 5 名,提升 26%

对比组:换成更好的模型,提升 0.7%

这组数字在工程师圈子里流传了很久。不是因为好看,而是因为它指向一个让人不舒服的问题:如果你的 AI 工程精力都集中在"换更好的模型"上,你可能把 99% 的注意力放在了那 0.7% 的空间里。

这就是 Harness Engineering 要解决的问题。


三次范式跃迁

AI 工程已经走过了三代。每一代工程师的焦点都不一样:

三次范式跃迁图

第一代:Prompt Engineering(2022-2024),问题是"怎么跟模型说话"。Few-shot、Chain-of-Thought、角色设定——工程师花大量时间打磨措辞,因为同一个问题换种说法,结果可能天差地别。

第二代:Context Engineering(2025),瓶颈转移了。影响质量的不再是怎么说,而是给它看什么。私域知识、历史对话、动态状态——怎么把正确的信息在正确的时机送进上下文窗口,成了核心工程问题。

第三代:Harness Engineering(2026 起),瓶颈再次转移。问题不再只是"给 agent 看什么",而是"在什么样的系统里让它工作"——约束、工具、反馈机制、验证回路,以及在 agent 出错时让整个系统能自我修正的能力。

Prompt Engineering  →  优化说话方式
Context Engineering →  优化信息质量  
Harness Engineering →  优化运行系统

OpenAI 在内部实验报告里直接说了:

"早期进展比预期慢,不是因为 Codex 能力不足,而是因为环境设计不充分。Agent 缺少可靠推进目标所需的工具、抽象和内部结构。"


什么是 Harness Engineering?

"Harness" 来自马术——那套套在马身上、用于控制和驾驭的整套装具:笼头、缰绳、胸带、肚带。它不是让你骑马,而是让马在你设计的系统里知道该往哪走、什么时候停、哪里绝对不能踏入。

在 AI agent 的语境里,harness 指的是模型本身以外的一切

AI Agent = 模型 + Harness

包括上下文配置、工具集、约束规则、反馈循环、子 agent 架构——所有让模型在你的具体问题域里可靠工作的工程设施。

这个概念由实践者 Viv 首创,Mitchell Hashimoto 是最早公开使用并推广它的人之一。他给出的定义极其简洁:每当发现一个 agent 犯了错,就把这个错变成物理上不可能再发生的事。不是修 prompt,不是换模型——是工程化地消灭这类失败。

Harness Engineering 不是一个框架,不是一个库,是一套工程实践哲学


这些都不是 PPT 数字

在讨论怎么做之前,先看几个已经在生产里跑的案例:

Peter Steinberger(OpenClaw 作者):一个人,一个月 6600+ commits,同时运行 5-10 个 agent,发布的是自己没有逐行读过的代码。

OpenAI 内部团队:3 名工程师,5 个月,用 Codex 建造了一个百万行的内部产品,零行手写代码(by design)。平均每人每天 3.5 个 PR,吞吐量随团队增长持续提升。

Stripe Minions:内部 coding agent,每周合并超过 1000 个 PR。工程师在 Slack 发任务,agent 写代码、跑 CI、开 PR,全程无需人工干预。

8Lee(YEN 作者):一条命令 $zip,编译、签名、公证一个覆盖 30 种语言的 macOS 桌面应用,15 分钟完成,近 1000 次发布,零次出错。

Anthropic 内部实验:16 个 Claude 实例并行写 C 语言编译器,历经 2000 个 session、两周时间、约两万美元 API 费用,产出了 10 万行编译器代码——能编译出可以正常启动 Linux 的程序。

以上都不是 demo,都是真实规模的生产系统。让它们得以运转的,是各自精心设计的 harness。


越快越慢:AI 的速度陷阱

这里有一组让人不舒服的数字,来自 Harness 的《2026 DevOps 现代化报告》:

在每天频繁使用 AI 工具的重度用户里:

  • 69% 表示 AI 生成的代码会频繁引发部署问题
  • 事故恢复平均时长 7.6 小时,比轻度用户还要长
  • 47% 反映下游的手工工作——QA、验证、修复——比以前更繁重

DORA 的数据从另一个角度印证了同样的问题:AI 让个人生产率提升 19%,但组织吞吐量只提升了 3%,交付稳定性甚至下降了 9%

写代码的速度提升了,但交付系统被暴露了。就像把火车开得更快,但铁路还是按原来的时速设计的——摩擦越来越大,随时有翻车风险。

加速代码生成,不等于加速软件交付。 Harness 是连接两者的桥梁。


模型偷懒:一个比"上下文太长"更深的问题

在讲具体的工程实践之前,有一个反直觉的研究结论值得单独讲清楚,因为它影响了 harness 设计的底层逻辑。

大家都知道上下文太长会影响模型表现。但通常的解释是"模型被搞混了"。Yandex 研究员 Rodionov 的实验推翻了这个假设:

模型不是被搞混了,它是选择了少思考。

他向 Qwen 的上下文里注入 128 个随机 token 的噪音——仅仅 128 个 token。结果:

  • 准确率从 74.5% 降到 67.8%
  • 推理 token 数量从 28,771 降到 16,415,减少了 43%
  • 推理深度下降 18%

更反直觉的:推理能力越强的模型,退化越严重

噪音触发的不是混乱,是懒惰。模型看到上下文质量下降,会主动降低思考投入。

Anthropic 的情感研究团队在模型内部找到了这个现象的神经层面解释:他们发现了一个"desperate(绝望)"情感向量——当它激活时,模型倾向于走捷径、寻找替代路径逃避任务。对应地存在一个"calm(平静)"向量,能抑制这种倾向。

这对 harness 设计有直接影响:上下文管理的核心不只是过滤信息,而是防止信号质量下降触发模型的懒惰机制。你需要保证进入 agent 的每一条信息都是高信噪比的。


Harness Engineering 的六个核心组件

Harness Engineering 六个核心组件图

1. AGENTS.md:写给 AI 的操作手册

大多数项目有 README,但 README 是写给人类的。AGENTS.md(或 CLAUDE.md)是写给 AI 的——每次 agent 启动都会读这个文件。

AGENTS.md 的本质不是描述项目,而是记录历史失败。

Hashimoto 在他的终端模拟器 Ghostty 里观察到:这个文件里的每一行,都对应一次真实发生过的 agent 失败。它不是他预先设计的规则,是他从真实错误里提炼出来的防火墙。

# AGENTS.md(节选自实战案例)

## 代码签名规则
- **绝对不要**使用 `codesign --deep`,它会生成无效的嵌套签名
- 正确的签名顺序是从内到外:先签最内层二进制,最后签外层 app bundle

## Git 操作规则  
- **绝对不要**使用 `git add -A`,除非你刚刚运行了 `git status`
- **绝对不要** force push,除非被明确要求

## 测试规则
- **绝对不要**写只测试 mock 行为的测试
- **绝对不要**因为测试失败就删除测试

写法有数据支撑。 Vlad Temian 做了 150+ 次实验测量 Claude 对指令的遵从率:

写法 遵从率
简洁强硬("NEVER do X") 94.8%
详细解释("Because of reason Y, you should consider not doing X") 86.6%

ETH 苏黎世的研究也发现,大多数 AGENTS.md 文件要么没用,要么有害——主要原因是太长、太模糊、包含条件性规则。让 AI 帮你生成这个文件,实际上会降低性能,还额外消耗 20% 以上的 token。

几条实践原则

  • 总长度控制在 300 行以内(HumanLayer 自己的在 60 行以下)
  • 每条规则一句话,不加解释,不加"因为"
  • 只放普遍适用的规则,条件性规则用技能(Skills)处理
  • 手工写,每次 agent 犯错后更新

2. Hooks:把"告知"变成"拦截"

这是 Harness Engineering 里最反直觉但最有效的洞见:

强制执行远比告知可靠。

写在 AGENTS.md 里的规则,agent 可能在某个复杂的上下文里忽略掉。在命令执行之前拦截它的脚本,agent 物理上无法绕过。

#!/bin/bash
# guard-codesign-deep.sh

if echo "$TOOL_INPUT" | grep -q '\-\-deep'; then
  echo "BLOCKED: codesign --deep 会产生无效的嵌套签名。"
  echo "正确做法:从内到外签名,先签最内层二进制,最后签外层 app。"
  exit 1
fi

这 5 行脚本比任何 prompt 都可靠。不管上下文有多长,不管 prompt 多复杂,agent 永远不会成功执行 codesign --deep

8Lee 为 YEN 项目定义了 5 个 hook,覆盖他认为最危险的失败场景:

Hook 防护目标
block-rm.sh 防止 rm -rf 灾难性删除
guard-force-push.sh 保护 commit 历史
guard-codesign-deep.sh 强制正确的签名顺序
guard-vendor.sh 防止直接修改第三方库
guard-sensitive-file.sh 防止 .env.pem.key 泄露

总投入:约 2 小时。收益:近千次发布零安全事故。


3. 架构即护栏:越相信 AI,越需要给它设限

OpenAI 内部团队在构建百万行产品时得出了一个反直觉的结论:

"Agent 在有严格边界和可预测结构的环境里效率最高。所以我们围绕极度刚性的架构模型构建应用。每个业务域被分成固定的几层,依赖方向经过严格验证,可接受的边集非常有限。这些约束通过自定义 linter(由 Codex 生成)和结构测试机械地强制执行。"

Thoughtworks 的 Birgitta Böckeler 把这个原则概括得很清晰:

提高对 AI 生成代码的信任,需要缩小选择空间,而不是扩大自由度。

  • 架构灵活 → agent 每个决策点都有太多可能性 → 行为不可预测
  • 架构刚性 → agent 每个决策点只有少数合法选项 → 行为可靠

这里有一个工程上的精妙设计:OpenAI 团队的 linter 报错同时包含修复指南

❌ ArchViolation: service-layer 不能直接依赖 repository-layer
   解决方案:通过 domain-service 接口访问,参见 docs/architecture.md#dependency-rules

工具不只在拦截,它在教 agent 下一步该怎么做。


4. Sub-Agent 架构:Context 防火墙与并发控制

Context Rot(上下文腐化)是真实的,而且比你想象的更深

Chroma 测试了 18 个模型,发现随着 context window 长度增加,模型在任务上的表现单调下降——即使是简单任务。当上下文里有低语义相关的干扰项时,下降更陡。

这还有一个更隐蔽的问题:Context Anxiety(上下文焦虑)——部分模型在感知到 context window 快满时,会主动提前收尾、跳过尚未完成的步骤。Agent 不是因为任务完成了才停,而是因为它"感觉快撑不住了"就停了。

结合前文的 Rodionov 研究,上下文问题的全貌是:质量下降触发懒惰,容量耗尽触发焦虑。两者都不是"模型被搞混了",而是模型主动选择了少做

解决方案不是更大的 context window(那只是让稻草堆更大)。是 Sub-Agent 架构:

Main Agent(规划 + 编排,昂贵模型 Opus)
  ├── Sub-Agent A(代码库探索,便宜模型 Haiku)→ 只返回文件路径:行号
  ├── Sub-Agent B(安全审计,便宜模型 Haiku)→ 只返回漏洞列表
  └── Sub-Agent C(依赖分析,便宜模型 Haiku)→ 只返回版本建议

每个 sub-agent 在隔离的 context window 里运行,只有最终浓缩的结果传回主线程。主 agent 的上下文始终保持干净、高信噪比。

并发架构:更进一步

当单个 agent 能稳定工作后,下一个问题是:能不能同时派出一百个去干活?

不能直接堆数量。 Cursor 团队的教训:让几百个 agent 共享一份大型项目,当 20 个 agent 同时工作时,有效吞吐量下降到只相当于两三个 agent。原因是上下文互相污染,加上全局资源的争抢。

成熟的并发架构是三层分工:

Planner(规划器)— 分解任务,分配工作,不写代码
  └── Worker(执行器)× N — 各自在隔离环境里执行
        └── Judge(裁判)— 独立验证,不参与执行

配合 DAG 引擎确保工作单向流动,防止循环依赖。

Anthropic 在并发 agent 里找到了另一个优雅的设计:GAN 启发的 Generator + Evaluator 对抗结构。评估者不只看结果,而是亲自动手验货——打开浏览器、点击页面、验证报错栈,像真实用户一样操作一遍。Generator 和 Evaluator 先协商"做完长什么样",再各自工作,形成对抗性的质量保证。

8Lee 的 $team 技能把这个思路推到了极致:8 个独立 agent 做代码评审,最后一个是 Devil's Advocate(唱反调的),专门挑战其他 7 个 agent 的所有建议。它检查严重性评级、标记假阳性、找矛盾。对抗性自我纠正,内置在 skill 结构里。


5. 长时任务 Harness:失忆实习生问题

长时任务 Harness 结构图

这是很多人没有意识到的一个独立问题。

长时任务的核心挑战:Agent 必须在多个 context window 里工作,而每次新的 session 开始时,它完全不记得之前发生了什么。就像一个软件项目由工程师轮班完成,每个新来的工程师对之前的工作没有任何记忆。

Anthropic 在实验中观察到了两个典型失败模式:

  1. "一口气干完":agent 试图一次性完成所有功能,上下文耗尽后留下半成品,下个 session 花时间重建状态,再从头来
  2. "差不多了":agent 看到一点进展就宣布"完成了",然后停工

他们的解法是双 agent 架构

Initializer Agent(初始化 agent),只在第一次运行时启动,建立:

  • feature_list.json:完整功能列表,每项初始为 "passes": false
  • init.sh:一键启动开发服务器
  • claude-progress.txt:每个 session 都会更新的进度日志
  • 初始 git commit

Coding Agent(编码 agent),后续每次 session 开始时执行固定的三步:

# 三步定位:让 agent 快速了解自己的处境
1. pwd                          # 确认工作目录
2. git log --oneline -20        # 了解最近发生了什么
3. cat claude-progress.txt      # 看上一班留下的进度

然后读取 feature_list.json,选优先级最高的未完成功能,一次只做一个,完成即更新状态并 commit。

一个值得注意的细节:用 JSON,不用 Markdown。实验发现,模型倾向于不当地覆盖 Markdown 文件,对结构化 JSON 则克制得多——它只改 "passes" 字段的值,不会擅自删除条目。

这把每个 coding session 变成了一个纯函数:

f(功能列表 + git 历史 + 进度文件) → 完成一个功能 + 更新记录

6. Skills:按需加载,而不是全部预装

大多数人遇到问题的第一反应是:把所有信息塞进系统提示。

结果是:agent 在看完一万 token 的指令之后,剩下的可用注意力所剩无几。OpenAI 把这叫做"1000 页说明书变成陈旧规则的坟场"。

技能(Skills)的解法是按需披露

  • agent 只在需要某个能力时,才加载对应的技能文档
  • 每个技能是一个目录,包含 SKILL.md 和相关资源
  • 加载时,技能内容作为消息注入当前上下文

8Lee 的实现分三层:

Level 1SKILL.md 封面(~100 tokens)——技能发现,Agent 决定是否需要
Level 2SKILL.md 主体(~800-1000 tokens)——阶段图、协议、所有 guards
Level 3:当前阶段的参考文件(~200-600 tokens)——只加载正在执行的阶段

上下文的消耗量始终与当前任务的复杂度成正比,而不是与整个项目的复杂度成正比。


更完整的分析框架:Feedforward + Feedback

Feedforward 与 Feedback 控制矩阵图

Thoughtworks 的 Birgitta Böckeler 提出了一个系统化的思考框架,把 harness 的所有控制机制划分成两个维度。

维度一:控制方向

Feedforward(前馈控制) — 在 agent 行动之前引导它:AGENTS.md 里的规则、架构约束说明、Skill 里的 how-to 指南。

Feedback(反馈控制) — 在 agent 行动之后感知并纠正:测试结果、Linter 输出、类型检查错误。

只有 Feedforward,agent 知道规则但无法验证自己是否遵守了。只有 Feedback,agent 会反复犯同类错误,因为没有预防。两者缺一不可。

维度二:执行类型

Computational(计算型) — 确定性的,CPU 执行:测试、linter、类型检查、结构分析。毫秒到秒级,结果完全可靠,便宜,可以每次提交都跑。

Inferential(推断型) — 语义分析,LLM 执行:AI 代码评审、"LLM 作裁判"。慢而贵,有不确定性,但能处理需要语义判断的场景。

组合起来:

Feedforward Feedback
Computational 架构边界 linter 结构测试、覆盖率
Inferential AGENTS.md 规则、Skills AI 代码评审

最佳实践是:尽量用 Computational,把 Inferential 留给真正需要语义判断的场景

三类 Harness 目标

可维护性 Harness — 最成熟:重复代码、圈复杂度、测试覆盖率、架构漂移,Computational 工具基本都能覆盖。

架构适应性 Harness — 定义和检查架构特征:性能需求前馈 + 性能测试反馈;可观测性约定 + 日志质量检查。

行为 Harness — 最难,仍是开放问题,但正在取得突破。

传统测试框架在这里遭遇根本性失败:你无法给 LLM 的输出写 assertEquals(expected, actual)——相同问题的"正确回答"可以有无数种表达。更深的矛盾是,生成式 AI 的多样性输出不是 bug,是 feature。

突破口是用 AI 测试 AI:不是比对字符串,而是判断意图。一个 AI judge 向另一个 AI 提问:"用户的登录成功了吗?"而不是"div.login-btn 是否存在?"这个 judge 每次运行时重新分析页面 DOM 和截图,给出带推理说明的判断——而非简单的 pass/fail。

PKU 和 HKU 联合推出的 Claw-Eval 基准测试进一步工程化了评估方法:Pass³ 方法论——一个任务必须在三次独立运行中全部通过才算真正通过,彻底消除"幸运运行"的干扰。同时从三个维度评分:Completion(完成度)、Safety(安全性)、Robustness(鲁棒性)。这是在把evaluation harness 本身工程化。


交付侧的 Harness:黄金标准管道

黄金标准管道图

上面讨论的六个组件主要针对 coding agent 的行为控制。但 Harness Engineering 的边界不止于代码生成——从代码到生产的整个交付管道同样需要 harness 化。

Harness 平台工程师 Aditya Kashyap 提出了一个**黄金标准管道(Golden Standard Pipeline)**的四层架构:

Layer 1:治理域(Governance Domain)
  └── 策略即代码(OPA)在管道执行前作为第一道关卡
  └── 原则:不合规的管道不允许启动

Layer 2:集成域(Integration Domain)——内循环
  └── 代码气味、lint、安全扫描并行而非串行
  └── 原则:安全扫描应该让开发提速,而不是增加摩擦

Layer 3:信任域(Trust Domain)——供应链安全
  └── SBOM(软件物料清单):制品的成分表
  └── SLSA 证明:构建过程的不可伪造 ID
  └── 加密签名(Cosign):数字封印,任何篡改都会破坏

Layer 4:交付域(Delivery Domain)——外循环
  └── 不可变制品:构建一次,部署到处
  └── 滚动部署 + 审批门控

其中最重要的是 Layer 1 的哲学转变:传统管道在快要部署时才做合规检查(浪费了前面 20 分钟的构建时间),黄金标准把治理移到"第零步"——不合规的管道甚至不会开始执行

Layer 3 对应了当前软件供应链安全的核心挑战:你需要能证明"这个制品是在哪台机器上构建的、什么时间、用了哪些输入"。当下一个 Log4j 出现时,SBOM 让你不需要扫描整个世界,只需要查询你的制品库存。


实战:Skill 分类学

不是所有任务都同样脆弱。8Lee 提出了基于脆弱性的技能分类:

高脆弱性任务(签名、部署、安全操作)
  └── Hard Gates + 失败即停 + 无恢复重启
  └── 示例:代码签名、公证、加密操作

中脆弱性任务(质量门控)
  └── Quality Gates + 失败即回滚
  └── 示例:依赖更新、staging 部署

低脆弱性任务(lint、格式化)
  └── 简单 pass/fail
  └── 示例:代码格式化、静态检查

在低风险任务上过度约束,浪费 token。在高风险任务上约束不足,迟早出事。


验证反压:成功静默,失败才说话

HumanLayer 认为,agent 解决问题的成功率与它验证自己工作的能力高度相关。

他们建了完整的验证链路:类型检查 + 构建、Biome 格式化 + lint、Playwright 端到端测试、代码覆盖率(低于阈值时强制补写)。

但有一个容易踩的坑:让 agent 每次修改后跑完整测试套件,4000 行的通过输出会塞满上下文窗口,agent 随之开始产生幻觉。

解决方法很简单:成功时不输出任何东西,只有失败才打印详情。

# 成功无输出,失败才打印——context window 零污染
OUTPUT=$(run_build 2>&1)
if [ $? -ne 0 ]; then
  echo "$OUTPUT" >&2
  exit 1
fi

这条原则在所有成功的 harness 设计里反复出现:信号噪比是 context 管理的核心


真实案例:8Lee 的 $zip 命令

这是目前公开记录最详细的 harness engineering 案例。

一条命令 $zip 触发:
├── 12 个顺序步骤(预检、vendor 门控、版本升级、同步、验证...)
├── 65 个验证检查(13 预构建 + 44 核心 + 8 后构建)
├── 5 个编译器(Zig + Swift + Xcodebuild + Go + swiftc)
├── 签名 + 公证 + DMG 打包 + Supabase 上传
├── Vercel 部署(Next.js 下载页面 + API + SEO 元数据)
└── git commit(含 SHA-256 校验文件)+ 文档更新

耗时:约 15 分钟
发布次数:近 1000 次
失败次数:0

他的结论很直接:

"我不再担心发布的正确性了。不是因为 AI 是完美的,而是因为 harness 让「我们一起在做的事」变得安全。"


Harness 应该越来越薄

大多数讨论都在讲"加什么"。但这个洞见值得单独强调:

"Harness 的每一个组件,都编码了一条关于模型做不到什么的假设。当这个假设不再成立,组件就该走了。"

Anthropic 自己做了这件事。随着 Opus 4.5 和 4.6 发布:

  • Context Reset(上下文重置机制):删掉了。新模型的上下文管理能力已经不需要这个补偿。
  • Sprint Contract(冲刺合约,用于控制 agent 执行节奏的约束):删掉了。新模型能自己把控节奏。

每加一个 harness 组件,都是在补偿"当前模型无法独立完成某件事"。每当模型进步让某个补偿变成负担,就该拆掉它。

这同时意味着:今天一些 harness 组件的必要性,来自当前模型的"懒惰"倾向(如前文 Rodionov 的研究所揭示)。Anthropic 的情感向量研究暗示,未来可能可以在模型内部调节这个状态,而不需要外部 harness 补偿——到那时,对应的组件自然退出。

真正的竞争优势不在 harness 的厚度,而在于追踪这个迁移面的速度——知道下一步该加什么,上一步该拆什么。

johng 把这叫做 Harness Engineering 的第六支柱:可拆卸性(Detachability)——以模块化设计构建 harness,让它能随模型迭代优雅退场,而不是每次模型升级都需要大规模重构。


未来三个阶段

我们不会一夜之间拥有完全自主的 SRE 团队。这个演进以三个浪潮的方式推进。

Horizon 1:增强型运营者(当下)

Agent 是工程师的"副驾"。你问"这个 Pod 为什么崩溃了",agent 查日志、关联 MemoryLimitExceeded 错误和最近的配置变更,提出修复建议。人类创建意图并批准行动。

Harness 重点:AGENTS.md + Hooks + 可观测性集成。

Horizon 2:Agent 群体与任务自主(1-2 年)

单个专业化 agent 开始在特定范围内自主处理重复任务。一个"安全 agent"发现 CVE,创建 ticket 并传给"开发 agent",后者建分支、升版本、传给"QA agent"跑测试。人类只在最后点击"合并"。

从 Human-in-the-Loop 转变为 Human-on-the-Loop——你审查输出,但不驾驶过程。

Harness 重点:多 agent 编排 + Judge 模式 + 严格权限隔离(Diagnosis Agent 只有读权限,Remediation Agent 只有目标命名空间的写权限)。

Horizon 3:自主 SRE(3-5 年)

凌晨 2 点生产延迟飙升,"SRE Agent"检测到异常、识别噪音邻居、驱逐节点、验证稳定性、向 Slack 发送事后分析。只有 agent 无法解决时才呼叫人类。

标准操作的 Human-out-of-the-Loop。人类管理策略和目标,不管任务。

Harness 重点:Constitutional AI(Policy-as-Code 通过 OPA 作为所有工具调用的第一道关卡)+ 防篡改审计日志(记录每个推理步骤和每条 CLI 命令)。

每个阶段的关键认知转变:我们不再管理服务器,我们在管理认知架构(Cognitive Architectures)。


开放的硬问题

Harness Engineering 作为一个工程学科仍然年轻。几个核心问题目前没有答案:

代码质量的慢性退化:agent 生成的代码不以人类的方式腐化——不是有 bug,而是"功能正确但逐渐不可维护"。OpenAI 在跑周期性的"垃圾清理 agent",Anthropic 在跑"Doc-gardening agent"(扫描代码和文档的脱节并发起 PR),但这些实践仍很早期。

用 AI 验证 AI 的可靠性:主要靠 AI 生成的测试来验证 AI 生成的代码,这个闭环的可信度是多少?目前没有答案。

老旧代码库的改造:几乎所有成功案例要么从零开始,要么团队在全新项目里构建 harness。把这些方法应用到有十年历史、测试参差不齐、文档残缺的存量代码库,难度是另一个量级。Böckeler 打了个比方:这就像在从未跑过静态分析的代码库上第一次跑——你会溺死在警报里。

Harness 自身的一致性:随着 harness 增长,前馈规则和反馈信号可能开始互相矛盾。当它们指向不同方向时,agent 如何做出合理权衡?如何衡量 harness 的"覆盖率",就像测试覆盖率一样评估它的完整性?目前没有工具可以回答。

概率性系统的信任问题:脚本是确定性的,同样输入永远得到同样输出。Agent 是概率性的,可能根据上下文选择不同路径。让概率性系统可信赖,答案不是消除不确定性,而是确保全程可追溯——只有能被看见的,才能被信任。


从今天开始做什么

第一周:建立基础

  1. 为你最常用的项目创建 AGENTS.md(或 CLAUDE.md

    • 从当前最烦的 5-10 个 agent 失败行为开始
    • 每个写一条规则,一句话,不加解释
    • 总长度控制在 50-100 行
  2. 让 agent 能操作你的项目

    • 所有日常工作流写成 Makefile target(make devmake testmake restart
    • agent 应该能自己启动项目、看日志、跑测试
  3. 建立最小反馈回路

    • linter + 类型检查 + 单元测试,必须能本地快速跑完
    • 失败时才输出,成功时静默

第二到四周:工程化失败

  1. 识别前 5 个最危险的失败模式,把它们变成 hook 拦截脚本

  2. 如果你有跨多个 session 的长任务,建立 Initializer + Coding Agent 双 agent 模式

    • 用 JSON 跟踪功能状态,不用 Markdown
    • 每次 session 开始强制读进度文件和 git log
    • 每次只完成一个功能,完成即 commit
  3. 第一个技能(Skill)——选一个每周都要做的、有多个步骤的任务

持续运转:把每一次失败变成系统

每次 agent 犯错,问自己:

  • 这是 AGENTS.md 可以防止的?→ 加一条规则
  • 这是 hook 可以物理阻止的?→ 写一个拦截脚本
  • 这是 linter 可以检测的?→ 写一条 lint 规则
  • 这是 sub-agent 可以隔离的 context 问题?→ 拆分架构
  • 这是模型已经能自己处理的?→ 删掉这个 harness 组件

唯一的原则:只在 agent 真的出错后才加约束,只在模型真的不再需要时才删约束。


结语:一门关于信任的工程学

构建自动化的历史,一直在回答同一个问题:如何让复杂的多步骤过程变得可靠和可重复?

1976:make         依赖图 + 文件时间戳
1990s:autotools   跨平台构建
2000s:CI/CD       远程机器运行构建
2010s:IaC         可复现的基础设施
2020s:GitOps      声明式期望状态
2026+:Harness     Agent 读取操作手册并执行,harness 管理和约束它

每一代解决了上一代的核心问题,同时引入了新的复杂性。这一代的问题是:如何让 AI 可靠地执行

Böckeler 有一段话值得收在这里:

"人类开发者把技能和经验作为一种隐性 harness 带入每个代码库。我们吸收了约定和最佳实践,我们感受过复杂性带来的认知痛苦,我们知道自己的名字会出现在 commit 里。Harness 是把这些东西外显化、明确化的尝试。但它只能走到某一步。"

Harness Engineering 不是要让人类工程师消失。是要让工程师的经验、品味和判断力,以工程化的方式传递给 AI,让 agent 在你的价值观里工作。

能把自己的工程判断力编写成 harness 的人,就是这个新学科的核心建设者。


参考来源

英文一手资料

中文解析与实践


综合整理自 30+ 篇一手资料与开源项目 | 2026-04-13

当前端开始做 Agent 后,我才知道 LangGraph 有多重要❗❗❗

大家好 👋,我是 Moment,目前正在使用 Next.js、NestJS、LangChain 开发 DocFlow。这是一个面向 AI 场景的协同文档平台,集成了基于 Tiptap 的富文本编辑、NestJS 后端服务、实时协作与智能化工作流等核心模块。

在这个项目的持续打磨过程中,我积累了不少实战经验,不只是 Tiptap 的深度定制、编辑器性能优化和协同方案设计,也包括前端工程化建设、React 源码理解以及复杂项目架构实践。

如果你对 AI 全栈开发、文档编辑器、前端工程化或者 React 源码相关内容感兴趣,欢迎添加我的微信 yunmz777 一起交流。觉得项目还不错的话,也欢迎给 DocFlow 点个 star ⭐

image.png

在之前的内容里面我们一直在用 LangChain 写链、写 Agent,从最简单的模型调用到工具绑定、路由分发、自定义工作流,走了一整套流程。到这里自然会遇到一个问题:随着应用逻辑越来越复杂,LangChain 原有的编排方式开始显得吃力。链是线性的,Agent 是循环的,但真实世界里的流程往往是图状的,有分支、有合并、有回环、有需要等待人工确认的节点。LangGraph 就是为了解决这个问题而出现的。

为什么需要 LangGraph

LangChainAgentExecutor 时,底层逻辑是一个简单循环:调模型、看要不要用工具、用完工具再回来、再调模型。这个模型对于简单的工具调用场景足够用,但一旦遇到以下几种情况,就开始捉襟见肘。

第一种是多步骤分支。假设需要先判断用户意图,然后根据意图走完全不同的子流程,子流程结束后还需要汇总结果再回复用户。AgentExecutor 的循环模型表达这类逻辑,需要把分支全部塞进提示词,或者用条件回调硬写,代码很快就乱成一团。

第二种是状态持久化。用户和 Agent 聊了几十轮,中途关掉了页面,下次再打开希望从上次停下的地方继续。LangChain 本身没有原生的持久化机制,记忆模块只是把消息列表临时存在内存里,进程一停就没了。

第三种是人机协同。工作流执行到某个敏感节点,需要暂停下来等人类审核,审核通过后才能继续往下跑。这种"执行中途打断、人工介入、再恢复"的场景,在 AgentExecutor 里几乎无法干净地实现。

LangGraph 把上面这些问题都纳入了核心设计。它的思路是把整个 Agent 或工作流建模成一张图,节点是计算步骤,边是流转路径,状态是在整张图上流动的数据。图可以有条件边,可以有回边,可以在任意节点打断并恢复,状态可以持久化到数据库。

LangGraph 的核心思路

理解 LangGraph 最好的方式是先搞清楚它的三个基本概念:状态、节点和边。

状态是图执行过程中一直流动的数据对象,可以把它想象成贯穿整个流程的"共享变量包",每个节点都可以读取里面的内容,也可以往里写新的内容。最常用的状态定义是 MessagesAnnotation,它把状态简化为一个消息列表,非常适合对话类应用。如果需要追踪工具调用次数、用户身份、中间计算结果等自定义字段,也可以用 Annotation 自己定义状态结构。

节点是图里的计算单元,每个节点就是一个普通的异步函数,接收当前状态作为参数,执行完后返回需要更新的状态字段。节点可以承担调用模型、执行工具、查询数据库、等待人工审核等任何有意义的计算步骤。

边是节点之间的连接。普通的边直接指向下一个节点,条件边则根据当前状态的内容动态决定下一跳,类似代码里的 if/else。图的执行从特殊的 __start__ 节点开始,到 __end__ 节点结束。

执行时,用户消息随状态流入 callModel 节点,模型回复追加到消息列表后随状态流出,整个过程一进一出,结构极其简单。如需在代码里取出结果,用 result.messages.at(-1) 拿最后一条即可。

下面这张图把五个关键步骤画在一条主线上,如下图所示。

20260317073347

用户发消息进入状态,callModel 节点读取、调用模型、追加回复,状态带着结果流到终点。

再复杂一点,加上工具调用和条件路由,图就具备了循环能力,如下图所示。

20260316231826

加入工具节点和条件边后,调用模型、执行工具、再次调模型形成完整的回路,整个逻辑一眼就能读懂。

LangGraph 和 LangChain 怎么分工

LangGraph 负责"流程怎么跑",它本身不绑定任何模型供应商,也不提供工具的具体实现,只管图的执行调度、状态的流转与持久化。LangChain 负责"工具和模型是什么",它提供的 ChatOpenAItoolHumanMessage、提示模板、检索器这些组件,是节点函数里真正要调用的东西。

两者的关系是分层叠加,而不是二选一,如下图所示。

20260317073508

LangGraph 在上层负责调度与状态,LangChain 在下层提供模型与工具,两者分工明确、协同运作。

如果不确定自己的场景该用哪个,可以对照下面这张表。

场景 推荐
单次问答、简单链式调用 LangChain
一个模型加几个工具的轻量 Agent LangChain
多步骤、有明确分支的工作流 LangGraph
需要持久化对话或状态可回溯 LangGraph
多 Agent 协作、任务拆解 LangGraph
人机协同、需要中途暂停等待审核 LangGraph

LangGraph 的官方文档自己也在说,如果你的 Agent 只是一个简单的"模型加工具循环",用 LangChaincreateReactAgent 快速搞定就好,没必要一开始就引入图的概念。但凡流程复杂到需要明确画出来才能讲清楚,就是 LangGraph 发力的时候了。

最小可运行的骨架

先把三个依赖装好。

pnpm add @langchain/langgraph @langchain/core @langchain/openai

然后搭出下面三个文件的骨架,后面章节的示例都会在这个基础上扩展。

src/model.ts 负责模型初始化,集中管理密钥与接口地址,方便在多个图文件里复用。

// src/model.ts
import { ChatOpenAI } from "@langchain/openai";

export const model = new ChatOpenAI({
  model: "deepseek-chat",
  apiKey: "sk-60816d9be57f4189b658f1eaee52382e",
  configuration: { baseURL: "https://api.deepseek.com" },
});

src/graph.ts 定义图的结构,目前只有一个调用模型的节点。

// src/graph.ts
import { StateGraph } from "@langchain/langgraph";
import { MessagesAnnotation } from "@langchain/core/messages";
import { model } from "./model";

async function callModel(state: typeof MessagesAnnotation.State) {
  const response = await model.invoke(state.messages);
  return { messages: [response] };
}

const graph = new StateGraph(MessagesAnnotation)
  .addNode("callModel", callModel)
  .addEdge("__start__", "callModel")
  .addEdge("callModel", "__end__");

export const app = graph.compile();

src/index.ts 是入口,执行一次图并打印模型回复。

// src/index.ts
import { HumanMessage } from "@langchain/core/messages";
import { app } from "./graph";

const result = await app.invoke({
  messages: [new HumanMessage("你好,介绍一下 LangGraph")],
});

console.log(result.messages.at(-1)?.content);

现在这个骨架已经是真正可以运行的 LangGraph 应用了:输入一条用户消息,callModel 节点调用模型后把响应追加到状态里,图执行完后取出最后一条消息打印。下一章的 Quickstart 会在这个基础上加入工具绑定、条件边和 checkpointer 持久化,让图逐渐"活"起来。

小结

LangGraph 出现是因为 LangChain 的链式和循环模型在多分支、持久化、人机协同这类复杂场景下力不从心,它用状态、节点、边三个概念把工作流建模成图,状态贯穿全图流动,节点负责处理状态,边决定下一跳的走向。LangChainLangGraph 不是竞争关系,前者提供模型与工具,后者负责编排与调度,两者叠加才是完整的应用架构。后面所有章节的示例都会在 model.tsgraph.tsindex.ts 这三个文件的骨架上扩展。

nestjs实战-登录、鉴权(一)

一个完整的登录流程中至关重要的就是它的认证方式,现有的认证方式主要有一下3种:

  • session/cookie
  • JWT(Json Web Token)
  • Oauth

一、鉴权方式

Session/Cookie

原理:用户登录后,服务器在内存中创建 Session,并将 Session ID 通过 Cookie 返回给客户端。后续请求自动携带 Cookie。

特点:适用于传统Web应用,有状态,不适合跨域或高并发环境

优点:

  • 较易扩展
  • 开发简单

缺点:

  • 需要在服务端存储session,性能低、多服务器同步session困难
  • 由于cookie只在浏览器上能使用,所以它跨平台困难

JWT

原理:服务端通过特定密钥生成包含用户信息的签名字符串(Token),客户端在请求头(Authorization: Bearer Token)中携带。服务器不存储会话,只验证签名。

特点:无状态、扩展性好,适用于微服务和单页应用(SPA)、跨平台能力

JWT 本质上是一个经过数字签名的字符串,它由三部分组成,用点号(.)分隔:Header.Payload.Signature。token长什么样?

Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJsb2dpbiIsImxvZ2luSWQiOiIyMDAwNCIsInJuU3RyIjoiQVpVUVY2dnAyUllkOFRucnJoaXNsQWtRcTFhSEFxeTAiLCJ0ZW5hbnRJZCI6IjEiLCJzY29wZSI6IlJPTEU6Ok1JQ1JPX0FQUCIsImFwcEtleSI6InJIWnJZeVB1eW1oaXZlSUUiLCJuaWNrTmFtZSI6IuWui-Wwj-aXrSIsImxvZ2luVGltZSI6MTc3NTYxMzI4NzYxNX0.bq8qEE-imza7pLucKOvKw2WvukW2lSPr1WMbAcuJLmU

优点:

  • 支持跨平台:移动端、跨应用
  • 安全、承载信息丰富

缺点:

  • 刷新与过期处理
  • payload不易过大
  • 中间人攻击(没有绝对的安全)

Oauth

第三方授权,例如 微信、支付宝、QQ、谷歌账号登录等

优点:

  • 开放、安全、简单
  • 权限指定

缺点:

  • 需要增加授权服务器
  • 增加网络请求

二、实战

首先 创建身份验证模块,在指定的目录下执行如下命令:

nest g res auth

整个流程我把它分成两部分:

  1. 用户登录过程
  2. 登录成功后,带token请求业务接口的过程

2.1 前置知识

此段内容篇理论介绍,可以先看后面的代码实现,有疑问再来看这里的内容

模块之间的复用

auth模块中如何使用 users 模块中的 users.service.ts 中的方法

之前的章中我们创建了 Users 模块,现在 auth 模块中想要使用 users.service 中的方法:

  • 首先需要在 users.module.ts 中导出 users.service.ts

    // ... 省略
    @Module({
      imports: [TypeOrmModule.forFeature([UserEntity])],
      controllers: [UsersController],
      providers: [UsersService],
      exports: [UsersService], // 导出
    })
    export class UsersModule {}
    
    
  • auth.module.ts 中导入

    import { Module } from '@nestjs/common';
    import { AuthService } from './auth.service';
    import { AuthController } from './auth.controller';
    import { UsersModule } from '../users/users.module';
    
    @Module({
      imports: [UsersModule],
      controllers: [AuthController],
      providers: [AuthService],
    })
    export class AuthModule {}
    
  • 这样就能在 auth.service.ts 中使用了

    import { Injectable } from '@nestjs/common';
    // 注意这里在使用中还是需要 显示的引入,而不是会自动引入
    import { UsersService } from '../users/users.service';
    
    @Injectable()
    export class AuthService {
      constructor(private readonly usersService: UsersService) {}
    
      async register(createUserDto: any) {
        return this.usersService.findAll();
      }
    }
    

JWT 相关包介绍

首先安装依赖:

 pnpm add @nestjs/passport passport passport-jwt @nestjs/jwt 
pnpm add @types/passport @types/passport-jwt -D

在集成 JWT 之前,先学习一下使用到的包,并了解它都提供了哪些功能。这里是掌握整个流程非常重要的环境,参考了网上(包括官网)一上来就是介绍怎么使用、代码怎么写,确实是让人很迷惑。

1. @nestjs/passport
  1. 集成 Passport.js:让 NestJS 应用能够使用 Passport.js 提供的超过 500 种认证策略(如本地用户名密码、JWT、OAuth 2.0 等)。
  2. 提供装饰器:它提供了一系列装饰器(如 @UseGuards(AuthGuard('jwt'))),让你可以非常方便地在控制器(Controller)或路由上应用认证保护。
  3. 简化策略创建:通过 PassportStrategy 类,你可以轻松地创建自定义策略,而无需直接处理 Passport.js 的底层 API。
import { PassportModule, PassportStrategy, AuthGuard } from '@nestjs/passport';
PassportModule
  • 类型: NestJS 模块 (@Module)

  • 作用: 它是整个 Passport 集成的入口。你需要把它导入到你的 NestJS 模块(如 AuthModule)中,才能启用 Passport 功能。

  • 核心功能

    • 它通过 .register() 方法允许你配置全局选项,比如指定默认的认证策略(defaultStrategy)。
    • 它负责将 Passport 的服务注入到 NestJS 的依赖注入容器中
PassportStrategy-策略实现
  • 类型:抽象类 / 辅助函数

  • 作用:它是用来创建具体认证逻辑的基类。Passport.js 本身有各种策略(如 Local, JWT, Google),这个函数帮助我们将这些策略适配到 NestJS 的类结构中。

  • 核心功能

    • 它接受一个具体的策略类(如 passport-localStrategy)作为参数。
    • 它让你重写 validate 方法,在这里编写具体的验证逻辑(比如查数据库比对密码)。
AuthGuard-路由保护
  • 类型:守卫 (CanActivate)

  • 作用:它是 NestJS 的守卫,用于保护路由。它拦截请求,并告诉 Passport 执行哪个策略。

  • 核心功能

    • 它通常配合 @UseGuards() 装饰器使用。
    • 它接收一个参数(字符串),这个字符串必须与你定义的策略名称(或默认名称)匹配,从而触发对应的验证流程。
2. @nestjs/jwt

@nestjs/jwt 是 Nest 官方对 JWT 的封装包,底层基于 jsonwebtoken。它主要解决三件事:

  1. 在 Nest 里统一配置 JWT 密钥、过期时间等(模块化)
  2. 提供 JwtService 来签发和校验 token
  3. 很容易和 Passportjwt strategy 集成成认证体系
import { JwtModule, JwtService } from '@nestjs/jwt';
JwtModule:模块注册器
  • 用来在 Nest DI 容器里注册 JWT 相关配置和服务

  • 常见用法:

    • JwtModule.register({...}) 静态配置
    • JwtModule.registerAsync({...}) 动态读取配置(比如从 ConfigServiceJWT_SECRET
JwtService:具体干活的服务

用它生成 token、验证 token、解码 token,一般在 AuthService 里注入并调用

  • sign(payload, options?)

    • 作用:根据 payload 生成 JWT 字符串
    • 示例:this.jwtService.sign({ sub: user.id, name: user.name })
    • options 可以覆盖模块默认配置,比如 expiresIn, secret
  • verify(token, options?)

    • 作用:验证 token 是否合法、是否过期,并返回解码后的 payload
    • 验证失败会抛异常(如签名不对、过期)
    • 示例:this.jwtService.verify(token)
3. passport-jwt

passport-jwt 这个包的核心作用是:给 Passport 提供“JWT 认证策略”。

import { ExtractJwt, Strategy } from 'passport-jwt';

主要用到两个能力:

  • Strategy

    • JWT 的 Passport 策略类
    • 你在 Nest 里通常写 extends PassportStrategy(Strategy, 'jwt')
    • 用来定义:用什么密钥验签、是否忽略过期、验证成功后返回什么用户信息
  • ExtractJwt

    • token 提取器工具
    • 最常用:ExtractJwt.fromAuthHeaderAsBearerToken()
    • 表示从 Authorization: Bearer <token> 里取 token
    • 也支持从 cookie、query、或自定义函数提取

简化理解:

  • @nestjs/jwt 偏向“签发/校验 token 的服务能力”
  • passport-jwt 偏向“请求进来时怎么从请求里拿 token 并走认证策略”

二者经常一起用:登录时 JwtService.sign() 发 token,请求鉴权时 passport-jwtStrategy 来验。

4. 总结

首先为什么需要安装 passport:

仅安装 @nestjs/passport 是不够的。简单来说,@nestjs/passport 只是一个“适配器”,它的核心作用是将 passport 的功能封装成 NestJS 熟悉的模块和依赖注入形式,但它本身并不包含 passport 的核心逻辑。

你可以这样理解它们的关系:

  • passport: 这是核心的认证库,提供了所有的认证策略和逻辑。
  • @nestjs/passport: 这是一个 NestJS 的官方封装包,它让你能以更符合 NestJS 风格(如使用装饰器、依赖注入)的方式来使用 passport

完整的登录鉴权依赖

要实现一个完整的登录鉴权功能,你通常需要安装以下几个包:

  1. 核心库:

    • passport: 认证的核心。
    • @nestjs/passport: NestJS 的适配器。
  2. 具体策略库 (根据你选择的认证方式):

    • 用户名密码登录: 需要 passport-local
    • JWT 无状态认证: 需要 passport-jwt
  3. 类型定义 (TypeScript 项目需要):

    • @types/passport
    • @types/passport-local (如果使用 local 策略)
    • @types/passport-jwt (如果使用 jwt 策略)

2.2 JWT 集成

auth.module.ts

import { Module } from '@nestjs/common';
import { JwtModule } from '@nestjs/jwt';
import { PassportModule } from '@nestjs/passport';
import { ConfigModule, ConfigService } from '@nestjs/config';

import { AuthService } from './auth.service';
import { AuthController } from './auth.controller';
import { UsersModule } from '../users/users.module';
import { securityRegToken, ISecurityConfig } from '~/config';
import { isDev } from '~/global/env';

@Module({
  imports: [
    PassportModule, // 引入 PassportModule 模块
    // 引入 JwtModule 模块,及配置 JwtModule 模块
    JwtModule.registerAsync({
      imports: [ConfigModule], // 引入 ConfigModule 模块
      useFactory: (configService: ConfigService) => {
        const { jwtSecret, jwtExprire } = configService.get<ISecurityConfig>(securityRegToken)
        return {
          secret: jwtSecret, // 设置密钥
          signOptions: { expiresIn: jwtExprire }, // 设置过期时间
          ignoreExpiration: isDev, // 开发环境忽略过期时间
        }
      },
      inject: [ConfigService], // 注入 ConfigService 服务
    }),
    UsersModule,
  ],
  controllers: [AuthController],
  providers: [
    AuthService,
  ],
})
export class AuthModule {}

以上代码 主要是 引入了 PassportModule, JwtModule,并完善了相关配置;

2.3 用户登录过程

  • 用户输入 用户名、密码 等信息

    通过http(加密|明文),传输给后端

  • 后端接受到来自前端 的http 请求

    • 验证账号密码是否一致
    • 生成 token 传递给前端

auth.controller.ts

import { Controller, Post, Body, UseGuards, Req } from '@nestjs/common';

import { AuthService } from './auth.service';
import { LoginDto } from './dto/auth.dto';

@Controller('auth')
export class AuthController {
  constructor(
    private readonly authService: AuthService,
  ) {}

  // 登录
  @Post('login')
  login(@Body() loginDto: LoginDto) {
    return this.authService.login(loginDto);
  }
}

auth.service.ts

import { HttpException, Injectable, HttpStatus } from '@nestjs/common';
import { UsersService } from '../users/users.service';
import { isEmpty } from 'lodash';
import { JwtService } from '@nestjs/jwt';

@Injectable()
export class AuthService {
  constructor(
    private readonly usersService: UsersService,
    private readonly jwtService: JwtService,
  ) {}

  async login(loginDto: {
    name: string;
    password: string;
  }) {
    const { name, password } = loginDto;
    const user = await this.usersService.findUserByUserName(name);
    if (isEmpty(user)) {
      throw new HttpException('用户不存在', HttpStatus.NOT_FOUND);
    }

    const { password: userPassword } = user;
    if (userPassword !== password) {
      throw new HttpException('密码错误', HttpStatus.BAD_REQUEST);
    }

    // 生成 JWT 签名
    const jwtSign = await this.jwtService.signAsync({
      id: user.id,
      name: user.name,
      pv: 1, // 版本号
    })
    return {
      access_token: jwtSign
    };
  }
}

用户登录过程代码如上,还是比较简单:

  • 获取用户传递过来的参数,校验用户是否存在,密码是否正确
  • 生成JWT签名,返回token給前端

功能主线如此,密码加密、token管理等有待完善

2.4 业务接口token验证过程

由于文章篇幅,放到下一节讲解;

Flutter iOS应用混淆与安全配置详细文档指南

Flutter iOS应用混淆与安全配置文档

概述

本文档详细描述了iOS应用的混淆与安全配置过程。这些配置旨在保护应用代码、API密钥和敏感数据,防止逆向工程和恶意攻击。配置包括 Dart 代码混淆、原生代码混淆、运行时安全检查和数据安全措施。

混淆与安全措施

Dart代码混淆

Flutter提供了内置的代码混淆功能,通过以下参数启用:

--obfuscate --split-debug-info=./symbols
1

这将:

  • 重命名代码中的标识符,使反编译后的代码难以理解
  • 将调试信息分离到单独的文件中,减少发布版本中的可读信息
  • 保留符号信息用于崩溃分析,但不包含在发布版本中

此外,使用像IpaGuard这样的专业混淆工具可以进一步增强应用安全性。IpaGuard是一款强大的iOS IPA文件混淆工具,无需源码即可对代码和资源进行混淆加密,支持Flutter等多种开发平台,有效增加反编译难度。

原生代码混淆与安全

在iOS上,我们通过以下配置增强安全性:

  1. BuildSettings.xcconfig 配置:
// 启用代码混淆和优化
GCC_OPTIMIZATION_LEVEL = s
SWIFT_OPTIMIZATION_LEVEL = -O
SWIFT_COMPILATION_MODE = wholemodule
DEAD_CODE_STRIPPING = YES

// 安全设置
ENABLE_STRICT_OBJC_MSGSEND = YES
CLANG_WARN_SUSPICIOUS_MOVE = YES
CLANG_WARN_OBJC_LITERAL_CONVERSION = YES
GCC_NO_COMMON_BLOCKS = YES
STRIP_STYLE = all
STRIP_INSTALLED_PRODUCT = YES
COPY_PHASE_STRIP = YES
DEBUG_INFORMATION_FORMAT = dwarf-with-dsym

// 启用应用传输安全
PRODUCT_SETTINGS_URL_SCHEMES = "$(inherit)"
PRODUCT_SETTINGS_APP_TRANSPORT_SECURITY_ALLOWS_ARBITRARY_LOADS = NO

// 添加其他安全属性
OTHER_LDFLAGS = $(inherited) -Wl,-no_pie
12345678910111213141516171819202122
  1. Xcode构建参数
xcodebuild -workspace Runner.xcworkspace -scheme Runner -configuration Release clean build \
  ENABLE_BITCODE=YES STRIP_INSTALLED_PRODUCT=YES DEPLOYMENT_POSTPROCESSING=YES \
  -sdk iphoneos -allowProvisioningUpdates
123

这些参数确保:

  • 启用Bitcode,允许App Store进一步优化代码
  • 移除不必要的符号和调试信息
  • 进行部署后处理操作,应用额外的优化

运行时安全检查

通过以下Swift代码实现运行时安全检查:

// 检查设备是否已越狱
func isJailbroken() -> Bool {


    #if targetEnvironment(simulator)
    return false
    #else
    // 检查常见的越狱文件路径
    let jailbreakPaths = [
        "/Applications/Cydia.app",
        "/Library/MobileSubstrate/MobileSubstrate.dylib",
        "/bin/bash",
        "/usr/sbin/sshd",
        "/etc/apt",
        "/private/var/lib/apt/",
        "/usr/bin/ssh"
    ]

    for path in jailbreakPaths {


        if FileManager.default.fileExists(atPath: path) {


            return true
        }
    }

    // 检查是否可以写入私有目录
    let stringToWrite = "Jailbreak Test"
    do {


        try stringToWrite.write(toFile: "/private/jailbreak.txt", atomically: true, encoding: .utf8)
        try FileManager.default.removeItem(atPath: "/private/jailbreak.txt")
        return true
    } catch {


        // 无法写入,说明没有越狱
    }

    return false
    #endif
}

// 检查是否连接调试器
func isDebuggerAttached() -> Bool {


    #if DEBUG
    return false
    #else
    var info = kinfo_proc()
    var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid()]
    var size = MemoryLayout<kinfo_proc>.stride
    let status = sysctl(&mib, UInt32(mib.1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556

苹果iOS应用开发上架与推广完整教程

苹果应用上架与推广详细指南

作为苹果个人开发者,确保用户能够顺畅地下载并安装自己精心开发的应用,是不可或缺的一环。接下来,我们将深入探讨实现这一目标的一系列核心步骤。

01应用上架准备

> 注册开发者账号

注册开发者账号是发布应用的第一步,必须在苹果开发者官网完成信息填写和费用支付。你需要前往苹果开发者官网,填写包括姓名、联系方式在内的个人真实信息,并支付相应费用。完成注册与身份验证后,你将获得发布应用的权限。

> 确保应用符合准则

在用户能够顺畅地下载并安装应用之前,开发者需要完成一系列的准备工作。这些准备工作的目标是确保应用的下载和安装过程尽可能顺畅,从而提升用户体验。

应用需符合苹果的质量与安全标准,不得包含恶意代码,需具备良好的UI设计和正常功能。苹果对应用的质量、功能及安全性都设有严格标准。应用不得包含恶意代码,必须具备良好的用户界面设计,且功能正常、无显著漏洞。此外,应用还需遵守法律法规,如不得侵犯他人知识产权,不得诱导用户进行不合理付费等。只有满足这些准则的应用,才有可能通过审核并上架供用户下载。

> 选用开发工具与测试

接下来,我们就将详细介绍这些准备工作。在着手构建和测试应用之前,有几个关键步骤需要完成。这些步骤旨在为应用的顺畅下载和安装铺平道路,进而优化用户体验。

使用Xcode进行开发,选择合适的编程语言并在多设备和系统版本上进行全面测试,确保功能完备、兼容性和性能。通常,Xcode被视为开发首选,它提供了从代码编写到编译、调试的一站式服务。依据应用特性,选择如Swift或Objective-C等编程语言。以一个简易笔记应用为例,我们可以运用Swift来构建用户界面,并实现笔记的增删改查功能。务必在不同型号的iOS设备和多种系统版本上进行详尽测试。这包括验证功能的完备性,例如确认笔记应用能否顺利存储和读取数据;确保兼容性,以保证应用在iPhone、iPad等设备上的一致性;以及评估性能,如测试应用的启动速度和响应时间。例如,在iPhone 13与iPhone 8上分别运行笔记应用,观察是否存在布局混乱或功能失效的问题。此外,开发者也可以使用AppUploader等工具来简化iOS证书的申请和管理,支持在Windows、Linux或Mac系统中操作,无需依赖Mac电脑。

02向App Store提交与推广

> 准备元数据与分类选择

接下来,就可以将你的应用提交到App Store了。 提交应用前需准备应用名称、描述、截图及视频,并选择合适的分类以提高用户搜索的精准性

为应用起一个简洁且能体现核心功能的名称,如“速记笔记”,同时撰写详细且吸引人的描述,突出应用特点、功能和优势。此外,还需提供展示关键界面和操作流程的截图及视频预览,使用户能直观了解应用。依此选择合适的分类,如笔记应用可归于“效率”类别。这样能帮助用户在App Store中更精准地搜索到应用。

> 提交审核与推广策略

通过App Store Connect提交审核,积极进行应用推广,包括社交媒体、博主合作及应用内推广,提高用户下载量。

使用Xcode完成应用打包后,通过App Store Connect提交应用进行审核。或者,使用AppUploader工具上传IPA文件到App Store,它支持多平台,比Application Loader更高效,且不携带设备信息。审核过程通常需数日,期间苹果团队将检查应用是否符合各项标准。若审核过程中发现任何问题,将收到反馈通知,需根据反馈进行相应修改后重新提交。同时,积极推广应用也是提高下载量的关键。

  1. 社交媒体推广:借助Twitter、Facebook、Instagram等社交媒体平台,广泛宣传您的应用。您可以分享应用截图、详细的功能介绍以及使用教程等,以此吸引更多潜在用户的关注。例如,在Twitter上发布一系列关于您的笔记应用使用技巧的推文,并附上应用的下载链接。

  2. 与博主和媒体合作:积极联系相关领域的博主、自媒体或科技媒体,向他们介绍您的应用,并努力争取他们的推荐或报道。例如,与专注于效率类应用评测的博主取得联系,邀请他们体验您的应用并分享他们的使用感受。

  3. 应用内推广:如果您还有其他已发布的应用,可以在这些应用内部推广您的新应用,从而引导现有用户下载并体验。

通过上述推广策略,苹果个人开发者可以成功地推动应用的下载安装,并通过广泛的推广活动提高应用的下载量和用户活跃度,从而让您的开发成果得到更多用户的认可和喜爱。

nestjs实战 - 拦截器,统一处理接口请求与响应结果

在之前的篇章中介绍了 拦截器的基本概念、使用方法、使用场景;

本节主要从实战层面开发一个通用功能:统一处理接口请求与响应结果

需求:

  • 统一处理接口请求与响应结果
  • 可选配置(部分接口如果不需要统一处理 可配置)

第一步,全局注入拦截器

首先创建一个 transform.interceptor.ts 文件,并全局注入:

/// app.module.ts
// 省略其它代码
// 主要代码
import { APP_INTERCEPTOR } from '@nestjs/core';
import { TransformInterceptor } from './common/interceptors/transform.interceptor';

@Module({
  // ...
  providers: [
    // 全局注入拦截器,它会作用到所有路由上
    { provide: APP_INTERCEPTOR, useClass: TransformInterceptor }, 
  ],
})
export class AppModule {}

第二步,拦截器功能实现

需要注意的点,我们需要处理

  • 请求参数(前置拦截器)
  • 响应结果(后置拦截器)

在之前的章节中也介绍了这两个概念。

// 首先需要下载两个相关依赖
pnpm add fastify qs  

transform.interceptor.ts

实现拦截器 transform.interceptor.ts 内部逻辑:

import {
  NestInterceptor,
  CallHandler,
  ExecutionContext,
  Injectable,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core'
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

import type { FastifyRequest } from 'fastify'
import qs from 'qs';

import { ResponseModel } from '../mode/response.mode';
import { BYPASS_KEY } from '../decorators/bypass.decorator';


/**
 * 响应拦截器
 * 用于处理响应数据
 * 可以用于处理响应数据,如添加响应头,添加响应体等
 */
@Injectable()
export class TransformInterceptor<T> implements NestInterceptor {
  constructor(private readonly reflector: Reflector) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<ResponseModel<T>> {
    // ==========================
    // 【阶段 1:控制器执行之前】
    // ==========================
    // 这里的代码会立即同步执行。
    // 此时请求刚到达拦截器,还没进控制器。

    // ✅功能1:获取是否需要跳过拦截器
    const bypass = this.reflector.get<boolean>(
      BYPASS_KEY,
      context.getHandler(),
    )

    // 如果在这里直接 return 一个 Observable (例如 return of({error: 'blocked'}))
    // 而不调用 next.handle(),控制器将永远不会执行(短路)。
    // 调用 next.handle() 启动控制器逻辑
    // 它返回一个 Observable,代表控制器未来的执行结果(流)
    if (bypass)
      return next.handle()
    
    // ✅功能2:获取请求对象
    const http = context.switchToHttp()
    const request = http.getRequest<FastifyRequest>()
    // 处理 query 参数,将数组参数转换为数组,如:?a[]=1&a[]=2 => { a: [1, 2] }
    request.query = qs.parse(request.url.split('?').at(1))


    // ✅功能3:调用控制器逻辑
    const response$ = next.handle(); 
    // 【阶段 2:控制器执行之后】
    // ==========================
    // 这里的代码不会立即执行!
    // 它们被注册为 RxJS 的“操作符”,只有当控制器执行完毕并产生数据时,流才会流动到这里。
    return response$.pipe(
      map((data) => {
        console.log('data', data);
        return ResponseModel.success(data);
      }),
    );
  }
}

response.mode.ts

它是生成 响应数据 的一个构造函数;

import { HttpStatus } from "@nestjs/common";

export class ResponseModel<T = any> {
  code: number;
  message: string;
  data?: T;

  constructor(code: number, message: string, data?: T) {
    this.code = code;
    this.message = message;
    this.data = data ?? undefined;
  }

  static success<T>(data?: T) {
    return new ResponseModel(HttpStatus.OK, 'success', data);
  }

  static error(code: number, message: string) {
    return new ResponseModel(code, message, null);
  }
}

bypass.decorator.ts

配置:是否使用 - 拦截器统一响应数据结构功能,亦可解释为 此拦截器功能的开关

import { SetMetadata } from '@nestjs/common';

export const BYPASS_KEY = '__bypass_key__';

/**
 * 当不需要转换成基础返回格式时添加该装饰器
 */
export const Bypass = () => SetMetadata(BYPASS_KEY, true);

SetMetadata

在 NestJS 中,@SetMetadata() 是一个核心装饰器,用于向路由处理器(Controller 中的方法)或控制器类附加自定义的元数据(Metadata)。简单来说,它允许你给代码“打标签”,这些标签可以在运行时被读取,从而实现灵活、声明式的逻辑控制,例如权限校验、日志记录或缓存策略等。

1. 设置元数据

你可以直接在控制器或其方法上使用 @SetMetadata('key', value) 来设置元数据。

  • key: 一个字符串,作为元数据的唯一标识。
  • value: 任意类型的值,是你想要存储的数据。

示例:

import { Controller, Get, SetMetadata } from '@nestjs/common';

@Controller('cats')
export class CatsController {

  // 为单个方法设置元数据
  @Get()
  @SetMetadata('roles', ['admin']) // key 是 'roles', value 是 ['admin']
  findAll() {
    return 'This action returns all cats';
  }

  // 也可以为整个控制器设置元数据
  @SetMetadata('isPublic', true)
  @Get('public')
  findPublic() {
    return 'This is a public route';
  }
}

设置元数据的最佳实践,如上文中我们的写法,通过一个自定义装饰,为控制器 或 方法 设置。

2. 获取元数据

设置的元数据本身是静态的,它的价值在于在运行时被动态读取。这通常在 守卫(Guards)拦截器(Interceptors)管道(Pipes) 中完成,通过注入 Reflector 辅助类来实现。

Reflector 提供了多种方法来读取元数据,最常用的是 get()

以上文中拦截器为例,获取元数据:

// ...
import KEY_NAME from '../****'

@Injectable()
export class TransformInterceptor<T> implements NestInterceptor {
  constructor(private readonly reflector: Reflector) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<ResponseModel<T>> {
    const bypass = this.reflector.get<boolean>(
      KEY_NAME,
      context.getHandler(), // 获取控制器方法的元数据
    )
  }
}

第三步,使用

测试一下,我们再users.controller.ts 中测试使用

import { Bypass } from '~/common/decorators/bypass.decorator';

@Controller('users')
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @Get()
  @Bypass() // 过滤全局统一响应数据拦截器
  findAll() {
    return this.usersService.findAll();
  }
}

注意:

const bypass = this.reflector.get<boolean>(
  BYPASS_KEY,
  context.getHandler(), // 获取控制器方法的元数据
)

@Bypass 装饰器只能作用在 路由处理方法 上。

总结:

以上就完成了 统一处理接口请求与响应结果 功能的开发,顺便让我们熟悉了 拦截器的用法;

再次回顾一下拦截器的使用场景:

  • 请求参数统一处理:格式转换
  • 响应数据统一 格式化
  • 响应缓存
  • 超时处理
  • 数据序列化/脱敏

业务系统深度集成:基于OnlyOffice中国版连接器实现合同生成、AI写作与报表自动化

业务系统深度集成:基于OnlyOffice中国版连接器实现合同生成、AI写作与报表自动化

一、为什么需要连接器

在大多数企业系统中,文档编辑器只是一个"嵌入式组件"——用户打开、编辑、保存,仅此而已。但真实业务场景中,我们往往需要从外部系统控制文档内容:

  • 合同系统需要将业务数据自动填入合同模板
  • AI写作系统需要将生成的内容插入到光标位置
  • 报表系统需要将统计数据写入Excel并自动生成图表
  • 审批系统需要在文档中自动插入审批意见和签章

这些需求的共同特点是:操作文档的主体不是用户,而是外部系统

OnlyOffice 提供了 连接器(Connector) 机制来满足这类需求。中国版完整实现了官方连接器的全部功能,兼容官方 JSAPI,并可与用户只读模式动态权限切换等增强功能配合使用,构建出更强大的业务集成方案。

中国版连接器增强能力

  • 兼容官方 Automation API,支持 Word/Excel/PPT 全文档类型操作
  • 可与用户只读模式配合:用户无法手动编辑,但连接器可操作文档
  • 支持动态权限切换:运行时通过连接器修改用户权限
  • 支持细粒度文档操作:段落、Run、样式、图表等均可操控

二、连接器基础

2.1 什么是连接器

连接器是 OnlyOffice 文档编辑器提供的 JavaScript API 接口,允许外部代码(宿主页面)对正在编辑的文档执行操作。它与插件(Plugin)拥有相同的底层接口,但使用方式更灵活:

  • 插件:需要打包部署到 documentserver 内部,通过编辑器内的插件菜单激活
  • 连接器:直接在宿主页面的 JavaScript 中调用,无需部署任何文件

对于业务系统集成来说,连接器是更合适的选择

2.2 创建连接器

在初始化编辑器后,通过 createConnector 方法获取连接器实例:

// 初始化编辑器
const docEditor = new DocsAPI.DocEditor("placeholder", config);

// 创建连接器
const connector = docEditor.createConnector();

2.3 核心方法

连接器提供两个核心方法:

callCommand —— 在文档上下文中执行代码:

connector.callCommand(function () {
    // 这里的代码运行在文档编辑器内部
    // 可以使用 Api 对象操作文档
    var oDocument = Api.GetDocument();
    // ...
});

executeMethod —— 调用编辑器提供的方法:

connector.executeMethod("InsertTextToCursor", ["Hello World"]);

两者的区别在于:callCommand 内的函数运行在编辑器沙箱中,可以调用完整的文档操作 API;executeMethod 是对常用操作的封装,调用更简洁。

注意callCommand 中的函数是序列化后传递到编辑器内部执行的,因此不能引用外部变量。需要传递数据时,可以通过函数返回值或事件机制。

三、场景一:合同模板自动填充

3.1 业务需求

某企业合同管理系统的需求:

  • 合同使用标准 Word 模板,包含固定条款和可变字段
  • 业务人员在系统中填写合同要素(甲乙方、金额、期限等)
  • 系统自动将数据填入模板对应位置
  • 用户在编辑器中只能查看结果,不能手动编辑

3.2 模板设计

在 Word 模板中,使用特定格式的占位符标记可变内容,例如:

甲方:{{partyA}}
乙方:{{partyB}}
合同金额:人民币 {{amount}} 元整
合同期限:{{startDate}} 至 {{endDate}}

3.3 技术实现

第一步:配置编辑器

使用用户只读模式,确保用户不能手动编辑,但连接器可以操作文档:

const config = {
  document: {
    fileType: "docx",
    key: contractKey,
    title: "采购合同-2026-0412",
    url: templateDownloadUrl,
    permissions: {
      edit: true,
      copy: true,
      copyOut: false,
      print: true
    }
  },
  editorConfig: {
    mode: "edit",
    customization: {
      readOnly: true,  // 用户只读模式
      waterMark: {
        value: `${currentUser.name}\\n合同预览`,
        fillstyle: "rgba(192, 192, 192, 0.2)",
        font: "14px SimHei",
        rotate: -30,
        opacity: 0.2
      }
    }
  }
};

第二步:获取业务数据并填充

当用户在业务表单中填写完合同要素后,通过连接器将数据写入文档:

// 业务数据
const contractData = {
  partyA: "北京某某科技有限公司",
  partyB: "上海某某信息技术有限公司",
  amount: "壹佰贰拾叁万肆仟伍佰陆拾柒",
  amountNum: "1,234,567.00",
  startDate: "2026年04月12日",
  endDate: "2027年04月11日",
  signDate: "2026年04月12日"
};

// 通过连接器填充数据
function fillContract(data) {
  const connector = docEditor.createConnector();

  // 将数据序列化后传入
  const jsonData = JSON.stringify(data);

  connector.callCommand(function () {
    // 在文档上下文中执行
    var oDocument = Api.GetDocument();
    var aElements = oDocument.GetAllContentControls();

    // 如果使用内容控件方式
    for (var i = 0; i < aElements.length; i++) {
      var tag = aElements[i].GetTag();
      // 根据 tag 匹配字段并替换
    }
  });

  // 也可以使用搜索替换方式
  connector.callCommand(function () {
    var oDocument = Api.GetDocument();

    // 使用 SearchAndReplace 方法
    var oSearchData = {
      searchString: "{{partyA}}",
      replaceString: "北京某某科技有限公司",
      matchCase: true
    };

    oDocument.SearchAndReplace(oSearchData);
  });
}

第三步:逐字段替换的完整实现

实际项目中,建议封装一个通用的模板填充方法:

function fillTemplate(connector, fieldMap) {
  const entries = Object.entries(fieldMap);

  // 由于 callCommand 内部不能引用外部变量
  // 需要逐个字段调用,或者将数据编码到函数体中
  entries.forEach(([placeholder, value]) => {
    // 动态构造函数字符串
    const script = `
      var oDocument = Api.GetDocument();
      oDocument.SearchAndReplace({
        searchString: "{{${placeholder}}}",
        replaceString: "${value.replace(/"/g, '\\"')}",
        matchCase: true
      });
    `;

    connector.callCommand(new Function(script));
  });
}

// 使用
fillTemplate(connector, {
  partyA: contractData.partyA,
  partyB: contractData.partyB,
  amount: contractData.amount,
  amountNum: contractData.amountNum,
  startDate: contractData.startDate,
  endDate: contractData.endDate
});

3.4 用户只读模式详解

用户只读模式是中国版特有的功能,可以实现"用户不可编辑,但连接器可操作文档"的效果。

与普通只读模式的区别

模式 用户能否编辑 连接器能否操作 适用场景
普通只读(mode: view) 纯预览场景
用户只读(readOnly: true) 合同生成、公文套打等

配置要点

{
  "editorConfig": {
    "customization": {
      "readOnly": true
    },
    "permissions": {
      "edit": true
    },
    "mode": "edit"
  }
}

三个字段必须同时配置:mode 设为 editpermissions.edit 设为 truecustomization.readOnly 设为 true

注意:用户只读模式为高级版功能,目前仅支持 Word/Excel/PPT 的 PC 模式

3.5 关键注意事项

  • 模板占位符应使用不易与正文冲突的格式(如 {{fieldName}}
  • 替换操作完成后,建议调用保存接口生成最终文档
  • 用户只读模式保证了模板结构和法律条款不会被手动修改
  • 结合防截图水印,可以在合同预览阶段保护内容安全

四、场景二:AI辅助写作集成

4.1 业务需求

某内容管理平台需要集成 AI 写作能力:

  • 用户在文档中编辑时,可以通过侧边栏调用 AI 功能
  • AI 生成的内容可以插入到当前光标位置
  • 支持 AI 润色:选中文本 → 调用 AI 改写 → 替换原文
  • 支持 AI 续写:在光标位置根据上下文续写内容

4.2 架构设计

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   业务前端    │────→│  AI 服务端    │────→│  大语言模型   │
│  (侧边栏)    │←────│  (API网关)    │←────│  (LLM)       │
└──────┬───────┘     └──────────────┘     └──────────────┘
       │
       │ connector.callCommand()
       ↓
┌──────────────┐
│  OnlyOffice  │
│  编辑器      │
└──────────────┘

4.3 核心实现

获取选中文本,发送给AI处理

// 获取当前选中的文本
function getSelectedText(connector) {
  return new Promise((resolve) => {
    connector.callCommand(
      function () {
        var oDocument = Api.GetDocument();
        var selectedText = oDocument.GetSelectedText();
        return selectedText;
      },
      false, // isNoCalc
      function (result) {
        resolve(result);
      }
    );
  });
}

// AI润色流程
async function aiPolish() {
  const selectedText = await getSelectedText(connector);

  if (!selectedText) {
    alert("请先选中需要润色的文本");
    return;
  }

  // 调用后端AI接口
  const response = await fetch("/api/ai/polish", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ text: selectedText })
  });

  const { result } = await response.json();

  // 将AI结果替换选中内容
  connector.callCommand(function () {
    var oDocument = Api.GetDocument();
    // 在当前选区位置插入新文本
    var oParagraph = Api.CreateParagraph();
    oParagraph.AddText(result);
    oDocument.InsertContent([oParagraph], true); // true 表示替换选区
  });
}

在光标位置插入AI生成的内容

async function aiGenerate(prompt) {
  const response = await fetch("/api/ai/generate", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt })
  });

  const { result } = await response.json();

  // 将生成的内容插入光标位置
  connector.callCommand(function () {
    var oParagraph = Api.CreateParagraph();
    var oRun = Api.CreateRun();

    // 设置字体样式与文档保持一致
    oRun.AddText(result);
    oRun.SetFontFamily("SimSun");
    oRun.SetFontSize(24); // 单位是半磅,24 = 12pt

    oParagraph.AddElement(oRun);

    var oDocument = Api.GetDocument();
    oDocument.InsertContent([oParagraph]);
  });
}

4.4 流式输出的处理

如果AI接口支持流式输出(SSE),可以实现逐字显示效果。但需要注意,频繁调用 callCommand 会有性能开销。建议的处理方式:

  • 在侧边栏先完成AI内容的流式展示
  • 用户确认后一次性插入到文档中
  • 或者每积累一定长度(如一个段落)后批量插入
// 推荐:在侧边栏展示完整结果后,一次性插入
function insertAiResult(text) {
  const paragraphs = text.split("\n").filter(p => p.trim());

  connector.callCommand(function () {
    var aContent = [];

    for (var i = 0; i < paragraphs.length; i++) {
      var oParagraph = Api.CreateParagraph();
      oParagraph.AddText(paragraphs[i]);
      aContent.push(oParagraph);
    }

    var oDocument = Api.GetDocument();
    oDocument.InsertContent(aContent);
  });
}

五、场景三:Excel报表自动生成

5.1 业务需求

某数据分析平台需要将统计数据自动填入Excel模板并生成图表:

  • 每月自动生成销售报表
  • 将数据库中的统计数据写入对应的单元格
  • 根据数据自动更新图表
  • 生成后的报表可以供用户在线查看和下载

5.2 写入表格数据

// 销售数据
const salesData = [
  { month: "1月", revenue: 125000, cost: 89000, profit: 36000 },
  { month: "2月", revenue: 138000, cost: 92000, profit: 46000 },
  { month: "3月", revenue: 156000, cost: 98000, profit: 58000 },
  // ...
];

function fillExcelReport(connector, data) {
  // 将数据转为JSON字符串,嵌入到函数中
  const jsonStr = JSON.stringify(data);

  connector.callCommand(function () {
    var data = JSON.parse(jsonStr);
    var oWorksheet = Api.GetActiveSheet();

    // 写入表头
    oWorksheet.GetRange("A1").SetValue("月份");
    oWorksheet.GetRange("B1").SetValue("收入(元)");
    oWorksheet.GetRange("C1").SetValue("成本(元)");
    oWorksheet.GetRange("D1").SetValue("利润(元)");

    // 设置表头样式
    var headerRange = oWorksheet.GetRange("A1:D1");
    headerRange.SetBold(true);
    headerRange.SetFillColor(Api.CreateColorFromRGB(68, 114, 196));
    headerRange.SetFontColor(Api.CreateColorFromRGB(255, 255, 255));

    // 写入数据
    for (var i = 0; i < data.length; i++) {
      var row = i + 2;
      oWorksheet.GetRange("A" + row).SetValue(data[i].month);
      oWorksheet.GetRange("B" + row).SetValue(data[i].revenue);
      oWorksheet.GetRange("C" + row).SetValue(data[i].cost);
      oWorksheet.GetRange("D" + row).SetValue(data[i].profit);
    }

    // 设置数字格式
    var dataRows = data.length;
    oWorksheet.GetRange("B2:D" + (dataRows + 1)).SetNumberFormat("#,##0.00");

    // 自动调整列宽
    oWorksheet.GetRange("A1:D1").SetColumnWidth(15);
  });
}

5.3 自动创建图表

function createChart(connector, dataRowCount) {
  connector.callCommand(function () {
    var oWorksheet = Api.GetActiveSheet();

    // 创建柱状图
    var oChart = oWorksheet.AddChart(
      "'" + oWorksheet.GetName() + "'!$A$1:$D$" + (dataRowCount + 1),
      true,  // 按行
      "bar", // 图表类型
      2,     // 样式
      200 * 36000,   // 宽度(EMU)
      150 * 36000    // 高度(EMU)
    );

    oChart.SetTitle("月度销售报表", 12);
    oChart.SetLegendPos("bottom");

    // 将图表放置在数据下方
    oChart.SetPosition(oWorksheet, dataRowCount + 3, 0, 0, 0);
  });
}

5.4 完整工作流

async function generateMonthlyReport() {
  // 1. 从后端获取数据
  const response = await fetch("/api/reports/monthly-sales");
  const salesData = await response.json();

  // 2. 创建连接器
  const connector = docEditor.createConnector();

  // 3. 填充数据
  fillExcelReport(connector, salesData);

  // 4. 生成图表
  createChart(connector, salesData.length);

  // 5. 通知用户
  showNotification("报表生成完成");
}

六、连接器开发的最佳实践

6.1 数据传递

由于 callCommand 中的函数在编辑器沙箱中执行,不能直接引用外部变量。推荐的数据传递方式:

// 方式一:将数据序列化后拼接到函数体中
function setValueByConnector(connector, cellRef, value) {
  const safeValue = JSON.stringify(value);
  connector.callCommand(
    new Function(`
      var oSheet = Api.GetActiveSheet();
      oSheet.GetRange("${cellRef}").SetValue(${safeValue});
    `)
  );
}

// 方式二:使用 callCommand 的回调获取返回值
connector.callCommand(
  function () {
    return Api.GetDocument().GetStatistics();
  },
  false,
  function (stats) {
    console.log("文档统计:", stats);
  }
);

6.2 错误处理

function safeCallCommand(connector, fn, callback) {
  try {
    connector.callCommand(fn, false, function (result) {
      if (callback) callback(null, result);
    });
  } catch (error) {
    console.error("连接器调用失败:", error);
    if (callback) callback(error, null);
  }
}

6.3 性能优化

  • 批量操作:将多个操作合并到一次 callCommand 调用中,减少通信开销
  • 避免频繁调用:不要在循环中逐次调用 callCommand,应在单次调用中完成所有操作
  • 异步处理callCommand 是异步的,注意操作顺序的控制
// 不推荐:逐行调用
for (let i = 0; i < 1000; i++) {
  connector.callCommand(function () {
    // 写入一行数据
  });
}

// 推荐:一次性写入所有数据
connector.callCommand(function () {
  var oSheet = Api.GetActiveSheet();
  for (var i = 0; i < 1000; i++) {
    oSheet.GetRange("A" + (i + 1)).SetValue("data" + i);
  }
});

6.4 动态权限切换(中国版特有)

中国版自 9.3.0 版本开始支持通过连接器动态修改用户权限,无需重新打开文档即可实时生效。

使用场景

  • 审批流程中,审批人点击"开始审批"后自动切换为只读模式
  • 文档状态变化时,动态调整用户的编辑/复制/打印权限
  • 根据业务规则,在特定条件下限制用户操作

实现示例

// 创建连接器
const connector = docEditor.createConnector();

// 审批人点击"开始审批"按钮时,切换为只读+可评论
function onStartReview() {
  connector.callCommand(function () {
    Api.changePermissions({
      edit: false,
      comment: true,
      copy: true,
      copyOut: false,
      print: false
    });
  });
}

// 审批通过后,进入签署阶段,完全禁止操作
function onApproved() {
  connector.callCommand(function () {
    Api.changePermissions({
      edit: false,
      comment: false,
      copy: false,
      copyOut: false,
      print: false
    });
  });
}

// 审批驳回,退回给起草人编辑
function onRejected() {
  connector.callCommand(function () {
    Api.changePermissions({
      edit: true,
      comment: true,
      copy: true,
      copyOut: true,
      print: true
    });
  });
}

支持的权限字段

字段 说明 类型
comment 是否允许评论 Boolean
copy 是否允许复制 Boolean
copyOut 是否允许复制到外部(中国版特有) Boolean
edit 是否允许编辑 Boolean
print 是否允许打印 Boolean

注意:动态权限切换为高级版功能,目前仅支持 Word/Excel/PPT 的 PC 模式

6.5 与中国版增强功能的配合

连接器可以与中国版的多个增强功能组合使用,构建更强大的业务场景:

组合方式 典型场景 关键配置
连接器 + 用户只读模式 合同制作、公文套打 customization.readOnly: true
连接器 + 动态权限切换 审批流程中的权限流转 Api.changePermissions()
连接器 + 防截图水印 安全环境下的自动文档生成 customization.waterMark
连接器 + 内部剪切板 敏感数据填充后防止用户复制到外部 permissions.copyOut: false
连接器 + 迷你工具栏 简化用户编辑体验 customization.miniToolbar: true

七、与WPS JSSDK的对比

对于有国内办公套件集成经验的开发者,可能更熟悉 WPS 的 JSSDK。以下是两者的关键差异:

对比维度 OnlyOffice 连接器 WPS JSSDK
API丰富度 与插件接口相同,覆盖面广 提供标准化接口,覆盖常用场景
文档操作深度 可操作到段落、Run、样式等细粒度 以高层封装为主
私有化部署 完全支持 需要商业授权
学习成本 需了解 OOXML 模型 接口设计更面向业务
扩展性 插件 + 连接器双通道 SDK标准接口

OnlyOffice 连接器的优势在于更深的文档操作能力和完全的私有化支持,适合需要深度定制的企业级场景。

八、总结

OnlyOffice 中国版的连接器为业务系统与文档编辑器之间架起了一座桥梁。通过 JSAPI,外部系统可以像操作数据库一样操作文档内容——读取、写入、格式化、生成图表,一切都可以通过代码完成。

核心价值:

  • 合同生成:模板 + 数据 = 标准合同,告别手工填写
  • AI写作:大模型生成的内容无缝融入文档编辑流程
  • 报表自动化:数据驱动的文档生成,取代重复的手工操作
  • 流程驱动:文档操作与业务流程深度绑定,实现真正的自动化

连接器让 OnlyOffice 不再只是一个编辑器,而是业务系统中可编程的文档引擎。

相关资源

❌