Skip to content

Commit

Permalink
Dynamic Parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
z-vr committed Sep 13, 2017
1 parent f0984cd commit 16c1514
Showing 1 changed file with 45 additions and 5 deletions.
50 changes: 45 additions & 5 deletions lib/logstash/inputs/http_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "socket" # for Socket.gethostname
require "manticore"
require "rufus/scheduler"
require "yaml" # persistence

class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
include LogStash::PluginMixins::HttpClient
Expand Down Expand Up @@ -57,10 +58,15 @@ def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
end

private
def filter_dynamic_params(allowed_keys, params)
params.slice(*allowed_keys)
end

private
def normalize_request(url_or_spec)
if url_or_spec.is_a?(String)
res = [:get, url_or_spec]
res = [:get, url_or_spec, {}]
elsif url_or_spec.is_a?(Hash)
# The client will expect keys / values
spec = Hash[url_or_spec.clone.map {|k,v| [k.to_sym, v] }] # symbolize keys
Expand All @@ -77,17 +83,29 @@ def normalize_request(url_or_spec)
auth = spec[:auth]
user = spec.delete(:user) || (auth && auth["user"])
password = spec.delete(:password) || (auth && auth["password"])

if user.nil? ^ password.nil?
raise LogStash::ConfigurationError, "'user' and 'password' must both be specified for input HTTP poller!"
end

if user && password
spec[:auth] = {
user: user,
user: user,
pass: password,
eager: true
}
}

if spec.delete(:use_dynamic_params)
last_dynamic_params_location = spec[:last_dynamic_params]
dynamic_params_map = spec[:dynamic_params_map]

if last_dynamic_params_location.is_a?(String) && File.exist?(last_dynamic_params_location)
dynamic_params = YAML.load(File.read(last_dynamic_params_location))
allowed_keys = dynamic_params_map.is_a?(Hash) ? dynamic_params_map.keys : []
spec[:dynamic_params] = filter_dynamic_params(allowed_keys, dynamic_params)
else
spec[:dynamic_params] = {}
end
end
res = [method, url, spec]
else
Expand Down Expand Up @@ -133,13 +151,23 @@ def setup_schedule(queue)

@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
@scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
@scheduler.join
end

private
def assign_dynamic_params(request)
params = request[2][:dynamic_params]
request[2][:query] = {} if !request[2][:query]
params.keys.each do |key|
request[2][:query][key] = params[key]
end
end

def run_once(queue)
@requests.each do |name, request|
assign_dynamic_params(request) if request[2][:dynamic_params]
request_async(queue, name, request)
end

Expand Down Expand Up @@ -175,11 +203,23 @@ def handle_success(queue, name, request, response, execution_time)
end
end

private
def update_dynamic_params(request, event)
request[2][:dynamic_params_map].keys.each do |key|
value = request[2][:dynamic_params_map][key]
event_value = event.get(value)
request[2][:dynamic_params][key] = event_value if event_value
end
File.write(request[2][:last_dynamic_params], YAML.dump(request[2][:dynamic_params]))
end

private
def handle_decoded_event(queue, name, request, response, event, execution_time)
apply_metadata(event, name, request, response, execution_time)
decorate(event)
queue << event

update_dynamic_params(request, event) if request[2][:dynamic_params]
rescue StandardError, java.lang.Exception => e
@logger.error? && @logger.error("Error eventifying response!",
:exception => e,
Expand Down

0 comments on commit 16c1514

Please sign in to comment.