Skip to content
This repository was archived by the owner on Apr 11, 2024. It is now read-only.

Commit 79e6639

Browse files
committed
Merge branch 'dev-fix-restart-on-parse-error'
2 parents b346933 + 19c8a0d commit 79e6639

File tree

5 files changed

+45
-24
lines changed

5 files changed

+45
-24
lines changed

lib/logstash/outputs/cassandra/event_parser.rb

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@ def initialize(options)
1717

1818
def parse(event)
1919
action = {}
20-
action['table'] = event.sprintf(@table)
21-
filter_transform = get_filter_transform(event)
22-
if filter_transform
23-
action['data'] = {}
24-
filter_transform.each { |filter|
25-
add_event_value_from_filter_to_action(event, filter, action)
26-
}
27-
else
28-
add_event_data_using_configured_hints(event, action)
20+
begin
21+
action['table'] = event.sprintf(@table)
22+
filter_transform = get_filter_transform(event)
23+
if filter_transform
24+
action['data'] = {}
25+
filter_transform.each { |filter|
26+
add_event_value_from_filter_to_action(event, filter, action)
27+
}
28+
else
29+
add_event_data_using_configured_hints(event, action)
30+
end
31+
@logger.debug('event parsed to action', :action => action)
32+
rescue Exception => e
33+
@logger.error('failed parsing event', :event => event, :error => e)
34+
action = nil
2935
end
30-
31-
@logger.debug('event parsed to action', :action => action)
3236
action
3337
end
3438

lib/logstash/outputs/cassandra/safe_submitter.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ def prepare_queries(actions)
5757
remaining_queries = Queue.new
5858
actions.each do |action|
5959
begin
60-
query = get_query(action)
61-
remaining_queries << { :query => query, :arguments => action['data'].values }
60+
if action
61+
query = get_query(action)
62+
remaining_queries << { :query => query, :arguments => action['data'].values }
63+
end
6264
rescue Exception => e
6365
@logger.error('Failed to prepare query', :action => action, :exception => e, :backtrace => e.backtrace)
6466
end

logstash-output-cassandra.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-output-cassandra'
4-
s.version = '0.9.0'
4+
s.version = '0.9.1'
55
s.licenses = [ 'Apache License (2.0)' ]
66
s.summary = 'Store events into Cassandra'
77
s.description = 'This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program'

spec/unit/outputs/event_parser_spec.rb

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,10 @@
170170
options = default_opts.update({ 'filter_transform' => [{ 'event_key' => 'a_field', 'column_name' => 'a_column', 'cassandra_type' => 'what?!' }] })
171171
sut_instance = sut.new(options)
172172
sample_event['a_field'] = 'a_value'
173-
expect(options['logger']).to(receive(:error))
173+
expect(options['logger']).to(receive(:error)).at_least(:once)
174174

175-
expect { sut_instance.parse(sample_event) }.to raise_error(/Cannot convert/)
175+
result = sut_instance.parse(sample_event)
176+
expect(result).to be_nil
176177
end
177178
end
178179
end
@@ -238,21 +239,23 @@
238239
it 'fails for unknown hint types' do
239240
options = default_opts.update({ 'hints' => { 'a_field' => 'not_a_real_type' } })
240241
sut_instance = sut.new(options)
241-
expect(options['logger']).to(receive(:error))
242-
242+
expect(options['logger']).to(receive(:error)).at_least(:once)
243243
sample_event['a_field'] = 'a value'
244244

245-
expect { sut_instance.parse(sample_event) }.to raise_error(/Cannot convert/)
245+
result = sut_instance.parse(sample_event)
246+
247+
expect(result).to be_nil
246248
end
247249

248250
it 'fails for unsuccessful hint conversion' do
249251
options = default_opts.update({ 'hints' => { 'a_field' => 'int' } })
250-
expect(options['logger']).to(receive(:error))
251-
252+
expect(options['logger']).to(receive(:error)).at_least(:once)
252253
sut_instance = sut.new(options)
253-
254254
sample_event['a_field'] = 'i am not an int!!!'
255-
expect { sut_instance.parse(sample_event) }.to raise_error(/Cannot convert/)
255+
256+
result = sut_instance.parse(sample_event)
257+
258+
expect(result).to be_nil
256259
end
257260
end
258261

@@ -297,8 +300,11 @@
297300
options = default_opts.update({ 'ignore_bad_values' => true, 'hints' => { 'a_field' => 'map<float>' } })
298301
sut_instance = sut.new(options)
299302
sample_event['a_field'] = 'i am not a set'
303+
expect(options['logger']).to(receive(:error))
304+
305+
result = sut_instance.parse(sample_event)
300306

301-
expect { sut_instance.parse(sample_event) }.to raise_error ArgumentError
307+
expect(result).to be_nil
302308
end
303309
end
304310
end

spec/unit/outputs/safe_submitter_spec.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,15 @@ def generate_future_double
109109
return future_double
110110
end
111111

112+
it 'does nothing in case it got a nil action' do
113+
doubles = setup_session_double(default_options)
114+
expect(doubles[:session_double]).to_not(receive(:prepare))
115+
expect(doubles[:session_double]).to_not(receive(:execute_async))
116+
sut_instance = sut.new(default_options)
117+
118+
expect { sut_instance.submit([nil]) }.to_not raise_error
119+
end
120+
112121
it 'prepares and executes the query' do
113122
doubles = setup_session_double(default_options)
114123
expect(doubles[:session_double]).to(receive(:prepare).with(expected_query_for_one_action)).and_return('eureka')

0 commit comments

Comments
 (0)