@@ -16,11 +16,10 @@ use async_stack_trace::StackTrace;
16
16
use chrono:: NaiveDateTime ;
17
17
use futures:: { pin_mut, StreamExt } ;
18
18
use futures_async_stream:: try_stream;
19
- use risingwave_common:: array:: { Op , StreamChunk } ;
19
+ use risingwave_common:: array:: { DataChunk , Op , StreamChunk } ;
20
20
use risingwave_common:: catalog:: { Field , Schema } ;
21
21
use risingwave_common:: row:: Row ;
22
22
use risingwave_common:: types:: { DataType , NaiveDateTimeWrapper , ScalarImpl } ;
23
- use risingwave_common:: util:: chunk_coalesce:: DataChunkBuilder ;
24
23
use risingwave_common:: util:: epoch:: Epoch ;
25
24
use risingwave_storage:: StateStore ;
26
25
use tokio:: sync:: mpsc:: UnboundedReceiver ;
@@ -106,18 +105,14 @@ impl<S: StateStore> NowExecutor<S> {
106
105
) ,
107
106
) ) ) ;
108
107
109
- let mut data_chunk_builder = DataChunkBuilder :: new (
110
- schema. data_types ( ) ,
111
- if last_timestamp. is_some ( ) { 2 } else { 1 } ,
108
+ let data_chunk = DataChunk :: from_rows (
109
+ & [ Row :: new ( if last_timestamp. is_some ( ) {
110
+ vec ! [ last_timestamp. clone( ) , timestamp. clone( ) ]
111
+ } else {
112
+ vec ! [ timestamp. clone( ) ]
113
+ } ) ] ,
114
+ & schema. data_types ( ) ,
112
115
) ;
113
- if last_timestamp. is_some ( ) {
114
- let chunk_popped = data_chunk_builder
115
- . append_one_row_from_datums ( [ & last_timestamp] . into_iter ( ) ) ;
116
- debug_assert ! ( chunk_popped. is_none( ) ) ;
117
- }
118
- let data_chunk = data_chunk_builder
119
- . append_one_row_from_datums ( [ & timestamp] . into_iter ( ) )
120
- . unwrap ( ) ;
121
116
let mut ops = if last_timestamp. is_some ( ) {
122
117
vec ! [ Op :: Delete ]
123
118
} else {
0 commit comments