Skip to content

Commit 0b9e4cc

Browse files
authored
fix(reduce transform): improve array handling (#3076)
Signed-off-by: Luke Steensen <[email protected]>
1 parent efcf4b9 commit 0b9e4cc

File tree

2 files changed

+132
-14
lines changed

2 files changed

+132
-14
lines changed

src/transforms/reduce/merge_strategy.rs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,45 @@ impl ReduceValueMerger for ConcatMerger {
7474
//------------------------------------------------------------------------------
7575

7676
#[derive(Debug, Clone)]
77-
struct ArrayMerger {
77+
struct ConcatArrayMerger {
7878
v: Vec<Value>,
7979
}
8080

81-
impl ArrayMerger {
81+
impl ConcatArrayMerger {
8282
fn new(v: Vec<Value>) -> Self {
8383
Self { v }
8484
}
8585
}
8686

87+
impl ReduceValueMerger for ConcatArrayMerger {
88+
fn add(&mut self, v: Value) -> Result<(), String> {
89+
if let Value::Array(a) = v {
90+
self.v.extend_from_slice(&a);
91+
} else {
92+
self.v.push(v);
93+
}
94+
Ok(())
95+
}
96+
97+
fn insert_into(self: Box<Self>, k: String, v: &mut LogEvent) -> Result<(), String> {
98+
v.insert(k, Value::Array(self.v));
99+
Ok(())
100+
}
101+
}
102+
103+
//------------------------------------------------------------------------------
104+
105+
#[derive(Debug, Clone)]
106+
struct ArrayMerger {
107+
v: Vec<Value>,
108+
}
109+
110+
impl ArrayMerger {
111+
fn new(v: Value) -> Self {
112+
Self { v: vec![v] }
113+
}
114+
}
115+
87116
impl ReduceValueMerger for ArrayMerger {
88117
fn add(&mut self, v: Value) -> Result<(), String> {
89118
self.v.push(v);
@@ -370,15 +399,13 @@ pub fn get_value_merger(v: Value, m: &MergeStrategy) -> Result<Box<dyn ReduceVal
370399
},
371400
MergeStrategy::Concat => match v {
372401
Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b))),
402+
Value::Array(a) => Ok(Box::new(ConcatArrayMerger::new(a))),
373403
_ => Err(format!(
374-
"expected string value, found: '{}'",
404+
"expected string or array value, found: '{}'",
375405
v.to_string_lossy()
376406
)),
377407
},
378-
MergeStrategy::Array => match v {
379-
Value::Array(a) => Ok(Box::new(ArrayMerger::new(a))),
380-
_ => Ok(Box::new(ArrayMerger::new(vec![v]))),
381-
},
408+
MergeStrategy::Array => Ok(Box::new(ArrayMerger::new(v))),
382409
MergeStrategy::Discard => Ok(Box::new(DiscardMerger::new(v))),
383410
}
384411
}
@@ -432,7 +459,7 @@ mod test {
432459
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Max).is_err());
433460
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Min).is_err());
434461
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Array).is_ok());
435-
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_err());
462+
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_ok());
436463

437464
assert!(get_value_merger(json!({}).into(), &MergeStrategy::Discard).is_ok());
438465
assert!(get_value_merger(json!({}).into(), &MergeStrategy::Sum).is_err());
@@ -512,6 +539,15 @@ mod test {
512539
merge(4.3.into(), 4.2.into(), &MergeStrategy::Min),
513540
Ok(4.2.into())
514541
);
542+
543+
assert_eq!(
544+
merge(json!([4]).into(), json!([2]).into(), &MergeStrategy::Concat),
545+
Ok(json!([4, 2]).into())
546+
);
547+
assert_eq!(
548+
merge(json!([]).into(), 42.into(), &MergeStrategy::Concat),
549+
Ok(json!([42]).into())
550+
);
515551
}
516552

517553
fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {

src/transforms/reduce/mod.rs

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,33 +77,32 @@ impl ReduceState {
7777
fn new(e: LogEvent, strategies: &IndexMap<String, MergeStrategy>) -> Self {
7878
Self {
7979
stale_since: Instant::now(),
80-
// TODO: all_fields alternative that consumes
8180
fields: e
82-
.all_fields()
81+
.into_iter()
8382
.filter_map(|(k, v)| {
8483
if let Some(strat) = strategies.get(&k) {
85-
match get_value_merger(v.clone(), strat) {
84+
match get_value_merger(v, strat) {
8685
Ok(m) => Some((k, m)),
8786
Err(err) => {
8887
warn!("failed to create merger for field '{}': {}", k, err);
8988
None
9089
}
9190
}
9291
} else {
93-
Some((k, v.clone().into()))
92+
Some((k, v.into()))
9493
}
9594
})
9695
.collect(),
9796
}
9897
}
9998

10099
fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<String, MergeStrategy>) {
101-
for (k, v) in e.all_fields() {
100+
for (k, v) in e.into_iter() {
102101
let strategy = strategies.get(&k);
103102
match self.fields.entry(k) {
104103
hash_map::Entry::Vacant(entry) => {
105104
if let Some(strat) = strategy {
106-
match get_value_merger(v.clone(), strat) {
105+
match get_value_merger(v, strat) {
107106
Ok(m) => {
108107
entry.insert(m);
109108
}
@@ -291,6 +290,7 @@ mod test {
291290
topology::config::{TransformConfig, TransformContext},
292291
Event,
293292
};
293+
use serde_json::json;
294294

295295
#[test]
296296
fn reduce_from_condition() {
@@ -493,4 +493,86 @@ identifier_fields = [ "request_id" ]
493493
Value::from(7)
494494
);
495495
}
496+
497+
#[test]
498+
fn arrays() {
499+
let mut reduce = toml::from_str::<ReduceConfig>(
500+
r#"
501+
identifier_fields = [ "request_id" ]
502+
503+
merge_strategies.foo = "array"
504+
merge_strategies.bar = "concat"
505+
506+
[ends_when]
507+
"test_end.exists" = true
508+
"#,
509+
)
510+
.unwrap()
511+
.build(TransformContext::new_test())
512+
.unwrap();
513+
514+
let mut outputs = Vec::new();
515+
516+
let mut e = Event::from("test message 1");
517+
e.as_mut_log().insert("foo", json!([1, 3]));
518+
e.as_mut_log().insert("bar", json!([1, 3]));
519+
e.as_mut_log().insert("request_id", "1");
520+
reduce.transform_into(&mut outputs, e);
521+
522+
let mut e = Event::from("test message 2");
523+
e.as_mut_log().insert("foo", json!([2, 4]));
524+
e.as_mut_log().insert("bar", json!([2, 4]));
525+
e.as_mut_log().insert("request_id", "2");
526+
reduce.transform_into(&mut outputs, e);
527+
528+
let mut e = Event::from("test message 3");
529+
e.as_mut_log().insert("foo", json!([5, 7]));
530+
e.as_mut_log().insert("bar", json!([5, 7]));
531+
e.as_mut_log().insert("request_id", "1");
532+
reduce.transform_into(&mut outputs, e);
533+
534+
let mut e = Event::from("test message 4");
535+
e.as_mut_log().insert("foo", json!("done"));
536+
e.as_mut_log().insert("bar", json!("done"));
537+
e.as_mut_log().insert("request_id", "1");
538+
e.as_mut_log().insert("test_end", "yep");
539+
reduce.transform_into(&mut outputs, e);
540+
541+
assert_eq!(outputs.len(), 1);
542+
assert_eq!(
543+
outputs.first().unwrap().as_log()[&"foo".into()],
544+
json!([[1, 3], [5, 7], "done"]).into()
545+
);
546+
547+
assert_eq!(outputs.len(), 1);
548+
assert_eq!(
549+
outputs.first().unwrap().as_log()[&"bar".into()],
550+
json!([1, 3, 5, 7, "done"]).into()
551+
);
552+
553+
outputs.clear();
554+
555+
let mut e = Event::from("test message 5");
556+
e.as_mut_log().insert("foo", json!([6, 8]));
557+
e.as_mut_log().insert("bar", json!([6, 8]));
558+
e.as_mut_log().insert("request_id", "2");
559+
reduce.transform_into(&mut outputs, e);
560+
561+
let mut e = Event::from("test message 6");
562+
e.as_mut_log().insert("foo", json!("done"));
563+
e.as_mut_log().insert("bar", json!("done"));
564+
e.as_mut_log().insert("request_id", "2");
565+
e.as_mut_log().insert("test_end", "yep");
566+
reduce.transform_into(&mut outputs, e);
567+
568+
assert_eq!(outputs.len(), 1);
569+
assert_eq!(
570+
outputs.first().unwrap().as_log()[&"foo".into()],
571+
json!([[2, 4], [6, 8], "done"]).into()
572+
);
573+
assert_eq!(
574+
outputs.first().unwrap().as_log()[&"bar".into()],
575+
json!([2, 4, 6, 8, "done"]).into()
576+
);
577+
}
496578
}

0 commit comments

Comments
 (0)