diff --git a/e2e_test/background_ddl/basic.slt b/e2e_test/background_ddl/basic.slt index 3c98b6943610e..c398a95cf1f5d 100644 --- a/e2e_test/background_ddl/basic.slt +++ b/e2e_test/background_ddl/basic.slt @@ -56,6 +56,48 @@ DROP MATERIALIZED VIEW m2; statement ok DROP TABLE t; +################### Test stream now + +statement ok +create table t (v1 timestamp with time zone, v2 timestamp with time zone); + +statement ok +insert into t select to_timestamp(x) as z, to_timestamp(x) as y from generate_series(1, 1000) t(x); + +statement ok +set backfill_rate_limit=1; + +statement ok +create materialized view m1 as select * from t where v1 >= now() and v2 >= now(); + +sleep 2s + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +1 + +statement ok +recover; + +sleep 5s + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +1 + +statement ok +drop materialized view m1; + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +0 + +statement ok +drop table t; + statement ok SET BACKGROUND_DDL=false; diff --git a/e2e_test/source_inline/kafka/background_ddl/basic.slt.serial b/e2e_test/source_inline/kafka/background_ddl/basic.slt.serial new file mode 100644 index 0000000000000..75a7601661a89 --- /dev/null +++ b/e2e_test/source_inline/kafka/background_ddl/basic.slt.serial @@ -0,0 +1,130 @@ +control substitution on + +statement ok +SET streaming_use_shared_source TO false; + +############## Create kafka seed data + +statement ok +create table kafka_seed_data (v1 int); + +statement ok +insert into kafka_seed_data select * from generate_series(1, 1000); + +statement ok +create sink kafka_sink +from + kafka_seed_data with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_rate_limit', + type = 'append-only', + force_append_only='true' +); + +# Wait for the topic to create +skipif in-memory +sleep 5s + +############## Create kafka source + +statement ok +create source kafka_source (v1 int) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_rate_limit', + scan.startup.mode = 'earliest', + source_rate_limit = 1, +) FORMAT PLAIN ENCODE JSON + +statement ok +flush; + +############## Create background mv with only source upstream + +statement ok +set background_ddl = true; + +statement ok +create materialized view rl_mv1 as select count(*) from kafka_source; + +statement ok +wait + +statement ok +select * from rl_mv1; + +statement ok +drop materialized view rl_mv1; + +############## Create background mv joined with another mv + +statement ok +set streaming_parallelism=1; + +statement ok +create table t1(v1 int); + +statement ok +insert into t1 select * from generate_series(1, 10); + +statement ok +flush; + +statement ok +set backfill_rate_limit = 1; + +statement ok +set background_ddl=true; + +statement ok +create materialized view rl_mv2 as select kafka_source.v1 from kafka_source join t1 on kafka_source.v1 = t1.v1; + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +1 + +sleep 1s + +statement ok +recover + +sleep 5s + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +1 + +statement ok +wait + +query I +select count(*) from rw_catalog.rw_ddl_progress; +---- +0 + +statement ok +drop materialized view rl_mv2; + +statement ok +drop table t1; + +############## Cleanup + +statement ok +drop source kafka_source; + +statement ok +drop sink kafka_sink; + +statement ok +drop table kafka_seed_data; + +statement ok +set background_ddl=false; + +statement ok +SET streaming_use_shared_source TO true; + +statement ok +set streaming_parallelism=default; diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index fa5f60c2a99ef..342bfbedd1825 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -34,7 +34,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion}; use crate::error::Result; use crate::optimizer::plan_node::derive::derive_pk; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl; +use crate::optimizer::plan_node::utils::plan_can_use_background_ddl; use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta}; use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -89,7 +89,7 @@ impl StreamMaterialize { let create_type = if matches!(table_type, TableType::MaterializedView) && input.ctx().session_ctx().config().background_ddl() - && plan_can_use_backgronud_ddl(&input) + && plan_can_use_background_ddl(&input) { CreateType::Background } else { diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 39cf84fe13705..48dc4dad85c5c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -46,7 +46,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec use crate::error::{ErrorCode, Result}; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl; +use crate::optimizer::plan_node::utils::plan_can_use_background_ddl; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -385,7 +385,7 @@ impl StreamSink { let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?; let distribution_key = input.distribution().dist_column_indices().to_vec(); let create_type = if input.ctx().session_ctx().config().background_ddl() - && plan_can_use_backgronud_ddl(&input) + && plan_can_use_background_ddl(&input) { CreateType::Background } else { diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index a96e1284a9f7e..40b350ade287a 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -380,9 +380,12 @@ pub fn infer_kv_log_store_table_catalog_inner( /// since that plan node maps to `backfill` executor, which supports recovery. /// Some other leaf nodes like `StreamValues` do not support recovery, and they /// cannot use background ddl. -pub(crate) fn plan_can_use_backgronud_ddl(plan: &PlanRef) -> bool { +pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool { if plan.inputs().is_empty() { - if plan.as_stream_source_scan().is_some() { + if plan.as_stream_source_scan().is_some() + || plan.as_stream_now().is_some() + || plan.as_stream_source().is_some() + { true } else if let Some(scan) = plan.as_stream_table_scan() { scan.stream_scan_type() == StreamScanType::Backfill @@ -392,7 +395,7 @@ pub(crate) fn plan_can_use_backgronud_ddl(plan: &PlanRef) -> bool { } } else { assert!(!plan.inputs().is_empty()); - plan.inputs().iter().all(plan_can_use_backgronud_ddl) + plan.inputs().iter().all(plan_can_use_background_ddl) } }