Skip to content

Commit

Permalink
[core] Support async refresh for PrimaryKeyPartialLookupTable
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Sep 25, 2024
1 parent 102e5a9 commit 99df25c
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.ExecutorUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;

/** A {@link LookupTable} supports async refresh. */
public abstract class AsyncRefreshLookupTable implements LookupTable {
private static final Logger LOG = LoggerFactory.getLogger(AsyncRefreshLookupTable.class);
private final FileStoreTable table;

private final int maxPendingSnapshotCount;

@Nullable private final ExecutorService refreshExecutor;

private final AtomicReference<Exception> cachedException;

private Future<?> refreshFuture;

protected final Object lock;
protected final boolean refreshAsync;

public AsyncRefreshLookupTable(FileStoreTable table) {
Options options = Options.fromMap(table.options());
this.table = table;
this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
this.refreshExecutor =
this.refreshAsync
? Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
String.format(
"%s-lookup-refresh",
Thread.currentThread().getName())))
: null;
this.lock = this.refreshAsync ? new Object() : null;
this.cachedException = new AtomicReference<>();
this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
}

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> values;
if (refreshAsync) {
synchronized (lock) {
values = doGet(key);
}
} else {
values = doGet(key);
}

return values;
}

public abstract List<InternalRow> doGet(InternalRow key) throws IOException;

@Override
public void refresh() throws Exception {
if (refreshExecutor == null) {
doRefresh();
return;
}

Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
Long nextSnapshotId = nextSnapshotId();
if (latestSnapshotId != null
&& nextSnapshotId != null
&& latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
LOG.warn(
"The latest snapshot id {} is much greater than the next snapshot id {} for {}}, "
+ "you may need to increase the parallelism of lookup operator.",
latestSnapshotId,
nextSnapshotId,
maxPendingSnapshotCount);
sync();
doRefresh();
} else {
try {
refreshFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
} catch (Exception e) {
LOG.error(
"Refresh lookup table {} failed", table.name(), e);
cachedException.set(e);
}
});
} catch (RejectedExecutionException e) {
LOG.warn("Add refresh task for lookup table {} failed", table.name(), e);
}
}
}

/** Wait until the previous refresh task to be finished. */
public void sync() throws Exception {
if (refreshFuture != null) {
this.refreshFuture.get();
}
}

public abstract void doRefresh() throws Exception;

public abstract Long nextSnapshotId();

@Override
public void close() throws IOException {
if (refreshExecutor != null) {
ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
Expand All @@ -54,19 +52,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;

/** Lookup table of full cache. */
public abstract class FullCacheLookupTable implements LookupTable {
public abstract class FullCacheLookupTable extends AsyncRefreshLookupTable {
private static final Logger LOG = LoggerFactory.getLogger(FullCacheLookupTable.class);

protected final Object lock = new Object();
Expand All @@ -78,15 +69,12 @@ public abstract class FullCacheLookupTable implements LookupTable {
protected final int appendUdsFieldNumber;

protected RocksDBStateFactory stateFactory;
@Nullable private final ExecutorService refreshExecutor;
private final AtomicReference<Exception> cachedException;
private final int maxPendingSnapshotCount;
private final FileStoreTable table;
private Future<?> refreshFuture;
private LookupStreamingReader reader;
private Predicate specificPartition;

public FullCacheLookupTable(Context context) {
super(context.table);
this.table = context.table;
List<String> sequenceFields = new ArrayList<>();
if (table.primaryKeys().size() > 0) {
Expand Down Expand Up @@ -121,16 +109,6 @@ public FullCacheLookupTable(Context context) {
Options options = Options.fromMap(context.table.options());
this.projectedType = projectedType;
this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
this.refreshExecutor =
this.refreshAsync
? Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
String.format(
"%s-lookup-refresh",
Thread.currentThread().getName())))
: null;
this.cachedException = new AtomicReference<>();
this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
}

@Override
Expand Down Expand Up @@ -188,54 +166,16 @@ protected void bootstrap() throws Exception {
}

@Override
public void refresh() throws Exception {
if (refreshExecutor == null) {
doRefresh();
return;
public Long nextSnapshotId() {
if (reader != null) {
return reader.nextSnapshotId();
}

Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
Long nextSnapshotId = reader.nextSnapshotId();
if (latestSnapshotId != null
&& nextSnapshotId != null
&& latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
LOG.warn(
"The latest snapshot id {} is much greater than the next snapshot id {} for {}}, "
+ "you may need to increase the parallelism of lookup operator.",
latestSnapshotId,
nextSnapshotId,
maxPendingSnapshotCount);
if (refreshFuture != null) {
// Wait the previous refresh task to be finished.
refreshFuture.get();
}
doRefresh();
} else {
Future<?> currentFuture = null;
try {
currentFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
} catch (Exception e) {
LOG.error(
"Refresh lookup table {} failed",
context.table.name(),
e);
cachedException.set(e);
}
});
} catch (RejectedExecutionException e) {
LOG.warn("Add refresh task for lookup table {} failed", context.table.name(), e);
}
if (currentFuture != null) {
refreshFuture = currentFuture;
}
}
return null;
}

private void doRefresh() throws Exception {
@Override
public void doRefresh() throws Exception {
while (true) {
try (RecordReaderIterator<InternalRow> batch =
new RecordReaderIterator<>(reader.nextBatch(false))) {
Expand All @@ -249,14 +189,7 @@ private void doRefresh() throws Exception {

@Override
public final List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> values;
if (refreshAsync) {
synchronized (lock) {
values = innerGet(key);
}
} else {
values = innerGet(key);
}
List<InternalRow> values = super.get(key);
if (appendUdsFieldNumber == 0) {
return values;
}
Expand All @@ -269,6 +202,11 @@ public final List<InternalRow> get(InternalRow key) throws IOException {
return dropSequence;
}

@Override
public List<InternalRow> doGet(InternalRow key) throws IOException {
return innerGet(key);
}

public void refresh(Iterator<InternalRow> input) throws IOException {
Predicate predicate = projectedPredicate();
while (input.hasNext()) {
Expand Down Expand Up @@ -301,9 +239,7 @@ public Predicate projectedPredicate() {
@Override
public void close() throws IOException {
try {
if (refreshExecutor != null) {
ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
}
super.close();
} finally {
stateFactory.close();
FileIOUtils.deleteDirectory(context.tempPath);
Expand Down
Loading

0 comments on commit 99df25c

Please sign in to comment.