-
Notifications
You must be signed in to change notification settings - Fork 594
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(frontend): support alter source pause/resume (#19636)
- Loading branch information
Showing
5 changed files
with
182 additions
and
5 deletions.
There are no files selected for viewing
143 changes: 143 additions & 0 deletions
143
e2e_test/source_inline/kafka/alter/resume_pause_source_kafka.slt.serial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
control substitution on | ||
|
||
statement ok | ||
SET streaming_use_shared_source TO false; | ||
|
||
############## Create kafka seed data | ||
|
||
statement ok | ||
create table kafka_seed_data (v1 int); | ||
|
||
statement ok | ||
insert into kafka_seed_data select * from generate_series(1, 1000); | ||
|
||
############## Sink into kafka | ||
|
||
statement ok | ||
create sink kafka_sink | ||
from | ||
kafka_seed_data with ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'test_rate_limit', | ||
type = 'append-only', | ||
force_append_only='true' | ||
); | ||
|
||
############## Source from kafka (rate_limit = 0) | ||
|
||
# Wait for the topic to create | ||
skipif in-memory | ||
sleep 5s | ||
|
||
statement ok | ||
create source kafka_source (v1 int) with ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'test_rate_limit', | ||
scan.startup.mode = 'earliest', | ||
) FORMAT PLAIN ENCODE JSON | ||
|
||
|
||
statement ok | ||
create source kafka_source2 (v1 int) with ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'test_rate_limit', | ||
scan.startup.mode = 'earliest', | ||
source_rate_limit = 100, | ||
) FORMAT PLAIN ENCODE JSON | ||
|
||
statement ok | ||
flush; | ||
|
||
############## Check data | ||
|
||
skipif in-memory | ||
sleep 3s | ||
|
||
############## Create MV on source | ||
|
||
# This should be ignored. | ||
statement ok | ||
SET SOURCE_RATE_LIMIT=1000; | ||
|
||
statement ok | ||
create materialized view rl_mv1 as select count(*) from kafka_source; | ||
|
||
statement ok | ||
create materialized view rl_mv2 as select count(*) from kafka_source; | ||
|
||
statement ok | ||
create materialized view rl_mv3 as select count(*) from kafka_source; | ||
|
||
skipif in-memory | ||
statement count 0 | ||
alter source kafka_source pause; | ||
|
||
skipif in-memory | ||
statement error | ||
alter source kafka_source2 pause; | ||
|
||
query T | ||
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id | ||
order by name; | ||
---- | ||
rl_mv1 SOURCE {SOURCE} 0 | ||
rl_mv2 SOURCE {SOURCE} 0 | ||
rl_mv3 SOURCE {SOURCE} 0 | ||
|
||
|
||
skipif in-memory | ||
statement count 0 | ||
alter source kafka_source resume; | ||
|
||
# rate limit becomes None | ||
query T | ||
select count(*) from rw_rate_limit; | ||
---- | ||
0 | ||
|
||
skipif in-memory | ||
sleep 3s | ||
|
||
skipif in-memory | ||
query I | ||
select count(*) > 0 from rl_mv1; | ||
---- | ||
t | ||
|
||
skipif in-memory | ||
query I | ||
select count(*) > 0 from rl_mv2; | ||
---- | ||
t | ||
|
||
skipif in-memory | ||
query I | ||
select count(*) > 0 from rl_mv3; | ||
---- | ||
t | ||
|
||
############## Cleanup | ||
|
||
statement ok | ||
drop materialized view rl_mv1; | ||
|
||
statement ok | ||
drop materialized view rl_mv2; | ||
|
||
statement ok | ||
drop materialized view rl_mv3; | ||
|
||
statement ok | ||
drop source kafka_source; | ||
|
||
statement ok | ||
drop source kafka_source2; | ||
|
||
statement ok | ||
drop sink kafka_sink; | ||
|
||
statement ok | ||
drop table kafka_seed_data; | ||
|
||
statement ok | ||
SET streaming_use_shared_source TO true; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters