
Filter plugins

  1. grok
grok {
  match => { "command" => "redis-cli -c -h %{IP:node} -p %{NUMBER:port}%{DATA:data}" }
  remove_field => [ "host" ]
}
  2. ruby

Function: parse the Redis INFO output and emit it as individual fields.

ruby {
  code => "fields = event['message'].split(/\r\n|\n/)
    length = fields.length - 1
    for i in 1..length do
      if fields[i].include?(':') then
        field = fields[i].split(':')
        event[field[0]] = field[1].to_f
      end
    end
  "
  remove_field => [ "message" ]
}
  3. mutate

Function: set field types explicitly.

filter {
  mutate {
    convert => { "latestResponse" => "integer" }
    convert => { "cacheHit" => "string" }
    convert => { "cacheRate" => "float" }
  }
}

Output plugins

Background

Logstash's real strength lies in its plugin system. There are many official plugins, but they cannot cover every requirement, so customized plugins are sometimes needed. For the Logstash Monitor Redis requirement we develop a dedicated plugin that dynamically fetches the INFO output of the master instances.

Logstash plugin overview

Plugin structure

$ tree logstash-input-example
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── lib
│   └── logstash
│       └── inputs
│           └── example.rb
├── logstash-input-example.gemspec
└── spec
    └── inputs
        └── example_spec.rb

In fact only two of these files really matter: logstash-input-example.gemspec and example.rb.
example_spec.rb is the test class.

First, let's see what logstash-input-example.gemspec does!

Gem::Specification.new do |s|
s.name = 'logstash-input-example'
s.version = '2.0.4'
s.licenses = ['Apache License (2.0)']
s.summary = "This example input streams a string at a definable interval."
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Elastic"]
s.email = 'info@elastic.co'
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
s.require_paths = ["lib"]

# Files
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }

# Gem dependencies
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22'
s.add_development_dependency 'logstash-devutils', '>= 0.0.16'
end

In the file above, you only need to change the version and the name; the rest can stay essentially untouched.

The other key pieces of information are:

  • s.require_paths defines where the plugin's core file lives
  • s.add_runtime_dependency defines the environment the plugin needs at runtime

Now let's look at example.rb. This file deserves a detailed walkthrough; the basic skeleton is as follows:
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname

# Generate a repeating message.
#
# This plugin is intended only as an example.

class LogStash::Inputs::Example < LogStash::Inputs::Base
  config_name "example"

  # If undefined, Logstash will complain, even if codec is unused.
  default :codec, "plain"

  # The message string to use in the event.
  config :message, :validate => :string, :default => "Hello World!"

  # Set how frequently messages should be sent.
  #
  # The default, `1`, means send a message every second.
  config :interval, :validate => :number, :default => 1

  public
  def register
    @host = Socket.gethostname
  end # def register

  def run(queue)
    # we can abort the loop if stop? becomes true
    while !stop?
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
      # because the sleep interval can be big, when shutdown happens
      # we want to be able to abort the sleep
      # Stud.stoppable_sleep will frequently evaluate the given block
      # and abort the sleep(@interval) if the return value is true
      Stud.stoppable_sleep(@interval) { stop? }
    end # loop
  end # def run

  def stop
    # nothing to do in this case so it is not necessary to define stop
    # examples of common "stop" tasks:
    #  * close sockets (unblocking blocking reads/accepts)
    #  * cleanup temporary files
    #  * terminate spawned threads
  end
end # class LogStash::Inputs::Example
Let's go through it line by line!

First, the opening line # encoding: utf-8. Don't assume it does nothing just because it is a comment: it declares the plugin's source encoding.

The next two lines:

require "logstash/inputs/base"
require "logstash/namespace"

pull in the packages every plugin needs.

class LogStash::Inputs::Example < LogStash::Inputs::Base
config_name "example"

The plugin inherits from the Base class and sets the name under which it is used in configurations.

The following line declares a parameter. A parameter supports several attributes; the full form is:

config :variable_name, :validate => :variable_type, :default => "Default value", :required => boolean, :deprecated => boolean

Where:

variable_name is the parameter's name.
validate enables type checking; if the supplied value is not of the given type, logstash -f xxx --configtest reports an error. It supports several data types, e.g. :string, :password, :boolean, :number, :array, :hash, :path (a file-system path), :codec (since 1.2.0), :bytes.
default sets the parameter's default value.
required declares whether the parameter is mandatory.
deprecated attaches extra information to a parameter, for example to warn that it is no longer recommended. A typical case is index_type in the elasticsearch output: using that parameter triggers a deprecation notice.
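
For example, the two hypothetical option declarations below combine these attributes (a sketch for illustration only; the option names, default value and deprecation message are made up):

# A required string option with a default value; a value of the wrong type fails --configtest.
config :redis_host, :validate => :string, :default => "127.0.0.1", :required => true
# A deprecated option; Logstash prints the given message whenever the option is used.
config :index_type, :validate => :string, :deprecated => "Use 'document_type' instead."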

Plugin installation

  1. Quick installation

Step 1: copy the plugin folder into the following directory:

logstash-2.1.0\vendor\bundle\jruby\1.9\gems

Step 2: edit the Gemfile in the Logstash root directory and add the following:

gem "logstash-filter-example", :path => "vendor/bundle/jruby/1.9/gems/logstash-filter-example-1.0.0"

Step 3: write a configuration file, test.conf:

input {
  example {}
}
filter {

}
output {
  stdout {
    codec => rubydebug
  }
}

Step 4: run logstash -f test.conf, type any character and press Enter — done!

{
"message" => "Hello World!",
"@version" => "1",
"@timestamp" => "2016-01-27T19:17:18.932Z",
"host" => "cadenza"
}
  2. The officially documented approach

Step 1: build

gem build logstash-input-example.gemspec

This generates logstash-input-example-2.0.4.gem in the current directory.
Step 2: install

bin/logstash-plugin install /logstash-input-example/logstash-input-example-2.0.4.gem

Verification:

validating /logstash-input-example/logstash-input-example-2.0.4.gem >= 0
Valid logstash plugin. Continuing...
Successfully installed 'logstash-input-example' with version '2.0.4'

Step 3: list the installed plugins:

bin/logstash-plugin list

Step 4: use it.

Development example

The plugin developed here uses the cluster nodes output to fetch INFO from every master node in a Redis cluster. With this plugin a single command is enough to pull the relevant information dynamically.

Plugin development

The plugin is a thin wrapper around the exec input; the main change is the following:

def execute(command, queue)
  @logger.debug? && @logger.debug("Running exec", :command => command)
  begin
    # Run the cluster-nodes command and split its output into lines
    @io = IO.popen(command)
    fields = (@io.read).split(/\r\n|\n/)
    puts fields
    length = fields.length - 1
    for i in 0..length do
      if fields[i].include?(':') then
        # Each line is host:port of a master; run redis-cli info against it
        field = fields[i].split(':')
        newcommand = "redis-cli -c -h #{field[0]} -p #{field[1]} info"
        @io = IO.popen(newcommand)
        @codec.decode(@io.read) do |event|
          decorate(event)
          event.set("host", @hostname)
          event.set("command", newcommand)
          queue << event
        end
      end
    end
  rescue StandardError => e
    @logger.error("Error while running command",
      :command => command, :e => e, :backtrace => e.backtrace)
  rescue Exception => e
    @logger.error("Exception while running command",
      :command => command, :e => e, :backtrace => e.backtrace)
  ensure
    stop
  end
end

Usage demo

Usage

redisexec {
  command => "redis-cli -h 127.0.0.1 -p 6379 cluster nodes|grep master|awk '{print $2}'"
  interval => 20
  type => "info"
}

Complete example

Store the INFO data in Elasticsearch.

input {
  redisexec {
    command => "redis-cli -h 127.0.0.1 -p 6379 cluster nodes|grep master|awk '{print $2}'"
    interval => 20
    type => "info"
  }
}
filter {
  grok {
    match => { "command" => "redis-cli -c -h %{IP:node} -p %{NUMBER:port}%{DATA:data}" }
    remove_field => [ "host" ]
  }
  ruby {
    code => "fields = event['message'].split(/\r\n|\n/)
      length = fields.length - 1
      for i in 1..length do
        if fields[i].include?(':') then
          field = fields[i].split(':')
          event[field[0]] = field[1].to_f
        end
      end
    "
    remove_field => [ "message" ]
  }
}
output {
  #stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    template_overwrite => true
    index => "rediscluster-%{+YYYY.MM.dd}"
    workers => 5
  }
}

References

  1. http://www.cnblogs.com/xing901022/p/5259750.html
  2. https://github.com/logstash-plugins?utf8=%E2%9C%93&query=example
  3. https://www.elastic.co/guide/en/logstash/current/_how_to_write_a_logstash_input_plugin.html

1. A newly created Maven project has a JDK version that does not match the system's JDK version.

2. Right-clicking the Maven project -> Maven -> Update Project changes the project's JDK version.

3. The operating system's JDK was reinstalled with a new version; this is the root cause of the two symptoms above.

How to fix it (assuming the system JDK version is 1.8):

Method 1: pin the JDK version in pom.xml:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>

Method 2: edit settings.xml, find the profiles node, and add a profile like the following:

<profile>
  <id>jdk-1.8</id>
  <activation>
    <activeByDefault>true</activeByDefault>
    <jdk>1.8</jdk>
  </activation>
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
  </properties>
</profile>

Then update the project again and it will stay in line with the system JDK. Method 2 is recommended, since it avoids configuring every single project.

1. spring-boot-maven-plugin

A usage example:

<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>com.aibibang.AppMain</mainClass>
<addResources>true</addResources>
<excludes>
<exclude>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>

2. Custom packaging

During development you often want to separate the dependency jars from the code jar, and the configuration files from the code jar, which makes deployment and parameter changes much easier. The following example covers exactly that scenario, using maven-jar-plugin and maven-assembly-plugin.

The relevant pom content:

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.aibibang.AppMain</mainClass>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
</archive>
<excludes>
<exclude>*.properties</exclude>
<exclude>*.xml</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

The content of package.xml:

<assembly>
<id>bin</id>
<!-- Package everything into a single archive for release -->
<formats>
<format>tar.gz</format>
</formats>

<fileSets>
<!-- Include the built jar file -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory></outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
<!-- Package the project's startup scripts into the archive -->
<fileSet>
<directory>${project.basedir}/src/main/bash</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>*</include>
</includes>
</fileSet>

<!-- Package the project's configuration files into the archive's conf directory -->
<fileSet>
<directory>${project.build.directory}/classes</directory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>*.properties</include>
<include>*.xml</include>
</includes>
</fileSet>
</fileSets>
<!-- Package the project's dependency jars into the lib directory -->
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<scope>runtime</scope>
<excludes>
<exclude>${groupId}:${artifactId}</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

maven-jar-plugin only packages the code into a jar; the deployment package itself is built by the assembly plugin.

Together with start and stop scripts, this makes deploying a project quick and painless.

start.sh:

#!/bin/sh
export JAVA_HOME=$JAVA_HOME
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PIDFILE=service.pid
ROOT_DIR="$(cd $(dirname $0) && pwd)"
CLASSPATH=./*:$ROOT_DIR/lib/*:$ROOT_DIR/conf/
JAVA_OPTS="-Xms512m -Xmx1024m -XX:+UseParallelGC"
MAIN_CLASS=com.aibibang.AppMain


if [ ! -d "logs" ]; then
mkdir logs
fi

if [ -f "$PIDFILE" ]; then
echo "Service is already start ..."
else
echo "Service start ..."
nohup java $JAVA_OPTS -cp $CLASSPATH $MAIN_CLASS 1> logs/server.out 2>&1 &
printf '%d' $! > $PIDFILE
fi

stop.sh:

#!/bin/sh
PIDFILE=service.pid

if [ -f "$PIDFILE" ]; then
kill -9 `cat $PIDFILE`
rm -rf $PIDFILE
else
echo "Service is already stop ..."
fi

Elasticsearch usage notes

Introduction

The beats + elasticsearch + logstash + kibana tool set comes from Elastic: https://www.elastic.co/guide/index.html

What each tool does

  • beats works together with elasticsearch, logstash and kibana to collect, analyze and display data
  • beats actively collects data, e.g. log data, file data, top data, network packet data, database data, etc.
  • logstash (optional) parses the logs and stores the parsed data in elasticsearch
  • elasticsearch analyzes and stores the data collected by beats
  • kibana visualizes the data in elasticsearch as line charts, pie charts, tables, etc.

Notes

  1. Elasticsearch REST API
  • Query templates
    http://localhost:9200/_template
  • Query indices
    http://localhost:9200/_cat/indices
  • Query the data of a specific index
    http://localhost:9200/packetbeat-*/_search?pretty
  2. Kibana address
    http://localhost:5601/
  3. Plugin Head, a cluster visualization and management tool
    It has to be installed separately; the access URL is:
    http://localhost:9200/_plugin/head/

Index template management

1. Delete a template

curl -XDELETE 'http://localhost:9200/_template/packetbeat'

2. Upload a template

curl -XPUT 'http://localhost:9200/_template/packetbeat' -d@/etc/packetbeat/packetbeat.template.json

3. Delete documents

curl -XDELETE 'http://localhost:9200/packetbeat-*'

Goals

  1. Write txt data into HBase via MapReduce
  2. Write SQL Server data into a Hive table, then from the Hive table into HBase
  3. Write SQL Server data directly into HBase

Implementation

1. The first goal

Step 1: upload the source data file

Upload data.txt to HDFS; its content is as follows:

key1	col1	value1  
key2 col2 value2
key3 col3 value3
key4 col4 value4

The fields are separated by tabs (\t).

Step 2: write the data as HFiles

Use MapReduce to write data.txt into HFiles that match the HBase table layout.

The dependencies in pom.xml are:

<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.4.0</version>
</dependency>
<!-- hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.0.0-cdh5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.0.0-cdh5.4.0</version>
</dependency>

Write the BulkLoadMapper

public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private static final Logger logger = LoggerFactory.getLogger(BulkLoadMapper.class);
private String dataSeperator;
private String columnFamily1;
private String columnFamily2;

public void setup(Context context) {
Configuration configuration = context.getConfiguration(); // read the job parameters
dataSeperator = configuration.get("data.seperator");
columnFamily1 = configuration.get("COLUMN_FAMILY_1");
columnFamily2 = configuration.get("COLUMN_FAMILY_2");
}

public void map(LongWritable key, Text value, Context context){
try {
String[] values = value.toString().split(dataSeperator);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(values[0]));
Put put = new Put(Bytes.toBytes(values[0]));
put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("words"), Bytes.toBytes(values[1]));
put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("sum"), Bytes.toBytes(values[2]));

context.write(rowKey, put);
} catch(Exception exception) {
exception.printStackTrace();
}

}

}

Write the BulkLoadDriver

public class BulkLoadDriver extends Configured implements Tool {
private static final Logger logger = LoggerFactory.getLogger(BulkLoadDriver.class);
private static final String DATA_SEPERATOR = "\t";
private static final String TABLE_NAME = "truman"; // table name
private static final String COLUMN_FAMILY_1 = "personal"; // column family 1
private static final String COLUMN_FAMILY_2 = "professional"; // column family 2

public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "D:/hadoop");
System.setProperty("HADOOP_USER_NAME", "root");
logger.info("---------------------------------------------");
try {
int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
if(response == 0) {
System.out.println("Job is successfully completed...");
} else {
System.out.println("Job failed...");
}
} catch(Exception exception) {
exception.printStackTrace();
}
}

public int run(String[] args) throws Exception {
String inputPath = "/user/truman/data.txt";
String outputPath = "/user/truman/hfile";
/**
* Set the job parameters
*/
Configuration configuration = getConf();

configuration.set("mapreduce.framework.name", "yarn");
configuration.set("yarn.resourcemanager.address", "192.168.1.2:8032");
configuration.set("yarn.resourcemanager.scheduler.address", "192.168.1.2:8030");
configuration.set("fs.defaultFS", "hdfs://192.168.1.2:8020");
configuration.set("mapred.jar", "D://workspace//SqlDataToHbase//target//SqlDataToHbase-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

configuration.set("data.seperator", DATA_SEPERATOR);
configuration.set("hbase.table.name", TABLE_NAME);
configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);

/* configuration.set("hbase.zookeeper.quorum", "192.168.1.2,192.168.1.3,192.168.1.4");
configuration.set("hbase.zookeeper.property.clientPort", "2181");*/

Job job = Job.getInstance(configuration, "Bulk Loading HBase Table::" + TABLE_NAME);
job.setJarByClass(BulkLoadDriver.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class); // map output key class
job.setMapOutputValueClass(Put.class); // map output value class
job.setMapperClass(BulkLoadMapper.class); // the map function
FileInputFormat.addInputPaths(job, inputPath); // input path
FileSystem fs = FileSystem.get(configuration);
Path output = new Path(outputPath);
if (fs.exists(output)) {
fs.delete(output, true); // delete the output path if it already exists
}
FileOutputFormat.setOutputPath(job, output); // output path


Connection connection = ConnectionFactory.createConnection(configuration);
TableName tableName = TableName.valueOf(TABLE_NAME);
HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
job.waitForCompletion(true);
if (job.isSuccessful()){
HFileLoader.doBulkLoad(outputPath, TABLE_NAME, configuration); // bulk-load the data into HBase
return 0;
} else {
return 1;
}
}

}

The project needs hbase-site.xml, yarn-site.xml and mapred-site.xml under resources. If running it locally fails, additionally add org.apache.hadoop.io.nativeio.NativeIO to the current project.

Step 3: load the data

  1. Command-line approach

First edit hadoop-env.sh and add the following:

export HBASE_HOME=/data/bigdata/hbase-1.0.0-cdh5.4.0
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/hbase-server-1.0.0-cdh5.4.0.jar:$HBASE_HOME/lib/hbase-server-1.0.0-cdh5.4.0-tests.jar:$HBASE_HOME/conf:$HBASE_HOME/lib/zookeeper-3.4.5-cdh5.4.0.jar:$HBASE_HOME/lib/guava-12.0.1.jar:$HBASE_HOME/lib/hbase-client-1.0.0-cdh5.4.0.jar:$HADOOP_CLASSPATH:$HBASE_HOME/lib/*

After the data has been written to HDFS as HFiles, go to $HBASE_HOME/bin and run:

/data/bigdata/hadoop-2.6.0-cdh5.4.0/bin/hadoop jar ../lib/hbase-server-1.0.0-cdh5.4.0.jar completebulkload /user/truman/hfile  truman
  2. Java approach
public class HFileLoader {
    public static void doBulkLoad(String pathToHFile, String tableName, Configuration configuration){
        try {
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName); // the target table
            loadFfiles.doBulkLoad(new Path(pathToHFile), hTable); // load the HFiles into the table
            System.out.println("Bulk Load Completed..");
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}
Step 4: query the results
[root@LAB3 bin]# ./hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.0-cdh5.4.0, rUnknown, Tue Apr 21 12:19:34 PDT 2015

hbase(main):001:0> scan 'truman'
ROW COLUMN+CELL
key1 column=personal:words, timestamp=1476179060738, value=col1
key1 column=professional:sum, timestamp=1476179060738, value=value1
key2 column=personal:words, timestamp=1476179060738, value=col2
key2 column=professional:sum, timestamp=1476179060738, value=value2
key3 column=personal:words, timestamp=1476179060738, value=col3
key3 column=professional:sum, timestamp=1476179060738, value=value3
key4 column=personal:words, timestamp=1476179060738, value=col4
key4 column=professional:sum, timestamp=1476179060738, value=value4
4 row(s) in 0.4300 seconds

  1. docker version

Shows the installed Docker version.
2. docker search

Search for OpenTSDB-related images:

$ docker search opentsdb
  3. docker pull

Pull an image:

$ docker pull **/**
  4. docker ps / docker build

List the Docker containers running on the current machine.

Build an image:

docker build -t=truman/redis:3.0.6 .
  5. docker run
  • Without options
$ docker run ubuntu /bin/echo 'Hello world'
The parts of the command:
  • docker: tells the operating system that we want to use the Docker application
  • docker run: together they mean "run a Docker image"
  • ubuntu: the name of the image
  • /bin/echo 'Hello world': the command to execute inside the container
Afterwards we see the output: Hello world
  • With options
    $ docker run -t -i ubuntu /bin/bash
    $ docker run -d -p 127.0.0.1:80:9000 --privileged -v /var/run/docker.sock:/var/run/docker.sock uifd/ui-for-docker
The options:
  • -t: allocate a pseudo-terminal for the container
  • -i: keep the container's stdin open (for input)
  • -d: run the container in the background
  • -p: map a container port onto the host; docker port CONTAINER_ID shows a container's port mappings
In general -i and -t are used together, which gives a better interactive experience.
  • Passing arguments when running an image

These arguments are passed when the container is created, for example to add host entries:

docker run -d -p 4244:4242 --name opentsdb5 --add-host lab1:192.168.0.101 --add-host lab2:192.168.0.102 --add-host lab3:192.168.0.103 truman/opentsdb  

They are all passed before the image name, and several can be given at once.
6. docker start/stop/restart

These commands operate on containers (start, stop and restart them).
7. docker rmi

Force-remove an image:

$ docker rmi -f <img_id>
  8. docker logs

While a container is running as a daemon, its logs can be inspected with docker logs:

$ docker logs -ft <container_id>

This follows the latest log output in the terminal. Other useful forms: docker logs --tail 10 shows the last 10 lines, and docker logs --tail 0 -f follows only new log output.
9. More commands

Commands:
attach Attach to a running container
build Build an image from a Dockerfile
commit Create a new image from a container's changes
cp Copy files/folders from a container's filesystem to the host path
create Create a new container
diff Inspect changes on a container's filesystem
events Get real time events from the server
exec Run a command in a running container
export Stream the contents of a container as a tar archive
history Show the history of an image
images List images
import Create a new filesystem image from the contents of a tarball
info Display system-wide information
inspect Return low-level information on a container or image
kill Kill a running container
load Load an image from a tar archive
login Register or log in to a Docker registry server
logout Log out from a Docker registry server
logs Fetch the logs of a container
pause Pause all processes within a container
port Lookup the public-facing port that is NAT-ed to PRIVATE_PORT
ps List containers
pull Pull an image or a repository from a Docker registry server
push Push an image or a repository to a Docker registry server
rename Rename an existing container
restart Restart a running container
rm Remove one or more containers
rmi Remove one or more images
run Run a command in a new container
save Save an image to a tar archive
search Search for an image on the Docker Hub
start Start a stopped container
stats Display a stream of a containers' resource usage statistics
stop Stop a running container
tag Tag an image into a repository
top Lookup the running processes of a container
unpause Unpause a paused container
version Show the Docker version information
wait Block until a container stops, then print its exit code

All Dockerfile instructions are written in uppercase.

  1. ADD, COPY

Both copy local files into the image. The difference is that ADD can also take a remote URL as its source and automatically extracts local tar archives, while COPY simply copies files and directories from the build context.

When copying a directory, both instructions copy only the directory's contents into the target directory. For example:

ADD redis3.0.4 /opt/app/redis/

copies only the files under redis3.0.4 into the redis directory, not the redis3.0.4 directory itself. COPY behaves the same way.
2. CMD, ENTRYPOINT

Both define the command run when the container starts. The difference is that CMD can be overridden (by the arguments given to docker run), while ENTRYPOINT cannot. Only the last one declared in a Dockerfile takes effect.

How MapReduce runs

My understanding of the overall flow: the input data is first split (no real data is read at this point) and the splits are assigned to map tasks; a job can contain many maps, and the MapReduce framework distributes the tasks across multiple nodes. Each map task reads its own split and filters/transforms it according to the business logic, the map output is then aggregated in the reduce phase, and the final result is written to an output directory.
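
As a rough illustration of that flow, here is a minimal word-count sketch in plain Ruby (not Hadoop code; it only mimics the map, shuffle and reduce phases on a small in-memory data set):

# map -> shuffle (group by key) -> reduce, simulated in memory
lines = ["a b a", "b c"]

# map: turn every line into (word, 1) pairs
pairs = lines.flat_map { |line| line.split.map { |word| [word, 1] } }

# shuffle: group the pairs by key (the word)
grouped = pairs.group_by { |word, _| word }

# reduce: sum the counts for each word
counts = grouped.map { |word, kv| [word, kv.map { |_, n| n }.reduce(0, :+)] }.to_h

puts counts.inspect  # => {"a"=>2, "b"=>2, "c"=>1}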

Preface

During development you frequently need to create new projects, and the built-in archetypes often do not meet the project's needs, so we have to build our own archetype.

Implementation

Here we use create-from-project to build the custom archetype (there are at least two ways to do it).

1. Build the template project

First create a new Maven project in Eclipse and put the shared, reusable pieces into the appropriate directories: common Java code goes under src/main/java, common configuration files under src/main/resources, and for a Java EE project, JSPs and similar files under src/main/webapp.

2. Edit pom.xml

Add the following to pom.xml:

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-archetype-plugin</artifactId>
        <version>2.4</version>
      </plugin>
    </plugins>
  </pluginManagement>
</build>

3. Build

Run the following Maven command in the project root directory:

mvn archetype:create-from-project

This generates a generated-sources directory under target, which contains the generated archetype.

4. Install / publish

Go into the generated-sources/archetype directory and run:

mvn install

This installs the custom archetype into the local repository.

Where the archetype is installed is determined by the conf/settings.xml file under the Maven installation directory.

By default an archetype-catalog.xml file is generated under ~/.m2 (the same directory as the default settings.xml), declaring the archetype's groupId, artifactId and other attributes.

Because the "Default Local" catalog that Eclipse uses when creating a Maven project points to ~/.m2, Eclipse picks up archetype-catalog.xml automatically, and the custom archetype shows up under "Default Local" when creating a new Maven project.

An archetype installed into the local repository can only be used by yourself. To share it, use the deploy command instead of install in step 4.

5. Uninstall

To uninstall the archetype, delete the corresponding entry from archetype-catalog.xml under ~/.m2 and remove the files under the matching groupId and artifactId in the local repository.

Notes

Problem: the custom archetype does not show up in Eclipse?

First check whether its version is 0.0.1-SNAPSHOT; if so, you need to tick "Include snapshot archetypes".

References

  1. http://my.oschina.net/wangrikui/blog/498807
  2. http://blog.csdn.net/sxdtzhaoxinguo/article/details/46895013