Skip to content

Commit

Permalink
refactor(test): unify sr_register usage for avro and protobuf (#19107)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Oct 26, 2024
1 parent d5d0283 commit 32666f2
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 80 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ echo "preparing confluent schema registry"
python3 -m pip install --break-system-packages requests confluent-kafka

echo "testing protobuf"
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
risedev slt 'e2e_test/sink/kafka/protobuf.slt'

echo "testing avro"
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
risedev slt 'e2e_test/sink/kafka/avro.slt'
15 changes: 9 additions & 6 deletions e2e_test/commands/sr_register
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,27 @@ set -euo pipefail

# Register a schema to schema registry
#
# Usage: sr_register <subject> <schema>
# Usage: sr_register <subject> <encoding>
# Schema content is read from stdin. Use redirection or heredoc.
#
# https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions

# Validate arguments
if [[ $# -ne 2 ]]; then
echo "Usage: sr_register <subject> <schema>"
echo "Usage: sr_register <subject> <encoding>"
echo "Schema content is read from stdin. Use redirection or heredoc."
echo "<encoding> is one of AVRO, PROTOBUF or JSON."
exit 1
fi

subject="$1"
schema="$2"
encoding="$2"


if [[ -z $subject || -z $schema ]]; then
if [[ -z $subject || -z $encoding ]]; then
echo "Error: Arguments cannot be empty"
exit 1
fi

echo "$schema" | jq '{"schema": tojson}' \
| curl -X POST -H 'content-type:application/vnd.schemaregistry.v1+json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/${subject}/versions"
jq -Rs "{\"schema\": ., \"schemaType\": \"$encoding\"}" \
| curl -X POST -H 'content-type:application/vnd.schemaregistry.v1+json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL:-http://schemaregistry:8082}/subjects/${subject}/versions"
10 changes: 6 additions & 4 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ set sink_decouple = false;
system ok
rpk topic create test-rw-sink-upsert-avro

# use jq to remove 2 fields/columns before registration
system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
jq '.fields = (.fields | map(select(.name | test("unsupported|mon_day_sec_field") | not)))' src/connector/src/test_data/all-types.avsc | sr_register 'test-rw-sink-upsert-avro-value' AVRO

# use jq to select 2 fields/columns used as key
system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
jq '.fields = (.fields | map(select(.name | test("string_field|int32_field"))))' src/connector/src/test_data/all-types.avsc | sr_register 'test-rw-sink-upsert-avro-key' AVRO

statement ok
create table from_kafka ( *, gen_i32_field int as int32_field + 2, primary key (some_key) )
Expand Down Expand Up @@ -58,7 +60,7 @@ system ok
rpk topic create test-rw-sink-plain-avro

system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions'
sr_register test-rw-sink-plain-avro-value AVRO << EOF
{
"type": "record",
"name": "Simple",
Expand Down Expand Up @@ -106,7 +108,7 @@ format plain encode avro (
key encode text;

system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions'
sr_register test-rw-sink-plain-avro-key AVRO << EOF
{
"type": "record",
"name": "Key",
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-a

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
sr_register 'test-rw-sink-append-only-protobuf-csr-a-value' PROTOBUF < src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_trivial with (
Expand All @@ -32,7 +32,7 @@ system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-hi

system ok
python3 e2e_test/sink/kafka/register_schema.py 'http://schemaregistry:8082' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sr_register 'test-rw-sink-append-only-protobuf-csr-hi-value' PROTOBUF < src/connector/src/test_data/test-index-array.proto

statement ok
create table from_kafka_csr_nested with (
Expand Down
54 changes: 0 additions & 54 deletions e2e_test/sink/kafka/register_schema.py

This file was deleted.

4 changes: 2 additions & 2 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ system ok
rpk topic create 'avro_alter_source_test'

system ok
sr_register avro_alter_source_test-value '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}'
sr_register avro_alter_source_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}'

statement ok
create source s
Expand All @@ -29,7 +29,7 @@ FORMAT PLAIN ENCODE AVRO (

# create a new version of schema and produce a message
system ok
sr_register avro_alter_source_test-value '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'
sr_register avro_alter_source_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_source_test
Expand Down
6 changes: 2 additions & 4 deletions e2e_test/source_inline/kafka/avro/alter_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ rpk topic create 'avro_alter_table_test'

# create a schema and produce a message
system ok
echo '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \
| curl -s -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_table_test-value/versions"
sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"bar","type":"int","default":0},{"name":"foo","type":"string"}]}'

system ok
echo '{"foo":"ABC", "bar":1}' | rpk topic produce --schema-id=topic avro_alter_table_test
Expand All @@ -38,8 +37,7 @@ select * from t

# create a new version of schema that removed field bar
system ok
echo '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}' | jq '{"schema": tojson}' \
| curl -s -X POST -H 'content-type:application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/avro_alter_table_test-value/versions"
sr_register avro_alter_table_test-value AVRO <<< '{"type":"record","name":"Root","fields":[{"name":"foo","type":"string"}]}'

# Refresh table schema should fail
statement error
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/source_inline/kafka/avro/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rpk topic delete 'avro-union' || true; \
rpk topic create avro-union

system ok
sr_register avro-union-value '
sr_register avro-union-value AVRO <<EOF
{
"type": "record",
"name": "Root",
Expand Down Expand Up @@ -36,7 +36,7 @@ sr_register avro-union-value '
}
]
}
'
EOF

system ok
cat<<EOF | rpk topic produce avro-union --schema-id=topic
Expand Down Expand Up @@ -107,7 +107,7 @@ rpk topic delete 'avro-union-simple' || true; \
rpk topic create avro-union-simple

system ok
sr_register avro-union-simple-value '
sr_register avro-union-simple-value AVRO <<EOF
{
"type": "record",
"name": "Root",
Expand All @@ -118,7 +118,7 @@ sr_register avro-union-simple-value '
}
]
}
'
EOF

system ok
cat<<EOF | rpk topic produce avro-union-simple --schema-id=topic
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/source_inline/kafka/protobuf/recover.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ rpk topic create 'test-pb-struct'


system ok
jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions"
sr_register test-pb-struct-value PROTOBUF << EOF
syntax = "proto3";
package test;
message User {
Expand All @@ -31,7 +31,7 @@ format plain encode protobuf (

# register a v2 schema
system ok
jq -sR '{"schema":.,"schemaType":"PROTOBUF"}' << EOF | curl -X POST -H 'content-type: application/json' -d @- "${RISEDEV_SCHEMA_REGISTRY_URL}/subjects/test-pb-struct-value/versions"
sr_register test-pb-struct-value PROTOBUF << EOF
syntax = "proto3";
package test;
message User {
Expand Down

0 comments on commit 32666f2

Please sign in to comment.