fix(frontend): check data type in column id generator (#19828)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored and BugenZhao committed Dec 23, 2024
1 parent 33ed977 commit a6bbdb4
Showing 9 changed files with 182 additions and 35 deletions.
44 changes: 43 additions & 1 deletion e2e_test/source_inline/kafka/avro/alter_table.slt
@@ -35,6 +35,47 @@ select * from t
----
1 ABC 2

# create a new version of schema that added a new optional nested field
system ok
sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"},{"name":"nested","type":["null",{"type":"record","name":"Nested","fields":[{"name":"baz","type":"int"}]}],"default":null}]}'

# Refresh table schema should succeed
statement ok
ALTER TABLE t REFRESH SCHEMA;

query ?
select * from t
----
1 ABC NULL 2

# Produce a new message with the new schema
system ok
echo '{"foo":"DEF", "bar":2, "nested":{"Nested":{"baz":2}}}' | rpk topic produce --schema-id=topic avro_alter_table_test

sleep 4s

query ? rowsort
select * from t
----
1 ABC NULL 2
2 DEF (2) 3

# create a new version of schema that added a new field to "nested"
system ok
sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"},{"name":"nested","type":["null",{"type":"record","name":"Nested","fields":[{"name":"baz","type":"int"},{"name":"qux","type":"string","default":""}]}],"default":null}]}'

# Refresh table schema should fail
statement error
ALTER TABLE t REFRESH SCHEMA;
----
db error: ERROR: Failed to run the query

Caused by:
Feature is not yet implemented: The data type of column "nested" has been changed from "struct<baz integer>" to "struct<baz integer, qux character varying>". This is currently not supported, even if it could be a compatible change in external systems.
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml



# create a new version of schema that removed field bar
system ok
sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}'
@@ -71,10 +112,11 @@ ALTER TABLE t DROP COLUMN gen_col;
statement ok
ALTER TABLE t REFRESH SCHEMA;

query ?
query ? rowsort
select * from t
----
ABC
DEF

statement ok
drop table t;
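
The rejected REFRESH above comes down to exact equality on the resolved RisingWave types: adding the optional "qux" field turns "struct<baz integer>" into "struct<baz integer, qux character varying>", and the new generator treats those as different columns. A minimal sketch of that comparison, reusing the type constructors that appear in the tests further down (illustrative only, not code from this commit):

use risingwave_common::types::{DataType, StructType};

fn main() {
    // Old and new resolved types for the "nested" column in the test above.
    let old_ty: DataType = StructType::new([("baz", DataType::Int32)]).into();
    let new_ty: DataType =
        StructType::new([("baz", DataType::Int32), ("qux", DataType::Varchar)]).into();

    // The column id generator requires exact equality, so this schema change is
    // rejected even though adding a field with a default is compatible in Avro.
    assert_ne!(old_ty, new_ty);
}
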
21 changes: 21 additions & 0 deletions src/common/src/catalog/column.rs
@@ -23,6 +23,7 @@ use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
};

use super::schema::FieldLike;
use super::{
iceberg_sequence_num_column_desc, row_id_column_desc, rw_timestamp_column_desc,
USER_COLUMN_ID_OFFSET,
@@ -523,6 +524,26 @@ impl ColumnCatalog {
}
}

impl FieldLike for ColumnDesc {
fn data_type(&self) -> &DataType {
&self.data_type
}

fn name(&self) -> &str {
&self.name
}
}

impl FieldLike for ColumnCatalog {
fn data_type(&self) -> &DataType {
&self.column_desc.data_type
}

fn name(&self) -> &str {
&self.column_desc.name
}
}

pub fn columns_extend(preserved_columns: &mut Vec<ColumnCatalog>, columns: Vec<ColumnCatalog>) {
debug_assert_eq!(ROW_ID_COLUMN_ID.get_id(), 0);
let mut max_incoming_column_id = ROW_ID_COLUMN_ID.get_id();
2 changes: 1 addition & 1 deletion src/common/src/catalog/mod.rs
@@ -33,7 +33,7 @@ use risingwave_pb::catalog::{
StreamJobStatus as PbStreamJobStatus,
};
use risingwave_pb::plan_common::ColumnDescVersion;
pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema};
pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, FieldLike, Schema};
use serde::{Deserialize, Serialize};

use crate::array::DataChunk;
17 changes: 17 additions & 0 deletions src/common/src/catalog/schema.rs
@@ -97,6 +97,23 @@ impl From<&PbColumnDesc> for Field {
}
}

/// Something that has a data type and a name.
#[auto_impl::auto_impl(&)]
pub trait FieldLike {
fn data_type(&self) -> &DataType;
fn name(&self) -> &str;
}

impl FieldLike for Field {
fn data_type(&self) -> &DataType {
&self.data_type
}

fn name(&self) -> &str {
&self.name
}
}

pub struct FieldDisplay<'a>(pub &'a Field);

impl std::fmt::Debug for FieldDisplay<'_> {
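
The #[auto_impl(&)] attribute also makes references to any implementor satisfy FieldLike, which is why the call sites below can pass "&*c" (a &ColumnCatalog) or a &Field. A small sketch of a generic consumer of this trait (the "describe" helper is an illustration, not part of the commit):

use risingwave_common::catalog::{Field, FieldLike};
use risingwave_common::types::DataType;

// Hypothetical helper: anything column-like exposes a name and a data type.
fn describe(field: impl FieldLike) -> String {
    format!("{}: {}", field.name(), field.data_type())
}

fn main() {
    let f = Field::with_name(DataType::Int32, "v1");
    // Works with an owned value or a reference, thanks to auto_impl(&).
    println!("{}", describe(&f)); // e.g. "v1: integer"
}
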
3 changes: 3 additions & 0 deletions src/frontend/src/handler/alter_source_with_sr.rs
@@ -81,6 +81,9 @@ fn encode_type_to_encode(from: EncodeType) -> Option<Encode> {
/// - Hidden columns and `INCLUDE ... AS ...` columns are ignored. Because it's only for the special handling of alter sr.
/// For the newly resolved `columns_from_resolve_source` (created by [`bind_columns_from_source`]), it doesn't contain hidden columns (`_row_id`) and `INCLUDE ... AS ...` columns.
/// This is fragile and we should really refactor it later.
/// - Column with the same name but different data type is considered as a different column, i.e., altering the data type of a column
/// will be treated as dropping the old column and adding a new column. Note that we don't reject here like we do in `ALTER TABLE REFRESH SCHEMA`,
/// because there's no data persistence (thus compatibility concern) in the source case.
fn columns_minus(columns_a: &[ColumnCatalog], columns_b: &[ColumnCatalog]) -> Vec<ColumnCatalog> {
columns_a
.iter()
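
The body of columns_minus is not shown in this hunk; per the doc comment above, the difference is taken on the (name, data type) pair, so a type change surfaces as dropping the old column and adding a new one. A sketch of those semantics, assuming ColumnCatalog's name() and data_type() accessors (illustrative only):

use risingwave_common::catalog::ColumnCatalog;

// Columns of `a` that have no counterpart in `b` with the same name AND data type.
fn columns_minus_sketch(
    columns_a: &[ColumnCatalog],
    columns_b: &[ColumnCatalog],
) -> Vec<ColumnCatalog> {
    columns_a
        .iter()
        .filter(|a| {
            !columns_b
                .iter()
                .any(|b| a.name() == b.name() && a.data_type() == b.data_type())
        })
        .cloned()
        .collect()
}
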
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
@@ -742,7 +742,7 @@ pub async fn bind_create_source_or_table_with_connector(
// XXX: why do we use col_id_gen here? It doesn't seem to be very necessary.
// XXX: should we also change the col id for struct fields?
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
c.column_desc.column_id = col_id_gen.generate(&*c)?;
}
debug_assert_column_ids_distinct(&columns);

119 changes: 92 additions & 27 deletions src/frontend/src/handler/create_table.rs
@@ -23,13 +23,14 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID, ROWID_PREFIX,
CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, FieldLike, TableId, TableVersionId,
DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, RISINGWAVE_ICEBERG_ROW_ID, ROWID_PREFIX,
};
use risingwave_common::config::MetaBackend;
use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
@@ -52,9 +53,9 @@ use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, CreateSink, CreateSinkStatement,
CreateSourceStatement, DataType, DataType as AstDataType, ExplainOptions, Format,
FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark,
Statement, TableConstraint, WebhookSourceInfo, WithProperties,
CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format, FormatEncodeOptions,
Ident, ObjectName, OnConflict, SecretRefAsType, SourceWatermark, Statement, TableConstraint,
WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;
@@ -85,13 +86,13 @@ use crate::{Binder, TableCatalog, WithOptions};
/// Column ID generator for a new table or a new version of an existing table to alter.
#[derive(Debug)]
pub struct ColumnIdGenerator {
/// Existing column names and their IDs.
/// Existing column names and their IDs and data types.
///
/// This is used for aligning column IDs between versions (`ALTER`s). If a column already
/// exists, its ID is reused. Otherwise, a new ID is generated.
/// exists and the data type matches, its ID is reused. Otherwise, a new ID is generated.
///
/// For a new table, this is empty.
pub existing: HashMap<String, ColumnId>,
pub existing: HashMap<String, (ColumnId, DataType)>,

/// The next column ID to generate, used for new columns that do not exist in `existing`.
pub next_column_id: ColumnId,
@@ -109,7 +110,12 @@ impl ColumnIdGenerator {
let existing = original
.columns()
.iter()
.map(|col| (col.name().to_owned(), col.column_id()))
.map(|col| {
(
col.name().to_owned(),
(col.column_id(), col.data_type().clone()),
)
})
.collect();

let version = original.version().expect("version field not set");
@@ -130,14 +136,31 @@
}
}

/// Generates a new [`ColumnId`] for a column with the given name.
pub fn generate(&mut self, name: &str) -> ColumnId {
if let Some(id) = self.existing.get(name) {
*id
/// Generates a new [`ColumnId`] for a column with the given field.
///
/// Returns an error if the data type of the column has been changed.
pub fn generate(&mut self, field: impl FieldLike) -> Result<ColumnId> {
if let Some((id, original_type)) = self.existing.get(field.name()) {
// Intentionally not using `datatype_equals` here because we want nested types to be
// exactly the same, **NOT** ignoring field names as they may be referenced in expressions
// of generated columns or downstream jobs.
// TODO: support compatible changes on types, typically for `STRUCT` types.
// https://github.com/risingwavelabs/risingwave/issues/19755
if original_type == field.data_type() {
Ok(*id)
} else {
bail_not_implemented!(
"The data type of column \"{}\" has been changed from \"{}\" to \"{}\". \
This is currently not supported, even if it could be a compatible change in external systems.",
field.name(),
original_type,
field.data_type()
);
}
} else {
let id = self.next_column_id;
self.next_column_id = self.next_column_id.next();
id
Ok(id)
}
}

@@ -564,7 +587,7 @@ pub(crate) fn gen_create_table_plan(
let definition = context.normalized_sql().to_owned();
let mut columns = bind_sql_columns(&column_defs)?;
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
c.column_desc.column_id = col_id_gen.generate(&*c)?;
}

let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
@@ -817,7 +840,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
)?;

for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
c.column_desc.column_id = col_id_gen.generate(&*c)?;
}

let (mut columns, pk_column_ids, _row_id_index) =
@@ -1901,7 +1924,8 @@ fn bind_webhook_info(
webhook_info: WebhookSourceInfo,
) -> Result<PbWebhookSourceInfo> {
// validate columns
if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &DataType::Jsonb {
if columns_defs.len() != 1 || columns_defs[0].data_type.as_ref().unwrap() != &AstDataType::Jsonb
{
return Err(ErrorCode::InvalidInputSyntax(
"Table with webhook source should have exactly one JSONB column".to_owned(),
)
@@ -1963,12 +1987,28 @@ mod tests {
use super::*;
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

struct BrandNewColumn(&'static str);
use BrandNewColumn as B;

impl FieldLike for BrandNewColumn {
fn name(&self) -> &str {
self.0
}

fn data_type(&self) -> &DataType {
unreachable!("for brand new columns, data type will not be accessed")
}
}

#[test]
fn test_col_id_gen() {
fn test_col_id_gen_initial() {
let mut gen = ColumnIdGenerator::new_initial();
assert_eq!(gen.generate("v1"), ColumnId::new(1));
assert_eq!(gen.generate("v2"), ColumnId::new(2));
assert_eq!(gen.generate(B("v1")).unwrap(), ColumnId::new(1));
assert_eq!(gen.generate(B("v2")).unwrap(), ColumnId::new(2));
}

#[test]
fn test_col_id_gen_alter() {
let mut gen = ColumnIdGenerator::new_alter(&TableCatalog {
columns: vec![
ColumnCatalog {
@@ -1985,16 +2025,41 @@
),
is_hidden: false,
},
ColumnCatalog {
column_desc: ColumnDesc::from_field_with_column_id(
&Field::with_name(
StructType::new([("f1", DataType::Int32)]).into(),
"nested",
),
3,
),
is_hidden: false,
},
],
version: Some(TableVersion::new_initial_for_test(ColumnId::new(2))),
version: Some(TableVersion::new_initial_for_test(ColumnId::new(3))),
..Default::default()
});

assert_eq!(gen.generate("v1"), ColumnId::new(3));
assert_eq!(gen.generate("v2"), ColumnId::new(4));
assert_eq!(gen.generate("f32"), ColumnId::new(1));
assert_eq!(gen.generate("f64"), ColumnId::new(2));
assert_eq!(gen.generate("v3"), ColumnId::new(5));
assert_eq!(gen.generate(B("v1")).unwrap(), ColumnId::new(4));
assert_eq!(gen.generate(B("v2")).unwrap(), ColumnId::new(5));
assert_eq!(
gen.generate(Field::new("f32", DataType::Float32)).unwrap(),
ColumnId::new(1)
);

// mismatched data type
gen.generate(Field::new("f64", DataType::Float32))
.unwrap_err();

// mismatched data type
// we require the nested data type to be exactly the same
gen.generate(Field::new(
"nested",
StructType::new([("f1", DataType::Int32), ("f2", DataType::Int64)]).into(),
))
.unwrap_err();

assert_eq!(gen.generate(B("v3")).unwrap(), ColumnId::new(6));
}

#[tokio::test]
@@ -2086,7 +2151,7 @@ mod tests {
let mut columns = bind_sql_columns(&column_defs)?;
let mut col_id_gen = ColumnIdGenerator::new_initial();
for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
c.column_desc.column_id = col_id_gen.generate(&*c).unwrap();
}

let pk_names =
7 changes: 3 additions & 4 deletions src/frontend/src/handler/create_table_as.rs
@@ -69,13 +69,12 @@ pub async fn handle_create_as(
.fields()
.iter()
.map(|field| {
let id = col_id_gen.generate(&field.name);
ColumnCatalog {
col_id_gen.generate(field).map(|id| ColumnCatalog {
column_desc: ColumnDesc::from_field_with_column_id(field, id.get_id()),
is_hidden: false,
}
})
})
.collect()
.try_collect()?
} else {
unreachable!()
}
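
Since generate() is now fallible, the closure yields Result<ColumnCatalog> and the final try_collect() (presumably Itertools::try_collect; the import is not shown in this hunk) short-circuits on the first error. The same pattern in isolation, with toy inputs:

use itertools::Itertools;

fn main() {
    // try_collect turns an iterator of Result<T, E> into Result<Vec<T>, E>,
    // stopping at the first Err, which is the shape used for column ids above.
    let ok: Result<Vec<i32>, _> = ["1", "2", "3"].iter().map(|s| s.parse::<i32>()).try_collect();
    assert_eq!(ok.unwrap(), vec![1, 2, 3]);

    let err: Result<Vec<i32>, _> = ["1", "oops"].iter().map(|s| s.parse::<i32>()).try_collect();
    assert!(err.is_err());
}
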
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
@@ -1145,7 +1145,7 @@ impl SessionImpl {

pub fn notice_to_user(&self, str: impl Into<String>) {
let notice = str.into();
tracing::trace!("notice to user:{}", notice);
tracing::trace!(notice, "notice to user");
self.notices.write().push(notice);
}

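
The session.rs hunk switches the notice from string interpolation to a structured tracing field: the value is recorded as a key-value pair on the event instead of being baked into the message text, so subscribers can filter or index on it. A minimal sketch of the two forms (assuming a tracing subscriber is installed elsewhere):

use tracing::trace;

fn notice_to_user(notice: &str) {
    // New style: `notice` becomes a structured key-value field on the event.
    trace!(notice, "notice to user");
    // Old style: the value is only part of the formatted message string.
    trace!("notice to user:{}", notice);
}
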
