Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): respect max chunk size in source connector parser #19698

Merged
merged 14 commits into from
Dec 19, 2024

Conversation

stdrc
Copy link
Member

@stdrc stdrc commented Dec 6, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

In SourceStreamChunkBuilder, with the information of SourceCtrlOps, we can actually restrict the chunk size early during chunk building, instead emitting large chunks first and cutting them into small ones later in apply_rate_limit. This has two benefits:

  1. Reduce the chunk splitting cost in apply_rate_limit used by FetchExecutor, FsSourceExecutor, SourceBackfillExecutor and SourceExecutor.
  2. Source connectors and parsers can utilize their better understanding of the payload (e.g. transaction information), to avoid blindly cutting chunks at random positions.

Main changes happened in this PR:

  1. Change SourceCtrlOpts::rate_limit: Option<u32> to split_txn: bool.

    This is because we already pass in a limited chunk size (basically min(stream_chunk_size, rate_limit_burst)) via SourceCtrlOpts::chunk_size. We still need split_txn to control whether we need to cut chunks during (CDC) transactions, as concluded here.

  2. Simplify SourceStreamChunkRowWriter, by directly store a reference to SourceStreamChunkBuilder inside it.

  3. Rewrite SourceStreamChunkBuilder to allow creating chunk during writing to SourceStreamChunkRowWriter. The trick is to add a ready_chunks field.

    This is because that it's possible to parse multiple rows from one SourceMessage, while the parser API doesn't use async stream to yield rows, instead, it directly write to SourceStreamChunkRowWriter.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@stdrc stdrc changed the title change SourceCtrlOpts::rate_limit to split_txn: bool refactor(source): change SourceCtrlOpts::rate_limit to split_txn: bool Dec 6, 2024
@stdrc stdrc changed the title refactor(source): change SourceCtrlOpts::rate_limit to split_txn: bool refactor(source): Dec 6, 2024
@stdrc stdrc changed the title refactor(source): refactor(source): respect max chunk size in Dec 6, 2024
@stdrc stdrc changed the title refactor(source): respect max chunk size in refactor(source): respect max chunk size in source connector parser Dec 6, 2024
@stdrc stdrc force-pushed the rc/merge-hb-and-data-chunk-for-cdc branch from a8d2c05 to 185defc Compare December 6, 2024 08:28
@stdrc stdrc force-pushed the rc/respect-chunk-size-limit branch from d3af95f to 46e5995 Compare December 6, 2024 08:29
Base automatically changed from rc/merge-hb-and-data-chunk-for-cdc to main December 6, 2024 17:35
@stdrc stdrc force-pushed the rc/respect-chunk-size-limit branch 2 times, most recently from 5f97579 to c7cdfb5 Compare December 11, 2024 08:56
@tabVersion
Copy link
Contributor

any context about the refactor?

@stdrc
Copy link
Member Author

stdrc commented Dec 12, 2024

any context about the refactor?

still WIP, will add description when it's ready

@stdrc stdrc force-pushed the rc/respect-chunk-size-limit branch 2 times, most recently from 3f2f0a0 to 1d9e42e Compare December 17, 2024 06:58
@stdrc stdrc marked this pull request as ready for review December 17, 2024 07:07
@stdrc stdrc requested a review from a team as a code owner December 17, 2024 07:07
@graphite-app graphite-app bot requested a review from a team December 17, 2024 07:31
@BugenZhao BugenZhao self-requested a review December 18, 2024 02:45
@stdrc stdrc force-pushed the rc/respect-chunk-size-limit branch from 9fc46c4 to 10e0ea4 Compare December 18, 2024 02:48
@stdrc stdrc force-pushed the rc/respect-chunk-size-limit branch from eeeebb4 to 486f99c Compare December 18, 2024 06:09
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM

if let Some(ref mut txn) = self.ongoing_txn {
txn.len += 1;

if txn.len >= MAX_TRANSACTION_SIZE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a warning log before. Shall we keep it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning log is moved to finish_current_chunk, so that we won't miss any call site of finish_current_chunk that can cause an interrupt of transaction.

let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
builder.finish()
/// Consumes and returns the ready [`StreamChunk`]s.
pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewrite SourceStreamChunkBuilder to allow creating chunk during writing to SourceStreamChunkRowWriter. The trick is to add a ready_chunks field.

This is because that it's possible to parse multiple rows from one SourceMessage, while the parser API doesn't use async stream to yield rows, instead, it directly write to SourceStreamChunkRowWriter.

I understand the motivation to return multiple chunks, but what's reason of separating finish() into finish_current_chunk() + consume_ready_chunks()? I mean, if you just want multiple chunks, changing finish() to return Vec<StreamChunk> can achieve that.

Copy link
Member Author

@stdrc stdrc Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because sometimes we need to call consume_ready_chunks separately even we (the user of SourceStreamChunkBuilder) don't call finish_current_chunk explicitly before it. For example 1) in the arm ParseResult::Rows of match parser.parse_one_with_txn(); 2) after chunk_builder.commit_transaction(id).

stdrc added 14 commits December 18, 2024 17:32
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good

builders: Vec<ArrayBuilderImpl>,
op_builder: Vec<Op>,
vis_builder: BitmapBuilder,
ongoing_txn: Option<Transaction>,
ready_chunks: SmallVec<[StreamChunk; 1]>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SmallVec looks good.

Can you please elaborate in which (unusual) case we'll have multiple ready chunks stashed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When chunk_size is limited to 1, and somehow we parsed 3 rows from one SourceMessage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May update the doc comment to reflect it would build multiple chunks instead of only one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment for the chunk builder in 307ebc2

@stdrc stdrc added this pull request to the merge queue Dec 19, 2024
@stdrc stdrc removed this pull request from the merge queue due to a manual request Dec 19, 2024
@stdrc stdrc force-pushed the rc/respect-chunk-size-limit branch from d986091 to b1a892c Compare December 19, 2024 05:34
@stdrc stdrc enabled auto-merge December 19, 2024 05:53
@stdrc stdrc added this pull request to the merge queue Dec 19, 2024
auto-merge was automatically disabled December 19, 2024 07:08

Pull Request is not mergeable

Merged via the queue into main with commit f3d8e0d Dec 19, 2024
31 of 33 checks passed
@stdrc stdrc deleted the rc/respect-chunk-size-limit branch December 19, 2024 07:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants