From 6f2d0c33848871a5f24b2537e0c713fc66afcf96 Mon Sep 17 00:00:00 2001
From: Lanqing Yang
Date: Mon, 2 Dec 2024 18:40:56 -0800
Subject: [PATCH] feat(frontend): support alter source pause/resume (#19636)

---
 .../resume_pause_source_kafka.slt.serial      | 143 ++++++++++++++++++
 .../src/handler/alter_streaming_rate_limit.rs |  19 ++-
 src/sqlparser/src/ast/ddl.rs                  |   9 +-
 src/sqlparser/src/keywords.rs                 |   2 +
 src/sqlparser/src/parser.rs                   |  14 +-
 5 files changed, 182 insertions(+), 5 deletions(-)
 create mode 100644 e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial

diff --git a/e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial b/e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial
new file mode 100644
index 0000000000000..0922b4e56eee3
--- /dev/null
+++ b/e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial
@@ -0,0 +1,143 @@
+control substitution on
+
+statement ok
+SET streaming_use_shared_source TO false;
+
+############## Create kafka seed data
+
+statement ok
+create table kafka_seed_data (v1 int);
+
+statement ok
+insert into kafka_seed_data select * from generate_series(1, 1000);
+
+############## Sink into kafka
+
+statement ok
+create sink kafka_sink
+from
+  kafka_seed_data with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'test_rate_limit',
+  type = 'append-only',
+  force_append_only='true'
+);
+
+############## Source from kafka
+
+# Wait for the topic to be created
+skipif in-memory
+sleep 5s
+
+statement ok
+create source kafka_source (v1 int) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'test_rate_limit',
+  scan.startup.mode = 'earliest',
+) FORMAT PLAIN ENCODE JSON
+
+
+statement ok
+create source kafka_source2 (v1 int) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'test_rate_limit',
+  scan.startup.mode = 'earliest',
+  source_rate_limit = 100,
+) FORMAT PLAIN ENCODE JSON
+
+statement ok
+flush;
+
+############## Check data
+
+skipif in-memory
+sleep 3s
+
+############## Create MV on source
+
+# This should be ignored.
+statement ok
+SET SOURCE_RATE_LIMIT=1000;
+
+statement ok
+create materialized view rl_mv1 as select count(*) from kafka_source;
+
+statement ok
+create materialized view rl_mv2 as select count(*) from kafka_source;
+
+statement ok
+create materialized view rl_mv3 as select count(*) from kafka_source;
+
+skipif in-memory
+statement count 0
+alter source kafka_source pause;
+
+skipif in-memory
+statement error
+alter source kafka_source2 pause;
+
+query T
+select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
+order by name;
+----
+rl_mv1 SOURCE {SOURCE} 0
+rl_mv2 SOURCE {SOURCE} 0
+rl_mv3 SOURCE {SOURCE} 0
+
+
+skipif in-memory
+statement count 0
+alter source kafka_source resume;
+
+# After resume, the rate limit becomes None.
+query T
+select count(*) from rw_rate_limit;
+----
+0
+
+skipif in-memory
+sleep 3s
+
+skipif in-memory
+query I
+select count(*) > 0 from rl_mv1;
+----
+t
+
+skipif in-memory
+query I
+select count(*) > 0 from rl_mv2;
+----
+t
+
+skipif in-memory
+query I
+select count(*) > 0 from rl_mv3;
+----
+t
+
+############## Cleanup
+
+statement ok
+drop materialized view rl_mv1;
+
+statement ok
+drop materialized view rl_mv2;
+
+statement ok
+drop materialized view rl_mv3;
+
+statement ok
+drop source kafka_source;
+
+statement ok
+drop source kafka_source2;
+
+statement ok
+drop sink kafka_sink;
+
+statement ok
+drop table kafka_seed_data;
+
+statement ok
+SET streaming_use_shared_source TO true;
diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs
index e916d8ed8b87a..5a7335619e043 100644
--- a/src/frontend/src/handler/alter_streaming_rate_limit.rs
+++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs
@@ -16,10 +16,12 @@ use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail;
 use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
 use risingwave_sqlparser::ast::ObjectName;
+use risingwave_sqlparser::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};
 
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
 use crate::catalog::table_catalog::TableType;
+use crate::error::ErrorCode::InvalidInputSyntax;
 use crate::error::{ErrorCode, Result};
 use crate::Binder;
 
@@ -56,6 +58,17 @@ pub async fn handle_alter_streaming_rate_limit(
             let reader = session.env().catalog_reader().read_guard();
             let (source, schema_name) =
                 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
+            if let Some(prev_limit) = source.rate_limit {
+                if rate_limit == SOURCE_RATE_LIMIT_PAUSED
+                    || (prev_limit != 0 && rate_limit == SOURCE_RATE_LIMIT_RESUMED)
+                {
+                    return Err(InvalidInputSyntax(
+                        "PAUSE or RESUME is invalid when the stream has a pre-configured rate limit."
+                            .to_string(),
+                    )
+                    .into());
+                }
+            }
             session.check_privilege_for_drop_alter(schema_name, &**source)?;
             (StatementType::ALTER_SOURCE, source.id)
         }
@@ -91,7 +104,11 @@ pub async fn handle_alter_streaming_rate_limit(
     let meta_client = session.env().meta_client();
 
     let rate_limit = if rate_limit < 0 {
-        None
+        if rate_limit == SOURCE_RATE_LIMIT_PAUSED {
+            Some(0)
+        } else {
+            None
+        }
     } else {
         Some(rate_limit as u32)
     };
diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs
index c7cf401221b07..77e393f20a87b 100644
--- a/src/sqlparser/src/ast/ddl.rs
+++ b/src/sqlparser/src/ast/ddl.rs
@@ -25,6 +25,7 @@ use crate::ast::{
     display_comma_separated, display_separated, DataType, Expr, Ident, ObjectName, SecretRefValue,
     SetVariableValue, Value,
 };
+use crate::parser::{SOURCE_RATE_LIMIT_PAUSED, SOURCE_RATE_LIMIT_RESUMED};
 use crate::tokenizer::Token;
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -458,9 +459,11 @@ impl fmt::Display for AlterSourceOperation {
             AlterSourceOperation::RefreshSchema => {
                 write!(f, "REFRESH SCHEMA")
             }
-            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
-                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
-            }
+            AlterSourceOperation::SetSourceRateLimit { rate_limit } => match *rate_limit {
+                SOURCE_RATE_LIMIT_PAUSED => write!(f, "PAUSE"),
+                SOURCE_RATE_LIMIT_RESUMED => write!(f, "RESUME"),
+                _ => write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit),
+            },
             AlterSourceOperation::SwapRenameSource { target_source } => {
                 write!(f, "SWAP WITH {}", target_source)
             }
diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs
index 83e7d31debf07..30d8036216e9f 100644
--- a/src/sqlparser/src/keywords.rs
+++ b/src/sqlparser/src/keywords.rs
@@ -384,6 +384,7 @@ define_keywords!(
     PARTITIONED,
     PARTITIONS,
     PASSWORD,
+    PAUSE,
     PERCENT,
     PERCENTILE_CONT,
     PERCENTILE_DISC,
@@ -433,6 +434,7 @@ define_keywords!(
     REPLACE,
     RESTRICT,
     RESULT,
+    RESUME,
     RETURN,
     RETURNING,
     RETURNS,
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 43c7997818a14..ac3ebc9785c74 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -39,6 +39,10 @@ use crate::{impl_parse_to, parser_v2};
 
 pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
 pub(crate) const WEBHOOK_CONNECTOR: &str = "webhook";
+// Reserve i32::MIN to represent PAUSE; the frontend maps it to a rate limit of 0.
+pub const SOURCE_RATE_LIMIT_PAUSED: i32 = i32::MIN;
+// Reserve i32::MIN + 1 to represent RESUME; the frontend maps it to no rate limit (None).
+pub const SOURCE_RATE_LIMIT_RESUMED: i32 = i32::MIN + 1;
 
 #[derive(Debug, Clone, PartialEq)]
 pub enum ParserError {
@@ -3515,9 +3519,17 @@ impl Parser<'_> {
         } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) {
             let target_source = self.parse_object_name()?;
             AlterSourceOperation::SwapRenameSource { target_source }
+        } else if self.parse_keyword(Keyword::PAUSE) {
+            AlterSourceOperation::SetSourceRateLimit {
+                rate_limit: SOURCE_RATE_LIMIT_PAUSED,
+            }
+        } else if self.parse_keyword(Keyword::RESUME) {
+            AlterSourceOperation::SetSourceRateLimit {
+                rate_limit: SOURCE_RATE_LIMIT_RESUMED,
+            }
         } else {
             return self.expected(
-                "RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",
+                "RENAME, ADD COLUMN, OWNER TO, SET, PAUSE, RESUME, or SOURCE_RATE_LIMIT after ALTER SOURCE",
            );
        };
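
For reference, the statements this patch adds can be exercised as follows (a
minimal usage sketch, not part of the diff; `my_source` is a placeholder
source name):

  -- Pause ingestion: parsed as SOURCE_RATE_LIMIT_PAUSED (i32::MIN), which the
  -- frontend handler maps to a rate limit of 0.
  ALTER SOURCE my_source PAUSE;

  -- Resume ingestion: parsed as SOURCE_RATE_LIMIT_RESUMED (i32::MIN + 1),
  -- which the frontend handler maps to no rate limit (None).
  ALTER SOURCE my_source RESUME;

  -- Setting an explicit rate limit still works as before.
  ALTER SOURCE my_source SET SOURCE_RATE_LIMIT TO 100;

  -- PAUSE is rejected when the source already has a rate limit configured
  -- (e.g. created WITH (..., source_rate_limit = 100)), and RESUME is
  -- rejected unless that configured limit is 0.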