Skip to content

Commit

Permalink
[WIP][flink] auto create new tag for flink checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzy15 committed Sep 5, 2023
1 parent 9967b53 commit 58187d0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -58,31 +60,27 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";

@VisibleForTesting public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";
private static final long serialVersionUID = 1L;

private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;

private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;

private final SerializableSupplier<TagManager> tagManagerFactory;

private final SerializableSupplier<TagAutoCreation> tagAutoCreationFactory;
private final Set<Long> identifiersForTags;

protected SnapshotManager snapshotManager;

protected TagManager tagManager;

private SnapshotManager snapshotManager;
private TagManager tagManager;
private TagAutoCreation tagAutoCreation;
private transient ListState<Long> identifiersForTagsState;

public AutoTagForSavepointCommitterOperator(
CommitterOperator<CommitT, GlobalCommitT> commitOperator,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory) {
SerializableSupplier<TagManager> tagManagerFactory,
SerializableSupplier<TagAutoCreation> tagAutoCreationFactory) {
this.commitOperator = commitOperator;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.tagAutoCreationFactory = tagAutoCreationFactory;
this.identifiersForTags = new HashSet<>();
}

Expand All @@ -94,7 +92,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
} finally {
snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();

tagAutoCreation = tagAutoCreationFactory.get();
identifiersForTagsState =
commitOperator
.getOperatorStateBackend()
Expand Down Expand Up @@ -130,6 +128,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (identifiersForTags.remove(checkpointId)) {
createTagForIdentifiers(Collections.singletonList(checkpointId));
}
if (tagAutoCreation != null){
tagAutoCreation.refreshTags();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
Expand Down Expand Up @@ -195,7 +197,9 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
new AutoTagForSavepointCommitterOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager);
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
}
SingleOutputStreamOperator<?> committed =
written.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.ThrowingConsumer;

import org.apache.flink.core.execution.SavepointFormatType;
Expand Down Expand Up @@ -161,7 +163,9 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
(CommitterOperator<Committable, ManifestCommittable>)
super.createCommitterOperator(table, commitUser, committableStateManager),
table::snapshotManager,
table::tagManager);
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
}

@Override
Expand All @@ -175,6 +179,8 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
super.createCommitterOperator(
table, commitUser, committableStateManager, initializeFunction),
table::snapshotManager,
table::tagManager);
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
}
}

0 comments on commit 58187d0

Please sign in to comment.