Commit 350ba7f

make sure the tests pass when using Java classes
Fixes #102
1 parent 9b16097 commit 350ba7f

25 files changed: +278 −63 lines

lib/logstash/outputs/s3.rb

Lines changed: 22 additions & 19 deletions
@@ -11,7 +11,10 @@
 require "fileutils"
 require "set"
 require "pathname"
+require "aws-sdk"
+require "logstash/outputs/s3/patch"
 
+Aws.eager_autoload!
 
 # INFORMATION:
 #
@@ -118,7 +121,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
   ## This is hack for not destroy the new files after restoring the initial files.
   ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket,
   ## for example if you have single Instance.
-  config :restore, :validate => :boolean, :default => false
+  config :restore, :validate => :boolean, :default => true
 
   # The S3 canned ACL to use when putting the file. Defaults to "private".
   config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"],
@@ -191,20 +194,21 @@ def register
     @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)
 
     @rotation = rotation_strategy
-    @uploader = Uploader.new(bucket_resource, @logger, Concurrent::ThreadPoolExecutor.new({
-                               :min_threads => 1,
-                               :max_threads => @upload_workers_count,
-                               :max_queue => @upload_queue_size,
-                               :fallback_policy => :caller_runs
-                             }))
+
+    executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
+                                                    :max_threads => @upload_workers_count,
+                                                    :max_queue => @upload_queue_size,
+                                                    :fallback_policy => :caller_runs })
+
+    @uploader = Uploader.new(bucket_resource, @logger, executor)
 
     # Restoring from crash will use a new threadpool to slowly recover
     # New events should have more priority.
     restore_from_crash if @restore
 
     # If we need time based rotation we need to do periodic check on the file
     # to take care of file that were not updated recently
-    start_periodic_check if @rotation.need_periodic?
+    start_periodic_check if @rotation.needs_periodic?
   end
 
   def multi_receive_encoded(events_and_encoded)
@@ -229,7 +233,7 @@ def multi_receive_encoded(events_and_encoded)
   end
 
   def close
-    stop_periodic_check if @rotation.need_periodic?
+    stop_periodic_check if @rotation.needs_periodic?
 
     @logger.debug("Uploading current workspace")
 
@@ -294,7 +298,8 @@ def upload_options
 
   def rotate_if_needed(prefixes)
     prefixes.each do |prefix|
-      # Each file access is thread safe, until the rotation is done then only
+      # Each file access is thread safe,
+      # until the rotation is done then only
       # one thread has access to the resource.
       @file_repository.get_factory(prefix) do |factory|
         temp_file = factory.current
@@ -316,7 +321,7 @@ def upload_file(temp_file)
     @logger.debug("Queue for upload", :path => temp_file.path)
 
     # if the queue is full the calling thread will be used to upload
-    temp_file.fsync # make sure we flush the fd before uploading it.
+    temp_file.close # make sure the content is on disk
     if temp_file.size > 0
       @uploader.upload_async(temp_file,
                              :on_complete => method(:clean_temporary_file),
@@ -346,14 +351,12 @@ def restore_from_crash
     @crash_uploader = Uploader.new(bucket_resource, @logger, CRASH_RECOVERY_THREADPOOL)
 
     temp_folder_path = Pathname.new(@temporary_directory)
-    Dir.glob(::File.join(@temporary_directory, "**/*")) do |file|
-      if ::File.file?(file)
-        key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR)
-        temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1))
-
-        @logger.debug("Recovering from crash and uploading", :file => temp_file.path)
-        @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))
-      end
+    Dir.glob(::File.join(@temporary_directory, "**/*"))
+       .select { |file| ::File.file?(file) }
+       .each do |file|
+      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
+      @logger.debug("Recovering from crash and uploading", :file => temp_file.path)
+      @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))
     end
   end
 end
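
Note on the register change above: pulling the Concurrent::ThreadPoolExecutor out into a local variable is purely cosmetic, but the settings matter. The queue is bounded, and the :caller_runs fallback means that once the queue is full, the thread calling upload_async performs the upload itself, which applies back-pressure instead of dropping work. A minimal standalone sketch of that behavior (thread counts and queue size here are illustrative, not the plugin's values):

    require "concurrent"

    executor = Concurrent::ThreadPoolExecutor.new(
      :min_threads => 1,
      :max_threads => 2,
      :max_queue => 2,
      :fallback_policy => :caller_runs
    )

    10.times do |i|
      # Once both workers are busy and two tasks are queued, #post runs the
      # task on the calling thread instead of rejecting it.
      executor.post { sleep(0.1); puts "task #{i} ran on thread #{Thread.current.object_id}" }
    end

    executor.shutdown
    executor.wait_for_termination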

lib/logstash/outputs/s3/file_repository.rb

Lines changed: 3 additions & 3 deletions
@@ -15,15 +15,15 @@ class FileRepository
     # Ensure that all access or work done
     # on a factory is threadsafe
     class PrefixedValue
-      def initialize(factory, stale_time)
-        @factory = factory
+      def initialize(file_factory, stale_time)
+        @file_factory = file_factory
         @lock = Mutex.new
         @stale_time = stale_time
       end
 
       def with_lock
         @lock.synchronize {
-          yield @factory
+          yield @file_factory
         }
       end
 
lib/logstash/outputs/s3/patch.rb

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+# This is patch related to the autoloading and ruby
+#
+# The fix exist in jruby 9k but not in the current jruby, not sure when or it will be backported
+# https://github.com/jruby/jruby/issues/3645
+#
+# AWS is doing tricky name discovery in the module to generate the correct error class and
+# this strategy is bogus in jruby and `eager_autoload` don't fix this issue.
+#
+# This will be a short lived patch since AWS is removing the need.
+# see: https://github.com/aws/aws-sdk-ruby/issues/1301#issuecomment-261115960
+old_stderr = $stderr
+
+$stderr = StringIO.new
+begin
+  module Aws
+    const_set(:S3, Aws::S3)
+  end
+ensure
+  $stderr = old_stderr
+end
+
+
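
The patch works by re-assigning the Aws::S3 constant to itself, which forces JRuby to resolve the autoloaded module eagerly; swapping $stderr for a StringIO swallows the "already initialized constant" warning the re-assignment would otherwise print. The silencing pattern in isolation (the FOO constant below is only for illustration):

    require "stringio"

    old_stderr = $stderr
    $stderr = StringIO.new
    begin
      FOO = 1
      FOO = 1 # would normally print "warning: already initialized constant FOO"
    ensure
      $stderr = old_stderr # always restore the real stderr
    end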

lib/logstash/outputs/s3/size_and_time_rotation_policy.rb

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ def rotate?(file)
     @size_strategy.rotate?(file) || @time_strategy.rotate?(file)
   end
 
-  def need_periodic?
+  def needs_periodic?
     true
   end
 end

lib/logstash/outputs/s3/size_rotation_policy.rb

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ def rotate?(file)
     file.size >= size_file
   end
 
-  def need_periodic?
+  def needs_periodic?
     false
   end
 end

lib/logstash/outputs/s3/temporary_file.rb

Lines changed: 22 additions & 2 deletions
@@ -10,9 +10,10 @@ class S3
   # It make it more OOP and easier to reason with the paths.
   class TemporaryFile
     extend Forwardable
-    DELEGATES_METHODS = [:path, :write, :close, :size, :fsync]
 
-    def_delegators :@fd, *DELEGATES_METHODS
+    def_delegators :@fd, :path, :write, :close, :fsync
+
+    attr_reader :fd
 
     def initialize(key, fd, temp_path)
       @fd = fd
@@ -29,6 +30,17 @@ def temp_path
       @temp_path
     end
 
+    def size
+      # Use the fd size to get the accurate result,
+      # so we dont have to deal with fsync
+      # if the file is close we will use the File::size
+      begin
+        @fd.size
+      rescue IOError
+        ::File.size(path)
+      end
+    end
+
     def key
       @key.gsub(/^\//, "")
     end
@@ -45,6 +57,14 @@ def delete!
     def empty?
       size == 0
     end
+
+    def self.create_from_existing_file(file_path, temporary_folder)
+      key_parts = Pathname.new(file_path).relative_path_from(temporary_folder).to_s.split(::File::SEPARATOR)
+
+      TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"),
+                        ::File.open(file_path, "r"),
+                        ::File.join(temporary_folder, key_parts.slice(0, 1)))
+    end
   end
 end
 end
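
The new create_from_existing_file splits the leftover file's path relative to the temporary folder: the first component (the random per-upload directory) becomes the temp path and the remainder becomes the S3 key, replacing the inline logic previously in restore_from_crash. A sketch of the resulting values, with made-up paths:

    # Assuming a leftover file at <temporary_folder>/<uuid>/logs/2016/ls.s3.log:
    temp_folder = Pathname.new("/tmp/logstash-s3")
    file = "/tmp/logstash-s3/0a1b2c3d/logs/2016/ls.s3.log"

    temp_file = LogStash::Outputs::S3::TemporaryFile.create_from_existing_file(file, temp_folder)
    temp_file.key       # => "logs/2016/ls.s3.log" (uuid component stripped)
    temp_file.temp_path # => "/tmp/logstash-s3/0a1b2c3d"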

lib/logstash/outputs/s3/temporary_file_factory.rb

Lines changed: 31 additions & 1 deletion
@@ -2,6 +2,8 @@
 require "socket"
 require "securerandom"
 require "fileutils"
+require "zlib"
+require "forwardable"
 
 module LogStash
   module Outputs
@@ -80,13 +82,41 @@ def new_file
       FileUtils.mkdir_p(::File.join(path, prefix))
 
       io = if gzip?
-             Zlib::GzipWriter.open(::File.join(path, key))
+             # We have to use this wrapper because we cannot access the size of the
+             # file directly on the gzip writer.
+             IOWrappedGzip.new(::File.open(::File.join(path, key), FILE_MODE))
            else
              ::File.open(::File.join(path, key), FILE_MODE)
            end
 
       TemporaryFile.new(key, io, path)
     end
+
+    class IOWrappedGzip
+      extend Forwardable
+
+      def_delegators :@gzip_writer, :write, :close
+      attr_reader :file_io, :gzip_writer
+
+      def initialize(file_io)
+        @file_io = file_io
+        @gzip_writer = Zlib::GzipWriter.open(file_io)
+      end
+
+      def path
+        @gzip_writer.to_io.path
+      end
+
+      def size
+        # to get the current file size
+        @gzip_writer.flush
+        @gzip_writer.to_io.size
+      end
+
+      def fsync
+        @gzip_writer.to_io.fsync
+      end
+    end
   end
 end
 end

lib/logstash/outputs/s3/time_rotation_policy.rb

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ def rotate?(file)
     file.size > 0 && (Time.now - file.ctime) >= time_file
   end
 
-  def need_periodic?
+  def needs_periodic?
     true
   end
 end

lib/logstash/outputs/s3/uploader.rb

Lines changed: 2 additions & 3 deletions
@@ -1,6 +1,6 @@
 # encoding: utf-8
 require "logstash/util"
-require "aws-sdk-resources"
+require "aws-sdk"
 
 module LogStash
   module Outputs
@@ -42,8 +42,7 @@ def upload(file, options = {})
       #
       # Thread might be stuck here, but I think its better than losing anything
       # its either a transient errors or something bad really happened.
-      sleep(TIME_BEFORE_RETRYING_SECONDS)
-      logger.error("Uploading failed, retrying", :exception => e, :path => file.path)
+      logger.error("Uploading failed, retrying", :exception => e, :path => file.path, :backtrace => e.backtrace)
       retry
     end

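With the sleep before retrying removed, the rescue clause still retries indefinitely; only the log line changes, now carrying the backtrace. The bare retry-forever pattern reduced to a runnable toy (the simulated error is illustrative):

    attempts = 0

    begin
      attempts += 1
      raise IOError, "transient failure" if attempts < 3 # simulate two failed uploads
      puts "upload succeeded on attempt #{attempts}"
    rescue IOError => e
      puts "Uploading failed, retrying: #{e.message}"
      retry # loops until the block stops raising
    end
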
spec/integration/dynamic_prefix_spec.rb

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@
   end
 
   it "creates dated path" do
-    re = /^#{sandbox}\/\d{4}-\d{2}-\d{2}\/ls\.s3\./
+    re = /^#{sandbox}\/\d{4}-\d{2}-\d{1,2}\/ls\.s3\./
     expect(bucket_resource.objects(:prefix => sandbox).first.key).to match(re)
   end
 end
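
Loosening the day field from \d{2} to \d{1,2} accepts both zero-padded and unpadded days, which lines up with the commit's goal: date strings produced through Java classes are not guaranteed to zero-pad. A quick check of what the relaxed pattern accepts (the sandbox prefix here is made up):

    re = %r{^sandbox/\d{4}-\d{2}-\d{1,2}/ls\.s3\.}

    re.match("sandbox/2016-11-09/ls.s3.part0") # => matches, padded day
    re.match("sandbox/2016-11-9/ls.s3.part0")  # => matches, unpadded day
    re.match("sandbox/2016-11-/ls.s3.part0")   # => nil, at least one digit required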

spec/integration/gzip_file_spec.rb

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+# encoding: utf-8
+require_relative "../spec_helper"
+require "logstash/outputs/s3"
+require "logstash/codecs/line"
+require "stud/temporary"
+
+describe "Gzip File Time rotation with constant write", :integration => true do
+  include_context "setup plugin"
+
+  let(:time_file) { 0.004 }
+  let(:options) { main_options.merge({ "encoding" => "gzip",
+                                       "rotation_strategy" => "time" }) }
+  let(:number_of_events) { 5000 }
+  let(:batch_size) { 125 }
+  let(:event_encoded) { "Hello world" }
+  let(:batch) do
+    b = {}
+    number_of_events.times do
+      event = LogStash::Event.new({ "message" => event_encoded })
+      b[event] = "#{event_encoded}\n"
+    end
+    b
+  end
+  let(:minimum_number_of_time_rotation) { 3 }
+  let(:batch_step) { (number_of_events / minimum_number_of_time_rotation).ceil }
+
+  before do
+    clean_remote_files(prefix)
+    subject.register
+
+    # simulate batch read/write
+    batch.each_slice(batch_step) do |batch_time|
+      batch_time.each_slice(batch_size) do |smaller_batch|
+        subject.multi_receive_encoded(smaller_batch)
+      end
+      sleep(1)
+    end
+
+    subject.close
+  end
+
+  it "creates multiples files" do
+    # using close will upload the current file
+    expect(bucket_resource.objects(:prefix => prefix).count).to be_between(minimum_number_of_time_rotation, minimum_number_of_time_rotation + 1).inclusive
+  end
+
+  it "Persists all events" do
+    download_directory = Stud::Temporary.pathname
+
+    FileUtils.rm_rf(download_directory)
+    FileUtils.mkdir_p(download_directory)
+
+    counter = 0
+    bucket_resource.objects(:prefix => prefix).each do |object|
+      target = File.join(download_directory, "#{counter}.gz")
+      object.get(:response_target => target)
+      counter += 1
+    end
+
+    expect(Dir.glob(File.join(download_directory, "**", "*.gz")).inject(0) { |sum, f| sum + Zlib::GzipReader.new(File.open(f)).readlines.size }).to eq(number_of_events)
+  end
+end
