Five years ago I wrote a post on how to monitor Kafka consumer lag, and it still gets a lot of traffic from Google. Recently I needed to build this feature again, so I looked at the latest API and found that since version 2.5.0 the AdminClient provides a listOffsets method. It makes calculating lag simpler, more convenient, and safer, and because some functionality was dropped, the amount of code shrinks dramatically.
Practice
I use the latest version for this demo. Add the dependency to the pom file:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.1</version>
</dependency>
```
First, initialize the AdminClient:
```java
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
AdminClient adminClient = AdminClient.create(config);
```
Then compute the lag from the topic and groupId. This approach is much cleaner than the old one.
```java
public long getConsumerLag(String topicName, String consumerGroupId) {
    try {
        // Describe the topic to discover all of its partitions
        TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topicName))
                .topicNameValues()
                .get(topicName)
                .get();
        List<TopicPartition> partitions = topicDescription.partitions()
                .stream()
                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
                .collect(Collectors.toList());

        // Fetch the latest (log end) offset of every partition
        Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            requestLatestOffsets.put(partition, OffsetSpec.latest());
        }

        ListOffsetsResult listOffsetsResult = adminClient.listOffsets(requestLatestOffsets);
        Map<TopicPartition, Long> latestOffsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            latestOffsets.put(partition, listOffsetsResult.partitionResult(partition).get().offset());
        }

        // Fetch the consumer group's committed offsets
        ListConsumerGroupOffsetsSpec offsetsSpec = new ListConsumerGroupOffsetsSpec();
        offsetsSpec.topicPartitions(partitions);
        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
                Collections.singletonMap(consumerGroupId, offsetsSpec);
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = adminClient.listConsumerGroupOffsets(groupSpecs)
                .partitionsToOffsetAndMetadata()
                .get();

        // Lag = sum over all partitions of (latest offset - committed offset)
        long lag = 0;
        for (TopicPartition partition : partitions) {
            OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);
            long currentOffset = offsetMetadata != null ? offsetMetadata.offset() : 0;
            lag += latestOffsets.get(partition) - currentOffset;
        }
        return lag;
    } catch (InterruptedException | ExecutionException e) {
        return -1;
    }
}
```
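For completeness, a minimal usage sketch is shown below; the topic name and group id ("demo-topic", "demo-group") are placeholders, not values from the post.

```java
// Minimal usage sketch: "demo-topic" and "demo-group" are placeholder names.
long lag = getConsumerLag("demo-topic", "demo-group");
if (lag >= 0) {
    System.out.println("Total lag of demo-group on demo-topic: " + lag);
} else {
    System.out.println("Failed to compute lag; check broker connectivity and the group id.");
}
```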
Notes
Mind the API's compatibility with your Kafka cluster. I tested this against a Kafka broker running 2.6.0; for lower broker versions, please verify it yourself.
When old messages have expired (segments removed by retention), this method can report an inaccurate lag, because a committed offset may fall behind the log start offset. Keep that in mind!
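One way to reduce that error, shown only as a sketch that reuses the partitions, latestOffsets, and currentOffsets variables from the method above (and would live inside the same try block), is to also query OffsetSpec.earliest() and clamp the committed offset to the log start offset before subtracting:

```java
// Sketch: clamp committed offsets to the log start offset so partitions whose
// old segments were deleted by retention do not overstate the lag.
Map<TopicPartition, OffsetSpec> requestEarliestOffsets = new HashMap<>(partitions.size());
for (TopicPartition partition : partitions) {
    requestEarliestOffsets.put(partition, OffsetSpec.earliest());
}
ListOffsetsResult earliestResult = adminClient.listOffsets(requestEarliestOffsets);

long lag = 0;
for (TopicPartition partition : partitions) {
    long logStartOffset = earliestResult.partitionResult(partition).get().offset();
    OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);
    long committed = offsetMetadata != null ? offsetMetadata.offset() : logStartOffset;
    lag += latestOffsets.get(partition) - Math.max(committed, logStartOffset);
}
```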
The consumerOffset code in the original post on how to monitor Kafka consumer lag contains a mistake: to avoid triggering a rebalance, do not subscribe. I suggest changing it to the following:
```java
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
// Look up the topic's partitions without subscribing to the topic
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topicName);
List<TopicPartition> topicPartitions = new ArrayList<>();
partitions.forEach(partition -> {
    TopicPartition topicPartition = new TopicPartition(topicName, partition.partition());
    topicPartitions.add(topicPartition);
});
// End offsets (log end offsets) per partition
Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPartitions);
```
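Because partitionsFor() and endOffsets() only send metadata and list-offsets requests and never call subscribe() or poll(), this consumer never joins the consumer group, so it cannot trigger a rebalance for the group's real consumers. Remember to close the consumer once you are done with it.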