Commit bcdb0df

refactoring for dynamic prefixes and new concurrency model :shared

**Motivation**

One of the most requested features was a way to add dynamic prefixes, using the fieldref syntax, for the files on the bucket, along with the pipeline changes needed to support the shared delegator. By nature the S3 output always did single-threaded writes, but it had multiple workers to process the uploads; the code was only threadsafe when used in the `:single` concurrency mode. This PR addresses a few problems and provides shorter, more structured code:

- The plugin now uses the V2 version of the AWS SDK, which makes sure we receive the latest updates and changes.
- We now use S3's `upload_file` instead of reading chunks ourselves; this method is more efficient and will use threaded multipart uploads if the file is too big.
- You can now use the `fieldref` syntax in the prefix to dynamically change the target based on the events received (see the configuration sketch below).
- The upload queue is now a bounded list; this is necessary to allow back pressure to be communicated back to the pipeline, and the bound is configurable by the user.
- If the queue is full, the plugin will start the upload in the current thread.
- The plugin is now threadsafe and supports the `shared` concurrency model.
- The rotation strategy can be selected; the recommended `size_and_time` checks both configured limits (`size` and `time` are also available).
- The `restore` option will now use a separate threadpool with an unbounded queue.
- The `restore` option will not block the launch of Logstash and will use fewer resources than the real-time path.
- The plugin now uses `multi_receive_encode`, which optimizes the writes to the files.
- Rotate operations are now batched to reduce the number of IO calls.
- Empty files will not be uploaded by any rotation strategy.
- We now use Concurrent Ruby for the implementation of the Java executor.
- If you have finer-grained permissions on prefixes or want a faster boot, you can disable the credentials check with `validate_credentials_on_root_bucket`.
- The credentials check will no longer fail if we can't delete the file.
- We now have a full suite of integration tests for all the defined rotation strategies.

Fixes: #4 #81 #44 #59 #50
Fixes #102
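A minimal configuration sketch of the new behavior. This is a sketch only: the field reference in `prefix` is illustrative, and option names such as `prefix`, `rotation_strategy`, `size_file`, and `time_file` (and the unit of `time_file`) should be checked against the plugin docs.

output {
  s3 {
    bucket            => "my-bucket"
    region            => "us-east-1"
    # fieldref syntax: events are routed into per-prefix temporary files
    prefix            => "%{[@metadata][program]}/"
    rotation_strategy => "size_and_time"
    size_file         => 5242880  # bytes; rotate at 5 MB...
    time_file         => 15       # ...or after this much time, whichever comes first
    # skip the credentials check on the root bucket for faster boot
    validate_credentials_on_root_bucket => false
  }
}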
1 parent f0365ed commit bcdb0df

32 files changed, +1693 −751 lines changed

lib/logstash/outputs/s3.rb

+185 −308
(large diff not rendered)
lib/logstash/outputs/s3/file_repository.rb

+96
@@ -0,0 +1,96 @@
# encoding: utf-8
require "concurrent"
require "concurrent/map"
require "concurrent/timer_task"
require "logstash/util"

module LogStash
  module Outputs
    class S3
      class FileRepository
        DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
        DEFAULT_STALE_TIME_SECS = 15 * 60

        # Ensure that all access or work done
        # on a factory is threadsafe.
        class PrefixedValue
          def initialize(factory, stale_time)
            @factory = factory
            @lock = Mutex.new
            @stale_time = stale_time
          end

          def with_lock
            @lock.synchronize {
              yield @factory
            }
          end

          def stale?
            with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
          end
        end

        def initialize(tags, encoding, temporary_directory,
                       stale_time = DEFAULT_STALE_TIME_SECS,
                       sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
          # The path needs to contain the prefix, so that when we start
          # Logstash after a crash we keep the remote structure.
          @prefixed_factories = Concurrent::Map.new

          @tags = tags
          @encoding = encoding
          @temporary_directory = temporary_directory

          @stale_time = stale_time
          @sweeper_interval = sweeper_interval

          start_stale_sweeper
        end

        def keys
          @prefixed_factories.keys
        end

        def each_files
          @prefixed_factories.each_value do |prefixed_file|
            prefixed_file.with_lock { |factory| yield factory.current }
          end
        end

        # Return the file factory for this prefix, creating it on first use.
        def get_factory(prefix_key)
          @prefixed_factories.compute_if_absent(prefix_key) { PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) }
                             .with_lock { |factory| yield factory }
        end

        def get_file(prefix_key)
          get_factory(prefix_key) { |factory| yield factory.current }
        end

        def shutdown
          stop_stale_sweeper
        end

        def size
          @prefixed_factories.size
        end

        def start_stale_sweeper
          @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
            LogStash::Util.set_thread_name("S3, Stale factory sweeper")

            @prefixed_factories.each_pair do |k, v|
              @prefixed_factories.delete_pair(k, v) if v.stale?
            end
          end

          @stale_sweeper.execute
        end

        def stop_stale_sweeper
          @stale_sweeper.shutdown
        end
      end
    end
  end
end
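A minimal usage sketch, assuming the plugin files are on the load path and that "none" is accepted as a non-gzip encoding value (only "gzip" triggers compression in TemporaryFileFactory):

require "tmpdir"
require "logstash/outputs/s3/temporary_file_factory"
require "logstash/outputs/s3/file_repository"

repository = LogStash::Outputs::S3::FileRepository.new(
  ["es", "fb"],  # tags, embedded into the generated file names
  "none",        # anything other than "gzip" produces plain .txt files
  Dir.tmpdir     # temporary_directory
)

# Access to each per-prefix file goes through the PrefixedValue mutex,
# so concurrent pipeline workers can write safely.
repository.get_file("logs/2016/") { |file| file.write("hello world\n") }

repository.shutdown  # stops the stale-factory sweeper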
lib/logstash/outputs/s3/path_validator.rb

+18
@@ -0,0 +1,18 @@
# encoding: utf-8
module LogStash
  module Outputs
    class S3
      class PathValidator
        INVALID_CHARACTERS = "\^`><"

        def self.valid?(name)
          name.match(matches_re).nil?
        end

        def self.matches_re
          /[#{Regexp.escape(INVALID_CHARACTERS)}]/
        end
      end
    end
  end
end
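A quick sketch of the validator; the return values follow from the character class above:

require "logstash/outputs/s3/path_validator"

LogStash::Outputs::S3::PathValidator.valid?("logs/2016/my-app")  # => true
LogStash::Outputs::S3::PathValidator.valid?("logs/bad>prefix")   # => false (contains ">")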
lib/logstash/outputs/s3/size_and_time_rotation_policy.rb

+24
@@ -0,0 +1,24 @@
# encoding: utf-8
require "logstash/outputs/s3/size_rotation_policy"
require "logstash/outputs/s3/time_rotation_policy"

module LogStash
  module Outputs
    class S3
      class SizeAndTimeRotationPolicy
        def initialize(file_size, time_file)
          @size_strategy = SizeRotationPolicy.new(file_size)
          @time_strategy = TimeRotationPolicy.new(time_file)
        end

        def rotate?(file)
          @size_strategy.rotate?(file) || @time_strategy.rotate?(file)
        end

        def need_periodic?
          true
        end
      end
    end
  end
end
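A worked sketch of the composite policy: rotation triggers on whichever limit is hit first. FileStub is a hypothetical stand-in exposing the size and ctime the policies inspect:

require "logstash/outputs/s3/size_and_time_rotation_policy"

FileStub = Struct.new(:size, :ctime)
policy = LogStash::Outputs::S3::SizeAndTimeRotationPolicy.new(1024, 300)

policy.rotate?(FileStub.new(2048, Time.now))      # => true, size limit reached
policy.rotate?(FileStub.new(10, Time.now - 600))  # => true, non-empty and older than 300 seconds
policy.rotate?(FileStub.new(10, Time.now))        # => false, neither limit reached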
lib/logstash/outputs/s3/size_rotation_policy.rb

+26
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
  module Outputs
    class S3
      class SizeRotationPolicy
        attr_reader :size_file

        def initialize(size_file)
          if size_file <= 0
            raise LogStash::ConfigurationError, "`size_file` needs to be greater than 0"
          end

          @size_file = size_file
        end

        def rotate?(file)
          file.size >= size_file
        end

        def need_periodic?
          false
        end
      end
    end
  end
end
lib/logstash/outputs/s3/temporary_file.rb

+45
@@ -0,0 +1,45 @@
# encoding: utf-8
require "thread"
require "forwardable"
require "fileutils"

module LogStash
  module Outputs
    class S3
      # Wrap the actual file descriptor in a utility class.
      # This makes it more OOP and easier to reason about the paths.
      class TemporaryFile
        extend Forwardable

        DELEGATES_METHODS = [:path, :write, :close, :size, :fsync]

        def_delegators :@fd, *DELEGATES_METHODS

        def initialize(key, fd)
          @fd = fd
          @key = key
          @created_at = Time.now
        end

        def ctime
          @created_at
        end

        def key
          @key.gsub(/^\//, "")
        end

        # Each temporary file is created inside a directory named with a UUID.
        # Instead of deleting the file directly, and risking deleting other files,
        # we delete the root of the UUID. Using a UUID also removes the risk of
        # deleting unwanted files; it acts as a sandbox.
        def delete!
          ::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, ""))
        end

        def empty?
          size == 0
        end
      end
    end
  end
end
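A sketch of the wrapper in action; the directory layout mirrors the UUID sandbox described above, and the names are illustrative:

require "fileutils"
require "securerandom"
require "tmpdir"
require "logstash/outputs/s3/temporary_file"

# Illustrative on-disk layout: <temporary_directory>/<uuid>/<prefix>/<file name>
prefix_dir = ::File.join(Dir.tmpdir, "ls-s3-demo", SecureRandom.uuid, "logs")
FileUtils.mkdir_p(prefix_dir)

fd = ::File.open(::File.join(prefix_dir, "ls.s3.example.part0.txt"), "a")
file = LogStash::Outputs::S3::TemporaryFile.new("logs/ls.s3.example.part0.txt", fd)

file.write("hello\n")  # delegated straight to the underlying descriptor
file.close
file.delete!           # removes the whole <uuid> sandbox directory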
lib/logstash/outputs/s3/temporary_file_factory.rb

+90
@@ -0,0 +1,90 @@
# encoding: utf-8
require "socket"
require "securerandom"
require "fileutils"
require "zlib" # for Zlib::GzipWriter below

module LogStash
  module Outputs
    class S3
      # Since the file can contain dynamic parts, we have to handle a more local structure to
      # allow a nice recovery from a crash.
      #
      # The local structure will look like this:
      #
      # <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%M.tag_es_fb.part1.txt.gz
      #
      # Since the UUID should be fairly unique, we can destroy the whole path when an upload is complete.
      # We do not have to mess around checking whether the other directories have files in them before
      # destroying them.
      class TemporaryFileFactory
        FILE_MODE = "a"
        GZIP_ENCODING = "gzip"
        GZIP_EXTENSION = "txt.gz"
        TXT_EXTENSION = "txt"
        STRFTIME = "%Y-%m-%dT%H.%M"

        attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current

        def initialize(prefix, tags, encoding, temporary_directory)
          @counter = 0
          @prefix = prefix

          @tags = tags
          @encoding = encoding
          @temporary_directory = temporary_directory

          rotate!
        end

        def rotate!
          @current = new_file
          increment_counter
          @current
        end

        private
        def extension
          gzip? ? GZIP_EXTENSION : TXT_EXTENSION
        end

        def gzip?
          encoding == GZIP_ENCODING
        end

        def increment_counter
          @counter += 1
        end

        def current_time
          Time.now.strftime(STRFTIME)
        end

        def generate_name
          filename = "ls.s3.#{Socket.gethostname}.#{current_time}"

          if tags.size > 0
            "#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}"
          else
            "#{filename}.part#{counter}.#{extension}"
          end
        end

        def new_file
          uuid = SecureRandom.uuid
          name = generate_name
          path = ::File.join(temporary_directory, uuid, prefix)
          key = ::File.join(prefix, name)

          FileUtils.mkdir_p(path)

          io = if gzip?
            Zlib::GzipWriter.open(::File.join(path, name))
          else
            ::File.open(::File.join(path, name), FILE_MODE)
          end

          TemporaryFile.new(key, io)
        end
      end
    end
  end
end
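A short sketch of the factory; hostnames, UUIDs, and timestamps are illustrative:

require "tmpdir"
require "logstash/outputs/s3/temporary_file"
require "logstash/outputs/s3/temporary_file_factory"

factory = LogStash::Outputs::S3::TemporaryFileFactory.new(
  "logs/2016/", ["es"], "gzip", Dir.tmpdir
)

file = factory.current
file.path  # e.g. /tmp/<uuid>/logs/2016/ls.s3.myhost.2016-07-14T15.29.tag_es.part0.txt.gz
file.key   # e.g. logs/2016/ls.s3.myhost.2016-07-14T15.29.tag_es.part0.txt.gz

# Each rotation creates a fresh file under a new UUID and bumps the part number.
factory.rotate!.key  # => ...part1.txt.gz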
lib/logstash/outputs/s3/time_rotation_policy.rb

+26
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
  module Outputs
    class S3
      class TimeRotationPolicy
        attr_reader :time_file

        def initialize(time_file)
          if time_file <= 0
            raise LogStash::ConfigurationError, "`time_file` needs to be greater than 0"
          end

          @time_file = time_file
        end

        def rotate?(file)
          file.size > 0 && Time.now - file.ctime >= time_file
        end

        def need_periodic?
          true
        end
      end
    end
  end
end
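A quick sketch; note that an empty file never rotates on time, which is part of what keeps empty files from being uploaded. FileStub is the same hypothetical stand-in used in the earlier rotation sketch:

require "logstash/outputs/s3/time_rotation_policy"

FileStub = Struct.new(:size, :ctime)
policy = LogStash::Outputs::S3::TimeRotationPolicy.new(300)

policy.rotate?(FileStub.new(42, Time.now - 600))  # => true
policy.rotate?(FileStub.new(0, Time.now - 600))   # => false, empty files are never rotated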

lib/logstash/outputs/s3/uploader.rb

+60
@@ -0,0 +1,60 @@
# encoding: utf-8
require "logstash/util"
require "aws-sdk-resources"
require "concurrent" # for Concurrent::ThreadPoolExecutor below

module LogStash
  module Outputs
    class S3
      class Uploader
        TIME_BEFORE_RETRYING_SECONDS = 1
        DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new({
          :min_threads => 1,
          :max_threads => 8,
          :max_queue => 1,
          :fallback_policy => :caller_runs
        })

        attr_reader :bucket, :upload_options, :logger

        def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL)
          @bucket = bucket
          @workers_pool = threadpool
          @logger = logger
        end

        def upload_async(file, options = {})
          @workers_pool.post do
            LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}")
            upload(file, options)
          end
        end

        def upload(file, options = {})
          upload_options = options.fetch(:upload_options, {})

          begin
            obj = bucket.object(file.key)
            obj.upload_file(file.path, upload_options)
          rescue => e
            # Getting here usually means S3 already retried by itself (default is 3 attempts).
            # When the retry limit is reached, or another error happens, we wait and retry again.
            #
            # A thread might get stuck here, but that is better than losing anything:
            # it is either a transient error or something really bad happened.
            sleep(TIME_BEFORE_RETRYING_SECONDS)
            logger.error("Uploading failed, retrying", :exception => e, :path => file.path)
            retry
          end

          options[:on_complete].call(file) unless options[:on_complete].nil?
        end

        def stop
          @workers_pool.shutdown
          @workers_pool.wait_for_termination(nil) # block until it's done
        end
      end
    end
  end
end
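A usage sketch, assuming AWS credentials come from the environment, "my-bucket" exists, and a `logger` responding to error(message, details_hash) is in scope (inside the plugin this is the Logstash logger):

require "tmpdir"
require "aws-sdk-resources"
require "logstash/outputs/s3/uploader"
require "logstash/outputs/s3/temporary_file"
require "logstash/outputs/s3/temporary_file_factory"

bucket = Aws::S3::Resource.new(region: "us-east-1").bucket("my-bucket")
uploader = LogStash::Outputs::S3::Uploader.new(bucket, logger)

file = LogStash::Outputs::S3::TemporaryFileFactory.new("logs/", [], "none", Dir.tmpdir).current
file.write("hello\n")
file.close

# The bounded queue plus :caller_runs means that when all workers are busy,
# the upload runs in the calling thread, pushing back on the pipeline.
# on_complete runs after a successful upload; the plugin uses it to clean up
# the local temporary file.
uploader.upload_async(file, :on_complete => ->(f) { f.delete! })

uploader.stop # drains the pool on plugin close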
