普通视图

发现新文章,点击刷新页面。
昨天 — 2026年3月2日首页

OpenClaw 从能聊到能干差的是这 50 个 Skills 😍😍😍

作者 Moment
2026年3月2日 09:06

我正在开发 DocFlow,它是一个完整的 AI 全栈协同文档平台。该项目融合了多个技术栈,包括基于 Tiptap 的富文本编辑器、NestJs 后端服务、AI 集成功能和实时协作。在开发过程中,我积累了丰富的实战经验,涵盖了 Tiptap 的深度定制、性能优化和协作功能的实现等核心难点。

如果你对 AI 全栈开发、Tiptap 富文本编辑器定制或 DocFlow 项目的完整技术方案感兴趣,欢迎加我微信 yunmz777 进行私聊咨询,获取详细的技术分享和最佳实践。 很多人第一次打开 OpenClaw,会下意识把它当成"接在微信或 Slack 上的聊天机器人"。这种理解只对了一半。从架构上看,OpenClaw 更像一个网关:它站在你和一堆能力之间,负责路由、鉴权、记忆和工具调用。真正决定你能做多少事的,不是对话框有多好看,而是背后接了多少"身体"——也就是 Skills。

大语言模型很像一个装在罐子里的超级大脑,读得懂天下书,推得了复杂逻辑,但它没有手。它没法自己点按钮、没法直接读你电脑里的真实文件、也没法"学会"一套新工作流,除非你每次都把那套流程写成十几页的说明塞进提示词。Skills 就是给这个大脑配上的手脚和工具箱。你通过 OpenClaw 发一句指令,网关把请求转给模型,模型按当前加载的 Skill 决定调用哪类工具、走哪套流程,最后把结果经由同一网关回到你的聊天窗口。所以,与其说你在和 OpenClaw 聊天,不如说你在通过它调度一个"大脑 + 一堆可插拔能力"的联合体。

下面这张图概括了这条链路:从你的一句话,到网关、模型与 Skills 的协作,再到结果回到聊天窗口。

20260302010904

上图从左到右依次是你发指令、OpenClaw 网关做路由与鉴权、大模型负责推理、Skills 提供手脚与工具、执行后结果再经网关回到聊天窗口;整条链路就是"一句话调度大脑与能力"。

本文会先澄清"提示词膨胀"为什么会让很多人吃亏,再说明 Skill 是什么、为什么要用 SkillGuard 管好环境安全,最后按类别梳理一批值得优先安装的 Skills,并附上各类别的示意配图 prompt,方便你从"聊天思维"切换到"基础设施思维"。

提示词膨胀:为什么你还在吃亏

如果还没用上 Skill,大概率会落在两种处境之一。

第一种是提示词工程的无底洞。为了让人工智能好好干一次活,你得花十几二十分钟写"完美提示词",把背景、约束、输出格式、反例全塞进去。一次两次还行,任务一换又要重写,而且同一个任务下次还想复用时,又得翻聊天记录或文档把那一大段再贴一遍。成本高、难复用、还容易忘。

第二种是上下文超载。为了"让 AI 更懂我",很多人会在系统提示里塞进大量个性设定、操作指令和禁忌清单。结果模型还没开始干活,上下文窗口已经占掉一大截,轻则推理变慢、容易截断,重则直接爆掉,顺便把 API 额度烧光。更麻烦的是,这些设定和任务强绑定在一起,换一个任务又得改一版提示,维护成本越来越高。

Skill 要解决的就是这两类问题。它把"怎么用某类工具、走某套流程"打包成独立文件(或指令集),只在需要时加载。OpenClaw 和你用的 Agent 只加载当前任务需要的"战术手册",其余时间保持轻量。这样 Agent 能保持敏锐、成本可控、结果也更容易复现。你不是在跟一个越聊越臃肿的聊天窗口较劲,而是在维护一套可组合的能力层。

Skill 是什么,以及为什么要用 SkillGuard

从技术上说,一个 Skill 就是一个打包好的指令集,通常以文件形式存在,用来告诉 Agent 如何使用特定工具或执行某类复杂工作流。

区别在于指令的粒度。不写 Skill 时,你只能说"请帮我看看这段代码有没有错误",模型只能根据当次对话里的零散信息自由发挥。有了 Skill,你可以说"用 debug-pro 这个 Skill 对我的仓库做一次全面扫描,检查日志,然后在终端里按规范修复语法错误"。后者是"指名用哪套能力、按什么流程做",前者是"随便你怎么做"。前者依赖每次的提示词质量,后者依赖事先定义好的能力包,可复用、可审计、可迭代。

Skill 来源一多,风险就来了。ClawHub 等技能市场上有大量第三方 Skill,质量参差不齐,甚至混入恶意代码(窃取凭证、反向 shell、挖矿等),OpenClaw 的架构又给了 Agent 系统级权限和对外通信能力,一旦装上恶意 Skill,影响会很大。所以官方和社区都强调用 SkillGuard 一类机制做监控:在真正执行或下载 Skill 前,做来源校验、行为扫描或沙箱隔离,在"环境安全"和"持续扩展能力"之间取得平衡。如果你打算认真把 OpenClaw 当基础设施用,建议从第一天就打开 SkillGuard,养成"先验再装"的习惯。

从聊天思维到基础设施思维

大多数人会淹没在二十多万个可用 Skill 里,不知道从哪装起。真正能跑赢一票"只会聊天"用户的,往往是那些先把基础能力打牢的人。下面按类别整理一批值得优先考虑的 Skills,并说明它们各自解决什么问题、适合谁用;每一类下面都附了一张"这类在做什么"的示意配图 prompt,你可以丢进文生图模型生成插图,再替换文中的代码块。你可以按自己的角色(独立开发者、内容创作者、做增长的人等)勾选几条主线,再逐步扩展。

基础 Skills:给 AI 装上手脚

如果感觉 OpenClaw"也就那样",多半不是模型不够强,而是管道太薄。把这层当成"操作系统"来配,而不是聊天窗口,会顺很多。下面六个是很多人会优先装的基础能力。

  • find-skills:技能市场里 Skill 数量巨大,手动搜既慢又容易漏。这个 Skill 让 Agent 能按任务描述自动发现、筛选合适的 Skill,相当于导航员。
  • skill-creator:把你自己的工作流和"氛围编程"逻辑打包成可复用 Skill,相当于规模化复制你的习惯和判断,让 AI 反复按同一套标准执行。
  • mcp-builder:通过模型上下文协议(MCP)把 AI 接到私人数据和外部工具上,是 2026 年很多人会装的"桥梁"类 Skill,没有它,很多高级能力接不进来。
  • using-superpowers:强制 Agent 先搞清楚自己有哪些高级能力、在什么场景下用哪一项,而不是凭感觉瞎试,相当于优化器。
  • subagent-driven-development:把大任务拆成子任务,委派给子 Agent 执行并做结果审核,主 Agent 专注规划和验收,适合不想在一个提示词里搞定所有事的场景。
  • agent-tools:给 Agent 一整套常用小工具,处理那些模型默认不擅长、但又很常见的琐事,相当于数字瑞士军刀。

适合谁用:独立创始人、独立开发者,以及不想"手把手教 AI 走每一步"的人。

下面这张图代表"基础 Skills"在做什么:给 AI 装上导航、桥梁、优化器、委派与工具箱。

20260302010149

上图概括了基础类 Skills 的五个角色:找 Skill、接数据、优化能力认知、委派子任务、以及日常小工具,合起来就是让 AI 从"只会聊"变成"能动手"。

策略与创作 Skills:从打字员到战略伙伴

如果 Agent 的输出总像机器人在填废话,多半缺的是"先想清楚再写"的环节。下面这类 Skill 会引入反思、大纲和策略,把 OpenClaw 从快速打字员变成能提前验证逻辑的伙伴。

  • brainstorming:从一个关键词展开多种角度和"如果……会怎样"的场景,避免只停留在第一个听起来很"AI"的念头。
  • copywriting:不只生成文案,还管结构、节奏和语气,减少对老套模板的依赖。
  • systematic-debugging:一套可复用的排查框架,既能用来查代码,也能用来分析项目计划或异常情况。
  • writing-plans:动笔前先建大纲和内容策略,避免长文写飞。
  • content-strategy:选题和内容日历规划,让发布有节奏、有品牌感。
  • executing-plans:把高层计划拆成可执行的多步任务,由 Agent 按顺序完成。
  • marketing-ideas:给一个功能或时间节点,用成熟营销框架产出传播点和活动概念。
  • copy-editing:在纠错之外,顺带打磨语气和逻辑,尽量保留你个人的表达习惯。
  • social-content:把同一套核心信息改写成适合不同平台(如 X、TikTok、小红书)的版本,照顾各处的阅读习惯。
  • reflection:在流程里加入自检和纠错环节,让 Agent 能回顾自己的输出、总结经验、在过程中修正错误。

适合谁用:做内容、做个人品牌、或者想从"对着空白页发愁"变成"稳定产出但不丢灵魂"的人。

下面这张图代表"策略与创作 Skills"在做什么:从想法到大纲、文案、多平台分发与反思。

20260302010315

上图从左到右是创意展开、大纲与计划、文案打磨、多平台分发、按计划执行和反思自检,对应"先想清楚再写、写完后还能改"的一整条创作链。

工程 Skills:用氛围编程的速度,达到生产级质量

氛围编程已经是 2026 年的常态,但光靠对话很难保证生产级可维护性和测试覆盖。下面这些 Skill 把工程最佳实践编码进去,让 Agent 在写代码时更贴近真实团队的标准。

  • vercel-react-best-practicesvercel-composition-patterns:React 与 Vercel 生态下的前端规范和组件组合方式。
  • remotion-best-practices:用代码驱动视频与动画,适合程序化营销素材和产品演示。
  • agent-browserbrowser-use:让 Agent 能浏览网页、填表、抓取数据或做自动化测试,相当于给 AI 装上"眼睛"和"手"。
  • vercel-react-native-skills:移动端跨平台开发时的 React Native 最佳实践。
  • supabase-postgres-best-practices:数据库设计与 PostgreSQL 使用上的建议。
  • next-best-practices:Next.js 的架构与路由等最新模式。
  • webapp-testingtest-driven-development:测试驱动与自动化测试,减少回归和边缘问题。
  • requesting-code-review:在发布前让 Agent 自审代码,查找潜在漏洞和坏味道。

适合谁用:自己做产品、用氛围编程但希望交付物更稳、更可维护的创始人或开发者。

下面这张图代表"工程 Skills"在做什么:从代码规范、测试、到浏览器自动化与发布前审查。

20260302010430

上图概括了工程类 Skills 覆盖的几块:前端与全栈规范、数据库、测试与 TDD、浏览器自动化、视频与移动端,以及发布前的代码自审,对应"用对话的速度写出可维护的代码"。

设计 Skills:不碰 Figma 也能要专业审美

不需要自己会画图,也能让 Agent 产出符合设计原则的界面和素材。

  • web-design-guidelinesfrontend-design:按网格、色板等原则审视前端实现,避免"通用 AI 丑"。
  • ui-ux-pro-max:跨技术栈的设计与无障碍建议。
  • canvas-design:把想法转成 PNG、PDF 等视觉素材。
  • tailwind-design-system:用 Tailwind 令牌和无障碍模式搭可复用的 UI 库。
  • content-visualizerinfographic-pro:根据内容推荐版式、生成信息图或封面图。
  • ai-image-generation:统一调用多模型(如 DALL·E、Imagen、Gemini)做原型和创意图。

适合谁用:做落地页、做高保真内容、希望产品"长得像样"但暂时不打算雇设计团队的开发者或创始人。

下面这张图代表"设计 Skills"在做什么:从网格与色板、到信息图与多模型出图。

20260302010535

上图从左到右是设计原则与色板、前端与组件库、UX 与无障碍、插画与素材、信息图与封面推荐,以及多模型统一出图,对应"不自己画图也能要专业视觉"。

增长 Skills:把 OpenClaw 当增长引擎

产品做出来只是前半段,卖不出去等于没存在感。这类 Skill 覆盖从 SEO、转化到心理诱因的链条。

  • Larry:结合图像生成和病毒式钩子,自动化 TikTok 等平台的图文内容。
  • audit-websiteseo-audit:网站健康与 SEO 缺口扫描,减少盲目猜测。
  • marketing-psychology:把社会认同、稀缺性等原理用到产品钩子和文案里。
  • programmatic-seo:规模化生成大量 SEO 友好页面的工作流。
  • product-marketing-contextpricing-strategypage-cro:定位、定价和落地页转化优化。

适合谁用:独立黑客、独立创始人,以及在没有大预算的情况下也要拉新和变现的营销或产品人。

下面这张图代表"增长 Skills"在做什么:从 SEO、网站健康、到心理诱因与转化。

20260302010700

上图概括了增长类 Skills 的链条:网站与 SEO 诊断、病毒式内容、营销心理、规模化 SEO、产品定位与定价,以及落地页转化优化,对应"产品做出来之后怎么卖出去"。

文档 Skills:把 AI 当行政助理

合并拆分 PDF、做 PPT、出 Word/Excel、把网页转成 Markdown 或 HTML,这类"文书劳动"交给专门 Skill 处理,能省下大量时间。

  • pdf-propptxdocxxlsx:对应格式的生成与编辑能力。
  • url-to-markdownmarkdown-to-html:为知识库或发布流程做格式转换。
  • format-pro:统一文档版式和样式,保持品牌一致。

适合谁用:经常要出报告、提案、知识库或对外材料的创始人、学生或专业人士。

下面这张图代表"文档 Skills"在做什么:从 PDF、PPT、Word、Excel 到格式转换与统一版式。

20260302010751

上图从左到右是 PDF 处理、PPT 与 Word/Excel 生成、网页转 Markdown、Markdown 转 HTML 发布,以及版式与品牌统一,对应"文书杂活交给 AI 省时间"。

小结

OpenClaw 的价值不在于多一个聊天入口,而在于它把"模型 + 记忆 + 工具 + 技能市场"串成了一条可扩展的管道。你发出去的那句话,会经过网关被转成对某几个 Skill 的调用,再驱动真正的读写、执行和对外请求。想把它用成 2026 年能持续跑下去的基础设施,就需要从"聊天"思维切到"网关 + 能力层"思维:先装好基础 Skills、打开 SkillGuard,再按自己的角色补上策略、工程、设计、增长或文档中的几条线,让 Agent 既有手脚,又处在可控、可审计的环境里。

昨天以前首页

你学不会 CSS,不是笨,是方向错了

2026年3月1日 22:17

做前端这么多年,最让我心疼的,就是那些拼尽全力学CSS,却越学越懵的新手。

后台私信里,几乎每天都能看到类似的倾诉:“学CSS大半年了,上百个属性背得滚瓜烂熟,flex、grid的教程刷了一遍又一遍,可一上手做项目,瞬间破防——布局乱得不成样子,兼容问题百出,改一个按钮样式,整个页面都崩了,到最后真的忍不住怀疑,我是不是真的不适合做前端?”

我太懂这种无力感了——当年我刚学CSS的时候,也踩过一模一样的坑,甚至有过深夜对着乱掉的页面,差点砸键盘放弃的瞬间。但今天,我一定要郑重地告诉你:你学不会CSS,真的不是笨,更不是不努力,而是从一开始,你就走错了学习的方向。

很多人学CSS,都陷入了一个致命的误区,也是最容易被忽略的陷阱——把CSS当成了“背属性、拼效果”的工具。今天刷到一个居中技巧,赶紧记在备忘录里;明天看到别人写的炫酷动画,复制粘贴过来凑数;现在更省事,直接丢给AI写,看似省了时间,实则学了个寂寞。

你以为自己学了很多东西,可那些碎片化的知识点,就像一堆散落的砖头,没有框架,没有逻辑,哪怕堆得再高,一阵风就能吹倒。CSS从来不是“堆砌属性”,就像盖房子,你光有砖头水泥不够,得先搭框架、打地基,才能盖出牢固的房子;学CSS也一样,你记再多属性,不懂底层逻辑、没有布局思维,写出来的代码永远是“散的”——出了bug找不到根源,改需求要全盘返工,越写越崩溃,越学越迷茫。

说句掏心窝子的话,我当年也傻过,天天死记硬背属性值,别人写的炫酷效果,我也跟着抄得不亦乐乎,可一到自己独立做项目,还是手忙脚乱,写出来的页面惨不忍睹。直到后来跟着公司的资深前端前辈学习,才突然开窍:CSS的核心,从来不是“记住多少属性”,而是“建立正确的思维”——布局思维、渲染思维、工程化思维。

举个最真实的例子,同样是写一个简单的商品卡片布局,新手和懂思维的开发者,差距真的天差地别:

新手(包括很多依赖AI的人),会直接把图片、标题、价格、按钮,一股脑堆在一个div里,用margin硬调间距,写一堆冗余又杂乱的代码,看似实现了效果,可一旦换个屏幕尺寸,图片和文字直接重叠,按钮跑到页面外面去,改都改不过来;而懂思维的开发者,会先静下心来拆结构、定布局,用最简洁的代码搭建骨架,后期不管是改间距、加功能,还是适配不同屏幕,只需要微调,根本不用全盘返工。

这就是方向的差距:你在死记硬背“怎么写”,纠结于一个属性的用法,而高手在思考“为什么这么写”“怎么写更稳妥”“怎么写能避免后期踩坑”——这也是为什么,同样是学CSS,有人越学越轻松,有人却越学越痛苦。

很多人都说CSS是“玄学”,其实根本不是!它有自己清晰的底层逻辑——层叠、优先级、BFC、渲染机制,只要你吃透这些,你会发现,所有的CSS问题都有章可循,根本不用死记硬背,也不用靠AI抄作业。

说到AI,我必须多提醒一句:AI可以当工具,但绝对不能当老师。它能给你现成的代码,却给不了你“避坑思维”,给不了你“可维护的逻辑”,你抄来的代码,看似省了一时的功夫,后期只会让你踩更多坑、加更多班,到最后,不仅没学会CSS,反而养成了依赖的习惯,越用越废。

如果你现在也正处于“学CSS越学越懵”的状态,如果你也在靠死记硬背、靠AI应付项目,如果你也因为写不好CSS而怀疑自己,不妨停下来,换个方向——先搞懂CSS的底层原理,再建立属于自己的布局思维,最后结合实战案例,把那些碎片化的知识点,串联成一套完整的体系。不用贪多求快,每天吃透一个核心逻辑,练一个实战案例,慢慢你就会发现,原来写CSS,真的可以很轻松,再也不用为了布局错乱、兼容问题而熬夜加班。

我把自己多年实战总结的CSS体系思维、高频避坑技巧、真实项目案例,全都整理成了掘金小册,没有花里胡哨的废话,全是能直接套进项目里的干货,从基础原理到工程化实战,一步步带你找对学习方向,摆脱死记硬背和AI依赖,真正学会写CSS。

除了掘金小册,我今年开始在专耕《CSS 工作坊》专栏,与大家一起探讨 CSS 方面的特性与实战!

不用怕自己基础差,不用怕学不会,只要找对方向,你也能轻松搞定CSS,告别改样式加班的痛苦,摆脱自我怀疑,真正感受到写CSS的乐趣。

最后,想问问正在学CSS的你:你有没有踩过“死记属性”“依赖AI”的坑?有没有因为写不好CSS而崩溃过?评论区聊聊,我帮你避坑,陪你一起把CSS学扎实~

觉得有用的话,点个赞+收藏,跟着我,少走半年弯路,彻底搞定CSS,不再被样式折磨!

别再谈提效了:AI 时代的开发范式本质变了

2026年2月28日 17:58

回应标题,为什么说别再谈提效。只要说的是提效,那思维主体就依旧是人为主导,AI 是辅助。但新时代的开发范式主导权已经发生了迁移:从“人主导、AI 辅助”,变成“AI 主导、人辅助”。

真正该讨论的是——当 AI 成为主要生产者,人该负责什么?

我认为真正的转折点是 Opus 4.6、GPT 5.2/5.3 模型的发布,模型能力大幅增强,使 AI 编码领域上了一个大的阶梯,比如 Remotion。

传统开发范式

不是把原流程每一步都加个 AI 就是新的开发方式,而是需要真正思考AI时代下的新范式,把传统流程里“靠人搬运信息、补齐细节、重复劳动、低效回路”的部分,改成更短的闭环 + 更强的自动验证。

AI 时代,代码是副产品,推理意图才是真资产

在先理解的开发范式之前,先达成一个共识:在 AI 时代下,代码不重要,而为什么这么写代码才是最重要的,推理过程才是真的资产。

代码越来越像编译产物,而推理过程(意图、边界、风险判断)才是可复用的工程资产。

在旧范式里,人写代码 -> 人 review 代码 -> Git 保存 Diff。Git 只记住改了什么,但是没有记住为什么改。

这在 AI 时代下已经不可用,因为 Agent 生成的是海量代码,Agent 代码生成量已经超过人能 review 的上限,如果还靠人去 review,那你的上限就是人,但如果 review 的是意图,那你的上限就约等于 AI 的上限,这两者可能是数十倍的差距。

那开发者不看代码了看什么?看意图。

前一段时间我还认为现在的工程师是 code review 工程师,但是现在我认为可以将前面的 code 去掉了。

新范式里更接近两种形态:

  • Agent 写代码,人 review 意图:人不再逐行读实现,而是确定风险是否可控,验证是否正确
  • 人给意图和标准,Agent 生成代码并自证:人提供目标、边界情况、验收标准,Agent 负责实现、写测试用例、跑验证、给出变更理由和影响范围

我个人目前更倾向于第二种,因为它把人的注意力从实现细节转移到了验收上。

新范式里需要保存推理链路,如果推理过程只存在于上下文窗口中,一旦 Session 结束,上下文就丢失了,这最终只会导致:代码仓库越来越大,项目越来越不可控,因为关键的为什么没有被持久化。

所以,仅仅把代码存进 Git 里已经不够了,还需要一个意图资产库,把推理过程沉淀为可复用、可审计、可持续迭代更新的持久化资产。

理解代码为什么被写出来,比看到代码本身更重要。

面向人工智能编码助手的规范驱动开发 SDD:github.com/Fission-AI/…

开发新范式

各步骤流程具体解释

  • 产品提出需求:需求文档需要尽可能详细,包括背景、用户场景、功能描述、可衡量的目标

  • 可执行需求标准:将大需求变为让 AI 具体可执行、可最终验收的标准

  • AI 制定技术方案、协议约定:让 AI 做技术方案选型,选出最不可能错的那一个

  • AI 制定测试用例:其实也就是先定标准再干活,而不是先干活再去定标准

  • AI 编码、互相 Code Review,然后更新补充、执行测试用例,但是这还不够,要不然迟早会被海量的测试用例拖垮。这里最好能基于 Diff 给出影响范围,智能选择回归,同时给出覆盖率目标,新范式需要像人一样更聪明的跑测试用例

  • 测试验收,将 Bug 提炼给 AI,AI 修复:应当根据需求沉淀用例资产库、缺陷资产库,描述完整的上下文信息,让 AI 能够持续进化

  • 线上观测:这里核心数据应该要脱敏、聚合、建立反馈给 AI 的机制,AI 进行修复后进行归因分析,反哺用例库、知识库,形成数字化资产

这些事情都让 AI 做了,那人做什么?

人负责理解需求 => 拆分需求 => 拆分可独立闭环的目标 => 定执行标准 => 确定技术方案 => 确定用例范围 => 验收,所有做的事情都是为 AI 服务

核心是人设置标准和约束,AI 做事情。

具体执行与挑战

似乎上面每一个步骤想要做好都是挑战?

  • 第一个挑战:如何将产品给出的需求提炼为AI可执行的需求标准? 这需要开发者有较好的需求理解能力、边界设计能力、叠加一定的经验以及一定的产品思维。这一步是我个人认为最难也是最有价值的一步,如果这一环节无法做好,后续的AI开发流程就会受到严重影响。这一步需要产品经理和开发者共同完成,或者开发者独自完成。
  • 第二个挑战:审美。 AI 制定技术方案、编写测试用例作为编码的前置条件,他们确定之后,编码的方向就确定了。而这与个人的技术品味强相关,技术审美比以往更加重要,而拥有较好的技术品味同样是一件不简单的事,这是长期积累的判断力。
  • 第三个挑战:测试自动化。 随着开发规模扩大,尤其是在 AI 生成大量代码的情况下,将面临恐怖的测试规模,需要有一套基于 Git diff 给出影响范围,智能进行回归的方案。
  • 第四个挑战:全自动化运维。 在 AI 时代不仅仅是部署与监控,而是能够出现异常时自动响应并进行调整,从而形成自我进化机制。当然这里面会有一些安全问题,比如核心数据不应该暴露等,过去的CICD流程都需要被重构。
  • 第五个挑战:警惕某些开发者的 Vibe Coding。 你以为是在提速,实际上是在给未来挖坑,不能仅仅为了速度而牺牲技术的长远可维护性。在 AI 时代下,开发者必须时刻保持对代码质量、架构合理性的敏感,这对开发者的要求同样不低。

未来工程师的核心竞争力

过去很多年,会写代码几乎等于有竞争力。谁能写出更复杂、更优雅、更稳定的代码,谁就越值钱。 但 AI 把这条曲线快速拉平了,当实现成本趋近于零,原来旧的招人标准在我现在看来毫无价值。

当写代码不再稀缺,真正稀缺的是什么?我认为接下来最值钱的能力是以下三点:

  • 问题拆解能力:把模糊目标变成 AI 可执行、可细分闭环的任务链
  • 判断能力:决定要做什么、不做什么,确定好每个模块明确的边界
  • 审美能力:知道什么叫能用,什么叫好用,知道什么叫不行。让代码达到可运行的标准,评估推理质量,为生成的代码负责

这些能力并非全新,优秀的工程师和产品经理一直具备这些能力,只是过去被大量实现层的工作掩盖。

以后开发者的角色定义我认为会集中在两个方向:产品工程师Agent 工程师

(1)产品工程师: 也就是接下来的开发者,不仅仅是既懂产品又会写代码,而是能用工程能力把产品落地成可交付的人,他们的核心是利用 AI 写出稳定运行、正确的代码,他们的目标不再是生成代码,而是如何证明代码是对的

(2)Agent 工程师: 他们是新时代的架构师,他们不主要负责写代码,而是负责为 Agent 搭建工作环境与生产流水线——产品工程师执行具体任务,Agent 工程师对整个链路的质量、效率与稳定性负责。有时候 AI 写不出好代码,不是它笨,而是工程结构没搭好,上下文不足。Agent 工程师的核心职责是设计好整个 AI 链路以及 AI 能运行的环境,以及构建最重要的反馈闭环,例如:互相审查 => 测试验证 => 自我修复更新。

思考:如果模型也拥有了这些能力,那工程师还剩下什么?

结语

如果你还在纠结Cursor怎么配置,哪个代码编辑器好用,我直接给你一个判断,你还在考虑怎么用AI写代码,就说明你还没搞懂这场变革,真正的转变是你根本就不需要写代码了。

如果你也看好 AI 行业的长期趋势,愿意把技术落到真实业务、真实用户与真实增长中,做浪潮里的建设者而不是旁观者——我们正在招募同路人。

欢迎联系我:buyaotutoua@163.com(邮件标题建议:AI + 姓名 + 方向)

不越狱能抓到 HTTPS 吗?在未越狱 iPhone 上抓取 HTTPS

2026年2月27日 15:49

这个问题在 iOS 调试中反复出现。

很多人听到“HTTPS”“证书校验”“SSL Pinning”,第一反应就是,是不是必须越狱?

这篇文章在不越狱设备上分别测试三种情况:

  • 普通 HTTPS
  • 启用证书校验的 App
  • 启用双向认证的 App

环境:

  • iPhone(未越狱)
  • 一台 Windows + 一台 Mac
  • 代理工具(Charles / Proxyman)
  • 设备本机抓包工具 SniffMaster

一、代理抓包:不越狱的第一条路径

先测试最基础的方式:代理抓包。

操作步骤

  1. 启动 Charles(或 Proxyman)
  2. 确认代理端口正在监听
  3. iPhone 与电脑连接同一 Wi-Fi
  4. 在 iPhone 的 Wi-Fi 设置中填写代理地址与端口
  5. 在手机上安装并信任证书
  6. 用 Safari 打开一个 HTTPS 网站

如果 Safari 能完整显示请求和响应,说明:

  • 代理路径没问题
  • HTTPS 解密生效
  • 不需要越狱

二、普通 App 的 HTTPS 测试

在同样的代理环境下,打开一个普通测试 App。

结果:

  • 请求可以出现在 Charles 中
  • HTTPS 内容可正常解密
  • 请求体与响应体完整

这一步可以确认在未启用额外安全校验的情况下,不越狱完全可以抓到 HTTPS。


三、遇到证书校验(SSL Pinning)

接下来测试一个启用了证书校验的 App。

操作保持不变,只替换测试 App。

现象:

  • App 提示网络错误
  • Charles 中只出现握手失败或无请求记录

代理路径仍然有效,Safari 仍然可以抓到数据。

说明:

  • 阻断发生在 App 内部
  • 系统信任代理证书不代表 App 会信任

在这里继续重复安装证书不会改变结果。


四、是否必须越狱才能继续?

不越狱依然有两种路径可以尝试。

路径一:分析握手层

可以通过底层抓包确认:

  • 是否存在 TLS ClientHello
  • 是否建立 TCP 连接

如果 TLS 握手存在,说明流量确实发出,只是代理无法接管。


路径二:设备本机抓包

这里切换抓包方式。

使用 SniffMaster 进行设备本机 HTTPS 抓包

SniffMaster 支持通过 USB 在电脑上直接抓取 iOS 设备流量。

操作步骤

  1. 用 USB 将 iPhone 连接电脑
  2. 保持设备解锁并点击“信任此电脑”
  3. 启动 SniffMaster
  4. 在设备列表中选择对应 iPhone
  5. 按提示安装驱动与描述文件
  6. 进入 HTTPS 暴力抓包模式
  7. 点击开始
  8. 触发 App 请求

没有配置 Wi-Fi 代理,也没有安装代理证书。 暴力抓包


五、证书校验 App 的抓包结果

在设备抓包模式下测试同一个启用证书校验的 App。

结果:

  • 请求可以看到
  • HTTPS 内容显示正常
  • 未出现握手失败

区别来自抓包场景。

代理模式依赖替换证书,设备直接抓包不依赖中间人证书。


六、当请求体为空时的判断

如果抓到的 HTTPS 中:

  • URL 可见
  • Header 可见
  • Body 为空

这与越狱无关,而与签名有关。

若测试的是 App Store 下载的应用,需要:

  1. 获取 IPA
  2. 使用 iOS 开发证书重签
  3. 重新安装
  4. 再次抓包

完成后,请求体与响应体可完整显示。


七、双向认证(mTLS)的测试

在双向认证场景中:

  • 代理抓包会在握手阶段失败
  • 设备级抓包仍可观察到 TLS 会话

关键点是抓包工具是否依赖代理替换证书

参考链接:www.sniffmaster.net/tutorial/zh…

一周重写 Next.js?Cloudflare 和 AI 做到了😍😍😍

作者 Moment
2026年2月27日 09:04

我正在开发 DocFlow,它是一个完整的 AI 全栈协同文档平台。该项目融合了多个技术栈,包括基于 Tiptap 的富文本编辑器、NestJs 后端服务、AI 集成功能和实时协作。在开发过程中,我积累了丰富的实战经验,涵盖了 Tiptap 的深度定制、性能优化和协作功能的实现等核心难点。

如果你对 AI 全栈开发、Tiptap 富文本编辑器定制或 DocFlow 项目的完整技术方案感兴趣,欢迎加我微信 yunmz777 进行私聊咨询,获取详细的技术分享和最佳实践。

上周,一名工程师和一套 AI 模型从零重写了最流行的前端框架。产物叫 vinext(读作 "vee-next"),是基于 Vite 的 Next.js 替代实现,一条命令就能部署到 Cloudflare Workers。早期基准测试里,生产构建快至多约 4 倍,客户端包体积最多小约 57%。已有客户在生产环境跑它。整件事大约花了一千一百美元左右的 token 成本。

Next.js 的部署难题

Next.js 是最流行的 React 框架,数百万开发者在用,也支撑着大量线上站点,原因很简单:开发体验很好。

但在更广的 serverless 生态里,Next.js 的部署是个问题。工具链完全是自成一派的:Next.js 在 Turbopack 上投入很大,可如果你要部署到 Cloudflare、Netlify 或 AWS Lambda,就得把构建产物再"捏"成目标平台能跑的样子。

你可能会想:"OpenNext 不就是干这个的吗?"没错。OpenNext 就是为解决这个问题而生的,包括 Cloudflare 在内的多家厂商都往里投了不少工程资源。它能用,但很快就会撞到各种限制,变成打地鼠游戏。

在 Next.js 的构建产物之上再建一层,被证明既难又脆。OpenNext 需要反推 Next.js 的构建输出,结果就是版本之间难以预测的变动,修起来很费劲。

Next.js 一直在做一等公民的 adapters API,Cloudflare 也在和他们协作。但这仍是早期工作,而且即便有了 adapters,你依然建立在 Turbopack 这套专属工具链上。Adapters 只覆盖构建和部署;开发阶段里,next dev 只在 Node.js 里跑,没法插别的运行时。如果你的应用用了 Durable Objects、KV、AI bindings 这类平台能力,在开发环境里想测这些代码,就得搞一堆变通。

vinext 是什么

换个思路:与其去适配 Next.js 的产出,不如在 Vite 上直接重写一套 Next.js 的 API。Vite 是 Next.js 之外大多数前端生态用的构建工具,Astro、SvelteKit、Nuxt、Remix 等都基于它。要的是干净的重实现,而不是包一层或写个 adapter。他们一开始也没把握能成,但现在是 2026 年,造软件的成本已经彻底变了。

结果比预期走得远得多。

把脚本里的 next 换成 vinext,其余基本不用动。现有的 app/pages/next.config.js 都能直接用。安装方式如下:

npm install vinext

常用命令和 Next 类似,只是把 next 换成 vinext

vinext dev    # 开发服务器,带 HMR
vinext build  # 生产构建
vinext deploy # 构建并部署到 Cloudflare Workers

这不是包在 Next.js 和 Turbopack 输出外面的一层皮,而是对同一套 API 的另一种实现:路由、服务端渲染、React Server Components、server actions、缓存、中间件,全部作为 Vite 插件搭在 Vite 之上。更重要的是,借助 Vite Environment API,Vite 的产出可以在任意平台上跑。

数据表现

早期基准测试看起来不错。他们用一套共 33 个路由的 App Router 应用,对比了 vinext 和 Next.js 16。两边做的是同一类事:编译、打包、准备服务端渲染路由。在 Next.js 的构建里关掉了 TypeScript 类型检查和 ESLint(Vite 构建阶段本来也不跑这些),并对 Next.js 使用了 force-dynamic,避免多花时间预渲染静态路由,否则会不公平地拉低 Next 的数字。目标只衡量打包和编译速度。

生产构建时间大致如下(原文表格,此处保留结构):

框架 平均耗时 相对 Next.js
Next.js 16.1.6 (Turbopack) 7.38s 基线
vinext (Vite 7 / Rollup) 4.64s 约 1.6 倍快
vinext (Vite 8 / Rolldown) 1.67s 约 4.4 倍快

客户端包体积(gzip 后):

框架 Gzip 后 相对 Next.js
Next.js 16.1.6 168.9 KB 基线
vinext (Rollup) 74.0 KB 约小 56%
vinext (Rolldown) 72.9 KB 约小 57%

这些数字测的是编译和打包速度,不是线上服务性能;测试用例是单个 33 路由应用,不能代表所有生产场景。他们预期三个项目继续演进后数字会变。完整方法论和历史结果 是公开的,可以当作方向性参考,而非定论。

方向是令人鼓舞的。Vite 的架构,尤其是 Rolldown(Vite 8 里即将到来的 Rust 打包器),在构建性能上有结构性优势,在这里已经能看出来。

部署到 Cloudflare Workers

vinext 把 Cloudflare Workers 当作首选部署目标。一条命令从源码到线上 Worker:

在项目里执行即可完成构建、自动生成 Worker 配置并完成部署:

vinext deploy

App Router 和 Pages Router 都能在 Workers 上跑,包括完整的客户端注水、交互组件、客户端导航和 React 状态。

生产缓存方面,vinext 自带 Cloudflare KV 的缓存处理器,开箱即用 ISR(增量静态再生成)。在代码里设置一次即可:

import { KVCacheHandler } from "vinext/cloudflare";
import { setCacheHandler } from "next/cache";
setCacheHandler(new KVCacheHandler(env.MY_KV_NAMESPACE));

对多数应用来说 KV 就够用了,但缓存层设计成可插拔的。setCacheHandler 意味着你可以换成任何合适的后端,例如大缓存体或不同访问模式更适合用 R2。他们也在改进 Cache API,目标是少配置也能有强缓存能力。总之是尽量灵活,按应用选策略。

当前已有线上示例:App Router Playground、Hacker News 克隆、App Router 与 Pages Router 最小示例等,见 vinext 文档与仓库。还有一例 Cloudflare Agents 在 Next 风格应用里跑,不再需要 getPlatformProxy 之类的变通,因为整个应用在开发与部署阶段都跑在 workerd 里,Durable Objects、AI bindings 等 Cloudflare 能力都可以直接使用,示例见 vinext-agents-example

框架是团队协作的事

当前部署目标是 Cloudflare Workers,但这只占一小部分。vinext 里大约 95% 是纯 Vite:路由、模块 shim、SSR 管线、RSC 集成,没有 Cloudflare 专属逻辑。

Cloudflare 希望和其他托管方一起,让这套工具链也能服务他们的用户(迁移成本很低,他们在 Vercel 上不到 30 分钟就跑通了一个 PoC)。这是开源项目,长期看需要和生态里的伙伴一起投入。欢迎其他平台的 PR;若要加部署目标,可以 提 issue 或直接联系。

状态:实验性

vinext 目前是实验性的。诞生不到一周,还没有经过有规模的流量验证。若你要在生产应用里评估,请保持适当谨慎。

另一方面,测试覆盖面不小:超过 1,700 个 Vitest 用例和 380 个 Playwright E2E,包括从 Next.js 和 OpenNext 的 Cloudflare 一致性套件移植的测试。他们对照 Next.js App Router Playground 做过验证,对 Next.js 16 API 的覆盖约 94%。已有真实客户在试,反馈不错;例如 National Design Studio 在 beta 站点 CIO.gov 上已经用 vinext 跑生产,构建时间和包体积都有明显改善。

README 里老实写了 不打算支持以及不会支持的内容已知限制,尽量坦诚、少过度承诺。

预渲染呢?

vinext 已经支持增量静态再生成(ISR),首访某页后会被缓存并在后台再验证,和 Next.js 行为一致。这部分已经可用。

vinext 目前还不支持构建时静态预渲染。Next.js 里没有动态数据的页面会在 next build 时渲染成静态 HTML;有动态路由时用 generateStaticParams() 枚举要提前构建的页面。vinext 暂时不做这件事。

这是发布时的刻意取舍,路线图上有计划。若你的站是 100% 预生成静态 HTML,眼下从 vinext 获益可能有限。反过来说,若一名工程师花一千多美元 token 就能重写一版 Next.js,你大概也能花很少成本迁到 Astro 这类为静态内容设计的 Vite 系框架(Astro 也能部署到 Cloudflare Workers)。

对非纯静态站点,他们想做得比"构建时全量预渲染"更好一点。

流量感知预渲染(TPR)

Next.js 会在构建时把 generateStaticParams() 列出的页面都预渲染一遍。一万个商品页就意味着构建时渲染一万次,哪怕其中 99% 可能永远不会被请求。构建时间随页面数近似线性增长,这也是为什么大型 Next.js 站点的构建会拖到三十分钟级别。

于是他们做了"流量感知预渲染"(Traffic-aware Pre-Rendering,TPR)。目前是实验性的,计划在更多真实场景验证后作为默认选项。

思路很简单。Cloudflare 已经是站点的反向代理,拥有流量数据,知道哪些页面真的被访问。所以既不必全预渲染,也不必完全不预渲染:在部署时查 Cloudflare 的 zone 分析,只预渲染真正重要的页面。

使用方式是在部署时打开实验开关:

vinext deploy --experimental-tpr

输出会包含类似:分析最近 24 小时流量、统计独立路径数、按流量覆盖(例如 90%)选出要预渲染的页面数量、预渲染耗时并写入 KV 缓存等。

对于十万级商品页的站点,幂律分布下往往 50~200 个页面就覆盖了 90% 的流量。这些页面几秒内预渲染完,其余走按需 SSR,首访后再通过 ISR 缓存。每次部署都会根据当前流量重新算一遍集合,突然爆红的页面会被自动纳入。全程不需要 generateStaticParams(),也不用把构建和线上数据库绑死。

用 AI 再挑战一次 Next.js

这类项目通常要一个团队做几个月甚至几年。多家公司都试过,范围实在太大。Cloudflare 自己也试过一次。两套路由、三十多个模块 shim、服务端渲染管线、RSC 流式、文件系统路由、中间件、缓存、静态导出……没人做成是有原因的。

这次他们在一周内做到了。一名工程师(头衔是工程经理)带着 AI 一起干。

首笔提交在 2 月 13 日。当晚 Pages Router 和 App Router 都有了基础 SSR,中间件、server actions 和流式也跑通了。第二天下午,App Router Playground 已经能渲染 11 个路由里的 10 个。第三天,vinext deploy 能把应用完整部署到 Cloudflare Workers,包括客户端注水。后面几天主要是收口:修边界情况、扩测试、把 API 覆盖拉到约 94%。

和以前几次尝试相比,变的是 AI 强了很多。

为什么这个问题适合交给 AI

不是所有项目都适合这么搞。这次能成,是因为几件事同时满足。

Next.js 有清晰、成文的规范:文档多、用户多、Stack Overflow 和教程里到处都是,API 表面在训练数据里很常见。让 Claude 实现 getServerSideProps 或解释 useRouter 怎么用,它不会乱编,因为它"见过" Next 是怎么工作的。

Next.js 有庞大的测试套件。Next.js 仓库 里有大量 E2E,覆盖各种功能和边界。他们直接移植了其中的测试(代码里有注明来源),等于拿到一份可以机械验证的规格。

Vite 是很好的底座。Vite 解决了前端工具里最难的那块:快 HMR、原生 ESM、清晰的插件 API、生产打包。不需要从零做打包器,只要教它"说" Next.js。@vitejs/plugin-rsc 还在早期,但已经能提供 React Server Components 支持,不必自己实现一整套 RSC。

模型能力跟上了。他们认为哪怕早几个月都很难做成。以前的模型在这么大代码库上很难保持连贯;新模型能把整体架构放在上下文里,推理模块间关系,并经常写出正确代码,让迭代能持续下去。有时会看到它钻进 Next、Vite、React 内部去查 bug。当前最好的模型已经足够好用,而且还在变好。

这几条必须同时成立:目标 API 文档好、测试全、底层构建工具靠谱、模型真的能驾驭这种复杂度。少一条,效果都会打折扣。

实际是怎么做的

vinext 里几乎每一行都是 AI 写的。但更关键的是,每一行都过同样的质量关:人类写的代码也会走的那些门。项目里有 1,700+ Vitest、380 Playwright E2E、通过 tsgo 的完整 TypeScript 检查、通过 oxlint 的 lint,CI 在每个 PR 上全跑一遍。定好这些护栏,是让 AI 在代码库里高效的前提。

流程从规划开始。作者在 OpenCode 里和 Claude 花了几小时来回推敲架构:建什么、什么顺序、用什么抽象。那份计划成了北极星。之后就是固定循环:

  1. 定义一个任务(例如"实现 next/navigation 的 shim,包含 usePathnameuseSearchParamsuseRouter")。
  2. 让 AI 写实现和测试。
  3. 跑测试。
  4. 过了就合并,不过就把错误输出给 AI 继续改。
  5. 重复。

他们还接了 AI 做 Code Review:PR 打开后有 agent 审,审完的评论由另一个 agent 改。反馈环大部分是自动的。

并不是每次都对。有些 PR 就是错的,AI 会很有把握地实现一个"看起来对"但和 Next.js 实际行为不一致的东西。作者经常要纠偏。架构决策、优先级、判断什么时候 AI 在走死胡同,都是人在做。给 AI 好的方向、上下文和护栏,它可以很出活,但掌舵的还得是人。

浏览器级测试用了 agent-browser,用来验证真实渲染结果、客户端导航和注水行为。单测会漏掉很多浏览器侧的细节,这样能补上。

整个项目在 OpenCode 里跑了超过 800 次会话,总成本大约一千一百美元(Claude API token)。

对软件意味着什么

我们为什么有这么多层?这个项目逼着作者认真想这个问题,以及 AI 会怎么改变答案。

软件里大多数抽象的存在,是因为人需要帮忙。我们没法把整个系统装进脑子,于是用一层层东西来管理复杂度,每一层让下一个人的工作轻松一点。框架叠框架、包装库、成千上万行胶水代码,就是这么来的。

AI 没有同样的限制。它可以把整个系统放在上下文里,直接写代码,不需要中间框架来"帮人类理清思路",只需要规格和一块可建的底座。

哪些抽象是真正的基础设施,哪些只是人类认知的拐杖,现在还不清楚。这条线未来几年会大幅移动。但 vinext 是一个数据点:他们拿了一份 API 契约、一个构建工具和一个 AI 模型,中间全是 AI 写的,没有额外的中间框架。他们认为这种模式会在很多软件上重演,我们多年来叠上去的层,不会全部留下。

致谢

感谢 Vite 团队。Vite 是整个项目的基础。@vitejs/plugin-rsc 虽还在早期,但提供了 RSC 支持,否则要从零实现 RSC 会直接卡死。作者把插件推到以前没人测过的场景时,Vite 维护者响应很快、帮了很多忙。

也感谢 Next.js 团队。他们用多年把 React 开发的标杆拉高,API 文档和测试套件如此完善,是 vinext 能做成的重要前提。没有他们立下的标准,就没有 vinext。

试试看

vinext 提供 Agent Skill,可以帮你做迁移,支持 Claude Code、OpenCode、Cursor、Codex 等。安装后打开 Next.js 项目,让 AI 执行迁移即可。

安装 vinext 的 Agent Skill(在支持的工具里执行):

npx skills add cloudflare/vinext

然后在任意支持的工具里打开 Next.js 项目,对 AI 说:

"把这个项目迁移到 vinext"

Skill 会做兼容检查、依赖安装、配置生成和开发服务器启动,并标出需要人工处理的部分。

若想手动迁移,可以用:

npx vinext init   # 从现有 Next.js 项目迁移
npx vinext dev    # 启动开发服务器
npx vinext deploy # 部署到 Cloudflare Workers

源码在 github.com/cloudflare/…,欢迎提 issue、PR 和反馈。

「寻找年味」 沸点活动|获奖名单公示🎊

作者 掘金酱
2026年2月26日 18:15

image.png

🎉2026年 「寻找年味」 沸点活动正式落幕啦!

这个春节,我们在沸点里看见了无数动人瞬间:有故乡的烟火,有团圆的温柔,也有坚守岗位的专注与光芒。

每一条沸点,都是最真实、最可爱、最有温度的新年风景。

感谢每一位技术er,用文字、照片与 AI 创作,记录独属于程序员的春节时光,让技术与年味温暖相融。

由于获奖用户较多,详细的名单公示如下:[2026年 寻找年味沸点活动_获奖名单]

如何快速找到自己:进入飞书表格后,使用 Ctr+F 搜索自己的用户名或 用户id,选择“所有工作表”( 用户 id 即掘金主页 juejin.cn/XXXXXX 最后的一串数字)。之前打开过本表同学请刷新表单,奖项或有增补,以最新的表格为准。

image.png

活动奖品

沸点将根据评审团综合打分互动量得分加权计算。

✅ 常规内容赛道:TOP10 赢 SZCOOL 无线蓝牙音箱

✅ AI 内容赛道: TOP10 拿下「窝在一起」瓦楞猫窝

✅阳光普照奖:凡是符合#寻找年味主题的且有互动(排除作者自己点赞和评论)的沸点内容皆可获得10w 矿石

领奖方式

  • 获得上述奖项的掘友近期请注意 [系统消息] (预计2026年2月26日23:00前下发),请于 2026 年 3 月 4 日 23 点 之前在相关问卷中填写信息,过期将视为放弃奖品。
  • 常规内容赛道/AI 内容赛道 奖品将于问卷截止日期后的 30 个工作日内完成发放。
  • 阳光普照奖矿石将于2026年3月5日(申述时间截止后)发放。

若对获奖名单有疑问,请先自查沸点,确认无以下原因后,可点击联系 爱专研的安东尼 进行申诉。 申诉处理时间2026年2月26日-2026年3月4日,过时维持原结果。

  • 参赛沸点互动量按「点赞数 + 评论数」总和核算;

  • 同一用户发布多条内容,仅取数据最优一条参与排名获奖;

  • 无意义水评论、刷赞等违规行为,剔除获奖资格;

  • 数据统计截止至 2026 年 2 月 23 日 23:59,逾期新增互动不计入;

  • 禁止引战/制造冲突/谩骂内容,或者发现引战内容剔除获奖资格;

  • 评审团会依据沸点内容质量进行打分排序;

  • 非AI相关内容都归类为常规内容赛道,两个赛道不重复获奖,阳光普照奖可叠加赛道获奖;

再次感谢所有掘友的热情参与,愿大家新的一年代码无 Bug、技术节节高、万事皆顺遂!

2026 春晚魔术大揭秘:作为程序员,分分钟复刻一个 | 掘金一周 2.26

作者 掘金一周
2026年2月26日 17:07

本文字数1300+ ,阅读时间大约需要 4分钟。

【掘金一周】本期亮点:

「上榜规则」:文章发布时间在本期「掘金一周」发布时间的前一周内;且符合各个栏目的内容定位和要求。 如发现文章有抄袭、洗稿等违反社区规则的行为,将取消当期及后续上榜资格。

一周“金”选

掘金一周 文章头图 1303x734.jpg

内容评审们会在过去的一周内对社区深度技术好文进行挖掘和筛选,优质的技术文章有机会出现在下方榜单中,排名不分先后。

前端

2026 春晚魔术大揭秘:作为程序员,分分钟复刻一个(附源码) @程序员Sunday

在这个 App 进入“魔术模式”后,键盘事件已经被 e.preventDefault() 拦截了。无论你按哪个数字键,屏幕上只会依次显示程序预设好的那个 差值字符串

我写了个 code-review 的 Agent Skill, 没想到火了 @神三元

四份 checklist 的内容加起来好几千字,如果全塞进 SKILL.md,一上来就会吃掉大量上下文窗口。所以我把它们放在 references/ 里,SKILL.md 里只在需要的步骤写 Load references/xxx.md

构建工具的第三次革命:从 Rollup 到 Rust Bundler,我是如何设计 robuild 的 @sunny_

robuild 就是为解决这些问题而设计的。它基于 Rolldown(Rust 实现的 Rollup 替代品)和 Oxc(Rust 实现的 JavaScript 工具链),专注于库构建场景。

后端

Spring 源码分析 事务管理的实现原理(下) @暮色妖娆丶

这里我以 SpringBoot 源码入口为起点,画了一个相关的流程图,包含了 SpringBoot、Spring 事务、Spring AOP、Spring 事件、BeanFactoryPostProcessor、BeanPostProcessor 等所有 Spring 知识,以及相关模块之间的交互联系

一站式了解Agent Skills @想用offer打牌

因为skills只会暴露name和description,所以agent会自己判断什么场景使用这个skill,就像我们玩游戏一样,脑子已经潜移默化这种场景使用这种skills或者按这样的顺序将多种skills结合使用。

Android

丰田正在使用 Flutter 开发游戏引擎 Fluorite @恋猫de小郭

对于 Fluorite ,目前主要的集成方式就是用 FluoriteView 在 Flutter App 中添加多个 3D 视图,所以可以直接用 Flutter 生态,而 C++ 核心确保在低端硬件(如车载屏)实现主机级效果,避免 Godot 等开源引擎的启动慢/资源重问题。

Flutter 设计包解耦新进展,material_ui 和 cupertino_ui 发布预告 @恋猫de小郭

未来 Flutter 在 Framework 内将不带任何 material 和 cupertino 样式,你可以根据需要选择样式库,甚至觉得使用哪个样式库版本,最重要的是:不升级 Flutter 版本也可以更新最新的设计样式,同时控件 Bug 也可以得到更快的修复和发布

把离线AI代理装进口袋里 @稀有猿诉

正如你的输入是结构化的一样,模型的输出也是结构化的。模型不会仅仅返回一个最终的文本块。相反,它会返回一个 ModelResponse 对象流,其中每个对象代表一种不同的输出类型。模型可能生成纯文本,也可能决定调用你的某个函数。

人工智能

你知道不,你现在给 AI 用的 Agent Skills 可能毫无作用,甚至还拖后腿? @恋猫de小郭

高质量技能 = 搜索空间压缩器,它可以限定决策路径、减少无效探索、提供验证锚点和显式化领域隐性流程,这才是 Skills 能推高 Pareto frontier 的原因,所以,你需要避免百科式技能,它可能带来的更多的噪音。

Rust 编写的 40MB 大小 MicroVM 运行时,完美替代 Docker 作为 AI Agent Sandbox @RoyLin

在 AI Agent 时代,安全的代码执行环境不再是可选项,而是基础设施。A3S Box 正是为这个时代而生的运行时——它让每一行不可信的代码都运行在硬件隔离的沙箱中,让每一字节敏感数据都受到硬件加密的保护,同时让开发者感觉就像在使用 Docker 一样简单。

Skills 实战:让 AI 成为你的领域专家 @冬奇Lab

description 是 Skill 触发的关键。Claude 根据用户请求与 description 的匹配度决定是否加载该 Skill。

📖 投稿专区

大家可以在评论区推荐认为不错的文章,并附上链接和推荐理由,有机会呈现在下一期。文章创建日期必须在下期掘金一周发布前一周以内;可以推荐自己的文章、也可以推荐他人的文章。

AI 全栈时代的工程化护栏:Vben-Nest 让 Mock 契约落地成真实后端

作者 _Jude
2026年2月26日 16:28

AI 让“会写代码的人”变多了,但也让一个老问题更显眼:写得快并不等于做得稳。

  • vibe coding 很爽,但在企业协作/长期迭代里,如果缺少统一规范与质量门禁,返工会非常昂贵
  • 前后端分离的联调常常依赖 Apifox MCP 这类工具,但契约同步、环境对齐、反复调用依然麻烦,而且还会额外消耗 token
  • 真正吸引人的方向是“全栈 + AI”:把重复劳动交给 AI,把关键决策交给工程化与契约,降低个人做全栈的门槛

这份项目模板 vben-nest 的核心思路很直接:在 vben 的工程化底座上新增 Nest 后端(apps/server),并让后端接口完全适配 vben 原本的 mock 契约。前端调用尽量不改,只替换“接口实现者”,从 mock 走向真实后端更平滑、更可演进。

1. 为什么不是“随便写接口”:全栈的关键是契约

一个判断是:随着 AI 能力增强,“前后端一个人搞”会越来越常见。原因并不神秘:很多业务难点不在 UI 或 API 的某一端,而在业务逻辑本身。

当 AI 能把样板代码写得更快时,决定交付质量的就变成了:

  • 需求是否能被拆成稳定的接口契约
  • 前后端是否能围绕同一套领域模型演进
  • 多人协作与长期维护里,边界是否足够清晰、成本是否可控

社区里也常有人吐槽 vben“很重”。但把视角从“个人 demo”切换到“企业协作”,它更像是在提前支付必要成本:规范、质量门禁、脚手架、依赖治理,以及可观测的目录结构与约定。

2. vibe coding 的边界:快要配护栏

vibe coding 适合快速验证与个人实验;一旦进入团队协作或中长期迭代,劣势会迅速放大:风格不一致、边界不清晰、可测试性差、回归成本高。AI 越强,越容易把“写得像能跑”误当成“工程上可交付”。

更理想的组合是:

  • AI 负责加速产出
  • 工程化负责约束质量与一致性

3. 这个项目做了什么:用 Nest 复刻并升级 vben mock

vben 自带的后端 mock(见 apps/backend-mock)已经非常接近真实后端:它不是浏览器里的 mock.js,而是一套独立服务实现,因此能覆盖文件上传、复杂逻辑、鉴权流程等更真实的场景。

vben-nest 在此基础上新增了 Nest 后端(见 apps/server),并坚持一个核心原则:

前端不改接口调用方式,只替换“实现者”。

路径、方法、响应结构、鉴权方式尽量保持一致,从 mock 平滑迁移到真实后端;甚至可以做到“先跑起来,再逐步替换为真实业务”。

4. 兼容策略与实现要点:对齐调用习惯,也保留演进空间

4.1 接口层:兼容 vben 的响应约定

Nest 侧做了三件事,降低前端切换成本:

  • 使用全局前缀 api,对齐 vben 的请求习惯(见 main.ts
  • 通过全局响应拦截器,统一包装为 { code, data, message, error }(见 ResponseInterceptor
  • 通过全局异常过滤器,统一输出错误结构(见 HttpExceptionFilter

这样前端的请求层可以继续使用原有的 code/data 成功码约定(例如 playground 的请求拦截器依旧按 successCode: 0 解析,见 request.ts)。

4.2 鉴权层:从"能用"到"更像生产"

很多 mock 方案只做"伪登录",但真实项目通常会有 refresh token、cookie 策略、过期处理等细节。Nest 侧采用:

这让"用模板练全栈"更贴近真实业务,而不是停留在 demo 层面。

4.3 数据层:从 mock-data 到可落库演进

为了让后端具备真实项目的演进空间,这里引入:

可以先用 seed 数据把前端跑通,再逐步把 mock-data 替换成真实表结构与业务逻辑。

4.4 工程化:把 vben 的护栏带到后端

这类模板的“核心价值”不在于多写了几个接口,而在于少搭了一套后端工程规范。

Nest 同样处于 TypeScript 生态,把后端放进 vben 的 monorepo 后,直接获得:

  • 代码格式化与 lint 约束(Prettier/ESLint/Stylelint)
  • Git Hooks 质量门禁(Lefthook + Commitlint,见 lefthook.yml
  • 统一依赖治理与脚本编排(pnpm workspace + turbo)

在 AI 参与编码的前提下,这套护栏能把“产出速度”与“可维护性”拉到一个更平衡的位置。

5. 快速开始

环境要求

  • Node.js >= 20.19
  • pnpm >= 10(推荐使用 corepack)
  • Docker Desktop(推荐,用于一键启动 PostgreSQL)

安装与运行

# 克隆项目
git clone https://github.com/MiniJude/vben-nest.git
cd vben-nest

# 安装依赖
npm i -g corepack
pnpm install

# 启动数据库 (Docker Compose)
docker compose -f apps/server/docker-compose.yml up -d

# 初始化数据库结构与种子数据
pnpm -F @vben/server run db:init

# 启动后端服务
pnpm -F @vben/server run dev
# 默认端口:3000

# 启动前端 Playground (新开终端)
pnpm dev:play

默认账号

  • vben / 123456
  • admin / 123456
  • jack / 123456

6. 目录结构速览:前端多方案 + 后端可落库

  • 前端应用:apps/web-*(多 UI 方案示例)
  • Playground:playground/(用于快速体验与联调)
  • Mock 服务:apps/backend-mock/(vben 原生 mock 服务实现)
  • Nest 后端:apps/server/(本项目新增,适配 mock 契约)

6. 全栈 + AI 的实践建议:契约先行

更推荐的协作方式是“契约先行(Contract First)”:

  • 先把接口契约稳定下来(URL、method、code/data 结构、分页约定、错误约定)
  • 再让 AI 生成 DTO、Controller、Service、Prisma Schema 的骨架
  • 依靠 lint、类型系统、提交规范,把代码质量稳定在可维护区间

前后端分离下,工具链可以帮助联调,但契约才是协作的“唯一事实来源”。

7. 适合谁使用

  • 想从 vben 上手全栈的前端同学:用熟悉的前端工程体系,把后端也纳入同一条流水线
  • 想快速搭建企业级全栈脚手架的小团队:少做重复造轮子,把精力留给业务
  • 想在真实工程约束下用 AI 提效的人:让 AI 提速,同时保留规范与可演进性

8. 项目链接

github.com/MiniJude/vb…

9. 后续演进

当前版本更偏向“理念与路径”的最小落地:先把接口契约与工程化护栏对齐,让 mock 能平滑替换为可演进的真实后端。面向生产的后端工程还需要持续补齐监控、日志、权限、安全、审计等能力,本项目也会按迭代节奏逐步完善。

Sentinel Java客户端限流原理解析|得物技术

作者 得物技术
2026年2月26日 13:33

一、从一次 HTTP 请求开始

在一个生产环境中,服务节点通常暴露了成百上千个 HTTP 接口对外提供服务。为了保证系统的稳定性,核心 HTTP 接口往往需要配置限流规则。给 HTTP 接口配置限流,可以防止突发或恶意的高并发请求耗尽服务器资源(如 CPU、内存、数据库连接等),从而避免服务崩溃或引发雪崩效应。

基础示例

假设我们有下面这样一个 HTTP 接口,需要给它配置限流规则:

@RestController
@RequiredArgsConstructor
@RequestMapping("/demo")
public class DemoController {

    @RequestMapping("/hello")
    @SentinelResource("test_sentinel")
    public String hello() {
        return "hello world";
    }
}

使用起来非常简单。首先我们可以选择给接口加上 @SentinelResource 注解(也可以不加,如果不加 Sentinel 客户端会使用请求路径作为资源名,详细原理在后面章节讲解),然后到流控控制台给该资源配置流控规则即可。

二、限流规则的加载

限流规则的生效,是从限流规则的加载开始的。聚焦到客户端的 RuleLoader 类,可以看到它支持了多种规则的加载:

  • 流控规则;
  • 集群限流规则;
  • 熔断规则;
  • ......

RuleLoader 核心逻辑

RuleLoader 类的核心作用是将这些规则加载到缓存中,方便后续使用:

public class RuleLoader {

    /**
     * 加载所有 Sentinel 规则到内存缓存
     *
     * @param sentinelRules 包含各种规则的配置对象
     */
    public static void loadRule(SentinelRules sentinelRules) {
        if (sentinelRules == null) {
            return;
        }

        // 加载流控规则
        FlowRuleManager.loadRules(sentinelRules.getFlowRules());
        // 加载集群流控规则
        RuleManager.loadClusterFlowRule(sentinelRules.getFlowRules());

        // 加载参数流控规则
        ParamFlowRuleManager.loadRules(sentinelRules.getParamFlowRules());
        // 加载参数集群流控规则
        RuleManager.loadClusterParamFlowRule(sentinelRules.getParamFlowRules());

        // 加载熔断规则
        DegradeRuleManager.loadRules(sentinelRules.getDegradeRules());

        // 加载参数熔断规则
        ParamDegradeRuleManager.loadRules(sentinelRules.getParamDegradeRules());

        // 加载系统限流规则
        SystemRuleManager.loadRules(sentinelRules.getSystemRules());
    }
}

流控规则加载详情

以流控规则的加载为例深入FlowRuleManager.loadRules 方法可以看到其完整的加载逻辑:

public static void loadRules(List<FlowRule> rules) {
    // 通过动态配置属性更新规则值
    currentProperty.updateValue(rules);
}

updateValue 方法负责通知所有监听器配置变更:

public boolean updateValue(T newValue) {
    // 如果新旧值相同,无需更新
    if (isEqual(value, newValue)) {
        return false;
    }
    RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);

    // 更新配置值
    value = newValue;
    // 通知所有监听器配置已更新
    for (PropertyListener<T> listener : listeners) {
        listener.configUpdate(newValue);
    }
    return true;
}

FlowPropertyListener 是流控规则变更的具体监听器实现:

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    @Override
    public void configUpdate(List<FlowRule> value) {
        // 构建流控规则映射表(按资源名分组)
        Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
        if (rules != null) {
            // 清空旧规则
            flowRules.clear();
            // 加载新规则
            flowRules.putAll(rules);
        }
        RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
    }
}

三、SentinelServletFilter 过滤器

在 Sentinel 中,所有的资源都对应一个资源名称和一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建。Entry 是限流的入口类,通过 @SentinelResource 注解的限流本质上也是通过 AOP 的方式进行了对 Entry 类的调用。

Entry 的编程范式

Entry 类的标准使用方式如下:

// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串
try (Entry entry = SphU.entry("resourceName")) {
    // 被保护的业务逻辑
    // do something here...
} catch (BlockException ex) {
    // 资源访问阻止,被限流或被降级
    // 在此处进行相应的处理操作
}

Servlet Filter 拦截逻辑

对于一个 HTTP 资源,在没有显式标注 @SentinelResource 注解的情况下,会有一个 Servlet Filter 类 SentinelServletFilter 统一进行拦截:

public class SentinelServletFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest) request;
        Entry urlEntry = null;

        try {
            // 获取并清理请求路径
            String target = FilterUtil.filterTarget(sRequest);

            // 统一 URL 清理逻辑
            // 对于 RESTful API,必须对 URL 进行清理(例如将 /foo/1 和 /foo/2 统一为 /foo/:id),
            // 否则上下文和资源的数量会超过阈值
            SentinelUrlCleaner urlCleaner = SentinelUrlCleaner.SENTINEL_URL_CLEANER;
            if (urlCleaner != null) {
                target = urlCleaner.clean(sRequest, target);
            }

            // 如果请求路径不为空且非安全扫描,则进入限流逻辑
            if (!StringUtil.isEmpty(target) && !isSecScan) {
                // 解析来源标识(用于来源限流)
                String origin = parseOrigin(sRequest);
                // 确定上下文名称
                String contextName = webContextUnify
                    ? WebServletConfig.WEB_SERVLET_CONTEXT_NAME
                    : target;

                // 使用 WEB_SERVLET_CONTEXT_NAME 作为当前 Context 的名字
                ContextUtil.enter(contextName, origin);

                // 根据配置决定是否包含 HTTP 方法
                if (httpMethodSpecify) {
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
                    // 实际进入到限流统计判断逻辑,资源名是 "方法:路径"
                    urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                } else {
                    // 实际进入到限流统计判断逻辑,资源名是请求路径
                    urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                }
            }

            // 继续执行后续过滤器
            chain.doFilter(request, response);

        } catch (BlockException e) {
            // 处理被限流的情况
            HttpServletResponse sResponse = (HttpServletResponse) response;
            // 返回限流页面或重定向到其他 URL
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);

        } catch (IOException | ServletException | RuntimeException e2) {
            // 记录异常信息用于统计
            Tracer.traceEntry(e2, urlEntry);
            throw e2;

        } finally {
            // 释放 Entry 资源
            if (urlEntry != null) {
                urlEntry.exit();
            }
            // 退出当前上下文
            ContextUtil.exit();
        }
    }
}

四、SentinelResourceAspect 切面

如果在接口上标注了 @SentinelResource 注解,还会有另外的逻辑处理。Sentinel 定义了一个单独的 AOP 切面 SentinelResourceAspect 专门用于处理注解限流。

SentinelResource 注解定义

先来看看 @SentinelResource 注解的完整定义:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {

    /**
     * Sentinel 资源的名称(即资源标识)
     * 必填项,不能为空
     */
    String value() default "";

    /**
     * 资源的入口类型(入站 IN 或出站 OUT)
     * 默认为出站(OUT)
     */
    EntryType entryType() default EntryType.OUT;

    /**
     * 资源的分类(类型)
     * 自 1.7.0 版本起支持
     */
    int resourceType() default 0;

    /**
     * 限流或熔断时调用的 block 异常处理方法的名称
     * 默认为空(即不指定)
     */
    String blockHandler() default "";

    /**
     * blockHandler 所在的类
     * 如果与原方法不在同一个类,需要指定此参数
     */
    Class<?>[] blockHandlerClass() default {};

    /**
     * 降级(fallback)方法的名称
     * 默认为空(即不指定)
     */
    String fallback() default "";

    /**
     * 用作通用的默认降级方法
     * 该方法不能接收任何参数,且返回类型需与原方法兼容
     */
    String defaultFallback() default "";

    /**
     * fallback 所在的类
     * 如果与原方法不在同一个类,需要指定此参数
     */
    Class<?>[] fallbackClass() default {};

    /**
     * 需要被追踪并触发 fallback 的异常类型列表
     * 默认为 Throwable(即所有异常都会触发 fallback)
     */
    Class<? extends Throwable>[] exceptionsToTrace() default {Throwable.class};

    /**
     * 指定需要忽略的异常类型(即这些异常不会触发 fallback)
     * 注意:exceptionsToTrace 和 exceptionsToIgnore 不应同时使用;
     * 若同时存在,exceptionsToIgnore 优先级更高
     */
    Class<? extends Throwable>[] exceptionsToIgnore() default {};
}

实际使用示例

下面是一个完整的使用示例,展示了 @SentinelResource 注解的各种配置方式:

@RestController
public class SentinelController {

    @Autowired
    private ISentinelService service;

    @GetMapping(value = "/hello/{s}")
    public String apiHello(@PathVariable long s) {
        return service.hello(s);
    }
}

public interface ISentinelService {
    String hello(long s);
}

@Service
@Slf4j
public class SentinelServiceImpl implements ISentinelService {

    /**
     * Sentinel 提供了 @SentinelResource 注解用于定义资源
     *
     * @param s 输入参数
     * @return 返回结果
     */
    @Override
    // value:资源名称,必需项(不能为空)
    // blockHandler:对应处理 BlockException 的函数名称
    // fallback:用于在抛出异常的时候提供 fallback 处理逻辑
    @SentinelResource(value = "hello", blockHandler = "exceptionHandler", fallback = "helloFallback")
    public String hello(long s) {
        log.error("hello:{}", s);
        return String.format("Hello at %d", s);
    }

    /**
     * Fallback 函数
     * 函数签名与原函数一致,或加一个 Throwable 类型的参数
     */
    public String helloFallback(long s) {
        log.error("helloFallback:{}", s);
        return String.format("Halooooo %d", s);
    }

    /**
     * Block 异常处理函数
     * 参数最后多一个 BlockException,其余与原函数一致
     */
    public String exceptionHandler(long s, BlockException ex) {
        // Do some log here.
        log.error("exceptionHandler:{}", s);
        ex.printStackTrace();
        return "Oops, error occurred at " + s;
    }
}

SentinelResourceAspect 核心逻辑

@SentinelResource 注解由 SentinelResourceAspect 切面处理,核心逻辑如下:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // 获取目标方法
        Method originMethod = resolveMethod(pjp);

        // 获取注解信息
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }

        // 获取资源配置信息
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();

        Entry entry = null;
        try {
            // 创建限流入口
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            // 执行原方法
            Object result = pjp.proceed();
            return result;

        } catch (BlockException ex) {
            // 处理被限流异常
            return handleBlockException(pjp, annotation, ex);

        } catch (Throwable ex) {
            // 处理业务异常
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // 优先检查忽略列表
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            // 检查异常是否在追踪列表中
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                // 执行 fallback 逻辑
                return handleFallback(pjp, annotation, ex);
            }

            // 没有 fallback 函数可以处理该异常,直接抛出
            throw ex;

        } finally {
            // 释放 Entry 资源
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }

    /**
     * 处理 BlockException
     *
     * blockHandler / blockHandlerClass 说明:
     * - blockHandler:对应处理 BlockException 的函数名称,可选项
     * - blockHandler 函数签名:与原方法相匹配并且最后加一个额外的参数,类型为 BlockException
     * - blockHandler 函数默认需要和原方法在同一个类中
     * - 若希望使用其他类的函数,则可以指定 blockHandlerClass 为对应的类的 Class 对象
     * - 注意:blockHandlerClass 中对应的函数必须为 static 函数,否则无法解析
     */
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
            throws Throwable {

        // 执行 blockHandler 方法(如果配置了的话)
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
                annotation.blockHandlerClass());

        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            // 构造参数:原方法参数 + BlockException
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;

            try {
                // 根据 static 方法与否进行不同的调用
                if (isStatic(blockHandlerMethod)) {
                    return blockHandlerMethod.invoke(null, args);
                }
                return blockHandlerMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // 抛出实际的异常
                throw e.getTargetException();
            }
        }

        // 如果没有 blockHandler,则尝试执行 fallback
        return handleFallback(pjp, annotation, ex);
    }

    /**
     * 处理 Fallback 逻辑
     *
     * fallback / fallbackClass 说明:
     * - fallback:fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑
     * - fallback 函数可以针对所有类型的异常(除了 exceptionsToIgnore 里面排除掉的异常类型)进行处理
     *
     * fallback 函数签名和位置要求:
     * - 返回值类型必须与原函数返回值类型一致
     * - 方法参数列表需要和原函数一致,或者可以额外多一个 Throwable 类型的参数用于接收对应的异常
     * - fallback 函数默认需要和原方法在同一个类中
     * - 若希望使用其他类的函数,则可以指定 fallbackClass 为对应的类的 Class 对象
     * - 注意:fallbackClass 中对应的函数必须为 static 函数,否则无法解析
     */
    protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
                                    Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        Object[] originArgs = pjp.getArgs();

        // 执行 fallback 函数(如果配置了的话)
        Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);

        if (fallbackMethod != null) {
            // 构造参数:根据 fallback 方法的参数数量决定是否添加异常参数
            int paramCount = fallbackMethod.getParameterTypes().length;
            Object[] args;
            if (paramCount == originArgs.length) {
                args = originArgs;
            } else {
                args = Arrays.copyOf(originArgs, originArgs.length + 1);
                args[args.length - 1] = ex;
            }

            try {
                // 根据 static 方法与否进行不同的调用
                if (isStatic(fallbackMethod)) {
                    return fallbackMethod.invoke(null, args);
                }
                return fallbackMethod.invoke(pjp.getTarget(), args);
            } catch (InvocationTargetException e) {
                // 抛出实际的异常
                throw e.getTargetException();
            }
        }

        // 如果没有 fallback,尝试使用 defaultFallback
        return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
    }
}

五、流控处理核心逻辑

从入口函数开始,我们深入到流控处理的核心逻辑。

入口函数调用链

public class SphU {

    /**
     * 创建限流入口
     *
     * @param name 资源名称
     * @param resourceType 资源类型
     * @param trafficType 流量类型(IN 或 OUT)
     * @param args 参数数组
     * @return Entry 对象
     * @throws BlockException 如果被限流则抛出此异常
     */
    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
            throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }

    public static Entry entry(String name, EntryType trafficType, int batchCount) throws BlockException {
        return Env.sph.entry(name, trafficType, batchCount, OBJECTS0);
    }
}
public class CtSph implements Sph {

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }

    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }

    /**
     * 带优先级的入口方法,这是限流的核心逻辑
     */
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
        Context context = ContextUtil.getContext();

        // 如果上下文数量超过阈值,则不进行规则检查
        if (context instanceof NullContext) {
            // NullContext 表示上下文数量超过了阈值,这里只初始化 Entry,不进行规则检查
            return new CtEntry(resourceWrapper, null, context);
        }

        // 如果没有上下文,使用默认上下文
        if (context == null) {
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // 如果全局开关关闭,则不进行规则检查
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 获取或创建 ProcessorSlotChain(责任链)
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * 如果资源(slot chain)数量超过 {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * 则不进行规则检查
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 创建 Entry 对象
        Entry e = new CtEntry(resourceWrapper, chain, context);

        try {
            // 执行责任链进行规则检查
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            // 如果被限流,释放 Entry 并抛出异常
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // 这不应该发生,除非 Sentinel 内部存在错误
            log.warn("Sentinel unexpected exception,{}", e1.getMessage());
        }
        return e;
    }
}

ProcessorSlotChain 功能插槽链

lookProcessChain 方法实际创建了 ProcessorSlotChain 功能插槽链。ProcessorSlotChain 采用责任链模式,将不同的功能(限流、降级、系统保护)组合在一起。

SlotChain 的获取与创建

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 先从缓存中获取
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);

    if (chain == null) {
        // 双重检查锁,保证线程安全
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry 大小限制
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                // 创建新的 SlotChain
                chain = SlotChainProvider.newSlotChain();

                // 使用不可变模式更新缓存
                Map<ResourceWrapper, ProcessorSlotChain> newMap =
                    new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

SlotChain 的构建

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // 通过 SPI 加载所有 ProcessorSlot 并排序
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);

        for (ProcessorSlot slot : sortedSlotList) {
            // 只处理继承自 AbstractLinkedProcessorSlot 的 Slot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() +
                    ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            // 将 Slot 添加到责任链尾部
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}

SlotChain 的功能划分

Slot Chain 可以分为两部分:

  • 统计数据构建部分(statistic):负责收集各种指标数据;
  • 判断部分(rule checking):根据规则判断是否限流。

官方架构图很好地解释了各个 Slot 的作用及其负责的部分。目前 ProcessorSlotChain 的设计是一个资源对应一个,构建好后缓存起来,方便下次直接取用。

各 Slot 的执行顺序

以下是 Sentinel 中各个 Slot 的默认执行顺序:

NodeSelectorSlot
    ↓
ClusterBuilderSlot
    ↓
StatisticSlot
    ↓
ParamFlowSlot
    ↓
SystemSlot
    ↓
AuthoritySlot
    ↓
FlowSlot
    ↓
DegradeSlot

NodeSelectorSlot - 上下文节点选择

这个功能插槽主要为资源下不同的上下文创建对应的 DefaultNode(实际用于统计指标信息)。解释一下Sentinel中的Node是什么,简单来说就是每个资源统计指标存放的容器,只不过内部由于不同的统计口径(秒级、分钟及)而分别有不同的统计窗口。Node在Sentinel不是单一的结构,而是总体上形成父子关系的树形结构。

不同的调用会有不同的 context 名称,如在当前 MVC 场景下,上下文为 sentinel_web_servlet_context。

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * 同一个资源在不同上下文中的 DefaultNode 映射
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 从映射表中获取当前上下文对应的节点
        DefaultNode node = map.get(context.getName());

        if (node == null) {
            // 双重检查锁,保证线程安全
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    // 创建新的 DefaultNode
                    node = new DefaultNode(resourceWrapper, null);

                    // 使用写时复制更新缓存
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;

                    // 构建调用树
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }

        // 设置当前上下文的当前节点
        context.setCurNode(node);
        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

ClusterBuilderSlot - 集群节点构建

这个功能槽主要用于创建 ClusterNode。ClusterNode 和 DefaultNode 的区别是:

DefaultNode 是特定于上下文的(context-specific);

ClusterNode 是不区分上下文的(context-independent),用于统计该资源在所有上下文中的整体数据。

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * 全局 ClusterNode 映射表
     */
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 创建 ClusterNode(如果不存在)
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // 创建集群节点
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());

                    // 更新全局映射表
                    HashMap<ResourceWrapper, ClusterNode> newMap =
                        new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }

        // 将 ClusterNode 设置到 DefaultNode 中
        node.setClusterNode(clusterNode);

        // 如果有来源标识,则创建 origin node
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

StatisticSlot - 统计插槽

StatisticSlot 是 Sentinel 最重要的类之一,用于根据规则判断结果进行相应的统计操作。

统计逻辑说明

entry 的时候:

依次执行后续的判断 Slot;

每个 Slot 触发流控会抛出异常(BlockException 的子类);

若有 BlockException 抛出,则记录 block 数据;

若无异常抛出则算作可通过(pass),记录 pass 数据。

exit 的时候:

若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数 -1。

记录数据的维度:

线程数 +1;

记录当前 DefaultNode 数据;

记录对应的 originNode 数据(若存在 origin);

累计 IN 统计数据(若流量类型为 IN)。

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 此位置会调用 SlotChain 中后续的所有 Slot,完成所有规则检测
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 请求通过,增加线程数和通过数
            // 代码运行到这个位置,就证明之前的所有 Slot 检测都通过了
            // 此时就可以统计请求的相应数据了

            // 增加线程数(+1)
            node.increaseThreadNum();
            // 增加通过请求的数量(这里涉及到滑动窗口算法)
            node.addPassRequest(count);

            // 省略其他统计逻辑...

        } catch (PriorityWaitException ex) {
            // 如果是优先级等待异常,记录优先级等待数
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // 记录入站统计数据
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            throw ex;

        } catch (BlockException e) {
            // 如果被限流,记录被限流数
            // 省略 block 统计逻辑...
            throw e;

        } catch (Throwable ex) {
            // 如果发生业务异常,记录异常数
            // 省略异常统计逻辑...
            throw ex;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        // 若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1
        // 记录数据的维度:线程数+1、记录当前 DefaultNode 数据、记录对应的 originNode 数据(若存在 origin)
        // 、累计 IN 统计数据(若流量类型为 IN)
        // 省略 exit 统计逻辑...
    }
}

StatisticNode 数据结构

到这里,StatisticSlot 的作用已经比较清晰了。接下来我们需要分析它的统计数据结构。fireEntry 调用向下的节点和之前的方式一样,剩下的节点主要包括:

  • ParamFlowSlot;
  • SystemSlot;
  • AuthoritySlot;
  • FlowSlot;
  • DegradeSlot;

其中比较常见的是流控和熔断:FlowSlot、DegradeSlot,所以下面我们着重分析 FlowSlot。

六、FlowSlot - 流控插槽

这个 Slot 主要根据预设的资源的统计信息,按照固定的次序依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止。

FlowSlot 核心逻辑

@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 执行流控检查
        checkFlow(resourceWrapper, context, node, count, prioritized);

        // 继续执行后续 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    // 省略其他方法...
}

checkFlow 方法详解

/**
 * 执行流控检查
 *
 * @param ruleProvider 规则提供者函数
 * @param resource 资源包装器
 * @param context 上下文
 * @param node 节点
 * @param count 请求数量
 * @param prioritized 是否优先
 * @throws BlockException 如果被限流则抛出异常
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    // 判断规则和资源不能为空
    if (ruleProvider == null || resource == null) {
        return;
    }

    // 获取指定资源的所有流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());

    // 逐个应用流控规则。若无法通过则抛出异常,后续规则不再应用
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                // FlowException 继承 BlockException
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

通过这里我们就可以得知,流控规则是通过 FlowRule 来完成的,数据来源是我们使用的流控控制台,也可以通过代码进行设置。

FlowRule 流控规则

每条流控规则主要由三个要素构成:

  • grade(阈值类型):按 QPS(每秒请求数)还是线程数进行限流;
  • strategy(调用关系策略):基于调用关系的流控策略;
  • controlBehavior(流控效果):当 QPS 超过阈值时的流量整形行为。
public class FlowRule extends AbstractRule {

    public FlowRule() {
        super();
        // 来源默认 Default
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    public FlowRule(String resourceName) {
        super();
        // 资源名称
        setResource(resourceName);
        setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
    }

    /**
     * 流控的阈值类型
     * 0: 线程数
     * 1: QPS
     */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     * 流控阈值
     */
    private double count;

    /**
     * 基于调用链的流控策略
     * STRATEGY_DIRECT: 直接流控(按来源)
     * STRATEGY_RELATE: 关联流控(关联资源)
     * STRATEGY_CHAIN: 链路流控(按入口资源)
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * 关联流控模式下的关联资源
     */
    private String refResource;

    /**
     * 流控效果(流量整形行为)
     * 0: 默认(直接拒绝)
     * 1: 预热(Warm Up)
     * 2: 排队等待(Rate Limiter)
     * 3: 预热 + 排队等待(目前控制台没有)
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    /**
     * 预热时长(秒)
     */
    private int warmUpPeriodSec = 10;

    /**
     * 排队等待的最大超时时间(毫秒)
     */
    private int maxQueueingTimeMs = 500;

    /**
     * 是否为集群模式
     */
    private boolean clusterMode;

    /**
     * 集群模式配置
     */
    private ClusterFlowConfig clusterConfig;

    /**
     * 流量整形控制器
     */
    private TrafficShapingController controller;

    // 省略 getter/setter 方法...
}

七、滑动窗口算法

不管流控规则采用何种流控算法,在底层都需要有支持指标统计的数据结构作为支撑。在 Sentinel 中,用于支撑基于 QPS 等限流的数据结构是 StatisticNode。

StatisticNode 数据结构

public class StatisticNode implements Node {

    /**
     * 保存最近 1 秒内的统计数据
     * 每个桶(bucket)500ms,共 2 个桶
     */
    private transient volatile Metric rollingCounterInSecond =
        new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

    /**
     * 保存最近 60 秒的统计数据
     * windowLengthInMs 被特意设置为 1000 毫秒,即每个桶代表 1 秒
     * 共 60 个桶,这样可以获得每秒精确的统计信息
     */
    private transient Metric rollingCounterInMinute =
        new ArrayMetric(60, 60 * 1000, false);

    // 省略其他字段和方法...
}

ArrayMetric 核心实现

ArrayMetric 是 Sentinel 中数据采集的核心,内部使用了 BucketLeapArray,即滑动窗口的思想进行数据的采集。

public class ArrayMetric implements Metric {

    /**
     * 滑动窗口数组
     */
    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            // 可抢占的滑动窗口,支持借用未来窗口的配额
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            // 普通滑动窗口
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
}

这里有两种实现:

  • BucketLeapArray:普通滑动窗口,每个时间桶仅记录固定时间窗口内的指标数据;
  • OccupiableBucketLeapArray:扩展实现,支持"抢占"未来时间窗口的令牌或容量,在流量突发时允许借用后续窗口的配额,实现更平滑的限流效果。

BucketLeapArray - 滑动窗口实现

LeapArray 核心属性

LeapArray 是滑动窗口的基础类,其核心属性如下:

/**
 * 窗口大小(长度),单位:毫秒
 * 例如:1000ms
 */
private int windowLengthInMs;

/**
 * 样本数(桶的数量)
 * 例如:5(表示 5 个桶,每个 1000ms,总共 5 秒)
 */
private int sampleCount;

/**
 * 采集周期(总时间窗口长度),单位:毫秒
 * 例如:5 * 1000ms(5 秒)
 */
private int intervalInMs;

/**
 * 窗口数组,array 长度就是样本数 sampleCount
 */
protected final AtomicReferenceArray<WindowWrap<T>> array;

/**
 * 更新窗口数据的锁,保证数据的正确性
 */
private final ReentrantLock updateLock;

WindowWrap 窗口包装器

每个窗口包装器包含三个属性:

 public class WindowWrap<T> {

    /**
     * 窗口大小(长度),单位:毫秒
     * 与 LeapArray 中的 windowLengthInMs 一致
     */
    private final long windowLengthInMs;

    /**
     * 窗口开始时间戳
     * 它的值是 windowLengthInMs 的整数倍
     */
    private long windowStart;

    /**
     * 窗口数据(泛型 T)
     * Sentinel 目前只有 MetricBucket 类型,存储统计数据
     */
    private T value;
}

MetricBucket 指标桶

public class MetricBucket {

    /**
     * 计数器数组
     * 长度是需要统计的事件种类数,目前是 6 个
     * LongAdder 是线程安全的计数器,性能优于 AtomicLong
     */
    private final LongAdder[] counters;
    
    // 省略其他字段和方法...
}

滑动窗口工作原理

LeapArray 统计数据的基本思路:

创建一个长度为 n 的数组,数组元素就是窗口;

每个窗口包装了 1 个指标桶,桶中存放了该窗口时间范围内对应的请求统计数据;

可以想象成一个环形数组在时间轴上向右滚动;

请求到达时,会命中数组中的一个窗口,该请求的数据就会存到命中的这个窗口包含的指标桶中;

当数组转满一圈时,会回到数组的开头;

此时下标为 0 的元素需要重复使用,它里面的窗口数据过期了,需要重置,然后再使用。

获取当前窗口

LeapArray 获取当前时间窗口的方法:

 /**
 * 获取当前时间戳对应的窗口
 *
 * @return 当前时间的窗口
 */
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

/**
 * 获取指定时间戳对应的窗口(核心方法)
 *
 * @param timeMillis 时间戳(毫秒)
 * @return 对应的窗口
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // 计算数组下标
    int idx = calculateTimeIdx(timeMillis);

    // 计算当前请求对应的窗口开始时间
    long windowStart = calculateWindowStart(timeMillis);

    // 无限循环,确保能够获取到窗口
    while (true) {
        // 取窗口
        WindowWrap<T> old = array.get(idx);

        if (old == null) {
            // 第一次使用,创建新窗口
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

            // CAS 操作,确保只初始化一次
            if (array.compareAndSet(idx, null, window)) {
                // 成功更新,返回创建的窗口
                return window;
            } else {
                // CAS 失败,让出时间片,等待其他线程完成初始化
                Thread.yield();
            }

        } else if (windowStart == old.windowStart()) {
            // 命中:取出的窗口的开始时间和本次请求计算出的窗口开始时间一致
            return old;

        } else if (windowStart > old.windowStart()) {
            // 窗口过期:本次请求计算出的窗口开始时间大于取出的窗口
            // 说明取出的窗口过期了,需要重置
            if (updateLock.tryLock()) {
                try {
                    // 成功获取锁,更新窗口开始时间,计数器重置
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // 获取锁失败,让出时间片,等待其他线程更新
                Thread.yield();
            }

        } else if (windowStart < old.windowStart()) {
            // 异常情况:机器时钟回拨等
            // 正常情况不会进入该分支
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

数据存储

在获取到窗口之后,就可以存储数据了。ArrayMetric 实现了 Metric 中存取数据的接口方法。

示例:存储 RT(响应时间)

/**
 * 添加响应时间数据
 *
 * @param rt 响应时间(毫秒)
 */
public void addRT(long rt) {
    // 获取当前时间窗口,data 为 BucketLeapArray
    WindowWrap<MetricBucket> wrap = data.currentWindow();

    // 计数
    wrap.value().addRT(rt);
}

/**
 * MetricBucket 的 addRT 方法
 *
 * @param rt 响应时间
 */
public void addRT(long rt) {
    // 记录 RT 时间对 rt 值
    add(MetricEvent.RT, rt);

    // 记录最小响应时间(非线程安全,但没关系)
    if (rt < minRt) {
        minRt = rt;
    }
}

/**
 * 通用的计数方法
 *
 * @param event 事件类型
 * @param n 增加的数量
 * @return 当前桶
 */
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}

数据读取

示例:读取 RT(响应时间)

/**
 * 获取总响应时间
 *
 * @return 总响应时间
 */
public long rt() {
    // 触发当前窗口更新(处理过期窗口)
    data.currentWindow();

    long rt = 0;
    // 取出所有的 bucket
    List<MetricBucket> list = data.values();

    for (MetricBucket window : list) {
        rt += window.rt(); // 求和
    }
    return rt;
}

/**
 * 获取所有有效的窗口
 *
 * @return 有效窗口列表
 */
public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}

/**
 * 获取指定时间之前的所有有效窗口
 *
 * @param timeMillis 时间戳
 * @return 有效窗口列表
 */
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>(); // 正常情况不会到这里
    }

    int size = array.length();
    List<T> result = new ArrayList<T>(size);

    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);

        // 过滤掉没有初始化过的窗口和过期的窗口
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }

        result.add(windowWrap.value());
    }
    return result;
}

/**
 * 判断窗口是否过期
 *
 * @param time 给定时间(通常是当前时间)
 * @param windowWrap 窗口包装器
 * @return 如果过期返回 true
 */
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    // 给定时间与窗口开始时间超过了一个采集周期
    return time - windowWrap.windowStart() > intervalInMs;
}

OccupiableBucketLeapArray - 可抢占窗口

为什么需要 OccupiableBucketLeapArray?

假设一个资源的访问 QPS 稳定是 10,请求是均匀分布的:

在时间 0.0-1.0 秒区间中,通过了 10 个请求;

在 1.1 秒的时候,观察到的 QPS 可能只有 5,因为此时第一个时间窗口被重置了,只有第二个时间窗口有值;

当在秒级统计的情形下,用 BucketLeapArray 会有 0~50%的数据误这时就要用 OccupiableBucketLeapArray 来解决这个问题。

OccupiableBucketLeapArray 实现

从上面我们可以看到在秒级统计 rollingCounterInSecond 中,初始化实例时有两种构造参数:

public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    /**
     * 借用未来窗口的数组
     */
    private final FutureBucketLeapArray borrowArray;

    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        // 创建借用窗口数组
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

    /**
     * 创建新的空桶
     * 会从 borrowArray 中借用数据
     */
    @Override
    public MetricBucket newEmptyBucket(long time) {
        MetricBucket newBucket = new MetricBucket();

        // 获取借用窗口的数据
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            // 将借用数据复制到新桶中
            newBucket.reset(borrowBucket);
        }

        return newBucket;
    }

    /**
     * 重置窗口
     * 会从 borrowArray 中借用 pass 数据
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // 更新开始时间并重置值
        w.resetTo(time);

        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            // 重置桶值并添加借用的 pass 数据
            w.value().reset();
            w.value().addPass((int) borrowBucket.pass());
        } else {
            w.value().reset();
        }

        return w;
    }

    /**
     * 获取当前等待中的请求数量
     */
    @Override
    public long currentWaiting() {
        borrowArray.currentWindow();
        long currentWaiting = 0;
        List<MetricBucket> list = borrowArray.values();

        for (MetricBucket window : list) {
            currentWaiting += window.pass();
        }
        return currentWaiting;
    }

    /**
     * 添加等待中的请求数量
     *
     * @param time 时间
     * @param acquireCount 获取数量
     */
    @Override
    public void addWaiting(long time, int acquireCount) {
        WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
        window.value().add(MetricEvent.PASS, acquireCount);
    }
}

八、总结

至此,Sentinel 的基本情况都已经分析完成。以上内容主要讲解了 Sentinel 的核心处理流程,包括:

核心流程总结

  1. 规则加载:
  • 通过 RuleLoader 将各种规则(流控、熔断、系统限流等)加载到内存缓存中。
  1. 请求拦截:
  • 通过 SentinelServletFilter 过滤器拦截 HTTP 请求;
  • 通过SentinelResourceAspect切面处理 @SentinelResource 注解。
  1. 责任链处理:
  • 使用 ProcessorSlotChain 责任链模式组合多个功能插槽;
  • 每个插槽负责特定的功能(统计、流控、熔断等)。
  1. 流控判断:
  • FlowSlot 根据流控规则判断是否限流;
  • 通过滑动窗口算法统计 QPS、线程数等指标。
  1. 异常处理:
  • 被限流时抛出 BlockException;
  • 通过 blockHandler 或 fallback 处理异常。

核心技术点

  1. 责任链模式:
  • 通过 ProcessorSlotChain 将不同的限流功能组合在一起。
  1. 滑动窗口算法:
  • LeapArray 实现环形滑动窗口;
  • BucketLeapArray 普通滑动窗口;
  • OccupiableBucketLeapArray 可抢占窗口,支持借用未来配额。
  1. 数据结构:
  • DefaultNode:特定于上下文的统计节点;
  • ClusterNode:不区分上下文的集群统计节点;
  • StatisticNode:核心统计节点,包含秒级和分钟级统计。
  1. 限流算法:
  • QPS 限流:通过滑动窗口统计 QPS;
  • 线程数限流:通过原子计数器统计线程数;
  • 流控效果:快速失败、预热、排队等待等;

Sentinel 通过精心设计的架构,实现了高效、灵活、可扩展的流量控制能力,为微服务系统提供了强大的保护机制。

往期回顾

1.社区推荐重排技术:双阶段框架的实践与演进|得物技术

2.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

3.服务拆分之旅:测试过程全揭秘|得物技术

4.大模型网关:大模型时代的智能交通枢纽|得物技术

5.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

文 /万钧

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

社区推荐重排技术:双阶段框架的实践与演进|得物技术

作者 得物技术
2026年2月12日 13:48

一、背景

推荐系统典型pipeline

在推荐系统多阶段Pipeline(召回→粗排→精排→重排)中,重排作为最终决策环节,承担着将精排输出的有限候选集(通常为Top 100–500个Item)转化为最优序列的关键职责。数学定义为在给定候选集C={x1,x2,,xn}C = \lbrace x_1,x_2,……,x_n \rbrace与目标列表长度LL,重排的目标是寻找一个排列πP(C,L)\pi^* \in P(C,L),使得全局收益函数最大化。

在推荐系统、搜索排序等AI领域,Pointwise 建模是精排阶段的核心方法,即对每个 Item 独立打分后排序,pointwise 建模范式面临挑战:

  • 多样性约束:精排按 item 独立打分排序 → 高分 item 往往语义/类目高度同质(如5个相似短视频连续曝光)。
  • 位置偏差:用户注意力随位置显著衰减,且不同item对位置敏感度不同。
  • 上下文建模:用户决策是序列行为,而非独立事件。

二、重排架构演进:生成式模型实践

我们的重排系统采用G-E两阶段协同框架:

  • 生成阶段(Generation):高效生成若干高质量候选排列。
  • 评估阶段(Evaluation):对候选排列进行精细化打分,选出全局最优结果。

不考虑算力和耗时的情况下,通过穷举所有排列P(C,L)P(C,L)

生成阶段主要依赖启发式规则、随机扰动 + beamSearch算法生成候选list,双阶段范式存在显著的痛点:

  • 质量-延迟-多样性的“不可能三角”:在实践中,增加生成候选list数一般可以提升最终list的质量,但边际收益递减;优化过程中,我们通过增加多目标、多样性等策略都取得了消费指标的提升,但在候选list达到百量级时,单纯增加候选集对指标的提升,同时还有:
  1. 增加beam width,系统耗时增加,DCG@K提升逐渐减少。

  2. 增加通道数,通道间重叠度逐渐增加,去重list增加逐渐减少。

  • 阶段间目标不一致:
  1. 分布偏移:启发式生成Beam Search输出的Top排列中,20%被评估模型否定,生成阶段搜索效率浪费。

  2. 梯度断层:Beam Search含argmax操作,双阶段无法端到端优化;生成模型无法感知评估反馈,优化方向偏离全局最优。

生成模型优化

生成分为启发式方法和生成式模型方法, 一般认为生成式模型方法要好于启发式方法。生成式模型逐渐成为重排主流范式,主要分为两类:自回归生成模型、非自回归生成模型。

  • 自回归生成:按位置顺序逐个生成物品,第 t 位的预测依赖前 t-1 位已生成结果。
  1. 优点:

a. 序列依赖建模强,天然捕获物品间的顺序依赖。

b. 训练简单稳定,每步使用真实前序作为输入,收敛快。

c. 生成质量高,逐步细化决策,适合长序列精细优化。

  1. 缺点:

a. 推理延迟高,生成 L 个物品需 L 次前向传播,线上服务难以满足毫秒级要求。

b. 局部最优风险,早期错误决策无法回溯修正,影响整体序列质量。

  • 非自回归生成:一次性预测整个推荐序列的所有位置,各位置预测相互独立。
  1. 优点:

a. 推理速度极快:生成整个序列仅需1次前向传播。

2.缺点:

b.条件独立性假设过强:各位置并行预测,难以显式建模物品间复杂依赖关系。

非自回归模型

为了对齐双阶段一致性,同时考虑线上性能,我们推进了非自回归模型的上线。模型结构如下图:

模型包括Candidates Encoder和Position Encoder,Candidates Encoder是标准的Transformer结构, 用于获取item间的交互信息;Position Encoder额外增加了Cross Attention,期望Position序列同时关注Candidate序列。

  • 模型特征:用户信息、item特征、位置信息、上游精排打分特征。
  • 模型输出:一次性输出 n×L 的位置-物品得分矩阵(n 为候选 item 数,L 为目标列表长度),支持高效并行推理
p^ij=exp(xitj)i=1nexp(xitj)\hat{p}_{ij} = \frac{\exp(\mathbf{x}_i^\top \mathbf{t}_j)}{\sum_{i=1}^n \exp(\mathbf{x}_i^\top \mathbf{t}_j)}
  • 位置感知建模:引入可学习位置嵌入,显式建模“同一 item 在不同位置表现不同”的现象(如首屏效应、位置衰减)。
  • 训练目标:模型使用logloss,让正反馈label序列的生成概率最大, 同时负反馈label序列的生成概率最小:
Llog=i[pijyilog(pij^)+pij(1yi)log(1pij^)]\mathcal{L}_{\log} = -\sum_{i} \big[ p_{ij}y_i \log(\hat{p_{ij}}) + p_{ij}(1-y_i) \log(1-\hat{p_{ij}}) \big]

其中,pijp_{ij}表示位置i上是否展示物品j,yiy_{i}表示位置i上的label。

线上实验及收益:

  • 一期新增了非自回归生成通道,pvctr +0.6%,时长+0.55%。
  • 二期在所有通道排序因子中bagging非自回归模型,pvctr +1.0%,时长+1.13%。

自回归模型

由于条件独立性假设, 非自回归模型对上下文信息建模是不够的,近期我们重点推进了自回归模型的开发。

模型通过Transformer架构建模list整体收益,我们使用单向transformer模拟用户浏览行为的因果性,同时解决自回归生成的暴露偏差问题,保持训练和推理的一致性。结构如下:

  • 模型特征:用户信息、item特征、位置信息、上游精排打分特征。
  • 训练目标:模型使用有序回归loss,在评估多个回合中不同长度的子列表时,能够很好地体现出序列中的增量价值。是用于判断长度为j的子列表是否已经达到i次点击或转化的损失函数。
Li,j(θj)=k=1N([yk<i]log(1pi,j(xk))+[yki]log(pi,j(xk)))L_{i,j}(\theta_j) = -\sum_{k=1}^{N} \left([y_k < i]\log(1-p_{i,j}(x_k)) + [y_k \geq i]\log(p_{i,j}(x_k))\right)

线上模型推理效率优化及实验效果:

自回归生成模型推理延迟高,生成 L 个物品需 L 次前向传播,线上服务难以满足毫秒级要求。因此,我们在传统自回归生成模型的基础上增加MTP(multi token prediction)结构,突破生成式重排模型推理瓶颈。其核心思想是将传统自回归的单步预测扩展为单步多token联合预测,显著减少生成迭代次数。

自回归生成模型在社区推荐已完成了推全,实验中我们新增了自回归生成模型通道,但不是完全体,仅部分位置生成调用了模型:

  • 一期调用两次模型,每次预测4个位置,pvctr +0.69%,有效vv +0.58%。
  • 二期调用两次模型,每次预测5个位置,pvctr +0.54%,有效vv +0.40%。

三、推理性能优化:端到端生成的效率保障

工程架构

为解决CPU推理模型延迟高、制约业务效果的问题,我们对DScatter模型服务进行升级,引入高性能GPU推理能力,具体方案如下:

  • GPU推理框架集成与升级:
  1. 框架升级:将现有依赖的推理框架升级为支持GPU的高性能服务框架。

  2. 硬件资源引入:引入 NVIDIA L20 等专业推理显卡,为当前的listwise评估模型及自回归生成模型提供专用算力,实现模型推理的硬件加速。

  • DScatter模型服务独立部署与容量提升:
  1. 为解决模型部署效率低与资源竞争问题,将DScatter的模型打分逻辑从现有重排服务中完全解耦,构建并部署独立的 DScatter-Model-Server 集群,从根本上消除与重排服务在CPU、内存等关键资源上的竞争。

模型优化

  • 模型格式转换与加速:

导出为 ONNX 格式,使用 TensorRT 进行量化、层融合、动态张量显存等技术加速推理。

  • Item Embedding缓存:

预计算item静态网络,线上直接查询节省计算量。

  • 自回归生成模型核心优化,KV Cache 复用:

缓存已生成token的KV和attention值,仅计算增量token相关值,避免重复计算。

  • 其他LLM推理加速技术应用落地,例如GQA

四、未来规划:迈向端到端序列生成的下一代重排架构

当前“生成-评估”双阶段范式虽在工程落地性上取得平衡,但其本质仍是局部优化:生成阶段依赖启发式规则或浅层模型生成候选,评估阶段虽能识别优质序列,却无法反向指导生成过程,导致系统能力存在理论上限。为突破这一瓶颈,我们规划构建端到端序列生成(End-to-End Sequence Generation) 架构,将重排从“候选筛选”升级为“序列创造”,直接以全局业务目标(如用户停留时长、互动深度、内容生态健康度)为优化目标。

核心架构设计:

  • 统一生成器:以 Transformer 为基础架构,搭建自回归序列建模能力,采用分层混合生成策略:
  1. 粗粒度并行生成:首层预测序列骨架(如类目分布、内容密度)等。

  2. 细粒度自回归精调:在骨架约束下,自回归生成具体 item,确保局部最优。

  • 序列级Reward Modeling:
  1. 构建多目标 reward 函数:xtr、多样性。

  2. Engagement:基于用户滑动轨迹建模序列累积收益(如滑动深度加权CTR)。

  3. Diversity:跨类目/创作者/内容形式的分布熵。

4.Fairness:冷启内容、长尾创作者曝光保障。

训练范式升级:强化学习与对比学习融合

推进自回归生成模型的架构升级与训练体系重构,引入强化学习微调(PPO/DPO)与对比学习机制,提升序列整体效率。

  • 搭建近线系统,生成高质量list候选,提升系统能力上限:

1.基于 DCG 的列表质量打分:

a. 对每个曝光列表L,计算其 DCG@K作为质量分数:

DCG(L)=j=1Kgain(itemj)log2(j+1)\text{DCG}(L) = \sum_{j=1}^{K} \frac{\text{gain}(item_j)}{\log_2(j + 1)}

其中 gain(item)可定义为:

若点击:+1.0

若互动(点赞/收藏):+1.5

若观看 >5s:+0.8

否则:0

2.构造偏好对:

a.对同一用户在同一上下文下的两个列表 LwL_w(win)和 LlL_l(lose)。

b.若 DCG(Lw)>DCG(Ll)+δDCG(L_w) > DCG(L_l) + \delta(δ 为 margin,如 0.1),则构成一个有效偏好对。

  • 引入强化学习微调(PPO/DPO)与对比学习机制,提升序列整体效率:
  1. 模型结构:

a.使用当前自回归生成模型作为策略模型。

b.固定预训练模型作为参考策略 (即 DPO 中的“旧策略”)。

2.DPO损失:

LDPO(θ)=E(x,yw,yl)D[logσ(β(logπθ(ywx)πref(ywx)logπθ(ylx)πref(ylx)))]\mathcal{L}_{\text{DPO}}(\theta) = -\mathbb{E}_{(x, y_w, y_l) \sim \mathcal{D}} \left[ \log \sigma \left( \beta \left( \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\text{ref}}(y_w \mid x)} - \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\text{ref}}(y_l \mid x)} \right) \right) \right]
  • 技术价值:
  1. 突破“质量-延迟-多样性”不可能三角:通过序列级优化,在同等延迟下实现质量与多样性双提升。

  2. 为AIGC与推荐融合铺路:端到端生成器可无缝接入AIGC内容,实现“内容生成-序列编排”一体化。

参考文献:

  1. Gloeckle F, Idrissi B Y, Rozière B, et al. Better & faster large language models via multi-token prediction[J]. arXiv preprint arXiv:2404.19737, 2024.
  2. Ren Y, Yang Q, Wu Y, et al. Non-autoregressive generative models for reranking recommendation[C]//Proceedings of the 30th ACM SIGKDD Conference on Knowledge Discovery and Data Mining. 2024: 5625-5634.
  3. Meng Y, Guo C, Cao Y, et al. A generative re-ranking model for list-level multi-objective optimization at taobao[C]//Proceedings of the 48th International ACM SIGIR Conference on Research and Development in Information Retrieval. 2025: 4213-4218.
  4. Zhao X, Xia L, Zhang L, et al. Deep reinforcement learning for page-wise recommendations[C]//Proceedings of the 12th ACM conference on recommender systems. 2018: 95-103.
  5. Feng Y, Hu B, Gong Y, et al. GRN: Generative Rerank Network for Context-wise Recommendation[J]. arXiv preprint arXiv:2104.00860, 2021.
  6. Pang L, Xu J, Ai Q, et al. Setrank: Learning a permutation-invariant ranking model for information retrieval[C]//Proceedings of the 43rd international ACM SIGIR conference on research and development in information retrieval. 2020: 499-508.

往期回顾

1.Flink ClickHouse Sink:生产级高可用写入方案|得物技术

2.服务拆分之旅:测试过程全揭秘|得物技术

3.大模型网关:大模型时代的智能交通枢纽|得物技术

4.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

5.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

文 /张卓

关注得物技术,每周一、三更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

Flink ClickHouse Sink:生产级高可用写入方案|得物技术

作者 得物技术
2026年2月10日 11:11

一、背景与痛点

业务场景

在实时大数据处理场景中,Flink + ClickHouse 的组合被广泛应用于:

  • 日志处理: 海量应用日志实时写入分析库。
  • 监控分析: 业务指标、APM 数据的实时聚合。

这些场景的共同特点:

  • 数据量大:百万级 TPS,峰值可达千万级。
  • 写入延迟敏感: 需要秒级可见。
  • 数据准确性要求高:不允许数据丢失。
  • 多表写入: 不同数据根据分表策略写入不同的表。

开源 Flink ClickHouse Sink 的痛点

Flink 官方提供的 ClickHouse Sink(flink-connector-jdbc)在生产环境中存在以下严重问题:

痛点一:缺乏基于数据量的攒批机制

问题表现:

// Flink 官方 JDBC Sink 的实现
public class JdbcSink<Textends RichSinkFunction<T> {
    private final int batchSize;  // 固定批次大小
    @Override
    public void invoke(value, Context context) {
        bufferedValues.add(value);
        if (bufferedValues.size() >= batchSize) {
            // 只能基于记录数攒批,无法基于数据量
            flush();
        }
    }

带来的问题:

  1. 内存占用不可控: 100 条 1KB 的日志和 100 条 10MB 的日志占用内存差距 100 倍。
  2. OOM 风险高: 大日志记录(如堆栈转储)会迅速撑爆内存。
  3. 写入性能差: 无法根据记录大小动态调整批次,导致小记录批次过大浪费网络开销。

痛点二:无法支持动态表结构

问题表现:

// Flink 官方 Sink 只能写入固定表
public class JdbcSink {
    private final String sql;  // 固定的 INSERT SQL
    public JdbcSink(String jdbcUrl, String sql, ...) {
        this.sql = sql;  // 硬编码的表结构
    }
}

带来的问题:

  1. 多应用无法隔离: 所有应用的数据写入同一张表,通过特定分表策略区分。
  2. 扩展性差: 新增应用需要手动建表,无法动态路由。
  3. 性能瓶颈: 单表数据量过大(百亿级),查询和写入性能急剧下降。

痛点三:分布式表写入性能问题

问题表现:

// 大多数生产实现直接写入分布式表
INSERT INTO distributed_table_all VALUES (...)

ClickHouse 分布式表的工作原理:

带来的问题:

  1. 网络开销大: 数据需要经过分布式表层转发,延迟增加。
  2. 写入性能差: 分布式表增加了路由和转发逻辑,吞吐量降低。
  3. 热点问题: 所有数据先到分布式表节点,再转发,造成单点瓶颈。

生产级方案的核心改进

针对以上痛点,本方案提供了以下核心改进:

改进一:基于数据量的攒批机制

public class ClickHouseSinkCounter {
    private Long metaSize;  // 累计数据量(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize();  // 累加数据量
    }
}
// 触发条件
private boolean flushCondition(String application) {
    return checkMetaSize(application)  // metaSize >= 10000 字节
        || checkTime(application);     // 或超时 30 秒
}

优势:

  • 内存可控: 根据数据量而非记录数攒批。
  • 精确控制: 1KB 的记录攒 10000 条 = 10MB,1MB 的记录攒 10 条 = 10MB。
  • 避免OOM: 大日志记录不会撑爆内存。

改进二:动态表结构与分片策略

public abstract class ClickHouseShardStrategy<T> {
    public abstract String getTableName(T data);
}
//日志侧实现为应用级分表
public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 动态路由:order-service → tb_logs_order_service
        return String.format("tb_logs_%s", application);
    }
}

优势:

  • 应用隔离: 日志侧内置应用级分表,每个应用独立分表。
  • 动态路由: 根据 application 自动路由到目标表。
  • 扩展性强: 新增应用无需手动建表(配合 ClickHouse 自动建表)。

改进三:本地表写入 + 动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 直接写本地表,避免分布式表转发
    private final ConcurrentMap<String, HikariDataSource> dataSourceMap;
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1. 动态获取集群节点列表
        List<String> healthyHosts = getHealthyHosts(exceptionHosts);
        // 2. 随机选择健康节点
        return dataSourceMap.get(healthyHosts.get(random.nextInt(size)));
    }
}

优势:

  • 性能提升: 直接写本地表,避免网络转发。
  • 高可用: 动态节点发现 + 故障节点剔除。
  • 负载均衡: 随机选择 + Shuffle 初始化。

技术方案概览

基于以上改进,本方案提供了以下核心能力:

  1. 本地表/分布式表写入: 性能优化与高可用平衡。
  2. 分片策略: 按应用维度路由与隔离。
  3. 攒批与内存控制: 双触发机制(数据量 + 超时)。
  4. 流量控制与限流: 有界队列 + 连接池。
  5. 健壮的重试机制: 递归重试 + 故障节点剔除。
  6. Checkpoint 语义保证: At-Least-Once 数据一致性。

二、核心架构设计

架构图

核心组件

核心流程

三、本地表 vs 分布式表写入

ClickHouse 表结构说明

ClickHouse 推荐直接写本地表,原因:

  1. 写入性能: 避免分布式表的网络分发。
  2. 数据一致性: 直接写入目标节点,减少中间环节故障点,比分布式表写入更安全,利于工程化。
  3. 负载均衡: 客户端路由实现负载分散。
-- 本地表(实际存储数据)
CREATE TABLE tb_logs_local ON CLUSTER 'default' (
    application String,
    environment String,
    message String,
    log_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(log_time)
ORDER BY (application, log_time);
-- 分布式表(逻辑视图,不存储数据)
CREATE TABLE tb_logs_all ON CLUSTER 'default' AS tb_logs_local
ENGINE = Distributed('default', dw_log, tb_logs_local, cityHash64(application));

HikariCP 连接池配置

// HikariCP 连接池配置
public class ClickHouseDataSourceUtils {
    private static HikariConfig getHikariConfig(DataSourceImpl dataSource) {
        HikariConfig config = new HikariConfig();
        config.setConnectionTimeout(30000L);    // 连接超时 30s
        config.setMaximumPoolSize(20);          // 最大连接数 20
        config.setMinimumIdle(2);               // 最小空闲 2
        config.setDataSource(dataSource);
        return config;
    }
    private static Properties getClickHouseProperties(ClickHouseSinkCommonParams params) {
        Properties props = new Properties();
        props.setProperty("user", params.getUser());
        props.setProperty("password", params.getPassword());
        props.setProperty("database", params.getDatabase());
        props.setProperty("socket_timeout""180000");      // Socket 超时 3 分钟
        props.setProperty("socket_keepalive""true");      // 保持连接
        props.setProperty("http_connection_provider""APACHE_HTTP_CLIENT");
        return props;
    }
}

配置说明:

  • maxPoolSize=20:每个 ClickHouse 节点最多 20 个连接。
  • minIdle=2:保持 2 个空闲连接,避免频繁创建。
  • socket_timeout=180s:Socket 超时 3 分钟,防止长时间查询阻塞。

ClickHouseLocalWriter:动态节点发现

public class ClickHouseLocalWriter extends ClickHouseWriter {
    // 本地节点缓存,按 IP 维护
    private final ConcurrentMap<StringHikariDataSource> dataSourceMap;
    // 动态获取集群本地表节点
    private final ClusterIpsUtils clusterIpsUtils;
    // IP 变更标志(CAS 锁,避免并发更新)
    private static final AtomicBoolean IP_CHANGING = new AtomicBoolean(false);
    @Override
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 1️⃣ 检测集群节点变化(通过 CAS 避免并发更新)
        if (clusterIpsChanged() && IP_CHANGING.compareAndSet(falsetrue)) {
            try {
                ipChanged(); // 动态更新 dataSourceMap
            } finally {
                IP_CHANGING.set(false);
            }
        }
        // 2️⃣ 获取异常节点列表(从 Redis + APM 实时查询)
        Set<String> exceptIps = clusterIpsUtils.getExceptIps();
        exceptIps.addAll(exceptionHosts);
        // 3️⃣ 过滤健康节点,随机选择
        List<String> healthyHosts = dataSourceMap.keySet().stream()
            .filter(host -> !exceptIps.contains(host))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            throw new RuntimeException("Can't get datasource from local cache");
        }
        return dataSourceMap.get(healthyHosts.get(random.nextInt(healthyHosts.size())));
    }
    private void ipChanged() {
        List<String> clusterIps = clusterIpsUtils.getClusterIps();
        // 新增节点:自动创建连接池
        clusterIps.forEach(ip ->
            dataSourceMap.computeIfAbsent(ip, v ->
                createHikariDataSource(ip, port)
            )
        );
        // 移除下线节点:关闭连接池
        dataSourceMap.forEach((ip, ds) -> {
            if (!clusterIps.contains(ip)) {
                dataSourceMap.remove(ip);
                ds.close();
            }
        });
    }
}

核心逻辑:

  1. 动态节点发现: 从 system.clusters 查询所有节点。
  2. 自动扩缩容: 节点上线自动加入,下线自动剔除。
  3. 故障节点剔除: 通过 APM 监控,自动剔除异常节点。
  4. 负载均衡: 随机选择健康节点,避免热点。

集群节点动态发现(ClusterIpsUtils)

public class ClusterIpsUtils {
    // 从 system.clusters 查询所有节点
    private static final String QUERY_CLUSTER_IPS =
        "select host_address from system.clusters where cluster = 'default'";
    // LoadingCache:定时刷新节点列表(1 小时)
    private final LoadingCache<StringList<String>> clusterIpsCache =
        CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.HOURS)
            .refreshAfterWrite(1, TimeUnit.HOURS)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public List<String> load(String dbName) {
                    return queryClusterIps();  // 定时刷新节点列表
                }
            }));
    // 异常节点缓存(1 分钟刷新)
    private final LoadingCache<StringFlinkExceptIpModel> exceptIpsCache =
        CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(CacheLoader.asyncReloading(new CacheLoader<>() {
                @Override
                public FlinkExceptIpModel load(String dbName) {
                    return queryExceptIp();  // 从 Redis + APM 查询异常节点
                }
            }));
}

异常节点监控策略:

  • 磁盘使用率 >= 90%: 从 APM 查询 Prometheus 指标,自动加入黑名单。
  • HTTP 连接数 >= 50: 连接数过多说明节点压力大,自动加入黑名单。
  • 人工配置: 通过 Redis 配置手动剔除节点

数据来源:

  1. ClickHouse system.clusters 表: 获取所有集群节点。
  2. APM Prometheus 接口: 监控节点健康状态。
  3. Redis 缓存: 人工配置的异常节点。

负载均衡优化

public class ClickHouseWriter {
    public <T> ClickHouseWriter(...) {
        // Shuffle:随机打乱数据源顺序
        Collections.shuffle(clickHouseDataSources);
        this.clickHouseDataSources = clickHouseDataSources;
    }
    public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
        // 轮询 + 随机选择(已 shuffle,避免热点)
        int current = this.currentRandom.getAndIncrement();
        if (current >= clickHouseDataSources.size()) {
            this.currentRandom.set(0);
        }
        return clickHouseDataSources.get(currentRandom.get());
    }
}

优势:

  • 初始化时 shuffle,避免所有 writer 同时从第一个节点开始。
  • 轮询 + 随机选择,负载分散更均匀。
  • 故障节点自动剔除。

四、支持分表策略

分片策略抽象

public abstract class ClickHouseShardStrategy<T> {
    private String tableName;      // 表名模板,如 "tb_log_%s"
    private Integer tableCount;    // 分表数量
    // 根据数据决定目标表名
    public abstract String getTableName(T data);
}

日志分片实现

public class LogClickHouseShardStrategy extends ClickHouseShardStrategy<String> {
    @Override
    public String getTableName(String application) {
        // 表名格式:tb_log_{application}
        // 例如:application = "order-service" -> table = "tb_log_order_service"
        return String.format(
            this.getTableName(),
            application.replace("-""_").toLowerCase()
        );
    }
}

按表(应用)维度的缓冲区

日志侧维度降级为应用名称维度缓冲区,实则因为按照应用分表,

业务方可使用自身分表策略携带表名元数据,进行表维度缓冲。

public class ClickHouseShardSinkBuffer {
    // 按 application 分组的缓冲区(ConcurrentHashMap 保证并发安全)
    private final ConcurrentHashMap<StringClickHouseSinkCounter> localValues;
    public void put(LogModel value) {
        String application = value.getApplication();
        // 1️⃣ 检查是否需要 flush
        if (flushCondition(application)) {
            addToQueue(application); // 触发写入
        }
        // 2️⃣ 添加到缓冲区(线程安全的 compute 操作)
        localValues.compute(application, (k, v) -> {
            if (v == null) v = new ClickHouseSinkCounter();
            v.add(value);
            return v;
        });
    }
    private void addToQueue(String application) {
        localValues.computeIfPresent(application, (k, v) -> {
            // 深拷贝并清空(避免并发修改异常)
            List<LogModel> deepCopy = v.copyValuesAndClear();
            // 构造请求 Blank:application + targetTable + values
            String targetTable = shardStrategy.getTableName(application);
            ClickHouseRequestBlank blank = new ClickHouseRequestBlank(deepCopy, application, targetTable);
            // 放入队列
            writer.put(blank);
            return v;
        });
    }
}

核心设计:

  • 应用隔离: 每个表(应用)独立的 buffer,互不影响。
  • 线程安全: 使用 ConcurrentHashMap.compute()保证并发安全。
  • 深拷贝: List.copyOf() 创建不可变副本,避免并发修改。
  • 批量清空: 一次性取出所有数据,清空计数器。

五、攒批与内存控制

双触发机制

public class ClickHouseShardSinkBuffer {
    private final int maxFlushBufferSize;  // 最大批次大小(如 10000)
    private final long timeoutMillis;      // 超时时间(如 30s)
    // 触发条件检查(满足任一即触发)
    private boolean flushCondition(String application) {
        return localValues.get(application) != null
            && (checkMetaSize(application) || checkTime(application));
    }
    // 条件1:达到批次大小
    private boolean checkMetaSize(String application) {
        return localValues.get(application).getMetaSize() >= maxFlushBufferSize;
    }
    // 条件2:超时
    private boolean checkTime(String application) {
        long current = System.currentTimeMillis();
        return current - localValues.get(application).getInsertTime() > timeoutMillis;
    }
}

批次大小计算

public class ClickHouseSinkCounter {
    private final List<LogModel> values;
    private Long metaSize; // 累计的 metaSize(字节)
    public void add(LogModel value) {
        this.values.add(value);
        this.metaSize += value.getMetaSize(); // 累加 metaSize
    }
    public List<LogModel> copyValuesAndClear() {
        List<LogModel> logModels = List.copyOf(this.values); // 深拷贝(不可变)
        this.values.clear();
        this.metaSize = 0L;
        this.insertTime = System.currentTimeMillis();
        return logModels;
    }
}

关键点:

  • 使用 metaSize(字节数)而非记录数控制批次,内存控制更精确。
  • List.copyOf() 创建不可变副本,避免并发修改。
  • 清空后重置 insertTime,保证超时触发准确性。

带随机抖动的超时

private final long timeoutMillis;
public ClickHouseShardSinkBuffer(..., int timeoutSec, ...) {
    // 基础超时 + 10% 随机抖动(避免惊群效应)
    this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSec)
                      + new SecureRandom().nextInt((int) (timeoutSec * 0.1 * 1000));
}

目的: 避免多个TM 同时触发 flush,造成写入流量峰值。

配置示例

ClickHouseShardSinkBuffer.Builder
    .aClickHouseSinkBuffer()
    .withTargetTable("single_table")  //单表时,可直接使用指定表名
    .withMaxFlushBufferSize(10000)  // 对应字节数
    .withTimeoutSec(30)              // 30 秒超时
    .withClickHouseShardStrategy(new LogClickHouseShardStrategy("table_prefix_%s", 8))  //分表策略时,使用
    // 分表策略可根据业务实际情况进行扩展
    .build(clickHouseWriter);

六、写入限流与流量控制

有界队列设计

public class ClickHouseWriter {
    // 有界阻塞队列
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, ...) {
        // 队列最大容量配置(默认 10)
        this.commonQueue = new LinkedBlockingQueue<>(sinkParams.getQueueMaxCapacity());
    }
    public void put(ClickHouseRequestBlank params) {
        unProcessedCounter.incrementAndGet();
        // put() 方法在队列满时会阻塞,实现背压
        commonQueue.put(params);
    }
}

背压传导:

线程池并发控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

WriterTask 消费逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

配置参数

七、重试机制与超时控制

Future 超时控制

public class ClickHouseWriter {
    private final int numWriters; // 写入线程数
    private ExecutorService service;
    private void buildComponents() {
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);
        // 创建多个 WriterTask 并提交
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, commonQueue, sinkParams, futures, unProcessedCounter);
            service.submit(task);
        }
    }
}

超时策略:

  • Future 超时: 3 分钟(orTimeout)。
  • Socket 超时: 3 分钟(socket_timeout=180000)。
  • 连接超时: 30 秒(connectionTimeout=30000)。

重试逻辑

class WriterTask implements Runnable {
    @Override
    public void run() {
        isWorking = true;
        while (isWorking || !queue.isEmpty()) {
            // poll() 超时返回(100ms),避免无限等待
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置超时(3 分钟)
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES);
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    // final 进行未知异常兜底,防止为捕获异常造成future状态不完成,永久阻塞
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Unknown exception"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}

重试控制逻辑

private void handleUnsuccessfulResponse(..., Set<String> exceptHosts) {
    // 检查 Future 是否已完成(避免重复完成)
    if (future.isDone()) {
        return;
    }
    if (attemptCounter >= maxRetries) {
        // 达到最大重试次数,标记失败
        future.completeExceptionally(new RuntimeException("Max retries exceeded"));
    } else {
        // 递归重试
        requestBlank.incrementCounter();
        send(requestBlank, future, exceptHosts); // 递归调用,排除失败节点
    }
}

重试策略:

  • 递归重试: 失败后递归调用,直到成功或达到最大次数。
  • 异常节点隔离: 每次重试时排除失败的节点(exceptHosts)。
  • 超时控制: Future 超时(3 分钟)防止永久阻塞。

为什么递归重试是更好的选择

递归重试(当前实现)

队列重试(假设方案)

保证一致性

  // ClickHouseWriter.java:139-158
  while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
      CompletableFuture<Void> future = FutureUtil.allOf(futures);
      future.get(3, TimeUnit.MINUTES);  // 阻塞直到全部完成
  }
  • Checkpoint 时所有数据要么全部成功,要么全部失败。
  • 重启后不会有部分数据重复的问题。

简单可靠

  • 代码逻辑清晰。
  • 对于队列重试且不重复,需要复杂的二阶段提交(这里暂不展开),大幅增加代码复杂度。

性能可接受

class WriterTask implements Runnable {
    @Override
    public void run() {
        while (isWorking || !queue.isEmpty()) {
            ClickHouseRequestBlank blank = queue.poll(100, TimeUnit.MILLISECONDS);
            if (blank != null) {
                // 创建 Future 并设置 3 分钟超时
                CompletableFuture<Boolean> future = new CompletableFuture<>();
                future.orTimeout(3, TimeUnit.MINUTES); // 防止永久阻塞
                futures.add(future);
                try {
                    send(blank, future, new HashSet<>());
                } finally {
                    if (!future.isDone()) {
                        future.completeExceptionally(new RuntimeException("Timeout"));
                    }
                    queueCounter.decrementAndGet();
                }
            }
        }
    }
}
  • 虽然阻塞,但有超时保护。
  • ClickHouse 写入通常很快(秒级)。
  • 网络故障时重试也合理。

避开故障节点

  // ClickHouseWriter.java:259-260
  HikariDataSource dataSource = getNextDataSource(exceptHosts);
  • 递归时可以传递 exceptHosts。
  • 自动避开失败的节点。
  • 提高成功率。

异常节点剔除

// 特殊错误码列表(自动加入黑名单)
private final List<Integer> ignoreHostCodes = Arrays.asList(2101002);
public HikariDataSource getNextDataSource(Set<String> exceptionHosts) {
    if (CollectionUtils.isNotEmpty(exceptionHosts)) {
        // 过滤异常节点
        List<HikariDataSource> healthyHosts = clickHouseDataSources.stream()
            .filter(ds -> !exceptionHosts.contains(getHostFromUrl(ds)))
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(healthyHosts)) {
            return null// 所有节点都异常
        }
        return healthyHosts.get(random.nextInt(healthyHosts.size()));
    }
    // 正常轮询(已 shuffle,避免热点)
    return clickHouseDataSources.get(currentRandom.getAndIncrement() % size);
}

故障节点剔除策略:

  1. 错误码 210(网络异常): 自动加入黑名单。
  2. 错误码 1002(连接池异常): 自动加入黑名单。
  3. APM 监控: 磁盘 >= 90%、HTTP 连接 >= 50 的节点。
  4. 手动配置: 通过 Redis 配置剔除。

恢复机制:

  • LoadingCache 定时刷新(1 分钟)。
  • 节点恢复健康后自动从黑名单移除。

重试流程图

八、异常处理模式

两种 Sink 模式

public Sink buildSink(String targetTable, String targetCount, int maxBufferSize) {
    IClickHouseSinkBuffer buffer = ClickHouseShardSinkBuffer.Builder
        .aClickHouseSinkBuffer()
        .withTargetTable(targetTable)
        .withMaxFlushBufferSize(maxBufferSize)
        .withClickHouseShardStrategy(new LogClickHouseShardStrategy(targetTable, count))
        .build(clickHouseWriter);
    // 根据配置选择模式
    if (ignoringClickHouseSendingExceptionEnabled) {
        return new UnexceptionableSink(buffer);  // 忽略异常
    } else {
        return new ExceptionsThrowableSink(buffer); // 抛出异常
    }
}

UnexceptionableSink(忽略异常 - At-Most-Once)

public class UnexceptionableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) {
        buffer.put(message);  // 不检查 Future 状态
    }
    @Override
    public void flush() {
        buffer.flush();
    }
}

适用场景:

  • 允许部分数据丢失。
  • 不希望因写入异常导致任务失败。
  • 对数据准确性要求不高(如日志统计)。

语义保证:At-Most-Once(最多一次)

ExceptionsThrowableSink(抛出异常 - At-Least-Once)

public class ExceptionsThrowableSink implements Sink<LogModel> {
    private final IClickHouseSinkBuffer<LogModel> buffer;
    @Override
    public void put(LogModel message) throws ExecutionException, InterruptedException {
        buffer.put(message);
        // 每次写入都检查 Future 状态
        buffer.assertFuturesNotFailedYet();
    }
    @Override
    public void flush() throws ExecutionException, InterruptedException {
        buffer.flush();
    }
}

Future 状态检查:

public void assertFuturesNotFailedYet() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = FutureUtil.allOf(futures);
    // 非阻塞检查
    if (future.isCompletedExceptionally()) {
        logger.error("There is something wrong with the future. exist sink now");
        future.get(); // 抛出异常,导致 Flink 任务失败
    }
}

适用场景:

  • 数据准确性要求高。
  • 需要保证所有数据写入成功。
  • 异常时希望 Flink 任务失败并重启。

语义保证:At-Least-Once(至少一次)

Future 清理策略与并发控制

定时检查器

public class ClickHouseSinkScheduledCheckerAndCleaner {
    private final ScheduledExecutorService scheduledExecutorService;
    private final List<CompletableFuture<Boolean>> futures;
    // ⚠️ volatile 保证多线程可见性(关键并发控制点)
    private volatile boolean isFlushing = false;
    public ClickHouseSinkScheduledCheckerAndCleaner(...) {
        // 单线程定时执行器
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        // 定时执行清理任务(每隔 checkTimeout 秒,默认 30 秒)
        scheduledExecutorService.scheduleWithFixedDelay(getTask(), ...);
    }
    private Runnable getTask() {
        return () -> {
            synchronized (this) {
                //  关键:检查是否正在 flush,避免并发冲突
                if (isFlushing) {
                    return// Checkpoint 期间暂停清理
                }
                // 1️⃣ 清理已完成的 Future
                futures.removeIf(filter);
                // 2️⃣ 触发所有 Buffer 的 flush(检查是否需要写入)
                clickHouseSinkBuffers.forEach(IClickHouseSinkBuffer::tryAddToQueue);
            }
        };
    }
    // Checkpoint flush 前调用(暂停 cleaner)
    public synchronized void beforeFlush() {
        isFlushing = true;
    }
    // Checkpoint flush 后调用(恢复 cleaner)
    public synchronized void afterFlush() {
        isFlushing = false;
    }
}

核心设计:

  • volatile boolean isFlushing: 标志位,协调 cleaner 与 checkpoint 线程。
  • synchronized (this): 保证原子性,避免并发冲突。
  • 单线程执行器: 避免 cleaner 内部并发问题。

并发控制机制

问题场景:

时间轴冲突:
T1: Cleaner 线程正在执行 tryAddToQueue()
T2: Checkpoint 触发,调用 sink.flush()
T3: Cleaner 同时也在执行 tryAddToQueue()
    ├─ 可能导致:数据重复写入
    ├─ 可能导致:Buffer 清空顺序混乱
    └─ 可能导致:Future 状态不一致

解决方案:

// ClickHouseSinkManager.flush()
public void flush() {
    // 1️⃣ 暂停定时清理任务(设置标志)
    clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
    try {
        // 2️⃣ 执行 flush(此时 cleaner 线程会跳过执行)
        clickHouseWriter.waitUntilAllFuturesDone(falsefalse);
    } finally {
        // 3️⃣ 恢复定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
    }
}

并发控制流程:

关键设计点:

  1. volatile 保证可见性: isFlushing 使用 volatile,确保多线程间的可见性。
  2. synchronized 保证原子性: getTask() 整个方法体使用 synchronized (this)。
  3. 标志位协调: 通过 isFlushing 标志实现两个线程间的协调。
  4. finally 确保恢复: 即使 waitUntilAllFuturesDone() 异常,也会在 finally 中恢复 cleaner。

避免的并发问题:

  • 数据重复写入: Cleaner 和 Checkpoint 同时 flush。
  • Buffer 状态不一致: 一边清空一边写入。
  • Future 清理冲突: 正在使用的 Future 被清理。

性能影响:

  • Checkpoint flush 期间,cleaner 暂停执行(通常 1-3 秒)。
  • Cleaner 跳过的周期会在下次正常执行时补偿。
  • 对整体吞吐影响极小(cleaner 间隔通常 30 秒)。

九、Checkpoint 语义保证

为什么 Checkpoint 时必须 Flush?

不 Flush 的后果

不Flush导致数据永久丢失

正确做法

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    logger.info("start doing snapshot. flush sink to ck");
    // 1. 先 flush buffer(将内存数据写入 ClickHouse)
    if (sink != null) {
        sink.flush();
    }
    // 2. 等待所有写入完成
    if (sinkManager != null && !sinkManager.isClosed()) {
        sinkManager.flush();
    }
    // 此时 Checkpoint 才能标记为成功
    logger.info("doing snapshot. flush sink to ck");
}

Flush 实现与并发协调

public class ClickHouseSinkManager {
    public void flush() {
        //  步骤1:暂停定时清理任务
        clickHouseSinkScheduledCheckerAndCleaner.beforeFlush(); // isFlushing = true
        try {
            //  步骤2:执行 buffer flush + 等待所有写入完成
            clickHouseWriter.waitUntilAllFuturesDone(falsefalse);
        } finally {
            //  步骤3:恢复定时清理任务(finally 确保执行)
            clickHouseSinkScheduledCheckerAndCleaner.afterFlush(); // isFlushing = false
        }
    }
}

并发协调详解:

// cleaner 线程执行流程
synchronized (this) {
    if (isFlushing) {
        return// Checkpoint 期间跳过本次执行
    }
    // 正常执行:清理已完成的 Future + 触发 Buffer flush
    futures.removeIf(filter);
    buffers.forEach(Buffer::tryAddToQueue);
}

关键点:

  • volatile 可见性: isFlushing 使用 volatile 确保 cleaner 线程立即看到状态变化。
  • synchronized互斥: getTask()方法体使用 synchronized (this) 确保原子性。
  • 标志位协调: 通过 beforeFlush() / afterFlush() 管理标志位。
  • finally 保证恢复: 即使 flush 异常,也会在 finally 中恢复 cleaner。

等待所有 Future 完成

public synchronized void waitUntilAllFuturesDone(boolean stopWriters, boolean clearFutures) {
    try {
        // 循环等待:直到所有 Future 完成 + 队列清空
        while (!futures.isEmpty() || unProcessedCounter.get() > 0) {
            CompletableFuture<Void> all = FutureUtil.allOf(futures);
            // 最多等待 3 分钟(与 Future 超时一致)
            all.get(3, TimeUnit.MINUTES);
            // 移除已完成的 Future(非异常)
            futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
            // 检查是否有异常 Future
            if (anyFutureFailed()) {
                break; // 有异常则退出
            }
        }
    } finally {
        if (stopWriters) stopWriters();
        if (clearFutures) futures.clear();
    }
}

关键逻辑:

  • 循环等待直到所有 Future 完成 + 队列清空。
  • 超时 3 分钟(与 Future 超时一致)。
  • 移除已完成的非异常 Future。
  • 有异常时退出循环。

三种 Flush 触发方式对比

Checkpoint 参数配置

// Checkpoint 配置建议
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint(间隔 1 分钟)
env.enableCheckpointing(60000);
// Checkpoint 超时(必须大于 Future 超时 + 重试时间)
// 建议:CheckpointTimeout > FutureTimeout * MaxRetries
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 分钟
// 一致性模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔(避免过于频繁)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30 秒
// 最大并发 Checkpoint 数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

语义保证

推荐配置:

生产环境: 使用 ExceptionsThrowableSink + Checkpoint。

允许部分丢失: 使用 UnexceptionableSink。

十、最佳实践与调优

生产配置

// ========== ClickHouse 连接参数 ==========
clickhouse.sink.target-table = tb_logs_local
clickhouse.sink.max-buffer-size104857600        // 批次大小
clickhouse.sink.table-count0                // 0 表示不分表
// ========== 写入性能参数 ==========
clickhouse.sink.num-writers10               // 写入线程数
clickhouse.sink.queue-max-capacity10        // 队列容量
clickhouse.sink.timeout-sec30               // flush 超时
clickhouse.sink.retries10                   // 最大重试次数
clickhouse.sink.check.timeout-sec30         // 定时检查间隔
// ========== 异常处理参数 ==========
clickhouse.sink.ignoring-clickhouse-sending-exception-enabledfalse
clickhouse.sink.local-address-enabledtrue   // 启用本地表写入
// ========== ClickHouse 集群配置 ==========
clickhouse.access.hosts192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123
clickhouse.access.user = default
clickhouse.access.password = ***
clickhouse.access.database = dw_xx_xx
clickhouse.access.cluster = default
// ========== HikariCP 连接池配置 ==========
connectionTimeout30000                      // 连接超时 30s
maximumPoolSize20                           // 最大连接数 20
minimumIdle2                                // 最小空闲 2
socket_timeout180000                        // Socket 超时 3mi

性能调优

故障排查

十一、总结

本文深入分析了 Flink ClickHouse Sink 的实现方案,核心亮点包括:

技术亮点

  • 连接池选型: 使用 HikariCP,性能优异,连接管理可靠。
  • Future 超时控制: orTimeout(3min) 防止永久阻塞。
  • 显式资源管理: Connection 和 PreparedStatement 显式关闭,防止连接泄漏。
  • 负载均衡优化: Shuffle 初始化 + 轮询选择,避免热点。
  • 异常处理增强: future.isDone() 检查,避免重复完成。
  • 本地表写入: 动态节点发现 + 故障剔除,写入性能提升。
  • 分片策略: 按表(应用)维度路由,独立缓冲和隔离。
  • 攒批优化: 双触发机制(大小 + 超时)+ 随机抖动。
  • 流量控制: 有界队列 + 线程池,实现背压。
  • 健壮重试: 递归重试 + 异常节点剔除 + 最大重试限制。

Checkpoint 语义

  • At-Least-Once: ExceptionsThrowableSink + Checkpoint。
  • At-Most-Once: UnexceptionableSink。
  • Exactly-Once: 需要配合 ClickHouse 事务(未实现)。

生产建议

  1. 必须: Checkpoint 时 flush,否则会丢数据。
  2. 推荐: 使用 HikariCP + 本地表写入。
  3. 推荐: 配置合理的超时(Future < Socket < Checkpoint)。
  4. 推荐: 监控队列大小、Future 失败率、重试次数。

该方案已在生产环境大规模验证,能够稳定支撑百万级 TPS 的日志写入场景。

往期回顾

1.服务拆分之旅:测试过程全揭秘|得物技术

2.大模型网关:大模型时代的智能交通枢纽|得物技术

3.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

4.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

5.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

文 /虚白

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

服务拆分之旅:测试过程全揭秘|得物技术

作者 得物技术
2026年2月5日 14:47

一、引言

代码越写越多怎么办?在线等挺急的! Bidding-interface服务代码库代码量已经达到100w行!!

Bidding-interface应用是出价域核心应用之一,主要面向B端商家。跟商家后台有关的出价功能都围绕其展开。是目前出价域代码量最多的服务。

随着出价业务最近几年来的快速发展,出价服务承接的流量虽然都是围绕卖家出价,但是已远远超过卖家出价功能范围。业务的快速迭代而频繁变更给出价核心链路高可用、高性能都带来了巨大的风险。

经总结有如下几个痛点:

  • 核心出价链路未隔离:

    出价链路各子业务模块间代码有不同程度的耦合,迭代开发可扩展性差,往往会侵入到出价主流程代码的改动。每个子模块缺乏独立的封装,而且存在大量重复的代码,每次业务规则调整,需要改动多处,容易出现漏改漏测的问题。

  • 大单体&功能模块定义混乱:

    历史原因上层业务层代码缺乏抽象,代码无法实现复用,需求开发代码量大,导致需求估时偏高,经常出现20+人日的大需求,需求开发中又写出大量重复代码,导致出价服务代码库快速膨胀,应用启动耗时过长,恶性循环。

  • B/C端链路未隔离:

    B端卖家出价链路流量与C端价格业务场景链路流量没有完全隔离,由于历史原因,有些B端出价链路接口代码还存在于price应用中,偶尔B端需求开发会对C端应用做代码变更。存在一定的代码管控和应用权限管控成本。

  • 发布效率影响:

    代码量庞大,导致编译速度缓慢。代码过多,类的依赖关系更为复杂,持续迭代逐步加大编译成本,随着持续迭代,新的代码逻辑 ,引入更多jar 依赖,间接导致项目部署时长变长蓝绿发布和紧急问题处理时长显著增加;同时由于编译与部署时间长,直接影响开发人员在日常迭代中的效率(自测,debug,部署)。

  • 业务抽象&分层不合理:

    历史原因出价基础能力领域不明确,出价底层和业务层分层模糊,业务层代码和出价底层代码耦合严重,出价底层能力缺乏抽象,上层业务扩展需求频繁改动出价底层能力代码。给出价核心链路代码质量把控带来较高的成本, 每次上线变更也带来一定的风险。

以上,对于Bidding服务的拆分和治理,已经箭在弦上不得不发。否则,持续的迭代会继续恶化服务的上述问题。

经过前期慎重的筹备,设计,排期,拆分,和测试。目前Bidding应用经过四期的拆分节奏,已经马上要接近尾声了。服务被拆分成三个全新的应用,目前在小流量灰度放量中。

本次拆分涉及:1000+Dubbo接口,300+个HTTP接口,200+ MQ消息,100+个TOC任务,10+个 DJob任务。

本人是出价域测试一枚,参与了一期-四期的拆分测试工作。

项目在全组研发+测试的ALL IN投入下,已接近尾声。值此之际输出一篇文章,从测试视角复盘下,Bidding服务的拆分与治理,也全过程揭秘下出价域内的拆分测试过程。

二、服务拆分的原则

首先,在细节性介绍Bidding拆分之前。先过大概过一下服务拆分原则:

  • 单一职责原则 (SRP):  每个服务应该只负责一项特定的业务功能,避免功能混杂。

  • 高内聚、低耦合:  服务内部高度内聚,服务之间松耦合,尽量减少服务之间的依赖关系。

  • 业务能力导向:  根据业务领域和功能边界进行服务拆分,确保每个服务都代表一个完整的业务能力。

拆分原则之下,还有不同的策略可以采纳:基于业务能力拆分、基于领域驱动设计 (DDD) 拆分、基于数据拆分等等。同时,拆分时应该注意:避免过度拆分、考虑服务之间的通信成本、设计合理的 API 接口。

服务拆分是微服务架构设计的关键步骤,需要根据具体的业务场景和团队情况进行综合考虑。合理的服务拆分可以提高系统的灵活性、可扩展性和可维护性,而不合理的服务拆分则会带来一系列问题。

三、Bidding服务拆分的设计

如引言介绍过。Bidding服务被拆分出三个新的应用,同时保留bidding应用本身。目前共拆分成四个应用:Bidding-foundtion,Bidding-interface,Bidding-operation和Bidding-biz。详情如下:

  • 出价基础服务-Bidding-foundation:

出价基础服务,对出价基础能力抽象,出价领域能力封装,基础能力沉淀。

  • 出价服务-Bidding-interfaces:

商家端出价,提供出价基础能力和出价工具,提供商家在各端出价链路能力,重点保障商家出价基础功能和出价体验。

  • 出价运营服务-Bidding-operation:

出价运营,重点支撑运营对出价业务相关规则的维护以及平台其他域业务变更对出价域数据变更的业务处理:

  1. 出价管理相关配置:出价规则配置、指定卖家规则管理、出价应急隐藏/下线管理工具等;
  2. 业务大任务:包括控价生效/失效,商研鉴别能力变更,商家直发资质变更,品牌方出价资质变更等大任务执行。
  • 业务扩展服务-Bidding-biz:

更多业务场景扩展,侧重业务场景的灵活扩展,可拆出的现有业务范围:国补采购单出价,空中成单业务,活动出价,直播出价,现订现采业务,预约抢购,新品上线预出价,入仓预出价。

应用拆分前后流量分布情况:

图片

四、Bidding拆分的节奏和目标收益

服务拆分是项大工程,对目前的线上质量存在极大的挑战。合理的排期和拆分计划是重点,可预期的收益目标是灵魂。

经过前期充分调研和规划。Bidding拆分被分成了四期,每期推进一个新应用。并按如下六大步进行:

图片

Bidding拆分目标

  • 解决Bidding大单体问题: 对Bidding应用进行合理规划,完成代码和应用拆分,解决一直以来Bidding大单体提供的服务多而混乱,维护成本高,应用编译部署慢,发布效率低等等问题。
  • 核心链路隔离&提升稳定性: 明确出价基础能力,对出价基础能力下沉,出价基础能力代码拆分出独立的代码库,并且部署在独立的新应用中,实现出价核心链路隔离,提升出价核心链路稳定性。
  • 提升迭代需求开发效率: 完成业务层代码抽象,业务层做组件化配置化,实现业务层抽象复用,降低版本迭代需求开发成本。
  • 实现出价业务应用合理规划: 各服务定位、职能明确,分层抽象合理,更好服务于企/个商家、不同业务线运营等不同角色业务推进。

预期的拆分收益

  • 出价服务应用结构优化:

    完成对Bidding大单体应用合理规划拆分,向下沉淀出出价基础服务应用层,降低出价基础能力维护成功;向上抽离出业务扩展应用层,能够实现上层业务的灵活扩展;同时把面向平台运营和面向卖家出价的能力独立维护;在代码库和应用层面隔离,有效减少版本迭代业务需求开发变更对应用的影响面,降低应用和代码库的维护成本。

  • 完成业务层整体设计,业务层抽象复用,业务层做组件化配置化,提升版本迭代需求开发效率,降低版本迭代需求开发成本:

    按业务类型对业务代码进行分类,统一设计方案,提高代码复用性,支持业务场景变化时快速扩展,以引导降价为例,当有类似降价换流量/降价换销量新的降价场景需求时,可以快速上线,类似情况每个需求可以减少10-20人日开发工作量。

  • 代码质量提升 :

    通过拆分出价基础服务和对出价流程代码做重构,将出价基础底层能力代码与上层业务层代码解耦,降低代码复杂度,降低代码冲突和维护难度,从而提高整体代码质量和可维护性。

  • 开发效率提升 :

    1. 缩短应用部署时间: 治理后的出价服务将加快编译和部署速度,缩短Bidding-interfaces应用发布(编译+部署)时间 由12分钟降低到6分钟,从而显著提升开发人员的工作效率,减少自测、调试和部署所需的时间。以Bidding服务T1环境目前一个月编译部署至少1500次计算,每个月可以节约150h应用发布时间。
    2. 提升问题定位效率: 出价基础服务层与上层业务逻辑层代码库&应用分开后,排查定位开发过程中遇到的问题和线上问题时可以有效缩小代码范围,快速定位问题代码位置。

五、测试计划设计

服务拆分的前期,研发团队投入了大量的心血。现在代码终于提测了,进入我们的测试环节:

为了能收获更好的质量效果,同时也为了不同研发、测试同学的分工。我们需要细化到最细粒度,即接口维度整理出一份详细的文档。基于此文档的基础,我们确定工作量和人员排期:

如本迭代,我们投入4位研发同学,2位测试同学。完成该200个Dubbo接口和100个HTTP接口,以及20个Topic迁移。对应的提测接口,标记上负责的研发、测试、测试进度、接口详细信息等内容。

基于该文档的基础上,我们的工作清晰而明确。一个大型的服务拆分,也变成了一步一步的里程碑任务。

接下来给大家看一下,关于Bidding拆分。我们团队整体的测试计划,我们一共设计了五道流程。

  • 第一关:自测接口对比:

    每批次拆分接口提测前,研发同学必须完成接口自测。基于新旧接口返回结果对比验证。验证通过后标记在文档中,再进入测试流程。

    对于拆分项目,自测卡的相对更加严格。由于仅做接口迁移,逻辑无变更,自测也更加容易开展。由研发同学做好接口自测,可以避免提测后新接口不通的低级问题。提高项目进度。

    在这个环节中。偶尔遇见自测不充分、新接口参数传丢、新Topic未配置等问题。(三期、四期测试中,我们加强了对研发自测的要求)。

  • 第二关:测试功能回归

    这一步骤基本属于测试的人工验证,同时重点需关注写接口数据验证。

    回归时要测的细致。每个接口,测试同学进行合理评估。尽量针对接口主流程,进行细致功能回归。由于迁移的接口数量多,历史逻辑重。一方面在接口测试任务分配时,要尽量选择对该业务熟悉的同学。另一方面,承接的同学也有做好历史逻辑梳理。尽量不要产生漏测造成的问题。

    该步骤测出的问题五花八门。另外由于Bidding拆分成多个新服务。两个新服务经常彼此间调用会出现问题。比如二期Bidding-foundation迁移完成后,Bidding-operation的接口在迁移时,依赖接口需要从Bidding替换成foundation的接口。

    灰度打开情况下,调用新接口报错仍然走老逻辑。(测试时,需要关注trace中是否走了新应用)。

  • 第三关:自动化用例

    出价域内沉淀了比较完善的接口自动化用例。在人工测试时,测试同学可以借助自动化能力,完成对迁移接口的回归功能验证。

    同时在发布前天,组内会特地多跑一轮全量自动化。一次是迁移接口开关全部打开,一次是迁移接口开关全部关闭即正常的自动化回归。然后全员进行排错。

    全量的自动化用例执行,对迁移接口问题拦截,有比较好的效果。因为会有一些功能点,人工测试时关联功能未考虑到,但在接口自动化覆盖下无所遁形。

  • 第四关:流量回放

    在拆分接口开关打开的情况下,在预发环境进行流量回放。

    线上录制流量的数据往往更加复杂,经常会测出一些意料之外的问题。

    迭代过程中,我们组内仍然会在沿用两次回放。迁移接口开关打开后回放一次,开关关闭后回放一次。(跟发布配置保持一致)。

  • 第五关:灰度过程中,关闭接口开关,功能回滚

    为保证线上生产质量,在迁移接口小流量灰度过程中。我们持续监测线上问题告警群。

    以上,就是出价域测试团队,针对服务拆分的测试流程。同时遵循可回滚的发布标准,拆分接口做了非常完善的灰度功能。下一段落进行介绍。

六、各流量类型灰度切量方案

出价流程切新应用灰度控制从几个维度控制:总开关,出价类型范围,channel范围,source范围,bidSource范围,uid白名单&uid百分比(0-10000):

  • 灰度策略
  • 支持 接口维度 ,按照百分比进行灰度切流;

  • 支持一键回切;

Dubbo接口、HTTP接口、TOC任务迁移、DMQ消息迁移分别配有不同的灰度策略。

七、结语

拆分的过程中,伴随着很多迭代需求的开发。为了提高迁移效率,我们会在需求排期后,并行处理迭代功能相关的接口,把服务拆分和迭代需求一起完成掉。

目前,我们的拆分已经进入尾声。迭代发布后,整体的技术项目就结束了。灰度节奏在按预期节奏进行~

值得一提的是,目前我们的流量迁移仍处于第一阶段,即拆分应用出价域内灰度迁移,上游不感知。目前所有的流量仍然通过bidding服务接口进行转发。后续第二阶段,灰度验证完成后,需要进行上游接口替换,流量直接请求拆分后的应用。

往期回顾

1.大模型网关:大模型时代的智能交通枢纽|得物技术

2.从“人治”到“机治”:得物离线数仓发布流水线质量门禁实践

3.AI编程实践:从Claude Code实践到团队协作的优化思考|得物技术

4.入选AAAI-PerFM|得物社区推荐之基于大语言模型的新颖性推荐算法

5.Galaxy比数平台功能介绍及实现原理|得物技术

文 /寇森

关注得物技术,每周一、三更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

❌
❌