Skip to content

Commit

Permalink
This came from elasticsearch/logstash at cf2242170011fbf2d264ae876601…
Browse files Browse the repository at this point in the history
…92754697671a
  • Loading branch information
Richard Pijnenburg committed Oct 20, 2014
0 parents commit 54bea30
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*.gem
Gemfile.lock
.bundle
vendor
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
source 'https://rubygems.org'
gem 'rake'
gem 'gem_publisher'
gem 'archive-tar-minitar'
6 changes: 6 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
@files=[]

task :default do
system("rake -T")
end

101 changes: 101 additions & 0 deletions lib/logstash/outputs/opentsdb.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "socket"

# This output allows you to pull metrics from your logs and ship them to
# opentsdb. Opentsdb is an open source tool for storing and graphing metrics.
#
class LogStash::Outputs::Opentsdb < LogStash::Outputs::Base
config_name "opentsdb"
milestone 1

# Enable debugging.
config :debug, :validate => :boolean, :default => false, :deprecated => "This setting was never used by this plugin. It will be removed soon."

# The address of the opentsdb server.
config :host, :validate => :string, :default => "localhost"

# The port to connect on your graphite server.
config :port, :validate => :number, :default => 4242

# The metric(s) to use. This supports dynamic strings like %{source_host}
# for metric names and also for values. This is an array field with key
# of the metric name, value of the metric value, and multiple tag,values . Example:
#
# [
# "%{host}/uptime",
# %{uptime_1m} " ,
# "hostname" ,
# "%{host}
# "anotherhostname" ,
# "%{host}
# ]
#
# The value will be coerced to a floating point value. Values which cannot be
# coerced will zero (0)
config :metrics, :validate => :array, :required => true

def register
connect
end # def register

def connect
# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
begin
@socket = TCPSocket.new(@host, @port)
rescue Errno::ECONNREFUSED => e
@logger.warn("Connection refused to opentsdb server, sleeping...",
:host => @host, :port => @port)
sleep(2)
retry
end
end # def connect

public
def receive(event)
return unless output?(event)

# Opentsdb message format: put metric timestamp value tagname=tagvalue tag2=value2\n

# Catch exceptions like ECONNRESET and friends, reconnect on failure.
begin
name = metrics[0]
value = metrics[1]
tags = metrics[2..-1]

# The first part of the message
message = ['put',
event.sprintf(name),
event.sprintf("%{+%s}"),
event.sprintf(value),
].join(" ")

# If we have have tags we need to add it to the message
event_tags = []
unless tags.nil?
Hash[*tags.flatten].each do |tag_name,tag_value|
# Interprete variables if neccesary
real_tag_name = event.sprintf(tag_name)
real_tag_value = event.sprintf(tag_value)
event_tags << [real_tag_name , real_tag_value ].join('=')
end
message+=' '+event_tags.join(' ')
end

# TODO(sissel): Test error cases. Catch exceptions. Find fortune and glory.
begin
@socket.puts(message)
rescue Errno::EPIPE, Errno::ECONNRESET => e
@logger.warn("Connection to opentsdb server died",
:exception => e, :host => @host, :port => @port)
sleep(2)
connect
end

# TODO(sissel): resend on failure
# TODO(sissel): Make 'resend on failure' tunable; sometimes it's OK to
# drop metrics.
end # @metrics.each
end # def receive
end # class LogStash::Outputs::Opentsdb
26 changes: 26 additions & 0 deletions logstash-output-opentsdb.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-opentsdb'
s.version = '0.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This output allows you to pull metrics from your logs and ship them to opentsdb"
s.description = "This output allows you to pull metrics from your logs and ship them to opentsdb"
s.authors = ["Elasticsearch"]
s.email = '[email protected]'
s.homepage = "http://logstash.net/"
s.require_paths = ["lib"]

# Files
s.files = `git ls-files`.split($\)+::Dir.glob('vendor/*')

# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "group" => "output" }

# Gem dependencies
s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'

end

9 changes: 9 additions & 0 deletions rakelib/publish.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require "gem_publisher"

desc "Publish gem to RubyGems.org"
task :publish_gem do |t|
gem_file = Dir.glob(File.expand_path('../*.gemspec',File.dirname(__FILE__))).first
gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
puts "Published #{gem}" if gem
end

169 changes: 169 additions & 0 deletions rakelib/vendor.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
require "net/http"
require "uri"
require "digest/sha1"

def vendor(*args)
return File.join("vendor", *args)
end

directory "vendor/" => ["vendor"] do |task, args|
mkdir task.name
end

def fetch(url, sha1, output)

puts "Downloading #{url}"
actual_sha1 = download(url, output)

if actual_sha1 != sha1
fail "SHA1 does not match (expected '#{sha1}' but got '#{actual_sha1}')"
end
end # def fetch

def file_fetch(url, sha1)
filename = File.basename( URI(url).path )
output = "vendor/#{filename}"
task output => [ "vendor/" ] do
begin
actual_sha1 = file_sha1(output)
if actual_sha1 != sha1
fetch(url, sha1, output)
end
rescue Errno::ENOENT
fetch(url, sha1, output)
end
end.invoke

return output
end

def file_sha1(path)
digest = Digest::SHA1.new
fd = File.new(path, "r")
while true
begin
digest << fd.sysread(16384)
rescue EOFError
break
end
end
return digest.hexdigest
ensure
fd.close if fd
end

def download(url, output)
uri = URI(url)
digest = Digest::SHA1.new
tmp = "#{output}.tmp"
Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http|
request = Net::HTTP::Get.new(uri.path)
http.request(request) do |response|
fail "HTTP fetch failed for #{url}. #{response}" if [200, 301].include?(response.code)
size = (response["content-length"].to_i || -1).to_f
count = 0
File.open(tmp, "w") do |fd|
response.read_body do |chunk|
fd.write(chunk)
digest << chunk
if size > 0 && $stdout.tty?
count += chunk.bytesize
$stdout.write(sprintf("\r%0.2f%%", count/size * 100))
end
end
end
$stdout.write("\r \r") if $stdout.tty?
end
end

File.rename(tmp, output)

return digest.hexdigest
rescue SocketError => e
puts "Failure while downloading #{url}: #{e}"
raise
ensure
File.unlink(tmp) if File.exist?(tmp)
end # def download

def untar(tarball, &block)
require "archive/tar/minitar"
tgz = Zlib::GzipReader.new(File.open(tarball))
# Pull out typesdb
tar = Archive::Tar::Minitar::Input.open(tgz)
tar.each do |entry|
path = block.call(entry)
next if path.nil?
parent = File.dirname(path)

mkdir_p parent unless File.directory?(parent)

# Skip this file if the output file is the same size
if entry.directory?
mkdir path unless File.directory?(path)
else
entry_mode = entry.instance_eval { @mode } & 0777
if File.exists?(path)
stat = File.stat(path)
# TODO(sissel): Submit a patch to archive-tar-minitar upstream to
# expose headers in the entry.
entry_size = entry.instance_eval { @size }
# If file sizes are same, skip writing.
next if stat.size == entry_size && (stat.mode & 0777) == entry_mode
end
puts "Extracting #{entry.full_name} from #{tarball} #{entry_mode.to_s(8)}"
File.open(path, "w") do |fd|
# eof? check lets us skip empty files. Necessary because the API provided by
# Archive::Tar::Minitar::Reader::EntryStream only mostly acts like an
# IO object. Something about empty files in this EntryStream causes
# IO.copy_stream to throw "can't convert nil into String" on JRuby
# TODO(sissel): File a bug about this.
while !entry.eof?
chunk = entry.read(16384)
fd.write(chunk)
end
#IO.copy_stream(entry, fd)
end
File.chmod(entry_mode, path)
end
end
tar.close
File.unlink(tarball) if File.file?(tarball)
end # def untar

def ungz(file)

outpath = file.gsub('.gz', '')
tgz = Zlib::GzipReader.new(File.open(file))
begin
File.open(outpath, "w") do |out|
IO::copy_stream(tgz, out)
end
File.unlink(file)
rescue
File.unlink(outpath) if File.file?(outpath)
raise
end
tgz.close
end

desc "Process any vendor files required for this plugin"
task "vendor" do |task, args|

@files.each do |file|
download = file_fetch(file['url'], file['sha1'])
if download =~ /.tar.gz/
prefix = download.gsub('.tar.gz', '').gsub('vendor/', '')
untar(download) do |entry|
if !file['files'].nil?
next unless file['files'].include?(entry.full_name.gsub(prefix, ''))
out = entry.full_name.split("/").last
end
File.join('vendor', out)
end
elsif download =~ /.gz/
ungz(download)
end
end

end

0 comments on commit 54bea30

Please sign in to comment.