From edadfa48851d33859d90796987adebf9612a5f11 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 17 Dec 2024 17:37:52 +0800 Subject: [PATCH 1/5] fix(frontend): check data type in column id generator Signed-off-by: Bugen Zhao --- src/common/src/catalog/column.rs | 21 ++++ src/common/src/catalog/mod.rs | 2 +- src/common/src/catalog/schema.rs | 16 +++ src/frontend/src/handler/create_source.rs | 2 +- src/frontend/src/handler/create_table.rs | 120 +++++++++++++++----- src/frontend/src/handler/create_table_as.rs | 2 +- src/frontend/src/session.rs | 2 +- 7 files changed, 133 insertions(+), 32 deletions(-) diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 4fd564b5201ff..3f5b532ccfda0 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -23,6 +23,7 @@ use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc, }; +use super::schema::FieldLike; use super::{ iceberg_sequence_num_column_desc, row_id_column_desc, rw_timestamp_column_desc, USER_COLUMN_ID_OFFSET, @@ -523,6 +524,26 @@ impl ColumnCatalog { } } +impl FieldLike for ColumnDesc { + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn name(&self) -> &str { + &self.name + } +} + +impl FieldLike for ColumnCatalog { + fn data_type(&self) -> &DataType { + &self.column_desc.data_type + } + + fn name(&self) -> &str { + &self.column_desc.name + } +} + pub fn columns_extend(preserved_columns: &mut Vec, columns: Vec) { debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0); let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id(); diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 240be79d51a44..d76f14b38c8c1 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -33,7 +33,7 @@ use risingwave_pb::catalog::{ StreamJobStatus as PbStreamJobStatus, }; use risingwave_pb::plan_common::ColumnDescVersion; -pub use schema::{test_utils as 
schema_test_utils, Field, FieldDisplay, Schema}; +pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, FieldLike, Schema}; use serde::{Deserialize, Serialize}; use crate::array::DataChunk; diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index 0b49bf4da8bf0..979f4b775680f 100644 --- a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -97,6 +97,22 @@ impl From<&PbColumnDesc> for Field { } } +#[auto_impl::auto_impl(&, &mut)] +pub trait FieldLike { + fn data_type(&self) -> &DataType; + fn name(&self) -> &str; +} + +impl FieldLike for Field { + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn name(&self) -> &str { + &self.name + } +} + pub struct FieldDisplay<'a>(pub &'a Field); impl std::fmt::Debug for FieldDisplay<'_> { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 91abf9acf6c99..5ee7ddd5352a1 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -742,7 +742,7 @@ pub async fn bind_create_source_or_table_with_connector( // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary. // XXX: should we also chenge the col id for struct fields? 
for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(c.name()) + c.column_desc.column_id = col_id_gen.generate(&*c) } debug_assert_column_ids_distinct(&columns); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1fa960da971d9..9815a5ba345b8 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -23,13 +23,14 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ - CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, - INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID, ROWID_PREFIX, + CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, FieldLike, TableId, TableVersionId, + DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID, ROWID_PREFIX, }; use risingwave_common::config::MetaBackend; use risingwave_common::license::Feature; use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::system_param::reader::SystemParamsRead; +use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; @@ -52,9 +53,9 @@ use risingwave_pb::secret::PbSecretRef; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, CreateSink, CreateSinkStatement, - CreateSourceStatement, DataType, DataType as AstDataType, ExplainOptions, Format, - FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark, - Statement, TableConstraint, WebhookSourceInfo, WithProperties, + CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format, FormatEncodeOptions, + Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark, 
Statement, TableConstraint, + WebhookSourceInfo, WithProperties, }; use risingwave_sqlparser::parser::{IncludeOption, Parser}; use thiserror_ext::AsReport; @@ -77,6 +78,7 @@ use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; +use crate::session::current::notice_to_user; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::OverwriteOptions; @@ -91,7 +93,7 @@ pub struct ColumnIdGenerator { /// exists, its ID is reused. Otherwise, a new ID is generated. /// /// For a new table, this is empty. - pub existing: HashMap<String, ColumnId>, + pub existing: HashMap<String, (ColumnId, DataType)>, /// The next column ID to generate, used for new columns that do not exist in `existing`. pub next_column_id: ColumnId, @@ -109,7 +111,12 @@ impl ColumnIdGenerator { let existing = original .columns() .iter() - .map(|col| (col.name().to_owned(), col.column_id())) + .map(|col| { + ( + col.name().to_owned(), + (col.column_id(), col.data_type().clone()), + ) + }) .collect(); let version = original.version().expect("version field not set"); @@ -130,15 +137,29 @@ impl ColumnIdGenerator { } } - /// Generates a new [`ColumnId`] for a column with the given name. - pub fn generate(&mut self, name: &str) -> ColumnId { - if let Some(id) = self.existing.get(name) { - *id - } else { - let id = self.next_column_id; - self.next_column_id = self.next_column_id.next(); - id + /// Generates a new [`ColumnId`] for a column with the given field. 
+ pub fn generate(&mut self, field: impl FieldLike) -> ColumnId { + if let Some((id, original_type)) = self.existing.get(field.name()) { + // Intentionally not using `datatype_equals` here because we want nested types to be + // exactly the same, **NOT** ignoring field names as they may be referenced in expressions + // of generated columns or downstream jobs. + if original_type == field.data_type() { + return *id; + } else { + notice_to_user(format!( + "The data type of column \"{}\" has been changed from {} to {}. \ + This is currently not supported, even if it could be a compatible change in external systems. \ + The original column will be dropped and a new column will be created.", + field.name(), + original_type, + field.data_type() + )); + } } + + let id = self.next_column_id; + self.next_column_id = self.next_column_id.next(); + id } /// Consume this generator and return a [`TableVersion`] for the table to be created or altered. @@ -564,7 +585,7 @@ pub(crate) fn gen_create_table_plan( let definition = context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(c.name()) + c.column_desc.column_id = col_id_gen.generate(&*c) } let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); @@ -817,7 +838,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( )?; for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(c.name()) + c.column_desc.column_id = col_id_gen.generate(&*c) } let (mut columns, pk_column_ids, _row_id_index) = @@ -1901,7 +1922,8 @@ fn bind_webhook_info( webhook_info: WebhookSourceInfo, ) -> Result { // validate columns - if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &DataType::Jsonb { + if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb + { return Err(ErrorCode::InvalidInputSyntax( "Table with webhook source should have exactly 
one JSONB column".to_owned(), ) @@ -1963,12 +1985,28 @@ mod tests { use super::*; use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA}; + struct BrandNewColumn(&'static str); + use BrandNewColumn as B; + + impl FieldLike for BrandNewColumn { + fn name(&self) -> &str { + self.0 + } + + fn data_type(&self) -> &DataType { + unreachable!("for brand new columns, data type will not be accessed") + } + } + #[test] - fn test_col_id_gen() { + fn test_col_id_gen_initial() { let mut gen = ColumnIdGenerator::new_initial(); - assert_eq!(gen.generate("v1"), ColumnId::new(1)); - assert_eq!(gen.generate("v2"), ColumnId::new(2)); + assert_eq!(gen.generate(B("v1")), ColumnId::new(1)); + assert_eq!(gen.generate(B("v2")), ColumnId::new(2)); + } + #[test] + fn test_col_id_gen_alter() { let mut gen = ColumnIdGenerator::new_alter(&TableCatalog { columns: vec![ ColumnCatalog { @@ -1985,16 +2023,42 @@ mod tests { ), is_hidden: false, }, + ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id( + &Field::with_name( + StructType::new([("f1", DataType::Int32)]).into(), + "nested", + ), + 3, + ), + is_hidden: false, + }, ], - version: Some(TableVersion::new_initial_for_test(ColumnId::new(2))), + version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))), ..Default::default() }); - assert_eq!(gen.generate("v1"), ColumnId::new(3)); - assert_eq!(gen.generate("v2"), ColumnId::new(4)); - assert_eq!(gen.generate("f32"), ColumnId::new(1)); - assert_eq!(gen.generate("f64"), ColumnId::new(2)); - assert_eq!(gen.generate("v3"), ColumnId::new(5)); + assert_eq!(gen.generate(B("v1")), ColumnId::new(4)); + assert_eq!(gen.generate(B("v2")), ColumnId::new(5)); + assert_eq!( + gen.generate(Field::new("f32", DataType::Float32)), + ColumnId::new(1) + ); + assert_eq!( + // mismatched data type, will generate a new column id + gen.generate(Field::new("f64", DataType::Float32)), + ColumnId::new(6) + ); + assert_eq!( + // mismatched data type, will generate a new column id 
+ // we require the nested data type to be exactly the same + gen.generate(Field::new( + "nested", + StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into() + )), + ColumnId::new(7) + ); + assert_eq!(gen.generate(B("v3")), ColumnId::new(8)); } #[tokio::test] @@ -2086,7 +2150,7 @@ mod tests { let mut columns = bind_sql_columns(&column_defs)?; let mut col_id_gen = ColumnIdGenerator::new_initial(); for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(c.name()) + c.column_desc.column_id = col_id_gen.generate(&*c) } let pk_names = diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 2e24d4a516f27..76b0c720b6ae2 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -69,7 +69,7 @@ pub async fn handle_create_as( .fields() .iter() .map(|field| { - let id = col_id_gen.generate(&field.name); + let id = col_id_gen.generate(field); ColumnCatalog { column_desc: ColumnDesc::from_field_with_column_id(field, id.get_id()), is_hidden: false, diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 36f6a5dc12e17..51dd414e950a5 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1145,7 +1145,7 @@ impl SessionImpl { pub fn notice_to_user(&self, str: impl Into) { let notice = str.into(); - tracing::trace!("notice to user:{}", notice); + tracing::trace!(notice, "notice to user"); self.notices.write().push(notice); } From f1474755d7daa9f3dd0f537ce0b2402e80245aa4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 17 Dec 2024 17:47:28 +0800 Subject: [PATCH 2/5] add docs Signed-off-by: Bugen Zhao --- src/common/src/catalog/schema.rs | 3 ++- src/frontend/src/handler/create_table.rs | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/common/src/catalog/schema.rs b/src/common/src/catalog/schema.rs index 979f4b775680f..7c7c12fb031a8 100644 --- 
a/src/common/src/catalog/schema.rs +++ b/src/common/src/catalog/schema.rs @@ -97,7 +97,8 @@ impl From<&PbColumnDesc> for Field { } } -#[auto_impl::auto_impl(&, &mut)] +/// Something that has a data type and a name. +#[auto_impl::auto_impl(&)] pub trait FieldLike { fn data_type(&self) -> &DataType; fn name(&self) -> &str; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 9815a5ba345b8..92dd42a7c8529 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -87,10 +87,10 @@ use crate::{Binder, TableCatalog, WithOptions}; /// Column ID generator for a new table or a new version of an existing table to alter. #[derive(Debug)] pub struct ColumnIdGenerator { - /// Existing column names and their IDs. + /// Existing column names and their IDs and data types. /// /// This is used for aligning column IDs between versions (`ALTER`s). If a column already - /// exists, its ID is reused. Otherwise, a new ID is generated. + /// exists and the data type matches, its ID is reused. Otherwise, a new ID is generated. /// /// For a new table, this is empty. pub existing: HashMap, @@ -143,6 +143,8 @@ impl ColumnIdGenerator { // Intentionally not using `datatype_equals` here because we want nested types to be // exactly the same, **NOT** ignoring field names as they may be referenced in expressions // of generated columns or downstream jobs. + // TODO: support compatible changes on types, typically for `STRUCT` types. 
+ // https://github.com/risingwavelabs/risingwave/issues/19755 if original_type == field.data_type() { return *id; } else { From 86ec910a61664a8584cbc92748301bdd7b86fd8f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 18 Dec 2024 13:56:06 +0800 Subject: [PATCH 3/5] throw error if data type mismatches Signed-off-by: Bugen Zhao --- src/frontend/src/handler/create_source.rs | 2 +- src/frontend/src/handler/create_table.rs | 69 ++++++++++----------- src/frontend/src/handler/create_table_as.rs | 7 +-- 3 files changed, 38 insertions(+), 40 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 5ee7ddd5352a1..6b89712471591 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -742,7 +742,7 @@ pub async fn bind_create_source_or_table_with_connector( // XXX: why do we use col_id_gen here? It doesn't seem to be very necessary. // XXX: should we also chenge the col id for struct fields? for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(&*c) + c.column_desc.column_id = col_id_gen.generate(&*c)?; } debug_assert_column_ids_distinct(&columns); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 92dd42a7c8529..d7c582fadbe51 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -78,7 +78,6 @@ use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; -use crate::session::current::notice_to_user; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::OverwriteOptions; @@ -138,7 +137,9 @@ impl ColumnIdGenerator { } /// Generates a new [`ColumnId`] for a column with the given field. 
- pub fn generate(&mut self, field: impl FieldLike) -> ColumnId { + /// + /// Returns an error if the data type of the column has been changed. + pub fn generate(&mut self, field: impl FieldLike) -> Result<ColumnId> { if let Some((id, original_type)) = self.existing.get(field.name()) { // Intentionally not using `datatype_equals` here because we want nested types to be // exactly the same, **NOT** ignoring field names as they may be referenced in expressions @@ -146,22 +147,21 @@ impl ColumnIdGenerator { // TODO: support compatible changes on types, typically for `STRUCT` types. // https://github.com/risingwavelabs/risingwave/issues/19755 if original_type == field.data_type() { - return *id; + Ok(*id) } else { - notice_to_user(format!( - "The data type of column \"{}\" has been changed from {} to {}. \ - This is currently not supported, even if it could be a compatible change in external systems. \ - The original column will be dropped and a new column will be created.", + bail_not_implemented!( + "The data type of column \"{}\" has been changed from \"{}\" to \"{}\". \ + This is currently not supported, even if it could be a compatible change in external systems.", field.name(), original_type, field.data_type() - )); + ); } + } else { + let id = self.next_column_id; + self.next_column_id = self.next_column_id.next(); + Ok(id) } - - let id = self.next_column_id; - self.next_column_id = self.next_column_id.next(); - id } /// Consume this generator and return a [`TableVersion`] for the table to be created or altered. 
@@ -587,7 +587,7 @@ pub(crate) fn gen_create_table_plan( let definition = context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(&*c) + c.column_desc.column_id = col_id_gen.generate(&*c)?; } let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); @@ -840,7 +840,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( )?; for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(&*c) + c.column_desc.column_id = col_id_gen.generate(&*c)?; } let (mut columns, pk_column_ids, _row_id_index) = @@ -2003,8 +2003,8 @@ mod tests { #[test] fn test_col_id_gen_initial() { let mut gen = ColumnIdGenerator::new_initial(); - assert_eq!(gen.generate(B("v1")), ColumnId::new(1)); - assert_eq!(gen.generate(B("v2")), ColumnId::new(2)); + assert_eq!(gen.generate(B("v1")).unwrap(), ColumnId::new(1)); + assert_eq!(gen.generate(B("v2")).unwrap(), ColumnId::new(2)); } #[test] @@ -2040,27 +2040,26 @@ mod tests { ..Default::default() }); - assert_eq!(gen.generate(B("v1")), ColumnId::new(4)); - assert_eq!(gen.generate(B("v2")), ColumnId::new(5)); + assert_eq!(gen.generate(B("v1")).unwrap(), ColumnId::new(4)); + assert_eq!(gen.generate(B("v2")).unwrap(), ColumnId::new(5)); assert_eq!( - gen.generate(Field::new("f32", DataType::Float32)), + gen.generate(Field::new("f32", DataType::Float32)).unwrap(), ColumnId::new(1) ); - assert_eq!( - // mismatched data type, will generate a new column id - gen.generate(Field::new("f64", DataType::Float32)), - ColumnId::new(6) - ); - assert_eq!( - // mismatched data type, will generate a new column id - // we require the nested data type to be exactly the same - gen.generate(Field::new( - "nested", - StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into() - )), - ColumnId::new(7) - ); - assert_eq!(gen.generate(B("v3")), ColumnId::new(8)); + + // mismatched data type + gen.generate(Field::new("f64", 
DataType::Float32)) + .unwrap_err(); + + // mismatched data type + // we require the nested data type to be exactly the same + gen.generate(Field::new( + "nested", + StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(), + )) + .unwrap_err(); + + assert_eq!(gen.generate(B("v3")).unwrap(), ColumnId::new(6)); } #[tokio::test] @@ -2152,7 +2151,7 @@ mod tests { let mut columns = bind_sql_columns(&column_defs)?; let mut col_id_gen = ColumnIdGenerator::new_initial(); for c in &mut columns { - c.column_desc.column_id = col_id_gen.generate(&*c) + c.column_desc.column_id = col_id_gen.generate(&*c).unwrap(); } let pk_names = diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 76b0c720b6ae2..d90ad6afe9a73 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -69,13 +69,12 @@ pub async fn handle_create_as( .fields() .iter() .map(|field| { - let id = col_id_gen.generate(field); - ColumnCatalog { + col_id_gen.generate(field).map(|id| ColumnCatalog { column_desc: ColumnDesc::from_field_with_column_id(field, id.get_id()), is_hidden: false, - } + }) }) - .collect() + .try_collect()? 
} else { unreachable!() } From 7ee4bded24a7962df3661d34d17a27ee49444308 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 18 Dec 2024 13:56:21 +0800 Subject: [PATCH 4/5] add tests Signed-off-by: Bugen Zhao --- .../source_inline/kafka/avro/alter_table.slt | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/avro/alter_table.slt b/e2e_test/source_inline/kafka/avro/alter_table.slt index 08a98c2cca4c9..00f050c82e954 100644 --- a/e2e_test/source_inline/kafka/avro/alter_table.slt +++ b/e2e_test/source_inline/kafka/avro/alter_table.slt @@ -35,6 +35,47 @@ select * from t ---- 1 ABC 2 +# create a new version of schema that added a new optional nested field +system ok +sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"},{"name":"nested","type":["null",{"type":"record","name":"Nested","fields":[{"name":"baz","type":"int"}]}],"default":null}]}' + +# Refresh table schema should succeed +statement ok +ALTER TABLE t REFRESH SCHEMA; + +query ? +select * from t +---- +1 ABC NULL 2 + +# Produce a new message with the new schema +system ok +echo '{"foo":"DEF", "bar":2, "nested":{"Nested":{"baz":2}}}' | rpk topic produce --schema-id=topic avro_alter_table_test + +sleep 4s + +query ? 
rowsort +select * from t +---- +1 ABC NULL 2 +2 DEF (2) 3 + +# create a new version of schema that added a new field to "nested" +system ok +sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"},{"name":"nested","type":["null",{"type":"record","name":"Nested","fields":[{"name":"baz","type":"int"},{"name":"qux","type":"string","default":""}]}],"default":null}]}' + +# Refresh table schema should fail +statement error +ALTER TABLE t REFRESH SCHEMA; +---- +db error: ERROR: Failed to run the query + +Caused by: + Feature is not yet implemented: The data type of column "nested" has been changed from "struct<baz integer>" to "struct<baz integer, qux character varying>". This is currently not supported, even if it could be a compatible change in external systems. +No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + + + # create a new version of schema that removed field bar system ok sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' @@ -71,10 +112,11 @@ ALTER TABLE t DROP COLUMN gen_col; statement ok ALTER TABLE t REFRESH SCHEMA; -query ? +query ? 
rowsort select * from t ---- ABC +DEF statement ok drop table t; From b0bc75722be973815d1a9244bf2d4af286077071 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 18 Dec 2024 14:29:53 +0800 Subject: [PATCH 5/5] add docs for alter source Signed-off-by: Bugen Zhao --- src/frontend/src/handler/alter_source_with_sr.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 196d4a7eaf39e..bcc83959d096d 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -81,6 +81,9 @@ fn encode_type_to_encode(from: EncodeType) -> Option { /// - Hidden columns and `INCLUDE ... AS ...` columns are ignored. Because it's only for the special handling of alter sr. /// For the newly resolved `columns_from_resolve_source` (created by [`bind_columns_from_source`]), it doesn't contain hidden columns (`_row_id`) and `INCLUDE ... AS ...` columns. /// This is fragile and we should really refactor it later. +/// - A column with the same name but a different data type is considered a different column, i.e., altering the data type of a column +/// will be treated as dropping the old column and adding a new column. Note that we don't reject here like we do in `ALTER TABLE REFRESH SCHEMA`, +/// because there's no data persistence (and thus no compatibility concern) in the source case. fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec { columns_a .iter()