普通视图

发现新文章,点击刷新页面。
今天 — 2025年9月19日首页

TDMQ CKafka 版客户端实战指南系列之二:消费消息最佳实践

2025年9月19日 15:32

导语

在数字化时代,消息队列系统已成为企业架构中不可或缺的一部分,其中,TDMQ CKafka 版作为一种高效、可扩展的分布式消息系统,广泛应用于各类业务场景中。上一篇我们深入探讨了 TDMQ CKafka 版的生产实践,从消息发送、分区策略到高可用性保障,全方位解析了如何在生产环境中高效利用 TDMQ CKafka 版。本文将接续前文,聚焦于 TDMQ CKafka 版的消费实践,探讨如何稳扎稳打、精准消费,确保消息从生产到消费的完整链条顺畅无阻。

在消费篇中,我们将详细阐述消费消息的基本流程、负载均衡机制、应对重平衡的策略、订阅关系的管理、消费位点的控制、消息重复与消费幂等性的处理、消费失败的应对、消费延迟与堆积的解决,以及如何通过调整套接字缓冲区、模拟消息广播、实现消息过滤等高级技巧,优化 TDMQ CKafka 版的消费性能。

接下来,让我们一同深入探索 TDMQ CKafka 版的消费实践,解锁高效消息处理的秘诀。

消费篇:稳扎稳打,精准消费

消费消息流程

消费消息的基本流程并不复杂,首先是 Poll 数据,消费者从消息队列中拉取消息;接着执行消费逻辑,对拉取到的消息进行处理;处理完成后再次 Poll 数据,如此循环往复。例如,在一个电商订单处理系统中,消费者从消息队列中拉取订单消息,然后根据订单信息进行库存扣减、订单状态更新等操作,完成后继续拉取下一批订单消息。

负载均衡机制

负载均衡

负载均衡在消费过程中起着关键作用。每个 Consumer Group 可以包含多个 Consumer ,只要将参数 group.id 设置成相同的值,这些 Consumer 就属于同一个 Consumer Group,共同负责消费订阅的 Topic。

例如:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。TDMQ CKafka 版默认会均匀地把消息传给各个消费实例,以做到消费负载均衡。

TDMQ CKafka 版负载均衡的内部原理是:把订阅的 Topic 的分区,平均分配给各个 Consumer。因此,Consumer 的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态,尽量保证消费者数量能被分区总数整除。除了第一次启动上线之外,后续消费实例发生重启、增加、减少,分区数发生增加等变更时,都会触发一次重均衡。

应对重均衡

如果频繁出现 Rebalance,可能有多种原因。

1、  消费者消费处理耗时很长,比如在处理一些复杂的业务逻辑时,可能需要进行多次数据库查询或远程接口调用,这会导致消费速度变慢;

2、  消费某一个异常消息也可能导致消费者阻塞或者失败,例如消息格式错误,消费者无法解析;

3、  心跳超时同样会引发 Rebalance 。

4、  在 v0.10.2 之前版本的客户端,Consumer 没有独立线程维持心跳,而是把心跳维持与 Poll 接口耦合在一起,若用户消费出现卡顿,就会导致 Consumer 心跳超时,引发 Rebalance;在 v0.10.2 及之后版本的客户端,如果消费时间过慢,超过一定时间(max.poll.interval.ms 设置的值,默认5分钟)未进行 Poll 拉取消息,则会导致客户端主动离开队列,而引发 Rebalance。

可以通过优化消费处理提高消费速度和参数调整等方法解决:

1、  消费端需要和 Broker 版本保持一致。

2、  可以参考以下说明调整参数值:

● session.timeout.ms:在 v0.10.2 之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过 30s,建议设置为25s ,而 v0.10.2 及其之后的版本,保持默认值10s即可;

● max.poll.records:降低该参数值,建议远远小于单个线程每秒消费的条数 * 消费线程的个数 * max.poll.interval.ms / 1000 的值;

● max.poll.interval.ms :该值要大于 max.poll.records / (单个线程每秒消费的条数 * 消费线程的个数 ) 的值。

3、  尽量提高客户端的消费速度,将消费逻辑另起线程进行处理,并针对耗时进行监控。

4、  减少 Group 订阅 Topic 的数量,一个 Group 订阅的 Topic 最好不要超过5个,建议一个 Group 只订阅一个 Topic。

主题订阅关系

在订阅关系方面,同一个 Consumer Group 内,建议客户端订阅的 Topic 保持一致,即一个 Consumer Group 订阅一个 Topic,这样可以避免给排查问题带来更多复杂度。

Consumer Group 订阅多个 Topic

一个 Consumer Group 可以订阅多个 Topic ,此时多个 Topic 的消息会被 Cosumer Group 中的 Consumer 均匀消费。例如 Consumer Group A 订阅了 Topic A、Topic B、Topic C,则这三个 Topic 中的消息,被 Consumer Group 中的 Consumer 均匀消费。

Consumer Group 订阅多个 Topic 的示例代码如下:

String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic: topics) {
    subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);

Topic 被多个 Consumer Group 订阅

一个 Topic 可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费 Topic 下的所有消息。例如 Consumer Group A 订阅了 Topic A,Consumer Group B 也订阅了 Topic A,则发送到 Topic A 的每条消息,不仅会传一份给 Consumer Group A 的消费实例,也会传一份给 Consumer Group B 的消费实例,且这两个过程相互独立,相互没有任何影响。

一个 Consumer Group 对应一个应用

建议一个 Consumer Group 对应一个应用,即不同的应用对应不同的代码。如果您需要将不同的代码写在同一个应用中,请准备多份不同的 kafka.properties。例如:kafka1.properties、kafka2.properties。

消费位点解析

每个 Topic 会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点 MaxOffset。

TDMQ CKafka 版的 Consumer 会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为消费位点 ConsumerOffset。

剩余的未消费的条数(也称为消息堆积量)=MaxOffset-ConsumerOffset。

Offset 提交

TDMQ CKafka 版的 Consumer 有两个相关参数:

● enable.auto.commit:默认值为 True。

● auto.commit.interval.ms: 默认值为5000,即5s。

这两个参数组合的结果为:每次 Poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数 auto.commit.interval.ms 规定的时长,则客户端会启动位点提交动作。

因此,如果将 enable.auto.commit 设置为 True,则需要在每次 Poll 数据时,确保前一次 Poll 出来的数据已经消费完毕,否则可能导致位点跳跃。

如果想自己控制位点提交,请把 enable.auto.commit 设为 False,并调用 Commit(Offsets) 函数自行控制位点提交。

注意:

尽量避免提交位点请求过于频繁,否则容易导致 Broker CPU 很高,影响正常的服务。例如自动提交位点设置 auto.commit.interval.ms 为100ms,手动提交位点,在高吞吐场景下,每消费一条消息提交一个位点。

重置 Offset

以下两种情况,会发生消费位点重置:

● 当服务端不存在曾经提交过的位点时(例如客户端第一次上线)。

● 当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。

Java 客户端可以通过 auto.offset.reset 来配置重置策略,主要有三种策略:

● Latest:从最大位点开始消费。

● Earliest:从最小位点开始消费。

● None:不做任何操作,即不重置。

说明:

建议设置成 Latest,而不要设置成 Earliest,避免因位点非法时从头开始消费,从而造成大量重复。

如果是您自己管理位点,可以设置成 None。

拉取消息优化

拉取消息

消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时需要控制拉取速度,注意以下参数设置:

● max.poll.records:如果单条消息超过1MB,建议设置为1。

● max.partition.fetch.bytes:设置为比单条消息的大小略大一点。

● fetch.max.bytes:设置为比单条消息的大小略大一点。

通过公网消费消息时,通常会因为公网带宽的限制导致连接被断开,此时需要注意控制拉取速度,修改配置:

● fetch.max.bytes:建议设置成公网带宽的一半(注意该参数的单位是 bytes,公网带宽的单位是 bits)。

● max.partition.fetch.bytes:建议设置成 fetch.max.bytes 的三分之一或者四分之一。

拉取大消息

消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时,需要注意控制拉取速度,注意修改配置:

● max.poll.records:每次 Poll 获取的最大消息数量。如果单条消息超过1MB,建议设置为1。

● fetch.max.bytes:设置比单条消息的大小略大一点。

● max.partition.fetch.bytes:设置比单条消息的大小略大一点。

拉取大消息的核心是逐条拉取。

消息异常处理

消息重复和消费幂等

TDMQ CKafka 版消费的语义是 at least once, 也就是至少投递一次,保证消息不丢失,但是无法保证消息不重复。在出现网络问题、客户端重启时均有可能造成少量重复消息,此时应用消费端如果对消息重复比较敏感(例如订单交易类),则应该做消息幂等。

以数据库类应用为例,常用做法为:

  • 发送消息时,传入 Key 作为唯一流水号 ID。

  • 消费消息时,判断 Key 是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次。

当然,如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。

消费失败

TDMQ CKafka 版是按分区消息顺序逐条向前推进消费的,如果消费端拿到某条消息后执行消费逻辑失败,例如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,那么有以下两种处理方式:

失败后一直尝试再次执行消费逻辑。这种方式有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。

由于 Kafka 没有处理失败消息的设计,实践中通常会打印失败的消息或者存储到某个服务(例如创建一个 Topic 专门用来放失败的消息),然后定时检查失败消息的情况,分析失败原因,根据情况处理。

消费延迟

消费过程是由客户端主动去服务端拉取消息。一般情况下,如果客户端能够及时消费,则不会产生较大延迟。若产生了较大延迟,请先关注是否有堆积,并注意提高消费速度。

消费堆积

通常造成消息堆积的原因是:

● 消费速度跟不上生产速度,此时应该提高消费速度。

● 消费端产生了阻塞。

● 消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。

消费端应该尽量避免堵塞消费线程,如果存在等待调用结果的情况,建议设置等待的超时时间,超时后作为消费失败进行处理。

提高消费速度

方式1:增加 Consumer 实例个数提高并行处理能力,如果消费者和分区数已经1:1,可以考虑增加分区数(注意:对于 Flink 自动维护分区的场景不会自动感知新增分区后可能需要修改相关代码后重启)。 可以在进程内直接增加(需要保证每个实例对应一个线程),也可以部署多个消费实例进程。

说明:

实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作。

方式2:增加消费线程。

  1. 定义一个线程池。

  2. Poll 数据。

  3. 把数据提交到线程池进行并发处理。

  4. 等并发结果返回成功后,再次 poll 数据执行。

消费某些分区不消费

消费者在消费过程中,可能遇到消费者在线,但是某些分区的位点一致不前进,可能原因如下:

  1. 遇到一条异常消息,可能是超大消息,格式异常,导致消费者拉取消息时候,转换成业务位点。

  2. 使用公网带宽,带宽较小,拉取大消息时候直接把带宽打满,导致在超时时间内拉取不到消息。

  3. 消费者假死,导致不去拉取。

解决方式:

关掉消费者,在 TDMQ CKafka 版控制台设置位点,跳过某些异常消息,或者优化消费代码,然后重启消费者消费。

消息订阅模式

消息广播

Kafka 目前没有消息广播的语义,可以通过创建不同的 Group 来模拟实现。

消息过滤

Kafka 自身没有消息过滤的语义。实践中可以采取以下两个办法:

● 如果过滤的种类不多,可以采取多个 Topic 的方式达到过滤的目的。

● 如果过滤的种类多,则最好在客户端业务层面自行过滤。

实践中请根据业务具体情况进行选择,也可以综合运用上面两种办法。

总结:回顾要点,展望应用

通过前面的学习,我们系统地了解了生产消费的关键要点。在生产方面,从 Topic 的使用与创建,到分区数的估算、重试策略的设置,再到发送模式和参数的优化,以及 Key、Value 和分区策略的应用,每一个环节都需要我们精心配置,以确保高效、稳定的生产。在消费方面,消费的基本流程、负载均衡的实现、应对重均衡的策略、订阅关系的管理、Offset 的管理与拉取策略,以及处理消息重复和消费失败的方法,这些都是我们在消费过程中需要重点关注的内容。

这些生产消费知识在实际应用中具有巨大的价值。希望大家能将所学的生产消费知识运用到实际工作和生活中,不断探索和实践。也期待大家在实践过程中,能总结出更多宝贵的经验,欢迎在留言区分享你们的实践故事和心得,让我们一起共同成长,共同进步 。

TDMQ CKafka 版客户端实战指南系列之一:生产最佳实践

2025年9月19日 15:04

导语

在当今数字化时代,数据的产生和流动呈爆发式增长,消息队列作为一种高效的数据传输和处理工具,在各种应用场景中发挥着关键作用。TDMQ CKafka 版作为一款分布式、高吞吐量、高可扩展性的消息系统,100% 兼容开源 Kafka API 2.4、2.8、3.2 版本 ,基于发布 / 订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。凭借高可用、数据压缩、同时支持离线和实时数据处理等优点,TDMQ CKafka 版广泛应用于日志压缩收集、监控数据聚合、流式数据集成等场景。

对于开发者而言,深入了解并熟练掌握 TDMQ CKafka 版的生产消费实践至关重要。它不仅能够帮助我们构建高效、稳定的数据传输和处理系统,还能在面对海量数据时,确保系统的性能和可靠性。本文将详细介绍 TDMQ CKafka 版的生产实践教程,包括生产消息的各个环节以及相关的参数配置和最佳实践,希望能为大家在实际项目中应用 TDMQ CKafka 版提供有益的参考和指导。

生产篇:步步为营,高效生产

Topic 使用与创建

配置要求:推荐节点的整倍数副本,减少数据倾斜问题,同步复制最小同步副本数为2,且同步副本数不能等于 Topic 副本数,否则宕机1个副本会导致无法生产消息。

创建方式:支持选择是否开启 CKafka 自动创建 Topic 的开关。选择开启后,表示生产或消费一个未创建的 Topic 时,会自动创建一个默认值包含3个分区和2个副本的 Topic,控制台支持修改默认值。

分区数估计

分区数的准确估算能实现数据的均衡分布。为了达到这个目的,分区数建议为节点数的整倍数。同时,还需结合预估流量来设置,按照 10MB/s 一个分区的标准来计算。例如,若一个 Topic 的预估吞吐为 100MB/s,那么建议设置分区数为 10。这样可以确保在高流量情况下,消息能均匀地分布在各个分区,避免某个分区负载过高。

失败重试

在分布式环境中,由于网络等原因,消息发送偶尔会失败,其原因可能是消息已经发送成功但是 ACK 机制失败或者是消息确实没有发送成功,这就需要设置合理的重试策略,您可以根据业务需求,设置以下重试参数:

  • Retries:用于设置重试次数,默认值为 3。重试不成功会触发报错,如果客户不接受消息丢失,建议改重试次数或者手动重试。

  • Retry.backoff.ms:设置重试间隔,建议设置为 1000。这个间隔时间可以让生产者在重试前等待一段时间,避免在短时间内频繁重试。

这样将能应对 Broker 的 Leader 分区出现无法立刻响应 Producer 请求的情况。

异步发送

消息发送接口通常是异步的,这意味着生产者在发送消息后不需要等待消息被完全处理就可以继续执行其他任务。如果想要接收发送的结果,可以使用 Send 方法中的 Callback 接口获取发送结果。

一个 Producer 对应一个应用

Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,建议一个应用对应一个 Producer。

Acks

Kafka 的 ACK 机制,指 Producer 的消息发送确认机制,Acks 参数决定了生产者在发送消息后等待服务端响应的方式,对 Kafka 集群的吞吐量和消息可靠性有直接影响。

Acks 的参数说明如下:

  • Acks=0 时,当生产者采用无确认机制时,消息发送后无需等待任何 Broker 节点的响应即可继续执行,这种模式可获得最高的吞吐性能,但因缺乏写入保障机制,存在较高的数据丢失风险;

  • Acks=1 时,采用主节点单确认机制时,生产者仅需等待 Leader 副本完成消息写入即会收到确认响应。该模式在性能与可靠性间取得平衡,但需注意:若 Leader 节点在同步完成前发生故障,已发送但未同步的消息存在部分丢失的可能性;

  • Acks=all 时,启用全副本确认机制时,生产者必须等待 Leader 副本及所有同步副本(ISR 集合)均完成消息持久化后才会收到确认。虽然该模式通过多重冗余保障实现了最高级别的数据安全性(仅当整个 ISR 集群同时失效时才会丢失数据),但跨节点同步带来的延迟使其吞吐性能相对较低。

一般建议选择 Acks=1,对于重要的服务可以设置 Acks=all 。

Batch

通常情况下,TDMQ CKafka 版的 Topic 会设置多个分区。Producer 客户端向服务端发送消息时,要先明确消息要发送到哪个 Topic 的哪个分区。当向同一分区发送多条消息时,Producer 客户端会把这些消息整合成一个 Batch,批量发送至服务端。不过,Producer 客户端在处理 Batch 时会产生额外开销。一般来说,小 Batch 会使 Producer 客户端产生大量请求,致使请求在客户端和服务端堆积排队,还会提高相关机器的 CPU 使用率,进而整体抬高消息发送和消费的延迟。而设置一个合适的 Batch 大小,能减少客户端向服务端发送消息时的请求次数,从整体上提升消息发送的吞吐量。

以下是 Batch 相关参数的说明:

  • Batch.size:这是发往每个分区的消息缓存量阈值,当缓存的消息量达到这个设定值时,就会触发一次网络请求,随后 Producer 客户端会把消息批量发送到服务器。

  • Linger.ms:它规定了每条消息在缓存中的最长停留时间,如果消息在缓存中的时间超过这个值,Producer 客户端就会不再遵循 Batch.size 的限制,直接把消息发送到服务器。

  • Buffer.memory:当所有缓存消息的总体大小超过这个数值时,就会触发消息发送到服务器的操作,此时会忽略 Batch.size 和 Linger.ms 的限制。Buffer.memory 的默认值是 32MB,对于单个 Producer 而言,这个数值足以保障其性能。

Key 和 Value

消息队列中的消息有 Key(消息标识)和 Value(消息内容)两个字段。为消息设置一个唯一的 Key 便于追踪消息,通过打印发送日志和消费日志,就能了解该消息的生产和消费情况。比如在电商订单系统中,将订单号作为 Key,就可以轻松追踪订单消息的流转过程。如果消息发送量较大,建议不要设置 Key,并使用黏性分区策略。

黏性分区

在消息队列 Kafka 中,只有被发送至同一分区的消息才会被归入同一个 Batch,所以 Kafka Producer 端配置的分区策略是影响 Batch 形成的关键因素之一。Kafka Producer 支持用户通过自定义 Partitioner 实现类,来契合业务需求选择合适的分区方式。

当消息指定了 Key 时,Kafka Producer 默认会先对消息的 Key 进行哈希计算,再依据哈希结果选定分区,以此确保相同 Key 的消息能够发送到同一分区。

当消息未指定 Key 时,在 Kafka 2.4 版本之前,其默认分区策略是按顺序循环遍历主题下的所有分区,以轮询形式把消息依次发送到各分区。不过,这种默认策略在 Batch 聚合方面表现不佳,实际应用中容易生成大量小 Batch,进而导致消息处理的实际延迟上升。为改善无 Key 消息分区效率低的问题,Kafka 在 2.4 版本推出了黏性分区策略(Sticky Partitioning Strategy)。

黏性分区策略重点针对无 Key 消息被分散到不同分区、进而产生众多小 Batch 的问题。它的核心机制是,当某个分区的 Batch 处理完毕,会随机挑选另一个分区,之后尽可能让后续消息都发送到这个新选定的分区。从短期视角看,消息会集中发送到同一分区;但从长期运行来看,消息仍能均匀分布到各个分区。如此一来,既避免了消息在分区上分布不均(分区倾斜),又能降低延迟,提升整个服务的性能。

要是你用的 Kafka Producer 客户端版本是 2.4 及以上,那默认就会采用黏性分区策略。要是客户端版本低于 2.4,你可以依据黏性分区策略的原理,自己动手实现分区策略,再通过参数 Partitioner.class 来指定所设置的分区策略。

关于黏性分区策略的实现,下面给出了 Java 版的代码示例,其核心逻辑是按照一定时间间隔来切换分区。

public class MyStickyPartitioner implements Partitioner {
    // 记录上一次切换分区时间。
    private long lastPartitionChangeTimeMillis = 0L;
    // 记录当前分区。
    private int currentPartition = -1;
    // 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。
    private long partitionChangeTimeGap = 100L;
    public void configure(Map<String, ?> configs) {}
    /**      * Compute the partition for the given record.      *
     * @param topic The topic name      * @param key The key to partition on (or null if no key)      * @param keyBytes serialized key to partition on (or null if no key)      * @param value The value to partition on or null      * @param valueBytes serialized value to partition on or null      * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取所有分区信息。
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();
            // 判断当前可用分区。
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // 对于有key的消息,根据key的哈希值选择分区。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();
        // 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }
    public void close() {}
}

分区顺序

单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。每个主题下面都有若干分区,如果消息被分配到不同的分区中,不同 Partition 之间不能保证顺序。

如果需要消息具有消费顺序性,可以在生产端指定这一类消息的 Key,这类消息都用相同的 Key 进行消息发送,CKafka 就会根据 Key 哈希取模选取其中一个分区进行存储,由于一个分区只能由一个消费者进行监听消费,此时消息就具有消息消费的顺序性了。

TDMQ CKafka 版顺序消息场景实践教程

顺序消息场景

在 TDMQ CKafka 版中,确保消息顺序性的主要手段依赖于其分区(Partition)设计以及消息 Key 的使用。客户端所涉及的顺序消息使用场景可分为两类:一是全局顺序场景,二是分区顺序场景。针对这两种场景,CKafka 的实践教程如下:

  1. 全局顺序:为保证全局顺序,在 CKafka 控制台,您需设置 Topic 分区为1,副本数可以根据具体使用场景和可用性要求以及平衡成本指定,建议设置为2。

    全局顺序由于单分区存在吞吐上限,因此整体吞吐不会太高,单分区吞吐指标请参见:cloud.tencent.com/document/pr…

  2. 分区顺序:为保证分区顺序,您可以根据预估 Topic 的业务流量,除以单分区流量,取整后获得分区数,同时为避免数据倾斜,分区数尽量向节点整倍数取整,从而确定最终合理的分区数。单分区的吞吐量可参见:cloud.tencent.com/document/pr…

    在发送 CKafka 消息时候,需要指定 Key,CKafka 会根据 Key 计算出一个哈希值,确保具有相同 Key 的消息会被发送到同一个分区,从而确保这些消息在分区内部是有序的。同时建议尽可能让业务 Key 分散,如果生产消息都指定同一个 Key,那么分区顺序会退化为全局顺序,从而降低整体的写入吞吐。

参数实践教程

由于顺序消息,要求消息有序、不重复,默认的 Kafka 生产者发送参数当遇到网络抖动、Kafka Broker 节点变化、分区 Leader 选举等场景,容易出现消息重复、乱序问题,因此顺序场景,必须对 Kafka 生产者参数进行特别设置,关键参数设置如下:

  • Enable.idempotence

Enable.idempotence 表示是否开启幂等功能。顺序场景建议开启幂等功能,应对上述场景出现的分区消息乱序、消息重复等问题。建议 Kafka 的 Producer 设置:Enable.idempotence 为 True。需要注意,该功能要确保 Kafka 的 Broker 版本大于等于0.11,即 Kafka versions >= 0.11,同时:从 Kafka 3.0开始包括3.0,Kafka 的 Producer 默认 Enable.idempotence=True 和 Acks=All ,而对于 Kafka 版本>=0.11且 Kafka<3.0 的版本,默认是关闭幂等的,因此建议顺序场景显式指定该参数值确保开启幂等。

  • Acks

在开启幂等后,Acks 需要显式指定为 All,如果不指定为 All 的话,则会无法通过参数校验从而报错。

  • Max.in.flight.requests.per.connection

默认情况下,Kafka 生产者会尝试尽快发送记录,Max.in.flight.requests.per.connection 表示一个 Connection 同时发送的最大请求数,默认值是5。Kafka 在 0.11 版本之后包括 0.11,小于 1.1 的版本,即(Kafka >= 0.11 & < 1.1),Kafka Broker 没有针对该方面优化,需要设置

Max.in.flight.requests.per.connection 为1,在 Kafka>=1.1 后,针对幂等场景的吞吐进行优化,在 Broker 端会维持一个队列对5个并发批次的消息的顺序进行顺序校验,允许 Max.in.flight.requests.per.connection 设置5,但不能大于5。

因此建议:

  • Kafka >= 0.11 & < 1.1:显式设置 Max.in.flight.requests.per.connection 为1。

  • Kafka>=1.1:显式设置 Max.in.flight.requests.per.connection 可以为 1<=Max.in.flight.requests.per.connection<=5;建议设置为5。

  • Retries

在顺序场景下,建议指定重试参数,Retries 在不同版本,有不同的默认行为,在 Kafka <= 2.0,默认为0;Kafka >= 2.1,默认为 Integer.MAX_VALUE,即2147483647;建议顺序场景,显式设置为 Integer.MAX_VALUE。

总结

在顺序场景中,需要开启的生产者参数示例如下:

Kafka >= 0.11 & < 1.1:

// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "1");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));

Kafka>=1.1:

// create Producer properties
Properties properties = new Properties();
properties.setProperty("enable.idempotence", "true");
properties.setProperty("acks", "all");
properties.setProperty("max.in.flight.requests.per.connection", "5");
properties.setProperty("retries", Integer.toString(Integer.MAX_VALUE));

数据倾斜

Kafka Broker 数据倾斜问题通常是由于分区分布不均匀或者生产者发送数据的 Key 分布不均匀导致的,会引发几类问题:

  1. 整体流量没有限流,但是节点局部限流;

  2. 某些节点负载过大,导致整体 Kafka 使用率不高,影响整体吞吐。

针对该类问题可以通过以下方式进行优化:

  1. 使用合理分区数,分区数保障为节点数的整倍数。

  2. 合理的分区策略,例如:RoundRobin(轮询)、Range(范围)和 Sticky(粘性)或者自定义的分区策略,均衡发送消息。

  3. 查是否使用 Key 进行发送,如果使用了 Key 进行发送,尽量设计策略让 Key 更加分区均衡。

总结

在消息队列的消息生产环节中,“高效” 不仅是吞吐的追求,更是稳定性与可靠性的平衡。无论是 Topic 的副本与分区设计、重试策略的精细调优,还是顺序消息的场景化实现,每一个配置细节都可能影响集群的整体性能。本文围绕 TDMQ CKafka 版的生产实践,详解如何通过合理的参数设置与策略选择,构建高可靠、低延迟的消息生产链路,避免数据倾斜、消息乱序等典型问题,为业务流量的平稳流转奠定基础。下一篇,我们将会为大家详细介绍 TDMQ CKafka 版的消费实践,敬请期待!

❌
❌