From 8d797b2be5ded0a0e29e0b20b42596c709e3e095 Mon Sep 17 00:00:00 2001 From: Anton Date: Wed, 13 Sep 2017 13:49:54 +0100 Subject: [PATCH] Dynamic Params --- lib/logstash/inputs/http_poller.rb | 51 +++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/lib/logstash/inputs/http_poller.rb b/lib/logstash/inputs/http_poller.rb index 4b2e292..b959ee6 100644 --- a/lib/logstash/inputs/http_poller.rb +++ b/lib/logstash/inputs/http_poller.rb @@ -5,6 +5,7 @@ require "socket" # for Socket.gethostname require "manticore" require "rufus/scheduler" +require "yaml" # persistence class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base include LogStash::PluginMixins::HttpClient @@ -57,10 +58,15 @@ def setup_requests! @requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }] end + private + def filter_dynamic_params(allowed_keys, params) + params.slice(*allowed_keys) + end + private def normalize_request(url_or_spec) if url_or_spec.is_a?(String) - res = [:get, url_or_spec] + res = [:get, url_or_spec, {}] elsif url_or_spec.is_a?(Hash) # The client will expect keys / values spec = Hash[url_or_spec.clone.map {|k,v| [k.to_sym, v] }] # symbolize keys @@ -77,17 +83,30 @@ def normalize_request(url_or_spec) auth = spec[:auth] user = spec.delete(:user) || (auth && auth["user"]) password = spec.delete(:password) || (auth && auth["password"]) - + if user.nil? ^ password.nil? raise LogStash::ConfigurationError, "'user' and 'password' must both be specified for input HTTP poller!" end if user && password spec[:auth] = { - user: user, + user: user, pass: password, eager: true - } + } + end + + if spec.delete(:use_dynamic_params) + last_dynamic_params_location = spec[:last_dynamic_params] + dynamic_params_map = spec[:dynamic_params_map] + + if last_dynamic_params_location.is_a?(String) && File.exist?(last_dynamic_params_location) + dynamic_params = YAML.load(File.read(last_dynamic_params_location)) + allowed_keys = dynamic_params_map.is_a?(Hash) ? dynamic_params_map.keys : [] + spec[:dynamic_params] = filter_dynamic_params(allowed_keys, dynamic_params) + else + spec[:dynamic_params] = {} + end end res = [method, url, spec] else @@ -133,13 +152,23 @@ def setup_schedule(queue) @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead - opts = schedule_type == "every" ? { :first_in => 0.01 } : {} + opts = schedule_type == "every" ? { :first_in => 0.01 } : {} @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) } @scheduler.join end + private + def assign_dynamic_params(request) + params = request[2][:dynamic_params] + request[2][:query] = {} if !request[2][:query] + params.keys.each do |key| + request[2][:query][key] = params[key] + end + end + def run_once(queue) @requests.each do |name, request| + assign_dynamic_params(request) if request[2][:dynamic_params] request_async(queue, name, request) end @@ -175,11 +204,23 @@ def handle_success(queue, name, request, response, execution_time) end end + private + def update_dynamic_params(request, event) + request[2][:dynamic_params_map].keys.each do |key| + value = request[2][:dynamic_params_map][key] + event_value = event.get(value) + request[2][:dynamic_params][key] = event_value if event_value + end + File.write(request[2][:last_dynamic_params], YAML.dump(request[2][:dynamic_params])) + end + private def handle_decoded_event(queue, name, request, response, event, execution_time) apply_metadata(event, name, request, response, execution_time) decorate(event) queue << event + + update_dynamic_params(request, event) if request[2][:dynamic_params] rescue StandardError, java.lang.Exception => e @logger.error? && @logger.error("Error eventifying response!", :exception => e,