diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 318c36aba2f4..04a624a13696 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -19,6 +19,8 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.tag.TagAutoCreation; import org.apache.paimon.utils.SerializableSupplier; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -58,31 +60,27 @@ public class AutoTagForSavepointCommitterOperator implements OneInputStreamOperator, SetupableStreamOperator, BoundedOneInput { - public static final String SAVEPOINT_TAG_PREFIX = "savepoint-"; - + @VisibleForTesting public static final String SAVEPOINT_TAG_PREFIX = "savepoint-"; private static final long serialVersionUID = 1L; - private final CommitterOperator commitOperator; - private final SerializableSupplier snapshotManagerFactory; - private final SerializableSupplier tagManagerFactory; - + private final SerializableSupplier tagAutoCreationFactory; private final Set identifiersForTags; - - protected SnapshotManager snapshotManager; - - protected TagManager tagManager; - + private SnapshotManager snapshotManager; + private TagManager tagManager; + private TagAutoCreation tagAutoCreation; private transient ListState identifiersForTagsState; public AutoTagForSavepointCommitterOperator( CommitterOperator commitOperator, SerializableSupplier snapshotManagerFactory, - SerializableSupplier tagManagerFactory) { + SerializableSupplier tagManagerFactory, + SerializableSupplier tagAutoCreationFactory) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; + this.tagAutoCreationFactory = tagAutoCreationFactory; this.identifiersForTags = new HashSet<>(); } @@ -94,7 +92,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) } finally { snapshotManager = snapshotManagerFactory.get(); tagManager = tagManagerFactory.get(); - + tagAutoCreation = tagAutoCreationFactory.get(); identifiersForTagsState = commitOperator .getOperatorStateBackend() @@ -130,6 +128,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { if (identifiersForTags.remove(checkpointId)) { createTagForIdentifiers(Collections.singletonList(checkpointId)); } + if (tagAutoCreation != null){ + tagAutoCreation.refreshTags(); + } } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 4a010a494f33..d5968bd79887 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -24,6 +24,8 @@ import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.utils.SerializableSupplier; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.ExecutionOptions; @@ -195,7 +197,9 @@ protected DataStreamSink doCommit(DataStream written, String com new AutoTagForSavepointCommitterOperator<>( (CommitterOperator) committerOperator, table::snapshotManager, - table::tagManager); + table::tagManager, + (SerializableSupplier) + () -> table.store().newTagCreationManager()); } SingleOutputStreamOperator committed = written.transform( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 7b44a1930362..61b0b1226474 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -25,6 +25,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.utils.SerializableSupplier; import org.apache.paimon.utils.ThrowingConsumer; import org.apache.flink.core.execution.SavepointFormatType; @@ -161,7 +163,9 @@ protected OneInputStreamOperator createCommitterOperat (CommitterOperator) super.createCommitterOperator(table, commitUser, committableStateManager), table::snapshotManager, - table::tagManager); + table::tagManager, + (SerializableSupplier) + () -> table.store().newTagCreationManager()); } @Override @@ -175,6 +179,8 @@ protected OneInputStreamOperator createCommitterOperat super.createCommitterOperator( table, commitUser, committableStateManager, initializeFunction), table::snapshotManager, - table::tagManager); + table::tagManager, + (SerializableSupplier) + () -> table.store().newTagCreationManager()); } }