From fc9a532bd17d774ba213d2f312de1b238e64ec1c Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 10 Sep 2024 15:49:06 +0800 Subject: [PATCH 1/3] Fix ArrayIndexOutOfBoundsException when updating nested fields with null values. --- .../mergetree/compact/aggregate/FieldNestedUpdateAgg.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 5e0149345eea..3bc5a85279cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -80,9 +80,15 @@ public Object agg(Object accumulator, Object inputField) { List rows = new ArrayList<>(); for (int i = 0; i < acc.size(); i++) { + if (acc.isNullAt(i)) { + continue; + } rows.add(acc.getRow(i, nestedFields)); } for (int i = 0; i < input.size(); i++) { + if (input.isNullAt(i)) { + continue; + } rows.add(input.getRow(i, nestedFields)); } From 7c5a64469e387b945a9741ed63cfed40d69f5efa Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 10 Sep 2024 17:58:45 +0800 Subject: [PATCH 2/3] add test --- .../aggregate/FieldNestedUpdateAgg.java | 45 ++++++++++-------- .../paimon/flink/PreAggregationITCase.java | 46 +++++++++++++++---- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 3bc5a85279cb..affc79417d9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -71,26 +71,19 @@ String name() { @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null || inputField == null) { - return accumulator == null ? inputField : accumulator; + if (accumulator == null) { + return inputField; + } + if (inputField == null) { + return accumulator; } InternalArray acc = (InternalArray) accumulator; InternalArray input = (InternalArray) inputField; - List rows = new ArrayList<>(); - for (int i = 0; i < acc.size(); i++) { - if (acc.isNullAt(i)) { - continue; - } - rows.add(acc.getRow(i, nestedFields)); - } - for (int i = 0; i < input.size(); i++) { - if (input.isNullAt(i)) { - continue; - } - rows.add(input.getRow(i, nestedFields)); - } + List rows = new ArrayList<>(acc.size() + input.size()); + addNonNullRows(acc, rows); + addNonNullRows(input, rows); if (keyProjection != null) { Map map = new HashMap<>(); @@ -117,10 +110,11 @@ public Object retract(Object accumulator, Object retractField) { if (keyProjection == null) { checkNotNull(elementEqualiser); List rows = new ArrayList<>(); - for (int i = 0; i < acc.size(); i++) { - rows.add(acc.getRow(i, nestedFields)); - } + addNonNullRows(acc, rows); for (int i = 0; i < retract.size(); i++) { + if (retract.isNullAt(i)) { + continue; + } InternalRow retractRow = retract.getRow(i, nestedFields); rows.removeIf(next -> elementEqualiser.equals(next, retractRow)); } @@ -129,15 +123,30 @@ public Object retract(Object accumulator, Object retractField) { Map map = new HashMap<>(); for (int i = 0; i < acc.size(); i++) { + if (acc.isNullAt(i)) { + continue; + } InternalRow row = acc.getRow(i, nestedFields); map.put(keyProjection.apply(row).copy(), row); } for (int i = 0; i < retract.size(); i++) { + if (retract.isNullAt(i)) { + continue; + } map.remove(keyProjection.apply(retract.getRow(i, nestedFields))); } return new GenericArray(new ArrayList<>(map.values()).toArray()); } } + + private void addNonNullRows(InternalArray array, List rows) { + for (int i = 0; i < array.size(); i++) { + if (array.isNullAt(i)) { + continue; + } + rows.add(array.getRow(i, nestedFields)); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index e61cc65e990d..3d864f274738 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -41,12 +41,7 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1321,6 +1316,39 @@ public void testUseCase() { Row.of(3, "Liu", "NanJing", 1, "12-26", "Cup", 30L)); } + @Test + public void testUseCaseWithNullValue() { + sql( + "INSERT INTO order_wide\n" + + "SELECT 6, CAST (NULL AS STRING), CAST (NULL AS STRING), " + + "ARRAY[cast(null as ROW)]"); + + List result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + + assertThat(checkOneRecord(result.get(0), 6, null, null, (Row) null)).isTrue(); + + sql( + "INSERT INTO order_wide\n" + + "SELECT 6, 'Sun', CAST (NULL AS STRING), " + + "ARRAY[ROW(1, '01-01','Apple', 6999)]"); + + result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + assertThat( + checkOneRecord( + result.get(0), + 6, + "Sun", + null, + Row.of(1, "01-01", "Apple", 6999L))) + .isTrue(); + } + @Test public void testUseCaseAppend() { sql( @@ -1429,10 +1457,10 @@ private boolean checkOneRecord( if ((int) record.getField(0) != orderId) { return false; } - if (!record.getFieldAs(1).equals(userName)) { + if (!Objects.equals(record.getFieldAs(1), userName)) { return false; } - if (!record.getFieldAs(2).equals(address)) { + if (!Objects.equals(record.getFieldAs(2), address)) { return false; } @@ -1455,7 +1483,7 @@ private boolean checkNestedTable(Row[] nestedTable, Row... subOrders) { Arrays.stream(subOrders).sorted(comparator).collect(Collectors.toList()); for (int i = 0; i < sortedActual.size(); i++) { - if (!sortedActual.get(i).equals(sortedExpected.get(i))) { + if (!Objects.equals(sortedActual.get(i), sortedExpected.get(i))) { return false; } } From 7ad7f523e1fbd8d1b647136948c0ceb3ccb020f3 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Tue, 10 Sep 2024 18:03:51 +0800 Subject: [PATCH 3/3] fix code style --- .../org/apache/paimon/flink/PreAggregationITCase.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index 3d864f274738..e4c90695b1b2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -41,7 +41,13 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.IntStream;