feat(batch): batch group chunks for dml #13872
Conversation
Will test performance in the cloud before this gets merged.
Codecov Report — Attention:

@@            Coverage Diff             @@
##             main   #13872      +/-   ##
==========================================
- Coverage   68.04%   67.96%    -0.08%
==========================================
  Files        1536     1536
  Lines      265364   265375       +11
==========================================
- Hits       180557   180368      -189
- Misses      84807    85007      +200

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
src/stream/src/executor/dml.rs
Outdated
    // txn buffer isn't small, so yield.
    for chunk in txn_buffer.vec.drain(..) {
        yield Message::Chunk(chunk);
    }
Should we concatenate them before yielding?
Data from the txn_buffer should already have been concatenated as much as possible, so I think we don't need to do it here.
What about small chunks from different transactions? Oh, chunks in the same buffer always come from the same transaction.
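The yield-vs-concat tradeoff discussed above can be sketched with a simplified model. This is not the real executor code: `Vec<i64>` stands in for a `StreamChunk` of rows, and `concat` is a hypothetical helper mirroring `StreamChunk::concat`.

```rust
// Simplified stand-in: a "chunk" is a Vec<i64> of rows. The real StreamChunk
// and Message types live in risingwave's stream crate; this only illustrates
// the tradeoff of yielding chunks one by one vs. concatenating them first.
fn concat(chunks: Vec<Vec<i64>>) -> Vec<i64> {
    chunks.into_iter().flatten().collect()
}

fn main() {
    // All chunks in one txn buffer come from the same transaction and are
    // assumed to be pre-compacted by the writer side.
    let txn_buffer: Vec<Vec<i64>> = vec![vec![1, 2, 3], vec![4, 5]];

    // Alternative raised in review: concatenate into one chunk before yielding.
    let one_chunk = concat(txn_buffer.clone());
    assert_eq!(one_chunk, vec![1, 2, 3, 4, 5]);

    // Yielding chunks separately preserves their boundaries and avoids the
    // extra copy that concatenation incurs, at the cost of more messages.
    let total: usize = txn_buffer.iter().map(|c| c.len()).sum();
    assert_eq!(total, one_chunk.len());
}
```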
src/stream/src/executor/dml.rs
Outdated
    if txn_buffer_cardinality < batch_group_cardinality {
        mem::swap(&mut txn_buffer.vec, &mut batch_group);
    }
    let concat_chunk = StreamChunk::concat(txn_buffer.vec);
If there are remaining spaces in the "txn buffer", should we also yield some chunks from the "batch group" here? Can we utilize StreamChunkBuilder to do all of this?
If we use that space, I'm afraid it would interleave data from small transactions with each other.
That's true. Will this lead to problems?
It depends on the downstream table's conflict-handling logic. I think the default policy is first-writer-wins, so interleaving could cause nondeterministic behavior under concurrent writes. I'd prefer to keep this invariant.
I see. However if I understand correctly, records written by transactions from different sessions are already interleaved by the shuffle in the downstream. 🤔
Yes, the hash shuffle in the downstream is unavoidable, but I don't want to introduce another variable.
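The swap trick under discussion can be sketched with simplified buffers. Assumptions: `Vec<i64>` stands in for the executor's chunk vector, and the `txn_buffer` / `batch_group` names are taken from the PR; only `std::mem::swap` is real library API here.

```rust
use std::mem;

fn main() {
    // Simplified stand-in for the DML executor state: the current
    // transaction's buffered rows and the accumulated batch group.
    let mut txn_buffer: Vec<i64> = vec![1, 2];        // small transaction
    let mut batch_group: Vec<i64> = vec![10, 20, 30]; // larger pending batch

    // The batch group has no space left, so flush the larger of the two and
    // keep the smaller one buffered. Swapping whole vectors keeps each
    // transaction's rows together instead of interleaving them.
    if txn_buffer.len() < batch_group.len() {
        mem::swap(&mut txn_buffer, &mut batch_group);
    }

    // txn_buffer now holds the larger batch, ready to be sent downstream.
    assert_eq!(txn_buffer, vec![10, 20, 30]);
    // The small transaction stays intact, awaiting a later flush.
    assert_eq!(batch_group, vec![1, 2]);
}
```

Because the swap exchanges whole buffers rather than moving individual chunks, no transaction's rows are ever split between the flushed batch and the retained one.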
Co-authored-by: Bugen Zhao <[email protected]>
The rest LGTM
    // txn buffer is small and batch group has no space, so yield the large one.
    if txn_buffer_cardinality < batch_group_cardinality {
        mem::swap(&mut txn_buffer.vec, &mut batch_group);
    }
IIUC the inserted records might be reordered here, which seems to be inconsistent with this discussion? #13872 (comment)
The current implementation at least guarantees that all data from the same transaction is sent downstream at the same time. Otherwise, part of a transaction's data would be sent downstream first while the rest is kept in the batch_group.
Ok, sounds acceptable to me
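The invariant argued above (each transaction's data reaches the downstream as a unit) can be illustrated with a minimal grouping sketch. This is not the executor's actual code path: `(txn_id, row)` pairs and the `BTreeMap` grouping are illustrative assumptions.

```rust
use std::collections::BTreeMap;

fn main() {
    // Simplified stand-in: (txn_id, row) pairs arriving at the DML executor.
    let rows = vec![(1u32, 'a'), (1, 'b'), (2, 'c'), (2, 'd')];

    // Group rows by transaction so each transaction is flushed as one batch,
    // never split across two downstream flushes.
    let mut groups: BTreeMap<u32, Vec<char>> = BTreeMap::new();
    for (txn, row) in rows {
        groups.entry(txn).or_default().push(row);
    }
    let batches: Vec<Vec<char>> = groups.into_values().collect();

    // Every batch contains exactly one transaction's rows, in arrival order.
    assert_eq!(batches, vec![vec!['a', 'b'], vec!['c', 'd']]);
}
```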
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
./risedev check (or alias ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.