[fix] Fix writer cleaning in AbstractFileStoreWrite
tsreaper committed Sep 18, 2024
1 parent f5e9f25 commit 7b3d190
Showing 6 changed files with 76 additions and 43 deletions.
AbstractFileStoreWrite.java
@@ -52,6 +52,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+ import java.util.function.Function;

import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;

@@ -64,6 +65,10 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);

+ // TODO: for write-only writers, commitUser might be some random value, because write-only
+ // writers won't compact files and have no conflicts. If commitUser will be used in write-only
+ // writers, I highly suggest a refactor to split writer class into a write-only class and a
+ // with-compaction class.
private final String commitUser;
protected final SnapshotManager snapshotManager;
private final FileStoreScan scan;
@@ -169,28 +174,50 @@ public void notifyNewFiles(
@Override
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception {
- long latestCommittedIdentifier;
- if (writers.values().stream()
- .map(Map::values)
- .flatMap(Collection::stream)
- .mapToLong(w -> w.lastModifiedCommitIdentifier)
- .max()
- .orElse(Long.MIN_VALUE)
- == Long.MIN_VALUE) {
- // Optimization for the first commit.
- //
- // If this is the first commit, no writer has previous modified commit, so the value of
- // `latestCommittedIdentifier` does not matter.
+ Function<WriterContainer<T>, Boolean> shouldCloseWriter;
+ if (hasCompaction()) {
+ long latestCommittedIdentifier;
+ if (writers.values().stream()
+ .map(Map::values)
+ .flatMap(Collection::stream)
+ .mapToLong(w -> w.lastModifiedCommitIdentifier)
+ .max()
+ .orElse(Long.MIN_VALUE)
+ == Long.MIN_VALUE) {
+ // Optimization for the first commit.
+ //
+ // If this is the first commit, no writer has previous modified commit, so the value
+ // of `latestCommittedIdentifier` does not matter.
+ //
+ // Without this optimization, we may need to scan through all snapshots only to find
+ // that there is no previous snapshot by this user, which is very inefficient.
+ latestCommittedIdentifier = Long.MIN_VALUE;
+ } else {
+ latestCommittedIdentifier =
+ snapshotManager
+ .latestSnapshotOfUser(commitUser)
+ .map(Snapshot::commitIdentifier)
+ .orElse(Long.MIN_VALUE);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Latest committed identifier is {}", latestCommittedIdentifier);
+ }
+
+ // Condition 1: There is no more record waiting to be committed. Note that the condition
+ // is < (instead of <=), because each commit identifier may have multiple snapshots. We
+ // must make sure all snapshots of this identifier are committed.
//
- // Without this optimization, we may need to scan through all snapshots only to find
- // that there is no previous snapshot by this user, which is very inefficient.
- latestCommittedIdentifier = Long.MIN_VALUE;
+ // Condition 2: No compaction is in progress. That is, no more changelog will be
+ // produced.
+ shouldCloseWriter =
+ writerContainer ->
+ writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
+ && !writerContainer.writer.isCompacting();
} else {
- latestCommittedIdentifier =
- snapshotManager
- .latestSnapshotOfUser(commitUser)
- .map(Snapshot::commitIdentifier)
- .orElse(Long.MIN_VALUE);
+ // No conflict for write-only writers, so we can always close writer if it has nothing
+ // to commit.
+ shouldCloseWriter = writerContainer -> true;
}

List<CommitMessage> result = new ArrayList<>();
@@ -226,28 +253,17 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier
result.add(committable);

if (committable.isEmpty()) {
- // Condition 1: There is no more record waiting to be committed. Note that the
- // condition is < (instead of <=), because each commit identifier may have
- // multiple snapshots. We must make sure all snapshots of this identifier are
- // committed.
- // Condition 2: No compaction is in progress. That is, no more changelog will be
- // produced.
- if (writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
- && !writerContainer.writer.isCompacting()) {
- // Clear writer if no update, and if its latest modification has committed.
- //
- // We need a mechanism to clear writers, otherwise there will be more and
- // more such as yesterday's partition that no longer needs to be written.
+ // We need a mechanism to clear writers, otherwise there will be more and more
+ // such as yesterday's partition that no longer needs to be written.
+ if (shouldCloseWriter.apply(writerContainer)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Closing writer for partition {}, bucket {}. "
+ "Writer's last modified identifier is {}, "
+ "while latest committed identifier is {}, "
+ "current commit identifier is {}.",
+ "while current commit identifier is {}.",
partition,
bucket,
writerContainer.lastModifiedCommitIdentifier,
- latestCommittedIdentifier,
commitIdentifier);
}
writerContainer.writer.close();
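Taken together, the two hunks above replace one hard-coded cleanup condition with a pluggable `shouldCloseWriter` predicate: compacting writers keep the old identifier-and-compaction check, while write-only writers can be closed as soon as they have nothing to commit. The sketch below restates that decision logic in isolation; `WriterState`, `withCompactionPolicy`, and `writeOnlyPolicy` are illustrative names, not Paimon API.

import java.util.function.Function;

public class WriterCleanupSketch {
    // Illustrative stand-in for Paimon's WriterContainer: tracks the commit
    // identifier of the writer's last modification and its compaction state.
    static class WriterState {
        final long lastModifiedCommitIdentifier;
        final boolean compacting;

        WriterState(long lastModified, boolean compacting) {
            this.lastModifiedCommitIdentifier = lastModified;
            this.compacting = compacting;
        }
    }

    // Policy for writers that may compact: only close when every snapshot of the
    // writer's last commit identifier is committed (strict <, since one identifier
    // can produce several snapshots) and no compaction is still running.
    static Function<WriterState, Boolean> withCompactionPolicy(long latestCommittedIdentifier) {
        return w -> w.lastModifiedCommitIdentifier < latestCommittedIdentifier && !w.compacting;
    }

    // Policy for write-only writers: no compaction means no conflicts, so an idle
    // writer (nothing to commit) can always be closed.
    static Function<WriterState, Boolean> writeOnlyPolicy() {
        return w -> true;
    }

    public static void main(String[] args) {
        long latestCommitted = 5L;
        WriterState idleOldWriter = new WriterState(3L, false);
        WriterState compactingWriter = new WriterState(3L, true);

        System.out.println(withCompactionPolicy(latestCommitted).apply(idleOldWriter));    // true
        System.out.println(withCompactionPolicy(latestCommitted).apply(compactingWriter)); // false
        System.out.println(writeOnlyPolicy().apply(compactingWriter));                     // true
    }
}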
AppendOnlyFileStoreWrite.java
@@ -88,7 +88,6 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final FileIndexOptions fileIndexOptions;
private final BucketMode bucketMode;
private boolean forceBufferSpill = false;
- private final boolean skipCompaction;

public AppendOnlyFileStoreWrite(
FileIO fileIO,
@@ -115,14 +114,13 @@ public AppendOnlyFileStoreWrite(
this.compactionMinFileNum = options.compactionMinFileNum();
this.compactionMaxFileNum = options.compactionMaxFileNum().orElse(5);
this.commitForceCompact = options.commitForceCompact();
+
// AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act difference in
// unaware-bucket mode (no compaction and force empty-writer).
if (bucketMode == BucketMode.BUCKET_UNAWARE) {
super.withIgnorePreviousFiles(true);
- this.skipCompaction = true;
- } else {
- this.skipCompaction = options.writeOnly();
}
+
this.fileCompression = options.fileCompression();
this.spillCompression = options.spillCompressOptions();
this.useWriteBuffer = options.useWriteBufferForAppend();
@@ -144,7 +142,7 @@ protected RecordWriter<InternalRow> createWriter(
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
CompactManager compactManager = new NoopCompactManager();
- if (!skipCompaction) {
+ if (hasCompaction()) {
Function<String, DeletionVector> dvFactory =
dvMaintainer != null
? f -> dvMaintainer.deletionVectorOf(f).orElse(null)
@@ -259,6 +257,11 @@ public void withIgnorePreviousFiles(boolean ignorePrevious) {
super.withIgnorePreviousFiles(ignorePrevious || bucketMode == BucketMode.BUCKET_UNAWARE);
}

+ @Override
+ public boolean hasCompaction() {
+ return !(options.writeOnly() || bucketMode == BucketMode.BUCKET_UNAWARE);
+ }
+
@Override
protected void forceBufferSpill() throws Exception {
if (ioManager == null) {
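The removed `skipCompaction` field is replaced by the derived `hasCompaction()` predicate above: an append-only writer compacts only when it is neither write-only nor in unaware-bucket mode. A minimal sketch of that predicate and its truth table, with plain booleans standing in for Paimon's `CoreOptions.writeOnly()` and `BucketMode.BUCKET_UNAWARE`:

public class AppendOnlyCompactionSketch {
    // Derived predicate replacing the old skipCompaction field: compaction runs
    // only for writers that are neither write-only nor bucket-unaware.
    static boolean hasCompaction(boolean writeOnly, boolean bucketUnaware) {
        return !(writeOnly || bucketUnaware);
    }

    public static void main(String[] args) {
        // writeOnly, bucketUnaware -> hasCompaction
        System.out.println(hasCompaction(false, false)); // true:  normal writer compacts
        System.out.println(hasCompaction(true, false));  // false: write-only skips compaction
        System.out.println(hasCompaction(false, true));  // false: unaware-bucket skips compaction
        System.out.println(hasCompaction(true, true));   // false
    }
}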
FileStoreWrite.java
@@ -104,6 +104,9 @@ default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
*/
void write(BinaryRow partition, int bucket, T data) throws Exception;

+ /** If this writer will actually perform compactions. */
+ boolean hasCompaction();
+
/**
* Compact data stored in given partition and bucket. Note that compaction process is only
* submitted and may not be completed when the method returns.
KeyValueFileStoreWrite.java
@@ -235,9 +235,7 @@ private CompactManager createCompactManager(
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
- if (options.writeOnly()) {
- return new NoopCompactManager();
- } else {
+ if (hasCompaction()) {
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
@Nullable FieldsComparator userDefinedSeqComparator = udsComparatorSupplier.get();
CompactRewriter rewriter =
@@ -261,6 +259,8 @@
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction());
+ } else {
+ return new NoopCompactManager();
}
}

@@ -389,6 +389,11 @@ private <T> LookupLevels<T> createLookupLevels(
lookupFileCache);
}

+ @Override
+ public boolean hasCompaction() {
+ return !options.writeOnly();
+ }
+
@Override
public void close() throws Exception {
super.close();
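Both writer classes now converge on the same selection pattern: consult `hasCompaction()` once and fall back to a no-op manager for write-only writers. A self-contained sketch of that pattern, with `CompactManager`, `NoopCompactManager`, and `RealCompactManager` as simplified stand-ins for the Paimon types:

public class CompactManagerSelection {
    interface CompactManager {
        void triggerCompaction();
    }

    // Stand-in for Paimon's NoopCompactManager: accepts triggers and does nothing.
    static class NoopCompactManager implements CompactManager {
        @Override
        public void triggerCompaction() {}
    }

    static class RealCompactManager implements CompactManager {
        @Override
        public void triggerCompaction() {
            System.out.println("compacting...");
        }
    }

    // The selection pattern the diff converges on: branch on the single
    // hasCompaction() predicate instead of per-class skip flags.
    static CompactManager createCompactManager(boolean hasCompaction) {
        if (hasCompaction) {
            return new RealCompactManager();
        } else {
            return new NoopCompactManager();
        }
    }

    public static void main(String[] args) {
        createCompactManager(true).triggerCompaction();  // prints "compacting..."
        createCompactManager(false).triggerCompaction(); // no output
    }
}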
TableWriteImpl.java
@@ -225,6 +225,10 @@ public SinkRecord toLogRecord(SinkRecord record) {
record.row());
}

+ public boolean hasCompaction() {
+ return write.hasCompaction();
+ }
+
@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
write.compact(partition, bucket, fullCompaction);
StoreSinkWriteImpl.java
@@ -33,6 +33,7 @@
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
+ import org.apache.paimon.utils.Preconditions;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -154,6 +155,7 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
.withIgnorePreviousFiles(ignorePreviousFiles)
.withExecutionMode(isStreamingMode)
.withBucketMode(table.bucketMode());
+ Preconditions.checkArgument(tableWrite.hasCompaction() == (state != null));

if (metricGroup != null) {
tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
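The added precondition asserts that a writer reports `hasCompaction()` exactly when compaction-related state was restored for it. A small sketch of the same if-and-only-if check, using a hand-rolled `checkArgument` instead of `org.apache.paimon.utils.Preconditions`:

public class InvariantCheckSketch {
    // Minimal stand-in for Preconditions.checkArgument.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        boolean hasCompaction = true;
        Object compactionState = new Object(); // non-null: state was restored

        // The commit's invariant: a writer performs compaction
        // if and only if compaction-related state exists for it.
        checkArgument(
                hasCompaction == (compactionState != null),
                "Compaction capability and compaction state must agree.");
        System.out.println("invariant holds");
    }
}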