Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): implement streaming part of now executor #6408

Merged
merged 8 commits into from
Dec 2, 2022
Merged

Conversation

soundOfDestiny
Copy link
Contributor

@soundOfDestiny soundOfDestiny commented Nov 16, 2022

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Summarize your change (mandatory)
    implement streaming part of now executor, which will return current time now()
  • How does this PR work? Need a brief introduction for the changed logic (optional)
    emit on most barrier
  • Describe any limitations of the current code (optional)
    frontend part not done

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.

Types of user-facing changes

Please keep the types that apply to your changes, and remove those that do not apply.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.

Refer to a related PR or issue link (optional)

#6407

@codecov
Copy link

codecov bot commented Nov 16, 2022

Codecov Report

Merging #6408 (fe1aeb9) into main (5a0fad1) will increase coverage by 0.00%.
The diff coverage is 79.37%.

@@           Coverage Diff            @@
##             main    #6408    +/-   ##
========================================
  Coverage   73.29%   73.29%            
========================================
  Files        1012     1014     +2     
  Lines      162172   162331   +159     
========================================
+ Hits       118862   118987   +125     
- Misses      43310    43344    +34     
Flag Coverage Δ
rust 73.29% <79.37%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/stream/src/from_proto/mod.rs 0.00% <ø> (ø)
src/stream/src/from_proto/now.rs 0.00% <0.00%> (ø)
src/stream/src/executor/mod.rs 49.07% <78.57%> (+1.13%) ⬆️
src/stream/src/executor/now.rs 91.05% <91.05%> (ø)
src/common/src/util/epoch.rs 90.32% <100.00%> (+0.32%) ⬆️
src/source/src/row_id.rs 91.01% <0.00%> (-1.13%) ⬇️
src/common/src/field_generator/mod.rs 80.10% <0.00%> (-0.54%) ⬇️
src/stream/src/executor/aggregation/minput.rs 96.49% <0.00%> (+0.10%) ⬆️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@soundOfDestiny
Copy link
Contributor Author

we should use barrier_receiver

@soundOfDestiny soundOfDestiny marked this pull request as ready for review November 16, 2022 15:33
@soundOfDestiny
Copy link
Contributor Author

we should use barrier_receiver

done

@TennyZhuang
Copy link
Contributor

related #6472

@soundOfDestiny soundOfDestiny force-pushed the zl_now branch 3 times, most recently from 29a6845 to 794cec8 Compare November 22, 2022 11:28
Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM.

let mut last_timestamp = state_row.and_then(|row| row[0].clone());

while let Some(barrier) = barrier_receiver.recv().await {
if !barrier.is_update() && !barrier.is_resume() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest maintaining a paused state instead of relying on the assumption of "pause -> update -> resume`, in case we have other barriers inside.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest maintaining a paused state instead of relying on the assumption of "pause -> update -> resume`, in case we have other barriers inside.

fixed

state_table.init_epoch(barrier.epoch);

// The first barrier message should be propagated.
yield Message::Barrier(barrier);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't emit the timestamp on the first barrier? Is this expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't emit the timestamp on the first barrier? Is this expected?

We can't because no message is permitted before the first barrier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about emitting the timestamp right after the barrier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about emitting the timestamp right after the barrier?

The timestamp will be updated on the second barrier so I think that it is unnecessary.

Comment on lines 111 to 118
if last_timestamp.is_some() {
let chunk_popped = data_chunk_builder
.append_one_row_from_datums([&last_timestamp].into_iter());
debug_assert!(chunk_popped.is_none());
}
let data_chunk = data_chunk_builder
.append_one_row_from_datums([&timestamp].into_iter())
.unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly use DataChunk::from_rows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly use DataChunk::from_rows?

fixed

@soundOfDestiny soundOfDestiny force-pushed the zl_now branch 3 times, most recently from 73cbf0f to 2dfbafd Compare November 24, 2022 08:38
@TennyZhuang
Copy link
Contributor

Generally LGTM.

I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter.

@soundOfDestiny
Copy link
Contributor Author

soundOfDestiny commented Nov 30, 2022

Generally LGTM.

I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter.

There will be too much things in this PR:

  • backend part
  • frontend part
  • barrier part because we need to modify meta to let it send barriers to actors which have Now Executor

The latter two cannot be placed in a separate PR before this because they need to use Now Executor.

If it is recommended, I will put these into this PR.

@TennyZhuang
Copy link
Contributor

Generally LGTM.
I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter.

There will be too much things in this PR:

  • backend part
  • frontend part
  • barrier part because we need to modify meta to let it send barriers to actors which have Now Executor

The latter two cannot be placed in a separate PR before this because they need to use Now Executor.

If it is recommended, I will put these into this PR.

In multiple PRs, but merge at the same time?

@soundOfDestiny
Copy link
Contributor Author

Generally LGTM.
I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter.

There will be too much things in this PR:

  • backend part
  • frontend part
  • barrier part because we need to modify meta to let it send barriers to actors which have Now Executor

The latter two cannot be placed in a separate PR before this because they need to use Now Executor.
If it is recommended, I will put these into this PR.

In multiple PRs, but merge at the same time?

Do you mean that create multiple PRs and other PRs also include codes of NowExecutor in this PR?

@soundOfDestiny
Copy link
Contributor Author

Generally LGTM.
I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter.

There will be too much things in this PR:

  • backend part
  • frontend part
  • barrier part because we need to modify meta to let it send barriers to actors which have Now Executor

The latter two cannot be placed in a separate PR before this because they need to use Now Executor.
If it is recommended, I will put these into this PR.

In multiple PRs, but merge at the same time?

Therefore let's work on this base PR: #6677 of following multiple PRs first. It defines interfaces of Now.

@jon-chuang
Copy link
Contributor

I'd prefer to hold the PR for a while, and wait for the frontend part, so that we can test e2e with dynamic filter.

Actually, if we expect this PR is > 90% correct, then I don't see the harm of merging first, and then once we have e2e with dynamic filter, we can fix any bugs we encounter. And incremental change is easier to review.

IMO Now executor is also independent of dynamic filter.

wdyt @TennyZhuang ?

Copy link
Contributor

@jon-chuang jon-chuang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mergify mergify bot merged commit 240aaa9 into main Dec 2, 2022
@mergify mergify bot deleted the zl_now branch December 2, 2022 01:13
chenzl25 pushed a commit that referenced this pull request Dec 2, 2022
* feat(stream): implement now executor (close #6407)

* use if let

* use barrier receiver

* timestamp sanity check

* add now executor builder

* fix gen proto

* rebase main

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants