Kafka 消息生产及消费原理

开篇

关于客户端生产和消费不在本文中探讨,本文主要集中在Kafka服务器端对消息如何存储和如何读取消息。

本文主要探讨如下问题:

  1. 服务器端接收到消息后如何处理?
  2. 如果我指定了一个offset,Kafka怎么查找到对应的消息?
  3. 如果我指定了一个timestamp,Kafka怎么查找到对应的消息?

正文

服务器端接收到消息处理流程

Kafka Server接受消息处理流程

KafkaApis是Kafka server处理所有请求的入口,在 Kafka 中,每个副本(replica)都会跟日志实例(Log 对象)一一对应,一个副本会对应一个 Log 对象。副本由ReplicaManager管理,对于消息的写入操作在Log与LogSement中进行的。

Log append处理流程

真正的日志写入,还是在 LogSegment 的 append() 方法中完成的,LogSegment 会跟 Kafka 最底层的文件通道、mmap 打交道。数据并不是实时持久化的,mmap只是写入了页缓存,并没有flush进磁盘,当满足if (unflushedMessages >= config.flushInterval) 才会真正写入磁盘。

Consumer 获取消息

指定offset,Kafka怎么查找到对应的消息

  1. 通过文件名前缀数字x找到该绝对offset 对应消息所在文件(log/index)
  2. offset-x为在文件中的相对偏移
  3. 通过相对偏移在index文件找到最近的消息的位置(使用二分查找)
  4. 在log文件从最近位置开始逐条寻找

首先根据offset获取LogSegment,即var segmentEntry = segments.floorEntry(startOffset)segmentEntry 是个抽象的对象,包含log、index,timeindex等对象。

接下来在index中获取position(物理位置),即val startOffsetAndSize = translateOffset(startOffset)

源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
//二分查找index文件得到下标,再根据算法计算最近的position物理位置
val mapping = offsetIndex.lookup(offset)
//根据计算的起始位置开始遍历获取准确的position及size
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
}
return null;
}

这里说明一下index中根据下标计算偏移量地址与物理地址:物理地址=n*8+4,偏移量地址=n*8

1
2
3
4
5
6
7
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)

private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)

override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

最后根据position在log中截取相应的message,log.slice(startPosition, fetchSize)

1
2
3
4
5
6
7
8
public FileRecords slice(int position, int size) throws IOException {
//省略校验代码
int end = this.start + position + size;
// handle integer overflow or if end is beyond the end of the file
if (end < 0 || end >= start + sizeInBytes())
end = start + sizeInBytes();
return new FileRecords(file, channel, this.start + position, end, true);
}

看到searchForOffsetWithSize有个疑问,上面代码显示返回给客户端的records是批量的,假如提交的offset是这批次的中间一个,那么返回给Consumer的message是有已经被消费过的信息,我感觉不可能是这样的,查看了server端代码,再未发现删除已消费的message逻辑。

Kafka设计者真的这么蠢??

随后我查看了客户端consumer源码有发现到如下代码:if (record.offset() >= nextFetchOffset)有对大于指定offset消息抛弃的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Record nextFetchedRecord() {
while (true) {
if (records == null || !records.hasNext()) {
//略
} else {
Record record = records.next();
// skip any records out of range
if (record.offset() >= nextFetchOffset) {
// we only do validation when the message should not be skipped.
maybeEnsureValid(record);

// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
return record;
} else {
// Increment the next fetch offset when we skip a control batch.
nextFetchOffset = record.offset() + 1;
}
}
}
}
}

至此得出结论:为了提高server端的响应速度,没有对批量消息进行解压缩,然后精准返回指定信息,而是在客户端解压消息,然后再抛弃已处理过的message,这样就不会存在重复消费的问题。这个问题纠结了半天,不知是否正确,仅是自己的理解,如果有哪位同学对这里有研究,欢迎指出问题。

指定timestamp,Kafka怎么查找到对应的消息

类似根据offset获取消息,不过中间是从timeindex中获取position,然后遍历对比timestamp,获取相应的消息。

参考

1.Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)

2.Kafka 源码解析之 Server 端如何处理 Fetch 请求(十三)

Kafka 逻辑架构设计

开篇

本文主要探讨如下问题;

  1. Kafka架构设计
  2. Kafka的日志目录结构

正文

Kafka架构设计

kafka为分布式消息系统,由多个broker组成。消息是通过topic来分类的,一个topic下存在多个partition,每个partition又由多个segment构成。

发布订阅者模式

kafka集群架构

主题逻辑结构

Kafka的日志目录结构

开篇

在开始这篇之前,先抛出问题,这章主要通过研究consumer源码解决如下问题:

消费再均衡的原理是什么?

正文

消费再均衡的原理

主要分为四步

1.FIND_COORDINATOR

根据hash(group_id)%consumerOffsetPartitionNum查找出对应的partition,再查找出该partitiom对应的leader所在的broker,即可获得GroupCoordinator

2.JOIN_GROUP

在这一步主要完成消费组leader选举(获取第一个加入的组为leader,如果没有,选择map中的第一个node)和分区分配策略

3.SYNC_GROUP

客户端向GroupCoordinator发起同步请求,获取步骤2的分区分配方案。

4.HEARTBEAT

GroupCoordinator通过心跳来确定从属关系。

参考

kafka consumer 源码分析(二)分区分配策略

开篇

在开始这篇之前,先抛出问题,这章主要通过研究consumer源码解决如下问题:

分区分配策略是什么样的?

正文

分区分配策略

这里的分区分配策略仅仅只讨论consumer,关于producer端的分区不在本次探讨范围内。

consumer端分区分配策略官方有三种:RangeAssignor(默认值),RoundRobinAssignor,StickyAssignor

通常是通过partition.assignment.strategy配置生效。

RangeAssignor

1
2
3
4
5
6
7
8
9
10
11
12
org.apache.kafka.clients.consumer.RangeAssignor#assign

Collections.sort(consumersForTopic);//consumer按字典排序
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic,numPartitionsForTopic);
int i = 0;
for(int n = consumersForTopic.size(); i < n; ++i) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
((List)assignment.get(consumersForTopic.get(i))).addAll(partitions.subList(start, start + length));
}

源码太多,这里只贴出核心代码,需要的按照以上目录去找。

例如

有一个topic:truman partition:10 consumer:consumer0,consumer1,consumer2。那么numPartitionsPerConsumer=10/3=3,consumersWithExtraPartition=10%3=1。分配逻辑为:当i=0,按字典顺序,先分配consumer0,start=0,length=4,即consumer1分配tp0,tp1,tp2,tp3。同理最终方案为:

1
2
3
consumer0->tp0,tp1,tp2,tp3
consumer1->tp4,tp5,tp6
consumer2->tp7,tp8,tp9

从源码我们能得出如何分配的,还能得出一个结论,consumer分配partition和consumerId有关系,字典顺序靠前的,有可能会多分配partition。

为什么这么说?上面的源码其实没有给出,但能看出来consumersForTopic集合是按字典排序的,那么只用知道consumersForTopic是什么样的集合就能知道了,同样在该源码中可以看到put(res, topic, consumerId);,说明该集合是consumerId集合。因此可以得出该结论!

一句话总结该分配策略,就是尽可能的均分

RoundRobinAssignor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
org.apache.kafka.clients.consumer.RoundRobinAssignor
//assigner存放了所有的consumer
CircularIterator<String> assigner = new CircularIterator(Utils.sorted(subscriptions.keySet()));
//所有的consumer订阅的所有的TopicPartition的List
Iterator var9 = this.allPartitionsSorted(partitionsPerTopic, subscriptions).iterator();

while(var9.hasNext()) {
TopicPartition partition = (TopicPartition)var9.next();
String topic = partition.topic();
// 如果当前这个assigner(consumer)没有订阅这个topic,直接跳过
while(!((Subscription)subscriptions.get(assigner.peek())).topics().contains(topic)) {
assigner.next();
}
//跳出循环,表示终于找到了订阅过这个TopicPartition对应的topic的assigner
//将这个partition分派给对应的assigner
((List)assignment.get(assigner.next())).add(partition);
}

CircularIterator是一个封装类,让一个有限大小的list变成一个RoundRobin方式的无限遍历

RoundRobinAssignor与RangeAssignor最大的区别,是进行分区分配的时候不再逐个topic进行,即不是为某个topic完成了分区分派以后,再进行下一个topic的分区分派,而是首先将这个group中的所有consumer订阅的所有的topic-partition按顺序展开,然后,依次对于每一个topic-partition,在consumer进行round robin,为这个topic-partition选择一个consumer。

例如

有两个topic :t0和t1,每个topic都有3个分区,分区编号从0开始,因此展开以后得到的TopicPartition的List是:[t0p0,t0p1,t0p2,t1p0,t1p1,t1p2]

我们有两个consumer C0,C1,他们都同时订阅了这两个topic。

然后,在这个展开的TopicParititon的List开始进行分派:

t0p0 分配给C0
t0p1 分配给C1
t0p2 分配给C0
t1p0 分配给C1
t1p1 分配给C0
t1p1 分配给C1
从以上分配流程,我们可以很清楚地看到RoundRobinAssignor的两个基本特征:

  1. 对所有topic的topic partition求并集
  2. 基于consumer进行RoundRobin轮询

最终分配方案:

1
2
C0->[t0p0, t0p2, t1p1]
C1->[t0p1, t1p0, t1p2]

RoundRobinAssignor与RangeAssignor的重大区别,就是RoundRobinAssignor是在Group中所有consumer所关注的全体topic中进行分派,而RangeAssignor则是依次对每个topic进行分派。

那么什么时候选择RoundRobinAssignor,我们再看如下案例:

假如现在有两个Topic:Ta和Tb ,每个topic都有两个partition,同时,有两个消费者Ca和Cb与Cc,这三个消费者全部订阅了这两个主题,那么,通过两种不同的分区分派算法得到的结果将是:

RangeAssignor:

先对Ta进行处理:会将TaP0分派给Ca,将TaP1分派给Cb

在对Tb进行处理:会将TbP0分派给Ca,将TbP1分派给Cb

最终结果是:

1
2
3
Ca:[TaP0,TbP0]
Cb:[TaP1,TbP1]
Cc:[]

RoundRobinAssignor:

将所有TopicPartition展开,变成:[TaP0,TaP1,TbP0,TbP1]

将TaP0分派给Ca,将TaP1分派给Cb,将TbP0分派给Cc,将TbP1分派给Ca

最终分派结果是:

1
2
3
Ca:[TaP0,TbP1]
Cb:[TaP1]
Cc:[TbP0]

StickyAssignor

这个分配器源码较多,实现也要优于RoundRobinAssignor与RangeAssignor

它的主要特点是分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,均衡优先于上一次分配结果。鉴于这两个目标。

参考

  1. Kafka为Consumer分派分区:RangeAssignor和RoundRobinAssignor
  2. Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor

kafka consumer 源码分析(一)Consumer处理流程

开篇

在开始这篇之前,先抛出问题,这章主要通过研究consumer源码解决如下问题:

  1. consumer处理流程
  2. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

正文

Consumer处理流程

核心组件

ConsumerCoordinator: 消费者的协调者, 管理消费者的协调过程

  • 维持coordinator节点信息(也就是对consumer进行assignment的节点)
  • 维持当前consumerGroup的信息, 当前consumer已进入consumerGroup

Fetcher: 数据请求类

ConsumerNetworkClient: 消费者的网络客户端,负责网络传输的流程

SubscriptionState: 订阅状态类

Metadata: 集群的元数据管理类,使用租约机制

工作流程

消费者提交消费位移源码追究

查看官方API文档,在描述如何手动提交offset,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//此处为offset+1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}

The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed

意思是在调用commitSync(offsets)必须是当前offset+1。


其实我很想知道,自动提交offset方式(包含不指定offset,例如:commitSync()),具体源代码在哪里**+1**。

经过孜孜不倦的翻阅代码,感觉好像看懂了一点点,就把自己看懂的那点写出来,如果不对的话,欢迎看到的同学帮忙纠正一下,当不胜感激!

这里就只探究自动提交offset的情形,因为涉及的代码较多,这里只给出相应的关键代码。

1
2
3
4
5
6
7
8
9
10
public void commitSync(Duration timeout) {
try {
//subscriptions.allConsumed()获取需要提交的offset
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
...略
}
} finally {
release();
}
}
1
2
3
4
5
6
7
8
9
//可以看出allConsumed是从state.value().position中获取相应partition的offset
public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
if (state.value().hasValidPosition())
allConsumed.put(state.topicPartition(), new OffsetAndMetadata(state.value().position));
}
return allConsumed;
}

下来只用查看到什么时候更新state中信息,即可知道提交的offset是如何计算的,首先我们要知道一个知识点,consumer非多线程处理逻辑,因此每次提交offset都是在poll中处理的,因此我们需要查看poll中的逻辑,接着往下看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
|
V
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
|
V
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
|
V
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
//这里就是所谓的offset+1,也就是开头问题的答案!
subscriptions.position(partitionRecords.partition, nextOffset);

这里其实已经看到答案,但是可能有同学还会问,不是说更新state吗?这里更新的是 subscriptions.position,接着往下看

1
2
3
4
5
6
7
8
9
10
public void position(TopicPartition tp, long offset) {
assignedState(tp).position(offset);
}

private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
}

终于一切真相大白!看了两天源码,累的要死!Enjoy!

参考

  1. The committed offset should always be the offset of the next message that your application will read
  2. KafkaConsumer 流程解析

前言

零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。

零拷贝操作减少了在用户空间与内核空间之间切换模式的次数

举例来说,如果要读取一个文件并通过网络发送它,传统方式下如下图,传统的I/O操作进行了4次用户空间与内核空间的上下文切换,以及4次数据拷贝。其中4次数据拷贝中包括了2次DMA拷贝和2次CPU拷贝。通过零拷贝技术完成相同的操作,减少了在用户空间与内核空间交互,并且不需要CPU复制数据。

linux中零拷贝技术

Linux系统的“用户空间”和“内核空间”

从Linux系统上看,除了引导系统的BIN区,整个内存空间主要被分成两个部分:内核空间(Kernel space)、用户空间(User space)。

“用户空间”和“内核空间”的空间、操作权限以及作用都是不一样的。

内核空间是Linux自身使用的内存空间,主要提供给程序调度、内存分配、连接硬件资源等程序逻辑使用;用户空间则是提供给各个进程的主要空间。

用户空间不具有访问内核空间资源的权限,因此如果应用程序需要使用到内核空间的资源,则需要通过系统调用来完成:从用户空间切换到内核空间,然后在完成相关操作后再从内核空间切换回用户空间。

Linux 中零拷贝技术的实现方向

  • 直接 I/O:对于这种数据传输方式来说,应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输。这种方式依旧存在用户空间和内核空间的上下文切换,但是硬件上的数据不会拷贝一份到内核空间,而是直接拷贝至了用户空间,因此直接I/O不存在内核空间缓冲区和用户空间缓冲区之间的数据拷贝。

  • 在数据传输过程中,避免数据在用户空间缓冲区和系统内核空间缓冲区之间的CPU拷贝,以及数据在系统内核空间内的CPU拷贝。本文主要讨论的就是该方式下的零拷贝机制。

  • copy-on-write(写时复制技术):在某些情况下,Linux操作系统的内核空间缓冲区可能被多个应用程序所共享,操作系统有可能会将用户空间缓冲区地址映射到内核空间缓存区中。当应用程序需要对共享的数据进行修改的时候,才需要真正地拷贝数据到应用程序的用户空间缓冲区中,并且对自己用户空间的缓冲区的数据进行修改不会影响到其他共享数据的应用程序。所以,如果应用程序不需要对数据进行任何修改的话,就不会存在数据从系统内核空间缓冲区拷贝到用户空间缓冲区的操作。

mmap

1
2
buf = mmap(diskfd, len);
write(sockfd, buf, len);

应用程序调用mmap(),磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去。

使用mmap替代read很明显减少了一次拷贝,当拷贝数据量很大时,无疑提升了效率。但是使用mmap是有代价的。当你使用mmap时,你可能会遇到一些隐藏的陷阱。例如,当你的程序map了一个文件,但是当这个文件被另一个进程截断(truncate)时, write系统调用会因为访问非法地址而被SIGBUS信号终止。SIGBUS信号默认会杀死你的进程并产生一个coredump,如果你的服务器这样被中止了,那会产生一笔损失。

当然也有办法解决,本文忽略解决办法。

sendfile

1
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

sendfile实现的零拷贝I/O只使用了2次用户空间与内核空间的上下文切换,以及3次数据的拷贝。其中3次数据拷贝中包括了2次DMA拷贝和1次CPU拷贝。

splice

sendfile只适用于将数据从文件拷贝到套接字上,限定了它的使用范围。Linux在2.6.17版本引入splice系统调用,用于在两个文件描述符中移动数据:

1
2
3
#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <fcntl.h>
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);

java中如何使用零拷贝

DMA简介

io读写的方式:中断,DMA

DMA 直接内存存取,是允许外设组件将I/O数据直接传送到主存储器中并且传输不需要CPU的参与,以此将CPU解放出来去完成其他的事情。,相较于中断方式,减少cpu中断次数,不用cpu拷贝数据。

主要流程如下:

  1. 用户进程发起数据读取请求
  2. 系统调度为该进程分配cpu
  3. cpu向DMA发送io请求
  4. 用户进程等待io完成,让出cpu
  5. 系统调度cpu执行其他任务
  6. 数据写入至io控制器的缓冲寄存器
  7. DMA不断获取缓冲寄存器中的数据(需要cpu时钟)
  8. 传输至内存(需要cpu时钟)
  9. 所需的全部数据获取完毕后向cpu发出中断信号

更多参考

参考

  1. 浅析Linux中的零拷贝技术
  2. 零复制
  3. 零拷贝原理-数据的收发-软中断和DMA
  4. 浅谈 Linux下的零拷贝机制

Kafka producer源码分析

前言

在开始文章之前,需要解释是一下为什么要研究producer源码。

为什么要研究producer源码

通常producer使用都很简单,初始化一个KafkaProducer实例,然后调用send方法就好,但是我们有了解后面是如何发送到kafka集群的吗?其实我们不知道,其次,到底客户端有几个线程?我们不知道。还有producer还能做什么?我们同样不知道。本篇文章就是想回答一下上面提出的几个问题,能力有限,如有错误,欢迎指出!

架构

在介绍客户端架构之前,先回答一个问题

producer到底存在几个线程?2个 Main threadsender,其中sender线程负责发送消息,main 线程负责interceptor、序列化、分区等其他操作。

  • Producer首先使用用户主线程将待发送的消息封装进一个ProducerRecord类实例中。
  • 进行interceptor、序列化、分区等其他操作,发送到Producer程序中RecordAccumulator中。
  • Producer的另一个工作线程(即Sender线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次的内,统一发送给对应的broker中。

消息发送源码分析

sender发送的时机是由两个指标决定的,一个是时间linger.ms,一个是数据量大小 batch.size

sender线程主要代码

run->run(long now)->sendProducerData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
//result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);

log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}

Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// 暂且只关心result.nextReadyCheckDelayMs
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);

pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}

1
2
3
long pollTimeout = sendProducerData(now);
// poll最终会调用selector,pollTimeout也就是selector阻塞的时间
client.poll(pollTimeout, now);

selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Check for data, waiting up to the given timeout.
*
* @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
* @return The number of keys ready
*/
private int select(long timeoutMs) throws IOException {
if (timeoutMs < 0L)
throw new IllegalArgumentException("timeout should be >= 0");

if (timeoutMs == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(timeoutMs);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
// 初始化为最大值
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();

Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// 和linger.ms有关
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

我们可以从实例化一个新的KafkaProducer开始分析(还没有调用send方法),在sender线程调用accumulator#ready(..)时候,会返回result,其中包含selector可能要阻塞的时间。由于还没有调用send方法,所以Deque为空,所以result中包含的nextReadyCheckDelayMs也是最大值,这个时候selector会一直阻塞。

然后我们调用send方法,该方法会调用waitOnMetadata,waitOnMetadata会调用sender.wakeup(),所以会唤醒sender线程。

这个时候会唤醒阻塞在selector#select(..)的sender线程,sender线程又运行到accumulator#ready(..),当Deque有值,所以返回的result包含的nextReadyCheckDelayMs不再是最大值,而是和linger.ms有关的值。也就是时候selector会最多阻塞lingger.ms后就返回,然后再次轮询。(根据源码分析,第一条消息会创建batch,因此newBatchCreated为true,同样会触发唤醒sender)

如果有一个ProducerBatch满了,也会调用Sender#wakeup(..),

KafkaProducer#doSend(…)

1
2
3
4
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}

所以综上所述:只要满足linger.ms和batch.size满了就会激活sender线程来发送消息。

客户端顺序性保证

因为消息发送是批量发送的,那么就有可能存在上一批发送失败,接下来一批发送成功。即会出现数据乱序。

max.in.flight.requests.per.connection=1,限制客户端在单个连接上能够发送的未响应请求的个数。如果业务对顺序性有很高的要求,将该参数值设置为1。

总结

kafka producer大概流程如上,数据发送是批量的,最后是按Node发送响应的消息的。即其中Integer即为broker id。消息在RecordAccumulator中还是按TopicPartition存放的,但是在最终发送会作相应的转换。

sender发送的时机是由两个指标决定的,一个是时间linger.ms,一个是数据量大小 batch.size

了解producer发送流程,这样就可以让我们更改的生产消息,使用更多的特性,例如拦截器可以做各种特殊处理的场景。

参考

1. kafka producer的batch.size和linger.ms

前言

为了深入研究kafka运行原理和架构,很有必要搭建开发环境,这样我们就很方便的debug 服务。

前期准备

  1. Gradle 5.0+
  2. jdk8
  3. Scala 2.12
  4. idea scale plugin

配置/运行

  • 首先执行在源码目录下执行gradle
  • 然后build ./gradlew jar
  • 最后生成idea工程./gradlew idea

运行MirrorMaker

运行kafka server

因为kafka依赖zookeeper,在开始之前请先启动一个zookeeper服务,本文章略。

错误总结

无log

下载log4j
slf4j-log4j12

然后在modules中找到core/main,将这两个依赖导入进来即可

最后将config目录下的log4j.properties复制到core/main/scala目录下(如果还是无法看到log,可以将该文件复制到out/production/classes目录下)

执行报错,无法找到相关类

执行./gradlew jar即可解决。

Core源码模块解读

模块名程说明
adminkafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。
api主要负责数据交互,客户端与服务端交互数据的编码与解码。
cluster这里包含多个实体类,有Broker,Cluster,Partition,Replica。其中一个Cluster由多个Broker组成,一个Broker包含多个Partition,一个Topic的所有Partition分布在不同的Broker中,一个Replica包含都个Partition。
common这是一个通用模块,其只包含各种异常类以及错误验证。
consumer消费者处理模块,负责所有的客户端消费者数据和逻辑处理。
controller此模块负责中央控制器的选举,分区的Leader选举,Replica的分配或其重新分配,分区和副本的扩容等。
coordinator负责管理部分consumer group和他们的offset。
log这是一个负责Kafka文件存储模块,负责读写所有的Kafka的Topic消息数据。
message封装多条数据组成一个数据集或者压缩数据集。
metrics负责内部状态的监控模块。
network该模块负责处理和接收客户端连接,处理网络时间模块。
security负责Kafka的安全验证和管理模块。
serializer序列化和反序列化当前消息内容。
server 该模块涉及的内容较多,有Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader的选举,Admin和Replica的管理,以及各种元数据的缓存等内容。
tools阅读该模块,就是一个工具模块,涉及的内容也比较多。有导出对应consumer的offset值;导出LogSegments信息,以及当前Topic的log写的Location信息;导出Zookeeper上的offset值等内容。
utils各种工具类,比如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类,Mx4jLoader监控加载器,ReplicationUtils复制集工具类,CommandLineUtils命令行工具类,以及公共日志类等内容。

参考

  1. Apache Kafka
  2. 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置

前言

写这篇文章的目的是为了记录一下学习笔记,其次为了能够在复习的时候快速掌握相关知识。本篇记录java8系列专栏之Lambda与Stream API

正文

Lambda

什么是Lambda

1
2
List<String>list = Arrays.asList("a","c","b");
Collections.sort(list, (o1,o2)->o1.compareTo(o2));

以下方式都是常用的lambda使用方式

1
2
3
4
str->str.toLowerCase()
(o1,o2)->o1.compareTo(o2)
(o1,o2)->{return o1.compareTo(o2)}
(String o1,String o2)->{return o1.compareTo(o2)}

怎么用,哪里用

函数接口声明既可使用。例如Runnable,Comparator都是函数接口。用@FunctionalInterface声明的都是函数接口

实现原理

首先需要明确说明的是lambda没有使用匿名内部类去实现。

Java 8设计人员决定使用在Java 7中添加的invokedynamic指令来推迟在运行时的翻译策略。当javac编译代码时,它捕获lambda表达式并生成invokedynamic调用站点(称为lambda工厂)。调用invokedynamic调用站点时,返回一个函数接口实例,lambda将被转换到这个函数接口

使用invokedynamic指令,运行时调用LambdaMetafactory.metafactory动态的生成内部类,实现了接口,内部类里的调用方法块并不是动态生成的,只是在原class里已经编译生成了一个静态的方法,内部类只需要调用该静态方法

参考1
参考2

内置函数接口

函数式接口 函数描述符 原始类型特化
Predicate<T> T->boolean IntPredicate,LongPredicate, DoublePredicate
Consumer<T> T->void IntConsumer,LongConsumer, DoubleConsumer
Function<T,R> T->R IntFunction<R>, IntToDoubleFunction,
IntToLongFunction, LongFunction<R>,
LongToDoubleFunction, LongToIntFunction,
DoubleFunction<R>, ToIntFunction<T>,
ToDoubleFunction<T>, ToLongFunction<T>
Supplier<T> ()->T BooleanSupplier,IntSupplier, LongSupplier, DoubleSupplier
UnaryOperator<T> T->T IntUnaryOperator, LongUnaryOperator, DoubleUnaryOperator
BinaryOperator<T> (T,T)->T IntBinaryOperator, LongBinaryOperator, DoubleBinaryOperator
BiPredicate<L,R> (L,R)->boolean
BiConsumer<T,U> (T,U)->void ObjIntConsumer<T>, ObjLongConsumer<T>, ObjDoubleConsumer<T>
BiFunction<T,U,R> (T,U)->R ToIntBiFunction<T,U>, ToLongBiFunction<T,U>, ToDoubleBiFunction<T,U>

Stream API

操作 类型 返回类型 使用的类型/函数式接口 函数描述符
filter 中间 Stream<T> Predicate<T> T -> boolean
distinct 中间 Stream<T>
skip 中间 Stream<T> long
map 中间 Stream<R> Function<T, R> T -> R
flatMap 中间 Stream<R> Function<T, Stream<R>> T -> Stream<R>
limit 中间 Stream<T> long
sorted 中间 Stream<T> Comparator<T> (T, T) -> int
anyMatch 终端 boolean Predicate<T> T -> boolean
noneMatch 终端 boolean Predicate<T> T -> boolean
allMatch 终端 boolean Predicate<T> T -> boolean
findAny 终端 Optional<T>
findFirst 终端 Optional<T>
forEach 终端 void Consumer<T> T -> void
collect 终端 R Collector<T, A, R>
reduce 终端 Optional<T> BinaryOperator<T> (T, T) -> T
count 终端 long

Collector 收集

Collectors 类的静态工厂方法

工厂方法 返回类型 用途 示例
toList List<T> 把流中所有项目收集到一个 List List<Project> projects = projectStream.collect(toList());
toSet Set<T> 把流中所有项目收集到一个 Set,删除重复项 Set<Project> projects = projectStream.collect(toSet());
toCollection Collection<T> 把流中所有项目收集到给定的供应源创建的集合 Collection<Project> projects = projectStream.collect(toCollection(), ArrayList::new);
counting Long 计算流中元素的个数 long howManyProjects = projectStream.collect(counting());
summingInt Integer 对流中项目的一个整数属性求和 int totalStars = projectStream.collect(summingInt(Project::getStars));
averagingInt Double 计算流中项目 Integer 属性的平均值 double avgStars = projectStream.collect(averagingInt(Project::getStars));
summarizingInt IntSummaryStatistics 收集关于流中项目 Integer 属性的统计值,例如最大、最小、 总和与平均值 IntSummaryStatistics projectStatistics = projectStream.collect(summarizingInt(Project::getStars));
joining String 连接对流中每个项目调用 toString 方法所生成的字符串 String shortProject = projectStream.map(Project::getName).collect(joining(", "));
maxBy Optional<T> 按照给定比较器选出的最大元素的 Optional, 或如果流为空则为 Optional.empty() Optional<Project> fattest = projectStream.collect(maxBy(comparingInt(Project::getStars)));
minBy Optional<T> 按照给定比较器选出的最小元素的 Optional, 或如果流为空则为 Optional.empty() Optional<Project> fattest = projectStream.collect(minBy(comparingInt(Project::getStars)));
reducing 归约操作产生的类型 从一个作为累加器的初始值开始,利用 BinaryOperator 与流中的元素逐个结合,从而将流归约为单个值 int totalStars = projectStream.collect(reducing(0, Project::getStars, Integer::sum));
collectingAndThen 转换函数返回的类型 包含另一个收集器,对其结果应用转换函数 int howManyProjects = projectStream.collect(collectingAndThen(toList(), List::size));
groupingBy Map<K, List<T>> 根据项目的一个属性的值对流中的项目作问组,并将属性值作 为结果 Map 的键 Map<String,List<Project>> projectByLanguage = projectStream.collect(groupingBy(Project::getLanguage));
partitioningBy Map<Boolean,List<T>> 根据对流中每个项目应用断言的结果来对项目进行分区 Map<Boolean,List<Project>> vegetarianDishes = projectStream.collect(partitioningBy(Project::isVegetarian));

参考

  1. Lambdas
  2. Lambda expressions
  3. Java 8 动态类型语言Lambda表达式实现原理分析
  4. Stream

Optional类的方法

方法 描述
empty 返回一个空的 Optional 实例
filter 如果值存在并且满足提供的断言, 就返回包含该值的 Optional 对象;否则返回一个空的 Optional 对象
map 如果值存在,就对该值执行提供的 mapping 函数调用
flatMap 如果值存在,就对该值执行提供的 mapping 函数调用,返回一个 Optional 类型的值,否则就返 回一个空的 Optional 对象
get 如果该值存在,将该值用 Optional 封装返回,否则抛出一个 NoSuchElementException 异常
ifPresent 如果值存在,就执行使用该值的方法调用,否则什么也不做
isPresent 如果值存在就返回 true,否则返回 false
of 将指定值用 Optional 封装之后返回,如果该值为 null,则抛出一个 NullPointerException 异常
ofNullable 将指定值用 Optional 封装之后返回,如果该值为 null,则返回一个空的 Optional 对象
orElse 如果有值则将其返回,否则返回一个默认值
orElseGet 如果有值则将其返回,否则返回一个由指定的 Supplier 接口生成的值
orElseThrow 如果有值则将其返回,否则抛出一个由指定的 Supplier 接口生成的异常
0%