
Commit 30ebff0

stash!!
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Dec 24, 2024
1 parent 860c773 commit 30ebff0
Showing 3 changed files with 194 additions and 55 deletions.
4 changes: 4 additions & 0 deletions src/common/src/catalog/column.rs
@@ -413,6 +413,10 @@ impl ColumnCatalog {
         !self.is_generated() && !self.is_rw_timestamp_column()
     }
 
+    pub fn can_drop(&self) -> bool {
+        !(self.is_hidden() || self.is_rw_sys_column())
+    }
+
     /// If the column is a generated column
     pub fn generated_expr(&self) -> Option<&ExprNode> {
         if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
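(Aside: a minimal, self-contained sketch of the droppability rule added above. The `Column` struct and its fields are illustrative stand-ins, not RisingWave's actual `ColumnCatalog` API.)

```rust
// Hypothetical simplified stand-in for ColumnCatalog, for illustration only.
struct Column {
    name: &'static str,
    hidden: bool,
    rw_sys: bool, // e.g. the `_rw_timestamp` system column
}

impl Column {
    // Mirrors the rule above: hidden columns and RisingWave system
    // columns are not user-droppable.
    fn can_drop(&self) -> bool {
        !(self.hidden || self.rw_sys)
    }
}

fn main() {
    let cols = [
        Column { name: "v1", hidden: false, rw_sys: false },
        Column { name: "_row_id", hidden: true, rw_sys: false },
        Column { name: "_rw_timestamp", hidden: true, rw_sys: true },
    ];
    for c in &cols {
        println!("{}: can_drop = {}", c.name, c.can_drop());
    }
    // v1: true, _row_id: false, _rw_timestamp: false
}
```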
243 changes: 189 additions & 54 deletions src/frontend/src/handler/alter_table_column.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context};
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
-use risingwave_common::catalog::{ColumnCatalog, Engine};
+use risingwave_common::catalog::{ColumnCatalog, Engine, ROW_ID_COLUMN_ID};
 use risingwave_common::hash::VnodeCount;
 use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
@@ -29,13 +29,18 @@ use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
 use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
 use risingwave_sqlparser::ast::{
-    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Ident, ObjectName,
-    Statement, StructField, TableConstraint,
+    AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, ExplainOptions, Ident,
+    ObjectName, Statement, StructField, TableConstraint,
 };
 use risingwave_sqlparser::parser::Parser;
 
 use super::create_source::schema_has_schema_registry;
-use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
+use super::create_table::{
+    bind_pk_and_row_id_on_relation, bind_sql_columns,
+    bind_sql_columns_generated_and_default_constraints, gen_create_table_plan,
+    gen_create_table_plan_without_source, gen_table_plan_inner,
+    generate_stream_graph_for_replace_table, ColumnIdGenerator, CreateTableInfo, CreateTableProps,
+};
 use super::util::SourceSchemaCompatExt;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
@@ -45,7 +50,7 @@ use crate::expr::{Expr, ExprImpl, InputRef, Literal};
 use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
 use crate::handler::create_table::bind_table_constraints;
 use crate::session::SessionImpl;
-use crate::{Binder, TableCatalog};
+use crate::{build_graph, Binder, OptimizerContext, TableCatalog};
 
 /// Used in auto schema change process
 pub async fn get_new_table_definition_for_cdc_table(
Expand Down Expand Up @@ -328,42 +333,43 @@ pub async fn handle_alter_table_column(
}

// Retrieve the original table definition and parse it to AST.
let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let Statement::CreateTable {
columns,
format_encode,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());

let fail_if_has_schema_registry = || {
if let Some(format_encode) = &format_encode
&& schema_has_schema_registry(format_encode)
{
Err(ErrorCode::NotSupported(
"alter table with schema registry".to_owned(),
"try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
))
} else {
Ok(())
}
};

if columns.is_empty() {
Err(ErrorCode::NotSupported(
"alter a table with empty column definitions".to_owned(),
"Please recreate the table with column definitions.".to_owned(),
))?
}
// let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
// .context("unable to parse original table definition")?
// .try_into()
// .unwrap();

// let Statement::CreateTable {
// columns: _,
// format_encode,
// ..
// } = &mut definition
// else {
// panic!("unexpected statement: {:?}", definition);
// };

// let format_encode = format_encode
// .clone()
// .map(|format_encode| format_encode.into_v2_with_warning());

// let fail_if_has_schema_registry = || {
// if let Some(format_encode) = &format_encode
// && schema_has_schema_registry(format_encode)
// {
// Err(ErrorCode::NotSupported(
// "alter table with schema registry".to_owned(),
// "try `ALTER TABLE .. FORMAT .. ENCODE .. (...)` instead".to_owned(),
// ))
// } else {
// Ok(())
// }
// };

// if columns.is_empty() {
// Err(ErrorCode::NotSupported(
// "alter a table with empty column definitions".to_owned(),
// "Please recreate the table with column definitions.".to_owned(),
// ))?
// }

if !original_catalog.incoming_sinks.is_empty()
&& matches!(operation, AlterTableOperation::DropColumn { .. })
@@ -373,19 +379,19 @@
         ))?;
     }
 
+    let mut column_catalogs = original_catalog.columns().to_vec();
+    let pk_column_ids = original_catalog.pk_column_ids();
+
     match operation {
         AlterTableOperation::AddColumn {
             column_def: new_column,
         } => {
-            fail_if_has_schema_registry()?;
+            // fail_if_has_schema_registry()?;
 
             // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
             // better error reporting.
             let new_column_name = new_column.name.real_value();
-            if columns
-                .iter()
-                .any(|c| c.name.real_value() == new_column_name)
-            {
+            if column_catalogs.iter().any(|c| c.name() == new_column_name) {
                 Err(ErrorCode::InvalidInputSyntax(format!(
                     "column \"{new_column_name}\" of table \"{table_name}\" already exists"
                 )))?
@@ -401,8 +407,18 @@
                 ))?
             }
 
-            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
-            columns.push(new_column);
+            let new_column = vec![new_column];
+
+            let mut new_column_catalog = bind_sql_columns(&new_column)?;
+
+            bind_sql_columns_generated_and_default_constraints(
+                &session,
+                table_name.real_value(),
+                &mut new_column_catalog, // no ref to existing columns
+                new_column,
+            )?;
+
+            column_catalogs.extend(new_column_catalog);
         }
 
         AlterTableOperation::DropColumn {
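(For intuition, a toy version of the add-column flow just shown, with plain strings standing in for bound column catalogs; `add_column` is an illustrative helper, not the frontend's real API.)

```rust
// Toy add-column flow: reject duplicate names, then append.
fn add_column(columns: &mut Vec<String>, new: &str) -> Result<(), String> {
    if columns.iter().any(|c| c == new) {
        return Err(format!("column \"{new}\" already exists"));
    }
    columns.push(new.to_owned());
    Ok(())
}

fn main() {
    let mut cols = vec!["v1".to_owned(), "v2".to_owned()];
    assert!(add_column(&mut cols, "v3").is_ok());
    assert!(add_column(&mut cols, "v1").is_err());
    println!("{cols:?}"); // ["v1", "v2", "v3"]
}
```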
@@ -417,7 +433,7 @@
             // Check if the column to drop is referenced by any generated columns.
             for column in original_catalog.columns() {
                 if column_name.real_value() == column.name() && !column.is_generated() {
-                    fail_if_has_schema_registry()?;
+                    // fail_if_has_schema_registry()?;
                 }
 
                 if let Some(expr) = column.generated_expr() {
@@ -438,13 +454,16 @@
 
             // Locate the column by name and remove it.
             let column_name = column_name.real_value();
-            let removed_column = columns
-                .extract_if(|c| c.name.real_value() == column_name)
+            let removed_column = column_catalogs
+                .extract_if(|c| c.name() == column_name)
                 .at_most_one()
                 .ok()
                 .unwrap();
 
-            if removed_column.is_some() {
+            if let Some(removed_column) = removed_column {
+                if !removed_column.can_drop() {
+                    bail!("cannot drop");
+                }
                 // PASS
             } else if if_exists {
                 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
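(The lookup above leans on `Vec::extract_if` plus itertools' `at_most_one`. A standalone sketch of the same pattern; note that on stable Rust, `extract_if` takes a range argument, unlike the nightly-style call in the diff.)

```rust
use itertools::Itertools;

fn main() {
    let mut columns = vec!["v1", "v2", "v3"];
    // extract_if lazily removes every element matching the predicate;
    // at_most_one yields Ok(Some(_)) / Ok(None), or Err if more than
    // one element matched.
    let removed = columns
        .extract_if(.., |c| *c == "v2")
        .at_most_one()
        .ok()
        .unwrap();
    assert_eq!(removed, Some("v2"));
    assert_eq!(columns, ["v1", "v3"]);
}
```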
@@ -464,8 +483,18 @@
         _ => unreachable!(),
     };
 
-    let (source, table, graph, col_index_mapping, job_type) =
-        get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;
+    column_catalogs.retain(|c| !c.is_rw_timestamp_column());
+
+    // TODO: Check pk not changed.
+
+    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan_2(
+        &session,
+        table_name,
+        column_catalogs,
+        &original_catalog,
+        None,
+    )
+    .await?;
 
     let catalog_writer = session.catalog_writer()?;
 
@@ -475,6 +504,112 @@
     Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
 }
 
+pub async fn get_replace_table_plan_2(
+    session: &Arc<SessionImpl>,
+    table_name: ObjectName,
+    mut new_columns: Vec<ColumnCatalog>,
+    old_catalog: &Arc<TableCatalog>,
+    new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
+) -> Result<(
+    Option<Source>,
+    Table,
+    StreamFragmentGraph,
+    ColIndexMapping,
+    TableJobType,
+)> {
+    // Create handler args as if we're creating a new table with the altered definition.
+    let handler_args = HandlerArgs::new(session.clone(), &Statement::Abort, Arc::from(""))?;
+    let mut col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
+
+    for new_column in &mut new_columns {
+        new_column.column_desc.column_id = col_id_gen.generate(&*new_column)?;
+    }
+
+    let context = OptimizerContext::new(handler_args, ExplainOptions::default());
+
+    let db_name = session.database();
+    let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
+    let (database_id, schema_id) =
+        session.get_database_and_schema_id_for_create(schema_name.clone())?;
+
+    let row_id_index = new_columns
+        .iter()
+        .position(|c| c.column_id() == ROW_ID_COLUMN_ID);
+
+    if old_catalog.has_associated_source() || old_catalog.cdc_table_id.is_some() {
+        bail_not_implemented!("new replace table not supported for table with source");
+    }
+
+    let props = CreateTableProps {
+        definition: "".to_owned(), // TODO: no more definition!
+        append_only: old_catalog.append_only,
+        on_conflict: old_catalog.conflict_behavior.into(),
+        with_version_column: old_catalog
+            .version_column_index
+            .map(|i| old_catalog.columns()[i].name().to_owned()),
+        webhook_info: old_catalog.webhook_info.clone(),
+        engine: old_catalog.engine,
+    };
+
+    let info = CreateTableInfo {
+        columns: new_columns,
+        pk_column_ids: old_catalog.pk_column_ids(),
+        row_id_index,
+        watermark_descs: vec![], // TODO: this is not persisted in the catalog!
+        source_catalog: None,
+        version: col_id_gen.into_version(),
+    };
+
+    let (plan, table) = gen_table_plan_inner(context.into(), schema_name, name, info, props)?;
+
+    let mut graph = build_graph(plan)?;
+
+    // Fill the original table ID.
+    let table = Table {
+        id: old_catalog.id().table_id(),
+        ..table
+    };
+
+    // Calculate the mapping from the original columns to the new columns.
+    let col_index_mapping = ColIndexMapping::new(
+        old_catalog
+            .columns()
+            .iter()
+            .map(|old_c| {
+                table.columns.iter().position(|new_c| {
+                    new_c.get_column_desc().unwrap().column_id == old_c.column_id().get_id()
+                })
+            })
+            .collect(),
+        table.columns.len(),
+    );
+
+    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
+
+    let target_columns = table
+        .columns
+        .iter()
+        .map(|col| ColumnCatalog::from(col.clone()))
+        .filter(|col| !col.is_rw_timestamp_column())
+        .collect_vec();
+
+    for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? {
+        hijack_merger_for_target_table(
+            &mut graph,
+            &target_columns,
+            &sink,
+            Some(&sink.unique_identity()),
+        )?;
+    }
+
+    // Set some fields ourselves so that the meta service does not need to maintain them.
+    let mut table = table;
+    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
+    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
+
+    Ok((None, table, graph, col_index_mapping, TableJobType::General))
+}
+
 pub fn fetch_table_catalog_for_alter(
     session: &SessionImpl,
     table_name: &ObjectName,
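(The `ColIndexMapping` built in `get_replace_table_plan_2` pairs each old column with its new position by column ID; dropped columns map to nothing. A self-contained sketch of that idea — this mimics the construction conceptually and is not RisingWave's actual `ColIndexMapping` type.)

```rust
// For each old column ID, find its position in the new column list;
// a dropped column yields None.
fn build_mapping(old_ids: &[i32], new_ids: &[i32]) -> Vec<Option<usize>> {
    old_ids
        .iter()
        .map(|old| new_ids.iter().position(|new| new == old))
        .collect()
}

fn main() {
    // Old table has column IDs [1, 2, 3]; the new one drops 2 and adds 4.
    let mapping = build_mapping(&[1, 2, 3], &[1, 3, 4]);
    assert_eq!(mapping, vec![Some(0), None, Some(1)]);
}
```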
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table.rs
@@ -689,7 +689,7 @@ pub struct CreateTableProps {
 }
 
 #[allow(clippy::too_many_arguments)]
-fn gen_table_plan_inner(
+pub fn gen_table_plan_inner(
     context: OptimizerContextRef,
     schema_name: Option<String>,
     table_name: String,
