Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzy15 committed Sep 14, 2023
1 parent 2debf2b commit 02dca98
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 34 deletions.
9 changes: 3 additions & 6 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,11 @@ Paimon's snapshot is similar to flink's checkpoint, and both will automatically
allows snapshots to be retained for a long time. Therefore, we can combine the two features of Paimon's tag and Flink's
savepoint to achieve incremental recovery of a job from a specified savepoint.
Enabling this parameter also drives the automatic creation of tags. Currently, even when the `tag.automatic-creation` parameter is enabled,
automatic tag creation is triggered only when data is written. This cannot be used in a batch processing scenario,
because batch processing requires an accurate trigger signal: when no data is written into the stream table, the automatic creation
of tags is interrupted. Therefore, we need a mechanism for creating tags based on checkpoints, and this parameter addresses that issue.
**Step 1: Enable automatic tag creation for savepoints.**
You can set `sink.savepoint.auto-tag` to `true` to enable the feature of automatically creating tags for savepoint.
You can set the `sink.savepoint.auto-tag` parameter to `true` to enable the feature that automatically creates tags for savepoints.
Enabling this option will also facilitate the automatic creation of tags.
For instance, it will force the creation of a tag for the `tag.automatic-creation=process-time` mode at the appropriate time, even when no data has been written.
**Step 2: Trigger savepoint.**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,26 @@ private TagAutoCreation(
}
}

public void refreshTags() {
Long snapshotId = Optional.ofNullable(snapshotManager.latestSnapshotId()).orElse(1L);
public void run() {
long snapshotId = Optional.ofNullable(snapshotManager.latestSnapshotId()).orElse(1L);
LocalDateTime currentTime =
Instant.ofEpochMilli(System.currentTimeMillis())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
processTags(snapshotManager.snapshot(snapshotId), Optional.of(currentTime));
}

public void run() {
while (true) {
if (snapshotManager.snapshotExists(nextSnapshot)) {
Snapshot snapshot = snapshotManager.snapshot(nextSnapshot);
processTags(snapshot, timeExtractor.extract(snapshot));
nextSnapshot++;
} else {
// avoid snapshot has been expired
Long earliest = snapshotManager.earliestSnapshotId();
if (earliest != null && earliest > nextSnapshot) {
nextSnapshot = earliest;
} else {
if (snapshotManager.snapshotExists(snapshotId)) {
processTags(snapshotManager.snapshot(snapshotId), Optional.of(currentTime));
}
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ public Path tagPath(String tagName) {
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
if (tagExists(tagName)) {
return;
}
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
checkArgument(
!tagName.chars().allMatch(Character::isDigit),
"Tag name cannot be pure numeric string but is '%s'.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
Expand All @@ -46,6 +47,7 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -61,26 +63,36 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
SetupableStreamOperator,
BoundedOneInput {
@VisibleForTesting public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";

private static final String COMMIT_FIELD_NAME = "commit";

private static final String TAG_AUTO_CREATION_FIELD_NAME = "tagAutoCreation";

private static final long serialVersionUID = 1L;

private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;

private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;

private final SerializableSupplier<TagManager> tagManagerFactory;
private final SerializableSupplier<TagAutoCreation> tagAutoCreationFactory;

private final Set<Long> identifiersForTags;

private SnapshotManager snapshotManager;

private TagManager tagManager;

private TagAutoCreation tagAutoCreation;

private transient ListState<Long> identifiersForTagsState;

public AutoTagForSavepointCommitterOperator(
CommitterOperator<CommitT, GlobalCommitT> commitOperator,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory,
SerializableSupplier<TagAutoCreation> tagAutoCreationFactory) {
SerializableSupplier<TagManager> tagManagerFactory) {
this.commitOperator = commitOperator;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.tagAutoCreationFactory = tagAutoCreationFactory;
this.identifiersForTags = new HashSet<>();
}

Expand All @@ -90,9 +102,22 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
try {
commitOperator.initializeState(streamTaskStateManager);
} finally {
Committer<CommitT, GlobalCommitT> committer = commitOperator.committer;

Class<?> storeCommitterClass = StoreCommitter.class;
Class<?> tableCommitImplClass = TableCommitImpl.class;

Field commitField = storeCommitterClass.getDeclaredField(COMMIT_FIELD_NAME);
commitField.setAccessible(true);
Object commit = commitField.get(committer);

Field tagAutoCreationField =
tableCommitImplClass.getDeclaredField(TAG_AUTO_CREATION_FIELD_NAME);
tagAutoCreationField.setAccessible(true);
tagAutoCreation = (TagAutoCreation) tagAutoCreationField.get(commit);

snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();
tagAutoCreation = tagAutoCreationFactory.get();
identifiersForTagsState =
commitOperator
.getOperatorStateBackend()
Expand Down Expand Up @@ -129,7 +154,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
createTagForIdentifiers(Collections.singletonList(checkpointId));
}
if (tagAutoCreation != null) {
tagAutoCreation.refreshTags();
tagAutoCreation.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
Expand Down Expand Up @@ -202,9 +200,7 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
new AutoTagForSavepointCommitterOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
table::tagManager);
}
SingleOutputStreamOperator<?> committed =
written.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.ThrowingConsumer;

import org.apache.flink.core.execution.SavepointFormatType;
Expand Down Expand Up @@ -163,9 +161,7 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
(CommitterOperator<Committable, ManifestCommittable>)
super.createCommitterOperator(table, commitUser, committableStateManager),
table::snapshotManager,
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
table::tagManager);
}

@Override
Expand All @@ -179,8 +175,6 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
super.createCommitterOperator(
table, commitUser, committableStateManager, initializeFunction),
table::snapshotManager,
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
table::tagManager);
}
}

0 comments on commit 02dca98

Please sign in to comment.