Kafka: Getting Started and Core Design

Environment Setup and Quick Start

Setting Up Kafka

Before starting, prepare a Docker environment; installing Docker Desktop is one easy option.

To make it easier to set up a learning environment, the Confluent community Docker images are recommended.

The correspondence between Confluent Platform versions and Apache Kafka versions is as follows:

Confluent Platform and Apache Kafka Compatibility

| Confluent Platform | Apache Kafka® | Release Date | Standard End of Support | Platinum End of Support |
| --- | --- | --- | --- | --- |
| 7.0.x | 3.0.x | October 27, 2021 | October 27, 2023 | October 27, 2024 |
| 6.2.x | 2.8.x | June 8, 2021 | June 8, 2023 | June 8, 2024 |
| 6.1.x | 2.7.x | February 9, 2021 | February 9, 2023 | February 9, 2024 |
| 6.0.x | 2.6.x | September 24, 2020 | September 24, 2022 | September 24, 2023 |
| 5.5.x | 2.5.x | April 24, 2020 | April 24, 2022 | April 24, 2023 |
| 5.4.x | 2.4.x | January 10, 2020 | January 10, 2022 | January 10, 2023 |
| 5.3.x | 2.3.x | July 19, 2019 | July 19, 2021 | July 19, 2022 |
| 5.2.x | 2.2.x | March 28, 2019 | March 28, 2021 | March 28, 2022 |
| 5.1.x | 2.1.x | December 14, 2018 | December 14, 2020 | December 14, 2021 |
| 5.0.x | 2.0.x | July 31, 2018 | July 31, 2020 | July 31, 2021 |
| 4.1.x | 1.1.x | April 16, 2018 | April 16, 2020 | April 16, 2021 |
| 4.0.x | 1.0.x | November 28, 2017 | November 28, 2019 | November 28, 2020 |
| 3.3.x | 0.11.0.x | August 1, 2017 | August 1, 2019 | August 1, 2020 |
| 3.2.x | 0.10.2.x | March 2, 2017 | March 2, 2019 | March 2, 2020 |
| 3.1.x | 0.10.1.x | November 15, 2016 | November 15, 2018 | November 15, 2019 |
| 3.0.x | 0.10.0.x | May 24, 2016 | May 24, 2018 | May 24, 2019 |
| 2.0.x | 0.9.0.x | December 7, 2015 | December 7, 2017 | December 7, 2018 |
| 1.0.0 | – | February 25, 2015 | February 25, 2017 | February 25, 2018 |

First, create a file named docker-compose.yml with the following content:

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Then run docker-compose up -d in the directory containing this file to start the ZooKeeper container and the Kafka broker container; docker-compose down tears the cluster back down.

PS F:\docker-workspace\kafka> docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
aac9044963e5 confluentinc/cp-kafka:7.0.1 "/etc/confluent/dock…" 11 seconds ago Up 10 seconds 0.0.0.0:9092->9092/tcp broker
debcc3a0235d confluentinc/cp-zookeeper:7.0.1 "/etc/confluent/dock…" 13 seconds ago Up 12 seconds 2181/tcp, 2888/tcp, 3888/tcp zookeeper

If you see output like the above, the cluster was created successfully.

Quick Start

Create a Topic

Automatic topic creation is enabled by default, so this first step can actually be skipped: as soon as a producer or consumer touches a topic, the topic is created automatically.

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic trumandu
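Topic creation can also be done programmatically. Below is a minimal sketch using the Java AdminClient (not part of the original quick start; the class name CreateTopic is made up, and localhost:9092 assumes the code runs on the Docker host, matching the advertised listener in docker-compose.yml):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // localhost:9092 is the advertised listener from docker-compose.yml
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- matches the single-broker setup
            NewTopic topic = new NewTopic("trumandu", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}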

Describe the Topic

PS F:\docker-workspace\kafka> docker exec broker kafka-topics --describe --bootstrap-server broker:9092  --topic trumandu
Topic: trumandu TopicId: 8peszWGBTgulbSZPYHiMzQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: trumandu Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Produce Messages

Run:

docker exec --interactive --tty broker kafka-console-producer --bootstrap-server broker:9092 --topic trumandu

Then type some test messages; press Ctrl-D when you are done to exit the console.
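Equivalently, here is a minimal Java producer sketch (the class name ProduceDemo is made up for illustration; the messages match the ones in the console example below):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProduceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("trumandu", "hello"));
            producer.send(new ProducerRecord<>("trumandu", "truman"));
            producer.flush(); // make sure both records leave the client before exiting
        }
    }
}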

Consume Messages

PS F:\docker-workspace\kafka> docker exec --interactive --tty broker kafka-console-consumer --bootstrap-server broker:9092  --topic trumandu --from-beginning

hello
truman

Press Ctrl-C to exit the console.
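The Java counterpart of the console consumer, as a sketch (the group id truman-demo is an arbitrary choice; auto.offset.reset=earliest plays the role of --from-beginning for a group with no committed offsets):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "truman-demo"); // arbitrary group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // like --from-beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("trumandu"));
            while (true) { // runs until interrupted, like the console consumer
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}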

For the full set of options for the kafka-topics, kafka-console-producer, and kafka-console-consumer tools used above, see the official documentation.

Kafka Core Design

Architecture

Kafka is a distributed messaging system composed of multiple brokers. Messages are organized by topic; a topic contains multiple partitions, and each partition is in turn made up of multiple segments.


Before version 2.8, all metadata was stored in ZooKeeper; the older architecture is used for the description here.

(Figure: Kafka architecture, pre-2.8 with ZooKeeper)

Topic Logical Structure

(Figure: topic logical structure)

(Figure: log directory layout)

Data Reliability Design

When a producer sends data to the leader, the acks parameter (request.required.acks in the legacy producer) sets the reliability level of the write:

  1. 0: the server sends no response to the producer, regardless of whether the write succeeds; if an error occurs, the server closes the connection, which triggers the producer to refresh its metadata.

  2. 1: the leader responds as soon as the message is written locally; in this mode, if the leader fails, data may be lost.

  3. -1: the leader responds only after every replica in the ISR has received the message; this is the strongest guarantee.

Setting acks=-1 alone still does not guarantee zero data loss: when the ISR has shrunk to just the leader, data can still be lost. To avoid loss, besides acks=-1, the ISR size must also be kept at 2 or more. Concretely:

(1). acks (request.required.acks): set to -1, so a write only counts as successful once every replica in the ISR has received the message;
(2). min.insync.replicas: set to 2 or greater, guaranteeing that the ISR contains at least two replicas.

The producer must strike a balance between throughput and data reliability; the sketch below shows the no-loss settings on the client side.
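A minimal sketch with the Java producer (the class name ReliableProduceDemo is made up; the retries setting and the kafka-configs command in the comment go beyond the text above and are illustrative; note that min.insync.replicas is a broker/topic-level config, not a producer one):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ReliableProduceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "all" is the modern spelling of acks=-1: wait for the full ISR
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // min.insync.replicas >= 2 must be set on the broker or topic, e.g.:
        //   kafka-configs --bootstrap-server broker:9092 --alter \
        //     --entity-type topics --entity-name trumandu \
        //     --add-config min.insync.replicas=2
        // (with replication factor 1, as in this single-broker demo, writes would then fail)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("trumandu", "durable message"));
        }
    }
}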

Data Consistency Design

Definition of consistency: if a message is visible to a consumer, then even if the leader crashes, the message can still be read from the newly elected leader.

  1. HighWaterMark

HW for short: a partition's high watermark is the smallest LEO (LogEndOffset) among the replicas in the partition's ISR. Consumers can consume at most up to the HW. In addition, every replica has its own HW; the leader and each follower are responsible for updating their own HW state, with HW <= Leader.LEO.

  2. For a message newly written to the leader

Consumers cannot read it immediately. The leader waits until the message has been replicated to every replica in the ISR, then advances the HW; only then does the message become consumable. In other words, consumers can read at most up to the HW position.

This guarantees that if the leader broker fails, the message can still be fetched from the newly elected leader. Fetch requests coming from other brokers (i.e., replication traffic) are not subject to the HW limit. Meanwhile, each follower also maintains its own HW: Follower.HW = min(Leader.HW, Follower.offset), as shown in the worked example below.
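For concreteness, suppose the ISR holds the leader with LEO = 10 and two followers with LEO = 8 and LEO = 9 (hypothetical numbers):

Leader LEO = 10, Follower1 LEO = 8, Follower2 LEO = 9
HW = min(10, 8, 9) = 8                         -> consumers may read offsets 0..7
Follower1.HW = min(Leader.HW, Follower1.offset) = min(8, 8) = 8

Offsets 8 and 9 exist on the leader but remain invisible to consumers until the slower followers catch up and the HW advances.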

High-Performance Design

Performance can be analyzed from two angles: the client side and the server side. Beyond the excellent server-side design, end-to-end batch compression counts as the secret weapon of Kafka's high-performance design. A producer-side sketch of enabling it follows.
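A minimal sketch with the Java producer (the codec choice lz4 and the batch-size/linger values are illustrative assumptions, not prescriptions from the text; the point is only that compression happens once, end to end):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class CompressedProduceDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Whole batches are compressed on the producer, stored compressed on the
        // broker, and decompressed only by the consumer -- hence "end to end".
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // also: gzip, snappy, zstd
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);   // bigger batches compress better (illustrative)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // wait up to 20 ms to fill a batch (illustrative)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("trumandu", "message-" + i));
            }
        }
    }
}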


References

  1. Apache Kafka® Quick Start
  2. Apache Kafka DESIGN