From ef5835e6b3b633b86d9eeaa98a034b59c9cc18c8 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 25 Dec 2024 00:59:53 +0800 Subject: [PATCH] feat(pb): box stream NodeBody to reduce stack memory usage (#19911) Signed-off-by: xxchan --- Cargo.lock | 56 ++++++++++++++----- Cargo.toml | 2 + Makefile.toml | 13 +---- ci/scripts/common.sh | 1 - ci/scripts/e2e-source-test.sh | 1 - ci/scripts/pr-unit-test.sh | 4 -- .../src/expr/user_defined_function.rs | 4 +- .../src/handler/alter_table_column.rs | 4 +- src/frontend/src/handler/create_sink.rs | 4 +- .../optimizer/plan_node/stream_asof_join.rs | 4 +- .../plan_node/stream_cdc_table_scan.rs | 12 ++-- .../optimizer/plan_node/stream_changelog.rs | 4 +- .../src/optimizer/plan_node/stream_dedup.rs | 4 +- .../optimizer/plan_node/stream_delta_join.rs | 4 +- .../src/optimizer/plan_node/stream_dml.rs | 4 +- .../plan_node/stream_dynamic_filter.rs | 4 +- .../plan_node/stream_eowc_over_window.rs | 4 +- .../optimizer/plan_node/stream_exchange.rs | 4 +- .../src/optimizer/plan_node/stream_expand.rs | 4 +- .../src/optimizer/plan_node/stream_filter.rs | 4 +- .../optimizer/plan_node/stream_fs_fetch.rs | 4 +- .../stream_global_approx_percentile.rs | 2 +- .../optimizer/plan_node/stream_group_topn.rs | 4 +- .../optimizer/plan_node/stream_hash_agg.rs | 4 +- .../optimizer/plan_node/stream_hash_join.rs | 4 +- .../optimizer/plan_node/stream_hop_window.rs | 4 +- .../stream_local_approx_percentile.rs | 2 +- .../optimizer/plan_node/stream_materialize.rs | 4 +- .../src/optimizer/plan_node/stream_now.rs | 4 +- .../optimizer/plan_node/stream_over_window.rs | 4 +- .../src/optimizer/plan_node/stream_project.rs | 4 +- .../optimizer/plan_node/stream_project_set.rs | 4 +- .../optimizer/plan_node/stream_row_id_gen.rs | 4 +- .../optimizer/plan_node/stream_row_merge.rs | 4 +- .../optimizer/plan_node/stream_simple_agg.rs | 4 +- .../src/optimizer/plan_node/stream_sink.rs | 4 +- .../src/optimizer/plan_node/stream_sort.rs | 4 +- .../src/optimizer/plan_node/stream_source.rs | 2 +- .../optimizer/plan_node/stream_source_scan.rs | 2 +- .../plan_node/stream_stateless_simple_agg.rs | 4 +- .../optimizer/plan_node/stream_table_scan.rs | 6 +- .../plan_node/stream_temporal_join.rs | 4 +- .../src/optimizer/plan_node/stream_topn.rs | 4 +- .../src/optimizer/plan_node/stream_values.rs | 4 +- .../plan_node/stream_watermark_filter.rs | 4 +- src/frontend/src/stream_fragmenter/mod.rs | 4 +- .../stream_fragmenter/rewrite/delta_join.rs | 14 +++-- src/meta/src/controller/fragment.rs | 4 +- src/meta/src/rpc/ddl_controller.rs | 2 +- src/meta/src/stream/stream_graph/actor.rs | 12 ++-- src/meta/src/stream/stream_graph/fragment.rs | 2 +- src/meta/src/stream/test_fragmenter.rs | 32 +++++------ src/prost/Cargo.toml | 3 + src/prost/build.rs | 49 ++++++++++++++++ src/prost/src/lib.rs | 33 ++++++++--- 55 files changed, 239 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 601c2966b37d1..31034235c93dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7396,7 +7396,7 @@ checksum = "f271a476bbaa9d2139e1e1a5beb869c6119e805a0b67ad2b2857e4a8785b111a" dependencies = [ "prettyplease 0.2.15", "proc-macro2", - "prost-build 0.13.1", + "prost-build 0.13.4", "quote", "syn 2.0.87", "tonic-build", @@ -9548,6 +9548,15 @@ dependencies = [ "prost-derive 0.13.1", ] +[[package]] +name = "prost" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" +dependencies = [ + "bytes", + "prost-derive 0.13.4", +] + [[package]] name = "prost-build" version = "0.11.9" @@ -9572,20 +9581,18 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" dependencies = [ - "bytes", - "heck 0.5.0", - "itertools 0.13.0", + "heck 0.4.1", + "itertools 0.10.5", "log", - "multimap 0.10.0", + "multimap 0.8.3", "once_cell", "petgraph", "prettyplease 0.2.15", - "prost 0.13.1", - "prost-types 0.13.1", + "prost 0.13.4", + "prost-types 0.13.4", "regex", "syn 2.0.87", "tempfile", @@ -9624,7 +9631,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "prost-derive" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" +dependencies = [ + "anyhow", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.87", @@ -9673,6 +9692,14 @@ dependencies = [ "prost 0.13.1", ] +[[package]] +name = "prost-types" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" +dependencies = [ + "prost 0.13.4", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -11124,7 +11151,7 @@ dependencies = [ "madsim-tokio", "num-bigint", "prost 0.13.1", - "prost-build 0.13.1", + "prost-build 0.13.4", "prost-reflect", "prost-types 0.13.1", "protox", @@ -11790,10 +11817,11 @@ dependencies = [ "pbjson", "pbjson-build", "prost 0.13.1", - "prost-build 0.13.1", + "prost-build 0.13.4", "prost-helpers", "risingwave_error", "serde", + "static_assertions", "strum 0.26.3", "thiserror 1.0.63", "walkdir", @@ -14731,7 +14759,7 @@ checksum = "fe4ee8877250136bd7e3d2331632810a4df4ea5e004656990d8d66d2f5ee8a67" dependencies = [ "prettyplease 0.2.15", "proc-macro2", - "prost-build 0.13.1", + "prost-build 0.13.4", "quote", "syn 2.0.87", ] diff --git a/Cargo.toml b/Cargo.toml index 4e7cc205e15ad..c7177b037f736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -375,6 +375,8 @@ sqlx = { git = "https://github.com/madsim-rs/sqlx.git", rev = "3efe6d0065963db2a futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" } # patch to remove preserve_order from serde_json bson = { git = "https://github.com/risingwavelabs/bson-rust", rev = "e5175ec" } +# TODO: unpatch after PR merged https://github.com/tokio-rs/prost/pull/1210 +prost-build = { git = "https://github.com/xxchan/prost.git", rev = "0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" } [workspace.metadata.dylint] libraries = [{ path = "./lints" }] diff --git a/Makefile.toml b/Makefile.toml index 29e1064d4586f..6079a69e6ab4d 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -382,11 +382,7 @@ ln -s "$(pwd)/target/${RISEDEV_BUILD_TARGET_DIR}${BUILD_MODE_DIR}/risingwave" "$ [tasks.post-build-risingwave] category = "RiseDev - Build" description = "Copy RisingWave binaries to bin" -condition = { env_set = [ - "ENABLE_BUILD_RUST", -], env_not_set = [ - "USE_SYSTEM_RISINGWAVE", -] } +condition = { env_set = ["ENABLE_BUILD_RUST"], env_not_set = ["USE_SYSTEM_RISINGWAVE"] } dependencies = [ "link-all-in-one-binaries", "link-user-bin", @@ -873,11 +869,6 @@ script = """ #!/usr/bin/env bash set -e -echo "Running Planner Test requires larger stack size, setting RUST_MIN_STACK to 8388608 (8MB) as default." -if [[ -z "${RUST_MIN_STACK}" ]]; then - export RUST_MIN_STACK=8388608 -fi - cargo nextest run --workspace --exclude risingwave_simulation "$@" """ description = "🌟 Run unit tests" @@ -1313,7 +1304,7 @@ echo If you still feel this is not enough, you may copy $(tput setaf 4)risedev$( [tasks.ci-start] category = "RiseDev - CI" dependencies = ["clean-data", "pre-start-dev"] -command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile +command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile env = { RISEDEV_CLEAN_START = true } args = ["${@}"] description = "Clean data and start a full RisingWave dev cluster using risedev-dev" diff --git a/ci/scripts/common.sh b/ci/scripts/common.sh index 4c43da1fcaeaa..9d807070ca57a 100755 --- a/ci/scripts/common.sh +++ b/ci/scripts/common.sh @@ -16,7 +16,6 @@ export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud export NEXTEST_HIDE_PROGRESS_BAR=true export RW_TELEMETRY_TYPE=test export RW_SECRET_STORE_PRIVATE_KEY_HEX="0123456789abcdef0123456789abcdef" -export RUST_MIN_STACK=4194304 unset LANG diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 94ceaaaa853c1..20a06a18de7a4 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -144,7 +144,6 @@ risedev ci-kill export RISINGWAVE_CI=true echo "--- e2e, ci-kafka-plus-pubsub, legacy kafka tests" -export RUST_MIN_STACK=4194304 RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-kafka ./e2e_test/source_legacy/basic/scripts/prepare_ci_kafka.sh diff --git a/ci/scripts/pr-unit-test.sh b/ci/scripts/pr-unit-test.sh index 73b50909802bf..25f11f446206e 100755 --- a/ci/scripts/pr-unit-test.sh +++ b/ci/scripts/pr-unit-test.sh @@ -5,8 +5,4 @@ set -euo pipefail source ci/scripts/common.sh source ci/scripts/pr.env.sh - -# Set RUST_MIN_STACK to 8MB to avoid stack overflow in planner test. -# This is a Unit Test specific setting. -export RUST_MIN_STACK=8388608 ./ci/scripts/run-unit-test.sh diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 084fe7387d766..38d702e94b5b6 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -81,7 +81,7 @@ impl Expr for UserDefinedFunction { ExprNode { function_type: Type::Unspecified.into(), return_type: Some(self.return_type().to_protobuf()), - rex_node: Some(RexNode::Udf(UserDefinedFunction { + rex_node: Some(RexNode::Udf(Box::new(UserDefinedFunction { children: self.args.iter().map(Expr::to_expr_proto).collect(), name: self.catalog.name.clone(), arg_names: self.catalog.arg_names.clone(), @@ -98,7 +98,7 @@ impl Expr for UserDefinedFunction { body: self.catalog.body.clone(), compressed_binary: self.catalog.compressed_binary.clone(), always_retry_on_network_error: self.catalog.always_retry_on_network_error, - })), + }))), } } } diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 0342004f6b1b5..04cc9c8f8defa 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -296,10 +296,10 @@ pub(crate) fn hijack_merger_for_target_table( } } - let pb_project = PbNodeBody::Project(ProjectNode { + let pb_project = PbNodeBody::Project(Box::new(ProjectNode { select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(), ..Default::default() - }); + })); for fragment in graph.fragments.values_mut() { if let Some(node) = &mut fragment.node { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index c89050b331d3f..3502455021ca7 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -649,9 +649,9 @@ pub(crate) fn insert_merger_to_union_with_project( // TODO: MergeNode is used as a placeholder, see issue #17658 node.input.push(StreamNode { input: vec![StreamNode { - node_body: Some(NodeBody::Merge(MergeNode { + node_body: Some(NodeBody::Merge(Box::new(MergeNode { ..Default::default() - })), + }))), ..Default::default() }], identity: uniq_identity diff --git a/src/frontend/src/optimizer/plan_node/stream_asof_join.rs b/src/frontend/src/optimizer/plan_node/stream_asof_join.rs index e4ce2c7edfff2..80d2afd017f81 100644 --- a/src/frontend/src/optimizer/plan_node/stream_asof_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_asof_join.rs @@ -310,7 +310,7 @@ impl StreamNode for StreamAsOfJoin { _ => unreachable!(), }; - NodeBody::AsOfJoin(AsOfJoinNode { + NodeBody::AsOfJoin(Box::new(AsOfJoinNode { join_type: asof_join_type.into(), left_key: left_jk_indices_prost, right_key: right_jk_indices_prost, @@ -321,7 +321,7 @@ impl StreamNode for StreamAsOfJoin { right_deduped_input_pk_indices, output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), asof_desc: Some(self.inequality_desc), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 7ab2912bd1118..e2d57f9f5626d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -190,10 +190,10 @@ impl StreamCdcTableScan { append_only: true, identity: "StreamCdcFilter".to_owned(), fields: cdc_source_schema.clone(), - node_body: Some(PbNodeBody::CdcFilter(CdcFilterNode { + node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode { search_condition: Some(filter_expr.to_expr_proto()), upstream_source_id, - })), + }))), }; let exchange_operator_id = self.core.ctx.next_plan_node_id(); @@ -205,13 +205,13 @@ impl StreamCdcTableScan { append_only: true, identity: "Exchange".to_owned(), fields: cdc_source_schema.clone(), - node_body: Some(PbNodeBody::Exchange(ExchangeNode { + node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode { strategy: Some(DispatchStrategy { r#type: DispatcherType::Simple as _, dist_key_indices: vec![], // simple exchange doesn't need dist key output_indices: (0..cdc_source_schema.len() as u32).collect(), }), - })), + }))), }; // The required columns from the external table @@ -242,7 +242,7 @@ impl StreamCdcTableScan { ); let options = self.core.options.to_proto(); - let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode { + let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode { table_id: upstream_source_id, upstream_column_ids, output_indices, @@ -252,7 +252,7 @@ impl StreamCdcTableScan { rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit, disable_backfill: options.disable_backfill, options: Some(options), - }); + })); // plan: merge -> filter -> exchange(simple) -> stream_scan Ok(PbStreamNode { diff --git a/src/frontend/src/optimizer/plan_node/stream_changelog.rs b/src/frontend/src/optimizer/plan_node/stream_changelog.rs index 34bfdec281815..bf09b060c4f87 100644 --- a/src/frontend/src/optimizer/plan_node/stream_changelog.rs +++ b/src/frontend/src/optimizer/plan_node/stream_changelog.rs @@ -72,9 +72,9 @@ impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog"); impl StreamNode for StreamChangeLog { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - PbNodeBody::Changelog(ChangeLogNode { + PbNodeBody::Changelog(Box::new(ChangeLogNode { need_op: self.core.need_op, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index d642d0f9e7ee0..689dd0432454d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -92,7 +92,7 @@ impl StreamNode for StreamDedup { let table_catalog = self .infer_internal_table_catalog() .with_id(state.gen_table_id_wrapped()); - PbNodeBody::AppendOnlyDedup(DedupNode { + PbNodeBody::AppendOnlyDedup(Box::new(DedupNode { state_table: Some(table_catalog.to_internal_table_prost()), dedup_column_indices: self .core @@ -100,7 +100,7 @@ impl StreamNode for StreamDedup { .iter() .map(|idx| *idx as _) .collect_vec(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 84592aee1829a..af9694471b3ea 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -159,7 +159,7 @@ impl TryToStreamPb for StreamDeltaJoin { // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we // don't need an intermediate representation. let eq_join_predicate = &self.eq_join_predicate; - Ok(NodeBody::DeltaIndexJoin(DeltaIndexJoinNode { + Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode { join_type: self.core.join_type as i32, left_key: eq_join_predicate .left_eq_indexes() @@ -210,7 +210,7 @@ impl TryToStreamPb for StreamDeltaJoin { .collect(), }), output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), - })) + }))) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index c69baf5ad53d9..72393159a4006 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -88,12 +88,12 @@ impl StreamNode for StreamDml { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; - PbNodeBody::Dml(DmlNode { + PbNodeBody::Dml(Box::new(DmlNode { table_id: 0, // Meta will fill this table id. table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id. column_descs: self.column_descs.iter().map(Into::into).collect(), rate_limit: self.base.ctx().overwrite_options().dml_rate_limit, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 02fb426905c7e..1f704f5c9eb74 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -171,13 +171,13 @@ impl StreamNode for StreamDynamicFilter { let right_table = infer_right_internal_table_catalog(right.plan_base()) .with_id(state.gen_table_id_wrapped()); #[allow(deprecated)] - NodeBody::DynamicFilter(DynamicFilterNode { + NodeBody::DynamicFilter(Box::new(DynamicFilterNode { left_key: left_index as u32, condition, left_table: Some(left_table.to_internal_table_prost()), right_table: Some(right_table.to_internal_table_prost()), condition_always_relax: false, // deprecated - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 4d134df37799b..d27c1d417060e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -155,12 +155,12 @@ impl StreamNode for StreamEowcOverWindow { .with_id(state.gen_table_id_wrapped()) .to_internal_table_prost(); - PbNodeBody::EowcOverWindow(EowcOverWindowNode { + PbNodeBody::EowcOverWindow(Box::new(EowcOverWindowNode { calls, partition_by, order_by, state_table: Some(state_table), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index d42f9a9392b41..28a41935d07a4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -120,7 +120,7 @@ impl_plan_tree_node_for_unary! {StreamExchange} impl StreamNode for StreamExchange { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { - NodeBody::Exchange(ExchangeNode { + NodeBody::Exchange(Box::new(ExchangeNode { strategy: if self.no_shuffle { Some(DispatchStrategy { r#type: DispatcherType::NoShuffle as i32, @@ -144,7 +144,7 @@ impl StreamNode for StreamExchange { output_indices: (0..self.schema().len() as u32).collect(), }) }, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index fa0268a46fcf5..a6737fb22c360 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -79,13 +79,13 @@ impl_distill_by_unit!(StreamExpand, core, "StreamExpand"); impl StreamNode for StreamExpand { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - PbNodeBody::Expand(ExpandNode { + PbNodeBody::Expand(Box::new(ExpandNode { column_subsets: self .column_subsets() .iter() .map(|subset| subset_to_protobuf(subset)) .collect(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 0a3126ffe7180..e3e6bb90a7f0b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -69,9 +69,9 @@ impl_distill_by_unit!(StreamFilter, core, "StreamFilter"); impl StreamNode for StreamFilter { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - PbNodeBody::Filter(FilterNode { + PbNodeBody::Filter(Box::new(FilterNode { search_condition: Some(ExprImpl::from(self.predicate().clone()).to_expr_proto()), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 9d3f46ca77cc3..f3ec2b280fef9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -123,8 +123,8 @@ impl StreamNode for StreamFsFetch { secret_refs, } }); - NodeBody::StreamFsFetch(StreamFsFetchNode { + NodeBody::StreamFsFetch(Box::new(StreamFsFetchNode { node_inner: source_inner, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs index e2c795892e5f9..2b1f78f919c66 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs @@ -132,7 +132,7 @@ impl StreamNode for StreamGlobalApproxPercentile { .to_internal_table_prost(), ), }; - PbNodeBody::GlobalApproxPercentile(body) + PbNodeBody::GlobalApproxPercentile(Box::new(body)) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index b9230270e634e..a539c0f2a1af2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -129,9 +129,9 @@ impl StreamNode for StreamGroupTopN { order_by: self.topn_order().to_protobuf(), }; if self.input().append_only() { - PbNodeBody::AppendOnlyGroupTopN(group_topn_node) + PbNodeBody::AppendOnlyGroupTopN(Box::new(group_topn_node)) } else { - PbNodeBody::GroupTopN(group_topn_node) + PbNodeBody::GroupTopN(Box::new(group_topn_node)) } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 66266a893c042..542b6784069f9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -187,7 +187,7 @@ impl StreamNode for StreamHashAgg { self.core .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx); - PbNodeBody::HashAgg(HashAggNode { + PbNodeBody::HashAgg(Box::new(HashAggNode { group_key: self.group_key().to_vec_as_u32(), agg_calls: self .agg_calls() @@ -220,7 +220,7 @@ impl StreamNode for StreamHashAgg { row_count_index: self.row_count_idx as u32, emit_on_window_close: self.base.emit_on_window_close(), version: PbAggNodeVersion::Issue13465 as _, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 0d7863a247d9c..b330a9e24c6d8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -359,7 +359,7 @@ impl StreamNode for StreamHashJoin { let null_safe_prost = self.eq_join_predicate.null_safes().into_iter().collect(); - NodeBody::HashJoin(HashJoinNode { + NodeBody::HashJoin(Box::new(HashJoinNode { join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, right_key: right_jk_indices_prost, @@ -403,7 +403,7 @@ impl StreamNode for StreamHashJoin { right_deduped_input_pk_indices, output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), is_append_only: self.is_append_only, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 4a50387c50be0..ee86b37fe35d7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -104,7 +104,7 @@ impl_plan_tree_node_for_unary! {StreamHopWindow} impl StreamNode for StreamHopWindow { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - PbNodeBody::HopWindow(HopWindowNode { + PbNodeBody::HopWindow(Box::new(HopWindowNode { time_col: self.core.time_col.index() as _, window_slide: Some(self.core.window_slide.into()), window_size: Some(self.core.window_size.into()), @@ -121,7 +121,7 @@ impl StreamNode for StreamHopWindow { .iter() .map(|x| x.to_expr_proto()) .collect(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs index b5af9be49df05..c62c1f5d45d45 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs @@ -120,7 +120,7 @@ impl StreamNode for StreamLocalApproxPercentile { base, percentile_index, }; - PbNodeBody::LocalApproxPercentile(body) + PbNodeBody::LocalApproxPercentile(Box::new(body)) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 6e4339658966b..fa912c7046985 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -383,7 +383,7 @@ impl StreamNode for StreamMaterialize { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; - PbNodeBody::Materialize(MaterializeNode { + PbNodeBody::Materialize(Box::new(MaterializeNode { // We don't need table id for materialize node in frontend. The id will be generated on // meta catalog service. table_id: 0, @@ -394,7 +394,7 @@ impl StreamNode for StreamMaterialize { .map(ColumnOrder::to_protobuf) .collect(), table: Some(self.table().to_internal_table_prost()), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index 9ec80d15bac30..b8db60b11507b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -81,7 +81,7 @@ impl StreamNode for StreamNow { let table_catalog = internal_table_catalog_builder .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); - NodeBody::Now(PbNowNode { + NodeBody::Now(Box::new(PbNowNode { state_table: Some(table_catalog.to_internal_table_prost()), mode: Some(match &self.core.mode { Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}), @@ -93,7 +93,7 @@ impl StreamNode for StreamNow { interval: Some(Datum::Some((*interval).into()).to_protobuf()), }), }), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index 6b0beaa9f99cc..405f25301f7b4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -130,13 +130,13 @@ impl StreamNode for StreamOverWindow { .config() .streaming_over_window_cache_policy(); - PbNodeBody::OverWindow(OverWindowNode { + PbNodeBody::OverWindow(Box::new(OverWindowNode { calls, partition_by, order_by, state_table: Some(state_table), cache_policy: cache_policy.to_protobuf() as _, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index d6ac8af7b146c..92aefd429ad9c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -159,13 +159,13 @@ impl StreamNode for StreamProject { .iter() .map(|(i, o)| (*i as u32, *o as u32)) .unzip(); - PbNodeBody::Project(ProjectNode { + PbNodeBody::Project(Box::new(ProjectNode { select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(), watermark_input_cols, watermark_output_cols, nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), noop_update_hint: self.noop_update_hint, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 5735c4b9d5644..5e75ba7d3cb56 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -111,7 +111,7 @@ impl StreamNode for StreamProjectSet { .iter() .map(|(i, o)| (*i as u32, *o as u32)) .unzip(); - PbNodeBody::ProjectSet(ProjectSetNode { + PbNodeBody::ProjectSet(Box::new(ProjectSetNode { select_list: self .core .select_list @@ -121,7 +121,7 @@ impl StreamNode for StreamProjectSet { watermark_input_cols, watermark_expr_indices, nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index 36b96f4dad36e..3ac868dc224fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -83,9 +83,9 @@ impl StreamNode for StreamRowIdGen { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; - PbNodeBody::RowIdGen(RowIdGenNode { + PbNodeBody::RowIdGen(Box::new(RowIdGenNode { row_id_index: self.row_id_index as _, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_row_merge.rs b/src/frontend/src/optimizer/plan_node/stream_row_merge.rs index 1295702135ffc..c065ffcc5a2a8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_merge.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_merge.rs @@ -141,10 +141,10 @@ impl_plan_tree_node_for_binary! { StreamRowMerge } impl StreamNode for StreamRowMerge { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - PbNodeBody::RowMerge(risingwave_pb::stream_plan::RowMergeNode { + PbNodeBody::RowMerge(Box::new(risingwave_pb::stream_plan::RowMergeNode { lhs_mapping: Some(self.lhs_mapping.to_protobuf()), rhs_mapping: Some(self.rhs_mapping.to_protobuf()), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index f9f125654f402..66fe3424e06cf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -113,7 +113,7 @@ impl StreamNode for StreamSimpleAgg { let (intermediate_state_table, agg_states, distinct_dedup_tables) = self.core.infer_tables(&self.base, None, None); - PbNodeBody::SimpleAgg(SimpleAggNode { + PbNodeBody::SimpleAgg(Box::new(SimpleAggNode { agg_calls: self .agg_calls() .iter() @@ -151,7 +151,7 @@ impl StreamNode for StreamSimpleAgg { row_count_index: self.row_count_idx as u32, version: PbAggNodeVersion::Issue13465 as _, must_output_per_barrier: self.must_output_per_barrier, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index d9e0d116a61e5..1451a86ffb68f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -598,12 +598,12 @@ impl StreamNode for StreamSink { .infer_kv_log_store_table_catalog() .with_id(state.gen_table_id_wrapped()); - PbNodeBody::Sink(SinkNode { + PbNodeBody::Sink(Box::new(SinkNode { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), log_store_type: self.log_store_type as i32, rate_limit: self.base.ctx().overwrite_options().sink_rate_limit, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index c4acd275f1236..3b5cbf7e5d85c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -128,14 +128,14 @@ impl_plan_tree_node_for_unary! { StreamEowcSort } impl StreamNode for StreamEowcSort { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; - PbNodeBody::Sort(SortNode { + PbNodeBody::Sort(Box::new(SortNode { state_table: Some( self.infer_state_table() .with_id(state.gen_table_id_wrapped()) .to_internal_table_prost(), ), sort_column_index: self.sort_column_index as _, - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 909fa1e0d3009..b8e3ea6bf1834 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -96,7 +96,7 @@ impl StreamNode for StreamSource { secret_refs, } }); - PbNodeBody::Source(SourceNode { source_inner }) + PbNodeBody::Source(Box::new(SourceNode { source_inner })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index f72361efbf03d..ab002a23d262e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -158,7 +158,7 @@ impl StreamSourceScan { ..Default::default() }, ], - node_body: Some(PbNodeBody::SourceBackfill(backfill)), + node_body: Some(PbNodeBody::SourceBackfill(Box::new(backfill))), stream_key, operator_id: self.base.id().0 as u64, identity: self.distill_to_string(), diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index edb9121baf595..43ae49d3bdaa4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -84,7 +84,7 @@ impl_plan_tree_node_for_unary! { StreamStatelessSimpleAgg } impl StreamNode for StreamStatelessSimpleAgg { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { use risingwave_pb::stream_plan::*; - PbNodeBody::StatelessSimpleAgg(SimpleAggNode { + PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode { agg_calls: self .agg_calls() .iter() @@ -103,7 +103,7 @@ impl StreamNode for StreamStatelessSimpleAgg { distinct_dedup_tables: Default::default(), version: AggNodeVersion::Issue13465 as _, must_output_per_barrier: false, // this is not used - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index b75ec5b1c1d46..426d2add79f5d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -310,7 +310,7 @@ impl StreamTableScan { None }; - let node_body = PbNodeBody::StreamScan(StreamScanNode { + let node_body = PbNodeBody::StreamScan(Box::new(StreamScanNode { table_id: self.core.table_desc.table_id.table_id, stream_scan_type: self.stream_scan_type as i32, // The column indices need to be forwarded to the downstream @@ -322,7 +322,7 @@ impl StreamTableScan { arrangement_table, rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit, ..Default::default() - }); + })); Ok(PbStreamNode { fields: self.schema().to_prost(), @@ -338,7 +338,7 @@ impl StreamTableScan { }, // Snapshot read PbStreamNode { - node_body: Some(PbNodeBody::BatchPlan(batch_plan_node)), + node_body: Some(PbNodeBody::BatchPlan(Box::new(batch_plan_node))), operator_id: self.batch_plan_id.0 as u64, identity: "BatchPlanNode".into(), fields: snapshot_schema, diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 390a141dcb385..b76c55a0b810c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -246,7 +246,7 @@ impl TryToStreamPb for StreamTemporalJoin { .as_stream_table_scan() .expect("should be a stream table scan"); - Ok(NodeBody::TemporalJoin(TemporalJoinNode { + Ok(NodeBody::TemporalJoin(Box::new(TemporalJoinNode { join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, right_key: right_jk_indices_prost, @@ -267,7 +267,7 @@ impl TryToStreamPb for StreamTemporalJoin { Some(memo_table.to_internal_table_prost()) }, is_nested_loop: self.is_nested_loop, - })) + }))) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 80ca9141033c5..9109a76316884 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -112,9 +112,9 @@ impl StreamNode for StreamTopN { order_by: self.topn_order().to_protobuf(), }; if self.input().append_only() { - PbNodeBody::AppendOnlyTopN(topn_node) + PbNodeBody::AppendOnlyTopN(Box::new(topn_node)) } else { - PbNodeBody::TopN(topn_node) + PbNodeBody::TopN(Box::new(topn_node)) } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index 0a71c208c32ee..eb42ecba57852 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -72,7 +72,7 @@ impl Distill for StreamValues { impl StreamNode for StreamValues { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode { - ProstStreamNode::Values(ValuesNode { + ProstStreamNode::Values(Box::new(ValuesNode { tuples: self .logical .rows() @@ -86,7 +86,7 @@ impl StreamNode for StreamValues { .iter() .map(|f| f.to_prost()) .collect(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index 63699998ea4b0..1d0935225abb0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -148,12 +148,12 @@ impl StreamNode for StreamWatermarkFilter { let table = infer_internal_table_catalog(watermark_type); - PbNodeBody::WatermarkFilter(WatermarkFilterNode { + PbNodeBody::WatermarkFilter(Box::new(WatermarkFilterNode { watermark_descs: self.watermark_descs.clone(), tables: vec![table .with_id(state.gen_table_id_wrapped()) .to_internal_table_prost()], - }) + })) } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index f30b0abf5b4c4..b9447fd678444 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -439,9 +439,9 @@ fn build_fragment( let node = state.gen_no_op_stream_node(StreamNode { operator_id: no_shuffle_exchange_operator_id, identity: "StreamNoShuffleExchange".into(), - node_body: Some(NodeBody::Exchange(ExchangeNode { + node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode { strategy: Some(no_shuffle_strategy.clone()), - })), + }))), input: vec![], // Take reference's properties. diff --git a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs index 8b6a2cb06c871..5c441a3aefeab 100644 --- a/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs +++ b/src/frontend/src/stream_fragmenter/rewrite/delta_join.rs @@ -35,11 +35,11 @@ fn build_no_shuffle_exchange_for_delta_join( identity: "NO SHUFFLE Exchange (Lookup and Merge)".into(), fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), - node_body: Some(NodeBody::Exchange(ExchangeNode { + node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode { strategy: Some(dispatch_no_shuffle( (0..(upstream.fields.len() as u32)).collect(), )), - })), + }))), input: vec![], append_only: upstream.append_only, } @@ -55,12 +55,12 @@ fn build_consistent_hash_shuffle_exchange_for_delta_join( identity: "HASH Exchange (Lookup and Merge)".into(), fields: upstream.fields.clone(), stream_key: upstream.stream_key.clone(), - node_body: Some(NodeBody::Exchange(ExchangeNode { + node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode { strategy: Some(dispatch_consistent_hash_shuffle( dist_key_indices, (0..(upstream.fields.len() as u32)).collect(), )), - })), + }))), input: vec![], append_only: upstream.append_only, } @@ -97,7 +97,7 @@ fn build_lookup_for_delta_join( identity: "Lookup".into(), fields: output_fields, stream_key: output_stream_key, - node_body: Some(NodeBody::Lookup(lookup_node)), + node_body: Some(NodeBody::Lookup(Box::new(lookup_node))), input: vec![ exchange_node_arrangement.clone(), exchange_node_stream.clone(), @@ -281,7 +281,9 @@ fn build_delta_join_inner( identity: "Union".into(), fields: node.fields.clone(), stream_key: node.stream_key.clone(), - node_body: Some(NodeBody::LookupUnion(LookupUnionNode { order: vec![1, 0] })), + node_body: Some(NodeBody::LookupUnion(Box::new(LookupUnionNode { + order: vec![1, 0], + }))), input: vec![exchange_l0m.clone(), exchange_l1m.clone()], append_only: node.append_only, }; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index a9a536f0b394d..d8c928c7e95df 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1639,11 +1639,11 @@ mod tests { let mut input = vec![]; for (upstream_fragment_id, upstream_actor_ids) in actor_upstream_actor_ids { input.push(PbStreamNode { - node_body: Some(PbNodeBody::Merge(MergeNode { + node_body: Some(PbNodeBody::Merge(Box::new(MergeNode { upstream_actor_id: upstream_actor_ids.iter().map(|id| *id as _).collect(), upstream_fragment_id: *upstream_fragment_id as _, ..Default::default() - })), + }))), ..Default::default() }); } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index b3c8b47b29af2..89200b4b63d6c 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -871,7 +871,7 @@ impl DdlController { format!("ProjectExecutor(from sink {})", sink_id); } - *merge_node = MergeNode { + **merge_node = MergeNode { upstream_actor_id: sink_actor_ids.clone(), upstream_fragment_id, upstream_dispatcher_type: DispatcherType::Hash as _, diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 093f45b27958d..3d290605a38a9 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -151,12 +151,12 @@ impl ActorBuilder { }]; Ok(StreamNode { - node_body: Some(NodeBody::Merge(MergeNode { + node_body: Some(NodeBody::Merge(Box::new(MergeNode { upstream_actor_id: upstreams.actors.as_global_ids(), upstream_fragment_id: upstreams.fragment_id.as_global_id(), upstream_dispatcher_type: exchange.get_strategy()?.r#type, fields: stream_node.get_fields().clone(), - })), + }))), identity: "MergeExecutor".to_owned(), ..stream_node.clone() }) @@ -197,12 +197,12 @@ impl ActorBuilder { let input = vec![ // Fill the merge node body with correct upstream info. StreamNode { - node_body: Some(NodeBody::Merge(MergeNode { + node_body: Some(NodeBody::Merge(Box::new(MergeNode { upstream_actor_id, upstream_fragment_id: upstreams.fragment_id.as_global_id(), upstream_dispatcher_type, fields: merge_node.fields.clone(), - })), + }))), ..merge_node.clone() }, batch_plan_node.clone(), @@ -246,12 +246,12 @@ impl ActorBuilder { let input = vec![ // Fill the merge node body with correct upstream info. StreamNode { - node_body: Some(NodeBody::Merge(MergeNode { + node_body: Some(NodeBody::Merge(Box::new(MergeNode { upstream_actor_id, upstream_fragment_id: upstreams.fragment_id.as_global_id(), upstream_dispatcher_type: DispatcherType::NoShuffle as _, fields: merge_node.fields.clone(), - })), + }))), ..merge_node.clone() }, ]; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 21103fe1460c1..e6d989f81162e 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -624,7 +624,7 @@ impl StreamFragmentGraph { } else { None }, - stream_scan.clone(), + *stream_scan.clone(), )); true } diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index ce844a6c764cb..3d64f378fcf5f 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -206,14 +206,14 @@ fn make_stream_fragments() -> Vec { }) .collect_vec(); let source_node = StreamNode { - node_body: Some(NodeBody::Source(SourceNode { + node_body: Some(NodeBody::Source(Box::new(SourceNode { source_inner: Some(StreamSource { source_id: 1, state_table: Some(make_source_internal_table(0)), columns, ..Default::default() }), - })), + }))), stream_key: vec![2], ..Default::default() }; @@ -228,13 +228,13 @@ fn make_stream_fragments() -> Vec { // exchange node let exchange_node = StreamNode { - node_body: Some(NodeBody::Exchange(ExchangeNode { + node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode { strategy: Some(DispatchStrategy { r#type: DispatcherType::Hash as i32, dist_key_indices: vec![0], output_indices: vec![0, 1, 2], }), - })), + }))), fields: vec![ make_field(TypeName::Int32), make_field(TypeName::Int32), @@ -252,7 +252,7 @@ fn make_stream_fragments() -> Vec { children: vec![make_inputref(0), make_inputref(1)], }; let filter_node = StreamNode { - node_body: Some(NodeBody::Filter(FilterNode { + node_body: Some(NodeBody::Filter(Box::new(FilterNode { search_condition: Some(ExprNode { function_type: GreaterThan as i32, return_type: Some(DataType { @@ -261,7 +261,7 @@ fn make_stream_fragments() -> Vec { }), rex_node: Some(RexNode::FuncCall(function_call)), }), - })), + }))), fields: vec![], // TODO: fill this later input: vec![exchange_node], stream_key: vec![0, 1], @@ -272,14 +272,14 @@ fn make_stream_fragments() -> Vec { // simple agg node let simple_agg_node = StreamNode { - node_body: Some(NodeBody::SimpleAgg(SimpleAggNode { + node_body: Some(NodeBody::SimpleAgg(Box::new(SimpleAggNode { agg_calls: vec![make_sum_aggcall(0), make_sum_aggcall(1)], distribution_key: Default::default(), is_append_only: false, agg_call_states: vec![make_agg_call_result_state(), make_agg_call_result_state()], intermediate_state_table: Some(make_empty_table(1)), ..Default::default() - })), + }))), input: vec![filter_node], fields: vec![], // TODO: fill this later stream_key: vec![0, 1], @@ -299,12 +299,12 @@ fn make_stream_fragments() -> Vec { // exchange node let exchange_node_1 = StreamNode { - node_body: Some(NodeBody::Exchange(ExchangeNode { + node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode { strategy: Some(DispatchStrategy { r#type: DispatcherType::Simple as i32, ..Default::default() }), - })), + }))), fields: vec![make_field(TypeName::Int64), make_field(TypeName::Int64)], input: vec![], stream_key: vec![0, 1], @@ -315,14 +315,14 @@ fn make_stream_fragments() -> Vec { // agg node let simple_agg_node_1 = StreamNode { - node_body: Some(NodeBody::SimpleAgg(SimpleAggNode { + node_body: Some(NodeBody::SimpleAgg(Box::new(SimpleAggNode { agg_calls: vec![make_sum_aggcall(0), make_sum_aggcall(1)], distribution_key: Default::default(), is_append_only: false, agg_call_states: vec![make_agg_call_result_state(), make_agg_call_result_state()], intermediate_state_table: Some(make_empty_table(2)), ..Default::default() - })), + }))), fields: vec![], // TODO: fill this later input: vec![exchange_node_1], stream_key: vec![0, 1], @@ -336,7 +336,7 @@ fn make_stream_fragments() -> Vec { children: vec![make_inputref(0), make_inputref(1)], }; let project_node = StreamNode { - node_body: Some(NodeBody::Project(ProjectNode { + node_body: Some(NodeBody::Project(Box::new(ProjectNode { select_list: vec![ ExprNode { rex_node: Some(RexNode::FuncCall(function_call_1)), @@ -350,7 +350,7 @@ fn make_stream_fragments() -> Vec { make_inputref(1), ], ..Default::default() - })), + }))), fields: vec![], // TODO: fill this later input: vec![simple_agg_node_1], stream_key: vec![1, 2], @@ -363,11 +363,11 @@ fn make_stream_fragments() -> Vec { let mview_node = StreamNode { input: vec![project_node], stream_key: vec![], - node_body: Some(NodeBody::Materialize(MaterializeNode { + node_body: Some(NodeBody::Materialize(Box::new(MaterializeNode { table_id: 1, table: Some(make_materialize_table(888)), column_orders: vec![make_column_order(1), make_column_order(2)], - })), + }))), fields: vec![], // TODO: fill this later operator_id: 7, identity: "MaterializeExecutor".to_owned(), diff --git a/src/prost/Cargo.toml b/src/prost/Cargo.toml index 2918c41fcb89a..c9fcb69dafd9b 100644 --- a/src/prost/Cargo.toml +++ b/src/prost/Cargo.toml @@ -21,6 +21,9 @@ tonic = { workspace = true } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } +[dev-dependencies] +static_assertions = "1" + [build-dependencies] fs-err = "3.0" pbjson-build = "0.7" diff --git a/src/prost/build.rs b/src/prost/build.rs index e906c47efbd55..554fc1d67b446 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -112,6 +112,55 @@ fn main() -> Result<(), Box> { "#[derive(::enum_as_inner::EnumAsInner)]", ) .btree_map(btree_map_paths) + // node body is a very large enum, so we box it to avoid stack overflow. + // TODO: ideally we should box all enum variants automatically https://github.com/tokio-rs/prost/issues/1209 + .boxed(".stream_plan.StreamNode.node_body.source") + .boxed(".stream_plan.StreamNode.node_body.project") + .boxed(".stream_plan.StreamNode.node_body.filter") + .boxed(".stream_plan.StreamNode.node_body.materialize") + .boxed(".stream_plan.StreamNode.node_body.stateless_simple_agg") + .boxed(".stream_plan.StreamNode.node_body.simple_agg") + .boxed(".stream_plan.StreamNode.node_body.hash_agg") + .boxed(".stream_plan.StreamNode.node_body.append_only_top_n") + .boxed(".stream_plan.StreamNode.node_body.hash_join") + .boxed(".stream_plan.StreamNode.node_body.top_n") + .boxed(".stream_plan.StreamNode.node_body.hop_window") + .boxed(".stream_plan.StreamNode.node_body.merge") + .boxed(".stream_plan.StreamNode.node_body.exchange") + .boxed(".stream_plan.StreamNode.node_body.stream_scan") + .boxed(".stream_plan.StreamNode.node_body.batch_plan") + .boxed(".stream_plan.StreamNode.node_body.lookup") + .boxed(".stream_plan.StreamNode.node_body.arrange") + .boxed(".stream_plan.StreamNode.node_body.lookup_union") + .boxed(".stream_plan.StreamNode.node_body.delta_index_join") + .boxed(".stream_plan.StreamNode.node_body.sink") + .boxed(".stream_plan.StreamNode.node_body.expand") + .boxed(".stream_plan.StreamNode.node_body.dynamic_filter") + .boxed(".stream_plan.StreamNode.node_body.project_set") + .boxed(".stream_plan.StreamNode.node_body.group_top_n") + .boxed(".stream_plan.StreamNode.node_body.sort") + .boxed(".stream_plan.StreamNode.node_body.watermark_filter") + .boxed(".stream_plan.StreamNode.node_body.dml") + .boxed(".stream_plan.StreamNode.node_body.row_id_gen") + .boxed(".stream_plan.StreamNode.node_body.now") + .boxed(".stream_plan.StreamNode.node_body.append_only_group_top_n") + .boxed(".stream_plan.StreamNode.node_body.temporal_join") + .boxed(".stream_plan.StreamNode.node_body.barrier_recv") + .boxed(".stream_plan.StreamNode.node_body.values") + .boxed(".stream_plan.StreamNode.node_body.append_only_dedup") + .boxed(".stream_plan.StreamNode.node_body.eowc_over_window") + .boxed(".stream_plan.StreamNode.node_body.over_window") + .boxed(".stream_plan.StreamNode.node_body.stream_fs_fetch") + .boxed(".stream_plan.StreamNode.node_body.stream_cdc_scan") + .boxed(".stream_plan.StreamNode.node_body.cdc_filter") + .boxed(".stream_plan.StreamNode.node_body.source_backfill") + .boxed(".stream_plan.StreamNode.node_body.changelog") + .boxed(".stream_plan.StreamNode.node_body.local_approx_percentile") + .boxed(".stream_plan.StreamNode.node_body.global_approx_percentile") + .boxed(".stream_plan.StreamNode.node_body.row_merge") + .boxed(".stream_plan.StreamNode.node_body.as_of_join") + // `Udf` is 248 bytes, while 2nd largest field is 32 bytes. + .boxed(".expr.ExprNode.rex_node.udf") // Eq + Hash are for plan nodes to do common sub-plan detection. // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]") diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 8fa584522354d..294d5d9514bcf 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -13,8 +13,15 @@ // limitations under the License. // for derived code of `Message` -#![expect(clippy::all)] #![expect(clippy::doc_markdown)] +#![expect(clippy::upper_case_acronyms)] +#![expect(clippy::needless_lifetimes)] +// For tonic::transport::Endpoint::connect +#![expect(clippy::disallowed_methods)] +#![expect(clippy::enum_variant_names)] +#![expect(clippy::module_inception)] +// FIXME: This should be fixed!!! https://github.com/risingwavelabs/risingwave/issues/19906 +#![expect(clippy::large_enum_variant)] use std::collections::HashMap; use std::str::FromStr; @@ -496,11 +503,12 @@ impl std::fmt::Debug for plan_common::ColumnDesc { s.field("additional_column_type", additional_column_type); } s.field("version", version); - if let Some(AdditionalColumn { column_type }) = additional_column { + if let Some(AdditionalColumn { + column_type: Some(column_type), + }) = additional_column + { // AdditionalColumn { None } means a normal column - if let Some(column_type) = column_type { - s.field("additional_column", &column_type); - } + s.field("additional_column", &column_type); } if let Some(generated_or_default_column) = generated_or_default_column { s.field("generated_or_default_column", &generated_or_default_column); @@ -513,11 +521,14 @@ impl std::fmt::Debug for plan_common::ColumnDesc { mod tests { use crate::data::{data_type, DataType}; use crate::plan_common::Field; + use crate::stream_plan::stream_node::NodeBody; #[test] fn test_getter() { - let mut data_type: DataType = DataType::default(); - data_type.is_nullable = true; + let data_type: DataType = DataType { + is_nullable: true, + ..Default::default() + }; let field = Field { data_type: Some(data_type), name: "".to_owned(), @@ -551,4 +562,12 @@ mod tests { }; assert!(!new_data_type.is_nullable); } + + #[test] + fn test_size() { + use static_assertions::const_assert_eq; + // box all fields in NodeBody to avoid large_enum_variant + // see https://github.com/risingwavelabs/risingwave/issues/19910 + const_assert_eq!(std::mem::size_of::(), 16); + } }