Skip to content

Commit

Permalink
[core] Support async refresh for PrimaryKeyPartialLookupTable
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Sep 18, 2024
1 parent 102e5a9 commit a623f81
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.ExecutorUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;

/** A {@link LookupTable} supports async refresh. */
public abstract class AsyncRefreshLookupTable implements LookupTable {
private static final Logger LOG = LoggerFactory.getLogger(AsyncRefreshLookupTable.class);
private final FileStoreTable table;

private final int maxPendingSnapshotCount;

@Nullable private final ExecutorService refreshExecutor;

private final AtomicReference<Exception> cachedException;

private Future<?> refreshFuture;

protected final Object lock;
protected final boolean refreshAsync;

public AsyncRefreshLookupTable(FileStoreTable table) {
Options options = Options.fromMap(table.options());
this.table = table;
this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
this.refreshExecutor =
this.refreshAsync
? Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
String.format(
"%s-lookup-refresh",
Thread.currentThread().getName())))
: null;
this.lock = this.refreshAsync ? new Object() : null;
this.cachedException = new AtomicReference<>();
this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
}

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> values;
if (refreshAsync) {
synchronized (lock) {
values = doGet(key);
}
} else {
values = doGet(key);
}

return values;
}

public abstract List<InternalRow> doGet(InternalRow key) throws IOException;

@Override
public void refresh() throws Exception {
if (refreshExecutor == null) {
doRefresh();
return;
}

Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
Long nextSnapshotId = nextSnapshotId();
if (latestSnapshotId != null
&& nextSnapshotId != null
&& latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
LOG.warn(
"The latest snapshot id {} is much greater than the next snapshot id {} for {}}, "
+ "you may need to increase the parallelism of lookup operator.",
latestSnapshotId,
nextSnapshotId,
maxPendingSnapshotCount);
sync();
doRefresh();
} else {
try {
refreshFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
} catch (Exception e) {
LOG.error(
"Refresh lookup table {} failed", table.name(), e);
cachedException.set(e);
}
});
} catch (RejectedExecutionException e) {
LOG.warn("Add refresh task for lookup table {} failed", table.name(), e);
}
}
}

/** Wait until the previous refresh task to be finished. */
public void sync() throws Exception {
if (refreshFuture != null) {
this.refreshFuture.get();
}
}

public abstract void doRefresh() throws Exception;

public abstract Long nextSnapshotId();

@Override
public void close() throws IOException {
if (refreshExecutor != null) {
ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
Expand All @@ -57,16 +56,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;

/** Lookup table of full cache. */
public abstract class FullCacheLookupTable implements LookupTable {
public abstract class FullCacheLookupTable extends AsyncRefreshLookupTable {
private static final Logger LOG = LoggerFactory.getLogger(FullCacheLookupTable.class);

protected final Object lock = new Object();
Expand All @@ -87,6 +84,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
private Predicate specificPartition;

public FullCacheLookupTable(Context context) {
super(context.table);
this.table = context.table;
List<String> sequenceFields = new ArrayList<>();
if (table.primaryKeys().size() > 0) {
Expand Down Expand Up @@ -188,54 +186,16 @@ protected void bootstrap() throws Exception {
}

@Override
public void refresh() throws Exception {
if (refreshExecutor == null) {
doRefresh();
return;
public Long nextSnapshotId() {
if (reader != null) {
return reader.nextSnapshotId();
}

Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
Long nextSnapshotId = reader.nextSnapshotId();
if (latestSnapshotId != null
&& nextSnapshotId != null
&& latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
LOG.warn(
"The latest snapshot id {} is much greater than the next snapshot id {} for {}}, "
+ "you may need to increase the parallelism of lookup operator.",
latestSnapshotId,
nextSnapshotId,
maxPendingSnapshotCount);
if (refreshFuture != null) {
// Wait the previous refresh task to be finished.
refreshFuture.get();
}
doRefresh();
} else {
Future<?> currentFuture = null;
try {
currentFuture =
refreshExecutor.submit(
() -> {
try {
doRefresh();
} catch (Exception e) {
LOG.error(
"Refresh lookup table {} failed",
context.table.name(),
e);
cachedException.set(e);
}
});
} catch (RejectedExecutionException e) {
LOG.warn("Add refresh task for lookup table {} failed", context.table.name(), e);
}
if (currentFuture != null) {
refreshFuture = currentFuture;
}
}
return null;
}

private void doRefresh() throws Exception {
@Override
public void doRefresh() throws Exception {
while (true) {
try (RecordReaderIterator<InternalRow> batch =
new RecordReaderIterator<>(reader.nextBatch(false))) {
Expand All @@ -249,17 +209,7 @@ private void doRefresh() throws Exception {

@Override
public final List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> values;
if (refreshAsync) {
synchronized (lock) {
values = innerGet(key);
}
} else {
values = innerGet(key);
}
if (appendUdsFieldNumber == 0) {
return values;
}
List<InternalRow> values = super.get(key);

List<InternalRow> dropSequence = new ArrayList<>(values.size());
for (InternalRow matchedRow : values) {
Expand All @@ -269,6 +219,11 @@ public final List<InternalRow> get(InternalRow key) throws IOException {
return dropSequence;
}

@Override
public List<InternalRow> doGet(InternalRow key) throws IOException {
return innerGet(key);
}

public void refresh(Iterator<InternalRow> input) throws IOException {
Predicate predicate = projectedPredicate();
while (input.hasNext()) {
Expand Down Expand Up @@ -301,9 +256,7 @@ public Predicate projectedPredicate() {
@Override
public void close() throws IOException {
try {
if (refreshExecutor != null) {
ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
}
super.close();
} finally {
stateFactory.close();
FileIOUtils.deleteDirectory(context.tempPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import static org.apache.paimon.CoreOptions.StreamScanMode.FILE_MONITOR;

/** Lookup table for primary key which supports to read the LSM tree directly. */
public class PrimaryKeyPartialLookupTable implements LookupTable {
public class PrimaryKeyPartialLookupTable extends AsyncRefreshLookupTable {

private final Function<Predicate, QueryExecutor> executorFactory;
private final FixedBucketFromPkExtractor extractor;
Expand All @@ -65,6 +65,7 @@ private PrimaryKeyPartialLookupTable(
Function<Predicate, QueryExecutor> executorFactory,
FileStoreTable table,
List<String> joinKey) {
super(table);
this.executorFactory = executorFactory;

if (table.bucketMode() != BucketMode.HASH_FIXED) {
Expand Down Expand Up @@ -115,7 +116,7 @@ public void open() throws Exception {
}

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
public List<InternalRow> doGet(InternalRow key) throws IOException {
InternalRow adjustedKey = key;
if (keyRearrange != null) {
adjustedKey = keyRearrange.replaceRow(adjustedKey);
Expand All @@ -138,14 +139,23 @@ public List<InternalRow> get(InternalRow key) throws IOException {
}

@Override
public void refresh() {
public void doRefresh() {
queryExecutor.refresh();
}

@Override
public Long nextSnapshotId() {
return queryExecutor.nextSnapshotId();
}

@Override
public void close() throws IOException {
if (queryExecutor != null) {
queryExecutor.close();
try {
super.close();
} finally {
if (queryExecutor != null) {
queryExecutor.close();
}
}
}

Expand Down Expand Up @@ -174,10 +184,11 @@ interface QueryExecutor extends Closeable {
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;

void refresh();

Long nextSnapshotId();
}

static class LocalQueryExecutor implements QueryExecutor {

private final LocalTableQuery tableQuery;
private final StreamTableScan scan;

Expand Down Expand Up @@ -235,6 +246,11 @@ public void refresh() {
}
}

@Override
public Long nextSnapshotId() {
return scan.checkpoint();
}

@Override
public void close() throws IOException {
tableQuery.close();
Expand All @@ -258,6 +274,11 @@ public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
@Override
public void refresh() {}

@Override
public Long nextSnapshotId() {
return null;
}

@Override
public void close() throws IOException {
tableQuery.close();
Expand Down
Loading

0 comments on commit a623f81

Please sign in to comment.