Skip to content

Commit

Permalink
feat(stream): merge stream chunks at MergeExecutor (#17968)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Oct 28, 2024
1 parent 6263ea6 commit e99ad67
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 54 deletions.
13 changes: 7 additions & 6 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,19 @@ async fn test_merger_sum_aggr() {
let items = items.clone();
async move {
// use a merge operator to collect data from dispatchers before sending them to aggregator
let schema = Schema::new(vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
]);
let merger = Executor::new(
ExecutorInfo {
// output schema of local simple agg
schema: Schema::new(vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
]),
schema: schema.clone(),
pk_indices: PkIndices::new(),
identity: "MergeExecutor".to_string(),
},
MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone()).boxed(),
MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone(), schema)
.boxed(),
);

// for global aggregator, we need to sum data and sum row count
Expand Down Expand Up @@ -217,7 +219,6 @@ async fn test_merger_sum_aggr() {
],
MultiMap::new(),
vec![],
0.0,
false,
);

Expand Down
Loading

0 comments on commit e99ad67

Please sign in to comment.