Skip to content

Commit

Permalink
feat: add support for dml_rate_limit (#19679)
Browse files Browse the repository at this point in the history
(cherry picked from commit f0a91df)
  • Loading branch information
zwang28 committed Dec 6, 2024
1 parent 2a6aa41 commit f1e014a
Show file tree
Hide file tree
Showing 26 changed files with 516 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ if [[ $mode != "single-node" ]]; then
fi

sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt'
sqllogictest -p 4566 -d dev './e2e_test/dml/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ user client_encoding
user client_min_messages
user create_compaction_group_for_mv
user datestyle
user dml_rate_limit
user enable_join_ordering
user enable_share_plan
user enable_two_phase_agg
Expand Down
94 changes: 94 additions & 0 deletions e2e_test/dml/inserts.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Due to the transaction semantic and MAX_CHUNK_FOR_ATOMICITY in DmlExecutor,
# data chunks will be buffered for certain period before yielding.
# To overcome this buffer, instead of inserting data in one transaction like insert into t SELECT generate_series,
# this test inserts data via multiple inserts.

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &
17 changes: 17 additions & 0 deletions e2e_test/dml/rate_limit_no_limit.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
statement ok
create table t(v1 int);

statement ok
alter table t set parallelism to 1;

include ./inserts.slt.part

sleep 3s

query I
select count(*) from t;
----
30

statement ok
drop table t;
32 changes: 32 additions & 0 deletions e2e_test/dml/rate_limit_pause_resume.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
statement ok
create table t(v1 int);

statement ok
alter table t set parallelism to 1;

# Pause data stream.
statement ok
alter table t set dml_rate_limit to 0;

include ./inserts.slt.part

sleep 3s

query I
select count(*) from t;
----
0

# Resume data stream.
statement ok
alter table t set dml_rate_limit to 1;

sleep 3s

query II
select case when count(*) between 1 and 5 then 3 else 0 end from t;
----
3

statement ok
drop table t;
20 changes: 20 additions & 0 deletions e2e_test/dml/rate_limit_set.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
statement ok
set dml_rate_limit to 2;

statement ok
create table t(v1 int);

statement ok
alter table t set parallelism to 1;

include ./inserts.slt.part

sleep 3s

query II
select case when count(*) between 2 and 10 then 6 else 0 end from t;
----
6

statement ok
drop table t;
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ enum ThrottleTarget {
MV = 2;
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
TABLE_DML = 5;
}

message ApplyThrottleRequest {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ message DmlNode {
uint64 table_version_id = 3;
// Column descriptions of the table.
repeated plan_common.ColumnDesc column_descs = 2;
optional uint32 rate_limit = 4;
}

message RowIdGenNode {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;

#[serde_as]
/// This is the Session Config of RisingWave.
Expand Down Expand Up @@ -273,6 +274,12 @@ pub struct SessionConfig {
#[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
source_rate_limit: i32,

/// Set streaming rate limit (rows per second) for each parallelism for table DML.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the DML.
#[parameter(default = DISABLE_DML_RATE_LIMIT)]
dml_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
#[serde_as(as = "DisplayFromStr")]
Expand Down
2 changes: 2 additions & 0 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ async fn test_table_materialize() -> StreamResult<()> {
identity: format!("DmlExecutor {:X}", 2),
},
DmlExecutor::new(
ActorContext::for_test(0),
source_executor,
dml_manager.clone(),
table_id,
INITIAL_TABLE_VERSION_ID,
column_descs.clone(),
1024,
None,
)
.boxed(),
);
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ pub async fn handle_alter_streaming_rate_limit(
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
}
PbThrottleTarget::TableDml => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
if table.table_type != TableType::Table {
return Err(InvalidInputSyntax(format!("\"{table_name}\" is not a table",)).into());
}
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
}
_ => bail!("Unsupported throttle target: {:?}", kind),
};

Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ pub async fn handle(
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetDmlRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::TableDml,
name,
rate_limit,
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetBackfillRateLimit { rate_limit },
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl StreamNode for StreamDml {
table_id: 0, // Meta will fill this table id.
table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id.
column_descs: self.column_descs.iter().map(Into::into).collect(),
rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
})
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/utils/overwrite_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use crate::handler::HandlerArgs;
pub struct OverwriteOptions {
pub source_rate_limit: Option<u32>,
pub backfill_rate_limit: Option<u32>,
pub dml_rate_limit: Option<u32>,
}

impl OverwriteOptions {
pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit";
pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";

pub fn new(args: &mut HandlerArgs) -> Self {
Expand Down Expand Up @@ -51,9 +53,23 @@ impl OverwriteOptions {
}
}
};
let dml_rate_limit = {
if let Some(x) = args.with_options.remove(Self::DML_RATE_LIMIT_KEY) {
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
} else {
let rate_limit = args.session.config().dml_rate_limit();
if rate_limit < 0 {
None
} else {
Some(rate_limit as u32)
}
}
};
Self {
source_rate_limit,
backfill_rate_limit,
dml_rate_limit,
}
}
}
5 changes: 5 additions & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ impl StreamManagerService for StreamServiceImpl {
.update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::TableDml => {
self.metadata_manager
.update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::Unspecified => {
return Err(Status::invalid_argument("unspecified throttle target"))
}
Expand Down
Loading

0 comments on commit f1e014a

Please sign in to comment.