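OpenTSDB expects input in the format: put metric timestamp value tagname=tagvalue tag2=value2. The metrics setting of the logstash-output-opentsdb plugin already supplies the timestamp by default, so the first parameter in metrics is the metric name, the second is the value, and the remaining parameters are tag names and tag values in alternating order.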
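To make the line format concrete, here is a minimal Ruby sketch that assembles one put line from its parts. The helper name build_put_line, the metric name, and the tag values are made up for illustration and are not part of the plugin. The check for an empty tag set reflects OpenTSDB's requirement that every data point carries at least one tag.

```ruby
# Hypothetical helper (not part of logstash-output-opentsdb): builds one
# OpenTSDB "put" line in the order metric, timestamp, value, tags.
def build_put_line(metric, timestamp, value, tags)
  # OpenTSDB rejects data points that carry no tags at all.
  raise ArgumentError, 'at least one tag is required' if tags.empty?

  tag_part = tags.map { |name, val| "#{name}=#{val}" }.join(' ')
  "put #{metric} #{timestamp} #{value} #{tag_part}"
end

# Illustrative values only.
puts build_put_line('redis.used_memory', 1_456_789_012, 1024.0,
                    'host' => 'web01', 'port' => 6379)
# prints: put redis.used_memory 1456789012 1024.0 host=web01 port=6379
```

In the plugin configuration the timestamp is filled in for you, so only the metric name, the value, and the tag pairs need to be listed.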
ruby {
  code => "
    fields = event['message'].split(/\r\n|\n/)
    length = fields.length-1
    for i in 1..length do
      if fields[i].include?':' then
        field = fields[i].split(':')
        event[field[0]] = field[1].to_f
      end
    end
  "
  remove_field => [ "message" ]
}
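The parsing logic of the filter above can be tried outside Logstash. The following is a sketch under assumptions: parse_key_value_lines is a hypothetical helper, and the sample text only imitates the key:value layout the filter expects. Like the filter, it skips the first line and converts every value with to_f, which yields 0.0 for non-numeric strings.

```ruby
# Hypothetical helper mirroring the filter's logic: skip the first line,
# then store every "key:value" line as a float-valued entry.
def parse_key_value_lines(message)
  lines = message.split(/\r\n|\n/)
  result = {}
  lines.drop(1).each do |line|
    next unless line.include?(':')

    key, value = line.split(':')
    # to_f returns 0.0 for nil or non-numeric text, as in the filter.
    result[key] = value.to_f
  end
  result
end

# Made-up sample input in the expected layout.
sample = "# Memory\r\nused_memory:1024\r\nmem_fragmentation_ratio:1.25\r\nnot a pair"
p parse_key_value_lines(sample)
```

Because every value is forced to a float, string-valued fields lose their content; a real filter might want to keep those as strings.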
Gem::Specification.new do |s|
  s.name = 'logstash-input-example'
  s.version = '2.0.4'
  s.licenses = ['Apache License (2.0)']
  s.summary = "This example input streams a string at a definable interval."
  s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
  s.authors = ["Elastic"]
  s.email = 'info@elastic.co'
  s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
  s.require_paths = ["lib"]
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname

# Generate a repeating message.
#
# This plugin is intended only as an example.
class LogStash::Inputs::Example < LogStash::Inputs::Base
  config_name "example"

  # If undefined, Logstash will complain, even if codec is unused.
  default :codec, "plain"

  # The message string to use in the event.
  config :message, :validate => :string, :default => "Hello World!"

  # Set how frequently messages should be sent.
  #
  # The default, `1`, means send a message every second.
  config :interval, :validate => :number, :default => 1

  public
  def register
    @host = Socket.gethostname
  end # def register

  def run(queue)
    # we can abort the loop if stop? becomes true
    while !stop?
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
      # because the sleep interval can be big, when shutdown happens
      # we want to be able to abort the sleep
      # Stud.stoppable_sleep will frequently evaluate the given block
      # and abort the sleep(@interval) if the return value is true
      Stud.stoppable_sleep(@interval) { stop? }
    end # loop
  end # def run

  def stop
    # nothing to do in this case so it is not necessary to define stop
    # examples of common "stop" tasks:
    #  * close sockets (unblocking blocking reads/accepts)
    #  * cleanup temporary files
    #  * terminate spawned threads
  end
end # class LogStash::Inputs::Example
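The comments in run describe an interruptible sleep: the wait is cut into short slices and a block is checked between slices. The sketch below illustrates that idea in plain Ruby. It is not Stud's implementation; stoppable_sleep_sketch, its slice parameter, and its return symbols are assumptions made for this example.

```ruby
# Hypothetical stand-in for the idea behind Stud.stoppable_sleep: sleep in
# short slices and return early once the block reports true.
def stoppable_sleep_sketch(duration, slice = 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration
  loop do
    return :stopped if yield

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return :elapsed if remaining <= 0

    # Never sleep past the deadline.
    sleep([slice, remaining].min)
  end
end

# Simulate a shutdown request arriving 0.2 seconds into a 5 second sleep.
stop_requested = false
stopper = Thread.new do
  sleep 0.2
  stop_requested = true
end
result = stoppable_sleep_sketch(5) { stop_requested }
stopper.join
puts result
# prints: stopped
```

With a large interval, a plain sleep would block shutdown for the full duration, which is why the plugin passes stop? as the block.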
def execute(command, queue)
  @logger.debug? && @logger.debug("Running exec", :command => command)
  begin
    # Run the outer command; lines of the form host:port are treated as nodes.
    @io = IO.popen(command)
    fields = (@io.read).split(/\r\n|\n/)
    puts fields
    length = fields.length-1
    for i in 0..length do
      if fields[i].include?':' then
        field = fields[i].split(':')
        # Query each node with redis-cli and decode its info output into events.
        newcommand = "redis-cli -c -h #{field[0]} -p #{field[1]} info"
        @io = IO.popen(newcommand)
        @codec.decode(@io.read) do |event|
          decorate(event)
          event.set("host", @hostname)
          event.set("command", newcommand)
          queue << event
        end
      end
    end
  rescue StandardError => e
    @logger.error("Error while running command",
      :command => command, :e => e, :backtrace => e.backtrace)
  rescue Exception => e
    @logger.error("Exception while running command",
      :command => command, :e => e, :backtrace => e.backtrace)
  ensure
    stop
  end
end
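The command-building step of execute can be checked on its own, without spawning any processes. This is a sketch under assumptions: build_info_commands is a hypothetical helper and the addresses are made up. It uses the same command template as the method above.

```ruby
# Hypothetical helper: turns "host:port" lines into redis-cli info commands,
# using the same template as the execute method.
def build_info_commands(output)
  output.split(/\r\n|\n/).each_with_object([]) do |line, commands|
    # Lines without a colon are ignored, as in execute.
    next unless line.include?(':')

    host, port = line.split(':')
    commands << "redis-cli -c -h #{host} -p #{port} info"
  end
end

# Made-up node list; the last line has no colon and is skipped.
puts build_info_commands("10.0.0.1:7000\n10.0.0.2:7001\nheader line")
# prints:
# redis-cli -c -h 10.0.0.1 -p 7000 info
# redis-cli -c -h 10.0.0.2 -p 7001 info
```

Since the host and port are interpolated straight into a shell command, this approach is only reasonable when the outer command's output is trusted.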
/data/bigdata/hadoop-2.6.0-cdh5.4.0/bin/hadoop jar ../lib/hbase-server-1.0.0-cdh5.4.0.jar completebulkload /user/truman/hfile truman
Java approach
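import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HFileLoader {
    public static void doBulkLoad(String pathToHFile, String tableName, Configuration configuration) {
        try {
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName); // target table name
            loadFiles.doBulkLoad(new Path(pathToHFile), hTable); // import the data
            System.out.println("Bulk Load Completed..");
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}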
4. Querying the results
[root@LAB3 bin]# ./hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.0-cdh5.4.0, rUnknown, Tue Apr 21 12:19:34 PDT 2015
Commands:
    attach    Attach to a running container
    build     Build an image from a Dockerfile
    commit    Create a new image from a container's changes
    cp        Copy files/folders from a container's filesystem to the host path
    create    Create a new container
    diff      Inspect changes on a container's filesystem
    events    Get real time events from the server
    exec      Run a command in a running container
    export    Stream the contents of a container as a tar archive
    history   Show the history of an image
    images    List images
    import    Create a new filesystem image from the contents of a tarball
    info      Display system-wide information
    inspect   Return low-level information on a container or image
    kill      Kill a running container
    load      Load an image from a tar archive
    login     Register or log in to a Docker registry server
    logout    Log out from a Docker registry server
    logs      Fetch the logs of a container
    pause     Pause all processes within a container
    port      Lookup the public-facing port that is NAT-ed to PRIVATE_PORT
    ps        List containers
    pull      Pull an image or a repository from a Docker registry server
    push      Push an image or a repository to a Docker registry server
    rename    Rename an existing container
    restart   Restart a running container
    rm        Remove one or more containers
    rmi       Remove one or more images
    run       Run a command in a new container
    save      Save an image to a tar archive
    search    Search for an image on the Docker Hub
    start     Start a stopped container
    stats     Display a stream of a containers' resource usage statistics
    stop      Stop a running container
    tag       Tag an image into a repository
    top       Lookup the running processes of a container
    unpause   Unpause a paused container
    version   Show the Docker version information
    wait      Block until a container stops, then print its exit code