@@ -51,7 +51,9 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
51
51
52
52
def initialize ( params = { } )
53
53
super ( params )
54
- @threadsafe = false
54
+ @threadsafe = true
55
+ @decode_mutex_netflow = Mutex . new
56
+ @decode_mutex_ipfix = Mutex . new
55
57
end
56
58
57
59
def register
@@ -212,17 +214,20 @@ def decode_netflow9(flowset, record, metadata = nil)
212
214
else
213
215
key = "#{ flowset . source_id } |#{ template . template_id } "
214
216
end
215
- @netflow_templates [ key , @cache_ttl ] = BinData ::Struct . new ( :endian => :big , :fields => fields )
216
- @logger . debug ( "Received template #{ template . template_id } with fields #{ fields . inspect } " )
217
- @logger . debug ( "Received template #{ template . template_id } of size #{ template_length } bytes. Representing in #{ @netflow_templates [ key ] . num_bytes } BinData bytes" )
218
- if template_length != @netflow_templates [ key ] . num_bytes
219
- @logger . warn ( "Received template #{ template . template_id } of size #{ template_length } bytes doesn't match BinData representation we built (#{ @netflow_templates [ key ] . num_bytes } bytes)" )
220
- end
221
- # Purge any expired templates
222
- @netflow_templates . cleanup!
223
- if @cache_save_path
224
- @netflow_templates_cache [ key ] = fields
225
- save_templates_cache ( @netflow_templates_cache , "#{ @cache_save_path } /netflow_templates.cache" )
217
+ # Prevent netflow_templates array from being concurrently modified
218
+ @decode_mutex_netflow . synchronize do
219
+ @netflow_templates [ key , @cache_ttl ] = BinData ::Struct . new ( :endian => :big , :fields => fields )
220
+ @logger . debug ( "Received template #{ template . template_id } with fields #{ fields . inspect } " )
221
+ @logger . debug ( "Received template #{ template . template_id } of size #{ template_length } bytes. Representing in #{ @netflow_templates [ key ] . num_bytes } BinData bytes" )
222
+ if template_length != @netflow_templates [ key ] . num_bytes
223
+ @logger . warn ( "Received template #{ template . template_id } of size #{ template_length } bytes doesn't match BinData representation we built (#{ @netflow_templates [ key ] . num_bytes } bytes)" )
224
+ end
225
+ # Purge any expired templates
226
+ @netflow_templates . cleanup!
227
+ if @cache_save_path
228
+ @netflow_templates_cache [ key ] = fields
229
+ save_templates_cache ( @netflow_templates_cache , "#{ @cache_save_path } /netflow_templates.cache" )
230
+ end
226
231
end
227
232
end
228
233
end
@@ -316,12 +321,15 @@ def decode_ipfix(flowset, record)
316
321
end
317
322
# FIXME Source IP address required in key
318
323
key = "#{ flowset . observation_domain_id } |#{ template . template_id } "
319
- @ipfix_templates [ key , @cache_ttl ] = BinData ::Struct . new ( :endian => :big , :fields => fields )
320
- # Purge any expired templates
321
- @ipfix_templates . cleanup!
322
- if @cache_save_path
323
- @ipfix_templates_cache [ key ] = fields
324
- save_templates_cache ( @ipfix_templates_cache , "#{ @cache_save_path } /ipfix_templates.cache" )
324
+ # Prevent ipfix_templates array from being concurrently modified
325
+ @decode_mutex_ipfix . synchronize do
326
+ @ipfix_templates [ key , @cache_ttl ] = BinData ::Struct . new ( :endian => :big , :fields => fields )
327
+ # Purge any expired templates
328
+ @ipfix_templates . cleanup!
329
+ if @cache_save_path
330
+ @ipfix_templates_cache [ key ] = fields
331
+ save_templates_cache ( @ipfix_templates_cache , "#{ @cache_save_path } /ipfix_templates.cache" )
332
+ end
325
333
end
326
334
end
327
335
end
0 commit comments