Skip to content

Commit

Permalink
[WIP][flink] auto create new tag for flink checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzy15 committed Sep 6, 2023
1 parent 9967b53 commit 7f65a6b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public class TagAutoCreation {
private final TagPeriodHandler periodHandler;
private final Duration delay;
private final Integer numRetainedMax;

private LocalDateTime nextTag;
private long nextSnapshot;

Expand Down Expand Up @@ -115,10 +114,20 @@ private TagAutoCreation(
}
}

/**
 * Runs one tagging pass against the latest snapshot, using the current wall-clock time (in the
 * system default time zone) as the tag time instead of a time extracted from the snapshot.
 *
 * <p>Does nothing when the snapshot manager reports no latest snapshot, since there is no
 * snapshot to tag yet.
 */
public void refreshTags() {
    Long latestSnapshotId = snapshotManager.latestSnapshotId();
    if (latestSnapshotId == null) {
        // Previously this fell back to snapshot id 1 and tried to load it even though no
        // snapshot had been reported; skip the pass instead of reading a missing snapshot.
        return;
    }
    LocalDateTime currentTime =
            Instant.ofEpochMilli(System.currentTimeMillis())
                    .atZone(ZoneId.systemDefault())
                    .toLocalDateTime();
    processTags(snapshotManager.snapshot(latestSnapshotId), Optional.of(currentTime));
}

public void run() {
while (true) {
if (snapshotManager.snapshotExists(nextSnapshot)) {
tryToTag(snapshotManager.snapshot(nextSnapshot));
Snapshot snapshot = snapshotManager.snapshot(nextSnapshot);
processTags(snapshot, timeExtractor.extract(snapshot));
nextSnapshot++;
} else {
// handle the case where the snapshot has already expired
Expand All @@ -132,34 +141,31 @@ public void run() {
}
}

private void tryToTag(Snapshot snapshot) {
Optional<LocalDateTime> timeOptional = timeExtractor.extract(snapshot);
if (!timeOptional.isPresent()) {
return;
}
/**
 * Performs one tagging pass for the given snapshot: forwards to {@code createTags} when a time
 * is present, then always calls {@code pruneOldTags}, even when no time was supplied.
 */
private void processTags(Snapshot snapshot, Optional<LocalDateTime> timeOptional) {
    if (timeOptional.isPresent()) {
        createTags(snapshot, timeOptional.get());
    }
    pruneOldTags();
}

LocalDateTime time = timeOptional.get();
/**
 * Creates a tag for {@code snapshot} when either no next tag time has been recorded yet, or
 * {@code isAfterOrEqual(time - delay, periodHandler.nextTagTime(nextTag))} holds. Otherwise
 * the call is a no-op.
 */
private void createTags(Snapshot snapshot, LocalDateTime time) {
    boolean tagDue =
            nextTag == null
                    || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag));
    if (!tagDue) {
        return;
    }
    LocalDateTime tagTime = periodHandler.normalizeToTagTime(time);
    String name = periodHandler.timeToTag(tagTime);
    tagManager.createTag(snapshot, name);
    // Record the following tag time so later calls are compared against it.
    nextTag = periodHandler.nextTagTime(tagTime);
}

if (numRetainedMax != null) {
SortedMap<Snapshot, String> tags = tagManager.tags();
if (tags.size() > numRetainedMax) {
int toDelete = tags.size() - numRetainedMax;
int i = 0;
for (String tag : tags.values()) {
tagManager.deleteTag(tag, tagDeletion, snapshotManager);
i++;
if (i == toDelete) {
break;
}
}
}
}
/**
 * Deletes surplus tags when a retention limit is configured: if the tag manager holds more
 * than {@code numRetainedMax} tags, the excess ones are deleted, taken from the front of the
 * map's iteration order. Does nothing when {@code numRetainedMax} is null.
 */
private void pruneOldTags() {
    if (numRetainedMax != null) {
        SortedMap<Snapshot, String> existing = tagManager.tags();
        int remaining = existing.size() - numRetainedMax;
        for (String tagName : existing.values()) {
            if (remaining <= 0) {
                break;
            }
            tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
            remaining--;
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public Path tagPath(String tagName) {
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
if (tagExists(tagName)) {
return;
}
checkArgument(
!tagName.chars().allMatch(Character::isDigit),
"Tag name cannot be pure numeric string but is '%s'.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -58,31 +60,27 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";

@VisibleForTesting public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";
private static final long serialVersionUID = 1L;

private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;

private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;

private final SerializableSupplier<TagManager> tagManagerFactory;

private final SerializableSupplier<TagAutoCreation> tagAutoCreationFactory;
private final Set<Long> identifiersForTags;

protected SnapshotManager snapshotManager;

protected TagManager tagManager;

private SnapshotManager snapshotManager;
private TagManager tagManager;
private TagAutoCreation tagAutoCreation;
private transient ListState<Long> identifiersForTagsState;

public AutoTagForSavepointCommitterOperator(
CommitterOperator<CommitT, GlobalCommitT> commitOperator,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory) {
SerializableSupplier<TagManager> tagManagerFactory,
SerializableSupplier<TagAutoCreation> tagAutoCreationFactory) {
this.commitOperator = commitOperator;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.tagAutoCreationFactory = tagAutoCreationFactory;
this.identifiersForTags = new HashSet<>();
}

Expand All @@ -94,7 +92,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
} finally {
snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();

tagAutoCreation = tagAutoCreationFactory.get();
identifiersForTagsState =
commitOperator
.getOperatorStateBackend()
Expand Down Expand Up @@ -130,6 +128,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (identifiersForTags.remove(checkpointId)) {
createTagForIdentifiers(Collections.singletonList(checkpointId));
}
if (tagAutoCreation != null) {
tagAutoCreation.refreshTags();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
Expand Down Expand Up @@ -195,7 +197,9 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
new AutoTagForSavepointCommitterOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager);
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
}
SingleOutputStreamOperator<?> committed =
written.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.ThrowingConsumer;

import org.apache.flink.core.execution.SavepointFormatType;
Expand Down Expand Up @@ -161,7 +163,9 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
(CommitterOperator<Committable, ManifestCommittable>)
super.createCommitterOperator(table, commitUser, committableStateManager),
table::snapshotManager,
table::tagManager);
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
}

@Override
Expand All @@ -175,6 +179,8 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
super.createCommitterOperator(
table, commitUser, committableStateManager, initializeFunction),
table::snapshotManager,
table::tagManager);
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
}
}

0 comments on commit 7f65a6b

Please sign in to comment.