Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_non_pk_watermark_clean
  • Loading branch information
Li0k committed Dec 25, 2024
2 parents 96de9ba + 2f4008c commit 3127678
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 58 deletions.
23 changes: 23 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,26 @@ with (

statement ok
DROP TABLE nexmark_t

# test hidden _row_id column for iceberg engine table
statement ok
create table t_without_pk(name varchar) with(commit_checkpoint_interval = 1) engine = iceberg;

statement ok
insert into t_without_pk values('xxx');

statement ok
FLUSH;

query ?
select * from t_without_pk;
----
xxx

query ?
select count(_row_id) from t_without_pk;
----
1

statement ok
DROP TABLE t_without_pk
1 change: 1 addition & 0 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ impl Binder {
matches!(self.bind_for, BindFor::Stream)
}

#[allow(dead_code)]
fn is_for_batch(&self) -> bool {
matches!(self.bind_for, BindFor::Batch)
}
Expand Down
65 changes: 7 additions & 58 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use std::sync::Arc;
use either::Either;
use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
debug_assert_column_ids_distinct, is_system_schema, Engine, Field,
};
use risingwave_common::catalog::{debug_assert_column_ids_distinct, is_system_schema, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
Expand Down Expand Up @@ -131,31 +129,7 @@ impl Binder {
.catalog
.get_created_table_by_name(&self.db_name, schema_path, table_name)
{
match table_catalog.engine() {
Engine::Iceberg => {
if self.is_for_batch()
&& let Ok((source_catalog, _)) =
self.catalog.get_source_by_name(
&self.db_name,
schema_path,
&table_catalog.iceberg_source_name().unwrap(),
)
{
self.resolve_source_relation(&source_catalog.clone(), as_of)
} else {
self.resolve_table_relation(
table_catalog.clone(),
schema_name,
as_of,
)?
}
}
Engine::Hummock => self.resolve_table_relation(
table_catalog.clone(),
schema_name,
as_of,
)?,
}
self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
} else if let Ok((source_catalog, _)) =
self.catalog
.get_source_by_name(&self.db_name, schema_path, table_name)
Expand Down Expand Up @@ -203,36 +177,11 @@ impl Binder {
} else if let Some(table_catalog) =
schema.get_created_table_by_name(table_name)
{
match table_catalog.engine {
Engine::Iceberg => {
if self.is_for_batch()
&& let Some(source_catalog) = schema
.get_source_by_name(
&table_catalog
.iceberg_source_name()
.unwrap(),
)
{
return Ok(self.resolve_source_relation(
&source_catalog.clone(),
as_of,
));
} else {
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
as_of,
);
}
}
Engine::Hummock => {
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
as_of,
);
}
}
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
as_of,
);
} else if let Some(source_catalog) =
schema.get_source_by_name(table_name)
{
Expand Down

0 comments on commit 3127678

Please sign in to comment.