Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzy15 committed Sep 14, 2023
1 parent df865ed commit 7c73f02
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ public PartitionExpire newPartitionExpire(String commitUser) {
@Override
@Nullable
public TagAutoCreation newTagCreationManager() {
TagAutoCreation.Singleton.INSTANCE.init(
return TagAutoCreation.create(
options, snapshotManager(), newTagManager(), newTagDeletion());
return TagAutoCreation.Singleton.INSTANCE.getInstance();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private TagAutoCreation(
}

public void run() {
Long snapshotId = Optional.ofNullable(snapshotManager.latestSnapshotId()).orElse(1L);
long snapshotId = Optional.ofNullable(snapshotManager.latestSnapshotId()).orElse(1L);
LocalDateTime currentTime =
Instant.ofEpochMilli(System.currentTimeMillis())
.atZone(ZoneId.systemDefault())
Expand Down Expand Up @@ -308,35 +308,6 @@ protected DateTimeFormatter formatter() {
}
}

/**
* `Singleton` is an enumeration that implements the singleton pattern to manage the
* initialization and access to a single instance of `TagAutoCreation`. This ensures that there
* is only one `TagAutoCreation` instance throughout the application, helping to manage
* resources and maintain consistency.
*
* <p>The enum has a single element INSTANCE which holds the singleton instance of
* `TagAutoCreation`.
*/
public enum Singleton {
INSTANCE;

private TagAutoCreation tagAutoCreation;

public void init(
CoreOptions options,
SnapshotManager snapshotManager,
TagManager tagManager,
TagDeletion tagDeletion) {
if (tagAutoCreation == null) {
tagAutoCreation = create(options, snapshotManager, tagManager, tagDeletion);
}
}

public TagAutoCreation getInstance() {
return tagAutoCreation;
}
}

@Nullable
public static TagAutoCreation create(
CoreOptions options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
Expand All @@ -46,6 +47,7 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -62,6 +64,10 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
BoundedOneInput {
@VisibleForTesting public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";

private static final String COMMIT_FIELD_NAME = "commit";

private static final String TAG_AUTO_CREATION_FIELD_NAME = "tagAutoCreation";

private static final long serialVersionUID = 1L;

private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
Expand All @@ -70,8 +76,6 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>

private final SerializableSupplier<TagManager> tagManagerFactory;

private final SerializableSupplier<TagAutoCreation> tagAutoCreationFactory;

private final Set<Long> identifiersForTags;

private SnapshotManager snapshotManager;
Expand All @@ -85,12 +89,10 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
public AutoTagForSavepointCommitterOperator(
CommitterOperator<CommitT, GlobalCommitT> commitOperator,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory,
SerializableSupplier<TagAutoCreation> tagAutoCreationFactory) {
SerializableSupplier<TagManager> tagManagerFactory) {
this.commitOperator = commitOperator;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.tagAutoCreationFactory = tagAutoCreationFactory;
this.identifiersForTags = new HashSet<>();
}

Expand All @@ -100,9 +102,22 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
try {
commitOperator.initializeState(streamTaskStateManager);
} finally {
Committer<CommitT, GlobalCommitT> committer = commitOperator.committer;

Class<?> storeCommitterClass = StoreCommitter.class;
Class<?> tableCommitImplClass = TableCommitImpl.class;

Field commitField = storeCommitterClass.getDeclaredField(COMMIT_FIELD_NAME);
commitField.setAccessible(true);
Object commit = commitField.get(committer);

Field tagAutoCreationField =
tableCommitImplClass.getDeclaredField(TAG_AUTO_CREATION_FIELD_NAME);
tagAutoCreationField.setAccessible(true);
tagAutoCreation = (TagAutoCreation) tagAutoCreationField.get(commit);

snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();
tagAutoCreation = tagAutoCreationFactory.get();
identifiersForTagsState =
commitOperator
.getOperatorStateBackend()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
Expand Down Expand Up @@ -202,9 +200,7 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
new AutoTagForSavepointCommitterOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
table::tagManager);
}
SingleOutputStreamOperator<?> committed =
written.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.ThrowingConsumer;

import org.apache.flink.core.execution.SavepointFormatType;
Expand Down Expand Up @@ -163,9 +161,7 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
(CommitterOperator<Committable, ManifestCommittable>)
super.createCommitterOperator(table, commitUser, committableStateManager),
table::snapshotManager,
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
table::tagManager);
}

@Override
Expand All @@ -179,8 +175,6 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
super.createCommitterOperator(
table, commitUser, committableStateManager, initializeFunction),
table::snapshotManager,
table::tagManager,
(SerializableSupplier<TagAutoCreation>)
() -> table.store().newTagCreationManager());
table::tagManager);
}
}

0 comments on commit 7c73f02

Please sign in to comment.