Five years ago I wrote a post on how to monitor Kafka consumer lag, and it still gets a lot of traffic from Google. I recently needed this feature again, so I checked the latest API and found that since version 2.5.0 there is a new listOffsets method, which makes computing lag simpler, cleaner, and safer. The amount of code drops dramatically: by giving up a few features, the code becomes much leaner.
Practice
Here I use the latest version for the demo. Add the dependency to the pom file:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.1</version>
</dependency>
```
First, initialize an AdminClient:
```java
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
AdminClient adminClient = AdminClient.create(config);
```
Then compute the lag from the topic and groupId. This approach is much more elegant than the old one:
```java
public long getConsumerLag(String topicName, String consumerGroupId) {
    try {
        // Describe the topic to get its partitions
        TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topicName))
                .topicNameValues()
                .get(topicName)
                .get();
        List<TopicPartition> partitions = topicDescription.partitions()
                .stream()
                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
                .collect(Collectors.toList());

        // Fetch the latest (end) offset of every partition
        Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            requestLatestOffsets.put(partition, OffsetSpec.latest());
        }
        ListOffsetsResult listOffsetsResult = adminClient.listOffsets(requestLatestOffsets);
        Map<TopicPartition, Long> latestOffsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            latestOffsets.put(partition, listOffsetsResult.partitionResult(partition).get().offset());
        }

        // Fetch the consumer group's committed offsets
        ListConsumerGroupOffsetsSpec offsetsSpec = new ListConsumerGroupOffsetsSpec();
        offsetsSpec.topicPartitions(partitions);
        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
                Collections.singletonMap(consumerGroupId, offsetsSpec);
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = adminClient.listConsumerGroupOffsets(groupSpecs)
                .partitionsToOffsetAndMetadata()
                .get();

        // Lag = sum over partitions of (latest offset - committed offset)
        long lag = 0;
        for (TopicPartition partition : partitions) {
            OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);
            long currentOffset = offsetMetadata != null ? offsetMetadata.offset() : 0;
            lag += latestOffsets.get(partition) - currentOffset;
        }
        return lag;
    } catch (InterruptedException | ExecutionException e) {
        return -1;
    }
}
```
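The aggregation at the end of the method is plain arithmetic and can be sanity-checked without a broker. Below is a minimal sketch of just that step, using partition numbers as map keys and made-up offsets; the class and method names here are illustrative, not from kafka-clients:

```java
import java.util.HashMap;
import java.util.Map;

public class LagMath {
    // Total lag = sum over partitions of (latest offset - committed offset);
    // a partition with no committed offset counts from 0, as in the method above.
    static long totalLag(Map<Integer, Long> latestOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            lag += e.getValue() - committedOffsets.getOrDefault(e.getKey(), 0L);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 100L);
        latest.put(1, 50L);
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 90L); // partition 1 has nothing committed yet
        System.out.println(totalLag(latest, committed)); // prints 60
    }
}
```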
Notes
Watch out for the API's compatibility with your Kafka brokers. I tested against Kafka server 2.6.0; for lower versions, please test it yourself!
When messages have expired (deleted by retention), this method miscalculates the lag to some degree, so be careful!
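One way to reduce that error is to also fetch each partition's earliest offset (OffsetSpec.earliest() via the same listOffsets call) and clamp the committed offset against it before subtracting. A sketch of just the arithmetic, assuming the earliest offsets were fetched alongside the latest ones; this is my own mitigation, not part of the method above:

```java
public class SafeLag {
    // After retention deletes old messages, a stale committed offset can sit
    // below the partition's earliest retained offset; clamping avoids over-counting.
    static long partitionLag(long latestOffset, long earliestOffset, long committedOffset) {
        long effective = Math.max(committedOffset, earliestOffset);
        return Math.max(latestOffset - effective, 0L);
    }

    public static void main(String[] args) {
        // Committed offset 10 is older than the earliest retained offset 40,
        // so the real backlog is 100 - 40 = 60, not 90.
        System.out.println(partitionLag(100L, 40L, 10L)); // prints 60
    }
}
```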
In the original post on how to monitor Kafka consumer lag there is a mistake around consumerOffset: to avoid a rebalance, do not subscribe. I suggest changing it to the following:
```java
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topicName);
List<TopicPartition> topicPartitions = new ArrayList<>();
partitions.forEach(partition ->
        topicPartitions.add(new TopicPartition(topicName, partition.partition())));
// endOffsets reads the partitions directly without subscribing,
// so it triggers no rebalance in the consumer group
Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPartitions);
```