How to Monitor Kafka Consumer Lag
Preface
Why does this need exist?
Kafka consumers can fall behind producers, so we need a way to see how many messages have piled up — the so-called consumer lag. There are existing monitoring tools on the market such as KafkaOffsetMonitor, and we have also written our own, KafkaCenter. However, as Kafka versions have evolved, the way consumers work has changed significantly, so we need to rebuild our Kafka offset monitoring.
Main Text
How to Calculate Lag
Before calculating lag, a few basics:
LEO (LogEndOffset): used here slightly differently from the official docs — it refers to the latest offset visible to consumers, i.e. the HW (High Watermark).
CURRENT-OFFSET: the offset the consumer group has actually consumed (committed) up to.
Given the above, Lag = LEO - CURRENT-OFFSET; the result is the consumer's backlog in messages.
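The formula can be wrapped in a small helper, similar in spirit to the getLag(...) referenced further below (the helper name mirrors the article's; the Math.max floor is an added safety assumption, since the two offsets are read at slightly different moments):

```java
// Helper mirroring the getLag(...) referenced later in this article.
class LagCalc {
    // Lag = LEO (the HW, i.e. the latest offset visible to consumers) - CURRENT-OFFSET.
    // Floored at 0: the committed offset can transiently appear ahead of a
    // log-end offset fetched a moment earlier.
    static long getLag(long currentOffset, long logEndOffset) {
        return Math.max(0L, logEndOffset - currentOffset);
    }
}
```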
The Official Way
"Official" here means the approach mentioned in the Kafka documentation: use bin/kafka-consumer-groups.sh from the official distribution.
The latest version of this tool can only show groups whose offsets are committed to the broker:

```shell
$ bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.0.101:8092 --group test
```
Programmatic Query
What is the benefit of querying programmatically? You can implement your own offset monitoring and integrate it seamlessly into any platform.
Since we are writing code anyway, let's tackle a slightly more advanced requirement: for a given topic, report the consumption state of every consumer group that consumes it.
First, define two entities (only the class declarations survive in the original listings):

TopicConsumerGroupState

```java
public class TopicConsumerGroupState {
```

PartitionAssignmentState

```java
public class PartitionAssignmentState {
```
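Since the original listings are truncated, here is a minimal reconstruction based on the accessors used in the steps below (getGroupId, getPartitionAssignmentStates, getPartition, getOffset, setLogEndOffset, setLag); any field beyond those is an assumption:

```java
// Minimal reconstruction of the two entities. Field names are inferred from the
// accessors used in the steps below; anything beyond those is an assumption.
class TopicConsumerGroupState {
    private String groupId;
    private java.util.List<PartitionAssignmentState> partitionAssignmentStates;

    public String getGroupId() { return groupId; }
    public void setGroupId(String groupId) { this.groupId = groupId; }
    public java.util.List<PartitionAssignmentState> getPartitionAssignmentStates() {
        return partitionAssignmentStates;
    }
    public void setPartitionAssignmentStates(java.util.List<PartitionAssignmentState> states) {
        this.partitionAssignmentStates = states;
    }
}

class PartitionAssignmentState {
    private String clientId;   // consuming client's id (null for assign()-based consumers)
    private String host;       // consuming client's host
    private int partition;
    private long offset;       // CURRENT-OFFSET: the group's committed offset
    private long logEndOffset; // LEO/HW: the latest offset visible to consumers
    private long lag;          // logEndOffset - offset

    public String getClientId() { return clientId; }
    public void setClientId(String clientId) { this.clientId = clientId; }
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
    public long getLogEndOffset() { return logEndOffset; }
    public void setLogEndOffset(long logEndOffset) { this.logEndOffset = logEndOffset; }
    public long getLag() { return lag; }
    public void setLag(long lag) { this.lag = lag; }
}
```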
Broker-Based Consumption: Fetching Offsets
Approach
- For a given topic, find the groups that consume it
- Use KafkaAdminClient's describeConsumerGroups to read each group's consumption of the topic from the broker; this yields clientId, CURRENT-OFFSET, partition, host, etc.
- Use a consumer to fetch the LogEndOffset (the offset visible to consumers)
- Merge the results of steps 2 and 3 and compute the lag
Code Design
Add the latest client dependency; KafkaAdminClient, used in steps 1 and 2, lives in kafka-clients:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- the original listing is truncated; use the latest released version -->
</dependency>
```
- Step 1

```java
public Set<String> listConsumerGroups(String topic)
        throws InterruptedException, ExecutionException, TimeoutException {
    final Set<String> filteredGroups = new HashSet<>();
    // all group ids known to the cluster
    Set<String> allGroups = this.adminClient.listConsumerGroups().all().get(30, TimeUnit.SECONDS).stream()
            .map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
    // keep only the groups that have committed offsets for this topic
    allGroups.forEach(groupId -> {
        try {
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get().keySet().stream()
                    .filter(tp -> tp.topic().equals(topic)).forEach(tp -> filteredGroups.add(groupId));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
    return filteredGroups;
}
```
- Step 2

Use describeConsumerGroups to fetch each group's consumption state. A group may or may not have members: a normal subscribe()-based consumer shows up in members, but a consumer that uses assign() on specific partitions does not. (The original listing is truncated at this point.)
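As the original listing is lost, here is a sketch of what this step might look like, assuming the two entities above and using only standard AdminClient calls (describeConsumerGroups, listConsumerGroupOffsets); it is an illustration, not the article's original code, and it requires a running broker:

```java
public List<TopicConsumerGroupState> describeConsumerGroups(String topic)
        throws InterruptedException, ExecutionException, TimeoutException {
    List<TopicConsumerGroupState> states = new ArrayList<>();
    for (String groupId : listConsumerGroups(topic)) {
        ConsumerGroupDescription description = adminClient
                .describeConsumerGroups(Collections.singletonList(groupId))
                .all().get(30, TimeUnit.SECONDS).get(groupId);
        // committed offsets (CURRENT-OFFSET) per partition, available with or without members
        Map<TopicPartition, OffsetAndMetadata> committed =
                adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        List<PartitionAssignmentState> partitionStates = new ArrayList<>();
        committed.forEach((tp, meta) -> {
            if (!tp.topic().equals(topic)) return;
            PartitionAssignmentState p = new PartitionAssignmentState();
            p.setPartition(tp.partition());
            p.setOffset(meta.offset());
            // attach clientId/host only when a live member owns this partition;
            // assign()-based consumers have no members, so these stay null
            description.members().stream()
                    .filter(m -> m.assignment().topicPartitions().contains(tp))
                    .findFirst().ifPresent(m -> {
                        p.setClientId(m.clientId());
                        p.setHost(m.host());
                    });
            partitionStates.add(p);
        });

        TopicConsumerGroupState state = new TopicConsumerGroupState();
        state.setGroupId(groupId);
        state.setPartitionAssignmentStates(partitionStates);
        states.add(state);
    }
    return states;
}
```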
- Step 3

```java
public Map<TopicPartition, Long> consumerOffset(String groupName, String topicName) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.101:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // try-with-resources closes the consumer; no subscribe needed just to read end offsets
    try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps)) {
        List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topicName);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        partitions.forEach(partition ->
                topicPartitions.add(new TopicPartition(topicName, partition.partition())));
        // endOffsets returns the HW per partition, i.e. the LEO visible to consumers
        return kafkaConsumer.endOffsets(topicPartitions);
    }
}
```
- Step 4

```java
private List<TopicConsumerGroupState> getBrokerConsumerOffsets(String clusterID, String topic) {
    List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
    try {
        topicConsumerGroupStates = this.describeConsumerGroups(topic);
        // fill in logEndOffset and lag
        topicConsumerGroupStates.forEach(topicConsumerGroupState -> {
            String groupId = topicConsumerGroupState.getGroupId();
            List<PartitionAssignmentState> partitionAssignmentStates = topicConsumerGroupState
                    .getPartitionAssignmentStates();
            Map<TopicPartition, Long> offsetsMap = this.consumerOffset(groupId, topic);
            for (Entry<TopicPartition, Long> entry : offsetsMap.entrySet()) {
                long logEndOffset = entry.getValue();
                for (PartitionAssignmentState partitionAssignmentState : partitionAssignmentStates) {
                    if (partitionAssignmentState.getPartition() == entry.getKey().partition()) {
                        partitionAssignmentState.setLogEndOffset(logEndOffset);
                        partitionAssignmentState.setLag(getLag(partitionAssignmentState.getOffset(), logEndOffset));
                    }
                }
            }
        });
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        e.printStackTrace();
    }
    return topicConsumerGroupStates;
}
```
ZooKeeper-Based Consumption: Fetching Offsets
Approach
- For a given topic, find the groups that consume it
- Read each group's consumption of the topic from ZooKeeper; this yields clientId, CURRENT-OFFSET, and partition
- Use a consumer to fetch the LogEndOffset (the offset visible to consumers)
- Merge the results of steps 2 and 3 and compute the lag
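For reference, old-style consumers that commit to ZooKeeper use the following node layout, which the code below walks:

```
/consumers
└── <groupId>
    ├── offsets
    │   └── <topic>
    │       └── <partition>   # node data = committed offset (CURRENT-OFFSET)
    ├── owners
    │   └── <topic>
    │       └── <partition>   # node data = owning consumer's id
    └── ids                   # live consumer registrations
```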
Code Design
Add two dependencies, mainly for reading data from ZooKeeper nodes. The original listing is truncated; the API used below (exists/getChildren/readData with string paths) matches the I0Itec ZkClient, plus the ZooKeeper client itself:

```xml
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
</dependency>
```
- Step 1

```java
public Set<String> listTopicGroups(String topic) {
    Set<String> groups = new HashSet<>();
    List<String> allGroups = zkClient.getChildren("/consumers");
    allGroups.forEach(group -> {
        // a group consumes the topic if it has committed offsets for it
        if (zkClient.exists("/consumers/" + group + "/offsets")) {
            Set<String> offsets = new HashSet<>(zkClient.getChildren("/consumers/" + group + "/offsets"));
            if (offsets.contains(topic)) {
                groups.add(group);
            }
        }
    });
    return groups;
}
```
- Step 2

```java
public Map<String, Map<String, String>> getZKConsumerOffsets(String groupId, String topic) {
    Map<String, Map<String, String>> result = new HashMap<>();
    // committed offsets: /consumers/<group>/offsets/<topic>/<partition>
    String offsetsPath = "/consumers/" + groupId + "/offsets/" + topic;
    if (zkClient.exists(offsetsPath)) {
        List<String> offsets = zkClient.getChildren(offsetsPath);
        offsets.forEach(partition -> {
            try {
                String offset = zkClient.readData(offsetsPath + "/" + partition, true);
                if (offset != null) {
                    Map<String, String> map = new HashMap<>();
                    map.put("offset", offset);
                    result.put(partition, map);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    // partition owners: /consumers/<group>/owners/<topic>/<partition>
    String ownersPath = "/consumers/" + groupId + "/owners/" + topic;
    if (zkClient.exists(ownersPath)) {
        List<String> owners = zkClient.getChildren(ownersPath);
        owners.forEach(partition -> {
            try {
                String owner = zkClient.readData(ownersPath + "/" + partition, true);
                if (owner != null) {
                    // a partition may have an owner but no committed offset yet
                    Map<String, String> map = result.computeIfAbsent(partition, k -> new HashMap<>());
                    map.put("owner", owner);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    return result;
}
```
- Step 3
Omitted — presumably the same consumerOffset method as in the broker-based approach.
- Step 4
The original listing is truncated; only the signature survives:

```java
private List<TopicConsumerGroupState> getZKConsumerOffsets(String topic) {
```
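Since the body is lost, here is a self-contained illustration of the remaining work: merging the ZooKeeper data from step 2 with the log-end offsets from step 3 into per-partition lag. The map shapes mirror the methods above; the class and method names are otherwise assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical merge of step 2 (offsets read from ZooKeeper, keyed by partition
// string, values like {"offset": "90", "owner": "..."}) with step 3
// (log-end offsets per partition) into a per-partition lag map.
class ZKLagMerger {
    static Map<Integer, Long> mergeLag(Map<String, Map<String, String>> zkOffsets,
                                       Map<Integer, Long> logEndOffsets) {
        Map<Integer, Long> lagByPartition = new HashMap<>();
        logEndOffsets.forEach((partition, logEndOffset) -> {
            Map<String, String> data = zkOffsets.get(String.valueOf(partition));
            // a partition with no committed offset counts the whole log as lag
            long committed = (data == null) ? 0L : Long.parseLong(data.get("offset"));
            lagByPartition.put(partition, Math.max(0L, logEndOffset - committed));
        });
        return lagByPartition;
    }
}
```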