From 7b8d802789b8c43f9d4a7c3e7653a1c2d96d5bc4 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 22 Nov 2017 06:49:10 -0600 Subject: [PATCH] Fix concurrency issues when decoding netflowV9 and serializing data Fixes https://github.com/logstash-plugins/logstash-codec-netflow/issues/90 --- CHANGELOG.md | 4 ++++ CONTRIBUTORS | 1 + lib/logstash/codecs/netflow.rb | 17 +++++++++++------ logstash-codec-netflow.gemspec | 2 +- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0484861..12d7c21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.8.3 + + - Fixed a race condition that could cause some errors when running in a multithreaded input + ## 3.8.2 - Fixed exceptions due to NilClass in util.rb and netflow.rb diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 55f8090..0f33fe8 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -4,6 +4,7 @@ reports, or in general have helped logstash along its way. Contributors: * Aaron Mildenstein (untergeek) * Adam Kaminski (thimslugga) +* Andrew Cholakian (andrewvc) * Bjørn Ruberg (bruberg) * Colin Surprenant (colinsurprenant) * Daniel Nägele (analogbyte) diff --git a/lib/logstash/codecs/netflow.rb b/lib/logstash/codecs/netflow.rb index 51b7c0b..e709589 100644 --- a/lib/logstash/codecs/netflow.rb +++ b/lib/logstash/codecs/netflow.rb @@ -50,6 +50,7 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base FLOWSET_ID = "flowset_id" def initialize(params = {}) + @file_cache_mutex = Mutex.new super(params) @threadsafe = true @decode_mutex_netflow = Mutex.new @@ -240,9 +241,10 @@ def decode_netflow9(flowset, record, metadata = nil) else key = "#{flowset.source_id}|#{record.flowset_id}" end - if @netflow_templates[key] != nil - template = @netflow_templates[key] - else + + template = @decode_mutex_netflow.synchronize { @netflow_templates[key] } + + if !template @logger.warn("Can't (yet) decode flowset id #{record.flowset_id} from source id #{flowset.source_id}, because no template to decode it with has been received. This message will usually go away after 1 minute.") return events end @@ -419,9 +421,10 @@ def load_templates_cache(file_path) templates_cache = {} begin @logger.debug? and @logger.debug("Loading templates from template cache #{file_path}") - templates_cache = JSON.parse(File.read(file_path)) + file_data = @file_cache_mutex.synchronize { File.read(file_path)} + templates_cache = JSON.parse(file_data) rescue Exception => e - raise "#{self.class.name}: templates cache file corrupt (#{file_path})" + raise "#{self.class.name}: templates cache file could not be read @ (#{file_path}: #{e.class.name} #{e.message})" end templates_cache @@ -430,7 +433,9 @@ def load_templates_cache(file_path) def save_templates_cache(templates_cache, file_path) begin @logger.debug? and @logger.debug("Writing templates to template cache #{file_path}") - File.open(file_path, 'w') {|file| file.write templates_cache.to_json } + @file_cache_mutex.synchronize do + File.open(file_path, 'w') {|file| file.write templates_cache.to_json } + end rescue Exception => e raise "#{self.class.name}: saving templates cache file failed (#{file_path}) with error #{e}" end diff --git a/logstash-codec-netflow.gemspec b/logstash-codec-netflow.gemspec index 2fb8f81..0ff5c6b 100644 --- a/logstash-codec-netflow.gemspec +++ b/logstash-codec-netflow.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-codec-netflow' - s.version = '3.8.2' + s.version = '3.8.3' s.licenses = ['Apache License (2.0)'] s.summary = "Reads Netflow v5 and Netflow v9 data" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"