初衷

为什么我要写算法篇,记得刚入大学的第一节课,老师就教我们软件=算法+数据结构,而我恰恰这两点学的最差了,学习这块两个初衷:1.阿里机试折腰,让我清楚的意识到自己的问题 2.职业瓶颈,已经工作四年了,知识面有了,但是缺乏深度。暂且先选择将基础算法打扎实点。

声明以下所有的定义不具有权威性,皆来自我对该算法的理解。

1.初级排序

1.1冒泡法

这大概是最简答的排序,可我一直是搞错的。
定义:将数组相邻的数进行对比,直到选出最大值或者最小值,每次冒出一个数,后面的逻辑不再处理它
实现

1
2
3
4
5
6
7
8
9
10
11
// 从小到大排序
for (int i = 0; i < wait2Sort.length-1; i++) {// 控制循环次数
for (int j = 0; j < wait2Sort.length - 1 - i; j++) {
if (wait2Sort[j] > wait2Sort[j + 1]) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j + 1];
wait2Sort[j + 1] = temp;
}

}
}

N个数字要排序完成,总共进行N-1趟排序,每第 i 趟的排序次数为 (N-i) 次,所以可以用双重循环语句,外层控制循环多少趟,内层控制每一趟的循环次数。

按说算法是这样的确实没错,但是网络上还有另外一种写法,就是第一层循环N次,按说是结果不会错,但是算法多此一举。我想追究的是会不会多算一次。例如如下写法:

1
2
3
4
5
6
7
8
9
10
11
// 从小到大排序
for (int i = 0; i < wait2Sort.length; i++) {// 控制循环次数
for (int j = 0; j < wait2Sort.length - 1 - i; j++) {
if (wait2Sort[j] > wait2Sort[j + 1]) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j + 1];
wait2Sort[j + 1] = temp;
}

}
}

至于N次是否进行运算,我们就需要探讨当i=N-1时,是否进入第二个循环,当i=N-1时,第二个循环的条件就变成了j<N-1-(N-1)即j<0,这个永远不会成立,因此最后一次的循环不会进入二层循环。相对于正确算法只多了一次判断,性能可以忽略不计。

1.2选择排序法

1.2.1初阶选择排序法

定义:选取确定的数据依次与其他数据进行对比
实现

1
2
3
4
5
6
7
8
9
10
// 从小到大排序
for (int i = 0; i < wait2Sort.length - 1; i++) {
for (int j = i + 1; j < wait2Sort.length; j++) {
if (wait2Sort[i] > wait2Sort[j]) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[i];
wait2Sort[i] = temp;
}
}
}

1.2.2进阶选择排序法

定义:选取确定的数据依次与其他数据进行对比,但是和1.2.1区别在于内部循环只确定最大数的索引,数据交换是在外层循环做的。
实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 从小到大排序
for (int i = 0; i < wait2Sort.length-1; i++) {
int min = i;
for (int j = i + 1; j < wait2Sort.length; j++) {
if (wait2Sort[min] > wait2Sort[j]) {
min = j;
}
}

if (min != i) {
int temp = wait2Sort[min];
wait2Sort[min] = wait2Sort[i];
wait2Sort[i] = temp;
}
}

该算法较1.2.1区别在于内部循环只确定最大数的索引,数据交换是在外层循环做的。少了多次交换,相较于前两种性能更好!

1.3插入排序

定义:像整理牌一样,将每一张牌插入到有序数组中适当的位置。左侧永远是有序的,当运行到数据最右端,即排序完毕,这种算法对于数组中部分数据是有序的话,性能会有很大的提升!
实现

1
2
3
4
5
6
7
8
// 从小到大排序
for (int i = 1; i < wait2Sort.length; i++) {
for (int j = i; j > 0 && (wait2Sort[j - 1]) > wait2Sort[j]; j--) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j - 1];
wait2Sort[j - 1] = temp;
}
}

1.4希尔排序

定义:希尔排序是把记录按下标的一定增量分组,对每组使用直接插入排序算法排序;随着增量逐渐减少,每组包含的关键词越来越多,当增量减至1时,整个文件恰被分成一组,算法便终止
实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 从小到大排序
int h = 1;
while (h < wait2Sort.length / 3)
h = 3 * h + 1;
while (h >= 1) {
for (int i = h; i < wait2Sort.length; i++) {
for (int j = i; j - h >= 0 && wait2Sort[j - h] > wait2Sort[j]; j = j - h) {
int temp = wait2Sort[j - h];
wait2Sort[j - h] = wait2Sort[j];
wait2Sort[j] = temp;
}
}

h = h / 3;
}

上面已经演示了以4为初始间隔对包含10个数据项的数组进行排序的情况。对于更大的数组开始的间隔也应该更大。然后间隔不断减小,直到间隔变成1。

举例来说,含有1000个数据项的数组可能先以364为增量,然后以121为增量,以40为增量,以13为增量,以4为增量,最后以 1为增量进行希尔排序。用来形成间隔的数列被称为间隔序列。这里所表示的间隔序列由Knuth提出,此序列是很常用的。在排序算法中,首先在一个短小的循环中使用序列的生成公式来计算出最初的间隔。h值最初被赋为1,然后应用公式h=3*h+1生成序列1,4,13,40,121,364,等等。当间隔大于数组大小的时候,这个过程停止。

1
2
3
4
5
6
7
8
9
10
11
// 从小到大排序
for (int h = wait2Sort.length / 2; h > 0; h = h / 2) {
for (int i = h; i < wait2Sort.length; i++) {
for (int j = i; j>=h && wait2Sort[j - h] > wait2Sort[j]; j = j - h) {
int temp = wait2Sort[j];
wait2Sort[j] = wait2Sort[j - h];
wait2Sort[j - h] = temp;
}
}

}

这个也是正确的,间距为N/2

2.归并排序

定义:归并排序,也叫归并算法,指的是将两个顺序序列合并成一个顺序序列的方法

如 设有数列{6,202,100,301,38,8,1}
初始状态:6,202,100,301,38,8,1
第一次归并后:{6,202},{100,301},{8,38},{1},比较次数:3;
第二次归并后:{6,100,202,301},{1,8,38},比较次数:4;
第三次归并后:{1,6,8,38,100,202,301},比较次数:4;
总的比较次数为:3+4+4=11;
逆序数为14;

实现:

3.快速排序

4.堆排序

总结

参考

  1. 图解排序算法(二)之希尔排序

不回忆,就再也想不起了

毕业四年了,整整四年了,工作忙碌,生活奔波,没有就给我留下太多的时间去回忆,不回忆,就再也想不起来了。

想不起来刚进校园就因甲流被隔离的那一个月,那是快乐的一个月,没有学业,只有大学的新鲜,没有看不完的书,只有发不完的呆,再也不会有带着口罩满校溜达,回忆就像一扇窗,打开它,我能看到一副风景,风景中是我丢失的碎片记忆。

想不起大学的第一节课,想不到工作这么久,竟然干的工作多少还和它有点联系,烈日灼心,记忆中那节所在的教室在午后是那么的热,即使有空调,也感觉不到一丝凉意,它骨子里留给我的不是竞争,不是压力,即使我们有早操,即使我们下了下午的课,还得赶匆忙的晚自习,我骨子里是一个意志力很差的人,它教会了我坚持,不知道未来是什么样,只是依稀懂得需要坚持。

世间巧合万万个,最后一节课依然是那个教室,这大概就是所谓的缘分。回忆是件可怕的事,不想倒是没什么,一想瞬间千万片段释放,不知道是不是触电。物是人非,我离它直线距离不过5公里,可这么多年都未曾再去过,再也见不到熟悉的面孔,再也不能坐在教室里安奈躁动的心。

毕业的时候忙忙碌碌的,没有参加过毕业典礼,多么想再去补考,再参加一次毕业典礼。

毕业的时候来不及说再见的人有很多

1.this 作用域处理方案

问题背景:手动触发事件,更改state中数据

  1. 箭头表达式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    mergeFilter(_this) {
    let store = [];
    if (!_this.state.merge) {
    store = _this.mergetIndexName(this.state.indices, /[^a-z]+$/);
    } else {
    store = _this.reBuildOriginData(this.state.indices);
    }
    _this.setState({
    store: store,
    merge: !_this.state.merge,
    });
    }
    render组件中调用
    1
    2
    3
    4
    5
    6
    7
    8
    <EuiSwitch
    label="Merge"
    key="MergeEuiSwitch"
    checked={
    this.state.merge
    }
    onChange={()=>this.mergeFilter(this)}
    />
  2. bind
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
     constructor(props) {
    super(props);
    this.state = {value: 'coconut'};

    this.handleChange = this.handleChange.bind(this);
    }

    handleChange(event) {
    this.setState({value: event.target.value});
    }

    render() {
    return (
    <form onSubmit={this.handleSubmit}>
    <label>
    Pick your favorite flavor:
    <select value={this.state.value} onChange={this.handleChange}>
    <option value="grapefruit">Grapefruit</option>
    <option value="lime">Lime</option>
    <option value="coconut">Coconut</option>
    <option value="mango">Mango</option>
    </select>
    </label>
    <input type="submit" value="Submit" />
    </form>
    );
    }

参考

1.he-select-tag

项目背景

本次教程是编写metrices,开发moduel 基本差不多,可以参考creating-metricbeat-module

本次教程是新增kafka metrices ,增加filesize metrices,实现的功能是根据配置的kafka 数据文件目录,获取所有topic,不同patition 数据文件大小,将该数据收集到elasticsearch中,通过kibana 根据不同粒度监控kafka集群。

正文

beats架构

image

项目生成

1
2
cd metricbeat
make create-metricset

根据提示输入相应的内容,然后生成field.yml文件(make update),编辑metricbeat.yml文件 ,编译然后运行即可。。

配置写入字段类型及文件

cd filesize

  1. 编辑fields.yml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    - name: filesize
    type: group
    description: >
    filesize
    fields:
    - name: topic
    type: keyword
    description: >
    topic
    - name: partition
    type: long
    description: >
    partition
    - name: filesize
    type: long
    description: >
    topic data file size

  2. 编辑docs.asciidoc

读取配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type MetricSet struct {
mb.BaseMetricSet
dataPath string
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
DataPath string `config:"dataPath"`
}{
DataPath:"",
}

err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
dataPath: config.DataPath,
}, nil
}

指标采集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (m *MetricSet) Fetch(report mb.ReporterV2) {
PthSep := string(os.PathSeparator)
var dataPath = m.dataPath
files, _ := ioutil.ReadDir(dataPath)
for _, f := range files {
if !f.IsDir() {
continue
}
var path = dataPath+PthSep+f.Name()
cfiles,_ := ioutil.ReadDir(path)
var filesize = f.Size()

for _, cf := range cfiles {
filesize = filesize + cf.Size()
}

var name = f.Name();
var index = strings.LastIndex(name,"-")
if index <0 {
continue
}
var topic = name[0:index]
var partition = name[index+1:len(name)]
debugf("topic:%v",f.Name())
report.Event(mb.Event{
MetricSetFields: common.MapStr{
"topic": topic,
"partition": partition,
"filesize": filesize,
},
})
}
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package filesize

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/mb"
"io/ioutil"
"strings"
"os"
"github.com/elastic/beats/libbeat/logp"
)

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
mb.Registry.MustAddMetricSet("kafka", "filesize", New)
}

var debugf = logp.MakeDebug("kafka")
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
dataPath string
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
DataPath string `config:"dataPath"`
}{
DataPath:"",
}

err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
dataPath: config.DataPath,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
PthSep := string(os.PathSeparator)
var dataPath = m.dataPath
files, _ := ioutil.ReadDir(dataPath)
for _, f := range files {
if !f.IsDir() {
continue
}
var path = dataPath+PthSep+f.Name()
cfiles,_ := ioutil.ReadDir(path)
var filesize = f.Size()

for _, cf := range cfiles {
filesize = filesize + cf.Size()
}

var name = f.Name();
var index = strings.LastIndex(name,"-")
if index <0 {
continue
}
var topic = name[0:index]
var partition = name[index+1:len(name)]
debugf("topic:%v",f.Name())
report.Event(mb.Event{
MetricSetFields: common.MapStr{
"topic": topic,
"partition": partition,
"filesize": filesize,
},
})
}
}

运行

  1. 编译
    1
    2
    make collect
    make
  2. 运行
    1
    ./{beat} -e -d "*"
    * 代表选择输出的debug 日志,例如./metricset -e -d "kafka" 输出kafka moduel 相关debug log

tip: 在field.yml有变化的时候,记得先执行make update,该命令会重写metricbeat.yml文件。

开发建议

可以使用如下代码做到debug日志

1
2
var debugf = logp.MakeDebug("kafka")
debugf("topic:%v",f.Name())

参考

1.creating-metricsets

前言

beats 是什么?Beats是您在服务器上作为代理安装的开源数据托运方,用于将操作数据发送到Elasticsearch。

elastc 官方提供的beats 有以下方面:

Audit data Auditbeat
Log files Filebeat
Availability Heartbeat
Metrics Metricbeat
Network traffic Packetbeat
Windows event logs Winlogbeat

image
beats可以直接发送数据到Elasticsearch或通过Logstash,您可以在Kibana中进行可视化之前进一步处理和增强数据。

安装软件

  1. 安装新版 go

    beats 是用go 语言写的,因此需要安装go
    (1).配置GOROOT 例如:GOROOT=D:\Go</code>
    (2).配置Path 环境中新增D:\Go\bin
    (3).配置GOPATH GOPATH=D:\Go\GOPATH

  2. 安装 Python 2

    (1).安装python
    (2).安装pip

    1
    2
    wget https://bootstrap.pypa.io/get-pip.py
    sudo python get-pip.py

    (3). virtualenv
    virtualenv 用来生成支持make update

    1
    pip install virtualenv

源码配置

1
2
3
mkdir -p ${GOPATH}/src/github.com/elastic
cd ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats.git

切记上面配置地址路径,不然运行就会报错

编译运行

  1. 编译
    1
    2
    3
    make collect
    make update
    make
  2. 运行
    1
    ./{beat} -e -d "*"
    * 代表选择输出的debug 日志,例如./metricset -e -d "kafka" 输出kafka moduel 相关debug log

注意事项

执行make update可以生成配置文件和文档,例如运行beat需要field.yml文件,即可由该你命令生成,但是切记,该命令也会重写{beat}.yml配置文件

参考

  1. beats-contributing

Thymeleaf布局

前言

在web 开发过程中,经常会有一些公共页面,例如head,foot,menu等,利用layout可以极大的提高开发效率。通过使用Thymeleaf Layout Dialect,可以便捷使用布局,减少代码的重复。

正文

1.配置

在pom 中增加依赖:

1
2
3
4
<dependency>
<groupId>nz.net.ultraq.thymeleaf</groupId>
<artifactId>thymeleaf-layout-dialect</artifactId>
</dependency>

Spring Boot 2 java 初始化配置:

1
2
3
4
@Bean
public LayoutDialect layoutDialect() {
return new LayoutDialect();
}

以上配置会使layout 命名空间可以引入五种属性:decorate, title-pattern, insert, replace, fragment

Thymeleaf Layout Dialect 默认可以合并layout与content中head信息,默认在layout 标签之后追加content中的标签。

2.布局

Layout.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!DOCTYPE html>
<html xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<title>Layout page</title>
<script src="common-script.js"></script>
</head>
<body>
<header>
<h1>My website</h1>
</header>
<section layout:fragment="content">
<p>Page content goes here</p>
</section>
<footer>
<p>My footer</p>
<p layout:fragment="custom-footer">Custom footer here</p>
</footer>
</body>
</html>

Content1.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!DOCTYPE html>
<html xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout"
layout:decorate="~{Layout}">
<head>
<title>Content page 1</title>
<script src="content-script.js"></script>
</head>
<body>
<section layout:fragment="content">
<p>This is a paragraph from content page 1</p>
</section>
<footer>
<p layout:fragment="custom-footer">This is some footer content from content page 1</p>
</footer>
</body>
</html>

生成结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!DOCTYPE html>
<html>
<head>
<title>Content page 1</title>
<script src="common-script.js"></script>
<script src="content-script.js"></script>
</head>
<body>
<header>
<h1>My website</h1>
</header>
<section>
<p>This is a paragraph from content page 1</p>
</section>
<footer>
<p>My footer</p>
<p>This is some footer content from content page 1</p>
</footer>
</body>
</html>

3.布局传递数据

Child/content template:

1
<html layout:decorate="your-layout.html" th:with="greeting='Hello!'">

Parent/layout template:

1
2
3
<html>
...
<p th:text="${greeting}"></p> <!-- You'll end up with "Hello!" in here -->

4.配置title

Layout.html

1
2
3
4
5
6
7
<!DOCTYPE html>
<html xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<title layout:title-pattern="$LAYOUT_TITLE - $CONTENT_TITLE">My website</title>
</head>
...
</html>

Content.html

1
2
3
4
5
6
7
8
<!DOCTYPE html>
<html xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout"
layout:decorator="Layout">
<head>
<title>My blog</title>
</head>
...
</html>

生成结果如下:

1
2
3
4
5
6
7
<!DOCTYPE html>
<html>
<head>
<title>My website - My blog</title>
</head>
...
</html>

参考

  1. Thymeleaf Layout Dialect

Elasticsearch For HDFS

1.前言

本文主要是探讨repository-hdfs插件,该插件的目的是可以将es的数据备份到HDFS中,并且可以从该文件恢复数据。


2.安装

  1. 安装

    1
    sudo bin/elasticsearch-plugin install repository-hdfs

    这里的安装是在线安装,当然还可以离线安装。例如:

    1
    sudo bin/elasticsearch-plugin install file:///home/hadoop/elk/repository-hdfs.zip
  2. 卸载

    1
    sudo bin/elasticsearch-plugin remove repository-hdfs

3.使用

3.1创建仓库

  • 创建仓库
    1
    2
    3
    4
    5
    6
    7
    8
    PUT _snapshot/my_hdfs_repository
    {
    "type": "hdfs",
    "settings": {
    "uri": "hdfs://namenode:8020/",
    "path": "elasticsearch/respositories/my_hdfs_repository"
    }
    }
属性 描述
uri hdfs url 地址. 例如: “hdfs://:/“. (Required)
path 数据备份与加载地址. 例如: “path/to/file”. (Required)
load_defaults 是否加载默认的Hadoop配置。 (Enabled by default)
conf.<key> 要添加到Hadoop配置的内联配置参数。 (可选)插件只能识别hadoopcorehdfs配置文件中面向客户端的属性。
compress 是否压缩元数据. (Disabled by default)
chunk_size 覆盖块大小. (Disabled by default)
security.principal
  • 查看所有的仓库
    1
    GET /_snapshot/_all

3.2备份数据

  • 快照所有index
    1
    PUT /_snapshot/my_hdfs_repository/snapshot_all?wait_for_completion=false
  • 快照指定index
    1
    2
    3
    4
    5
    6
    PUT /_snapshot/my_hdfs_repository/snapshot_1?wait_for_completion=false
    {
    "indices": "index_1,index_2",
    "ignore_unavailable": true,
    "include_global_state": false
    }
    其中indices可以指定多个index,例如:index_1,index_2
  • 查看快照进度
    1
    GET /_snapshot/my_hdfs_repository/snapshot_all/_status
  • 取消快照任务(删除快照)
    1
    DELETE /_snapshot/my_hdfs_repository/snapshot_all
    这个可以删除一个存在的快照或者取消一个正在进行的快照。

3.3恢复数据

  • 恢复指定快照中所有index
    1
    POST /_snapshot/my_hdfs_repository/snapshot_all/_restore
  • 恢复指定快照中的index
    1
    2
    3
    4
    5
    6
    7
    8
    POST /_snapshot/my_hdfs_repository/snapshot_all/_restore
    {
    "indices": "metricbeat-6.3.0-2018.07.18",
    "ignore_unavailable": true,
    "include_global_state": true,
    "rename_pattern": "metricbeat-(.+)",
    "rename_replacement": "allrestored_metricbeat-$1"
    }
    在这里可以设置恢复后新的index的名称,也可以和之前的保持一致(前提是集群中不存在该index)
  • 查看恢复进度
    1
    2
    GET index1,index2/_recovery?human
    GET _cat/recovery?v
  • 恢复任务设置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    POST /_snapshot/my_hdfs_repository/snapshot_2/_restore
    {
    "indices": "ec_gatewaymsg_jpf",
    "ignore_unavailable": true,
    "include_global_state": true,
    "index_settings": {
    "index.number_of_replicas": 0
    },
    "rename_pattern": "ec_gatewaymsg_(.+)",
    "rename_replacement": "restore_$1"
    }
    在恢复过程,可以修改部分index 的设置,但是分片数除外。

4.参考

  1. repository-hdfs
  2. modules-snapshots

OutOfMemoryError-unable to create native thread问题追究

1.问题背景

某天,发现线上elasticsearch 集群,有个节点down了,重启后,过了一个多小时,又down,这下才去认真的查看log,发现在down掉之前,有报以下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[67329.555s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

[67329.557s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

[2018-07-12T02:47:53,549][ERROR][o.e.b.ElasticsearchUncaughtExceptionHandler] [e11redis28.mercury.corp] fatal error in thread [elasticsearch[e11redis28.mercury.corp][refresh][T#2]], exiting

java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached

t java.lang.Thread.start0(Native Method) ~[?:?]

t java.lang.Thread.start(Thread.java:813) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:944) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1012) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]

t java.lang.Thread.run(Thread.java:844) [?:?]

困扰了好几天,修改各种jvm 配置发现无用,就在github 上提了个issues,里面详细的描述了该问题及环境的配置。

2.该问题产生可能原因

经过查阅大量资料,才发现该错误不是我们一眼看的那么简单。

OutOfMemoryError: Unable to Create New Native Thread

这个错误其实已经告诉我们,该问题可能是两个原因造成的:

  • 内存不足

    该内存不足,其实不是指堆内存,而是创建栈内存不足。所以说出现该问题修改Xmx/Xms是无法解决的。默认情况下jvm 创建1个线程,会分配1m空间。这里所指内存不足是机器中可用内存不足。对于该原因的解决办法是给jvm 留够需要的内存,方法有多个:

    1. 减少Xss配置,这样话可以利用有限的内存创建更多的线程,但是需注意避免值过小栈溢出。
    2. 减少堆配置,主要还是留下足够内存供jvm 创建线程所用。
    3. 杀掉其他程序,留出空闲内存。
  • 机器线程数受限制

    众所周知,linux系统中对进程的创建是有限制的,不可能无限创建的,影响该数量主要是由以下三个方面:

    1. /proc/sys/kernel/threads-max
    2. max_user_process(ulimit –u)
    3. /proc/sys/kernel/pid_max

    通过这三个参数共同决定了linux机器中能创建线程数量

3.linux 进程相关信息

在这里主要是想写一些命令帮助人们分析linux进程下的线程等信息

  • 查看linux所有用户下所有进程数

    1
    ps -eLo ruser|awk 'NR>1'|sort|uniq -c
  • 查看进程中的线程数

    1
    ps –o nlwp 27989 
  • 查看某个进程下的线程

    1
    2
     top -H -p <pid>
    ps -eLf | grep <pid>

4.检测系统参数

  1. 查看内存 free -g

  2. 查看limit

    • ulimit -a
    • cat /proc/sys/kernel/threads-max
    • cat /proc/sys/kernel/pid_max

5.问题模拟验证

经过以上步骤分析,写了个脚本,监控该机器上内存使用情况及线程使用情况,发现问题出现在有个程序会创建大量的线程,该程序是java 程序。查看机器pid_max为32768,ulimit -u 未限制大小。 总结一下这个机器线程最多允许创建 32768。

以下是验证代码:注意该代码有一定危险性,请勿在window 下运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int i =0;
while(true) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});

thread.start();
i++;
System.out.println("curent thread num:"+i);
}

运行该代码,当线程增长到3.1w时,报该问题。

6.本次问题原因

本次问题是发现是有个程序创建了大量线程,造成达到linux 机器允许最大的线程数。切记这个不是一个参数控制的,多个参数控制pid_max,一般人可能会忽略!

7.问题解决办法

根据不同问题选择不同办法,详见第2点 该问题产生可能原因

8.参考

  1. We are out of memory
  2. 为何线程有PID
  3. Troubleshoot OutOfMemoryError: Unable to Create New Native Thread

eureka获取注册服务

eureka client获取注册服务

首先在启动类中增加@EnableDiscoveryClient

其次在spring 项目中注入

1
2
@Autowired
private DiscoveryClient discoveryClient;

然后通过以下方式进行获取

1
List<ServiceInstance> list = discoveryClient.getInstances("analyze");

analyze为注册服务名

eureka server获取注册服务

  1. 方法1

首先在配置文件中增加

1
eureka.client.fetch-registry=true

其次在spring 项目中注入

1
2
@Autowired
private DiscoveryClient discoveryClient;

然后通过以下方式进行获取

1
List<ServiceInstance> list = discoveryClient.getInstances("analyze");

analyze为注册服务名

这种方法有一定的延迟,原理和Client一样,如果需要及时更新那么需要配置一下其他参数,及时更新注册信息.

server

1
2
3
4
5
#清理时间间隔
eureka.server.eviction-interval-timer-in-ms=10000
#关闭自我保护模式。自我保护模式是指,出现网络分区、eureka在短时间内丢失过多客户端时,会进入自我保护模式。
#自我保护:一个服务长时间没有发送心跳包,eureka也不会将其删除,默认为true。
eureka.server.enable-self-preservation=false

client

1
2
3
4
#发送时间间隔
eureka.instance.lease-renewal-interval-in-seconds=10
#多长时间过期
eureka.instance.lease-expiration-duration-in-seconds=30
  1. 方法2

如果需要在server中获取,强烈建议使用这种方式,优点就是和dashboard状态保持一致。

1
2
3
4
5
6
7
8
PeerAwareInstanceRegistry registry = EurekaServerContextHolder.getInstance().getServerContext().getRegistry();
Applications applications = registry.getApplications();

applications.getRegisteredApplications().forEach((registeredApplication) -> {
registeredApplication.getInstances().forEach((instance) -> {
System.out.println(instance.getAppName() + " (" + instance.getInstanceId() + ") : " + response);
});
});

参考

  1. Spring Cloud中文文档
  2. Spring Cloud doc
  3. Eureka Server - list all registered instances

Spring Cloud

什么是Spring Cloud

Spring Cloud是一系列框架的有序集合。它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如配置管理服务发现断路器智能路由微代理控制总线等,都可以用Spring Boot的开发风格做到一键启动和部署。Spring并没有重复制造轮子,它只是将目前各家公司开发的比较成熟、经得起实际考验的服务框架组合起来,通过Spring Boot风格进行再封装屏蔽掉了复杂的配置和实现原理,最终给开发者留出了一套简单易懂、易部署和易维护的分布式系统开发工具包。

微服务是可以独立部署、水平扩展、独立访问(或者有独立的数据库)的服务单元,Spring Cloud就是这些微服务的大管家,采用了微服务这种架构之后,项目的数量会非常多,Spring Cloud做为大管家就需要提供各种方案来维护整个生态。

Spring Cloud就是一套分布式服务治理的框架,既然它是一套服务治理的框架,那么它本身不会提供具体功能性的操作,更专注于服务之间的通讯、熔断、监控等。因此就需要很多的组件来支持一套功能。

特性

Spring Cloud专注于提供良好的开箱即用经验的典型用例和可扩展性机制覆盖。

  • 分布式/版本化配置
  • 服务注册和发现
  • 路由
  • service - to - service调用
  • 负载均衡
  • 断路器
  • 全局锁
  • 决策竞选
  • 分布式消息传递

Spring Cloud组件架构

image

1、外部或者内部的非Spring Cloud项目都统一通过API网关(Zuul)来访问内部服务.
2、网关接收到请求后,从注册中心(Eureka)获取可用服务
3、由Ribbon进行均衡负载后,分发到后端的具体实例
4、微服务之间通过Feign进行通信处理业务
5、Hystrix负责处理服务超时熔断
6、Turbine监控服务间的调用和熔断相关指标

核心成员

Spring Cloud Netflix

这可是个大boss,地位仅次于老大,老大各项服务依赖与它,与各种Netflix OSS组件集成,组成微服务的核心,它的小弟主要有Eureka, Hystrix, Zuul, Archaius… 太多了

Netflix Eureka

服务中心,云端服务发现,一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移。这个可是springcloud最牛鼻的小弟,服务中心,任何小弟需要其它小弟支持什么都需要从这里来拿,同样的你有什么独门武功的都赶紧过报道,方便以后其它小弟来调用;它的好处是你不需要直接找各种什么小弟支持,只需要到服务中心来领取,也不需要知道提供支持的其它小弟在哪里,还是几个小弟来支持的,反正拿来用就行,服务中心来保证稳定性和质量。

Netflix Hystrix

熔断器,容错管理工具,旨在通过熔断机制控制服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。比如突然某个小弟生病了,但是你还需要它的支持,然后调用之后它半天没有响应,你却不知道,一直在等等这个响应;有可能别的小弟也正在调用你的武功绝技,那么当请求多之后,就会发生严重的阻塞影响老大的整体计划。这个时候Hystrix就派上用场了,当Hystrix发现某个小弟不在状态不稳定立马马上让它下线,让其它小弟来顶上来,或者给你说不用等了这个小弟今天肯定不行,该干嘛赶紧干嘛去别在这排队了。

Netflix Zuul

Zuul 是在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。Zuul 相当于是设备和 Netflix 流应用的 Web 网站后端所有请求的前门。当其它门派来找大哥办事的时候一定要先经过zuul,看下有没有带刀子什么的给拦截回去,或者是需要找那个小弟的直接给带过去。

Netflix Archaius

配置管理API,包含一系列配置管理API,提供动态类型化属性、线程安全配置操作、轮询框架、回调机制等功能。可以实现动态获取配置, 原理是每隔60s(默认,可配置)从配置源读取一次内容,这样修改了配置文件后不需要重启服务就可以使修改后的内容生效,前提使用archaius的API来读取。

Spring Cloud Config

俗称的配置中心,配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储、Git以及Subversion。就是以后大家武器、枪火什么的东西都集中放到一起,别随便自己带,方便以后统一管理、升级装备。

Spring Cloud Bus

事件、消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与Spring Cloud Config联合实现热部署。相当于水浒传中日行八百里的神行太保戴宗,确保各个小弟之间消息保持畅通。

Spring Cloud for Cloud Foundry

Cloud Foundry是VMware推出的业界第一个开源PaaS云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题

其实就是与CloudFoundry进行集成的一套解决方案,抱了Cloud Foundry的大腿。

Spring Cloud Cluster

Spring Cloud Cluster将取代Spring Integration。提供在分布式系统中的集群所需要的基础功能支持,如:选举、集群的状态一致性、全局锁、tokens等常见状态模式的抽象和实现。

如果把不同的帮派组织成统一的整体,Spring Cloud Cluster已经帮你提供了很多方便组织成统一的工具。

Spring Cloud Consul

Consul 是一个支持多数据中心分布式高可用的服务发现和配置共享的服务软件,由 HashiCorp 公司用 Go 语言开发, 基于 Mozilla Public License 2.0 的协议进行开源. Consul 支持健康检查,并允许 HTTP 和 DNS 协议调用 API 存储键值对.

Spring Cloud Consul 封装了Consul操作,consul是一个服务发现与配置工具,与Docker容器可以无缝集成。

其它小弟

Spring Cloud Security

基于spring security的安全工具包,为你的应用程序添加安全控制。这个小弟很牛鼻专门负责整个帮派的安全问题,设置不同的门派访问特定的资源,不能把秘籍葵花宝典泄漏了。

Spring Cloud Sleuth

日志收集工具包,封装了Dapper和log-based追踪以及Zipkin和HTrace操作,为SpringCloud应用实现了一种分布式追踪解决方案。

Spring Cloud Data Flow

Data flow 是一个用于开发和执行大范围数据处理其模式包括ETL,批量运算和持续运算的统一编程模型和托管服务。

对于在现代运行环境中可组合的微服务程序来说,Spring Cloud data flow是一个原生云可编配的服务。使用Spring Cloud data flow,开发者可以为像数据抽取,实时分析,和数据导入/导出这种常见用例创建和编配数据通道 (data pipelines)。

Spring Cloud data flow 是基于原生云对 spring XD的重新设计,该项目目标是简化大数据应用的开发。Spring XD 的流处理和批处理模块的重构分别是基于 Spring Boot的stream 和 task/batch 的微服务程序。这些程序现在都是自动部署单元而且他们原生的支持像 Cloud Foundry、Apache YARN、Apache Mesos和Kubernetes 等现代运行环境。

Spring Cloud data flow 为基于微服务的分布式流处理和批处理数据通道提供了一系列模型和最佳实践。

Spring Cloud Stream

Spring Cloud Stream是创建消息驱动微服务应用的框架。Spring Cloud Stream是基于Spring Boot创建,用来建立单独的/工业级spring应用,使用spring integration提供与消息代理之间的连接。数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

一个业务会牵扯到多个任务,任务之间是通过事件触发的,这就是Spring Cloud stream要干的事了

Spring Cloud Task

Spring Cloud Task 主要解决短命微服务的任务管理,任务调度的工作,比如说某些定时任务晚上就跑一次,或者某项数据分析临时就跑几次。

Spring Cloud Zookeeper

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

操作Zookeeper的工具包,用于使用zookeeper方式的服务发现和配置管理,抱了Zookeeper的大腿。

Spring Cloud Connectors

Spring Cloud Connectors 简化了连接到服务的过程和从云平台获取操作的过程,有很强的扩展性,可以利用Spring Cloud Connectors来构建你自己的云平台。

便于云端应用程序在各种PaaS平台连接到后端,如:数据库和消息代理服务。

Spring Cloud Starters

Spring Boot式的启动项目,为Spring Cloud提供开箱即用的依赖管理。

Spring Cloud CLI

基于 Spring Boot CLI,可以让你以命令行方式快速建立云组件。

参考

1.Spring Cloud在国内中小型公司能用起来吗?

2.Spring Cloud Dalston中文文档

3.spring-cloud

0%