Skip to content

Commit 109f399

Browse files
authored
Add Schema.JSON, Schema.STRING and Schema.BYTES support to merge-key-value (#94)
* Add Schema.JSON support to merge-key-value
* Add support for STRING and BYTES to merge-key-value
1 parent 47a8338 commit 109f399

File tree

3 files changed

+144
-48
lines changed

3 files changed

+144
-48
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ Output: `{name: value1} (AVRO)`
128128

129129
### Merge KeyValue
130130

131-
Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. (Currently only AVRO is supported).
131+
Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. (Currently only AVRO and JSON are supported).
132132

133133
Step name: `merge-key-value`
134134

pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/MergeKeyValueStep.java

Lines changed: 66 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package com.datastax.oss.pulsar.functions.transforms;
1717

18+
import com.fasterxml.jackson.core.type.TypeReference;
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.fasterxml.jackson.databind.node.ObjectNode;
1821
import java.util.List;
1922
import java.util.Map;
2023
import java.util.concurrent.ConcurrentHashMap;
@@ -25,7 +28,7 @@
2528
import org.apache.pulsar.common.schema.SchemaType;
2629

2730
public class MergeKeyValueStep implements TransformStep {
28-
31+
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
2932
private final Map<org.apache.avro.Schema, Map<org.apache.avro.Schema, org.apache.avro.Schema>>
3033
schemaCache = new ConcurrentHashMap<>();
3134

@@ -35,61 +38,77 @@ public void process(TransformContext transformContext) {
3538
if (keySchema == null) {
3639
return;
3740
}
38-
if (keySchema.getSchemaInfo().getType() == SchemaType.AVRO
41+
Object keyObject = transformContext.getKeyObject();
42+
Object valueObject = transformContext.getValueObject();
43+
if (keyObject instanceof Map && valueObject instanceof Map) {
44+
Map<Object, Object> value = (Map<Object, Object>) valueObject;
45+
Map<String, Object> keyCopy =
46+
OBJECT_MAPPER.convertValue(keyObject, new TypeReference<>() {});
47+
keyCopy.forEach(value::putIfAbsent);
48+
} else if (keySchema.getSchemaInfo().getType() == SchemaType.AVRO
3949
&& transformContext.getValueSchema().getSchemaInfo().getType() == SchemaType.AVRO) {
40-
GenericRecord avroKeyRecord = (GenericRecord) transformContext.getKeyObject();
50+
GenericRecord avroKeyRecord = (GenericRecord) keyObject;
4151
org.apache.avro.Schema avroKeySchema = avroKeyRecord.getSchema();
4252

43-
GenericRecord avroValueRecord = (GenericRecord) transformContext.getValueObject();
53+
GenericRecord avroValueRecord = (GenericRecord) valueObject;
4454
org.apache.avro.Schema avroValueSchema = avroValueRecord.getSchema();
4555

46-
List<String> valueSchemaFieldNames =
47-
avroValueSchema
48-
.getFields()
49-
.stream()
50-
.map(org.apache.avro.Schema.Field::name)
51-
.collect(Collectors.toList());
52-
List<org.apache.avro.Schema.Field> fields =
53-
avroKeySchema
54-
.getFields()
55-
.stream()
56-
.filter(field -> !valueSchemaFieldNames.contains(field.name()))
57-
.map(
58-
f ->
59-
new org.apache.avro.Schema.Field(
60-
f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
61-
.collect(Collectors.toList());
62-
fields.addAll(
63-
avroValueSchema
64-
.getFields()
65-
.stream()
66-
.map(
67-
f ->
68-
new org.apache.avro.Schema.Field(
69-
f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
70-
.collect(Collectors.toList()));
71-
72-
Map<org.apache.avro.Schema, org.apache.avro.Schema> schemaCacheKey =
73-
schemaCache.computeIfAbsent(avroKeySchema, s -> new ConcurrentHashMap<>());
74-
org.apache.avro.Schema modified =
75-
schemaCacheKey.computeIfAbsent(
76-
avroValueSchema,
77-
schema ->
78-
org.apache.avro.Schema.createRecord(
79-
avroValueSchema.getName(),
80-
null,
81-
avroValueSchema.getNamespace(),
82-
false,
83-
fields));
84-
GenericRecord newRecord = new GenericData.Record(modified);
85-
for (String fieldName : valueSchemaFieldNames) {
86-
newRecord.put(fieldName, avroValueRecord.get(fieldName));
87-
}
56+
org.apache.avro.Schema mergedSchema = getMergedSchema(avroKeySchema, avroValueSchema);
57+
GenericRecord newRecord = new GenericData.Record(mergedSchema);
58+
avroValueSchema
59+
.getFields()
60+
.forEach(field -> newRecord.put(field.name(), avroValueRecord.get(field.name())));
8861
for (org.apache.avro.Schema.Field field : avroKeySchema.getFields()) {
89-
newRecord.put(field.name(), avroKeyRecord.get(field.name()));
62+
if (avroValueSchema.getField(field.name()) == null) {
63+
newRecord.put(field.name(), avroKeyRecord.get(field.name()));
64+
}
9065
}
9166
transformContext.setValueObject(newRecord);
9267
transformContext.setValueModified(true);
68+
} else if (keySchema.getSchemaInfo().getType() == SchemaType.JSON
69+
&& transformContext.getValueSchema().getSchemaInfo().getType() == SchemaType.JSON) {
70+
org.apache.avro.Schema avroKeySchema =
71+
(org.apache.avro.Schema) keySchema.getNativeSchema().orElseThrow();
72+
org.apache.avro.Schema avroValueSchema =
73+
(org.apache.avro.Schema)
74+
transformContext.getValueSchema().getNativeSchema().orElseThrow();
75+
org.apache.avro.Schema mergedSchema = getMergedSchema(avroKeySchema, avroValueSchema);
76+
transformContext.setValueSchema(new JsonNodeSchema(mergedSchema));
77+
ObjectNode newValue = ((ObjectNode) keyObject).deepCopy();
78+
newValue.setAll(((ObjectNode) valueObject).deepCopy());
79+
transformContext.setValueObject(newValue);
80+
transformContext.setValueModified(true);
9381
}
9482
}
83+
84+
private org.apache.avro.Schema getMergedSchema(
85+
org.apache.avro.Schema avroKeySchema, org.apache.avro.Schema avroValueSchema) {
86+
List<org.apache.avro.Schema.Field> fields =
87+
avroKeySchema
88+
.getFields()
89+
.stream()
90+
.filter(field -> avroValueSchema.getField(field.name()) == null)
91+
.map(
92+
f ->
93+
new org.apache.avro.Schema.Field(
94+
f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
95+
.collect(Collectors.toList());
96+
fields.addAll(
97+
avroValueSchema
98+
.getFields()
99+
.stream()
100+
.map(
101+
f ->
102+
new org.apache.avro.Schema.Field(
103+
f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
104+
.collect(Collectors.toList()));
105+
106+
Map<org.apache.avro.Schema, org.apache.avro.Schema> schemaCacheKey =
107+
schemaCache.computeIfAbsent(avroKeySchema, s -> new ConcurrentHashMap<>());
108+
return schemaCacheKey.computeIfAbsent(
109+
avroValueSchema,
110+
schema ->
111+
org.apache.avro.Schema.createRecord(
112+
avroValueSchema.getName(), null, avroValueSchema.getNamespace(), false, fields));
113+
}
95114
}

pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/MergeKeyValueStepTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.testng.Assert.assertNotSame;
2020
import static org.testng.Assert.assertSame;
2121

22+
import com.fasterxml.jackson.databind.JsonNode;
23+
import java.nio.charset.StandardCharsets;
2224
import org.apache.avro.generic.GenericData;
2325
import org.apache.pulsar.client.api.Schema;
2426
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -52,6 +54,81 @@ void testKeyValueAvro() throws Exception {
5254
assertSame(messageValue.getKey(), recordValue.getKey());
5355
}
5456

57+
@Test
58+
void testKeyValueJson() throws Exception {
59+
Record<GenericObject> record = Utils.createTestJsonKeyValueRecord();
60+
Record<?> outputRecord = Utils.process(record, new MergeKeyValueStep());
61+
KeyValueSchema<?, ?> messageSchema = (KeyValueSchema<?, ?>) outputRecord.getSchema();
62+
KeyValue<?, ?> messageValue = (KeyValue<?, ?>) outputRecord.getValue();
63+
64+
JsonNode read = (JsonNode) messageValue.getValue();
65+
assertEquals(read.get("keyField1").asText(), "key1");
66+
assertEquals(read.get("keyField2").asText(), "key2");
67+
assertEquals(read.get("keyField3").asText(), "key3");
68+
assertEquals(read.get("valueField1").asText(), "value1");
69+
assertEquals(read.get("valueField2").asText(), "value2");
70+
assertEquals(read.get("valueField3").asText(), "value3");
71+
72+
KeyValueSchema<?, ?> recordSchema = (KeyValueSchema) record.getSchema();
73+
KeyValue<?, ?> recordValue = (KeyValue<?, ?>) record.getValue().getNativeObject();
74+
assertSame(messageSchema.getKeySchema(), recordSchema.getKeySchema());
75+
assertSame(messageValue.getKey(), recordValue.getKey());
76+
}
77+
78+
@Test
79+
void testKeyValueStringJson() throws Exception {
80+
Schema<KeyValue<String, String>> keyValueSchema =
81+
Schema.KeyValue(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED);
82+
83+
String key = "{\"keyField1\": \"key1\", \"keyField2\": \"key2\", \"keyField3\": \"key3\"}";
84+
String value =
85+
"{\"valueField1\": \"value1\", \"valueField2\": \"value2\", \"valueField3\": \"value3\"}";
86+
87+
KeyValue<String, String> keyValue = new KeyValue<>(key, value);
88+
89+
Record<GenericObject> record =
90+
new Utils.TestRecord<>(
91+
keyValueSchema,
92+
AutoConsumeSchema.wrapPrimitiveObject(keyValue, SchemaType.KEY_VALUE, new byte[] {}),
93+
null);
94+
95+
Record<?> outputRecord = Utils.process(record, new MergeKeyValueStep());
96+
KeyValue<?, ?> messageValue = (KeyValue<?, ?>) outputRecord.getValue();
97+
98+
assertEquals(
99+
messageValue.getValue(),
100+
"{\"valueField1\":\"value1\",\"valueField2\":\"value2\","
101+
+ "\"valueField3\":\"value3\",\"keyField1\":\"key1\",\"keyField2\":\"key2\",\"keyField3\":\"key3\"}");
102+
}
103+
104+
@Test
105+
void testKeyValueBytesJson() throws Exception {
106+
Schema<KeyValue<byte[], byte[]>> keyValueSchema =
107+
Schema.KeyValue(Schema.BYTES, Schema.BYTES, KeyValueEncodingType.SEPARATED);
108+
109+
String key = "{\"keyField1\": \"key1\", \"keyField2\": \"key2\", \"keyField3\": \"key3\"}";
110+
String value =
111+
"{\"valueField1\": \"value1\", \"valueField2\": \"value2\", \"valueField3\": \"value3\"}";
112+
113+
KeyValue<byte[], byte[]> keyValue =
114+
new KeyValue<>(
115+
key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
116+
117+
Record<GenericObject> record =
118+
new Utils.TestRecord<>(
119+
keyValueSchema,
120+
AutoConsumeSchema.wrapPrimitiveObject(keyValue, SchemaType.KEY_VALUE, new byte[] {}),
121+
null);
122+
123+
Record<?> outputRecord = Utils.process(record, new MergeKeyValueStep());
124+
KeyValue<?, ?> messageValue = (KeyValue<?, ?>) outputRecord.getValue();
125+
126+
assertEquals(
127+
new String((byte[]) messageValue.getValue(), StandardCharsets.UTF_8),
128+
"{\"valueField1\":\"value1\",\"valueField2\":\"value2\",\"valueField3\":\"value3\","
129+
+ "\"keyField1\":\"key1\",\"keyField2\":\"key2\",\"keyField3\":\"key3\"}");
130+
}
131+
55132
@Test
56133
void testPrimitive() throws Exception {
57134
Record<GenericObject> record =

0 commit comments

Comments
 (0)