Window下Docker Desktop搭建 Kubernetes

前言

本节主要讲解如何启用Kubernetes,以及如何搭建Kubernetes Dashboard。如果排除掉网络原因,本文没有任何意思,因为众所周知的原因,谷歌资源被墙,所以才存在搭建问题,这也就是写本文的原因。

因为不了解Kubernetes能做什么,所以才想着先搭建一个环境,玩一玩,看看这个到底能做什么。

准备

Docker Desktop 版本:2.1.0.1

支持Kubernetes版本:v1.14.3

查看这个版本很重要,具体查看About Docker Desktop菜单即可知道支持哪个版本的k8s。

首先安装Docker Desktop

安装Docker Desktop步骤略….

安装好Docker Desktop先别启用k8s。

其次拉取镜像

先把需要的镜像拉取下来,可以写个docker-k8s-images.bat,放入以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-proxy:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-proxy:v1.14.3 k8s.gcr.io/kube-proxy:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-proxy:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-scheduler:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-scheduler:v1.14.3 k8s.gcr.io/kube-scheduler:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-scheduler:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-controller-manager:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-controller-manager:v1.14.3 k8s.gcr.io/kube-controller-manager:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-controller-manager:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-apiserver:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-apiserver:v1.14.3 k8s.gcr.io/kube-apiserver:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-apiserver:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1 k8s.gcr.io/pause:3.1
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/etcd:3.3.10
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/etcd:3.3.10 k8s.gcr.io/etcd:3.3.10
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/etcd:3.3.10

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/coredns:1.3.1
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/coredns:1.3.1 k8s.gcr.io/coredns:1.3.1
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/coredns:1.3.1


docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kubernetes-dashboard-amd64:v1.10.1
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kubernetes-dashboard-amd64:v1.10.1 k8s.gcr.io/kubernetes-dashboard-amd64:v1.10.1
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kubernetes-dashboard-amd64:v1.10.1

其中kubernetes-dashboard-amd64为Kubernetes Dashboard,不是必须镜像以外,其他都是k8s必须的镜像。

最后启动Kubernetes

在Kubernetes菜单选项里,勾选所有的选项。然后执行kubectl get pods --namespace kube-system查看k8s相关容器是否启动。当启动必须的7个容器以后,再查看Docker Desktop左下角Kubernetes状态即为绿色。

1
2
3
4
5
6
7
8
9
C:\Users\lab>kubectl get pods --namespace kube-system
NAME READY STATUS RESTARTS AGE
coredns-fb8b8dccf-4w2ht 1/1 Running 1 17m
coredns-fb8b8dccf-b5vdv 1/1 Running 1 17m
etcd-docker-desktop 1/1 Running 0 16m
kube-apiserver-docker-desktop 1/1 Running 0 16m
kube-controller-manager-docker-desktop 1/1 Running 0 16m
kube-proxy-7w9lw 1/1 Running 0 17m
kube-scheduler-docker-desktop 1/1 Running 0 16m

搭建Kubernetes Dashboard

步骤1

部署Dashboard ,执行以下命令:

kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v1.10.1/src/deploy/recommended/kubernetes-dashboard.yaml

注意不同Dashboard选择不同的版本配置文件,这里的地址可以在kubernetes/dashboard/releases获取不同版本文件。

步骤2 Creating sample user

新建dashboard-adminuser.yaml文件,填写如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apiVersion: v1
kind: ServiceAccount
metadata:
name: admin-user
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: admin-user
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: admin-user
namespace: kube-system

步骤3 Bearer Token

步骤2完成,执行kubectl proxy既可以访问Dashboard,但是需要登录。执行如下命令:

kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep admin-user | awk '{print $1}')

生成如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
Name:         admin-user-token-6gl6l
Namespace: kube-system
Labels: <none>
Annotations: kubernetes.io/service-account.name=admin-user
kubernetes.io/service-account.uid=b16afba9-dfec-11e7-bbb9-901b0e532516

Type: kubernetes.io/service-account-token

Data
====
ca.crt: 1025 bytes
namespace: 11 bytes
token: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLTZnbDZsIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiJiMTZhZmJhOS1kZmVjLTExZTctYmJiOS05MDFiMGU1MzI1MTYiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.M70CU3lbu3PP4OjhFms8PVL5pQKj-jj4RNSLA4YmQfTXpPUuxqXjiTf094_Rzr0fgN_IVX6gC4fiNUL5ynx9KU-lkPfk0HnX8scxfJNzypL039mpGt0bbe1IXKSIRaq_9VW59Xz-yBUhycYcKPO9RM2Qa1Ax29nqNVko4vLn1_1wPqJ6XSq3GYI8anTzV8Fku4jasUwjrws6Cn6_sPEGmL54sq5R4Z5afUtv-mItTmqZZdxnkRqcJLlg2Y8WbCPogErbsaCDJoABQ7ppaqHetwfM_0yMun6ABOQbIwwl8pspJhpplKwyo700OSpvTT9zlBsu-b35lzXGBRHzv5g_RA

现在访问:

1
http://localhost:8001/api/v1/namespaces/kube-system/services/https:kubernetes-dashboard:/proxy/

复制以上生成的token,填入token,即可显示如下页面:

至此k8s部署成功!Enjoy!

参考

  1. 如何成功启动 Docker 自带的 Kubernetes?
  2. kubernetes/dashboard
  3. Creating-sample-user

kafka幂等性和事务使用及实现原理

开篇

在开始这篇之前,先抛出问题,这章解决如下问题:

  1. 如何开启幂等性?
  2. 如何使用事务?
  3. 幂等性的原理
  4. 事务实现原理

正文

Producer 幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,但是这里的幂等性是有条件的:

  • 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。

使用方式:props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

当幂等性开启的时候acks即为all。如果显性的将acks设置为0,-1,那么将会报错Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

示例:

1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world.")).get();
kafkaProducer.close();

幂等性原理

幂等性是通过两个关键信息保证的,PID(Producer ID)和sequence numbers。

  • PID 用来标识每个producer client
  • sequence numbers 客户端发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复

producer初始化会由server端生成一个PID,然后发送每条信息都包含该PID和sequence number,在server端,是按照partition同样存放一个sequence numbers 信息,通过判断客户端发送过来的sequence number与server端number+1差值来决定数据是否重复或者漏掉。

通常情况下为了保证数据顺序性,我们可以通过max.in.flight.requests.per.connection=1来保证,这个也只是针对单实例。在kafka2.0+版本上,只要开启幂等性,不用设置这个参数也能保证发送数据的顺序性。

为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5

其实这里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 发送失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)。

Kafka 事务性

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id-0");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "key"+i, "hello world.")).get();
}
kafkaProducer.commitTransaction();
kafkaProducer.close();
//Consumer
Properties config = new Properties();
config.put("group.id", "test11");
config.put("bootstrap.servers", "127.0.0.1:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe(Arrays.asList(TOPIC));
boolean isConsumer = true;
while (isConsumer) {
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer
.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("consumer message: key =" + record.key() + " value:" + record.value());
}
}
consumer.close();
}

事务实现原理

(1)查找TransactionCoordinator

通过transaction_id 找到TransactionCoordinator,具体算法是Utils.abs(transaction_id.hashCode %transactionTopicPartitionCount ),获取到partition,再找到该partition的leader,即为TransactionCoordinator。

(2)获取PID

凡是开启幂等性都是需要生成PID(Producer ID),只不过未开启事务的PID可以在任意broker生成,而开启事务只能在TransactionCoordinator节点生成。这里只讲开启事务的情况,Producer Client的initTransactions()方法会向TransactionCoordinator发起InitPidRequest ,这样就能获取PID。这里面还有一些细节问题,这里不探讨,例如transaction_id 之前的事务状态什么的。但需要说明的一点是这里会将 transaction_id 与相应的 TransactionMetadata 持久化到事务日志(_transaction_state)中。

(3)开启事务

Producer调用beginTransaction开始一个事务状态,这里只是在客户端将本地事务状态转移成 IN_TRANSACTION,只有在发送第一条信息后,TransactionCoordinator才会认为该事务已经开启。

(4)Consume-Porcess-Produce Loop

这里说的是一个典型的consume-process-produce场景:

1
2
3
4
5
6
7
8
9
10
11
12
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
producer.beginTransaction();
//start
for (ConsumerRecord record : records){
producer.send(producerRecord(“outputTopic1”, record));
producer.send(producerRecord(“outputTopic2”, record));
}
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
//end
producer.commitTransaction();
}

该阶段主要经历以下几个步骤:

  1. AddPartitionsToTxnRequest
  2. ProduceRequest
  3. AddOffsetsToTxnRequest
  4. TxnOffsetsCommitRequest

关于这里的详细介绍可以查看参考链接,或者直接查看官网文档!

(5)提交或者中断事务

Producer 调用 commitTransaction() 或者 abortTransaction() 方法来 commit 或者 abort 这个事务操作。

基本上经历以下三个步骤,才真正结束事务。

  1. EndTxnRequest
  2. WriteTxnMarkerRquest
  3. Writing the Final Commit or Abort Message

其中EndTxnRequest是在Producer发起的请求,其他阶段都是在TransactionCoordinator端发起完成的。WriteTxnMarkerRquest是发送请求到partition的leader上写入事务结果信息(ControlBatch),第三步主要是在_transaction_state中标记事务的结束。

参考

1.Kafka 事务性之幂等性实现

2.Kafka Exactly-Once 之事务性实现

3KIP-98 - Exactly Once Delivery and Transactional Messaging

Kafka 消息生产及消费原理

开篇

关于客户端生产和消费不在本文中探讨,本文主要集中在Kafka服务器端对消息如何存储和如何读取消息。

本文主要探讨如下问题:

  1. 服务器端接收到消息后如何处理?
  2. 如果我指定了一个offset,Kafka怎么查找到对应的消息?
  3. 如果我指定了一个timestamp,Kafka怎么查找到对应的消息?

正文

服务器端接收到消息处理流程

Kafka Server接受消息处理流程

KafkaApis是Kafka server处理所有请求的入口,在 Kafka 中,每个副本(replica)都会跟日志实例(Log 对象)一一对应,一个副本会对应一个 Log 对象。副本由ReplicaManager管理,对于消息的写入操作在Log与LogSement中进行的。

Log append处理流程

真正的日志写入,还是在 LogSegment 的 append() 方法中完成的,LogSegment 会跟 Kafka 最底层的文件通道、mmap 打交道。数据并不是实时持久化的,mmap只是写入了页缓存,并没有flush进磁盘,当满足if (unflushedMessages >= config.flushInterval) 才会真正写入磁盘。

Consumer 获取消息

指定offset,Kafka怎么查找到对应的消息

  1. 通过文件名前缀数字x找到该绝对offset 对应消息所在文件(log/index)
  2. offset-x为在文件中的相对偏移
  3. 通过相对偏移在index文件找到最近的消息的位置(使用二分查找)
  4. 在log文件从最近位置开始逐条寻找

首先根据offset获取LogSegment,即var segmentEntry = segments.floorEntry(startOffset)segmentEntry 是个抽象的对象,包含log、index,timeindex等对象。

接下来在index中获取position(物理位置),即val startOffsetAndSize = translateOffset(startOffset)

源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
//二分查找index文件得到下标,再根据算法计算最近的position物理位置
val mapping = offsetIndex.lookup(offset)
//根据计算的起始位置开始遍历获取准确的position及size
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
}
return null;
}

这里说明一下index中根据下标计算偏移量地址与物理地址:物理地址=n*8+4,偏移量地址=n*8

1
2
3
4
5
6
7
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)

private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)

override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

最后根据position在log中截取相应的message,log.slice(startPosition, fetchSize)

1
2
3
4
5
6
7
8
public FileRecords slice(int position, int size) throws IOException {
//省略校验代码
int end = this.start + position + size;
// handle integer overflow or if end is beyond the end of the file
if (end < 0 || end >= start + sizeInBytes())
end = start + sizeInBytes();
return new FileRecords(file, channel, this.start + position, end, true);
}

看到searchForOffsetWithSize有个疑问,上面代码显示返回给客户端的records是批量的,假如提交的offset是这批次的中间一个,那么返回给Consumer的message是有已经被消费过的信息,我感觉不可能是这样的,查看了server端代码,再未发现删除已消费的message逻辑。

Kafka设计者真的这么蠢??

随后我查看了客户端consumer源码有发现到如下代码:if (record.offset() >= nextFetchOffset)有对大于指定offset消息抛弃的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Record nextFetchedRecord() {
while (true) {
if (records == null || !records.hasNext()) {
//略
} else {
Record record = records.next();
// skip any records out of range
if (record.offset() >= nextFetchOffset) {
// we only do validation when the message should not be skipped.
maybeEnsureValid(record);

// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
return record;
} else {
// Increment the next fetch offset when we skip a control batch.
nextFetchOffset = record.offset() + 1;
}
}
}
}
}

至此得出结论:为了提高server端的响应速度,没有对批量消息进行解压缩,然后精准返回指定信息,而是在客户端解压消息,然后再抛弃已处理过的message,这样就不会存在重复消费的问题。这个问题纠结了半天,不知是否正确,仅是自己的理解,如果有哪位同学对这里有研究,欢迎指出问题。

指定timestamp,Kafka怎么查找到对应的消息

类似根据offset获取消息,不过中间是从timeindex中获取position,然后遍历对比timestamp,获取相应的消息。

参考

1.Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)

2.Kafka 源码解析之 Server 端如何处理 Fetch 请求(十三)

Kafka 逻辑架构设计

开篇

本文主要探讨如下问题;

  1. Kafka架构设计
  2. Kafka的日志目录结构

正文

Kafka架构设计

kafka为分布式消息系统,由多个broker组成。消息是通过topic来分类的,一个topic下存在多个partition,每个partition又由多个segment构成。

发布订阅者模式

kafka集群架构

主题逻辑结构

Kafka的日志目录结构

开篇

在开始这篇之前,先抛出问题,这章主要通过研究consumer源码解决如下问题:

消费再均衡的原理是什么?

正文

消费再均衡的原理

主要分为四步

1.FIND_COORDINATOR

根据hash(group_id)%consumerOffsetPartitionNum查找出对应的partition,再查找出该partitiom对应的leader所在的broker,即可获得GroupCoordinator

2.JOIN_GROUP

在这一步主要完成消费组leader选举(获取第一个加入的组为leader,如果没有,选择map中的第一个node)和分区分配策略

3.SYNC_GROUP

客户端向GroupCoordinator发起同步请求,获取步骤2的分区分配方案。

4.HEARTBEAT

GroupCoordinator通过心跳来确定从属关系。

参考

kafka consumer 源码分析(二)分区分配策略

开篇

在开始这篇之前,先抛出问题,这章主要通过研究consumer源码解决如下问题:

分区分配策略是什么样的?

正文

分区分配策略

这里的分区分配策略仅仅只讨论consumer,关于producer端的分区不在本次探讨范围内。

consumer端分区分配策略官方有三种:RangeAssignor(默认值),RoundRobinAssignor,StickyAssignor

通常是通过partition.assignment.strategy配置生效。

RangeAssignor

1
2
3
4
5
6
7
8
9
10
11
12
org.apache.kafka.clients.consumer.RangeAssignor#assign

Collections.sort(consumersForTopic);//consumer按字典排序
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic,numPartitionsForTopic);
int i = 0;
for(int n = consumersForTopic.size(); i < n; ++i) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
((List)assignment.get(consumersForTopic.get(i))).addAll(partitions.subList(start, start + length));
}

源码太多,这里只贴出核心代码,需要的按照以上目录去找。

例如

有一个topic:truman partition:10 consumer:consumer0,consumer1,consumer2。那么numPartitionsPerConsumer=10/3=3,consumersWithExtraPartition=10%3=1。分配逻辑为:当i=0,按字典顺序,先分配consumer0,start=0,length=4,即consumer1分配tp0,tp1,tp2,tp3。同理最终方案为:

1
2
3
consumer0->tp0,tp1,tp2,tp3
consumer1->tp4,tp5,tp6
consumer2->tp7,tp8,tp9

从源码我们能得出如何分配的,还能得出一个结论,consumer分配partition和consumerId有关系,字典顺序靠前的,有可能会多分配partition。

为什么这么说?上面的源码其实没有给出,但能看出来consumersForTopic集合是按字典排序的,那么只用知道consumersForTopic是什么样的集合就能知道了,同样在该源码中可以看到put(res, topic, consumerId);,说明该集合是consumerId集合。因此可以得出该结论!

一句话总结该分配策略,就是尽可能的均分

RoundRobinAssignor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
org.apache.kafka.clients.consumer.RoundRobinAssignor
//assigner存放了所有的consumer
CircularIterator<String> assigner = new CircularIterator(Utils.sorted(subscriptions.keySet()));
//所有的consumer订阅的所有的TopicPartition的List
Iterator var9 = this.allPartitionsSorted(partitionsPerTopic, subscriptions).iterator();

while(var9.hasNext()) {
TopicPartition partition = (TopicPartition)var9.next();
String topic = partition.topic();
// 如果当前这个assigner(consumer)没有订阅这个topic,直接跳过
while(!((Subscription)subscriptions.get(assigner.peek())).topics().contains(topic)) {
assigner.next();
}
//跳出循环,表示终于找到了订阅过这个TopicPartition对应的topic的assigner
//将这个partition分派给对应的assigner
((List)assignment.get(assigner.next())).add(partition);
}

CircularIterator是一个封装类,让一个有限大小的list变成一个RoundRobin方式的无限遍历

RoundRobinAssignor与RangeAssignor最大的区别,是进行分区分配的时候不再逐个topic进行,即不是为某个topic完成了分区分派以后,再进行下一个topic的分区分派,而是首先将这个group中的所有consumer订阅的所有的topic-partition按顺序展开,然后,依次对于每一个topic-partition,在consumer进行round robin,为这个topic-partition选择一个consumer。

例如

有两个topic :t0和t1,每个topic都有3个分区,分区编号从0开始,因此展开以后得到的TopicPartition的List是:[t0p0,t0p1,t0p2,t1p0,t1p1,t1p2]

我们有两个consumer C0,C1,他们都同时订阅了这两个topic。

然后,在这个展开的TopicParititon的List开始进行分派:

t0p0 分配给C0
t0p1 分配给C1
t0p2 分配给C0
t1p0 分配给C1
t1p1 分配给C0
t1p1 分配给C1
从以上分配流程,我们可以很清楚地看到RoundRobinAssignor的两个基本特征:

  1. 对所有topic的topic partition求并集
  2. 基于consumer进行RoundRobin轮询

最终分配方案:

1
2
C0->[t0p0, t0p2, t1p1]
C1->[t0p1, t1p0, t1p2]

RoundRobinAssignor与RangeAssignor的重大区别,就是RoundRobinAssignor是在Group中所有consumer所关注的全体topic中进行分派,而RangeAssignor则是依次对每个topic进行分派。

那么什么时候选择RoundRobinAssignor,我们再看如下案例:

假如现在有两个Topic:Ta和Tb ,每个topic都有两个partition,同时,有两个消费者Ca和Cb与Cc,这三个消费者全部订阅了这两个主题,那么,通过两种不同的分区分派算法得到的结果将是:

RangeAssignor:

先对Ta进行处理:会将TaP0分派给Ca,将TaP1分派给Cb

在对Tb进行处理:会将TbP0分派给Ca,将TbP1分派给Cb

最终结果是:

1
2
3
Ca:[TaP0,TbP0]
Cb:[TaP1,TbP1]
Cc:[]

RoundRobinAssignor:

将所有TopicPartition展开,变成:[TaP0,TaP1,TbP0,TbP1]

将TaP0分派给Ca,将TaP1分派给Cb,将TbP0分派给Cc,将TbP1分派给Ca

最终分派结果是:

1
2
3
Ca:[TaP0,TbP1]
Cb:[TaP1]
Cc:[TbP0]

StickyAssignor

这个分配器源码较多,实现也要优于RoundRobinAssignor与RangeAssignor

它的主要特点是分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,均衡优先于上一次分配结果。鉴于这两个目标。

参考

  1. Kafka为Consumer分派分区:RangeAssignor和RoundRobinAssignor
  2. Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor

kafka consumer 源码分析(一)Consumer处理流程

开篇

在开始这篇之前,先抛出问题,这章主要通过研究consumer源码解决如下问题:

  1. consumer处理流程
  2. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

正文

Consumer处理流程

核心组件

ConsumerCoordinator: 消费者的协调者, 管理消费者的协调过程

  • 维持coordinator节点信息(也就是对consumer进行assignment的节点)
  • 维持当前consumerGroup的信息, 当前consumer已进入consumerGroup

Fetcher: 数据请求类

ConsumerNetworkClient: 消费者的网络客户端,负责网络传输的流程

SubscriptionState: 订阅状态类

Metadata: 集群的元数据管理类,使用租约机制

工作流程

消费者提交消费位移源码追究

查看官方API文档,在描述如何手动提交offset,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//此处为offset+1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}

The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed

意思是在调用commitSync(offsets)必须是当前offset+1。


其实我很想知道,自动提交offset方式(包含不指定offset,例如:commitSync()),具体源代码在哪里**+1**。

经过孜孜不倦的翻阅代码,感觉好像看懂了一点点,就把自己看懂的那点写出来,如果不对的话,欢迎看到的同学帮忙纠正一下,当不胜感激!

这里就只探究自动提交offset的情形,因为涉及的代码较多,这里只给出相应的关键代码。

1
2
3
4
5
6
7
8
9
10
public void commitSync(Duration timeout) {
try {
//subscriptions.allConsumed()获取需要提交的offset
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
...略
}
} finally {
release();
}
}
1
2
3
4
5
6
7
8
9
//可以看出allConsumed是从state.value().position中获取相应partition的offset
public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
if (state.value().hasValidPosition())
allConsumed.put(state.topicPartition(), new OffsetAndMetadata(state.value().position));
}
return allConsumed;
}

下来只用查看到什么时候更新state中信息,即可知道提交的offset是如何计算的,首先我们要知道一个知识点,consumer非多线程处理逻辑,因此每次提交offset都是在poll中处理的,因此我们需要查看poll中的逻辑,接着往下看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
|
V
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
|
V
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
|
V
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
//这里就是所谓的offset+1,也就是开头问题的答案!
subscriptions.position(partitionRecords.partition, nextOffset);

这里其实已经看到答案,但是可能有同学还会问,不是说更新state吗?这里更新的是 subscriptions.position,接着往下看

1
2
3
4
5
6
7
8
9
10
public void position(TopicPartition tp, long offset) {
assignedState(tp).position(offset);
}

private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
}

终于一切真相大白!看了两天源码,累的要死!Enjoy!

参考

  1. The committed offset should always be the offset of the next message that your application will read
  2. KafkaConsumer 流程解析

前言

零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。

零拷贝操作减少了在用户空间与内核空间之间切换模式的次数

举例来说,如果要读取一个文件并通过网络发送它,传统方式下如下图,传统的I/O操作进行了4次用户空间与内核空间的上下文切换,以及4次数据拷贝。其中4次数据拷贝中包括了2次DMA拷贝和2次CPU拷贝。通过零拷贝技术完成相同的操作,减少了在用户空间与内核空间交互,并且不需要CPU复制数据。

linux中零拷贝技术

Linux系统的“用户空间”和“内核空间”

从Linux系统上看,除了引导系统的BIN区,整个内存空间主要被分成两个部分:内核空间(Kernel space)、用户空间(User space)。

“用户空间”和“内核空间”的空间、操作权限以及作用都是不一样的。

内核空间是Linux自身使用的内存空间,主要提供给程序调度、内存分配、连接硬件资源等程序逻辑使用;用户空间则是提供给各个进程的主要空间。

用户空间不具有访问内核空间资源的权限,因此如果应用程序需要使用到内核空间的资源,则需要通过系统调用来完成:从用户空间切换到内核空间,然后在完成相关操作后再从内核空间切换回用户空间。

Linux 中零拷贝技术的实现方向

  • 直接 I/O:对于这种数据传输方式来说,应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输。这种方式依旧存在用户空间和内核空间的上下文切换,但是硬件上的数据不会拷贝一份到内核空间,而是直接拷贝至了用户空间,因此直接I/O不存在内核空间缓冲区和用户空间缓冲区之间的数据拷贝。

  • 在数据传输过程中,避免数据在用户空间缓冲区和系统内核空间缓冲区之间的CPU拷贝,以及数据在系统内核空间内的CPU拷贝。本文主要讨论的就是该方式下的零拷贝机制。

  • copy-on-write(写时复制技术):在某些情况下,Linux操作系统的内核空间缓冲区可能被多个应用程序所共享,操作系统有可能会将用户空间缓冲区地址映射到内核空间缓存区中。当应用程序需要对共享的数据进行修改的时候,才需要真正地拷贝数据到应用程序的用户空间缓冲区中,并且对自己用户空间的缓冲区的数据进行修改不会影响到其他共享数据的应用程序。所以,如果应用程序不需要对数据进行任何修改的话,就不会存在数据从系统内核空间缓冲区拷贝到用户空间缓冲区的操作。

mmap

1
2
buf = mmap(diskfd, len);
write(sockfd, buf, len);

应用程序调用mmap(),磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去。

使用mmap替代read很明显减少了一次拷贝,当拷贝数据量很大时,无疑提升了效率。但是使用mmap是有代价的。当你使用mmap时,你可能会遇到一些隐藏的陷阱。例如,当你的程序map了一个文件,但是当这个文件被另一个进程截断(truncate)时, write系统调用会因为访问非法地址而被SIGBUS信号终止。SIGBUS信号默认会杀死你的进程并产生一个coredump,如果你的服务器这样被中止了,那会产生一笔损失。

当然也有办法解决,本文忽略解决办法。

sendfile

1
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

sendfile实现的零拷贝I/O只使用了2次用户空间与内核空间的上下文切换,以及3次数据的拷贝。其中3次数据拷贝中包括了2次DMA拷贝和1次CPU拷贝。

splice

sendfile只适用于将数据从文件拷贝到套接字上,限定了它的使用范围。Linux在2.6.17版本引入splice系统调用,用于在两个文件描述符中移动数据:

1
2
3
#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <fcntl.h>
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);

java中如何使用零拷贝

DMA简介

io读写的方式:中断,DMA

DMA 直接内存存取,是允许外设组件将I/O数据直接传送到主存储器中并且传输不需要CPU的参与,以此将CPU解放出来去完成其他的事情。,相较于中断方式,减少cpu中断次数,不用cpu拷贝数据。

主要流程如下:

  1. 用户进程发起数据读取请求
  2. 系统调度为该进程分配cpu
  3. cpu向DMA发送io请求
  4. 用户进程等待io完成,让出cpu
  5. 系统调度cpu执行其他任务
  6. 数据写入至io控制器的缓冲寄存器
  7. DMA不断获取缓冲寄存器中的数据(需要cpu时钟)
  8. 传输至内存(需要cpu时钟)
  9. 所需的全部数据获取完毕后向cpu发出中断信号

更多参考

参考

  1. 浅析Linux中的零拷贝技术
  2. 零复制
  3. 零拷贝原理-数据的收发-软中断和DMA
  4. 浅谈 Linux下的零拷贝机制

Kafka producer源码分析

前言

在开始文章之前,需要解释是一下为什么要研究producer源码。

为什么要研究producer源码

通常producer使用都很简单,初始化一个KafkaProducer实例,然后调用send方法就好,但是我们有了解后面是如何发送到kafka集群的吗?其实我们不知道,其次,到底客户端有几个线程?我们不知道。还有producer还能做什么?我们同样不知道。本篇文章就是想回答一下上面提出的几个问题,能力有限,如有错误,欢迎指出!

架构

在介绍客户端架构之前,先回答一个问题

producer到底存在几个线程?2个 Main threadsender,其中sender线程负责发送消息,main 线程负责interceptor、序列化、分区等其他操作。

  • Producer首先使用用户主线程将待发送的消息封装进一个ProducerRecord类实例中。
  • 进行interceptor、序列化、分区等其他操作,发送到Producer程序中RecordAccumulator中。
  • Producer的另一个工作线程(即Sender线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次的内,统一发送给对应的broker中。

消息发送源码分析

sender发送的时机是由两个指标决定的,一个是时间linger.ms,一个是数据量大小 batch.size

sender线程主要代码

run->run(long now)->sendProducerData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
//result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);

log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}

Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// 暂且只关心result.nextReadyCheckDelayMs
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);

pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}

1
2
3
long pollTimeout = sendProducerData(now);
// poll最终会调用selector,pollTimeout也就是selector阻塞的时间
client.poll(pollTimeout, now);

selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Check for data, waiting up to the given timeout.
*
* @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
* @return The number of keys ready
*/
private int select(long timeoutMs) throws IOException {
if (timeoutMs < 0L)
throw new IllegalArgumentException("timeout should be >= 0");

if (timeoutMs == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(timeoutMs);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
// 初始化为最大值
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();

Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// 和linger.ms有关
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

我们可以从实例化一个新的KafkaProducer开始分析(还没有调用send方法),在sender线程调用accumulator#ready(..)时候,会返回result,其中包含selector可能要阻塞的时间。由于还没有调用send方法,所以Deque为空,所以result中包含的nextReadyCheckDelayMs也是最大值,这个时候selector会一直阻塞。

然后我们调用send方法,该方法会调用waitOnMetadata,waitOnMetadata会调用sender.wakeup(),所以会唤醒sender线程。

这个时候会唤醒阻塞在selector#select(..)的sender线程,sender线程又运行到accumulator#ready(..),当Deque有值,所以返回的result包含的nextReadyCheckDelayMs不再是最大值,而是和linger.ms有关的值。也就是时候selector会最多阻塞lingger.ms后就返回,然后再次轮询。(根据源码分析,第一条消息会创建batch,因此newBatchCreated为true,同样会触发唤醒sender)

如果有一个ProducerBatch满了,也会调用Sender#wakeup(..),

KafkaProducer#doSend(…)

1
2
3
4
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}

所以综上所述:只要满足linger.ms和batch.size满了就会激活sender线程来发送消息。

客户端顺序性保证

因为消息发送是批量发送的,那么就有可能存在上一批发送失败,接下来一批发送成功。即会出现数据乱序。

max.in.flight.requests.per.connection=1,限制客户端在单个连接上能够发送的未响应请求的个数。如果业务对顺序性有很高的要求,将该参数值设置为1。

总结

kafka producer大概流程如上,数据发送是批量的,最后是按Node发送响应的消息的。即其中Integer即为broker id。消息在RecordAccumulator中还是按TopicPartition存放的,但是在最终发送会作相应的转换。

sender发送的时机是由两个指标决定的,一个是时间linger.ms,一个是数据量大小 batch.size

了解producer发送流程,这样就可以让我们更改的生产消息,使用更多的特性,例如拦截器可以做各种特殊处理的场景。

参考

1. kafka producer的batch.size和linger.ms

前言

为了深入研究kafka运行原理和架构,很有必要搭建开发环境,这样我们就很方便的debug 服务。

前期准备

  1. Gradle 5.0+
  2. jdk8
  3. Scala 2.12
  4. idea scale plugin

配置/运行

  • 首先执行在源码目录下执行gradle
  • 然后build ./gradlew jar
  • 最后生成idea工程./gradlew idea

运行MirrorMaker

运行kafka server

因为kafka依赖zookeeper,在开始之前请先启动一个zookeeper服务,本文章略。

错误总结

无log

下载log4j
slf4j-log4j12

然后在modules中找到core/main,将这两个依赖导入进来即可

最后将config目录下的log4j.properties复制到core/main/scala目录下(如果还是无法看到log,可以将该文件复制到out/production/classes目录下)

执行报错,无法找到相关类

执行./gradlew jar即可解决。

Core源码模块解读

模块名程说明
adminkafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。
api主要负责数据交互,客户端与服务端交互数据的编码与解码。
cluster这里包含多个实体类,有Broker,Cluster,Partition,Replica。其中一个Cluster由多个Broker组成,一个Broker包含多个Partition,一个Topic的所有Partition分布在不同的Broker中,一个Replica包含都个Partition。
common这是一个通用模块,其只包含各种异常类以及错误验证。
consumer消费者处理模块,负责所有的客户端消费者数据和逻辑处理。
controller此模块负责中央控制器的选举,分区的Leader选举,Replica的分配或其重新分配,分区和副本的扩容等。
coordinator负责管理部分consumer group和他们的offset。
log这是一个负责Kafka文件存储模块,负责读写所有的Kafka的Topic消息数据。
message封装多条数据组成一个数据集或者压缩数据集。
metrics负责内部状态的监控模块。
network该模块负责处理和接收客户端连接,处理网络时间模块。
security负责Kafka的安全验证和管理模块。
serializer序列化和反序列化当前消息内容。
server 该模块涉及的内容较多,有Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader的选举,Admin和Replica的管理,以及各种元数据的缓存等内容。
tools阅读该模块,就是一个工具模块,涉及的内容也比较多。有导出对应consumer的offset值;导出LogSegments信息,以及当前Topic的log写的Location信息;导出Zookeeper上的offset值等内容。
utils各种工具类,比如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类,Mx4jLoader监控加载器,ReplicationUtils复制集工具类,CommandLineUtils命令行工具类,以及公共日志类等内容。

参考

  1. Apache Kafka
  2. 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置
0%