Skip to content

Commit b127992

Browse files
nigoelelyscape
authored andcommitted
Incorporating the review comments for the dynamic prefix based s3 outputs
1 parent b6d430d commit b127992

File tree

1 file changed

+13
-19
lines changed
  • lib/logstash/outputs

1 file changed

+13
-19
lines changed

lib/logstash/outputs/s3.rb

+13-19
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
require "thread"
99
require "tmpdir"
1010
require "fileutils"
11-
require 'pathname'
11+
require "pathname"
1212

1313

1414
# INFORMATION:
@@ -61,7 +61,7 @@
6161
# time_file => 5 (optional)
6262
# format => "plain" (optional)
6363
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
64-
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that)
64+
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no events for the prefix, before cleaning up the watch on that)
6565
# }
6666
#
6767
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -155,8 +155,8 @@ def write_on_bucket(file)
155155
# find and use the bucket
156156
bucket = @s3.buckets[@bucket]
157157

158-
first = Pathname.new @temporary_directory
159-
second = Pathname.new file
158+
first = Pathname.new(@temporary_directory)
159+
second = Pathname.new(file)
160160

161161
remote_filename_path = second.relative_path_from first
162162

@@ -187,11 +187,9 @@ def create_temporary_file(prefix)
187187
@tempfile[prefix].close
188188
end
189189

190-
if @prefixes.include? prefix
190+
if @prefixes.include?(prefix)
191191
dirname = File.dirname(filename)
192-
unless File.directory?(dirname)
193-
FileUtils.mkdir_p(dirname)
194-
end
192+
FileUtils.mkdir_p(dirname) unless File.directory?(dirname)
195193
@logger.debug("S3: Creating a new temporary file", :filename => filename)
196194
@tempfile[prefix] = File.open(filename, "a")
197195
end
@@ -226,8 +224,6 @@ def register
226224

227225
test_s3_write
228226
restore_from_crashes if @restore == true
229-
#reset_page_counter
230-
#create_temporary_file
231227
configure_periodic_rotation if time_file != 0
232228
configure_upload_workers
233229

@@ -270,7 +266,7 @@ def restore_from_crashes
270266
end
271267

272268
public
273-
def shouldcleanup(prefix)
269+
def need_cleanup?(prefix)
274270
return @empty_uploads[prefix] > @no_event_wait
275271
end
276272

@@ -281,8 +277,7 @@ def move_file_to_bucket(file)
281277

282278
basepath = Pathname.new @temporary_directory
283279
dirname = Pathname.new File.dirname(file)
284-
prefixpath = dirname.relative_path_from basepath
285-
prefix = prefixpath.to_s
280+
prefix = dirname.relative_path_from(basepath).to_s
286281
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
287282

288283
if !File.zero?(file)
@@ -308,9 +303,8 @@ def move_file_to_bucket(file)
308303
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
309304
end
310305

311-
if shouldcleanup(prefix)
312-
cleanprefix(prefix)
313-
end
306+
clean_prefix(prefix) if need_cleanup?(prefix)
307+
314308
end
315309

316310
public
@@ -369,7 +363,7 @@ def close
369363
shutdown_upload_workers
370364
@periodic_rotation_thread.stop! if @periodic_rotation_thread
371365

372-
for prefix in @prefixes
366+
@prefixes.each do |prefix|
373367
@file_rotation_lock[prefix].synchronize do
374368
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
375369
end
@@ -385,7 +379,7 @@ def shutdown_upload_workers
385379
private
386380
def handle_event(encoded_event, event)
387381
actualprefix = event.sprintf(@prefix)
388-
if not @prefixes.to_a().include? actualprefix
382+
if !@prefixes.include? actualprefix
389383
@file_rotation_lock[actualprefix] = Mutex.new
390384
@prefixes.add(actualprefix)
391385
reset_page_counter(actualprefix)
@@ -425,7 +419,7 @@ def configure_periodic_rotation
425419
end
426420

427421
private
428-
def cleanprefix(prefix)
422+
def clean_prefix(prefix)
429423
path = File.join(@temporary_directory, prefix)
430424
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
431425
@file_rotation_lock[prefix].synchronize do

0 commit comments

Comments
 (0)