Five years ago I wrote a post on how to monitor Kafka consumer lag, and it still gets a lot of traffic from Google. I recently needed to build the same feature again, so I looked at the latest API and found that since version 2.5.0 there is a new listOffsets method. It makes calculating lag simpler, more convenient, and safer, and because it gives up a few capabilities, the amount of code drops dramatically.
Practice
For this demo I use the latest client version; add the dependency to the pom file:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.1</version>
</dependency>
```
First, initialize the AdminClient:
```java
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
AdminClient adminClient = AdminClient.create(config);
```
Then compute the lag from the topic and the consumer group id. This approach is much more elegant than the old one:
```java
public long getConsumerLag(String topicName, String consumerGroupId) {
    try {
        // Describe the topic and collect all of its partitions
        TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topicName))
                .topicNameValues()
                .get(topicName)
                .get();
        List<TopicPartition> partitions = topicDescription.partitions()
                .stream()
                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
                .collect(Collectors.toList());

        // Fetch the latest (log-end) offset of every partition
        Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            requestLatestOffsets.put(partition, OffsetSpec.latest());
        }
        ListOffsetsResult listOffsetsResult = adminClient.listOffsets(requestLatestOffsets);
        Map<TopicPartition, Long> latestOffsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            latestOffsets.put(partition, listOffsetsResult.partitionResult(partition).get().offset());
        }

        // Fetch the consumer group's committed offsets for these partitions
        ListConsumerGroupOffsetsSpec listConsumerGroupOffsetsSpec = new ListConsumerGroupOffsetsSpec();
        listConsumerGroupOffsetsSpec.topicPartitions(partitions);
        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
                Collections.singletonMap(consumerGroupId, listConsumerGroupOffsetsSpec);
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = adminClient.listConsumerGroupOffsets(groupSpecs)
                .partitionsToOffsetAndMetadata()
                .get();

        // Lag = sum over all partitions of (log-end offset - committed offset)
        long lag = 0;
        for (TopicPartition partition : partitions) {
            OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);
            long currentOffset = offsetMetadata != null ? offsetMetadata.offset() : 0;
            lag += latestOffsets.get(partition) - currentOffset;
        }
        return lag;
    } catch (InterruptedException | ExecutionException e) {
        return -1;
    }
}
```
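For reference, a minimal usage sketch, assuming getConsumerLag sits in the same class as the adminClient field initialized above; the broker address, topic name, and group id below are placeholders of my own, not values from the original setup:

```java
// Minimal usage sketch; broker address, topic name and group id are placeholders.
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
adminClient = AdminClient.create(config);            // the field used by getConsumerLag
long lag = getConsumerLag("demo-topic", "demo-group");
System.out.println("consumer lag = " + lag);
adminClient.close();                                  // release sockets and threads when done
```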
Notes
Mind the API's compatibility with your Kafka cluster. I tested against Kafka broker version 2.6.0 and it works; for older versions, please verify it yourself.
When messages have expired (been deleted by retention), this method can misstate the lag, because a committed offset may fall behind the partition's log start offset while the formula still counts from the committed offset. Keep that in mind!
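One way to reduce that error is to also fetch each partition's earliest (log-start) offset and never count records that no longer exist. The sketch below is my own illustration, not code from the original post; it reuses the partitions, latestOffsets, and currentOffsets variables from the method above:

```java
// Sketch: clamp the committed offset to the log-start offset so expired records are not counted.
Map<TopicPartition, OffsetSpec> requestEarliestOffsets = new HashMap<>(partitions.size());
for (TopicPartition partition : partitions) {
    requestEarliestOffsets.put(partition, OffsetSpec.earliest());
}
ListOffsetsResult earliestResult = adminClient.listOffsets(requestEarliestOffsets);

long lag = 0;
for (TopicPartition partition : partitions) {
    long earliest = earliestResult.partitionResult(partition).get().offset();
    long latest = latestOffsets.get(partition);
    OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);
    // If the committed offset points at data already removed by retention, count from the log start instead.
    long currentOffset = offsetMetadata != null ? Math.max(offsetMetadata.offset(), earliest) : earliest;
    lag += latest - currentOffset;
}
```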
The original post on monitoring Kafka consumer lag also has a bug in consumerOffset: to avoid triggering a rebalance, do not subscribe to the topic. I recommend changing it to the following:
```java
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topicName);
List<TopicPartition> topicPartitions = new ArrayList<>();
partitions.forEach(partition -> {
    TopicPartition topicPartition = new TopicPartition(topicName, partition.partition());
    topicPartitions.add(topicPartition);
});
Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPartitions);
```
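For completeness, here is a sketch of how the lag could then be summed from those end offsets plus the group's committed offsets, again without ever calling subscribe. This continuation is my illustration rather than code from the original post, and it assumes group.id is set in consumerProps so that committed() knows which group to query:

```java
// Continuation sketch: no subscribe() call, so no rebalance is triggered.
Map<TopicPartition, OffsetAndMetadata> committed = kafkaConsumer.committed(new HashSet<>(topicPartitions));
long lag = 0;
for (TopicPartition tp : topicPartitions) {
    OffsetAndMetadata offsetMetadata = committed.get(tp);
    long currentOffset = offsetMetadata != null ? offsetMetadata.offset() : 0;
    lag += result.get(tp) - currentOffset;   // end offset minus committed offset, per partition
}
kafkaConsumer.close();
```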