diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 80709c12bb..6ba36c4318 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than config_param :queued_chunks_limit_size, :integer, default: nil desc 'Compress buffered data.' - config_param :compress, :enum, list: [:text, :gzip], default: :text + config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text desc 'If true, chunks are thrown away when unrecoverable error happens' config_param :disable_chunk_backup, :bool, default: false diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 19c0afa8ef..d74aea1760 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -29,11 +29,12 @@ class FileOutput < Output helpers :formatter, :inject, :compat_parameters - SUPPORTED_COMPRESS = [:text, :gz, :gzip] + SUPPORTED_COMPRESS = [:text, :gz, :gzip, :zstd] SUPPORTED_COMPRESS_MAP = { text: nil, gz: :gzip, gzip: :gzip, + zstd: :zstd, } DEFAULT_TIMEKEY = 60 * 60 * 24 @@ -216,9 +217,9 @@ def write(chunk) method(:write_without_compression) when @compress_method == :gzip if @buffer.compress != :gzip || @recompress - method(:write_gzip_with_compression) + method(:write_with_compression) else - method(:write_gzip_from_gzipped_chunk) + method(:write_from_compressed_chunk) end else raise "BUG: unknown compression method #{@compress_method}" @@ -253,7 +254,7 @@ def write_without_compression(path, chunk) end end - def write_gzip_with_compression(path, chunk) + def write_with_compression(path, chunk) File.open(path, "ab", @file_perm) do |f| gz = Zlib::GzipWriter.new(f) chunk.write_to(gz, compressed: :text) @@ -261,7 +262,7 @@ def write_gzip_with_compression(path, chunk) end end - def write_gzip_from_gzipped_chunk(path, chunk) + def write_from_compressed_chunk(path, chunk) File.open(path, "ab", @file_perm) do |f| chunk.write_to(f, compressed: :gzip) end @@ -280,6 +281,7 @@ def timekey_to_timeformat(timekey) def compression_suffix(compress) case compress when :gzip then '.gz' + when :zstd then '.zstd' when nil then '' else raise ArgumentError, "unknown compression type #{compress}" diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0aed67db95..4e03d5ce54 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1014,13 +1014,17 @@ def write_guard(&block) end FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } - FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) } FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } - FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) } def generate_format_proc if @buffer && @buffer.compress == :gzip - @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM + @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP : FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP + elsif @buffer && @buffer.compress == :zstd + @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD : FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD else @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM end diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index c9cc649f0a..5923ee0937 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1010,13 +1010,25 @@ def waiting(seconds) test 'when output has and compress is gzip' do i = create_output(:buffered) i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'gzip'})])) - assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, i.generate_format_proc + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP, i.generate_format_proc end test 'when output has and compress is gzip and time_as_integer is true' do i = create_output(:buffered) i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'gzip'})])) - assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, i.generate_format_proc + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP, i.generate_format_proc + end + + test 'when output has and compress is zstd' do + i = create_output(:buffered) + i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'zstd'})])) + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD, i.generate_format_proc + end + + test 'when output has and compress is zstd and time_as_integer is true' do + i = create_output(:buffered) + i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'zstd'})])) + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD, i.generate_format_proc end test 'when output has and compress is text' do