How to Monitor Kafka Consumer Lag
Preface
Why does this need exist?
Kafka consumers inevitably fall behind producers at times, so we need a way to see how many messages have piled up, i.e. the consumer lag. There are existing tools for this, such as KafkaOffsetMonitor, and we also built our own, KafkaCenter. But the way consumers work has changed considerably across Kafka versions, so our Kafka offset monitoring needs a rewrite.
Body
How to Calculate Lag
A few basics before computing lag:
LEO (LogEndOffset): used here slightly differently from the official docs. Here it means the highest offset visible to consumers, i.e. the HW (High Watermark).
CURRENT-OFFSET: the offset the consumer has actually consumed up to.
Given those two values, Lag = LEO - CURRENT-OFFSET. The result is how far behind the consumer is.
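The formula extends naturally to a whole topic: sum the per-partition differences. A minimal sketch of the arithmetic, in plain Java with no Kafka dependency (class and method names are my own, for illustration only):

```java
import java.util.Map;

class LagDemo {
    // Total lag of a group over a topic = sum over partitions of
    // LEO - CURRENT-OFFSET. A partition the group never consumed
    // defaults to a committed offset of 0, i.e. full lag.
    static long totalLag(Map<Integer, Long> leo, Map<Integer, Long> committed) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : leo.entrySet()) {
            total += e.getValue() - committed.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }
}
```

For example, with LEO {0: 100, 1: 200} and committed offsets {0: 90, 1: 150}, the total lag is 10 + 50 = 60.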
The Official Way
"Official" here means what the Kafka documentation describes: the bin/kafka-consumer-groups.sh script that ships with the distribution.
Note that the latest version of this tool can only report on groups that commit their offsets through the broker.
```
$ bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.0.101:8092 --group test
```
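Its output looks roughly like this (the values below are invented for illustration; the column names match recent client versions):

```
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST            CLIENT-ID
test   0          850             1000            150  consumer-1-...   /192.168.0.102  consumer-1
```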
Programmatic Query
Querying from your own program lets you build your own offset monitoring and plug it into any platform seamlessly.
Since we are writing code anyway, let's take on a slightly more advanced requirement: ++given a topic, report the consumption state of every group consuming it++.
Start by defining two entities.
TopicConsumerGroupState
```java
public class TopicConsumerGroupState {
    // ...
}
```
PartitionAssignmentState
```java
public class PartitionAssignmentState {
    // ...
}
```
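The bodies of the two entities are elided above. The field sets below are inferred from how the classes are used later in the article (getGroupId, getPartitionAssignmentStates, getPartition, getOffset, setLogEndOffset, setLag); the real definitions may differ, so treat this as an assumption:

```java
import java.util.List;

// Inferred sketch of the article's two entities, not the original definitions.
class TopicConsumerGroupState {
    private String groupId;
    private String topic;
    private List<PartitionAssignmentState> partitionAssignmentStates;

    public String getGroupId() { return groupId; }
    public void setGroupId(String groupId) { this.groupId = groupId; }
    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public List<PartitionAssignmentState> getPartitionAssignmentStates() { return partitionAssignmentStates; }
    public void setPartitionAssignmentStates(List<PartitionAssignmentState> s) { this.partitionAssignmentStates = s; }
}

class PartitionAssignmentState {
    private int partition;
    private long offset;        // CURRENT-OFFSET
    private long logEndOffset;  // LEO
    private long lag;
    private String clientId;
    private String host;

    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
    public long getLogEndOffset() { return logEndOffset; }
    public void setLogEndOffset(long logEndOffset) { this.logEndOffset = logEndOffset; }
    public long getLag() { return lag; }
    public void setLag(long lag) { this.lag = lag; }
    public String getClientId() { return clientId; }
    public void setClientId(String clientId) { this.clientId = clientId; }
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
}
```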
Getting Offsets for Broker-Based Consumption
Approach
- From the topic, find the groups consuming it.
- Use KafkaAdminClient's describeConsumerGroups to read each group's consumption state for the topic from the broker; this yields clientId, CURRENT-OFFSET, partition, host, and so on.
- Use a consumer to fetch the LogEndOffset (the consumer-visible offset).
- Merge the results of steps 2 and 3 and compute the lag.
Code
Pull in the latest client dependency and use KafkaAdminClient for steps 1 and 2.
```xml
<dependency>
    <!-- ... -->
</dependency>
```
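The dependency block is elided in the original. Given that KafkaAdminClient's consumer-group operations (listConsumerGroups, describeConsumerGroups, listConsumerGroupOffsets) appear below, it is presumably the kafka-clients artifact at 2.0 or later; the exact version here is an assumption:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
```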
- Step 1
```java
public Set<String> listConsumerGroups(String topic)
        throws InterruptedException, ExecutionException, TimeoutException {
    final Set<String> filteredGroups = new HashSet<>();
    Set<String> allGroups = this.adminClient.listConsumerGroups().all()
            .get(30, TimeUnit.SECONDS).stream()
            .map(ConsumerGroupListing::groupId)
            .collect(Collectors.toSet());
    allGroups.forEach(groupId -> {
        try {
            // Keep only the groups that have committed offsets for this topic
            adminClient.listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get().keySet().stream()
                    .filter(tp -> tp.topic().equals(topic))
                    .forEach(tp -> filteredGroups.add(groupId));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
    return filteredGroups;
}
```
- Step 2
The consumption state returned by describeConsumerGroups comes in two flavors: with members and without. A consumer that subscribes normally shows up in members, but a consumer that is assigned specific partitions manually does not.
```java
/**
 * ...
 */
```
- Step 3
```java
public Map<TopicPartition, Long> consumerOffset(String groupName, String topicName) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.101:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    @SuppressWarnings("resource")
    KafkaConsumers<String, String> consumer = new KafkaConsumers<>(consumerProps);
    KafkaConsumer<String, String> kafkaConsumer = consumer.subscribe(topicName);
    // Collect every partition of the topic, then ask for its end offset (LEO)
    List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topicName);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    partitions.forEach(partition ->
            topicPartitions.add(new TopicPartition(topicName, partition.partition())));
    return kafkaConsumer.endOffsets(topicPartitions);
}
```
- Step 4
```java
private List<TopicConsumerGroupState> getBrokerConsumerOffsets(String clusterID, String topic) {
    List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
    try {
        topicConsumerGroupStates = this.describeConsumerGroups(topic);
        // Fill in logEndOffset and lag for every assigned partition
        topicConsumerGroupStates.forEach(topicConsumerGroupState -> {
            String groupId = topicConsumerGroupState.getGroupId();
            List<PartitionAssignmentState> partitionAssignmentStates = topicConsumerGroupState
                    .getPartitionAssignmentStates();
            Map<TopicPartition, Long> offsetsMap = this.consumerOffset(groupId, topic);
            for (Entry<TopicPartition, Long> entry : offsetsMap.entrySet()) {
                long logEndOffset = entry.getValue();
                for (PartitionAssignmentState partitionAssignmentState : partitionAssignmentStates) {
                    if (partitionAssignmentState.getPartition() == entry.getKey().partition()) {
                        partitionAssignmentState.setLogEndOffset(logEndOffset);
                        partitionAssignmentState.setLag(getLag(partitionAssignmentState.getOffset(), logEndOffset));
                    }
                }
            }
        });
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        e.printStackTrace();
    }
    return topicConsumerGroupStates;
}
```
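The getLag helper used above is not shown in the article. Presumably it is just the subtraction from earlier, clamped so that a momentarily stale LEO cannot produce a negative lag; this is a guess at it:

```java
// Hypothetical reconstruction of the article's getLag helper.
class Lag {
    static long getLag(long currentOffset, long logEndOffset) {
        long lag = logEndOffset - currentOffset; // Lag = LEO - CURRENT-OFFSET
        return lag < 0 ? 0 : lag;                // never report negative lag
    }
}
```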
Getting Offsets for Zookeeper-Based Consumption
Approach
- From the topic, find the groups consuming it.
- Read each group's consumption state for the topic from Zookeeper; this yields clientId, CURRENT-OFFSET, and partition.
- Use a consumer to fetch the LogEndOffset (the consumer-visible offset).
- Merge the results of steps 2 and 3 and compute the lag.
Code
Two dependencies are needed, mainly for reading data off the Zookeeper nodes.
```xml
<dependency>
    <!-- ... -->
</dependency>
```
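The two dependencies are elided in the original. Given the ZkClient API used below (getChildren, exists, readData), they are presumably the Zookeeper client itself plus the com.101tec zkclient wrapper; the versions here are guesses:

```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.13</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
```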
- Step 1
```java
public Set<String> listTopicGroups(String topic) {
    Set<String> groups = new HashSet<>();
    List<String> allGroups = zkClient.getChildren("/consumers");
    allGroups.forEach(group -> {
        // A group consumes the topic if it has committed offsets for it
        if (zkClient.exists("/consumers/" + group + "/offsets")) {
            Set<String> offsets = new HashSet<>(zkClient.getChildren("/consumers/" + group + "/offsets"));
            if (offsets.contains(topic)) {
                groups.add(group);
            }
        }
    });
    return groups;
}
```
- Step 2
```java
public Map<String, Map<String, String>> getZKConsumerOffsets(String groupId, String topic) {
    Map<String, Map<String, String>> result = new HashMap<>();
    // CURRENT-OFFSET lives under /consumers/<group>/offsets/<topic>/<partition>
    String offsetsPath = "/consumers/" + groupId + "/offsets/" + topic;
    if (zkClient.exists(offsetsPath)) {
        List<String> offsets = zkClient.getChildren(offsetsPath);
        offsets.forEach(partition -> {
            try {
                String offset = zkClient.readData(offsetsPath + "/" + partition, true);
                if (offset != null) {
                    Map<String, String> map = new HashMap<>();
                    map.put("offset", offset);
                    result.put(partition, map);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    // The owning consumer instance lives under /consumers/<group>/owners/<topic>/<partition>
    String ownersPath = "/consumers/" + groupId + "/owners/" + topic;
    if (zkClient.exists(ownersPath)) {
        List<String> owners = zkClient.getChildren(ownersPath);
        owners.forEach(partition -> {
            try {
                String owner = zkClient.readData(ownersPath + "/" + partition, true);
                if (owner != null) {
                    // Guard against a partition that has an owner but no committed offset
                    Map<String, String> map = result.computeIfAbsent(partition, k -> new HashMap<>());
                    map.put("owner", owner);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
    return result;
}
```
- Step 3
Omitted; it is the same as step 3 of the broker-based approach.
- Step 4
```java
private List<TopicConsumerGroupState> getZKConsumerOffsets(String topic) {
    // ...
}
```
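The body above is elided; presumably it mirrors the broker-based step 4, merging the per-partition offset/owner map from Zookeeper with the end offsets from the consumer. A minimal, self-contained sketch of just the merge step (the class name, method name, and the plain-Map output type are my assumptions, not the article's code):

```java
import java.util.HashMap;
import java.util.Map;

class ZkLagMerge {
    // Merges ZK data (partition -> {"offset": "...", "owner": "..."}) with the
    // LEO per partition and returns partition -> lag. Hypothetical helper, not
    // the article's actual step 4.
    static Map<Integer, Long> mergeLag(Map<String, Map<String, String>> zkOffsets,
                                       Map<Integer, Long> logEndOffsets) {
        Map<Integer, Long> lags = new HashMap<>();
        zkOffsets.forEach((partition, info) -> {
            int p = Integer.parseInt(partition);             // ZK stores partitions as strings
            long current = Long.parseLong(info.get("offset"));
            Long leo = logEndOffsets.get(p);
            if (leo != null) {
                lags.put(p, Math.max(0, leo - current));     // Lag = LEO - CURRENT-OFFSET
            }
        });
        return lags;
    }
}
```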