Skip to content

Commit

Permalink
fix(iceberg): fix select empty iceberg table (#18449)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 7, 2024
1 parent 71753f1 commit becb896
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 4 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ poetry run python main.py -t ./test_case/partition_upsert.toml
poetry run python main.py -t ./test_case/range_partition_append_only.toml
poetry run python main.py -t ./test_case/range_partition_upsert.toml
poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml
poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml


echo "--- Kill cluster"
Expand Down
60 changes: 60 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
flush;

query I
select count(*) from iceberg_t1_source;
----
0

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_select_empty_table.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_select_empty_table.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
19 changes: 15 additions & 4 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ impl IcebergSplitEnumerator {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table_v2().await?;
let current_snapshot = table.metadata().current_snapshot();
if current_snapshot.is_none() {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit {
split_id: 0,
snapshot_id: 0, // unused
table_meta: TableMetadataJsonStr::serialize(table.metadata()),
files: vec![],
}]);
}

let snapshot_id = match time_traval_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
Expand All @@ -232,10 +243,10 @@ impl IcebergSplitEnumerator {
}
}
}
None => match table.metadata().current_snapshot() {
Some(snapshot) => snapshot.snapshot_id(),
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
None => {
assert!(current_snapshot.is_some());
current_snapshot.unwrap().snapshot_id()
}
};
let mut files = vec![];

Expand Down

0 comments on commit becb896

Please sign in to comment.