diff --git a/e2e_test/ddl/alter_swap_rename.slt b/e2e_test/ddl/alter_swap_rename.slt new file mode 100644 index 0000000000000..4accda4256951 --- /dev/null +++ b/e2e_test/ddl/alter_swap_rename.slt @@ -0,0 +1,181 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +# Create initial tables and views for testing swap +statement ok +CREATE TABLE t1 (v1 INT primary key, v2 STRUCT>); + +statement ok +CREATE TABLE t2 (v1 INT primary key, v2 STRUCT>); + +# Insert some test data +statement ok +INSERT INTO t1 VALUES(1,(1,(1,2))); + +statement ok +INSERT INTO t2 VALUES(2,(2,(2,4))); + +# Create materialized views referencing the tables +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT v1, (t.v2).v1 AS v21 FROM t1 t; + +statement ok +CREATE MATERIALIZED VIEW mv2 AS SELECT v1, (t.v2).v1 AS v21 FROM t2 t; + +# Create regular views +statement ok +CREATE VIEW v1 AS SELECT t1.v1 FROM t1; + +statement ok +CREATE VIEW v2 AS SELECT t2.v2 FROM t2; + +# Create sources +statement ok +CREATE SOURCE src1 (v INT) WITH ( + connector = 'datagen', + fields.v.kind = 'sequence', + fields.v.start = '1', + fields.v.end = '5', + datagen.rows.per.second='10', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +statement ok +CREATE SOURCE src2 (v INT) WITH ( + connector = 'datagen', + fields.v.kind = 'sequence', + fields.v.start = '6', + fields.v.end = '10', + datagen.rows.per.second='10', + datagen.split.num = '1' +) FORMAT PLAIN ENCODE JSON; + +# Create sinks +statement ok +CREATE SINK sink1 AS SELECT * FROM mv1 WITH ( + connector = 'blackhole' +); + +statement ok +CREATE SINK sink2 AS SELECT * FROM mv2 WITH ( + connector = 'blackhole' +); + +# Create subscriptions +statement ok +CREATE SUBSCRIPTION sub1 FROM mv1 WITH ( + retention = '1D' +); + +statement ok +CREATE SUBSCRIPTION sub2 FROM mv2 WITH ( + retention = '1D' +); + +# Test table swap +statement ok +ALTER TABLE t1 SWAP WITH t2; + +statement error Permission denied +ALTER TABLE t1 SWAP WITH mv1; + +statement error not found +ALTER TABLE mv1 SWAP WITH mv2; + +query II rowsort +SELECT * FROM t1; +---- +2 (2,"(2,4)") + +query II rowsort +SELECT * FROM t2; +---- +1 (1,"(1,2)") + +# Test materialized view swap +statement ok +ALTER MATERIALIZED VIEW mv1 SWAP WITH mv2; + +# Verify materialized view contents +query II rowsort +SELECT * FROM mv1; +---- +2 2 + +query II rowsort +SELECT * FROM mv2; +---- +1 1 + +# Test view swap +statement ok +ALTER VIEW v1 SWAP WITH v2; + +# Verify view definitions are swapped +query TT +SHOW CREATE VIEW v1; +---- +public.v1 CREATE VIEW v1 AS SELECT t2.v2 FROM t1 AS t2 + +query TT +SHOW CREATE VIEW v2; +---- +public.v2 CREATE VIEW v2 AS SELECT t1.v1 FROM t2 AS t1 + +# Test source swap +statement ok +ALTER SOURCE src1 SWAP WITH src2; + +# Verify source definitions are swapped +query TT +SHOW CREATE SOURCE src1; +---- +public.src1 CREATE SOURCE src1 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '6', fields.v.end = '10', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON + +query TT +SHOW CREATE SOURCE src2; +---- +public.src2 CREATE SOURCE src2 (v INT) WITH (connector = 'datagen', fields.v.kind = 'sequence', fields.v.start = '1', fields.v.end = '5', datagen.rows.per.second = '10', datagen.split.num = '1') FORMAT PLAIN ENCODE JSON + +# Test sink swap +statement ok +ALTER SINK sink1 SWAP WITH sink2; + +# Verify sink definitions are swapped +query TT +SHOW CREATE SINK sink1; +---- +public.sink1 CREATE SINK sink1 AS SELECT * FROM mv1 AS mv2 WITH (connector = 'blackhole') + +query TT +SHOW CREATE SINK sink2; +---- +public.sink2 CREATE SINK sink2 AS SELECT * FROM mv2 AS mv1 WITH (connector = 'blackhole') + +# Test subscription swap +statement ok +ALTER SUBSCRIPTION sub1 SWAP WITH sub2; + +# Verify subscription definitions are swapped +query TT +SHOW CREATE SUBSCRIPTION sub1; +---- +public.sub1 CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D') + +query TT +SHOW CREATE SUBSCRIPTION sub2; +---- +public.sub2 CREATE SUBSCRIPTION sub2 FROM mv2 WITH (retention = '1D') + +# Clean up +statement ok +DROP SOURCE src1; + +statement ok +DROP SOURCE src2; + +statement ok +DROP TABLE t1 CASCADE; + +statement ok +DROP TABLE t2 CASCADE; diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index de860593e8105..6467bd6e1d7e7 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -268,6 +268,26 @@ message AlterOwnerResponse { WaitVersion version = 2; } +message AlterSwapRenameRequest { + message ObjectNameSwapPair { + uint32 src_object_id = 1; + uint32 dst_object_id = 2; + } + oneof object { + ObjectNameSwapPair schema = 1; + ObjectNameSwapPair table = 2; + ObjectNameSwapPair view = 3; + ObjectNameSwapPair source = 4; + ObjectNameSwapPair sink = 5; + ObjectNameSwapPair subscription = 6; + } +} + +message AlterSwapRenameResponse { + common.Status status = 1; + WaitVersion version = 2; +} + message CreateFunctionRequest { catalog.Function function = 1; } @@ -513,4 +533,5 @@ service DdlService { rpc Wait(WaitRequest) returns (WaitResponse); rpc CommentOn(CommentOnRequest) returns (CommentOnResponse); rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse); + rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse); } diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index 2d3fbd7e0178a..271d395181df8 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{ PbSubscription, PbTable, PbView, }; use risingwave_pb::ddl_service::{ - alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request, - PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion, + alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request, + create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, + WaitVersion, }; use risingwave_pb::meta::PbTableParallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync { object: alter_set_schema_request::Object, new_schema_id: u32, ) -> Result<()>; + + async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>; } #[derive(Clone)] @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl { Ok(()) } + + async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> { + let version = self.meta_client.alter_swap_rename(object).await?; + self.wait_version(version).await + } } impl CatalogWriterImpl { diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 0394da2a70f81..5f4d884bf53c9 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -115,10 +115,14 @@ impl SchemaCatalog { let table_ref = Arc::new(table); let old_table = self.table_by_id.get(&id).unwrap(); - // check if table name get updated. - if old_table.name() != name { + // check if the table name gets updated. + if old_table.name() != name + && let Some(t) = self.table_by_name.get(old_table.name()) + && t.id == id + { self.table_by_name.remove(old_table.name()); } + self.table_by_name.insert(name, table_ref.clone()); self.table_by_id.insert(id, table_ref.clone()); table_ref @@ -137,8 +141,11 @@ impl SchemaCatalog { let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table); let index_ref = Arc::new(index); - // check if index name get updated. - if old_index.name != name { + // check if the index name gets updated. + if old_index.name != name + && let Some(idx) = self.index_by_name.get(&old_index.name) + && idx.id == id + { self.index_by_name.remove(&old_index.name); } self.index_by_name.insert(name, index_ref.clone()); @@ -245,8 +252,11 @@ impl SchemaCatalog { let source_ref = Arc::new(source); let old_source = self.source_by_id.get(&id).unwrap(); - // check if source name get updated. - if old_source.name != name { + // check if the source name gets updated. + if old_source.name != name + && let Some(src) = self.source_by_name.get(&old_source.name) + && src.id == id + { self.source_by_name.remove(&old_source.name); } @@ -294,8 +304,11 @@ impl SchemaCatalog { let sink_ref = Arc::new(sink); let old_sink = self.sink_by_id.get(&id).unwrap(); - // check if sink name get updated. - if old_sink.name != name { + // check if the sink name gets updated. + if old_sink.name != name + && let Some(s) = self.sink_by_name.get(&old_sink.name) + && s.id.sink_id == id + { self.sink_by_name.remove(&old_sink.name); } @@ -331,8 +344,11 @@ impl SchemaCatalog { let subscription_ref = Arc::new(subscription); let old_subscription = self.subscription_by_id.get(&id).unwrap(); - // check if subscription name get updated. - if old_subscription.name != name { + // check if the subscription name gets updated. + if old_subscription.name != name + && let Some(s) = self.subscription_by_name.get(&old_subscription.name) + && s.id.subscription_id == id + { self.subscription_by_name.remove(&old_subscription.name); } @@ -365,8 +381,11 @@ impl SchemaCatalog { let view_ref = Arc::new(view); let old_view = self.view_by_id.get(&id).unwrap(); - // check if view name get updated. - if old_view.name != name { + // check if the view name gets updated. + if old_view.name != name + && let Some(v) = self.view_by_name.get(old_view.name()) + && v.id == id + { self.view_by_name.remove(&old_view.name); } @@ -438,8 +457,11 @@ impl SchemaCatalog { .function_by_name .get_mut(&old_function_by_id.name) .unwrap(); - // check if function name get updated. - if old_function_by_id.name != name { + // check if the function name gets updated. + if old_function_by_id.name != name + && let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types) + && f.id == id + { old_function_by_name.remove(&old_function_by_id.arg_types); if old_function_by_name.is_empty() { self.function_by_name.remove(&old_function_by_id.name); @@ -473,8 +495,11 @@ impl SchemaCatalog { let connection_ref = Arc::new(connection); let old_connection = self.connection_by_id.get(&id).unwrap(); - // check if connection name get updated. - if old_connection.name != name { + // check if the connection name gets updated. + if old_connection.name != name + && let Some(conn) = self.connection_by_name.get(&old_connection.name) + && conn.id == id + { self.connection_by_name.remove(&old_connection.name); } @@ -513,8 +538,11 @@ impl SchemaCatalog { let secret_ref = Arc::new(secret); let old_secret = self.secret_by_id.get(&id).unwrap(); - // check if secret name get updated. - if old_secret.name != name { + // check if the secret name gets updated. + if old_secret.name != name + && let Some(s) = self.secret_by_name.get(&old_secret.name) + && s.id == id + { self.secret_by_name.remove(&old_secret.name); } diff --git a/src/frontend/src/handler/alter_swap_rename.rs b/src/frontend/src/handler/alter_swap_rename.rs new file mode 100644 index 0000000000000..a1d23484576f8 --- /dev/null +++ b/src/frontend/src/handler/alter_swap_rename.rs @@ -0,0 +1,179 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use pgwire::pg_response::StatementType; +use risingwave_common::bail_not_implemented; +use risingwave_pb::ddl_service::alter_swap_rename_request; +use risingwave_pb::ddl_service::alter_swap_rename_request::ObjectNameSwapPair; +use risingwave_sqlparser::ast::ObjectName; + +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::CatalogError; +use crate::error::{ErrorCode, Result}; +use crate::handler::{HandlerArgs, RwPgResponse}; +use crate::session::SessionImpl; +use crate::user::UserId; +use crate::Binder; + +/// Check if the session user has the privilege to swap and rename the objects. +fn check_swap_rename_privilege( + session_impl: &Arc, + src_owner: UserId, + target_owner: UserId, +) -> Result<()> { + if !session_impl.is_super_user() + && (src_owner != session_impl.user_id() || target_owner != session_impl.user_id()) + { + return Err(ErrorCode::PermissionDenied(format!( + "{} is not super user and not the owner of the objects.", + session_impl.user_name() + )) + .into()); + } + Ok(()) +} + +pub async fn handle_swap_rename( + handler_args: HandlerArgs, + source_object: ObjectName, + target_object: ObjectName, + stmt_type: StatementType, +) -> Result { + let session = handler_args.session; + let db_name = session.database(); + let (src_schema_name, src_obj_name) = + Binder::resolve_schema_qualified_name(db_name, source_object)?; + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name); + let (target_schema_name, target_obj_name) = + Binder::resolve_schema_qualified_name(db_name, target_object)?; + let target_schema_path = + SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name); + + let obj = match stmt_type { + StatementType::ALTER_SCHEMA => { + // TODO: support it until resolves https://github.com/risingwavelabs/risingwave/issues/19028 + bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet"); + } + StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_table, _) = catalog_reader.get_created_table_by_name( + db_name, + src_schema_path, + &src_obj_name, + )?; + let (target_table, _) = catalog_reader.get_created_table_by_name( + db_name, + target_schema_path, + &target_obj_name, + )?; + + if src_table.table_type != target_table.table_type { + return Err(ErrorCode::PermissionDenied(format!( + "cannot swap between {} and {}: type mismatch", + src_obj_name, target_obj_name + )) + .into()); + } + if stmt_type == StatementType::ALTER_TABLE && !src_table.is_table() { + return Err(CatalogError::NotFound("table", src_obj_name.to_string()).into()); + } else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() { + return Err( + CatalogError::NotFound("materialized view", src_obj_name.to_string()).into(), + ); + } + + check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?; + + alter_swap_rename_request::Object::Table(ObjectNameSwapPair { + src_object_id: src_table.id.table_id, + dst_object_id: target_table.id.table_id, + }) + } + StatementType::ALTER_VIEW => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_view, _) = + catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_view, _) = + catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?; + check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?; + + alter_swap_rename_request::Object::View(ObjectNameSwapPair { + src_object_id: src_view.id, + dst_object_id: target_view.id, + }) + } + StatementType::ALTER_SOURCE => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_source, _) = + catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_source, _) = + catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?; + check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?; + + alter_swap_rename_request::Object::Source(ObjectNameSwapPair { + src_object_id: src_source.id, + dst_object_id: target_source.id, + }) + } + StatementType::ALTER_SINK => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_sink, _) = + catalog_reader.get_sink_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_sink, _) = + catalog_reader.get_sink_by_name(db_name, target_schema_path, &target_obj_name)?; + check_swap_rename_privilege( + &session, + src_sink.owner.user_id, + target_sink.owner.user_id, + )?; + + alter_swap_rename_request::Object::Sink(ObjectNameSwapPair { + src_object_id: src_sink.id.sink_id, + dst_object_id: target_sink.id.sink_id, + }) + } + StatementType::ALTER_SUBSCRIPTION => { + let catalog_reader = session.env().catalog_reader().read_guard(); + let (src_subscription, _) = + catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?; + let (target_subscription, _) = catalog_reader.get_subscription_by_name( + db_name, + target_schema_path, + &target_obj_name, + )?; + check_swap_rename_privilege( + &session, + src_subscription.owner.user_id, + target_subscription.owner.user_id, + )?; + + alter_swap_rename_request::Object::Subscription(ObjectNameSwapPair { + src_object_id: src_subscription.id.subscription_id, + dst_object_id: target_subscription.id.subscription_id, + }) + } + _ => { + unreachable!("handle_swap_rename: unsupported statement type") + } + }; + + let catalog_writer = session.catalog_writer()?; + catalog_writer.alter_swap_rename(obj).await?; + + Ok(RwPgResponse::empty_result(stmt_type)) +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 42de3a116d0b6..44a2c0590a3e2 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -46,6 +46,7 @@ mod alter_set_schema; mod alter_source_column; mod alter_source_with_sr; mod alter_streaming_rate_limit; +mod alter_swap_rename; mod alter_system; mod alter_table_column; mod alter_table_with_sr; @@ -638,6 +639,18 @@ pub async fn handle( ) .await } + Statement::AlterSchema { + name, + operation: AlterSchemaOperation::SwapRenameSchema { target_schema }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_schema, + StatementType::ALTER_SCHEMA, + ) + .await + } Statement::AlterTable { name, operation: @@ -721,6 +734,18 @@ pub async fn handle( ) .await } + Statement::AlterTable { + name, + operation: AlterTableOperation::SwapRenameTable { target_table }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_table, + StatementType::ALTER_TABLE, + ) + .await + } Statement::AlterIndex { name, operation: AlterIndexOperation::RenameIndex { index_name }, @@ -838,6 +863,19 @@ pub async fn handle( ) .await } + Statement::AlterView { + materialized, + name, + operation: AlterViewOperation::SwapRenameView { target_view }, + } => { + let statement_type = if materialized { + StatementType::ALTER_MATERIALIZED_VIEW + } else { + StatementType::ALTER_VIEW + }; + alter_swap_rename::handle_swap_rename(handler_args, name, target_view, statement_type) + .await + } Statement::AlterSink { name, operation: AlterSinkOperation::RenameSink { sink_name }, @@ -885,6 +923,18 @@ pub async fn handle( ) .await } + Statement::AlterSink { + name, + operation: AlterSinkOperation::SwapRenameSink { target_sink }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_sink, + StatementType::ALTER_SINK, + ) + .await + } Statement::AlterSubscription { name, operation: AlterSubscriptionOperation::RenameSubscription { subscription_name }, @@ -914,6 +964,21 @@ pub async fn handle( ) .await } + Statement::AlterSubscription { + name, + operation: + AlterSubscriptionOperation::SwapRenameSubscription { + target_subscription, + }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_subscription, + StatementType::ALTER_SUBSCRIPTION, + ) + .await + } Statement::AlterSource { name, operation: AlterSourceOperation::RenameSource { source_name }, @@ -970,6 +1035,18 @@ pub async fn handle( ) .await } + Statement::AlterSource { + name, + operation: AlterSourceOperation::SwapRenameSource { target_source }, + } => { + alter_swap_rename::handle_swap_rename( + handler_args, + name, + target_source, + StatementType::ALTER_SOURCE, + ) + .await + } Statement::AlterFunction { name, args, diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 14befbaeb7357..288cfaa377e20 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -44,8 +44,8 @@ use risingwave_pb::catalog::{ use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, create_connection_request, DdlProgress, - PbTableJobType, ReplaceTablePlan, TableJobType, + alter_name_request, alter_set_schema_request, alter_swap_rename_request, + create_connection_request, DdlProgress, PbTableJobType, ReplaceTablePlan, TableJobType, }; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ @@ -645,6 +645,10 @@ impl CatalogWriter for MockCatalogWriter { ) -> Result<()> { todo!() } + + async fn alter_swap_rename(&self, _object: alter_swap_rename_request::Object) -> Result<()> { + todo!() + } } impl MockCatalogWriter { diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1578813e2ead9..c99a4fbd365ee 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -1086,6 +1086,23 @@ impl DdlService for DdlServiceImpl { Ok(Response::new(AutoSchemaChangeResponse {})) } + + async fn alter_swap_rename( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let version = self + .ddl_controller + .run_command(DdlCommand::AlterSwapRename(req.object.unwrap())) + .await?; + + Ok(Response::new(AlterSwapRenameResponse { + status: None, + version, + })) + } } impl DdlServiceImpl { diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index ec761a39e37d9..826321fb1efa0 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -63,8 +63,10 @@ use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::info; -use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs}; -use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs}; +use super::utils::{ + check_subscription_name_duplicate, get_fragment_ids_by_jobs, rename_relation, + rename_relation_refer, +}; use crate::controller::utils::{ build_relation_group, check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, @@ -2470,139 +2472,104 @@ impl CatalogController { ) .await?; - let mut to_update_relations = vec![]; // rename relation. - macro_rules! rename_relation { - ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ - let (mut relation, obj) = $entity::find_by_id($object_id) - .find_also_related(Object) - .one(&txn) - .await? - .unwrap(); - let obj = obj.unwrap(); - let old_name = relation.name.clone(); - relation.name = object_name.into(); - if obj.obj_type != ObjectType::View { - relation.definition = alter_relation_rename(&relation.definition, object_name); - } - let active_model = $table::ActiveModel { - $identity: Set(relation.$identity), - name: Set(object_name.into()), - definition: Set(relation.definition.clone()), - ..Default::default() - }; - active_model.update(&txn).await?; - to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::$entity(ObjectModel(relation, obj).into())), - }); - old_name - }}; - } + let (mut to_update_relations, old_name) = + rename_relation(&txn, object_type, object_id, object_name).await?; + // rename referring relation name. + to_update_relations.extend( + rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?, + ); - // TODO: check is there any thing to change for shared source? - let old_name = match object_type { - ObjectType::Table => rename_relation!(Table, table, table_id, object_id), - ObjectType::Source => rename_relation!(Source, source, source_id, object_id), - ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id), - ObjectType::Subscription => { - rename_relation!(Subscription, subscription, subscription_id, object_id) - } - ObjectType::View => rename_relation!(View, view, view_id, object_id), - ObjectType::Index => { - let (mut index, obj) = Index::find_by_id(object_id) - .find_also_related(Object) - .one(&txn) - .await? - .unwrap(); - index.name = object_name.into(); - let index_table_id = index.index_table_id; - let old_name = rename_relation!(Table, table, table_id, index_table_id); - - // the name of index and its associated table is the same. - let active_model = index::ActiveModel { - index_id: Set(index.index_id), - name: Set(object_name.into()), - ..Default::default() - }; - active_model.update(&txn).await?; - to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Index( - ObjectModel(index, obj.unwrap()).into(), - )), - }); - old_name - } - _ => unreachable!("only relation name can be altered."), - }; + txn.commit().await?; - // rename referring relation name. - macro_rules! rename_relation_ref { - ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ - let (mut relation, obj) = $entity::find_by_id($object_id) - .find_also_related(Object) - .one(&txn) - .await? - .unwrap(); - relation.definition = - alter_relation_rename_refs(&relation.definition, &old_name, object_name); - let active_model = $table::ActiveModel { - $identity: Set(relation.$identity), - definition: Set(relation.definition.clone()), - ..Default::default() - }; - active_model.update(&txn).await?; - to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::$entity( - ObjectModel(relation, obj.unwrap()).into(), - )), - }); - }}; - } - let mut objs = get_referring_objects(object_id, &txn).await?; - if object_type == ObjectType::Table { - let incoming_sinks: I32Array = Table::find_by_id(object_id) + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { + relations: to_update_relations, + }), + ) + .await; + + Ok(version) + } + + pub async fn alter_swap_rename( + &self, + object_type: ObjectType, + object_id: ObjectId, + dst_object_id: ObjectId, + ) -> MetaResult { + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let dst_name: String = match object_type { + ObjectType::Table => Table::find_by_id(dst_object_id) .select_only() - .column(table::Column::IncomingSinks) + .column(table::Column::Name) .into_tuple() .one(&txn) .await? - .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::Source => Source::find_by_id(dst_object_id) + .select_only() + .column(source::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::Sink => Sink::find_by_id(dst_object_id) + .select_only() + .column(sink::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::View => View::find_by_id(dst_object_id) + .select_only() + .column(view::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + ObjectType::Subscription => Subscription::find_by_id(dst_object_id) + .select_only() + .column(subscription::Column::Name) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id) + })?, + _ => { + return Err(MetaError::permission_denied(format!( + "swap rename not supported for object type: {:?}", + object_type + ))); + } + }; - objs.extend( - incoming_sinks - .into_inner() - .into_iter() - .map(|id| PartialObject { - oid: id, - obj_type: ObjectType::Sink, - schema_id: None, - database_id: None, - }), - ); - } + // rename relations. + let (mut to_update_relations, src_name) = + rename_relation(&txn, object_type, object_id, &dst_name).await?; + let (to_update_relations2, _) = + rename_relation(&txn, object_type, dst_object_id, &src_name).await?; + to_update_relations.extend(to_update_relations2); + // rename referring relation name. + to_update_relations.extend( + rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?, + ); + to_update_relations.extend( + rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?, + ); - for obj in objs { - match obj.obj_type { - ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), - ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid), - ObjectType::Subscription => { - rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid) - } - ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid), - ObjectType::Index => { - let index_table_id: Option = Index::find_by_id(obj.oid) - .select_only() - .column(index::Column::IndexTableId) - .into_tuple() - .one(&txn) - .await?; - rename_relation_ref!(Table, table, table_id, index_table_id.unwrap()); - } - _ => { - bail!("only table, sink, subscription, view and index depend on other objects.") - } - } - } txn.commit().await?; let version = self diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index aaa71b1f21cb3..f7532fbaebf07 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -17,8 +17,8 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash; use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::{bail, hash}; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; @@ -27,7 +27,7 @@ use risingwave_meta_model::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege, view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, - PrivilegeId, SchemaId, SourceId, StreamNode, UserId, VnodeBitmap, WorkerId, + PrivilegeId, SchemaId, SourceId, StreamNode, TableId, UserId, VnodeBitmap, WorkerId, }; use risingwave_meta_model_migration::WithQuery; use risingwave_pb::catalog::{ @@ -49,12 +49,15 @@ use sea_orm::sea_query::{ WithClause, }; use sea_orm::{ - ColumnTrait, ConnectionTrait, DerivePartialModel, EntityTrait, FromQueryResult, JoinType, - Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, + ColumnTrait, ConnectionTrait, DatabaseTransaction, DerivePartialModel, EntityTrait, + FromQueryResult, JoinType, Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Set, + Statement, }; use thiserror_ext::AsReport; +use crate::controller::ObjectModel; use crate::{MetaError, MetaResult}; + /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. /// /// # Examples @@ -1171,6 +1174,170 @@ pub fn extract_external_table_name_from_definition(table_definition: &str) -> Op } } +/// `rename_relation` renames the target relation and its definition, +/// it commits the changes to the transaction and returns the updated relations and the old name. +pub async fn rename_relation( + txn: &DatabaseTransaction, + object_type: ObjectType, + object_id: ObjectId, + object_name: &str, +) -> MetaResult<(Vec, String)> { + use sea_orm::ActiveModelTrait; + + use crate::controller::rename::alter_relation_rename; + + let mut to_update_relations = vec![]; + // rename relation. + macro_rules! rename_relation { + ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ + let (mut relation, obj) = $entity::find_by_id($object_id) + .find_also_related(Object) + .one(txn) + .await? + .unwrap(); + let obj = obj.unwrap(); + let old_name = relation.name.clone(); + relation.name = object_name.into(); + if obj.obj_type != ObjectType::View { + relation.definition = alter_relation_rename(&relation.definition, object_name); + } + let active_model = $table::ActiveModel { + $identity: Set(relation.$identity), + name: Set(object_name.into()), + definition: Set(relation.definition.clone()), + ..Default::default() + }; + active_model.update(txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::$entity(ObjectModel(relation, obj).into())), + }); + old_name + }}; + } + // TODO: check is there any thing to change for shared source? + let old_name = match object_type { + ObjectType::Table => rename_relation!(Table, table, table_id, object_id), + ObjectType::Source => rename_relation!(Source, source, source_id, object_id), + ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id), + ObjectType::Subscription => { + rename_relation!(Subscription, subscription, subscription_id, object_id) + } + ObjectType::View => rename_relation!(View, view, view_id, object_id), + ObjectType::Index => { + let (mut index, obj) = Index::find_by_id(object_id) + .find_also_related(Object) + .one(txn) + .await? + .unwrap(); + index.name = object_name.into(); + let index_table_id = index.index_table_id; + let old_name = rename_relation!(Table, table, table_id, index_table_id); + + // the name of index and its associated table is the same. + let active_model = index::ActiveModel { + index_id: sea_orm::ActiveValue::Set(index.index_id), + name: sea_orm::ActiveValue::Set(object_name.into()), + ..Default::default() + }; + active_model.update(txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Index( + ObjectModel(index, obj.unwrap()).into(), + )), + }); + old_name + } + _ => unreachable!("only relation name can be altered."), + }; + + Ok((to_update_relations, old_name)) +} + +/// `rename_relation_refer` updates the definition of relations that refer to the target one, +/// it commits the changes to the transaction and returns all the updated relations. +pub async fn rename_relation_refer( + txn: &DatabaseTransaction, + object_type: ObjectType, + object_id: ObjectId, + object_name: &str, + old_name: &str, +) -> MetaResult> { + use sea_orm::ActiveModelTrait; + + use crate::controller::rename::alter_relation_rename_refs; + + let mut to_update_relations = vec![]; + macro_rules! rename_relation_ref { + ($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{ + let (mut relation, obj) = $entity::find_by_id($object_id) + .find_also_related(Object) + .one(txn) + .await? + .unwrap(); + relation.definition = + alter_relation_rename_refs(&relation.definition, old_name, object_name); + let active_model = $table::ActiveModel { + $identity: Set(relation.$identity), + definition: Set(relation.definition.clone()), + ..Default::default() + }; + active_model.update(txn).await?; + to_update_relations.push(PbRelation { + relation_info: Some(PbRelationInfo::$entity( + ObjectModel(relation, obj.unwrap()).into(), + )), + }); + }}; + } + let mut objs = get_referring_objects(object_id, txn).await?; + if object_type == ObjectType::Table { + let incoming_sinks: I32Array = Table::find_by_id(object_id) + .select_only() + .column(table::Column::IncomingSinks) + .into_tuple() + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?; + + objs.extend( + incoming_sinks + .into_inner() + .into_iter() + .map(|id| PartialObject { + oid: id, + obj_type: ObjectType::Sink, + schema_id: None, + database_id: None, + }), + ); + } + + for obj in objs { + match obj.obj_type { + ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid), + ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid), + ObjectType::Subscription => { + rename_relation_ref!(Subscription, subscription, subscription_id, obj.oid) + } + ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid), + ObjectType::Index => { + let index_table_id: Option = Index::find_by_id(obj.oid) + .select_only() + .column(index::Column::IndexTableId) + .into_tuple() + .one(txn) + .await?; + rename_relation_ref!(Table, table, table_id, index_table_id.unwrap()); + } + _ => { + bail!("only table, sink, subscription, view and index depend on other objects.") + } + } + } + + Ok(to_update_relations) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 995643215d317..a7c1a360c85d9 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -53,7 +53,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ - alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion, + alter_name_request, alter_set_schema_request, alter_swap_rename_request, DdlProgress, + TableJobType, WaitVersion, }; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; @@ -148,6 +149,7 @@ pub enum DdlCommand { ), DropStreamingJob(StreamingJobId, DropMode, Option), AlterName(alter_name_request::Object, String), + AlterSwapRename(alter_swap_rename_request::Object), ReplaceTable(ReplaceTableInfo), AlterSourceColumn(Source), AlterObjectOwner(Object, UserId), @@ -349,6 +351,7 @@ impl DdlController { DdlCommand::DropSubscription(subscription_id, drop_mode) => { ctrl.drop_subscription(subscription_id, drop_mode).await } + DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await, } } .in_current_span(); @@ -1885,6 +1888,55 @@ impl DdlController { .await } + async fn alter_swap_rename( + &self, + object: alter_swap_rename_request::Object, + ) -> MetaResult { + let (obj_type, src_id, dst_id) = match object { + alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"), + alter_swap_rename_request::Object::Table(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Table, src_id, dst_id) + } + alter_swap_rename_request::Object::View(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::View, src_id, dst_id) + } + alter_swap_rename_request::Object::Source(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Source, src_id, dst_id) + } + alter_swap_rename_request::Object::Sink(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Sink, src_id, dst_id) + } + alter_swap_rename_request::Object::Subscription(objs) => { + let (src_id, dst_id) = ( + objs.src_object_id as ObjectId, + objs.dst_object_id as ObjectId, + ); + (ObjectType::Subscription, src_id, dst_id) + } + }; + + self.metadata_manager + .catalog_controller + .alter_swap_rename(obj_type, src_id, dst_id) + .await + } + async fn alter_owner( &self, object: Object, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 6e5dd813a240b..c0dada8d55f06 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -573,6 +573,19 @@ impl MetaClient { Ok(()) } + pub async fn alter_swap_rename( + &self, + object: alter_swap_rename_request::Object, + ) -> Result { + let request = AlterSwapRenameRequest { + object: Some(object), + }; + let resp = self.inner.alter_swap_rename(request).await?; + Ok(resp + .version + .ok_or_else(|| anyhow!("wait version not set"))?) + } + pub async fn replace_table( &self, source: Option, @@ -2096,6 +2109,7 @@ macro_rules! for_all_meta_rpc { ,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse } ,{ ddl_client, wait, WaitRequest, WaitResponse } ,{ ddl_client, auto_schema_change, AutoSchemaChangeRequest, AutoSchemaChangeResponse } + ,{ ddl_client, alter_swap_rename, AlterSwapRenameRequest, AlterSwapRenameResponse } ,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse } ,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse } ,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse } diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 89e8f24bf5922..41c096b61dcc8 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -38,6 +38,7 @@ pub enum AlterDatabaseOperation { pub enum AlterSchemaOperation { ChangeOwner { new_owner_name: Ident }, RenameSchema { schema_name: ObjectName }, + SwapRenameSchema { target_schema: ObjectName }, } /// An `ALTER TABLE` (`Statement::AlterTable`) operation @@ -110,6 +111,10 @@ pub enum AlterTableOperation { SetBackfillRateLimit { rate_limit: i32, }, + /// `SWAP WITH ` + SwapRenameTable { + target_table: ObjectName, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -146,6 +151,10 @@ pub enum AlterViewOperation { SetBackfillRateLimit { rate_limit: i32, }, + /// `SWAP WITH ` + SwapRenameView { + target_view: ObjectName, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -165,6 +174,10 @@ pub enum AlterSinkOperation { parallelism: SetVariableValue, deferred: bool, }, + /// `SWAP WITH ` + SwapRenameSink { + target_sink: ObjectName, + }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -173,6 +186,7 @@ pub enum AlterSubscriptionOperation { RenameSubscription { subscription_name: ObjectName }, ChangeOwner { new_owner_name: Ident }, SetSchema { new_schema_name: ObjectName }, + SwapRenameSubscription { target_subscription: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -185,6 +199,7 @@ pub enum AlterSourceOperation { FormatEncode { connector_schema: ConnectorSchema }, RefreshSchema, SetSourceRateLimit { rate_limit: i32 }, + SwapRenameSource { target_source: ObjectName }, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -221,6 +236,9 @@ impl fmt::Display for AlterSchemaOperation { AlterSchemaOperation::RenameSchema { schema_name } => { write!(f, "RENAME TO {}", schema_name) } + AlterSchemaOperation::SwapRenameSchema { target_schema } => { + write!(f, "SWAP WITH {}", target_schema) + } } } } @@ -300,6 +318,9 @@ impl fmt::Display for AlterTableOperation { AlterTableOperation::SetBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterTableOperation::SwapRenameTable { target_table } => { + write!(f, "SWAP WITH {}", target_table) + } } } } @@ -351,6 +372,9 @@ impl fmt::Display for AlterViewOperation { AlterViewOperation::SetBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterViewOperation::SwapRenameView { target_view } => { + write!(f, "SWAP WITH {}", target_view) + } } } } @@ -378,6 +402,9 @@ impl fmt::Display for AlterSinkOperation { if *deferred { " DEFERRED" } else { "" } ) } + AlterSinkOperation::SwapRenameSink { target_sink } => { + write!(f, "SWAP WITH {}", target_sink) + } } } } @@ -394,6 +421,11 @@ impl fmt::Display for AlterSubscriptionOperation { AlterSubscriptionOperation::SetSchema { new_schema_name } => { write!(f, "SET SCHEMA {}", new_schema_name) } + AlterSubscriptionOperation::SwapRenameSubscription { + target_subscription, + } => { + write!(f, "SWAP WITH {}", target_subscription) + } } } } @@ -422,6 +454,9 @@ impl fmt::Display for AlterSourceOperation { AlterSourceOperation::SetSourceRateLimit { rate_limit } => { write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) } + AlterSourceOperation::SwapRenameSource { target_source } => { + write!(f, "SWAP WITH {}", target_source) + } } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 8ec7191f749c2..93e47d7a6b11a 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -503,6 +503,7 @@ define_keywords!( SUCCEEDS, SUM, SUPERUSER, + SWAP, SYMMETRIC, SYNC, SYSTEM, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index dac4fc9c5ec47..ad3dff10efa32 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3079,8 +3079,11 @@ impl Parser<'_> { self.expect_keyword(Keyword::TO)?; let schema_name = self.parse_object_name()?; AlterSchemaOperation::RenameSchema { schema_name } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_schema = self.parse_object_name()?; + AlterSchemaOperation::SwapRenameSchema { target_schema } } else { - return self.expected("RENAME OR OWNER TO after ALTER SCHEMA"); + return self.expected("RENAME, OWNER TO, OR SWAP WITH after ALTER SCHEMA"); }; Ok(Statement::AlterSchema { @@ -3199,8 +3202,12 @@ impl Parser<'_> { AlterTableOperation::AlterColumn { column_name, op } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterTableOperation::RefreshSchema + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_table = self.parse_object_name()?; + AlterTableOperation::SwapRenameTable { target_table } } else { - return self.expected("ADD or RENAME or OWNER TO or SET or DROP after ALTER TABLE"); + return self + .expected("ADD or RENAME or OWNER TO or SET or DROP or SWAP after ALTER TABLE"); }; Ok(Statement::AlterTable { name: table_name, @@ -3305,6 +3312,9 @@ impl Parser<'_> { AlterViewOperation::ChangeOwner { new_owner_name: owner_name, } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_view = self.parse_object_name()?; + AlterViewOperation::SwapRenameView { target_view } } else if self.parse_keyword(Keyword::SET) { if self.parse_keyword(Keyword::SCHEMA) { let schema_name = self.parse_object_name()?; @@ -3335,7 +3345,7 @@ impl Parser<'_> { } } else { return self.expected(&format!( - "RENAME or OWNER TO or SET after ALTER {}VIEW", + "RENAME or OWNER TO or SET or SWAP after ALTER {}VIEW", if materialized { "MATERIALIZED " } else { "" } )); }; @@ -3384,6 +3394,9 @@ impl Parser<'_> { } else { return self.expected("SCHEMA/PARALLELISM after SET"); } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_sink = self.parse_object_name()?; + AlterSinkOperation::SwapRenameSink { target_sink } } else { return self.expected("RENAME or OWNER TO or SET after ALTER SINK"); }; @@ -3417,8 +3430,13 @@ impl Parser<'_> { } else { return self.expected("SCHEMA after SET"); } + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_subscription = self.parse_object_name()?; + AlterSubscriptionOperation::SwapRenameSubscription { + target_subscription, + } } else { - return self.expected("RENAME or OWNER TO or SET after ALTER SUBSCRIPTION"); + return self.expected("RENAME or OWNER TO or SET or SWAP after ALTER SUBSCRIPTION"); }; Ok(Statement::AlterSubscription { @@ -3465,6 +3483,9 @@ impl Parser<'_> { AlterSourceOperation::FormatEncode { connector_schema } } else if self.parse_keywords(&[Keyword::REFRESH, Keyword::SCHEMA]) { AlterSourceOperation::RefreshSchema + } else if self.parse_keywords(&[Keyword::SWAP, Keyword::WITH]) { + let target_source = self.parse_object_name()?; + AlterSourceOperation::SwapRenameSource { target_source } } else { return self.expected( "RENAME, ADD COLUMN, OWNER TO, SET or SOURCE_RATE_LIMIT after ALTER SOURCE",