Skip to content

Commit

Permalink
Fix concurrency issues when decoding netflowV9 and serializing data
Browse files Browse the repository at this point in the history
Fixes #90
  • Loading branch information
andrewvc authored and jorritfolmer committed Nov 23, 2017
1 parent 54a211d commit 7b8d802
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 3.8.3

- Fixed a race condition that could cause some errors when running in a multithreaded input

## 3.8.2

- Fixed exceptions due to NilClass in util.rb and netflow.rb
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ reports, or in general have helped logstash along its way.
Contributors:
* Aaron Mildenstein (untergeek)
* Adam Kaminski (thimslugga)
* Andrew Cholakian (andrewvc)
* Bjørn Ruberg (bruberg)
* Colin Surprenant (colinsurprenant)
* Daniel Nägele (analogbyte)
Expand Down
17 changes: 11 additions & 6 deletions lib/logstash/codecs/netflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
FLOWSET_ID = "flowset_id"

def initialize(params = {})
@file_cache_mutex = Mutex.new
super(params)
@threadsafe = true
@decode_mutex_netflow = Mutex.new
Expand Down Expand Up @@ -240,9 +241,10 @@ def decode_netflow9(flowset, record, metadata = nil)
else
key = "#{flowset.source_id}|#{record.flowset_id}"
end
if @netflow_templates[key] != nil
template = @netflow_templates[key]
else

template = @decode_mutex_netflow.synchronize { @netflow_templates[key] }

if !template
@logger.warn("Can't (yet) decode flowset id #{record.flowset_id} from source id #{flowset.source_id}, because no template to decode it with has been received. This message will usually go away after 1 minute.")
return events
end
Expand Down Expand Up @@ -419,9 +421,10 @@ def load_templates_cache(file_path)
templates_cache = {}
begin
@logger.debug? and @logger.debug("Loading templates from template cache #{file_path}")
templates_cache = JSON.parse(File.read(file_path))
file_data = @file_cache_mutex.synchronize { File.read(file_path)}
templates_cache = JSON.parse(file_data)
rescue Exception => e
raise "#{self.class.name}: templates cache file corrupt (#{file_path})"
raise "#{self.class.name}: templates cache file could not be read @ (#{file_path}: #{e.class.name} #{e.message})"
end

templates_cache
Expand All @@ -430,7 +433,9 @@ def load_templates_cache(file_path)
def save_templates_cache(templates_cache, file_path)
begin
@logger.debug? and @logger.debug("Writing templates to template cache #{file_path}")
File.open(file_path, 'w') {|file| file.write templates_cache.to_json }
@file_cache_mutex.synchronize do
File.open(file_path, 'w') {|file| file.write templates_cache.to_json }
end
rescue Exception => e
raise "#{self.class.name}: saving templates cache file failed (#{file_path}) with error #{e}"
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-codec-netflow.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-netflow'
s.version = '3.8.2'
s.version = '3.8.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads Netflow v5 and Netflow v9 data"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down

0 comments on commit 7b8d802

Please sign in to comment.