Skip to content

Commit

Permalink
[flink] Remove Flink state from write-only writers
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper committed Sep 13, 2024
1 parent 647865f commit 7298661
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
Expand Down Expand Up @@ -91,6 +92,8 @@ public void snapshotState() throws Exception {
partitions.getKey(), bucket, new byte[0]));
}
}
state.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList);
Preconditions.checkNotNull(
state, "State is null for AsyncLookupSinkWrite. This is unexpected.")
.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,22 +135,22 @@ private StoreSinkWrite.Provider createWriteProvider(
metricGroup);
};
}
}

if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new AsyncLookupSinkWrite(
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
};
if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new AsyncLookupSinkWrite(
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
isStreaming,
memoryPool,
metricGroup);
};
}
}

return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -253,7 +254,10 @@ public void snapshotState() throws Exception {
bucket.f0, bucket.f1, longToBytes(entry.getKey())));
}
}
state.put(tableName, WRITTEN_BUCKETS_STATE_NAME, writtenBucketList);
Preconditions.checkNotNull(
state,
"State is null for GlobalFullCompactionSinkWrite. This is unexpected.")
.put(tableName, WRITTEN_BUCKETS_STATE_NAME, writtenBucketList);
}

private static byte[] longToBytes(long l) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ interface Provider extends Serializable {
StoreSinkWrite provide(
FileStoreTable table,
String commitUser,
StoreSinkWriteState state,
@Nullable StoreSinkWriteState state,
IOManager ioManager,
@Nullable MemorySegmentPool memoryPool,
@Nullable MetricGroup metricGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
private static final Logger LOG = LoggerFactory.getLogger(StoreSinkWriteImpl.class);

protected final String commitUser;
protected final StoreSinkWriteState state;
@Nullable protected final StoreSinkWriteState state;
private final IOManagerImpl paimonIOManager;
private final boolean ignorePreviousFiles;
private final boolean waitCompaction;
Expand All @@ -68,7 +69,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
public StoreSinkWriteImpl(
FileStoreTable table,
String commitUser,
StoreSinkWriteState state,
@Nullable StoreSinkWriteState state,
IOManager ioManager,
boolean ignorePreviousFiles,
boolean waitCompaction,
Expand All @@ -91,7 +92,7 @@ public StoreSinkWriteImpl(
public StoreSinkWriteImpl(
FileStoreTable table,
String commitUser,
StoreSinkWriteState state,
@Nullable StoreSinkWriteState state,
IOManager ioManager,
boolean ignorePreviousFiles,
boolean waitCompaction,
Expand All @@ -114,7 +115,7 @@ public StoreSinkWriteImpl(
private StoreSinkWriteImpl(
FileStoreTable table,
String commitUser,
StoreSinkWriteState state,
@Nullable StoreSinkWriteState state,
IOManager ioManager,
boolean ignorePreviousFiles,
boolean waitCompaction,
Expand All @@ -139,11 +140,16 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
!(memoryPool != null && memoryPoolFactory != null),
"memoryPool and memoryPoolFactory cannot be set at the same time.");

ManifestCacheFilter manifestFilter;
if (state == null) {
manifestFilter = (part, bucket) -> true;
} else {
manifestFilter =
(part, bucket) -> state.stateValueFilter().filter(table.name(), part, bucket);
}

TableWriteImpl<?> tableWrite =
table.newWrite(
commitUser,
(part, bucket) ->
state.stateValueFilter().filter(table.name(), part, bucket))
table.newWrite(commitUser, manifestFilter)
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.withExecutionMode(isStreamingMode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

/** An abstract class for table write operator. */
public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, Committable> {
Expand All @@ -39,7 +42,7 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final String initialCommitUser;

private transient StoreSinkWriteState state;
@Nullable private transient StoreSinkWriteState state;
protected transient StoreSinkWrite write;

public TableWriteOperator(
Expand All @@ -52,37 +55,54 @@ public TableWriteOperator(
this.initialCommitUser = initialCommitUser;
}

/**
 * Decides whether this operator has to keep Flink state for its writer.
 *
 * @return {@code false} when the table is write-only, or when it has no primary keys and its
 *     bucket option is {@code -1}; {@code true} otherwise
 */
private boolean needState() {
    // The commit user of a writer is used to avoid conflicts. Write-only writers won't cause
    // conflicts, so there is no need for a commit user.
    if (table.coreOptions().writeOnly()) {
        return false;
    }

    // An unaware bucket writer (no primary keys and bucket == -1) is actually a write-only
    // writer, so it does not need state either.
    boolean hasPrimaryKeys = !table.schema().primaryKeys().isEmpty();
    return hasPrimaryKeys || table.coreOptions().bucket() != -1;
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);

// Each job can only have one user name and this name must be consistent across restarts.
// We cannot use job id as commit user name here because user may change job id by creating
// a savepoint, stop the job and then resume from savepoint.
String commitUser =
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);

boolean containLogSystem = containLogSystem();
int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
StateValueFilter stateFilter =
(tableName, partition, bucket) -> {
int task =
containLogSystem
? ChannelComputer.select(bucket, numTasks)
: ChannelComputer.select(partition, bucket, numTasks);
return task == getRuntimeContext().getIndexOfThisSubtask();
};

initStateAndWriter(
context,
stateFilter,
getContainingTask().getEnvironment().getIOManager(),
commitUser);
if (needState()) {
// Each job can only have one username and this name must be consistent across restarts.
// We cannot use the job id as the commit username here because the user may change the
// job id by creating a savepoint, stopping the job and then resuming from the savepoint.
String commitUser =
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);

boolean containLogSystem = containLogSystem();
int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
StateValueFilter stateFilter =
(tableName, partition, bucket) -> {
int task =
containLogSystem
? ChannelComputer.select(bucket, numTasks)
: ChannelComputer.select(partition, bucket, numTasks);
return task == getRuntimeContext().getIndexOfThisSubtask();
};

initStateAndWriterWithState(
context,
stateFilter,
getContainingTask().getEnvironment().getIOManager(),
commitUser);
} else {
initStateAndWriterWithoutState(getContainingTask().getEnvironment().getIOManager());
}
}

@VisibleForTesting
void initStateAndWriter(
void initStateAndWriterWithState(
StateInitializationContext context,
StateValueFilter stateFilter,
IOManager ioManager,
Expand All @@ -91,20 +111,32 @@ void initStateAndWriter(
// We put state and writer initialization in this method for convenient testing. This way
// a writer can be constructed in tests without constructing a runtime context.
state = new StoreSinkWriteState(context, stateFilter);

write =
storeSinkWriteProvider.provide(
table, commitUser, state, ioManager, memoryPool, getMetricGroup());
}

/**
 * Creates the writer without any Flink state: {@code null} is handed to the provider in place
 * of a {@code StoreSinkWriteState}.
 *
 * @param ioManager the IO manager passed through to the writer provider
 */
private void initStateAndWriterWithoutState(IOManager ioManager) {
    // Commit user is meaningless for writers without state (see `needState`), so a random
    // one is generated for each initialization.
    String randomCommitUser = UUID.randomUUID().toString();
    write =
            storeSinkWriteProvider.provide(
                    table, randomCommitUser, null, ioManager, memoryPool, getMetricGroup());
}

protected abstract boolean containLogSystem();

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);

write.snapshotState();
state.snapshotState();
if (state != null) {
state.snapshotState();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public <S> ListState<S> getUnionListState(
null,
null,
null);
operator.initStateAndWriter(context, (a, b, c) -> true, new IOManagerAsync(), "123");
operator.initStateAndWriterWithState(
context, (a, b, c) -> true, new IOManagerAsync(), "123");
return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl) operator.write).write.getWrite())
.bufferSpillable();
}
Expand Down

0 comments on commit 7298661

Please sign in to comment.