diff --git a/e2e_test/batch/transaction/now.slt b/e2e_test/batch/transaction/now.slt
index d1be437ba371c..4f8d317f04261 100644
--- a/e2e_test/batch/transaction/now.slt
+++ b/e2e_test/batch/transaction/now.slt
@@ -1,5 +1,3 @@
-# Disabled, see https://github.com/risingwavelabs/risingwave/issues/10887
-
 statement ok
 create table t (ts timestamp);
 
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index fffa62f4794f8..99b090e21a240 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -332,13 +332,7 @@ impl Barrier {
         }
     }
 
-    /// Whether this barrier is for configuration change. Used for source executor initialization.
-    pub fn is_update(&self) -> bool {
-        matches!(self.mutation.as_deref(), Some(Mutation::Update { .. }))
-    }
-
-    /// Whether this barrier is for resume. Used for now executor to determine whether to yield a
-    /// chunk and a watermark before this barrier.
+    /// Whether this barrier is for resume.
     pub fn is_resume(&self) -> bool {
         matches!(self.mutation.as_deref(), Some(Mutation::Resume))
     }
diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs
index d2cbf05d71f80..636dabd59095e 100644
--- a/src/stream/src/executor/now.rs
+++ b/src/stream/src/executor/now.rs
@@ -90,6 +90,7 @@ impl NowExecutor {
                     }
                 };
                 last_timestamp = state_row.and_then(|row| row[0].clone());
+                paused = barrier.is_pause_on_startup();
                 initialized = true;
             } else if paused {
                 // Assert that no data is updated.
@@ -104,7 +105,7 @@ impl NowExecutor {
             // Update paused state.
             if let Some(mutation) = barrier.mutation.as_deref() {
                 match mutation {
-                    Mutation::Pause | Mutation::Update { .. } => paused = true,
+                    Mutation::Pause => paused = true,
                     Mutation::Resume => paused = false,
                     _ => {}
                 }
diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs
index b30ad4c1ed2f3..624b2531bf7bd 100644
--- a/src/stream/src/executor/values.rs
+++ b/src/stream/src/executor/values.rs
@@ -83,10 +83,23 @@ impl ValuesExecutor {
             .unwrap();
         let emit = barrier.is_newly_added(self.ctx.id);
+        let paused_on_startup = barrier.is_pause_on_startup();
 
         yield Message::Barrier(barrier);
 
+        // If it's a failover, do not evaluate rows (assume they have already been yielded).
         if emit {
+            if paused_on_startup {
+                // Wait for the data stream to be resumed before yielding the chunks.
+                while let Some(barrier) = barrier_receiver.recv().await {
+                    let is_resume = barrier.is_resume();
+                    yield Message::Barrier(barrier);
+                    if is_resume {
+                        break;
+                    }
+                }
+            }
+
             let cardinality = schema.len();
             ensure!(cardinality > 0);
 
             while !rows.is_empty() {
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index f375eeac4cc85..4eb60e7af14dc 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -655,6 +655,12 @@ impl Session {
         self.query_tx.send((sql.into(), tx)).await?;
         rx.await?
     }
+
+    /// Run `FLUSH` on the session.
+    pub async fn flush(&mut self) -> Result<()> {
+        self.run("FLUSH").await?;
+        Ok(())
+    }
 }
 
 /// Options for killing nodes.
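Note on the executor changes above: the `now.rs` hunks reduce barrier handling to a small state machine. The first barrier seeds `paused` from the cluster-wide pause-on-startup flag, and from then on only explicit `Pause`/`Resume` mutations toggle it (an `Update` mutation no longer implies a pause). Below is a minimal, self-contained sketch of that bookkeeping; the `Barrier` and `Mutation` types here are illustrative stand-ins, not the actual RisingWave definitions.

    // Illustrative stand-ins only; the real `Barrier`/`Mutation` live in
    // src/stream/src/executor/mod.rs and carry much more information.
    enum Mutation {
        Pause,
        Resume,
    }

    struct Barrier {
        mutation: Option<Mutation>,
        pause_on_startup: bool,
    }

    #[derive(Default)]
    struct PauseState {
        initialized: bool,
        paused: bool,
    }

    impl PauseState {
        fn on_barrier(&mut self, barrier: &Barrier) {
            if !self.initialized {
                // The first barrier seeds the flag, so an executor starting
                // (or recovering) under `pause_on_next_bootstrap` yields nothing.
                self.paused = barrier.pause_on_startup;
                self.initialized = true;
            }
            // Only explicit Pause/Resume toggle the flag from then on.
            match barrier.mutation {
                Some(Mutation::Pause) => self.paused = true,
                Some(Mutation::Resume) => self.paused = false,
                None => {}
            }
        }
    }

    fn main() {
        let mut state = PauseState::default();
        state.on_barrier(&Barrier { mutation: None, pause_on_startup: true });
        assert!(state.paused, "should start paused");
        state.on_barrier(&Barrier { mutation: Some(Mutation::Resume), pause_on_startup: false });
        assert!(!state.paused, "should be running after Resume");
    }
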
diff --git a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs
index d0288e6931e88..0eea61da67dfb 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/pause_on_bootstrap.rs
@@ -15,29 +15,43 @@
 use std::time::Duration;
 
 use anyhow::Result;
-use risingwave_simulation::cluster::Configuration;
+use risingwave_simulation::cluster::{Cluster, Configuration};
 use risingwave_simulation::nexmark::NexmarkCluster;
 use risingwave_simulation::utils::AssertResult;
 use tokio::time::{sleep, timeout};
 
-const CREATE_TABLE: &str = "CREATE TABLE t (v int)";
-const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)";
-const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t";
-
-const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid";
-const SELECT: &str = "SELECT * FROM count_bid";
-
-const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction";
-const SELECT_2: &str = "SELECT * FROM count_auction";
-
 const SET_PARAMETER: &str = "ALTER SYSTEM SET pause_on_next_bootstrap TO true";
 
+#[derive(Clone, Copy)]
 enum ResumeBy {
     Risectl,
     Restart,
 }
 
+impl ResumeBy {
+    async fn resume(self, cluster: &mut Cluster) -> Result<()> {
+        match self {
+            ResumeBy::Risectl => cluster.resume().await?,
+            ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
+        };
+        Ok(())
+    }
+}
+
 async fn test_impl(resume_by: ResumeBy) -> Result<()> {
+    const CREATE_TABLE: &str = "CREATE TABLE t (v int)";
+    const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)";
+    const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t";
+
+    const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid";
+    const SELECT: &str = "SELECT * FROM count_bid";
+
+    const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction";
+    const SELECT_2: &str = "SELECT * FROM count_auction";
+
+    const CREATE_VALUES: &str = "CREATE MATERIALIZED VIEW values as VALUES (1), (2), (3)";
+    const SELECT_VALUES: &str = "SELECT count(*) FROM values";
+
     let mut cluster = NexmarkCluster::new(
         Configuration {
             meta_nodes: 1,
@@ -77,18 +91,21 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
     // New streaming jobs should also start from paused.
     cluster.run(CREATE_2).await?;
     sleep(Duration::from_secs(10)).await;
-    cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the
+    cluster.run(SELECT_2).await?.assert_result_eq("0"); // even if there's no data from the source, the aggregation
                                                         // result will be 0 instead of empty or NULL
 
+    // `VALUES` should also be paused.
+    tokio::time::timeout(Duration::from_secs(10), cluster.run(CREATE_VALUES))
+        .await
+        .expect_err("`VALUES` should be paused so creation should never complete");
+
     // DML on tables should be blocked.
     let result = timeout(Duration::from_secs(10), cluster.run(INSERT_INTO_TABLE)).await;
     assert!(result.is_err());
     cluster.run(SELECT_COUNT_TABLE).await?.assert_result_eq("0");
 
-    match resume_by {
-        ResumeBy::Risectl => cluster.resume().await?,
-        ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
-    }
+    // Resume the cluster.
+    resume_by.resume(&mut cluster).await?;
 
     sleep(Duration::from_secs(10)).await;
 
     // The source should be resumed.
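An aside on the test pattern introduced above: asserting that an operation stays blocked is done by racing it against `tokio::time::timeout` and requiring the timeout to win. A standalone toy version of the same pattern (not part of the patch; `blocked_forever` is a hypothetical stand-in for the paused `CREATE`/`INSERT`):

    use std::time::Duration;

    // Hypothetical stand-in for an operation that stays blocked while paused.
    async fn blocked_forever() {
        std::future::pending::<()>().await;
    }

    #[tokio::main]
    async fn main() {
        // `timeout` resolves to Err(Elapsed) while the inner future is still
        // pending, which is exactly the outcome `expect_err` treats as success.
        tokio::time::timeout(Duration::from_millis(100), blocked_forever())
            .await
            .expect_err("operation should still be blocked when the timeout fires");
    }
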
@@ -100,17 +117,22 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
 
     {
         let mut session = cluster.start_session();
 
-        session.run("FLUSH").await?;
+        session.flush().await?;
         let count: i64 = session.run(SELECT_COUNT_TABLE).await?.parse().unwrap();
 
         session.run(INSERT_INTO_TABLE).await?;
-        session.run("FLUSH").await?;
+        session.flush().await?;
         session
             .run(SELECT_COUNT_TABLE)
             .await?
             .assert_result_eq(format!("{}", count + 1));
     }
 
+    if let ResumeBy::Risectl = resume_by {
+        // `VALUES` should have been successfully created.
+        cluster.run(SELECT_VALUES).await?.assert_result_eq("3");
+    }
+
     Ok(())
 }
 
@@ -123,3 +145,64 @@ async fn test_pause_on_bootstrap_resume_by_risectl() -> Result<()> {
 async fn test_pause_on_bootstrap_resume_by_restart() -> Result<()> {
     test_impl(ResumeBy::Restart).await
 }
+
+// The idea is similar to `e2e_test/batch/transaction/now.slt`.
+async fn test_temporal_filter(resume_by: ResumeBy) -> Result<()> {
+    const CREATE_TABLE: &str = "create table t (ts timestamp)";
+    const CREATE_TEMPORAL_FILTER: &str =
+        "create materialized view mv as select count(*) from t where ts at time zone 'utc' >= now()";
+    const INSERT_TIMESTAMPS: &str = "
+    insert into t select * from generate_series(
+        now() at time zone 'utc' - interval '10' second,
+        now() at time zone 'utc' + interval '20' second,
+        interval '1' second / 20
+    );
+    ";
+    const SELECT: &str = "select * from mv";
+
+    let mut cluster = Cluster::start(Configuration {
+        meta_nodes: 1,
+        ..Configuration::for_scale()
+    })
+    .await?;
+
+    cluster.run(SET_PARAMETER).await?;
+
+    {
+        let mut session = cluster.start_session();
+        session.run(CREATE_TABLE).await?;
+        session.run(CREATE_TEMPORAL_FILTER).await?;
+        session.run(INSERT_TIMESTAMPS).await?;
+        session.flush().await?;
+    };
+
+    // Kill the meta node and wait for the service to recover.
+    cluster.kill_nodes(["meta-1"], 0).await;
+    sleep(Duration::from_secs(10)).await;
+
+    let count: i32 = cluster.run(SELECT).await?.parse()?;
+    assert_ne!(count, 0, "the following tests are meaningless");
+
+    sleep(Duration::from_secs(10)).await;
+    let new_count: i32 = cluster.run(SELECT).await?.parse()?;
+    assert_eq!(count, new_count, "temporal filter should have been paused");
+
+    // Resume the cluster.
+    resume_by.resume(&mut cluster).await?;
+    sleep(Duration::from_secs(40)).await; // 40 seconds is enough for all timestamps to be expired
+
+    let count: i32 = cluster.run(SELECT).await?.parse()?;
+    assert_eq!(count, 0, "temporal filter should have been resumed");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_pause_on_bootstrap_temporal_filter_resume_by_risectl() -> Result<()> {
+    test_temporal_filter(ResumeBy::Risectl).await
+}
+
+#[tokio::test]
+async fn test_pause_on_bootstrap_temporal_filter_resume_by_restart() -> Result<()> {
+    test_temporal_filter(ResumeBy::Restart).await
+}
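A sanity check on the numbers in `test_temporal_filter`: `generate_series` is inclusive on both ends, so stepping from `now() - 10s` to `now() + 20s` by 50 ms (`interval '1' second / 20`) inserts 601 rows into `t`, and the newest row only satisfies `ts >= now()` for about 20 seconds of resumed progress after the insert. The 40-second sleep after resuming therefore leaves roughly 2x margin for the count in `mv` to drop to 0. A quick back-of-the-envelope check in Rust:

    fn main() {
        // generate_series(now - 10s, now + 20s, 50 ms) is inclusive on both ends.
        let span_ms: u64 = 10_000 + 20_000;
        let step_ms: u64 = 1_000 / 20; // interval '1' second / 20
        let rows_inserted = span_ms / step_ms + 1;
        assert_eq!(rows_inserted, 601);

        // The newest timestamp is now() + 20 s, so once `now()` advances past
        // that point every row fails `ts >= now()` and the count reaches 0.
        println!("rows inserted into t: {rows_inserted}");
    }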