From e5f247419a2c26d27ce56e8e46de98d0984b4b56 Mon Sep 17 00:00:00 2001 From: chenzy15 Date: Fri, 15 Sep 2023 00:04:12 +0800 Subject: [PATCH] Fix the review content --- .../apache/paimon/tag/TagAutoCreation.java | 57 ++++++++++--------- .../AutoTagForSavepointCommitterOperator.java | 16 +++--- 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 17f94cdd5ff1..1fa7938cf570 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -115,56 +115,57 @@ private TagAutoCreation( } public void run() { - long snapshotId = Optional.ofNullable(snapshotManager.latestSnapshotId()).orElse(1L); - LocalDateTime currentTime = - Instant.ofEpochMilli(System.currentTimeMillis()) - .atZone(ZoneId.systemDefault()) - .toLocalDateTime(); - while (true) { if (snapshotManager.snapshotExists(nextSnapshot)) { - Snapshot snapshot = snapshotManager.snapshot(nextSnapshot); - processTags(snapshot, timeExtractor.extract(snapshot)); + tryToTag(snapshotManager.snapshot(nextSnapshot)); nextSnapshot++; } else { Long earliest = snapshotManager.earliestSnapshotId(); if (earliest != null && earliest > nextSnapshot) { nextSnapshot = earliest; } else { - if (snapshotManager.snapshotExists(snapshotId)) { - processTags(snapshotManager.snapshot(snapshotId), Optional.of(currentTime)); - } break; } } } + + if (timeExtractor instanceof ProcessTimeExtractor) { + tryToTag(snapshotManager.snapshot(nextSnapshot - 1), LocalDateTime.now()); + } + } + + private void tryToTag(Snapshot snapshot) { + timeExtractor.extract(snapshot).ifPresent(time -> createTagIfNecessary(snapshot, time)); } - private void processTags(Snapshot snapshot, Optional timeOptional) { - timeOptional.ifPresent(time -> createTags(snapshot, time)); - pruneOldTags(); + private void tryToTag(Snapshot snapshot, LocalDateTime time) { + createTagIfNecessary(snapshot, time); } - private void createTags(Snapshot snapshot, LocalDateTime time) { + private void createTagIfNecessary(Snapshot snapshot, LocalDateTime time) { if (nextTag == null || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { - LocalDateTime thisTag = periodHandler.normalizeToTagTime(time); - String tagName = periodHandler.timeToTag(thisTag); - tagManager.createTag(snapshot, tagName); - nextTag = periodHandler.nextTagTime(thisTag); + createTag(snapshot, time); + pruneOldTags(); } } + private void createTag(Snapshot snapshot, LocalDateTime time) { + LocalDateTime thisTag = periodHandler.normalizeToTagTime(time); + String tagName = periodHandler.timeToTag(thisTag); + tagManager.createTag(snapshot, tagName); + nextTag = periodHandler.nextTagTime(thisTag); + } + private void pruneOldTags() { - if (numRetainedMax == null) { - return; - } - SortedMap tags = tagManager.tags(); - int tagsToDelete = tags.size() - numRetainedMax; - if (tagsToDelete > 0) { - tags.values().stream() - .limit(tagsToDelete) - .forEach(tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager)); + if (numRetainedMax != null) { + SortedMap tags = tagManager.tags(); + int tagsToDelete = tags.size() - numRetainedMax; + if (tagsToDelete > 0) { + tags.values().stream() + .limit(tagsToDelete) + .forEach(tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager)); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 7a4dc02fb15d..c83ddb18f9e2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -109,13 +109,13 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) Field commitField = storeCommitterClass.getDeclaredField(COMMIT_FIELD_NAME); commitField.setAccessible(true); - Object commit = commitField.get(committer); - - Field tagAutoCreationField = - tableCommitImplClass.getDeclaredField(TAG_AUTO_CREATION_FIELD_NAME); - tagAutoCreationField.setAccessible(true); - tagAutoCreation = (TagAutoCreation) tagAutoCreationField.get(commit); - + if (committer != null) { + Object commit = commitField.get(committer); + Field tagAutoCreationField = + tableCommitImplClass.getDeclaredField(TAG_AUTO_CREATION_FIELD_NAME); + tagAutoCreationField.setAccessible(true); + tagAutoCreation = (TagAutoCreation) tagAutoCreationField.get(commit); + } snapshotManager = snapshotManagerFactory.get(); tagManager = tagManagerFactory.get(); identifiersForTagsState = @@ -153,7 +153,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { if (identifiersForTags.remove(checkpointId)) { createTagForIdentifiers(Collections.singletonList(checkpointId)); } - if (tagAutoCreation != null) { + if (tagAutoCreation != null && snapshotManager.latestSnapshotId() != null) { tagAutoCreation.run(); } }