diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index e0b70e36..9370851d 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -181,22 +181,15 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) - @bulk_response_metrics.increment(response.code.to_s) - case response.code - when 200 # OK + if response.code == 200 LogStash::Json.load(response.body) - when 413 # Payload Too Large - logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') else + logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) if response.code == 413 url = ::LogStash::Util::SafeURI.new(response.final_url) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( - response.code, url, body_stream.to_s, response.body - ) + raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(response.code, url, body_stream.to_s, response.body) end end @@ -414,13 +407,21 @@ def exists?(path, use_get=false) end def template_exists?(template_endpoint, name) - exists?("/#{template_endpoint}/#{name}") + response = @pool.get("/#{template_endpoint}/#{name}") + return true if response.code >= 200 && response.code <= 299 + return false if response.code == 404 + url = ::LogStash::Util::SafeURI.new(response.final_url) + raise BadResponseCodeError.new(response.code, url, nil, response.body) end def template_put(template_endpoint, name, template) - path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) - @pool.put(path, nil, LogStash::Json.dump(template)) + path = "#{template_endpoint}/#{name}" + response = @pool.put(path, nil, LogStash::Json.dump(template)) + if response.code < 200 || response.code > 299 + url = ::LogStash::Util::SafeURI.new(response.final_url) + raise BadResponseCodeError.new(response.code, url, template, response.body) + end end # ILM methods @@ -432,16 +433,14 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition) - begin - @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) - logger.info("Created rollover alias", name: alias_name) - # If the rollover alias already exists, ignore the error that comes back from Elasticsearch - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 400 - logger.info("Rollover alias already exists, skipping", name: alias_name) - return - end - raise e + response = @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + if response.code == 400 + logger.info("Rollover alias already exists, skipping", name: alias_name) + return + end + unless rresponse.code >= 200 && response.code <= 299 + url = ::LogStash::Util::SafeURI.new(response.final_url) + raise BadResponseCodeError.new(response.code, url, alias_definition, response.body) end end diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index 1d9feb71..ef64a5da 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -76,15 +76,6 @@ def perform_request(url, method, path, params={}, body=nil) raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string) end - # 404s are excluded because they are valid codes in the case of - # template installation. 413s are excluded to allow the bulk_send - # error handling to process "Payload Too Large" responses rather - # than triggering retries. - code = resp.code - if code < 200 || (code > 299 && ![404, 413].include?(code)) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) - end - resp end diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index fb3194cc..3a38e002 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -253,13 +253,12 @@ def get_license(url) def health_check_request(url) logger.debug("Running health check to see if an Elasticsearch connection is working", :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path) - begin - response = perform_request_to_url(url, :head, @healthcheck_path) - return response, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) - return nil, e + response = perform_request_to_url(url, :head, @healthcheck_path) + if response.code < 200 || response.code > 299 + logger.warn("Health check failed", code: response.code, url: url.sanitized.to_s) + return nil, BadResponseCodeError.new(response.code, url, nil, response.body) end + return response, nil end def healthcheck!(register_phase = true) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 977ef204..c789dbfc 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -7,8 +7,12 @@ module Common attr_reader :hosts - # These codes apply to documents, not at the request level - DOC_DLQ_CODES = [400, 404] + # These codes apply to documents, not at the request level. While the 413 error technically is at the request level + # it should be treated like an error at the document level. Specifically when the payload is too large it is wholesale + # not accepted by ES, in this case it is dumped to DLQ and not retried. Note that this applies to batches or a single message, + # if the batch size results in a 413 due to exceeding ES limit *all* events in the batch are rejected, regardless of whether + # the individual parts that were rejected would have been accepted. + DOC_DLQ_CODES = [400, 404, 413] DOC_SUCCESS_CODES = [200, 201] DOC_CONFLICT_CODE = 409