Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cherry-pick #18941 and #19172 (SWAP) to release 2.1 #19287

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ header:
- "src/sqlparser/**/*.rs"
- "java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/*.java"
- "java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/**/*.java"
- "src/meta/model_v2/migration/**/*.rs"
- "src/meta/model/migration/**/*.rs"
- "lints/ui/**"

comment: on-failure
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ members = [
"src/license",
"src/meta",
"src/meta/dashboard",
"src/meta/model_v2",
"src/meta/model_v2/migration",
"src/meta/model",
"src/meta/model/migration",
"src/meta/node",
"src/meta/service",
"src/object_store",
Expand Down Expand Up @@ -228,8 +228,8 @@ risingwave_mem_table_spill_test = { path = "./src/stream/spill_test" }
risingwave_meta = { path = "./src/meta" }
risingwave_meta_dashboard = { path = "./src/meta/dashboard" }
risingwave_meta_service = { path = "./src/meta/service" }
risingwave_meta_model_migration = { path = "src/meta/model_v2/migration" }
risingwave_meta_model_v2 = { path = "./src/meta/model_v2" }
risingwave_meta_model = { path = "src/meta/model" }
risingwave_meta_model_migration = { path = "src/meta/model/migration" }
risingwave_meta_node = { path = "./src/meta/node" }
risingwave_object_store = { path = "./src/object_store" }
risingwave_pb = { path = "./src/prost" }
Expand Down
181 changes: 181 additions & 0 deletions e2e_test/ddl/alter_swap_rename.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Create initial tables and views for testing swap
statement ok
CREATE TABLE t1 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

statement ok
CREATE TABLE t2 (v1 INT primary key, v2 STRUCT<v1 INT, v2 STRUCT<v1 INT, v2 INT>>);

# Insert some test data
statement ok
INSERT INTO t1 VALUES(1,(1,(1,2)));

statement ok
INSERT INTO t2 VALUES(2,(2,(2,4)));

# Create materialized views referencing the tables
statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT v1, (t.v2).v1 AS v21 FROM t1 t;

statement ok
CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 t;

# Create regular views
statement ok
CREATE VIEW v1 AS SELECT t1.v1 FROM t1;

statement ok
CREATE VIEW v2 AS SELECT t2.v2 FROM t2;

# Create sources
statement ok
CREATE SOURCE src1 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '1',
fields.v.end = '5',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE src2 (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '6',
fields.v.end = '10',
datagen.rows.per.second='10',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;

# Create sinks
statement ok
CREATE SINK sink1 AS SELECT * FROM mv1 WITH (
connector = 'blackhole'
);

statement ok
CREATE SINK sink2 AS SELECT * FROM mv2 WITH (
connector = 'blackhole'
);

# Create subscriptions
statement ok
CREATE SUBSCRIPTION sub1 FROM mv1 WITH (
retention = '1D'
);

statement ok
CREATE SUBSCRIPTION sub2 FROM mv2 WITH (
retention = '1D'
);

# Test table swap
statement ok
ALTER TABLE t1 SWAP WITH t2;

statement error Permission denied
ALTER TABLE t1 SWAP WITH mv1;

statement error not found
ALTER TABLE mv1 SWAP WITH mv2;

query II rowsort
SELECT * FROM t1;
----
2 (2,"(2,4)")

query II rowsort
SELECT * FROM t2;
----
1 (1,"(1,2)")

# Test materialized view swap
statement ok
ALTER MATERIALIZED VIEW mv1 SWAP WITH mv2;

# Verify materialized view contents
query II rowsort
SELECT * FROM mv1;
----
2 2

query II rowsort
SELECT * FROM mv2;
----
1 1

# Test view swap
statement ok
ALTER VIEW v1 SWAP WITH v2;

# Verify view definitions are swapped
query TT
SHOW CREATE VIEW v1;
----
public.v1 CREATE VIEW v1 AS SELECT t2.v2 FROM t1 AS t2

query TT
SHOW CREATE VIEW v2;
----
public.v2 CREATE VIEW v2 AS SELECT t1.v1 FROM t2 AS t1

# Test source swap
statement ok
ALTER SOURCE src1 SWAP WITH src2;

# Verify source definitions are swapped
query TT
SHOW CREATE SOURCE src1;
----
public.src1 CREATE SOURCE src1 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '6', fields.v.end = '10', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

query TT
SHOW CREATE SOURCE src2;
----
public.src2 CREATE SOURCE src2 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '1', fields.v.end = '5', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON

# Test sink swap
statement ok
ALTER SINK sink1 SWAP WITH sink2;

# Verify sink definitions are swapped
query TT
SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT * FROM mv1 AS mv2 WITH (connector = 'blackhole')

query TT
SHOW CREATE SINK sink2;
----
public.sink2 CREATE SINK sink2 AS SELECT * FROM mv2 AS mv1 WITH (connector = 'blackhole')

# Test subscription swap
statement ok
ALTER SUBSCRIPTION sub1 SWAP WITH sub2;

# Verify subscription definitions are swapped
query TT
SHOW CREATE SUBSCRIPTION sub1;
----
public.sub1 CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D')

query TT
SHOW CREATE SUBSCRIPTION sub2;
----
public.sub2 CREATE SUBSCRIPTION sub2 FROM mv2 WITH (retention = '1D')

# Clean up
statement ok
DROP SOURCE src1;

statement ok
DROP SOURCE src2;

statement ok
DROP TABLE t1 CASCADE;

statement ok
DROP TABLE t2 CASCADE;
21 changes: 21 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ message AlterOwnerResponse {
WaitVersion version = 2;
}

message AlterSwapRenameRequest {
message ObjectNameSwapPair {
uint32 src_object_id = 1;
uint32 dst_object_id = 2;
}
oneof object {
ObjectNameSwapPair schema = 1;
ObjectNameSwapPair table = 2;
ObjectNameSwapPair view = 3;
ObjectNameSwapPair source = 4;
ObjectNameSwapPair sink = 5;
ObjectNameSwapPair subscription = 6;
}
}

message AlterSwapRenameResponse {
common.Status status = 1;
WaitVersion version = 2;
}

message CreateFunctionRequest {
catalog.Function function = 1;
}
Expand Down Expand Up @@ -513,4 +533,5 @@ service DdlService {
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse);
}
2 changes: 1 addition & 1 deletion src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ risingwave_connector = { workspace = true }
risingwave_frontend = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_meta_model = { workspace = true }
risingwave_meta_model_migration = { workspace = true }
risingwave_meta_model_v2 = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/reschedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use anyhow::{anyhow, Result};
use inquire::Confirm;
use itertools::Itertools;
use regex::Regex;
use risingwave_meta_model_v2::WorkerId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
use serde::{Deserialize, Serialize};
Expand Down
12 changes: 10 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
Loading
Loading