Skip to content

Commit 9b16097

Browse files
Nevins Bartolomeoph
authored and committed
Cleanup; also fixes a bug that was causing the creation of a large number of unused files/directories
Fixes #102
1 parent d8c1d98 commit 9b16097

10 files changed

+84
-37
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
112112
# If you define file_size you have a number of files in consideration of the section and the current tag.
113113
# 0 stays on the listener all the time; beware if you specify 0 together with size_file 0, because the file will not be put on the bucket,
114114
# for now the only thing this plugin can do is to put the file when logstash restarts.
115-
config :time_file, :validate => :number, :default => 15 * 60
115+
config :time_file, :validate => :number, :default => 15
116116

117117
## IMPORTANT: if you use multiple instances of s3, you should specify on one of them the "restore=> true" and on the others "restore => false".
118118
## This is a hack to avoid destroying the new files after restoring the initial files.
@@ -136,10 +136,10 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
136136
config :prefix, :validate => :string, :default => ''
137137

138138
# Specify how many workers to use to upload the files to S3
139-
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round
139+
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil
140140

141141
# Number of items we can keep in the local queue before uploading them
142-
config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round
142+
config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).ceil
143143

144144
# The version of the S3 signature hash to use. Normally uses the internal client default, can be explicitly
145145
# specified here
@@ -349,7 +349,7 @@ def restore_from_crash
349349
Dir.glob(::File.join(@temporary_directory, "**/*")) do |file|
350350
if ::File.file?(file)
351351
key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR)
352-
temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"))
352+
temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1))
353353

354354
@logger.debug("Recovering from crash and uploading", :file => temp_file.path)
355355
@crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))

lib/logstash/outputs/s3/file_repository.rb

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
require "concurrent/timer_task"
55
require "logstash/util"
66

7-
java_import "java.util.concurrent.ConcurrentHashMap"
7+
ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap
88

99
module LogStash
1010
module Outputs
@@ -34,6 +34,23 @@ def stale?
3434
def apply(prefix)
3535
return self
3636
end
37+
38+
def delete!
39+
with_lock{ |factory| factory.current.delete! }
40+
end
41+
end
42+
43+
class FactoryInitializer
44+
def initialize(tags, encoding, temporary_directory, stale_time)
45+
@tags = tags
46+
@encoding = encoding
47+
@temporary_directory = temporary_directory
48+
@stale_time = stale_time
49+
end
50+
51+
def apply(prefix_key)
52+
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
53+
end
3754
end
3855

3956
def initialize(tags, encoding, temporary_directory,
@@ -43,20 +60,15 @@ def initialize(tags, encoding, temporary_directory,
4360
# logstash after a crash we keep the remote structure
4461
@prefixed_factories = ConcurrentHashMap.new
4562

46-
@tags = tags
47-
@encoding = encoding
48-
@temporary_directory = temporary_directory
49-
50-
@stale_time = stale_time
5163
@sweeper_interval = sweeper_interval
5264

65+
@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)
66+
5367
start_stale_sweeper
5468
end
5569

5670
def keys
57-
arr = []
58-
@prefixed_factories.keys.each {|k| arr << k}
59-
arr
71+
@prefixed_factories.keySet
6072
end
6173

6274
def each_files
@@ -67,7 +79,7 @@ def each_files
6779

6880
# Return the file factory
6981
def get_factory(prefix_key)
70-
@prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory }
82+
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
7183
end
7284

7385
def get_file(prefix_key)
@@ -85,7 +97,7 @@ def size
8597
def remove_stale(k, v)
8698
if v.stale?
8799
@prefixed_factories.remove(k, v)
88-
v.with_lock{ |factor| factor.current.delete!}
100+
v.delete!
89101
end
90102
end
91103

lib/logstash/outputs/s3/temporary_file.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,21 @@ class TemporaryFile
1414

1515
def_delegators :@fd, *DELEGATES_METHODS
1616

17-
def initialize(key, fd)
17+
def initialize(key, fd, temp_path)
1818
@fd = fd
1919
@key = key
20+
@temp_path = temp_path
2021
@created_at = Time.now
2122
end
2223

2324
def ctime
2425
@created_at
2526
end
2627

28+
def temp_path
29+
@temp_path
30+
end
31+
2732
def key
2833
@key.gsub(/^\//, "")
2934
end
@@ -33,7 +38,8 @@ def key
3338
# we delete the root of the UUID; using a UUID also removes the risk of deleting unwanted files, it acts as
3439
# a sandbox.
3540
def delete!
36-
::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, ""))
41+
@fd.close
42+
::FileUtils.rm_rf(@temp_path, :secure => true)
3743
end
3844

3945
def empty?

lib/logstash/outputs/s3/temporary_file_factory.rb

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,17 @@ def initialize(prefix, tags, encoding, temporary_directory)
3131
@tags = tags
3232
@encoding = encoding
3333
@temporary_directory = temporary_directory
34+
@lock = Mutex.new
3435

3536
rotate!
3637
end
3738

3839
def rotate!
39-
@current = new_file
40-
increment_counter
41-
@current
40+
@lock.synchronize {
41+
@current = new_file
42+
increment_counter
43+
@current
44+
}
4245
end
4346

4447
private
@@ -71,18 +74,18 @@ def generate_name
7174
def new_file
7275
uuid = SecureRandom.uuid
7376
name = generate_name
74-
path = ::File.join(temporary_directory, uuid, prefix)
77+
path = ::File.join(temporary_directory, uuid)
7578
key = ::File.join(prefix, name)
7679

77-
FileUtils.mkdir_p(path)
80+
FileUtils.mkdir_p(::File.join(path, prefix))
7881

7982
io = if gzip?
80-
Zlib::GzipWriter.open(::File.join(path, name))
83+
Zlib::GzipWriter.open(::File.join(path, key))
8184
else
82-
::File.open(::File.join(path, name), FILE_MODE)
85+
::File.open(::File.join(path, key), FILE_MODE)
8386
end
8487

85-
TemporaryFile.new(key, io)
88+
TemporaryFile.new(key, io, path)
8689
end
8790
end
8891
end

spec/outputs/s3/file_repository_spec.rb

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,22 @@
3434
end
3535
end
3636

37+
it "returns the same file for the same dynamic prefix key" do
38+
prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/"
39+
event = LogStash::Event.new({ "type" => "syslog"})
40+
key = event.sprintf(prefix)
41+
file_path = nil
42+
43+
44+
subject.get_file(key) do |file|
45+
file_path = file.path
46+
end
47+
48+
subject.get_file(key) do |file|
49+
expect(file.path).to eq(file_path)
50+
end
51+
end
52+
3753
it "returns different file for different prefix keys" do
3854
file_path = nil
3955

@@ -72,21 +88,27 @@
7288

7389
it "returns all available keys" do
7490
subject.get_file(prefix_key) { |file| file.write("something") }
75-
expect(subject.keys).to eq([prefix_key])
91+
expect(subject.keys.toArray).to eq([prefix_key])
7692
end
7793

7894
it "clean stale factories" do
79-
file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
80-
expect(file_repository.size).to eq(0)
81-
file_repository.get_factory(prefix_key) do |factory|
95+
@file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
96+
expect(@file_repository.size).to eq(0)
97+
path = ""
98+
@file_repository.get_factory(prefix_key) do |factory|
8299
factory.current.write("hello")
83100
# force a rotation so we get an empty file that will get stale.
84101
factory.rotate!
102+
path = factory.current.temp_path
85103
end
86104

87-
file_repository.get_file("another-prefix") { |file| file.write("hello") }
88-
expect(file_repository.size).to eq(2)
89-
try(10) { expect(file_repository.size).to eq(1) }
105+
@file_repository.get_file("another-prefix") { |file| file.write("hello") }
106+
expect(@file_repository.size).to eq(2)
107+
@file_repository.keys.each do |k|
108+
puts k
109+
end
110+
try(10) { expect(@file_repository.size).to eq(1) }
111+
expect(File.directory?(path)).to be_falsey
90112
end
91113
end
92114

spec/outputs/s3/size_and_time_rotation_policy_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
let(:time_file) { 1 }
99
subject { described_class.new(file_size, time_file) }
1010

11+
let(:temporary_directory) { Stud::Temporary.pathname }
1112
let(:temporary_file) { Stud::Temporary.file }
1213
let(:name) { "foobar" }
1314
let(:content) { "hello" * 1000 }
14-
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) }
15+
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }
1516

1617
it "raises an exception if the `time_file` is set to 0" do
1718
expect { described_class.new(100, 0) }.to raise_error(LogStash::ConfigurationError, /time_file/)

spec/outputs/s3/size_rotation_policy_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
describe LogStash::Outputs::S3::SizeRotationPolicy do
88
subject { described_class.new(size_file) }
99

10+
let(:temporary_directory) { Stud::Temporary.directory }
1011
let(:temporary_file) { Stud::Temporary.file }
1112
let(:name) { "foobar" }
1213
let(:content) { "hello" * 1000 }
1314
let(:size_file) { 10 } # in bytes
14-
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) }
15+
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }
1516

1617
it "returns true if the size on disk is higher than the `size_file`" do
1718
file.write(content)

spec/outputs/s3/temporary_file_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
FileUtils.mkdir_p(::File.join(temporary_directory, uuid))
1717
end
1818

19-
subject { described_class.new(key, temporary_file) }
19+
subject { described_class.new(key, temporary_file, temporary_directory) }
2020

2121
it "returns the key of the file" do
2222
expect(subject.key).to eq(key)

spec/outputs/s3/time_rotation_policy_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
subject { described_class.new(max_time) }
88

99
let(:max_time) { 1 }
10+
let(:temporary_directory) { Stud::Temporary.directory }
1011
let(:temporary_file) { Stud::Temporary.file }
1112
let(:name) { "foobar" }
1213
let(:content) { "hello" * 1000 }
13-
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) }
14+
let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) }
1415

1516
it "raises an exception if the `file_time` is set to 0" do
1617
expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/)

spec/outputs/s3/uploader_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
let(:bucket_name) { "foobar-bucket" }
1212
let(:client) { Aws::S3::Client.new(stub_responses: true) }
1313
let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) }
14+
let(:temporary_directory) { Stud::Temporary.pathname }
1415
let(:temporary_file) { Stud::Temporary.file }
1516
let(:key) { "foobar" }
1617
let(:upload_options) { {} }
@@ -24,7 +25,7 @@
2425
end
2526

2627
let(:file) do
27-
f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file)
28+
f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file, temporary_directory)
2829
f.write("random content")
2930
f.fsync
3031
f

0 commit comments

Comments
 (0)