计算kafka消费组Lag最佳实践最新版

之前写过一篇如何监控 kafka 消费 Lag 情况,五年前写的,在 google 上访问量很大,最近正好需要再写这个功能,就查看了最新 API,发现从2.5.0版本后新增了listOffsets方法,让计算 Lag 更简单方便和安全,代码量有质的下降,因为舍弃一些功能,代码精简的了很多。

实践

这里我用最新版做演示,在 pom 文件中增加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.1</version>
</dependency>

首先初始化 AdminClient

1
2
3
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
AdminClient adminClient = AdminClient.create(config);

然后根据 topic 和 groupId 计算 Lag,这种方案要比之前方式优雅了很多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public long getConsumerLag(String topicName,String consumerGroupId) {
try {
// 获取主题描述
TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topicName))
.topicNameValues()
.get(topicName).get();
List<TopicPartition> partitions = topicDescription.partitions()
.stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
.collect(Collectors.toList());

// 获取每个分区的最新偏移量
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>(partitions.size());
for (TopicPartition partition : partitions) {
requestLatestOffsets.put(partition, OffsetSpec.latest());
}

ListOffsetsResult listOffsetsResult = adminClient.listOffsets(requestLatestOffsets);
Map<TopicPartition, Long> latestOffsets = new HashMap<>(partitions.size());
for (TopicPartition partition : partitions) {
latestOffsets.put(partition, listOffsetsResult.partitionResult(partition).get().offset());
}

// 获取消费者组的当前偏移量
ListConsumerGroupOffsetsSpec listConsumerGroupOffsetsSpe = new ListConsumerGroupOffsetsSpec();
listConsumerGroupOffsetsSpe.topicPartitions(partitions);
Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = new HashMap<String, ListConsumerGroupOffsetsSpec>(){
{
put(consumerGroupId, listConsumerGroupOffsetsSpe);
}
};
Map<TopicPartition, OffsetAndMetadata> currentOffsets = adminClient.listConsumerGroupOffsets(groupSpecs)
.partitionsToOffsetAndMetadata().get();

long lag = 0;
for (TopicPartition partition : partitions) {
OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);
long currentOffset = offsetMetadata != null ? offsetMetadata.offset() : 0;
lag += latestOffsets.get(partition) - currentOffset;
}
return lag;
} catch (InterruptedException | ExecutionException e) {
return -1;
}
}

注意

API 对 kafka 的兼容性,我在 kakfa 服务器版本2.6.0测试通过,更低版本,建议自测!

该方法对消息过期,计算 Lag 存在一定错误,请注意!!!

对于如何监控 kafka 消费 Lag 情况原文中 consumerOffset 存在一处错误,为避免 reblance,不要subscribe,建议更改为以下

1
2
3
4
5
6
7
8
KafkaConsumers<String, String>  kafkaConsumer  = new KafkaConsumers<>(consumerProps);
List<PartitionInfo> patitions = kafkaConsumer.partitionsFor(topicName);
List<TopicPartition>topicPatitions = new ArrayList<>();
patitions.forEach(patition->{
TopicPartition topicPartition = new TopicPartition(topicName,patition.partition());
topicPatitions.add(topicPartition);
});
Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPatitions);