Skip to content

Commit

Permalink
S3/GCS incremental source should stick to ckp v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Jan 22, 2025
1 parent 6cf27af commit 75369c9
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
Expand All @@ -39,6 +42,10 @@

public class CheckpointUtils {

/**
 * Fully-qualified class names of sources for which
 * {@link #targetCheckpointV2(int, String)} returns {@code false} regardless of the
 * write table version.
 *
 * <p>Exposed as an unmodifiable set: this is a shared public constant, so it must not be
 * possible for any caller to add or remove entries at runtime.
 */
public static final Set<String> CKPV2_DATASOURCE_DISALLOW_LIST = Collections.unmodifiableSet(
    new HashSet<>(Arrays.asList(
        "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
        "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource"
    )));
public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
|| !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
Expand All @@ -51,8 +58,9 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}

public static boolean targetCheckpointV2(int writeTableVersion) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
/**
 * Decides whether the V2 checkpoint format should be targeted.
 *
 * @param writeTableVersion version code of the table version being written
 * @param sourceClassName   class name of the source; looked up in
 *                          {@link #CKPV2_DATASOURCE_DISALLOW_LIST}
 * @return {@code true} only when the version code is at least that of
 *         {@code HoodieTableVersion.EIGHT} and the source is not on the disallow list
 */
public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) {
  // Table versions below EIGHT never target checkpoint V2.
  if (writeTableVersion < HoodieTableVersion.EIGHT.versionCode()) {
    return false;
  }
  // Disallow-listed sources stay on the V1 format even on newer table versions.
  return !CKPV2_DATASOURCE_DISALLOW_LIST.contains(sourceClassName);
}

// TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.stream.Stream;

Expand Down Expand Up @@ -190,4 +192,21 @@ public void testConvertCheckpointWithUseTransitionTime() {

assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}

/**
 * Checks {@code CheckpointUtils.targetCheckpointV2} against a table of
 * (table version, source class name) pairs and the expected decision for each.
 */
@ParameterizedTest
@CsvSource({
    // Columns: version, sourceClassName, expectedResult

    // Table version 8 or above with a source that is not disallowed -> true
    "8, org.apache.hudi.utilities.sources.TestSource, true",
    "9, org.apache.hudi.utilities.sources.AnotherSource, true",

    // Table version below 8 -> false, whatever the source
    "7, org.apache.hudi.utilities.sources.TestSource, false",
    "6, org.apache.hudi.utilities.sources.AnotherSource, false",

    // Disallowed sources -> false, even at table version 8
    "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
    "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false"
})
public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) {
  boolean actual = CheckpointUtils.targetCheckpointV2(version, sourceClassName);
  assertEquals(expected, actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,

private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
CheckpointUtils.convertToCheckpointV2ForCommitTime(handlingMode = handlingMode).getCheckpointKey
CheckpointUtils.convertToCheckpointV2ForCommitTime(
new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey
} else {
commitTime
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
}

private def translateCheckpoint(commitTime: String): String = {
if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion.versionCode(), getClass.getName)) {
commitTime
} else {
CheckpointUtils.convertToCheckpointV1ForCommitTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi

@Override
public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
} else {
return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoi
if (lastCheckpoint.isEmpty()) {
return Option.empty();
}
if (CheckpointUtils.targetCheckpointV2(writeTableVersion)) {
if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
// V2 -> V2
if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
return lastCheckpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand Down Expand Up @@ -139,6 +140,7 @@
import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.targetCheckpointV2;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
Expand Down Expand Up @@ -542,7 +544,7 @@ private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> co
* @throws Exception in case of any Exception
*/
public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
// Retrieve the previous round checkpoints, if any
// Retrieve the previous round's checkpoint, if any; this is the checkpoint to resume from.
Option<Checkpoint> checkpointToResume = StreamerCheckpointUtils.getCheckpointToResumeFrom(commitsTimelineOpt, cfg, props);
LOG.info("Checkpoint to resume from : " + checkpointToResume);

Expand Down Expand Up @@ -810,12 +812,18 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
if (!hasErrors || cfg.commitOnErrors) {
Map<String, String> checkpointCommitMetadata =
!getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)
? inputBatch.getCheckpointForNextBatch() != null
? inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata(
cfg.checkpoint, cfg.ignoreCheckpoint)
: new StreamerCheckpointV2((String) null).getCheckpointCommitMetadata(
cfg.checkpoint, cfg.ignoreCheckpoint)
: Collections.emptyMap();
?
inputBatch.getCheckpointForNextBatch() != null
?
inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)
:
targetCheckpointV2(writeClient.getConfig().getWriteVersion().versionCode(), cfg.sourceClassName)
?
new StreamerCheckpointV2((String) null).getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)
:
new StreamerCheckpointV1((String) null).getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)
:
Collections.emptyMap();

if (hasErrors) {
LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Option<Checkpoint> getCheckpointToResumeFrom(Option<HoodieTimeline
LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
}
return checkpoint;
Expand Down Expand Up @@ -106,7 +106,7 @@ static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline> com
resumeCheckpoint = Option.empty();
} else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey())
|| !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) {
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
} else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) {
// If the previous checkpoint is an empty string, skip resuming and use Option.empty().
Expand All @@ -124,7 +124,7 @@ static Option<Checkpoint> getCheckpointToResumeString(Option<HoodieTimeline> com
}
} else if (streamerConfig.checkpoint != null) {
// getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion)
resumeCheckpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
}
}
Expand Down

0 comments on commit 75369c9

Please sign in to comment.