Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make shard exhaustion events a retry-able error. #148

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions lib/fluent/plugin/opensearch_error_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
chunk.msgpack_each do |time, rawrecord|
bulk_message = ''
next unless rawrecord.is_a? Hash

begin
# we need a deep copy for process_message to alter
processrecord = Marshal.load(Marshal.dump(rawrecord))
Expand All @@ -95,6 +96,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[:bad_chunk_record] += 1
next
end

item = items.shift
if item.is_a?(Hash) && item.has_key?(@plugin.write_operation)
write_operation = @plugin.write_operation
Expand All @@ -111,6 +113,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[:errors_bad_resp] += 1
next
end

if item[write_operation].has_key?('status')
status = item[write_operation]['status']
else
Expand All @@ -119,25 +122,30 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[:errors_bad_resp] += 1
next
end

case
when [200, 201].include?(status)
stats[:successes] += 1
when CREATE_OP == write_operation && 409 == status
stats[:duplicates] += 1
when 400 == status
stats[:bad_argument] += 1
reason = ""
log_os_400_reason do
if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
reason = " [error type]: #{item[write_operation]['error']['type']}"
error_type = item.dig(write_operation, 'error', 'type')
error_reason = item.dig(write_operation, 'error', 'reason')

# OS presents shard exhaustion as an exception, but this is 100% retryable...
if error_type == 'illegal_argument_exception' && error_reason =~ /would add \[\d+\] total shards, but this cluster/
retry_stream.add(time, rawrecord)
else
stats[:bad_argument] += 1
reason = ""
log_os_400_reason do
reason = " [error type]: #{error_type}" if error_type
reason += " [reason]: \'#{error_reason}\'" if error_reason
end
if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('reason')
reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'"
if emit_error_label_event?
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
end
end
if emit_error_label_event?
@plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
end
else
if item[write_operation]['error'].is_a?(String)
reason = item[write_operation]['error']
Expand Down Expand Up @@ -172,11 +180,13 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[type] += 1
end
end

@plugin.log.on_debug do
msg = ["Indexed (op = #{@plugin.write_operation})"]
stats.each_pair { |key, value| msg << "#{value} #{key}" }
@plugin.log.debug msg.join(', ')
end

raise Fluent::Plugin::OpenSearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end
end
62 changes: 49 additions & 13 deletions test/plugin/test_opensearch_error_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,21 @@ def setup
def test_400_responses_reason_log
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
response = parse_response(%({
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse"
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse"
}
}
}
}
]
}))
]
}))
chunk = MockChunk.new(records)
dummy_extracted_values = []
@handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values)
Expand Down Expand Up @@ -371,6 +371,42 @@ def test_rejected_execution_exception_responses
end
end

# Verifies that a bulk-item 400 whose error is an illegal_argument_exception
# about exceeding the cluster's maximum open shards is treated as retryable:
# the record must land on the retry stream (surfacing as RetryStreamError)
# instead of being counted as a plain bad-argument failure.
def test_out_shard_exhaustion_responses
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
# Simulated OpenSearch _bulk response: shard-exhaustion rejection for one item.
response = parse_response(%({
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "illegal_argument_exception",
"reason" : "Validation Failed: 1: this action would add [4] total shards, but this cluster currently has [998]/[1000] maximum shards open;"
}
}
}
]
}))

begin
failed = false
chunk = MockChunk.new(records)
dummy_extracted_values = []
@handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values)
rescue Fluent::Plugin::OpenSearchErrorHandler::OpenSearchRequestAbortError, Fluent::Plugin::OpenSearchOutput::RetryStreamError => e
failed = true
# Collect whatever the raised error carries on its retry stream.
# OpenSearchRequestAbortError has no retry_stream, hence the respond_to? guard.
records = [].tap do |records|
next unless e.respond_to?(:retry_stream)
e.retry_stream.each {|time, record| records << record}
end
# should retry chunk when unrecoverable error is not thrown
assert_equal(1, records.length)
end
# An exception must have been raised; the shard-exhaustion item may not be
# silently swallowed as a non-retryable 400.
assert_true failed
end

def test_es_rejected_execution_exception_responses_as_not_error
plugin = TestPlugin.new(@log)
plugin.unrecoverable_error_types = ["out_of_memory_error"]
Expand Down