Skip to content

Commit

Permalink
feat: support swap rename syntax for table, mview, view, source, sink…
Browse files Browse the repository at this point in the history
… and subscription
  • Loading branch information
yezizp2012 committed Oct 29, 2024
1 parent 661939a commit a455ac6
Show file tree
Hide file tree
Showing 14 changed files with 760 additions and 159 deletions.
21 changes: 21 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ message AlterOwnerResponse {
WaitVersion version = 2;
}

message AlterSwapRenameRequest {
message ObjectNameSwapPair {
uint32 src_object_id = 1;
uint32 dst_object_id = 2;
}
oneof object {
ObjectNameSwapPair schema = 1;
ObjectNameSwapPair table = 2;
ObjectNameSwapPair view = 3;
ObjectNameSwapPair source = 4;
ObjectNameSwapPair sink = 5;
ObjectNameSwapPair subscription = 6;
}
}

message AlterSwapRenameResponse {
common.Status status = 1;
WaitVersion version = 2;
}

message CreateFunctionRequest {
catalog.Function function = 1;
}
Expand Down Expand Up @@ -513,4 +533,5 @@ service DdlService {
rpc Wait(WaitRequest) returns (WaitResponse);
rpc CommentOn(CommentOnRequest) returns (CommentOnResponse);
rpc AutoSchemaChange(AutoSchemaChangeRequest) returns (AutoSchemaChangeResponse);
rpc AlterSwapRename(AlterSwapRenameRequest) returns (AlterSwapRenameResponse);
}
12 changes: 10 additions & 2 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use risingwave_pb::catalog::{
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
alter_name_request, alter_owner_request, alter_set_schema_request, alter_swap_rename_request,
create_connection_request, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -197,6 +198,8 @@ pub trait CatalogWriter: Send + Sync {
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -498,6 +501,11 @@ impl CatalogWriter for CatalogWriterImpl {

Ok(())
}

async fn alter_swap_rename(&self, object: alter_swap_rename_request::Object) -> Result<()> {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
82 changes: 62 additions & 20 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,14 @@ impl SchemaCatalog {
let table_ref = Arc::new(table);

let old_table = self.table_by_id.get(&id).unwrap();
// check if table name get updated.
// check if the table name gets updated.
if old_table.name() != name {
self.table_by_name.remove(old_table.name());
// keep the old table for swap rename if the old name assigned to another table.
if let Some(t) = self.table_by_name.get(old_table.name())
&& t.id == id
{
self.table_by_name.remove(old_table.name());
}
}
self.table_by_name.insert(name, table_ref.clone());
self.table_by_id.insert(id, table_ref.clone());
Expand All @@ -137,9 +142,13 @@ impl SchemaCatalog {
let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
let index_ref = Arc::new(index);

// check if index name get updated.
// check if the index name gets updated.
if old_index.name != name {
self.index_by_name.remove(&old_index.name);
if let Some(idx) = self.index_by_name.get(&old_index.name)
&& idx.id == id
{
self.index_by_name.remove(&old_index.name);
}
}
self.index_by_name.insert(name, index_ref.clone());
self.index_by_id.insert(id, index_ref.clone());
Expand Down Expand Up @@ -245,9 +254,14 @@ impl SchemaCatalog {
let source_ref = Arc::new(source);

let old_source = self.source_by_id.get(&id).unwrap();
// check if source name get updated.
// check if the source name gets updated.
if old_source.name != name {
self.source_by_name.remove(&old_source.name);
// keep the old source for swap rename if the old name assigned to another source.
if let Some(src) = self.source_by_name.get(&old_source.name)
&& src.id == id
{
self.source_by_name.remove(&old_source.name);
}
}

self.source_by_name.insert(name, source_ref.clone());
Expand Down Expand Up @@ -294,9 +308,14 @@ impl SchemaCatalog {
let sink_ref = Arc::new(sink);

let old_sink = self.sink_by_id.get(&id).unwrap();
// check if sink name get updated.
// check if the sink name gets updated.
if old_sink.name != name {
self.sink_by_name.remove(&old_sink.name);
// keep the old sink for swap rename if the old name assigned to another sink.
if let Some(s) = self.sink_by_name.get(&old_sink.name)
&& s.id.sink_id == id
{
self.sink_by_name.remove(&old_sink.name);
}
}

self.sink_by_name.insert(name, sink_ref.clone());
Expand Down Expand Up @@ -331,9 +350,15 @@ impl SchemaCatalog {
let subscription_ref = Arc::new(subscription);

let old_subscription = self.subscription_by_id.get(&id).unwrap();
// check if subscription name get updated.
// check if the subscription name gets updated.
if old_subscription.name != name {
self.subscription_by_name.remove(&old_subscription.name);
// keep the old subscription for swap rename
// if the old name assigned to another subscription.
if let Some(s) = self.subscription_by_name.get(&old_subscription.name)
&& s.id.subscription_id == id
{
self.subscription_by_name.remove(&old_subscription.name);
}
}

self.subscription_by_name
Expand Down Expand Up @@ -365,9 +390,14 @@ impl SchemaCatalog {
let view_ref = Arc::new(view);

let old_view = self.view_by_id.get(&id).unwrap();
// check if view name get updated.
// check if the view name gets updated.
if old_view.name != name {
self.view_by_name.remove(&old_view.name);
// keep the old view for swap rename if the old name assigned to another view.
if let Some(v) = self.view_by_name.get(old_view.name())
&& v.id == id
{
self.view_by_name.remove(&old_view.name);
}
}

self.view_by_name.insert(name, view_ref.clone());
Expand Down Expand Up @@ -438,11 +468,15 @@ impl SchemaCatalog {
.function_by_name
.get_mut(&old_function_by_id.name)
.unwrap();
// check if function name get updated.
// check if the function name gets updated.
if old_function_by_id.name != name {
old_function_by_name.remove(&old_function_by_id.arg_types);
if old_function_by_name.is_empty() {
self.function_by_name.remove(&old_function_by_id.name);
if let Some(f) = old_function_by_name.get(&old_function_by_id.arg_types)
&& f.id == id
{
old_function_by_name.remove(&old_function_by_id.arg_types);
if old_function_by_name.is_empty() {
self.function_by_name.remove(&old_function_by_id.name);
}
}
}

Expand Down Expand Up @@ -473,9 +507,13 @@ impl SchemaCatalog {
let connection_ref = Arc::new(connection);

let old_connection = self.connection_by_id.get(&id).unwrap();
// check if connection name get updated.
// check if the connection name gets updated.
if old_connection.name != name {
self.connection_by_name.remove(&old_connection.name);
if let Some(conn) = self.connection_by_name.get(&old_connection.name)
&& conn.id == id
{
self.connection_by_name.remove(&old_connection.name);
}
}

self.connection_by_name.insert(name, connection_ref.clone());
Expand Down Expand Up @@ -513,9 +551,13 @@ impl SchemaCatalog {
let secret_ref = Arc::new(secret);

let old_secret = self.secret_by_id.get(&id).unwrap();
// check if secret name get updated.
// check if the secret name gets updated.
if old_secret.name != name {
self.secret_by_name.remove(&old_secret.name);
if let Some(s) = self.secret_by_name.get(&old_secret.name)
&& s.id == id
{
self.secret_by_name.remove(&old_secret.name);
}
}

self.secret_by_name.insert(name, secret_ref.clone());
Expand Down
179 changes: 179 additions & 0 deletions src/frontend/src/handler/alter_swap_rename.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_pb::ddl_service::alter_swap_rename_request;
use risingwave_pb::ddl_service::alter_swap_rename_request::ObjectNameSwapPair;
use risingwave_sqlparser::ast::ObjectName;

use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::CatalogError;
use crate::error::{ErrorCode, Result};
use crate::handler::{HandlerArgs, RwPgResponse};
use crate::session::SessionImpl;
use crate::user::UserId;
use crate::Binder;

/// Check if the session user has the privilege to swap and rename the objects.
fn check_swap_rename_privilege(
session_impl: &Arc<SessionImpl>,
src_owner: UserId,
target_owner: UserId,
) -> Result<()> {
if !session_impl.is_super_user()
&& (src_owner != session_impl.user_id() || target_owner != session_impl.user_id())
{
return Err(ErrorCode::PermissionDenied(format!(
"{} is not super user and not the owner of the objects.",
session_impl.user_name()
))
.into());
}
Ok(())
}

pub async fn handle_swap_rename(
handler_args: HandlerArgs,
source_object: ObjectName,
target_object: ObjectName,
stmt_type: StatementType,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let (src_schema_name, src_obj_name) =
Binder::resolve_schema_qualified_name(db_name, source_object)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let src_schema_path = SchemaPath::new(src_schema_name.as_deref(), &search_path, user_name);
let (target_schema_name, target_obj_name) =
Binder::resolve_schema_qualified_name(db_name, target_object)?;
let target_schema_path =
SchemaPath::new(target_schema_name.as_deref(), &search_path, user_name);

let obj = match stmt_type {
StatementType::ALTER_SCHEMA => {
// TODO: support it until resolves https://github.com/risingwavelabs/risingwave/issues/19028
bail_not_implemented!("ALTER SCHEMA SWAP WITH is not supported yet");
}
StatementType::ALTER_TABLE | StatementType::ALTER_MATERIALIZED_VIEW => {
let catalog_reader = session.env().catalog_reader().read_guard();
let (src_table, _) = catalog_reader.get_created_table_by_name(
db_name,
src_schema_path,
&src_obj_name,
)?;
let (target_table, _) = catalog_reader.get_created_table_by_name(
db_name,
target_schema_path,
&target_obj_name,
)?;

if src_table.table_type != target_table.table_type {
return Err(ErrorCode::PermissionDenied(format!(
"cannot swap between {} and {}: type mismatch",
src_obj_name, target_obj_name
))
.into());
}
if stmt_type == StatementType::ALTER_TABLE && !src_table.is_table() {
return Err(CatalogError::NotFound("table", src_obj_name.to_string()).into());
} else if stmt_type == StatementType::ALTER_MATERIALIZED_VIEW && !src_table.is_mview() {
return Err(
CatalogError::NotFound("materialized view", src_obj_name.to_string()).into(),
);
}

check_swap_rename_privilege(&session, src_table.owner, target_table.owner)?;

alter_swap_rename_request::Object::Table(ObjectNameSwapPair {
src_object_id: src_table.id.table_id,
dst_object_id: target_table.id.table_id,
})
}
StatementType::ALTER_VIEW => {
let catalog_reader = session.env().catalog_reader().read_guard();
let (src_view, _) =
catalog_reader.get_view_by_name(db_name, src_schema_path, &src_obj_name)?;
let (target_view, _) =
catalog_reader.get_view_by_name(db_name, target_schema_path, &target_obj_name)?;
check_swap_rename_privilege(&session, src_view.owner, target_view.owner)?;

alter_swap_rename_request::Object::View(ObjectNameSwapPair {
src_object_id: src_view.id,
dst_object_id: target_view.id,
})
}
StatementType::ALTER_SOURCE => {
let catalog_reader = session.env().catalog_reader().read_guard();
let (src_source, _) =
catalog_reader.get_source_by_name(db_name, src_schema_path, &src_obj_name)?;
let (target_source, _) =
catalog_reader.get_source_by_name(db_name, target_schema_path, &target_obj_name)?;
check_swap_rename_privilege(&session, src_source.owner, target_source.owner)?;

alter_swap_rename_request::Object::Source(ObjectNameSwapPair {
src_object_id: src_source.id,
dst_object_id: target_source.id,
})
}
StatementType::ALTER_SINK => {
let catalog_reader = session.env().catalog_reader().read_guard();
let (src_sink, _) =
catalog_reader.get_sink_by_name(db_name, src_schema_path, &src_obj_name)?;
let (target_sink, _) =
catalog_reader.get_sink_by_name(db_name, target_schema_path, &target_obj_name)?;
check_swap_rename_privilege(
&session,
src_sink.owner.user_id,
target_sink.owner.user_id,
)?;

alter_swap_rename_request::Object::Sink(ObjectNameSwapPair {
src_object_id: src_sink.id.sink_id,
dst_object_id: target_sink.id.sink_id,
})
}
StatementType::ALTER_SUBSCRIPTION => {
let catalog_reader = session.env().catalog_reader().read_guard();
let (src_subscription, _) =
catalog_reader.get_subscription_by_name(db_name, src_schema_path, &src_obj_name)?;
let (target_subscription, _) = catalog_reader.get_subscription_by_name(
db_name,
target_schema_path,
&target_obj_name,
)?;
check_swap_rename_privilege(
&session,
src_subscription.owner.user_id,
target_subscription.owner.user_id,
)?;

alter_swap_rename_request::Object::Subscription(ObjectNameSwapPair {
src_object_id: src_subscription.id.subscription_id,
dst_object_id: target_subscription.id.subscription_id,
})
}
_ => {
unreachable!("handle_swap_rename: unsupported statement type")
}
};

let catalog_writer = session.catalog_writer()?;
catalog_writer.alter_swap_rename(obj).await?;

Ok(RwPgResponse::empty_result(stmt_type))
}
Loading

0 comments on commit a455ac6

Please sign in to comment.