前言

目前我们EC Bigdata team 运维公司 4个 Redis 集群,300+ Redis 实例,500G+ 的内存数据,我们想要分析业务是否有误用,以提高资源利用率。伴随着业务team的广泛使用,近期数据增 长比较快,我们紧迫需要一个工具分析一下各种业务存储的数据有多大,是否存入僵死数据浪费资源;同时E4 WWW redis 集群有业务方反馈近期有比较明显的慢查询发生,所以我们需要针对 slow log 和存入的常用数据类型Hash,List,Set分析,是否有big key引起慢查询,是否有team存在超大的 big key和不合理设置ttl的情况

那有没有什么办法让我们安全高效的看到 Redis 内存消耗的详细报表呢?办法总比问题多,有需求就有解决方案。EC Bigdata team针对这个问题实现了一个 Redis 内存数据可视化分析平台 RCT (Redis Computed Tomography)。

RCT可以非常方便的对 Reids 的内存进行分析,了解一个 Redis 实例里都有哪些 key,哪类 key 占用的空间是多少,最耗内存的 key 有哪些,占比如何,非常直观,除此之外,我们还可以针对Redis slowlog/clientlist进行分钟级别监控,直观监控集群效应状况。

同类产品

  1. redis-rdb-tools
  2. rdr
  3. redis-rdb-cli

市面上已经存在这么多开源的产品,我们为什么还要重新做一个呢?主要还是没有满足我们的需求,以上的都是redis rdb解析工具,最接近我们需求的就是雪球开源的rdr,但是也只限于离线分析,而我们节点众多,不可能一个一个去线上机器copy,或者开发copy工具,这样也会给相应机器带来网络热点(总有办法解决这个问题),带来很多繁重无意义的劳动,么不是专门的运维工程师,还有更多的开发任务等着我们去做。

我们想要一个每天都会给我们产生报表,自动推送到我们的邮箱,或者在网页上就能看到最近redis内存的变化,它不仅仅是一个工具,还是常态化运行的服务,基于此我们打造了属于最近的redis内存分析平台RCT。

理论

设计思路

使用 bgsave,获取 rdb 文件,解析后获取数据。

优点:机制成熟,可靠性好;文件相对小,传输、解析效率高;

缺点:bgsave 虽然会 fork 子进程,但还是有可能导致主进程卡住一段时间,对业务有产生影响的风险;

采用低峰期在从节点做 bgsave 获取 rdb 文件,相对安全可靠。拿到了 rdb 文件就相当于拿到了 Redis 实例的所有数据,接下来就是生成报表的过程了:

解析 rdb 文件,获取到 Key 和 Value 的内容;根据相对应的数据结构及内容,估算内存消耗等;统计并生成报表;逻辑很简单,所以设计思路很清晰。

数据流图

slave节点均分计算

为了使对线上机器不产生影响,我们选择是在slave节点进行rdb文件分析,该任务是分布式的。为了均衡对每个机器的影响,通过算法去保证slave分配算法均匀的落在不同的机器上。

算法思路

  1. slave数量最小的优先分配
  2. 通过map存储不同IP分配的数量,按照规则,优先分配数量最小的IP
  3. staticsResult里面不存在的IP优先分配
  4. 如果上面未分配,则选择staticsResult中数值最小的那个slave

算法代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* 根据<master:slaves>获取执行分析任务ports规则
* 即获取其中一个slave,尽量保持均衡在不同机器上
*
* @param clusterNodesMap
* @return <ip:ports>
*/
public static Map<String, Set<String>> generateAnalyzeRule(Map<String, List<String>> clusterNodesMap) {

// 通过该map存储不同IP分配的数量,按照规则,优先分配数量最小的IP
Map<String, Integer> staticsResult = new HashMap<>();
Map<String, Set<String>> generateRule = new HashMap<>();

// 此处排序是为了将slave数量最小的优先分配
List<Map.Entry<String, List<String>>> sortList = new LinkedList<>(clusterNodesMap.entrySet());
Collections.sort(sortList, new Comparator<Entry<String, List<String>>>() {
@Override
public int compare(Entry<String, List<String>> o1, Entry<String, List<String>> o2) {
return o1.getValue().size() - o2.getValue().size();
}
});

for (Entry<String, List<String>> entry : sortList) {
List<String> slaves = entry.getValue();
boolean isSelected = false;
String tempPort = null;
String tempIP = null;
int num = 0;
for (String slave : slaves) {
String ip = slave.split(":")[0];
String port = slave.split(":")[1];
// 统计组里面不存在的IP优先分配
if (!staticsResult.containsKey(ip)) {
staticsResult.put(ip, 1);
Set<String> generatePorts = generateRule.get(ip);
if (generatePorts == null) {
generatePorts = new HashSet<>();
}
generatePorts.add(port);
generateRule.put(ip, generatePorts);
isSelected = true;
break;
} else {
// 此处是为了求出被使用最少的IP
Integer staticsNum = staticsResult.get(ip);
if (num == 0) {
num = staticsNum;
tempPort = port;
tempIP = ip;
continue;
}
if (staticsNum < num) {
tempPort = port;
tempIP = ip;
num = staticsNum;
}
}

}

// 如果上面未分配,则选择staticsResult中数值最小的那个slave
if (!isSelected) {
if (slaves != null && slaves.size() > 0) {
if (tempPort != null) {
Set<String> generatePorts = generateRule.get(tempIP);
if (generatePorts == null) {
generatePorts = new HashSet<>();
}
generatePorts.add(tempPort);
generateRule.put(tempIP, generatePorts);
staticsResult.put(tempIP, staticsResult.get(tempIP) + 1);
}
}
}
}
return generateRule;
}

Jemalloc内存预分配

算法思路

redis中支持多种内存分配算法,推荐Jemalloc,因此我们选择该算法来预估内存大小。因为这个算法比较复杂,我们参考雪球做法使用约定数组,根据不同数据大小分配不同内存空间,为了提高查找效率,这块采用了变种二分查找。

算法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public class Jemalloc {

private static long[] array16;

private static long[] array192;

private static long[] array768;

private static long[] array4096;

private static long[] array4194304;

static {
array16 = range(16, 128 + 1, 16);

array192 = range(192, 512 + 1, 64);

array768 = range(768, 4096 + 1, 256);

array4096 = range(4096, 4194304 + 1, 4096);

array4194304 = range(4194304, 536870912 + 1, 4194304);

}

/**
* 根据Jemalloc 估算分配内存大小 Small: All 2^n-aligned allocations of size 2^n will incur
* no additional overhead, due to how small allocations are aligned and packed.
* Small: [8], [16, 32, 48, ..., 128], [192, 256, 320, ..., 512], [768, 1024,
* 1280, ..., 3840] Large: The worst case size is half the chunk size, in which
* case only one allocation per chunk can be allocated. If the remaining
* (nearly) half of the chunk isn't otherwise useful for smaller allocations,
* the overhead will essentially be 50%. However, assuming you use a diverse
* mixture of size classes, the actual overhead shouldn't be a significant issue
* in practice. Large: [4 KiB, 8 KiB, 12 KiB, ..., 4072 KiB] Huge: Extra virtual
* memory is mapped, then the excess is trimmed and unmapped. This can leave
* virtual memory holes, but it incurs no physical memory overhead. Earlier
* versions of jemalloc heuristically attempted to optimistically map chunks
* without excess that would need to be trimmed, but it didn't save much system
* call overhead in practice. Huge: [4 MiB, 8 MiB, 12 MiB, ..., 512 MiB]
*
* @param size
* @return
*/
public static long assign(long size) {
if (size <= 4096) {
// Small
if (is_power2(size)) {
return size;
} else if (size < 128) {
return min_ge(array16, size);
} else if (size < 512) {
return min_ge(array192, size);
} else {
return min_ge(array768, size);
}
} else if (size < 4194304) {
// Large
return min_ge(array4096, size);
} else {
// Huge
return min_ge(array4194304, size);
}
}

/**
* 创建一个long数组
*
* @param start
* @param stop
* @param step
* @return
*/
public static long[] range(int start, int stop, int step) {
int size = (stop - 1 - start) / step + 1;
long[] array = new long[size];
int index = 0;
for (int i = start; i < stop; i = i + step) {
array[index] = i;
index++;
}
return array;
}

public static long min_ge(long[] srcArray, long key) {
int index = binarySearch(srcArray, key);
return srcArray[index];
}

// 二分查找最小值,即最接近要查找的值,但是要大于该值
public static int binarySearch(long srcArray[], long key) {
int mid = (0+srcArray.length-1) / 2;
if (key == srcArray[mid]) {
return mid;
}

if (key > srcArray[mid] && key <= srcArray[mid + 1]) {
return mid + 1;
}

int start = 0;
int end = srcArray.length - 1;
while (start <= end) {
mid = (end - start) / 2 + start;
if (key == srcArray[mid]) {
return mid;
}
if (key > srcArray[mid] && key <= srcArray[mid + 1]) {
return mid + 1;
}
if (key < srcArray[mid]) {
end = mid - 1;
}
if (key > srcArray[mid]) {
start = mid + 1;
}
}
return 0;
}

public static boolean is_power2(long size) {
if (size == 0) {
return false;
}

if ((size & (size - 1)) == 0) {
return true;
}

return false;
}
}

Redis不同数据结构预估

算法思路

详见redis源码

算法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
private static final long redisObject = (long)(8 + 8);
// 一个dictEntry,24字节,jemalloc会分配32字节的内存块
private static final long dicEntry = (long)(2 * 8 + 8 + 8);
private static final String patternString = "^[-\\\\+]?[\\\\d]*$";

private static long skiplistMaxLevel = 32;
private static long redisSharedInterges = 10000;
private static long longSize = 8;
private static long pointerSize = 8;

/**
* 一个SDS结构占据的空间为:free所占长度+len所占长度+ buf数组的长度=4+4+len+1=len+9
*
* @param length
* @return
*/
private static long sds(long length) {
long mem = 9 + length;
return mem;
}

/**
*
* 计算 string byte 大小
*
* @param kv
* https://searchdatabase.techtarget.com.cn/wp-content/uploads/res/database/article/2011/2011-11-14-16-56-18.jpg
* @return
*/
public static long CalculateString(KeyStringValueString kv) {
long mem = KeyExpiryOverhead(kv);
mem = dicEntry + SizeofString(kv.getRawKey());
mem = mem + redisObject + SizeofString(kv.getValueAsString());
return mem;
}

public static long CalculateLinkedList(KeyStringValueList kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsStringList().size();
mem = mem + LinkedListEntryOverhead() * length;
mem = mem + LinkedlistOverhead();
mem = mem + redisObject * length;
for (String value : kv.getValueAsStringList()) {
mem = mem + SizeofString(value);
}

return mem;
}

public static long CalculateZipList(KeyStringValueList kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + dicEntry;
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
long length = kv.getValueAsStringList().size();
mem = mem + ZiplistOverhead(length);

for (String value : kv.getValueAsStringList()) {
mem = mem + ZiplistAlignedStringOverhead(value);
}

return mem;
}

public static long CalculateHash(KeyStringValueHash kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsHash().size();
mem = mem + HashtableOverhead(length);

for (String key : kv.getValueAsHash().keySet()) {
String value = kv.getValueAsHash().get(key);
mem = mem + SizeofString(key);
mem = mem + SizeofString(value);
mem = mem + 2 * redisObject;
mem = mem + HashtableEntryOverhead();
}

return mem;
}

public static long CalculateSet(KeyStringValueSet kv) {
long mem = KeyExpiryOverhead(kv);

mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsSet().size();
mem = mem + HashtableOverhead(length);
mem = mem + redisObject * length;

for (String value : kv.getValueAsSet()) {
mem = mem + SizeofString(value);
mem = mem + 2 * redisObject;
mem = mem + HashtableEntryOverhead();
}

return mem;
}

public static long CalculateIntSet(KeyStringValueSet kv) {
long mem = KeyExpiryOverhead(kv);
mem = mem + dicEntry;
mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;

long length = kv.getValueAsSet().size();
mem = mem + IntsetOverhead(length);

for (String value : kv.getValueAsSet()) {
mem = mem + ZiplistAlignedStringOverhead(value);
}

return mem;
}

public static long CalculateZSet(KeyStringValueZSet kv) {
long mem = KeyExpiryOverhead(kv);

mem = mem + SizeofString(kv.getRawKey());
mem = mem + redisObject;
mem = mem + dicEntry;
long length = kv.getValueAsSet().size();
mem = mem + SkiplistOverhead(length);
mem = mem + redisObject * length;

for (ZSetEntry value : kv.getValueAsZSet()) {
mem = mem + 8;
mem = mem + SizeofString(value.getElement());
// TODO 还有个 score
mem = mem + 2 * redisObject;
mem = mem + SkiplistEntryOverhead();
}

return mem;
}

// TopLevelObjOverhead get memory use of a top level object
// Each top level object is an entry in a dictionary, and so we have to include
// the overhead of a dictionary entry
public static long TopLevelObjOverhead() {
return HashtableEntryOverhead();
}

/**
* SizeofString get memory use of a string
* https://github.com/antirez/redis/blob/unstable/src/sds.h
*
* @param bytes
* @return
*/
public static long SizeofString(byte[] bytes) {
String value = new String(bytes);

if (isInteger(value)) {
try {
Long num = Long.parseLong(value);
if (num < redisSharedInterges && num > 0) {
return 0;
}
return 8;
} catch (NumberFormatException e) {
}
}

return Jemalloc.assign(sds(bytes.length));
}


public static long SizeofString(String value) {
if (isInteger(value)) {
try {
Long num = Long.parseLong(value);
if (num < redisSharedInterges && num > 0) {
return 0;
}
return 8;
} catch (NumberFormatException e) {
}
}

return Jemalloc.assign(sds(value.length()));
}

public static long DictOverhead(long size) {
return Jemalloc.assign(56 + 2*pointerSize + nextPower(size) * 3*8);
}

public static boolean isInteger(String str) {
Pattern pattern = Pattern.compile(patternString);
return pattern.matcher(str).matches();
}

/**
* 过期时间也是存储为一个 dictEntry,时间戳为 int64;
*
* @param kv
* @return
*/
// KeyExpiryOverhead get memory useage of a key expiry
// Key expiry is stored in a hashtable, so we have to pay for the cost of a
// hashtable entry
// The timestamp itself is stored as an int64, which is a 8 bytes
@SuppressWarnings("rawtypes")
public static long KeyExpiryOverhead(KeyValuePair kv) {
// If there is no expiry, there isn't any overhead
if (kv.getExpiredType() == ExpiredType.NONE) {
return 0;
}
return HashtableEntryOverhead() + 8;
}

public static long HashtableOverhead(long size) {
return 4 + 7 * longSize + 4 * pointerSize + nextPower(size) * pointerSize * 3 / 2;
}

// HashtableEntryOverhead get memory use of hashtable entry
// See https://github.com/antirez/redis/blob/unstable/src/dict.h
// Each dictEntry has 2 pointers + int64
public static long HashtableEntryOverhead() {
return 2 * pointerSize + 8;
}

public static long ZiplistOverhead(long size) {
return Jemalloc.assign(12 + 21 * size);
}

public static long ZiplistAlignedStringOverhead(String value) {
try {
Long.parseLong(value);
return 8;
} catch (NumberFormatException e) {
}
return Jemalloc.assign(value.length());
}

// LinkedlistOverhead get memory use of a linked list
// See https://github.com/antirez/redis/blob/unstable/src/adlist.h
// A list has 5 pointers + an unsigned long
public static long LinkedlistOverhead() {
return longSize + 5 * pointerSize;
}

// LinkedListEntryOverhead get memory use of a linked list entry
// See https://github.com/antirez/redis/blob/unstable/src/adlist.h
// A node has 3 pointers
public static long LinkedListEntryOverhead() {
return 3 * pointerSize;
}

// SkiplistOverhead get memory use of a skiplist
public static long SkiplistOverhead(long size) {
return 2 * pointerSize + HashtableOverhead(size) + (2 * pointerSize + 16);
}

// SkiplistEntryOverhead get memory use of a skiplist entry
public static long SkiplistEntryOverhead() {
return HashtableEntryOverhead() + 2 * pointerSize + 8 + (pointerSize + 8) * zsetRandLevel();
}

public static long nextPower(long size) {
long power = 1;
while (power <= size) {
power = power << 1;
}
return power;
}

public static long zsetRandLevel() {
long level = 1;
int rint = new Random().nextInt(65536);
int flag = 65535 / 4;
while (rint < flag) {// skiplistP
level++;
rint = new Random().nextInt(65536);
}
if (level < skiplistMaxLevel) {
return level;
}
return skiplistMaxLevel;
}

public static long IntsetOverhead(long size) {
// typedef struct intset {
// uint32_t encoding;
// uint32_t length;
// int8_t contents[];
// } intset;
return (4 + 4) * size;
}

RCT分析redis rdb

RCT是一个一站式redis内存分析分析平台,分析任务是分布式的,需要将RCT-Analyze部署到rdb所在机器上,RCT-Dashboard部署在任何机器,只要能保持和RCT-Analyze通信即可。不同于以上列举的工具,我们最初定位RCT就是一个可以长期运行,尽可能每天分析redis中数据,为redis运维人员提供运维依据,便于做出更好的规范,高效的使用redis。

部署

部署过程略,详见官方文档,推荐使用docker方式,这样也是我们目前采用的方式。

配置分析任务

新增RDB分析配置

只有先创建了redis节点之后,才能进入到RCT工具导航页面。

  1. 点击导航RDB Analyze
  2. 如若一直没有添加过RDB信息,则可在页面弹出的框中进行完善信息,或者点击下方的Add按钮进行添加
  3. 点击edit则可以对RDB 信息进行修改
  4. 如若已经完善了RDB信息,则点击打开switch开关,则是对RDB直接进行定时任务的开启和关闭
  5. 点击Analyze,则是对RDB信息进行手动分析,分析进行的状态可在status中查看,可以通过status后面的链接进入到实时查看rdb分析任务的进度状态

RDB Add页面参数说明

  1. Automatic Analyze:是否开启定时任务
  2. Schedule:cron 表达式(填写完成之后,可以点击右侧的图标进行查看定时表达式执行的时间)
  3. Analyzer:分析器 (依次是生成报表,根据filter导key到elasticsearch中,根据preix导key到elasticsearch中)
  4. Data Path:rdb文件的目录(eg:/opt/app/redis/9002/dump.rdb,data path应为/opt/app/redis)
  5. Prefixes:key的前缀,可为空,但是在选择了分析器中的根据preix导key,则必须填写prefixes。我们强烈建议将已知前缀填入,可以提高分析效率,节省时间。
  6. Report:是否生成报表
  7. Mail:生成报表之后的收件人,平台将会将报表做为附件发送给收件人,如果收件人有多个,请用;隔开

RDB 主页面参数说明

RDB主页面参数与add页面参数基本相同,在此不再做赘述,唯一有区别的是:
Status:分析RDB文件的进度状态,分为成功,正在分析,失败三种状态,status为正在分析状态时,可通过点击后面的链接进入到实时分析页面,查看实时状态。

分析器介绍

  1. 生成报表:对rdb文件的数据进行分析并将结果写入数据库中,如果配置了Report,结果会以excel报表的形式发送邮件给用户
  2. 根据filter导key:在对rdb文件分析的时候,根据过滤器导出相应的key,将数据写入到EleasticSearch中
  3. 根据prefix导key:在对rdb文件分析的时候,根据制定的前缀key导出相应的key,将数据写入到EleasticSearch中

手动分析

点击Analyze,则是对RDB信息进行手动分析,分析进行的状态可在status中查看,可以通过status后面的链接进入到实时查看rdb分析任务的进度状态

查看报表

目前RCT支持dashboard/email两种方式,这里我仅展示dashboard.
image

项目地址

https://github.com/xaecbd/RCT,欢迎使用,欢迎加入我们,帮我们提高RCT。如果你有问题,可以前往github新建issue。

参考

  1. Redis持久化文件RDB的格式解析
  2. redis-rdb-tools
  3. rdr
  4. analysis-redis
  5. Redis内存模型

JVM的基本结构

一.类加载器(ClassLoader)

其作用是在程序运行时,将编译好的.class字节码文件装载到JVM的内存区域中.如下图所示流程,Java源码被编译器编译为字节码文件,字节码文件被类加载器加载到数据运行时区域(其实就是内存空间当中),然后再由执行引擎执行.class文件中的字节码指令.

二.执行引擎

执行.class字节码文件中的指令集,如果想了解class中的字节码指令,可以参考<<深入分析Java Web技术内幕>>的第5章深入class文件结构.

三.本地库接口(本地方法库)

我的理解这是JVM与本地操作系统交互的接口,调用一些由C语言等编写的本地方法,一般的开发者并不用细纠.

四.JVM内存区(运行时数据区)

这是JVM中非常重要的一部分,是Java程序运行时JVM所分配的内存区域,绝大部分开发者关注的重点都在此.

JVM的内存区域分为5大块,如下图所示.

1.虚拟机栈(Stack)

一般俗称栈区,是线程私有的.栈区一般与线程紧密相联,一旦有新的线程被创建,JVM就会为该线程分配一个对应的java栈区,在这个栈区中会有许多栈帧,每运行一个方法就创建一个栈帧,用于存储局部变量,方法返回值等.栈帧中存储的局部变量随着线程的结束而结束,其生命周期取决于线程的生命周期,所以讲java栈中的变量都是线程私有的.

2.堆(Heap)

真正存储对象的区域,当进行Object obj = new Object()这样一个操作时,真正的obj对象实例就会在heap中.

3.方法区(Method Area)

包含常量池,静态变量等,有人说常量池也属于heap的一部分,但是严格上讲方法区只是堆的逻辑部分,方法区还有个别名叫做非堆(non-heap),所以方法区和堆还是有不同的.

4.程序计数器(Program Couter Register)

用于保存当前线程的执行的内存地址.因为JVM是支持多线程的,多线程同时执行的时候可能会轮流切换,为了保证线程切换回来后还能恢复到原先状态,就需要一个独立的计数器,记录之前中断的位置,由此可以看出程序计数器也是线程私有的.

5.本地方法栈(Native Method Stack)

性质与虚拟机栈类似,是为了方便JVM去调用本地方法接口的栈区,此处开发者很少去关注,我也是了解有限,因此不深入探究其作用.

Mapping 各字段的选型流程

image

规则

elasticsearch 为了更好的让大家开箱即用,默认启用了大量不必要的设置,为了降低空间使用,提升查询效率,请按以下规则构建最佳的 Mapping

1. 禁用不需要的功能

不用查询,禁用 index

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT index
{
"mappings": {
"_doc": {
"properties": {
"foo": {
"type": "integer",
"index": false
}
}
}
}
}

不关心评分,禁用该功能

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT index
{
"mappings": {
"_doc": {
"properties": {
"foo": {
"type": "text",
"norms": false
}
}
}
}
}

不需要短语查询,禁用 index positions

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT index
{
"mappings": {
"_doc": {
"properties": {
"foo": {
"type": "text",
"index_options": "freqs"
}
}
}
}
}

2. 不要使用默认的 Mapping

默认 Mapping 的字段类型是系统自动识别的。其中:string 类型默认分成:text 和 keyword 两种类型。如果你的业务中不需要分词、检索,仅需要精确匹配,仅设置为 keyword 即可。

根据业务需要选择合适的类型,有利于节省空间和提升精度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
PUT index
{
"mappings": {
"_doc": {
"dynamic_templates": [
{
"strings": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
]
}
}
}

3. 考虑 identifiers 映射为 keyword

某些数据是数字的事实并不意味着它应该始终映射为数字字段。 Elasticsearch 索引数字的方式可以优化范围查询,而 keyword 字段在 term 查询时更好。
通常,存储诸如 ISBN 的标识符或标识来自另一数据库的记录的任何数字的字段很少用于范围查询或聚合。这就是为什么他们可能会受益于被映射为 keyword 而不是 integer 或 long。

参考

  1. Tune for disk usage
  2. 让 Elasticsearch 飞起来:性能优化实践干货

linux性能优化笔记之CPU篇

cpu优化用到的命令

  • mpstat
    进程相关统计工具,cpu/io可以同时对比
  • vmstat
    内存分析工具
  • pidstat
    进程分析工具
  • perf 使用perf record -g -p < pid>和perf report就足够了

CPU篇

根据指标找工具

image

根据工具查指标

image

指标工具关联

image

相关命令

查看cpu核数:

1
$ grep 'model name' /proc/cpuinfo | wc -l

mpstat 查看 CPU 使用率的变化情况

1
2
# -P ALL 表示监控所有 CPU,后面数字 5 表示间隔 5 秒后输出一组数据
$ mpstat -P ALL 5

pidstat查看具体进程cpu使用率

1
2
3
4
5
6
7
8
# 间隔 5 秒后输出一组数据

$ pidstat -u 5 1
# 查看进程IO读写情况
$ pidstat -d
# w可以查看上下文切换情况(进程) cswch/s 自愿切换(资源不够) nvcswch/s被动切换(时间片到期等中断)
# wt是可以查看到线程
$ pidstat -wt 5

vmstat内存分析工具,可以分析上下文切换情况

1
2
# in中断 sc上下文切换
$ vmstat 1

perl系能分析工具,可以追查引起性能问题的函数

1
$ sudo perf top -g -p 30377

名词解释

  1. 平均负载

    简单来说,平均负载是指单位时间内,系统处于可运行状态不可中断状态的平均进程数,也就是平均活跃进程数,它和 CPU 使用率并没有直接关系。可运行状态顾名思义只得是正在运行的任务,不可中断状态例如等待cpu,等待IO。

  2. cpu上下文

    CPU寄存器和程序计数器为任务运行必备依赖环境,即为cpu上下文

cpu上下文切换包含哪些?
(1)进程上下文切换(2)线程上下文切换(3)中断上下文切换

知识点

  1. cpu使用率和平均负载一致吗?

不一定,当平均负载高的时候,任务主要是等待IO型,IO密集型任务,CPU使用率就不一定高。

注意

  1. pidstat中缺少%wait

centos中版本较低是,安装新版本即有
2. pidstat中%wait与top wa区别

pidstat 中, %wait 表示进程等待 CPU 的时间百分。

top 中 ,iowait%(简写wa) 则表示等待 I/O 的 CPU 时间百分。

store是什么?

回答store是什么之前,先说一下正常使用es,我们的字段默认store:false,但是我们还是可以正常查出该数据的,那store:true有什么用呢?

默认情况下,字段值(index:true)以使其可搜索,但不会存储它们。这意味着可以查询该字段,但无法检索原始字段值。

通常这没关系。字段值已经是_source字段的一部分,默认情况下存储该字段。如果您只想检索单个字段或几个字段的值,而不是整个_source,则可以使用source filtering来实现。

现在我来说一下store是什么?字面意思是是否存储该数据,如果为true,则会单独存储该数据。如果_source 没有排除 exclude 掉这个字段,那么应该是会存储多份的。

如果你要求返回field1(store:true),es会分辨出field1已经被存储了,因此不会从_source中加载,而是从field1的存储块中加载

意义

当数据有title, date,a very large content,而只需要取回title, date,不用从很大的_source字段中抽取。这种场景下性能更高。

怎么使用?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
PUT my_index
{
"mappings": {
"_doc": {
"properties": {
"title": {
"type": "text",
"store": true
},
"date": {
"type": "date",
"store": true
},
"content": {
"type": "text"
}
}
}
}
}

PUT my_index/_doc/1
{
"title": "Some short title",
"date": "2015-01-01",
"content": "A very long content field..."
}

GET my_index/_search
{
"stored_fields": [ "title", "date" ]
}

相关原理

当你将一个field的store属性设置为true,这个会在lucene层面处理。lucene是倒排索引,可以执行快速的全文检索,返回符合检索条 件的文档id列表。在全文索引之外,lucene也提供了存储字段的值的特性,以支持提供id的查询(根据id得到原始信息)。通常我们在lucene层 面存储的field的值是跟随search请求一起返回的(id+field的值)。es并不需要存储你想返回的每一个field的值,因为默认情况下每 一个文档的的完整信息都已经存储了,因此可以跟随查询结构返回你想要的所有field值。

有一些情况下,显式的存储某些field的值是必须的:当_source被disabled的时候,或者你并不想从source中parser来得到 field的值(即使这个过程是自动的)。

请记住:从每一个stored field中获取值都需要一次磁盘io,如果想获取多个field的值,就需要多次磁盘io,但是,如果从_source中获取多个field的值,则只 需要一次磁盘io,因为_source只是一个字段而已。所以在大多数情况下,从_source中获取是快速而高效的。

es中默认的设置_source是enable的,存储整个文档的值。这意味着在执行search操作的时候可以返回整个文档的信息。如果不想返回这个文 档的完整信息,也可以指定要求返回的field,es会自动从_source中抽取出指定field的值返回(比如说highlighting的需求)。

注意事项

哪些情形下需要显式的指定store属性呢?大多数情况并不是必须的。从_source中获取值是快速而且高效的。如果你的文档长度很长,存储 _source或者从_source中获取field的代价很大,

你可以显式的将某些field的store属性设置为true。缺点如上边所说:假设你存 储了10个field,而如果想获取这10个field的值,则需要多次的io,如果从_source中获取则只需要一次,而且_source是被压缩过 的

引用

  1. mapping-store
  2. elasticsearch的store属性 vs _source字段

初衷

为什么我要写算法篇,记得刚入大学的第一节课,老师就教我们软件=算法+数据结构,而我恰恰这两点学的最差了,学习这块两个初衷:1.阿里机试折腰,让我清楚的意识到自己的问题 2.职业瓶颈,已经工作四年了,知识面有了,但是缺乏深度。暂且先选择将基础算法打扎实点。

声明以下所有的定义不具有权威性,皆来自我对该算法的理解。

1.初级排序

1.1冒泡法

这大概是最简答的排序,可我一直是搞错的。
定义:将数组相邻的数进行对比,直到选出最大值或者最小值,每次冒出一个数,后面的逻辑不再处理它
实现

1
2
3
4
5
6
7
8
9
10
11
// 从小到大排序
for (int i = 0; i < wait2Sort.length-1; i++) {// 控制循环次数
for (int j = 0; j < wait2Sort.length - 1 - i; j++) {
if (wait2Sort[j] > wait2Sort[j + 1]) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j + 1];
wait2Sort[j + 1] = temp;
}

}
}

N个数字要排序完成,总共进行N-1趟排序,每第 i 趟的排序次数为 (N-i) 次,所以可以用双重循环语句,外层控制循环多少趟,内层控制每一趟的循环次数。

按说算法是这样的确实没错,但是网络上还有另外一种写法,就是第一层循环N次,按说是结果不会错,但是算法多此一举。我想追究的是会不会多算一次。例如如下写法:

1
2
3
4
5
6
7
8
9
10
11
// 从小到大排序
for (int i = 0; i < wait2Sort.length; i++) {// 控制循环次数
for (int j = 0; j < wait2Sort.length - 1 - i; j++) {
if (wait2Sort[j] > wait2Sort[j + 1]) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j + 1];
wait2Sort[j + 1] = temp;
}

}
}

至于N次是否进行运算,我们就需要探讨当i=N-1时,是否进入第二个循环,当i=N-1时,第二个循环的条件就变成了j<N-1-(N-1)即j<0,这个永远不会成立,因此最后一次的循环不会进入二层循环。相对于正确算法只多了一次判断,性能可以忽略不计。

1.2选择排序法

1.2.1初阶选择排序法

定义:选取确定的数据依次与其他数据进行对比
实现

1
2
3
4
5
6
7
8
9
10
// 从小到大排序
for (int i = 0; i < wait2Sort.length - 1; i++) {
for (int j = i + 1; j < wait2Sort.length; j++) {
if (wait2Sort[i] > wait2Sort[j]) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[i];
wait2Sort[i] = temp;
}
}
}

1.2.2进阶选择排序法

定义:选取确定的数据依次与其他数据进行对比,但是和1.2.1区别在于内部循环只确定最大数的索引,数据交换是在外层循环做的。
实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 从小到大排序
for (int i = 0; i < wait2Sort.length-1; i++) {
int min = i;
for (int j = i + 1; j < wait2Sort.length; j++) {
if (wait2Sort[min] > wait2Sort[j]) {
min = j;
}
}

if (min != i) {
int temp = wait2Sort[min];
wait2Sort[min] = wait2Sort[i];
wait2Sort[i] = temp;
}
}

该算法较1.2.1区别在于内部循环只确定最大数的索引,数据交换是在外层循环做的。少了多次交换,相较于前两种性能更好!

1.3插入排序

定义:像整理牌一样,将每一张牌插入到有序数组中适当的位置。左侧永远是有序的,当运行到数据最右端,即排序完毕,这种算法对于数组中部分数据是有序的话,性能会有很大的提升!
实现

1
2
3
4
5
6
7
8
// 从小到大排序
for (int i = 1; i < wait2Sort.length; i++) {
for (int j = i; j > 0 && (wait2Sort[j - 1]) > wait2Sort[j]; j--) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j - 1];
wait2Sort[j - 1] = temp;
}
}

1.4希尔排序

定义:希尔排序是把记录按下标的一定增量分组,对每组使用直接插入排序算法排序;随着增量逐渐减少,每组包含的关键词越来越多,当增量减至1时,整个文件恰被分成一组,算法便终止
实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 从小到大排序
int h = 1;
while (h < wait2Sort.length / 3)
h = 3 * h + 1;
while (h >= 1) {
for (int i = h; i < wait2Sort.length; i++) {
for (int j = i; j - h >= 0 && wait2Sort[j - h] > wait2Sort[j]; j = j - h) {
int temp = wait2Sort[j - h];
wait2Sort[j - h] = wait2Sort[j];
wait2Sort[j] = temp;
}
}

h = h / 3;
}

上面已经演示了以4为初始间隔对包含10个数据项的数组进行排序的情况。对于更大的数组开始的间隔也应该更大。然后间隔不断减小,直到间隔变成1。

举例来说,含有1000个数据项的数组可能先以364为增量,然后以121为增量,以40为增量,以13为增量,以4为增量,最后以 1为增量进行希尔排序。用来形成间隔的数列被称为间隔序列。这里所表示的间隔序列由Knuth提出,此序列是很常用的。在排序算法中,首先在一个短小的循环中使用序列的生成公式来计算出最初的间隔。h值最初被赋为1,然后应用公式h=3*h+1生成序列1,4,13,40,121,364,等等。当间隔大于数组大小的时候,这个过程停止。

1
2
3
4
5
6
7
8
9
10
11
// 从小到大排序
for (int h = wait2Sort.length / 2; h > 0; h = h / 2) {
for (int i = h; i < wait2Sort.length; i++) {
for (int j = i; j>=h && wait2Sort[j - h] > wait2Sort[j]; j = j - h) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j - h];
wait2Sort[j - h] = temp;
}
}

}

这个也是正确的,间距为N/2

2.归并排序

定义:归并排序,也叫归并算法,指的是将两个顺序序列合并成一个顺序序列的方法

如 设有数列{6,202,100,301,38,8,1}
初始状态:6,202,100,301,38,8,1
第一次归并后:{6,202},{100,301},{8,38},{1},比较次数:3;
第二次归并后:{6,100,202,301},{1,8,38},比较次数:4;
第三次归并后:{1,6,8,38,100,202,301},比较次数:4;
总的比较次数为:3+4+4=11;
逆序数为14;

实现:

3.快速排序

4.堆排序

总结

参考

  1. 图解排序算法(二)之希尔排序

不回忆,就再也想不起了

毕业四年了,整整四年了,工作忙碌,生活奔波,没有就给我留下太多的时间去回忆,不回忆,就再也想不起来了。

想不起来刚进校园就因甲流被隔离的那一个月,那是快乐的一个月,没有学业,只有大学的新鲜,没有看不完的书,只有发不完的呆,再也不会有带着口罩满校溜达,回忆就像一扇窗,打开它,我能看到一副风景,风景中是我丢失的碎片记忆。

想不起大学的第一节课,想不到工作这么久,竟然干的工作多少还和它有点联系,烈日灼心,记忆中那节所在的教室在午后是那么的热,即使有空调,也感觉不到一丝凉意,它骨子里留给我的不是竞争,不是压力,即使我们有早操,即使我们下了下午的课,还得赶匆忙的晚自习,我骨子里是一个意志力很差的人,它教会了我坚持,不知道未来是什么样,只是依稀懂得需要坚持。

世间巧合万万个,最后一节课依然是那个教室,这大概就是所谓的缘分。回忆是件可怕的事,不想倒是没什么,一想瞬间千万片段释放,不知道是不是触电。物是人非,我离它直线距离不过5公里,可这么多年都未曾再去过,再也见不到熟悉的面孔,再也不能坐在教室里安奈躁动的心。

毕业的时候忙忙碌碌的,没有参加过毕业典礼,多么想再去补考,再参加一次毕业典礼。

毕业的时候来不及说再见的人有很多

1.this 作用域处理方案

问题背景:手动触发事件,更改state中数据

  1. 箭头表达式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    mergeFilter(_this) {
    let store = [];
    if (!_this.state.merge) {
    store = _this.mergetIndexName(this.state.indices, /[^a-z]+$/);
    } else {
    store = _this.reBuildOriginData(this.state.indices);
    }
    _this.setState({
    store: store,
    merge: !_this.state.merge,
    });
    }
    render组件中调用
    1
    2
    3
    4
    5
    6
    7
    8
    <EuiSwitch
    label="Merge"
    key="MergeEuiSwitch"
    checked={
    this.state.merge
    }
    onChange={()=>this.mergeFilter(this)}
    />
  2. bind
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
     constructor(props) {
    super(props);
    this.state = {value: 'coconut'};

    this.handleChange = this.handleChange.bind(this);
    }

    handleChange(event) {
    this.setState({value: event.target.value});
    }

    render() {
    return (
    <form onSubmit={this.handleSubmit}>
    <label>
    Pick your favorite flavor:
    <select value={this.state.value} onChange={this.handleChange}>
    <option value="grapefruit">Grapefruit</option>
    <option value="lime">Lime</option>
    <option value="coconut">Coconut</option>
    <option value="mango">Mango</option>
    </select>
    </label>
    <input type="submit" value="Submit" />
    </form>
    );
    }

参考

1.he-select-tag

项目背景

本次教程是编写metrices,开发moduel 基本差不多,可以参考creating-metricbeat-module

本次教程是新增kafka metrices ,增加filesize metrices,实现的功能是根据配置的kafka 数据文件目录,获取所有topic,不同patition 数据文件大小,将该数据收集到elasticsearch中,通过kibana 根据不同粒度监控kafka集群。

正文

beats架构

image

项目生成

1
2
cd metricbeat
make create-metricset

根据提示输入相应的内容,然后生成field.yml文件(make update),编辑metricbeat.yml文件 ,编译然后运行即可。。

配置写入字段类型及文件

cd filesize

  1. 编辑fields.yml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    - name: filesize
    type: group
    description: >
    filesize
    fields:
    - name: topic
    type: keyword
    description: >
    topic
    - name: partition
    type: long
    description: >
    partition
    - name: filesize
    type: long
    description: >
    topic data file size

  2. 编辑docs.asciidoc

读取配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type MetricSet struct {
mb.BaseMetricSet
dataPath string
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
DataPath string `config:"dataPath"`
}{
DataPath:"",
}

err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
dataPath: config.DataPath,
}, nil
}

指标采集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (m *MetricSet) Fetch(report mb.ReporterV2) {
PthSep := string(os.PathSeparator)
var dataPath = m.dataPath
files, _ := ioutil.ReadDir(dataPath)
for _, f := range files {
if !f.IsDir() {
continue
}
var path = dataPath+PthSep+f.Name()
cfiles,_ := ioutil.ReadDir(path)
var filesize = f.Size()

for _, cf := range cfiles {
filesize = filesize + cf.Size()
}

var name = f.Name();
var index = strings.LastIndex(name,"-")
if index <0 {
continue
}
var topic = name[0:index]
var partition = name[index+1:len(name)]
debugf("topic:%v",f.Name())
report.Event(mb.Event{
MetricSetFields: common.MapStr{
"topic": topic,
"partition": partition,
"filesize": filesize,
},
})
}
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package filesize

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/mb"
"io/ioutil"
"strings"
"os"
"github.com/elastic/beats/libbeat/logp"
)

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
mb.Registry.MustAddMetricSet("kafka", "filesize", New)
}

var debugf = logp.MakeDebug("kafka")
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
dataPath string
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
DataPath string `config:"dataPath"`
}{
DataPath:"",
}

err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
dataPath: config.DataPath,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
PthSep := string(os.PathSeparator)
var dataPath = m.dataPath
files, _ := ioutil.ReadDir(dataPath)
for _, f := range files {
if !f.IsDir() {
continue
}
var path = dataPath+PthSep+f.Name()
cfiles,_ := ioutil.ReadDir(path)
var filesize = f.Size()

for _, cf := range cfiles {
filesize = filesize + cf.Size()
}

var name = f.Name();
var index = strings.LastIndex(name,"-")
if index <0 {
continue
}
var topic = name[0:index]
var partition = name[index+1:len(name)]
debugf("topic:%v",f.Name())
report.Event(mb.Event{
MetricSetFields: common.MapStr{
"topic": topic,
"partition": partition,
"filesize": filesize,
},
})
}
}

运行

  1. 编译
    1
    2
    make collect
    make
  2. 运行
    1
    ./{beat} -e -d "*"
    * 代表选择输出的debug 日志,例如./metricset -e -d "kafka" 输出kafka moduel 相关debug log

tip: 在field.yml有变化的时候,记得先执行make update,该命令会重写metricbeat.yml文件。

开发建议

可以使用如下代码做到debug日志

1
2
var debugf = logp.MakeDebug("kafka")
debugf("topic:%v",f.Name())

参考

1.creating-metricsets

0%