[flink] Auto create new tag for flink checkpoint #1949
@@ -257,6 +257,11 @@ Paimon's snapshot is similar to flink's checkpoint, and both will automatically
allows snapshots to be retained for a long time. Therefore, we can combine the two features of paimon's tag and flink's
savepoint to achieve incremental recovery of job from the specified savepoint.

At the same time to open the change parameter will also promote the automatic creation of the tag, now although open the "tag.automatic-creation" parameter but,
Remove this sentence; there is no sink.savepoint.auto-tag here.
Document it after "You can set sink.savepoint.auto-tag to true to enable the feature of automatically creating tags for savepoint."
done.
@@ -257,6 +257,11 @@ Paimon's snapshot is similar to flink's checkpoint, and both will automatically
allows snapshots to be retained for a long time. Therefore, we can combine the two features of paimon's tag and flink's
savepoint to achieve incremental recovery of job from the specified savepoint.

At the same time to open the change parameter will also promote the automatic creation of the tag, now although open the "tag.automatic-creation" parameter but,
Just simplify documentation. For example: This option will also force creating a Tag for the tag.automatic-creation=process-time mode at an appropriate time even when no data is written.
done.
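The two options named in this thread can be collected into a plain options map. The following is only a minimal sketch: the class name `TagOptionsSketch` and the method `tableOptions` are hypothetical, and it assumes that "this option" in the suggested sentence refers to sink.savepoint.auto-tag. How the map is handed to a table (for example through table properties) is outside the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Paimon. */
public class TagOptionsSketch {

    /** Builds the option pairs discussed in the review thread. */
    public static Map<String, String> tableOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        // Option quoted in the first review comment.
        options.put("sink.savepoint.auto-tag", "true");
        // Mode quoted in the suggested documentation sentence.
        options.put("tag.automatic-creation", "process-time");
        return options;
    }

    public static void main(String[] args) {
        // Print each pair in key=value form.
        tableOptions().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Keeping the keys as literal strings mirrors how they appear in the documentation text under review, so the sketch stays independent of any particular Paimon version.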
private static final long serialVersionUID = 1L;

Don't remove this empty line, this is just personal code style
done.
@@ -61,7 +61,9 @@ public Path tagPath(String tagName) {
    /** Create a tag from given snapshot and save it in the storage. */
    public void createTag(Snapshot snapshot, String tagName) {
        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
        checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
        if (tagExists(tagName)) {
Why add this?
done.
@@ -115,10 +114,20 @@ private TagAutoCreation(
        }
    }

    public void refreshTags() {
We should not add a new method to TagAutoCreation.
We should just invoke run.
We can create a method:
private void tryToTag(Snapshot snapshot, LocalDateTime time) {
    if (nextTag == null
            || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) {
        LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
        String tagName = periodHandler.timeToTag(thisTag);
        tagManager.createTag(snapshot, tagName);
        nextTag = periodHandler.nextTagTime(thisTag);
        if (numRetainedMax != null) {
            SortedMap<Snapshot, String> tags = tagManager.tags();
            if (tags.size() > numRetainedMax) {
                int toDelete = tags.size() - numRetainedMax;
                int i = 0;
                for (String tag : tags.values()) {
                    tagManager.deleteTag(tag, tagDeletion, snapshotManager);
                    i++;
                    if (i == toDelete) {
                        break;
                    }
                }
            }
        }
    }
}
And in run:
public void run() {
    while (true) {
        if (snapshotManager.snapshotExists(nextSnapshot)) {
            tryToTag(snapshotManager.snapshot(nextSnapshot));
            nextSnapshot++;
        } else {
            // avoid snapshot has been expired
            Long earliest = snapshotManager.earliestSnapshotId();
            if (earliest != null && earliest > nextSnapshot) {
                nextSnapshot = earliest;
            } else {
                break;
            }
        }
    }
    if (timeExtractor instanceof ProcessTimeExtractor) {
        tryToTag(snapshotManager.snapshot(nextSnapshot - 1), LocalDateTime.now());
    }
}
Something like the code above.
done.
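Two pieces of the suggestion in this thread can be tried in isolation: the catch-up loop that skips expired snapshots, and the trimming of the oldest tags once the retained maximum is exceeded. The sketch below is a simplified stand-in, not Paimon code. It assumes snapshots can be modelled as plain long IDs, and the class `TagAutoCreationSketch` and its methods `catchUp` and `tagsToDelete` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in; models snapshots as long IDs instead of Paimon's Snapshot objects. */
public class TagAutoCreationSketch {

    /**
     * Visits every existing snapshot starting at nextSnapshot and returns the ID to resume from.
     * If nextSnapshot has already expired, it jumps forward to the earliest surviving snapshot.
     */
    public static long catchUp(long nextSnapshot, TreeSet<Long> existing, List<Long> visited) {
        while (true) {
            if (existing.contains(nextSnapshot)) {
                visited.add(nextSnapshot); // the reviewed code would call tryToTag here
                nextSnapshot++;
            } else {
                Long earliest = existing.isEmpty() ? null : existing.first();
                if (earliest != null && earliest > nextSnapshot) {
                    nextSnapshot = earliest; // skip over expired snapshots
                } else {
                    break; // nothing newer to process yet
                }
            }
        }
        return nextSnapshot;
    }

    /** Returns the oldest tag names that exceed numRetainedMax, in deletion order. */
    public static List<String> tagsToDelete(SortedMap<Long, String> tags, int numRetainedMax) {
        List<String> result = new ArrayList<>();
        int toDelete = tags.size() - numRetainedMax;
        for (String tag : tags.values()) {
            if (result.size() >= toDelete) {
                break;
            }
            result.add(tag);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> visited = new ArrayList<>();
        long next = catchUp(2L, new TreeSet<>(Arrays.asList(5L, 6L, 7L)), visited);
        System.out.println(visited); // prints [5, 6, 7]
        System.out.println(next);    // prints 8

        SortedMap<Long, String> tags = new TreeMap<>();
        tags.put(3L, "2023-09-01");
        tags.put(7L, "2023-09-02");
        tags.put(9L, "2023-09-03");
        System.out.println(tagsToDelete(tags, 2)); // prints [2023-09-01]
    }
}
```

Returning the list of tags to delete, rather than deleting inside the loop, keeps the sketch free of side effects; the reviewed code deletes each tag directly through tagManager.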
@@ -94,7 +92,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
        } finally {
            snapshotManager = snapshotManagerFactory.get();
            tagManager = tagManagerFactory.get();

            tagAutoCreation = tagAutoCreationFactory.get();
Maybe you can try to extract tagAutoCreation from CommitterOperator. It is better to maintain only one object in the committer.
done.
@JingsongLi We conducted a 10-day online test, and the results met our expectations.
I think we can use a generic way to implement this.
When I developed #2134
#2188 should fix this.
Purpose
close #1942
Tag generation is driven by checkpoints. Even when no new data is written and no new snapshot is created, tags can still be generated and point to the current latest snapshot.
Tests
API and Format
Documentation