diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 3f5b532ccfda0..0b8f302c278b5 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -413,6 +413,10 @@ impl ColumnCatalog { !self.is_generated() && !self.is_rw_timestamp_column() } + pub fn can_drop(&self) -> bool { + !(self.is_hidden() || self.is_rw_sys_column()) + } + /// If the column is a generated column pub fn generated_expr(&self) -> Option<&ExprNode> { if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) = diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 0342004f6b1b5..6d5eeb3ce3fcf 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::{ColumnCatalog, Engine}; +use risingwave_common::catalog::{ColumnCatalog, Engine, ROW_ID_COLUMN_ID}; use risingwave_common::hash::VnodeCount; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -29,13 +29,18 @@ use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph}; use risingwave_sqlparser::ast::{ - AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName, - Statement, StructField, TableConstraint, + AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Ident, + ObjectName, Statement, StructField, TableConstraint, }; use risingwave_sqlparser::parser::Parser; use super::create_source::schema_has_schema_registry; -use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator}; +use super::create_table::{ + bind_pk_and_row_id_on_relation, bind_sql_columns, + bind_sql_columns_generated_and_default_constraints, gen_create_table_plan, + gen_create_table_plan_without_source, gen_table_plan_inner, + generate_stream_graph_for_replace_table, ColumnIdGenerator, CreateTableInfo, CreateTableProps, +}; use super::util::SourceSchemaCompatExt; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; @@ -45,7 +50,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal}; use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project}; use crate::handler::create_table::bind_table_constraints; use crate::session::SessionImpl; -use crate::{Binder, TableCatalog}; +use crate::{build_graph, Binder, OptimizerContext, TableCatalog}; /// Used in auto schema change process pub async fn get_new_table_definition_for_cdc_table( @@ -328,42 +333,43 @@ pub async fn handle_alter_table_column( } // Retrieve the original table definition and parse it to AST. - let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition) - .context("unable to parse original table definition")? - .try_into() - .unwrap(); - let Statement::CreateTable { - columns, - format_encode, - .. - } = &mut definition - else { - panic!("unexpected statement: {:?}", definition); - }; - - let format_encode = format_encode - .clone() - .map(|format_encode| format_encode.into_v2_with_warning()); - - let fail_if_has_schema_registry = || { - if let Some(format_encode) = &format_encode - && schema_has_schema_registry(format_encode) - { - Err(ErrorCode::NotSupported( - "alter table with schema registry".to_owned(), - "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(), - )) - } else { - Ok(()) - } - }; - - if columns.is_empty() { - Err(ErrorCode::NotSupported( - "alter a table with empty column definitions".to_owned(), - "Please recreate the table with column definitions.".to_owned(), - ))? - } + // let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition) + // .context("unable to parse original table definition")? + // .try_into() + // .unwrap(); + + // let Statement::CreateTable { + // columns: _, + // format_encode, + // .. + // } = &mut definition + // else { + // panic!("unexpected statement: {:?}", definition); + // }; + + // let format_encode = format_encode + // .clone() + // .map(|format_encode| format_encode.into_v2_with_warning()); + + // let fail_if_has_schema_registry = || { + // if let Some(format_encode) = &format_encode + // && schema_has_schema_registry(format_encode) + // { + // Err(ErrorCode::NotSupported( + // "alter table with schema registry".to_owned(), + // "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(), + // )) + // } else { + // Ok(()) + // } + // }; + + // if columns.is_empty() { + // Err(ErrorCode::NotSupported( + // "alter a table with empty column definitions".to_owned(), + // "Please recreate the table with column definitions.".to_owned(), + // ))? + // } if !original_catalog.incoming_sinks.is_empty() && matches!(operation, AlterTableOperation::DropColumn { .. }) @@ -373,19 +379,19 @@ pub async fn handle_alter_table_column( ))?; } + let mut column_catalogs = original_catalog.columns().to_vec(); + let pk_column_ids = original_catalog.pk_column_ids(); + match operation { AlterTableOperation::AddColumn { column_def: new_column, } => { - fail_if_has_schema_registry()?; + // fail_if_has_schema_registry()?; // Duplicated names can actually be checked by `StreamMaterialize`. We do here for // better error reporting. let new_column_name = new_column.name.real_value(); - if columns - .iter() - .any(|c| c.name.real_value() == new_column_name) - { + if column_catalogs.iter().any(|c| c.name() == new_column_name) { Err(ErrorCode::InvalidInputSyntax(format!( "column \"{new_column_name}\" of table \"{table_name}\" already exists" )))? @@ -401,8 +407,18 @@ pub async fn handle_alter_table_column( ))? } - // Add the new column to the table definition if it is not created by `create table (*)` syntax. - columns.push(new_column); + let new_column = vec![new_column]; + + let mut new_column_catalog = bind_sql_columns(&new_column)?; + + bind_sql_columns_generated_and_default_constraints( + &session, + table_name.real_value(), + &mut new_column_catalog, // no ref to existing columns + new_column, + )?; + + column_catalogs.extend(new_column_catalog); } AlterTableOperation::DropColumn { @@ -417,7 +433,7 @@ pub async fn handle_alter_table_column( // Check if the column to drop is referenced by any generated columns. for column in original_catalog.columns() { if column_name.real_value() == column.name() && !column.is_generated() { - fail_if_has_schema_registry()?; + // fail_if_has_schema_registry()?; } if let Some(expr) = column.generated_expr() { @@ -438,13 +454,16 @@ pub async fn handle_alter_table_column( // Locate the column by name and remove it. let column_name = column_name.real_value(); - let removed_column = columns - .extract_if(|c| c.name.real_value() == column_name) + let removed_column = column_catalogs + .extract_if(|c| c.name() == column_name) .at_most_one() .ok() .unwrap(); - if removed_column.is_some() { + if let Some(removed_column) = removed_column { + if !removed_column.can_drop() { + bail!("cannot drop"); + } // PASS } else if if_exists { return Ok(PgResponse::builder(StatementType::ALTER_TABLE) @@ -464,8 +483,18 @@ pub async fn handle_alter_table_column( _ => unreachable!(), }; - let (source, table, graph, col_index_mapping, job_type) = - get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?; + column_catalogs.retain(|c| !c.is_rw_timestamp_column()); + + // TODO: Check pk not changed. + + let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan_2( + &session, + table_name, + column_catalogs, + &original_catalog, + None, + ) + .await?; let catalog_writer = session.catalog_writer()?; @@ -475,6 +504,112 @@ pub async fn handle_alter_table_column( Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } +pub async fn get_replace_table_plan_2( + session: &Arc, + table_name: ObjectName, + mut new_columns: Vec, + old_catalog: &Arc, + new_version_columns: Option>, // only provided in auto schema change +) -> Result<( + Option, + Table, + StreamFragmentGraph, + ColIndexMapping, + TableJobType, +)> { + // Create handler args as if we're creating a new table with the altered definition. + let handler_args = HandlerArgs::new(session.clone(), &Statement::Abort, Arc::from(""))?; + let mut col_id_gen = ColumnIdGenerator::new_alter(old_catalog); + + for new_column in &mut new_columns { + new_column.column_desc.column_id = col_id_gen.generate(&*new_column)?; + } + + let context = OptimizerContext::new(handler_args, ExplainOptions::default()); + + let db_name = session.database(); + let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; + let (database_id, schema_id) = + session.get_database_and_schema_id_for_create(schema_name.clone())?; + + let row_id_index = new_columns + .iter() + .position(|c| c.column_id() == ROW_ID_COLUMN_ID); + + if old_catalog.has_associated_source() || old_catalog.cdc_table_id.is_some() { + bail_not_implemented!("new replace table not supported for table with source"); + } + + let props = CreateTableProps { + definition: "".to_owned(), // TODO: no more definition! + append_only: old_catalog.append_only, + on_conflict: old_catalog.conflict_behavior.into(), + with_version_column: old_catalog + .version_column_index + .map(|i| old_catalog.columns()[i].name().to_owned()), + webhook_info: old_catalog.webhook_info.clone(), + engine: old_catalog.engine, + }; + + let info = CreateTableInfo { + columns: new_columns, + pk_column_ids: old_catalog.pk_column_ids(), + row_id_index, + watermark_descs: vec![], // TODO: this is not persisted in the catalog! + source_catalog: None, + version: col_id_gen.into_version(), + }; + + let (plan, table) = gen_table_plan_inner(context.into(), schema_name, name, info, props)?; + + let mut graph = build_graph(plan)?; + + // Fill the original table ID. + let table = Table { + id: old_catalog.id().table_id(), + ..table + }; + + // Calculate the mapping from the original columns to the new columns. + let col_index_mapping = ColIndexMapping::new( + old_catalog + .columns() + .iter() + .map(|old_c| { + table.columns.iter().position(|new_c| { + new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id() + }) + }) + .collect(), + table.columns.len(), + ); + + let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect(); + + let target_columns = table + .columns + .iter() + .map(|col| ColumnCatalog::from(col.clone())) + .filter(|col| !col.is_rw_timestamp_column()) + .collect_vec(); + + for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? { + hijack_merger_for_target_table( + &mut graph, + &target_columns, + &sink, + Some(&sink.unique_identity()), + )?; + } + + // Set some fields ourselves so that the meta service does not need to maintain them. + let mut table = table; + table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); + table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf(); + + Ok((None, table, graph, col_index_mapping, TableJobType::General)) +} + pub fn fetch_table_catalog_for_alter( session: &SessionImpl, table_name: &ObjectName, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 97d9b304d84b8..241ddddda8298 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -689,7 +689,7 @@ pub struct CreateTableProps { } #[allow(clippy::too_many_arguments)] -fn gen_table_plan_inner( +pub fn gen_table_plan_inner( context: OptimizerContextRef, schema_name: Option, table_name: String,