阅读视图

发现新文章,点击刷新页面。

前端重生之 - 前端视角下的 Python

以前我认为 JavaScript 就是编程世界的全部。从 jQuery 时代的 DOM 操作,到 React/Vue 的组件化革命,再到 TypeScript 的类型安全,见证了前端技术的每一次跃迁。然而,AI 时代来临,人人都在喊转 “全栈“,所以我也开始真正深入 Python 的生态系统,才发现这不仅是两门语言的对话,更是两种编程哲学、两种技术文化的碰撞与融合。这篇文章,是我从前端视角重新审视 Python 的记录,也是我对技术本质的一次探索,接下来我还将从前端视角看 Java、Go、C# 等不同的后端的语言,可能会有错误的地方,欢迎指正,也欢迎关注我,后期还将有分析其他语言的文章,奥利给!


从 JS 的异步到 Python 的同步

1.1 事件循环的底层机制

前端对事件循环(Event Loop)的理解,往往始于浏览器中的 setTimeoutPromise。JavaScript 的单线程异步模型,是为了应对浏览器环境中用户交互、网络请求等 I/O 密集型场景而设计的。我们习惯了回调地狱的煎熬,也享受过 async/await 带来的语法糖甜蜜。但很少有人深入思考:为什么 JavaScript 必须是单线程的?这个设计选择背后的权衡是什么?

JavaScript 诞生于浏览器环境,而浏览器的核心职责是渲染页面和响应用户交互。如果 JavaScript 是多线程的,一个线程正在修改 DOM,另一个线程同时也在修改同一个 DOM 节点,就会产生竞争条件(Race Condition),导致不可预测的行为。为了避免这种复杂性,JavaScript 的设计者选择了单线程模型,并通过事件循环来实现异步非阻塞 I/O。

浏览器的事件循环可以简化为以下伪代码:

while (true) {
    // 1. 执行宏任务队列中的一个任务
    const macroTask = macroTaskQueue.shift();
    if (macroTask) execute(macroTask);
    
    // 2. 执行所有微任务
    while (microTaskQueue.length > 0) {
        const microTask = microTaskQueue.shift();
        execute(microTask);
    }
    
    // 3. 渲染(如果需要)
    if (shouldRender) render();
}

这个模型保证了 JavaScript 的执行顺序是可预测的:宏任务 → 微任务 → 渲染。Promise 的回调之所以比 setTimeout 先执行,就是因为它们被放入了微任务队列。

然而,当我第一次接触 Python 的 asyncio 时,一种奇妙的熟悉感与陌生感同时涌现。Python 的协程机制与 JavaScript 的 Promise 有着惊人的相似性,但底层哲学却截然不同。

1.2 Python asyncio 的设计哲学

Python 的 asyncio 是在 Python 3.4 中引入的,在 3.5 中通过 async/await 语法得到大幅改进。与 JavaScript 不同,Python 并不是天生单线程的——它有多线程(threading 模块)和多进程(multiprocessing 模块)的完整支持。asyncio 是 Python 对协程(Coroutine)这一并发模型的选择,而不是被迫的设计。

让我们深入对比两者的实现:

# Python asyncio 示例
import asyncio

async def fetch_data(url):
    print(f"开始请求: {url}")
    await asyncio.sleep(1)  # 模拟网络请求
    print(f"请求完成: {url}")
    return f"数据来自 {url}"

async def main():
    # 并发执行多个任务
    tasks = [
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
        fetch_data("https://api.example.com/3")
    ]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
// JavaScript 对比实现
async function fetchData(url) {
    console.log(`开始请求: ${url}`);
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log(`请求完成: ${url}`);
    return `数据来自 ${url}`;
}

async function main() {
    const tasks = [
        fetchData("https://api.example.com/1"),
        fetchData("https://api.example.com/2"),
        fetchData("https://api.example.com/3")
    ];
    const results = await Promise.all(tasks);
    console.log(results);
}

main();

表面看两者几乎相同,但底层实现有本质区别:

特性 JavaScript Python
事件循环 浏览器/Node 内置,不可替换 asyncio 库实现,可自定义
协程实现 基于 Promise 和微任务队列 基于生成器(Generator)和事件循环
线程模型 单线程 + 事件循环 多线程/多进程 + 可选的协程
GIL 影响 无(天生单线程) 有(多线程受 GIL 限制)
并发性能 适合 I/O 密集型 适合 I/O 密集型,CPU 密集型需用多进程

1.3 GIL:Python 的"阿喀琉斯之踵"

谈到 Python 的并发,就不能不提 GIL(Global Interpreter Lock,全局解释器锁)。GIL 是 CPython 实现中的一个机制,它确保任何时候只有一个线程在执行 Python 字节码。这意味着,即使在多核 CPU 上,Python 的多线程也无法实现真正的并行计算。

import threading
import time

def cpu_bound_task(n):
    """CPU 密集型任务"""
    count = 0
    for i in range(n):
        count += i * i
    return count

# 多线程版本(受 GIL 限制)
def multi_threaded():
    threads = []
    for _ in range(4):
        t = threading.Thread(target=cpu_bound_task, args=(10_000_000,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

# 多进程版本(绕过 GIL)
from multiprocessing import Process

def multi_process():
    processes = []
    for _ in range(4):
        p = Process(target=cpu_bound_task, args=(10_000_000,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

# 性能对比
start = time.time()
multi_threaded()
print(f"多线程耗时: {time.time() - start:.2f}秒")

start = time.time()
multi_process()
print(f"多进程耗时: {time.time() - start:.2f}秒")

在我的测试环境中(4 核 CPU),多线程版本耗时约 12 秒,而多进程版本仅需 3 秒。这就是 GIL 的影响——多线程在 CPU 密集型任务上无法发挥多核优势。

JavaScript 没有 GIL 的问题,因为它天生就是单线程的。但这也意味着 JavaScript 无法利用多核 CPU 进行并行计算——除非使用 Worker Threads(Node.js)或 Web Workers(浏览器),但这些机制与主线程是隔离的,通信成本较高。

1.4 编程范式的思维转换

JavaScript 是一门多范式语言,但前端开发中函数式编程的影子无处不在:mapfilterreduce 成为日常,Immutable.js 和 Ramda 这样的库广受欢迎。我们追求纯函数、避免副作用、崇尚不可变性。这种趋势在 React 的函数组件和 Hooks 中达到顶峰。

// React 函数组件 + Hooks(函数式风格)
import React, { useState, useEffect } from 'react';

function UserList({ users }) {
    const [filteredUsers, setFilteredUsers] = useState([]);
    
    useEffect(() => {
        const activeUsers = users
            .filter(u => u.isActive)
            .map(u => ({ ...u, name: u.name.toUpperCase() }));
        setFilteredUsers(activeUsers);
    }, [users]);
    
    return (
        <ul>
            {filteredUsers.map(u => <li key={u.id}>{u.name}</li>)}
        </ul>
    );
}

Python 则是一门 "batteries included" 的语言,它拥抱多种范式却从不偏执。在 Python 中,你可以写出优雅的函数式代码:

# Python 函数式风格
users = [
    {"id": 1, "name": "Alice", "is_active": True},
    {"id": 2, "name": "Bob", "is_active": False},
    {"id": 3, "name": "Charlie", "is_active": True}
]

# 函数式写法
filtered_users = list(
    map(
        lambda u: {**u, "name": u["name"].upper()},
        filter(lambda u: u["is_active"], users)
    )
)

# 但更 Pythonic 的方式是列表推导式
filtered_users = [
    {**u, "name": u["name"].upper()} 
    for u in users 
    if u["is_active"]
]

这种"列表推导式"的语法,是 Python 对函数式编程的本土化改造。它既保留了函数式的表达能力,又符合 Python 简洁优雅的设计哲学。

Python 还支持面向对象和命令式编程:

# Python 面向对象风格
class User:
    def __init__(self, id, name, is_active):
        self.id = id
        self.name = name
        self.is_active = is_active
    
    def activate(self):
        self.is_active = True
    
    def __repr__(self):
        return f"User({self.name})"

# 使用类
users = [User(1, "Alice", True), User(2, "Bob", False)]
for user in users:
    if not user.is_active:
        user.activate()

这让我反思:前端开发中是否过度追求函数式的"纯粹",而忽略了实用主义的平衡?React 的类组件被函数组件取代,但类组件在某些场景下(如复杂的生命周期管理)仍然有其优势。Python 的多范式支持提醒我们:没有最好的范式,只有最适合场景的范式。


类型系统——从动态到静态的考虑

2.1 TypeScript 的革命

2012 年,TypeScript 的诞生改变了前端开发的格局。作为 JavaScript 的超集,TypeScript 为动态语言带来了静态类型的严谨。今天,几乎所有大型前端项目都采用 TypeScript,类型安全已成为行业标准。

TypeScript 的成功不是偶然的。它解决了 JavaScript 开发中的几个核心痛点:

  1. 运行时错误前置:在编译阶段发现类型错误,而不是在生产环境崩溃
  2. IDE 支持:智能提示、自动补全、重构支持
  3. 文档即代码:类型定义就是最好的 API 文档
  4. 团队协作:类型约束作为团队间的契约
// TypeScript 示例
interface User {
    id: number;
    name: string;
    email?: string;  // 可选属性
}

function greet(user: User): string {
    return `Hello, ${user.name}`;
}

// 编译错误:类型不匹配
const result = greet({ id: "1", name: "Alice" });  // Error: id 应该是 number

2.2 Python 类型注解的演进

有趣的是,Python 的类型注解(Type Hints)几乎是与 TypeScript 同期发展的。PEP 484 在 2014 年引入类型注解,PEP 526 在 2016 年完善变量注解。两条平行线,却走向了相似的终点。

from typing import Optional, List, Dict

class User:
    def __init__(self, id: int, name: str, email: Optional[str] = None):
        self.id = id
        self.name = name
        self.email = email

def greet(user: User) -> str:
    return f"Hello, {user.name}"

# 类型检查工具(如 mypy)会在静态分析时报告错误
user = User(id="1", name="Alice")  # mypy: Argument "id" has incompatible type "str"; expected "int"

然而,TypeScript 和 Python 类型系统的底层哲学存在本质差异:

2.2.1 编译时 vs 运行时

TypeScript 的类型在编译时完全擦除,编译后的 JavaScript 不包含任何类型信息:

// TypeScript 源码
function add(a: number, b: number): number {
    return a + b;
}

// 编译后的 JavaScript
function add(a, b) {
    return a + b;
}

Python 的类型注解在运行时保留,但解释器不做强制检查:

# Python 源码
def add(a: int, b: int) -> int:
    return a + b

# 运行时可以通过 __annotations__ 访问类型信息
print(add.__annotations__)  # {'a': <class 'int'>, 'b': <class 'int'>, 'return': <class 'int'>}

# 但解释器不会检查类型
result = add("hello", "world")  # 正常运行,返回 "helloworld"

2.2.2 结构类型 vs 名义类型

TypeScript 采用结构类型系统(Structural Typing),也称为"鸭子类型"(Duck Typing):

interface Point {
    x: number;
    y: number;
}

function printPoint(p: Point) {
    console.log(`${p.x}, ${p.y}`);
}

// 只要结构匹配,就可以传递
printPoint({ x: 1, y: 2 });  // OK
printPoint({ x: 1, y: 2, z: 3 });  // OK(多余属性允许)

Python 的类型检查器(如 mypy)同样支持结构类型,通过 Protocol(PEP 544):

from typing import Protocol

class Point(Protocol):
    x: int
    y: int

def print_point(p: Point) -> None:
    print(f"{p.x}, {p.y}")

class MyPoint:
    def __init__(self, x: int, y: int):
        self.x = x
        self.y = y

# MyPoint 没有显式继承 Point,但结构匹配即可
print_point(MyPoint(1, 2))  # OK

2.2.3 类型推断

TypeScript 的类型推断更为激进:

// TypeScript 能推断出 arr 是 number[]
const arr = [1, 2, 3];

// 能推断出 result 是 number
const result = arr.map(x => x * 2).filter(x => x > 2);

Python 的类型推断相对保守,需要显式注解:

from typing import List

# Python 需要显式类型注解
arr: List[int] = [1, 2, 3]

# 或者让 mypy 推断(有限支持)
result = [x * 2 for x in arr if x > 2]  # mypy 能推断为 List[int]

2.3 渐进式类型的价值

这种对比从侧面来说:类型系统的价值不在于"正确性"本身,而在于它如何帮助团队协作和代码演进。Python 的渐进式类型(Gradual Typing)策略——允许在需要时添加类型,在灵活时保持动态——或许比 TypeScript 的"全有或全无"更加务实。

# Python 渐进式类型示例
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from models import User  # 仅在类型检查时导入

def process_user(user):  # 动态类型,灵活
    return user.name.upper()

def process_user_typed(user: "User") -> str:  # 静态类型,安全
    return user.name.upper()

在实际的开发中,我通常采用以下策略:

  • 公共 API 和核心模块使用完整的类型注解
  • 脚本和原型代码保持动态类型,快速迭代
  • 使用 mypy 在 CI 中检查关键模块的类型安全

生态系统

3.1 npm 与 PyPI:包管理的两种哲学

前端对于 npm 生态的复杂情感,可以用一句话概括:"node_modules 是世界上最重的东西"。JavaScript 的微包文化(left-pad 事件)和依赖,是每个前端的心头痛。

让我们看看一个典型的 React 项目的依赖树:

$ npm list | wc -l
# 输出可能超过 1000 行

$ du -sh node_modules
# 输出可能超过 500MB

这种依赖膨胀的原因是多方面的:

  1. 微包文化:JavaScript 生态倾向于将功能拆分为极小的包,一个左填充函数(left-pad)也能成为一个包
  2. 重复依赖:不同版本的同一个库可能同时存在
  3. 开发依赖混杂:构建工具、测试框架、类型定义都混在一起

2016 年的微包事件是一个标志性案例。一个只有 11 行代码的包被作者从 npm 下架,导致全球数千个项目无法构建。这暴露了微包文化的脆弱性。

Python 的包管理生态则呈现出不同的面貌。pip、conda、poetry、pipenv …… 工具超级多,但核心理念一致:显式优于隐式

# Python 的 requirements.txt
requests==2.28.1
numpy>=1.21.0
pandas~=1.5.0

这个文件明确告诉我们:

  • requests 必须严格等于 2.28.1 版本
  • numpy 可以是 1.21.0 或更高版本
  • pandas 可以是 1.5.x 系列(补丁版本可以变)

这种显式依赖管理的文化,让 Python 项目的可重现性远超 JavaScript。当你克隆一个 Python 项目,你知道需要安装什么;而当你克隆一个 Node.js 项目,node_modules 的深渊往往让人望而却步。

3.2 虚拟环境:Python 的隔离艺术

Python 的虚拟环境(virtualenv/venv)是包管理的另一大特色。每个项目可以有独立的 Python 环境和依赖,互不干扰。

# 创建虚拟环境
python -m venv myproject-env

# 激活虚拟环境
source myproject-env/bin/activate  # Linux/Mac
myproject-env\Scripts\activate  # Windows

# 安装依赖
pip install -r requirements.txt

# 退出虚拟环境
deactivate

这与 Node.js 的 node_modules 本地安装有相似之处,但更加彻底——虚拟环境甚至隔离了 Python 解释器本身。

现在 Python 项目更倾向于使用 pyproject.toml(PEP 518)来管理依赖:

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "my-project"
version = "0.1.0"
description = "A sample project"

[tool.poetry.dependencies]
python = "^3.10"
requests = "^2.28"
pydantic = "^1.10"

[tool.poetry.dev-dependencies]
pytest = "^7.0"
black = "^22.0"
mypy = "^0.991"

Poetry 不仅管理依赖,还管理虚拟环境、打包、发布,是一个完整的项目管理工具。这与 JavaScript 生态中 npm/yarn/pnpm 的竞争格局形成鲜明对比。

3.3 标准库的力量

Python 的 "batteries included" 哲学,在标准库中体现得淋漓尽致。从文件处理到网络编程,从正则表达式到 JSON 解析,从单元测试到并发编程——Python 标准库几乎覆盖了一个开发者 80% 的日常需求。

# Python 标准库示例
import json
import re
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
from unittest import TestCase, main

# JSON 处理
data = {"name": "Alice", "age": 30}
json_str = json.dumps(data, indent=2)

# 正则表达式
pattern = r"\b\w+@\w+\.\w+\b"
emails = re.findall(pattern, "Contact: alice@example.com, bob@test.org")

# 文件路径操作
config_path = Path.home() / ".config" / "myapp" / "settings.json"

# 日期时间
now = datetime.now()
future = now + timedelta(days=7)

# HTTP 请求
with urllib.request.urlopen("https://api.example.com/data") as response:
    data = json.loads(response.read())

相比之下,JavaScript 的标准库堪称贫瘠。直到 ES6 引入 Promise、fetch、模块化,JavaScript 才勉强跟上时代。但即便如此,lodash、axios、moment 依然是大多数项目的标配。

这种差异的根源在于语言的设计目标:

  • JavaScript 诞生于浏览器,被设计为轻量级脚本语言,依赖浏览器提供的 DOM API
  • Python 诞生于通用编程,被设计为"可执行的伪代码",需要在各种环境中独立运行

标准库的丰富程度,反映的是语言设计者对"开箱即用"的不同理解。

3.4 生态系统的成熟度对比

维度 JavaScript/npm Python/PyPI
包数量 200万+ 40万+
包平均大小 小(微包文化) 大(功能完整)
依赖管理 嵌套依赖(node_modules) 扁平依赖 + 虚拟环境
标准库 贫瘠 丰富("batteries included")
类型定义 @types/* 包 内置类型注解
安全审计 npm audit safety, pip-audit
私有仓库 Verdaccio, Nexus PyPI Enterprise, Devpi

数据科学的疆域的追赶

4.1 Python 的数据霸权

如果说前端是 JavaScript 的天下,那么数据科学就是 Python 的帝国。NumPy、Pandas、Matplotlib、Scikit-learn、TensorFlow、PyTorch——这些库构成了数据科学的完整工具链,而 Python 是它们的通用语言。

这种霸权不是偶然的。Python 的简洁语法、丰富的科学计算库、与 C/C++/Fortran 的良好互操作性,使其成为数据科学家的首选语言。

4.1.1 NumPy:向量化计算的威力

NumPy 是 Python 科学计算的基础。它提供了高效的多维数组对象和数学函数库,底层使用 C 实现,性能远超纯 Python。

import numpy as np

# 创建数组
arr = np.array([1, 2, 3, 4, 5])

# 向量化运算——比 Python 循环快 100 倍
result = arr * 2 + 1  # [3, 5, 7, 9, 11]

# 多维数组
matrix = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

# 矩阵运算
transposed = matrix.T
dot_product = matrix @ matrix.T

# 统计函数
mean = np.mean(arr)
std = np.std(arr)
max_val = np.max(matrix, axis=0)  # 每列的最大值

让我们做一个性能对比:

import time

# Python 原生列表
python_list = list(range(1_000_000))

start = time.time()
result = [x * 2 for x in python_list]
print(f"Python 列表推导式: {time.time() - start:.4f}秒")

# NumPy 数组
numpy_array = np.array(python_list)

start = time.time()
result = numpy_array * 2
print(f"NumPy 向量化运算: {time.time() - start:.4f}秒")

在我的电脑上,Python 列表推导式耗时约 0.08 秒,而 NumPy 仅需 0.001 秒——80 倍的性能差距!这是因为 NumPy 的运算是在 C 层面执行的,避免了 Python 的解释器开销。

4.1.2 Pandas:数据处理的艺术

如果说 NumPy 是数组计算的利器,Pandas 就是数据处理的瑞士军刀。它提供了 DataFrame 和 Series 两种数据结构,让数据清洗、转换、分析变得异常简单。

import pandas as pd

# 读取数据
df = pd.read_csv('sales_data.csv')

# 数据清洗
df = df.dropna()  # 删除缺失值
df = df[df['price'] > 0]  # 过滤异常值
df['date'] = pd.to_datetime(df['date'])  # 类型转换

# 数据转换
df['revenue'] = df['price'] * df['quantity']
df['month'] = df['date'].dt.month

# 分组聚合
monthly_sales = df.groupby('month').agg({
    'revenue': 'sum',
    'quantity': 'mean'
}).reset_index()

# 透视表
pivot = df.pivot_table(
    values='revenue',
    index='category',
    columns='month',
    aggfunc='sum'
)

# 合并数据
merged = pd.merge(df, customer_df, on='customer_id', how='left')

这段代码如果用 JavaScript 实现,需要多少行?lodash 可以处理数组,但没有原生的 DataFrame 概念。D3.js 可以做数据转换,但学习曲线陡峭。Pandas 的链式操作让复杂的数据处理变得可读、可维护。

4.1.3 数据可视化

Matplotlib 和 Seaborn 是 Python 数据可视化的主力军:

import matplotlib.pyplot as plt
import seaborn as sns

# 设置样式
sns.set_style("whitegrid")

# 创建图表
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# 折线图
df.groupby('date')['revenue'].sum().plot(ax=axes[0, 0], title='Daily Revenue')

# 柱状图
df['category'].value_counts().plot(kind='bar', ax=axes[0, 1], title='Category Distribution')

# 散点图
axes[1, 0].scatter(df['price'], df['quantity'], alpha=0.5)
axes[1, 0].set_title('Price vs Quantity')

# 热力图
corr = df[['price', 'quantity', 'revenue']].corr()
sns.heatmap(corr, annot=True, ax=axes[1, 1], title='Correlation Matrix')

plt.tight_layout()
plt.savefig('analysis.png', dpi=300)

前端可能会说:"这些用 D3.js 也能做,而且交互性更强。"没错,D3.js 的交互能力是 Matplotlib 无法比拟的。但 Matplotlib 的优势在于快速探索和静态报告——数据分析不需要为每个图表写 200 行 D3 代码。

4.2 前端的数据觉醒

幸运的是,前端世界正在觉醒。TensorFlow.js、ONNX.js、Apache Arrow JS——这些项目正在把数据科学的能力带入浏览器。

4.2.1 TensorFlow.js

TensorFlow.js 让机器学习模型可以在浏览器中运行:

import * as tf from '@tensorflow/tfjs';

// 创建一个简单的神经网络
const model = tf.sequential({
    layers: [
        tf.layers.dense({ inputShape: [784], units: 32, activation: 'relu' }),
        tf.layers.dense({ units: 10, activation: 'softmax' })
    ]
});

model.compile({
    optimizer: 'adam',
    loss: 'categoricalCrossentropy',
    metrics: ['accuracy']
});

// 训练模型(在浏览器中)
await model.fit(xs, ys, {
    epochs: 10,
    batchSize: 32,
    callbacks: {
        onEpochEnd: (epoch, logs) => {
            console.log(`Epoch ${epoch}: loss = ${logs.loss}`);
        }
    }
});

// 预测
const prediction = model.predict(newImage);

这意味着前端可以在用户设备上运行机器学习模型,无需服务器参与。隐私保护、低延迟、离线可用——这些是服务端推理无法比拟的优势。

4.2.2 Apache Arrow JS

Apache Arrow 是一种跨语言的列式内存格式,Arrow JS 让 JavaScript 可以高效地处理大规模数据:

import { Table, FloatVector } from 'apache-arrow';

// 创建 Arrow 表
const table = Table.new(
    [
        FloatVector.from([1, 2, 3, 4, 5]),
        FloatVector.from([10, 20, 30, 40, 50])
    ],
    ['x', 'y']
);

// 高效查询
const sum = table.getColumn('y').toArray().reduce((a, b) => a + b, 0);

Arrow 的列式存储格式让数据在 Python、JavaScript、R、Julia 之间零拷贝传输成为可能。这对于前后端数据交互是一个革命性的改进。

4.3 前端的数据科学学习路径

但更深层的思考是:前端是否应该掌握数据科学的能力?

我的答案是肯定的。现代前端不再是简单的页面展示,而是数据驱动的交互应用。理解数据处理、理解机器学习的基本原理,将成为高级前端的必备技能。

Web 开发的殊途同归

5.1 Django vs Express:两种架构哲学

Django 是 Python Web 开发的旗舰框架,它的哲学是"约定优于配置"。ORM、表单处理、认证系统、管理后台——Django 提供了一站式解决方案。这种"全功能框架"的思路,让开发者可以快速搭建复杂的 Web 应用。

# Django 模型定义
from django.db import models
from django.contrib.auth.models import User

class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        ordering = ['-created_at']

# Django 视图
from django.shortcuts import render, get_object_or_404
from rest_framework import viewsets
from rest_framework.decorators import action

class ArticleViewSet(viewsets.ModelViewSet):
    queryset = Article.objects.all()
    serializer_class = ArticleSerializer
    
    @action(detail=True, methods=['post'])
    def publish(self, request, pk=None):
        article = self.get_object()
        article.publish()
        return Response({'status': 'published'})

Django 的优势在于:

  • 快速开发:内置的 admin 界面让 CRUD 操作无需额外代码
  • 安全性:内置 CSRF 保护、SQL 注入防护、XSS 过滤
  • 可扩展性:丰富的第三方应用生态

但 Django 也有其局限性:

  • 灵活性不足:Django 的"全功能"意味着你必须按照它的方式做事
  • 学习曲线陡峭:需要理解 ORM、视图、模板、中间件等多个概念
  • 性能开销:大而全的框架必然带来性能损耗

Express.js 则是 Node.js 世界的微框架代表:

const express = require('express');
const mongoose = require('mongoose');

const app = express();
app.use(express.json());

// 模型定义(使用 Mongoose)
const articleSchema = new mongoose.Schema({
    title: String,
    content: String,
    author: { type: mongoose.Schema.Types.ObjectId, ref: 'User' },
    createdAt: { type: Date, default: Date.now }
});

const Article = mongoose.model('Article', articleSchema);

// 路由
app.get('/api/articles', async (req, res) => {
    const articles = await Article.find().sort({ createdAt: -1 });
    res.json(articles);
});

app.post('/api/articles', async (req, res) => {
    const article = new Article(req.body);
    await article.save();
    res.status(201).json(article);
});

app.listen(3000);

Express 的优势在于:

  • 灵活性:只提供基础功能,其他由你选择
  • 学习曲线平缓:理解中间件概念后即可上手
  • 性能:轻量级框架,开销小

但 Express 的灵活性也带来了问题:

  • 选择困难症:ORM 用 Sequelize、TypeORM 还是 Prisma?验证用 Joi、Yup 还是 class-validator?
  • 项目结构不一致:每个 Express 项目的结构都可能不同
  • 重复造轮子:很多功能需要自己实现或选择第三方库
维度 Django Express.js
架构风格 全功能框架(" batteries included ") 微框架
ORM 内置 Django ORM 需额外选择(Sequelize/TypeORM/Prisma)
认证授权 内置 需额外实现(Passport.js 等)
管理后台 内置 admin
学习曲线 陡峭 平缓
灵活性 较低
适用场景 大型项目、快速原型 中小型项目、API 服务
性能 中等

5.2 FastAPI:Python 的现代答案

如果说 Django 是 Python 的 Spring,那么 FastAPI 就是 Python 的 NestJS。FastAPI 采用声明式编程、依赖注入、类型注解,它的设计哲学与 TypeScript 生态高度契合。

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

# 数据库设置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 数据库模型
class UserDB(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)

# Pydantic 模型(用于 API 验证)
class UserBase(BaseModel):
    name: str
    email: str

class UserCreate(UserBase):
    pass

class User(UserBase):
    id: int
    
    class Config:
        orm_mode = True

# FastAPI 应用
app = FastAPI(title="User API", version="1.0.0")

# 依赖注入:数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 路由
@app.post("/users/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = UserDB(**user.dict())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.get("/users/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = db.query(UserDB).offset(skip).limit(limit).all()
    return users

@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(UserDB).filter(UserDB.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user

这段代码的优雅让我惊叹:

  1. 类型安全:Pydantic 模型自动验证请求和响应
  2. 自动文档:访问 /docs 即可获得 Swagger UI 文档
  3. 异步支持:原生支持 async/await
  4. 依赖注入Depends 让代码解耦、可测试

对比 TypeScript 的 NestJS:

// NestJS 对比
import { Controller, Get, Post, Body, Param } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';

class CreateUserDto {
    name: string;
    email: string;
}

@Controller('users')
export class UserController {
    constructor(
        @InjectRepository(User)
        private userRepository: Repository<User>
    ) {}
    
    @Post()
    async create(@Body() createUserDto: CreateUserDto) {
        const user = this.userRepository.create(createUserDto);
        return this.userRepository.save(user);
    }
    
    @Get()
    async findAll() {
        return this.userRepository.find();
    }
}

FastAPI 和 NestJS 的设计如此相似——装饰器路由、依赖注入、DTO 验证、ORM 集成。这或许预示着 Web 开发的未来:语言的边界正在模糊,好的设计理念会被跨语言借鉴。

5.3 性能对比:Node.js vs Python

让我们做一个简单的性能测试,对比 Node.js 和 Python 处理 HTTP 请求的能力:

Node.js (Express)

const express = require('express');
const app = express();

app.get('/api/data', (req, res) => {
    // 模拟数据库查询
    const data = Array.from({ length: 1000 }, (_, i) => ({
        id: i,
        value: Math.random()
    }));
    res.json(data);
});

app.listen(3000);

Python (FastAPI)

from fastapi import FastAPI
import random

app = FastAPI()

@app.get("/api/data")
async def get_data():
    data = [
        {"id": i, "value": random.random()}
        for i in range(1000)
    ]
    return data

# 使用 uvicorn 运行:uvicorn main:app --workers 4

使用 wrk 进行压力测试:

# Node.js
wrk -t12 -c400 -d30s http://localhost:3000/api/data
# Requests/sec:  15000

# Python (单 worker)
wrk -t12 -c400 -d30s http://localhost:8000/api/data
# Requests/sec:   8000

# Python (4 workers)
wrk -t12 -c400 -d30s http://localhost:8000/api/data
# Requests/sec:  25000

结果挺让人惊讶的:在单 worker 模式下,Node.js 的性能是 Python 的 2 倍。但当 Python 使用多 worker(利用多核 CPU)时,性能反超 Node.js。这说明:

  1. 单线程性能:Node.js 的 V8 引擎优于 Python 的解释器
  2. 多核利用:Python 的多进程模型可以充分利用多核 CPU
  3. 场景选择:I/O 密集型任务两者差距不大,CPU 密集型任务需要多进程

融会贯通

6.1 语言只是工具,思维才是核心

深入 Python 之后,我越来越确信一个观点:编程语言的差异,远不如编程思维的差异重要。无论是 JavaScript 还是 Python,优秀的代码都遵循相同的原则:

6.1.1 SOLID 原则

单一职责原则(Single Responsibility Principle)

# 不好的设计:一个类做太多事
class UserManager:
    def create_user(self, data): ...
    def send_email(self, user): ...
    def generate_report(self): ...

# 好的设计:职责分离
class UserService:
    def create_user(self, data): ...

class EmailService:
    def send_email(self, user): ...

class ReportService:
    def generate_report(self): ...

开闭原则(Open/Closed Principle)

from abc import ABC, abstractmethod

# 抽象基类
class PaymentProcessor(ABC):
    @abstractmethod
    def process(self, amount: float) -> bool:
        pass

# 具体实现
class AlipayProcessor(PaymentProcessor):
    def process(self, amount: float) -> bool:
        # 支付宝支付逻辑
        return True

class WechatProcessor(PaymentProcessor):
    def process(self, amount: float) -> bool:
        # 微信支付逻辑
        return True

# 使用
class PaymentService:
    def __init__(self, processor: PaymentProcessor):
        self.processor = processor
    
    def pay(self, amount: float) -> bool:
        return self.processor.process(amount)

# 新增支付方式无需修改现有代码
class StripeProcessor(PaymentProcessor):
    def process(self, amount: float) -> bool:
        return True

6.1.2 设计模式

设计模式是跨语言的。无论是 JavaScript 还是 Python,观察者模式、工厂模式、策略模式等都有相似的实现:

# Python 观察者模式
from typing import List, Callable

class EventEmitter:
    def __init__(self):
        self._listeners: dict[str, List[Callable]] = {}
    
    def on(self, event: str, callback: Callable):
        if event not in self._listeners:
            self._listeners[event] = []
        self._listeners[event].append(callback)
    
    def emit(self, event: str, *args, **kwargs):
        for callback in self._listeners.get(event, []):
            callback(*args, **kwargs)

# 使用
emitter = EventEmitter()
emitter.on('user_created', lambda user: print(f"User created: {user}"))
emitter.emit('user_created', {'name': 'Alice'})
// JavaScript 观察者模式(几乎相同)
class EventEmitter {
    constructor() {
        this.listeners = {};
    }
    
    on(event, callback) {
        if (!this.listeners[event]) {
            this.listeners[event] = [];
        }
        this.listeners[event].push(callback);
    }
    
    emit(event, ...args) {
        (this.listeners[event] || []).forEach(cb => cb(...args));
    }
}

6.1.3 编程思维的培养

掌握多门语言的价值,不在于"技多不压身",而在于从不同视角理解这些原则,形成更全面的技术判断力。

  • JavaScript 教会我:异步编程、函数式思维、事件驱动
  • Python 教会我:简洁优雅、实用主义、科学计算
  • TypeScript 教会我:类型安全、接口设计、静态分析

6.2 未来的融合趋势

技术发展的趋势是融合而非对立。我们看到 Python 和 JavaScript 都在做的事情:

6.2.1 WebAssembly 的崛起

WebAssembly(Wasm)让 Python 可以在浏览器中运行:

# 使用 Pyodide 在浏览器中运行 Python
import micropip
await micropip.install('numpy')

import numpy as np
arr = np.array([1, 2, 3, 4, 5])
result = arr * 2

这意味着前端可以在浏览器中使用 Python 的数据处理能力,而无需服务器参与。

6.2.2 PyScript 的革命

PyScript 让 Python 可以直接嵌入 HTML:

<!DOCTYPE html>
<html>
<head>
    <link rel="stylesheet" href="https://pyscript.net/latest/pyscript.css" />
    <script defer src="https://pyscript.net/latest/pyscript.js"></script>
</head>
<body>
    <div id="output"></div>
    
    <py-script>
        from js import document
        import numpy as np
        
        arr = np.array([1, 2, 3, 4, 5])
        result = arr * 2
        
        output = document.getElementById('output')
        output.innerHTML = f"Result: {result.tolist()}"
    </py-script>
</body>
</html>

6.2.3 跨语言借鉴

  • Node.js 的 worker_threads 借鉴了 Python 的多进程模型
  • TypeScript 的类型系统影响了 Python 的类型注解设计
  • Rust 的所有权系统正在影响 JavaScript 和 Python 的内存管理思路

6.2.4 全栈工程师的新定义

未来的工程师,可能不再被"前端"或"后端"的标签所限制。他们会根据场景选择最合适的工具:

  • Python 处理数据、训练模型、编写自动化脚本
  • JavaScript/TypeScript 构建用户界面、实现交互逻辑
  • Rust 编写高性能模块、系统级工具
  • Go 构建微服务、高并发后端

这不是"全栈"的泛化,而是技术能力的深化。真正的技术专家,不是掌握最多语言的人,而是知道何时使用哪门语言的人。


结语

写完这篇文章,我想起了一个古老的比喻:

"如果你手里只有一把锤子,那么所有问题看起来都像钉子。"

JavaScript 是我手中的第一把锤子,它帮助我构建了无数精彩的 Web 应用。从简单的页面交互到复杂的单页应用,从 jQuery 到 React,从回调地狱到 async/await——JavaScript 陪伴我走过了前端技术的每一个阶段。

但 Python 让我看到了另一片天空:

  • 数据科学的深邃:NumPy、Pandas、Scikit-learn 让数据处理变得优雅而高效
  • 自动化的便捷:几行 Python 脚本可以替代 hours of manual work
  • 科学计算的严谨:从物理模拟到金融建模,Python 是科学家的首选语言
  • Web 开发的简洁:FastAPI 的设计哲学让我重新审视"好的代码"的定义

这两门语言不是竞争对手,而是互补的伙伴。

技术的深度,来自于对一门语言的精通;技术的广度,来自于对多门语言的理解。而技术的智慧,来自于知道何时使用哪一门语言,加油,奥利给!

手撕 Claude Code-4: TodoWrite 与任务系统

第 4 章:TodoWrite 与任务系统

源码位置:src/tools/TodoWriteTool/src/tasks/src/utils/tasks.ts


4.1 两个容易混淆的概念

Claude Code 中有两个相关但不同的"任务"概念:

概念 说明 存储位置
Todo(任务清单) Claude 在当前会话中规划的工作步骤 AppState.todos
Task(系统任务) 运行时管理的实体(子代理、Shell 命令、后台会话等) AppState.tasks

本章先讲 TodoWrite(任务清单),再讲 Task System(系统任务)。


4.2 TodoWrite:Claude 的工作计划本

4.2.1 数据模型

// src/utils/todo/types.ts
type TodoItem = {
  content: string                                    // 任务描述
  status: 'pending' | 'in_progress' | 'completed'
  activeForm: string                                 // 任务的主动形式(如 "Reading src/foo.ts")
}

type TodoList = TodoItem[]

4.2.2 工具实现

源码位置:src/tools/TodoWriteTool/TodoWriteTool.ts:65

async call({ todos }, context) {
  const appState = context.getAppState()
  
  // 每个代理有自己的 todo 列表(用 agentId 或 sessionId 区分)
  const todoKey = context.agentId ?? getSessionId()
  const oldTodos = appState.todos[todoKey] ?? []
  
  // 关键逻辑:如果所有任务都已完成,AppState 中清空为 []
  const allDone = todos.every(_ => _.status === 'completed')
  const newTodos = allDone ? [] : todos

  // 验证提醒:仅当 allDone && 3+ 项 && 无验证步骤时才触发
  let verificationNudgeNeeded = false
  if (
    feature('VERIFICATION_AGENT') &&
    getFeatureValue_CACHED_MAY_BE_STALE('tengu_hive_evidence', false) &&
    !context.agentId &&   // 只在主线程触发,不在子代理
    allDone &&            // 必须是关闭列表的那一刻
    todos.length >= 3 &&
    !todos.some(t => /verif/i.test(t.content))
  ) {
    verificationNudgeNeeded = true
  }

  // 注意:API 是 setAppState(接收 prev → newState 函数),不是 updateAppState
  context.setAppState(prev => ({
    ...prev,
    todos: { ...prev.todos, [todoKey]: newTodos },
  }))

  // 返回值中 newTodos 是原始输入 todos(非清空后的 [])
  return {
    data: { oldTodos, newTodos: todos, verificationNudgeNeeded },
  }
}

4.2.3 关键设计点

1. 所有完成即清空

当所有 todo 都标记为 completed,AppState 中的列表被清空为 [] 而不是保留所有已完成项。这避免了 token 浪费(每次 API 调用都要序列化已完成的任务)。注意:工具返回值中的 newTodos 字段仍为原始 todos 输入,清空仅影响 AppState 存储层。

2. agentId 隔离

每个子代理有独立的 todo 列表(通过 agentId 作为 key)。主线程用 sessionId。这样并行运行的子代理不会互相干扰。

3. shouldDefer: true

TodoWrite 被标记为延迟披露,不在每次 API 请求中直接包含其 Schema。Claude 通过 ToolSearch 找到它。

4. 与 TodoV2(Task API)互斥

源码位置:src/tools/TodoWriteTool/TodoWriteTool.ts:53 中:

isEnabled() {
  return !isTodoV2Enabled()
}

源码位置:src/utils/tasks.ts:133 isTodoV2Enabled() 在以下情况返回 true(即 TodoWrite 被禁用):

  • 环境变量 CLAUDE_CODE_ENABLE_TASKS=1
  • 非交互式会话(SDK 模式)

这意味着 SDK 用户默认使用 Task API 而非 TodoWrite。

5. verificationNudgeNeeded

这是一个"行为引导"机制。触发条件(需同时满足):

  • Feature flag VERIFICATION_AGENT 已开启(编译时 gate)
  • GrowthBook flag tengu_hive_evidence 为 true
  • 当前在主线程(!context.agentId
  • 当前调用正在关闭整个列表(allDone === true
  • 任务数 ≥ 3
  • 没有任何任务内容匹配 /verif/i

满足时,工具结果会追加提示,要求 Claude 在写最终摘要前先 spawn 验证代理。


4.3 Task System:运行时实体管理

Task System 管理着 Claude Code 中所有的"运行中的实体"。

4.3.0 TaskStateBase:所有任务的公共基础

源码位置:src/Task.ts:45

type TaskStateBase = {
  id: string           // 任务 ID(带类型前缀,见下表)
  type: TaskType       // 'local_bash' | 'local_agent' | 'remote_agent' | 'in_process_teammate' | 'local_workflow' | 'monitor_mcp' | 'dream'
  status: TaskStatus   // 'pending' | 'running' | 'completed' | 'failed' | 'killed'
  description: string
  toolUseId?: string   // 触发此任务的 tool_use block ID(用于通知回调)
  startTime: number
  endTime?: number
  totalPausedMs?: number
  outputFile: string   // 磁盘输出文件路径(快照,/clear 后不变)
  outputOffset: number // 已读取字节偏移量(用于增量 delta 读取)
  notified: boolean    // 是否已发送完成通知(防止重复通知)
}

Task ID 前缀表(src/Task.ts:79):

类型 前缀 示例
local_bash b b3f9a2c1
local_agent a a7d8e4f2
remote_agent r r2a1b3c4
in_process_teammate t t5e6f7a8
local_workflow w w9b0c1d2
monitor_mcp m m3e4f5a6
dream d d7b8c9d0
主会话(特殊) s s1a2b3c4

local_agent 和主会话后台化共用 a 前缀(LocalAgentTaskState),但主会话后台化通过 LocalMainSessionTask.ts 中独立的 generateMainSessionTaskId() 使用 s 前缀,与普通子代理区分。

4.3.1 TaskState 联合类型

源码位置:src/tasks/types.ts

// 注意:类型名是 TaskState,不是 Task
type TaskState =
  | LocalShellTaskState        // 长运行 Shell 命令
  | LocalAgentTaskState        // 本地子代理(含后台化主会话)
  | RemoteAgentTaskState       // 远程代理
  | InProcessTeammateTaskState // in-process 队友(团队协作)
  | LocalWorkflowTaskState     // 工作流
  | MonitorMcpTaskState        // MCP 监控
  | DreamTaskState             // 记忆整合子代理(auto-dream)

注意LocalMainSessionTask 不是独立的联合类型成员。它是 LocalAgentTaskState 的子类型:

// src/tasks/LocalMainSessionTask.ts
type LocalMainSessionTaskState = LocalAgentTaskState & { agentType: 'main-session' }

这意味着后台化主会话与普通子代理共用同一个 type: 'local_agent' 标识符,通过 agentType 字段区分。

4.3.2 主会话后台化(LocalMainSessionTask)

这是一个特殊功能:用户按 Ctrl+B 两次可以将当前对话"后台化",然后开始新的对话。后台化后,原查询继续在后台独立运行。

源码位置:src/tasks/LocalMainSessionTask.ts

// LocalMainSessionTaskState 不是独立类型,而是 LocalAgentTaskState 的子类型
type LocalMainSessionTaskState = LocalAgentTaskState & {
  agentType: 'main-session'  // 唯一区分标识
}
// type: 'local_agent'(继承)
// taskId: 's' 前缀(区分普通代理的 'a' 前缀)
// messages?: Message[](继承,存储后台查询的消息历史)
// isBackgrounded: boolean(继承,true = 后台,false = 前台展示中)
// 无独立 transcript 字段

关键函数:

// 注册后台会话任务,返回 { taskId, abortSignal }
registerMainSessionTask(description, setAppState, agentDefinition?, abortController?)

// 启动真正的后台查询(wrap runWithAgentContext + query())
startBackgroundSession({ messages, queryParams, description, setAppState })

// 将后台任务切回前台展示
foregroundMainSessionTask(taskId, setAppState): Message[]

为什么需要隔离的转录路径? 后台任务使用 getAgentTranscriptPath(agentId) 而不是主会话的路径。若用同一路径,/clear 会意外覆盖后台会话数据。后台任务通过 initTaskOutputAsSymlink() 创建软链接,/clear 重链接时不影响后台任务的历史记录。

4.3.3 本地代理任务(LocalAgentTask)

每个子代理对应一个 LocalAgentTaskState

源码位置:src/tasks/LocalAgentTask/LocalAgentTask.tsx:116

type LocalAgentTaskState = TaskStateBase & {
  type: 'local_agent'
  agentId: string
  prompt: string
  selectedAgent?: AgentDefinition
  agentType: string          // 区分子类型,'main-session' = 后台化主会话
  model?: string
  abortController?: AbortController
  error?: string
  result?: AgentToolResult
  progress?: AgentProgress   // 进度信息(包含 toolUseCount、tokenCount)
  retrieved: boolean         // 结果是否已被取回
  messages?: Message[]       // 代理的消息历史(UI 展示用)
  lastReportedToolCount: number
  lastReportedTokenCount: number
  isBackgrounded: boolean    // false = 前台展示中,true = 后台运行
  pendingMessages: string[]  // SendMessage 排队的消息,在工具轮边界处理
  retain: boolean            // UI 持有此任务(阻止驱逐)
  diskLoaded: boolean        // 是否已从磁盘加载 sidechain JSONL
  evictAfter?: number        // 驱逐时间戳(任务完成后设置)
}

注意:文档中常见误写 toolUseCount 为 LocalAgentTaskState 的直接字段,实际它在 progress.toolUseCount 中。status 字段继承自 TaskStateBase'running' | 'completed' | 'failed' | 'stopped')。

进度追踪通过专门的辅助函数:

// src/tasks/LocalAgentTask/LocalAgentTask.tsx
createProgressTracker()           // 初始化 ProgressTracker(分别追踪 input/output tokens)
updateProgressFromMessage()       // 从 assistant message 累积 token 和工具调用数
getProgressUpdate()               // 生成 AgentProgress 快照(供 UI 消费)
createActivityDescriptionResolver() // 通过 tool.getActivityDescription() 生成人类可读描述

Token 追踪的精妙设计ProgressTracker 分开存储 latestInputTokens(Claude API 累积值,取最新)和 cumulativeOutputTokens(逐轮累加),避免重复计数。

4.3.4 in-process 队友任务(InProcessTeammateTask)

这是 Agent Teams 的核心数据结构:

src/tasks/InProcessTeammateTask/types.ts

type TeammateIdentity = {
  agentId: string        // e.g., "researcher@my-team"
  agentName: string      // e.g., "researcher"
  teamName: string
  color?: string
  planModeRequired: boolean
  parentSessionId: string  // Leader 的 sessionId
}

type InProcessTeammateTaskState = TaskStateBase & {
  type: 'in_process_teammate'
  identity: TeammateIdentity        // 队友身份(存储在 AppState 中的 plain data)
  prompt: string
  model?: string
  selectedAgent?: AgentDefinition
  abortController?: AbortController         // 终止整个队友
  currentWorkAbortController?: AbortController  // 终止当前轮次
  awaitingPlanApproval: boolean             // 是否等待 plan 审批
  permissionMode: PermissionMode            // 独立权限模式(Shift+Tab 切换)
  error?: string
  result?: AgentToolResult
  progress?: AgentProgress
  messages?: Message[]              // UI 展示用(上限 50 条,TEAMMATE_MESSAGES_UI_CAP)
  pendingUserMessages: string[]     // 查看该队友时用户输入的队列消息
  isIdle: boolean                   // 是否处于空闲(等待 leader 指令)
  shutdownRequested: boolean        // 是否已请求关闭
  lastReportedToolCount: number
  lastReportedTokenCount: number
}

常见误解mailbox 字段不存在于 InProcessTeammateTaskState。队友间的通信邮箱存储在运行时上下文 teamContext.inProcessMailboxes(AsyncLocalStorage 中),不在 AppState 里。pendingUserMessages 是用户从 UI 发给该队友的消息队列,与邮箱是两回事。

内存上限设计messages 字段上限 50 条(TEAMMATE_MESSAGES_UI_CAP),超出后从头部截断。原因是生产环境出现过单个 whale session 启动 292 个 agent、内存达 36.8GB 的情况,根本原因正是此字段持有第二份完整消息副本。

4.3.5 记忆整合任务(DreamTask)

源码位置:src/tasks/DreamTask/DreamTask.ts

type DreamTaskState = TaskStateBase & {
  type: 'dream'
  phase: 'starting' | 'updating'    // starting → updating(首个 Edit/Write 后翻转)
  sessionsReviewing: number          // 正在整理的会话数量
  filesTouched: string[]             // 被 Edit/Write 触碰的文件(不完整,仅 pattern-match 到的)
  turns: DreamTurn[]                 // assistant 轮次(工具调用折叠为计数)
  abortController?: AbortController
  priorMtime: number                 // 用于 kill 时回滚 consolidationLock 时间戳
}

DreamTask 是"auto-dream"记忆整合的 UI 表面层。它不改变子代理运行逻辑,只是让原本不可见的 fork agent 在 footer pill 和 Shift+Down 对话框中可见。子代理按 4 阶段 prompt 运行(orient → gather → consolidate → prune),但 DreamTask 不解析阶段,只通过工具调用类型推断 phase。


4.4 任务注册与生命周期框架

源码位置:src/utils/task/framework.ts

4.4.1 registerTask:注册与恢复

registerTask(task: TaskState, setAppState): void

注册一个新任务时,有两条路径:

新建existing === undefined):

  1. 将 task 写入 AppState.tasks[task.id]
  2. 向 SDK 事件队列发出 task_started 事件

恢复/替换existing !== undefined,如 resumeAgentBackground):

  1. 合并保留以下 UI 状态(避免用户正在查看的面板闪烁):
    • retain:UI 持有标记
    • startTime:面板排序稳定性
    • messages:用户刚发送的消息还未落盘
    • diskLoaded:避免重复加载 sidechain JSONL
    • pendingMessages:待处理消息队列
  2. 发出 task_started(防止 SDK 重复计数)

4.4.2 任务完成通知(XML 格式)

子代理/后台任务完成时,通过 enqueuePendingNotification 将 XML 推入消息队列:

<task_notification>
  <task_id>a7d8e4f2</task_id>
  <tool_use_id>toolu_01xxx</tool_use_id>  <!-- 可选 -->
  <output_file>/tmp/.../tasks/a7d8e4f2.output</output_file>
  <status>completed</status>
  <summary>Task "修复登录 bug" completed successfully</summary>
</task_notification>

这段 XML 在下一轮 API 调用前作为 user 消息被注入 messages,使 LLM 感知到后台任务完成。

4.4.3 evictTerminalTask:两级驱逐

任务完成后并不立即从 AppState.tasks 中删除,而是两级驱逐:

任务完成 → status='completed' + notified=true
  │
  ├─ 如果 retain=true 或 evictAfter > Date.now()
  │    → 保留(UI 正在展示,等待 30s grace period 后驱逐)
  │
  └─ 否则 → 立即从 AppState.tasks 中删除(eagerly evict)
               作为保底,generateTaskAttachments() 也会在下次 poll 时驱逐

PANEL_GRACE_MS = 30_000(30秒)是 coordinator panel 中 agent 任务的展示宽限期,确保用户能看到结果后再消失。


4.5 后台 API 任务工具

用户(通过 Claude)可以创建和管理后台任务:

// TaskCreateTool / TaskUpdateTool / TaskStopTool / TaskGetTool / TaskListTool

这些工具允许 Claude 自己创建和监控后台任务,实现真正的异步多任务处理。注意:这套 Task API 工具仅在 TodoV2 模式下启用(即 isTodoV2Enabled() === true),与 TodoWrite 互斥。


4.6 任务输出的磁盘管理

源码位置:src/utils/task/diskOutput.ts

4.6.1 输出文件路径

// 注意:路径不是 .claude/tasks/,而是项目临时目录下的会话子目录
getTaskOutputPath(taskId) → `{projectTempDir}/{sessionId}/tasks/{taskId}.output`

为什么包含 sessionId? 防止同一项目的并发 Claude Code 会话互相踩踏输出文件。路径在首次调用时被 memoize(let _taskOutputDir),/clear 触发 regenerateSessionId() 时不会重新计算,确保跨 /clear 存活的后台任务仍能找到自己的文件。

4.6.2 DiskTaskOutput 写队列

任务输出通过 DiskTaskOutput 类异步写入磁盘:

class DiskTaskOutput {
  append(content: string): void  // 入队,自动触发 drain
  flush(): Promise<void>         // 等待队列清空
  cancel(): void                 // 丢弃队列(任务被 kill 时)
}

核心设计要点

  1. 写队列#queue: string[] 平铺数组,单个 drain 循环消费,chunk 写入后立即可被 GC,避免 .then() 链持有引用导致内存膨胀

  2. 5GB 上限:超限后追加截断标记并停止写入:

    [output truncated: exceeded 5GB disk cap]
    
  3. O_NOFOLLOW 安全:Unix 上用 O_NOFOLLOW flag 打开文件,防止沙箱中的攻击者通过创建软链接将 Claude Code 写入任意宿主文件

  4. 事务追踪:所有 fire-and-forget 异步操作(initTaskOutputevictTaskOutput 等)注册到 _pendingOps: Set<Promise>,测试可通过 allSettled 等待全部完成,防止跨测试的 ENOENT 竞争

4.6.3 增量输出读取(OutputOffset)

TaskStateBase.outputOffset 记录已消费的字节偏移,实现增量读取:

// 仅读取 fromOffset 之后的新内容(最多 8MB)
getTaskOutputDelta(taskId, fromOffset): Promise<{ content: string; newOffset: number }>

framework.ts 中的 generateTaskAttachments() 在每次 poll 时调用 getTaskOutputDelta,将新增内容附加到 task_status attachment 中推送给 LLM,避免重复加载完整输出文件。

4.6.4 关键函数对比

函数 是否删磁盘文件 是否清内存 适用场景
evictTaskOutput(taskId) 是(flush 后清 Map) 任务完成,结果已消费
cleanupTaskOutput(taskId) 彻底清理(测试、取消)
flushTaskOutput(taskId) 读取前确保写入完成

4.7 Cron 定时任务

通过 Cron 工具,可以创建定期自动运行的任务:

// CronCreate / CronDelete / CronList 工具
// 底层通过 src/utils/cron.ts 管理

// 使用示例(Claude 会这样调用):
// CronCreate({ schedule: '0 9 * * 1', command: '/review-pr' })
// → 每周一早上 9 点自动运行 /review-pr

4.8 流程图:任务的完整生命周期

用户输入复杂任务
      │
      ▼
Claude 调用 TodoWrite         ← 规划工作步骤
  todos: [
    { content: '读取代码', status: 'pending' },
    { content: '修改功能', status: 'pending' },
    { content: '运行测试', status: 'pending' },
  ]
      │
      ▼
Claude 开始执行第一步
TodoWrite: { status: 'in_progress' }  ← 标记进行中
      │
      ├─ 如果需要并行工作:
      │    Agent({ subagent_type: 'general-purpose', ... })
      │         │
      │         ▼
      │    创建 LocalAgentTaskState(id: 'a-xxxxxxxx',agentId 同值)
      │    子代理在独立上下文中运行
      │         │
      │         ▼
      │    子代理有自己的 Todo 列表(隔离)
      │
      ▼
每步完成后:
TodoWrite: { status: 'completed' }    ← 标记完成
      │
      ▼
所有步骤完成:
TodoWrite: todos.every(done) → 清空列表 []
      │
      ▼
Agent Loop 检测到无工具调用 → 停止

小结

组件 职责 源码位置
TodoWriteTool 会话内工作计划追踪(交互模式) src/tools/TodoWriteTool/TodoWriteTool.ts
Task API 工具 后台任务创建与管理(SDK/非交互模式) TaskCreateTool / TaskStopTool
TaskStateBase 所有任务的公共字段(含 id、status、outputOffset) src/Task.ts
LocalMainSessionTask 主会话后台化(LocalAgentTaskState 子类型) src/tasks/LocalMainSessionTask.ts
LocalAgentTaskState 子代理生命周期管理 src/tasks/LocalAgentTask/LocalAgentTask.tsx
InProcessTeammateTaskState 团队队友状态(含内存上限 50 条) src/tasks/InProcessTeammateTask/types.ts
DreamTaskState 记忆整合子代理的 UI 表面层 src/tasks/DreamTask/DreamTask.ts
任务框架 registerTask / evictTerminalTask / 通知 XML src/utils/task/framework.ts
磁盘输出管理 DiskTaskOutput 写队列 / 增量读取 / 5GB 上限 src/utils/task/diskOutput.ts
Cron 定时任务 自动化周期任务 CronCreate/Delete/List 工具

关键设计约定总结

约定 说明
LocalMainSessionTask 不是独立类型 它是 LocalAgentTaskState & { agentType: 'main-session' }
TaskState(不是 Task) 联合类型的正确名称
setAppState(不是 updateAppState AppState 更新的实际 API
toolUseCountprogress 不是 LocalAgentTaskState 的直接字段
TodoWrite 与 Task API 互斥 通过 isTodoV2Enabled()isEnabled() 中切换

零成本打造专业域名邮箱:Cloudflare + Gmail 终极配置保姆级全攻略

大家好,我是 Immerse

专注分享 AI 玩法独立开发AI 出海的 AGI 实践者,更多干货欢迎关注公众号 #沉浸式AI 或访问 yaolifeng.com


如果你手上有自己的域名,只是想把 hi@你的域名.com 用起来收发邮件,又不想每年给 Google Workspace 或 Microsoft 365 交钱,这套 Cloudflare Email Routing + Gmail 现在依然够用。

Cloudflare Email Routing 负责把别人发到你域名邮箱的邮件转进 Gmail。Gmail 负责从这个地址回信。

它不是完整企业邮箱,但拿来放在博客、作品集、简历、联系页、独立开发者官网上,已经很够用了。

主要边界

  • 能做到:别人发到你的域名邮箱,你在 Gmail 里收到;你也能在 Gmail 里用这个域名地址发信和回信
  • 做不到:Cloudflare 不给你真正的邮箱空间,也不提供发信服务器;多人协作、共享日历、管理员后台,这套都没有
  • 要注意:Cloudflare 开启 Email Routing 时会接管你这个域名的邮件 MX 记录。如果你这个域名已经在用腾讯企业邮、阿里企业邮、Google Workspace,不要直接点开通

如果你要一个看起来专业、能正常往来邮件的联系邮箱,这套很合适。你要的是团队邮箱系统,那就别省这点钱,直接上正式服务。

准备资料

  1. 一个已经接入 Cloudflare DNS 的域名
  2. 一个你能正常登录的 Gmail 账号
  3. 能开启 Google 两步验证
  4. 一个备用邮箱用来做测试,QQOutlook、另一个 Gmail 都行

部分参考文档:

先把收件打通

先做最小可用版:让别人发到 hi@你的域名.com 的邮件,能够进你的 Gmail。

  1. 登录 Cloudflare,进入你的域名,打开 Email -> Email Routing
  2. 第一次开通时,Cloudflare 会让你添加或替换邮件相关的 MXTXT 记录。这里要认真看一眼,如果面板提示要删除旧的 MX,就代表旧邮箱服务会一起停掉
  3. 开通后,去 Routing rules 里创建一个自定义地址。比如把 hi@你的域名.com 转发到 yourname@gmail.com
  4. Cloudflare 会往你的 Gmail 发一封验证邮件。点掉验证之前,这条转发规则不算真的生效
  5. 验证完成后,用另一个邮箱发一封测试邮件到你的域名邮箱,不要直接用同一个 Gmail 自己给自己发,很多时候会把你绕晕

你看到的正确结果应该是这样的:

  • Gmail 收到了这封信
  • 收件人还是你的域名邮箱,不是裸露的 Gmail
  • Cloudflare 面板里的规则状态是已启用

小建议:有建明确地址,不要一上来开 Catch-allCatch-all 很方便,但也很容易把拼错地址、垃圾邮件、爬虫乱发的邮件一起吞进来。大多数人先用 hicontacthello 这种固定地址就够了。

再把发件打通

收件只是前半段。那怎么用对应的这个域名发出去。

注意,Cloudflare 不负责发信。它只是把邮件转进来。真正往外发信的,还是 Gmail,或者你后面换掉的别家 SMTP 服务。

开 Google 两步验证,生成应用专用密码

Gmail 不会让你直接拿账号密码去配 SMTP。你要用的是应用专用密码。

  1. 打开 Google 账号的安全页,先把两步验证开起来
  2. 开完以后,再进入 应用专用密码
  3. 新建一个密码,名字随便写,比如“域名邮箱”
  4. Google 会给你一串 16 位密码,保存下来

如果找不到“应用密码”入口,可能的两个原因:

  • 你的账号还没开两步验证
  • 你开了 Advanced Protection,Google 官方说明里明确写了,这种模式下应用专用密码不可用

碰到第二种情况,就别死磕了。要么退出 Advanced Protection,要么直接跳到后面的“换第三方 SMTP”方案。

在 Gmail 里添加对应的发件域名

  1. 打开 Gmail,点右上角设置,进入“查看所有设置”
  2. 打开 账号和导入
  3. 在“用这个地址发送邮件”里点“添加其他电子邮件地址”
  4. 名字填你想展示给别人的名字,邮箱填你的域名地址,比如 hi@你的域名.com
  5. Treat as an alias 这个选项,默认保留勾选就行。你本来就是在给同一个人加另一个发件地址
  6. SMTP 配成下面这样
SMTP 服务器:smtp.gmail.com
端口:465
用户名:你的 Gmail 完整地址
密码:刚刚生成的 16 位应用专用密码
加密方式:SSL

如果 465 + SSL 连不上,再试一次 587 + TLS。这是 Google 官方文档里也支持的组合。

  1. 提交后,Gmail 会往你的域名邮箱发一封确认邮件
  2. 这封确认邮件会先到 Cloudflare,再转发进你的 Gmail
  3. 点掉确认链接,或者把验证码填回去,这个发件地址就能用了

配置自动选择回复邮件

注意,如果你想要别人给你发了邮件,你想用对应的邮件回复,这一步设置是必须的。

设置下面这一项

测试整个流程

用其他邮箱,比如 QQ,163 邮箱测试一遍

  1. 用另一个邮箱发信到 hi@你的域名.com
  2. 去 Gmail 收件箱里打开这封信,直接点回复
  3. 发出去之前,看一眼 From,确认显示的是你的域名邮箱
  4. 回到对方邮箱,确认能收到回复

到这一步,主流程就已经跑通了。

对方为什么还会看到 via gmail.com 或者 on behalf of

这不是你哪里配错了,很多时候是 Gmail 这条发信链路本来就有这个边界。

你现在这套方案里:

  • Cloudflare 负责收件转发
  • Gmail 负责真正把邮件发出去

所以在部分收件客户端里,对方可能会看到你的 Gmail 痕迹,比如 via gmail.com,或者 yourname@gmail.com on behalf of hi@你的域名.com

这在个人沟通、博客联系邮箱、独立开发者业务往来里通常问题不大

最容易踩的几个坑

  • Cloudflare 规则建好了但一直收不到邮件。先看目标 Gmail 有没有点验证邮件,没验证就不会真正转发
  • 一开通 Email Routing,原来的企业邮箱突然废了。因为 MX 已经被 Cloudflare 接管了
  • Gmail 一直提示用户名或密码错误。这里填的不是你的 Google 登录密码,是 16 位应用专用密码
  • Google 账号里根本没有应用专用密码。先查两步验证,再查是不是开了 Advanced Protection
  • 你给自己发测试邮件,觉得没收到。先换一个别的邮箱测,自己给自己发最容易被 Gmail 的会话和去重逻辑干扰判断
  • 对方看到 via gmail.com。这通常不是配置错,是 Gmail SMTP 的边界

真要说这套配置最难的地方,也就两个:一个是别把 MX 记录改糊涂,另一个是 Gmail 一定要用应用专用密码。

Claude半个月崩7次!算力不够自己造,强制实名制封

大家好,我是凌览。

如果本文能给你提供启发或帮助,欢迎动动小手指,一键三连(点赞评论转发),给我一些支持和鼓励谢谢。


Claude 最近真的不太稳。

4月15号,Anthropic 状态页一片红,Claude、Claude Code、API 全线飘绿——哦不,全线报错。 堆了 6000 多条故障报告,三小时后恢复正常。

11.png

这已经是4月以来第7次了。

翻一下记录:1号 Opus 4.6 超时、3号 Claude Code 挂了一小时、6号7号连崩、10号集体出错、13号又挂15分钟。半个月七次,谁顶得住。

服务器扛不住

Anthropic 每次都说是"重磅发布后需求暴涨",说白了就是服务器不够用。

Claude Code 和 Claude Cowork 这类产品,跑起来就是 GPU 黑洞——连续工作几小时不停,每次响应都在烧卡。需求涨太快,算力储备没跟上,怎么办?

Anthropic 的答案:自己造芯片。

22.jpeg

越赚钱,账越难算

反直觉的是,Anthropic 其实赚得不少。年化营收突破 300 亿美元,比去年底翻了三倍多,企业市场 73% 选 Claude。

但 Agent 产品太吃算力了。收入涨,成本也在飙。

怎么算账?三招:

第一招:改了企业版定价。 以前纯订阅,现在 20 美元月费加按量计费。用的多的多付,本质是把重度用户单独拎出来收钱。

第二招:Claude Code 订阅加闸。 用 OpenClaw 这类第三方 Agent,得额外交钱。"算力要优先保障自家产品"。

第三招:强制实名验证。 这刀对国内用户特别狠。KYC 需要政府证件加自拍,靠中转、套壳在用的账号,基本没通过空间。账号一封,记录全清。

33.png

巨头都在绕开英伟达

自研芯片,Anthropic 不是第一个。Meta、OpenAI 都在和博合合作造芯。

为什么找博通?定制 ASIC 的 TCO 比通用 GPU 低 30% 到 50%,每瓦性能高一个数量级。

但 ASIC 绑定特定架构,模型一变效率就下来。没有 CUDA 这种成熟生态,实验场景还是得靠英伟达。

44.png

所以各家都是"多云多芯"——谁都不完全绑在某一家。

总结

Claude 频繁宕机,表面是服务器问题,背后是算力瓶颈。营收涨得再快,Agent 产品一跑起来就是在烧钱。

Anthropic 想自研芯片,把命门攥回自己手里。但在芯片造出来之前,只能靠涨价、附加费、实名制强行算账。

说白了,故事讲得再大,芯片还得看别人脸色。

给普通人的 AI 黑话翻译手册:一文看懂 LLM、RAG、Agent 到底是什么

给普通人的 AI 黑话翻译手册:一文看懂 LLM、RAG、Agent 到底是什么

这两年,人工智能像突然学会了"说人话"一样冲进了每个人的工作和生活。有人用它写文案,有人用它做表格、查资料、写代码、做客服。与此同时,一堆原本只在技术圈里流行的词,也开始频繁出现在产品发布会、行业新闻和公司汇报里:LLM、RAG、Embedding、向量数据库、微调、Agent、幻觉、上下文窗口……

很多人第一次看到这些词时,都会有一种共同感受:每个字都认识,连在一起就看不懂。

这篇文章就是写给这类读者的。你不需要会编程,也不需要有算法背景,只要把它当成一本"AI 黑话翻译手册"来看就行。我的目标不是把你训练成工程师,而是帮你建立一张足够清晰的地图:这些词分别是什么意思,它们在整套 AI 系统里干什么,彼此之间又是什么关系。

一、先把最基础的四个词分清:AI、机器学习、深度学习、神经网络

很多人一开始就被最基础的几个词绕晕了:人工智能、机器学习、深度学习、神经网络。它们不是并列关系,而更像一层层套着的盒子。

  • 人工智能(AI) 是最大的那个框。凡是让机器表现出某种"像人一样"的智能能力,比如识别图片、理解语言、做判断、生成内容,都可以放进 AI 里。
  • 机器学习(Machine Learning) 可以理解成:不给机器写死所有规则,而是让它从数据里自己学规律。传统编程更像"人先把规则写好";机器学习更像"给机器很多例子,让它自己总结模式"。
  • 深度学习(Deep Learning) 是机器学习里最火的一支。它依赖多层神经网络来学习复杂模式,所以叫"深度"。很多现代 AI 能力,比如语音识别、图像识别、大模型,背后都离不开深度学习。
  • 神经网络(Neural Network) 则可以看成深度学习的骨架。它借用了"大脑神经元连接"的启发,用数学网络去完成"输入—加工—输出"的过程。

💡 只记一句话就够了:AI 是总称,机器学习是方法,深度学习是更强的一类方法,神经网络是深度学习的骨架。

graph TD
    subgraph AI [人工智能 AI]
        subgraph ML [机器学习 Machine Learning]
            subgraph DL [深度学习 Deep Learning]
                NN(("神经网络<br>Neural Network"))
            end
        end
    end
    style AI fill:#e1f5fe,stroke:#0288d1,stroke-width:2px
    style ML fill:#b3e5fc,stroke:#0288d1,stroke-width:2px
    style DL fill:#81d4fa,stroke:#0288d1,stroke-width:2px
    style NN fill:#4fc3f7,stroke:#0288d1,stroke-width:2px

二、生成式 AI 为什么会突然爆发?

过去很多 AI 系统主要只会"判断",比如判断这封邮件是不是垃圾邮件、这张图里是不是有车、这笔交易有没有风险。它们更像"分类器"。

但这几年真正爆火的是 生成式 AI(Generative AI) 。它和以前 AI 最大的区别在于:以前很多系统只能告诉你"是什么",现在它可以直接"写出来""画出来""做出来"。

你输入一句"帮我写一封道歉邮件",它能给你整封信;你输入一句"画一张赛博朋克风的城市夜景",它能直接生成图片;你给它一个需求,它甚至能写出一段代码。

所以,生成式 AI 的核心不是"更聪明地判断",而是"更主动地创造内容"。

graph LR
    subgraph traditional [传统AI模型]
        A1["输入: 一张图片"] --> B1{"分类器"} --> C1["输出标签: 这是一只猫"]
    end
    
    subgraph generative [生成式AI]
        A2["输入 Prompt:<br>画一只赛博朋克的猫"] --> B2{"生成大模型"} --> C2["创造新内容:<br>一张赛博朋克猫的高清图"]
    end
    
    style traditional fill:#f5f5f5,stroke:#9e9e9e
    style generative fill:#f3e5f5,stroke:#9c27b0

三、NLP、LLM、多模态,这几个词差在哪?

1. NLP:让机器理解人类语言

自然语言处理(NLP) 是 AI 和计算机科学中的一个方向,目标是让机器理解、处理和生成人类语言。 搜索引擎、语音助手、翻译软件、客服机器人,很多我们习以为常的能力,本质上都属于 NLP。

2. LLM:会读会写的大语言模型

LLM(Large Language Model,大语言模型) 是最近最常见的词之一。你可以把它想象成一个"读过很多东西、特别会组织语言的系统"。它擅长聊天、总结、翻译、写代码、改写文风、提取信息,甚至做一些初步推理。

但它也有局限:知识可能过时,不天然知道你的私有资料,而且有时会一本正经地胡说八道。

3. 多模态:不只懂文字,还会看图听音

现在越来越多模型不只处理文本,还能处理图片、音频、视频。这类模型通常被叫做 多模态模型(Multimodal)

💡 所以,如果说传统 LLM 更像"会读会写的人",那多模态模型更像"既会读写,也会看、会听、会综合判断的人"。

graph LR
    A1["文本 Text"] --> B(("多模态大模型<br>Multimodal LLM"))
    A2["图像 Image"] --> B
    A3["音频 Audio"] --> B
    B --> C1["生成文字回答与推理"]
    B --> C2["生成全新的图像/语音"]

四、Prompt、Token、上下文窗口:为什么同一句话,问法不同结果差很多?

1. Prompt:你怎么问,决定它怎么答

Prompt 就是你给模型的输入,也就是提示词。提示词工程的本质,不是什么神秘咒语,而是:把任务说清楚,把边界讲明白,把结果定义好。

你说"帮我总结一下",和你说"请用小白能看懂的语言,从背景、核心概念、例子、风险四部分总结,每部分不超过 120 字",效果通常会差很多。前者太模糊,后者给了明确任务、目标读者和输出格式。

2. Token:AI 眼中的"文字颗粒"

模型处理输入时,会先把文本切成更小的单元,这些单元叫 Token。它不完全等于汉字,也不完全等于单词,但你可以把 token 理解成模型处理文字时的基本颗粒。

3. 上下文窗口:模型一次能看到多少内容

上下文窗口(Context Window) 指的是模型在单次请求里最多能处理多少 token。

你可以把它想象成模型面前的一张工作台。台子越大,一次能摊开的材料就越多;台子越小,就得先删减信息。为什么超长文档要切片?为什么对话太长模型会"忘事"?原因之一就在这里。

五、Embedding、向量数据库、语义搜索:RAG 之前必须懂的三件事

1. Embedding:把内容变成数字坐标

Embedding(向量嵌入) 是很多人第一次听会觉得很抽象的词。简单说,就是把一句话、一段文档甚至一张图片,转换成一串数字。数字本身你看不懂,但这些数字之间的距离,可以反映"语义上像不像"。

例如"苹果手机充电慢怎么办"和"iPhone 电池掉电快如何处理"这两句话,字面并不一样,但语义接近。Embedding 正是为了帮助系统发现这种"意思相近"。

2. 向量数据库:专门存这些坐标的地方

如果 Embedding 是把内容变成坐标,那么 向量数据库(Vector Database) 就是专门存这些坐标,并且能快速找出"谁最像谁"的数据库。

普通数据库更擅长精确查找,向量数据库更擅长找"语义相近"的内容。

3. 语义搜索:不是搜字面,而是搜意思

传统搜索偏关键词匹配;语义搜索则更进一步,它尝试理解"你真正想找什么"。

💡 所以,Embedding + 向量数据库 + 语义搜索,常常是现代 AI 检索系统的三件套。

六、RAG:为什么几乎成了企业 AI 的标配?

终于可以讲最常被提到的 RAG 了。

RAG(Retrieval-Augmented Generation,检索增强生成) ,你可以把它理解成:模型先别急着回答,先去查资料,再根据查到的资料作答。

你可以把 RAG 想成开卷考试系统。普通 LLM 更像"只靠自己记忆答题";RAG 则是"先翻书,再作答"。这对企业尤其重要,因为企业里有大量模型训练时根本没见过的资料,比如公司制度、产品说明书、内部知识库、合同模板、项目文档。

RAG 的典型流程通常是:

sequenceDiagram
    actor 用户
    participant 检索系统
    participant 企业知识库
    participant 大语言模型
    
    用户->>检索系统: 1. 提问:"公司最新报销额度是多少?"
    检索系统->>企业知识库: 2. 向量化检索相关内部文档
    企业知识库-->>检索系统: 3. 返回匹配片段:《员工手册.pdf》
    检索系统->>大语言模型: 4. 组装Prompt (原始问题 + 检索到的内部规则)
    大语言模型-->>用户: 5. 基于文档内容生成精准回答

它的价值很直接:

  • 补私有知识:让模型能用企业自己的资料回答问题
  • 补新知识:资料更新后,不必重训整个模型
  • 降幻觉:回答更容易"有据可依"

当然,RAG 也不是万能药。它可能搜不到关键资料,也可能搜到的片段不完整,或者模型虽然看到了资料,却没有正确引用。所以工程上还会继续讨论"文档切片""重排序""引用""评测"等问题。

七、微调、SFT、RLHF:这是在"改模型",不是在"查资料"

很多人会把 RAG微调(Fine-tuning) 混在一起。其实它们解决的是两类问题。

1. 微调:让模型更像你想要的样子

监督微调(SFT) 可以理解成:用示例输入和理想输出去继续训练模型,让它更可靠地产生你需要的风格和内容。

所以:

  • RAG 是在回答前给模型临时补资料(查资料)。
  • 微调 是把行为习惯"练进模型里"(改模型)。

适合微调的场景通常包括:固定输出格式、品牌语气、特定行业术语、稳定执行某类任务。但如果你的问题是"知识经常更新",更适合 RAG,而不是微调。

2. RLHF:让模型更符合人类偏好

你可能还听过一个词:RLHF(Reinforcement Learning from Human Feedback,人类反馈强化学习) 。它可以简单理解成:利用人类偏好不断给模型"打分",让模型慢慢学会什么样的回答更好、更自然、更符合人类习惯。

如果继续用"训练一个助理"来打比方:

graph LR
    A["预训练模型<br>读书破万卷"] -->|SFT 监督微调| B["专项训练模型<br>做标准模拟卷"]
    B -->|RLHF 人类反馈| C["偏好对齐模型<br>老师根据表现打分"]
    C --> D(("懂规矩、听话的<br>AI 助手"))
    
    style A fill:#fff3e0,stroke:#ff9800
    style B fill:#e3f2fd,stroke:#2196f3
    style C fill:#f3e5f5,stroke:#9c27b0
    style D fill:#e8f5e9,stroke:#4caf50

八、Function Calling、Workflow、Agent:让 AI 不只是会说,还会做

1. Function Calling:模型开始学会调用工具

Function Calling(函数调用/工具调用) 的意思是:让模型不只是输出文字,还能去调用外部工具和系统,比如查数据库、调接口、搜订单、发邮件、写日历、执行代码。

这一步很关键,因为真实世界的很多任务,不是"会说"就够了,而是"得真的做"。模型相当于大脑,工具相当于手脚。

2. Workflow:先把流程钉住,再把模型放进去

很多企业一开始做 AI,最稳的路线不是直接上 Agent,而是先做 Workflow(工作流) 。也就是把流程先拆好、定好,再把模型嵌进其中某些步骤里。它的优点是稳定、可控、容易审计。

3. Agent:能规划、能调用工具、能多步完成任务的 AI

Agent(智能体) 是这两年另一个热词。它和普通聊天机器人的最大区别在于:聊天机器人通常是"你问一句,它答一句";Agent 更像"你给一个目标,它自己拆步骤想办法完成"。

比如你说:"帮我做一份某行业的竞品分析。"一个 Agent 可能会列提纲、搜资料、整理信息、生成表格、写总结,甚至在发现证据不够时回头继续查。

💡 但现实里最常见的情况其实不是"全都做成 Agent",而是:Workflow 打底,Agent 只负责其中更灵活的部分。

graph TD
    subgraph workflow_sys [Workflow 固定工作流]
        W1["步骤1: 提取网页文本"] --> W2["步骤2: LLM翻译"] --> W3["步骤3: 存入数据库"]
    end
    
    subgraph agent_sys [Agent 智能体]
        A1(("Agent核心大脑")) <-->|"设定目标与拆解"| A2["自主规划"]
        A1 <-->|"执行"| A3["调用外部工具 / API"]
        A1 <-->|"记忆与反思"| A4["获取历史信息并调整策略"]
    end
    
    style workflow_sys fill:#f5f5f5,stroke:#9e9e9e
    style agent_sys fill:#e8eaf6,stroke:#3f51b5

九、幻觉、评测、护栏:为什么 AI 不是会回答就够了?

1. 幻觉:一本正经说错话

AI 幻觉(Hallucination) 指的是模型输出了看起来像真的、但其实不准确甚至完全错误的内容。 这也是为什么很多人觉得 AI 很强,却又不敢完全信它。它厉害的地方在于表达流畅,危险的地方也在于此:它就算错了,也可能错得非常自信。

2. Evals:不评测,就不知道它到底行不行

很多 AI 产品演示时都很惊艳,但一到真实业务场景就容易翻车。于是就有了 Evals(评测) 。评测的核心不是"这次演示不错",而是系统化地衡量准确率、幻觉率、检索命中率、格式稳定性、工具调用成功率等指标。

3. Guardrails:给 AI 装护栏

真实业务里,企业不会把 AI 裸奔上线。通常都会加各种 护栏(Guardrails) ,比如敏感信息过滤、高风险问题转人工、强制引用来源、限制工具权限、限制输出格式等。护栏的意义不是让模型变完美,而是让它在出错时别错得太离谱。

十、把这些词串起来,你就看懂现在的大多数 AI 应用了

如果你现在还是觉得词很多,不妨把它们拼成一张架构图:

graph TD
    User["用户输入 Prompt"] --> Token["Token化 & 受到上下文窗口限制"]
    
    Token -->|"如果需要查企业资料"| RAG["RAG 检索增强<br>Embedding + 向量数据库"]
    Token --> LLM
    RAG --> LLM
    
    LLM(("LLM / 多模态模型<br>系统核心大脑<br>经过SFT/RLHF微调"))
    
    LLM -->|"如果需要执行任务"| FC["Function Calling 工具调用"]
    FC --> Actions["Workflow 流程 / Agent 自主规划"]
    
    Actions --> Guard["Guardrails 护栏<br>敏感词过滤 / 拦截幻觉等"]
    LLM --> Guard
    
    Guard --> Output["产生最终的安全输出与执行结果"]
    
    style LLM fill:#e1bee7,stroke:#8e24aa,stroke-width:3px
    style RAG fill:#bbdefb,stroke:#1976d2
    style FC fill:#c8e6c9,stroke:#388e3c
    style Guard fill:#ffcdd2,stroke:#d32f2f

你会发现,AI 并不是某一个神奇按钮,而更像一整套拼装起来的系统工程。

结语:理解术语,不是为了显得专业,而是为了少被忽悠

今天的 AI 圈非常热闹,也非常容易制造概念泡沫。很多时候,术语一多,普通读者就容易被带着跑:好像只要产品里有 LLM、有 RAG、有 Agent,就一定很先进。但其实不是。真正重要的从来不是"用了多少热词",而是:它到底解决了什么问题,稳定不稳定,成本高不高,是否真的比旧方案更有效。

所以,理解这些术语最大的价值,不是让你去背定义,而是让你在面对一切 AI 方案、AI 新闻和 AI 产品时,能多问几个关键问题:

  • 这是在查资料,还是在改模型?
  • 这是在回答问题,还是能执行任务?
  • 它有没有接外部知识?
  • 有没有做安全控制?
  • 有没有做过评测?

当你开始这样看 AI,你就已经比很多只会复读流行词的人,更接近真正的理解了。

📚 延伸阅读

如果您对 AI 技术的实践细节、更多提效工具和落地案例感兴趣,推荐您进一步阅读这篇非常详细的实战汇总文档:

👉 AI 提效指北:从入门到实践

本文内容致力于用最平白的话翻译复杂的 AI 概念,希望能帮到在 AI 时代探索的你!如果觉得有帮助,欢迎点赞收藏。

邪修!让显示器支持AI、远程、手势三种控制方式

大家好,我是石小石~


解锁明基RD270Q的新玩法

前不久,明基发布了最新款式的编程系列显示器 RD270Q,很荣幸我获得了优先体验资格。刚开箱,我就被它出众的颜值所吸引。

这款显示器保留了RD系列最核心也是我最喜欢的「编程模式」,而且它还升级到144Hz 高刷 并增加了彩纸模式。这使得在长时间编码下,它能极大缓解眼部疲劳,体验感非常舒适。

接下来,我会分享借助RD270Q配套的DisplayPilot2软件,结合AI与编码,如何玩转显示器的特色功能:

  • 用 Claude code 切换显示器编程模式

  • 用手机远程操控显示器锁屏

  • 用手势实现显示屏亮度调节 (动图帧率问题,图片效果不是很明显)

同时,我会结合长时间的编码体验,验证它是否能成为程序员必备的专业显示器。

显示器控制的核心——Display Pilot 2

无论是通过 AI、手机远程还是手势来控制显示器,核心本质都是依靠电脑上运行的 “脚本” 去操控显示器硬件。借助一些键鼠模拟脚本(如 Node 的robotjs、nut-js,或Python的keyboard),我们可以通过模拟鼠标事件来间接操控软件实现功能,如通过 Node.js 脚本实现自动移动鼠标,并双击启动软件的自动化操作:

对应核心代码如下:

const { mouse,straightTo,Point,Button} = require("@nut-tree-fork/nut-js");
(async () => {
  // 移动鼠标到指定位置
  await mouse.move(straightTo(new Point(10, 10)));
  console.log("鼠标移动完成!");
  // 点击鼠标
  await mouse.doubleClick(Button.LEFT);
  console.log("执行完成!");
})();

可以看出,一些复杂的软件操作,通过模拟鼠标实现还是非常麻烦的,最重要的是脚本几乎无法控制硬件。

幸运的是,明基 RD270Q 自带了配套软件 Display Pilot 2,它可以直接通过软件快速调用显示器的硬件级操作能力,以满足我们编程中的个性化控制需求。参考软件截图,它拥有非常多的显示屏操作功能,且基本都支持通过快捷键操作。

思路到这里就很清晰了:我们完全可以编写脚本,模拟键盘事件触发 Display Pilot 2 的快捷操作,从而间接实现对显示器的控制。

使用Clade code+skills控制显示屏

编程模式切换效果演示

编程模式是明基 RD 系列显示器的特色功能,在深色模式下,显示器会通过硬件级算法强化语法高亮效果,以提升长期编程的舒适度;RD270Q新增的彩纸模式,则能让界面产生类纸感的细腻色彩,满足深度护眼需求。如下图,在黑暗模式下,明基对代码的显示优化非常明显,代码对比更加鲜明,不刺眼。

它还搭载了莱茵认证的抗反射抗面板,即便在强光环境下使用,屏幕也不会刺眼、不产生明显眩光,长时间观看依旧舒适。

在配套软件的基础上,我们能否借助 AI 实现这些显示模式的一键自动切换呢?答案是完全可以。 比如,直接通过 AI 对话下达指令,让显示器自动切换至电子书模式

或是通过指令让 AI 精准调节屏幕亮度、音频大小等参数

原理分析——RD270Q-Opera-skills

Claude Code 为例,我们来实现这一效果。需要明确的是:AI 本身并不能直接操控显示器硬件,即便它能生成脚本,也不知道如何与显示器交互。因此,我们可以通过自定义技能(Skills) —— 比如创建一个 RD270Q-operation-skills,来为 AI 扩展控制显示器的能力。

如果你不了解 Skills,请自行百度。

该技能的项目结构如下:

RD270Q-operation-skills/
├── SKILL.md              # 元数据与指令定义
├── index.js              # 主入口:命令解析与分发
├── package.json          # 项目依赖配置
├── test.js               # 功能测试脚本
├── scripts/              # 底层操作模块
│   ├── keyboard.js       # 键盘快捷键封装
│   └── mouse.js          # 鼠标操作封装
└── references/           # 参考文档
    └── 快捷键表.md        # Display Pilot 2 完整快捷键

整个技能的核心逻辑非常简单: 将 Display Pilot 2 的快捷键功能在代码中做映射,让 AI 可以通过函数调用触发。

示例核心代码(scripts/keyboard.js):

// 键盘快捷键模块 - 封装 Display Pilot 2 所有控制功能
const { keyboard, Key } = require("@computer-use/nut-js");

// 执行快捷键组合
async function executeShortcut(...keys) {
  await keyboard.pressKey(...keys);
  await new Promise(resolve => setTimeout(resolve, 100));
  await keyboard.releaseKey(...keys);
}

// ==================== 色彩模式 ====================
// 循环切换色彩模式 Ctrl+Alt+C
async function cycleColorModes() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.C);
}
// 编程亮模式 Ctrl+Alt+1
async function setCodingLight() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.Num1);
}
// 编程暗模式 Ctrl+Alt+2
async function setCodingDark() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.Num2);
}
// 编程纸张模式 Ctrl+Alt+0
async function setCodingPaper() {
  await executeShortcut(Key.LeftControl, Key.LeftAlt, Key.Num0);
}
// ..... 其他快捷操作


// 导出所有方法
module.exports = {
  executeShortcut,
  cycleColorModes,
  setCodingLight,
  setCodingDark,
  setMBook,
  // ...
};

我们只需要在 SKILL.md 中规范好 AI 的调用方式与指令规则,完成整套技能开发后,Claude Code 就拥有了直接操控显示器模式的能力,使用体验直接拉满。

除了编程模式的切换,凡是 Display Pilot 2 能通过快捷键实现的显示器操控功能,这个skills都能完美胜任,甚至像Display Pilot 2屏幕分区这样的高级功能,也能通过控制鼠标来模拟实现。

使用手机远程控制显示屏

很多时候,我们可能临时有事需要离开工位,如果我们突然想锁屏或者想远程控制一下鼠标执行某个简单操作就必须立刻回到工位才行。基于这中场景,实现手机远程控制显示器就非常有意义。

如下图,就是根据明基RD270Q支持的快捷键开发的一个移动端操作界面,并增加了鼠标触摸移动控制功能。

远程锁屏、鼠标控制演示

如果外出忘记锁屏,通过手机实现这个功能非常方便实用。

此外,通过移动端界面的触控区域,我们还能远程操控鼠标移动、直接打开 VSCode 等软件。是不是有点Todesk青春版的感觉?

除此之外,其他快捷操作,如编程模式、亮度调节、夜间保护调节等功能都是支持的,这里也就不一一展示了。

原理分析——websoket+node控制快捷键

远程控制的方案其实非常简单:核心就是跑在本地的一个 Node 脚本,用来模拟键盘、鼠标操作,间接通过 Display Pilot 2 控制显示器。同时启动一个 Web 服务提供移动端操作界面,借助 WebSocket 实现手机与 Node 服务实时通信,最终完成远程控制。简单涞水,就是Web 端通过WebSocket 控制本地端Node服务模拟系统快捷键操作

前端就是一个普通的 Vue 项目 , 页面上放几个控制按钮,点击时通过 WebSocket 向 Node 服务发送对应指令:

function createWebSocketServer(server) {
  const wss = new WebSocket.Server({ server, path: "/ws" });
  wss.on("connection", (ws) => {
    console.log("移动端已连接");
    ws.on("message", async (msg) => {
        const { type, action, params } = JSON.parse(msg);
        // 鼠标操作
        if (type === "mouse") {
          if (action === "move") {
            // 鼠标移动
            await mouse.move(params.x, params.y);
          } else if (action === "click") {
            // 鼠标点击
            await mouse.click(params.button);
          }
        }
        // 键盘操作
        if (type === "keyboard"){
          
        }
    });
  });
}

Node 端主要搭建 WebSocket 服务,接收移动端指令并执行系统操作。

const app = express();
const server = http.createServer(app);

// 初始化 WebSocket 服务
createWebSocketServer(server);

server.listen(PORT, () => {
  console.log(`WS 服务已启动:ws://localhost:${PORT}/ws`);
});;

具体的鼠标移动、键盘快捷键等逻辑,统一封装在 mouse.jskeyboard.js 中,底层依赖node第三方库nut-js实现鼠标和快捷键控制。

使用手势控制显示屏

RD270Q 还有个我觉得特别实用的功能 ——Visual Optimizer 视觉优化。它通过内置光传感器,能根据环境光智能同步调节屏幕亮度与色温,降低屏幕与环境的明暗反差,配合编码深色模式,长时间看代码也更柔和护眼。

不仅如此,我们还可以通过Display Pilot 2进一步调整屏幕亮度,实现个性化需求。基于Display Pilot 2,我们还能实现通过手势控制实现显示器的隔空操作,作为技术创意尝鲜、趣味交互玩具,还是得研究和尝试的。

桌面版的手势识别存在一定技术难度,恰好之前我有写过类似的技术文章:油猴+手势识别:我实现了任意网页隔空控制!索性偷个懒,在网页上实现手势识别用来控制显示器。先看看Demo效果:

  • 左手张开 + 右手滑动,即可调低屏幕亮度(左手握拳 + 右手滑动,即可调高屏幕亮度)

  • 右手握拳,可以实现一键锁屏功能

它的核心实现是基于MediaPipe,这是一个是谷歌开源的跨平台、实时轻量级多媒体机器学习框架,支持 Python、JS 等多种编程语言,借助它能轻松实现桌面级的手势识别功能。

如果你对相关技术感兴趣,可以看看这个实现

Demo:油猴+手势识别:我实现了任意网页隔空控制!

代码:《有趣的手势识别、人脸识别脚本》

Flow 智能工作流

本来我还在琢磨,能不能通过 AI 指令或远程控制,自己搭一套编码时的专属显示方案,比如打开 VS Code 就自动切换到我习惯的亮度、护眼参数等。结果发现 RD270Q 早已自带了 Flow 智能工作流,在 Display Pilot 2 里提前预设好编程、文档、设计等场景后,打开对应软件就能自动切换显示参数,省去反复调节的麻烦,真正实现了 “打开即用” 的智能个性化体验。

结语

从借助 AI 指令、移动端远程控制显示器,到创意十足的手势隔空控制,这篇文章我通过三种个性化玩法,把RD270Q显示器的自定义操控能力发挥到了极致。这些功能实现的核心,离不开Display Pilot 2对显示器本身的 稳定操控能力。

当然,即便不借助这款软件,文中的思路也可以延伸到电脑本身的快捷操作、系统级功能调用上,大家不妨顺着这个方向自行尝试拓展。

写完这篇文章已是凌晨,144Hz 高刷屏搭配显示器的深色编码模式,长时间使用眼部依然舒适,没有出现干涩、疲劳感。实际体验下来,RD270Q 的护眼技术确实做得不错,整体感受很好。

总而言之,新款 RD270Q 不仅保留了核心优势,价格也很有诚意,三千出头,上市期间会更优惠!兄弟们,不用犹豫,这次可以放心冲了。当然,要是追求极致编程体验 RD280URD280UGRD320U也也都是非常不错的选择。

最后, 附上一张深夜codding的图,希望这篇分享能为大家带来一些实用参考。

Vibe Coding 测试体系:API 测试、单元测试与 e2e 测试实战指南

前言:测试模块是 vibe coding 过程中不可忽视的一环,能够显著提升代码的稳定性与可维护性。本文系统介绍三种主流测试方式——API 测试单元测试e2e 测试,并结合 MuseMVP 项目的真实集成方式逐一演示,帮助你在快速迭代的同时建立可靠的质量防线。

项目集成

  • 安装@playwright/testvitest@vitest/ui三个依赖
  • vitest配置
import { fileURLToPath } from "node:url";
import { defineConfig } from "vitest/config";

export default defineConfig({
  resolve: {
    alias: {
      "@": fileURLToPath(new URL("./src", import.meta.url)),
    },
  },
  test: {
    environment: "node",
    globals: true,
    include: ["tests/unit/**/*.test.ts"],
  },
});
  • playwright配置
import { defineConfig, devices } from "@playwright/test";

const baseURL =
  process.env.E2E_BASE_URL ||
  process.env.NEXT_PUBLIC_SITE_URL ||
  "http://localhost:3000";

export default defineConfig({
  testDir: "./journeys",
  fullyParallel: false,
  forbidOnly: true,
  retries: 0,
  workers: 1,
  reporter: [
    ["list"],
    [
      "html",
      { open: "never", outputFolder: "../../.test-artifacts/browser-report" },
    ],
  ],
  outputDir: "../../.test-artifacts/browser-output",
  globalTeardown: "./cleanup.ts",
  use: {
    baseURL,
    trace: "on-first-retry",
    screenshot: "only-on-failure",
    video: "retain-on-failure",
  },
  projects: [
    {
      name: "desktop-chromium",
      use: {
        ...devices["Desktop Chrome"],
      },
    },
  ],
});

单元测试

用于验证纯逻辑层的行为,例如促销码的格式处理。以下示例测试了账单模块中 schema 对输入的修剪与空值转换逻辑,无需启动完整应用即可快速执行。

import { describe, expect, test } from "vitest";
import {
  createCustomerHubSchema,
  createLaunchSchema,
  museBillingGatewayParamSchema,
} from "@/backend/api/routes/muse-billing/types";

describe("muse-billing route schemas", () => {
  test("trims discount codes and converts blanks to undefined", () => {
    const parsed = createLaunchSchema.parse({
      planProductId: "plan_pro_monthly",
      discountCode: " SAVE20 ",
    });
    const blankParsed = createLaunchSchema.parse({
      planProductId: "plan_pro_monthly",
      discountCode: "   ",
    });

    expect(parsed.discountCode).toBe("SAVE20");
    expect(blankParsed.discountCode).toBeUndefined();
  });
});

e2e测试

使用Playwright框架测试真实浏览器环境下的公共页、认证页和受保护页面

playwright添加--ui参数进行可视化验证

{
  "scripts": {
    "test:e2e": "playwright test --config=tests/e2e/playwright.muse.config.ts",
    "test:e2e:ui": "playwright test --config=tests/e2e/playwright.muse.config.ts --ui",
  }
}

落地页可见性

对公开页面进行基础冒烟测试,验证页面能够正常打开、响应状态正常、DOM 可见。

import { expect, test } from "@playwright/test";
import { ROUTE_BOOK } from "../utils/routes";

test.describe("guest-facing route smoke", () => {
  test("home page renders", async ({ page }) => {
    const response = await page.goto(ROUTE_BOOK.home);

    expect(response?.ok()).toBeTruthy();
    await expect(page.locator("body")).toBeVisible();
  });
});

埋点测试

对于需要精确验证渲染结果的页面(例如定价页),可在前端组件中添加data-testid属性作为测试锚点,无需侵入业务逻辑,即可实现精准断言。

import { expect, test } from "@playwright/test";
import { ROUTE_BOOK } from "../utils/routes";

test.describe("guest-facing route smoke", () => {
  test("pricing page renders critical pricing UI", async ({ page }) => {
    const response = await page.goto(ROUTE_BOOK.pricing);

    expect(response?.ok()).toBeTruthy();
    await expect(page.getByTestId("pricing-section")).toBeVisible();
    await expect(
      page.locator('[data-testid^="pricing-plan-cta-"]').first(),
    ).toBeVisible();
  });
});

export function PricingSection() {

 // ··· 

  return (
    <section
      id="pricing"
      className={cn("bg-muted/40", className)}
      data-testid="pricing-section"
    >
      {/* ··· */}
    section>
  );
}

后台页面

对于需要身份验证的后台页面,测试流程分两步:首先通过辅助工具创建测试用户并完成登录,再像操作真实浏览器一样验证页面内容。后续断言逻辑与公共落地页测试保持一致,复用性强。

import { expect, test } from "@playwright/test";
import { ROUTE_BOOK } from "../utils/routes";
import { establishSession, makePasswordUser } from "../utils/session";

test.describe("entry and account shell smoke", () => {
  test("unauthenticated users are redirected to login", async ({ page }) => {
    await page.goto(ROUTE_BOOK.app);

    await expect(page).toHaveURL(/\/auth\/login/);
  });

  test("password login reaches the app home page", async ({ page }) => {
    const user = await makePasswordUser({
      prefix: "e2e-login-ui",
      role: "user",
    });

    await page.goto(ROUTE_BOOK.login);
    await page.getByTestId("login-email").fill(user.email);
    await page.getByTestId("login-password").fill(user.password);
    await page.getByTestId("login-submit").click();

    await expect(page).toHaveURL(/\/app(?:\?.*)?$/);
    await expect(page.getByTestId("app-home-ready")).toBeVisible();
  });

  test("authenticated users can open general settings", async ({ page }) => {
    const user = await makePasswordUser({
      prefix: "e2e-settings",
      role: "user",
    });

    await establishSession(page, user);
    await page.goto(ROUTE_BOOK.settingsGeneral);

    await expect(page).toHaveURL(/\/app\/settings\/general(?:\?.*)?$/);
    await expect(page.getByTestId("settings-general-page")).toBeVisible();
  });
});

以上均为带 UI 的交互式测试。去掉--ui参数后,测试将在headless模式下静默运行,适合在CI环境中使用,结果如下:

PS D:\project\___test> pnpm test:e2e
[dotenv@17.2.4] injecting env (20) from .env -- tip: 📡 add observability to secrets: https://dotenvx.com/ops

[browser cleanup] Removed 3 browser scenario user(s).

> musemvp@0.0.0 test:e2e D:\project\___test
> playwright test --config=tests/e2e/playwright.muse.config.ts

[dotenv@17.2.4] injecting env (20) from .env -- tip: 🔑 add access controls to secrets: https://dotenvx.com/ops

Running 7 tests using 1 worker

[dotenv@17.2.4] injecting env (0) from .env -- tip: ✅ audit secrets and track compliance: https://dotenvx.com/ops1 …nt-entry.smoke.spec.ts:6:7 › entry and account shell smoke › unauthenticated users are redirected to login (772ms)  ✓  2 …account-entry.smoke.spec.ts:12:7 › entry and account shell smoke › password login reaches the app home page (1.8s)ℹ [DB] runtime=node source=database_url strategy=database_url_first NODE_ENV=unknown POOL_MAX=53 …t-entry.smoke.spec.ts:27:7 › entry and account shell smoke › authenticated users can open general settings (949ms)  ✓  4 …um] › tests\e2e\journeys\admin-console.smoke.spec.ts:5:5 › admin users can access the users management page (1.2s)  ✓  5 …omium] › tests\e2e\journeys\public-routes.smoke.spec.ts:5:7 › guest-facing route smoke › home page renders (732ms)  ✓  6 …eys\public-routes.smoke.spec.ts:12:7 › guest-facing route smoke › pricing page renders critical pricing UI (789ms)  ✓  7 …] › tests\e2e\journeys\public-routes.smoke.spec.ts:22:7 › guest-facing route smoke › features page renders (694ms)

[browser cleanup] Removed 3 browser scenario user(s).

  7 passed (15.1s)

To open last HTML report run:

  pnpm exec playwright show-report test-results\browser-report

API测试

用于验证 AI chat 对话接口的权限隔离行为。以下示例测试了多用户场景下的会话归属:确保用户 A 创建的会话对用户 B 不可见,同时验证所有者自身的读取权限正常工作。

import { afterAll, beforeAll, describe, expect, test } from "vitest";
import { purgeScenarioUsers } from "../utils/accounts";
import {
  ensureLocalAppReady,
  issueSignedInSession,
  sendContractRequest,
} from "./request-utils";

describe("conversation ownership stays isolated across member sessions", () => {
  beforeAll(async () => {
    await ensureLocalAppReady();
  });

  afterAll(async () => {
    await purgeScenarioUsers();
  });

  test("one user cannot fetch or delete another user's conversation", async () => {
    const userA = await issueSignedInSession({
      prefix: "ownership-a",
    });
    const userB = await issueSignedInSession({
      prefix: "ownership-b",
    });

    const createResponse = await sendContractRequest(
      "POST",
      "/api/aichat/conversations",
      {
        cookies: userA.cookies,
      },
    );

    expect(createResponse.status).toBe(200);

    const createdPayload = (await createResponse.json()) as {
      conversation: { id: string };
    };
    const conversationId = createdPayload.conversation.id;

    const foreignRead = await sendContractRequest(
      "GET",
      `/api/aichat/conversations/${conversationId}`,
      {
        cookies: userB.cookies,
      },
    );
    expect(foreignRead.status).toBe(404);

    const foreignDelete = await sendContractRequest(
      "DELETE",
      `/api/aichat/conversations/${conversationId}`,
      {
        cookies: userB.cookies,
      },
    );
    expect(foreignDelete.status).toBe(404);

    const ownerRead = await sendContractRequest(
      "GET",
      `/api/aichat/conversations/${conversationId}`,
      {
        cookies: userA.cookies,
      },
    );
    expect(ownerRead.status).toBe(200);
  });
});

测试是 vibe coding 稳定运行的最后一道保障。

API 测试、单元测试、e2e 测试三层覆盖,缺一不可。希望本文的实战示例能直接用进你的项目中。

MuseMVP SaaS 模板已将上述三种测试模块完整集成,开箱即用,无需从零搭建。

公众号:尼采般地抒情

别再手搓 Skill 了,用这个工具 5 分钟搞定

大家好,我是凌览。

如果本文能给你提供启发或帮助,欢迎动动小手指,一键三连(点赞评论转发),给我一些支持和鼓励谢谢。


说实话,第一次看到 "Skill" 这个词,我也有点懵。是不是又要写很多代码?是不是只有程序员能玩?

后来自己上手做了几个才发现——完全不是那么回事。Skill 更像是:把一件你本来就会做的事,写成一套能反复执行、不会乱跑的步骤

就说做饭吧。第一次做某道菜,你得一边看菜谱一边试探;做多了流程就固定了——先备料,再处理,最后出锅。这时候让别人帮你做,你大概会说"就按这个步骤来,别自己发挥"。Skill 干的就是这事,只是对象从「人」变成了「模型」——让大模型也照着这个步骤来,别自己发挥。

一个 Skill 到底长啥样?

先记住一句话:一个 Skill,本质上就是一个文件夹。

最简单的结构:

my-skill/
├── SKILL.md        # 说明:什么时候用、输入输出是什么
├── scripts/        # 执行:真正跑的逻辑
└── references/     # 参考:示例输入输出

SKILL.md 是最核心的——告诉模型"这件事干嘛用的、什么时候用、输出长啥样"。

你可以把它当成一份「使用说明书 + 注意事项」。

Skill 能解决什么问题?

举个例子,之前我开放了一个去水印下载鸭工具,同时写了份接口文档。但说实话,调用接口的步骤是固定的——传参数、发请求、解析返回。就这几步,每次都要重复。

调用步骤:

  1. 传参:token、url
  2. 发请求:
curl -X GET "https://nologo.code24.top/api/open/parse?url=https%3A%2F%2Fv.douyin.com%2Fxxxxx" \
  -H "Authorization: your-token"
  1. 解析返回数据

就这三步,每次都要重复一遍,挺烦的。 图片1.png

写成 Skill 之后,这些步骤直接固化下来,AI 碰到要调去水印接口的场景,自己就跑完了,不用你再手动复制粘贴。

其中 token、url 怎么获取,直接在 SKILL.md 里写清楚,或者丢个文档链接就行。

image.png

这个去水印解析的 Skill 已经开源了:github.com/CatsAndMice…,名字叫 nologo-open-api

效果测试成功:

image 1.png

但说实话,手搓还是有点麻烦

概念不难理解,但真要自己从头写一个 Skill,还是得折腾——要想触发条件,要写 SKILL.md,要调试……

有没有更省事的方法?

有。用 skill-creator。

skill-creator 是什么?

Skill-Creator是一款专为开发者设计的Skill创建向导,旨在简化开发流程。其便捷性和实用性已得到广泛验证,在SkillHub上的下载量已突破7.5万。

地址:www.skillhub.cn/skills/skil…

image 2.png

说到 SkillHub,这事儿还有点意思:

安装 Skill-Creator 特别简单——直接跟它聊天就行,它会一步步指导 AI 帮你创建技能

举个例子:

你:"用 skill-creator 帮我创建一个提取小红书链接的 Skill"

它:"好的,我来帮你创建。先告诉我:这个 Skill 要处理什么类型的链接?"

你:"抖音、小红书都行,主要提取无水印的视频地址"

它:"明白了。输出格式要不要加上 metadata?我给你两个选项……"

就这么一来一回,你描述需求,它帮你搞定剩下的——写 SKILL.md、搭目录结构、甚至调试代码。

说白了就是:你出想法,它帮你落地。

总结

Skill 的核心是把一件事的"固定流程"写成可反复执行的步骤,让模型按规程稳定产出,避免每次都从零复制粘贴、手动操作。像调用去水印接口这类标准化流程,尤其适合沉淀成 Skill 直接复用;而如果你不想从触发条件、SKILL.md 到目录结构都自己手搓,skill-creator 这种对话式向导能把创建与落地成本降到最低。

iOS逆向工程:详细解析ptrace反调试机制的破解方法与实战步骤

Ptrace 提供了一种父进程可以控制子进程运行的机制,并可以检查和改变它的核心image。

  • 它主要用于实现断点调试。

1、一个被跟踪的进程运行中,直到发生一个信号,则进程被中止,并且通知其父进程。 2、在进程中止的状态下,进程的内存空间可以被读写。父进程还可以使子进程继续执行,并选择是否是否忽略引起中止的信号。

  • 本文采用tweak 的方式进行MSHookFunction

在iOS应用安全中,除了反调试机制,代码混淆也是一种重要的保护手段。IpaGuard是一款强大的iOS IPA文件混淆工具,无需源码即可对代码和资源进行混淆加密,支持多种开发平台,有效增加反编译难度。它提供代码混淆、资源文件混淆、调试信息清理等功能,可以帮助开发者保护应用免受逆向工程攻击。

软件环境:Xcode 硬件环境:iPhone5越狱手机、Mac 开发工具:Cycript、LLDB、logos Tweak、hopper、MonkeyDev、AFLEXLoader、dumpdecrypted、debugserver、ssh、class_dump、hook

  • 破解方案

1、运行时期,断点ptrace,直接返回 2、分析如何调用的ptrace,hook ptrace 3、通过tweak,替换disable_gdb函数 4、修改 PT_DENY_ATTACH

I、运行时期,断点ptrace,直接返回

初始化应用程序,而不是运行中附着

iPhone:~ root#  debugserver -x posix *:12345  /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet

初始化程序,目的是从程序入口就开始进行附着,这样我们就可以在一些安全防护代码执行之前,进行破解。

跳过ptrace:过命令thread return直接返回,以跳过函数的逻辑。

 (lldb) br set -n ptrace
 Breakpoint 2: where = libsystem_kernel.dylib`__ptrace, address = 0x00000001966af2d4
 (lldb) br command add 2
 Enter your debugger command(s).  Type 'DONE' to end.
 > thread return
 > c
 > DONE

II、分析如何调用的ptrace,hook ptrace

去掉ptrace的思路

  • 当程序运行后,使用 debugserver *:1234 -a BinaryName 附加进程出现 segmentfault 11 时,一般说明程序内部调用了ptrace 。
  • 为验证是否调用了ptrace 可以 debugserver -x backboard *:1234 /BinaryPath(这里是完整路径),然后下符号断点 b ptrace,c 之后看ptrace第一行代码的位置,然后 p $lr 找到函数返回地址,再根据 image list -o -f 的ASLR偏移,计算出原始地址。最后在 IDA 中找到调用ptrace的代码,分析如何调用的ptrace。
  • 开始hook ptrace。

2.0 准备工作:砸壳

  • 砸壳

blog.csdn.net/z929118967/…

  • 查找app路径
iPhone:~ root# ps -e |grep AlipayWallet
  714 ??         0:26.44 /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
  736 ttys000    0:00.01 grep AlipayWallet

  • 获取NSDocumentDirectory
iPhone:~ root# cycript -p AlipayWallet
cy# [[NSFileManager defaultManager] URLsForDirectory:NSDocumentDirectory inDomains:NSUserDomainMask][0]
#"file:///var/mobile/Containers/Data/Application/89313E1C-76C2-41E3-8ECD-F4BDC1A78524/Documents/"

  • scp 拷贝文件
devzkndeMacBook-Pro:decrypted devzkn$ scp /Users/devzkn/Downloads/kevin-software/ios-Reverse_Engineering/dumpdecrypted-master/dumpdecrypted.dylib iphone150:/var/mobile/Containers/Data/Application/89313E1C-76C2-41E3-8ECD-F4BDC1A78524/Documents/

devzkndeMacBook-Pro:decrypted devzkn$ scp iphone150:/var/mobile/Containers/Data/Application/89313E1C-76C2-41E3-8ECD-F4BDC1A78524/Documents/AlipayWallet.decrypted /Users/devzkn/decrypted/AlipayWallet

  • class-dump
devzkndeMacBook-Pro:bin devzkn$ class-dump --arch armv7 /Users/devzkn/decrypted/AlipayWallet10.1.8/AlipayWallet.decrypted -H -o /Users/devzkn/decrypted/AlipayWallet10.1.8/head

-查看 bundleIdentifier

iPhone:~ root# cycript -p AlipayWallet
cy# [[NSBundle mainBundle] bundleIdentifier]
@"com.alipay.iphoneclient"

2.1 编写 tweak 分析

%hook DFClientDelegate
- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions {

    %log();

    // 打印某个类的所有方法的,查看所有方法的执行顺序

     [KNHook hookClass:@"H5WebViewController"];//aluLoginViewController
     [KNHook hookClass:@"TBSDKServer"];//getUaPageName    aluMTopService _tokenLoginInvoker

     [KNHook hookClass:@"TBSDKMTOPServer"];//getUaPageName    aluMTopService _tokenLoginInvoker

    return %orig;
}
%end

2.2 具体步骤

2.2.1 debugserver

iPhone:~ root#  debugserver *:12345 -a AlipayWallet
debugserver-@(#)PROGRAM:debugserver  PROJECT:debugserver-320.2.89
 for armv7.
Attaching to process AlipayWallet...
Segmentation fault: 11

当程序运行后,使用 debugserver *:1234 -a BinaryName 附加进程出现 segmentfault 11 时,一般说明程序内部调用了ptrace 。

iPhone:~ root#  debugserver *:12345 -x backboard /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
debugserver-@(#)PROGRAM:debugserver  PROJECT:debugserver-320.2.89
 for armv7.
Segmentation fault: 11

2.2.2 分析如何调用的ptrace

  • debugserver -x
iPhone:~ root# debugserver -x *:12345 /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
error: invalid TYPE for the --launch=TYPE (-x TYPE) option: '*:12345'
Valid values TYPE are:
  auto       Auto-detect the best launch method to use.
  posix      Launch the executable using posix_spawn.
  fork       Launch the executable using fork and exec.
  backboard  Launch the executable through BackBoard Services.

总共有四种类型

debugserver -x backboard *:1234 /var/mobile/...... 把这个backboard改成posix试试

iPhone:~ root#  debugserver -x posix *:12345  /var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet
debugserver-@(#)PROGRAM:debugserver  PROJECT:debugserver-320.2.89
 for armv7.
Listening to port 12345 for a connection from *...

  • 在ptrace上下断点,找到调用ptrace的地方
(lldb)  b ptrace
Breakpoint 1: no locations (pending).
WARNING:  Unable to resolve breakpoint to any actual locations.

(lldb) c
Process 1657 resuming

  • 关闭Target,重新启动Target
1 location added to breakpoint 1
Process 1657 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
    frame #0: 0x37a13e64 libsystem_kernel.dylib`__ptrace
libsystem_kernel.dylib`__ptrace:
->  0x37a13e64 <+0>:  ldr    r12, [pc, #0x4]           ; <+12>
    0x37a13e68 <+4>:  ldr    r12, [pc, r12]
    0x37a13e6c <+8>:  b      0x37a13e74                ; <+16>
    0x37a13e70 <+12>: rsbeq  r9, r11, #192, #2
Target 0: (AlipayWallet) stopped.
(lldb)  p/x $lr
(unsigned int) $0 = 0x0000bfbb

由此可见ptrace函数在libsystem_kernel.dylib这个动态库中,使用时才进行加载,不是静态放在本地的,所以我们不能简单地去tweak ptrace函数。

(lldb) image list -o -f |grep AlipayWallet
[  0] 0x00000000 /private/var/mobile/Containers/Bundle/Application/A612F542-81EF-456A-A6A0-B23046EF57BA/AlipayWallet.app/AlipayWallet(0x0000000000004000)

所以ptrace的调用者位于0x0000bfbb - 0x00000000 = 0x0000bfbb处,如图所示:

  • 在hopper使用go to add ,快捷键G

大文件上传-spark-md5

概述:后端服务(Node + Express)、前端(vue+spark-md5)

一、后端服务

1、创建后端项目

mkdir upload-server
cd upload-server
npm init -y
npm install express cors multer fs-extra

2、 后端完整代码 server.js

const express = require('express');
const cors = require('cors');
const multer = require('multer');
const fse = require('fs-extra');
const path = require('path');

const app = express();
// 解决跨域 + 大文件请求体限制
app.use(cors());
app.use(express.json({ limit: '100mb' }));
app.use(express.urlencoded({ extended: true, limit: '100mb' }));

// 配置(和前端完全一致)
const UPLOAD_DIR = path.resolve(__dirname, 'upload'); // 分片临时目录
const MERGE_DIR = path.resolve(__dirname, 'merged');   // 合并后文件目录
const CHUNK_SIZE = 2 * 1024 * 1024; // 2MB 分片

// 确保目录存在(启动时就创建,避免运行时创建失败)
fse.ensureDirSync(UPLOAD_DIR);
fse.ensureDirSync(MERGE_DIR);

// ✅ 修复1:multer 配置,不再从 req.body 取参数,改用动态存储
const storage = multer.memoryStorage(); // 改用内存存储,避免目录创建时序问题
const upload = multer({ 
  storage,
  limits: { fileSize: 5 * 1024 * 1024 } // 限制分片大小,和前端一致
});

/**
 * 1. 查询已上传的分片(断点续传/秒传核心)
 */
app.post('/checkfile', async (req, res) => {
  try {
    const { fileHash, fileName } = req.body;
    if (!fileHash || !fileName) {
      return res.status(400).json({ code: -1, msg: '参数缺失' });
    }

    const ext = path.extname(fileName);
    const filePath = path.resolve(MERGE_DIR, `${fileHash}${ext}`);
    
    // 秒传:文件已存在
    if (fse.existsSync(filePath)) {
      return res.json({ code: 0, uploadedChunks: [], shouldUpload: false });
    }

    // 读取已上传分片
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    let uploadedChunks = [];
    if (fse.existsSync(chunkDir)) {
      uploadedChunks = await fse.readdir(chunkDir);
    }
    res.json({ code: 0, uploadedChunks, shouldUpload: true });
  } catch (error) {
    console.error('checkfile 错误:', error);
    res.status(500).json({ code: -1, msg: '服务器错误' });
  }
});

/**
 * 2. 上传分片(修复核心:手动处理存储,避免 multer 时序问题)
 */
app.post('/uploadchunk', upload.single('chunk'), async (req, res) => {
  try {
    const { fileHash, chunkIndex } = req.body;
    const chunk = req.file; // multer 解析后的文件 buffer

    if (!fileHash || chunkIndex === undefined || !chunk) {
      return res.status(400).json({ code: -1, msg: '参数缺失' });
    }

    // 手动创建分片目录(确保存在)
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    await fse.ensureDir(chunkDir);

    // 手动写入分片文件
    const chunkPath = path.resolve(chunkDir, chunkIndex.toString());
    await fse.writeFile(chunkPath, chunk.buffer);

    res.json({ code: 0, msg: '分片上传成功' });
  } catch (error) {
    console.error('uploadchunk 错误:', error);
    res.status(500).json({ code: -1, msg: '分片上传失败', error: error.message });
  }
});

/**
 * 3. 合并所有分片
 */
app.post('/mergefile', async (req, res) => {
  try {
    const { fileHash, fileName, chunkCount } = req.body;
    if (!fileHash || !fileName || !chunkCount) {
      return res.status(400).json({ code: -1, msg: '参数缺失' });
    }

    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    const ext = path.extname(fileName);
    const filePath = path.resolve(MERGE_DIR, `${fileHash}${ext}`);

    // 检查分片目录是否存在
    if (!fse.existsSync(chunkDir)) {
      return res.status(400).json({ code: -1, msg: '分片目录不存在' });
    }

    // 按顺序合并分片
    const writeStream = fse.createWriteStream(filePath);
    for (let i = 0; i < chunkCount; i++) {
      const chunkPath = path.resolve(chunkDir, i.toString());
      // 检查分片是否存在
      if (!fse.existsSync(chunkPath)) {
        return res.status(400).json({ code: -1, msg: `分片 ${i} 缺失` });
      }
      const readStream = fse.createReadStream(chunkPath);
      await new Promise((resolve, reject) => {
        readStream.pipe(writeStream, { end: false });
        readStream.on('end', resolve);
        readStream.on('error', reject);
      });
    }

    // 关闭写入流
    writeStream.end();

    // 合并完成删除分片目录
    await fse.remove(chunkDir);
    res.json({ code: 0, msg: '文件合并成功', url: `/merged/${fileHash}${ext}` });
  } catch (error) {
    console.error('mergefile 错误:', error);
    res.status(500).json({ code: -1, msg: '合并失败', error: error.message });
  }
});

// 静态资源访问合并后的文件
app.use('/merged', express.static(MERGE_DIR));

const PORT = 3000;
app.listen(PORT, () => {
  console.log(`后端服务启动成功:http://localhost:${PORT}`);
  console.log(`分片存储目录:${UPLOAD_DIR}`);
  console.log(`合并后文件目录:${MERGE_DIR}`);
});

3、启动后端:

node server.js

二、前端服务

1、.vue文件

<template>
  <div id="app" style="max-width: 800px;margin: 50px auto;">
    <h2>Vue2 大文件分片上传(断点续传)</h2>
    <input type="file" @change="handleFileChange">
    <button :disabled="!file || uploading" style="margin-left: 10px;" @click="handleUpload">
      {{ uploading ? '上传中...' : '开始上传' }}
    </button>

    <!-- 进度条(已修复动态样式) -->
    <div v-if="totalProgress > 0" style="margin-top: 20px;">
      <div>总进度:{{ totalProgress.toFixed(2) }}%</div>
      <div style="height:5px;background:#eee;border-radius:3px;">
        <div
          :style="{
            height: '100%',
            background: '#42b983',
            width: totalProgress + '%',
            transition: '0.3s'
          }"
        />
      </div>
    </div>

    <div style="margin-top: 20px;color: #333;">
      <p v-if="uploadedChunkList.length">已上传分片:{{ uploadedChunkList.join(',') }}</p>
      <p v-if="msg" :style="{color: msg.includes('成功') ? 'green' : 'red'}">{{ msg }}</p>
    </div>
  </div>
</template>

<script>
import SparkMD5 from 'spark-md5'
import axios from 'axios'

export default {
  name: 'App',
  data() {
    return {
      file: null,
      fileHash: '',
      CHUNK_SIZE: 2 * 1024 * 1024, // 2MB 分片
      chunkList: [],
      uploadedChunkList: [],
      uploading: false,
      totalProgress: 0,
      msg: '',
      MAX_CONCURRENT: 3 // ✅ 新增:最大并发数,控制同时上传的分片数量
    }
  },
  methods: {
    // 1. 选择文件
    async handleFileChange(e) {
      const file = e.target.files[0]
      if (!file) return
      this.file = file
      this.msg = '正在计算文件指纹...'
      this.fileHash = await this.getFileHash(file)
      this.msg = `文件:${file.name},hash:${this.fileHash.slice(0, 10)}...`
    },

    // 2. 计算文件 MD5(优化:大文件完整计算,避免 hash 冲突)
    getFileHash(file) {
      return new Promise((resolve, reject) => {
        const spark = new SparkMD5.ArrayBuffer()
        const fileReader = new FileReader()
        const chunkSize = 2 * 1024 * 1024
        let offset = 0

        const loadNext = () => {
          const slice = file.slice(offset, offset + chunkSize)
          fileReader.readAsArrayBuffer(slice)
        }

        fileReader.onload = (e) => {
          spark.append(e.target.result)
          offset += e.target.result.byteLength
          if (offset < file.size) {
            loadNext()
          } else {
            resolve(spark.end())
          }
        }

        fileReader.onerror = reject
        loadNext()
      })
    },

    // 3. 开始上传(主流程,修复并发+进度)
    async handleUpload() {
      if (!this.file) return alert('请选择文件')
      this.uploading = true
      this.totalProgress = 0
      this.msg = ''

      try {
        // 1)查询已上传分片
        const { data } = await axios.post('http://localhost:3000/checkfile', {
          fileHash: this.fileHash,
          fileName: this.file.name
        })

        // 秒传
        if (!data.shouldUpload) {
          this.msg = '✅ 秒传成功:文件已存在'
          this.uploading = false
          this.totalProgress = 100
          return
        }
        this.uploadedChunkList = data.uploadedChunks.map(String) // 统一转字符串,避免类型不匹配

        // 2)切分文件
        this.chunkList = this.createChunks(this.file)
        const total = this.chunkList.length
        let uploadedCount = this.uploadedChunkList.length // 已上传分片数
        this.totalProgress = (uploadedCount / total) * 100 // 初始化进度

        // 3)过滤出需要上传的分片
        const needUploadChunks = this.chunkList
          .map((chunk, index) => ({ chunk, index }))
          .filter(item => !this.uploadedChunkList.includes(item.index.toString()))

        // ✅ 4)并发控制上传(核心优化,避免后端 500)
        await this.concurrentUpload(needUploadChunks, total, (count) => {
          uploadedCount += count
          this.totalProgress = (uploadedCount / total) * 100
        })

        // 5)通知后端合并文件
        const mergeRes = await axios.post('http://localhost:3000/mergefile', {
          fileHash: this.fileHash,
          fileName: this.file.name,
          chunkCount: this.chunkList.length
        })

        if (mergeRes.data.code === 0) {
          this.msg = '✅ 上传 + 合并完成!'
          this.totalProgress = 100
        } else {
          this.msg = `❌ 合并失败:${mergeRes.data.msg || '未知错误'}`
        }
      } catch (error) {
        console.error('上传错误:', error)
        this.msg = `❌ 上传失败:${error.message || '未知错误'}`
      } finally {
        this.uploading = false
      }
    },

    // ✅ 并发控制上传方法
    async concurrentUpload(chunks, total, onProgress) {
      console.log('999999 分片数量', chunks)
      const results = []
      // 分批上传,每批 MAX_CONCURRENT 个
      for (let i = 0; i < chunks.length; i += this.MAX_CONCURRENT) {
        const batch = chunks.slice(i, i + this.MAX_CONCURRENT)
        const batchPromises = batch.map(async({ chunk, index }) => {
          const formData = new FormData()
          formData.append('chunk', chunk)
          formData.append('fileHash', this.fileHash)
          formData.append('chunkIndex', index)

          // 重试逻辑:失败自动重试 2 次
          for (let retry = 0; retry < 2; retry++) {
            try {
              await axios.post('http://localhost:3000/uploadchunk', formData, {
                headers: { 'Content-Type': 'multipart/form-data' }
              })
              return { success: true, index }
            } catch (e) {
              console.warn(`分片 ${index} 上传失败,重试 ${retry + 1}`)
              if (retry === 1) throw e
              await new Promise(resolve => setTimeout(resolve, 1000)) // 重试间隔 1s
            }
          }
        })

        const batchResults = await Promise.allSettled(batchPromises)
        results.push(...batchResults)
        // 更新进度:每批完成后计算
        const successCount = batchResults.filter(r => r.status === 'fulfilled' && r.value.success).length
        onProgress(successCount)
      }

      // 检查是否有失败的分片
      const failed = results.filter(r => r.status === 'rejected' || !r.value?.success)
      if (failed.length > 0) {
        throw new Error(`有 ${failed.length} 个分片上传失败`)
      }
    },

    // 切分文件
    createChunks(file) {
      const chunks = []
      let start = 0
      while (start < file.size) {
        const end = Math.min(start + this.CHUNK_SIZE, file.size)
        chunks.push(file.slice(start, end))
        start = end
      }
      return chunks
    }
  }
}
</script>

2、安装依赖

npm install axios spark-md5 --save

3、启动服务

npm run serve

总结:

  1. 查询列表(文件是否已上传/已传切片index)-文件hash唯一性(spark-md5获取)
  2. 开始上传,过滤已上传的切片数
  3. 剩余切片分批次并行上传(第一次失败,自动重试 1 次,第二次失败,抛错)
  4. 全部切片上传成功后,调用合并接口(通知后端可以合并切片数了)

秒传:文件完整存在 → 直接跳过上传

断点续传:只传缺失分片 → 断网 / 刷新可恢复(hash查询,再过滤)

并发控制:分批次,防止同时发大量请求导致崩溃

备注:按以上步骤,直接可以实践操作

从网关的角度理解并实现一个 Mini OpenClaw

1. 前言

OpenClaw 与其他 AI Agent 最本质的区别是什么?首先,OpenClaw 本身也是一个 AI Agent,但关键在于它能连接多种 IM 渠道,并利用这些 IM 工具提供的开发能力来调用自身的 Agent——这种能力被称为“网关”。因此,有后端的技术大咖将 OpenClaw 总结为:OpenClaw = 高权限 AI Agent + 网关

所以只有理解了 OpenClaw 的本质之后,我们才可以实现一个 Mini OpenClaw。

首先我们要实现一个网关,那么网关是什么呢?

网关对于后端的同学来说,肯定不陌生。在 Spring Boot 微服务架构中,API 网关已成为标准的基础设施组件,其核心作用与 OpenClaw 中的“网关”如出一辙:对外隐藏后端的实现细节(服务地址、版本、熔断等),对内统一通信协议,并提供横切能力(如鉴权、限流、日志等) 。两者的区别仅在于作用对象不同——OpenClaw 的网关面向 IM 渠道(消息协议适配),而后端网关面向 HTTP/RPC 调用(协议转换与流量管理)。

所以 OpenClaw 的所谓网关就是一个消息协议适配器。

所以我们先要实现网关最核心的功能:协议适配。这是网关最本质的能力——对外讲 IM 的方言,对内统一说普通话。

2. 网关核心功能:协议适配

不同 IM(飞书、微信 等)的消息格式千差万别:有的用 user_id,有的用 from 字段,有的消息正文可能嵌套在 text 或 message 对象中。我们可以通过设计一个消息协议将这些差异全部“抹平”,这样本地 AI Agent 就只依赖这标准消息协议,无需关心消息来自哪个渠道。

设计一个入站的消息对象 InboundMessage:

# events.py
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class InboundMessage:
    """从聊天频道接收到的消息"""
    channel: str  # 用于区分来源,后续发送回复时需要知道应该调用哪个 IM 的 API(feishu、wechat)
    sender_id: str  # 用户标识符
    chat_id: str  # 聊天/频道标识符
    content: str  # 消息文本
    timestamp: datetime = field(default_factory=datetime.now)  # 消息时间

这样新增一个 IM 渠道时,只需要写一个适配器将私有消息转换成 InboundMessage 即可,其余代码零改动。

简而言之:设计 InboundMessage 就是为了让网关“对外讲方言,对内讲普通话”,所有渠道的消息到达网关后立刻被标准化,Agent 只需处理这一种标准格式。

同样地不同 IM 的发送接口千差万别:飞书需要 receive_id,微信需要 touser,Telegram 需要 chat_id。通过设计一个 OutboundMessage 消息对象,这样 Agent 只需要产出 channelchat_idcontent 三个核心字段,网关再根据 channel 值调用对应的 IM 适配器,由适配器负责转换成目标 IM 的私有请求格式即可。

OutboundMessage 消息对象的字段设计如下:

# events.py
@dataclass
class OutboundMessage:
    """要发送到聊天频道的消息"""
    
    channel: str
    chat_id: str
    content: str
    reply_to: str | None = None # 支持引用回复,用于指明当前回复的是哪一条历史消息

网关的输入是 InboundMessage,输出是 OutboundMessage,这样本地 AI Agent 核心只处理这两种标准格式信息,完全不依赖任何 IM 私有 API。这使得添加新 IM 渠道变得非常简单:只需要写一个适配器,将 InboundMessage 解析出来,并将 OutboundMessage 转换成该 IM 的发送请求即可。因为本地 AI Agent 完全不知道自己在和谁在交互,它只看到 InboundMessage/OutboundMessage,这正是网关隐藏后端实现细节的精髓,也是网关本质的体现

3. 网关内部路由:统一通信总线

根据前面的设计,我们已经将各个 IM 渠道的消息统一成了 InboundMessage,并将 Agent 的回复统一成了 OutboundMessage。但仅仅统一格式还不够,还需要解决一个核心问题:多个渠道的消息并发涌入,而 Agent 的处理可能是同步/半异步的,如何让它们有序、可靠、不互相阻塞?

这就需要一个统一通信总线——本质上是一个轻量级的内部消息路由。而最经典、最可靠的实现方式就是双队列解耦

入站异步队列: 渠道 → Agent
出站异步队列: Agent → 渠道

通过双队列把网关内部的“消息流动”标准化为两个 FIFO 管道:

  • 入站异步队列:所有 IM 渠道的消息汇聚点,Agent 从这头取“原材料”。
  • 出站异步队列:所有回复的汇聚点,分发器从这头取“成品”并发送。

为什么需要这样设计?

每个 IM 渠道(飞书、微信等)都有自己的 Webhook 或长连接,当瞬间收到大量消息(例如群聊刷屏)时,如果直接在回调中同步调用 Agent,Agent 处理耗时较长,会导致 Webhook 超时、连接堆积,甚至被 IM 服务器屏蔽。

我们让每个渠道适配器只做最轻量的事情,每当接收到消息时,就只需要解析消息、封装成上述设计的 InboundMessage,然后立即推送到入站异步队列中,马上返回返回即可。而 Agent 的处理则由一个独立的后台协程从入站异步队列中拉取,这样生产者和消费者的速度完全解耦。即使 Agent 处理得慢,队列也能起到“缓冲”作用,不会丢消息。

同时 Agent 只产出上述设计的 OutboundMessage 的数据并推送到出站异步队列中。另一个独立的分发器协程从出站异步队列中取出消息,找到对应的渠道适配器,调用该适配器的发送方法进行发送消息。这样一来,Agent 完全不需要知道消息要发往哪里、怎么发,路由逻辑全封装在网关内部。

统一通信总线代码实现如下:

# message_bus.py
"""用于解耦频道与智能体通信的异步消息队列"""
import asyncio
from loguru import logger
from events import InboundMessage, OutboundMessage

class MessageBus:
    """
    异步消息总线,用于将聊天频道与智能体核心解耦。
    频道将消息推送到入站队列,智能体处理它们并将响应推送到出站队列。
    """
    def __init__(self):
        # 入站异步队列
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        # 出站异步队列
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
    
    async def publish_inbound(self, msg: InboundMessage) -> None:
        """将来自频道的消息发布给智能体"""
        await self.inbound.put(msg)
    
    async def consume_inbound(self) -> InboundMessage:
        """消费下一条入站消息(阻塞直到有消息可用)"""
        return await self.inbound.get()
    
    async def publish_outbound(self, msg: OutboundMessage) -> None:
        """将智能体的响应发布给频道"""
        await self.outbound.put(msg)
    
    async def consume_outbound(self) -> OutboundMessage:
        """消费下一条出站消息(阻塞直到有消息可用)"""
        return await self.outbound.get()

同时入站异步队列和出站异步队列通过 asyncio.Queue 提供。asyncio.Queue 是异步编程中实现生产者-消费者模式的标准工具,它让不同协程之间可以安全、非阻塞地交换数据。在我们上述网关的设计中,正是依赖它实现了入站/出站双队列解耦,从而让多个 IM 渠道可以并发接收消息,同时 Agent 通过并发处理消息,实现效率提高。没有它,你就得自己用锁和条件变量实现类似功能,既复杂又容易出错。

接着我们修改上一篇文章《如何使用飞书机器人连接本地 AI Agent》中实现的飞书连接本地 AI Agent 的飞书频道,实现将来自飞书的消息转发到通信总线。

# feishu.py
+ from events import InboundMessage
+ from message_bus import MessageBus

class FeishuChannel:
    """极简版飞书 WebSocket 长连接机器人"""
+    name = "feishu"
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        self.config = config
        self.bus = bus
        # 省略...

    async def start(self) -> None:
        # 省略...
-    def _on_message(self, data: P2ImMessageReceiveV1) -> None:
+    async def _on_message(self, data: P2ImMessageReceiveV1) -> None:
        """接收到消息时的回调"""
        msg = data.event.message
+        sender = data.event.sender
        # 只处理用户发送的纯文本消息
        if data.event.sender.sender_type == "bot" or msg.message_type != "text":
            return

        content = json.loads(msg.content).get("text", "")
        if not content:
            return
        
+        # 提取发送者信息
+        sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
+        # 获取用于回复的 chat_id
+        chat_id = msg.chat_id
+        chat_type = msg.chat_type  # "p2p" 或 "group"
+        reply_to = chat_id if chat_type == "group" else sender_id
+        # 将消息转发到总线
+        await self._handle_message(
+            sender_id=sender_id,
+            chat_id=reply_to,
+            content=content,
+        )
-        # 启动独立线程处理 AI 逻辑并回复,防止阻塞 WebSocket 接收循环
-        # threading.Thread(
-        #     target=self._process_and_reply, 
-        #     args=(msg.chat_id, content)
-        # ).start()

+    async def _handle_message(
+        self,
+        sender_id: str,
+        chat_id: str,
+        content: str,
+    ) -> None:
+        """
+        处理来自聊天平台的传入消息。
+        此方法将消息转发到总线。
        
+        参数:
+            sender_id: 发送者的标识符。
+            chat_id: 聊天/通道的标识符。
+            content: 消息文本内容。
+        """
        
+        msg = InboundMessage(
+            channel=self.name,
+            sender_id=str(sender_id),
+            chat_id=str(chat_id),
+            content=content
+        )
        
+        await self.bus.publish_inbound(msg)

现在我们已经将飞书发过来的消息推送到通信总线中了,接着我们需要在 Agent 异步处理协程中循环读取总线中的消息进行处理了。

4. 实现并发 Agent Loop

我们上文讲到了通过 asyncio.Queue 实现了入站/出站双队列解耦,从而让多个 IM 渠道可以并发接收消息,同时 Agent 通过并发处理消息,实现效率提高。

但我们前面实现的 Agent Loop 的同步处理数据,所以我们需要重新设计并实现我们的 Agent Loop。

首先我们这个 Agent Loop 需要具备以下功能点:

  1. 持续运行:只要网关没有关闭,Agent Loop 就要一直工作,不能退出。
  2. 响应及时:当有新消息到达时,应尽快开始处理,避免不必要的延迟。
  3. 可优雅停止:外部可以调用 stop() 方法,让循环在安全时机退出,而不是强制杀死协程。
  4. 容错性:单条消息处理失败不应导致整个循环崩溃,并且要能告知用户出错。

那么第一个功能点持续运行,我们可以通过使用一个布尔标志控制循环是否继续。

self._running = True
while self._running:
    # 只要 self._running = True 就一直循环读取通讯总线中的消息进行处理

这样只要 self._running = True 就一直循环读取通讯总线中的消息进行处理。同时我们设计一个 stop() 方法设置 self._running = False,这样外部协程就可以调用 stop() 使得循环将在下一次条件判断时退出。

在读取通讯总线中的消息时,我们需要通过 asyncio.wait_for 实现可中断阻塞读取。即如下实现:

self._running = True
while self._running:
    # 只要 self._running = True 就一直循环读取通讯总线中的消息进行处理
    msg = await asyncio.wait_for(
        self.bus.consume_inbound(), # 本质是 await inbound_queue.get()
        timeout=1.0,
    )

如果不使用 asyncio.wait_for 而是直接使用 await self.bus.consume_inbound() 的话,没有消息就一直等着,那么循环永远不会走到 while self._running 的条件判断。此时调用 stop() 设置 self._running = False 是无效的,因为协程卡在 get() 上,永远没有机会检查 self._running 标志。

而使用 asyncio.wait_for 并设置超时为 1 秒,也就是如果 1 秒内返回了消息,就正常得到 msg。如果 1 秒后队列仍为空,wait_for 会抛出 asyncio.TimeoutError。这样,协程最多阻塞 1 秒就会醒来一次,重新检查 while self._running。因此,即使没有消息,循环也能每秒检查一次退出标志,实现可中断的阻塞读取

根据上述设计我们初步实现 Agent Loop 如下:

import asyncio
import json
import os
from typing import Any

from dotenv import load_dotenv
from loguru import logger
from openai import AsyncOpenAI

from events import InboundMessage, OutboundMessage
from message_bus import MessageBus

load_dotenv()

class AgentLoop:
    def __init__(
        self,
        bus: MessageBus,
        max_iterations: int = 200,
        api_key: str | None = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-chat",
    ):
        self.bus = bus
        # 最大工具调用轮次,防止死循环
        self.max_iterations = max_iterations
        self.model = model
        self._running = False
        # 初始化 OpenAI异步客户端 兼容客户端(如 DeepSeek)
        self.client = AsyncOpenAI(
            api_key=api_key or os.getenv("DEEPSEEK_API_KEY"),
            base_url=base_url,
        )

    # ------------------------------------------------------------------
    # 主循环:持续消费 入站异步队列
    # ------------------------------------------------------------------

    async def run(self) -> None:
        """运行智能体循环,处理来自总线的消息。"""
        self._running = True
        logger.info("Agent loop started")

        while self._running:
            try:
                # 从入站队列消费下一条消息,设置超时以便能定期检查 _running 标志
                msg = await asyncio.wait_for(
                    self.bus.consume_inbound(),
                    timeout=1.0,
                )
                try:
                    # 处理消息并获取响应
                    response = await self._process_message(msg)
                    if response:
                        # 将响应发布到出站队列
                        await self.bus.publish_outbound(response)
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    await self.bus.publish_outbound(
                        OutboundMessage(
                            channel=msg.channel,
                            chat_id=msg.chat_id,
                            content=f"抱歉,处理消息时出错:{e}",
                        )
                    )
            except asyncio.TimeoutError:
                continue

    def stop(self) -> None:
        """停止智能体循环。"""
        self._running = False
        logger.info("Agent loop stopping")

上述的 run 方法需要在一开始就启动,这样才可以实现一有消息就马上处理,而不会漏消息。我们把上一篇讲解实现飞书接入本地 AI Agent 的启动文件 test_feishu.py 重命名为 gateway.py,也就是网关的意思,并且修改其中的启动代码:

+ from message_bus import MessageBus
+ from loop import AgentLoop
async def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxx",         # 替换为真实的 App ID
        app_secret="xxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
+    deepseek_key = os.getenv("DEEPSEEK_API_KEY", "")
+    bus = MessageBus()
+    agent = AgentLoop(
+        bus=bus,
+        api_key=deepseek_key,
+        base_url="https://api.deepseek.com",
+        model="deepseek-chat",
+        max_iterations=20,
+    )
    
    # 2. 初始化频道并启动长连接
-    channel = FeishuChannel(config=config)
+channel = FeishuChannel(config=config, bus=bus)
    
    logger.info("正在启动飞书机器人长连接...")
    
-    # 3. 启动并保持运行
+    # 3. 并发运行
    try:
-        await channel.start()
+        await asyncio.gather(
+            agent.run(),          # 持续消费 inbound 队列,调用 LLM
+            channel.start(),      # 飞书启动
+        )
    except KeyboardInterrupt:
        logger.info("收到退出信号,正在关闭...")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

通过上述修改我们就实现了 Agent 和飞书频道在初始化的时候并发运行,从而实现了一开始就监听入站异步队列的消息。

上述 Agent Loop 的 self._process_message 方法是还没实现的,所以我们继续实现 Agent 对消息的处理。本质就是实现大模型的工具调用循环。

在实现 Agent 对消息的处理之前,我们先要重新设计一下会话历史。

5. 会话历史设计

在前面的文章中我们的会话历史就是一个数组,结构如下:

history = [
    {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个助手")},
    {"role": "user", "content": content}
]

后续如果继续有消息就根据角色往数组 history 中追加用户消息和助手消息即可。

但在 OpenClaw 中需要保证不同渠道、不同群、不同用户的历史会话完全隔离。我们可以使用 dict[str, list[dict]] 作为存储结构,相当于在 JavaScript 中设置一个对象,然后通过 key 作为唯一标识进行会话隔离。

key 设计:

这个 key 我们可以设置由 channel + chat_id 组合而成,例如 "feishu:oc_xxx"。然后我们在之前设计的 InboundMessage 对象中设置一个 session_key 方法用于返回会话唯一标识。设置如下:

@dataclass
class InboundMessage:
    # 省略...
    
+    @property
+    def session_key(self) -> str:
+        """用于会话标识的唯一键"""
+        return f"{self.channel}:{self.chat_id}"

value 设计:

value 其实就是上述的历史会话数组,即:

[
    {"role": "system", "content": getattr(agent, "SYSTEM", "你是一个助手")},
    {"role": "user", "content": content}
]

同时我们设计一个 _get_history 的函数来实现对会话历史的懒加载,如果 session_key 不存在,自动创建新列表并插入 system prompt,如果 session_key 存在则返回内部列表的直接引用,调用方可以修改它,即追加消息。这样设计可以避免拷贝带来的性能开销。

实现如下:

# ---------- 会话历史管理(按 session_key 隔离) ----------
# 全局字典:存储所有会话的对话历史
# - Key: session_key,用于唯一标识一个会话(例如 "feishu:chat_id")
# - Value: 消息列表,每个元素是 OpenAI API 兼容的消息字典(包含 role, content 等字段)
_sessions: dict[str, list[dict]] = {}

# 系统提示词:定义 AI 助手的角色、能力和行为准则
SYSTEM_PROMPT = (
    "你是一个智能助手,可以通过工具帮助用户完成任务。"
    "请简洁、准确地回答用户问题。"
)
# 获取会话历史
def _get_history(session_key: str) -> list[dict]:
    # 若为新会话,自动初始化一条包含 system prompt 的消息
    if session_key not in _sessions:
        _sessions[session_key] = [{"role": "system", "content": SYSTEM_PROMPT}]
    # 返回该会话的历史列表(引用,允许外部修改)
    return _sessions[session_key]

6. Agent Loop 的核心:消息处理

在完成了会话历史管理和主循环的可中断阻塞读取之后,Agent Loop 最核心的部分就是 单条消息的处理逻辑——即 _process_message 方法。该方法实现了 ReAct(推理+行动)模式:调用 LLM → 若需要工具则执行工具 → 将结果返回 LLM → 重复直到得到最终答案。下面详细解析其实现:

class AgentLoop:
    # 省略...

    # ------------------------------------------------------------------
    # 单条消息处理:tool-call 循环
    # ------------------------------------------------------------------
    async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
        # 1. 获取当前会话的历史,并追加用户消息
        messages = _get_history(msg.session_key)
        messages.append({"role": "user", "content": msg.content})

        final_content: str | None = None
        # 2. 进入工具调用循环(最多 max_iterations 次)
        for iteration in range(self.max_iterations):
            # 3. 调用 LLM(异步非阻塞)
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages, 
                tools=TOOLS,
                tool_choice="auto",
            )
            assistant_msg = response.choices[0].message

            # 将助手消息追加到历史
            messages.append(assistant_msg)

            # 4. 如果没有 tool_calls,说明任务完成
            if not assistant_msg.tool_calls:
                final_content = assistant_msg.content or ""
                break

            # 5. 执行所有工具调用,并将结果以 role=tool 追加到历史记录
            for tool_call in assistant_msg.tool_calls:
                name = tool_call.function.name
                args = json.loads(tool_call.function.arguments)
                logger.debug(f"Executing tool: {name}, args: {args}")

                result = _execute_tool(name, args)
                logger.debug(f"Tool result: {result[:100]}")

                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "name": name,
                        "content": result,
                    }
                )
        else:
            # 达到最大迭代次数
            final_content = "已达到最大处理轮次,无法给出最终答案。"

        if final_content is None:
            final_content = "处理完成,但没有内容返回。"
        # 6. 构造出站消息返回给用户
        return OutboundMessage(
            channel=msg.channel,
            chat_id=msg.chat_id,
            content=final_content,
        )

上述代码的实现跟我们前面文章实现 Agent Loop 是一样的,所以大家还有不懂的话,可以回看前面文章的详细解析。最最重要的就是最后返回了构造了 OutboundMessage 格式的出站消息,然后在 run 方法中通过 self.bus.publish_outbound(response) 将消息发布到出站队列。

其中工具定义实现如下:

# ---------- 内置工具定义 ----------
TOOLS: list[dict] = [
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "读取本地文本文件内容。",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "文件路径"},
                    "encoding": {
                        "type": "string",
                        "enum": ["utf-8", "gbk"],
                        "description": "文件编码,默认 utf-8",
                    },
                },
                "required": ["path"],
            },
        },
    }
]

def _execute_tool(name: str, arguments: dict) -> str:
    """同步执行内置工具,返回字符串结果。"""
    if name == "read_file":
        from pathlib import Path

        path = arguments.get("path", "")
        encoding = arguments.get("encoding", "utf-8")
        try:
            p = Path(path).expanduser()
            if not p.exists():
                return f"❌ 文件不存在: {path}"
            return p.read_text(encoding=encoding)
        except Exception as e:
            return f"❌ 读取失败: {e}"
    return f"❌ 未知工具: {name}"

我们这里先只实现一个读取文件内容的工具,后续再实现更多的工具。

7. 构建网关的渠道层

7.1 为什么需要渠道层?

在上一小节中,我们实现在 Agent 中构造了 OutboundMessage 格式的出站消息,然后将消息发布到出站队列中。但还缺少关键的一环:出站异步队列中的消息由谁来消费?如何将 Agent 的回复正确地发送回原来的聊天频道?

我们知道每个即时通讯平台都有自己独特的 API 协议,如果让 Agent 直接处理这些差异,会导致 Agent 逻辑中混杂大量渠道特定代码,每增加一个渠道就要修改 Agent 核心逻辑,这会造成维护噩耗。

所以我们需要构建一个 渠道管理器(ChannelManager),作为网关的出站交通枢纽,负责管理所有 IM 适配器的生命周期,并将出站消息路由到正确的渠道。具体需要实现以下功能:

  1. 注册与管理渠道实例

    • 运行时动态注册各个渠道
    • 维护渠道状态信息
    • 提供统一的渠道访问接口
  2. 协调启动与停止流程

    • 控制渠道启动顺序,避免竞态条件
    • 实现优雅停止,防止消息丢失
    • 处理异常情况下的资源清理
  3. 消息路由与派发

    • 根据消息的 channel 字段路由到正确渠道
    • 调用渠道的发送方法
    • 实现错误隔离和重试机制

7.2 渠道层的设计与实现

如果把整个网关系统比作一个繁忙的交通枢纽,那么渠道层就是站在十字路口中央的交警。它不亲自运送货物,但指挥着所有运输车辆有序通行。

具体来说,渠道层连接着:

  • 上游:内部消息总线(MessageBus),接收标准化的出站消息
  • 下游:各个 IM 渠道适配器(FeishuChannel、WechatChannel 等)

我们先实现一个 ChannelManager 类,并实现数据结构与初始化。代码如下:

import asyncio
from loguru import logger
from message_bus import MessageBus
from feishu import FeishuChannel


class ChannelManager:
    def __init__(self, bus: MessageBus):
        self.bus = bus
        # 存储已注册的渠道适配器,key 为渠道名称(如 "feishu")
        self.channels: dict[str, FeishuChannel] = {}
        # 出站分发器的任务句柄,用于优雅停止
        self._dispatch_task: asyncio.Task | None = None

ChannelManager 的核心数据结构 channels 是一个字典: channel_name → 适配器实例

  • Key = 渠道名称(如 "feishu"、"wechat")
  • Value = 渠道实例对象

这个设计实现了运行时动态注册,可以在不重启服务的情况下添加新渠道。

接着我们来实现注册渠道功能:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        """注册一个渠道适配器。要求该适配器必须有 name 属性和 send 方法。"""
        self.channels[channel.name] = channel
        logger.info(f"Channel registered: {channel.name}")

上述注册渠道的代码实现看起很简单,其实背后的设计原理一点也不简单。它应用了工厂模式 + 依赖注入的设计模式。

  1. 工厂模式体现在:渠道的创建由外部完成,ChannelManager 只负责使用
  2. 依赖注入体现在:渠道实例通过 register() 方法注入,而非在 ChannelManager 内部创建

我们已经实现了一个飞书渠道 FeishuChannel,所以现在需要通过以下方式进行注册飞书渠道:

manager.register(FeishuChannel(...))

同时将来如果我们想新增一个微信渠道,就可以这样实现了,先实现一个 WechatChannel,然后:

manager.register(WechatChannel(...))

这样网关核心代码零改动,真正实现了"开闭原则":对扩展开放,对修改关闭。

接着实现启动所有已注册的频道以及出站分发器。

代码实现如下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        # 省略...
    async def start_all(self) -> None:
        """启动所有已注册的频道以及出站分发器。"""
        if not self.channels:
            logger.warning("No channels registered")
            return

        # 先启动出站分发器协程(确保一有出站消息就能被处理)
        self._dispatch_task = asyncio.create_task(self._dispatch_outbound())

        # 并发启动所有渠道(每个渠道的 start 方法负责建立长连接或监听 Webhook)
        tasks = []
        for name, channel in self.channels.items():
            logger.info(f"Starting {name} channel...")
            tasks.append(asyncio.create_task(channel.start()))

        # 注意:通常渠道的 start 会永久阻塞(如 WebSocket 循环),因此 gather 不会返回
        await asyncio.gather(*tasks, return_exceptions=True)

我们上述的代码实现了一个看似简单却至关重要的设计决策,就是先启动分发器再启动渠道。那么为什么先启动分发器再启动渠道呢?

主要是为了防止消息丢失与响应延迟。让我们分析两种启动顺序的后果:

场景 A:先启动渠道,后启动分发器 时间线:

  1. 飞书渠道启动成功 ✓
  2. 用户立即发送消息:"你好"
  3. Agent 快速处理,生成回复:"你好!我是AI助手"
  4. 回复进入出站队列...
  5. 但是!分发器还没启动 ❌
  6. 回复消息在队列中堆积
  7. 用户等待...等待...(用户体验差)

场景 B:先启动分发器,后启动渠道(我们采用的方式) 时间线:

  1. 分发器启动,开始监听出站队列 ✓
  2. 飞书渠道启动成功 ✓
  3. 用户发送消息:"你好"
  4. Agent 处理,生成回复:"你好!我是AI助手"
  5. 回复进入出站队列
  6. 分发器立即发现新消息 ✓
  7. 路由到飞书渠道,立即发送 ✓
  8. 用户秒级收到回复(体验流畅)

在实际的生产环境经验中,"空转等待"比"忙中丢消息"要好得多。分发器提前就位,就像快递员提前在仓库门口等待,包裹一出来就能立即配送。

接着我们实现出站消息分发器

代码实现如下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    def register(self, channel: FeishuChannel) -> None:
        # 省略...
    async def start_all(self) -> None:
        # 省略...
    async def _dispatch_outbound(self) -> None:
        """
        出站分发器:持续消费 outbound 队列,将消息发送到对应的渠道。
        这是一个后台协程,在 start_all 时启动。
        """
        logger.info("Outbound dispatcher started")

        while True:
            try:
                # 可中断阻塞读取,每隔1秒检查一次取消信号
                msg = await asyncio.wait_for(
                    self.bus.consume_outbound(),
                    timeout=1.0,
                )
                # 根据消息中的 channel 字段找到对应的适配器
                channel = self.channels.get(msg.channel)
                if channel:
                    try:
                        # 调用适配器的 send 方法(各渠道自己实现转换和发送逻辑)
                        await channel.send(msg)
                    except Exception as e:
                        logger.error(f"Error sending to {msg.channel}: {e}")
                else:
                    logger.warning(f"Unknown channel: {msg.channel}")

            except asyncio.TimeoutError:
                # 超时不是错误,只是没有消息,继续循环
                continue
            except asyncio.CancelledError:
                break

我们上一小节中所说的先启动分发器,本质就是通过 while True 不断循环使用 asyncio.wait_for 消费 outbound 队列,然后根据 msg.channel 路由并调用 send 方法。

设计亮点:

  1. 拉模式(Pull)而非推模式(Push)

    • 主动从消息队列拉取消息,控制权在自己手中
    • 相比回调式的推模式,更容易控制消费速率和错误处理
  2. 可中断的事件循环

    • timeout=1.0 让循环能定期"抬头看路",检查是否有停止信号
    • 没有这个超时,任务会一直阻塞在 consume_outbound() 上,难以优雅停止

接着我们继续实现渠道的发送方法,这是协议翻译的最后一步。

为了让 ChannelManager 能够统一管理,每个 IM 适配器必须实现以下两个成员:

  1. name: str:渠道唯一标识(如 "feishu")。
  2. async send(msg: OutboundMessage) -> None:发送回复的方法。

以飞书适配器为例,我们之前已经定义了 name = "feishu",现在补充 send 方法的实现:

class FeishuChannel:
    # 省略...
    async def send(self, msg: OutboundMessage) -> None:
        """通过飞书发送消息。"""
        if not self._client:
            logger.warning("飞书客户端未初始化")
            return

        try:
            # 根据 chat_id 格式确定 receive_id_type
            # open_id 以 "ou_" 开头,chat_id 以 "oc_" 开头
            if msg.chat_id.startswith("oc_"):
                receive_id_type = "chat_id"
            else:
                receive_id_type = "open_id"

            # 构建文本消息内容
            content = json.dumps({"text": msg.content})

            request = CreateMessageRequest.builder() \
                .receive_id_type(receive_id_type) \
                .request_body(
                    CreateMessageRequestBody.builder()
                    .receive_id(msg.chat_id)
                    .msg_type("text")
                    .content(content)
                    .build()
                ).build()

            # OpenAPI 调用是同步的,在线程中运行以避免阻塞
            response = await asyncio.to_thread(
                self._client.im.v1.message.create, request
            )

            if not response.success():
                logger.error(
                    f"发送飞书消息失败:code={response.code}, "
                    f"msg={response.msg}, log_id={response.get_log_id()}"
                )
            else:
                logger.debug(f"飞书消息已发送至 {msg.chat_id}")

        except Exception as e:
            logger.error(f"发送飞书消息时出错:{e}")

本质是就是将我们上一篇文章中的 FeishuChannel 类中 _process_and_reply 方法改成 send 方法即可。这样,ChannelManager 就可以统一调用 await channel.send(msg),完全不需要关心飞书 API 的具体细节。

8. 集成到网关启动入口

现在,我们将 MessageBus、AgentLoop、FeishuChannel 和 ChannelManager 全部串联起来。实现如下:

# gateway.py
import os
from loguru import logger
from feishu import FeishuChannel, FeishuConfig
from message_bus import MessageBus
from loop import AgentLoop
from manager import ChannelManager

async def main():
    # 1. 填入你的飞书机器人凭证
    config = FeishuConfig(
        app_id="xxx",         # 替换为真实的 App ID
        app_secret="xxx",    # 替换为真实的 App Secret
        encrypt_key="",                      # 如果飞书后台配置了 Encrypt Key 则填入,否则留空
        verification_token=""                # 如果配置了 Verification Token 则填入,否则留空
    )
    deepseek_key = os.getenv("DEEPSEEK_API_KEY", "")
    # 2. 创建总线
    bus = MessageBus()
    # 3. 创建 Agent 循环
    agent = AgentLoop(
        bus=bus,
        api_key=deepseek_key,
        base_url="https://api.deepseek.com",
        model="deepseek-chat",
        max_iterations=20,
    )
    
    # 4. 创建飞书渠道(传入总线,以便它 publish_inbound)
    feishu_channel = FeishuChannel(config=config, bus=bus)
    # 5. 创建渠道管理器,并注册飞书渠道
    channels = ChannelManager(bus=bus)
    channels.register(feishu_channel)
    
    logger.info("正在启动 Mini OpenClaw 网关...")
    
    # 6. 并发运行
    try:
        await asyncio.gather(
            agent.run(),          # 持续消费 inbound 队列,调用 LLM
            channels.start_all(), # 飞书长连接 + 出向派发器
        )
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("收到退出信号,正在关闭...")
        agent.stop()
        await channels.stop_all()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

至此整个网关的运行流程如下:

1. 网关“通电”

  • 我们启动 manager.start_all(),它立刻做了两件事:
    • 先派一个“快递员”(_dispatch_outbound 后台任务)守在 发件箱(outbound 队列) 旁边,随时准备把回复送出去。
    • 然后接通 飞书这个“电话线”feishu_channel.start()),开始等待用户发消息。

2. 用户发来消息

  • 用户在飞书群里说了一句“帮我读一下 /tmp/note.txt”。
  • 飞书适配器收到这条“方言消息”,立即翻译成网关内部的 普通话(InboundMessage),然后丢进 收件箱(inbound 队列)

3. Agent 大脑开始思考

  • agent.run() 一直在盯着 收件箱,一看到有新消息就取出来。
  • 它调用大模型并可能执行工具(比如读取文件),最终生成一段回复文本。
  • 然后把回复包装成 标准包裹(OutboundMessage),扔进 发件箱(outbound 队列)

4. 快递员送货

  • 守在 发件箱 旁边的快递员(_dispatch_outbound)发现新包裹,看看上面写的“收件渠道”是 feishu
  • 他马上找到飞书适配器,把包裹交给它:“请发到这个 chat_id 的群里”。
  • 飞书适配器又把回复从 普通话 翻译回 飞书的方言,调用飞书 API 发回群里。

5. 用户看到回复

  • 用户收到助手返回的文件内容,整个流程结束。

我们上述的 channels.start_all() 方法是还没实现的,我们实现一下:

class ChannelManager:
    def __init__(self, bus: MessageBus):
        # 省略...
    async def start_all(self) -> None:
        # 省略...
    async def stop_all(self) -> None:
        """优雅停止所有渠道和出站分发器。"""
        logger.info("Stopping all channels...")

        # 第一阶段:取消出站分发器任务
        if self._dispatch_task:
            self._dispatch_task.cancel()
            try:
                await self._dispatch_task
            except asyncio.CancelledError:
                pass

        # 第二阶段:逐个停止渠道(每个渠道的 stop 方法应关闭连接、释放资源)
        for name, channel in self.channels.items():
            try:
                await channel.stop()
                logger.info(f"Stopped {name} channel")
            except Exception as e:
                logger.error(f"Error stopping {name}: {e}")

实现也很简单,首先停止出站分发器的任务,再逐个停止渠道的连接,释放资源。

接着我们启动网关:

python gateway.py

启动结果如下:

01.png

然后我们接着在上一篇文章中设置了的飞书机器人中进行发消息。

然后我们发现报错了:

image.png

报错原因是因为飞书 SDK 的 register_p2_im_message_receive_v1 要求注册一个同步回调函数(不能是 async def),但消息处理逻辑(如解析内容、发布到 MessageBus)是异步的。因此,我们需要实现一个跨线程调度适配器,用于将飞书 WebSocket 线程中的同步回调安全地桥接到 asyncio 主事件循环。

9. 跨线程调度适配器

首先我们需要保存主事件循环对象,我们是在网关启动文件 gateway.py 中通过 asyncio.run(main()) 启动的主循环。因为飞书 WebSocket 客户端运行在一个独立的后台线程中(见 threading.Thread(target=run_ws, daemon=True).start()),它的回调需要一个同步函数,但真正的消息处理逻辑 _on_message 是一个异步协程,需要被提交到主事件循环中执行,因为 MessageBus 等组件是绑定到主循环的。为了从另一个线程安全地将协程投递到主事件循环,就需要持有主事件循环的引用

先保存主事件循环对象:

class FeishuChannel:
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        self.config = config
        self.bus = bus
+        self._loop = None
        self._client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .build()

    async def start(self) -> None:
        # 省略...
+        # 保存主事件循环对象
+        self._loop = asyncio.get_running_loop()
        def run_ws():
            # 省略...

接着我们创建了一个同步函数 _on_message_sync 作为 register_p2_im_message_receive_v1 的实际回调,然后在 _on_message_sync 中将真正异步的处理函数 _on_message 调度到主事件循环中执行。实现如下:

def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None:
    try:
        if self._loop and self._loop.is_running():
            # 将异步处理函数调度到主事件循环
            asyncio.run_coroutine_threadsafe(
                self._on_message(data),
                self._loop
            )
        else:
            # 备用方案:在新事件循环中运行
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._on_message(data))
            finally:
                loop.close()
    except Exception as e: logger.error(f"处理飞书消息时出错:{e}")

接着我们修改 register_p2_im_message_receive_v1 的实际回调函数为上述我们实现的 _on_message_sync

class FeishuChannel:
    def __init__(self, config: FeishuConfig, bus: MessageBus):
        # 省略...
    async def start(self) -> None:
        # 省略...
        # 注册接收消息事件处理函数 im.message.receive_v1
-        handler = builder.register_p2_im_message_receive_v1(self._on_message).build()
+        handler = builder.register_p2_im_message_receive_v1(self._on_message_sync).build()
        # 保存主事件循环对象
        self._loop = asyncio.get_running_loop()

总的来说就是在主事件循环中“记住”主循环对象,供后续其他线程通过 asyncio.run_coroutine_threadsafe 将协程调度回主循环执行,是实现跨线程异步任务调度

同时当主事件循环不存在时创建一个全新的临时事件循环,在当前线程(WebSocket 线程)中同步运行 self._on_message(data),执行完毕后关闭循环。

经过上述迭代后,我们再次启动我们的程序:python gateway.py

然我们再在飞书设置的 AI 机器人上跟我们的 Mini OpenClaw 进行对话,结果如下:

1cbfafacd6d84ef03bd64151f081c17a.jpg

然后我们再根目录下创建一个 test.txt 文件,内容为:“从网关的角度理解并实现一个 Mini OpenClaw”,然后在飞书设置的 AI 机器人输入:“帮我读取 test.txt 文件”,结果如下:

e85ee7fd4d5df8c7fa605994b44a19e4.jpg

至此我们的 Mini OpenClaw 就实现了。

10. 总结

经过上述文章我们可以更加透彻地理解为什么说 OpenClaw 可以简单总结为“高级 Agent + 网关”了。它把飞书、微信这些聊天软件的“方言消息”统一通过一个网关转成内部能听懂的“普通话”(InboundMessage),Agent 只处理这种标准消息。

为了防止消息太多堵死系统,用了两个队列(入站异步队列出站异步队列,相当于收信箱和发件箱)把接收和回复解耦开,像流水线一样互不干扰。Agent 处理完后把回复扔进发件箱,再由分发器根据渠道标签(feishu、wechat)转回对应平台的格式发回去。

这样一来,添加新平台就像加个翻译插件,核心代码完全不用动。最后用跨线程调度解决了飞书回调异步的问题。整个网关跑起来就是:用户发消息 → 标准化 → 入站队列 → Agent 思考(可调用工具)→ 出站队列 → 翻译回原平台 → 用户收到回复

上述实现也是港大开源的 Nanobot 的核心实现,Nanobot 可以说是 Python 版的 OpenClaw,是学习研究场景的轻量选择。

我是程序员Cobyte,欢迎添加 v: icobyte,学习交流 AI Agent 应用开发。

Flutter应用代码混淆完整指南:Android与iOS平台配置详解

Flutter中的代码混淆

代码混淆可以隐藏你的Dart代码中的函数和类名,让 反编译 App变得困难。对于更全面的混淆需求,特别是针对iOS IPA文件,可以使用专业工具如IpaGuard,它支持无需源码的代码和资源混淆,兼容Flutter等多种开发平台,有效增加反编译难度。

注:Dart的混淆还没有经过完全的测试,如果发现问题请到GitHub上提 issue 。关于混淆的问题,还可以参考 Stack Overflow 上的这个问题。

Flutter中的混淆配置其实是在Android和iOS端分别配置的。

Android

<ProjectRoot>/android/gradle.properties 文件中添加如下代码:

extra-gen-snapshot-options=--obfuscate

默认情况下,Flutter不会混淆或者缩减Android host,如果你使用了第三方的Java或者Android库,那么你可能需要减小APK体积,或者防止你的App被反编译。

  • Step 1:配置Proguard文件

新建 /android/app/proguard-rules.pro 文件,然后添加如下配置:


#Flutter Wrapper
-keep class io.flutter.app.** { *; }
-keep class io.flutter.plugin.**  { *; }
-keep class io.flutter.util.**  { *; }
-keep class io.flutter.view.**  { *; }
-keep class io.flutter.**  { *; }
-keep class io.flutter.plugins.**  { *; }

上面的配置只保护Flutter库,其他额外的库(比如Firebase)需要你自己添加配置。

  • Step 2:

打开 /android/app/build.gradle 文件,定位到 buildTypes 处,在 release 配置中将 minifiyEnableduseProguard 标志设为true,同时还需要指向Step1中创建的ProGuard文件:


android {
    ...
    buildTypes {
        release {
            signingConfig signingConfigs.debug
            minifyEnabled true
            useProguard true
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

注意混淆和缩减无用代码会加长App的编译时间。

iOS

  • Step 1:修改 "build aot"

<ProjectRoot>/packages/flutter_tools/bin/xcode_backend.sh 文件中添加 build aot flag:

${extra_gen_snapshot_options_or_none}

然后定义这个flag:


local extra_gen_snapshot_options_or_none=""
if [[ -n "$EXTRA_GEN_SNAPSHOT_OPTIONS" ]]; then
  extra_gen_snapshot_options_or_none="--extra-gen-snapshot-options=$EXTRA_GEN_SNAPSHOT_OPTIONS"
fi
  • Step 2:应用你的修改

在你的App的根目录下运行以下两条命令:


git commit -am "Enable obfuscation on iOS"
flutter
  • Step 3:更改release配置

<ProjectRoot>/ios/Flutter/Release.xcconfig 中添加下面这行:

EXTRA_GEN_SNAPSHOT_OPTIONS=--obfuscate

对于iOS平台,如果需要更强大的混淆保护,可以考虑使用IpaGuard这样的工具,它可以直接对IPA文件进行混淆加密,支持代码和资源文件的全面混淆,无需源码即可操作,并兼容Flutter应用,提供即时测试功能。

Flutter iOS应用混淆与安全配置详细文档指南

Flutter iOS应用混淆与安全配置文档

概述

本文档详细描述了iOS应用的混淆与安全配置过程。这些配置旨在保护应用代码、API密钥和敏感数据,防止逆向工程和恶意攻击。配置包括 Dart 代码混淆、原生代码混淆、运行时安全检查和数据安全措施。

混淆与安全措施

Dart代码混淆

Flutter提供了内置的代码混淆功能,通过以下参数启用:

--obfuscate --split-debug-info=./symbols
1

这将:

  • 重命名代码中的标识符,使反编译后的代码难以理解
  • 将调试信息分离到单独的文件中,减少发布版本中的可读信息
  • 保留符号信息用于崩溃分析,但不包含在发布版本中

此外,使用像IpaGuard这样的专业混淆工具可以进一步增强应用安全性。IpaGuard是一款强大的iOS IPA文件混淆工具,无需源码即可对代码和资源进行混淆加密,支持Flutter等多种开发平台,有效增加反编译难度。

原生代码混淆与安全

在iOS上,我们通过以下配置增强安全性:

  1. BuildSettings.xcconfig 配置:
// 启用代码混淆和优化
GCC_OPTIMIZATION_LEVEL = s
SWIFT_OPTIMIZATION_LEVEL = -O
SWIFT_COMPILATION_MODE = wholemodule
DEAD_CODE_STRIPPING = YES

// 安全设置
ENABLE_STRICT_OBJC_MSGSEND = YES
CLANG_WARN_SUSPICIOUS_MOVE = YES
CLANG_WARN_OBJC_LITERAL_CONVERSION = YES
GCC_NO_COMMON_BLOCKS = YES
STRIP_STYLE = all
STRIP_INSTALLED_PRODUCT = YES
COPY_PHASE_STRIP = YES
DEBUG_INFORMATION_FORMAT = dwarf-with-dsym

// 启用应用传输安全
PRODUCT_SETTINGS_URL_SCHEMES = "$(inherit)"
PRODUCT_SETTINGS_APP_TRANSPORT_SECURITY_ALLOWS_ARBITRARY_LOADS = NO

// 添加其他安全属性
OTHER_LDFLAGS = $(inherited) -Wl,-no_pie
12345678910111213141516171819202122
  1. Xcode构建参数
xcodebuild -workspace Runner.xcworkspace -scheme Runner -configuration Release clean build \
  ENABLE_BITCODE=YES STRIP_INSTALLED_PRODUCT=YES DEPLOYMENT_POSTPROCESSING=YES \
  -sdk iphoneos -allowProvisioningUpdates
123

这些参数确保:

  • 启用Bitcode,允许App Store进一步优化代码
  • 移除不必要的符号和调试信息
  • 进行部署后处理操作,应用额外的优化

运行时安全检查

通过以下Swift代码实现运行时安全检查:

// 检查设备是否已越狱
func isJailbroken() -> Bool {


    #if targetEnvironment(simulator)
    return false
    #else
    // 检查常见的越狱文件路径
    let jailbreakPaths = [
        "/Applications/Cydia.app",
        "/Library/MobileSubstrate/MobileSubstrate.dylib",
        "/bin/bash",
        "/usr/sbin/sshd",
        "/etc/apt",
        "/private/var/lib/apt/",
        "/usr/bin/ssh"
    ]

    for path in jailbreakPaths {


        if FileManager.default.fileExists(atPath: path) {


            return true
        }
    }

    // 检查是否可以写入私有目录
    let stringToWrite = "Jailbreak Test"
    do {


        try stringToWrite.write(toFile: "/private/jailbreak.txt", atomically: true, encoding: .utf8)
        try FileManager.default.removeItem(atPath: "/private/jailbreak.txt")
        return true
    } catch {


        // 无法写入,说明没有越狱
    }

    return false
    #endif
}

// 检查是否连接调试器
func isDebuggerAttached() -> Bool {


    #if DEBUG
    return false
    #else
    var info = kinfo_proc()
    var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, getpid()]
    var size = MemoryLayout<kinfo_proc>.stride
    let status = sysctl(&mib, UInt32(mib.1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556

苹果iOS应用开发上架与推广完整教程

苹果应用上架与推广详细指南

作为苹果个人开发者,确保用户能够顺畅地下载并安装自己精心开发的应用,是不可或缺的一环。接下来,我们将深入探讨实现这一目标的一系列核心步骤。

01应用上架准备

> 注册开发者账号

注册开发者账号是发布应用的第一步,必须在苹果开发者官网完成信息填写和费用支付。你需要前往苹果开发者官网,填写包括姓名、联系方式在内的个人真实信息,并支付相应费用。完成注册与身份验证后,你将获得发布应用的权限。

> 确保应用符合准则

在用户能够顺畅地下载并安装应用之前,开发者需要完成一系列的准备工作。这些准备工作的目标是确保应用的下载和安装过程尽可能顺畅,从而提升用户体验。

应用需符合苹果的质量与安全标准,不得包含恶意代码,需具备良好的UI设计和正常功能。苹果对应用的质量、功能及安全性都设有严格标准。应用不得包含恶意代码,必须具备良好的用户界面设计,且功能正常、无显著漏洞。此外,应用还需遵守法律法规,如不得侵犯他人知识产权,不得诱导用户进行不合理付费等。只有满足这些准则的应用,才有可能通过审核并上架供用户下载。

> 选用开发工具与测试

接下来,我们就将详细介绍这些准备工作。在着手构建和测试应用之前,有几个关键步骤需要完成。这些步骤旨在为应用的顺畅下载和安装铺平道路,进而优化用户体验。

使用Xcode进行开发,选择合适的编程语言并在多设备和系统版本上进行全面测试,确保功能完备、兼容性和性能。通常,Xcode被视为开发首选,它提供了从代码编写到编译、调试的一站式服务。依据应用特性,选择如Swift或Objective-C等编程语言。以一个简易笔记应用为例,我们可以运用Swift来构建用户界面,并实现笔记的增删改查功能。务必在不同型号的iOS设备和多种系统版本上进行详尽测试。这包括验证功能的完备性,例如确认笔记应用能否顺利存储和读取数据;确保兼容性,以保证应用在iPhone、iPad等设备上的一致性;以及评估性能,如测试应用的启动速度和响应时间。例如,在iPhone 13与iPhone 8上分别运行笔记应用,观察是否存在布局混乱或功能失效的问题。此外,开发者也可以使用AppUploader等工具来简化iOS证书的申请和管理,支持在Windows、Linux或Mac系统中操作,无需依赖Mac电脑。

02向App Store提交与推广

> 准备元数据与分类选择

接下来,就可以将你的应用提交到App Store了。 提交应用前需准备应用名称、描述、截图及视频,并选择合适的分类以提高用户搜索的精准性

为应用起一个简洁且能体现核心功能的名称,如“速记笔记”,同时撰写详细且吸引人的描述,突出应用特点、功能和优势。此外,还需提供展示关键界面和操作流程的截图及视频预览,使用户能直观了解应用。依此选择合适的分类,如笔记应用可归于“效率”类别。这样能帮助用户在App Store中更精准地搜索到应用。

> 提交审核与推广策略

通过App Store Connect提交审核,积极进行应用推广,包括社交媒体、博主合作及应用内推广,提高用户下载量。

使用Xcode完成应用打包后,通过App Store Connect提交应用进行审核。或者,使用AppUploader工具上传IPA文件到App Store,它支持多平台,比Application Loader更高效,且不携带设备信息。审核过程通常需数日,期间苹果团队将检查应用是否符合各项标准。若审核过程中发现任何问题,将收到反馈通知,需根据反馈进行相应修改后重新提交。同时,积极推广应用也是提高下载量的关键。

  1. 社交媒体推广:借助Twitter、Facebook、Instagram等社交媒体平台,广泛宣传您的应用。您可以分享应用截图、详细的功能介绍以及使用教程等,以此吸引更多潜在用户的关注。例如,在Twitter上发布一系列关于您的笔记应用使用技巧的推文,并附上应用的下载链接。

  2. 与博主和媒体合作:积极联系相关领域的博主、自媒体或科技媒体,向他们介绍您的应用,并努力争取他们的推荐或报道。例如,与专注于效率类应用评测的博主取得联系,邀请他们体验您的应用并分享他们的使用感受。

  3. 应用内推广:如果您还有其他已发布的应用,可以在这些应用内部推广您的新应用,从而引导现有用户下载并体验。

通过上述推广策略,苹果个人开发者可以成功地推动应用的下载安装,并通过广泛的推广活动提高应用的下载量和用户活跃度,从而让您的开发成果得到更多用户的认可和喜爱。

HTTPS超文本传输安全协议全面解析与工作原理

计算机网络---https(超文本传输安全协议)

1. HTTPS的定义与核心定位

HTTPS(HyperText Transfer Protocol Secure,超文本传输安全协议)并非独立于HTTP的“新协议”,而是 HTTP协议与TLS/SSL加密层的结合体——它在HTTP的应用层与TCP传输层(或HTTP/3的QUIC协议层)之间,增加了一套标准化的加密、认证与数据校验机制,核心目标是解决HTTP协议的安全缺陷,保障客户端与服务器之间的通信安全。

从技术本质来看,HTTPS的定位可概括为:

  • 基础不变:仍基于HTTP的请求/响应模型(方法、状态码、头字段完全兼容HTTP),底层传输依赖TCP(HTTP/1.x/2)或UDP(HTTP/3+QUIC);
  • 安全增强:通过TLS/SSL协议实现“加密传输+身份认证+数据防篡改”,弥补HTTP明文传输、无身份校验、数据易被劫持的短板;
  • 行业标配:当前所有涉及隐私(如登录、支付)、敏感数据(如政务、医疗)的场景均强制要求使用HTTPS,主流浏览器(Chrome、Safari)已对HTTP明文网站标记“不安全”警示。

2. HTTPS的核心价值:解决HTTP的三大安全缺陷

要理解HTTPS的必要性,需先明确HTTP的安全漏洞——这些漏洞在公网(如WiFi、运营商网络)中极易被利用,引发隐私泄露、财产损失等风险。HTTPS通过三大核心能力,针对性解决这些问题:

2.1 机密性(Confidentiality):防止数据被窃听

HTTP的致命缺陷是 明文传输:请求/响应内容(如密码、银行卡号、搜索记录)在网络中以“可读文本”形式传输,任何处于传输链路中的设备(如路由器、黑客的抓包工具)都能直接窃取数据。

例如,用户在HTTP网站输入“用户名:test,密码:123456”,抓包工具可直接捕获如下明文:

POST /login HTTP/1.1
Content-Type: application/x-www-form-urlencoded

username=test&password=123456

HTTPS通过 对称加密算法(如AES-256-GCM)解决此问题:客户端与服务器协商出一个“会话密钥”,所有数据传输前用该密钥加密,传输后用同一密钥解密——即使数据被截获,攻击者因无会话密钥,也无法还原为可读文本。

在开发或测试过程中,可以使用抓包工具如 Sniffmaster 来监控和分析 HTTPS 流量,以验证加密效果和排查问题,支持全平台操作且无需复杂代理设置。

2.2 完整性(Integrity):防止数据被篡改

HTTP不校验数据完整性:攻击者可通过“中间人攻击”(MITM)拦截HTTP数据包,修改内容后再转发给目标方,且双方均无法察觉。

典型场景:用户在HTTP电商网站下单,支付金额为100元,攻击者拦截请求后将金额改为10000元,服务器接收并处理修改后的请求,导致用户多支付。

HTTPS通过 哈希算法+数字签名 保证完整性:

  1. 发送方(如服务器)对数据计算哈希值(如SHA-256),生成“数据指纹”;
  2. 用私钥对哈希值签名,生成“数字签名”,与数据一同发送;
  3. 接收方(如客户端)用公钥验证签名,若验证通过,再对接收数据重新计算哈希值;
  4. 对比两次哈希值:一致则数据未被篡改,不一致则数据已被修改,直接丢弃。

2.3 身份认证(Authentication):防止身份被伪造

HTTP无身份认证机制:攻击者可伪造服务器(如搭建虚假WiFi热点,伪装成“商场免费WiFi”),诱骗用户访问虚假网站,窃取用户信息(即“钓鱼攻击”)。

例如,用户想访问 http://www.bank.com,但攻击者伪造了一个域名相似的 http://www.bankk.com,页面样式与真实银行完全一致,用户输入账号密码后,数据直接发送给攻击者。

HTTPS通过 CA证书体系 实现身份认证:服务器需向权威的“证书颁发机构(CA)”申请证书,证书中包含服务器的域名、公钥、CA签名等信息。客户端(如浏览器)接收证书后,会通过内置的“根CA证书”验证服务器证书的合法性——只有验证通过,才确认服务器是真实的,而非伪造。

3. HTTPS的关键组件:TLS/SSL协议与CA证书体系

HTTPS的安全能力依赖两大核心组件:TLS/SSL协议(负责加密与握手)和CA证书(负责身份认证),二者协同工作,构成HTTPS的安全基石。

3.1 TLS/SSL协议:HTTPS的“加密引擎”

TLS(Transport Layer Security,传输层安全协议)是SSL(Secure Sockets Layer)的升级版,当前主流版本为 TLS 1.2TLS 1.3(SSLv3因安全漏洞已被禁用)。TLS协议栈分为三层,自上而下分别为:

协议层 核心功能 关键技术/字段
警报层(Alert) 传递TLS会话中的错误信息(如证书过期、加密套件不支持),触发连接关闭或重试 警报级别(Warning/Fatal)、错误代码(如42表示证书过期)
握手层(Handshake) 协商TLS会话参数(加密套件、会话密钥)、交换证书、验证身份 客户端Hello、服务器Hello、证书、密钥交换消息
记录层(Record) 对应用层数据(HTTP请求/响应)进行分片、压缩、加密、添加认证标签 对称加密算法(AES)、哈希算法(SHA-256)、记录长度
TLS版本演进与核心优化

TLS协议的迭代始终围绕“更安全、更高效”展开,各版本的关键差异如下:

版本 发布时间 核心特点 安全性/性能评价
SSLv3 1996年 首个广泛应用的版本,但存在POODLE漏洞(可被破解加密) 已废弃,完全不安全
TLS 1.0 1999年 修复SSLv3漏洞,引入RSA密钥交换、AES加密 安全性不足(如BEAST漏洞),部分浏览器已禁用
TLS 1.1 2006年 修复BEAST漏洞,改进IV(初始化向量)生成方式 安全性一般,逐步被淘汰
TLS 1.2 2008年 支持SHA-2哈希算法、AEAD加密模式(如AES-GCM),增强完整性与机密性 当前主流版本,安全性可靠
TLS 1.3 2018年 1. 简化握手流程(从4次交互减至3次,支持0-RTT);
2. 移除不安全加密套件;
3. 合并密钥交换与服务器Hello
性能最优、安全性最高,逐步普及

关键优化:TLS 1.3的“0-RTT(Round-Trip Time)”握手可实现“首次重连时无需等待握手完成即可发送数据”,大幅降低延迟——例如,用户第二次访问某网站时,可直接用缓存的会话参数发送请求,无需重新协商密钥。

3.2 CA证书体系:HTTPS的“身份身份证”

CA(Certificate Authority,证书颁发机构)是公认的“网络信任第三方”,负责验证服务器身份并颁发证书。CA证书体系采用“层级信任模型”,确保证书的合法性可追溯,核心构成如下:

证书的核心结构(X.509标准)

一份合法的TLS证书包含以下关键信息(可通过浏览器“查看证书”功能查看):

  • 版本:如X.509 v3;
  • 序列号:CA分配的唯一标识,用于吊销证书;
  • 主体(Subject):证书持有者信息(服务器域名、公司名称等);
  • 公钥信息:服务器的公钥(用于加密会话密钥)及算法(如RSA、ECC);
  • 签发者(Issuer):颁发证书的CA名称(如Let’s Encrypt、Symantec);
  • 有效期:证书生效与过期时间(通常为1-2年,需提前续期);
  • 数字签名:CA用自身私钥对证书内容的哈希值签名,用于客户端验证。
证书的信任链验证

客户端(如浏览器)验证服务器证书时,需通过“信任链”逐层校验,确保证书未被伪造,流程如下:

  1. 客户端接收服务器证书,先验证“服务器证书的签名”——用CA的公钥解密签名,得到哈希值,再对证书内容重新计算哈希值,对比一致则证书内容未被篡改;
  2. 若签发服务器证书的是“中间CA”(而非根CA),则需验证“中间CA证书”的签名,直到追溯至“根CA证书”;
  3. 根CA证书是浏览器/操作系统内置的“信任根”(如微软根CA、苹果根CA),无需额外验证——若根CA未被信任(如自制证书),浏览器会弹出“证书风险”警示,阻止用户访问。
证书的类型与应用场景

根据验证严格程度,CA证书分为三类,适用于不同场景:

  • 域名验证型证书(DV证书):仅验证域名所有权(如通过DNS解析、文件上传验证),无公司信息,仅显示“安全锁”图标,适合个人博客、小型网站,免费(如Let’s Encrypt);
  • 组织验证型证书(OV证书):验证域名所有权+公司主体信息(如营业执照),证书中包含公司名称,适合企业官网、普通电商,需付费(年费数百至数千元);
  • 扩展验证型证书(EV证书):最高级别验证,需提交公司资质、法律文件、实地核验,浏览器地址栏会显示“绿色锁+公司名称”,适合金融、支付、政务网站,费用较高(年费数千元至数万元)。

4. HTTPS的核心工作流程:TLS握手与加密通信

HTTPS的通信过程分为两大阶段: TLS握手阶段(协商会话参数、交换证书、生成会话密钥)和 加密通信阶段(用会话密钥传输HTTP数据)。以下以主流的 TLS 1.2 和优化后的 TLS 1.3 为例,详细拆解流程。

4.1 TLS 1.2握手流程(4次交互)

TLS 1.2握手需客户端与服务器进行4次TCP交互(2个RTT),流程如下:

  1. 客户端Hello(Client Hello)
    • 客户端向服务器发送:支持的TLS版本(如TLS 1.2)、支持的加密套件(如TLS_RSA_WITH_AES_256_GCM_SHA384)、客户端随机数(Client Random,用于后续生成会话密钥)、会话ID(若为重连,携带历史会话ID)。
  2. 服务器Hello + 证书 + 服务器Hello Done
    • 服务器响应:确认TLS版本和加密套件(从客户端支持的列表中选择)、服务器随机数(Server Random,与Client Random共同用于生成密钥)、服务器证书(包含公钥)、“服务器Hello Done”消息(表示服务器已完成初始响应)。
  3. 客户端证书验证 + 密钥交换 + 客户端完成
    • 客户端验证服务器证书:通过信任链校验证书合法性,若验证失败,终止连接并提示风险;
    • 生成预主密钥(Pre-Master Secret):客户端生成一个随机数,用服务器证书中的公钥加密,发送给服务器(即“密钥交换消息”);
    • 生成会话密钥:客户端用Client Random + Server Random + Pre-Master Secret,通过PRF(伪随机函数)生成“会话密钥”(用于后续对称加密);
    • 发送“客户端完成”消息:包含对前序所有握手消息的哈希值+数字签名,证明握手过程未被篡改,同时告知服务器后续将用会话密钥加密数据。
  4. 服务器密钥交换 + 服务器完成
    • 服务器用自身私钥解密“预主密钥”,得到Pre-Master Secret;
    • 用与客户端相同的算法(Client Random + Server Random + Pre-Master Secret)生成会话密钥;
    • 发送“服务器完成”消息:包含对前序握手消息的哈希值+数字签名,告知客户端后续将用会话密钥加密数据。

至此,TLS握手完成,客户端与服务器持有相同的会话密钥,进入加密通信阶段。

4.2 TLS 1.3握手流程(3次交互,优化版)

TLS 1.3通过合并消息、减少交互,将握手压缩至3次TCP交互(1个RTT),流程如下:

  1. 客户端Hello + 密钥交换
    • 客户端发送:支持的TLS 1.3版本、加密套件(仅保留安全套件,如TLS_AES_256_GCM_SHA384)、客户端随机数、“密钥共享”消息(提前生成密钥交换所需的公钥参数,替代TLS 1.2的Pre-Master Secret)。
  2. 服务器Hello + 证书 + 密钥交换 + 服务器完成
    • 服务器响应:确认TLS 1.3版本和加密套件、服务器随机数、服务器证书、“密钥共享”消息(用客户端的公钥参数计算出会话密钥)、“服务器完成”消息(包含握手消息的哈希签名)。
  3. 客户端完成
    • 客户端验证证书后,用服务器的密钥共享参数生成会话密钥;
    • 发送“客户端完成”消息(包含握手消息的哈希签名),进入加密通信阶段。

核心优化:TLS 1.3删除了“服务器Hello Done”消息,将密钥交换与服务器响应合并,减少1次交互;同时支持“0-RTT”模式——若客户端缓存了历史会话参数(如会话票证),首次重连时可直接发送加密的HTTP请求,无需等待握手完成,进一步降低延迟。

4.3 加密通信阶段:HTTP数据的安全传输

TLS握手完成后,客户端与服务器开始用“会话密钥”传输HTTP数据,流程如下:

  1. 应用层(HTTP)生成请求/响应数据(如 GET /index.html HTTP/1.1);
  2. TLS记录层对数据进行处理:
    • 分片:将数据分割为最大16KB的记录;
    • 压缩(可选):用指定算法(如DEFLATE)压缩数据;
    • 加密:用会话密钥通过对称加密算法(如AES-GCM)加密数据;
    • 认证:添加“消息认证码(MAC)”或“认证标签(Tag)”,用于验证数据完整性;
  3. 加密后的记录通过TCP(或QUIC)传输给对方;
  4. 接收方的TLS记录层解密数据、验证完整性、解压缩、重组,再传递给应用层(HTTP)处理。

5. HTTPS的性能优化:打破“HTTPS更慢”的误区

早期HTTPS因TLS握手延迟、加密计算开销,确实比HTTP慢,但随着协议优化(如TLS 1.3)、硬件升级(CPU支持AES指令集)、缓存机制改进,HTTPS的性能已接近HTTP,甚至通过优化可超越HTTP。以下是核心优化方向:

5.1 减少TLS握手延迟

  • 升级TLS 1.3:从2个RTT减至1个RTT,重连时支持0-RTT,延迟降低50%以上;
  • 会话复用
    • 会话ID复用:服务器存储会话参数(如会话密钥),客户端重连时携带会话ID,无需重新协商密钥;
    • 会话票证(TLS Ticket)复用:服务器用密钥加密会话参数,生成“会话票证”发送给客户端,客户端重连时携带票证,服务器解密后直接复用会话,无需存储会话状态(适合分布式服务器);
  • OCSP Stapling(证书状态 stapling):客户端验证证书时,无需向CA服务器查询证书状态(如是否吊销),而是由服务器提前获取CA的OCSP响应,“装订”在证书中一同发送,减少1次CA查询的RTT。

5.2 降低加密计算开销

  • 选择高效加密套件:优先使用AEAD模式的加密套件(如AES-GCM、ChaCha20-Poly1305),兼顾安全与性能——ChaCha20在不支持AES指令集的设备(如低端手机)上性能比AES快3倍;
  • 采用ECC椭圆曲线加密:ECC(Elliptic Curve Cryptography)比RSA更高效,相同安全级别下,ECC的密钥长度更短(如256位ECC≈3072位RSA),加密/解密速度更快,适合移动设备和高并发场景。

5.3 优化证书传输

  • 证书链合并:将服务器证书、中间CA证书合并为一个文件,减少证书传输的TCP连接次数;
  • 证书压缩:使用Brotli或GZIP压缩证书(尤其是EV证书,内容较大),减少传输带宽。

5.4 结合HTTP/3与QUIC

HTTP/3基于QUIC协议(UDP传输),QUIC内置TLS 1.3加密,无需单独的TLS握手——QUIC的“0-RTT”握手可同时完成QUIC连接建立与TLS加密协商,进一步降低延迟;同时QUIC支持“连接迁移”(如手机从WiFi切换到4G,无需重新握手),提升移动场景下的HTTPS体验。

6. HTTPS的常见误区与澄清

误区1:“HTTPS绝对安全,不会被攻击”

HTTPS并非“绝对安全”,仍存在潜在风险,但需满足特定条件:

  • 证书私钥泄露:若服务器私钥被窃取,攻击者可伪造证书,拦截加密数据;
  • 弱加密套件:使用TLS 1.0/1.1或不安全套件(如TLS_RSA_WITH_3DES_EDE_CBC_SHA),可能被破解;
  • 中间人攻击(MITM):若用户信任了伪造的根CA证书(如恶意软件植入伪造根CA),攻击者可拦截并解密HTTPS数据。

应对措施:定期轮换私钥、禁用弱TLS版本和套件、通过安全工具(如Qualys SSL Labs)检测证书配置。此外,工具如 Sniffmaster 提供 HTTPS 抓包功能,支持无需代理的设置,便于开发者进行安全审计和调试。

误区2:“免费CA证书不安全,不如付费证书”

免费证书(如Let’s Encrypt)与付费证书的核心安全机制完全一致,均符合TLS标准,差异仅在验证严格程度和品牌信任度:

  • 安全层面:免费DV证书与付费OV/EV证书均采用相同的加密算法(如AES-256),均可实现机密性、完整性、身份认证;
  • 差异层面:付费证书验证公司信息,适合需要展示企业可信度的场景(如金融),免费证书适合个人或无需展示企业信息的场景(如博客)。

误区3:“HTTPS会增加服务器负载,不适合高并发”

早期HTTPS的加密计算确实会增加服务器CPU负载(约10%-20%),但通过优化可大幅缓解:

  • 硬件优化:使用支持AES-NI指令集的CPU(如Intel Xeon、AMD EPYC),加密计算速度提升10倍以上;
  • 软件优化:Nginx、Apache等服务器已优化TLS处理,支持多进程/多线程并发;
  • 缓存与会话复用:通过会话票证、OCSP Stapling减少重复计算,降低负载。

当前主流互联网公司(如阿里、腾讯)的高并发业务(秒杀、直播)均使用HTTPS,证明其可支撑高并发场景。


HTTPS不仅是“安全协议”,更是当前Web生态的“基础设施”——它通过TLS加密解决了HTTP的安全漏洞,保障了用户隐私与数据安全;同时,HTTPS也是SEO(搜索引擎优化)、PWA(渐进式Web应用)、WebSocket等技术的前提条件,无HTTPS则无法使用这些功能。

未来,HTTPS的发展方向将聚焦于:

  1. TLS 1.3的全面普及:浏览器与服务器逐步淘汰TLS 1.2及以下版本,强制使用更安全、更高效的TLS 1.3;
  2. 量子抗性加密(Post-Quantum Cryptography):研发可抵御量子计算攻击的加密算法(如格密码、哈希签名),避免未来量子计算机破解RSA/ECC加密;
  3. 简化证书管理:通过自动化工具(如Certbot)实现证书的自动申请、续期、部署,降低中小企业使用HTTPS的门槛。

日志诊断 Skill:用 AI + MCP 一键解决BUG|得物技术

一、概述

做后端开发,调 BUG 有一个让人头疼的固定流程:打开日志平台,输入 traceId 或关键词,搜日志;从几十上百条日志里,找到关键的那几条;把日志里的类名、方法名复制出来,去 IDE 里找对应代码;结合代码逻辑,判断哪里出了问题;如果一次找不准,回去再搜日志,再翻代码……

这个过程相对固定,但非常耗时间。每次 BUG 定位,光在日志平台和 IDE 之间来回切换,就能消耗掉大半的时间。

最开始在去年 Q3 想到这个问题的时候,脑子里浮现的第一个方案是:用 Cursor + MCP,把日志平台接进来,再挂一个代码知识库,让 AI 帮我查日志。但这个方案有缺陷 —— 日志查询是「动态的」,它依赖环境、应用、时间范围,没办法静态预置。此外,这样处理没有办法做到比较丝滑地读代码、改代码。

后来开始用 Claude Code,接触到了 Skill 的概念:可以在项目里定义一套自定义命令,描述 AI 应该怎么执行这个命令的每个步骤,于是整个思路变得清晰了。

日志平台有 MCP,Claude Code 有 Skill,两者结合,就能让 AI 自动完成「查日志 → 找关键信息 → 扫描代码 → 定位问题」这整个闭环。然后在 PM 的帮助下,才有了 /log-diagnosis 这个 Skill。

二、日志平台 MCP 是什么

MCP 原理

日志平台推出了基于 MCP(Model Context Protocol)协议的日志查询服务,让 Claude 可以直接调用日志平台的能力,无需人工在日志平台上手动查询。

MCP 本质上是一种标准化的「工具调用协议」,Claude Code 通过 SSE(Server-Sent Events)长连接与 MCP Server 通信,实时获取日志数据。

MCP 环境对照

核心 MCP 工具

鉴权流程

secretKey(日志平台后管申请)
    ↓ acquireTokenTool
accessToken(1小时有效,最多同时存在5个)
    ↓ 携带 accessToken
logsQuery / logSqlQuery / countLogTool ...

secretKey 申请地址:进入日志管理后台 → 日志权限 → 我的应用 → 生成密钥。

三、/log-diagnosis Skill 是什么

Skill 工作原理

log-diagnosis 是一个运行在 Claude Code 里的自定义诊断命令。Claude Code 支持通过 .claude/skills/ 目录定义自定义技能(Skill),以 Markdown 文件描述行为规范,Claude 在收到对应命令时会自动加载并执行。你只需要把 traceId 或告警信息告诉它,剩下的全部交给 AI。完整执行链路如下:

用户输入 /log-diagnosis {环境} {代码分支} {诉求}
    ↓
Claude 加载 .claude/skills/log-diagnosis/SKILL.md
    ↓
读取 .diagnosis/config.json 获取当前环境配置
    ↓
检查 accessToken 是否过期,过期则自动刷新
    ↓
从 traceId 计算日志时间范围(取第9-16位16进制时间戳)
    ↓
调用日志平台 MCP 分页拉取全量日志(最多20页,不遗漏)
    ↓
切换到指定代码分支,结合日志关键词检索代码
    ↓
综合分析:上游日志 + 当前服务日志 + 代码逻辑 → 根因
    ↓
生成诊断报告(飞书文档 or 本地 Markdown)
    ↓
恢复原始代码分支

两种诊断入口

核心能力

  • Token 自动管理:accessToken 过期自动刷新,无需手动维护;
  • 分页全量拉取:自动分页拉完所有日志,禁止只查第一页就下结论(最多 20 页);
  • 跨服务分析:自动识别上下游服务,拉取关联服务日志交叉验证;
  • 代码联动:日志里出现的类名/方法名,直接在代码里精确定位。

queryString 语法规则

# 格式
{field} {操作符} "{值}" {连接符} {field} {操作符} "{值}"
# 操作符
=  : 精确匹配
≈  : 模糊匹配(like)
# 连接符
AND / OR / NOT
# 示例
trace_id"a1b2c3d4e5f6789012345678abcdef01"
trace_id"xxx" AND log_level = "ERROR"
endpoint ≈ "/api/your-endpoint" AND log_level"ERROR"
message ≈ "timeout"

注意:时间范围只通过 start/end 参数控制,不要写在 queryString 中。

四、安装与配置

安装日志平台 MCP

Claude Code

在 Claude Code 命令行中执行,按需安装对应环境:

# 测试环境
claude mcp add --transport sse dw-log-mcp-t1 https://{your-t1-aigw-domain}/api/v1/mcp/log-mcp/sse
# 预发环境
claude mcp add --transport sse dw-log-mcp-pre https://{your-pre-aigw-domain}/api/v1/mcp/log-mcp/sse
# 生产环境
claude mcp add --transport sse dw-log-mcp-prd https://{your-prd-aigw-domain}/api/v1/mcp/log-mcp/sse

安装后重启 Claude Code,执行 /mcp 确认连接状态正常。

Cursor

  1. 打开 Cursor Setting;

  2. 点击 Tools & MCP,添加 MCP Server;

  3. 添加 URL,MCP Server 名称任意。

建议按需安装 MCP Server,避免额外消耗 token,示例配置:

{
  "mcpServers": {
    "dw-log-mcp-t1": {
      "url": "https://{your-t1-aigw-domain}/api/v1/mcp/log-mcp/sse"
    },
    "dw-log-mcp-pre": {
      "url": "https://{your-pre-aigw-domain}/api/v1/mcp/log-mcp/sse"
    },
    "dw-log-mcp-prd": {
      "url": "https://{your-prd-aigw-domain}/api/v1/mcp/log-mcp/sse"
    },
    "dw-log-mcp-oversea-prd": {
      "url": "https://{your-oversea-aigw-domain}/api/v1/mcp/log-mcp/sse"
    }
  }
}

4. 返回设置,就可以看到已经连接上。

安装 /log-diagnosis Skill

将 log-diagnosis 目录放到项目的对应目录下:

Claude Code

your-project/
└── .claude/
    └── skills/
        └── log-diagnosis/
            ├── SKILL.md        # 技能行为规范(核心)
            ├── README.md       # 使用说明
            └── reference.md   # 附录:时间脚本、queryString 示例等

Cursor

your-project/
└── .cursor/
    └── skills/
        └── log-diagnosis/
            ├── SKILL.md        # 技能行为规范(核心)
            ├── README.md       # 使用说明
            └── reference.md   # 附录:时间脚本、queryString 示例等

配置 .diagnosis/config.json

首次运行会自动引导创建 (直接调用 /log-diagnosis,Skill 会一步步指示你给出 secret key),也可手动在项目根目录创建 .diagnosis/config.json:

your-project/
└── .cursor/
    └── skills/
        └── log-diagnosis/
            ├── SKILL.md        # 技能行为规范(核心)
            ├── README.md       # 使用说明
            └── reference.md   # 附录:时间脚本、queryString 示例等

字段说明:

secretKey:唯一需要人工填写的字段,在日志平台后管申请;

accessToken:首次使用时由 AI 自动调用 acquireTokenTool 获取,过期自动刷新;

accessTokenExpireAt:从 acquireTokenTool 返回值自动填充;

fields:调用 logFields 工具自动获取。

五、使用方式

命令格式:

/log-diagnosis {环境} {代码分支(可选)} {诉求描述}

参数说明:

  • {环境}:T1 / PRE / PRD(按实际环境标识填写);
  • {代码分支}:可选,留空则使用当前分支;
  • {诉求描述}:包含 traceId 或告警信息的问题描述,用自然语言书写即可。

示例:

# 用 traceId 定位接口异常
/log-diagnosis T1 feature/your-branch trace_id: "your-trace" 为什么最终没有返回数据
# 用告警信息分析错误原因
/log-diagnosis PRD master 告警详情:【接口:YourService/yourMethod】【业务码:10002000】【业务码消息:系统异常,请稍后重试】帮我分析问题可能性

一行命令,AI 全程接管,几分钟内给出根因分析。

六、实战案例:一个隐蔽的 SQL BUG

背景

某搜索接口在测试环境反馈没有返回数据。拿到 traceId,直接执行:

/log-diagnosis T1 feature/your-branch trace_id: "your-trace" 为什么最终没有返回数据

← 就这一句话,接下来全部交给 AI。

AI 自动拉取日志

Skill 触发后,AI 自动完成:

  • 从 traceId 推算出日志时间范围(2026-02-27 全天);
  • 检查 accessToken 已过期,自动刷新;
  • 调用日志平台 MCP,分 2 页拉取完整日志,共 73 条。

请求入参(从日志自动提取):

{
  "assembleByOrg": true,
  "channelType": "MANUAL",
  "orderNo": "your-order-no",
  "status": 1,
  "ticketNo": "your-ticket-no"
}

AI 还原完整调用链路

AI 自动识别出关键节点:resultList is empty,SQL 查询返回了空结果。问题在 DB 层,而不在业务逻辑层。

AI 提取组装后的查询 DTO

从日志中提取到 toSearchDTO 组装结果:

{
  "channelType": "MANUAL",
  "customerTag": 1,
  "deliveryMode": "某配送方式",
  "orderStatus": "8010",
  "orderType": "0",
  "productCategoryIds": [29],
  "status": 1,
  "ticketSource": 67,
  "ticketTypeId": 5802
}

AI 从日志中提取实际执行的 SQL 发现根因

ORM 框架在日志中打印了实际执行的 SQL,AI 直接读取并分析:

SELECT a.id, a.pid, a.name, a.mode, a.status, a.org_id, a.org_ids,
       a.ticket_group_id, a.tenant_id, a.is_del, a.channel_types
FROM your_type_table a
LEFT JOIN your_relation_table b
    ON b.tenant_id = 1 AND a.id = b.type_id AND b.type = 3 AND b.is_del = 0
WHERE a.tenant_id = 1 AND a.mode = 2 AND a.is_del = 0
  AND a.status = 1
  AND (a.channel_types IS NULL OR a.channel_types = '' OR FIND_IN_SET('MANUAL', a.channel_types) > 0)
  AND (b.root_id is null or b.root_id in (29))
  AND (a.order_types IS NULL OR a.order_types = '' OR FIND_IN_SET('0', a.order_types) > 0)
  AND (a.order_statuses IS NULL OR a.order_statuses = '' OR FIND_IN_SET('8010', a.order_statuses) > 0)
  AND (a.delivery_modes IS NULL OR a.delivery_modes = '' OR FIND_IN_SET('某配送方式', a.delivery_modes) > 0)
  AND (a.ticket_sources IS NULL OR a.ticket_sources = '' OR FIND_IN_SET(67, a.ticket_sources) > 0)
  AND (a.customer_tag IS NULL OR a.customer_tag = 1)   ← BUG 在此

AI 发现:其他字段都处理了 IS NULL 和 = ''(空字符串代表 “不限制”)两种情况,唯独 customer_tag 只判断了 IS NULL,遗漏了空字符串 '' 的情况。

SQL 语义对比:

-- 其他字段(正确):IS NULL 和 '' 都处理了
AND (a.order_types IS NULL OR a.order_types'' OR FIND_IN_SET('0', a.order_types) > 0)
AND (a.delivery_modes IS NULL OR a.delivery_modes'' OR FIND_IN_SET('某配送方式', a.delivery_modes) > 0)
AND (a.ticket_sources IS NULL OR a.ticket_sources'' OR FIND_IN_SET(67, a.ticket_sources) > 0)
-- customer_tag(遗漏了 = '' 的判断)← BUG
AND (a.customer_tag IS NULL OR a.customer_tag1)

DB 中现有的数据,customer_tag 字段都存的是空字符串(未配置),按业务语义本应匹配所有请求,却因为这个遗漏被全部过滤掉了。

AI 定位代码,给出修复方案

AI 在代码中直接找到对应的 MyBatis Mapper XML:

<!-- 问题代码 -->
<if test="customerTag != null">
    and (a.customer_tag IS NULL OR a.customer_tag = #{customerTag})
</if>
<!-- 修复后 -->
<if test="customerTag != null">
    and (a.customer_tag IS NULL OR a.customer_tag = '' OR a.customer_tag = #{customerTag})
</if>

效率对比

这个 BUG 的隐蔽性在于:SQL 语法正确,逻辑上也「看起来」没问题——只有对比了其他字段的写法,才能发现 customer_tag 独自遗漏了空字符串的处理。这类细节差异,人工排查很容易忽略,AI 反而很擅长。

七、诊断效率关键点

  • 有 traceId 时优先用 traceId 拉日志,可精准获取单次请求的完整链路,比关键词搜索精确得多;
  • 关注关键日志节点:toSearchDTO finished / search begins / resultList is empty / search finished 等,快速判断数据在哪一层丢失;
  • SQL 打印日志(ORM 框架输出)是黄金线索,直接反映最终执行的查询条件,AI 能从中发现肉眼难以察觉的差异;
  • 分页必须拉完:日志平台一次只返回部分数据,AI 会严格执行分页直到取完,确保不遗漏关键日志。

八、总结

核心思路:用「协议 + 规范」让 AI 接管固定流程:

这篇文章的本质,是一次对重复性工程劳动的自动化尝试。调 BUG 的过程——查日志、提取关键信息、找代码、分析原因——逻辑固定,步骤繁琐,但并不需要太多创造性思维。这类工作恰好是 AI 最擅长接管的。

实现这个闭环,靠的是两个关键组合:

  • MCP:让 AI 能够调用外部系统(日志平台),突破了「AI 只能处理静态上下文」的限制,实现了对动态数据的实时获取。
  • Skill:给 AI 一份行为规范,告诉它每一步该怎么做、先做什么后做什么、遇到什么情况怎么处理,把「一次性对话」变成「可复用的工程化能力」。

两者缺一不可。只有 MCP,AI 能查日志但不知道怎么系统地分析;只有 Skill,AI 有流程但没有数据来源。组合起来,才形成了真正可落地的闭环。

值得借鉴的地方:

识别「固定流程」是自动化的起点:不是所有工作都适合 AI 接管,但凡是「步骤固定、信息来源明确、输出格式可预期」的工作,都值得尝试用 Skill + MCP 的方式来自动化。排查 BUG 是一个典型,类似的还有:代码审查、性能分析报告生成、告警巡检等。

Skill 的本质是「给 AI 写操作手册」:Skill 文件不是在「训练模型」,而是在给 AI 一份清晰的 SOP。写得越细、约束越明确(比如「禁止只查第一页就下结论」「必须分页拉完所有数据」),AI 的执行质量越稳定。这和写给人看的文档本质上是一回事。

AI 擅长发现「横向对比」类的 BUG:本文的案例揭示了一个有意思的规律:AI 在处理「同类字段逻辑不一致」这类问题时,表现往往比人工更好。原因在于 AI 没有「先入为主」的经验偏见,不会因为「这段代码看起来没问题」就跳过,它会对所有字段做同等的审查。

最后说一句:AI 时代,工程师的核心竞争力不只是「能写代码」,更是「能把自己的经验和流程转化成可复用的 AI 能力」。/log-diagnosis 是一次小小的尝试,但背后的思路,值得在更多场景里延伸。

往期回顾

1.Redis 自动化运维最佳实践|得物技术

2.Claude在得物App数仓的深度集成与效能演进

3.Claude Code + OpenSpec 正在加速 AICoding 落地:从模型博弈到工程化的范式转移|得物技术

4.大禹平台:流批一体离线Dump平台的设计与应用|得物技术

5.基于 Cursor Agent 的流水线 AI CR 实践|得物技术

文 /阿程

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

iOS 多技术栈混淆实现,跨平台 App 混淆拆解与组合

当项目从单一 iOS 原生扩展到 Flutter、React Native 或 Unity 时,混淆这件事会变得复杂。原因不在于工具少,而是每一层代码完全不同

  • Swift / Objective-C → Mach-O 符号
  • Flutter → Dart AOT + assets
  • React Native → JS bundle
  • Unity → DLL + 资源

如果只用一种 iOS 混淆工具,通常只能覆盖其中一部分。


不同技术栈暴露的信息完全不一样

拿一个混合项目举例(Flutter + 原生 + H5),解包 IPA 后可以看到:

AppBinary          // 原生代码
flutter_assets/    // Dart + 资源
main.jsbundle      // JS 逻辑
assets/            // 图片与配置

每一层的“暴露方式”不同:

技术 可被读取的内容
Swift / OC 类名、方法名、参数
Flutter Dart 符号(部分)、资源路径
React Native JS 逻辑
Unity DLL + AssetBundle

这意味着混淆必须分层处理。


原生层:符号混淆(iOS 混淆工具核心能力)

先看最传统的一层:Swift / Objective-C。

检查方式:

strings AppBinary | grep Controller

如果看到:

HomeViewController
PaymentManager

说明符号未处理。


处理方式

使用 Ipa Guard 这类 IPA 级别的 iOS 混淆工具:

  • 导入 IPA
  • 进入代码模块
  • 勾选类 / 方法 / 参数

执行后:

PaymentManager → a82kd3

这一步直接改变 Mach-O 符号,是跨平台项目中最“统一”的一层处理。


Flutter 层:Dart 混淆 + IPA 补充

Flutter 提供内置混淆:

flutter build ios --obfuscate --split-debug-info=./symbols

执行后:

  • Dart 符号被替换
  • 生成符号映射

但 IPA 解包后仍然可以看到:

assets/images/banner.png
config/app.json

补充处理

使用 Ipa Guard 的资源模块:

banner.png → x92kd.png
app.json → a83ks.json

这样 Dart 层 + 资源层同时处理。


React Native:JS 混淆 + 文件重命名

React Native 的关键在 JS bundle:

main.jsbundle

直接打开可以读。


处理步骤

1)压缩 JS:

terser main.js -o main.min.js

2)替换 bundle

3)用 Ipa Guard 修改文件名称:

main.jsbundle → k39sd.bundle

这样:

  • 内容不可读
  • 路径无语义

Unity:资源与 DLL 的组合处理

Unity 项目解包后:

Data/Managed/Assembly-CSharp.dll
Data/Resources/

DLL 可以被反编译,资源路径也能推断逻辑。


处理方式

  • 使用 Unity 构建参数减少符号
  • 在 IPA 层用 Ipa Guard 处理资源名称
  • 修改资源 MD5

例如:

level1.assetbundle → a82kd.bundle

统一处理:资源指纹与结构差异

跨平台项目中,资源重复是一个常见问题。

例如多个 App 使用同一套 UI:

banner.png
icon.png

即使改名,内容仍然一致。

处理方式

在 Ipa Guard 中开启 MD5 修改:

md5 banner.png

处理前后不同。

这一步可以打散资源特征。


七、调试信息清理

检查:

strings AppBinary | grep NSLog

或:

strings AppBinary | grep Flutter

如果存在调试信息,可以统一清理。Ipa Guard 支持删除调试符号和部分日志字符串。


签名工具

无论哪个技术栈,只要修改 IPA,就必须重新签名。

可以使用:

kxsign sign app.ipa \
-c cert.p12 \
-p password \
-m dev.mobileprovision \
-z test.ipa \
-i

本地执行 IPA 混淆 无需上传致云端且不修改工程的方案

在很多团队里,混淆这一步常常被外包给在线加固服务:上传 IPA,等结果,下载再签名。流程确实顺手,但当项目涉及商业逻辑或私有算法时,这种方式总让人有点不踏实——完整的二进制、资源、接口结构都离开了本地环境。

后来我们把这一步彻底改成本地执行,不上传任何文件不改工程源码只操作已编译好的 IPA


一、先确认 IPA 当前长什么样

把构建好的 IPA 复制一份并解压:

unzip app.ipa

进入目录:

Payload/App.app

检查三个位置:

1)二进制可读信息

strings AppBinary | head

如果能看到:

UserManager
PaymentService
VipController

说明符号没有做处理。


2)资源目录结构

assets/images/vip_banner.png
config/payment.json

路径本身已经带有业务语义。


3)前端资源

main.jsbundle
index.html

这些文件如果未压缩,直接可读。


二、本地链路的核心思路

整个流程不依赖任何远程服务,结构如下:

IPA 文件
→ 本地解析
→ 本地混淆
→ 本地资源处理
→ 本地签名
→ 本地测试

关键在于:所有操作都发生在开发机器上。


先处理 JS / H5(如果存在)

如果项目中包含 WebView 或 React Native 模块,可以在 IPA 处理前压缩脚本。

例如:

terser main.js -o main.min.js

或者:

uglifyjs page.js -o page.min.js

压缩后再替换回 IPA 资源目录。

这样可以先降低 JS 层的可读性。


在本地执行 IPA 符号混淆

这一步是核心。

使用 Ipa Guard 这类本地运行的 IPA 混淆工具,可以直接处理 Mach-O 文件,而不需要源码。

操作过程:

  • 打开工具
  • 导入 IPA
  • 进入「代码模块」

可以看到:

OC 类
Swift 类
OC 方法
Swift 方法

在列表中选择需要处理的符号,例如:

UserManager
PaymentHandler
VipService

执行后:

UserManager → k39sd2

整个过程在本地完成,不会上传任何数据。


资源文件本地重写

继续在 Ipa Guard 的资源模块中操作。

勾选:

  • 图片
  • JSON
  • HTML
  • JS

执行后:

vip_banner.png → a82kd.png
payment.json → x92ks.json

工具会自动更新引用路径。

这一层的作用是让资源结构失去语义。


改变资源指纹(避免“同源识别”)

如果多个应用使用相同资源,文件内容会成为识别依据。

在 Ipa Guard 中开启 MD5 修改:

md5 banner.png

处理前后不同。

文件视觉效果不变,但指纹已经改变。


清理调试信息

检查:

strings AppBinary | grep NSLog

如果存在日志或调试字符串,可以在混淆阶段删除。

Ipa Guard 提供调试信息清理选项。


补充一个“简单校验机制”

为了避免 IPA 被二次篡改,可以在原生层加入简单校验:

  • 计算关键文件 hash
  • 启动时验证

例如:

if hash != expected { exit(0) }

这一步不依赖混淆工具,但可以作为补充。


本地完成签名与安装

混淆后 IPA 已失去原签名,需要重新签名。

可以使用:

kxsign sign app.ipa \
-c cert.p12 \
-p password \
-m dev.mobileprovision \
-z test.ipa \
-i

或者直接在 Ipa Guard 中配置证书。

连接设备后可以直接安装。


验证结果(这一步不能跳)

安装后重点检查:

  • 页面是否正常
  • 资源是否加载
  • 动态调用是否正常
  • WebView 内容是否可用

如果出现异常,通常是:

  • 某些符号被误混淆
  • 某些资源路径未正确更新

把 IPA 混淆完全放在本地执行,并不只是“更安全”的选择,它还带来一个实际好处:每一步都可控、可调试、可回滚。相比上传到云端处理,本地流程更适合需要长期维护的项目。

搭建一个云端Skills系统,随时随地记录TikTok爆款

最近 Claude Skills 很火。

但我观察了一圈,发现大家都在陷入一种“开发者的自嗨”。

绝大多数 Skills 的应用场景都被死死锁在 IDE 里,锁在开发者的电脑前。

这叫开发提效,不叫业务提效。

真正的业务发生在移动端,发生在你通勤、吃饭、甚至躺在床上刷 TikTok 的时候。

如果你的 AI 能力必须打开电脑、输入命令行才能调用,那它的时空效率就是零。

于是我抛弃本地的 Claude Code,基于 OpenHands 做了一套云端 Skills 系统。

效果极其简单粗暴:

我在刷 TikTok,看到一个爆款视频,点击复制链接,敲击 iPhone 背面三下。

wxv_4355007050494509070

20 秒后,我的飞书多维表格里自动新增了一行数据。

Image

这行数据包含了:这个视频的无水印文件、Gemini 拆解的镜头语言分析、爆款原因推导,以及一套可直接复用的 AI 视频生成提示词。

全过程我不需要打开电脑,不需要切换 APP,不需要等待。

这就是我今天要聊的:如何用 OpenHands + Skills + iOS 快捷指令,构建一套真正落地的业务自动化系统。

01 为什么 Claude Code 在业务侧是伪需求

先厘清两个概念:OpenHands 和 Claude Code。

Claude Code 是 Anthropic 官方推出的命令行工具,它是一个嵌入在你本地终端里的结对程序员。它的 Skills 本质是上下文记忆和本地工具接口。

它的优势是懂你的代码规范,能直接改你电脑里的文件。

但它有一个对于业务场景的致命弱点:它必须依附于你的会话,你不在,它就不动。

它是一个副驾驶(Copilot)。

而 OpenHands(前身 OpenDevin)是一个开源的、自主的 AI 软件工程师。它运行在 Docker 容器里,是一个独立的服务端 Agent。

Image

openhands.dev/

它是一个可以被封装成 API 服务的数字员工。

我看重 OpenHands 的核心理由只有一个:它可以 24 小时在线,并且可以通过 API 远程唤醒。

我做的这个 TikTok 分析系统,本质就是把 OpenHands 部署在服务器上,通过 FastAPI 暴露接口。

Claude Code 是给你用的工具;OpenHands 是你雇佣的、随时待命的员工。

🐵

小提示:FastAPI 的服务地址后加/docs就是文档了

02 业务视角:从 刷视频 到「数据入库」的闭环

对于做出海营销和短视频矩阵的朋友,拆解爆款是每天的必修课。

传统的流程极其反人类:

  1. 1. 手机刷到视频,点收藏。
  2. 2. 晚上回家打开电脑,把链接导出来。
  3. 3. 找第三方工具去水印下载。
  4. 4. 把视频传给 Gemini 分析。
  5. 5. 人工把分析结果复制粘贴到 Excel 或飞书。

这个链路太长,断点太多。任何需要延迟满足的流程,最终都会变成不了了之。

我的远程 Skills 方案,把这个流程压缩到了极致。

整个逻辑是这样的:

Image

用户端(前端)

利用 iOS 自带的快捷指令 + 背部轻点功能。

  • 动作:获取剪贴板内容(TikTok 链接)。
  • 触发:发送 HTTP POST 请求给我的服务器。
  • 反馈:手机震动一下,表示任务已接收。

Image

Image

服务端(后端)

OpenHands 接收到请求后,自主执行以下 Skills:

  1. Playwright Skill:

启动无头浏览器。这里有一个技术难点,TikTok 的反爬虫机制非常严格。如果用普通的 request 请求,成功率几乎为零。OpenHands 调用 Playwright 模拟真实浏览器行为,绕过 blob 协议,抓取真实的 MP4 视频流。这种方式的下载成功率稳定在 70%-80%

  1. Gemini Skill:

视频下载后,调用Gemini 2.5 Flash,快且便宜。它不只是看,它是理解。它可以识别拍摄角度(俯拍/特写)、运镜方式(推拉摇移)、BGM 节奏点、色彩心理学。

  1. Feishu Skill:

将清洗好的结构化数据(JSON),通过 API 写入飞书多维表格。

结果:

当你刷完半小时视频,打开飞书,几十个爆款视频的深度分析报告已经整整齐齐躺在那里了。

这才是 AI 赋能业务的本质:隐形化。

Image

Openhands 的 Skills 文档:

docs.openhands.dev/sdk/guides/…

03 举一反三:跨境电商的远程 Skills 玩法

这套架构的核心逻辑是:移动端触发 -> 服务端 API -> OpenHands 执行复杂 Skills -> 结果回传。

这个逻辑在出海业务里有无限的延展性。

我给几个具体的场景,你们可以拿去直接落地。

场景一:竞品独立站监控

  • 动作:在手机浏览器看到竞品的 Shopify 店铺,复制链接,触发 Shortcut。
  • Skills:OpenHands 调起爬虫 Skill 扫描该站点的新品上架情况、价格策略,并调用 SEO Skill 分析其关键词布局。
  • 产出:一份竞品分析简报直接推送到你的 Slack 或 钉钉。

场景二:亚马逊差评自动预警与回复草稿

  • 动作:系统监控到差评(自动触发,无需人工)。
  • Skills:OpenHands 读取差评内容,结合历史客服知识库 Skill,分析用户情绪,并模仿金牌客服的语气撰写 3 个版本的回复邮件。
  • 产出:草稿进入审核流,你只需要在手机上点批准。

场景三:广告素材批量生产

  • 动作:上传一张产品图到指定文件夹。
  • Skills:OpenHands 识别产品特征,调用 Midjourney 或 Runway 的 API,结合当下的流行趋势 Skill,自动生成 10 种不同风格的广告背景图。
  • 产出:素材自动同步到 Google Drive 供投放团队筛选。

04 为什么非要用 Agent Skills?写个 Python 脚本不行吗?

这是很多技术出身的朋友最容易陷入的误区。

你这个功能,我写个 Python 脚本 + 定时任务也能跑,为什么要搞这么复杂的 OpenHands Skills?

因为业务逻辑是流动的,而脚本是僵死的。

如果你写死了一个 Python 脚本:

  • 当 TikTok 的前端代码更新了 class 名,脚本报错,你得去修。
  • 当飞书的 API 接口变动,脚本报错,你得去修。
  • 当 Gemini 的模型参数调整,脚本报错,你得去修。

但在 OpenHands Skills 的架构下,我们定义的不是步骤,而是目标。

在我的 Skill 定义里,我告诉 OpenHands:你的任务是下载这个页面上的视频,如果常规方法失败,尝试模拟用户滚动;如果还失败,检查是否有验证码并尝试通过。

OpenHands 作为一个 Agent,它具备自主决策和自我修复的能力。

  • 它发现 TikTok 改了页面结构?它会尝试用视觉识别去定位播放按钮。
  • 它发现 API 报错?它会自主查阅文档或尝试备用节点。

在跨境出海这种平台规则朝令夕改的环境下,维护脚本的成本极高。

我们需要的是一个能够理解意图并自主寻找路径的智能体。

05 思路打开,Agentic Skills 的高级玩法

文章到这里,这套远程 Skills 系统的雏形已经搭建完毕。

但如果你觉得这就结束了,那你就小看了 Agentic Skills 的天花板。

我们现在的架构是“一个请求触发一个 Skill”,但这只是冰山一角。真正的威力在于 Multi-Skill Orchestration(多技能编排)。

  1. 1. Skill Chain(技能链)与递归调用

OpenHands 的 Skill 本质是可执行的逻辑单元。我们可以像写代码一样,让 Skill A 去调用 Skill B。

  • 比如定义一个 Base-Skill:只负责做基础的数据清洗。
  • 再定义一个 Pro-Skill:先调用 Base-Skill 处理数据,再把结果传给 Analysis-Skill,最后调用 Report-Skill 生成报告。

你可以构建一个自我迭代的 Agent。让它先写一段代码(Coding Skill),然后自己运行测试(Testing Skill),如果报错,递归调用 Coding Skill 进行修复,直到测试通过。

  1. 混合云架构(Hybrid Agent Architecture)

OpenHands 运行在 Docker 里,这意味着它可以部署在任何地方。

  • 私有化部署:对于涉及公司财务、用户隐私的数据,你可以把 OpenHands 部署在公司内网服务器上。
  • 公有云调用:对于需要访问外网(如 TikTok 下载、竞品分析)的任务,部署在 AWS 或 Vercel 上。

这样,通过 API 网关,你可以指挥内网的 Agent 去调用外网的 Agent,实现数据在安全域和互联网域之间的智能流转。

  1. “人机回环”的异步交互

谁说 API 只有“请求-响应”这一种模式? 在我的系统中,有些复杂任务(如竞品深度调研)可能需要运行 30 分钟。

  • 流程设计:OpenHands 接收任务 -> 立即返回 TaskID -> 后台异步执行。
  • 关键点:当 Agent 遇到无法决策的卡点(例如:这个验证码我解不开,或者这个竞品网站有两套价格体系,取哪套?),它可以主动通过飞书/Slack 给你发消息请求确认。

你点击确认后,Agent 继续执行。这才是真正的人机协作:AI 处理海量冗余信息,人类只在关键节点做决策。

在这个体系下,Skills 不再是静态的脚本,而是可生长、可组合的原子能力。

未来,你的个人服务器里可能运行着上百个这样的 Skills。它们是一群田螺姑娘,在你睡觉的时候,帮你监控市场、回复邮件、整理知识、优化代码。

而你,只需要握着手机,轻轻敲两下背部,就像魔法师挥动了魔杖。

这,才是 Agent 时代的真正玩法。

❌