[flink] Auto create new tag for flink checkpoint #1949

Closed
wants to merge 5 commits into from

Conversation

MonsterChenzhuo
Contributor

@MonsterChenzhuo MonsterChenzhuo commented Sep 6, 2023

Purpose

close #1942

[Image] Tag generation is driven by checkpoints. Even when no new data is written, and therefore no new snapshot is created, a tag can still be generated that points to the current latest snapshot.

Tests

API and Format

Documentation

@MonsterChenzhuo MonsterChenzhuo changed the title [WIP][flink] auto create new tag for flink checkpoint [flink] auto create new tag for flink checkpoint Sep 6, 2023
@MonsterChenzhuo MonsterChenzhuo changed the title [flink] auto create new tag for flink checkpoint [flink] Auto create new tag for flink checkpoint Sep 7, 2023
@@ -257,6 +257,11 @@ Paimon's snapshot is similar to flink's checkpoint, and both will automatically
allows snapshots to be retained for a long time. Therefore, we can combine the two features of paimon's tag and flink's
savepoint to achieve incremental recovery of job from the specified savepoint.

At the same time to open the change parameter will also promote the automatic creation of the tag, now although open the "tag.automatic-creation" parameter but,
Contributor

Remove this sentence; there is no sink.savepoint.auto-tag here.

Document it after the sentence "You can set sink.savepoint.auto-tag to true to enable the feature of automatically creating tags for savepoint."

Contributor Author

done.

@@ -257,6 +257,11 @@ Paimon's snapshot is similar to flink's checkpoint, and both will automatically
allows snapshots to be retained for a long time. Therefore, we can combine the two features of paimon's tag and flink's
savepoint to achieve incremental recovery of job from the specified savepoint.

At the same time to open the change parameter will also promote the automatic creation of the tag, now although open the "tag.automatic-creation" parameter but,
Contributor

Just simplify the documentation. For example: "This option will also force the creation of a tag in tag.automatic-creation=process-time mode at an appropriate time, even when no data is written."

Contributor Author

done.

private static final long serialVersionUID = 1L;

Contributor

Don't remove this empty line; this is just a matter of personal code style.

Contributor Author

done.

@@ -61,7 +61,9 @@ public Path tagPath(String tagName) {
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
if (tagExists(tagName)) {
Contributor

Why add this?

Contributor Author

done.

@@ -115,10 +114,20 @@ private TagAutoCreation(
}
}

public void refreshTags() {
Contributor

We should not add a new method to TagAutoCreation.
We should just invoke run.

We can create a method:

    // Creates a tag for the snapshot once (time - delay) has reached the next tag time,
    // then deletes the oldest tags beyond numRetainedMax.
    private void tryToTag(Snapshot snapshot, LocalDateTime time) {
        if (nextTag == null
                || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) {
            LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
            String tagName = periodHandler.timeToTag(thisTag);
            tagManager.createTag(snapshot, tagName);
            nextTag = periodHandler.nextTagTime(thisTag);

            if (numRetainedMax != null) {
                // tags() is sorted by snapshot, so iteration starts at the oldest tag
                SortedMap<Snapshot, String> tags = tagManager.tags();
                if (tags.size() > numRetainedMax) {
                    int toDelete = tags.size() - numRetainedMax;
                    int i = 0;
                    for (String tag : tags.values()) {
                        tagManager.deleteTag(tag, tagDeletion, snapshotManager);
                        i++;
                        if (i == toDelete) {
                            break;
                        }
                    }
                }
            }
        }
    }

And in run:

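    public void run() {
        while (true) {
            if (snapshotManager.snapshotExists(nextSnapshot)) {
                // assumed here: the existing one-argument tryToTag(Snapshot), which
                // extracts the time and delegates to the two-argument method above
                tryToTag(snapshotManager.snapshot(nextSnapshot));
                nextSnapshot++;
            } else {
                // the snapshot may have expired; skip ahead to the earliest one
                Long earliest = snapshotManager.earliestSnapshotId();
                if (earliest != null && earliest > nextSnapshot) {
                    nextSnapshot = earliest;
                } else {
                    break;
                }
            }
        }

        // process-time mode: try to tag the latest snapshot even if nothing new was written;
        // the existence check guards against a table that has no snapshot yet
        if (timeExtractor instanceof ProcessTimeExtractor
                && snapshotManager.snapshotExists(nextSnapshot - 1)) {
            tryToTag(snapshotManager.snapshot(nextSnapshot - 1), LocalDateTime.now());
        }
    }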

Something like the code above.
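To make the intent of this suggestion easier to follow, here is a minimal, self-contained sketch of the same idea. It is not Paimon code: HourlyTagSketch, onSnapshot, and onCheckpoint are hypothetical names, the period is fixed to one hour, and tags are kept in an in-memory map instead of storage. It only illustrates that a process-time check on every checkpoint can tag the latest snapshot again even when no new snapshot arrives.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for process-time tag creation (hourly period). */
class HourlyTagSketch {
    private static final DateTimeFormatter NAME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH");

    private final Duration delay;
    private LocalDateTime nextDue;    // when the next tag becomes due; null before the first tag
    private long latestSnapshot = -1; // -1 means no snapshot has been seen yet

    /** Tag name -> snapshot id, in creation order (stands in for tag storage). */
    final Map<String, Long> tags = new LinkedHashMap<>();

    HourlyTagSketch(Duration delay) {
        this.delay = delay;
    }

    /** Called for every new snapshot, like the loop in run(). */
    void onSnapshot(long snapshotId, LocalDateTime commitTime) {
        latestSnapshot = snapshotId;
        tryToTag(snapshotId, commitTime);
    }

    /** Called on every checkpoint, even when no snapshot was committed. */
    void onCheckpoint(LocalDateTime now) {
        if (latestSnapshot >= 0) {
            tryToTag(latestSnapshot, now);
        }
    }

    private void tryToTag(long snapshotId, LocalDateTime time) {
        LocalDateTime effective = time.minus(delay);
        if (nextDue == null || !effective.isBefore(nextDue)) {
            // The tag is named after the hour that has just completed.
            LocalDateTime periodStart = effective.truncatedTo(ChronoUnit.HOURS).minusHours(1);
            tags.put(NAME.format(periodStart), snapshotId);
            nextDue = periodStart.plusHours(2);
        }
    }

    public static void main(String[] args) {
        HourlyTagSketch sketch = new HourlyTagSketch(Duration.ZERO);
        LocalDateTime start = LocalDateTime.of(2023, 9, 6, 10, 30);
        sketch.onSnapshot(1L, start);              // first snapshot: tags hour 09
        sketch.onCheckpoint(start.plusMinutes(15)); // 10:45, nothing due yet
        sketch.onCheckpoint(start.plusMinutes(35)); // 11:05, tags hour 10 on snapshot 1
        System.out.println(sketch.tags);
    }
}
```

In this sketch, the checkpoint at 11:05 creates a second tag that points to snapshot 1, although no data was written after 10:30.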

Contributor Author

done.

@@ -94,7 +92,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
} finally {
snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();

tagAutoCreation = tagAutoCreationFactory.get();
Contributor

Maybe you can try to extract tagAutoCreation from CommitterOperator. It is better to maintain only one such object in the committer.

Contributor Author

done.

@MonsterChenzhuo MonsterChenzhuo changed the title [flink] Auto create new tag for flink checkpoint [WIP][flink] Auto create new tag for flink checkpoint Sep 14, 2023
@MonsterChenzhuo MonsterChenzhuo changed the title [WIP][flink] Auto create new tag for flink checkpoint [flink] Auto create new tag for flink checkpoint Sep 14, 2023
@MonsterChenzhuo MonsterChenzhuo changed the title [flink] Auto create new tag for flink checkpoint [WIP][flink] Auto create new tag for flink checkpoint Sep 14, 2023
@MonsterChenzhuo MonsterChenzhuo changed the title [WIP][flink] Auto create new tag for flink checkpoint [flink] Auto create new tag for flink checkpoint Sep 14, 2023
@MonsterChenzhuo MonsterChenzhuo changed the title [flink] Auto create new tag for flink checkpoint [WIP][flink] Auto create new tag for flink checkpoint Sep 20, 2023
@MonsterChenzhuo
Contributor Author

[Image] @JingsongLi We conducted a 10-day online test, and the results met our expectations.

@MonsterChenzhuo MonsterChenzhuo changed the title [WIP][flink] Auto create new tag for flink checkpoint [flink] Auto create new tag for flink checkpoint Sep 26, 2023
@JingsongLi
Contributor

I think we can implement this in a more generic way.
For example, if the extractor in TagAutoCreation is a ProcessTimeExtractor, we can launch an async thread to check tag creation.
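As a rough illustration of the async-thread idea, the following sketch runs a tag check periodically on a single daemon thread. AsyncTagChecker and its constructor parameters are hypothetical, not part of Paimon; in a real committer the tagCheck callback would presumably call TagAutoCreation.run(), and that call would need to be synchronized with the commit path, which this sketch does not attempt.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: periodically runs a tag check on one background daemon thread. */
class AsyncTagChecker implements AutoCloseable {
    private final ScheduledExecutorService executor;

    AsyncTagChecker(Runnable tagCheck, long periodMillis) {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "tag-auto-creation-check");
            t.setDaemon(true); // do not keep the JVM alive just for tag checks
            return t;
        });
        executor.scheduleWithFixedDelay(
                () -> {
                    try {
                        tagCheck.run();
                    } catch (RuntimeException e) {
                        // An uncaught exception would cancel all future runs, so report and continue.
                        System.err.println("tag check failed: " + e);
                    }
                },
                periodMillis,
                periodMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Stops the background thread; pending checks are dropped. */
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A caller would create the checker only in process-time mode and close it when the committer shuts down. Whether a background thread is preferable to forcing a check on each checkpoint is a design question this sketch leaves open.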

@JingsongLi
Contributor

While developing #2134, I found that snapshots are our fundamental solution. Another way to solve this problem is to force the creation of snapshots for tables (in this configuration case, or via a new configuration option), which is relatively simple.

@JingsongLi
Contributor

#2188 should fix this.

@JingsongLi JingsongLi closed this Oct 27, 2023
Development

Successfully merging this pull request may close these issues.

[Feature] auto create new tag for flink checkpoint
2 participants