From 19b3ec14b2148724073999eae8daaa00814c0540 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Sat, 11 Jan 2025 10:46:54 +0800 Subject: [PATCH 1/3] fix --- pkg/sink/codec/avro/avro.go | 10 ++++++++++ tests/integration_tests/avro_basic/data/data.sql | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 6ac329a22ea..18090b67c38 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -560,8 +560,18 @@ func (a *BatchEncoder) columns2AvroSchema(tableName model.TableName, input avroE } } else { if colx.GetFlag().IsNullable() { + // the string literal "null" must be coerced to a `nil` + // see https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/record.go#L109-L114 // https://stackoverflow.com/questions/22938124/avro-field-default-values + defaultFirst := false if defaultValue == nil { + defaultFirst = true + } else if s, ok := defaultValue.(string); ok && s == "null" { + defaultFirst = true + } else if b, ok := defaultValue.([]byte); ok && string(b) == "null" { + defaultFirst = true + } + if defaultFirst { field["type"] = []interface{}{"null", avroType} } else { field["type"] = []interface{}{avroType, "null"} diff --git a/tests/integration_tests/avro_basic/data/data.sql b/tests/integration_tests/avro_basic/data/data.sql index 0be102ba636..1fce9b8bc38 100644 --- a/tests/integration_tests/avro_basic/data/data.sql +++ b/tests/integration_tests/avro_basic/data/data.sql @@ -164,6 +164,15 @@ insert into t(c_tinyint, c_mediumint, c_int, c_bigint, a) values (4, 5, 6, 7, 8) alter table t modify c_mediumint varchar(10) null; insert into t(c_tinyint, c_mediumint, c_int, c_bigint, a) values (5, "234", 6, 7, 8); +create table t1( + id int primary key, + c1 varchar(255) DEFAULT "null", + c2 varchar(255) DEFAULT "NULL", + c3 varchar(255) DEFAULT NULL +); + +insert into t1(id) values(1); + create table finish_mark ( id int PRIMARY KEY From 362267ea06ebf4eee798ee4e8707a80657227ae3 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 13 Jan 2025 13:36:11 +0800 Subject: [PATCH 2/3] update --- tests/integration_tests/avro_basic/data/data.sql | 1 + tests/integration_tests/avro_basic/run.sh | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests/avro_basic/data/data.sql b/tests/integration_tests/avro_basic/data/data.sql index 1fce9b8bc38..5158c23e7c1 100644 --- a/tests/integration_tests/avro_basic/data/data.sql +++ b/tests/integration_tests/avro_basic/data/data.sql @@ -172,6 +172,7 @@ create table t1( ); insert into t1(id) values(1); +update t1 set c1 = "null", c2 = "NULL", c3 = NULL where id = 1; create table finish_mark ( diff --git a/tests/integration_tests/avro_basic/run.sh b/tests/integration_tests/avro_basic/run.sh index 27f8b0b0d51..6abb9063e8b 100644 --- a/tests/integration_tests/avro_basic/run.sh +++ b/tests/integration_tests/avro_basic/run.sh @@ -47,7 +47,7 @@ function run() { SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=avro&enable-tidb-extension=true&avro-enable-watermark=true&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" --schema-registry=http://127.0.0.1:8088 - cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --schema-registry-uri=http://127.0.0.1:8088 --config="$CUR/conf/changefeed.toml" 2>&1 & + run_kafka_consumer $WORK_DIR $SINK_URI "$CUR/conf/changefeed.toml" http://127.0.0.1:8088 run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} From a27a92743e3981b8bccb848e5d0e33a2679fc1e5 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 14 Jan 2025 17:25:02 +0800 Subject: [PATCH 3/3] add test --- tests/integration_tests/avro_basic/data/data.sql | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/avro_basic/data/data.sql b/tests/integration_tests/avro_basic/data/data.sql index 5158c23e7c1..a9e36afe834 100644 --- a/tests/integration_tests/avro_basic/data/data.sql +++ b/tests/integration_tests/avro_basic/data/data.sql @@ -166,13 +166,16 @@ insert into t(c_tinyint, c_mediumint, c_int, c_bigint, a) values (5, "234", 6, 7 create table t1( id int primary key, - c1 varchar(255) DEFAULT "null", - c2 varchar(255) DEFAULT "NULL", - c3 varchar(255) DEFAULT NULL + c1 varchar(255) default "null", + c2 varchar(255) default "NULL", + c3 varchar(255) default null ); insert into t1(id) values(1); -update t1 set c1 = "null", c2 = "NULL", c3 = NULL where id = 1; +update t1 set c1 = "null", c2 = "NULL", c3 = null where id = 1; +alter table t1 add column col json not null; +alter table t1 modify column col json default null; +insert into t1(id) values(2); create table finish_mark (