From 8c32a10d82d1217a4f4bcc953f457cfcf9f026a0 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:08:52 +0800 Subject: [PATCH] [core] Fix ArrayIndexOutOfBoundsException when updating nested fields with null values. (#4156) --- .../aggregate/FieldNestedUpdateAgg.java | 39 ++++++++++++------ .../paimon/flink/PreAggregationITCase.java | 40 +++++++++++++++++-- 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 5e0149345eea..affc79417d9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -71,20 +71,19 @@ String name() { @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null || inputField == null) { - return accumulator == null ? inputField : accumulator; + if (accumulator == null) { + return inputField; + } + if (inputField == null) { + return accumulator; } InternalArray acc = (InternalArray) accumulator; InternalArray input = (InternalArray) inputField; - List rows = new ArrayList<>(); - for (int i = 0; i < acc.size(); i++) { - rows.add(acc.getRow(i, nestedFields)); - } - for (int i = 0; i < input.size(); i++) { - rows.add(input.getRow(i, nestedFields)); - } + List rows = new ArrayList<>(acc.size() + input.size()); + addNonNullRows(acc, rows); + addNonNullRows(input, rows); if (keyProjection != null) { Map map = new HashMap<>(); @@ -111,10 +110,11 @@ public Object retract(Object accumulator, Object retractField) { if (keyProjection == null) { checkNotNull(elementEqualiser); List rows = new ArrayList<>(); - for (int i = 0; i < acc.size(); i++) { - rows.add(acc.getRow(i, nestedFields)); - } + addNonNullRows(acc, rows); for (int i = 0; i < retract.size(); i++) { + if (retract.isNullAt(i)) { + continue; + } InternalRow retractRow = retract.getRow(i, nestedFields); rows.removeIf(next -> elementEqualiser.equals(next, retractRow)); } @@ -123,15 +123,30 @@ public Object retract(Object accumulator, Object retractField) { Map map = new HashMap<>(); for (int i = 0; i < acc.size(); i++) { + if (acc.isNullAt(i)) { + continue; + } InternalRow row = acc.getRow(i, nestedFields); map.put(keyProjection.apply(row).copy(), row); } for (int i = 0; i < retract.size(); i++) { + if (retract.isNullAt(i)) { + continue; + } map.remove(keyProjection.apply(retract.getRow(i, nestedFields))); } return new GenericArray(new ArrayList<>(map.values()).toArray()); } } + + private void addNonNullRows(InternalArray array, List rows) { + for (int i = 0; i < array.size(); i++) { + if (array.isNullAt(i)) { + continue; + } + rows.add(array.getRow(i, nestedFields)); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index e61cc65e990d..e4c90695b1b2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1321,6 +1322,39 @@ public void testUseCase() { Row.of(3, "Liu", "NanJing", 1, "12-26", "Cup", 30L)); } + @Test + public void testUseCaseWithNullValue() { + sql( + "INSERT INTO order_wide\n" + + "SELECT 6, CAST (NULL AS STRING), CAST (NULL AS STRING), " + + "ARRAY[cast(null as ROW)]"); + + List result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + + assertThat(checkOneRecord(result.get(0), 6, null, null, (Row) null)).isTrue(); + + sql( + "INSERT INTO order_wide\n" + + "SELECT 6, 'Sun', CAST (NULL AS STRING), " + + "ARRAY[ROW(1, '01-01','Apple', 6999)]"); + + result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + assertThat( + checkOneRecord( + result.get(0), + 6, + "Sun", + null, + Row.of(1, "01-01", "Apple", 6999L))) + .isTrue(); + } + @Test public void testUseCaseAppend() { sql( @@ -1429,10 +1463,10 @@ private boolean checkOneRecord( if ((int) record.getField(0) != orderId) { return false; } - if (!record.getFieldAs(1).equals(userName)) { + if (!Objects.equals(record.getFieldAs(1), userName)) { return false; } - if (!record.getFieldAs(2).equals(address)) { + if (!Objects.equals(record.getFieldAs(2), address)) { return false; } @@ -1455,7 +1489,7 @@ private boolean checkNestedTable(Row[] nestedTable, Row... subOrders) { Arrays.stream(subOrders).sorted(comparator).collect(Collectors.toList()); for (int i = 0; i < sortedActual.size(); i++) { - if (!sortedActual.get(i).equals(sortedExpected.get(i))) { + if (!Objects.equals(sortedActual.get(i), sortedExpected.get(i))) { return false; } }