Skip to content

Commit

Permalink
[FLINK-37130][table] Minor optimization of async state api usage in w…
Browse files Browse the repository at this point in the history
…indow join operator
  • Loading branch information
Zakelly committed Jan 15, 2025
1 parent 2f2018f commit d0165de
Showing 1 changed file with 8 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* A {@link AsyncStateWindowJoinOperator} implemented by async state api.
Expand Down Expand Up @@ -196,7 +193,7 @@ public void onProcessingTime(InternalTimer<RowData, Long> timer) throws Exceptio

@Override
public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
asyncProcessWithKey(timer.getKey(), () -> triggerJoin(timer.getNamespace()));
triggerJoin(timer.getNamespace());
}

/**
Expand All @@ -209,46 +206,13 @@ public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
private void triggerJoin(long window) {
StateFuture<StateIterator<RowData>> leftDataFuture = leftWindowState.asyncGet(window);
StateFuture<StateIterator<RowData>> rightDataFuture = rightWindowState.asyncGet(window);

// join left records and right records
AtomicReference<List<RowData>> leftDataRef = new AtomicReference<>();
AtomicReference<List<RowData>> rightDataRef = new AtomicReference<>();
leftDataFuture.thenCombine(
rightDataFuture,
(leftDataIterator, rightDataIterator) -> {
StateFuture<Void> leftLoadToMemFuture;
if (leftDataIterator == null) {
leftDataRef.set(null);
leftLoadToMemFuture = StateFutureUtils.completedVoidFuture();
} else {
leftDataRef.set(new ArrayList<>());
leftLoadToMemFuture =
leftDataIterator.onNext(
data -> {
leftDataRef.get().add(data);
});
}

StateFuture<Void> rightLoadToMemFuture;
if (rightDataIterator == null) {
rightDataRef.set(null);
rightLoadToMemFuture = StateFutureUtils.completedVoidFuture();
} else {
rightDataRef.set(new ArrayList<>());
rightLoadToMemFuture =
rightDataIterator.onNext(
data -> {
rightDataRef.get().add(data);
});
}

return leftLoadToMemFuture.thenCombine(
rightLoadToMemFuture,
(VOID1, VOID2) -> {
helper.joinAndClear(window, leftDataRef.get(), rightDataRef.get());
return null;
});
});
StateFutureUtils.toIterable(leftDataFuture)
.thenCombine(
StateFutureUtils.toIterable(rightDataFuture),
(leftDataIterator, rightDataIterator) -> {
helper.joinAndClear(window, leftDataIterator, rightDataIterator);
return null;
});
}

private class AsyncStateWindowJoinHelper extends WindowJoinHelper {
Expand Down

0 comments on commit d0165de

Please sign in to comment.