普通视图
深度实践:得物算法域全景可观测性从 0 到 1 的演进之路
前端平台大仓应用稳定性治理之路|得物技术
RocketMQ高性能揭秘:承载万亿级流量的架构奥秘|得物技术
一、前言
在分布式系统架构中,消息队列如同畅通的“信息神经网络”,承担着解耦、削峰与异步通信的核心使命。在众多成熟方案中,RocketMQ凭借其阿里巴巴与Apache双重基因,以卓越的金融级可靠性、万亿级消息堆积能力和灵活的分布式特性脱颖而出,成为构建高可用、高性能数据流转枢纽的关键技术选型。本文将深入解析RocketMQ的核心架构、设计哲学与实践要义。
二、RocketMQ架构总览
官网图片
RocketMQ架构上主要分为四部分,如上图所示:
RocketMQ作为一款高性能、高可用的分布式消息中间件,其核心架构采用了经典的四组件协同设计,实现了消息生产、存储、路由与消费的全链路解耦与高效协同。四大组件——生产者(Producer)、消费者(Consumer)、路由中心(NameServer)和代理服务器(Broker)——各司其职,共同构建了其坚实的基石。
生产者(Producer) 作为消息的源头,负责将业务消息高效、可靠地发布到系统中。它支持分布式集群部署,并通过内置的智能负载均衡机制,自动选择最优的Broker节点与队列进行投递。
消费者(Consumer) 是消息的处理终端,同样以集群化方式工作,支持推送(Push)和拉取(Pull)两种消息获取模式。它提供了集群消费与广播消费两种模式,并能动态维护其订阅关系。
路由中心(NameServer) 是整个架构的“注册中心”,扮演着轻量级服务发现的角色。所有Broker节点都会向NameServer注册,并通过定期心跳汇报健康状态。生产者与消费者则从NameServer获取实时的主题路由与Broker信息,从而实现消息寻址的完全解耦。
代理服务器(Broker) 是消息存储与流转的核心,负责消息的持久化存储、投递与查询。为了保障高可用性,Broker通常采用主从(Master-Slave)部署架构,确保数据与服务在故障时能无缝切换。其内部集成了通信处理、存储引擎、索引服务和高可用复制等核心模块。
三、核心组件深度解析
NameServer:轻量级服务发现枢纽
NameServer是RocketMQ的轻量级服务发现与路由中心, 其核心目标是实现生产消费与Broker服务的解耦。 它不存储消息数据,仅管理路由元数据。
核心是一张的路由表 HashMap<String/* Topic */, List>,记录了每个Topic对应在哪些Broker的哪些队列上。
客户端内置了故障规避机制。如果从某个NameServer获取路由失败,或根据路由信息访问Broker失败,会自动重试其他NameServer或Broker。
1. 核心角色与设计哲学: NameServer的设计哲学是 “简单、无状态、最终一致” 。 每个NameServer节点独立运行,节点间互不通信, 这使其具备极强的水平扩展能力和极高的可用性。客户端会配置所有NameServer地址,并向其广播请求。
2. 核心工作机制: 其运作围绕路由信息的生命周期展开,可通过下图一览其核心流程:
3. 和kafka注册中心对比
- NameServer 采用 “去中心化” 和 “最终一致” 思想,追求极致的简单、轻量和水平扩展, 牺牲了强一致性,以换取架构的简洁和高可用。这非常适合路由信息变动不频繁、客户端具备容错能力的消息场景。
- Kafka (KRaft) 采用 “中心化” 和 “强一致” 思想,追求数据的精确和系统的自包含。 它将元数据管理深度内化,通过共识协议保证全局一致,但代价是架构复杂度和运维成本更高。
优劣分析: NameServer在运维简易性、集群扩展性、无外部依赖上占优;而Kafka KRaft在元数据强一致性、系统自包含、架构统一性上更胜一筹。选择取决于你对一致性、复杂度、运维成本的具体权衡。
Broker:消息存储与转发的核心引擎
解密存储文件设计
Broker目录下的文件结构
所有核心存储文件均位于Broker节点的 ${storePathRootDir}/store/ 目录下(默认路径为 ~/store/),其下各子目录职责分明:
| 目录/文件 | 核心职责 | 关键设计说明 |
|---|---|---|
| commitlog/ | 消息实体存储库 | • 设计:所有Topic的消息顺序混合追加写入。• 文件:以起始物理偏移量命名(20位数字),默认每个1GB。lock文件确保同一时刻只有一个进程写入,保障严格顺序写。 |
| consumequeue/ | 逻辑消费队列索引 | • 结构:按 {Topic}/{QueueId}/三级目录组织。 • 文件:存储定长记录(20字节/条),包含物理偏移量、长度和Tag哈希码。 • 作用:为消费者提供按Topic和队列分组的逻辑视图,实现高效拉取。 |
| index/ | 消息键哈希索引 | • 文件:以创建时间戳命名(如20240515080000000)。 • 结构:采用 “哈希槽 + 链表” 结构。 • 用途:支持根据 Message Key 或时间范围进行消息查询,用于运维排查。 |
| config/ | 运行时元数据 | • 存储Broker运行期间生成的动态数据,如所有Topic的配置、消费者组的消费进度(offset) 等。 |
| checkpoint | 状态检查点文件 | • 记录 commitlog、consumequeue、index等文件最后一次刷盘的时间戳,用于崩溃恢复时确定数据恢复的起点。 |
| abort | 异常关闭标志文件 | • 该文件存在即表明Broker上一次是非正常关闭,重启时会触发恢复流程。 |
| lock | 锁文件 | • lock文件确保同一时刻只有一个进程写入,保障严格顺序写。 |
commitLog
消息主体以及元数据的存储主体, 存储Producer端写入的消息主体内容,消息内容不是定长的。 单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量, 比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
当我们消息发送到RocketMQ以后,消息在commitLog中,因为body大小是不固定的,所以每个消息的长度也是不固定的,其存储格式如下:
下面每个表格列举了每个字段的含义
| 字段 | 字段名 | 数据类型 | 字节数 | 说明与用途 |
|---|---|---|---|---|
| 1 | MsgLen / TOTALSIZE | int | 4 | 消息总长度,即从本字段开始到结束的总字节数,是解析消息的起点。 |
| 2 | MagicCode | int | 4 | 魔术字,固定值(如 0xdaa320a7),用于标识这是一个有效的消息存储起始点,也用于区分消息体和文件末尾空白填充区。 |
| 3 | BodyCRC | int | 4 | 消息体内容的CRC校验码, 用于校验消息体在存储过程中是否损坏。 |
| 4 | QueueId | int | 4 | 队列ID,标识此消息属于Topic下的哪个逻辑队列。 |
| 5 | Flag | int | 4 | 消息标志位,供应用程序自定义使用,RocketMQ内部未使用。 |
| 6 | QueueOffset | long | 8 | 消费队列偏移量,即此消息在其对应ConsumeQueue中的顺序索引,是连续的。 |
| 7 | PhysicalOffset | long | 8 | 物理偏移量,即此消息在所有CommitLog文件中的起始字节偏移量。由于消息长度不定,此偏移量不是连续的。 |
| 8 | SysFlag | int | 4 | 系统标志位,是一个二进制组合值,用于标识消息特性,如:是否压缩、是否为事务消息、是否等待事务提交等。 |
| 9 | BornTimestamp | long | 8 | 消息生成时间戳,由Producer客户端在发送时生成。 |
| 10 | BornHost | 8字节 | 8 | 消息发送者地址。其编码并非简单字符串,而是将IP的4个段和端口号的2个字节,共6个字节,按大端序组合并填充到8字节中。 |
| 11 | StoreTimestamp | long | 8 | 消息存储时间戳,即Broker收到消息并写入内存的时间。 |
| 12 | StoreHost | 8字节 | 8 | Broker存储地址,编码方式同BornHost。 |
| 13 | ReconsumeTimes | int | 4 | 消息重试消费次数,用于死信队列判断。 |
| 14 | PreparedTransationOffset | long | 8 | 事务消息专用,存储与之关联的事务日志(Transaction Log)的偏移量。 |
| 15 | BodyLength | int | 4 | 消息体实际长度,后跟Body内容。 |
| 16 | Body | byte[] | 不定 | 消息体内容,即Producer发送的原始业务数据。 |
| 17 | TopicLength | byte | 1 | Topic名称的长度(1字节,因此Topic名不能超过255字符)。 |
| 18 | Topic | byte[] | 不定 | Topic名称的字节数组。 |
| 19 | PropertiesLength | short | 2 | 消息属性长度,后跟Properties内容。 |
| 20 | Properties | byte[] | 不定 | 消息属性,用于存储用户自定义的Key-Value扩展信息。在编码时,Key和Value之间用特殊不可见字符(如\u0001)分隔,因此属性中不能包含这些字符。 |
ConsumeQueue
消息消费索引,引入的目的主要是提高消息消费的性能。 由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件,根据topic检索消息是非常低效的。
为了解决这个问题中,提高消费时候的速度,RocketMQ会启动后台的 dispatch 线程源源不断的将消息从 commitLog 取出消息在 CommitLog 中的物理偏移量,消息长度以及 Tag Hash 等信息作为单条消息的索引,分发到对应的消费队列,构成了对 CommitLog 的引用。
consumer可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件, 故consumequeue文件夹的组织方式如下:
$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
consumequeue文件采取定长设计, 每一个条目共20个字节,前8字节的commitlog物理偏移量、中间4字节的消息长度、8字节tag的hashcode。
indexFile
RocketMQ的IndexFile索引文件提供了通过消息Key或时间区间查询消息的能力,其存储路径为$HOME/store/index/{fileName},其中文件名以创建时间戳命名。单个IndexFile文件大小固定约为400M,可保存2000W个索引,其底层采用类HashMap的哈希索引结构实现。
IndexFile是一个固定大小的文件(约400MB),其物理结构由三部分组成
1.IndexHeader(索引头,40字节)
beginTimestamp: 第一条消息存储时间
endTimestamp: 最后一条消息存储时间
beginPhyoffset: 第一条消息在CommitLog中的物理偏移量
endPhyoffset: 最后一条消息在CommitLog中的物理偏移量
hashSlotCount: 已使用的哈希槽数量
indexCount: 索引单元总数
2.Slots(哈希槽)
每个IndexFile包含500万个哈希槽位,每个Slot槽位(4字节)存储的是链式索引的第一个索引序号,每个槽位可挂载多个索引单元,形成链式结构。
- 如果Slot值为0:表示该槽位没有索引链
- 如果Slot值为N:表示该槽位对应的索引链头节点索引序号为N
3.Indexes(索引单元,20字节/个)
每个索引单元包含以下字段:
- keyHash: 消息Key的哈希值
- phyOffset: 消息在CommitLog中的物理偏移量
- timeDiff: 消息存储时间与IndexFile创建时间的差值
- preIndexNo: 同一哈希槽中前一个索引单元的序号
这个结构和hashmap结构很像,但是支持每个key通过时间排序,就可以进行时间范围的检索。
通过定长索引结构和整体设计可以通过key快速定位索引数据,拿到真实数据的物理偏移量。
4.索引查询流程
消费者通过消息Key查询时,执行以下步骤:
- 计算槽位序号slot序号 = key哈希值 % 500万
- 定位槽位地址slot位置 = 40 + (slot序号 - 1) × 4
- 获取首个索引位置index位置 = 40 + 500万 × 4 + (索引序号 - 1) × 20
- 遍历索引链从槽位指向的索引开始,沿preIndexNo链式查找,匹配目标Key并校验时间范围
- 获取物理偏移量从匹配的索引单元中读取phyOffset,最终从CommitLog获取完整消息内容
通过此机制,IndexFile实现了基于Key的高效点查和基于时间范围的快速检索。
整体流程
RocketMQ 高性能存储的核心,在于其 “混合存储” 架构,这正是一种精妙的存储层读写分离设计。
其工作流程可以这样理解:
- 统一写入,保证极致性能: 所有消息顺序追加写入一个统一的 CommitLog 文件。这种单一的顺序写操作,是它能承受海量消息写入的根本。
- 异步构建,优化读取路径: 消息一旦持久化至 CommitLog,即视为安全。随后,后台服务线程会异步地构建出专供消费的 ConsumerQueue(逻辑队列索引)和用于查询的 IndexFile。这相当于为数据建立了高效的“目录”。
- 消费消息: 消费者实际拉取消息时,是先读取 ConsumerQueue 找到消息在 CommitLog 中的物理位置,再反查 CommitLog 获取完整消息内容。
- 可靠的消费机制: 基于上述持久化保障,配合消费者自身的偏移量管理及Broker的长轮询机制,共同实现了消息的可靠投递与高效获取。
这种 “读写分离” 设计的好处在于:将耗时的写操作(顺序写CommitLog)与复杂的读操作(构建索引、分散查询)解耦,让两者可以异步、独立地进行优化,从而在整体上获得更高的吞吐量和更低的延迟。这体现了“各司其职,异步协同”的经典架构思想。
下图是官方文档的流程图
写入流程
1.消息预处理
基础校验: 检查Topic名称、消息体长度等是否合法。
生成唯一ID: 结合Broker地址和CommitLog偏移量等,生成全局唯一的MsgID。
设置系统标志: 根据消息属性(如是否事务消息、是否压缩)设置SysFlag。
2.CommitLog核心写入
获取MappedFile: 根据当前写入位置,定位或创建对应的1GB内存映射文件。这里采用双重检查锁模式来保证性能和安全。
串行加锁写入: 获取全局或文件级锁(PutMessageLock),确保同一时刻只有一个线程写入文件,严格保证顺序性。
序列化与追加: 将消息按照之前分析的二进制协议, 序列化到MappedByteBuffer中,并更新写入指针。
3.刷盘(Flush)
同步刷盘: 消息写入内存映射区后,会创建一个GroupCommitRequest并放入请求组。写入线程会等待,直到刷盘线程完成该请求对应文件的物理刷盘后,才返回成功给Producer。数据最可靠,但延迟较高。
异步刷盘(默认): 消息写入内存映射区后,立即返回成功给Producer。同时唤醒异步刷盘线程, 该线程会定时或当PageCache中待刷盘数据积累到一定量时,执行一次批量刷盘。性能高,但有宕机丢数风险。
4.异步索引构建
由独立的ReputMessageService线程处理。它不断检查CommitLog中是否有新消息到达。
一旦有新消息被确认持久化(对于同步刷盘是已落盘,对于异步刷盘是已写入映射区),该线程就会读取消息内容。
随后,它会为这条消息在对应的consumequeue目录下构建消费队列索引(记录CommitLog物理偏移量和消息长度),更新index索引文件。
消费流程
1.启动与负载均衡
消费者启动后,会向NameServer获取Topic的路由信息(包含哪些队列、分布在哪些Broker上)。
如果消费者组内有多个实例,会触发队列负载均衡(默认策略是平均分配)。例如,一个Topic有8个队列,两个消费者实例,则通常每个消费者负责消费4个队列。这一步决定了每个消费者“认领”了哪些消息队列。
2.拉取消息循环
每个消费者实例内部都有一个PullMessageService线程,它循环从一个PullRequest队列中获取任务。
PullRequest包含了拉取目标(如Broker-A, 队列3)以及下一次要拉取的位点(offset)。
消费者向指定的Broker发送网络请求,请求体中就携带了这个offset。
3.Broker端处理与返回
Broker收到请求后,根据Topic、队列ID和offset,去查询对应的ConsumeQueue索引文件。
ConsumeQueue中存储的是定长(20字节)的记录,包含消息在CommitLog中的物理偏移量和长度。
Broker根据物理偏移量,从CommitLog文件中读取完整的消息内容,通过网络返回给消费者。
4.消息处理与位点提交
消费者将拉取到的消息提交到内部的消费线程池进行处理,你的业务逻辑就在这里执行。
消费位点的管理至关重要:
位点存储: 位点由OffsetStore管理。在集群模式(CLUSTER) 下,消费位点存储在Broker上;在广播模式(BROADCAST) 下,位点存储在本地。
位点提交: 消费成功后,消费者会异步(默认方式)向Broker提交已消费的位点。Broker将其持久化到store/config/consumerOffset.json文件中。
5.消息重试与死信
如果消息消费失败(抛出异常或超时未返回CONSUME_SUCCESS),RocketMQ会触发重试机制。
对于普通消息,消息会被发回Broker上一个特殊的重试主题(%RETRY%),延迟一段时间(延迟级别:1s、5s、10s…)后再被原消费者组拉取。
如果重试超过最大次数(默认16次),消息会被投递到死信主题(%DLQ%),等待人工干预。死信队列中的消息不会再被自动消费。
一体与分离:Kafka和RocketMQ的核心架构博弈
说起RocketMQ就不能不提起Kafka了,两者都是消息中间件这个领域的霸主,但它们的核心架构设计差异, 直接决定了各自不同的性能特性和适用场景,这也是技术选型时必须深入理解的重点。
核心架构设计差异
Kafka:读写一体的“分区日志”模型, Kafka的架构哲学是极简与统一。 它将每个主题分区抽象为一个仅追加(append-only)的物理日志文件。 生产者和消费者都直接与这个日志文件交互:生产者顺序写入尾部,消费者通过维护偏移量顺序读取。这种设计下,数据的读写路径完全一致, 逻辑与物理结构高度统一。
RocketMQ:读写分离的“二级制”模型 , RocketMQ的架构哲学是分工与优化。 它采用了物理CommitLog + 逻辑ConsumeQueue的二级结构。 所有消息都顺序写入一个统一的CommitLog物理文件,实现磁盘的最高效顺序写。同时,为每个消息队列异步构建一个轻量级的ConsumeQueue索引文件,消费者读取时先查询内存中的ConsumeQueue定位,再到CommitLog中获取消息体。这是一种逻辑与物理分离的设计。
优劣势对比
基于上述架构设计根本差异,两者在关键指标上各显优劣:
| 维度 | Kafka(读写一体) | RocketMQ(读写分离) |
|---|---|---|
| 核心优势 | 极致吞吐与低延迟:读写同路径,数据写入后立即可读,端到端延迟极低。架构简单:无中间状态,副本同步、故障恢复逻辑清晰。 | 高并发读与丰富功能:索引与数据分离,支持海量消费者并发读。业务友好:原生支持事务消息、定时/延时消息、消息轨迹查询。 |
| 存储效率 | 磁盘顺序IO最大化:生产和消费都是严格顺序IO,尤其适合机械硬盘。 | 写性能极致化:所有消息顺序写CommitLog,但存在“写放大” ,一条消息需写多次(1次CommitLog + N次ConsumeQueue)。 |
| 读性能 | 消费者落后时可能触发随机读:若消费者要读取非尾部历史数据,可能需磁盘寻道。但现代SSD和预读机制已大大缓解此问题。 | 读路径优化:ConsumeQueue小而固定,可全量缓存至内存,读操作变为“内存寻址 + CommitLog顺序/随机读”。在PageCache命中率高时表现优异。 |
| 扩展性与成本 | 文件句柄(inode)开销大:每个分区都是独立目录和文件,海量分区时运维成本高。 | 存储成本与效率更优:多Topic共享CommitLog,文件数少,特别适合中小消息体、多Topic的场景。 |
| 典型场景 | 日志流、指标监控、实时流处理:作为大数据管道,与Flink/Spark生态无缝集成。 | 电商交易、金融业务、异步解耦:需要严格顺序、事务保障、业务查询的在线业务场景。 |
总而言之,Kafka像一个设计精良的高速公路系统, 核心目标是让数据车辆(消息)能够高吞吐、低延迟地持续流动,并方便地引向各个处理工厂(流计算)。而RocketMQ则像一个高度可靠的快递网络, 不仅确保包裹(消息)准确送达,还提供预约配送(定时)、签收确认(事务)、异常重投(重试)等一系列服务于业务逻辑的增值功能。
RocketMQ对于随机读取的优化
RocketMQ在消费时候的流程
消费者请求 → ConsumeQueue(内存/顺序)获取commitlog上的物理偏移量 → 根据物理偏移量定位CommitLog(磁盘/随机) → 返回消息
从ConsumeQueue获取到消息在commitlog中的偏移量的时候,回查时候可能产生随机IO
- 第一次随机IO: 根据ConsumeQueue中的物理偏移量,在CommitLog中定位消息位置
- 可能的连续随机IO: 如果一次拉取多条消息,这些消息在CommitLog中可能物理不连续
为了保证RocketMQ的高性能,采用一些优化措施,尽量避免随机IO
1. ConsumeQueue的内存映射优化
实际上,RocketMQ将ConsumeQueue映射到内存,每个ConsumeQueue约5.72MB,可完全放入PageCache,读索引操作几乎是内存操作。
public class ConsumeQueue {
private MappedFile mappedFile; // 内存映射文件
// 20字节每条:8(offset) + 4(size) + 8(tagHashCode)
}
2. PageCache的充分利用
Linux PageCache工作流程:
- 消息写入CommitLog → 进入PageCache
- 消费者读取 → 优先从PageCache获取
- 如果PageCache命中:内存速度(≈100ns)
- 如果PageCache未命中:磁盘随机读取(≈10ms)
3. 批量读取优化
// DefaultMessageStore.java
public GetMessageResult getMessage(...) {
// 一次读取多条消息(默认最多32条)
// 即使这些消息物理不连续,通过批量读取减少IO次数
for (int i = 0; i < maxMsgNums; i++) {
// 使用同一个文件channel批量读取
readMessage(ctx, msgId, consumerGroup);
}
}
4. 读取顺序性的保持
虽然CommitLog中不同Topic的消息是随机存放的,但同一个Queue的消息在CommitLog中是基本连续的:
Queue1: | Msg1 | Msg3 | Msg5 | ... | 在ConsumeQueue中连续
↓ ↓ ↓
CommitLog: | Msg1 | Msg2(T2) | Msg3 | Msg4(T3) | Msg5 |
↑_________________________↑
物理上相对连续,减少磁头寻道
高可用设计:双轨并行的可靠性架构
主从架构(Master-Slave)
经典主从模式: RocketMQ早期采用Master-Slave架构,Master处理所有读写请求,Slave仅作为热备份。这种模式下,故障切换依赖人工干预或半自动脚本, 恢复时间通常在分钟级别。
Dledger高可用集群: RocketMQ 4.5引入的Dledger基于Raft协议实现真正的主从自动切换。 当Master故障时,集群能在秒级(通常2-10秒)内自动选举新Leader,期间消息仍可写入(写入请求会阻塞至新Leader选出)。
多副本机制: 现代部署中,建议采用2主2从或3主3从架构。例如在阿里云上,每个Broker组包含1个Master和2个Slave,形成跨可用区的三副本, 单机房故障不影响服务可用性。
同步/异步复制
同步复制保证强一致(消息不丢失),异步复制追求更高性能。
// Broker配置示例
brokerRole = SYNC_MASTER
// 生产者发送消息后,必须等待至少一个Slave确认
// 确保即使Master宕机,消息也不会丢失
- 强一致性保证:消息写入Master后,同步复制到Slave才返回成功
- 性能代价:延迟增加约30-50%,TPS下降约20-40%
- 适用场景:金融交易、资金变动等对数据一致性要求极高的业务
同步/异步刷盘
同步刷盘保证消息持久化不丢失,异步刷盘提升吞吐。
brokerRole = ASYNC_MASTER
// 消息写入Master即返回成功,Slave异步复制
// 存在极短时间的数据丢失风险
- 高性能模式: 延迟降低,吞吐量接近单节点性能
- 风险窗口: Master宕机且数据未同步时,最近几秒消息可能丢失
- 适用场景: 日志收集、监控数据、可容忍微量丢失的业务消息
刷盘策略的工程优化
同步刷盘(SYNC_FLUSH)
生产者 → Broker内存 → 磁盘强制刷盘 → 返回成功
- 零数据丢失: 即使机器掉电,消息也已持久化到磁盘
- 性能瓶颈: 每次写入都触发磁盘IO,机械硬盘下TPS通常<1000
- 优化手段: 使用SSD硬盘可大幅提升性能
异步刷盘(ASYNC_FLUSH)
生产者 → Broker内存 → 立即返回成功 → 异步批量刷盘
- 高性能选择: 依赖PageCache,SSD下TPS可达数万至数十万
- 可靠性依赖: 依赖操作系统的刷盘机制(通常5秒刷盘一次)
- 配置调优:
# 调整刷盘参数
flushCommitLogLeastPages = 4 # 至少4页(16KB)才刷盘
flushCommitLogThoroughInterval = 10000 # 10秒强制刷盘一次
四、Producer与Consumer:高效的生产与消费模型
Producer
消息路由策略:
// 内置多种队列选择算法
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
// 1. 轮询(默认):均匀分布到所有队列
// 2. 哈希:相同Key的消息路由到同一队列,保证局部顺序
// 3. 机房就近:优先选择同机房的Broker
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 自定义路由逻辑
return mqs.get(arg.hashCode() % mqs.size());
}
});
发送模式对比:
| 模式 | 特点 | 性能 | 适用场景 |
|---|---|---|---|
| 同步发送 | 阻塞等待Broker响应 | TPS约5000-20000 | 重要业务消息,需立即知道发送结果 |
| 异步发送 | 回调通知结果 | TPS可达50000+ | 高并发场景,如日志、监控数据 |
| 单向发送 | 发送后不等待 | TPS最高(100000+) | 可容忍少量丢失的非关键数据 |
失败重试与熔断:
- 智能重试: 发送失败时自动重试(默认2次),可配置退避策略
- 故障规避: 自动检测Broker可用性,故障期间路由到健康节点
- 慢请求熔断: 统计发送耗时,自动隔离响应慢的Broker
Consumer
负载均衡策略:
// 集群模式:同一ConsumerGroup内消费者均分队列
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式:每个消费者消费全量队列
consumer.setMessageModel(MessageModel.BROADCASTING);
消费进度管理:
Broker托管: 默认方式,消费进度存储在Broker
本地维护: 某些场景下可自主管理offset(如批量处理)
重置策略:
// 支持多种消费起点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从最后
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 从头
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); // 从时间点
并发控制优化:
// 关键并发参数
consumer.setConsumeThreadMin(20); // 最小消费线程数
consumer.setConsumeThreadMax(64); // 最大消费线程数
consumer.setPullBatchSize(32); // 每次拉取消息数
consumer.setConsumeMessageBatchMaxSize(1); // 批量消费大小
// 流控机制
consumer.setPullThresholdForQueue(1000); // 队列堆积阈值
consumer.setPullInterval(0); // 拉取间隔(0为长轮询)
五、核心流程与特性背后的架构支撑
1 .顺序消息如何保证?
全局顺序: 单Topic单队列(牺牲并发)。
分区顺序: 通过MessageQueue选择器确保同一业务键(如订单ID)的消息发往同一队列,Consumer端按队列顺序消费。
2.事务消息的两阶段提交
流程详解: Half Message -> 执行本地事务 -> Commit/Rollback。
架构支撑: Op消息回查机制,解决分布式事务的最终一致性,是架构设计中“状态可回溯”思想的体现。
3.延时消息的实现奥秘
并非真正延迟投递: 为不同延迟级别预设独立的SCHEDULE_TOPIC, 定时任务扫描到期后投递至真实Topic。
设计权衡: 以存储和计算换取功能的灵活与可靠。
六、其他性能优化关键技术点
- 零拷贝(Zero-copy): 通过sendfile或mmap+write方式,减少内核态与用户态间数据拷贝,大幅提升网络发送与文件读写效率。
- 堆外内存与内存池: 避免JVM GC对大数据块处理的影响,实现高效的内存管理。
- 文件预热: 启动时将存储文件映射到内存并写入“假数据”,避免运行时缺页中断。
七、总结:RocketMQ架构设计的启示
RocketMQ的架构设计,尤其是其在简洁性、高性能和云原生演进方面的平衡,为构建现代分布式系统提供了许多宝贵启示。
- 在简单与完备间权衡: RocketMQ没有采用强一致性的ZooKeeper,而是自研了极其简单的NameServer。这说明在非核心路径上,牺牲一定的功能完备性来换取简单性和高可用性,可能也是个不错的选择。
- 以写定存储,以读优查询: 其存储架构是典型的写优化设计。所有消息顺序追加写入,保证了最高的写入性能。而针对消费和查询这两种主要的“读”场景,则分别通过异步构建索引数据结构(ConsumeQueue和IndexFile)来优化。
八、参考资料
- [RocketMQ官方文档](为什么选择RocketMQ | RocketMQ rocketmq.apache.org/zh/docs/)
- [RocketMQ中文社区](Apache RocketMQ 原理和架构 rocketmq-learning.com/course/base…)
往期回顾
1.PAG在得物社区S级活动的落地
2.Ant Design 6.0 尝鲜:上手现代化组件开发|得物技术
3.Java 设计模式:原理、框架应用与实战全解析|得物技术
4.Go语言在高并发高可用系统中的实践与解决方案|得物技术
5.从0到1搭建一个智能分析OBS埋点数据的AI Agent|得物技术
文 /磊子
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
PAG在得物社区S级活动的落地
Ant Design 6.0 尝鲜:上手现代化组件开发|得物技术
Java 设计模式:原理、框架应用与实战全解析|得物技术
Go语言在高并发高可用系统中的实践与解决方案|得物技术
从0到1搭建一个智能分析OBS埋点数据的AI Agent|得物技术
数据库AI方向探索-MCP原理解析&DB方向实战|得物技术
得物个人信息保护社会责任报告
项目性能优化实践:深入FMP算法原理探索|得物技术
一、前 言
最近在项目中遇到了页面加载速度优化的问题,为了提高秒开率等指标,我决定从eebi报表入手,分析一下当前项目的性能监控体系。
通过查看报表中的cost_time、is_first等字段,我开始了解项目的性能数据采集情况。为了更好地理解这些数据的含义,我深入研究了相关SDK的源码实现。
在分析过程中,我发现采集到的cost_time参数实际上就是FMP(First Meaningful Paint) 指标。于是我对FMP的算法实现进行了梳理,了解了它的计算逻辑。
本文将分享我在性能优化过程中的一些思考和发现,希望能对关注前端性能优化的同学有所帮助。
二、什么是FMP
FMP (First Meaningful Paint) 首次有意义绘制,是指页面首次绘制有意义内容的时间点。与 FCP (First Contentful Paint) 不同,FMP 更关注的是对用户有实际价值的内容,而不是任何内容的首次绘制。
三、FMP 计算原理
3.1核心思想
FMP 的核心思想是:通过分析视口内重要 DOM 元素的渲染时间,找到对用户最有意义的内容完成渲染的时间点。
3.2FMP的三种计算方式
- 新算法 FMP (specifiedValue) 基于用户指定的 DOM 元素计算通过fmpSelector配置指定元素计算指定元素的完整加载时间
- 传统算法 FMP (value) 基于视口内重要元素计算选择权重最高的元素取所有参考元素中最晚完成的时间
- P80 算法 FMP (p80Value) 基于 P80 百分位计算取排序后80%位置的时间更稳定的性能指标
3.3新算法vs传统算法
传统算法流程
- 遍历整个DOM树
- 计算每个元素的权重分数
- 选择多个重要元素
- 计算所有元素的加载时间
- 取最晚完成的时间作为FMP
新算法(指定元素算法)流程
核心思想: 直接指定一个关键 DOM 元素,计算该元素的完整加载时间作为FMP。
传统算法详细步骤
第一步:DOM元素选择
// 递归遍历 DOM 树,选择重要元素
selectMostImportantDOMs(dom: HTMLElement = document.body): void {
const score = this.getWeightScore(dom);
if (score > BODY_WEIGHT) {
// 权重大于 body 权重,作为参考元素
this.referDoms.push(dom);
} else if (score >= this.highestWeightScore) {
// 权重大于等于最高分数,作为重要元素
this.importantDOMs.push(dom);
}
// 递归处理子元素
for (let i = 0, l = dom.children.length; i < l; i++) {
this.selectMostImportantDOMs(dom.children[i] as HTMLElement);
}
}
第二步:权重计算
// 计算元素权重分数
getWeightScore(dom: Element) {
// 获取元素在视口中的位置和大小
const viewPortPos = dom.getBoundingClientRect();
const screenHeight = this.getScreenHeight();
// 计算元素在首屏中的可见面积
const fpWidth = Math.min(viewPortPos.right, SCREEN_WIDTH) - Math.max(0, viewPortPos.left);
const fpHeight = Math.min(viewPortPos.bottom, screenHeight) - Math.max(0, viewPortPos.top);
// 权重 = 可见面积 × 元素类型权重
return fpWidth * fpHeight * getDomWeight(dom);
}
权重计算公式:
权重分数 = 可见面积 × 元素类型权重
元素类型权重:
- OBJECT, EMBED, VIDEO: 最高权重
- SVG, IMG, CANVAS: 高权重
- 其他元素: 权重为 1
第三步:加载时间计算
getLoadingTime(dom: HTMLElement, resourceLoadingMap: Record<string, any>): number {
// 获取 DOM 标记时间
const baseTime = getMarkValueByDom(dom);
// 获取资源加载时间
let resourceTime = 0;
if (RESOURCE_TAG_SET.indexOf(tagType) >= 0) {
// 处理图片、视频等资源
const resourceTiming = resourceLoadingMap[resourceName];
resourceTime = resourceTiming ? resourceTiming.responseEnd : 0;
}
// 返回较大值(DOM 时间 vs 资源时间)
return Math.max(resourceTime, baseTime);
}
第四步:FMP值计算
calcValue(resourceLoadingMap: Record<string, any>, isSubPage: boolean = false): void {
// 构建参考元素列表(至少 3 个元素)
const referDoms = this.referDoms.length >= 3
? this.referDoms
: [...this.referDoms, ...this.importantDOMs.slice(this.referDoms.length - 3)];
// 计算每个元素的加载时间
const timings = referDoms.map(dom => this.getLoadingTime(dom, resourceLoadingMap));
// 排序时间数组
const sortedTimings = timings.sort((t1, t2) => t1 - t2);
// 计算最终值
const info = getMetricNumber(sortedTimings);
this.value = info.value; // 最后一个元素的时间(最晚完成)
this.p80Value = info.p80Value; // P80 百分位时间
}
新算法详细步骤
第一步:配置指定元素
// 通过全局配置指定 FMP 目标元素
const { fmpSelector = "" } = SingleGlobal?.getOptions?.();
配置示例:
// 初始化时配置
init({
fmpSelector: '.main-content', // 指定主要内容区域
// 或者
fmpSelector: '#hero-section', // 指定首屏区域
// 或者
fmpSelector: '.product-list' // 指定产品列表
});
第二步:查找指定元素
if (fmpSelector) {
// 使用 querySelector 查找指定的 DOM 元素
const $specifiedEl = document.querySelector(fmpSelector);
if ($specifiedEl && $specifiedEl instanceof HTMLElement) {
// 找到指定元素,进行后续计算
this.specifiedDom = $specifiedEl;
}
}
查找逻辑:
- 使用document.querySelector()查找元素
- 验证元素存在且为 HTMLElement 类型
- 保存元素引用到specifiedDom
第三步:计算指定元素的加载时间
// 计算指定元素的完整加载时间
this.specifiedValue = this.getLoadingTime(
$specifiedEl,
resourceLoadingMap
);
加载时间计算包含:
- DOM 标记时间
// 获取 DOM 元素的基础标记时间
const baseTime = getMarkValueByDom(dom);
- 资源加载时间
let resourceTime = 0;
// 处理直接资源(img, video, embed 等)
const tagType = dom.tagName.toUpperCase();
if (RESOURCE_TAG_SET.indexOf(tagType) >= 0) {
const resourceName = normalizeResourceName((dom as any).src);
const resourceTiming = resourceLoadingMap[resourceName];
resourceTime = resourceTiming ? resourceTiming.responseEnd : 0;
}
// 处理背景图片
const bgImgUrl = getDomBgImg(dom);
if (isImageUrl(bgImgUrl)) {
const resourceName = normalizeResourceName(bgImgUrl);
const resourceTiming = resourceLoadingMap[resourceName];
resourceTime = resourceTiming ? resourceTiming.responseEnd : 0;
}
- 综合时间计算
// 返回 DOM 时间和资源时间的较大值
return Math.max(resourceTime, baseTime);
第四步:FMP值确定
// 根据是否有指定值来决定使用哪个 FMP 值
if (specifiedValue === 0) {
// 如果没有指定值,回退到传统算法
fmp = isSubPage ? value - diffTime : value;
} else {
// 如果有指定值,使用指定值
fmp = isSubPage ? specifiedValue - diffTime : specifiedValue;
}
决策逻辑:
- 如果 specifiedValue > 0:使用指定元素的加载时间
- 如果 specifiedValue === 0:回退到传统算法
第五步:子页面时间调整
// 子页面的 FMP 值需要减去时间偏移
if (isSubPage) {
fmp = specifiedValue - diffTime;
// diffTime = startSubTime - initTime
}
新算法的优势
精确性更高
- 直接针对业务关键元素
- 避免权重计算的误差
- 更贴近业务需求
可控性强
- 开发者可以指定关键元素
- 可以根据业务场景调整
- 避免算法自动选择的偏差
计算简单
- 只需要计算一个元素
- 不需要复杂的权重计算
- 性能开销更小
业务导向
- 直接反映业务关键内容的加载时间
- 更符合用户体验评估需求
- 便于性能优化指导
3.4关键算法
P80 百分位计算
export function getMetricNumber(sortedTimings: number[]) {
const value = sortedTimings[sortedTimings.length - 1]; // 最后一个(最晚)
const p80Value = sortedTimings[Math.floor((sortedTimings.length - 1) * 0.8)]; // P80
return { value, p80Value };
}
元素类型权重
const IMPORTANT_ELEMENT_WEIGHT_MAP = {
SVG: IElementWeight.High, // 高权重
IMG: IElementWeight.High, // 高权重
CANVAS: IElementWeight.High, // 高权重
OBJECT: IElementWeight.Highest, // 最高权重
EMBED: IElementWeight.Highest, // 最高权重
VIDEO: IElementWeight.Highest // 最高权重
};
四、时间标记机制
4.1DOM变化监听
// MutationObserver 监听 DOM 变化
private observer = new MutationObserver((mutations = []) => {
const now = Date.now();
this.handleChange(mutations, now);
});
4.2时间标记
// 为每个 DOM 变化创建性能标记
mark(count); // 创建 performance.mark(`mutation_pc_${count}`)
// 为 DOM 元素设置标记
setDataAttr(elem, TAG_KEY, `${mutationCount}`);
4.3标记值获取
// 根据 DOM 元素获取标记时间
getMarkValueByDom(dom: HTMLElement) {
const markValue = getDataAttr(dom, TAG_KEY);
return getMarkValue(parseInt(markValue));
}
五、资源加载考虑
5.1资源类型识别
图片资源:
标签的 src属性
视频资源: 标签的 src属性
背景图片: CSS background-image属性
嵌入资源: , 标签
5.2资源时间获取
// 从 Performance API 获取资源加载时间
const resourceTiming = resourceLoadingMap[resourceName];
const resourceTime = resourceTiming ? resourceTiming.responseEnd : 0;
5.3综合时间计算
// DOM 时间和资源时间的较大值
return Math.max(resourceTime, baseTime);
六、子页面支持
6.1时间偏移处理
// 子页面从调用 send 方法开始计时
const diffTime = this.startSubTime - this.initTime;
// 子页面只统计开始时间之后的资源
if (!isSubPage || resource.startTime > diffTime) {
resourceLoadingMap[resourceName] = resource;
}
6.2FMP值调整
// 子页面的 FMP 值需要减去时间偏移
fmp = isSubPage ? value - diffTime : value;
七、FMP的核心优势
7.1用户感知导向
FMP 最大的优势在于它真正关注用户的实际体验:
- 内容价值优先:只计算对用户有意义的内容渲染时间
- 智能权重评估:根据元素的重要性和可见性进行差异化计算
- 真实体验映射:更贴近用户的实际感知,而非技术层面的指标
7.2多维度计算体系
FMP 采用了更加全面的计算方式:
- 元素权重分析:综合考虑元素类型和渲染面积的影响
- 资源加载关联:将静态资源加载时间纳入计算范围
- 算法对比验证:支持多种算法并行计算,确保结果准确性
7.3高精度测量
FMP 在测量精度方面表现突出:
- DOM 变化追踪:基于实际 DOM 结构变化的时间点
- API 数据融合:结合 Performance API 提供的详细数据
- 统计分析支持:支持 P80 百分位等多种统计指标,便于性能分析
八、FMP的实际应用场景
8.1性能监控实践
FMP 在性能监控中发挥着重要作用:
- 关键指标追踪:实时监控页面首次有意义内容的渲染时间
- 瓶颈识别:快速定位性能瓶颈和潜在的优化点
- 趋势分析:通过历史数据了解性能变化趋势
8.2用户体验评估
FMP 为产品团队提供了用户视角的性能评估:
- 真实感知测量:评估用户实际感受到的页面加载速度
- 竞品对比分析:对比不同页面或产品的性能表现
- 用户满意度关联:将技术指标与用户满意度建立关联
8.3优化指导价值
FMP 数据为性能优化提供了明确的方向:
- 资源优化策略:指导静态资源加载顺序和方式的优化
- 渲染路径优化:帮助优化关键渲染路径,提升首屏体验
- 量化效果评估:为优化效果提供可量化的评估标准
九、总结
通过这次深入分析,我对 FMP 有了更全面的认识。FMP 通过科学的算法设计,能够准确反映用户感知的页面加载性能,是前端性能监控的重要指标。
它不仅帮助我们更好地理解页面加载过程,更重要的是为性能优化提供了科学的依据。在实际项目中,合理运用 FMP 指标,能够有效提升用户体验,实现真正的"秒开"效果。
希望这篇文章能对正在关注前端性能优化的同学有所帮助,也欢迎大家分享自己的实践经验。
往期回顾
1. Dragonboat统一存储LogDB实现分析|得物技术
2. 从数字到版面:得物数据产品里数字格式化的那些事
3. 一文解析得物自建 Redis 最新技术演进
4. Golang HTTP请求超时与重试:构建高可靠网络请求|得物技术
5. RN与hawk碰撞的火花之C++异常捕获|得物技术
文 /阿列
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
Dragonboat统一存储LogDB实现分析|得物技术
一、项目概览
Dragonboat 是纯 Go 实现的(multi-group)Raft 库。
为应用屏蔽 Raft 复杂性,提供易于使用的 NodeHost 和状态机接口。该库(自称)有如下特点:
- 高吞吐、流水线化、批处理;
- 提供了内存/磁盘状态机多种实现;
- 提供了 ReadIndex、成员变更、Leader转移等管理端API;
- 默认使用 Pebble 作为 存储后端。
本次代码串讲以V3的稳定版本为基础,不包括GitHub上v4版本内容。
二、整体架构
![]()
三、LogDB 统一存储
LogDB 模块是 Dragonboat 的核心持久化存储层,虽然模块名字有Log,但是它囊括了所有和存储相关的API,负责管理 Raft 协议的所有持久化数据,包括:
Raft状态 (RaftState)
Raft内部状态变更的集合结构
包括但不限于:
- ClusterID/NodeID: 节点ID
- RaftState: Raft任期、投票情况、commit进度
- EntriesToSave:Raft提案日志数据
- Snapshot:快照元数据(包括快照文件路径,快照大小,快照对应的提案Index,快照对应的Raft任期等信息)
- Messages:发给其他节点的Raft消息
- ReadyToReads:ReadIndex就绪的请求
引导信息 (Bootstrap)
type Bootstrap struct {
Addresses map[uint64]string // 初始集群成员
Join bool
Type StateMachineType
}
ILogDB的API如下:
type ILogDB interface {
BinaryFormat() uint32 // 返回支持的二进制格式版本号
ListNodeInfo() ([]NodeInfo, error) // 列出 LogDB 中所有可用的节点信息
// 存储集群节点的初始化配置信息,包括是否加入集群、状态机类型等
SaveBootstrapInfo(clusterID uint64, nodeID uint64, bootstrap pb.Bootstrap) error
// 获取保存的引导信息
GetBootstrapInfo(clusterID uint64, nodeID uint64) (pb.Bootstrap, error)
// 原子性保存 Raft 状态、日志条目和快照元数据
SaveRaftState(updates []pb.Update, shardID uint64) error
// 迭代读取指定范围内的连续日志条目
IterateEntries(ents []pb.Entry, size uint64, clusterID uint64, nodeID uint64,
low uint64, high uint64, maxSize uint64) ([]pb.Entry, uint64, error)
// 读取持久化的 Raft 状态
ReadRaftState(clusterID uint64, nodeID uint64, lastIndex uint64) (RaftState, error)
// 删除指定索引之前的所有条目, 日志压缩、快照后清理旧日志
RemoveEntriesTo(clusterID uint64, nodeID uint64, index uint64) error
// 回收指定索引之前条目占用的存储空间
CompactEntriesTo(clusterID uint64, nodeID uint64, index uint64) (<-chan struct{}, error)
// 保存所有快照元数据
SaveSnapshots([]pb.Update) error
// 删除指定的快照元数据 清理过时或无效的快照
DeleteSnapshot(clusterID uint64, nodeID uint64, index uint64) error
// 列出指定索引范围内的可用快照
ListSnapshots(clusterID uint64, nodeID uint64, index uint64) ([]pb.Snapshot, error)
// 删除节点的所有相关数据
RemoveNodeData(clusterID uint64, nodeID uint64) error
// 导入快照并创建所有必需的元数据
ImportSnapshot(snapshot pb.Snapshot, nodeID uint64) error
}
3.1索引键
存储的底层本质是一个KVDB (pebble or rocksdb),由于业务的复杂性,要统一各类业务key的设计方法,而且要降低空间使用,所以有了如下的key设计方案。
龙舟中key分为3类:
![]()
其中,2字节的header用于区分各类不同业务的key空间。
entryKeyHeader = [2]byte{0x1, 0x1} // 普通日志条目
persistentStateKey = [2]byte{0x2, 0x2} // Raft状态
maxIndexKey = [2]byte{0x3, 0x3} // 最大索引记录
nodeInfoKey = [2]byte{0x4, 0x4} // 节点元数据
bootstrapKey = [2]byte{0x5, 0x5} // 启动配置
snapshotKey = [2]byte{0x6, 0x6} // 快照索引
entryBatchKey = [2]byte{0x7, 0x7} // 批量日志
在key的生成中,采用了useAsXXXKey和SetXXXKey的方式,复用了data这个二进制变量,减少GC。
type Key struct {
data []byte // 底层字节数组复用池
key []byte // 有效数据切片
pool *sync.Pool // 似乎并没有什么用
}
func (k *Key) useAsEntryKey() {
k.key = k.data
}
type IReusableKey interface {
SetEntryBatchKey(clusterID uint64, nodeID uint64, index uint64)
// SetEntryKey sets the key to be an entry key for the specified Raft node
// with the specified entry index.
SetEntryKey(clusterID uint64, nodeID uint64, index uint64)
// SetStateKey sets the key to be an persistent state key suitable
// for the specified Raft cluster node.
SetStateKey(clusterID uint64, nodeID uint64)
// SetMaxIndexKey sets the key to be the max possible index key for the
// specified Raft cluster node.
SetMaxIndexKey(clusterID uint64, nodeID uint64)
// Key returns the underlying byte slice of the key.
Key() []byte
// Release releases the key instance so it can be reused in the future.
Release()
}
func (k *Key) useAsEntryKey() {
k.key = k.data
}
// SetEntryKey sets the key value to the specified entry key.
func (k *Key) SetEntryKey(clusterID uint64, nodeID uint64, index uint64) {
k.useAsEntryKey()
k.key[0] = entryKeyHeader[0]
k.key[1] = entryKeyHeader[1]
k.key[2] = 0
k.key[3] = 0
binary.BigEndian.PutUint64(k.key[4:], clusterID)
// the 8 bytes node ID is actually not required in the key. it is stored as
// an extra safenet - we don't know what we don't know, it is used as extra
// protection between different node instances when things get ugly.
// the wasted 8 bytes per entry is not a big deal - storing the index is
// wasteful as well.
binary.BigEndian.PutUint64(k.key[12:], nodeID)
binary.BigEndian.PutUint64(k.key[20:], index)
}
3.2变量复用IContext
IContext的核心设计目的是实现并发安全的内存复用机制。在高并发场景下,频繁的内存分配和释放会造成较大的GC压力,通过IContext可以实现:
- 键对象复用:通过GetKey()获取可重用的IReusableKey
- 缓冲区复用:通过GetValueBuffer()获取可重用的字节缓冲区
- 批量操作对象复用:EntryBatch和WriteBatch的复用
// IContext is the per thread context used in the logdb module.
// IContext is expected to contain a list of reusable keys and byte
// slices that are owned per thread so they can be safely reused by the
// same thread when accessing ILogDB.
type IContext interface {
// Destroy destroys the IContext instance.
Destroy()
// Reset resets the IContext instance, all previous returned keys and
// buffers will be put back to the IContext instance and be ready to
// be used for the next iteration.
Reset()
// GetKey returns a reusable key.
GetKey() IReusableKey // 这就是上文中的key接口
// GetValueBuffer returns a byte buffer with at least sz bytes in length.
GetValueBuffer(sz uint64) []byte
// GetWriteBatch returns a write batch or transaction instance.
GetWriteBatch() interface{}
// SetWriteBatch adds the write batch to the IContext instance.
SetWriteBatch(wb interface{})
// GetEntryBatch returns an entry batch instance.
GetEntryBatch() pb.EntryBatch
// GetLastEntryBatch returns an entry batch instance.
GetLastEntryBatch() pb.EntryBatch
}
type context struct {
size uint64
maxSize uint64
eb pb.EntryBatch
lb pb.EntryBatch
key *Key
val []byte
wb kv.IWriteBatch
}
func (c *context) GetKey() IReusableKey {
return c.key
}
func (c *context) GetValueBuffer(sz uint64) []byte {
if sz <= c.size {
return c.val
}
val := make([]byte, sz)
if sz < c.maxSize {
c.size = sz
c.val = val
}
return val
}
func (c *context) GetEntryBatch() pb.EntryBatch {
return c.eb
}
func (c *context) GetLastEntryBatch() pb.EntryBatch {
return c.lb
}
func (c *context) GetWriteBatch() interface{} {
return c.wb
}
func (c *context) SetWriteBatch(wb interface{}) {
c.wb = wb.(kv.IWriteBatch)
}
3.3存储引擎封装IKVStore
IKVStore 是 Dragonboat 日志存储系统的抽象接口,它定义了底层键值存储引擎需要实现的所有基本操作。这个接口让 Dragonboat 能够支持不同的存储后端(如 Pebble、RocksDB 等),实现了存储引擎的可插拔性。
type IKVStore interface {
// Name is the IKVStore name.
Name() string
// Close closes the underlying Key-Value store.
Close() error
// 范围扫描 - 支持前缀遍历的迭代器
IterateValue(fk []byte,
lk []byte, inc bool, op func(key []byte, data []byte) (bool, error)) error
// 查询操作 - 基于回调的内存高效查询模式
GetValue(key []byte, op func([]byte) error) error
// 写入操作 - 单条记录的原子写入
SaveValue(key []byte, value []byte) error
// 删除操作 - 单条记录的精确删除
DeleteValue(key []byte) error
// 获取批量写入器
GetWriteBatch() IWriteBatch
// 原子提交批量操作
CommitWriteBatch(wb IWriteBatch) error
// 批量删除一个范围的键值对
BulkRemoveEntries(firstKey []byte, lastKey []byte) error
// 压缩指定范围的存储空间
CompactEntries(firstKey []byte, lastKey []byte) error
// 全量压缩整个数据库
FullCompaction() error
}
type IWriteBatch interface {
Destroy() // 清理资源,防止内存泄漏
Put(key, value []byte) // 添加写入操作
Delete(key []byte) // 添加删除操作
Clear() // 清空批处理中的所有操作
Count() int // 获取当前批处理中的操作数量
}
openPebbleDB是Dragonboat 中 Pebble 存储引擎的初始化入口,负责根据配置创建一个完整可用的键值存储实例。
![]()
// KV is a pebble based IKVStore type.
type KV struct {
db *pebble.DB
dbSet chan struct{}
opts *pebble.Options
ro *pebble.IterOptions
wo *pebble.WriteOptions
event *eventListener
callback kv.LogDBCallback
config config.LogDBConfig
}
var _ kv.IKVStore = (*KV)(nil)
// openPebbleDB
// =============
// 将 Dragonboat 的 LogDBConfig → Pebble 引擎实例
func openPebbleDB(
cfg config.LogDBConfig,
cb kv.LogDBCallback, // => busy通知:busy(true/false)
dir string, // 主数据目录
wal string, // WAL 独立目录(可空)
fs vfs.IFS, // 文件系统抽象(磁盘/memfs)
) (kv.IKVStore, error) {
//--------------------------------------------------
// 2️⃣ << 核心调优参数读入
//--------------------------------------------------
blockSz := int(cfg.KVBlockSize) // 数据块(4K/8K…)
writeBufSz := int(cfg.KVWriteBufferSize) // 写缓冲
bufCnt := int(cfg.KVMaxWriteBufferNumber) // MemTable数量
l0Compact := int(cfg.KVLevel0FileNumCompactionTrigger) // L0 层文件数量触发压缩的阈值
l0StopWrites := int(cfg.KVLevel0StopWritesTrigger)
baseBytes := int64(cfg.KVMaxBytesForLevelBase)
fileBaseSz := int64(cfg.KVTargetFileSizeBase)
cacheSz := int64(cfg.KVLRUCacheSize)
levelMult := int64(cfg.KVTargetFileSizeMultiplier) // 每层文件大小倍数
numLevels := int64(cfg.KVNumOfLevels)
//--------------------------------------------------
// 4️⃣ 构建 LSM-tree 层级选项 (每层无压缩)
//--------------------------------------------------
levelOpts := []pebble.LevelOptions{}
sz := fileBaseSz
for lvl := 0; lvl < int(numLevels); lvl++ {
levelOpts = append(levelOpts, pebble.LevelOptions{
Compression: pebble.NoCompression, // 写性能优先
BlockSize: blockSz,
TargetFileSize: sz, // L0 < L1 < … 呈指数增长
})
sz *= levelMult
}
//--------------------------------------------------
// 5️⃣ 初始化依赖:LRU Cache + 读写选项
//--------------------------------------------------
cache := pebble.NewCache(cacheSz) // block缓存
ro := &pebble.IterOptions{} // 迭代器默认配置
wo := &pebble.WriteOptions{Sync: true} // ❗fsync强制刷盘
opts := &pebble.Options{
Levels: levelOpts,
Cache: cache,
MemTableSize: writeBufSz,
MemTableStopWritesThreshold: bufCnt,
LBaseMaxBytes: baseBytes,
L0CompactionThreshold: l0Compact,
L0StopWritesThreshold: l0StopWrites,
Logger: PebbleLogger,
FS: vfs.NewPebbleFS(fs),
MaxManifestFileSize: 128 * 1024 * 1024,
// WAL 目录稍后条件注入
}
kv := &KV{
dbSet: make(chan struct{}), // 关闭->初始化完成信号
callback: cb, // 上层 raft engine 回调
config: cfg,
opts: opts,
ro: ro,
wo: wo,
}
event := &eventListener{
kv: kv,
stopper: syncutil.NewStopper(),
}
// => 关键事件触发
opts.EventListener = pebble.EventListener{
WALCreated: event.onWALCreated,
FlushEnd: event.onFlushEnd,
CompactionEnd: event.onCompactionEnd,
}
//--------------------------------------------------
// 7️⃣ 目录准备
//--------------------------------------------------
if wal != "" {
fs.MkdirAll(wal) // 📁 为 WAL 单独磁盘预留
opts.WALDir = wal
}
fs.MkdirAll(dir) // 📁 主数据目录
//--------------------------------------------------
// 8️⃣ 真正的数据库实例化
//--------------------------------------------------
pdb, err := pebble.Open(dir, opts)
if err != nil { return nil, err }
//--------------------------------------------------
// 9️⃣ 🧹 资源整理 & 启动事件
//--------------------------------------------------
cache.Unref() // 去除多余引用,防止泄露
kv.db = pdb
// 🔔 手动触发一次 WALCreated 确保反压逻辑进入首次轮询
kv.setEventListener(event) // 内部 close(kv.dbSet)
return kv, nil
}
其中eventListener是对pebble 内存繁忙的回调,繁忙判断的条件有两个:
- 内存表大小超过阈值(95%)
- L0 层文件数量超过阈值(L0写入最大文件数量-1)
func (l *eventListener) notify() {
l.stopper.RunWorker(func() {
select {
case <-l.kv.dbSet:
if l.kv.callback != nil {
memSizeThreshold := l.kv.config.KVWriteBufferSize *
l.kv.config.KVMaxWriteBufferNumber * 19 / 20
l0FileNumThreshold := l.kv.config.KVLevel0StopWritesTrigger - 1
m := l.kv.db.Metrics()
busy := m.MemTable.Size >= memSizeThreshold ||
uint64(m.Levels[0].NumFiles) >= l0FileNumThreshold
l.kv.callback(busy)
}
default:
}
})
}
![]()
3.4日志条目存储DB
db结构体是Dragonboat日志数据库的核心管理器,提供Raft日志、快照、状态等数据的持久化存储接口。是桥接了业务和pebble存储的中间层。
// db is the struct used to manage log DB.
type db struct {
cs *cache // 节点信息、Raft状态信息缓存
keys *keyPool // Raft日志索引键变量池
kvs kv.IKVStore // pebble的封装
entries entryManager // 日志条目读写封装
}
// 这里面的信息不会过期,叫寄存更合适
type cache struct {
nodeInfo map[raftio.NodeInfo]struct{}
ps map[raftio.NodeInfo]pb.State
lastEntryBatch map[raftio.NodeInfo]pb.EntryBatch
maxIndex map[raftio.NodeInfo]uint64
mu sync.Mutex
}
- 获取一个批量写容器
实现:
func (r *db) getWriteBatch(ctx IContext) kv.IWriteBatch {
if ctx != nil {
wb := ctx.GetWriteBatch()
if wb == nil {
wb = r.kvs.GetWriteBatch()
ctx.SetWriteBatch(wb)
}
return wb.(kv.IWriteBatch)
}
return r.kvs.GetWriteBatch()
}
降低GC压力
- 获取所有节点信息
实现:
func (r *db) listNodeInfo() ([]raftio.NodeInfo, error) {
fk := newKey(bootstrapKeySize, nil)
lk := newKey(bootstrapKeySize, nil)
fk.setBootstrapKey(0, 0)
lk.setBootstrapKey(math.MaxUint64, math.MaxUint64)
ni := make([]raftio.NodeInfo, 0)
op := func(key []byte, data []byte) (bool, error) {
cid, nid := parseNodeInfoKey(key)
ni = append(ni, raftio.GetNodeInfo(cid, nid))
return true, nil
}
if err := r.kvs.IterateValue(fk.Key(), lk.Key(), true, op); err != nil {
return []raftio.NodeInfo{}, err
}
return ni, nil
}
- 保存集群状态
实现:
type Update struct {
ClusterID uint64 // 集群ID,标识节点所属的Raft集群
NodeID uint64 // 节点ID,标识集群中的具体节点
State // 包含当前任期(Term)、投票节点(Vote)、提交索引(Commit)三个关键持久化状态
EntriesToSave []Entry // 需要持久化到稳定存储的日志条目
CommittedEntries []Entry // 已提交位apply的日志条目
MoreCommittedEntries bool // 指示是否还有更多已提交条目等待处理
Snapshot Snapshot // 快照元数据,当需要应用快照时设置
ReadyToReads []ReadyToRead // ReadIndex机制实现的线性一致读
Messages []Message // 需要发送给其他节点的Raft消息
UpdateCommit struct {
Processed uint64 // 已推送给RSM处理的最后索引
LastApplied uint64 // RSM确认已执行的最后索引
StableLogTo uint64 // 已稳定存储的日志到哪个索引
StableLogTerm uint64 // 已稳定存储的日志任期
StableSnapshotTo uint64 // 已稳定存储的快照到哪个索引
ReadyToRead uint64 // 已准备好读的ReadIndex请求索引
}
}
func (r *db) saveRaftState(updates []pb.Update, ctx IContext) error {
// 步骤1:获取写入批次对象,用于批量操作提高性能
// 优先从上下文中获取已存在的批次,避免重复创建
wb := r.getWriteBatch(ctx)
// 步骤2:遍历所有更新,处理每个节点的状态和快照
for _, ud := range updates {
// 保存 Raft 的硬状态(Term、Vote、Commit)
// 使用缓存机制避免重复保存相同状态
r.saveState(ud.ClusterID, ud.NodeID, ud.State, wb, ctx)
// 检查是否有快照需要保存
if !pb.IsEmptySnapshot(ud.Snapshot) {
// 快照索引一致性检查:确保快照索引不超过最后一个日志条目的索引
// 这是 Raft 协议的重要约束,防止状态不一致
if len(ud.EntriesToSave) > 0 {
lastIndex := ud.EntriesToSave[len(ud.EntriesToSave)-1].Index
if ud.Snapshot.Index > lastIndex {
plog.Panicf("max index not handled, %d, %d",
ud.Snapshot.Index, lastIndex)
}
}
// 保存快照元数据到数据库
r.saveSnapshot(wb, ud)
// 更新节点的最大日志索引为快照索引
r.setMaxIndex(wb, ud, ud.Snapshot.Index, ctx)
}
}
// 步骤3:批量保存所有日志条目
// 这里会调用 entryManager 接口的 record 方法,根据配置选择批量或单独存储策略
r.saveEntries(updates, wb, ctx)
// 步骤4:提交写入批次到磁盘
// 只有在批次中有实际操作时才提交,避免不必要的磁盘 I/O
if wb.Count() > 0 {
return r.kvs.CommitWriteBatch(wb)
}
return nil
}
- 保存引导信息
实现:
func (r *db) saveBootstrapInfo(clusterID uint64,
nodeID uint64, bs pb.Bootstrap) error {
wb := r.getWriteBatch(nil)
r.saveBootstrap(wb, clusterID, nodeID, bs)
return r.kvs.CommitWriteBatch(wb) // 提交至Pebble
}
func (r *db) saveBootstrap(wb kv.IWriteBatch,
clusterID uint64, nodeID uint64, bs pb.Bootstrap) {
k := newKey(maxKeySize, nil)
k.setBootstrapKey(clusterID, nodeID) // 序列化集群节点信息
data, err := bs.Marshal()
if err != nil {
panic(err)
}
wb.Put(k.Key(), data)
}
- 获取Raft状态
实现:
func (r *db) getState(clusterID uint64, nodeID uint64) (pb.State, error) {
k := r.keys.get()
defer k.Release()
k.SetStateKey(clusterID, nodeID)
hs := pb.State{}
if err := r.kvs.GetValue(k.Key(), func(data []byte) error {
if len(data) == 0 {
return raftio.ErrNoSavedLog
}
if err := hs.Unmarshal(data); err != nil {
panic(err)
}
return nil
}); err != nil {
return pb.State{}, err
}
return hs, nil
}
3.5对外存储API实现
龙舟对ILogDB提供了实现:ShardedDB,一个管理了多个pebble bucket的存储单元。
var _ raftio.ILogDB = (*ShardedDB)(nil)
// ShardedDB is a LogDB implementation using sharded pebble instances.
type ShardedDB struct {
completedCompactions uint64 // 原子计数器:已完成压缩操作数
config config.LogDBConfig // 日志存储配置
ctxs []IContext // 分片上下文池,减少GC压力
shards []*db // 核心:Pebble实例数组
partitioner server.IPartitioner // 智能分片策略器
compactionCh chan struct{} // 压缩任务信号通道
compactions *compactions // 压缩任务管理器
stopper *syncutil.Stopper // 优雅关闭管理器
}
- 初始化过程
实现:
// 入口函数:创建并初始化分片日志数据库
OpenShardedDB(config, cb, dirs, lldirs, batched, check, fs, kvf):
// ===阶段1:安全验证===
if 配置为空 then panic
if check和batched同时为true then panic
// ===阶段2:预分配资源管理器===
shards := 空数组
closeAll := func(all []*db) { //出错清理工具
for s in all {
s.close()
}
}
// ===阶段3:逐个创建分片===
loop i := 0 → 分片总数:
datadir := pathJoin(dirs[i], "logdb-"+i) //数据目录
snapdir := "" //快照目录(可选)
if lldirs非空 {
snapdir = pathJoin(lldirs[i], "logdb-"+i)
}
shardCb := {shard:i, callback:cb} //监控回调
db, err := openRDB(...) //创建实际数据库实例
if err != nil { //创建失败
closeAll(shards) //清理已创建的
return nil, err
}
shards = append(shards, db)
// ===阶段5:核心组件初始化===
partitioner := 新建分区器(execShards数量, logdbShards数量)
instance := &ShardedDB{
shards: shards,
partitioner: partitioner,
compactions: 新建压缩管理器(),
compactionCh: 通道缓冲1,
ctxs: make([]IContext, 执行分片数),
stopper: 新建停止器()
}
// ===阶段6:预分配上下文&启动后台===
for j := 0 → 执行分片数:
instance.ctxs[j] = 新建Context(saveBufferSize)
instance.stopper.RunWorker(func() { //后台压缩协程
instance.compactionWorkerMain()
})
return instance, nil //构造完成
![]()
- 保存集群状态
实现:
func (s *ShardedDB) SaveRaftState(updates []pb.Update, shardID uint64) error {
if shardID-1 >= uint64(len(s.ctxs)) {
plog.Panicf("invalid shardID %d, len(s.ctxs): %d", shardID, len(s.ctxs))
}
ctx := s.ctxs[shardID-1]
ctx.Reset()
return s.SaveRaftStateCtx(updates, ctx)
}
func (s *ShardedDB) SaveRaftStateCtx(updates []pb.Update, ctx IContext) error {
if len(updates) == 0 {
return nil
}
pid := s.getParititionID(updates)
return s.shards[pid].saveRaftState(updates, ctx)
}
以sylas为例子,我们每个分片都是单一cluster,所以logdb只使用了一个分片,龙舟设计初衷是为了解放多cluster的吞吐,我们暂时用不上,tindb可以考虑:
![]()
四、总结
LogDB是Dragonboat重要的存储层实现,作者将Pebble引擎包装为一组通用简洁的API,极大方便了上层应用与存储引擎的交互成本。
其中包含了很多Go语言的技巧,例如大量的内存变量复用设计,展示了这个库对高性能的极致追求,是一个十分值得学习的优秀工程案例。
往期回顾
1. 从数字到版面:得物数据产品里数字格式化的那些事
2. 一文解析得物自建 Redis 最新技术演进
3. Golang HTTP请求超时与重试:构建高可靠网络请求|得物技术
4. RN与hawk碰撞的火花之C++异常捕获|得物技术
5. 得物TiDB升级实践
文 /酒米
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
从数字到版面:得物数据产品里数字格式化的那些事
一文解析得物自建 Redis 最新技术演进
Golang HTTP请求超时与重试:构建高可靠网络请求|得物技术
得物TiDB升级实践
一、背 景
得物DBA自2020年初开始自建TiDB,5年以来随着NewSQL数据库迭代发展、运维体系逐步完善、产品自身能力逐步提升,接入业务涵盖了多个业务线和关键场景。从第一套TIDB v4.0.9 版本开始,到后来v4.0.11、v5.1.1、v5.3.0,在经历了各种 BUG 踩坑、问题调试后,最终稳定在 TIDB 5.3.3 版本。伴随着业务高速增长、数据量逐步增多,对 TiDB 的稳定性及性能也带来更多挑战和新的问题。为了应对这些问题,DBA团队决定对 TiDB 进行一次版本升级,收敛版本到7.5.x。本文基于内部的实践情况,从架构、新特性、升级方案及收益等几个方向讲述 TiDB 的升级之旅。
二、TiDB 架构
TiDB 是分布式关系型数据库,高度强兼容 MySQL 协议和 MySQL 生态,稳定适配 MySQL 5.7 和MySQL 8.0常用的功能及语法。随着版本的迭代,TiDB 在弹性扩展、分布式事务、强一致性基础上进一步针对稳定性、性能、易用性等方面进行优化和增强。与传统的单机数据库相比,TiDB具有以下优势:
- 分布式架构,拥有良好的扩展性,支持对业务透明灵活弹性的扩缩容能力,无需分片键设计以及开发运维。
- HTAP 架构支撑,支持在处理高并发事务操作的同时,对实时数据进行复杂分析,天然具备事务与分析物理隔离能力。
- 支持 SQL 完整生态,对外暴露 MySQL 的网络协议,强兼容 MySQL 的语法/语义,在大多数场景下可以直接替换 MySQL。
- 默认支持自愈高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务无感。
- 支持 ACID 事务,对于一些有强一致需求的场景友好,满足 RR 以及 RC 隔离级别,可以在通用开发框架完成业务开发迭代。
我们使用 SLB 来实现 TiDB 的高效负载均衡,通过调整 SLB 来管理访问流量的分配以及节点的扩展和缩减。确保在不同流量负载下,TiDB 集群能够始终保持稳定性能。在 TiDB 集群的部署方面,我们采用了单机单实例的架构设计。TiDB Server 和 PD Server 均选择了无本地 SSD 的机型,以优化资源配置,并降低开支。TiKV Server则配置在本地 SSD 的机型上,充分利用其高速读写能力,提升数据存储和检索的性能。这样的硬件配置不仅兼顾了系统的性能需求,又能降低集群成本。针对不同的业务需求,我们为各个组件量身定制了不同的服务器规格,以确保在多样化的业务场景下,资源得到最佳的利用,进一步提升系统的运行效率和响应速度。
三、TiDB v7 版本新特性
新版本带来了更强大的扩展能力和更快的性能,能够支持超大规模的工作负载,优化资源利用率,从而提升集群的整体性能。在 SQL 功能方面,它提升了兼容性、灵活性和易用性,从而助力复杂查询和现代应用程序的高效运行。此外,网络 IO 也进行了优化,通过多种批处理方法减少网络交互的次数,并支持更多的下推算子。同时,优化了Region 调度算法,显著提升了性能和稳定性。
四、TiDB升级之旅
4.1 当前存在的痛点
- 集群版本过低:当前 TiDB 生产环境(现网)最新版本为 v5.3.3,目前官方已停止对 4.x 和 5.x 版本的维护及支持,TiDB 内核最新版本为 v8.5.3,而被用户广泛采用且最为稳定的版本是 v7.5.x。
- TiCDC组件存在风险:TiCDC 作为增量数据同步工具,在 v6.5.0 版本以前在运行稳定性方面存在一定问题,经常出现数据同步延迟问题或者 OOM 问题。
- 备份周期时间长:集群每天备份时间大于8小时,在此期间,数据库备份会导致集群负载上升超过30%,当备份时间赶上业务高峰期,会导致应用RT上升。
- 集群偶发抖动及BUG:在低版本集群中,偶尔会出现基于唯一键查询的慢查询现象,同时低版本也存在一些影响可用性的BUG。比如在 TiDB v4.x 的集群中,TiKV 节点运行超过 2 年会导致节点自动重启。
4.2 升级方案:升级方式
TiDB的常见升级方式为原地升级和迁移升级,我们所有的升级方案均采用迁移升级的方式。
原地升级
- 优势:方式较为简单,不需要额外的硬件,升级过程中集群仍然可以对外提供服务。
- 劣势:该升级方案不支持回退、并且升级过程会有长时间的性能抖动。大版本(v4/v5 原地升级到 v7)跨度较大时,需要版本递增升级,抖动时间翻倍。
迁移升级
- 优势:业务影响时间较短、可灰度可回滚、不受版本跨度的影响。
- 劣势:搭建新集群将产生额外的成本支出,同时,原集群还需要部署TiCDC组件用于增量同步。
4.3 升级方案:集群调研
4.4 升级方案:升级前准备环境
4.5 升级方案:升级前验证集群
4.6 升级方案:升级中流量迁移
4.7 升级方案:升级后销毁集群
五、升级遇到的问题
5.1 v7.5.x版本查询SQL倾向全表扫描
表中记录数 215亿,查询 SQL存在合理的索引,但是优化器更倾向走全表扫描,重新收集表的统计信息后,执行计划依然为全表扫描。
走全表扫描执行60秒超时KILL,强制绑定索引仅需0.4秒。
-- 查询SQL
SELECT
*
FROM
fin_xxx_xxx
WHERE
xxx_head_id = 1111111111111111
AND xxx_type = 'XX0002'
AND xxx_user_id = 11111111
AND xxx_pay_way = 'XXX00000'
AND is_del IN ('N', 'Y')
LIMIT
1;
-- 涉及索引
KEY `idx_xxx` (`xxx_head_id`,`xxx_type`,`xxx_status`),
解决方案:
- 方式一:通过 SPM 进行 SQL 绑定。
- 方式二:调整集群参数 tidb_opt_prefer_range_scan,将该变量值设为 ON 后,优化器总是偏好区间扫描而不是全表扫描。
5.2 v7.5.x版本聚合查询执行计划不准确
集群升级后,在新集群上执行一些聚合查询或者大范围统计查询时无法命中有效索引。而低版本v4.x、5.x集群,会根据统计信息选择走合适的索引。
v4.0.11集群执行耗时:12秒,新集群执行耗时2分32.78秒
-- 查询SQL
select
statistics_date,count(1)
from
merchant_assessment_xxx
where
create_time between '2025-08-20 00:00:00' and '2025-09-09 00:00:00'
group by
statistics_date order by statistics_date;
-- 涉及索引
KEY `idx_create_time` (`create_time`)
解决方案:
方式一:调整集群参数tidb_opt_objective,该变量设为 determinate后,TiDB 在生成执行计划时将不再使用实时统计信息,这会让执行计划相对稳定。
六、升级带来的收益
版本升级稳定性增强:v7.5.x 版本的 TiDB 提供了更高的稳定性和可靠性,高版本改进了SQL优化器、增强的分布式事务处理能力等,加快了响应速度和处理大量数据的能力。升级后相比之前整体性能提升40%。特别是在处理复杂 SQL 和多索引场景时,优化器的性能得到了极大的增强,减少了全表扫描的发生,从而显著降低了 TiKV 的 CPU 消耗和 TiDB 的内存使用。
应用平均RT提升44.62%
原集群RT(平均16.9ms)
新集群RT(平均9.36ms)
新集群平均RT提升50%,并且稳定性增加,毛刺大幅减少
老集群RT(平均250ms)
新集群RT(平均125ms)
提升TiCDC同步性能:新版本在数据同步方面有了数十倍的提升,有效解决了之前版本中出现的同步延迟问题,提供更高的稳定性和可靠性。当下游需要订阅数据至数仓或风控平台时,可以使用TiCDC将数据实时同步至Kafka,提升数据处理的灵活性与响应能力。
缩短备份时间:数据库备份通常会消耗大量的CPU和IO资源。此前,由于备份任务的结束时间恰逢业务高峰期,经常导致应用响应时间(RT)上升等问题。通过进行版本升级将备份效率提升了超过50%。
高压缩存储引擎:新版本采用了高效的数据压缩算法,能够显著减少存储占用。同时,通过优化存储结构,能够快速读取和写入数据,提升整体性能。相同数据在 TiDB 中的存储占用空间更低,iDB 的3副本数据大小仅为 MySQL(主实例数据大小)的 55%。
完善的运维体验:新版本引入更好的监控工具、更智能的故障诊断机制和更简化的运维流程,提供了改进的 Dashboard 和 Top SQL 功能,使得慢查询和问题 SQL 的识别更加直观和便捷,降低 DBA 的工作负担。
更秀更实用的新功能:TiDB 7.x版本提供了TTL定期自动删除过期数据,实现行级别的生命周期控制策略。通过为表设置 TTL 属性,TiDB 可以周期性地自动检查并清理表中的过期数据。此功能在一些场景可以有效节省存储空间、提升性能。TTL 常见的使用场景:
- 定期删除验证码、短网址记录
- 定期删除不需要的历史订单
- 自动删除计算的中间结果
七、选择 TiDB 的原因
我们不是为了使用TiDB而使用,而是去解决一些MySQL无法满足的场景,关系型数据库我们还是优先推荐MySQL。能用分库分表能解决的问题尽量选择MySQL,毕竟运维成本相对较低、数据库版本更加稳定、单点查询速度更快、单机QPS性能更高这些特性是分布式数据库无法满足的。
- 非分片查询场景:上游 MySQL 采用了分库分表的设计,但部分业务查询无法利用分片。通过自建 DTS 将 MySQL 数据同步到 TiDB 集群,非分片/聚合查询则使用 TiDB 处理,能够在不依赖原始分片结构的情况下,实现高效的数据查询和分析。
- 分析 SQL 多场景:业务逻辑比较复杂,往往存在并发查询和分析查询的需求。通过自建 DTS 将 MySQL 数据同步到 TiDB,复杂查询在TiDB执行、点查在MySQL执行。TiDB支持水平扩展,其分布式计算和存储能力使其能够高效处理大量的并发查询请求。既保障了MySQL的稳定性,又提升了整体的查询能力。
- 磁盘使用大场景:在磁盘使用率较高的情况下,可能会出现 CPU 和内存使用率低,但磁盘容量已达到 MySQL 的瓶颈。TiDB 能够自动进行数据分片和负载均衡,将数据分布在多个节点上, 缓解单一节点的磁盘压力,避免了传统 MySQL 中常见的存储瓶颈问题,从而提高系统的可扩展性和灵活性。
- 数据倾斜场景:在电商业务场景上,每个电商平台都会有一些销量很好的头部卖家,数据量会很大。即使采取了进行分库分表的策略,仍难以避免大卖家的数据会存储在同一实例中,这样会导致热点查询和慢 SQL 问题,尽管可以通过添加索引或进一步分库分表来优化,但效果有限。采用分布式数据库能够有效解决这一问题。可以将数据均匀地分散存储在多个节点上,在查询时则能够并发执行,从而将流量分散,避免热点现象的出现。随着业务的快速发展和数据量的不断增长,借助简单地增加节点,即可实现水平扩展,满足海量数据及高并发的需求。
八、总结
综上所述,在本次 TiDB 集群版本升级到 v7.5.x 版本过程中,实现了性能和稳定性提升。通过优化的查询计划和更高效的执行引擎,数据读取和写入速度显著提升,大幅度降低了响应延迟,提升了在高并发操作下的可靠性。通过直观的监控界面和更全面的性能分析工具,能够更快速地识别和解决潜在问题,降低 DBA 的工作负担。也为未来的业务扩展和系统稳定性提供了强有力的支持。
后续依然会持续关注 TiDB 在 v8.5.x 版本稳定性、性能以及新产品特性带来应用开发以及运维人效收益进展。目前 TiDB 内核版本 v8.5.x 已经具备多模数据库 Data + AI 能力,在JSON函数、ARRAY 索引以及 Vector Index 实现特性。同时已经具备 Resource Control 资源管理能力,适合进行多业务系统数据归集方案,实现数据库资源池化多种自定义方案。技术研究方面我们数据库团队会持续投入,将产品最好的解决方案引入现网环境。
往期回顾
-
得物管理类目配置线上化:从业务痛点到技术实现
-
大模型如何革新搜索相关性?智能升级让搜索更“懂你”|得物技术
-
RAG—Chunking策略实战|得物技术
-
告别数据无序:得物数据研发与管理平台的破局之路
-
从一次启动失败深入剖析:Spring循环依赖的真相|得物技术
文 /岱影
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
得物管理类目配置线上化:从业务痛点到技术实现
一、引言
在电商交易领域,管理类目作为业务责权划分、统筹、管理核心载体,随着业务复杂性的提高,其规则调整频率从最初的 1 次 / 季度到多次 / 季度,三级类目的规则复杂度也呈指数级上升。传统依赖数仓底层更新的方式暴露出三大痛点:
- 行业无法自主、快速调管理类目;
- 业务管理类目规则调整,不支持校验类目覆盖范围是否有重复/遗漏,延长交付周期;
- 规则变更成功后、下游系统响应滞后,无法及时应用最新类目规则。
本文将从技术视角解析 “管理类目配置线上化” 项目如何通过全链路技术驱动,将规则迭代周期缩短至 1-2 天。
二、业务痛点与技术挑战:为什么需要线上化?
2.1 效率瓶颈:手工流程与
高频迭代的矛盾
问题场景:业务方需线下通过数仓提报规则变更,经数仓开发、测试、BI需要花费大量精力校验确认,一次类目变更需 3-4 周左右时间才能上线生效,上线时间无法保证。
技术瓶颈:数仓离线同步周期长(T+1),规则校验依赖人工梳理,无法应对 “商品类目量级激增”。
2.2 质量风险:规则复杂度与
校验能力的失衡
典型问题:当前的管理类目映射规则,依赖业务收集提报,但从实际操作看管理三级类目映射规则提报质量较差(主要原因为:业务无法及时校验提报规则是否准确,是否穷举完善,是否完全无交叉),存在大量重复 / 遗漏风险。
2.3 系统耦合:底层变更对
下游应用的多米诺效应
连锁影响:管理类目规则变更会需同步更新交易后台、智能运营系统、商运关系工作台等多下游系统,如无法及时同步,可能会影响下游应用如商运关系工作台的员工分工范围的准确性,影响商家找人、资质审批等场景应用。
三、技术方案:从架构设计到核心模块拆解
3.1 分层架构:解耦业务与数据链路
3.2 核心模块技术实现
规则生命周期管理: 规则操作流程
提交管理类目唯一性校验规则
新增:id为空,则为新增
删除:当前db数据不在提交保存列表中
更新:名称或是否兜底类目或规则改变则发生更新【其中如果只有名称改变则只触发审批,不需等待数据校验,业务规则校验逻辑为将所有规则包含id,按照顺序排序拼接之后结果是否相等】
多级类目查询
构建管理类目树
/**
* 构建管理类目树
*/
public List<ManagementCategoryDTO> buildTree(List<ManagementCategoryEntity> managementCategoryEntities) {
Map<Long, ManagementCategoryDTO> managementCategoryMap = new HashMap<>();
for (ManagementCategoryEntity category : managementCategoryEntities) {
ManagementCategoryDTO managementCategoryDTO = ManagementCategoryMapping.convertEntity2DTO(category);
managementCategoryMap.put(category.getId(), managementCategoryDTO);
}
// 找到根节点
List<ManagementCategoryDTO> rootNodes = new ArrayList<>();
for (ManagementCategoryDTO categoryNameDTO : managementCategoryMap.values()) {
//管理一级类目 parentId是0
if (Objects.equals(categoryNameDTO.getLevel(), ManagementCategoryLevelEnum.FIRST.getId()) && Objects.equals(categoryNameDTO.getParentId(), 0L)) {
rootNodes.add(categoryNameDTO);
}
}
// 构建树结构
for (ManagementCategoryDTO node : managementCategoryMap.values()) {
if (node.getLevel() > ManagementCategoryLevelEnum.FIRST.getId()) {
ManagementCategoryDTO parentNode = managementCategoryMap.get(node.getParentId());
if (parentNode != null) {
parentNode.getItems().add(node);
}
}
}
return rootNodes;
}
填充管理类目规则
/**
* 填充规则信息
*/
private void populateRuleData
(List<ManagementCategoryDTO> managementCategoryDTOS, List<ManagementCategoryRuleEntity> managementCategoryRuleEntities) {
if (CollectionUtils.isEmpty(managementCategoryDTOS) || CollectionUtils.isEmpty(managementCategoryRuleEntities)) {
return;
}
List<ManagementCategoryRuleDTO> managementCategoryRuleDTOS =managementCategoryMapping.convertRuleEntities2DTOS(managementCategoryRuleEntities);
// 将规则集合按 categoryId 分组
Map<Long, List<ManagementCategoryRuleDTO>> rulesByCategoryIdMap = managementCategoryRuleDTOS.stream()
.collect(Collectors.groupingBy(ManagementCategoryRuleDTO::getCategoryId));
// 递归填充规则到树结构
fillRulesRecursively(managementCategoryDTOS, rulesByCategoryIdMap);
}
/**
* 递归填充规则到树结构
*/
private static void fillRulesRecursively
(List<ManagementCategoryDTO> managementCategoryDTOS, Map<Long, List<ManagementCategoryRuleDTO>> rulesByCategoryIdMap) {
if (CollectionUtils.isEmpty(managementCategoryDTOS) || MapUtils.isEmpty(rulesByCategoryIdMap)) {
return;
}
for (ManagementCategoryDTO node : managementCategoryDTOS) {
// 获取当前节点对应的规则列表
List<ManagementCategoryRuleDTO> rules = rulesByCategoryIdMap.getOrDefault(node.getId(), new ArrayList<>());
node.setRules(rules);
// 递归处理子节点
fillRulesRecursively(node.getItems(), rulesByCategoryIdMap);
}
}
状态机驱动:管理类目生命周期管理
超时机制 :基于时间阈值的流程阻塞保护
其中,为防止长时间运营处于待确认规则状态,造成其他规则阻塞规则修改,定时判断待确认规则状态持续时间,当时间超过xxx时间之后,则将待确认状态改为长时间未操作,放弃变更状态,并飞书通知规则修改人。
管理类目状态变化级联传播策略
类目生效和失效状态为级联操作。规则如下:
- 管理二级类目有草稿状态时,不允许下挂三级类目的编辑;
- 管理三级类目有草稿状态时,不允许对应二级类目的规则编辑;
- 类目生效失效状态为级联操作,上层修改下层级联修改状态,如果下层管理类目存在草稿状态,则自动更改为放弃更改状态。
规则变更校验逻辑
当一次提交,可能出现的情况如下。一次提交可能会产生多个草稿,对应多个审批流程。
新增管理类目规则:
- 一级管理类目可以直接新增(点击新增一级管理类目)
- 二级管理类目和三级管理类目不可同时新增
- 三级管理类目需要在已有二级类目基础上新增
只有名称修改触发直接审批,有规则修改需要等待数仓计算结果之后,运营提交发起审批。
交互通知中心:飞书卡片推送
- 变更规则数据计算结果依赖数仓kafka计算结果回调。
- 基于飞书卡片推送数仓计算结果,回调提交审批和放弃变更事件。
飞书卡片:
卡片结果
卡片操作结果
审批流程:多维度权限控制与飞书集成
提交审批的四种情况:
- 名称修改
- 一级类目新增
- 管理类目规则修改
- 生效失效变更
审批通过,将草稿内容更新到管理类目表中,将管理类目设置为生效中。
审批驳回,清空草稿内容。
审批人分配机制:多草稿并行审批方案
一次提交可能会产生多个草稿,对应多个审批流程。
审批逻辑
public Map<String, List<String>> buildApprover(
ManagementCategoryDraftEntity draftEntity,
Map<Long, Set<String>> catAuditorMap,
Map<String, String> userIdOpenIdMap,
Integer hasApprover) {
Map<String, List<String>> nodeApprover = new HashMap<>();
// 无审批人模式,直接查询超级管理员
if (!Objects.equals(hasApprover, ManagementCategoryUtils.HAS_APPROVER_YES)) {
nodeApprover.put(ManagementCategoryApprovalField.NODE_SUPER_ADMIN_AUDIT,
queryApproverList(0L, catAuditorMap, userIdOpenIdMap));
return nodeApprover;
}
Integer level = draftEntity.getLevel();
Integer draftType = draftEntity.getType();
boolean isEditOperation = ManagementCategoryDraftTypeEnum.isEditOp(draftType);
// 动态构建审批链(支持N级类目)
List<Integer> approvalChain = buildApprovalChain(level);
for (int i = 0; i < approvalChain.size(); i++) {
int currentLevel = approvalChain.get(i);
Long categoryId = getCategoryIdByLevel(draftEntity, currentLevel);
// 生成节点名称(如:NODE_LEVEL2_ADMIN_AUDIT)
String nodeKey = String.format(
ManagementCategoryApprovalField.NODE_LEVEL_X_ADMIN_AUDIT_TEMPLATE,
currentLevel
);
// 编辑操作且当前层级等于提交层级时,添加本级审批人 【新增的管理类目没有还没有对应的审批人】
if (isEditOperation && currentLevel == level) {
addApprover(nodeApprover, nodeKey, categoryId, catAuditorMap, userIdOpenIdMap);
}
// 非本级审批人(上级层级)
if (currentLevel != level) {
addApprover(nodeApprover, nodeKey, categoryId, catAuditorMap, userIdOpenIdMap);
}
}
return nodeApprover;
}
private List<Integer> buildApprovalChain(Integer level) {
List<Integer> approvalChain = new ArrayList<>();
if (level == 3) {
approvalChain.add(2); // 管二审批人
approvalChain.add(1); // 管一审批人
} else if (level == 2) {
approvalChain.add(2); // 管二审批人
approvalChain.add(1); // 管一审批人
} else if (level == 1) {
approvalChain.add(1); // 管一审批人
approvalChain.add(0); // 超管
}
return approvalChain;
}
3.3 数据模型设计
3.4 数仓计算逻辑
同步数据方式
方案一:
每次修改规则之后通过调用SQL触发离线计算
优势:通过SQL调用触发计算,失效性较高
劣势:ODPS 资源峰值消耗与SQL脚本耦合问题
- 因为整个规则修改是三级类目维度,如果同时几十几百个类目触发规则改变,会同时触发几十几百个离线任务。同时需要大量ODPS 资源;
- 调用SQL方式需要把当前规则修改和计算逻辑的SQL一起调用计算。
方案二:
优势:同时只会产生一次规则计算
劣势:实时性受限于离线计算周期
- 实时性取决于离线规则计算的定时任务配置和离线数据同步频率,实时性不如直接调用SQL性能好
- 不重不漏为当前所有变更规则维度
技术决策:常态化迭代下的最优解
考虑到管理类目规则平均变更频率不高,且变更时间点较为集中(非紧急场景占比 90%),故选择定时任务方案实现:
- 资源利用率提升:ODPS 计算资源消耗降低 80%,避免批量变更时数百个任务同时触发的资源峰值;
- 完整性保障:通过全量维度扫描确保规则校验无遗漏,较 SQL 触发方案提升 20% 校验覆盖率;
- 可维护性优化:减少 SQL 脚本与业务逻辑的强耦合,维护成本降低 80%。
数据取数逻辑
生效中规则计算
草稿+生效中规格计算
如果是新增管理类目,直接参与计算。
如果是删除管理类目,需要将该删除草稿中对应的生效管理类目排除掉。
如果是更新:需要将草稿中的管理类目和规则替换生效中对应的管理类目和规则。
数仓实现
数据流程图
四、项目成果与技术价值
预期效率提升:从 “周级” 到 “日级” 的跨越
- 管理一级 / 二级类目变更开发零成本,无需额外人力投入
- 管理三级类目变更相关人力成本降低 100%,无需额外投入开发资源
- 规则上线周期压缩超 90%,仅需 1 - 2 天即可完成上线
质量保障:自动化校验替代人工梳理
- 规则重复 / 遗漏检测由人工梳理->自动化计算
- 下游感知管理类目规则变更由人工通知->实时感知
技术沉淀:规则模型化能力
沉淀管理类目规则配置模型,支持未来四级、五级多级管理类目快速适配。
五、总结
未来优化方向:
- 规则冲突预警:基于AI预测高风险规则变更,提前触发校验
- 接入flink做到实时计算管理类目和对应商品关系
技术重构的本质是 “释放业务创造力”
管理类目配置线上化项目的核心价值,不仅在于技术层面的效率提升,更在于通过自动化工具链,让业务方从 “规则提报的执行者” 转变为 “业务策略的设计者”。当技术架构能够快速响应业务迭代时,企业才能在电商领域的高频竞争中保持创新活力。
往期回顾
-
大模型如何革新搜索相关性?智能升级让搜索更“懂你”|得物技术
-
RAG—Chunking策略实战|得物技术
-
告别数据无序:得物数据研发与管理平台的破局之路
-
从一次启动失败深入剖析:Spring循环依赖的真相|得物技术
-
Apex AI辅助编码助手的设计和实践|得物技术
文 /维山
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。