普通视图

发现新文章,点击刷新页面。
今天 — 2026年3月25日首页

Claude Code + OpenSpec 正在加速 AICoding 落地:从模型博弈到工程化的范式转移|得物技术

作者 得物技术
2026年3月24日 10:22

一、破局:AI 编码的真正瓶颈不是模型,是上下文管理

在软件开发的历史进程中,每一次效率的飞跃都伴随着抽象层次的提升。从汇编语言到高级语言,从手动内存管理到垃圾回收,开发者始终在寻求降低认知负荷的方法。进入 2026 年,生成式人工智能(GenAI)已成为编程领域不可或缺的力量。 然而,行业正经历从 “模型崇拜” 向 “工程落地” 的深刻转型,单纯依靠增加大语言模型(LLM)的参数规模已无法解决复杂业务逻辑中的幻觉与失控问题。

当前的共识是,AI 编码(AICoding)的真正瓶颈不在于模型的逻辑能力,而在于上下文管理(Context Management)的失效与开发意图(Intent)的模糊。

通过对 Anthropic 推出的 Claude Code(以下简称 CC)与 Fission AI 倡导的 OpenSpec 进行深度解构可以发现,两者正在通过 “代理化执行” 与 “规格化驱动” 双轮驱动,构建一套闭环的 AI 研发体系。这种结合不仅标志着 AI 编程工具从 IDE 插件向终端原生代理(Agentic Tool)的转变,更预示着 “规格驱动开发”(Spec-Driven Development, SDD)将成为企业级 AICoding 落地的核心范式。

在 AICoding 的早期阶段,开发者普遍认为只要模型足够强大,就能解决所有编程难题。然而,随着项目复杂度的增加,这种观点遭到了现实的挑战。研究表明,虽然 AI 编码助手的使用率在提升,但软件交付的稳定性却在下降。例如,Google 的 DORA 2024 报告指出,AI 采用率每增加 25%,交付稳定性反而下降 7.2%。

生产力悖论与认知负荷

AICoding 领域存在一个显著的 “生产力悖论”:开发者在使用 AI 时主观感知速度提升了 20%,但实际完成任务的时间却增加了 19%。这一现象的根源在于 AI 在处理长上下文时的效能衰减。随着任务推移,AI 往往会陷入修正循环(Fix/Test Loops),无法触及深层的业务功能,反而需要更多的人工干预。

模型的逻辑推理能力(Reasoning)在短小上下文中表现卓越,但在大型工程环境中,模型面临的是 “上下文中毒”(Context Poisoning)和 “注意力漂移”(Attention Drift)。当对话历史过长或包含过多无关代码时,模型的性能会呈现非线性下降。例如,GPT-4o 等先进模型在 1K Token 时的准确率为 99.3%,而当上下文扩展到 32K Token 时,准确率会暴跌至 69.7%。这种 “性能断崖” 意味着,单纯依靠扩大上下文窗口(Context Window)并不能解决问题。

上下文工程的兴起

上下文工程(Context Engineering)正在取代提示词工程(Prompt Engineering),成为 AICoding 的核心技术方案。上下文工程的核心不在于 “如何写更好的指令”,而在于 “如何为模型筛选最精准的 Token 集合”。

下表对比了传统缩放路径与上下文工程路径的局限性:

在大型组织中,上下文管理面临更严峻的挑战。很多关键决策并未记录在代码中,而是散落在飞书文档评论、群消息、会议或开发者的认知中。AI 代理在缺乏这些隐性知识(Implicit Knowledge)的情况下,生成的方案虽然符合语法,但却违背了架构初衷或业务约束。

上下文作为一等系统

现代 AI 代理架构开始将上下文视为一种具有自身架构、生命周期和约束的 “一等系统”。在这种视角下,上下文管理不再是临时的字符串拼接,而是一条精密的 “编译器管道”:

  • 存储与呈现分离: 区分持久化的会话状态(Session)与单次模型调用的工作上下文(Working Context)。
  • 显式转换: 通过命名的、有序的处理器(Processors)构建上下文,而非随机堆砌。
  • 默认作用域: 每个子代理仅能看到执行任务所需的最小上下文,通过工具(Tools)按需获取更多信息。

二、Claude Code:把 AI 变成真正懂你项目的编码伙伴

Claude Code (CC) 是 Anthropic 推出的原生代理工具,它直接运行在终端中,具备读取文件、运行命令、执行重构以及自主验证的能力。与传统的 IDE 插件相比,CC 的核心优势在于其“代理循环”(Agentic Loop)和对上下文协议的深度掌控。

代理循环:收集、行动与验证

CC 的工作流程被定义为一个闭环系统,旨在模仿人类工程师的思维过程:

  • Gather Context(收集上下文): CC 不会盲目读取整个目录,而是通过文件搜索、Git 状态检查以及读取特定的 CLAUDE.md 文件来建立认知。
  • Take Action(采取行动): 基于推理,CC 可以跨多个文件执行编辑,或者利用终端工具(如 npm install、git commit)操作环境。
  • Verify Results(验证结果): 这是 CC 最具杀伤力的特性。它能自动运行测试、捕捉错误,并根据反馈调整方案。研究表明,带有验证步骤的 Coding 生成过程,其成功率远高于单次生成。

终端原生的工程哲学

CC 选择了终端而非图形界面作为主场,这体现了其 “代理优先” 的设计哲学。CC 遵循 Unix 哲学,支持管道(Pipe)、脚本化和自动化集成。这种设计使得 CC 能够与现有的 CI/CD 流程完美衔接,例如在 GitHub Actions 中自动执行代码审计。Anthropic 最新推出的 Code Review 功能,就是通过 Claude Code 基于 PR 的方式进行 bug 的追踪。

下表详细对比了 CC 与行业领先的 AI 编辑器 Cursor 的差异:

MCP 与“即时上下文”

CC 深度整合了模型上下文协议(Model Context Protocol, MCP)。MCP 是一个开放标准,允许 AI 代理安全地访问外部数据源。

为了应对大规模工具定义导致的上下文溢出,CC 引入了 “工具搜索” 和 “代码执行” 模式。代理不再一次性加载成千上万个 API 定义,而是通过编写代码按需调用 MCP 服务。例如,在分析大型数据库时,CC 不会加载全量数据,而是编写针对性的查询语句,仅将结果摘要读入上下文。这种 “按需加载” 策略极大地提升了 Token 的效用。

CLAUDE.md 与自动记忆

CC 引入了 CLAUDE.md 文件作为项目的 “操作手册”。这是一个置于根目录的 Markdown 文件,用于存储项目特定的编码标准、架构决策和测试指令。与临时提示词不同,CLAUDE.md 提供了持久的、跨会话的约束。

此外,CC 具备 “自动记忆”(Auto Memory)功能。它会自动在 MEMORY.md 中记录项目的构建命令、调试心得和用户的偏好设置。每当新会话启动时,CC 会加载这些记忆的前 200 行,从而确保 AI 在长期协作中能够 “越用越懂你”。

三、OpenSpec:给 AI 编码加上"规格书",从失控到可沉淀

虽然 Claude Code 提供了强大的执行引擎,但在复杂业务中,AI 仍然可能因为意图不明而跑偏,最终导致交付的代码不符合预期。

OpenSpec 的出现为 AI 编码提供了 “规格说明书”,将 AICoding 从 “凭感觉写代码” 提升到了 “按规格执行任务” 的高度。

规格驱动开发 (SDD) 的兴起

OpenSpec 倡导的是一种 “规格驱动开发”(Spec-Driven Development)范式。其核心理念是:在写任何一行代码之前,先由人类与 AI 共同协商并锁定一份机器可读、人可评审的规格文档。

下表展示了 SDD 的三个演进阶段:

OpenSpec 的工件体系 (Artifacts)

OpenSpec 弃用了笨重的开发文档,转而采用一套轻量级的、面向 AI 优化的 Markdown 工件体系。每个变更(Change)都被组织在独立的文件夹中:

  • proposal.md: 描述变更的初衷(Why)和范围(What)。
  • specs/: 具体的逻辑规格,通常包含 “Scenario(场景)” 描述,通过具体的输入输出消除模糊性。
  • design.md: 技术设计方案,包括本次变更涉及的数据库变更、接口调整等。
  • tasks.md: 原子化的任务清单,作为 AI 的执行路径图。

解决上下文污染:提案、应用与归档

OpenSpec 最具洞察力的设计在于其生命周期管理。AI 在处理新任务时,最忌讳被旧任务的陈旧信息干扰。OpenSpec 的 “归档(Archive)” 机制解决了这一问题:

  • Proposal 阶段: 建立一个独立的变更上下文,让 AI 只关注当前变更。
  • Apply 阶段: AI 严格按照 tasks.md 执行,避免了盲目扫描全库导致的 Token 浪费。
  • Archive 阶段: 任务完成后,临时变更文档被移入归档,核心规格更新至主规格文件。这保证了 AI 始终在一个 “卫生” 的上下文环境下工作,同时也为项目留下了可追溯的决策链路。

四 、实战:CC + OpenSpec 如何落地真实业务

在实际的企业业务场景中,如何整合这两大工具?答案在于将 OpenSpec 的标准化指令集注入到 Claude Code 的会话环境中。

案例实战:复杂业务逻辑的重构

假设一个电商项目需要重构其优惠券结算逻辑。在传统的 AI 辅助下,AI 可能会在修改 CouponService.java 时遗漏分布式锁,或者破坏原有的满减叠加规则。采用 CC + OpenSpec 模式,流程如下:

第一步:提案初始化

执行 /opsx:propose "重构优惠券结算逻辑,引入 Redis 分布式锁并支持多卷叠加"。CC 会在 openspec/changes/refactor-coupon-logic/ 下生成整套骨架。AI 会通过分析现有代码,在 spec.md 中自动列出已知的结算场景。

第二步:规格对齐与边界确认

这时不用急着让 AI 写代码,而是需要先审阅 spec.md。如果发现 AI 没考虑 “优惠券过期临界点” 的并发问题,可以直接要求 AI 修改规格:“在 spec.md 中增加过期校验场景,并要求使用 Lua 脚本保证原子性”。

第三步:受控应用(Apply)

一旦规格通过人工评审,就可以执行 /opsx:apply 了。这时,CC 就变成了完美的执行机器。它不再 “猜” 开发者的意图,而是对照 tasks.md 逐项实施。每一项修改后,它都会运行相关的测试。如果测试失败,CC 会自动分析错误并重新修复,直到该项 Task 标为 “完成”。

第四步、归档与知识固化

任务结束后,执行 /opsx:archive。原本散落在会话记录中的重构逻辑,现在变成了 openspec/specs/coupon-settlement.md 中的标准规格。当下一次另一个 AI 代理(或新入职同事)需要修改此模块时,它只需读取这份规格,即可获得完整的业务语境。

工具链对比:为何选择 OpenSpec

在 SDD 工具链中,OpenSpec 展现出了极高的工程性价比:

OpenSpec 的优势在于它不试图改变开发者的工具偏好。无论是使用 Claude Code、Cursor 还是 Aider,都可以无缝接入 OpenSpec 的规格管理层。

五、沉淀:让 AI 编码能力在团队中持续积累

AICoding 落地的终极目标不是让个体开发者写得更快,而是提升整个团队的知识资产质量。AI 编码能力不应随对话窗口的关闭而消失,而应作为 “团队记忆” 沉淀下来。

从个人技能到组织技能

团队可以通过自定义 Skill 和 MCP Server 来固化组织资产。

  • Skill: 将公司特有的代码风格、安全审计清单,或者特定中间件的使用指南封装为 .claude/skills/。当团队成员使用 CC 时,AI 会自动加载这些技能,仿佛有一位资深架构师在时刻盯着每一行代码。
  • MCP Server: 连接企业内部的向量数据库(如基于 Zilliz 的语义搜索),让 AI 代理能够从数千万行历史代码中找到最佳实践。

建立 AICoding 效能飞轮

AICoding 的成功落地需要建立一套正向循环的 “飞轮”:

  • 规格积累: 每完成一个 PR,都强制更新对应的 OpenSpec 规格文件。
  • 指令进化:发现 AI 反复犯的错,就将其转化为 CLAUDE.md 中的负向约束(Prohibited rules)。
  • 并行执行: 利用 CC 的 Agent Teams 能力,让一个代理负责写规格,另一个代理负责审计代码,第三个代理负责集成测试。

角色转变:从 “码农” 到 “规格定义者”

在 CC + OpenSpec 模式下,软件工程师的角色正在发生质变。如果 AI 能够根据完美的描述生成任何代码,那么 “代码” 本身就变成了编译后的中间产物,而 “规格” 才是核心产品。领域专家(Domain Experts)的重要性显著提升,因为他们能提供最高质量的业务意图描述。这种趋势将迫使开发者从关注 “语法实现” 转向关注 “系统设计” 和 “逻辑严密性”。

六、结语:AICoding 落地的飞轮正在转动

在 2026 年,AICoding 已不再是科幻。Claude Code 提供的强大代理能力,配合 OpenSpec 提供的精密规格框架,为企业提供了一套可复制、可量化的研发新范式。

我们必须承认,AI 编码的瓶颈从来不是模型不够聪明,而是我们与 AI 之间的 “沟通带宽” 太低且 “上下文” 太脏。通过上下文工程化管理(CC)和意图标准化表达(OpenSpec),我们正在构建一套让 AI 能够长期、稳定产出的工程环境。

随着这一模式的普及,软件开发的门槛将进一步降低,而创新的上限将被无限拉高。AICoding 落地的飞轮已经转动,那些能够率先将 AI 编码能力转化为团队组织资产的企业,将在未来的数字化竞争中占据绝对的先机。毕竟,在 AI 时代,掌握了 “意图” 与 “上下文” 的人,才掌握了软件工程的未来。

参考文档:

  1. thenewstack.io/context-is-…
  2. github.blog/ai-and-ml/g…
  3. solguruz.com/blog/spec-d…
  4. medium.com/@eran.swear…
  5. www.anthropic.com/engineering…
  6. code.claude.com/docs/en/how…
  7. www.anthropic.com/engineering…
  8. code.claude.com/docs/en/bes…
  9. dev.to/webdevelope…

往期回顾

1.大禹平台:流批一体离线Dump平台的设计与应用|得物技术

2.基于 Cursor Agent 的流水线 AI CR 实践|得物技术

3.从IDE到Terminal:适合后端宝宝体质的Claude Code工作流|得物技术

4.AI编程能力边界探索:基于 Claude Code 的 Spec Coding 项目实战|得物技术

5.搜索 C++ 引擎回归能力建设:从自测到工程化准出|得物技术

文 /后羿

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

昨天以前首页

大禹平台:流批一体离线Dump平台的设计与应用|得物技术

作者 得物技术
2026年3月19日 10:47

一、前言

大禹平台是一个离线 Dump 平台。在不同的场景都有自己的 Dump 流程,我们这里的 Dump 特指在搜索、推荐、广告(后续简称 “搜推广”)的场景中,将异构数据源加工处理后给到索引平台做索引的流程。

Dump 流程有如下一些特点:

  • 多源异构的数据:包括 MySQL、ODPS、HBase 和 Kafka 等各种数据源。
  • 多样化的输出:输出支持搜推广引擎构建倒排索引、Summary 服务构建 kv/kkv 索引等。
  • 流批数据结合:一般会有全量和增量,需要保证处理逻辑一致,增量能达到秒级更新。
  • 数据处理能力:例如多表 Join、UDF、Filter 等,以方便业务的开发和接入。

离线 Dump 流程

二、项目背景

现状

当前 dump 开发模式

如上图是当前常见的 Dump 开发模式,采用了流批分离架构:流处理通过 DTS 订阅 binlog,由 Flink 消费主表变更事件并反查关联表构建宽表,实现增量更新;批处理则将 MySQL 数据抽取至 ODPS,通过 Spark 处理多源数据并按业务逻辑拼接,最终输出 ODPS 表。这种架构存在以下问题:

当前 dump 开发的问题

目标

依托社区搜索核心场景,构建流批一体化的新质 Dump 架构,实现以下三大核心能力突破:

  • 工程效率: 基于可视化 DAG 编排工具,提供低代码开发能力,通过拖拽式界面实现复杂任务流程的快速搭建与迭代,显著降低开发门槛。
  • 数据质量: 基于流批一体架构,通过统一逻辑开发范式实现流批数据同源同构,从根本上提升数据准确性与可靠性。
  • 稳定性保障: 通过引入镜像表和状态大宽表,提高了数据的查询效率,系统性降低对源库的反查压力,确保系统长期稳定运行。

二、大禹平台介绍

平台设计

系统架构

平台架构

如上图是大禹平台技术架构,底层依赖公司的 DJob Cron 定时任务、Flink/Spark 流批计算能力以及多种存储系统;上层为平台支持的搜推广多种场景业务。

大禹平台分为管理平台与后台系统两部分。管理平台完成处理逻辑的 DAG 开发和相关 Debug、回归验证、监控大盘等能力;后台系统将管理平台的配置转为执行任务,然后依托流批框架生成 Flink/Spark 执行实例,通过调度引擎完成全流程任务执行。

如下图是新版 Dump 流程,将 Dump 拆分为三个阶段:镜像阶段、宽表阶段、导出阶段,以及流、批两种处理模式。新版流程处理过程有如下优化:

  • MySQL 镜像至 HBase: 平台将任务依赖的 MySQL 数据统一同步至 HBase 构建镜像层,实现与上游 RDS 解耦。有效规避多任务并发反查导致的数据库压力,支持跨任务共享复用 HBase 镜像表,显著提升数据源稳定性与资源利用率。
  • Binlog 订阅平台化: 将 RDS Binlog 订阅流程深度内嵌,自动完成 DTS 订阅创建与 Kafka 资源申请,封装为标准化服务。开发者无需关注底层链路,一键配置即可获取实时变更流,降低接入复杂度,保障流式数据可靠性。
  • 状态大宽表消除反查: 基于 HBase 构建持久化状态大宽表,完整记录字段中间状态。任务处理时直接读取状态数据,彻底规避冗余反查逻辑,简化开发流程。

新版 Dump 流程

调度引擎

大禹平台利用得物 DJob Cron 自建调度系统,通过搭建多个 Cron Job 轮训的方式,完成对任务分阶段的处理。

Cron Job 构建调度系统

一个执行实例的全流程

执行框架

在镜像、宽表、导出三个阶段,分别都有对应 Spark 和 Flink 处理框架。其中,镜像阶段完成 MySQL 数据同步,导出阶段完成状态宽表到引擎数据源的导出流程,宽表阶段是具体的业务逻辑实现。

宽表 Spark 框架逻辑: 任务严格遵循 DAG 拓扑顺序,依次执行各算子节点(数据源→业务逻辑→导出)的数据处理流水线,最终通过 BulkLoad 方式将结果高效写入 HBase。

宽表阶段 Spark 框架逻辑

宽表 Flink 框架逻辑: 消费非维表节点的增量,依据节点依赖关系进行拓扑排序后依次执行各节点计算逻辑,将产出字段更新至状态宽表,并实时同步至下游导出链路。

宽表阶段 Flink 框架逻辑

流批一体保障数据质量

平台采用统一的 DAG 编排引擎,将流处理与批处理任务抽象为相同的计算拓扑,从架构层面保障数据源头的天然一致性,彻底规避因不同环境下开发导致的数据偏差风险。同时,平台内置标准化的 UDF(用户自定义函数)开发模板与运行时框架:开发者只需专注业务逻辑实现,编写的 UDF 代码经一次注册,即可无缝嵌入流式与批量处理流程,真正实现 “一次开发、流批复用”,显著提升开发效率,降低维护成本,保障 Dump 开发从数据源头到处理逻辑各环节的流批一致性。

平台通过定义 AlgoDumpUDF 方法类,完成消息类型封装,用户可以利用 UDF 实现数据过滤和驱动删除等逻辑。

public abstract class AlgoDumpUDF implements UDFFunction, Serializable {
    //消息类型 add/delete/drop 三种
    public AlgoDumpMessageType algoDumpMessageType = 
    AlgoDumpMessageType.MESSAGE_TYPE_ADD;
    @Override
    public AlgoDumpMessageType getStatus() {
        return algoDumpMessageType;
    }


    //调用该方法实现增量驱动删除
    @Override
    public void delete(Object key, String reason) {
        this.algoDumpMessageType = AlgoDumpMessageType.MESSAGE_TYPE_DELETE;
    }
    //调用该方法实现增量过滤
    @Override
    public void drop(Object key, String reason) {
        this.algoDumpMessageType = AlgoDumpMessageType.MESSAGE_TYPE_DROP;
    }
    /**
     * 用户重写该方法完成业务逻辑开发
     */
    public void process() throws Exception {
    }
}

CASE示例:用户通过重写process()方法, 实现自己的业务逻辑,实现时可以利用drop方法把无效数据过滤,利用delete方法实现对下游索引发送删除消息。

public class MyUdf extends AlgoDumpUDF implements Serializable {
    public  Tuple2<String, String> process(String id, String taskname) 
    throws Exception {
        //过滤消息
        if(StringUtils.isBlank(id)) {
            this.drop(id, "drop by id null");
        }


        //驱动增量删除消息
        if(id.equals(0)) {
            this.delete(id, "delete by id = 0");
        }


        //用户写具体业务逻辑
        String a1 = "";
        if (taskname.equals("dddddd")) {
            a1 = "ddd";
        }
        String b1 = "test";
        return new Tuple2<>(a1, b1);
    }
}

小全量模式加速数据Dump

大禹支持任务实例按照大全量和小全量两种模式运行,针对部分频繁更新部分字段需求的任务可实现快速加载。

  • 大全量: 对数据源执行全量同步重建,生成全新的状态大宽表,并同步刷新流批处理链路,实现数据基准的彻底更新与端到端一致性保障。
  • 小全量: 基于现有状态大宽表,仅针对批处理来源字段加载最新数据源快照,经处理后通过 BulkLoad 高效写入 HBase;依托 HBase 多版本特性实现新旧数据平滑切换,确保批处理数据增量更新过程中查询服务零中断、数据时效性与业务连续性兼得。

小全量模式

任务复用支持数据分层管理

大禹平台支持任务产出的双重应用:既可对接计算引擎(如 CEngine),亦可作为公共数据被下游任务高效复用。平台通过标准化的 MirrorOut(导出)与 MirrorIn(接入)算子构建清晰的数据复用链路 —— 上游任务将公共数据配置为 MirrorOut 导出,下游任务通过 MirrorIn 算子一键引用,无需重复开发与数据搬运,实现数据资产的即产即用、任务依赖的显式管理,显著提升开发效率与数据复用性。

任务复用

管理平台

任务开发与运维

管理平台提供一站式任务开发生命周期管理,涵盖任务创建、可视化流程编排、实例调度与资源管控等核心环节;其中,Dump 任务通过可视化编排实现业务配置——用户仅需拖拽算子节点、配置参数,即可直观构建数据处理逻辑,显著提升开发效率与配置准确性。

如下图,通过拖拽算子的方式,可以直观地构建 dump 任务的流程图,实现便捷高效的开发体验。

图画编排式开发任务

执行实例以可视化流程图形式完整呈现任务执行全流程,每个节点清晰展示输入参数与输出结果,并支持对指定节点进行手动重试或终止操作,便于问题定位与流程干预。

执行实例状态

辅助工具

数据回归验证

平台提供流批数据回归验证能力,支持模板化配置与一键复用,高效保障数据质量与业务稳定性。

  • 批量回归: 多版本批数据快速比对,一键校验全量一致性,适用于版本迭代验证;
  • 流式回归: 基于索引表增量变更抽样,对指定时间窗口内实时数据进行跨索引一致性校验,精准定位流式链路异常。

创建批数据回归任务

创建流数据回归任务

数据Debug

大禹平台构建了覆盖全链路的数据运维干预能力,确保数据处理的可靠性与灵活性。

  • 组图配置: 支持对源端组图配置进行主动干预与调整,实现配置策略的快速生效。
  • Dump流程: 支持Dump构建流程的调控,实现对全链路流程的问题快速定位,保障数据产出的稳定性与高效性。
  • 在线索引: 提供线上索引数据的实时干预能力,支持对增量数据进行修正,确保索引内容的及时性与准确性。

四、业务场景实践

社区搜索倒排表链路

如下图所示,社区搜索倒排表 Dump 任务以动态内容为核心实体,融合动态实时内容流、天级统计特征及商品多维特征,通过流批一体处理生成高时效的倒排索引宽表。

社区搜索倒排宽表链路

穿搭精选推荐链路

如下图所示,穿搭精选推荐 Dump 任务以动态-商品关系为核心主表,融合动态维度的多源流批特征数据(如内容特征基础表、内容审核表、天级离线统计特征表等),利用DAG 编排构建动态-商品的大宽表。

穿搭精选推荐链路

五、未来规划

平台能力持续增强

  • 算子体系完善: 基于业务场景持续增强关键算子(如维表动态更新、Service 服务化算子、UDTF 部署优化等)和优化调度流程,强化数据处理灵活性;
  • 性能深度优化: 引入任务剪枝、智能倾斜治理等策略,提升资源利用率与执行效率;
  • 可观测性升级: 构建覆盖全局大盘与任务粒度的监控体系,完善资源消耗追踪、Debug 与全链路 Trace 能力,夯实平台稳定性与运维支撑基础。

深化协同共建,释放平台价值

  • 纵向提效: 聚焦索引构建效率攻坚,与索引平台深度协同重构数据同步链路。以社区搜索大宽表为例,当前同步耗时近3小时,通过消除冗余中间状态、精简处理流程,可以实现索引构建端到端提速,显著压缩数据准备周期。
  • 横向赋能: 平台能力已在社区域多业务场景完成验证,后续可以联动其他业务场景共建;同时平台的子功能也具有通用能力,可将数据回归验证、索引监控大盘等高复用能力模块化开放,赋能各业务线“即插即用”,加速技术资产沉淀与跨域协同创新。

大禹未来规划

往期回顾

1.基于 Cursor Agent 的流水线 AI CR 实践|得物技术

2.从IDE到Terminal:适合后端宝宝体质的Claude Code工作流|得物技术

3.AI编程能力边界探索:基于 Claude Code 的 Spec Coding 项目实战|得物技术

4.搜索 C++ 引擎回归能力建设:从自测到工程化准出|得物技术

5.得物社区搜推公式融合调参框架-加乘树3.0实战

文 /野雨

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

从IDE到Terminal:适合后端宝宝体质的Claude Code工作流|得物技术

作者 得物技术
2026年3月17日 13:38

一、背景

事情是这样的,之前对 AI 编程一直是观望态度,但是部门最近在做 AI 辅助编程 POC,有幸成为 POC 用户,用上了自己舍不得买的高级编程模型 (感谢公司)。尽管我自认为是一个在代码上很挑剔的人,但是试了下感觉居然还可以 (Go、React)!只能说还得是谷歌,调整重心略微发力,Gemini 3 表现确实很不错。既然尝到甜头了,觉得自己是时候好好地琢磨琢磨,研究研究,沉淀一套自己的工作流、方法论,解放自己的生产力,顺应潮流努力成为 AI 时代的受益者,而不是被淘汰的人!

新的开发范式需要搭建新的开发环境和匹配自己开发习惯的工作流,这就像刚学编程那会,需要挑一个自己喜欢的 IDE、熟悉 IDE 快捷键和优化 IDE 设置一样。过程中间肯定有阵痛,Java 开发者们回忆一下多年之前从 Eclipse 转 IDEA 那会的阵痛吧,但是磨刀不误砍柴工,阵痛之后一定是生产力提升。借本文分享下我摸索后的方案,供大家参考。

二、工具选型

目前 AI 辅助编程领域热火朝天,各种 GUI 工具、TUI 工具如雨后春笋让人目不暇接,这对于花心的强迫症选手(比如我)来说选型很困难。但是我觉得有两个基础认知可以帮助我们更好地做决定:

(一)AI 辅助编程工具由脑和手两部分组成。脑是外接的大模型 API,手是各个产品调教的提示词和内部工作流。按我理解,【脑】决定了工具的上限,【手】决定了工具的下限。在这个场景里,大模型就像是汽车里的发动机,而且所有型号的汽车支持的【发动机】规格都是通用的、统一的、标准化的。有了这个基础,我们可以随便选一个趁手的工具,然后自行按场景选配【合适】的【发动机】。

(二)AI 辅助编程当前是一个【千帆竞发】的热门领域,而且单纯就【工具】来说,这个领域【没有技术壁垒】。A 产品抛出的杀手级特性,不出半个月一定会有 B 产品跟进。毕竟现在软件迭代的速度借助 AI 提升了很多,A 产品验证过的想法,B 产品可以很快地跟进和实现。Claude Code CLI 的开发者就使用 Claude Code CLI 迭代 Claude Code CLI,有点绕口,大概就是【工具自举】的意思吧。

Claude Code CLI

综上,其实没啥纠结的,我们照着这两点来选型就好:1. 这个工具一定得便捷地支持模型插拔,就是我随时可以根据场景换一个更适合的、更便宜的、表现更好的大模型。而且这种插拔一定要简单。 2. 这个工具一定要有积极的维护者,不断地迭代、优化它的工作流、提示词。最好是一个商业化产品,因为商业化产品出于其商业目标,一定会投入资源积极进行迭代。 

当前满足这两个条件的,我想也就是 Claude Code CLI 了: 1. Claude Code CLI 是一个商业化产品,有专门的技术团队在不停地更新、迭代。 2. Claude Code CLI 可以非常便捷地支持大模型插拔,我可以随时根据成本、效率、体验来切换合适的大模型。因此,这个环节我选 【Claude Code CLI】。

后文以CC代指Claude Code CLI。

快速切换模型

我通过自定义 Shell 函数来实现便捷的模型切换,不同的场景、不同的任务使用不同的模型。基本原理就是,CC 支持环境变量注入 LLM 配置信息,因此我只需要按场景注入【行内临时环境变量】即可。

详见:Bash - 行内环境变量,Bash 是标准的 Shell 实现,其他 Shell 如 Zsh 都兼容其行为。

Shell配置

我到处弄了一堆免费的、收费的模型用,然后给他们取了我记得住的别名:

使用效果

为了兼容,设置了一个 claude 别名:

这样输入claude 时,默认使用智谱 GLM 模型。

脚本源码

Shell 脚本大概这样,可以修改后配置到自己的 ~/.zshrc 中。如果不熟悉 Shell,嫌麻烦也可以试试这个开源工具:farion1231/cc-switch。

# claude 默认
alias claude='zcc'
# Kimi
function kcc(){
    echo Kimi Claude Code...
    local model="kimi-k2.5"
    ANTHROPIC_BASE_URL="https://api.moonshot.cn/anthropic" \
    ANTHROPIC_AUTH_TOKEN="sk-xxxxxxxxx" \
    ANTHROPIC_SMALL_FAST_MODEL="$model" \
    ANTHROPIC_DEFAULT_OPUS_MODEL="$model" \
    ANTHROPIC_DEFAULT_SONNET_MODEL="$model" \
    ANTHROPIC_DEFAULT_HAIKU_MODEL="$model" \
    CLAUDE_CODE_SUBAGENT_MODEL="$model" \
    launch_claude_code $@
}
# 智谱GLM
function zcc(){
    echo GLM Claude Code...
    ANTHROPIC_BASE_URL="https://open.bigmodel.cn/api/anthropic" \
    ANTHROPIC_AUTH_TOKEN="sk-xxxxxxxxx" \
    launch_claude_code $@
}
# 七牛
function qcc(){
    echo QiNiu Claude Code...
    local model="minimax/minimax-m2.1"
    ANTHROPIC_BASE_URL="https://api.qnaigc.com" \
    ANTHROPIC_AUTH_TOKEN="sk-xxxxxxxxx" \
    ANTHROPIC_SMALL_FAST_MODEL="$model" \
    ANTHROPIC_DEFAULT_OPUS_MODEL="$model" \
    ANTHROPIC_DEFAULT_SONNET_MODEL="$model" \
    ANTHROPIC_DEFAULT_HAIKU_MODEL="$model" \
    CLAUDE_CODE_SUBAGENT_MODEL="$model" \
    launch_claude_code $@
}
function launch_claude_code(){
    CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC=1 \
#    clear
    command claude $@
}

三、开发环境

在当前的气氛下,我想我算是一个【古板】的开发者,我做不到【fire and forget】,或者说完全靠黑盒的自然语言对话来完成代码开发。

我还是只将 AI 当助手,还是想要白盒的掌控 AI 写的代码,还是希望最终交付的代码有我的风格、我的审美、我的品味。毕竟 AI 也只能帮我写代码,并不能帮我背锅。尽管我选择了 TUI 工具 Claude Code CLI,但是我还做不到全程只在终端操作,我还是习惯 JetBrains 特色的双栏 diff。

因此,当前我开发流程的起点还是传统的 IDE,比如我最喜欢的 JetBrains。每天上班第一件事是接水,第二件事就是打开 IDE。所以我需要想办法来将 GUI 工具和 TUI 工具流畅的衔接起来,减少代码开发时的频繁切换产生的割裂感!

多屏协作

如上图,我有 3 个显示器,我的构想是这样的:

  1. MacBook 内置显示器 —— 常驻两个空间:一个用来打开浏览器,还有 VPN、网易云音乐、Finder 软件,用来承接各种临时的操作。一个用来打开飞书,用来沟通、协作。

  1. 中间主屏 —— 常驻两个空间:一个用来打开浏览器,用来做各种【输出】。一个用来打开 IDE,专注于写代码、看代码,用标签页打开多个 Project。

  1. 左边竖屏 —— 常驻两个空间:一个用来打开浏览器,用于看文档、查资料等各种【输入】。一个用来打开 TUI 工具,进行辅助编程!

GUI/TUI衔接

现在问题来了,我希望我的开发工作的【主轴】是 IDE,流程的起点是 IDE。但是我的 IDE 在中间屏幕,终端在左边屏幕,它俩是独立软件,没法协作、自动跟随切换 Project 的工作目录。我希望有个【自动化流程】,当我在 IDE 里切换项目的时候,CC 自动跟随切换!

衔接流程

我期待的流程是这样的:

因为某个原因,我在 IDE 里打开了一个项目 A  准备写代码了,点击 IDE 里的某个【按钮】,左边屏幕自动【新建】一个项目 A 的 CC 会话终端并激活到前台显示   我跟左边的 CC 对话,让他干活  我在中间的 IDE 里评审、调试、诊断  因为某些原因我又要在 IDE 打开一个别的项目 B  我再次点击那个【按钮】,左边屏幕自动【新建】一个项目 B 的 CC 会话终端并激活到前台显示  我在 IDE 里又切回了项目 A,我又点击了那个【按钮】,左边屏幕自动【切换】到 A 的 CC 会话终端并激活到前台显示。

好的想法已经有了,AI 时代就怕你没有想法,有想法就一定有办法实现!

代码实现

  1. macOS 上的原生软件,大部分支持 AppleScript 自动化,也就是说我们可以写脚本驱动软件的行为、模拟人机交互,比如打开软件、新建 tab、点击按钮等。

  2. JetBrains IDE 支持集成外部命令,也就是说:可以在 IDE 里点击一个按钮,自动执行一个 Shell 脚本或者别的可执行文件。

产品需求清晰了,接下来开始让 AI 干活!一顿沟通和调试之后,我们有了一个【自动化】创建 iTerm2 新标签的可执行脚本!

这是给大模型的需求提示词,大家可以按需选用,做个性化的调整:

## 📌 工具功能说明
请帮我创建一个 macOS 上的 iTerm2 自动化工具,主要功能包括:
### 核心需求
1. **智能窗口管理**:自动使用或创建 iTerm2 窗口
2. **项目标签管理**:为每个项目目录维护独立的标签页,支持标签复用
3. **三面板布局**:自动创建固定的三面板布局(上方一个全宽面板,下方两个并排面板)
4. **命令自动执行**:在每个面板中自动切换到项目目录并执行预定义的命令
### 使用场景
```bash
# 基本用法:在当前目录打开
./open-claude-in-iterm.sh
# 指定项目目录
./open-claude-in-iterm.sh /path/to/project
```
---
## 🎯 技术架构要求
### 技术栈
- **Shell 脚本** (open-claude-in-iterm.sh):参数处理、路径规范化、日志管理
- **AppleScript** (open-claude-in-iterm.applescript):iTerm2 自动化核心逻辑
**依赖**:macOS、iTerm2、Bash
---
## 📋 详细功能规格
### 1. Shell 脚本 (open-claude-in-iterm.sh)
#### 参数处理
- **参数1**:项目目录(可选,默认当前目录)
- **自动处理**:相对路径转绝对路径
#### 面板命令配置
```bash
PAN1_CMD="claude"     # 上方面板命令
PAN2_CMD="claude"     # 左下面板命令
PAN3_CMD="claude"     # 右下面板命令
```
### 2. AppleScript (open-claude-in-iterm.applescript)
#### 主要流程
**步骤1:窗口管理**
- 检查 iTerm2 是否运行(未运行则自动启动)
- 使用当前激活的 iTerm2 窗口,如果没有则创建新窗口
**步骤2:标签管理(关键逻辑)**
- 在找到的窗口中,查找 `session.path` 变量等于项目目录的标签
- **复用逻辑**:如果找到现有标签 且 窗口不是新创建的 → 直接切换标签并返回
- **创建逻辑**:如果未找到标签 或 窗口是新创建的 → 创建新标签和布局
**步骤3:三面板布局创建**
```
布局示意图:
┌─────────────────────────┐
│   上方面板 (全宽)         │
│   执行: PAN1_CMD         │
├──────────────┬──────────┤
│  左下面板    │  右下面板 │
│  PAN2_CMD   │  PAN3_CMD │
└──────────────┴──────────┘
```
**分割顺序(重要)**:
1. 初始状态:一个全屏 session(上方面板)
2. 第一次分割:对上方 session 执行**水平分割**,创建下方面板
3. 第二次分割:对下方 session 执行**垂直分割**,创建右下面板
**步骤4:命令执行**
在每个面板中依次执行:
1. 切换到项目目录:`cd "/path/to/project"`
2. 清屏:`clear`
3. 等待 0.3 秒(确保目录切换完成)
4. 执行命令:`PAN_CMD`
5. 等待 0.5 秒(确保命令启动)
## ⚠️ 常见错误
- ❌ 符号链接未处理,导致找不到 AppleScript 文件
- ❌ 分割顺序错误,导致布局不正确
- ❌ 缺少 delay,导致命令执行失败或在错误目录执行
- ❌ 新窗口处理错误,导致多余空白标签
- ❌ 标签复用逻辑错误,导致同一项目创建多个标签
- ❌ 路径未引用,导致包含空格的路径失败

IDE配置

创建外部工具

添加到工具栏

使用效果

点击工具栏按钮后,自动在全屏的 iTerm2 窗口新建或激活项目目录下的 CC 会话,下图里就是 3 个项目。

四、多Agent协作

会的越多,让你干的就越多。既然 AI 那么牛,一个 CC 会话已经满足不了我膨胀的想法和需求了。我希望我可以同时支配多个 AI 开发工程师,而我变成 PM!所以参考酒米的思路,我给每个项目的终端,自动化的划分了 3 个子窗口,每个子窗口都是一个 CC 会话。效果大概这样:

主从架构

每个项目自动打开 3 个常驻的 AI 会话,我设想的工作流是这样的:

【架构师】上面的大屏,用贵的模型!专门用来跟我聊需求、对方案、产出任务列表。

【开发者】下面的两个小屏,用领域特定的模型,专门用来落地大屏架构师产出的方案和任务。比如前端需求用前端效果好的模型,后端需求用后端效果好的模型。

知人善用才是好 PM!这个模式也很匹配现实中的组织架构和成本取舍,现实中每个需求一般也都是由一个架构师和多个中高级开发者来协作完成!感谢热心市民无声雨,给我们小组共享了自己采购的纯血 Claude 模型,所以目前我用 Claude 模型来对方案,用 GLM 或者 MiniMax 来实施方案!

规范驱动开发(SDD)

主从智能体的协作很重要,我跟【架构师】聊了半天确定的方案和设计,需要有一个清晰的、对大模型友好的方案和任务文档作为【开发者】的输入。这就很巧,刚好最近在流行 SDD,规范驱动开发。大致就是模拟现实中的软件开发流程将开发生命周期拆分为 3 个阶段:

  • 【proposal】需求对齐、方案设计、【任务细化】;
  • 【apply】开发任务实施;
  • 【archive】功能验收、文档沉淀

围绕这个流程,开源社区设计和研发了一系列对大模型非常友好的工具和提示词(比如 OpenSpec),【阶段 1】和【阶段 2】中间通过格式设计良好的【设计文档和任务文档】来进行上下文交接。

也就是说,我可以在上述的 3 窗口环境中,按照 SDD 流程来:【proposal】跟【架构师】交互,对齐需求、设计和任务 A  【apply】让【开发者 1】着手完成任务 A  【proposal】继续跟【架构师】交互,对齐需求、设计和任务 B  【apply】让【开发者 2】着手完成任务 B  【proposal】继续跟【架构师】交互,对齐需求、设计和任务 C  【apply】让【开发者 1】着手完成任务 C  ……

五、CC拓展

CC 当然很厉害,但它本质上也就是一个朴素的 ReAct 模式智能体。

ReAct 这么火,大家肯定也都耳熟能详了,我们也就不说太多。当然 CC 团队围绕编程这个课题做了很多细致的提示词调优和内置工作流设计,这个我们黑盒的用就好了,也没必要关注太多。我们最需要关注的,是 CC 提供给我们使用者的【拓展点】,那些允许我们个性化设置的东西。

命令(command)

命令的本质就是预定义的提示词模板。目的是为了省事,不用每次都重复的输入类似的提示词。比如想让 CC 帮我提交代码,每次我们可能都要交代一大堆字,比如:

请调用 git diff --cached 获取当前暂存区的代码变动。
忽略所有的 node_modules 或二进制文件。
基于变动内容,判断这是一个 feat (新功能), fix (修复) 还是 chore (杂务)。
生成一个不超过 50 字符的标题,并在正文详细列出影响的文件。
由我确认后执行 git commit。”

就像写代码的时候将重复代码提取为一个独立方法一样,我们可以把这些可以复用的提示词固定成一个【命令】,后续使用的时候,直接输入命令名字就好。斜杠命令是一段提示词的快捷方式。

技能(skill)

技能和命令最大的差别就是:命令是用户主动提交的提示词,而技能是 Agent 自己决策后自动导入的提示词。当然技能包里除了提示词,一般还会携带一些配套的工具、脚本、命令或者文档。

比如,我安装了一个【html 转 pdf 的技能包】,这只能提示 CC 可以使用这个技能,但是具体用不用、什么时候用、怎么用都是 CC 自己规划、决策的。

子代理(subAgent)

SubAgents 是可以并行处理任务的独立 AI 代理,每个子代理拥有独立的上下文窗口,可以分配不同任务以提高效率。【主代理】的上下文窗口中包含有【子代理】的【简短】描述信息,可以基于这个描述信息规划、决策使用哪个子代理。

{
  "agents":{
    "code-reviewer":{
      "description":"专门负责代码审查的子代理",
      "model":"claude-opus-4-5",
      "instructions":"你是一个专业的代码审查专家,专注于检查代码质量、安全漏洞和性能问题。",
      "tools":["read","search","git"],
      "permissions":{
        "allowWrite":false
      }
    },
    "test-writer":{
      "description":"专门负责编写测试的子代理",
      "model":"claude-sonnet-4-5",
      "instructions":"你是一个测试工程师,专注于编写全面的单元测试和集成测试。",
      "tools":["read","write","bash"]
    },
    "doc-generator":{
      "description":"专门负责生成文档的子代理",
      "model":"claude-sonnet-4-5",
      "instructions":"你是一个技术文档专家,专注于生成清晰、准确的技术文档。",
      "tools":["read","write"]
    }
  }
}

独立上下文窗口的好处是:避免上下文污染和占用。比如我要在代码里找一个接口的所有实现类,这个就很适合子代理来做。主代理只需要交代给子代理接口名,然后就等子代理返回实现类列表。

这样在主代理的上下文窗口里,只会有子代理的输入和输出(几个类文件路径),而子代理在搜索过程中遍历文件、目录、读取文件内容产生的临时 token,不会对主代理产生影响。我目前认为 SubAgent 和 Skill 差不太多。不过我不确认 Skill 是不是在独立的上下文中执行。

MCP

MCP 和技能一样,都是由 CC 自主规划、决策使用的。差别有两个:

  1. MCP 工具的说明信息占用的上下文太多了!不管是否被使用,每次都需要一口气提交所有工具的完整元信息(使用说明 + 出入参 Schema)供大模型规划、决策,占用大量上下文。而【技能】选择了【渐进式披露】,先向大模型提供少量关键信息,只有在大模型选择了使用技能时,才告诉大模型更多关于技能的补充说明信息,让大模型进一步推理、决策。

  2. MCP 工具更多的偏向【远程 RPC】,基于网络来实现原子化的远程能力调用。而【技能】更多的偏向【本地 IPC】,具体能力更多通过【编排】本地脚本、本地命令来实现,有点像 stdio 模式下的 MCP。

钩子(hook)

hook 是在特定事件触发时自动执行的脚本,用于自定义工作流、拦截危险操作、自动格式化代码等。就类似 Linux NetFilter,CC 在很多地方植入了流程执行的劫持点,将流程上下文交给用户开发的脚本或者命令。

插件(plugin)

plugin 就是上述各种拓展打包、分发、安装的一种格式。你可以把它想象成 npm 包、pip 包、apk 包等我们比较熟悉的概念。然后我们可以按流程和格式建设插件市场,类似 pip-index、npm-index 等。

我没有细看流程和格式,但是大概也就是一个特定文件布局的 zip 文件包,里面有插件描述信息和各类拓展,比如可以包含:

  • 5 个 Skills;
  • 10 个斜杠命令;
  • 3 个 MCP 服务器配置;
  • 2 个 SubAgent 定义;
  • 若干 Hooks。

六、CC技巧

飞书MCP

飞书官方提供了 MCP,我主要用它来读写飞书文档,蛮好用的,大家可以试试。比如我每周都要在固定目录下创建固定标题格式的【系统巡检文档】,所以我借助飞书 MCP 整了个自定义 Command 帮我自动创建这些文档去除重复劳动,感觉真香!之前每次都要手动建 3 个文档、选目录、改名字!

@模糊搜索

有时候我们需要精确的告诉 CC,哪个文件需要读或者改,其实不用从 IDE 里复制文件路径,直接在终端里模糊搜索就好了。

WebFetch

CC 默认集成了 WebFetch 命令,就是指定 URL 读取网页内容,这个理论上就是一个本地执行的 curl 命令,没有云端成本,不需要云端协作。但是有个问题:(一)CC 在访问地址之前,会先调用 anthropic.com 的一个风控接口,判断这个网络地址是否有安全风险。(二)政策原因,anthropic.com 会拒绝所有来自中国大陆、香港的请求,风控接口返回 404 或者其他。(三)风控不通过,WebFetch 失败。

在 ~/.claude/settings.json 中添加如下配置,禁用 WebFetch 工具前置的风控检查就好了。

{
  "skipWebFetchPreflight":true,
}

详见:linux.do/t/topic/114…

WebSearch

WebSearch 是需要云端协作的,需要有个搜索引擎服务提供能力。因为我们没有用官方的付费订阅,所以默认的 WebSearch 工具我们用不了,调用 WebSearch 工具得到的结果都是 0。

办法是去找一个免费或者收费的 MCP 服务。免费的我看大家都推荐 Brave<brave.com>,大家也可以找找别的。收费的也有很多,我看智谱的套餐里限量提供了 <联网搜索 MCP - 智谱 AI 开放文档>。也有很多按量付费的,大概几分钱一次,有需要的可以找找。

添加了 MCP 搜索工具后,建议禁用 CC 自带的 WebSearch 工具,不然每次跟大模型交互时,工具信息还会带给大模型,产生额外的 token 开销和推理误判。在 ~/.claude/settings.json 中添加如下配置:

{
  "permissions":{
    "deny":[
      "WebSearch"
    ]
  }
}

iTerm2通知

终端上的任务需要我们输入的时候,可以配置下,让 iTerm2 发出声音和通知。这样我们就不会因为忘记确认操作而阻塞进度。

详见:Optimize your terminal setup - Claude Code Docs

清空上下文

因为我们每个项目都复用一屏内的 3 个子窗口,一般不会重开。为了避免上下文溢出或者之前对话对新任务产生干扰,当我们完成一个任务时,需要及时的执行 /clear 命令,清空上下文,从 0 开始新对话。

如果任务没有完成,但是又不得不 clear,那么可以维护一个自定义命令,在 clear 后提示大模型根据 git status 看到的文件变更快速找回上下文。把 git 状态当作 AI 的 “短期记忆快照”,/clear 只清上下文,不清工作进度。

# Context Catch-up
当前对话已被 `/clear`,请通过 git 状态恢复上下文。
使用方式:
1. 阅读 `git status`(必要时结合 `git diff`2. 仅基于文件变更推断正在进行的任务
3. 延续现有实现思路,不要假设额外背景
4. 在未收到明确指令前,先给出你对当前上下文的判断
目标:
- 快速找回任务状态
- 避免旧对话或错误假设干扰新任务

注意力哨兵

在记忆文件里要求大模型扮演一个特别的角色,如果聊着聊着角色行为丢失了,说明大模型注意力失焦了,已经丢掉了你最开始的要求。这时候就该 clear 一下重开会话了。

拓展市场

为了便于相关个性化拓展物料的分发、便于大家搜索、安装,市面上已经有了相关的分发平台和便捷安装命令了。

  1. skills.sh

  1. www.aitmpl.com

状态行个性化

状态行显示在 Claude Code 会话界面底部,可以自定义显示的内容,比如git分支名、目录名、模型名等。推荐使github开源项目:claude-code-statusline-pro-aicodeditor,效果如下:

详见:github.com/HorizonWing…

七、总结

差生文具多,尽管我暂时还没有使用 CC 产出啥说得上来的东西,但是确实花了很多时间琢磨怎么让它用起来更顺手。一些不成熟的想法,希望可以给到大家启发。

参考:

  1. www.ginonotes.com/posts/how-i…
  2. www.cnblogs.com/knqiufan/p/…

往期回顾

1.AI编程能力边界探索:基于 Claude Code 的 Spec Coding 项目实战|得物技术

2.搜索 C++ 引擎回归能力建设:从自测到工程化准出|得物技术 

3.得物社区搜推公式融合调参框架-加乘树3.0实战

4.深入剖析Spark UI界面:参数与界面详解|得物技术

5.Sentinel Java客户端限流原理解析|得物技术

文 /羊羽

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

AI编程能力边界探索:基于 Claude Code 的 Spec Coding 项目实战|得物技术

作者 得物技术
2026年3月12日 11:20

一、前言

10 天,2.5 万行代码,提效 36%。 基于 Claude Code 的 Spec Coding(规格驱动编码) 深度实战。通过 2,754 次工具调用,我们不仅完成了从 0 到 1 的前端项目搭建,更在“约束+示范+视觉”的三层规范体系下,摸清了 AI 编程的真实能力边界。本文将复盘这场实战,拆解如何用结构化工作流消除 AI 的不确定性,重构开发者的核心竞争力。

二、Spec Coding

什么是 Spec Coding 工作流

众所周知,Spec Coding(规格驱动编码)的核心思想是:在写代码之前,先写规格文档。通过 openspec 工具,每个功能变更都经历以下阶段:

Spec 工作流的实际价值

减少返工: 在 proposal 阶段明确为什么以及怎么做,避免实现完才发现方向不对。适合复杂功能: 对于需要跨多个文件多个层次的功能,tasks 分组让 AI 聚焦在当前步骤。可审计: 每个 Change 的完整决策链(proposal→design→specs→tasks)都留有记录,方便回溯。

三、项目是什么

一个标准企业级中后台搭建,包括表格、表单、卡片列表、数据看板等中后台常见核心功能,项目从零搭建到完成以下全部功能,全程使用 Claude Code 辅助开发。

四、数据概览

在这次使用Claude Code 做 Spec Coding的从0到1项目探索中,我们积累了一份完整的原始数据,以下所有数字均来自Claude Code对 109 个 .jsonl 会话文件的整体数据统计:

2,754 次工具调用的分布揭示了 AI 的"工作方式", AI 自主完成的 738 次文件读取、550 次代码编辑、662 次终端命令执行,以及 208 次任务进度标记——几乎覆盖了一个研发日常工作的全部动作类型。

五、开发时间线:10 天的演进过程

阶段一:设计阶段

在动工之前,我们完成了产品方向的确认和 UI 设计稿、产品PRD的输出。过程主要使用 Cursor + 设计规范 Rules,直接从概念沟通到生成高保真 UI 稿(HTML文件),再生成标准的 PRD 需求描述,覆盖系统所有核心页面。这一阶段的产出是一套可直接用于开发对齐的视觉参考,也是后续 AI 生成代码时的重要上下文来源。

阶段二:项目搭建(2个工作日,20 条指令)

此阶段我们以问答式交互为主,聚焦于项目基础设施的搭建和简单需求的尝试。我们向 AI 提出架构问题,由 AI 给出方案,我们决策后执行。在这个过程中,AI 帮助我们熟悉技术栈、搭建项目结构、配置开发环境,并完成了第一个核心列表页面的开发,成功打通了前后端的数据链路。

阶段三:功能开发(4个工作日,89 条指令)

这是整个项目开发强度最高的阶段,我们引入了“规格驱动编码”(Spec Coding)的工作流,约 80% 的功能代码在此阶段完成。我们不再是简单地给 AI 下达指令,而是先与 AI 共同定义清晰的功能规格(Specification),然后 AI 基于这份“蓝图”自主进行编码。通过这种方式,我们高效地完成了包括授权管理、数据分析看板、文档树状结构等多个复杂功能的开发。

阶段四:细节打磨与生产部署(4个工作日,108 条指令)

最后阶段的工作重心转向功能迭代、系统重构和生产环境的部署排障。我们与 AI 一起,对已有功能进行了多轮优化,例如完善了核心业务流程、重构了侧边栏导航、修复了登录跳转逻辑等。同时,我们也对项目首页进行了深度的代码重构,解决了前期快速迭代中积累的技术债。最后,在部署阶段,我们遇到了复杂的构建问题,通过与 AI 的多轮分析和尝试,最终定位并解决了问题,成功将应用部署上线。

六、典型案例

案例一:AI 驱动产品设计

没有产品经理、没有 UI 设计师,一个工程师如何用 AI 独立完成从产品定义到高保真原型、再到研发文档的全流程。

背景:

传统意义上,从 0 到 1 开发一个企业级知识问答平台需要三个角色:产品经理(需求分析 + 用户路径 + PRD)、UI 设计师(交互稿 + 高保真设计稿)、工程师(编码实现)。这个项目设计过程中,通过让 AI 在不同阶段扮演不同角色,覆盖了全部三个职责。

让 AI 扮演产品经理:

在 Rules 中植入「首席产品专家」Persona 提示词,将 AI 从工程师的「急于执行」模式切换为产品经理的「先想清楚」模式,与 AI 聊清楚我们想干什么。

让 AI 扮演 UI 设计师:

在 Rules 中定义设计规范,通过对话式生成逐页产出高保真 HTML 文件,而不是源码:

让 AI 生成研发可读的 PRD:

基于产品经理角色,将 HTML 设计稿作为上下文,最后生成精确到组件行为级别的 PRD:

案例二:SDD 驱动前端功能研发

在已有系统上增量交付一个完整功能模块,SDD 如何保证「增量」功能快速开发,并系统性提升前后端联调效率。比如其中有个SSD需求开发「定时任务管理」完整模块,并且对接 6 个后端接口。这是 SDD 工作流第一次被完整运用于新功能模块开发,也是验证「SDD + MCP」前后端联调提效的关键场景。

页面功能开发: opsx:new 到 archive,人工指令 < 10 条,AI代码占比100%,交付完整任务管理模块(独立路由 + 完整 CRUD + 执行记录 + 检索结果)。

前后端联调: SDD + MCP 的联调路径:接口 URL → MCP直连文档 → 一次性获取字段、枚举、必填项 →  接口文件一次生成 → ****联调一次通过,6 个接口零联调返工。

研发效率: 同日额外交付了两个完整模块,3个独立完整模块,单日全部开发完成,按纯人工开发,当天人效提升3倍。

案例三:SDD 驱动系统重构

重构与新功能的根本差异:

新功能开发是「从无到有」:AI 可以大胆生成,错了删掉重来。重构是「在活体系统上动手术」:这种高风险对 AI 执行提出了截然不同的要求——不仅要知道改什么,更要知道不能改什么,以及按什么顺序改。 SDD 的价值正在于此:在动代码之前,把这三件事全部写清楚。

知识问答首页重构:

架构债务: 大量首页业务组件与公共组件混放、useChat 导出 20+ 方法(4 种无关职责混合)、ChatInterface 接收 17 个 props(参数3 层传递)。

执行TASKS: 9 组 34 个子任务,从「grep 确认组件当前归属」→「按新分层迁移」→「更新所有 import 路径」→「tsc 类型检查」→「冒烟验证」,每一步有明确输入和验收标准。

执行结果: 34个任务全部完成(含 4 个验证任务),AI 全程独立执行,人工干预 < 5 条指令。7个业务组件与公共组件完成解耦,useChat 拆为 3 个单职责 hook,ChatInterface 从 17 个 props 缩减至 6-8 个。

案例四:复杂问题排障

并不是所有编程相关的问题AI都可以解决,哪类工程问题从结构上超出了 AI 的能力边界?这里举一个遇到的场景。

其中有一天遇到一个测试环境构建失败的问题,结果过程约 4 小时,7 个会话、15+ 次方案尝试、59 条指令。整个项目单日指令最多的一天,也是 AI 独立解决能力最受限的一天。

这一天有一个值得注意的特征:AI 每次分析都是正确的——问题不在于 AI 的分析能力不足,而在于问题的结构性特征超出了 AI 的信息范围和反馈机制:

  • 云服务器构建时发生,本地无法复现: 每次验证方案必须提交代码等待 CI(一轮约 10 分钟),AI 分析的是日志截图,无法感知「现在的 CI 环境还有哪些隐性配置」。
  • 多根因互相掩盖,解决一层才暴露下一层: AI 每次分析都正确,但正确分析的只是当前暴露的那一层,问题全貌无法被单次分析覆盖。
  • 隐性行为无文档,根因藏在依赖源码内部: Prisma postinstall 境外下载没有任何显式错误,引导AI 不得不深入阅读 node_modules 源码第 2319 行才能发现根因。这类「运行时行为藏在依赖内部、没有文档描述」的问题,超出了 AI 通过训练数据或当前上下文主动推断的范围。

最后确认的原因:

  • .npmrc 历史副作用: 早期为跳过 @next/swc-darwin-arm64 在 Linux 下载而加入的 omit=optional,无意间也跳过了 @tailwindcss/oxide-linux-x64-gnu(Tailwind v4 的 native binding),postinstall 陷入循环等待
  • Prisma v6 境外下载沉默卡死: AI 需要阅读 node_modules/@prisma/fetch-engine/dist/index.js 第 2319 行才能发现这个行为——postinstall 不报错、不超时,只是无限等待。
  • pnpm 跨平台 lockfile 不一致: macOS arm64 生成的 lockfile 不含 Linux x64 的 native package;切回 npm 则 lockfile 被忽略,安装结果每次不同。

最终解法(4 小时探索后得出):

七、代码规范落地:CLAUDE.md 和 Rules 的实际效果

规范体系设计思想:三层结构

本项目的规范体系是三个层次的协同约束, 每层解决不同的问题:

第一层:约束层(.claude/rules/)      ← 告诉 AI「禁止什么、必须怎样」
第二层:示范层(.claude/code-design/)← 告诉 AI「标准产出长什么样」
第三层:视觉层(.claude/ui-design/)  ← 告诉 AI「页面应该长什么样」

为什么需要三层?

只有「约束层」时,AI 知道规则但缺乏参考实现,容易在复杂场景下产生符合规则但不符合团队风格的代码。加入「示范层」和「视觉层」后,AI 可以直接对齐团队的标准产出,减少「虽然合法但不地道」的代码。

第一层:约束层(.claude/rules/)

7 个规范文件,分别约束不同维度:

.claude/rules/
├── ts.md          # TypeScript 规范(禁止 any、使用可选链等)
├── code-names.md  # 命名规范(kebab-case/camelCase/PascalCase)
├── comment.md     # 注释规范(JSDoc、@ai-context/@ai-rules 文件头)
├── lint.md        # 代码风格(单引号、文件末尾换行)
├── style.md       # 样式规范(Tailwind CSS、less 文件)
├── pages.md       # 页面目录结构规范(constants/services/hooks/components 分层)
└── service.md     # API 接口生成规范(fetch{Name}Api 命名、UniversalResp 泛型)

第二层:示范层(.claude/code-design/)

将项目常见场景预置完整的「标准模板代码」,AI 在生成新页面时可以直接参照,后续可以切换为skills:

.claude/code-design/
├── pro-table/          # 通用列表页模板(含搜索、分页、批量操作、行操作)
├── pro-form/           # 通用表单页模板(含创建/编辑双模式、字段验证)
├── editable-pro-table/ # 可编辑表格模板(含行内编辑、添加/保存/删除)
├── drawer/             # 抽屉组件模板(含标准打开/关闭逻辑)
├── compontent/         # 通用组件模板(含 README、Props 定义、使用示例)
└── utils/              # 工具函数模板

示范代码的作用不只是「看个格式」。以 pro-table 为例,当开发者让 AI「参考 .claude/code-design/pro-table 生成知识治理列表页」时,AI 直接继承了这套模式,一次就能生成符合团队风格的代码,无需多轮调整。

第三层:视觉层(.claude/ui-design/)

注意存放 HTML 设计稿,覆盖主要页面的视觉参考:

.claude/ui-design/
├── knowledge-spaces.html  # 知识空间列表页设计稿
├── search-strategy.html   # 检索配置页设计稿
├── space-detail.html      # 空间详情页设计稿
└── xxx设计稿

这些 HTML 文件可以直接在浏览器中打开预览,AI 也可以读取其中的结构和样式信息。实践中,提供 HTML 设计稿后,AI 生成的 UI 与设计意图的吻合度明显高于纯文字描述,尤其是布局结构、颜色方案、间距配置等细节。

规范约束的实际效果

正面效果(规范被遵循的案例):

  • 接口命名一致性: 所有接口函数均以 fetch{Name}Api 命名,类型以 I{Name}Req/Res 格式,整个项目 205 个文件保持高度一致。
  • 目录分层被遵守: constants/、services/、hooks/、components/ 分层在每个新页面中都被正确创建。
  • 代码模板被继承: CURD页面均参照了 pro-table 模板的 hooks 分离方式,代码结构高度一致。
  • 使用可选链: 几乎所有数据访问都使用了 ?. 和 ??,有效避免运行时报错。

需要人工干预的案例:

  • 2/24,AI 生成知识空间列表后,将所有代码写在单文件中,未按规范分层。通过一条追问后,AI 重构为正确结构。
  • 2/27,AI 错误地使用了 .less 后缀,但项目实际配置使用 SCSS,在收到错误提示后立即修正。
  • 出现 antd v5 废弃 API(destroyOnClose、dropdownStyle),AI 习惯于使用训练数据中更常见的旧 API,需要通过报警信息触发修正。

结论:

规范体系对 AI 的约束是有效的,但规范文件只是「约束」而非「能力」——只有「约束层」时,AI 知道不能做什么,但遇到复杂场景仍可能生成不够地道的代码;加入「示范层」和「视觉层」后,AI 有了对齐的锚点,输出质量和一致性明显提升。

八、MCP 工具:消除信息断层

在 AI 辅助前端开发中,有两类高频信息断层,在此项目中进行了接入:

接口文档断层: 接口文档在 API平台,AI 无法直接访问,只能靠用户手工复制字段,容易遗漏、版本不一致。需求文档断层: PRD、设计文档存在飞书云文档中,每次引用都需要用户打开→复制→粘贴到对话框,打断思路。

MCP 一:接口文档直连

通过该工具,AI 可以根据接口 URL 自动拉取完整接口文档——包括入参字段、出参结构、枚举值定义、必填项标注。累计被调用了 21 次,完成39个接口联调 ,覆盖了几乎所有接口的初次接入和更新迭代场景。服务端接口未生效之前,并且支持同步生成mock数据,减少后端依赖。interface.ts 类型定义质量非常高,字段注释完整,无需人工校对。

MCP 二:飞书云文档直读

通过该MCP工具,AI 可以直接读取飞书云文档的内容(PRD、设计说明、技术文档等),无需用户手工打开→复制→粘贴。

典型应用场景:

九、AI Spec Coding 经验总结

重新理解「AI 辅助编程」是什么

流行的说法是「AI 是你的 Copilot」。这个比喻在日常补全层面成立,但在 Spec Coding 实践之后,我更倾向于另一个模型:AI 是一个极度服从、无限耐心、但没有内部业务知识常识的「顶级执行者 」。

这个比喻捕捉了三个关键特征:

极度服从: AI 会一字不差地执行你写的规范,不会主动质疑「这样做合理吗」。这是优势,也是风险——规范写得越准确,执行越可靠;规范有歧义,AI 会选一个「看起来合理」的解释,而不是停下来问你。

无限耐心: 34 个任务的重构、9 组联调任务、跨会话的上下文恢复——这些在人类身上需要消耗大量意志力的事情,AI 做起来没有摩擦成本。本项目 208 次 TodoWrite 调用背后,是 AI 持续更新进度状态、从不嫌烦的特性。

没有内部业务常识: AI 不知道你们公司的部署环境是什么样的,不知道这个接口上周刚换过版本,不知道「这个交互做成这样用户会抱怨」。它只知道你告诉它的。这也是 3/4 生产构建排障花了大量时间的根本原因。

AI 的能力边界在哪里

从 10 天、2,754 次工具调用中,我们归纳出一个更精确的能力边界框架,而不是简单的「能做/不能做」:

真实项目中的并不是所有的需求都值得写一份 Spec。在真实的项目迭代中,我们需要根据需求颗粒度来选择协作模式。

小颗粒需求:对话框即扫即改

  • 场景:改个文案、修个显隐逻辑、调整 CSS 间距。
  • 策略:直接在 Cursor Chat  中对话。
  • 理由:沟通成本低于编写规范的成本,AI 的即时反馈效率最高。

中颗粒标准化需求:基于Rules 或者 Skills 预设规范生成

  • 场景:增加一个标准的 CRUD 页面、创建一个简单的业务组件。
  • 策略:利用预设的 Cursor Rules 或 Skills(如 pro-table.mdc)。
  • 理由:这类需求有强烈的“模式感”。只要规则定义清晰(如“执行流程:识别场景 -> 读取示例 -> 生成类型 -> 完成 UI”),AI 就能基于标准化模板高质量输出。

中大颗粒复杂功能:OpenSpec 深度协作

  • 场景:重构核心逻辑、新增带有复杂业务逻辑的模块、无参考代码的新功能。
  • 策略:OpenSpec 标准流 (SDD)。
  • 理由:业务逻辑复杂时,AI 极易产生幻觉或需求偏移。通过 Spec 强制进行“先设计后编码”,可以确保 AI 的每一步都在既定轨道上,且 Spec 记录了设计的决策过程,对于后期维护价值巨大。

AI 失效的三种模式

经过本项目的实践,AI Coding 的失效不是随机的,而是可归类的:

模式一:规范真空

任务涉及的领域没有规范约束,AI 自行填充「合理默认值」。

  • 表现:生成的代码功能正确,但风格/结构偏离团队约定。
  • 发生频率:高(尤其在新功能开发初期)。
  • 应对:在 CLAUDE.md 或 code-design 中补充对应规范,一次修复,全局生效。

模式二:信息孤岛

AI 掌握的信息是当前会话的快照,看不到系统外的状态。

  • 表现:本地正常,CI 失败;AI 分析每次都对,但解的都是当前暴露的问题。
  • 发生频率:低,但代价高。
  • 应对:跨平台、跨环境的依赖要在架构设计阶段提前锁定;环境差异要写成规范前置处理。

模式三:任务目标模糊

AI 把「该问人的问题」当成「执行问题」来解决。

  • 表现:用户说「优化一下首页」,AI 悄悄改了组件结构,而不是先澄清目标。
  • 发生频率:中。
  • 应对:Spec 工作流的 proposal 阶段强制要求先描述「Why」,避免 AI 自行填充目标。

开发者角色的重构

AI Coding 不是让开发者「消失」,而是让开发者的工作向上迁移:

这意味着:

规范设计能力成为 AI 时代开发者的核心竞争力——能写出让 AI 可靠执行的规范,价值比能写出同等功能代码更高。

系统性思维变得更重要——生产构建问题的排障经历说明,AI 可以帮你解决每一个局部问题,但无法帮你看到真实业务全局。

质量意识前移——过去 Code Review 在代码写完后进行,现在需要在 方案设计/任务执行 阶段就介入,而不是等 AI 执行完再纠错。

值得期待的方向

基于本项目的数据和经验,后续在以下方向可作深入探索:

规范体系的结构化积累: 每次踩坑后补充到 CLAUDE.md/rules,形成团队共享的「AI 执行约束库」。目前 7 条规范文件是手动维护的,下一步可以建立「踩坑→提炼规范→自动追加」的闭环。

MCP 工具链的纵向延伸: 本项目 MCP 仅覆盖了接口文档、飞书文档。后续针对设计稿、测试用例、发布平台、日志平台接入,可以进一步形成完整的AI Coding链路。

多 Agent 并行开发: 本项目开发过程中,发现大型任务执行等待时间较长,下一步可以尝试多Agen并发生成,同时开发不同功能模块。

一句话总结

AI Coding 的本质不仅仅是用 AI 写代码,而是用结构化的规范和工作流把不确定性消除在执行之前——AI 负责在确定性空间里高速执行,人负责维护和扩展那个确定性空间的边界。

10 天、217 条指令、2,754 次工具调用、25,546 行净增代码——这个数字背后,是一套让 AI 可以「看见」、「理解」、「遵守」团队约定的规范体系。规范是杠杆,AI 是力,Spec 工作流是支点。

本报告由claude code基于claude code 109 个真实历史会话、2,754 次工具调用记录生成,人工补充并校准,数据来源:~/.claude/projects/-Users-admin-Desktop-code-knowledge-qa/。

往期回顾

1.搜索 C++ 引擎回归能力建设:从自测到工程化准出|得物技术

2.得物社区搜推公式融合调参框架-加乘树3.0实战 

3.深入剖析Spark UI界面:参数与界面详解|得物技术

4.Sentinel Java客户端限流原理解析|得物技术

5.社区推荐重排技术:双阶段框架的实践与演进|得物技术

文 /阳凯

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

搜索 C++ 引擎回归能力建设:从自测到工程化准出|得物技术

作者 得物技术
2026年3月10日 09:58

一、为什么要做这件事

在搜索系统中, C++ 引擎长期扮演着底层核心基础设施的角色:性能敏感、逻辑复杂、变更频繁,同时承载着大规模线上流量的稳定运行。随着业务持续发展和技术架构不断演进,我们逐步意识到:在高频迭代背景下,回归能力也需要同步升级。

过去一年,我们围绕搜索 C++ 引擎展开了一次系统性的回归能力工程化建设。本文将介绍这次能力升级的背景思考、核心设计思路以及落地实践。

高频迭代背景下:回归能力需要同步升级

搜索 C++ 引擎的升级主要来自三类需求:业务功能需求、重要技术项目(有 QA 深度参与)、大量技术优化与结构性改造需求。

在实际迭代节奏中,技术优化与结构性改造类需求占比较高,引擎整体呈现出多人并行开发、持续迭代推进的状态。随着规模扩大,我们发现:现有回归环境更适用于单次项目式验证。多需求并行时,资源调度与复用能力仍有提升空间,回归准出标准尚未完全工程化。这意味着,在稳定性要求不断提升的背景下,我们有必要构建更加标准化、流程化的回归体系,让质量保障能力与迭代节奏匹配。

现有测试方式的演进空间

当前搜索引擎主要依赖两类测试手段:DIFF 测试和压测,这些手段在长期实践中发挥了重要作用,但随着业务复杂度提升,我们也逐步看到进一步优化的空间:流量获取依赖下载日志、手工上传,自动化程度仍可提升。DIFF 过程中存在自然噪音。需要更精细化处理(AA DIFF、排序不稳定)。报告与分析信息分散在不同工具中,定位效率有优化空间。多套工具并行使用,缺乏统一平台化沉淀。整体来看,测试能力更多体现为“工具能力集合”,而在流程标准化、资产沉淀与统一治理方面仍有提升空间。

二、我们要解决什么问题

这次建设的目标,并不是简单“再做一个工具”,而是希望系统性解决以下问题:让 DIFF 和压测成为搜索 C++ 引擎的标配回归能力、让回归结果具备可分析、可归因能力、让回归成为发布的硬性准出标准、保证工具本身的稳定性,不成为新风险、整体提升引擎的回归效率和交付质量、通过流程和流水线,降低对“人”的依赖。一句话总结:把回归这件事,从“靠自觉”,变成“靠系统”。

三、整体方案概览

围绕上述目标,我们将建设拆分为五个关键方向:流量录制:一次录制,多处复用。环境建设:稳定、可复用的 DIFF/ 压测环境。DIFF 工具体系:从“能跑”到“好分析”。一键压测能力:降低执行门槛。工具与索引平台集成:让回归真正被用起来。

下面将会按模块展开说明。

流量录制:回归的基础设施

为什么先做流量录制

DIFF 和压测的核心前提只有一个:真实、稳定、可复用的流量。因此我们优先建设了搜索 C++ 引擎的流量录制链路,作为后续所有测试能力的基础。

流量如何触发

  • 索引平台集群详情页直接发起流量录制。
  • 索引平台更新 ARK 配置中心中的录制配置。
  • 搜索 C++ 引擎实时监听配置变化。

录制配置设计

所有配置统一收敛在 dsearch3#test.properties,支持:

  • 全局开关。
  • 指定 app / group。
  • 截止时间。
  • 指定 IP。
  • 采样率(0~100)。

这使得录制行为可控、可回收、可精细化管理。

流量生成与存储

  • 引擎侧根据配置生成 Kafka 消息。
  • 多业务场景复用同一 ARK 集。
  • 多场景流量复用同一个 Kafka Topic。

最终流量落入 ODPS,按天分区,字段包含:

  • 请求体。
  • 流量场景。
  • 实验信息。
  • 环境信息(生产 / 预发)。

这为后续 DIFF、压测、问题复现提供了统一数据源。

流量存储字段说明:

request_type:流量标签(原C++引擎请求类型)
app_name:C++引擎appName
group_name:C++引擎groupName
request_body:录制的C++引擎请求体
env:录制的流量环境:预发/生产
graph_name:图名称
experiments:实验列表(搜索新增)
pt:ODPS分区,按天分

DIFF 测试:从无到“可归因”

DIFF 执行流程:

DIFF 的入口统一在索引平台:查询流量 →选择流量→配置参数→触发 DIFF→查看报告。底层由测试服务 + 脚本完成:流量筛选与改造、请求转发、去噪、报告生成与存储。

DIFF 对比方式:

对照组部署 master 分支,实验组部署预发布分支。指定行或者指定集群方式请求对照组和实验组环境。打开新功能开关进行响应比对,生成预期有DIFF报告。

DIFF 环境设计

支持两种模式:

  • 指定集群:对照组 / 实验组两套完整集群。
  • 指定行 精确绑定 search / rank IP。

通过该设计,保证对比的唯一变量只有代码和配置。

流量筛选与回放改造

支持多维度筛选:

  • 搜索场景(交易 / 社区 / 聚合等)。
  • 流量标签(综合 / 销量 / 新品等)。
  • 实验命中情况。

同时解决了生产流量无法直接在预发回放的问题(表名、图参数、模型等适配)。

DIFF 策略设计

我们不只关注“有没有 DIFF ”,而是关注这个 DIFF 是否符合预期,因此 DIFF 被拆为两类:

响应 DIFF
  • 响应字段对比。
  • 漏斗算子字段对比。
指标 DIFF
  • 相似度分布(忽略/不忽略排序)。
  • 漏斗算子一致率。
  • 字段增删改统计。
  • 定制化指标。

DIFF 去噪

DIFF 不可用,往往不是因为“真问题”,而是噪音太多。我们重点处理了:AA DIFF(排序不稳定、非确定性逻辑)、可忽略字段、数值微小波动、内部超时导致的异常结果,目标只有一个:让开发看到的DIFF,尽可能都是真问题。

DIFF 报告设计

报告展示

DIFF 汇总报告:

  • 应用、集群、请求接口、流量标签、路由信息、对比数量、DIFF 数量、完全一致率、query_tag 平均召回数、score 平均分等。
  • 相似度分布统计报告(不忽略排序/忽略排序)。
  • 漏斗算子一致率统计报告。
  • 字段增删改统计。

DIFF 详情报告:

  • traceId、一致率、增删改字段、请求体等。
  • 漏斗算子 DIFF 明细。
  • 响应 DIFF 明细。
报告通知

通知到群 @个人,添加报告链接。

压测:一键完成性能回归

压测执行流程:

  • 索引平台作为压力测试发起入口,查询流量->选择流量->填写压测参数->压测触发->压测记录查看。
  • 测试服务提供索引平台操作的接口能力,查询流量->流量筛选->压测文件生成->压测任务触发->压测状态更新。
  • 压测平台提供实际压测能力,启动压测任务->生成压测报告。

整个过程无需人工干预。

执行方式:

  • 对照组:master 分支。
  • 实验组:预发布分支。
  • 开启新功能开关。
  • 阶梯式加压,对比性能曲线。

压测环境设计

同 DIFF 环境建设。

压测报告设计

报告展示

压测平台报告。

报告通知

通知到群 @个人,添加报告链接。

发布流水线与准出机制

回归能力建设的最终目标,是进入发布流程。当前已完成:UT / MR 流水线初步建设,后续规划中将:把 DIFF 和压测作为发布硬性卡点、回归不通过,禁止上线、回归过程自动扩缩容,避免长期占用资源、自动生成准出报告。

四、后续规划

回归执行率 100%:解决“忘跑回归”。

准出流水线全自动化。

横向覆盖更多搜索场景(流控、商业化、国际搜索等)。

形成统一的上线 SOP 规范。

五、总结

搜索 C++ 引擎回归能力建设,并不是一次“工具升级”,而是一场工程化治理:把经验变成流程、把自觉变成约束、把风险前移到上线之前,最终目标只有一个:让搜索引擎的每一次升级,都更可控、更可信。

往期回顾

1.得物社区搜推公式融合调参框架-加乘树3.0实战

2.深入剖析Spark UI界面:参数与界面详解|得物技术 

3.Sentinel Java客户端限流原理解析|得物技术

4.社区推荐重排技术:双阶段框架的实践与演进|得物技术

5.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

文 /耿辉

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

得物社区搜推公式融合调参框架-加乘树3.0实战

作者 得物技术
2026年3月5日 10:15

一、背景简介

近年来,搜索/推荐/广告系统在粗排(Pre-ranking)与精排(Ranking)阶段的模型训练中,呈现出一个明确的趋势:从单目标优化转向多目标建模 + 多目标融合。模型目标多、融合公式复杂,给工程维护、算法迭代效率都带来了挑战。

为了明文化直白展示公式全景、方便决策调参方向,直接配公式、线上自动算(既支持精排预估目标融合、也支持业务条件boost)。我们设计并落地了加乘树调参框架。从1.0优化至3.0,我们提供了:一个调参框架(Java版、同时引擎基建同学落地了C++版)能支持不同算法环节“公式即配即用”,一个打通AB实验的一站式产品化平台,支持一站式“辅助配置->调试->开实验->变更管控”。

带来收益:无论是粗排还是精排,“训多目标、融公式” 已成为工业界标准范式。在得物社区搜索、推荐的模型迭代实践中,我们也确实走“模型多目标训练 + 融合公式调参”范式,2025在社区推荐、社区搜索落地了几十次LR(社区推荐内外流精排、粗排,社区搜索精排)、近百次加乘树推全。

二、即配即用:算法爆发的催化剂,工程稳定的绊脚石?

在算法领域,“即配即用”的工程框架多次成为推动算法快速迭代甚至“爆发式增长”的关键基础设施。面对粗、精排“多目标建模 + 多目标融合”这一建模范式,社区算法和工程提出了如下基建目标:

即配即用提人效: 实时调整配置、线上就能自动生效数学逻辑,使算法工程师从过去几天才能完成一次调参,转变为一天内可进行多次迭代,从而将精力集中在模型和融合公式本身。

全量配置+增量配置范式: 实验只配要改的几行,降低配错风险。全量配置不动,形成天然降级能力。

DSL可解释性强: 粗、精排的融合公式配置量大,数学变换复杂,容易配错。我们提供的DSL让算法同学直接写数学公式/逻辑表达式。明文公式形成策略全景,方便算法同学决策调参方向。

编译校验与降级体系筑牢稳定性防线: 即配即用+数学公式DSL的需求,给工程稳定性带来极大挑战。我们采用“编译语法校验 + 自动用全量配置降级 + 手动切换编译/解释模式”三位一体保障稳定性。

三、可信赖底座:让复杂公式配置既灵活又可靠

全量配置+增量配置范式

传统的KV、JSON 或 YAML等配置格式在面对上百行数学公式时已显乏力:一方面配置体量大、人工修改易出错且缺乏容错机制;另一方面可读性差,难以维护和审查。

我们采用“全量配置+增量配置”的设计,天然解决了使用门槛&自动降级问题:

  • 只配增量,让使用更轻松、出错更可控: 全量配置锁定为只读,确保基线稳定;算法同学只需声明需要新增或修改的增量配置(upsert)。系统在运行时将增量动态合并到全量配置中,生成最终生效的实验配置——既简化了操作;又避免了误改全局参数的风险。
  • 增量可试,基线兜底: 增量配置有误,自动回退至基线,形成天然降级机制。

给一个社区搜索主搜精排的样例:

DSL接近数学公式/逻辑表达式明文

社区搜索、社区推荐的精排融合公式,服务了“多目标融合+业务boost调权”,语义包含:数学变换、逻辑判断、自定义UDF。当算法写下一串sin(log(max(UDF(x), y))),框架能否接住?框架必须托底,正确校验与执行,杜绝“配错即崩”。

从加乘树1.0到3.0,公式解析统一选用 ANTLR。相比手搓“逆波兰表达式”或“Flex & Bison”,它基于AST校验更可靠,且Java开发门槛低。实际加乘树的配置结构里,公式按KV配置(Key 为结果名,Value 为表达式),支持跨行引用——前序公式的输出可作为后序公式的输入,形成可串联的计算链,直至得出最终结果。

  • 公式链转DAG: 在加乘树3.0中,有相互依赖关系的多行公式,被框架解析成DAG。每个item都通过这套DAG计算融合分,1个item可能有多个融合分、每棵DAG的根结点对应1个融合分。
  • AST驱动逐行校验: 每行公式都依托编译原理,校验&解析为抽象语法树(AST)。结构化的AST可支撑后续可靠计算。
  • 加乘树3.0把DAG和AST直接翻译成代码: 框架将公式链直接翻译成可执行代码,用字节码技术加载到JVM中。每个item直接计算即可。

编译校验与降级体系筑牢稳定性防线

即配即用给算法同学迭代提效带来便利,同时给工程维稳带来挑战。尤其加乘树面临的配置是可自由组合、千变万化的数学公式时,绝对不能出现“配错即崩”的情况。我们做了如下一整套安全设计:

  • 编译原理强校验: 如何应对无限组合的公式配置?加乘树选择了编译原理强校验,用了ANTLR框架,把公式校验&解析成严谨的可访问结构(AST)。
  • DAG强校验公式链: 加乘树3.0初始化阶段自动解析公式链间的依赖关系,一边将公式链解析成DAG、一边强校验。能通过校验、最终编排成DAG的公式,才会进入实际计算;不能通过校验的危险配置(漏配公式、公式配错)都会在初始化阶段就被拦截,不会进入实际计算。
  • 自动降级范式: 加乘树设计了一套自动降级范式,方便“前置拦截错误、事中有效托底、后置发出告警”。一旦有错误的实验开流量,加乘树初始化阶段就会校验出错误,当次请求忽略AB实验配置、直接用全量配置计算,并及时发出“实验配置有误”的告警。
  • 串行重算托底: 如果有“编译原理校验”、“DAG校验”没有校验出的意外怎么办?如果框架仅仅是高峰期计算超时失败了怎么办?加乘树最后一层安全托底是“用全量配置串行重算”。无论如何保证线上效果。

四、核心攻坚:加乘树3.0升级编译执行

加乘树2.0在社区搜索落地后,“每次请求3000个item、线程并发拆的多”的情况,暴露出加乘树耗CPU、耗线程的弱点。C++版加乘树替换了计算引擎,没有采用antlr visitor解释执行数学运算的方式,而是用exprtk框架、收获了更高的性能。

受C++版加乘树的启发,我们计划替换Java版加乘树的计算引擎,降CPU消耗、降执行平响。加乘树3.0变成“直接将配置翻译成代码,字节码加载,直接计算”的编译执行形态。

极致性能:配置直译硬代码,零中间损耗 + 最优 JIT

Antlr翻译&Javassist加载,直接“公式翻译成可执行代码”: 包括多行公式的依赖关系、数学计算&UDF调用,直接拉平成硬代码。硬代码执行效率最高,没有map缓存、递归调用栈等损耗。

多行公式传递中间结果,map换POJO: 每个item维护自己的缓存map,高并发put/resize,造成明显的CPU消耗、youngGC压力。本次会初始化时决策缓存POJO,避免resize、且读写更高效。

核心Javassist管理类借鉴Dubbo写法: Dubbo的ClassGenerator写法,对内存管理考虑比较完善。本次借鉴ClassGenerator,把动态生成代码收入唯一管理单例类。

性能收益

晚高峰模块平响、CPU火焰图消耗和内存分配火焰图消耗均显著降低。

典型踩坑

字节码加载不容忍语法糖:

动态生成的字节码必须严格遵循JVM 范,平时习惯手写的Java法糖是不容忍的。例如,Float a = (float) b; 在源码中合法,但若b是Double类型,该语句涉及拆箱 + 窄化转换 + 装箱,而字节码层面需显式插入doubleValue() → (float) cast → Float.valueOf() 等指令。若直接按表面类型生成字节码,将触发VerifyError。

OOM在多处需要关注:

Javassist使用不当容易OOM:Javassist 在生成和操作字节码时(如通过 CtClass),因为其缓存机制,需要开发者主动管理资源释放。每次parse字节码的CtClass要及时释放,否则高频生成字节码容易触发OOM。这一点上,加乘树参照了Dubbo的ClassGenerator写法,创建、销毁内聚在同一个类里,即用即释放。

动态生成ClassLoader/Class/Instance要能GC:Instance能GC,ClassLoader/Class能GC吗?答案是能,只有从ClassLoader -> Class -> Instance全链路都GC Root不可达了,这一串才能GC。所以用Spring的ClassLoader这类常驻ClassLoader加载动态生成类是不行的,必须用即用即弃的自定义ClassLoader,并注意全链路的强引用问题。

我们实际验证了动态生成的类确实能被GC掉。

多重护航:防止非法Java字节码引发线上问题

ASM + Javassist双重检验: 翻译生成的代码,经Javassist生成字节码后,除Javassist .toClass()的自检验,我们还让字节码过了ASM的字节码静态校验(会运行类似JVM的类型推断验证,确保每条指令执行前后,局部变量表和操作数栈的状态是类型安全的)。

沙箱加载: 我们将加乘树管理平台封装成了一个沙箱,算法同学调试公式点击“校验”,平台会用同一套SDK模拟线上全套加载流程:“AST强校验 -> DAG强校验 -> 真实翻译代码 -> Javassist & ASM 双校验 -> 反射调用构造器创建实例”,一整套无误后才往线上推配置。

线上异步加载,任何问题自动降级: “可执行代码(执行器)初始化”读写分离,新配置上线是异步刷新,刷新错误只会造成线上流量过来找不到执行器,自动降级走全量配置(并发出告警),不影响效果。

可回退解释执行: 加乘树2.0、1.0的解释执行能力十分稳定、只是性能略差,3.0可以一键回退解释执行。

加乘树管理平台:一站式配置、调试与实验平台

面向算法同学: 做了一套一站式“辅助配置->校验->实时调试->开实验->变更管控”的使用体验,告别繁琐配置、体感更丝滑。

面向系统稳定: 加乘树管理平台把自己封装成了一个沙箱,如上一个模块所述,一切风险都拦截在沙箱爆炸。

五、稳扎稳打:从1.0到3.0的演进

加乘树1.0: 支持配公式、框架直接算公式,支持UDF,解释执行。加乘树2.0: 少量性能优化,抽象成SDK。加乘树3.0: 升级为编译执行,外观简化为只需要配公式、框架自动解析DAG。

加乘树1.0和2.0都是用的解释执行,antlr visitor遍历AST做“数学/逻辑/if判断”运算。加乘树3.0升级成了编译执行,多行公式解析DAG、每行公式用antlr解析AST时,直接翻译成Java执行代码,用字节码技术把执行代码加载进JVM直接执行。同时加乘树3.0也支持降级至解释执行。

加乘树1.0

解决:落地即配即用公式,解决手搓硬代码迭代效率低、代码腐化导致生效逻辑不清晰的问题。缺陷:费线程&CPU。

加乘树1.0于2025年1月在社区推荐外流精排落地,配法(使用外观)、降级机制是后续迭代不变的:

  • 配法:1): “全量配置+叠实验改动”的配置机制 2)配置总共分 consts(输入物料)、paramBranch(条件分支替换参数)、formulas(公式)、root(融合结果字段名)。
  • 降级机制:1): 初始化阶段就检测公式配错、漏配公式等,一旦检出就自动降级走全量配置、并发出告警 2)少量运行时才能发现的问题,串行重算、降级算全量配置。

当时是从手搓硬代码做公式融合,无DIFF迁移过来,解决了如下2个迭代痛点:

  • 迭代效率: 除调参是可配,调公式形态、调生效条件等都需要开发&上线。
  • 逻辑黑盒: boost、融合公式迭代复杂之后,生效逻辑变得黑盒,不容易分析调参方向。

加乘树1.0的实现要点

纯item维度(请求维度的公式也会每个item重复计算)。consts->paramBranch->formulas串行计算。antlr解析单行公式成AST,框架递归解析树依赖,antlr visitor解释执行。

为什么用antlr

DSL语法校验: 我们需要一种配置设计,能尽可能简洁地表征模型融合公式(支持逻辑判断/复杂数学变换/UDF)——接近Java语法&数学公式的DSL(当时有对标字节的配置外观)。我们需要准确校验DSL配置正确、并正确解析DSL配置——在antlr、手搓逆波兰表达式、flex&bison里,选了用antlr校验、解析DSL(用AST校验原理可靠,Java上手难度低)。

antlr visitor解释执行: 依靠AST解析计算是一种可靠的计算逻辑。我们需要稳定靠谱的计算引擎,因为算法同学大规模使用后、会出现大量千变万化的公式组合——依靠AST解析计算是一种可靠的计算逻辑。

类SIMD设计使性能可接受: antlr解析AST非常耗时,必须一次parse多次复用,不能在item维度重复parse。一般用antlr visitor做线上实时计算,性能是不可接受的。我们采用了一种类SIMD的代码写法,使落地性能可接受——类SIMD的设计,一次antlr visitor算一批item。最终落地的性能、没有因为antlr visitor拖过多后腿,性能比旧版硬代码融合公式还要好。

antlr语法定义文件

antlr visitor如何通过访问AST计算1行公式

加乘树2.0

解决:抽象成SDK;执行计划自动识别请求维度公式、便于序融合等逻辑写UDF。缺陷:受限于解释执行,仍然比较耗线程。

加乘树2.0于2025年9月在社区搜索落地。优化点如下:

  • 使用体验: 配置json结构简化,只需要配递归的一组公式即可(砍掉了consts、paramBranch)。if()的配法简化:旧版编译器设计的简单,将 “logic表达式”与“math表达式”分别放在2个编译器里,使用者不允许if里嵌套函数,加乘树2.0合并了编译器,if()里可以嵌套函数。支持“隐式item正排”。

  • 性能: 框架自动识别Req维度的公式,全局只计算1次。执行计划加缓存,砍掉“每次请求都重新build执行计划”,平响降低。
  • 横向扩展: Java版加乘树抽象为SDK,方便扩场景直接引用。

加乘树3.0

解决:升级为编译执行,性能大幅提升。

加乘树3.0于2026年1月在社区搜索落地。之前“核心攻坚”模块有提到,高并发&计算量大的情况下,暴露出加乘树耗CPU、耗线程的弱点(类SIMD设计虽然能让性能可接受,但毕竟antlr visitor计算方式需要升级)。

加乘树3.0替换了执行引擎。我们观察火焰图发现“按公式逻辑直接裸写的java代码”性能最高效,但是迭代效率最低。加乘树为了即配即用公式,性能却打了折扣。为了平衡“即配即用”的迭代效率问题和“性能”,我们“将配置公式直接翻译成可执行代码,用字节码技术加载到JVM中直接计算”,这让加乘树从解释执行升级为编译执行。

六、还能更好

多语言 & 模块化: 加乘树有Java版,同时有C++版,是引擎同学创新实现的另一个高性能版本。支持多种业务场景及模块(如粗排、精排),可灵活接入 Java 业务引擎或 C++ 高性能引擎。欢迎其他场景和模块接入。

稳定性 & 产品化: 重点打磨“加乘树管理平台沙箱拦截 -> 线上容错降级 -> 失败监控告警发现 -> 解释执行托底” 的有效性,定期演练降级、验证算法效果。增强“加乘树管理平台”DIFF能力,扩展展示“调试DAG”、“可DIFF动态生成的代码”,打通实时debug平台,可以“DAG展开看计算的中间结果”。

多层公式组成DAG(打磨中)

配置生成的可执行代码做DIFF(建设中)

打通模型调用自动化: 在加乘树这里打通精排模型调用,对精排模型的调用也高度抽象,一配即用、一配即可加入公式融合。

往期回顾

1.深入剖析Spark UI界面:参数与界面详解|得物技术

2.Sentinel Java客户端限流原理解析|得物技术

3.社区推荐重排技术:双阶段框架的实践与演进|得物技术

4.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

5.服务拆分之旅:测试过程全揭秘|得物技术

文 /啊俊 风林 益嘉

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

Sentinel Java客户端限流原理解析|得物技术

作者 得物技术
2026年2月26日 13:33

一、从一次 HTTP 请求开始

在一个生产环境中,服务节点通常暴露了成百上千个 HTTP 接口对外提供服务。为了保证系统的稳定性,核心 HTTP 接口往往需要配置限流规则。给 HTTP 接口配置限流,可以防止突发或恶意的高并发请求耗尽服务器资源(如 CPU、内存、数据库连接等),从而避免服务崩溃或引发雪崩效应。

基础示例

假设我们有下面这样一个 HTTP 接口,需要给它配置限流规则:

@RestController
@RequiredArgsConstructor
@RequestMapping("/demo")
public class DemoController {

    @RequestMapping("/hello")
    @SentinelResource("test_sentinel")
    public String hello() {
        return "hello world";
    }
}

使用起来非常简单。首先我们可以选择给接口加上 @SentinelResource 注解(也可以不加,如果不加 Sentinel 客户端会使用请求路径作为资源名,详细原理在后面章节讲解),然后到流控控制台给该资源配置流控规则即可。

二、限流规则的加载

限流规则的生效,是从限流规则的加载开始的。聚焦到客户端的 RuleLoader 类,可以看到它支持了多种规则的加载:

  • 流控规则;
  • 集群限流规则;
  • 熔断规则;
  • ......

RuleLoader 核心逻辑

RuleLoader 类的核心作用是将这些规则加载到缓存中,方便后续使用:

public class RuleLoader {

    /**
     * 加载所有 Sentinel 规则到内存缓存
     *
     * @param sentinelRules 包含各种规则的配置对象
     */
    public static void loadRule(SentinelRules sentinelRules) {
        if (sentinelRules == null) {
            return;
        }

        // 加载流控规则
        FlowRuleManager.loadRules(sentinelRules.getFlowRules());
        // 加载集群流控规则
        RuleManager.loadClusterFlowRule(sentinelRules.getFlowRules());

        // 加载参数流控规则
        ParamFlowRuleManager.loadRules(sentinelRules.getParamFlowRules());
        // 加载参数集群流控规则
        RuleManager.loadClusterParamFlowRule(sentinelRules.getParamFlowRules());

        // 加载熔断规则
        DegradeRuleManager.loadRules(sentinelRules.getDegradeRules());

        // 加载参数熔断规则
        ParamDegradeRuleManager.loadRules(sentinelRules.getParamDegradeRules());

        // 加载系统限流规则
        SystemRuleManager.loadRules(sentinelRules.getSystemRules());
    }
}

流控规则加载详情

以流控规则的加载为例深入FlowRuleManager.loadRules 方法可以看到其完整的加载逻辑:

public static void loadRules(List<FlowRule> rules) {
    // 通过动态配置属性更新规则值
    currentProperty.updateValue(rules);
}

updateValue 方法负责通知所有监听器配置变更:

public boolean updateValue(T newValue) {
    // 如果新旧值相同,无需更新
    if (isEqual(value, newValue)) {
        return false;
    }
    RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);

    // 更新配置值
    value = newValue;
    // 通知所有监听器配置已更新
    for (PropertyListener<T> listener : listeners) {
        listener.configUpdate(newValue);
    }
    return true;
}

FlowPropertyListener 是流控规则变更的具体监听器实现:

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    @Override
    public void configUpdate(List<FlowRule> value) {
        // 构建流控规则映射表(按资源名分组)
        Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
        if (rules != null) {
            // 清空旧规则
            flowRules.clear();
            // 加载新规则
            flowRules.putAll(rules);
        }
        RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
    }
}

三、SentinelServletFilter 过滤器

在 Sentinel 中,所有的资源都对应一个资源名称和一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建。Entry 是限流的入口类,通过 @SentinelResource 注解的限流本质上也是通过 AOP 的方式进行了对 Entry 类的调用。

Entry 的编程范式

Entry 类的标准使用方式如下:

// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串
try (Entry entry = SphU.entry("resourceName")) {
    // 被保护的业务逻辑
    // do something here...
} catch (BlockException ex) {
    // 资源访问阻止,被限流或被降级
    // 在此处进行相应的处理操作
}

Servlet Filter 拦截逻辑

对于一个 HTTP 资源,在没有显式标注 @SentinelResource 注解的情况下,会有一个 Servlet Filter 类 SentinelServletFilter 统一进行拦截:

public class SentinelServletFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest) request;
        Entry urlEntry = null;

        try {
            // 获取并清理请求路径
            String target = FilterUtil.filterTarget(sRequest);

            // 统一 URL 清理逻辑
            // 对于 RESTful API,必须对 URL 进行清理(例如将 /foo/1 和 /foo/2 统一为 /foo/:id),
            // 否则上下文和资源的数量会超过阈值
            SentinelUrlCleaner urlCleaner = SentinelUrlCleaner.SENTINEL_URL_CLEANER;
            if (urlCleaner != null) {
                target = urlCleaner.clean(sRequest, target);
            }

            // 如果请求路径不为空且非安全扫描,则进入限流逻辑
            if (!StringUtil.isEmpty(target) && !isSecScan) {
                // 解析来源标识(用于来源限流)
                String origin = parseOrigin(sRequest);
                // 确定上下文名称
                String contextName = webContextUnify
                    ? WebServletConfig.WEB_SERVLET_CONTEXT_NAME
                    : target;

                // 使用 WEB_SERVLET_CONTEXT_NAME 作为当前 Context 的名字
                ContextUtil.enter(contextName, origin);

                // 根据配置决定是否包含 HTTP 方法
                if (httpMethodSpecify) {
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
                    // 实际进入到限流统计判断逻辑,资源名是 "方法:路径"
                    urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                } else {
                    // 实际进入到限流统计判断逻辑,资源名是请求路径
                    urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                }
            }

            // 继续执行后续过滤器
            chain.doFilter(request, response);

        } catch (BlockException e) {
            // 处理被限流的情况
            HttpServletResponse sResponse = (HttpServletResponse) response;
            // 返回限流页面或重定向到其他 URL
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);

        } catch (IOException | ServletException | RuntimeException e2) {
            // 记录异常信息用于统计
            Tracer.traceEntry(e2, urlEntry);
            throw e2;

        } finally {
            // 释放 Entry 资源
            if (urlEntry != null) {
                urlEntry.exit();
            }
            // 退出当前上下文
            ContextUtil.exit();
        }
    }
}

四、SentinelResourceAspect 切面

如果在接口上标注了 @SentinelResource 注解,还会有另外的逻辑处理。Sentinel 定义了一个单独的 AOP 切面 SentinelResourceAspect 专门用于处理注解限流。

SentinelResource 注解定义

先来看看 @SentinelResource 注解的完整定义:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {

    /**
     * Sentinel 资源的名称(即资源标识)
     * 必填项,不能为空
     */
    String value() default "";

    /**
     * 资源的入口类型(入站 IN 或出站 OUT)
     * 默认为出站(OUT)
     */
    EntryType entryType() default EntryType.OUT;

    /**
     * 资源的分类(类型)
     * 自 1.7.0 版本起支持
     */
    int resourceType() default 0;

    /**
     * 限流或熔断时调用的 block 异常处理方法的名称
     * 默认为空(即不指定)
     */
    String blockHandler() default "";

    /**
     * blockHandler 所在的类
     * 如果与原方法不在同一个类,需要指定此参数
     */
    Class<?>[] blockHandlerClass() default {};

    /**
     * 降级(fallback)方法的名称
     * 默认为空(即不指定)
     */
    String fallback() default "";

    /**
     * 用作通用的默认降级方法
     * 该方法不能接收任何参数,且返回类型需与原方法兼容
     */
    String defaultFallback() default "";

    /**
     * fallback 所在的类
     * 如果与原方法不在同一个类,需要指定此参数
     */
    Class<?>[] fallbackClass() default {};

    /**
     * 需要被追踪并触发 fallback 的异常类型列表
     * 默认为 Throwable(即所有异常都会触发 fallback)
     */
    Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};

    /**
     * 指定需要忽略的异常类型(即这些异常不会触发 fallback)
     * 注意:exceptionsToTrace 和 exceptionsToIgnore 不应同时使用;
     * 若同时存在,exceptionsToIgnore 优先级更高
     */
    Class<? extends Throwable>[] exceptionsToIgnore() default {};
}

实际使用示例

下面是一个完整的使用示例,展示了 @SentinelResource 注解的各种配置方式:

@RestController
public class SentinelController {

    @Autowired
    private ISentinelService service;

    @GetMapping(value = "/hello/{s}")
    public String apiHello(@PathVariable long s) {
        return service.hello(s);
    }
}

public interface ISentinelService {
    String hello(long s);
}

@Service
@Slf4j
public class SentinelServiceImpl implements ISentinelService {

    /**
     * Sentinel 提供了 @SentinelResource 注解用于定义资源
     *
     * @param s 输入参数
     * @return 返回结果
     */
    @Override
    // value:资源名称,必需项(不能为空)
    // blockHandler:对应处理 BlockException 的函数名称
    // fallback:用于在抛出异常的时候提供 fallback 处理逻辑
    @SentinelResource(value = "hello", blockHandler = "exceptionHandler", fallback = "helloFallback")
    public String hello(long s) {
        log.error("hello:{}", s);
        return String.format("Hello at %d", s);
    }

    /**
     * Fallback 函数
     * 函数签名与原函数一致,或加一个 Throwable 类型的参数
     */
    public String helloFallback(long s) {
        log.error("helloFallback:{}", s);
        return String.format("Halooooo %d", s);
    }

    /**
     * Block 异常处理函数
     * 参数最后多一个 BlockException,其余与原函数一致
     */
    public String exceptionHandler(long s, BlockException ex) {
        // Do some log here.
        log.error("exceptionHandler:{}", s);
        ex.printStackTrace();
        return "Oops, error occurred at " + s;
    }
}

SentinelResourceAspect 核心逻辑

@SentinelResource 注解由 SentinelResourceAspect 切面处理,核心逻辑如下:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // 获取目标方法
        Method originMethod = resolveMethod(pjp);

        // 获取注解信息
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }

        // 获取资源配置信息
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();

        Entry entry = null;
        try {
            // 创建限流入口
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            // 执行原方法
            Object result = pjp.proceed();
            return result;

        } catch (BlockException ex) {
            // 处理被限流异常
            return handleBlockException(pjp, annotation, ex);

        } catch (Throwable ex) {
            // 处理业务异常
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // 优先检查忽略列表
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            // 检查异常是否在追踪列表中
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                // 执行 fallback 逻辑
                return handleFallback(pjp, annotation, ex);
            }

            // 没有 fallback 函数可以处理该异常,直接抛出
            throw ex;

        } finally {
            // 释放 Entry 资源
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }

    /**
     * 处理 BlockException
     *
     * blockHandler / blockHandlerClass 说明:
     * - blockHandler:对应处理 BlockException 的函数名称,可选项
     * - blockHandler 函数签名:与原方法相匹配并且最后加一个额外的参数,类型为 BlockException
     * - blockHandler 函数默认需要和原方法在同一个类中
     * - 若希望使用其他类的函数,则可以指定 blockHandlerClass 为对应的类的 Class 对象
     * - 注意:blockHandlerClass 中对应的函数必须为 static 函数,否则无法解析
     */
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
            throws Throwable {

        // 执行 blockHandler 方法(如果配置了的话)
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
                annotation.blockHandlerClass());

        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            // 构造参数:原方法参数 + BlockException
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;

            try {
                // 根据 static 方法与否进行不同的调用
                if (isStatic(blockHandlerMethod)) {
                    return blockHandlerMethod.invoke(null, args);
                }
                return blockHandlerMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // 抛出实际的异常
                throw e.getTargetException();
            }
        }

        // 如果没有 blockHandler,则尝试执行 fallback
        return handleFallback(pjp, annotation, ex);
    }

    /**
     * 处理 Fallback 逻辑
     *
     * fallback / fallbackClass 说明:
     * - fallback:fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑
     * - fallback 函数可以针对所有类型的异常(除了 exceptionsToIgnore 里面排除掉的异常类型)进行处理
     *
     * fallback 函数签名和位置要求:
     * - 返回值类型必须与原函数返回值类型一致
     * - 方法参数列表需要和原函数一致,或者可以额外多一个 Throwable 类型的参数用于接收对应的异常
     * - fallback 函数默认需要和原方法在同一个类中
     * - 若希望使用其他类的函数,则可以指定 fallbackClass 为对应的类的 Class 对象
     * - 注意:fallbackClass 中对应的函数必须为 static 函数,否则无法解析
     */
    protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
                                    Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        Object[] originArgs = pjp.getArgs();

        // 执行 fallback 函数(如果配置了的话)
        Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);

        if (fallbackMethod != null) {
            // 构造参数:根据 fallback 方法的参数数量决定是否添加异常参数
            int paramCount = fallbackMethod.getParameterTypes().length;
            Object[] args;
            if (paramCount == originArgs.length) {
                args = originArgs;
            } else {
                args = Arrays.copyOf(originArgs, originArgs.length + 1);
                args[args.length - 1] = ex;
            }

            try {
                // 根据 static 方法与否进行不同的调用
                if (isStatic(fallbackMethod)) {
                    return fallbackMethod.invoke(null, args);
                }
                return fallbackMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // 抛出实际的异常
                throw e.getTargetException();
            }
        }

        // 如果没有 fallback,尝试使用 defaultFallback
        return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
    }
}

五、流控处理核心逻辑

从入口函数开始,我们深入到流控处理的核心逻辑。

入口函数调用链

public class SphU {

    /**
     * 创建限流入口
     *
     * @param name 资源名称
     * @param resourceType 资源类型
     * @param trafficType 流量类型(IN 或 OUT)
     * @param args 参数数组
     * @return Entry 对象
     * @throws BlockException 如果被限流则抛出此异常
     */
    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
            throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }

    public static Entry entry(String name, EntryType trafficType, int batchCount) throws BlockException {
        return Env.sph.entry(name, trafficType, batchCount, OBJECTS0);
    }
}
public class CtSph implements Sph {

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }

    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }

    /**
     * 带优先级的入口方法,这是限流的核心逻辑
     */
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
        Context context = ContextUtil.getContext();

        // 如果上下文数量超过阈值,则不进行规则检查
        if (context instanceof NullContext) {
            // NullContext 表示上下文数量超过了阈值,这里只初始化 Entry,不进行规则检查
            return new CtEntry(resourceWrapper, null, context);
        }

        // 如果没有上下文,使用默认上下文
        if (context == null) {
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // 如果全局开关关闭,则不进行规则检查
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 获取或创建 ProcessorSlotChain(责任链)
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * 如果资源(slot chain)数量超过 {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * 则不进行规则检查
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 创建 Entry 对象
        Entry e = new CtEntry(resourceWrapper, chain, context);

        try {
            // 执行责任链进行规则检查
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            // 如果被限流,释放 Entry 并抛出异常
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // 这不应该发生,除非 Sentinel 内部存在错误
            log.warn("Sentinel unexpected exception,{}", e1.getMessage());
        }
        return e;
    }
}

ProcessorSlotChain 功能插槽链

lookProcessChain 方法实际创建了 ProcessorSlotChain 功能插槽链。ProcessorSlotChain 采用责任链模式,将不同的功能(限流、降级、系统保护)组合在一起。

SlotChain 的获取与创建

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 先从缓存中获取
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);

    if (chain == null) {
        // 双重检查锁,保证线程安全
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry 大小限制
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                // 创建新的 SlotChain
                chain = SlotChainProvider.newSlotChain();

                // 使用不可变模式更新缓存
                Map<ResourceWrapper, ProcessorSlotChain> newMap =
                    new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

SlotChain 的构建

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // 通过 SPI 加载所有 ProcessorSlot 并排序
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);

        for (ProcessorSlot slot : sortedSlotList) {
            // 只处理继承自 AbstractLinkedProcessorSlot 的 Slot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() +
                    ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            // 将 Slot 添加到责任链尾部
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

SlotChain 的功能划分

Slot Chain 可以分为两部分:

  • 统计数据构建部分(statistic):负责收集各种指标数据;
  • 判断部分(rule checking):根据规则判断是否限流。

官方架构图很好地解释了各个 Slot 的作用及其负责的部分。目前 ProcessorSlotChain 的设计是一个资源对应一个,构建好后缓存起来,方便下次直接取用。

各 Slot 的执行顺序

以下是 Sentinel 中各个 Slot 的默认执行顺序:

NodeSelectorSlot
    ↓
ClusterBuilderSlot
    ↓
StatisticSlot
    ↓
ParamFlowSlot
    ↓
SystemSlot
    ↓
AuthoritySlot
    ↓
FlowSlot
    ↓
DegradeSlot

NodeSelectorSlot - 上下文节点选择

这个功能插槽主要为资源下不同的上下文创建对应的 DefaultNode(实际用于统计指标信息)。解释一下Sentinel中的Node是什么,简单来说就是每个资源统计指标存放的容器,只不过内部由于不同的统计口径(秒级、分钟及)而分别有不同的统计窗口。Node在Sentinel不是单一的结构,而是总体上形成父子关系的树形结构。

不同的调用会有不同的 context 名称,如在当前 MVC 场景下,上下文为 sentinel_web_servlet_context。

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * 同一个资源在不同上下文中的 DefaultNode 映射
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 从映射表中获取当前上下文对应的节点
        DefaultNode node = map.get(context.getName());

        if (node == null) {
            // 双重检查锁,保证线程安全
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    // 创建新的 DefaultNode
                    node = new DefaultNode(resourceWrapper, null);

                    // 使用写时复制更新缓存
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;

                    // 构建调用树
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }

        // 设置当前上下文的当前节点
        context.setCurNode(node);
        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

ClusterBuilderSlot - 集群节点构建

这个功能槽主要用于创建 ClusterNode。ClusterNode 和 DefaultNode 的区别是:

DefaultNode 是特定于上下文的(context-specific);

ClusterNode 是不区分上下文的(context-independent),用于统计该资源在所有上下文中的整体数据。

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * 全局 ClusterNode 映射表
     */
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 创建 ClusterNode(如果不存在)
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // 创建集群节点
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());

                    // 更新全局映射表
                    HashMap<ResourceWrapper, ClusterNode> newMap =
                        new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }

        // 将 ClusterNode 设置到 DefaultNode 中
        node.setClusterNode(clusterNode);

        // 如果有来源标识,则创建 origin node
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

StatisticSlot - 统计插槽

StatisticSlot 是 Sentinel 最重要的类之一,用于根据规则判断结果进行相应的统计操作。

统计逻辑说明

entry 的时候:

依次执行后续的判断 Slot;

每个 Slot 触发流控会抛出异常(BlockException 的子类);

若有 BlockException 抛出,则记录 block 数据;

若无异常抛出则算作可通过(pass),记录 pass 数据。

exit 的时候:

若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数 -1。

记录数据的维度:

线程数 +1;

记录当前 DefaultNode 数据;

记录对应的 originNode 数据(若存在 origin);

累计 IN 统计数据(若流量类型为 IN)。

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 此位置会调用 SlotChain 中后续的所有 Slot,完成所有规则检测
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 请求通过,增加线程数和通过数
            // 代码运行到这个位置,就证明之前的所有 Slot 检测都通过了
            // 此时就可以统计请求的相应数据了

            // 增加线程数(+1)
            node.increaseThreadNum();
            // 增加通过请求的数量(这里涉及到滑动窗口算法)
            node.addPassRequest(count);

            // 省略其他统计逻辑...

        } catch (PriorityWaitException ex) {
            // 如果是优先级等待异常,记录优先级等待数
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // 记录入站统计数据
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            throw ex;

        } catch (BlockException e) {
            // 如果被限流,记录被限流数
            // 省略 block 统计逻辑...
            throw e;

        } catch (Throwable ex) {
            // 如果发生业务异常,记录异常数
            // 省略异常统计逻辑...
            throw ex;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // 若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1
        // 记录数据的维度:线程数+1、记录当前 DefaultNode 数据、记录对应的 originNode 数据(若存在 origin)
        // 、累计 IN 统计数据(若流量类型为 IN)
        // 省略 exit 统计逻辑...
    }
}

StatisticNode 数据结构

到这里,StatisticSlot 的作用已经比较清晰了。接下来我们需要分析它的统计数据结构。fireEntry 调用向下的节点和之前的方式一样,剩下的节点主要包括:

  • ParamFlowSlot;
  • SystemSlot;
  • AuthoritySlot;
  • FlowSlot;
  • DegradeSlot;

其中比较常见的是流控和熔断:FlowSlot、DegradeSlot,所以下面我们着重分析 FlowSlot。

六、FlowSlot - 流控插槽

这个 Slot 主要根据预设的资源的统计信息,按照固定的次序依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止。

FlowSlot 核心逻辑

@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 执行流控检查
        checkFlow(resourceWrapper, context, node, count, prioritized);

        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    // 省略其他方法...
}

checkFlow 方法详解

/**
 * 执行流控检查
 *
 * @param ruleProvider 规则提供者函数
 * @param resource 资源包装器
 * @param context 上下文
 * @param node 节点
 * @param count 请求数量
 * @param prioritized 是否优先
 * @throws BlockException 如果被限流则抛出异常
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    // 判断规则和资源不能为空
    if (ruleProvider == null || resource == null) {
        return;
    }

    // 获取指定资源的所有流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());

    // 逐个应用流控规则。若无法通过则抛出异常,后续规则不再应用
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                // FlowException 继承 BlockException
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

通过这里我们就可以得知,流控规则是通过 FlowRule 来完成的,数据来源是我们使用的流控控制台,也可以通过代码进行设置。

FlowRule 流控规则

每条流控规则主要由三个要素构成:

  • grade(阈值类型):按 QPS(每秒请求数)还是线程数进行限流;
  • strategy(调用关系策略):基于调用关系的流控策略;
  • controlBehavior(流控效果):当 QPS 超过阈值时的流量整形行为。
public class FlowRule extends AbstractRule {

    public FlowRule() {
        super();
        // 来源默认 Default
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    public FlowRule(String resourceName) {
        super();
        // 资源名称
        setResource(resourceName);
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    /**
     * 流控的阈值类型
     * 0: 线程数
     * 1: QPS
     */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     * 流控阈值
     */
    private double count;

    /**
     * 基于调用链的流控策略
     * STRATEGY_DIRECT: 直接流控(按来源)
     * STRATEGY_RELATE: 关联流控(关联资源)
     * STRATEGY_CHAIN: 链路流控(按入口资源)
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * 关联流控模式下的关联资源
     */
    private String refResource;

    /**
     * 流控效果(流量整形行为)
     * 0: 默认(直接拒绝)
     * 1: 预热(Warm Up)
     * 2: 排队等待(Rate Limiter)
     * 3: 预热 + 排队等待(目前控制台没有)
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    /**
     * 预热时长(秒)
     */
    private int warmUpPeriodSec = 10;

    /**
     * 排队等待的最大超时时间(毫秒)
     */
    private int maxQueueingTimeMs = 500;

    /**
     * 是否为集群模式
     */
    private boolean clusterMode;

    /**
     * 集群模式配置
     */
    private ClusterFlowConfig clusterConfig;

    /**
     * 流量整形控制器
     */
    private TrafficShapingController controller;

    // 省略 getter/setter 方法...
}

七、滑动窗口算法

不管流控规则采用何种流控算法,在底层都需要有支持指标统计的数据结构作为支撑。在 Sentinel 中,用于支撑基于 QPS 等限流的数据结构是 StatisticNode。

StatisticNode 数据结构

public class StatisticNode implements Node {

    /**
     * 保存最近 1 秒内的统计数据
     * 每个桶(bucket)500ms,共 2 个桶
     */
    private transient volatile Metric rollingCounterInSecond =
        new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

    /**
     * 保存最近 60 秒的统计数据
     * windowLengthInMs 被特意设置为 1000 毫秒,即每个桶代表 1 秒
     * 共 60 个桶,这样可以获得每秒精确的统计信息
     */
    private transient Metric rollingCounterInMinute =
        new ArrayMetric(60, 60 * 1000, false);

    // 省略其他字段和方法...
}

ArrayMetric 核心实现

ArrayMetric 是 Sentinel 中数据采集的核心,内部使用了 BucketLeapArray,即滑动窗口的思想进行数据的采集。

public class ArrayMetric implements Metric {

    /**
     * 滑动窗口数组
     */
    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            // 可抢占的滑动窗口,支持借用未来窗口的配额
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            // 普通滑动窗口
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
}

这里有两种实现:

  • BucketLeapArray:普通滑动窗口,每个时间桶仅记录固定时间窗口内的指标数据;
  • OccupiableBucketLeapArray:扩展实现,支持"抢占"未来时间窗口的令牌或容量,在流量突发时允许借用后续窗口的配额,实现更平滑的限流效果。

BucketLeapArray - 滑动窗口实现

LeapArray 核心属性

LeapArray 是滑动窗口的基础类,其核心属性如下:

/**
 * 窗口大小(长度),单位:毫秒
 * 例如:1000ms
 */
private int windowLengthInMs;

/**
 * 样本数(桶的数量)
 * 例如:5(表示 5 个桶,每个 1000ms,总共 5 秒)
 */
private int sampleCount;

/**
 * 采集周期(总时间窗口长度),单位:毫秒
 * 例如:5 * 1000ms(5 秒)
 */
private int intervalInMs;

/**
 * 窗口数组,array 长度就是样本数 sampleCount
 */
protected final AtomicReferenceArray<WindowWrap<T>> array;

/**
 * 更新窗口数据的锁,保证数据的正确性
 */
private final ReentrantLock updateLock;

WindowWrap 窗口包装器

每个窗口包装器包含三个属性:

 public class WindowWrap<T> {

    /**
     * 窗口大小(长度),单位:毫秒
     * 与 LeapArray 中的 windowLengthInMs 一致
     */
    private final long windowLengthInMs;

    /**
     * 窗口开始时间戳
     * 它的值是 windowLengthInMs 的整数倍
     */
    private long windowStart;

    /**
     * 窗口数据(泛型 T)
     * Sentinel 目前只有 MetricBucket 类型,存储统计数据
     */
    private T value;
}

MetricBucket 指标桶

public class MetricBucket {

    /**
     * 计数器数组
     * 长度是需要统计的事件种类数,目前是 6 个
     * LongAdder 是线程安全的计数器,性能优于 AtomicLong
     */
    private final LongAdder[] counters;
    
    // 省略其他字段和方法...
}

滑动窗口工作原理

LeapArray 统计数据的基本思路:

创建一个长度为 n 的数组,数组元素就是窗口;

每个窗口包装了 1 个指标桶,桶中存放了该窗口时间范围内对应的请求统计数据;

可以想象成一个环形数组在时间轴上向右滚动;

请求到达时,会命中数组中的一个窗口,该请求的数据就会存到命中的这个窗口包含的指标桶中;

当数组转满一圈时,会回到数组的开头;

此时下标为 0 的元素需要重复使用,它里面的窗口数据过期了,需要重置,然后再使用。

获取当前窗口

LeapArray 获取当前时间窗口的方法:

 /**
 * 获取当前时间戳对应的窗口
 *
 * @return 当前时间的窗口
 */
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

/**
 * 获取指定时间戳对应的窗口(核心方法)
 *
 * @param timeMillis 时间戳(毫秒)
 * @return 对应的窗口
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // 计算数组下标
    int idx = calculateTimeIdx(timeMillis);

    // 计算当前请求对应的窗口开始时间
    long windowStart = calculateWindowStart(timeMillis);

    // 无限循环,确保能够获取到窗口
    while (true) {
        // 取窗口
        WindowWrap<T> old = array.get(idx);

        if (old == null) {
            // 第一次使用,创建新窗口
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

            // CAS 操作,确保只初始化一次
            if (array.compareAndSet(idx, null, window)) {
                // 成功更新,返回创建的窗口
                return window;
            } else {
                // CAS 失败,让出时间片,等待其他线程完成初始化
                Thread.yield();
            }

        } else if (windowStart == old.windowStart()) {
            // 命中:取出的窗口的开始时间和本次请求计算出的窗口开始时间一致
            return old;

        } else if (windowStart > old.windowStart()) {
            // 窗口过期:本次请求计算出的窗口开始时间大于取出的窗口
            // 说明取出的窗口过期了,需要重置
            if (updateLock.tryLock()) {
                try {
                    // 成功获取锁,更新窗口开始时间,计数器重置
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // 获取锁失败,让出时间片,等待其他线程更新
                Thread.yield();
            }

        } else if (windowStart < old.windowStart()) {
            // 异常情况:机器时钟回拨等
            // 正常情况不会进入该分支
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

数据存储

在获取到窗口之后,就可以存储数据了。ArrayMetric 实现了 Metric 中存取数据的接口方法。

示例:存储 RT(响应时间)

/**
 * 添加响应时间数据
 *
 * @param rt 响应时间(毫秒)
 */
public void addRT(long rt) {
    // 获取当前时间窗口,data 为 BucketLeapArray
    WindowWrap<MetricBucket> wrap = data.currentWindow();

    // 计数
    wrap.value().addRT(rt);
}

/**
 * MetricBucket 的 addRT 方法
 *
 * @param rt 响应时间
 */
public void addRT(long rt) {
    // 记录 RT 时间对 rt 值
    add(MetricEvent.RT, rt);

    // 记录最小响应时间(非线程安全,但没关系)
    if (rt < minRt) {
        minRt = rt;
    }
}

/**
 * 通用的计数方法
 *
 * @param event 事件类型
 * @param n 增加的数量
 * @return 当前桶
 */
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}

数据读取

示例:读取 RT(响应时间)

/**
 * 获取总响应时间
 *
 * @return 总响应时间
 */
public long rt() {
    // 触发当前窗口更新(处理过期窗口)
    data.currentWindow();

    long rt = 0;
    // 取出所有的 bucket
    List<MetricBucket> list = data.values();

    for (MetricBucket window : list) {
        rt += window.rt(); // 求和
    }
    return rt;
}

/**
 * 获取所有有效的窗口
 *
 * @return 有效窗口列表
 */
public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}

/**
 * 获取指定时间之前的所有有效窗口
 *
 * @param timeMillis 时间戳
 * @return 有效窗口列表
 */
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>(); // 正常情况不会到这里
    }

    int size = array.length();
    List<T> result = new ArrayList<T>(size);

    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);

        // 过滤掉没有初始化过的窗口和过期的窗口
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }

        result.add(windowWrap.value());
    }
    return result;
}

/**
 * 判断窗口是否过期
 *
 * @param time 给定时间(通常是当前时间)
 * @param windowWrap 窗口包装器
 * @return 如果过期返回 true
 */
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    // 给定时间与窗口开始时间超过了一个采集周期
    return time - windowWrap.windowStart() > intervalInMs;
}

OccupiableBucketLeapArray - 可抢占窗口

为什么需要 OccupiableBucketLeapArray?

假设一个资源的访问 QPS 稳定是 10,请求是均匀分布的:

在时间 0.0-1.0 秒区间中,通过了 10 个请求;

在 1.1 秒的时候,观察到的 QPS 可能只有 5,因为此时第一个时间窗口被重置了,只有第二个时间窗口有值;

当在秒级统计的情形下,用 BucketLeapArray 会有 0~50%的数据误这时就要用 OccupiableBucketLeapArray 来解决这个问题。

OccupiableBucketLeapArray 实现

从上面我们可以看到在秒级统计 rollingCounterInSecond 中,初始化实例时有两种构造参数:

public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    /**
     * 借用未来窗口的数组
     */
    private final FutureBucketLeapArray borrowArray;

    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        // 创建借用窗口数组
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

    /**
     * 创建新的空桶
     * 会从 borrowArray 中借用数据
     */
    @Override
    public MetricBucket newEmptyBucket(long time) {
        MetricBucket newBucket = new MetricBucket();

        // 获取借用窗口的数据
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            // 将借用数据复制到新桶中
            newBucket.reset(borrowBucket);
        }

        return newBucket;
    }

    /**
     * 重置窗口
     * 会从 borrowArray 中借用 pass 数据
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // 更新开始时间并重置值
        w.resetTo(time);

        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            // 重置桶值并添加借用的 pass 数据
            w.value().reset();
            w.value().addPass((int) borrowBucket.pass());
        } else {
            w.value().reset();
        }

        return w;
    }

    /**
     * 获取当前等待中的请求数量
     */
    @Override
    public long currentWaiting() {
        borrowArray.currentWindow();
        long currentWaiting = 0;
        List<MetricBucket> list = borrowArray.values();

        for (MetricBucket window : list) {
            currentWaiting += window.pass();
        }
        return currentWaiting;
    }

    /**
     * 添加等待中的请求数量
     *
     * @param time 时间
     * @param acquireCount 获取数量
     */
    @Override
    public void addWaiting(long time, int acquireCount) {
        WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
        window.value().add(MetricEvent.PASS, acquireCount);
    }
}

八、总结

至此,Sentinel 的基本情况都已经分析完成。以上内容主要讲解了 Sentinel 的核心处理流程,包括:

核心流程总结

  1. 规则加载:
  • 通过 RuleLoader 将各种规则(流控、熔断、系统限流等)加载到内存缓存中。
  1. 请求拦截:
  • 通过 SentinelServletFilter 过滤器拦截 HTTP 请求;
  • 通过SentinelResourceAspect切面处理 @SentinelResource 注解。
  1. 责任链处理:
  • 使用 ProcessorSlotChain 责任链模式组合多个功能插槽;
  • 每个插槽负责特定的功能(统计、流控、熔断等)。
  1. 流控判断:
  • FlowSlot 根据流控规则判断是否限流;
  • 通过滑动窗口算法统计 QPS、线程数等指标。
  1. 异常处理:
  • 被限流时抛出 BlockException;
  • 通过 blockHandler 或 fallback 处理异常。

核心技术点

  1. 责任链模式:
  • 通过 ProcessorSlotChain 将不同的限流功能组合在一起。
  1. 滑动窗口算法:
  • LeapArray 实现环形滑动窗口;
  • BucketLeapArray 普通滑动窗口;
  • OccupiableBucketLeapArray 可抢占窗口,支持借用未来配额。
  1. 数据结构:
  • DefaultNode:特定于上下文的统计节点;
  • ClusterNode:不区分上下文的集群统计节点;
  • StatisticNode:核心统计节点,包含秒级和分钟级统计。
  1. 限流算法:
  • QPS 限流:通过滑动窗口统计 QPS;
  • 线程数限流:通过原子计数器统计线程数;
  • 流控效果:快速失败、预热、排队等待等;

Sentinel 通过精心设计的架构,实现了高效、灵活、可扩展的流量控制能力,为微服务系统提供了强大的保护机制。

往期回顾

1.社区推荐重排技术:双阶段框架的实践与演进|得物技术

2.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

3.服务拆分之旅:测试过程全揭秘|得物技术

4.大模型网关:大模型时代的智能交通枢纽|得物技术

5.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

文 /万钧

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

社区推荐重排技术:双阶段框架的实践与演进|得物技术

作者 得物技术
2026年2月12日 13:48

一、背景

推荐系统典型pipeline

在推荐系统多阶段Pipeline(召回→粗排→精排→重排)中,重排作为最终决策环节,承担着将精排输出的有限候选集(通常为Top 100–500个Item)转化为最优序列的关键职责。数学定义为在给定候选集C={x1,x2,,xn}C = \lbrace x_1,x_2,……,x_n \rbrace与目标列表长度LL,重排的目标是寻找一个排列πP(C,L)\pi^* \in P(C,L),使得全局收益函数最大化。

在推荐系统、搜索排序等AI领域,Pointwise 建模是精排阶段的核心方法,即对每个 Item 独立打分后排序,pointwise 建模范式面临挑战:

  • 多样性约束:精排按 item 独立打分排序 → 高分 item 往往语义/类目高度同质(如5个相似短视频连续曝光)。
  • 位置偏差:用户注意力随位置显著衰减,且不同item对位置敏感度不同。
  • 上下文建模:用户决策是序列行为,而非独立事件。

二、重排架构演进:生成式模型实践

我们的重排系统采用G-E两阶段协同框架:

  • 生成阶段(Generation):高效生成若干高质量候选排列。
  • 评估阶段(Evaluation):对候选排列进行精细化打分,选出全局最优结果。

不考虑算力和耗时的情况下,通过穷举所有排列P(C,L)P(C,L)

生成阶段主要依赖启发式规则、随机扰动 + beamSearch算法生成候选list,双阶段范式存在显著的痛点:

  • 质量-延迟-多样性的“不可能三角”:在实践中,增加生成候选list数一般可以提升最终list的质量,但边际收益递减;优化过程中,我们通过增加多目标、多样性等策略都取得了消费指标的提升,但在候选list达到百量级时,单纯增加候选集对指标的提升,同时还有:
  1. 增加beam width,系统耗时增加,DCG@K提升逐渐减少。

  2. 增加通道数,通道间重叠度逐渐增加,去重list增加逐渐减少。

  • 阶段间目标不一致:
  1. 分布偏移:启发式生成Beam Search输出的Top排列中,20%被评估模型否定,生成阶段搜索效率浪费。

  2. 梯度断层:Beam Search含argmax操作,双阶段无法端到端优化;生成模型无法感知评估反馈,优化方向偏离全局最优。

生成模型优化

生成分为启发式方法和生成式模型方法, 一般认为生成式模型方法要好于启发式方法。生成式模型逐渐成为重排主流范式,主要分为两类:自回归生成模型、非自回归生成模型。

  • 自回归生成:按位置顺序逐个生成物品,第 t 位的预测依赖前 t-1 位已生成结果。
  1. 优点:

a. 序列依赖建模强,天然捕获物品间的顺序依赖。

b. 训练简单稳定,每步使用真实前序作为输入,收敛快。

c. 生成质量高,逐步细化决策,适合长序列精细优化。

  1. 缺点:

a. 推理延迟高,生成 L 个物品需 L 次前向传播,线上服务难以满足毫秒级要求。

b. 局部最优风险,早期错误决策无法回溯修正,影响整体序列质量。

  • 非自回归生成:一次性预测整个推荐序列的所有位置,各位置预测相互独立。
  1. 优点:

a. 推理速度极快:生成整个序列仅需1次前向传播。

2.缺点:

b.条件独立性假设过强:各位置并行预测,难以显式建模物品间复杂依赖关系。

非自回归模型

为了对齐双阶段一致性,同时考虑线上性能,我们推进了非自回归模型的上线。模型结构如下图:

模型包括Candidates Encoder和Position Encoder,Candidates Encoder是标准的Transformer结构, 用于获取item间的交互信息;Position Encoder额外增加了Cross Attention,期望Position序列同时关注Candidate序列。

  • 模型特征:用户信息、item特征、位置信息、上游精排打分特征。
  • 模型输出:一次性输出 n×L 的位置-物品得分矩阵(n 为候选 item 数,L 为目标列表长度),支持高效并行推理
p^ij=exp(xitj)i=1nexp(xitj)\hat{p}_{ij} = \frac{\exp(\mathbf{x}_i^\top \mathbf{t}_j)}{\sum_{i=1}^n \exp(\mathbf{x}_i^\top \mathbf{t}_j)}
  • 位置感知建模:引入可学习位置嵌入,显式建模“同一 item 在不同位置表现不同”的现象(如首屏效应、位置衰减)。
  • 训练目标:模型使用logloss,让正反馈label序列的生成概率最大, 同时负反馈label序列的生成概率最小:
Llog=i[pijyilog(pij^)+pij(1yi)log(1pij^)]\mathcal{L}_{\log} = -\sum_{i} \big[ p_{ij}y_i \log(\hat{p_{ij}}) + p_{ij}(1-y_i) \log(1-\hat{p_{ij}}) \big]

其中,pijp_{ij}表示位置i上是否展示物品j,yiy_{i}表示位置i上的label。

线上实验及收益:

  • 一期新增了非自回归生成通道,pvctr +0.6%,时长+0.55%。
  • 二期在所有通道排序因子中bagging非自回归模型,pvctr +1.0%,时长+1.13%。

自回归模型

由于条件独立性假设, 非自回归模型对上下文信息建模是不够的,近期我们重点推进了自回归模型的开发。

模型通过Transformer架构建模list整体收益,我们使用单向transformer模拟用户浏览行为的因果性,同时解决自回归生成的暴露偏差问题,保持训练和推理的一致性。结构如下:

  • 模型特征:用户信息、item特征、位置信息、上游精排打分特征。
  • 训练目标:模型使用有序回归loss,在评估多个回合中不同长度的子列表时,能够很好地体现出序列中的增量价值。是用于判断长度为j的子列表是否已经达到i次点击或转化的损失函数。
Li,j(θj)=k=1N([yk<i]log(1pi,j(xk))+[yki]log(pi,j(xk)))L_{i,j}(\theta_j) = -\sum_{k=1}^{N} \left([y_k < i]\log(1-p_{i,j}(x_k)) + [y_k \geq i]\log(p_{i,j}(x_k))\right)

线上模型推理效率优化及实验效果:

自回归生成模型推理延迟高,生成 L 个物品需 L 次前向传播,线上服务难以满足毫秒级要求。因此,我们在传统自回归生成模型的基础上增加MTP(multi token prediction)结构,突破生成式重排模型推理瓶颈。其核心思想是将传统自回归的单步预测扩展为单步多token联合预测,显著减少生成迭代次数。

自回归生成模型在社区推荐已完成了推全,实验中我们新增了自回归生成模型通道,但不是完全体,仅部分位置生成调用了模型:

  • 一期调用两次模型,每次预测4个位置,pvctr +0.69%,有效vv +0.58%。
  • 二期调用两次模型,每次预测5个位置,pvctr +0.54%,有效vv +0.40%。

三、推理性能优化:端到端生成的效率保障

工程架构

为解决CPU推理模型延迟高、制约业务效果的问题,我们对DScatter模型服务进行升级,引入高性能GPU推理能力,具体方案如下:

  • GPU推理框架集成与升级:
  1. 框架升级:将现有依赖的推理框架升级为支持GPU的高性能服务框架。

  2. 硬件资源引入:引入 NVIDIA L20 等专业推理显卡,为当前的listwise评估模型及自回归生成模型提供专用算力,实现模型推理的硬件加速。

  • DScatter模型服务独立部署与容量提升:
  1. 为解决模型部署效率低与资源竞争问题,将DScatter的模型打分逻辑从现有重排服务中完全解耦,构建并部署独立的 DScatter-Model-Server 集群,从根本上消除与重排服务在CPU、内存等关键资源上的竞争。

模型优化

  • 模型格式转换与加速:

导出为 ONNX 格式,使用 TensorRT 进行量化、层融合、动态张量显存等技术加速推理。

  • Item Embedding缓存:

预计算item静态网络,线上直接查询节省计算量。

  • 自回归生成模型核心优化,KV Cache 复用:

缓存已生成token的KV和attention值,仅计算增量token相关值,避免重复计算。

  • 其他LLM推理加速技术应用落地,例如GQA

四、未来规划:迈向端到端序列生成的下一代重排架构

当前“生成-评估”双阶段范式虽在工程落地性上取得平衡,但其本质仍是局部优化:生成阶段依赖启发式规则或浅层模型生成候选,评估阶段虽能识别优质序列,却无法反向指导生成过程,导致系统能力存在理论上限。为突破这一瓶颈,我们规划构建端到端序列生成(End-to-End Sequence Generation) 架构,将重排从“候选筛选”升级为“序列创造”,直接以全局业务目标(如用户停留时长、互动深度、内容生态健康度)为优化目标。

核心架构设计:

  • 统一生成器:以 Transformer 为基础架构,搭建自回归序列建模能力,采用分层混合生成策略:
  1. 粗粒度并行生成:首层预测序列骨架(如类目分布、内容密度)等。

  2. 细粒度自回归精调:在骨架约束下,自回归生成具体 item,确保局部最优。

  • 序列级Reward Modeling:
  1. 构建多目标 reward 函数:xtr、多样性。

  2. Engagement:基于用户滑动轨迹建模序列累积收益(如滑动深度加权CTR)。

  3. Diversity:跨类目/创作者/内容形式的分布熵。

4.Fairness:冷启内容、长尾创作者曝光保障。

训练范式升级:强化学习与对比学习融合

推进自回归生成模型的架构升级与训练体系重构,引入强化学习微调(PPO/DPO)与对比学习机制,提升序列整体效率。

  • 搭建近线系统,生成高质量list候选,提升系统能力上限:

1.基于 DCG 的列表质量打分:

a. 对每个曝光列表L,计算其 DCG@K作为质量分数:

DCG(L)=j=1Kgain(itemj)log2(j+1)\text{DCG}(L) = \sum_{j=1}^{K} \frac{\text{gain}(item_j)}{\log_2(j + 1)}

其中 gain(item)可定义为:

若点击:+1.0

若互动(点赞/收藏):+1.5

若观看 >5s:+0.8

否则:0

2.构造偏好对:

a.对同一用户在同一上下文下的两个列表 LwL_w(win)和 LlL_l(lose)。

b.若 DCG(Lw)>DCG(Ll)+δDCG(L_w) > DCG(L_l) + \delta(δ 为 margin,如 0.1),则构成一个有效偏好对。

  • 引入强化学习微调(PPO/DPO)与对比学习机制,提升序列整体效率:
  1. 模型结构:

a.使用当前自回归生成模型作为策略模型。

b.固定预训练模型作为参考策略 (即 DPO 中的“旧策略”)。

2.DPO损失:

LDPO(θ)=E(x,yw,yl)D[logσ(β(logπθ(ywx)πref(ywx)logπθ(ylx)πref(ylx)))]\mathcal{L}_{\text{DPO}}(\theta) = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}} \left[ \log \sigma \left( \beta \left( \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\text{ref}}(y_w \mid x)} - \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\text{ref}}(y_l \mid x)} \right) \right) \right]
  • 技术价值:
  1. 突破“质量-延迟-多样性”不可能三角:通过序列级优化,在同等延迟下实现质量与多样性双提升。

  2. 为AIGC与推荐融合铺路:端到端生成器可无缝接入AIGC内容,实现“内容生成-序列编排”一体化。

参考文献:

  1. Gloeckle F, Idrissi B Y, Rozière B, et al. Better & faster large language models via multi-token prediction[J]. arXiv preprint arXiv:2404.19737, 2024.
  2. Ren Y, Yang Q, Wu Y, et al. Non-autoregressive generative models for reranking recommendation[C]//Proceedings of the 30th ACM SIGKDD Conference on Knowledge Discovery and Data Mining. 2024: 5625-5634.
  3. Meng Y, Guo C, Cao Y, et al. A generative re-ranking model for list-level multi-objective optimization at taobao[C]//Proceedings of the 48th International ACM SIGIR Conference on Research and Development in Information Retrieval. 2025: 4213-4218.
  4. Zhao X, Xia L, Zhang L, et al. Deep reinforcement learning for page-wise recommendations[C]//Proceedings of the 12th ACM conference on recommender systems. 2018: 95-103.
  5. Feng Y, Hu B, Gong Y, et al. GRN: Generative Rerank Network for Context-wise Recommendation[J]. arXiv preprint arXiv:2104.00860, 2021.
  6. Pang L, Xu J, Ai Q, et al. Setrank: Learning a permutation-invariant ranking model for information retrieval[C]//Proceedings of the 43rd international ACM SIGIR conference on research and development in information retrieval. 2020: 499-508.

往期回顾

1.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

2.服务拆分之旅:测试过程全揭秘|得物技术

3.大模型网关:大模型时代的智能交通枢纽|得物技术

4.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

5.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

文 /张卓

关注得物技术,每周一、三更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

Flink ClickHouse Sink:生产级高可用写入方案|得物技术

作者 得物技术
2026年2月10日 11:11

一、背景与痛点

业务场景

在实时大数据处理场景中,Flink + ClickHouse 的组合被广泛应用于:

  • 日志处理: 海量应用日志实时写入分析库。
  • 监控分析: 业务指标、APM 数据的实时聚合。

这些场景的共同特点:

  • 数据量大:百万级 TPS,峰值可达千万级。
  • 写入延迟敏感: 需要秒级可见。
  • 数据准确性要求高:不允许数据丢失。
  • 多表写入: 不同数据根据分表策略写入不同的表。

开源 Flink ClickHouse Sink 的痛点

Flink 官方提供的 ClickHouse Sink(flink-connector-jdbc)在生产环境中存在以下严重问题:

痛点一:缺乏基于数据量的攒批机制

问题表现:

// Flink 官方 JDBC Sink 的实现
public class JdbcSink<Textends RichSinkFunction<T> {
    private final int batchSize;  // 固定批次大小
    @Override
    public void invoke(value, Context context) {
        bufferedValues.add(value);
        if (bufferedValues.size() >= batchSize) {
            // 只能基于记录数攒批,无法基于数据量
            flush();
        }
    }

带来的问题:

  1. 内存占用不可控: 100 条 1KB 的日志和 100 条 10MB 的日志占用内存差距 100 倍。
  2. OOM 风险高: 大日志记录(如堆栈转储)会迅速撑爆内存。
  3. 写入性能差: 无法根据记录大小动态调整批次,导致小记录批次过大浪费网络开销。

痛点二:无法支持动态表结构

问题表现:

// Flink 官方 Sink 只能写入固定表
public class JdbcSink {
    private final String sql;  // 固定的 INSERT SQL
    public JdbcSink(String jdbcUrl, String sql, ...) {
        this.sql = sql;  // 硬编码的表结构
    }
}

带来的问题:

  1. 多应用无法隔离: 所有应用的数据写入同一张表,通过特定分表策略区分。
  2. 扩展性差: 新增应用需要手动建表,无法动态路由。
  3. 性能瓶颈: 单表数据量过大(百亿级),查询和写入性能急剧下降。

痛点三:分布式表写入性能问题

问题表现:

// 大多数生产实现直接写入分布式表
INSERT INTO distributed_table_all VALUES (...)

ClickHouse 分布式表的工作原理:

带来的问题:

  1. 网络开销大: 数据需要经过分布式表层转发,延迟增加。
  2. 写入性能差: 分布式表增加了路由和转发逻辑,吞吐量降低。
  3. 热点问题: 所有数据先到分布式表节点,再转发,造成单点瓶颈。

生产级方案的核心改进

针对以上痛点,本方案提供了以下核心改进:

改进一:基于数据量的攒批机制

public class ClickHouseSinkCounter {
    private Long metaSize;  // 累计数据量(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize();  // 累加数据量
    }
}
// 触发条件
private boolean flushCondition(String application) {
    return checkMetaSize(application)  // metaSize >= 10000 字节
        || checkTime(application);     // 或超时 30 秒
}

优势:

  • 内存可控: 根据数据量而非记录数攒批。
  • 精确控制: 1KB 的记录攒 10000 条 = 10MB,1MB 的记录攒 10 条 = 10MB。
  • 避免OOM: 大日志记录不会撑爆内存。

改进二:动态表结构与分片策略

public abstract class ClickHouseShardStrategy<T> {
    public abstract String getTableName(T data);
}
//日志侧实现为应用级分表
public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 动态路由:order-service → tb_logs_order_service
        return String.format("tb_logs_%s", application);
    }
}

优势:

  • 应用隔离: 日志侧内置应用级分表,每个应用独立分表。
  • 动态路由: 根据 application 自动路由到目标表。
  • 扩展性强: 新增应用无需手动建表(配合 ClickHouse 自动建表)。

改进三:本地表写入 + 动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 直接写本地表,避免分布式表转发
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1. 动态获取集群节点列表
        List<String> healthyHosts = getHealthyHosts(exceptionHosts);
        // 2. 随机选择健康节点
        return dataSourceMap.get(healthyHosts.get(random.nextInt(size)));
    }
}

优势:

  • 性能提升: 直接写本地表,避免网络转发。
  • 高可用: 动态节点发现 + 故障节点剔除。
  • 负载均衡: 随机选择 + Shuffle 初始化。

技术方案概览

基于以上改进,本方案提供了以下核心能力:

  1. 本地表/分布式表写入: 性能优化与高可用平衡。
  2. 分片策略: 按应用维度路由与隔离。
  3. 攒批与内存控制: 双触发机制(数据量 + 超时)。
  4. 流量控制与限流: 有界队列 + 连接池。
  5. 健壮的重试机制: 递归重试 + 故障节点剔除。
  6. Checkpoint 语义保证: At-Least-Once 数据一致性。

二、核心架构设计

架构图

核心组件

核心流程

三、本地表 vs 分布式表写入

ClickHouse 表结构说明

ClickHouse 推荐直接写本地表,原因:

  1. 写入性能: 避免分布式表的网络分发。
  2. 数据一致性: 直接写入目标节点,减少中间环节故障点,比分布式表写入更安全,利于工程化。
  3. 负载均衡: 客户端路由实现负载分散。
-- 本地表(实际存储数据)
CREATE TABLE tb_logs_local ON CLUSTER 'default' (
    application String,
    environment String,
    message String,
    log_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(log_time)
ORDER BY (application, log_time);
-- 分布式表(逻辑视图,不存储数据)
CREATE TABLE tb_logs_all ON CLUSTER 'default' AS tb_logs_local
ENGINE = Distributed('default', dw_log, tb_logs_local, cityHash64(application));

HikariCP 连接池配置

// HikariCP 连接池配置
public class ClickHouseDataSourceUtils {
    private static HikariConfig getHikariConfig(DataSourceImpl dataSource) {
        HikariConfig config = new HikariConfig();
        config.setConnectionTimeout(30000L);    // 连接超时 30s
        config.setMaximumPoolSize(20);          // 最大连接数 20
        config.setMinimumIdle(2);               // 最小空闲 2
        config.setDataSource(dataSource);
        return config;
    }
    private static Properties getClickHouseProperties(ClickHouseSinkCommonParams params) {
        Properties props = new Properties();
        props.setProperty("user", params.getUser());
        props.setProperty("password", params.getPassword());
        props.setProperty("database", params.getDatabase());
        props.setProperty("socket_timeout""180000");      // Socket 超时 3 分钟
        props.setProperty("socket_keepalive""true");      // 保持连接
        props.setProperty("http_connection_provider""APACHE_HTTP_CLIENT");
        return props;
    }
}

配置说明:

  • maxPoolSize=20:每个 ClickHouse 节点最多 20 个连接。
  • minIdle=2:保持 2 个空闲连接,避免频繁创建。
  • socket_timeout=180s:Socket 超时 3 分钟,防止长时间查询阻塞。

ClickHouseLocalWriter:动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 本地节点缓存,按 IP 维护
    private final ConcurrentMap<StringHikariDataSource> dataSourceMap;
    // 动态获取集群本地表节点
    private final ClusterIpsUtils clusterIpsUtils;
    // IP 变更标志(CAS 锁,避免并发更新)
    private static final AtomicBoolean IP_CHANGING = new AtomicBoolean(false);
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1️⃣ 检测集群节点变化(通过 CAS 避免并发更新)
        if (clusterIpsChanged() && IP_CHANGING.compareAndSet(falsetrue)) {
            try {
                ipChanged(); // 动态更新 dataSourceMap
            } finally {
                IP_CHANGING.set(false);
            }
        }
        // 2️⃣ 获取异常节点列表(从 Redis + APM 实时查询)
        Set<String> exceptIps = clusterIpsUtils.getExceptIps();
        exceptIps.addAll(exceptionHosts);
        // 3️⃣ 过滤健康节点,随机选择
        List<String> healthyHosts = dataSourceMap.keySet().stream()
            .filter(host -> !exceptIps.contains(host))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            throw new RuntimeException("Can't get datasource from local cache");
        }
        return dataSourceMap.get(healthyHosts.get(random.nextInt(healthyHosts.size())));
    }
    private void ipChanged() {
        List<String> clusterIps = clusterIpsUtils.getClusterIps();
        // 新增节点:自动创建连接池
        clusterIps.forEach(ip ->
            dataSourceMap.computeIfAbsent(ip, v ->
                createHikariDataSource(ip, port)
            )
        );
        // 移除下线节点:关闭连接池
        dataSourceMap.forEach((ip, ds) -> {
            if (!clusterIps.contains(ip)) {
                dataSourceMap.remove(ip);
                ds.close();
            }
        });
    }
}

核心逻辑:

  1. 动态节点发现: 从 system.clusters 查询所有节点。
  2. 自动扩缩容: 节点上线自动加入,下线自动剔除。
  3. 故障节点剔除: 通过 APM 监控,自动剔除异常节点。
  4. 负载均衡: 随机选择健康节点,避免热点。

集群节点动态发现(ClusterIpsUtils)

public class ClusterIpsUtils {
    // 从 system.clusters 查询所有节点
    private static final String QUERY_CLUSTER_IPS =
        "select host_address from system.clusters where cluster = 'default'";
    // LoadingCache:定时刷新节点列表(1 小时)
    private final LoadingCache<StringList<String>> clusterIpsCache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.HOURS)
            .refreshAfterWrite(1, TimeUnit.HOURS)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public List<String> load(String dbName) {
                    return queryClusterIps();  // 定时刷新节点列表
                }
            }));
    // 异常节点缓存(1 分钟刷新)
    private final LoadingCache<StringFlinkExceptIpModel> exceptIpsCache =
        CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public FlinkExceptIpModel load(String dbName) {
                    return queryExceptIp();  // 从 Redis + APM 查询异常节点
                }
            }));
}

异常节点监控策略:

  • 磁盘使用率 >= 90%: 从 APM 查询 Prometheus 指标,自动加入黑名单。
  • HTTP 连接数 >= 50: 连接数过多说明节点压力大,自动加入黑名单。
  • 人工配置: 通过 Redis 配置手动剔除节点

数据来源:

  1. ClickHouse system.clusters 表: 获取所有集群节点。
  2. APM Prometheus 接口: 监控节点健康状态。
  3. Redis 缓存: 人工配置的异常节点。

负载均衡优化

public class ClickHouseWriter {
    public <T> ClickHouseWriter(...) {
        // Shuffle:随机打乱数据源顺序
        Collections.shuffle(clickHouseDataSources);
        this.clickHouseDataSources = clickHouseDataSources;
    }
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 轮询 + 随机选择(已 shuffle,避免热点)
        int current = this.currentRandom.getAndIncrement();
        if (current >= clickHouseDataSources.size()) {
            this.currentRandom.set(0);
        }
        return clickHouseDataSources.get(currentRandom.get());
    }
}

优势:

  • 初始化时 shuffle,避免所有 writer 同时从第一个节点开始。
  • 轮询 + 随机选择,负载分散更均匀。
  • 故障节点自动剔除。

四、支持分表策略

分片策略抽象

public abstract class ClickHouseShardStrategy<T> {
    private String tableName;      // 表名模板,如 "tb_log_%s"
    private Integer tableCount;    // 分表数量
    // 根据数据决定目标表名
    public abstract String getTableName(T data);
}

日志分片实现

public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 表名格式:tb_log_{application}
        // 例如:application = "order-service" -> table = "tb_log_order_service"
        return String.format(
            this.getTableName(),
            application.replace("-""_").toLowerCase()
        );
    }
}

按表(应用)维度的缓冲区

日志侧维度降级为应用名称维度缓冲区,实则因为按照应用分表,

业务方可使用自身分表策略携带表名元数据,进行表维度缓冲。

public class ClickHouseShardSinkBuffer {
    // 按 application 分组的缓冲区(ConcurrentHashMap 保证并发安全)
    private final ConcurrentHashMap<StringClickHouseSinkCounter> localValues;
    public void put(LogModel value) {
        String application = value.getApplication();
        // 1️⃣ 检查是否需要 flush
        if (flushCondition(application)) {
            addToQueue(application); // 触发写入
        }
        // 2️⃣ 添加到缓冲区(线程安全的 compute 操作)
        localValues.compute(application, (k, v) -> {
            if (v == null) v = new ClickHouseSinkCounter();
            v.add(value);
            return v;
        });
    }
    private void addToQueue(String application) {
        localValues.computeIfPresent(application, (k, v) -> {
            // 深拷贝并清空(避免并发修改异常)
            List<LogModel> deepCopy = v.copyValuesAndClear();
            // 构造请求 Blank:application + targetTable + values
            String targetTable = shardStrategy.getTableName(application);
            ClickHouseRequestBlank blank = new ClickHouseRequestBlank(deepCopy, application, targetTable);
            // 放入队列
            writer.put(blank);
            return v;
        });
    }
}

核心设计:

  • 应用隔离: 每个表(应用)独立的 buffer,互不影响。
  • 线程安全: 使用 ConcurrentHashMap.compute()保证并发安全。
  • 深拷贝: List.copyOf() 创建不可变副本,避免并发修改。
  • 批量清空: 一次性取出所有数据,清空计数器。

五、攒批与内存控制

双触发机制

public class ClickHouseShardSinkBuffer {
    private final int maxFlushBufferSize;  // 最大批次大小(如 10000)
    private final long timeoutMillis;      // 超时时间(如 30s)
    // 触发条件检查(满足任一即触发)
    private boolean flushCondition(String application) {
        return localValues.get(application) != null
            && (checkMetaSize(application) || checkTime(application));
    }
    // 条件1:达到批次大小
    private boolean checkMetaSize(String application) {
        return localValues.get(application).getMetaSize() >= maxFlushBufferSize;
    }
    // 条件2:超时
    private boolean checkTime(String application) {
        long current = System.currentTimeMillis();
        return current - localValues.get(application).getInsertTime() > timeoutMillis;
    }
}

批次大小计算

public class ClickHouseSinkCounter {
    private final List<LogModel> values;
    private Long metaSize; // 累计的 metaSize(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize(); // 累加 metaSize
    }
    public List<LogModel> copyValuesAndClear() {
        List<LogModel> logModels = List.copyOf(this.values); // 深拷贝(不可变)
        this.values.clear();
        this.metaSize = 0L;
        this.insertTime = System.currentTimeMillis();
        return logModels;
    }
}

关键点:

  • 使用 metaSize(字节数)而非记录数控制批次,内存控制更精确。
  • List.copyOf() 创建不可变副本,避免并发修改。
  • 清空后重置 insertTime,保证超时触发准确性。

带随机抖动的超时

private final long timeoutMillis;
public ClickHouseShardSinkBuffer(..., int timeoutSec, ...) {
    // 基础超时 + 10% 随机抖动(避免惊群效应)
    this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSec)
                      + new SecureRandom().nextInt((int) (timeoutSec * 0.1 * 1000));
}

目的: 避免多个TM 同时触发 flush,造成写入流量峰值。

配置示例

ClickHouseShardSinkBuffer.Builder
    .aClickHouseSinkBuffer()
    .withTargetTable("single_table")  //单表时,可直接使用指定表名
    .withMaxFlushBufferSize(10000)  // 对应字节数
    .withTimeoutSec(30)              // 30 秒超时
    .withClickHouseShardStrategy(new LogClickHouseShardStrategy("table_prefix_%s", 8))  //分表策略时,使用
    // 分表策略可根据业务实际情况进行扩展
    .build(clickHouseWriter);

六、写入限流与流量控制

有界队列设计

public class ClickHouseWriter {
    // 有界阻塞队列
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, ...) {
        // 队列最大容量配置(默认 10)
        this.commonQueue = new LinkedBlockingQueue<>(sinkParams.getQueueMaxCapacity());
    }
    public void put(ClickHouseRequestBlank params) {
        unProcessedCounter.incrementAndGet();
        // put() 方法在队列满时会阻塞,实现背压
        commonQueue.put(params);
    }
}

背压传导:

线程池并发控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

WriterTask 消费逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

配置参数

七、重试机制与超时控制

Future 超时控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

超时策略:

  • Future 超时: 3 分钟(orTimeout)。
  • Socket 超时: 3 分钟(socket_timeout=180000)。
  • 连接超时: 30 秒(connectionTimeout=30000)。

重试逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

重试控制逻辑

private void handleUnsuccessfulResponse(..., Set<String> exceptHosts) {
    // 检查 Future 是否已完成(避免重复完成)
    if (future.isDone()) {
        return;
    }
    if (attemptCounter >= maxRetries) {
        // 达到最大重试次数,标记失败
        future.completeExceptionally(new RuntimeException("Max retries exceeded"));
    } else {
        // 递归重试
        requestBlank.incrementCounter();
        send(requestBlank, future, exceptHosts); // 递归调用,排除失败节点
    }
}

重试策略:

  • 递归重试: 失败后递归调用,直到成功或达到最大次数。
  • 异常节点隔离: 每次重试时排除失败的节点(exceptHosts)。
  • 超时控制: Future 超时(3 分钟)防止永久阻塞。

为什么递归重试是更好的选择

递归重试(当前实现)

队列重试(假设方案)

保证一致性

  // ClickHouseWriter.java:139-158
  while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
      CompletableFuture<Void> future = FutureUtil.allOf(futures);
      future.get(3, TimeUnit.MINUTES);  // 阻塞直到全部完成
  }
  • Checkpoint 时所有数据要么全部成功,要么全部失败。
  • 重启后不会有部分数据重复的问题。

简单可靠

  • 代码逻辑清晰。
  • 对于队列重试且不重复,需要复杂的二阶段提交(这里暂不展开),大幅增加代码复杂度。

性能可接受

class WriterTask implements Runnable {
    @Override
    public void run() {
        while (isWorking || !queue.isEmpty()) {
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置 3 分钟超时
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES); // 防止永久阻塞
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Timeout"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}
  • 虽然阻塞,但有超时保护。
  • ClickHouse 写入通常很快(秒级)。
  • 网络故障时重试也合理。

避开故障节点

  // ClickHouseWriter.java:259-260
  HikariDataSource dataSource = getNextDataSource(exceptHosts);
  • 递归时可以传递 exceptHosts。
  • 自动避开失败的节点。
  • 提高成功率。

异常节点剔除

// 特殊错误码列表(自动加入黑名单)
private final List<Integer> ignoreHostCodes = Arrays.asList(2101002);
public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
    if (CollectionUtils.isNotEmpty(exceptionHosts)) {
        // 过滤异常节点
        List<HikariDataSource> healthyHosts = clickHouseDataSources.stream()
            .filter(ds -> !exceptionHosts.contains(getHostFromUrl(ds)))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            return null// 所有节点都异常
        }
        return healthyHosts.get(random.nextInt(healthyHosts.size()));
    }
    // 正常轮询(已 shuffle,避免热点)
    return clickHouseDataSources.get(currentRandom.getAndIncrement() % size);
}

故障节点剔除策略:

  1. 错误码 210(网络异常): 自动加入黑名单。
  2. 错误码 1002(连接池异常): 自动加入黑名单。
  3. APM 监控: 磁盘 >= 90%、HTTP 连接 >= 50 的节点。
  4. 手动配置: 通过 Redis 配置剔除。

恢复机制:

  • LoadingCache 定时刷新(1 分钟)。
  • 节点恢复健康后自动从黑名单移除。

重试流程图

八、异常处理模式

两种 Sink 模式

public Sink buildSink(String targetTable, String targetCount, int maxBufferSize) {
    IClickHouseSinkBuffer buffer = ClickHouseShardSinkBuffer.Builder
        .aClickHouseSinkBuffer()
        .withTargetTable(targetTable)
        .withMaxFlushBufferSize(maxBufferSize)
        .withClickHouseShardStrategy(new LogClickHouseShardStrategy(targetTable, count))
        .build(clickHouseWriter);
    // 根据配置选择模式
    if (ignoringClickHouseSendingExceptionEnabled) {
        return new UnexceptionableSink(buffer);  // 忽略异常
    } else {
        return new ExceptionsThrowableSink(buffer); // 抛出异常
    }
}

UnexceptionableSink(忽略异常 - At-Most-Once)

public class UnexceptionableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) {
        buffer.put(message);  // 不检查 Future 状态
    }
    @Override
    public void flush() {
        buffer.flush();
    }
}

适用场景:

  • 允许部分数据丢失。
  • 不希望因写入异常导致任务失败。
  • 对数据准确性要求不高(如日志统计)。

语义保证:At-Most-Once(最多一次)

ExceptionsThrowableSink(抛出异常 - At-Least-Once)

public class ExceptionsThrowableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) throws ExecutionException, InterruptedException {
        buffer.put(message);
        // 每次写入都检查 Future 状态
        buffer.assertFuturesNotFailedYet();
    }
    @Override
    public void flush() throws ExecutionException, InterruptedException {
        buffer.flush();
    }
}

Future 状态检查:

public void assertFuturesNotFailedYet() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = FutureUtil.allOf(futures);
    // 非阻塞检查
    if (future.isCompletedExceptionally()) {
        logger.error("There is something wrong with the future. exist sink now");
        future.get(); // 抛出异常,导致 Flink 任务失败
    }
}

适用场景:

  • 数据准确性要求高。
  • 需要保证所有数据写入成功。
  • 异常时希望 Flink 任务失败并重启。

语义保证:At-Least-Once(至少一次)

Future 清理策略与并发控制

定时检查器

public class ClickHouseSinkScheduledCheckerAndCleaner {
    private final ScheduledExecutorService scheduledExecutorService;
    private final List<CompletableFuture<Boolean>> futures;
    // ⚠️ volatile 保证多线程可见性(关键并发控制点)
    private volatile boolean isFlushing = false;
    public ClickHouseSinkScheduledCheckerAndCleaner(...) {
        // 单线程定时执行器
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        // 定时执行清理任务(每隔 checkTimeout 秒,默认 30 秒)
        scheduledExecutorService.scheduleWithFixedDelay(getTask(), ...);
    }
    private Runnable getTask() {
        return () -> {
            synchronized (this) {
                //  关键:检查是否正在 flush,避免并发冲突
                if (isFlushing) {
                    return// Checkpoint 期间暂停清理
                }
                // 1️⃣ 清理已完成的 Future
                futures.removeIf(filter);
                // 2️⃣ 触发所有 Buffer 的 flush(检查是否需要写入)
                clickHouseSinkBuffers.forEach(IClickHouseSinkBuffer::tryAddToQueue);
            }
        };
    }
    // Checkpoint flush 前调用(暂停 cleaner)
    public synchronized void beforeFlush() {
        isFlushing = true;
    }
    // Checkpoint flush 后调用(恢复 cleaner)
    public synchronized void afterFlush() {
        isFlushing = false;
    }
}

核心设计:

  • volatile boolean isFlushing: 标志位,协调 cleaner 与 checkpoint 线程。
  • synchronized (this): 保证原子性,避免并发冲突。
  • 单线程执行器: 避免 cleaner 内部并发问题。

并发控制机制

问题场景:

时间轴冲突:
T1: Cleaner 线程正在执行 tryAddToQueue()
T2: Checkpoint 触发,调用 sink.flush()
T3: Cleaner 同时也在执行 tryAddToQueue()
    ├─ 可能导致:数据重复写入
    ├─ 可能导致:Buffer 清空顺序混乱
    └─ 可能导致:Future 状态不一致

解决方案:

// ClickHouseSinkManager.flush()
public void flush() {
    // 1️⃣ 暂停定时清理任务(设置标志)
    clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
    try {
        // 2️⃣ 执行 flush(此时 cleaner 线程会跳过执行)
        clickHouseWriter.waitUntilAllFuturesDone(falsefalse);
    } finally {
        // 3️⃣ 恢复定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
    }
}

并发控制流程:

关键设计点:

  1. volatile 保证可见性: isFlushing 使用 volatile,确保多线程间的可见性。
  2. synchronized 保证原子性: getTask() 整个方法体使用 synchronized (this)。
  3. 标志位协调: 通过 isFlushing 标志实现两个线程间的协调。
  4. finally 确保恢复: 即使 waitUntilAllFuturesDone() 异常,也会在 finally 中恢复 cleaner。

避免的并发问题:

  • 数据重复写入: Cleaner 和 Checkpoint 同时 flush。
  • Buffer 状态不一致: 一边清空一边写入。
  • Future 清理冲突: 正在使用的 Future 被清理。

性能影响:

  • Checkpoint flush 期间,cleaner 暂停执行(通常 1-3 秒)。
  • Cleaner 跳过的周期会在下次正常执行时补偿。
  • 对整体吞吐影响极小(cleaner 间隔通常 30 秒)。

九、Checkpoint 语义保证

为什么 Checkpoint 时必须 Flush?

不 Flush 的后果

不Flush导致数据永久丢失

正确做法

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    logger.info("start doing snapshot. flush sink to ck");
    // 1. 先 flush buffer(将内存数据写入 ClickHouse)
    if (sink != null) {
        sink.flush();
    }
    // 2. 等待所有写入完成
    if (sinkManager != null && !sinkManager.isClosed()) {
        sinkManager.flush();
    }
    // 此时 Checkpoint 才能标记为成功
    logger.info("doing snapshot. flush sink to ck");
}

Flush 实现与并发协调

public class ClickHouseSinkManager {
    public void flush() {
        //  步骤1:暂停定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
        try {
            //  步骤2:执行 buffer flush + 等待所有写入完成
            clickHouseWriter.waitUntilAllFuturesDone(falsefalse);
        } finally {
            //  步骤3:恢复定时清理任务(finally 确保执行)
            clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
        }
    }
}

并发协调详解:

// cleaner 线程执行流程
synchronized (this) {
    if (isFlushing) {
        return// Checkpoint 期间跳过本次执行
    }
    // 正常执行:清理已完成的 Future + 触发 Buffer flush
    futures.removeIf(filter);
    buffers.forEach(Buffer::tryAddToQueue);
}

关键点:

  • volatile 可见性: isFlushing 使用 volatile 确保 cleaner 线程立即看到状态变化。
  • synchronized互斥: getTask()方法体使用 synchronized (this) 确保原子性。
  • 标志位协调: 通过 beforeFlush() / afterFlush() 管理标志位。
  • finally 保证恢复: 即使 flush 异常,也会在 finally 中恢复 cleaner。

等待所有 Future 完成

public synchronized void waitUntilAllFuturesDone(boolean stopWriters, boolean clearFutures) {
    try {
        // 循环等待:直到所有 Future 完成 + 队列清空
        while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
            CompletableFuture<Void> all = FutureUtil.allOf(futures);
            // 最多等待 3 分钟(与 Future 超时一致)
            all.get(3, TimeUnit.MINUTES);
            // 移除已完成的 Future(非异常)
            futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
            // 检查是否有异常 Future
            if (anyFutureFailed()) {
                break; // 有异常则退出
            }
        }
    } finally {
        if (stopWriters) stopWriters();
        if (clearFutures) futures.clear();
    }
}

关键逻辑:

  • 循环等待直到所有 Future 完成 + 队列清空。
  • 超时 3 分钟(与 Future 超时一致)。
  • 移除已完成的非异常 Future。
  • 有异常时退出循环。

三种 Flush 触发方式对比

Checkpoint 参数配置

// Checkpoint 配置建议
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint(间隔 1 分钟)
env.enableCheckpointing(60000);
// Checkpoint 超时(必须大于 Future 超时 + 重试时间)
// 建议:CheckpointTimeout > FutureTimeout * MaxRetries
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
// 一致性模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔(避免过于频繁)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 秒
// 最大并发 Checkpoint 数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

语义保证

推荐配置:

生产环境: 使用 ExceptionsThrowableSink + Checkpoint。

允许部分丢失: 使用 UnexceptionableSink。

十、最佳实践与调优

生产配置

// ========== ClickHouse 连接参数 ==========
clickhouse.sink.target-table = tb_logs_local
clickhouse.sink.max-buffer-size104857600        // 批次大小
clickhouse.sink.table-count0                // 0 表示不分表
// ========== 写入性能参数 ==========
clickhouse.sink.num-writers10               // 写入线程数
clickhouse.sink.queue-max-capacity10        // 队列容量
clickhouse.sink.timeout-sec30               // flush 超时
clickhouse.sink.retries10                   // 最大重试次数
clickhouse.sink.check.timeout-sec30         // 定时检查间隔
// ========== 异常处理参数 ==========
clickhouse.sink.ignoring-clickhouse-sending-exception-enabledfalse
clickhouse.sink.local-address-enabledtrue   // 启用本地表写入
// ========== ClickHouse 集群配置 ==========
clickhouse.access.hosts192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123
clickhouse.access.user = default
clickhouse.access.password = ***
clickhouse.access.database = dw_xx_xx
clickhouse.access.cluster = default
// ========== HikariCP 连接池配置 ==========
connectionTimeout30000                      // 连接超时 30s
maximumPoolSize20                           // 最大连接数 20
minimumIdle2                                // 最小空闲 2
socket_timeout180000                        // Socket 超时 3mi

性能调优

故障排查

十一、总结

本文深入分析了 Flink ClickHouse Sink 的实现方案,核心亮点包括:

技术亮点

  • 连接池选型: 使用 HikariCP,性能优异,连接管理可靠。
  • Future 超时控制: orTimeout(3min) 防止永久阻塞。
  • 显式资源管理: Connection 和 PreparedStatement 显式关闭,防止连接泄漏。
  • 负载均衡优化: Shuffle 初始化 + 轮询选择,避免热点。
  • 异常处理增强: future.isDone() 检查,避免重复完成。
  • 本地表写入: 动态节点发现 + 故障剔除,写入性能提升。
  • 分片策略: 按表(应用)维度路由,独立缓冲和隔离。
  • 攒批优化: 双触发机制(大小 + 超时)+ 随机抖动。
  • 流量控制: 有界队列 + 线程池,实现背压。
  • 健壮重试: 递归重试 + 异常节点剔除 + 最大重试限制。

Checkpoint 语义

  • At-Least-Once: ExceptionsThrowableSink + Checkpoint。
  • At-Most-Once: UnexceptionableSink。
  • Exactly-Once: 需要配合 ClickHouse 事务(未实现)。

生产建议

  1. 必须: Checkpoint 时 flush,否则会丢数据。
  2. 推荐: 使用 HikariCP + 本地表写入。
  3. 推荐: 配置合理的超时(Future < Socket < Checkpoint)。
  4. 推荐: 监控队列大小、Future 失败率、重试次数。

该方案已在生产环境大规模验证,能够稳定支撑百万级 TPS 的日志写入场景。

往期回顾

1.服务拆分之旅:测试过程全揭秘|得物技术

2.大模型网关:大模型时代的智能交通枢纽|得物技术

3.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

4.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

5.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

文 /虚白

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

服务拆分之旅:测试过程全揭秘|得物技术

作者 得物技术
2026年2月5日 14:47

一、引言

代码越写越多怎么办?在线等挺急的! Bidding-interface服务代码库代码量已经达到100w行!!

Bidding-interface应用是出价域核心应用之一,主要面向B端商家。跟商家后台有关的出价功能都围绕其展开。是目前出价域代码量最多的服务。

随着出价业务最近几年来的快速发展,出价服务承接的流量虽然都是围绕卖家出价,但是已远远超过卖家出价功能范围。业务的快速迭代而频繁变更给出价核心链路高可用、高性能都带来了巨大的风险。

经总结有如下几个痛点:

  • 核心出价链路未隔离:

    出价链路各子业务模块间代码有不同程度的耦合,迭代开发可扩展性差,往往会侵入到出价主流程代码的改动。每个子模块缺乏独立的封装,而且存在大量重复的代码,每次业务规则调整,需要改动多处,容易出现漏改漏测的问题。

  • 大单体&功能模块定义混乱:

    历史原因上层业务层代码缺乏抽象,代码无法实现复用,需求开发代码量大,导致需求估时偏高,经常出现20+人日的大需求,需求开发中又写出大量重复代码,导致出价服务代码库快速膨胀,应用启动耗时过长,恶性循环。

  • B/C端链路未隔离:

    B端卖家出价链路流量与C端价格业务场景链路流量没有完全隔离,由于历史原因,有些B端出价链路接口代码还存在于price应用中,偶尔B端需求开发会对C端应用做代码变更。存在一定的代码管控和应用权限管控成本。

  • 发布效率影响:

    代码量庞大,导致编译速度缓慢。代码过多,类的依赖关系更为复杂,持续迭代逐步加大编译成本,随着持续迭代,新的代码逻辑 ,引入更多jar 依赖,间接导致项目部署时长变长蓝绿发布和紧急问题处理时长显著增加;同时由于编译与部署时间长,直接影响开发人员在日常迭代中的效率(自测,debug,部署)。

  • 业务抽象&分层不合理:

    历史原因出价基础能力领域不明确,出价底层和业务层分层模糊,业务层代码和出价底层代码耦合严重,出价底层能力缺乏抽象,上层业务扩展需求频繁改动出价底层能力代码。给出价核心链路代码质量把控带来较高的成本, 每次上线变更也带来一定的风险。

以上,对于Bidding服务的拆分和治理,已经箭在弦上不得不发。否则,持续的迭代会继续恶化服务的上述问题。

经过前期慎重的筹备,设计,排期,拆分,和测试。目前Bidding应用经过四期的拆分节奏,已经马上要接近尾声了。服务被拆分成三个全新的应用,目前在小流量灰度放量中。

本次拆分涉及:1000+Dubbo接口,300+个HTTP接口,200+ MQ消息,100+个TOC任务,10+个 DJob任务。

本人是出价域测试一枚,参与了一期-四期的拆分测试工作。

项目在全组研发+测试的ALL IN投入下,已接近尾声。值此之际输出一篇文章,从测试视角复盘下,Bidding服务的拆分与治理,也全过程揭秘下出价域内的拆分测试过程。

二、服务拆分的原则

首先,在细节性介绍Bidding拆分之前。先过大概过一下服务拆分原则:

  • 单一职责原则 (SRP):  每个服务应该只负责一项特定的业务功能,避免功能混杂。

  • 高内聚、低耦合:  服务内部高度内聚,服务之间松耦合,尽量减少服务之间的依赖关系。

  • 业务能力导向:  根据业务领域和功能边界进行服务拆分,确保每个服务都代表一个完整的业务能力。

拆分原则之下,还有不同的策略可以采纳:基于业务能力拆分、基于领域驱动设计 (DDD) 拆分、基于数据拆分等等。同时,拆分时应该注意:避免过度拆分、考虑服务之间的通信成本、设计合理的 API 接口。

服务拆分是微服务架构设计的关键步骤,需要根据具体的业务场景和团队情况进行综合考虑。合理的服务拆分可以提高系统的灵活性、可扩展性和可维护性,而不合理的服务拆分则会带来一系列问题。

三、Bidding服务拆分的设计

如引言介绍过。Bidding服务被拆分出三个新的应用,同时保留bidding应用本身。目前共拆分成四个应用:Bidding-foundtion,Bidding-interface,Bidding-operation和Bidding-biz。详情如下:

  • 出价基础服务-Bidding-foundation:

出价基础服务,对出价基础能力抽象,出价领域能力封装,基础能力沉淀。

  • 出价服务-Bidding-interfaces:

商家端出价,提供出价基础能力和出价工具,提供商家在各端出价链路能力,重点保障商家出价基础功能和出价体验。

  • 出价运营服务-Bidding-operation:

出价运营,重点支撑运营对出价业务相关规则的维护以及平台其他域业务变更对出价域数据变更的业务处理:

  1. 出价管理相关配置:出价规则配置、指定卖家规则管理、出价应急隐藏/下线管理工具等;
  2. 业务大任务:包括控价生效/失效,商研鉴别能力变更,商家直发资质变更,品牌方出价资质变更等大任务执行。
  • 业务扩展服务-Bidding-biz:

更多业务场景扩展,侧重业务场景的灵活扩展,可拆出的现有业务范围:国补采购单出价,空中成单业务,活动出价,直播出价,现订现采业务,预约抢购,新品上线预出价,入仓预出价。

应用拆分前后流量分布情况:

图片

四、Bidding拆分的节奏和目标收益

服务拆分是项大工程,对目前的线上质量存在极大的挑战。合理的排期和拆分计划是重点,可预期的收益目标是灵魂。

经过前期充分调研和规划。Bidding拆分被分成了四期,每期推进一个新应用。并按如下六大步进行:

图片

Bidding拆分目标

  • 解决Bidding大单体问题: 对Bidding应用进行合理规划,完成代码和应用拆分,解决一直以来Bidding大单体提供的服务多而混乱,维护成本高,应用编译部署慢,发布效率低等等问题。
  • 核心链路隔离&提升稳定性: 明确出价基础能力,对出价基础能力下沉,出价基础能力代码拆分出独立的代码库,并且部署在独立的新应用中,实现出价核心链路隔离,提升出价核心链路稳定性。
  • 提升迭代需求开发效率: 完成业务层代码抽象,业务层做组件化配置化,实现业务层抽象复用,降低版本迭代需求开发成本。
  • 实现出价业务应用合理规划: 各服务定位、职能明确,分层抽象合理,更好服务于企/个商家、不同业务线运营等不同角色业务推进。

预期的拆分收益

  • 出价服务应用结构优化:

    完成对Bidding大单体应用合理规划拆分,向下沉淀出出价基础服务应用层,降低出价基础能力维护成功;向上抽离出业务扩展应用层,能够实现上层业务的灵活扩展;同时把面向平台运营和面向卖家出价的能力独立维护;在代码库和应用层面隔离,有效减少版本迭代业务需求开发变更对应用的影响面,降低应用和代码库的维护成本。

  • 完成业务层整体设计,业务层抽象复用,业务层做组件化配置化,提升版本迭代需求开发效率,降低版本迭代需求开发成本:

    按业务类型对业务代码进行分类,统一设计方案,提高代码复用性,支持业务场景变化时快速扩展,以引导降价为例,当有类似降价换流量/降价换销量新的降价场景需求时,可以快速上线,类似情况每个需求可以减少10-20人日开发工作量。

  • 代码质量提升 :

    通过拆分出价基础服务和对出价流程代码做重构,将出价基础底层能力代码与上层业务层代码解耦,降低代码复杂度,降低代码冲突和维护难度,从而提高整体代码质量和可维护性。

  • 开发效率提升 :

    1. 缩短应用部署时间: 治理后的出价服务将加快编译和部署速度,缩短Bidding-interfaces应用发布(编译+部署)时间 由12分钟降低到6分钟,从而显著提升开发人员的工作效率,减少自测、调试和部署所需的时间。以Bidding服务T1环境目前一个月编译部署至少1500次计算,每个月可以节约150h应用发布时间。
    2. 提升问题定位效率: 出价基础服务层与上层业务逻辑层代码库&应用分开后,排查定位开发过程中遇到的问题和线上问题时可以有效缩小代码范围,快速定位问题代码位置。

五、测试计划设计

服务拆分的前期,研发团队投入了大量的心血。现在代码终于提测了,进入我们的测试环节:

为了能收获更好的质量效果,同时也为了不同研发、测试同学的分工。我们需要细化到最细粒度,即接口维度整理出一份详细的文档。基于此文档的基础,我们确定工作量和人员排期:

如本迭代,我们投入4位研发同学,2位测试同学。完成该200个Dubbo接口和100个HTTP接口,以及20个Topic迁移。对应的提测接口,标记上负责的研发、测试、测试进度、接口详细信息等内容。

基于该文档的基础上,我们的工作清晰而明确。一个大型的服务拆分,也变成了一步一步的里程碑任务。

接下来给大家看一下,关于Bidding拆分。我们团队整体的测试计划,我们一共设计了五道流程。

  • 第一关:自测接口对比:

    每批次拆分接口提测前,研发同学必须完成接口自测。基于新旧接口返回结果对比验证。验证通过后标记在文档中,再进入测试流程。

    对于拆分项目,自测卡的相对更加严格。由于仅做接口迁移,逻辑无变更,自测也更加容易开展。由研发同学做好接口自测,可以避免提测后新接口不通的低级问题。提高项目进度。

    在这个环节中。偶尔遇见自测不充分、新接口参数传丢、新Topic未配置等问题。(三期、四期测试中,我们加强了对研发自测的要求)。

  • 第二关:测试功能回归

    这一步骤基本属于测试的人工验证,同时重点需关注写接口数据验证。

    回归时要测的细致。每个接口,测试同学进行合理评估。尽量针对接口主流程,进行细致功能回归。由于迁移的接口数量多,历史逻辑重。一方面在接口测试任务分配时,要尽量选择对该业务熟悉的同学。另一方面,承接的同学也有做好历史逻辑梳理。尽量不要产生漏测造成的问题。

    该步骤测出的问题五花八门。另外由于Bidding拆分成多个新服务。两个新服务经常彼此间调用会出现问题。比如二期Bidding-foundation迁移完成后,Bidding-operation的接口在迁移时,依赖接口需要从Bidding替换成foundation的接口。

    灰度打开情况下,调用新接口报错仍然走老逻辑。(测试时,需要关注trace中是否走了新应用)。

  • 第三关:自动化用例

    出价域内沉淀了比较完善的接口自动化用例。在人工测试时,测试同学可以借助自动化能力,完成对迁移接口的回归功能验证。

    同时在发布前天,组内会特地多跑一轮全量自动化。一次是迁移接口开关全部打开,一次是迁移接口开关全部关闭即正常的自动化回归。然后全员进行排错。

    全量的自动化用例执行,对迁移接口问题拦截,有比较好的效果。因为会有一些功能点,人工测试时关联功能未考虑到,但在接口自动化覆盖下无所遁形。

  • 第四关:流量回放

    在拆分接口开关打开的情况下,在预发环境进行流量回放。

    线上录制流量的数据往往更加复杂,经常会测出一些意料之外的问题。

    迭代过程中,我们组内仍然会在沿用两次回放。迁移接口开关打开后回放一次,开关关闭后回放一次。(跟发布配置保持一致)。

  • 第五关:灰度过程中,关闭接口开关,功能回滚

    为保证线上生产质量,在迁移接口小流量灰度过程中。我们持续监测线上问题告警群。

    以上,就是出价域测试团队,针对服务拆分的测试流程。同时遵循可回滚的发布标准,拆分接口做了非常完善的灰度功能。下一段落进行介绍。

六、各流量类型灰度切量方案

出价流程切新应用灰度控制从几个维度控制:总开关,出价类型范围,channel范围,source范围,bidSource范围,uid白名单&uid百分比(0-10000):

  • 灰度策略
  • 支持 接口维度 ,按照百分比进行灰度切流;

  • 支持一键回切;

Dubbo接口、HTTP接口、TOC任务迁移、DMQ消息迁移分别配有不同的灰度策略。

七、结语

拆分的过程中,伴随着很多迭代需求的开发。为了提高迁移效率,我们会在需求排期后,并行处理迭代功能相关的接口,把服务拆分和迭代需求一起完成掉。

目前,我们的拆分已经进入尾声。迭代发布后,整体的技术项目就结束了。灰度节奏在按预期节奏进行~

值得一提的是,目前我们的流量迁移仍处于第一阶段,即拆分应用出价域内灰度迁移,上游不感知。目前所有的流量仍然通过bidding服务接口进行转发。后续第二阶段,灰度验证完成后,需要进行上游接口替换,流量直接请求拆分后的应用。

往期回顾

1.大模型网关:大模型时代的智能交通枢纽|得物技术

2.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

3.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

4.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

5.Galaxy比数平台功能介绍及实现原理|得物技术

文 /寇森

关注得物技术,每周一、三更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

❌
❌