Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streaming): handle pause on bootstrap for Now and Values (#12716) #12736

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions e2e_test/batch/transaction/now.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Disabled, see https://github.com/risingwavelabs/risingwave/issues/10887

statement ok
create table t (ts timestamp);

Expand Down
8 changes: 1 addition & 7 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,7 @@ impl Barrier {
}
}

/// Whether this barrier is for configuration change. Used for source executor initialization.
pub fn is_update(&self) -> bool {
matches!(self.mutation.as_deref(), Some(Mutation::Update { .. }))
}

/// Whether this barrier is for resume. Used for now executor to determine whether to yield a
/// chunk and a watermark before this barrier.
/// Whether this barrier is for resume.
pub fn is_resume(&self) -> bool {
matches!(self.mutation.as_deref(), Some(Mutation::Resume))
}
Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl<S: StateStore> NowExecutor<S> {
}
};
last_timestamp = state_row.and_then(|row| row[0].clone());
paused = barrier.is_pause_on_startup();
initialized = true;
} else if paused {
// Assert that no data is updated.
Expand All @@ -104,7 +105,7 @@ impl<S: StateStore> NowExecutor<S> {
// Update paused state.
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause | Mutation::Update { .. } => paused = true,
Mutation::Pause => paused = true,
Mutation::Resume => paused = false,
_ => {}
}
Expand Down
13 changes: 13 additions & 0 deletions src/stream/src/executor/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,23 @@ impl ValuesExecutor {
.unwrap();

let emit = barrier.is_newly_added(self.ctx.id);
let paused_on_startup = barrier.is_pause_on_startup();

yield Message::Barrier(barrier);

// If it's failover, do not evaluate rows (assume they have been yielded)
if emit {
if paused_on_startup {
// Wait for the data stream to be resumed before yielding the chunks.
while let Some(barrier) = barrier_receiver.recv().await {
let is_resume = barrier.is_resume();
yield Message::Barrier(barrier);
if is_resume {
break;
}
}
}

let cardinality = schema.len();
ensure!(cardinality > 0);
while !rows.is_empty() {
Expand Down
6 changes: 6 additions & 0 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ impl Session {
self.query_tx.send((sql.into(), tx)).await?;
rx.await?
}

/// Run `FLUSH` on the session.
pub async fn flush(&mut self) -> Result<()> {
self.run("FLUSH").await?;
Ok(())
}
}

/// Options for killing nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,43 @@
use std::time::Duration;

use anyhow::Result;
use risingwave_simulation::cluster::Configuration;
use risingwave_simulation::cluster::{Cluster, Configuration};
use risingwave_simulation::nexmark::NexmarkCluster;
use risingwave_simulation::utils::AssertResult;
use tokio::time::{sleep, timeout};

const CREATE_TABLE: &str = "CREATE TABLE t (v int)";
const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)";
const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t";

const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid";
const SELECT: &str = "SELECT * FROM count_bid";

const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction";
const SELECT_2: &str = "SELECT * FROM count_auction";

const SET_PARAMETER: &str = "ALTER SYSTEM SET pause_on_next_bootstrap TO true";

#[derive(Clone, Copy)]
enum ResumeBy {
Risectl,
Restart,
}

impl ResumeBy {
async fn resume(self, cluster: &mut Cluster) -> Result<()> {
match self {
ResumeBy::Risectl => cluster.resume().await?,
ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
};
Ok(())
}
}

async fn test_impl(resume_by: ResumeBy) -> Result<()> {
const CREATE_TABLE: &str = "CREATE TABLE t (v int)";
const INSERT_INTO_TABLE: &str = "INSERT INTO t VALUES (1)";
const SELECT_COUNT_TABLE: &str = "SELECT COUNT(*) FROM t";

const CREATE: &str = "CREATE MATERIALIZED VIEW count_bid as SELECT COUNT(*) FROM bid";
const SELECT: &str = "SELECT * FROM count_bid";

const CREATE_2: &str = "CREATE MATERIALIZED VIEW count_auction as SELECT COUNT(*) FROM auction";
const SELECT_2: &str = "SELECT * FROM count_auction";

const CREATE_VALUES: &str = "CREATE MATERIALIZED VIEW values as VALUES (1), (2), (3)";
const SELECT_VALUES: &str = "SELECT count(*) FROM values";

let mut cluster = NexmarkCluster::new(
Configuration {
meta_nodes: 1,
Expand Down Expand Up @@ -77,18 +91,21 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
// New streaming jobs should also start from paused.
cluster.run(CREATE_2).await?;
sleep(Duration::from_secs(10)).await;
cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the
cluster.run(SELECT_2).await?.assert_result_eq("0"); // even there's no data from source, the aggregation
// result will be 0 instead of empty or NULL

// `VALUES` should also be paused.
tokio::time::timeout(Duration::from_secs(10), cluster.run(CREATE_VALUES))
.await
.expect_err("`VALUES` should be paused so creation should never complete");

// DML on tables should be blocked.
let result = timeout(Duration::from_secs(10), cluster.run(INSERT_INTO_TABLE)).await;
assert!(result.is_err());
cluster.run(SELECT_COUNT_TABLE).await?.assert_result_eq("0");

match resume_by {
ResumeBy::Risectl => cluster.resume().await?,
ResumeBy::Restart => cluster.kill_nodes(["meta-1"], 0).await,
}
// Resume the cluster.
resume_by.resume(&mut cluster).await?;
sleep(Duration::from_secs(10)).await;

// The source should be resumed.
Expand All @@ -100,17 +117,22 @@ async fn test_impl(resume_by: ResumeBy) -> Result<()> {
{
let mut session = cluster.start_session();

session.run("FLUSH").await?;
session.flush().await?;
let count: i64 = session.run(SELECT_COUNT_TABLE).await?.parse().unwrap();

session.run(INSERT_INTO_TABLE).await?;
session.run("FLUSH").await?;
session.flush().await?;
session
.run(SELECT_COUNT_TABLE)
.await?
.assert_result_eq(format!("{}", count + 1));
}

if let ResumeBy::Risectl = resume_by {
// `VALUES` should be successfully created
cluster.run(SELECT_VALUES).await?.assert_result_eq("3");
}

Ok(())
}

Expand All @@ -123,3 +145,64 @@ async fn test_pause_on_bootstrap_resume_by_risectl() -> Result<()> {
async fn test_pause_on_bootstrap_resume_by_restart() -> Result<()> {
test_impl(ResumeBy::Restart).await
}

// The idea is similar to `e2e_test/batch/transaction/now.slt`.
async fn test_temporal_filter(resume_by: ResumeBy) -> Result<()> {
const CREATE_TABLE: &str = "create table t (ts timestamp)";
const CREATE_TEMPORAL_FILTER: &str =
"create materialized view mv as select count(*) from t where ts at time zone 'utc' >= now()";
const INSERT_TIMESTAMPS: &str = "
insert into t select * from generate_series(
now() at time zone 'utc' - interval '10' second,
now() at time zone 'utc' + interval '20' second,
interval '1' second / 20
);
";
const SELECT: &str = "select * from mv";

let mut cluster = Cluster::start(Configuration {
meta_nodes: 1,
..Configuration::for_scale()
})
.await?;

cluster.run(SET_PARAMETER).await?;

{
let mut session = cluster.start_session();
session.run(CREATE_TABLE).await?;
session.run(CREATE_TEMPORAL_FILTER).await?;
session.run(INSERT_TIMESTAMPS).await?;
session.flush().await?;
};

// Kill the meta node and wait for the service to recover.
cluster.kill_nodes(["meta-1"], 0).await;
sleep(Duration::from_secs(10)).await;

let count: i32 = cluster.run(SELECT).await?.parse()?;
assert_ne!(count, 0, "the following tests are meaningless");

sleep(Duration::from_secs(10)).await;
let new_count: i32 = cluster.run(SELECT).await?.parse()?;
assert_eq!(count, new_count, "temporal filter should have been paused");

// Resume the cluster.
resume_by.resume(&mut cluster).await?;
sleep(Duration::from_secs(40)).await; // 40 seconds is enough for all timestamps to be expired

let count: i32 = cluster.run(SELECT).await?.parse()?;
assert_eq!(count, 0, "temporal filter should have been resumed");

Ok(())
}

#[tokio::test]
async fn test_pause_on_bootstrap_temporal_filter_resume_by_risectl() -> Result<()> {
test_temporal_filter(ResumeBy::Risectl).await
}

#[tokio::test]
async fn test_pause_on_bootstrap_temporal_filter_resume_by_restart() -> Result<()> {
test_temporal_filter(ResumeBy::Restart).await
}