0%

oozie搭建教程

简介

Apache Oozie是Hadoop工作流调度框架。它是一个运行相关的作业工作流系统。这里,用户被允许创建向非循环图工作流程,其可以在并列 Hadoop 并顺序地运行。

它由两部分组成:

工作流引擎:一个工作流引擎的职责是存储和运行工作流程,由 Hadoop 作业组成:MapReduce, Pig, Hive.
协调器引擎:它运行基于预定义的时间表和数据的可用性工作流程作业。
Oozie可扩展性和可管理及时执行成千上万的工作流程(每个由几十个作业)的Hadoop集群。

Oozie 也非常灵活。人们可以很容易启动,停止,暂停和重新运行作业。Oozie 可以很容易地重新运行失败的工作流。可以很容易重做因宕机或故障错过或失败的作业。甚至有可能跳过一个特定故障节点。

部署

编译

1
2
3
4
5
Unix box (tested on Mac OS X and Linux)
Java JDK 1.7+
Maven 3.0.1+
Hadoop 0.20.2+
Pig 0.7+

1.下载及解压

1
2
$ wget http://apache.fayea.com/oozie/4.3.0/oozie-4.3.0.tar.gz
$ tar xvf oozie-4.3.0.tar.gz

2.编译

进入解压后的目录 oozie-4.3.0

1
$ mvn clean package assembly:single -DskipTests

在国内的同学可以将中央仓库设成阿里云的地址,这样下载速度能快一点,部分下载不下来的,建议手动在mvnrepository.com中央仓库下载一下

配置

编译好的文件在以下路径

1
oozie-4.3.0/distro/target/oozie-4.3.0-distro/oozie-4.3.0/

1.环境变量配置

1
$ vi /etc/profile

在profile文件中添加以下内容:

1
2
export OOZIE_HOME=/oozie-4.3.0/distro/target/oozie-4.3.0-distro/oozie-4.3.0
export PATH=$PATH:$OOZIE_HOME/bin

执行以下命令生效

1
$ source /etc/profile

2.hadoop 集群集成

修改core-site.xml,添加以下信息,该教程全程采用的是root用户,因此配置中填入的是root,如果是其他用户需要改成其他用户,切记,不然会报没有权限的错误的。修改该文件后,还需要重启hadoop集群,以便参数生效。

1
2
3
4
5
6
7
8
9
 <property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>

<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>

3.oozie配置

以下配置默认在oozie-4.3.0/distro/target/oozie-4.3.0-distro/oozie-4.3.0/ 目录下操作

  • 配置文件修改

修改conf目录下oozie-site.xml文件,默认配置可以运行,但是运行hadoop job会报错的。主要是将hadoop集群的配置信息导入oozie中

1
2
3
4
5
6
7
8
9
10
11
12
 <property>
<name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
<value>*=/data/bigdata/hadoop-2.6.0-cdh5.7.0/etc/hadoop</value>
<description>
Comma separated AUTHORITY=HADOOP_CONF_DIR, where AUTHORITY is the HOST:PORT of
the Hadoop service (JobTracker, HDFS). The wildcard '*' configuration is
used when there is no exact match for an authority. The HADOOP_CONF_DIR contains
the relevant Hadoop *-site.xml files. If the path is relative is looked within
the Oozie configuration directory; though the path can be absolute (i.e. to point
to Hadoop client conf/ directories in the local filesystem.
</description>
</property>
  • ext

新建libext目录

1
2
3
$ cd libext
$ wget http://archive.cloudera.com/gplextras/misc/ext-2.2.zip
$ cd ..
  • 打包
1
bin/oozie-setup.sh prepare-war
  • 复制依赖jar
1
2
3
4
$ cd oozie-4.3.0/distro/target/oozie-4.3.0-distro/oozie-4.3.0/oozie-server/webapps/oozie/WEB-INF/lib/
$ cp -rf /data/bigdata/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce1/lib/*.jar ./lib/
$ cp -rf /data/bigdata/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce1/*.jar ./lib/

删除其中mr1的包,及jsp-api.jar,jasper-compiler-5.5.23.jar

  • 上传 examples

上传examples及oozie-sharelib-4.3.0

1
2
3
4
$ tar xvf oozie-examples.tar.gz
$ tar xvf oozie-sharelib-4.3.0.tar.gz
$ hadoop fs -put examples examples
$ hadoop fs -put share share

4.运行

初始化数据库

1
$ bin/ooziedb.sh create -sqlfile oozie.sql -run

以下输出即为成功:

1
2
3
4
5
6
7
8
9
10
11
12
Validate DB Connection.
DONE
Check DB schema does not exist
DONE
Check OOZIE_SYS table does not exist
DONE
Create SQL schema
DONE
DONE
Create OOZIE_SYS table
DONE
Oozie DB has been created for Oozie version '4.3.0'

守护进程运行

1
$ bin/oozied.sh start

验证是否成功

1
$ bin/oozie admin -oozie http://localhost:11000/oozie -status

输出System mode: NORMAL即表示配置成功,或者在浏览器中打开
http://localhost:11000/oozie/

examples运行

1.执行map-reduce

1
$ oozie job -oozie http://localhost:11000/oozie -config examples/apps/map-reduce/job.properties -run

2.验证结果

1
2
$ oozie job -oozie http://localhost:11000/oozie -info 0000007-161223101553230-oozie-root-W

如下结果则证明成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Job ID : 0000007-161223101553230-oozie-root-W
------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : map-reduce-wf
App Path : hdfs://192.168.0.105:8020/user/root/examples/apps/map-reduce/workflow.xml
Status : SUCCEEDED
Run : 0
User : root
Group : -
Created : 2016-12-23 08:40 GMT
Started : 2016-12-23 08:40 GMT
Last Modified : 2016-12-23 08:41 GMT
Ended : 2016-12-23 08:41 GMT
CoordAction ID: -

Actions
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Ext Status Err Code
------------------------------------------------------------------------------------------------------------------------------------
0000007-161223101553230-oozie-root-W@:start: OK - OK -
------------------------------------------------------------------------------------------------------------------------------------
0000007-161223101553230-oozie-root-W@mr-node OK job_1482454814338_0025 SUCCEEDED -
------------------------------------------------------------------------------------------------------------------------------------
0000007-161223101553230-oozie-root-W@end OK - OK -
------------------------------------------------------------------------------------------------------------------------------------

RedisCluster 集群配置文件损坏修复

问题现象

启动集群后,log文件显示如下信息:

1
2
Unrecoverable error: corrupted cluster config file.

这种情况,因为集群cluster-config-file文件损坏引起,导致该节点无法启动

修复方案

  1. 首先在各个node上移除该出错节点
  2. 删除该cluster-config-file文件
  3. 重新启动该节点
  4. 将该节点加入集群
  5. 指定该节点的master,将该节点以slave加入集群(本次修复未执行,集群自动恢复正常)

实施步骤

一、首先在各个node上移除该出错节点

执行以下脚本,将fail节点移除出集群,该操作仅移除一个机器中的无用节点信息,如果是多个机器,请在host_array中添加多个机器IP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
host_array=(192.168.0.101)
for var in ${host_array[@]};
do
for i in $(seq 7000 7012)
do
nodeids=`src/redis-cli -c -h $var -p $i cluster nodes|grep fail|awk '{print $1}'`
for d in $nodeids
do
echo $d
src/redis-cli -c -h $var -p $i cluster forget $d
done
done
done

二、删除该cluster-config-file文件

1
mv nodes-7004.conf nodes-7004.conf.bak

三、重新启动该节点

1
redis-server redis.conf

四、将该节点加入集群

在集群任意instance 执行以下命令

1
CLUSTER MEET <ip> <port>   //将ip和port所指定的节点添加到集群当中,让它成为集群的一份子

五、指定该节点的master,将该节点以slave加入集群

在此次修复场景中未进行第五步,集群自动分配成为不均衡节点的slave

如果集群未自动分配,在需要加入集群的实例上执行,指定为某一个master的slave

1
CLUSTER REPLICATE <node_id> //将当前节点设置为 node_id 指定的节点的从节点。

kafka patition计算

kafka生产信息的时候,可以指定该信息存储在具体的patition上,如果不指定会使用默认的算法计算要落的patition.以下是针对此做一个探讨和理解

消息路由

  1. 指定了 patition,则直接使用;
  2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
  3. patition 和 key 都未指定,使用轮询选出一个 patition。

Kafka Client计算patition

详细代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//创建消息实例
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException("Invalid timestamp " + timestamp);
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
}

//计算 patition,如果指定了 patition 则直接使用,否则使用 key 计算
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
if (partition != null) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int lastPartition = partitions.size() - 1;
if (partition < 0 || partition > lastPartition) {
throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
}
return partition;
}
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

// 使用 key 选取 patition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
return DefaultPartitioner.toPositive(nextValue) % numPartitions;
}
} else {
//对 keyBytes 进行 hash 选出一个 patition
return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

one step

创建topic-move-test.json,迁移可以指定多个topic

1
2
3
4
5
6
7
{"topics": [{
"topic": "upgrade-kafka"
},{
"topic":"upgrade-kafka1"
}],
"version": 1
}

two step

在kafka工作目录,执行以下生成迁移配置,其中91为需要迁移到的broker id

1
bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --topics-to-move-json-file topic-move-test.json --broker-list "91" --generate

生成以下内容:

1
2
3
4
5
6
7
Current partition replica assignment

{"version":1,"partitions":[{"topic":"upgrade-kafka1","partition":1,"replicas":[96]},{"topic":"upgrade-kafka","partition":5,"replicas":[96]},{"topic":"upgrade-kafka1","partition":2,"replicas":[91]},{"topic":"upgrade-kafka","partition":1,"replicas":[92]},{"topic":"upgrade-kafka","partition":4,"replicas":[95]},{"topic":"upgrade-kafka1","partition":0,"replicas":[95]},{"topic":"upgrade-kafka","partition":3,"replicas":[94]},{"topic":"upgrade-kafka","partition":0,"replicas":[91]},{"topic":"upgrade-kafka","partition":2,"replicas":[93]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"upgrade-kafka","partition":5,"replicas":[91]},{"topic":"upgrade-kafka1","partition":1,"replicas":[91]},{"topic":"upgrade-kafka","partition":1,"replicas":[91]},{"topic":"upgrade-kafka","partition":4,"replicas":[91]},{"topic":"upgrade-kafka1","partition":2,"replicas":[91]},{"topic":"upgrade-kafka","partition":3,"replicas":[91]},{"topic":"upgrade-kafka1","partition":0,"replicas":[91]},{"topic":"upgrade-kafka","partition":0,"replicas":[91]},{"topic":"upgrade-kafka","partition":2,"replicas":[91]}]}

three step

新建expand-cluster-reassignment.json文件,将上一步输出放入,内容如下:

1
{"version":1,"partitions":[{"topic":"upgrade-kafka","partition":5,"replicas":[91]},{"topic":"upgrade-kafka1","partition":1,"replicas":[91]},{"topic":"upgrade-kafka","partition":1,"replicas":[91]},{"topic":"upgrade-kafka","partition":4,"replicas":[91]},{"topic":"upgrade-kafka1","partition":2,"replicas":[91]},{"topic":"upgrade-kafka","partition":3,"replicas":[91]},{"topic":"upgrade-kafka1","partition":0,"replicas":[91]},{"topic":"upgrade-kafka","partition":0,"replicas":[91]},{"topic":"upgrade-kafka","partition":2,"replicas":[91]}]}

four step

按自己需求可以重新编辑expand-cluster-reassignment.json内容,修改broker id.
执行迁移命令

1
bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

检查执行结果

1
bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

以下即为迁移成功:

1
2
3
4
5
6
7
8
9
10
11
Status of partition reassignment:
Reassignment of partition [upgrade-kafka,4] completed successfully
Reassignment of partition [upgrade-kafka,5] completed successfully
Reassignment of partition [upgrade-kafka1,0] completed successfully
Reassignment of partition [upgrade-kafka,2] completed successfully
Reassignment of partition [upgrade-kafka1,1] completed successfully
Reassignment of partition [upgrade-kafka,1] completed successfully
Reassignment of partition [upgrade-kafka,3] completed successfully
Reassignment of partition [upgrade-kafka,0] completed successfully
Reassignment of partition [upgrade-kafka1,2] completed successfully

Reference

  1. kafka.apache.org

zookeeper运维经验

参数配置

在默认zoo.cfg配置中,zookeeper生成的历史镜像,log不会删除,生成的频率也比较快,因此生产环境需要配置以下两个参数

  • autopurge.purgeInterval

解释:清楚间隔,单位小时,默认值为0,修改次值,表示启用清楚

  • autopurge.snapRetainCount

解释:保留数量,默认为3,最小值为3

引用

  1. zookeeper官网

kafka使用教程

shell操作

1.创建topic

–replication-factor 2:备份因子为2

–partitions 10:partitions数目为10

1
bin/kafka-topics.sh --create --zookeeper 192.168.0.101:2181 --replication-factor 2 --partitions 10 --topic upgrade-kafka_test

2.查询topic

列出所有的topic

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

3.删除topic

首先需要确认集群是否配置delete.topic.enable=true,配置后即可删除,确保topic没有被使用。

1
2
bin/kafka-topics.sh --delete --zookeeper 192.168.0.101:2181 --topic upgrade-kafka_test

程序操作

参考

  1. kafka official website

Kafka原理及设计理论总结

一、数据可靠性保证

当Producer向Leader发送数据时,可以通过request.required.acks参数设置数据可靠性的级别

  1. 0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
  2. 1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
  3. -1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证
    仅设置acks=-1也不能保证数据不丢失,当Isr列表中只有Leader时,同样有可能造成数据丢失。要保证数据不丢除了设置acks=-1, 还要保 证ISR的大小大于等于2,具体参数设置:

(1).request.required.acks:设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
(2).min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica
Producer要在吞吐率和数据可靠性之间做一个权衡

二、数据一致性保证

一致性定义:若某条消息对Consumer可见,那么即使Leader宕机了,在新Leader上数据依然可以被读到

  1. HighWaterMark

简称HW: Partition的高水位,取一个partition对应的ISR中最小的LEO作为HW,消费者最多只能消费到HW所在的位置,另外每个replica都有highWatermark,leader和follower各自负责更新自己的highWatermark状态,highWatermark <= leader. LogEndOffset
2. 对于Leader新写入的msg

Consumer不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被Consumer消费,即Consumer最多只能消费到HW位置

这样就保证了如果Leader Broker失效,该消息仍然可以从新选举的Leader中获取。对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)

一、项目介绍

为了更好监控RedisCluster集群状态信息,提升性能,抛弃之前通过java api获取info信息。此次项目分为两个方面:

  1. 通过redis自带info信息监控集群
  2. 监控网络流量获取对集群的使用情况的监控。

总之,通过不同的粒度监控RedisCluster集群运行状况,提供良好的管理运维平台。

二、技术架构

1.info信息

1
2
3
graph LR
Logstash-->ElasticSearch
ElasticSearch-->Kibana

Logstash良好的插件结构设计,我们可以根据不同场景选择合适的input,filter,output插件。为了高效配置监控集群,input插件我们基于exec自定义了自己的插件redis-exec插件。output插件直接选择elasticsearch插件。

2.网络流量

1
2
3
graph LR
Packagebeat-->ElasticSearch
ElasticSearch-->Kibana

Packagebeat是一个分布式网络数据抓包软件,可以直接监控redis协议信息。

Kibana是一个es可视化的作图工具,根据不同的搜索条件可制定监控不同指标信息的动态图,让使用人员可以直观监控集群运行状况。

三、难点

1.redis-exec插件开发

详见logstash插件开发
2.监控指标

详见国外案例

centos6.X版本,docker version1.7.1版本创建过程如下:

  1. 运行容器
    1
    $ docker run -d -p 5000:5000 registry
  2. 修改配置

在docker1.3.X版本以后,与docker registry交互默认使用的是https,此处需要修改为http。在/etc/sysconfig/docker文件中添加以下内容即可:

1
other_args="$other_args --insecure-registry myregistry.example.com:5000 "

在1.12最新版本中可以使用以下方式修改,原理是一致的。

1
2
 Create or modify /etc/docker/daemon.json
{ "insecure-registries":["myregistry.example.com:5000"] }

然后重新启动

1
$ service docker restart
  1. 测试新的registry
  • 镜像打标签
    1
    $ docker tag <img_id> myregistry.example.com:5000/truman/opentsdb
  • 提交镜像
    1
    $ docker push  myregistry.example.com:5000/truman/opentsdb
    然后通过docker images即可查看到push 的镜像

在别的机器中就可以拉取镜像了命令如下:

1
docker pull myregistry.example.com:5000/truman/opentsdb

简介

logstash是一个实时流水式开源数据收集引擎。具有强大的plugin。可以根据自己的业务场景选择不同的input filter output。绝大多数情况下都是结合ElasticSearch Kibana一起使用的,俗称ELK。

模块介绍

Logstash使用管道方式进行日志的搜集处理和输出。有点类似*NIX系统的管道命令 xxx | ccc | ddd,xxx执行完了会执行ccc,然后执行ddd。

在logstash中,包括了三个阶段:

输入input –> 处理filter(不是必须的) –> 输出output

配置文件说明

前面介绍过logstash基本上由三部分组成,input、output以及用户需要才添加的filter,因此标准的配置文件格式如下:

1
2
3
4
5
6
7
8
9
input {

}
filter {

}
output {

}

执行说明

1
bin/logstash -f demo.conf

使用Demo

Output plugins ElasticSearch

案例使用如下:

1
2
3
4
5
6
7
8
9
output {
#stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
template_overwrite => true
index => "rediscluster-%{+YYYY.MM.dd}"
workers => 5
}
}

Output plugins opentsdb

使用logstash收集数据,并发送到opentsdb中。分为三部分:Input,Filter,Output

输入数据时,输入一条数据,回车。以下为三条测试数据:

1
2
3
threads.ThreadCount 1352279077 67 host=server1 port=1006
gc.PSScavenge.CollectionTime 1352279137 1360 host=server2 port=1010
memorypool.CodeCache.Usage_used 1352279137 11625472 host=server1 port=1009
  • Input采用命令行输入数据

    1
    2
    3
    4
    input {
    stdin{
    }
    }
  • Filter过滤组织数据

采用的是grok插件,可以使用其他插件完成相同的目的

1
2
3
4
5
6
7
filter {
grok {
match => { "message" => "%{DATA:metricName} %{NUMBER:unixtime} %{NUMBER:data} host=%{DATA:metricHost} port=%{NUMBER:port}" }
remove_field => [ "host" ]
}

}

备忘

  1. logstash输入数据自带host,@timestamp等自带,为了避免干扰存入opentsdb数据,此处特将隐含的host字段去掉。
  2. DATA/NUMBER等实为grok自带的正则规则。
  • Output输出数据

此处输出数据到opentsdb中,官方文档有误,详见源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
output {
stdout { codec => rubydebug }#此处是为了将filter结果输出到控制台中
opentsdb {
host => '***.***.***.***'
port => 4242
metrics => [
"%{metricName}",
"%{data}",
"host",
"%{metricHost}",
"port",
"%{port}"
]
}
}

备忘

opentsdb输入信息格式为:put metric timestamp value tagname=tagvalue tag2=value2,在logstash-output-opentsdb插件metrics配置中默认已经输入timestamp,因此metrics需要配置的第一个参数为metricName,第二个参数为 value 之后依次为tagname,tagValue。

参考

  1. https://www.elastic.co/guide/en/logstash/current/index.html