diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java index 3eb36620286be..999aa57482a17 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java @@ -29,7 +29,10 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1; import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1; @@ -39,6 +42,10 @@ public class CheckpointUtils { + public static final Set CKPV2_DATASOURCE_DISALLOW_LIST = new HashSet<>(Arrays.asList( + "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource", + "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource" + )); public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2)) || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) { @@ -51,8 +58,9 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) { throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata()); } - public static boolean targetCheckpointV2(int writeTableVersion) { - return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode(); + public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) { + return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode() + && !CKPV2_DATASOURCE_DISALLOW_LIST.contains(sourceClassName); } // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java index 89bed5cd47157..f20f7ae5d6473 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java @@ -31,6 +31,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import java.util.stream.Stream; @@ -190,4 +192,21 @@ public void testConvertCheckpointWithUseTransitionTime() { assertEquals(completionTime, translatedCheckpoint.getCheckpointKey()); } + + @ParameterizedTest + @CsvSource({ + // version, sourceClassName, expectedResult + // Version >= 8 with allowed sources should return true + "8, org.apache.hudi.utilities.sources.TestSource, true", + "9, org.apache.hudi.utilities.sources.AnotherSource, true", + // Version < 8 should return false regardless of source + "7, org.apache.hudi.utilities.sources.TestSource, false", + "6, org.apache.hudi.utilities.sources.AnotherSource, false", + // Disallowed sources should return false even with version >= 8 + "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false", + "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false" + }) + public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) { + assertEquals(expected, CheckpointUtils.targetCheckpointV2(version, sourceClassName)); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala index 32a97f3fd7c1d..e683521909c1a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala @@ -187,7 +187,8 @@ class HoodieStreamSourceV1(sqlContext: SQLContext, private def translateCheckpoint(commitTime: String): String = { if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { - CheckpointUtils.convertToCheckpointV2ForCommitTime(handlingMode = handlingMode).getCheckpointKey + CheckpointUtils.convertToCheckpointV2ForCommitTime( + new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey } else { commitTime } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala index f825aa7e186f7..2f3436a1c1e2e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala @@ -163,7 +163,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext, } private def translateCheckpoint(commitTime: String): String = { - if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion.versionCode(), getClass.getName)) { commitTime } else { CheckpointUtils.convertToCheckpointV1ForCommitTime( diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 6a98c9442cc37..ed7e938372432 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -190,7 +190,7 @@ protected Option translateCheckpoint(Option lastCheckpoi @Override public Pair>, Checkpoint> fetchNextBatch(Option lastCheckpoint, long sourceLimit) { - if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) { return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit); } else { return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java index 754c9e9fe607f..39e03596867d4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java @@ -104,7 +104,7 @@ protected Option translateCheckpoint(Option lastCheckpoi if (lastCheckpoint.isEmpty()) { return Option.empty(); } - if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) { + if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) { // V2 -> V2 if (lastCheckpoint.get() instanceof StreamerCheckpointV2) { return lastCheckpoint; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index bb408753374f9..10176f9ea8b19 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -47,6 +47,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -139,6 +140,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE; import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH; import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING; +import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.targetCheckpointV2; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING; @@ -542,7 +544,7 @@ private Option getLastPendingCompactionInstant(Option co * @throws Exception in case of any Exception */ public Pair readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException { - // Retrieve the previous round checkpoints, if any + // Retrieve the previous round checkpoints, if any. Consumption of checkpoint Option checkpointToResume = StreamerCheckpointUtils.getCheckpointToResumeFrom(commitsTimelineOpt, cfg, props); LOG.info("Checkpoint to resume from : " + checkpointToResume); @@ -810,12 +812,18 @@ private Pair, JavaRDD> writeToSinkAndDoMetaSync(Stri if (!hasErrors || cfg.commitOnErrors) { Map checkpointCommitMetadata = !getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP) - ? inputBatch.getCheckpointForNextBatch() != null - ? inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata( - cfg.checkpoint, cfg.ignoreCheckpoint) - : new StreamerCheckpointV2((String) null).getCheckpointCommitMetadata( - cfg.checkpoint, cfg.ignoreCheckpoint) - : Collections.emptyMap(); + ? + inputBatch.getCheckpointForNextBatch() != null + ? + inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint) + : + targetCheckpointV2(writeClient.getConfig().getWriteVersion().versionCode(), cfg.sourceClassName) + ? + new StreamerCheckpointV2((String) null).getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint) + : + new StreamerCheckpointV1((String) null).getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint) + : + Collections.emptyMap(); if (hasErrors) { LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java index c4af4385a335c..b1961e532e622 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java @@ -64,7 +64,7 @@ public static Option getCheckpointToResumeFrom(Option getCheckpointToResumeString(Option com resumeCheckpoint = Option.empty(); } else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey()) || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) { - resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) + resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint)); } else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) { //if previous checkpoint is an empty string, skip resume use Option.empty() @@ -124,7 +124,7 @@ static Option getCheckpointToResumeString(Option com } } else if (streamerConfig.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set. - resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion) + resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName) ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint)); } }