Skip to content

Commit 729f62c

Browse files
author
Liang Zhao
committed
add now executor builder
1 parent fd65678 commit 729f62c

File tree

5 files changed

+64
-1
lines changed

5 files changed

+64
-1
lines changed

proto/stream_plan.proto

+6
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,11 @@ message SortNode {
413413
uint32 sort_column_index = 2;
414414
}
415415

416+
message NowNode {
417+
// Persists emitted 'now'.
418+
catalog.Table state_table = 1;
419+
}
420+
416421
message StreamNode {
417422
oneof node_body {
418423
SourceNode source = 100;
@@ -442,6 +447,7 @@ message StreamNode {
442447
GroupTopNNode group_top_n = 124;
443448
SortNode sort = 125;
444449
WatermarkFilterNode watermark_filter = 126;
450+
NowNode now = 127;
445451
}
446452
// The id for the operator. This is local per mview.
447453
// TODO: should better be a uint32.

src/stream/src/executor/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ pub use lookup::*;
106106
pub use lookup_union::LookupUnionExecutor;
107107
pub use merge::MergeExecutor;
108108
pub use mview::*;
109+
pub use now::NowExecutor;
109110
pub use project::ProjectExecutor;
110111
pub use project_set::*;
111112
pub use rearranged_chain::RearrangedChainExecutor;

src/stream/src/executor/now.rs

-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ pub struct NowExecutor<S: StateStore> {
4242
}
4343

4444
impl<S: StateStore> NowExecutor<S> {
45-
#[allow(dead_code)]
4645
pub fn new(
4746
barrier_receiver: UnboundedReceiver<Barrier>,
4847
executor_id: u64,

src/stream/src/from_proto/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod lookup;
3030
mod lookup_union;
3131
mod merge;
3232
mod mview;
33+
mod now;
3334
mod project;
3435
mod project_set;
3536
mod sink;
@@ -62,6 +63,7 @@ use self::lookup::*;
6263
use self::lookup_union::*;
6364
use self::merge::*;
6465
use self::mview::*;
66+
use self::now::NowExecutorBuilder;
6567
use self::project::*;
6668
use self::project_set::*;
6769
use self::sink::*;
@@ -136,5 +138,6 @@ pub async fn create_executor(
136138
NodeBody::GroupTopN => GroupTopNExecutorBuilder,
137139
NodeBody::Sort => SortExecutorBuilder,
138140
NodeBody::WatermarkFilter => WatermarkFilterBuilder,
141+
NodeBody::Now => NowExecutorBuilder,
139142
}
140143
}

src/stream/src/from_proto/now.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2022 Singularity Data
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use risingwave_common::try_match_expand;
16+
use risingwave_pb::stream_plan::stream_node::NodeBody;
17+
use risingwave_pb::stream_plan::StreamNode;
18+
use risingwave_storage::table::streaming_table::state_table::StateTable;
19+
use risingwave_storage::StateStore;
20+
use tokio::sync::mpsc::unbounded_channel;
21+
22+
use super::ExecutorBuilder;
23+
use crate::error::StreamResult;
24+
use crate::executor::{BoxedExecutor, NowExecutor};
25+
use crate::task::{ExecutorParams, LocalStreamManagerCore};
26+
27+
pub struct NowExecutorBuilder;
28+
29+
#[async_trait::async_trait]
30+
impl ExecutorBuilder for NowExecutorBuilder {
31+
async fn new_boxed_executor(
32+
params: ExecutorParams,
33+
node: &StreamNode,
34+
store: impl StateStore,
35+
stream: &mut LocalStreamManagerCore,
36+
) -> StreamResult<BoxedExecutor> {
37+
let node = try_match_expand!(node.get_node_body().unwrap(), NodeBody::Now)?;
38+
39+
let (sender, barrier_receiver) = unbounded_channel();
40+
stream
41+
.context
42+
.lock_barrier_manager()
43+
.register_sender(params.actor_context.id, sender);
44+
45+
let state_table =
46+
StateTable::from_table_catalog(node.get_state_table()?, store, None).await;
47+
48+
Ok(Box::new(NowExecutor::new(
49+
barrier_receiver,
50+
params.executor_id,
51+
state_table,
52+
)))
53+
}
54+
}

0 commit comments

Comments
 (0)