feat: add support for dml_rate_limit (#19679) #19699

Merged · 3 commits · Dec 9, 2024
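This PR threads a new dml_rate_limit knob through the stack: a session variable in src/common/src/session_config/mod.rs, a per-statement WITH override in src/frontend/src/utils/overwrite_options.rs, a rate_limit field on the stream plan's DmlNode, a TABLE_DML throttle target in the meta service, and e2e tests. A usage sketch assembled from the statements the tests below exercise (the concrete numbers are illustrative):

    -- Session-level default, baked into tables created afterwards.
    -- -1 (the default) disables the limit; 0 pauses DML entirely.
    set dml_rate_limit to 2;

    create table t (v1 int);

    -- Throttle an existing table at runtime:
    alter table t set dml_rate_limit to 0; -- pause DML
    alter table t set dml_rate_limit to 1; -- resume at 1 row/s per parallelism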
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
@@ -99,6 +99,7 @@ if [[ $mode != "single-node" ]]; then
fi

sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt'
sqllogictest -p 4566 -d dev './e2e_test/dml/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -31,6 +31,7 @@ user client_encoding
user client_min_messages
user create_compaction_group_for_mv
user datestyle
user dml_rate_limit
user enable_join_ordering
user enable_share_plan
user enable_two_phase_agg
94 changes: 94 additions & 0 deletions e2e_test/dml/inserts.slt.part
@@ -0,0 +1,94 @@
# Due to the transaction semantics and MAX_CHUNK_FOR_ATOMICITY in DmlExecutor,
# data chunks may be buffered for a certain period before being yielded.
# To get past this buffering, instead of inserting data in one transaction like insert into t SELECT generate_series,
# this test inserts data via multiple separate inserts.

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &

system ok
nohup ./risedev psql -c 'insert into t values (1)' > /dev/null 2>&1 &
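Each statement above runs on its own connection, so every row arrives as its own transaction and is never batched behind MAX_CHUNK_FOR_ATOMICITY. For contrast, the single-transaction form the header comment warns against would look roughly like this (a counter-example only; the generate_series bounds are illustrative):

    insert into t select * from generate_series(1, 30);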
17 changes: 17 additions & 0 deletions e2e_test/dml/rate_limit_no_limit.slt
@@ -0,0 +1,17 @@
statement ok
create table t(v1 int);

statement ok
alter table t set parallelism to 1;

include ./inserts.slt.part

sleep 3s

query I
select count(*) from t;
----
30

statement ok
drop table t;
32 changes: 32 additions & 0 deletions e2e_test/dml/rate_limit_pause_resume.slt
@@ -0,0 +1,32 @@
statement ok
create table t(v1 int);

statement ok
alter table t set parallelism to 1;

# Pause data stream.
statement ok
alter table t set dml_rate_limit to 0;

include ./inserts.slt.part

sleep 3s

query I
select count(*) from t;
----
0

# Resume data stream.
statement ok
alter table t set dml_rate_limit to 1;

sleep 3s

query I
select case when count(*) between 1 and 5 then 3 else 0 end from t;
----
3

statement ok
drop table t;
20 changes: 20 additions & 0 deletions e2e_test/dml/rate_limit_set.slt
@@ -0,0 +1,20 @@
statement ok
set dml_rate_limit to 2;

statement ok
create table t(v1 int);

statement ok
alter table t set parallelism to 1;

include ./inserts.slt.part

sleep 3s

query I
select case when count(*) between 2 and 10 then 6 else 0 end from t;
----
6

statement ok
drop table t;
1 change: 1 addition & 0 deletions proto/meta.proto
@@ -306,6 +306,7 @@ enum ThrottleTarget {
MV = 2;
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
TABLE_DML = 5;
}

message ApplyThrottleRequest {
1 change: 1 addition & 0 deletions proto/stream_plan.proto
@@ -765,6 +765,7 @@ message DmlNode {
uint64 table_version_id = 3;
// Column descriptions of the table.
repeated plan_common.ColumnDesc column_descs = 2;
optional uint32 rate_limit = 4;
}

message RowIdGenNode {
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -57,6 +57,7 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;
const DISABLE_DML_RATE_LIMIT: i32 = -1;

#[serde_as]
/// This is the Session Config of RisingWave.
@@ -273,6 +274,12 @@ pub struct SessionConfig {
#[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
source_rate_limit: i32,

/// Set the streaming rate limit (rows per second) per parallelism for table DML.
/// If set to -1, the rate limit is disabled.
/// If set to 0, DML is paused.
#[parameter(default = DISABLE_DML_RATE_LIMIT)]
dml_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
#[serde_as(as = "DisplayFromStr")]
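A quick sketch of the three value regimes described by the doc comment, assuming SHOW works for this parameter as it does for the neighboring session settings listed in pg_settings above:

    show dml_rate_limit;       -- -1 by default: no limit
    set dml_rate_limit to 0;   -- tables created from now on start with DML paused
    set dml_rate_limit to 100; -- 100 rows/s per parallelism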
2 changes: 2 additions & 0 deletions src/compute/tests/integration_tests.rs
@@ -187,12 +187,14 @@ async fn test_table_materialize() -> StreamResult<()> {
identity: format!("DmlExecutor {:X}", 2),
},
DmlExecutor::new(
ActorContext::for_test(0),
source_executor,
dml_manager.clone(),
table_id,
INITIAL_TABLE_VERSION_ID,
column_descs.clone(),
1024,
None,
)
.boxed(),
);
7 changes: 7 additions & 0 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
@@ -85,6 +85,13 @@ pub async fn handle_alter_streaming_rate_limit(
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
}
PbThrottleTarget::TableDml => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.table_id)
}
_ => bail!("Unsupported throttle target: {:?}", kind),
};

12 changes: 12 additions & 0 deletions src/frontend/src/handler/mod.rs
@@ -722,6 +722,18 @@ pub async fn handle(
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetDmlRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::TableDml,
name,
rate_limit,
)
.await
}
Statement::AlterTable {
name,
operation: AlterTableOperation::SetBackfillRateLimit { rate_limit },
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_dml.rs
@@ -92,6 +92,7 @@ impl StreamNode for StreamDml {
table_id: 0, // Meta will fill this table id.
table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id.
column_descs: self.column_descs.iter().map(Into::into).collect(),
rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
})
}
}
16 changes: 16 additions & 0 deletions src/frontend/src/utils/overwrite_options.rs
@@ -18,10 +18,12 @@ use crate::handler::HandlerArgs;
pub struct OverwriteOptions {
pub source_rate_limit: Option<u32>,
pub backfill_rate_limit: Option<u32>,
pub dml_rate_limit: Option<u32>,
}

impl OverwriteOptions {
pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit";
pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";

pub fn new(args: &mut HandlerArgs) -> Self {
Expand Down Expand Up @@ -51,9 +53,23 @@ impl OverwriteOptions {
}
}
};
let dml_rate_limit = {
if let Some(x) = args.with_options.remove(Self::DML_RATE_LIMIT_KEY) {
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
} else {
let rate_limit = args.session.config().dml_rate_limit();
if rate_limit < 0 {
None
} else {
Some(rate_limit as u32)
}
}
};
Self {
source_rate_limit,
backfill_rate_limit,
dml_rate_limit,
}
}
}
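Note the precedence implemented above: a dml_rate_limit key in the statement's WITH clause wins over the session variable, and a negative session value maps to None (no limit). A hypothetical per-table override, assuming the option is accepted in a CREATE TABLE WITH clause the way this code consumes it:

    create table t (v1 int) with (dml_rate_limit = 200);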
5 changes: 5 additions & 0 deletions src/meta/service/src/stream_service.rs
@@ -117,6 +117,11 @@ impl StreamManagerService for StreamServiceImpl {
.update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::TableDml => {
self.metadata_manager
.update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
.await?
}
ThrottleTarget::Unspecified => {
return Err(Status::invalid_argument("unspecified throttle target"))
}
59 changes: 59 additions & 0 deletions src/meta/src/controller/streaming_job.rs
@@ -1407,6 +1407,65 @@ impl CatalogController {
Ok(fragment_actors)
}

pub async fn update_dml_rate_limit_by_job_id(
&self,
job_id: ObjectId,
rate_limit: Option<u32>,
) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;

let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
fragment::Column::FragmentTypeMask,
fragment::Column::StreamNode,
])
.filter(fragment::Column::JobId.eq(job_id))
.into_tuple()
.all(&txn)
.await?;
let mut fragments = fragments
.into_iter()
.map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
.collect_vec();

fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
let mut found = false;
if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
visit_stream_node(stream_node, |node| {
if let PbNodeBody::Dml(node) = node {
node.rate_limit = rate_limit;
found = true;
}
});
}
found
});

if fragments.is_empty() {
return Err(MetaError::invalid_parameter(format!(
"dml node not found in job id {job_id}"
)));
}
let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
for (id, _, stream_node) in fragments {
fragment::ActiveModel {
fragment_id: Set(id),
stream_node: Set(StreamNode::from(&stream_node)),
..Default::default()
}
.update(&txn)
.await?;
}
let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

txn.commit().await?;

Ok(fragment_actors)
}

pub async fn post_apply_reschedules(
&self,
reschedules: HashMap<FragmentId, Reschedule>,
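The update is transactional: either every DML fragment of the job gets the new limit or the transaction never commits, and a job without a DML node returns an invalid-parameter error instead of silently doing nothing. Clearing a runtime limit presumably mirrors the existing source/backfill throttles (an assumption; this form is not exercised by the tests in this PR):

    alter table t set dml_rate_limit to default; -- assumed: back to unlimited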