Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support swap rename syntax for table/mview/view/source/sink and subscription #19172

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions e2e_test/ddl/alter_swap_rename.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Create initial tables and views for testing swap
statement ok
CREATE TABLE t1 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

statement ok
CREATE TABLE t2 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

# Insert some test data
statement ok
INSERT INTO t1 VALUES(1,(1,(1,2)));

statement ok
INSERT INTO t2 VALUES(2,(2,(2,4)));

# Create materialized views referencing the tables
statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT v1, (t.v2).v1 AS v21 FROM t1 t;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 t;

# Create regular views
statement ok
CREATE VIEW v1 AS SELECT t1.v1 FROM t1;

statement ok
CREATE VIEW v2 AS SELECT t2.v2 FROM t2;

# Create sources
statement ok
CREATE SOURCE src1 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '1',
fields.v.end = '5',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE src2 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '6',
fields.v.end = '10',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

# Create sinks
statement ok
CREATE SINK sink1 AS SELECT * FROM mv1 WITH (
connector = 'blackhole'
);

statement ok
CREATE SINK sink2 AS SELECT * FROM mv2 WITH (
connector = 'blackhole'
);

# Create subscriptions
statement ok
CREATE SUBSCRIPTION sub1 FROM mv1 WITH (
retention = '1D'
);

statement ok
CREATE SUBSCRIPTION sub2 FROM mv2 WITH (
retention = '1D'
);

# Test table swap
statement ok
ALTER TABLE t1 SWAP WITH t2;

statement error Permission denied
ALTER TABLE t1 SWAP WITH mv1;

statement error not found
ALTER TABLE mv1 SWAP WITH mv2;

query II rowsort
SELECT * FROM t1;
----
2 (2,"(2,4)")

query II rowsort
SELECT * FROM t2;
----
1 (1,"(1,2)")

# Test materialized view swap
statement ok
ALTER MATERIALIZED VIEW mv1 SWAP WITH mv2;

# Verify materialized view contents
query II rowsort
SELECT * FROM mv1;
----
2 2

query II rowsort
SELECT * FROM mv2;
----
1 1

# Test view swap
statement ok
ALTER VIEW v1 SWAP WITH v2;

# Verify view definitions are swapped
query TT
SHOW CREATE VIEW v1;
----
public.v1 CREATE VIEW v1 AS SELECT t2.v2 FROM t1 AS t2

query TT
SHOW CREATE VIEW v2;
----
public.v2 CREATE VIEW v2 AS SELECT t1.v1 FROM t2 AS t1

# Test source swap
statement ok
ALTER SOURCE src1 SWAP WITH src2;

# Verify source definitions are swapped
query TT
SHOW CREATE SOURCE src1;
----
public.src1 CREATE SOURCE src1 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '6', fields.v.end = '10', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

query TT
SHOW CREATE SOURCE src2;
----
public.src2 CREATE SOURCE src2 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '1', fields.v.end = '5', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

# Test sink swap
statement ok
ALTER SINK sink1 SWAP WITH sink2;

# Verify sink definitions are swapped
query TT
SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT * FROM mv1 AS mv2 WITH (connector = 'blackhole')

query TT
SHOW CREATE SINK sink2;
----
public.sink2 CREATE SINK sink2 AS SELECT * FROM mv2 AS mv1 WITH (connector = 'blackhole')

# Test subscription swap
statement ok
ALTER SUBSCRIPTION sub1 SWAP WITH sub2;

# Verify subscription definitions are swapped
query TT
SHOW CREATE SUBSCRIPTION sub1;
----
public.sub1 CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D')

query TT
SHOW CREATE SUBSCRIPTION sub2;
----
public.sub2 CREATE SUBSCRIPTION sub2 FROM mv2 WITH (retention = '1D')

# Clean up
statement ok
DROP SOURCE src1;

statement ok
DROP SOURCE src2;

statement ok
DROP TABLE t1 CASCADE;

statement ok
DROP TABLE t2 CASCADE;
21 changes: 21 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ message AlterOwnerResponse {
WaitVersion version = 2;
}

message AlterSwapRenameRequest {
message ObjectNameSwapPair {
uint32 src_object_id = 1;
uint32 dst_object_id = 2;
}
oneof object {
ObjectNameSwapPair schema = 1;
ObjectNameSwapPair table = 2;
ObjectNameSwapPair view = 3;
ObjectNameSwapPair source = 4;
ObjectNameSwapPair sink = 5;
ObjectNameSwapPair subscription = 6;
}
}

message AlterSwapRenameResponse {
common.Status status = 1;
WaitVersion version = 2;
}

message CreateFunctionRequest {
catalog.Function function = 1;
}
Expand Down Expand Up @@ -513,4 +533,5 @@ service DdlService {
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse);
}
12 changes: 10 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
64 changes: 46 additions & 18 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,14 @@ impl SchemaCatalog {
let table_ref = Arc::new(table);

let old_table = self.table_by_id.get(&id).unwrap();
// check if table name get updated.
if old_table.name() != name {
// check if the table name gets updated.
if old_table.name() != name
&& let Some(t) = self.table_by_name.get(old_table.name())
&& t.id == id
{
self.table_by_name.remove(old_table.name());
}

self.table_by_name.insert(name, table_ref.clone());
self.table_by_id.insert(id, table_ref.clone());
table_ref
Expand All @@ -137,8 +141,11 @@ impl SchemaCatalog {
let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
let index_ref = Arc::new(index);

// check if index name get updated.
if old_index.name != name {
// check if the index name gets updated.
if old_index.name != name
&& let Some(idx) = self.index_by_name.get(&old_index.name)
&& idx.id == id
{
self.index_by_name.remove(&old_index.name);
}
self.index_by_name.insert(name, index_ref.clone());
Expand Down Expand Up @@ -245,8 +252,11 @@ impl SchemaCatalog {
let source_ref = Arc::new(source);

let old_source = self.source_by_id.get(&id).unwrap();
// check if source name get updated.
if old_source.name != name {
// check if the source name gets updated.
if old_source.name != name
&& let Some(src) = self.source_by_name.get(&old_source.name)
&& src.id == id
{
self.source_by_name.remove(&old_source.name);
}

Expand Down Expand Up @@ -294,8 +304,11 @@ impl SchemaCatalog {
let sink_ref = Arc::new(sink);

let old_sink = self.sink_by_id.get(&id).unwrap();
// check if sink name get updated.
if old_sink.name != name {
// check if the sink name gets updated.
if old_sink.name != name
&& let Some(s) = self.sink_by_name.get(&old_sink.name)
&& s.id.sink_id == id
{
self.sink_by_name.remove(&old_sink.name);
}

Expand Down Expand Up @@ -331,8 +344,11 @@ impl SchemaCatalog {
let subscription_ref = Arc::new(subscription);

let old_subscription = self.subscription_by_id.get(&id).unwrap();
// check if subscription name get updated.
if old_subscription.name != name {
// check if the subscription name gets updated.
if old_subscription.name != name
&& let Some(s) = self.subscription_by_name.get(&old_subscription.name)
&& s.id.subscription_id == id
{
self.subscription_by_name.remove(&old_subscription.name);
}

Expand Down Expand Up @@ -365,8 +381,11 @@ impl SchemaCatalog {
let view_ref = Arc::new(view);

let old_view = self.view_by_id.get(&id).unwrap();
// check if view name get updated.
if old_view.name != name {
// check if the view name gets updated.
if old_view.name != name
&& let Some(v) = self.view_by_name.get(old_view.name())
&& v.id == id
{
self.view_by_name.remove(&old_view.name);
}

Expand Down Expand Up @@ -438,8 +457,11 @@ impl SchemaCatalog {
.function_by_name
.get_mut(&old_function_by_id.name)
.unwrap();
// check if function name get updated.
if old_function_by_id.name != name {
// check if the function name gets updated.
if old_function_by_id.name != name
&& let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
&& f.id == id
{
old_function_by_name.remove(&old_function_by_id.arg_types);
if old_function_by_name.is_empty() {
self.function_by_name.remove(&old_function_by_id.name);
Expand Down Expand Up @@ -473,8 +495,11 @@ impl SchemaCatalog {
let connection_ref = Arc::new(connection);

let old_connection = self.connection_by_id.get(&id).unwrap();
// check if connection name get updated.
if old_connection.name != name {
// check if the connection name gets updated.
if old_connection.name != name
&& let Some(conn) = self.connection_by_name.get(&old_connection.name)
&& conn.id == id
{
self.connection_by_name.remove(&old_connection.name);
}

Expand Down Expand Up @@ -513,8 +538,11 @@ impl SchemaCatalog {
let secret_ref = Arc::new(secret);

let old_secret = self.secret_by_id.get(&id).unwrap();
// check if secret name get updated.
if old_secret.name != name {
// check if the secret name gets updated.
if old_secret.name != name
&& let Some(s) = self.secret_by_name.get(&old_secret.name)
&& s.id == id
{
self.secret_by_name.remove(&old_secret.name);
}

Expand Down
Loading
Loading