From 56ba6d9020232c2cf6c5f82281b6ed8cd54bfb87 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 10 Oct 2023 13:21:34 +0800 Subject: [PATCH 1/6] handle pause on bootstrap for now and values Signed-off-by: Bugen Zhao --- src/stream/src/executor/mod.rs | 8 +------- src/stream/src/executor/now.rs | 3 ++- src/stream/src/executor/values.rs | 17 +++++++++++++++-- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index fffa62f4794f8..99b090e21a240 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -332,13 +332,7 @@ impl Barrier { } } - /// Whether this barrier is for configuration change. Used for source executor initialization. - pub fn is_update(&self) -> bool { - matches!(self.mutation.as_deref(), Some(Mutation::Update { .. })) - } - - /// Whether this barrier is for resume. Used for now executor to determine whether to yield a - /// chunk and a watermark before this barrier. + /// Whether this barrier is for resume. pub fn is_resume(&self) -> bool { matches!(self.mutation.as_deref(), Some(Mutation::Resume)) } diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index d2cbf05d71f80..636dabd59095e 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -90,6 +90,7 @@ impl NowExecutor { } }; last_timestamp = state_row.and_then(|row| row[0].clone()); + paused = barrier.is_pause_on_startup(); initialized = true; } else if paused { // Assert that no data is updated. @@ -104,7 +105,7 @@ impl NowExecutor { // Update paused state. if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - Mutation::Pause | Mutation::Update { .. } => paused = true, + Mutation::Pause => paused = true, Mutation::Resume => paused = false, _ => {} } diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index b30ad4c1ed2f3..aeb4a6b712d06 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -25,8 +25,8 @@ use risingwave_expr::expr::BoxedExpression; use tokio::sync::mpsc::UnboundedReceiver; use super::{ - ActorContextRef, Barrier, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, - StreamExecutorError, + ActorContextRef, Barrier, BoxedMessageStream, Executor, Message, Mutation, PkIndices, + PkIndicesRef, StreamExecutorError, }; use crate::task::CreateMviewProgress; @@ -83,10 +83,23 @@ impl ValuesExecutor { .unwrap(); let emit = barrier.is_newly_added(self.ctx.id); + let paused_on_startup = barrier.is_pause_on_startup(); yield Message::Barrier(barrier); + // If it's failover, do not evaluate rows (assume they have been yielded) if emit { + if paused_on_startup { + // Wait for the data stream to be resumed before yielding the chunks. + while let Some(barrier) = barrier_receiver.recv().await { + let is_resume = barrier.is_resume(); + yield Message::Barrier(barrier); + if is_resume { + break; + } + } + } + let cardinality = schema.len(); ensure!(cardinality > 0); while !rows.is_empty() { From 629121296831a48f2fdea41a5389f0973e9b3652 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 10 Oct 2023 13:57:43 +0800 Subject: [PATCH 2/6] add simulation test Signed-off-by: Bugen Zhao --- e2e_test/batch/transaction/now.slt | 2 - src/tests/simulation/src/cluster.rs | 6 + .../recovery/pause_on_bootstrap.rs | 106 +++++++++++++++--- 3 files changed, 96 insertions(+), 18 deletions(-) diff --git a/e2e_test/batch/transaction/now.slt b/e2e_test/batch/transaction/now.slt index d1be437ba371c..4f8d317f04261 100644 --- a/e2e_test/batch/transaction/now.slt +++ b/e2e_test/batch/transaction/now.slt @@ -1,5 +1,3 @@ -# Disabled, see https://github.com/risingwavelabs/risingwave/issues/10887 - statement ok create table t (ts timestamp); diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index f375eeac4cc85..4eb60e7af14dc 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -655,6 +655,12 @@ impl Session { self.query_tx.send((sql.into(), tx)).await?; rx.await? } + + /// Run `FLUSH` on the session. + pub async fn flush(&mut self) -> Result<()> { + self.run("FLUSH").await?; + Ok(()) + } } /// Options for killing nodes. diff --git a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs index d0288e6931e88..c4b7c11a956c7 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs @@ -15,21 +15,11 @@ use std::time::Duration; use anyhow::Result; -use risingwave_simulation::cluster::Configuration; +use risingwave_simulation::cluster::{Cluster, Configuration}; use risingwave_simulation::nexmark::NexmarkCluster; use risingwave_simulation::utils::AssertResult; use tokio::time::{sleep, timeout}; -const CREATE_TABLE: &str = "CREATE TABLE t (v int)"; -const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)"; -const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t"; - -const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid"; -const SELECT: &str = "SELECT * FROM count_bid"; - -const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction"; -const SELECT_2: &str = "SELECT * FROM count_auction"; - const SET_PARAMETER: &str = "ALTER SYSTEM SET pause_on_next_bootstrap TO true"; enum ResumeBy { @@ -37,7 +27,29 @@ enum ResumeBy { Restart, } +impl ResumeBy { + async fn resume(&self, cluster: &mut Cluster) -> Result<()> { + Ok(match self { + ResumeBy::Risectl => cluster.resume().await?, + ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await, + }) + } +} + async fn test_impl(resume_by: ResumeBy) -> Result<()> { + const CREATE_TABLE: &str = "CREATE TABLE t (v int)"; + const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)"; + const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t"; + + const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid"; + const SELECT: &str = "SELECT * FROM count_bid"; + + const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction"; + const SELECT_2: &str = "SELECT * FROM count_auction"; + + const CREATE_3: &str = "CREATE MATERIALIZED VIEW values as VALUES (1), (2), (3)"; + const SELECT_3: &str = "SELECT count(*) FROM values"; + let mut cluster = NexmarkCluster::new( Configuration { meta_nodes: 1, @@ -77,18 +89,19 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> { // New streaming jobs should also start from paused. cluster.run(CREATE_2).await?; sleep(Duration::from_secs(10)).await; - cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the + cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the aggregation // result will be 0 instead of empty or NULL + cluster.run(CREATE_3).await?; + sleep(Duration::from_secs(10)).await; + cluster.run(SELECT_3).await?.assert_result_eq("0"); // `VALUES` should be paused // DML on tables should be blocked. let result = timeout(Duration::from_secs(10), cluster.run(INSERT_INTO_TABLE)).await; assert!(result.is_err()); cluster.run(SELECT_COUNT_TABLE).await?.assert_result_eq("0"); - match resume_by { - ResumeBy::Risectl => cluster.resume().await?, - ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await, - } + // Resume the cluster. + resume_by.resume(&mut cluster).await?; sleep(Duration::from_secs(10)).await; // The source should be resumed. @@ -123,3 +136,64 @@ async fn test_pause_on_bootstrap_resume_by_risectl() -> Result<()> { async fn test_pause_on_bootstrap_resume_by_restart() -> Result<()> { test_impl(ResumeBy::Restart).await } + +// The idea is similar to `e2e_test/batch/transaction/now.slt`. +async fn test_temporal_filter(resume_by: ResumeBy) -> Result<()> { + const CREATE_TABLE: &str = "create table t (ts timestamp)"; + const CREATE_TEMPORAL_FILTER: &str = + "create materialized view mv as select count(*) from t where ts at time zone 'utc' >= now()"; + const INSERT_TIMESTAMPS: &str = " + insert into t select * from generate_series( + now() at time zone 'utc' - interval '10' second, + now() at time zone 'utc' + interval '20' second, + interval '1' second / 20 + ); + "; + const SELECT: &str = "select * from mv"; + + let mut cluster = Cluster::start(Configuration { + meta_nodes: 1, + ..Configuration::for_scale() + }) + .await?; + + cluster.run(SET_PARAMETER).await?; + + { + let mut session = cluster.start_session(); + session.run(CREATE_TABLE).await?; + session.run(CREATE_TEMPORAL_FILTER).await?; + session.run(INSERT_TIMESTAMPS).await?; + session.flush().await?; + }; + + // Kill the meta node and wait for the service to recover. + cluster.kill_nodes(["meta-1"], 0).await; + sleep(Duration::from_secs(10)).await; + + let count: i32 = cluster.run(SELECT).await?.parse()?; + assert_ne!(count, 0, "the following tests are meaningless"); + + sleep(Duration::from_secs(10)).await; + let new_count: i32 = cluster.run(SELECT).await?.parse()?; + assert_eq!(count, new_count, "temporal filter should have been paused"); + + // Resume the cluster. + resume_by.resume(&mut cluster).await?; + sleep(Duration::from_secs(40)).await; // 40 seconds is enough for all timestamps to be expired + + let count: i32 = cluster.run(SELECT).await?.parse()?; + assert_eq!(count, 0, "temporal filter should have been resumed"); + + Ok(()) +} + +#[tokio::test] +async fn test_pause_on_bootstrap_temporal_filter_resume_by_risectl() -> Result<()> { + test_temporal_filter(ResumeBy::Risectl).await +} + +#[tokio::test] +async fn test_pause_on_bootstrap_temporal_filter_resume_by_restart() -> Result<()> { + test_temporal_filter(ResumeBy::Restart).await +} From 5c1a548146f77f2ff818bef086af8378e1d36815 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 10 Oct 2023 14:18:40 +0800 Subject: [PATCH 3/6] fix clippy Signed-off-by: Bugen Zhao --- src/stream/src/executor/values.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index aeb4a6b712d06..624b2531bf7bd 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -25,8 +25,8 @@ use risingwave_expr::expr::BoxedExpression; use tokio::sync::mpsc::UnboundedReceiver; use super::{ - ActorContextRef, Barrier, BoxedMessageStream, Executor, Message, Mutation, PkIndices, - PkIndicesRef, StreamExecutorError, + ActorContextRef, Barrier, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, + StreamExecutorError, }; use crate::task::CreateMviewProgress; From 218cc173cba0222fef442c16a9e93d1f27323edb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 10 Oct 2023 14:53:05 +0800 Subject: [PATCH 4/6] fix values test Signed-off-by: Bugen Zhao --- .../recovery/pause_on_bootstrap.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs index c4b7c11a956c7..003b6f5d2affe 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs @@ -47,8 +47,8 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> { const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction"; const SELECT_2: &str = "SELECT * FROM count_auction"; - const CREATE_3: &str = "CREATE MATERIALIZED VIEW values as VALUES (1), (2), (3)"; - const SELECT_3: &str = "SELECT count(*) FROM values"; + const CREATE_VALUES: &str = "CREATE MATERIALIZED VIEW values as VALUES (1), (2), (3)"; + const SELECT_VALUES: &str = "SELECT count(*) FROM values"; let mut cluster = NexmarkCluster::new( Configuration { @@ -91,9 +91,11 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> { sleep(Duration::from_secs(10)).await; cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the aggregation // result will be 0 instead of empty or NULL - cluster.run(CREATE_3).await?; - sleep(Duration::from_secs(10)).await; - cluster.run(SELECT_3).await?.assert_result_eq("0"); // `VALUES` should be paused + + // `VALUES` should also be paused. + tokio::time::timeout(Duration::from_secs(10), cluster.run(CREATE_VALUES)) + .await + .expect_err("`VALUES` should be paused so creation should never complete"); // DML on tables should be blocked. let result = timeout(Duration::from_secs(10), cluster.run(INSERT_INTO_TABLE)).await; @@ -113,17 +115,20 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> { { let mut session = cluster.start_session(); - session.run("FLUSH").await?; + session.flush().await?; let count: i64 = session.run(SELECT_COUNT_TABLE).await?.parse().unwrap(); session.run(INSERT_INTO_TABLE).await?; - session.run("FLUSH").await?; + session.flush().await?; session .run(SELECT_COUNT_TABLE) .await? .assert_result_eq(format!("{}", count + 1)); } + // `VALUES` should be successfully created + cluster.run(SELECT_VALUES).await?.assert_result_eq("3"); + Ok(()) } From 54dfb545ab549e99f930535421dbc9282665f427 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 10 Oct 2023 14:55:32 +0800 Subject: [PATCH 5/6] fix clippy Signed-off-by: Bugen Zhao --- .../tests/integration_tests/recovery/pause_on_bootstrap.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs index 003b6f5d2affe..7fca9192430ee 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs @@ -29,10 +29,11 @@ enum ResumeBy { impl ResumeBy { async fn resume(&self, cluster: &mut Cluster) -> Result<()> { - Ok(match self { + match self { ResumeBy::Risectl => cluster.resume().await?, ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await, - }) + }; + Ok(()) } } From 01759e6a7287db242e2acc821e49371577ccb2d9 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 10 Oct 2023 15:16:34 +0800 Subject: [PATCH 6/6] only check for risectl restart Signed-off-by: Bugen Zhao --- .../integration_tests/recovery/pause_on_bootstrap.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs index 7fca9192430ee..0eea61da67dfb 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs @@ -22,13 +22,14 @@ use tokio::time::{sleep, timeout}; const SET_PARAMETER: &str = "ALTER SYSTEM SET pause_on_next_bootstrap TO true"; +#[derive(Clone, Copy)] enum ResumeBy { Risectl, Restart, } impl ResumeBy { - async fn resume(&self, cluster: &mut Cluster) -> Result<()> { + async fn resume(self, cluster: &mut Cluster) -> Result<()> { match self { ResumeBy::Risectl => cluster.resume().await?, ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await, @@ -127,8 +128,10 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> { .assert_result_eq(format!("{}", count + 1)); } - // `VALUES` should be successfully created - cluster.run(SELECT_VALUES).await?.assert_result_eq("3"); + if let ResumeBy::Risectl = resume_by { + // `VALUES` should be successfully created + cluster.run(SELECT_VALUES).await?.assert_result_eq("3"); + } Ok(()) }