Skip to content

Commit

Permalink
feat: decouple iceberg commit from risingwave commit (#15634)
Browse files Browse the repository at this point in the history
Co-authored-by: ZENOTME <[email protected]>
  • Loading branch information
ZENOTME and ZENOTME authored Apr 8, 2024
1 parent 7702a3b commit ff862dc
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 50 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ bash ./start_spark_connect_server.sh
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_upsert.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/range_partition_append_only.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/range_partition_upsert.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml


echo "--- Kill cluster"
Expand Down
80 changes: 54 additions & 26 deletions e2e_test/iceberg/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


def strtobool(v):
    """Parse a string as a boolean, case-insensitively.

    Only the literal "true" (any case) maps to True; every other value,
    including "1" or "yes", is False.
    """
    # The scraped diff showed both the old ('true') and new ("true") return
    # lines; only one return belongs here — the behavior is identical.
    return v.lower() == "true"


def strtodate(v):
Expand All @@ -28,34 +28,32 @@ def strtots(v):


def get_spark(args):
    """Return the process-wide SparkSession, creating it on first use.

    The session is cached in the module-level ``g_spark`` so every call
    reuses one Spark Connect session pointed at ``args["spark"]["url"]``.
    """
    spark_config = args["spark"]
    global g_spark
    if g_spark is None:
        g_spark = SparkSession.builder.remote(spark_config["url"]).getOrCreate()

    return g_spark


def init_iceberg_table(args, init_sqls):
    """Execute each setup DDL statement in ``init_sqls`` through Spark."""
    spark = get_spark(args)
    for sql in init_sqls:
        print(f"Executing sql: {sql}")
        spark.sql(sql)


def execute_slt(args, slt):
    """Run a sqllogictest file against the RisingWave instance in ``args``.

    No-op when ``slt`` is None or empty. After a successful run it sleeps
    30 seconds — presumably to give sinks time to commit before the
    verification queries run; confirm against the test flow if changing.
    """
    if slt is None or slt == "":
        return
    rw_config = args["risingwave"]
    cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}"
    print(f"Command line is [{cmd}]")
    subprocess.run(cmd, shell=True, check=True)
    time.sleep(30)


def verify_result(args,verify_sql,verify_schema,verify_data):
def verify_result(args, verify_sql, verify_schema, verify_data):
tc = unittest.TestCase()
print(f"Executing sql: {verify_sql}")
spark = get_spark(args)
Expand All @@ -64,9 +62,9 @@ def verify_result(args,verify_sql,verify_schema,verify_data):
print(row)
rows = verify_data.splitlines()
tc.assertEqual(len(df), len(rows))
for (row1, row2) in zip(df, rows):
for row1, row2 in zip(df, rows):
print(f"Row1: {row1}, Row 2: {row2}")
row2 = row2.split(',')
row2 = row2.split(",")
for idx, ty in enumerate(verify_schema):
if ty == "int" or ty == "long":
tc.assertEqual(row1[idx], int(row2[idx]))
Expand All @@ -77,7 +75,10 @@ def verify_result(args,verify_sql,verify_schema,verify_data):
elif ty == "date":
tc.assertEqual(row1[idx], strtodate(row2[idx]))
elif ty == "timestamp":
tc.assertEqual(row1[idx].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[idx]))
tc.assertEqual(
row1[idx].astimezone(timezone.utc).replace(tzinfo=None),
strtots(row2[idx]),
)
elif ty == "timestamp_ntz":
tc.assertEqual(row1[idx], datetime.fromisoformat(row2[idx]))
elif ty == "string":
Expand All @@ -90,34 +91,61 @@ def verify_result(args,verify_sql,verify_schema,verify_data):
else:
tc.fail(f"Unsupported type {ty}")

def drop_table(args,drop_sqls):
def compare_sql(args, cmp_sqls):
    """Assert the two queries in ``cmp_sqls`` return the same multiset of rows.

    Uses ``exceptAll`` (bag difference) in both directions so duplicate rows
    are compared by count as well as by value; either non-empty difference
    fails the test.
    """
    assert len(cmp_sqls) == 2
    spark = get_spark(args)
    df1 = spark.sql(cmp_sqls[0])
    df2 = spark.sql(cmp_sqls[1])

    tc = unittest.TestCase()
    diff_df = df1.exceptAll(df2).collect()
    print(f"diff {diff_df}")
    tc.assertEqual(len(diff_df), 0)
    diff_df = df2.exceptAll(df1).collect()
    print(f"diff {diff_df}")
    tc.assertEqual(len(diff_df), 0)


def drop_table(args, drop_sqls):
    """Tear down test state by running each statement in ``drop_sqls`` via Spark."""
    session = get_spark(args)
    for sql in drop_sqls:
        print(f"Executing sql: {sql}")
        session.sql(sql)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Test script for iceberg")
    parser.add_argument("-t", dest="test_case", type=str, help="Test case file")

    with open(parser.parse_args().test_case, "rb") as test_case:
        test_case = toml.load(test_case)
        # Extract content from testcase. Only init_sqls and drop_sqls are
        # mandatory; slt, the verify_* trio, and cmp_sqls are optional so a
        # case may use row-level verification, query comparison, or both.
        init_sqls = test_case["init_sqls"]
        print(f"init_sqls:{init_sqls}")
        slt = test_case.get("slt")
        print(f"slt:{slt}")
        verify_schema = test_case.get("verify_schema")
        print(f"verify_schema:{verify_schema}")
        verify_sql = test_case.get("verify_sql")
        print(f"verify_sql:{verify_sql}")
        verify_data = test_case.get("verify_data")
        cmp_sqls = test_case.get("cmp_sqls")
        drop_sqls = test_case["drop_sqls"]

        config = configparser.ConfigParser()
        config.read("config.ini")
        print({section: dict(config[section]) for section in config.sections()})

        init_iceberg_table(config, init_sqls)
        if slt is not None and slt != "":
            execute_slt(config, slt)
        # Row-level verification runs only when all three verify_* keys are set.
        if (
            (verify_data is not None and verify_data != "")
            and (verify_sql is not None and verify_sql != "")
            and (verify_schema is not None and verify_schema != "")
        ):
            verify_result(config, verify_sql, verify_schema, verify_data)
        # Pairwise comparison of exactly two queries (e.g. two sinks that
        # must land identical data).
        if cmp_sqls is not None and cmp_sqls != "" and len(cmp_sqls) == 2:
            compare_sql(config, cmp_sqls)
        if drop_sqls is not None and drop_sqls != "":
            drop_table(config, drop_sqls)

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# E2E case for decoupling iceberg commits from RisingWave checkpoints: two
# identical append-only sinks write the same MV, one with the default commit
# cadence (sink1) and one with commit_checkpoint_interval = 5 (sink2). The
# driving TOML case then compares t1 and t2 via cmp_sqls.
statement ok
set streaming_parallelism=4;

# Datagen source producing sequential i1 and random strings at 30k rows/s.
statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar) WITH (
connector = 'datagen',
fields.i1.kind = 'sequence',
fields.i2.kind = 'random',
fields.i2.length = '32',
fields.i2.seed = '4',
fields.i3.kind = 'random',
fields.i3.length = '64',
fields.i3.seed = '5',
datagen.rows.per.second = '30000'
) FORMAT PLAIN ENCODE JSON;

sleep 2s

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

# sink1: no commit_checkpoint_interval set — baseline commit behavior.
statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
);

# sink2: identical sink but with commit_checkpoint_interval = 5, i.e. the
# decoupled-commit path under test.
statement ok
CREATE SINK sink2 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't2',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 5
);

# Let both sinks ingest for a while before tearing down.
sleep 20s

statement ok
flush;

# Dropping the sinks forces any buffered data to be committed before the
# TOML case compares t1 against t2.
statement ok
DROP SINK sink1;

statement ok
DROP SINK sink2;

statement ok
DROP MATERIALIZED VIEW mv1;

statement ok
DROP TABLE s1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
init_sqls = [
    'CREATE SCHEMA IF NOT EXISTS demo_db',
    # Drop the tables THIS case creates so reruns start clean; the previous
    # 'DROP TABLE IF EXISTS demo_db.demo_table' was left over from another
    # case and never matched the CREATEs below (which are not IF NOT EXISTS).
    'DROP TABLE IF EXISTS demo_db.t1',
    'DROP TABLE IF EXISTS demo_db.t2',
    '''
    CREATE TABLE demo_db.t1 (
        i1 int,
        i2 string,
        i3 string
    ) USING iceberg TBLPROPERTIES ('format-version'='2');
    ''',
    '''
    CREATE TABLE demo_db.t2 (
        i1 int,
        i2 string,
        i3 string
    ) USING iceberg TBLPROPERTIES ('format-version'='2');
    ''',
]

# Streaming workload: writes the same MV through two sinks, one of which
# uses commit_checkpoint_interval = 5.
slt = 'test_case/append_only_with_checkpoint_interval.slt'

# Both tables must contain identical data regardless of commit cadence.
cmp_sqls = ["SELECT * FROM demo_db.t1", "SELECT * FROM demo_db.t2"]

drop_sqls = [
    'DROP TABLE IF EXISTS demo_db.t1',
    'DROP TABLE IF EXISTS demo_db.t2',
    'DROP SCHEMA IF EXISTS demo_db',
]
37 changes: 37 additions & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,43 @@ where
})
}

pub(crate) fn deserialize_optional_u64_from_string<'de, D>(
deserializer: D,
) -> Result<Option<u64>, D::Error>
where
D: de::Deserializer<'de>,
{
let s: String = de::Deserialize::deserialize(deserializer)?;
if s.is_empty() {
Ok(None)
} else {
s.parse()
.map_err(|_| {
de::Error::invalid_value(
de::Unexpected::Str(&s),
&"integer greater than or equal to 0",
)
})
.map(Some)
}
}

/// Deserializes an optional comma-separated string into `Option<Vec<String>>`.
///
/// The input is ASCII-lowercased first, then split on ',' with each item
/// trimmed. A missing/null input yields `None`.
pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
    deserializer: D,
) -> std::result::Result<Option<Vec<String>>, D::Error>
where
    D: de::Deserializer<'de>,
{
    let maybe: Option<String> = de::Deserialize::deserialize(deserializer)?;
    Ok(maybe.map(|raw| {
        raw.to_ascii_lowercase()
            .split(',')
            .map(|item| item.trim().to_owned())
            .collect()
    }))
}

pub(crate) fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
D: de::Deserializer<'de>,
Expand Down
Loading

0 comments on commit ff862dc

Please sign in to comment.