diff --git a/Cargo.lock b/Cargo.lock index 57306720c578b..04b8b4353ed2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12151,6 +12151,7 @@ name = "risingwave_stream" version = "2.1.0-rc.2" dependencies = [ "anyhow", + "arc-swap", "assert_matches", "async-recursion", "async-stream", diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 2a393186e9a6e..4f77b56c5f279 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -99,6 +99,7 @@ if [[ $mode != "single-node" ]]; then fi sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt' +sqllogictest -p 4566 -d dev './e2e_test/dml/*.slt' sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt' sqllogictest -p 4566 -d test './e2e_test/database/test.slt' diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 73e84f371c35e..043d623589482 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -31,6 +31,7 @@ user client_encoding user client_min_messages user create_compaction_group_for_mv user datestyle +user dml_rate_limit user enable_join_ordering user enable_share_plan user enable_two_phase_agg diff --git a/e2e_test/dml/inserts.slt.part b/e2e_test/dml/inserts.slt.part new file mode 100644 index 0000000000000..1f18d7b403e45 --- /dev/null +++ b/e2e_test/dml/inserts.slt.part @@ -0,0 +1,94 @@ +# Due to the transaction semantics and MAX_CHUNK_FOR_ATOMICITY in DmlExecutor, +# data chunks will be buffered for a certain period before being yielded. +# To work around this buffering, instead of inserting data in one transaction like insert into t SELECT generate_series, +# this test inserts data via multiple inserts. 
+ +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > 
/dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & + +system ok +nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 & diff --git a/e2e_test/dml/rate_limit_no_limit.slt b/e2e_test/dml/rate_limit_no_limit.slt new file mode 100644 index 0000000000000..1b503204cd4db --- /dev/null +++ b/e2e_test/dml/rate_limit_no_limit.slt @@ -0,0 +1,17 @@ +statement ok +create table t(v1 int); + +statement ok +alter table t set parallelism to 1; + +include ./inserts.slt.part + +sleep 3s + +query I +select count(*) from t; +---- +30 + +statement ok +drop table t; diff --git a/e2e_test/dml/rate_limit_pause_resume.slt b/e2e_test/dml/rate_limit_pause_resume.slt new file mode 100644 index 0000000000000..f8e152178d6ce --- /dev/null +++ b/e2e_test/dml/rate_limit_pause_resume.slt @@ -0,0 +1,32 @@ +statement ok +create table t(v1 int); + +statement ok +alter table t set parallelism to 1; + +# Pause data stream. +statement ok +alter table t set dml_rate_limit to 0; + +include ./inserts.slt.part + +sleep 3s + +query I +select count(*) from t; +---- +0 + +# Resume data stream. 
+statement ok +alter table t set dml_rate_limit to 1; + +sleep 3s + +query II +select case when count(*) between 1 and 5 then 3 else 0 end from t; +---- +3 + +statement ok +drop table t; diff --git a/e2e_test/dml/rate_limit_set.slt b/e2e_test/dml/rate_limit_set.slt new file mode 100644 index 0000000000000..23a60c973fd15 --- /dev/null +++ b/e2e_test/dml/rate_limit_set.slt @@ -0,0 +1,20 @@ +statement ok +set dml_rate_limit to 2; + +statement ok +create table t(v1 int); + +statement ok +alter table t set parallelism to 1; + +include ./inserts.slt.part + +sleep 3s + +query II +select case when count(*) between 2 and 10 then 6 else 0 end from t; +---- +6 + +statement ok +drop table t; diff --git a/proto/meta.proto b/proto/meta.proto index 910d7997069df..bbeb263b74097 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -306,6 +306,7 @@ enum ThrottleTarget { MV = 2; TABLE_WITH_SOURCE = 3; CDC_TABLE = 4; + TABLE_DML = 5; } message ApplyThrottleRequest { diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 379a6001d9d5d..10c00242d9917 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -765,6 +765,7 @@ message DmlNode { uint64 table_version_id = 3; // Column descriptions of the table. repeated plan_common.ColumnDesc column_descs = 2; + optional uint32 rate_limit = 4; } message RowIdGenNode { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 01d11075ec03a..1949f3f4d5b0f 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -57,6 +57,7 @@ type SessionConfigResult = std::result::Result; // otherwise seems like it can't infer the type of -1 when written inline. const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1; const DISABLE_SOURCE_RATE_LIMIT: i32 = -1; +const DISABLE_DML_RATE_LIMIT: i32 = -1; #[serde_as] /// This is the Session Config of RisingWave. 
@@ -273,6 +274,12 @@ pub struct SessionConfig { #[parameter(default = DISABLE_SOURCE_RATE_LIMIT)] source_rate_limit: i32, + /// Set streaming rate limit (rows per second) for each parallelism for table DML. + /// If set to -1, disable rate limit. + /// If set to 0, this pauses the DML. + #[parameter(default = DISABLE_DML_RATE_LIMIT)] + dml_rate_limit: i32, + /// Cache policy for partition cache in streaming over window. /// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`". #[serde_as(as = "DisplayFromStr")] diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index f36cd6cf8164f..a1cf12b8e0d94 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -187,12 +187,14 @@ async fn test_table_materialize() -> StreamResult<()> { identity: format!("DmlExecutor {:X}", 2), }, DmlExecutor::new( + ActorContext::for_test(0), source_executor, dml_manager.clone(), table_id, INITIAL_TABLE_VERSION_ID, column_descs.clone(), 1024, + None, ) .boxed(), ); diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs index e916d8ed8b87a..4e03765099ae9 100644 --- a/src/frontend/src/handler/alter_streaming_rate_limit.rs +++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs @@ -85,6 +85,13 @@ pub async fn handle_alter_streaming_rate_limit( session.check_privilege_for_drop_alter(schema_name, &**table)?; (StatementType::ALTER_TABLE, table.id.table_id) } + PbThrottleTarget::TableDml => { + let reader = session.env().catalog_reader().read_guard(); + let (table, schema_name) = + reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?; + session.check_privilege_for_drop_alter(schema_name, &**table)?; + (StatementType::ALTER_TABLE, table.id.table_id) + } _ => bail!("Unsupported throttle target: {:?}", kind), }; diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 
44a2c0590a3e2..fcc17cea4b3f9 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -722,6 +722,18 @@ pub async fn handle( ) .await } + Statement::AlterTable { + name, + operation: AlterTableOperation::SetDmlRateLimit { rate_limit }, + } => { + alter_streaming_rate_limit::handle_alter_streaming_rate_limit( + handler_args, + PbThrottleTarget::TableDml, + name, + rate_limit, + ) + .await + } Statement::AlterTable { name, operation: AlterTableOperation::SetBackfillRateLimit { rate_limit }, diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index 7b671efa24c23..c69baf5ad53d9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -92,6 +92,7 @@ impl StreamNode for StreamDml { table_id: 0, // Meta will fill this table id. table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id. column_descs: self.column_descs.iter().map(Into::into).collect(), + rate_limit: self.base.ctx().overwrite_options().dml_rate_limit, }) } } diff --git a/src/frontend/src/utils/overwrite_options.rs b/src/frontend/src/utils/overwrite_options.rs index 185f1b80b154b..b6ffab6a71ef5 100644 --- a/src/frontend/src/utils/overwrite_options.rs +++ b/src/frontend/src/utils/overwrite_options.rs @@ -18,10 +18,12 @@ use crate::handler::HandlerArgs; pub struct OverwriteOptions { pub source_rate_limit: Option, pub backfill_rate_limit: Option, + pub dml_rate_limit: Option, } impl OverwriteOptions { pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit"; + pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit"; pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit"; pub fn new(args: &mut HandlerArgs) -> Self { @@ -51,9 +53,23 @@ impl OverwriteOptions { } } }; + let dml_rate_limit = { + if let Some(x) = args.with_options.remove(Self::DML_RATE_LIMIT_KEY) { + // 
FIXME(tabVersion): validate the value + Some(x.parse::().unwrap()) + } else { + let rate_limit = args.session.config().dml_rate_limit(); + if rate_limit < 0 { + None + } else { + Some(rate_limit as u32) + } + } + }; Self { source_rate_limit, backfill_rate_limit, + dml_rate_limit, } } } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 168b54a648da3..e7a641959e5d8 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -117,6 +117,11 @@ impl StreamManagerService for StreamServiceImpl { .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate) .await? } + ThrottleTarget::TableDml => { + self.metadata_manager + .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate) + .await? + } ThrottleTarget::Unspecified => { return Err(Status::invalid_argument("unspecified throttle target")) } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 595b03bb3bba1..a23cf6f127c47 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1407,6 +1407,65 @@ impl CatalogController { Ok(fragment_actors) } + pub async fn update_dml_rate_limit_by_job_id( + &self, + job_id: ObjectId, + rate_limit: Option, + ) -> MetaResult>> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + + let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find() + .select_only() + .columns([ + fragment::Column::FragmentId, + fragment::Column::FragmentTypeMask, + fragment::Column::StreamNode, + ]) + .filter(fragment::Column::JobId.eq(job_id)) + .into_tuple() + .all(&txn) + .await?; + let mut fragments = fragments + .into_iter() + .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) + .collect_vec(); + + fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { + let mut found = false; + if *fragment_type_mask & 
PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 { + visit_stream_node(stream_node, |node| { + if let PbNodeBody::Dml(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + found + }); + + if fragments.is_empty() { + return Err(MetaError::invalid_parameter(format!( + "dml node not found in job id {job_id}" + ))); + } + let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); + for (id, _, stream_node) in fragments { + fragment::ActiveModel { + fragment_id: Set(id), + stream_node: Set(StreamNode::from(&stream_node)), + ..Default::default() + } + .update(&txn) + .await?; + } + let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?; + + txn.commit().await?; + + Ok(fragment_actors) + } + pub async fn post_apply_reschedules( &self, reschedules: HashMap, diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index b3b3843cc1304..c78a0923b7084 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use anyhow::anyhow; use risingwave_common::catalog::TableId; +use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -383,6 +384,15 @@ impl HummockManager { skip_sst_ids: &HashSet, tables_to_commit: impl Iterator, ) -> Result>> { + if self + .env + .system_params_reader() + .await + .time_travel_retention_ms() + == 0 + { + return Ok(None); + } let select_groups = group_parents .iter() .filter_map(|(cg_id, _)| { diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 2d42b2f9f9e6f..ac5f0f63f43ce 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -565,6 +565,21 @@ impl MetadataManager { .collect()) } + pub async 
fn update_dml_rate_limit_by_table_id( + &self, + table_id: TableId, + rate_limit: Option, + ) -> MetaResult>> { + let fragment_actors = self + .catalog_controller + .update_dml_rate_limit_by_job_id(table_id.table_id as _, rate_limit) + .await?; + Ok(fragment_actors + .into_iter() + .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect())) + .collect()) + } + pub async fn update_actor_splits_by_split_assignment( &self, split_assignment: &SplitAssignment, diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 80eea383f576c..4c268cbdbf557 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -334,6 +334,10 @@ impl stream_plan::FragmentTypeFlag { pub fn rate_limit_fragments() -> i32 { Self::backfill_rate_limit_fragments() | Self::source_rate_limit_fragments() } + + pub fn dml_rate_limit_fragments() -> i32 { + stream_plan::FragmentTypeFlag::Dml as i32 + } } impl catalog::StreamSourceInfo { diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 41c096b61dcc8..827febf5de951 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -111,6 +111,10 @@ pub enum AlterTableOperation { SetBackfillRateLimit { rate_limit: i32, }, + /// `SET DML_RATE_LIMIT TO ` + SetDmlRateLimit { + rate_limit: i32, + }, /// `SWAP WITH ` SwapRenameTable { target_table: ObjectName, @@ -318,6 +322,9 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::SetBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterTableOperation::SetDmlRateLimit { rate_limit } => { + write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit) + } AlterTableOperation::SwapRenameTable { target_table } => { write!(f, "SWAP WITH {}", target_table) } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 3fe13f26d7bb8..5191d5de15eec 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3158,8 +3158,11 @@ impl Parser<'_> { 
AlterTableOperation::SetSourceRateLimit { rate_limit } } else if let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? { AlterTableOperation::SetBackfillRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_dml_rate_limit()? { + AlterTableOperation::SetDmlRateLimit { rate_limit } } else { - return self.expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT after SET"); + return self + .expected("SCHEMA/PARALLELISM/SOURCE_RATE_LIMIT/DML_RATE_LIMIT after SET"); } } else if self.parse_keyword(Keyword::DROP) { let _ = self.parse_keyword(Keyword::COLUMN); @@ -3237,6 +3240,28 @@ impl Parser<'_> { Ok(Some(rate_limit)) } + /// DML_RATE_LIMIT = default | NUMBER + /// DML_RATE_LIMIT TO default | NUMBER + pub fn parse_alter_dml_rate_limit(&mut self) -> PResult> { + if !self.parse_word("DML_RATE_LIMIT") { + return Ok(None); + } + if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { + return self.expected("TO or = after ALTER TABLE SET DML_RATE_LIMIT"); + } + let rate_limit = if self.parse_keyword(Keyword::DEFAULT) { + -1 + } else { + let s = self.parse_number_value()?; + if let Ok(n) = s.parse::() { + n + } else { + return self.expected("number or DEFAULT"); + } + }; + Ok(Some(rate_limit)) + } + /// SOURCE_RATE_LIMIT = default | NUMBER /// SOURCE_RATE_LIMIT TO default | NUMBER pub fn parse_alter_source_rate_limit(&mut self, is_table: bool) -> PResult> { diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index de25cf8439be1..0a8738b96a5da 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -16,6 +16,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" +arc-swap = "1" async-recursion = "1" async-stream = "0.3" async-trait = "0.1" diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index adcb3f01ab8bd..d343db7698cef 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -14,20 +14,30 @@ use std::collections::BTreeMap; use std::mem; +use 
std::num::NonZeroU32; +use arc_swap::ArcSwap; use either::Either; use futures::TryStreamExt; +use governor::clock::MonotonicClock; +use governor::{Quota, RateLimiter}; +use parking_lot::Mutex; use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId}; use risingwave_common::transaction::transaction_id::TxnId; use risingwave_common::transaction::transaction_message::TxnMsg; use risingwave_dml::dml_manager::DmlManagerRef; +use risingwave_expr::codegen::BoxStream; +use tokio::sync::oneshot; use crate::executor::prelude::*; use crate::executor::stream_reader::StreamReaderWithPause; +use crate::executor::utils::compute_rate_limit_chunk_permits; /// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific /// table. The two streams will be merged into one and then sent to downstream. pub struct DmlExecutor { + actor_ctx: ActorContextRef, + upstream: Executor, /// Stores the information of batch data channels. @@ -43,6 +53,58 @@ pub struct DmlExecutor { column_descs: Vec, chunk_size: usize, + + /// Rate limit in rows/s. + rate_limiter: Arc>, + /// The handle used to resume a data stream that has been paused due to rate limiting. + rate_limit_resume_tx: oneshot::Sender<()>, +} + +type RateLimiterType = + RateLimiter; +struct DmlRateLimiter { + row_per_second: Option, + rate_limiter: Option, + resume_rx: Mutex>>, +} + +impl DmlRateLimiter { + fn new(row_per_second: Option, resume_rx: oneshot::Receiver<()>) -> Self { + let rate_limiter = if row_per_second == Some(0) { + None + } else { + row_per_second.map(|limit| { + tracing::info!(rate_limit = limit, "DML rate limit applied"); + RateLimiter::direct_with_clock( + Quota::per_second(NonZeroU32::new(limit).unwrap()), + &MonotonicClock, + ) + }) + }; + Self { + row_per_second, + rate_limiter, + resume_rx: Mutex::new(Some(resume_rx)), + } + } + + /// If true, the rate limiter should block the data stream, by invoking `block_until_resume`. 
+ fn is_pause(&self) -> bool { + self.row_per_second == Some(0) + } + + async fn block_until_resume(&self) { + let Some(resume_rx) = self.resume_rx.lock().take() else { + tracing::warn!("DML rate limier has already been resumed."); + return; + }; + let _ = resume_rx.await; + } + + /// If true, the rate limiter should never block the data stream. + fn is_unlimited(&self) -> bool { + self.row_per_second.is_none() + } } /// If a transaction's data is less than `MAX_CHUNK_FOR_ATOMICITY` * `CHUNK_SIZE`, we can provide @@ -62,26 +124,33 @@ struct TxnBuffer { } impl DmlExecutor { + #[allow(clippy::too_many_arguments)] pub fn new( + actor_ctx: ActorContextRef, upstream: Executor, dml_manager: DmlManagerRef, table_id: TableId, table_version_id: TableVersionId, column_descs: Vec, chunk_size: usize, + rate_limit_info: Option, ) -> Self { + let (tx, rx) = oneshot::channel(); Self { + actor_ctx, upstream, dml_manager, table_id, table_version_id, column_descs, chunk_size, + rate_limiter: ArcSwap::new(DmlRateLimiter::new(rate_limit_info, rx).into()).into(), + rate_limit_resume_tx: tx, } } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn execute_inner(self: Box) { + async fn execute_inner(mut self: Box) { let mut upstream = self.upstream.execute(); // The first barrier message should be propagated. @@ -99,11 +168,12 @@ impl DmlExecutor { self.table_version_id, &self.column_descs, )?; - let reader = handle - .stream_reader() - .into_stream() - .map_err(StreamExecutorError::from) - .boxed(); + let reader = apply_dml_rate_limit( + handle.stream_reader().into_stream(), + self.rate_limiter.clone(), + ) + .boxed() + .map_err(StreamExecutorError::from); // Merge the two streams using `StreamReaderWithPause` because when we receive a pause // barrier, we should stop receiving the data from DML. 
We poll data from the two streams in @@ -141,6 +211,26 @@ impl DmlExecutor { match mutation { Mutation::Pause => stream.pause_stream(), Mutation::Resume => stream.resume_stream(), + Mutation::Throttle(actor_to_apply) => { + if let Some(new_rate_limit) = + actor_to_apply.get(&self.actor_ctx.id) + && *new_rate_limit + != self.rate_limiter.load().row_per_second + { + tracing::info!( + "Updating rate limit from {:?} to {:?}.", + self.rate_limiter.load().row_per_second, + *new_rate_limit + ); + let (tx, rx) = oneshot::channel(); + self.rate_limiter + .store(DmlRateLimiter::new(*new_rate_limit, rx).into()); + // Resume the data stream if it is being blocked by the old rate limiter. + let _ = self.rate_limit_resume_tx.send(()); + // Store the new resume tx. + self.rate_limit_resume_tx = tx; + } + } _ => {} } } @@ -271,6 +361,67 @@ impl Execute for DmlExecutor { } } +type BoxTxnMessageStream = BoxStream<'static, risingwave_dml::error::Result>; +#[try_stream(ok = TxnMsg, error = risingwave_dml::error::DmlError)] +async fn apply_dml_rate_limit( + stream: BoxTxnMessageStream, + rate_limiter: Arc>, +) { + #[for_await] + for txn_msg in stream { + let txn_msg = txn_msg?; + match txn_msg { + TxnMsg::Begin(txn_id) => { + yield TxnMsg::Begin(txn_id); + } + TxnMsg::End(txn_id) => { + yield TxnMsg::End(txn_id); + } + TxnMsg::Rollback(txn_id) => { + yield TxnMsg::Rollback(txn_id); + } + TxnMsg::Data(txn_id, chunk) => { + let chunk_size = chunk.capacity(); + if chunk_size == 0 { + // empty chunk + yield TxnMsg::Data(txn_id, chunk); + continue; + } + let mut guard = rate_limiter.load(); + while guard.is_pause() { + // block the stream until the rate limit is reset + guard.block_until_resume().await; + // load the new rate limiter + guard = rate_limiter.load(); + } + if guard.is_unlimited() { + yield TxnMsg::Data(txn_id, chunk); + continue; + } + let rate_limiter = guard.rate_limiter.as_ref().unwrap(); + let max_permits = guard.row_per_second.unwrap() as usize; + let required_permits = 
compute_rate_limit_chunk_permits(&chunk, max_permits); + if required_permits <= max_permits { + let n = NonZeroU32::new(required_permits as u32).unwrap(); + // `InsufficientCapacity` should never happen because we have checked the cardinality. + rate_limiter.until_n_ready(n).await.unwrap(); + yield TxnMsg::Data(txn_id, chunk); + } else { + // Split the chunk into smaller chunks. + for small_chunk in chunk.split(max_permits) { + let required_permits = + compute_rate_limit_chunk_permits(&small_chunk, max_permits); + let n = NonZeroU32::new(required_permits as u32).unwrap(); + // Smaller chunks should have effective chunk size <= max_permits. + rate_limiter.until_n_ready(n).await.unwrap(); + yield TxnMsg::Data(txn_id, small_chunk); + } + } + } + } + } +} + #[cfg(test)] mod tests { @@ -303,12 +454,14 @@ mod tests { let source = source.into_executor(schema, pk_indices); let dml_executor = DmlExecutor::new( + ActorContext::for_test(0), source, dml_manager.clone(), table_id, INITIAL_TABLE_VERSION_ID, column_descs, 1024, + None, ); let mut dml_executor = dml_executor.boxed().execute(); diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index cab3527a32f25..7624813f86110 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -51,6 +51,7 @@ use futures_async_stream::try_stream; use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::error::StreamExecutorError; +use crate::executor::utils::compute_rate_limit_chunk_permits; use crate::executor::{Barrier, Message}; /// Receive barriers from barrier manager with the channel, error on channel close. 
@@ -133,24 +134,6 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti ) }); - fn compute_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize { - let chunk_size = chunk.capacity(); - let ends_with_update = if chunk_size >= 2 { - // Note we have to check if the 2nd last is `U-` to be consistenct with `StreamChunkBuilder`. - // If something inconsistent happens in the stream, we may not have `U+` after this `U-`. - chunk.ops()[chunk_size - 2].is_update_delete() - } else { - false - }; - if chunk_size == limit + 1 && ends_with_update { - // If the chunk size exceed limit because of the last `Update` operation, - // we should minus 1 to make sure the permits consumed is within the limit (max burst). - chunk_size - 1 - } else { - chunk_size - } - } - #[for_await] for chunk in stream { let chunk = chunk?; @@ -165,7 +148,7 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti let limiter = limiter.as_ref().unwrap(); let limit = rate_limit_rps.unwrap() as usize; - let required_permits = compute_chunk_permits(&chunk, limit); + let required_permits = compute_rate_limit_chunk_permits(&chunk, limit); if required_permits <= limit { let n = NonZeroU32::new(required_permits as u32).unwrap(); // `InsufficientCapacity` should never happen because we have check the cardinality @@ -174,7 +157,8 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti } else { // Cut the chunk into smaller chunks for chunk in chunk.split(limit) { - let n = NonZeroU32::new(compute_chunk_permits(&chunk, limit) as u32).unwrap(); + let n = NonZeroU32::new(compute_rate_limit_chunk_permits(&chunk, limit) as u32) + .unwrap(); // chunks split should have effective chunk size <= limit limiter.until_n_ready(n).await.unwrap(); yield chunk; diff --git a/src/stream/src/executor/utils.rs b/src/stream/src/executor/utils.rs index 37ed7803a44c7..ad2e1b8a4268c 100644 --- a/src/stream/src/executor/utils.rs +++ 
b/src/stream/src/executor/utils.rs @@ -22,3 +22,21 @@ impl Execute for DummyExecutor { futures::stream::pending().boxed() } } + +pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize { + let chunk_size = chunk.capacity(); + let ends_with_update = if chunk_size >= 2 { + // Note we have to check if the 2nd last is `U-` to be consistent with `StreamChunkBuilder`. + // If something inconsistent happens in the stream, we may not have `U+` after this `U-`. + chunk.ops()[chunk_size - 2].is_update_delete() + } else { + false + }; + if chunk_size == limit + 1 && ends_with_update { + // If the chunk size exceeds the limit because of the last `Update` operation, + // we should subtract 1 to make sure the permits consumed stay within the limit (max burst). + chunk_size - 1 + } else { + chunk_size + } +} diff --git a/src/stream/src/from_proto/dml.rs b/src/stream/src/from_proto/dml.rs index cc8836890ce7f..3cd809b436e52 100644 --- a/src/stream/src/from_proto/dml.rs +++ b/src/stream/src/from_proto/dml.rs @@ -38,12 +38,14 @@ impl ExecutorBuilder for DmlExecutorBuilder { let column_descs = node.column_descs.iter().map(Into::into).collect_vec(); let exec = DmlExecutor::new( + params.actor_context.clone(), upstream, params.env.dml_manager_ref(), table_id, node.table_version_id, column_descs, params.env.config().developer.chunk_size, + node.rate_limit, ); Ok((params.info, exec).into()) }