前言

写这篇文章的目的是为了记录一下学习笔记,其次为了能够在复习的时候快速掌握相关知识。本篇记录java8系列专栏之Date Time API

正文

Clock

详解

可以取代System.currentTimeMillis(),时区敏感,带有时区信息

用法
1
2
3
4
5
Clock clock = Clock.systemDefaultZone();
long millis = clock.millis();

Instant instant = clock.instant();
Date legacyDate = Date.from(instant); // legacy java.util.Date

ZoneId

详解

新的时区类 java.time.ZoneId 是原有的 java.util.TimeZone 类的替代品。 ZoneId对象可以通过 ZoneId.of() 方法创建,也可以通过 ZoneId.systemDefault() 获取系统默认时区。

用法
1
2
3
4
5
System.out.println(ZoneId.getAvailableZoneIds());
// prints all available timezone ids

ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
// ZoneRules[currentStandardOffset=+08:00]

有了 ZoneId,我们就可以将一个 LocalDate、LocalTime 或 LocalDateTime 对象转化为 ZonedDateTime 对象

1
2
LocalDateTime localDateTime = LocalDateTime.now();
ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, shanghaiZoneId);

ZonedDateTime 对象由两部分构成,LocalDateTime 和 ZoneId,其中 2018-03-03T15:26:56.147 部分为 LocalDateTime,+08:00[Asia/Shanghai] 部分为ZoneId。

LocalTime

详解

LocalTime类是Java 8中日期时间功能里表示一整天中某个时间点的类,它的时间是无时区属性的(早上10点等等)

用法
1
2
3
4
5
6
7
8
9
10
LocalTime now1 = LocalTime.now(zone1);
LocalTime now2 = LocalTime.now(zone2);

System.out.println(now1.isBefore(now2)); // false

long hoursBetween = ChronoUnit.HOURS.between(now1, now2);
long minutesBetween = ChronoUnit.MINUTES.between(now1, now2);

System.out.println(hoursBetween); // -3
System.out.println(minutesBetween); // -239

LocalDate

详解

LocalDate类是Java 8中日期时间功能里表示一个本地日期的类,它的日期是无时区属性的。 可以用来表示生日、节假日期等等。这个类用于表示一个确切的日期,而不是这个日期所在的时间

用法
1
2
3
4
5
6
7
LocalDate today = LocalDate.now();
LocalDate tomorrow = today.plus(1, ChronoUnit.DAYS);
LocalDate yesterday = tomorrow.minusDays(2);

LocalDate independenceDay = LocalDate.of(2014, Month.JULY, 4);
DayOfWeek dayOfWeek = independenceDay.getDayOfWeek();
System.out.println(dayOfWeek); // FRIDAY

LocalDateTime

详解

LocalDateTime类是Java 8中日期时间功能里,用于表示当地的日期与时间的类,它的值是无时区属性的。你可以将其视为Java 8中LocalDate与LocalTime两个类的结合。

用法
1
2
3
4
5
6
7
8
9
10
LocalDateTime sylvester = LocalDateTime.of(2014, Month.DECEMBER, 31, 23, 59, 59);

DayOfWeek dayOfWeek = sylvester.getDayOfWeek();
System.out.println(dayOfWeek); // WEDNESDAY

Month month = sylvester.getMonth();
System.out.println(month); // DECEMBER

long minuteOfDay = sylvester.getLong(ChronoField.MINUTE_OF_DAY);
System.out.println(minuteOfDay); // 1439

ZonedDateTime

详解

ZonedDateTime类是Java 8中日期时间功能里,用于表示带时区的日期与时间信息的类。可以用于表示一个真实事件的开始时间,如某火箭升空时间等等。

用法
1
2
ZonedDateTime dateTime = ZonedDateTime.now();
ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now(), shanghaiZoneId);

Duration

详解

一个Duration对象表示两个Instant间的一段时间

用法
1
2
3
4
Instant first = Instant.now();
// wait some time while something happens
Instant second = Instant.now();
Duration duration = Duration.between(first, second);

DateTimeFormatter

详解

DateTimeFormatter类是Java 8中日期时间功能里,线程安全。用于解析和格式化日期时间的类
。类中包含如下预定义的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
BASIC_ISO_DATE

ISO_LOCAL_DATE
ISO_LOCAL_TIME
ISO_LOCAL_DATE_TIME

ISO_OFFSET_DATE
ISO_OFFSET_TIME
ISO_OFFSET_DATE_TIME

ISO_ZONED_DATE_TIME

ISO_INSTANT

ISO_DATE
ISO_TIME
ISO_DATE_TIME

ISO_ORDINAL_TIME
ISO_WEEK_DATE

RFC_1123_DATE_TIME
用法
1
2
3
DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
//DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formattedDate = formatter.format(LocalDateTime.now());

参考

  1. java8-tutorial#date-api
  2. java8-datetime-api

[toc]

声明

本文问题参考朱小厮的博客。写这个只是为了检验自己最近的学习成果,因为是自己的理解,如果问题,欢迎指出。废话少说,上干货。

正文

1. Kafka的用途有哪些?使用场景如何?

总结下来就几个字:异步处理、日常系统解耦、削峰、提速、广播

如果再说具体一点例如:消息,网站活动追踪,监测指标,日志聚合,流处理,事件采集,提交日志等

2. Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么

ISR:In-Sync Replicas 副本同步队列

AR:Assigned Replicas 所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

3. Kafka中的HW、LEO、LSO、LW等分别代表什么?

HW:High Watermark 高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置上一条信息。

LEO:LogEndOffset 当前日志文件中下一条待写信息的offset

HW/LEO这两个都是指最后一条的下一条的位置而不是指最后一条的位置。

LSO:Last Stable Offset 对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同

LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

4. Kafka中是怎么体现消息顺序性的?

kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.

5. Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

拦截器->序列化器->分区器

6. Kafka生产者客户端的整体结构是什么样子的?

7. Kafka生产者客户端中使用了几个线程来处理?分别是什么?

2个,主线程和Sender线程。主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。Sender线程负责将RecordAccumulator中消息发送到kafka中.

9. Kafka的旧版Scala的消费者客户端的设计有什么缺陷?

10. “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?如果不正确,那么有没有什么hack的手段?

不正确,通过自定义分区分配策略,可以将一个consumer指定消费所有partition。

11. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

offset+1

12. 有哪些情形会造成重复消费?

消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)

13. 那些情景下会造成消息漏消费?

消费者没有处理完消息 提交offset(自动提交偏移 未处理情况下程序异常结束)

14. KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?

1.在每个线程中新建一个KafkaConsumer

2.单线程创建KafkaConsumer,多个处理线程处理消息(难点在于是否要考虑消息顺序性,offset的提交方式)

15. 简述消费者与消费组之间的关系

消费者从属与消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。

16. 当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?

创建:在zk上/brokers/topics/下节点 kafkabroker会监听节点变化创建主题
删除:调用脚本删除topic会在zk上将topic设置待删除标志,kafka后台有定时的线程会扫描所有需要删除的topic进行删除

17. topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

可以

18. topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

不可以

19. 创建topic时如何选择合适的分区数?

根据集群的机器数量和需要的吞吐量来决定适合的分区数

20. Kafka目前有那些内部topic,它们都有什么特征?各自的作用又是什么?

__consumer_offsets 以下划线开头,保存消费组的偏移

21. 优先副本是什么?它有什么特殊的作用?

优先副本 会是默认的leader副本 发生leader变化时重选举会优先选择优先副本作为leader

22. Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理

创建主题时
如果不手动指定分配方式 有两种分配方式

消费组内分配

23. 简述Kafka的日志目录结构

每个partition一个文件夹,包含四类文件.index .log .timeindex leader-epoch-checkpoint
.index .log .timeindex 三个文件成对出现 前缀为上一个segment的最后一个消息的偏移 log文件中保存了所有的消息 index文件中保存了稀疏的相对偏移的索引 timeindex保存的则是时间索引
leader-epoch-checkpoint中保存了每一任leader开始写入消息时的offset 会定时更新
follower被选为leader时会根据这个确定哪些消息可用

24. Kafka中有那些索引文件?

如上

25. 如果我指定了一个offset,Kafka怎么查找到对应的消息?

1.通过文件名前缀数字x找到该绝对offset 对应消息所在文件

2.offset-x为在文件中的相对偏移

3.通过index文件中记录的索引找到最近的消息的位置

4.从最近位置开始逐条寻找

26. 如果我指定了一个timestamp,Kafka怎么查找到对应的消息?

原理同上 但是时间的因为消息体中不带有时间戳 所以不精确

27. 聊一聊你对Kafka的Log Retention的理解

kafka留存策略包括 删除和压缩两种
删除: 根据时间和大小两个方式进行删除 大小是整个partition日志文件的大小
超过的会从老到新依次删除 时间指日志文件中的最大时间戳而非文件的最后修改时间
压缩: 相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续

28. 聊一聊你对Kafka的Log Compaction的理解

29. 聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)

30. 聊一聊Kafka的延时操作的原理

31. 聊一聊Kafka控制器的作用

32. 消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)

33. Kafka中的幂等是怎么实现的

pid+序号实现,单个producer内幂等

33. Kafka中的事务是怎么实现的(这题我去面试6家被问4次,照着答案念也要念十几分钟,面试官简直凑不要脸。实在记不住的话…只要简历上不写精通Kafka一般不会问到,我简历上写的是“熟悉Kafka,了解RabbitMQ….”)

34. Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

35. 失效副本是指什么?有那些应对措施?

36. 多副本下,各个副本中的HW和LEO的演变过程

37. 为什么Kafka不支持读写分离?

38. Kafka在可靠性方面做了哪些改进?(HW, LeaderEpoch)

39. Kafka中怎么实现死信队列和重试队列?

40. Kafka中的延迟队列怎么实现(这题被问的比事务那题还要多!!!听说你会Kafka,那你说说延迟队列怎么实现?)

41. Kafka中怎么做消息审计?

42. Kafka中怎么做消息轨迹?

43. Kafka中有那些配置参数比较有意思?聊一聊你的看法

44. Kafka中有那些命名比较有意思?聊一聊你的看法

45. Kafka有哪些指标需要着重关注?

生产者关注MessagesInPerSec、BytesOutPerSec、BytesInPerSec 消费者关注消费延迟Lag

46. 怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)

参考 如何监控kafka消费Lag情况

47. Kafka的那些设计让它有如此高的性能?

零拷贝,页缓存,顺序写

48. Kafka有什么优缺点?

49. 还用过什么同质类的其它产品,与Kafka相比有什么优缺点?

50. 为什么选择Kafka?

吞吐量高,大数据消息系统唯一选择。

51. 在使用Kafka的过程中遇到过什么困难?怎么解决的?

52. 怎么样才能确保Kafka极大程度上的可靠性?

53. 聊一聊你对Kafka生态的理解

confluent全家桶(connect/kafka stream/ksql/center/rest proxy等),开源监控管理工具kafka-manager,kmanager等

参考

  1. Kafka面试题全套整理 | 划重点要考!
  2. Kafka科普系列 | 什么是LSO?
  3. Kafka面试题

前言

为什么会有这个需求?

kafka consumer 消费会存在延迟情况,我们需要查看消息堆积情况,就是所谓的消息Lag。目前是市面上也有相应的监控工具KafkaOffsetMonitor,我们自己也写了一套监控KafkaCenter。但是随着kafka版本的升级,消费方式也发生了很大的变化,因此,我们需要重构一下kafka offset监控。

正文

如何计算Lag

在计算Lag之前先普及几个基本常识

LEO(LogEndOffset): 这里说的和官网说的LEO有点区别,主要是指堆consumer可见的offset.即HW(High Watermark)

CURRENT-OFFSET: consumer消费到的具体位移

知道以上信息后,可知Lag=LEO-CURRENT-OFFSET。计算出来的值即为消费延迟情况。

官方查看方式

这里说的官方查看方式是在官网文档中提到的,使用官方包里提供的bin/kafka-consumer-groups.sh

最新版的工具只能获取到通过broker消费的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.0.101:8092 --group test
Consumer group 'test' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
truman_test_offset 2 1325 2361 1036 - - -
truman_test_offset 6 1265 2289 1024 - - -
truman_test_offset 4 1245 2243 998 - - -
truman_test_offset 9 1310 2307 997 - - -
truman_test_offset 1 1259 2257 998 - - -
truman_test_offset 8 1410 2438 1028 - - -
truman_test_offset 3 1225 2167 942 - - -
truman_test_offset 0 1218 2192 974 - - -
truman_test_offset 5 1262 2252 990 - - -
truman_test_offset 7 1265 2277 1012 - - -

程序查询方式

使用程序查询方式,有什么好处,可以实现自己的offset监控,可以无缝接入任何平台系统。

既然是用程序实现,那么做个更高级的需求,++根据topic获取不同消费组的消费情况++。

先定义两个实体

TopicConsumerGroupState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TopicConsumerGroupState {
private String groupId;
/**
* 消费方式:zk/broker 主要指的offset提交到哪里 新版本 broker 旧版本zk
*/
private String consumerMethod;
private List<PartitionAssignmentState> partitionAssignmentStates;
/**
* Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response:
* UNKNOWN_MEMBER_ID Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
* PreparingRebalance:组准备开启新的rebalance,等待成员加入 AwaitingSync:正在等待leader
* consumer将分配方案传给各个成员 Stable:rebalance完成!可以开始消费了~
*/
private ConsumerGroupState consumerGroupState;

//省略set和get方法..

}

PartitionAssignmentState

1
2
3
4
5
6
7
8
9
10
11
12
13
public class PartitionAssignmentState {
private String group;
private String topic;
private int partition;
private long offset;
private long lag;
private String consumerId;
private String host;
private String clientId;
private long logEndOffset;
//省略set和get方法..
}

broker消费方式 offset 获取

实现思路

  1. 根据topic 获取消费该topic的group
  2. 通过使用KafkaAdminClient的describeConsumerGroups读取broker上指定group和topic的消费情况,可以获取到clientId,CURRENT-OFFSET,patition,host等
  3. 通过consumer获取LogEndOffset(可见offset)
  4. 将2与3处信息合并,计算Lag

代码设计

引入最新版依赖,使用KafkaAdminClient获取1,2处信息

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
  1. 步骤1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public Set<String> listConsumerGroups(String topic)
    throws InterruptedException, ExecutionException, TimeoutException {
    final Set<String> filteredGroups = new HashSet<>();

    Set<String> allGroups = this.adminClient.listConsumerGroups().all().get(30, TimeUnit.SECONDS).stream()
    .map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
    allGroups.forEach(groupId -> {
    try {
    adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get().keySet().stream()
    .filter(tp -> tp.topic().equals(topic)).forEach(tp -> filteredGroups.add(groupId));
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    });
    return filteredGroups;
    }
  2. 步骤2

使用describeConsumerGroup获取的消费情况,其中包含有members和无members情况。正常订阅消费是可以获取到members,但是通过制定patition消费拿不到members.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
* 根据topic 获取offset,该结果是不同group 组不同patition的当前消费offset
*
* @param topic
* @return Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>>
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> listConsumerGroupOffsets(String topic)
throws InterruptedException, ExecutionException, TimeoutException {
Set<String> groupIds = this.listConsumerGroups(topic);
Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets = new HashMap<>();
groupIds.forEach(groupId -> {
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets = new HashSet<>();
try {
consumerPatitionOffsets = this.adminClient.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get(30, TimeUnit.SECONDS).entrySet().stream()
.filter(entry -> topic.equalsIgnoreCase(entry.getKey().topic())).collect(Collectors.toSet());
;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
consumerGroupOffsets.put(groupId, consumerPatitionOffsets);
});
return consumerGroupOffsets;
}

/**
* 根据topic 获取topicConsumerGroupStates,其中未包含lag/logEndOffset(consumer可见offset)
*
* @param topic
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public List<TopicConsumerGroupState> describeConsumerGroups(String topic)
throws InterruptedException, ExecutionException, TimeoutException {
final List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
Set<String> groupIds = this.listConsumerGroups(topic);
Map<String, ConsumerGroupDescription> groupDetails = this.adminClient.describeConsumerGroups(groupIds).all()
.get(30, TimeUnit.SECONDS);

Map<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> consumerPatitionOffsetMap = this
.listConsumerGroupOffsets(topic);

groupDetails.entrySet().forEach(entry -> {
String groupId = entry.getKey();
ConsumerGroupDescription description = entry.getValue();

TopicConsumerGroupState topicConsumerGroupState = new TopicConsumerGroupState();
topicConsumerGroupState.setGroupId(groupId);
topicConsumerGroupState.setConsumerMethod("broker");
topicConsumerGroupState.setConsumerGroupState(description.state());
// 获取group下不同patition消费offset信息
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets = consumerPatitionOffsetMap
.get(groupId);
List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();

if (!description.members().isEmpty()) {
// 获取存在consumer(memeber存在的情况)
partitionAssignmentStates = this.withMembers(consumerPatitionOffsets, topic, groupId, description);
} else {
// 获取不存在consumer
partitionAssignmentStates = this.withNoMembers(consumerPatitionOffsets, topic, groupId);
}
topicConsumerGroupState.setPartitionAssignmentStates(partitionAssignmentStates);
topicConsumerGroupStates.add(topicConsumerGroupState);
});

return topicConsumerGroupStates;
}

private List<PartitionAssignmentState> withMembers(
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets, String topic, String groupId,
ConsumerGroupDescription description) {
List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();
Map<Integer, Long> consumerPatitionOffsetMap = new HashMap<>();
consumerPatitionOffsets.forEach(entryInfo -> {
TopicPartition topicPartition = entryInfo.getKey();
OffsetAndMetadata offsetAndMetadata = entryInfo.getValue();
consumerPatitionOffsetMap.put(topicPartition.partition(), offsetAndMetadata.offset());
});
description.members().forEach(memberDescription -> {
memberDescription.assignment().topicPartitions().forEach(topicPation -> {
PartitionAssignmentState partitionAssignmentState = new PartitionAssignmentState();
partitionAssignmentState.setPartition(topicPation.partition());
partitionAssignmentState.setTopic(topic);
partitionAssignmentState.setClientId(memberDescription.clientId());
partitionAssignmentState.setGroup(groupId);
partitionAssignmentState.setConsumerId(memberDescription.consumerId());
partitionAssignmentState.setHost(memberDescription.host());
partitionAssignmentState.setOffset(consumerPatitionOffsetMap.get(topicPation.partition()));
partitionAssignmentStates.add(partitionAssignmentState);
});
});
return partitionAssignmentStates;
}

private List<PartitionAssignmentState> withNoMembers(
Set<Entry<TopicPartition, OffsetAndMetadata>> consumerPatitionOffsets, String topic, String groupId) {
List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();
consumerPatitionOffsets.forEach(entryInfo -> {
TopicPartition topicPartition = entryInfo.getKey();
OffsetAndMetadata offsetAndMetadata = entryInfo.getValue();
PartitionAssignmentState partitionAssignmentState = new PartitionAssignmentState();
partitionAssignmentState.setPartition(topicPartition.partition());
partitionAssignmentState.setTopic(topic);
partitionAssignmentState.setGroup(groupId);
partitionAssignmentState.setOffset(offsetAndMetadata.offset());
partitionAssignmentStates.add(partitionAssignmentState);
});
return partitionAssignmentStates;
}
  1. 步骤3
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public Map<TopicPartition, Long>  consumerOffset (String gorupName,String topicName) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1.101:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, gorupName);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    @SuppressWarnings("resource")
    KafkaConsumers<String, String> consumer = new KafkaConsumers<>(consumerProps);
    KafkaConsumer<String, String> kafkaConsumer = consumer.subscribe(topicName);
    List<PartitionInfo> patitions = kafkaConsumer.partitionsFor(topicName);
    List<TopicPartition>topicPatitions = new ArrayList<>();
    patitions.forEach(patition->{
    TopicPartition topicPartition = new TopicPartition(topicName,patition.partition());
    topicPatitions.add(topicPartition);
    });
    Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPatitions);
    return result;
    }
  2. 步骤4
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    private List<TopicConsumerGroupState> getBrokerConsumerOffsets(String clusterID, String topic) {
    List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
    try {
    topicConsumerGroupStates = this.describeConsumerGroups(topic);
    // 填充lag/logEndOffset
    topicConsumerGroupStates.forEach(topicConsumerGroupState -> {
    String groupId = topicConsumerGroupState.getGroupId();
    List<PartitionAssignmentState> partitionAssignmentStates = topicConsumerGroupState
    .getPartitionAssignmentStates();
    Map<TopicPartition, Long> offsetsMap = this.consumerOffset(clusterID, groupId, topic);
    for (Entry<TopicPartition, Long> entry : offsetsMap.entrySet()) {
    long logEndOffset = entry.getValue();
    for (PartitionAssignmentState partitionAssignmentState : partitionAssignmentStates) {
    if (partitionAssignmentState.getPartition() == entry.getKey().partition()) {
    partitionAssignmentState.setLogEndOffset(logEndOffset);
    partitionAssignmentState.setLag(getLag(partitionAssignmentState.getOffset(), logEndOffset));
    }
    }
    }
    });
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    return topicConsumerGroupStates;
    }

zookeeper消费方式 offset 获取

实现思路

  1. 根据topic 获取消费该topic的group
  2. 读取zookeeper上指定group和topic的消费情况,可以获取到clientId,CURRENT-OFFSET,patition。
  3. 通过consumer获取LogEndOffset(可见offset)
  4. 将2与3处信息合并,计算Lag

代码设计

引入两个依赖,主要是为了读取zookeeper节点上的数据

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>
  1. 步骤1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public Set<String> listTopicGroups(String topic) {
    Set<String> groups = new HashSet<>();
    List<String> allGroups = zkClient.getChildren("/consumers");
    allGroups.forEach(group -> {
    if (zkClient.exists("/consumers/" + group + "/offsets")) {
    Set<String> offsets = new HashSet<>(zkClient.getChildren("/consumers/" + group + "/offsets"));
    if (offsets.contains(topic)) {
    groups.add(group);
    }
    }
    });
    return groups;
    }
  2. 步骤2
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public Map<String, Map<String, String>> getZKConsumerOffsets(String groupId, String topic) {
    Map<String, Map<String, String>> result = new HashMap<>();
    String offsetsPath = "/consumers/" + groupId + "/offsets/" + topic;
    if (zkClient.exists(offsetsPath)) {
    List<String> offsets = zkClient.getChildren(offsetsPath);
    offsets.forEach(patition -> {
    try {
    String offset = zkClient.readData(offsetsPath + "/" + patition, true);
    if (offset != null) {
    Map<String, String> map = new HashMap<>();
    map.put("offset", offset);
    result.put(patition, map);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    });
    }
    String ownersPath = "/consumers/" + groupId + "/owners/" + topic;
    if (zkClient.exists(ownersPath)) {
    List<String> owners = zkClient.getChildren(ownersPath);
    owners.forEach(patition -> {
    try {
    try {
    String owner = zkClient.readData(ownersPath + "/" + patition, true);
    if (owner != null) {
    Map<String, String> map = result.get(patition);
    map.put("owner", owner);
    result.put(patition, map);
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    });
    }

    return result;
    }
  3. 步骤3


4. 步骤4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private List<TopicConsumerGroupState> getZKConsumerOffsets(String topic) {
final List<TopicConsumerGroupState> topicConsumerGroupStates = new ArrayList<>();
Set<String> zkGroups = this.listTopicGroups(topic);
zkGroups.forEach(group -> {
Map<String, Map<String, String>> zkConsumerOffsets = this.getZKConsumerOffsets(group,
topic);

Map<TopicPartition, Long> offsetsMap = this.consumerOffset(group, topic);

TopicConsumerGroupState topicConsumerGroupState = new TopicConsumerGroupState();
topicConsumerGroupState.setGroupId(group);
topicConsumerGroupState.setConsumerMethod("zk");

List<PartitionAssignmentState> partitionAssignmentStates = new ArrayList<>();

Map<String, Long> offsetsTempMap = new HashMap<>(offsetsMap.size());
offsetsMap.forEach((k, v) -> {
offsetsTempMap.put(k.partition() + "", v);
});

zkConsumerOffsets.forEach((patition, topicDesribe) -> {
Long logEndOffset = offsetsTempMap.get(patition);
String owner = topicDesribe.get("owner");
long offset = Long.parseLong(topicDesribe.get("offset"));
PartitionAssignmentState partitionAssignmentState = new PartitionAssignmentState();
partitionAssignmentState.setClientId(owner);
partitionAssignmentState.setGroup(group);
partitionAssignmentState.setLogEndOffset(logEndOffset);
partitionAssignmentState.setTopic(topic);
partitionAssignmentState.setOffset(offset);
partitionAssignmentState.setPartition(Integer.parseInt(patition));
partitionAssignmentState.setLag(getLag(offset, logEndOffset));

partitionAssignmentStates.add(partitionAssignmentState);
});
topicConsumerGroupState.setPartitionAssignmentStates(partitionAssignmentStates);
topicConsumerGroupStates.add(topicConsumerGroupState);
});
return topicConsumerGroupStates;
}

参考

  1. 如何获取Kafka的消费者详情——从Scala到Java的切换
  2. Kafka的Lag计算误区及正确实现

前言

前端页面权限在日常开发中,主要有以下:

  • 登录授权,用户没有登录只能访问登录页面,如果处于登录状态则跳转到当前用户的默认首页;

  • 路由授权,当前登录用户的角色,如果对一个 URL 没有权限访问,则跳转到 403 页面;

  • 数据授权,当访问一个没有权限的 API,则跳转到 403 页面;

  • 操作授权,当页面中某个按钮或者区域没有权限访问则在页面中隐藏

本文主要针对ICE飞冰模板,介绍下如何开发权限管理,主要是使用,不涉及任何架构设计及原理,关于原理可以参考源码分析,或者参考本文参考链接。

实践

首先先安装依赖,因为飞冰权限是使用Authorized 权限组件实现了基本的权限管理方案。

1
2
npm install antd --save
npm install ant-design-pro@latest --save

因为ant-design-pro依赖antd,因此需要首先安装antd

在开始之前需要新增两个util,一个设置登录权限,一个为了重新渲染权限

authority.js

1
2
3
4
5
6
7
8
9
10
11
12
13
// use localStorage to store the authority info, which might be sent from server in actual project.
export function getAuthority() {
return localStorage.getItem('ice-pro-authority') || 'admin';
}

export function setAuthority(authority) {
return localStorage.setItem('ice-pro-authority', authority);
}

export function removeAuthority() {
return localStorage.removeItem('ice-pro-authority');
}

Authorized.js

1
2
3
4
5
6
7
8
9
10
11
12
13
import RenderAuthorized from 'ant-design-pro/lib/Authorized';
import { getAuthority } from './authority';

let Authorized = RenderAuthorized(getAuthority()); // eslint-disable-line

// 更新权限
const reloadAuthorized = () => {
Authorized = RenderAuthorized(getAuthority());
};

export { reloadAuthorized };
export default Authorized;

阅读全文 »

资源

看过很多文档,经过精心整理对比,觉得以下几个对我来说最有帮助,在此记录一下,分享给更多的人。

  1. React 基础知识和介绍
  2. JavaScript 标准库
  3. React Getting Started

常识

props 用来传递数据,state 用来存储组件内部的状态和数据。props 是只读的,state 当前组件 state 的值可以作为 props 传递给下层组件

  1. React 组件的变化是基于状态的

    通过setState更改状态的内容

1
2
3
this.setState({
switchStatus: !this.state.switchStatus
});
  1. React 组件的生命周期

    更多详见地址

问题解决

1.this 作用域处理方案

问题背景:手动触发事件,更改state中数据

  1. 箭头表达式
    1
    2
    3
    4
    5
    6
    7
    mergeFilter = ()=> {
    this.setState({
    store: store,
    merge: !this.state.merge,
    });
    }

    render组件中调用
    1
    2
    3
    4
    5
    6
    7
    8
    <EuiSwitch
    label="Merge"
    key="MergeEuiSwitch"
    checked={
    this.state.merge
    }
    onChange={()=>this.mergeFilter()}
    />
    handlePaginationChange也是可以调用的
  2. bind
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
     constructor(props) {
    super(props);
    this.state = {value: 'coconut'};

    this.handleChange = this.handleChange.bind(this);
    }

    handleChange(event) {
    this.setState({value: event.target.value});
    }

    render() {
    return (
    <form onSubmit={this.handleSubmit}>
    <select value={this.state.value} onChange={this.handleChange}>
    <option value="grapefruit">Grapefruit</option>
    </select>
    <input type="submit" value="Submit" />
    </form>
    );
    }

2.传值

这里的传值是组件化传值,属性在组件中是可以自定义的。
在父组件中添加子组件后,可以添加任意属性,如下的dialogObj:

1
<DetailDialog dialogObj={dialogObj} } />

对于在子组件中可以在构造方法中获取该属性

1
2
3
4
5
6
constructor(props) {
super(props);
this.state = {
dialogObj: this.props.dialogObj,
};
}

对于父组件该属性发生变化,如何更新呢,这里利用生命周期中的componentWillReceiveProps,这样即可获悉到属性的变化。

1
2
3
4
5
6
// 接受父类props改变,修改子类中的属性
componentWillReceiveProps(nextProps) {
this.setState({
dialogObj: nextProps.dialogObj,
});
}

3.父调用子函数

在开发的场景中,经常存在需要在父组件中触发子组件方法

通过属性传值能解决业务问题的,优先建议使用属性传值解决。如果不能解决,可以考虑使用ref解决调用问题

例如:

1
2
3
4
5
6
7
<SimpleFormDialog ref="getDialog">

父组件方法
parentMethod = () => {
//调用组件进行通信
this.refs.getDialog.childMethod();
}

4.子调用父函数

在开发的场景中,经常存在需要在子组件中触发父组件方法

在添加子组件的时候使用如下方式

1
<DetailDialog hideDetailDialog={this.hideDetailDialog} />

类似于属性传递,方法也可以传递的,这样在子组件中方法调用一下即可:

1
2
3
hideDetailDialog = () => {
this.props.hideDetailDialog();
}

高阶学习

react-route

略,详见简明React Router v4教程

react-redux

略,详见redux

参考

  1. he-select-tag
  2. React 基础知识和介绍
  3. React Refs

1. KSQL是什么?

KSQL是Apache Kafka的流式SQL引擎,让你可以SQL语方式句执行流处理任务。KSQL降低了数据流处理这个领域的准入门槛,为使用Kafka处理数据提供了一种简单的、完全交互的SQL界面。你不再需要用Java或Python之类的编程语言编写代码了!KSQL具有这些特点:开源(采用Apache 2.0许可证)、分布式、可扩展、可靠、实时。它支持众多功能强大的数据流处理操作,包括聚合、连接、加窗(windowing)和sessionization(捕获单一访问者的网站会话时间范围内所有的点击流事件)等等。

2. KSQL能解决什么问题?

  • 流式ETL

Apache Kafka是为数据管道的流行选择。 KSQL使得在管道中转换数据变得简单,准备好消息以便在另一个系统中干净地着陆。

  • 实时监控和分析

通过快速构建实时仪表板,生成指标以及创建自定义警报和消息,跟踪,了解和管理基础架构,应用程序和数据源。

  • 数据探索和发现

在Kafka中导航并浏览您的数据。

  • 异常检测

通过毫秒级延迟识别模式并发现实时数据中的异常,使您能够正确地表现出异常事件并分别处理欺诈活动。

  • 个性化

为用户创建数据驱动的实时体验和洞察力。

  • 传感器数据和物联网

理解并提供传感器数据的方式和位置。

  • 客户360视图

通过各种渠道全面了解客户的每一次互动,实时不断地整合新信息。

3.KSQL架构及组件

  • KSQL服务器

KSQL服务器运行执行KSQL查询的引擎。这包括处理,读取和写入目标Kafka集群的数据。 KSQL服务器构成KSQL集群,可以在容器,虚拟机和裸机中运行。您可以在实时操作期间向/从同一KSQL群集添加和删除服务器,以根据需要弹性扩展KSQL的处理能力。您可以部署不同的KSQL集群以实现工作负载隔离。

  • KSQL CLI

你可以使用KSQL命令行界面(CLI)以交互方式编写KSQL查询。 KSQL CLI充当KSQL服务器的客户端。对于生产方案,您还可以将KSQL服务器配置为以非交互式“无头”配置运行,从而阻止KSQL CLI访问。

KSQL服务器,客户端,查询和应用程序在Kafka brokers之外,在单独的JVM实例中运行,或者在完全独立的集群中运行。

4. 入门教程

4.1 数据类型及术语

4.1.1 数据类型

  • BOOLEAN
  • INTEGER
  • BIGINT
  • DOUBLE
  • VARCHAR (or STRING)
  • ARRAY (JSON and AVRO only. Index starts from 0)
  • MAP<VARCHAR, ValueType> (JSON and AVRO only)
  • STRUCT<FieldName FieldType, …> (JSON and AVRO only) The STRUCT type requires you to specify a list of fields.

4.1.2 术语

  • Stream

流是结构化数据的无界序列。例如,我们可以进行一系列金融交易,例如“Alice向Bob发送了100美元,然后Charlie向Bob发送了50美元”。流中的事实是不可变的,这意味着可以将新事实插入到流中,但是现有事实永远不会被更新或删除。可以从Kafka主题创建流,也可以从现有流派生流。流的基础数据在topic中持久存储(持久化)

  • Table

表是流或其他表的视图,表示不断变化的事实的集合。例如,我们可以有一个表格,其中包含最新的财务信息,例如“Bob的当前账户余额为150美元”。它相当于传统的数据库表,但通过流式语义(如窗口)进行了丰富。表中的事实是可变的,这意味着可以将新事实插入表中,并且可以更新或删除现有事实。可以从Kafka主题创建表,也可以从现有流和表派生表。在这两种情况下,表的基础数据都在topic中持久存储(持久化)。

  • STRUCT

在KSQL 5.0及更高版本中,您可以使用CREATE STREAM和CREATE TABLE语句中的STRUCT类型以Avro和JSON格式读取嵌套数据。

CREATE STREAM/TABLE (from a topic)

CREATE STREAM/TABLE AS SELECT (from existing streams/tables)

SELECT (non-persistent query)

例如

1
2
3
4
5
CREATE STREAM orders (
orderId BIGINT,
address STRUCT<street VARCHAR, zip INTEGER>) WITH (...);

SELECT address->city, address->zip FROM orders;
  • KSQL Time Units
  1. DAY, DAYS
  2. HOUR, HOURS
  3. MINUTE, MINUTES
  4. SECOND, SECONDS
  5. MILLISECOND, MILLISECONDS

4.2 如何使用?

目前confluent KSQL支持两种方式,一种是ksql-cli,一种是rest

ksql-cli

这里我仅演示使用docker方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
sudo docker run -it --rm  confluentinc/cp-ksql-cli:5.1.2  http://192.168.0.11:8088
[sudo] password for bigdata:

===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.1.2, Server v5.1.2 located at http://192.168.0.11:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> show streams;

Stream Name | Kafka Topic | Format
------------------------------------------------------
EC_BOT_DETECTION | EC_bot_detection.replica | JSON
DEMO | trumantest | JSON
------------------------------------------------------

rest

1
2
3
4
curl --request POST \
--url http://192.168.0.11:8088/ksql \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
--data '{\r\n "ksql": "LIST STREAMS;",\r\n "streamsProperties": {}\r\n}'

rest主要有两个接口1.ksql2.query,对于一般查询语句可以使用query接口。其他的命令操作一般使用ksql接口。

4.3 快速指南

4.3.1 常用支持命令

命令 描述
CREATE STREAM
CREATE TABLE
CREATE STREAM AS SELECT
CREATE TABLE AS SELECT
INSERT INTO
DESCRIBE
DESCRIBE FUNCTION
EXPLAIN
DROP STREAM [IF EXISTS] [DELETE TOPIC];
DROP TABLE [IF EXISTS] [DELETE TOPIC];
PRINT
SELECT 在select下支持更多sql语法,这里不一一列出
SHOW FUNCTIONS
SHOW TOPICS
SHOW STREAMS
SHOW TABLES
SHOW QUERIES 列出持久化查询

4.3.2 KSQL Language Elements

  • DDL(数据定义语言)

DDL包括以下:

  1. CREATE STREAM
  2. CREATE TABLE
  3. DROP STREAM
  4. DROP TABLE
  5. CREATE STREAM AS SELECT (CSAS)
  6. CREATE TABLE AS SELECT (CTAS)
  • DML(数据处理语言)

DML包括以下:

  1. SELECT
  2. INSERT INTO
  3. CREATE STREAM AS SELECT (CSAS)
  4. CREATE TABLE AS SELECT (CTAS)

4.3.3 Time and Windows in KSQL

KSQL支持时间窗口操作,以下是目前支持的几种类型:

Window type 行为 描述
Hopping Window 基于时间 固定持续时间,重叠的窗口
Tumbling Window 基于时间 固定持续时间,非重叠,无间隙窗口
Session Window 基于session 动态大小,不重叠,数据驱动的窗口

4.4 使用案例

案例场景

这里我演示一个反爬虫检测的案例, 根据EC_bot_detection的数据抓取一分钟内超过限制的IP.

创建stream

根据消费kafka中EC_bot_detection.replica数据,定义不同字段的数据类型,按照该数据定义语句如下

1
CREATE STREAM ec_bot_detection (action VARCHAR, nvtc VARCHAR,nid VARCHAR,ip VARCHAR,msg VARCHAR,level VARCHAR,verb VARCHAR,url VARCHAR,purl VARCHAR,ua VARCHAR,valid INTEGER,nNvtc VARCHAR,nNid VARCHAR,cip VARCHAR,sip VARCHAR,xpost VARCHAR,time VARCHAR,id VARCHAR,xprotocol VARCHAR,xhost VARCHAR,xport VARCHAR,xpath VARCHAR,xquery VARCHAR,xpathalias VARCHAR,xkey VARCHAR,importance INTEGER,xpprotocol VARCHAR,xphost VARCHAR,xpport VARCHAR,xppath VARCHAR,xpquery VARCHAR,org VARCHAR,cyc VARCHAR,isp VARCHAR,cycName VARCHAR,region VARCHAR,city VARCHAR,geoLocation VARCHAR,zipcode VARCHAR) WITH (VALUE_FORMAT = 'JSON',KAFKA_TOPIC = 'EC_bot_detection.replica');
自定义sql

查询语句
bot_detection_1min

1
SELECT ip,org,cyc,count(1) as ccount FROM EC_BOT_DETECTION WINDOW TUMBLING (SIZE 60 SECONDS) WHERE xhost not like '%.tw' and level='-1' AND importance=1 AND (msg<>'W' OR msg is null) and action like'crawler%' AND ip NOT LIKE '127.0.0.1' AND ip NOT LIKE '172.16.%' AND ip <> ''  AND xpathalias !='LandingpageOverviewcontent4mobile'  AND xpathalias !='CommonCommonrecaptchavalidate'  GROUP BY ip,org,cyc HAVING count(1)>300;
输出结果

为了将查询结果存储到kafka中,我们可以使用

CREATE TABLE TableName as SELECT …

1
2
CREATE TABLE bot_detection_1min as SELECT ip,org,cyc,count(1) as ccount FROM EC_BOT_DETECTION WINDOW TUMBLING (SIZE 60 SECONDS) WHERE xhost not like '%.tw' and level='-1' AND importance=1 AND (msg<>'W' OR msg is null) and action like'crawler%' AND ip NOT LIKE '127.0.0.1' AND ip NOT LIKE '172.16.%' AND ip <> ''  AND xpathalias !='LandingpageOverviewcontent4mobile'  AND xpathalias !='CommonCommonrecaptchavalidate'  GROUP BY ip,org,cyc HAVING count(1)>300;

总结
  1. having不支持别名,可以直接使用聚合函数。例如 HAVING count(1)>300
  2. count(distinct xkey)不支持,可以使用自定义函数(array_length)实现数组求大小。例如 array_length(COLLECT_SET(xkey))
  3. not in不支持,可以使用!=替换

4.5 高阶应用之自定义函数

目前官方提供的函数不足以满足业务场景的需求情况下,我们可以扩展自定义函数udf/udaf

4.5.1 udf函数

用户自定义函数,可以实现针对column操作的函数,例如以下案例,实现字母转大写

1
2
3
4
5
6
7
8
@UdfDescription(name = "upper", description = "字符串转大写")
public class Upper {
@Udf
public String upper(final String column) {
return column.toUpperCase();
}

}

4.5.2 udaf函数

用户自定义聚合函数,以下是聚合求百分比函数demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@UdafDescription(name = "percent", description = "聚合求百分比")
public class Percent {
@UdafFactory(description = "statistics percent")
public static Udaf<Integer, Map<String, Double>> createStddev() {
return new Udaf<Integer, Map<String, Double>>() {
@Override
public Map<String, Double> initialize() {
Map<String, Double> stats = new HashMap<>();
stats.put("sum", 0.0);
stats.put("bussiness", 0.0);
stats.put("percent", 0.0);
return stats;

}

@Override
public Map<String, Double> aggregate(final Integer val, final Map<String, Double> aggregateValue) {
Double sum = aggregateValue.getOrDefault("sum", 0.0) + 1;
Double bussiness = aggregateValue.getOrDefault("bussiness", 0.0);
if (val % 4 == 0) {
bussiness = bussiness + 1;
}

Double percent = bussiness / sum;

Map<String, Double> agg = new HashMap<>();
agg.put("sum", sum);
agg.put("bussiness", bussiness);
agg.put("percent", percent);
return agg;
}

@Override
public Map<String, Double> merge(final Map<String, Double> aggOne, final Map<String, Double> aggTwo) {

Double sum = aggOne.getOrDefault("sum", 0.0) + aggTwo.getOrDefault("sum", 0.0);
Double bussiness = aggOne.getOrDefault("bussiness", 0.0) + aggTwo.getOrDefault("bussiness", 0.0);
Double percent = bussiness / sum;

Map<String, Double> agg = new HashMap<>();
agg.put("sum", sum);
agg.put("bussiness", bussiness);
agg.put("percent", percent);
return agg;
}
};

}
}

5. 已知问题

通过两周ksql的使用,现发现有如下问题不能满足

  • distinct
    1
    2
    select distinct ip from #table;
    select count(distinct ip) from #table
  • sub query
    1
    select ip from (select * from table);
  • case when
    1
    sum(case when nvtc='' then 1 else 0 end)
  • in/not in
    1
    xpathalias not in ('Landingpage','Wishlist','CommonMessagepage','Common')

6.参考

  1. KSQL
  2. KSQL REST API Reference
  3. KSQL Syntax Reference
  4. Time and Windows in KSQL
  5. KSQL Custom Function Reference (UDF and UDAF)

前言

最近开源了一个redis内存分析一站式平台RCT,想让更多的人指定我们这个项目,正好看到同类几个开源项目,就想着爬取关注的用户,给他们发邮件,推广一下我们的项目。

实践

因为本地linux机器安装的python版本为2.7.0,因此使用python2语法

安装依赖

1
$ pip install PyGithub

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from github import Github
import os
import codecs
def create(newfile):
if not os.path.exists(newfile):
f = open(newfile,'w')
print newfile
f.close()
return
fileName='rct.txt'
respository='xaecbd/RCT'
g=Github("token",timeout=300)

repo=g.get_repo(respository)
stargazers=repo.get_stargazers_with_dates()
create(fileName)
with codecs.open(fileName,'a', 'utf-8') as f:
for people in stargazers:
print people.user.email
if people.user.email is not None:
if people.user.name is None:
name = people.user.email
else:
name = people.user.name
data=(name)+":"+(people.user.email)+"\r\n"
f.write(data)

运行

1
python crawer.py

总结

在运行过程中发现关于用户数据是输出的时候才再去github查询的,这也就意味着,及时stargazers已经获取,还会继续去调用github api。可能会存在网络问题。

参考

  1. 怎样通过GitHub API获取某个项目被Star的详细信息
  2. PyGithub

前言

目前我们EC Bigdata team 运维公司 4个 Redis 集群,300+ Redis 实例,500G+ 的内存数据,我们想要分析业务是否有误用,以提高资源利用率。伴随着业务team的广泛使用,近期数据增 长比较快,我们紧迫需要一个工具分析一下各种业务存储的数据有多大,是否存入僵死数据浪费资源;同时E4 WWW redis 集群有业务方反馈近期有比较明显的慢查询发生,所以我们需要针对 slow log 和存入的常用数据类型Hash,List,Set分析,是否有big key引起慢查询,是否有team存在超大的 big key和不合理设置ttl的情况

那有没有什么办法让我们安全高效的看到 Redis 内存消耗的详细报表呢?办法总比问题多,有需求就有解决方案。EC Bigdata team针对这个问题实现了一个 Redis 内存数据可视化分析平台 RCT (Redis Computed Tomography)。

RCT可以非常方便的对 Reids 的内存进行分析,了解一个 Redis 实例里都有哪些 key,哪类 key 占用的空间是多少,最耗内存的 key 有哪些,占比如何,非常直观,除此之外,我们还可以针对Redis slowlog/clientlist进行分钟级别监控,直观监控集群效应状况。

同类产品

  1. redis-rdb-tools
  2. rdr
  3. redis-rdb-cli

市面上已经存在这么多开源的产品,我们为什么还要重新做一个呢?主要还是没有满足我们的需求,以上的都是redis rdb解析工具,最接近我们需求的就是雪球开源的rdr,但是也只限于离线分析,而我们节点众多,不可能一个一个去线上机器copy,或者开发copy工具,这样也会给相应机器带来网络热点(总有办法解决这个问题),带来很多繁重无意义的劳动,么不是专门的运维工程师,还有更多的开发任务等着我们去做。

我们想要一个每天都会给我们产生报表,自动推送到我们的邮箱,或者在网页上就能看到最近redis内存的变化,它不仅仅是一个工具,还是常态化运行的服务,基于此我们打造了属于最近的redis内存分析平台RCT。

理论

设计思路

使用 bgsave,获取 rdb 文件,解析后获取数据。

优点:机制成熟,可靠性好;文件相对小,传输、解析效率高;

缺点:bgsave 虽然会 fork 子进程,但还是有可能导致主进程卡住一段时间,对业务有产生影响的风险;

采用低峰期在从节点做 bgsave 获取 rdb 文件,相对安全可靠。拿到了 rdb 文件就相当于拿到了 Redis 实例的所有数据,接下来就是生成报表的过程了:

解析 rdb 文件,获取到 Key 和 Value 的内容;根据相对应的数据结构及内容,估算内存消耗等;统计并生成报表;逻辑很简单,所以设计思路很清晰。

数据流图

slave节点均分计算

为了使对线上机器不产生影响,我们选择是在slave节点进行rdb文件分析,该任务是分布式的。为了均衡对每个机器的影响,通过算法去保证slave分配算法均匀的落在不同的机器上。

算法思路

  1. slave数量最小的优先分配
  2. 通过map存储不同IP分配的数量,按照规则,优先分配数量最小的IP
  3. staticsResult里面不存在的IP优先分配
  4. 如果上面未分配,则选择staticsResult中数值最小的那个slave

算法代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* 根据<master:slaves>获取执行分析任务ports规则
* 即获取其中一个slave,尽量保持均衡在不同机器上
*
* @param clusterNodesMap
* @return <ip:ports>
*/
public static Map<String, Set<String>> generateAnalyzeRule(Map<String, List<String>> clusterNodesMap) {

// 通过该map存储不同IP分配的数量,按照规则,优先分配数量最小的IP
Map<String, Integer> staticsResult = new HashMap<>();
Map<String, Set<String>> generateRule = new HashMap<>();

// 此处排序是为了将slave数量最小的优先分配
List<Map.Entry<String, List<String>>> sortList = new LinkedList<>(clusterNodesMap.entrySet());
Collections.sort(sortList, new Comparator<Entry<String, List<String>>>() {
@Override
public int compare(Entry<String, List<String>> o1, Entry<String, List<String>> o2) {
return o1.getValue().size() - o2.getValue().size();
}
});

for (Entry<String, List<String>> entry : sortList) {
List<String> slaves = entry.getValue();
boolean isSelected = false;
String tempPort = null;
String tempIP = null;
int num = 0;
for (String slave : slaves) {
String ip = slave.split(":")[0];
String port = slave.split(":")[1];
// 统计组里面不存在的IP优先分配
if (!staticsResult.containsKey(ip)) {
staticsResult.put(ip, 1);
Set<String> generatePorts = generateRule.get(ip);
if (generatePorts == null) {
generatePorts = new HashSet<>();
}
generatePorts.add(port);
generateRule.put(ip, generatePorts);
isSelected = true;
break;
} else {
// 此处是为了求出被使用最少的IP
Integer staticsNum = staticsResult.get(ip);
if (num == 0) {
num = staticsNum;
tempPort = port;
tempIP = ip;
continue;
}
if (staticsNum < num) {
tempPort = port;
tempIP = ip;
num = staticsNum;
}
}

}

// 如果上面未分配,则选择staticsResult中数值最小的那个slave
if (!isSelected) {
if (slaves != null && slaves.size() > 0) {
if (tempPort != null) {
Set<String> generatePorts = generateRule.get(tempIP);
if (generatePorts == null) {
generatePorts = new HashSet<>();
}
generatePorts.add(tempPort);
generateRule.put(tempIP, generatePorts);
staticsResult.put(tempIP, staticsResult.get(tempIP) + 1);
}
}
}
}
return generateRule;
}

Jemalloc内存预分配

算法思路

redis中支持多种内存分配算法,推荐Jemalloc,因此我们选择该算法来预估内存大小。因为这个算法比较复杂,我们参考雪球做法使用约定数组,根据不同数据大小分配不同内存空间,为了提高查找效率,这块采用了变种二分查找。

算法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public class Jemalloc {

private static long[] array16;

private static long[] array192;

private static long[] array768;

private static long[] array4096;

private static long[] array4194304;

static {
array16 = range(16, 128 + 1, 16);

array192 = range(192, 512 + 1, 64);

array768 = range(768, 4096 + 1, 256);

array4096 = range(4096, 4194304 + 1, 4096);

array4194304 = range(4194304, 536870912 + 1, 4194304);

}

/**
* 根据Jemalloc 估算分配内存大小 Small: All 2^n-aligned allocations of size 2^n will incur
* no additional overhead, due to how small allocations are aligned and packed.
* Small: [8], [16, 32, 48, ..., 128], [192, 256, 320, ..., 512], [768, 1024,
* 1280, ..., 3840] Large: The worst case size is half the chunk size, in which
* case only one allocation per chunk can be allocated. If the remaining
* (nearly) half of the chunk isn't otherwise useful for smaller allocations,
* the overhead will essentially be 50%. However, assuming you use a diverse
* mixture of size classes, the actual overhead shouldn't be a significant issue
* in practice. Large: [4 KiB, 8 KiB, 12 KiB, ..., 4072 KiB] Huge: Extra virtual
* memory is mapped, then the excess is trimmed and unmapped. This can leave
* virtual memory holes, but it incurs no physical memory overhead. Earlier
* versions of jemalloc heuristically attempted to optimistically map chunks
* without excess that would need to be trimmed, but it didn't save much system
* call overhead in practice. Huge: [4 MiB, 8 MiB, 12 MiB, ..., 512 MiB]
*
* @param size
* @return
*/
public static long assign(long size) {
if (size <= 4096) {
// Small
if (is_power2(size)) {
return size;
} else if (size < 128) {
return min_ge(array16, size);
} else if (size < 512) {
return min_ge(array192, size);
} else {
return min_ge(array768, size);
}
} else if (size < 4194304) {
// Large
return min_ge(array4096, size);
} else {
// Huge
return min_ge(array4194304, size);
}
}

/**
* 创建一个long数组
*
* @param start
* @param stop
* @param step
* @return
*/
public static long[] range(int start, int stop, int step) {
int size = (stop - 1 - start) / step + 1;
long[] array = new long[size];
int index = 0;
for (int i = start; i < stop; i = i + step) {
array[index] = i;
index++;
}
return array;
}

public static long min_ge(long[] srcArray, long key) {
int index = binarySearch(srcArray, key);
return srcArray[index];
}

// 二分查找最小值,即最接近要查找的值,但是要大于该值
public static int binarySearch(long srcArray[], long key) {
int mid = (0+srcArray.length-1) / 2;
if (key == srcArray[mid]) {
return mid;
}

if (key > srcArray[mid] && key <= srcArray[mid + 1]) {
return mid + 1;
}

int start = 0;
int end = srcArray.length - 1;
while (start <= end) {
mid = (end - start) / 2 + start;
if (key == srcArray[mid]) {
return mid;
}
if (key > srcArray[mid] && key <= srcArray[mid + 1]) {
return mid + 1;
}
if (key < srcArray[mid]) {
end = mid - 1;
}
if (key > srcArray[mid]) {
start = mid + 1;
}
}
return 0;
}

public static boolean is_power2(long size) {
if (size == 0) {
return false;
}

if ((size & (size - 1)) == 0) {
return true;
}

return false;
}
}

Redis不同数据结构预估

算法思路

详见redis源码

算法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
private static final long redisObject = (long)(8 + 8);
// 一个dictEntry,24字节,jemalloc会分配32字节的内存块
private static final long dicEntry = (long)(2 * 8 + 8 + 8);
private static final String patternString = "^[-\\\\+]?[\\\\d]*$";

private static long skiplistMaxLevel = 32;
private static long redisSharedInterges = 10000;
private static long longSize = 8;
private static long pointerSize = 8;

/**
* 一个SDS结构占据的空间为:free所占长度+len所占长度+ buf数组的长度=4+4+len+1=len+9
*
* @param length
* @return
*/
private static long sds(long length) {
long mem = 9 + length;
return mem;
}

/**
*
* 计算 string byte 大小
*
* @param kv
* https://searchdatabase.techtarget.com.cn/wp-content/uploads/res/database/article/2011/2011-11-14-16-56-18.jpg
* @return
*/
public static long CalculateString(KeyStringValueString kv) {
long mem = KeyExpiryOverhead(kv);
mem = dicEntry + SizeofString(kv.getRawKey());
mem = mem + redisObject + SizeofString(kv.getValueAsString());
return mem;
}

public static long CalculateLinkedList(KeyStringValueList kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsStringList().size();
mem = mem + LinkedListEntryOverhead() * length;
mem = mem + LinkedlistOverhead();
mem = mem + redisObject * length;
for (String value : kv.getValueAsStringList()) {
mem = mem + SizeofString(value);
}

return mem;
}

public static long CalculateZipList(KeyStringValueList kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + dicEntry;
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
long length = kv.getValueAsStringList().size();
mem = mem + ZiplistOverhead(length);

for (String value : kv.getValueAsStringList()) {
mem = mem + ZiplistAlignedStringOverhead(value);
}

return mem;
}

public static long CalculateHash(KeyStringValueHash kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsHash().size();
mem = mem + HashtableOverhead(length);

for (String key : kv.getValueAsHash().keySet()) {
String value = kv.getValueAsHash().get(key);
mem = mem + SizeofString(key);
mem = mem + SizeofString(value);
mem = mem + 2 * redisObject;
mem = mem + HashtableEntryOverhead();
}

return mem;
}

public static long CalculateSet(KeyStringValueSet kv) {
long mem = KeyExpiryOverhead(kv);

mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsSet().size();
mem = mem + HashtableOverhead(length);
mem = mem + redisObject * length;

for (String value : kv.getValueAsSet()) {
mem = mem + SizeofString(value);
mem = mem + 2 * redisObject;
mem = mem + HashtableEntryOverhead();
}

return mem;
}

public static long CalculateIntSet(KeyStringValueSet kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + dicEntry;
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;

long length = kv.getValueAsSet().size();
mem = mem + IntsetOverhead(length);

for (String value : kv.getValueAsSet()) {
mem = mem + ZiplistAlignedStringOverhead(value);
}

return mem;
}

public static long CalculateZSet(KeyStringValueZSet kv) {
long mem = KeyExpiryOverhead(kv);

mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsSet().size();
mem = mem + SkiplistOverhead(length);
mem = mem + redisObject * length;

for (ZSetEntry value : kv.getValueAsZSet()) {
mem = mem + 8;
mem = mem + SizeofString(value.getElement());
// TODO 还有个 score
mem = mem + 2 * redisObject;
mem = mem + SkiplistEntryOverhead();
}

return mem;
}

// TopLevelObjOverhead get memory use of a top level object
// Each top level object is an entry in a dictionary, and so we have to include
// the overhead of a dictionary entry
public static long TopLevelObjOverhead() {
return HashtableEntryOverhead();
}

/**
* SizeofString get memory use of a string
* https://github.com/antirez/redis/blob/unstable/src/sds.h
*
* @param bytes
* @return
*/
public static long SizeofString(byte[] bytes) {
String value = new String(bytes);

if (isInteger(value)) {
try {
Long num = Long.parseLong(value);
if (num < redisSharedInterges && num > 0) {
return 0;
}
return 8;
} catch (NumberFormatException e) {
}
}

return Jemalloc.assign(sds(bytes.length));
}


public static long SizeofString(String value) {
if (isInteger(value)) {
try {
Long num = Long.parseLong(value);
if (num < redisSharedInterges && num > 0) {
return 0;
}
return 8;
} catch (NumberFormatException e) {
}
}

return Jemalloc.assign(sds(value.length()));
}

public static long DictOverhead(long size) {
return Jemalloc.assign(56 + 2*pointerSize + nextPower(size) * 3*8);
}

public static boolean isInteger(String str) {
Pattern pattern = Pattern.compile(patternString);
return pattern.matcher(str).matches();
}

/**
* 过期时间也是存储为一个 dictEntry,时间戳为 int64;
*
* @param kv
* @return
*/
// KeyExpiryOverhead get memory useage of a key expiry
// Key expiry is stored in a hashtable, so we have to pay for the cost of a
// hashtable entry
// The timestamp itself is stored as an int64, which is a 8 bytes
@SuppressWarnings("rawtypes")
public static long KeyExpiryOverhead(KeyValuePair kv) {
// If there is no expiry, there isn't any overhead
if (kv.getExpiredType() == ExpiredType.NONE) {
return 0;
}
return HashtableEntryOverhead() + 8;
}

public static long HashtableOverhead(long size) {
return 4 + 7 * longSize + 4 * pointerSize + nextPower(size) * pointerSize * 3 / 2;
}

// HashtableEntryOverhead get memory use of hashtable entry
// See https://github.com/antirez/redis/blob/unstable/src/dict.h
// Each dictEntry has 2 pointers + int64
public static long HashtableEntryOverhead() {
return 2 * pointerSize + 8;
}

public static long ZiplistOverhead(long size) {
return Jemalloc.assign(12 + 21 * size);
}

public static long ZiplistAlignedStringOverhead(String value) {
try {
Long.parseLong(value);
return 8;
} catch (NumberFormatException e) {
}
return Jemalloc.assign(value.length());
}

// LinkedlistOverhead get memory use of a linked list
// See https://github.com/antirez/redis/blob/unstable/src/adlist.h
// A list has 5 pointers + an unsigned long
public static long LinkedlistOverhead() {
return longSize + 5 * pointerSize;
}

// LinkedListEntryOverhead get memory use of a linked list entry
// See https://github.com/antirez/redis/blob/unstable/src/adlist.h
// A node has 3 pointers
public static long LinkedListEntryOverhead() {
return 3 * pointerSize;
}

// SkiplistOverhead get memory use of a skiplist
public static long SkiplistOverhead(long size) {
return 2 * pointerSize + HashtableOverhead(size) + (2 * pointerSize + 16);
}

// SkiplistEntryOverhead get memory use of a skiplist entry
public static long SkiplistEntryOverhead() {
return HashtableEntryOverhead() + 2 * pointerSize + 8 + (pointerSize + 8) * zsetRandLevel();
}

public static long nextPower(long size) {
long power = 1;
while (power <= size) {
power = power << 1;
}
return power;
}

public static long zsetRandLevel() {
long level = 1;
int rint = new Random().nextInt(65536);
int flag = 65535 / 4;
while (rint < flag) {// skiplistP
level++;
rint = new Random().nextInt(65536);
}
if (level < skiplistMaxLevel) {
return level;
}
return skiplistMaxLevel;
}

public static long IntsetOverhead(long size) {
// typedef struct intset {
// uint32_t encoding;
// uint32_t length;
// int8_t contents[];
// } intset;
return (4 + 4) * size;
}

RCT分析redis rdb

RCT是一个一站式redis内存分析分析平台,分析任务是分布式的,需要将RCT-Analyze部署到rdb所在机器上,RCT-Dashboard部署在任何机器,只要能保持和RCT-Analyze通信即可。不同于以上列举的工具,我们最初定位RCT就是一个可以长期运行,尽可能每天分析redis中数据,为redis运维人员提供运维依据,便于做出更好的规范,高效的使用redis。

部署

部署过程略,详见官方文档,推荐使用docker方式,这样也是我们目前采用的方式。

配置分析任务

新增RDB分析配置

只有先创建了redis节点之后,才能进入到RCT工具导航页面。

  1. 点击导航RDB Analyze
  2. 如若一直没有添加过RDB信息,则可在页面弹出的框中进行完善信息,或者点击下方的Add按钮进行添加
  3. 点击edit则可以对RDB 信息进行修改
  4. 如若已经完善了RDB信息,则点击打开switch开关,则是对RDB直接进行定时任务的开启和关闭
  5. 点击Analyze,则是对RDB信息进行手动分析,分析进行的状态可在status中查看,可以通过status后面的链接进入到实时查看rdb分析任务的进度状态

RDB Add页面参数说明

  1. Automatic Analyze:是否开启定时任务
  2. Schedule:cron 表达式(填写完成之后,可以点击右侧的图标进行查看定时表达式执行的时间)
  3. Analyzer:分析器 (依次是生成报表,根据filter导key到elasticsearch中,根据preix导key到elasticsearch中)
  4. Data Path:rdb文件的目录(eg:/opt/app/redis/9002/dump.rdb,data path应为/opt/app/redis)
  5. Prefixes:key的前缀,可为空,但是在选择了分析器中的根据preix导key,则必须填写prefixes。我们强烈建议将已知前缀填入,可以提高分析效率,节省时间。
  6. Report:是否生成报表
  7. Mail:生成报表之后的收件人,平台将会将报表做为附件发送给收件人,如果收件人有多个,请用;隔开

RDB 主页面参数说明

RDB主页面参数与add页面参数基本相同,在此不再做赘述,唯一有区别的是:
Status:分析RDB文件的进度状态,分为成功,正在分析,失败三种状态,status为正在分析状态时,可通过点击后面的链接进入到实时分析页面,查看实时状态。

分析器介绍

  1. 生成报表:对rdb文件的数据进行分析并将结果写入数据库中,如果配置了Report,结果会以excel报表的形式发送邮件给用户
  2. 根据filter导key:在对rdb文件分析的时候,根据过滤器导出相应的key,将数据写入到EleasticSearch中
  3. 根据prefix导key:在对rdb文件分析的时候,根据制定的前缀key导出相应的key,将数据写入到EleasticSearch中

手动分析

点击Analyze,则是对RDB信息进行手动分析,分析进行的状态可在status中查看,可以通过status后面的链接进入到实时查看rdb分析任务的进度状态

查看报表

目前RCT支持dashboard/email两种方式,这里我仅展示dashboard.
image

项目地址

https://github.com/xaecbd/RCT,欢迎使用,欢迎加入我们,帮我们提高RCT。如果你有问题,可以前往github新建issue。

参考

  1. Redis持久化文件RDB的格式解析
  2. redis-rdb-tools
  3. rdr
  4. analysis-redis
  5. Redis内存模型

JVM的基本结构

一.类加载器(ClassLoader)

其作用是在程序运行时,将编译好的.class字节码文件装载到JVM的内存区域中.如下图所示流程,Java源码被编译器编译为字节码文件,字节码文件被类加载器加载到数据运行时区域(其实就是内存空间当中),然后再由执行引擎执行.class文件中的字节码指令.

二.执行引擎

执行.class字节码文件中的指令集,如果想了解class中的字节码指令,可以参考<<深入分析Java Web技术内幕>>的第5章深入class文件结构.

三.本地库接口(本地方法库)

我的理解这是JVM与本地操作系统交互的接口,调用一些由C语言等编写的本地方法,一般的开发者并不用细纠.

四.JVM内存区(运行时数据区)

这是JVM中非常重要的一部分,是Java程序运行时JVM所分配的内存区域,绝大部分开发者关注的重点都在此.

JVM的内存区域分为5大块,如下图所示.

1.虚拟机栈(Stack)

一般俗称栈区,是线程私有的.栈区一般与线程紧密相联,一旦有新的线程被创建,JVM就会为该线程分配一个对应的java栈区,在这个栈区中会有许多栈帧,每运行一个方法就创建一个栈帧,用于存储局部变量,方法返回值等.栈帧中存储的局部变量随着线程的结束而结束,其生命周期取决于线程的生命周期,所以讲java栈中的变量都是线程私有的.

2.堆(Heap)

真正存储对象的区域,当进行Object obj = new Object()这样一个操作时,真正的obj对象实例就会在heap中.

3.方法区(Method Area)

包含常量池,静态变量等,有人说常量池也属于heap的一部分,但是严格上讲方法区只是堆的逻辑部分,方法区还有个别名叫做非堆(non-heap),所以方法区和堆还是有不同的.

4.程序计数器(Program Couter Register)

用于保存当前线程的执行的内存地址.因为JVM是支持多线程的,多线程同时执行的时候可能会轮流切换,为了保证线程切换回来后还能恢复到原先状态,就需要一个独立的计数器,记录之前中断的位置,由此可以看出程序计数器也是线程私有的.

5.本地方法栈(Native Method Stack)

性质与虚拟机栈类似,是为了方便JVM去调用本地方法接口的栈区,此处开发者很少去关注,我也是了解有限,因此不深入探究其作用.

0%