1
1
/*
2
- * Copyright 2018-2022 the original author or authors.
2
+ * Copyright 2018-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
17
17
package org .springframework .kafka .support ;
18
18
19
19
import java .nio .ByteBuffer ;
20
+ import java .util .Collection ;
20
21
import java .util .HashSet ;
21
22
import java .util .Map ;
22
23
import java .util .Set ;
24
+ import java .util .stream .Collectors ;
23
25
26
+ import org .apache .kafka .common .header .Header ;
24
27
import org .apache .kafka .common .header .Headers ;
25
28
import org .apache .kafka .common .header .internals .RecordHeader ;
29
+ import org .assertj .core .util .Streams ;
26
30
27
31
import org .springframework .messaging .MessageHeaders ;
28
32
36
40
*
37
41
* @author Gary Russell
38
42
* @since 2.1.3
39
- *
40
43
*/
41
44
public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
42
45
@@ -69,6 +72,7 @@ public SimpleKafkaHeaderMapper() {
69
72
* generally should not map the {@code "id" and "timestamp"} headers. Note:
70
73
* most of the headers in {@link KafkaHeaders} are never mapped as headers since they
71
74
* represent data in consumer/producer records.
75
+ *
72
76
* @param patterns the patterns.
73
77
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
74
78
*/
@@ -82,6 +86,7 @@ private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {
82
86
83
87
/**
84
88
* Create an instance for inbound mapping only with pattern matching.
89
+ *
85
90
* @param patterns the patterns to match.
86
91
* @return the header mapper.
87
92
* @since 2.8.8
@@ -94,27 +99,40 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte
94
99
public void fromHeaders (MessageHeaders headers , Headers target ) {
95
100
headers .forEach ((key , value ) -> {
96
101
if (!NEVER .contains (key )) {
97
- Object valueToAdd = headerValueToAddOut (key , value );
98
- if (valueToAdd instanceof byte [] && matches (key , valueToAdd )) {
99
- target .add (new RecordHeader (key , (byte []) valueToAdd ));
102
+ if (value instanceof Collection <?> values ) {
103
+ values .forEach (v -> mapIfMatched (target , key , v ));
104
+ } else {
105
+ mapIfMatched (target , key , value );
100
106
}
101
107
}
102
108
});
103
109
}
104
110
111
+ private void mapIfMatched (Headers target , String key , Object value ) {
112
+ Object valueToAdd = headerValueToAddOut (key , value );
113
+ if (valueToAdd instanceof byte [] && matches (key , valueToAdd )) {
114
+ target .add (new RecordHeader (key , (byte []) valueToAdd ));
115
+ }
116
+ }
117
+
105
118
@ Override
106
119
public void toHeaders (Headers source , Map <String , Object > target ) {
107
- source .forEach (header -> {
108
- String headerName = header .key ();
109
- if (matchesForInbound (headerName )) {
110
- if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT )) {
111
- target .put (headerName , ByteBuffer .wrap (header .value ()).getInt ());
112
- }
113
- else {
114
- target .put (headerName , headerValueToAddIn (header ));
115
- }
116
- }
117
- });
120
+ Streams .stream (source )
121
+ .collect (Collectors .groupingBy (Header ::key ))
122
+ .forEach ((headerName , headers ) -> {
123
+ if (matchesForInbound (headerName )) {
124
+ if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT )) {
125
+ target .put (headerName , ByteBuffer .wrap (headers .get (headers .size () - 1 ).value ()).getInt ());
126
+ } else {
127
+ var values = headers .stream ().map (super ::headerValueToAddIn ).toList ();
128
+ if (values .size () == 1 ) {
129
+ target .put (headerName , values .get (0 ));
130
+ } else {
131
+ target .put (headerName , values );
132
+ }
133
+ }
134
+ }
135
+ });
118
136
}
119
137
120
138
}
0 commit comments