OutOfMemoryError-unable to create native thread问题追究

1.问题背景

某天,发现线上elasticsearch 集群,有个节点down了,重启后,过了一个多小时,又down,这下才去认真的查看log,发现在down掉之前,有报以下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[67329.555s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

[67329.557s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

[2018-07-12T02:47:53,549][ERROR][o.e.b.ElasticsearchUncaughtExceptionHandler] [e11redis28.mercury.corp] fatal error in thread [elasticsearch[e11redis28.mercury.corp][refresh][T#2]], exiting

java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached

t java.lang.Thread.start0(Native Method) ~[?:?]

t java.lang.Thread.start(Thread.java:813) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:944) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1012) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:?]

t java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]

t java.lang.Thread.run(Thread.java:844) [?:?]

困扰了好几天,修改各种jvm 配置发现无用,就在github 上提了个issues,里面详细的描述了该问题及环境的配置。

2.该问题产生可能原因

经过查阅大量资料,才发现该错误不是我们一眼看的那么简单。

OutOfMemoryError: Unable to Create New Native Thread

这个错误其实已经告诉我们,该问题可能是两个原因造成的:

  • 内存不足

    该内存不足,其实不是指堆内存,而是创建栈内存不足。所以说出现该问题修改Xmx/Xms是无法解决的。默认情况下jvm 创建1个线程,会分配1m空间。这里所指内存不足是机器中可用内存不足。对于该原因的解决办法是给jvm 留够需要的内存,方法有多个:

    1. 减少Xss配置,这样话可以利用有限的内存创建更多的线程,但是需注意避免值过小栈溢出。
    2. 减少堆配置,主要还是留下足够内存供jvm 创建线程所用。
    3. 杀掉其他程序,留出空闲内存。
  • 机器线程数受限制

    众所周知,linux系统中对进程的创建是有限制的,不可能无限创建的,影响该数量主要是由以下三个方面:

    1. /proc/sys/kernel/threads-max
    2. max_user_process(ulimit –u)
    3. /proc/sys/kernel/pid_max

    通过这三个参数共同决定了linux机器中能创建线程数量

3.linux 进程相关信息

在这里主要是想写一些命令帮助人们分析linux进程下的线程等信息

  • 查看linux所有用户下所有进程数

    1
    ps -eLo ruser|awk 'NR>1'|sort|uniq -c
  • 查看进程中的线程数

    1
    ps –o nlwp 27989 
  • 查看某个进程下的线程

    1
    2
     top -H -p <pid>
    ps -eLf | grep <pid>

4.检测系统参数

  1. 查看内存 free -g

  2. 查看limit

    • ulimit -a
    • cat /proc/sys/kernel/threads-max
    • cat /proc/sys/kernel/pid_max

5.问题模拟验证

经过以上步骤分析,写了个脚本,监控该机器上内存使用情况及线程使用情况,发现问题出现在有个程序会创建大量的线程,该程序是java 程序。查看机器pid_max为32768,ulimit -u 未限制大小。 总结一下这个机器线程最多允许创建 32768。

以下是验证代码:注意该代码有一定危险性,请勿在window 下运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int i =0;
while(true) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});

thread.start();
i++;
System.out.println("curent thread num:"+i);
}

运行该代码,当线程增长到3.1w时,报该问题。

6.本次问题原因

本次问题是发现是有个程序创建了大量线程,造成达到linux 机器允许最大的线程数。切记这个不是一个参数控制的,多个参数控制pid_max,一般人可能会忽略!

7.问题解决办法

根据不同问题选择不同办法,详见第2点 该问题产生可能原因

8.参考

  1. We are out of memory
  2. 为何线程有PID
  3. Troubleshoot OutOfMemoryError: Unable to Create New Native Thread

eureka获取注册服务

eureka client获取注册服务

首先在启动类中增加@EnableDiscoveryClient

其次在spring 项目中注入

1
2
@Autowired
private DiscoveryClient discoveryClient;

然后通过以下方式进行获取

1
List<ServiceInstance> list = discoveryClient.getInstances("analyze");

analyze为注册服务名

eureka server获取注册服务

  1. 方法1

首先在配置文件中增加

1
eureka.client.fetch-registry=true

其次在spring 项目中注入

1
2
@Autowired
private DiscoveryClient discoveryClient;

然后通过以下方式进行获取

1
List<ServiceInstance> list = discoveryClient.getInstances("analyze");

analyze为注册服务名

这种方法有一定的延迟,原理和Client一样,如果需要及时更新那么需要配置一下其他参数,及时更新注册信息.

server

1
2
3
4
5
#清理时间间隔
eureka.server.eviction-interval-timer-in-ms=10000
#关闭自我保护模式。自我保护模式是指,出现网络分区、eureka在短时间内丢失过多客户端时,会进入自我保护模式。
#自我保护:一个服务长时间没有发送心跳包,eureka也不会将其删除,默认为true。
eureka.server.enable-self-preservation=false

client

1
2
3
4
#发送时间间隔
eureka.instance.lease-renewal-interval-in-seconds=10
#多长时间过期
eureka.instance.lease-expiration-duration-in-seconds=30
  1. 方法2

如果需要在server中获取,强烈建议使用这种方式,优点就是和dashboard状态保持一致。

1
2
3
4
5
6
7
8
PeerAwareInstanceRegistry registry = EurekaServerContextHolder.getInstance().getServerContext().getRegistry();
Applications applications = registry.getApplications();

applications.getRegisteredApplications().forEach((registeredApplication) -> {
registeredApplication.getInstances().forEach((instance) -> {
System.out.println(instance.getAppName() + " (" + instance.getInstanceId() + ") : " + response);
});
});

参考

  1. Spring Cloud中文文档
  2. Spring Cloud doc
  3. Eureka Server - list all registered instances

Spring Cloud

什么是Spring Cloud

Spring Cloud是一系列框架的有序集合。它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如配置管理服务发现断路器智能路由微代理控制总线等,都可以用Spring Boot的开发风格做到一键启动和部署。Spring并没有重复制造轮子,它只是将目前各家公司开发的比较成熟、经得起实际考验的服务框架组合起来,通过Spring Boot风格进行再封装屏蔽掉了复杂的配置和实现原理,最终给开发者留出了一套简单易懂、易部署和易维护的分布式系统开发工具包。

微服务是可以独立部署、水平扩展、独立访问(或者有独立的数据库)的服务单元,Spring Cloud就是这些微服务的大管家,采用了微服务这种架构之后,项目的数量会非常多,Spring Cloud做为大管家就需要提供各种方案来维护整个生态。

Spring Cloud就是一套分布式服务治理的框架,既然它是一套服务治理的框架,那么它本身不会提供具体功能性的操作,更专注于服务之间的通讯、熔断、监控等。因此就需要很多的组件来支持一套功能。

特性

Spring Cloud专注于提供良好的开箱即用经验的典型用例和可扩展性机制覆盖。

  • 分布式/版本化配置
  • 服务注册和发现
  • 路由
  • service - to - service调用
  • 负载均衡
  • 断路器
  • 全局锁
  • 决策竞选
  • 分布式消息传递

Spring Cloud组件架构

image

1、外部或者内部的非Spring Cloud项目都统一通过API网关(Zuul)来访问内部服务.
2、网关接收到请求后,从注册中心(Eureka)获取可用服务
3、由Ribbon进行均衡负载后,分发到后端的具体实例
4、微服务之间通过Feign进行通信处理业务
5、Hystrix负责处理服务超时熔断
6、Turbine监控服务间的调用和熔断相关指标

核心成员

Spring Cloud Netflix

这可是个大boss,地位仅次于老大,老大各项服务依赖与它,与各种Netflix OSS组件集成,组成微服务的核心,它的小弟主要有Eureka, Hystrix, Zuul, Archaius… 太多了

Netflix Eureka

服务中心,云端服务发现,一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移。这个可是springcloud最牛鼻的小弟,服务中心,任何小弟需要其它小弟支持什么都需要从这里来拿,同样的你有什么独门武功的都赶紧过报道,方便以后其它小弟来调用;它的好处是你不需要直接找各种什么小弟支持,只需要到服务中心来领取,也不需要知道提供支持的其它小弟在哪里,还是几个小弟来支持的,反正拿来用就行,服务中心来保证稳定性和质量。

Netflix Hystrix

熔断器,容错管理工具,旨在通过熔断机制控制服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。比如突然某个小弟生病了,但是你还需要它的支持,然后调用之后它半天没有响应,你却不知道,一直在等等这个响应;有可能别的小弟也正在调用你的武功绝技,那么当请求多之后,就会发生严重的阻塞影响老大的整体计划。这个时候Hystrix就派上用场了,当Hystrix发现某个小弟不在状态不稳定立马马上让它下线,让其它小弟来顶上来,或者给你说不用等了这个小弟今天肯定不行,该干嘛赶紧干嘛去别在这排队了。

Netflix Zuul

Zuul 是在云平台上提供动态路由,监控,弹性,安全等边缘服务的框架。Zuul 相当于是设备和 Netflix 流应用的 Web 网站后端所有请求的前门。当其它门派来找大哥办事的时候一定要先经过zuul,看下有没有带刀子什么的给拦截回去,或者是需要找那个小弟的直接给带过去。

Netflix Archaius

配置管理API,包含一系列配置管理API,提供动态类型化属性、线程安全配置操作、轮询框架、回调机制等功能。可以实现动态获取配置, 原理是每隔60s(默认,可配置)从配置源读取一次内容,这样修改了配置文件后不需要重启服务就可以使修改后的内容生效,前提使用archaius的API来读取。

Spring Cloud Config

俗称的配置中心,配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储、Git以及Subversion。就是以后大家武器、枪火什么的东西都集中放到一起,别随便自己带,方便以后统一管理、升级装备。

Spring Cloud Bus

事件、消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与Spring Cloud Config联合实现热部署。相当于水浒传中日行八百里的神行太保戴宗,确保各个小弟之间消息保持畅通。

Spring Cloud for Cloud Foundry

Cloud Foundry是VMware推出的业界第一个开源PaaS云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题

其实就是与CloudFoundry进行集成的一套解决方案,抱了Cloud Foundry的大腿。

Spring Cloud Cluster

Spring Cloud Cluster将取代Spring Integration。提供在分布式系统中的集群所需要的基础功能支持,如:选举、集群的状态一致性、全局锁、tokens等常见状态模式的抽象和实现。

如果把不同的帮派组织成统一的整体,Spring Cloud Cluster已经帮你提供了很多方便组织成统一的工具。

Spring Cloud Consul

Consul 是一个支持多数据中心分布式高可用的服务发现和配置共享的服务软件,由 HashiCorp 公司用 Go 语言开发, 基于 Mozilla Public License 2.0 的协议进行开源. Consul 支持健康检查,并允许 HTTP 和 DNS 协议调用 API 存储键值对.

Spring Cloud Consul 封装了Consul操作,consul是一个服务发现与配置工具,与Docker容器可以无缝集成。

其它小弟

Spring Cloud Security

基于spring security的安全工具包,为你的应用程序添加安全控制。这个小弟很牛鼻专门负责整个帮派的安全问题,设置不同的门派访问特定的资源,不能把秘籍葵花宝典泄漏了。

Spring Cloud Sleuth

日志收集工具包,封装了Dapper和log-based追踪以及Zipkin和HTrace操作,为SpringCloud应用实现了一种分布式追踪解决方案。

Spring Cloud Data Flow

Data flow 是一个用于开发和执行大范围数据处理其模式包括ETL,批量运算和持续运算的统一编程模型和托管服务。

对于在现代运行环境中可组合的微服务程序来说,Spring Cloud data flow是一个原生云可编配的服务。使用Spring Cloud data flow,开发者可以为像数据抽取,实时分析,和数据导入/导出这种常见用例创建和编配数据通道 (data pipelines)。

Spring Cloud data flow 是基于原生云对 spring XD的重新设计,该项目目标是简化大数据应用的开发。Spring XD 的流处理和批处理模块的重构分别是基于 Spring Boot的stream 和 task/batch 的微服务程序。这些程序现在都是自动部署单元而且他们原生的支持像 Cloud Foundry、Apache YARN、Apache Mesos和Kubernetes 等现代运行环境。

Spring Cloud data flow 为基于微服务的分布式流处理和批处理数据通道提供了一系列模型和最佳实践。

Spring Cloud Stream

Spring Cloud Stream是创建消息驱动微服务应用的框架。Spring Cloud Stream是基于Spring Boot创建,用来建立单独的/工业级spring应用,使用spring integration提供与消息代理之间的连接。数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。

一个业务会牵扯到多个任务,任务之间是通过事件触发的,这就是Spring Cloud stream要干的事了

Spring Cloud Task

Spring Cloud Task 主要解决短命微服务的任务管理,任务调度的工作,比如说某些定时任务晚上就跑一次,或者某项数据分析临时就跑几次。

Spring Cloud Zookeeper

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

操作Zookeeper的工具包,用于使用zookeeper方式的服务发现和配置管理,抱了Zookeeper的大腿。

Spring Cloud Connectors

Spring Cloud Connectors 简化了连接到服务的过程和从云平台获取操作的过程,有很强的扩展性,可以利用Spring Cloud Connectors来构建你自己的云平台。

便于云端应用程序在各种PaaS平台连接到后端,如:数据库和消息代理服务。

Spring Cloud Starters

Spring Boot式的启动项目,为Spring Cloud提供开箱即用的依赖管理。

Spring Cloud CLI

基于 Spring Boot CLI,可以让你以命令行方式快速建立云组件。

参考

1.Spring Cloud在国内中小型公司能用起来吗?

2.Spring Cloud Dalston中文文档

3.spring-cloud

x-pack之graph研究

简介

Graph 功能是一种基于 API 和基于 UI 的工具,能够让您数据中存在的相关关系浮现出来,同时能够在任何规模下利用各项 Elasticsearch 功能,例如分布式查询执行、实时数据可用性和索引等。

  • 反欺诈:挖掘海量购物行为数据和用户画像,探究由哪位店主对一组信用卡被盗事件负责。
  • 个性化推荐:根据听众偏好为喜欢莫扎特的听众推荐下一首最佳歌曲, 从而吸引和取悦听众。
  • 安全分析:发现潜在的破坏分子和其他意想不到的相关事件, 如挖掘网络中的主机与之进行通信的外部 IP的关联关系。

场景实战

业务场景

针对反欺诈场景,对商家打分作弊,马甲风险评估

准备分析数据

运行python IndexReviews.py 将review 数据写入到es 中

1
2
3
4
5
6
7
{
"date": "2006-04-22 13:48",
"reviewer": "15300",
"rating": 5,
"hour": "2006-04-22 13",
"seller": "74"
}

IndexReviews.py 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import csv
from elasticsearch import helpers
from elasticsearch.client import Elasticsearch
import sys
reload(sys)
sys.setdefaultencoding('utf8')

es = Elasticsearch(
[
'http://192.168.0.101:9200/'
]
)
indexName = "reviews"
es.indices.delete(index=indexName, ignore=[400, 404])
indexSettings = {
"settings": {
"index.number_of_replicas": 0,
"index.number_of_shards": 1
},
"mappings": {

"review": {
"properties": {
"reviewer": {
"type": "keyword"
},
"seller": {
"type": "keyword"
},
"rating": {
"type": "integer"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm"
},
"hour": {
"type": "keyword"
}
}
}

}
}
es.indices.create(index=indexName, body=indexSettings)

actions = []
rowNum = 0
csvFilename = "reviews.csv"
with open(csvFilename, 'rb') as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
rowNum += 1
if rowNum == 1:
continue

action = {
"_index": indexName,
'_op_type': 'index',
"_type": "review",
"_source": {
"reviewer": row[0],
"seller": row[1],
"rating": int(row[2]),
"date": row[3],
"hour": row[3][:-3],
}
}
actions.append(action)
# Flush bulk indexing action if necessary
if len(actions) >= 5000:
print rowNum
helpers.bulk(es, actions)
del actions[0:len(actions)]

if len(actions) >= 0:
helpers.bulk(es, actions)
del actions[0:len(actions)]

使用xpack graph 分析

  1. 创建index
    创建用于输出结果的index,

  2. 使用es api 获取前50000个seller

  3. 调用graph 针对每个seller进行关系分析
    例如

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    POST review/_xpack/graph/_explore
    {
    "query": {
    "bool": {
    "must": [
    {
    "term": {"seller": "A9JN"}
    },
    {
    "match": {"rating": 5}
    }
    ]
    }
    },
    "controls": {
    "sample_size": 20,
    "use_significance": true
    },
    "vertices": [
    {
    "field": "customer",
    "size": 50,
    "min_doc_count": 1
    }
    ],
    "connections": {
    "query": {
    "term": {
    "seller": "A9JN"
    }
    },
    "vertices": [
    {
    "field": "hour",
    "size": 500,
    "min_doc_count": 1
    }
    ]
    }
    }
  4. 根据查询的图关系进行客户端图计算,查找出reviewer —->hour—->reviewer出现一次以上的路径相框,将该结果汇总。

  5. 将第四步的结果组织存储到es 中

    1
    2
    3
    4
    5
    6
    7
    {
    "url": workspaceUrl,
    "seller": sellerId,
    "riskType": "SockPuppetry",
    "riskRating": numCoincidences,
    "numDocs": sellerDetails["numReviews"]
    }

完整代码参考一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import networkx as nx
from elasticsearch import helpers
from elasticsearch.client import Elasticsearch
import sys
import urllib
import json

es = Elasticsearch(
[
'http://192.168.0.101:9200/'
]
)

alertsIndexName = "alerts"

def createAlertsIndex():
alertsIndexSettings = {
"settings": {
"number_of_replicas": "0",
"number_of_shards": "1"
},
"mappings": {
"task": {
"properties": {
"riskType": {
"type": "keyword"
},
"url": {
"type": "text",
"index": "false"
}
}
}
}
}
if not es.indices.exists(index=alertsIndexName):
es.indices.create(index=alertsIndexName, body=alertsIndexSettings)

def getSellersList():
sellers=[]
sellersQuery = {
"size": 0,
"aggs": {
"topTerms": {
"terms": {
"field": "seller",
"size": 50000
}
}
}
}
results = es.search(index="reviews", body=sellersQuery)["aggregations"]["topTerms"]["buckets"]
for bucket in results:
sellers.append({
"seller" : bucket["key"],
"numReviews" : bucket["doc_count"]
})
return sellers

def nodeId(node):
return node["field"] + ":" + node["term"]

def erasePrint(msg):
sys.stdout.write('\r')
sys.stdout.write(msg)
sys.stdout.flush()

createAlertsIndex()
sellers=getSellersList()
rowNum = 0;
totalNumReviews=0
for sellerDetails in sellers:
rowNum +=1
totalNumReviews+=sellerDetails["numReviews"]
sellerId=sellerDetails["seller"]
q = {
"query": {
"bool": {
"must": [
{
"term": {"seller": sellerId}
},
{
"match": {"rating": 5}
}
]
}
},
"controls": {
"sample_size": 2000,
"use_significance": True
},
"vertices": [
{
# Find most loyal reviewers - significantly connected to the current seller
"field": "reviewer",
"size": 50,
"min_doc_count": 1
}
],
"connections": {
# Find date/times of loyal reviewers' activity - guide the exploration so only current-seller-related reviews
"query": {
"term": {
"seller": sellerId
}
},
# This will naturally favour date/times that are common to multiple reviewers
"vertices": [
{
"field": "hour",
"size": 500,
"min_doc_count": 1
}
]
}
}

erasePrint("Examining seller " + str(rowNum) + " of " + str(len(sellers)))

results = es.transport.perform_request('POST', "/reviews/_xpack/graph/_explore", body=q)

# Use NetworkX to create a client-side graph of reviewers and date/times we can analyze
G = nx.Graph()

for node in results["vertices"]:
G.add_node(nodeId(node), type=node["field"])
for edge in results["connections"]:
n1 = results["vertices"][int(edge["source"])]
n2 = results["vertices"][int(edge["target"])]
G.add_edge(nodeId(n1), nodeId(n2))

# Examine all "islands" of reviewers connected by same date/time
subgraphs = nx.connected_component_subgraphs(G)

numCoincidences = 0
for subgraph in subgraphs:
numHours = 0
reviewers = []
for n, d in subgraph.nodes(data=True):
if d["type"] == "hour":
numHours += 1
if d["type"] == "reviewer":
reviewers.append(n)
if numHours > 1:
if len(reviewers) > 1:
for srcI in range(0, len(reviewers)):
reviewer1 = reviewers[srcI]
for targetI in range(srcI + 1, len(reviewers)):
reviewer2 = reviewers[targetI]
paths = nx.all_simple_paths(subgraph, source=reviewer1, target=reviewer2, cutoff=2)
# Each path is a reviewer <-> date/time <-> reviewer triple
sameTimeReviews = 0
for path in paths:
sameTimeReviews += 1
# Two reviewers reviewing at same date/time is not a coincidence - however
# repeated synchronized reviews *are* a coincidence
if sameTimeReviews > 1:
numCoincidences += sameTimeReviews - 1

if numCoincidences > 0:
erasePrint("")
print "seller:" + str(sellerId) + " has ", numCoincidences, " reviewer coincidences in", sellerDetails["numReviews"], "reviews"
gq = json.dumps(q)
workspaceUrl = "graph#/workspace/32935810-5e4d-11e8-811e-7b68199c5a31?query=" + urllib.quote_plus(gq)
doc = {
"url": workspaceUrl,
"seller": sellerId,
"riskType": "SockPuppetry",
"riskRating": numCoincidences,
"numDocs": sellerDetails["numReviews"]
}
res = es.index(index=alertsIndexName, doc_type='task', id=sellerId, body=doc)
erasePrint("")
print "Completed analysis of", len(sellers), "sellers and",totalNumReviews,"reviews"

注意

本文代码适用于python2.7,其他版本,请自行参照修改

参考

  1. Fraud detection using the elastic stack

命令场景总结

命令 作用域 常用情景
git reset 提交层面 在私有分支上舍弃一些没有提交的更改
git reset 文件层面 将文件从缓存区中移除
git checkout 提交层面 切换分支或查看旧版本
git checkout 文件层面 舍弃工作目录中的更改
git revert 提交层面 在公共分支上回滚更改

详细解释

  1. git checkout

    一般有两种场景:切换分支,舍弃工作目录中的更改

例如:

1
2
git checkout brach2
git checkout -- src/cli/serve/serve.js
  1. git revert

    撤销一个提交的同时会创建一个新的提交。这是一个安全的方法,因为它不会重写提交历史。比如,下面的命令会找出倒数第二个提交,然后创建一个新的提交来撤销这些更改,然后把这个提交加入项目中。

1
2
git checkout hotfix
git revert HEAD~2
  1. git reset

    如果你的更改还没有共享给别人,git reset是撤销这些更改的简单方法。当你开发一个功能的时候发现『糟糕,我做了什么?我应该重新来过!』时,reset就像是go-to命令一样。
    除了在当前分支上操作,

  • 提交层面

reset将一个分支的末端指向另一个提交。这可以用来移除当前分支的一些提交。你传入HEAD以外的其他提交的时候要格外小心,因为reset操作会重写当前分支的历史

1
2
git checkout hotfix
git reset HEAD~2

可以通过传入这些标记来修改你的缓存区或工作目录:

–soft – 缓存区和工作目录都不会被改变

–mixed – 默认选项。缓存区和你指定的提交同步,但工作目录不受影响

–hard – 缓存区和工作目录都同步到你指定的提交

  • 文件层面
1
git reset HEAD~2 foo.py

会将当前的foo.py从缓存区中移除出去,而不会影响工作目录中对foo.py的更改。–soft、–mixed和–hard对文件层面的git reset毫无作用,因为缓存区中的文件一定会变化,而工作目录中的文件一定不变。

总结

一般我们开发中常用以下两个恢复文件

  1. git checkout
    1
    git checkout -- src/cli/serve/serve.js
    这条命令把serve.js从HEAD中签出并且把它恢复成未修改时的样子
  2. git reset –hard HEAD
    1
    git reset --hard HEAD
    这条命令会把你工作目录中所有未提交的内容清空(当然这不包括未置于版控制下的文件 untracked files).让工作目录回到上次提交时的状态(last committed state)
  3. git revert
    撤消(revert)了前期修改的提交(commit)是很容易的; 只要把出错的提交(commit)的名字(reference)做为参数传给命令
    1
    2
    git revert HEAD

    这条命令创建了一个撤消了上次提交(HEAD)的新提交

Elasticsearch集群原理探索

1. Elasticsearch Master 选举过程

2. Elasticsearch 文档检索过程

3. Elasticsearch 分片分配

3.1 集群级别

在es集群,可以通过以下设置,控制分片分配进程。

  • Cluster Level Shard Allocation

集群级别分片分配

在集群初始化(重启恢复),副本分配,再平衡,或者节点加入、离开触发集群分配分片

  • Disk-based Shard Allocation

依据磁盘分片分配

磁盘因素(默认值85%)达到最低值,将阻止最新分片到该机器上,或者直接移除分片。

达到85%,阻止,达到90,将移除分片到其他机器 ,达到95%对该node上的index 强制只读,如果磁盘够用,撤销只读index block。

主要设置参数如下:

1
2
3
4
5
6
7
8
9
PUT _cluster/settings
{
"transient": {
"cluster.routing.allocation.disk.watermark.low": "100gb",
"cluster.routing.allocation.disk.watermark.high": "50gb",
"cluster.routing.allocation.disk.watermark.flood_stage": "10gb",
"cluster.info.update.interval": "1m"
}
}
  • Shard Allocation Awareness and Forced Awarenessedit

分片分配意识

分片分配感知设置允许您告知Elasticsearch您的硬件配置,主要是用来解决避免物理机划分多个虚拟带来不利影响。详细配置见官网文档

  • Shard Allocation Filtering

主要存在以下参数

1
cluster.routing.allocation.include.{attribute}

分配分片到包含{attribute}node中

1
cluster.routing.allocation.require.{attribute}

分配分片到必须包含所有{attribute}node中

1
cluster.routing.allocation.exclude.{attribute}

分配分片到不包含{attribute}node中

attribute支持如下属性:

属性 注释
_name 匹配节点名称
_ip 匹配节点IP
_host 匹配hostnames

分片分配过滤

支持用include/exclude过滤器来控制分片的分配。过滤器可以设置在索引级别或者是集群级别

下线一个节点(10.0.0.1)可以按如下操作:

1
2
3
4
5
6
PUT _cluster/settings
{
"transient" : {
"cluster.routing.allocation.exclude._ip" : "10.0.0.1"
}
}

除了逗号作为分隔列出多个值之外,所有属性值都可以用通配符指定

1
2
3
4
5
6
PUT _cluster/settings
{
"transient": {
"cluster.routing.allocation.exclude._ip": "192.168.2.*"
}
}

3.2 index级别

  • 分片分配过滤 控制哪些分片落在哪些节点

详见集群

  • 延迟分配 延迟因为一个节点离开分配未分配分片
  1. 修改默认设置,默认值是1m
1
2
3
4
5
6
7
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}

  1. 监控延迟未分配分片
1
GET _cluster/health
  1. 永久移除一个节点

如果一个节点要永久性移除,需要立即分配分片,仅仅需要将timeout 设置为0

1
2
3
4
5
6
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "0"
}
}
  • 单个节点分片总数 对来自每个节点的相同索引的碎片数量进行硬性限制

以下动态设置允许您为每个节点允许的单个索引指定分片总数的硬性限制

1
index.routing.allocation.total_shards_per_node

同样可以设置节点限制,不用关心index情况

1
cluster.routing.allocation.total_shards_per_node

以上大小都是包括主本与副本数量,两个默认值是无穷大

引用

  1. Elasticsearch分布式一致性原理剖析(一)-节点篇
  2. elastic

jvm 在docker中内存占用问题探索

问题背景

最近有个项目在PRD上部署,因为涉及到读取大量数据,会出现内存占用。为了避免因为该项目影响线上其他服务,所以设置了-m=2048,结果发现运行会超过这个值,docker 进程即将该container killed.

随后设置了好几个级别,直到-m=6048,依然无法避免container 被干掉。但是在本地测试和在同事机器上测试,不会出现内存飙升。同样的数据,同样的容器,唯一不同的就是机器物理配置的不同。

问题原因

线上机器是128g内存,目前制作的jre image 是1.8版本,未设置堆栈等jvm 配置,那么jvm 会字节分配一个默认堆栈大小,这个大小是根据物理机配置分配的。这样就会造成越高的配置,默认分配(使用1/4的物理内存)的堆内存就越大,而docker设置限制内存大小,jvm却无法感知,不知道自己在容器中运行。目前存在该问题的不止jvm,一些linux 命令也是如此,例如:top,free,ps等。

因此就会出现container 被docker killed情况。这是个惨痛教训。。。

问题复现

略…

有个不错的文章,可以查看一下。

解决方案

  • Dockerfile增加jvm参数

在调用java 可以增加jvm 参数,控制堆栈大小。

1
CMD java  $JAVA_OPTIONS -jar java-container.jar
1
$ docker run -d --name mycontainer8g -p 8080:8080 -m 800M -e JAVA_OPTIONS='-Xmx300m' rafabene/java-container:openjdk-env
  • 选用Fabric8 docker image

镜像fabric8/java-jboss-openjdk8-jdk使用了脚本来计算容器的内存限制,并且使用50%的内存作为上限。也就是有50%的内存可以写入。你也可以使用这个镜像来开/关调试、诊断或者其他更多的事情

百度地图调用问题解决方案

背景

做地图可视化,我们经常使用百度地图,如果是纯js,在html 中调用,一般没什么问题,可以很好的融合,但是放在node.js(vue/angularjs/react)中经常会出现

1
Bmap is not defined

问题原因

产生上面的问题是因为百度地图api 会异步加载,在该js 还未加载完毕,即调用,即会产生该问题。这个问题发现许多人遇到,但是网上的回答特别少,有几个有帮助的也是提供思路,基本都是异步加载再调用。也就是等资源加载完毕再调用。

解决方案

新建map.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
export function MP() {
return new Promise(function (resolve, reject) {
// 如果已加载直接返回
if(typeof BMap !== 'undefined') {
resolve(BMap);
return true;
}
// 百度地图异步加载回调处理
window.onBMapCallback = function () {
console.log('百度地图脚本初始化成功...');
resolve(BMap);
};
const script = document.createElement('script');
script.type = 'text/javascript';
//script.src = 'http://api.map.baidu.com/api?v=2.0&ak=ZUONbpqGBsYGXNIYHicvbAbM';
script.src = 'http://api.map.baidu.com/api?v=2.0&ak=ZUONbpqGBsYGXNIYHicvbAbM&s=1&callback=onBMapCallback';
script.onerror = reject;
document.head.appendChild(script);
});
}

使用

1
2
3
4
5
6
7
import { MP } from './map.js';

MP().then(BMap => {
//调用
}).catch(error =>{
console.log('error');
});

相关项目

  • Bmap kibana 插件,开发该项目遇到这个问题

nginx 教程之 nginx 配置学习

1.location配置

语法规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
location [=|~|~*|^~] /uri/ { … }

= 表示精确匹配,这个优先级也是最高的

~ 表示区分大小写的正则匹配

~* 表示不区分大小写的正则匹配(和上面的唯一区别就是大小写)

^~ 表示uri以某个常规字符串开头,理解为匹配 url路径即可。nginx不对url做编码,因此请求为/static/20%/aa,可以被规则^~ /static/ /aa匹配到(注意是空格)。

!~和!~* 分别为区分大小写不匹配及不区分大小写不匹配的正则

/ 通用匹配,任何请求都会匹配到,默认匹配.

优先级=>^~>

首先匹配 =,其次匹配^~, 其次是按文件中顺序的正则匹配,最后是交给 / 通用匹配。当有匹配成功时候,停止匹配,按当前匹配规则处理请求

一般线上的配置

1
2
3
4
5
6
7
8
9
10
11
location ~* .*\.(js|css)?$
{
expires 7d; //7天过期,后续讲解
access_log off; //不保存日志
}

location ~* .*\.(png|jpg|gif|jpeg|bmp|ico)?$
{
expires 7d;
access_log off;
}

2.nginx 逻辑运算

nginx的配置中不支持if条件的逻辑与&& 逻辑或|| 运算 ,而且不支持if的嵌套语法,否则会报下面的错误:nginx: [emerg] invalid condition。
我们可以用变量的方式来间接实现。
要实现的语句:

1
2
3
if ($arg_unitid = 42012 && $uri ~/thumb/){
echo "www.baidu.com";
}

可以这么来实现,如下所示:

1
2
3
4
5
6
7
8
9
10
set $flag 0;
if ($uri ~ ^/thumb/[0-9]+_160.jpg$){
set $flag "${flag}1";
}
if ($arg_unitid = 42012){
set $flag "${flag}1";
}
if ($flag = "011"){
echo "www.baidu.com";
}

3.ngx_http_core_module模块提供的变量

参数名称 注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$arg_PARAMETER HTTP 请求中某个参数的值,如/index.php?site=www.ttlsa.com,可以用$arg_site取得www.ttlsa.com这个值.
$args HTTP 请求中的完整参数。例如,在请求/index.php?width=400&height=200 中,$args表示字符串width=400&height=200.
$binary_remote_addr 二进制格式的客户端地址。例如:\x0A\xE0B\x0E
$body_bytes_sent 表示在向客户端发送的http响应中,包体部分的字节数
$content_length 表示客户端请求头部中的Content-Length 字段
$content_type 表示客户端请求头部中的Content-Type 字段
$cookie_COOKIE 表示在客户端请求头部中的cookie 字段
$document_root 表示当前请求所使用的root 配置项的值
$uri 表示当前请求的URI,不带任何参数
$document_uri 与$uri 含义相同
$request_uri 表示客户端发来的原始请求URI,带完整的参数。$uri和$document_uri未必是用户的原始请求,在内部重定向后可能是重定向后的URI,而$request_uri 永远不会改变,始终是客户端的原始URI.
$host 表示客户端请求头部中的Host字段。如果Host字段不存在,则以实际处理的server(虚拟主机)名称代替。如果Host字段中带有端口,如IP:PORT,那么$host是去掉端口的,它的值为IP。$host 是全小写的。这些特性与http_HEADER中的http_host不同,http_host只取出Host头部对应的值。
$hostname 表示 Nginx所在机器的名称,与 gethostbyname调用返回的值相同
$http_HEADER 表示当前 HTTP请求中相应头部的值。HEADER名称全小写。例如,示请求中 Host头部对应的值 用 $http_host表
$sent_http_HEADER 表示返回客户端的 HTTP响应中相应头部的值。HEADER名称全小写。例如,用 $sent_ http_content_type表示响应中 Content-Type头部对应的值
$is_args 表示请求中的 URI是否带参数,如果带参数,$is_args值为 ?,如果不带参数,则是空字符串
$limit_rate 表示当前连接的限速是多少,0表示无限速
$nginx_version 表示当前 Nginx的版本号
$query_string 请求 URI中的参数,与 $args相同,然而 $query_string是只读的不会改变
$remote_addr 表示客户端的地址
$remote_port 表示客户端连接使用的端口
$remote_user 表示使用 Auth Basic Module时定义的用户名
$request_filename 表示用户请求中的 URI经过 root或 alias转换后的文件路径
$request_body 表示 HTTP请求中的包体,该参数只在 proxy_pass或 fastcgi_pass中有意义
$request_body_file 表示 HTTP请求中的包体存储的临时文件名
$request_completion 当请求已经全部完成时,其值为 “ok”。若没有完成,就要返回客户端,则其值为空字符串;或者在断点续传等情况下使用 HTTP range访问的并不是文件的最后一块,那么其值也是空字符串。
$request_method 表示 HTTP请求的方法名,如 GET、PUT、POST等
$scheme 表示 HTTP scheme,如在请求 https://nginx.com/中表示 https
$server_addr 表示服务器地址
$server_name 表示服务器名称
$server_port 表示服务器端口
$server_protocol 表示服务器向客户端发送响应的协议,如 HTTP/1.1或 HTTP/1.0

nginx 教程之 docker 化安装

1.Why docker

因为要依赖很多lib,安装复杂,提高了学习和使用的难度,所以推荐使用docker 部署,可以快速迁移,快速部署,更多优点详见docker官网

2.部署

这里我们选择官方镜像,使用文档也可以在这个页面内找到。

使用完整nginx配置文件替换默认配置信息

1
docker run --name=nginx --net=host -v /host/path/nginx.conf:/etc/nginx/nginx.conf -d nginx:1.13.7-alpine

当然还可以继承默认配置,这样使用

1
docker run --name=nginx --net=host -v /host/path/nginx.conf:/etc/nginx/nginx.conf:ro -d nginx:1.13.7-alpine

3.nginx image默认配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

user nginx;
worker_processes 1;

error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;


events {
worker_connections 1024;
}


http {
include /etc/nginx/mime.types;
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log /var/log/nginx/access.log main;

sendfile on;
#tcp_nopush on;

keepalive_timeout 65;

#gzip on;

include /etc/nginx/conf.d/*.conf;
}

4.相关学习资源

0%