diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 6ea34de857219..41495198632f9 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -176,17 +176,19 @@ async fn test_merger_sum_aggr() { let items = items.clone(); async move { // use a merge operator to collect data from dispatchers before sending them to aggregator + let schema = Schema::new(vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + ]); let merger = Executor::new( ExecutorInfo { // output schema of local simple agg - schema: Schema::new(vec![ - Field::unnamed(DataType::Int64), - Field::unnamed(DataType::Int64), - ]), + schema: schema.clone(), pk_indices: PkIndices::new(), identity: "MergeExecutor".to_string(), }, - MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone()).boxed(), + MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone(), schema) + .boxed(), ); // for global aggregator, we need to sum data and sum row count @@ -217,7 +219,6 @@ async fn test_merger_sum_aggr() { ], MultiMap::new(), vec![], - 0.0, false, ); diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 0316f7cf36796..8136662b5eaf9 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::pin::Pin; use std::task::{Context, Poll}; use anyhow::Context as _; use futures::stream::{FusedStream, FuturesUnordered, StreamFuture}; use prometheus::Histogram; +use risingwave_common::array::StreamChunkBuilder; use risingwave_common::config::MetricLevel; use risingwave_common::metrics::LabelGuardedMetric; use tokio::sync::mpsc; @@ -45,6 +46,7 @@ pub(crate) struct MergeExecutorInput { shared_context: Arc, executor_stats: Arc, info: ExecutorInfo, + chunk_size: usize, } impl MergeExecutorInput { @@ -55,6 +57,7 @@ impl MergeExecutorInput { shared_context: Arc, executor_stats: Arc, info: ExecutorInfo, + chunk_size: usize, ) -> Self { Self { upstream, @@ -63,6 +66,7 @@ impl MergeExecutorInput { shared_context, executor_stats, info, + chunk_size, } } @@ -87,6 +91,8 @@ impl MergeExecutorInput { self.shared_context, self.executor_stats, barrier_rx, + self.chunk_size, + self.info.schema.clone(), ) .boxed(), }; @@ -127,6 +133,12 @@ pub struct MergeExecutor { metrics: Arc, barrier_rx: mpsc::UnboundedReceiver, + + /// Chunk size for the `StreamChunkBuilder` + chunk_size: usize, + + /// Data types for the `StreamChunkBuilder` + schema: Schema, } impl MergeExecutor { @@ -139,6 +151,8 @@ impl MergeExecutor { context: Arc, metrics: Arc, barrier_rx: mpsc::UnboundedReceiver, + chunk_size: usize, + schema: Schema, ) -> Self { Self { actor_context: ctx, @@ -148,6 +162,8 @@ impl MergeExecutor { context, metrics, barrier_rx, + chunk_size, + schema, } } @@ -156,6 +172,7 @@ impl MergeExecutor { actor_id: ActorId, inputs: Vec, shared_context: Arc, + schema: Schema, ) -> Self { use super::exchange::input::LocalInput; use crate::executor::exchange::input::Input; @@ -184,6 +201,8 @@ impl MergeExecutor { shared_context, metrics.into(), barrier_rx, + 100, + schema, ) } @@ -216,6 +235,7 @@ impl MergeExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self: Box) { let select_all = self.upstreams; + let select_all = BufferChunks::new(select_all, self.chunk_size, self.schema); let actor_id = self.actor_context.id; let mut metrics = self.metrics.new_actor_input_metrics( @@ -560,6 +580,91 @@ impl SelectReceivers { } } +/// A wrapper that buffers the `StreamChunk`s from upstream until no more ready items are available. +/// Besides, any message other than `StreamChunk` will trigger the buffered `StreamChunk`s +/// to be emitted immediately along with the message itself. +struct BufferChunks { + inner: S, + chunk_builder: StreamChunkBuilder, + + /// The items to be emitted. Whenever there's something here, we should return a `Poll::Ready` immediately. + pending_items: VecDeque, +} + +impl BufferChunks { + pub(super) fn new(inner: S, chunk_size: usize, schema: Schema) -> Self { + assert!(chunk_size > 0); + let chunk_builder = StreamChunkBuilder::new(chunk_size, schema.data_types()); + Self { + inner, + chunk_builder, + pending_items: VecDeque::new(), + } + } +} + +impl std::ops::Deref for BufferChunks { + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for BufferChunks { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Stream for BufferChunks +where + S: Stream + Unpin, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if let Some(item) = self.pending_items.pop_front() { + return Poll::Ready(Some(item)); + } + + match self.inner.poll_next_unpin(cx) { + Poll::Pending => { + return if let Some(chunk_out) = self.chunk_builder.take() { + Poll::Ready(Some(Ok(MessageInner::Chunk(chunk_out)))) + } else { + Poll::Pending + } + } + + Poll::Ready(Some(result)) => { + if let Ok(MessageInner::Chunk(chunk)) = result { + for row in chunk.records() { + if let Some(chunk_out) = self.chunk_builder.append_record(row) { + self.pending_items + .push_back(Ok(MessageInner::Chunk(chunk_out))); + } + } + } else { + return if let Some(chunk_out) = self.chunk_builder.take() { + self.pending_items.push_back(result); + Poll::Ready(Some(Ok(MessageInner::Chunk(chunk_out)))) + } else { + Poll::Ready(Some(result)) + }; + } + } + + Poll::Ready(None) => { + // See also the comments in `SelectReceivers::poll_next`. + unreachable!("SelectReceivers should never return None"); + } + } + } + } +} + #[cfg(test)] mod tests { use std::sync::atomic::{AtomicBool, Ordering}; @@ -582,18 +687,93 @@ mod tests { use tonic::{Request, Response, Status, Streaming}; use super::*; - use crate::executor::exchange::input::{Input, RemoteInput}; + use crate::executor::exchange::input::{Input, LocalInput, RemoteInput}; use crate::executor::exchange::permit::channel_for_test; use crate::executor::{BarrierInner as Barrier, MessageInner as Message}; use crate::task::barrier_test_utils::LocalBarrierTestEnv; use crate::task::test_utils::helper_make_local_actor; - fn build_test_chunk(epoch: u64) -> StreamChunk { - // The number of items in `ops` is the epoch count. - let ops = vec![Op::Insert; epoch as usize]; + fn build_test_chunk(size: u64) -> StreamChunk { + let ops = vec![Op::Insert; size as usize]; StreamChunk::new(ops, vec![]) } + #[tokio::test] + async fn test_buffer_chunks() { + let test_env = LocalBarrierTestEnv::for_test().await; + + let (tx, rx) = channel_for_test(); + let input = LocalInput::new(rx, 1).boxed_input(); + let mut buffer = BufferChunks::new(input, 100, Schema::new(vec![])); + + // Send a chunk + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { + assert_eq!(chunk.ops().len() as u64, 10); + }); + + // Send 2 chunks and expect them to be merged. + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { + assert_eq!(chunk.ops().len() as u64, 20); + }); + + // Send a watermark. + tx.send(Message::Watermark(Watermark { + col_idx: 0, + data_type: DataType::Int64, + val: ScalarImpl::Int64(233), + })) + .await + .unwrap(); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => { + assert_eq!(watermark.val, ScalarImpl::Int64(233)); + }); + + // Send 2 chunks before a watermark. Expect the 2 chunks to be merged and the watermark to be emitted. + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + tx.send(Message::Watermark(Watermark { + col_idx: 0, + data_type: DataType::Int64, + val: ScalarImpl::Int64(233), + })) + .await + .unwrap(); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { + assert_eq!(chunk.ops().len() as u64, 20); + }); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => { + assert_eq!(watermark.val, ScalarImpl::Int64(233)); + }); + + // Send a barrier. + let barrier = Barrier::new_test_barrier(test_epoch(1)); + test_env.inject_barrier(&barrier, [2]); + tx.send(Message::Barrier(barrier.clone().into_dispatcher())) + .await + .unwrap(); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => { + assert_eq!(barrier_epoch.curr, test_epoch(1)); + }); + + // Send 2 chunks before a barrier. Expect the 2 chunks to be merged and the barrier to be emitted. + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); + let barrier = Barrier::new_test_barrier(test_epoch(2)); + test_env.inject_barrier(&barrier, [2]); + tx.send(Message::Barrier(barrier.clone().into_dispatcher())) + .await + .unwrap(); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { + assert_eq!(chunk.ops().len() as u64, 20); + }); + assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => { + assert_eq!(barrier_epoch.curr, test_epoch(2)); + }); + } + #[tokio::test] async fn test_merger() { const CHANNEL_NUMBER: usize = 10; @@ -639,9 +819,7 @@ mod tests { let handle = tokio::spawn(async move { for (idx, epoch) in epochs { if idx % 20 == 0 { - tx.send(Message::Chunk(build_test_chunk(idx))) - .await - .unwrap(); + tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap(); } else { tx.send(Message::Watermark(Watermark { col_idx: (idx as usize / 20 + tx_id) % CHANNEL_NUMBER, @@ -663,18 +841,25 @@ mod tests { handles.push(handle); } - let merger = - MergeExecutor::for_test(actor_id, rxs, barrier_test_env.shared_context.clone()); + let merger = MergeExecutor::for_test( + actor_id, + rxs, + barrier_test_env.shared_context.clone(), + Schema::new(vec![]), + ); let mut merger = merger.boxed().execute(); for (idx, epoch) in epochs { - // expect n chunks if idx % 20 == 0 { - for _ in 0..CHANNEL_NUMBER { + // expect 1 or more chunks with 100 rows in total + let mut count = 0usize; + while count < 100 { assert_matches!(merger.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { - assert_eq!(chunk.ops().len() as u64, idx); + count += chunk.ops().len(); }); } + assert_eq!(count, 100); } else if idx as usize / 20 >= CHANNEL_NUMBER - 1 { + // expect n watermarks for _ in 0..CHANNEL_NUMBER { assert_matches!(merger.next().await.unwrap().unwrap(), Message::Watermark(watermark) => { assert_eq!(watermark.val, ScalarImpl::Int64((idx - 20 * (CHANNEL_NUMBER as u64 - 1)) as i64)); @@ -775,6 +960,8 @@ mod tests { ctx.clone(), metrics.clone(), barrier_rx, + 100, + Schema::new(vec![]), ) .boxed() .execute(); @@ -810,9 +997,8 @@ mod tests { } // 3. Send a chunk. - send!([untouched, old], Message::Chunk(StreamChunk::default())); - recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk twice. - recv!().unwrap().as_chunk().unwrap(); + send!([untouched, old], Message::Chunk(build_test_chunk(1))); + assert_eq!(2, recv!().unwrap().as_chunk().unwrap().cardinality()); // We should be able to receive the chunk twice. assert_recv_pending!(); send!( @@ -825,9 +1011,8 @@ mod tests { recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier. // 5. Send a chunk. - send!([untouched, new], Message::Chunk(StreamChunk::default())); - recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk twice, since old is removed. - recv!().unwrap().as_chunk().unwrap(); + send!([untouched, new], Message::Chunk(build_test_chunk(1))); + assert_eq!(2, recv!().unwrap().as_chunk().unwrap().cardinality()); // We should be able to receive the chunk twice. assert_recv_pending!(); } diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 6ea579afea524..d94daa926d97b 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -41,10 +41,6 @@ struct Inner { /// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks. last_nondec_expr_values: Vec>, - /// the selectivity threshold which should be in `[0,1]`. for the chunk with selectivity less - /// than the threshold, the Project executor will construct a new chunk before expr evaluation, - materialize_selectivity_threshold: f64, - /// Whether there are likely no-op updates in the output chunks, so that eliminating them with /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial. noop_update_hint: bool, @@ -58,7 +54,6 @@ impl ProjectExecutor { exprs: Vec, watermark_derivations: MultiMap, nondecreasing_expr_indices: Vec, - materialize_selectivity_threshold: f64, noop_update_hint: bool, ) -> Self { let n_nondecreasing_exprs = nondecreasing_expr_indices.len(); @@ -70,7 +65,6 @@ impl ProjectExecutor { watermark_derivations, nondecreasing_expr_indices, last_nondec_expr_values: vec![None; n_nondecreasing_exprs], - materialize_selectivity_threshold, noop_update_hint, }, } @@ -96,11 +90,6 @@ impl Inner { &self, chunk: StreamChunk, ) -> StreamExecutorResult> { - let chunk = if chunk.selectivity() <= self.materialize_selectivity_threshold { - chunk.compact() - } else { - chunk - }; let (data_chunk, ops) = chunk.into_parts(); let mut projected_columns = Vec::new(); @@ -242,7 +231,6 @@ mod tests { vec![test_expr], MultiMap::new(), vec![], - 0.0, false, ); let mut project = project.boxed().execute(); @@ -325,7 +313,6 @@ mod tests { vec![a_expr, b_expr, c_expr], MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()), vec![2], - 0.0, false, ); let mut project = project.boxed().execute(); diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs index d6c7ce157a931..f56090f359eef 100644 --- a/src/stream/src/from_proto/merge.rs +++ b/src/stream/src/from_proto/merge.rs @@ -31,6 +31,7 @@ impl MergeExecutorBuilder { actor_context: ActorContextRef, info: ExecutorInfo, node: &MergeNode, + chunk_size: usize, ) -> StreamResult { let upstreams = node.get_upstream_actor_id(); let upstream_fragment_id = node.get_upstream_fragment_id(); @@ -76,6 +77,7 @@ impl MergeExecutorBuilder { shared_context, executor_stats, info, + chunk_size, )) } } @@ -98,6 +100,7 @@ impl ExecutorBuilder for MergeExecutorBuilder { params.actor_context, params.info, node, + params.env.config().developer.chunk_size, )? .into_executor(barrier_rx)) } diff --git a/src/stream/src/from_proto/project.rs b/src/stream/src/from_proto/project.rs index 177045b3eba7d..62d37b4326469 100644 --- a/src/stream/src/from_proto/project.rs +++ b/src/stream/src/from_proto/project.rs @@ -15,7 +15,6 @@ use multimap::MultiMap; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::build_non_strict_from_prost; -use risingwave_pb::expr::expr_node::RexNode; use risingwave_pb::stream_plan::ProjectNode; use super::*; @@ -53,20 +52,12 @@ impl ExecutorBuilder for ProjectExecutorBuilder { .iter() .map(|idx| *idx as usize) .collect(); - let extremely_light = node.get_select_list().iter().all(|expr| { - matches!( - expr.get_rex_node().unwrap(), - RexNode::InputRef(_) | RexNode::Constant(_) - ) - }); - let materialize_selectivity_threshold = if extremely_light { 0.0 } else { 0.5 }; let exec = ProjectExecutor::new( params.actor_context, input, project_exprs, watermark_derivations, nondecreasing_expr_indices, - materialize_selectivity_threshold, node.noop_update_hint, ); Ok((params.info, exec).into()) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 83a7137d4403b..799f98fc1a848 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -321,6 +321,7 @@ impl StreamActorManager { upstream_node: &StreamNode, actor_context: &ActorContextRef, shared_context: &Arc, + chunk_size: usize, ) -> StreamResult { let info = Self::get_executor_info( upstream_node, @@ -337,6 +338,7 @@ impl StreamActorManager { actor_context.clone(), info, upstream_merge, + chunk_size, ) } @@ -352,8 +354,13 @@ impl StreamActorManager { state_store: impl StateStore, ) -> StreamResult { let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap(); - let upstream = - self.create_snapshot_backfill_input(upstream_node, actor_context, shared_context)?; + let chunk_size = env.config().developer.chunk_size; + let upstream = self.create_snapshot_backfill_input( + upstream_node, + actor_context, + shared_context, + chunk_size, + )?; let table_desc: &StorageTableDesc = node.get_table_desc()?; @@ -387,7 +394,7 @@ impl StreamActorManager { output_indices, actor_context.clone(), progress, - env.config().developer.chunk_size, + chunk_size, node.rate_limit.map(|x| x as _), barrier_rx, self.streaming_metrics.clone(), @@ -404,10 +411,7 @@ impl StreamActorManager { troubled_info.identity = format!("{} (troubled)", info.identity); Ok(( info, - TroublemakerExecutor::new( - (troubled_info, executor).into(), - env.config().developer.chunk_size, - ), + TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size), ) .into()) } else {