Skip to content

Commit 9f9a631

Browse files
committed
parser_json: Add stream_buffer_size config param
Allow configuration of the size of the buffer that Yajl uses when parsing streaming input. The advantage of this is that when using `out_exec_filter`, and parsing as JSON, it's now possible to configure this plugin to avoid having to wait for 8092 bytes of data to be parsed before events are emitted. Configuration in the `out_exec_filter` tests has been modified to use this parameter, as it shaves 60 seconds off the test run time. Signed-off-by: Ben Wheatley <[email protected]>
1 parent fe59adc commit 9f9a631

File tree

4 files changed

+38
-1
lines changed

4 files changed

+38
-1
lines changed

lib/fluent/plugin/out_exec_filter.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class ExecFilterOutput < Output
9595
COMPAT_PARSE_PARAMS = {
9696
'out_format' => '@type',
9797
'out_keys' => 'keys',
98+
'out_stream_buffer_size' => 'stream_buffer_size',
9899
}
99100
COMPAT_EXTRACT_PARAMS = {
100101
'out_tag_key' => 'tag_key',

lib/fluent/plugin/parser_json.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ class JSONParser < Parser
3030
desc 'Set JSON parser'
3131
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj
3232

33+
# The Yajl library defines a default buffer size of 8092 when parsing
34+
# from IO streams, so maintain this for backwards-compatibility.
35+
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
36+
desc 'Set the buffer size that Yajl will use when parsing streaming input'
37+
config_param :stream_buffer_size, :integer, default: 8092
38+
3339
config_set_default :time_type, :float
3440

3541
def configure(conf)
@@ -81,7 +87,7 @@ def parse_io(io, &block)
8187
y.on_parse_complete = ->(record){
8288
block.call(parse_time(record), record)
8389
}
84-
y.parse(io)
90+
y.parse(io, @stream_buffer_size)
8591
end
8692
end
8793
end

test/plugin/test_out_exec_filter.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ def create_driver(conf)
328328
</format>
329329
<parse>
330330
@type json
331+
stream_buffer_size 1
331332
</parse>
332333
<extract>
333334
tag_key tag
@@ -338,6 +339,7 @@ def create_driver(conf)
338339
command cat
339340
in_keys message
340341
out_format json
342+
out_stream_buffer_size 1
341343
time_key time
342344
tag_key tag
343345
]
@@ -372,6 +374,7 @@ def create_driver(conf)
372374
</format>
373375
<parse>
374376
@type json
377+
stream_buffer_size 1
375378
</parse>
376379
<extract>
377380
tag_key tag
@@ -382,6 +385,7 @@ def create_driver(conf)
382385
command cat
383386
in_keys message
384387
out_format json
388+
out_stream_buffer_size 1
385389
time_key time
386390
tag_key tag
387391
]
@@ -414,6 +418,7 @@ def create_driver(conf)
414418
</format>
415419
<parse>
416420
@type json
421+
stream_buffer_size 1
417422
</parse>
418423
<extract>
419424
tag_key tag
@@ -426,6 +431,7 @@ def create_driver(conf)
426431
command cat
427432
in_keys message
428433
out_format json
434+
out_stream_buffer_size 1
429435
time_key time
430436
time_format %d/%b/%Y %H:%M:%S.%N %z
431437
tag_key tag

test/plugin/test_parser_json.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,28 @@ def test_parse_with_keep_time_key_without_time_format(data)
111111
assert_equal text, record['time']
112112
end
113113
end
114+
115+
def test_yajl_parse_io_with_buffer_smaller_than_input
116+
parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser)
117+
parser.configure(
118+
'keep_time_key' => 'true',
119+
'json_parser' => 'yajl',
120+
'stream_buffer_size' => 1,
121+
)
122+
text = "100"
123+
124+
waiting(5) do
125+
rd, wr = IO.pipe
126+
wr.write "{\"time\":\"#{text}\"}"
127+
128+
parser.instance.parse_io(rd) do |time, record|
129+
assert_equal text.to_i, time.sec
130+
assert_equal text, record['time']
131+
132+
# Once a record has been received the 'write' end of the pipe must be
133+
# closed, otherwise the test will block waiting for more input.
134+
wr.close
135+
end
136+
end
137+
end
114138
end

0 commit comments

Comments
 (0)