diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncRefreshLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncRefreshLookupTable.java
new file mode 100644
index 000000000000..241bf223a6eb
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/AsyncRefreshLookupTable.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
+
+/** A {@link LookupTable} which supports async refresh. */
+public abstract class AsyncRefreshLookupTable implements LookupTable {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncRefreshLookupTable.class);
+    private final FileStoreTable table;
+
+    private final int maxPendingSnapshotCount;
+
+    @Nullable private final ExecutorService refreshExecutor;
+
+    private final AtomicReference<Exception> cachedException;
+
+    private Future<?> refreshFuture;
+
+    protected final Object lock;
+    protected final boolean refreshAsync;
+
+    public AsyncRefreshLookupTable(FileStoreTable table) {
+        Options options = Options.fromMap(table.options());
+        this.table = table;
+        this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
+        this.refreshExecutor =
+                this.refreshAsync
+                        ? Executors.newSingleThreadExecutor(
+                                new ExecutorThreadFactory(
+                                        String.format(
+                                                "%s-lookup-refresh",
+                                                Thread.currentThread().getName())))
+                        : null;
+        this.lock = this.refreshAsync ? new Object() : null;
+        this.cachedException = new AtomicReference<>();
+        this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
+    }
+
+    @Override
+    public List<InternalRow> get(InternalRow key) throws IOException {
+        List<InternalRow> values;
+        if (refreshAsync) {
+            synchronized (lock) {
+                values = doGet(key);
+            }
+        } else {
+            values = doGet(key);
+        }
+
+        return values;
+    }
+
+    public abstract List<InternalRow> doGet(InternalRow key) throws IOException;
+
+    @Override
+    public void refresh() throws Exception {
+        if (refreshExecutor == null) {
+            doRefresh();
+            return;
+        }
+
+        Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+        Long nextSnapshotId = nextSnapshotId();
+        if (latestSnapshotId != null
+                && nextSnapshotId != null
+                && latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
+            LOG.warn(
+                    "The latest snapshot id {} is much greater than the next snapshot id {} for {}, "
+                            + "you may need to increase the parallelism of lookup operator.",
+                    latestSnapshotId,
+                    nextSnapshotId,
+                    maxPendingSnapshotCount);
+            sync();
+            doRefresh();
+        } else {
+            try {
+                refreshFuture =
+                        refreshExecutor.submit(
+                                () -> {
+                                    try {
+                                        doRefresh();
+                                    } catch (Exception e) {
+                                        LOG.error(
+                                                "Refresh lookup table {} failed", table.name(), e);
+                                        cachedException.set(e);
+                                    }
+                                });
+            } catch (RejectedExecutionException e) {
+                LOG.warn("Add refresh task for lookup table {} failed", table.name(), e);
+            }
+        }
+    }
+
+    /** Wait until the previous refresh task is finished. */
+    public void sync() throws Exception {
+        if (refreshFuture != null) {
+            this.refreshFuture.get();
+        }
+    }
+
+    public abstract void doRefresh() throws Exception;
+
+    public abstract Long nextSnapshotId();
+
+    @Override
+    public void close() throws IOException {
+        if (refreshExecutor != null) {
+            ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 28b0da0d150c..7c59a3692134 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -34,7 +34,6 @@
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.ExecutorUtils;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -57,8 +56,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -66,7 +63,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
 
 /** Lookup table of full cache. */
-public abstract class FullCacheLookupTable implements LookupTable {
+public abstract class FullCacheLookupTable extends AsyncRefreshLookupTable {
 
     private static final Logger LOG = LoggerFactory.getLogger(FullCacheLookupTable.class);
 
     protected final Object lock = new Object();
@@ -87,6 +84,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
     private Predicate specificPartition;
 
     public FullCacheLookupTable(Context context) {
+        super(context.table);
         this.table = context.table;
         List<String> sequenceFields = new ArrayList<>();
         if (table.primaryKeys().size() > 0) {
@@ -188,54 +186,16 @@ protected void bootstrap() throws Exception {
     }
 
     @Override
-    public void refresh() throws Exception {
-        if (refreshExecutor == null) {
-            doRefresh();
-            return;
+    public Long nextSnapshotId() {
+        if (reader != null) {
+            return reader.nextSnapshotId();
         }
 
-        Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
-        Long nextSnapshotId = reader.nextSnapshotId();
-        if (latestSnapshotId != null
-                && nextSnapshotId != null
-                && latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
-            LOG.warn(
-                    "The latest snapshot id {} is much greater than the next snapshot id {} for {}}, "
-                            + "you may need to increase the parallelism of lookup operator.",
-                    latestSnapshotId,
-                    nextSnapshotId,
-                    maxPendingSnapshotCount);
-            if (refreshFuture != null) {
-                // Wait the previous refresh task to be finished.
-                refreshFuture.get();
-            }
-            doRefresh();
-        } else {
-            Future<?> currentFuture = null;
-            try {
-                currentFuture =
-                        refreshExecutor.submit(
-                                () -> {
-                                    try {
-                                        doRefresh();
-                                    } catch (Exception e) {
-                                        LOG.error(
-                                                "Refresh lookup table {} failed",
-                                                context.table.name(),
-                                                e);
-                                        cachedException.set(e);
-                                    }
-                                });
-            } catch (RejectedExecutionException e) {
-                LOG.warn("Add refresh task for lookup table {} failed", context.table.name(), e);
-            }
-            if (currentFuture != null) {
-                refreshFuture = currentFuture;
-            }
-        }
+        return null;
     }
 
-    private void doRefresh() throws Exception {
+    @Override
+    public void doRefresh() throws Exception {
         while (true) {
             try (RecordReaderIterator<InternalRow> batch =
                     new RecordReaderIterator<>(reader.nextBatch(false))) {
@@ -249,17 +209,7 @@ private void doRefresh() throws Exception {
 
     @Override
     public final List<InternalRow> get(InternalRow key) throws IOException {
-        List<InternalRow> values;
-        if (refreshAsync) {
-            synchronized (lock) {
-                values = innerGet(key);
-            }
-        } else {
-            values = innerGet(key);
-        }
-        if (appendUdsFieldNumber == 0) {
-            return values;
-        }
+        List<InternalRow> values = super.get(key);
 
         List<InternalRow> dropSequence = new ArrayList<>(values.size());
         for (InternalRow matchedRow : values) {
@@ -269,6 +219,11 @@ public final List<InternalRow> get(InternalRow key) throws IOException {
         return dropSequence;
     }
 
+    @Override
+    public List<InternalRow> doGet(InternalRow key) throws IOException {
+        return innerGet(key);
+    }
+
     public void refresh(Iterator<InternalRow> input) throws IOException {
         Predicate predicate = projectedPredicate();
         while (input.hasNext()) {
@@ -301,9 +256,7 @@ public Predicate projectedPredicate() {
     @Override
     public void close() throws IOException {
         try {
-            if (refreshExecutor != null) {
-                ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
-            }
+            super.close();
         } finally {
             stateFactory.close();
             FileIOUtils.deleteDirectory(context.tempPath);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index bdf0a1b4af77..87697de830c9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -51,7 +51,7 @@ import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;
 
 /** Lookup table for primary key which supports to read the LSM tree directly. */
-public class PrimaryKeyPartialLookupTable implements LookupTable {
+public class PrimaryKeyPartialLookupTable extends AsyncRefreshLookupTable {
 
     private final Function<Predicate, QueryExecutor> executorFactory;
     private final FixedBucketFromPkExtractor extractor;
@@ -65,6 +65,7 @@ private PrimaryKeyPartialLookupTable(
             Function<Predicate, QueryExecutor> executorFactory,
             FileStoreTable table,
             List<String> joinKey) {
+        super(table);
         this.executorFactory = executorFactory;
 
         if (table.bucketMode() != BucketMode.HASH_FIXED) {
@@ -115,7 +116,7 @@ public void open() throws Exception {
     }
 
     @Override
-    public List<InternalRow> get(InternalRow key) throws IOException {
+    public List<InternalRow> doGet(InternalRow key) throws IOException {
         InternalRow adjustedKey = key;
         if (keyRearrange != null) {
             adjustedKey = keyRearrange.replaceRow(adjustedKey);
@@ -138,14 +139,23 @@ public List<InternalRow> get(InternalRow key) throws IOException {
     }
 
     @Override
-    public void refresh() {
+    public void doRefresh() {
         queryExecutor.refresh();
     }
 
+    @Override
+    public Long nextSnapshotId() {
+        return queryExecutor.nextSnapshotId();
+    }
+
     @Override
     public void close() throws IOException {
-        if (queryExecutor != null) {
-            queryExecutor.close();
+        try {
+            super.close();
+        } finally {
+            if (queryExecutor != null) {
+                queryExecutor.close();
+            }
         }
     }
 
@@ -174,10 +184,11 @@ interface QueryExecutor extends Closeable {
         InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
 
         void refresh();
+
+        Long nextSnapshotId();
     }
 
     static class LocalQueryExecutor implements QueryExecutor {
-
         private final LocalTableQuery tableQuery;
 
         private final StreamTableScan scan;
@@ -235,6 +246,11 @@ public void refresh() {
             }
         }
 
+        @Override
+        public Long nextSnapshotId() {
+            return scan.checkpoint();
+        }
+
         @Override
         public void close() throws IOException {
             tableQuery.close();
@@ -258,6 +274,11 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
         @Override
         public void refresh() {}
 
+        @Override
+        public Long nextSnapshotId() {
+            return null;
+        }
+
         @Override
         public void close() throws IOException {
             tableQuery.close();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 14643542e73d..19fecd8cbe6c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -591,6 +591,42 @@ public void testPartialLookupTable() throws Exception {
         assertThat(result).hasSize(0);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testPartialLookupTableRefreshAsync(boolean refreshAsync) throws Exception {
+        FileStoreTable dimTable = createDimTable(refreshAsync);
+        PrimaryKeyPartialLookupTable table =
+                PrimaryKeyPartialLookupTable.createLocalTable(
+                        dimTable,
+                        new int[] {0, 1, 2},
+                        tempDir.toFile(),
+                        ImmutableList.of("pk1", "pk2"),
+                        null);
+        table.open();
+
+        List<InternalRow> result = table.get(row(1, -1));
+        assertThat(result).hasSize(0);
+
+        write(dimTable, ioManager, GenericRow.of(1, -1, 11), GenericRow.of(2, -2, 22));
+        result = table.get(row(1, -1));
+        assertThat(result).hasSize(0);
+
+        table.refresh();
+        table.sync();
+        result = table.get(row(1, -1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, -1, 11);
+        result = table.get(row(2, -2));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 2, -2, 22);
+
+        write(dimTable, ioManager, GenericRow.ofKind(RowKind.DELETE, 1, -1, 11));
+        table.refresh();
+        table.sync();
+        result = table.get(row(1, -1));
+        assertThat(result).hasSize(0);
+    }
+
     @Test
     public void testPartialLookupTableWithProjection() throws Exception {
         FileStoreTable dimTable = createDimTable();
@@ -690,8 +726,8 @@ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception
         table.refresh();
         Set<Integer> batchKeys = new HashSet<>();
         long start = System.currentTimeMillis();
+        table.sync();
         while (batchKeys.size() < 100_000) {
-            Thread.sleep(10);
             for (int i = 1; i <= 100_000; i++) {
                 List<InternalRow> result = table.get(row(i));
                 if (!result.isEmpty()) {
@@ -723,6 +759,10 @@ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception
     }
 
     private FileStoreTable createDimTable() throws Exception {
+        return createDimTable(false);
+    }
+
+    private FileStoreTable createDimTable(boolean refreshAsync) throws Exception {
         FileIO fileIO = LocalFileIO.create();
         org.apache.paimon.fs.Path tablePath =
                 new org.apache.paimon.fs.Path(
@@ -735,6 +775,9 @@ private FileStoreTable createDimTable() throws Exception {
                         .primaryKey("pk1", "pk2")
                         .option(CoreOptions.BUCKET.key(), "2")
                         .option(CoreOptions.BUCKET_KEY.key(), "pk2")
+                        .option(
+                                FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC.key(),
+                                String.valueOf(refreshAsync))
                         .build();
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);