From ff862dc97d05072dcb6b37a9be940ebd6a5221ad Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:37:34 +0900 Subject: [PATCH] feat: decouple iceberg commit from risingwave commit (#15634) Co-authored-by: ZENOTME --- ci/scripts/e2e-iceberg-sink-v2-test.sh | 1 + e2e_test/iceberg/main.py | 80 ++++++---- .../append_only_with_checkpoint_interval.slt | 70 +++++++++ .../append_only_with_checkpoint_interval.toml | 28 ++++ src/connector/src/lib.rs | 37 +++++ src/connector/src/sink/iceberg/log_sink.rs | 144 ++++++++++++++++++ src/connector/src/sink/iceberg/mod.rs | 83 +++++++--- src/connector/with_options_sink.yaml | 4 + 8 files changed, 397 insertions(+), 50 deletions(-) create mode 100644 e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt create mode 100644 e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.toml create mode 100644 src/connector/src/sink/iceberg/log_sink.rs diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index c3bac96c9654e..0032e5a64297a 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -44,6 +44,7 @@ bash ./start_spark_connect_server.sh "$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_upsert.toml "$HOME"/.local/bin/poetry run python main.py -t ./test_case/range_partition_append_only.toml "$HOME"/.local/bin/poetry run python main.py -t ./test_case/range_partition_upsert.toml +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/main.py b/e2e_test/iceberg/main.py index 3b9cf2f320b77..03066033c6d24 100644 --- a/e2e_test/iceberg/main.py +++ b/e2e_test/iceberg/main.py @@ -13,7 +13,7 @@ def strtobool(v): - return v.lower() == 'true' + return v.lower() == "true" def strtodate(v): @@ -28,34 +28,32 @@ def strtots(v): def get_spark(args): - spark_config = args['spark'] + spark_config = args["spark"] global g_spark if g_spark is None: - g_spark = SparkSession.builder.remote(spark_config['url']).getOrCreate() + g_spark = SparkSession.builder.remote(spark_config["url"]).getOrCreate() return g_spark -def init_iceberg_table(args,init_sqls): +def init_iceberg_table(args, init_sqls): spark = get_spark(args) for sql in init_sqls: print(f"Executing sql: {sql}") spark.sql(sql) -def execute_slt(args,slt): +def execute_slt(args, slt): if slt is None or slt == "": return - rw_config = args['risingwave'] + rw_config = args["risingwave"] cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}" print(f"Command line is [{cmd}]") - subprocess.run(cmd, - shell=True, - check=True) + subprocess.run(cmd, shell=True, check=True) time.sleep(30) -def verify_result(args,verify_sql,verify_schema,verify_data): +def verify_result(args, verify_sql, verify_schema, verify_data): tc = unittest.TestCase() print(f"Executing sql: {verify_sql}") spark = get_spark(args) @@ -64,9 +62,9 @@ def verify_result(args,verify_sql,verify_schema,verify_data): print(row) rows = verify_data.splitlines() tc.assertEqual(len(df), len(rows)) - for (row1, row2) in zip(df, rows): + for row1, row2 in zip(df, rows): print(f"Row1: {row1}, Row 2: {row2}") - row2 = row2.split(',') + row2 = row2.split(",") for idx, ty in enumerate(verify_schema): if ty == "int" or ty == "long": tc.assertEqual(row1[idx], int(row2[idx])) @@ -77,7 +75,10 @@ def verify_result(args,verify_sql,verify_schema,verify_data): elif ty == "date": 
tc.assertEqual(row1[idx], strtodate(row2[idx])) elif ty == "timestamp": - tc.assertEqual(row1[idx].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[idx])) + tc.assertEqual( + row1[idx].astimezone(timezone.utc).replace(tzinfo=None), + strtots(row2[idx]), + ) elif ty == "timestamp_ntz": tc.assertEqual(row1[idx], datetime.fromisoformat(row2[idx])) elif ty == "string": @@ -90,34 +91,61 @@ def verify_result(args,verify_sql,verify_schema,verify_data): else: tc.fail(f"Unsupported type {ty}") -def drop_table(args,drop_sqls): +def compare_sql(args, cmp_sqls): + assert len(cmp_sqls) == 2 + spark = get_spark(args) + df1 = spark.sql(cmp_sqls[0]) + df2 = spark.sql(cmp_sqls[1]) + + tc = unittest.TestCase() + diff_df = df1.exceptAll(df2).collect() + print(f"diff {diff_df}") + tc.assertEqual(len(diff_df),0) + diff_df = df2.exceptAll(df1).collect() + print(f"diff {diff_df}") + tc.assertEqual(len(diff_df),0) + + +def drop_table(args, drop_sqls): spark = get_spark(args) for sql in drop_sqls: print(f"Executing sql: {sql}") spark.sql(sql) + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Test script for iceberg") parser.add_argument("-t", dest="test_case", type=str, help="Test case file") - with open(parser.parse_args().test_case,"rb") as test_case: + + with open(parser.parse_args().test_case, "rb") as test_case: test_case = toml.load(test_case) # Extract content from testcase - init_sqls = test_case['init_sqls'] + init_sqls = test_case["init_sqls"] print(f"init_sqls:{init_sqls}") - slt = test_case['slt'] + slt = test_case.get("slt") print(f"slt:{slt}") - verify_schema = test_case['verify_schema'] + verify_schema = test_case.get("verify_schema") print(f"verify_schema:{verify_schema}") - verify_sql = test_case['verify_sql'] + verify_sql = test_case.get("verify_sql") print(f"verify_sql:{verify_sql}") - verify_data = test_case['verify_data'] - drop_sqls = test_case['drop_sqls'] - + verify_data = test_case.get("verify_data") + cmp_sqls = test_case.get("cmp_sqls") + drop_sqls = test_case["drop_sqls"] config = configparser.ConfigParser() config.read("config.ini") print({section: dict(config[section]) for section in config.sections()}) - init_iceberg_table(config,init_sqls) - execute_slt(config,slt) - verify_result(config,verify_sql,verify_schema,verify_data) - drop_table(config,drop_sqls) + init_iceberg_table(config, init_sqls) + if slt is not None and slt != "": + execute_slt(config, slt) + if ( + (verify_data is not None and verify_data != "") + and (verify_sql is not None and verify_sql != "") + and (verify_schema is not None and verify_schema != "") + ): + verify_result(config, verify_sql, verify_schema, verify_data) + if cmp_sqls is not None and cmp_sqls != "" and len(cmp_sqls) == 2: + compare_sql(config, cmp_sqls) + if drop_sqls is not None and drop_sqls != "": + drop_table(config, drop_sqls) + diff --git a/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt new file mode 100644 index 0000000000000..f3cc47b8e1017 --- /dev/null +++ b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.slt @@ -0,0 +1,70 @@ +statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar) WITH ( + connector = 'datagen', + fields.i1.kind = 'sequence', + fields.i2.kind = 'random', + fields.i2.length = '32', + fields.i2.seed = '4', + fields.i3.kind = 'random', + fields.i3.length = '64', + fields.i3.seed = '5', + datagen.rows.per.second = '30000' +) FORMAT 
PLAIN ENCODE JSON; + +sleep 2s + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + database.name = 'demo_db', + table.name = 't1', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' +); + +statement ok +CREATE SINK sink2 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + database.name = 'demo_db', + table.name = 't2', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + commit_checkpoint_interval = 5 +); + +sleep 20s + +statement ok +flush; + +statement ok +DROP SINK sink1; + +statement ok +DROP SINK sink2; + +statement ok +DROP MATERIALIZED VIEW mv1; + +statement ok +DROP TABLE s1; diff --git a/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.toml b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.toml new file mode 100644 index 0000000000000..be9a00977ca30 --- /dev/null +++ b/e2e_test/iceberg/test_case/append_only_with_checkpoint_interval.toml @@ -0,0 +1,28 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.demo_table', + ''' + CREATE TABLE demo_db.t1 ( + i1 int, + i2 string, + i3 string + ) USING iceberg TBLPROPERTIES ('format-version'='2'); + ''', + ''' + CREATE TABLE demo_db.t2 ( + i1 int, + i2 string, + i3 string + ) USING iceberg TBLPROPERTIES ('format-version'='2'); + ''', +] + +slt = 'test_case/append_only_with_checkpoint_interval.slt' + +cmp_sqls = ["SELECT * FROM demo_db.t1", "SELECT * FROM demo_db.t2"] + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.t1', + 'DROP TABLE IF EXISTS demo_db.t2', + 'DROP SCHEMA IF EXISTS demo_db', +] diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index f0c7e86373eea..4beff70c921a1 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -87,6 +87,43 @@ where }) } +pub(crate) fn deserialize_optional_u64_from_string<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: de::Deserializer<'de>, +{ + let s: String = de::Deserialize::deserialize(deserializer)?; + if s.is_empty() { + Ok(None) + } else { + s.parse() + .map_err(|_| { + de::Error::invalid_value( + de::Unexpected::Str(&s), + &"integer greater than or equal to 0", + ) + }) + .map(Some) + } +} + +pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>( + deserializer: D, +) -> std::result::Result>, D::Error> +where + D: de::Deserializer<'de>, +{ + let s: Option = de::Deserialize::deserialize(deserializer)?; + if let Some(s) = s { + let s = s.to_ascii_lowercase(); + let s = s.split(',').map(|s| s.trim().to_owned()).collect(); + Ok(Some(s)) + } else { + Ok(None) + } +} + pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result where D: de::Deserializer<'de>, diff --git a/src/connector/src/sink/iceberg/log_sink.rs b/src/connector/src/sink/iceberg/log_sink.rs new file mode 100644 index 0000000000000..fec6285774bbe --- /dev/null +++ b/src/connector/src/sink/iceberg/log_sink.rs @@ -0,0 +1,144 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::num::NonZeroU64; +use std::time::Instant; + +use async_trait::async_trait; + +use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; +use crate::sink::writer::SinkWriter; +use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; + +pub struct IcebergLogSinkerOf { + writer: W, + sink_metrics: SinkMetrics, + commit_checkpoint_interval: NonZeroU64, +} + +impl IcebergLogSinkerOf { + /// Create a log sinker with a commit checkpoint interval. The sinker should be used with a + /// decouple log reader `KvLogStoreReader`. + pub fn new( + writer: W, + sink_metrics: SinkMetrics, + commit_checkpoint_interval: NonZeroU64, + ) -> Self { + IcebergLogSinkerOf { + writer, + sink_metrics, + commit_checkpoint_interval, + } + } +} + +#[async_trait] +impl> LogSinker for IcebergLogSinkerOf { + async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> { + let mut sink_writer = self.writer; + let sink_metrics = self.sink_metrics; + #[derive(Debug)] + enum LogConsumerState { + /// Mark that the log consumer is not initialized yet + Uninitialized, + + /// Mark that a new epoch has begun. + EpochBegun { curr_epoch: u64 }, + + /// Mark that the consumer has just received a barrier + BarrierReceived { prev_epoch: u64 }, + } + + let mut state = LogConsumerState::Uninitialized; + + let mut current_checkpoint: u64 = 0; + let commit_checkpoint_interval = self.commit_checkpoint_interval; + + loop { + let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?; + if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item { + match &state { + LogConsumerState::BarrierReceived { .. } => {} + _ => unreachable!( + "update vnode bitmap can be accepted only right after \ + barrier, but current state is {:?}", + state + ), + } + } + // begin_epoch when not previously began + state = match state { + LogConsumerState::Uninitialized => { + sink_writer.begin_epoch(epoch).await?; + LogConsumerState::EpochBegun { curr_epoch: epoch } + } + LogConsumerState::EpochBegun { curr_epoch } => { + assert!( + epoch >= curr_epoch, + "new epoch {} should not be below the current epoch {}", + epoch, + curr_epoch + ); + LogConsumerState::EpochBegun { curr_epoch: epoch } + } + LogConsumerState::BarrierReceived { prev_epoch } => { + assert!( + epoch > prev_epoch, + "new epoch {} should be greater than prev epoch {}", + epoch, + prev_epoch + ); + sink_writer.begin_epoch(epoch).await?; + LogConsumerState::EpochBegun { curr_epoch: epoch } + } + }; + match item { + LogStoreReadItem::StreamChunk { chunk, .. 
} => { + if let Err(e) = sink_writer.write_batch(chunk).await { + sink_writer.abort().await?; + return Err(e); + } + } + LogStoreReadItem::Barrier { is_checkpoint } => { + let prev_epoch = match state { + LogConsumerState::EpochBegun { curr_epoch } => curr_epoch, + _ => unreachable!("epoch must have begun before handling barrier"), + }; + if is_checkpoint { + current_checkpoint += 1; + if current_checkpoint >= commit_checkpoint_interval.get() { + let start_time = Instant::now(); + sink_writer.barrier(true).await?; + sink_metrics + .sink_commit_duration_metrics + .observe(start_time.elapsed().as_millis() as f64); + log_reader + .truncate(TruncateOffset::Barrier { epoch }) + .await?; + current_checkpoint = 0; + } else { + sink_writer.barrier(false).await?; + } + } else { + sink_writer.barrier(false).await?; + } + state = LogConsumerState::BarrierReceived { prev_epoch } + } + LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => { + sink_writer.update_vnode_bitmap(vnode_bitmap).await?; + } + } + } + } +} diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 8db3d8005757a..e41c5433fe10a 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -13,11 +13,13 @@ // limitations under the License. mod jni_catalog; +mod log_sink; mod mock_catalog; mod prometheus; use std::collections::HashMap; use std::fmt::Debug; +use std::num::NonZeroU64; use std::ops::Deref; use std::sync::Arc; @@ -48,23 +50,27 @@ use risingwave_common::catalog::Schema; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; -use serde::de; use serde_derive::Deserialize; use thiserror_ext::AsReport; use url::Url; use with_options::WithOptions; +use self::log_sink::IcebergLogSinkerOf; use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; +use super::catalog::desc::SinkDesc; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; -use crate::deserialize_bool_from_string; use crate::error::ConnectorResult; use crate::sink::coordinate::CoordinatedSinkWriter; -use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; -use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; +use crate::sink::writer::SinkWriter; +use crate::sink::{Result, SinkCommitCoordinator, SinkDecouple, SinkParam}; +use crate::{ + deserialize_bool_from_string, deserialize_optional_string_seq_from_string, + deserialize_optional_u64_from_string, +}; /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". pub const ICEBERG_SINK: &str = "iceberg"; @@ -116,29 +122,17 @@ pub struct IcebergConfig { #[serde( rename = "primary_key", default, - deserialize_with = "deserialize_string_seq_from_string" + deserialize_with = "deserialize_optional_string_seq_from_string" )] pub primary_key: Option>, // Props for java catalog props. 
#[serde(skip)] pub java_catalog_props: HashMap, -} -pub(crate) fn deserialize_string_seq_from_string<'de, D>( - deserializer: D, -) -> std::result::Result>, D::Error> -where - D: de::Deserializer<'de>, -{ - let s: Option = de::Deserialize::deserialize(deserializer)?; - if let Some(s) = s { - let s = s.to_ascii_lowercase(); - let s = s.split(',').map(|s| s.trim().to_owned()).collect(); - Ok(Some(s)) - } else { - Ok(None) - } + // Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint. + #[serde(default, deserialize_with = "deserialize_optional_u64_from_string")] + pub commit_checkpoint_interval: Option, } impl IcebergConfig { @@ -190,6 +184,12 @@ impl IcebergConfig { .map(|(k, v)| (k[8..].to_string(), v.to_string())) .collect(); + if config.commit_checkpoint_interval == Some(0) { + return Err(SinkError::Config(anyhow!( + "commit_checkpoint_interval must be greater than 0" + ))); + } + Ok(config) } @@ -517,10 +517,34 @@ impl IcebergSink { impl Sink for IcebergSink { type Coordinator = IcebergSinkCommitter; - type LogSinker = LogSinkerOf>; + type LogSinker = IcebergLogSinkerOf>; const SINK_NAME: &'static str = ICEBERG_SINK; + fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result { + let config_decouple = if let Some(interval) = + desc.properties.get("commit_checkpoint_interval") + && interval.parse::().unwrap_or(0) > 1 + { + true + } else { + false + }; + + match user_specified { + SinkDecouple::Default => Ok(config_decouple), + SinkDecouple::Disable => { + if config_decouple { + return Err(SinkError::Config(anyhow!( + "config conflict: Iceberg config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled" + ))); + } + Ok(false) + } + SinkDecouple::Enable => Ok(true), + } + } + async fn validate(&self) -> Result<()> { let _ = self.create_and_validate_table().await?; Ok(()) @@ -533,7 +557,7 @@ impl Sink for IcebergSink { } else { IcebergWriter::new_append_only(table, &writer_param).await? }; - Ok(CoordinatedSinkWriter::new( + let writer = CoordinatedSinkWriter::new( writer_param .meta_client .expect("should have meta client") @@ -547,8 +571,18 @@ impl Sink for IcebergSink { })?, inner, ) - .await? - .into_log_sinker(writer_param.sink_metrics)) + .await?; + + let commit_checkpoint_interval = + NonZeroU64::new(self.config.commit_checkpoint_interval.unwrap_or(1)).expect( + "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation", + ); + + Ok(IcebergLogSinkerOf::new( + writer, + writer_param.sink_metrics, + commit_checkpoint_interval, + )) } async fn new_coordinator(&self) -> Result { @@ -1103,6 +1137,7 @@ mod test { .into_iter() .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), + commit_checkpoint_interval: None, }; assert_eq!(iceberg_config, expected_iceberg_config); diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 6d8a469975af1..5ce2c96cc64e3 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -159,6 +159,10 @@ IcebergConfig: - name: java_catalog_props field_type: HashMap required: false + - name: commit_checkpoint_interval + field_type: u64 + required: false + default: Default::default KafkaConfig: fields: - name: properties.bootstrap.server
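
The heart of this change is the checkpoint counting in `IcebergLogSinkerOf::consume_log_and_sink` above: every checkpoint barrier increments a counter, and only when the counter reaches `commit_checkpoint_interval` does the sinker call `barrier(true)` on the writer (an Iceberg commit), observe the commit duration, truncate the log store, and reset the counter; every other barrier goes through `barrier(false)`. The sketch below is a minimal model of just that batching logic, using simplified stand-in types (a plain `Item` enum and `println!` output instead of the real `LogStoreReadItem`, `SinkWriter`, and metrics); it illustrates the behaviour rather than reproducing the shipped code.

use std::num::NonZeroU64;

/// Simplified stand-in for the log store items the real sinker consumes.
enum Item {
    Chunk(&'static str),
    Barrier { is_checkpoint: bool },
}

/// Minimal model of the interval-based commit in `IcebergLogSinkerOf`:
/// every barrier reaches the writer, but a commit (plus log-store
/// truncation) happens only once per `interval` checkpoint barriers.
fn drive(items: Vec<Item>, interval: NonZeroU64) {
    let mut pending_checkpoints: u64 = 0;
    for item in items {
        match item {
            Item::Chunk(chunk) => println!("write_batch({chunk})"),
            Item::Barrier { is_checkpoint: false } => println!("barrier(false)"),
            Item::Barrier { is_checkpoint: true } => {
                pending_checkpoints += 1;
                if pending_checkpoints >= interval.get() {
                    println!("barrier(true) -> Iceberg commit, truncate log store");
                    pending_checkpoints = 0;
                } else {
                    println!("barrier(false) -> checkpoint buffered, no Iceberg commit");
                }
            }
        }
    }
}

fn main() {
    // With an interval of 2, only every second checkpoint barrier produces
    // an Iceberg commit; an unset interval behaves like 1 (commit on every
    // checkpoint), matching the unwrap_or(1) in new_log_sinker above.
    let interval = NonZeroU64::new(2).unwrap();
    let items: Vec<Item> = (1..=4)
        .flat_map(|_| vec![Item::Chunk("chunk"), Item::Barrier { is_checkpoint: true }])
        .collect();
    drive(items, interval);
}

Raising the interval batches more checkpoints into each Iceberg snapshot at the cost of commit freshness, which is why an interval greater than 1 requires sink decoupling (the log store buffers data between RisingWave checkpoints and Iceberg commits), as enforced by `is_sink_decouple` in the diff above.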