From 2f4008ceff378c88312a6d93fb2f28495a2ee043 Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 25 Dec 2024 22:47:22 +0800 Subject: [PATCH] fix(iceberg): fix hidden row id for iceberg engine table (#19901) --- e2e_test/iceberg/test_case/iceberg_engine.slt | 23 +++++++ src/frontend/src/binder/mod.rs | 1 + .../src/binder/relation/table_or_source.rs | 65 ++----------------- 3 files changed, 31 insertions(+), 58 deletions(-) diff --git a/e2e_test/iceberg/test_case/iceberg_engine.slt b/e2e_test/iceberg/test_case/iceberg_engine.slt index a8b5fd21c598e..c3e8fe7b7016b 100644 --- a/e2e_test/iceberg/test_case/iceberg_engine.slt +++ b/e2e_test/iceberg/test_case/iceberg_engine.slt @@ -89,3 +89,26 @@ with ( statement ok DROP TABLE nexmark_t + +# test hidden _row_id column for iceberg engine table +statement ok +create table t_without_pk(name varchar) with(commit_checkpoint_interval = 1) engine = iceberg; + +statement ok +insert into t_without_pk values('xxx'); + +statement ok +FLUSH; + +query ? +select * from t_without_pk; +---- +xxx + +query ? +select count(_row_id) from t_without_pk; +---- +1 + +statement ok +DROP TABLE t_without_pk diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 82fb74d575e86..b52fef4bb25e5 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -393,6 +393,7 @@ impl Binder { matches!(self.bind_for, BindFor::Stream) } + #[allow(dead_code)] fn is_for_batch(&self) -> bool { matches!(self.bind_for, BindFor::Batch) } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 624f1ce6a28b7..428e2051b153c 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use either::Either; use itertools::Itertools; use risingwave_common::bail_not_implemented; -use risingwave_common::catalog::{ - debug_assert_column_ids_distinct, is_system_schema, Engine, Field, -}; +use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_connector::WithPropertiesExt; use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias}; @@ -131,31 +129,7 @@ impl Binder { .catalog .get_created_table_by_name(&self.db_name, schema_path, table_name) { - match table_catalog.engine() { - Engine::Iceberg => { - if self.is_for_batch() - && let Ok((source_catalog, _)) = - self.catalog.get_source_by_name( - &self.db_name, - schema_path, - &table_catalog.iceberg_source_name().unwrap(), - ) - { - self.resolve_source_relation(&source_catalog.clone(), as_of) - } else { - self.resolve_table_relation( - table_catalog.clone(), - schema_name, - as_of, - )? - } - } - Engine::Hummock => self.resolve_table_relation( - table_catalog.clone(), - schema_name, - as_of, - )?, - } + self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)? } else if let Ok((source_catalog, _)) = self.catalog .get_source_by_name(&self.db_name, schema_path, table_name) @@ -203,36 +177,11 @@ impl Binder { } else if let Some(table_catalog) = schema.get_created_table_by_name(table_name) { - match table_catalog.engine { - Engine::Iceberg => { - if self.is_for_batch() - && let Some(source_catalog) = schema - .get_source_by_name( - &table_catalog - .iceberg_source_name() - .unwrap(), - ) - { - return Ok(self.resolve_source_relation( - &source_catalog.clone(), - as_of, - )); - } else { - return self.resolve_table_relation( - table_catalog.clone(), - &schema_name.clone(), - as_of, - ); - } - } - Engine::Hummock => { - return self.resolve_table_relation( - table_catalog.clone(), - &schema_name.clone(), - as_of, - ); - } - } + return self.resolve_table_relation( + table_catalog.clone(), + &schema_name.clone(), + as_of, + ); } else if let Some(source_catalog) = schema.get_source_by_name(table_name) {