Skip to content

Commit

Permalink
[fix] Fix writer release in AbstractFileStoreWrite
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper committed Sep 18, 2024
1 parent f5e9f25 commit 3eb9fe6
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;

Expand All @@ -64,10 +66,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);

// TODO: for write-only writers, commitUser might be some random value, because write-only
// writers won't compact files and have no conflicts. If commitUser ever needs to be used in
// write-only writers, I highly suggest refactoring to split the writer class into a write-only
// class and a with-compaction class.
private final String commitUser;
protected final SnapshotManager snapshotManager;
private final FileStoreScan scan;
private final int writerNumberMax;
private final CoreOptions options;
@Nullable private final IndexMaintainer.Factory<T> indexFactory;
@Nullable private final DeletionVectorsMaintainer.Factory dvMaintainerFactory;

Expand All @@ -91,15 +97,15 @@ protected AbstractFileStoreWrite(
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName,
int writerNumberMax) {
CoreOptions options) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
this.dvMaintainerFactory = dvMaintainerFactory;
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.options = options;
}

@Override
Expand Down Expand Up @@ -169,28 +175,50 @@ public void notifyNewFiles(
@Override
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception {
long latestCommittedIdentifier;
if (writers.values().stream()
.map(Map::values)
.flatMap(Collection::stream)
.mapToLong(w -> w.lastModifiedCommitIdentifier)
.max()
.orElse(Long.MIN_VALUE)
== Long.MIN_VALUE) {
// Optimization for the first commit.
//
// If this is the first commit, no writer has previous modified commit, so the value of
// `latestCommittedIdentifier` does not matter.
//
// Without this optimization, we may need to scan through all snapshots only to find
// that there is no previous snapshot by this user, which is very inefficient.
latestCommittedIdentifier = Long.MIN_VALUE;
Function<WriterContainer<T>, Boolean> shouldCloseWriter;
if (options.writeOnly()) {
// There are no conflicts for write-only writers, so we can always close a writer if it
// has nothing to commit.
shouldCloseWriter = writerContainer -> true;
} else {
latestCommittedIdentifier =
snapshotManager
.latestSnapshotOfUser(commitUser)
.map(Snapshot::commitIdentifier)
.orElse(Long.MIN_VALUE);
long latestCommittedIdentifier;
if (writers.values().stream()
.map(Map::values)
.flatMap(Collection::stream)
.mapToLong(w -> w.lastModifiedCommitIdentifier)
.max()
.orElse(Long.MIN_VALUE)
== Long.MIN_VALUE) {
// Optimization for the first commit.
//
// If this is the first commit, no writer has a previously modified commit, so the
// value of `latestCommittedIdentifier` does not matter.
//
// Without this optimization, we may need to scan through all snapshots only to find
// that there is no previous snapshot by this user, which is very inefficient.
latestCommittedIdentifier = Long.MIN_VALUE;
} else {
latestCommittedIdentifier =
snapshotManager
.latestSnapshotOfUser(commitUser)
.map(Snapshot::commitIdentifier)
.orElse(Long.MIN_VALUE);
}

if (LOG.isDebugEnabled()) {
LOG.debug("Latest committed identifier is {}", latestCommittedIdentifier);
}

// Condition 1: There are no more records waiting to be committed. Note that the condition
// is < (instead of <=), because each commit identifier may have multiple snapshots. We
// must make sure all snapshots of this identifier are committed.
//
// Condition 2: No compaction is in progress. That is, no more changelog will be
// produced.
shouldCloseWriter =
writerContainer ->
writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
&& !writerContainer.writer.isCompacting();
}

List<CommitMessage> result = new ArrayList<>();
Expand Down Expand Up @@ -226,28 +254,17 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden
result.add(committable);

if (committable.isEmpty()) {
// Condition 1: There is no more record waiting to be committed. Note that the
// condition is < (instead of <=), because each commit identifier may have
// multiple snapshots. We must make sure all snapshots of this identifier are
// committed.
// Condition 2: No compaction is in progress. That is, no more changelog will be
// produced.
if (writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
&& !writerContainer.writer.isCompacting()) {
// Clear writer if no update, and if its latest modification has committed.
//
// We need a mechanism to clear writers, otherwise there will be more and
// more such as yesterday's partition that no longer needs to be written.
// We need a mechanism to clear writers; otherwise stale writers keep accumulating,
// such as those for yesterday's partition that no longer needs to be written.
if (shouldCloseWriter.apply(writerContainer)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Closing writer for partition {}, bucket {}. "
+ "Writer's last modified identifier is {}, "
+ "while latest committed identifier is {}, "
+ "current commit identifier is {}.",
+ "while current commit identifier is {}.",
partition,
bucket,
writerContainer.lastModifiedCommitIdentifier,
latestCommittedIdentifier,
commitIdentifier);
}
writerContainer.writer.close();
Expand Down Expand Up @@ -384,7 +401,7 @@ public WriterContainer<T> createWriterContainer(
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}

if (!isStreamingMode && writerNumber() >= writerNumberMax) {
if (!isStreamingMode && writerNumber() >= options.writeMaxWritersToSpill()) {
try {
forceBufferSpill();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public MemoryFileStoreWrite(
indexFactory,
dvMaintainerFactory,
tableName,
options.writeMaxWritersToSpill());
options);
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
Expand Down

0 comments on commit 3eb9fe6

Please sign in to comment.