feat(stream): implement streaming part of now executor #6408
Codecov Report
@@           Coverage Diff           @@
##             main    #6408   +/-   ##
=======================================
  Coverage   73.29%   73.29%
=======================================
  Files        1012     1014     +2
  Lines      162172   162331   +159
=======================================
+ Hits       118862   118987   +125
- Misses      43310    43344    +34
we should use |
done
Related: #6472
Rest LGTM.
src/stream/src/executor/now.rs
Outdated
let mut last_timestamp = state_row.and_then(|row| row[0].clone());

while let Some(barrier) = barrier_receiver.recv().await {
    if !barrier.is_update() && !barrier.is_resume() {
Suggest maintaining a `paused` state instead of relying on the assumed sequence "pause -> update -> resume", in case other barriers arrive in between.
fixed
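The suggestion above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code merged in this PR: `BarrierKind` and `PauseTracker` are hypothetical stand-ins for the real barrier type and executor state, and the choice to allow emission again on the resume barrier itself is an assumption.

```rust
/// Hypothetical stand-in for the barrier kinds mentioned in the review.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BarrierKind {
    Plain,
    Pause,
    Update,
    Resume,
}

/// Tracks an explicit `paused` flag instead of assuming that barriers
/// always arrive in the order "pause -> update -> resume".
#[derive(Debug, Default)]
struct PauseTracker {
    paused: bool,
}

impl PauseTracker {
    /// Applies one barrier and returns whether a timestamp may be emitted
    /// for it. Assumption: emission is allowed whenever the tracker is not
    /// paused after the barrier has been applied.
    fn on_barrier(&mut self, kind: BarrierKind) -> bool {
        match kind {
            BarrierKind::Pause => self.paused = true,
            BarrierKind::Resume => self.paused = false,
            // Other barriers never change the paused flag.
            BarrierKind::Plain | BarrierKind::Update => {}
        }
        !self.paused
    }
}

fn main() {
    let mut tracker = PauseTracker::default();
    // Extra plain barriers appear between pause, update and resume.
    let sequence = [
        BarrierKind::Plain,
        BarrierKind::Pause,
        BarrierKind::Plain,
        BarrierKind::Update,
        BarrierKind::Plain,
        BarrierKind::Resume,
        BarrierKind::Plain,
    ];
    for kind in sequence {
        let emit = tracker.on_barrier(kind);
        println!("{:?}: emit = {}", kind, emit);
    }
}
```

With an explicit flag, barriers that arrive between the pause and the resume stay suppressed regardless of their kind, which is the case the reviewer was concerned about.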
state_table.init_epoch(barrier.epoch);

// The first barrier message should be propagated.
yield Message::Barrier(barrier);
So we don't emit the timestamp on the first barrier? Is this expected?
We can't because no message is permitted before the first barrier.
How about emitting the timestamp right after the barrier?
The timestamp will be updated on the second barrier so I think that it is unnecessary.
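The ordering discussed in this thread can be illustrated with a small simulation. It is only a sketch under stated assumptions: `Message` here is a hypothetical two-variant enum, the timestamp is taken to be the barrier's epoch value, and the chunk is assumed to be emitted just before the barrier that triggers it.

```rust
/// Hypothetical, simplified message type: a chunk carries only a timestamp
/// and a barrier carries only its epoch.
#[derive(Debug, PartialEq)]
enum Message {
    Chunk(u64),
    Barrier(u64),
}

/// Produces the output stream for a sequence of barrier epochs.
/// The first barrier is propagated alone, because no message may precede it;
/// every later barrier is preceded by a chunk with the new timestamp.
fn run(epochs: &[u64]) -> Vec<Message> {
    let mut out = Vec::new();
    let mut seen_first_barrier = false;
    for &epoch in epochs {
        if seen_first_barrier {
            // Assumption: the timestamp is derived from the barrier epoch.
            out.push(Message::Chunk(epoch));
        } else {
            seen_first_barrier = true;
        }
        out.push(Message::Barrier(epoch));
    }
    out
}

fn main() {
    for message in run(&[10, 20, 30]) {
        println!("{:?}", message);
    }
}
```

In this model the first timestamp reaches downstream together with the second barrier, which matches the author's point that emitting one right after the first barrier would be redundant.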
src/stream/src/executor/now.rs
Outdated
if last_timestamp.is_some() {
    let chunk_popped = data_chunk_builder
        .append_one_row_from_datums([&last_timestamp].into_iter());
    debug_assert!(chunk_popped.is_none());
}
let data_chunk = data_chunk_builder
    .append_one_row_from_datums([&timestamp].into_iter())
    .unwrap();
Directly use `DataChunk::from_rows`?
fixed
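The idea behind this suggestion is to collect the rows first and build the chunk in one step, rather than pushing rows through a builder and checking for a popped chunk. The sketch below uses hypothetical types only: `Chunk`, its `from_rows` function, and `timestamp_rows` are illustrative and do not reproduce the real `DataChunk` API or its signature.

```rust
/// Hypothetical datum: an optional timestamp value.
type Datum = Option<i64>;
/// Hypothetical row: one datum per column.
type Row = Vec<Datum>;

/// Hypothetical column-oriented chunk.
#[derive(Debug, PartialEq)]
struct Chunk {
    columns: Vec<Vec<Datum>>,
}

impl Chunk {
    /// Illustrative analogue of a `from_rows` constructor: transposes a
    /// slice of rows into `width` columns.
    fn from_rows(rows: &[Row], width: usize) -> Self {
        let mut columns: Vec<Vec<Datum>> = vec![Vec::new(); width];
        for row in rows {
            assert_eq!(row.len(), width, "row width must match the chunk width");
            for (column, datum) in columns.iter_mut().zip(row) {
                column.push(*datum);
            }
        }
        Chunk { columns }
    }
}

/// Collects the rows to emit: the previous timestamp (if any) followed by
/// the current one, mirroring the two appends in the reviewed snippet.
fn timestamp_rows(last: Datum, current: i64) -> Vec<Row> {
    let mut rows = Vec::new();
    if last.is_some() {
        rows.push(vec![last]);
    }
    rows.push(vec![Some(current)]);
    rows
}

fn main() {
    let chunk = Chunk::from_rows(&timestamp_rows(Some(1), 2), 1);
    println!("{:?}", chunk);
}
```

Building from a row list removes the need for the `debug_assert!` on the intermediate append and for the final `unwrap()`, since there is no partially filled builder to flush.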
Generally LGTM. I'd prefer to hold the PR for a while and wait for the frontend part, so that we can test e2e with the dynamic filter.
There will be too many things in this PR:
The latter two cannot be placed in a separate PR before this because they need to use If it is recommended, I will put these into this PR. |
Split it into multiple PRs, but merge them at the same time?
Do you mean that create multiple PRs and other PRs also include codes of |
Therefore let's work on this base PR: #6677 of following multiple PRs first. It defines interfaces of |
Actually, if we expect this PR to be > 90% correct, I don't see the harm in merging it first; once we have e2e tests with the dynamic filter, we can fix any bugs we encounter. Incremental changes are also easier to review. IMO the now executor is independent of the dynamic filter as well. wdyt @TennyZhuang ?
LGTM
* feat(stream): implement now executor (close #6407)
* use if let
* use barrier receiver
* timestamp sanity check
* add now executor builder
* fix gen proto
* rebase main

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
Implement the streaming part of the now executor, which returns the current time for `now()`.
* Emits a timestamp on most barriers.
* The frontend part is not done yet.
Checklist
`./risedev check` (or alias, `./risedev c`)
Documentation
Refer to a related PR or issue link (optional)
#6407