Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzy15 committed Sep 25, 2023
1 parent 6808759 commit e5f2474
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,56 +115,57 @@ private TagAutoCreation(
}

public void run() {
long snapshotId = Optional.ofNullable(snapshotManager.latestSnapshotId()).orElse(1L);
LocalDateTime currentTime =
Instant.ofEpochMilli(System.currentTimeMillis())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();

while (true) {
if (snapshotManager.snapshotExists(nextSnapshot)) {
Snapshot snapshot = snapshotManager.snapshot(nextSnapshot);
processTags(snapshot, timeExtractor.extract(snapshot));
tryToTag(snapshotManager.snapshot(nextSnapshot));
nextSnapshot++;
} else {
Long earliest = snapshotManager.earliestSnapshotId();
if (earliest != null && earliest > nextSnapshot) {
nextSnapshot = earliest;
} else {
if (snapshotManager.snapshotExists(snapshotId)) {
processTags(snapshotManager.snapshot(snapshotId), Optional.of(currentTime));
}
break;
}
}
}

if (timeExtractor instanceof ProcessTimeExtractor) {
tryToTag(snapshotManager.snapshot(nextSnapshot - 1), LocalDateTime.now());
}
}

private void tryToTag(Snapshot snapshot) {
timeExtractor.extract(snapshot).ifPresent(time -> createTagIfNecessary(snapshot, time));
}

private void processTags(Snapshot snapshot, Optional<LocalDateTime> timeOptional) {
timeOptional.ifPresent(time -> createTags(snapshot, time));
pruneOldTags();
private void tryToTag(Snapshot snapshot, LocalDateTime time) {
createTagIfNecessary(snapshot, time);
}

private void createTags(Snapshot snapshot, LocalDateTime time) {
private void createTagIfNecessary(Snapshot snapshot, LocalDateTime time) {
if (nextTag == null
|| isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) {
LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
String tagName = periodHandler.timeToTag(thisTag);
tagManager.createTag(snapshot, tagName);
nextTag = periodHandler.nextTagTime(thisTag);
createTag(snapshot, time);
pruneOldTags();
}
}

private void createTag(Snapshot snapshot, LocalDateTime time) {
LocalDateTime thisTag = periodHandler.normalizeToTagTime(time);
String tagName = periodHandler.timeToTag(thisTag);
tagManager.createTag(snapshot, tagName);
nextTag = periodHandler.nextTagTime(thisTag);
}

private void pruneOldTags() {
if (numRetainedMax == null) {
return;
}
SortedMap<Snapshot, String> tags = tagManager.tags();
int tagsToDelete = tags.size() - numRetainedMax;
if (tagsToDelete > 0) {
tags.values().stream()
.limit(tagsToDelete)
.forEach(tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager));
if (numRetainedMax != null) {
SortedMap<Snapshot, String> tags = tagManager.tags();
int tagsToDelete = tags.size() - numRetainedMax;
if (tagsToDelete > 0) {
tags.values().stream()
.limit(tagsToDelete)
.forEach(tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)

Field commitField = storeCommitterClass.getDeclaredField(COMMIT_FIELD_NAME);
commitField.setAccessible(true);
Object commit = commitField.get(committer);

Field tagAutoCreationField =
tableCommitImplClass.getDeclaredField(TAG_AUTO_CREATION_FIELD_NAME);
tagAutoCreationField.setAccessible(true);
tagAutoCreation = (TagAutoCreation) tagAutoCreationField.get(commit);

if (committer != null) {
Object commit = commitField.get(committer);
Field tagAutoCreationField =
tableCommitImplClass.getDeclaredField(TAG_AUTO_CREATION_FIELD_NAME);
tagAutoCreationField.setAccessible(true);
tagAutoCreation = (TagAutoCreation) tagAutoCreationField.get(commit);
}
snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();
identifiersForTagsState =
Expand Down Expand Up @@ -153,7 +153,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (identifiersForTags.remove(checkpointId)) {
createTagForIdentifiers(Collections.singletonList(checkpointId));
}
if (tagAutoCreation != null) {
if (tagAutoCreation != null && snapshotManager.latestSnapshotId() != null) {
tagAutoCreation.run();
}
}
Expand Down

0 comments on commit e5f2474

Please sign in to comment.