Skip to content

Commit b91eeac

Browse files
shikharewencp
authored andcommitted
KAFKA-4100: Ensure 'fields' and 'fieldsByName' are not null for Struct schemas
Author: Shikhar Bhushan <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes apache#1800 from shikhar/kafka-4100
1 parent 9aaeb33 commit b91eeac

File tree

4 files changed

+26
-7
lines changed

4 files changed

+26
-7
lines changed

connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.math.BigDecimal;
2323
import java.nio.ByteBuffer;
2424
import java.util.Arrays;
25+
import java.util.Collections;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -106,12 +107,13 @@ public ConnectSchema(Type type, boolean optional, Object defaultValue, String na
106107
this.doc = doc;
107108
this.parameters = parameters;
108109

109-
this.fields = fields;
110-
if (this.fields != null && this.type == Type.STRUCT) {
111-
this.fieldsByName = new HashMap<>();
112-
for (Field field : fields)
110+
if (this.type == Type.STRUCT) {
111+
this.fields = fields == null ? Collections.<Field>emptyList() : fields;
112+
this.fieldsByName = new HashMap<>(this.fields.size());
113+
for (Field field : this.fields)
113114
fieldsByName.put(field.name(), field);
114115
} else {
116+
this.fields = null;
115117
this.fieldsByName = null;
116118
}
117119

connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ public class SchemaBuilder implements Schema {
7878

7979
private SchemaBuilder(Type type) {
8080
this.type = type;
81+
if (type == Type.STRUCT) {
82+
fields = new ArrayList<>();
83+
}
8184
}
8285

8386
// Common/metadata fields
@@ -317,8 +320,6 @@ public static SchemaBuilder struct() {
317320
public SchemaBuilder field(String fieldName, Schema fieldSchema) {
318321
if (type != Type.STRUCT)
319322
throw new SchemaBuilderException("Cannot create fields on type " + type);
320-
if (fields == null)
321-
fields = new ArrayList<>();
322323
int fieldIndex = fields.size();
323324
fields.add(new Field(fieldName, fieldIndex, fieldSchema));
324325
return this;

connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -300,4 +300,11 @@ public void testStructEquality() {
300300
assertNotEquals(s1, differentField);
301301
}
302302

303+
@Test
304+
public void testEmptyStruct() {
305+
final ConnectSchema emptyStruct = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null);
306+
assertEquals(0, emptyStruct.fields().size());
307+
new Struct(emptyStruct);
308+
}
309+
303310
}

connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,16 @@ public void testMapBuilderInvalidDefault() {
283283
.defaultValue(defMap).build();
284284
}
285285

286-
286+
@Test
287+
public void testEmptyStruct() {
288+
final SchemaBuilder emptyStructSchemaBuilder = SchemaBuilder.struct();
289+
assertEquals(0, emptyStructSchemaBuilder.fields().size());
290+
new Struct(emptyStructSchemaBuilder);
291+
292+
final Schema emptyStructSchema = emptyStructSchemaBuilder.build();
293+
assertEquals(0, emptyStructSchema.fields().size());
294+
new Struct(emptyStructSchema);
295+
}
287296

288297
private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
289298
assertEquals(type, schema.type());

0 commit comments

Comments
 (0)