前言

零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。

零拷贝操作减少了在用户空间与内核空间之间切换模式的次数

举例来说,如果要读取一个文件并通过网络发送它,传统方式下如下图,传统的I/O操作进行了4次用户空间与内核空间的上下文切换,以及4次数据拷贝。其中4次数据拷贝中包括了2次DMA拷贝和2次CPU拷贝。通过零拷贝技术完成相同的操作,减少了在用户空间与内核空间交互,并且不需要CPU复制数据。

linux中零拷贝技术

Linux系统的“用户空间”和“内核空间”

从Linux系统上看,除了引导系统的BIN区,整个内存空间主要被分成两个部分:内核空间(Kernel space)、用户空间(User space)。

“用户空间”和“内核空间”的空间、操作权限以及作用都是不一样的。

内核空间是Linux自身使用的内存空间,主要提供给程序调度、内存分配、连接硬件资源等程序逻辑使用;用户空间则是提供给各个进程的主要空间。

用户空间不具有访问内核空间资源的权限,因此如果应用程序需要使用到内核空间的资源,则需要通过系统调用来完成:从用户空间切换到内核空间,然后在完成相关操作后再从内核空间切换回用户空间。

Linux 中零拷贝技术的实现方向

  • 直接 I/O:对于这种数据传输方式来说,应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输。这种方式依旧存在用户空间和内核空间的上下文切换,但是硬件上的数据不会拷贝一份到内核空间,而是直接拷贝至了用户空间,因此直接I/O不存在内核空间缓冲区和用户空间缓冲区之间的数据拷贝。

  • 在数据传输过程中,避免数据在用户空间缓冲区和系统内核空间缓冲区之间的CPU拷贝,以及数据在系统内核空间内的CPU拷贝。本文主要讨论的就是该方式下的零拷贝机制。

  • copy-on-write(写时复制技术):在某些情况下,Linux操作系统的内核空间缓冲区可能被多个应用程序所共享,操作系统有可能会将用户空间缓冲区地址映射到内核空间缓存区中。当应用程序需要对共享的数据进行修改的时候,才需要真正地拷贝数据到应用程序的用户空间缓冲区中,并且对自己用户空间的缓冲区的数据进行修改不会影响到其他共享数据的应用程序。所以,如果应用程序不需要对数据进行任何修改的话,就不会存在数据从系统内核空间缓冲区拷贝到用户空间缓冲区的操作。

mmap

1
2
buf = mmap(diskfd, len);
write(sockfd, buf, len);

应用程序调用mmap(),磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去。

使用mmap替代read很明显减少了一次拷贝,当拷贝数据量很大时,无疑提升了效率。但是使用mmap是有代价的。当你使用mmap时,你可能会遇到一些隐藏的陷阱。例如,当你的程序map了一个文件,但是当这个文件被另一个进程截断(truncate)时, write系统调用会因为访问非法地址而被SIGBUS信号终止。SIGBUS信号默认会杀死你的进程并产生一个coredump,如果你的服务器这样被中止了,那会产生一笔损失。

当然也有办法解决,本文忽略解决办法。

sendfile

1
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

sendfile实现的零拷贝I/O只使用了2次用户空间与内核空间的上下文切换,以及3次数据的拷贝。其中3次数据拷贝中包括了2次DMA拷贝和1次CPU拷贝。

splice

sendfile只适用于将数据从文件拷贝到套接字上,限定了它的使用范围。Linux在2.6.17版本引入splice系统调用,用于在两个文件描述符中移动数据:

1
2
3
#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <fcntl.h>
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);

java中如何使用零拷贝

DMA简介

io读写的方式:中断,DMA

DMA 直接内存存取,是允许外设组件将I/O数据直接传送到主存储器中并且传输不需要CPU的参与,以此将CPU解放出来去完成其他的事情。,相较于中断方式,减少cpu中断次数,不用cpu拷贝数据。

主要流程如下:

  1. 用户进程发起数据读取请求
  2. 系统调度为该进程分配cpu
  3. cpu向DMA发送io请求
  4. 用户进程等待io完成,让出cpu
  5. 系统调度cpu执行其他任务
  6. 数据写入至io控制器的缓冲寄存器
  7. DMA不断获取缓冲寄存器中的数据(需要cpu时钟)
  8. 传输至内存(需要cpu时钟)
  9. 所需的全部数据获取完毕后向cpu发出中断信号

更多参考

参考

  1. 浅析Linux中的零拷贝技术
  2. 零复制
  3. 零拷贝原理-数据的收发-软中断和DMA
  4. 浅谈 Linux下的零拷贝机制

Kafka producer源码分析

前言

在开始文章之前,需要解释是一下为什么要研究producer源码。

为什么要研究producer源码

通常producer使用都很简单,初始化一个KafkaProducer实例,然后调用send方法就好,但是我们有了解后面是如何发送到kafka集群的吗?其实我们不知道,其次,到底客户端有几个线程?我们不知道。还有producer还能做什么?我们同样不知道。本篇文章就是想回答一下上面提出的几个问题,能力有限,如有错误,欢迎指出!

架构

在介绍客户端架构之前,先回答一个问题

producer到底存在几个线程?2个 Main threadsender,其中sender线程负责发送消息,main 线程负责interceptor、序列化、分区等其他操作。

  • Producer首先使用用户主线程将待发送的消息封装进一个ProducerRecord类实例中。
  • 进行interceptor、序列化、分区等其他操作,发送到Producer程序中RecordAccumulator中。
  • Producer的另一个工作线程(即Sender线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次的内,统一发送给对应的broker中。

消息发送源码分析

sender发送的时机是由两个指标决定的,一个是时间linger.ms,一个是数据量大小 batch.size

sender线程主要代码

run->run(long now)->sendProducerData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
//result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);

log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}

Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// 暂且只关心result.nextReadyCheckDelayMs
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);

pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}

1
2
3
long pollTimeout = sendProducerData(now);
// poll最终会调用selector,pollTimeout也就是selector阻塞的时间
client.poll(pollTimeout, now);

selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Check for data, waiting up to the given timeout.
*
* @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
* @return The number of keys ready
*/
private int select(long timeoutMs) throws IOException {
if (timeoutMs < 0L)
throw new IllegalArgumentException("timeout should be >= 0");

if (timeoutMs == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(timeoutMs);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
// 初始化为最大值
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();

Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// 和linger.ms有关
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

我们可以从实例化一个新的KafkaProducer开始分析(还没有调用send方法),在sender线程调用accumulator#ready(..)时候,会返回result,其中包含selector可能要阻塞的时间。由于还没有调用send方法,所以Deque为空,所以result中包含的nextReadyCheckDelayMs也是最大值,这个时候selector会一直阻塞。

然后我们调用send方法,该方法会调用waitOnMetadata,waitOnMetadata会调用sender.wakeup(),所以会唤醒sender线程。

这个时候会唤醒阻塞在selector#select(..)的sender线程,sender线程又运行到accumulator#ready(..),当Deque有值,所以返回的result包含的nextReadyCheckDelayMs不再是最大值,而是和linger.ms有关的值。也就是时候selector会最多阻塞lingger.ms后就返回,然后再次轮询。(根据源码分析,第一条消息会创建batch,因此newBatchCreated为true,同样会触发唤醒sender)

如果有一个ProducerBatch满了,也会调用Sender#wakeup(..),

KafkaProducer#doSend(…)

1
2
3
4
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}

所以综上所述:只要满足linger.ms和batch.size满了就会激活sender线程来发送消息。

客户端顺序性保证

因为消息发送是批量发送的,那么就有可能存在上一批发送失败,接下来一批发送成功。即会出现数据乱序。

max.in.flight.requests.per.connection=1,限制客户端在单个连接上能够发送的未响应请求的个数。如果业务对顺序性有很高的要求,将该参数值设置为1。

总结

kafka producer大概流程如上,数据发送是批量的,最后是按Node发送响应的消息的。即其中Integer即为broker id。消息在RecordAccumulator中还是按TopicPartition存放的,但是在最终发送会作相应的转换。

sender发送的时机是由两个指标决定的,一个是时间linger.ms,一个是数据量大小 batch.size

了解producer发送流程,这样就可以让我们更改的生产消息,使用更多的特性,例如拦截器可以做各种特殊处理的场景。

参考

1. kafka producer的batch.size和linger.ms

前言

为了深入研究kafka运行原理和架构,很有必要搭建开发环境,这样我们就很方便的debug 服务。

前期准备

  1. Gradle 5.0+
  2. jdk8
  3. Scala 2.12
  4. idea scale plugin

配置/运行

  • 首先执行在源码目录下执行gradle
  • 然后build ./gradlew jar
  • 最后生成idea工程./gradlew idea

运行MirrorMaker

运行kafka server

因为kafka依赖zookeeper,在开始之前请先启动一个zookeeper服务,本文章略。

错误总结

无log

下载log4j
slf4j-log4j12

然后在modules中找到core/main,将这两个依赖导入进来即可

最后将config目录下的log4j.properties复制到core/main/scala目录下(如果还是无法看到log,可以将该文件复制到out/production/classes目录下)

执行报错,无法找到相关类

执行./gradlew jar即可解决。

Core源码模块解读

模块名程说明
adminkafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。
api主要负责数据交互,客户端与服务端交互数据的编码与解码。
cluster这里包含多个实体类,有Broker,Cluster,Partition,Replica。其中一个Cluster由多个Broker组成,一个Broker包含多个Partition,一个Topic的所有Partition分布在不同的Broker中,一个Replica包含都个Partition。
common这是一个通用模块,其只包含各种异常类以及错误验证。
consumer消费者处理模块,负责所有的客户端消费者数据和逻辑处理。
controller此模块负责中央控制器的选举,分区的Leader选举,Replica的分配或其重新分配,分区和副本的扩容等。
coordinator负责管理部分consumer group和他们的offset。
log这是一个负责Kafka文件存储模块,负责读写所有的Kafka的Topic消息数据。
message封装多条数据组成一个数据集或者压缩数据集。
metrics负责内部状态的监控模块。
network该模块负责处理和接收客户端连接,处理网络时间模块。
security负责Kafka的安全验证和管理模块。
serializer序列化和反序列化当前消息内容。
server 该模块涉及的内容较多,有Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader的选举,Admin和Replica的管理,以及各种元数据的缓存等内容。
tools阅读该模块,就是一个工具模块,涉及的内容也比较多。有导出对应consumer的offset值;导出LogSegments信息,以及当前Topic的log写的Location信息;导出Zookeeper上的offset值等内容。
utils各种工具类,比如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类,Mx4jLoader监控加载器,ReplicationUtils复制集工具类,CommandLineUtils命令行工具类,以及公共日志类等内容。

参考

  1. Apache Kafka
  2. 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置

前言

写这篇文章的目的是为了记录一下学习笔记,其次为了能够在复习的时候快速掌握相关知识。本篇记录java8系列专栏之Lambda与Stream API

正文

Lambda

什么是Lambda

1
2
List<String>list = Arrays.asList("a","c","b");
Collections.sort(list, (o1,o2)->o1.compareTo(o2));

以下方式都是常用的lambda使用方式

1
2
3
4
str->str.toLowerCase()
(o1,o2)->o1.compareTo(o2)
(o1,o2)->{return o1.compareTo(o2)}
(String o1,String o2)->{return o1.compareTo(o2)}

怎么用,哪里用

函数接口声明既可使用。例如Runnable,Comparator都是函数接口。用@FunctionalInterface声明的都是函数接口

实现原理

首先需要明确说明的是lambda没有使用匿名内部类去实现。

Java 8设计人员决定使用在Java 7中添加的invokedynamic指令来推迟在运行时的翻译策略。当javac编译代码时,它捕获lambda表达式并生成invokedynamic调用站点(称为lambda工厂)。调用invokedynamic调用站点时,返回一个函数接口实例,lambda将被转换到这个函数接口

使用invokedynamic指令,运行时调用LambdaMetafactory.metafactory动态的生成内部类,实现了接口,内部类里的调用方法块并不是动态生成的,只是在原class里已经编译生成了一个静态的方法,内部类只需要调用该静态方法

参考1
参考2

内置函数接口

函数式接口 函数描述符 原始类型特化
Predicate<T> T->boolean IntPredicate,LongPredicate, DoublePredicate
Consumer<T> T->void IntConsumer,LongConsumer, DoubleConsumer
Function<T,R> T->R IntFunction<R>, IntToDoubleFunction,
IntToLongFunction, LongFunction<R>,
LongToDoubleFunction, LongToIntFunction,
DoubleFunction<R>, ToIntFunction<T>,
ToDoubleFunction<T>, ToLongFunction<T>
Supplier<T> ()->T BooleanSupplier,IntSupplier, LongSupplier, DoubleSupplier
UnaryOperator<T> T->T IntUnaryOperator, LongUnaryOperator, DoubleUnaryOperator
BinaryOperator<T> (T,T)->T IntBinaryOperator, LongBinaryOperator, DoubleBinaryOperator
BiPredicate<L,R> (L,R)->boolean
BiConsumer<T,U> (T,U)->void ObjIntConsumer<T>, ObjLongConsumer<T>, ObjDoubleConsumer<T>
BiFunction<T,U,R> (T,U)->R ToIntBiFunction<T,U>, ToLongBiFunction<T,U>, ToDoubleBiFunction<T,U>

Stream API

操作 类型 返回类型 使用的类型/函数式接口 函数描述符
filter 中间 Stream<T> Predicate<T> T -> boolean
distinct 中间 Stream<T>
skip 中间 Stream<T> long
map 中间 Stream<R> Function<T, R> T -> R
flatMap 中间 Stream<R> Function<T, Stream<R>> T -> Stream<R>
limit 中间 Stream<T> long
sorted 中间 Stream<T> Comparator<T> (T, T) -> int
anyMatch 终端 boolean Predicate<T> T -> boolean
noneMatch 终端 boolean Predicate<T> T -> boolean
allMatch 终端 boolean Predicate<T> T -> boolean
findAny 终端 Optional<T>
findFirst 终端 Optional<T>
forEach 终端 void Consumer<T> T -> void
collect 终端 R Collector<T, A, R>
reduce 终端 Optional<T> BinaryOperator<T> (T, T) -> T
count 终端 long

Collector 收集

Collectors 类的静态工厂方法

工厂方法 返回类型 用途 示例
toList List<T> 把流中所有项目收集到一个 List List<Project> projects = projectStream.collect(toList());
toSet Set<T> 把流中所有项目收集到一个 Set,删除重复项 Set<Project> projects = projectStream.collect(toSet());
toCollection Collection<T> 把流中所有项目收集到给定的供应源创建的集合 Collection<Project> projects = projectStream.collect(toCollection(), ArrayList::new);
counting Long 计算流中元素的个数 long howManyProjects = projectStream.collect(counting());
summingInt Integer 对流中项目的一个整数属性求和 int totalStars = projectStream.collect(summingInt(Project::getStars));
averagingInt Double 计算流中项目 Integer 属性的平均值 double avgStars = projectStream.collect(averagingInt(Project::getStars));
summarizingInt IntSummaryStatistics 收集关于流中项目 Integer 属性的统计值,例如最大、最小、 总和与平均值 IntSummaryStatistics projectStatistics = projectStream.collect(summarizingInt(Project::getStars));
joining String 连接对流中每个项目调用 toString 方法所生成的字符串 String shortProject = projectStream.map(Project::getName).collect(joining(", "));
maxBy Optional<T> 按照给定比较器选出的最大元素的 Optional, 或如果流为空则为 Optional.empty() Optional<Project> fattest = projectStream.collect(maxBy(comparingInt(Project::getStars)));
minBy Optional<T> 按照给定比较器选出的最小元素的 Optional, 或如果流为空则为 Optional.empty() Optional<Project> fattest = projectStream.collect(minBy(comparingInt(Project::getStars)));
reducing 归约操作产生的类型 从一个作为累加器的初始值开始,利用 BinaryOperator 与流中的元素逐个结合,从而将流归约为单个值 int totalStars = projectStream.collect(reducing(0, Project::getStars, Integer::sum));
collectingAndThen 转换函数返回的类型 包含另一个收集器,对其结果应用转换函数 int howManyProjects = projectStream.collect(collectingAndThen(toList(), List::size));
groupingBy Map<K, List<T>> 根据项目的一个属性的值对流中的项目作问组,并将属性值作 为结果 Map 的键 Map<String,List<Project>> projectByLanguage = projectStream.collect(groupingBy(Project::getLanguage));
partitioningBy Map<Boolean,List<T>> 根据对流中每个项目应用断言的结果来对项目进行分区 Map<Boolean,List<Project>> vegetarianDishes = projectStream.collect(partitioningBy(Project::isVegetarian));

参考

  1. Lambdas
  2. Lambda expressions
  3. Java 8 动态类型语言Lambda表达式实现原理分析
  4. Stream

Optional类的方法

方法 描述
empty 返回一个空的 Optional 实例
filter 如果值存在并且满足提供的断言, 就返回包含该值的 Optional 对象;否则返回一个空的 Optional 对象
map 如果值存在,就对该值执行提供的 mapping 函数调用
flatMap 如果值存在,就对该值执行提供的 mapping 函数调用,返回一个 Optional 类型的值,否则就返 回一个空的 Optional 对象
get 如果该值存在,将该值用 Optional 封装返回,否则抛出一个 NoSuchElementException 异常
ifPresent 如果值存在,就执行使用该值的方法调用,否则什么也不做
isPresent 如果值存在就返回 true,否则返回 false
of 将指定值用 Optional 封装之后返回,如果该值为 null,则抛出一个 NullPointerException 异常
ofNullable 将指定值用 Optional 封装之后返回,如果该值为 null,则返回一个空的 Optional 对象
orElse 如果有值则将其返回,否则返回一个默认值
orElseGet 如果有值则将其返回,否则返回一个由指定的 Supplier 接口生成的值
orElseThrow 如果有值则将其返回,否则抛出一个由指定的 Supplier 接口生成的异常

前言

写这篇文章的目的是为了记录一下学习笔记,其次为了能够在复习的时候快速掌握相关知识。本篇记录java8系列专栏之字符串

正文

StringJoiner

详解

拼接字符串

用法
1
2
3
4
5
6
//不指定前缀和后缀
StringJoiner stringJoiner = new StringJoiner(",");
//指定前缀和后缀
//StringJoiner stringJoiner = new StringJoiner(",","{","}");
List<String> list = Arrays.asList("a","b","c");
list.forEach(str->stringJoiner.add(str));

String.join

详解

拼接字符串,缺点是无法指定前缀和后缀

用法
1
2
List<String> list = Arrays.asList("a","b","c");
System.out.println(String.join(",", list));

参考

  1. Java8(2):JDK 对字符串连接的改进

前言

写这篇文章的目的是为了记录一下学习笔记,其次为了能够在复习的时候快速掌握相关知识。本篇记录java8系列专栏之Date Time API

正文

Clock

详解

可以取代System.currentTimeMillis(),时区敏感,带有时区信息

用法
1
2
3
4
5
Clock clock = Clock.systemDefaultZone();
long millis = clock.millis();

Instant instant = clock.instant();
Date legacyDate = Date.from(instant); // legacy java.util.Date

ZoneId

详解

新的时区类 java.time.ZoneId 是原有的 java.util.TimeZone 类的替代品。 ZoneId对象可以通过 ZoneId.of() 方法创建,也可以通过 ZoneId.systemDefault() 获取系统默认时区。

用法
1
2
3
4
5
System.out.println(ZoneId.getAvailableZoneIds());
// prints all available timezone ids

ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
// ZoneRules[currentStandardOffset=+08:00]

有了 ZoneId,我们就可以将一个 LocalDate、LocalTime 或 LocalDateTime 对象转化为 ZonedDateTime 对象

1
2
LocalDateTime localDateTime = LocalDateTime.now();
ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, shanghaiZoneId);

ZonedDateTime 对象由两部分构成,LocalDateTime 和 ZoneId,其中 2018-03-03T15:26:56.147 部分为 LocalDateTime,+08:00[Asia/Shanghai] 部分为ZoneId。

LocalTime

详解

LocalTime类是Java 8中日期时间功能里表示一整天中某个时间点的类,它的时间是无时区属性的(早上10点等等)

用法
1
2
3
4
5
6
7
8
9
10
LocalTime now1 = LocalTime.now(zone1);
LocalTime now2 = LocalTime.now(zone2);

System.out.println(now1.isBefore(now2)); // false

long hoursBetween = ChronoUnit.HOURS.between(now1, now2);
long minutesBetween = ChronoUnit.MINUTES.between(now1, now2);

System.out.println(hoursBetween); // -3
System.out.println(minutesBetween); // -239

LocalDate

详解

LocalDate类是Java 8中日期时间功能里表示一个本地日期的类,它的日期是无时区属性的。 可以用来表示生日、节假日期等等。这个类用于表示一个确切的日期,而不是这个日期所在的时间

用法
1
2
3
4
5
6
7
LocalDate today = LocalDate.now();
LocalDate tomorrow = today.plus(1, ChronoUnit.DAYS);
LocalDate yesterday = tomorrow.minusDays(2);

LocalDate independenceDay = LocalDate.of(2014, Month.JULY, 4);
DayOfWeek dayOfWeek = independenceDay.getDayOfWeek();
System.out.println(dayOfWeek); // FRIDAY

LocalDateTime

详解

LocalDateTime类是Java 8中日期时间功能里,用于表示当地的日期与时间的类,它的值是无时区属性的。你可以将其视为Java 8中LocalDate与LocalTime两个类的结合。

用法
1
2
3
4
5
6
7
8
9
10
LocalDateTime sylvester = LocalDateTime.of(2014, Month.DECEMBER, 31, 23, 59, 59);

DayOfWeek dayOfWeek = sylvester.getDayOfWeek();
System.out.println(dayOfWeek); // WEDNESDAY

Month month = sylvester.getMonth();
System.out.println(month); // DECEMBER

long minuteOfDay = sylvester.getLong(ChronoField.MINUTE_OF_DAY);
System.out.println(minuteOfDay); // 1439

ZonedDateTime

详解

ZonedDateTime类是Java 8中日期时间功能里,用于表示带时区的日期与时间信息的类。可以用于表示一个真实事件的开始时间,如某火箭升空时间等等。

用法
1
2
ZonedDateTime dateTime = ZonedDateTime.now();
ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now(), shanghaiZoneId);

Duration

详解

一个Duration对象表示两个Instant间的一段时间

用法
1
2
3
4
Instant first = Instant.now();
// wait some time while something happens
Instant second = Instant.now();
Duration duration = Duration.between(first, second);

DateTimeFormatter

详解

DateTimeFormatter类是Java 8中日期时间功能里,线程安全。用于解析和格式化日期时间的类
。类中包含如下预定义的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
BASIC_ISO_DATE

ISO_LOCAL_DATE
ISO_LOCAL_TIME
ISO_LOCAL_DATE_TIME

ISO_OFFSET_DATE
ISO_OFFSET_TIME
ISO_OFFSET_DATE_TIME

ISO_ZONED_DATE_TIME

ISO_INSTANT

ISO_DATE
ISO_TIME
ISO_DATE_TIME

ISO_ORDINAL_TIME
ISO_WEEK_DATE

RFC_1123_DATE_TIME
用法
1
2
3
DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
//DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formattedDate = formatter.format(LocalDateTime.now());

参考

  1. java8-tutorial#date-api
  2. java8-datetime-api

[toc]

声明

本文问题参考朱小厮的博客。写这个只是为了检验自己最近的学习成果,因为是自己的理解,如果问题,欢迎指出。废话少说,上干货。

正文

1. Kafka的用途有哪些?使用场景如何?

总结下来就几个字:异步处理、日常系统解耦、削峰、提速、广播

如果再说具体一点例如:消息,网站活动追踪,监测指标,日志聚合,流处理,事件采集,提交日志等

2. Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么

ISR:In-Sync Replicas 副本同步队列

AR:Assigned Replicas 所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

3. Kafka中的HW、LEO、LSO、LW等分别代表什么?

HW:High Watermark 高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置上一条信息。

LEO:LogEndOffset 当前日志文件中下一条待写信息的offset

HW/LEO这两个都是指最后一条的下一条的位置而不是指最后一条的位置。

LSO:Last Stable Offset 对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同

LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

4. Kafka中是怎么体现消息顺序性的?

kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.

5. Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

拦截器->序列化器->分区器

6. Kafka生产者客户端的整体结构是什么样子的?

7. Kafka生产者客户端中使用了几个线程来处理?分别是什么?

2个,主线程和Sender线程。主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。Sender线程负责将RecordAccumulator中消息发送到kafka中.

9. Kafka的旧版Scala的消费者客户端的设计有什么缺陷?

10. “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?如果不正确,那么有没有什么hack的手段?

不正确,通过自定义分区分配策略,可以将一个consumer指定消费所有partition。

11. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

offset+1

12. 有哪些情形会造成重复消费?

消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)

13. 那些情景下会造成消息漏消费?

消费者没有处理完消息 提交offset(自动提交偏移 未处理情况下程序异常结束)

14. KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?

1.在每个线程中新建一个KafkaConsumer

2.单线程创建KafkaConsumer,多个处理线程处理消息(难点在于是否要考虑消息顺序性,offset的提交方式)

15. 简述消费者与消费组之间的关系

消费者从属与消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。

16. 当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?

创建:在zk上/brokers/topics/下节点 kafkabroker会监听节点变化创建主题
删除:调用脚本删除topic会在zk上将topic设置待删除标志,kafka后台有定时的线程会扫描所有需要删除的topic进行删除

17. topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

可以

18. topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

不可以

19. 创建topic时如何选择合适的分区数?

根据集群的机器数量和需要的吞吐量来决定适合的分区数

20. Kafka目前有那些内部topic,它们都有什么特征?各自的作用又是什么?

__consumer_offsets 以下划线开头,保存消费组的偏移

21. 优先副本是什么?它有什么特殊的作用?

优先副本 会是默认的leader副本 发生leader变化时重选举会优先选择优先副本作为leader

22. Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理

创建主题时
如果不手动指定分配方式 有两种分配方式

消费组内分配

23. 简述Kafka的日志目录结构

每个partition一个文件夹,包含四类文件.index .log .timeindex leader-epoch-checkpoint
.index .log .timeindex 三个文件成对出现 前缀为上一个segment的最后一个消息的偏移 log文件中保存了所有的消息 index文件中保存了稀疏的相对偏移的索引 timeindex保存的则是时间索引
leader-epoch-checkpoint中保存了每一任leader开始写入消息时的offset 会定时更新
follower被选为leader时会根据这个确定哪些消息可用

24. Kafka中有那些索引文件?

如上

25. 如果我指定了一个offset,Kafka怎么查找到对应的消息?

1.通过文件名前缀数字x找到该绝对offset 对应消息所在文件

2.offset-x为在文件中的相对偏移

3.通过index文件中记录的索引找到最近的消息的位置

4.从最近位置开始逐条寻找

26. 如果我指定了一个timestamp,Kafka怎么查找到对应的消息?

原理同上 但是时间的因为消息体中不带有时间戳 所以不精确

27. 聊一聊你对Kafka的Log Retention的理解

kafka留存策略包括 删除和压缩两种
删除: 根据时间和大小两个方式进行删除 大小是整个partition日志文件的大小
超过的会从老到新依次删除 时间指日志文件中的最大时间戳而非文件的最后修改时间
压缩: 相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续

28. 聊一聊你对Kafka的Log Compaction的理解

29. 聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)

30. 聊一聊Kafka的延时操作的原理

31. 聊一聊Kafka控制器的作用

32. 消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)

33. Kafka中的幂等是怎么实现的

pid+序号实现,单个producer内幂等

33. Kafka中的事务是怎么实现的(这题我去面试6家被问4次,照着答案念也要念十几分钟,面试官简直凑不要脸。实在记不住的话…只要简历上不写精通Kafka一般不会问到,我简历上写的是“熟悉Kafka,了解RabbitMQ….”)

34. Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

35. 失效副本是指什么?有那些应对措施?

36. 多副本下,各个副本中的HW和LEO的演变过程

37. 为什么Kafka不支持读写分离?

38. Kafka在可靠性方面做了哪些改进?(HW, LeaderEpoch)

39. Kafka中怎么实现死信队列和重试队列?

40. Kafka中的延迟队列怎么实现(这题被问的比事务那题还要多!!!听说你会Kafka,那你说说延迟队列怎么实现?)

41. Kafka中怎么做消息审计?

42. Kafka中怎么做消息轨迹?

43. Kafka中有那些配置参数比较有意思?聊一聊你的看法

44. Kafka中有那些命名比较有意思?聊一聊你的看法

45. Kafka有哪些指标需要着重关注?

生产者关注MessagesInPerSec、BytesOutPerSec、BytesInPerSec 消费者关注消费延迟Lag

46. 怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)

参考 如何监控kafka消费Lag情况

47. Kafka的那些设计让它有如此高的性能?

零拷贝,页缓存,顺序写

48. Kafka有什么优缺点?

49. 还用过什么同质类的其它产品,与Kafka相比有什么优缺点?

50. 为什么选择Kafka?

吞吐量高,大数据消息系统唯一选择。

51. 在使用Kafka的过程中遇到过什么困难?怎么解决的?

52. 怎么样才能确保Kafka极大程度上的可靠性?

53. 聊一聊你对Kafka生态的理解

confluent全家桶(connect/kafka stream/ksql/center/rest proxy等),开源监控管理工具kafka-manager,kmanager等

参考

  1. Kafka面试题全套整理 | 划重点要考!
  2. Kafka科普系列 | 什么是LSO?
  3. Kafka面试题

前言

为什么会有这个需求?

kafka consumer 消费会存在延迟情况,我们需要查看消息堆积情况,就是所谓的消息Lag。目前是市面上也有相应的监控工具KafkaOffsetMonitor,我们自己也写了一套监控KafkaCenter。但是随着kafka版本的升级,消费方式也发生了很大的变化,因此,我们需要重构一下kafka offset监控。

正文

如何计算Lag

在计算Lag之前先普及几个基本常识

LEO(LogEndOffset): 这里说的和官网说的LEO有点区别,主要是指堆consumer可见的offset.即HW(High Watermark)

CURRENT-OFFSET: consumer消费到的具体位移

知道以上信息后,可知Lag=LEO-CURRENT-OFFSET。计算出来的值即为消费延迟情况。

官方查看方式

这里说的官方查看方式是在官网文档中提到的,使用官方包里提供的bin/kafka-consumer-groups.sh

最新版的工具只能获取到通过broker消费的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.0.101:8092 --group test
Consumer group 'test' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
truman_test_offset 2 1325 2361 1036 - - -
truman_test_offset 6 1265 2289 1024 - - -
truman_test_offset 4 1245 2243 998 - - -
truman_test_offset 9 1310 2307 997 - - -
truman_test_offset 1 1259 2257 998 - - -
truman_test_offset 8 1410 2438 1028 - - -
truman_test_offset 3 1225 2167 942 - - -
truman_test_offset 0 1218 2192 974 - - -
truman_test_offset 5 1262 2252 990 - - -
truman_test_offset 7 1265 2277 1012 - - -

程序查询方式

使用程序查询方式,有什么好处,可以实现自己的offset监控,可以无缝接入任何平台系统。

既然是用程序实现,那么做个更高级的需求,++根据topic获取不同消费组的消费情况++。

先定义两个实体

TopicConsumerGroupState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TopicConsumerGroupState {
private String groupId;
/**
* 消费方式:zk/broker 主要指的offset提交到哪里 新版本 broker 旧版本zk
*/
private String consumerMethod;
private List<PartitionAssignmentState> partitionAssignmentStates;
/**
* Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response:
* UNKNOWN_MEMBER_ID Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
* PreparingRebalance:组准备开启新的rebalance,等待成员加入 AwaitingSync:正在等待leader
* consumer将分配方案传给各个成员 Stable:rebalance完成!可以开始消费了~
*/
private ConsumerGroupState consumerGroupState;

//省略set和get方法..

}

PartitionAssignmentState

1
2
3
4
5
6
7
8
9
10
11
12
13
public class PartitionAssignmentState {
private String group;
private String topic;
private int partition;
private long offset;
private long lag;
private String consumerId;
private String host;
private String clientId;
private long logEndOffset;
//省略set和get方法..
}

broker消费方式 offset 获取

实现思路

  1. 根据topic 获取消费该topic的group
  2. 通过使用KafkaAdminClient的describeConsumerGroups读取broker上指定group和topic的消费情况,可以获取到clientId,CURRENT-OFFSET,patition,host等
  3. 通过consumer获取LogEndOffset(可见offset)
  4. 将2与3处信息合并,计算Lag

代码设计

引入最新版依赖,使用KafkaAdminClient获取1,2处信息

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
  1. 步骤1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public Set<String> listConsumerGroups(String topic)
    throws InterruptedException, ExecutionException, TimeoutException {
    final Set<String> filteredGroups = new HashSet<>();

    Set<String> allGroups = this.adminClient.listConsumerGroups().all().get(30, TimeUnit.SECONDS).stream()
    .map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
    allGroups.forEach(groupId -> {
    try {
    adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get().keySet().stream()
    .filter(tp -> tp.topic().equals(topic)).forEach(tp -> filteredGroups.add(groupId));
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    });
    return filteredGroups;
    }
  2. 步骤2

使用describeConsumerGroup获取的消费情况,其中包含有members和无members情况。正常订阅消费是可以获取到members,但是通过制定patition消费拿不到members.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
* 根据topic 获取offset,该结果是不同group 组不同patition的当前消费offset
*
* @param topic
* @return Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>>
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> listConsumerGroupOffsets(String topic)
throws InterruptedException, ExecutionException, TimeoutException {
Set<String> groupIds = this.listConsumerGroups(topic);
Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets = new HashMap<>();
groupIds.forEach(groupId -> {
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets = new HashSet<>();
try {
consumerPatitionOffsets = this.adminClient.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get(30, TimeUnit.SECONDS).entrySet().stream()
.filter(entry -> topic.equalsIgnoreCase(entry.getKey().topic())).collect(Collectors.toSet());
;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
consumerGroupOffsets.put(groupId, consumerPatitionOffsets);
});
return consumerGroupOffsets;
}

/**
* 根据topic 获取topicConsumerGroupStates,其中未包含lag/logEndOffset(consumer可见offset)
*
* @param topic
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public List<TopicConsumerGroupState> describeConsumerGroups(String topic)
throws InterruptedException, ExecutionException, TimeoutException {
final List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
Set<String> groupIds = this.listConsumerGroups(topic);
Map<String, ConsumerGroupDescription> groupDetails = this.adminClient.describeConsumerGroups(groupIds).all()
.get(30, TimeUnit.SECONDS);

Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> consumerPatitionOffsetMap = this
.listConsumerGroupOffsets(topic);

groupDetails.entrySet().forEach(entry -> {
String groupId = entry.getKey();
ConsumerGroupDescription description = entry.getValue();

TopicConsumerGroupState topicConsumerGroupState = new TopicConsumerGroupState();
topicConsumerGroupState.setGroupId(groupId);
topicConsumerGroupState.setConsumerMethod("broker");
topicConsumerGroupState.setConsumerGroupState(description.state());
// 获取group下不同patition消费offset信息
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets = consumerPatitionOffsetMap
.get(groupId);
List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();

if (!description.members().isEmpty()) {
// 获取存在consumer(memeber存在的情况)
partitionAssignmentStates = this.withMembers(consumerPatitionOffsets, topic, groupId, description);
} else {
// 获取不存在consumer
partitionAssignmentStates = this.withNoMembers(consumerPatitionOffsets, topic, groupId);
}
topicConsumerGroupState.setPartitionAssignmentStates(partitionAssignmentStates);
topicConsumerGroupStates.add(topicConsumerGroupState);
});

return topicConsumerGroupStates;
}

private List<PartitionAssignmentState> withMembers(
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets, String topic, String groupId,
ConsumerGroupDescription description) {
List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();
Map<Integer, Long> consumerPatitionOffsetMap = new HashMap<>();
consumerPatitionOffsets.forEach(entryInfo -> {
TopicPartition topicPartition = entryInfo.getKey();
OffsetAndMetadata offsetAndMetadata = entryInfo.getValue();
consumerPatitionOffsetMap.put(topicPartition.partition(), offsetAndMetadata.offset());
});
description.members().forEach(memberDescription -> {
memberDescription.assignment().topicPartitions().forEach(topicPation -> {
PartitionAssignmentState partitionAssignmentState = new PartitionAssignmentState();
partitionAssignmentState.setPartition(topicPation.partition());
partitionAssignmentState.setTopic(topic);
partitionAssignmentState.setClientId(memberDescription.clientId());
partitionAssignmentState.setGroup(groupId);
partitionAssignmentState.setConsumerId(memberDescription.consumerId());
partitionAssignmentState.setHost(memberDescription.host());
partitionAssignmentState.setOffset(consumerPatitionOffsetMap.get(topicPation.partition()));
partitionAssignmentStates.add(partitionAssignmentState);
});
});
return partitionAssignmentStates;
}

private List<PartitionAssignmentState> withNoMembers(
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets, String topic, String groupId) {
List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();
consumerPatitionOffsets.forEach(entryInfo -> {
TopicPartition topicPartition = entryInfo.getKey();
OffsetAndMetadata offsetAndMetadata = entryInfo.getValue();
PartitionAssignmentState partitionAssignmentState = new PartitionAssignmentState();
partitionAssignmentState.setPartition(topicPartition.partition());
partitionAssignmentState.setTopic(topic);
partitionAssignmentState.setGroup(groupId);
partitionAssignmentState.setOffset(offsetAndMetadata.offset());
partitionAssignmentStates.add(partitionAssignmentState);
});
return partitionAssignmentStates;
}
  1. 步骤3
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public Map<TopicPartition, Long>  consumerOffset (String gorupName,String topicName) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1.101:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, gorupName);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    @SuppressWarnings("resource")
    KafkaConsumers<String, String> consumer = new KafkaConsumers<>(consumerProps);
    KafkaConsumer<String, String> kafkaConsumer = consumer.subscribe(topicName);
    List<PartitionInfo> patitions = kafkaConsumer.partitionsFor(topicName);
    List<TopicPartition>topicPatitions = new ArrayList<>();
    patitions.forEach(patition->{
    TopicPartition topicPartition = new TopicPartition(topicName,patition.partition());
    topicPatitions.add(topicPartition);
    });
    Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPatitions);
    return result;
    }
  2. 步骤4
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    private List<TopicConsumerGroupState> getBrokerConsumerOffsets(String clusterID, String topic) {
    List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
    try {
    topicConsumerGroupStates = this.describeConsumerGroups(topic);
    // 填充lag/logEndOffset
    topicConsumerGroupStates.forEach(topicConsumerGroupState -> {
    String groupId = topicConsumerGroupState.getGroupId();
    List<PartitionAssignmentState> partitionAssignmentStates = topicConsumerGroupState
    .getPartitionAssignmentStates();
    Map<TopicPartition, Long> offsetsMap = this.consumerOffset(clusterID, groupId, topic);
    for (Entry<TopicPartition, Long> entry : offsetsMap.entrySet()) {
    long logEndOffset = entry.getValue();
    for (PartitionAssignmentState partitionAssignmentState : partitionAssignmentStates) {
    if (partitionAssignmentState.getPartition() == entry.getKey().partition()) {
    partitionAssignmentState.setLogEndOffset(logEndOffset);
    partitionAssignmentState.setLag(getLag(partitionAssignmentState.getOffset(), logEndOffset));
    }
    }
    }
    });
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    return topicConsumerGroupStates;
    }

zookeeper消费方式 offset 获取

实现思路

  1. 根据topic 获取消费该topic的group
  2. 读取zookeeper上指定group和topic的消费情况,可以获取到clientId,CURRENT-OFFSET,patition。
  3. 通过consumer获取LogEndOffset(可见offset)
  4. 将2与3处信息合并,计算Lag

代码设计

引入两个依赖,主要是为了读取zookeeper节点上的数据

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>
  1. 步骤1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public Set<String> listTopicGroups(String topic) {
    Set<String> groups = new HashSet<>();
    List<String> allGroups = zkClient.getChildren("/consumers");
    allGroups.forEach(group -> {
    if (zkClient.exists("/consumers/" + group + "/offsets")) {
    Set<String> offsets = new HashSet<>(zkClient.getChildren("/consumers/" + group + "/offsets"));
    if (offsets.contains(topic)) {
    groups.add(group);
    }
    }
    });
    return groups;
    }
  2. 步骤2
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public Map<String, Map<String, String>> getZKConsumerOffsets(String groupId, String topic) {
    Map<String, Map<String, String>> result = new HashMap<>();
    String offsetsPath = "/consumers/" + groupId + "/offsets/" + topic;
    if (zkClient.exists(offsetsPath)) {
    List<String> offsets = zkClient.getChildren(offsetsPath);
    offsets.forEach(patition -> {
    try {
    String offset = zkClient.readData(offsetsPath + "/" + patition, true);
    if (offset != null) {
    Map<String, String> map = new HashMap<>();
    map.put("offset", offset);
    result.put(patition, map);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    });
    }
    String ownersPath = "/consumers/" + groupId + "/owners/" + topic;
    if (zkClient.exists(ownersPath)) {
    List<String> owners = zkClient.getChildren(ownersPath);
    owners.forEach(patition -> {
    try {
    try {
    String owner = zkClient.readData(ownersPath + "/" + patition, true);
    if (owner != null) {
    Map<String, String> map = result.get(patition);
    map.put("owner", owner);
    result.put(patition, map);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    });
    }

    return result;
    }
  3. 步骤3


4. 步骤4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private List<TopicConsumerGroupState> getZKConsumerOffsets(String topic) {
final List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
Set<String> zkGroups = this.listTopicGroups(topic);
zkGroups.forEach(group -> {
Map<String, Map<String, String>> zkConsumerOffsets = this.getZKConsumerOffsets(group,
topic);

Map<TopicPartition, Long> offsetsMap = this.consumerOffset(group, topic);

TopicConsumerGroupState topicConsumerGroupState = new TopicConsumerGroupState();
topicConsumerGroupState.setGroupId(group);
topicConsumerGroupState.setConsumerMethod("zk");

List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();

Map<String, Long> offsetsTempMap = new HashMap<>(offsetsMap.size());
offsetsMap.forEach((k, v) -> {
offsetsTempMap.put(k.partition() + "", v);
});

zkConsumerOffsets.forEach((patition, topicDesribe) -> {
Long logEndOffset = offsetsTempMap.get(patition);
String owner = topicDesribe.get("owner");
long offset = Long.parseLong(topicDesribe.get("offset"));
PartitionAssignmentState partitionAssignmentState = new PartitionAssignmentState();
partitionAssignmentState.setClientId(owner);
partitionAssignmentState.setGroup(group);
partitionAssignmentState.setLogEndOffset(logEndOffset);
partitionAssignmentState.setTopic(topic);
partitionAssignmentState.setOffset(offset);
partitionAssignmentState.setPartition(Integer.parseInt(patition));
partitionAssignmentState.setLag(getLag(offset, logEndOffset));

partitionAssignmentStates.add(partitionAssignmentState);
});
topicConsumerGroupState.setPartitionAssignmentStates(partitionAssignmentStates);
topicConsumerGroupStates.add(topicConsumerGroupState);
});
return topicConsumerGroupStates;
}

参考

  1. 如何获取Kafka的消费者详情——从Scala到Java的切换
  2. Kafka的Lag计算误区及正确实现

前言

前端页面权限在日常开发中,主要有以下:

  • 登录授权,用户没有登录只能访问登录页面,如果处于登录状态则跳转到当前用户的默认首页;

  • 路由授权,当前登录用户的角色,如果对一个 URL 没有权限访问,则跳转到 403 页面;

  • 数据授权,当访问一个没有权限的 API,则跳转到 403 页面;

  • 操作授权,当页面中某个按钮或者区域没有权限访问则在页面中隐藏

本文主要针对ICE飞冰模板,介绍下如何开发权限管理,主要是使用,不涉及任何架构设计及原理,关于原理可以参考源码分析,或者参考本文参考链接。

实践

首先先安装依赖,因为飞冰权限是使用Authorized 权限组件实现了基本的权限管理方案。

1
2
npm install antd --save
npm install ant-design-pro@latest --save

因为ant-design-pro依赖antd,因此需要首先安装antd

在开始之前需要新增两个util,一个设置登录权限,一个为了重新渲染权限

authority.js

1
2
3
4
5
6
7
8
9
10
11
12
13
// use localStorage to store the authority info, which might be sent from server in actual project.
export function getAuthority() {
return localStorage.getItem('ice-pro-authority') || 'admin';
}

export function setAuthority(authority) {
return localStorage.setItem('ice-pro-authority', authority);
}

export function removeAuthority() {
return localStorage.removeItem('ice-pro-authority');
}

Authorized.js

1
2
3
4
5
6
7
8
9
10
11
12
13
import RenderAuthorized from 'ant-design-pro/lib/Authorized';
import { getAuthority } from './authority';

let Authorized = RenderAuthorized(getAuthority()); // eslint-disable-line

// 更新权限
const reloadAuthorized = () => {
Authorized = RenderAuthorized(getAuthority());
};

export { reloadAuthorized };
export default Authorized;

阅读全文 »
0%