Elasticsearch在EventHub项目中的实战应用

Elasticsearch在EventHub项目中的实战应用

前言

Event Hub是一个高度可缩放、分布式、基于时间序列的事件中心,能够实时的处理流式事件并进行告警和提醒。

Event Hub作为Newegg事件信息中枢,产品化新蛋各产品资源及平台底层基础设施服务生命周期与运转中的重要事件信息,并构建完善的事件消费渠道与流程,支撑线上监控与运维。

Event Hub产品化提供的事件信息,由Newegg内部各产品模块与底层基础设施服务获取,经过聚合,判定和收敛再最终呈现。信息源来自各模块底层的系统日志与监控项,保障客户透传客户的信息准确性与价值。关于Event Hub更详细介绍,请查看Newegg的事件中心之Event Hub

为了实现事件的多维度查询,事件的追溯性,我们将事件存储在elasticsearch中。我们设计了两个index:event_hub_currentevent_hub_history

事件是具有时间序列特征的,我们会在event_hub_historyindex中写入每条事件信息,通过事件唯一ID,在event_hub_current中更新事件信息,随着时间的流逝,大多数情况我们关注的结果。如果你了解过实时计算,可以参照Stream和Table。一个是流式的,一个是结果。

Index设计

两个index模板,限制了timestamp字段类型为date,

event_hub_current

该index为事件结果,最终状态的,该index设计为一个,使用别名操作。

event_hub_history

该index为事件历史记录,该index设计为按一定规则生成(借助ILM),提高查询性能,使用别名操作。

数据更新

数据批量插入

1
2
3
4
5
POST _bulk
{ "index" : { "_index" : "index:`event_hub_current", "_id" : "1" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "index:`event_hub_current", "_id" : "2" } }
{ "field1" : "value2" }
1
2
3
4
5
6
7
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

数据批量更新

1
2
3
POST _bulk
{ "update" : {"_id" : "2", "_index" : "event_hub_current"} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }

存在的字段更新,不存在的字段插入。

根据_id更新指定字段

1
2
3
4
5
6
7
GET event_hub_current/_update/1
{

"script": {
"source": "ctx._source['test'] = 'test111'"
}
}

如果是更新多个字段的话,以分号分割:

1
2
3
4
5
6
7
GET event_hub_current/_update/1
{

"script": {
"source": "ctx._source['test'] = 'test111';ctx._source['field'] = 'name'"
}
}

根据_id新增字段

1
2
3
4
POST test/_update/1
{
"script" : "ctx._source.new_field = 'value_of_new_field'"
}

根据_id删除字段

1
2
3
4
POST event_hub_current/_update/1
{
"script" : "ctx._source.remove('new_field')"
}

查询和聚合

灵活的数据查询

为了支持输入框查询,给用户提供灵活的查询,我们选用simple_query_string

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
GET event_hub_current/_search
{
"size": 10000,
"query": {
"bool": {
"must": [
{
"simple_query_string": {
"query": "severity:(\"critical\",\"major\",\"minor\",\"warning\",\"normal\") AND status:(\"open\")"
}
}
],
"filter": {
"range": {
"timestamp": {
"gte": "now-60d",
"lte": "now"
}
}
}
}
}
}

过滤查询不需要的字段

1
2
3
4
5
6
7
8
9
10
11
12
GET event_hub_current/_search
{
"size": 10000,
"_source": {
"excludes": [
"text ",
"postDate ",
"attributes"
]
},
"query": {"match_all": {}}
}

数据排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET event_hub_current/_search
{
"size": 10000,
"sort": [
{
"severity.keyword": {
"order": "asc"
}
},
{
"timestamp": {
"order": "desc"
}
}
],
"query": {"match_all": {}}
}

获取字段Top50 数据

1
2
3
4
5
6
7
8
9
10
11
{
"size": 0,
"aggs": {
"Department": {
"terms": {
"field": "eventType.keyword",
"size": 50
}
}
}
}

按不同类型统计不同级别的事件占比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
"size": 0,
"aggs": {
"severity": {
"terms": {
"field": "severity.keyword",
"size": 10000
},
"aggs": {
"type": {
"terms": {
"field": "type.keyword",
"size": 10
}
}
}
}
},
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "severity:(\"critical\",\"major\",\"minor\",\"warning\",\"normal\",\"informational\",\"debug\",\"indeterminate\") AND status:(\"open\",\"ack\",\"hold\",\"unknown\",\"shelve\")"
}
}
],
"filter": {
"range": {
"timestamp": {
"gte": "now-60d",
"lte": "now"
}
}
}
}
}
}

其他

delete文档删除

因为current的设计和elasticsearch 更新的本质(先删除,后新增),current 索引会存在大量的标记为delete 的文档,从利用资源的角度,我们需要将它释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET event_hub_current-2020-12-12-1/_stats
{
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_all" : {
"primaries" : {
"docs" : {
"count" : 7912,
"deleted" : 34153
}
}
}
}

我们采用段合并的策略,删除已经标记为deleted的文档。

1
POST event_hub_current-2020-12-12-1/_forcemerge?only_expunge_deletes=true

only_expunge_deletes为true,表示仅清除(expunge )包含文档删除的片段