diff --git a/e2e_test/iceberg/test_case/iceberg_engine.slt b/e2e_test/iceberg/test_case/iceberg_engine.slt index 8129dd6265d83..a8b5fd21c598e 100644 --- a/e2e_test/iceberg/test_case/iceberg_engine.slt +++ b/e2e_test/iceberg/test_case/iceberg_engine.slt @@ -66,4 +66,26 @@ statement ok DROP TABLE full_type_t; +# test connector with commit_checkpoint_interval +statement ok +create table nexmark_t ( + id BIGINT, + item_name VARCHAR, + description VARCHAR, + initial_bid BIGINT, + reserve BIGINT, + date_time TIMESTAMP, + expires TIMESTAMP, + seller BIGINT, + category BIGINT, + extra VARCHAR) +with ( + connector = 'nexmark', + nexmark.table.type = 'Auction', + nexmark.split.num = '2', + nexmark.min.event.gap.in.ns = '500000', + commit_checkpoint_interval = 1 +) engine = iceberg; +statement ok +DROP TABLE nexmark_t diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1fa960da971d9..6668448c3d836 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -35,6 +35,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_common::{bail, bail_not_implemented}; use risingwave_connector::jvm_runtime::JVM; +use risingwave_connector::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL; use risingwave_connector::source::cdc::build_cdc_table_id; use risingwave_connector::source::cdc::external::{ ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, @@ -1381,7 +1382,7 @@ pub async fn handle_create_table( pub async fn create_iceberg_engine_table( session: Arc, handler_args: HandlerArgs, - source: Option, + mut source: Option, table: PbTable, graph: StreamFragmentGraph, job_type: TableJobType, @@ -1621,7 +1622,7 @@ pub async fn create_iceberg_engine_table( with.insert("table.name".to_owned(), iceberg_table_name.clone()); let commit_checkpoint_interval = handler_args .with_options - .get("commit_checkpoint_interval") + .get(COMMIT_CHECKPOINT_INTERVAL) .map(|v| v.to_owned()) .unwrap_or_else(|| "60".to_owned()); let commit_checkpoint_interval = commit_checkpoint_interval.parse::().map_err(|_| { @@ -1635,13 +1636,18 @@ pub async fn create_iceberg_engine_table( bail!("commit_checkpoint_interval must be a positive integer: 0"); } + // remove commit_checkpoint_interval from source options, otherwise it will be considered as an unknown field. + source + .as_mut() + .map(|x| x.with_properties.remove(COMMIT_CHECKPOINT_INTERVAL)); + let sink_decouple = session.config().sink_decouple(); if matches!(sink_decouple, SinkDecouple::Disable) && commit_checkpoint_interval > 1 { bail!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled") } with.insert( - "commit_checkpoint_interval".to_owned(), + COMMIT_CHECKPOINT_INTERVAL.to_owned(), commit_checkpoint_interval.to_string(), ); with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());