[HUDI-8726] s3/gcs incremental source should stick to checkpoint v1 #12688

Open · wants to merge 9 commits into master from HUDI-8726-s3gcs
Conversation

@Davis-Zhang-Onehouse Davis-Zhang-Onehouse commented Jan 22, 2025

Reviewers, please review commit by commit; reading the commit messages will save you a great amount of time.

Change Logs

Fixed the checkpoint version used by the S3/GCS incremental sources (they now stick to checkpoint V1) and massively bumped up test coverage for checkpoints, including coverage that was missing when the related code was introduced.

How the checkpoint is consumed

  public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
    // Retrieve the checkpoint from the previous round, if any; this is where the checkpoint is consumed.
    Option<Checkpoint> checkpointToResume = StreamerCheckpointUtils.getCheckpointToResumeFrom(commitsTimelineOpt, cfg, props);


  public static Option<Checkpoint> getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
                                                             HoodieStreamer.Config streamerConfig,
                                                             TypedProperties props) throws IOException {
    Option<Checkpoint> checkpoint = Option.empty();
    if (commitsTimelineOpt.isPresent()) {
      checkpoint = getCheckpointToResumeString(commitsTimelineOpt, streamerConfig, props);
    }

    LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
    if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
      int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
      // TODO: we need to add data source class! Similarly, check all other places where we use this func as well!
      checkpoint = Option.of(CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
          ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
    }
    return checkpoint;
  }
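
Both the consumption path above and the production path below hinge on CheckpointUtils.targetCheckpointV2. A minimal sketch of the gating this PR relies on, assuming a deny-list of source classes pinned to checkpoint V1 (the constant names and list contents here are assumptions, not the verbatim Hudi implementation):

  // Sketch only: sources on the V1-only list (e.g. the S3/GCS incremental sources)
  // keep checkpoint V1 even when the write table version is 8 or above.
  public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) {
    // V1_ONLY_SOURCES is an assumed Set<String> holding e.g.
    // org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource and
    // org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource.
    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
        && !V1_ONLY_SOURCES.contains(sourceClassName);
  }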

How it is produced (from the deepest call in the stack up to the callers at higher levels)

public class HoodieIncrSource extends RowSource {
  @Override
  public Pair<Option<Dataset<Row>>, Checkpoint> fetchNextBatch(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
    if (CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
      return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
    } else {
      // <== as long as this branch condition is correct, we are good.
      return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
    }
  }

public abstract class RowSource extends Source<Dataset<Row>> {
  @Override
  protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
    throw new UnsupportedOperationException("RowSource#fetchNewData should not be called");
  }

  @Override
  protected final InputBatch<Dataset<Row>> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
    Pair<Option<Dataset<Row>>, Checkpoint> res = fetchNextBatch(lastCheckpoint, sourceLimit);
    return res.getKey().map(dsr -> {
      Dataset<Row> sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
      SchemaProvider rowSchemaProvider =
          UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext);
      Dataset<Row> wrappedDf = HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, HoodieReadFromSourceException.class.getName(),
          "Failed to read from row source", ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS));
      return new InputBatch<>(Option.of(wrappedDf), res.getValue(), rowSchemaProvider);
    }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
  }


public abstract class Source<T> implements SourceCommitCallback, Serializable {
  public final InputBatch<T> fetchNext(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
    InputBatch<T> batch = readFromCheckpoint(translateCheckpoint(lastCheckpoint), sourceLimit);
    // If overriddenSchemaProvider is passed in CLI, use it
    return overriddenSchemaProvider == null ? batch
        : new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
  }

  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
  protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
    LOG.warn("In Hudi 1.0+, the checkpoint based on Hudi timeline is changed. "
        + "If your Source implementation relies on request time as the checkpoint, "
        + "you may consider migrating to completion time-based checkpoint by overriding "
        + "Source#translateCheckpoint and Source#fetchNewDataFromCheckpoint");
    return fetchNewData(
        lastCheckpoint.isPresent()
            ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty(),
        sourceLimit);
  }
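
The warning above names Source#translateCheckpoint as the extension point. For intuition, a hedged sketch of how a V1-only source could keep resuming from a V1 checkpoint (illustrative logic, not the exact Hudi code):

  // Sketch: downgrade any incoming checkpoint to V1 for sources that must stick to V1,
  // so the request-time based resume path keeps seeing the format it expects.
  @Override
  protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
    if (lastCheckpoint.isPresent()
        && !CheckpointUtils.targetCheckpointV2(writeTableVersion, getClass().getName())) {
      return Option.of(new StreamerCheckpointV1(lastCheckpoint.get().getCheckpointKey()));
    }
    return lastCheckpoint;
  }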


  // Call chain continuing upward toward the streamer sync loop:
  public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<Checkpoint> lastCheckpoint, long sourceLimit) { ... }

  Pair<InputBatch, Boolean> fetchNextBatchFromSource(Option<Checkpoint> resumeCheckpoint, HoodieTableMetaClient metaClient) { ... }

  public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException { ... }

  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, ...) {
    if (!hasErrors || cfg.commitOnErrors) {
      // <-------------- generate commit metadata
      Map<String, String> checkpointCommitMetadata =
          !getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)
              ? inputBatch.getCheckpointForNextBatch() != null
                  ? inputBatch.getCheckpointForNextBatch().getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)
                  : CheckpointUtils.targetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
                      ? new StreamerCheckpointV2((String) null).getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)
                      : new StreamerCheckpointV1((String) null).getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint)
              : Collections.emptyMap();

      // <-- blindly writes whatever k/v pairs are in checkpointCommitMetadata.
      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata),
          commitActionType, partitionToReplacedFileIds, Option.empty());
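
To make the commit step concrete: getCheckpointCommitMetadata returns a small string map that the commit persists verbatim. A sketch of what that map might carry (the key names below are illustrative assumptions, not the exact Hudi constants):

  // Illustrative only; the real key constants are defined by the checkpoint classes.
  Map<String, String> metadata = new HashMap<>();
  // A V1 checkpoint is stored under the legacy streamer checkpoint key (assumed name):
  metadata.put("deltastreamer.checkpoint.key", "20250122093000000");
  // A V2 checkpoint would instead use a V2-specific key (assumed name), which is
  // exactly what this PR avoids writing for the S3/GCS incremental sources:
  // metadata.put("streamer.checkpoint.key.v2", "20250122093000000");

Whichever checkpoint class produced the map determines which key the next run resumes from, which is why the branch above must pick StreamerCheckpointV1 for the S3/GCS sources.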
   

Impact

As the title says: the S3/GCS incremental sources now stick to checkpoint V1.

Risk level (write none, low, medium or high below)

low

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

Apart from basic unit tests, we need an e2e functional test of the S3/GCS source. I can only think of writing IT test coverage.
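
For illustration, a rough sketch of the kind of end-to-end assertion such an IT could make (the helper names here are hypothetical):

  // Hypothetical sketch: after one ingestion round from the S3 incremental source on a
  // version-8 table, the checkpoint persisted in the commit metadata should still be V1.
  @Test
  public void testS3IncrSourceSticksToCheckpointV1() throws Exception {
    // runOneIngestionRound and readCheckpointFromLastCommit are hypothetical helpers.
    runOneIngestionRound(S3EventsHoodieIncrSource.class.getName());
    Checkpoint resumed = readCheckpointFromLastCommit();
    assertTrue(resumed instanceof StreamerCheckpointV1);
  }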

@nsivabalan nsivabalan (Contributor) left a comment

Source code changes look good to me.
While I review the test code, reminding @yihua to review the logic and certify.

Pure refactoring of the test classes: moving some methods to the parent
class so that subsequent commits can use them, and extracting methods into
a new class so they can be consumed by later changes.
To test the ingestion flow end to end and ensure we consume the right
checkpoint version and write out the right checkpoint version, we split the
coverage into two parts:
- Test everything interacting with the S3/GCS incremental source. This is
  done by introducing a dummy class as an injected dependency so we can run
  validations and trigger the ingestion code, covering the e2e behavior.
- Test the S3/GCS incremental source itself. This is covered by the
  existing unit tests against the class, which already exercise the
  relevant code.
Previously, in order for EMPTY_ROW_SET_NONE_NULL_CKP to return different
values, we created different BiFunctions with hard-coded values. Now it is
parameterized via RETURN_CHECKPOINT_KEY.

Also moved some constant members to the right util class.
