12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ use async_stack_trace:: StackTrace ;
15
16
use chrono:: NaiveDateTime ;
16
17
use futures:: { pin_mut, StreamExt } ;
17
18
use futures_async_stream:: try_stream;
@@ -23,14 +24,17 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
23
24
use risingwave_common:: util:: epoch:: Epoch ;
24
25
use risingwave_storage:: table:: streaming_table:: state_table:: StateTable ;
25
26
use risingwave_storage:: StateStore ;
27
+ use tokio:: sync:: mpsc:: UnboundedReceiver ;
26
28
27
29
use super :: {
28
- expect_first_barrier , BoxedExecutor , BoxedMessageStream , Executor , Message , PkIndices ,
29
- PkIndicesRef , StreamExecutorError , Watermark ,
30
+ Barrier , BoxedMessageStream , Executor , Message , PkIndices , PkIndicesRef , StreamExecutorError ,
31
+ Watermark ,
30
32
} ;
31
33
32
34
pub struct NowExecutor < S : StateStore > {
33
- input : Option < BoxedExecutor > ,
35
+ /// Receiver of barrier channel.
36
+ barrier_receiver : Option < UnboundedReceiver < Barrier > > ,
37
+
34
38
pk_indices : PkIndices ,
35
39
identity : String ,
36
40
schema : Schema ,
@@ -39,15 +43,19 @@ pub struct NowExecutor<S: StateStore> {
39
43
40
44
impl < S : StateStore > NowExecutor < S > {
41
45
#[ allow( dead_code) ]
42
- pub fn new ( input : BoxedExecutor , executor_id : u64 , state_table : StateTable < S > ) -> Self {
46
+ pub fn new (
47
+ barrier_receiver : UnboundedReceiver < Barrier > ,
48
+ executor_id : u64 ,
49
+ state_table : StateTable < S > ,
50
+ ) -> Self {
43
51
let schema = Schema :: new ( vec ! [ Field {
44
52
data_type: DataType :: Timestamp ,
45
53
name: String :: from( "now" ) ,
46
54
sub_fields: vec![ ] ,
47
55
type_name: String :: default ( ) ,
48
56
} ] ) ;
49
57
Self {
50
- input : Some ( input ) ,
58
+ barrier_receiver : Some ( barrier_receiver ) ,
51
59
pk_indices : vec ! [ 0 ] ,
52
60
identity : format ! ( "NowExecutor {:X}" , executor_id) ,
53
61
schema,
@@ -57,10 +65,14 @@ impl<S: StateStore> NowExecutor<S> {
57
65
58
66
#[ try_stream( ok = Message , error = StreamExecutorError ) ]
59
67
async fn into_stream ( mut self ) {
60
- let mut input = self . input . take ( ) . unwrap ( ) . execute ( ) ;
68
+ let mut barrier_receiver = self . barrier_receiver . take ( ) . unwrap ( ) ;
61
69
62
70
// Consume the first barrier message and initialize state table.
63
- let barrier = expect_first_barrier ( & mut input) . await ?;
71
+ let barrier = barrier_receiver
72
+ . recv ( )
73
+ . stack_trace ( "now_executor_recv_first_barrier" )
74
+ . await
75
+ . unwrap ( ) ;
64
76
self . state_table . init_epoch ( barrier. epoch ) ;
65
77
66
78
// The first barrier message should be propagated.
@@ -78,57 +90,51 @@ impl<S: StateStore> NowExecutor<S> {
78
90
79
91
let mut last_timestamp = state_row. and_then ( |row| row[ 0 ] . clone ( ) ) ;
80
92
81
- #[ for_await]
82
- for msg in input {
83
- if let Message :: Barrier ( barrier) = msg? {
84
- if !barrier. is_update ( ) {
85
- let time_millis = Epoch :: from ( barrier. epoch . curr ) . as_unix_millis ( ) ;
86
- let timestamp = Some ( ScalarImpl :: NaiveDateTime ( NaiveDateTimeWrapper :: new (
87
- NaiveDateTime :: from_timestamp (
88
- ( time_millis / 1000 ) as i64 ,
89
- ( time_millis % 1000 * 1_000_000 ) as u32 ,
90
- ) ,
91
- ) ) ) ;
92
-
93
- let mut data_chunk_builder = DataChunkBuilder :: new (
94
- self . schema ( ) . data_types ( ) ,
95
- if last_timestamp. is_some ( ) { 2 } else { 1 } ,
96
- ) ;
97
- if last_timestamp. is_some ( ) {
98
- let chunk_popped = data_chunk_builder
99
- . append_one_row_from_datums ( [ & last_timestamp] . into_iter ( ) ) ;
100
- debug_assert ! ( chunk_popped. is_none( ) ) ;
101
- }
102
- let data_chunk = data_chunk_builder
103
- . append_one_row_from_datums ( [ & timestamp] . into_iter ( ) )
104
- . unwrap ( ) ;
105
- let mut ops = if last_timestamp. is_some ( ) {
106
- vec ! [ Op :: Delete ]
107
- } else {
108
- vec ! [ ]
109
- } ;
110
- ops. push ( Op :: Insert ) ;
111
- let stream_chunk = StreamChunk :: from_parts ( ops, data_chunk) ;
112
- yield Message :: Chunk ( stream_chunk) ;
113
-
114
- yield Message :: Watermark ( Watermark :: new (
115
- 0 ,
116
- timestamp. as_ref ( ) . unwrap ( ) . clone ( ) ,
117
- ) ) ;
118
-
119
- if last_timestamp. is_some ( ) {
120
- self . state_table . delete ( Row :: new ( vec ! [ last_timestamp] ) ) ;
121
- }
122
- self . state_table . insert ( Row :: new ( vec ! [ timestamp. clone( ) ] ) ) ;
123
- last_timestamp = timestamp;
124
-
125
- self . state_table . commit ( barrier. epoch ) . await ?;
93
+ while let Some ( barrier) = barrier_receiver. recv ( ) . await {
94
+ if !barrier. is_update ( ) {
95
+ let time_millis = Epoch :: from ( barrier. epoch . curr ) . as_unix_millis ( ) ;
96
+ let timestamp = Some ( ScalarImpl :: NaiveDateTime ( NaiveDateTimeWrapper :: new (
97
+ NaiveDateTime :: from_timestamp (
98
+ ( time_millis / 1000 ) as i64 ,
99
+ ( time_millis % 1000 * 1_000_000 ) as u32 ,
100
+ ) ,
101
+ ) ) ) ;
102
+
103
+ let mut data_chunk_builder = DataChunkBuilder :: new (
104
+ self . schema ( ) . data_types ( ) ,
105
+ if last_timestamp. is_some ( ) { 2 } else { 1 } ,
106
+ ) ;
107
+ if last_timestamp. is_some ( ) {
108
+ let chunk_popped = data_chunk_builder
109
+ . append_one_row_from_datums ( [ & last_timestamp] . into_iter ( ) ) ;
110
+ debug_assert ! ( chunk_popped. is_none( ) ) ;
111
+ }
112
+ let data_chunk = data_chunk_builder
113
+ . append_one_row_from_datums ( [ & timestamp] . into_iter ( ) )
114
+ . unwrap ( ) ;
115
+ let mut ops = if last_timestamp. is_some ( ) {
116
+ vec ! [ Op :: Delete ]
126
117
} else {
127
- self . state_table . commit_no_data_expected ( barrier. epoch ) ;
118
+ vec ! [ ]
119
+ } ;
120
+ ops. push ( Op :: Insert ) ;
121
+ let stream_chunk = StreamChunk :: from_parts ( ops, data_chunk) ;
122
+ yield Message :: Chunk ( stream_chunk) ;
123
+
124
+ yield Message :: Watermark ( Watermark :: new ( 0 , timestamp. as_ref ( ) . unwrap ( ) . clone ( ) ) ) ;
125
+
126
+ if last_timestamp. is_some ( ) {
127
+ self . state_table . delete ( Row :: new ( vec ! [ last_timestamp] ) ) ;
128
128
}
129
+ self . state_table . insert ( Row :: new ( vec ! [ timestamp. clone( ) ] ) ) ;
130
+ last_timestamp = timestamp;
129
131
130
- yield Message :: Barrier ( barrier) ;
132
+ self . state_table . commit ( barrier. epoch ) . await ?;
133
+ } else {
134
+ self . state_table . commit_no_data_expected ( barrier. epoch ) ;
131
135
}
136
+
137
+ yield Message :: Barrier ( barrier) ;
132
138
}
133
139
}
134
140
}
0 commit comments