kafka使用教程
kafka使用教程
shell操作
1.创建topic
–replication-factor 2:备份因子为2
–partitions 10:partitions数目为10
1
bin/kafka-topics.sh --create --zookeeper 192.168.0.101:2181 --replication-factor 2 --partitions 10 --topic upgrade-kafka_test
2.查询topic
列出所有的topic
1
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.删除topic
首先需要确认集群是否配置delete.topic.enable=true,配置后即可删除,确保topic没有被使用。
1 | bin/kafka-topics.sh --delete --zookeeper 192.168.0.101:2181 --topic upgrade-kafka_test |
程序操作
参考
Kafka原理及设计理论总结
Kafka原理及设计理论总结
一、数据可靠性保证
当Producer向Leader发送数据时,可以通过request.required.acks参数设置数据可靠性的级别
- 0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
- 1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
- -1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证
仅设置acks=-1也不能保证数据不丢失,当Isr列表中只有Leader时,同样有可能造成数据丢失。要保证数据不丢除了设置acks=-1, 还要保 证ISR的大小大于等于2,具体参数设置:
(1).request.required.acks:设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
(2).min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica
Producer要在吞吐率和数据可靠性之间做一个权衡
二、数据一致性保证
一致性定义:若某条消息对Consumer可见,那么即使Leader宕机了,在新Leader上数据依然可以被读到
- HighWaterMark
简称HW: Partition的高水位,取一个partition对应的ISR中最小的LEO作为HW,消费者最多只能消费到HW所在的位置,另外每个replica都有highWatermark,leader和follower各自负责更新自己的highWatermark状态,highWatermark <= leader. LogEndOffset
2. 对于Leader新写入的msg,
Consumer不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被Consumer消费,即Consumer最多只能消费到HW位置
这样就保证了如果Leader Broker失效,该消息仍然可以从新选举的Leader中获取。对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
RedisCluster监控系统
一、项目介绍
为了更好监控RedisCluster集群状态信息,提升性能,抛弃之前通过java api获取info信息。此次项目分为两个方面:
- 通过redis自带info信息监控集群
- 监控网络流量获取对集群的使用情况的监控。
总之,通过不同的粒度监控RedisCluster集群运行状况,提供良好的管理运维平台。
二、技术架构
1.info信息
1 | graph LR |
Logstash良好的插件结构设计,我们可以根据不同场景选择合适的input,filter,output插件。为了高效配置监控集群,input插件我们基于exec自定义了自己的插件redis-exec插件。output插件直接选择elasticsearch插件。
2.网络流量
1 | graph LR |
Packagebeat是一个分布式网络数据抓包软件,可以直接监控redis协议信息。
Kibana是一个es可视化的作图工具,根据不同的搜索条件可制定监控不同指标信息的动态图,让使用人员可以直观监控集群运行状况。
三、难点
1.redis-exec插件开发
详见logstash插件开发
2.监控指标
docker registry创建
centos6.X版本,docker version1.7.1版本创建过程如下:
- 运行容器
1
$ docker run -d -p 5000:5000 registry
- 修改配置
在docker1.3.X版本以后,与docker registry交互默认使用的是https,此处需要修改为http。在/etc/sysconfig/docker文件中添加以下内容即可:
1 | other_args="$other_args --insecure-registry myregistry.example.com:5000 " |
在1.12最新版本中可以使用以下方式修改,原理是一致的。
1 | Create or modify /etc/docker/daemon.json |
然后重新启动
1 | $ service docker restart |
- 测试新的registry
- 镜像打标签
1
$ docker tag <img_id> myregistry.example.com:5000/truman/opentsdb
- 提交镜像然后通过docker images即可查看到push 的镜像
1
$ docker push myregistry.example.com:5000/truman/opentsdb
在别的机器中就可以拉取镜像了命令如下:
1 | docker pull myregistry.example.com:5000/truman/opentsdb |
logstash使用教程
简介
logstash是一个实时流水式开源数据收集引擎。具有强大的plugin。可以根据自己的业务场景选择不同的input filter output。绝大多数情况下都是结合ElasticSearch Kibana一起使用的,俗称ELK。
模块介绍
Logstash使用管道方式进行日志的搜集处理和输出。有点类似*NIX系统的管道命令 xxx | ccc | ddd,xxx执行完了会执行ccc,然后执行ddd。
在logstash中,包括了三个阶段:
输入input –> 处理filter(不是必须的) –> 输出output
配置文件说明
前面介绍过logstash基本上由三部分组成,input、output以及用户需要才添加的filter,因此标准的配置文件格式如下:
1 | input { |
执行说明
1 | bin/logstash -f demo.conf |
使用Demo
Output plugins ElasticSearch
案例使用如下:
1 | output { |
Output plugins opentsdb
使用logstash收集数据,并发送到opentsdb中。分为三部分:Input,Filter,Output
输入数据时,输入一条数据,回车。以下为三条测试数据:
1 | threads.ThreadCount 1352279077 67 host=server1 port=1006 |
Input采用命令行输入数据
1
2
3
4input {
stdin{
}
}Filter过滤组织数据
采用的是grok插件,可以使用其他插件完成相同的目的
1 | filter { |
备忘:
- logstash输入数据自带host,@timestamp等自带,为了避免干扰存入opentsdb数据,此处特将隐含的host字段去掉。
- DATA/NUMBER等实为grok自带的正则规则。
- Output输出数据
此处输出数据到opentsdb中,官方文档有误,详见源码
1 | output { |
备忘:
opentsdb输入信息格式为:put metric timestamp value tagname=tagvalue tag2=value2,在logstash-output-opentsdb插件metrics配置中默认已经输入timestamp,因此metrics需要配置的第一个参数为metricName,第二个参数为 value 之后依次为tagname,tagValue。
参考
Filter plugins
Filter plugins
- grok
1
2
3
4grok {
match => {"command" => "redis-cli -c -h %{IP:node:} -p %{NUMBER:port}%{DATA:data}" }
remove_field => [ "host" ]
} - ruby
功能描述:将redis info 信息格式化按字段输出
1 | ruby { |
- mutate
功能描述:字段类型指定
1 | filter { |
Output plugins
logstash插件开发
背景
logstash强大魅力在于它的插件体系,虽然官方插件很多,但不可能满足所有的要求,因此就需要定制化个性化插件,本次结合Logstash Monitor Redis需求开发专用插件,以实现动态化获取master 实例中info 信息。
logstash插件介绍
体系结构
1 | $ tree logstash-input-example |
其实只需要这logstash-input-example.gemspec,example.rb两个文件即可。
mypluginname_spec.rb 是测试类。
先看看logstash-input-example.gemspec都做了什么吧!
1 | Gem::Specification.new do |s| |
上面的信息,只要改改版本和名字,其他的信息基本不需要动。
关键的信息还有:
- s.require_paths定义了插件核心文件的位置
- s.add_runtime_dependency 定义了插件运行的环境
然后再看看example.rb
这个文件就需要详细说说了,基本的框架如下,挨行看看!1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname
# Generate a repeating message.
#
# This plugin is intented only as an example.
class LogStash::Inputs::Example < LogStash::Inputs::Base
config_name "example"
# If undefined, Logstash will complain, even if codec is unused.
default :codec, "plain"
# The message string to use in the event.
config :message, :validate => :string, :default => "Hello World!"
# Set how frequently messages should be sent.
#
# The default, `1`, means send a message every second.
config :interval, :validate => :number, :default => 1
public
def register
@host = Socket.gethostname
end # def register
def run(queue)
# we can abort the loop if stop? becomes true
while !stop?
event = LogStash::Event.new("message" => @message, "host" => @host)
decorate(event)
queue << event
# because the sleep interval can be big, when shutdown happens
# we want to be able to abort the sleep
# Stud.stoppable_sleep will frequently evaluate the given block
# and abort the sleep(@interval) if the return value is true
Stud.stoppable_sleep(@interval) { stop? }
end # loop
end # def run
def stop
# nothing to do in this case so it is not necessary to define stop
# examples of common "stop" tasks:
# * close sockets (unblocking blocking reads/accepts)
# * cleanup temporary files
# * terminate spawned threads
end
end # class LogStash::Inputs::Example
首先第一行的# encoding: utf-8,不要以为是注释就没什么作用。它定义了插件的编码方式。
下面两行:
require “logstash/inputs/base”
require “logstash/namespace”
引入了插件必备的包。
1 | class LogStash::Inputs::Example < LogStash::Inputs::Base |
插件继承自Base基类,并配置插件的使用名称。
下面的一行对参数做了配置,参数有很多的配置属性,完整的如下:
1 | config :variable_name,:validate =>:variable_type,:default =>"Default value",:required => boolean,:deprecated => boolean |
其中
variable_name就是参数的名称了。
validate 定义是否进行校验,如果不是指定的类型,在logstash -f xxx –configtest的时候就会报错。它支持多种数据类型,比如:string, :password, :boolean, :number, :array, :hash, :path (a file-system path), :codec (since 1.2.0), :bytes.
default 定义参数的默认值
required 定义参数是否是必须值
deprecated 定义参数的额外信息,比如一个参数不再推荐使用了,就可以通过它给出提示!典型的就是es-output里面的Index_type,当使用这个参数时,就会给出提示
插件安装
- 便捷安装方式
第一步,首先把这个插件文件夹拷贝到下面的目录中
1 | logstash-2.1.0\vendor\bundle\jruby\1.9\gems |
第二步,修改logstash根目录下的Gemfile,添加如下的内容:
1 | gem "logstash-filter-example", :path => "vendor/bundle/jruby/1.9/gems/logstash-filter-example-1.0.0" |
第三步,编写配置文件,test.conf:
1 | input{ |
第四步,输入logstash -f test.conf时,输入任意字符,回车~~~大功告成!
1 | { |
- 官方指导方式
第一步,build
1 | gem build logstash-input-example.gemspec |
会在当前路径下生成logstash-input-example-2.0.4.gem
第二步,install
1 | bin/logstash-plugin install /logstash-input-example/logstash-input-example-2.0.4.gem |
验证
1 | validating /logstash-input-example/logstash-input-example-2.0.4.gem >= 0 |
第三步,查看plugin:
1 | bin/logstash-plugin list |
第四步,使用
略
开发案例
开发插件实现根据cluster nodes信息获取redis cluster 中master节点 info信息。使用该插件只用输入一条命令,即可动态获取相关信息。
插件开发
此插件是基于exec基础上封装的,主要修改内容为:
1 | def execute(command, queue) |
使用Demo
使用方式
1 | redisexec { |
完整使用案例
将info 信息存储到 ElasticSerach中
1 | input { |
参考
Eclipse Maven Update 时JDK版本变更问题
1.新建一个Maven项目JDK版本和系统版本不对应,
2.右键Maven项目->Maven->Update ProjectJDK版本改变了,
3.操作系统的JDK重装了新的版本,这是引起前面两个现象的主要原因。
修改方法(假如系统jdk版本是1.8):
方法一:在pom.xml文件中指定jdk的版本:
1 | <build> |
maven打包案例
一、spring-boot-maven-plugin
使用案例如下:
1 | <plugin> |
二、自定义打包
在开发过程中经常需要将依赖包与代码包分离,配置文件与代码包分离,这样更便于部署与修改参数。依次案例基于以上场景展开,借助于maven-jar-plugin与maven-assembly-plugin。
pom内容如下:
1 | <build> |
package.xml内容:
1 | <assembly> |
maven-jar-plugin只是讲代码打成一个jar,而对部署包的构建是由assembly插件完成的。
结合启动、停止脚本即可高效便捷的部署一个项目。
start.sh:
1 | #!/bin/sh |
stop.sh:
1 | #!/bin/sh |