Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 1, 2024
1 parent 28c1465 commit a12684a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
12 changes: 9 additions & 3 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ mod tests {
use super::*;
use crate::executor::exchange::output::Output;
use crate::executor::exchange::permit::channel_for_test;
use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
use crate::executor::{BarrierInner as Barrier, MergeExecutor, MessageInner as Message};
use crate::task::barrier_test_utils::LocalBarrierTestEnv;
use crate::task::test_utils::helper_make_local_actor;

Expand Down Expand Up @@ -1174,7 +1174,7 @@ mod tests {

#[tokio::test]
async fn test_configuration_change() {
let _schema = Schema { fields: vec![] };
let schema = Schema { fields: vec![] };
let (tx, rx) = channel_for_test();
let actor_id = 233;
let fragment_id = 666;
Expand Down Expand Up @@ -1249,7 +1249,13 @@ mod tests {

let input = Executor::new(
Default::default(),
MergeExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()).boxed(),
MergeExecutor::for_test(
actor_id,
vec![rx],
barrier_test_env.shared_context.clone(),
schema,
)
.boxed(),
);
let executor = Box::new(DispatchExecutor::new(
input,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn test_merger_sum_aggr() {
// create 17 local aggregation actors
for _ in 0..17 {
let (tx, rx) = channel_for_test();
let (actor_future, channel) = make_actor(rx);
let (actor_future, channel) = make_actor(vec![rx]);
outputs.push(channel);
actor_futures.push(actor_future);
inputs.push(Box::new(LocalOutput::new(233, tx)) as BoxedOutput);
Expand All @@ -140,7 +140,7 @@ async fn test_merger_sum_aggr() {
pk_indices: PkIndices::new(),
identity: "MergeExecutor".to_string(),
},
MergeExecutor::for_test(actor_id, rx, shared_context.clone(), schema).boxed(),
MergeExecutor::for_test(actor_id, vec![rx], shared_context.clone(), schema).boxed(),
);
let dispatcher = DispatchExecutor::new(
receiver_op,
Expand Down

0 comments on commit a12684a

Please sign in to comment.