Skip to content

Commit

Permalink
feat: support swap rename syntax for table/mview/view/source/sink and…
Browse files Browse the repository at this point in the history
… subscription (#19172)

Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
yezizp2012 and BugenZhao committed Nov 7, 2024
1 parent fa95ab5 commit 8260686
Show file tree
Hide file tree
Showing 15 changed files with 929 additions and 157 deletions.
181 changes: 181 additions & 0 deletions e2e_test/ddl/alter_swap_rename.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Create initial tables and views for testing swap
statement ok
CREATE TABLE t1 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

statement ok
CREATE TABLE t2 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

# Insert some test data
statement ok
INSERT INTO t1 VALUES(1,(1,(1,2)));

statement ok
INSERT INTO t2 VALUES(2,(2,(2,4)));

# Create materialized views referencing the tables
statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT v1, (t.v2).v1 AS v21 FROM t1 t;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 t;

# Create regular views
statement ok
CREATE VIEW v1 AS SELECT t1.v1 FROM t1;

statement ok
CREATE VIEW v2 AS SELECT t2.v2 FROM t2;

# Create sources
statement ok
CREATE SOURCE src1 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '1',
fields.v.end = '5',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE src2 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '6',
fields.v.end = '10',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

# Create sinks
statement ok
CREATE SINK sink1 AS SELECT * FROM mv1 WITH (
connector = 'blackhole'
);

statement ok
CREATE SINK sink2 AS SELECT * FROM mv2 WITH (
connector = 'blackhole'
);

# Create subscriptions
statement ok
CREATE SUBSCRIPTION sub1 FROM mv1 WITH (
retention = '1D'
);

statement ok
CREATE SUBSCRIPTION sub2 FROM mv2 WITH (
retention = '1D'
);

# Test table swap
statement ok
ALTER TABLE t1 SWAP WITH t2;

statement error Permission denied
ALTER TABLE t1 SWAP WITH mv1;

statement error not found
ALTER TABLE mv1 SWAP WITH mv2;

query II rowsort
SELECT * FROM t1;
----
2 (2,"(2,4)")

query II rowsort
SELECT * FROM t2;
----
1 (1,"(1,2)")

# Test materialized view swap
statement ok
ALTER MATERIALIZED VIEW mv1 SWAP WITH mv2;

# Verify materialized view contents
query II rowsort
SELECT * FROM mv1;
----
2 2

query II rowsort
SELECT * FROM mv2;
----
1 1

# Test view swap
statement ok
ALTER VIEW v1 SWAP WITH v2;

# Verify view definitions are swapped
query TT
SHOW CREATE VIEW v1;
----
public.v1 CREATE VIEW v1 AS SELECT t2.v2 FROM t1 AS t2

query TT
SHOW CREATE VIEW v2;
----
public.v2 CREATE VIEW v2 AS SELECT t1.v1 FROM t2 AS t1

# Test source swap
statement ok
ALTER SOURCE src1 SWAP WITH src2;

# Verify source definitions are swapped
query TT
SHOW CREATE SOURCE src1;
----
public.src1 CREATE SOURCE src1 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '6', fields.v.end = '10', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

query TT
SHOW CREATE SOURCE src2;
----
public.src2 CREATE SOURCE src2 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '1', fields.v.end = '5', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

# Test sink swap
statement ok
ALTER SINK sink1 SWAP WITH sink2;

# Verify sink definitions are swapped
query TT
SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT * FROM mv1 AS mv2 WITH (connector = 'blackhole')

query TT
SHOW CREATE SINK sink2;
----
public.sink2 CREATE SINK sink2 AS SELECT * FROM mv2 AS mv1 WITH (connector = 'blackhole')

# Test subscription swap
statement ok
ALTER SUBSCRIPTION sub1 SWAP WITH sub2;

# Verify subscription definitions are swapped
query TT
SHOW CREATE SUBSCRIPTION sub1;
----
public.sub1 CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D')

query TT
SHOW CREATE SUBSCRIPTION sub2;
----
public.sub2 CREATE SUBSCRIPTION sub2 FROM mv2 WITH (retention = '1D')

# Clean up
statement ok
DROP SOURCE src1;

statement ok
DROP SOURCE src2;

statement ok
DROP TABLE t1 CASCADE;

statement ok
DROP TABLE t2 CASCADE;
21 changes: 21 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ message AlterOwnerResponse {
WaitVersion version = 2;
}

message AlterSwapRenameRequest {
message ObjectNameSwapPair {
uint32 src_object_id = 1;
uint32 dst_object_id = 2;
}
oneof object {
ObjectNameSwapPair schema = 1;
ObjectNameSwapPair table = 2;
ObjectNameSwapPair view = 3;
ObjectNameSwapPair source = 4;
ObjectNameSwapPair sink = 5;
ObjectNameSwapPair subscription = 6;
}
}

message AlterSwapRenameResponse {
common.Status status = 1;
WaitVersion version = 2;
}

message CreateFunctionRequest {
catalog.Function function = 1;
}
Expand Down Expand Up @@ -513,4 +533,5 @@ service DdlService {
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse);
}
12 changes: 10 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
64 changes: 46 additions & 18 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ impl SchemaCatalog {
let table_ref = Arc::new(table);

let old_table = self.table_by_id.get(&id).unwrap();
// check if table name get updated.
if old_table.name() != name {
// check if the table name gets updated.
if old_table.name() != name
&& let Some(t) = self.table_by_name.get(old_table.name())
&& t.id == id
{
self.table_by_name.remove(old_table.name());
}

self.table_by_name.insert(name, table_ref.clone());
self.table_by_id.insert(id, table_ref.clone());
table_ref
Expand All @@ -137,8 +141,11 @@ impl SchemaCatalog {
let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
let index_ref = Arc::new(index);

// check if index name get updated.
if old_index.name != name {
// check if the index name gets updated.
if old_index.name != name
&& let Some(idx) = self.index_by_name.get(&old_index.name)
&& idx.id == id
{
self.index_by_name.remove(&old_index.name);
}
self.index_by_name.insert(name, index_ref.clone());
Expand Down Expand Up @@ -245,8 +252,11 @@ impl SchemaCatalog {
let source_ref = Arc::new(source);

let old_source = self.source_by_id.get(&id).unwrap();
// check if source name get updated.
if old_source.name != name {
// check if the source name gets updated.
if old_source.name != name
&& let Some(src) = self.source_by_name.get(&old_source.name)
&& src.id == id
{
self.source_by_name.remove(&old_source.name);
}

Expand Down Expand Up @@ -294,8 +304,11 @@ impl SchemaCatalog {
let sink_ref = Arc::new(sink);

let old_sink = self.sink_by_id.get(&id).unwrap();
// check if sink name get updated.
if old_sink.name != name {
// check if the sink name gets updated.
if old_sink.name != name
&& let Some(s) = self.sink_by_name.get(&old_sink.name)
&& s.id.sink_id == id
{
self.sink_by_name.remove(&old_sink.name);
}

Expand Down Expand Up @@ -331,8 +344,11 @@ impl SchemaCatalog {
let subscription_ref = Arc::new(subscription);

let old_subscription = self.subscription_by_id.get(&id).unwrap();
// check if subscription name get updated.
if old_subscription.name != name {
// check if the subscription name gets updated.
if old_subscription.name != name
&& let Some(s) = self.subscription_by_name.get(&old_subscription.name)
&& s.id.subscription_id == id
{
self.subscription_by_name.remove(&old_subscription.name);
}

Expand Down Expand Up @@ -365,8 +381,11 @@ impl SchemaCatalog {
let view_ref = Arc::new(view);

let old_view = self.view_by_id.get(&id).unwrap();
// check if view name get updated.
if old_view.name != name {
// check if the view name gets updated.
if old_view.name != name
&& let Some(v) = self.view_by_name.get(old_view.name())
&& v.id == id
{
self.view_by_name.remove(&old_view.name);
}

Expand Down Expand Up @@ -438,8 +457,11 @@ impl SchemaCatalog {
.function_by_name
.get_mut(&old_function_by_id.name)
.unwrap();
// check if function name get updated.
if old_function_by_id.name != name {
// check if the function name gets updated.
if old_function_by_id.name != name
&& let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
&& f.id == id
{
old_function_by_name.remove(&old_function_by_id.arg_types);
if old_function_by_name.is_empty() {
self.function_by_name.remove(&old_function_by_id.name);
Expand Down Expand Up @@ -473,8 +495,11 @@ impl SchemaCatalog {
let connection_ref = Arc::new(connection);

let old_connection = self.connection_by_id.get(&id).unwrap();
// check if connection name get updated.
if old_connection.name != name {
// check if the connection name gets updated.
if old_connection.name != name
&& let Some(conn) = self.connection_by_name.get(&old_connection.name)
&& conn.id == id
{
self.connection_by_name.remove(&old_connection.name);
}

Expand Down Expand Up @@ -513,8 +538,11 @@ impl SchemaCatalog {
let secret_ref = Arc::new(secret);

let old_secret = self.secret_by_id.get(&id).unwrap();
// check if secret name get updated.
if old_secret.name != name {
// check if the secret name gets updated.
if old_secret.name != name
&& let Some(s) = self.secret_by_name.get(&old_secret.name)
&& s.id == id
{
self.secret_by_name.remove(&old_secret.name);
}

Expand Down
Loading

0 comments on commit 8260686

Please sign in to comment.