Kafka生产者与消费者最佳实践

生产者最佳实践

使用

默认send方法是异步,kafka会进行消息的端到端批量压缩。

配置

1
2
3
4
5
6
7
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

异步

1
2
3
4
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();

当然异步发送推荐添加callback

1
2
3
4
5
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),(recordMetadata,exception)->{
if(exception!=null){
exception.printStackTrace();
}
});

数据量大的业务场景推荐首选异步,做好异常情况处理逻辑就好。

同步

1
2
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic","1","1")).get();

同步适合业务规模不大,但对数据一致性要求高的场景。根据合适场景采取合适的方式。

核心参数

通常不确定需不需要修改默认参数的原则,就是不修改。除非你很确定修改的参数是解决什么问题的。

重试

参数 默认值 推荐值 是否推荐修改默认值 备注
retries 2147483647 重试次数。如果需要考虑重试,官方建议最好使用delivery.timeout.ms,来调整重试策略
retry.backoff.ms 100 1000 重试间隔
delivery.timeout.ms 2 min 大于或等于(request.timeout.ms+linger.ms)的总和 该配置限制发送的最长时间(待发送时间+ack时间),或者失败重试的最长时间。

ACKS

参数 备注 是否推荐
acks=0 无需broker响应,性能最佳,丢数据风险最好
acks=1 leader节点写成功即返回response。数据较为安全
acks=all/-1 该topic的主备broker节点都写成功才返回response,性能最差,安全性最高

一般建议选择acks=1,重要的服务可以设置acks=all

其他

提升发送性能

对于有提升发送消息性能的场景,请考虑调整如下参数:

参数 备注 默认值 推荐值
batch.size 发往每个分区(Partition)的消息缓存量,切记不是数量。当超过该值就会立刻发送。 16KB
linger.ms 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size的限制,立即把消息发往服务器。 0 100~1000

除了以上参数调整,可以增加吞吐量,还可以通过配置压缩算法compression.type,继续增加吞吐量。

如果为了降低时延,推荐如下设置:

1.设置linger.ms=0(使用默认值就好)

2.不要启⽤压缩 (compression.type使用默认值就好)

3.置acks=1

大消息场景

对于消息大小限制,服务器端有参数限制,客户端也有参数限制,同时还有buffer限制。如果有大消息发送场景,请考虑调整如下参数:

参数 备注 默认值
buffer.memory producer client buffer最大值,如果超过有可能OOM 32 MB
max.request.size 每一批次发送的最大请求 1M
compression.type 每一批消息压缩算法(none, gzip, snappy, lz4, or zstd) none

强顺序性

kafka消息顺序性体现在分区顺序上,也就是说分区有序。除了使用key保持分区,也可以指定分区。除此以外。顺序性要求较强的场景还需要考虑重试机制破坏顺序性。

参数 备注
max.in.flight.requests.per.connection 强顺序将该值设置为1

消费者最佳实践

使用

参数配置

1
2
3
4
5
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

自动提交offset

自动提交offset仅在poll()consumer.close()提交,如果这两处出现任何异常,将会导致提交offset失败。

1
2
3
4
5
6
7
8
9
props.setProperty("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 除指定订阅多个topic,还支持正则订阅多个topic
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

手动提交offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
props.setProperty("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();//这里是同步提交offset,还有异步提交。对于没有拉取到消息的场景,请勿调用该方法。
// consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
buffer.clear();
}
}

以上代码会根据group coordination自动分配响应的partition,这么做的好处是当其中一个consumer挂掉,group coordination会自动触发reblance,进而处理灾备。

当然我们也可以根据场景手动分配consumer的partition,例如需要计算消费延迟,为了避免consumer group reblance,就可以选择手动分配partition。

1
2
3
4
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

核心参数

参数 默认值 描述
enable.auto.commit true
max.poll.records 500 在单个poll()调用中返回的最大记录数,帮助控制在轮询里需要处理的数据量。消费者先将消息拉取到本地缓存中,然后再通过poll()轮训获取。对于从服务器端拉取消息主要受max.partition.fetch.bytes 参数控制
auto.offset.reset latest 如果Kafka中没有初始偏移量,或者当前偏移量在服务器上不再存在。[latest, earliest, none]
session.timeout.ms 45 seconds 消费组session超时时间
heartbeat.interval.ms 3 seconds 心跳检测间隔时间。如果准备调整的话,必须小于session.timeout.ms值的 1/3
max.poll.interval.ms 5 minutes 使用消费者组管理时,调用poll()之间的最大延迟。poll()在此超时到期之前没有调用,则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。

经验

1.消费客户端频繁出现Rebalance

出现rebalance主要有如下几种情况:

针对以上情况,大致解决方案:

  1. 提高消费速度

  2. 减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。

  3. 升级kafka client版本,最好和服务器版本保持一致。

  4. 按需按优先级优化参数

    max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。

    max.poll.interval.ms: 适当增大,该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

    session.timeout.ms:适当增大,但必须在(broker config)group.min.session.timeout.msgroup.max.session.timeout.ms 之内。

2.拉取大消息

拉取大消息的核心是逐条拉取的。

  • max.poll.records:如果单条消息超过1 MB,建议设置为1。
  • fetch.max.bytes:设置比单条消息的大小略大一点。
  • max.partition.fetch.bytes:设置比单条消息的大小略大一点。

3.offset重置问题

以下两种情况,会发生消费位点重置:

  • 当服务端不存在曾经提交过的位点时(例如客户端第一次上线,或者消息被删除)。
  • 当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。

可以通过auto.offset.reset来配置重置策略,主要有三种策略:

  • latest:从最大位点开始消费。
  • earliest:从最小位点开始消费。
  • none:不做任何操作,即不重置。

4.提高消费速度

  • 增加consumer实例数,但要小于等于partition数量
  • 通过内存消息队列模式,线程池去解决。

5.消息重复和消费幂等

kafka消费模式保证至少消费一次,因此需要我们增加业务幂等性验证,常用做法是:

  • 发送消息时,传入key作为唯一流水号ID。
  • 消费消息时,判断key是否已经消费过(借助redis等存储消费过的key),如果已经被消费,则忽略,如果没消费过,则消费一次。

调优最佳实践汇总

除了在生产端,消费端,Broker端各自优化以外,还可以根据不同业务场景,多方面结合优化,进而取得最好的效果。

吞吐量优化

在考虑吞吐量优先的场景,放弃了时延性,无论是生产还是消费,原则是增大批量。

Producer:

• batch.size: 增加到 100000–200000 之间(default 16384)

• linger.ms: 增加到 10–100之间 (default 0)

• compression.type=lz4 (default none, i.e., no compression)

• acks=1 (default 1)

• buffer.memory:如果有很多分区,则增加该值 (default 33554432)

Consumer:

• fetch.min.bytes: 增加到 100000 (default 1)

低时延优化

低时延要求的是尽快获取(发送)数据,因此少用批量才是第一优先级。

Producer:

• linger.ms=0 (default 0)

• compression.type=none (default none, i.e., no compression)

• acks=1 (default 1)

Consumer:

• fetch.min.bytes=1 (default 1)

Streams:

• StreamsConfig.TOPOLOGY_OPTIMIZATION: StreamsConfig.OPTIMIZE (default StreamsConfig.NO_OPTIMIZATION)

• Streams applications have embedded producers and consumers, so also check those configuration recommendations

可用性优化

Consumer:

• session.timeout.ms: 可适当增加该值(default 10000)

Streams:

• StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG: 1 or more (default 0)

• Streams applications have embedded producers and consumers, so also check those configuration recommendations

可靠性优化

消息的可靠性主要是关注消息的安全性,这样势必牺牲了一些性能,以数据安全性为第一优先级。

Producer:

• replication.factor=3

• acks=all (default 1)

• enable.idempotence=true (default false), 防止重复消息和乱序消息

• max.in.flight.requests.per.connection=1 (default 5), 当不使用幂等生产者时,防止乱序消息

Consumer:

• enable.auto.commit=false (default true)

• isolation.level=read_committed (when using EOS transactions)

Streams:

• StreamsConfig.REPLICATION_FACTOR_CONFIG: 3 (default 1)

• StreamsConfig.PROCESSING_GUARANTEE_CONFIG: StreamsConfig.EXACTLY_ONCE (default StreamsConfig.AT_LEAST_ONCE)

• Streams applications have embedded producers and consumers, so also check those configuration recommendations

参考

  1. 发布者最佳实践

  2. Producer Configs

  3. 订阅者最佳实践

  4. KafkaConsumer.html