Kafka生产者与消费者最佳实践
生产者最佳实践
使用
默认send方法是异步,kafka会进行消息的端到端批量压缩。
配置
1 | Properties props = new Properties(); |
异步
1 | KafkaProducer<String, String> producer = new KafkaProducer<>(props); |
当然异步发送推荐添加callback
1 | producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),(recordMetadata,exception)->{ |
数据量大的业务场景推荐首选异步,做好异常情况处理逻辑就好。
同步
1 | KafkaProducer<String, String> producer = new KafkaProducer<>(props); |
同步适合业务规模不大,但对数据一致性要求高的场景。根据合适场景采取合适的方式。
核心参数
通常不确定需不需要修改默认参数的原则,就是不修改。除非你很确定修改的参数是解决什么问题的。
重试
参数 | 默认值 | 推荐值 | 是否推荐修改默认值 | 备注 |
---|---|---|---|---|
retries |
2147483647 | 否 | 重试次数。如果需要考虑重试,官方建议最好使用delivery.timeout.ms ,来调整重试策略 |
|
retry.backoff.ms |
100 | 1000 | 重试间隔 | |
delivery.timeout.ms | 2 min | 大于或等于(request.timeout.ms +linger.ms )的总和 |
该配置限制发送的最长时间(待发送时间+ack时间),或者失败重试的最长时间。 |
ACKS
参数 | 备注 | 是否推荐 |
---|---|---|
acks=0 | 无需broker响应,性能最佳,丢数据风险最好 | |
acks=1 | leader节点写成功即返回response。数据较为安全 | 是 |
acks=all/-1 | 该topic的主备broker节点都写成功才返回response,性能最差,安全性最高 |
一般建议选择acks=1
,重要的服务可以设置acks=all
。
其他
提升发送性能
对于有提升发送消息性能的场景,请考虑调整如下参数:
参数 | 备注 | 默认值 | 推荐值 |
---|---|---|---|
batch.size | 发往每个分区(Partition)的消息缓存量,切记不是数量。当超过该值就会立刻发送。 | 16KB | |
linger.ms | 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size 的限制,立即把消息发往服务器。 |
0 | 100~1000 |
除了以上参数调整,可以增加吞吐量,还可以通过配置压缩算法compression.type,继续增加吞吐量。
如果为了降低时延,推荐如下设置:
1.设置linger.ms=0(使用默认值就好)
2.不要启⽤压缩 (compression.type使用默认值就好)
3.置acks=1
大消息场景
对于消息大小限制,服务器端有参数限制,客户端也有参数限制,同时还有buffer限制。如果有大消息发送场景,请考虑调整如下参数:
参数 | 备注 | 默认值 |
---|---|---|
buffer.memory | producer client buffer最大值,如果超过有可能OOM | 32 MB |
max.request.size | 每一批次发送的最大请求 | 1M |
compression.type | 每一批消息压缩算法(none, gzip, snappy, lz4, or zstd) |
none |
强顺序性
kafka消息顺序性体现在分区顺序上,也就是说分区有序。除了使用key保持分区,也可以指定分区。除此以外。顺序性要求较强的场景还需要考虑重试机制破坏顺序性。
参数 | 备注 | |
---|---|---|
max.in.flight.requests.per.connection |
强顺序将该值设置为1 |
|
消费者最佳实践
使用
参数配置
1 | Properties props = new Properties(); |
自动提交offset
自动提交offset仅在poll()
和consumer.close()
提交,如果这两处出现任何异常,将会导致提交offset失败。
1 | props.setProperty("enable.auto.commit", "true"); |
手动提交offset
1 | props.setProperty("enable.auto.commit", "false"); |
以上代码会根据group coordination
自动分配响应的partition,这么做的好处是当其中一个consumer挂掉,group coordination
会自动触发reblance,进而处理灾备。
当然我们也可以根据场景手动分配consumer的partition,例如需要计算消费延迟,为了避免consumer group reblance,就可以选择手动分配partition。
1 | String topic = "foo"; |
核心参数
参数 | 默认值 | 描述 |
---|---|---|
enable.auto.commit | true | |
max.poll.records | 500 | 在单个poll() 调用中返回的最大记录数,帮助控制在轮询里需要处理的数据量。消费者先将消息拉取到本地缓存中,然后再通过poll()轮训获取。对于从服务器端拉取消息主要受max.partition.fetch.bytes 参数控制 |
auto.offset.reset | latest | 如果Kafka中没有初始偏移量,或者当前偏移量在服务器上不再存在。[latest, earliest, none] |
session.timeout.ms | 45 seconds | 消费组session超时时间 |
heartbeat.interval.ms | 3 seconds | 心跳检测间隔时间。如果准备调整的话,必须小于session.timeout.ms值的 1/3 |
max.poll.interval.ms | 5 minutes | 使用消费者组管理时,调用poll()之间的最大延迟。poll()在此超时到期之前没有调用,则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。 |
经验
1.消费客户端频繁出现Rebalance
出现rebalance主要有如下几种情况:
针对以上情况,大致解决方案:
提高消费速度
减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。
升级kafka client版本,最好和服务器版本保持一致。
按需按优先级优化参数
max.poll.records:降低该参数值,建议远远小于
<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>
的积。max.poll.interval.ms: 适当增大,该值要大于
<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)
的值。session.timeout.ms:适当增大,但必须在(broker config)group.min.session.timeout.ms
与
group.max.session.timeout.ms 之内。
2.拉取大消息
拉取大消息的核心是逐条拉取的。
- max.poll.records:如果单条消息超过1 MB,建议设置为1。
- fetch.max.bytes:设置比单条消息的大小略大一点。
- max.partition.fetch.bytes:设置比单条消息的大小略大一点。
3.offset重置问题
以下两种情况,会发生消费位点重置:
- 当服务端不存在曾经提交过的位点时(例如客户端第一次上线,或者消息被删除)。
- 当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。
可以通过auto.offset.reset来配置重置策略,主要有三种策略:
- latest:从最大位点开始消费。
- earliest:从最小位点开始消费。
- none:不做任何操作,即不重置。
4.提高消费速度
- 增加consumer实例数,但要小于等于partition数量
- 通过内存消息队列模式,线程池去解决。
5.消息重复和消费幂等
kafka消费模式保证至少消费一次,因此需要我们增加业务幂等性验证,常用做法是:
- 发送消息时,传入key作为唯一流水号ID。
- 消费消息时,判断key是否已经消费过(借助redis等存储消费过的key),如果已经被消费,则忽略,如果没消费过,则消费一次。
调优最佳实践汇总
除了在生产端,消费端,Broker端各自优化以外,还可以根据不同业务场景,多方面结合优化,进而取得最好的效果。
吞吐量优化
在考虑吞吐量优先的场景,放弃了时延性,无论是生产还是消费,原则是增大批量。
Producer:
• batch.size: 增加到 100000–200000 之间(default 16384)
• linger.ms: 增加到 10–100之间 (default 0)
• compression.type=lz4 (default none, i.e., no compression)
• acks=1 (default 1)
• buffer.memory:如果有很多分区,则增加该值 (default 33554432)
Consumer:
• fetch.min.bytes: 增加到 100000 (default 1)
低时延优化
低时延要求的是尽快获取(发送)数据,因此少用批量才是第一优先级。
Producer:
• linger.ms=0 (default 0)
• compression.type=none (default none, i.e., no compression)
• acks=1 (default 1)
Consumer:
• fetch.min.bytes=1 (default 1)
Streams:
• StreamsConfig.TOPOLOGY_OPTIMIZATION: StreamsConfig.OPTIMIZE (default StreamsConfig.NO_OPTIMIZATION)
• Streams applications have embedded producers and consumers, so also check those configuration recommendations
可用性优化
Consumer:
• session.timeout.ms: 可适当增加该值(default 10000)
Streams:
• StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG: 1 or more (default 0)
• Streams applications have embedded producers and consumers, so also check those configuration recommendations
可靠性优化
消息的可靠性主要是关注消息的安全性,这样势必牺牲了一些性能,以数据安全性为第一优先级。
Producer:
• replication.factor=3
• acks=all (default 1)
• enable.idempotence=true (default false), 防止重复消息和乱序消息
• max.in.flight.requests.per.connection=1 (default 5), 当不使用幂等生产者时,防止乱序消息
Consumer:
• enable.auto.commit=false (default true)
• isolation.level=read_committed (when using EOS transactions)
Streams:
• StreamsConfig.REPLICATION_FACTOR_CONFIG: 3 (default 1)
• StreamsConfig.PROCESSING_GUARANTEE_CONFIG: StreamsConfig.EXACTLY_ONCE (default StreamsConfig.AT_LEAST_ONCE)
• Streams applications have embedded producers and consumers, so also check those configuration recommendations