一、spring-boot-maven-plugin

使用案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>com.aibibang.AppMain</mainClass>
<addResources>true</addResources>
<excludes>
<exclude>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>

二、自定义打包

在开发过程中经常需要将依赖包与代码包分离,配置文件与代码包分离,这样更便于部署与修改参数。依次案例基于以上场景展开,借助于maven-jar-plugin与maven-assembly-plugin。

pom内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.aibibang.AppMain</mainClass>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
</archive>
<excludes>
<exclude>*.properties</exclude>
<exclude>*.xml</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

package.xml内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<assembly>
<id>bin</id>
<!-- 最终打包成一个用于发布的zip文件 -->
<formats>
<format>tar.gz</format>
</formats>

<fileSets>
<!-- 打包jar文件 -->
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory></outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
<!-- 把项目相关的启动脚本,打包进zip文件的bin目录 -->
<fileSet>
<directory>${project.basedir}/src/main/bash</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>*</include>
</includes>
</fileSet>

<!-- 把项目的配置文件,打包进zip文件的config目录 -->
<fileSet>
<directory>${project.build.directory}/classes</directory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>*.properties</include>
<include>*.xml</include>
</includes>
</fileSet>
</fileSets>
<!-- 把项目的依赖的jar打包到lib目录下 -->
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<scope>runtime</scope>
<excludes>
<exclude>${groupId}:${artifactId}</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

maven-jar-plugin只是讲代码打成一个jar,而对部署包的构建是由assembly插件完成的。

结合启动、停止脚本即可高效便捷的部署一个项目。

start.sh:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/bin/sh
export JAVA_HOME=$JAVA_HOME
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PIDFILE=service.pid
ROOT_DIR="$(cd $(dirname $0) && pwd)"
CLASSPATH=./*:$ROOT_DIR/lib/*:$ROOT_DIR/conf/
JAVA_OPTS="-Xms512m -Xmx1024m -XX:+UseParallelGC"
MAIN_CLASS=com.aibibang.AppMain


if [ ! -d "logs" ]; then
mkdir logs
fi

if [ -f "$PIDFILE" ]; then
echo "Service is already start ..."
else
echo "Service start ..."
nohup java $JAVA_OPTS -cp $CLASSPATH $MAIN_CLASS 1> logs/server.out 2>&1 &
printf '%d' $! > $PIDFILE
fi

stop.sh:

1
2
3
4
5
6
7
8
9
#!/bin/sh
PIDFILE=service.pid

if [ -f "$PIDFILE" ]; then
kill -9 `cat $PIDFILE`
rm -rf $PIDFILE
else
echo "Service is already stop ..."
fi

Elasticsearch使用备注

简介

beats + elasticsearch +logstash + kibana 这套工具集合出自于Elastic公司 https://www.elastic.co/guide/index.html
工具集功能

  • beats是结合elasticsearch,logstash,kibana进行数据分析展示工具集
  • beats主动获取数据,如:日志数据,文件数据,top数据,网络包数据,数据库数据等
  • logstash(可选)用于日志分析,然后将分析后的数据存储到elasticsearch中
  • elasticsearch用于分析、存储beats获取的数据
  • kibana用于展示图形elasticsearch上的数据,如:线图,饼图,表格等

备注

  1. Elasticsearch rest api
  • 查询模板
    1
    http://localhost:9200/_template
  • 查询索引
    1
    http://localhost:9200/_cat/indices
  • 查重指定索引数据
    1
    http://localhost:9200/packetbeat-*/_search?pretty
  1. Kibana地址
    1
    http://localhost:5601/
  2. Plugin Head集群可视化管理工具
    需要额外安装
    访问地址如下:
    1
    http://localhost:9200/_plugin/head/

index template管理

1.删除模板

1
curl -XDELETE 'http://localhost:9200/_template/packetbeat'

2.上传模板

1
curl -XPUT 'http://localhost:9200/_template/packetbeat' -d@/etc/packetbeat/packetbeat.template.json

3.删除documents

1
curl -XDELETE 'http://localhost:9200/packetbeat-*'

目标

  1. 将txt数据通过mapredure写到hbase中
  2. 将sqlserver数据写入hive表中,从hive表中写入hbase
  3. 将sqlserver数据写入hbase

实现

1.第一个目标

一、上传原数据文件

将data.txt文件上传到到hdfs上,内容如下:

1
2
3
4
key1	col1	value1  
key2 col2 value2
key3 col3 value3
key4 col4 value4

数据以制表符(\t)分割。

二、将数据写成HFile

通过mapredure将data.txt按hbase表格式写成hfile

pom.xml文件中依赖如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.4.0</version>
</dependency>
<!-- hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.0.0-cdh5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.0.0-cdh5.4.0</version>
</dependency>

编写BulkLoadMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private static final Logger logger = LoggerFactory.getLogger(BulkLoadMapper.class);
private String dataSeperator;
private String columnFamily1;
private String columnFamily2;

public void setup(Context context) {
Configuration configuration = context.getConfiguration();//获取作业参数
dataSeperator = configuration.get("data.seperator");
columnFamily1 = configuration.get("COLUMN_FAMILY_1");
columnFamily2 = configuration.get("COLUMN_FAMILY_2");
}

public void map(LongWritable key, Text value, Context context){
try {
String[] values = value.toString().split(dataSeperator);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(values[0]));
Put put = new Put(Bytes.toBytes(values[0]));
put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("words"), Bytes.toBytes(values[1]));
put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("sum"), Bytes.toBytes(values[2]));

context.write(rowKey, put);
} catch(Exception exception) {
exception.printStackTrace();
}

}

}

编写BulkLoadDriver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class BulkLoadDriver extends Configured implements Tool {
private static final Logger logger = LoggerFactory.getLogger(BulkLoadDriver.class);
private static final String DATA_SEPERATOR = "\t";
private static final String TABLE_NAME = "truman";//表名
private static final String COLUMN_FAMILY_1="personal";//列组1
private static final String COLUMN_FAMILY_2="professional";//列组2

public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "D:/hadoop");
System.setProperty("HADOOP_USER_NAME", "root");
logger.info("---------------------------------------------");
try {
int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
if(response == 0) {
System.out.println("Job is successfully completed...");
} else {
System.out.println("Job failed...");
}
} catch(Exception exception) {
exception.printStackTrace();
}
}

public int run(String[] args) throws Exception {
String inputPath = "/user/truman/data.txt";
String outputPath = "/user/truman/hfile";
/**
* 设置作业参数
*/
Configuration configuration = getConf();

configuration.set("mapreduce.framework.name", "yarn");
configuration.set("yarn.resourcemanager.address", "192.168.1.2:8032");
configuration.set("yarn.resourcemanager.scheduler.address", "192.168.1.2:8030");
configuration.set("fs.defaultFS", "hdfs://192.168.1.2:8020");
configuration.set("mapred.jar", "D://workspace//SqlDataToHbase//target//SqlDataToHbase-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

configuration.set("data.seperator", DATA_SEPERATOR);
configuration.set("hbase.table.name", TABLE_NAME);
configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);

/* configuration.set("hbase.zookeeper.quorum", "192.168.1.2,192.168.1.3,192.168.1.4");
configuration.set("hbase.zookeeper.property.clientPort", "2181");*/

Job job = Job.getInstance(configuration, "Bulk Loading HBase Table::" + TABLE_NAME);
job.setJarByClass(BulkLoadDriver.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);//指定输出键类
job.setMapOutputValueClass(Put.class);//指定输出值类
job.setMapperClass(BulkLoadMapper.class);//指定Map函数
FileInputFormat.addInputPaths(job, inputPath);//输入路径
FileSystem fs = FileSystem.get(configuration);
Path output = new Path(outputPath);
if (fs.exists(output)) {
fs.delete(output, true);//如果输出路径存在,就将其删除
}
FileOutputFormat.setOutputPath(job, output);//输出路径


Connection connection = ConnectionFactory.createConnection(configuration);
TableName tableName = TableName.valueOf(TABLE_NAME);
HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));
job.waitForCompletion(true);
if (job.isSuccessful()){
HFileLoader.doBulkLoad(outputPath, TABLE_NAME,configuration);//导入数据
return 0;
} else {
return 1;
}
}

}

整个项目需要将hbase-site.xml、yarn-site.xml、mapred-site.xml放入resources下。本地运行出错的话,再加入org.apache.hadoop.io.nativeio.NativeIO到当前工程中

三、数据加载

  1. 命令方式

首先修改hadoop-env.sh配置,加入以下:

1
2
export HBASE_HOME=/data/bigdata/hbase-1.0.0-cdh5.4.0
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/hbase-server-1.0.0-cdh5.4.0.jar:$HBASE_HOME/lib/hbase-server-1.0.0-cdh5.4.0-tests.jar:$HBASE_HOME/conf:$HBASE_HOME/lib/zookeeper-3.4.5-cdh5.4.0.jar:$HBASE_HOME/lib/guava-12.0.1.jar:$HBASE_HOME/lib/hbase-client-1.0.0-cdh5.4.0.jar:$HADOOP_CLASSPATH:$HBASE_HOME/lib/*

将数据按HFile写入到hdfs中,然后进入$HBASE_HOME/bin中执行以下命令

1
/data/bigdata/hadoop-2.6.0-cdh5.4.0/bin/hadoop jar ../lib/hbase-server-1.0.0-cdh5.4.0.jar completebulkload /user/truman/hfile  truman
  1. java方式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class HFileLoader {
    public static void doBulkLoad(String pathToHFile, String tableName,Configuration configuration){
    try {

    HBaseConfiguration.addHbaseResources(configuration);
    LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
    HTable hTable = new HTable(configuration, tableName);//指定表名
    loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);//导入数据
    System.out.println("Bulk Load Completed..");
    } catch(Exception exception) {
    exception.printStackTrace();
    }

    }

    }
    四、结果查询
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    [root@LAB3 bin]# ./hbase shell
    HBase Shell; enter 'help<RETURN>' for list of supported commands.
    Type "exit<RETURN>" to leave the HBase Shell
    Version 1.0.0-cdh5.4.0, rUnknown, Tue Apr 21 12:19:34 PDT 2015

    hbase(main):001:0> scan 'truman'
    ROW COLUMN+CELL
    key1 column=personal:words, timestamp=1476179060738, value=col1
    key1 column=professional:sum, timestamp=1476179060738, value=value1
    key2 column=personal:words, timestamp=1476179060738, value=col2
    key2 column=professional:sum, timestamp=1476179060738, value=value2
    key3 column=personal:words, timestamp=1476179060738, value=col3
    key3 column=professional:sum, timestamp=1476179060738, value=value3
    key4 column=personal:words, timestamp=1476179060738, value=col4
    key4 column=professional:sum, timestamp=1476179060738, value=value4
    4 row(s) in 0.4300 seconds

  1. docker version

查看docker安装版本
2. docker search

查找opentsdb相关的镜像

1
$ docker search opentsdb
  1. docker pull

拉去镜像

1
$ docker pull **/**
  1. docker ps/build

查看当前机器运行的docker容器

构建镜像

1
docker build -t=truman/redis:3.0.6 .
  1. docker run
  • 不带参数
1
$ docker run ubuntu /bin/echo 'Hello world'
参数 解释
docker 告诉操作系统我们要使用docker应用
docker run 组合起来意思就是运行一个docker镜像
ubuntu 镜像(image)名称
/bin/echo ‘Hello world’ 告诉docker我们要在容器中执行的操作
之后我们就可以看到输出结果:Hello world
  • 带参
    1
    2
    $ docker run -t -i ubuntu /bin/bash
    $ docker run -d -p 127.0.0.1:80:9000 --privileged -v /var/run/docker.sock:/var/run/docker.sock uifd/ui-for-docker
参数 解释
-t 为这个容器分配一个虚拟的终端
-i 保持对于容器的stdin为打开的状态(输入)。
-d 让docker容器在后台中运行
-p 将docker容器内部端口映射到我们的host上面,我们可以使用 docker port CONTAINER_ID 来查询容器的端口 映射情况
一般情况下 -i 与 -t 参数都是结合在一起使用,这样交互会比较好一点。
  • 镜像运行传参

这个参数是在容器生成的时候传入的,例如:指定hosts

1
docker run -d -p 4244:4242 --name opentsdb5 --add-host lab1:192.168.0.101 --add-host lab2:192.168.0.102 --add-host lab3:192.168.0.103 truman/opentsdb  

都是在镜像名字之前传入的,可以写多个
6. docher start/stop/restart

该命令可以操作容器
7. docker rmi

强制删除镜像

1
$ docker rmi -f <img_id>
  1. docker logs

在容器以守护进程运行的过程中,可以通过docker logs命令查看log日志,具体用法如下:

1
$ docker logs -ft <img_id>

以终端模式查看最新log。还有其他命令:docker logs –tail 10 获取日志最后10行内容,也可以使用 docker logs –tail 0 -f 跟踪最新日志
9. 更多命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Commands:
attach Attach to a running container
build Build an image from a Dockerfile
commit Create a new image from a container's changes
cp Copy files/folders from a container's filesystem to the host path
create Create a new container
diff Inspect changes on a container's filesystem
events Get real time events from the server
exec Run a command in a running container
export Stream the contents of a container as a tar archive
history Show the history of an image
images List images
import Create a new filesystem image from the contents of a tarball
info Display system-wide information
inspect Return low-level information on a container or image
kill Kill a running container
load Load an image from a tar archive
login Register or log in to a Docker registry server
logout Log out from a Docker registry server
logs Fetch the logs of a container
pause Pause all processes within a container
port Lookup the public-facing port that is NAT-ed to PRIVATE_PORT
ps List containers
pull Pull an image or a repository from a Docker registry server
push Push an image or a repository to a Docker registry server
rename Rename an existing container
restart Restart a running container
rm Remove one or more containers
rmi Remove one or more images
run Run a command in a new container
save Save an image to a tar archive
search Search for an image on the Docker Hub
start Start a stopped container
stats Display a stream of a containers' resource usage statistics
stop Stop a running container
tag Tag an image into a repository
top Lookup the running processes of a container
unpause Unpause a paused container
version Show the Docker version information
wait Block until a container stops, then print its exit code

所有指令都是大写

  1. ADD,COPY

两个都是将本地文件复制到镜像中,区别是ADD可以指定绝对路径的文件,言外之意是可以上传除当前目录之外的文件。而COPY只能上传当前目录的文件。

这两条命令复制文件夹的话,只会讲子目录复制到指定目录下。例如

ADD redis3.0.4 /opt/app/redis/

只会将redis3.0.4下文件复制到redis目录下,不包含redis3.0.4目录。COPY同理
2. CMD,ENTRYPOINT

两个都是容器启动时运行的命令,区别是CMD可以被覆盖,而ENTRYPOINT不会。ENTRYPOINT只能是最后一个生效。

MapReduce运行过程

个人理解整个过程是先对数据分片(这个过程还未读取真正数据),将数据划分到多个map,一个job可以包含多个map,MapReduce框架将多个job发送到多个节点上执行,每个job中map读取自己分片数据,然后根据业务代码过滤,再根据map输出进行reduce操作,最后将生成结果输出到一个目录中。

前言

在开发过程中经常需要新建工程,新建工程使用自带的archetype,往往不能满足项目开发需求,这就需要我们开发出自己的archetype。

实现

本次使用create-from-project来实现自定义archetype(方法至少两种)

1.构建模板项目

首先使用eclipse创建一个新的maven project,然后把配置好的一些公用的东西放到相应的目录下面 比如说会将一些常用的java代码存放到src/main/java目录下面;会将一些通用的配置文件放到src/main/resources目录下面;如果是javeEE工程,还会有一些jsp等等的文件存放到src/main/webapp目录下面

2.pom.xml编辑

在pom.xml文件中添加以下内容

1
2
3
4
5
6
7
8
9
10
11
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-archetype-plugin</artifactId>
<version>2.4</version>
</plugin>
</plugins>
</pluginManagement>
</build>

3.编译

在工程跟目录下运行maven命令

1
mvn archetype:create-from-project

然后会在target目录下面生成generated-sources目录,这个就是生成的 archetype

4.安装/发布

进入generated-sourced/archetype目录,运行maven命令:

1
mvn install

这样就把自定义的archetype安装到本地仓库了。

archetype安装的地址是在maven安装目录下面的conf/settings.xml文件中指定的(字节)。

默认会在 ~/.m2 目录下面生成一个archetype-catalog.xml文件(和默认的settings.xml在同一个目录), 声明了该archetype的groupId、artifactId和其他属性。

因为Eclipse创建maven项目过程中,选择的“Default Local”指向的地址就是 ~/.m2,所以文件archetype-catalog.xml会被eclipse自动读取,使用eclipse创建maven项目的时候可以在”Default Local”一项中找到刚才自定义archetype名字。

安装到本地仓库中的archetype只可以被自己使用,如果想要共享,那么在第四步的时候使用deploy命令,不要使用install命令。

5.卸载

如果想要卸载刚才安装的archetype,只需要将~/.m2目录下面的archetype-catalog.xml文件中对应的字节段删掉,并且把本地仓库中相应groupId和artifactId下面的文件删掉就可以了。

备注

问题:eclipse中找不到自定义archetype?

首先查看自定义的版本是否是0.0.1-SNAPSHOT,如果是这个的话,需要勾选include snapshot archetypes

参考

1.http://my.oschina.net/wangrikui/blog/498807
2.http://blog.csdn.net/sxdtzhaoxinguo/article/details/46895013

RedisCluster集群定位

故障排除

如果在产线中Redis 操作变慢,可按以下操作排除故障

  • slowlog查看引发延迟的慢命令

获取10条慢操作日志

1
slowlog get 10

结果为查询ID、发生时间、运行时长和原命令。默认10毫秒,默认只保留最后的128条。单线程的模型下,一个请求占掉10毫秒是件大事情,注意设置和显示的单位为微秒,注意这个时间是不包含网络延迟的。

  • 监控客户端的连接

在Redis-cli工具中输入info clients可以查看到当前实例的所有客户端连接信息,过多的连接数也会影响redis性能

1
info clients
  • 机器固有延迟

如果使用的是虚拟机的话,会存在一个固有延迟.可以使用如下命令,在redis server中执行:

1
./redis-cli --intrinsic-latency 100

100位一秒内测试数量

  • 计算延迟

如果你正在经历redis 延迟问题,可能需要衡量延迟到底是多大,用以下命令即可实现

1
redis-cli --latency -h `host` -p `port`
  • AOF和磁盘IO引起延迟

查看redis中fdatasync(2),追踪进程

1
sudo strace -p $(pidof redis-server) -T -e trace=fdatasync,write
  • 过期key引起延迟

当数据库有超过1/4key在同一时间过期,会引起redis 阻塞

产线调整

  • aof配置调整

经过查看redis log,我们发现aof延迟比较厉害,猜测写aof影响了redis 性能。但为了保证数据安全,又无法关闭aof文件。经过查看官方文档,未更改aof保存策略维持fsync every second,更改

1
no-appendfsync-on-rewrite yes

这个配置是指在重写aof时,不持久化数据到aof中。

  • rdb配置调整

线上机器IO操作特别频繁,用命令

1
iostat -d -k 2

查看当前IO情况。经过查看发现redis rdb temp文件变更比较频繁。将配置改为

1
save 600 30000

线上机器IO才降下来。

Client使用定位

  • pipeline操作串行改并行

经过测试,高并发情况下,key批量pipeline处理已经成为性能瓶颈。经过分析将这块的处理更改成并行处理,并行处理的线程数为当前master的数量,这样会将Client的处理能力提升n倍,经过测试,效果明显。

  • 集群节点信息单独线程维护

之前Client为了实时获取集群node信息,在每次操作都会主动获取集群信息。这里操作不用这么频繁,如果并发特别大的话,也会消耗大量的时间,经过分析后将此处做更改。有两种修改方法(1.)单独新建线程,专门维护集群信息(2.)更改成处理失败情况下,再去更新集群信息。目前采用第二种

  • 增加slow log记录

系统如果不存在对操作时间log统计,那么就会出现不容易定位是哪一块出的问题,但也是一把双刃剑,增加slow log 统计会浪费时间,增加系统负担。基于以上综合考虑:特将slow log架构设计成通过rest API动态改变统计维度,统计分为set和get两种(set数据理论上会比get数据慢)。

问题1

更改成并行处理后,出现许多莫名其妙的问题,错误如下:

1
redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.

解决
分析log,发现有同时多线程处理同一个key情况,更改key处理顺序,避免多线程同时操作一个key,此问题再未复现。

问题2

更改成并行处理后,出现开始一部分操作报null错误

解决

分析log,发现多线程同时调用一个获取集群信息方法,该方法避免多次调用,已经加lock,这会导致多数线程得不到集群信息,因此会出现null,将该方法提到初始化时即调用,彻底避免多线程调用。

参考

以下链接为我收藏的网址,对追踪Redis latency有很大帮助

  1. http://redis.io/topics/latency
  2. http://www.tuicool.com/articles/2Q7nauM
  3. http://www.jianshu.com/p/ee2aa7fe341b
  4. http://www.cnblogs.com/mushroom/p/4738170.html

Java 多线程实现方式

新建线程

1.继承Thread,复写run

实现一个线程最简单的方法

1
2
3
4
5
new Thread() {
public void run() {
System.out.println("I`am a thread!");
};
}.start();

2.实现runnable,实现run

runnable是一个接口类,使用的话,需要实现。

1
2
3
4
5
6
new Thread(new Runnable() {
public void run() {
// TODO Auto-generated method stub
System.out.println("I`am a second thread!");
}
}).start();

3.利用jdk中Executor框架实现

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

1
2
3
4
5
6
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
public void run() {
System.out.println("I`am a thrid thread!");
}
});

带返回值的多线程

结合Future,Callable一起使用,可以取得多线程执行的返回值

1.第一种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<String>> results = new ArrayList<Future<String>>();
for( int i=0;i<10000;i++){
final int j = i;
Future<String> result = executorService.submit(new Callable<String>() {
public String call() throws Exception {
// TODO Auto-generated method stub
return "value"+j;
}
});
results.add(result);
}
for(Future<String>result :results){
System.out.println(result.get());//当线程没有执行完毕会阻塞
}
executorService.shutdown();

2.第二种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
for (int i = 0; i < 10000; i++) {
final int j = i;
tasks.add(new Callable<String>() {
public String call() throws Exception {
// TODO Auto-generated method stub
return "value" + j;
}
});
}
//此时还未执行
List<Future<String>> results = executorService.invokeAll(tasks);//未执行完成阻塞
for (Future<String> result : results) {
System.out.println(result.get());//当线程没有执行完毕会阻塞
}
executorService.shutdown();

利用外部表读取orc文件

前言

因为orc文件压缩,并且可以快速加载到hive中,因为这种应用于hadoop平台的上文件获得许多开发者的注意。

ORC : Optimized Row Columnar (ORC) file
据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。它的设计目标是来克服Hive其他格式的缺陷。运用ORC File可以提高Hive的读、写以及处理数据的性能。

方案

可以利用构建hive中外部表来读取orc文件。

1.建表

其中location即为orc文件所在的目录,该目录下新增的文件,都可以被hive查出

1
2
3
4
5
CREATE EXTERNAL TABLE test_orc_v2(
status string,
type string
)
stored as orc location "/user/test";

2.orc文件生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.io.IOException;
import java.util.List;

import org.apache.crunch.types.orc.OrcUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
public class HdfsFileTest {

public static void main(String[] args) throws IOException {
System.setProperty("hadoop.home.dir", "D:/hadoop");
HdfsFileTest test = new HdfsFileTest();
test.createOrcFile();
}

public void createOrcFile() throws IOException {
String typeStr = "struct<status:string,type:string>";
TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
ObjectInspector inspector = OrcStruct.createObjectInspector(typeInfo);


Configuration conf = new Configuration();
Path tempPath = new Path("/user/test/test1.orc");

Writer writer = OrcFile.createWriter(tempPath, OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));

OrcStruct struct = OrcUtils.createOrcStruct(typeInfo,new Text("OK1"),new Text("http2"));

writer.addRow(struct);
writer.close();

ReaderOptions options = OrcFile.readerOptions(conf);
Reader reader = OrcFile.createReader(tempPath, options);
RecordReader rows = reader.rows();

List next = (List) ObjectInspectorUtils.copyToStandardJavaObject(rows.next(null),
reader.getObjectInspector());
System.out.println(next.toString());
rows.close();
}

}

其中OrcUtils使用如下依赖,在pom中注意添加

1
2
3
4
5
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-hive</artifactId>
<version>0.14.0</version>
</dependency>
0%