From d0165dee9e0281c53221f433ddbc42ff18c6159e Mon Sep 17 00:00:00 2001 From: Zakelly Date: Wed, 15 Jan 2025 11:50:00 +0800 Subject: [PATCH] [FLINK-37130][table] Minor optimization of async state api usage in window join operator --- .../AsyncStateWindowJoinOperator.java | 52 +++---------------- 1 file changed, 8 insertions(+), 44 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java index b0c5e037fb98d..5a5ee45b408d5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java @@ -49,9 +49,6 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; /** * A {@link AsyncStateWindowJoinOperator} implemented by async state api. @@ -196,7 +193,7 @@ public void onProcessingTime(InternalTimer timer) throws Exceptio @Override public void onEventTime(InternalTimer timer) throws Exception { - asyncProcessWithKey(timer.getKey(), () -> triggerJoin(timer.getNamespace())); + triggerJoin(timer.getNamespace()); } /** @@ -209,46 +206,13 @@ public void onEventTime(InternalTimer timer) throws Exception { private void triggerJoin(long window) { StateFuture> leftDataFuture = leftWindowState.asyncGet(window); StateFuture> rightDataFuture = rightWindowState.asyncGet(window); - - // join left records and right records - AtomicReference> leftDataRef = new AtomicReference<>(); - AtomicReference> rightDataRef = new AtomicReference<>(); - leftDataFuture.thenCombine( - rightDataFuture, - (leftDataIterator, rightDataIterator) -> { - StateFuture leftLoadToMemFuture; - if (leftDataIterator == null) { - leftDataRef.set(null); - leftLoadToMemFuture = StateFutureUtils.completedVoidFuture(); - } else { - leftDataRef.set(new ArrayList<>()); - leftLoadToMemFuture = - leftDataIterator.onNext( - data -> { - leftDataRef.get().add(data); - }); - } - - StateFuture rightLoadToMemFuture; - if (rightDataIterator == null) { - rightDataRef.set(null); - rightLoadToMemFuture = StateFutureUtils.completedVoidFuture(); - } else { - rightDataRef.set(new ArrayList<>()); - rightLoadToMemFuture = - rightDataIterator.onNext( - data -> { - rightDataRef.get().add(data); - }); - } - - return leftLoadToMemFuture.thenCombine( - rightLoadToMemFuture, - (VOID1, VOID2) -> { - helper.joinAndClear(window, leftDataRef.get(), rightDataRef.get()); - return null; - }); - }); + StateFutureUtils.toIterable(leftDataFuture) + .thenCombine( + StateFutureUtils.toIterable(rightDataFuture), + (leftDataIterator, rightDataIterator) -> { + helper.joinAndClear(window, leftDataIterator, rightDataIterator); + return null; + }); } private class AsyncStateWindowJoinHelper extends WindowJoinHelper {