From a12684af2ab9df440695bf109e83c64e198ff422 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 1 Nov 2024 15:25:00 +0800 Subject: [PATCH] fix Signed-off-by: xxchan --- src/stream/src/executor/dispatch.rs | 12 +++++++++--- src/stream/src/executor/integration_tests.rs | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 7158e2f2c8a23..1abfc11067764 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1066,7 +1066,7 @@ mod tests { use super::*; use crate::executor::exchange::output::Output; use crate::executor::exchange::permit::channel_for_test; - use crate::executor::{BarrierInner as Barrier, MessageInner as Message}; + use crate::executor::{BarrierInner as Barrier, MergeExecutor, MessageInner as Message}; use crate::task::barrier_test_utils::LocalBarrierTestEnv; use crate::task::test_utils::helper_make_local_actor; @@ -1174,7 +1174,7 @@ mod tests { #[tokio::test] async fn test_configuration_change() { - let _schema = Schema { fields: vec![] }; + let schema = Schema { fields: vec![] }; let (tx, rx) = channel_for_test(); let actor_id = 233; let fragment_id = 666; @@ -1249,7 +1249,13 @@ mod tests { let input = Executor::new( Default::default(), - MergeExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()).boxed(), + MergeExecutor::for_test( + actor_id, + vec![rx], + barrier_test_env.shared_context.clone(), + schema, + ) + .boxed(), ); let executor = Box::new(DispatchExecutor::new( input, diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index a8e8f29ca40ef..0038ed00bce65 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -118,7 +118,7 @@ async fn test_merger_sum_aggr() { // create 17 local aggregation actors for _ in 0..17 { let (tx, rx) = channel_for_test(); - let (actor_future, channel) = make_actor(rx); + let (actor_future, channel) = make_actor(vec![rx]); outputs.push(channel); actor_futures.push(actor_future); inputs.push(Box::new(LocalOutput::new(233, tx)) as BoxedOutput); @@ -140,7 +140,7 @@ async fn test_merger_sum_aggr() { pk_indices: PkIndices::new(), identity: "MergeExecutor".to_string(), }, - MergeExecutor::for_test(actor_id, rx, shared_context.clone(), schema).boxed(), + MergeExecutor::for_test(actor_id, vec![rx], shared_context.clone(), schema).boxed(), ); let dispatcher = DispatchExecutor::new( receiver_op,